"""
feature_engineering.py
======================

Feature pipeline for the CYB011 baseline classifier.

Predicts `attack_phase` (7-class adversarial attack phase) from
per-timestep features on the CYB011 sample dataset.

CSV inputs:
    attack_trajectories.csv  (primary, per-timestep, 14,000 events)
    network_topology.csv     (per-segment registry, joined for defender
                              context features)
    campaign_summary.csv     (per-campaign summaries; reserved)
    campaign_events.csv      (discrete event log; reserved)

Target classes (7):
    reconnaissance, feature_space_probe, perturbation_craft,
    evasion_attempt, feedback_adaptation, campaign_consolidation,
    idle_dwell

The CYB011 README describes a "6-phase adversarial state machine" but
the sample data has 7 phases: it adds `idle_dwell` (18% of events,
the second-largest class).

Group structure
---------------
200 campaigns x 70 timesteps = 14,000 events. Each campaign is a
sequential evasion attempt; events from the same campaign share
attacker, target segment, and tier. Group-aware splitting by
`campaign_id` (~30 test campaigns per fold) prevents train/test
contamination.
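
The group-aware split described above can be sketched with the standard
library alone. The helper name `group_folds`, the fold count of 7 (picked
so that 200 campaigns give 28 or 29 test campaigns per fold), and the
"C000"-style campaign ids are assumptions made for illustration, not the
project's actual split. In practice scikit-learn's `GroupKFold` is the
usual tool for this.

```python
import random


def group_folds(groups, n_folds=7, seed=0):
    # Assign each distinct group to exactly one fold, then split row
    # indices by fold so that no group straddles train and test.
    unique = sorted(set(groups))
    random.Random(seed).shuffle(unique)
    fold_of = {g: i % n_folds for i, g in enumerate(unique)}
    for k in range(n_folds):
        test_idx = [i for i, g in enumerate(groups) if fold_of[g] == k]
        train_idx = [i for i, g in enumerate(groups) if fold_of[g] != k]
        yield train_idx, test_idx


# Toy stand-in for the `groups` series: 200 campaigns x 70 timesteps.
# The campaign ids below are made up for illustration.
groups = [f"C{c:03d}" for c in range(200) for _ in range(70)]
folds = list(group_folds(groups))

for train_idx, test_idx in folds:
    train_groups = {groups[i] for i in train_idx}
    test_groups = {groups[i] for i in test_idx}
    # No campaign may appear on both sides of a fold.
    assert train_groups.isdisjoint(test_groups)
```

Splitting on row indices instead would scatter timesteps of one campaign
across train and test, which is the contamination this section warns about.
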

Leakage audit
-------------
Three columns are dropped from the features because they are outcome
leaks for `attack_phase`:

1. `detection_outcome` (4-class categorical):
     - `evasion_success` / `marginal_alert` / `high_confidence_alert`
       ALL -> 100% `evasion_attempt` phase
     - `suppressed_alert` -> can be any of the 7 phases
   So `detection_outcome != suppressed_alert` identifies
   `evasion_attempt` with certainty: a one-directional but perfect
   oracle for that phase.
2. `detector_confidence_score`: deterministically derives the detection
   outcome via threshold boundaries (< 0.25 -> evasion_success,
   [0.52, 0.78) -> marginal_alert, >= 0.78 -> high_confidence_alert).
   Same leakage as `detection_outcome`.
3. `evasion_budget_consumed`: == 0 for 100% of {reconnaissance,
   feature_space_probe, perturbation_craft} events and > 0 for the
   other 4 phases. Perfect oracle for the early-phase group (it does
   not separate the 3 early phases from each other).
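
One way to run this kind of audit is to measure, for each value of a
candidate column, how concentrated the labels are. The sketch below does
that with the standard library. The helper `purity_by_value` and the six
toy rows are hypothetical and only mirror the pattern described in item 1;
they are not taken from the dataset.

```python
from collections import Counter, defaultdict


def purity_by_value(values, labels):
    # For each feature value, return (majority label, share of rows
    # carrying that label). A share of 1.0 means the value pins the label.
    counts = defaultdict(Counter)
    for value, label in zip(values, labels):
        counts[value][label] += 1
    report = {}
    for value, label_counts in counts.items():
        label, n = label_counts.most_common(1)[0]
        report[value] = (label, n / sum(label_counts.values()))
    return report


# Toy rows, invented for illustration only.
outcomes = [
    "evasion_success", "marginal_alert", "high_confidence_alert",
    "suppressed_alert", "suppressed_alert", "suppressed_alert",
]
phases = [
    "evasion_attempt", "evasion_attempt", "evasion_attempt",
    "reconnaissance", "idle_dwell", "evasion_attempt",
]

report = purity_by_value(outcomes, phases)
oracle_values = sorted(v for v, (_, share) in report.items() if share == 1.0)
```

On real data a single-row value is trivially pure, so a minimum support
count per value would be a sensible extra condition before calling a
column an oracle.
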

KEPT as a legitimate observable:
  - `timestep` is the per-event position in the campaign lifecycle.
    It correlates with phase (reconnaissance is always early,
    campaign_consolidation is always late) but is NOT a label-encoding
    oracle: it is a real progress observable that a defender would have
    at decision time. The +9pp accuracy it adds when included is honest
    signal.

KEPT as defender-context observables:
  - `defender_architecture`, `detection_strength`, `adversarial_robustness`,
    `ensemble_size`, `alert_threshold`, `detection_coverage`,
    `feature_space_dim`, `retraining_cadence_days`, `trust_level`: all
    per-segment topology features. They are deterministic per segment
    (each topology row uniquely fingerprints its segment), but the
    segment itself is real context: a defender knows its own
    architecture. These features are NOT oracles for attack_phase (they
    predict defender_architecture trivially, but defender_architecture
    isn't our target).

Public API
----------
build_features(trajectories_path, topology_path)
    -> (X, y, ids, groups, meta)
transform_single(record, meta, segment_lookup=None) -> np.ndarray
save_meta(meta, path) / load_meta(path)
build_segment_lookup(topology_path) -> dict
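
A note on the `save_meta` / `load_meta` pair: JSON object keys are always
strings, so an int-keyed mapping such as `int_to_label` does not survive a
round trip unchanged. The short sketch below shows the effect with a
two-entry toy mapping (illustrative values, not the full label space) and
the key conversion that restores it.

```python
import json

# Toy two-entry mapping standing in for the full int -> label table.
int_to_label = {0: "reconnaissance", 1: "feature_space_probe"}

# Round trip through JSON: the integer keys come back as strings.
restored = json.loads(json.dumps(int_to_label))

# Converting the keys back to int recovers the original mapping.
fixed = {int(k): v for k, v in restored.items()}
```

Without the conversion, a lookup like `restored[0]` would raise `KeyError`
because only the key `"0"` exists after loading.
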

License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Ordered by attack lifecycle progression.
LABEL_ORDER = [
    "reconnaissance",
    "feature_space_probe",
    "perturbation_craft",
    "evasion_attempt",
    "feedback_adaptation",
    "campaign_consolidation",
    "idle_dwell",
]
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}

# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
ID_COLUMNS = [
    "campaign_id", "attacker_id",
    "target_segment_id", "segment_id", "detector_id",
]
TARGET_COLUMN = "attack_phase"
GROUP_COLUMN = "campaign_id"

# Outcome leaks dropped from features.
ORACLE_COLUMNS = [
    "detection_outcome",          # != suppressed_alert -> 100% evasion_attempt
    "detector_confidence_score",  # detection_outcome is thresholded from this score
    "evasion_budget_consumed",    # == 0 -> 100% one of the 3 early phases
]

# ---------------------------------------------------------------------------
# Per-timestep numeric features
# ---------------------------------------------------------------------------
EVENT_NUMERIC_FEATURES = [
    "timestep",  # kept: legitimate campaign-progress observable
    "perturbation_magnitude",
    "feature_delta_l2_norm",
    "feature_delta_linf_norm",
    "query_count_cumulative",
]
EVENT_CATEGORICAL_FEATURES = [
    "attacker_capability_tier",  # 3 values in sample (script_kiddie, opportunistic, APT)
]

# ---------------------------------------------------------------------------
# Segment / topology features (joined on target_segment_id)
# ---------------------------------------------------------------------------
SEGMENT_NUMERIC_FEATURES = [
    "trust_level",
    "detection_coverage",
    "feature_space_dim",
    "alert_threshold",
    "retraining_cadence_days",
    "ensemble_size",
    "detection_strength",
    "adversarial_robustness",
]
SEGMENT_CATEGORICAL_FEATURES = [
    "segment_type",           # 8 values
    "defender_architecture",  # 8 values
]

# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Five engineered features encoding phase-discriminative hypotheses.
    """
    df = df.copy()

    # 1. Campaign progress fraction (timestep / 70). Normalizes the
    #    position-in-lifecycle signal.
    if "timestep" in df.columns:
        df["progress_frac"] = (df["timestep"] / 70.0).astype(float)
    else:
        df["progress_frac"] = 0.0

    # 2. Log query intensity. Queries are heavy-tailed; some phases
    #    (reconnaissance, idle_dwell) have ~0 queries while
    #    evasion_attempt accumulates many.
    #    The column check matters: df.get(col, 0) returns a plain int when
    #    the column is absent, and an int has no .clip().
    if "query_count_cumulative" in df.columns:
        df["log_queries"] = np.log1p(
            df["query_count_cumulative"].clip(lower=0)
        ).astype(float)
    else:
        df["log_queries"] = 0.0

    # 3. Perturbation intensity: max(L2, Linf). Captures whether the
    #    attacker is actively perturbing inputs.
    if "feature_delta_l2_norm" in df.columns and "feature_delta_linf_norm" in df.columns:
        df["perturb_intensity"] = np.maximum(
            df["feature_delta_l2_norm"].fillna(0),
            df["feature_delta_linf_norm"].fillna(0),
        ).astype(float)
    else:
        df["perturb_intensity"] = 0.0

    # 4. Defender weakness composite: low detection_strength + low
    #    adversarial_robustness = more evadable defender. Some phases
    #    (evasion_attempt) cluster on weaker defenders.
    if "detection_strength" in df.columns and "adversarial_robustness" in df.columns:
        df["defender_weakness"] = (
            (1 - df["detection_strength"].fillna(0.5))
            * (1 - df["adversarial_robustness"].fillna(0.5))
        ).astype(float)
    else:
        df["defender_weakness"] = 0.0

    # 5. Query-per-timestep rate: indicates active probing vs idling.
    if "query_count_cumulative" in df.columns and "timestep" in df.columns:
        df["query_rate"] = (
            df["query_count_cumulative"] / df["timestep"].clip(lower=1)
        ).astype(float)
    else:
        df["query_rate"] = 0.0

    return df

# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    trajectories_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, pd.Series, dict[str, Any]]:
    """
    Load attack_trajectories.csv, join network_topology.csv, drop
    target + identifiers + oracle columns, engineer features, one-hot
    encode, return (X, y, ids, groups, meta).
    """
    traj = pd.read_csv(trajectories_path)
    topo = pd.read_csv(topology_path)

    y = traj[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = traj.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown attack_phase values: {bad}")
    y = y.astype(int)

    ids = (
        traj["campaign_id"].astype(str)
        + ":t"
        + traj["timestep"].astype(str)
    )
    groups = traj[GROUP_COLUMN].copy()

    topo_cols_needed = (
        ["segment_id"]
        + SEGMENT_NUMERIC_FEATURES
        + SEGMENT_CATEGORICAL_FEATURES
    )
    traj = traj.merge(
        topo[topo_cols_needed],
        left_on="target_segment_id", right_on="segment_id",
        how="left",
    )

    traj = _add_engineered_features(traj)
    traj = traj.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + ORACLE_COLUMNS,
        errors="ignore",
    )

    numeric_features = (
        EVENT_NUMERIC_FEATURES
        + SEGMENT_NUMERIC_FEATURES
        + [
            "progress_frac", "log_queries", "perturb_intensity",
            "defender_weakness", "query_rate",
        ]
    )
    numeric_features = [c for c in numeric_features if c in traj.columns]
    X_numeric = traj[numeric_features].astype(float)

    all_categorical = EVENT_CATEGORICAL_FEATURES + SEGMENT_CATEGORICAL_FEATURES
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in all_categorical:
        if col not in traj.columns:
            continue
        levels = sorted(traj[col].dropna().astype(str).unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            traj[col].astype(str).astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "oracle_excluded": ORACLE_COLUMNS,
    }
    return X, y, ids, groups, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    segment_lookup: dict | None = None,
) -> np.ndarray:
    """Encode a single trajectory record for inference."""
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Reset the index so the numeric block (built from raw values) and
        # the one-hot blocks line up row by row in the concat below.
        df = record.copy().reset_index(drop=True)

    # Fill in segment features from the lookup, without overwriting any
    # the caller already supplied.
    if segment_lookup is not None and "target_segment_id" in df.columns:
        seg_id = df["target_segment_id"].iloc[0]
        seg_feats = segment_lookup.get(seg_id, {})
        for k, v in seg_feats.items():
            if k not in df.columns:
                df[k] = v

    df = _add_engineered_features(df)

    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * len(df))).astype(float).values
        for col in meta["numeric_features"]
    })
    blocks: list[pd.DataFrame] = [numeric]

    # One-hot encode against the levels seen at training time; unseen or
    # missing values become an all-zero block.
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * len(df))).astype(str)
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        for lvl in levels:
            cname = f"{col}_{lvl}"
            if cname not in block.columns:
                block[cname] = 0
        block = block[[f"{col}_{lvl}" for lvl in levels]]
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "oracle_excluded": meta.get("oracle_excluded", []),
    }
    with open(path, "w") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    with open(path) as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


def build_segment_lookup(topology_path: str | Path) -> dict[str, dict]:
    """Build {segment_id: {segment feature values}} for inference."""
    topo = pd.read_csv(topology_path)
    cols = SEGMENT_NUMERIC_FEATURES + SEGMENT_CATEGORICAL_FEATURES
    out = {}
    for _, row in topo.iterrows():
        out[row["segment_id"]] = {c: row[c] for c in cols if c in topo.columns}
    return out


if __name__ == "__main__":
    import sys

    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, ids, groups, meta = build_features(
        base / "attack_trajectories.csv",
        base / "network_topology.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"groups: {groups.nunique()} unique campaigns")
    print(f"n_features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")