---
tags:
- text-classification
- security
- blue-team
- roberta
license: odc-by
datasets:
- trendmicro-ailab/Primus-FineWeb
metrics:
- precision
- recall
- f1
pipeline_tag: text-classification
library_name: transformers
base_model:
- ehsanaghaei/SecureBERT
---
# BlueSecureBERT 🟦🛡️
Detects blue-team / defensive security text (English), with a focus on technical detection-engineering workflows (SIEM ingestion, Sigma rules, Sysmon, Microsoft KQL, etc.).
| Split      | Precision | Recall | F1    | Fβ    | CE-loss | Threshold |
|------------|-----------|--------|-------|-------|---------|-----------|
| Validation | 0.949     | 0.991  | 0.969 | 0.982 | 0.011   | 0.579     |

Recommended cut-off: **prob >= 0.579** (arg-max on the validation split).
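A minimal scoring sketch using the standard `transformers` API; the repo id `your-org/BlueSecureBERT` is a placeholder for wherever this checkpoint is hosted, and the positive class is assumed to be index 1 (as in the splitting script below):

```python
# Minimal inference sketch. "your-org/BlueSecureBERT" is a hypothetical repo id.
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

repo = "your-org/BlueSecureBERT"  # placeholder; substitute the real checkpoint path
tok = AutoTokenizer.from_pretrained(repo)
model = AutoModelForSequenceClassification.from_pretrained(repo).eval()

text = "Deploy Sysmon, forward events to the SIEM, and alert on Sigma rule matches."
with torch.inference_mode():
    enc = tok(text, truncation=True, max_length=512, return_tensors="pt")
    p_blue = torch.softmax(model(**enc).logits, dim=-1)[0, 1].item()

print(f"P(blue-team) = {p_blue:.3f} -> {'defensive' if p_blue >= 0.579 else 'other'}")
```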
## Demo

| Phrase | BlueSecureBERT | RedSecureBERT |
|---|---|---|
| To exfiltrate sensitive data, launch a phishing campaign that tricks employees into revealing their VPN credentials. | 0.066 | 0.824 |
| We should deploy an EDR solution, monitor all endpoints for intrusion attempts, and enforce strict password policies. | 0.557 | 0.019 |
| Our marketing team will unveil the new cybersecurity branding materials at next Tuesday's antivirus product launch. | 0.256 | 0.021 |
| I'm excited about the company picnic. There's no cybersecurity topic, just burgers and games. | 0.272 | 0.103 |
## Intended uses & limits

- Triage large corpora for technical detection-engineering content: Sysmon, Sigma rules, SIEM pipelines, indicators of compromise.
- Input language: English.
- No external test set yet; treat the reported numbers as optimistic.
## Training data

| Label     | Rows    |
|-----------|---------|
| Offensive | 30,746  |
| Defensive | 19,550  |
| Other     | 130,000 |
| **Total** | 180,296 |
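Each classifier head is trained one-vs-rest against this three-way labelling (see *Model details* below). As an illustration only (the texts and field names here are made up), the defensive head's binary targets look like this:

```python
# Illustrative relabeling for the defensive-vs-rest head (hypothetical records).
rows = [
    {"content": "Write a Sigma rule for suspicious LSASS access.", "label": "Defensive"},
    {"content": "Craft a phishing lure for VPN credentials.", "label": "Offensive"},
    {"content": "The picnic is on Tuesday.", "label": "Other"},
]
# Defensive -> 1, everything else -> 0; the offensive head uses label == "Offensive".
binary = [(r["content"], int(r["label"] == "Defensive")) for r in rows]
```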
## Model details

| Field           | Value                                        |
|-----------------|----------------------------------------------|
| Base encoder    | ehsanaghaei/SecureBERT (RoBERTa-base, 125 M) |
| Objective       | One-vs-rest, focal loss (γ = 2)              |
| Training        | 3 epochs · micro-batch 16 · LR 2e-5          |
| Hardware        | 1× RTX 4090 (≈ 41 min)                       |
| Inference dtype | FP16-safe                                    |
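Focal loss down-weights easy, confidently-classified examples, which helps with the class imbalance shown in the training-data table. A minimal sketch of the γ = 2 variant for a two-class head (illustrative, not the exact training code, and without the optional α class-weighting term):

```python
import torch
import torch.nn.functional as F

def focal_loss(logits: torch.Tensor, targets: torch.Tensor, gamma: float = 2.0) -> torch.Tensor:
    """Focal loss: cross-entropy scaled by (1 - p_t)**gamma, so easy examples
    contribute little and hard examples dominate the gradient."""
    ce = F.cross_entropy(logits, targets, reduction="none")  # ce = -log p_t
    p_t = torch.exp(-ce)                                     # probability of the true class
    return ((1.0 - p_t) ** gamma * ce).mean()
```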
## Training data license

The corpus is derived from [trendmicro-ailab/Primus-FineWeb](https://huggingface.co/datasets/trendmicro-ailab/Primus-FineWeb), distributed under the ODC-BY license.
## Script example
"""
06_split_binary.py
~~~~~~~~~~~~~~~~~~
Stream-splits a JSONL cybersecurity corpus into *offensive*, *defensive*, and *other* shards
using **two** fine-tuned SecureBERT heads.
How the two heads work together
-------------------------------
We load two independent checkpoints:
* `offensive_vs_rest`βββgives **P(offensive | text)**
* `defensive_vs_rest`βββgives **P(defensive | text)**
For every line we:
1. run both heads in the same GPU batch;
2. take the positive-class probability from each soft-max;
3. compare against per-head thresholds (from `thresholds.json`, default 0.5);
4. route the text with this truth table
"""
from __future__ import annotations

import argparse
import json
from itertools import islice
from pathlib import Path

import torch
from torch.nn.functional import softmax
from tqdm.auto import tqdm
from transformers import (
    AutoModelForSequenceClassification as HFModel,
    AutoTokenizer,
)

from config import RAW_JSONL, MODEL_DIR

torch.backends.cuda.matmul.allow_tf32 = True
torch.set_float32_matmul_precision("medium")

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

cli = argparse.ArgumentParser(description="Split JSONL into offence/defence/other")
cli.add_argument("--batch_size", type=int, help="override auto batch sizing")
args = cli.parse_args()
if args.batch_size:
    BATCH = args.batch_size
else:
    # Auto-size the batch from free VRAM (~0.03 GiB per sample), clamped to [64, 1024].
    try:
        import pynvml

        pynvml.nvmlInit()
        free = (
            pynvml.nvmlDeviceGetMemoryInfo(pynvml.nvmlDeviceGetHandleByIndex(0)).free
            / 1024**3
        )
        pynvml.nvmlShutdown()
        BATCH = max(64, min(int(free // 0.03), 1024))
    except Exception:
        BATCH = 256
print(f"[split-binary] batch size = {BATCH}")
# Per-head decision thresholds; expected format: {"off": 0.5, "def": 0.579}.
thr_path = Path(MODEL_DIR) / "thresholds.json"
if thr_path.exists():
    THR = json.loads(thr_path.read_text())
    print("Loaded thresholds:", THR)
else:
    THR = {"off": 0.5, "def": 0.5}
    print("No thresholds.json -> default 0.5 each")
def load_model(path: Path):
    """Load a classification head in BF16 (no flash-attention)."""
    return HFModel.from_pretrained(path, torch_dtype=torch.bfloat16)


paths = {
    "off": Path(MODEL_DIR) / "offensive_vs_rest",
    "def": Path(MODEL_DIR) / "defensive_vs_rest",
}
print("Loading models …")
m_off = load_model(paths["off"]).to(DEVICE).eval()
m_def = load_model(paths["def"]).to(DEVICE).eval()
try:
    # torch.compile is best-effort: fall back silently on unsupported setups.
    m_off = torch.compile(m_off, dynamic=True, mode="reduce-overhead")
    m_def = torch.compile(m_def, dynamic=True, mode="reduce-overhead")
    print("torch.compile: dynamic=True, reduce-overhead ✓")
except Exception:
    pass
tok = AutoTokenizer.from_pretrained(paths["off"])
ENC = dict(
    truncation=True,
    padding="longest",
    max_length=512,
    return_tensors="pt",
)
outs = {
    "off": open("offensive.jsonl", "w", encoding="utf-8"),
    "def": open("defensive.jsonl", "w", encoding="utf-8"),
    "oth": open("other.jsonl", "w", encoding="utf-8"),
}
def batched(it, n):
    """Yield `n`-sized chunks from iterator `it`."""
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            break
        yield chunk
with open(RAW_JSONL, "r", encoding="utf-8") as fin, torch.inference_mode():
    for lines in tqdm(batched(fin, BATCH), desc="Splitting", ncols=110):
        recs = [json.loads(l) for l in lines]
        texts = [r.get("content", "") for r in recs]
        batch = tok(texts, **ENC)
        # Pinned memory enables async host-to-device copies (CUDA only).
        batch = {
            k: (v.pin_memory() if DEVICE == "cuda" else v).to(DEVICE, non_blocking=True)
            for k, v in batch.items()
        }
        p_off = softmax(m_off(**batch).logits, dim=-1)[:, 1].cpu()
        p_def = softmax(m_def(**batch).logits, dim=-1)[:, 1].cpu()
        for r, po, pd in zip(recs, p_off, p_def):
            txt = r.get("content", "")
            off, dfn = po >= THR["off"], pd >= THR["def"]
            if off and not dfn:
                outs["off"].write(json.dumps({"content": txt}) + "\n")
            elif dfn and not off:
                outs["def"].write(json.dumps({"content": txt}) + "\n")
            elif off and dfn:
                # Both heads fire: route to the higher-probability shard.
                (outs["off"] if po >= pd else outs["def"]).write(
                    json.dumps({"content": txt}) + "\n"
                )
            else:
                outs["oth"].write(json.dumps({"content": txt}) + "\n")

for f in outs.values():
    f.close()
print("✅ Done! → offensive.jsonl defensive.jsonl other.jsonl")
```
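To reproduce the split, run e.g. `python 06_split_binary.py --batch_size 512`; without the flag, the script auto-sizes the batch from free VRAM (falling back to 256 if `pynvml` is unavailable).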