File size: 4,002 Bytes
a3682cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""
models/base.py
==============
Abstract base class for all temporal fraud models.

All models MUST:
  - Accept a raw DataFrame event stream (sorted by timestamp)
  - Maintain internal memory (or not, for static models)
  - Return node-level fraud probabilities for a specified set of eval_nodes
  - Support reset_memory() for temporal ablation experiments
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import List

import numpy as np
import pandas as pd


class TemporalModel(ABC):
    """
    Unified interface for all temporal and static fraud detection models.

    Concrete subclasses must implement ``name``, ``fit``, ``predict`` and
    ``reset_memory``; the class cannot be instantiated directly.

    Data contract
    -------------
    df_train / df_eval must contain at minimum:
        sender_id    int   — source node
        receiver_id  int   — destination node
        timestamp    float — unix seconds, sorted ascending
        is_fraud     int   — edge-level binary label (0/1)
        dynamic_fraud_state  float — hidden EMA state (available for mechanistic analysis but
                                     MUST NOT be used as a feature)

    All models receive the complete DataFrame so they can build any internal
    features they need. Models are responsible for respecting the data leakage
    constraint (no dynamic_fraud_state in features).
    """

    # ------------------------------------------------------------------ #
    # Abstract interface                                                   #
    # ------------------------------------------------------------------ #

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable model identifier used in CSV/plot outputs."""

    @abstractmethod
    def fit(self, df_train: pd.DataFrame, num_epochs: int = 3) -> None:
        """
        Train on chronologically ordered event stream.

        Parameters
        ----------
        df_train : pd.DataFrame
            All events available for training (sorted by timestamp).
        num_epochs : int
            Number of passes over the training data.

        Returns
        -------
        None
        """

    @abstractmethod
    def predict(self, df_eval: pd.DataFrame, eval_nodes: List[int]) -> np.ndarray:
        """
        Return fraud probability scores for eval_nodes.

        The model may perform a warm-up memory pass over df_eval events
        (reading timestamps/IDs only — NOT fraud labels) before scoring.

        Parameters
        ----------
        df_eval : pd.DataFrame
            Events in the evaluation window.
        eval_nodes : List[int]
            Sender IDs of nodes to score, in order.

        Returns
        -------
        probs : np.ndarray, shape (len(eval_nodes),), dtype float32
            Fraud probability in [0, 1] for each node, in the same order
            as ``eval_nodes``.
        """

    @abstractmethod
    def reset_memory(self) -> None:
        """
        Zero out all internal memory / hidden states.

        Used in the temporal ablation experiment to measure how much
        the model relies on accumulated temporal history vs. static structure.
        For static models (XGBoost, StaticGNN) this is a no-op.

        Returns
        -------
        None
        """

    # ------------------------------------------------------------------ #
    # Optional properties                                                  #
    # ------------------------------------------------------------------ #

    @property
    def is_temporal(self) -> bool:
        """
        True for models that maintain temporal memory across events.

        The base implementation always returns True.
        NOTE(review): static models are presumably expected to override this
        to return False — confirm against the concrete subclasses.
        """
        return True

    # ------------------------------------------------------------------ #
    # Shared helpers                                                       #
    # ------------------------------------------------------------------ #

    @staticmethod
    def _safe_auc(y_true: np.ndarray, y_score: np.ndarray) -> float:
        """
        ROC-AUC that returns 0.5 when only one class is present.

        Parameters
        ----------
        y_true : np.ndarray
            Ground-truth labels.
        y_score : np.ndarray
            Predicted scores, aligned with ``y_true``.

        Returns
        -------
        auc : float
            0.5 when ``y_true`` holds fewer than two distinct values
            (this includes empty input); otherwise the value computed by
            ``sklearn.metrics.roc_auc_score``.
        """
        # ROC-AUC is undefined with fewer than two classes. Decide that
        # first so the degenerate path neither imports nor requires
        # scikit-learn.
        if np.unique(y_true).size < 2:
            return 0.5

        # Lazy import: scikit-learn is only needed when a score is computed.
        from sklearn.metrics import roc_auc_score

        return float(roc_auc_score(y_true, y_score))