Initial release: attack_lifecycle_phase 5-class baseline + 11-oracle-path leakage diagnostic
| """ | |
| feature_engineering.py | |
| ====================== | |
| Feature pipeline for the CYB010 baseline classifier. | |
| Predicts `attack_lifecycle_phase` (5-class attack phase) from per-event | |
| features on the CYB010 sample dataset. | |
| CSV inputs: | |
| security_events.csv (primary, one row per event, 21,896 events) | |
| host_inventory.csv (per-host registry, joined for host context) | |
| alert_records.csv (per-alert records; reserved) | |
| incident_summary.csv (per-incident summaries; reserved) | |
| Target classes (5): | |
| benign_background, initial_access, lateral_movement, | |
| persistence_establishment, exfiltration_or_impact | |
| Why this task | |
| ------------- | |
| The CYB010 README's central concept is the "5-phase attack lifecycle | |
| state machine", and `attack_lifecycle_phase` is the data's headline | |
| target. We piloted six candidate targets and found it gives the | |
| strongest honest result on the sample (acc 0.95, macro-F1 0.78, | |
| ROC-AUC 0.99 with group-aware split on incident_id). | |
| The other README-suggested targets either have unrecoverable structural | |
| leakage or are weaker after honest leak removal: | |
| - `threat_actor_profile` 5-class works (acc 0.84) but is benign-driven. | |
| - The 4-class malicious-only variant collapses to acc 0.57, below the | |
| 0.61 majority baseline. | |
| - `label_true_positive` on alerts has 9 oracle features; after dropping | |
| all of them, honest acc 0.80, AUC 0.89 (documented as a secondary | |
| finding in leakage_diagnostic.json). | |
| - `mitre_tactic` 14-class hits 0.90 acc but macro-F1 0.37 - imbalance | |
| gaming (benign class dominates at 57%). | |
| - `event_class` 12-class is unlearnable (acc 0.35 vs majority 0.42). | |
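The gap between accuracy and macro-F1 noted for the imbalanced targets can be reproduced on synthetic labels. This is a minimal sketch, not the pilot evaluation: the 3-class label vector, its class shares, and the `macro_f1` helper are assumptions made up for illustration. A predictor that always outputs the dominant class scores accuracy equal to that class's share, while macro-F1 stays low because every other class contributes an F1 of zero.

```python
import numpy as np

def macro_f1(y_true, y_pred, n_classes):
    # Unweighted mean of per-class F1; a class with no TP/FP/FN scores 0.
    scores = []
    for c in range(n_classes):
        tp = np.sum((y_true == c) & (y_pred == c))
        fp = np.sum((y_true != c) & (y_pred == c))
        fn = np.sum((y_true == c) & (y_pred != c))
        denom = 2 * tp + fp + fn
        scores.append(2 * tp / denom if denom > 0 else 0.0)
    return float(np.mean(scores))

# Synthetic labels (illustrative only): class 0 holds 57% of the rows.
y_true = np.array([0] * 57 + [1] * 23 + [2] * 20)
y_pred = np.zeros_like(y_true)  # always predict the majority class

accuracy = float(np.mean(y_true == y_pred))
f1 = macro_f1(y_true, y_pred, n_classes=3)
print(f"accuracy={accuracy:.2f} macro_f1={f1:.2f}")
```

Reporting both metrics side by side is what exposes this kind of imbalance gaming.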
| Group structure | |
| --------------- | |
| 500 incidents x ~44 events each. The per-event task has clear group | |
| structure: events from the same incident share host, threat actor, and | |
| phase trajectory. Group-aware split by `incident_id` is required to | |
| prevent train/test contamination. With 500 incidents, ~75 test | |
| incidents per fold gives reasonable estimation precision. | |
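A group-aware split can be sketched with NumPy alone. This is an illustration rather than the split used for the reported metrics: `group_train_test_split`, the toy incident IDs, and `test_fraction=0.15` (15% of 500 incidents is 75) are all assumptions. scikit-learn's `GroupKFold` and `GroupShuffleSplit` give the same no-overlap guarantee.

```python
import numpy as np

def group_train_test_split(groups, test_fraction=0.15, seed=0):
    # Returns boolean (train_mask, test_mask); no group appears on both sides.
    groups = np.asarray(groups)
    unique = np.unique(groups)          # fresh array, safe to shuffle in place
    rng = np.random.default_rng(seed)
    rng.shuffle(unique)
    n_test = max(1, int(round(test_fraction * len(unique))))
    test_mask = np.isin(groups, unique[:n_test])
    return ~test_mask, test_mask

# Toy data: 20 hypothetical incidents with 5 events each.
groups = np.repeat([f"INC-{i:03d}" for i in range(20)], 5)
train_mask, test_mask = group_train_test_split(groups, test_fraction=0.15)

# Every incident lands entirely in train or entirely in test.
overlap = set(groups[train_mask]) & set(groups[test_mask])
print(len(overlap), int(test_mask.sum()))
```

Splitting on rows instead of incidents would let events from one incident sit on both sides, which is the contamination described above.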
| Leakage audit | |
| ------------- | |
| Six columns dropped from features because they're structural oracles | |
| (or near-oracles) for the target: | |
| 1. `mitre_tactic`: when == "benign", deterministically pins | |
| attack_lifecycle_phase == "benign_background" (12,448 cases - all | |
| benign events). | |
| 2. `mitre_technique_id`: perfect oracle for `mitre_tactic` by ATT&CK | |
| design (54 techniques, each maps to exactly one tactic). Dropped | |
| because it indirectly encodes the benign vs malicious distinction. | |
| 3. `label_malicious`: when False, perfect oracle for | |
| benign_background phase. | |
| 4. `threat_actor_id`: when == "NONE", perfect oracle for benign | |
| profile/phase. The non-"NONE" actor IDs are 10 distinct labels | |
| that would also leak actor profile information indirectly. | |
| 5. `threat_actor_profile`: contains "benign_user" which trivially | |
| identifies benign_background phase. | |
| 6. `event_type`: many event types are phase-specific | |
| (`c2_beacon_outbound` -> 99% exfiltration_or_impact). Dropped to | |
| avoid this near-oracle path. | |
| KEPT features that are informative but NOT oracles: | |
| - `event_class` (12 values): max purity 0.87, mean 0.72 - real signal | |
| with substantial overlap. C2 beacons (network_flow class) hit 65% | |
| exfil phase but also 29% benign. Strong feature, kept. | |
| - `severity_level`, `cvss_score_analogue`: per-event severity is a | |
| real observable, correlates with phase, has overlap. | |
| - `label_log_tampered`: real observable (APTs tamper more), correlates | |
| with malicious phases but not deterministic. | |
| - `log_source_type`, `siem_platform`: not phase-deterministic. | |
| - All host context features. | |
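One way to screen a candidate feature for oracle behaviour is a per-value purity table. The sketch below assumes "purity" means the share of rows, for a given feature value, that fall in that value's most common target class; that definition, the `value_purity` helper, and the toy frame are assumptions for illustration, not taken from leakage_diagnostic.json.

```python
import pandas as pd

def value_purity(df, feature, target):
    # Row-normalised crosstab: each row holds the class shares for one
    # feature value; the row maximum is that value's purity.
    shares = pd.crosstab(df[feature], df[target], normalize="index")
    return shares.max(axis=1)

# Toy frame with hypothetical values; not drawn from the CYB010 sample.
toy = pd.DataFrame({
    "candidate": ["a", "a", "a", "a", "b", "b", "b", "b"],
    "phase": ["benign", "benign", "benign", "benign",
              "benign", "exfil", "exfil", "lateral"],
})

purity = value_purity(toy, "candidate", "phase")
max_purity = float(purity.max())    # value "a" maps to one class only
mean_purity = float(purity.mean())
print(max_purity, mean_purity)
```

A value with purity 1.0 on many rows behaves like an oracle for that class; values well below 1.0 carry signal with overlap.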
| Public API | |
| ---------- | |
| build_features(events_path, hosts_path) -> (X, y, ids, groups, meta) | |
| transform_single(record, meta, host_lookup=None) -> np.ndarray | |
| save_meta(meta, path) / load_meta(path) | |
| build_host_lookup(hosts_path) -> dict | |
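On the `save_meta` / `load_meta` pair: JSON object keys are always strings, so the integer keys of `int_to_label` do not survive a plain dump and load. A minimal stdlib-only sketch of the round trip, using a two-entry mapping as a stand-in for the real one:

```python
import json

# Stand-in for the real mapping (illustrative subset).
int_to_label = {0: "benign_background", 1: "initial_access"}

encoded = json.dumps({"int_to_label": int_to_label})
decoded = json.loads(encoded)["int_to_label"]       # keys come back as "0", "1"
restored = {int(k): v for k, v in decoded.items()}  # cast keys back to int

print(list(decoded.keys()), restored == int_to_label)
```

Without the cast, a lookup such as `decoded[0]` raises `KeyError` after reloading.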
| License | |
| ------- | |
| Ships with the public model on Hugging Face under CC-BY-NC-4.0, | |
| matching the dataset license. See README.md. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
| # --------------------------------------------------------------------------- | |
| # Label space | |
| # --------------------------------------------------------------------------- | |
| # Ordered by attack progression. | |
| LABEL_ORDER = [ | |
| "benign_background", | |
| "initial_access", | |
| "lateral_movement", | |
| "persistence_establishment", | |
| "exfiltration_or_impact", | |
| ] | |
| LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)} | |
| INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()} | |
| # --------------------------------------------------------------------------- | |
| # Identifier and target columns | |
| # --------------------------------------------------------------------------- | |
| ID_COLUMNS = [ | |
| "event_id", "host_id", "incident_id", "timestamp", "user_id", | |
| "source_ip", "dest_ip", "raw_log_payload", | |
| ] | |
| TARGET_COLUMN = "attack_lifecycle_phase" | |
| GROUP_COLUMN = "incident_id" | |
| # Oracle columns dropped from features. | |
| ORACLE_COLUMNS = [ | |
| "mitre_tactic", # benign value -> benign_background phase | |
| "mitre_technique_id", # ATT&CK technique -> tactic deterministic | |
| "label_malicious", # False -> benign_background | |
| "threat_actor_id", # NONE -> benign | |
| "threat_actor_profile", # benign_user -> benign_background | |
| "event_type", # many event types phase-specific (e.g. c2_beacon_outbound) | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Per-event numeric features | |
| # --------------------------------------------------------------------------- | |
| EVENT_NUMERIC_FEATURES = [ | |
| "source_port", | |
| "dest_port", | |
| "cvss_score_analogue", | |
| "label_log_tampered", # bool kept as observable | |
| "label_false_positive", # bool kept as observable (all False on events) | |
| ] | |
| EVENT_CATEGORICAL_FEATURES = [ | |
| "event_class", # 12 values | |
| "log_source_type", # 8 values | |
| "severity_level", # 5 values | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Host features (joined on host_id from host_inventory.csv) | |
| # --------------------------------------------------------------------------- | |
| HOST_NUMERIC_FEATURES = [ | |
| "edr_agent_installed", | |
| "patch_compliance_level", | |
| "vulnerability_count_open", | |
| ] | |
| HOST_CATEGORICAL_FEATURES = [ | |
| "os_type", # 7 values | |
| "host_role", # 10 values | |
| "network_segment", # 8 values | |
| "defender_posture_tier", # 4 values | |
| "criticality_rating", # 4 values | |
| "cloud_provider", # 4 values | |
| "siem_platform", # 8 values | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Engineered features | |
| # --------------------------------------------------------------------------- | |
| def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Six groups of engineered features (nine columns) encoding | |
| phase-discriminative hypotheses. | |
| Each composite is something a SOC analyst would compute by hand. | |
| """ | |
| df = df.copy() | |
| # 1. Hour of day (0-23) from timestamp, if available | |
| if "timestamp" in df.columns: | |
| ts = pd.to_datetime(df["timestamp"], errors="coerce") | |
| df["hour_of_day"] = ts.dt.hour.fillna(12).astype(int) | |
| df["is_off_hours"] = ((ts.dt.hour < 9) | (ts.dt.hour > 17)).fillna(False).astype(int) | |
| df["is_weekend"] = (ts.dt.weekday >= 5).fillna(False).astype(int) | |
| else: | |
| df["hour_of_day"] = 12 | |
| df["is_off_hours"] = 0 | |
| df["is_weekend"] = 0 | |
| # 2. Log-scaled CVSS (heavy-tailed). Fall back to a zero Series when the | |
| # column is absent: a scalar default has no .clip() or .astype(). | |
| cvss = ( | |
| df["cvss_score_analogue"] if "cvss_score_analogue" in df.columns | |
| else pd.Series(0.0, index=df.index) | |
| ) | |
| df["log_cvss"] = np.log1p(cvss.clip(lower=0)).astype(float) | |
| # 3. High-CVSS indicator | |
| df["is_high_cvss"] = (cvss >= 7.0).astype(int) | |
| # 4. Port category: well-known (<1024) vs registered vs dynamic | |
| dest = ( | |
| df["dest_port"] if "dest_port" in df.columns | |
| else pd.Series(0, index=df.index) | |
| ).fillna(0).astype(int) | |
| df["is_well_known_port"] = (dest < 1024).astype(int) | |
| df["is_dynamic_port"] = (dest >= 49152).astype(int) | |
| # 5. Web-service indicator: destination port is a common HTTP(S) port. | |
| # Rough proxy for outbound web traffic; direction itself is not checked. | |
| df["is_outbound_web"] = (dest.isin([80, 443, 8080, 8443])).astype(int) | |
| # 6. Risk composite: CVSS x defender_weakness. Higher composite -> later phase. | |
| if "patch_compliance_level" in df.columns: | |
| df["risk_composite"] = ( | |
| cvss.fillna(0) * | |
| (1 - df["patch_compliance_level"].fillna(1)) | |
| ).astype(float) | |
| else: | |
| df["risk_composite"] = 0.0 | |
| return df | |
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
| def build_features( | |
| events_path: str | Path, | |
| hosts_path: str | Path, | |
| ) -> tuple[pd.DataFrame, pd.Series, pd.Series, pd.Series, dict[str, Any]]: | |
| """ | |
| Load security_events.csv, join host_inventory.csv, drop target + | |
| identifiers + oracle columns, engineer features, one-hot encode, | |
| return (X, y, ids, groups, meta). | |
| """ | |
| events = pd.read_csv(events_path) | |
| hosts = pd.read_csv(hosts_path) | |
| y = events[TARGET_COLUMN].map(LABEL_TO_INT) | |
| if y.isna().any(): | |
| bad = events.loc[y.isna(), TARGET_COLUMN].unique() | |
| raise ValueError(f"Unknown attack_lifecycle_phase values: {bad}") | |
| y = y.astype(int) | |
| ids = events["event_id"].copy() | |
| groups = events[GROUP_COLUMN].copy() | |
| host_cols_needed = ( | |
| ["host_id"] + HOST_NUMERIC_FEATURES + HOST_CATEGORICAL_FEATURES | |
| ) | |
| events = events.merge( | |
| hosts[host_cols_needed], on="host_id", how="left", | |
| ) | |
| # Apply engineered features BEFORE dropping timestamp | |
| events = _add_engineered_features(events) | |
| events = events.drop( | |
| columns=ID_COLUMNS + [TARGET_COLUMN] + ORACLE_COLUMNS, | |
| errors="ignore", | |
| ) | |
| numeric_features = ( | |
| EVENT_NUMERIC_FEATURES | |
| + HOST_NUMERIC_FEATURES | |
| + [ | |
| "hour_of_day", "is_off_hours", "is_weekend", | |
| "log_cvss", "is_high_cvss", | |
| "is_well_known_port", "is_dynamic_port", "is_outbound_web", | |
| "risk_composite", | |
| ] | |
| ) | |
| numeric_features = [c for c in numeric_features if c in events.columns] | |
| X_numeric = events[numeric_features].apply( | |
| lambda s: s.astype(float) if s.dtype != bool else s.astype(int).astype(float) | |
| ) | |
| all_categorical = EVENT_CATEGORICAL_FEATURES + HOST_CATEGORICAL_FEATURES | |
| categorical_levels: dict[str, list[str]] = {} | |
| blocks: list[pd.DataFrame] = [] | |
| for col in all_categorical: | |
| if col not in events.columns: | |
| continue | |
| levels = sorted(events[col].dropna().astype(str).unique().tolist()) | |
| categorical_levels[col] = levels | |
| block = pd.get_dummies( | |
| events[col].astype(str).astype("category").cat.set_categories(levels), | |
| prefix=col, dummy_na=False, | |
| ).astype(int) | |
| blocks.append(block) | |
| X = pd.concat( | |
| [X_numeric.reset_index(drop=True)] | |
| + [b.reset_index(drop=True) for b in blocks], | |
| axis=1, | |
| ).fillna(0.0) | |
| meta = { | |
| "feature_names": X.columns.tolist(), | |
| "numeric_features": numeric_features, | |
| "categorical_levels": categorical_levels, | |
| "label_to_int": LABEL_TO_INT, | |
| "int_to_label": INT_TO_LABEL, | |
| "oracle_excluded": ORACLE_COLUMNS, | |
| } | |
| return X, y, ids, groups, meta | |
| def transform_single( | |
| record: dict | pd.DataFrame, | |
| meta: dict[str, Any], | |
| host_lookup: dict | None = None, | |
| ) -> np.ndarray: | |
| """Encode a single event record for inference.""" | |
| if isinstance(record, dict): | |
| df = pd.DataFrame([record.copy()]) | |
| else: | |
| df = record.copy() | |
| if host_lookup is not None and "host_id" in df.columns: | |
| host_id = df["host_id"].iloc[0] | |
| host_feats = host_lookup.get(host_id, {}) | |
| for k, v in host_feats.items(): | |
| if k not in df.columns: | |
| df[k] = v | |
| df = _add_engineered_features(df) | |
| numeric = pd.DataFrame() | |
| for col in meta["numeric_features"]: | |
| s = df.get(col, pd.Series([0.0] * len(df))) | |
| if s.dtype == bool: | |
| s = s.astype(int) | |
| numeric[col] = s.astype(float).values | |
| blocks: list[pd.DataFrame] = [numeric] | |
| for col, levels in meta["categorical_levels"].items(): | |
| val = df.get(col, pd.Series([None] * len(df))).astype(str) | |
| block = pd.get_dummies( | |
| val.astype("category").cat.set_categories(levels), | |
| prefix=col, dummy_na=False, | |
| ).astype(int) | |
| for lvl in levels: | |
| cname = f"{col}_{lvl}" | |
| if cname not in block.columns: | |
| block[cname] = 0 | |
| block = block[[f"{col}_{lvl}" for lvl in levels]] | |
| blocks.append(block) | |
| X = pd.concat(blocks, axis=1).fillna(0.0) | |
| X = X.reindex(columns=meta["feature_names"], fill_value=0.0) | |
| return X.values.astype(np.float32) | |
| def save_meta(meta: dict[str, Any], path: str | Path) -> None: | |
| serializable = { | |
| "feature_names": meta["feature_names"], | |
| "numeric_features": meta["numeric_features"], | |
| "categorical_levels": meta["categorical_levels"], | |
| "label_to_int": meta["label_to_int"], | |
| "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()}, | |
| "oracle_excluded": meta.get("oracle_excluded", []), | |
| } | |
| with open(path, "w") as f: | |
| json.dump(serializable, f, indent=2) | |
| def load_meta(path: str | Path) -> dict[str, Any]: | |
| with open(path) as f: | |
| meta = json.load(f) | |
| meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()} | |
| return meta | |
| def build_host_lookup(hosts_path: str | Path) -> dict[str, dict]: | |
| """Build {host_id: {host feature values}} for inference-time lookup.""" | |
| hosts = pd.read_csv(hosts_path) | |
| cols = HOST_NUMERIC_FEATURES + HOST_CATEGORICAL_FEATURES | |
| out = {} | |
| for _, row in hosts.iterrows(): | |
| out[row["host_id"]] = {c: row[c] for c in cols if c in hosts.columns} | |
| return out | |
| if __name__ == "__main__": | |
| import sys | |
| base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads") | |
| X, y, ids, groups, meta = build_features( | |
| base / "security_events.csv", | |
| base / "host_inventory.csv", | |
| ) | |
| print(f"X shape: {X.shape}") | |
| print(f"y shape: {y.shape}") | |
| print(f"groups: {groups.nunique()} unique incidents") | |
| print(f"n_features: {len(meta['feature_names'])}") | |
| print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}") | |
| print(f"X has NaN: {X.isnull().any().any()}") | |