"""Error analysis — full audit of scripted-baseline FPs + v2 LoRA aggregate gaps. Two parts: 1. **Scripted baseline (per-scenario)** — read ``logs/mode_c_scripted_n135.json`` and enumerate every FP, FN, and missed scam, with category and difficulty tags. Full per-scenario fidelity. 2. **v2 LoRA (aggregate-level only)** — read ``logs/eval_v2.json`` and report per-difficulty error counts. Per-scenario v2 audit requires GPU re-inference and is v3 work. Output: ``docs/v2_error_analysis.md`` (markdown for judges). Usage: python eval/error_analysis.py python eval/error_analysis.py --output docs/v2_error_analysis.md """ from __future__ import annotations import argparse import json from collections import defaultdict from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent SCRIPTED = REPO_ROOT / "logs" / "mode_c_scripted_n135.json" EVAL_V2 = REPO_ROOT / "logs" / "eval_v2.json" DEFAULT_OUTPUT = REPO_ROOT / "docs" / "v2_error_analysis.md" def _load_json(path: Path) -> dict: if not path.exists(): raise SystemExit(f"missing input: {path}") return json.loads(path.read_text()) def _scripted_errors(data: dict) -> dict: scenarios = data.get("scenarios", []) by_category: dict[str, dict] = defaultdict( lambda: {"n": 0, "n_correct": 0, "fp": [], "fn": []} ) for s in scenarios: cat = s.get("category", "unknown") truth = bool(s.get("is_scam_truth", False)) flag = bool(s.get("predicted_flag", False)) b = by_category[cat] b["n"] += 1 if truth == flag: b["n_correct"] += 1 if truth and not flag: b["fn"].append(s) elif (not truth) and flag: b["fp"].append(s) summary = { "n_total": len(scenarios), "by_category": { cat: { "n": b["n"], "n_correct": b["n_correct"], "accuracy": round(b["n_correct"] / b["n"], 4) if b["n"] else 0.0, "n_fp": len(b["fp"]), "n_fn": len(b["fn"]), "fp_examples": [ { "scenario_id": s.get("scenario_id"), "predicted_score": s.get("predicted_score"), "difficulty": s.get("difficulty"), } for s in b["fp"] ], "fn_examples": [ { "scenario_id": s.get("scenario_id"), "predicted_score": s.get("predicted_score"), "difficulty": s.get("difficulty"), } for s in b["fn"] ], } for cat, b in sorted(by_category.items()) }, } return summary def _v2_aggregate_summary(eval_data: dict) -> dict: block = eval_data.get("lora_v2", {}) n = int(block.get("n", 0)) detection = float(block.get("detection", 0.0)) fpr = float(block.get("fpr", 0.0)) per_diff = block.get("per_difficulty", {}) n_scams = sum(int(v.get("n", 0)) for v in per_diff.values()) n_benign = max(n - n_scams, 0) n_missed_scams = round((1 - detection) * n_scams) n_fps = round(fpr * n_benign) return { "n_total": n, "n_scams": n_scams, "n_benign": n_benign, "n_missed_scams": n_missed_scams, "n_false_positives": n_fps, "detection_rate": round(detection, 4), "fpr": round(fpr, 4), "f1": float(block.get("f1", 0.0)), "per_difficulty": { diff: { "n": int(info.get("n", 0)), "detection_rate": float(info.get("detection_rate", 0.0)), "n_missed": round(int(info.get("n", 0)) * (1 - float(info.get("detection_rate", 0.0)))), } for diff, info in sorted(per_diff.items()) }, "threshold": float(block.get("threshold", 0.5)), } def render(scripted: dict, v2: dict) -> str: L: list[str] = [] L.append("# v2 Error Analysis") L.append("") L.append( "Honest accounting of where the analyzers fail. Two layers: " "**scripted baseline** has full per-scenario detail; **v2 LoRA** is " "aggregated (per-scenario audit requires GPU re-inference, v3 work)." ) L.append("") # ---- Scripted baseline section ---- # L.append("## Scripted baseline (per-scenario, n=" + str(scripted["n_total"]) + ")") L.append("") L.append("Source: [`logs/mode_c_scripted_n135.json`](../logs/mode_c_scripted_n135.json)") L.append("") L.append("### Per-category breakdown") L.append("") L.append("| Category | n | Accuracy | False Positives | False Negatives (missed scams) |") L.append("|---|---|---|---|---|") total_fp = 0 total_fn = 0 for cat, b in scripted["by_category"].items(): total_fp += b["n_fp"] total_fn += b["n_fn"] L.append( f"| `{cat}` | {b['n']} | {b['accuracy']:.3f} " f"| {b['n_fp']} | {b['n_fn']} |" ) L.append(f"| **Total** | **{scripted['n_total']}** | — | **{total_fp}** | **{total_fn}** |") L.append("") L.append("### False-positive scenarios (scripted-baseline)") L.append("") if total_fp == 0: L.append("**None observed in this slice.**") else: L.append("| Category | Scenario | Score | Difficulty |") L.append("|---|---|---|---|") for cat, b in scripted["by_category"].items(): for fp in b["fp_examples"]: L.append( f"| `{cat}` | `{fp['scenario_id']}` " f"| {fp['predicted_score']:.3f} " f"| {fp['difficulty']} |" ) L.append("") L.append("### Missed-scam scenarios (scripted-baseline false negatives)") L.append("") if total_fn == 0: L.append("**None observed in this slice.**") else: L.append("| Category | Scenario | Score | Difficulty |") L.append("|---|---|---|---|") for cat, b in scripted["by_category"].items(): for fn in b["fn_examples"]: L.append( f"| `{cat}` | `{fn['scenario_id']}` " f"| {fn['predicted_score']:.3f} " f"| {fn['difficulty']} |" ) L.append("") # ---- v2 LoRA aggregate section ---- # L.append("## v2 LoRA (aggregate, n=" + str(v2["n_total"]) + ")") L.append("") L.append("Source: [`logs/eval_v2.json`](../logs/eval_v2.json)") L.append("") L.append( f"- Detection rate: **{v2['detection_rate']:.4f}** " f"({v2['n_scams'] - v2['n_missed_scams']}/{v2['n_scams']} scams caught)" ) L.append( f"- False positive rate: **{v2['fpr']:.4f}** " f"({v2['n_false_positives']}/{v2['n_benign']} benign mislabelled)" ) L.append(f"- F1: **{v2['f1']:.4f}**") L.append(f"- Threshold: `{v2['threshold']}`") L.append("") L.append("### Per-difficulty breakdown") L.append("") L.append("| Difficulty | n | Detection | Missed scams |") L.append("|---|---|---|---|") for diff, info in v2["per_difficulty"].items(): L.append( f"| `{diff}` | {info['n']} | {info['detection_rate']:.3f} " f"| {info['n_missed']} |" ) L.append("") L.append("### Why this is aggregate-only") L.append("") L.append( "The v2 evaluation logged aggregate detection/FPR/F1 + per-difficulty " "buckets, but not per-scenario predictions. To audit *which* " f"{v2['n_false_positives']} benign(s) the v2 model misclassified, or " f"*which* {v2['n_missed_scams']} novel scam(s) it missed, requires " "re-running inference with the LoRA adapter on every bench scenario " "and dumping per-row scores. That is a single-GPU, ~30-minute job — " "tracked as v3 work in [`docs/limitations.md`](limitations.md)." ) L.append("") # ---- Cross-comparison + v3 plan ---- # L.append("## Comparison summary") L.append("") L.append("| Metric | Scripted (per-scenario) | v2 LoRA (aggregate) |") L.append("|---|---|---|") scripted_total = scripted["n_total"] scripted_acc = sum(b["n_correct"] for b in scripted["by_category"].values()) / max(scripted_total, 1) L.append( f"| Accuracy / detection | " f"{scripted_acc:.3f} (n={scripted_total}) | " f"{v2['detection_rate']:.3f} det · {v2['fpr']:.3f} FPR (n={v2['n_total']}) |" ) L.append(f"| Total errors | {total_fp + total_fn} | {v2['n_missed_scams'] + v2['n_false_positives']} |") L.append("") L.append("## v3 plan") L.append("") L.append("1. Re-run v2 inference on the bench with per-scenario logging (~30 min on 1× A100).") L.append("2. Manually label each FP / FN root cause: scammer-template overlap, urgency-only signal, multi-language drift, etc.") L.append("3. Add fix-targeted templates to `chakravyuh_env/benign_augmented_v2.json` to push n_benign past 150.") L.append("4. Retrain v2.1 on the expanded corpus, re-eval, repeat audit.") L.append("") return "\n".join(L) + "\n" def main() -> int: parser = argparse.ArgumentParser(description=__doc__.split("\n")[0]) parser.add_argument("--scripted-eval", type=Path, default=SCRIPTED) parser.add_argument("--v2-eval", type=Path, default=EVAL_V2) parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT) args = parser.parse_args() scripted = _scripted_errors(_load_json(args.scripted_eval)) v2 = _v2_aggregate_summary(_load_json(args.v2_eval)) args.output.parent.mkdir(parents=True, exist_ok=True) args.output.write_text(render(scripted, v2)) total_fp = sum(b["n_fp"] for b in scripted["by_category"].values()) total_fn = sum(b["n_fn"] for b in scripted["by_category"].values()) print(f"error analysis: {args.output}") print( f" scripted: n={scripted['n_total']} · " f"FPs={total_fp} · missed scams={total_fn}" ) print( f" v2 LoRA: n={v2['n_total']} · " f"missed scams={v2['n_missed_scams']} · FPs={v2['n_false_positives']}" ) return 0 if __name__ == "__main__": raise SystemExit(main())