| """ |
| models/xgboost_model.py |
| ======================= |
| Leakage-free XGBoost baseline trained on causal node-prefix features. |
| |
| The baseline intentionally uses the real `xgboost.XGBClassifier` only. |
| It does not rely on multiprocessing or sklearn substitutes. |
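
Typical usage (a minimal sketch; `df_train`, `df_prefix`, `df_eval`,
`eval_nodes`, and `y_labels` are placeholders supplied by the evaluation
harness)::

    model = XGBoostWrapper()
    model.fit(df_train)  # no-op backbone step
    model.train_node_classifier_on_prefix(df_prefix, eval_nodes, y_labels)
    probs = model.predict(df_eval, eval_nodes)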
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import List |
|
|
| import numpy as np |
| import pandas as pd |
| from xgboost import XGBClassifier |
|
|
| from models.base import TemporalModel |
|
|


# Oracle/label-derived columns; if any of these reach the feature extractor,
# ground truth is leaking into the model, so extraction fails fast.
_BLOCKED_COLS = frozenset({
    "motif_hit_count", "motif_source", "trigger_event_idx", "label_event_idx",
    "label_delay", "is_fallback_label", "fraud_source",
    "twin_role", "twin_label", "twin_pair_id", "template_id",
    "dynamic_fraud_state", "motif_chain_state", "motif_strength",
})


class XGBoostWrapper(TemporalModel):
    """XGBoost baseline with node-level prefix aggregates."""

    def __init__(self, n_estimators: int = 200, max_depth: int = 6):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self._model: XGBClassifier | None = None
        self._constant_prob: float | None = None
        self._feature_names: List[str] = []

    @property
    def name(self) -> str:
        return "XGBoost"

    @property
    def is_temporal(self) -> bool:
        return False

    @staticmethod
    def _extract_features(df: pd.DataFrame) -> pd.DataFrame:
        """Causal node-level aggregation from a sorted prefix only."""
        leaked = _BLOCKED_COLS & set(df.columns)
        assert not leaked, f"Oracle columns leaked into XGBoost: {leaked}"

        # Sort once so every per-sender aggregate sees events in causal order.
        df = df.sort_values("timestamp").reset_index(drop=True).copy()
        # Inter-arrival time per sender; a sender's first event gets 0.
        df["_td"] = df.groupby("sender_id")["timestamp"].diff().fillna(0.0)
        # Event count within each sender's trailing 10-event window.
        df["_rc10"] = (
            df.groupby("sender_id")["timestamp"]
            .transform(lambda x: x.rolling(10, min_periods=1).count())
        )

        grp = df.groupby("sender_id")
        feats = pd.DataFrame({
            "txn_count": grp["sender_id"].count(),
            "txn_cnt10_last": grp["_rc10"].last(),
            "amount_mean": grp["amount"].mean(),
            "amount_std": grp["amount"].std().fillna(0.0),
            "amount_max": grp["amount"].max(),
            "td_mean": grp["_td"].mean(),
            "td_std": grp["_td"].std().fillna(0.0),
            "fail_rate": grp["failed"].mean() if "failed" in df.columns else 0.0,
            "retry_rate": grp["is_retry"].mean() if "is_retry" in df.columns else 0.0,
        })

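        # Shannon entropy of each sender's receiver distribution: low entropy
        # means payments concentrate on few counterparties, high entropy means
        # they spread out. Illustration (made-up numbers): a sender splitting
        # 3/1 over two receivers has p = (0.75, 0.25) and H ≈ 0.81 bits.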
        pair_counts = (
            df.groupby(["sender_id", "receiver_id"])
            .size()
            .reset_index(name="_n")
        )
        pair_counts["_tot"] = pair_counts.groupby("sender_id")["_n"].transform("sum")
        pair_counts["_p"] = pair_counts["_n"] / pair_counts["_tot"]
        pair_counts["_h"] = -pair_counts["_p"] * np.log2(pair_counts["_p"] + 1e-9)
        feats["recv_entropy"] = pair_counts.groupby("sender_id")["_h"].sum()

        if "pair_freq" in df.columns:
            feats["pair_freq_mean"] = grp["pair_freq"].mean()
        else:
            feats["pair_freq_mean"] = 0.0

        return feats.fillna(0.0)

    def fit(self, df_train: pd.DataFrame, num_epochs: int = 3) -> None:
        """No-op backbone step; actual supervised fit happens on a training prefix."""
        self._model = None
        self._constant_prob = None
        self._feature_names = []

    def train_node_classifier_on_prefix(
        self,
        df_prefix: pd.DataFrame,
        eval_nodes: List[int],
        y_labels: np.ndarray,
        num_epochs: int = 150,
    ) -> None:
        # Align features to the evaluation nodes; nodes absent from the
        # prefix get all-zero feature rows.
        X = self._extract_features(df_prefix).reindex(eval_nodes).fillna(0.0)
        y = np.asarray(y_labels, dtype=np.int64)
        self._feature_names = list(X.columns)

        # Single-class labels: XGBoost cannot fit a binary objective, so fall
        # back to predicting the constant base rate.
        if len(np.unique(y)) < 2:
            self._model = None
            self._constant_prob = float(y.mean()) if len(y) else 0.0
            return

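        # Rebalance the positive class: ratio of negatives to positives,
        # floored at 1.0. E.g. 990 negatives / 10 positives -> 99.0.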
        scale_pos_weight = max(1.0, float((y == 0).sum()) / max(float((y == 1).sum()), 1.0))
        self._model = XGBClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            learning_rate=0.05,
            objective="binary:logistic",
            eval_metric="logloss",
            scale_pos_weight=scale_pos_weight,
            random_state=42,
            verbosity=0,
            n_jobs=1,
            tree_method="exact",
        )
        self._model.fit(X.values.astype(np.float32), y)
        self._constant_prob = None

        # Log the strongest signals as a quick sanity check.
        importances = self._model.feature_importances_
        ranked = np.argsort(importances)[::-1]
        print(" [XGBoost] Top-5 feature importances:")
        for i in ranked[:5]:
            print(f" {self._feature_names[i]:<20}: {importances[i]:.4f}")

    def predict(self, df_eval: pd.DataFrame, eval_nodes: List[int]) -> np.ndarray:
        X_eval = self._extract_features(df_eval).reindex(eval_nodes).fillna(0.0)
        if self._constant_prob is not None:
            return np.full(len(eval_nodes), self._constant_prob, dtype=np.float32)
        assert self._model is not None, "Call train_node_classifier_on_prefix() first."
        probs = self._model.predict_proba(X_eval.values.astype(np.float32))[:, 1]
        return np.asarray(probs, dtype=np.float32)

    def reset_memory(self) -> None:
        """No-op: XGBoost has no temporal memory."""
        pass
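

# Minimal smoke test (a sketch, not part of the training pipeline): builds a
# tiny synthetic event frame with the columns `_extract_features` expects and
# runs one train/predict round trip. All values below are made up.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    n = 200
    demo = pd.DataFrame({
        "timestamp": np.sort(rng.uniform(0.0, 1_000.0, size=n)),
        "sender_id": rng.integers(0, 10, size=n),
        "receiver_id": rng.integers(0, 20, size=n),
        "amount": rng.lognormal(3.0, 1.0, size=n),
    })
    nodes = sorted(demo["sender_id"].unique().tolist())
    labels = rng.integers(0, 2, size=len(nodes))

    model = XGBoostWrapper(n_estimators=20, max_depth=3)
    model.train_node_classifier_on_prefix(demo, nodes, labels)
    print(model.predict(demo, nodes))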