temporal-twins-code / src /risk /risk_engine.py
temporal-twins-anon's picture
Add anonymous Temporal Twins code release
a3682cf verified
from __future__ import annotations
import numpy as np
import pandas as pd
from src.core.config_loader import Config
# Sender KYC level -> feature value. Less complete KYC maps to a larger value.
KYC_MAP = dict(low=1.0, medium=0.6, full=0.2)

# Sender risk profile -> feature value. Riskier profiles map to larger values.
RISK_PROFILE_MAP = dict(low=0.2, medium=0.5, high=1.0)
def _restore_order(values: np.ndarray, order: np.ndarray) -> np.ndarray:
    """Scatter ``values`` (computed over rows taken in ``order``) back to input row positions."""
    restored = np.empty_like(values)
    restored[order] = values
    return restored


def _compute_features(df: pd.DataFrame, users: pd.DataFrame):
    """Compute the per-transaction features consumed by the risk scorer.

    Parameters
    ----------
    df:
        Transactions. Reads the ``sender_id``, ``receiver_id``, ``timestamp``
        and ``amount`` columns; ``df`` itself is not modified.
        NOTE(review): ``timestamp`` looks like seconds (day buckets use
        ``// 86400``, hours use ``/ 3600``) -- confirm with the producer.
    users:
        User table with ``user_id``, ``kyc_level`` and ``risk_profile``
        columns. Every ``sender_id`` in ``df`` must appear as a ``user_id``,
        otherwise ``.loc`` raises ``KeyError``. ``user_id`` should be unique,
        otherwise several rows come back per sender and the arrays no longer
        line up with ``df``.

    Returns
    -------
    dict
        Feature name -> 1-D NumPy array aligned row-for-row with ``df``.
        Sequence-dependent features (daily running total, gap to the
        sender's previous transaction, repeat sender->receiver count) are
        evaluated in timestamp order. Simultaneous transactions keep their
        input order. ``kyc``/``user_risk`` are NaN for levels missing from
        ``KYC_MAP``/``RISK_PROFILE_MAP``.
    """
    # Sender attributes, one row per transaction (fails fast on unknown senders).
    sender_features = users.set_index("user_id").loc[df["sender_id"]]

    # Row positions in time order. A stable sort keeps ties in input order.
    # Positional indexing (rather than label alignment) keeps duplicate index
    # labels in ``df`` harmless.
    order = np.argsort(df["timestamp"].to_numpy(), kind="stable")
    timeline = (
        df[["sender_id", "receiver_id", "timestamp", "amount"]]
        .iloc[order]
        .reset_index(drop=True)
    )

    # An all-zero (or empty) amount column would otherwise give 0/0 = NaN ratios.
    max_amount = df["amount"].max()
    amount_scale = max_amount if max_amount > 0 else 1.0

    # Amount ratio: size relative to the largest transaction in the batch.
    amount_ratio = df["amount"].to_numpy() / amount_scale

    # Daily ratio: the sender's running total for the current day, in time order.
    day = (timeline["timestamp"] // 86400).astype(np.int32)
    daily_cumsum = timeline["amount"].groupby([timeline["sender_id"], day]).cumsum()
    daily_ratio = _restore_order(daily_cumsum.to_numpy() / amount_scale, order)

    # Gap since the same sender's previous transaction; NaN for the first one.
    gaps = timeline["timestamp"].groupby(timeline["sender_id"]).diff()

    # Velocity: a missing gap is treated as 1s, so a first transaction scores 0.5.
    velocity = _restore_order((1 / (gaps.fillna(1) + 1)).to_numpy(), order)

    # Retry signal: within 60s of the sender's previous transaction. The raw
    # (unfilled) gaps are compared so a first transaction, which has no
    # predecessor, compares False and is never flagged.
    retry_flag = _restore_order((gaps < 60).astype(float).to_numpy(), order)

    # Time anomaly: hour-of-day before 06:00 or after 23:00.
    hours = (df["timestamp"] % 86400) / 3600
    time_anomaly = ((hours < 6) | (hours > 23)).astype(float).to_numpy()

    # Graph anomaly: 1 for the first sender->receiver transaction, 1/2 for
    # the second, 1/3 for the third, ...
    pair_counts = timeline.groupby(["sender_id", "receiver_id"]).cumcount()
    graph_anomaly = _restore_order((1 / (pair_counts + 1)).to_numpy(), order)

    # KYC + user risk
    kyc = sender_features["kyc_level"].map(KYC_MAP).to_numpy()
    user_risk = sender_features["risk_profile"].map(RISK_PROFILE_MAP).to_numpy()

    return {
        "amount_ratio": amount_ratio,
        "daily_ratio": daily_ratio,
        "velocity": velocity,
        "time_anomaly": time_anomaly,
        "graph_anomaly": graph_anomaly,
        "retry": retry_flag,
        "kyc": kyc,
        "user_risk": user_risk,
    }
def _compute_risk_score(features: dict, weights: dict):
score = np.zeros(len(next(iter(features.values()))))
for k, v in features.items():
if k in weights:
score += weights[k] * v
return score
def _decision(score: np.ndarray):
score = score / (np.std(score) + 1e-6)
score = score - np.mean(score)
temperature = 5.0
score = score / temperature
score = np.clip(score, -5, 5)
prob = 1 / (1 + np.exp(-score))
threshold = 0.7
rand = np.random.rand(len(prob))
failed = rand < (prob * 0.4 + (prob > threshold) * 0.3)
return failed.astype(np.int8), prob
def _simulate_retries(df: pd.DataFrame, failed_mask: np.ndarray):
failed_txns = df[failed_mask]
if len(failed_txns) == 0:
return pd.DataFrame(columns=df.columns)
retry_mask = np.random.rand(len(failed_txns)) < 0.25
retry_df = failed_txns[retry_mask].copy()
retry_df["amount"] *= np.random.uniform(0.7, 0.95, size=len(retry_df))
retry_df["timestamp"] += np.random.exponential(30, size=len(retry_df))
retry_df["is_retry"] = 1
return retry_df
def apply_risk_engine(
    df: pd.DataFrame,
    users: pd.DataFrame,
    config: Config
) -> pd.DataFrame:
    """Score transactions, sample failures and append simulated retries.

    Works on a copy of ``df``. Every input row gets:

    - ``is_retry = 0``;
    - the model outputs ``risk_score`` and ``fail_prob`` (both ``float32``);
    - the sampled ``failed`` flag.

    Per-feature weights come from ``config.risk_model.weights``. The rows
    returned by ``_simulate_retries`` for the failed transactions are then
    appended. The combined frame is sorted by ``timestamp`` with a stable
    (mergesort) sort, so equal timestamps keep their relative order, and is
    returned with a fresh ``RangeIndex``.
    """
    scored = df.copy()
    scored["is_retry"] = 0

    risk = _compute_risk_score(
        _compute_features(scored, users),
        config.risk_model.weights,
    )
    failed_flags, fail_probability = _decision(risk)

    scored = scored.assign(
        risk_score=risk.astype(np.float32),
        fail_prob=fail_probability.astype(np.float32),
        failed=failed_flags,
    )

    retries = _simulate_retries(scored, failed_flags.astype(bool))
    combined = pd.concat([scored, retries], ignore_index=True)
    return (
        combined
        .sort_values("timestamp", kind="mergesort")
        .reset_index(drop=True)
    )