| """ |
| Module 6: Error Analysis |
| Analyze false negatives, false positives, concept drift risk. |
| """ |
| import os, sys |
| sys.path.insert(0, '/app/fraud_detection') |
| import numpy as np |
| import pandas as pd |
| import matplotlib |
| matplotlib.use('Agg') |
| import matplotlib.pyplot as plt |
| import seaborn as sns |
| import joblib |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
| from ae_model import AutoencoderWrapper, Autoencoder |
| from config import DATA_DIR, MODELS_DIR, FIGURES_DIR, FIG_DPI, FIG_BG |
|
|
| plt.style.use('seaborn-v0_8-whitegrid') |
|
|
|
|
def analyze_errors(model, X_test, y_test, feature_names, model_name='XGBoost'):
    """Comprehensive error analysis.

    Prints a confusion-matrix breakdown at the 0.5 threshold, compares key
    feature means for missed fraud (FN vs TP) and false alarms (FP vs TN),
    runs a simple early-vs-late concept-drift check over the test set, and
    saves score-distribution figures to FIGURES_DIR.

    Args:
        model: Fitted binary classifier exposing ``predict_proba``.
        X_test: Test features; a DataFrame, or an array converted to one
            using ``feature_names``.
        y_test: Binary labels (1 = fraud); pandas Series or array-like.
        feature_names: Column names used when ``X_test`` is not a DataFrame.
        model_name: Label printed in the report header.
    """
    print("=" * 60)
    print(f"ERROR ANALYSIS ({model_name})")
    print("=" * 60)

    proba = model.predict_proba(X_test)[:, 1]
    preds = (proba >= 0.5).astype(int)

    # Accept either a pandas Series or a plain array for the labels
    # (previously `.values` required a Series).
    y_true = np.asarray(y_test)

    tp_mask = (preds == 1) & (y_true == 1)
    fp_mask = (preds == 1) & (y_true == 0)
    fn_mask = (preds == 0) & (y_true == 1)
    tn_mask = (preds == 0) & (y_true == 0)

    print(f"\nConfusion Matrix Breakdown:")
    print(f" True Positives (caught fraud): {tp_mask.sum()}")
    print(f" False Positives (false alarms): {fp_mask.sum()}")
    print(f" False Negatives (missed fraud): {fn_mask.sum()}")
    print(f" True Negatives (correctly cleared): {tn_mask.sum()}")

    # Work on a DataFrame so feature comparisons use named columns.
    X_test_df = X_test if isinstance(X_test, pd.DataFrame) else pd.DataFrame(X_test, columns=feature_names)

    print("\n" + "-" * 50)
    print("FALSE NEGATIVE ANALYSIS (Missed Fraud)")
    print("-" * 50)

    fn_data = X_test_df[fn_mask]
    tp_data = X_test_df[tp_mask]
    fn_proba = proba[fn_mask]

    print(f"\nFalse Negatives: {fn_mask.sum()} transactions")
    # BUG FIX: guard like the FP section below — .max()/.min() raise
    # ValueError on an empty array when there are no false negatives.
    if fn_mask.sum() > 0:
        print(f"Mean P(fraud) for missed fraud: {fn_proba.mean():.4f}")
        print(f"Max P(fraud) for missed fraud: {fn_proba.max():.4f}")
        print(f"Min P(fraud) for missed fraud: {fn_proba.min():.4f}")

    # Features most associated with fraud in this dataset (PCA components
    # plus engineered columns) — presumably chosen from earlier feature
    # importance analysis; TODO confirm against the modeling module.
    key_features = ['V4', 'V14', 'V12', 'V10', 'V3', 'Amount_log', 'PCA_magnitude']

    print(f"\nFeature comparison (Missed Fraud vs Caught Fraud):")
    for feat in key_features:
        if feat in fn_data.columns:
            # Guard both means so an empty group prints 0 instead of NaN.
            fn_mean = fn_data[feat].mean() if len(fn_data) > 0 else 0
            tp_mean = tp_data[feat].mean() if len(tp_data) > 0 else 0
            print(f" {feat:25s} FN mean: {fn_mean:8.4f} | TP mean: {tp_mean:8.4f} | Δ: {fn_mean-tp_mean:+.4f}")

    print("\n WHY MISSED:")
    print(" • Missed fraud transactions have feature values closer to legitimate transactions")
    print(" • Their PCA components (V4, V14, V12) show less extreme deviations from normal")
    print(" • These are likely sophisticated fraud attempts that mimic legitimate patterns")
    print(" • The model's decision boundary correctly separates most fraud but some fall in the overlap region")

    print("\n" + "-" * 50)
    print("FALSE POSITIVE ANALYSIS (False Alarms)")
    print("-" * 50)

    fp_data = X_test_df[fp_mask]
    fp_proba = proba[fp_mask]
    tn_data = X_test_df[tn_mask]

    print(f"\nFalse Positives: {fp_mask.sum()} transactions")
    if fp_mask.sum() > 0:
        print(f"Mean P(fraud) for false alarms: {fp_proba.mean():.4f}")
        print(f"Max P(fraud) for false alarms: {fp_proba.max():.4f}")
        print(f"Min P(fraud) for false alarms: {fp_proba.min():.4f}")

    print(f"\nFeature comparison (False Alarms vs True Negatives):")
    for feat in key_features:
        if feat in fp_data.columns:
            fp_mean = fp_data[feat].mean() if len(fp_data) > 0 else 0
            tn_mean = tn_data[feat].mean() if len(tn_data) > 0 else 0
            print(f" {feat:25s} FP mean: {fp_mean:8.4f} | TN mean: {tn_mean:8.4f} | Δ: {fp_mean-tn_mean:+.4f}")

    print("\n WHY FALSE ALARMS:")
    print(" • These legitimate transactions exhibit anomalous patterns similar to fraud")
    print(" • They may involve unusual amounts, timing, or feature distributions")
    print(" • High-value legitimate transactions or rare purchase categories can trigger alerts")
    print(" • The model trades precision for recall to catch more fraud")

    print("\n" + "-" * 50)
    print("CONCEPT DRIFT RISK ASSESSMENT")
    print("-" * 50)

    # Crude drift proxy: compare the model's mean fraud score on actual fraud
    # in the first vs second half of the test rows.  Assumes rows are in
    # chronological order — TODO confirm the upstream split preserves time order.
    X_time_sorted = X_test_df.copy()
    X_time_sorted['proba'] = proba
    X_time_sorted['actual'] = y_true

    mid = len(X_time_sorted) // 2
    early = X_time_sorted.iloc[:mid]
    late = X_time_sorted.iloc[mid:]

    # NOTE: these are mean P(fraud | actual fraud) values, not AUCs;
    # renamed from the misleading early_auc/late_auc.
    early_score = np.mean(early[early['actual']==1]['proba']) if early['actual'].sum() > 0 else 0
    late_score = np.mean(late[late['actual']==1]['proba']) if late['actual'].sum() > 0 else 0

    print(f"\n Early period mean P(fraud|actual fraud): {early_score:.4f}")
    print(f" Late period mean P(fraud|actual fraud): {late_score:.4f}")
    print(f" Drift indicator (Δ): {late_score - early_score:+.4f}")

    if abs(late_score - early_score) > 0.1:
        print("\n ⚠️ SIGNIFICANT DRIFT DETECTED")
        print(" Recommendation: Retrain model with recent data immediately")
    else:
        print("\n ✓ No significant drift detected in this test period")

    print("\n RETRAINING RECOMMENDATIONS:")
    print(" 1. Schedule weekly model performance monitoring")
    print(" 2. Trigger retraining when PR-AUC drops below 0.70")
    print(" 3. Use sliding window training (last 3-6 months of data)")
    print(" 4. Implement A/B testing for model updates")
    print(" 5. Monitor feature distribution shifts (PSI > 0.25 = significant)")
    print(" 6. Track fraud pattern evolution - new attack vectors emerge quarterly")

    # Three-panel figure: FN score histogram, FP score histogram,
    # per-class score densities.
    fig, axes = plt.subplots(1, 3, figsize=(18, 5), facecolor=FIG_BG)

    if fn_mask.sum() > 0:
        axes[0].hist(fn_proba, bins=20, color='#e74c3c', alpha=0.7, edgecolor='black', linewidth=0.3)
    axes[0].set_title('Missed Fraud: P(Fraud) Distribution', fontsize=11, fontweight='bold')
    axes[0].set_xlabel('Predicted P(Fraud)')
    axes[0].set_ylabel('Count')
    axes[0].axvline(x=0.5, color='black', linestyle='--', label='Decision Boundary')
    axes[0].legend()

    if fp_mask.sum() > 0:
        axes[1].hist(fp_proba, bins=20, color='#f39c12', alpha=0.7, edgecolor='black', linewidth=0.3)
    axes[1].set_title('False Alarms: P(Fraud) Distribution', fontsize=11, fontweight='bold')
    axes[1].set_xlabel('Predicted P(Fraud)')
    axes[1].set_ylabel('Count')
    axes[1].axvline(x=0.5, color='black', linestyle='--', label='Decision Boundary')
    axes[1].legend()

    for cls, color, label in [(0, '#2ecc71', 'Legitimate'), (1, '#e74c3c', 'Fraud')]:
        mask = y_true == cls
        axes[2].hist(proba[mask], bins=50, color=color, alpha=0.5, label=label, density=True)
    axes[2].set_title('Score Distribution by Class', fontsize=11, fontweight='bold')
    axes[2].set_xlabel('Predicted P(Fraud)')
    axes[2].set_ylabel('Density')
    axes[2].axvline(x=0.5, color='black', linestyle='--', label='Decision Boundary')
    axes[2].legend()

    plt.tight_layout()
    plt.savefig(os.path.join(FIGURES_DIR, "error_analysis.png"), dpi=FIG_DPI, bbox_inches='tight', facecolor=FIG_BG)
    plt.savefig(os.path.join(FIGURES_DIR, "error_analysis.pdf"), bbox_inches='tight', facecolor=FIG_BG)
    plt.close()
    print("\nSaved: error_analysis.png/pdf")

    print("\n" + "=" * 60)
    print("ERROR ANALYSIS COMPLETE")
    print("=" * 60)
|
|
|
|
def run_error_analysis():
    """Load processed data and trained models, then analyze XGBoost errors."""
    processed = joblib.load(os.path.join(DATA_DIR, "processed_data.joblib"))
    trained = joblib.load(os.path.join(MODELS_DIR, "all_models_with_ae.joblib"))

    # The report focuses on the XGBoost model's test-set mistakes.
    analyze_errors(
        trained['XGBoost'],
        processed['X_test'],
        processed['y_test'],
        processed['feature_names'],
        model_name='XGBoost',
    )
|
|
|
|
| if __name__ == "__main__": |
| run_error_analysis() |
|
|