"""
feature_engineering.py
======================

Feature pipeline for the CYB001 baseline classifier.

This module produces a flow-level feature matrix and label vector from the
four CSV files distributed with the CYB001 sample dataset on Hugging Face:

    network_flows.csv     (primary, one row per flow)
    session_summary.csv   (one row per session, joined on session_id)
    network_topology.csv  (one row per network segment, joined on segment_id)
    flow_events.csv       (one row per security event - NOT used for v1
                           features; events lose temporal granularity if
                           aggregated naively per flow. Reserved for future work.)

The pipeline is deliberately written to be read end-to-end. Every dropped
column is dropped with a one-line explanation. Every engineered feature
sits next to a one-sentence motivation. If you are evaluating the CYB001
product, this file is the feature recipe; what the model "sees" is exactly
what this file emits.

Public API
----------
    build_features(flows_path, sessions_path, topology_path) -> (X, y, meta)

        X : pd.DataFrame  - feature matrix, all numeric, no NaNs
        y : pd.Series     - integer-encoded label (0=BENIGN, 1=MALICIOUS, 2=AMBIGUOUS)
        meta : dict       - {feature_names, numeric_features, categorical_levels,
                             label_to_int, int_to_label}

The same `meta` dict is used at inference time so a new flow record gets
encoded identically to training.

    transform_single(record, meta) -> np.ndarray

        Encode a single flow record (dict or 1-row DataFrame) for inference.

License
-------
This file ships with the public model on Hugging Face under CC-BY-NC-4.0,
matching the dataset license. See README.md.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

# ---------------------------------------------------------------------------
# Constants - what we keep, what we drop, and why
# ---------------------------------------------------------------------------

LABEL_ORDER = ["BENIGN", "MALICIOUS", "AMBIGUOUS"]  # index 0, 1, 2
LABEL_TO_INT = {lbl: i for i, lbl in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {i: lbl for lbl, i in LABEL_TO_INT.items()}

# Columns dropped from network_flows.csv because they are ground-truth
# generator metadata, not observables a real IDS would have at inference time.
# Including any of these gives perfect or near-perfect accuracy that does
# not reflect real-world performance.
LEAKY_FLOW_COLUMNS = [
    "traffic_category",          # fully determines the label (attack_*/benign_*/ambiguous_*)
    "attack_subcategory",        # null iff label != MALICIOUS
    "attacker_capability_tier",  # labeled per flow including benign - generator metadata
]

# Identifier / non-feature columns
ID_COLUMNS = [
    "flow_id", "session_id",
    "source_ip_hash", "destination_ip_hash",   # SHA-256 pseudonyms, not useful as features
    "flow_start_timestamp",                    # consumed by the hour_of_day / is_off_hours features
]

# Direct numeric features from network_flows.csv (pass-through)
DIRECT_NUMERIC_FLOW_FEATURES = [
    "source_port", "dest_port",
    "flow_duration_ms",
    "total_fwd_packets", "total_bwd_packets",
    "total_bytes_fwd", "total_bytes_bwd",
    "fwd_packet_len_mean", "fwd_packet_len_std",
    "bwd_packet_len_mean", "bwd_packet_len_std",
    "flow_bytes_per_sec", "flow_packets_per_sec",
    "inter_arrival_time_mean", "inter_arrival_time_std",
    "tcp_flag_syn_count", "tcp_flag_ack_count", "tcp_flag_fin_count",
    "tcp_flag_rst_count", "tcp_flag_psh_count", "tcp_flag_urg_count",
    "retransmission_flag", "fragmentation_flag", "protocol_violation_flag",
]

# Session-level numeric features (joined on session_id).
# Selected after a per-label leakage audit:
#   KEEP: payload_entropy_mean, retransmission_rate, protocol_violation_count,
#         c2_beacon_flag, session_risk_score   (overlapping distributions across labels)
#   DROP: exfil_volume_bytes, scan_probe_count, lateral_move_flag
#         (zero for all BENIGN/AMBIGUOUS - generator oracles, not detector outputs)
SESSION_FEATURES_KEEP = [
    "payload_entropy_mean",
    "retransmission_rate",
    "protocol_violation_count",
    "c2_beacon_flag",
    "session_risk_score",
]

# Topology-level numeric features (joined on segment_id)
TOPOLOGY_NUMERIC_FEATURES = [
    "trust_level", "avg_concurrent_flows", "bandwidth_mbps",
    "nat_enabled", "ids_coverage", "diurnal_peak_factor",
    "feature_space_dim", "alert_threshold",
    "retraining_cadence_days", "ensemble_size", "device_count",
]

# Categorical columns that get one-hot encoded
CATEGORICAL_FEATURES = [
    ("protocol",             "flows"),     # TCP / UDP / HTTPS / DNS / SMTP / SSH / FTP / NTP
    ("flow_lifecycle_phase", "flows"),     # initiation / handshake / transfer / ...
    ("source_device_type",   "flows"),     # workstation / server / iot / mobile / cloud / ot
    ("dest_device_type",     "flows"),
    ("segment_type",         "topology"),  # corporate_lan / dmz / cloud_workload / ...
    ("firewall_policy",      "topology"),
    ("qos_policy",           "topology"),
    ("defender_architecture","topology"),
]


# ---------------------------------------------------------------------------
# Engineered features
# ---------------------------------------------------------------------------

def _safe_divide(num: pd.Series, denom: pd.Series, fill: float = 0.0) -> pd.Series:
    """Element-wise divide, replacing inf/nan from div-by-zero with `fill`."""
    out = num / denom.replace(0, np.nan)
    return out.replace([np.inf, -np.inf], np.nan).fillna(fill)
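

# Illustrative sketch, not part of the original pipeline. The hypothetical
# helper below replays the zero-denominator handling used by `_safe_divide`
# on made-up toy values, so the fallback behaviour can be checked by eye.
# Imports are repeated locally so the sketch also runs on its own.
def _sketch_zero_denominator_fallback() -> list[float]:
    import numpy as np
    import pandas as pd

    num = pd.Series([4.0, 1.0, 0.0])
    denom = pd.Series([2.0, 0.0, 0.0])
    # Zero denominators become NaN first, so the division never yields inf.
    out = num / denom.replace(0, np.nan)
    # NaN results (from 1/0 and 0/0 alike) are replaced with the fill value.
    return out.fillna(0.0).tolist()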


def _add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add nine engineered features that encode domain hypotheses about how
    each label class behaves. These are NOT learned; they are stated by hand
    so a buyer can read this function and see what the model is told to look
    at. Tree models can recover most of these on their own, but giving them
    explicitly improves both XGBoost convergence and MLP performance.
    """
    df = df.copy()

    # IAT coefficient of variation. Low cv => regular inter-arrival times
    # => C2 beacon signature (the dataset is calibrated to cv ~= 0.065 for
    # APT beacons, regularity score ~= 0.93 per the README).
    df["iat_cv"] = _safe_divide(df["inter_arrival_time_std"],
                                df["inter_arrival_time_mean"])

    # Forward/backward byte ratio. >> 1 indicates upload-heavy flow, which
    # is the exfiltration signature.
    df["fwd_bwd_byte_ratio"] = _safe_divide(df["total_bytes_fwd"],
                                            df["total_bytes_bwd"])

    # Bytes per packet (forward direction). Combined with packet length
    # std, separates streaming traffic from short-message protocols.
    total_fwd = df["total_fwd_packets"].replace(0, np.nan)
    df["bytes_per_packet_fwd"] = (df["total_bytes_fwd"] / total_fwd).fillna(0)

    # TCP flag anomaly score. Share of RST, URG, and FIN flags relative to
    # total packets; a high share indicates scan/probe or protocol misuse.
    total_packets = (df["total_fwd_packets"] + df["total_bwd_packets"]).replace(0, np.nan)
    flag_total = (df["tcp_flag_rst_count"] + df["tcp_flag_urg_count"]
                  + df["tcp_flag_fin_count"])
    df["tcp_flag_anomaly_score"] = (flag_total / total_packets).fillna(0)

    # Payload density. Bytes per packet, normalized to a 1500-byte Ethernet
    # MTU. Low density on high packet counts indicates beaconing or keep-alive.
    total_bytes = df["total_bytes_fwd"] + df["total_bytes_bwd"]
    df["payload_density"] = (total_bytes / (total_packets * 1500)).fillna(0)

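    # Hour of day from timestamp. Off-hours bias is calibrated into the
    # APT and insider-threat tiers. Unparseable timestamps fall back to
    # hour 12, so they never count as off-hours. With the thresholds
    # below, the off-hours window is 23:00-05:59.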
    ts = pd.to_datetime(df["flow_start_timestamp"], errors="coerce")
    hour = ts.dt.hour.fillna(12).astype(int)
    df["hour_of_day"] = hour
    df["is_off_hours"] = ((hour < 6) | (hour > 22)).astype(int)

    # Port observables. Well-known ports < 1024, ephemeral ports >= 49152.
    df["is_well_known_dest_port"] = (df["dest_port"] < 1024).astype(int)
    df["is_ephemeral_src_port"]   = (df["source_port"] >= 49152).astype(int)

    return df
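

# Illustrative sketch, not part of the original pipeline. It shows why a low
# inter-arrival-time coefficient of variation (std / mean) points at regular,
# beacon-like traffic. The helper name and both toy series are assumptions
# made up for this example; they are not taken from the CYB001 data.
def _sketch_iat_cv() -> tuple[float, float]:
    import numpy as np

    beacon_iat = np.array([60.0, 61.0, 59.0, 60.0])   # near-constant gaps
    bursty_iat = np.array([1.0, 120.0, 3.0, 200.0])   # irregular gaps
    # Same ratio as the `iat_cv` feature: spread relative to the mean gap.
    beacon_cv = float(beacon_iat.std() / beacon_iat.mean())
    bursty_cv = float(bursty_iat.std() / bursty_iat.mean())
    return beacon_cv, bursty_cv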


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def build_features(
    flows_path: str | Path,
    sessions_path: str | Path,
    topology_path: str | Path,
) -> tuple[pd.DataFrame, pd.Series, dict[str, Any]]:
    """
    Load the three CSVs, join them, drop leaky columns, engineer features,
    one-hot encode categoricals, and return (X, y, meta).

    The returned `meta` dict captures the column order and the categorical
    level set, which is what `transform_single` needs at inference time to
    encode a new record identically.
    """
    flows = pd.read_csv(flows_path)
    sessions = pd.read_csv(sessions_path)
    topology = pd.read_csv(topology_path)

    # Drop columns that leak the label (see LEAKY_FLOW_COLUMNS for rationale)
    flows = flows.drop(columns=LEAKY_FLOW_COLUMNS, errors="ignore")

    # Join session-level aggregates
    df = flows.merge(
        sessions[["session_id"] + SESSION_FEATURES_KEEP],
        on="session_id", how="left",
    )

    # Join topology features (numeric + categorical)
    topo_cols = ["segment_id"] + TOPOLOGY_NUMERIC_FEATURES + [
        col for col, src in CATEGORICAL_FEATURES if src == "topology"
    ]
    df = df.merge(topology[topo_cols], on="segment_id", how="left")

    # Extract labels before adding features
    y = df["label"].map(LABEL_TO_INT).astype(int)

    # Engineered features
    df = _add_engineered_features(df)

    # Assemble feature columns
    numeric_features = (
        DIRECT_NUMERIC_FLOW_FEATURES
        + SESSION_FEATURES_KEEP
        + TOPOLOGY_NUMERIC_FEATURES
        + [
            "iat_cv", "fwd_bwd_byte_ratio", "bytes_per_packet_fwd",
            "tcp_flag_anomaly_score", "payload_density",
            "hour_of_day", "is_off_hours",
            "is_well_known_dest_port", "is_ephemeral_src_port",
        ]
    )

    X_numeric = df[numeric_features].astype(float)

    # One-hot encode categoricals. Record the level set in `meta` so we can
    # reproduce the same columns at inference time even if a new record
    # contains an unseen level (it will encode to all-zero, which is the
    # correct fallback for one-hot).
    categorical_levels: dict[str, list[str]] = {}
    one_hot_blocks: list[pd.DataFrame] = []
    for col, _src in CATEGORICAL_FEATURES:
        levels = sorted(df[col].dropna().unique().tolist())
        categorical_levels[col] = levels
        block = pd.get_dummies(
            df[col].astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        one_hot_blocks.append(block)

    X = pd.concat([X_numeric.reset_index(drop=True)]
                  + [b.reset_index(drop=True) for b in one_hot_blocks], axis=1)

    # Final NaN sweep (defensive - session join can introduce NaN if a
    # session_id is missing from session_summary.csv).
    X = X.fillna(0.0)

    meta = {
        "feature_names": X.columns.tolist(),
        "numeric_features": numeric_features,
        "categorical_levels": categorical_levels,
        "label_to_int": LABEL_TO_INT,
        "int_to_label": INT_TO_LABEL,
    }

    return X, y, meta
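

# Illustrative sketch, not part of the original pipeline. It isolates the
# fixed-level one-hot step used above: categories are pinned to the level set
# recorded at fit time, so an unseen level encodes to an all-zero row. The
# helper name, the two-level set, and the "QUIC" value are toy assumptions.
def _sketch_fixed_level_one_hot() -> list[list[int]]:
    import pandas as pd

    levels = ["TCP", "UDP"]                 # level set recorded at fit time
    values = pd.Series(["UDP", "QUIC"])     # "QUIC" was never seen at fit time
    block = pd.get_dummies(
        # set_categories turns any value outside `levels` into NaN.
        values.astype("category").cat.set_categories(levels),
        prefix="protocol", dummy_na=False,
    ).astype(int)
    # Columns are protocol_TCP, protocol_UDP; the unseen row is all zeros.
    return block.values.tolist()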


def transform_single(record: dict | pd.DataFrame, meta: dict[str, Any]) -> np.ndarray:
    """
    Encode a single flow record for inference.

    `record` must contain the same columns as network_flows.csv (minus the
    leaky columns), plus the joined session and topology fields. If you only
    have the flow row, you must look up the matching session_summary row and
    network_topology row and merge them into `record` before calling this.

    Returns a (1, n_features) numpy array ready for model.predict_proba.
    """
    if isinstance(record, dict):
        df = pd.DataFrame([record])
    else:
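        # Reset the index so the one-hot blocks (which keep df's index) align
        # with the positional numeric block in the concat below. Without this,
        # a row sliced from a larger frame would misalign and yield extra rows.
        df = record.reset_index(drop=True)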

    df = _add_engineered_features(df)

    # Numeric features in fixed order
    numeric = pd.DataFrame({
        col: df.get(col, pd.Series([0.0] * len(df))).astype(float).values
        for col in meta["numeric_features"]
    })

    # One-hot blocks in fixed order, using the levels seen at fit time
    blocks: list[pd.DataFrame] = [numeric]
    for col, levels in meta["categorical_levels"].items():
        val = df.get(col, pd.Series([None] * len(df)))
        block = pd.get_dummies(
            val.astype("category").cat.set_categories(levels),
            prefix=col, dummy_na=False,
        ).astype(int)
        # Ensure all expected level columns are present (in case a level
        # didn't appear in this single record)
        for lvl in levels:
            colname = f"{col}_{lvl}"
            if colname not in block.columns:
                block[colname] = 0
        block = block[[f"{col}_{lvl}" for lvl in levels]]
        blocks.append(block)

    X = pd.concat(blocks, axis=1).fillna(0.0)

    # Reorder to match training column order exactly
    X = X.reindex(columns=meta["feature_names"], fill_value=0.0)
    return X.values.astype(np.float32)


def save_meta(meta: dict[str, Any], path: str | Path) -> None:
    """Persist meta to JSON for inference-time reuse."""
    serializable = {
        "feature_names": meta["feature_names"],
        "numeric_features": meta["numeric_features"],
        "categorical_levels": meta["categorical_levels"],
        "label_to_int": meta["label_to_int"],
        "int_to_label": {str(k): v for k, v in meta["int_to_label"].items()},
    }
    with open(path, "w") as f:
        json.dump(serializable, f, indent=2)


def load_meta(path: str | Path) -> dict[str, Any]:
    """Load meta from JSON."""
    with open(path) as f:
        meta = json.load(f)
    meta["int_to_label"] = {int(k): v for k, v in meta["int_to_label"].items()}
    return meta
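

# Illustrative sketch, not part of the original pipeline. It shows why
# `load_meta` converts the `int_to_label` keys back to int: JSON object keys
# are always strings, so integer keys do not survive a dump/load round trip.
# The helper name is hypothetical; the mapping mirrors LABEL_ORDER above.
def _sketch_json_int_key_round_trip() -> dict[int, str]:
    import json

    int_to_label = {0: "BENIGN", 1: "MALICIOUS", 2: "AMBIGUOUS"}
    decoded = json.loads(json.dumps(int_to_label))
    # After the round trip the keys are the strings "0", "1", "2".
    assert list(decoded) == ["0", "1", "2"]
    # Restoring int keys makes lookups by predicted class index work again.
    return {int(k): v for k, v in decoded.items()}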


if __name__ == "__main__":
    # Smoke test
    import sys
    base = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/mnt/user-data/uploads")
    X, y, meta = build_features(
        base / "network_flows.csv",
        base / "session_summary.csv",
        base / "network_topology.csv",
    )
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"n features: {len(meta['feature_names'])}")
    print(f"label distribution:\n{y.map(INT_TO_LABEL).value_counts()}")
    print(f"X dtypes unique: {X.dtypes.unique()}")
    print(f"X has NaN: {X.isnull().any().any()}")