| """ |
| Threat-map observability: TF-IDF + SVD embeddings, KMeans clusters, mutual information. |
| |
| Mirrors the failure-geometry / CARB pipeline shape (embed → cluster → MI vs labels) |
| for **scored threat probes**, so structural patterns in risky evaluations are visible. |
| |
| No network downloads; scikit-learn only. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import numpy as np |
| from sklearn.cluster import KMeans |
| from sklearn.decomposition import TruncatedSVD |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.metrics import mutual_info_score |
| from sklearn.preprocessing import normalize |
|
|
|
|
def observation_text(case: dict) -> str:
    """Render one scored case (CaseScore dict + optional probe context) as a dense string.

    List-valued fields are space-joined, ``probe_input`` is truncated to 800
    characters, and the pass flag is encoded as the literal word "pass"/"fail".
    The resulting text is what gets TF-IDF embedded downstream.
    """
    failure_modes = " ".join(case.get("detected_failure_modes") or [])
    unsafe = " ".join(case.get("matched_unsafe_patterns") or [])
    safe = " ".join(case.get("matched_safe_patterns") or [])
    outcome = "pass" if case.get("passed") else "fail"
    segments = [
        f"category: {case.get('category', '')}",
        f"severity: {case.get('severity', '')}",
        f"pass_fail: {outcome}",
        f"risk: {case.get('risk_score', '')} weighted: {case.get('weighted_risk', '')}",
        f"task: {case.get('task') or ''}",
        f"probe_input: {(case.get('probe_input') or '')[:800]}",
        f"explanation: {case.get('explanation', '')}",
        f"failure_modes: {failure_modes}",
        f"unsafe_patterns: {unsafe}",
        f"safe_patterns: {safe}",
    ]
    return " ".join(segments)
|
|
|
|
| def _embed_texts(texts: list[str], n_components: int) -> np.ndarray: |
| if not texts: |
| return np.empty((0, max(n_components, 1))) |
| n = len(texts) |
| vectorizer = TfidfVectorizer( |
| max_features=800, |
| ngram_range=(1, 2), |
| sublinear_tf=True, |
| ) |
| tfidf = vectorizer.fit_transform(texts) |
| effective_dims = min(n_components, tfidf.shape[1] - 1, max(n - 1, 1)) |
| if effective_dims < 2: |
| arr = tfidf.toarray() |
| return normalize(arr[:, : max(effective_dims, 1)]) |
| svd = TruncatedSVD(n_components=effective_dims, random_state=42) |
| dense = svd.fit_transform(tfidf) |
| return normalize(dense) |
|
|
|
|
| def _cluster(embeddings: np.ndarray, n_clusters: int, random_state: int = 42) -> list[int]: |
| if len(embeddings) == 0: |
| return [] |
| effective_k = max(2, min(n_clusters, len(embeddings))) |
| if effective_k == 1 or len(embeddings) < 2: |
| return [0] * len(embeddings) |
| km = KMeans(n_clusters=effective_k, random_state=random_state, n_init=10) |
| return km.fit_predict(embeddings).tolist() |
|
|
|
|
def analyze_case_records(
    cases: list[dict],
    *,
    n_clusters: int = 4,
    min_cases: int = 5,
    random_state: int = 42,
) -> dict:
    """
    Embed scored cases, cluster in SVD space, compare clusters to category / severity / pass-fail.

    Returns a dict suitable for JSON reports and Gradio; ``eligible`` False when too few rows.

    Parameters:
        cases: CaseScore-shaped dicts; see ``observation_text`` for the keys read.
        n_clusters: requested KMeans k (clamped to the row count in ``_cluster``).
        min_cases: minimum number of cases before analysis is attempted.
        random_state: seed forwarded to KMeans for reproducible cluster ids.
    """
    n = len(cases)
    # First eligibility gate: caller-configurable minimum sample size.
    if n < min_cases:
        return {
            "eligible": False,
            "message": f"Need at least {min_cases} scored cases (have {n}).",
            "n_cases": n,
            "mutual_information": {},
            "case_clusters": [],
        }
    # Second gate keeps SVD dimensions stable. NOTE(review): only reachable
    # when a caller passes min_cases < 3; with the default min_cases=5 this
    # branch is dead code.
    if n < 3:
        return {
            "eligible": False,
            "message": "Need at least 3 cases for stable embedding dimensions.",
            "n_cases": n,
            "mutual_information": {},
            "case_clusters": [],
        }


    texts = [observation_text(c) for c in cases]
    # 32-dim embedding drives clustering; a separate 2-dim embedding provides
    # scatter-plot coordinates for the UI.
    emb = _embed_texts(texts, n_components=32)
    coords_2d = _embed_texts(texts, n_components=2)
    # A tiny vocabulary can collapse the 2-D projection to one column; pad
    # with zeros so scatter_y below is always defined.
    if coords_2d.shape[1] == 1 and n >= 3:
        coords_2d = np.hstack([coords_2d, np.zeros((n, 1))])


    cluster_ids = _cluster(emb, n_clusters, random_state=random_state)
    # Label views compared against cluster assignments via mutual information.
    categories = [str(c.get("category", "")) for c in cases]
    severities = [str(c.get("severity", "medium")) for c in cases]
    pass_labels = ["pass" if c.get("passed") else "fail" for c in cases]


    # mutual_info_score is symmetric in its two label arguments, so the
    # cluster-first ordering here is purely cosmetic.
    mi_cat = float(mutual_info_score(cluster_ids, categories))
    mi_sev = float(mutual_info_score(cluster_ids, severities))
    mi_pf = float(mutual_info_score(cluster_ids, pass_labels))


    # Number of distinct clusters actually produced (may be < n_clusters).
    effective_k = len(set(cluster_ids))
    case_clusters = [
        {
            "case_id": c.get("case_id", ""),
            "cluster_id": int(cid),
            "category": categories[i],
            "severity": severities[i],
            "passed": bool(c.get("passed")),
            # Guards cover degenerate projections with fewer than 2 columns.
            "scatter_x": float(coords_2d[i, 0]) if coords_2d.shape[1] > 0 else 0.0,
            "scatter_y": float(coords_2d[i, 1]) if coords_2d.shape[1] > 1 else 0.0,
        }
        for i, (c, cid) in enumerate(zip(cases, cluster_ids, strict=True))
    ]


    interpretation = (
        "Higher MI(cluster, category) suggests clusters align with threat family; "
        "higher MI(cluster, pass_fail) suggests clusters separate mostly by outcome."
    )


    return {
        "eligible": True,
        "message": "Embedding + clustering complete.",
        "n_cases": n,
        "n_clusters_used": effective_k,
        "mutual_information": {
            # Rounded to 6 decimals to keep JSON reports tidy and diffable.
            "MI(cluster, category)": round(mi_cat, 6),
            "MI(cluster, severity)": round(mi_sev, 6),
            "MI(cluster, pass_fail)": round(mi_pf, 6),
        },
        "interpretation": interpretation,
        "case_clusters": case_clusters,
    }
|
|