"""
Module 4: Model Evaluation
Comprehensive evaluation: metrics, confusion matrices, ROC/PR curves,
threshold analysis, business impact estimation.
"""
import os
import sys

# Make sibling modules (ae_model, config) importable when run as a script.
sys.path.insert(0, '/app/fraud_detection')
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import warnings
warnings.filterwarnings('ignore')
# Not used directly here; kept so joblib can unpickle the saved autoencoder
# models, which reference these classes.
from ae_model import AutoencoderWrapper, Autoencoder
from sklearn.metrics import (
precision_score, recall_score, f1_score, roc_auc_score,
average_precision_score, matthews_corrcoef, confusion_matrix,
roc_curve, precision_recall_curve, classification_report
)
from config import DATA_DIR, MODELS_DIR, FIGURES_DIR, FIG_DPI, FIG_BG
plt.style.use('seaborn-v0_8-whitegrid')
def evaluate_model(model, X, y, model_name, threshold=0.5):
"""Evaluate a single model with all metrics."""
proba = model.predict_proba(X)[:, 1]
preds = (proba >= threshold).astype(int)
metrics = {
'Model': model_name,
'Precision': precision_score(y, preds, zero_division=0),
'Recall': recall_score(y, preds, zero_division=0),
'F1': f1_score(y, preds, zero_division=0),
'ROC-AUC': roc_auc_score(y, proba),
'PR-AUC': average_precision_score(y, proba),
'MCC': matthews_corrcoef(y, preds),
}
cm = confusion_matrix(y, preds)
return metrics, cm, proba, preds
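# Helper sketch (an assumption, not part of the original pipeline): every model
# evaluated here is expected to expose predict_proba; for estimators that only
# provide decision_function, scores could be squashed into [0, 1] like this.
def scores_from_model(model, X):
    """Return fraud scores in [0, 1] from predict_proba or decision_function."""
    if hasattr(model, 'predict_proba'):
        return model.predict_proba(X)[:, 1]
    scores = model.decision_function(X)
    return (scores - scores.min()) / (scores.max() - scores.min() + 1e-12)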
def evaluate_all_models(models, X_test, y_test):
"""Evaluate all models on test set."""
print("=" * 60)
print("MODEL EVALUATION ON TEST SET")
print("=" * 60)
all_metrics = []
all_cm = {}
all_proba = {}
all_preds = {}
for name, model in models.items():
print(f"\nEvaluating: {name}")
metrics, cm, proba, preds = evaluate_model(model, X_test, y_test, name)
all_metrics.append(metrics)
all_cm[name] = cm
all_proba[name] = proba
all_preds[name] = preds
print(f" Precision: {metrics['Precision']:.4f}")
print(f" Recall: {metrics['Recall']:.4f}")
print(f" F1: {metrics['F1']:.4f}")
print(f" ROC-AUC: {metrics['ROC-AUC']:.4f}")
print(f" PR-AUC: {metrics['PR-AUC']:.4f}")
print(f" MCC: {metrics['MCC']:.4f}")
# Create comparison table
df_metrics = pd.DataFrame(all_metrics)
df_metrics = df_metrics.sort_values('PR-AUC', ascending=False)
print("\n" + "=" * 60)
print("MODEL COMPARISON TABLE")
print("=" * 60)
print(df_metrics.to_string(index=False, float_format='%.4f'))
# Save table
df_metrics.to_csv(os.path.join(FIGURES_DIR, "model_comparison.csv"), index=False)
return df_metrics, all_cm, all_proba, all_preds
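# Optional sketch (not called by the pipeline): a bootstrap confidence interval
# for PR-AUC helps judge whether closely ranked models actually differ.
def bootstrap_pr_auc_ci(y_true, proba, n_boot=200, seed=42):
    """95% bootstrap CI for average precision (PR-AUC)."""
    rng = np.random.default_rng(seed)
    y_true = np.asarray(y_true)
    proba = np.asarray(proba)
    stats = []
    for _ in range(n_boot):
        idx = rng.integers(0, len(y_true), size=len(y_true))
        if y_true[idx].sum() == 0:  # skip resamples with no fraud cases
            continue
        stats.append(average_precision_score(y_true[idx], proba[idx]))
    return tuple(np.percentile(stats, [2.5, 97.5]))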
def plot_confusion_matrices(all_cm, model_names):
"""Plot confusion matrix grid."""
n = len(model_names)
cols = 4
rows = (n + cols - 1) // cols
fig, axes = plt.subplots(rows, cols, figsize=(5*cols, 4*rows), facecolor=FIG_BG)
if rows == 1:
axes = axes.reshape(1, -1)
for idx, name in enumerate(model_names):
r, c = idx // cols, idx % cols
cm = all_cm[name]
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[r, c],
xticklabels=['Legit', 'Fraud'], yticklabels=['Legit', 'Fraud'])
axes[r, c].set_title(name.replace('_', ' '), fontsize=10, fontweight='bold')
axes[r, c].set_ylabel('Actual')
axes[r, c].set_xlabel('Predicted')
# Hide empty subplots
for idx in range(n, rows*cols):
r, c = idx // cols, idx % cols
axes[r, c].set_visible(False)
plt.suptitle('Confusion Matrices (Test Set)', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(FIGURES_DIR, "confusion_matrices.png"), dpi=FIG_DPI, bbox_inches='tight', facecolor=FIG_BG)
plt.savefig(os.path.join(FIGURES_DIR, "confusion_matrices.pdf"), bbox_inches='tight', facecolor=FIG_BG)
plt.close()
print("Saved: confusion_matrices.png/pdf")
def plot_roc_curves(all_proba, y_test):
"""Plot ROC curves for all models."""
fig, ax = plt.subplots(1, 1, figsize=(10, 8), facecolor=FIG_BG)
colors = plt.cm.tab20(np.linspace(0, 1, len(all_proba)))
for (name, proba), color in zip(all_proba.items(), colors):
fpr, tpr, _ = roc_curve(y_test, proba)
auc = roc_auc_score(y_test, proba)
ax.plot(fpr, tpr, color=color, linewidth=2, label=f'{name.replace("_", " ")} (AUC={auc:.4f})')
ax.plot([0, 1], [0, 1], 'k--', linewidth=1, label='Random')
ax.set_xlabel('False Positive Rate', fontsize=12)
ax.set_ylabel('True Positive Rate', fontsize=12)
ax.set_title('ROC Curves - All Models', fontsize=14, fontweight='bold')
ax.legend(loc='lower right', fontsize=9)
ax.set_xlim([0, 1])
ax.set_ylim([0, 1.02])
plt.tight_layout()
plt.savefig(os.path.join(FIGURES_DIR, "roc_curves.png"), dpi=FIG_DPI, bbox_inches='tight', facecolor=FIG_BG)
plt.savefig(os.path.join(FIGURES_DIR, "roc_curves.pdf"), bbox_inches='tight', facecolor=FIG_BG)
plt.close()
print("Saved: roc_curves.png/pdf")
def plot_pr_curves(all_proba, y_test):
"""Plot Precision-Recall curves for all models."""
fig, ax = plt.subplots(1, 1, figsize=(10, 8), facecolor=FIG_BG)
colors = plt.cm.tab20(np.linspace(0, 1, len(all_proba)))
for (name, proba), color in zip(all_proba.items(), colors):
precision, recall, _ = precision_recall_curve(y_test, proba)
pr_auc = average_precision_score(y_test, proba)
ax.plot(recall, precision, color=color, linewidth=2, label=f'{name.replace("_", " ")} (AP={pr_auc:.4f})')
baseline = y_test.mean()
ax.axhline(y=baseline, color='k', linestyle='--', linewidth=1, label=f'Baseline ({baseline:.4f})')
ax.set_xlabel('Recall', fontsize=12)
ax.set_ylabel('Precision', fontsize=12)
ax.set_title('Precision-Recall Curves - All Models', fontsize=14, fontweight='bold')
ax.legend(loc='upper right', fontsize=9)
ax.set_xlim([0, 1])
ax.set_ylim([0, 1.02])
plt.tight_layout()
plt.savefig(os.path.join(FIGURES_DIR, "pr_curves.png"), dpi=FIG_DPI, bbox_inches='tight', facecolor=FIG_BG)
plt.savefig(os.path.join(FIGURES_DIR, "pr_curves.pdf"), bbox_inches='tight', facecolor=FIG_BG)
plt.close()
print("Saved: pr_curves.png/pdf")
def threshold_analysis(best_model_name, best_proba, y_test):
"""Analyze threshold sensitivity for the best model."""
print("\n" + "=" * 60)
print(f"THRESHOLD SENSITIVITY ANALYSIS ({best_model_name})")
print("=" * 60)
thresholds = np.arange(0.05, 0.96, 0.05)
results = []
for t in thresholds:
preds = (best_proba >= t).astype(int)
prec = precision_score(y_test, preds, zero_division=0)
rec = recall_score(y_test, preds, zero_division=0)
f1 = f1_score(y_test, preds, zero_division=0)
mcc = matthews_corrcoef(y_test, preds)
results.append({'Threshold': t, 'Precision': prec, 'Recall': rec, 'F1': f1, 'MCC': mcc})
df_thresh = pd.DataFrame(results)
print(df_thresh.to_string(index=False, float_format='%.4f'))
# Find optimal threshold by F1
best_idx = df_thresh['F1'].idxmax()
best_thresh = df_thresh.loc[best_idx, 'Threshold']
print(f"\nOptimal threshold (by F1): {best_thresh:.2f} → F1={df_thresh.loc[best_idx, 'F1']:.4f}")
# Plot
fig, axes = plt.subplots(1, 2, figsize=(14, 5), facecolor=FIG_BG)
axes[0].plot(df_thresh['Threshold'], df_thresh['Precision'], 'b-', linewidth=2, label='Precision')
axes[0].plot(df_thresh['Threshold'], df_thresh['Recall'], 'r-', linewidth=2, label='Recall')
axes[0].plot(df_thresh['Threshold'], df_thresh['F1'], 'g-', linewidth=2, label='F1 Score')
axes[0].axvline(x=best_thresh, color='gray', linestyle='--', label=f'Best Threshold ({best_thresh:.2f})')
axes[0].set_xlabel('Decision Threshold', fontsize=12)
axes[0].set_ylabel('Score', fontsize=12)
axes[0].set_title(f'Threshold Analysis - {best_model_name}', fontsize=12, fontweight='bold')
axes[0].legend()
axes[1].plot(df_thresh['Threshold'], df_thresh['MCC'], 'm-', linewidth=2, label='MCC')
axes[1].axvline(x=best_thresh, color='gray', linestyle='--')
axes[1].set_xlabel('Decision Threshold', fontsize=12)
axes[1].set_ylabel('MCC', fontsize=12)
axes[1].set_title('Matthews Correlation Coefficient', fontsize=12, fontweight='bold')
axes[1].legend()
plt.tight_layout()
plt.savefig(os.path.join(FIGURES_DIR, "threshold_analysis.png"), dpi=FIG_DPI, bbox_inches='tight', facecolor=FIG_BG)
plt.savefig(os.path.join(FIGURES_DIR, "threshold_analysis.pdf"), bbox_inches='tight', facecolor=FIG_BG)
plt.close()
print("Saved: threshold_analysis.png/pdf")
return df_thresh, best_thresh
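# Finer-grained alternative (a sketch, not used above): instead of scanning a
# 0.05 grid, the exact F1-maximizing threshold can be read off the full
# precision-recall curve.
def pr_optimal_threshold(y_true, proba):
    """Return the threshold that maximizes F1 along the full PR curve."""
    precision, recall, thresholds = precision_recall_curve(y_true, proba)
    # precision/recall have one more entry than thresholds; drop the last point.
    f1 = 2 * precision[:-1] * recall[:-1] / np.clip(precision[:-1] + recall[:-1], 1e-12, None)
    return float(thresholds[int(np.argmax(f1))])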
def business_impact_analysis(all_cm, y_test):
    """Estimate business impact: fraud loss caught vs. missed.

    Dollar figures use the dataset-wide average fraud amount as a
    simplification, not the actual amounts of the test transactions.
    """
    print("\n" + "=" * 60)
    print("BUSINESS IMPACT ANALYSIS")
    print("=" * 60)
    # Estimate the average fraud amount from the original dataset.
    df = pd.read_csv(os.path.join(DATA_DIR, "creditcard.csv"))
    avg_fraud_amount = df[df['Class'] == 1]['Amount'].mean()
total_fraud_in_test = y_test.sum()
print(f"Average fraud transaction amount: ${avg_fraud_amount:.2f}")
print(f"Total fraudulent transactions in test set: {total_fraud_in_test}")
print(f"Estimated total fraud exposure: ${total_fraud_in_test * avg_fraud_amount:,.2f}")
impact_results = []
for name, cm in all_cm.items():
tn, fp, fn, tp = cm.ravel()
fraud_caught = tp * avg_fraud_amount
fraud_missed = fn * avg_fraud_amount
false_alarm_cost = fp * 5 # $5 investigation cost per false alarm
net_savings = fraud_caught - false_alarm_cost
catch_rate = tp / (tp + fn) if (tp + fn) > 0 else 0
impact_results.append({
'Model': name,
'True Positives': tp,
'False Negatives': fn,
'False Positives': fp,
'Fraud Caught ($)': fraud_caught,
'Fraud Missed ($)': fraud_missed,
'False Alarm Cost ($)': false_alarm_cost,
'Net Savings ($)': net_savings,
'Catch Rate (%)': catch_rate * 100
})
df_impact = pd.DataFrame(impact_results)
df_impact = df_impact.sort_values('Net Savings ($)', ascending=False)
print("\n" + df_impact.to_string(index=False, float_format='%.2f'))
df_impact.to_csv(os.path.join(FIGURES_DIR, "business_impact.csv"), index=False)
return df_impact
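# Per-transaction variant (a sketch; `amounts` is a hypothetical array of raw
# test-set amounts aligned with y_true/preds, which this pipeline does not
# currently carry through preprocessing).
def per_transaction_impact(y_true, preds, amounts, investigation_cost=5.0):
    """Dollar impact from actual amounts rather than the dataset-wide average."""
    y_true, preds, amounts = map(np.asarray, (y_true, preds, amounts))
    caught = amounts[(y_true == 1) & (preds == 1)].sum()
    missed = amounts[(y_true == 1) & (preds == 0)].sum()
    n_false_alarms = int(((y_true == 0) & (preds == 1)).sum())
    return {'Fraud Caught ($)': caught, 'Fraud Missed ($)': missed,
            'Net Savings ($)': caught - n_false_alarms * investigation_cost}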
def plot_feature_importance(models, feature_names):
"""Plot feature importance for tree-based models."""
fig, axes = plt.subplots(2, 2, figsize=(16, 12), facecolor=FIG_BG)
tree_models = {
'Random Forest': 'Random_Forest_Tuned',
'XGBoost': 'XGBoost_Tuned',
'LightGBM': 'LightGBM_Tuned',
}
for idx, (title, key) in enumerate(tree_models.items()):
if key in models:
r, c = idx // 2, idx % 2
model = models[key]
importances = model.feature_importances_
indices = np.argsort(importances)[-15:] # Top 15
axes[r, c].barh(range(len(indices)), importances[indices], color='steelblue', edgecolor='black', linewidth=0.3)
axes[r, c].set_yticks(range(len(indices)))
axes[r, c].set_yticklabels([feature_names[i] for i in indices], fontsize=9)
axes[r, c].set_title(f'{title} - Top 15 Features', fontsize=11, fontweight='bold')
axes[r, c].set_xlabel('Importance')
# LR coefficients
if 'Logistic_Regression' in models:
lr = models['Logistic_Regression']
coefs = np.abs(lr.coef_[0])
indices = np.argsort(coefs)[-15:]
axes[1, 1].barh(range(len(indices)), coefs[indices], color='coral', edgecolor='black', linewidth=0.3)
axes[1, 1].set_yticks(range(len(indices)))
axes[1, 1].set_yticklabels([feature_names[i] for i in indices], fontsize=9)
axes[1, 1].set_title('Logistic Regression - Top 15 Features (|coef|)', fontsize=11, fontweight='bold')
axes[1, 1].set_xlabel('Absolute Coefficient')
plt.suptitle('Feature Importance Across Models', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(FIGURES_DIR, "feature_importance.png"), dpi=FIG_DPI, bbox_inches='tight', facecolor=FIG_BG)
plt.savefig(os.path.join(FIGURES_DIR, "feature_importance.pdf"), bbox_inches='tight', facecolor=FIG_BG)
plt.close()
print("Saved: feature_importance.png/pdf")
def run_evaluation():
"""Run complete evaluation pipeline."""
# Load data and models
data = joblib.load(os.path.join(DATA_DIR, "processed_data.joblib"))
models = joblib.load(os.path.join(MODELS_DIR, "all_models_with_ae.joblib"))
X_test = data['X_test']
y_test = data['y_test']
feature_names = data['feature_names']
# Evaluate all models
df_metrics, all_cm, all_proba, all_preds = evaluate_all_models(models, X_test, y_test)
# Best model by PR-AUC
best_model_name = df_metrics.iloc[0]['Model']
print(f"\nBest model by PR-AUC: {best_model_name}")
# Plot confusion matrices
plot_confusion_matrices(all_cm, list(models.keys()))
# Plot ROC curves
plot_roc_curves(all_proba, y_test)
# Plot PR curves
plot_pr_curves(all_proba, y_test)
# Threshold analysis on best model
df_thresh, best_thresh = threshold_analysis(best_model_name, all_proba[best_model_name], y_test)
# Business impact
    df_impact = business_impact_analysis(all_cm, y_test)
# Feature importance
plot_feature_importance(models, feature_names)
# Save evaluation results
eval_results = {
'metrics': df_metrics,
'confusion_matrices': all_cm,
'probabilities': all_proba,
'predictions': all_preds,
'threshold_analysis': df_thresh,
'best_threshold': best_thresh,
'business_impact': df_impact,
'best_model': best_model_name
}
joblib.dump(eval_results, os.path.join(DATA_DIR, "evaluation_results.joblib"))
print("\n" + "=" * 60)
print("EVALUATION COMPLETE")
print("=" * 60)
return eval_results
if __name__ == "__main__":
eval_results = run_evaluation()