cyb011-baseline-classifier / feature_engineering.py
pradeep-xpert's picture
Initial release: attack_phase 7-class baseline + 6-oracle-path leakage diagnostic + missing tier note
03d64e5 verified
"""
feature_engineering.py
======================
Feature pipeline for the CYB011 baseline classifier.
Predicts `attack_phase` (7-class adversarial attack phase) from
per-timestep features on the CYB011 sample dataset.
CSV inputs:
attack_trajectories.csv (primary, per-timestep, 14,000 events)
network_topology.csv (per-segment registry, joined for defender
context features)
campaign_summary.csv (per-campaign summaries; reserved)
campaign_events.csv (discrete event log; reserved)
Target classes (7):
reconnaissance, feature_space_probe, perturbation_craft,
evasion_attempt, feedback_adaptation, campaign_consolidation,
idle_dwell
The CYB011 README describes a "6-phase adversarial state machine" but
the sample data has 7 phases — it adds `idle_dwell` (18% of events,
the second-largest class).
Group structure
---------------
200 campaigns x 70 timesteps = 14,000 events. Each campaign is a
sequential evasion attempt; events from the same campaign share
attacker, target segment, and tier. Group-aware splitting by
`campaign_id` (~30 test campaigns per fold) prevents train/test
contamination.
Leakage audit
-------------
Three columns dropped from features because they're outcome leaks
for `attack_phase`:
1. `detection_outcome` (4-class categorical):
- `evasion_success` / `marginal_alert` / `high_confidence_alert`
ALL → 100% `evasion_attempt` phase
- `suppressed_alert` → can be any of the 7 phases
So detection_outcome != suppressed_alert is a perfect oracle for
evasion_attempt.
2. `detector_confidence_score`: deterministically derives detection
outcome via threshold boundaries (< 0.25 -> evasion_success,
[0.52, 0.78] -> marginal, >= 0.78 -> high_confidence). Same
leakage as detection_outcome.
3. `evasion_budget_consumed`: == 0 for 100% of {reconnaissance,
feature_space_probe, perturbation_craft} events. > 0 for the
other 4 phases. Perfect oracle for the 3 early phases.
KEPT as a legitimate observable:
- `timestep` is the per-event position in the campaign lifecycle.
It correlates with phase (reconnaissance is always early,
campaign_consolidation is always late) but is NOT a label-encoding
oracle — it's a real progress observable that a defender would have
at decision time. Adding +9pp accuracy when included is honest signal.
KEPT as a defender-context observable:
- `defender_architecture`, `detection_strength`, `adversarial_robustness`,
`ensemble_size`, `alert_threshold`, `detection_coverage`,
`feature_space_dim`, `retraining_cadence_days`, `trust_level`: all
per-segment topology features. They are deterministic per segment
(each topology row uniquely fingerprints its segment), but the
segment itself is real context — a defender knows its own
architecture. These features are NOT oracles for attack_phase (they
predict defender_architecture trivially, but defender_architecture
isn't our target).
Public API
----------
build_features(trajectories_path, topology_path)
-> (X, y, ids, groups, meta)
transform_single(record, meta, segment_lookup=None) -> np.ndarray
save_meta(meta, path) / load_meta(path)
build_segment_lookup(topology_path) -> dict
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Ordered by attack lifecycle progression.
LABEL_ORDER = [
"reconnaissance",
"feature_space_probe",
"perturbation_craft",
"evasion_attempt",
"feedback_adaptation",
"campaign_consolidation",
"idle_dwell",
]
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}
# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
ID_COLUMNS = [
"campaign_id", "attacker_id",
"target_segment_id", "segment_id", "detector_id",
]
TARGET_COLUMN = "attack_phase"
GROUP_COLUMN = "campaign_id"
# Outcome leaks dropped from features.
ORACLE_COLUMNS = [
"detection_outcome", # !=suppressed -> 100% evasion_attempt
"detector_confidence_score",# threshold-derived from detection_outcome
"evasion_budget_consumed", # ==0 -> 100% one of 3 early phases
]
# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------
EVENT_NUMERIC_FEATURES = [
"timestep", # kept: legitimate campaign-progress observable
"perturbation_magnitude",
"feature_delta_l2_norm",
"feature_delta_linf_norm",
"query_count_cumulative",
]
EVENT_CATEGORICAL_FEATURES = [
"attacker_capability_tier", # 3 values in sample (script_kiddie, opportunistic, APT)
]
# ---------------------------------------------------------------------------
# Segment / topology features (joined on target_segment_id)
# ---------------------------------------------------------------------------
SEGMENT_NUMERIC_FEATURES = [
"trust_level",
"detection_coverage",
"feature_space_dim",
"alert_threshold",
"retraining_cadence_days",
"ensemble_size",
"detection_strength",
"adversarial_robustness",
]
SEGMENT_CATEGORICAL_FEATURES = [
"segment_type", # 8 values
"defender_architecture", # 8 values
]
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Five engineered features encoding phase-discriminative hypotheses.
"""
df = df.copy()
# 1. Campaign progress fraction (timestep / 70). Normalizes the
# position-in-lifecycle signal.
if "timestep" in df.columns:
df["progress_frac"] = (df["timestep"] / 70.0).astype(float)
else:
df["progress_frac"] = 0.0
# 2. Log query intensity. Queries are heavy-tailed; some phases
# (reconnaissance, idle_dwell) have ~0 queries while
# evasion_attempt cumulates many.
df["log_queries"] = np.log1p(
df.get("query_count_cumulative", 0).clip(lower=0)
).astype(float)
# 3. Perturbation intensity: max(L2, Linf). Captures whether the
# attacker is actively perturbing inputs.
if "feature_delta_l2_norm" in df.columns and "feature_delta_linf_norm" in df.columns:
df["perturb_intensity"] = np.maximum(
df["feature_delta_l2_norm"].fillna(0),
df["feature_delta_linf_norm"].fillna(0),
).astype(float)
else:
df["perturb_intensity"] = 0.0
# 4. Defender weakness composite: low detection_strength + low
# adversarial_robustness = more evadable defender. Some phases
# (evasion_attempt) cluster on weaker defenders.
if "detection_strength" in df.columns and "adversarial_robustness" in df.columns:
df["defender_weakness"] = (
(1 - df["detection_strength"].fillna(0.5))
* (1 - df["adversarial_robustness"].fillna(0.5))
).astype(float)
else:
df["defender_weakness"] = 0.0
# 5. Query-per-timestep rate: indicates active probing vs idling.
if "query_count_cumulative" in df.columns and "timestep" in df.columns:
df["query_rate"] = (
df["query_count_cumulative"] / df["timestep"].clip(lower=1)
).astype(float)
else:
df["query_rate"] = 0.0
return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
trajectories_path: str | Path,
topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, pd.Series, dict[str, Any]]:
"""
Load attack_trajectories.csv, join network_topology.csv, drop
target + identifiers + oracle columns, engineer features, one-hot
encode, return (X, y, ids, groups, meta).
"""
traj = pd.read_csv(trajectories_path)
topo = pd.read_csv(topology_path)
y = traj[TARGET_COLUMN].map(LABEL_TO_INT)
if y.isna().any():
bad = traj.loc[y.isna(), TARGET_COLUMN].unique()
raise ValueError(f"Unknown attack_phase values: {bad}")
y = y.astype(int)
ids = (
traj["campaign_id"].astype(str)
+ ":t"
+ traj["timestep"].astype(str)
)
groups = traj[GROUP_COLUMN].copy()
topo_cols_needed = (
["segment_id"]
+ SEGMENT_NUMERIC_FEATURES
+ SEGMENT_CATEGORICAL_FEATURES
)
traj = traj.merge(
topo[topo_cols_needed],
left_on="target_segment_id", right_on="segment_id",
how="left",
)
traj = _add_engineered_features(traj)
traj = traj.drop(
columns=ID_COLUMNS + [TARGET_COLUMN] + ORACLE_COLUMNS,
errors="ignore",
)
numeric_features = (
EVENT_NUMERIC_FEATURES
+ SEGMENT_NUMERIC_FEATURES
+ [
"progress_frac", "log_queries", "perturb_intensity",
"defender_weakness", "query_rate",
]
)
numeric_features = [c for c in numeric_features if c in traj.columns]
X_numeric = traj[numeric_features].astype(float)
all_categorical = EVENT_CATEGORICAL_FEATURES + SEGMENT_CATEGORICAL_FEATURES
categorical_levels: dict[str, list[str]] = {}
blocks: list[pd.DataFrame] = []
for col in all_categorical:
if col not in traj.columns:
continue
levels = sorted(traj[col].dropna().astype(str).unique().tolist())
categorical_levels[col] = levels
block = pd.get_dummies(
traj[col].astype(str).astype("category").cat.set_categories(levels),
prefix=col, dummy_na=False,
).astype(int)
blocks.append(block)
X = pd.concat(
[X_numeric.reset_index(drop=True)]
+ [b.reset_index(drop=True) for b in blocks],
axis=1,
).fillna(0.0)
meta = {
"feature_names": X.columns.tolist(),
"numeric_features": numeric_features,
"categorical_levels": categorical_levels,
"label_to_int": LABEL_TO_INT,
"int_to_label": INT_TO_LABEL,
"oracle_excluded": ORACLE_COLUMNS,
}
return X, y, ids, groups, meta
def transform_single(
record: dict | pd.DataFrame,
meta: dict[str, Any],
segment_lookup: dict | None = None,
) -> np.ndarray:
"""Encode a single trajectory record for inference."""
if isinstance(record, dict):
df = pd.DataFrame([record.copy()])
else:
df = record.copy()
if segment_lookup is not None and "target_segment_id" in df.columns:
seg_id = df["target_segment_id"].iloc[0]
seg_feats = segment_lookup.get(seg_id, {})
for k, v in seg_feats.items():
if k not in df.columns:
df[k] = v
df = _add_engineered_features(df)
numeric = pd.DataFrame({
col: df.get(col, pd.Series([0.0] * len(df))).astype(float).values
for col in meta["numeric_features"]
})
blocks: list[pd.DataFrame] = [numeric]
for col, levels in meta["categorical_levels"].items():
val = df.get(col, pd.Series([None] * len(df))).astype(str)
block = pd.get_dummies(
val.astype("category").cat.set_categories(levels),
prefix=col, dummy_na=False,
).astype(int)
for lvl in levels:
cname = f"{col}_{lvl}"
if cname not in block.columns:
block[cname] = 0
block = block[[f"{col}_{lvl}" for lvl in levels]]
blocks.append(block)
X = pd.concat(blocks, axis=1).fillna(0.0)
X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
serializable = {
"feature_names": meta["feature_names"],
"numeric_features": meta["numeric_features"],
"categorical_levels": meta["categorical_levels"],
"label_to_int": meta["label_to_int"],
"int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
"oracle_excluded": meta.get("oracle_excluded", []),
}
with open(path, "w") as f:
json.dump(serializable, f, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
with open(path) as f:
meta = json.load(f)
meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
return meta
def build_segment_lookup(topology_path: str | Path) -> dict[str, dict]:
"""Build {segment_id: {segment feature values}} for inference."""
topo = pd.read_csv(topology_path)
cols = SEGMENT_NUMERIC_FEATURES + SEGMENT_CATEGORICAL_FEATURES
out = {}
for _, row in topo.iterrows():
out[row["segment_id"]] = {c: row[c] for c in cols if c in topo.columns}
return out
if __name__ == "__main__":
import sys
base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
X, y, ids, groups, meta = build_features(
base / "attack_trajectories.csv",
base / "network_topology.csv",
)
print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")
print(f"groups: {groups.nunique()} unique campaigns")
print(f"n_features: {len(meta['feature_names'])}")
print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
print(f"X has NaN: {X.isnull().any().any()}")