File size: 5,324 Bytes
6c3043e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""
Threat-map observability: TF-IDF + SVD embeddings, KMeans clusters, mutual information.

Mirrors the failure-geometry / CARB pipeline shape (embed → cluster → MI vs labels)
for **scored threat probes**, so structural patterns in risky evaluations are visible.

No network downloads; scikit-learn only.
"""

from __future__ import annotations

import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import normalize


def observation_text(case: dict) -> str:
    """Dense text view of one CaseScore (+ optional probe context) for embedding."""
    fm = " ".join(case.get("detected_failure_modes") or [])
    u = " ".join(case.get("matched_unsafe_patterns") or [])
    s = " ".join(case.get("matched_safe_patterns") or [])
    task = case.get("task") or ""
    pin = (case.get("probe_input") or "")[:800]
    pf = "pass" if case.get("passed") else "fail"
    return (
        f"category: {case.get('category', '')} "
        f"severity: {case.get('severity', '')} "
        f"pass_fail: {pf} "
        f"risk: {case.get('risk_score', '')} weighted: {case.get('weighted_risk', '')} "
        f"task: {task} "
        f"probe_input: {pin} "
        f"explanation: {case.get('explanation', '')} "
        f"failure_modes: {fm} "
        f"unsafe_patterns: {u} "
        f"safe_patterns: {s}"
    )


def _embed_texts(texts: list[str], n_components: int) -> np.ndarray:
    if not texts:
        return np.empty((0, max(n_components, 1)))
    n = len(texts)
    vectorizer = TfidfVectorizer(
        max_features=800,
        ngram_range=(1, 2),
        sublinear_tf=True,
    )
    tfidf = vectorizer.fit_transform(texts)
    effective_dims = min(n_components, tfidf.shape[1] - 1, max(n - 1, 1))
    if effective_dims < 2:
        arr = tfidf.toarray()
        return normalize(arr[:, : max(effective_dims, 1)])
    svd = TruncatedSVD(n_components=effective_dims, random_state=42)
    dense = svd.fit_transform(tfidf)
    return normalize(dense)


def _cluster(embeddings: np.ndarray, n_clusters: int, random_state: int = 42) -> list[int]:
    if len(embeddings) == 0:
        return []
    effective_k = max(2, min(n_clusters, len(embeddings)))
    if effective_k == 1 or len(embeddings) < 2:
        return [0] * len(embeddings)
    km = KMeans(n_clusters=effective_k, random_state=random_state, n_init=10)
    return km.fit_predict(embeddings).tolist()


def analyze_case_records(
    cases: list[dict],
    *,
    n_clusters: int = 4,
    min_cases: int = 5,
    random_state: int = 42,
) -> dict:
    """
    Embed scored cases, cluster in SVD space, compare clusters to category / severity / pass-fail.

    Returns a dict suitable for JSON reports and Gradio; ``eligible`` False when too few rows.
    """
    n = len(cases)

    def _too_few(message: str) -> dict:
        # Shared payload shape for the early "not enough data" returns.
        return {
            "eligible": False,
            "message": message,
            "n_cases": n,
            "mutual_information": {},
            "case_clusters": [],
        }

    if n < min_cases:
        return _too_few(f"Need at least {min_cases} scored cases (have {n}).")
    if n < 3:
        # Reachable only when callers lower min_cases below 3.
        return _too_few("Need at least 3 cases for stable embedding dimensions.")

    texts = [observation_text(case) for case in cases]
    embeddings = _embed_texts(texts, n_components=32)
    scatter = _embed_texts(texts, n_components=2)
    if scatter.shape[1] == 1 and n >= 3:
        # Pad a degenerate 1-D projection with a zero column for 2-D plots.
        scatter = np.hstack([scatter, np.zeros((n, 1))])

    labels = _cluster(embeddings, n_clusters, random_state=random_state)
    categories = [str(case.get("category", "")) for case in cases]
    severities = [str(case.get("severity", "medium")) for case in cases]
    outcomes = ["pass" if case.get("passed") else "fail" for case in cases]

    # MI between cluster assignment and each label family (symmetric measure).
    mi_category = float(mutual_info_score(labels, categories))
    mi_severity = float(mutual_info_score(labels, severities))
    mi_outcome = float(mutual_info_score(labels, outcomes))

    has_x = scatter.shape[1] > 0
    has_y = scatter.shape[1] > 1
    case_clusters = []
    for idx, (case, label) in enumerate(zip(cases, labels, strict=True)):
        case_clusters.append(
            {
                "case_id": case.get("case_id", ""),
                "cluster_id": int(label),
                "category": categories[idx],
                "severity": severities[idx],
                "passed": bool(case.get("passed")),
                "scatter_x": float(scatter[idx, 0]) if has_x else 0.0,
                "scatter_y": float(scatter[idx, 1]) if has_y else 0.0,
            }
        )

    return {
        "eligible": True,
        "message": "Embedding + clustering complete.",
        "n_cases": n,
        "n_clusters_used": len(set(labels)),
        "mutual_information": {
            "MI(cluster, category)": round(mi_category, 6),
            "MI(cluster, severity)": round(mi_severity, 6),
            "MI(cluster, pass_fail)": round(mi_outcome, 6),
        },
        "interpretation": (
            "Higher MI(cluster, category) suggests clusters align with threat family; "
            "higher MI(cluster, pass_fail) suggests clusters separate mostly by outcome."
        ),
        "case_clusters": case_clusters,
    }