"""
models/xgboost_model.py
=======================
Leakage-free XGBoost baseline trained on causal node-prefix features.
The baseline intentionally uses the real `xgboost.XGBClassifier` only.
It does not rely on multiprocessing or sklearn substitutes.
"""
from __future__ import annotations
from typing import List
import numpy as np
import pandas as pd
from xgboost import XGBClassifier
from models.base import TemporalModel
# Columns that must never reach a learned baseline
_BLOCKED_COLS = frozenset({
"motif_hit_count", "motif_source", "trigger_event_idx", "label_event_idx",
"label_delay", "is_fallback_label", "fraud_source",
"twin_role", "twin_label", "twin_pair_id", "template_id",
"dynamic_fraud_state", "motif_chain_state", "motif_strength",
})
class XGBoostWrapper(TemporalModel):
    """XGBoost baseline with node-level prefix aggregates.

    All features are derived causally from a time-sorted transaction
    prefix, and oracle/label columns are rejected outright via
    ``_BLOCKED_COLS`` before any aggregation happens.
    """

    def __init__(self, n_estimators: int = 200, max_depth: int = 6):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self._model: XGBClassifier | None = None
        # Fallback probability used when the training labels contain a
        # single class and a classifier cannot be fit.
        self._constant_prob: float | None = None
        # Training-time feature column order, used to align eval features.
        self._feature_names: List[str] = []

    @property
    def name(self) -> str:
        return "XGBoost"

    @property
    def is_temporal(self) -> bool:
        return False

    @staticmethod
    def _extract_features(df: pd.DataFrame) -> pd.DataFrame:
        """Causal node-level aggregation from a sorted prefix only.

        Returns one row per ``sender_id`` with volume, amount, tempo,
        and receiver-entropy features. Raises ``AssertionError`` if any
        oracle column from ``_BLOCKED_COLS`` is present in ``df``.
        """
        leaked = _BLOCKED_COLS & set(df.columns)
        if leaked:
            # Explicit raise (not a bare `assert`) so the leakage guard
            # cannot be stripped by `python -O`.
            raise AssertionError(f"Oracle columns leaked into XGBoost: {leaked}")
        df = df.sort_values("timestamp").reset_index(drop=True).copy()
        # Inter-transaction time gap per sender; 0.0 for each sender's
        # first transaction.
        df["_td"] = df.groupby("sender_id")["timestamp"].diff().fillna(0.0)
        # Rolling count over each sender's last 10 transactions.
        df["_rc10"] = (
            df.groupby("sender_id")["timestamp"]
            .transform(lambda x: x.rolling(10, min_periods=1).count())
        )
        grp = df.groupby("sender_id")
        feats = pd.DataFrame({
            "txn_count": grp["sender_id"].count(),
            "txn_cnt10_last": grp["_rc10"].last(),
            "amount_mean": grp["amount"].mean(),
            "amount_std": grp["amount"].std().fillna(0.0),
            "amount_max": grp["amount"].max(),
            "td_mean": grp["_td"].mean(),
            "td_std": grp["_td"].std().fillna(0.0),
            # Optional columns: a scalar 0.0 broadcasts when absent.
            "fail_rate": grp["failed"].mean() if "failed" in df.columns else 0.0,
            "retry_rate": grp["is_retry"].mean() if "is_retry" in df.columns else 0.0,
        })
        # Shannon entropy of each sender's receiver distribution; the
        # 1e-9 guards log2(0) for vanishing probabilities.
        pair_counts = (
            df.groupby(["sender_id", "receiver_id"])
            .size()
            .reset_index(name="_n")
        )
        pair_counts["_tot"] = pair_counts.groupby("sender_id")["_n"].transform("sum")
        pair_counts["_p"] = pair_counts["_n"] / pair_counts["_tot"]
        pair_counts["_h"] = -pair_counts["_p"] * np.log2(pair_counts["_p"] + 1e-9)
        feats["recv_entropy"] = pair_counts.groupby("sender_id")["_h"].sum()
        if "pair_freq" in df.columns:
            feats["pair_freq_mean"] = grp["pair_freq"].mean()
        else:
            feats["pair_freq_mean"] = 0.0
        # fillna covers NaNs introduced by the recv_entropy join for
        # senders with no pair rows.
        return feats.fillna(0.0)

    def fit(self, df_train: pd.DataFrame, num_epochs: int = 3) -> None:
        """No-op backbone step; actual supervised fit happens on a training prefix."""
        self._model = None
        self._constant_prob = None
        self._feature_names = []

    def train_node_classifier_on_prefix(
        self,
        df_prefix: pd.DataFrame,
        eval_nodes: List[int],
        y_labels: np.ndarray,
        num_epochs: int = 150,
    ) -> None:
        """Fit the node classifier on features extracted from `df_prefix`.

        Nodes in `eval_nodes` absent from the prefix receive all-zero
        features. Falls back to a constant probability when `y_labels`
        contains a single class.
        """
        X = self._extract_features(df_prefix).reindex(eval_nodes).fillna(0.0)
        y = np.asarray(y_labels, dtype=np.int64)
        self._feature_names = list(X.columns)
        if len(np.unique(y)) < 2:
            # Degenerate label set: remember the base rate instead of fitting.
            self._model = None
            self._constant_prob = float(y.mean()) if len(y) else 0.0
            return
        # Re-balance classes; clamp at 1.0 so a positive-heavy split
        # never down-weights positives.
        scale_pos_weight = max(1.0, float((y == 0).sum()) / max(float((y == 1).sum()), 1.0))
        self._model = XGBClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            learning_rate=0.05,
            objective="binary:logistic",
            eval_metric="logloss",
            scale_pos_weight=scale_pos_weight,
            random_state=42,
            verbosity=0,
            n_jobs=1,
            tree_method="exact",
        )
        self._model.fit(X.values.astype(np.float32), y)
        self._constant_prob = None
        # Print top-5 feature importances for static shortcut audit
        importances = self._model.feature_importances_
        ranked = np.argsort(importances)[::-1]
        feat_names = list(X.columns)
        print("  [XGBoost] Top-5 feature importances:")
        for i in ranked[:5]:
            print(f"    {feat_names[i]:<20}: {importances[i]:.4f}")

    def predict(self, df_eval: pd.DataFrame, eval_nodes: List[int]) -> np.ndarray:
        """Return float32 fraud probabilities for `eval_nodes` from `df_eval`."""
        X_eval = self._extract_features(df_eval).reindex(eval_nodes).fillna(0.0)
        if self._constant_prob is not None:
            # Single-class training set: emit the memorized base rate.
            return np.full(len(eval_nodes), self._constant_prob, dtype=np.float32)
        if self._model is None:
            # Explicit raise (not a bare `assert`) so the guard survives -O.
            raise AssertionError("Call train_node_classifier_on_prefix() first.")
        if self._feature_names:
            # Defensive: pin eval columns to the training-time order.
            X_eval = X_eval.reindex(columns=self._feature_names, fill_value=0.0)
        probs = self._model.predict_proba(X_eval.values.astype(np.float32))[:, 1]
        return np.asarray(probs, dtype=np.float32)

    def reset_memory(self) -> None:
        """No-op: XGBoost has no temporal memory."""
        pass
|