Spaces:
Running
Running
| """Mode C evaluation runner: scores any analyzer against chakravyuh-bench-v0. | |
| Usage: | |
| python -m eval.mode_c_real_cases --analyzer scripted | |
| python -m eval.mode_c_real_cases --analyzer scripted --bootstrap 1000 | |
| python -m eval.mode_c_real_cases --analyzer scripted --output logs/mode_c_results.json | |
| The runner is analyzer-agnostic — it calls `AnalyzerProtocol.score_text(text) -> float` | |
| so any scripted, LLM zero-shot, or LoRA-trained analyzer plugs in via the same | |
| interface. Day-2 LLM analyzer just implements that one method. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import logging | |
| import sys | |
| from collections import defaultdict | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Protocol | |
| from chakravyuh_env.agents.analyzer import ScriptedAnalyzer | |
| from chakravyuh_env.schemas import AnalyzerScore, ChatMessage, Observation | |
| from eval.bootstrap_ci import bootstrap_ci, cohens_d, permutation_test | |
# Named logger shared by everything in this module.
logger: logging.Logger = logging.getLogger("chakravyuh.eval.mode_c")

# Benchmark file used when no --dataset option is given; the path is relative,
# so it is resolved against the current working directory when opened.
DEFAULT_DATASET: Path = Path("data/chakravyuh-bench-v0/scenarios.jsonl")
class AnalyzerProtocol(Protocol):
    """Structural interface every analyzer must satisfy for Mode C eval.

    Any object with a compatible ``score_text`` method conforms; no
    inheritance from this class is required.
    """

    def score_text(self, text: str) -> float:
        """Return the analyzer's score for ``text``."""
        ...
class ScriptedAnalyzerAdapter:
    """Expose ``ScriptedAnalyzer.act()`` through the ``score_text`` interface."""

    def __init__(self) -> None:
        self.inner = ScriptedAnalyzer()

    def score_text(self, text: str) -> float:
        """Score ``text`` by presenting it as a single scammer message.

        Returns the ``score`` attribute of the wrapped analyzer's action when
        that action is an ``AnalyzerScore``; any other action yields ``0.0``.
        """
        # The wrapped analyzer consumes observations, so wrap the raw text in
        # a synthetic one-turn chat history.
        message = ChatMessage(sender="scammer", turn=1, text=text)
        observation = Observation(
            agent_role="analyzer",
            turn=1,
            chat_history=[message],
        )
        outcome = self.inner.act(observation)
        if not isinstance(outcome, AnalyzerScore):
            return 0.0
        return outcome.score
@dataclass
class EvalResult:
    """Outcome of scoring a single benchmark scenario.

    Declared as a dataclass so it can be built with keyword arguments (as
    ``run_eval`` does) and serialised through its instance ``__dict__`` (as
    ``main`` does); a bare class with only annotations accepts no arguments.
    """

    scenario_id: str  # the scenario's "id" field
    is_scam_truth: bool  # ground-truth label
    predicted_score: float  # raw value returned by the analyzer
    predicted_flag: bool  # True when predicted_score >= threshold
    correct: bool  # True when predicted_flag matches is_scam_truth
    category: str  # ground-truth category
    difficulty: str  # ground-truth difficulty
@dataclass
class AggregateMetrics:
    """Summary classification metrics over a collection of eval results.

    Declared as a dataclass so it can be built positionally or by keyword (as
    ``aggregate`` does) and serialised through its instance ``__dict__``; a
    bare class with only annotations accepts no constructor arguments.
    """

    n: int  # number of results aggregated
    detection_rate: float  # scam-only: recall
    false_positive_rate: float  # benign-only: FP / total_benign
    precision: float  # TP / (TP + FP)
    recall: float  # same value as detection_rate
    f1: float  # harmonic mean of precision and recall
    accuracy: float  # (TP + TN) / n
def load_dataset(path: Path) -> list[dict]:
    """Read the benchmark JSONL file at ``path``, one scenario per line.

    The bench file (175 scenarios) is the single source of truth. Lines that
    are empty or contain only whitespace are skipped; every other line is
    parsed with ``json.loads``.

    Note: ``logs/eval_v2.json`` reports n=174 (n_benign=30), one benign less
    than the 31 benigns in scenarios.jsonl. That eval was run during the v2
    LoRA inference batch (Apr 21) where one benign scenario produced a
    malformed model output and was excluded from the aggregate. The
    asymmetric-improvement direction is unaffected (FPR delta dominates the
    one-row difference). See docs/limitations.md → Bench eval n=174.
    """
    scenarios: list[dict] = []
    with open(path, encoding="utf-8") as handle:
        for raw_line in handle:
            if not raw_line.strip():
                continue
            scenarios.append(json.loads(raw_line))
    return scenarios
def run_eval(
    analyzer: AnalyzerProtocol,
    dataset: list[dict],
    threshold: float = 0.5,
) -> list[EvalResult]:
    """Score every scenario in ``dataset`` and return one result per scenario.

    For each scenario the texts of all ``attack_sequence`` messages whose
    sender is ``"scammer"`` are joined with single spaces and passed to
    ``analyzer.score_text``. A scenario is flagged when the returned score is
    greater than or equal to ``threshold``.
    """

    def _score_scenario(scenario: dict) -> EvalResult:
        # Only the scammer's side of the conversation is scored.
        scammer_lines = [
            turn["text"]
            for turn in scenario["attack_sequence"]
            if turn["sender"] == "scammer"
        ]
        score = analyzer.score_text(" ".join(scammer_lines))
        predicted = score >= threshold
        truth = scenario["ground_truth"]["is_scam"]
        return EvalResult(
            scenario_id=scenario["id"],
            is_scam_truth=truth,
            predicted_score=score,
            predicted_flag=predicted,
            correct=(predicted == truth),
            category=scenario["ground_truth"]["category"],
            difficulty=scenario["ground_truth"]["difficulty"],
        )

    return [_score_scenario(entry) for entry in dataset]
def aggregate(results: list[EvalResult]) -> AggregateMetrics:
    """Reduce ``results`` to confusion-matrix based summary metrics.

    An empty input yields all-zero metrics. Any ratio whose denominator is
    zero (no scams, no benigns, nothing flagged, or precision + recall == 0)
    is reported as ``0.0`` rather than raising.
    """
    total = len(results)
    if not total:
        return AggregateMetrics(0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

    # Tally the four confusion-matrix cells in a single pass.
    tp = fn = fp = tn = 0
    for item in results:
        if item.is_scam_truth:
            if item.predicted_flag:
                tp += 1
            else:
                fn += 1
        elif item.predicted_flag:
            fp += 1
        else:
            tn += 1

    scams = tp + fn
    benigns = fp + tn
    flagged = tp + fp

    detection_rate = tp / scams if scams else 0.0
    false_positive_rate = fp / benigns if benigns else 0.0
    precision = tp / flagged if flagged else 0.0
    recall = detection_rate
    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0

    return AggregateMetrics(
        n=total,
        detection_rate=detection_rate,
        false_positive_rate=false_positive_rate,
        precision=precision,
        recall=recall,
        f1=f1,
        accuracy=(tp + tn) / total,
    )
def per_category_breakdown(results: list[EvalResult]) -> dict[str, AggregateMetrics]:
    """Group ``results`` by ``category`` and aggregate each group.

    The returned dict is keyed by category in sorted order.
    """
    grouped: dict[str, list[EvalResult]] = {}
    for item in results:
        grouped.setdefault(item.category, []).append(item)
    return {name: aggregate(grouped[name]) for name in sorted(grouped)}
def per_difficulty_breakdown(results: list[EvalResult]) -> dict[str, AggregateMetrics]:
    """Group ``results`` by ``difficulty`` and aggregate each group.

    The returned dict is keyed by difficulty in sorted order.
    """
    grouped: dict[str, list[EvalResult]] = {}
    for item in results:
        grouped.setdefault(item.difficulty, []).append(item)
    return {level: aggregate(grouped[level]) for level in sorted(grouped)}
def format_report(
    analyzer_name: str,
    metrics: AggregateMetrics,
    by_cat: dict[str, AggregateMetrics],
    by_diff: dict[str, AggregateMetrics],
    ci_bounds: tuple[float, float] | None = None,
) -> str:
    """Render the overall and per-group metrics as a plain-text report.

    The report has a headline section, an optional detection-rate confidence
    interval line (only when ``ci_bounds`` is given), then per-category and
    per-difficulty sections, in the iteration order of the supplied dicts.
    Lines are joined with newlines and there is no trailing newline.
    """
    report = [
        f"=== Mode C Results: {analyzer_name} ===",
        f"N = {metrics.n}",
        f"Detection rate (scam recall): {metrics.detection_rate:.1%}",
        f"False positive rate: {metrics.false_positive_rate:.1%}",
        f"Precision: {metrics.precision:.1%}",
        f"F1 score: {metrics.f1:.3f}",
        f"Accuracy: {metrics.accuracy:.1%}",
    ]
    if ci_bounds is not None:
        lower = ci_bounds[0]
        upper = ci_bounds[1]
        report.append(f"Detection 95% CI: [{lower:.1%}, {upper:.1%}]")

    report += ["", "Per-category:"]
    for cat, stats in by_cat.items():
        # Categories where the false-positive rate is the figure of interest.
        if cat in ("benign", "borderline"):
            marker = "[FP]"
        else:
            marker = " "
        report.append(
            f" {marker} {cat}: det={stats.detection_rate:.1%}, fpr={stats.false_positive_rate:.1%}, n={stats.n}"
        )

    report += ["", "Per-difficulty:"]
    report.extend(
        f" {level}: det={stats.detection_rate:.1%}, n={stats.n}"
        for level, stats in by_diff.items()
    )
    return "\n".join(report)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point for the Mode C evaluator.

    Parses ``argv`` (argparse falls back to ``sys.argv`` when it is ``None``),
    loads the dataset, scores it with the selected analyzer, prints the text
    report, and optionally writes a JSON payload to ``--output``.

    Returns:
        ``0`` on success.

    Raises:
        NotImplementedError: when ``--analyzer llm`` is selected.
    """
    cli = argparse.ArgumentParser(description="Mode C real-cases evaluator")
    cli.add_argument(
        "--analyzer",
        type=str,
        default="scripted",
        choices=["scripted", "llm"],
        help="Which analyzer to evaluate",
    )
    cli.add_argument("--dataset", type=Path, default=DEFAULT_DATASET)
    cli.add_argument("--threshold", type=float, default=0.5)
    cli.add_argument("--bootstrap", type=int, default=0, help="Bootstrap resamples (0 = off)")
    cli.add_argument("--output", type=Path, default=None)
    options = cli.parse_args(argv)

    logging.basicConfig(level=logging.INFO, format="%(message)s")

    scenarios = load_dataset(options.dataset)
    logger.info("Loaded %d scenarios from %s", len(scenarios), options.dataset)

    if options.analyzer != "scripted":
        raise NotImplementedError("LLM analyzer will be wired on Day 2")
    analyzer: AnalyzerProtocol = ScriptedAnalyzerAdapter()
    name = "ScriptedAnalyzer (rule-based baseline)"

    results = run_eval(analyzer, scenarios, threshold=options.threshold)
    overall = aggregate(results)
    by_category = per_category_breakdown(results)
    by_difficulty = per_difficulty_breakdown(results)

    # Optional bootstrap interval on the detection rate, over scam rows only.
    interval: tuple[float, float] | None = None
    if options.bootstrap > 0:
        scam_hits = [
            1.0 if item.predicted_flag else 0.0
            for item in results
            if item.is_scam_truth
        ]
        _, lower, upper = bootstrap_ci(scam_hits, n_resamples=options.bootstrap, seed=42)
        interval = (lower, upper)

    print(format_report(name, overall, by_category, by_difficulty, interval))

    destination = options.output
    if destination is None:
        return 0

    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "analyzer": name,
        "n": overall.n,
        "detection_rate": overall.detection_rate,
        "false_positive_rate": overall.false_positive_rate,
        "precision": overall.precision,
        "recall": overall.recall,
        "f1": overall.f1,
        "accuracy": overall.accuracy,
        "detection_ci_95": list(interval) if interval else None,
        "per_category": {key: vars(value) for key, value in by_category.items()},
        "per_difficulty": {key: vars(value) for key, value in by_difficulty.items()},
        "scenarios": [vars(item) for item in results],
    }
    destination.write_text(json.dumps(payload, indent=2))
    logger.info("Wrote %s", destination)
    return 0
# Public names of this module, including the statistics helpers imported
# from eval.bootstrap_ci, re-exported for the convenience of stats scripts.
__all__ = [
    "AggregateMetrics",
    "EvalResult",
    "aggregate",
    "bootstrap_ci",
    "cohens_d",
    "load_dataset",
    "per_category_breakdown",
    "per_difficulty_breakdown",
    "permutation_test",
    "run_eval",
]

# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    sys.exit(main())