"""
models/xgboost_model.py
=======================
Leakage-free XGBoost baseline trained on causal node-prefix features.

The baseline intentionally uses the real `xgboost.XGBClassifier` only.
It does not rely on multiprocessing or sklearn substitutes.
"""

from __future__ import annotations

from typing import List

import numpy as np
import pandas as pd
from xgboost import XGBClassifier

from models.base import TemporalModel

# Columns that must never reach a learned baseline
_BLOCKED_COLS = frozenset({
    "motif_hit_count",
    "motif_source",
    "trigger_event_idx",
    "label_event_idx",
    "label_delay",
    "is_fallback_label",
    "fraud_source",
    "twin_role",
    "twin_label",
    "twin_pair_id",
    "template_id",
    "dynamic_fraud_state",
    "motif_chain_state",
    "motif_strength",
})


class XGBoostWrapper(TemporalModel):
    """XGBoost baseline with node-level prefix aggregates.

    Events are aggregated per ``sender_id`` into a fixed set of summary
    features (see :meth:`_extract_features`); an ``XGBClassifier`` is then
    fitted on the rows selected by ``eval_nodes``.

    Attributes set by this class:
        n_estimators: Number of boosting rounds passed to ``XGBClassifier``.
        max_depth: Maximum tree depth passed to ``XGBClassifier``.
        _model: The fitted classifier, or ``None`` when nothing has been
            fitted or the training labels held fewer than two classes.
        _constant_prob: Probability returned for every node when the
            training labels held fewer than two classes; otherwise ``None``.
        _feature_names: Feature column names seen at the last training call.
    """

    def __init__(self, n_estimators: int = 200, max_depth: int = 6):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self._model: XGBClassifier | None = None
        self._constant_prob: float | None = None
        self._feature_names: List[str] = []

    @property
    def name(self) -> str:
        """Display name of this baseline."""
        return "XGBoost"

    @property
    def is_temporal(self) -> bool:
        """Always ``False`` for this baseline."""
        return False

    @staticmethod
    def _extract_features(df: pd.DataFrame) -> pd.DataFrame:
        """Causal node-level aggregation from a sorted prefix only.

        Every row of ``df`` is aggregated, so the features are only causal
        if the caller passes nothing but the prefix it is allowed to see.

        Args:
            df: Event frame. Must contain ``timestamp``, ``sender_id``,
                ``receiver_id`` and ``amount``. ``failed``, ``is_retry`` and
                ``pair_freq`` are used when present and replaced by ``0.0``
                otherwise. The frame is not modified.

        Returns:
            A frame indexed by ``sender_id`` with the columns ``txn_count``,
            ``txn_cnt10_last``, ``amount_mean``, ``amount_std``,
            ``amount_max``, ``td_mean``, ``td_std``, ``fail_rate``,
            ``retry_rate``, ``recv_entropy`` and ``pair_freq_mean``; missing
            values are filled with ``0.0``.

        Raises:
            AssertionError: If ``df`` contains any column listed in
                ``_BLOCKED_COLS``.
        """
        leaked = _BLOCKED_COLS & set(df.columns)
        if leaked:
            # Raised explicitly rather than via `assert` so the leakage guard
            # is not stripped under `python -O`; the exception type and
            # message are unchanged for existing callers.
            raise AssertionError(f"Oracle columns leaked into XGBoost: {leaked}")

        df = df.sort_values("timestamp").reset_index(drop=True).copy()

        # Gap to the sender's previous event; a sender's first event gets 0.
        df["_td"] = df.groupby("sender_id")["timestamp"].diff().fillna(0.0)
        # Number of non-null timestamps within the sender's last 10 rows.
        df["_rc10"] = (
            df.groupby("sender_id")["timestamp"]
            .transform(lambda x: x.rolling(10, min_periods=1).count())
        )

        grp = df.groupby("sender_id")
        feats = pd.DataFrame({
            "txn_count": grp["sender_id"].count(),
            "txn_cnt10_last": grp["_rc10"].last(),
            "amount_mean": grp["amount"].mean(),
            # std is NaN for single-event senders; treat that as no spread.
            "amount_std": grp["amount"].std().fillna(0.0),
            "amount_max": grp["amount"].max(),
            "td_mean": grp["_td"].mean(),
            "td_std": grp["_td"].std().fillna(0.0),
            "fail_rate": grp["failed"].mean() if "failed" in df.columns else 0.0,
            "retry_rate": grp["is_retry"].mean() if "is_retry" in df.columns else 0.0,
        })

        # Entropy (in bits) of each sender's distribution over receivers.
        pair_counts = (
            df.groupby(["sender_id", "receiver_id"])
            .size()
            .reset_index(name="_n")
        )
        pair_counts["_tot"] = pair_counts.groupby("sender_id")["_n"].transform("sum")
        pair_counts["_p"] = pair_counts["_n"] / pair_counts["_tot"]
        # The small epsilon keeps log2 finite should a share ever be zero.
        pair_counts["_h"] = -pair_counts["_p"] * np.log2(pair_counts["_p"] + 1e-9)
        feats["recv_entropy"] = pair_counts.groupby("sender_id")["_h"].sum()

        if "pair_freq" in df.columns:
            feats["pair_freq_mean"] = grp["pair_freq"].mean()
        else:
            feats["pair_freq_mean"] = 0.0

        return feats.fillna(0.0)

    def fit(self, df_train: pd.DataFrame, num_epochs: int = 3) -> None:
        """No-op backbone step; actual supervised fit happens on a training prefix.

        Only clears any previously fitted state. ``df_train`` and
        ``num_epochs`` are not used here.
        """
        self._model = None
        self._constant_prob = None
        self._feature_names = []

    def train_node_classifier_on_prefix(
        self,
        df_prefix: pd.DataFrame,
        eval_nodes: List[int],
        y_labels: np.ndarray,
        num_epochs: int = 150,
    ) -> None:
        """Fit the classifier on per-node features computed from ``df_prefix``.

        Args:
            df_prefix: Event frame to aggregate; see
                :meth:`_extract_features` for the required columns.
            eval_nodes: Node ids (matched against ``sender_id``) selecting
                the training rows, in order. Nodes that never appear as a
                sender in ``df_prefix`` get an all-zero feature row.
            y_labels: Binary labels, one per entry of ``eval_nodes``.
            num_epochs: Not used by this method.

        Raises:
            AssertionError: If ``df_prefix`` contains a blocked column.
        """
        X = self._extract_features(df_prefix).reindex(eval_nodes).fillna(0.0)
        y = np.asarray(y_labels, dtype=np.int64)
        self._feature_names = list(X.columns)

        # A classifier cannot be fitted on fewer than two classes; remember
        # the label mean (0.0 for empty labels) and return it at predict time.
        if len(np.unique(y)) < 2:
            self._model = None
            self._constant_prob = float(y.mean()) if len(y) else 0.0
            return

        # Negative/positive ratio, floored at 1 so positives are never
        # down-weighted.
        scale_pos_weight = max(
            1.0,
            float((y == 0).sum()) / max(float((y == 1).sum()), 1.0),
        )
        self._model = XGBClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            learning_rate=0.05,
            objective="binary:logistic",
            eval_metric="logloss",
            scale_pos_weight=scale_pos_weight,
            random_state=42,
            verbosity=0,
            n_jobs=1,
            tree_method="exact",
        )
        self._model.fit(X.values.astype(np.float32), y)
        self._constant_prob = None

        # Print top-5 feature importances for static shortcut audit
        importances = self._model.feature_importances_
        ranked = np.argsort(importances)[::-1]
        feat_names = list(X.columns)
        print(" [XGBoost] Top-5 feature importances:")
        for i in ranked[:5]:
            print(f" {feat_names[i]:<20}: {importances[i]:.4f}")

    def predict(self, df_eval: pd.DataFrame, eval_nodes: List[int]) -> np.ndarray:
        """Return the positive-class probability for each node in ``eval_nodes``.

        Args:
            df_eval: Event frame to aggregate; see
                :meth:`_extract_features` for the required columns.
            eval_nodes: Node ids (matched against ``sender_id``) to score, in
                order. Nodes absent from ``df_eval`` get an all-zero feature
                row.

        Returns:
            A ``float32`` array with one probability per entry of
            ``eval_nodes``.

        Raises:
            AssertionError: If ``df_eval`` contains a blocked column, or if
                no training call has been made since construction / ``fit``.
        """
        # Features are extracted before the constant shortcut so the leakage
        # guard also runs when a constant probability is returned.
        X_eval = self._extract_features(df_eval).reindex(eval_nodes).fillna(0.0)

        if self._constant_prob is not None:
            return np.full(len(eval_nodes), self._constant_prob, dtype=np.float32)

        if self._model is None:
            # Explicit raise instead of `assert` so the check survives
            # `python -O`; exception type and message are unchanged.
            raise AssertionError("Call train_node_classifier_on_prefix() first.")
        probs = self._model.predict_proba(X_eval.values.astype(np.float32))[:, 1]
        return np.asarray(probs, dtype=np.float32)

    def reset_memory(self) -> None:
        """No-op: XGBoost has no temporal memory."""
        pass