File size: 14,662 Bytes
e2c4702 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 | """
feature_engineering.py
======================
Feature pipeline for the CYB010 baseline classifier.
Predicts `attack_lifecycle_phase` (5-class attack phase) from per-event
features on the CYB010 sample dataset.
CSV inputs:
security_events.csv (primary, one row per event, 21,896 events)
host_inventory.csv (per-host registry, joined for host context)
alert_records.csv (per-alert records; reserved)
incident_summary.csv (per-incident summaries; reserved)
Target classes (5):
benign_background, initial_access, lateral_movement,
persistence_establishment, exfiltration_or_impact
Why this task
-------------
The CYB010 README's central concept is the "5-phase attack lifecycle
state machine", and `attack_lifecycle_phase` is the data's headline
target. We piloted six candidate targets and found it gives the
strongest honest result on the sample (acc 0.95, macro-F1 0.78,
ROC-AUC 0.99 with group-aware split on incident_id).
The other README-suggested targets either have unrecoverable structural
leakage or are weaker after honest leak removal:
- `threat_actor_profile` 5-class works (acc 0.84) but is benign-driven
- 4-class malicious-only collapses to acc 0.57 vs majority 0.61.
- `label_true_positive` on alerts has 9 oracle features; after dropping
all of them, honest acc 0.80, AUC 0.89 (documented as a secondary
finding in leakage_diagnostic.json).
- `mitre_tactic` 14-class hits 0.90 acc but macro-F1 0.37 - imbalance
gaming (benign class dominates at 57%).
- `event_class` 12-class is unlearnable (acc 0.35 vs majority 0.42).
Group structure
---------------
500 incidents x ~44 events each. The per-event task has clear group
structure: events from the same incident share host, threat actor, and
phase trajectory. Group-aware split by `incident_id` is required to
prevent train/test contamination. With 500 incidents, ~75 test
incidents per fold gives reasonable estimation precision.
Leakage audit
-------------
Four columns dropped from features because they're structural oracles
for the target:
1. `mitre_tactic`: when == "benign", deterministically pins
attack_lifecycle_phase == "benign_background" (12,448 cases - all
benign events).
2. `mitre_technique_id`: perfect oracle for `mitre_tactic` by ATT&CK
design (54 techniques, each maps to exactly one tactic). Dropped
because it indirectly encodes the benign vs malicious distinction.
3. `label_malicious`: when False, perfect oracle for
benign_background phase.
4. `threat_actor_id`: when == "NONE", perfect oracle for benign
profile/phase. The non-"NONE" actor IDs are 10 distinct labels
that would also leak actor profile information indirectly.
5. `threat_actor_profile`: contains "benign_user" which trivially
identifies benign_background phase.
6. `event_type`: many event types are phase-specific
(`c2_beacon_outbound` -> 99% exfiltration_or_impact). Dropped to
avoid this near-oracle path.
KEPT features that are informative but NOT oracles:
- `event_class` (12 values): max purity 0.87, mean 0.72 - real signal
with substantial overlap. C2 beacons (network_flow class) hit 65%
exfil phase but also 29% benign. Strong feature, kept.
- `severity_level`, `cvss_score_analogue`: per-event severity is a
real observable, correlates with phase, has overlap.
- `label_log_tampered`: real observable (APTs tamper more), correlates
with malicious phases but not deterministic.
- `log_source_type`, `siem_platform`: not phase-deterministic.
- All host context features.
Public API
----------
build_features(events_path, hosts_path) -> (X, y, ids, groups, meta)
transform_single(record, meta, host_lookup=None) -> np.ndarray
save_meta(meta, path) / load_meta(path)
build_host_lookup(hosts_path) -> dict
License
-------
Ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Label space
# ---------------------------------------------------------------------------
# Ordered by attack progression.
LABEL_ORDER = [
"benign_background",
"initial_access",
"lateral_movement",
"persistence_establishment",
"exfiltration_or_impact",
]
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}
# ---------------------------------------------------------------------------
# Identifier and target columns
# ---------------------------------------------------------------------------
ID_COLUMNS = [
"event_id", "host_id", "incident_id", "timestamp", "user_id",
"source_ip", "dest_ip", "raw_log_payload",
]
TARGET_COLUMN = "attack_lifecycle_phase"
GROUP_COLUMN = "incident_id"
# Oracle columns dropped from features.
ORACLE_COLUMNS = [
"mitre_tactic", # benign value -> benign_background phase
"mitre_technique_id", # ATT&CK technique -> tactic deterministic
"label_malicious", # False -> benign_background
"threat_actor_id", # NONE -> benign
"threat_actor_profile", # benign_user -> benign_background
"event_type", # many event types phase-specific (e.g. c2_beacon_outbound)
]
# ---------------------------------------------------------------------------
# Per-event numeric features
# ---------------------------------------------------------------------------
EVENT_NUMERIC_FEATURES = [
"source_port",
"dest_port",
"cvss_score_analogue",
"label_log_tampered", # bool kept as observable
"label_false_positive", # bool kept as observable (all False on events)
]
EVENT_CATEGORICAL_FEATURES = [
"event_class", # 12 values
"log_source_type", # 8 values
"severity_level", # 5 values
]
# ---------------------------------------------------------------------------
# Host features (joined on host_id from host_inventory.csv)
# ---------------------------------------------------------------------------
HOST_NUMERIC_FEATURES = [
"edr_agent_installed",
"patch_compliance_level",
"vulnerability_count_open",
]
HOST_CATEGORICAL_FEATURES = [
"os_type", # 7 values
"host_role", # 10 values
"network_segment", # 8 values
"defender_posture_tier", # 4 values
"criticality_rating", # 4 values
"cloud_provider", # 4 values
"siem_platform", # 8 values
]
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Six engineered features encoding phase-discriminative hypotheses.
Each composite is something a SOC analyst would compute by hand.
"""
df = df.copy()
# 1. Hour of day (0-23) from timestamp, if available
if "timestamp" in df.columns:
ts = pd.to_datetime(df["timestamp"], errors="coerce")
df["hour_of_day"] = ts.dt.hour.fillna(12).astype(int)
df["is_off_hours"] = ((ts.dt.hour < 9) | (ts.dt.hour > 17)).fillna(False).astype(int)
df["is_weekend"] = (ts.dt.weekday >= 5).fillna(False).astype(int)
else:
df["hour_of_day"] = 12
df["is_off_hours"] = 0
df["is_weekend"] = 0
# 2. Log-scaled CVSS (heavy-tailed)
df["log_cvss"] = np.log1p(
df.get("cvss_score_analogue", 0).clip(lower=0)
).astype(float)
# 3. High-CVSS indicator
df["is_high_cvss"] = (
df.get("cvss_score_analogue", 0) >= 7.0
).astype(int)
# 4. Port category: well-known (<1024) vs registered vs dynamic
dest = df.get("dest_port", 0).fillna(0).astype(int)
df["is_well_known_port"] = (dest < 1024).astype(int)
df["is_dynamic_port"] = (dest >= 49152).astype(int)
# 5. Network direction: same-network if source_port equals dest_port
# OR if specific dest_port matches common service. Rough proxy.
df["is_outbound_web"] = (dest.isin([80, 443, 8080, 8443])).astype(int)
# 6. Risk composite: CVSS x defender_weakness. Higher composite -> later phase.
if "patch_compliance_level" in df.columns:
df["risk_composite"] = (
df["cvss_score_analogue"].fillna(0) *
(1 - df["patch_compliance_level"].fillna(1))
).astype(float)
else:
df["risk_composite"] = 0.0
return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
events_path: str | Path,
hosts_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, pd.Series, pd.Series, dict[str, Any]]:
"""
Load security_events.csv, join host_inventory.csv, drop target +
identifiers + oracle columns, engineer features, one-hot encode,
return (X, y, ids, groups, meta).
"""
events = pd.read_csv(events_path)
hosts = pd.read_csv(hosts_path)
y = events[TARGET_COLUMN].map(LABEL_TO_INT)
if y.isna().any():
bad = events.loc[y.isna(), TARGET_COLUMN].unique()
raise ValueError(f"Unknown attack_lifecycle_phase values: {bad}")
y = y.astype(int)
ids = events["event_id"].copy()
groups = events[GROUP_COLUMN].copy()
host_cols_needed = (
["host_id"] + HOST_NUMERIC_FEATURES + HOST_CATEGORICAL_FEATURES
)
events = events.merge(
hosts[host_cols_needed], on="host_id", how="left",
)
# Apply engineered features BEFORE dropping timestamp
events = _add_engineered_features(events)
events = events.drop(
columns=ID_COLUMNS + [TARGET_COLUMN] + ORACLE_COLUMNS,
errors="ignore",
)
numeric_features = (
EVENT_NUMERIC_FEATURES
+ HOST_NUMERIC_FEATURES
+ [
"hour_of_day", "is_off_hours", "is_weekend",
"log_cvss", "is_high_cvss",
"is_well_known_port", "is_dynamic_port", "is_outbound_web",
"risk_composite",
]
)
numeric_features = [c for c in numeric_features if c in events.columns]
X_numeric = events[numeric_features].apply(
lambda s: s.astype(float) if s.dtype != bool else s.astype(int).astype(float)
)
all_categorical = EVENT_CATEGORICAL_FEATURES + HOST_CATEGORICAL_FEATURES
categorical_levels: dict[str, list[str]] = {}
blocks: list[pd.DataFrame] = []
for col in all_categorical:
if col not in events.columns:
continue
levels = sorted(events[col].dropna().astype(str).unique().tolist())
categorical_levels[col] = levels
block = pd.get_dummies(
events[col].astype(str).astype("category").cat.set_categories(levels),
prefix=col, dummy_na=False,
).astype(int)
blocks.append(block)
X = pd.concat(
[X_numeric.reset_index(drop=True)]
+ [b.reset_index(drop=True) for b in blocks],
axis=1,
).fillna(0.0)
meta = {
"feature_names": X.columns.tolist(),
"numeric_features": numeric_features,
"categorical_levels": categorical_levels,
"label_to_int": LABEL_TO_INT,
"int_to_label": INT_TO_LABEL,
"oracle_excluded": ORACLE_COLUMNS,
}
return X, y, ids, groups, meta
def transform_single(
record: dict | pd.DataFrame,
meta: dict[str, Any],
host_lookup: dict | None = None,
) -> np.ndarray:
"""Encode a single event record for inference."""
if isinstance(record, dict):
df = pd.DataFrame([record.copy()])
else:
df = record.copy()
if host_lookup is not None and "host_id" in df.columns:
host_id = df["host_id"].iloc[0]
host_feats = host_lookup.get(host_id, {})
for k, v in host_feats.items():
if k not in df.columns:
df[k] = v
df = _add_engineered_features(df)
numeric = pd.DataFrame()
for col in meta["numeric_features"]:
s = df.get(col, pd.Series([0.0] * len(df)))
if s.dtype == bool:
s = s.astype(int)
numeric[col] = s.astype(float).values
blocks: list[pd.DataFrame] = [numeric]
for col, levels in meta["categorical_levels"].items():
val = df.get(col, pd.Series([None] * len(df))).astype(str)
block = pd.get_dummies(
val.astype("category").cat.set_categories(levels),
prefix=col, dummy_na=False,
).astype(int)
for lvl in levels:
cname = f"{col}_{lvl}"
if cname not in block.columns:
block[cname] = 0
block = block[[f"{col}_{lvl}" for lvl in levels]]
blocks.append(block)
X = pd.concat(blocks, axis=1).fillna(0.0)
X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
serializable = {
"feature_names": meta["feature_names"],
"numeric_features": meta["numeric_features"],
"categorical_levels": meta["categorical_levels"],
"label_to_int": meta["label_to_int"],
"int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
"oracle_excluded": meta.get("oracle_excluded", []),
}
with open(path, "w") as f:
json.dump(serializable, f, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
with open(path) as f:
meta = json.load(f)
meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
return meta
def build_host_lookup(hosts_path: str | Path) -> dict[str, dict]:
"""Build {host_id: {host feature values}} for inference-time lookup."""
hosts = pd.read_csv(hosts_path)
cols = HOST_NUMERIC_FEATURES + HOST_CATEGORICAL_FEATURES
out = {}
for _, row in hosts.iterrows():
out[row["host_id"]] = {c: row[c] for c in cols if c in hosts.columns}
return out
if __name__ == "__main__":
import sys
base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
X, y, ids, groups, meta = build_features(
base / "security_events.csv",
base / "host_inventory.csv",
)
print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")
print(f"groups: {groups.nunique()} unique incidents")
print(f"n_features: {len(meta['feature_names'])}")
print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
print(f"X has NaN: {X.isnull().any().any()}")
|