from __future__ import annotations

import numpy as np
import pandas as pd

from src.core.config_loader import Config

# Sender KYC level -> risk contribution (less verification => larger value).
KYC_MAP = {
    "low": 1.0,
    "medium": 0.6,
    "full": 0.2,
}

# Sender risk profile -> risk contribution.
RISK_PROFILE_MAP = {
    "low": 0.2,
    "medium": 0.5,
    "high": 1.0,
}

_SECONDS_PER_DAY = 86400


def _compute_features(df: pd.DataFrame, users: pd.DataFrame) -> dict[str, np.ndarray]:
    """Compute per-transaction risk features, aligned row-by-row with ``df``.

    ``df`` is not modified. Order-dependent features (same-day running total,
    gap since the sender's previous transaction, sender->receiver repeat count)
    are computed in chronological order per sender. The result therefore does
    not depend on how the rows of ``df`` are ordered; equal timestamps keep
    their input order.

    Args:
        df: Transactions; reads ``sender_id``, ``receiver_id``, ``timestamp``
            and ``amount``. ``timestamp`` is treated as seconds: it is divided
            by 86400 to get a day index and by 3600 to get an hour of day.
        users: User table; reads ``user_id``, ``kyc_level`` and
            ``risk_profile``.

    Returns:
        Mapping of feature name to a 1-D array of length ``len(df)``.

    Raises:
        KeyError: if a ``sender_id`` in ``df`` has no row in ``users``.
    """
    sender_features = users.set_index("user_id").loc[df["sender_id"]]

    # Positional working copy: its RangeIndex equals the row position in df.
    # Realigning after sorting is then a plain sort_index(), even when df's
    # own index is unsorted or contains duplicate labels.
    work = pd.DataFrame(
        {
            "sender_id": df["sender_id"].to_numpy(),
            "receiver_id": df["receiver_id"].to_numpy(),
            "timestamp": df["timestamp"].to_numpy(),
            "amount": df["amount"].to_numpy(),
            "row": np.arange(len(df)),
        }
    )
    work["day"] = work["timestamp"] // _SECONDS_PER_DAY

    max_amount = work["amount"].max()
    if pd.isna(max_amount) or max_amount == 0:
        # Empty batch or all-zero amounts: avoid NaN/inf ratios.
        max_amount = 1.0

    # Amount relative to the largest amount in the batch.
    amount_ratio = work["amount"].to_numpy() / max_amount

    # Chronological order within each sender; "row" breaks timestamp ties
    # by input order so the ordering is deterministic.
    chrono = work.sort_values(["sender_id", "timestamp", "row"])

    # Running total of the sender's spend on the same day, relative to the batch max.
    daily_cumsum = chrono.groupby(["sender_id", "day"])["amount"].cumsum()
    daily_ratio = daily_cumsum.sort_index().to_numpy() / max_amount

    # Seconds since the same sender's previous transaction; NaN for the first one.
    time_diff = chrono.groupby("sender_id")["timestamp"].diff().sort_index()

    # Velocity: 1 for simultaneous transactions, decaying as the gap grows,
    # and 0 when the sender has no earlier transaction.
    velocity = (1 / (time_diff + 1)).fillna(0).to_numpy()

    # Retry signal: same sender transacted less than 60 s earlier.
    # NaN < 60 is False, so a sender's first transaction is never flagged.
    retry_flag = (time_diff < 60).astype(float).to_numpy()

    # Time anomaly: hour of day (in the timestamp's own reference clock --
    # presumably UTC epoch seconds, confirm) before 06:00 or after 23:00.
    hours = (work["timestamp"] % _SECONDS_PER_DAY) / 3600
    time_anomaly = ((hours < 6) | (hours > 23)).astype(float).to_numpy()

    # Graph anomaly: 1 for the first sender->receiver transfer, 1/(k+1) for the k-th repeat.
    pair_counts = chrono.groupby(["sender_id", "receiver_id"]).cumcount()
    graph_anomaly = (1 / (pair_counts + 1)).sort_index().to_numpy()

    # Sender-level attributes; labels missing from the maps become NaN.
    kyc = sender_features["kyc_level"].map(KYC_MAP).to_numpy()
    user_risk = sender_features["risk_profile"].map(RISK_PROFILE_MAP).to_numpy()

    return {
        "amount_ratio": amount_ratio,
        "daily_ratio": daily_ratio,
        "velocity": velocity,
        "time_anomaly": time_anomaly,
        "graph_anomaly": graph_anomaly,
        "retry": retry_flag,
        "kyc": kyc,
        "user_risk": user_risk,
    }


def _compute_risk_score(features: dict, weights: dict) -> np.ndarray:
    """Return the weighted sum of the features named in ``weights``.

    Features without a weight contribute nothing, and weights naming an
    unknown feature are ignored. A NaN in any weighted feature makes that
    row's score NaN.

    Raises:
        ValueError: if ``features`` is empty (the row count is unknown).
    """
    if not features:
        raise ValueError("features must contain at least one feature array")
    n_rows = len(next(iter(features.values())))
    score = np.zeros(n_rows)
    for name, values in features.items():
        if name in weights:
            score += weights[name] * values
    return score


def _decision(
    score: np.ndarray,
    *,
    temperature: float = 5.0,
    threshold: float = 0.7,
    rng: np.random.Generator | None = None,
) -> tuple[np.ndarray, np.ndarray]:
    """Turn raw scores into failure probabilities and sampled failures.

    Scores are standardised across the batch, divided by ``temperature``,
    clipped to [-5, 5] and passed through a logistic function to get
    ``prob``. Each row then fails with probability
    ``0.4 * prob + (0.3 if prob > threshold else 0)``.

    Standardisation ignores NaN scores, so one unscorable row does not turn
    the whole batch's probabilities into NaN. Rows whose own score is NaN get
    ``prob`` NaN and never fail.

    Args:
        score: 1-D raw risk scores.
        temperature: Divisor applied after standardisation.
        threshold: Probability above which the extra 0.3 failure chance applies.
        rng: NumPy ``Generator``; ``None`` uses the global ``np.random`` state.

    Returns:
        ``(failed, prob)``: an int8 0/1 array and the float probabilities.
    """
    score = np.asarray(score, dtype=float)
    if score.size == 0:
        return np.zeros(0, dtype=np.int8), np.zeros(0, dtype=float)

    z = (score - np.nanmean(score)) / (np.nanstd(score) + 1e-6)
    z = np.clip(z / temperature, -5, 5)
    prob = 1 / (1 + np.exp(-z))

    # np.random.random draws from the same global stream as np.random.rand.
    draws = (np.random if rng is None else rng).random(len(prob))
    failed = draws < (prob * 0.4 + (prob > threshold) * 0.3)
    return failed.astype(np.int8), prob


def _simulate_retries(
    df: pd.DataFrame,
    failed_mask: np.ndarray,
    *,
    retry_rate: float = 0.25,
    rng: np.random.Generator | None = None,
) -> pd.DataFrame:
    """Sample retry attempts for failed transactions.

    Each failed row is retried with probability ``retry_rate``. A retry is a
    copy of the failed row with ``amount`` scaled by U(0.7, 0.95),
    ``timestamp`` pushed later by an Exponential(scale=30) delay, and
    ``is_retry`` set to 1.

    NOTE(review): retries copy ``risk_score``/``fail_prob``/``failed`` from the
    original row (so ``failed`` stays 1); confirm this is intended.

    Args:
        df: Scored transactions.
        failed_mask: Boolean (or 0/1) array aligned positionally with ``df``.
        retry_rate: Probability that a failed transaction is retried.
        rng: NumPy ``Generator``; ``None`` uses the global ``np.random`` state.

    Returns:
        The retry rows. When empty, it still has ``df``'s columns and dtypes.
    """
    gen = np.random if rng is None else rng
    failed_txns = df[np.asarray(failed_mask, dtype=bool)]
    if failed_txns.empty:
        # Zero-row slice keeps column dtypes (an empty DataFrame(columns=...)
        # would be all-object and can upcast columns on concat).
        return df.iloc[0:0].copy()

    retry_df = failed_txns[gen.random(len(failed_txns)) < retry_rate].copy()
    retry_df["amount"] *= gen.uniform(0.7, 0.95, size=len(retry_df))
    retry_df["timestamp"] += gen.exponential(30, size=len(retry_df))
    retry_df["is_retry"] = 1
    return retry_df


def apply_risk_engine(
    df: pd.DataFrame,
    users: pd.DataFrame,
    config: Config,
    *,
    rng: np.random.Generator | None = None,
) -> pd.DataFrame:
    """Score transactions, sample failures, and append simulated retries.

    Args:
        df: Transactions (not modified); needs ``sender_id``, ``receiver_id``,
            ``timestamp`` and ``amount``.
        users: User table with ``user_id``, ``kyc_level`` and ``risk_profile``.
        config: Reads ``config.risk_model.weights``, presumably a mapping of
            feature name to weight (confirm against ``Config``).
        rng: Optional NumPy ``Generator`` for reproducible runs. ``None`` uses
            the global ``np.random`` state, matching the previous behaviour.

    Returns:
        A new frame with ``is_retry``, ``risk_score``, ``fail_prob`` and
        ``failed`` columns added, plus any simulated retry rows. It is stably
        sorted by ``timestamp`` and has a fresh RangeIndex.
    """
    df = df.copy()
    df["is_retry"] = 0

    features = _compute_features(df, users)
    score = _compute_risk_score(features, config.risk_model.weights)
    failed, prob = _decision(score, rng=rng)

    df["risk_score"] = score.astype(np.float32)
    df["fail_prob"] = prob.astype(np.float32)
    df["failed"] = failed

    retry_df = _simulate_retries(df, failed.astype(bool), rng=rng)
    # Skip an empty retry frame so concat never has to infer dtypes from it.
    frames = [df] if retry_df.empty else [df, retry_df]
    final_df = pd.concat(frames, ignore_index=True)
    # mergesort is stable: rows with equal timestamps keep their relative order.
    return final_df.sort_values("timestamp", kind="mergesort").reset_index(drop=True)