"""Gradio UI for the Agent Threat Map research observatory."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import gradio as gr
import pandas as pd

from agent_threat_map.evaluator import evaluate_response
from agent_threat_map.loader import load_probes
from agent_threat_map.observability import analyze_case_records
from agent_threat_map.report import build_report, report_to_json
from agent_threat_map.scoring import aggregate_metrics
from ui.visualizations import (
    metrics_summary_markdown,
    observability_markdown,
    plot_category_risk_bars,
    plot_composite_radar,
    plot_mi_threat_bars,
    plot_threat_cluster_scatter,
    severity_table_markdown,
)

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DATA = ROOT / "data" / "threat_probe_seed.jsonl"

# Load the seed probes once at import time and index them by id for the dropdowns.
_probes = load_probes(DEFAULT_DATA)
_probe_map = {p.id: p for p in _probes}
_probe_choices = [p.id for p in _probes]


def score_one(probe_id: str, response: str) -> tuple[str, str, str]:
    """Score one response against one probe; return (summary markdown, case JSON, "")."""
    probe = _probe_map.get(probe_id)
    if probe is None:
        return "Unknown probe", "", ""
    result = evaluate_response(probe, response)
    summary = (
        f"**Passed:** {result.passed}\n\n"
        f"**Risk score:** {result.risk_score} (weighted: {result.weighted_risk})\n\n"
        f"**Severity:** {result.severity} (weight {result.severity_weight})\n\n"
        f"**Safe / unsafe signal hits:** {result.safe_signal_hits} / {result.unsafe_signal_hits}\n\n"
        f"**Boundary/refusal signal:** {result.boundary_or_refusal_signal}\n\n"
        f"**Detected failure modes:** {', '.join(result.detected_failure_modes) or '—'}\n\n"
        f"**Explanation:** {result.explanation}\n\n"
        "**Matched patterns (unsafe):** "
        f"{', '.join(result.matched_unsafe_patterns) or '—'}\n\n"
        "**Matched patterns (safe):** "
        f"{', '.join(result.matched_safe_patterns) or '—'}"
    )
    detail_json = json.dumps(result.to_dict(), indent=2, ensure_ascii=False)
    return summary, detail_json, ""


def add_to_session(probe_id: str, response: str, session_json: str) -> tuple[str, str]:
    """Evaluate a response and append the resulting case record to the session JSON list."""
    probe = _probe_map.get(probe_id)
    if probe is None:
        return session_json or "[]", "Unknown probe"
    result = evaluate_response(probe, response)
    try:
        cur = json.loads(session_json or "[]")
    except json.JSONDecodeError:
        cur = []
    cur.append(result.to_dict())
    return json.dumps(cur), f"Stored {len(cur)} case(s)."

def clear_session() -> tuple[str, str, pd.DataFrame, str, object, object, str]:
    """Reset the session state and return empty aggregate views."""
    empty = "[]"
    metrics = aggregate_metrics([], model_name="session")
    md = metrics_summary_markdown(metrics)
    return (
        empty,
        "Session cleared.",
        pd.DataFrame(),
        md,
        plot_category_risk_bars(metrics.get("by_category", {})),
        plot_composite_radar(metrics),
        severity_table_markdown(metrics.get("by_severity_tier", {})),
    )


def aggregate_session(
    session_json: str,
    model_name: str,
    geometry_clusters: float,
) -> tuple[pd.DataFrame, str, str, object, object, str]:
    """Rebuild CaseScore objects from the session JSON, then compute metrics, plots, and the report."""
    from agent_threat_map.schema import CaseScore

    try:
        raw = json.loads(session_json or "[]")
    except json.JSONDecodeError:
        raw = []
    # Rehydrate CaseScore objects from the dicts stored by add_to_session.
    scores: list[CaseScore] = []
    for row in raw:
        scores.append(
            CaseScore(
                case_id=row["case_id"],
                category=row["category"],
                severity=row["severity"],
                passed=row["passed"],
                risk_score=row["risk_score"],
                severity_weight=row["severity_weight"],
                weighted_risk=row["weighted_risk"],
                detected_failure_modes=list(row.get("detected_failure_modes", [])),
                explanation=row["explanation"],
                safe_signal_hits=row["safe_signal_hits"],
                unsafe_signal_hits=row["unsafe_signal_hits"],
                boundary_or_refusal_signal=row["boundary_or_refusal_signal"],
                matched_safe_patterns=list(row.get("matched_safe_patterns", [])),
                matched_unsafe_patterns=list(row.get("matched_unsafe_patterns", [])),
                task=str(row.get("task", "")),
                probe_input=str(row.get("probe_input", "")),
            )
        )
    metrics = aggregate_metrics(scores, model_name=model_name or "session-model")
    df = pd.DataFrame(
        [
            {
                "category": c,
                **{k: v for k, v in block.items() if k != "note"},
            }
            for c, block in sorted(metrics["by_category"].items())
            if isinstance(block, dict) and block.get("n", 0) > 0
        ]
    )
    # Clamp the requested cluster count to the slider range before building the report.
    kgeom = max(2, min(12, int(geometry_clusters)))
    report = build_report(
        scores,
        model_name=model_name or "session-model",
        geometry_clusters=kgeom,
    )
    report_str = report_to_json(report)
    md = metrics_summary_markdown(metrics)
    img_bar = plot_category_risk_bars(metrics.get("by_category", {}))
    img_radar = plot_composite_radar(metrics)
    sev_md = severity_table_markdown(metrics.get("by_severity_tier", {}))
    return df, md, report_str, img_bar, img_radar, sev_md


def run_geometry_analysis(session_json: str, k_clusters: float) -> tuple[str, Any, Any]:
    """Run the embedding/clustering/MI pipeline over the session's case records."""
    try:
        cases = json.loads(session_json or "[]")
    except json.JSONDecodeError:
        cases = []
    k = max(2, min(12, int(k_clusters)))
    obs = analyze_case_records(cases, n_clusters=k)
    md = observability_markdown(obs)
    if not obs.get("eligible"):
        return md, None, None
    mi_img = plot_mi_threat_bars(obs["mutual_information"])
    sc_img = plot_threat_cluster_scatter(obs["case_clusters"])
    return md, mi_img, sc_img


with gr.Blocks(title="Agent Threat Map (research)") as demo:
    gr.Markdown(
        "# Agent Threat Map — observatory (research)\n"
        "Map fragile behavior with **expanded metrics** plus **observable geometry**: TF-IDF/SVD embeddings, "
        "KMeans clusters, and mutual information vs category / severity / pass-fail (same observability shape as "
        "the CARB failure demos). **Not** a certified security scanner."
    )

    session_state = gr.State("[]")

    with gr.Tab("Score one probe"):
        probe_dd = gr.Dropdown(choices=_probe_choices, label="Probe", value=_probe_choices[0])
        response_tb = gr.Textbox(label="Model / agent response", lines=10)
        score_btn = gr.Button("Score response")
        out_md = gr.Markdown()
        out_json = gr.Code(label="Case JSON", language="json")

        def _score_wrap(pid: str, text: str):
            # Drop the unused third element of score_one's return tuple.
            a, b, _ = score_one(pid, text)
            return a, b

        score_btn.click(_score_wrap, [probe_dd, response_tb], [out_md, out_json])

    with gr.Tab("Session & aggregates"):
        gr.Markdown(
            "Add multiple scored cases, then aggregate to view **full metrics** and export a JSON report."
        )
        probe_dd2 = gr.Dropdown(choices=_probe_choices, label="Probe", value=_probe_choices[0])
        response_tb2 = gr.Textbox(label="Model / agent response", lines=8)
        model_name = gr.Textbox(label="Model label (for report)", value="manual-eval")
        geom_k = gr.Slider(2, 12, value=4, step=1, label="Clusters for geometry (report + MI)")
        add_btn = gr.Button("Append to session")
        agg_btn = gr.Button("Compute aggregates & report")
        clr_btn = gr.Button("Clear session")
        sess_msg = gr.Markdown()
        cat_table = gr.Dataframe(label="Category metrics", interactive=False)
        metrics_md = gr.Markdown()
        sev_md = gr.Markdown()
        plot_bar = gr.Image(label="Category risk vs pass rate", type="numpy")
        plot_rad = gr.Image(label="Category mean risk (radar)", type="numpy")
        report_out = gr.Code(label="Full JSON report", language="json")

        add_btn.click(add_to_session, [probe_dd2, response_tb2, session_state], [session_state, sess_msg])
        agg_btn.click(
            aggregate_session,
            [session_state, model_name, geom_k],
            [cat_table, metrics_md, report_out, plot_bar, plot_rad, sev_md],
        )
        clr_btn.click(
            clear_session,
            None,
            [session_state, sess_msg, cat_table, metrics_md, plot_bar, plot_rad, sev_md],
        )

    with gr.Tab("Observable geometry"):
        gr.Markdown(
            "Runs **embedding → clustering → MI** on all cases in the session (same pipeline family as "
            "`failure-geometry-demo`). Needs **≥5** scored rows with the default settings; reports also include an "
            "`observability` block when you export JSON from *Session & aggregates*."
        )
        geom_k2 = gr.Slider(2, 12, value=4, step=1, label="Number of clusters")
        geom_btn = gr.Button("Run geometry analysis on session")
        geom_md = gr.Markdown()
        geom_mi = gr.Image(label="Mutual information", type="numpy")
        geom_sc = gr.Image(label="2-D embedding scatter", type="numpy")

        geom_btn.click(
            run_geometry_analysis,
            [session_state, geom_k2],
            [geom_md, geom_mi, geom_sc],
        )
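
# A minimal entrypoint sketch, assuming this module is meant to be run directly;
# the original ends after wiring the "Observable geometry" tab, so the real
# launcher may live elsewhere (e.g. a separate script or the `gradio` CLI).
if __name__ == "__main__":
    demo.launch()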