File size: 14,623 Bytes
721fce4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 | """
feature_engineering.py
======================
Feature pipeline for the CYB001 baseline classifier.
This module produces a flow-level feature matrix and label vector from the
four CSV files distributed with the CYB001 sample dataset on Hugging Face:
network_flows.csv (primary, one row per flow)
session_summary.csv (one row per session, joined on session_id)
network_topology.csv (one row per network segment, joined on segment_id)
flow_events.csv (one row per security event - NOT used for v1
features: naively aggregating events to the flow level would lose
their temporal granularity. Reserved for future work.)
The pipeline is deliberately written to be read end-to-end. Every dropped
column is dropped with a one-line explanation. Every engineered feature
sits next to a one-sentence motivation. If you are evaluating the CYB001
product, this file is the feature recipe; what the model "sees" is exactly
what this file emits.
Public API
----------
build_features(flows_path, sessions_path, topology_path) -> (X, y, meta)
X : pd.DataFrame - feature matrix, all numeric, no NaNs
y : pd.Series - integer-encoded label (0=BENIGN, 1=MALICIOUS, 2=AMBIGUOUS)
meta : dict - {feature_names, numeric_features, categorical_levels, label_to_int, int_to_label}
The same `meta` dict is used at inference time so a new flow record gets
encoded identically to training.
transform_single(record, meta) -> np.ndarray
Encode a single flow record (dict or 1-row DataFrame) for inference.
License
-------
This file ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Constants - what we keep, what we drop, and why
# ---------------------------------------------------------------------------
# Label vocabulary. A label's position in LABEL_ORDER is its integer code:
# BENIGN=0, MALICIOUS=1, AMBIGUOUS=2.
LABEL_ORDER = ["BENIGN", "MALICIOUS", "AMBIGUOUS"]
LABEL_TO_INT = {label: code for code, label in enumerate(LABEL_ORDER)}
INT_TO_LABEL = dict(enumerate(LABEL_ORDER))

# Ground-truth generator metadata carried in network_flows.csv. A real IDS
# would not observe these at inference time, and keeping any of them yields
# (near-)perfect accuracy that says nothing about real-world performance.
LEAKY_FLOW_COLUMNS = [
    "traffic_category",          # fully determines the label (attack_*/benign_*/ambiguous_*)
    "attack_subcategory",        # null iff label != MALICIOUS
    "attacker_capability_tier",  # set on every flow, benign included - generator metadata
]

# Identifiers and other non-feature columns. build_features selects its
# inputs by allow-list, so this list is informational rather than a drop list.
ID_COLUMNS = [
    "flow_id",
    "session_id",
    "source_ip_hash",        # SHA-256 pseudonym - not useful as a feature
    "destination_ip_hash",   # SHA-256 pseudonym - not useful as a feature
    "flow_start_timestamp",  # consumed by the hour_of_day / is_off_hours engineered features
]

# Numeric columns copied straight from network_flows.csv.
DIRECT_NUMERIC_FLOW_FEATURES = [
    # endpoints
    "source_port",
    "dest_port",
    # volume and timing
    "flow_duration_ms",
    "total_fwd_packets",
    "total_bwd_packets",
    "total_bytes_fwd",
    "total_bytes_bwd",
    "fwd_packet_len_mean",
    "fwd_packet_len_std",
    "bwd_packet_len_mean",
    "bwd_packet_len_std",
    "flow_bytes_per_sec",
    "flow_packets_per_sec",
    "inter_arrival_time_mean",
    "inter_arrival_time_std",
    # TCP flag counts
    "tcp_flag_syn_count",
    "tcp_flag_ack_count",
    "tcp_flag_fin_count",
    "tcp_flag_rst_count",
    "tcp_flag_psh_count",
    "tcp_flag_urg_count",
    # binary indicators
    "retransmission_flag",
    "fragmentation_flag",
    "protocol_violation_flag",
]

# Session-level numeric features, joined on session_id. Chosen by a per-label
# leakage audit:
#   kept    - distributions overlap across labels, so they are fair signals.
#   dropped - exfil_volume_bytes, scan_probe_count, lateral_move_flag are zero
#             for every BENIGN/AMBIGUOUS session: generator oracles, not
#             detector outputs.
SESSION_FEATURES_KEEP = [
    "payload_entropy_mean",
    "retransmission_rate",
    "protocol_violation_count",
    "c2_beacon_flag",
    "session_risk_score",
]

# Segment-level numeric features, joined on segment_id.
TOPOLOGY_NUMERIC_FEATURES = [
    "trust_level",
    "avg_concurrent_flows",
    "bandwidth_mbps",
    "nat_enabled",
    "ids_coverage",
    "diurnal_peak_factor",
    "feature_space_dim",
    "alert_threshold",
    "retraining_cadence_days",
    "ensemble_size",
    "device_count",
]

# (column, source table) pairs that are one-hot encoded. The source tag tells
# build_features which CSV the column comes from.
CATEGORICAL_FEATURES = [
    ("protocol", "flows"),               # TCP / UDP / HTTPS / DNS / SMTP / SSH / FTP / NTP
    ("flow_lifecycle_phase", "flows"),   # initiation / handshake / transfer / ...
    ("source_device_type", "flows"),     # workstation / server / iot / mobile / cloud / ot
    ("dest_device_type", "flows"),
    ("segment_type", "topology"),        # corporate_lan / dmz / cloud_workload / ...
    ("firewall_policy", "topology"),
    ("qos_policy", "topology"),
    ("defender_architecture", "topology"),
]
# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------
def _safe_divide(num: pd.Series, denom: pd.Series, fill: float = 0.0) -> pd.Series:
"""Element-wise divide, replacing inf/nan from div-by-zero with `fill`."""
out = num / denom.replace(0, np.nan)
return out.replace([np.inf, -np.inf], np.nan).fillna(fill)
def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with nine hand-engineered feature columns added.

    The features encode domain hypotheses about how each label class
    behaves. They are NOT learned; they are stated by hand so a buyer can
    read this function and see what the model is told to look at. Tree
    models can recover most of these on their own, but giving them
    explicitly improves both XGBoost convergence and MLP performance.

    Added columns:
        iat_cv, fwd_bwd_byte_ratio, bytes_per_packet_fwd,
        tcp_flag_anomaly_score, payload_density, hour_of_day,
        is_off_hours, is_well_known_dest_port, is_ephemeral_src_port

    Every ratio goes through `_safe_divide`, so a zero denominator, a NaN
    operand or an infinite quotient all yield 0.0 rather than NaN/inf.

    Parameters
    ----------
    df : pd.DataFrame
        Joined flow rows. Must contain the raw flow columns read below
        (packet/byte counts, inter-arrival stats, RST/URG/FIN flag counts,
        source/dest ports and `flow_start_timestamp`); a missing column
        raises KeyError.

    Returns
    -------
    pd.DataFrame
        A new frame with the extra columns; the input is not modified.
    """
    df = df.copy()
    mtu_bytes = 1500  # Ethernet MTU, used to normalise payload_density

    # IAT coefficient of variation. Low cv => regular inter-arrival times
    # => C2 beacon signature (the dataset is calibrated to cv ~= 0.065 for
    # APT beacons, regularity score ~= 0.93 per the README).
    df["iat_cv"] = _safe_divide(df["inter_arrival_time_std"],
                                df["inter_arrival_time_mean"])

    # Forward/backward byte ratio. >> 1 indicates an upload-heavy flow, which
    # is the exfiltration signature.
    df["fwd_bwd_byte_ratio"] = _safe_divide(df["total_bytes_fwd"],
                                            df["total_bytes_bwd"])

    # Bytes per packet (forward direction). Combined with packet length
    # std, separates streaming traffic from short-message protocols.
    df["bytes_per_packet_fwd"] = _safe_divide(df["total_bytes_fwd"],
                                              df["total_fwd_packets"])

    # TCP flag anomaly score: (RST + URG + FIN) per packet, both directions.
    # High values indicate scan/probe activity or protocol misuse.
    total_packets = df["total_fwd_packets"] + df["total_bwd_packets"]
    flag_total = (df["tcp_flag_rst_count"] + df["tcp_flag_urg_count"]
                  + df["tcp_flag_fin_count"])
    df["tcp_flag_anomaly_score"] = _safe_divide(flag_total, total_packets)

    # Payload density: average bytes per packet as a fraction of the MTU.
    # Low density on high packet counts indicates beaconing or keep-alive.
    total_bytes = df["total_bytes_fwd"] + df["total_bytes_bwd"]
    df["payload_density"] = _safe_divide(total_bytes, total_packets * mtu_bytes)

    # Hour of day from the flow start timestamp. Off-hours bias is calibrated
    # into the APT and insider-threat tiers. Unparseable timestamps fall back
    # to hour 12 (noon), i.e. they are treated as on-hours.
    ts = pd.to_datetime(df["flow_start_timestamp"], errors="coerce")
    hour = ts.dt.hour.fillna(12).astype(int)
    df["hour_of_day"] = hour
    # Off-hours = 00:00-05:59 or 23:00-23:59. NOTE(review): hour 22 counts as
    # on-hours here; confirm whether `hour >= 22` was intended. Kept as-is
    # because changing it would silently shift features for trained models.
    df["is_off_hours"] = ((hour < 6) | (hour > 22)).astype(int)

    # Port observables: well-known ports are < 1024, ephemeral ports >= 49152.
    df["is_well_known_dest_port"] = (df["dest_port"] < 1024).astype(int)
    df["is_ephemeral_src_port"] = (df["source_port"] >= 49152).astype(int)
    return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_features(
    flows_path: str | Path,
    sessions_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, dict[str, Any]]:
    """
    Load the three CSVs, join them, drop leaky columns, engineer features,
    one-hot encode categoricals, and return (X, y, meta).

    The returned `meta` dict captures the column order and the categorical
    level set, which is what `transform_single` needs at inference time to
    encode a new record identically.

    Parameters
    ----------
    flows_path, sessions_path, topology_path : str or Path
        Paths to network_flows.csv, session_summary.csv and
        network_topology.csv respectively.

    Returns
    -------
    X : pd.DataFrame
        Numeric features (in `meta["numeric_features"]` order) followed by
        one one-hot block per categorical column; NaN filled with 0.0.
    y : pd.Series
        Integer labels per LABEL_TO_INT, aligned row-for-row with X.
    meta : dict
        feature_names, numeric_features, categorical_levels, label_to_int,
        int_to_label.

    Raises
    ------
    ValueError
        If any `label` value is missing or not one of LABEL_ORDER.
    pandas.errors.MergeError
        (a ValueError subclass) if session_summary.csv repeats a session_id
        or network_topology.csv repeats a segment_id.
    KeyError
        If an expected column is absent from the inputs.
    """
    flows = pd.read_csv(flows_path)
    sessions = pd.read_csv(sessions_path)
    topology = pd.read_csv(topology_path)

    # Drop columns that leak the label (see LEAKY_FLOW_COLUMNS for rationale)
    flows = flows.drop(columns=LEAKY_FLOW_COLUMNS, errors="ignore")

    # Join session-level aggregates. validate="many_to_one" raises if
    # session_summary.csv repeats a session_id, which would otherwise
    # silently duplicate flow rows (and their labels) in the output.
    df = flows.merge(
        sessions[["session_id"] + SESSION_FEATURES_KEEP],
        on="session_id", how="left", validate="many_to_one",
    )

    # Join topology features (numeric + categorical), with the same guard
    # against duplicate segment_id rows.
    topo_cols = ["segment_id"] + TOPOLOGY_NUMERIC_FEATURES + [
        col for col, src in CATEGORICAL_FEATURES if src == "topology"
    ]
    df = df.merge(
        topology[topo_cols],
        on="segment_id", how="left", validate="many_to_one",
    )

    # Extract labels before adding features. Unknown or missing labels would
    # make astype(int) fail with an opaque NaN-cast error, so report the
    # offending values explicitly instead.
    y = df["label"].map(LABEL_TO_INT)
    unmapped = y.isna()
    if unmapped.any():
        bad = sorted(df.loc[unmapped, "label"].astype(str).unique())
        raise ValueError(
            f"Unrecognised or missing label values {bad}; "
            f"expected one of {LABEL_ORDER}"
        )
    # X is built on a fresh RangeIndex below; give y the same index.
    y = y.astype(int).reset_index(drop=True)

    # Engineered features
    df = _add_engineered_features(df)

    # Assemble feature columns
    numeric_features = (
        DIRECT_NUMERIC_FLOW_FEATURES
        + SESSION_FEATURES_KEEP
        + TOPOLOGY_NUMERIC_FEATURES
        + [
            "iat_cv", "fwd_bwd_byte_ratio", "bytes_per_packet_fwd",
            "tcp_flag_anomaly_score", "payload_density",
            "hour_of_day", "is_off_hours",
            "is_well_known_dest_port", "is_ephemeral_src_port",
        ]
    )
    X_numeric = df[numeric_features].astype(float)

    # One-hot encode categoricals. Record the level set in `meta` so we can
    # reproduce the same columns at inference time even if a new record
    # contains an unseen level (it will encode to all-zero, which is the
    # correct fallback for one-hot).
    categorical_levels: dict[str, list[str]] = {}
    one_hot_blocks: list[pd.DataFrame] = []
    for col, _src in CATEGORICAL_FEATURES:
        levels = sorted(df[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        # get_dummies on a Categorical emits one column per declared level,
        # so the column set is exactly `levels`, in sorted order.
        block = pd.get_dummies(
            df[col].astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        one_hot_blocks.append(block)

    X = pd.concat([X_numeric.reset_index(drop=True)]
                  + [b.reset_index(drop=True) for b in one_hot_blocks], axis=1)

    # Final NaN sweep (defensive - either left join introduces NaN when a
    # session_id / segment_id has no matching row).
    X = X.fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
    }
    return X, y, meta
def transform_single(
    record: dict | pd.Series | pd.DataFrame, meta: dict[str, Any]
) -> np.ndarray:
    """
    Encode a flow record for inference, reproducing the training layout.

    `record` must contain the same columns as network_flows.csv (minus the
    leaky columns), plus the joined session and topology fields. If you only
    have the flow row, you must look up the matching session_summary row and
    network_topology row and merge them into `record` before calling this.

    Parameters
    ----------
    record : dict, pd.Series or pd.DataFrame
        One record as a dict or Series (e.g. a row from
        ``DataFrame.iterrows``), or a DataFrame of one or more rows. A
        DataFrame's index is ignored, so slices such as ``df.iloc[[5]]``
        encode correctly.
    meta : dict
        The dict returned by `build_features` (or `load_meta`). Its
        `numeric_features`, `categorical_levels` and `feature_names` entries
        are used.

    Returns
    -------
    np.ndarray
        float32 array of shape (n_rows, len(meta["feature_names"])), i.e.
        (1, n_features) for a single record, ready for model.predict_proba.

    Notes
    -----
    Numeric features absent from `record`, and NaN values, become 0.0.
    Categorical values outside the fit-time level set encode to all-zero.
    The raw flow columns read by `_add_engineered_features` are required
    (KeyError otherwise).
    """
    if isinstance(record, pd.Series):
        record = record.to_dict()
    if isinstance(record, dict):
        df = pd.DataFrame([record])
    else:
        # Drop the caller's index. The numeric block below is rebuilt on a
        # fresh RangeIndex and pd.concat(axis=1) aligns on index, so a
        # non-default index would split one record across several rows.
        df = record.reset_index(drop=True)
    df = _add_engineered_features(df)
    n_rows = len(df)

    # Numeric features in fixed order; absent columns default to 0.0.
    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * n_rows)).astype(float).values
        for col in meta["numeric_features"]
    })

    # One-hot blocks in fixed order, using the levels seen at fit time.
    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * n_rows))
        expected = [f"{col}_{lvl}" for lvl in levels]
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Pin the column order to the fit-time levels and add any that are
        # missing as all-zero columns.
        blocks.append(block.reindex(columns=expected, fill_value=0))

    X = pd.concat(blocks, axis=1).fillna(0.0)
    # Reorder to match training column order exactly
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)
def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """
    Write the inference-relevant parts of `meta` to `path` as indented JSON.

    Only feature_names, numeric_features, categorical_levels, label_to_int
    and int_to_label are written; any other keys in `meta` are ignored.
    """
    passthrough_keys = (
        "feature_names",
        "numeric_features",
        "categorical_levels",
        "label_to_int",
    )
    payload = {key: meta[key] for key in passthrough_keys}
    # JSON object keys must be strings; load_meta converts them back to int.
    payload["int_to_label"] = {
        str(code): label for code, label in meta["int_to_label"].items()
    }
    with open(path, "w") as handle:
        json.dump(payload, handle, indent=2)
def load_meta(path: str | Path) -> dict[str, Any]:
    """
    Read a meta dict previously written by `save_meta`.

    JSON object keys are always strings, so the `int_to_label` keys are
    converted back to int to restore the in-memory form.
    """
    with open(path) as handle:
        restored = json.load(handle)
    string_keyed = restored["int_to_label"]
    restored["int_to_label"] = {int(code): label for code, label in string_keyed.items()}
    return restored
if __name__ == "__main__":
    # Smoke test: build the feature matrix from a directory that holds the
    # three CSVs (first CLI argument, else the default upload directory) and
    # print a handful of sanity statistics.
    import sys

    data_dir = Path(sys.argv[1] if len(sys.argv) > 1 else "/mnt/user-data/uploads")
    csv_names = ("network_flows.csv", "session_summary.csv", "network_topology.csv")
    X, y, meta = build_features(*(data_dir / name for name in csv_names))

    for line in (
        f"X shape: {X.shape}",
        f"y shape: {y.shape}",
        f"n features: {len(meta['feature_names'])}",
        f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}",
        f"X dtypes unique: {X.dtypes.unique()}",
        f"X has NaN: {X.isnull().any().any()}",
    ):
        print(line)
|