"""Synthetic fraud-label generation engine.

Two labelling regimes are implemented on top of a transaction DataFrame
(columns used here: ``timestamp``, ``sender_id``, ``receiver_id``, ``amount``,
``risk_score``, ``fail_prob``, ``is_retry``, ``failed``, ``txn_type``):

* the classic difficulty-preset pipeline in :meth:`FraudEngine.apply`, where
  fraud labels emerge only from hidden per-user temporal state, and
* the "temporal twins" benchmark (``benchmark_mode`` in
  ``{"temporal_twins", "temporal_twins_oracle_calib"}``), which rebuilds pairs
  of users that share the same event multiset but differ only in event
  *ordering*, labelled via the shared motif program
  :func:`temporal_twin_motif_trace`.

NOTE(review): this module was recovered from a whitespace-mangled paste; the
code tokens are unchanged, only formatting and documentation were restored.
"""

import numpy as np
import pandas as pd
from collections import Counter

# ============================================================
# ORACLE / AUDIT COLUMNS — never exposed to learned baselines
# ============================================================
ORACLE_ONLY_COLS: frozenset = frozenset({
    "motif_hit_count",
    "motif_source",
    "trigger_event_idx",
    "label_event_idx",
    "label_delay",
    "is_fallback_label",
    "fraud_source",
    "twin_role",
    "twin_label",
    "twin_pair_id",
    "template_id",
    "dynamic_fraud_state",
    "motif_chain_state",
    "motif_strength",
})

# =========================
# DIFFICULTY PRESETS
# =========================
# NOTE(review): several keys here ("velocity_logit", "burst_divisor",
# "retry_logit", "ring_logit", "global_noise", "delayed_fraction",
# "thresh_*") are not read anywhere in the code visible in this file —
# presumably consumed elsewhere or vestigial; confirm before pruning.
DIFFICULTY_PRESETS = {
    "easy": {
        "noise_std": 0.2,
        "quantile_type": 0.90,
        "quantile_suspicious": 0.92,
        "pair_freq_mult": 0.7,
        "velocity_logit": 0.20,
        "burst_divisor": 10.0,
        "retry_logit": 0.8,
        "ring_logit": 1.2,
        "global_noise": 0.4,
        "graph_feat_noise": 0.0,  # no noise on features
        "delayed_fraction": 0.0,  # no delayed fraud
        "thresh_velocity": 0.93,
        "thresh_burst": 0.90,
        "thresh_retry": 0.88,
        "thresh_ring": 0.90,
        "thresh_none": 0.9995,
    },
    "medium": {
        "noise_std": 0.3,
        "quantile_type": 0.94,
        "quantile_suspicious": 0.96,
        "pair_freq_mult": 0.35,
        "velocity_logit": 0.15,
        "burst_divisor": 12.0,
        "retry_logit": 0.6,
        "ring_logit": 0.5,
        "global_noise": 0.7,
        "graph_feat_noise": 0.2,
        "delayed_fraction": 0.3,  # 30% of velocity fraud is delayed
        "thresh_velocity": 0.95,
        "thresh_burst": 0.93,
        "thresh_retry": 0.92,
        "thresh_ring": 0.95,
        "thresh_none": 0.9998,
    },
    "hard": {
        "noise_std": 0.4,
        "quantile_type": 0.97,
        "quantile_suspicious": 0.98,
        "pair_freq_mult": 0.2,  # Increased from 0.05 to prevent OOD collapse
        "velocity_logit": 0.12,
        "burst_divisor": 15.0,
        "retry_logit": 0.5,
        "ring_logit": 0.15,
        "global_noise": 1.5,  # Increased global noise to maintain difficulty
        "graph_feat_noise": 0.5,
        "delayed_fraction": 0.5,
        "thresh_velocity": 0.97,
        "thresh_burst": 0.96,
        "thresh_retry": 0.96,
        "thresh_ring": 0.98,
        "thresh_none": 0.9999,
    },
}

# Per-difficulty knobs for the "temporal twins" benchmark construction.
# Only some keys are read in this file (delta_recipe, fraud_block_prob,
# motif_cycle_prob, camouflage_prob); the rest are presumably used by the
# twin-labelling helpers defined outside this view — confirm.
TEMPORAL_TWIN_STANDARD_PROFILES = {
    "easy": {
        "receiver_gap": 3,
        "delta_recipe": "easy",
        "event_divisor": 4,
        "min_events": 5,
        "max_events_cap": 12,
        "source_keep_frac": 1.00,
        "min_true_sources": 4,
        "max_chain_fallback": 1,
        "delay_range": (4, 9),
        "source_pool_factor": 1.0,
        "chain_pool_factor": 1.0,
        "fraud_block_prob": 1.0,
        "motif_cycle_prob": 1.0,
        "camouflage_prob": 0.0,
    },
    "medium": {
        "receiver_gap": 4,
        "delta_recipe": "medium",
        "event_divisor": 5,
        "min_events": 4,
        "max_events_cap": 10,
        "source_keep_frac": 0.75,
        "min_true_sources": 3,
        "max_chain_fallback": 3,
        "delay_range": (7, 14),
        "source_pool_factor": 2.0,
        "chain_pool_factor": 2.0,
        "fraud_block_prob": 0.30,
        "motif_cycle_prob": 0.40,
        "camouflage_prob": 0.60,
    },
    "hard": {
        "receiver_gap": 5,
        "delta_recipe": "hard",
        "event_divisor": 6,
        "min_events": 4,
        "max_events_cap": 8,
        "source_keep_frac": 0.45,
        "min_true_sources": 2,
        "max_chain_fallback": 5,
        "delay_range": (10, 20),
        "source_pool_factor": 3.0,
        "chain_pool_factor": 3.0,
        "fraud_block_prob": 0.22,
        "motif_cycle_prob": 0.28,
        "camouflage_prob": 0.78,
    },
}


def temporal_twin_motif_trace(
    timestamps: np.ndarray,
    receivers: np.ndarray,
) -> dict:
    """Shared finite-state motif program for temporal-twin calibration.

    The signal intentionally depends on event order and timing only:
    quiet -> accelerating cadence -> delayed receiver revisit -> burst-release-burst

    Parameters
    ----------
    timestamps : ascending event times (seconds) for one user.
    receivers : receiver id per event, aligned with ``timestamps``.

    Returns
    -------
    dict of per-event float32 arrays (``state``, ``chain``, ``motif_strength``,
    ``quiet``, ``accel``, ``revisit``, ``burst_release_burst``) plus an int8
    ``source`` array flagging events where the full motif fires.
    """
    timestamps = np.asarray(timestamps, dtype=np.float64)
    receivers = np.asarray(receivers, dtype=np.int64)
    n = len(timestamps)
    empty = np.zeros(n, dtype=np.float32)
    if n == 0:
        # Degenerate input: keep the output schema identical, all zeros.
        return {
            "state": empty,
            "chain": empty,
            "motif_strength": empty,
            "quiet": empty,
            "accel": empty,
            "revisit": empty,
            "burst_release_burst": empty,
            "source": np.zeros(n, dtype=np.int8),
        }
    if n > 1:
        dts = np.diff(timestamps)
        # Floor gaps at 60s so the quantile thresholds are never degenerate.
        base_dts = np.clip(dts, 60.0, None)
    else:
        base_dts = np.array([1800.0], dtype=np.float64)
    # Per-user adaptive gap thresholds; the max() chain enforces a strict
    # short < medium < long ordering even on near-constant cadences.
    short_q = float(np.quantile(base_dts, 0.55))
    medium_q = float(np.quantile(base_dts, 0.70))
    long_q = float(np.quantile(base_dts, 0.82))
    short_q = max(short_q, 60.0)
    medium_q = max(medium_q, short_q * 1.10)
    long_q = max(long_q, medium_q * 1.15)
    state = np.zeros(n, dtype=np.float32)
    chain = np.zeros(n, dtype=np.float32)
    motif_strength = np.zeros(n, dtype=np.float32)
    quiet_flags = np.zeros(n, dtype=np.float32)
    accel_flags = np.zeros(n, dtype=np.float32)
    revisit_flags = np.zeros(n, dtype=np.float32)
    brb_flags = np.zeros(n, dtype=np.float32)
    source = np.zeros(n, dtype=np.int8)
    # Sliding window of the last 4 inter-event gaps, seeded with "quiet".
    prev_dts = [long_q, long_q, long_q, long_q]
    receiver_last_idx: dict[int, int] = {}
    recent_accel = 0.0
    recent_revisit = 0.0
    recent_brb = 0.0
    chain_state = 0.0
    hidden_state = 0.0
    last_source = -99  # sentinel so the very first firing is never suppressed
    for idx in range(n):
        dt = long_q if idx == 0 else max(float(timestamps[idx] - timestamps[idx - 1]), 60.0)
        current_receiver = int(receivers[idx])
        # --- motif primitives on the gap window -------------------------
        quiet = float(prev_dts[-1] >= long_q)
        accel = float(
            prev_dts[-3] >= long_q
            and prev_dts[-2] > prev_dts[-1] > dt
            and dt <= short_q
        )
        gap_events = idx - receiver_last_idx.get(current_receiver, idx)
        revisit = float(
            current_receiver in receiver_last_idx
            and 3 <= gap_events <= 8
            and max(prev_dts[-2], prev_dts[-1]) >= long_q * 0.85
        )
        burst_release_burst = float(
            prev_dts[-3] <= short_q
            and prev_dts[-2] >= long_q
            and prev_dts[-1] <= short_q
            and dt <= short_q
        )
        # Leaky accumulators of recent primitive activity.
        recent_accel = max(0.0, 0.86 * recent_accel + accel)
        recent_revisit = max(0.0, 0.88 * recent_revisit + revisit)
        recent_brb = max(0.0, 0.88 * recent_brb + burst_release_burst)
        local_speed = max(0.0, (short_q / max(dt, 60.0)) - 0.55)
        signal = (
            1.20 * accel
            + 1.25 * revisit
            + 1.10 * burst_release_burst
            + 0.30 * quiet
            + 0.20 * local_speed
        )
        # Chain state decays (-0.30 bias) unless fed by fresh motif signal.
        chain_state = max(
            0.0,
            0.82 * chain_state
            + 0.75 * signal
            + 0.22 * min(recent_accel, 1.0)
            + 0.28 * min(recent_revisit, 1.0)
            + 0.24 * min(recent_brb, 1.0)
            - 0.30,
        )
        hidden_state = max(0.0, 0.97 * hidden_state + 0.22 * chain_state + 0.34 * signal)
        # A "source" event requires the full burst-release-burst completion
        # plus warm accel/revisit accumulators and a minimum refractory gap.
        if (
            idx >= 6
            and burst_release_burst > 0.0
            and recent_accel > 0.20
            and recent_revisit > 0.30
            and chain_state > 0.80
            and idx - last_source >= 4
        ):
            source[idx] = 1
            last_source = idx
        quiet_flags[idx] = quiet
        accel_flags[idx] = accel
        revisit_flags[idx] = revisit
        brb_flags[idx] = burst_release_burst
        motif_strength[idx] = signal
        chain[idx] = chain_state
        state[idx] = hidden_state
        receiver_last_idx[current_receiver] = idx
        prev_dts = (prev_dts + [dt])[-4:]
    return {
        "state": state.astype(np.float32),
        "chain": chain.astype(np.float32),
        "motif_strength": motif_strength.astype(np.float32),
        "quiet": quiet_flags.astype(np.float32),
        "accel": accel_flags.astype(np.float32),
        "revisit": revisit_flags.astype(np.float32),
        "burst_release_burst": brb_flags.astype(np.float32),
        "source": source.astype(np.int8),
    }


# Maximum retries when a calib-mode fraud twin has no motif hits
_CALIB_MOTIF_RETRY_BUDGET = 8
# Greedy swap budget when scrubbing motif hits out of a benign twin.
_BENIGN_MOTIF_REPAIR_STEPS = 16


class FraudEngine:
    """Injects synthetic fraud labels into a transaction DataFrame.

    ``benchmark_mode`` selects the labelling regime: the preset-driven
    stateful pipeline (default ``apply`` path) or the temporal-twin
    construction (``"temporal_twins"`` / ``"temporal_twins_oracle_calib"``).
    """

    def __init__(self, seed: int = 42, difficulty: str = "medium", benchmark_mode: str = "temporal_twins"):
        # All stochastic choices flow through this single Generator, so the
        # output is fully determined by (seed, difficulty, benchmark_mode, df).
        self.rng = np.random.default_rng(seed)
        self.difficulty = difficulty
        self.benchmark_mode = benchmark_mode
        self.params = DIFFICULTY_PRESETS[difficulty]

    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        """Label ``df`` with synthetic fraud and decorrelated features.

        Returns a new DataFrame sorted by timestamp with ``is_fraud``,
        ``fraud_type``, ``dynamic_fraud_state`` and sanitised feature columns.
        """
        if self.benchmark_mode in ("temporal_twins", "temporal_twins_oracle_calib"):
            return self._apply_temporal_twins(df)
        df = df.copy()
        df = df.sort_values("timestamp").reset_index(drop=True)
        p = self.params
        n = len(df)
        # -------------------------
        # BASE FEATURES
        # -------------------------
        noise = self.rng.normal(0, p["noise_std"], size=n)
        df["risk_noisy"] = df["risk_score"] * 0.2 + noise
        df["txn_count_10"] = (
            df.groupby("sender_id")["timestamp"]
            .transform(lambda x: x.rolling(10, min_periods=1).count())
        )
        df["amount_sum_10"] = (
            df.groupby("sender_id")["amount"]
            .transform(lambda x: x.rolling(10, min_periods=1).sum())
        )
        velocity = df["txn_count_10"] * 0.6 + df["amount_sum_10"] * 0.0002
        retry_signal = (
            df["is_retry"] * 1.2
            + df["failed"] * 1.5
            + df["fail_prob"] * 0.7
        )
        # -------------------------
        # QUANTILES (controlled by difficulty)
        # -------------------------
        # NOTE(review): velocity_q_type and txn_q_type are computed but never
        # read in this method — dead locals unless used by code outside view.
        q_type = p["quantile_type"]
        q_susp = p["quantile_suspicious"]
        velocity_q_type = velocity.quantile(q_type)
        velocity_q_susp = velocity.quantile(q_susp)
        txn_q_type = df["txn_count_10"].quantile(q_type)
        retry_q_type = retry_signal.quantile(q_type)
        retry_q_susp = retry_signal.quantile(q_susp)
        # -------------------------
        # GRAPH CONTAGION
        # -------------------------
        import math
        neighbor_score = np.zeros(n, dtype=np.float32)
        recent = {}  # per-node suspicion score, bumped on suspicious events, decayed otherwise
        # Convert to fast python lists for loop access
        velocity_arr = velocity.to_numpy().tolist()
        retry_arr = retry_signal.to_numpy().tolist()
        sender_arr = df["sender_id"].to_numpy().tolist()
        receiver_arr = df["receiver_id"].to_numpy().tolist()
        time_arr = df["timestamp"].to_numpy().tolist()
        for i in range(n):
            s = sender_arr[i]
            r = receiver_arr[i]
            score = recent.get(s, 0.0) + recent.get(r, 0.0)
            neighbor_score[i] = math.tanh(score)
            suspicious = (
                velocity_arr[i] > velocity_q_susp
                or retry_arr[i] > retry_q_susp
            )
            if suspicious:
                recent[s] = recent.get(s, 0.0) + 1.0
                recent[r] = recent.get(r, 0.0) + 1.0
            else:
                if s in recent:
                    recent[s] *= 0.9
                if r in recent:
                    recent[r] *= 0.9
        df["neighbor_score"] = neighbor_score
        # --------------------------------
        # GRAPH RING (STRUCTURAL) + NOISE
        # --------------------------------
        pairs = list(zip(df["sender_id"], df["receiver_id"]))
        pair_counts = pd.Series(pairs).value_counts()
        df["pair_freq"] = [pair_counts[(s, r)] for s, r in pairs]
        df["pair_freq"] = np.log1p(df["pair_freq"]) * p["pair_freq_mult"]
        # Add noise to structural features (breaks GNN)
        if p["graph_feat_noise"] > 0:
            gf_noise = p["graph_feat_noise"]
            df["pair_freq"] += self.rng.normal(0, gf_noise, size=n)
            df["neighbor_score"] += self.rng.normal(0, gf_noise * 0.5, size=n)
        # -------------------------------------------------------
        # ALL STATIC FRAUD SIGNALS REMOVED
        # Fraud is ONLY triggered by stateful temporal accumulation below.
        # This ensures static models (XGBoost, GNN) cannot solve the task.
        # -------------------------------------------------------
        df["fraud_type"] = "none"
        df["is_fraud"] = 0
        # Randomize edge features so GNN cannot exploit them
        df["amount"] = self.rng.normal(0, 1, size=n)
        df["risk_score"] = self.rng.normal(0, 1, size=n)
        df["fail_prob"] = self.rng.normal(0, 1, size=n)
        # -------------------------
        # STATEFUL TEMPORAL ACCUMULATION (velocity & burst)
        # -------------------------
        # Fraud strictly depends on the hidden history of the user,
        # perfectly breaking any static mapping from current features to the label.
        user_state = {}       # per-sender hidden EMA state
        last_txn = {}         # per-sender previous timestamp
        # State threshold (difficulty specific) — raised to force longer buildup
        thresh_state = {"easy": 6.0, "medium": 7.0, "hard": 8.5}[self.difficulty]
        diff_scale = {"easy": 1.0, "medium": 0.8, "hard": 0.6}[self.difficulty]
        # Track logic without inline DataFrame modifications
        velocity_idx = []
        ring_idx = []
        dynamic_state = np.zeros(n, dtype=np.float32)
        ring_memory = {}        # per (sender, receiver) pair burst memory
        burst_memory = {}       # per-sender burst accumulator
        receiver_history = {}   # per-sender last-3 receivers (tuple)
        temporal_candidates = []
        cadence_ema = {}        # per-sender EMA of inter-event gaps
        user_event_pos = {}
        cooldown_until = {}     # per-sender event position gating new triggers
        cooldown_span = {"easy": 10, "medium": 12, "hard": 15}[self.difficulty]
        max_r = max(receiver_arr) if receiver_arr else 1
        for i in range(n):
            u = sender_arr[i]
            r_id = receiver_arr[i]
            t = time_arr[i]
            user_event_pos[u] = user_event_pos.get(u, 0) + 1
            event_pos = user_event_pos[u]
            can_trigger = event_pos >= cooldown_until.get(u, 0)
            prev_state = user_state.get(u, 0.0)
            dt = t - last_txn.get(u, t)
            last_txn[u] = t
            # Relative acceleration matters more than absolute volume.
            # This suppresses static "busy user" shortcuts and rewards temporal memory.
            prev_cadence = cadence_ema.get(u, 3600.0)
            if dt == 0:
                time_factor = 0.8 * diff_scale
            else:
                eff_dt = max(float(dt), 60.0)
                rel_speed = prev_cadence / eff_dt
                if rel_speed > 3.0:
                    time_factor = 1.8 * diff_scale
                elif rel_speed > 1.8:
                    time_factor = 1.4 * diff_scale
                elif rel_speed > 1.2:
                    time_factor = 1.0 * diff_scale
                elif rel_speed > 0.8:
                    time_factor = 0.6 * diff_scale
                else:
                    time_factor = 0.25 * diff_scale
                cadence_ema[u] = 0.97 * prev_cadence + 0.03 * eff_dt
            # =========================
            # ADVERSARIAL ADAPTATION
            # =========================
            # Adversarial slowdown near detection (tamed)
            if prev_state > (0.7 * thresh_state):
                time_factor *= 0.6
            # Adversarial burst attack (rare, moderate)
            if self.rng.random() < 0.02:
                time_factor *= 1.5
            # 🚨 Evasion behavior (switch receiver)
            if prev_state > (0.8 * thresh_state) and self.rng.random() < 0.3:
                r_id = self.rng.integers(0, max_r + 1)
            hist = receiver_history.get(u, ())
            revisit_motif = len(hist) >= 2 and (r_id in hist[-3:]) and hist[-1] != r_id
            # Hidden EMA accumulation: Low noise to preserve learnability
            noise = self.rng.normal(0, 0.03)
            new_state = max(0.0, 0.975 * prev_state + 0.22 * time_factor + noise)
            # Delayed reinforcement (forces multi-step buildup across time)
            if prev_state > (0.6 * thresh_state) and dt < 7200:
                new_state += 0.3 * diff_scale
            prev_burst = burst_memory.get(u, 0.0)
            if dt < 600:
                burst_impulse = 1.0
            elif dt < 1800:
                burst_impulse = 0.4
            elif dt < 7200:
                burst_impulse = -0.5
            else:
                burst_impulse = -0.8
            burst_state = max(0.0, 0.92 * prev_burst + burst_impulse)
            burst_memory[u] = burst_state
            crossed_state = prev_state <= thresh_state and new_state > thresh_state
            release_event = prev_burst > 2.5 and dt > 1800
            if revisit_motif and (release_event or crossed_state or prev_burst > 1.5):
                temporal_candidates.append(i)
            user_state[u] = new_state
            dynamic_state[i] = new_state
            # =========================
            # FRAUD MECHANISM BY DIFFICULTY
            # =========================
            n_velocity_before = len(velocity_idx)
            n_ring_before = len(ring_idx)
            # Order-specific release after a short-gap burst.
            # This keeps fraud tied to chronology rather than to static activity volume.
            if can_trigger and revisit_motif and release_event and new_state > (0.75 * thresh_state):
                if self.rng.random() < 0.12:
                    velocity_idx.append(i)
            if self.difficulty == "easy":
                # Pure velocity fraud (learnable, local temporal)
                if can_trigger and revisit_motif and (crossed_state or (release_event and new_state > (0.85 * thresh_state))):
                    prob = min(0.55, 0.15 + 0.25 * (new_state / max(thresh_state, 1e-6)))
                    if self.rng.random() < prob:
                        velocity_idx.append(i)
                # --------------------------------
                # C. TRUE MULTI-AGENT RINGS
                # --------------------------------
                key = tuple(sorted((u, r_id)))
                prev_ring = ring_memory.get(key, 0.0)
                ring_memory[key] = 0.9 * prev_ring + (1.0 if dt < 600 else 0.0)
                ring_cross = prev_ring <= 6.0 and ring_memory[key] > 6.0
                if can_trigger and revisit_motif and ring_cross and release_event:
                    ring_idx.append(i)
            elif self.difficulty == "medium":
                # Mixed mechanisms
                if can_trigger and revisit_motif and crossed_state and release_event:
                    prob = min(0.45, 0.10 + 0.22 * (new_state / max(thresh_state * 1.2, 1e-6)))
                    if self.rng.random() < prob:
                        velocity_idx.append(i)
                # Retry abuse (adds orthogonal signal)
                if can_trigger and revisit_motif and retry_arr[i] > retry_q_type and release_event:
                    if self.rng.random() < 0.15:
                        velocity_idx.append(i)
                # --------------------------------
                # C. TRUE MULTI-AGENT RINGS
                # --------------------------------
                key = tuple(sorted((u, r_id)))
                prev_ring = ring_memory.get(key, 0.0)
                ring_memory[key] = 0.9 * prev_ring + (1.0 if dt < 600 else 0.0)
                ring_cross = prev_ring <= 5.0 and ring_memory[key] > 5.0
                if can_trigger and revisit_motif and ring_cross and (release_event or new_state > thresh_state):
                    ring_idx.append(i)
            elif self.difficulty == "hard":
                # Mostly rings, small velocity residual
                # Partial mechanism overlap ensures shared latent structure across difficulties!
                if can_trigger and revisit_motif and crossed_state and release_event and new_state > thresh_state:
                    if self.rng.random() < 0.1:
                        velocity_idx.append(i)
                # --------------------------------
                # C. TRUE MULTI-AGENT RINGS
                # --------------------------------
                key = tuple(sorted((u, r_id)))
                prev_ring = ring_memory.get(key, 0.0)
                ring_memory[key] = 0.9 * prev_ring + (1.0 if dt < 600 else 0.0)
                ring_cross = prev_ring <= 3.5 and ring_memory[key] > 3.5
                # HARD keeps rings, but only on burst-to-release transitions.
                if can_trigger and revisit_motif and ring_cross and release_event and new_state > (0.65 * thresh_state):
                    ring_idx.append(i)
            if can_trigger and (
                len(velocity_idx) > n_velocity_before
                or len(ring_idx) > n_ring_before
            ):
                cooldown_until[u] = event_pos + cooldown_span
            receiver_history[u] = (hist + (r_id,))[-3:]
        # Apply state array and fraud indices to DataFrame vectorially
        df["dynamic_fraud_state"] = dynamic_state
        if ring_idx:
            df.loc[ring_idx, "is_fraud"] = 1
            df.loc[ring_idx, "fraud_type"] = "graph_ring"
        # Velocity fraud applied after ring to not overwrite graph_ring if both triggered,
        # but velocity is the primary type we are delaying.
        if velocity_idx:
            velocity_mask = df.index.isin(velocity_idx) & (df["fraud_type"] == "none")
            df.loc[velocity_mask, "is_fraud"] = 1
            df.loc[velocity_mask, "fraud_type"] = "velocity"
        # -------------------------
        # DELAYED FRAUD (CRITICAL FOR TEMPORAL ADVANTAGE)
        # -------------------------
        # Group user transactions to ensure delayed fraud is attributed to the SAME user.
        # This prevents breaking the causal mapping to sender_id.
        # NOTE(review): these fractions are hard-coded and disagree with the
        # preset key "delayed_fraction" (e.g. easy: 0.2 here vs 0.0 in the
        # preset) — confirm which is authoritative.
        delayed_frac = {
            "easy": 0.2,
            "medium": 0.6,
            "hard": 1.0
        }[self.difficulty]
        if delayed_frac > 0:
            fraud_idx = df[(df["is_fraud"] == 1)].index.to_numpy()
            n_delay = int(len(fraud_idx) * delayed_frac)
            if n_delay > 0:
                delay_sources = self.rng.choice(fraud_idx, size=n_delay, replace=False)
                # Fast grouped indices tracking (pre-cached to raw numpy arrays)
                user_groups = {k: v.to_numpy() for k, v in df.groupby("sender_id").groups.items()}
                delayed_targets = []
                valid_sources = []
                for src in delay_sources:
                    # NOTE(review): DataFrame._get_value is a private pandas
                    # API; df.at[src, "sender_id"] is the public equivalent.
                    u = df._get_value(src, "sender_id")
                    idxs = user_groups[u]
                    pos = np.searchsorted(idxs, src)
                    delay = self.rng.integers(5, 15)  # Shift by 5-14 future transactions (longer memory dependency)
                    if pos + delay < len(idxs):
                        valid_sources.append(src)
                        delayed_targets.append(idxs[pos + delay])
                # Apply delays
                df.loc[valid_sources, "is_fraud"] = 0
                if delayed_targets:
                    df.loc[delayed_targets, "is_fraud"] = 1
        # -------------------------
        # MINIMUM FRAUD FLOOR (CRITICAL FOR EVAL STABILITY)
        # -------------------------
        min_rate = {
            "easy": 0.06,
            "medium": 0.05,
            "hard": 0.03
        }[self.difficulty]
        current_rate = df["is_fraud"].mean()
        if current_rate < min_rate:
            deficit = int((min_rate - current_rate) * len(df))
            # Backfill with sequence-motif candidates first so the floor remains temporal.
            temporal_pool = np.array(sorted(set(temporal_candidates)), dtype=np.int64)
            eligible = df.loc[temporal_pool] if len(temporal_pool) else df.iloc[0:0]
            eligible = eligible[eligible["fraud_type"] == "none"]
            if len(eligible) < deficit:
                state_thresh = np.percentile(df["dynamic_fraud_state"], 70)
                state_eligible = df[
                    (df["fraud_type"] == "none")
                    & (df["dynamic_fraud_state"] > state_thresh)
                ]
                eligible = pd.concat([eligible, state_eligible], ignore_index=False)
                eligible = eligible[~eligible.index.duplicated(keep="first")]
            n_sample = min(deficit, len(eligible))
            # NOTE(review): fixed random_state=42 here bypasses self.rng, so
            # this sample is identical across engine seeds — confirm intent.
            candidates = eligible.sample(n_sample, random_state=42).index
            # Instead of random labels → use WEAK temporal signal
            df.loc[candidates, "is_fraud"] = 1
            df.loc[candidates, "fraud_type"] = "weak_velocity"
            # Inject minimal temporal consistency
            df.loc[candidates, "dynamic_fraud_state"] += self.rng.normal(0.5, 0.1, size=len(candidates)).astype(np.float32)
        # -------------------------------------------------------
        # FINAL FEATURE SANITISATION
        # -------------------------------------------------------
        # Fraud is driven by latent chronology, not by any directly observable
        # per-event shortcut. Keep dynamic_fraud_state for mechanistic analysis,
        # but decorrelate the exported model-facing features after labels are fixed.
        df["amount"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        df["risk_score"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        df["fail_prob"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        df["risk_noisy"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        failed_rate = float(df["failed"].mean()) if "failed" in df.columns else 0.0
        retry_rate = float(df["is_retry"].mean()) if "is_retry" in df.columns else 0.0
        df["failed"] = self.rng.binomial(1, failed_rate, size=n).astype(np.int8)
        df["is_retry"] = self.rng.binomial(1, retry_rate, size=n).astype(np.int8)
        df["txn_count_10"] = self.rng.permutation(df["txn_count_10"].to_numpy())
        df["amount_sum_10"] = self.rng.permutation(df["amount_sum_10"].to_numpy())
        df["neighbor_score"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        df["pair_freq"] = self.rng.normal(0, 1, size=n).astype(np.float32)
        return df

    def _is_standard_temporal_twins(self) -> bool:
        """True for the standard (non-oracle-calibration) twin benchmark."""
        return self.benchmark_mode == "temporal_twins"

    def _standard_twin_profile(self) -> dict:
        """Difficulty profile for the standard temporal-twin construction."""
        return TEMPORAL_TWIN_STANDARD_PROFILES[self.difficulty]

    def _apply_temporal_twins(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rebuild ``df`` as fraud/benign twin pairs plus background users.

        Carriers (existing sender ids) are consumed two at a time in
        start-time order; each pair is re-materialised from an eligible
        template user's events via _build_temporal_twin_user. Carriers that
        cannot form a pair are passed through as background.
        """
        df = df.copy()
        df = df.sort_values("timestamp").reset_index(drop=True)
        # Guarantee the columns the twin builders permute/copy exist.
        for column, default in (
            ("is_retry", 0),
            ("failed", 0),
            ("risk_score", 0.0),
            ("fail_prob", 0.0),
        ):
            if column not in df.columns:
                df[column] = default
        sender_groups = {
            int(sender_id): group.sort_values("timestamp").reset_index(drop=True).copy()
            for sender_id, group in df.groupby("sender_id", sort=False)
        }
        if not sender_groups:
            return df
        out_frames = []
        pair_id = 0
        min_pair_events = 18  # template users need enough events to host the motif
        user_meta = []
        for sender_id, group in sender_groups.items():
            receiver_counts = Counter(int(receiver_id) for receiver_id in group["receiver_id"].tolist())
            repeated_receivers = int(sum(count >= 2 for count in receiver_counts.values()))
            user_meta.append({
                "sender_id": int(sender_id),
                "group": group,
                "count": int(len(group)),
                "repeated_receivers": repeated_receivers,
                "start_time": float(group["timestamp"].min()) if len(group) else 0.0,
            })
        # Templates: big, revisit-rich users first (deterministic tie-break).
        eligible_templates = [
            meta for meta in user_meta
            if meta["count"] >= min_pair_events and meta["repeated_receivers"] >= 2
        ]
        eligible_templates = sorted(
            eligible_templates,
            key=lambda meta: (-meta["count"], -meta["repeated_receivers"], meta["start_time"], meta["sender_id"]),
        )
        carrier_meta = sorted(
            user_meta,
            key=lambda meta: (meta["start_time"], meta["sender_id"]),
        )
        carrier_cursor = 0
        template_cursor = 0
        if not eligible_templates:
            # No usable template: emit everyone unchanged as background.
            while carrier_cursor < len(carrier_meta):
                carrier = carrier_meta[carrier_cursor]
                out_frames.append(self._make_background_user(carrier["group"], int(carrier["sender_id"])))
                carrier_cursor += 1
            out = pd.concat(out_frames, ignore_index=True)
            out = out.sort_values("timestamp").reset_index(drop=True)
            out["txn_id"] = np.arange(len(out), dtype=np.int32)
            # NOTE(review): _finalise_temporal_twin_features is defined
            # outside this view.
            return self._finalise_temporal_twin_features(out)
        while carrier_cursor + 1 < len(carrier_meta):
            fraud_carrier = carrier_meta[carrier_cursor]
            benign_carrier = carrier_meta[carrier_cursor + 1]
            built_pair = False
            # Round-robin over templates until one yields a valid pair.
            for template_offset in range(len(eligible_templates)):
                template_idx = (template_cursor + template_offset) % len(eligible_templates)
                template_meta = eligible_templates[template_idx]
                template = template_meta["group"].copy().reset_index(drop=True)
                count_target = len(template)
                # Twins share the exact same gap ordering and feature
                # permutations so only receiver ordering separates them.
                shared_layout = {
                    "ordered_dts": self._order_deltas(
                        np.diff(template["timestamp"].to_numpy(dtype=np.float64)),
                        role="shared",
                    ),
                    "amount_perm": self.rng.permutation(count_target),
                    "retry_perm": self.rng.permutation(count_target),
                    "failed_perm": self.rng.permutation(count_target),
                }
                pair_start_time = float(template_meta["start_time"])
                fraud_frame = self._build_temporal_twin_user(
                    template_df=template,
                    sender_id=int(fraud_carrier["sender_id"]),
                    start_time=pair_start_time,
                    pair_id=pair_id,
                    role="fraud",
                    shared_layout=shared_layout,
                    template_id=int(template_meta["sender_id"]),
                )
                if fraud_frame is None:
                    continue
                benign_frame = self._build_temporal_twin_user(
                    template_df=template,
                    sender_id=int(benign_carrier["sender_id"]),
                    start_time=pair_start_time,
                    pair_id=pair_id,
                    role="benign",
                    shared_layout=shared_layout,
                    fraud_reference=fraud_frame,
                    template_id=int(template_meta["sender_id"]),
                )
                if benign_frame is None:
                    continue
                out_frames.append(fraud_frame)
                out_frames.append(benign_frame)
                pair_id += 1
                carrier_cursor += 2
                template_cursor = (template_idx + 1) % len(eligible_templates)
                built_pair = True
                break
            if not built_pair:
                # Every template failed for this carrier pair → background.
                out_frames.append(self._make_background_user(fraud_carrier["group"], int(fraud_carrier["sender_id"])))
                out_frames.append(self._make_background_user(benign_carrier["group"], int(benign_carrier["sender_id"])))
                carrier_cursor += 2
        # Odd leftover carrier (if any) becomes background.
        while carrier_cursor < len(carrier_meta):
            carrier = carrier_meta[carrier_cursor]
            out_frames.append(self._make_background_user(carrier["group"], int(carrier["sender_id"])))
            carrier_cursor += 1
        out = pd.concat(out_frames, ignore_index=True)
        out = out.sort_values("timestamp").reset_index(drop=True)
        out["txn_id"] = np.arange(len(out), dtype=np.int32)
        return self._finalise_temporal_twin_features(out)

    def _make_background_user(self, user_df: pd.DataFrame, sender_id: int) -> pd.DataFrame:
        """Pass a user through unchanged, with neutral twin/oracle columns."""
        out = user_df.copy().sort_values("timestamp").reset_index(drop=True)
        out["sender_id"] = int(sender_id)
        out["is_fraud"] = np.zeros(len(out), dtype=np.int8)
        out["fraud_type"] = "none"
        out["dynamic_fraud_state"] = np.zeros(len(out), dtype=np.float32)
        out["motif_source"] = np.zeros(len(out), dtype=np.int8)
        out["motif_chain_state"] = np.zeros(len(out), dtype=np.float32)
        out["motif_strength"] = np.zeros(len(out), dtype=np.float32)
        out["twin_pair_id"] = -1
        out["template_id"] = -1
        out["twin_role"] = "background"
        out["twin_label"] = 0
        return out

    def _build_temporal_twin_user(
        self,
        template_df: pd.DataFrame,
        sender_id: int,
        start_time: float,
        pair_id: int,
        role: str,
        shared_layout: dict | None = None,
        fraud_reference: pd.DataFrame | None = None,
        template_id: int | None = None,
    ) -> pd.DataFrame:
        """Build one twin user, with retry logic in calib mode for fraud twins."""
        calib_mode = self.benchmark_mode == "temporal_twins_oracle_calib"
        # Only calib-mode fraud twins get multiple attempts; everything else
        # is built exactly once.
        max_attempts = _CALIB_MOTIF_RETRY_BUDGET if (calib_mode and role == "fraud") else 1
        for attempt in range(max_attempts):
            out = template_df.copy().reset_index(drop=True)
            n = len(out)
            timestamps = out["timestamp"].to_numpy(dtype=np.float64)
            if n <= 1:
                ordered_dts = np.zeros(0, dtype=np.float64)
            else:
                if shared_layout is not None and "ordered_dts" in shared_layout:
                    ordered_dts = np.asarray(shared_layout["ordered_dts"], dtype=np.float64)
                else:
                    ordered_dts = self._order_deltas(np.diff(timestamps), role=role)
            # Re-materialise timestamps from the (re)ordered gap sequence.
            new_timestamps = np.empty(n, dtype=np.float64)
            new_timestamps[0] = max(0.0, float(start_time))
            if n > 1:
                new_timestamps[1:] = new_timestamps[0] + np.cumsum(ordered_dts)
            out["timestamp"] = new_timestamps.astype(np.float32)
            camouflage_fraud = False
            if role == "fraud" and self._is_standard_temporal_twins():
                camouflage_fraud = self.rng.random() < float(self._standard_twin_profile()["camouflage_prob"])
            if role == "benign" and fraud_reference is not None:
                # Mirror the fraud twin's labelled segment boundaries so the
                # benign twin matches it segment-by-segment.
                # NOTE(review): label_event_idx / motif_hit_count columns are
                # produced by the _apply_twin_labels_* helpers outside view.
                label_boundaries = sorted(
                    fraud_reference.loc[
                        fraud_reference["is_fraud"] == 1,
                        "label_event_idx",
                    ].astype(int).unique().tolist()
                )
                receiver_seq = self._order_receivers_benign_matched(
                    fraud_receivers=fraud_reference["receiver_id"].to_numpy(dtype=np.int64),
                    label_boundaries=label_boundaries,
                    timestamps=out["timestamp"].to_numpy(dtype=np.float64),
                )
            elif camouflage_fraud:
                # Camouflaged fraud uses the benign-style receiver ordering.
                receiver_seq = self._order_receivers_benign_greedy(
                    receivers=out["receiver_id"].to_numpy(dtype=np.int64),
                    timestamps=out["timestamp"].to_numpy(dtype=np.float64),
                )
            else:
                receiver_seq = self._order_receivers(
                    out["receiver_id"].to_numpy(dtype=np.int64),
                    role=role,
                    timestamps=out["timestamp"].to_numpy(dtype=np.float64),
                )
            out["receiver_id"] = np.asarray(receiver_seq, dtype=np.int32)
            if role == "benign" and fraud_reference is not None:
                out = self._repair_benign_twin_segmented(out, label_boundaries)
            # Shared permutations keep twin feature multisets identical.
            if shared_layout is not None:
                amount_perm = np.asarray(shared_layout["amount_perm"], dtype=np.int64)
                retry_perm = np.asarray(shared_layout["retry_perm"], dtype=np.int64)
                failed_perm = np.asarray(shared_layout["failed_perm"], dtype=np.int64)
            else:
                amount_perm = self.rng.permutation(n)
                retry_perm = self.rng.permutation(n)
                failed_perm = self.rng.permutation(n)
            out["amount"] = out["amount"].to_numpy(dtype=np.float32)[amount_perm]
            # txn_type / risk_score / fail_prob are dtype-normalised but not
            # permuted (identity copy).
            out["txn_type"] = out["txn_type"].to_numpy(dtype=np.int8)
            out["is_retry"] = out["is_retry"].to_numpy(dtype=np.int8)[retry_perm]
            out["failed"] = out["failed"].to_numpy(dtype=np.int8)[failed_perm]
            out["risk_score"] = out["risk_score"].to_numpy(dtype=np.float32)
            out["fail_prob"] = out["fail_prob"].to_numpy(dtype=np.float32)
            out["sender_id"] = int(sender_id)
            out["is_fraud"] = 0
            out["fraud_type"] = "none"
            out["twin_pair_id"] = int(pair_id)
            out["template_id"] = int(template_id if template_id is not None else pair_id)
            out["twin_role"] = role
            out["twin_label"] = 1 if role == "fraud" else 0
            out = out.sort_values("timestamp").reset_index(drop=True)
            if role == "benign" and fraud_reference is None:
                out = self._repair_benign_twin(out)
            if calib_mode:
                result = self._apply_twin_labels_calib(out, role=role)
                # In calib mode, fraud twin MUST have >= 1 motif-sourced positive
                if role == "fraud":
                    if int(result["is_fraud"].sum()) > 0:
                        return result
                    if attempt < max_attempts - 1:
                        continue  # retry with a fresh random permutation
                    # Exhausted retries — drop this pair (caller detects via None)
                    print(
                        f"[calib] WARNING: pair_id={pair_id} sender={sender_id} "
                        f"produced 0 motif hits after {max_attempts} attempts — dropping pair."
                    )
                    return None  # type: ignore[return-value]
                # Calib-mode benign twin must be motif-free, else drop the pair.
                if int(result["motif_hit_count"].max()) > 0:
                    return None  # type: ignore[return-value]
                return result
            else:
                result = self._apply_twin_labels_standard(out, role=role)
                if role == "benign" and int(result["motif_hit_count"].max()) > 0:
                    return None  # type: ignore[return-value]
                return result
        # Should not reach here
        return self._apply_twin_labels_standard(
            out.sort_values("timestamp").reset_index(drop=True), role=role
        )

    def _repair_benign_twin(self, user_df: pd.DataFrame) -> pd.DataFrame:
        """Greedily perturb a benign receiver order to minimize motif hits."""
        out = user_df.copy().sort_values("timestamp").reset_index(drop=True)
        receivers = out["receiver_id"].to_numpy(dtype=np.int64).copy()
        timestamps = out["timestamp"].to_numpy(dtype=np.float64)
        trace = temporal_twin_motif_trace(timestamps, receivers)
        if int(np.sum(trace["source"])) == 0:
            return out  # already motif-free
        best_receivers = receivers.copy()
        best_hits = int(np.sum(trace["source"]))
        for _ in range(_BENIGN_MOTIF_REPAIR_STEPS):
            source_positions = np.flatnonzero(trace["source"]).tolist()
            if not source_positions:
                out["receiver_id"] = receivers.astype(np.int32)
                return out
            # Attack the earliest remaining motif hit with nearby swaps.
            src_idx = int(source_positions[0])
            candidate_receivers = None
            candidate_hits = best_hits
            for swap_offset in (1, -1, 2, -2, 3, -3):
                swap_idx = src_idx + swap_offset
                if swap_idx < 0 or swap_idx >= len(receivers):
                    continue
                if receivers[swap_idx] == receivers[src_idx]:
                    continue
                trial = receivers.copy()
                trial[src_idx], trial[swap_idx] = trial[swap_idx], trial[src_idx]
                trial_hits = int(np.sum(temporal_twin_motif_trace(timestamps, trial)["source"]))
                if trial_hits < candidate_hits:
                    candidate_receivers = trial
                    candidate_hits = trial_hits
                    if trial_hits == 0:
                        break
            if candidate_receivers is None:
                break  # no improving swap found — keep the best seen
            receivers = candidate_receivers
            trace = temporal_twin_motif_trace(timestamps, receivers)
            best_receivers = receivers.copy()
            best_hits = candidate_hits
        out["receiver_id"] = best_receivers.astype(np.int32)
        return out

    def _repair_benign_twin_segmented(
        self,
        user_df: pd.DataFrame,
        label_boundaries: list[int],
    ) -> pd.DataFrame:
        """Reduce benign motif hits while preserving each matched prefix segment multiset."""
        out = user_df.copy().sort_values("timestamp").reset_index(drop=True)
        receivers = out["receiver_id"].to_numpy(dtype=np.int64).copy()
        timestamps = out["timestamp"].to_numpy(dtype=np.float64)
        n = len(receivers)
        if n == 0:
            return out
        # Close the final segment at the last event if not already bounded.
        boundaries = sorted(int(boundary) for boundary in label_boundaries if 0 <= int(boundary) < n)
        if not boundaries or boundaries[-1] != n - 1:
            boundaries.append(n - 1)
        segments: list[tuple[int, int]] = []
        start = 0
        for end in boundaries:
            segments.append((start, end))
            start = end + 1

        def segment_bounds(idx: int) -> tuple[int, int]:
            # Return the (inclusive) segment that contains idx; swaps never
            # cross a segment boundary so per-segment multisets are preserved.
            for lo, hi in segments:
                if lo <= idx <= hi:
                    return lo, hi
            return 0, n - 1

        trace = temporal_twin_motif_trace(timestamps, receivers)
        if int(np.sum(trace["source"])) == 0:
            return out
        best_receivers = receivers.copy()
        best_hits = int(np.sum(trace["source"]))
        for _ in range(_BENIGN_MOTIF_REPAIR_STEPS * 2):
            source_positions = np.flatnonzero(trace["source"]).tolist()
            if not source_positions:
                out["receiver_id"] = receivers.astype(np.int32)
                return out
            src_idx = int(source_positions[0])
            seg_lo, seg_hi = segment_bounds(src_idx)
            candidate_receivers = None
            candidate_hits = best_hits
            for swap_offset in (1, -1, 2, -2, 3, -3, 4, -4):
                swap_idx = src_idx + swap_offset
                if swap_idx < seg_lo or swap_idx > seg_hi:
                    continue
                if receivers[swap_idx] == receivers[src_idx]:
                    continue
                trial = receivers.copy()
                trial[src_idx], trial[swap_idx] = trial[swap_idx], trial[src_idx]
                trial_hits = int(np.sum(temporal_twin_motif_trace(timestamps, trial)["source"]))
                if trial_hits < candidate_hits:
                    candidate_receivers = trial
                    candidate_hits = trial_hits
                    if trial_hits == 0:
                        break
            if candidate_receivers is None:
                continue  # unlike _repair_benign_twin, keep trying other steps
            receivers = candidate_receivers
            trace = temporal_twin_motif_trace(timestamps, receivers)
            best_receivers = receivers.copy()
            best_hits = candidate_hits
        out["receiver_id"] = best_receivers.astype(np.int32)
        return out

    def _order_deltas(self, deltas: np.ndarray, role: str) -> np.ndarray:
        """Reorder inter-event gaps into the motif cadence for the given role.

        Gaps are bucketed into short/medium/long pools and consumed by a
        recipe (quiet -> accel -> revisit -> burst-release-burst); if the
        recipe cannot be completed the original sorted gaps are returned.
        """
        deltas = np.asarray(deltas, dtype=np.float64)
        if len(deltas) == 0:
            return deltas
        deltas = np.clip(deltas, 60.0, None)
        short_q = float(np.quantile(deltas, 0.55))
        long_q = float(np.quantile(deltas, 0.82))
        shorts = list(np.sort(deltas[deltas <= short_q]).astype(np.float64))
        mediums = list(np.sort(deltas[(deltas > short_q) & (deltas < long_q)]).astype(np.float64))
        longs = list(np.sort(deltas[deltas >= long_q])[::-1].astype(np.float64))

        def pop_front(pool):
            return pool.pop(0) if pool else None

        def pop_back(pool):
            return pool.pop() if pool else None

        def pop_short():
            return pop_front(shorts)

        def pop_short_fast():
            return pop_front(shorts)

        def pop_short_slow():
            return pop_back(shorts) if shorts else None

        def pop_medium():
            # Fall back across pools so the recipe rarely stalls.
            if mediums:
                return pop_front(mediums)
            if len(shorts) >= 2:
                return pop_back(shorts)
            if longs:
                return pop_back(longs)
            return None

        def pop_long():
            if longs:
                return pop_front(longs)
            if mediums:
                return pop_back(mediums)
            if shorts:
                return pop_back(shorts)
            return None

        def pop_any():
            for getter in (pop_medium, pop_long, pop_short):
                value = getter()
                if value is not None:
                    return value
            return None

        ordered: list[float] = []
        if self._is_standard_temporal_twins():
            recipe_name = self._standard_twin_profile()["delta_recipe"]
            if recipe_name == "easy":
                motif_recipe = [
                    pop_long,
                    pop_medium,
                    pop_short_slow,
                    pop_short_fast,
                    pop_long,
                    pop_short_slow,
                    pop_short_fast,
                ]
            elif recipe_name == "medium":
                motif_recipe = [
                    pop_long,
                    pop_medium,
                    pop_short_slow,
                    pop_medium,
                    pop_short_fast,
                    pop_long,
                    pop_medium,
                    pop_short_fast,
                ]
            else:
                motif_recipe = [
                    pop_long,
                    pop_medium,
                    pop_short_slow,
                    pop_medium,
                    pop_short_fast,
                    pop_long,
                    pop_medium,
                    pop_short_slow,
                    pop_short_fast,
                ]
        else:
            motif_recipe = [
                pop_long,        # quiet period
                pop_medium,      # accelerating cadence starts
                pop_short_slow,
                pop_short_fast,  # delayed revisit lands here
                pop_long,        # release
                pop_short_slow,
                pop_short_fast,  # burst-release-burst completion
            ]
        while len(ordered) < len(deltas):
            if self._is_standard_temporal_twins():
                # With prob (1 - motif_cycle_prob) emit a filler gap instead
                # of continuing the motif cycle.
                if self.rng.random() > float(self._standard_twin_profile()["motif_cycle_prob"]):
                    value = pop_any()
                    if value is None:
                        break
                    ordered.append(float(value))
                    continue
            emitted = False
            for getter in motif_recipe:
                value = getter()
                if value is None:
                    continue
                ordered.append(float(value))
                emitted = True
                if len(ordered) >= len(deltas):
                    break
            if not emitted:
                value = pop_any()
                if value is None:
                    break
                ordered.append(float(value))
        if len(ordered) != len(deltas):
            # Recipe stalled: fall back to a plain sorted ordering.
            fallback = np.sort(deltas)
            ordered = list(fallback[: len(deltas)])
        return np.asarray(ordered, dtype=np.float64)

    # NOTE(review): _order_receivers below is truncated in the recovered
    # source (cut mid-statement); reproduced verbatim, do not edit blindly.
    def _order_receivers(
        self,
        receivers: np.ndarray,
        role: str,
        timestamps: np.ndarray | None = None,
    ) -> list[int]:
        if role == "benign" and timestamps is not None:
            return self._order_receivers_benign_greedy(
                receivers=np.asarray(receivers, dtype=np.int64),
                timestamps=np.asarray(timestamps, dtype=np.float64),
            )
        counts = Counter(int(receiver_id) for receiver_id in receivers.tolist())
        ordered: list[int] = []

        def sorted_candidates(exclude: set[int] | None = None):
            exclude = exclude or set()
            return [
                receiver
                for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))
                if count > 0 and receiver not in exclude
            ]

        def pop_receiver(exclude: set[int] | None = None):
            candidates = sorted_candidates(exclude=exclude)
            if not candidates:
                return None
            receiver = int(candidates[0])
            counts[receiver] -= 1
            return receiver

        while len(ordered) < len(receivers):
            if role == "fraud":
                anchor = next(
                    (
                        receiver
                        for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))
                        if count >= 2
                    ),
                    None,
                )
                inject_block = True
                if self._is_standard_temporal_twins():
                    inject_block = self.rng.random() <= float(self._standard_twin_profile()["fraud_block_prob"])
                if inject_block and anchor is not None and len(receivers) - len(ordered) >= 8:
                    fillers = []
                    used_in_block = {int(anchor)}
                    for _ in
range(6): filler = pop_receiver(exclude=used_in_block) if filler is None: break fillers.append(filler) used_in_block.add(int(filler)) if len(fillers) == 6: counts[int(anchor)] -= 2 if self._is_standard_temporal_twins(): gap = int(self._standard_twin_profile()["receiver_gap"]) block = [int(anchor)] block.extend(fillers[: gap - 1]) block.append(int(anchor)) block.extend(fillers[gap - 1 :]) ordered.extend(block[:8]) else: ordered.extend( [ int(anchor), fillers[0], fillers[1], int(anchor), fillers[2], fillers[3], fillers[4], fillers[5], ] ) continue for filler in fillers: counts[int(filler)] += 1 if role == "benign": anchor = next( ( receiver for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0])) if count >= 2 ), None, ) if anchor is not None and len(receivers) - len(ordered) >= 8: fillers = [] used_in_block = {int(anchor)} for _ in range(6): filler = pop_receiver(exclude=used_in_block) if filler is None: break fillers.append(filler) used_in_block.add(int(filler)) if len(fillers) == 6: counts[int(anchor)] -= 2 ordered.extend( [ int(anchor), fillers[0], int(anchor), fillers[1], fillers[2], fillers[3], fillers[4], fillers[5], ] ) continue for filler in fillers: counts[int(filler)] += 1 exclude = {int(ordered[-1])} if ordered else set() chosen = pop_receiver(exclude=exclude) if chosen is not None: ordered.append(chosen) continue chosen = pop_receiver(exclude=None) if chosen is not None: ordered.append(chosen) continue return ordered[: len(receivers)] def _select_standard_twin_sources( self, trace: dict, n_events: int, ) -> list[tuple[int, bool]]: profile = self._standard_twin_profile() target_events = max( int(profile["min_events"]), min(int(profile["max_events_cap"]), max(1, n_events // int(profile["event_divisor"]))), ) min_idx = 7 source_positions = [ int(pos) for pos in np.flatnonzero(trace["source"]).tolist() if int(pos) >= min_idx ] ranked_chain = [ int(pos) for pos in np.argsort(trace["chain"])[::-1].tolist() if int(pos) >= min_idx ] 
chain_only = [pos for pos in ranked_chain if pos not in set(source_positions)] if source_positions: keep_n = int(np.ceil(len(source_positions) * float(profile["source_keep_frac"]))) keep_n = max(int(profile["min_true_sources"]), min(len(source_positions), keep_n)) else: keep_n = 0 source_pool_n = min( len(source_positions), max(keep_n, int(np.ceil(keep_n * float(profile["source_pool_factor"])))), ) source_pool = source_positions[:source_pool_n] if keep_n > 0 and len(source_pool) > keep_n: sampled_true = self.rng.choice(np.asarray(source_pool, dtype=np.int64), size=keep_n, replace=False) true_sources = sorted(int(pos) for pos in sampled_true.tolist()) else: true_sources = source_pool[:keep_n] selected: list[tuple[int, bool]] = [(pos, False) for pos in true_sources] used = {pos for pos, _ in selected} fallback_cap = int(profile["max_chain_fallback"]) chain_pool_n = min( len(chain_only), max(fallback_cap, int(np.ceil(fallback_cap * float(profile["chain_pool_factor"])))), ) chain_pool = chain_only[:chain_pool_n] if fallback_cap > 0 and len(chain_pool) > fallback_cap: sampled_chain = self.rng.choice(np.asarray(chain_pool, dtype=np.int64), size=fallback_cap, replace=False) chain_choices = sorted(int(pos) for pos in sampled_chain.tolist()) else: chain_choices = chain_pool[:fallback_cap] for pos in chain_choices: if len(selected) >= target_events: break selected.append((pos, True)) used.add(pos) if not selected: fallback_candidates = ranked_chain[:target_events] selected = [(pos, True) for pos in fallback_candidates] if len(selected) < target_events: for pos in source_positions[keep_n:]: if pos in used: continue selected.append((pos, False)) used.add(pos) if len(selected) >= target_events: break if len(selected) < target_events: for pos in ranked_chain: if pos in used: continue selected.append((pos, True)) used.add(pos) if len(selected) >= target_events: break selected.sort(key=lambda item: item[0]) return selected[:target_events] def _order_receivers_benign_greedy( self, 
receivers: np.ndarray, timestamps: np.ndarray, ) -> list[int]: """Build a benign ordering that avoids 3..8-step receiver revisits.""" counts = Counter(int(receiver_id) for receiver_id in receivers.tolist()) ordered: list[int] = [] last_pos: dict[int, int] = {} while len(ordered) < len(receivers): best_receiver = None best_key = None for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0])): if count <= 0: continue prev = last_pos.get(int(receiver)) if prev is None: revisit_penalty = 0 adjacent_bonus = 1 long_gap_bonus = 1 else: gap = len(ordered) - prev revisit_penalty = 1 if 3 <= gap <= 8 else 0 adjacent_bonus = 0 if gap <= 2 else 1 long_gap_bonus = 0 if gap > 8 else 1 key = ( revisit_penalty, adjacent_bonus, long_gap_bonus, -int(count), int(receiver), ) if best_key is None or key < best_key: best_key = key best_receiver = int(receiver) assert best_receiver is not None counts[best_receiver] -= 1 ordered.append(best_receiver) last_pos[best_receiver] = len(ordered) - 1 return ordered def _order_receivers_benign_matched( self, fraud_receivers: np.ndarray, label_boundaries: list[int], timestamps: np.ndarray, ) -> list[int]: """Match fraud prefix histograms at every label boundary while reordering within segments.""" n = len(fraud_receivers) if n == 0: return [] boundaries = sorted( int(boundary) for boundary in label_boundaries if 0 <= int(boundary) < n ) if not boundaries or boundaries[-1] != n - 1: boundaries.append(n - 1) ordered: list[int] = [] last_pos: dict[int, int] = {} start = 0 for end in boundaries: segment = fraud_receivers[start : end + 1] ordered.extend( self._order_benign_segment( segment_receivers=segment, ordered_prefix=ordered, last_pos=last_pos, full_timestamps=np.asarray(timestamps[: end + 1], dtype=np.float64), ) ) start = end + 1 return ordered def _order_benign_segment( self, segment_receivers: np.ndarray, ordered_prefix: list[int], last_pos: dict[int, int], full_timestamps: np.ndarray, ) -> list[int]: counts = 
Counter(int(receiver_id) for receiver_id in segment_receivers.tolist()) segment_out: list[int] = [] while len(segment_out) < len(segment_receivers): best_receiver = None best_key = None global_idx = len(ordered_prefix) + len(segment_out) for receiver, count in sorted(counts.items(), key=lambda item: (-item[1], item[0])): if count <= 0: continue prev = last_pos.get(int(receiver)) if prev is None: revisit_penalty = 0 seen_penalty = 0 adjacent_bonus = 1 long_gap_bonus = 1 else: gap = global_idx - prev revisit_penalty = 1 if 3 <= gap <= 8 else 0 seen_penalty = 1 adjacent_bonus = 0 if gap <= 2 else 1 long_gap_bonus = 0 if gap > 8 else 1 key = ( revisit_penalty, adjacent_bonus, long_gap_bonus, seen_penalty, -int(count), int(receiver), ) if best_key is None or key < best_key: best_key = key best_receiver = int(receiver) assert best_receiver is not None counts[best_receiver] -= 1 segment_out.append(best_receiver) last_pos[best_receiver] = global_idx return segment_out # ------------------------------------------------------------------ # Label-assignment: shared helpers # ------------------------------------------------------------------ def _attach_audit_columns( self, out: pd.DataFrame, fraud_flags: np.ndarray, trigger_idxs: list, # list of (target_idx, src_idx) tuples is_fallback: np.ndarray, trace: dict, ) -> pd.DataFrame: """Attach per-event audit columns to the twin user DataFrame.""" n = len(out) motif_hit_count = int(np.sum(trace["source"])) fraud_source_col = np.full(n, "none", dtype=object) trigger_event_idx_col = np.full(n, -1, dtype=np.int32) label_event_idx_col = np.full(n, -1, dtype=np.int32) label_delay_col = np.full(n, -1, dtype=np.int32) for target_idx, src_idx in trigger_idxs: fraud_source_col[target_idx] = "motif" if not is_fallback[target_idx] else "chain_fallback" trigger_event_idx_col[target_idx] = int(src_idx) label_event_idx_col[target_idx] = int(target_idx) label_delay_col[target_idx] = int(target_idx - src_idx) out["fraud_source"] = 
fraud_source_col out["motif_hit_count"] = motif_hit_count out["trigger_event_idx"] = trigger_event_idx_col out["label_event_idx"] = label_event_idx_col out["label_delay"] = label_delay_col out["is_fallback_label"] = is_fallback.astype(np.int8) return out # ------------------------------------------------------------------ # Standard mode: motif hits preferred, chain-rank fallback allowed # ------------------------------------------------------------------ def _apply_twin_labels_standard(self, user_df: pd.DataFrame, role: str) -> pd.DataFrame: out = user_df.copy().sort_values("timestamp").reset_index(drop=True) n = len(out) empty_audit = { "fraud_source": np.full(n, "none", dtype=object), "motif_hit_count": 0, "trigger_event_idx": np.full(n, -1, dtype=np.int32), "label_event_idx": np.full(n, -1, dtype=np.int32), "label_delay": np.full(n, -1, dtype=np.int32), "is_fallback_label": np.zeros(n, dtype=np.int8), } if n == 0: out["dynamic_fraud_state"] = np.zeros(0, dtype=np.float32) out["motif_source"] = np.zeros(0, dtype=np.int8) out["motif_chain_state"] = np.zeros(0, dtype=np.float32) out["motif_strength"] = np.zeros(0, dtype=np.float32) for col, val in empty_audit.items(): out[col] = val if isinstance(val, int) else val return out timestamps = out["timestamp"].to_numpy(dtype=np.float64) receivers = out["receiver_id"].to_numpy(dtype=np.int64) trace = temporal_twin_motif_trace(timestamps, receivers) state = trace["state"].copy() fraud_flags = np.zeros(n, dtype=np.int8) fraud_type = np.full(n, "none", dtype=object) is_fallback = np.zeros(n, dtype=np.int8) source_positions = np.flatnonzero(trace["source"]).tolist() trigger_pairs: list = [] # (target_idx, src_idx) if role == "fraud": if self._is_standard_temporal_twins(): selected_sources = self._select_standard_twin_sources(trace, n) else: max_events = max(4, min(12, n // 5)) used_fallback = False if not source_positions: ranked = np.argsort(trace["chain"])[::-1] source_positions = [int(pos) for pos in ranked if int(pos) 
>= 7][:max_events] used_fallback = True selected_sources = [(src, used_fallback) for src in source_positions[:max_events]] used_targets = set() for src, used_fallback in selected_sources: if src >= n - 1: target = src else: if self._is_standard_temporal_twins(): delay_lo, delay_hi = self._standard_twin_profile()["delay_range"] sampled_delay = int(self.rng.integers(delay_lo, delay_hi + 1)) else: sampled_delay = int(self.rng.integers(6, 17)) delay = min(sampled_delay, (n - 1) - src) target = src + max(delay, 1) if target in used_targets: continue used_targets.add(target) fraud_flags[target] = 1 fraud_type[target] = "temporal_twin" if used_fallback: is_fallback[target] = 1 trigger_pairs.append((target, src)) lo = max(0, src) hi = min(n, target + 1) ramp = np.linspace(0.15, 0.85, num=max(1, hi - lo), dtype=np.float32) state[lo:hi] += ramp out["motif_source"] = trace["source"].astype(np.int8) out["motif_chain_state"] = trace["chain"].astype(np.float32) out["motif_strength"] = trace["motif_strength"].astype(np.float32) out["dynamic_fraud_state"] = state.astype(np.float32) out["is_fraud"] = fraud_flags.astype(np.int8) out["fraud_type"] = fraud_type return self._attach_audit_columns(out, fraud_flags, trigger_pairs, is_fallback, trace) # ------------------------------------------------------------------ # Calib mode: ONLY true motif hits allowed — zero fallback # ------------------------------------------------------------------ def _apply_twin_labels_calib(self, user_df: pd.DataFrame, role: str) -> pd.DataFrame: out = user_df.copy().sort_values("timestamp").reset_index(drop=True) n = len(out) if n == 0: out["dynamic_fraud_state"] = np.zeros(0, dtype=np.float32) out["motif_source"] = np.zeros(0, dtype=np.int8) out["motif_chain_state"] = np.zeros(0, dtype=np.float32) out["motif_strength"] = np.zeros(0, dtype=np.float32) for col in ("fraud_source", "motif_hit_count", "trigger_event_idx", "label_event_idx", "label_delay", "is_fallback_label"): out[col] = 0 return out 
timestamps = out["timestamp"].to_numpy(dtype=np.float64) receivers = out["receiver_id"].to_numpy(dtype=np.int64) trace = temporal_twin_motif_trace(timestamps, receivers) state = trace["state"].copy() fraud_flags = np.zeros(n, dtype=np.int8) fraud_type = np.full(n, "none", dtype=object) is_fallback = np.zeros(n, dtype=np.int8) # always 0 in calib trigger_pairs: list = [] if role == "fraud": source_positions = np.flatnonzero(trace["source"]).tolist() # No fallback: if 0 motif sources → return with all-zero fraud flags # (caller will retry or drop the pair) if not source_positions: # Still attach trace metadata but produce no positive labels out["motif_source"] = trace["source"].astype(np.int8) out["motif_chain_state"] = trace["chain"].astype(np.float32) out["motif_strength"] = trace["motif_strength"].astype(np.float32) out["dynamic_fraud_state"] = state.astype(np.float32) out["is_fraud"] = np.zeros(n, dtype=np.int8) out["fraud_type"] = fraud_type return self._attach_audit_columns(out, fraud_flags, trigger_pairs, is_fallback, trace) max_events = max(4, min(12, n // 5)) used_targets = set() for src in source_positions[:max_events]: if src >= n - 1: target = src else: delay = min(int(self.rng.integers(6, 17)), (n - 1) - src) target = src + max(delay, 1) if target in used_targets: continue used_targets.add(target) fraud_flags[target] = 1 fraud_type[target] = "temporal_twin_calib" trigger_pairs.append((target, src)) lo = max(0, src) hi = min(n, target + 1) ramp = np.linspace(0.15, 0.85, num=max(1, hi - lo), dtype=np.float32) state[lo:hi] += ramp out["motif_source"] = trace["source"].astype(np.int8) out["motif_chain_state"] = trace["chain"].astype(np.float32) out["motif_strength"] = trace["motif_strength"].astype(np.float32) out["dynamic_fraud_state"] = state.astype(np.float32) out["is_fraud"] = fraud_flags.astype(np.int8) out["fraud_type"] = fraud_type return self._attach_audit_columns(out, fraud_flags, trigger_pairs, is_fallback, trace) def 
_finalise_temporal_twin_features(self, df: pd.DataFrame) -> pd.DataFrame: out = df.copy().sort_values("timestamp").reset_index(drop=True) n = len(out) out["amount"] = np.zeros(n, dtype=np.float32) out["risk_score"] = np.zeros(n, dtype=np.float32) out["fail_prob"] = np.zeros(n, dtype=np.float32) out["risk_noisy"] = np.zeros(n, dtype=np.float32) out["neighbor_score"] = np.zeros(n, dtype=np.float32) out["pair_freq"] = np.zeros(n, dtype=np.float32) out["txn_count_10"] = ( out.groupby("sender_id")["timestamp"] .transform(lambda x: x.rolling(10, min_periods=1).count()) .astype(np.float32) ) out["amount_sum_10"] = ( out.groupby("sender_id")["amount"] .transform(lambda x: x.rolling(10, min_periods=1).sum()) .astype(np.float32) ) out["is_fraud"] = out["is_fraud"].astype(np.int8) out["is_retry"] = out["is_retry"].astype(np.int8) out["failed"] = out["failed"].astype(np.int8) out["twin_pair_id"] = out["twin_pair_id"].astype(np.int32) out["template_id"] = out["template_id"].astype(np.int32) out["twin_label"] = out["twin_label"].astype(np.int8) out["receiver_id"] = out["receiver_id"].astype(np.int32) out["sender_id"] = out["sender_id"].astype(np.int32) if "motif_source" in out.columns: out["motif_source"] = out["motif_source"].astype(np.int8) # Audit columns: fill defaults for background users, then cast for col, default, dtype in ( ("motif_hit_count", 0, np.int32), ("trigger_event_idx", -1, np.int32), ("label_event_idx", -1, np.int32), ("label_delay", -1, np.int32), ("is_fallback_label", 0, np.int8), ): if col in out.columns: out[col] = out[col].fillna(default).astype(dtype) else: out[col] = np.full(n, default, dtype=dtype) if "fraud_source" not in out.columns: out["fraud_source"] = np.full(n, "none", dtype=object) else: out["fraud_source"] = out["fraud_source"].fillna("none") return out