| import numpy as np |
| import pandas as pd |
| from collections import Counter |
|
|
| |
| |
| |
# Columns produced by the generator that carry ground-truth / oracle-only
# information (labels, twin bookkeeping, internal motif state).  Presumably
# these are stripped from any model-facing feature set before training —
# confirm with the consumer of this module.
ORACLE_ONLY_COLS: frozenset[str] = frozenset({
    "motif_hit_count",
    "motif_source",
    "trigger_event_idx",
    "label_event_idx",
    "label_delay",
    "is_fallback_label",
    "fraud_source",
    "twin_role",
    "twin_label",
    "twin_pair_id",
    "template_id",
    "dynamic_fraud_state",
    "motif_chain_state",
    "motif_strength",
})
|
|
|
|
| |
| |
| |
# Per-difficulty knobs for the legacy (non temporal-twin) generator path in
# FraudEngine.apply().  Of these, only "noise_std", "quantile_type",
# "quantile_suspicious", "pair_freq_mult" and "graph_feat_noise" are read by
# the code visible in this module; the logit weights, thresholds and
# "delayed_fraction" are not referenced here — presumably consumed elsewhere
# or legacy, confirm before pruning.  Note that apply() uses its own
# hard-coded delayed-label and min-rate tables rather than these keys.
DIFFICULTY_PRESETS = {
    "easy": {
        "noise_std": 0.2,              # std of Gaussian noise mixed into risk_noisy
        "quantile_type": 0.90,         # quantile cutoff for "typed" signals
        "quantile_suspicious": 0.92,   # quantile cutoff feeding neighbor_score
        "pair_freq_mult": 0.7,         # scale on log1p(pair frequency)
        "velocity_logit": 0.20,
        "burst_divisor": 10.0,
        "retry_logit": 0.8,
        "ring_logit": 1.2,
        "global_noise": 0.4,
        "graph_feat_noise": 0.0,       # additive noise on graph features (0 = clean)
        "delayed_fraction": 0.0,
        "thresh_velocity": 0.93,
        "thresh_burst": 0.90,
        "thresh_retry": 0.88,
        "thresh_ring": 0.90,
        "thresh_none": 0.9995,
    },
    "medium": {
        "noise_std": 0.3,
        "quantile_type": 0.94,
        "quantile_suspicious": 0.96,
        "pair_freq_mult": 0.35,
        "velocity_logit": 0.15,
        "burst_divisor": 12.0,
        "retry_logit": 0.6,
        "ring_logit": 0.5,
        "global_noise": 0.7,
        "graph_feat_noise": 0.2,
        "delayed_fraction": 0.3,
        "thresh_velocity": 0.95,
        "thresh_burst": 0.93,
        "thresh_retry": 0.92,
        "thresh_ring": 0.95,
        "thresh_none": 0.9998,
    },
    "hard": {
        "noise_std": 0.4,
        "quantile_type": 0.97,
        "quantile_suspicious": 0.98,
        "pair_freq_mult": 0.2,
        "velocity_logit": 0.12,
        "burst_divisor": 15.0,
        "retry_logit": 0.5,
        "ring_logit": 0.15,
        "global_noise": 1.5,
        "graph_feat_noise": 0.5,
        "delayed_fraction": 0.5,
        "thresh_velocity": 0.97,
        "thresh_burst": 0.96,
        "thresh_retry": 0.96,
        "thresh_ring": 0.98,
        "thresh_none": 0.9999,
    },
}
|
|
|
|
# Per-difficulty construction profiles for the standard temporal-twin
# benchmark, read via FraudEngine._standard_twin_profile().  In the code
# visible in this module only "camouflage_prob" is consumed (probability that
# a fraud twin gets a benign-style receiver ordering); the remaining knobs
# are presumably used by the _order_* / labelling helpers defined outside
# this view — confirm there before changing values.
TEMPORAL_TWIN_STANDARD_PROFILES = {
    "easy": {
        "receiver_gap": 3,
        "delta_recipe": "easy",
        "event_divisor": 4,
        "min_events": 5,
        "max_events_cap": 12,
        "source_keep_frac": 1.00,
        "min_true_sources": 4,
        "max_chain_fallback": 1,
        "delay_range": (4, 9),
        "source_pool_factor": 1.0,
        "chain_pool_factor": 1.0,
        "fraud_block_prob": 1.0,
        "motif_cycle_prob": 1.0,
        "camouflage_prob": 0.0,    # easy fraud twins are never camouflaged
    },
    "medium": {
        "receiver_gap": 4,
        "delta_recipe": "medium",
        "event_divisor": 5,
        "min_events": 4,
        "max_events_cap": 10,
        "source_keep_frac": 0.75,
        "min_true_sources": 3,
        "max_chain_fallback": 3,
        "delay_range": (7, 14),
        "source_pool_factor": 2.0,
        "chain_pool_factor": 2.0,
        "fraud_block_prob": 0.30,
        "motif_cycle_prob": 0.40,
        "camouflage_prob": 0.60,
    },
    "hard": {
        "receiver_gap": 5,
        "delta_recipe": "hard",
        "event_divisor": 6,
        "min_events": 4,
        "max_events_cap": 8,
        "source_keep_frac": 0.45,
        "min_true_sources": 2,
        "max_chain_fallback": 5,
        "delay_range": (10, 20),
        "source_pool_factor": 3.0,
        "chain_pool_factor": 3.0,
        "fraud_block_prob": 0.22,
        "motif_cycle_prob": 0.28,
        "camouflage_prob": 0.78,
    },
}
|
|
|
|
def temporal_twin_motif_trace(
    timestamps: np.ndarray,
    receivers: np.ndarray,
) -> dict:
    """Shared finite-state motif program for temporal-twin calibration.

    The signal intentionally depends on event order and timing only:
    quiet -> accelerating cadence -> delayed receiver revisit -> burst-release-burst

    Returns a dict of per-event arrays: decayed accumulators ("state",
    "chain"), the instantaneous "motif_strength" signal, the four motif
    flags ("quiet", "accel", "revisit", "burst_release_burst") as float32
    0/1, and an int8 "source" mask marking events selected as motif sources.
    Timestamps are assumed ascending (deltas are clipped, not re-sorted) —
    confirm with callers.
    """
    timestamps = np.asarray(timestamps, dtype=np.float64)
    receivers = np.asarray(receivers, dtype=np.int64)
    n = len(timestamps)
    empty = np.zeros(n, dtype=np.float32)
    # Degenerate case: no events -> all-zero trace.
    if n == 0:
        return {
            "state": empty,
            "chain": empty,
            "motif_strength": empty,
            "quiet": empty,
            "accel": empty,
            "revisit": empty,
            "burst_release_burst": empty,
            "source": np.zeros(n, dtype=np.int8),
        }

    # Inter-event gaps floored at 60 s; a single event gets a nominal 30 min.
    if n > 1:
        dts = np.diff(timestamps)
        base_dts = np.clip(dts, 60.0, None)
    else:
        base_dts = np.array([1800.0], dtype=np.float64)

    # Data-driven short/medium/long gap thresholds, forced strictly ordered.
    short_q = float(np.quantile(base_dts, 0.55))
    medium_q = float(np.quantile(base_dts, 0.70))
    long_q = float(np.quantile(base_dts, 0.82))
    short_q = max(short_q, 60.0)
    medium_q = max(medium_q, short_q * 1.10)
    long_q = max(long_q, medium_q * 1.15)

    # Per-event outputs.
    state = np.zeros(n, dtype=np.float32)
    chain = np.zeros(n, dtype=np.float32)
    motif_strength = np.zeros(n, dtype=np.float32)
    quiet_flags = np.zeros(n, dtype=np.float32)
    accel_flags = np.zeros(n, dtype=np.float32)
    revisit_flags = np.zeros(n, dtype=np.float32)
    brb_flags = np.zeros(n, dtype=np.float32)
    source = np.zeros(n, dtype=np.int8)

    # Scan state: a sliding window of the last 4 gaps (seeded "quiet"), the
    # last index each receiver was seen at, leaky motif counters, and the
    # index of the last emitted source (enforces spacing between sources).
    prev_dts = [long_q, long_q, long_q, long_q]
    receiver_last_idx: dict[int, int] = {}
    recent_accel = 0.0
    recent_revisit = 0.0
    recent_brb = 0.0
    chain_state = 0.0
    hidden_state = 0.0
    last_source = -99

    for idx in range(n):
        dt = long_q if idx == 0 else max(float(timestamps[idx] - timestamps[idx - 1]), 60.0)
        current_receiver = int(receivers[idx])

        # Motif flags for this event, computed before any state updates.
        quiet = float(prev_dts[-1] >= long_q)
        # accel: a long pause followed by strictly shrinking gaps into short.
        accel = float(
            prev_dts[-3] >= long_q
            and prev_dts[-2] > prev_dts[-1] > dt
            and dt <= short_q
        )
        gap_events = idx - receiver_last_idx.get(current_receiver, idx)
        # revisit: same receiver seen 3-8 events ago with a near-long recent gap.
        revisit = float(
            current_receiver in receiver_last_idx
            and 3 <= gap_events <= 8
            and max(prev_dts[-2], prev_dts[-1]) >= long_q * 0.85
        )
        # burst-release-burst: short, long, short, short gap pattern.
        burst_release_burst = float(
            prev_dts[-3] <= short_q
            and prev_dts[-2] >= long_q
            and prev_dts[-1] <= short_q
            and dt <= short_q
        )

        # Leaky counters remembering recent motif occurrences.
        recent_accel = max(0.0, 0.86 * recent_accel + accel)
        recent_revisit = max(0.0, 0.88 * recent_revisit + revisit)
        recent_brb = max(0.0, 0.88 * recent_brb + burst_release_burst)

        # Instantaneous motif signal: weighted flags plus a raw-speed bonus.
        local_speed = max(0.0, (short_q / max(dt, 60.0)) - 0.55)
        signal = (
            1.20 * accel
            + 1.25 * revisit
            + 1.10 * burst_release_burst
            + 0.30 * quiet
            + 0.20 * local_speed
        )
        # chain_state: decaying accumulator with a constant -0.30 leak;
        # hidden_state: slower integrator over chain_state and the signal.
        chain_state = max(
            0.0,
            0.82 * chain_state
            + 0.75 * signal
            + 0.22 * min(recent_accel, 1.0)
            + 0.28 * min(recent_revisit, 1.0)
            + 0.24 * min(recent_brb, 1.0)
            - 0.30,
        )
        hidden_state = max(0.0, 0.97 * hidden_state + 0.22 * chain_state + 0.34 * signal)

        # Emit a "source" when a burst-release-burst lands on a warmed-up
        # chain, at least 4 events after the previous source.
        if (
            idx >= 6
            and burst_release_burst > 0.0
            and recent_accel > 0.20
            and recent_revisit > 0.30
            and chain_state > 0.80
            and idx - last_source >= 4
        ):
            source[idx] = 1
            last_source = idx

        quiet_flags[idx] = quiet
        accel_flags[idx] = accel
        revisit_flags[idx] = revisit
        brb_flags[idx] = burst_release_burst
        motif_strength[idx] = signal
        chain[idx] = chain_state
        state[idx] = hidden_state
        receiver_last_idx[current_receiver] = idx
        prev_dts = (prev_dts + [dt])[-4:]

    return {
        "state": state.astype(np.float32),
        "chain": chain.astype(np.float32),
        "motif_strength": motif_strength.astype(np.float32),
        "quiet": quiet_flags.astype(np.float32),
        "accel": accel_flags.astype(np.float32),
        "revisit": revisit_flags.astype(np.float32),
        "burst_release_burst": brb_flags.astype(np.float32),
        "source": source.astype(np.int8),
    }
|
|
|
|
| |
# Max times _build_temporal_twin_user re-rolls a fraud twin in
# "temporal_twins_oracle_calib" mode before dropping the pair.
_CALIB_MOTIF_RETRY_BUDGET = 8
# Max greedy swap iterations when scrubbing motif hits from a benign twin
# (_repair_benign_twin); the segmented variant runs twice this many.
_BENIGN_MOTIF_REPAIR_STEPS = 16
|
|
|
|
| class FraudEngine: |
| def __init__(self, seed=42, difficulty="medium", benchmark_mode="temporal_twins"): |
| self.rng = np.random.default_rng(seed) |
| self.difficulty = difficulty |
| self.benchmark_mode = benchmark_mode |
| self.params = DIFFICULTY_PRESETS[difficulty] |
|
|
    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        """Generate synthetic fraud labels for `df` and scrub leaky features.

        Temporal-twin benchmark modes delegate to `_apply_temporal_twins`.
        The legacy path below runs in stages:

        1. rolling per-sender velocity/retry features;
        2. decayed "neighbor" and pair-frequency graph features;
        3. a stateful per-user scan that triggers velocity/ring fraud from
           timing motifs (cadence acceleration, bursts, receiver revisits);
        4. optional label delay (label moved to a later event of the user);
        5. padding with "weak_velocity" labels to reach a minimum fraud rate;
        6. a final scrub that randomizes or permutes most feature columns so
           that chiefly temporal/order structure remains predictive.
        """
        if self.benchmark_mode in ("temporal_twins", "temporal_twins_oracle_calib"):
            return self._apply_temporal_twins(df)

        df = df.copy()
        df = df.sort_values("timestamp").reset_index(drop=True)
        p = self.params

        n = len(df)

        # --- stage 1: rolling per-sender features -----------------------
        noise = self.rng.normal(0, p["noise_std"], size=n)
        df["risk_noisy"] = df["risk_score"] * 0.2 + noise

        # Rolling count/sum over each sender's last (up to) 10 events.
        df["txn_count_10"] = (
            df.groupby("sender_id")["timestamp"]
            .transform(lambda x: x.rolling(10, min_periods=1).count())
        )

        df["amount_sum_10"] = (
            df.groupby("sender_id")["amount"]
            .transform(lambda x: x.rolling(10, min_periods=1).sum())
        )

        velocity = df["txn_count_10"] * 0.6 + df["amount_sum_10"] * 0.0002

        retry_signal = (
            df["is_retry"] * 1.2 +
            df["failed"] * 1.5 +
            df["fail_prob"] * 0.7
        )

        # Difficulty-dependent quantile cutoffs.  NOTE(review):
        # velocity_q_type and txn_q_type are computed but never read below.
        q_type = p["quantile_type"]
        q_susp = p["quantile_suspicious"]

        velocity_q_type = velocity.quantile(q_type)
        velocity_q_susp = velocity.quantile(q_susp)
        txn_q_type = df["txn_count_10"].quantile(q_type)
        retry_q_type = retry_signal.quantile(q_type)
        retry_q_susp = retry_signal.quantile(q_susp)

        # --- stage 2a: decayed neighbor score ---------------------------
        # Each "suspicious" event bumps both endpoints by 1; other events
        # decay previously-bumped endpoints by 0.9.  The row score is
        # tanh(sender weight + receiver weight) *before* this row's update.
        import math
        neighbor_score = np.zeros(n, dtype=np.float32)
        recent = {}

        velocity_arr = velocity.to_numpy().tolist()
        retry_arr = retry_signal.to_numpy().tolist()
        sender_arr = df["sender_id"].to_numpy().tolist()
        receiver_arr = df["receiver_id"].to_numpy().tolist()
        time_arr = df["timestamp"].to_numpy().tolist()

        for i in range(n):
            s = sender_arr[i]
            r = receiver_arr[i]

            score = recent.get(s, 0.0) + recent.get(r, 0.0)
            neighbor_score[i] = math.tanh(score)

            suspicious = (
                velocity_arr[i] > velocity_q_susp
                or retry_arr[i] > retry_q_susp
            )

            if suspicious:
                recent[s] = recent.get(s, 0.0) + 1.0
                recent[r] = recent.get(r, 0.0) + 1.0
            else:
                if s in recent:
                    recent[s] *= 0.9
                if r in recent:
                    recent[r] *= 0.9

        df["neighbor_score"] = neighbor_score

        # --- stage 2b: sender/receiver pair frequency -------------------
        pairs = list(zip(df["sender_id"], df["receiver_id"]))
        pair_counts = pd.Series(pairs).value_counts()

        df["pair_freq"] = [pair_counts[(s, r)] for s, r in pairs]
        df["pair_freq"] = np.log1p(df["pair_freq"]) * p["pair_freq_mult"]

        # Optionally blur the graph features on harder difficulties.
        if p["graph_feat_noise"] > 0:
            gf_noise = p["graph_feat_noise"]
            df["pair_freq"] += self.rng.normal(0, gf_noise, size=n)
            df["neighbor_score"] += self.rng.normal(0, gf_noise * 0.5, size=n)

        # --- stage 3: stateful per-user fraud triggering ----------------
        df["fraud_type"] = "none"
        df["is_fraud"] = 0

        # First scrub pass: replace raw value columns with N(0, 1) noise
        # *before* labelling so labels cannot depend on them.
        df["amount"] = self.rng.normal(0, 1, size=n)
        df["risk_score"] = self.rng.normal(0, 1, size=n)
        df["fail_prob"] = self.rng.normal(0, 1, size=n)

        user_state = {}  # per-sender decayed excitement state
        last_txn = {}    # per-sender last event timestamp

        thresh_state = {"easy": 6.0, "medium": 7.0, "hard": 8.5}[self.difficulty]
        diff_scale = {"easy": 1.0, "medium": 0.8, "hard": 0.6}[self.difficulty]

        velocity_idx = []
        ring_idx = []
        dynamic_state = np.zeros(n, dtype=np.float32)
        ring_memory = {}          # per (sender, receiver) short-gap memory
        burst_memory = {}         # per-sender burst accumulator
        receiver_history = {}     # per-sender last 3 receivers (tuple)
        temporal_candidates = []  # rows eligible for min-rate padding later
        cadence_ema = {}          # per-sender EMA of inter-event gaps
        user_event_pos = {}       # per-sender 1-based event counter
        cooldown_until = {}       # per-sender event_pos before which no trigger
        cooldown_span = {"easy": 10, "medium": 12, "hard": 15}[self.difficulty]

        max_r = max(receiver_arr) if receiver_arr else 1

        for i in range(n):
            u = sender_arr[i]
            r_id = receiver_arr[i]
            t = time_arr[i]
            user_event_pos[u] = user_event_pos.get(u, 0) + 1
            event_pos = user_event_pos[u]
            can_trigger = event_pos >= cooldown_until.get(u, 0)

            prev_state = user_state.get(u, 0.0)
            dt = t - last_txn.get(u, t)
            last_txn[u] = t

            # Speed relative to this user's cadence EMA drives the state.
            prev_cadence = cadence_ema.get(u, 3600.0)
            if dt == 0:
                time_factor = 0.8 * diff_scale
            else:
                eff_dt = max(float(dt), 60.0)
                rel_speed = prev_cadence / eff_dt
                if rel_speed > 3.0:
                    time_factor = 1.8 * diff_scale
                elif rel_speed > 1.8:
                    time_factor = 1.4 * diff_scale
                elif rel_speed > 1.2:
                    time_factor = 1.0 * diff_scale
                elif rel_speed > 0.8:
                    time_factor = 0.6 * diff_scale
                else:
                    time_factor = 0.25 * diff_scale
                cadence_ema[u] = 0.97 * prev_cadence + 0.03 * eff_dt

            # Damp excitement for users already near threshold...
            if prev_state > (0.7 * thresh_state):
                time_factor *= 0.6

            # ...but occasionally spike anyone at random.
            if self.rng.random() < 0.02:
                time_factor *= 1.5

            # Hot users sometimes jump to a random receiver.
            if prev_state > (0.8 * thresh_state) and self.rng.random() < 0.3:
                r_id = self.rng.integers(0, max_r + 1)

            hist = receiver_history.get(u, ())
            revisit_motif = len(hist) >= 2 and (r_id in hist[-3:]) and hist[-1] != r_id

            noise = self.rng.normal(0, 0.03)
            new_state = max(0.0, 0.975 * prev_state + 0.22 * time_factor + noise)

            # Excited users stay excited when events keep arriving quickly.
            if prev_state > (0.6 * thresh_state) and dt < 7200:
                new_state += 0.3 * diff_scale

            # Burst accumulator: fast events charge it, slow events drain it.
            prev_burst = burst_memory.get(u, 0.0)
            if dt < 600:
                burst_impulse = 1.0
            elif dt < 1800:
                burst_impulse = 0.4
            elif dt < 7200:
                burst_impulse = -0.5
            else:
                burst_impulse = -0.8
            burst_state = max(0.0, 0.92 * prev_burst + burst_impulse)
            burst_memory[u] = burst_state

            crossed_state = prev_state <= thresh_state and new_state > thresh_state
            release_event = prev_burst > 2.5 and dt > 1800
            if revisit_motif and (release_event or crossed_state or prev_burst > 1.5):
                temporal_candidates.append(i)

            user_state[u] = new_state

            dynamic_state[i] = new_state

            n_velocity_before = len(velocity_idx)
            n_ring_before = len(ring_idx)

            # Shared low-probability trigger across all difficulties.
            if can_trigger and revisit_motif and release_event and new_state > (0.75 * thresh_state):
                if self.rng.random() < 0.12:
                    velocity_idx.append(i)

            if self.difficulty == "easy":
                # Easy: generous velocity trigger on crossing/release.
                if can_trigger and revisit_motif and (crossed_state or (release_event and new_state > (0.85 * thresh_state))):
                    prob = min(0.55, 0.15 + 0.25 * (new_state / max(thresh_state, 1e-6)))
                    if self.rng.random() < prob:
                        velocity_idx.append(i)

                # Ring trigger: pair memory crosses its level during release.
                key = tuple(sorted((u, r_id)))
                prev_ring = ring_memory.get(key, 0.0)
                ring_memory[key] = 0.9 * prev_ring + (1.0 if dt < 600 else 0.0)
                ring_cross = prev_ring <= 6.0 and ring_memory[key] > 6.0
                if can_trigger and revisit_motif and ring_cross and release_event:
                    ring_idx.append(i)

            elif self.difficulty == "medium":
                # Medium: stricter velocity trigger requiring cross + release.
                if can_trigger and revisit_motif and crossed_state and release_event:
                    prob = min(0.45, 0.10 + 0.22 * (new_state / max(thresh_state * 1.2, 1e-6)))
                    if self.rng.random() < prob:
                        velocity_idx.append(i)

                # Secondary retry-driven velocity trigger.
                if can_trigger and revisit_motif and retry_arr[i] > retry_q_type and release_event:
                    if self.rng.random() < 0.15:
                        velocity_idx.append(i)

                key = tuple(sorted((u, r_id)))
                prev_ring = ring_memory.get(key, 0.0)
                ring_memory[key] = 0.9 * prev_ring + (1.0 if dt < 600 else 0.0)
                ring_cross = prev_ring <= 5.0 and ring_memory[key] > 5.0
                if can_trigger and revisit_motif and ring_cross and (release_event or new_state > thresh_state):
                    ring_idx.append(i)

            elif self.difficulty == "hard":
                # Hard: all conditions at once, low probability.
                if can_trigger and revisit_motif and crossed_state and release_event and new_state > thresh_state:
                    if self.rng.random() < 0.1:
                        velocity_idx.append(i)

                key = tuple(sorted((u, r_id)))
                prev_ring = ring_memory.get(key, 0.0)
                ring_memory[key] = 0.9 * prev_ring + (1.0 if dt < 600 else 0.0)
                ring_cross = prev_ring <= 3.5 and ring_memory[key] > 3.5

                if can_trigger and revisit_motif and ring_cross and release_event and new_state > (0.65 * thresh_state):
                    ring_idx.append(i)

            # Any trigger on this row starts the user's cooldown window.
            if can_trigger and (
                len(velocity_idx) > n_velocity_before or len(ring_idx) > n_ring_before
            ):
                cooldown_until[u] = event_pos + cooldown_span

            # NOTE(review): r_id may have been randomized above, so the
            # history records the replacement receiver — confirm intended.
            receiver_history[u] = (hist + (r_id,))[-3:]

        df["dynamic_fraud_state"] = dynamic_state

        if ring_idx:
            df.loc[ring_idx, "is_fraud"] = 1
            df.loc[ring_idx, "fraud_type"] = "graph_ring"

        # Ring labels win; velocity only fills still-unlabelled rows.
        if velocity_idx:
            velocity_mask = df.index.isin(velocity_idx) & (df["fraud_type"] == "none")
            df.loc[velocity_mask, "is_fraud"] = 1
            df.loc[velocity_mask, "fraud_type"] = "velocity"

        # --- stage 4: label delay ---------------------------------------
        # Move a fraction of fraud labels 5-14 events later within the same
        # sender's stream.  NOTE(review): the source row keeps its non-"none"
        # fraud_type while the target row stays "none" — confirm intended.
        delayed_frac = {
            "easy": 0.2,
            "medium": 0.6,
            "hard": 1.0
        }[self.difficulty]
        if delayed_frac > 0:
            fraud_idx = df[(df["is_fraud"] == 1)].index.to_numpy()
            n_delay = int(len(fraud_idx) * delayed_frac)
            if n_delay > 0:
                delay_sources = self.rng.choice(fraud_idx, size=n_delay, replace=False)

                user_groups = {k: v.to_numpy() for k, v in df.groupby("sender_id").groups.items()}
                delayed_targets = []
                valid_sources = []

                for src in delay_sources:
                    u = df._get_value(src, "sender_id")
                    idxs = user_groups[u]
                    pos = np.searchsorted(idxs, src)

                    # Only delay when the user has enough later events.
                    delay = self.rng.integers(5, 15)
                    if pos + delay < len(idxs):
                        valid_sources.append(src)
                        delayed_targets.append(idxs[pos + delay])

                df.loc[valid_sources, "is_fraud"] = 0
                if delayed_targets:
                    df.loc[delayed_targets, "is_fraud"] = 1

        # --- stage 5: minimum fraud-rate padding ------------------------
        min_rate = {
            "easy": 0.06,
            "medium": 0.05,
            "hard": 0.03
        }[self.difficulty]

        current_rate = df["is_fraud"].mean()

        if current_rate < min_rate:
            deficit = int((min_rate - current_rate) * len(df))

            # Prefer rows that exhibited the temporal motif; top up with
            # high-dynamic-state rows when the pool is too small.
            temporal_pool = np.array(sorted(set(temporal_candidates)), dtype=np.int64)
            eligible = df.loc[temporal_pool] if len(temporal_pool) else df.iloc[0:0]
            eligible = eligible[eligible["fraud_type"] == "none"]

            if len(eligible) < deficit:
                state_thresh = np.percentile(df["dynamic_fraud_state"], 70)
                state_eligible = df[
                    (df["fraud_type"] == "none") &
                    (df["dynamic_fraud_state"] > state_thresh)
                ]
                eligible = pd.concat([eligible, state_eligible], ignore_index=False)
                eligible = eligible[~eligible.index.duplicated(keep="first")]

            n_sample = min(deficit, len(eligible))
            candidates = eligible.sample(n_sample, random_state=42).index

            df.loc[candidates, "is_fraud"] = 1
            df.loc[candidates, "fraud_type"] = "weak_velocity"

            # Nudge the padded rows' state upward so they look plausible.
            df.loc[candidates, "dynamic_fraud_state"] += self.rng.normal(0.5, 0.1, size=len(candidates)).astype(np.float32)

        # --- stage 6: final feature scrub -------------------------------
        # Replace value-based columns with pure noise, resample binary
        # columns at their observed rates, and permute the rolling features
        # so only temporal/order structure remains useful.
        df["amount"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        df["risk_score"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        df["fail_prob"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        df["risk_noisy"] = self.rng.normal(0, 1, size=n).astype(np.float32)

        failed_rate = float(df["failed"].mean()) if "failed" in df.columns else 0.0
        retry_rate = float(df["is_retry"].mean()) if "is_retry" in df.columns else 0.0
        df["failed"] = self.rng.binomial(1, failed_rate, size=n).astype(np.int8)
        df["is_retry"] = self.rng.binomial(1, retry_rate, size=n).astype(np.int8)

        df["txn_count_10"] = self.rng.permutation(df["txn_count_10"].to_numpy())
        df["amount_sum_10"] = self.rng.permutation(df["amount_sum_10"].to_numpy())
        df["neighbor_score"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        df["pair_freq"] = self.rng.normal(0, 1, size=n).astype(np.float32)

        return df
|
|
| def _is_standard_temporal_twins(self) -> bool: |
| return self.benchmark_mode == "temporal_twins" |
|
|
| def _standard_twin_profile(self) -> dict: |
| return TEMPORAL_TWIN_STANDARD_PROFILES[self.difficulty] |
|
|
    def _apply_temporal_twins(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rebuild `df` as a temporal-twin benchmark dataset.

        Users are grouped by sender; dense users with repeated receivers act
        as timing "templates".  Carriers (all users, in start-time order) are
        consumed two at a time: the first becomes a fraud twin and the second
        a benign twin built from the SAME template (same delta multiset and
        shared permutations), so each pair differs chiefly in event ordering.
        Users that cannot be paired are emitted as background traffic.
        """
        df = df.copy()
        df = df.sort_values("timestamp").reset_index(drop=True)

        # Ensure optional columns exist so the twin builders can permute them.
        for column, default in (
            ("is_retry", 0),
            ("failed", 0),
            ("risk_score", 0.0),
            ("fail_prob", 0.0),
        ):
            if column not in df.columns:
                df[column] = default

        sender_groups = {
            int(sender_id): group.sort_values("timestamp").reset_index(drop=True).copy()
            for sender_id, group in df.groupby("sender_id", sort=False)
        }
        if not sender_groups:
            return df

        out_frames = []
        pair_id = 0
        min_pair_events = 18  # a template user needs at least this many events
        user_meta = []
        for sender_id, group in sender_groups.items():
            # repeated_receivers: number of receivers this user hit >= twice.
            receiver_counts = Counter(int(receiver_id) for receiver_id in group["receiver_id"].tolist())
            repeated_receivers = int(sum(count >= 2 for count in receiver_counts.values()))
            user_meta.append({
                "sender_id": int(sender_id),
                "group": group,
                "count": int(len(group)),
                "repeated_receivers": repeated_receivers,
                "start_time": float(group["timestamp"].min()) if len(group) else 0.0,
            })

        # Templates: largest, most receiver-revisiting users first
        # (deterministic tie-break by start time, then sender id).
        eligible_templates = [
            meta for meta in user_meta
            if meta["count"] >= min_pair_events and meta["repeated_receivers"] >= 2
        ]
        eligible_templates = sorted(
            eligible_templates,
            key=lambda meta: (-meta["count"], -meta["repeated_receivers"], meta["start_time"], meta["sender_id"]),
        )
        # Carriers: every user, consumed in chronological order.
        carrier_meta = sorted(
            user_meta,
            key=lambda meta: (meta["start_time"], meta["sender_id"]),
        )

        carrier_cursor = 0
        template_cursor = 0
        # No usable template: emit everything as background and finish.
        if not eligible_templates:
            while carrier_cursor < len(carrier_meta):
                carrier = carrier_meta[carrier_cursor]
                out_frames.append(self._make_background_user(carrier["group"], int(carrier["sender_id"])))
                carrier_cursor += 1
            out = pd.concat(out_frames, ignore_index=True)
            out = out.sort_values("timestamp").reset_index(drop=True)
            out["txn_id"] = np.arange(len(out), dtype=np.int32)
            return self._finalise_temporal_twin_features(out)

        # Pair consecutive carriers, cycling through templates round-robin.
        while carrier_cursor + 1 < len(carrier_meta):
            fraud_carrier = carrier_meta[carrier_cursor]
            benign_carrier = carrier_meta[carrier_cursor + 1]
            built_pair = False

            for template_offset in range(len(eligible_templates)):
                template_idx = (template_cursor + template_offset) % len(eligible_templates)
                template_meta = eligible_templates[template_idx]
                template = template_meta["group"].copy().reset_index(drop=True)
                count_target = len(template)
                # Layout shared by both twins so they only differ in ordering.
                shared_layout = {
                    "ordered_dts": self._order_deltas(
                        np.diff(template["timestamp"].to_numpy(dtype=np.float64)),
                        role="shared",
                    ),
                    "amount_perm": self.rng.permutation(count_target),
                    "retry_perm": self.rng.permutation(count_target),
                    "failed_perm": self.rng.permutation(count_target),
                }
                pair_start_time = float(template_meta["start_time"])

                fraud_frame = self._build_temporal_twin_user(
                    template_df=template,
                    sender_id=int(fraud_carrier["sender_id"]),
                    start_time=pair_start_time,
                    pair_id=pair_id,
                    role="fraud",
                    shared_layout=shared_layout,
                    template_id=int(template_meta["sender_id"]),
                )
                # Builder may reject the twin; try the next template.
                if fraud_frame is None:
                    continue

                benign_frame = self._build_temporal_twin_user(
                    template_df=template,
                    sender_id=int(benign_carrier["sender_id"]),
                    start_time=pair_start_time,
                    pair_id=pair_id,
                    role="benign",
                    shared_layout=shared_layout,
                    fraud_reference=fraud_frame,
                    template_id=int(template_meta["sender_id"]),
                )
                if benign_frame is None:
                    continue

                out_frames.append(fraud_frame)
                out_frames.append(benign_frame)
                pair_id += 1
                carrier_cursor += 2
                template_cursor = (template_idx + 1) % len(eligible_templates)
                built_pair = True
                break

            # Every template failed for this carrier pair: emit as background.
            if not built_pair:
                out_frames.append(self._make_background_user(fraud_carrier["group"], int(fraud_carrier["sender_id"])))
                out_frames.append(self._make_background_user(benign_carrier["group"], int(benign_carrier["sender_id"])))
                carrier_cursor += 2

        # Odd carrier left over becomes background.
        while carrier_cursor < len(carrier_meta):
            carrier = carrier_meta[carrier_cursor]
            out_frames.append(self._make_background_user(carrier["group"], int(carrier["sender_id"])))
            carrier_cursor += 1

        out = pd.concat(out_frames, ignore_index=True)
        out = out.sort_values("timestamp").reset_index(drop=True)
        out["txn_id"] = np.arange(len(out), dtype=np.int32)
        return self._finalise_temporal_twin_features(out)
|
|
| def _make_background_user(self, user_df: pd.DataFrame, sender_id: int) -> pd.DataFrame: |
| out = user_df.copy().sort_values("timestamp").reset_index(drop=True) |
| out["sender_id"] = int(sender_id) |
| out["is_fraud"] = np.zeros(len(out), dtype=np.int8) |
| out["fraud_type"] = "none" |
| out["dynamic_fraud_state"] = np.zeros(len(out), dtype=np.float32) |
| out["motif_source"] = np.zeros(len(out), dtype=np.int8) |
| out["motif_chain_state"] = np.zeros(len(out), dtype=np.float32) |
| out["motif_strength"] = np.zeros(len(out), dtype=np.float32) |
| out["twin_pair_id"] = -1 |
| out["template_id"] = -1 |
| out["twin_role"] = "background" |
| out["twin_label"] = 0 |
| return out |
|
|
| def _build_temporal_twin_user( |
| self, |
| template_df: pd.DataFrame, |
| sender_id: int, |
| start_time: float, |
| pair_id: int, |
| role: str, |
| shared_layout: dict | None = None, |
| fraud_reference: pd.DataFrame | None = None, |
| template_id: int | None = None, |
| ) -> pd.DataFrame: |
| """Build one twin user, with retry logic in calib mode for fraud twins.""" |
| calib_mode = self.benchmark_mode == "temporal_twins_oracle_calib" |
| max_attempts = _CALIB_MOTIF_RETRY_BUDGET if (calib_mode and role == "fraud") else 1 |
|
|
| for attempt in range(max_attempts): |
| out = template_df.copy().reset_index(drop=True) |
| n = len(out) |
| timestamps = out["timestamp"].to_numpy(dtype=np.float64) |
| if n <= 1: |
| ordered_dts = np.zeros(0, dtype=np.float64) |
| else: |
| if shared_layout is not None and "ordered_dts" in shared_layout: |
| ordered_dts = np.asarray(shared_layout["ordered_dts"], dtype=np.float64) |
| else: |
| ordered_dts = self._order_deltas(np.diff(timestamps), role=role) |
|
|
| new_timestamps = np.empty(n, dtype=np.float64) |
| new_timestamps[0] = max(0.0, float(start_time)) |
| if n > 1: |
| new_timestamps[1:] = new_timestamps[0] + np.cumsum(ordered_dts) |
| out["timestamp"] = new_timestamps.astype(np.float32) |
|
|
| camouflage_fraud = False |
| if role == "fraud" and self._is_standard_temporal_twins(): |
| camouflage_fraud = self.rng.random() < float(self._standard_twin_profile()["camouflage_prob"]) |
|
|
| if role == "benign" and fraud_reference is not None: |
| label_boundaries = sorted( |
| fraud_reference.loc[ |
| fraud_reference["is_fraud"] == 1, |
| "label_event_idx", |
| ].astype(int).unique().tolist() |
| ) |
| receiver_seq = self._order_receivers_benign_matched( |
| fraud_receivers=fraud_reference["receiver_id"].to_numpy(dtype=np.int64), |
| label_boundaries=label_boundaries, |
| timestamps=out["timestamp"].to_numpy(dtype=np.float64), |
| ) |
| elif camouflage_fraud: |
| receiver_seq = self._order_receivers_benign_greedy( |
| receivers=out["receiver_id"].to_numpy(dtype=np.int64), |
| timestamps=out["timestamp"].to_numpy(dtype=np.float64), |
| ) |
| else: |
| receiver_seq = self._order_receivers( |
| out["receiver_id"].to_numpy(dtype=np.int64), |
| role=role, |
| timestamps=out["timestamp"].to_numpy(dtype=np.float64), |
| ) |
| out["receiver_id"] = np.asarray(receiver_seq, dtype=np.int32) |
| if role == "benign" and fraud_reference is not None: |
| out = self._repair_benign_twin_segmented(out, label_boundaries) |
|
|
| if shared_layout is not None: |
| amount_perm = np.asarray(shared_layout["amount_perm"], dtype=np.int64) |
| retry_perm = np.asarray(shared_layout["retry_perm"], dtype=np.int64) |
| failed_perm = np.asarray(shared_layout["failed_perm"], dtype=np.int64) |
| else: |
| amount_perm = self.rng.permutation(n) |
| retry_perm = self.rng.permutation(n) |
| failed_perm = self.rng.permutation(n) |
| out["amount"] = out["amount"].to_numpy(dtype=np.float32)[amount_perm] |
| out["txn_type"] = out["txn_type"].to_numpy(dtype=np.int8) |
| out["is_retry"] = out["is_retry"].to_numpy(dtype=np.int8)[retry_perm] |
| out["failed"] = out["failed"].to_numpy(dtype=np.int8)[failed_perm] |
| out["risk_score"] = out["risk_score"].to_numpy(dtype=np.float32) |
| out["fail_prob"] = out["fail_prob"].to_numpy(dtype=np.float32) |
| out["sender_id"] = int(sender_id) |
| out["is_fraud"] = 0 |
| out["fraud_type"] = "none" |
| out["twin_pair_id"] = int(pair_id) |
| out["template_id"] = int(template_id if template_id is not None else pair_id) |
| out["twin_role"] = role |
| out["twin_label"] = 1 if role == "fraud" else 0 |
|
|
| out = out.sort_values("timestamp").reset_index(drop=True) |
| if role == "benign" and fraud_reference is None: |
| out = self._repair_benign_twin(out) |
|
|
| if calib_mode: |
| result = self._apply_twin_labels_calib(out, role=role) |
| |
| if role == "fraud": |
| if int(result["is_fraud"].sum()) > 0: |
| return result |
| if attempt < max_attempts - 1: |
| continue |
| |
| print( |
| f"[calib] WARNING: pair_id={pair_id} sender={sender_id} " |
| f"produced 0 motif hits after {max_attempts} attempts — dropping pair." |
| ) |
| return None |
| if int(result["motif_hit_count"].max()) > 0: |
| return None |
| return result |
| else: |
| result = self._apply_twin_labels_standard(out, role=role) |
| if role == "benign" and int(result["motif_hit_count"].max()) > 0: |
| return None |
| return result |
|
|
| |
| return self._apply_twin_labels_standard( |
| out.sort_values("timestamp").reset_index(drop=True), role=role |
| ) |
|
|
| def _repair_benign_twin(self, user_df: pd.DataFrame) -> pd.DataFrame: |
| """Greedily perturb a benign receiver order to minimize motif hits.""" |
| out = user_df.copy().sort_values("timestamp").reset_index(drop=True) |
| receivers = out["receiver_id"].to_numpy(dtype=np.int64).copy() |
| timestamps = out["timestamp"].to_numpy(dtype=np.float64) |
|
|
| trace = temporal_twin_motif_trace(timestamps, receivers) |
| if int(np.sum(trace["source"])) == 0: |
| return out |
|
|
| best_receivers = receivers.copy() |
| best_hits = int(np.sum(trace["source"])) |
|
|
| for _ in range(_BENIGN_MOTIF_REPAIR_STEPS): |
| source_positions = np.flatnonzero(trace["source"]).tolist() |
| if not source_positions: |
| out["receiver_id"] = receivers.astype(np.int32) |
| return out |
|
|
| src_idx = int(source_positions[0]) |
| candidate_receivers = None |
| candidate_hits = best_hits |
|
|
| for swap_offset in (1, -1, 2, -2, 3, -3): |
| swap_idx = src_idx + swap_offset |
| if swap_idx < 0 or swap_idx >= len(receivers): |
| continue |
| if receivers[swap_idx] == receivers[src_idx]: |
| continue |
|
|
| trial = receivers.copy() |
| trial[src_idx], trial[swap_idx] = trial[swap_idx], trial[src_idx] |
| trial_hits = int(np.sum(temporal_twin_motif_trace(timestamps, trial)["source"])) |
| if trial_hits < candidate_hits: |
| candidate_receivers = trial |
| candidate_hits = trial_hits |
| if trial_hits == 0: |
| break |
|
|
| if candidate_receivers is None: |
| break |
|
|
| receivers = candidate_receivers |
| trace = temporal_twin_motif_trace(timestamps, receivers) |
| best_receivers = receivers.copy() |
| best_hits = candidate_hits |
|
|
| out["receiver_id"] = best_receivers.astype(np.int32) |
| return out |
|
|
| def _repair_benign_twin_segmented( |
| self, |
| user_df: pd.DataFrame, |
| label_boundaries: list[int], |
| ) -> pd.DataFrame: |
| """Reduce benign motif hits while preserving each matched prefix segment multiset.""" |
| out = user_df.copy().sort_values("timestamp").reset_index(drop=True) |
| receivers = out["receiver_id"].to_numpy(dtype=np.int64).copy() |
| timestamps = out["timestamp"].to_numpy(dtype=np.float64) |
| n = len(receivers) |
| if n == 0: |
| return out |
|
|
| boundaries = sorted(int(boundary) for boundary in label_boundaries if 0 <= int(boundary) < n) |
| if not boundaries or boundaries[-1] != n - 1: |
| boundaries.append(n - 1) |
| segments: list[tuple[int, int]] = [] |
| start = 0 |
| for end in boundaries: |
| segments.append((start, end)) |
| start = end + 1 |
|
|
| def segment_bounds(idx: int) -> tuple[int, int]: |
| for lo, hi in segments: |
| if lo <= idx <= hi: |
| return lo, hi |
| return 0, n - 1 |
|
|
| trace = temporal_twin_motif_trace(timestamps, receivers) |
| if int(np.sum(trace["source"])) == 0: |
| return out |
|
|
| best_receivers = receivers.copy() |
| best_hits = int(np.sum(trace["source"])) |
|
|
| for _ in range(_BENIGN_MOTIF_REPAIR_STEPS * 2): |
| source_positions = np.flatnonzero(trace["source"]).tolist() |
| if not source_positions: |
| out["receiver_id"] = receivers.astype(np.int32) |
| return out |
|
|
| src_idx = int(source_positions[0]) |
| seg_lo, seg_hi = segment_bounds(src_idx) |
| candidate_receivers = None |
| candidate_hits = best_hits |
|
|
| for swap_offset in (1, -1, 2, -2, 3, -3, 4, -4): |
| swap_idx = src_idx + swap_offset |
| if swap_idx < seg_lo or swap_idx > seg_hi: |
| continue |
| if receivers[swap_idx] == receivers[src_idx]: |
| continue |
|
|
| trial = receivers.copy() |
| trial[src_idx], trial[swap_idx] = trial[swap_idx], trial[src_idx] |
| trial_hits = int(np.sum(temporal_twin_motif_trace(timestamps, trial)["source"])) |
| if trial_hits < candidate_hits: |
| candidate_receivers = trial |
| candidate_hits = trial_hits |
| if trial_hits == 0: |
| break |
|
|
| if candidate_receivers is None: |
| continue |
|
|
| receivers = candidate_receivers |
| trace = temporal_twin_motif_trace(timestamps, receivers) |
| best_receivers = receivers.copy() |
| best_hits = candidate_hits |
|
|
| out["receiver_id"] = best_receivers.astype(np.int32) |
| return out |
|
|
| def _order_deltas(self, deltas: np.ndarray, role: str) -> np.ndarray: |
| deltas = np.asarray(deltas, dtype=np.float64) |
| if len(deltas) == 0: |
| return deltas |
|
|
| deltas = np.clip(deltas, 60.0, None) |
| short_q = float(np.quantile(deltas, 0.55)) |
| long_q = float(np.quantile(deltas, 0.82)) |
| shorts = list(np.sort(deltas[deltas <= short_q]).astype(np.float64)) |
| mediums = list(np.sort(deltas[(deltas > short_q) & (deltas < long_q)]).astype(np.float64)) |
| longs = list(np.sort(deltas[deltas >= long_q])[::-1].astype(np.float64)) |
|
|
| def pop_front(pool): |
| return pool.pop(0) if pool else None |
|
|
| def pop_back(pool): |
| return pool.pop() if pool else None |
|
|
| def pop_short(): |
| return pop_front(shorts) |
|
|
| def pop_short_fast(): |
| return pop_front(shorts) |
|
|
| def pop_short_slow(): |
| return pop_back(shorts) if shorts else None |
|
|
| def pop_medium(): |
| if mediums: |
| return pop_front(mediums) |
| if len(shorts) >= 2: |
| return pop_back(shorts) |
| if longs: |
| return pop_back(longs) |
| return None |
|
|
| def pop_long(): |
| if longs: |
| return pop_front(longs) |
| if mediums: |
| return pop_back(mediums) |
| if shorts: |
| return pop_back(shorts) |
| return None |
|
|
| def pop_any(): |
| for getter in (pop_medium, pop_long, pop_short): |
| value = getter() |
| if value is not None: |
| return value |
| return None |
|
|
| ordered: list[float] = [] |
| if self._is_standard_temporal_twins(): |
| recipe_name = self._standard_twin_profile()["delta_recipe"] |
| if recipe_name == "easy": |
| motif_recipe = [ |
| pop_long, |
| pop_medium, |
| pop_short_slow, |
| pop_short_fast, |
| pop_long, |
| pop_short_slow, |
| pop_short_fast, |
| ] |
| elif recipe_name == "medium": |
| motif_recipe = [ |
| pop_long, |
| pop_medium, |
| pop_short_slow, |
| pop_medium, |
| pop_short_fast, |
| pop_long, |
| pop_medium, |
| pop_short_fast, |
| ] |
| else: |
| motif_recipe = [ |
| pop_long, |
| pop_medium, |
| pop_short_slow, |
| pop_medium, |
| pop_short_fast, |
| pop_long, |
| pop_medium, |
| pop_short_slow, |
| pop_short_fast, |
| ] |
| else: |
| motif_recipe = [ |
| pop_long, |
| pop_medium, |
| pop_short_slow, |
| pop_short_fast, |
| pop_long, |
| pop_short_slow, |
| pop_short_fast, |
| ] |
|
|
| while len(ordered) < len(deltas): |
| if self._is_standard_temporal_twins(): |
| if self.rng.random() > float(self._standard_twin_profile()["motif_cycle_prob"]): |
| value = pop_any() |
| if value is None: |
| break |
| ordered.append(float(value)) |
| continue |
| emitted = False |
| for getter in motif_recipe: |
| value = getter() |
| if value is None: |
| continue |
| ordered.append(float(value)) |
| emitted = True |
| if len(ordered) >= len(deltas): |
| break |
| if not emitted: |
| value = pop_any() |
| if value is None: |
| break |
| ordered.append(float(value)) |
|
|
| if len(ordered) != len(deltas): |
| fallback = np.sort(deltas) |
| ordered = list(fallback[: len(deltas)]) |
| return np.asarray(ordered, dtype=np.float64) |
|
|
    def _order_receivers(
        self,
        receivers: np.ndarray,
        role: str,
        timestamps: np.ndarray | None = None,
    ) -> list[int]:
        """Order a user's receiver ids according to the twin role.

        Fraud-role orderings inject 8-event blocks in which an "anchor"
        receiver appears twice — separated by a profile-driven gap for
        standard twins, or 3 events otherwise.  Benign-role orderings inject
        blocks where the anchor repeats after only 2 events, and outside
        blocks avoid back-to-back repeats of the same receiver.  The output
        is a permutation of ``receivers`` (same multiset).

        Args:
            receivers: Receiver ids to reorder.
            role: ``"fraud"`` or ``"benign"``.
            timestamps: Optional event timestamps; when given for a benign
                user, the dedicated greedy ordering is used instead.

        Returns:
            Reordered receiver ids as a plain list of ints.
        """
        if role == "benign" and timestamps is not None:
            return self._order_receivers_benign_greedy(
                receivers=np.asarray(receivers, dtype=np.int64),
                timestamps=np.asarray(timestamps, dtype=np.float64),
            )

        # Remaining multiset of receivers; counts drop as ids are emitted.
        counts = Counter(int(receiver_id) for receiver_id in receivers.tolist())
        ordered: list[int] = []

        def sorted_candidates(exclude: set[int] | None = None):
            # Remaining receivers, most frequent first (smaller id breaks ties).
            exclude = exclude or set()
            return [
                receiver
                for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))
                if count > 0 and receiver not in exclude
            ]

        def pop_receiver(exclude: set[int] | None = None):
            # Emit (and decrement) the best remaining receiver, or None if none left.
            candidates = sorted_candidates(exclude=exclude)
            if not candidates:
                return None
            receiver = int(candidates[0])
            counts[receiver] -= 1
            return receiver

        while len(ordered) < len(receivers):
            if role == "fraud":
                # Anchor: most frequent receiver that can still appear twice.
                anchor = next(
                    (
                        receiver
                        for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))
                        if count >= 2
                    ),
                    None,
                )
                inject_block = True
                if self._is_standard_temporal_twins():
                    inject_block = self.rng.random() <= float(self._standard_twin_profile()["fraud_block_prob"])
                if inject_block and anchor is not None and len(receivers) - len(ordered) >= 8:
                    # Reserve six distinct fillers for one 8-event block.
                    fillers = []
                    used_in_block = {int(anchor)}
                    for _ in range(6):
                        filler = pop_receiver(exclude=used_in_block)
                        if filler is None:
                            break
                        fillers.append(filler)
                        used_in_block.add(int(filler))
                    if len(fillers) == 6:
                        # Both anchor occurrences are consumed by the block.
                        counts[int(anchor)] -= 2
                        if self._is_standard_temporal_twins():
                            # Anchor repeats after `receiver_gap` events.
                            gap = int(self._standard_twin_profile()["receiver_gap"])
                            block = [int(anchor)]
                            block.extend(fillers[: gap - 1])
                            block.append(int(anchor))
                            block.extend(fillers[gap - 1 :])
                            ordered.extend(block[:8])
                        else:
                            # Fixed layout: anchor at positions 0 and 3.
                            ordered.extend(
                                [
                                    int(anchor),
                                    fillers[0],
                                    fillers[1],
                                    int(anchor),
                                    fillers[2],
                                    fillers[3],
                                    fillers[4],
                                    fillers[5],
                                ]
                            )
                        continue
                    # Not enough distinct fillers: undo the reservations.
                    for filler in fillers:
                        counts[int(filler)] += 1

            if role == "benign":
                anchor = next(
                    (
                        receiver
                        for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))
                        if count >= 2
                    ),
                    None,
                )
                if anchor is not None and len(receivers) - len(ordered) >= 8:
                    fillers = []
                    used_in_block = {int(anchor)}
                    for _ in range(6):
                        filler = pop_receiver(exclude=used_in_block)
                        if filler is None:
                            break
                        fillers.append(filler)
                        used_in_block.add(int(filler))
                    if len(fillers) == 6:
                        counts[int(anchor)] -= 2
                        # Benign block: anchor repeats after only 2 events.
                        ordered.extend(
                            [
                                int(anchor),
                                fillers[0],
                                int(anchor),
                                fillers[1],
                                fillers[2],
                                fillers[3],
                                fillers[4],
                                fillers[5],
                            ]
                        )
                        continue
                    for filler in fillers:
                        counts[int(filler)] += 1

            # Default step: emit the best receiver differing from the previous one.
            exclude = {int(ordered[-1])} if ordered else set()
            chosen = pop_receiver(exclude=exclude)
            if chosen is not None:
                ordered.append(chosen)
                continue

            # Only the previous receiver remains: allow the immediate repeat.
            chosen = pop_receiver(exclude=None)
            if chosen is not None:
                ordered.append(chosen)
                continue

        return ordered[: len(receivers)]
|
|
| def _select_standard_twin_sources( |
| self, |
| trace: dict, |
| n_events: int, |
| ) -> list[tuple[int, bool]]: |
| profile = self._standard_twin_profile() |
| target_events = max( |
| int(profile["min_events"]), |
| min(int(profile["max_events_cap"]), max(1, n_events // int(profile["event_divisor"]))), |
| ) |
| min_idx = 7 |
| source_positions = [ |
| int(pos) |
| for pos in np.flatnonzero(trace["source"]).tolist() |
| if int(pos) >= min_idx |
| ] |
| ranked_chain = [ |
| int(pos) |
| for pos in np.argsort(trace["chain"])[::-1].tolist() |
| if int(pos) >= min_idx |
| ] |
| chain_only = [pos for pos in ranked_chain if pos not in set(source_positions)] |
|
|
| if source_positions: |
| keep_n = int(np.ceil(len(source_positions) * float(profile["source_keep_frac"]))) |
| keep_n = max(int(profile["min_true_sources"]), min(len(source_positions), keep_n)) |
| else: |
| keep_n = 0 |
|
|
| source_pool_n = min( |
| len(source_positions), |
| max(keep_n, int(np.ceil(keep_n * float(profile["source_pool_factor"])))), |
| ) |
| source_pool = source_positions[:source_pool_n] |
| if keep_n > 0 and len(source_pool) > keep_n: |
| sampled_true = self.rng.choice(np.asarray(source_pool, dtype=np.int64), size=keep_n, replace=False) |
| true_sources = sorted(int(pos) for pos in sampled_true.tolist()) |
| else: |
| true_sources = source_pool[:keep_n] |
|
|
| selected: list[tuple[int, bool]] = [(pos, False) for pos in true_sources] |
| used = {pos for pos, _ in selected} |
|
|
| fallback_cap = int(profile["max_chain_fallback"]) |
| chain_pool_n = min( |
| len(chain_only), |
| max(fallback_cap, int(np.ceil(fallback_cap * float(profile["chain_pool_factor"])))), |
| ) |
| chain_pool = chain_only[:chain_pool_n] |
| if fallback_cap > 0 and len(chain_pool) > fallback_cap: |
| sampled_chain = self.rng.choice(np.asarray(chain_pool, dtype=np.int64), size=fallback_cap, replace=False) |
| chain_choices = sorted(int(pos) for pos in sampled_chain.tolist()) |
| else: |
| chain_choices = chain_pool[:fallback_cap] |
|
|
| for pos in chain_choices: |
| if len(selected) >= target_events: |
| break |
| selected.append((pos, True)) |
| used.add(pos) |
|
|
| if not selected: |
| fallback_candidates = ranked_chain[:target_events] |
| selected = [(pos, True) for pos in fallback_candidates] |
|
|
| if len(selected) < target_events: |
| for pos in source_positions[keep_n:]: |
| if pos in used: |
| continue |
| selected.append((pos, False)) |
| used.add(pos) |
| if len(selected) >= target_events: |
| break |
|
|
| if len(selected) < target_events: |
| for pos in ranked_chain: |
| if pos in used: |
| continue |
| selected.append((pos, True)) |
| used.add(pos) |
| if len(selected) >= target_events: |
| break |
|
|
| selected.sort(key=lambda item: item[0]) |
| return selected[:target_events] |
|
|
| def _order_receivers_benign_greedy( |
| self, |
| receivers: np.ndarray, |
| timestamps: np.ndarray, |
| ) -> list[int]: |
| """Build a benign ordering that avoids 3..8-step receiver revisits.""" |
| counts = Counter(int(receiver_id) for receiver_id in receivers.tolist()) |
| ordered: list[int] = [] |
| last_pos: dict[int, int] = {} |
|
|
| while len(ordered) < len(receivers): |
| best_receiver = None |
| best_key = None |
|
|
| for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0])): |
| if count <= 0: |
| continue |
| prev = last_pos.get(int(receiver)) |
| if prev is None: |
| revisit_penalty = 0 |
| adjacent_bonus = 1 |
| long_gap_bonus = 1 |
| else: |
| gap = len(ordered) - prev |
| revisit_penalty = 1 if 3 <= gap <= 8 else 0 |
| adjacent_bonus = 0 if gap <= 2 else 1 |
| long_gap_bonus = 0 if gap > 8 else 1 |
|
|
| key = ( |
| revisit_penalty, |
| adjacent_bonus, |
| long_gap_bonus, |
| -int(count), |
| int(receiver), |
| ) |
| if best_key is None or key < best_key: |
| best_key = key |
| best_receiver = int(receiver) |
|
|
| assert best_receiver is not None |
| counts[best_receiver] -= 1 |
| ordered.append(best_receiver) |
| last_pos[best_receiver] = len(ordered) - 1 |
|
|
| return ordered |
|
|
| def _order_receivers_benign_matched( |
| self, |
| fraud_receivers: np.ndarray, |
| label_boundaries: list[int], |
| timestamps: np.ndarray, |
| ) -> list[int]: |
| """Match fraud prefix histograms at every label boundary while reordering within segments.""" |
| n = len(fraud_receivers) |
| if n == 0: |
| return [] |
|
|
| boundaries = sorted( |
| int(boundary) |
| for boundary in label_boundaries |
| if 0 <= int(boundary) < n |
| ) |
| if not boundaries or boundaries[-1] != n - 1: |
| boundaries.append(n - 1) |
|
|
| ordered: list[int] = [] |
| last_pos: dict[int, int] = {} |
| start = 0 |
| for end in boundaries: |
| segment = fraud_receivers[start : end + 1] |
| ordered.extend( |
| self._order_benign_segment( |
| segment_receivers=segment, |
| ordered_prefix=ordered, |
| last_pos=last_pos, |
| full_timestamps=np.asarray(timestamps[: end + 1], dtype=np.float64), |
| ) |
| ) |
| start = end + 1 |
| return ordered |
|
|
| def _order_benign_segment( |
| self, |
| segment_receivers: np.ndarray, |
| ordered_prefix: list[int], |
| last_pos: dict[int, int], |
| full_timestamps: np.ndarray, |
| ) -> list[int]: |
| counts = Counter(int(receiver_id) for receiver_id in segment_receivers.tolist()) |
| segment_out: list[int] = [] |
|
|
| while len(segment_out) < len(segment_receivers): |
| best_receiver = None |
| best_key = None |
| global_idx = len(ordered_prefix) + len(segment_out) |
|
|
| for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0])): |
| if count <= 0: |
| continue |
| prev = last_pos.get(int(receiver)) |
| if prev is None: |
| revisit_penalty = 0 |
| seen_penalty = 0 |
| adjacent_bonus = 1 |
| long_gap_bonus = 1 |
| else: |
| gap = global_idx - prev |
| revisit_penalty = 1 if 3 <= gap <= 8 else 0 |
| seen_penalty = 1 |
| adjacent_bonus = 0 if gap <= 2 else 1 |
| long_gap_bonus = 0 if gap > 8 else 1 |
|
|
| key = ( |
| revisit_penalty, |
| adjacent_bonus, |
| long_gap_bonus, |
| seen_penalty, |
| -int(count), |
| int(receiver), |
| ) |
| if best_key is None or key < best_key: |
| best_key = key |
| best_receiver = int(receiver) |
|
|
| assert best_receiver is not None |
| counts[best_receiver] -= 1 |
| segment_out.append(best_receiver) |
| last_pos[best_receiver] = global_idx |
|
|
| return segment_out |
|
|
| |
| |
| |
|
|
| def _attach_audit_columns( |
| self, |
| out: pd.DataFrame, |
| fraud_flags: np.ndarray, |
| trigger_idxs: list, |
| is_fallback: np.ndarray, |
| trace: dict, |
| ) -> pd.DataFrame: |
| """Attach per-event audit columns to the twin user DataFrame.""" |
| n = len(out) |
| motif_hit_count = int(np.sum(trace["source"])) |
|
|
| fraud_source_col = np.full(n, "none", dtype=object) |
| trigger_event_idx_col = np.full(n, -1, dtype=np.int32) |
| label_event_idx_col = np.full(n, -1, dtype=np.int32) |
| label_delay_col = np.full(n, -1, dtype=np.int32) |
|
|
| for target_idx, src_idx in trigger_idxs: |
| fraud_source_col[target_idx] = "motif" if not is_fallback[target_idx] else "chain_fallback" |
| trigger_event_idx_col[target_idx] = int(src_idx) |
| label_event_idx_col[target_idx] = int(target_idx) |
| label_delay_col[target_idx] = int(target_idx - src_idx) |
|
|
| out["fraud_source"] = fraud_source_col |
| out["motif_hit_count"] = motif_hit_count |
| out["trigger_event_idx"] = trigger_event_idx_col |
| out["label_event_idx"] = label_event_idx_col |
| out["label_delay"] = label_delay_col |
| out["is_fallback_label"] = is_fallback.astype(np.int8) |
| return out |
|
|
| |
| |
| |
|
|
| def _apply_twin_labels_standard(self, user_df: pd.DataFrame, role: str) -> pd.DataFrame: |
| out = user_df.copy().sort_values("timestamp").reset_index(drop=True) |
| n = len(out) |
| empty_audit = { |
| "fraud_source": np.full(n, "none", dtype=object), |
| "motif_hit_count": 0, |
| "trigger_event_idx": np.full(n, -1, dtype=np.int32), |
| "label_event_idx": np.full(n, -1, dtype=np.int32), |
| "label_delay": np.full(n, -1, dtype=np.int32), |
| "is_fallback_label": np.zeros(n, dtype=np.int8), |
| } |
| if n == 0: |
| out["dynamic_fraud_state"] = np.zeros(0, dtype=np.float32) |
| out["motif_source"] = np.zeros(0, dtype=np.int8) |
| out["motif_chain_state"] = np.zeros(0, dtype=np.float32) |
| out["motif_strength"] = np.zeros(0, dtype=np.float32) |
| for col, val in empty_audit.items(): |
| out[col] = val if isinstance(val, int) else val |
| return out |
|
|
| timestamps = out["timestamp"].to_numpy(dtype=np.float64) |
| receivers = out["receiver_id"].to_numpy(dtype=np.int64) |
| trace = temporal_twin_motif_trace(timestamps, receivers) |
| state = trace["state"].copy() |
| fraud_flags = np.zeros(n, dtype=np.int8) |
| fraud_type = np.full(n, "none", dtype=object) |
| is_fallback = np.zeros(n, dtype=np.int8) |
| source_positions = np.flatnonzero(trace["source"]).tolist() |
| trigger_pairs: list = [] |
|
|
| if role == "fraud": |
| if self._is_standard_temporal_twins(): |
| selected_sources = self._select_standard_twin_sources(trace, n) |
| else: |
| max_events = max(4, min(12, n // 5)) |
| used_fallback = False |
| if not source_positions: |
| ranked = np.argsort(trace["chain"])[::-1] |
| source_positions = [int(pos) for pos in ranked if int(pos) >= 7][:max_events] |
| used_fallback = True |
| selected_sources = [(src, used_fallback) for src in source_positions[:max_events]] |
|
|
| used_targets = set() |
| for src, used_fallback in selected_sources: |
| if src >= n - 1: |
| target = src |
| else: |
| if self._is_standard_temporal_twins(): |
| delay_lo, delay_hi = self._standard_twin_profile()["delay_range"] |
| sampled_delay = int(self.rng.integers(delay_lo, delay_hi + 1)) |
| else: |
| sampled_delay = int(self.rng.integers(6, 17)) |
| delay = min(sampled_delay, (n - 1) - src) |
| target = src + max(delay, 1) |
| if target in used_targets: |
| continue |
| used_targets.add(target) |
| fraud_flags[target] = 1 |
| fraud_type[target] = "temporal_twin" |
| if used_fallback: |
| is_fallback[target] = 1 |
| trigger_pairs.append((target, src)) |
| lo = max(0, src) |
| hi = min(n, target + 1) |
| ramp = np.linspace(0.15, 0.85, num=max(1, hi - lo), dtype=np.float32) |
| state[lo:hi] += ramp |
|
|
| out["motif_source"] = trace["source"].astype(np.int8) |
| out["motif_chain_state"] = trace["chain"].astype(np.float32) |
| out["motif_strength"] = trace["motif_strength"].astype(np.float32) |
| out["dynamic_fraud_state"] = state.astype(np.float32) |
| out["is_fraud"] = fraud_flags.astype(np.int8) |
| out["fraud_type"] = fraud_type |
| return self._attach_audit_columns(out, fraud_flags, trigger_pairs, is_fallback, trace) |
|
|
| |
| |
| |
|
|
    def _apply_twin_labels_calib(self, user_df: pd.DataFrame, role: str) -> pd.DataFrame:
        """Label a twin user's events for the calibration variant.

        Like the standard labeller but without profile-driven source
        selection: the first (up to) ``max_events`` raw motif sources are
        used directly, and there is no chain-state fallback — a fraud-role
        user whose motif never fired simply gets zero labels.

        Args:
            user_df: Events of one user with ``timestamp`` and
                ``receiver_id`` columns.
            role: ``"fraud"`` or ``"benign"``.

        Returns:
            Sorted copy of ``user_df`` with label, trace, and audit columns.
        """
        out = user_df.copy().sort_values("timestamp").reset_index(drop=True)
        n = len(out)
        if n == 0:
            # Empty user: emit the full column schema with neutral values.
            out["dynamic_fraud_state"] = np.zeros(0, dtype=np.float32)
            out["motif_source"] = np.zeros(0, dtype=np.int8)
            out["motif_chain_state"] = np.zeros(0, dtype=np.float32)
            out["motif_strength"] = np.zeros(0, dtype=np.float32)
            for col in ("fraud_source", "motif_hit_count", "trigger_event_idx",
                        "label_event_idx", "label_delay", "is_fallback_label"):
                out[col] = 0
            return out

        timestamps = out["timestamp"].to_numpy(dtype=np.float64)
        receivers = out["receiver_id"].to_numpy(dtype=np.int64)
        trace = temporal_twin_motif_trace(timestamps, receivers)
        state = trace["state"].copy()
        fraud_flags = np.zeros(n, dtype=np.int8)
        fraud_type = np.full(n, "none", dtype=object)
        is_fallback = np.zeros(n, dtype=np.int8)
        trigger_pairs: list = []

        if role == "fraud":
            source_positions = np.flatnonzero(trace["source"]).tolist()

            if not source_positions:
                # No motif fired: return trace columns with all-zero labels.
                out["motif_source"] = trace["source"].astype(np.int8)
                out["motif_chain_state"] = trace["chain"].astype(np.float32)
                out["motif_strength"] = trace["motif_strength"].astype(np.float32)
                out["dynamic_fraud_state"] = state.astype(np.float32)
                out["is_fraud"] = np.zeros(n, dtype=np.int8)
                out["fraud_type"] = fraud_type
                return self._attach_audit_columns(out, fraud_flags, trigger_pairs, is_fallback, trace)

            # Label up to max_events sources, each delayed by 6..16 events
            # (clamped so the label stays inside the user's sequence).
            max_events = max(4, min(12, n // 5))
            used_targets = set()
            for src in source_positions[:max_events]:
                if src >= n - 1:
                    target = src
                else:
                    delay = min(int(self.rng.integers(6, 17)), (n - 1) - src)
                    target = src + max(delay, 1)
                if target in used_targets:
                    continue  # never double-label one event
                used_targets.add(target)
                fraud_flags[target] = 1
                fraud_type[target] = "temporal_twin_calib"
                trigger_pairs.append((target, src))
                # Ramp the latent fraud state up between trigger and label.
                lo = max(0, src)
                hi = min(n, target + 1)
                ramp = np.linspace(0.15, 0.85, num=max(1, hi - lo), dtype=np.float32)
                state[lo:hi] += ramp

        out["motif_source"] = trace["source"].astype(np.int8)
        out["motif_chain_state"] = trace["chain"].astype(np.float32)
        out["motif_strength"] = trace["motif_strength"].astype(np.float32)
        out["dynamic_fraud_state"] = state.astype(np.float32)
        out["is_fraud"] = fraud_flags.astype(np.int8)
        out["fraud_type"] = fraud_type
        return self._attach_audit_columns(out, fraud_flags, trigger_pairs, is_fallback, trace)
|
|
| def _finalise_temporal_twin_features(self, df: pd.DataFrame) -> pd.DataFrame: |
| out = df.copy().sort_values("timestamp").reset_index(drop=True) |
| n = len(out) |
|
|
| out["amount"] = np.zeros(n, dtype=np.float32) |
| out["risk_score"] = np.zeros(n, dtype=np.float32) |
| out["fail_prob"] = np.zeros(n, dtype=np.float32) |
| out["risk_noisy"] = np.zeros(n, dtype=np.float32) |
| out["neighbor_score"] = np.zeros(n, dtype=np.float32) |
| out["pair_freq"] = np.zeros(n, dtype=np.float32) |
|
|
| out["txn_count_10"] = ( |
| out.groupby("sender_id")["timestamp"] |
| .transform(lambda x: x.rolling(10, min_periods=1).count()) |
| .astype(np.float32) |
| ) |
| out["amount_sum_10"] = ( |
| out.groupby("sender_id")["amount"] |
| .transform(lambda x: x.rolling(10, min_periods=1).sum()) |
| .astype(np.float32) |
| ) |
|
|
| out["is_fraud"] = out["is_fraud"].astype(np.int8) |
| out["is_retry"] = out["is_retry"].astype(np.int8) |
| out["failed"] = out["failed"].astype(np.int8) |
| out["twin_pair_id"] = out["twin_pair_id"].astype(np.int32) |
| out["template_id"] = out["template_id"].astype(np.int32) |
| out["twin_label"] = out["twin_label"].astype(np.int8) |
| out["receiver_id"] = out["receiver_id"].astype(np.int32) |
| out["sender_id"] = out["sender_id"].astype(np.int32) |
| if "motif_source" in out.columns: |
| out["motif_source"] = out["motif_source"].astype(np.int8) |
| |
| for col, default, dtype in ( |
| ("motif_hit_count", 0, np.int32), |
| ("trigger_event_idx", -1, np.int32), |
| ("label_event_idx", -1, np.int32), |
| ("label_delay", -1, np.int32), |
| ("is_fallback_label", 0, np.int8), |
| ): |
| if col in out.columns: |
| out[col] = out[col].fillna(default).astype(dtype) |
| else: |
| out[col] = np.full(n, default, dtype=dtype) |
| if "fraud_source" not in out.columns: |
| out["fraud_source"] = np.full(n, "none", dtype=object) |
| else: |
| out["fraud_source"] = out["fraud_source"].fillna("none") |
|
|
| return out |
|
|