| """ |
| Data Drift Detection using Scipy KS Test. |
| Detects distribution shifts between baseline and new data. |
| """ |
|
|
| import pickle |
| import json |
| import requests |
| import numpy as np |
| import pandas as pd |
| from pathlib import Path |
| from datetime import datetime |
| from scipy.stats import ks_2samp |
| from typing import Dict, Tuple |
|
|
| |
# Paths resolved relative to this file so the script works from any CWD.
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
BASELINE_DIR = Path(__file__).parent.parent / "baseline"
REPORTS_DIR = Path(__file__).parent.parent / "reports"
# NOTE: import-time side effect — ensures the reports directory exists
# before any report is written.
REPORTS_DIR.mkdir(parents=True, exist_ok=True)


# Pushgateway endpoint for short-lived job metrics (see push_to_prometheus).
PUSHGATEWAY_URL = "http://localhost:9091"
# Base significance level for the KS test (Bonferroni-adjusted per feature
# inside run_drift_detection).
P_VALUE_THRESHOLD = 0.05
|
|
|
|
def load_baseline() -> np.ndarray:
    """Load the reference/baseline feature matrix from disk.

    Returns:
        The array previously pickled by ``prepare_baseline.py``.

    Raises:
        FileNotFoundError: If the baseline pickle has not been created yet.
    """
    baseline_path = BASELINE_DIR / "reference_data.pkl"

    if not baseline_path.exists():
        raise FileNotFoundError(
            f"Baseline data not found at {baseline_path}\n"
            f"Run `python prepare_baseline.py` first!"
        )

    # NOTE: pickle is only safe here because the file is produced locally
    # by prepare_baseline.py — never load untrusted pickles.
    with baseline_path.open('rb') as fh:
        reference = pickle.load(fh)

    print(f"Loaded baseline data: {reference.shape}")
    return reference
|
|
|
|
def load_new_data() -> np.ndarray:
    """
    Load new/production data to check for drift.

    In production, this would fetch from:
    - Database
    - S3 bucket
    - API logs
    - Data lake

    Falls back to a noisy copy of the baseline when no test file is found.

    Returns:
        Feature matrix of up to 500 recent observations.
    """

    data_path = PROJECT_ROOT / "data" / "test.csv"
    if data_path.exists():
        df = pd.read_csv(data_path)
        # Everything that is not a known metadata column is treated as a feature.
        feature_columns = [col for col in df.columns if col not in ['label', 'id', 'timestamp']]
        X_new = df[feature_columns].values[:500]
        print(f"Loaded new data from file: {X_new.shape}")
        return X_new

    print("Simulating new data (no test file found)")
    X_baseline = load_baseline()

    # Perturb up to 500 baseline rows with small Gaussian noise.
    # BUGFIX: size the noise from the actual slice rather than a hard-coded
    # (500, n_features) shape — the original raised a broadcasting error
    # whenever the baseline held fewer than 500 rows.
    sample = X_baseline[:500]
    X_new = sample + np.random.normal(0, 0.1, sample.shape)
    return X_new
|
|
|
|
def run_drift_detection(X_baseline: np.ndarray, X_new: np.ndarray, threshold=None) -> Dict:
    """
    Run Kolmogorov-Smirnov drift detection using scipy.

    A two-sample KS test is run per feature; the significance level is
    Bonferroni-corrected (threshold / num_features) so the family-wise
    false-positive rate stays at `threshold` across all features.

    Args:
        X_baseline: Reference data, shape (n_samples, n_features)
        X_new: New data to check, shape (m_samples, n_features)
        threshold: Base significance level before Bonferroni correction.
            Defaults to the module-level P_VALUE_THRESHOLD.

    Returns:
        Drift detection results (JSON-serializable dict)

    Raises:
        ValueError: If the two arrays disagree on the number of features.
    """
    if threshold is None:
        threshold = P_VALUE_THRESHOLD

    # The per-column loop below indexes both arrays with the same i, so the
    # feature counts must match; fail loudly instead of testing garbage.
    if X_baseline.shape[1] != X_new.shape[1]:
        raise ValueError(
            f"Feature count mismatch: baseline has {X_baseline.shape[1]}, "
            f"new data has {X_new.shape[1]}"
        )

    print("\n" + "=" * 60)
    print("Running Drift Detection (Kolmogorov-Smirnov Test)")
    print("=" * 60)

    # One KS test per feature column.
    p_values = []
    distances = []

    for i in range(X_baseline.shape[1]):
        statistic, p_value = ks_2samp(X_baseline[:, i], X_new[:, i])
        p_values.append(p_value)
        distances.append(statistic)

    # The most significant (smallest p) feature drives the overall verdict.
    min_p_value = np.min(p_values)
    max_distance = np.max(distances)

    # Bonferroni correction for testing many features at once.
    adjusted_threshold = threshold / X_baseline.shape[1]
    drift_detected = min_p_value < adjusted_threshold

    results = {
        "timestamp": datetime.now().isoformat(),
        "drift_detected": int(drift_detected),
        "p_value": float(min_p_value),
        "threshold": adjusted_threshold,
        "distance": float(max_distance),
        "baseline_samples": X_baseline.shape[0],
        "new_samples": X_new.shape[0],
        "num_features": X_baseline.shape[1]
    }

    print(f"\nResults:")
    print(f"  Drift Detected: {'YES' if results['drift_detected'] else 'NO'}")
    print(f"  P-Value: {results['p_value']:.6f} (adjusted threshold: {adjusted_threshold:.6f})")
    print(f"  Distance: {results['distance']:.6f}")
    print(f"  Baseline: {X_baseline.shape[0]} samples")
    print(f"  New Data: {X_new.shape[0]} samples")

    return results
|
|
|
|
def save_report(results: Dict):
    """Write the drift results to a timestamped JSON file under REPORTS_DIR."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = REPORTS_DIR / f"drift_report_{stamp}.json"

    report_path.write_text(json.dumps(results, indent=2))

    print(f"\nReport saved to: {report_path}")
|
|
|
|
def push_to_prometheus(results: Dict):
    """
    Push drift metrics to Prometheus via Pushgateway.

    This allows Prometheus to scrape short-lived job metrics.
    Failures are reported but never raised — metric export is best-effort.
    """
    # Prometheus text exposition format; interior lines must stay at column 0.
    metrics = f"""# TYPE drift_detected gauge
# HELP drift_detected Whether data drift was detected (1=yes, 0=no)
drift_detected {results['drift_detected']}

# TYPE drift_p_value gauge
# HELP drift_p_value P-value from drift detection test
drift_p_value {results['p_value']}

# TYPE drift_distance gauge
# HELP drift_distance Statistical distance between distributions
drift_distance {results['distance']}

# TYPE drift_check_timestamp gauge
# HELP drift_check_timestamp Unix timestamp of last drift check
drift_check_timestamp {datetime.now().timestamp()}
"""

    endpoint = f"{PUSHGATEWAY_URL}/metrics/job/drift_detection/instance/hopcroft"
    try:
        response = requests.post(
            endpoint,
            data=metrics,
            headers={'Content-Type': 'text/plain'}
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to push to Pushgateway: {e}")
        print(f"  Make sure Pushgateway is running: docker compose ps pushgateway")
    else:
        print(f"Metrics pushed to Pushgateway at {PUSHGATEWAY_URL}")
|
|
|
|
def main():
    """Run the full drift-check pipeline.

    Returns:
        Process exit code: 1 if drift was detected or an error occurred,
        0 otherwise (usable directly as a CI gate).
    """
    print("\n" + "=" * 60)
    print("Hopcroft Data Drift Detection")
    print("=" * 60)

    try:
        X_baseline = load_baseline()
        X_new = load_new_data()

        results = run_drift_detection(X_baseline, X_new)

        save_report(results)
        push_to_prometheus(results)

        print("\n" + "=" * 60)
        print("Drift Detection Complete!")
        print("=" * 60)

        if results['drift_detected']:
            print("\nWARNING: Data drift detected!")
            # BUGFIX: report the Bonferroni-adjusted threshold the decision
            # actually used (results['threshold']), not the raw
            # P_VALUE_THRESHOLD — the original message misstated the
            # comparison whenever there was more than one feature.
            print(f"  P-value: {results['p_value']:.6f} < {results['threshold']:.6f}")
            return 1
        else:
            print("\nNo significant drift detected")
            return 0

    except Exception as e:
        # Top-level boundary: report the failure and signal it to the shell.
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()
        return 1
|
|
|
|
if __name__ == "__main__":
    # raise SystemExit instead of exit(): the bare exit() builtin is injected
    # by the `site` module for interactive use and may be absent (python -S).
    raise SystemExit(main())