"""
feature_engineering.py
======================

Feature pipeline for the CYB006 baseline classifier.

Predicts `user_risk_tier` (3-class: low / medium / high) from per-user
identity aggregates on the CYB006 sample dataset.

CSV inputs:
    user_risk_summary.csv   (primary, per-user aggregates, 200 rows)
    login_sessions.csv      (per-session telemetry, joined as per-user
                             behavioural aggregates)
    identity_topology.csv   (identity domain registry; reserved for future
                             work - no direct user join key)
    auth_events.csv         (discrete event log; reserved for future work)

Target classes (3): low, medium, high

Why this task instead of threat_actor_capability_tier
-----------------------------------------------------
The CYB006 README lists "threat-actor tier classification (4-class)" as its
primary suggested use case. We piloted that target first and found the
sample dataset has STRUCTURAL DETERMINISM: every actor-tier signal in the
data (velocity_anomaly_score, session_timestamp, credential attempt count,
login outcome, geo country code, device trust level, user risk tier itself,
geo anomaly score) carries non-overlapping distributions between threat and
legitimate sessions. As a result, a plain XGBoost achieves 100% test
accuracy on threat-actor binary classification across every random seed -
and stays at 97-100% accuracy even with all six oracle feature groups
removed.

This is not a methodological failure; it's a property of how the sample was
generated. Real-world identity telemetry has substantial overlap between
threat-actor and legitimate behaviour. The model card documents this as a
diagnostic finding for the dataset author and a caveat for buyers planning
to train detection models on the sample.

For a working baseline that demonstrates honest ML on the dataset, we
shifted to predicting `user_risk_tier` from per-user aggregates. This task
has overlapping per-tier feature distributions, no oracle features, and
lifts modestly over majority baseline (acc 0.66 vs 0.57 majority).

Public API
----------
    build_features(user_risk_path, sessions_path) -> (X, y, ids, meta)
    transform_single(record, meta) -> np.ndarray
    save_meta(meta, path) / load_meta(path)
    compute_session_aggregates_for_user(user_sessions) -> dict

License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching
the dataset license. See README.md.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Tiers are listed in ascending order of risk, so the integer code of a tier
# is simply its position in LABEL_ORDER. Note: the CYB006 README claims a 4th
# tier 'critical', but the sample data contains only 3 (low, medium, high).
LABEL_ORDER = ["low", "medium", "high"]
LABEL_TO_INT = dict(zip(LABEL_ORDER, range(len(LABEL_ORDER))))
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))

# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
ID_COLUMNS = ["user_id"]
TARGET_COLUMN = "user_risk_tier"

# ---------------------------------------------------------------------------
# Per-user numeric features from user_risk_summary.csv
# ---------------------------------------------------------------------------
# These are aggregate counts and continuous scores. They carry overlapping
# distributions across tiers - not oracles.
USER_NUMERIC_FEATURES: list[str] = [
    # Login volume and outcomes.
    "total_login_attempts",
    "successful_logins",
    "failed_logins",
    "mfa_failures",
    # Movement / privilege / lockout event counts.
    "impossible_travel_events",
    "lateral_hop_count",
    "privilege_escalations",
    "account_lockout_count",
    # Continuous behavioural scores and alert volume.
    "geo_dispersion_score",
    "login_velocity_score",
    "session_anomaly_rate",
    "ueba_alert_count",
    # Composite risk scores.
    "overall_identity_risk_score",
    "insider_threat_indicator_score",
]

USER_CATEGORICAL_FEATURES: list[str] = [
    "peak_privilege_level_accessed",  # 6 values
]

# Columns of user_risk_summary that are deliberately NOT used as features:
#   * threat_actor_flag is a perfect oracle for whether tier=high (only
#     high-tier users can be flagged threat actors).
#   * account_takeover_flag and credential_attack_victim_flag are extremely
#     rare (2/200 and 1/200) - not useful as features in the sample, and
#     using them risks the same kind of structural leakage we documented
#     for threat-actor classification.
USER_LEAKY_COLUMNS: list[str] = [
    "threat_actor_flag",
    "account_takeover_flag",
    "credential_attack_victim_flag",
]

# ---------------------------------------------------------------------------
# Per-session aggregates joined into the user-level row
# ---------------------------------------------------------------------------
# We compute these from login_sessions.csv aggregated by user_id. They add
# behavioural color (avg session duration, fraction of sessions with
# impossible travel, etc.) without introducing leakage. We explicitly
# exclude session-level columns that exhibit non-overlap with threat actors
# (velocity_anomaly_score, session_timestamp_utc, credential_attempt_count,
# login_outcome) because those features create degenerate signal even when
# aggregated, and would compromise the user_risk_tier evaluation by
# enabling shortcuts via the threat_actor_flag-correlated structure.
# Single source of truth for the per-user session aggregates:
#   output feature name -> (login_sessions.csv column, pandas reduction name)
# Both the batch path (_aggregate_sessions) and the single-user inference
# path (compute_session_aggregates_for_user) are driven from this table so
# training-time and inference-time aggregates cannot drift apart.
_SESSION_AGG_SPEC: dict[str, tuple[str, str]] = {
    "avg_session_duration_seconds": ("session_duration_seconds", "mean"),
    "avg_mfa_response_latency_ms": ("mfa_response_latency_ms", "mean"),
    "avg_geo_anomaly_score": ("geo_anomaly_score", "mean"),
    "max_geo_anomaly_score": ("geo_anomaly_score", "max"),
    "frac_impossible_travel": ("impossible_travel_flag", "mean"),
    "n_unique_countries": ("geo_country_code", "nunique"),
    "n_unique_devices": ("device_id_hash", "nunique"),
    "n_unique_applications": ("target_application_id", "nunique"),
}

# Names of the session-derived numeric features, in output column order.
SESSION_AGGS_NUMERIC = list(_SESSION_AGG_SPEC)


def _aggregate_sessions(sessions: pd.DataFrame) -> pd.DataFrame:
    """Compute per-user session aggregates without using leaky features.

    Args:
        sessions: Session-level frame containing `user_id` plus every source
            column named in `_SESSION_AGG_SPEC`.

    Returns:
        One row per distinct `user_id`, with `user_id` as a regular column
        followed by the SESSION_AGGS_NUMERIC columns.
    """
    grouped = sessions.groupby("user_id")
    aggs = pd.DataFrame({
        name: grouped[column].agg(reducer)
        for name, (column, reducer) in _SESSION_AGG_SPEC.items()
    })
    return aggs.reset_index()


# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------

# Columns added by _add_engineered_features, in the order they are appended
# to the numeric feature list.
_ENGINEERED_FEATURES = [
    "failed_login_rate",
    "mfa_failure_rate",
    "ueba_alerts_per_session",
    "hops_per_escalation",
    "geo_velocity_composite",
    "composite_anomaly_score",
]


def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `df` with six engineered composite columns added.

    The composites combine the raw per-user aggregates; none of them reads
    the target column. All ratio denominators are clipped to a minimum of 1
    so that zero counts do not cause division by zero.

    Args:
        df: Frame containing the raw user_risk_summary count/score columns
            used below.

    Returns:
        A new frame; the input is not modified.

    Raises:
        KeyError: If any of the required raw columns is missing.
    """
    df = df.copy()

    # 1. Failed-login fraction. Common signal across all risk tiers but
    #    high-tier users have systematically more failures.
    denom = df["total_login_attempts"].clip(lower=1)
    df["failed_login_rate"] = (df["failed_logins"] / denom).astype(float)

    # 2. MFA failure rate per login.
    df["mfa_failure_rate"] = (df["mfa_failures"] / denom).astype(float)

    # 3. UEBA alerts per session - normalizes alert count to session volume
    #    (successful logins are used as the session count).
    sess_denom = df["successful_logins"].clip(lower=1)
    df["ueba_alerts_per_session"] = (
        df["ueba_alert_count"] / sess_denom
    ).astype(float)

    # 4. Lateral movement intensity (hops per privilege escalation).
    pe_denom = df["privilege_escalations"].clip(lower=1)
    df["hops_per_escalation"] = (
        df["lateral_hop_count"] / pe_denom
    ).astype(float)

    # 5. Geo-velocity composite: dispersion x velocity score (continuous).
    df["geo_velocity_composite"] = (
        df["geo_dispersion_score"] * df["login_velocity_score"]
    ).astype(float)

    # 6. Composite identity-anomaly score: average of risk + insider scores.
    df["composite_anomaly_score"] = (
        (df["overall_identity_risk_score"]
         + df["insider_threat_indicator_score"]) / 2.0
    ).astype(float)

    return df


def _one_hot(values: pd.Series, col: str, levels: list) -> pd.DataFrame:
    """One-hot encode `values` against a fixed, ordered list of levels.

    Shared by build_features and transform_single so training and inference
    always produce identically named and ordered indicator columns.

    Args:
        values: The categorical column to encode.
        col: Column name, used as the indicator-column prefix.
        levels: Known category levels, in output order.

    Returns:
        An integer 0/1 frame with exactly one column `f"{col}_{lvl}"` per
        level, indexed like `values`. Missing values and values not in
        `levels` encode as an all-zero row.
    """
    block = pd.get_dummies(
        values.astype("category").cat.set_categories(levels),
        prefix=col,
        dummy_na=False,
    ).astype(int)
    expected = [f"{col}_{lvl}" for lvl in levels]
    # Guarantees every expected column exists (zero-filled) and fixes order.
    return block.reindex(columns=expected, fill_value=0)


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def build_features(
    user_risk_path: str | Path,
    sessions_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """Load the CSVs and build the model-ready feature matrix.

    Loads user_risk_summary, joins non-leaky session aggregates, engineers
    features, one-hot encodes the categorical columns, and returns
    (X, y, ids, meta).

    Args:
        user_risk_path: Path to user_risk_summary.csv.
        sessions_path: Path to login_sessions.csv.

    Returns:
        X: Feature frame (numeric columns followed by one-hot blocks), with
            remaining missing values filled with 0.0.
        y: Integer-encoded target (see LABEL_TO_INT).
        ids: Series of user_id values aligned with X (used for deterministic
            predictions / round-tripping; not a group label since this task
            is user-level, not session-level).
        meta: Dict describing the encoding (feature names, numeric feature
            list, categorical levels, label maps, excluded leaky columns),
            as consumed by transform_single / save_meta.

    Raises:
        ValueError: If the target column contains a value not in
            LABEL_TO_INT (including missing values).
    """
    users = pd.read_csv(user_risk_path)
    sessions = pd.read_csv(sessions_path)

    y = users[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = users.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown user_risk_tier values: {bad}")
    y = y.astype(int)

    ids = users["user_id"].copy()
    users = users.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + USER_LEAKY_COLUMNS,
        errors="ignore",
    )

    # Re-attach the id under a temporary name purely as a join key, then
    # drop it again so it can never end up in the feature matrix.
    session_aggs = _aggregate_sessions(sessions)
    users["__user_id__"] = ids
    users = users.merge(
        session_aggs.rename(columns={"user_id": "__user_id__"}),
        on="__user_id__",
        how="left",  # users without any session keep NaN aggregates
    ).drop(columns=["__user_id__"])

    users = _add_engineered_features(users)

    candidates = (
        USER_NUMERIC_FEATURES + SESSION_AGGS_NUMERIC + _ENGINEERED_FEATURES
    )
    # Tolerate inputs that lack some optional numeric columns.
    numeric_features = [c for c in candidates if c in users.columns]
    X_numeric = users[numeric_features].astype(float)

    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in USER_CATEGORICAL_FEATURES:
        if col not in users.columns:
            continue
        # Sorted so the column order is deterministic across runs.
        levels = sorted(users[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        blocks.append(_one_hot(users[col], col, levels))

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "user_leaky_excluded": USER_LEAKY_COLUMNS,
    }
    return X, y, ids, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
) -> np.ndarray:
    """Encode one or more per-user records for inference.

    Caller is responsible for computing session aggregates (the
    SESSION_AGGS_NUMERIC fields) and passing them in record. See the
    inference notebook for the standard pattern.

    Args:
        record: A dict describing a single user, or a DataFrame with one row
            per user. It must contain the raw columns required by
            `_add_engineered_features`.
        meta: Encoding metadata as returned by build_features / load_meta.

    Returns:
        float32 array of shape (n_records, len(meta["feature_names"])), with
        columns ordered as in meta["feature_names"]. Numeric features absent
        from the record, missing values, and unknown categorical values all
        encode as 0.

    Raises:
        KeyError: If a raw column needed for the engineered features is
            missing from the record.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([dict(record)])
    else:
        # Positional index is required: the numeric block below is rebuilt on
        # a fresh 0..n-1 index, so keeping the caller's index on the one-hot
        # blocks would make the column-wise concat misalign and emit extra
        # zero-filled rows.
        df = record.reset_index(drop=True)

    df = _add_engineered_features(df)
    n_rows = len(df)

    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
        for col in meta["numeric_features"]
    })

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows))
        blocks.append(_one_hot(val, col, levels))

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the encoding metadata to `path` as JSON.

    The keys of `int_to_label` are converted to strings because JSON object
    keys must be strings; load_meta reverses this.

    Args:
        meta: Metadata dict as returned by build_features.
        path: Destination file; overwritten if it exists.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "user_leaky_excluded": meta.get("user_leaky_excluded", []),
    }
    # Explicit encoding so the file does not depend on the platform locale.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by save_meta, restoring int `int_to_label` keys.

    Args:
        path: JSON file produced by save_meta.

    Returns:
        The metadata dict, usable directly with transform_single.
    """
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


def compute_session_aggregates_for_user(
    user_sessions: pd.DataFrame,
) -> dict:
    """Compute session aggregates for a single user (used at inference).

    Args:
        user_sessions: Session rows belonging to one user, containing every
            source column named in `_SESSION_AGG_SPEC`.

    Returns:
        Dict keyed by the SESSION_AGGS_NUMERIC names, in that order.
        Distinct-count aggregates are Python ints; all others are floats.
    """
    aggs: dict = {}
    for name, (column, reducer) in _SESSION_AGG_SPEC.items():
        value = user_sessions[column].agg(reducer)
        aggs[name] = int(value) if reducer == "nunique" else float(value)
    return aggs


if __name__ == "__main__":
    import sys

    # Smoke test: build the features from a data directory given as the
    # first CLI argument (or the default upload location) and print a summary.
    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, ids, meta = build_features(
        base / "user_risk_summary.csv",
        base / "login_sessions.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"n_features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")