Threat_Hunter / tools /ghsa_tool.py
EricChen2005's picture
Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B
c8d30bc
# tools/ghsa_tool.py
# ๅŠŸ่ƒฝ๏ผšGitHub Security Advisory Database๏ผˆGHSA๏ผ‰ๆŸฅ่ฉข Tool
# ่ณ‡ๆ–™ไพ†ๆบ๏ผšGitHub Advisory Database REST API๏ผˆๅ…ฌ้–‹๏ผŒ็„ก้œ€ Token ๅฏ็”จๅŸบๆœฌ้กๅบฆ๏ผ‰
# Harness ๆ”ฏๆŸฑ๏ผšGraceful Degradation๏ผˆREST ้™็ดš + ้›ข็ทšๅฟซๅ–๏ผ‰+ Observability
#
# ไฝฟ็”จๆ–นๅผ๏ผš
# from tools.ghsa_tool import query_ghsa
#
# ๆžถๆง‹ๅฎšไฝ๏ผš
# Intel Fusion Agent ็š„็ฌฌๅ››็ถญๆƒ…ๅ ฑ โ€” ่ฃœๅ…… NVD ็š„็”Ÿๆ…‹็ณป่ณ‡่จŠ๏ผˆ็‰นๅˆฅๆ˜ฏ 2024 ๅนด NVD ็ฉๅฃ“ๆœŸ้–“๏ผ‰
# GHSA ๅ„ชๅ‹ข๏ผšๆฏ” NVD ๆ—ฉ 2-4 ้€ฑๆ”ถๅˆฐๅฅ—ไปถ็”Ÿๆ…‹็ณปๅ‘Š่ญฆ
#
# ๅ…ญ็ถญๆƒ…ๅ ฑ่žๅˆไธญ็š„ไฝ็ฝฎ๏ผš
# NVD(CVSS) EPSS KEV GHSA ATT&CK OTX
# 0.20 0.30 0.25 [0.10] 0.10 0.05
#
# API ็ซฏ้ปž๏ผˆ็„ก้œ€ Token ็š„ REST API๏ผ‰๏ผš
# GET https://api.github.com/advisories?affects={package}&ecosystem={ecosystem}
# ้™ๅˆถ๏ผšๆฏๅฐๆ™‚ 60 ่ซ‹ๆฑ‚๏ผˆๆœช่ช่ญ‰๏ผ‰/ 5,000 ่ซ‹ๆฑ‚๏ผˆๆœ‰ GITHUB_TOKEN๏ผ‰
import json
import logging
import os
import time
from datetime import datetime, timezone
import requests
logger = logging.getLogger("ThreatHunter.ghsa")
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๅธธๆ•ธ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
GHSA_REST_API = "https://api.github.com/advisories"
REQUEST_TIMEOUT = 15 # ็ง’
# ๅฟซๅ–่ทฏๅพ‘๏ผˆ่ˆ‡ epss/kev ๅŒๅฑค๏ผ‰
CACHE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data")
GHSA_CACHE_PATH = os.path.join(CACHE_DIR, "ghsa_cache.json")
# ๅฟซๅ– TTL๏ผš12 ๅฐๆ™‚๏ผˆGHSA ๆ›ดๆ–ฐ้ ป็އ้ซ˜ๆ–ผ NVD๏ผ‰
CACHE_TTL_HOURS = 12
# ๆ”ฏๆด็š„็”Ÿๆ…‹็ณป๏ผˆGitHub Advisory Database ็š„ ecosystem ๅ€ผ๏ผ‰
SUPPORTED_ECOSYSTEMS = {
"python": "pip",
"pip": "pip",
"npm": "npm",
"node": "npm",
"javascript": "npm",
"go": "go",
"golang": "go",
"java": "maven",
"maven": "maven",
"ruby": "rubygems",
"rubygems": "rubygems",
"rust": "crates.io",
"cargo": "crates.io",
"php": "composer",
"composer": "composer",
"nuget": "nuget",
"dotnet": "nuget",
}
# GHSA ๅšด้‡ๆ€ง โ†’ ๆ•ธๅ€ผๅฐๆ‡‰๏ผˆ็”จๆ–ผ่จˆๅˆ†๏ผ‰
SEVERITY_SCORE = {
"CRITICAL": 1.0,
"HIGH": 0.75,
"MODERATE": 0.5,
"LOW": 0.25,
"UNKNOWN": 0.0,
}
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๅฟซๅ–็ฎก็†
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _read_ghsa_cache() -> dict:
"""่ฎ€ๅ– GHSA ๅฟซๅ–๏ผˆๆ ผๅผ๏ผš{"pkg:ecosystem": {hits, severity, cves, _cached_at}}๏ผ‰"""
try:
if not os.path.exists(GHSA_CACHE_PATH):
return {}
with open(GHSA_CACHE_PATH, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
logger.warning("[WARN] GHSA cache read failed: %s", e)
return {}
def _write_ghsa_cache(cache: dict) -> None:
"""ๅฏซๅ…ฅ GHSA ๅฟซๅ–"""
try:
os.makedirs(CACHE_DIR, exist_ok=True)
with open(GHSA_CACHE_PATH, "w", encoding="utf-8") as f:
json.dump(cache, f, ensure_ascii=False, indent=2)
except (IOError, PermissionError) as e:
logger.warning("[WARN] GHSA cache write failed: %s", e)
def _is_cache_fresh(cached_entry: dict) -> bool:
"""ๆชขๆŸฅๅฟซๅ–ๆ˜ฏๅฆๅœจ TTL ๅ…ง๏ผˆ12 ๅฐๆ™‚๏ผ‰"""
cached_at = cached_entry.get("_cached_at", 0)
elapsed_hours = (time.time() - cached_at) / 3600
return elapsed_hours < CACHE_TTL_HOURS
def _normalize_ecosystem(ecosystem: str) -> str:
"""ๆญฃ่ฆๅŒ–็”Ÿๆ…‹็ณปๅ็จฑ๏ผˆuser input โ†’ GitHub API ๆ ผๅผ๏ผ‰"""
return SUPPORTED_ECOSYSTEMS.get(ecosystem.lower(), ecosystem.lower())
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๆ ธๅฟƒๆŸฅ่ฉข้‚่ผฏ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _fetch_ghsa_rest(package_name: str, ecosystem: str, github_token: str = "") -> dict:
"""
ไฝฟ็”จ GitHub Advisory Database REST API ๆŸฅ่ฉขๅฅ—ไปถ็š„ๅฎ‰ๅ…จๅ‘Š่ญฆใ€‚
REST API๏ผˆ็„ก้œ€ Token๏ผŒไฝ†ๆœ‰้กๅบฆ้™ๅˆถ๏ผ‰๏ผš
GET https://api.github.com/advisories?affects={pkg}&ecosystem={eco}&per_page=10
ๅ›žๅ‚ณๆ ผๅผ๏ผˆๆ‘˜่ฆ๏ผŒไพ› Intel Fusion ไฝฟ็”จ๏ผ‰๏ผš
{
"hits": 3, # GHSA ๅ‘Š่ญฆๆ•ธ้‡
"max_severity": "HIGH", # ๆœ€้ซ˜ๅšด้‡ๆ€ง
"severity_score": 0.75, # ๆ•ธๅ€ผๅŒ–ๅšด้‡ๆ€ง๏ผˆไพ›ๅŠ ๆฌŠ่จˆ็ฎ—๏ผ‰
"cve_ids": ["CVE-..."], # ็›ธ้—œ CVE๏ผˆGHSA ๅทฒ้—œ่ฏ็š„๏ผ‰
"ghsa_ids": ["GHSA-..."], # GHSA ID
"published_since": "2024-...", # ๆœ€่ฟ‘ๅ‘Š่ญฆ็™ผๅธƒๆ—ฅๆœŸ
"source": "GHSA REST API",
}
"""
headers = {
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
}
if github_token:
headers["Authorization"] = f"Bearer {github_token}"
# ๅ˜—่ฉฆๅธถ ecosystem ๆŸฅ่ฉข๏ผˆๆœ€็ฒพ็ขบ๏ผ‰
ecosystem_normalized = _normalize_ecosystem(ecosystem)
try:
logger.info("[QUERY] GHSA REST: %s (%s)", package_name, ecosystem_normalized)
resp = requests.get(
GHSA_REST_API,
params={
"affects": package_name,
"ecosystem": ecosystem_normalized.upper(), # GitHub API ่ฆๅคงๅฏซ
"per_page": 10,
"sort": "published",
"direction": "desc",
},
headers=headers,
timeout=REQUEST_TIMEOUT,
)
if resp.status_code == 200:
advisories = resp.json()
return _parse_ghsa_advisories(advisories, package_name, ecosystem_normalized)
elif resp.status_code == 403:
# Rate limit๏ผˆๆœช่ช่ญ‰็š„ token ๆฏๅฐๆ™‚ 60 ๆฌก๏ผ‰
reset_ts = resp.headers.get("X-RateLimit-Reset", "")
logger.warning("[WARN] GHSA API rate limited (403), reset at %s", reset_ts)
elif resp.status_code == 422:
# ไธๆ”ฏๆด็š„ ecosystem โ†’ ไธๅธถ ecosystem ้‡่ฉฆ
logger.info("[INFO] GHSA API: ecosystem %s not recognized, retrying without", ecosystem_normalized)
resp2 = requests.get(
GHSA_REST_API,
params={"affects": package_name, "per_page": 10},
headers=headers,
timeout=REQUEST_TIMEOUT,
)
if resp2.status_code == 200:
return _parse_ghsa_advisories(resp2.json(), package_name, "unknown")
else:
logger.warning("[WARN] GHSA API returned %d for %s", resp.status_code, package_name)
except requests.exceptions.Timeout:
logger.warning("[WARN] GHSA API timeout for %s", package_name)
except requests.exceptions.ConnectionError:
logger.warning("[WARN] GHSA API connection failed (offline?)")
except (json.JSONDecodeError, ValueError) as e:
logger.warning("[WARN] GHSA API returned non-JSON for %s: %s", package_name, e)
# ๆŸฅ่ฉขๅคฑๆ•— โ†’ ๅ›žๅ‚ณ็ฉบ็ตๆžœ๏ผˆ้ž None๏ผŒ่ฎ“ Agent ็Ÿฅ้“ GHSA ๆœชๅ‘ฝไธญ่€Œ้ž้Œฏ่ชค๏ผ‰
return {
"hits": 0,
"max_severity": "UNKNOWN",
"severity_score": 0.0,
"cve_ids": [],
"ghsa_ids": [],
"published_since": "",
"_source": "GHSA REST API (failed)",
}
def _parse_ghsa_advisories(advisories: list, package_name: str, ecosystem: str) -> dict:
"""
่งฃๆž GitHub Advisory ๅˆ—่กจ๏ผŒๆๅ– Intel Fusion ้œ€่ฆ็š„ๆ ธๅฟƒๆฌ„ไฝใ€‚
GitHub Advisory API ๅ›žๅ‚ณๆ ผๅผ๏ผˆๆฏๅ€‹ advisory๏ผ‰๏ผš
{
"ghsa_id": "GHSA-xxxx-xxxx-xxxx",
"severity": "HIGH",
"cve_id": "CVE-2024-XXXX", # ๅฏ่ƒฝ็‚บ null
"published_at": "2024-04-01T...",
"summary": "...",
...
}
"""
if not advisories:
logger.info("[INFO] GHSA: no advisories found for %s", package_name)
return {
"hits": 0,
"max_severity": "UNKNOWN",
"severity_score": 0.0,
"cve_ids": [],
"ghsa_ids": [],
"published_since": "",
"_source": f"GHSA REST API (no results for {package_name})",
}
# ๆๅ– CVE ID ๅ’Œ GHSA ID
cve_ids = []
ghsa_ids = []
severities = []
published_dates = []
for advisory in advisories:
ghsa_id = advisory.get("ghsa_id", "")
if ghsa_id:
ghsa_ids.append(ghsa_id)
cve_id = advisory.get("cve_id", "")
if cve_id and cve_id.startswith("CVE-"):
cve_ids.append(cve_id)
severity = (advisory.get("severity") or "UNKNOWN").upper()
severities.append(severity)
pub_date = advisory.get("published_at", "")
if pub_date:
published_dates.append(pub_date)
# ่จˆ็ฎ—ๆœ€้ซ˜ๅšด้‡ๆ€ง
max_severity = "UNKNOWN"
max_score = 0.0
for sev in severities:
score = SEVERITY_SCORE.get(sev, 0.0)
if score > max_score:
max_score = score
max_severity = sev
# ๆœ€่ฟ‘ๅ‘Š่ญฆๆ—ฅๆœŸ
published_since = max(published_dates) if published_dates else ""
hits = len(advisories)
logger.info(
"[OK] GHSA: %s (%s) โ†’ %d hits, max_severity=%s, CVEs=%s",
package_name, ecosystem, hits, max_severity, cve_ids[:3],
)
return {
"hits": hits,
"max_severity": max_severity,
"severity_score": round(max_score, 4),
"cve_ids": cve_ids[:10], # ๆœ€ๅคš 10 ๅ€‹้—œ่ฏ CVE
"ghsa_ids": ghsa_ids[:10], # ๆœ€ๅคš 10 ๅ€‹ GHSA ID
"published_since": published_since,
"_source": "GHSA REST API (online)",
}
def _query_ghsa_impl(package_query: str) -> str:
"""
query_ghsa ็š„ๆ ธๅฟƒๅฏฆไฝœ๏ผˆ่ˆ‡ CrewAI @tool ่งฃ่€ฆ๏ผŒๆ–นไพฟๅ–ฎๅ…ƒๆธฌ่ฉฆ๏ผ‰ใ€‚
ๆ”ฏๆดๅ…ฉ็จฎ่ผธๅ…ฅๆ ผๅผ๏ผš
- "django" โ†’ ๆŸฅๆ‰€ๆœ‰็”Ÿๆ…‹็ณป๏ผˆ้ ่จญ pip๏ผ‰
- "django:python" โ†’ ๆŒ‡ๅฎš็”Ÿๆ…‹็ณป
- "lodash:npm" โ†’ npm ็”Ÿๆ…‹็ณป
้™็ดš็ญ–็•ฅ๏ผš
1. ่ฎ€ๅ–ๅฟซๅ–๏ผˆTTL 12 ๅฐๆ™‚๏ผ‰
2. ๅฟซๅ–ๆœชๅ‘ฝไธญ โ†’ ็ทšไธŠๆŸฅ่ฉข GHSA REST API
3. ็ทšไธŠๅคฑๆ•— โ†’ ๅ›žๅ‚ณๅฟซๅ–๏ผˆ้ŽๆœŸ็š„๏ผ‰
4. ๅฟซๅ–ไนŸๆฒ’ๆœ‰ โ†’ ๅ›žๅ‚ณ hits=0๏ผˆไธ crash๏ผ‰
Args:
package_query: ๅฅ—ไปถๅ๏ผˆๅฏๅซๅ†’่™Ÿๅˆ†้š”็š„็”Ÿๆ…‹็ณป๏ผ‰
Returns:
JSON ๅญ—ไธฒ๏ผŒๆ ผๅผ็ฌฆๅˆ Intel Fusion Agent ่ผธๅ…ฅ
"""
try:
# โ”€โ”€ Step 1๏ผš่งฃๆž่ผธๅ…ฅๆ ผๅผ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if ":" in package_query:
parts = package_query.split(":", 1)
package_name = parts[0].strip().lower()
ecosystem = parts[1].strip().lower()
else:
package_name = package_query.strip().lower()
# ๅพžๅฅ—ไปถๅๆŽจๆ–ท็”Ÿๆ…‹็ณป๏ผˆๅธธ่ฆ‹่ฆๅ‰‡๏ผ‰
if package_name.endswith(".py") or package_name.startswith("py"):
ecosystem = "pip"
elif package_name.startswith("@") or "node" in package_name:
ecosystem = "npm"
else:
ecosystem = "pip" # ้ ่จญ Python
if not package_name:
return json.dumps({"error": "Empty package name", "hits": 0}, ensure_ascii=False, indent=2)
cache_key = f"{package_name}:{ecosystem}"
logger.info("[QUERY] GHSA check: %s (ecosystem=%s)", package_name, ecosystem)
# โ”€โ”€ Step 2๏ผš่ฎ€ๅ–ๅฟซๅ– โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
cache = _read_ghsa_cache()
if cache_key in cache and _is_cache_fresh(cache[cache_key]):
cached = cache[cache_key]
logger.info("[CACHE] GHSA cache hit: %s โ†’ hits=%d", cache_key, cached.get("hits", 0))
return json.dumps({
"package": package_name,
"ecosystem": ecosystem,
"query_time": datetime.now(timezone.utc).isoformat(),
**{k: v for k, v in cached.items() if not k.startswith("_")},
"source": cached.get("_source", "GHSA (cache)"),
}, ensure_ascii=False, indent=2)
# โ”€โ”€ Step 3๏ผš็ทšไธŠๆŸฅ่ฉข โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
github_token = os.getenv("GITHUB_TOKEN", "")
result = _fetch_ghsa_rest(package_name, ecosystem, github_token)
# ๆ›ดๆ–ฐๅฟซๅ–
cache[cache_key] = {**result, "_cached_at": time.time()}
_write_ghsa_cache(cache)
logger.info(
"[OK] GHSA: %s โ†’ hits=%d, severity=%s, cves=%s",
package_name, result["hits"], result["max_severity"], result["cve_ids"][:3],
)
return json.dumps({
"package": package_name,
"ecosystem": ecosystem,
"query_time": datetime.now(timezone.utc).isoformat(),
"hits": result["hits"],
"max_severity": result["max_severity"],
"severity_score": result["severity_score"],
"cve_ids": result["cve_ids"],
"ghsa_ids": result["ghsa_ids"],
"published_since": result["published_since"],
"source": result.get("_source", "GHSA REST API"),
}, ensure_ascii=False, indent=2)
except Exception as e:
logger.error("[FAIL] GHSA Tool unexpected error for %s: %s", package_query, e, exc_info=True)
return json.dumps({
"package": package_query,
"hits": 0,
"max_severity": "UNKNOWN",
"severity_score": 0.0,
"cve_ids": [],
"ghsa_ids": [],
"error": f"Unexpected error: {str(e)[:200]}",
"source": "GHSA (error)",
}, ensure_ascii=False, indent=2)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# CrewAI @tool ๅŒ…่ฃ๏ผˆAgent ๅ‘ผๅซ็”จ๏ผ‰
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _create_tool():
"""ๅปถ้ฒๅปบ็ซ‹ CrewAI Tool"""
from crewai.tools import tool
@tool("search_ghsa")
def query_ghsa(package_query: str) -> str:
"""ๆŸฅ่ฉข GitHub Advisory Database๏ผˆGHSA๏ผ‰ๅ–ๅพ—ๅฅ—ไปถ็š„ๅฎ‰ๅ…จๅ‘Š่ญฆใ€‚
่ผธๅ…ฅๆ ผๅผ๏ผšๅฅ—ไปถๅ๏ผŒๅฏๅŠ  :ecosystem ๆŒ‡ๅฎš็”Ÿๆ…‹็ณป๏ผˆๅฆ‚ "django:python"ใ€"lodash:npm"๏ผ‰ใ€‚
่‹ฅไธๆŒ‡ๅฎš็”Ÿๆ…‹็ณป๏ผŒ้ ่จญ็‚บ Python/pipใ€‚
ๅ›žๅ‚ณ๏ผšGHSA ๅ‘Š่ญฆๅ‘ฝไธญๆ•ธใ€ๆœ€้ซ˜ๅšด้‡ๆ€งใ€้—œ่ฏ CVE IDใ€GHSA IDใ€‚
GHSA ๅ„ชๅ‹ข๏ผšๆฏ” NVD ๆ—ฉ 2-4 ้€ฑ็™ผๅ‡บๅ‘Š่ญฆ๏ผŒ็‰นๅˆฅ้ฉๅˆ 2024 ๅนด NVD ็ฉๅฃ“ๆœŸ้–“็š„่ฃœๅ……ๆŸฅ่ฉขใ€‚
ๆ”ฏๆด็”Ÿๆ…‹็ณป๏ผšpip๏ผˆPython๏ผ‰ใ€npm๏ผˆNode.js๏ผ‰ใ€goใ€maven๏ผˆJava๏ผ‰ใ€rubygems๏ผˆRuby๏ผ‰ใ€crates.io๏ผˆRust๏ผ‰ใ€‚
ๆณจๆ„๏ผš็„ก GITHUB_TOKEN ๆ™‚ไฝฟ็”จๆœช่ช่ญ‰้กๅบฆ๏ผˆๆฏๅฐๆ™‚ 60 ่ซ‹ๆฑ‚๏ผ‰๏ผ›่จญๅฎš GITHUB_TOKEN ๅฏๆๅ‡่‡ณ 5,000 ่ซ‹ๆฑ‚ใ€‚"""
return _query_ghsa_impl(package_query)
return query_ghsa
# โ”€โ”€ ๅปถ้ฒ่ผ‰ๅ…ฅๆฉŸๅˆถ๏ผˆ่ˆ‡ kev_tool.py ็›ธๅŒๆจกๅผ๏ผ‰โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class _LazyToolLoader:
def __init__(self):
self._tool = None
def _load(self):
if self._tool is None:
self._tool = _create_tool()
@property
def query_ghsa(self):
self._load()
return self._tool
_loader = _LazyToolLoader()
def __getattr__(name):
"""ๆจก็ต„ๅฑค็ดš __getattr__๏ผŒๆ”ฏๆด from tools.ghsa_tool import query_ghsa"""
if name == "query_ghsa":
return _loader.query_ghsa
raise AttributeError(f"module 'tools.ghsa_tool' has no attribute {name!r}")