| """Single-scenario before/after eval — scripted vs LoRA v2. | |
| Runs both analyzers on a single bench scenario and writes a JSON record that | |
| the README's "Before/after" section quotes. The scripted run requires no GPU. | |
| The LoRA run only fires when ``torch.cuda.is_available()`` (or ``--force-llm``) | |
| and the adapter is reachable; otherwise the v2 numbers are cross-referenced | |
| from ``logs/eval_v2.json`` (per-difficulty aggregate) and the JSON marks | |
| ``v2.measured_directly = false``. | |
| Operating Principle #1: never fabricate numbers. If we couldn't run v2 on this | |
| specific scenario, we say so and quote the already-measured aggregate it | |
| belongs to. | |
| Usage | |
| ----- | |
| python eval/single_scenario_eval.py \ | |
| --bench data/chakravyuh-bench-v0/scenarios.jsonl \ | |
| --scenario-id modec_106 \ | |
| --eval-aggregate logs/eval_v2.json \ | |
| --output docs/before_after_example.json | |
| """ | |
from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import Any


def _load_scenario(bench: Path, scenario_id: str) -> dict[str, Any]:
    """Return the first bench row matching ``scenario_id``."""
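    # Stream the JSONL bench line by line; a row may carry its identifier
    # under either "id" or "scenario_id", so both keys are accepted.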
    with bench.open() as f:
        for line in f:
            row = json.loads(line)
            if row.get("id") == scenario_id or row.get("scenario_id") == scenario_id:
                return row
    raise SystemExit(f"Scenario {scenario_id!r} not found in {bench}")


def _run_scripted(scenario: dict[str, Any]) -> dict[str, Any]:
    """Score the scenario with the scripted (rule-based) analyzer."""
    from chakravyuh_env.agents.analyzer import ScriptedAnalyzer
    from chakravyuh_env.schemas import ChatMessage, Observation
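
    # Rebuild the chat transcript from the stored attack sequence and package
    # it as a single end-of-conversation observation for the analyzer.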
    chat = [
        ChatMessage(sender=t["sender"], turn=t["turn"], text=t["text"])
        for t in scenario["attack_sequence"]
    ]
    obs = Observation(
        agent_role="analyzer",
        turn=max((m.turn for m in chat), default=1),
        chat_history=chat,
    )
    analyzer = ScriptedAnalyzer(flag_threshold=0.50, seed=42)
    score = analyzer.act(obs)
    return {
        "score": float(score.score),
        "signals": [s.value for s in score.signals],
        "explanation": score.explanation,
        "flagged": float(score.score) >= analyzer.flag_threshold,
        "threshold": analyzer.flag_threshold,
    }


def _try_run_v2(
    scenario: dict[str, Any], adapter: str, force: bool = False
) -> dict[str, Any] | None:
    """Run the v2 LoRA on this scenario, or return None if it's not feasible."""
    # Skip the heavy model load when no GPU is present, unless the caller
    # passes --force-llm (slow on CPU, may OOM).
    if not force and not _has_cuda():
        return None
    try:
        from chakravyuh_env import get_trained_analyzer

        analyzer = get_trained_analyzer(adapter=adapter)
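        # Score the attack's opening message: the same text the JSON record
        # quotes as "first_message".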
        first_msg = scenario["attack_sequence"][0]["text"]
        result = analyzer(first_msg)
        return {
            "score": float(result["score"]),
            "signals": list(result["signals"]),
            "explanation": result["explanation"],
            "flagged": float(result["score"]) >= 0.5,
            "threshold": 0.5,
            "measured_directly": True,
            "adapter": adapter,
        }
    except Exception as exc:  # pragma: no cover - diagnostic surface only
        return {"error": f"{type(exc).__name__}: {exc}", "measured_directly": False}


def _v2_aggregate_lookup(eval_path: Path, difficulty: str) -> dict[str, Any]:
    """Pull the v2 per-difficulty number from logs/eval_v2.json as a fallback."""
    data = json.loads(eval_path.read_text())
    block = data.get("lora_v2", {})
    per_diff = block.get("per_difficulty", {}).get(difficulty, {})
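    # Honest fallback per Operating Principle #1: quote the already-measured
    # split-level aggregate and label it as such, rather than inventing a
    # per-scenario score.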
    return {
        "measured_directly": False,
        "source": str(eval_path),
        "split": difficulty,
        "n_in_split": int(per_diff.get("n", 0)),
        "detection_rate_in_split": float(per_diff.get("detection_rate", 0.0)),
        "note": (
            "v2 was not re-run on this single scenario (no GPU available); "
            f"its detection rate on the '{difficulty}' split overall is "
            f"{float(per_diff.get('detection_rate', 0.0)) * 100:.1f}% "
            f"(n={int(per_diff.get('n', 0))}). This scenario is in that split."
        ),
    }


def _build_record(
    scenario: dict[str, Any],
    scripted: dict[str, Any],
    v2: dict[str, Any],
) -> dict[str, Any]:
    gt = scenario.get("ground_truth", {})
    return {
        "scenario": {
            "id": scenario.get("id") or scenario.get("scenario_id"),
            "category": gt.get("category"),
            "difficulty": gt.get("difficulty"),
            "is_scam": gt.get("is_scam"),
            "expected_signals": gt.get("signals", []),
            "first_message": scenario["attack_sequence"][0]["text"],
            "source": scenario.get("source", {}),
            "metadata": scenario.get("metadata", {}),
        },
        "scripted": scripted,
        "v2": v2,
        "delta": {
            "scripted_flagged": bool(scripted.get("flagged")),
| "v2_caught_in_split": bool( | |
| v2.get("flagged") | |
| if v2.get("measured_directly") | |
| else v2.get("detection_rate_in_split", 0.0) > 0.5 | |
| ), | |
| "interpretation": ( | |
| "Scripted analyzer missed this scam (score below threshold). " | |
| "v2 catches scams in this split at " | |
| f"{(v2.get('detection_rate_in_split') or (1.0 if v2.get('flagged') else 0.0)) * 100:.0f}% rate." | |
| ) | |
| if not scripted.get("flagged") | |
| else ( | |
| "Both scripted and v2 catch this scam, but scripted's score may " | |
| "be borderline; v2 holds at the bench-wide novel detection rate." | |
| ), | |
| }, | |
| } | |


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--bench", type=Path, default=Path("data/chakravyuh-bench-v0/scenarios.jsonl"))
    parser.add_argument("--scenario-id", default="modec_106")
    parser.add_argument("--adapter", default="ujjwalpardeshi/chakravyuh-analyzer-lora-v2")
    parser.add_argument("--eval-aggregate", type=Path, default=Path("logs/eval_v2.json"))
    parser.add_argument("--output", type=Path, default=Path("docs/before_after_example.json"))
    parser.add_argument(
        "--force-llm",
        action="store_true",
        help="Try to load the LoRA even without a GPU (slow, may OOM)",
    )
    args = parser.parse_args(argv)

    scenario = _load_scenario(args.bench, args.scenario_id)
    scripted = _run_scripted(scenario)

    v2: dict[str, Any] | None = None
    if args.force_llm or _has_cuda():
        v2 = _try_run_v2(scenario, args.adapter, force=args.force_llm)
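    # Fall back to the published aggregate whenever the direct run was skipped
    # or errored, so measured and quoted numbers are never silently mixed.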
    if v2 is None or not v2.get("measured_directly"):
        difficulty = scenario.get("ground_truth", {}).get("difficulty", "unknown")
        v2 = _v2_aggregate_lookup(args.eval_aggregate, difficulty)

    record = _build_record(scenario, scripted, v2)
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(record, indent=2))

    print(f"Wrote {args.output}")
    print(f"  scripted score: {scripted['score']:.3f} ({'FLAGGED' if scripted['flagged'] else 'missed'})")
    if v2.get("measured_directly"):
        print(f"  v2 score: {v2['score']:.3f} ({'FLAGGED' if v2['flagged'] else 'missed'})")
    else:
        print(
            f"  v2 (aggregate): detection={v2.get('detection_rate_in_split', 0):.3f} "
            f"on '{v2.get('split')}' split (n={v2.get('n_in_split', 0)})"
        )
    return 0


def _has_cuda() -> bool:
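    # torch may not be installed at all on scripted-only machines; treat any
    # import failure the same as "no GPU".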
    try:
        import torch  # type: ignore[import-not-found]

        return bool(torch.cuda.is_available())
    except Exception:
        return False


if __name__ == "__main__":
    sys.exit(main())