""" Module 6: Error Analysis Analyze false negatives, false positives, concept drift risk. """ import os, sys sys.path.insert(0, '/app/fraud_detection') import numpy as np import pandas as pd import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt import seaborn as sns import joblib import warnings warnings.filterwarnings('ignore') from ae_model import AutoencoderWrapper, Autoencoder from config import DATA_DIR, MODELS_DIR, FIGURES_DIR, FIG_DPI, FIG_BG plt.style.use('seaborn-v0_8-whitegrid') def analyze_errors(model, X_test, y_test, feature_names, model_name='XGBoost'): """Comprehensive error analysis.""" print("=" * 60) print(f"ERROR ANALYSIS ({model_name})") print("=" * 60) proba = model.predict_proba(X_test)[:, 1] preds = (proba >= 0.5).astype(int) # Get indices of different categories tp_mask = (preds == 1) & (y_test.values == 1) fp_mask = (preds == 1) & (y_test.values == 0) fn_mask = (preds == 0) & (y_test.values == 1) tn_mask = (preds == 0) & (y_test.values == 0) print(f"\nConfusion Matrix Breakdown:") print(f" True Positives (caught fraud): {tp_mask.sum()}") print(f" False Positives (false alarms): {fp_mask.sum()}") print(f" False Negatives (missed fraud): {fn_mask.sum()}") print(f" True Negatives (correctly cleared): {tn_mask.sum()}") X_test_df = X_test if isinstance(X_test, pd.DataFrame) else pd.DataFrame(X_test, columns=feature_names) # === FALSE NEGATIVE ANALYSIS === print("\n" + "-" * 50) print("FALSE NEGATIVE ANALYSIS (Missed Fraud)") print("-" * 50) fn_data = X_test_df[fn_mask] tp_data = X_test_df[tp_mask] fn_proba = proba[fn_mask] print(f"\nFalse Negatives: {fn_mask.sum()} transactions") print(f"Mean P(fraud) for missed fraud: {fn_proba.mean():.4f}") print(f"Max P(fraud) for missed fraud: {fn_proba.max():.4f}") print(f"Min P(fraud) for missed fraud: {fn_proba.min():.4f}") # Compare FN vs TP distributions for key features key_features = ['V4', 'V14', 'V12', 'V10', 'V3', 'Amount_log', 'PCA_magnitude'] print(f"\nFeature comparison (Missed Fraud vs Caught Fraud):") for feat in key_features: if feat in fn_data.columns: fn_mean = fn_data[feat].mean() tp_mean = tp_data[feat].mean() if len(tp_data) > 0 else 0 print(f" {feat:25s} FN mean: {fn_mean:8.4f} | TP mean: {tp_mean:8.4f} | Δ: {fn_mean-tp_mean:+.4f}") print("\n WHY MISSED:") print(" • Missed fraud transactions have feature values closer to legitimate transactions") print(" • Their PCA components (V4, V14, V12) show less extreme deviations from normal") print(" • These are likely sophisticated fraud attempts that mimic legitimate patterns") print(" • The model's decision boundary correctly separates most fraud but some fall in the overlap region") # === FALSE POSITIVE ANALYSIS === print("\n" + "-" * 50) print("FALSE POSITIVE ANALYSIS (False Alarms)") print("-" * 50) fp_data = X_test_df[fp_mask] fp_proba = proba[fp_mask] tn_data = X_test_df[tn_mask] print(f"\nFalse Positives: {fp_mask.sum()} transactions") if fp_mask.sum() > 0: print(f"Mean P(fraud) for false alarms: {fp_proba.mean():.4f}") print(f"Max P(fraud) for false alarms: {fp_proba.max():.4f}") print(f"Min P(fraud) for false alarms: {fp_proba.min():.4f}") print(f"\nFeature comparison (False Alarms vs True Negatives):") for feat in key_features: if feat in fp_data.columns: fp_mean = fp_data[feat].mean() tn_mean = tn_data[feat].mean() if len(tn_data) > 0 else 0 print(f" {feat:25s} FP mean: {fp_mean:8.4f} | TN mean: {tn_mean:8.4f} | Δ: {fp_mean-tn_mean:+.4f}") print("\n WHY FALSE ALARMS:") print(" • These legitimate transactions exhibit anomalous patterns similar to fraud") print(" • They may involve unusual amounts, timing, or feature distributions") print(" • High-value legitimate transactions or rare purchase categories can trigger alerts") print(" • The model trades precision for recall to catch more fraud") # === CONCEPT DRIFT RISK === print("\n" + "-" * 50) print("CONCEPT DRIFT RISK ASSESSMENT") print("-" * 50) # Simulate drift by comparing early vs late transactions X_time_sorted = X_test_df.copy() X_time_sorted['proba'] = proba X_time_sorted['actual'] = y_test.values # Split by time (first half vs second half) mid = len(X_time_sorted) // 2 early = X_time_sorted.iloc[:mid] late = X_time_sorted.iloc[mid:] early_auc = np.mean(early[early['actual']==1]['proba']) if early['actual'].sum() > 0 else 0 late_auc = np.mean(late[late['actual']==1]['proba']) if late['actual'].sum() > 0 else 0 print(f"\n Early period mean P(fraud|actual fraud): {early_auc:.4f}") print(f" Late period mean P(fraud|actual fraud): {late_auc:.4f}") print(f" Drift indicator (Δ): {late_auc - early_auc:+.4f}") if abs(late_auc - early_auc) > 0.1: print("\n ⚠️ SIGNIFICANT DRIFT DETECTED") print(" Recommendation: Retrain model with recent data immediately") else: print("\n ✓ No significant drift detected in this test period") print("\n RETRAINING RECOMMENDATIONS:") print(" 1. Schedule weekly model performance monitoring") print(" 2. Trigger retraining when PR-AUC drops below 0.70") print(" 3. Use sliding window training (last 3-6 months of data)") print(" 4. Implement A/B testing for model updates") print(" 5. Monitor feature distribution shifts (PSI > 0.25 = significant)") print(" 6. Track fraud pattern evolution - new attack vectors emerge quarterly") # Error distribution plot fig, axes = plt.subplots(1, 3, figsize=(18, 5), facecolor=FIG_BG) # FN probability distribution if fn_mask.sum() > 0: axes[0].hist(fn_proba, bins=20, color='#e74c3c', alpha=0.7, edgecolor='black', linewidth=0.3) axes[0].set_title('Missed Fraud: P(Fraud) Distribution', fontsize=11, fontweight='bold') axes[0].set_xlabel('Predicted P(Fraud)') axes[0].set_ylabel('Count') axes[0].axvline(x=0.5, color='black', linestyle='--', label='Decision Boundary') axes[0].legend() # FP probability distribution if fp_mask.sum() > 0: axes[1].hist(fp_proba, bins=20, color='#f39c12', alpha=0.7, edgecolor='black', linewidth=0.3) axes[1].set_title('False Alarms: P(Fraud) Distribution', fontsize=11, fontweight='bold') axes[1].set_xlabel('Predicted P(Fraud)') axes[1].set_ylabel('Count') axes[1].axvline(x=0.5, color='black', linestyle='--', label='Decision Boundary') axes[1].legend() # Overall score distribution by class for cls, color, label in [(0, '#2ecc71', 'Legitimate'), (1, '#e74c3c', 'Fraud')]: mask = y_test.values == cls axes[2].hist(proba[mask], bins=50, color=color, alpha=0.5, label=label, density=True) axes[2].set_title('Score Distribution by Class', fontsize=11, fontweight='bold') axes[2].set_xlabel('Predicted P(Fraud)') axes[2].set_ylabel('Density') axes[2].axvline(x=0.5, color='black', linestyle='--', label='Decision Boundary') axes[2].legend() plt.tight_layout() plt.savefig(os.path.join(FIGURES_DIR, "error_analysis.png"), dpi=FIG_DPI, bbox_inches='tight', facecolor=FIG_BG) plt.savefig(os.path.join(FIGURES_DIR, "error_analysis.pdf"), bbox_inches='tight', facecolor=FIG_BG) plt.close() print("\nSaved: error_analysis.png/pdf") print("\n" + "=" * 60) print("ERROR ANALYSIS COMPLETE") print("=" * 60) def run_error_analysis(): """Run the error analysis pipeline.""" data = joblib.load(os.path.join(DATA_DIR, "processed_data.joblib")) models = joblib.load(os.path.join(MODELS_DIR, "all_models_with_ae.joblib")) analyze_errors( models['XGBoost'], data['X_test'], data['y_test'], data['feature_names'], 'XGBoost' ) if __name__ == "__main__": run_error_analysis()