diff --git a/conf/base/parameters.yml b/conf/base/parameters.yml
index cff23cfc..3da5bd71 100644
--- a/conf/base/parameters.yml
+++ b/conf/base/parameters.yml
@@ -382,17 +382,17 @@ models:
       flavor: DataSet
       config:
         features:
-          - 'no_show_before'
+          # - 'no_show_before'
           - 'appts_before'
-          - 'show_before'
+          # - 'show_before'
           - 'no_show_rate'
           - 'sched_days_advanced'
-          - 'month'
+          # - 'month'
           - 'age'
           - 'modality'
           - 'occupation'
           - 'reason'
-          - 'sex'
+          # - 'sex'
           - 'hour_sched'
           - 'distance_to_usz'
           - 'day_of_week_str'
@@ -400,9 +400,9 @@ models:
           - 'times_rescheduled'
         target: NoShow
     Stratifier:
-      flavor: PartitionedLabelStratifier
+      flavor: PartitionedFeatureStratifier
       config:
-        n_partitions: 5
+        split_feature: 'year'
     Architecture:
       flavor: Pipeline
       config:
@@ -417,14 +417,14 @@ models:
             with_mean: True
           args:
             columns:
-              - 'no_show_before'
+              # - 'no_show_before'
               - 'sched_days_advanced'
               - 'age'
               - 'hour_sched'
               - 'distance_to_usz'
               - 'times_rescheduled'
               - 'appts_before'
-              - 'show_before'
+              # - 'show_before'
               - 'no_show_rate'
           - name: 'onehot'
             flavor: sklearn.preprocessing.OneHotEncoder
@@ -437,13 +437,13 @@ models:
               - 'reason'
               - 'modality'
               - 'day_of_week_str'
-          - name: 'cyc'
-            flavor: mridle.utilities.modeling.CyclicalTransformer
-            config:
-              period: 12
-            args:
-              columns:
-                - 'month'
+          #- name: 'cyc'
+          #  flavor: mridle.utilities.modeling.CyclicalTransformer
+          #  config:
+          #    period: 12
+          #  args:
+          #    columns:
+          #      - 'month'
       - flavor: XGBClassifier
         name: 'classifier'
         config:
diff --git a/src/mridle/experiment/architecture.py b/src/mridle/experiment/architecture.py
index b71dbaad..1f4f290a 100644
--- a/src/mridle/experiment/architecture.py
+++ b/src/mridle/experiment/architecture.py
@@ -2,7 +2,7 @@
 import skorch
 from sklearn.base import BaseEstimator
-from sklearn.ensemble import RandomForestClassifier
+from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
 from sklearn.linear_model import LogisticRegression
 from sklearn.pipeline import Pipeline
 from sklearn.compose import ColumnTransformer
@@ -34,6 +34,7 @@ class ArchitectureInterface(ComponentInterface):
     registered_flavors = {
         'RandomForestClassifier': RandomForestClassifier,  # TODO enable auto-loading from sklearn
+        'RandomForestRegressor': RandomForestRegressor,  # TODO enable auto-loading from sklearn
         'LogisticRegression': LogisticRegression,
         'XGBClassifier': xgb.XGBClassifier,
         'Pipeline': Pipeline,
diff --git a/src/mridle/experiment/dataset.py b/src/mridle/experiment/dataset.py
index 28e5b721..1d3281b3 100644
--- a/src/mridle/experiment/dataset.py
+++ b/src/mridle/experiment/dataset.py
@@ -36,8 +36,13 @@ def validate_config(config, data):
         if col not in data.columns:
             raise ValueError(f'Feature column {col} not found in dataset.')
 
-    if config['target'] not in data.columns:
-        raise ValueError(f"Target column {config['target']} not found in dataset.")
+    if isinstance(config['target'], str):
+        if config['target'] not in data.columns:
+            raise ValueError(f"Target column {config['target']} not found in dataset.")
+    elif isinstance(config['target'], list):
+        if not set(config['target']).issubset(data.columns):
+            not_in_list = list(set(config['target']).difference(data.columns))
+            raise ValueError(f"Target columns {not_in_list} not found in dataset.")
 
     return True
diff --git a/src/mridle/experiment/metric.py b/src/mridle/experiment/metric.py
index bc11f93b..d61f6d77 100644
--- a/src/mridle/experiment/metric.py
+++ b/src/mridle/experiment/metric.py
@@ -135,6 +135,10 @@ class MetricInterface(ComponentInterface):
         'AUPRC': AUPRC,
         'AUROC': AUROC,
         'LogLoss': LogLoss,
+        'MAE': MAE,
+        'MSE': MSE,
+        'RMSE': RMSE,
+        'MedianAbsoluteError': MedianAbsoluteError
     }
 
     @classmethod
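# A minimal sketch, for orientation only: the MAE/MSE/RMSE/MedianAbsoluteError
# flavors registered above are defined elsewhere in metric.py and are not shown
# in this diff. Assuming the Metric base class exposes a calculate(y_true, y_pred)
# hook like the classification flavors presumably do (that API is an assumption,
# not confirmed by this patch), a regression metric could look roughly like this:

from sklearn.metrics import mean_absolute_error


class Metric:  # stand-in for the real mridle.experiment.metric.Metric base class
    pass


class MAE(Metric):
    def calculate(self, y_true, y_pred) -> float:
        # mean absolute error: the average magnitude of the residuals
        return mean_absolute_error(y_true, y_pred)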
diff --git a/src/mridle/experiment/stratifier.py b/src/mridle/experiment/stratifier.py
index ed1ba078..6acda4be 100644
--- a/src/mridle/experiment/stratifier.py
+++ b/src/mridle/experiment/stratifier.py
@@ -58,7 +58,7 @@ def materialize_partition(self, partition_id: int, data_set: DataSet) -> Tuple[p
 class PartitionedLabelStratifier(Stratifier):
 
     def partition_data(self, data_set: DataSet) -> List[Tuple[List[int], List[int]]]:
-        """Randomly shuffle and split the doc_list into n_partitions roughly equal lists, stratified by label."""
+        """Randomly shuffle and split the data_set into n_partitions roughly equal lists, stratified by label."""
         label_list = data_set.y
         skf = StratifiedKFold(n_splits=self.config['n_partitions'], random_state=42, shuffle=True)
         x = np.zeros(len(label_list))  # split takes a X argument for backwards compatibility and is not used
@@ -100,9 +100,33 @@ def validate_config(cls, config):
         return True
 
 
+class PartitionedFeatureStratifier(Stratifier):
+
+    def partition_data(self, data_set: DataSet) -> List[Tuple[List[int], List[int]]]:
+        """Split the dataset into one partition per unique value of the configured feature column."""
+        data_set_copy = data_set.data.copy()
+        data_set_copy = data_set_copy.reset_index()
+        feature_values = data_set_copy[self.config['split_feature']].unique()
+        partitions = []
+        for f_label in feature_values:
+            train_ids = np.array(data_set_copy[data_set_copy[self.config['split_feature']] != f_label].index)
+            test_ids = np.array(data_set_copy[data_set_copy[self.config['split_feature']] == f_label].index)
+            partitions.append([train_ids, test_ids])
+        return partitions
+
+    @classmethod
+    def validate_config(cls, config):
+        for key in ['split_feature', ]:
+            if key not in config:
+                raise ValueError(f"{cls.__name__} config must contain entry '{key}'.")
+        return True
+
+
 class StratifierInterface(ComponentInterface):
 
     registered_flavors = {
+        'PartitionedFeatureStratifier': PartitionedFeatureStratifier,
         'PartitionedLabelStratifier': PartitionedLabelStratifier,
         'TrainTestStratifier': TrainTestStratifier,
     }
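# For intuition: with `split_feature: 'year'` (as configured in parameters.yml
# above), the new PartitionedFeatureStratifier yields leave-one-year-out
# partitions. A standalone sketch of the same logic on toy data:

import numpy as np
import pandas as pd

df = pd.DataFrame({'year': [2015, 2015, 2016, 2017, 2017]})
partitions = []
for year in df['year'].unique():
    train_ids = np.array(df[df['year'] != year].index)  # train on all other years
    test_ids = np.array(df[df['year'] == year].index)   # test on the held-out year
    partitions.append([train_ids, test_ids])
print(partitions[0])  # -> [array([2, 3, 4]), array([0, 1])]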
diff --git a/src/mridle/pipelines/data_science/feature_engineering/nodes.py b/src/mridle/pipelines/data_science/feature_engineering/nodes.py
index ff79e593..a9938013 100644
--- a/src/mridle/pipelines/data_science/feature_engineering/nodes.py
+++ b/src/mridle/pipelines/data_science/feature_engineering/nodes.py
@@ -15,6 +15,13 @@ def daterange(date1, date2):
         yield date1 + timedelta(n)
 
 
+def get_last_non_na(x):
+    """Return the last non-NA value in the Series `x`, or the string '0' if every value is NA."""
+    if x.last_valid_index() is None:
+        return '0'
+    else:
+        return x[x.last_valid_index()]
+
+
 def generate_training_data(status_df, valid_date_range, append_outcome=True, add_no_show_before=True):
     """
     Build data for use in models by trying to replicate the conditions under which the model would be used in reality
@@ -201,13 +208,15 @@ def build_feature_set(status_df: pd.DataFrame, valid_date_range: List[str], mast
         'distance_to_usz_sq': 'last',
         'close_to_usz': 'last',
         'times_rescheduled': 'last',
-        'start_time': 'last'
+        'start_time': 'last',
+        'Telefon': get_last_non_na
     }
 
     slot_df = build_slot_df(status_df, valid_date_range, agg_dict, build_future_slots=build_future_slots,
                             include_id_cols=True)
 
     slot_df = feature_days_scheduled_in_advance(status_df, slot_df)
+    slot_df = feature_year(slot_df)
     slot_df = feature_month(slot_df)
     slot_df = feature_hour_sched(slot_df)
     slot_df = feature_day_of_week(slot_df)
@@ -217,7 +226,7 @@ def build_feature_set(status_df: pd.DataFrame, valid_date_range: List[str], mast
     slot_df = feature_cyclical_month(slot_df)
     slot_df = slot_df[slot_df['day_of_week_str'].isin(['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'])]
     slot_df = slot_df[slot_df['sched_days_advanced'] > 2]
-
+    slot_df = limit_to_day_hours(slot_df)
     return slot_df
 
@@ -274,6 +283,20 @@ def feature_month(slot_df: pd.DataFrame) -> pd.DataFrame:
     return slot_df
 
 
+def feature_year(slot_df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Append the year feature to the dataframe.
+
+    Args:
+        slot_df: A dataframe containing appointment slots.
+
+    Returns: A row-per-slot dataframe with the additional column 'year'.
+
+    """
+    slot_df['year'] = slot_df['start_time'].dt.year
+    return slot_df
+
+
 def feature_hour_sched(slot_df: pd.DataFrame) -> pd.DataFrame:
     """
     Append the hour_sched feature to the dataframe using was_sched_for_date.
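# The new 'Telefon' aggregation is subtle enough to warrant an example:
# get_last_non_na returns the last non-missing value in each group, falling
# back to the *string* '0' (not a numeric zero) when every value is missing.

import numpy as np
import pandas as pd

def get_last_non_na(x):  # copy of the helper added above
    if x.last_valid_index() is None:
        return '0'
    else:
        return x[x.last_valid_index()]

print(get_last_non_na(pd.Series(['044 123 45 67', np.nan])))  # -> '044 123 45 67' (number hypothetical)
print(get_last_non_na(pd.Series([np.nan, np.nan])))           # -> '0'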
@@ -628,10 +651,6 @@ def feature_occupation(df):
     df_remap.loc[df_remap['Beruf'] == 'nan', 'occupation'] = 'none_given'
     df_remap.loc[df_remap['Beruf'] == '-', 'occupation'] = 'none_given'
 
-    df_remap.loc[df_remap['Beruf'].apply(regex_search, search_str='rentner|Renter|pensioniert|pens.|rente'),
-                 'occupation'] = 'retired'
-    df_remap.loc[df_remap['Beruf'].apply(regex_search, search_str='keine Angaben|keine Ang'),
-                 'occupation'] = 'none_given'
     df_remap.loc[df_remap['Beruf'].apply(regex_search,
                                          search_str='Angestellte|ang.|baue|angest.|Hauswart|dozent|designer|^KV$|'
                                                     'masseu|Raumpflegerin|Apothekerin|Ing.|fotog|Psycholog|'
@@ -649,14 +668,24 @@ def feature_occupation(df):
                                                     'ingenieur|Kauf|mitarbeiter|Verkäufer|Informatiker|koch|'
                                                     'lehrer|arbeiter|architekt'),
                  'occupation'] = 'employed'
+    df_remap.loc[df_remap['Beruf'].apply(regex_search, search_str='rentner|Renter|pensioniert|pens.|rente'),
+                 'occupation'] = 'retired'
+    df_remap.loc[df_remap['Beruf'].apply(regex_search, search_str='IV-Rentner'),
+                 'occupation'] = 'iv_retired'
+
+    df_remap.loc[df_remap['Beruf'].apply(regex_search, search_str='keine Angaben|keine Ang'),
+                 'occupation'] = 'none_given'
+
     df_remap.loc[df_remap['Beruf'].apply(regex_search, search_str='student|Schüler|Doktorand|'
                                                                   'Kind|Stud.|Ausbildung|^MA$'),
                  'occupation'] = 'student'
     df_remap.loc[df_remap['Beruf'].apply(regex_search, search_str='^IV$|^IV-Bezüger|^$|arbeitslos|ohne Arbeit|'
                                                                   'ohne|o.A.|nicht Arbeitstätig|'
                                                                   'Sozialhilfeempfänger|o. Arbeit|keine Arbeit|'
-                                                                  'Asyl|RAV|Hausfrau|Hausmann'),
+                                                                  'Asyl|RAV'),
                  'occupation'] = 'unemployed'
+    df_remap.loc[
+        df_remap['Beruf'].apply(regex_search, search_str='Hausfrau|Hausmann'), 'occupation'] = 'stay_at_home_parent'
     df_remap.loc[df_remap['Beruf'].apply(regex_search, search_str='selbst'), 'occupation'] = 'self_employed'
     df_remap.loc[df_remap['Beruf'].apply(regex_search, search_str='arzt|aerzt|ärzt|pflegefachfrau|Pflegehelfer|'
                                                                   'MTRA|Erzieherin|Fachfrau Betreuung|'
@@ -667,7 +696,9 @@ def feature_occupation(df):
 
     df_remap.loc[df_remap['occupation'] == '', 'occupation'] = 'other'
     df_remap.loc[df_remap['occupation'].isna(), 'occupation'] = 'other'
+
     df_remap = df_remap.drop('Beruf', axis=1)
+
     return df_remap
 
@@ -715,5 +746,9 @@ def feature_duration(dicom_df: pd.DataFrame) -> pd.DataFrame:
     return dicom_df
 
 
+def limit_to_day_hours(df):
+    """Keep only slots scheduled strictly between 07:00 and 17:59."""
+    return df[(df['hour_sched'] < 18) & (df['hour_sched'] > 6)]
+
+
 def regex_search(x, search_str):
     return bool(re.search(search_str, x, re.IGNORECASE))
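# Note on the reordering in feature_occupation: each .loc assignment overwrites
# whatever earlier rules set, so moving the 'retired' and 'none_given' blocks
# below the broad 'employed' match changes the result for any Beruf entry that
# matches both. A minimal demonstration of that overwrite behaviour (toy data):

import pandas as pd

df = pd.DataFrame({'Beruf': ['Rentner (ehem. Lehrer)'], 'occupation': ['']})
df.loc[df['Beruf'].str.contains('lehrer', case=False), 'occupation'] = 'employed'
df.loc[df['Beruf'].str.contains('rentner', case=False), 'occupation'] = 'retired'
print(df['occupation'].iloc[0])  # -> 'retired': the later rule wins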
+ """ + + # Read the configuration file + config = configparser.ConfigParser() + config.read('/data/mridle/data/intervention/config.ini') + + # Access the values in the configuration file + username = config['DEFAULT']['username'] + password = config['DEFAULT']['password'] + recipients = config['DEFAULT']['recipients'].split(',') + threshold = float(config['DEFAULT']['threshold']) + + today = dt.strftime('%Y_%m_%d') + filename_date = add_business_days(dt, 3).date().strftime('%Y_%m_%d') + filename = '/data/mridle/data/silent_live_test/live_files/all/out_features_data/features_{}.csv'.format( + filename_date) + + preds = pd.read_csv(filename, parse_dates=['start_time']) + preds.rename(columns={"prediction_xgboost": "prediction"}, inplace=True) + preds.drop(columns=[x for x in preds.columns if 'prediction_' in x], inplace=True) + preds.drop(columns=[x for x in preds.columns if 'Unnamed:' in x], inplace=True) + + # Take the top X appts + # preds = preds.sort_values("prediction", ascending=False)[:split_config[day_of_week_from_filename]['num_preds']] + + # Take appts above a certain threshold + preds = preds[preds['prediction'] > threshold] + # We don't want overlap (i.e. appts being in both the Monday batch of calls as well as the thursday batch, so for + # the Thursday batch we remove the appts for following Thursday (which would be the day where a potential overlap + # would occur) + if dt.strftime("%A") == 'Thursday': + preds = preds[preds['start_time'].dt.strftime("%A") != "Thursday"] + + preds['control'] = 'control' + + # use the index of a sampling to change ~50% of the labels to 'intervention' + preds.loc[preds.sample(frac=0.5, replace=False).index, 'control'] = 'intervention' + + intervention_df = preds[preds['control'] == 'intervention'][['MRNCmpdId', 'FillerOrderNo', 'start_time', 'Telefon']] + + # Save the original as csv, and then the intervention one as PDF to be emailed + preds.to_csv("/data/mridle/data/intervention/intervention_{}.csv".format(today), index=False) + + fig, ax = plt.subplots(figsize=(12, 4)) + ax.axis('tight') + ax.axis('off') + ax.table(cellText=intervention_df.values, colLabels=intervention_df.columns, loc='center') + + pp = PdfPages("/data/mridle/data/intervention/intervention_{}.pdf".format(today)) + pp.savefig(fig, bbox_inches='tight') + pp.close() + + for_feedback_file = intervention_df[['MRNCmpdId', 'FillerOrderNo', 'start_time']] + feedback_file = "/data/mridle/data/intervention/feedback.txt" + with open(feedback_file, 'a') as f: + for_feedback_file.to_csv(f, header=False, index=False, sep=',') + + # create an SMTP object + smtp_obj = smtplib.SMTP('outlook.usz.ch', 587) + + # establish a secure connection + smtp_obj.starttls() + + # login to the email server using your email address and password + smtp_obj.login(username, password) + + # create the email message + msg = MIMEMultipart() + msg['From'] = username + msg['To'] = ", ".join(recipients) + msg['Date'] = formatdate(localtime=True) + msg['Subject'] = 'Intervention Study - {}'.format(today) + body = """ + Dear Namka, Dear Emir, + + Here are the upcoming appointments which we would like to include in the study. + + As discussed, could you please: + + 1. Try to call these patients today and remind them of their appointment + 2. Send me an email with some feedback (i.e. whether you could get talking with the patient, what they said, etc.)\ + in whichever form suits you. + + Let me know if you have any questions. 
+
+
+def send_results():
+    """
+    Compile the latest intervention-study results and email them as HTML tables.
+    """
+    all_actuals = pd.read_csv("/data/mridle/data/silent_live_test/live_files/all/actuals/master_actuals.csv",
+                              parse_dates=['start_time'])
+    most_recent_data = all_actuals['start_time'].max()
+    file_dir = '/data/mridle/data/intervention/'
+
+    intervention_df = pd.DataFrame()
+
+    for filename in os.listdir(file_dir):
+        if filename.endswith(".csv"):
+            i_df = pd.read_csv(os.path.join(file_dir, filename), parse_dates=['start_time'])
+            i_df['file'] = filename
+            intervention_df = pd.concat([intervention_df, i_df])
+
+    intervention_df = intervention_df[intervention_df['start_time'] < most_recent_data]
+    intervention_df.drop(columns=['NoShow'], inplace=True)
+    intervention_df = intervention_df.merge(all_actuals[['FillerOrderNo', 'start_time', 'NoShow']], how='left')
+    feedback = pd.read_csv('/data/mridle/data/intervention/feedback.txt', sep=",")
+    feedback['start_time'] = pd.to_datetime(feedback['start_time'])
+    intervention_df = intervention_df.merge(feedback, how='left')
+    intervention_df = intervention_df[~(intervention_df['feedback'].isin(['appt not found', 'delete']))]
+    intervention_df['NoShow'].fillna(False, inplace=True)
+    intervention_df.loc[intervention_df['control'] == 'control', 'feedback'] = 'control'
+
+    # remove duplicates (some appts were included in both the Monday and the Thursday intervention/control groups)
+    intervention_df = intervention_df.sort_values('file').groupby(['FillerOrderNo', 'start_time']).last().reset_index()
+    intervention_df['reached'] = intervention_df['feedback'].map({
+        'control': 'control',
+        'comes': 'reached',
+        'not reached': 'not reached',
+        'to be rescheduled': 'reached'
+    })
+    intervention_df['reached_2'] = intervention_df['feedback'].map({
+        'control': 'control',
+        'comes': 'reached',
+        'not reached': 'control',
+        'to be rescheduled': 'reached'
+    })
+
+    r_1 = intervention_df.groupby('control').agg({'NoShow': ['count', 'sum', 'mean']}).reset_index()
+    r_2 = intervention_df.groupby(['control', 'feedback']).agg({'NoShow': ['count', 'sum', 'mean']}).reset_index()
+    r_3 = intervention_df.groupby(['reached']).agg({'NoShow': ['count', 'sum', 'mean']})
+    r_4 = intervention_df.groupby(['reached_2']).agg({'NoShow': ['count', 'sum', 'mean']})
+
+    # Read the configuration file
+    config = configparser.ConfigParser()
+    config.read('/data/mridle/data/intervention/config.ini')
+
+    # Access the values in the configuration file
+    username = config['DEFAULT']['username']
+    password = config['DEFAULT']['password']
+
+    # create an SMTP object
+    smtp_obj = smtplib.SMTP('outlook.usz.ch', 587)
+
+    # establish a secure connection
+    smtp_obj.starttls()
+
+    # login to the email server
+    smtp_obj.login(username, password)
+
+    # create the email message
+    msg = MIMEMultipart()
+    msg['From'] = username
+    msg['To'] = 'markronan.mcmahon@usz.ch'
+    msg['Date'] = formatdate(localtime=True)
+    msg['Subject'] = 'Intervention results - {}'.format(datetime.datetime.today().strftime('%Y_%m_%d'))
+    body = """
+    {}
+
+    {}
+
+    {}
+
+    {}
+    """.format(r_1.to_html(), r_2.to_html(), r_3.to_html(), r_4.to_html())
+    msg.attach(MIMEText(body, 'html'))
+
+    # send the email
+    smtp_obj.sendmail(username, username, msg.as_string())
+
+    # close the SMTP connection
+    smtp_obj.quit()
+
+
+if __name__ == '__main__':
+    intervention(dt=datetime.datetime.today())
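# What the r_1 aggregation in send_results produces -- appointment count,
# no-show count, and no-show rate per study arm -- in miniature (outcomes
# made up; the printed layout is approximate):

import pandas as pd

df = pd.DataFrame({'control': ['control', 'control', 'intervention', 'intervention'],
                   'NoShow': [True, False, False, False]})
print(df.groupby('control').agg({'NoShow': ['count', 'sum', 'mean']}).reset_index())
#         control NoShow
#                  count sum mean
# 0       control      2   1  0.5
# 1  intervention      2   0  0.0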
""" + {} + + {} + + {} + + {} + """.format(r_1.to_html(), r_2.to_html(), r_3.to_html(), r_4.to_html()) + msg.attach(MIMEText(body, 'html')) + + # send the email + smtp_obj.sendmail(username, username, msg.as_string()) + + # close the SMTP connection + smtp_obj.quit() + + +if __name__ == '__main__': + intervention(dt=datetime.datetime.today()) diff --git a/src/mridle/utilities/process_live_data.py b/src/mridle/utilities/process_live_data.py index 81692a1f..e43c8648 100644 --- a/src/mridle/utilities/process_live_data.py +++ b/src/mridle/utilities/process_live_data.py @@ -3,6 +3,7 @@ from mridle.pipelines.data_engineering.ris.nodes import build_status_df, prep_raw_df_for_parquet, build_slot_df from mridle.pipelines.data_science.feature_engineering.nodes import remove_na, \ generate_3_5_days_ahead_features, add_business_days, subtract_business_days, feature_no_show_before +from mridle.pipelines.data_science.live_data.nodes import get_slt_with_outcome from mridle.experiment.experiment import Experiment from mridle.experiment.dataset import DataSet import os @@ -10,7 +11,8 @@ from dateutil.relativedelta import relativedelta from pathlib import Path import pickle - +import numpy as np +import csv AGO_DIR = '/data/mridle/data/silent_live_test/live_files/all/ago/' OUT_DIR = '/data/mridle/data/silent_live_test/live_files/all/out/' @@ -53,7 +55,7 @@ def get_slt_features_delete_if_ok_to_do_so(): rfs_df = pd.read_csv('/data/mridle/data/silent_live_test/live_files/all/' 'retrospective_reasonforstudy/content/[dbo].[MRIdle_retrospective].csv') - rfs_df[['FillerOrderNo', 'ReasonForStudy']].drop_duplicates() + rfs_df[['FillerOrderNo', 'ReasonForStudy']].drop_duplicates(inplace=True) # add on proper noshow ago_st = get_slt_status_data('ago') @@ -113,7 +115,6 @@ def get_sorted_filenames(file_dir): def process_live_data(): - # process_live_data() function already_processed_filename = '/data/mridle/data/silent_live_test/live_files/already_processed.txt' master_feature_set = pd.read_parquet( '/data/mridle/data/kedro_data_catalog/04_feature/master_feature_set_na_removed.parquet') @@ -160,7 +161,7 @@ def process_live_data(): ago_features_df['file'] = filename master_ago_filepath = '/data/mridle/data/silent_live_test/live_files/all/' \ - 'actuals/master_actuals_with_filename.csv' + 'actuals/master_actuals.csv' if os.path.exists(master_ago_filepath): master_ago = pd.read_csv(master_ago_filepath) else: @@ -171,7 +172,7 @@ def process_live_data(): master_ago_updated.to_csv(master_ago_filepath, index=False) ago_features_df.to_csv( - '/data/mridle/data/silent_live_test/live_files/all/actuals/actuals_{}_{}_{}_with_filename.csv'.format( + '/data/mridle/data/silent_live_test/live_files/all/actuals/actuals_{}_{}_{}.csv'.format( ago_day, ago_month, ago_year)) @@ -201,7 +202,7 @@ def process_live_data(): data_path = '/data/mridle/data/silent_live_test/live_files/all/out/{}'.format(filename_row['filename']) model_dir = '/data/mridle/data/kedro_data_catalog/06_models/' output_path = '/data/mridle/data/silent_live_test/live_files/all/' \ - 'out_features_data/features_{}_{}_{}.csv'.format(out_day, out_month, out_year) + 'out_features_data/features_{}_{}_{}.csv'.format(out_year, out_month, out_day) make_out_prediction(data_path, model_dir, output_path, valid_date_range=out_valid_date_range, file_encoding='utf-16', master_feature_set=master_feature_set, rfs_df=rfs, @@ -233,10 +234,11 @@ def remove_redundant(df): & (st_df['now_sched_for_date'] == st_df['was_sched_for_date']))] return st_df - if file_encoding: + try: + raw_df = 
@@ -406,3 +419,27 @@ def get_silent_live_test_actuals(all_columns=True):
     else:
         all_actuals = pd.concat([all_actuals, actuals], axis=0)
     return all_actuals
+
+
+def fix_csv_file(filename_to_fix):
+    """Repair a CSV whose unquoted ReasonForStudy text contains extra delimiters, rewriting the file in place."""
+    res = []
+
+    with open(filename_to_fix, 'r', encoding='utf-16') as read_obj:
+        # overflow fields (caused by stray commas in ReasonForStudy) are collected
+        # under the restkey 'ReasonForStudy2' as a list
+        csv_reader = csv.DictReader(read_obj, restkey='ReasonForStudy2')
+        for row in csv_reader:
+            # each row is a dict mapping column name -> value
+            res.append(row)
+
+    res_df = pd.DataFrame(res)
+    if 'ReasonForStudy2' in res_df.columns:
+        # join the spilled-over list values back onto ReasonForStudy
+        overflow = res_df['ReasonForStudy2'].apply(lambda v: ' '.join(map(str, v)) if isinstance(v, list) else '')
+        res_df['ReasonForStudy'] = np.where(overflow == '', res_df['ReasonForStudy'],
+                                            res_df['ReasonForStudy'].astype(str) + overflow)
+        res_df.drop(columns=['ReasonForStudy2'], inplace=True)
+    res_df['ReasonForStudy'] = res_df['ReasonForStudy'].str.replace('"|,', ' ', regex=True)
+
+    res_df.to_csv(filename_to_fix, encoding='utf-16', index=False)
+    return None
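# For reference, how csv.DictReader's restkey behaves in fix_csv_file: rows
# with more fields than the header get the overflow values as a *list* under
# the restkey, which is why they need joining before being concatenated back
# onto ReasonForStudy. Standalone toy example:

import csv
import io

data = io.StringIO('id,ReasonForStudy\n1,headache\n2,unquoted, text, with commas\n')
for row in csv.DictReader(data, restkey='ReasonForStudy2'):
    print(row.get('ReasonForStudy'), '|', row.get('ReasonForStudy2'))
# -> headache | None
# -> unquoted | [' text', ' with commas']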
diff --git a/src/tests/pipelines/data_engineering/test_feature_engineering.py b/src/tests/pipelines/data_engineering/test_feature_engineering.py
index b64ddf33..09f59053 100644
--- a/src/tests/pipelines/data_engineering/test_feature_engineering.py
+++ b/src/tests/pipelines/data_engineering/test_feature_engineering.py
@@ -543,6 +543,7 @@ def test_future_appointments_one_row(self):
         raw_df = self._fill_out_static_columns(raw_df, create_fon=True)
 
         status_df = build_status_df(raw_df, exclude_patient_ids=[])
+        status_df['Telefon'] = '0'
         feature_df = build_feature_set(status_df, valid_date_range)
         cols = [c for c in expected_feature_df.columns.values]
         feature_df = feature_df.loc[:, feature_df.columns.isin(cols)]
@@ -569,6 +570,7 @@ def test_future_appointments_moved_forward(self):
         raw_df = self._fill_out_static_columns(raw_df, create_fon=True)
 
         status_df = build_status_df(raw_df, exclude_patient_ids=[])
+        status_df['Telefon'] = '0'
         test_dt = day(2)
         feature_df = generate_3_5_days_ahead_features(status_df, test_dt)
 
@@ -600,6 +602,7 @@ def test_future_appointments_rescheduled(self):
         raw_df = self._fill_out_static_columns(raw_df, create_fon=True)
 
         status_df = build_status_df(raw_df, exclude_patient_ids=[])
+        status_df['Telefon'] = '0'
         feature_df = generate_training_data(status_df, valid_date_range)
 
         cols = [c for c in expected_feature_df.columns.values]
@@ -701,6 +704,7 @@ def test_multiple_appts(self):
         raw_df = pd.concat([raw_df_1, raw_df_2, raw_df_3, raw_df_4], axis=0)
 
         status_df = build_status_df(raw_df, exclude_patient_ids=[])
+        status_df['Telefon'] = '0'
 
         # Set valid date range to as if we were generating this data on day(12)
         test_vdr = [pd.Timestamp(year=2019, month=1, day=1, hour=0, minute=0),
@@ -780,6 +784,7 @@ def test_appointment_one_row(self):
         raw_df = self._fill_out_static_columns(raw_df, create_fon=True)
 
         status_df = build_status_df(raw_df, exclude_patient_ids=[])
+        status_df['Telefon'] = '0'
 
         model_data_df = generate_training_data(status_df, valid_date_range)
 
@@ -887,6 +892,7 @@ def test_multiple_appts(self):
         raw_df = pd.concat([raw_df_1, raw_df_2, raw_df_3, raw_df_4], axis=0)
 
         status_df = build_status_df(raw_df, exclude_patient_ids=[])
+        status_df['Telefon'] = '0'
 
         # Set valid date range to as if we were generating this data on day(12)
         test_vdr = [pd.Timestamp(year=2019, month=1, day=1, hour=0, minute=0),