| from __future__ import annotations |
|
|
| from typing import List |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.linear_model import LogisticRegression |
|
|
| from models.base import TemporalModel |
| from src.fraud.fraud_engine import temporal_twin_motif_trace |
|
|
|
|
| def _motif_features_for_user(user_df: pd.DataFrame) -> dict: |
| user_df = user_df.sort_values("timestamp").reset_index(drop=True) |
| n = len(user_df) |
| if n == 0: |
| return { |
| "chain_last": 0.0, |
| "chain_max": 0.0, |
| "motif_last": 0.0, |
| "motif_mean_last8": 0.0, |
| "source_count": 0.0, |
| "source_recent8": 0.0, |
| "source_recent16": 0.0, |
| "source_recent24": 0.0, |
| "last_source_age": 999.0, |
| "quiet_sum": 0.0, |
| "accel_sum": 0.0, |
| "revisit_sum": 0.0, |
| "burst_release_burst": 0.0, |
| "revisit_recent8": 0.0, |
| "brb_recent8": 0.0, |
| "txn_count": 0.0, |
| } |
|
|
| timestamps = user_df["timestamp"].to_numpy(dtype=np.float64) |
| receivers = user_df["receiver_id"].to_numpy(dtype=np.int64) |
| trace = temporal_twin_motif_trace(timestamps, receivers) |
| chain_vals = trace["chain"].tolist() |
| motif_vals = trace["motif_strength"].tolist() |
| source_positions = np.flatnonzero(trace["source"]).tolist() |
| last8 = motif_vals[-8:] if motif_vals else [0.0] |
| recent8_cutoff = max(0, n - 8) |
| recent16_cutoff = max(0, n - 16) |
| recent24_cutoff = max(0, n - 24) |
| last_source_age = float(n - 1 - source_positions[-1]) if source_positions else float(n + 1) |
| return { |
| "chain_last": float(chain_vals[-1]) if chain_vals else 0.0, |
| "chain_max": float(max(chain_vals)) if chain_vals else 0.0, |
| "motif_last": float(motif_vals[-1]) if motif_vals else 0.0, |
| "motif_mean_last8": float(np.mean(last8)), |
| "source_count": float(len(source_positions)), |
| "source_recent8": float(sum(pos >= recent8_cutoff for pos in source_positions)), |
| "source_recent16": float(sum(pos >= recent16_cutoff for pos in source_positions)), |
| "source_recent24": float(sum(pos >= recent24_cutoff for pos in source_positions)), |
| "last_source_age": last_source_age, |
| "quiet_sum": float(np.sum(trace["quiet"])), |
| "accel_sum": float(np.sum(trace["accel"])), |
| "revisit_sum": float(np.sum(trace["revisit"])), |
| "burst_release_burst": float(np.sum(trace["burst_release_burst"])), |
| "revisit_recent8": float(np.sum(trace["revisit"][recent8_cutoff:])), |
| "brb_recent8": float(np.sum(trace["burst_release_burst"][recent8_cutoff:])), |
| "txn_count": float(n), |
| } |
|
|
|
|
class OracleMotifWrapper(TemporalModel):
    """Logistic-regression node classifier over per-sender motif-trace features.

    Each sender in the supplied transaction frame is summarised by
    ``_motif_features_for_user``. Features are standardised with statistics
    from the training rows and scored by a class-balanced
    ``LogisticRegression``. If the training labels contain fewer than two
    classes, the wrapper predicts a constant probability instead.
    """

    def __init__(self):
        self._model: LogisticRegression | None = None
        # Set instead of ``_model`` when training saw fewer than two classes.
        self._constant_prob: float | None = None
        self._feature_cols: list[str] = []
        # Standardisation statistics of the training features, shape (1, F).
        self._mean: np.ndarray | None = None
        self._std: np.ndarray | None = None

    @property
    def name(self) -> str:
        return "OracleMotif"

    @property
    def is_temporal(self) -> bool:
        return True

    def fit(self, df_train: pd.DataFrame, num_epochs: int = 3) -> None:
        """Reset all learned state. ``df_train`` and ``num_epochs`` are unused.

        Training happens in ``train_node_classifier_on_prefix``.
        """
        self._model = None
        self._constant_prob = None
        self._feature_cols = []
        self._mean = None
        self._std = None

    @staticmethod
    def _empty_features() -> dict:
        """Return the features ``_motif_features_for_user`` gives an empty history."""
        return _motif_features_for_user(
            pd.DataFrame(columns=["timestamp", "receiver_id"])
        )

    @staticmethod
    def _extract_features(df: pd.DataFrame) -> pd.DataFrame:
        """Build one feature row per ``sender_id`` in ``df``, indexed by sender id.

        An empty ``df`` yields an empty frame with the same feature columns and
        index name as the non-empty case, so reindexing still works downstream.
        """
        rows = []
        for sender_id, group in df.groupby("sender_id", sort=False):
            feats = _motif_features_for_user(group)
            feats["sender_id"] = int(sender_id)
            rows.append(feats)
        if not rows:
            return pd.DataFrame(
                columns=list(OracleMotifWrapper._empty_features()),
                index=pd.Index([], name="sender_id"),
                dtype=np.float64,
            )
        return pd.DataFrame(rows).set_index("sender_id").sort_index()

    @classmethod
    def _node_features(cls, df: pd.DataFrame, eval_nodes: List[int]) -> pd.DataFrame:
        """Return the feature matrix for ``eval_nodes``: one row each, in order.

        Nodes with no rows in ``df`` (and any other missing value) get the
        empty-history defaults rather than 0.0. For example, ``last_source_age``
        no longer claims a source at the latest transaction for a node with no
        history. Columns always come in the canonical feature order.
        """
        defaults = cls._empty_features()
        features = cls._extract_features(df).reindex(
            index=eval_nodes, columns=list(defaults)
        )
        return features.fillna(value=defaults).astype(np.float64)

    def train_node_classifier_on_prefix(
        self,
        df_prefix: pd.DataFrame,
        eval_nodes: List[int],
        y_labels: np.ndarray,
        num_epochs: int = 150,
    ) -> None:
        """Fit the classifier on ``df_prefix`` features of ``eval_nodes``.

        Args:
            df_prefix: Transactions visible at training time. Must contain
                ``sender_id``, ``timestamp`` and ``receiver_id`` columns.
            eval_nodes: Sender ids to train on, aligned with ``y_labels``.
            y_labels: Integer labels, one per entry of ``eval_nodes``.
            num_epochs: Unused; accepted for interface compatibility.

        Raises:
            ValueError: If ``y_labels`` and ``eval_nodes`` differ in length.
        """
        y = np.asarray(y_labels, dtype=np.int64)
        if len(y) != len(eval_nodes):
            raise ValueError(
                f"y_labels has {len(y)} entries but eval_nodes has {len(eval_nodes)}"
            )
        X = self._node_features(df_prefix, eval_nodes)
        self._feature_cols = list(X.columns)

        if len(y) == 0 or np.unique(y).size < 2:
            # A classifier cannot be fit on a single class. Fall back to the
            # observed label rate (0.0 when there are no labels at all).
            self._model = None
            self._mean = None
            self._std = None
            self._constant_prob = float(y.mean()) if len(y) else 0.0
            return

        x_train = X.to_numpy(dtype=np.float32)
        std = x_train.std(axis=0, keepdims=True)
        self._mean = x_train.mean(axis=0, keepdims=True)
        # Features that are constant on the training rows get scale 1. Dividing
        # them by ~1e-6 would turn tiny deviations at prediction time into huge
        # inputs.
        self._std = np.where(std > 1e-6, std + 1e-6, 1.0).astype(np.float32)
        x_train = (x_train - self._mean) / self._std

        self._model = LogisticRegression(
            max_iter=2000,
            class_weight="balanced",
            solver="liblinear",
            random_state=42,
        )
        self._model.fit(x_train, y)
        self._constant_prob = None

    def predict(self, df_eval: pd.DataFrame, eval_nodes: List[int]) -> np.ndarray:
        """Return one float32 score per entry of ``eval_nodes``.

        The score is the constant training label rate when training saw a
        single class. Otherwise it is ``predict_proba`` for ``classes_[1]``
        (the larger label), computed on features recomputed from ``df_eval``.

        Raises:
            RuntimeError: If called before a classifier has been trained.
        """
        if self._constant_prob is not None:
            return np.full(len(eval_nodes), self._constant_prob, dtype=np.float32)
        if self._model is None or self._mean is None or self._std is None:
            raise RuntimeError(
                "OracleMotifWrapper.predict called before train_node_classifier_on_prefix"
            )
        if len(eval_nodes) == 0:
            # predict_proba rejects zero-row inputs.
            return np.empty(0, dtype=np.float32)
        X = self._node_features(df_eval, eval_nodes)
        x_eval = (X.to_numpy(dtype=np.float32) - self._mean) / self._std
        probs = self._model.predict_proba(x_eval)[:, 1]
        return probs.astype(np.float32)

    def reset_memory(self) -> None:
        """No-op: ``predict`` recomputes features from the frame it is given."""
        pass
|
|