# tools/otx_tool.py
# Purpose: AlienVault OTX threat-intelligence lookup Tool
# Harness pillars: Graceful Degradation (fallback waterfall) + Observability (logging)
# Owner: Member B (Scout Agent Pipeline)
#
# Usage:
#   from tools.otx_tool import search_otx
#
# Architectural role:
#   The Scout Agent's "second hand" - responsible for querying OTX threat intelligence.
#   Only called when the Agent judges CVSS >= 7.0 (guided by the Skill SOP).
import json
import os
import time
import hashlib
import logging
from datetime import datetime, timezone, timedelta
import requests
logger = logging.getLogger("ThreatHunter")

# ──────────────────────────────────────────────────────────────
# Constants
# ──────────────────────────────────────────────────────────────
OTX_API_BASE = "https://otx.alienvault.com/api/v1"
OTX_SEARCH_ENDPOINT = f"{OTX_API_BASE}/search/pulses"
RESULTS_LIMIT = 10
REQUEST_TIMEOUT = 20 # seconds

# Rate-limit control (OTX is fairly lenient: 10,000 req/hr)
RATE_LIMIT_INTERVAL = 1.0 # conservative minimum gap between requests, in seconds
MAX_RETRIES = 2

# Offline cache
CACHE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data")
CACHE_TTL = 3600 * 12 # entries expire after 12 hours (OTX data is updated fairly often)

# Activity verdict
ACTIVE_THRESHOLD_DAYS = 90 # a new pulse within the last 90 days -> active
ACTIVE_PULSE_COUNT = 3 # pulse count >= 3 -> more likely active

# Timestamp of the previous request (read and updated by _rate_limit)
_last_request_time = 0.0
# ──────────────────────────────────────────────────────────────
# Helper functions
# ──────────────────────────────────────────────────────────────
def _get_cache_path(package_name: str) -> str:
    """Return the path of the offline cache file for ``package_name``.

    The file name is ``otx_cache_<readable>_<digest>.json`` inside
    ``CACHE_DIR``:

    * ``<readable>`` is the package name with every character that is not an
      ASCII letter, digit, ``.``, ``_`` or ``-`` replaced by ``_`` and then
      truncated to 64 characters, so the result is always a single, valid
      path component (no ``/``, ``\\`` or other separators can leak in).
    * ``<digest>`` is the first 12 hex characters of the MD5 of the exact
      package name; it keeps names that sanitize to the same text apart.
      MD5 is used only as a file-name fingerprint, not for security.

    Names that already consist of safe characters map to the same path as
    before, so existing cache files remain valid.

    Args:
        package_name: Normalized package name used as the cache key.

    Returns:
        Path of the cache file (the file is not created here).
    """
    digest = hashlib.md5(package_name.encode()).hexdigest()[:12]
    # Package names can contain path separators (e.g. "@scope/pkg") or
    # traversal sequences; never let them reach the file system verbatim.
    readable = "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "._-" else "_"
        for ch in package_name
    )[:64]
    return os.path.join(CACHE_DIR, f"otx_cache_{readable}_{digest}.json")
def _read_cache(package_name: str) -> dict | None:
    """Read the offline cache entry for ``package_name``.

    Args:
        package_name: Normalized package name used as the cache key.

    Returns:
        The cached dict (still containing its ``_cached_at`` stamp) when the
        file exists, holds a JSON object and is younger than ``CACHE_TTL``
        seconds; otherwise ``None``. Read/parse problems are logged as
        warnings and never raised.
    """
    cache_path = _get_cache_path(package_name)
    try:
        # Open directly instead of checking os.path.exists() first: the file
        # may vanish between the check and the open.
        with open(cache_path, "r", encoding="utf-8") as f:
            cached = json.load(f)
    except FileNotFoundError:
        # A missing cache file is the normal "no cache yet" case.
        return None
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("[WARN] OTX cache read failed: %s", e)
        return None

    if not isinstance(cached, dict):
        # A truncated or hand-edited file may contain valid JSON of another type.
        logger.warning(
            "[WARN] OTX cache read failed: %s",
            f"expected a JSON object, got {type(cached).__name__}",
        )
        return None

    cached_time = cached.get("_cached_at", 0)
    if isinstance(cached_time, bool) or not isinstance(cached_time, (int, float)):
        # An unusable timestamp is treated like a missing one (epoch 0).
        cached_time = 0

    if time.time() - cached_time < CACHE_TTL:
        logger.info("[OK] OTX cache hit: %s", package_name)
        return cached

    logger.info("[INFO] OTX cache expired: %s", package_name)
    return None
def _write_cache(package_name: str, data: dict) -> None:
    """Write ``data`` to the offline cache for ``package_name``.

    The stored JSON is ``data`` plus a ``_cached_at`` Unix timestamp. The
    timestamp is added to a copy, so the caller's dict is left untouched and
    the bookkeeping key cannot leak into results the caller goes on to use.

    Caching is best-effort: file-system and serialization errors are logged
    as warnings and never raised.

    Args:
        package_name: Normalized package name used as the cache key.
        data: Result dict to persist.
    """
    try:
        os.makedirs(CACHE_DIR, exist_ok=True)
        cache_path = _get_cache_path(package_name)
        payload = {**data, "_cached_at": time.time()}
        with open(cache_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
    except (OSError, TypeError, ValueError) as e:
        # OSError covers IOError/PermissionError; TypeError/ValueError cover
        # a non-mapping or non-serializable payload.
        logger.warning("[WARN] OTX cache write failed: %s", e)
def _rate_limit() -> None:
    """Block until at least ``RATE_LIMIT_INTERVAL`` seconds separate requests.

    Reads and updates the module-level ``_last_request_time``. The wait is
    clamped to the range ``[0, RATE_LIMIT_INTERVAL]``: if the wall clock is
    stepped backwards (NTP correction, manual change) the elapsed time turns
    negative, and without the clamp the sleep would grow by the size of that
    step.
    """
    global _last_request_time
    elapsed = time.time() - _last_request_time
    wait = min(max(RATE_LIMIT_INTERVAL - elapsed, 0.0), RATE_LIMIT_INTERVAL)
    if wait > 0:
        time.sleep(wait)
    _last_request_time = time.time()
def _determine_threat_level(pulse_count: int, pulses: list) -> str:
"""
ๆ นๆ pulse ๆธ้ๅๆ้ๅคๅฎๅจ่
็ญ็ดใ
่ฆๅ๏ผ่ฆ architecture_spec.md ยง4.2๏ผ๏ผ
pulse_count >= 3 ไธๆ่ฟ 90 ๅคฉๆๆฐ pulse โ "active"
pulse_count >= 1 ไฝ้ฝ่ถ
้ 90 ๅคฉ โ "inactive"
pulse_count == 0 โ "unknown"
"""
if pulse_count == 0:
return "unknown"
cutoff = datetime.now(timezone.utc) - timedelta(days=ACTIVE_THRESHOLD_DAYS)
has_recent = False
for pulse in pulses:
created_str = pulse.get("created", "")
try:
# OTX ๆ้ๆ ผๅผ๏ผ2024-08-10T12:00:00.000000 ๆ 2024-08-10T12:00:00
created_str_clean = created_str.replace("Z", "+00:00")
if "." in created_str_clean and "+" not in created_str_clean.split(".")[-1]:
created_str_clean = created_str_clean.split(".")[0] + "+00:00"
elif "+" not in created_str_clean and "-" not in created_str_clean[10:]:
created_str_clean += "+00:00"
created = datetime.fromisoformat(created_str_clean)
if created.tzinfo is None:
created = created.replace(tzinfo=timezone.utc)
if created > cutoff:
has_recent = True
break
except (ValueError, TypeError):
continue
if pulse_count >= ACTIVE_PULSE_COUNT and has_recent:
return "active"
elif has_recent:
return "active" # ๅณไฝฟ pulse ๅฐ๏ผๆ่ฟๆๆดปๅไน็ฎ active
else:
return "inactive"
def _parse_pulse(pulse: dict) -> dict:
    """Convert one raw OTX pulse into the Tool's output format.

    Every field is read defensively: a missing key, a ``null`` value or a
    value of an unexpected type falls back to an empty default instead of
    raising.

    Args:
        pulse: Raw pulse dict from the OTX response.

    Returns:
        A dict with the keys:
            * ``name``: pulse name, truncated to 200 characters.
            * ``description``: description, truncated to 300 characters.
            * ``created``: date part of the ``created`` timestamp (the text
              before the first ``T``).
            * ``tags``: at most the first 10 tags.
            * ``indicator_count``: length of the ``indicators`` list, or 0.
    """
    if not isinstance(pulse, dict):
        pulse = {}

    def text_field(key: str) -> str:
        value = pulse.get(key)
        return value if isinstance(value, str) else ""

    indicators = pulse.get("indicators")
    tags = pulse.get("tags")

    return {
        "name": text_field("name")[:200],
        "description": text_field("description")[:300],
        "created": text_field("created").split("T", 1)[0],
        "tags": tags[:10] if isinstance(tags, list) else [],
        "indicator_count": len(indicators) if isinstance(indicators, list) else 0,
    }
# ──────────────────────────────────────────────────────────────
# Core query logic
# ──────────────────────────────────────────────────────────────
def _query_otx_api(keyword: str) -> dict | None:
    """Call the OTX pulse-search endpoint and return the decoded JSON object.

    The request is attempted up to ``MAX_RETRIES`` times, each attempt going
    through ``_rate_limit()`` first. ``OTX_API_KEY`` from the environment is
    sent as ``X-OTX-API-KEY`` when it is set.

    Per-attempt handling:
        * 200 -> return the JSON body; a body that is not valid JSON or not a
          JSON object is logged and treated as a failure (``None``).
        * 403 -> give up immediately (an API key is needed).
        * 429 / 5xx -> back off (5 s / 2 s) and retry; the back-off is
          skipped after the final attempt, where nothing would follow it.
        * timeout / connection error -> retry.
        * any other status or request error -> give up.

    Args:
        keyword: Search keyword (the normalized package name).

    Returns:
        The response dict on success, ``None`` on any failure. This function
        does not raise for HTTP, network or decoding problems.
    """
    api_key = os.getenv("OTX_API_KEY", "")
    headers = {
        "Accept": "application/json",
    }
    if api_key:
        headers["X-OTX-API-KEY"] = api_key
    params = {
        "q": keyword,
        "limit": RESULTS_LIMIT,
    }

    for attempt in range(1, MAX_RETRIES + 1):
        has_retry_left = attempt < MAX_RETRIES
        _rate_limit()
        logger.info("[QUERY] OTX API: %s (attempt %d)", keyword, attempt)
        try:
            response = requests.get(
                OTX_SEARCH_ENDPOINT,
                params=params,
                headers=headers,
                timeout=REQUEST_TIMEOUT,
            )
        except requests.exceptions.Timeout:
            logger.warning("[WARN] OTX API timeout (%ds)", REQUEST_TIMEOUT)
            continue
        except requests.exceptions.ConnectionError:
            logger.warning("[WARN] OTX API connection failed (network issue)")
            continue
        except requests.exceptions.RequestException as e:
            logger.warning("[WARN] OTX API request error: %s", e)
            return None

        status = response.status_code
        if status == 200:
            try:
                payload = response.json()
            except ValueError as e:
                # Catch ValueError explicitly so a malformed body can never
                # escape and bypass the caller's cache fallback.
                logger.warning("[WARN] OTX API returned invalid JSON: %s", e)
                return None
            if not isinstance(payload, dict):
                logger.warning(
                    "[WARN] OTX API returned unexpected JSON type: %s",
                    type(payload).__name__,
                )
                return None
            return payload

        if status == 403:
            logger.warning("[WARN] OTX API 403 (unauthorized) -- API Key needed")
            return None

        if status == 429:
            logger.warning("[WARN] OTX API 429 (rate limited)")
            if has_retry_left:
                time.sleep(5)
            continue

        if status >= 500:
            logger.warning("[WARN] OTX API %d (server error)", status)
            if has_retry_left:
                time.sleep(2)
            continue

        logger.warning("[WARN] OTX API returned %d: %s", status, response.text[:200])
        return None

    return None
def _parse_otx_response(raw: dict, package_name: str) -> dict:
    """Convert a raw OTX search response into the Tool's output format.

    Mapping (per pulse, performed by ``_parse_pulse``):
        response.results[].name         -> pulses[].name
        response.results[].description  -> pulses[].description
        response.results[].created      -> pulses[].created
        response.results[].indicators   -> pulses[].indicator_count
        response.results[].tags         -> pulses[].tags
        number of usable results        -> pulse_count

    A missing or non-list ``results`` value is treated as "no pulses", and
    entries inside ``results`` that are not dicts are skipped, so a
    malformed response degrades to fewer pulses instead of raising.

    Args:
        raw: Decoded JSON response from the OTX API.
        package_name: Package name to echo back in the ``package`` field.

    Returns:
        Dict with ``package``, ``source`` (always ``"OTX"``), ``pulse_count``,
        ``threat_level`` and ``pulses``.
    """
    results = raw.get("results", []) if isinstance(raw, dict) else []
    if not isinstance(results, list):
        results = []
    usable = [entry for entry in results if isinstance(entry, dict)]

    pulses = [_parse_pulse(entry) for entry in usable]
    pulse_count = len(pulses)
    # The verdict needs the full ``created`` timestamps, so it is computed
    # from the raw entries rather than from the parsed (date-only) pulses.
    threat_level = _determine_threat_level(pulse_count, usable)

    return {
        "package": package_name,
        "source": "OTX",
        "pulse_count": pulse_count,
        "threat_level": threat_level,
        "pulses": pulses,
    }
def _unknown_result_json(package_name, error: str) -> str:
    """Serialize the safe default result (no pulses, ``threat_level`` unknown)."""
    return json.dumps(
        {
            "package": package_name,
            "source": "OTX",
            "pulse_count": 0,
            "threat_level": "unknown",
            "pulses": [],
            "error": error,
            "fallback_used": False,
        },
        ensure_ascii=False,
        indent=2,
        # Deliberate: this is the last line of defense, so even a
        # non-serializable ``package_name`` must not make it raise.
        default=str,
    )


def _search_otx_impl(package_name: str) -> str:
    """Core implementation of ``search_otx`` (kept separate from the CrewAI
    ``@tool`` wrapper so it can be unit-tested directly).

    Fallback waterfall:
        1. Query the OTX API.
        2. API failed -> read the offline cache.
        3. No usable cache -> return ``threat_level: "unknown"``.
        4. Any unexpected error -> return the same safe default result
           (this function never raises).

    The package name is normalized to its first whitespace-separated token,
    lower-cased. An empty / whitespace-only name returns the default result
    without contacting the API.

    Args:
        package_name: Package name as supplied by the agent.

    Returns:
        A JSON string with ``package``, ``source``, ``pulse_count``,
        ``threat_level`` and ``pulses``; fallback and error results also
        carry ``error`` and ``fallback_used``.
    """
    try:
        # Normalize the package name.
        tokens = package_name.strip().lower().split()
        if not tokens:
            logger.warning("[WARN] OTX query skipped: empty package name")
            return _unknown_result_json(package_name, "Empty package name")
        name = tokens[0]
        logger.info("[QUERY] OTX package: %s", name)

        # Try the live API first.
        raw = _query_otx_api(name)
        if raw is not None:
            result = _parse_otx_response(raw, package_name)
            # Hand the cache its own copy so cache bookkeeping (such as a
            # ``_cached_at`` stamp) can never leak into the JSON returned
            # to the agent.
            _write_cache(name, dict(result))
            logger.info(
                "[OK] OTX query success: %s -> %d pulses, threat_level=%s",
                package_name, result['pulse_count'], result['threat_level']
            )
            return json.dumps(result, ensure_ascii=False, indent=2)

        # API failed -> try the offline cache.
        cached = _read_cache(name)
        if cached:
            cached.pop("_cached_at", None)
            cached["fallback_used"] = True
            cached["error"] = f"OTX API unavailable, using cached data for '{name}'"
            logger.info("[OK] OTX using cache: %s", name)
            return json.dumps(cached, ensure_ascii=False, indent=2)

        # No data at all.
        logger.info("[INFO] OTX no data for: %s", package_name)
        return _unknown_result_json(
            package_name, f"OTX API unavailable and no cache for '{name}'"
        )

    except Exception as e:
        # Last line of defense: report the problem, never crash the agent.
        logger.error("[FAIL] OTX Tool unexpected error: %s", e, exc_info=True)
        return _unknown_result_json(package_name, f"Unexpected error: {str(e)}")
# ──────────────────────────────────────────────────────────────
# CrewAI @tool wrapper (called by the Agent)
# ──────────────────────────────────────────────────────────────
def _create_tool():
    """Build the CrewAI ``search_otx`` tool.

    ``crewai`` is imported inside this function rather than at module level,
    so it is only imported when the tool is actually built.

    Returns:
        The object produced by applying ``crewai.tools.tool("search_otx")``
        to the inner ``search_otx`` function.
    """
    from crewai.tools import tool

    # NOTE(review): the inner docstring is presumably what CrewAI exposes to
    # the agent as the tool description (confirm against the crewai.tools
    # documentation), i.e. it is runtime text. It is therefore kept in its
    # original language. In English it reads:
    #   Query AlienVault OTX for active threat intelligence on a package.
    #   Input: a package name (e.g. django, redis); returns that package's
    #   threat-intelligence pulse list (JSON format), including the activity
    #   verdict (active/inactive/unknown), pulse names, descriptions, IOC
    #   counts, etc. Recommended only for high-risk packages with
    #   CVSS >= 7.0.
    @tool("search_otx")
    def search_otx(package_name: str) -> str:
        """查詢 AlienVault OTX 中指定套件的活躍威脅情報。
        輸入套件名稱（如 django、redis），回傳該套件的威脅情報 pulse 清單（JSON 格式）。
        包含活躍度判定（active/inactive/unknown）、威脅 pulse 名稱、描述、IOC 數量等。
        建議僅在 CVSS >= 7.0 的高危套件時查詢 OTX。"""
        return _search_otx_impl(package_name)

    return search_otx
# ── Lazy-loading mechanism (same pattern as memory_tool.py) ──────────────────
class _LazyToolLoader:
    """Holds the CrewAI tool and builds it the first time it is requested."""

    def __init__(self):
        # None means "not built yet".
        self._tool = None

    def _load(self):
        """Build the tool via ``_create_tool()`` unless one is already held."""
        if self._tool is not None:
            return
        self._tool = _create_tool()

    @property
    def search_otx(self):
        """The ``search_otx`` tool, created on first access."""
        self._load()
        return self._tool
# Single module-level loader; the tool itself is only built on the first
# access to `_loader.search_otx`.
_loader = _LazyToolLoader()
def __getattr__(name):
    """Module-level ``__getattr__`` so that
    ``from tools.otx_tool import search_otx`` resolves to the lazily built tool.

    Any other missing attribute raises ``AttributeError``.
    """
    if name != "search_otx":
        raise AttributeError(f"module 'tools.otx_tool' has no attribute {name!r}")
    return _loader.search_otx
|