"""
feature_engineering.py
======================
Feature pipeline for the CYB009 baseline classifier.
Predicts `vulnerability_class` (8-class vulnerability classification)
from per-vulnerability features on the CYB009 sample dataset.

CSV inputs:
    vuln_summary.csv           (primary, one row per vulnerability,
                                2,638 vulnerabilities)
    asset_inventory.csv        (per-asset registry, joined for asset
                                context features)
    vulnerability_records.csv  (per-timestep trajectory; reserved)
    vuln_lifecycle_events.csv  (discrete event log; reserved)

Target classes (8):
    auth_access_control, cryptographic_failure, information_disclosure,
    injection_family, logic_flaw, memory_corruption, misconfiguration,
    supply_chain_weakness

Why this task (and why not the more obvious targets)
----------------------------------------------------
The CYB009 README lists 11 suggested use cases. We piloted every
README-headline target on the sample dataset and found the sample
has pervasive structural leakage that makes most targets either
trivially solvable via oracle features or unlearnable after honest
leakage removal:
- `exploit_maturity_final` (4-class) is structurally leaky via
  `cvss_temporal_score_final`. CVSS v3.1 derives the temporal score
  from the base score by multiplying in an Exploit Code Maturity
  factor (0.91 / 0.94 / 0.97 / 1.00 for unproven / PoC / functional /
  weaponised), so the cvss_temporal/cvss_base ratio clusters
  near-deterministically per maturity tier (0.80 / 0.83 / 0.85 / 0.88
  in the data, i.e. the same multipliers scaled by a roughly constant
  factor of about 0.88). Dropping cvss_temporal collapses accuracy
  from 0.74 to 0.31, below the majority baseline of 0.36.
- `remediation_status` / `patch_status` / `lifecycle_phase`
  (per-timestep) form a tightly coupled state machine:
  `lifecycle_phase = residual_risk_review` -> 100% `remediated`, and
  `patch_status = deployed` -> 100% `remediated`. Any two of the three
  deterministically pin the third.
- `severity_class` is 100% derived from `cvss_base_score` via the CVSS
  v3.1 boundaries (low=0.1-3.9, medium=4.0-6.9, high=7.0-8.9,
  critical=9.0-10.0). Trivial if cvss_base is included; without it,
  accuracy stays close to the majority baseline (acc 0.55 vs
  majority 0.51).
- All seven binary flags (`exploitation_occurred_flag`, `zero_day_flag`,
  `cisa_kev_flag`, `supply_chain_propagation_flag`,
  `remediation_success_flag`, `sla_compliance_flag`,
  `false_positive_flag`) score at or below majority once the
  event-time sentinels (`time_to_exploit_days`,
  `time_to_remediate_days`, `patch_lag_days`, `risk_score_composite`)
  are removed as leaks. See leakage_diagnostic.json.
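
The ratio leak in the first bullet can be illustrated with a minimal
sketch. It assumes only the four observed ratio centres quoted above;
the helper name `guess_maturity_from_ratio` and the tier labels are
hypothetical and are not part of this module.

```python
# Hypothetical illustration, not part of feature_engineering.py.
# Observed cvss_temporal / cvss_base ratio centres quoted in the docstring.
OBSERVED_RATIO_CENTRES = {
    "unproven": 0.80,
    "poc": 0.83,
    "functional": 0.85,
    "weaponised": 0.88,
}


def guess_maturity_from_ratio(cvss_base, cvss_temporal):
    # Return the tier whose observed ratio centre is nearest to the
    # temporal/base ratio of this record.
    if cvss_base <= 0:
        raise ValueError("cvss_base must be positive")
    ratio = cvss_temporal / cvss_base
    return min(
        OBSERVED_RATIO_CENTRES,
        key=lambda tier: abs(OBSERVED_RATIO_CENTRES[tier] - ratio),
    )


print(guess_maturity_from_ratio(9.8, 7.8))
```

A nearest-centre rule like this needs no training, which is why the
temporal score is treated as an oracle rather than as a feature.
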

`vulnerability_class` is the only README-suggested target that learns
honestly on the sample: accuracy 0.24, macro-F1 0.22, ROC-AUC 0.69,
against a majority-class accuracy of 0.18. That is a modest +6pp lift
over majority, and the weakest baseline in the XpertSystems CYB
catalog by design. The full ~487k-row product is expected to tighten
per-class signal materially.
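
For reference, the majority baseline and the lift quoted above can be
computed as in the sketch below. The function names and the toy labels
are illustrative assumptions; only the 0.24 and 0.18 figures come from
this docstring.

```python
# Hypothetical illustration, not part of feature_engineering.py.
from collections import Counter


def majority_baseline_accuracy(labels):
    # Accuracy of always predicting the most frequent label.
    counts = Counter(labels)
    return max(counts.values()) / len(labels)


def lift_in_pp(model_accuracy, baseline_accuracy):
    # Difference between model and baseline, in percentage points.
    return round((model_accuracy - baseline_accuracy) * 100, 1)


toy_labels = ["memory_corruption", "memory_corruption", "logic_flaw", "misconfiguration"]
print(majority_baseline_accuracy(toy_labels))
print(lift_in_pp(0.24, 0.18))
```
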
The model card frames this honestly: the strongest finding on CYB009
is the comprehensive leakage diagnostic rather than the modest
classifier performance. Buyers planning CYB009 ML work should read
the diagnostic first.
Leakage audit
-------------
Excluded as outcome leaks for this target:
  1. `exploit_maturity_final` - tied to `cvss_temporal_score_final`
     through the CVSS v3.1 temporal-score calculation, so it is
     excluded alongside it.
  2. Event-time sentinel oracles, dropped as a precaution. They do not
     leak `vulnerability_class` directly, but they encode the flag
     fields: `time_to_exploit_days`, `time_to_remediate_days`,
     `patch_lag_days`, `risk_score_composite`.
  3. `cvss_temporal_score_final` - excluded because it structurally
     encodes the CVSS v3.1 maturity multiplier.
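
A sentinel oracle of the kind listed in item 2 can be checked with a
sketch like the following. The toy rows, the helper name
`sentinel_agreement`, and the assumption that -1 in
`time_to_exploit_days` marks a vulnerability that was never exploited
are all illustrative; the real figures are in leakage_diagnostic.json.

```python
# Hypothetical illustration, not part of feature_engineering.py.
def sentinel_agreement(rows, column, sentinel, flag):
    # Fraction of rows where "column holds the sentinel" coincides
    # with "flag is 0". A value of 1.0 means the column gives the flag away.
    hits = sum((row[column] == sentinel) == (row[flag] == 0) for row in rows)
    return hits / len(rows)


# Toy rows (assumed): -1 stands for "never exploited".
toy_rows = [
    {"time_to_exploit_days": -1, "exploitation_occurred_flag": 0},
    {"time_to_exploit_days": 12, "exploitation_occurred_flag": 1},
    {"time_to_exploit_days": -1, "exploitation_occurred_flag": 0},
    {"time_to_exploit_days": 3, "exploitation_occurred_flag": 1},
]

print(
    sentinel_agreement(
        toy_rows, "time_to_exploit_days", -1, "exploitation_occurred_flag"
    )
)
```
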

`severity_class` is KEPT as a one-hot feature: it is a derived view of
`cvss_base_score` (itself a feature), not of the target.
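
The derivation is the standard CVSS v3.1 qualitative rating scale. A
minimal sketch, assuming lowercase label strings and a hypothetical
helper name (the "none" rating for a score of 0.0 is part of the CVSS
scale but is not one of the four values listed for this dataset):

```python
# Hypothetical illustration, not part of feature_engineering.py.
def severity_from_base(cvss_base_score):
    # Map a CVSS v3.1 base score to its qualitative severity rating.
    if not 0.0 <= cvss_base_score <= 10.0:
        raise ValueError("CVSS base score must be within 0.0-10.0")
    if cvss_base_score == 0.0:
        return "none"
    if cvss_base_score <= 3.9:
        return "low"
    if cvss_base_score <= 6.9:
        return "medium"
    if cvss_base_score <= 8.9:
        return "high"
    return "critical"


print([severity_from_base(s) for s in (3.9, 4.0, 7.0, 9.0)])
```

Because the mapping is deterministic, the one-hot block restates
`cvss_base_score` in a coarser form instead of adding new information.
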
Binary post-hoc flags are KEPT as legitimate observables that a SOC
analyst would have at decision time. They contribute modest real
signal (a few pp accuracy).
Public API
----------
    build_features(vuln_summary_path, asset_inventory_path)
        -> (X, y, ids, meta)
    transform_single(record, meta, asset_lookup=None) -> np.ndarray
    save_meta(meta, path) / load_meta(path)
    build_asset_lookup(asset_inventory_path) -> dict
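
One detail of the save_meta / load_meta pair is worth a sketch: JSON
object keys are always strings, so the integer keys of `int_to_label`
have to be restored after loading. The two-entry mapping below is a
toy stand-in for the real one.

```python
# Hypothetical illustration of the JSON key round trip.
import json

int_to_label = {0: "auth_access_control", 1: "cryptographic_failure"}

# Serialising and parsing turns the integer keys into strings.
restored_raw = json.loads(json.dumps(int_to_label))

# Converting the keys back recovers the original mapping.
restored = {int(k): v for k, v in restored_raw.items()}

print(restored_raw)
print(restored == int_to_label)
```
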
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Eight vulnerability classes from the CYB009 sample. The README claims
# 10 classes but only 8 exist in the sample data.
LABEL_ORDER = [
    "auth_access_control",
    "cryptographic_failure",
    "information_disclosure",
    "injection_family",
    "logic_flaw",
    "memory_corruption",
    "misconfiguration",
    "supply_chain_weakness",
]
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}
# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
ID_COLUMNS = ["vuln_id", "asset_id", "org_id"]
TARGET_COLUMN = "vulnerability_class"
# Outcome-leak columns excluded from features.
EXCLUDED_FROM_FEATURES = [
    "time_to_exploit_days",       # -1 sentinel oracle
    "time_to_remediate_days",     # 120 sentinel oracle
    "patch_lag_days",             # likely similar sentinel
    "risk_score_composite",       # computed from flag fields
    "exploit_maturity_final",     # indirect leak via CVSS temporal
    "cvss_temporal_score_final",  # near-deterministic per maturity tier
]
# ---------------------------------------------------------------------------
# Per-vulnerability numeric features
# ---------------------------------------------------------------------------
VULN_NUMERIC_FEATURES = [
    "cvss_base_score",
    "epss_score_final",
    "exploitation_occurred_flag",
    "zero_day_flag",
    "cisa_kev_flag",
    "supply_chain_propagation_flag",
    "compensating_control_flag",
    "false_positive_flag",
    "remediation_success_flag",
    "sla_compliance_flag",
]

VULN_CATEGORICAL_FEATURES = [
    "severity_class",  # 4 values; CVSS-derived but useful as feature
]
# ---------------------------------------------------------------------------
# Asset features (joined on asset_id from asset_inventory.csv)
# ---------------------------------------------------------------------------
ASSET_NUMERIC_FEATURES = [
    "scanner_coverage",
    "patch_mgmt_maturity",
    "mean_time_to_remediate_days",
    "sla_critical_days",
    "sla_high_days",
    "sla_medium_days",
    "internet_exposed_flag",
    "sbom_depth_score",
]

ASSET_CATEGORICAL_FEATURES = [
    "asset_type",        # 12 values
    "criticality_tier",  # 4 values
    "environment_type",  # 8 values
    "os_family",         # 6 values
]
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Five engineered features for vulnerability_class discrimination.

    Note: no temporal-CVSS-derived features (those leak via the CVSS
    v3.1 exploit-code-maturity machinery).
    """
    df = df.copy()

    # 1. Log-scaled EPSS. EPSS is heavy-tailed.
    df["log_epss"] = np.log1p(
        df["epss_score_final"].clip(lower=0)
    ).astype(float)

    # 2. High-CVSS indicator. CVSS >= 7.0 (high or critical).
    df["is_high_cvss"] = (df["cvss_base_score"] >= 7.0).astype(int)

    # 3. Exposure x severity composite. Internet-exposed high-severity
    #    vulns are often weighted differently per class.
    df["exposure_severity_composite"] = (
        df.get("internet_exposed_flag", 0) * df["cvss_base_score"]
    ).astype(float)

    # 4. Flag count: total number of risk flags raised. Different vuln
    #    classes have different baseline flag patterns.
    flag_cols = [
        "exploitation_occurred_flag", "zero_day_flag", "cisa_kev_flag",
        "supply_chain_propagation_flag", "compensating_control_flag",
        "false_positive_flag",
    ]
    df["risk_flag_count"] = sum(df.get(c, 0) for c in flag_cols)

    # 5. EPSS x CVSS composite.
    df["epss_x_base"] = (
        df["epss_score_final"] * df["cvss_base_score"]
    ).astype(float)

    return df


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    vuln_summary_path: str | Path,
    asset_inventory_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Load vuln_summary.csv, join asset_inventory.csv, drop target +
    identifiers + outcome leaks, engineer features, one-hot encode,
    return (X, y, ids, meta).
    """
    vulns = pd.read_csv(vuln_summary_path)
    assets = pd.read_csv(asset_inventory_path)

    # Encode the target; fail loudly on labels outside LABEL_ORDER.
    y = vulns[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = vulns.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown vulnerability_class values: {bad}")
    y = y.astype(int)
    ids = vulns["vuln_id"].copy()

    asset_cols_needed = (
        ["asset_id"] + ASSET_NUMERIC_FEATURES + ASSET_CATEGORICAL_FEATURES
    )
    # validate="many_to_one" raises if asset_inventory.csv repeats an
    # asset_id. A duplicate would add rows to the join and silently
    # misalign X with y and ids.
    vulns = vulns.merge(
        assets[asset_cols_needed], on="asset_id", how="left",
        validate="many_to_one",
    )

    vulns = vulns.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + EXCLUDED_FROM_FEATURES,
        errors="ignore",
    )
    vulns = _add_engineered_features(vulns)

    numeric_features = (
        VULN_NUMERIC_FEATURES
        + ASSET_NUMERIC_FEATURES
        + [
            "log_epss", "is_high_cvss", "exposure_severity_composite",
            "risk_flag_count", "epss_x_base",
        ]
    )
    numeric_features = [c for c in numeric_features if c in vulns.columns]
    X_numeric = vulns[numeric_features].astype(float)

    # One-hot encode each categorical with a fixed, sorted level list so
    # that transform_single can reproduce the same columns at inference.
    all_categorical = VULN_CATEGORICAL_FEATURES + ASSET_CATEGORICAL_FEATURES
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in all_categorical:
        if col not in vulns.columns:
            continue
        levels = sorted(vulns[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            vulns[col].astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "outcome_leak_excluded": EXCLUDED_FROM_FEATURES,
    }
    return X, y, ids, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
    asset_lookup: dict | None = None,
) -> np.ndarray:
    """Encode a single vulnerability record for inference."""
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Reset the index so the blocks built below (which use a fresh
        # 0..n-1 index) line up row-for-row in the final concat.
        df = record.copy().reset_index(drop=True)

    # Fill in asset context from the lookup. Only the first row's
    # asset_id is used, since this function targets a single record.
    if asset_lookup is not None and "asset_id" in df.columns:
        asset_id = df["asset_id"].iloc[0]
        asset_feats = asset_lookup.get(asset_id, {})
        for k, v in asset_feats.items():
            if k not in df.columns:
                df[k] = v

    df = _add_engineered_features(df)

    # Numeric block: columns missing from the record default to 0.0.
    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * len(df))).astype(float).values
        for col in meta["numeric_features"]
    })
    blocks: list[pd.DataFrame] = [numeric]

    # Categorical blocks: reuse the training-time levels so the one-hot
    # columns match; unseen or missing values encode as all zeros.
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * len(df)))
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        for lvl in levels:
            cname = f"{col}_{lvl}"
            if cname not in block.columns:
                block[cname] = 0
        block = block[[f"{col}_{lvl}" for lvl in levels]]
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the feature metadata to `path` as JSON."""
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        # JSON object keys must be strings; load_meta converts them back.
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "outcome_leak_excluded": meta.get("outcome_leak_excluded", []),
    }
    with open(path, "w") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by save_meta, restoring integer label keys."""
    with open(path) as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


def build_asset_lookup(asset_inventory_path: str | Path) -> dict[str, dict]:
    """Build {asset_id: {asset feature values}} for inference-time lookup."""
    assets = pd.read_csv(asset_inventory_path)
    # Keep only the asset feature columns that are present in the file.
    cols = [
        c for c in ASSET_NUMERIC_FEATURES + ASSET_CATEGORICAL_FEATURES
        if c in assets.columns
    ]
    out = {}
    for _, row in assets.iterrows():
        out[row["asset_id"]] = {c: row[c] for c in cols}
    return out


if __name__ == "__main__":
    import sys

    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, ids, meta = build_features(
        base / "vuln_summary.csv",
        base / "asset_inventory.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"n_features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")