""" feature_engineering.py ====================== Feature pipeline for the CYB011 baseline classifier. Predicts `attack_phase` (7-class adversarial attack phase) from per-timestep features on the CYB011 sample dataset. CSV inputs: attack_trajectories.csv (primary, per-timestep, 14,000 events) network_topology.csv (per-segment registry, joined for defender context features) campaign_summary.csv (per-campaign summaries; reserved) campaign_events.csv (discrete event log; reserved) Target classes (7): reconnaissance, feature_space_probe, perturbation_craft, evasion_attempt, feedback_adaptation, campaign_consolidation, idle_dwell The CYB011 README describes a "6-phase adversarial state machine" but the sample data has 7 phases — it adds `idle_dwell` (18% of events, the second-largest class). Group structure --------------- 200 campaigns x 70 timesteps = 14,000 events. Each campaign is a sequential evasion attempt; events from the same campaign share attacker, target segment, and tier. Group-aware splitting by `campaign_id` (~30 test campaigns per fold) prevents train/test contamination. Leakage audit ------------- Three columns dropped from features because they're outcome leaks for `attack_phase`: 1. `detection_outcome` (4-class categorical): - `evasion_success` / `marginal_alert` / `high_confidence_alert` ALL → 100% `evasion_attempt` phase - `suppressed_alert` → can be any of the 7 phases So detection_outcome != suppressed_alert is a perfect oracle for evasion_attempt. 2. `detector_confidence_score`: deterministically derives detection outcome via threshold boundaries (< 0.25 -> evasion_success, [0.52, 0.78] -> marginal, >= 0.78 -> high_confidence). Same leakage as detection_outcome. 3. `evasion_budget_consumed`: == 0 for 100% of {reconnaissance, feature_space_probe, perturbation_craft} events. > 0 for the other 4 phases. Perfect oracle for the 3 early phases. KEPT as a legitimate observable: - `timestep` is the per-event position in the campaign lifecycle. 
It correlates with phase (reconnaissance is always early, campaign_consolidation is always late) but is NOT a label-encoding oracle — it's a real progress observable that a defender would have at decision time. Adding +9pp accuracy when included is honest signal. KEPT as a defender-context observable: - `defender_architecture`, `detection_strength`, `adversarial_robustness`, `ensemble_size`, `alert_threshold`, `detection_coverage`, `feature_space_dim`, `retraining_cadence_days`, `trust_level`: all per-segment topology features. They are deterministic per segment (each topology row uniquely fingerprints its segment), but the segment itself is real context — a defender knows its own architecture. These features are NOT oracles for attack_phase (they predict defender_architecture trivially, but defender_architecture isn't our target). Public API ---------- build_features(trajectories_path, topology_path) -> (X, y, ids, groups, meta) transform_single(record, meta, segment_lookup=None) -> np.ndarray save_meta(meta, path) / load_meta(path) build_segment_lookup(topology_path) -> dict License ------- Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching the dataset license. See README.md. """ from __future__ import annotations import json from pathlib import Path from typing import Any import numpy as np import pandas as pd # --------------------------------------------------------------------------- # Label space # --------------------------------------------------------------------------- # Ordered by attack lifecycle progression. 
LABEL_ORDER = [
    "reconnaissance",
    "feature_space_probe",
    "perturbation_craft",
    "evasion_attempt",
    "feedback_adaptation",
    "campaign_consolidation",
    "idle_dwell",
]
# Integer encoding follows the position in LABEL_ORDER; INT_TO_LABEL is its
# exact inverse.
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}


# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------

# Never used as model inputs; dropped before the feature matrix is assembled.
ID_COLUMNS = [
    "campaign_id",
    "attacker_id",
    "target_segment_id",
    "segment_id",
    "detector_id",
]

TARGET_COLUMN = "attack_phase"
GROUP_COLUMN = "campaign_id"

# Outcome leaks dropped from features.
ORACLE_COLUMNS = [
    "detection_outcome",          # != suppressed_alert -> 100% evasion_attempt
    "detector_confidence_score",  # thresholds on it determine detection_outcome
    "evasion_budget_consumed",    # == 0 -> 100% one of 3 early phases
]


# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------

EVENT_NUMERIC_FEATURES = [
    "timestep",  # kept: legitimate campaign-progress observable
    "perturbation_magnitude",
    "feature_delta_l2_norm",
    "feature_delta_linf_norm",
    "query_count_cumulative",
]

EVENT_CATEGORICAL_FEATURES = [
    "attacker_capability_tier",  # 3 values in sample (script_kiddie, opportunistic, APT)
]


# ---------------------------------------------------------------------------
# Segment / topology features (joined on target_segment_id)
# ---------------------------------------------------------------------------

SEGMENT_NUMERIC_FEATURES = [
    "trust_level",
    "detection_coverage",
    "feature_space_dim",
    "alert_threshold",
    "retraining_cadence_days",
    "ensemble_size",
    "detection_strength",
    "adversarial_robustness",
]

SEGMENT_CATEGORICAL_FEATURES = [
    "segment_type",           # 8 values
    "defender_architecture",  # 8 values
]


# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------

# Number of timesteps per campaign in the sample data (200 campaigns x 70
# timesteps); used to normalise `timestep` into a progress fraction.
_TIMESTEPS_PER_CAMPAIGN = 70.0


def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of *df* with five engineered columns appended.

    Each feature encodes a phase-discriminative hypothesis. When a source
    column is missing the feature is set to the constant 0.0, so the helper
    also works on sparse single-record frames at inference time.

    Added columns: ``progress_frac``, ``log_queries``, ``perturb_intensity``,
    ``defender_weakness``, ``query_rate``. The input frame is not modified.
    """
    df = df.copy()

    # 1. Campaign progress fraction (timestep / 70). Normalizes the
    #    position-in-lifecycle signal.
    if "timestep" in df.columns:
        df["progress_frac"] = (
            df["timestep"] / _TIMESTEPS_PER_CAMPAIGN
        ).astype(float)
    else:
        df["progress_frac"] = 0.0

    # 2. Log query intensity. Queries are heavy-tailed; some phases
    #    (reconnaissance, idle_dwell) have ~0 queries while
    #    evasion_attempt cumulates many.
    #    Guarded like the other features: a missing column must fall back to
    #    0.0 rather than calling `.clip` on a scalar default.
    if "query_count_cumulative" in df.columns:
        df["log_queries"] = np.log1p(
            df["query_count_cumulative"].clip(lower=0)
        ).astype(float)
    else:
        df["log_queries"] = 0.0

    # 3. Perturbation intensity: max(L2, Linf). Captures whether the
    #    attacker is actively perturbing inputs.
    if (
        "feature_delta_l2_norm" in df.columns
        and "feature_delta_linf_norm" in df.columns
    ):
        df["perturb_intensity"] = np.maximum(
            df["feature_delta_l2_norm"].fillna(0),
            df["feature_delta_linf_norm"].fillna(0),
        ).astype(float)
    else:
        df["perturb_intensity"] = 0.0

    # 4. Defender weakness composite: low detection_strength + low
    #    adversarial_robustness = more evadable defender. Some phases
    #    (evasion_attempt) cluster on weaker defenders.
    if (
        "detection_strength" in df.columns
        and "adversarial_robustness" in df.columns
    ):
        df["defender_weakness"] = (
            (1 - df["detection_strength"].fillna(0.5))
            * (1 - df["adversarial_robustness"].fillna(0.5))
        ).astype(float)
    else:
        df["defender_weakness"] = 0.0

    # 5. Query-per-timestep rate: indicates active probing vs idling.
    #    The divisor is clipped to >= 1 so timestep 0 cannot divide by zero.
    if "query_count_cumulative" in df.columns and "timestep" in df.columns:
        df["query_rate"] = (
            df["query_count_cumulative"] / df["timestep"].clip(lower=1)
        ).astype(float)
    else:
        df["query_rate"] = 0.0

    return df


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def build_features(
    trajectories_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, pd.Series, dict[str, Any]]:
    """
    Build the training feature matrix from the trajectory and topology CSVs.

    Loads attack_trajectories.csv, left-joins network_topology.csv on
    ``target_segment_id == segment_id``, engineers features, drops the
    target, identifier and oracle columns, and one-hot encodes the
    categorical features.

    Returns:
        ``(X, y, ids, groups, meta)`` where
        - ``X`` is the all-numeric feature frame (missing values filled with 0),
        - ``y`` is the integer-encoded ``attack_phase``,
        - ``ids`` are ``"<campaign_id>:t<timestep>"`` row identifiers,
        - ``groups`` is the ``campaign_id`` column for group-aware splitting,
        - ``meta`` records feature names, numeric features, categorical
          levels, the label maps and the excluded oracle columns (the
          information ``transform_single`` needs at inference time).

    Raises:
        ValueError: if ``attack_phase`` contains a value outside LABEL_ORDER.
        pandas.errors.MergeError: if the topology file lists a ``segment_id``
            more than once (the join would duplicate trajectory rows and
            misalign ``X`` with ``y``/``ids``/``groups``).
    """
    traj = pd.read_csv(trajectories_path)
    topo = pd.read_csv(topology_path)

    y = traj[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = traj.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown attack_phase values: {bad}")
    y = y.astype(int)

    ids = (
        traj["campaign_id"].astype(str)
        + ":t"
        + traj["timestep"].astype(str)
    )
    groups = traj[GROUP_COLUMN].copy()

    topo_cols_needed = (
        ["segment_id"] + SEGMENT_NUMERIC_FEATURES + SEGMENT_CATEGORICAL_FEATURES
    )
    # many_to_one: each trajectory row must match at most one topology row,
    # otherwise the row count of X would silently diverge from y.
    traj = traj.merge(
        topo[topo_cols_needed],
        left_on="target_segment_id",
        right_on="segment_id",
        how="left",
        validate="many_to_one",
    )

    traj = _add_engineered_features(traj)
    traj = traj.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + ORACLE_COLUMNS,
        errors="ignore",
    )

    numeric_features = (
        EVENT_NUMERIC_FEATURES
        + SEGMENT_NUMERIC_FEATURES
        + [
            "progress_frac",
            "log_queries",
            "perturb_intensity",
            "defender_weakness",
            "query_rate",
        ]
    )
    # Tolerate CSVs that lack some of the optional raw columns.
    numeric_features = [c for c in numeric_features if c in traj.columns]
    X_numeric = traj[numeric_features].astype(float)

    all_categorical = EVENT_CATEGORICAL_FEATURES + SEGMENT_CATEGORICAL_FEATURES
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in all_categorical:
        if col not in traj.columns:
            continue
        # Levels are fixed from the training data (sorted for a stable column
        # order); missing values get no level and encode as all zeros.
        levels = sorted(traj[col].dropna().astype(str).unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            traj[col].astype(str).astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "oracle_excluded": ORACLE_COLUMNS,
    }
    return X, y, ids, groups, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    segment_lookup: dict | None = None,
) -> np.ndarray:
    """
    Encode one trajectory record (or a frame of records) for inference.

    Args:
        record: a dict for a single event, or a DataFrame with one row per
            event. The input is not modified.
        meta: the metadata produced by ``build_features`` (or reloaded with
            ``load_meta``); its ``feature_names``, ``numeric_features`` and
            ``categorical_levels`` entries define the output layout.
        segment_lookup: optional ``{segment_id: {feature: value}}`` mapping,
            e.g. from ``build_segment_lookup``. When given and the record has
            a ``target_segment_id`` column, each row is enriched with the
            features of its own segment; columns already present in the
            record are never overwritten.

    Returns:
        A float32 array of shape ``(n_rows, len(meta["feature_names"]))``.
        Missing numeric inputs and unseen categorical values encode as 0.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Use a positional index so every block built below lines up
        # row-for-row regardless of the caller's index.
        df = record.copy().reset_index(drop=True)

    if segment_lookup is not None and "target_segment_id" in df.columns:
        # Resolve the segment features row by row, not from the first row only.
        per_row = [
            segment_lookup.get(seg_id, {}) for seg_id in df["target_segment_id"]
        ]
        # dict.fromkeys keeps first-seen order while removing duplicates.
        for key in dict.fromkeys(k for feats in per_row for k in feats):
            if key not in df.columns:
                df[key] = [feats.get(key, np.nan) for feats in per_row]

    df = _add_engineered_features(df)

    numeric = pd.DataFrame(
        {
            col: (
                df[col].astype(float).to_numpy()
                if col in df.columns
                else np.zeros(len(df), dtype=float)
            )
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        if col in df.columns:
            raw = df[col]
        else:
            raw = pd.Series([None] * len(df), index=df.index)
        # Values outside the training levels become NaN categories and
        # therefore encode as an all-zero row.
        dummies = pd.get_dummies(
            raw.astype(str).astype("category").cat.set_categories(levels),
            prefix=col,
            dummy_na=False,
        ).astype(int)
        expected = [f"{col}_{lvl}" for lvl in levels]
        blocks.append(dummies.reindex(columns=expected, fill_value=0))

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Final column order and set come from training, whatever was assembled.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write the feature metadata to *path* as indented JSON.

    ``int_to_label`` keys are converted to strings because JSON object keys
    must be strings; ``load_meta`` converts them back.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "oracle_excluded": meta.get("oracle_excluded", []),
    }
    # Explicit encoding so the file does not depend on the platform default.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by ``save_meta``, restoring int label keys."""
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


def build_segment_lookup(topology_path: str | Path) -> dict[str, dict]:
    """
    Build ``{segment_id: {segment feature values}}`` for inference.

    Only the segment feature columns actually present in the topology CSV
    are included. If a ``segment_id`` appears more than once, the last row
    wins.
    """
    topo = pd.read_csv(topology_path)
    # Column availability does not change per row, so resolve it once.
    cols = [
        c
        for c in SEGMENT_NUMERIC_FEATURES + SEGMENT_CATEGORICAL_FEATURES
        if c in topo.columns
    ]
    return {
        row["segment_id"]: {c: row[c] for c in cols}
        for _, row in topo.iterrows()
    }


if __name__ == "__main__":
    import sys

    # Smoke test: build the features from a data directory and print a summary.
    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, ids, groups, meta = build_features(
        base / "attack_trajectories.csv",
        base / "network_topology.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} unique campaigns")
    print(f"n_features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")