| from __future__ import annotations |
|
|
| import numpy as np |
| import pandas as pd |
| from src.core.config_loader import Config |
|
|
|
|
# Value of the ``kyc`` risk feature for each user ``kyc_level``.
# "low" maps to the largest value and "full" to the smallest; a level not
# listed here maps to NaN when looked up with ``Series.map``.
KYC_MAP: dict[str, float] = dict(
    low=1.0,
    medium=0.6,
    full=0.2,
)


# Value of the ``user_risk`` feature for each user ``risk_profile``;
# "high" maps to the largest value.
RISK_PROFILE_MAP: dict[str, float] = dict(
    low=0.2,
    medium=0.5,
    high=1.0,
)
|
|
|
|
def _compute_features(df: pd.DataFrame, users: pd.DataFrame):
    """Build the per-transaction feature arrays consumed by the risk score.

    Sequential per-sender features (running daily total, gap since the
    sender's previous transaction, repeat count of a sender/receiver pair)
    are evaluated in chronological order within each sender and then mapped
    back to the row order of ``df``. They therefore do not depend on how
    the input rows happen to be ordered. Rows with equal ``sender_id`` and
    ``timestamp`` are processed in their input order.

    Args:
        df: Transactions; reads ``sender_id``, ``receiver_id``, ``amount``
            and ``timestamp``. ``timestamp`` is interpreted as seconds: it
            is bucketed into days with ``// 86400`` and into hours of the
            day with ``% 86400 / 3600``. ``df`` is not modified and its
            index labels are not used.
        users: User table with ``user_id``, ``kyc_level`` and
            ``risk_profile`` columns.

    Returns:
        Dict of feature name -> NumPy array. Each array has one entry per
        row of ``df``, in the same order.

    Raises:
        KeyError: if a ``sender_id`` in ``df`` has no row in ``users``.
    """
    # Positional working copy with a fresh RangeIndex. ``_pos`` remembers
    # the original row order and breaks timestamp ties deterministically.
    work = (
        df[["sender_id", "receiver_id", "amount", "timestamp"]]
        .reset_index(drop=True)
        .assign(_pos=np.arange(len(df)))
    )

    max_amount = work["amount"].max()
    # An all-zero (or empty) batch would otherwise turn every ratio into NaN.
    scale = max_amount if max_amount > 0 else 1.0

    amount_ratio = work["amount"] / scale

    # Flags hours 0-6 and after 23 of the day derived from ``timestamp``
    # (NOTE(review): UTC day boundaries if timestamp is epoch seconds).
    hours = (work["timestamp"] % 86400) / 3600
    time_anomaly = ((hours < 6) | (hours > 23)).astype(float)

    # Chronological order within each sender for all sequential features.
    chrono = work.sort_values(["sender_id", "timestamp", "_pos"])
    chrono = chrono.assign(_day=(chrono["timestamp"] // 86400).astype(np.int32))

    # Amount sent so far that day by the sender, relative to the batch max.
    daily_ratio = chrono.groupby(["sender_id", "_day"])["amount"].cumsum() / scale

    # Gap to the sender's previous transaction; NaN for the sender's first.
    gap = chrono.groupby("sender_id")["timestamp"].diff()
    # A first transaction has no predecessor, so it gets zero velocity and
    # is never a retry (NaN < 60 is False). Filling the gap with 1 would
    # score it as a 1-second burst and flag it as a retry.
    velocity = (1 / (gap + 1)).fillna(0.0)
    retry_flag = (gap < 60).astype(float)

    # 0 for the first payment of this sender to this receiver, 1 for the
    # second, ...; the feature is largest (1.0) for a first-time pair.
    pair_counts = chrono.groupby(["sender_id", "receiver_id"]).cumcount()
    graph_anomaly = 1 / (pair_counts + 1)

    senders = users.set_index("user_id").loc[work["sender_id"]]
    kyc = senders["kyc_level"].map(KYC_MAP).to_numpy()
    user_risk = senders["risk_profile"].map(RISK_PROFILE_MAP).to_numpy()

    # Results computed on ``chrono`` carry ``work``'s positional labels;
    # sort_index() restores the row order of ``df``.
    return {
        "amount_ratio": amount_ratio.to_numpy(),
        "daily_ratio": daily_ratio.sort_index().to_numpy(),
        "velocity": velocity.sort_index().to_numpy(),
        "time_anomaly": time_anomaly.to_numpy(),
        "graph_anomaly": graph_anomaly.sort_index().to_numpy(),
        "retry": retry_flag.sort_index().to_numpy(),
        "kyc": kyc,
        "user_risk": user_risk,
    }
|
|
|
|
| def _compute_risk_score(features: dict, weights: dict): |
| score = np.zeros(len(next(iter(features.values())))) |
|
|
| for k, v in features.items(): |
| if k in weights: |
| score += weights[k] * v |
|
|
| return score |
|
|
|
|
| def _decision(score: np.ndarray): |
| score = score / (np.std(score) + 1e-6) |
| score = score - np.mean(score) |
|
|
| temperature = 5.0 |
| score = score / temperature |
| score = np.clip(score, -5, 5) |
|
|
| prob = 1 / (1 + np.exp(-score)) |
| threshold = 0.7 |
| rand = np.random.rand(len(prob)) |
|
|
| failed = rand < (prob * 0.4 + (prob > threshold) * 0.3) |
| return failed.astype(np.int8), prob |
|
|
|
|
| def _simulate_retries(df: pd.DataFrame, failed_mask: np.ndarray): |
| failed_txns = df[failed_mask] |
|
|
| if len(failed_txns) == 0: |
| return pd.DataFrame(columns=df.columns) |
|
|
| retry_mask = np.random.rand(len(failed_txns)) < 0.25 |
| retry_df = failed_txns[retry_mask].copy() |
|
|
| retry_df["amount"] *= np.random.uniform(0.7, 0.95, size=len(retry_df)) |
| retry_df["timestamp"] += np.random.exponential(30, size=len(retry_df)) |
| retry_df["is_retry"] = 1 |
|
|
| return retry_df |
|
|
|
|
def apply_risk_engine(
    df: pd.DataFrame,
    users: pd.DataFrame,
    config: Config,
) -> pd.DataFrame:
    """Score transactions, sample failures, and append simulated retries.

    Args:
        df: Transactions to score. Not modified.
        users: User table passed to the feature builder.
        config: Supplies the feature weights via ``config.risk_model.weights``.

    Returns:
        A new DataFrame with a fresh RangeIndex, sorted by ``timestamp``.
        Equal timestamps keep their concatenation order, so originals come
        before retries. It holds every input row with ``is_retry == 0`` plus
        any simulated retry rows, and adds these columns:

        - ``risk_score`` (float32)
        - ``fail_prob`` (float32)
        - ``failed`` (the 0/1 outcome array returned by ``_decision``)

        Retry rows copy these columns from the failed row they retry.
    """
    # Private copy with a unique positional index. Intermediate results are
    # aligned back by index label, which breaks on duplicate labels, and the
    # output index is rebuilt at the end anyway.
    df = df.copy()
    df.reset_index(drop=True, inplace=True)
    df["is_retry"] = 0

    features = _compute_features(df, users)
    score = _compute_risk_score(features, config.risk_model.weights)
    failed, prob = _decision(score)

    df["risk_score"] = score.astype(np.float32)
    df["fail_prob"] = prob.astype(np.float32)
    df["failed"] = failed

    retry_df = _simulate_retries(df, failed.astype(bool))

    final_df = pd.concat([df, retry_df], ignore_index=True)

    # mergesort is stable: rows with equal timestamps keep concat order.
    return final_df.sort_values("timestamp", kind="mergesort").reset_index(drop=True)