| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import json |
| import logging |
| import os |
| import time |
| from datetime import datetime, timezone |
|
|
| import requests |
|
|
logger = logging.getLogger("ThreatHunter.ghsa")  # child of the app-wide "ThreatHunter" logger
|
|
| |
| |
| |
|
|
# GitHub Advisory Database REST endpoint (works without auth, but rate-limited).
GHSA_REST_API = "https://api.github.com/advisories"
# Per-HTTP-request timeout in seconds.
REQUEST_TIMEOUT = 15


# On-disk JSON cache: <project root>/data/ghsa_cache.json
# (two dirname() calls — assumes this file sits one package level below the root; TODO confirm)
CACHE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data")
GHSA_CACHE_PATH = os.path.join(CACHE_DIR, "ghsa_cache.json")


# Cache entries older than this many hours are considered stale and refetched.
CACHE_TTL_HOURS = 12
|
|
| |
| SUPPORTED_ECOSYSTEMS = { |
| "python": "pip", |
| "pip": "pip", |
| "npm": "npm", |
| "node": "npm", |
| "javascript": "npm", |
| "go": "go", |
| "golang": "go", |
| "java": "maven", |
| "maven": "maven", |
| "ruby": "rubygems", |
| "rubygems": "rubygems", |
| "rust": "crates.io", |
| "cargo": "crates.io", |
| "php": "composer", |
| "composer": "composer", |
| "nuget": "nuget", |
| "dotnet": "nuget", |
| } |
|
|
| |
# Severity label -> numeric weight used when aggregating advisories
# (higher = more severe; UNKNOWN contributes nothing to the score).
SEVERITY_SCORE = {
    "CRITICAL": 1.0,
    "HIGH": 0.75,
    "MODERATE": 0.5,
    "LOW": 0.25,
    "UNKNOWN": 0.0,
}
|
|
|
|
| |
| |
| |
|
|
def _read_ghsa_cache() -> dict:
    """Load the GHSA cache file from disk.

    Format: {"pkg:ecosystem": {hits, severity, cves, _cached_at}}.
    A missing or unreadable file degrades to an empty cache.
    """
    try:
        if not os.path.exists(GHSA_CACHE_PATH):
            return {}
        with open(GHSA_CACHE_PATH, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except (json.JSONDecodeError, IOError) as exc:
        # Corrupt or unreadable cache is non-fatal: warn and start fresh.
        logger.warning("[WARN] GHSA cache read failed: %s", exc)
        return {}
|
|
|
|
def _write_ghsa_cache(cache: dict) -> None:
    """Persist the GHSA cache to disk, creating the data directory if needed."""
    try:
        os.makedirs(CACHE_DIR, exist_ok=True)
        with open(GHSA_CACHE_PATH, "w", encoding="utf-8") as fh:
            json.dump(cache, fh, indent=2, ensure_ascii=False)
    except (IOError, PermissionError) as exc:
        # Cache persistence is best-effort; a write failure must not crash callers.
        logger.warning("[WARN] GHSA cache write failed: %s", exc)
|
|
|
|
def _is_cache_fresh(cached_entry: dict) -> bool:
    """Return True while the entry is younger than CACHE_TTL_HOURS (12 h).

    An entry without a "_cached_at" timestamp is treated as infinitely old.
    """
    age_seconds = time.time() - cached_entry.get("_cached_at", 0)
    return age_seconds / 3600 < CACHE_TTL_HOURS
|
|
|
|
def _normalize_ecosystem(ecosystem: str) -> str:
    """Map a user-supplied ecosystem alias to the GitHub API ecosystem name.

    Unknown aliases fall through unchanged (lowercased).
    """
    key = ecosystem.lower()
    return SUPPORTED_ECOSYSTEMS.get(key, key)
|
|
|
|
| |
| |
| |
|
|
def _fetch_ghsa_rest(package_name: str, ecosystem: str, github_token: str = "") -> dict:
    """
    Query the GitHub Advisory Database REST API for a package's security advisories.

    REST API (no token required, but rate-limited):
        GET https://api.github.com/advisories?affects={pkg}&ecosystem={eco}&per_page=10

    Returns a summary dict (consumed by Intel Fusion):
        {
            "hits": 3,                     # number of GHSA advisories
            "max_severity": "HIGH",        # highest severity seen
            "severity_score": 0.75,        # numeric severity, for weighting
            "cve_ids": ["CVE-..."],        # CVEs already linked by GHSA
            "ghsa_ids": ["GHSA-..."],      # GHSA identifiers
            "published_since": "2024-...", # most recent advisory publish date
            "_source": "GHSA REST API",
        }

    On any failure (rate limit, timeout, non-JSON body) a zeroed summary is
    returned whose "_source" ends in "(failed)" so callers can detect it.
    """
    headers = {
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    if github_token:
        # Authenticated requests get 5,000 req/h instead of the anonymous 60.
        headers["Authorization"] = f"Bearer {github_token}"

    ecosystem_normalized = _normalize_ecosystem(ecosystem)
    try:
        logger.info("[QUERY] GHSA REST: %s (%s)", package_name, ecosystem_normalized)
        resp = requests.get(
            GHSA_REST_API,
            params={
                "affects": package_name,
                # GitHub documents the ecosystem enum in lowercase
                # ("pip", "npm", "go", ...). The previous .upper() sent
                # e.g. "PIP", risking a 422 rejection on every query.
                "ecosystem": ecosystem_normalized,
                "per_page": 10,
                "sort": "published",
                "direction": "desc",
            },
            headers=headers,
            timeout=REQUEST_TIMEOUT,
        )

        if resp.status_code == 200:
            advisories = resp.json()
            return _parse_ghsa_advisories(advisories, package_name, ecosystem_normalized)
        elif resp.status_code == 403:
            # Rate limited; log when the quota resets (epoch seconds).
            reset_ts = resp.headers.get("X-RateLimit-Reset", "")
            logger.warning("[WARN] GHSA API rate limited (403), reset at %s", reset_ts)
        elif resp.status_code == 422:
            # Ecosystem value rejected: retry once without the filter.
            logger.info("[INFO] GHSA API: ecosystem %s not recognized, retrying without", ecosystem_normalized)
            resp2 = requests.get(
                GHSA_REST_API,
                params={"affects": package_name, "per_page": 10},
                headers=headers,
                timeout=REQUEST_TIMEOUT,
            )
            if resp2.status_code == 200:
                return _parse_ghsa_advisories(resp2.json(), package_name, "unknown")
        else:
            logger.warning("[WARN] GHSA API returned %d for %s", resp.status_code, package_name)

    except requests.exceptions.Timeout:
        logger.warning("[WARN] GHSA API timeout for %s", package_name)
    except requests.exceptions.ConnectionError:
        logger.warning("[WARN] GHSA API connection failed (offline?)")
    except (json.JSONDecodeError, ValueError) as e:
        logger.warning("[WARN] GHSA API returned non-JSON for %s: %s", package_name, e)

    # Fallback: zeroed summary; the "(failed)" suffix marks it for callers.
    return {
        "hits": 0,
        "max_severity": "UNKNOWN",
        "severity_score": 0.0,
        "cve_ids": [],
        "ghsa_ids": [],
        "published_since": "",
        "_source": "GHSA REST API (failed)",
    }
|
|
|
|
def _parse_ghsa_advisories(advisories: list, package_name: str, ecosystem: str) -> dict:
    """
    Condense a list of GitHub advisories into the fields Intel Fusion needs.

    Each advisory returned by the GitHub Advisory API looks like:
        {
            "ghsa_id": "GHSA-xxxx-xxxx-xxxx",
            "severity": "HIGH",
            "cve_id": "CVE-2024-XXXX",   # may be null
            "published_at": "2024-04-01T...",
            "summary": "...",
            ...
        }
    """
    if not advisories:
        logger.info("[INFO] GHSA: no advisories found for %s", package_name)
        return {
            "hits": 0,
            "max_severity": "UNKNOWN",
            "severity_score": 0.0,
            "cve_ids": [],
            "ghsa_ids": [],
            "published_since": "",
            "_source": f"GHSA REST API (no results for {package_name})",
        }

    # Collect the interesting fields in one pass each; absent/null values
    # are filtered out (cve_id is null for GHSA-only advisories).
    ghsa_ids = [adv.get("ghsa_id", "") for adv in advisories if adv.get("ghsa_id", "")]
    cve_ids = [
        cid
        for cid in (adv.get("cve_id", "") for adv in advisories)
        if cid and cid.startswith("CVE-")
    ]
    severities = [(adv.get("severity") or "UNKNOWN").upper() for adv in advisories]
    published_dates = [
        pub for pub in (adv.get("published_at", "") for adv in advisories) if pub
    ]

    # Highest severity by numeric weight; unrecognized labels score 0 and
    # therefore never displace the "UNKNOWN" default.
    max_severity, max_score = "UNKNOWN", 0.0
    for label in severities:
        weight = SEVERITY_SCORE.get(label, 0.0)
        if weight > max_score:
            max_severity, max_score = label, weight

    # ISO-8601 timestamps compare correctly as strings, so max() gives the
    # most recent publish date.
    published_since = max(published_dates) if published_dates else ""

    hits = len(advisories)
    logger.info(
        "[OK] GHSA: %s (%s) → %d hits, max_severity=%s, CVEs=%s",
        package_name, ecosystem, hits, max_severity, cve_ids[:3],
    )

    return {
        "hits": hits,
        "max_severity": max_severity,
        "severity_score": round(max_score, 4),
        "cve_ids": cve_ids[:10],
        "ghsa_ids": ghsa_ids[:10],
        "published_since": published_since,
        "_source": "GHSA REST API (online)",
    }
|
|
|
|
def _query_ghsa_impl(package_query: str) -> str:
    """
    Core implementation of query_ghsa (decoupled from the CrewAI @tool
    wrapper for easy unit testing).

    Accepted input formats:
        - "django"        -> ecosystem inferred (defaults to pip)
        - "django:python" -> explicit ecosystem
        - "lodash:npm"    -> npm ecosystem

    Degradation strategy:
        1. Serve from cache when fresh (TTL 12 hours).
        2. Cache miss/stale -> query the GHSA REST API online.
        3. Online query failed -> fall back to the stale cache entry, if any.
        4. Nothing cached either -> return hits=0 (never crash).

    Args:
        package_query: package name, optionally suffixed ":ecosystem"

    Returns:
        JSON string in the format the Intel Fusion Agent expects
    """
    try:
        # --- parse "name[:ecosystem]" ---
        if ":" in package_query:
            parts = package_query.split(":", 1)
            package_name = parts[0].strip().lower()
            ecosystem = parts[1].strip().lower()
        else:
            package_name = package_query.strip().lower()
            # Crude name-based heuristic when no ecosystem is given.
            if package_name.endswith(".py") or package_name.startswith("py"):
                ecosystem = "pip"
            elif package_name.startswith("@") or "node" in package_name:
                ecosystem = "npm"
            else:
                ecosystem = "pip"

        if not package_name:
            return json.dumps({"error": "Empty package name", "hits": 0}, ensure_ascii=False, indent=2)

        cache_key = f"{package_name}:{ecosystem}"
        logger.info("[QUERY] GHSA check: %s (ecosystem=%s)", package_name, ecosystem)

        # --- 1. fresh cache hit ---
        cache = _read_ghsa_cache()
        if cache_key in cache and _is_cache_fresh(cache[cache_key]):
            cached = cache[cache_key]
            logger.info("[CACHE] GHSA cache hit: %s → hits=%d", cache_key, cached.get("hits", 0))
            return json.dumps({
                "package": package_name,
                "ecosystem": ecosystem,
                "query_time": datetime.now(timezone.utc).isoformat(),
                **{k: v for k, v in cached.items() if not k.startswith("_")},
                "source": cached.get("_source", "GHSA (cache)"),
            }, ensure_ascii=False, indent=2)

        # --- 2. cache miss or stale: query online ---
        github_token = os.getenv("GITHUB_TOKEN", "")
        result = _fetch_ghsa_rest(package_name, ecosystem, github_token)
        fetch_failed = result.get("_source", "").endswith("(failed)")

        # --- 3. online failure: serve the stale cache entry if one exists.
        # Previously the failed zero-result was written over the cache and the
        # documented stale-cache fallback never happened. ---
        if fetch_failed and cache_key in cache:
            stale = cache[cache_key]
            logger.info("[CACHE] GHSA online query failed, serving stale cache: %s", cache_key)
            return json.dumps({
                "package": package_name,
                "ecosystem": ecosystem,
                "query_time": datetime.now(timezone.utc).isoformat(),
                **{k: v for k, v in stale.items() if not k.startswith("_")},
                "source": stale.get("_source", "GHSA (cache)") + " [stale]",
            }, ensure_ascii=False, indent=2)

        # Persist only successful results so a transient outage cannot
        # clobber previously good data for the next 12 hours.
        if not fetch_failed:
            cache[cache_key] = {**result, "_cached_at": time.time()}
            _write_ghsa_cache(cache)

        logger.info(
            "[OK] GHSA: %s → hits=%d, severity=%s, cves=%s",
            package_name, result["hits"], result["max_severity"], result["cve_ids"][:3],
        )

        return json.dumps({
            "package": package_name,
            "ecosystem": ecosystem,
            "query_time": datetime.now(timezone.utc).isoformat(),
            "hits": result["hits"],
            "max_severity": result["max_severity"],
            "severity_score": result["severity_score"],
            "cve_ids": result["cve_ids"],
            "ghsa_ids": result["ghsa_ids"],
            "published_since": result["published_since"],
            "source": result.get("_source", "GHSA REST API"),
        }, ensure_ascii=False, indent=2)

    except Exception as e:
        # Last-resort guard (step 4): the tool must never crash the agent.
        logger.error("[FAIL] GHSA Tool unexpected error for %s: %s", package_query, e, exc_info=True)
        return json.dumps({
            "package": package_query,
            "hits": 0,
            "max_severity": "UNKNOWN",
            "severity_score": 0.0,
            "cve_ids": [],
            "ghsa_ids": [],
            "error": f"Unexpected error: {str(e)[:200]}",
            "source": "GHSA (error)",
        }, ensure_ascii=False, indent=2)
|
|
|
|
| |
| |
| |
|
|
def _create_tool():
    """Lazily build the CrewAI tool (imports crewai only when first needed)."""
    from crewai.tools import tool

    @tool("search_ghsa")
    def query_ghsa(package_query: str) -> str:
        # NOTE: this docstring doubles as the tool description CrewAI shows
        # the LLM; the previous version was encoding-corrupted in the file
        # and has been reconstructed in English.
        """Query the GitHub Advisory Database (GHSA) for a package's security advisories.
        Input format: package name, optionally with ":ecosystem" to pick the ecosystem
        (e.g. "django:python", "lodash:npm"). Defaults to Python/pip when omitted.
        Returns: GHSA hit count, highest severity, linked CVE IDs, and GHSA IDs.
        GHSA advantage: advisories typically land 2-4 weeks earlier than NVD, making this
        a useful complement during NVD backlog periods such as 2024.
        Supported ecosystems: pip (Python), npm (Node.js), go, maven (Java),
        rubygems (Ruby), crates.io (Rust).
        Note: without GITHUB_TOKEN the unauthenticated quota applies (60 requests/hour);
        setting GITHUB_TOKEN raises it to 5,000 requests/hour."""
        return _query_ghsa_impl(package_query)

    return query_ghsa
|
|
|
|
| |
|
|
| class _LazyToolLoader: |
| def __init__(self): |
| self._tool = None |
|
|
| def _load(self): |
| if self._tool is None: |
| self._tool = _create_tool() |
|
|
| @property |
| def query_ghsa(self): |
| self._load() |
| return self._tool |
|
|
|
|
# Module-level singleton; the CrewAI tool is built on first attribute access.
_loader = _LazyToolLoader()
|
|
|
|
| def __getattr__(name): |
| """ๆจก็ตๅฑค็ด __getattr__๏ผๆฏๆด from tools.ghsa_tool import query_ghsa""" |
| if name == "query_ghsa": |
| return _loader.query_ghsa |
| raise AttributeError(f"module 'tools.ghsa_tool' has no attribute {name!r}") |
|
|