Initial release: vulnerability_class baseline + comprehensive 8-oracle-path leakage diagnostic on CYB009 sample
e520bf1 verified | """ | |
| feature_engineering.py | |
| ====================== | |
| Feature pipeline for the CYB009 baseline classifier. | |
| Predicts `vulnerability_class` (8-class vulnerability classification) | |
| from per-vulnerability features on the CYB009 sample dataset. | |
| CSV inputs: | |
| vuln_summary.csv (primary, one row per vulnerability, | |
| 2,638 vulnerabilities) | |
| asset_inventory.csv (per-asset registry, joined for asset | |
| context features) | |
| vulnerability_records.csv (per-timestep trajectory; reserved) | |
| vuln_lifecycle_events.csv (discrete event log; reserved) | |
| Target classes (8): | |
| auth_access_control, cryptographic_failure, information_disclosure, | |
| injection_family, logic_flaw, memory_corruption, misconfiguration, | |
| supply_chain_weakness | |
| Why this task (and why not the more obvious targets) | |
| ---------------------------------------------------- | |
| The CYB009 README lists 11 suggested use cases. We piloted every | |
| README-headline target on the sample dataset and found the sample | |
| has pervasive structural leakage that makes most targets either | |
| trivially solvable via oracle features or unlearnable after honest | |
| leakage removal: | |
| - `exploit_maturity_final` (4-class) is structurally leaky via | |
| `cvss_temporal_score_final`: CVSS v3.1 computes temporal score from | |
| base score using Exploit Code Maturity multipliers (0.91 / 0.94 / | |
| 0.97 / 1.00 for unproven / PoC / functional / weaponised), so the | |
| cvss_temporal/cvss_base ratio clusters near-deterministically per | |
| maturity tier (0.80 / 0.83 / 0.85 / 0.88 in the data). Drop | |
| cvss_temporal -> accuracy collapses from 0.74 to 0.31 (below | |
| majority 0.36). | |
| - `remediation_status` / `patch_status` / `lifecycle_phase` | |
| (per-timestep) form a tightly-coupled state machine. lifecycle_phase | |
| = `residual_risk_review` -> 100% `remediated`. `patch_status = | |
| deployed` -> 100% `remediated`. Any two of the three deterministically | |
| pin the third. | |
| - `severity_class` is 100% derived from `cvss_base_score` via CVSS | |
| v3.1 boundaries (low=0.1-3.9, medium=4.0-6.9, high=7.0-8.9, | |
| critical=9.0-10.0). Trivial if cvss_base included; below majority | |
| (acc 0.55 vs majority 0.51) without it. | |
| - All seven binary flags (`exploitation_occurred_flag`, `zero_day_flag`, | |
| `cisa_kev_flag`, `supply_chain_propagation_flag`, | |
| `remediation_success_flag`, `sla_compliance_flag`, | |
| `false_positive_flag`) are at-or-below majority after honest | |
| leakage removal of the event-time sentinels | |
| (`time_to_exploit_days`, `time_to_remediate_days`, `patch_lag_days`, | |
| `risk_score_composite`). See leakage_diagnostic.json. | |
| `vulnerability_class` is the only README-suggested target that learns | |
| honestly on the sample: acc 0.24, macro-F1 0.22, ROC-AUC 0.69 vs | |
| majority baseline 0.18. Modest +6pp lift over majority - the weakest | |
| baseline in the XpertSystems CYB catalog by design. The full ~487k-row | |
| product would tighten per-class signal materially. | |
| The model card frames this honestly: the strongest finding on CYB009 | |
| is the comprehensive leakage diagnostic rather than the modest | |
| classifier performance. Buyers planning CYB009 ML work should read | |
| the diagnostic first. | |
| Leakage audit | |
| ------------- | |
| Excluded as outcome leaks for this target: | |
| 1. `exploit_maturity_final` - the target's natural pair via the CVSS | |
| v3.1 temporal-score machinery. | |
| 2. Event-time sentinel oracles dropped as precaution (not directly | |
| leaky for vulnerability_class but indirectly via flag fields): | |
| `time_to_exploit_days`, `time_to_remediate_days`, `patch_lag_days`, | |
| `risk_score_composite`. | |
| 3. `cvss_temporal_score_final` excluded because of the CVSS v3.1 | |
| maturity-multiplier structural encoding. | |
| `severity_class` is KEPT as a one-hot feature because it's a derived | |
| view of `cvss_base_score` rather than the target. | |
| Binary post-hoc flags are KEPT as legitimate observables that a SOC | |
| analyst would have at decision time. They contribute modest real | |
| signal (a few pp accuracy). | |
| Public API | |
| ---------- | |
| build_features(vuln_summary_path, asset_inventory_path) | |
| -> (X, y, ids, meta) | |
| transform_single(record, meta, asset_lookup=None) -> np.ndarray | |
| save_meta(meta, path) / load_meta(path) | |
| build_asset_lookup(asset_inventory_path) -> dict | |
| License | |
| ------- | |
| Ships with the public model on Hugging Face under CC-BY-NC-4.0, | |
| matching the dataset license. See README.md. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
| # --------------------------------------------------------------------------- | |
| # Label space | |
| # --------------------------------------------------------------------------- | |
| # Eight vulnerability classes from the CYB009 sample. The README claims | |
| # 10 classes but only 8 exist in the sample data. | |
| LABEL_ORDER = [ | |
| "auth_access_control", | |
| "cryptographic_failure", | |
| "information_disclosure", | |
| "injection_family", | |
| "logic_flaw", | |
| "memory_corruption", | |
| "misconfiguration", | |
| "supply_chain_weakness", | |
| ] | |
| LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)} | |
| INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()} | |
| # --------------------------------------------------------------------------- | |
| # Identifier and target columns | |
| # --------------------------------------------------------------------------- | |
| ID_COLUMNS = ["vuln_id", "asset_id", "org_id"] | |
| TARGET_COLUMN = "vulnerability_class" | |
| # Outcome-leak columns excluded from features. | |
| EXCLUDED_FROM_FEATURES = [ | |
| "time_to_exploit_days", # -1 sentinel oracle | |
| "time_to_remediate_days", # 120 sentinel oracle | |
| "patch_lag_days", # likely similar sentinel | |
| "risk_score_composite", # computed from flag fields | |
| "exploit_maturity_final", # indirect leak via CVSS temporal | |
| "cvss_temporal_score_final", # near-deterministic per maturity tier | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Per-vulnerability numeric features | |
| # --------------------------------------------------------------------------- | |
| VULN_NUMERIC_FEATURES = [ | |
| "cvss_base_score", | |
| "epss_score_final", | |
| "exploitation_occurred_flag", | |
| "zero_day_flag", | |
| "cisa_kev_flag", | |
| "supply_chain_propagation_flag", | |
| "compensating_control_flag", | |
| "false_positive_flag", | |
| "remediation_success_flag", | |
| "sla_compliance_flag", | |
| ] | |
| VULN_CATEGORICAL_FEATURES = [ | |
| "severity_class", # 4 values; CVSS-derived but useful as feature | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Asset features (joined on asset_id from asset_inventory.csv) | |
| # --------------------------------------------------------------------------- | |
| ASSET_NUMERIC_FEATURES = [ | |
| "scanner_coverage", | |
| "patch_mgmt_maturity", | |
| "mean_time_to_remediate_days", | |
| "sla_critical_days", | |
| "sla_high_days", | |
| "sla_medium_days", | |
| "internet_exposed_flag", | |
| "sbom_depth_score", | |
| ] | |
| ASSET_CATEGORICAL_FEATURES = [ | |
| "asset_type", # 12 values | |
| "criticality_tier", # 4 values | |
| "environment_type", # 8 values | |
| "os_family", # 6 values | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Engineered features | |
| # --------------------------------------------------------------------------- | |
| def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Five engineered features for vulnerability_class discrimination. | |
| Note: no temporal-CVSS-derived features (those leak via the CVSS | |
| v3.1 exploit-code-maturity machinery). | |
| """ | |
| df = df.copy() | |
| # 1. Log-scaled EPSS. EPSS is heavy-tailed. | |
| df["log_epss"] = np.log1p( | |
| df["epss_score_final"].clip(lower=0) | |
| ).astype(float) | |
| # 2. High-CVSS indicator. CVSS >= 7.0 (high or critical). | |
| df["is_high_cvss"] = (df["cvss_base_score"] >= 7.0).astype(int) | |
| # 3. Exposure x severity composite. Internet-exposed high-severity | |
| # vulns are often weighted differently per class. | |
| df["exposure_severity_composite"] = ( | |
| df.get("internet_exposed_flag", 0) * df["cvss_base_score"] | |
| ).astype(float) | |
| # 4. Flag count: total number of risk flags raised. Different vuln | |
| # classes have different baseline flag patterns. | |
| flag_cols = [ | |
| "exploitation_occurred_flag", "zero_day_flag", "cisa_kev_flag", | |
| "supply_chain_propagation_flag", "compensating_control_flag", | |
| "false_positive_flag", | |
| ] | |
| df["risk_flag_count"] = sum(df.get(c, 0) for c in flag_cols) | |
| # 5. EPSS x CVSS composite. | |
| df["epss_x_base"] = ( | |
| df["epss_score_final"] * df["cvss_base_score"] | |
| ).astype(float) | |
| return df | |
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
| def build_features( | |
| vuln_summary_path: str | Path, | |
| asset_inventory_path: str | Path, | |
| ) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]: | |
| """ | |
| Load vuln_summary.csv, join asset_inventory.csv, drop target + | |
| identifiers + outcome leaks, engineer features, one-hot encode, | |
| return (X, y, ids, meta). | |
| """ | |
| vulns = pd.read_csv(vuln_summary_path) | |
| assets = pd.read_csv(asset_inventory_path) | |
| y = vulns[TARGET_COLUMN].map(LABEL_TO_INT) | |
| if y.isna().any(): | |
| bad = vulns.loc[y.isna(), TARGET_COLUMN].unique() | |
| raise ValueError(f"Unknown vulnerability_class values: {bad}") | |
| y = y.astype(int) | |
| ids = vulns["vuln_id"].copy() | |
| asset_cols_needed = ( | |
| ["asset_id"] + ASSET_NUMERIC_FEATURES + ASSET_CATEGORICAL_FEATURES | |
| ) | |
| vulns = vulns.merge( | |
| assets[asset_cols_needed], on="asset_id", how="left", | |
| ) | |
| vulns = vulns.drop( | |
| columns=ID_COLUMNS + [TARGET_COLUMN] + EXCLUDED_FROM_FEATURES, | |
| errors="ignore", | |
| ) | |
| vulns = _add_engineered_features(vulns) | |
| numeric_features = ( | |
| VULN_NUMERIC_FEATURES | |
| + ASSET_NUMERIC_FEATURES | |
| + [ | |
| "log_epss", "is_high_cvss", "exposure_severity_composite", | |
| "risk_flag_count", "epss_x_base", | |
| ] | |
| ) | |
| numeric_features = [c for c in numeric_features if c in vulns.columns] | |
| X_numeric = vulns[numeric_features].astype(float) | |
| all_categorical = VULN_CATEGORICAL_FEATURES + ASSET_CATEGORICAL_FEATURES | |
| categorical_levels: dict[str, list[str]] = {} | |
| blocks: list[pd.DataFrame] = [] | |
| for col in all_categorical: | |
| if col not in vulns.columns: | |
| continue | |
| levels = sorted(vulns[col].dropna().unique().tolist()) | |
| categorical_levels[col] = levels | |
| block = pd.get_dummies( | |
| vulns[col].astype("category").cat.set_categories(levels), | |
| prefix=col, dummy_na=False, | |
| ).astype(int) | |
| blocks.append(block) | |
| X = pd.concat( | |
| [X_numeric.reset_index(drop=True)] | |
| + [b.reset_index(drop=True) for b in blocks], | |
| axis=1, | |
| ).fillna(0.0) | |
| meta = { | |
| "feature_names": X.columns.tolist(), | |
| "numeric_features": numeric_features, | |
| "categorical_levels": categorical_levels, | |
| "label_to_int": LABEL_TO_INT, | |
| "int_to_label": INT_TO_LABEL, | |
| "outcome_leak_excluded": EXCLUDED_FROM_FEATURES, | |
| } | |
| return X, y, ids, meta | |
| def transform_single( | |
| record: dict | pd.DataFrame, | |
| meta: dict[str, Any], | |
| asset_lookup: dict | None = None, | |
| ) -> np.ndarray: | |
| """Encode a single vulnerability record for inference.""" | |
| if isinstance(record, dict): | |
| df = pd.DataFrame([record.copy()]) | |
| else: | |
| df = record.copy() | |
| if asset_lookup is not None and "asset_id" in df.columns: | |
| asset_id = df["asset_id"].iloc[0] | |
| asset_feats = asset_lookup.get(asset_id, {}) | |
| for k, v in asset_feats.items(): | |
| if k not in df.columns: | |
| df[k] = v | |
| df = _add_engineered_features(df) | |
| numeric = pd.DataFrame({ | |
| col: df.get(col, pd.Series([0.0] * len(df))).astype(float).values | |
| for col in meta["numeric_features"] | |
| }) | |
| blocks: list[pd.DataFrame] = [numeric] | |
| for col, levels in meta["categorical_levels"].items(): | |
| val = df.get(col, pd.Series([None] * len(df))) | |
| block = pd.get_dummies( | |
| val.astype("category").cat.set_categories(levels), | |
| prefix=col, dummy_na=False, | |
| ).astype(int) | |
| for lvl in levels: | |
| cname = f"{col}_{lvl}" | |
| if cname not in block.columns: | |
| block[cname] = 0 | |
| block = block[[f"{col}_{lvl}" for lvl in levels]] | |
| blocks.append(block) | |
| X = pd.concat(blocks, axis=1).fillna(0.0) | |
| X = X.reindex(columns=meta["feature_names"], fill_value=0.0) | |
| return X.values.astype(np.float32) | |
| def save_meta(meta: dict[str, Any], path: str | Path) -> None: | |
| serializable = { | |
| "feature_names": meta["feature_names"], | |
| "numeric_features": meta["numeric_features"], | |
| "categorical_levels": meta["categorical_levels"], | |
| "label_to_int": meta["label_to_int"], | |
| "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()}, | |
| "outcome_leak_excluded": meta.get("outcome_leak_excluded", []), | |
| } | |
| with open(path, "w") as f: | |
| json.dump(serializable, f, indent=2) | |
| def load_meta(path: str | Path) -> dict[str, Any]: | |
| with open(path) as f: | |
| meta = json.load(f) | |
| meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()} | |
| return meta | |
| def build_asset_lookup(asset_inventory_path: str | Path) -> dict[str, dict]: | |
| """Build {asset_id: {asset feature values}} for inference-time lookup.""" | |
| assets = pd.read_csv(asset_inventory_path) | |
| cols = ASSET_NUMERIC_FEATURES + ASSET_CATEGORICAL_FEATURES | |
| out = {} | |
| for _, row in assets.iterrows(): | |
| out[row["asset_id"]] = {c: row[c] for c in cols if c in assets.columns} | |
| return out | |
| if __name__ == "__main__": | |
| import sys | |
| base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads") | |
| X, y, ids, meta = build_features( | |
| base / "vuln_summary.csv", | |
| base / "asset_inventory.csv", | |
| ) | |
| print(f"X shape: {X.shape}") | |
| print(f"y shape: {y.shape}") | |
| print(f"n_features: {len(meta['feature_names'])}") | |
| print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}") | |
| print(f"X has NaN: {X.isnull().any().any()}") | |