"""B.2 — Known (pre-2024) vs Novel (post-2024) detection split. Re-buckets the bench by year and reports detection / FPR for the scripted baseline on each bucket. The v2 LoRA detection rate is **not** re-run here (that requires GPU); instead the v2 number is sourced from ``logs/eval_v2.json``'s `per_difficulty.novel` field as a cross-reference, since the bench's ``difficulty == "novel"`` and ``source.category == "novel_post_2024"`` buckets are by construction the same 34 scenarios. Bucket rule (operate-on-source-fields, no model): - **novel** if ``source.category == "novel_post_2024"`` OR the leading year of ``source.date_range`` is ≥ 2024. - **known** otherwise (scams with year < 2024). - Benign scenarios are kept separate to compute FPR. Output: a JSON record with both bucket sizes, scripted detection per bucket, and the v2 cross-reference. Pinned by tests/test_known_vs_novel_split.py. Usage ----- python eval/known_vs_novel_split.py \ --bench data/chakravyuh-bench-v0/scenarios.jsonl \ --eval-v2 logs/eval_v2.json \ --output logs/eval_v2_known_novel.json """ from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any from chakravyuh_env.agents.analyzer import ScriptedAnalyzer from chakravyuh_env.schemas import ChatMessage, Observation def _bucket_for(scenario: dict[str, Any]) -> str: """Return ``"novel"``, ``"known"``, or ``"benign"`` for a scenario.""" gt = scenario.get("ground_truth", {}) if not gt.get("is_scam"): return "benign" src = scenario.get("source", {}) if src.get("category") == "novel_post_2024": return "novel" year_str = (src.get("date_range") or "").split("-", 1)[0] try: year = int(year_str) except ValueError: year = 0 return "novel" if year >= 2024 else "known" def _normalize_sender(raw: str) -> str: """ChatMessage only accepts 'scammer' / 'victim'. The bench has some entries with `bank_official` (4 cases) — they are scam-side messages impersonating a bank. Normalise them to ``scammer`` so the analyzer scores them on the chat content as it would in deployment. ``victim`` passes through unchanged. 
""" return "victim" if raw == "victim" else "scammer" def _score_with_scripted(analyzer: ScriptedAnalyzer, scenario: dict[str, Any]) -> float: chat = [ ChatMessage( sender=_normalize_sender(t["sender"]), turn=t["turn"], text=t["text"], ) for t in scenario["attack_sequence"] ] obs = Observation( agent_role="analyzer", turn=max((m.turn for m in chat), default=1), chat_history=chat, ) return float(analyzer.act(obs).score) def compute_split( bench_path: Path, eval_v2_path: Path | None, threshold: float = 0.50, ) -> dict[str, Any]: with bench_path.open() as f: rows = [json.loads(line) for line in f if line.strip()] analyzer = ScriptedAnalyzer(flag_threshold=threshold, seed=42) buckets: dict[str, list[float]] = {"known": [], "novel": [], "benign": []} for row in rows: bucket = _bucket_for(row) score = _score_with_scripted(analyzer, row) buckets[bucket].append(score) def _detection(scores: list[float]) -> float: if not scores: return 0.0 return sum(1 for s in scores if s >= threshold) / len(scores) def _fpr(scores: list[float]) -> float: if not scores: return 0.0 return sum(1 for s in scores if s >= threshold) / len(scores) out: dict[str, Any] = { "_meta": { "bench": str(bench_path), "threshold": threshold, "method": "scripted-rule baseline; v2 cross-reference from logs/eval_v2.json", "rule": "novel = source.category=='novel_post_2024' OR source.date_range year >= 2024", }, "scripted": { "known": { "n": len(buckets["known"]), "detection_rate": _detection(buckets["known"]), }, "novel": { "n": len(buckets["novel"]), "detection_rate": _detection(buckets["novel"]), }, "benign": { "n": len(buckets["benign"]), "fpr": _fpr(buckets["benign"]), }, }, } if eval_v2_path and eval_v2_path.exists(): eval_v2 = json.loads(eval_v2_path.read_text()) per_diff = eval_v2.get("lora_v2", {}).get("per_difficulty", {}) novel_block = per_diff.get("novel", {}) # All non-novel difficulties together stand in for "known". 
        known_n = sum(
            int(per_diff[k]["n"]) for k in ("easy", "medium", "hard") if k in per_diff
        )
        known_caught = sum(
            int(per_diff[k]["n"]) * float(per_diff[k]["detection_rate"])
            for k in ("easy", "medium", "hard")
            if k in per_diff
        )
        out["v2_crossref"] = {
            "source": str(eval_v2_path),
            "novel": {
                "n": int(novel_block.get("n", 0)),
                "detection_rate": float(novel_block.get("detection_rate", 0.0)),
            },
            "known": {
                "n": known_n,
                "detection_rate": known_caught / known_n if known_n else 0.0,
                "note": "easy + medium + hard buckets aggregated as a known-scam proxy",
            },
        }

    sk = out["scripted"]["known"]["detection_rate"]
    sn = out["scripted"]["novel"]["detection_rate"]
    out["headline_gap_pp"] = round((sk - sn) * 100, 1)
    return out


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--bench", type=Path, default=Path("data/chakravyuh-bench-v0/scenarios.jsonl"))
    parser.add_argument("--eval-v2", type=Path, default=Path("logs/eval_v2.json"))
    parser.add_argument("--threshold", type=float, default=0.50)
    parser.add_argument("--output", type=Path, default=Path("logs/eval_v2_known_novel.json"))
    args = parser.parse_args(argv)

    record = compute_split(args.bench, args.eval_v2, args.threshold)
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(record, indent=2))

    s = record["scripted"]
    print(f"Wrote {args.output}")
    print(f"  scripted known : {s['known']['detection_rate']:.3f} (n={s['known']['n']})")
    print(f"  scripted novel : {s['novel']['detection_rate']:.3f} (n={s['novel']['n']})")
    print(f"  scripted benign : FPR={s['benign']['fpr']:.3f} (n={s['benign']['n']})")
    print(f"  scripted gap : {record['headline_gap_pp']} pp (known − novel)")
    if "v2_crossref" in record:
        v = record["v2_crossref"]
        print(f"  v2 crossref known: {v['known']['detection_rate']:.3f} (n={v['known']['n']})")
        print(f"  v2 crossref novel: {v['novel']['detection_rate']:.3f} (n={v['novel']['n']})")
    return 0


if __name__ == "__main__":
    sys.exit(main())