"""Gradio UI for agent-threat-map (ui/gradio_app.py): threat-map metrics plus
observable geometry (TF-IDF/SVD embeddings, KMeans clustering, mutual information)."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import gradio as gr
import pandas as pd
from agent_threat_map.evaluator import evaluate_response
from agent_threat_map.loader import load_probes
from agent_threat_map.observability import analyze_case_records
from agent_threat_map.report import build_report, report_to_json
from agent_threat_map.scoring import aggregate_metrics
from ui.visualizations import (
metrics_summary_markdown,
observability_markdown,
plot_category_risk_bars,
plot_composite_radar,
plot_mi_threat_bars,
plot_threat_cluster_scatter,
severity_table_markdown,
)
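
# Resolve the repo root (this file lives in ui/) and load the seed probes once
# at import time, indexing them by id for the UI callbacks.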
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DATA = ROOT / "data" / "threat_probe_seed.jsonl"
_probes = load_probes(DEFAULT_DATA)
_probe_map = {p.id: p for p in _probes}
_probe_choices = [p.id for p in _probes]
def score_one(probe_id: str, response: str) -> tuple[str, str]:
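    """Evaluate a single response against one probe; return (summary markdown, case JSON)."""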
probe = _probe_map.get(probe_id)
if probe is None:
return "Unknown probe", "", ""
result = evaluate_response(probe, response)
summary = (
f"**Passed:** {result.passed}\n\n"
f"**Risk score:** {result.risk_score} (weighted: {result.weighted_risk})\n\n"
f"**Severity:** {result.severity} (weight {result.severity_weight})\n\n"
f"**Safe / unsafe signal hits:** {result.safe_signal_hits} / {result.unsafe_signal_hits}\n\n"
f"**Boundary/refusal signal:** {result.boundary_or_refusal_signal}\n\n"
f"**Detected failure modes:** {', '.join(result.detected_failure_modes) or '—'}\n\n"
f"**Explanation:** {result.explanation}\n\n"
"**Matched patterns (unsafe):** "
f"{', '.join(result.matched_unsafe_patterns) or '—'}\n\n"
"**Matched patterns (safe):** "
f"{', '.join(result.matched_safe_patterns) or '—'}"
)
detail_json = json.dumps(result.to_dict(), indent=2, ensure_ascii=False)
    return summary, detail_json
def add_to_session(probe_id: str, response: str, session_json: str) -> tuple[str, str]:
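    """Score a response and append the resulting case dict to the session JSON list."""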
probe = _probe_map.get(probe_id)
if probe is None:
return session_json or "[]", "Unknown probe"
result = evaluate_response(probe, response)
try:
cur = json.loads(session_json or "[]")
except json.JSONDecodeError:
cur = []
cur.append(result.to_dict())
return json.dumps(cur), f"Stored {len(cur)} case(s)."
def clear_session() -> tuple[str, str, pd.DataFrame, str, object, object, str]:
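    """Reset the session and return cleared values for all seven output widgets."""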
empty = "[]"
metrics = aggregate_metrics([], model_name="session")
md = metrics_summary_markdown(metrics)
return (
empty,
"Session cleared.",
pd.DataFrame(),
md,
plot_category_risk_bars(metrics.get("by_category", {})),
plot_composite_radar(metrics),
severity_table_markdown(metrics.get("by_severity_tier", {})),
)
def aggregate_session(
session_json: str,
model_name: str,
geometry_clusters: float,
) -> tuple[pd.DataFrame, str, str, object, object, str]:
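    """Convert stored case dicts back into CaseScore objects, then compute metrics, plots, and the full JSON report."""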
from agent_threat_map.schema import CaseScore
try:
raw = json.loads(session_json or "[]")
except json.JSONDecodeError:
raw = []
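    # Rehydrate CaseScore objects from the stored session dicts.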
scores: list[CaseScore] = []
for row in raw:
scores.append(
CaseScore(
case_id=row["case_id"],
category=row["category"],
severity=row["severity"],
passed=row["passed"],
risk_score=row["risk_score"],
severity_weight=row["severity_weight"],
weighted_risk=row["weighted_risk"],
detected_failure_modes=list(row.get("detected_failure_modes", [])),
explanation=row["explanation"],
safe_signal_hits=row["safe_signal_hits"],
unsafe_signal_hits=row["unsafe_signal_hits"],
boundary_or_refusal_signal=row["boundary_or_refusal_signal"],
matched_safe_patterns=list(row.get("matched_safe_patterns", [])),
matched_unsafe_patterns=list(row.get("matched_unsafe_patterns", [])),
task=str(row.get("task", "")),
probe_input=str(row.get("probe_input", "")),
)
)
metrics = aggregate_metrics(scores, model_name=model_name or "session-model")
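    # Build a per-category table, keeping only non-empty categories and dropping the "note" field.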
df = pd.DataFrame(
[
{
"category": c,
**{k: v for k, v in block.items() if k != "note"},
}
for c, block in sorted(metrics["by_category"].items())
if isinstance(block, dict) and block.get("n", 0) > 0
]
)
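    # Clamp the slider value to the supported cluster range (2-12).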
kgeom = max(2, min(12, int(geometry_clusters)))
report = build_report(
scores,
model_name=model_name or "session-model",
geometry_clusters=kgeom,
)
report_str = report_to_json(report)
md = metrics_summary_markdown(metrics)
img_bar = plot_category_risk_bars(metrics.get("by_category", {}))
img_radar = plot_composite_radar(metrics)
sev_md = severity_table_markdown(metrics.get("by_severity_tier", {}))
return df, md, report_str, img_bar, img_radar, sev_md
def run_geometry_analysis(session_json: str, k_clusters: float) -> tuple[str, Any, Any]:
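    """Run embedding → clustering → mutual information over the session cases; returns markdown plus two plot images (None when ineligible)."""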
try:
cases = json.loads(session_json or "[]")
except json.JSONDecodeError:
cases = []
k = max(2, min(12, int(k_clusters)))
obs = analyze_case_records(cases, n_clusters=k)
md = observability_markdown(obs)
if not obs.get("eligible"):
return md, None, None
mi_img = plot_mi_threat_bars(obs["mutual_information"])
sc_img = plot_threat_cluster_scatter(obs["case_clusters"])
return md, mi_img, sc_img
with gr.Blocks(title="Agent Threat Map (research)") as demo:
gr.Markdown(
"# Agent Threat Map — observatory (research)\n"
"Map fragile behavior with **expanded metrics** plus **observable geometry**: TF-IDF/SVD embeddings, "
"KMeans clusters, and mutual information vs category / severity / pass-fail (same observability shape as "
"the CARB failure demos). **Not** a certified security scanner."
)
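    # Session state holds a JSON-encoded list of scored-case dicts.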
session_state = gr.State("[]")
with gr.Tab("Score one probe"):
        probe_dd = gr.Dropdown(choices=_probe_choices, label="Probe", value=_probe_choices[0] if _probe_choices else None)
response_tb = gr.Textbox(label="Model / agent response", lines=10)
score_btn = gr.Button("Score response")
out_md = gr.Markdown()
out_json = gr.Code(label="Case JSON", language="json")
        score_btn.click(score_one, [probe_dd, response_tb], [out_md, out_json])
with gr.Tab("Session & aggregates"):
gr.Markdown(
"Add multiple scored cases, then aggregate to view **full metrics** and export a JSON report."
)
        probe_dd2 = gr.Dropdown(choices=_probe_choices, label="Probe", value=_probe_choices[0] if _probe_choices else None)
response_tb2 = gr.Textbox(label="Model / agent response", lines=8)
model_name = gr.Textbox(label="Model label (for report)", value="manual-eval")
geom_k = gr.Slider(2, 12, value=4, step=1, label="Clusters for geometry (report + MI)")
add_btn = gr.Button("Append to session")
agg_btn = gr.Button("Compute aggregates & report")
clr_btn = gr.Button("Clear session")
sess_msg = gr.Markdown()
cat_table = gr.Dataframe(label="Category metrics", interactive=False)
metrics_md = gr.Markdown()
sev_md = gr.Markdown()
plot_bar = gr.Image(label="Category risk vs pass rate", type="numpy")
plot_rad = gr.Image(label="Category mean risk (radar)", type="numpy")
report_out = gr.Code(label="Full JSON report", language="json")
add_btn.click(add_to_session, [probe_dd2, response_tb2, session_state], [session_state, sess_msg])
agg_btn.click(
aggregate_session,
[session_state, model_name, geom_k],
[cat_table, metrics_md, report_out, plot_bar, plot_rad, sev_md],
)
clr_btn.click(clear_session, None, [session_state, sess_msg, cat_table, metrics_md, plot_bar, plot_rad, sev_md])
with gr.Tab("Observable geometry"):
        gr.Markdown(
            "Runs **embedding → clustering → MI** over all cases in the session (same pipeline family as "
            "`failure-geometry-demo`). Requires **≥5** scored cases at the default settings; the JSON report "
            "exported from *Session & aggregates* also includes an `observability` block."
        )
geom_k2 = gr.Slider(2, 12, value=4, step=1, label="Number of clusters")
geom_btn = gr.Button("Run geometry analysis on session")
geom_md = gr.Markdown()
geom_mi = gr.Image(label="Mutual information", type="numpy")
geom_sc = gr.Image(label="2-D embedding scatter", type="numpy")
geom_btn.click(
run_geometry_analysis,
[session_state, geom_k2],
[geom_md, geom_mi, geom_sc],
)
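
# Launch only when run directly. (Assumption: on Hugging Face Spaces an app.py
# entry point may import `demo` instead, so this guard keeps the import side-effect free.)
if __name__ == "__main__":
    demo.launch()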