"""
feature_engineering.py
======================

Feature pipeline for the CYB007 baseline classifier.

Predicts `actor_threat_type` (3-class: negligent_user / malicious_employee /
privileged_insider) from per-timestep insider threat trajectory data on the
CYB007 sample dataset.

CSV inputs:
    insider_trajectories.csv   (primary, per-timestep, 500 incidents x 65
                                timesteps = 32,500 rows)
    incident_summary.csv       (per-incident aggregates; reserved for future
                                work)
    incident_events.csv        (discrete incident event log; reserved for
                                future work - 191 collusion records out of
                                38,687 events)
    org_topology.csv           (per-department defender configuration;
                                joinable to events but not directly to
                                per-timestep trajectories without a
                                department key on the trajectory row)

Target classes (3):
    negligent_user, malicious_employee, privileged_insider

The CYB007 README claims 4 actor tiers (adds compromised_account) but the
sample data contains only 3. We train on the 3 that exist.

Sample-size note
----------------
500 incidents with 65 timesteps each is the same volume profile as CYB005
(500 campaigns x 75 timesteps). At this scale, group-aware splitting yields
~75 test incidents (~11-25 per tier), which is enough to learn tier
attribution honestly. CYB003/4/6 pivoted away from the README's stated
tier-attribution headline because their samples had only 100 groups; CYB007
ships the headline use case.

Leakage audit
-------------
Two features have strongly tier-correlated means but with substantial
distributional overlap:

  - data_access_volume_mb: privileged 0-2541, malicious 0-328,
    negligent 0-88. Overlap region [0, 88] covers most timesteps for all
    three tiers (median ~9 MB each). Real observable, not oracle. KEPT.
  - exfiltration_volume_mb_cumulative: similar shape, overlap [0, ~5].
    Real observable. KEPT.

Removing both features drops accuracy from 0.85 to 0.47 (below majority).
This confirms they are not oracles - they carry legitimate discriminative
signal that defines what privileged_insider means.

`detection_outcome` is near-oracle for incident_phase (purity 0.79, max 1.00
for reconnaissance). For TIER prediction it has no oracle relationship
(purity vs tier is uniform around 0.50). KEPT.

No columns dropped for this task.

Public API
----------
    build_features(trajectories_path) -> (X, y, groups, meta)
    transform_single(record, meta) -> np.ndarray
    save_meta(meta, path) / load_meta(path)

License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching the
dataset license. See README.md.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Ordered roughly by access/sophistication. The CYB007 README claims a 4th
# tier 'compromised_account' but the sample data contains only 3.
LABEL_ORDER = [
    "negligent_user",
    "malicious_employee",
    "privileged_insider",
]
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}

# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
ID_COLUMNS = ["incident_id", "actor_id"]
TARGET_COLUMN = "actor_threat_type"

# No columns dropped for leakage. See module docstring's "Leakage audit".
LEAKY_COLUMNS: list[str] = []

# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------
DIRECT_NUMERIC_TIMESTEP_FEATURES = [
    "timestep",  # position in 65-step lifecycle
    "data_access_volume_mb",
    "privilege_event_count",
    "communication_anomaly_score",
    "dlp_confidence_score",
    "exfiltration_volume_mb_cumulative",
    "behavioural_risk_score",
]

# Per-timestep categoricals to one-hot
CATEGORICAL_TIMESTEP_FEATURES = [
    "incident_phase",  # 8 values
    "detection_outcome",  # 4 values
    "target_data_sensitivity_tier",  # 3 values
]

# Columns produced by _add_engineered_features, in output order. Kept next to
# the raw feature lists so build_features does not repeat the names inline.
ENGINEERED_NUMERIC_FEATURES = [
    "log_data_volume",
    "log_cumulative_exfil",
    "exfil_velocity",
    "is_privileged_event",
    "risk_x_dlp_composite",
    "is_late_stage",
]

# Timesteps strictly greater than this value are flagged as late-stage.
LATE_STAGE_TIMESTEP_THRESHOLD = 40


# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with six engineered columns appended.

    Each composite encodes a tier-discriminative hypothesis and would be
    computed by a security analyst by hand. The input frame is not mutated.

    Args:
        df: Per-timestep rows. Must contain ``data_access_volume_mb``,
            ``exfiltration_volume_mb_cumulative``, ``timestep``,
            ``privilege_event_count``, ``behavioural_risk_score`` and
            ``dlp_confidence_score``.

    Returns:
        A new DataFrame with the columns listed in
        ``ENGINEERED_NUMERIC_FEATURES`` added.

    Raises:
        KeyError: If one of the required raw columns is absent.
    """
    df = df.copy()

    # 1. Log-scaled data volume. data_access_volume_mb is heavy-tailed
    #    (median ~9 MB, max ~2541 MB for privileged insiders). log1p
    #    compresses for both XGBoost and MLP. Negative values are clipped to
    #    zero first so log1p stays defined.
    df["log_data_volume"] = np.log1p(
        df["data_access_volume_mb"].clip(lower=0)
    ).astype(float)

    # 2. Log-scaled cumulative exfiltration. Same heavy-tail shape.
    df["log_cumulative_exfil"] = np.log1p(
        df["exfiltration_volume_mb_cumulative"].clip(lower=0)
    ).astype(float)

    # 3. Exfil velocity: cumulative exfil per timestep elapsed.
    #    High = aggressive exfiltration; low = patient or accidental.
    #    The divisor is clipped at 1 to avoid division by zero.
    df["exfil_velocity"] = (
        df["exfiltration_volume_mb_cumulative"] / df["timestep"].clip(lower=1)
    ).astype(float)

    # 4. Privileged event indicator. privilege_event_count > 0 marks
    #    timesteps with privileged operations. Strong privileged_insider
    #    signature.
    df["is_privileged_event"] = (df["privilege_event_count"] > 0).astype(int)

    # 5. Risk x DLP composite. Combines behavioural risk score with
    #    DLP confidence - high values indicate both behavioural anomaly
    #    AND DLP-recognised risk pattern.
    df["risk_x_dlp_composite"] = (
        df["behavioural_risk_score"] * df["dlp_confidence_score"]
    ).astype(float)

    # 6. Late-stage indicator. Timesteps after 40 sit in cover_tracks /
    #    incident_resolution / late exfiltration_attempt; tier signal
    #    differs across these late phases.
    df["is_late_stage"] = (
        df["timestep"] > LATE_STAGE_TIMESTEP_THRESHOLD
    ).astype(int)

    return df


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    trajectories_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """Load the trajectory CSV and build the training feature matrix.

    Drops target + identifiers, engineers features, one-hot encodes the
    categoricals, and returns ``(X, y, groups, meta)``.

    `groups` is a Series of incident_id values aligned with X. Use it with
    GroupShuffleSplit / GroupKFold so train and test sets contain disjoint
    incidents - each incident generates 65 highly-correlated timesteps.

    Args:
        trajectories_path: Path to ``insider_trajectories.csv``.

    Returns:
        X: Feature matrix; numeric columns first, then one one-hot block per
            categorical column present in the CSV. NaNs are filled with 0.
        y: Integer-encoded target (see ``LABEL_TO_INT``).
        groups: ``incident_id`` per row.
        meta: Everything ``transform_single`` needs to reproduce the
            encoding (feature order, numeric feature names, categorical
            levels, label maps, excluded columns).

    Raises:
        ValueError: If the target column contains a value outside
            ``LABEL_ORDER`` (including missing values).
        KeyError: If a required column is absent from the CSV.
    """
    traj = pd.read_csv(trajectories_path)

    y = traj[TARGET_COLUMN].map(LABEL_TO_INT)
    unmapped = y.isna()
    if unmapped.any():
        bad = traj.loc[unmapped, TARGET_COLUMN].unique()
        raise ValueError(f"Unknown actor_threat_type values: {bad}")
    y = y.astype(int)

    groups = traj["incident_id"].copy()

    # errors="ignore" because actor_id / leaky columns may legitimately be
    # absent from a given export.
    traj = traj.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + LEAKY_COLUMNS,
        errors="ignore",
    )
    traj = _add_engineered_features(traj)

    numeric_features = (
        DIRECT_NUMERIC_TIMESTEP_FEATURES + ENGINEERED_NUMERIC_FEATURES
    )
    X_numeric = traj[numeric_features].astype(float)

    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in CATEGORICAL_TIMESTEP_FEATURES:
        if col not in traj.columns:
            continue
        # Sorted levels give a deterministic column order that is recorded
        # in meta and replayed at inference time.
        levels = sorted(traj[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            traj[col].astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    # Copy the module-level containers so a caller mutating `meta` cannot
    # corrupt LABEL_TO_INT / INT_TO_LABEL / LEAKY_COLUMNS.
    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": list(numeric_features),
        "categorical_levels": categorical_levels,
        "label_to_int": dict(LABEL_TO_INT),
        "int_to_label": dict(INT_TO_LABEL),
        "leakage_excluded": list(LEAKY_COLUMNS),
    }
    return X, y, groups, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
) -> np.ndarray:
    """Encode one timestep record (or a frame of records) for inference.

    Args:
        record: A mapping of raw column name -> value for one timestep, or a
            DataFrame with one row per timestep. The DataFrame may carry any
            index; rows are processed positionally.
        meta: Metadata produced by ``build_features`` (or ``load_meta``).
            Reads ``numeric_features``, ``categorical_levels`` and
            ``feature_names``.

    Returns:
        A float32 array of shape ``(n_rows, len(meta["feature_names"]))``
        with columns in ``meta["feature_names"]`` order. Missing values,
        absent categorical columns, and categorical values not seen at
        training time encode as 0.

    Raises:
        KeyError: If a raw column needed by the engineered features is
            absent from *record*.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([dict(record)])
    else:
        # Drop the caller's index: the numeric block below is rebuilt
        # positionally, so the one-hot blocks must share the same 0..n-1
        # index or the axis=1 concat would outer-join mismatched labels and
        # emit extra, zero-padded rows.
        df = record.copy().reset_index(drop=True)

    df = _add_engineered_features(df)
    n_rows = len(df)

    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).to_numpy()
            for col in meta["numeric_features"]
        }
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        values = df.get(col, pd.Series([None] * n_rows))
        block = pd.get_dummies(
            values.astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        # Guarantee one column per training-time level, in training order.
        expected = [f"{col}_{lvl}" for lvl in levels]
        blocks.append(block.reindex(columns=expected, fill_value=0))

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.to_numpy(dtype=np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the feature metadata to *path* as indented JSON.

    ``int_to_label`` keys are stringified because JSON object keys must be
    strings; ``load_meta`` converts them back to ints.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "leakage_excluded": meta.get("leakage_excluded", []),
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by ``save_meta`` and restore int label keys."""
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


if __name__ == "__main__":
    import sys

    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, groups, meta = build_features(base / "insider_trajectories.csv")
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} incidents")
    print(f"n_features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")