| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import json |
| import os |
| import time |
| import logging |
| from datetime import datetime, timezone |
|
|
| import requests |
|
|
| logger = logging.getLogger("ThreatHunter") |
|
|
| |
| |
| |
|
|
| KEV_API_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json" |
| REQUEST_TIMEOUT = 30 |
|
|
| |
| CACHE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data") |
| KEV_CACHE_PATH = os.path.join(CACHE_DIR, "kev_cache.json") |
|
|
| |
| _kev_lookup: dict | None = None |
| _kev_total_count: int = 0 |
| _kev_source: str = "CISA KEV (unavailable)" |
|
|
|
|
| |
| |
| |
|
|
| def _download_kev_catalog() -> dict | None: |
| """ |
| ไธ่ผๅฎๆด CISA KEV JSON ่ณๆใ |
| ๆๅๅๅณๅๅง JSON dict๏ผๅคฑๆๅๅณ Noneใ |
| """ |
| try: |
| logger.info("[QUERY] Downloading CISA KEV catalog...") |
| response = requests.get(KEV_API_URL, timeout=REQUEST_TIMEOUT) |
|
|
| if response.status_code == 200: |
| data = response.json() |
| logger.info("[OK] KEV catalog downloaded: %d entries", len(data.get('vulnerabilities', []))) |
| return data |
|
|
| logger.warning("[WARN] KEV API returned %d", response.status_code) |
| return None |
|
|
| except requests.exceptions.Timeout: |
| logger.warning("[WARN] KEV API timeout (%ds)", REQUEST_TIMEOUT) |
| return None |
| except requests.exceptions.ConnectionError: |
| logger.warning("[WARN] KEV API connection failed (network issue)") |
| return None |
| except requests.exceptions.RequestException as e: |
| logger.warning("[WARN] KEV API request error: %s", e) |
| return None |
| except (json.JSONDecodeError, ValueError) as e: |
| logger.warning("[WARN] KEV API returned non-JSON: %s", e) |
| return None |
|
|
|
|
| def _write_kev_cache(data: dict) -> None: |
| """ๅฐ KEV ๅฎๆด่ณๆๅฏซๅ
ฅ้ข็ทๅฟซๅ""" |
| try: |
| os.makedirs(CACHE_DIR, exist_ok=True) |
| data["_cached_at"] = time.time() |
| with open(KEV_CACHE_PATH, "w", encoding="utf-8") as f: |
| json.dump(data, f, ensure_ascii=False, indent=2) |
| logger.info("[OK] KEV cache updated: %s", KEV_CACHE_PATH) |
| except (IOError, PermissionError) as e: |
| logger.warning("[WARN] KEV cache write failed: %s", e) |
|
|
|
|
| def _read_kev_cache() -> dict | None: |
| """่ฎๅ KEV ้ข็ทๅฟซๅ๏ผไธๅญๅจๅๅณ None๏ผKEV ๅฟซๅไธ่จญ้ๆ๏ผๅ ็บๆๆดๆฐๆฉๅถ๏ผ""" |
| try: |
| if os.path.exists(KEV_CACHE_PATH): |
| with open(KEV_CACHE_PATH, "r", encoding="utf-8") as f: |
| cached = json.load(f) |
| logger.info("[OK] KEV cache hit: %d entries", len(cached.get('vulnerabilities', []))) |
| return cached |
| except (json.JSONDecodeError, IOError) as e: |
| logger.warning("[WARN] KEV cache read failed: %s", e) |
| return None |
|
|
|
|
| def _build_kev_lookup(raw_data: dict) -> dict: |
| """ |
| ๅฐ KEV ๅๅง่ณๆๅปบ็ซ็บ {cve_id: details} ๆฅ่ฉข่กจใ |
| |
| KEV JSON ็ตๆง๏ผ |
| vulnerabilities[].cveID โ CVE ID |
| vulnerabilities[].dateAdded โ ๅ ๅ
ฅๆฅๆ |
| vulnerabilities[].dueDate โ ไฟฎ่ฃๆ้ |
| vulnerabilities[].vendorProject โ ไพๆๅ |
| vulnerabilities[].product โ ็ขๅ |
| vulnerabilities[].knownRansomwareCampaignUse โ ๆฏๅฆ่ขซๅ็ดข่ป้ซๅฉ็จ |
| vulnerabilities[].shortDescription โ ็ฐก็ญๆ่ฟฐ |
| """ |
| lookup = {} |
| for vuln in raw_data.get("vulnerabilities", []): |
| cve_id = vuln.get("cveID", "") |
| if cve_id: |
| lookup[cve_id] = { |
| "date_added": vuln.get("dateAdded", ""), |
| "due_date": vuln.get("dueDate", ""), |
| "vendor": vuln.get("vendorProject", ""), |
| "product": vuln.get("product", ""), |
| "known_ransomware_use": vuln.get("knownRansomwareCampaignUse", "Unknown"), |
| "short_description": vuln.get("shortDescription", ""), |
| } |
| return lookup |
|
|
|
|
| def _ensure_kev_loaded() -> None: |
| """ |
| ็ขบไฟ KEV ๆฅ่ฉข่กจๅทฒ่ผๅ
ฅ๏ผLazy Loading๏ผใ |
| |
| ้็ด็ญ็ฅ๏ผ |
| 1. ไธ่ผ็ทไธ KEV JSON โ ๆๅๅๅปบ็ซๆฅ่ฉข่กจ + ๆดๆฐๅฟซๅ |
| 2. ไธ่ผๅคฑๆ โ ่ฎ้ข็ทๅฟซๅ |
| 3. ๅฟซๅไนๆฒๆ โ ๆฅ่ฉข่กจ็บ็ฉบ dict๏ผๆๆ CVE ้ฝๅๅณ in_kev=false๏ผ |
| """ |
| global _kev_lookup, _kev_total_count, _kev_source |
|
|
| if _kev_lookup is not None: |
| return |
|
|
| |
| raw_data = _download_kev_catalog() |
| if raw_data is not None: |
| _kev_lookup = _build_kev_lookup(raw_data) |
| _kev_total_count = len(_kev_lookup) |
| _kev_source = "CISA KEV (online)" |
| _write_kev_cache(raw_data) |
| logger.info("[OK] KEV lookup table built (online): %d entries", _kev_total_count) |
| return |
|
|
| |
| cached = _read_kev_cache() |
| if cached is not None: |
| _kev_lookup = _build_kev_lookup(cached) |
| _kev_total_count = len(_kev_lookup) |
| _kev_source = "CISA KEV (cache)" |
| logger.info("[OK] KEV lookup table built (cache): %d entries", _kev_total_count) |
| return |
|
|
| |
| _kev_lookup = {} |
| _kev_total_count = 0 |
| _kev_source = "CISA KEV (unavailable)" |
| logger.warning("[WARN] KEV catalog unavailable (online + cache both failed), all queries return in_kev=false") |
|
|
|
|
| |
| |
| |
|
|
| def _check_kev_impl(cve_ids: str) -> str: |
| """ |
| check_cisa_kev ็ๆ ธๅฟๅฏฆไฝ๏ผ่ CrewAI @tool ่งฃ่ฆ๏ผๆนไพฟๅฎๅ
ๆธฌ่ฉฆ๏ผใ |
| |
| ๆฅๆถ้่ๅ้็ CVE ID ๅญไธฒ๏ผๅๅณๆฏๅ CVE ็ KEV ็ๆ
JSONใ |
| |
| ้็ด็ญ็ฅ๏ผ |
| 1. ็ทไธ KEV โ ไฝฟ็จ + ๆดๆฐๅฟซๅ |
| 2. ็ทไธๅคฑๆ โ ่ฎ้ข็ทๅฟซๅ |
| 3. ๅฟซๅไนๆฒๆ โ in_kev: false๏ผไฟๅฎ้ ่จญ๏ผไธ crash๏ผ |
| 4. ไปปไฝๆช้ ๆ้ฏ่ชค โ ๅๅณๅฎๅ
จ็็ฉบ็ตๆ๏ผ็ตไธ crash๏ผ |
| """ |
| try: |
| |
| _ensure_kev_loaded() |
|
|
| |
| raw_ids = [cid.strip() for cid in cve_ids.split(",") if cid.strip()] |
| if not raw_ids: |
| logger.warning("[WARN] KEV Tool received empty CVE ID input") |
| return json.dumps({ |
| "source": _kev_source, |
| "results": [], |
| "kev_total_count": _kev_total_count, |
| "error": "No CVE IDs provided", |
| }, ensure_ascii=False, indent=2) |
|
|
| logger.info("[QUERY] KEV check: %d CVEs -- %s", len(raw_ids), raw_ids) |
|
|
| results = [] |
| for cve_id in raw_ids: |
| |
| cve_id = cve_id.strip().upper() |
|
|
| if cve_id in _kev_lookup: |
| details = _kev_lookup[cve_id] |
| results.append({ |
| "cve_id": cve_id, |
| "in_kev": True, |
| "date_added": details["date_added"], |
| "due_date": details["due_date"], |
| "vendor": details["vendor"], |
| "product": details["product"], |
| "known_ransomware_use": details["known_ransomware_use"], |
| "short_description": details["short_description"], |
| }) |
| logger.info("[ALERT] %s is in CISA KEV list! (confirmed wild exploitation)", cve_id) |
| else: |
| results.append({ |
| "cve_id": cve_id, |
| "in_kev": False, |
| }) |
| logger.info("[OK] %s is not in CISA KEV list", cve_id) |
|
|
| kev_count = sum(1 for r in results if r["in_kev"]) |
| logger.info( |
| "[OK] KEV check complete: %d queries, %d in KEV list", |
| len(results), kev_count |
| ) |
|
|
| return json.dumps({ |
| "source": _kev_source, |
| "results": results, |
| "kev_total_count": _kev_total_count, |
| }, ensure_ascii=False, indent=2) |
|
|
| except Exception as e: |
| |
| logger.error("[FAIL] KEV Tool unexpected error: %s", e, exc_info=True) |
| error_result = { |
| "source": "CISA KEV (error)", |
| "results": [], |
| "kev_total_count": 0, |
| "error": f"Unexpected error: {str(e)}", |
| } |
| return json.dumps(error_result, ensure_ascii=False, indent=2) |
|
|
|
|
| |
| |
| |
|
|
| |
| |
| def _create_tool(): |
| """ๅปถ้ฒๅปบ็ซ CrewAI Tool๏ผๅ
ๅจ Agent ๅฏฆ้ไฝฟ็จๆๆ import""" |
| from crewai.tools import tool |
|
|
| @tool("check_cisa_kev") |
| def check_cisa_kev(cve_ids: str) -> str: |
| """ๆฅ่ฉข CVE ๆฏๅฆๅจ CISA KEV๏ผๅทฒ็ฅ่ขซๅฉ็จๆผๆด๏ผๆธ
ๅฎไธใ |
| ่ผธๅ
ฅไธๆๅคๅ CVE ID๏ผ้่ๅ้๏ผๅฆ "CVE-2021-44228,CVE-2024-1234"๏ผ๏ผ |
| ๅๅณๆฏๅ CVE ็ KEV ็ๆ
๏ผๅ
ๅซๅ ๅ
ฅๆฅๆใๅฐๆๆฅใๆฏๅฆ่ขซๅ็ดข่ป้ซๅฉ็จ็ญ่ณ่จใ |
| ๅจ KEV ๆธ
ๅฎไธ็ CVE = ๅทฒ็ขบ่ช่ขซ้ๅคๅฉ็จ = ้ขจ้ชๆฅต้ซใ""" |
| return _check_kev_impl(cve_ids) |
|
|
| return check_cisa_kev |
|
|
|
|
| |
|
|
| class _LazyToolLoader: |
| def __init__(self): |
| self._tool = None |
|
|
| def _load(self): |
| if self._tool is None: |
| self._tool = _create_tool() |
|
|
| @property |
| def check_cisa_kev(self): |
| self._load() |
| return self._tool |
|
|
|
|
| _loader = _LazyToolLoader() |
|
|
|
|
| def __getattr__(name): |
| """ๆจก็ตๅฑค็ด __getattr__๏ผๆฏๆด from tools.kev_tool import check_cisa_kev""" |
| if name == "check_cisa_kev": |
| return _loader.check_cisa_kev |
| raise AttributeError(f"module 'tools.kev_tool' has no attribute {name!r}") |
|
|