| from __future__ import annotations |
|
|
| import json |
| from collections.abc import Sequence |
| from datetime import UTC, datetime |
|
|
| from agent_threat_map.observability import analyze_case_records |
| from agent_threat_map.schema import CaseScore |
| from agent_threat_map.scoring import aggregate_metrics |
|
|
|
|
def build_report(
    scores: Sequence[CaseScore],
    *,
    model_name: str = "unspecified",
    probe_version: str = "seed",
    geometry_clusters: int = 4,
    geometry_min_cases: int = 5,
) -> dict:
    """Assemble the report dictionary for a collection of case scores.

    Args:
        scores: Scored cases; each one is converted with its ``to_dict()``
            method and also handed to ``aggregate_metrics`` as-is.
        model_name: Forwarded to ``aggregate_metrics`` as ``model_name``.
        probe_version: Stored verbatim under the ``"probe_dataset"`` key.
        geometry_clusters: Forwarded to ``analyze_case_records`` as
            ``n_clusters``.
        geometry_min_cases: Forwarded to ``analyze_case_records`` as
            ``min_cases``.

    Returns:
        A dict with the keys ``"generated_at_utc"``, ``"probe_dataset"``,
        ``"metrics"``, ``"observability"`` and ``"cases"``, in that order.
    """
    summary = aggregate_metrics(scores, model_name=model_name)
    case_records = [score.to_dict() for score in scores]

    geometry_options = {
        "n_clusters": geometry_clusters,
        "min_cases": geometry_min_cases,
    }
    geometry = analyze_case_records(case_records, **geometry_options)

    # The timestamp is taken last, after all analysis has finished, and is
    # timezone-aware (UTC) so the ISO string carries an explicit offset.
    report: dict = {}
    report["generated_at_utc"] = datetime.now(UTC).isoformat()
    report["probe_dataset"] = probe_version
    report["metrics"] = summary
    report["observability"] = geometry
    report["cases"] = case_records
    return report
|
|
|
|
def report_to_json(report: dict, *, indent: int = 2) -> str:
    """Serialize *report* to a JSON string.

    Args:
        report: The report mapping to serialize.
        indent: Indentation width passed to ``json.dumps``.

    Returns:
        The JSON text. Non-ASCII characters are emitted as-is rather than
        as ``\\uXXXX`` escapes.
    """
    serializer_options = {
        "indent": indent,
        "ensure_ascii": False,
    }
    return json.dumps(report, **serializer_options)
|
|