"""
Module 3: Model Training
Train all models: LR, RF, XGBoost, LightGBM, MLP, Autoencoder, Voting Ensemble.
Hyperparameter tuning with Optuna for top 3 models.
"""
import os
import numpy as np
import pandas as pd
import joblib
import optuna
import warnings
warnings.filterwarnings('ignore')
optuna.logging.set_verbosity(optuna.logging.WARNING)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import roc_auc_score, average_precision_score
import xgboost as xgb
import lightgbm as lgb
# torch is imported at module level so the Autoencoder class below can be
# defined (and pickled by reference) at module scope.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from config import DATA_DIR, MODELS_DIR, SEED
def load_processed_data():
"""Load preprocessed data."""
data = joblib.load(os.path.join(DATA_DIR, "processed_data.joblib"))
print(f"Loaded processed data:")
print(f" Train: {data['X_train'].shape}, SMOTE: {data['X_train_smote'].shape}")
print(f" Val: {data['X_val'].shape}")
print(f" Test: {data['X_test'].shape}")
return data
def train_logistic_regression(X_train, y_train, X_val, y_val, class_weights):
"""Train Logistic Regression baseline."""
print("\n" + "-" * 50)
print("Training: Logistic Regression (Baseline)")
print("-" * 50)
model = LogisticRegression(
class_weight=class_weights,
max_iter=1000,
random_state=SEED,
C=0.1,
penalty='l2',
solver='lbfgs'
)
model.fit(X_train, y_train)
val_pred = model.predict_proba(X_val)[:, 1]
val_auc = roc_auc_score(y_val, val_pred)
val_pr_auc = average_precision_score(y_val, val_pred)
print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")
return model
def train_random_forest(X_train, y_train, X_val, y_val, class_weights):
"""Train Random Forest."""
print("\n" + "-" * 50)
print("Training: Random Forest")
print("-" * 50)
model = RandomForestClassifier(
n_estimators=200,
max_depth=15,
min_samples_split=5,
min_samples_leaf=2,
class_weight=class_weights,
random_state=SEED,
n_jobs=-1
)
model.fit(X_train, y_train)
val_pred = model.predict_proba(X_val)[:, 1]
val_auc = roc_auc_score(y_val, val_pred)
val_pr_auc = average_precision_score(y_val, val_pred)
print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")
return model
def train_xgboost(X_train, y_train, X_val, y_val, class_weights):
"""Train XGBoost."""
print("\n" + "-" * 50)
print("Training: XGBoost")
print("-" * 50)
scale_pos_weight = class_weights[1] / class_weights[0]
model = xgb.XGBClassifier(
n_estimators=200,
max_depth=6,
learning_rate=0.1,
scale_pos_weight=scale_pos_weight,
subsample=0.8,
colsample_bytree=0.8,
reg_alpha=0.1,
reg_lambda=1.0,
random_state=SEED,
eval_metric='aucpr',
n_jobs=-1,
tree_method='hist'
)
model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
val_pred = model.predict_proba(X_val)[:, 1]
val_auc = roc_auc_score(y_val, val_pred)
val_pr_auc = average_precision_score(y_val, val_pred)
print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")
return model
def train_lightgbm(X_train, y_train, X_val, y_val, class_weights):
"""Train LightGBM."""
print("\n" + "-" * 50)
print("Training: LightGBM")
print("-" * 50)
scale_pos_weight = class_weights[1] / class_weights[0]
model = lgb.LGBMClassifier(
n_estimators=200,
max_depth=8,
learning_rate=0.05,
scale_pos_weight=scale_pos_weight,
subsample=0.8,
colsample_bytree=0.8,
reg_alpha=0.1,
reg_lambda=1.0,
random_state=SEED,
n_jobs=-1,
verbose=-1
)
model.fit(X_train, y_train, eval_set=[(X_val, y_val)])
val_pred = model.predict_proba(X_val)[:, 1]
val_auc = roc_auc_score(y_val, val_pred)
val_pr_auc = average_precision_score(y_val, val_pred)
print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")
return model
def train_mlp(X_train, y_train, X_val, y_val):
"""Train MLP Neural Network."""
print("\n" + "-" * 50)
print("Training: MLP Neural Network")
print("-" * 50)
model = MLPClassifier(
hidden_layer_sizes=(128, 64, 32),
activation='relu',
solver='adam',
alpha=0.001,
batch_size=256,
learning_rate='adaptive',
learning_rate_init=0.001,
max_iter=200,
random_state=SEED,
early_stopping=True,
validation_fraction=0.1,
n_iter_no_change=10
)
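    # Note: early_stopping holds out validation_fraction of whatever is passed
    # to fit(); run_training feeds SMOTE-resampled data here, so that internal
    # split contains synthetic samples.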
model.fit(X_train, y_train)
val_pred = model.predict_proba(X_val)[:, 1]
val_auc = roc_auc_score(y_val, val_pred)
val_pr_auc = average_precision_score(y_val, val_pred)
print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")
return model
class Autoencoder(nn.Module):
    """Symmetric autoencoder with a 16-dimensional bottleneck.

    Defined at module level (rather than inside train_autoencoder) so that
    instances held by AutoencoderWrapper can be pickled by joblib; a class
    defined inside a function cannot be pickled by reference.
    """
    def __init__(self, input_dim):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, 16),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, input_dim),
        )
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
def train_autoencoder(X_train, X_val, y_val):
    """Train an autoencoder for anomaly detection (on legitimate transactions only)."""
    print("\n" + "-" * 50)
    print("Training: Autoencoder (Anomaly Detection)")
    print("-" * 50)
    # The caller passes only legitimate transactions, so fraud should
    # reconstruct poorly and receive a high anomaly score.
    X_train_np = X_train.values if isinstance(X_train, pd.DataFrame) else X_train
    input_dim = X_train_np.shape[1]
    torch.manual_seed(SEED)  # reproducible weight init and batch shuffling
    model = Autoencoder(input_dim)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    # Inputs double as targets: the autoencoder learns to reconstruct
    # legitimate transaction vectors.
train_tensor = torch.FloatTensor(X_train_np)
train_dataset = TensorDataset(train_tensor, train_tensor)
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
# Train
model.train()
for epoch in range(50):
epoch_loss = 0
for batch_x, _ in train_loader:
optimizer.zero_grad()
output = model(batch_x)
loss = criterion(output, batch_x)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
if (epoch + 1) % 10 == 0:
print(f" Epoch {epoch+1}/50, Loss: {epoch_loss/len(train_loader):.6f}")
# Compute reconstruction error on validation set
model.eval()
X_val_np = X_val.values if isinstance(X_val, pd.DataFrame) else X_val
with torch.no_grad():
val_tensor = torch.FloatTensor(X_val_np)
val_output = model(val_tensor)
reconstruction_error = torch.mean((val_output - val_tensor) ** 2, dim=1).numpy()
    # Reconstruction error is the anomaly score: the model never saw fraud,
    # so fraudulent rows should reconstruct poorly.
val_auc = roc_auc_score(y_val, reconstruction_error)
val_pr_auc = average_precision_score(y_val, reconstruction_error)
print(f" Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")
# Save model info
ae_info = {
'model': model,
'input_dim': input_dim,
'type': 'autoencoder'
}
return ae_info
class AutoencoderWrapper:
"""Wrapper to make autoencoder compatible with sklearn interface."""
def __init__(self, ae_info):
self.model = ae_info['model']
self.input_dim = ae_info['input_dim']
self.classes_ = np.array([0, 1])
    def predict_proba(self, X):
        self.model.eval()
X_np = X.values if isinstance(X, pd.DataFrame) else X
with torch.no_grad():
X_tensor = torch.FloatTensor(X_np)
output = self.model(X_tensor)
reconstruction_error = torch.mean((output - X_tensor) ** 2, dim=1).numpy()
        # Map reconstruction error to [0, 1] with a sigmoid centered on the
        # batch median. Caveat: the median is recomputed for every scored batch,
        # so scores are not comparable across batches; a fixed reference error
        # stored at training time would make them stable.
scores = 1 / (1 + np.exp(-10 * (reconstruction_error - np.median(reconstruction_error))))
proba = np.column_stack([1 - scores, scores])
return proba
def predict(self, X, threshold=0.5):
proba = self.predict_proba(X)
return (proba[:, 1] >= threshold).astype(int)
def optuna_tune_xgboost(X_train, y_train, X_val, y_val, class_weights, n_trials=50):
"""Tune XGBoost with Optuna."""
print("\n" + "-" * 50)
print("Optuna Tuning: XGBoost")
print("-" * 50)
scale_pos_weight = class_weights[1] / class_weights[0]
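    # The objective maximizes validation PR-AUC (average precision), the most
    # informative scalar metric under heavy class imbalance.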
def objective(trial):
params = {
'n_estimators': trial.suggest_int('n_estimators', 100, 300),
'max_depth': trial.suggest_int('max_depth', 3, 10),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
'subsample': trial.suggest_float('subsample', 0.6, 1.0),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
'reg_alpha': trial.suggest_float('reg_alpha', 1e-4, 10.0, log=True),
'reg_lambda': trial.suggest_float('reg_lambda', 1e-4, 10.0, log=True),
'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
'scale_pos_weight': scale_pos_weight,
'random_state': SEED,
'eval_metric': 'aucpr',
'n_jobs': -1,
'tree_method': 'hist'
}
model = xgb.XGBClassifier(**params)
model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
val_pred = model.predict_proba(X_val)[:, 1]
return average_precision_score(y_val, val_pred)
study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=SEED))
study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
print(f" Best PR-AUC: {study.best_value:.4f}")
print(f" Best params: {study.best_params}")
# Train with best params
best_params = study.best_params
best_params['scale_pos_weight'] = scale_pos_weight
best_params['random_state'] = SEED
best_params['eval_metric'] = 'aucpr'
best_params['n_jobs'] = -1
best_params['tree_method'] = 'hist'
best_model = xgb.XGBClassifier(**best_params)
best_model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
return best_model, study.best_params
def optuna_tune_lightgbm(X_train, y_train, X_val, y_val, class_weights, n_trials=50):
"""Tune LightGBM with Optuna."""
print("\n" + "-" * 50)
print("Optuna Tuning: LightGBM")
print("-" * 50)
scale_pos_weight = class_weights[1] / class_weights[0]
def objective(trial):
params = {
'n_estimators': trial.suggest_int('n_estimators', 100, 300),
'max_depth': trial.suggest_int('max_depth', 3, 12),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
'subsample': trial.suggest_float('subsample', 0.6, 1.0),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
'reg_alpha': trial.suggest_float('reg_alpha', 1e-4, 10.0, log=True),
'reg_lambda': trial.suggest_float('reg_lambda', 1e-4, 10.0, log=True),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 50),
'num_leaves': trial.suggest_int('num_leaves', 15, 127),
'scale_pos_weight': scale_pos_weight,
'random_state': SEED,
'n_jobs': -1,
'verbose': -1
}
model = lgb.LGBMClassifier(**params)
model.fit(X_train, y_train, eval_set=[(X_val, y_val)])
val_pred = model.predict_proba(X_val)[:, 1]
return average_precision_score(y_val, val_pred)
study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=SEED))
study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
print(f" Best PR-AUC: {study.best_value:.4f}")
print(f" Best params: {study.best_params}")
# Train with best params
best_params = study.best_params
best_params['scale_pos_weight'] = scale_pos_weight
best_params['random_state'] = SEED
best_params['n_jobs'] = -1
best_params['verbose'] = -1
best_model = lgb.LGBMClassifier(**best_params)
best_model.fit(X_train, y_train, eval_set=[(X_val, y_val)])
return best_model, study.best_params
def optuna_tune_random_forest(X_train, y_train, X_val, y_val, class_weights, n_trials=30):
"""Tune Random Forest with Optuna."""
print("\n" + "-" * 50)
print("Optuna Tuning: Random Forest")
print("-" * 50)
def objective(trial):
params = {
'n_estimators': trial.suggest_int('n_estimators', 100, 300),
'max_depth': trial.suggest_int('max_depth', 5, 20),
'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
'class_weight': class_weights,
'random_state': SEED,
'n_jobs': -1
}
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
val_pred = model.predict_proba(X_val)[:, 1]
return average_precision_score(y_val, val_pred)
study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=SEED))
study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
print(f" Best PR-AUC: {study.best_value:.4f}")
print(f" Best params: {study.best_params}")
best_params = study.best_params
best_params['class_weight'] = class_weights
best_params['random_state'] = SEED
best_params['n_jobs'] = -1
best_model = RandomForestClassifier(**best_params)
best_model.fit(X_train, y_train)
return best_model, study.best_params
def create_voting_ensemble(models_dict):
    """Create a soft-voting ensemble from the three tuned tree models."""
    print("\n" + "-" * 50)
    print("Creating: Voting Ensemble (Tuned Tree Models)")
    print("-" * 50)
    # Exclude the autoencoder: it does not implement the sklearn estimator
    # interface that VotingClassifier requires.
    eligible = {k: v for k, v in models_dict.items() if k != 'Autoencoder'}
    # Prefer the Optuna-tuned versions; fall back to the untuned counterparts.
    ensemble_models = []
    for name in ['XGBoost_Tuned', 'LightGBM_Tuned', 'Random_Forest_Tuned']:
        if name in eligible:
            ensemble_models.append((name, eligible[name]))
    if len(ensemble_models) < 3:
        for name in ['XGBoost', 'LightGBM', 'Random_Forest']:
            if name in eligible and len(ensemble_models) < 3:
                if not any(n == name for n, _ in ensemble_models):
                    ensemble_models.append((name, eligible[name]))
print(f" Ensemble members: {[n for n, _ in ensemble_models]}")
voting_clf = VotingClassifier(
estimators=ensemble_models,
voting='soft'
)
return voting_clf, ensemble_models
def run_training():
"""Run the complete training pipeline."""
print("=" * 60)
print("FRAUD DETECTION SYSTEM - MODEL TRAINING")
print("=" * 60)
# Load data
data = load_processed_data()
X_train = data['X_train']
X_val = data['X_val']
X_test = data['X_test']
y_train = data['y_train']
y_val = data['y_val']
y_test = data['y_test']
X_train_smote = data['X_train_smote']
y_train_smote = data['y_train_smote']
class_weights = data['class_weights']
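    # class_weights maps class label to weight ({0: w_legit, 1: w_fraud}),
    # as produced by the preprocessing module.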
models = {}
# =========================================
# 1. Logistic Regression (Baseline)
# =========================================
models['Logistic_Regression'] = train_logistic_regression(
X_train, y_train, X_val, y_val, class_weights
)
# =========================================
# 2. Random Forest
# =========================================
models['Random_Forest'] = train_random_forest(
X_train, y_train, X_val, y_val, class_weights
)
# =========================================
# 3. XGBoost
# =========================================
models['XGBoost'] = train_xgboost(
X_train, y_train, X_val, y_val, class_weights
)
# =========================================
# 4. LightGBM
# =========================================
models['LightGBM'] = train_lightgbm(
X_train, y_train, X_val, y_val, class_weights
)
# =========================================
# 5. MLP Neural Network (uses SMOTE data)
# =========================================
models['MLP'] = train_mlp(
X_train_smote, y_train_smote, X_val, y_val
)
# =========================================
# 6. Autoencoder (anomaly detection)
# =========================================
# Train only on legitimate transactions
X_train_legit = X_train[y_train == 0]
ae_info = train_autoencoder(X_train_legit, X_val, y_val)
models['Autoencoder'] = AutoencoderWrapper(ae_info)
# =========================================
# 7. Optuna Tuning of Top 3
# =========================================
print("\n" + "=" * 60)
print("HYPERPARAMETER TUNING WITH OPTUNA")
print("=" * 60)
models['XGBoost_Tuned'], xgb_params = optuna_tune_xgboost(
X_train, y_train, X_val, y_val, class_weights, n_trials=20
)
models['LightGBM_Tuned'], lgbm_params = optuna_tune_lightgbm(
X_train, y_train, X_val, y_val, class_weights, n_trials=20
)
models['Random_Forest_Tuned'], rf_params = optuna_tune_random_forest(
X_train, y_train, X_val, y_val, class_weights, n_trials=15
)
# =========================================
# 8. Voting Ensemble
# =========================================
voting_clf, ensemble_members = create_voting_ensemble(models)
# Fit the voting ensemble
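    # VotingClassifier.fit clones each member and refits it from scratch on
    # X_train with the tuned hyperparameters; the fits above are not reused.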
voting_clf.fit(X_train, y_train)
models['Voting_Ensemble'] = voting_clf
val_pred = voting_clf.predict_proba(X_val)[:, 1]
val_auc = roc_auc_score(y_val, val_pred)
val_pr_auc = average_precision_score(y_val, val_pred)
print(f" Voting Ensemble Val ROC-AUC: {val_auc:.4f}, PR-AUC: {val_pr_auc:.4f}")
    # Save all models
    os.makedirs(MODELS_DIR, exist_ok=True)  # ensure the output directory exists
    models_path = os.path.join(MODELS_DIR, "all_models.joblib")
# Save non-autoencoder models with joblib, save AE separately
save_models = {k: v for k, v in models.items() if k != 'Autoencoder'}
joblib.dump(save_models, models_path)
    # Save the autoencoder weights separately (a state_dict is plain tensors)
ae_path = os.path.join(MODELS_DIR, "autoencoder.pt")
torch.save(ae_info['model'].state_dict(), ae_path)
    # Save the full dict including the autoencoder wrapper (picklable because
    # the Autoencoder class is defined at module level)
all_models_path = os.path.join(MODELS_DIR, "all_models_with_ae.joblib")
joblib.dump(models, all_models_path)
tuning_results = {
'xgboost': xgb_params,
'lightgbm': lgbm_params,
'random_forest': rf_params
}
joblib.dump(tuning_results, os.path.join(MODELS_DIR, "tuning_results.joblib"))
print("\n" + "=" * 60)
print("TRAINING COMPLETE - All models saved")
print("=" * 60)
return models, tuning_results
if __name__ == "__main__":
models, tuning_results = run_training()
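# A minimal sketch (not executed here) of reloading the saved artifacts from a
# downstream module, assuming config and this module are importable and that
# input_dim matches the training feature count:
#
#   import os, joblib, torch
#   from config import MODELS_DIR
#   from training import Autoencoder
#
#   models = joblib.load(os.path.join(MODELS_DIR, "all_models.joblib"))
#   xgb_tuned = models['XGBoost_Tuned']
#   ae = Autoencoder(input_dim=n_features)  # n_features: training feature count
#   ae.load_state_dict(torch.load(os.path.join(MODELS_DIR, "autoencoder.pt")))
#   ae.eval()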