"""Single-scenario before/after eval — scripted vs LoRA v2.

Runs both analyzers on a single bench scenario and writes a JSON record that
the README's "Before/after" section quotes. The scripted run requires no GPU.
The LoRA run only fires when ``torch.cuda.is_available()`` (or ``--force-llm``)
and the adapter is reachable; otherwise the v2 numbers are cross-referenced
from ``logs/eval_v2.json`` (per-difficulty aggregate) and the JSON marks
``v2.measured_directly = false``.

Operating Principle #1: never fabricate numbers. If we couldn't run v2 on this
specific scenario, we say so and quote the already-measured aggregate it
belongs to.

Usage
-----
    python eval/single_scenario_eval.py \
        --bench data/chakravyuh-bench-v0/scenarios.jsonl \
        --scenario-id modec_106 \
        --eval-aggregate logs/eval_v2.json \
        --output docs/before_after_example.json
"""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import Any


def _load_scenario(bench: Path, scenario_id: str) -> dict[str, Any]:
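    """Return the matching JSONL row (keyed by ``id`` or ``scenario_id``) from the bench file."""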
    with bench.open() as f:
        for line in f:
            row = json.loads(line)
            if row.get("id") == scenario_id or row.get("scenario_id") == scenario_id:
                return row
    raise SystemExit(f"Scenario {scenario_id!r} not found in {bench}")


def _run_scripted(scenario: dict[str, Any]) -> dict[str, Any]:
    """Score the scenario with the scripted (rule-based) analyzer."""
    from chakravyuh_env.agents.analyzer import ScriptedAnalyzer
    from chakravyuh_env.schemas import ChatMessage, Observation

    chat = [
        ChatMessage(sender=t["sender"], turn=t["turn"], text=t["text"])
        for t in scenario["attack_sequence"]
    ]
    obs = Observation(
        agent_role="analyzer",
        turn=max((m.turn for m in chat), default=1),
        chat_history=chat,
    )
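    # Fixed threshold and seed so the scripted baseline is reproducible run to run.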
    analyzer = ScriptedAnalyzer(flag_threshold=0.50, seed=42)
    score = analyzer.act(obs)
    return {
        "score": float(score.score),
        "signals": [s.value for s in score.signals],
        "explanation": score.explanation,
        "flagged": float(score.score) >= analyzer.flag_threshold,
        "threshold": analyzer.flag_threshold,
    }


def _try_run_v2(
    scenario: dict[str, Any], adapter: str, *, force: bool = False
) -> dict[str, Any] | None:
    """Run the v2 LoRA on this scenario, or return None if it's not feasible.

    ``force=True`` (set by ``--force-llm``) skips the GPU gate and attempts the
    adapter on CPU.
    """
    try:
        import torch  # type: ignore[import-not-found]

        if not force and not torch.cuda.is_available():
            return None
    except Exception:
        return None

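    # Loading the adapter or running inference can still fail (missing weights, OOM,
    # network); capture the error as data instead of crashing the eval.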
    try:
        from chakravyuh_env import get_trained_analyzer

        analyzer = get_trained_analyzer(adapter=adapter)
        first_msg = scenario["attack_sequence"][0]["text"]
        result = analyzer(first_msg)
        return {
            "score": float(result["score"]),
            "signals": list(result["signals"]),
            "explanation": result["explanation"],
            "flagged": float(result["score"]) >= 0.5,
            "threshold": 0.5,
            "measured_directly": True,
            "adapter": adapter,
        }
    except Exception as exc:  # pragma: no cover — diagnostic surface only
        return {"error": f"{type(exc).__name__}: {exc}", "measured_directly": False}


def _v2_aggregate_lookup(eval_path: Path, difficulty: str) -> dict[str, Any]:
    """Pull the v2 per-difficulty number from logs/eval_v2.json as a fallback."""
    data = json.loads(eval_path.read_text())
    block = data.get("lora_v2", {})
    per_diff = block.get("per_difficulty", {}).get(difficulty, {})
    return {
        "measured_directly": False,
        "source": str(eval_path),
        "split": difficulty,
        "n_in_split": int(per_diff.get("n", 0)),
        "detection_rate_in_split": float(per_diff.get("detection_rate", 0.0)),
        "note": (
            f"v2 was not re-run on this single scenario (no GPU available); "
            f"its detection rate on the '{difficulty}' split overall is "
            f"{float(per_diff.get('detection_rate', 0.0)) * 100:.1f}% "
            f"(n={int(per_diff.get('n', 0))}). This scenario is in that split."
        ),
    }


def _build_record(
    scenario: dict[str, Any],
    scripted: dict[str, Any],
    v2: dict[str, Any],
) -> dict[str, Any]:
    gt = scenario.get("ground_truth", {})
    return {
        "scenario": {
            "id": scenario.get("id"),
            "category": gt.get("category"),
            "difficulty": gt.get("difficulty"),
            "is_scam": gt.get("is_scam"),
            "expected_signals": gt.get("signals", []),
            "first_message": scenario["attack_sequence"][0]["text"],
            "source": scenario.get("source", {}),
            "metadata": scenario.get("metadata", {}),
        },
        "scripted": scripted,
        "v2": v2,
        "delta": {
            "scripted_flagged": bool(scripted.get("flagged")),
            "v2_caught_in_split": bool(
                v2.get("flagged")
                if v2.get("measured_directly")
                else v2.get("detection_rate_in_split", 0.0) > 0.5
            ),
            "interpretation": (
                "Scripted analyzer missed this scam (score below threshold). "
                "v2 catches scams in this split at "
                f"{(v2.get('detection_rate_in_split') or (1.0 if v2.get('flagged') else 0.0)) * 100:.0f}% rate."
            )
            if not scripted.get("flagged")
            else (
                "Both scripted and v2 catch this scam, but scripted's score may "
                "be borderline; v2 holds at the bench-wide novel detection rate."
            ),
        },
    }


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--bench", type=Path, default=Path("data/chakravyuh-bench-v0/scenarios.jsonl"))
    parser.add_argument("--scenario-id", default="modec_106")
    parser.add_argument("--adapter", default="ujjwalpardeshi/chakravyuh-analyzer-lora-v2")
    parser.add_argument("--eval-aggregate", type=Path, default=Path("logs/eval_v2.json"))
    parser.add_argument("--output", type=Path, default=Path("docs/before_after_example.json"))
    parser.add_argument("--force-llm", action="store_true",
                        help="Try to load LoRA even without GPU (slow, may OOM)")
    args = parser.parse_args(argv)

    scenario = _load_scenario(args.bench, args.scenario_id)
    scripted = _run_scripted(scenario)

    v2: dict[str, Any] | None = None
    if args.force_llm or _has_cuda():
        v2 = _try_run_v2(scenario, args.adapter, force=args.force_llm)
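    # Operating Principle #1: if the LoRA did not actually run on this scenario,
    # quote the already-measured per-difficulty aggregate instead of inventing a score.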
    if v2 is None or not v2.get("measured_directly"):
        difficulty = scenario.get("ground_truth", {}).get("difficulty", "unknown")
        v2 = _v2_aggregate_lookup(args.eval_aggregate, difficulty)

    record = _build_record(scenario, scripted, v2)
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(record, indent=2))
    print(f"Wrote {args.output}")
    print(f"  scripted score:   {scripted['score']:.3f} ({'FLAGGED' if scripted['flagged'] else 'missed'})")
    if v2.get("measured_directly"):
        print(f"  v2 score:         {v2['score']:.3f} ({'FLAGGED' if v2['flagged'] else 'missed'})")
    else:
        print(f"  v2 (aggregate):   detection={v2.get('detection_rate_in_split', 0):.3f} on '{v2.get('split')}' split (n={v2.get('n_in_split', 0)})")
    return 0


def _has_cuda() -> bool:
    try:
        import torch  # type: ignore[import-not-found]

        return bool(torch.cuda.is_available())
    except Exception:
        return False


if __name__ == "__main__":
    sys.exit(main())