""" feature_engineering.py ====================== Feature pipeline for the CYB009 baseline classifier. Predicts `vulnerability_class` (8-class vulnerability classification) from per-vulnerability features on the CYB009 sample dataset. CSV inputs: vuln_summary.csv (primary, one row per vulnerability, 2,638 vulnerabilities) asset_inventory.csv (per-asset registry, joined for asset context features) vulnerability_records.csv (per-timestep trajectory; reserved) vuln_lifecycle_events.csv (discrete event log; reserved) Target classes (8): auth_access_control, cryptographic_failure, information_disclosure, injection_family, logic_flaw, memory_corruption, misconfiguration, supply_chain_weakness Why this task (and why not the more obvious targets) ---------------------------------------------------- The CYB009 README lists 11 suggested use cases. We piloted every README-headline target on the sample dataset and found the sample has pervasive structural leakage that makes most targets either trivially solvable via oracle features or unlearnable after honest leakage removal: - `exploit_maturity_final` (4-class) is structurally leaky via `cvss_temporal_score_final`: CVSS v3.1 computes temporal score from base score using Exploit Code Maturity multipliers (0.91 / 0.94 / 0.97 / 1.00 for unproven / PoC / functional / weaponised), so the cvss_temporal/cvss_base ratio clusters near-deterministically per maturity tier (0.80 / 0.83 / 0.85 / 0.88 in the data). Drop cvss_temporal -> accuracy collapses from 0.74 to 0.31 (below majority 0.36). - `remediation_status` / `patch_status` / `lifecycle_phase` (per-timestep) form a tightly-coupled state machine. lifecycle_phase = `residual_risk_review` -> 100% `remediated`. `patch_status = deployed` -> 100% `remediated`. Any two of the three deterministically pin the third. - `severity_class` is 100% derived from `cvss_base_score` via CVSS v3.1 boundaries (low=0.1-3.9, medium=4.0-6.9, high=7.0-8.9, critical=9.0-10.0). 
Trivial if cvss_base included; below majority (acc 0.55 vs majority 0.51) without it. - All seven binary flags (`exploitation_occurred_flag`, `zero_day_flag`, `cisa_kev_flag`, `supply_chain_propagation_flag`, `remediation_success_flag`, `sla_compliance_flag`, `false_positive_flag`) are at-or-below majority after honest leakage removal of the event-time sentinels (`time_to_exploit_days`, `time_to_remediate_days`, `patch_lag_days`, `risk_score_composite`). See leakage_diagnostic.json. `vulnerability_class` is the only README-suggested target that learns honestly on the sample: acc 0.24, macro-F1 0.22, ROC-AUC 0.69 vs majority baseline 0.18. Modest +6pp lift over majority - the weakest baseline in the XpertSystems CYB catalog by design. The full ~487k-row product would tighten per-class signal materially. The model card frames this honestly: the strongest finding on CYB009 is the comprehensive leakage diagnostic rather than the modest classifier performance. Buyers planning CYB009 ML work should read the diagnostic first. Leakage audit ------------- Excluded as outcome leaks for this target: 1. `exploit_maturity_final` - the target's natural pair via the CVSS v3.1 temporal-score machinery. 2. Event-time sentinel oracles dropped as precaution (not directly leaky for vulnerability_class but indirectly via flag fields): `time_to_exploit_days`, `time_to_remediate_days`, `patch_lag_days`, `risk_score_composite`. 3. `cvss_temporal_score_final` excluded because of the CVSS v3.1 maturity-multiplier structural encoding. `severity_class` is KEPT as a one-hot feature because it's a derived view of `cvss_base_score` rather than the target. Binary post-hoc flags are KEPT as legitimate observables that a SOC analyst would have at decision time. They contribute modest real signal (a few pp accuracy). 
Public API ---------- build_features(vuln_summary_path, asset_inventory_path) -> (X, y, ids, meta) transform_single(record, meta, asset_lookup=None) -> np.ndarray save_meta(meta, path) / load_meta(path) build_asset_lookup(asset_inventory_path) -> dict License ------- Ships with the public model on Hugging Face under CC-BY-NC-4.0, matching the dataset license. See README.md. """ from __future__ import annotations import json from pathlib import Path from typing import Any import numpy as np import pandas as pd # --------------------------------------------------------------------------- # Label space # --------------------------------------------------------------------------- # Eight vulnerability classes from the CYB009 sample. The README claims # 10 classes but only 8 exist in the sample data. LABEL_ORDER = [ "auth_access_control", "cryptographic_failure", "information_disclosure", "injection_family", "logic_flaw", "memory_corruption", "misconfiguration", "supply_chain_weakness", ] LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)} INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()} # --------------------------------------------------------------------------- # Identifier and target columns # --------------------------------------------------------------------------- ID_COLUMNS = ["vuln_id", "asset_id", "org_id"] TARGET_COLUMN = "vulnerability_class" # Outcome-leak columns excluded from features. 
EXCLUDED_FROM_FEATURES = [
    "time_to_exploit_days",       # -1 sentinel oracle
    "time_to_remediate_days",     # 120 sentinel oracle
    "patch_lag_days",             # likely similar sentinel
    "risk_score_composite",       # computed from flag fields
    "exploit_maturity_final",     # indirect leak via CVSS temporal
    "cvss_temporal_score_final",  # near-deterministic per maturity tier
]


# ---------------------------------------------------------------------------
# Per-vulnerability numeric features
# ---------------------------------------------------------------------------

VULN_NUMERIC_FEATURES = [
    "cvss_base_score",
    "epss_score_final",
    "exploitation_occurred_flag",
    "zero_day_flag",
    "cisa_kev_flag",
    "supply_chain_propagation_flag",
    "compensating_control_flag",
    "false_positive_flag",
    "remediation_success_flag",
    "sla_compliance_flag",
]

VULN_CATEGORICAL_FEATURES = [
    "severity_class",  # 4 values; CVSS-derived but useful as feature
]


# ---------------------------------------------------------------------------
# Asset features (joined on asset_id from asset_inventory.csv)
# ---------------------------------------------------------------------------

ASSET_NUMERIC_FEATURES = [
    "scanner_coverage",
    "patch_mgmt_maturity",
    "mean_time_to_remediate_days",
    "sla_critical_days",
    "sla_high_days",
    "sla_medium_days",
    "internet_exposed_flag",
    "sbom_depth_score",
]

ASSET_CATEGORICAL_FEATURES = [
    "asset_type",        # 12 values
    "criticality_tier",  # 4 values
    "environment_type",  # 8 values
    "os_family",         # 6 values
]


# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------

# Flags that are summed into `risk_flag_count`.
_RISK_FLAG_COLUMNS = (
    "exploitation_occurred_flag",
    "zero_day_flag",
    "cisa_kev_flag",
    "supply_chain_propagation_flag",
    "compensating_control_flag",
    "false_positive_flag",
)

# Columns produced by `_add_engineered_features`, in feature order.
_ENGINEERED_FEATURES = (
    "log_epss",
    "is_high_cvss",
    "exposure_severity_composite",
    "risk_flag_count",
    "epss_x_base",
)


def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of *df* with five engineered columns appended.

    Requires `epss_score_final` and `cvss_base_score` (KeyError if either
    is absent). `internet_exposed_flag` and the risk flags are optional.

    Note: no temporal-CVSS-derived features (those leak via the CVSS v3.1
    exploit-code-maturity machinery).
    """
    df = df.copy()

    # Cast once up front so records built from dicts / JSON (object dtype,
    # None for missing) behave the same as numeric CSV columns.
    epss = df["epss_score_final"].astype(float)
    cvss = df["cvss_base_score"].astype(float)

    # 1. Log-scaled EPSS. EPSS is heavy-tailed.
    df["log_epss"] = np.log1p(epss.clip(lower=0)).astype(float)

    # 2. High-CVSS indicator. CVSS >= 7.0 (high or critical).
    #    A missing score compares False and therefore yields 0.
    df["is_high_cvss"] = (cvss >= 7.0).astype(int)

    # 3. Exposure x severity composite. Internet-exposed high-severity
    #    vulns are often weighted differently per class.
    if "internet_exposed_flag" in df.columns:
        exposed = df["internet_exposed_flag"].astype(float)
    else:
        exposed = 0.0
    df["exposure_severity_composite"] = (exposed * cvss).astype(float)

    # 4. Flag count: total number of risk flags raised. Different vuln
    #    classes have different baseline flag patterns. A missing flag is
    #    treated as "not raised" so that one NaN does not wipe out the
    #    flags that *are* set on the same row.
    present_flags = [c for c in _RISK_FLAG_COLUMNS if c in df.columns]
    if present_flags:
        df["risk_flag_count"] = (
            df[present_flags].astype(float).sum(axis=1, skipna=True)
        )
    else:
        df["risk_flag_count"] = 0.0

    # 5. EPSS x CVSS composite.
    df["epss_x_base"] = (epss * cvss).astype(float)

    return df


def _join_asset_features(
    vulns: pd.DataFrame,
    assets: pd.DataFrame,
) -> pd.DataFrame:
    """
    Left-join asset context columns onto *vulns* via `asset_id`.

    The result always has exactly one row per input vulnerability, in the
    original order:

    - duplicate `asset_id` rows in *assets* are collapsed to the last one
      (the same row `build_asset_lookup` ends up keeping), so the join
      cannot multiply vulnerability rows;
    - asset columns absent from *assets* are skipped;
    - asset columns that already exist on *vulns* are not joined, so the
      vulnerability's own value wins (mirrors `transform_single`) and no
      `_x` / `_y` suffixed columns appear.
    """
    wanted = ASSET_NUMERIC_FEATURES + ASSET_CATEGORICAL_FEATURES
    joinable = [
        c for c in wanted
        if c in assets.columns and c not in vulns.columns
    ]
    asset_view = assets[["asset_id"] + joinable].drop_duplicates(
        subset="asset_id", keep="last"
    )
    return vulns.merge(asset_view, on="asset_id", how="left")


def _one_hot(values: pd.Series, column: str, levels: list) -> pd.DataFrame:
    """
    One-hot encode *values* against a fixed list of *levels*.

    The result has one int column per level, named `<column>_<level>`, in
    the order of *levels*. Values that are missing or not in *levels*
    encode as an all-zero row.
    """
    categorical = values.astype("category").cat.set_categories(levels)
    block = pd.get_dummies(
        categorical, prefix=column, dummy_na=False
    ).astype(int)
    expected = [f"{column}_{lvl}" for lvl in levels]
    return block.reindex(columns=expected, fill_value=0)


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def build_features(
    vuln_summary_path: str | Path,
    asset_inventory_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Build the training matrix from the two CSV inputs.

    Loads vuln_summary.csv, joins asset_inventory.csv, drops target +
    identifiers + outcome leaks, engineers features and one-hot encodes
    the categorical columns.

    Returns:
        X:    feature frame, one row per vulnerability, NaN filled with 0.
        y:    integer class labels (see LABEL_TO_INT).
        ids:  the `vuln_id` column.
        meta: feature names, numeric feature list, categorical levels and
              label maps - everything `transform_single` needs.

    Raises:
        ValueError: if the target column holds a value not in LABEL_TO_INT
            (including missing values).
    """
    vulns = pd.read_csv(vuln_summary_path)
    assets = pd.read_csv(asset_inventory_path)

    y = vulns[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = vulns.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown vulnerability_class values: {bad}")
    y = y.astype(int)

    ids = vulns["vuln_id"].copy()

    # y and ids were taken before the join, so the join must keep the row
    # count and order unchanged; _join_asset_features guarantees that.
    vulns = _join_asset_features(vulns, assets)
    vulns = vulns.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + EXCLUDED_FROM_FEATURES,
        errors="ignore",
    )
    vulns = _add_engineered_features(vulns)

    candidates = (
        VULN_NUMERIC_FEATURES
        + ASSET_NUMERIC_FEATURES
        + list(_ENGINEERED_FEATURES)
    )
    numeric_features = [c for c in candidates if c in vulns.columns]
    X_numeric = vulns[numeric_features].astype(float)

    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in VULN_CATEGORICAL_FEATURES + ASSET_CATEGORICAL_FEATURES:
        if col not in vulns.columns:
            continue
        # Sorted so the column order is reproducible across runs.
        levels = sorted(vulns[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        blocks.append(_one_hot(vulns[col], col, levels))

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        # Copies, so mutating meta cannot alter the module-level constants.
        "label_to_int": dict(LABEL_TO_INT),
        "int_to_label": dict(INT_TO_LABEL),
        "outcome_leak_excluded": list(EXCLUDED_FROM_FEATURES),
    }
    return X, y, ids, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    asset_lookup: dict | None = None,
) -> np.ndarray:
    """
    Encode a vulnerability record for inference.

    Args:
        record: one record as a dict, or a DataFrame of one or more
            records. The input is not modified.
        meta: the dict returned by `build_features` / `load_meta`; only
            `feature_names`, `numeric_features` and `categorical_levels`
            are read.
        asset_lookup: optional `{asset_id: {feature: value}}` mapping (see
            `build_asset_lookup`). Asset values are filled in per row and
            never override a column the record already carries.

    Returns:
        float32 array of shape (n_records, len(meta["feature_names"])).
        Features missing from the record, unknown asset ids and unseen
        categorical values all encode as 0.

    Raises:
        KeyError: if `epss_score_final` or `cvss_base_score` is absent.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Reset the index so every block built below aligns row-for-row
        # even when the caller passes a slice with a non-default index.
        df = record.copy().reset_index(drop=True)

    if asset_lookup is not None and "asset_id" in df.columns:
        per_row = [asset_lookup.get(aid, {}) for aid in df["asset_id"]]
        # dict.fromkeys keeps first-seen order and removes duplicates.
        asset_keys = dict.fromkeys(k for feats in per_row for k in feats)
        for key in asset_keys:
            if key not in df.columns:
                df[key] = [feats.get(key, np.nan) for feats in per_row]

    df = _add_engineered_features(df)
    n_rows = len(df)

    numeric = pd.DataFrame(
        {
            col: (
                df[col].astype(float).to_numpy()
                if col in df.columns
                else np.zeros(n_rows, dtype=float)
            )
            for col in meta["numeric_features"]
        },
        index=df.index,
    )

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        if col in df.columns:
            values = df[col]
        else:
            values = pd.Series([None] * n_rows, index=df.index, dtype=object)
        blocks.append(_one_hot(values, col, levels))

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Final reindex pins the exact training column order.
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.to_numpy(dtype=np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write *meta* to *path* as JSON.

    `int_to_label` keys are stored as strings because JSON object keys
    must be strings; `load_meta` converts them back to int.
    """
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "outcome_leak_excluded": meta.get("outcome_leak_excluded", []),
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read a meta file written by `save_meta`, restoring int label keys."""
    with open(path, encoding="utf-8") as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


def build_asset_lookup(asset_inventory_path: str | Path) -> dict[str, dict]:
    """
    Build {asset_id: {asset feature values}} for inference-time lookup.

    Only the asset feature columns actually present in the CSV are
    included. If an `asset_id` appears more than once, the last row wins.
    """
    assets = pd.read_csv(asset_inventory_path)
    cols = [
        c for c in ASSET_NUMERIC_FEATURES + ASSET_CATEGORICAL_FEATURES
        if c in assets.columns
    ]
    # to_dict(orient="index") needs a unique index, hence the de-dup.
    unique_assets = assets.drop_duplicates(subset="asset_id", keep="last")
    return unique_assets.set_index("asset_id")[cols].to_dict(orient="index")


def _main(argv: list[str]) -> None:
    """Smoke-run the pipeline on the directory given as argv[1]."""
    base = Path(argv[1]) if len(argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, ids, meta = build_features(
        base / "vuln_summary.csv",
        base / "asset_inventory.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"n_features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")


if __name__ == "__main__":
    import sys

    _main(sys.argv)