cyb002-baseline-classifier / feature_engineering.py
pradeep-xpert's picture
Initial release: XGBoost + MLP for ATT&CK phase classification
146a3a4 verified
"""
feature_engineering.py
======================
Feature pipeline for the CYB002 baseline classifier.
Predicts `kill_chain_phase` (10-class) from event + segment-level
observables on the CYB002 sample dataset.
CSV inputs:
attack_events.csv (primary, one row per timestep-level action)
network_topology.csv (asset-level inventory; aggregated to segment
level before joining on target_segment_id)
campaign_summary.csv (reserved for future work, not used in v1)
campaign_events.csv (reserved for future work, not used in v1)
Target classes:
dwell_idle, reconnaissance, initial_access, execution, persistence,
privilege_escalation, lateral_movement, collection, exfiltration, impact
This corresponds to the README's first listed use case: predicting the
next ATT&CK phase from observable features. The challenge is that three
fields perfectly determine phase by construction:
- technique_id -> 62 of 63 techniques map 1:1 to a single phase
- technique_name -> 1:1 with technique_id
- tactic_category -> direct alias of phase
These are dropped before feature assembly. Phase is predicted from:
timestep position (recon mean=6, impact mean=66), target asset type,
protocol/port, byte volumes, connection duration, auth-failure count,
process-injection / lateral-hop counts, attacker tier vs defender
maturity, and segment-level topology aggregates.
Public API
----------
build_features(attack_events_path, topology_path,
campaign_summary_path=None) -> (X, y, groups, meta)
transform_single(record, meta, segment_aggregates=None) -> np.ndarray
save_meta(meta, path) / load_meta(path)
build_segment_lookup(topology_path) -> dict
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching
the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# The 10 phases observed in the sample. dwell_idle is a no-op step
# between actions; technique_id=T0000, tactic_category=NaN. Ordering
# follows tactic flow for readability; CE-loss doesn't care.
LABEL_ORDER = [
"dwell_idle",
"reconnaissance",
"initial_access",
"execution",
"persistence",
"privilege_escalation",
"lateral_movement",
"collection",
"exfiltration",
"impact",
]
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}
# ---------------------------------------------------------------------------
# Columns dropped because they leak the target (kill_chain_phase)
# ---------------------------------------------------------------------------
# `technique_id`: 62 of 63 ATT&CK techniques map 1:1 to a single phase.
# T1078 Valid Accounts is the one shared technique (appears in both
# initial_access and persistence, which is correct ATT&CK behavior).
# Including technique_id as a feature is effectively label memorization.
#
# `technique_name`: 1:1 alias of technique_id (63 unique values each).
#
# `tactic_category`: direct alias of kill_chain_phase; the two columns
# carry identical information except tactic_category is null for
# dwell_idle steps. Drop.
LEAKY_COLUMNS = [
"technique_id",
"technique_name",
"tactic_category",
]
# ---------------------------------------------------------------------------
# Columns kept as features
# ---------------------------------------------------------------------------
DIRECT_NUMERIC_EVENT_FEATURES = [
"timestep", # strong signal: recon mean=6, impact mean=66
"dest_port",
"bytes_transferred",
"connection_duration_s",
"auth_failure_count",
"process_injection_flag",
"lateral_hop_count",
"c2_beacon_interval_s", # null-aware; filled with -1 + has_c2_beacon flag
# Detection-related fields. These are POST-HOC observables from the
# SOC's perspective. We keep them as features because in the realistic
# phase-prediction use case, a SOC analyst has just seen an action and
# its initial detection outcome, and is trying to reason about which
# phase the campaign is in. Buyers who want a strictly pre-detection
# model can drop these four columns and retrain.
"edr_blocked_flag",
"siem_rule_triggered",
]
CATEGORICAL_EVENT_FEATURES = [
"target_asset_type",
"source_ip_class",
"protocol",
"attacker_capability_tier",
"defender_maturity_level",
"alert_severity", # critical / high / medium / low / informational
"detection_outcome", # see note above re: post-hoc observables
]
ID_COLUMNS = ["campaign_id", "attacker_id"]
# ---------------------------------------------------------------------------
# Topology aggregation
# ---------------------------------------------------------------------------
#
# network_topology.csv is ASSET-LEVEL (651 rows, 12 segments, ~54 assets
# per segment). Direct join would explode rows. Aggregate to segment level:
# constant fields as-is, numeric fields mean/max as appropriate, 0/1 flags
# as fraction-with-coverage.
SEGMENT_CONSTANT_TOPO_COLS = ["segment_type", "defender_maturity_level"]
SEGMENT_NUMERIC_AGGREGATES = {
"patch_lag_days": "mean",
"exposure_score": "mean",
"vulnerability_count": "max", # worst-case asset matters more
"inter_segment_trust_level": "mean",
"alert_threshold_sensitivity": "mean",
"mttd_baseline_hours": "mean",
"mttr_baseline_hours": "mean",
"siem_coverage_flag": "mean", # fraction with SIEM
"edr_deployed_flag": "mean", # fraction with EDR
"ndr_coverage_flag": "mean",
"mfa_enforced_flag": "mean",
}
def _aggregate_topology(topology: pd.DataFrame) -> pd.DataFrame:
"""Collapse asset-level topology to one row per segment."""
parts = []
for col in SEGMENT_CONSTANT_TOPO_COLS:
parts.append(topology.groupby("segment_id")[col].first().rename(f"seg_{col}"))
for col, agg in SEGMENT_NUMERIC_AGGREGATES.items():
parts.append(topology.groupby("segment_id")[col].agg(agg).rename(f"seg_{col}_{agg}"))
return pd.concat(parts, axis=1).reset_index()
TOPOLOGY_FEATURE_NAMES_NUMERIC = [
f"seg_{col}_{agg}" for col, agg in SEGMENT_NUMERIC_AGGREGATES.items()
]
TOPOLOGY_FEATURE_NAMES_CATEGORICAL = [f"seg_{col}" for col in SEGMENT_CONSTANT_TOPO_COLS]
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
#
# Important: NO phase-derived engineered features. is_dwell_idle,
# is_high_severity_phase, phase_order_index would all be oracles when
# phase is the target. Six features instead, each a stated hypothesis
# about phase-discriminative signal in pre-phase observables.
TIER_RANK = {"script_kiddie": 1, "opportunistic": 2, "apt": 3, "nation_state": 4}
DEFENDER_RANK = {"minimal": 1, "baseline": 2, "managed": 3, "advanced": 4, "zero_trust": 5}
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
"""Six engineered features, no phase-derived oracles."""
df = df.copy()
# 1. Byte volume on log scale. Heavy-tailed across phases: recon
# transfers tend to be bytes; exfiltration megabytes. log1p tames
# the tail and gives both XGBoost and the MLP a usable feature.
df["byte_volume_log"] = np.log1p(df["bytes_transferred"].clip(lower=0)).astype(float)
# 2. C2 beacon presence. c2_beacon_interval_s is null for non-C2
# actions. Encode presence as a binary flag and fill the value
# column with -1 so it stays usable.
df["has_c2_beacon"] = df["c2_beacon_interval_s"].notna().astype(int)
df["c2_beacon_interval_s"] = df["c2_beacon_interval_s"].fillna(-1.0)
# 3. Brute-force indicator. auth_failure_count > 0 distinguishes
# credential-stuffing style actions from authenticated-path
# actions; loads differently into early phases.
df["is_brute_forcing"] = (df["auth_failure_count"] > 0).astype(int)
# 4. Attacker vs defender advantage. Positive when attacker outclasses
# defender; influences which phases an attacker can reach.
tier_r = df["attacker_capability_tier"].map(TIER_RANK).fillna(2).astype(int)
def_r = df["defender_maturity_level"].map(DEFENDER_RANK).fillna(2).astype(int)
df["attacker_defender_advantage"] = (tier_r - def_r).astype(int)
# 5. High-volume action indicator. Simple binary above 100 KB,
# correlates with collection / exfiltration phases.
df["is_high_volume"] = (df["bytes_transferred"] > 100_000).astype(int)
# 6. Privileged-port indicator. dest_port < 1024, typically system
# services; common in initial-access and lateral-movement actions.
df["is_privileged_port"] = (df["dest_port"] < 1024).astype(int)
return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
attack_events_path: str | Path,
topology_path: str | Path,
campaign_summary_path: str | Path | None = None,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
"""
Load CSVs, aggregate topology, drop leaky columns, engineer features,
one-hot encode, return (X, y, groups, meta).
`groups` is a Series of campaign_id values aligned with X for
GroupShuffleSplit / GroupKFold use. A single campaign generates ~40
correlated events; row-level random splitting inflates metrics.
"""
events = pd.read_csv(attack_events_path)
topology = pd.read_csv(topology_path)
events = events.drop(columns=LEAKY_COLUMNS, errors="ignore")
topo_agg = _aggregate_topology(topology)
events = events.merge(
topo_agg, left_on="target_segment_id", right_on="segment_id", how="left",
).drop(columns=["segment_id"], errors="ignore")
y = events["kill_chain_phase"].map(LABEL_TO_INT)
if y.isna().any():
bad = events.loc[y.isna(), "kill_chain_phase"].unique()
raise ValueError(f"Unknown kill_chain_phase values: {bad}")
y = y.astype(int)
groups = events["campaign_id"].copy()
events = _add_engineered_features(events)
numeric_features = (
DIRECT_NUMERIC_EVENT_FEATURES
+ TOPOLOGY_FEATURE_NAMES_NUMERIC
+ [
"byte_volume_log", "has_c2_beacon", "is_brute_forcing",
"attacker_defender_advantage", "is_high_volume",
"is_privileged_port",
]
)
X_numeric = events[numeric_features].astype(float)
all_categorical = (
[(col, "event") for col in CATEGORICAL_EVENT_FEATURES]
+ [(col, "topology") for col in TOPOLOGY_FEATURE_NAMES_CATEGORICAL]
)
categorical_levels: dict[str, list[str]] = {}
blocks: list[pd.DataFrame] = []
for col, _src in all_categorical:
levels = sorted(events[col].dropna().unique().tolist())
categorical_levels[col] = levels
block = pd.get_dummies(
events[col].astype("category").cat.set_categories(levels),
prefix=col, dummy_na=False,
).astype(int)
blocks.append(block)
X = pd.concat(
[X_numeric.reset_index(drop=True)]
+ [b.reset_index(drop=True) for b in blocks],
axis=1,
).fillna(0.0)
meta = {
"feature_names": X.columns.tolist(),
"numeric_features": numeric_features,
"categorical_levels": categorical_levels,
"label_to_int": LABEL_TO_INT,
"int_to_label": INT_TO_LABEL,
"topology_aggregation": {
"segment_constant": SEGMENT_CONSTANT_TOPO_COLS,
"segment_numeric_aggregates": SEGMENT_NUMERIC_AGGREGATES,
},
}
return X, y, groups, meta
def transform_single(
record: dict | pd.DataFrame,
meta: dict[str, Any],
segment_aggregates: dict | None = None,
) -> np.ndarray:
"""Encode a single event record for inference.
`record` must contain event-level fields (sans leaky columns) plus
the segment-level aggregate fields. If you only have the raw event,
pass `segment_aggregates` as a dict {seg_*: value, ...} and they'll
be merged in.
"""
if isinstance(record, dict):
df = pd.DataFrame([record.copy()])
else:
df = record.copy()
if segment_aggregates is not None:
for k, v in segment_aggregates.items():
df[k] = v
df = _add_engineered_features(df)
numeric = pd.DataFrame({
col: df.get(col, pd.Series([0.0] * len(df))).astype(float).values
for col in meta["numeric_features"]
})
blocks: list[pd.DataFrame] = [numeric]
for col, levels in meta["categorical_levels"].items():
val = df.get(col, pd.Series([None] * len(df)))
block = pd.get_dummies(
val.astype("category").cat.set_categories(levels),
prefix=col, dummy_na=False,
).astype(int)
for lvl in levels:
cname = f"{col}_{lvl}"
if cname not in block.columns:
block[cname] = 0
block = block[[f"{col}_{lvl}" for lvl in levels]]
blocks.append(block)
X = pd.concat(blocks, axis=1).fillna(0.0)
X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
serializable = {
"feature_names": meta["feature_names"],
"numeric_features": meta["numeric_features"],
"categorical_levels": meta["categorical_levels"],
"label_to_int": meta["label_to_int"],
"int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
"topology_aggregation": meta["topology_aggregation"],
}
with open(path, "w") as f:
json.dump(serializable, f, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
with open(path) as f:
meta = json.load(f)
meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
return meta
def build_segment_lookup(topology_path: str | Path) -> dict[str, dict]:
"""Build a {segment_id: {seg_* feature values}} lookup for inference."""
topology = pd.read_csv(topology_path)
agg = _aggregate_topology(topology)
return {row["segment_id"]: {k: v for k, v in row.items() if k != "segment_id"}
for _, row in agg.iterrows()}
if __name__ == "__main__":
import sys
base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
X, y, groups, meta = build_features(
base / "attack_events.csv",
base / "network_topology.csv",
)
print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")
print(f"groups: {groups.nunique()} campaigns")
print(f"n features: {len(meta['feature_names'])}")
print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
print(f"X has NaN: {X.isnull().any().any()}")