| """ |
| Module 7: Production FastAPI Endpoint |
| POST /predict - Real-time fraud detection API. |
| """ |
| import os |
| import sys |
| import time |
| import numpy as np |
| import pandas as pd |
| import joblib |
| from typing import Dict, List, Optional |
| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel, Field |
| import uvicorn |
|
|
| |
# Filesystem layout, resolved relative to this file:
#   BASE_DIR   -> the parent of the directory that contains this module
#   MODELS_DIR -> <BASE_DIR>/models
#   DATA_DIR   -> <BASE_DIR>/data
BASE_DIR = os.path.dirname(os.path.split(os.path.abspath(__file__))[0])
MODELS_DIR, DATA_DIR = (
    os.path.join(BASE_DIR, subdir) for subdir in ("models", "data")
)
|
|
| |
| |
| |
|
|
class TransactionInput(BaseModel):
    """Request body for ``POST /predict``: a single transaction to score.

    ``Time`` and ``Amount`` are required. ``V1`` .. ``V28`` are optional and
    default to ``0.0`` when omitted from the request.
    """

    Time: float = Field(..., description="Seconds elapsed since first transaction")
    V1: float = 0.0
    V2: float = 0.0
    V3: float = 0.0
    V4: float = 0.0
    V5: float = 0.0
    V6: float = 0.0
    V7: float = 0.0
    V8: float = 0.0
    V9: float = 0.0
    V10: float = 0.0
    V11: float = 0.0
    V12: float = 0.0
    V13: float = 0.0
    V14: float = 0.0
    V15: float = 0.0
    V16: float = 0.0
    V17: float = 0.0
    V18: float = 0.0
    V19: float = 0.0
    V20: float = 0.0
    V21: float = 0.0
    V22: float = 0.0
    V23: float = 0.0
    V24: float = 0.0
    V25: float = 0.0
    V26: float = 0.0
    V27: float = 0.0
    V28: float = 0.0
    Amount: float = Field(..., description="Transaction amount in USD")

    # Pydantic v2 style configuration. The nested ``class Config`` form is
    # deprecated in v2 (this file already relies on v2 via ``model_dump``);
    # a plain dict is accepted here, so no extra import is needed.
    # The example below is what shows up in the generated OpenAPI schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "Time": 406.0,
                "V1": -2.312, "V2": 1.951, "V3": -1.609, "V4": 3.997,
                "V5": -0.522, "V6": -1.426, "V7": -2.537, "V8": 1.391,
                "V9": -2.770, "V10": -2.772, "V11": 3.202, "V12": -2.899,
                "V13": -0.595, "V14": -4.289, "V15": 0.389, "V16": -1.140,
                "V17": -2.830, "V18": -0.016, "V19": 0.416, "V20": 0.126,
                "V21": 0.517, "V22": -0.035, "V23": -0.465, "V24": -0.018,
                "V25": -0.010, "V26": -0.002, "V27": -0.154, "V28": -0.048,
                "Amount": 239.93
            }
        }
    }
|
|
|
|
class RiskFactor(BaseModel):
    """One entry of ``PredictionOutput.top_risk_factors``.

    Mirrors the ``{'feature', 'importance', 'value'}`` dicts built by
    ``get_risk_factors``; Pydantic converts those dicts into this model.
    """

    feature: str
    importance: float
    value: float


class PredictionOutput(BaseModel):
    """Response body returned by ``POST /predict``."""

    # ``model_used`` begins with "model_", a prefix Pydantic v2 treats as a
    # protected namespace; opt out so declaring the field does not warn.
    model_config = {"protected_namespaces": ()}

    transaction_id: str
    fraud_probability: float
    decision: str
    risk_level: str
    # Previously ``List[Dict[str, float]]``: that rejected the string-valued
    # 'feature' key, so building the response failed validation whenever at
    # least one risk factor was reported. The serialized JSON shape is the
    # same list of {feature, importance, value} objects.
    top_risk_factors: List[RiskFactor]
    response_time_ms: float
    threshold_used: float
    model_used: str
|
|
|
|
class HealthResponse(BaseModel):
    """Response body returned by ``GET /health``."""

    # ``model_loaded`` begins with "model_", a prefix Pydantic v2 treats as a
    # protected namespace; opt out so declaring the field does not warn.
    model_config = {"protected_namespaces": ()}

    status: str
    model_loaded: bool
    version: str
|
|
|
|
| |
| |
| |
|
|
# The ASGI application object; route handlers below register themselves on it.
app = FastAPI(
    title="Fraud Detection API",
    description="Real-time credit card fraud detection using XGBoost",
    version="1.0.0",
)

# Module-level cache filled by ``load_model`` and read by the request
# handlers (keys written there: 'model', 'scaler', 'feature_names',
# 'threshold', 'amount_mean', 'amount_median', 'amount_std').
model_cache = dict()
|
|
|
|
def load_model():
    """Load the model artifacts into ``model_cache`` (no-op if already loaded).

    Reads, in order:
      * ``MODELS_DIR/all_models.joblib``   -> the ``'XGBoost'`` entry
      * ``MODELS_DIR/scaler.joblib``       -> the feature scaler
      * ``DATA_DIR/processed_data.joblib`` -> its ``'feature_names'`` entry
      * ``DATA_DIR/creditcard.csv``        -> mean / median / std of ``Amount``

    Everything is loaded into a local dict first and published to
    ``model_cache`` in a single ``update`` at the end. If any file is missing
    or malformed the exception propagates and the cache is left untouched, so
    a later call retries instead of seeing a half-filled cache.

    Raises:
        Whatever ``joblib.load`` / ``pandas.read_csv`` raise for an unreadable
        file, or ``KeyError`` if an expected entry is absent from an artifact.
    """
    if 'model' in model_cache:
        return

    # NOTE(security): joblib.load unpickles its input, which can execute
    # arbitrary code. These paths must only ever point at trusted artifacts.
    models = joblib.load(os.path.join(MODELS_DIR, "all_models.joblib"))
    scaler = joblib.load(os.path.join(MODELS_DIR, "scaler.joblib"))
    processed = joblib.load(os.path.join(DATA_DIR, "processed_data.joblib"))

    # Only the Amount column is needed for the statistics below, so skip
    # parsing the remaining columns of the CSV.
    amounts = pd.read_csv(
        os.path.join(DATA_DIR, "creditcard.csv"),
        usecols=['Amount'],
    )['Amount']

    loaded = {
        'model': models['XGBoost'],
        'scaler': scaler,
        'feature_names': processed['feature_names'],
        # Decision cut-off applied to the predicted fraud probability.
        'threshold': 0.55,
        'amount_mean': amounts.mean(),
        'amount_median': amounts.median(),
        'amount_std': amounts.std(),
    }

    # Publish all keys together so 'model' never exists without its siblings.
    model_cache.update(loaded)
|
|
|
|
def engineer_single_transaction(txn: TransactionInput) -> pd.DataFrame:
    """Build the one-row feature frame for a single transaction.

    Starts from the raw request fields and adds the derived features below,
    then returns exactly the columns listed in ``model_cache['feature_names']``
    in that order. Any listed column that was not produced here is filled
    with ``0.0``; any produced column that is not listed is dropped.

    Args:
        txn: The validated request payload.

    Returns:
        A single-row ``DataFrame`` with columns ``model_cache['feature_names']``.

    Raises:
        KeyError: If ``model_cache`` has not been populated yet.
    """
    row = txn.model_dump()
    amount = row['Amount']
    amount_mean = model_cache['amount_mean']

    # Cyclical encoding of the hour of day derived from the Time offset.
    # The angle is shared by the sine and cosine, so compute it once.
    hour_angle = 2 * np.pi * ((row['Time'] / 3600) % 24) / 24
    row['Hour_sin'] = np.sin(hour_angle)
    row['Hour_cos'] = np.cos(hour_angle)

    # NOTE(review): constant values - presumably placeholders because a lone
    # request carries no neighbouring transactions; confirm against training.
    row['Time_diff'] = 0.0
    row['Transaction_velocity'] = 1.0

    # Amount-derived features, relative to the dataset-wide statistics.
    row['Amount_log'] = np.log1p(amount)
    row['Amount_deviation_mean'] = amount - amount_mean
    row['Amount_deviation_median'] = amount - model_cache['amount_median']
    # The small epsilon guards against division by a zero standard deviation.
    row['Amount_zscore'] = (amount - amount_mean) / (model_cache['amount_std'] + 1e-8)

    # Pairwise products of selected V-components.
    row['V14_V17_interaction'] = row['V14'] * row['V17']
    row['V12_V14_interaction'] = row['V12'] * row['V14']
    row['V10_V14_interaction'] = row['V10'] * row['V14']

    # Euclidean norm over V1..V28.
    pca_features = [f'V{i}' for i in range(1, 29)]
    row['PCA_magnitude'] = np.sqrt(sum(row[f] ** 2 for f in pca_features))

    # reindex selects/orders the expected columns and creates any missing
    # ones filled with 0.0 in a single step.
    return pd.DataFrame([row]).reindex(
        columns=model_cache['feature_names'],
        fill_value=0.0,
    )
|
|
|
|
def get_risk_factors(features_df, feature_names, top_n=10, min_importance=0.01):
    """List the most important model features with this transaction's values.

    The ranking uses ``model_cache['model'].feature_importances_``; only the
    reported ``value`` depends on the transaction.

    Args:
        features_df: Feature frame; only its first row is read.
        feature_names: Feature names, positionally aligned with the model's
            ``feature_importances_``.
        top_n: Maximum number of entries to return (default 10).
        min_importance: Features whose importance is not strictly greater
            than this are left out (default 0.01).

    Returns:
        A list of ``{'feature', 'importance', 'value'}`` dicts, importance and
        value rounded to 4 decimals, sorted by importance descending.
    """
    importances = model_cache['model'].feature_importances_
    # Pull the row out once rather than rebuilding it for every feature.
    first_row = features_df.iloc[0]

    risk_factors = []
    for i, name in enumerate(feature_names):
        imp = float(importances[i])
        if imp > min_importance:
            risk_factors.append({
                'feature': name,
                'importance': round(imp, 4),
                'value': round(float(first_row[name]), 4),
            })

    risk_factors.sort(key=lambda x: x['importance'], reverse=True)
    return risk_factors[:top_n]
|
|
|
|
@app.on_event("startup")
async def startup() -> None:
    """Startup hook: fill ``model_cache`` before requests are served."""
    load_model()
|
|
|
|
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report service status and whether a model is present in the cache.

    ``status`` is always ``"healthy"``; ``model_loaded`` reflects whether
    ``model_cache`` currently holds a ``'model'`` entry.
    """
    is_loaded = 'model' in model_cache
    return HealthResponse(status="healthy", model_loaded=is_loaded, version="1.0.0")
|
|
|
|
def _classify_risk(fraud_prob, threshold):
    """Map a fraud probability to a ``(decision, risk_level)`` pair."""
    if fraud_prob >= threshold:
        if fraud_prob >= 0.9:
            risk_level = "CRITICAL"
        elif fraud_prob >= 0.7:
            risk_level = "HIGH"
        else:
            risk_level = "MEDIUM"
        return "BLOCKED - SUSPECTED FRAUD", risk_level

    return "APPROVED", ("LOW" if fraud_prob >= 0.3 else "MINIMAL")


@app.post("/predict", response_model=PredictionOutput)
async def predict(transaction: TransactionInput):
    """Predict fraud probability for a transaction.

    Steps: engineer features, scale them, take the positive-class probability
    from the cached model, compare it with the cached threshold to derive the
    decision and risk level, and attach the top risk factors.

    Args:
        transaction: The validated request payload.

    Returns:
        A ``PredictionOutput`` describing the decision.

    Raises:
        HTTPException: Status 500 if anything fails during feature
            engineering, scoring, or response construction.
    """
    # perf_counter is monotonic, so the measured latency cannot be skewed or
    # turn negative if the wall clock is adjusted mid-request.
    started = time.perf_counter()

    # Lazy fallback in case the cache was not filled at startup. A failure
    # here is intentionally left unhandled, as before.
    if 'model' not in model_cache:
        load_model()

    try:
        features_df = engineer_single_transaction(transaction)

        # Re-wrap the scaler output in a DataFrame so column names are kept.
        features_scaled = pd.DataFrame(
            model_cache['scaler'].transform(features_df),
            columns=features_df.columns,
        )

        # Column 1 of predict_proba is taken as the fraud probability.
        fraud_prob = float(model_cache['model'].predict_proba(features_scaled)[0, 1])
        threshold = model_cache['threshold']

        decision, risk_level = _classify_risk(fraud_prob, threshold)

        # Note: the reported feature values are the *scaled* ones.
        risk_factors = get_risk_factors(features_scaled, model_cache['feature_names'])

        response_time = (time.perf_counter() - started) * 1000

        # A millisecond timestamp alone repeats for requests handled within
        # the same millisecond (e.g. across workers); the random suffix keeps
        # ids distinct while preserving the "TXN-<epoch ms>" prefix.
        transaction_id = f"TXN-{int(time.time() * 1000)}-{os.urandom(4).hex()}"

        return PredictionOutput(
            transaction_id=transaction_id,
            fraud_probability=round(fraud_prob, 6),
            decision=decision,
            risk_level=risk_level,
            top_risk_factors=risk_factors,
            response_time_ms=round(response_time, 2),
            threshold_used=threshold,
            model_used="XGBoost (Optimized)"
        )

    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise HTTPException(status_code=500, detail=f"Prediction error: {str(e)}") from e
|
|
|
|
@app.get("/")
async def root():
    """Return a static description of the service and its endpoints."""
    endpoints = {
        "/predict": "POST - Predict fraud probability",
        "/health": "GET - Health check",
        "/docs": "GET - API documentation",
    }
    return {
        "service": "Fraud Detection API",
        "version": "1.0.0",
        "endpoints": endpoints,
    }
|
|
|
|
if __name__ == "__main__":
    # Run the API directly with uvicorn. Host and port default to the
    # previous hard-coded values (all interfaces, port 8000) and can be
    # overridden through the environment without editing this file.
    uvicorn.run(
        app,
        host=os.environ.get("FRAUD_API_HOST", "0.0.0.0"),
        port=int(os.environ.get("FRAUD_API_PORT", "8000")),
    )
|
|