| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import json |
| import os |
| import time |
| import hashlib |
| import logging |
| from datetime import datetime, timezone |
|
|
| import requests |
|
|
| logger = logging.getLogger("ThreatHunter") |
|
|
| |
| |
| |
|
|
| GITHUB_API_BASE = "https://api.github.com/search/repositories" |
| RESULTS_PER_PAGE = 10 |
| REQUEST_TIMEOUT = 20 |
|
|
| |
| |
| RATE_LIMIT_WITH_TOKEN = 2.0 |
| RATE_LIMIT_WITHOUT_TOKEN = 6.0 |
| MAX_RETRIES = 2 |
|
|
| |
| CACHE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data") |
| CACHE_TTL = 3600 * 24 |
|
|
| |
| _last_request_time = 0.0 |
|
|
|
|
| |
| |
| |
|
|
| def _get_github_token() -> str: |
| """取得 GitHub Token(優先從 config,備選環境變數)""" |
| try: |
| from config import GITHUB_TOKEN |
| if GITHUB_TOKEN: |
| return GITHUB_TOKEN |
| except ImportError: |
| pass |
| return os.getenv("GITHUB_TOKEN", "") |
|
|
|
|
| def _get_cache_path(cve_id: str) -> str: |
| """取得離線快取檔案路徑""" |
| safe_name = hashlib.md5(cve_id.encode()).hexdigest()[:12] |
| return os.path.join(CACHE_DIR, f"exploit_cache_{cve_id}_{safe_name}.json") |
|
|
|
|
| def _read_cache(cve_id: str) -> dict | None: |
| """讀取離線快取,過期或不存在回傳 None""" |
| cache_path = _get_cache_path(cve_id) |
| try: |
| if os.path.exists(cache_path): |
| with open(cache_path, "r", encoding="utf-8") as f: |
| cached = json.load(f) |
| cached_time = cached.get("_cached_at", 0) |
| if time.time() - cached_time < CACHE_TTL: |
| logger.info("[OK] Exploit cache hit: %s", cve_id) |
| return cached |
| else: |
| logger.info("[INFO] Exploit cache expired: %s", cve_id) |
| except (json.JSONDecodeError, IOError) as e: |
| logger.warning("[WARN] Exploit cache read failed: %s", e) |
| return None |
|
|
|
|
| def _write_cache(cve_id: str, data: dict) -> None: |
| """寫入離線快取""" |
| try: |
| os.makedirs(CACHE_DIR, exist_ok=True) |
| cache_path = _get_cache_path(cve_id) |
| data["_cached_at"] = time.time() |
| with open(cache_path, "w", encoding="utf-8") as f: |
| json.dump(data, f, ensure_ascii=False, indent=2) |
| except (IOError, PermissionError) as e: |
| logger.warning("[WARN] Exploit cache write failed: %s", e) |
|
|
|
|
| def _rate_limit() -> None: |
| """Rate limiter — 確保不超過 GitHub API 限速""" |
| global _last_request_time |
| token = _get_github_token() |
| interval = RATE_LIMIT_WITH_TOKEN if token else RATE_LIMIT_WITHOUT_TOKEN |
| elapsed = time.time() - _last_request_time |
| if elapsed < interval: |
| wait = interval - elapsed |
| logger.info("[WAIT] GitHub rate limit: waiting %.1fs", wait) |
| time.sleep(wait) |
| _last_request_time = time.time() |
|
|
|
|
| def _classify_exploit_type(name: str, description: str) -> str: |
| """ |
| 根據 repo 名稱和描述判斷 exploit 類型。 |
| |
| 分類規則: |
| - 含 scanner/detect/check → scanner |
| - 含 weaponized/exploit-db/metasploit → weaponized |
| - 其他(poc/proof/test/demo 等)→ poc |
| """ |
| text = f"{name} {description}".lower() |
|
|
| |
| scanner_keywords = ["scanner", "detect", "check", "scan", "checker", "finder"] |
| for kw in scanner_keywords: |
| if kw in text: |
| return "scanner" |
|
|
| |
| weaponized_keywords = ["weaponized", "exploit-db", "metasploit", "payload", "shellcode", "reverse shell"] |
| for kw in weaponized_keywords: |
| if kw in text: |
| return "weaponized" |
|
|
| |
| return "poc" |
|
|
|
|
| def _determine_risk_indicator(exploit_count: int, api_available: bool) -> str: |
| """ |
| 根據 exploit 數量和 API 可用性判定風險指標。 |
| |
| 邏輯: |
| exploit_count > 0 → "HIGH"(有公開 exploit = 高風險) |
| exploit_count == 0 且 API 可用 → "LOW"(確認沒有公開 exploit) |
| API 不可用 → "UNKNOWN (API limited)" |
| """ |
| if not api_available: |
| return "UNKNOWN (API limited)" |
| if exploit_count > 0: |
| return "HIGH" |
| return "LOW" |
|
|
|
|
| |
| |
| |
|
|
| def _query_github_api(cve_id: str) -> dict | None: |
| """ |
| 呼叫 GitHub Search API,回傳原始 JSON response dict。 |
| 失敗回傳 None。 |
| |
| 包含: |
| - Rate limiting |
| - 重試機制(最多 MAX_RETRIES 次) |
| - Timeout 處理 |
| """ |
| token = _get_github_token() |
|
|
| headers = { |
| "Accept": "application/vnd.github.v3+json", |
| } |
| if token: |
| headers["Authorization"] = f"token {token}" |
|
|
| |
| params = { |
| "q": f"{cve_id} exploit", |
| "sort": "stars", |
| "order": "desc", |
| "per_page": RESULTS_PER_PAGE, |
| } |
|
|
| for attempt in range(1, MAX_RETRIES + 1): |
| _rate_limit() |
| try: |
| logger.info("[QUERY] GitHub API search: %s (attempt %d)", cve_id, attempt) |
| response = requests.get( |
| GITHUB_API_BASE, |
| params=params, |
| headers=headers, |
| timeout=REQUEST_TIMEOUT, |
| ) |
|
|
| if response.status_code == 200: |
| return response.json() |
|
|
| if response.status_code in (403, 429): |
| |
| retry_after = response.headers.get("Retry-After", "60") |
| logger.warning( |
| "[WARN] GitHub API %d (rate limited), Retry-After: %ss", |
| response.status_code, retry_after |
| ) |
| time.sleep(min(int(retry_after), 30)) |
| continue |
|
|
| if response.status_code >= 500: |
| logger.warning("[WARN] GitHub API %d (server error)", response.status_code) |
| time.sleep(2) |
| continue |
|
|
| |
| logger.warning("[WARN] GitHub API returned %d: %s", response.status_code, response.text[:200]) |
| return None |
|
|
| except requests.exceptions.Timeout: |
| logger.warning("[WARN] GitHub API timeout (%ds)", REQUEST_TIMEOUT) |
| continue |
| except requests.exceptions.ConnectionError: |
| logger.warning("[WARN] GitHub API connection failed (network issue)") |
| continue |
| except requests.exceptions.RequestException as e: |
| logger.warning("[WARN] GitHub API request error: %s", e) |
| return None |
|
|
| return None |
|
|
|
|
| def _parse_github_response(raw: dict, cve_id: str) -> dict: |
| """ |
| 將 GitHub Search API 原始 response 轉換為 Tool 輸出格式。 |
| |
| 轉換 mapping: |
| response.items[].full_name → repo_name |
| response.items[].html_url → url |
| response.items[].stargazers_count → stars |
| response.items[].language → language |
| response.items[].updated_at → last_updated |
| response.items[].description → description |
| """ |
| exploits = [] |
| items = raw.get("items", []) |
|
|
| for repo in items: |
| repo_name = repo.get("full_name", "") |
| description = repo.get("description", "") or "" |
| exploit_type = _classify_exploit_type(repo_name, description) |
|
|
| |
| updated_at = repo.get("updated_at", "") |
| if "T" in updated_at: |
| updated_at = updated_at.split("T")[0] |
|
|
| exploits.append({ |
| "repo_name": repo_name, |
| "url": repo.get("html_url", ""), |
| "stars": repo.get("stargazers_count", 0), |
| "language": repo.get("language", "") or "Unknown", |
| "last_updated": updated_at, |
| "description": description[:300], |
| "type": exploit_type, |
| }) |
|
|
| exploit_count = len(exploits) |
| risk_indicator = _determine_risk_indicator(exploit_count, api_available=True) |
|
|
| return { |
| "cve_id": cve_id, |
| "source": "GitHub API", |
| "exploit_count": exploit_count, |
| "exploits": exploits, |
| "risk_indicator": risk_indicator, |
| } |
|
|
|
|
| def _search_exploits_impl(cve_id: str) -> str: |
| """ |
| search_exploits 的核心實作(與 CrewAI @tool 解耦,方便單元測試)。 |
| |
| 降級瀑布: |
| 1. 查 GitHub Search API → 成功則快取 |
| 2. API 失敗(403/429 rate limit)→ 讀離線快取 |
| 3. 快取也沒有 → 回傳 exploit_count: 0, risk_indicator: "UNKNOWN" |
| 4. 任何未預期錯誤 → 回傳安全的預設結果(絕不 crash) |
| """ |
| try: |
| |
| cve_id = cve_id.strip().upper() |
| if not cve_id: |
| logger.warning("[WARN] Exploit Tool received empty CVE ID input") |
| return json.dumps({ |
| "cve_id": "", |
| "source": "GitHub API (error)", |
| "exploit_count": 0, |
| "exploits": [], |
| "risk_indicator": "UNKNOWN", |
| "error": "No CVE ID provided", |
| }, ensure_ascii=False, indent=2) |
|
|
| logger.info("[QUERY] Exploit search: %s", cve_id) |
|
|
| |
| raw = _query_github_api(cve_id) |
|
|
| if raw is not None: |
| result = _parse_github_response(raw, cve_id) |
|
|
| |
| _write_cache(cve_id, result) |
|
|
| logger.info( |
| "[OK] Exploit search success: %s -> %d exploit(s), risk_indicator=%s", |
| cve_id, result['exploit_count'], result['risk_indicator'] |
| ) |
| return json.dumps(result, ensure_ascii=False, indent=2) |
|
|
| |
| cached = _read_cache(cve_id) |
| if cached: |
| cached.pop("_cached_at", None) |
| cached["source"] = "GitHub API (cache)" |
| cached["error"] = f"GitHub API unavailable, using cached data for '{cve_id}'" |
| logger.info("[OK] Exploit using cache: %s", cve_id) |
| return json.dumps(cached, ensure_ascii=False, indent=2) |
|
|
| |
| empty_result = { |
| "cve_id": cve_id, |
| "source": "GitHub API (unavailable)", |
| "exploit_count": 0, |
| "exploits": [], |
| "risk_indicator": "UNKNOWN (API limited)", |
| "error": f"GitHub API unavailable and no cache for '{cve_id}'", |
| } |
| logger.info("[INFO] Exploit no data for: %s", cve_id) |
| return json.dumps(empty_result, ensure_ascii=False, indent=2) |
|
|
| except Exception as e: |
| |
| logger.error("[FAIL] Exploit Tool unexpected error: %s", e, exc_info=True) |
| error_result = { |
| "cve_id": cve_id if 'cve_id' in dir() else "", |
| "source": "GitHub API (error)", |
| "exploit_count": 0, |
| "exploits": [], |
| "risk_indicator": "UNKNOWN", |
| "error": f"Unexpected error: {str(e)}", |
| } |
| return json.dumps(error_result, ensure_ascii=False, indent=2) |
|
|
|
|
| |
| |
| |
|
|
| |
| |
| def _create_tool(): |
| """延遲建立 CrewAI Tool,僅在 Agent 實際使用時才 import""" |
| from crewai.tools import tool |
|
|
| @tool("search_exploits") |
| def search_exploits(cve_id: str) -> str: |
| """搜尋指定 CVE 的公開 Exploit 和 PoC 程式碼(透過 GitHub API)。 |
| 輸入單一 CVE ID(如 "CVE-2021-44228"),回傳公開 exploit 的數量、連結、星數等資訊。 |
| 有公開 exploit = 攻擊門檻極低 = 風險指標 HIGH。""" |
| return _search_exploits_impl(cve_id) |
|
|
| return search_exploits |
|
|
|
|
| |
|
|
| class _LazyToolLoader: |
| def __init__(self): |
| self._tool = None |
|
|
| def _load(self): |
| if self._tool is None: |
| self._tool = _create_tool() |
|
|
| @property |
| def search_exploits(self): |
| self._load() |
| return self._tool |
|
|
|
|
| _loader = _LazyToolLoader() |
|
|
|
|
| def __getattr__(name): |
| """模組層級 __getattr__,支援 from tools.exploit_tool import search_exploits""" |
| if name == "search_exploits": |
| return _loader.search_exploits |
| raise AttributeError(f"module 'tools.exploit_tool' has no attribute {name!r}") |
|
|