temporal-twins-code / src /fraud /fraud_engine.py
temporal-twins-anon's picture
Add anonymous Temporal Twins code release
a3682cf verified
raw
history blame
72.7 kB
import numpy as np
import pandas as pd
from collections import Counter
# ============================================================
# ORACLE / AUDIT COLUMNS — never exposed to learned baselines
# ============================================================
# Generator bookkeeping columns (motif traces, label/trigger indices, twin
# pair metadata, latent state). They must be removed before any learned
# baseline sees the data. Grouped below by name only, for readability.
ORACLE_ONLY_COLS: frozenset = frozenset(
    [
        # motif trace bookkeeping
        "motif_hit_count",
        "motif_source",
        "motif_chain_state",
        "motif_strength",
        # trigger / label bookkeeping
        "trigger_event_idx",
        "label_event_idx",
        "label_delay",
        "is_fallback_label",
        "fraud_source",
        # twin-pair metadata
        "twin_role",
        "twin_label",
        "twin_pair_id",
        "template_id",
        # latent generator state
        "dynamic_fraud_state",
    ]
)
# =========================
# DIFFICULTY PRESETS
# =========================
# Per-difficulty knobs looked up by FraudEngine.__init__ (self.params).
# NOTE(review): in the part of this file that is visible, FraudEngine.apply
# reads only noise_std, quantile_type, quantile_suspicious, pair_freq_mult
# and graph_feat_noise. It hard-codes its own delayed-label fraction
# (0.2 / 0.6 / 1.0) rather than reading "delayed_fraction" — confirm which
# value is intended.
DIFFICULTY_PRESETS = dict(
    easy=dict(
        noise_std=0.2,
        quantile_type=0.90,
        quantile_suspicious=0.92,
        pair_freq_mult=0.7,
        velocity_logit=0.20,
        burst_divisor=10.0,
        retry_logit=0.8,
        ring_logit=1.2,
        global_noise=0.4,
        graph_feat_noise=0.0,   # 0 disables the structural-feature noise in apply()
        delayed_fraction=0.0,
        thresh_velocity=0.93,
        thresh_burst=0.90,
        thresh_retry=0.88,
        thresh_ring=0.90,
        thresh_none=0.9995,
    ),
    medium=dict(
        noise_std=0.3,
        quantile_type=0.94,
        quantile_suspicious=0.96,
        pair_freq_mult=0.35,
        velocity_logit=0.15,
        burst_divisor=12.0,
        retry_logit=0.6,
        ring_logit=0.5,
        global_noise=0.7,
        graph_feat_noise=0.2,
        delayed_fraction=0.3,
        thresh_velocity=0.95,
        thresh_burst=0.93,
        thresh_retry=0.92,
        thresh_ring=0.95,
        thresh_none=0.9998,
    ),
    hard=dict(
        noise_std=0.4,
        quantile_type=0.97,
        quantile_suspicious=0.98,
        pair_freq_mult=0.2,     # raised from 0.05 to avoid OOD collapse
        velocity_logit=0.12,
        burst_divisor=15.0,
        retry_logit=0.5,
        ring_logit=0.15,
        global_noise=1.5,       # raised to keep the task hard
        graph_feat_noise=0.5,
        delayed_fraction=0.5,
        thresh_velocity=0.97,
        thresh_burst=0.96,
        thresh_retry=0.96,
        thresh_ring=0.98,
        thresh_none=0.9999,
    ),
)
# Per-difficulty knobs for the standard "temporal_twins" benchmark mode,
# returned by FraudEngine._standard_twin_profile(). "camouflage_prob" is the
# probability that a fraud twin gets a benign-style receiver ordering.
TEMPORAL_TWIN_STANDARD_PROFILES = dict(
    easy=dict(
        receiver_gap=3,
        delta_recipe="easy",
        event_divisor=4,
        min_events=5,
        max_events_cap=12,
        source_keep_frac=1.00,
        min_true_sources=4,
        max_chain_fallback=1,
        delay_range=(4, 9),
        source_pool_factor=1.0,
        chain_pool_factor=1.0,
        fraud_block_prob=1.0,
        motif_cycle_prob=1.0,
        camouflage_prob=0.0,
    ),
    medium=dict(
        receiver_gap=4,
        delta_recipe="medium",
        event_divisor=5,
        min_events=4,
        max_events_cap=10,
        source_keep_frac=0.75,
        min_true_sources=3,
        max_chain_fallback=3,
        delay_range=(7, 14),
        source_pool_factor=2.0,
        chain_pool_factor=2.0,
        fraud_block_prob=0.30,
        motif_cycle_prob=0.40,
        camouflage_prob=0.60,
    ),
    hard=dict(
        receiver_gap=5,
        delta_recipe="hard",
        event_divisor=6,
        min_events=4,
        max_events_cap=8,
        source_keep_frac=0.45,
        min_true_sources=2,
        max_chain_fallback=5,
        delay_range=(10, 20),
        source_pool_factor=3.0,
        chain_pool_factor=3.0,
        fraud_block_prob=0.22,
        motif_cycle_prob=0.28,
        camouflage_prob=0.78,
    ),
)
def temporal_twin_motif_trace(
    timestamps: np.ndarray,
    receivers: np.ndarray,
) -> dict:
    """Shared finite-state motif program for temporal-twin calibration.

    The signal intentionally depends on event order and timing only:
    quiet -> accelerating cadence -> delayed receiver revisit -> burst-release-burst

    Gaps are classified as short / long against quantiles of this user's own
    gap distribution. Each event is scored with instantaneous motif flags,
    which feed leaky accumulators. A "source" event fires when a
    burst-release-burst completes while recent acceleration, recent revisits
    and the chain accumulator are all high enough.

    Args:
        timestamps: Per-event timestamps of one user (cast to float64).
            NOTE(review): presumably sorted ascending — gaps are taken with
            np.diff and floored at 60, never sorted here. Units presumably
            seconds; confirm at call sites.
        receivers: Per-event receiver ids, same length as ``timestamps``
            (cast to int64).

    Returns:
        dict with float32 arrays of length n: "state" (slow hidden
        accumulator), "chain" (chain accumulator), "motif_strength"
        (per-event signal), and 0/1 flags "quiet", "accel", "revisit",
        "burst_release_burst". Also "source": int8, 1 where a motif source
        fires.
    """
    timestamps = np.asarray(timestamps, dtype=np.float64)
    receivers = np.asarray(receivers, dtype=np.int64)
    n = len(timestamps)
    empty = np.zeros(n, dtype=np.float32)
    if n == 0:
        # All float entries alias the same zero-length array.
        return {
            "state": empty,
            "chain": empty,
            "motif_strength": empty,
            "quiet": empty,
            "accel": empty,
            "revisit": empty,
            "burst_release_burst": empty,
            "source": np.zeros(n, dtype=np.int8),
        }
    # Gap distribution used to define "short" / "long" for this user; gaps
    # are floored at 60.
    if n > 1:
        dts = np.diff(timestamps)
        base_dts = np.clip(dts, 60.0, None)
    else:
        # A single event has no gaps; fall back to a nominal 1800 gap.
        base_dts = np.array([1800.0], dtype=np.float64)
    short_q = float(np.quantile(base_dts, 0.55))
    medium_q = float(np.quantile(base_dts, 0.70))
    long_q = float(np.quantile(base_dts, 0.82))
    # Keep the thresholds separated even when gaps are (nearly) constant:
    # short >= 60, medium >= 1.10 * short, long >= 1.15 * medium.
    short_q = max(short_q, 60.0)
    medium_q = max(medium_q, short_q * 1.10)
    long_q = max(long_q, medium_q * 1.15)
    # Per-event output traces.
    state = np.zeros(n, dtype=np.float32)
    chain = np.zeros(n, dtype=np.float32)
    motif_strength = np.zeros(n, dtype=np.float32)
    quiet_flags = np.zeros(n, dtype=np.float32)
    accel_flags = np.zeros(n, dtype=np.float32)
    revisit_flags = np.zeros(n, dtype=np.float32)
    brb_flags = np.zeros(n, dtype=np.float32)
    source = np.zeros(n, dtype=np.int8)
    # Window of the last four gaps, seeded with long gaps so the first
    # events look like they follow a quiet period.
    prev_dts = [long_q, long_q, long_q, long_q]
    # Index of the most recent event paid to each receiver.
    receiver_last_idx: dict[int, int] = {}
    # Leaky counters of recent accel / revisit / burst-release-burst flags.
    recent_accel = 0.0
    recent_revisit = 0.0
    recent_brb = 0.0
    chain_state = 0.0
    hidden_state = 0.0
    # Index of the last emitted source (-99 = none yet), for the spacing rule.
    last_source = -99
    for idx in range(n):
        # Current gap, floored at 60; the first event is treated as after a long gap.
        dt = long_q if idx == 0 else max(float(timestamps[idx] - timestamps[idx - 1]), 60.0)
        current_receiver = int(receivers[idx])
        # quiet: the previous gap was long.
        quiet = float(prev_dts[-1] >= long_q)
        # accel: three gaps ago was long, the last two gaps and the current
        # one strictly shrink, and the current gap is short.
        accel = float(
            prev_dts[-3] >= long_q
            and prev_dts[-2] > prev_dts[-1] > dt
            and dt <= short_q
        )
        # Events since this receiver was last paid (0 if never seen).
        gap_events = idx - receiver_last_idx.get(current_receiver, idx)
        # revisit: a known receiver, paid again 3-8 events later, after a
        # (nearly) long gap among the previous two.
        revisit = float(
            current_receiver in receiver_last_idx
            and 3 <= gap_events <= 8
            and max(prev_dts[-2], prev_dts[-1]) >= long_q * 0.85
        )
        # burst-release-burst: gap pattern short, long, short, short (current).
        burst_release_burst = float(
            prev_dts[-3] <= short_q
            and prev_dts[-2] >= long_q
            and prev_dts[-1] <= short_q
            and dt <= short_q
        )
        recent_accel = max(0.0, 0.86 * recent_accel + accel)
        recent_revisit = max(0.0, 0.88 * recent_revisit + revisit)
        recent_brb = max(0.0, 0.88 * recent_brb + burst_release_burst)
        # Speed bonus; positive only when dt < short_q / 0.55.
        local_speed = max(0.0, (short_q / max(dt, 60.0)) - 0.55)
        signal = (
            1.20 * accel
            + 1.25 * revisit
            + 1.10 * burst_release_burst
            + 0.30 * quiet
            + 0.20 * local_speed
        )
        # Chain accumulator: decays by 0.82, has a -0.30 per-event bias, and
        # is clipped at 0.
        chain_state = max(
            0.0,
            0.82 * chain_state
            + 0.75 * signal
            + 0.22 * min(recent_accel, 1.0)
            + 0.28 * min(recent_revisit, 1.0)
            + 0.24 * min(recent_brb, 1.0)
            - 0.30,
        )
        # Slower hidden accumulator fed by both the chain and the raw signal.
        hidden_state = max(0.0, 0.97 * hidden_state + 0.22 * chain_state + 0.34 * signal)
        # Source rule: at least 6 prior events, a burst-release-burst now,
        # enough recent accel/revisit, a strong chain, and at least 4 events
        # since the previous source.
        if (
            idx >= 6
            and burst_release_burst > 0.0
            and recent_accel > 0.20
            and recent_revisit > 0.30
            and chain_state > 0.80
            and idx - last_source >= 4
        ):
            source[idx] = 1
            last_source = idx
        quiet_flags[idx] = quiet
        accel_flags[idx] = accel
        revisit_flags[idx] = revisit
        brb_flags[idx] = burst_release_burst
        motif_strength[idx] = signal
        chain[idx] = chain_state
        state[idx] = hidden_state
        receiver_last_idx[current_receiver] = idx
        # Slide the four-gap window.
        prev_dts = (prev_dts + [dt])[-4:]
    return {
        "state": state.astype(np.float32),
        "chain": chain.astype(np.float32),
        "motif_strength": motif_strength.astype(np.float32),
        "quiet": quiet_flags.astype(np.float32),
        "accel": accel_flags.astype(np.float32),
        "revisit": revisit_flags.astype(np.float32),
        "burst_release_burst": brb_flags.astype(np.float32),
        "source": source.astype(np.int8),
    }
# Total build attempts for a calib-mode fraud twin that yields no
# motif-sourced positive; after the last one the pair is dropped.
_CALIB_MOTIF_RETRY_BUDGET: int = 8
# Greedy receiver-swap rounds when repairing a benign twin
# (the segmented repair runs twice this many).
_BENIGN_MOTIF_REPAIR_STEPS: int = 16
class FraudEngine:
def __init__(self, seed=42, difficulty="medium", benchmark_mode="temporal_twins"):
    """Configure the engine.

    Args:
        seed: Seed for ``self.rng`` (a ``numpy.random.Generator``).
        difficulty: Key into ``DIFFICULTY_PRESETS`` ("easy", "medium" or
            "hard"); an unknown key raises ``KeyError``.
        benchmark_mode: ``"temporal_twins"`` or
            ``"temporal_twins_oracle_calib"`` route ``apply`` to the twin
            generator; any other value uses the legacy generator.
    """
    preset = DIFFICULTY_PRESETS[difficulty]
    self.rng = np.random.default_rng(seed)
    self.difficulty, self.benchmark_mode = difficulty, benchmark_mode
    # Shared reference to the module-level preset dict (not a copy).
    self.params = preset
def apply(self, df: pd.DataFrame) -> pd.DataFrame:
    """Label ``df`` with synthetic fraud and return a new, time-sorted frame.

    In the ``"temporal_twins"`` / ``"temporal_twins_oracle_calib"`` modes this
    delegates to ``_apply_temporal_twins``. Any other ``benchmark_mode`` runs
    the legacy generator below, which:

    1. derives rolling per-sender activity features and a graph-contagion score;
    2. drives per-sender hidden state (cadence EMA, burst memory, pairwise
       ring memory) to emit ``velocity`` / ``graph_ring`` fraud;
    3. moves a difficulty-dependent fraction of positive labels 5-14 events
       later within the same sender (when that many later events exist);
    4. tops the positive rate up to a per-difficulty floor with
       ``weak_velocity`` labels, drawn only from still-negative rows; and
    5. overwrites the model-facing feature columns with noise or permutations
       so that no per-event feature carries the label.

    The input frame is not modified.

    Args:
        df: Transactions. The legacy path reads ``timestamp``, ``sender_id``,
            ``receiver_id``, ``amount``, ``risk_score``, ``is_retry``,
            ``failed`` and ``fail_prob``.

    Returns:
        A copy sorted by ``timestamp`` with a fresh RangeIndex and columns
        ``is_fraud``, ``fraud_type``, ``dynamic_fraud_state``, ``risk_noisy``,
        ``txn_count_10``, ``amount_sum_10``, ``neighbor_score`` and
        ``pair_freq``.
    """
    if self.benchmark_mode in ("temporal_twins", "temporal_twins_oracle_calib"):
        return self._apply_temporal_twins(df)

    import math  # scalar tanh inside the per-row contagion loop

    df = df.copy()
    df = df.sort_values("timestamp").reset_index(drop=True)
    p = self.params
    n = len(df)

    # -------------------------
    # BASE FEATURES
    # -------------------------
    noise = self.rng.normal(0, p["noise_std"], size=n)
    df["risk_noisy"] = df["risk_score"] * 0.2 + noise
    # Rolling count / sum over each sender's last 10 transactions.
    df["txn_count_10"] = (
        df.groupby("sender_id")["timestamp"]
        .transform(lambda x: x.rolling(10, min_periods=1).count())
    )
    df["amount_sum_10"] = (
        df.groupby("sender_id")["amount"]
        .transform(lambda x: x.rolling(10, min_periods=1).sum())
    )
    velocity = df["txn_count_10"] * 0.6 + df["amount_sum_10"] * 0.0002
    retry_signal = (
        df["is_retry"] * 1.2
        + df["failed"] * 1.5
        + df["fail_prob"] * 0.7
    )

    # -------------------------
    # QUANTILES (controlled by difficulty)
    # -------------------------
    velocity_q_susp = velocity.quantile(p["quantile_suspicious"])
    retry_q_type = retry_signal.quantile(p["quantile_type"])
    retry_q_susp = retry_signal.quantile(p["quantile_suspicious"])

    # -------------------------
    # GRAPH CONTAGION
    # -------------------------
    # Every sender/receiver id carries a score. It grows by 1 on "suspicious"
    # rows and decays by 0.9 otherwise. Sender and receiver ids share one key
    # space. Each row sees tanh of its endpoints' scores before the update.
    neighbor_score = np.zeros(n, dtype=np.float32)
    recent = {}
    # Plain Python lists keep the per-row loops fast.
    velocity_arr = velocity.to_numpy().tolist()
    retry_arr = retry_signal.to_numpy().tolist()
    sender_arr = df["sender_id"].to_numpy().tolist()
    receiver_arr = df["receiver_id"].to_numpy().tolist()
    time_arr = df["timestamp"].to_numpy().tolist()
    for i in range(n):
        s = sender_arr[i]
        r = receiver_arr[i]
        score = recent.get(s, 0.0) + recent.get(r, 0.0)
        neighbor_score[i] = math.tanh(score)
        suspicious = (
            velocity_arr[i] > velocity_q_susp
            or retry_arr[i] > retry_q_susp
        )
        if suspicious:
            recent[s] = recent.get(s, 0.0) + 1.0
            recent[r] = recent.get(r, 0.0) + 1.0
        else:
            if s in recent:
                recent[s] *= 0.9
            if r in recent:
                recent[r] *= 0.9
    df["neighbor_score"] = neighbor_score

    # --------------------------------
    # GRAPH RING (STRUCTURAL) + NOISE
    # --------------------------------
    pairs = list(zip(df["sender_id"], df["receiver_id"]))
    pair_counts = Counter(pairs)  # O(1) per-row lookup instead of Series indexing
    df["pair_freq"] = [pair_counts[pair] for pair in pairs]
    df["pair_freq"] = np.log1p(df["pair_freq"]) * p["pair_freq_mult"]
    # Noise on structural features (meant to blunt GNN shortcuts).
    if p["graph_feat_noise"] > 0:
        gf_noise = p["graph_feat_noise"]
        df["pair_freq"] += self.rng.normal(0, gf_noise, size=n)
        df["neighbor_score"] += self.rng.normal(0, gf_noise * 0.5, size=n)

    # -------------------------------------------------------
    # No static fraud signals: labels come only from the stateful temporal
    # accumulation below, so static models cannot map current features to them.
    # -------------------------------------------------------
    df["fraud_type"] = "none"
    df["is_fraud"] = 0
    # Randomize edge features so a GNN cannot exploit them.
    df["amount"] = self.rng.normal(0, 1, size=n)
    df["risk_score"] = self.rng.normal(0, 1, size=n)
    df["fail_prob"] = self.rng.normal(0, 1, size=n)

    # -------------------------
    # STATEFUL TEMPORAL ACCUMULATION (velocity & burst)
    # -------------------------
    user_state = {}
    last_txn = {}
    # Per-difficulty thresholds and scales.
    thresh_state = {"easy": 6.0, "medium": 7.0, "hard": 8.5}[self.difficulty]
    diff_scale = {"easy": 1.0, "medium": 0.8, "hard": 0.6}[self.difficulty]
    ring_threshold = {"easy": 6.0, "medium": 5.0, "hard": 3.5}[self.difficulty]
    cooldown_span = {"easy": 10, "medium": 12, "hard": 15}[self.difficulty]
    # Indices collected during the loop; applied to the frame afterwards.
    velocity_idx = []
    ring_idx = []
    dynamic_state = np.zeros(n, dtype=np.float32)
    ring_memory = {}
    burst_memory = {}
    receiver_history = {}
    temporal_candidates = []
    cadence_ema = {}
    user_event_pos = {}
    cooldown_until = {}
    max_r = max(receiver_arr) if receiver_arr else 1
    for i in range(n):
        u = sender_arr[i]
        r_id = receiver_arr[i]
        t = time_arr[i]
        user_event_pos[u] = user_event_pos.get(u, 0) + 1
        event_pos = user_event_pos[u]
        can_trigger = event_pos >= cooldown_until.get(u, 0)
        prev_state = user_state.get(u, 0.0)
        dt = t - last_txn.get(u, t)  # 0 on the sender's first event
        last_txn[u] = t

        # Speed relative to the sender's own cadence EMA (not absolute
        # volume) drives the state, which suppresses "busy user" shortcuts.
        prev_cadence = cadence_ema.get(u, 3600.0)
        if dt == 0:
            time_factor = 0.8 * diff_scale
        else:
            eff_dt = max(float(dt), 60.0)
            rel_speed = prev_cadence / eff_dt
            if rel_speed > 3.0:
                time_factor = 1.8 * diff_scale
            elif rel_speed > 1.8:
                time_factor = 1.4 * diff_scale
            elif rel_speed > 1.2:
                time_factor = 1.0 * diff_scale
            elif rel_speed > 0.8:
                time_factor = 0.6 * diff_scale
            else:
                time_factor = 0.25 * diff_scale
            cadence_ema[u] = 0.97 * prev_cadence + 0.03 * eff_dt

        # Adversarial adaptation: slow down near the threshold, occasional
        # bursts, and receiver switching when close to detection.
        if prev_state > (0.7 * thresh_state):
            time_factor *= 0.6
        if self.rng.random() < 0.02:
            time_factor *= 1.5
        if prev_state > (0.8 * thresh_state) and self.rng.random() < 0.3:
            r_id = self.rng.integers(0, max_r + 1)
        hist = receiver_history.get(u, ())
        # Revisit motif: paying one of the last three receivers again, but
        # not the immediately preceding one.
        revisit_motif = len(hist) >= 2 and (r_id in hist[-3:]) and hist[-1] != r_id

        # Hidden EMA accumulation with low noise.
        noise = self.rng.normal(0, 0.03)
        new_state = max(0.0, 0.975 * prev_state + 0.22 * time_factor + noise)
        # Delayed reinforcement (forces multi-step buildup).
        if prev_state > (0.6 * thresh_state) and dt < 7200:
            new_state += 0.3 * diff_scale
        prev_burst = burst_memory.get(u, 0.0)
        if dt < 600:
            burst_impulse = 1.0
        elif dt < 1800:
            burst_impulse = 0.4
        elif dt < 7200:
            burst_impulse = -0.5
        else:
            burst_impulse = -0.8
        burst_state = max(0.0, 0.92 * prev_burst + burst_impulse)
        burst_memory[u] = burst_state
        crossed_state = prev_state <= thresh_state and new_state > thresh_state
        # Release: a long gap right after a built-up burst.
        release_event = prev_burst > 2.5 and dt > 1800
        if revisit_motif and (release_event or crossed_state or prev_burst > 1.5):
            temporal_candidates.append(i)
        user_state[u] = new_state
        dynamic_state[i] = new_state

        # =========================
        # FRAUD MECHANISM BY DIFFICULTY
        # =========================
        n_velocity_before = len(velocity_idx)
        n_ring_before = len(ring_idx)
        # Order-specific release after a short-gap burst (all difficulties).
        if can_trigger and revisit_motif and release_event and new_state > (0.75 * thresh_state):
            if self.rng.random() < 0.12:
                velocity_idx.append(i)
        # Multi-agent ring memory for the unordered (sender, receiver) pair:
        # decays by 0.9 and gains 1 per sub-600 gap. "ring_cross" fires when
        # it crosses the per-difficulty threshold. Uses no RNG, so updating it
        # before the branches below does not change the draw order.
        key = tuple(sorted((u, r_id)))
        prev_ring = ring_memory.get(key, 0.0)
        ring_memory[key] = 0.9 * prev_ring + (1.0 if dt < 600 else 0.0)
        ring_cross = prev_ring <= ring_threshold and ring_memory[key] > ring_threshold
        if self.difficulty == "easy":
            # Velocity fraud (local temporal signal) plus rings on release.
            if can_trigger and revisit_motif and (crossed_state or (release_event and new_state > (0.85 * thresh_state))):
                prob = min(0.55, 0.15 + 0.25 * (new_state / max(thresh_state, 1e-6)))
                if self.rng.random() < prob:
                    velocity_idx.append(i)
            if can_trigger and revisit_motif and ring_cross and release_event:
                ring_idx.append(i)
        elif self.difficulty == "medium":
            # Mixed mechanisms.
            if can_trigger and revisit_motif and crossed_state and release_event:
                prob = min(0.45, 0.10 + 0.22 * (new_state / max(thresh_state * 1.2, 1e-6)))
                if self.rng.random() < prob:
                    velocity_idx.append(i)
            # Retry abuse (adds an orthogonal signal).
            if can_trigger and revisit_motif and retry_arr[i] > retry_q_type and release_event:
                if self.rng.random() < 0.15:
                    velocity_idx.append(i)
            if can_trigger and revisit_motif and ring_cross and (release_event or new_state > thresh_state):
                ring_idx.append(i)
        elif self.difficulty == "hard":
            # Mostly rings with a small velocity residual.
            if can_trigger and revisit_motif and crossed_state and release_event and new_state > thresh_state:
                if self.rng.random() < 0.1:
                    velocity_idx.append(i)
            # Rings only on burst-to-release transitions.
            if can_trigger and revisit_motif and ring_cross and release_event and new_state > (0.65 * thresh_state):
                ring_idx.append(i)
        # A fresh trigger puts the sender on cooldown for cooldown_span events.
        if can_trigger and (
            len(velocity_idx) > n_velocity_before or len(ring_idx) > n_ring_before
        ):
            cooldown_until[u] = event_pos + cooldown_span
        receiver_history[u] = (hist + (r_id,))[-3:]

    # Apply state and fraud indices vectorially.
    df["dynamic_fraud_state"] = dynamic_state
    if ring_idx:
        df.loc[ring_idx, "is_fraud"] = 1
        df.loc[ring_idx, "fraud_type"] = "graph_ring"
    # Velocity is applied after rings so it never overwrites graph_ring.
    if velocity_idx:
        velocity_mask = df.index.isin(velocity_idx) & (df["fraud_type"] == "none")
        df.loc[velocity_mask, "is_fraud"] = 1
        df.loc[velocity_mask, "fraud_type"] = "velocity"

    # -------------------------
    # DELAYED FRAUD
    # -------------------------
    # Move a fraction of labels 5-14 events later within the SAME sender, so
    # the causal link to sender_id is preserved.
    # NOTE(review): this fraction is hard-coded and does not read
    # p["delayed_fraction"]. Also, fraud_type is not moved with the label:
    # targets keep fraud_type "none" and sources keep theirs — confirm intent.
    delayed_frac = {"easy": 0.2, "medium": 0.6, "hard": 1.0}[self.difficulty]
    if delayed_frac > 0:
        fraud_idx = df[(df["is_fraud"] == 1)].index.to_numpy()
        n_delay = int(len(fraud_idx) * delayed_frac)
        if n_delay > 0:
            delay_sources = self.rng.choice(fraud_idx, size=n_delay, replace=False)
            # Row labels of each sender, ascending (index is a RangeIndex in time order).
            user_groups = {k: v.to_numpy() for k, v in df.groupby("sender_id").groups.items()}
            delayed_targets = []
            valid_sources = []
            for src in delay_sources:
                u = df.at[src, "sender_id"]
                idxs = user_groups[u]
                pos = np.searchsorted(idxs, src)
                delay = self.rng.integers(5, 15)  # 5..14 events later
                if pos + delay < len(idxs):
                    valid_sources.append(src)
                    delayed_targets.append(idxs[pos + delay])
            df.loc[valid_sources, "is_fraud"] = 0
            if delayed_targets:
                df.loc[delayed_targets, "is_fraud"] = 1

    # -------------------------
    # MINIMUM FRAUD FLOOR (eval stability)
    # -------------------------
    min_rate = {"easy": 0.06, "medium": 0.05, "hard": 0.03}[self.difficulty]
    current_rate = df["is_fraud"].mean()
    if current_rate < min_rate:
        deficit = int((min_rate - current_rate) * len(df))
        # Only still-negative rows can raise the rate. Delayed targets have
        # is_fraud == 1 but fraud_type "none", so fraud_type alone is not enough.
        # Prefer sequence-motif candidates so the floor stays temporal.
        temporal_pool = np.array(sorted(set(temporal_candidates)), dtype=np.int64)
        eligible = df.loc[temporal_pool] if len(temporal_pool) else df.iloc[0:0]
        eligible = eligible[(eligible["fraud_type"] == "none") & (eligible["is_fraud"] == 0)]
        if len(eligible) < deficit:
            state_thresh = np.percentile(df["dynamic_fraud_state"], 70)
            state_eligible = df[
                (df["fraud_type"] == "none")
                & (df["is_fraud"] == 0)
                & (df["dynamic_fraud_state"] > state_thresh)
            ]
            eligible = pd.concat([eligible, state_eligible], ignore_index=False)
            eligible = eligible[~eligible.index.duplicated(keep="first")]
        n_sample = min(deficit, len(eligible))
        # NOTE(review): random_state=42 is fixed, so this draw does not depend
        # on the engine seed — confirm intended.
        candidates = eligible.sample(n_sample, random_state=42).index
        df.loc[candidates, "is_fraud"] = 1
        df.loc[candidates, "fraud_type"] = "weak_velocity"
        # Small bump so the floor rows stay weakly consistent with the state.
        df.loc[candidates, "dynamic_fraud_state"] += self.rng.normal(0.5, 0.1, size=len(candidates)).astype(np.float32)

    # -------------------------------------------------------
    # FINAL FEATURE SANITISATION
    # -------------------------------------------------------
    # Labels are fixed; decorrelate every model-facing feature from them.
    # dynamic_fraud_state is kept for mechanistic analysis.
    df["amount"] = self.rng.normal(0, 1, size=n).astype(np.float32)
    df["risk_score"] = self.rng.normal(0, 1, size=n).astype(np.float32)
    df["fail_prob"] = self.rng.normal(0, 1, size=n).astype(np.float32)
    df["risk_noisy"] = self.rng.normal(0, 1, size=n).astype(np.float32)
    failed_rate = float(df["failed"].mean()) if "failed" in df.columns else 0.0
    retry_rate = float(df["is_retry"].mean()) if "is_retry" in df.columns else 0.0
    df["failed"] = self.rng.binomial(1, failed_rate, size=n).astype(np.int8)
    df["is_retry"] = self.rng.binomial(1, retry_rate, size=n).astype(np.int8)
    df["txn_count_10"] = self.rng.permutation(df["txn_count_10"].to_numpy())
    df["amount_sum_10"] = self.rng.permutation(df["amount_sum_10"].to_numpy())
    df["neighbor_score"] = self.rng.normal(0, 1, size=n).astype(np.float32)
    df["pair_freq"] = self.rng.normal(0, 1, size=n).astype(np.float32)
    return df
def _is_standard_temporal_twins(self) -> bool:
return self.benchmark_mode == "temporal_twins"
def _standard_twin_profile(self) -> dict:
    """Return the TEMPORAL_TWIN_STANDARD_PROFILES entry for this engine's difficulty (KeyError if unknown)."""
    profiles = TEMPORAL_TWIN_STANDARD_PROFILES
    return profiles[self.difficulty]
def _apply_temporal_twins(self, df: pd.DataFrame) -> pd.DataFrame:
    """Rebuild ``df`` as matched fraud/benign twin users plus background users.

    Senders are ordered by (first timestamp, sender id) and consumed two at a
    time as a (fraud carrier, benign carrier) pair. Each pair is rendered
    from a template user: a sender with at least 18 events and at least two
    receivers paid two or more times. Templates are tried round-robin,
    starting after the last one that produced a pair. Both twins share the
    template's start time, reordered gaps and attribute permutations.

    If every template is rejected (the builder returns ``None`` for either
    twin), both carriers are emitted as background users. A trailing odd
    carrier is also emitted as background.

    Args:
        df: Raw transactions. ``is_retry``, ``failed``, ``risk_score`` and
            ``fail_prob`` are added with zero defaults when missing.

    Returns:
        The rebuilt frame, sorted by timestamp, with ``txn_id`` renumbered and
        passed through ``_finalise_temporal_twin_features``. When ``df``
        yields no sender groups (e.g. it has no rows), the sorted copy with
        defaults is returned as-is.
    """
    df = df.copy().sort_values("timestamp").reset_index(drop=True)
    defaults = (("is_retry", 0), ("failed", 0), ("risk_score", 0.0), ("fail_prob", 0.0))
    for column, default in defaults:
        if column not in df.columns:
            df[column] = default

    sender_groups = {
        int(sender): frame.sort_values("timestamp").reset_index(drop=True).copy()
        for sender, frame in df.groupby("sender_id", sort=False)
    }
    if not sender_groups:
        return df

    min_pair_events = 18
    user_meta = []
    for sender, frame in sender_groups.items():
        per_receiver = Counter(int(rid) for rid in frame["receiver_id"].tolist())
        user_meta.append(
            {
                "sender_id": int(sender),
                "group": frame,
                "count": int(len(frame)),
                "repeated_receivers": sum(1 for hits in per_receiver.values() if hits >= 2),
                "start_time": float(frame["timestamp"].min()) if len(frame) else 0.0,
            }
        )

    # Templates: longest / most repetitive users first.
    templates = sorted(
        (
            meta
            for meta in user_meta
            if meta["count"] >= min_pair_events and meta["repeated_receivers"] >= 2
        ),
        key=lambda meta: (-meta["count"], -meta["repeated_receivers"], meta["start_time"], meta["sender_id"]),
    )
    carriers = sorted(user_meta, key=lambda meta: (meta["start_time"], meta["sender_id"]))
    n_templates = len(templates)

    def as_background(meta: dict) -> pd.DataFrame:
        return self._make_background_user(meta["group"], int(meta["sender_id"]))

    frames: list[pd.DataFrame] = []
    pair_id = 0
    next_template = 0
    cursor = 0
    while cursor + 1 < len(carriers):
        fraud_carrier, benign_carrier = carriers[cursor], carriers[cursor + 1]
        cursor += 2
        twins = None
        for offset in range(n_templates):
            template_idx = (next_template + offset) % n_templates
            template_meta = templates[template_idx]
            template = template_meta["group"].copy().reset_index(drop=True)
            # Gap order first, then the three permutations (RNG draw order).
            shared_layout = {
                "ordered_dts": self._order_deltas(
                    np.diff(template["timestamp"].to_numpy(dtype=np.float64)),
                    role="shared",
                ),
                "amount_perm": self.rng.permutation(len(template)),
                "retry_perm": self.rng.permutation(len(template)),
                "failed_perm": self.rng.permutation(len(template)),
            }
            common = dict(
                template_df=template,
                start_time=float(template_meta["start_time"]),
                pair_id=pair_id,
                shared_layout=shared_layout,
                template_id=int(template_meta["sender_id"]),
            )
            fraud_frame = self._build_temporal_twin_user(
                sender_id=int(fraud_carrier["sender_id"]),
                role="fraud",
                **common,
            )
            if fraud_frame is None:
                continue
            benign_frame = self._build_temporal_twin_user(
                sender_id=int(benign_carrier["sender_id"]),
                role="benign",
                fraud_reference=fraud_frame,
                **common,
            )
            if benign_frame is None:
                continue
            twins = (fraud_frame, benign_frame)
            next_template = (template_idx + 1) % n_templates
            break
        if twins is None:
            frames.append(as_background(fraud_carrier))
            frames.append(as_background(benign_carrier))
        else:
            frames.extend(twins)
            pair_id += 1
    # At most one carrier can be left over.
    frames.extend(as_background(meta) for meta in carriers[cursor:])

    out = pd.concat(frames, ignore_index=True)
    out = out.sort_values("timestamp").reset_index(drop=True)
    out["txn_id"] = np.arange(len(out), dtype=np.int32)
    return self._finalise_temporal_twin_features(out)
def _make_background_user(self, user_df: pd.DataFrame, sender_id: int) -> pd.DataFrame:
out = user_df.copy().sort_values("timestamp").reset_index(drop=True)
out["sender_id"] = int(sender_id)
out["is_fraud"] = np.zeros(len(out), dtype=np.int8)
out["fraud_type"] = "none"
out["dynamic_fraud_state"] = np.zeros(len(out), dtype=np.float32)
out["motif_source"] = np.zeros(len(out), dtype=np.int8)
out["motif_chain_state"] = np.zeros(len(out), dtype=np.float32)
out["motif_strength"] = np.zeros(len(out), dtype=np.float32)
out["twin_pair_id"] = -1
out["template_id"] = -1
out["twin_role"] = "background"
out["twin_label"] = 0
return out
def _build_temporal_twin_user(
    self,
    template_df: pd.DataFrame,
    sender_id: int,
    start_time: float,
    pair_id: int,
    role: str,
    shared_layout: dict | None = None,
    fraud_reference: pd.DataFrame | None = None,
    template_id: int | None = None,
) -> pd.DataFrame | None:
    """Build one twin user from ``template_df``, or return ``None`` to reject it.

    Build steps:
    - re-time the template's events from ``start_time`` using the shared (or
      freshly ordered) gaps;
    - re-order receivers according to ``role``;
    - permute the amount / retry / failed columns;
    - label with ``_apply_twin_labels_standard``, or with
      ``_apply_twin_labels_calib`` in ``"temporal_twins_oracle_calib"`` mode.

    Rejection rules:
    - In calib mode, a fraud twin without any positive label is rebuilt, up
      to ``_CALIB_MOTIF_RETRY_BUDGET`` attempts in total, then rejected.
    - A benign twin whose labelled frame still reports a motif hit is rejected.

    Args:
        template_df: Events of the template user.
        sender_id: Sender id written into every row.
        start_time: Timestamp of the first event (floored at 0).
        pair_id: Written to ``twin_pair_id``.
        role: ``"fraud"`` or ``"benign"``.
        shared_layout: Mapping with ``amount_perm`` / ``retry_perm`` /
            ``failed_perm`` and optionally ``ordered_dts``, shared by both
            twins of a pair.
        fraud_reference: For a benign twin, the already-built fraud twin whose
            receivers and label boundaries it is matched against.
        template_id: Written to ``template_id``; defaults to ``pair_id``.

    Returns:
        The labelled, time-sorted twin frame, or ``None`` when the twin is
        rejected (the caller then tries another template).
    """
    calib_mode = self.benchmark_mode == "temporal_twins_oracle_calib"
    # Only calib-mode fraud twins are retried. max(1, ...) guarantees at
    # least one attempt even if the budget constant is set to 0.
    max_attempts = max(1, _CALIB_MOTIF_RETRY_BUDGET) if (calib_mode and role == "fraud") else 1
    matched_benign = role == "benign" and fraud_reference is not None

    for attempt in range(max_attempts):
        out = template_df.copy().reset_index(drop=True)
        n = len(out)
        timestamps = out["timestamp"].to_numpy(dtype=np.float64)

        # 1. Re-time the events from start_time.
        if n <= 1:
            ordered_dts = np.zeros(0, dtype=np.float64)
        elif shared_layout is not None and "ordered_dts" in shared_layout:
            ordered_dts = np.asarray(shared_layout["ordered_dts"], dtype=np.float64)
        else:
            ordered_dts = self._order_deltas(np.diff(timestamps), role=role)
        new_timestamps = np.empty(n, dtype=np.float64)
        new_timestamps[0] = max(0.0, float(start_time))
        if n > 1:
            new_timestamps[1:] = new_timestamps[0] + np.cumsum(ordered_dts)
        out["timestamp"] = new_timestamps.astype(np.float32)
        retimed = out["timestamp"].to_numpy(dtype=np.float64)

        # 2. Re-order receivers. In standard mode some fraud twins are
        #    "camouflaged" with a benign-style ordering.
        camouflage_fraud = False
        if role == "fraud" and self._is_standard_temporal_twins():
            camouflage_fraud = self.rng.random() < float(self._standard_twin_profile()["camouflage_prob"])
        label_boundaries: list[int] = []
        if matched_benign:
            label_boundaries = sorted(
                fraud_reference.loc[
                    fraud_reference["is_fraud"] == 1,
                    "label_event_idx",
                ].astype(int).unique().tolist()
            )
            receiver_seq = self._order_receivers_benign_matched(
                fraud_receivers=fraud_reference["receiver_id"].to_numpy(dtype=np.int64),
                label_boundaries=label_boundaries,
                timestamps=retimed,
            )
        elif camouflage_fraud:
            receiver_seq = self._order_receivers_benign_greedy(
                receivers=out["receiver_id"].to_numpy(dtype=np.int64),
                timestamps=retimed,
            )
        else:
            receiver_seq = self._order_receivers(
                out["receiver_id"].to_numpy(dtype=np.int64),
                role=role,
                timestamps=retimed,
            )
        out["receiver_id"] = np.asarray(receiver_seq, dtype=np.int32)
        if matched_benign:
            out = self._repair_benign_twin_segmented(out, label_boundaries)

        # 3. Permute attribute columns (shared permutations keep twins aligned).
        if shared_layout is not None:
            amount_perm = np.asarray(shared_layout["amount_perm"], dtype=np.int64)
            retry_perm = np.asarray(shared_layout["retry_perm"], dtype=np.int64)
            failed_perm = np.asarray(shared_layout["failed_perm"], dtype=np.int64)
        else:
            amount_perm = self.rng.permutation(n)
            retry_perm = self.rng.permutation(n)
            failed_perm = self.rng.permutation(n)
        out["amount"] = out["amount"].to_numpy(dtype=np.float32)[amount_perm]
        out["txn_type"] = out["txn_type"].to_numpy(dtype=np.int8)
        out["is_retry"] = out["is_retry"].to_numpy(dtype=np.int8)[retry_perm]
        out["failed"] = out["failed"].to_numpy(dtype=np.int8)[failed_perm]
        out["risk_score"] = out["risk_score"].to_numpy(dtype=np.float32)
        out["fail_prob"] = out["fail_prob"].to_numpy(dtype=np.float32)
        out["sender_id"] = int(sender_id)
        out["is_fraud"] = 0
        out["fraud_type"] = "none"
        out["twin_pair_id"] = int(pair_id)
        out["template_id"] = int(template_id if template_id is not None else pair_id)
        out["twin_role"] = role
        out["twin_label"] = 1 if role == "fraud" else 0
        # Stable sort: the float32 cast above can create timestamp ties for
        # large timestamps, and an unstable sort could reorder tied rows and
        # scramble the constructed receiver order.
        out = out.sort_values("timestamp", kind="stable").reset_index(drop=True)
        if role == "benign" and fraud_reference is None:
            out = self._repair_benign_twin(out)

        # 4. Label and accept / reject.
        if not calib_mode:
            result = self._apply_twin_labels_standard(out, role=role)
            # A benign twin must carry no motif hits; None tells the caller
            # to try another template.
            if role == "benign" and int(result["motif_hit_count"].max()) > 0:
                return None
            return result

        result = self._apply_twin_labels_calib(out, role=role)
        if role != "fraud":
            if int(result["motif_hit_count"].max()) > 0:
                return None
            return result
        # In calib mode a fraud twin MUST have >= 1 motif-sourced positive.
        if int(result["is_fraud"].sum()) > 0:
            return result
        if attempt < max_attempts - 1:
            continue  # retry with fresh random draws
        print(
            f"[calib] WARNING: pair_id={pair_id} sender={sender_id} "
            f"produced 0 motif hits after {max_attempts} attempts — dropping pair."
        )
        return None

    # Every iteration returns or continues, and the last one always returns.
    raise RuntimeError("_build_temporal_twin_user: attempt loop exited without a result")
def _repair_benign_twin(self, user_df: pd.DataFrame) -> pd.DataFrame:
    """Greedily swap nearby receivers of a benign twin to reduce motif hits.

    Each round takes the first event flagged as a motif ``source`` by
    ``temporal_twin_motif_trace``. It tries swapping its receiver with the
    neighbour at offset +1, -1, +2, -2, +3, -3, and keeps the swap with the
    lowest total hit count, stopping early at zero. Repair ends when no hits
    remain, when no swap improves the count, or after
    ``_BENIGN_MOTIF_REPAIR_STEPS`` rounds.

    Returns:
        A time-sorted copy of ``user_df``. If the input already has no hits,
        the copy is returned untouched; otherwise ``receiver_id`` is
        rewritten as int32.
    """
    repaired = user_df.copy().sort_values("timestamp").reset_index(drop=True)
    order = repaired["receiver_id"].to_numpy(dtype=np.int64).copy()
    times = repaired["timestamp"].to_numpy(dtype=np.float64)

    def hit_count(sequence: np.ndarray) -> int:
        return int(np.sum(temporal_twin_motif_trace(times, sequence)["source"]))

    sources = temporal_twin_motif_trace(times, order)["source"]
    hits = int(np.sum(sources))
    if hits == 0:
        return repaired

    for _ in range(_BENIGN_MOTIF_REPAIR_STEPS):
        flagged = np.flatnonzero(sources)
        if flagged.size == 0:
            repaired["receiver_id"] = order.astype(np.int32)
            return repaired
        pivot = int(flagged[0])
        improved, improved_hits = None, hits
        for step in (1, -1, 2, -2, 3, -3):
            other = pivot + step
            if not 0 <= other < len(order) or order[other] == order[pivot]:
                continue
            trial = order.copy()
            trial[[pivot, other]] = trial[[other, pivot]]
            trial_hits = hit_count(trial)
            if trial_hits < improved_hits:
                improved, improved_hits = trial, trial_hits
                if trial_hits == 0:
                    break
        if improved is None:
            break
        order = improved
        hits = improved_hits
        sources = temporal_twin_motif_trace(times, order)["source"]

    repaired["receiver_id"] = order.astype(np.int32)
    return repaired
def _repair_benign_twin_segmented(
    self,
    user_df: pd.DataFrame,
    label_boundaries: list[int],
) -> pd.DataFrame:
    """Reduce benign motif hits while preserving each matched prefix segment multiset.

    Segments: events are cut into consecutive segments, each ending at an
    in-range entry of ``label_boundaries`` or at the last event.

    Each round:
    - take the first event flagged as a motif ``source`` by
      ``temporal_twin_motif_trace``;
    - try swapping its receiver with a neighbour at offset ±1..±4 inside the
      same segment;
    - keep the swap with the lowest total hit count, stopping early at zero.

    Repair stops when no hits remain, when that event cannot be improved, or
    after ``2 * _BENIGN_MOTIF_REPAIR_STEPS`` rounds. Swaps never cross a
    segment boundary, so every segment keeps its multiset of receivers.

    Args:
        user_df: The benign twin's events.
        label_boundaries: Event indices (of the matched fraud twin's labels)
            that close a segment; out-of-range entries are ignored.

    Returns:
        A time-sorted copy of ``user_df``. ``receiver_id`` is rewritten as
        int32 only when a repair was attempted.
    """
    # Stable sort keeps the caller's event order for tied timestamps.
    out = user_df.copy().sort_values("timestamp", kind="stable").reset_index(drop=True)
    receivers = out["receiver_id"].to_numpy(dtype=np.int64).copy()
    timestamps = out["timestamp"].to_numpy(dtype=np.float64)
    n = len(receivers)
    if n == 0:
        return out

    boundaries = sorted(int(boundary) for boundary in label_boundaries if 0 <= int(boundary) < n)
    if not boundaries or boundaries[-1] != n - 1:
        boundaries.append(n - 1)
    segments: list[tuple[int, int]] = []
    start = 0
    for end in boundaries:
        segments.append((start, end))
        start = end + 1

    def segment_bounds(idx: int) -> tuple[int, int]:
        """Inclusive (lo, hi) of the segment containing ``idx``."""
        for lo, hi in segments:
            if lo <= idx <= hi:
                return lo, hi
        return 0, n - 1

    trace = temporal_twin_motif_trace(timestamps, receivers)
    best_hits = int(np.sum(trace["source"]))
    if best_hits == 0:
        return out
    best_receivers = receivers.copy()
    for _ in range(_BENIGN_MOTIF_REPAIR_STEPS * 2):
        source_positions = np.flatnonzero(trace["source"])
        if source_positions.size == 0:
            out["receiver_id"] = receivers.astype(np.int32)
            return out
        src_idx = int(source_positions[0])
        seg_lo, seg_hi = segment_bounds(src_idx)
        candidate_receivers = None
        candidate_hits = best_hits
        for swap_offset in (1, -1, 2, -2, 3, -3, 4, -4):
            swap_idx = src_idx + swap_offset
            if swap_idx < seg_lo or swap_idx > seg_hi:
                continue
            if receivers[swap_idx] == receivers[src_idx]:
                continue
            trial = receivers.copy()
            trial[src_idx], trial[swap_idx] = trial[swap_idx], trial[src_idx]
            trial_hits = int(np.sum(temporal_twin_motif_trace(timestamps, trial)["source"]))
            if trial_hits < candidate_hits:
                candidate_receivers = trial
                candidate_hits = trial_hits
                if trial_hits == 0:
                    break
        if candidate_receivers is None:
            # No in-segment swap helps. The trace is unchanged, so another
            # round would pick the same source and fail identically; stop
            # instead of spending the rest of the budget recomputing it.
            break
        receivers = candidate_receivers
        trace = temporal_twin_motif_trace(timestamps, receivers)
        best_receivers = receivers.copy()
        best_hits = candidate_hits
    out["receiver_id"] = best_receivers.astype(np.int32)
    return out
def _order_deltas(self, deltas: np.ndarray, role: str) -> np.ndarray:
    """Reorder inter-event deltas so they follow a cadence motif recipe.

    The deltas are clipped to a floor of 60.0 and split into three disjoint
    pools by quantile:

    * ``shorts``  -- values <= the 55th percentile, ascending;
    * ``mediums`` -- values strictly between the 55th and 82nd percentile,
      ascending;
    * ``longs``   -- values above the 55th and >= the 82nd percentile,
      descending.

    Values are then drawn from the pools following a recipe (quiet period,
    accelerating cadence, release, burst) until every delta has been emitted.
    In standard temporal-twins mode the recipe is chosen by
    ``self._standard_twin_profile()["delta_recipe"]``.  On each loop
    iteration where ``self.rng.random()`` exceeds ``motif_cycle_prob``, a
    single value from any pool is emitted instead of a full recipe cycle.

    Args:
        deltas: Inter-event deltas to reorder.
        role: Twin role.  Not used by this method; kept for interface
            compatibility.

    Returns:
        A float64 array with the same length as ``deltas`` containing exactly
        the clipped deltas, in motif order.
    """
    deltas = np.asarray(deltas, dtype=np.float64)
    if len(deltas) == 0:
        return deltas
    deltas = np.clip(deltas, 60.0, None)
    short_q = float(np.quantile(deltas, 0.55))
    long_q = float(np.quantile(deltas, 0.82))
    # The pools must partition ``deltas``.  With many ties (e.g. several gaps
    # clipped to 60.0) the two quantiles can coincide.  A value equal to both
    # would then sit in ``shorts`` *and* ``longs``, the pools would hold more
    # items than ``deltas``, and the output could repeat tied values while
    # dropping others.  Requiring ``> short_q`` for ``longs`` keeps them
    # disjoint; when ``short_q < long_q`` this is a no-op.
    above_short = deltas > short_q
    shorts = list(np.sort(deltas[~above_short]).astype(np.float64))
    mediums = list(np.sort(deltas[above_short & (deltas < long_q)]).astype(np.float64))
    longs = list(np.sort(deltas[above_short & (deltas >= long_q)])[::-1].astype(np.float64))

    def pop_front(pool):
        return pool.pop(0) if pool else None

    def pop_back(pool):
        return pool.pop() if pool else None

    def pop_short():
        # Smallest remaining short gap.
        return pop_front(shorts)

    def pop_short_fast():
        return pop_front(shorts)

    def pop_short_slow():
        # Largest remaining short gap.
        return pop_back(shorts)

    def pop_medium():
        # Prefer a medium gap; otherwise borrow the largest short (only if at
        # least two shorts remain), then the smallest long.
        if mediums:
            return pop_front(mediums)
        if len(shorts) >= 2:
            return pop_back(shorts)
        if longs:
            return pop_back(longs)
        return None

    def pop_long():
        # Largest long gap; otherwise fall back through the other pools, so
        # this returns None only when every pool is empty.
        if longs:
            return pop_front(longs)
        if mediums:
            return pop_back(mediums)
        if shorts:
            return pop_back(shorts)
        return None

    def pop_any():
        for getter in (pop_medium, pop_long, pop_short):
            value = getter()
            if value is not None:
                return value
        return None

    ordered: list[float] = []
    if self._is_standard_temporal_twins():
        recipe_name = self._standard_twin_profile()["delta_recipe"]
        if recipe_name == "easy":
            motif_recipe = [
                pop_long,
                pop_medium,
                pop_short_slow,
                pop_short_fast,
                pop_long,
                pop_short_slow,
                pop_short_fast,
            ]
        elif recipe_name == "medium":
            motif_recipe = [
                pop_long,
                pop_medium,
                pop_short_slow,
                pop_medium,
                pop_short_fast,
                pop_long,
                pop_medium,
                pop_short_fast,
            ]
        else:
            motif_recipe = [
                pop_long,
                pop_medium,
                pop_short_slow,
                pop_medium,
                pop_short_fast,
                pop_long,
                pop_medium,
                pop_short_slow,
                pop_short_fast,
            ]
    else:
        motif_recipe = [
            pop_long,  # quiet period
            pop_medium,  # accelerating cadence starts
            pop_short_slow,
            pop_short_fast,  # delayed revisit lands here
            pop_long,  # release
            pop_short_slow,
            pop_short_fast,  # burst-release-burst completion
        ]
    while len(ordered) < len(deltas):
        if self._is_standard_temporal_twins():
            if self.rng.random() > float(self._standard_twin_profile()["motif_cycle_prob"]):
                value = pop_any()
                if value is None:
                    break
                ordered.append(float(value))
                continue
        emitted = False
        for getter in motif_recipe:
            value = getter()
            if value is None:
                continue
            ordered.append(float(value))
            emitted = True
            if len(ordered) >= len(deltas):
                break
        if not emitted:
            value = pop_any()
            if value is None:
                break
            ordered.append(float(value))
    if len(ordered) != len(deltas):
        # Safety net; unreachable while the pools partition ``deltas``.
        fallback = np.sort(deltas)
        ordered = list(fallback[: len(deltas)])
    return np.asarray(ordered, dtype=np.float64)
def _order_receivers(
self,
receivers: np.ndarray,
role: str,
timestamps: np.ndarray | None = None,
) -> list[int]:
if role == "benign" and timestamps is not None:
return self._order_receivers_benign_greedy(
receivers=np.asarray(receivers, dtype=np.int64),
timestamps=np.asarray(timestamps, dtype=np.float64),
)
counts = Counter(int(receiver_id) for receiver_id in receivers.tolist())
ordered: list[int] = []
def sorted_candidates(exclude: set[int] | None = None):
exclude = exclude or set()
return [
receiver
for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))
if count > 0 and receiver not in exclude
]
def pop_receiver(exclude: set[int] | None = None):
candidates = sorted_candidates(exclude=exclude)
if not candidates:
return None
receiver = int(candidates[0])
counts[receiver] -= 1
return receiver
while len(ordered) < len(receivers):
if role == "fraud":
anchor = next(
(
receiver
for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))
if count >= 2
),
None,
)
inject_block = True
if self._is_standard_temporal_twins():
inject_block = self.rng.random() <= float(self._standard_twin_profile()["fraud_block_prob"])
if inject_block and anchor is not None and len(receivers) - len(ordered) >= 8:
fillers = []
used_in_block = {int(anchor)}
for _ in range(6):
filler = pop_receiver(exclude=used_in_block)
if filler is None:
break
fillers.append(filler)
used_in_block.add(int(filler))
if len(fillers) == 6:
counts[int(anchor)] -= 2
if self._is_standard_temporal_twins():
gap = int(self._standard_twin_profile()["receiver_gap"])
block = [int(anchor)]
block.extend(fillers[: gap - 1])
block.append(int(anchor))
block.extend(fillers[gap - 1 :])
ordered.extend(block[:8])
else:
ordered.extend(
[
int(anchor),
fillers[0],
fillers[1],
int(anchor),
fillers[2],
fillers[3],
fillers[4],
fillers[5],
]
)
continue
for filler in fillers:
counts[int(filler)] += 1
if role == "benign":
anchor = next(
(
receiver
for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))
if count >= 2
),
None,
)
if anchor is not None and len(receivers) - len(ordered) >= 8:
fillers = []
used_in_block = {int(anchor)}
for _ in range(6):
filler = pop_receiver(exclude=used_in_block)
if filler is None:
break
fillers.append(filler)
used_in_block.add(int(filler))
if len(fillers) == 6:
counts[int(anchor)] -= 2
ordered.extend(
[
int(anchor),
fillers[0],
int(anchor),
fillers[1],
fillers[2],
fillers[3],
fillers[4],
fillers[5],
]
)
continue
for filler in fillers:
counts[int(filler)] += 1
exclude = {int(ordered[-1])} if ordered else set()
chosen = pop_receiver(exclude=exclude)
if chosen is not None:
ordered.append(chosen)
continue
chosen = pop_receiver(exclude=None)
if chosen is not None:
ordered.append(chosen)
continue
return ordered[: len(receivers)]
def _select_standard_twin_sources(
    self,
    trace: dict,
    n_events: int,
) -> list[tuple[int, bool]]:
    """Pick the source positions that will trigger labels in standard twin mode.

    Only positions >= 7 are eligible.  The selection is built in stages, all
    driven by ``self._standard_twin_profile()``:

    1. True motif sources (``trace["source"]`` non-zero): keep
       ``ceil(len * source_keep_frac)`` of them (at least ``min_true_sources``),
       sampled without replacement from the first
       ``keep_n * source_pool_factor`` sources.
    2. Up to ``max_chain_fallback`` chain-ranked non-source positions, sampled
       from the top ``max_chain_fallback * chain_pool_factor`` of them.
    3. If nothing was selected, the top ``target_events`` chain-ranked
       positions.
    4. Top-up to ``target_events`` with remaining sources, then chain ranks.

    Each position appears at most once.

    Args:
        trace: Motif trace with at least ``"source"`` and ``"chain"`` arrays
            indexed by event position.
        n_events: Number of events of the user.

    Returns:
        ``(position, is_chain_fallback)`` pairs sorted by position, truncated
        to ``target_events``.  ``is_chain_fallback`` is False for true motif
        sources and True for chain-ranked stand-ins.
    """
    profile = self._standard_twin_profile()
    target_events = max(
        int(profile["min_events"]),
        min(int(profile["max_events_cap"]), max(1, n_events // int(profile["event_divisor"]))),
    )
    # Earlier positions are never used as sources.
    # NOTE(review): presumably a motif needs ~8 events of history -- confirm.
    min_idx = 7
    source_positions = [
        int(pos)
        for pos in np.flatnonzero(trace["source"]).tolist()
        if int(pos) >= min_idx
    ]
    ranked_chain = [
        int(pos)
        for pos in np.argsort(trace["chain"])[::-1].tolist()
        if int(pos) >= min_idx
    ]
    # Build the membership set once (it was rebuilt per element before).
    source_set = set(source_positions)
    chain_only = [pos for pos in ranked_chain if pos not in source_set]
    if source_positions:
        keep_n = int(np.ceil(len(source_positions) * float(profile["source_keep_frac"])))
        keep_n = max(int(profile["min_true_sources"]), min(len(source_positions), keep_n))
    else:
        keep_n = 0
    source_pool_n = min(
        len(source_positions),
        max(keep_n, int(np.ceil(keep_n * float(profile["source_pool_factor"])))),
    )
    source_pool = source_positions[:source_pool_n]
    if keep_n > 0 and len(source_pool) > keep_n:
        sampled_true = self.rng.choice(np.asarray(source_pool, dtype=np.int64), size=keep_n, replace=False)
        true_sources = sorted(int(pos) for pos in sampled_true.tolist())
    else:
        true_sources = source_pool[:keep_n]
    selected: list[tuple[int, bool]] = [(pos, False) for pos in true_sources]
    used = {pos for pos, _ in selected}
    fallback_cap = int(profile["max_chain_fallback"])
    chain_pool_n = min(
        len(chain_only),
        max(fallback_cap, int(np.ceil(fallback_cap * float(profile["chain_pool_factor"])))),
    )
    chain_pool = chain_only[:chain_pool_n]
    if fallback_cap > 0 and len(chain_pool) > fallback_cap:
        sampled_chain = self.rng.choice(np.asarray(chain_pool, dtype=np.int64), size=fallback_cap, replace=False)
        chain_choices = sorted(int(pos) for pos in sampled_chain.tolist())
    else:
        chain_choices = chain_pool[:fallback_cap]
    for pos in chain_choices:
        if len(selected) >= target_events:
            break
        selected.append((pos, True))
        used.add(pos)
    if not selected:
        fallback_candidates = ranked_chain[:target_events]
        selected = [(pos, True) for pos in fallback_candidates]
        # Record these so the top-up loops below cannot add them a second time.
        used.update(fallback_candidates)
    if len(selected) < target_events:
        # NOTE(review): this skips the first ``keep_n`` sources even when
        # sampling left some of them unselected -- confirm this is intended.
        for pos in source_positions[keep_n:]:
            if pos in used:
                continue
            selected.append((pos, False))
            used.add(pos)
            if len(selected) >= target_events:
                break
    if len(selected) < target_events:
        for pos in ranked_chain:
            if pos in used:
                continue
            selected.append((pos, True))
            used.add(pos)
            if len(selected) >= target_events:
                break
    selected.sort(key=lambda item: item[0])
    return selected[:target_events]
def _order_receivers_benign_greedy(
    self,
    receivers: np.ndarray,
    timestamps: np.ndarray,
) -> list[int]:
    """Build a benign ordering that avoids 3..8-step receiver revisits.

    At every step the remaining receiver with the lexicographically smallest
    priority tuple is emitted:
    ``(revisit in 3..8 steps, not adjacent (gap > 2), gap <= 8,
    -remaining count, receiver id)``.  A receiver never seen before scores
    ``(0, 1, 1, ...)``.  ``timestamps`` is accepted but not consulted.

    Returns:
        A permutation of ``receivers`` as a list of ints.
    """
    remaining = Counter(int(rid) for rid in receivers.tolist())
    last_seen: dict[int, int] = {}
    sequence: list[int] = []

    def priority(item: tuple[int, int]) -> tuple[int, int, int, int, int]:
        rid, left = item
        prev = last_seen.get(int(rid))
        if prev is None:
            return (0, 1, 1, -int(left), int(rid))
        gap = len(sequence) - prev
        return (
            int(3 <= gap <= 8),
            int(gap > 2),
            int(gap <= 8),
            -int(left),
            int(rid),
        )

    for _ in range(len(receivers)):
        live = [item for item in remaining.items() if item[1] > 0]
        choice = int(min(live, key=priority)[0])
        remaining[choice] -= 1
        last_seen[choice] = len(sequence)
        sequence.append(choice)
    return sequence
def _order_receivers_benign_matched(
    self,
    fraud_receivers: np.ndarray,
    label_boundaries: list[int],
    timestamps: np.ndarray,
) -> list[int]:
    """Match fraud prefix histograms at every label boundary while reordering within segments.

    ``label_boundaries`` are inclusive end indices.  Out-of-range entries are
    dropped and ``n - 1`` is always the final cut.  Each segment of
    ``fraud_receivers`` is reordered independently by
    ``_order_benign_segment``, which shares one recency map across segments.
    As a result, the receiver multiset of every prefix ending at a boundary
    equals the fraud twin's.

    Returns:
        The concatenated reordered segments (empty list for empty input).
    """
    size = len(fraud_receivers)
    if size == 0:
        return []
    cut_points = sorted(
        int(cut) for cut in label_boundaries if 0 <= int(cut) < size
    )
    if not cut_points or cut_points[-1] != size - 1:
        cut_points.append(size - 1)
    seg_starts = [0] + [cut + 1 for cut in cut_points[:-1]]
    result: list[int] = []
    recency: dict[int, int] = {}
    for seg_start, seg_end in zip(seg_starts, cut_points):
        piece = self._order_benign_segment(
            segment_receivers=fraud_receivers[seg_start : seg_end + 1],
            ordered_prefix=result,
            last_pos=recency,
            full_timestamps=np.asarray(timestamps[: seg_end + 1], dtype=np.float64),
        )
        result.extend(piece)
    return result
def _order_benign_segment(
    self,
    segment_receivers: np.ndarray,
    ordered_prefix: list[int],
    last_pos: dict[int, int],
    full_timestamps: np.ndarray,
) -> list[int]:
    """Greedily order one segment's receivers, continuing after ``ordered_prefix``.

    Global positions start at ``len(ordered_prefix)``.  At each step the
    remaining receiver with the smallest priority tuple
    ``(revisit in 3..8 steps, gap > 2, gap <= 8, seen before,
    -remaining count, receiver id)`` is emitted.  An unseen receiver scores
    ``(0, 1, 1, 0, ...)``.  ``last_pos`` is updated in place with each emitted
    receiver's global position.  ``full_timestamps`` is not consulted.

    Returns:
        A permutation of ``segment_receivers`` as a list of ints.
    """
    pending = Counter(int(rid) for rid in segment_receivers.tolist())
    offset = len(ordered_prefix)
    picked: list[int] = []

    def rank(rid: int, left: int, position: int) -> tuple[int, int, int, int, int, int]:
        prev = last_pos.get(int(rid))
        if prev is None:
            return (0, 1, 1, 0, -int(left), int(rid))
        gap = position - prev
        return (int(3 <= gap <= 8), int(gap > 2), int(gap <= 8), 1, -int(left), int(rid))

    for step in range(len(segment_receivers)):
        position = offset + step
        live = [(rid, left) for rid, left in pending.items() if left > 0]
        choice = int(min(live, key=lambda rl: rank(rl[0], rl[1], position))[0])
        pending[choice] -= 1
        picked.append(choice)
        last_pos[choice] = position
    return picked
# ------------------------------------------------------------------
# Label-assignment: shared helpers
# ------------------------------------------------------------------
def _attach_audit_columns(
    self,
    out: pd.DataFrame,
    fraud_flags: np.ndarray,
    trigger_idxs: list,  # list of (target_idx, src_idx) tuples
    is_fallback: np.ndarray,
    trace: dict,
) -> pd.DataFrame:
    """Attach per-event audit columns to the twin user DataFrame.

    Mutates ``out`` in place and returns it.  Adds ``fraud_source``
    ("none" / "motif" / "chain_fallback"), ``motif_hit_count`` (total of
    ``trace["source"]``, same on every row), ``trigger_event_idx``,
    ``label_event_idx`` and ``label_delay`` (int32, -1 where unlabeled) and
    ``is_fallback_label`` (int8).  ``fraud_flags`` is accepted but not read.
    """
    size = len(out)
    source_col = np.full(size, "none", dtype=object)
    trigger_col = np.full(size, -1, dtype=np.int32)
    label_col = np.full(size, -1, dtype=np.int32)
    delay_col = np.full(size, -1, dtype=np.int32)
    for target, trigger in trigger_idxs:
        source_col[target] = "chain_fallback" if is_fallback[target] else "motif"
        trigger_col[target] = int(trigger)
        label_col[target] = int(target)
        delay_col[target] = int(target - trigger)
    out["fraud_source"] = source_col
    out["motif_hit_count"] = int(np.sum(trace["source"]))
    out["trigger_event_idx"] = trigger_col
    out["label_event_idx"] = label_col
    out["label_delay"] = delay_col
    out["is_fallback_label"] = is_fallback.astype(np.int8)
    return out
# ------------------------------------------------------------------
# Standard mode: motif hits preferred, chain-rank fallback allowed
# ------------------------------------------------------------------
def _apply_twin_labels_standard(self, user_df: pd.DataFrame, role: str) -> pd.DataFrame:
    """Assign temporal-twin fraud labels to one user's events (standard mode).

    Events are sorted by ``timestamp`` and scanned with
    ``temporal_twin_motif_trace``.  For ``role == "fraud"`` source positions
    are chosen:

    * in standard temporal-twins mode, via ``_select_standard_twin_sources``;
    * otherwise, the first ``max(4, min(12, n // 5))`` motif sources, or, if
      there are none, the top chain-ranked positions at index >= 7, flagged
      as fallbacks.

    Each source labels a target event after a random delay (the source
    itself if it is the last event).  ``dynamic_fraud_state`` is ramped up
    over the source..target span.  Other roles get trace metadata but no
    positive labels.

    Args:
        user_df: Events of a single twin user; needs ``timestamp`` and
            ``receiver_id`` columns.
        role: ``"fraud"`` places labels; any other value leaves all
            ``is_fraud`` flags at 0.

    Returns:
        A new, timestamp-sorted DataFrame with trace columns, ``is_fraud``,
        ``fraud_type`` and the audit columns.
    """
    out = user_df.copy().sort_values("timestamp").reset_index(drop=True)
    n = len(out)
    if n == 0:
        # Emit the same column set as the non-empty path (including
        # ``is_fraud``/``fraud_type``) so downstream concatenation sees a
        # consistent schema.
        out["dynamic_fraud_state"] = np.zeros(0, dtype=np.float32)
        out["motif_source"] = np.zeros(0, dtype=np.int8)
        out["motif_chain_state"] = np.zeros(0, dtype=np.float32)
        out["motif_strength"] = np.zeros(0, dtype=np.float32)
        out["fraud_source"] = np.full(0, "none", dtype=object)
        out["motif_hit_count"] = 0
        out["trigger_event_idx"] = np.full(0, -1, dtype=np.int32)
        out["label_event_idx"] = np.full(0, -1, dtype=np.int32)
        out["label_delay"] = np.full(0, -1, dtype=np.int32)
        out["is_fallback_label"] = np.zeros(0, dtype=np.int8)
        out["is_fraud"] = np.zeros(0, dtype=np.int8)
        out["fraud_type"] = np.full(0, "none", dtype=object)
        return out
    timestamps = out["timestamp"].to_numpy(dtype=np.float64)
    receivers = out["receiver_id"].to_numpy(dtype=np.int64)
    trace = temporal_twin_motif_trace(timestamps, receivers)
    state = trace["state"].copy()
    fraud_flags = np.zeros(n, dtype=np.int8)
    fraud_type = np.full(n, "none", dtype=object)
    is_fallback = np.zeros(n, dtype=np.int8)
    source_positions = np.flatnonzero(trace["source"]).tolist()
    trigger_pairs: list = []  # (target_idx, src_idx)
    if role == "fraud":
        if self._is_standard_temporal_twins():
            selected_sources = self._select_standard_twin_sources(trace, n)
        else:
            max_events = max(4, min(12, n // 5))
            used_fallback = False
            if not source_positions:
                # No true motif hit: fall back to the highest chain-state
                # positions (index >= 7) and flag the resulting labels.
                ranked = np.argsort(trace["chain"])[::-1]
                source_positions = [int(pos) for pos in ranked if int(pos) >= 7][:max_events]
                used_fallback = True
            selected_sources = [(src, used_fallback) for src in source_positions[:max_events]]
        used_targets = set()
        for src, used_fallback in selected_sources:
            if src >= n - 1:
                target = src
            else:
                if self._is_standard_temporal_twins():
                    delay_lo, delay_hi = self._standard_twin_profile()["delay_range"]
                    sampled_delay = int(self.rng.integers(delay_lo, delay_hi + 1))
                else:
                    sampled_delay = int(self.rng.integers(6, 17))
                # Clamp so the target stays inside the sequence.
                delay = min(sampled_delay, (n - 1) - src)
                target = src + max(delay, 1)
            if target in used_targets:
                continue
            used_targets.add(target)
            fraud_flags[target] = 1
            fraud_type[target] = "temporal_twin"
            if used_fallback:
                is_fallback[target] = 1
            trigger_pairs.append((target, src))
            lo = max(0, src)
            hi = min(n, target + 1)
            ramp = np.linspace(0.15, 0.85, num=max(1, hi - lo), dtype=np.float32)
            state[lo:hi] += ramp
    out["motif_source"] = trace["source"].astype(np.int8)
    out["motif_chain_state"] = trace["chain"].astype(np.float32)
    out["motif_strength"] = trace["motif_strength"].astype(np.float32)
    out["dynamic_fraud_state"] = state.astype(np.float32)
    out["is_fraud"] = fraud_flags.astype(np.int8)
    out["fraud_type"] = fraud_type
    return self._attach_audit_columns(out, fraud_flags, trigger_pairs, is_fallback, trace)
# ------------------------------------------------------------------
# Calib mode: ONLY true motif hits allowed — zero fallback
# ------------------------------------------------------------------
def _apply_twin_labels_calib(self, user_df: pd.DataFrame, role: str) -> pd.DataFrame:
    """Assign temporal-twin labels using ONLY true motif hits (calibration mode).

    Like the standard labeller, but there is no chain-rank fallback.  A fraud
    twin with zero motif sources gets all-zero ``is_fraud`` (the caller is
    expected to retry or drop the pair).  Up to ``max(4, min(12, n // 5))``
    sources each label a target 6..16 events later, clamped to the last
    event.

    Args:
        user_df: Events of a single twin user; needs ``timestamp`` and
            ``receiver_id`` columns.
        role: ``"fraud"`` places labels; any other value yields none.

    Returns:
        A new, timestamp-sorted DataFrame with trace columns, ``is_fraud``,
        ``fraud_type`` and the audit columns (``is_fallback_label`` always 0).
    """
    out = user_df.copy().sort_values("timestamp").reset_index(drop=True)
    n = len(out)
    if n == 0:
        # Use the same defaults/dtypes as the standard labeller and
        # ``_finalise_temporal_twin_features`` ("none" / -1 sentinels), and
        # emit ``is_fraud``/``fraud_type`` like the non-empty path does.
        out["dynamic_fraud_state"] = np.zeros(0, dtype=np.float32)
        out["motif_source"] = np.zeros(0, dtype=np.int8)
        out["motif_chain_state"] = np.zeros(0, dtype=np.float32)
        out["motif_strength"] = np.zeros(0, dtype=np.float32)
        out["fraud_source"] = np.full(0, "none", dtype=object)
        out["motif_hit_count"] = 0
        out["trigger_event_idx"] = np.full(0, -1, dtype=np.int32)
        out["label_event_idx"] = np.full(0, -1, dtype=np.int32)
        out["label_delay"] = np.full(0, -1, dtype=np.int32)
        out["is_fallback_label"] = np.zeros(0, dtype=np.int8)
        out["is_fraud"] = np.zeros(0, dtype=np.int8)
        out["fraud_type"] = np.full(0, "none", dtype=object)
        return out
    timestamps = out["timestamp"].to_numpy(dtype=np.float64)
    receivers = out["receiver_id"].to_numpy(dtype=np.int64)
    trace = temporal_twin_motif_trace(timestamps, receivers)
    state = trace["state"].copy()
    fraud_flags = np.zeros(n, dtype=np.int8)
    fraud_type = np.full(n, "none", dtype=object)
    is_fallback = np.zeros(n, dtype=np.int8)  # always 0 in calib
    trigger_pairs: list = []
    if role == "fraud":
        # No fallback: with zero motif sources the loop below does nothing
        # and the user is returned with all-zero fraud flags.
        source_positions = np.flatnonzero(trace["source"]).tolist()
        max_events = max(4, min(12, n // 5))
        used_targets = set()
        for src in source_positions[:max_events]:
            if src >= n - 1:
                target = src
            else:
                delay = min(int(self.rng.integers(6, 17)), (n - 1) - src)
                target = src + max(delay, 1)
            if target in used_targets:
                continue
            used_targets.add(target)
            fraud_flags[target] = 1
            fraud_type[target] = "temporal_twin_calib"
            trigger_pairs.append((target, src))
            lo = max(0, src)
            hi = min(n, target + 1)
            ramp = np.linspace(0.15, 0.85, num=max(1, hi - lo), dtype=np.float32)
            state[lo:hi] += ramp
    out["motif_source"] = trace["source"].astype(np.int8)
    out["motif_chain_state"] = trace["chain"].astype(np.float32)
    out["motif_strength"] = trace["motif_strength"].astype(np.float32)
    out["dynamic_fraud_state"] = state.astype(np.float32)
    out["is_fraud"] = fraud_flags.astype(np.int8)
    out["fraud_type"] = fraud_type
    return self._attach_audit_columns(out, fraud_flags, trigger_pairs, is_fallback, trace)
def _finalise_temporal_twin_features(self, df: pd.DataFrame) -> pd.DataFrame:
    """Neutralise value features and normalise dtypes of the twin dataset.

    Returns a timestamp-sorted copy in which:

    * ``amount``, ``risk_score``, ``fail_prob``, ``risk_noisy``,
      ``neighbor_score`` and ``pair_freq`` are all float32 zeros;
    * ``txn_count_10`` is the per-sender rolling count over the last 10
      events, and ``amount_sum_10`` the rolling sum of the (zeroed) amount,
      hence always 0;
    * id/flag columns are cast to int8/int32;
    * audit columns are filled with their defaults (0 / -1 / "none") where
      missing or NaN.

    NOTE(review): assumes ``is_retry``, ``failed``, ``twin_pair_id``,
    ``template_id`` and ``twin_label`` exist without NaN -- the casts below
    would fail otherwise.
    """
    frame = df.copy().sort_values("timestamp").reset_index(drop=True)
    row_count = len(frame)
    for neutral_col in ("amount", "risk_score", "fail_prob", "risk_noisy", "neighbor_score", "pair_freq"):
        frame[neutral_col] = np.zeros(row_count, dtype=np.float32)

    def rolling_per_sender(column: str, how: str) -> pd.Series:
        # Window of the sender's last 10 events (fewer at the start).
        windowed = frame.groupby("sender_id")[column].transform(
            lambda series: getattr(series.rolling(10, min_periods=1), how)()
        )
        return windowed.astype(np.float32)

    frame["txn_count_10"] = rolling_per_sender("timestamp", "count")
    frame["amount_sum_10"] = rolling_per_sender("amount", "sum")

    dtype_plan = {
        "is_fraud": np.int8,
        "is_retry": np.int8,
        "failed": np.int8,
        "twin_pair_id": np.int32,
        "template_id": np.int32,
        "twin_label": np.int8,
        "receiver_id": np.int32,
        "sender_id": np.int32,
    }
    for name, target_dtype in dtype_plan.items():
        frame[name] = frame[name].astype(target_dtype)
    if "motif_source" in frame.columns:
        frame["motif_source"] = frame["motif_source"].astype(np.int8)

    # Background users carry no audit values: fill defaults, then cast.
    audit_defaults = (
        ("motif_hit_count", 0, np.int32),
        ("trigger_event_idx", -1, np.int32),
        ("label_event_idx", -1, np.int32),
        ("label_delay", -1, np.int32),
        ("is_fallback_label", 0, np.int8),
    )
    for name, fill_value, target_dtype in audit_defaults:
        if name not in frame.columns:
            frame[name] = np.full(row_count, fill_value, dtype=target_dtype)
        else:
            frame[name] = frame[name].fillna(fill_value).astype(target_dtype)

    if "fraud_source" in frame.columns:
        frame["fraud_source"] = frame["fraud_source"].fillna("none")
    else:
        frame["fraud_source"] = np.full(row_count, "none", dtype=object)
    return frame