Initial release: XGBoost + MLP for SOC alert triage outcome classification, with structural-leakage and unlearnable-target diagnostic
"""
feature_engineering.py
======================

Feature pipeline for the CYB008 baseline classifier.

Predicts `resolution_outcome` (5-class triage outcome) from per-alert
features on the CYB008 sample dataset.

CSV inputs:
    soc_alerts.csv         (primary, one row per alert, 9,200 alerts)
    soc_topology.csv       (per-analyst registry; reserved for future
                            work - 25 analysts is too small to be a
                            useful join target beyond the analyst_tier
                            column already on soc_alerts)
    incident_summary.csv   (per-incident aggregates; reserved - only
                            9% of alerts link to an incident)
    alert_events.csv       (discrete alert event log; reserved)

Target classes (5):
    auto_resolved_soar, duplicate_merged, false_positive_closed,
    true_positive_escalated, true_positive_remediated

Grouping decision
-----------------
There is no natural row-level group key for CYB008:
    - 25 analysts   -> group-aware split would yield ~4 test analysts
    - 5 SOCs        -> group-aware split would yield ~1 test SOC
    - 589 incidents -> only 9% of alerts have a non-null incident_id

This baseline uses STRATIFIED random splitting (like CYB001 for network
flows), which is the right choice when alerts are independent given
features. The model card documents this rationale.
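
For illustration, a stratified random split can be sketched with the
standard library alone. This is a hedged, dependency-free sketch rather
than the split used to train the released models: `stratified_split`,
the 20% test fraction, the seed, and the toy label counts are all
assumptions. In practice scikit-learn's
`train_test_split(..., stratify=y)` does the same job.

```python
import random
from collections import defaultdict


def stratified_split(labels, test_fraction=0.2, seed=0):
    # Hypothetical helper: returns (train_idx, test_idx) such that each
    # class contributes roughly `test_fraction` of its rows to the test set.
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)

    rng = random.Random(seed)
    train_idx, test_idx = [], []
    for label in sorted(by_class):
        indices = by_class[label]
        rng.shuffle(indices)
        n_test = round(len(indices) * test_fraction)
        test_idx.extend(indices[:n_test])
        train_idx.extend(indices[n_test:])
    return sorted(train_idx), sorted(test_idx)


# Toy label vector (made-up counts, not the CYB008 class distribution).
labels = (
    ["false_positive_closed"] * 60
    + ["true_positive_escalated"] * 30
    + ["duplicate_merged"] * 10
)
train_idx, test_idx = stratified_split(labels, test_fraction=0.2, seed=42)
```

Because every class is sampled separately, a rare outcome cannot vanish
from the test set by chance, which is the property the stratified split
is chosen for.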

Leakage audit
-------------
Three columns are structural oracles for resolution_outcome and are
DROPPED from the feature set:

    1. `alert_lifecycle_phase` (4 values: auto_closed, escalated, resolved,
       suppressed_duplicate): three of the four values map deterministically
       to specific resolution_outcome classes. Drop.
    2. `automation_resolved` (binary): exactly 1:1 with auto_resolved_soar
       outcome. Drop.
    3. `escalation_flag` (binary): near-1:1 with true_positive_escalated
       outcome (1319 escalation flags = 1319 escalated outcomes). Drop.

With all three dropped, accuracy drops from 100% to 79% - confirming
they were structural oracles, not real predictive signal.

`soar_playbook_triggered` is a PARTIAL oracle (one-way necessary
condition: auto_resolved_soar => soar_playbook_triggered=1, but
soar_playbook_triggered=1 also yields 32% non-auto-resolve outcomes).
This is a legitimate observable - a SOAR playbook actually executing
is part of how the alert is triaged. KEPT.

`mitre_technique_id` is a perfect oracle for mitre_tactic (every T-
number belongs to one tactic by ATT&CK design) but has no relationship
to resolution_outcome. It is high-cardinality (36 values from a small
sample of a 600+-value enterprise space) and contributes nothing to
this task. Dropped for parsimony.

`detection_rule_id` has 656 unique values - too high-cardinality for
one-hot encoding. Dropped.
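
One way to run this kind of audit is to measure, for each value of a
candidate column, how concentrated the outcomes are. The sketch below is
illustrative only: `outcome_purity` is a hypothetical helper and the
rows are made up, not CYB008 data. A value whose purity is 1.0 maps
deterministically to one outcome (a structural oracle for that value);
a lower purity marks a partial oracle at most.

```python
from collections import Counter, defaultdict


def outcome_purity(rows, column, target="resolution_outcome"):
    # Hypothetical helper: for each value of `column`, return the most
    # common outcome and the share of that value's rows that carry it.
    counts = defaultdict(Counter)
    for row in rows:
        counts[row[column]][row[target]] += 1
    purity = {}
    for value, outcomes in counts.items():
        top_outcome, top_count = outcomes.most_common(1)[0]
        purity[value] = (top_outcome, top_count / sum(outcomes.values()))
    return purity


# Toy rows (invented for illustration).
rows = (
    [{"automation_resolved": 1, "soar_playbook_triggered": 1,
      "resolution_outcome": "auto_resolved_soar"}] * 2
    + [{"automation_resolved": 0, "soar_playbook_triggered": 1,
        "resolution_outcome": "false_positive_closed"}]
    + [{"automation_resolved": 0, "soar_playbook_triggered": 0,
        "resolution_outcome": "false_positive_closed"}]
    + [{"automation_resolved": 0, "soar_playbook_triggered": 0,
        "resolution_outcome": "true_positive_escalated"}]
)

full = outcome_purity(rows, "automation_resolved")
partial = outcome_purity(rows, "soar_playbook_triggered")

# One-way necessary condition: every auto-resolved row had a playbook run.
necessary = all(
    row["soar_playbook_triggered"] == 1
    for row in rows
    if row["resolution_outcome"] == "auto_resolved_soar"
)
```

In the toy rows, `automation_resolved=1` has purity 1.0, while
`soar_playbook_triggered=1` is only two-thirds auto-resolved even though
the necessary condition holds. That mirrors the full versus partial
oracle distinction drawn above.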

Identifier / non-feature columns
--------------------------------
Dropped: alert_id, incident_id (mostly null), analyst_id, soc_id,
shift_id, alert_timestamp_min, soar_playbook_id (high cardinality).

Public API
----------
    build_features(alerts_path) -> (X, y, ids, meta)
    transform_single(record, meta) -> np.ndarray
    save_meta(meta, path) / load_meta(path)
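
The save_meta / load_meta pair has to account for JSON object keys
always being strings, which is why load_meta casts the `int_to_label`
keys back to int. A minimal stdlib sketch of that round trip follows; it
uses a two-entry mapping for brevity and does not call this module.

```python
import json

int_to_label = {0: "auto_resolved_soar", 1: "duplicate_merged"}

# Serialising turns the integer keys into the strings "0" and "1".
restored = json.loads(json.dumps({"int_to_label": int_to_label}))
string_keyed = restored["int_to_label"]

# Casting the keys back recovers the original integer-keyed mapping.
recovered = {int(k): v for k, v in string_keyed.items()}
```

Without the cast, a lookup such as `int_to_label[0]` on freshly loaded
metadata would raise a KeyError, because only the key "0" exists.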

License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd


# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Ordered by triage spectrum: auto -> dup -> FP -> TP-remediate -> TP-escalate
LABEL_ORDER = [
    "auto_resolved_soar",
    "duplicate_merged",
    "false_positive_closed",
    "true_positive_remediated",
    "true_positive_escalated",
]
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}


# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
ID_COLUMNS = [
    "alert_id", "incident_id", "analyst_id", "soc_id", "shift_id",
    "alert_timestamp_min", "soar_playbook_id",
]
TARGET_COLUMN = "resolution_outcome"

# Structural oracle columns - dropped from features.
ORACLE_COLUMNS = [
    "alert_lifecycle_phase",  # 3 of its 4 values map deterministically to an outcome
    "automation_resolved",    # 1:1 with auto_resolved_soar outcome
    "escalation_flag",        # near-1:1 with true_positive_escalated outcome
]

# High-cardinality categorical columns - dropped for tractability.
HIGH_CARDINALITY_COLUMNS = [
    "mitre_technique_id",  # 36 values; no relationship to outcome
    "detection_rule_id",   # 656 values; one-hot explosion
]

DROPPED_FROM_FEATURES = ORACLE_COLUMNS + HIGH_CARDINALITY_COLUMNS


# ---------------------------------------------------------------------------
# Per-alert numeric features
# ---------------------------------------------------------------------------
DIRECT_NUMERIC_FEATURES = [
    "raw_score",
    "enriched_score",
    "time_in_phase_minutes",
    "queue_depth_at_ingestion",
    "soar_playbook_triggered",  # partial oracle, kept as observable
    "sla_breached_flag",
    "mttd_minutes",
    "mttr_minutes",
    "fatigue_score_at_alert",
]

CATEGORICAL_FEATURES = [
    "alert_severity",  # 7 values
    "alert_source",    # 8 values
    "mitre_tactic",    # 12 values
    "analyst_tier",    # 3 values (alerts) / 4 (topology) -- 3 here
    "siem_platform",   # 8 values
]


# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Six engineered features encoding triage-outcome hypotheses.

    Each composite is a quantity a SOC analyst would compute by hand
    to assess an alert's likely disposition.
    """
    df = df.copy()

    # 1. Enrichment lift: how much enrichment changed the raw score.
    #    Positive lift = enrichment increased confidence (often -> TP).
    df["enrichment_lift"] = (
        df["enriched_score"] - df["raw_score"]
    ).astype(float)

    # 2. Log-scaled MTTR. MTTR is heavy-tailed (auto-resolves take seconds,
    #    escalations take hours). log1p compresses the range for both
    #    XGBoost and the MLP.
    df["log_mttr"] = np.log1p(df["mttr_minutes"].clip(lower=0)).astype(float)

    # 3. Log-scaled MTTD. Same heavy-tail shape.
    df["log_mttd"] = np.log1p(df["mttd_minutes"].clip(lower=0)).astype(float)

    # 4. Queue pressure: queue depth times analyst fatigue. High =
    #    overloaded analyst, more likely to auto-resolve or escalate.
    df["queue_pressure"] = (
        df["queue_depth_at_ingestion"] * df["fatigue_score_at_alert"]
    ).astype(float)

    # 5. Triage time efficiency: enriched_score per minute in phase.
    #    The denominator is floored at 0.1 minutes to avoid division by zero.
    df["enrichment_per_minute"] = (
        df["enriched_score"] / df["time_in_phase_minutes"].clip(lower=0.1)
    ).astype(float)

    # 6. High-confidence flag: an enriched score above 0.7 typically
    #    indicates a strong signal that warrants escalation.
    df["is_high_confidence"] = (df["enriched_score"] > 0.7).astype(int)

    return df


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    alerts_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, dict[str, Any]]:
    """
    Load soc_alerts.csv, drop the target, identifier, oracle, and
    high-cardinality columns, engineer features, one-hot encode, and
    return (X, y, ids, meta).

    `ids` is a Series of alert_id values aligned with X (used for
    round-tripping; not a group label since this task uses stratified
    random splitting).
    """
    alerts = pd.read_csv(alerts_path)

    # Map string labels to integers; fail loudly on any unexpected class.
    y = alerts[TARGET_COLUMN].map(LABEL_TO_INT)
    if y.isna().any():
        bad = alerts.loc[y.isna(), TARGET_COLUMN].unique()
        raise ValueError(f"Unknown resolution_outcome values: {bad}")
    y = y.astype(int)

    ids = alerts["alert_id"].copy()

    alerts = alerts.drop(
        columns=ID_COLUMNS + [TARGET_COLUMN] + DROPPED_FROM_FEATURES,
        errors="ignore",
    )
    alerts = _add_engineered_features(alerts)

    numeric_features = (
        DIRECT_NUMERIC_FEATURES
        + [
            "enrichment_lift", "log_mttr", "log_mttd",
            "queue_pressure", "enrichment_per_minute", "is_high_confidence",
        ]
    )
    numeric_features = [c for c in numeric_features if c in alerts.columns]
    X_numeric = alerts[numeric_features].astype(float)

    # One-hot encode each categorical with a fixed, sorted level list so
    # the column layout can be reproduced at inference time.
    categorical_levels: dict[str, list[str]] = {}
    blocks: list[pd.DataFrame] = []
    for col in CATEGORICAL_FEATURES:
        if col not in alerts.columns:
            continue
        levels = sorted(alerts[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            alerts[col].astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        blocks.append(block)

    X = pd.concat(
        [X_numeric.reset_index(drop=True)]
        + [b.reset_index(drop=True) for b in blocks],
        axis=1,
    ).fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
        "oracle_excluded": ORACLE_COLUMNS,
        "high_cardinality_excluded": HIGH_CARDINALITY_COLUMNS,
    }
    return X, y, ids, meta


def transform_single(
    record: dict | pd.DataFrame,
    meta: dict[str, Any],
) -> np.ndarray:
    """
    Encode a single alert record (or a DataFrame of records) for inference.

    The record must contain the raw columns read by
    `_add_engineered_features`; a missing one raises KeyError there.
    Unseen categorical levels encode as all zeros.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record.copy()])
    else:
        # Reset the index so the numeric block (rebuilt from raw values)
        # and the one-hot blocks align row-for-row in the concat below.
        df = record.reset_index(drop=True)

    df = _add_engineered_features(df)

    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * len(df))).astype(float).values
        for col in meta["numeric_features"]
    })

    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * len(df)))
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Guarantee every training-time level has a column, in order.
        for lvl in levels:
            cname = f"{col}_{lvl}"
            if cname not in block.columns:
                block[cname] = 0
        block = block[[f"{col}_{lvl}" for lvl in levels]]
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Write the feature metadata to `path` as JSON."""
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
        "oracle_excluded": meta.get("oracle_excluded", []),
        "high_cardinality_excluded": meta.get("high_cardinality_excluded", []),
    }
    with open(path, "w") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Read metadata written by `save_meta`, restoring integer label keys."""
    with open(path) as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta


if __name__ == "__main__":
    import sys

    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, ids, meta = build_features(base / "soc_alerts.csv")
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"n_features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X has NaN: {X.isnull().any().any()}")