"""
Threat-map observability: TF-IDF + SVD embeddings, KMeans clusters, mutual information.

Mirrors the failure-geometry / CARB pipeline shape (embed → cluster → MI vs labels) for
**scored threat probes**, so structural patterns in risky evaluations are visible.
No network downloads; scikit-learn only.
"""

from __future__ import annotations

import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import normalize


def observation_text(case: dict) -> str:
    """Dense text view of one CaseScore (+ optional probe context) for embedding.

    Args:
        case: Mapping describing one scored case. Every key is optional; a
            missing or ``None`` value is rendered as an empty string (or as
            ``fail`` for ``passed``).

    Returns:
        A single space-separated ``"field: value"`` string. ``probe_input``
        is truncated to its first 800 characters.
    """
    # ``str()`` coercion keeps the join/slice from raising on records whose
    # list entries or probe input are not plain strings (e.g. JSON numbers).
    fm = " ".join(str(m) for m in case.get("detected_failure_modes") or [])
    u = " ".join(str(p) for p in case.get("matched_unsafe_patterns") or [])
    s = " ".join(str(p) for p in case.get("matched_safe_patterns") or [])
    task = case.get("task") or ""
    pin = str(case.get("probe_input") or "")[:800]
    pf = "pass" if case.get("passed") else "fail"
    return (
        f"category: {case.get('category', '')} "
        f"severity: {case.get('severity', '')} "
        f"pass_fail: {pf} "
        f"risk: {case.get('risk_score', '')} weighted: {case.get('weighted_risk', '')} "
        f"task: {task} "
        f"probe_input: {pin} "
        f"explanation: {case.get('explanation', '')} "
        f"failure_modes: {fm} "
        f"unsafe_patterns: {u} "
        f"safe_patterns: {s}"
    )


def _tfidf_matrix(texts: list[str]):
    """Fit a TF-IDF vectorizer on *texts* and return the sparse document-term matrix.

    Uses unigrams and bigrams, at most 800 features, and sublinear term
    frequency scaling. *texts* must be non-empty.
    """
    vectorizer = TfidfVectorizer(
        max_features=800,
        ngram_range=(1, 2),
        sublinear_tf=True,
    )
    return vectorizer.fit_transform(texts)


def _reduce_tfidf(tfidf, n_components: int) -> np.ndarray:
    """Project a TF-IDF matrix to at most *n_components* dims and normalise rows.

    Args:
        tfidf: Document-term matrix as returned by ``_tfidf_matrix``.
        n_components: Requested number of output dimensions.

    Returns:
        A dense array with one row per document, passed through
        ``sklearn.preprocessing.normalize``.
    """
    n_rows, n_features = tfidf.shape
    # The SVD rank cannot usefully exceed (features - 1) or (rows - 1).
    effective_dims = min(n_components, n_features - 1, max(n_rows - 1, 1))
    if effective_dims < 2:
        # Too little data for an SVD: fall back to the leading raw TF-IDF column.
        dense_tfidf = tfidf.toarray()
        return normalize(dense_tfidf[:, : max(effective_dims, 1)])

    # The SVD seed is fixed so embeddings are reproducible between runs.
    svd = TruncatedSVD(n_components=effective_dims, random_state=42)
    return normalize(svd.fit_transform(tfidf))


def _embed_texts(texts: list[str], n_components: int) -> np.ndarray:
    """Embed *texts* with TF-IDF followed by truncated SVD.

    Args:
        texts: Documents to embed.
        n_components: Requested embedding width; the effective width may be
            smaller when there are few documents or few distinct terms.

    Returns:
        A row-normalised dense array with one row per text. For empty input
        the result has shape ``(0, max(n_components, 1))``.
    """
    if not texts:
        return np.empty((0, max(n_components, 1)))
    return _reduce_tfidf(_tfidf_matrix(texts), n_components)


def _cluster(embeddings: np.ndarray, n_clusters: int, random_state: int = 42) -> list[int]:
    """Assign each embedding row to a KMeans cluster.

    Args:
        embeddings: Array with one row per item.
        n_clusters: Requested number of clusters. The value actually used is
            clamped to at most the number of rows and at least 2.
        random_state: Seed forwarded to ``KMeans``.

    Returns:
        One integer cluster id per row. With fewer than two rows every row
        (if any) is assigned to cluster ``0`` without running KMeans.
    """
    n_points = len(embeddings)
    if n_points < 2:
        return [0] * n_points

    effective_k = max(2, min(n_clusters, n_points))
    km = KMeans(n_clusters=effective_k, random_state=random_state, n_init=10)
    return km.fit_predict(embeddings).tolist()


def _ineligible_result(message: str, n_cases: int) -> dict:
    """Build the report returned when there are too few cases to analyse."""
    return {
        "eligible": False,
        "message": message,
        "n_cases": n_cases,
        "mutual_information": {},
        "case_clusters": [],
    }


def analyze_case_records(
    cases: list[dict],
    *,
    n_clusters: int = 4,
    min_cases: int = 5,
    random_state: int = 42,
) -> dict:
    """
    Embed scored cases, cluster in SVD space, compare clusters to category / severity / pass-fail.

    Args:
        cases: Scored case records (see ``observation_text`` for the fields read).
        n_clusters: Requested number of KMeans clusters.
        min_cases: Minimum number of cases required before any analysis runs.
        random_state: Seed for KMeans. The SVD embedding uses its own fixed seed.

    Returns:
        A dict suitable for JSON reports and Gradio. ``eligible`` is False
        (with empty ``mutual_information`` and ``case_clusters``) when there
        are fewer than ``min_cases`` cases or fewer than 3 cases. Otherwise
        the dict holds the number of distinct clusters found, the mutual
        information between cluster ids and each label set (rounded to six
        decimals), and one entry per case with its cluster id and 2-D
        scatter coordinates.
    """
    n = len(cases)
    if n < min_cases:
        return _ineligible_result(
            f"Need at least {min_cases} scored cases (have {n}).", n
        )
    if n < 3:
        return _ineligible_result(
            "Need at least 3 cases for stable embedding dimensions.", n
        )

    texts = [observation_text(c) for c in cases]

    # Fit TF-IDF once and reuse it for both the clustering embedding and the
    # 2-D scatter projection.
    tfidf = _tfidf_matrix(texts)
    emb = _reduce_tfidf(tfidf, n_components=32)
    coords_2d = _reduce_tfidf(tfidf, n_components=2)
    if coords_2d.shape[1] == 1:
        # Degenerate vocabulary: pad a zero y-axis so every case has (x, y).
        coords_2d = np.hstack([coords_2d, np.zeros((n, 1))])

    cluster_ids = _cluster(emb, n_clusters, random_state=random_state)

    categories = [str(c.get("category", "")) for c in cases]
    severities = [str(c.get("severity", "medium")) for c in cases]
    pass_labels = ["pass" if c.get("passed") else "fail" for c in cases]

    mi_cat = float(mutual_info_score(cluster_ids, categories))
    mi_sev = float(mutual_info_score(cluster_ids, severities))
    mi_pf = float(mutual_info_score(cluster_ids, pass_labels))

    # KMeans may populate fewer clusters than requested; report what was found.
    effective_k = len(set(cluster_ids))

    case_clusters = [
        {
            "case_id": c.get("case_id", ""),
            "cluster_id": int(cid),
            "category": categories[i],
            "severity": severities[i],
            "passed": bool(c.get("passed")),
            "scatter_x": float(coords_2d[i, 0]),
            "scatter_y": float(coords_2d[i, 1]),
        }
        for i, (c, cid) in enumerate(zip(cases, cluster_ids, strict=True))
    ]

    interpretation = (
        "Higher MI(cluster, category) suggests clusters align with threat family; "
        "higher MI(cluster, pass_fail) suggests clusters separate mostly by outcome."
    )

    return {
        "eligible": True,
        "message": "Embedding + clustering complete.",
        "n_cases": n,
        "n_clusters_used": effective_k,
        "mutual_information": {
            "MI(cluster, category)": round(mi_cat, 6),
            "MI(cluster, severity)": round(mi_sev, 6),
            "MI(cluster, pass_fail)": round(mi_pf, 6),
        },
        "interpretation": interpretation,
        "case_clusters": case_clusters,
    }