""" feature_engineering.py ====================== Feature pipeline for the CYB002 baseline classifier. Predicts `kill_chain_phase` (10-class) from event + segment-level observables on the CYB002 sample dataset. CSV inputs: attack_events.csv (primary, one row per timestep-level action) network_topology.csv (asset-level inventory; aggregated to segment level before joining on target_segment_id) campaign_summary.csv (reserved for future work, not used in v1) campaign_events.csv (reserved for future work, not used in v1) Target classes: dwell_idle, reconnaissance, initial_access, execution, persistence, privilege_escalation, lateral_movement, collection, exfiltration, impact This corresponds to the README's first listed use case: predicting the next ATT&CK phase from observable features. The challenge is that three fields perfectly determine phase by construction: - technique_id -> 62 of 63 techniques map 1:1 to a single phase - technique_name -> 1:1 with technique_id - tactic_category -> direct alias of phase These are dropped before feature assembly. Phase is predicted from: timestep position (recon mean=6, impact mean=66), target asset type, protocol/port, byte volumes, connection duration, auth-failure count, process-injection / lateral-hop counts, attacker tier vs defender maturity, and segment-level topology aggregates. Public API ---------- build_features(attack_events_path, topology_path, campaign_summary_path=None) -> (X, y, groups, meta) transform_single(record, meta, segment_aggregates=None) -> np.ndarray save_meta(meta, path) / load_meta(path) build_segment_lookup(topology_path) -> dict License ------- Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching the dataset license. See README.md. 
""" from __future__ import annotations import json from pathlib import Path from typing import Any import numpy as np import pandas as pd # --------------------------------------------------------------------------- # Label space # --------------------------------------------------------------------------- # The 10 phases observed in the sample. dwell_idle is a no-op step # between actions; technique_id=T0000, tactic_category=NaN. Ordering # follows tactic flow for readability; CE-loss doesn't care. LABEL_ORDER = [ "dwell_idle", "reconnaissance", "initial_access", "execution", "persistence", "privilege_escalation", "lateral_movement", "collection", "exfiltration", "impact", ] LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)} INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()} # --------------------------------------------------------------------------- # Columns dropped because they leak the target (kill_chain_phase) # --------------------------------------------------------------------------- # `technique_id`: 62 of 63 ATT&CK techniques map 1:1 to a single phase. # T1078 Valid Accounts is the one shared technique (appears in both # initial_access and persistence, which is correct ATT&CK behavior). # Including technique_id as a feature is effectively label memorization. # # `technique_name`: 1:1 alias of technique_id (63 unique values each). # # `tactic_category`: direct alias of kill_chain_phase; the two columns # carry identical information except tactic_category is null for # dwell_idle steps. Drop. 
LEAKY_COLUMNS = ["technique_id", "technique_name", "tactic_category"]

# ---------------------------------------------------------------------------
# Columns kept as features
# ---------------------------------------------------------------------------
#
# Post-hoc detection observables. Four of the kept columns describe the
# SOC's initial detection outcome rather than the attacker action itself:
#   numeric:     edr_blocked_flag, siem_rule_triggered
#   categorical: alert_severity, detection_outcome
# They are kept because the realistic phase-prediction use case is a SOC
# analyst who has just seen an action and its initial detection outcome,
# and is trying to reason about which phase the campaign is in. Buyers who
# want a strictly pre-detection model can drop all four and retrain.

DIRECT_NUMERIC_EVENT_FEATURES = [
    "timestep",  # strong signal: recon mean=6, impact mean=66
    "dest_port",
    "bytes_transferred",
    "connection_duration_s",
    "auth_failure_count",
    "process_injection_flag",
    "lateral_hop_count",
    "c2_beacon_interval_s",  # null-aware; filled with -1 + has_c2_beacon flag
] + [
    # Post-hoc detection observables (see the note above).
    "edr_blocked_flag",
    "siem_rule_triggered",
]

CATEGORICAL_EVENT_FEATURES = [
    "target_asset_type",
    "source_ip_class",
    "protocol",
    "attacker_capability_tier",
    "defender_maturity_level",
] + [
    # Post-hoc detection observables (see the note above).
    "alert_severity",  # critical / high / medium / low / informational
    "detection_outcome",
]

# Identifier columns; these are not model features.
ID_COLUMNS = ["campaign_id", "attacker_id"]

# ---------------------------------------------------------------------------
# Topology aggregation
# ---------------------------------------------------------------------------
#
# network_topology.csv is ASSET-LEVEL (651 rows, 12 segments, ~54 assets
# per segment), so a direct join would multiply event rows. It is collapsed
# to one row per segment first. Constant fields are taken as-is, numeric
# fields by mean or max as appropriate, and 0/1 flags by mean (i.e. the
# fraction of assets in the segment that have that coverage).
SEGMENT_CONSTANT_TOPO_COLS = ["segment_type", "defender_maturity_level"]

SEGMENT_NUMERIC_AGGREGATES = {
    "patch_lag_days": "mean",
    "exposure_score": "mean",
    "vulnerability_count": "max",  # worst-case asset matters more
    "inter_segment_trust_level": "mean",
    "alert_threshold_sensitivity": "mean",
    "mttd_baseline_hours": "mean",
    "mttr_baseline_hours": "mean",
    "siem_coverage_flag": "mean",  # fraction with SIEM
    "edr_deployed_flag": "mean",  # fraction with EDR
    "ndr_coverage_flag": "mean",
    "mfa_enforced_flag": "mean",
}


def _aggregate_topology(topology: pd.DataFrame) -> pd.DataFrame:
    """Collapse asset-level topology to one row per segment.

    Returns a frame with a ``segment_id`` column plus:
      * ``seg_<col>`` for each SEGMENT_CONSTANT_TOPO_COLS entry, taken with
        ``.first()`` (these fields are assumed constant within a segment);
      * ``seg_<col>_<agg>`` for each SEGMENT_NUMERIC_AGGREGATES entry.
    """
    by_segment = topology.groupby("segment_id")  # group once, reuse per column
    parts = [
        by_segment[col].first().rename(f"seg_{col}")
        for col in SEGMENT_CONSTANT_TOPO_COLS
    ]
    parts += [
        by_segment[col].agg(agg).rename(f"seg_{col}_{agg}")
        for col, agg in SEGMENT_NUMERIC_AGGREGATES.items()
    ]
    return pd.concat(parts, axis=1).reset_index()


TOPOLOGY_FEATURE_NAMES_NUMERIC = [
    f"seg_{col}_{agg}" for col, agg in SEGMENT_NUMERIC_AGGREGATES.items()
]
TOPOLOGY_FEATURE_NAMES_CATEGORICAL = [f"seg_{col}" for col in SEGMENT_CONSTANT_TOPO_COLS]

# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
#
# Important: NO phase-derived engineered features. is_dwell_idle,
# is_high_severity_phase and phase_order_index would all be oracles when
# phase is the target. Instead there are six features, each a stated
# hypothesis about phase-discriminative signal in pre-phase observables.

TIER_RANK = {"script_kiddie": 1, "opportunistic": 2, "apt": 3, "nation_state": 4}
DEFENDER_RANK = {"minimal": 1, "baseline": 2, "managed": 3, "advanced": 4, "zero_trust": 5}


def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `df` with six engineered features (no phase oracles).

    Reads bytes_transferred, c2_beacon_interval_s, auth_failure_count,
    attacker_capability_tier, defender_maturity_level and dest_port; a
    missing one raises KeyError. Also rewrites c2_beacon_interval_s in the
    copy (nulls -> -1.0).
    """
    df = df.copy()

    # 1. Byte volume on log scale. Heavy-tailed across phases: recon
    #    transfers tend to be bytes, exfiltration megabytes. log1p tames
    #    the tail and gives both XGBoost and the MLP a usable feature.
    df["byte_volume_log"] = np.log1p(df["bytes_transferred"].clip(lower=0)).astype(float)

    # 2. C2 beacon presence. c2_beacon_interval_s is null for non-C2
    #    actions. Encode presence as a binary flag and fill the value
    #    column with -1 so it stays usable.
    df["has_c2_beacon"] = df["c2_beacon_interval_s"].notna().astype(int)
    df["c2_beacon_interval_s"] = df["c2_beacon_interval_s"].fillna(-1.0)

    # 3. Brute-force indicator. auth_failure_count > 0 separates
    #    credential-stuffing style actions from authenticated-path actions;
    #    hypothesized to load differently onto early phases.
    df["is_brute_forcing"] = (df["auth_failure_count"] > 0).astype(int)

    # 4. Attacker vs defender advantage. Positive when the attacker
    #    outclasses the defender; unknown tiers/levels default to rank 2.
    tier_r = df["attacker_capability_tier"].map(TIER_RANK).fillna(2).astype(int)
    def_r = df["defender_maturity_level"].map(DEFENDER_RANK).fillna(2).astype(int)
    df["attacker_defender_advantage"] = (tier_r - def_r).astype(int)

    # 5. High-volume action indicator: binary above 100 KB, hypothesized
    #    to correlate with collection / exfiltration phases.
    df["is_high_volume"] = (df["bytes_transferred"] > 100_000).astype(int)

    # 6. Privileged-port indicator. dest_port < 1024 usually means system
    #    services; hypothesized common in initial-access and lateral-movement.
    df["is_privileged_port"] = (df["dest_port"] < 1024).astype(int)

    return df


def _one_hot(values: pd.Series, col: str, levels: list) -> pd.DataFrame:
    """One-hot encode `values` against a fixed list of `levels`.

    Output columns are exactly ``f"{col}_{lvl}"`` for each level, in level
    order. Values outside `levels` (or null) give an all-zero row. The result
    keeps the index of `values`. Shared by training and inference so both
    produce identical column layouts.
    """
    dummies = pd.get_dummies(
        values.astype("category").cat.set_categories(levels),
        prefix=col,
        dummy_na=False,
    ).astype(int)
    return dummies.reindex(columns=[f"{col}_{lvl}" for lvl in levels], fill_value=0)


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def build_features(
    attack_events_path: str | Path,
    topology_path: str | Path,
    campaign_summary_path: str | Path | None = None,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """Load the CSVs and assemble the training matrix.

    Steps: read events and topology, drop LEAKY_COLUMNS, left-join the
    segment-level topology aggregates on ``target_segment_id``, encode the
    label, add engineered features, then one-hot encode categoricals against
    the levels observed in this data.

    Parameters
    ----------
    attack_events_path : path to attack_events.csv.
    topology_path : path to network_topology.csv (asset-level).
    campaign_summary_path : accepted for API stability; currently unused.

    Returns
    -------
    X : feature matrix (numeric block followed by one-hot blocks), with
        NaNs filled with 0.0.
    y : integer labels per LABEL_TO_INT.
    groups : campaign_id per row, aligned with X, for GroupShuffleSplit /
        GroupKFold. A single campaign generates ~40 correlated events, so
        row-level random splitting inflates metrics.
    meta : encoding metadata consumed by transform_single / save_meta.

    Raises
    ------
    ValueError : if any kill_chain_phase value is not in LABEL_TO_INT.
    """
    events = pd.read_csv(attack_events_path)
    topology = pd.read_csv(topology_path)

    events = events.drop(columns=LEAKY_COLUMNS, errors="ignore")

    topo_agg = _aggregate_topology(topology)
    events = events.merge(
        topo_agg,
        left_on="target_segment_id",
        right_on="segment_id",
        how="left",
    ).drop(columns=["segment_id"], errors="ignore")

    y = events["kill_chain_phase"].map(LABEL_TO_INT)
    if y.isna().any():
        bad = events.loc[y.isna(), "kill_chain_phase"].unique()
        raise ValueError(f"Unknown kill_chain_phase values: {bad}")
    y = y.astype(int)

    groups = events["campaign_id"].copy()

    events = _add_engineered_features(events)

    numeric_features = (
        DIRECT_NUMERIC_EVENT_FEATURES
        + TOPOLOGY_FEATURE_NAMES_NUMERIC
        + [
            "byte_volume_log",
            "has_c2_beacon",
            "is_brute_forcing",
            "attacker_defender_advantage",
            "is_high_volume",
            "is_privileged_port",
        ]
    )
    X_numeric = events[numeric_features].astype(float)

    categorical_cols = CATEGORICAL_EVENT_FEATURES + TOPOLOGY_FEATURE_NAMES_CATEGORICAL
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = [X_numeric.reset_index(drop=True)]
    for col in categorical_cols:
        levels = sorted(events[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        blocks.append(_one_hot(events[col], col, levels).reset_index(drop=True))

    X = pd.concat(blocks, axis=1).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "topology_aggregation": {
            "segment_constant": SEGMENT_CONSTANT_TOPO_COLS,
            "segment_numeric_aggregates": SEGMENT_NUMERIC_AGGREGATES,
        },
    }
    return X, y, groups, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    segment_aggregates: dict | None = None,
) -> np.ndarray:
    """Encode event record(s) for inference using training-time `meta`.

    `record` is a single event dict or a DataFrame of events. It must contain
    the event-level fields (leaky columns are not required) plus the ``seg_*``
    segment aggregates. If you only have the raw event, pass
    `segment_aggregates` (e.g. one value from build_segment_lookup); each key
    is broadcast onto every row.

    The fields read by _add_engineered_features must be present, otherwise
    KeyError is raised. Other missing numeric features encode as 0.0.
    Missing or unseen categorical values encode as all-zero one-hot blocks.

    Returns
    -------
    float32 array of shape (n_rows, len(meta["feature_names"])), columns in
    meta["feature_names"] order.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([dict(record)])
    else:
        # Reset to a RangeIndex so every block built below shares one index.
        # Otherwise a caller index such as [5] would misalign with the numeric
        # block in pd.concat(axis=1) and produce extra, half-empty rows.
        df = record.copy().reset_index(drop=True)

    if segment_aggregates is not None:
        for key, value in segment_aggregates.items():
            df[key] = value

    df = _add_engineered_features(df)
    n_rows = len(df)

    numeric = pd.DataFrame(
        {
            col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        values = df.get(col, pd.Series([None] * n_rows, index=df.index))
        blocks.append(_one_hot(values, col, levels))

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the JSON-serializable part of `meta` to `path`.

    int_to_label keys are stringified because JSON object keys must be
    strings; load_meta converts them back to int.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "topology_aggregation": meta["topology_aggregation"],
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by save_meta, restoring int_to_label's int keys."""
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


def build_segment_lookup(topology_path: str | Path) -> dict[str, dict]:
    """Build a {segment_id: {seg_* feature: value}} lookup for inference.

    Each value is suitable as the `segment_aggregates` argument of
    transform_single.
    """
    topology = pd.read_csv(topology_path)
    agg = _aggregate_topology(topology)
    return {
        row["segment_id"]: {k: v for k, v in row.items() if k != "segment_id"}
        for _, row in agg.iterrows()
    }


if __name__ == "__main__":
    import sys

    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, groups, meta = build_features(
        base / "attack_events.csv",
        base / "network_topology.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} campaigns")
    print(f"n features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")