File size: 4,924 Bytes
d63a1ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Build a provider-backed fallback snapshot cache."""

from __future__ import annotations

import json
from pathlib import Path
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List

import requests

# ROOT is the parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parent.parent
# Put ROOT on sys.path before the `server` import below so that import can be
# resolved (presumably for when this file is executed directly -- confirm).
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from server.cases import EPSS_URL, NVD_CVE_URL, OSV_VULN_URL, _extract_cve_id

# Directory that receives one JSON file per saved advisory snapshot.
SNAPSHOT_DIR = ROOT / "data" / "snapshots"
# JSON index listing the snapshots written by main().
INDEX_PATH = ROOT / "data" / "snapshot_index.json"
# GitHub git-trees endpoint: recursive listing of pypa/advisory-database at `main`.
PYPA_TREE_URL = "https://api.github.com/repos/pypa/advisory-database/git/trees/main?recursive=1"


def get_candidate_ids(limit: int = 200) -> List[str]:
    """List advisory IDs found in the PyPA advisory-database tree listing.

    Fetches ``PYPA_TREE_URL`` and keeps every tree entry whose path starts
    with ``vulns/`` and ends with ``.yaml``. The ID is the file name without
    the ``.yaml`` suffix; only IDs beginning with ``PYSEC-`` or ``GHSA-`` are
    kept.

    Args:
        limit: Desired number of snapshots. The result is over-provisioned to
            at most ``limit * 4`` IDs.

    Returns:
        Advisory IDs in the order they appear in the tree listing.

    Raises:
        requests.HTTPError: If the listing request returns an error status.
    """
    listing = requests.get(PYPA_TREE_URL, timeout=30)
    listing.raise_for_status()
    entries = listing.json().get("tree", [])

    # Lazy pipeline: path -> advisory file stem -> stems with a wanted prefix.
    paths = (entry.get("path", "") for entry in entries)
    stems = (
        path.rsplit("/", 1)[-1][: -len(".yaml")]
        for path in paths
        if path.startswith("vulns/") and path.endswith(".yaml")
    )
    wanted = [stem for stem in stems if stem.startswith(("PYSEC-", "GHSA-"))]
    return wanted[: limit * 4]


def fetch_json(url: str, *, params: Dict[str, str] | None = None) -> Dict:
    """GET ``url`` and return its decoded JSON body.

    Args:
        url: Endpoint to request.
        params: Optional query-string parameters.

    Returns:
        The parsed JSON payload.

    Raises:
        requests.HTTPError: If the response carries an error status.
    """
    reply = requests.get(url, params=params, timeout=20)
    reply.raise_for_status()
    payload = reply.json()
    return payload


def _first_base_severity(metrics: Dict) -> str | None:
    """Return the base severity from the first CVSS family that provides one."""
    for key in ("cvssMetricV40", "cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
        entries = metrics.get(key) or []
        if not entries:
            # A family that is present but empty must not abort the lookup;
            # fall through to the next (older) CVSS version instead.
            continue
        first = entries[0]
        severity = (
            first.get("cvssData", {}).get("baseSeverity")
            or first.get("baseSeverity")
        )
        if severity:
            return severity
    return None


def _preferred_description(descriptions: List[Dict]) -> str:
    """Return the English description text, else the first entry's, else ''."""
    for desc in descriptions:
        if desc.get("lang") == "en":
            return desc.get("value", "")
    return descriptions[0].get("value", "") if descriptions else ""


def build_snapshot(osv_id: str) -> Dict | None:
    """Build a snapshot dict for one OSV advisory, enriched from NVD and EPSS.

    The OSV record is fetched first. If it carries a CVE identifier, NVD is
    queried for severity and description and EPSS for score and percentile.
    Both enrichments are best-effort: a failure leaves the corresponding
    defaults in place and is reported on stderr rather than raised.

    Args:
        osv_id: Advisory identifier substituted into ``OSV_VULN_URL``.

    Returns:
        The snapshot dict, or ``None`` when the OSV record has no ``affected``
        entries.

    Raises:
        Exception: Whatever ``fetch_json`` raises for the OSV request itself
            (for example an HTTP error status); that lookup is not optional.
    """
    osv = fetch_json(OSV_VULN_URL.format(osv_id=osv_id))
    if not osv.get("affected"):
        return None

    cve_id = _extract_cve_id(osv)
    snapshot = {
        "id": osv.get("id"),
        "summary": osv.get("summary"),
        "details": osv.get("details"),
        "aliases": osv.get("aliases", []),
        "references": osv.get("references", []),
        "affected": osv.get("affected", []),
        "severity": "MEDIUM",
        "nvd_description": "",
        "epss_score": 0.0,
        "epss_percentile": 0.0,
    }
    if not cve_id:
        return snapshot

    # NVD enrichment: deliberately best-effort, so any failure keeps defaults.
    try:
        nvd = fetch_json(NVD_CVE_URL, params={"cveId": cve_id})
        vulnerability = (nvd.get("vulnerabilities") or [{}])[0].get("cve", {})
        severity = _first_base_severity(vulnerability.get("metrics") or {})
        snapshot["severity"] = severity or snapshot["severity"]
        snapshot["nvd_description"] = _preferred_description(
            vulnerability.get("descriptions") or []
        )
    except Exception as exc:
        print(f"warning: NVD lookup failed for {cve_id}: {exc}", file=sys.stderr)

    # EPSS enrichment: same best-effort policy as NVD.
    try:
        epss = fetch_json(EPSS_URL, params={"cve": cve_id})
        item = (epss.get("data") or [{}])[0]
        snapshot["epss_score"] = float(item.get("epss", 0.0) or 0.0)
        snapshot["epss_percentile"] = float(item.get("percentile", 0.0) or 0.0)
    except Exception as exc:
        print(f"warning: EPSS lookup failed for {cve_id}: {exc}", file=sys.stderr)

    return snapshot


def main(target_count: int = 200) -> None:
    """Fetch advisory snapshots concurrently and write them plus an index.

    Candidate IDs come from ``get_candidate_ids`` and are capped at
    ``max(target_count + 40, 240)``. Each candidate is built on a thread pool;
    results are written to ``SNAPSHOT_DIR`` as they complete until
    ``target_count`` snapshots are saved. Candidates that raise or yield no
    snapshot are skipped. Finally the index, sorted by OSV ID, is written to
    ``INDEX_PATH``.

    Args:
        target_count: Number of snapshots to save before stopping.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    pool_size = max(target_count + 40, 240)
    candidates = get_candidate_ids(target_count)[:pool_size]
    index_entries = []

    with ThreadPoolExecutor(max_workers=12) as pool:
        pending = {}
        for candidate in candidates:
            pending[pool.submit(build_snapshot, candidate)] = candidate

        for done in as_completed(pending):
            if len(index_entries) >= target_count:
                # Enough saved: drop work that has not started yet.
                pool.shutdown(wait=False, cancel_futures=True)
                break

            osv_id = pending[done]
            try:
                snapshot = done.result()
            except Exception:
                # A failed candidate is skipped; the spare candidates cover it.
                continue
            if not snapshot:
                continue

            out_path = SNAPSHOT_DIR / f"{osv_id}.json"
            out_path.write_text(json.dumps(snapshot, indent=2, sort_keys=True))

            relative_file = str(out_path.relative_to(ROOT))
            cve_id = _extract_cve_id(snapshot)
            affected = snapshot.get("affected") or [{}]
            package_name = affected[0].get("package", {}).get("name", "")
            index_entries.append(
                {
                    "osv_id": osv_id,
                    "file": relative_file,
                    "cve_id": cve_id,
                    "package": package_name,
                }
            )

    INDEX_PATH.parent.mkdir(parents=True, exist_ok=True)
    index_entries.sort(key=lambda entry: entry["osv_id"])
    index_doc = {"count": len(index_entries), "snapshots": index_entries}
    INDEX_PATH.write_text(json.dumps(index_doc, indent=2))
    print(f"Saved {len(index_entries)} snapshots to {SNAPSHOT_DIR}")


# Build the snapshot cache with the default target count when run as a script.
if __name__ == "__main__":
    main()