File size: 5,468 Bytes
a3682cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
models/xgboost_model.py
=======================
Leakage-free XGBoost baseline trained on causal node-prefix features.

The baseline intentionally uses the real `xgboost.XGBClassifier` only.
It does not rely on multiprocessing or sklearn substitutes.
"""

from __future__ import annotations

from typing import List

import numpy as np
import pandas as pd
from xgboost import XGBClassifier

from models.base import TemporalModel

# Columns that must never reach a learned baseline.
# These carry oracle/label-derived metadata (motif provenance, label timing,
# twin-template identity, injected fraud state); letting any of them into the
# feature matrix would constitute target leakage, so feature extraction
# rejects a DataFrame that still contains one of them.
_BLOCKED_COLS = frozenset({
    "motif_hit_count", "motif_source", "trigger_event_idx", "label_event_idx",
    "label_delay", "is_fallback_label", "fraud_source",
    "twin_role", "twin_label", "twin_pair_id", "template_id",
    "dynamic_fraud_state", "motif_chain_state", "motif_strength",
})



class XGBoostWrapper(TemporalModel):
    """XGBoost baseline trained on causal, node-level prefix aggregates.

    Features are derived exclusively from a time-sorted prefix of the
    transaction log (see ``_extract_features``), so no future information
    can leak into a prediction.  When the training prefix contains only a
    single label class, the wrapper degrades to a constant-probability
    predictor instead of fitting a classifier.
    """

    def __init__(self, n_estimators: int = 200, max_depth: int = 6):
        """Store hyper-parameters; the classifier itself is built lazily.

        Args:
            n_estimators: Number of boosting rounds for ``XGBClassifier``.
            max_depth: Maximum tree depth for ``XGBClassifier``.
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self._model: XGBClassifier | None = None
        # Fallback score emitted when training labels had only one class.
        self._constant_prob: float | None = None
        self._feature_names: List[str] = []

    @property
    def name(self) -> str:
        """Display name used in experiment reports."""
        return "XGBoost"

    @property
    def is_temporal(self) -> bool:
        """Static baseline: it consumes aggregates, not an event stream."""
        return False

    @staticmethod
    def _extract_features(df: pd.DataFrame) -> pd.DataFrame:
        """Causal node-level aggregation from a sorted prefix only.

        Args:
            df: Transaction rows; must contain at least ``timestamp``,
                ``sender_id``, ``receiver_id`` and ``amount`` columns.
                ``failed``, ``is_retry`` and ``pair_freq`` are optional.

        Returns:
            One row per ``sender_id`` with count/amount/timing aggregates,
            receiver-distribution entropy, and optional fail/retry rates.
            All NaNs are replaced with 0.0.

        Raises:
            ValueError: If any oracle/label column from ``_BLOCKED_COLS``
                is present in ``df``.
        """
        leaked = _BLOCKED_COLS & set(df.columns)
        if leaked:
            # A plain `assert` would be stripped under `python -O`, silently
            # disabling the leakage guard — raise explicitly instead.
            raise ValueError(f"Oracle columns leaked into XGBoost: {leaked}")

        # Sort by time so every groupby/rolling statistic is prefix-causal.
        df = df.sort_values("timestamp").reset_index(drop=True).copy()
        # Inter-transaction gap per sender (0.0 for each sender's first row).
        df["_td"] = df.groupby("sender_id")["timestamp"].diff().fillna(0.0)
        # Per-sender rolling count over the last 10 rows (burstiness proxy).
        df["_rc10"] = (
            df.groupby("sender_id")["timestamp"]
            .transform(lambda x: x.rolling(10, min_periods=1).count())
        )

        grp = df.groupby("sender_id")
        feats = pd.DataFrame({
            "txn_count": grp["sender_id"].count(),
            "txn_cnt10_last": grp["_rc10"].last(),
            "amount_mean": grp["amount"].mean(),
            "amount_std": grp["amount"].std().fillna(0.0),
            "amount_max": grp["amount"].max(),
            "td_mean": grp["_td"].mean(),
            "td_std": grp["_td"].std().fillna(0.0),
            # Optional columns degrade to a constant 0.0 when absent.
            "fail_rate": grp["failed"].mean() if "failed" in df.columns else 0.0,
            "retry_rate": grp["is_retry"].mean() if "is_retry" in df.columns else 0.0,
        })

        # Base-2 Shannon entropy of each sender's receiver distribution;
        # the 1e-9 term guards log2(0) for degenerate probabilities.
        pair_counts = (
            df.groupby(["sender_id", "receiver_id"])
            .size()
            .reset_index(name="_n")
        )
        pair_counts["_tot"] = pair_counts.groupby("sender_id")["_n"].transform("sum")
        pair_counts["_p"] = pair_counts["_n"] / pair_counts["_tot"]
        pair_counts["_h"] = -pair_counts["_p"] * np.log2(pair_counts["_p"] + 1e-9)
        feats["recv_entropy"] = pair_counts.groupby("sender_id")["_h"].sum()

        if "pair_freq" in df.columns:
            feats["pair_freq_mean"] = grp["pair_freq"].mean()
        else:
            feats["pair_freq_mean"] = 0.0

        return feats.fillna(0.0)

    def fit(self, df_train: pd.DataFrame, num_epochs: int = 3) -> None:
        """No-op backbone step; actual supervised fit happens on a training prefix."""
        self._model = None
        self._constant_prob = None
        self._feature_names = []

    def train_node_classifier_on_prefix(
        self,
        df_prefix: pd.DataFrame,
        eval_nodes: List[int],
        y_labels: np.ndarray,
        num_epochs: int = 150,
    ) -> None:
        """Fit the node classifier on features extracted from ``df_prefix``.

        Args:
            df_prefix: Causal prefix of the transaction log.
            eval_nodes: Sender ids to build the design matrix for; nodes
                absent from the prefix receive all-zero features.
            y_labels: Binary labels aligned with ``eval_nodes``.
            num_epochs: Unused; kept for interface compatibility.
        """
        X = self._extract_features(df_prefix).reindex(eval_nodes).fillna(0.0)
        y = np.asarray(y_labels, dtype=np.int64)
        self._feature_names = list(X.columns)

        # Degenerate prefix: one label class cannot train a classifier, so
        # fall back to predicting the constant base rate.
        if len(np.unique(y)) < 2:
            self._model = None
            self._constant_prob = float(y.mean()) if len(y) else 0.0
            return

        # Up-weight positives by the class-imbalance ratio (floored at 1.0).
        scale_pos_weight = max(1.0, float((y == 0).sum()) / max(float((y == 1).sum()), 1.0))
        self._model = XGBClassifier(
            n_estimators=self.n_estimators,
            max_depth=self.max_depth,
            learning_rate=0.05,
            objective="binary:logistic",
            eval_metric="logloss",
            scale_pos_weight=scale_pos_weight,
            random_state=42,
            verbosity=0,
            n_jobs=1,
            tree_method="exact",
        )
        self._model.fit(X.values.astype(np.float32), y)
        self._constant_prob = None

        # Print top-5 feature importances for static shortcut audit
        importances = self._model.feature_importances_
        ranked = np.argsort(importances)[::-1]
        feat_names = list(X.columns)
        print("  [XGBoost] Top-5 feature importances:")
        for i in ranked[:5]:
            print(f"    {feat_names[i]:<20}: {importances[i]:.4f}")

    def predict(self, df_eval: pd.DataFrame, eval_nodes: List[int]) -> np.ndarray:
        """Return P(fraud) for each node in ``eval_nodes``.

        Args:
            df_eval: Causal prefix to extract evaluation features from.
            eval_nodes: Sender ids to score, in output order.

        Returns:
            float32 array of probabilities, one per entry of ``eval_nodes``.

        Raises:
            RuntimeError: If called before ``train_node_classifier_on_prefix``.
        """
        X_eval = self._extract_features(df_eval).reindex(eval_nodes).fillna(0.0)
        if self._constant_prob is not None:
            return np.full(len(eval_nodes), self._constant_prob, dtype=np.float32)
        if self._model is None:
            # Explicit raise instead of `assert`, which vanishes under -O.
            raise RuntimeError("Call train_node_classifier_on_prefix() first.")
        probs = self._model.predict_proba(X_eval.values.astype(np.float32))[:, 1]
        return np.asarray(probs, dtype=np.float32)

    def reset_memory(self) -> None:
        """No-op: XGBoost has no temporal memory."""
        pass