"""
Threat-map observability: TF-IDF + SVD embeddings, KMeans clusters, mutual information.
Mirrors the failure-geometry / CARB pipeline shape (embed → cluster → MI vs labels)
for **scored threat probes**, so structural patterns in risky evaluations are visible.
No network downloads; scikit-learn only.
"""
from __future__ import annotations
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import normalize
def observation_text(case: dict) -> str:
    """Render one scored case (CaseScore dict + optional probe context) as a
    single dense string suitable for TF-IDF embedding.

    Missing keys are tolerated: absent fields become empty strings and absent
    pattern lists become empty joins, so any partially populated dict works.
    """
    joined = {
        key: " ".join(case.get(key) or [])
        for key in (
            "detected_failure_modes",
            "matched_unsafe_patterns",
            "matched_safe_patterns",
        )
    }
    outcome = "pass" if case.get("passed") else "fail"
    # probe_input is capped at 800 chars so one huge probe can't dominate TF-IDF.
    segments = [
        f"category: {case.get('category', '')}",
        f"severity: {case.get('severity', '')}",
        f"pass_fail: {outcome}",
        f"risk: {case.get('risk_score', '')} weighted: {case.get('weighted_risk', '')}",
        f"task: {case.get('task') or ''}",
        f"probe_input: {(case.get('probe_input') or '')[:800]}",
        f"explanation: {case.get('explanation', '')}",
        f"failure_modes: {joined['detected_failure_modes']}",
        f"unsafe_patterns: {joined['matched_unsafe_patterns']}",
        f"safe_patterns: {joined['matched_safe_patterns']}",
    ]
    return " ".join(segments)
def _embed_texts(texts: list[str], n_components: int) -> np.ndarray:
    """L2-normalized embeddings for *texts*: TF-IDF (uni+bigrams, capped at 800
    features) reduced with TruncatedSVD when enough rows/features exist.

    Returns an empty array for empty input; when the corpus is too small for a
    meaningful SVD (fewer than 2 usable dimensions), falls back to the raw
    TF-IDF matrix truncated to at most one column.
    """
    if not texts:
        return np.empty((0, max(n_components, 1)))

    row_count = len(texts)
    tfidf = TfidfVectorizer(
        max_features=800,
        ngram_range=(1, 2),
        sublinear_tf=True,
    ).fit_transform(texts)

    # SVD dimensions are bounded by the requested size, the vocabulary, and
    # the number of rows (TruncatedSVD needs n_components < n_features).
    dims = min(n_components, tfidf.shape[1] - 1, max(row_count - 1, 1))
    if dims < 2:
        dense = tfidf.toarray()
        return normalize(dense[:, : max(dims, 1)])

    reducer = TruncatedSVD(n_components=dims, random_state=42)
    return normalize(reducer.fit_transform(tfidf))
def _cluster(embeddings: np.ndarray, n_clusters: int, random_state: int = 42) -> list[int]:
    """KMeans labels for *embeddings*, with k capped at the number of rows.

    Returns [] for empty input and a single-cluster labeling when there are
    fewer than 2 rows or fewer than 2 clusters requested.

    Bug fix: the original computed ``effective_k = max(2, min(n_clusters, n))``,
    which made its own ``effective_k == 1`` branch unreachable and silently
    turned a request for one cluster into a 2-cluster KMeans run. Behavior is
    unchanged for n_clusters >= 2 (the only way callers in this file use it).
    """
    n = len(embeddings)
    if n == 0:
        return []
    effective_k = min(n_clusters, n)
    if effective_k < 2 or n < 2:
        # Degenerate cases: everything lands in cluster 0.
        return [0] * n
    km = KMeans(n_clusters=effective_k, random_state=random_state, n_init=10)
    return km.fit_predict(embeddings).tolist()
def analyze_case_records(
    cases: list[dict],
    *,
    n_clusters: int = 4,
    min_cases: int = 5,
    random_state: int = 42,
) -> dict:
    """
    Embed scored cases, cluster in SVD space, compare clusters to category / severity / pass-fail.
    Returns a dict suitable for JSON reports and Gradio; ``eligible`` False when too few rows.
    """
    n = len(cases)

    def _ineligible(message: str) -> dict:
        # Shared payload shape for the "not enough data" early exits.
        return {
            "eligible": False,
            "message": message,
            "n_cases": n,
            "mutual_information": {},
            "case_clusters": [],
        }

    if n < min_cases:
        return _ineligible(f"Need at least {min_cases} scored cases (have {n}).")
    if n < 3:
        return _ineligible("Need at least 3 cases for stable embedding dimensions.")

    texts = [observation_text(case) for case in cases]
    embeddings = _embed_texts(texts, n_components=32)
    scatter = _embed_texts(texts, n_components=2)
    if scatter.shape[1] == 1 and n >= 3:
        # Pad a zero y-axis so downstream 2-D plotting always has two columns.
        scatter = np.hstack([scatter, np.zeros((n, 1))])

    labels = _cluster(embeddings, n_clusters, random_state=random_state)
    categories = [str(case.get("category", "")) for case in cases]
    severities = [str(case.get("severity", "medium")) for case in cases]
    outcomes = ["pass" if case.get("passed") else "fail" for case in cases]

    # MI between cluster assignment and each label family; rounded for JSON.
    mutual_information = {
        "MI(cluster, category)": round(float(mutual_info_score(labels, categories)), 6),
        "MI(cluster, severity)": round(float(mutual_info_score(labels, severities)), 6),
        "MI(cluster, pass_fail)": round(float(mutual_info_score(labels, outcomes)), 6),
    }

    case_clusters = []
    for i, (case, label) in enumerate(zip(cases, labels, strict=True)):
        case_clusters.append(
            {
                "case_id": case.get("case_id", ""),
                "cluster_id": int(label),
                "category": categories[i],
                "severity": severities[i],
                "passed": bool(case.get("passed")),
                "scatter_x": float(scatter[i, 0]) if scatter.shape[1] > 0 else 0.0,
                "scatter_y": float(scatter[i, 1]) if scatter.shape[1] > 1 else 0.0,
            }
        )

    return {
        "eligible": True,
        "message": "Embedding + clustering complete.",
        "n_cases": n,
        "n_clusters_used": len(set(labels)),
        "mutual_information": mutual_information,
        "interpretation": (
            "Higher MI(cluster, category) suggests clusters align with threat family; "
            "higher MI(cluster, pass_fail) suggests clusters separate mostly by outcome."
        ),
        "case_clusters": case_clusters,
    }