| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import json |
| import os |
| import time |
| import hashlib |
| import logging |
| from datetime import datetime, timezone |
|
|
| import requests |
|
|
| |
| _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| os.environ.setdefault("CREWAI_STORAGE_DIR", os.path.join(_PROJECT_ROOT, ".crewai_storage")) |
| from crewai.tools import tool |
|
|
| logger = logging.getLogger("ThreatHunter.epss_tool") |
|
|
| |
| |
| |
|
|
| EPSS_API_BASE = "https://api.first.org/data/v1/epss" |
| REQUEST_TIMEOUT = 15 |
| MAX_RETRIES = 2 |
|
|
| |
| CACHE_DIR = os.path.join(_PROJECT_ROOT, "data") |
| CACHE_TTL = 3600 * 24 |
|
|
| |
| EPSS_HIGH_THRESHOLD = 0.10 |
| EPSS_CRITICAL_THRESHOLD = 0.50 |
| EPSS_MEDIUM_THRESHOLD = 0.05 |
|
|
|
|
| def _get_cache_path(cve_id: str) -> str: |
| safe = hashlib.md5(cve_id.encode()).hexdigest()[:12] |
| return os.path.join(CACHE_DIR, f"epss_cache_{cve_id}_{safe}.json") |
|
|
|
|
| def _read_cache(cve_id: str) -> dict | None: |
| try: |
| path = _get_cache_path(cve_id) |
| if os.path.exists(path): |
| with open(path, "r", encoding="utf-8") as f: |
| cached = json.load(f) |
| if time.time() - cached.get("_cached_at", 0) < CACHE_TTL: |
| logger.info("[OK] EPSS cache hit: %s", cve_id) |
| return cached |
| except (json.JSONDecodeError, IOError): |
| pass |
| return None |
|
|
|
|
| def _write_cache(cve_id: str, data: dict) -> None: |
| try: |
| os.makedirs(CACHE_DIR, exist_ok=True) |
| data["_cached_at"] = time.time() |
| with open(_get_cache_path(cve_id), "w", encoding="utf-8") as f: |
| json.dump(data, f, ensure_ascii=False, indent=2) |
| except (IOError, PermissionError) as e: |
| logger.warning("[WARN] EPSS cache write failed: %s", e) |
|
|
|
|
| def _normalize_cve_ids(cve_ids_str: str) -> list[str]: |
| """่งฃๆไธฆๆญฃ่ฆๅ้่ๅ้็ CVE IDใ""" |
| normalized: list[str] = [] |
| for raw in cve_ids_str.split(","): |
| candidate = raw.strip().upper() |
| if candidate.startswith("CVE-"): |
| normalized.append(candidate) |
| return normalized |
|
|
|
|
| def _query_epss_api(cve_id: str) -> dict | None: |
| """ |
| ๅผๅซ FIRST.org EPSS APIใ |
| |
| GET https://api.first.org/data/v1/epss?cve=CVE-2021-44228 |
| Response: {"data": [{"cve": "...", "epss": "0.XX", "percentile": "0.XX", "date": "YYYY-MM-DD"}]} |
| """ |
| for attempt in range(MAX_RETRIES): |
| try: |
| logger.info("[QUERY] EPSS API: %s (attempt %d)", cve_id, attempt + 1) |
| response = requests.get( |
| EPSS_API_BASE, |
| params={"cve": cve_id}, |
| timeout=REQUEST_TIMEOUT, |
| ) |
| if response.status_code == 200: |
| return response.json() |
| elif response.status_code == 429: |
| logger.warning("[WARN] EPSS API 429 (rate limited), waiting...") |
| time.sleep(3) |
| else: |
| logger.warning("[WARN] EPSS API %d", response.status_code) |
| return None |
| except requests.exceptions.Timeout: |
| logger.warning("[WARN] EPSS API timeout") |
| except requests.exceptions.ConnectionError: |
| logger.warning("[WARN] EPSS API connection failed") |
| except requests.exceptions.RequestException as e: |
| logger.warning("[WARN] EPSS API error: %s", e) |
| return None |
|
|
|
|
| def _interpret_epss(score: float) -> str: |
| """ๅฐ EPSS ๅๆธ่ฝ็บไบบ้กๅฏ่ฎ่ชชๆใ""" |
| if score >= EPSS_CRITICAL_THRESHOLD: |
| return f"CRITICAL_RISK โ {score:.1%} probability of exploitation in 30 days" |
| elif score >= EPSS_HIGH_THRESHOLD: |
| return f"HIGH_RISK โ {score:.1%} probability of exploitation in 30 days" |
| elif score >= 0.01: |
| return f"MODERATE_RISK โ {score:.2%} probability of exploitation in 30 days" |
| else: |
| return f"LOW_RISK โ {score:.3%} probability of exploitation in 30 days" |
|
|
|
|
| def _risk_level_from_epss(score: float) -> str: |
| """ๅฐ EPSS ๅๆธๆ ๅฐๆๆธฌ่ฉฆ่ UI ไฝฟ็จ็้ขจ้ช็ญ็ดใ""" |
| if score >= EPSS_CRITICAL_THRESHOLD: |
| return "CRITICAL" |
| if score >= 0.20: |
| return "HIGH" |
| if score >= EPSS_MEDIUM_THRESHOLD: |
| return "MEDIUM" |
| return "LOW" |
|
|
|
|
| def _fetch_epss_online(cve_ids: list[str]) -> dict[str, dict]: |
| """ๆฅ่ฉขๅคๅ CVE ็ EPSS ่ณๆ๏ผๅ
่ฎๅฟซๅ๏ผๅคฑๆๆๅๅณ็ฉบ็ตๆใ""" |
| results: dict[str, dict] = {} |
| for cve_id in cve_ids: |
| cached = _read_cache(cve_id) |
| if cached: |
| results[cve_id] = cached |
| continue |
|
|
| raw = _query_epss_api(cve_id) |
| if raw and raw.get("data"): |
| entry = raw["data"][0] |
| results[cve_id] = { |
| "epss": float(entry.get("epss", 0.0)), |
| "percentile": float(entry.get("percentile", 0.0)), |
| "date": entry.get("date", ""), |
| "_cached_at": time.time(), |
| "_source": "FIRST.org EPSS API (online)", |
| } |
| return results |
|
|
|
|
| def get_epss_score(cve_id: str) -> dict: |
| """ |
| ๅๅพๅฎไธ CVE ็ EPSS ๅๆธ๏ผ็จๅผ็ขผๅฑคๅผๅซ๏ผไพ Intel Fusion ็ดๆฅไฝฟ็จ๏ผใ |
| |
| Returns: |
| { |
| "cve_id": "CVE-...", |
| "epss": float, # 0.0-1.0 |
| "percentile": float, # 0.0-1.0 |
| "date": "YYYY-MM-DD", |
| "source": "EPSS", |
| "error": str | None, |
| } |
| """ |
| if not cve_id or not cve_id.startswith("CVE-"): |
| return {"cve_id": cve_id, "epss": 0.0, "percentile": 0.0, |
| "source": "EPSS", "error": "Invalid CVE ID"} |
|
|
| online_results = _fetch_epss_online([cve_id]) |
| if cve_id in online_results: |
| entry = online_results[cve_id] |
| result = { |
| "cve_id": cve_id, |
| "epss": float(entry.get("epss", 0.0)), |
| "percentile": float(entry.get("percentile", 0.0)), |
| "date": entry.get("date", ""), |
| "source": "EPSS", |
| "error": None, |
| } |
| if not _read_cache(cve_id): |
| _write_cache(cve_id, result) |
| logger.info("[OK] EPSS: %s -> %.4f (percentile %.2f)", |
| cve_id, result["epss"], result["percentile"]) |
| return result |
|
|
| logger.warning("[WARN] EPSS unavailable for: %s", cve_id) |
| return { |
| "cve_id": cve_id, |
| "epss": 0.0, |
| "percentile": 0.0, |
| "source": "EPSS", |
| "error": f"EPSS API unavailable for {cve_id}", |
| } |
|
|
|
|
| def _fetch_epss_impl(cve_ids_str: str) -> str: |
| """fetch_epss_score ็ๆ ธๅฟๅฏฆไฝ๏ผๆฅๅ้่ๅ้็ CVE IDใ""" |
| cve_ids = _normalize_cve_ids(cve_ids_str) |
| if not cve_ids: |
| return json.dumps({"error": "No valid CVE IDs provided", "results": []}) |
|
|
| limited_cve_ids = cve_ids[:10] |
| online_results = _fetch_epss_online(limited_cve_ids) |
|
|
| results = [] |
| high_risk = 0 |
| found_count = 0 |
|
|
| for cve_id in limited_cve_ids: |
| data = online_results.get(cve_id) |
| if data: |
| if not _read_cache(cve_id): |
| _write_cache(cve_id, data) |
| epss_score = float(data.get("epss", 0.0)) |
| percentile = float(data.get("percentile", 0.0)) |
| found = True |
| found_count += 1 |
| else: |
| epss_score = 0.0 |
| percentile = 0.0 |
| found = False |
|
|
| risk_level = _risk_level_from_epss(epss_score) |
| if epss_score >= EPSS_HIGH_THRESHOLD: |
| high_risk += 1 |
|
|
| results.append({ |
| "cve_id": cve_id, |
| "epss_score": epss_score, |
| "percentile": percentile, |
| "date": data.get("date", "") if data else "", |
| "risk_level": risk_level, |
| "found": found, |
| "interpretation": _interpret_epss(epss_score), |
| }) |
|
|
| return json.dumps({ |
| "source": "FIRST.org EPSS", |
| "results": results, |
| "summary": { |
| "total_queried": len(limited_cve_ids), |
| "found": found_count, |
| "high_risk": high_risk, |
| }, |
| "query_time": datetime.now(timezone.utc).isoformat(), |
| }, ensure_ascii=False) |
|
|
|
|
| |
| |
| |
|
|
| class _Loader: |
| def __init__(self): |
| self._tool = None |
|
|
| def _load(self): |
| if self._tool is None: |
| @tool("fetch_epss_score") |
| def fetch_epss_score(cve_ids: str) -> str: |
| """ๆฅ่ฉข FIRST.org EPSS (Exploit Prediction Scoring System) ๅๆธใ |
| |
| ่ผธๅ
ฅ๏ผ้่ๅ้็ CVE ID๏ผไพๅฆ "CVE-2021-44228,CVE-2024-1234" |
| ่ฟๅ๏ผๆฏๅ CVE ๅจๆฅไธไพ 30 ๅคฉๅ
ง่ขซ้ๅคๅฉ็จ็ๆฉ็๏ผ0.0-1.0๏ผ |
| |
| EPSS > 0.1 (10%) ่กจ็คบ้ซ้ขจ้ช๏ผๆๅชๅ
ไฟฎ่ฃใ |
| EPSS > 0.5 (50%) ่กจ็คบๆฅต้ซ้ขจ้ช๏ผๆ็ซๅณไฟฎ่ฃใ |
| ๅๅพ CVE ๅ่กจๅพ็ซๅณๆฅ่ฉข EPSS๏ผๅคๆทไฟฎ่ฃๅชๅ
้ ๅบใ |
| """ |
| return _fetch_epss_impl(cve_ids) |
| self._tool = fetch_epss_score |
| return self._tool |
|
|
| @property |
| def fetch_epss_score(self): |
| return self._load() |
|
|
|
|
| _loader = _Loader() |
|
|
|
|
| def __getattr__(name: str): |
| if name == "fetch_epss_score": |
| return _loader.fetch_epss_score |
| raise AttributeError(f"module 'tools.epss_tool' has no attribute {name!r}") |
|
|