""" Module 7: Production FastAPI Endpoint POST /predict - Real-time fraud detection API. """ import os import sys import time import numpy as np import pandas as pd import joblib from typing import Dict, List, Optional from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field import uvicorn # Paths BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) MODELS_DIR = os.path.join(BASE_DIR, "models") DATA_DIR = os.path.join(BASE_DIR, "data") # ============================================================ # Pydantic Models # ============================================================ class TransactionInput(BaseModel): """Input transaction for fraud prediction.""" Time: float = Field(..., description="Seconds elapsed since first transaction") V1: float = 0.0 V2: float = 0.0 V3: float = 0.0 V4: float = 0.0 V5: float = 0.0 V6: float = 0.0 V7: float = 0.0 V8: float = 0.0 V9: float = 0.0 V10: float = 0.0 V11: float = 0.0 V12: float = 0.0 V13: float = 0.0 V14: float = 0.0 V15: float = 0.0 V16: float = 0.0 V17: float = 0.0 V18: float = 0.0 V19: float = 0.0 V20: float = 0.0 V21: float = 0.0 V22: float = 0.0 V23: float = 0.0 V24: float = 0.0 V25: float = 0.0 V26: float = 0.0 V27: float = 0.0 V28: float = 0.0 Amount: float = Field(..., description="Transaction amount in USD") class Config: json_schema_extra = { "example": { "Time": 406.0, "V1": -2.312, "V2": 1.951, "V3": -1.609, "V4": 3.997, "V5": -0.522, "V6": -1.426, "V7": -2.537, "V8": 1.391, "V9": -2.770, "V10": -2.772, "V11": 3.202, "V12": -2.899, "V13": -0.595, "V14": -4.289, "V15": 0.389, "V16": -1.140, "V17": -2.830, "V18": -0.016, "V19": 0.416, "V20": 0.126, "V21": 0.517, "V22": -0.035, "V23": -0.465, "V24": -0.018, "V25": -0.010, "V26": -0.002, "V27": -0.154, "V28": -0.048, "Amount": 239.93 } } class PredictionOutput(BaseModel): """Output prediction result.""" transaction_id: str fraud_probability: float decision: str risk_level: str top_risk_factors: List[Dict[str, float]] response_time_ms: float threshold_used: float model_used: str class HealthResponse(BaseModel): status: str model_loaded: bool version: str # ============================================================ # App # ============================================================ app = FastAPI( title="Fraud Detection API", description="Real-time credit card fraud detection using XGBoost", version="1.0.0" ) # Global model storage model_cache = {} def load_model(): """Load model and scaler at startup.""" if 'model' not in model_cache: models = joblib.load(os.path.join(MODELS_DIR, "all_models.joblib")) model_cache['model'] = models['XGBoost'] model_cache['scaler'] = joblib.load(os.path.join(MODELS_DIR, "scaler.joblib")) # Load feature names data = joblib.load(os.path.join(DATA_DIR, "processed_data.joblib")) model_cache['feature_names'] = data['feature_names'] model_cache['threshold'] = 0.55 # Optimal threshold from analysis # Precompute global stats for feature engineering df = pd.read_csv(os.path.join(DATA_DIR, "creditcard.csv")) model_cache['amount_mean'] = df['Amount'].mean() model_cache['amount_median'] = df['Amount'].median() model_cache['amount_std'] = df['Amount'].std() def engineer_single_transaction(txn: TransactionInput) -> pd.DataFrame: """Engineer features for a single transaction.""" row = txn.model_dump() # Feature engineering (matching preprocessing.py) row['Hour_sin'] = np.sin(2 * np.pi * ((row['Time'] / 3600) % 24) / 24) row['Hour_cos'] = np.cos(2 * np.pi * ((row['Time'] / 3600) % 24) / 24) row['Time_diff'] = 0.0 # No previous transaction for single prediction row['Amount_log'] = np.log1p(row['Amount']) row['Amount_deviation_mean'] = row['Amount'] - model_cache['amount_mean'] row['Amount_deviation_median'] = row['Amount'] - model_cache['amount_median'] row['Transaction_velocity'] = 1.0 # Default for single transaction row['Amount_zscore'] = (row['Amount'] - model_cache['amount_mean']) / (model_cache['amount_std'] + 1e-8) row['V14_V17_interaction'] = row['V14'] * row['V17'] row['V12_V14_interaction'] = row['V12'] * row['V14'] row['V10_V14_interaction'] = row['V10'] * row['V14'] pca_features = [f'V{i}' for i in range(1, 29)] row['PCA_magnitude'] = np.sqrt(sum(row[f]**2 for f in pca_features)) # Create DataFrame in correct column order df = pd.DataFrame([row]) feature_names = model_cache['feature_names'] # Ensure all columns present for col in feature_names: if col not in df.columns: df[col] = 0.0 df = df[feature_names] return df def get_risk_factors(features_df, feature_names): """Get top risk factors using feature importance.""" model = model_cache['model'] importances = model.feature_importances_ # Get feature values and their importance risk_factors = [] for i, name in enumerate(feature_names): val = float(features_df.iloc[0][name]) imp = float(importances[i]) if imp > 0.01: # Only significant features risk_factors.append({'feature': name, 'importance': round(imp, 4), 'value': round(val, 4)}) risk_factors.sort(key=lambda x: x['importance'], reverse=True) return risk_factors[:10] @app.on_event("startup") async def startup(): load_model() @app.get("/health", response_model=HealthResponse) async def health_check(): return HealthResponse( status="healthy", model_loaded='model' in model_cache, version="1.0.0" ) @app.post("/predict", response_model=PredictionOutput) async def predict(transaction: TransactionInput): """Predict fraud probability for a transaction.""" start_time = time.time() if 'model' not in model_cache: load_model() try: # Feature engineering features_df = engineer_single_transaction(transaction) # Scale features features_scaled = pd.DataFrame( model_cache['scaler'].transform(features_df), columns=features_df.columns ) # Predict fraud_prob = float(model_cache['model'].predict_proba(features_scaled)[0, 1]) threshold = model_cache['threshold'] # Decision if fraud_prob >= threshold: decision = "BLOCKED - SUSPECTED FRAUD" if fraud_prob >= 0.9: risk_level = "CRITICAL" elif fraud_prob >= 0.7: risk_level = "HIGH" else: risk_level = "MEDIUM" else: decision = "APPROVED" if fraud_prob >= 0.3: risk_level = "LOW" else: risk_level = "MINIMAL" # Get risk factors risk_factors = get_risk_factors(features_scaled, model_cache['feature_names']) response_time = (time.time() - start_time) * 1000 # ms return PredictionOutput( transaction_id=f"TXN-{int(time.time()*1000)}", fraud_probability=round(fraud_prob, 6), decision=decision, risk_level=risk_level, top_risk_factors=risk_factors, response_time_ms=round(response_time, 2), threshold_used=threshold, model_used="XGBoost (Optimized)" ) except Exception as e: raise HTTPException(status_code=500, detail=f"Prediction error: {str(e)}") @app.get("/") async def root(): return { "service": "Fraud Detection API", "version": "1.0.0", "endpoints": { "/predict": "POST - Predict fraud probability", "/health": "GET - Health check", "/docs": "GET - API documentation" } } if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)