"""
Module 6: Error Analysis
Analyze false negatives, false positives, concept drift risk.
"""
import os, sys
sys.path.insert(0, '/app/fraud_detection')
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import warnings
warnings.filterwarnings('ignore')
from ae_model import AutoencoderWrapper, Autoencoder
from config import DATA_DIR, MODELS_DIR, FIGURES_DIR, FIG_DPI, FIG_BG
plt.style.use('seaborn-v0_8-whitegrid')
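
# Hedged sketch, not part of the original pipeline: analyze_errors() below
# scores with a fixed 0.5 cutoff and notes that the model trades precision for
# recall. In deployment the operating threshold is usually chosen against a
# business constraint instead; this illustrative helper picks the cutoff with
# the highest recall subject to a precision floor. The min_precision=0.90
# default is a made-up example target, not a project requirement.
def pick_operating_threshold(y_true, proba, min_precision=0.90):
    """Return the score cutoff maximizing recall subject to a precision floor."""
    from sklearn.metrics import precision_recall_curve
    precision, recall, thresholds = precision_recall_curve(y_true, proba)
    # precision/recall carry one more point than thresholds; drop the last.
    meets_floor = precision[:-1] >= min_precision
    if not meets_floor.any():
        return 0.5  # precision target unreachable: fall back to the default cutoff
    # Among cutoffs meeting the floor, take the one with the highest recall.
    best = np.argmax(np.where(meets_floor, recall[:-1], -1.0))
    return float(thresholds[best])
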
def analyze_errors(model, X_test, y_test, feature_names, model_name='XGBoost'):
    """Comprehensive error analysis: false negatives, false positives, drift risk."""
    print("=" * 60)
    print(f"ERROR ANALYSIS ({model_name})")
    print("=" * 60)

    proba = model.predict_proba(X_test)[:, 1]
    # Fixed default cutoff; see the pick_operating_threshold() sketch above for
    # how a deployment might choose this value instead.
    preds = (proba >= 0.5).astype(int)

    # Boolean masks for the four confusion-matrix cells
    tp_mask = (preds == 1) & (y_test.values == 1)
    fp_mask = (preds == 1) & (y_test.values == 0)
    fn_mask = (preds == 0) & (y_test.values == 1)
    tn_mask = (preds == 0) & (y_test.values == 0)

    print("\nConfusion Matrix Breakdown:")
    print(f"  True Positives (caught fraud):      {tp_mask.sum()}")
    print(f"  False Positives (false alarms):     {fp_mask.sum()}")
    print(f"  False Negatives (missed fraud):     {fn_mask.sum()}")
    print(f"  True Negatives (correctly cleared): {tn_mask.sum()}")

    X_test_df = X_test if isinstance(X_test, pd.DataFrame) else pd.DataFrame(X_test, columns=feature_names)
    # === FALSE NEGATIVE ANALYSIS ===
    print("\n" + "-" * 50)
    print("FALSE NEGATIVE ANALYSIS (Missed Fraud)")
    print("-" * 50)

    fn_data = X_test_df[fn_mask]
    tp_data = X_test_df[tp_mask]
    fn_proba = proba[fn_mask]

    print(f"\nFalse Negatives: {fn_mask.sum()} transactions")
    if fn_mask.sum() > 0:  # guard: .max()/.min() raise on an empty slice
        print(f"Mean P(fraud) for missed fraud: {fn_proba.mean():.4f}")
        print(f"Max P(fraud) for missed fraud:  {fn_proba.max():.4f}")
        print(f"Min P(fraud) for missed fraud:  {fn_proba.min():.4f}")

    # Compare FN vs TP distributions for key features
    key_features = ['V4', 'V14', 'V12', 'V10', 'V3', 'Amount_log', 'PCA_magnitude']
    print("\nFeature comparison (Missed Fraud vs Caught Fraud):")
    for feat in key_features:
        if feat in fn_data.columns:
            fn_mean = fn_data[feat].mean()
            tp_mean = tp_data[feat].mean() if len(tp_data) > 0 else np.nan
            print(f"  {feat:25s} FN mean: {fn_mean:8.4f} | TP mean: {tp_mean:8.4f} | Δ: {fn_mean - tp_mean:+.4f}")

    print("\n  WHY MISSED:")
    print("  • Missed fraud transactions have feature values closer to those of legitimate transactions")
    print("  • Their PCA components (V4, V14, V12) deviate less sharply from the normal range")
    print("  • These are likely sophisticated fraud attempts that mimic legitimate patterns")
    print("  • The decision boundary separates most fraud correctly, but some cases fall in the class-overlap region")
    # === FALSE POSITIVE ANALYSIS ===
    print("\n" + "-" * 50)
    print("FALSE POSITIVE ANALYSIS (False Alarms)")
    print("-" * 50)

    fp_data = X_test_df[fp_mask]
    fp_proba = proba[fp_mask]
    tn_data = X_test_df[tn_mask]

    print(f"\nFalse Positives: {fp_mask.sum()} transactions")
    if fp_mask.sum() > 0:
        print(f"Mean P(fraud) for false alarms: {fp_proba.mean():.4f}")
        print(f"Max P(fraud) for false alarms:  {fp_proba.max():.4f}")
        print(f"Min P(fraud) for false alarms:  {fp_proba.min():.4f}")

        print("\nFeature comparison (False Alarms vs True Negatives):")
        for feat in key_features:
            if feat in fp_data.columns:
                fp_mean = fp_data[feat].mean()
                tn_mean = tn_data[feat].mean() if len(tn_data) > 0 else np.nan
                print(f"  {feat:25s} FP mean: {fp_mean:8.4f} | TN mean: {tn_mean:8.4f} | Δ: {fp_mean - tn_mean:+.4f}")

    print("\n  WHY FALSE ALARMS:")
    print("  • These legitimate transactions exhibit anomalous patterns similar to fraud")
    print("  • They may involve unusual amounts, timing, or feature combinations")
    print("  • High-value legitimate purchases or rare purchase categories can trigger alerts")
    print("  • The model deliberately trades precision for recall to catch more fraud")
    # === CONCEPT DRIFT RISK ===
    print("\n" + "-" * 50)
    print("CONCEPT DRIFT RISK ASSESSMENT")
    print("-" * 50)

    # Proxy drift check: compare the model's confidence on actual fraud in the
    # first half of the test set vs the second half (rows assumed time-ordered).
    # See the score_drift_ks() sketch near the bottom of this module for a more
    # formal distribution test.
    X_time_sorted = X_test_df.copy()
    X_time_sorted['proba'] = proba
    X_time_sorted['actual'] = y_test.values

    mid = len(X_time_sorted) // 2
    early = X_time_sorted.iloc[:mid]
    late = X_time_sorted.iloc[mid:]

    # Mean P(fraud) assigned to actual fraud in each half (NaN if a half has no fraud)
    early_mean = early.loc[early['actual'] == 1, 'proba'].mean() if early['actual'].sum() > 0 else np.nan
    late_mean = late.loc[late['actual'] == 1, 'proba'].mean() if late['actual'].sum() > 0 else np.nan
    drift_delta = late_mean - early_mean

    print(f"\n  Early period mean P(fraud|actual fraud): {early_mean:.4f}")
    print(f"  Late period mean P(fraud|actual fraud):  {late_mean:.4f}")
    print(f"  Drift indicator (Δ): {drift_delta:+.4f}")
    if np.isfinite(drift_delta) and abs(drift_delta) > 0.1:
        print("\n  ⚠️ SIGNIFICANT DRIFT DETECTED")
        print("  Recommendation: Retrain the model with recent data immediately")
    else:
        print("\n  ✓ No significant drift detected in this test period")

    print("\n  RETRAINING RECOMMENDATIONS:")
    print("  1. Schedule weekly model performance monitoring")
    print("  2. Trigger retraining when PR-AUC drops below 0.70")
    print("  3. Use sliding-window training (last 3-6 months of data)")
    print("  4. Implement A/B testing for model updates")
    print("  5. Monitor feature distribution shifts (PSI > 0.25 = significant; see the population_stability_index() sketch below)")
    print("  6. Track fraud pattern evolution - new attack vectors emerge quarterly")
    # Error distribution plots
    fig, axes = plt.subplots(1, 3, figsize=(18, 5), facecolor=FIG_BG)

    # Panel 1: probability distribution of missed fraud (false negatives)
    if fn_mask.sum() > 0:
        axes[0].hist(fn_proba, bins=20, color='#e74c3c', alpha=0.7, edgecolor='black', linewidth=0.3)
        axes[0].axvline(x=0.5, color='black', linestyle='--', label='Decision Boundary')
        axes[0].legend()
    axes[0].set_title('Missed Fraud: P(Fraud) Distribution', fontsize=11, fontweight='bold')
    axes[0].set_xlabel('Predicted P(Fraud)')
    axes[0].set_ylabel('Count')

    # Panel 2: probability distribution of false alarms (false positives)
    if fp_mask.sum() > 0:
        axes[1].hist(fp_proba, bins=20, color='#f39c12', alpha=0.7, edgecolor='black', linewidth=0.3)
        axes[1].axvline(x=0.5, color='black', linestyle='--', label='Decision Boundary')
        axes[1].legend()
    axes[1].set_title('False Alarms: P(Fraud) Distribution', fontsize=11, fontweight='bold')
    axes[1].set_xlabel('Predicted P(Fraud)')
    axes[1].set_ylabel('Count')

    # Panel 3: overall score distribution by class
    for cls, color, label in [(0, '#2ecc71', 'Legitimate'), (1, '#e74c3c', 'Fraud')]:
        mask = y_test.values == cls
        axes[2].hist(proba[mask], bins=50, color=color, alpha=0.5, label=label, density=True)
    axes[2].axvline(x=0.5, color='black', linestyle='--', label='Decision Boundary')
    axes[2].set_title('Score Distribution by Class', fontsize=11, fontweight='bold')
    axes[2].set_xlabel('Predicted P(Fraud)')
    axes[2].set_ylabel('Density')
    axes[2].legend()

    plt.tight_layout()
    plt.savefig(os.path.join(FIGURES_DIR, "error_analysis.png"), dpi=FIG_DPI, bbox_inches='tight', facecolor=FIG_BG)
    plt.savefig(os.path.join(FIGURES_DIR, "error_analysis.pdf"), bbox_inches='tight', facecolor=FIG_BG)
    plt.close()
    print("\nSaved: error_analysis.png/pdf")

    print("\n" + "=" * 60)
    print("ERROR ANALYSIS COMPLETE")
    print("=" * 60)
def run_error_analysis():
    """Load the processed data and trained models, then run the error analysis."""
    data = joblib.load(os.path.join(DATA_DIR, "processed_data.joblib"))
    models = joblib.load(os.path.join(MODELS_DIR, "all_models_with_ae.joblib"))
    analyze_errors(
        models['XGBoost'],
        data['X_test'],
        data['y_test'],
        data['feature_names'],
        'XGBoost'
    )
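
# A more formal alternative (sketch only) to the early-vs-late mean comparison
# in analyze_errors(): a two-sample Kolmogorov-Smirnov test asks whether the
# model's scores from two periods plausibly come from the same distribution.
# The alpha=0.01 level is an assumed choice, and scipy is an extra dependency
# this module does not otherwise require.
def score_drift_ks(scores_ref, scores_new, alpha=0.01):
    """KS test on two score samples; returns statistic, p-value, and a flag."""
    from scipy.stats import ks_2samp  # local import: optional dependency
    result = ks_2samp(scores_ref, scores_new)
    return {'ks_statistic': float(result.statistic),
            'p_value': float(result.pvalue),
            'drift_detected': bool(result.pvalue < alpha)}
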
if __name__ == "__main__":
run_error_analysis()