| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import gradio as gr |
| import pandas as pd |
|
|
| from agent_threat_map.evaluator import evaluate_response |
| from agent_threat_map.loader import load_probes |
| from agent_threat_map.observability import analyze_case_records |
| from agent_threat_map.report import build_report, report_to_json |
| from agent_threat_map.scoring import aggregate_metrics |
|
|
| from ui.visualizations import ( |
| metrics_summary_markdown, |
| observability_markdown, |
| plot_category_risk_bars, |
| plot_composite_radar, |
| plot_mi_threat_bars, |
| plot_threat_cluster_scatter, |
| severity_table_markdown, |
| ) |
|
|
# Directory one level above the directory that contains this module.
ROOT = Path(__file__).resolve().parents[1]
# Seed probe file read when this module is imported.
DEFAULT_DATA = ROOT.joinpath("data", "threat_probe_seed.jsonl")

# Probes are loaded once, at import time.
_probes = load_probes(DEFAULT_DATA)
# Probe id -> probe object, used to resolve a selected probe id.
_probe_map = {probe.id: probe for probe in _probes}
# Probe ids in iteration order of the loaded probes.
_probe_choices = [probe.id for probe in _probes]
|
|
|
|
def score_one(probe_id: str, response: str) -> tuple[str, str, str]:
    """Evaluate one response against a probe and render the outcome.

    Args:
        probe_id: Key looked up in the module-level ``_probe_map``.
        response: Response text handed to ``evaluate_response``.

    Returns:
        A ``(summary_markdown, case_json, extra)`` triple. ``extra`` is always
        the empty string. When ``probe_id`` is not in the probe map the triple
        is ``("Unknown probe", "", "")``.
    """

    def _listed(items) -> str:
        # Comma-separated rendering; an em dash stands in for an empty list.
        return ", ".join(items) or "—"

    probe = _probe_map.get(probe_id)
    if probe is None:
        return "Unknown probe", "", ""

    result = evaluate_response(probe, response)

    # One entry per Markdown paragraph; a blank line separates paragraphs.
    paragraphs = [
        f"**Passed:** {result.passed}",
        f"**Risk score:** {result.risk_score} (weighted: {result.weighted_risk})",
        f"**Severity:** {result.severity} (weight {result.severity_weight})",
        f"**Safe / unsafe signal hits:** {result.safe_signal_hits} / {result.unsafe_signal_hits}",
        f"**Boundary/refusal signal:** {result.boundary_or_refusal_signal}",
        f"**Detected failure modes:** {_listed(result.detected_failure_modes)}",
        f"**Explanation:** {result.explanation}",
        f"**Matched patterns (unsafe):** {_listed(result.matched_unsafe_patterns)}",
        f"**Matched patterns (safe):** {_listed(result.matched_safe_patterns)}",
    ]
    summary = "\n\n".join(paragraphs)

    detail_json = json.dumps(result.to_dict(), indent=2, ensure_ascii=False)
    return summary, detail_json, ""
|
|
|
|
def add_to_session(probe_id: str, response: str, session_json: str) -> tuple[str, str]:
    """Score a response and append the resulting case to the session.

    The session is carried between calls as a JSON-encoded list of case dicts.

    Args:
        probe_id: Key looked up in the module-level ``_probe_map``.
        response: Response text handed to ``evaluate_response``.
        session_json: Current session as JSON text; empty or ``None`` is
            treated as ``"[]"``.

    Returns:
        ``(updated_session_json, status_message)``. For an unknown probe the
        session is returned unchanged (``"[]"`` if it was empty) together with
        the message ``"Unknown probe"``.
    """
    probe = _probe_map.get(probe_id)
    if probe is None:
        return session_json or "[]", "Unknown probe"

    result = evaluate_response(probe, response)

    try:
        cases = json.loads(session_json or "[]")
    except json.JSONDecodeError:
        cases = []
    # Well-formed JSON is not necessarily a list (e.g. "null" or "{}"); start a
    # fresh session in that case, the same recovery used for undecodable text.
    if not isinstance(cases, list):
        cases = []

    cases.append(result.to_dict())
    return json.dumps(cases), f"Stored {len(cases)} case(s)."
|
|
|
|
def clear_session() -> tuple[str, str, pd.DataFrame, str, object, object, str]:
    """Reset the session and build outputs from metrics over zero cases.

    Metrics are aggregated over an empty case list under the model name
    ``"session"`` and then passed through the same renderers used for a
    populated session.

    Returns:
        A 7-tuple of: empty session JSON (``"[]"``), the status message, an
        empty ``DataFrame``, the metrics summary Markdown, the category bar
        plot, the radar plot, and the severity table Markdown.
    """
    blank_metrics = aggregate_metrics([], model_name="session")
    summary_md = metrics_summary_markdown(blank_metrics)
    bar_plot = plot_category_risk_bars(blank_metrics.get("by_category", {}))
    radar_plot = plot_composite_radar(blank_metrics)
    severity_md = severity_table_markdown(blank_metrics.get("by_severity_tier", {}))
    return (
        "[]",
        "Session cleared.",
        pd.DataFrame(),
        summary_md,
        bar_plot,
        radar_plot,
        severity_md,
    )
|
|
|
|
def aggregate_session(
    session_json: str,
    model_name: str,
    geometry_clusters: float,
) -> tuple[pd.DataFrame, str, str, object, object, str]:
    """Aggregate every case stored in the session into metrics, plots and a report.

    Args:
        session_json: Session as JSON text holding a list of case dicts. Empty,
            undecodable, or non-list input is treated as an empty session.
        model_name: Label passed to the metrics and the report; falls back to
            ``"session-model"`` when empty.
        geometry_clusters: Requested cluster count; truncated to an int and
            clamped to the range 2..12 before being passed to ``build_report``.

    Returns:
        A 6-tuple of: per-category metrics table, metrics summary Markdown,
        the report as JSON text, the category bar plot, the radar plot, and
        the severity table Markdown.

    Raises:
        KeyError: If a stored case lacks one of the required fields.
    """
    # Kept as a function-scope import, exactly where the original placed it.
    from agent_threat_map.schema import CaseScore

    try:
        raw = json.loads(session_json or "[]")
    except json.JSONDecodeError:
        raw = []
    # Well-formed JSON that is not a list cannot be a session; treat as empty.
    if not isinstance(raw, list):
        raw = []

    scores: list[CaseScore] = [
        CaseScore(
            case_id=row["case_id"],
            category=row["category"],
            severity=row["severity"],
            passed=row["passed"],
            risk_score=row["risk_score"],
            severity_weight=row["severity_weight"],
            weighted_risk=row["weighted_risk"],
            detected_failure_modes=list(row.get("detected_failure_modes", [])),
            explanation=row["explanation"],
            safe_signal_hits=row["safe_signal_hits"],
            unsafe_signal_hits=row["unsafe_signal_hits"],
            boundary_or_refusal_signal=row["boundary_or_refusal_signal"],
            matched_safe_patterns=list(row.get("matched_safe_patterns", [])),
            matched_unsafe_patterns=list(row.get("matched_unsafe_patterns", [])),
            task=str(row.get("task", "")),
            probe_input=str(row.get("probe_input", "")),
        )
        for row in raw
    ]

    label = model_name or "session-model"
    metrics = aggregate_metrics(scores, model_name=label)
    # Read once with a default so the table and the bar plot see the same data.
    by_category = metrics.get("by_category", {})

    # One table row per category that has at least one case; the "note" key is
    # left out of the table.
    df = pd.DataFrame(
        [
            {
                "category": name,
                **{k: v for k, v in block.items() if k != "note"},
            }
            for name, block in sorted(by_category.items())
            if isinstance(block, dict) and block.get("n", 0) > 0
        ]
    )

    kgeom = max(2, min(12, int(geometry_clusters)))
    report = build_report(
        scores,
        model_name=label,
        geometry_clusters=kgeom,
    )
    report_str = report_to_json(report)

    md = metrics_summary_markdown(metrics)
    img_bar = plot_category_risk_bars(by_category)
    img_radar = plot_composite_radar(metrics)
    sev_md = severity_table_markdown(metrics.get("by_severity_tier", {}))
    return df, md, report_str, img_bar, img_radar, sev_md
|
|
|
|
def run_geometry_analysis(session_json: str, k_clusters: float) -> tuple[str, Any, Any]:
    """Run the observability analysis over the session and render its outputs.

    Args:
        session_json: Session as JSON text; empty or undecodable input is
            replaced by an empty list.
        k_clusters: Requested cluster count; truncated to an int and clamped
            to the range 2..12.

    Returns:
        ``(markdown, mi_plot, scatter_plot)``. Both plots are ``None`` when the
        analysis result has no truthy ``"eligible"`` entry.
    """
    try:
        records = json.loads(session_json or "[]")
    except json.JSONDecodeError:
        records = []

    # Clamp the requested cluster count into the supported 2..12 window.
    cluster_count = int(k_clusters)
    if cluster_count > 12:
        cluster_count = 12
    if cluster_count < 2:
        cluster_count = 2

    analysis = analyze_case_records(records, n_clusters=cluster_count)
    report_md = observability_markdown(analysis)

    if analysis.get("eligible"):
        mi_plot = plot_mi_threat_bars(analysis["mutual_information"])
        scatter_plot = plot_threat_cluster_scatter(analysis["case_clusters"])
        return report_md, mi_plot, scatter_plot
    return report_md, None, None
|
|
|
|
# Default dropdown selection. Guarded so that an empty probe list does not
# raise IndexError while the module is being imported.
_default_probe = _probe_choices[0] if _probe_choices else None

with gr.Blocks(title="Agent Threat Map (research)") as demo:
    gr.Markdown(
        "# Agent Threat Map — observatory (research)\n"
        "Map fragile behavior with **expanded metrics** plus **observable geometry**: TF-IDF/SVD embeddings, "
        "KMeans clusters, and mutual information vs category / severity / pass-fail (same observability shape as "
        "the CARB failure demos). **Not** a certified security scanner."
    )
    # Session of scored cases, carried between callbacks as JSON text.
    session_state = gr.State("[]")

    with gr.Tab("Score one probe"):
        probe_dd = gr.Dropdown(choices=_probe_choices, label="Probe", value=_default_probe)
        response_tb = gr.Textbox(label="Model / agent response", lines=10)
        score_btn = gr.Button("Score response")
        out_md = gr.Markdown()
        out_json = gr.Code(label="Case JSON", language="json")

        def _score_wrap(pid: str, text: str) -> tuple[str, str]:
            """Call ``score_one`` and keep only the two values this tab displays."""
            a, b, _ = score_one(pid, text)
            return a, b

        score_btn.click(_score_wrap, [probe_dd, response_tb], [out_md, out_json])

    with gr.Tab("Session & aggregates"):
        gr.Markdown(
            "Add multiple scored cases, then aggregate to view **full metrics** and export a JSON report."
        )
        probe_dd2 = gr.Dropdown(choices=_probe_choices, label="Probe", value=_default_probe)
        response_tb2 = gr.Textbox(label="Model / agent response", lines=8)
        model_name = gr.Textbox(label="Model label (for report)", value="manual-eval")
        geom_k = gr.Slider(2, 12, value=4, step=1, label="Clusters for geometry (report + MI)")
        add_btn = gr.Button("Append to session")
        agg_btn = gr.Button("Compute aggregates & report")
        clr_btn = gr.Button("Clear session")
        sess_msg = gr.Markdown()
        cat_table = gr.Dataframe(label="Category metrics", interactive=False)
        metrics_md = gr.Markdown()
        sev_md = gr.Markdown()
        plot_bar = gr.Image(label="Category risk vs pass rate", type="numpy")
        plot_rad = gr.Image(label="Category mean risk (radar)", type="numpy")
        report_out = gr.Code(label="Full JSON report", language="json")

        # Output lists below follow the order of each callback's return tuple.
        add_btn.click(add_to_session, [probe_dd2, response_tb2, session_state], [session_state, sess_msg])

        agg_btn.click(
            aggregate_session,
            [session_state, model_name, geom_k],
            [cat_table, metrics_md, report_out, plot_bar, plot_rad, sev_md],
        )
        clr_btn.click(
            clear_session,
            None,
            [session_state, sess_msg, cat_table, metrics_md, plot_bar, plot_rad, sev_md],
        )

    with gr.Tab("Observable geometry"):
        gr.Markdown(
            "Runs **embedding → clustering → MI** on all cases in the session (same pipeline family as "
            "`failure-geometry-demo`). Needs **≥5** scored rows for defaults; reports also include an "
            "`observability` block when you export JSON from *Session & aggregates*."
        )
        geom_k2 = gr.Slider(2, 12, value=4, step=1, label="Number of clusters")
        geom_btn = gr.Button("Run geometry analysis on session")
        geom_md = gr.Markdown()
        geom_mi = gr.Image(label="Mutual information", type="numpy")
        geom_sc = gr.Image(label="2-D embedding scatter", type="numpy")
        geom_btn.click(
            run_geometry_analysis,
            [session_state, geom_k2],
            [geom_md, geom_mi, geom_sc],
        )
|
|