| import pandas as pd
|
| import numpy as np
|
| import os
|
| import joblib
|
| from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
|
| from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
|
| from sklearn.impute import SimpleImputer
|
| from sklearn.metrics import classification_report, accuracy_score, roc_curve, auc
|
| from sklearn.feature_selection import SelectFromModel
|
| import matplotlib.pyplot as plt
|
| import seaborn as sns
|
| from scipy.stats import zscore
|
| from imblearn.over_sampling import SMOTE
|
| import os
|
| os.environ["LOKY_MAX_CPU_COUNT"] = "4"
|
|
|
|
|
|
|
| def load_data(file_path):
|
| df = pd.read_excel(file_path, header=1)
|
| return df
|
|
|
|
|
|
|
| def preprocess_data_with_categoricals(df):
|
|
|
| df.replace(-9, np.nan, inplace=True)
|
|
|
|
|
| missing_percentage = df.isnull().sum() / len(df) * 100
|
| df = df.drop(columns=missing_percentage[missing_percentage > 50].index)
|
|
|
|
|
| imputer = SimpleImputer(strategy='median')
|
| numeric_cols = df.select_dtypes(include=['number']).columns
|
| df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
|
|
|
|
|
| if 'Binary diagnosis' in df.columns:
|
| df['Binary diagnosis'] = df['Binary diagnosis'].apply(
|
| lambda x: 1 if str(x).strip().lower() == "ipf" else 0
|
| )
|
|
|
| if 'Death' in df.columns:
|
| df['Death'] = df['Death'].apply(
|
| lambda x: 1 if str(x).strip().lower() == "yes" else 0
|
| )
|
|
|
|
|
| df = apply_one_hot_encoding(df)
|
|
|
|
|
| categorical_cols = df.select_dtypes(include=['object']).columns
|
| numeric_cols = df.select_dtypes(include=['number']).columns
|
| print("Categorical Variables:", categorical_cols.tolist())
|
| print("Numerical Variables:", numeric_cols.tolist())
|
| return df, numeric_cols, categorical_cols
|
|
|
|
|
| def apply_one_hot_encoding(df):
|
| categorical_cols = df.select_dtypes(include=['object']).columns
|
| df = pd.get_dummies(df, columns=categorical_cols, drop_first=True)
|
| return df
|
|
|
| def remove_outliers(df, numeric_cols, z_threshold=4):
|
| for col in numeric_cols:
|
| z_scores = zscore(df[col])
|
| df = df[(np.abs(z_scores) < z_threshold) | (pd.isnull(z_scores))]
|
| return df
|
|
|
|
|
| def select_important_features(X, y, threshold=0.03):
|
| model = RandomForestClassifier(random_state=42)
|
| model.fit(X, y)
|
| selector = SelectFromModel(model, threshold=threshold, prefit=True)
|
| selected_mask = selector.get_support()
|
| selected_features = X.columns[selected_mask]
|
| X_reduced = X.loc[:, selected_features]
|
| return X_reduced, selected_features
|
|
|
|
|
| def plot_feature_importance(model, features, target):
|
| importance = model.feature_importances_
|
| sorted_idx = np.argsort(importance)[::-1]
|
| plt.figure(figsize=(10, 6))
|
| sns.barplot(x=importance[sorted_idx], y=np.array(features)[sorted_idx])
|
| plt.title(f'Feature Importance for {target}')
|
| plt.xlabel('Importance')
|
| plt.ylabel('Feature')
|
| plt.tight_layout()
|
| plt.show()
|
|
|
|
|
| def plot_model_performance(cv_scores, train_scores, test_scores, target ,metric_name="Accuracy"):
|
| plt.figure(figsize=(12, 6))
|
|
|
|
|
| plt.subplot(1, 2, 1)
|
| plt.plot(cv_scores, label='Cross-validation scores', marker='o')
|
| plt.title(f'Cross-validation {metric_name} for {target}')
|
| plt.xlabel('Fold')
|
| plt.ylabel(metric_name)
|
| plt.grid(True)
|
| plt.legend()
|
|
|
|
|
| plt.subplot(1, 2, 2)
|
| plt.bar(['Train', 'Test'], [train_scores.mean(), test_scores], color=['blue', 'orange'])
|
| plt.title(f'{metric_name}: Train vs Test')
|
| plt.ylabel(metric_name)
|
| plt.grid(True)
|
|
|
| plt.tight_layout()
|
| plt.show()
|
|
|
|
|
| def plot_roc_auc(model, X_test, y_test, target):
|
| y_prob = model.predict_proba(X_test)[:, 1]
|
| fpr, tpr, thresholds = roc_curve(y_test, y_prob)
|
| roc_auc = auc(fpr, tpr)
|
|
|
| plt.figure(figsize=(8, 6))
|
| plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
|
| plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
|
| plt.xlabel('False Positive Rate')
|
| plt.ylabel('True Positive Rate')
|
| plt.title(f'ROC-AUC Curve for {target}')
|
| plt.legend(loc="lower right")
|
| plt.grid(True)
|
| plt.show()
|
|
|
|
|
| def save_model(model, target, selected_features):
|
| if not os.path.exists("models"):
|
| os.makedirs("models")
|
| file_name = f"models/{target}_random_forest_model.pkl"
|
| joblib.dump({'model': model, 'features': selected_features}, file_name)
|
| print(f"Model and features saved to {file_name}")
|
|
|
|
|
| def main():
|
| file_path = 'FibroPredCODIFICADA.xlsx'
|
| df = load_data(file_path)
|
|
|
|
|
| target_columns = ['Death', 'Binary diagnosis', 'Necessity of transplantation', 'Progressive disease']
|
|
|
|
|
| predictors_to_remove_dict = {
|
| 'Death': ['Final diagnosis', 'Transplantation date', 'Cause of death', 'Date of death', 'COD NUMBER','FVC (L) 1 year after diagnosis',
|
| 'FVC (%) 1 year after diagnosis','DLCO (%) 1 year after diagnosis'],
|
| 'Binary diagnosis': ['ProgressiveDisease', 'Final diagnosis', 'Transplantation date', 'Cause of death', 'Date of death', 'COD NUMBER','Pirfenidone','Nintedanib',
|
| 'Antifibrotic Drug','Prednisone','Mycophenolate','FVC (L) 1 year after diagnosis','FVC (%) 1 year after diagnosis',
|
| 'DLCO (%) 1 year after diagnosis','RadioWorsening2y'],
|
| 'Necessity of transplantation': ['ProgressiveDisease', 'Final diagnosis', 'Transplantation date', 'Cause of death', 'Date of death', 'COD NUMBER','Age at diagnosis'],
|
| 'Progressive disease': ['ProgressiveDisease', 'Final diagnosis', 'Transplantation date', 'Cause of death', 'Date of death', 'COD NUMBER', 'FVC (L) 1 year after diagnosis',
|
| 'FVC (%) 1 year after diagnosis','DLCO (%) 1 year after diagnosis','RadioWorsening2y']
|
| }
|
|
|
|
|
| df, numeric_cols, categorical_cols = preprocess_data_with_categoricals(df)
|
|
|
| for target in target_columns:
|
| print(f"Processing target: {target}")
|
|
|
| if target in ['Necessity of transplantation', 'Progressive disease']:
|
| print(f"Removing outliers for target: {target}")
|
| df = remove_outliers(df, numeric_cols)
|
|
|
|
|
| predictors_to_remove = predictors_to_remove_dict.get(target, [])
|
|
|
| X = df[numeric_cols].drop(columns=target_columns + predictors_to_remove, errors='ignore')
|
| y = df[target]
|
|
|
|
|
| X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
|
|
|
|
|
| if target in ['Binary diagnosis', 'Necessity of transplantation']:
|
| print(f"Applying SMOTE to balance the training set for target: {target}")
|
| smote = SMOTE(random_state=42)
|
| X_train, y_train = smote.fit_resample(X_train, y_train)
|
|
|
| X_train_selected, selected_features = select_important_features(X_train, y_train)
|
| X_test_selected = X_test[selected_features]
|
|
|
| print(f"Selected predictors for training {target} ({len(selected_features)} predictors): {selected_features.tolist()}")
|
|
|
|
|
| model = RandomForestClassifier(n_estimators=300,
|
| max_depth=4,
|
| min_samples_split=10,
|
| min_samples_leaf=10,
|
| class_weight='balanced',
|
| max_features='sqrt',
|
| random_state=42)
|
| model.fit(X_train_selected, y_train)
|
|
|
|
|
| cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
|
| cv_scores = cross_val_score(model, X_train_selected, y_train, cv=cv, scoring='accuracy')
|
| train_scores = cross_val_score(model, X_train_selected, y_train, cv=10, scoring='accuracy')
|
| y_pred_test = model.predict(X_test_selected)
|
| test_score = accuracy_score(y_test, y_pred_test)
|
|
|
| print(f"Cross-validation accuracy for {target}: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")
|
| print(f"Test accuracy for {target}: {test_score:.4f}")
|
| print(classification_report(y_test, y_pred_test))
|
|
|
|
|
| plot_model_performance(cv_scores, train_scores, test_score, target, metric_name="Accuracy")
|
|
|
|
|
| print(f"Feature importance for {target}:")
|
| plot_feature_importance(model, selected_features, target)
|
|
|
|
|
| plot_roc_auc(model, X_test_selected, y_test, target)
|
|
|
|
|
| save_model(model, target, selected_features.tolist())
|
|
|
| print("Pipeline completed.")
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|