# agent_threat_map/observability.py
# Threat-map metrics + observable geometry (embed/cluster/MI)
"""
Threat-map observability: TF-IDF + SVD embeddings, KMeans clusters, mutual information.
Mirrors the failure-geometry / CARB pipeline shape (embed → cluster → MI vs labels)
for **scored threat probes**, so structural patterns in risky evaluations are visible.
No network downloads; scikit-learn only.
"""
from __future__ import annotations
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import normalize
def observation_text(case: dict) -> str:
    """Render one CaseScore (plus optional probe context) as a single dense string.

    Concatenates labelled fields — category, severity, outcome, risk scores,
    task, probe input, explanation, and matched pattern lists — so a
    bag-of-words embedder sees the whole scored case in one view.
    """
    outcome = "pass" if case.get("passed") else "fail"
    segments = [
        f"category: {case.get('category', '')}",
        f"severity: {case.get('severity', '')}",
        f"pass_fail: {outcome}",
        f"risk: {case.get('risk_score', '')} weighted: {case.get('weighted_risk', '')}",
        f"task: {case.get('task') or ''}",
        # Probe input is truncated so one huge prompt cannot dominate TF-IDF.
        f"probe_input: {(case.get('probe_input') or '')[:800]}",
        f"explanation: {case.get('explanation', '')}",
        f"failure_modes: {' '.join(case.get('detected_failure_modes') or [])}",
        f"unsafe_patterns: {' '.join(case.get('matched_unsafe_patterns') or [])}",
        f"safe_patterns: {' '.join(case.get('matched_safe_patterns') or [])}",
    ]
    return " ".join(segments)
def _embed_texts(texts: list[str], n_components: int) -> np.ndarray:
if not texts:
return np.empty((0, max(n_components, 1)))
n = len(texts)
vectorizer = TfidfVectorizer(
max_features=800,
ngram_range=(1, 2),
sublinear_tf=True,
)
tfidf = vectorizer.fit_transform(texts)
effective_dims = min(n_components, tfidf.shape[1] - 1, max(n - 1, 1))
if effective_dims < 2:
arr = tfidf.toarray()
return normalize(arr[:, : max(effective_dims, 1)])
svd = TruncatedSVD(n_components=effective_dims, random_state=42)
dense = svd.fit_transform(tfidf)
return normalize(dense)
def _cluster(embeddings: np.ndarray, n_clusters: int, random_state: int = 42) -> list[int]:
if len(embeddings) == 0:
return []
effective_k = max(2, min(n_clusters, len(embeddings)))
if effective_k == 1 or len(embeddings) < 2:
return [0] * len(embeddings)
km = KMeans(n_clusters=effective_k, random_state=random_state, n_init=10)
return km.fit_predict(embeddings).tolist()
def analyze_case_records(
    cases: list[dict],
    *,
    n_clusters: int = 4,
    min_cases: int = 5,
    random_state: int = 42,
) -> dict:
    """
    Embed scored cases, cluster in SVD space, compare clusters to category / severity / pass-fail.
    Returns a dict suitable for JSON reports and Gradio; ``eligible`` False when too few rows.
    """
    n = len(cases)

    def _ineligible(message: str) -> dict:
        # Shared shape for the "not enough data" early returns.
        return {
            "eligible": False,
            "message": message,
            "n_cases": n,
            "mutual_information": {},
            "case_clusters": [],
        }

    if n < min_cases:
        return _ineligible(f"Need at least {min_cases} scored cases (have {n}).")
    if n < 3:
        # Only reachable when the caller passes min_cases < 3.
        return _ineligible("Need at least 3 cases for stable embedding dimensions.")

    texts = [observation_text(c) for c in cases]
    emb = _embed_texts(texts, n_components=32)
    coords_2d = _embed_texts(texts, n_components=2)
    if coords_2d.shape[1] == 1 and n >= 3:
        # Pad a degenerate 1-D projection with a zero y-column for the scatter plot.
        coords_2d = np.hstack([coords_2d, np.zeros((n, 1))])

    cluster_ids = _cluster(emb, n_clusters, random_state=random_state)
    categories = [str(c.get("category", "")) for c in cases]
    severities = [str(c.get("severity", "medium")) for c in cases]
    pass_labels = ["pass" if c.get("passed") else "fail" for c in cases]

    case_clusters = []
    for i, (c, cid) in enumerate(zip(cases, cluster_ids, strict=True)):
        case_clusters.append(
            {
                "case_id": c.get("case_id", ""),
                "cluster_id": int(cid),
                "category": categories[i],
                "severity": severities[i],
                "passed": bool(c.get("passed")),
                "scatter_x": float(coords_2d[i, 0]) if coords_2d.shape[1] > 0 else 0.0,
                "scatter_y": float(coords_2d[i, 1]) if coords_2d.shape[1] > 1 else 0.0,
            }
        )

    return {
        "eligible": True,
        "message": "Embedding + clustering complete.",
        "n_cases": n,
        "n_clusters_used": len(set(cluster_ids)),
        "mutual_information": {
            "MI(cluster, category)": round(float(mutual_info_score(cluster_ids, categories)), 6),
            "MI(cluster, severity)": round(float(mutual_info_score(cluster_ids, severities)), 6),
            "MI(cluster, pass_fail)": round(float(mutual_info_score(cluster_ids, pass_labels)), 6),
        },
        "interpretation": (
            "Higher MI(cluster, category) suggests clusters align with threat family; "
            "higher MI(cluster, pass_fail) suggests clusters separate mostly by outcome."
        ),
        "case_clusters": case_clusters,
    }