"""Live-backed benchmark cases for vulnerability triage.""" from __future__ import annotations from dataclasses import dataclass, field from functools import lru_cache import json from pathlib import Path import random from typing import Dict, List, Optional import requests OSV_VULN_URL = "https://api.osv.dev/v1/vulns/{osv_id}" NVD_CVE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0" EPSS_URL = "https://api.first.org/data/v1/epss" SNAPSHOT_DIR = Path(__file__).resolve().parent.parent / "data" / "snapshots" @dataclass(frozen=True) class GroundTruth: validity: str affected_package: str affected_versions: str severity: str exploitability: str next_action: str missing_information: List[str] = field(default_factory=list) supporting_evidence_ids: List[str] = field(default_factory=list) @dataclass(frozen=True) class CaseDefinition: task_id: str difficulty: str title: str objective: str report_summary: str max_steps: int evidence: List[Dict[str, str]] truth: GroundTruth @dataclass(frozen=True) class RuntimeCaseSeed: task_id: str difficulty: str title: str objective: str max_steps: int osv_id: str next_action: str fallback_snapshot: Dict[str, object] missing_information: List[str] = field(default_factory=list) # When set, completely replaces the auto-computed ground truth. # Use this to encode scenarios that require non-obvious reasoning # (e.g. next_action=request_info when no patch exists). truth_override: Optional[Dict[str, object]] = None # Extra evidence items injected after the auto-built ones. # Use this to add contradictory or ambiguous signals. extra_evidence: List[Dict[str, str]] = field(default_factory=list) def _load_snapshot_file(osv_id: str) -> Optional[Dict[str, object]]: path = SNAPSHOT_DIR / f"{osv_id}.json" if not path.exists(): return None return json.loads(path.read_text()) def _normalize_text(value: Optional[str]) -> str: return " ".join((value or "").strip().split()) def _shorten(text: str, limit: int = 280) -> str: text = _normalize_text(text) if len(text) <= limit: return text return text[: limit - 3].rstrip() + "..." def _severity_band(snapshot: Dict[str, object]) -> str: severity = _normalize_text(str(snapshot.get("severity", ""))).lower() mapping = { "none": "low", "low": "low", "medium": "medium", "moderate": "medium", "high": "high", "critical": "critical", } return mapping.get(severity, "medium") def _exploitability_band(snapshot: Dict[str, object]) -> str: percentile = float(snapshot.get("epss_percentile", 0.0) or 0.0) if percentile >= 0.9: return "high" if percentile >= 0.6: return "medium" return "low" def _range_string(ranges: List[Dict[str, object]]) -> str: normalized: List[str] = [] for range_item in ranges: if range_item.get("type") != "ECOSYSTEM": continue introduced: Optional[str] = None fixed: Optional[str] = None last: Optional[str] = None for event in range_item.get("events", []): if "introduced" in event: introduced = str(event["introduced"]) if "last_affected" in event: last = str(event["last_affected"]) if "fixed" in event: fixed = str(event["fixed"]) if introduced in (None, "0") and fixed: normalized.append(f"<{fixed}") elif introduced and fixed: normalized.append(f">={introduced},<{fixed}") elif introduced and last: normalized.append(f">={introduced},<={last}") elif introduced: normalized.append(f">={introduced}") return " ; ".join(normalized) or "unknown" def _all_affected_versions(snapshot: Dict[str, object]) -> str: """Collect version ranges from every affected block for the primary package. OSV advisories sometimes split a single package across multiple affected blocks (one per release branch). Joining them all gives a complete and accurate truth value instead of just the first branch. """ package_name = _extract_package(snapshot) all_ranges: List[str] = [] for block in snapshot.get("affected", []): pkg = block.get("package", {}) if str(pkg.get("name", "")) == package_name: rs = _range_string(block.get("ranges", [])) if rs and rs != "unknown": all_ranges.append(rs) return " ; ".join(all_ranges) if all_ranges else "unknown" def _extract_cve_id(snapshot: Dict[str, object]) -> Optional[str]: for alias in snapshot.get("aliases", []): alias_text = str(alias) if alias_text.startswith("CVE-"): return alias_text return None def _extract_package(snapshot: Dict[str, object]) -> str: affected = snapshot.get("affected", []) if not affected: return "" package = affected[0].get("package", {}) return str(package.get("name", "")) def _build_report_summary(seed: RuntimeCaseSeed, snapshot: Dict[str, object]) -> str: package = _extract_package(snapshot) versions = _range_string(snapshot.get("affected", [{}])[0].get("ranges", [])) if snapshot.get("affected") else "unknown" details = _shorten(str(snapshot.get("details") or snapshot.get("summary") or "")) return ( f"{package} vulnerability triage case sourced from {seed.osv_id}. " f"Affected versions: {versions}. {details}" ) def _build_evidence(seed: RuntimeCaseSeed, snapshot: Dict[str, object]) -> List[Dict[str, str]]: cve_id = _extract_cve_id(snapshot) or "unknown" package = _extract_package(snapshot) # Use all affected blocks so multi-branch advisories are fully represented affected_versions = _all_affected_versions(snapshot) fix_refs = [ ref["url"] for ref in snapshot.get("references", []) if ref.get("type") in {"FIX", "ADVISORY", "WEB"} ][:3] evidence = [ { "evidence_id": "osv_advisory", "title": "OSV advisory", "kind": "advisory", "summary": _shorten( str(snapshot.get("summary") or snapshot.get("details") or "") ), }, { "evidence_id": "affected_versions", "title": "Affected versions", "kind": "versions", "summary": ( f"OSV lists {package} as affected in these ranges: {affected_versions}." ), }, { "evidence_id": "nvd_assessment", "title": "NVD assessment", "kind": "severity", "summary": ( f"NVD CVSS Vector: {snapshot.get('cvss_vector', 'Not Available')} \n" f"{_shorten(str(snapshot.get('nvd_description', '')), 220)}" ), }, { "evidence_id": "epss_signal", "title": "EPSS signal", "kind": "exploitability", "summary": ( f"EPSS score: {snapshot.get('epss_score', 0.0):.6f}, " f"percentile: {snapshot.get('epss_percentile', 0.0):.3f}" ), }, ] if fix_refs: evidence.append( { "evidence_id": "fix_reference", "title": "Fix and advisory references", "kind": "reference", "summary": "Relevant upstream references: " + ", ".join(fix_refs), } ) # Append any task-specific extra evidence items (e.g. contradictory signals) evidence.extend(seed.extra_evidence) return evidence def _build_truth(seed: RuntimeCaseSeed, snapshot: Dict[str, object]) -> GroundTruth: # truth_override lets a seed encode non-obvious ground truth # (e.g. next_action=request_info when no patch exists yet) if seed.truth_override is not None: override = dict(seed.truth_override) # Always merge seed-level missing_information into the override so the # grader's 10% weight stays meaningful if "missing_information" not in override: override["missing_information"] = list(seed.missing_information) return GroundTruth(**override) return GroundTruth( validity="valid", affected_package=_extract_package(snapshot), # Collect ranges from ALL affected blocks for completeness affected_versions=_all_affected_versions(snapshot), severity=_severity_band(snapshot), exploitability=_exploitability_band(snapshot), next_action=seed.next_action, # Per-task missing information declared on the seed missing_information=list(seed.missing_information), supporting_evidence_ids=[ "osv_advisory", "affected_versions", "nvd_assessment", "epss_signal", ], ) def _build_case(seed: RuntimeCaseSeed, snapshot: Dict[str, object]) -> CaseDefinition: return CaseDefinition( task_id=seed.task_id, difficulty=seed.difficulty, title=seed.title, objective=seed.objective, report_summary=_build_report_summary(seed, snapshot), max_steps=seed.max_steps, evidence=_build_evidence(seed, snapshot), truth=_build_truth(seed, snapshot), ) def _fetch_json(url: str, *, params: Optional[Dict[str, str]] = None) -> Dict[str, object]: response = requests.get(url, params=params, timeout=12) response.raise_for_status() return response.json() def _fetch_live_snapshot(seed: RuntimeCaseSeed) -> Dict[str, object]: osv = _fetch_json(OSV_VULN_URL.format(osv_id=seed.osv_id)) cve_id = _extract_cve_id(osv) snapshot: Dict[str, object] = { "id": osv.get("id"), "summary": osv.get("summary"), "details": osv.get("details"), "aliases": osv.get("aliases", []), "references": osv.get("references", []), "affected": osv.get("affected", []), } if cve_id: nvd = _fetch_json(NVD_CVE_URL, params={"cveId": cve_id}) vulnerability = (nvd.get("vulnerabilities") or [{}])[0].get("cve", {}) metrics = vulnerability.get("metrics", {}) severity: Optional[str] = None for key in ("cvssMetricV40", "cvssMetricV31", "cvssMetricV30", "cvssMetricV2"): if key in metrics: item = metrics[key][0] severity = ( item.get("cvssData", {}).get("baseSeverity") or item.get("baseSeverity") ) if severity: break descriptions = vulnerability.get("descriptions", []) nvd_description = next( ( desc.get("value", "") for desc in descriptions if desc.get("lang") == "en" ), descriptions[0].get("value", "") if descriptions else "", ) snapshot["severity"] = severity or snapshot.get("severity", "medium") snapshot["nvd_description"] = nvd_description epss = _fetch_json(EPSS_URL, params={"cve": cve_id}) epss_item = (epss.get("data") or [{}])[0] snapshot["epss_score"] = float(epss_item.get("epss", 0.0) or 0.0) snapshot["epss_percentile"] = float( epss_item.get("percentile", 0.0) or 0.0 ) else: snapshot["severity"] = "medium" snapshot["nvd_description"] = "" snapshot["epss_score"] = 0.0 snapshot["epss_percentile"] = 0.0 return snapshot SEEDS: Dict[str, RuntimeCaseSeed] = { # ------------------------------------------------------------------ # EASY — Direct evidence reading, tight step budget # # Agent skill tested: can you read a clear advisory and map it to # the right fields quickly? Only two evidence items are needed # (the OSV advisory + affected versions). The NVD and EPSS evidence # exist but provide no extra signal — a capable agent doesn't waste # steps on them. Max 10 steps forces efficiency. # ------------------------------------------------------------------ "task_easy_guarddog": RuntimeCaseSeed( task_id="task_easy_guarddog", difficulty="easy", title="GuardDog Path Traversal", objective=( "Validate the advisory, identify the affected package and version range, " "estimate severity and exploitability, and decide the correct maintainer action. " "All necessary information is available in the OSV advisory and version evidence." ), max_steps=10, osv_id="PYSEC-2022-42994", next_action="patch", truth_override={ "validity": "valid", "affected_package": "guarddog", "affected_versions": "<0.1.5", "severity": "medium", "exploitability": "low", "next_action": "patch", "missing_information": [], # Only 2 evidence items needed — the advisory is self-contained "supporting_evidence_ids": ["osv_advisory", "affected_versions"], }, extra_evidence=[ { "evidence_id": "decoy_threat_intel", "title": "Threat Intel: GuardCat", "kind": "exploitability", "summary": "Active exploitation in the wild observed for the 'GuardCat' node.js package. Do not confuse with python guarddog.", }, { "evidence_id": "decoy_nvd_unrelated", "title": "NVD: CVE-2021-99999", "kind": "severity", "summary": "CRITICAL 9.8 vulnerability in GuardDog-Enterprise. This is a licensed product and does not apply to the open source guarddog package.", } ], fallback_snapshot={ "id": "PYSEC-2022-42994", "details": ( "GuardDog is a CLI tool to identify malicious PyPI packages. Versions prior " "to 0.1.5 are vulnerable to Relative Path Traversal when scanning a " "specially-crafted local PyPI package. This issue is patched in version 0.1.5. " "This is explicitly rated as a Medium severity issue with inherently Low exploitability." ), "aliases": ["CVE-2022-23531", "GHSA-rp2v-v467-q9vq"], "references": [ {"type": "WEB", "url": "https://github.com/DataDog/guarddog/releases/tag/v0.1.5"}, {"type": "ADVISORY", "url": "https://github.com/DataDog/guarddog/security/advisories/GHSA-rp2v-v467-q9vq"}, {"type": "FIX", "url": "https://github.com/DataDog/guarddog/pull/89/commits/a56aff58264cb6b7855d71b00dc10c39a5dbd306"}, ], "affected": [ { "package": {"name": "guarddog", "ecosystem": "PyPI"}, "ranges": [ { "type": "ECOSYSTEM", "events": [{"introduced": "0"}, {"fixed": "0.1.5"}], } ], } ], "cvss_vector": "CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:N/A:N", "nvd_description": ( "GuardDog versions prior to 0.1.5 are vulnerable to relative path traversal " "when scanning a specially-crafted local PyPI package." ), "epss_score": 0.00152, "epss_percentile": 0.36042, }, ), # ------------------------------------------------------------------ # MEDIUM — Conflicting signal resolution, multi-branch versions # # Agent skill tested: can you weigh contradictory evidence? The # EPSS percentile (0.43) maps to "low" exploitability by the formula, # but an injected threat-intel evidence item reports real-world active # probing. The correct answer is "medium" exploitability because # independent field evidence overrides a lagging statistical signal. # All four auto-built evidence items PLUS the threat_intel_signal are # needed — a model that submits after reading only EPSS will be wrong. # ------------------------------------------------------------------ "task_medium_invenio": RuntimeCaseSeed( task_id="task_medium_invenio", difficulty="medium", title="Invenio Multi-Branch XSS", objective=( "Resolve affected versions across multiple maintained release lines, weigh " "a conflicting exploitability signal, and choose the correct advisory workflow. " "The EPSS percentile and the threat-intelligence report disagree — inspect both " "before deciding on exploitability." ), max_steps=14, osv_id="GHSA-vxh3-mvv7-265j", next_action="publish_advisory", truth_override={ "validity": "valid", "affected_package": "invenio-records", "affected_versions": "<1.0.2 ; >=1.1.0,<1.1.1 ; >=1.2.0,<1.2.2", "severity": "medium", # KEY: EPSS alone says "low" (0.43 percentile) but the injected # threat-intel evidence documents active real-world probing. # A model that reads only EPSS will score 0 on exploitability. "exploitability": "medium", "next_action": "publish_advisory", "missing_information": [], "supporting_evidence_ids": [ "osv_advisory", "affected_versions", "nvd_assessment", "threat_intel_signal", "github_commit_diff", ], }, extra_evidence=[ { "evidence_id": "github_commit_diff", "title": "GitHub Commit a93b12f", "kind": "reference", "summary": ( "```diff\n" "@@ -101,3 +101,3 @@\n" "- html = \"