import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.linear_model import LinearRegression
import mlflow
import sys

# Make the project-level modules (greenplum, global_config, inflow) importable.
sys.path.insert(1, '../')

from greenplum import Greenplum
from global_config import cfg as global_config
from inflow.inflow_sql import sql
from inflow.inflow_config import inflow
import warnings

# NOTE(review): this silences *all* warnings process-wide (including numpy /
# sklearn warnings raised by the fits below); kept for backward compatibility.
warnings.filterwarnings('ignore')
import tempfile
import os


class AutoCalibrator:
    '''
    Two-stage automatic calibration of model scores against observed fact.

    Stage 1 fits every candidate function in ``self.func`` per group (per
    quantile when ``method == 'mob'``, per mob otherwise) and stores the r2
    and coefficients ``a``/``b`` of each fit in ``self.func_res``.
    Stage 2 approximates those per-group coefficients with the best-fitting
    function and assembles an SQL expression (``self.sql_stage_2``) that is
    substituted into the stage-2 Greenplum query. The final stage plots fact
    vs. model for the recalibrated data.
    '''

    # SQL expression templates per calibration function; placeholders are
    # (a, argument, b).
    _SQL_PATTERNS = {
        'Linear': '{} * {} + {}',
        'Ln': '{} * ln({}) + {}',
        'Power': '{} * power({}, {})',
        'Exp': '{} * exp({} * {})',
    }

    # numpy implementations matching _SQL_PATTERNS.
    _OPERATIONS = {
        'Linear': lambda x, a, b: a * x + b,
        'Ln': lambda x, a, b: a * np.log(x) + b,
        'Power': lambda x, a, b: a * (x ** b),
        'Exp': lambda x, a, b: a * np.exp(x * b),
    }

    # Columns summed when grouping the calibration table.
    _AGG = {'debt': 'sum', 'total_debt': 'sum', 'score': 'sum', 'cnt': 'sum'}

    def __init__(self, method):
        '''
        Initialise calibration state.

        :param method: ``'mob'`` -- for every quantile fit fact as a function
            of mob; ``'quantile'`` -- for every mob fit fact as a function of
            the mean model score per quantile.
        '''
        self.method = method
        # Candidate calibration functions; each name is a method of this class.
        self.func = ['Linear', 'Ln', 'Power', 'Exp']
        # func name -> DataFrame of per-group stage-1 fits (r2, a, b).
        self.func_res = {}
        self.best_func = None
        # SQL expressions for the stage-2 approximations of coefficients a and b.
        self.sql_a = None
        self.sql_b = None
        # Final calibrated SQL expression substituted into sql.stage_2.
        self.sql_stage_2 = None
        # Calibration table currently being processed (set by calibrate()).
        self.df = None
        # self.mlflow_uri = "http://0.0.0.0:8999"
        # mlflow.set_tracking_uri(self.mlflow_uri)
        # mlflow.set_experiment('Autocalibration')
        # mlflow.set_tracking_uri(self.mlflow_uri)

    def initialize_greenplum(self, stage):
        '''
        Render the SQL script for ``stage`` and run it via the project
        ``Greenplum`` helper.

        Presumably the helper exports the result to the path returned by
        ``prepare_sql``, which ``calibrate`` then reads -- TODO confirm.

        :param stage: calibration stage, 1 or 2 (see ``prepare_sql``).
        '''
        script, path = self.prepare_sql(stage)
        gp = Greenplum(script, path, inflow.table_name, global_config.omega_login)
        gp.start()

    def prepare_sql(self, stage):
        '''
        Render the SQL script for the given calibration stage.

        :param stage: ``1`` renders ``sql.stage_1``; any other value renders
            ``sql.stage_2`` with ``self.sql_stage_2`` substituted as
            ``new_score``.
        :return: tuple ``(script, path)`` where ``path`` is ``inflow.path``
            joined with ``global_config.path_1`` or ``global_config.path_2``.
        '''
        params = dict(
            table_name=inflow.table_name,
            agreement_gen=inflow.agreement_gen,
            report_gen=inflow.report_dt,
            mob=inflow.mob_col,
            score_col=inflow.score_col,
            target=inflow.target_col,
            debt=inflow.debt_col,
            amount=inflow.amount_col,
            fact_table=inflow.fact_table,
            score_table=global_config.score_table,
            appl_id=inflow.fact_appl_id_col,
            appl_id_=inflow.score_appl_id_col,
        )
        if stage == 1:
            return sql.stage_1.format(**params), inflow.path + global_config.path_1
        params['new_score'] = self.sql_stage_2
        return sql.stage_2.format(**params), inflow.path + global_config.path_2

    def calibrate(self):
        '''
        Run the whole pipeline: stage-1 query and fits, stage-2 coefficient
        approximation, stage-2 query with the calibrated score, final plots.
        '''
        self.initialize_greenplum(1)
        self.df = pd.read_csv(inflow.path + global_config.path_1)
        self.stage_1()
        self.stage_2()
        self.initialize_greenplum(2)
        self.df = pd.read_csv(inflow.path + global_config.path_2)
        self.stage_final()

    @staticmethod
    def _filter_period(df):
        '''Keep rows inside the configured agreement/report date windows, minus exclusions.'''
        agreement = df['agreement_gen']
        report = df['report_gen']
        mask = (
            (agreement >= inflow.start_aggr_dt)
            & (agreement <= inflow.end_aggr_dt)
            & ~agreement.isin(inflow.aggrs_to_exclude)
            & (report >= inflow.start_report_dt)
            & (report <= inflow.end_report_dt)
            & ~report.isin(inflow.reps_to_exclude)
        )
        return df[mask]

    def stage_1(self):
        '''
        First calibration stage.

        Filters ``self.df`` to the configured period, then for every group
        (quantile when ``method == 'mob'``, mob otherwise) fits every
        candidate function and stores one row per group (r2, a, b) in
        ``self.func_res[func]``.

        :raises ValueError: if no rows remain after period filtering.
        '''
        self.df = self._filter_period(self.df)
        if self.df.empty:
            raise ValueError('stage_1: no rows left after period filtering')

        is_mob = self.method == 'mob'
        fixed_col = 'quantile' if is_mob else 'mob'
        # int() so a float-typed column (e.g. read with NaNs) still works in range().
        groups = list(range(1, int(self.df[fixed_col].max()) + 1))

        # Group data does not depend on the fitted function: prepare it once.
        prepared = []
        for group in groups:
            if is_mob:
                _, x, model, fact = self.preprocess(self.df, method=self.method, quantile=group)
            else:
                _, _, model, fact = self.preprocess(self.df, method=self.method, mob=group)
                x = model  # per-mob fits use the mean model score as argument
            prepared.append((group, x, model, fact))

        columns = [fixed_col] + (['mean_model'] if is_mob else []) + ['r2', 'a', 'b']
        for func in self.func:
            fit = getattr(self, func)
            rows = []
            for group, x, model, fact in prepared:
                r2, a, b = fit(x, fact)
                row = {fixed_col: group}
                if is_mob:
                    row['mean_model'] = model.mean()
                row.update(r2=r2, a=a, b=b)
                rows.append(row)
            self.func_res[func] = pd.DataFrame(rows, index=groups, columns=columns)

    @staticmethod
    def _mean_r2(frame):
        '''Mean r2 of a stage-1 result frame; NaN ranks lowest.'''
        mean = frame['r2'].mean()
        return -np.inf if pd.isna(mean) else mean

    def stage_2(self):
        '''
        Second calibration stage.

        Picks the stage-1 function with the highest mean r2, approximates its
        per-group coefficients ``a`` and ``b`` (as functions of
        ``mean_model`` when ``method == 'mob'``, of ``mob`` otherwise), plots
        the approximations and builds ``self.sql_stage_2``.
        '''
        best_func_key = max(self.func_res, key=lambda k: self._mean_r2(self.func_res[k]))
        calibration_data = self.func_res[best_func_key]

        results_a = self.evaluate_model(calibration_data, 'a')
        results_b = self.evaluate_model(calibration_data, 'b')
        best_func_a, best_a, best_b_a = self.find_best_model(results_a)
        best_func_b, best_a_b, best_b = self.find_best_model(results_b)

        x_col = 'mean_model' if self.method == 'mob' else 'mob'
        x = calibration_data[x_col].to_numpy()
        calibration_data['a_model'], self.sql_a = self.apply_func(
            best_func_a, x, best_a, best_b_a, sql=True)
        calibration_data['b_model'], self.sql_b = self.apply_func(
            best_func_b, x, best_a_b, best_b, sql=True)

        groups = calibration_data.iloc[:, 0]
        self.plot_results(groups, calibration_data['a'], calibration_data['a_model'],
                          f"Coef A = {self.sql_a}")
        self.plot_results(groups, calibration_data['b'], calibration_data['b_model'],
                          f"Coef B = {self.sql_b}")
        self.sql_stage_2 = self.get_sql(best_func_key, self.sql_a, self.sql_b)

    def stage_final(self):
        '''
        Final stage: filter the recalibrated table to the configured period
        and plot fact vs. model by mob, quantile, report date and agreement
        date, plus pivot plots by (quantile, mob) and (year, quarter, mob).
        '''
        self.df = self._filter_period(self.df)
        for col in ['mob', 'quantile', 'report_gen', 'agreement_gen']:
            _, x, model, fact = self.preprocess(self.df, groupby_col=col)
            self.plot_results(x, fact, model, col.upper())
        self.pivot_plot(self.df, ['quantile', 'mob'])
        self.pivot_plot(self.df, ['year', 'quarter', 'mob'])

    @staticmethod
    def _plot_path(name):
        '''Return ``<inflow.path>plots/<name>.png``, creating the directory if needed.'''
        plots_dir = f"{inflow.path}plots"
        os.makedirs(plots_dir, exist_ok=True)
        return f"{plots_dir}/{name}.png"

    def pivot_plot(self, df, group):
        '''
        Plot fact (debt / total_debt) and model (score / total_debt) summed
        over ``group`` and save the figure as ``<group joined by '_'>.png``.

        The input frame is not modified; ``year``/``quarter`` are derived
        from ``agreement_gen`` on a copy.
        '''
        df = df.copy()
        df['agreement_gen'] = pd.to_datetime(df['agreement_gen'])
        df['year'] = df['agreement_gen'].dt.year
        df['quarter'] = df['agreement_gen'].dt.quarter

        pivot_df = df.pivot_table(
            values=['debt', 'total_debt', 'score', 'cnt'],
            index=group,
            aggfunc='sum',
        )
        pivot_df['fact'] = pivot_df['debt'] / pivot_df['total_debt']
        pivot_df['model'] = pivot_df['score'] / pivot_df['total_debt']

        fig, ax = plt.subplots(figsize=(20, 15))
        pivot_df[['fact', 'model']].plot(ax=ax)
        plt.xticks(rotation=60)
        plt.legend()
        plt.savefig(self._plot_path('_'.join(group)))
        plt.show()
        plt.close(fig)  # avoid accumulating open figures across calls

    @classmethod
    def _aggregate(cls, df, col):
        '''Sum debt/total_debt/score/cnt per value of ``col``.'''
        return df.groupby([col]).agg(cls._AGG).reset_index()

    def preprocess(self, df_calib, method=None, mob=None, quantile=None, groupby_col=None):
        '''
        Group the calibration table according to method / stage.

        - ``method == 'quantile'``: rows with the given ``mob``, grouped by
          quantile; returns ``(df, quantile, score/cnt, debt/total_debt)``.
        - ``method == 'mob'``: rows with the given ``quantile``, grouped by
          mob, dropping mobs holding < 0.1% of ``cnt``; returns
          ``(df, mob, score/cnt, debt/total_debt)``.
        - otherwise: grouped by ``groupby_col``; returns
          ``(df, x, score/total_debt, debt/total_debt)``.
        '''
        if method == 'quantile':
            grouped = self._aggregate(df_calib[df_calib['mob'] == mob], 'quantile')
            quantile = grouped['quantile'].to_numpy()
            model = (grouped['score'] / grouped['cnt']).to_numpy()
            fact = (grouped['debt'] / grouped['total_debt']).to_numpy()
            return grouped, quantile, model, fact

        if method == 'mob':
            grouped = self._aggregate(df_calib[df_calib['quantile'] == quantile], 'mob')
            # Drop mobs that carry less than 0.1% of the contracts.
            grouped = grouped[(grouped['cnt'] / grouped['cnt'].sum()) * 100 >= 0.1]
            mob = grouped['mob'].to_numpy()
            model = (grouped['score'] / grouped['cnt']).to_numpy()
            fact = (grouped['debt'] / grouped['total_debt']).to_numpy()
            return grouped, mob, model, fact

        grouped = self._aggregate(df_calib, groupby_col)
        x = grouped[groupby_col].to_numpy()
        model = (grouped['score'] / grouped['total_debt']).to_numpy()
        fact = (grouped['debt'] / grouped['total_debt']).to_numpy()
        return grouped, x, model, fact

    def evaluate_model(self, model_data, parameter):
        '''
        Fit every candidate function to the stage-1 coefficient ``parameter``.

        :param model_data: stage-1 result frame of the chosen function.
        :param parameter: coefficient column to approximate (``'a'`` or ``'b'``).
        :return: DataFrame with columns ``func, r2, a, b`` (one row per function).
        '''
        x_col = 'mean_model' if self.method == 'mob' else 'mob'
        x = model_data[x_col].to_numpy()
        y = model_data[parameter].to_numpy()
        rows = []
        for func_name in self.func:
            r2, a, b = getattr(self, func_name)(x, y)
            rows.append({'func': func_name, 'r2': r2, 'a': a, 'b': b})
        return pd.DataFrame(rows, columns=['func', 'r2', 'a', 'b'])

    def find_best_model(self, results):
        '''
        Return ``(func_name, a, b)`` of the row with the highest r2.

        :raises ValueError: if every r2 is NaN (no function could be fitted).
        '''
        if not results['r2'].notna().any():
            raise ValueError('find_best_model: no calibration function produced a finite r2')
        best_idx = results['r2'].idxmax()
        best_func_name = results.loc[best_idx, 'func']
        a, b = results.loc[best_idx, ['a', 'b']]
        return best_func_name, a, b

    def plot_results(self, group, fact, model, title):
        '''
        Plot fact and model against ``group`` (as string tick labels) and
        save the figure as ``<title>.png`` in the plots directory.
        '''
        # with mlflow.start_run(run_name = f'AutoCalibration = {title}'):
        labels = [str(g) for g in group]
        fig = plt.figure(figsize=(20, 15))
        plt.scatter(labels, fact, color='Black')
        plt.plot(labels, fact, label='Fact', color='Blue')
        plt.plot(labels, model, label='Model', color='Red')
        plt.xticks(rotation=60)
        plt.title(title)
        plt.legend()
        plt.savefig(self._plot_path(title))
        plt.show()
        plt.close(fig)  # avoid accumulating open figures across calls

    # ---- Calibration functions -------------------------------------------
    # Each returns (r2, a, b) for the parametrisation used by apply_func;
    # (nan, nan, nan) when no usable points remain.

    @staticmethod
    def _valid_points(x, fact, positive_x=False, positive_fact=False):
        '''
        Return float copies of ``x``/``fact`` restricted to finite points
        (and, if requested, to strictly positive x / fact, where a log
        transform is required).
        '''
        x = np.asarray(x, dtype=float)
        fact = np.asarray(fact, dtype=float)
        mask = np.isfinite(x) & np.isfinite(fact)
        if positive_x:
            mask &= x > 0
        if positive_fact:
            mask &= fact > 0
        return x[mask], fact[mask]

    def Linear(self, x, fact, plot=False):
        '''Fit ``fact = a * x + b``; returns ``(r2, a, b)`` with scalar ``a``.'''
        x, fact = self._valid_points(x, fact)
        if x.size == 0:
            return np.nan, np.nan, np.nan
        linear_model = LinearRegression()
        linear_model.fit(x.reshape(-1, 1), fact)
        predicts = linear_model.predict(x.reshape(-1, 1))
        r2 = r2_score(fact, predicts)
        # coef_ has shape (1,); return a scalar like the other fits so that
        # apply_func(..., sql=True) formats it as a number, not "[...]".
        a, b = linear_model.coef_[0], linear_model.intercept_
        if plot:
            self.plot_calib(x, x, fact, a, b, 'Linear')
        return r2, a, b

    def Ln(self, x, fact, plot=False):
        '''Fit ``fact = a * ln(x) + b`` on points with x > 0; returns ``(r2, a, b)``.'''
        x, fact = self._valid_points(x, fact, positive_x=True)
        if x.size == 0:
            return np.nan, np.nan, np.nan
        a, b = np.polyfit(np.log(x), fact, 1)
        predicts = a * np.log(x) + b
        r2 = r2_score(fact, predicts)
        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, a, b, 'Ln')
        return r2, a, b

    def Power(self, x, fact, plot=False):
        '''
        Fit ``fact = a * x**b`` via linear regression in log-log space on
        points with x > 0 and fact > 0; returns ``(r2, a, b)``.
        '''
        x, fact = self._valid_points(x, fact, positive_x=True, positive_fact=True)
        if x.size == 0:
            return np.nan, np.nan, np.nan
        b, log_a = np.polyfit(np.log(x), np.log(fact), 1)
        a = np.exp(log_a)
        predicts = a * (x ** b)
        r2 = r2_score(fact, predicts)
        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, a, b, 'Power')
        return r2, a, b

    def Exp(self, x, fact, plot=False):
        '''
        Fit ``fact = a * exp(b * x)`` via linear regression of ln(fact) on x
        using points with fact > 0; returns ``(r2, a, b)``.
        '''
        x, fact = self._valid_points(x, fact, positive_fact=True)
        if x.size == 0:
            return np.nan, np.nan, np.nan
        b, log_a = np.polyfit(x, np.log(fact), 1)
        a = np.exp(log_a)
        predicts = a * np.exp(b * x)
        r2 = r2_score(fact, predicts)
        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, a, b, 'Exp')
        return r2, a, b

    def plot_calib(self, polyline, x, fact, a, b, func):
        '''Plot fact points and the fitted ``func`` curve evaluated on ``polyline``.'''
        fig = plt.figure(figsize=(20, 15))
        plt.scatter(x, fact, color='Black')
        plt.plot(polyline, self.apply_func(func, polyline, a, b), label='Model', color='Red')
        plt.plot(x, fact, label='Fact', color='Blue')
        plt.legend()
        plt.show()
        plt.close(fig)  # avoid accumulating open figures across calls

    def apply_func(self, func, x, a, b, sql=False):
        '''
        Evaluate calibration function ``func`` with coefficients ``a``, ``b``.

        :param sql: if true, also return the parenthesised SQL expression,
            whose argument is ``c.<score_col>`` when ``method == 'mob'`` and
            ``p.<mob_col>`` otherwise.
        :return: ``result`` or ``(result, sql_str)``.
        :raises KeyError: for an unknown ``func``.
        '''
        result = self._OPERATIONS[func](x, a, b)
        if not sql:
            return result
        column = f'c.{inflow.score_col}' if self.method == 'mob' else f'p.{inflow.mob_col}'
        sql_str = f"({self._SQL_PATTERNS[func].format(a, column, b)})"
        return result, sql_str

    def get_sql(self, best_func, func_a, func_b):
        '''
        Build the final calibrated SQL expression: ``best_func`` applied to
        ``p.<mob_col>`` (``method == 'mob'``) or ``c.<score_col>`` with the
        SQL sub-expressions ``func_a``/``func_b`` as its coefficients.
        '''
        column = f'p.{inflow.mob_col}' if self.method == 'mob' else f'c.{inflow.score_col}'
        return self._SQL_PATTERNS[best_func].format(func_a, column, func_b)