Threat_Hunter / tools /kev_tool.py
EricChen2005's picture
Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B
c8d30bc
# tools/kev_tool.py
# ๅŠŸ่ƒฝ๏ผšCISA KEV๏ผˆKnown Exploited Vulnerabilities๏ผ‰ๆธ…ๅ–ฎๆŸฅ่ฉข Tool
# Harness ๆ”ฏๆŸฑ๏ผšGraceful Degradation๏ผˆ้™็ดš็€‘ๅธƒ๏ผ‰+ Observability๏ผˆๅŽŸๅญๅŒ–ๆ—ฅ่ชŒ๏ผ‰
# ๆ“ๆœ‰่€…๏ผšๆˆๅ“ก C๏ผˆAnalyst Agent Pipeline๏ผ‰
#
# ไฝฟ็”จๆ–นๅผ๏ผš
# from tools.kev_tool import check_cisa_kev
#
# ๆžถๆง‹ๅฎšไฝ๏ผš
# Analyst Agent ็š„ใ€Œ็ฌฌไธ€้šปๆ‰‹ใ€โ€” ้ฉ—่ญ‰ CVE ๆ˜ฏๅฆๅทฒ่ขซ้‡Žๅค–ๅˆฉ็”จ
# ๅœจ KEV ๆธ…ๅ–ฎไธŠ = ๅทฒ็ขบ่ช่ขซๅˆฉ็”จ = ้ขจ้šชๆฅต้ซ˜๏ผŒ้œ€็ซ‹ๅณ่™•็†
import json
import os
import time
import logging
from datetime import datetime, timezone
import requests
logger = logging.getLogger("ThreatHunter")
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๅธธๆ•ธ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
KEV_API_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
REQUEST_TIMEOUT = 30 # ็ง’
# ้›ข็ทšๅฟซๅ–
CACHE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data")
KEV_CACHE_PATH = os.path.join(CACHE_DIR, "kev_cache.json")
# ๆจก็ต„็ดš KEV ๆŸฅ่ฉข่กจ๏ผˆ้ฆ–ๆฌกๅ‘ผๅซๆ™‚่ผ‰ๅ…ฅ๏ผŒไน‹ๅพŒ้‡่ค‡ไฝฟ็”จ๏ผ‰
_kev_lookup: dict | None = None
_kev_total_count: int = 0
_kev_source: str = "CISA KEV (unavailable)"
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ่ผ”ๅŠฉๅ‡ฝๅผ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _download_kev_catalog() -> dict | None:
"""
ไธ‹่ผ‰ๅฎŒๆ•ด CISA KEV JSON ่ณ‡ๆ–™ใ€‚
ๆˆๅŠŸๅ›žๅ‚ณๅŽŸๅง‹ JSON dict๏ผŒๅคฑๆ•—ๅ›žๅ‚ณ Noneใ€‚
"""
try:
logger.info("[QUERY] Downloading CISA KEV catalog...")
response = requests.get(KEV_API_URL, timeout=REQUEST_TIMEOUT)
if response.status_code == 200:
data = response.json()
logger.info("[OK] KEV catalog downloaded: %d entries", len(data.get('vulnerabilities', [])))
return data
logger.warning("[WARN] KEV API returned %d", response.status_code)
return None
except requests.exceptions.Timeout:
logger.warning("[WARN] KEV API timeout (%ds)", REQUEST_TIMEOUT)
return None
except requests.exceptions.ConnectionError:
logger.warning("[WARN] KEV API connection failed (network issue)")
return None
except requests.exceptions.RequestException as e:
logger.warning("[WARN] KEV API request error: %s", e)
return None
except (json.JSONDecodeError, ValueError) as e:
logger.warning("[WARN] KEV API returned non-JSON: %s", e)
return None
def _write_kev_cache(data: dict) -> None:
"""ๅฐ‡ KEV ๅฎŒๆ•ด่ณ‡ๆ–™ๅฏซๅ…ฅ้›ข็ทšๅฟซๅ–"""
try:
os.makedirs(CACHE_DIR, exist_ok=True)
data["_cached_at"] = time.time()
with open(KEV_CACHE_PATH, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info("[OK] KEV cache updated: %s", KEV_CACHE_PATH)
except (IOError, PermissionError) as e:
logger.warning("[WARN] KEV cache write failed: %s", e)
def _read_kev_cache() -> dict | None:
"""่ฎ€ๅ– KEV ้›ข็ทšๅฟซๅ–๏ผŒไธๅญ˜ๅœจๅ›žๅ‚ณ None๏ผˆKEV ๅฟซๅ–ไธ่จญ้ŽๆœŸ๏ผŒๅ› ็‚บๆœ‰ๆ›ดๆ–ฐๆฉŸๅˆถ๏ผ‰"""
try:
if os.path.exists(KEV_CACHE_PATH):
with open(KEV_CACHE_PATH, "r", encoding="utf-8") as f:
cached = json.load(f)
logger.info("[OK] KEV cache hit: %d entries", len(cached.get('vulnerabilities', [])))
return cached
except (json.JSONDecodeError, IOError) as e:
logger.warning("[WARN] KEV cache read failed: %s", e)
return None
def _build_kev_lookup(raw_data: dict) -> dict:
"""
ๅฐ‡ KEV ๅŽŸๅง‹่ณ‡ๆ–™ๅปบ็ซ‹็‚บ {cve_id: details} ๆŸฅ่ฉข่กจใ€‚
KEV JSON ็ตๆง‹๏ผš
vulnerabilities[].cveID โ†’ CVE ID
vulnerabilities[].dateAdded โ†’ ๅŠ ๅ…ฅๆ—ฅๆœŸ
vulnerabilities[].dueDate โ†’ ไฟฎ่ฃœๆœŸ้™
vulnerabilities[].vendorProject โ†’ ไพ›ๆ‡‰ๅ•†
vulnerabilities[].product โ†’ ็”ขๅ“
vulnerabilities[].knownRansomwareCampaignUse โ†’ ๆ˜ฏๅฆ่ขซๅ‹’็ดข่ปŸ้ซ”ๅˆฉ็”จ
vulnerabilities[].shortDescription โ†’ ็ฐก็Ÿญๆ่ฟฐ
"""
lookup = {}
for vuln in raw_data.get("vulnerabilities", []):
cve_id = vuln.get("cveID", "")
if cve_id:
lookup[cve_id] = {
"date_added": vuln.get("dateAdded", ""),
"due_date": vuln.get("dueDate", ""),
"vendor": vuln.get("vendorProject", ""),
"product": vuln.get("product", ""),
"known_ransomware_use": vuln.get("knownRansomwareCampaignUse", "Unknown"),
"short_description": vuln.get("shortDescription", ""),
}
return lookup
def _ensure_kev_loaded() -> None:
"""
็ขบไฟ KEV ๆŸฅ่ฉข่กจๅทฒ่ผ‰ๅ…ฅ๏ผˆLazy Loading๏ผ‰ใ€‚
้™็ดš็ญ–็•ฅ๏ผš
1. ไธ‹่ผ‰็ทšไธŠ KEV JSON โ†’ ๆˆๅŠŸๅ‰‡ๅปบ็ซ‹ๆŸฅ่ฉข่กจ + ๆ›ดๆ–ฐๅฟซๅ–
2. ไธ‹่ผ‰ๅคฑๆ•— โ†’ ่ฎ€้›ข็ทšๅฟซๅ–
3. ๅฟซๅ–ไนŸๆฒ’ๆœ‰ โ†’ ๆŸฅ่ฉข่กจ็‚บ็ฉบ dict๏ผˆๆ‰€ๆœ‰ CVE ้ƒฝๅ›žๅ‚ณ in_kev=false๏ผ‰
"""
global _kev_lookup, _kev_total_count, _kev_source
if _kev_lookup is not None:
return # ๅทฒ่ผ‰ๅ…ฅ๏ผŒไธ้‡่ค‡ไธ‹่ผ‰
# ๅ˜—่ฉฆ็ทšไธŠไธ‹่ผ‰
raw_data = _download_kev_catalog()
if raw_data is not None:
_kev_lookup = _build_kev_lookup(raw_data)
_kev_total_count = len(_kev_lookup)
_kev_source = "CISA KEV (online)"
_write_kev_cache(raw_data)
logger.info("[OK] KEV lookup table built (online): %d entries", _kev_total_count)
return
# ้™็ดš๏ผš่ฎ€้›ข็ทšๅฟซๅ–
cached = _read_kev_cache()
if cached is not None:
_kev_lookup = _build_kev_lookup(cached)
_kev_total_count = len(_kev_lookup)
_kev_source = "CISA KEV (cache)"
logger.info("[OK] KEV lookup table built (cache): %d entries", _kev_total_count)
return
# ๆœ€็ต‚้™็ดš๏ผš็ฉบๆŸฅ่ฉข่กจ
_kev_lookup = {}
_kev_total_count = 0
_kev_source = "CISA KEV (unavailable)"
logger.warning("[WARN] KEV catalog unavailable (online + cache both failed), all queries return in_kev=false")
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๆ ธๅฟƒๆŸฅ่ฉข้‚่ผฏ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _check_kev_impl(cve_ids: str) -> str:
"""
check_cisa_kev ็š„ๆ ธๅฟƒๅฏฆไฝœ๏ผˆ่ˆ‡ CrewAI @tool ่งฃ่€ฆ๏ผŒๆ–นไพฟๅ–ฎๅ…ƒๆธฌ่ฉฆ๏ผ‰ใ€‚
ๆŽฅๆ”ถ้€—่™Ÿๅˆ†้š”็š„ CVE ID ๅญ—ไธฒ๏ผŒๅ›žๅ‚ณๆฏๅ€‹ CVE ็š„ KEV ็‹€ๆ…‹ JSONใ€‚
้™็ดš็ญ–็•ฅ๏ผš
1. ็ทšไธŠ KEV โ†’ ไฝฟ็”จ + ๆ›ดๆ–ฐๅฟซๅ–
2. ็ทšไธŠๅคฑๆ•— โ†’ ่ฎ€้›ข็ทšๅฟซๅ–
3. ๅฟซๅ–ไนŸๆฒ’ๆœ‰ โ†’ in_kev: false๏ผˆไฟๅฎˆ้ ่จญ๏ผŒไธ crash๏ผ‰
4. ไปปไฝ•ๆœช้ ๆœŸ้Œฏ่ชค โ†’ ๅ›žๅ‚ณๅฎ‰ๅ…จ็š„็ฉบ็ตๆžœ๏ผˆ็ต•ไธ crash๏ผ‰
"""
try:
# ็ขบไฟ KEV ๆŸฅ่ฉข่กจๅทฒ่ผ‰ๅ…ฅ
_ensure_kev_loaded()
# ่งฃๆž้€—่™Ÿๅˆ†้š”็š„ CVE ID
raw_ids = [cid.strip() for cid in cve_ids.split(",") if cid.strip()]
if not raw_ids:
logger.warning("[WARN] KEV Tool received empty CVE ID input")
return json.dumps({
"source": _kev_source,
"results": [],
"kev_total_count": _kev_total_count,
"error": "No CVE IDs provided",
}, ensure_ascii=False, indent=2)
logger.info("[QUERY] KEV check: %d CVEs -- %s", len(raw_ids), raw_ids)
results = []
for cve_id in raw_ids:
# ๆญฃ่ฆๅŒ– CVE ID ๆ ผๅผ๏ผˆๅŽป้™ค็ฉบ็™ฝใ€็ตฑไธ€ๅคงๅฏซ๏ผ‰
cve_id = cve_id.strip().upper()
if cve_id in _kev_lookup:
details = _kev_lookup[cve_id]
results.append({
"cve_id": cve_id,
"in_kev": True,
"date_added": details["date_added"],
"due_date": details["due_date"],
"vendor": details["vendor"],
"product": details["product"],
"known_ransomware_use": details["known_ransomware_use"],
"short_description": details["short_description"],
})
logger.info("[ALERT] %s is in CISA KEV list! (confirmed wild exploitation)", cve_id)
else:
results.append({
"cve_id": cve_id,
"in_kev": False,
})
logger.info("[OK] %s is not in CISA KEV list", cve_id)
kev_count = sum(1 for r in results if r["in_kev"])
logger.info(
"[OK] KEV check complete: %d queries, %d in KEV list",
len(results), kev_count
)
return json.dumps({
"source": _kev_source,
"results": results,
"kev_total_count": _kev_total_count,
}, ensure_ascii=False, indent=2)
except Exception as e:
# ๆœ€ๅพŒไธ€้“้˜ฒ็ทš๏ผšไปปไฝ•ๆœช้ ๆœŸ้Œฏ่ชค้ƒฝไธ่ƒฝ่ฎ“ Agent crash
logger.error("[FAIL] KEV Tool unexpected error: %s", e, exc_info=True)
error_result = {
"source": "CISA KEV (error)",
"results": [],
"kev_total_count": 0,
"error": f"Unexpected error: {str(e)}",
}
return json.dumps(error_result, ensure_ascii=False, indent=2)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# CrewAI @tool ๅŒ…่ฃ๏ผˆAgent ๅ‘ผๅซ็”จ๏ผ‰
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# โš ๏ธ ้‡่ฆ๏ผšไฝฟ็”จใ€Œๅปถ้ฒ่ผ‰ๅ…ฅใ€ๆจกๅผ๏ผˆLazyToolLoader๏ผ‰
# ๅŽŸๅ› ๏ผš้ฟๅ…ๅœจ import ้šŽๆฎตๅฐฑ่งธ็™ผ CrewAI ็š„ tool ่จปๅ†Š
def _create_tool():
"""ๅปถ้ฒๅปบ็ซ‹ CrewAI Tool๏ผŒๅƒ…ๅœจ Agent ๅฏฆ้š›ไฝฟ็”จๆ™‚ๆ‰ import"""
from crewai.tools import tool
@tool("check_cisa_kev")
def check_cisa_kev(cve_ids: str) -> str:
"""ๆŸฅ่ฉข CVE ๆ˜ฏๅฆๅœจ CISA KEV๏ผˆๅทฒ็Ÿฅ่ขซๅˆฉ็”จๆผๆดž๏ผ‰ๆธ…ๅ–ฎไธŠใ€‚
่ผธๅ…ฅไธ€ๆˆ–ๅคšๅ€‹ CVE ID๏ผˆ้€—่™Ÿๅˆ†้š”๏ผŒๅฆ‚ "CVE-2021-44228,CVE-2024-1234"๏ผ‰๏ผŒ
ๅ›žๅ‚ณๆฏๅ€‹ CVE ็š„ KEV ็‹€ๆ…‹๏ผŒๅŒ…ๅซๅŠ ๅ…ฅๆ—ฅๆœŸใ€ๅˆฐๆœŸๆ—ฅใ€ๆ˜ฏๅฆ่ขซๅ‹’็ดข่ปŸ้ซ”ๅˆฉ็”จ็ญ‰่ณ‡่จŠใ€‚
ๅœจ KEV ๆธ…ๅ–ฎไธŠ็š„ CVE = ๅทฒ็ขบ่ช่ขซ้‡Žๅค–ๅˆฉ็”จ = ้ขจ้šชๆฅต้ซ˜ใ€‚"""
return _check_kev_impl(cve_ids)
return check_cisa_kev
# โ”€โ”€ ๅปถ้ฒ่ผ‰ๅ…ฅๆฉŸๅˆถ๏ผˆ่ˆ‡ nvd_tool.py ็›ธๅŒๆจกๅผ๏ผ‰โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class _LazyToolLoader:
def __init__(self):
self._tool = None
def _load(self):
if self._tool is None:
self._tool = _create_tool()
@property
def check_cisa_kev(self):
self._load()
return self._tool
_loader = _LazyToolLoader()
def __getattr__(name):
"""ๆจก็ต„ๅฑค็ดš __getattr__๏ผŒๆ”ฏๆด from tools.kev_tool import check_cisa_kev"""
if name == "check_cisa_kev":
return _loader.check_cisa_kev
raise AttributeError(f"module 'tools.kev_tool' has no attribute {name!r}")