# Untitled
# 2 months ago in Plain Text
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.linear_model import LinearRegression
import mlflow
import sys
sys.path.insert(1, '../')
from greenplum import Greenplum
from global_config import cfg as global_config
from inflow.inflow_sql import sql
from inflow.inflow_config import inflow
import warnings
warnings.filterwarnings('ignore')
import tempfile
import os
class AutoCalibrator:
    '''
    Two-stage automatic calibration of a model score against observed values.

    Stage 1 fits each candidate function (``Linear``, ``Ln``, ``Power``,
    ``Exp``) separately for every value of the fixed dimension and stores
    the fitted coefficients ``a`` and ``b``. Stage 2 approximates those
    coefficients with the same family of functions and assembles a single
    SQL expression, which is then used to export the final table.

    ``method`` selects the fixed dimension:

    * ``'mob'``: stage 1 fits fact against mob for every quantile, and
      stage 2 fits the coefficients against the mean model score.
    * ``'quantile'``: stage 1 fits fact against the mean model score for
      every mob, and stage 2 fits the coefficients against mob.
    '''

    # SQL text of every calibration function; placeholders are (a, column, b).
    _SQL_TEMPLATES = {
        'Linear': '{} * {} + {}',
        'Ln': '{} * ln({}) + {}',
        'Power': '{} * power({}, {})',
        'Exp': '{} * exp({} * {})',
    }

    # Numeric counterparts of _SQL_TEMPLATES.
    _CALCULATIONS = {
        'Linear': lambda x, a, b: a*x + b,
        'Ln': lambda x, a, b: a*np.log(x) + b,
        'Power': lambda x, a, b: a*(x**b),
        'Exp': lambda x, a, b: a*np.exp(x*b),
    }

    # Columns summed whenever the calibration table is grouped.
    _SUM_AGG = {'debt': 'sum', 'total_debt': 'sum', 'score': 'sum', 'cnt': 'sum'}

    def __init__(self, method):
        '''
        Initialise the calibrator state.

        method: 'mob' or 'quantile', the dimension being approximated in
        stage 1 (see the class docstring).
        '''
        self.method = method
        # Names of the fitting methods of this class tried at every stage.
        self.func = ['Linear', 'Ln', 'Power', 'Exp']
        # Stage 1 output: function name -> DataFrame of r2/a/b per fixed value.
        self.func_res = {}
        # Name of the stage 1 function with the highest mean r2 (set in stage_2).
        self.best_func = None
        self.sql_a = None
        self.sql_b = None
        self.sql_stage_2 = None
        # Working table; loaded from CSV inside calibrate().
        self.df = None
        # MLflow tracking is currently disabled:
        # self.mlflow_uri = "http://0.0.0.0:8999"
        # mlflow.set_tracking_uri(self.mlflow_uri)
        # mlflow.set_experiment('Autocalibration')

    def initialize_greenplum(self, stage):
        '''
        Build the SQL script for ``stage`` and run it through ``Greenplum``.

        NOTE(review): presumably ``start()`` executes the query and writes
        the result to ``path`` as CSV, because calibrate() reads the same
        path right after this call -- confirm against the Greenplum helper.
        '''
        formatted_sql, path = self.prepare_sql(stage)
        gp = Greenplum(formatted_sql, path, inflow.table_name, global_config.omega_login)
        gp.start()

    def prepare_sql(self, stage):
        '''
        Format the SQL script of the given calibration stage.

        Stage 1 uses ``sql.stage_1``; any other value uses ``sql.stage_2``,
        which additionally receives the calibrated score expression built by
        stage_2() as ``new_score``.

        Returns a tuple ``(script, path)`` where ``path`` is the output file
        location for that stage.
        '''
        params = dict(
            table_name=inflow.table_name,
            agreement_gen=inflow.agreement_gen,
            report_gen=inflow.report_dt,
            mob=inflow.mob_col,
            score_col=inflow.score_col,
            target=inflow.target_col,
            debt=inflow.debt_col,
            amount=inflow.amount_col,
            fact_table=inflow.fact_table,
            score_table=global_config.score_table,
            appl_id=inflow.fact_appl_id_col,
            appl_id_=inflow.score_appl_id_col,
        )
        if stage == 1:
            return sql.stage_1.format(**params), inflow.path + global_config.path_1
        params['new_score'] = self.sql_stage_2
        return sql.stage_2.format(**params), inflow.path + global_config.path_2

    def calibrate(self):
        '''
        Run every calibration stage in order.

        The stage 1 table is exported and fitted, the resulting SQL
        expression is used for the stage 2 export, and the final table is
        plotted by stage_final().
        '''
        self.initialize_greenplum(1)
        self.df = pd.read_csv(inflow.path + global_config.path_1)
        self.stage_1()
        self.stage_2()
        self.initialize_greenplum(2)
        self.df = pd.read_csv(inflow.path + global_config.path_2)
        self.stage_final()

    @staticmethod
    def _filter_period(df):
        '''Keep rows inside the configured agreement/report windows.'''
        agreement = df['agreement_gen']
        report = df['report_gen']
        keep = (
            (agreement >= inflow.start_aggr_dt)
            & (agreement <= inflow.end_aggr_dt)
            & (~agreement.isin(inflow.aggrs_to_exclude))
            & (report >= inflow.start_report_dt)
            & (report <= inflow.end_report_dt)
            & (~report.isin(inflow.reps_to_exclude))
        )
        return df[keep]

    def stage_1(self):
        '''
        First calibration stage.

        The table is restricted to the configured period and then, for every
        candidate function, fitted once per value of the fixed dimension
        (quantile when ``method == 'mob'``, mob otherwise). The per-value
        ``r2``, ``a`` and ``b`` are stored in ``self.func_res[func]``; for
        the 'mob' method the mean model score of the quantile is stored too.
        '''
        self.df = self._filter_period(self.df)
        by_mob = self.method == 'mob'
        fixed_col = 'quantile' if by_mob else 'mob'
        fixed_values = range(1, int(self.df[fixed_col].max()) + 1)

        for func in self.func:
            fit = getattr(self, func)
            frames = []
            for value in fixed_values:
                if by_mob:
                    _, x, model, fact = self.preprocess(
                        self.df, method=self.method, quantile=value)
                    row = {'quantile': value, 'mean_model': model.mean()}
                else:
                    # Here the regressor is the model score, not the group key.
                    _, _, x, fact = self.preprocess(
                        self.df, method=self.method, mob=value)
                    row = {'mob': value}
                r2, a, b = fit(x, fact)
                row.update(r2=r2, a=a, b=b)
                frames.append(pd.DataFrame(row, index=[value]))
            # Concatenate once instead of growing a frame inside the loop.
            self.func_res[func] = pd.concat(frames, axis=0) if frames else pd.DataFrame()

    def _stage_2_regressor(self, calibration_data):
        '''Return the stage 2 regressor: mean model score for 'mob', else mob.'''
        column = 'mean_model' if self.method == 'mob' else 'mob'
        return calibration_data[column].to_numpy()

    def stage_2(self):
        '''
        Second calibration stage.

        Picks the stage 1 function with the highest mean r2, approximates
        its ``a`` and ``b`` coefficients with the best-fitting candidate
        function each, plots both approximations and stores the combined SQL
        expression in ``self.sql_stage_2``.
        '''
        best_func_key = max(self.func_res, key=lambda k: self.func_res[k]['r2'].mean())
        self.best_func = best_func_key
        calibration_data = self.func_res[best_func_key]

        results_a = self.evaluate_model(calibration_data, 'a')
        results_b = self.evaluate_model(calibration_data, 'b')
        func_for_a, a_of_a, b_of_a = self.find_best_model(results_a)
        func_for_b, a_of_b, b_of_b = self.find_best_model(results_b)

        x = self._stage_2_regressor(calibration_data)
        calibration_data['a_model'], self.sql_a = self.apply_func(
            func_for_a, x, a_of_a, b_of_a, sql=True)
        calibration_data['b_model'], self.sql_b = self.apply_func(
            func_for_b, x, a_of_b, b_of_b, sql=True)

        # The first column holds the fixed dimension (quantile or mob).
        group = calibration_data.iloc[:, 0]
        self.plot_results(group, calibration_data['a'], calibration_data['a_model'],
                          f"Coef A = {self.sql_a}")
        self.plot_results(group, calibration_data['b'], calibration_data['b_model'],
                          f"Coef B = {self.sql_b}")
        self.sql_stage_2 = self.get_sql(best_func_key, self.sql_a, self.sql_b)

    def stage_final(self):
        '''
        Final stage: plot fact against the calibrated model.

        Expects ``self.df`` to hold the stage 2 export. Results are plotted
        per mob, quantile, report date and agreement date, followed by two
        pivot plots (quantile x mob and year x quarter x mob).
        '''
        self.df = self._filter_period(self.df)
        for col in ['mob', 'quantile', 'report_gen', 'agreement_gen']:
            _, x, model, fact = self.preprocess(self.df, groupby_col=col)
            self.plot_results(x, fact, model, col.upper())
        self.pivot_plot(self.df, ['quantile', 'mob'])
        self.pivot_plot(self.df, ['year', 'quarter', 'mob'])

    def _save_and_show(self, fig, name):
        '''Save ``fig`` under ``<inflow.path>plots/``, show it and close it.'''
        target = f"{inflow.path}plots/{name}.png"
        # The plots directory may not exist on a fresh checkout.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        fig.savefig(target)
        plt.show()
        # Release the figure so long runs do not accumulate open figures.
        plt.close(fig)

    def pivot_plot(self, df, group):
        '''
        Plot fact and model rates over a multi-level grouping.

        df: calibration table with 'agreement_gen', 'debt', 'total_debt',
        'score' and 'cnt' columns. The input frame is not modified.
        group: list of columns to group by; 'year' and 'quarter' (derived
        from 'agreement_gen') are available in addition to the table columns.
        '''
        agreement_gen = pd.to_datetime(df['agreement_gen'])
        enriched = df.assign(
            agreement_gen=agreement_gen,
            year=agreement_gen.dt.year,
            quarter=agreement_gen.dt.quarter,
        )
        pivot_df = enriched.pivot_table(
            values=['debt', 'total_debt', 'score', 'cnt'],
            index=group,
            aggfunc='sum',
        )
        pivot_df['fact'] = pivot_df['debt']/pivot_df['total_debt']
        pivot_df['model'] = pivot_df['score']/pivot_df['total_debt']
        pivot_df = pivot_df[['fact', 'model']]

        fig, ax = plt.subplots(figsize=(20, 15))
        pivot_df.plot(ax=ax)
        plt.xticks(rotation=60)
        plt.legend()
        self._save_and_show(fig, '_'.join(group))

    def _aggregate(self, df_calib, by):
        '''Group ``df_calib`` by column ``by`` and sum the calibration columns.'''
        return df_calib.groupby([by]).agg(self._SUM_AGG).reset_index()

    def preprocess(self, df_calib, method=None, mob=None, quantile=None, groupby_col=None):
        '''
        Group the calibration table for the requested method/stage.

        * ``method == 'quantile'``: rows of the given ``mob`` grouped by
          quantile; model is ``score / cnt``.
        * ``method == 'mob'``: rows of the given ``quantile`` grouped by
          mob, dropping groups holding less than 0.1% of the total ``cnt``;
          model is ``score / cnt``.
        * otherwise: all rows grouped by ``groupby_col``; model is
          ``score / total_debt``.

        Returns ``(grouped_df, x, model, fact)`` where ``x`` holds the group
        keys and ``fact`` is ``debt / total_debt``; the last three are numpy
        arrays.
        '''
        if method == 'quantile':
            grouped = self._aggregate(df_calib[df_calib['mob'] == mob], method)
            x = np.array(grouped['quantile'])
            model = np.array(grouped['score']/grouped['cnt'])
        elif method == 'mob':
            grouped = self._aggregate(df_calib[df_calib['quantile'] == quantile], method)
            share_pct = (grouped['cnt']/grouped['cnt'].sum())*100
            grouped = grouped[share_pct >= 0.1]
            x = np.array(grouped['mob'])
            model = np.array(grouped['score']/grouped['cnt'])
        else:
            grouped = self._aggregate(df_calib, groupby_col)
            x = np.array(grouped[groupby_col])
            model = np.array(grouped['score']/grouped['total_debt'])
        fact = np.array(grouped['debt']/grouped['total_debt'])
        return grouped, x, model, fact

    def evaluate_model(self, model_data, parameter):
        '''
        Fit every candidate function to a stage 1 coefficient.

        model_data: stage 1 result frame of one function.
        parameter: name of the coefficient column to approximate ('a' or 'b').

        Returns a DataFrame with one row per function and the columns
        'func', 'r2', 'a', 'b'.
        '''
        x = self._stage_2_regressor(model_data)
        target = model_data[parameter].to_numpy()
        frames = []
        for func_name in self.func:
            r2, a, b = getattr(self, func_name)(x, target)
            frames.append(pd.DataFrame({
                'func': func_name,
                'r2': r2,
                'a': a,
                'b': b,
            }, index=[0]))
        return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    def find_best_model(self, results):
        '''
        Pick the row with the highest r2 from an evaluate_model() result.

        Returns ``(func_name, a, b)`` of that row.
        '''
        best_idx = results['r2'].idxmax()
        best_func_name = results.loc[best_idx, 'func']
        a, b = results.loc[best_idx, ['a', 'b']]
        return best_func_name, a, b

    def plot_results(self, group, fact, model, title):
        '''
        Plot fact against model over ``group`` and save the figure.

        Group values are converted to strings, so the x axis is categorical.
        The figure is saved as ``<inflow.path>plots/<title>.png``.
        '''
        # with mlflow.start_run(run_name = f'AutoCalibration = {title}'):
        labels = [str(value) for value in group]
        fig = plt.figure(figsize=(20, 15))
        plt.scatter(labels, fact, color='Black')
        plt.plot(labels, fact, label='Fact', color='Blue')
        plt.plot(labels, model, label='Model', color='Red')
        plt.xticks(rotation=60)
        plt.title(title)
        plt.legend()
        self._save_and_show(fig, title)

    # ------------------------------------------------------------------
    # Calibration functions. Each returns (r2, a, b) for the fitted curve.
    # ------------------------------------------------------------------

    @staticmethod
    def _drop_leading_zero(x, fact):
        '''Drop the first observation when its x is 0 (log is undefined there).'''
        if x[0] == 0:
            return x[1:], fact[1:]
        return x, fact

    def Linear(self, x, fact, plot=False):
        '''
        Fit ``fact = a*x + b`` with sklearn's LinearRegression.

        Returns ``(r2, coef_, intercept_)``; ``coef_`` is the estimator's
        coefficient array, returned as is.
        '''
        features = x.reshape(-1, 1)
        linear_model = LinearRegression()
        linear_model.fit(features, fact)
        r2 = r2_score(fact, linear_model.predict(features))
        if plot:
            self.plot_calib(x, x, fact, linear_model.coef_, linear_model.intercept_, 'Linear')
        return r2, linear_model.coef_, linear_model.intercept_

    def Ln(self, x, fact, plot=False):
        '''
        Fit ``fact = a*ln(x) + b`` by least squares on ``ln(x)``.

        A leading observation with ``x == 0`` is dropped before fitting.
        Returns ``(r2, a, b)``.
        '''
        x, fact = self._drop_leading_zero(x, fact)
        slope, intercept = np.polyfit(np.log(x), fact, 1)
        r2 = r2_score(fact, slope*np.log(x) + intercept)
        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, slope, intercept, 'Ln')
        return r2, slope, intercept

    def Power(self, x, fact, plot=False):
        '''
        Fit ``fact = a * x**b`` by least squares in log-log space.

        A leading observation with ``x == 0`` is dropped before fitting.
        Returns ``(r2, a, b)`` with r2 computed on the original scale.
        '''
        x, fact = self._drop_leading_zero(x, fact)
        exponent, log_scale = np.polyfit(np.log(x), np.log(fact), 1)
        scale = np.exp(log_scale)
        r2 = r2_score(fact, scale*(x**exponent))
        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, scale, exponent, 'Power')
        return r2, scale, exponent

    def Exp(self, x, fact, plot=False):
        '''
        Fit ``fact = a * exp(b*x)`` by least squares on ``ln(fact)``.

        Returns ``(r2, a, b)`` with r2 computed on the original scale.
        '''
        rate, log_scale = np.polyfit(x, np.log(fact), 1)
        scale = np.exp(log_scale)
        r2 = r2_score(fact, scale*np.exp(rate*x))
        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, scale, rate, 'Exp')
        return r2, scale, rate

    def plot_calib(self, polyline, x, fact, a, b, func):
        '''
        Show observed points together with the fitted curve.

        polyline: x values at which the fitted function ``func`` with
        coefficients ``a``, ``b`` is drawn.
        '''
        fig = plt.figure(figsize=(20, 15))
        plt.scatter(x, fact, color='Black')
        plt.plot(polyline, self.apply_func(func, polyline, a, b), label='Model', color='Red')
        plt.plot(x, fact, label='Fact', color='Blue')
        plt.legend()
        plt.show()
        plt.close(fig)

    def apply_func(self, func, x, a, b, sql=False):
        '''
        Apply calibration function ``func`` with coefficients ``a``, ``b``.

        Returns the computed values, or ``(values, sql_str)`` when ``sql`` is
        true. ``sql_str`` is the parenthesised SQL form of the same function
        applied to the score column (``method == 'mob'``) or to the mob
        column (otherwise). The SQL string is built only on request.

        Raises KeyError for an unknown ``func``.
        '''
        result = self._CALCULATIONS[func](x, a, b)
        if not sql:
            return result
        if self.method == 'mob':
            column = f'c.{inflow.score_col}'
        else:
            column = f'p.{inflow.mob_col}'
        sql_str = f'({self._SQL_TEMPLATES[func].format(a, column, b)})'
        return result, sql_str

    def get_sql(self, best_func, func_a, func_b):
        '''
        Build the final SQL expression of the calibration function.

        best_func: name of the stage 1 function.
        func_a, func_b: SQL expressions of its coefficients from stage 2.

        The function is applied to the mob column when ``method == 'mob'``
        and to the score column otherwise.
        '''
        column = f'p.{inflow.mob_col}' if self.method == 'mob' else f'c.{inflow.score_col}'
        return self._SQL_TEMPLATES[best_func].format(func_a, column, func_b)