Initial release: XGBoost + MLP for user-risk-tier classification, plus structural-leakage diagnostic on threat-actor detection
e6a6835 verified | """ | |
| feature_engineering.py | |
| ====================== | |
| Feature pipeline for the CYB006 baseline classifier. | |
| Predicts `user_risk_tier` (3-class: low / medium / high) from per-user | |
| identity aggregates on the CYB006 sample dataset. | |
| CSV inputs: | |
| user_risk_summary.csv (primary, per-user aggregates, 200 rows) | |
| login_sessions.csv (per-session telemetry, joined as | |
| per-user behavioural aggregates) | |
| identity_topology.csv (identity domain registry; reserved for | |
| future work - no direct user join key) | |
| auth_events.csv (discrete event log; reserved for | |
| future work) | |
| Target classes (3): | |
| low, medium, high | |
| Why this task instead of threat_actor_capability_tier | |
| ----------------------------------------------------- | |
| The CYB006 README lists "threat-actor tier classification (4-class)" as | |
| its primary suggested use case. We piloted that target first and found | |
| the sample dataset has STRUCTURAL DETERMINISM: every actor-tier signal | |
| in the data (velocity_anomaly_score, session_timestamp, credential | |
| attempt count, login outcome, geo country code, device trust level, | |
| user risk tier itself, geo anomaly score) carries non-overlapping | |
| distributions between threat and legitimate sessions. As a result, a | |
| plain XGBoost achieves 100% test accuracy on threat-actor binary | |
| classification across every random seed - and stays at 97-100% | |
| accuracy even with all six oracle feature groups removed. | |
| This is not a methodological failure; it's a property of how the | |
| sample was generated. Real-world identity telemetry has substantial | |
| overlap between threat-actor and legitimate behaviour. The model card | |
| documents this as a diagnostic finding for the dataset author and a | |
| caveat for buyers planning to train detection models on the sample. | |
| For a working baseline that demonstrates honest ML on the dataset, we | |
| shifted to predicting `user_risk_tier` from per-user aggregates. This | |
| task has overlapping per-tier feature distributions, no oracle features, | |
| and lifts modestly over majority baseline (acc 0.66 vs 0.57 majority). | |
| Public API | |
| ---------- | |
| build_features(user_risk_path, sessions_path) -> (X, y, ids, meta) | |
| transform_single(record, meta) -> np.ndarray | |
| save_meta(meta, path) / load_meta(path) | |
| compute_session_aggregates_for_user(user_sessions) -> dict | |
| License | |
| ------- | |
| Ships with the public model on Hugging Face under CC-BY-NC-4.0, | |
| matching the dataset license. See README.md. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
| # --------------------------------------------------------------------------- | |
| # Label space | |
| # --------------------------------------------------------------------------- | |
# Risk tiers in ascending order of severity (low -> high). The CYB006 README
# mentions a fourth tier, 'critical', but the sample data only contains the
# three tiers listed here.
LABEL_ORDER = ["low", "medium", "high"]
# Tier name -> integer class index (position in LABEL_ORDER).
LABEL_TO_INT = dict(zip(LABEL_ORDER, range(len(LABEL_ORDER))))
# Integer class index -> tier name (inverse of LABEL_TO_INT).
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
| # --------------------------------------------------------------------------- | |
# Identifier and target columns
# ---------------------------------------------------------------------------
# Identifier columns are dropped from the feature matrix by build_features;
# the user_id values are returned separately as `ids`.
ID_COLUMNS = ["user_id"]
# Column holding the label; its values are mapped to integers via
# LABEL_TO_INT in build_features.
TARGET_COLUMN = "user_risk_tier"
| # --------------------------------------------------------------------------- | |
# Per-user numeric features from user_risk_summary.csv
# ---------------------------------------------------------------------------
# These are aggregate counts and continuous scores. They carry overlapping
# distributions across tiers - not oracles.
# build_features casts these columns to float; several of them are also the
# inputs of the ratios/composites built in _add_engineered_features.
USER_NUMERIC_FEATURES = [
    "total_login_attempts",
    "successful_logins",
    "failed_logins",
    "mfa_failures",
    "impossible_travel_events",
    "lateral_hop_count",
    "privilege_escalations",
    "account_lockout_count",
    "geo_dispersion_score",
    "login_velocity_score",
    "session_anomaly_rate",
    "ueba_alert_count",
    "overall_identity_risk_score",
    "insider_threat_indicator_score",
]
# Categorical columns that build_features one-hot encodes; the levels seen
# at build time are stored in meta["categorical_levels"].
USER_CATEGORICAL_FEATURES = [
    "peak_privilege_level_accessed",  # 6 values
]
# Note: we intentionally exclude `threat_actor_flag`, `account_takeover_flag`,
# and `credential_attack_victim_flag` from user_risk_summary as features.
# threat_actor_flag leaks the target: only high-tier users can be flagged
# threat actors, so a set flag implies tier=high. account_takeover and
# credential_attack are extremely rare (2/200 and 1/200) - not useful as
# features in the sample, and using them risks the same kind of structural
# leakage we documented for threat-actor classification.
# build_features drops these columns (when present) before any feature is
# computed, and records the list in meta["user_leaky_excluded"].
USER_LEAKY_COLUMNS = [
    "threat_actor_flag",
    "account_takeover_flag",
    "credential_attack_victim_flag",
]
| # --------------------------------------------------------------------------- | |
# Per-session aggregates joined into the user-level row
# ---------------------------------------------------------------------------
# We compute these from login_sessions.csv aggregated by user_id. They add
# behavioural colour (avg session duration, fraction of sessions with
# impossible travel, etc.) to the per-user row. We explicitly exclude
# session-level columns that exhibit non-overlap with threat actors
# (velocity_anomaly_score, session_timestamp_utc, credential_attempt_count,
# login_outcome) because those features create degenerate signal even when
# aggregated, and would compromise the user_risk_tier evaluation by
# enabling shortcuts via the threat_actor_flag-correlated structure.
# NOTE(review): the module docstring also lists geo country code and geo
# anomaly score among the non-overlapping signals, yet aggregates of both
# appear below - confirm they are safe for the user_risk_tier task.
# The names must match the columns produced by _aggregate_sessions and by
# compute_session_aggregates_for_user.
SESSION_AGGS_NUMERIC = [
    "avg_session_duration_seconds",
    "avg_mfa_response_latency_ms",
    "avg_geo_anomaly_score",
    "max_geo_anomaly_score",
    "frac_impossible_travel",
    "n_unique_countries",
    "n_unique_devices",
    "n_unique_applications",
]
| def _aggregate_sessions(sessions: pd.DataFrame) -> pd.DataFrame: | |
| """Compute per-user session aggregates without using leaky features.""" | |
| g = sessions.groupby("user_id") | |
| aggs = pd.DataFrame({ | |
| "avg_session_duration_seconds": g["session_duration_seconds"].mean(), | |
| "avg_mfa_response_latency_ms": g["mfa_response_latency_ms"].mean(), | |
| "avg_geo_anomaly_score": g["geo_anomaly_score"].mean(), | |
| "max_geo_anomaly_score": g["geo_anomaly_score"].max(), | |
| "frac_impossible_travel": g["impossible_travel_flag"].mean(), | |
| "n_unique_countries": g["geo_country_code"].nunique(), | |
| "n_unique_devices": g["device_id_hash"].nunique(), | |
| "n_unique_applications": g["target_application_id"].nunique(), | |
| }).reset_index() | |
| return aggs | |
| # --------------------------------------------------------------------------- | |
| # Engineered features | |
| # --------------------------------------------------------------------------- | |
| def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Six engineered features that combine the raw aggregates into | |
| risk-discriminative composites. None encode the target directly. | |
| """ | |
| df = df.copy() | |
| # 1. Failed-login fraction. Common signal across all risk tiers but | |
| # high-tier users have systematically more failures. | |
| denom = df["total_login_attempts"].clip(lower=1) | |
| df["failed_login_rate"] = (df["failed_logins"] / denom).astype(float) | |
| # 2. MFA failure rate per login. | |
| df["mfa_failure_rate"] = (df["mfa_failures"] / denom).astype(float) | |
| # 3. UEBA alerts per session - normalizes alert count to session volume. | |
| sess_denom = df["successful_logins"].clip(lower=1) | |
| df["ueba_alerts_per_session"] = (df["ueba_alert_count"] / sess_denom).astype(float) | |
| # 4. Lateral movement intensity (hops per privilege escalation). | |
| pe_denom = df["privilege_escalations"].clip(lower=1) | |
| df["hops_per_escalation"] = (df["lateral_hop_count"] / pe_denom).astype(float) | |
| # 5. Geo-velocity composite: dispersion x velocity score (continuous). | |
| df["geo_velocity_composite"] = ( | |
| df["geo_dispersion_score"] * df["login_velocity_score"] | |
| ).astype(float) | |
| # 6. Composite identity-anomaly score: average of risk + insider scores. | |
| df["composite_anomaly_score"] = ( | |
| (df["overall_identity_risk_score"] + df["insider_threat_indicator_score"]) / 2.0 | |
| ).astype(float) | |
| return df | |
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
def build_features(
    user_risk_path: str | Path,
    sessions_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Build the training matrix for the user_risk_tier classifier.

    Reads the per-user summary and the per-session log, left-joins the
    per-user session aggregates onto the user rows, adds the engineered
    features, one-hot encodes the categorical columns and fills every
    remaining NaN (e.g. users with no sessions) with 0.0.

    Returns
    -------
    X : feature matrix (numeric columns first, then one-hot blocks).
    y : integer-encoded target, mapped through LABEL_TO_INT.
    ids : the ``user_id`` values of the input rows, aligned with X by
        position (used for round-tripping predictions, not as a group
        label - the task is user-level).
    meta : dict describing the encoding (feature names, numeric feature
        list, categorical levels, label maps, excluded leaky columns) so
        the same transformation can be replayed at inference time.

    Raises
    ------
    ValueError
        If the target column contains a value that is not in LABEL_TO_INT.
    """
    user_frame = pd.read_csv(user_risk_path)
    session_frame = pd.read_csv(sessions_path)

    # Encode the target; values outside LABEL_TO_INT map to NaN.
    y = user_frame[TARGET_COLUMN].map(LABEL_TO_INT)
    unmapped = y.isna()
    if unmapped.any():
        bad = user_frame.loc[unmapped, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown user_risk_tier values: {bad}")
    y = y.astype(int)

    ids = user_frame["user_id"].copy()

    # Remove identifiers, the target and the known-leaky flags; columns that
    # are absent from the CSV are skipped silently.
    to_drop = ID_COLUMNS + [TARGET_COLUMN] + USER_LEAKY_COLUMNS
    user_frame = user_frame.drop(columns=to_drop, errors="ignore")

    # Join session aggregates through a temporary key so user_id itself
    # never re-enters the feature frame.
    per_user_sessions = _aggregate_sessions(session_frame).rename(
        columns={"user_id": "__user_id__"}
    )
    user_frame["__user_id__"] = ids
    user_frame = user_frame.merge(per_user_sessions, on="__user_id__", how="left")
    user_frame = user_frame.drop(columns=["__user_id__"])

    user_frame = _add_engineered_features(user_frame)

    engineered_names = [
        "failed_login_rate", "mfa_failure_rate", "ueba_alerts_per_session",
        "hops_per_escalation", "geo_velocity_composite", "composite_anomaly_score",
    ]
    candidate_numeric = USER_NUMERIC_FEATURES + SESSION_AGGS_NUMERIC + engineered_names
    # Keep only the candidates actually present in the joined frame.
    numeric_features = [
        name for name in candidate_numeric if name in user_frame.columns
    ]
    X_numeric = user_frame[numeric_features].astype(float)

    categorical_levels: dict[str, list[str]] = {}
    parts: list[pd.DataFrame] = [X_numeric.reset_index(drop=True)]
    for col in USER_CATEGORICAL_FEATURES:
        if col in user_frame.columns:
            # Sorted levels give a deterministic one-hot column order.
            levels = sorted(user_frame[col].dropna().unique().tolist())
            categorical_levels[col] = levels
            as_category = user_frame[col].astype("category").cat.set_categories(levels)
            one_hot = pd.get_dummies(as_category, prefix=col, dummy_na=False).astype(int)
            parts.append(one_hot.reset_index(drop=True))

    X = pd.concat(parts, axis=1).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "user_leaky_excluded": USER_LEAKY_COLUMNS,
    }
    return X, y, ids, meta
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
) -> np.ndarray:
    """Encode per-user record(s) for inference.

    Caller is responsible for computing session aggregates (the
    SESSION_AGGS_NUMERIC fields) and passing them in record. See the
    inference notebook for the standard pattern.

    Parameters
    ----------
    record : a dict describing one user, or a DataFrame with one row per
        user. A DataFrame may carry any index (e.g. a slice of a larger
        frame); rows are encoded by position and the index is discarded.
    meta : the metadata dict produced by build_features / load_meta. Its
        "numeric_features", "categorical_levels" and "feature_names"
        entries are used.

    Returns
    -------
    float32 array of shape (n_rows, len(meta["feature_names"])), with
    columns in meta["feature_names"] order. Numeric or categorical
    columns absent from the record, NaN values and categorical values
    outside the stored levels are encoded as 0.

    Raises
    ------
    KeyError
        If the record lacks a raw column needed by the engineered
        features (raised by _add_engineered_features).
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Normalise to a 0..n-1 index. The numeric block below is built from
        # raw arrays (fresh RangeIndex) while the one-hot blocks keep df's
        # index; without this reset, a frame sliced from a larger one would
        # make the column-wise concat outer-join the two indexes and emit
        # extra, half-zero rows.
        df = record.copy().reset_index(drop=True)
    df = _add_engineered_features(df)
    n_rows = len(df)

    # Numeric block: missing columns default to 0.0.
    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows))
        # Restricting to the stored levels turns unseen values into NaN,
        # which one-hot encodes as an all-zero row.
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Guarantee one column per stored level, in the stored order.
        block = block.reindex(
            columns=[f"{col}_{lvl}" for lvl in levels], fill_value=0,
        )
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Align with the column layout the model was trained on.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the feature metadata to `path` as indented JSON.

    Only the known metadata keys are written. The integer keys of
    "int_to_label" are converted to strings (JSON object keys must be
    strings); "user_leaky_excluded" defaults to an empty list when absent.
    Raises KeyError if any of the other keys is missing from `meta`.
    """
    passthrough_keys = (
        "feature_names",
        "numeric_features",
        "categorical_levels",
        "label_to_int",
    )
    payload = {key: meta[key] for key in passthrough_keys}
    payload["int_to_label"] = {
        str(index): label for index, label in meta["int_to_label"].items()
    }
    payload["user_leaky_excluded"] = meta.get("user_leaky_excluded", [])

    with open(path, "w") as handle:
        json.dump(payload, handle, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata JSON from `path`.

    JSON stores object keys as strings, so the keys of "int_to_label" are
    converted back to integers before the dict is returned.
    """
    with open(path) as handle:
        loaded = json.loads(handle.read())
    restored = {}
    for raw_index, label in loaded["int_to_label"].items():
        restored[int(raw_index)] = label
    loaded["int_to_label"] = restored
    return loaded
def compute_session_aggregates_for_user(
    user_sessions: pd.DataFrame,
) -> dict:
    """Summarise one user's session rows into the session-aggregate fields.

    `user_sessions` holds the sessions of a single user (no grouping is
    done here). The returned dict has the same eight keys as the columns
    built by _aggregate_sessions: means and the max as Python floats,
    distinct-value counts as Python ints.
    """

    def _mean(column: str) -> float:
        return float(user_sessions[column].mean())

    def _distinct(column: str) -> int:
        return int(user_sessions[column].nunique())

    return {
        "avg_session_duration_seconds": _mean("session_duration_seconds"),
        "avg_mfa_response_latency_ms": _mean("mfa_response_latency_ms"),
        "avg_geo_anomaly_score": _mean("geo_anomaly_score"),
        "max_geo_anomaly_score": float(user_sessions["geo_anomaly_score"].max()),
        "frac_impossible_travel": _mean("impossible_travel_flag"),
        "n_unique_countries": _distinct("geo_country_code"),
        "n_unique_devices": _distinct("device_id_hash"),
        "n_unique_applications": _distinct("target_application_id"),
    }
if __name__ == "__main__":
    # Smoke test: build the feature matrix from a data directory and print
    # its shape, the label distribution and whether any NaN survived.
    import sys

    # The first CLI argument overrides the default data directory.
    cli_args = sys.argv[1:]
    if cli_args:
        base = Path(cli_args[0])
    else:
        base = Path("/mnt/user-data/uploads")

    X, y, ids, meta = build_features(
        base / "user_risk_summary.csv",
        base / "login_sessions.csv",
    )

    for line in (
        f"X shape: {X.shape}",
        f"y shape: {y.shape}",
        f"n_features: {len(meta['feature_names'])}",
        f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}",
        f"X has NaN: {X.isnull().any().any()}",
    ):
        print(line)