| """ |
| Module 3: Model Training |
| Train all models: LR, RF, XGBoost, LightGBM, MLP, Autoencoder, Voting Ensemble. |
| Hyperparameter tuning with Optuna for top 3 models. |
| """ |
| import os |
| import sys |
| import numpy as np |
| import pandas as pd |
| import joblib |
| import optuna |
| import warnings |
| warnings.filterwarnings('ignore') |
| optuna.logging.set_verbosity(optuna.logging.WARNING) |
|
|
| from sklearn.linear_model import LogisticRegression |
| from sklearn.ensemble import RandomForestClassifier, VotingClassifier |
| from sklearn.neural_network import MLPClassifier |
| from sklearn.metrics import f1_score, roc_auc_score, average_precision_score |
| import xgboost as xgb |
| import lightgbm as lgb |
|
|
| from config import DATA_DIR, MODELS_DIR, SEED |
|
|
|
|
def load_processed_data():
    """Load preprocessed data."""
    data = joblib.load(os.path.join(DATA_DIR, "processed_data.joblib"))
    print("Loaded processed data:")
    print(f" Train: {data['X_train'].shape}, SMOTE: {data['X_train_smote'].shape}")
    print(f" Val: {data['X_val'].shape}")
    print(f" Test: {data['X_test'].shape}")
    return data


def train_logistic_regression(X_train, y_train, X_val, y_val, class_weights):
    """Train Logistic Regression baseline."""
    print("\n" + "-" * 50)
    print("Training: Logistic Regression (Baseline)")
    print("-" * 50)

    model = LogisticRegression(
        class_weight=class_weights,
        max_iter=1000,
        random_state=SEED,
        C=0.1,
        penalty='l2',
        solver='lbfgs'
    )
    model.fit(X_train, y_train)

    val_pred = model.predict_proba(X_val)[:, 1]
    val_auc = roc_auc_score(y_val, val_pred)
    val_pr_auc = average_precision_score(y_val, val_pred)
    print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")

    return model


def train_random_forest(X_train, y_train, X_val, y_val, class_weights):
    """Train Random Forest."""
    print("\n" + "-" * 50)
    print("Training: Random Forest")
    print("-" * 50)

    model = RandomForestClassifier(
        n_estimators=200,
        max_depth=15,
        min_samples_split=5,
        min_samples_leaf=2,
        class_weight=class_weights,
        random_state=SEED,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    val_pred = model.predict_proba(X_val)[:, 1]
    val_auc = roc_auc_score(y_val, val_pred)
    val_pr_auc = average_precision_score(y_val, val_pred)
    print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")

    return model


def train_xgboost(X_train, y_train, X_val, y_val, class_weights):
    """Train XGBoost."""
    print("\n" + "-" * 50)
    print("Training: XGBoost")
    print("-" * 50)

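    # class_weights is keyed by class label; the ratio below is the standard
    # way to hand the class imbalance to XGBoost's loss via scale_pos_weight.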
    scale_pos_weight = class_weights[1] / class_weights[0]

    model = xgb.XGBClassifier(
        n_estimators=200,
        max_depth=6,
        learning_rate=0.1,
        scale_pos_weight=scale_pos_weight,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_alpha=0.1,
        reg_lambda=1.0,
        random_state=SEED,
        eval_metric='aucpr',
        n_jobs=-1,
        tree_method='hist'
    )
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

    val_pred = model.predict_proba(X_val)[:, 1]
    val_auc = roc_auc_score(y_val, val_pred)
    val_pr_auc = average_precision_score(y_val, val_pred)
    print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")

    return model


def train_lightgbm(X_train, y_train, X_val, y_val, class_weights):
    """Train LightGBM."""
    print("\n" + "-" * 50)
    print("Training: LightGBM")
    print("-" * 50)

    scale_pos_weight = class_weights[1] / class_weights[0]

    model = lgb.LGBMClassifier(
        n_estimators=200,
        max_depth=8,
        learning_rate=0.05,
        scale_pos_weight=scale_pos_weight,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_alpha=0.1,
        reg_lambda=1.0,
        random_state=SEED,
        n_jobs=-1,
        verbose=-1
    )
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)])

    val_pred = model.predict_proba(X_val)[:, 1]
    val_auc = roc_auc_score(y_val, val_pred)
    val_pr_auc = average_precision_score(y_val, val_pred)
    print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")

    return model


def train_mlp(X_train, y_train, X_val, y_val):
    """Train MLP Neural Network."""
    print("\n" + "-" * 50)
    print("Training: MLP Neural Network")
    print("-" * 50)

    model = MLPClassifier(
        hidden_layer_sizes=(128, 64, 32),
        activation='relu',
        solver='adam',
        alpha=0.001,
        batch_size=256,
        learning_rate='adaptive',
        learning_rate_init=0.001,
        max_iter=200,
        random_state=SEED,
        early_stopping=True,
        validation_fraction=0.1,
        n_iter_no_change=10
    )
    model.fit(X_train, y_train)

    val_pred = model.predict_proba(X_val)[:, 1]
    val_auc = roc_auc_score(y_val, val_pred)
    val_pr_auc = average_precision_score(y_val, val_pred)
    print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")

    return model


def train_autoencoder(X_train, X_val, y_val):
    """Train Autoencoder for anomaly detection (train on legitimate only)."""
    print("\n" + "-" * 50)
    print("Training: Autoencoder (Anomaly Detection)")
    print("-" * 50)

    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset

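    # Work on a plain numpy array so it can be wrapped in torch tensors below.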
    X_train_np = X_train.values if isinstance(X_train, pd.DataFrame) else X_train

    input_dim = X_train_np.shape[1]

    class Autoencoder(nn.Module):
        def __init__(self, input_dim):
            super().__init__()
            self.encoder = nn.Sequential(
                nn.Linear(input_dim, 64),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(64, 32),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(32, 16),
                nn.ReLU(),
            )
            self.decoder = nn.Sequential(
                nn.Linear(16, 32),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(32, 64),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(64, input_dim),
            )

        def forward(self, x):
            encoded = self.encoder(x)
            decoded = self.decoder(encoded)
            return decoded

    model = Autoencoder(input_dim)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

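    # An autoencoder reconstructs its input, so inputs double as targets.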
    train_tensor = torch.FloatTensor(X_train_np)
    train_dataset = TensorDataset(train_tensor, train_tensor)
    train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)

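    # Minimize average reconstruction error over the legitimate transactions.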
    model.train()
    for epoch in range(50):
        epoch_loss = 0
        for batch_x, _ in train_loader:
            optimizer.zero_grad()
            output = model(batch_x)
            loss = criterion(output, batch_x)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        if (epoch + 1) % 10 == 0:
            print(f" Epoch {epoch+1}/50, Loss: {epoch_loss/len(train_loader):.6f}")

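    # Validation scoring: per-sample mean squared reconstruction error.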
    model.eval()
    X_val_np = X_val.values if isinstance(X_val, pd.DataFrame) else X_val
    with torch.no_grad():
        val_tensor = torch.FloatTensor(X_val_np)
        val_output = model(val_tensor)
        reconstruction_error = torch.mean((val_output - val_tensor) ** 2, dim=1).numpy()

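    # Fraud was never seen during training, so it should reconstruct poorly;
    # the raw error serves directly as the anomaly score for AUC metrics.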
    val_auc = roc_auc_score(y_val, reconstruction_error)
    val_pr_auc = average_precision_score(y_val, reconstruction_error)
    print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")

    ae_info = {
        'model': model,
        'input_dim': input_dim,
        'type': 'autoencoder'
    }

    return ae_info


class AutoencoderWrapper:
    """Wrapper to make autoencoder compatible with sklearn interface."""

    def __init__(self, ae_info):
        self.model = ae_info['model']
        self.input_dim = ae_info['input_dim']
        self.classes_ = np.array([0, 1])

    def predict_proba(self, X):
        import torch
        self.model.eval()
        X_np = X.values if isinstance(X, pd.DataFrame) else X
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X_np)
            output = self.model(X_tensor)
            reconstruction_error = torch.mean((output - X_tensor) ** 2, dim=1).numpy()

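        # Map reconstruction error to a pseudo-probability with a sigmoid
        # centred on the batch median. Note this calibration is relative to
        # the batch being scored, so scores are not comparable across calls.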
        scores = 1 / (1 + np.exp(-10 * (reconstruction_error - np.median(reconstruction_error))))
        proba = np.column_stack([1 - scores, scores])
        return proba

    def predict(self, X, threshold=0.5):
        proba = self.predict_proba(X)
        return (proba[:, 1] >= threshold).astype(int)


def optuna_tune_xgboost(X_train, y_train, X_val, y_val, class_weights, n_trials=50):
    """Tune XGBoost with Optuna."""
    print("\n" + "-" * 50)
    print("Optuna Tuning: XGBoost")
    print("-" * 50)

    scale_pos_weight = class_weights[1] / class_weights[0]

    def objective(trial):
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 100, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'reg_alpha': trial.suggest_float('reg_alpha', 1e-4, 10.0, log=True),
            'reg_lambda': trial.suggest_float('reg_lambda', 1e-4, 10.0, log=True),
            'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
            'scale_pos_weight': scale_pos_weight,
            'random_state': SEED,
            'eval_metric': 'aucpr',
            'n_jobs': -1,
            'tree_method': 'hist'
        }

        model = xgb.XGBClassifier(**params)
        model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
        val_pred = model.predict_proba(X_val)[:, 1]
        return average_precision_score(y_val, val_pred)

    study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=SEED))
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    print(f" Best PR-AUC: {study.best_value:.4f}")
    print(f" Best params: {study.best_params}")

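    # Refit the best configuration with the fixed (non-searched) settings restored.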
    best_params = dict(study.best_params)  # copy so the study's record is not mutated
    best_params['scale_pos_weight'] = scale_pos_weight
    best_params['random_state'] = SEED
    best_params['eval_metric'] = 'aucpr'
    best_params['n_jobs'] = -1
    best_params['tree_method'] = 'hist'

    best_model = xgb.XGBClassifier(**best_params)
    best_model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

    return best_model, study.best_params


def optuna_tune_lightgbm(X_train, y_train, X_val, y_val, class_weights, n_trials=50):
    """Tune LightGBM with Optuna."""
    print("\n" + "-" * 50)
    print("Optuna Tuning: LightGBM")
    print("-" * 50)

    scale_pos_weight = class_weights[1] / class_weights[0]

    def objective(trial):
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 100, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 12),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'reg_alpha': trial.suggest_float('reg_alpha', 1e-4, 10.0, log=True),
            'reg_lambda': trial.suggest_float('reg_lambda', 1e-4, 10.0, log=True),
            'min_child_samples': trial.suggest_int('min_child_samples', 5, 50),
            'num_leaves': trial.suggest_int('num_leaves', 15, 127),
            'scale_pos_weight': scale_pos_weight,
            'random_state': SEED,
            'n_jobs': -1,
            'verbose': -1
        }

        model = lgb.LGBMClassifier(**params)
        model.fit(X_train, y_train, eval_set=[(X_val, y_val)])
        val_pred = model.predict_proba(X_val)[:, 1]
        return average_precision_score(y_val, val_pred)

    study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=SEED))
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    print(f" Best PR-AUC: {study.best_value:.4f}")
    print(f" Best params: {study.best_params}")

    # Refit the best configuration with the fixed settings restored.
    best_params = dict(study.best_params)  # copy so the study's record is not mutated
    best_params['scale_pos_weight'] = scale_pos_weight
    best_params['random_state'] = SEED
    best_params['n_jobs'] = -1
    best_params['verbose'] = -1

    best_model = lgb.LGBMClassifier(**best_params)
    best_model.fit(X_train, y_train, eval_set=[(X_val, y_val)])

    return best_model, study.best_params


def optuna_tune_random_forest(X_train, y_train, X_val, y_val, class_weights, n_trials=30):
    """Tune Random Forest with Optuna."""
    print("\n" + "-" * 50)
    print("Optuna Tuning: Random Forest")
    print("-" * 50)

    def objective(trial):
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 100, 300),
            'max_depth': trial.suggest_int('max_depth', 5, 20),
            'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
            'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
            'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
            'class_weight': class_weights,
            'random_state': SEED,
            'n_jobs': -1
        }

        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)
        val_pred = model.predict_proba(X_val)[:, 1]
        return average_precision_score(y_val, val_pred)

    study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=SEED))
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    print(f" Best PR-AUC: {study.best_value:.4f}")
    print(f" Best params: {study.best_params}")

    # Refit the best configuration with the fixed settings restored.
    best_params = dict(study.best_params)  # copy so the study's record is not mutated
    best_params['class_weight'] = class_weights
    best_params['random_state'] = SEED
    best_params['n_jobs'] = -1

    best_model = RandomForestClassifier(**best_params)
    best_model.fit(X_train, y_train)

    return best_model, study.best_params


def create_voting_ensemble(models_dict):
    """Create a voting ensemble from the best 3 models."""
    print("\n" + "-" * 50)
    print("Creating: Voting Ensemble (Top 3 Models)")
    print("-" * 50)

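    # The autoencoder's scores are batch-relative pseudo-probabilities, so it
    # is excluded from soft voting.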
    eligible = {k: v for k, v in models_dict.items() if k != 'Autoencoder'}

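    # Prefer the Optuna-tuned models for the three ensemble slots.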
    ensemble_models = []
    for name in ['XGBoost_Tuned', 'LightGBM_Tuned', 'Random_Forest_Tuned']:
        if name in eligible:
            ensemble_models.append((name, eligible[name]))

    if len(ensemble_models) < 3:
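        # Fall back to the untuned counterparts to keep three members.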
        for name in ['XGBoost', 'LightGBM', 'Random_Forest']:
            if name in eligible and len(ensemble_models) < 3:
                if not any(n == name for n, _ in ensemble_models):
                    ensemble_models.append((name, eligible[name]))

    print(f" Ensemble members: {[n for n, _ in ensemble_models]}")

    voting_clf = VotingClassifier(
        estimators=ensemble_models,
        voting='soft'
    )

    return voting_clf, ensemble_models


def run_training():
    """Run the complete training pipeline."""
    print("=" * 60)
    print("FRAUD DETECTION SYSTEM - MODEL TRAINING")
    print("=" * 60)

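    # Load preprocessed splits, the SMOTE-resampled training set, and class weights.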
    data = load_processed_data()
    X_train = data['X_train']
    X_val = data['X_val']
    X_test = data['X_test']
    y_train = data['y_train']
    y_val = data['y_val']
    y_test = data['y_test']
    X_train_smote = data['X_train_smote']
    y_train_smote = data['y_train_smote']
    class_weights = data['class_weights']

    models = {}

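    # 1. Logistic Regression (class-weighted baseline)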
    models['Logistic_Regression'] = train_logistic_regression(
        X_train, y_train, X_val, y_val, class_weights
    )

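    # 2. Random Forest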
    models['Random_Forest'] = train_random_forest(
        X_train, y_train, X_val, y_val, class_weights
    )

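    # 3. XGBoost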
    models['XGBoost'] = train_xgboost(
        X_train, y_train, X_val, y_val, class_weights
    )

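    # 4. LightGBM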
    models['LightGBM'] = train_lightgbm(
        X_train, y_train, X_val, y_val, class_weights
    )

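    # 5. MLP, trained on the SMOTE-resampled data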
    models['MLP'] = train_mlp(
        X_train_smote, y_train_smote, X_val, y_val
    )

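    # 6. Autoencoder, trained on legitimate transactions only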
    X_train_legit = X_train[y_train == 0]
    ae_info = train_autoencoder(X_train_legit, X_val, y_val)
    models['Autoencoder'] = AutoencoderWrapper(ae_info)

| print("\n" + "=" * 60) |
| print("HYPERPARAMETER TUNING WITH OPTUNA") |
| print("=" * 60) |
| |
| models['XGBoost_Tuned'], xgb_params = optuna_tune_xgboost( |
| X_train, y_train, X_val, y_val, class_weights, n_trials=20 |
| ) |
| |
| models['LightGBM_Tuned'], lgbm_params = optuna_tune_lightgbm( |
| X_train, y_train, X_val, y_val, class_weights, n_trials=20 |
| ) |
| |
| models['Random_Forest_Tuned'], rf_params = optuna_tune_random_forest( |
| X_train, y_train, X_val, y_val, class_weights, n_trials=15 |
| ) |
| |
| |
| |
| |
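    # Soft-voting ensemble over the tuned models. VotingClassifier clones and
    # refits its members, so it is fitted on the training split here.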
    voting_clf, ensemble_members = create_voting_ensemble(models)
    voting_clf.fit(X_train, y_train)
    models['Voting_Ensemble'] = voting_clf

    val_pred = voting_clf.predict_proba(X_val)[:, 1]
    val_auc = roc_auc_score(y_val, val_pred)
    val_pr_auc = average_precision_score(y_val, val_pred)
    print(f" Voting Ensemble Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")

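    # Save the sklearn-compatible models; the torch-backed autoencoder wrapper
    # is persisted separately below.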
    models_path = os.path.join(MODELS_DIR, "all_models.joblib")
    save_models = {k: v for k, v in models.items() if k != 'Autoencoder'}
    joblib.dump(save_models, models_path)

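    # Save the autoencoder weights as a state_dict, the portable way to
    # reload a torch model.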
    import torch
    ae_path = os.path.join(MODELS_DIR, "autoencoder.pt")
    torch.save(ae_info['model'].state_dict(), ae_path)

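    # Also persist the full dict (wrapper included) for the evaluation module.
    # joblib should pickle the torch-backed wrapper too, though that file is
    # less portable across torch versions than the state_dict above.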
    all_models_path = os.path.join(MODELS_DIR, "all_models_with_ae.joblib")
    joblib.dump(models, all_models_path)

    tuning_results = {
        'xgboost': xgb_params,
        'lightgbm': lgbm_params,
        'random_forest': rf_params
    }
    joblib.dump(tuning_results, os.path.join(MODELS_DIR, "tuning_results.joblib"))

    print("\n" + "=" * 60)
    print("TRAINING COMPLETE - All models saved")
    print("=" * 60)

    return models, tuning_results


if __name__ == "__main__":
    models, tuning_results = run_training()