| """ |
| feature_engineering.py |
| ====================== |
| |
| Feature pipeline for the CYB002 baseline classifier. |
| |
| Predicts `kill_chain_phase` (10-class) from event + segment-level |
| observables on the CYB002 sample dataset. |
| |
| CSV inputs: |
| attack_events.csv (primary, one row per timestep-level action) |
| network_topology.csv (asset-level inventory; aggregated to segment |
| level before joining on target_segment_id) |
| campaign_summary.csv (reserved for future work, not used in v1) |
| campaign_events.csv (reserved for future work, not used in v1) |
| |
| Target classes: |
| dwell_idle, reconnaissance, initial_access, execution, persistence, |
| privilege_escalation, lateral_movement, collection, exfiltration, impact |
| |
| This corresponds to the README's first listed use case: predicting the |
| next ATT&CK phase from observable features. The challenge is that three |
| fields determine phase (almost) perfectly by construction: |
| |
| - technique_id -> 62 of 63 techniques map 1:1 to a single phase |
| - technique_name -> 1:1 with technique_id |
| - tactic_category -> direct alias of phase |
| |
| These are dropped before feature assembly. Phase is predicted from: |
| timestep position (recon mean=6, impact mean=66), target asset type, |
| protocol/port, byte volumes, connection duration, auth-failure count, |
| process-injection / lateral-hop counts, attacker tier vs defender |
| maturity, and segment-level topology aggregates. |
| |
| Public API |
| ---------- |
| build_features(attack_events_path, topology_path, |
| campaign_summary_path=None) -> (X, y, groups, meta) |
| transform_single(record, meta, segment_aggregates=None) -> np.ndarray |
| save_meta(meta, path) / load_meta(path) |
| build_segment_lookup(topology_path) -> dict |
| |
| License |
| ------- |
| Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching |
| the dataset license. See README.md. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| |
| |
| |
|
|
| |
| |
| |
# Canonical ordering of the 10 kill_chain_phase classes. A label's position
# in this list is its integer class id.
LABEL_ORDER = [
    "dwell_idle", "reconnaissance", "initial_access", "execution",
    "persistence", "privilege_escalation", "lateral_movement",
    "collection", "exfiltration", "impact",
]
# Forward (name -> id) and reverse (id -> name) lookups derived from the order.
LABEL_TO_INT = dict(zip(LABEL_ORDER, range(len(LABEL_ORDER))))
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Columns removed before feature assembly. Per the module docstring they
# (near-)directly encode kill_chain_phase, so keeping them would leak the label.
LEAKY_COLUMNS = ["technique_id", "technique_name", "tactic_category"]

# Event-level columns that build_features casts to float and passes through.
# c2_beacon_interval_s can be missing; _add_engineered_features replaces the
# gaps with -1 and records presence in a separate has_c2_beacon flag.
DIRECT_NUMERIC_EVENT_FEATURES = [
    "timestep", "dest_port", "bytes_transferred", "connection_duration_s",
    "auth_failure_count", "process_injection_flag", "lateral_hop_count",
    "c2_beacon_interval_s",
    # Detection-side flags, also treated as plain numeric inputs.
    "edr_blocked_flag", "siem_rule_triggered",
]

# Event-level columns that build_features one-hot encodes.
CATEGORICAL_EVENT_FEATURES = [
    "target_asset_type", "source_ip_class", "protocol",
    "attacker_capability_tier", "defender_maturity_level",
    "alert_severity", "detection_outcome",
]

# Identifier columns. They are not part of any feature list above;
# build_features reads campaign_id directly to produce the split groups.
ID_COLUMNS = ["campaign_id", "attacker_id"]
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| SEGMENT_CONSTANT_TOPO_COLS = ["segment_type", "defender_maturity_level"] |
| SEGMENT_NUMERIC_AGGREGATES = { |
| "patch_lag_days": "mean", |
| "exposure_score": "mean", |
| "vulnerability_count": "max", |
| "inter_segment_trust_level": "mean", |
| "alert_threshold_sensitivity": "mean", |
| "mttd_baseline_hours": "mean", |
| "mttr_baseline_hours": "mean", |
| "siem_coverage_flag": "mean", |
| "edr_deployed_flag": "mean", |
| "ndr_coverage_flag": "mean", |
| "mfa_enforced_flag": "mean", |
| } |
|
|
|
|
| def _aggregate_topology(topology: pd.DataFrame) -> pd.DataFrame: |
| """Collapse asset-level topology to one row per segment.""" |
| parts = [] |
| for col in SEGMENT_CONSTANT_TOPO_COLS: |
| parts.append(topology.groupby("segment_id")[col].first().rename(f"seg_{col}")) |
| for col, agg in SEGMENT_NUMERIC_AGGREGATES.items(): |
| parts.append(topology.groupby("segment_id")[col].agg(agg).rename(f"seg_{col}_{agg}")) |
| return pd.concat(parts, axis=1).reset_index() |
|
|
|
|
| TOPOLOGY_FEATURE_NAMES_NUMERIC = [ |
| f"seg_{col}_{agg}" for col, agg in SEGMENT_NUMERIC_AGGREGATES.items() |
| ] |
| TOPOLOGY_FEATURE_NAMES_CATEGORICAL = [f"seg_{col}" for col in SEGMENT_CONSTANT_TOPO_COLS] |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| TIER_RANK = {"script_kiddie": 1, "opportunistic": 2, "apt": 3, "nation_state": 4} |
| DEFENDER_RANK = {"minimal": 1, "baseline": 2, "managed": 3, "advanced": 4, "zero_trust": 5} |
|
|
|
|
| def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame: |
| """Six engineered features, no phase-derived oracles.""" |
| df = df.copy() |
|
|
| |
| |
| |
| df["byte_volume_log"] = np.log1p(df["bytes_transferred"].clip(lower=0)).astype(float) |
|
|
| |
| |
| |
| df["has_c2_beacon"] = df["c2_beacon_interval_s"].notna().astype(int) |
| df["c2_beacon_interval_s"] = df["c2_beacon_interval_s"].fillna(-1.0) |
|
|
| |
| |
| |
| df["is_brute_forcing"] = (df["auth_failure_count"] > 0).astype(int) |
|
|
| |
| |
| tier_r = df["attacker_capability_tier"].map(TIER_RANK).fillna(2).astype(int) |
| def_r = df["defender_maturity_level"].map(DEFENDER_RANK).fillna(2).astype(int) |
| df["attacker_defender_advantage"] = (tier_r - def_r).astype(int) |
|
|
| |
| |
| df["is_high_volume"] = (df["bytes_transferred"] > 100_000).astype(int) |
|
|
| |
| |
| df["is_privileged_port"] = (df["dest_port"] < 1024).astype(int) |
|
|
| return df |
|
|
|
|
| |
| |
| |
|
|
def build_features(
    attack_events_path: str | Path,
    topology_path: str | Path,
    campaign_summary_path: str | Path | None = None,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Build the training matrix from the attack-event and topology CSVs.

    Steps: read both CSVs, drop LEAKY_COLUMNS, left-join the per-segment
    topology aggregates on ``target_segment_id``, encode the label, add the
    engineered features, one-hot encode the categorical columns and fill
    any remaining missing value with 0.0.

    Returns ``(X, y, groups, meta)``:
      * ``X``      - feature frame (numeric columns first, then one-hot blocks).
      * ``y``      - integer class ids per LABEL_TO_INT.
      * ``groups`` - ``campaign_id`` per row, for GroupShuffleSplit /
                     GroupKFold; events of one campaign are correlated, so
                     row-level random splits inflate metrics.
      * ``meta``   - feature names, categorical levels and label maps needed
                     to reproduce the encoding at inference time.

    ``campaign_summary_path`` is accepted but not read by this function.

    Raises ValueError if ``kill_chain_phase`` holds a value outside
    LABEL_TO_INT (missing values included).
    """
    raw_events = pd.read_csv(attack_events_path)
    asset_inventory = pd.read_csv(topology_path)

    # Remove label-revealing columns before anything else touches the frame.
    events = raw_events.drop(columns=LEAKY_COLUMNS, errors="ignore")

    # Attach segment-level aggregates; the join key copy is dropped afterwards.
    events = events.merge(
        _aggregate_topology(asset_inventory),
        left_on="target_segment_id",
        right_on="segment_id",
        how="left",
    )
    events = events.drop(columns=["segment_id"], errors="ignore")

    y = events["kill_chain_phase"].map(LABEL_TO_INT)
    unmapped = y.isna()
    if unmapped.any():
        bad = events.loc[unmapped, "kill_chain_phase"].unique()
        raise ValueError(f"Unknown kill_chain_phase values: {bad}")
    y = y.astype(int)
    groups = events["campaign_id"].copy()

    events = _add_engineered_features(events)

    engineered = [
        "byte_volume_log", "has_c2_beacon", "is_brute_forcing",
        "attacker_defender_advantage", "is_high_volume",
        "is_privileged_port",
    ]
    numeric_features = (
        DIRECT_NUMERIC_EVENT_FEATURES + TOPOLOGY_FEATURE_NAMES_NUMERIC + engineered
    )
    frames: list[pd.DataFrame] = [
        events[numeric_features].astype(float).reset_index(drop=True)
    ]

    # One-hot encode each categorical column against its sorted observed
    # levels; the levels are stored so inference can rebuild the same columns.
    categorical_levels: dict[str, list[str]] = {}
    for col in CATEGORICAL_EVENT_FEATURES + TOPOLOGY_FEATURE_NAMES_CATEGORICAL:
        observed = sorted(events[col].dropna().unique().tolist())
        categorical_levels[col] = observed
        typed = events[col].astype("category").cat.set_categories(observed)
        dummies = pd.get_dummies(typed, prefix=col, dummy_na=False).astype(int)
        frames.append(dummies.reset_index(drop=True))

    X = pd.concat(frames, axis=1).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "topology_aggregation": {
            "segment_constant": SEGMENT_CONSTANT_TOPO_COLS,
            "segment_numeric_aggregates": SEGMENT_NUMERIC_AGGREGATES,
        },
    }
    return X, y, groups, meta
|
|
|
|
def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    segment_aggregates: dict | None = None,
) -> np.ndarray:
    """Encode one event record (or a frame of records) for inference.

    `record` must contain the event-level fields (sans leaky columns) plus
    the segment-level aggregate fields. If you only have the raw event,
    pass `segment_aggregates` as a dict {seg_*: value, ...} and its entries
    are written into the record as columns.

    `meta` is the dict produced by build_features / load_meta; its
    "numeric_features", "categorical_levels" and "feature_names" entries
    drive the encoding.

    Returns a float32 array of shape (n_rows, len(meta["feature_names"]))
    with columns in `meta["feature_names"]` order. Numeric features absent
    from the record are encoded as 0.0; a categorical value not among the
    stored levels encodes as all zeros for that column's block. The input
    is not modified.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # The numeric block below is rebuilt from raw values and therefore
        # carries a fresh 0..n-1 index. Give the frame the same positional
        # index so pd.concat(axis=1) lines the blocks up row-for-row instead
        # of producing an outer join with extra zero-filled rows.
        df = record.copy().reset_index(drop=True)

    if segment_aggregates is not None:
        for k, v in segment_aggregates.items():
            df[k] = v

    df = _add_engineered_features(df)
    n_rows = len(df)

    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
        for col in meta["numeric_features"]
    })
    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows))
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Guarantee one column per stored level, in the stored order.
        expected = [f"{col}_{lvl}" for lvl in levels]
        block = block.reindex(columns=expected, fill_value=0)
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Final guard: enforce the exact training-time column layout.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
|
|
|
|
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the persistable part of *meta* to *path* as indented JSON.

    Only the six known keys are written; anything else in *meta* is ignored.
    The integer keys of ``int_to_label`` are stored as strings because JSON
    object keys must be strings (load_meta converts them back).
    """
    payload = {
        key: meta[key]
        for key in (
            "feature_names",
            "numeric_features",
            "categorical_levels",
            "label_to_int",
        )
    }
    payload["int_to_label"] = {
        str(class_id): label for class_id, label in meta["int_to_label"].items()
    }
    payload["topology_aggregation"] = meta["topology_aggregation"]

    with open(path, "w") as handle:
        json.dump(payload, handle, indent=2)
|
|
|
|
def load_meta(path: str | Path) -> dict[str, Any]:
    """Read a meta JSON file and return it as a dict.

    The keys of ``int_to_label`` are converted from the strings JSON stores
    back to ints; every other entry is returned as parsed.
    """
    with open(path) as handle:
        loaded = json.load(handle)

    restored: dict[int, Any] = {}
    for raw_key, label in loaded["int_to_label"].items():
        restored[int(raw_key)] = label
    loaded["int_to_label"] = restored
    return loaded
|
|
|
|
def build_segment_lookup(topology_path: str | Path) -> dict[str, dict]:
    """Map each segment_id to its aggregated ``seg_*`` feature values.

    Reads the topology CSV at *topology_path*, aggregates it per segment and
    returns ``{segment_id: {feature_name: value, ...}}``. Each inner dict has
    the shape transform_single accepts as ``segment_aggregates``.
    """
    per_segment = _aggregate_topology(pd.read_csv(topology_path))

    lookup: dict[str, dict] = {}
    for _, seg_row in per_segment.iterrows():
        lookup[seg_row["segment_id"]] = {
            name: value for name, value in seg_row.items() if name != "segment_id"
        }
    return lookup
|
|
|
|
if __name__ == "__main__":
    # Smoke run: build the feature matrix and print a short summary.
    import sys

    # Data directory: first CLI argument when given, else the default path.
    cli_args = sys.argv[1:]
    base = Path(cli_args[0]) if cli_args else Path("/mnt/user-data/uploads")

    events_csv = base / "attack_events.csv"
    topology_csv = base / "network_topology.csv"
    X, y, groups, meta = build_features(events_csv, topology_csv)

    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    n_campaigns = groups.nunique()
    print(f"groups: {n_campaigns} campaigns")
    n_features = len(meta["feature_names"])
    print(f"n features: {n_features}")
    label_counts = y.map(INT_TO_LABEL).value_counts()
    print(f"label distribution:\n{label_counts}")
    any_missing = X.isnull().any().any()
    print(f"X has NaN: {any_missing}")
|
|