"""Error analysis — full audit of scripted-baseline FPs + v2 LoRA aggregate gaps.
Two parts:
1. **Scripted baseline (per-scenario)** — read ``logs/mode_c_scripted_n135.json``
and enumerate every FP, FN, and missed scam, with category and difficulty
tags. Full per-scenario fidelity.
2. **v2 LoRA (aggregate-level only)** — read ``logs/eval_v2.json`` and report
per-difficulty error counts. Per-scenario v2 audit requires GPU re-inference
and is v3 work.
Output: ``docs/v2_error_analysis.md`` (markdown for judges).
Usage:
python eval/error_analysis.py
python eval/error_analysis.py --output docs/v2_error_analysis.md
"""
from __future__ import annotations
import argparse
import json
from collections import defaultdict
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
SCRIPTED = REPO_ROOT / "logs" / "mode_c_scripted_n135.json"
EVAL_V2 = REPO_ROOT / "logs" / "eval_v2.json"
DEFAULT_OUTPUT = REPO_ROOT / "docs" / "v2_error_analysis.md"
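# Rough shape of the two input logs, inferred from the field accesses below;
# the real files may carry extra keys, so treat this as a reading aid rather
# than a schema guarantee.
#
#   mode_c_scripted_n135.json:
#     {"scenarios": [{"scenario_id": ..., "category": ..., "difficulty": ...,
#                     "is_scam_truth": bool, "predicted_flag": bool,
#                     "predicted_score": float}, ...]}
#
#   eval_v2.json:
#     {"lora_v2": {"n": int, "detection": float, "fpr": float, "f1": float,
#                  "threshold": float,
#                  "per_difficulty": {"<difficulty>": {"n": int, "detection_rate": float}}}}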
def _load_json(path: Path) -> dict:
if not path.exists():
raise SystemExit(f"missing input: {path}")
return json.loads(path.read_text())
def _scripted_errors(data: dict) -> dict:
scenarios = data.get("scenarios", [])
by_category: dict[str, dict] = defaultdict(
lambda: {"n": 0, "n_correct": 0, "fp": [], "fn": []}
)
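# Confusion-matrix bookkeeping per category: a false negative (fn) is a scam
# the baseline failed to flag; a false positive (fp) is a benign scenario it
# flagged anyway. Scenarios where truth and prediction agree count as correct.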
for s in scenarios:
cat = s.get("category", "unknown")
truth = bool(s.get("is_scam_truth", False))
flag = bool(s.get("predicted_flag", False))
b = by_category[cat]
b["n"] += 1
if truth == flag:
b["n_correct"] += 1
if truth and not flag:
b["fn"].append(s)
elif (not truth) and flag:
b["fp"].append(s)
summary = {
"n_total": len(scenarios),
"by_category": {
cat: {
"n": b["n"],
"n_correct": b["n_correct"],
"accuracy": round(b["n_correct"] / b["n"], 4) if b["n"] else 0.0,
"n_fp": len(b["fp"]),
"n_fn": len(b["fn"]),
"fp_examples": [
{
"scenario_id": s.get("scenario_id"),
"predicted_score": s.get("predicted_score"),
"difficulty": s.get("difficulty"),
}
for s in b["fp"]
],
"fn_examples": [
{
"scenario_id": s.get("scenario_id"),
"predicted_score": s.get("predicted_score"),
"difficulty": s.get("difficulty"),
}
for s in b["fn"]
],
}
for cat, b in sorted(by_category.items())
},
}
return summary
def _v2_aggregate_summary(eval_data: dict) -> dict:
block = eval_data.get("lora_v2", {})
n = int(block.get("n", 0))
detection = float(block.get("detection", 0.0))
fpr = float(block.get("fpr", 0.0))
per_diff = block.get("per_difficulty", {})
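# Reconstruction assumptions: the per_difficulty buckets are taken to cover
# only scam scenarios, so their n-values sum to the scam count and the rest of
# n is benign. Error counts are recovered from the logged rates and rounded,
# e.g. detection=0.90 over 100 scams -> round(0.10 * 100) = 10 missed scams.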
n_scams = sum(int(v.get("n", 0)) for v in per_diff.values())
n_benign = max(n - n_scams, 0)
n_missed_scams = round((1 - detection) * n_scams)
n_fps = round(fpr * n_benign)
return {
"n_total": n,
"n_scams": n_scams,
"n_benign": n_benign,
"n_missed_scams": n_missed_scams,
"n_false_positives": n_fps,
"detection_rate": round(detection, 4),
"fpr": round(fpr, 4),
"f1": float(block.get("f1", 0.0)),
"per_difficulty": {
diff: {
"n": int(info.get("n", 0)),
"detection_rate": float(info.get("detection_rate", 0.0)),
"n_missed": round(int(info.get("n", 0)) * (1 - float(info.get("detection_rate", 0.0)))),
}
for diff, info in sorted(per_diff.items())
},
"threshold": float(block.get("threshold", 0.5)),
}
def render(scripted: dict, v2: dict) -> str:
L: list[str] = []
L.append("# v2 Error Analysis")
L.append("")
L.append(
"Honest accounting of where the analyzers fail. Two layers: "
"**scripted baseline** has full per-scenario detail; **v2 LoRA** is "
"aggregated (per-scenario audit requires GPU re-inference, v3 work)."
)
L.append("")
# ---- Scripted baseline section ---- #
L.append("## Scripted baseline (per-scenario, n=" + str(scripted["n_total"]) + ")")
L.append("")
L.append("Source: [`logs/mode_c_scripted_n135.json`](../logs/mode_c_scripted_n135.json)")
L.append("")
L.append("### Per-category breakdown")
L.append("")
L.append("| Category | n | Accuracy | False Positives | False Negatives (missed scams) |")
L.append("|---|---|---|---|---|")
total_fp = 0
total_fn = 0
for cat, b in scripted["by_category"].items():
total_fp += b["n_fp"]
total_fn += b["n_fn"]
L.append(
f"| `{cat}` | {b['n']} | {b['accuracy']:.3f} "
f"| {b['n_fp']} | {b['n_fn']} |"
)
L.append(f"| **Total** | **{scripted['n_total']}** | — | **{total_fp}** | **{total_fn}** |")
L.append("")
L.append("### False-positive scenarios (scripted-baseline)")
L.append("")
if total_fp == 0:
L.append("**None observed in this slice.**")
else:
L.append("| Category | Scenario | Score | Difficulty |")
L.append("|---|---|---|---|")
for cat, b in scripted["by_category"].items():
for fp in b["fp_examples"]:
L.append(
f"| `{cat}` | `{fp['scenario_id']}` "
f"| {fp['predicted_score']:.3f} "
f"| {fp['difficulty']} |"
)
L.append("")
L.append("### Missed-scam scenarios (scripted-baseline false negatives)")
L.append("")
if total_fn == 0:
L.append("**None observed in this slice.**")
else:
L.append("| Category | Scenario | Score | Difficulty |")
L.append("|---|---|---|---|")
for cat, b in scripted["by_category"].items():
for fn in b["fn_examples"]:
L.append(
f"| `{cat}` | `{fn['scenario_id']}` "
f"| {fn['predicted_score']:.3f} "
f"| {fn['difficulty']} |"
)
L.append("")
# ---- v2 LoRA aggregate section ---- #
L.append("## v2 LoRA (aggregate, n=" + str(v2["n_total"]) + ")")
L.append("")
L.append("Source: [`logs/eval_v2.json`](../logs/eval_v2.json)")
L.append("")
L.append(
f"- Detection rate: **{v2['detection_rate']:.4f}** "
f"({v2['n_scams'] - v2['n_missed_scams']}/{v2['n_scams']} scams caught)"
)
L.append(
f"- False positive rate: **{v2['fpr']:.4f}** "
f"({v2['n_false_positives']}/{v2['n_benign']} benign mislabelled)"
)
L.append(f"- F1: **{v2['f1']:.4f}**")
L.append(f"- Threshold: `{v2['threshold']}`")
L.append("")
L.append("### Per-difficulty breakdown")
L.append("")
L.append("| Difficulty | n | Detection | Missed scams |")
L.append("|---|---|---|---|")
for diff, info in v2["per_difficulty"].items():
L.append(
f"| `{diff}` | {info['n']} | {info['detection_rate']:.3f} "
f"| {info['n_missed']} |"
)
L.append("")
L.append("### Why this is aggregate-only")
L.append("")
L.append(
"The v2 evaluation logged aggregate detection/FPR/F1 + per-difficulty "
"buckets, but not per-scenario predictions. To audit *which* "
f"{v2['n_false_positives']} benign(s) the v2 model misclassified, or "
f"*which* {v2['n_missed_scams']} novel scam(s) it missed, requires "
"re-running inference with the LoRA adapter on every bench scenario "
"and dumping per-row scores. That is a single-GPU, ~30-minute job — "
"tracked as v3 work in [`docs/limitations.md`](limitations.md)."
)
L.append("")
# ---- Cross-comparison + v3 plan ---- #
L.append("## Comparison summary")
L.append("")
L.append("| Metric | Scripted (per-scenario) | v2 LoRA (aggregate) |")
L.append("|---|---|---|")
scripted_total = scripted["n_total"]
scripted_acc = sum(b["n_correct"] for b in scripted["by_category"].values()) / max(scripted_total, 1)
L.append(
f"| Accuracy / detection | "
f"{scripted_acc:.3f} (n={scripted_total}) | "
f"{v2['detection_rate']:.3f} det · {v2['fpr']:.3f} FPR (n={v2['n_total']}) |"
)
L.append(f"| Total errors | {total_fp + total_fn} | {v2['n_missed_scams'] + v2['n_false_positives']} |")
L.append("")
L.append("## v3 plan")
L.append("")
L.append("1. Re-run v2 inference on the bench with per-scenario logging (~30 min on 1× A100).")
L.append("2. Manually label each FP / FN root cause: scammer-template overlap, urgency-only signal, multi-language drift, etc.")
L.append("3. Add fix-targeted templates to `chakravyuh_env/benign_augmented_v2.json` to push n_benign past 150.")
L.append("4. Retrain v2.1 on the expanded corpus, re-eval, repeat audit.")
L.append("")
return "\n".join(L) + "\n"
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__.split("\n")[0])
parser.add_argument("--scripted-eval", type=Path, default=SCRIPTED)
parser.add_argument("--v2-eval", type=Path, default=EVAL_V2)
parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
args = parser.parse_args()
scripted = _scripted_errors(_load_json(args.scripted_eval))
v2 = _v2_aggregate_summary(_load_json(args.v2_eval))
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(render(scripted, v2))
total_fp = sum(b["n_fp"] for b in scripted["by_category"].values())
total_fn = sum(b["n_fn"] for b in scripted["by_category"].values())
print(f"error analysis: {args.output}")
print(
f" scripted: n={scripted['n_total']} · "
f"FPs={total_fp} · missed scams={total_fn}"
)
print(
f" v2 LoRA: n={v2['n_total']} · "
f"missed scams={v2['n_missed_scams']} · FPs={v2['n_false_positives']}"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())