File size: 11,821 Bytes
c8d30bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# tools/kev_tool.py
# ๅŠŸ่ƒฝ๏ผšCISA KEV๏ผˆKnown Exploited Vulnerabilities๏ผ‰ๆธ…ๅ–ฎๆŸฅ่ฉข Tool
# Harness ๆ”ฏๆŸฑ๏ผšGraceful Degradation๏ผˆ้™็ดš็€‘ๅธƒ๏ผ‰+ Observability๏ผˆๅŽŸๅญๅŒ–ๆ—ฅ่ชŒ๏ผ‰
# ๆ“ๆœ‰่€…๏ผšๆˆๅ“ก C๏ผˆAnalyst Agent Pipeline๏ผ‰
#
# ไฝฟ็”จๆ–นๅผ๏ผš
#   from tools.kev_tool import check_cisa_kev
#
# ๆžถๆง‹ๅฎšไฝ๏ผš
#   Analyst Agent ็š„ใ€Œ็ฌฌไธ€้šปๆ‰‹ใ€โ€” ้ฉ—่ญ‰ CVE ๆ˜ฏๅฆๅทฒ่ขซ้‡Žๅค–ๅˆฉ็”จ
#   ๅœจ KEV ๆธ…ๅ–ฎไธŠ = ๅทฒ็ขบ่ช่ขซๅˆฉ็”จ = ้ขจ้šชๆฅต้ซ˜๏ผŒ้œ€็ซ‹ๅณ่™•็†

import json
import os
import time
import logging
from datetime import datetime, timezone

import requests

logger = logging.getLogger("ThreatHunter")

# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๅธธๆ•ธ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•

KEV_API_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
REQUEST_TIMEOUT = 30  # ็ง’

# ้›ข็ทšๅฟซๅ–
CACHE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data")
KEV_CACHE_PATH = os.path.join(CACHE_DIR, "kev_cache.json")

# ๆจก็ต„็ดš KEV ๆŸฅ่ฉข่กจ๏ผˆ้ฆ–ๆฌกๅ‘ผๅซๆ™‚่ผ‰ๅ…ฅ๏ผŒไน‹ๅพŒ้‡่ค‡ไฝฟ็”จ๏ผ‰
_kev_lookup: dict | None = None
_kev_total_count: int = 0
_kev_source: str = "CISA KEV (unavailable)"


# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ่ผ”ๅŠฉๅ‡ฝๅผ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•

def _download_kev_catalog() -> dict | None:
    """
    ไธ‹่ผ‰ๅฎŒๆ•ด CISA KEV JSON ่ณ‡ๆ–™ใ€‚
    ๆˆๅŠŸๅ›žๅ‚ณๅŽŸๅง‹ JSON dict๏ผŒๅคฑๆ•—ๅ›žๅ‚ณ Noneใ€‚
    """
    try:
        logger.info("[QUERY] Downloading CISA KEV catalog...")
        response = requests.get(KEV_API_URL, timeout=REQUEST_TIMEOUT)

        if response.status_code == 200:
            data = response.json()
            logger.info("[OK] KEV catalog downloaded: %d entries", len(data.get('vulnerabilities', [])))
            return data

        logger.warning("[WARN] KEV API returned %d", response.status_code)
        return None

    except requests.exceptions.Timeout:
        logger.warning("[WARN] KEV API timeout (%ds)", REQUEST_TIMEOUT)
        return None
    except requests.exceptions.ConnectionError:
        logger.warning("[WARN] KEV API connection failed (network issue)")
        return None
    except requests.exceptions.RequestException as e:
        logger.warning("[WARN] KEV API request error: %s", e)
        return None
    except (json.JSONDecodeError, ValueError) as e:
        logger.warning("[WARN] KEV API returned non-JSON: %s", e)
        return None


def _write_kev_cache(data: dict) -> None:
    """ๅฐ‡ KEV ๅฎŒๆ•ด่ณ‡ๆ–™ๅฏซๅ…ฅ้›ข็ทšๅฟซๅ–"""
    try:
        os.makedirs(CACHE_DIR, exist_ok=True)
        data["_cached_at"] = time.time()
        with open(KEV_CACHE_PATH, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        logger.info("[OK] KEV cache updated: %s", KEV_CACHE_PATH)
    except (IOError, PermissionError) as e:
        logger.warning("[WARN] KEV cache write failed: %s", e)


def _read_kev_cache() -> dict | None:
    """่ฎ€ๅ– KEV ้›ข็ทšๅฟซๅ–๏ผŒไธๅญ˜ๅœจๅ›žๅ‚ณ None๏ผˆKEV ๅฟซๅ–ไธ่จญ้ŽๆœŸ๏ผŒๅ› ็‚บๆœ‰ๆ›ดๆ–ฐๆฉŸๅˆถ๏ผ‰"""
    try:
        if os.path.exists(KEV_CACHE_PATH):
            with open(KEV_CACHE_PATH, "r", encoding="utf-8") as f:
                cached = json.load(f)
            logger.info("[OK] KEV cache hit: %d entries", len(cached.get('vulnerabilities', [])))
            return cached
    except (json.JSONDecodeError, IOError) as e:
        logger.warning("[WARN] KEV cache read failed: %s", e)
    return None


def _build_kev_lookup(raw_data: dict) -> dict:
    """
    ๅฐ‡ KEV ๅŽŸๅง‹่ณ‡ๆ–™ๅปบ็ซ‹็‚บ {cve_id: details} ๆŸฅ่ฉข่กจใ€‚

    KEV JSON ็ตๆง‹๏ผš
      vulnerabilities[].cveID โ†’ CVE ID
      vulnerabilities[].dateAdded โ†’ ๅŠ ๅ…ฅๆ—ฅๆœŸ
      vulnerabilities[].dueDate โ†’ ไฟฎ่ฃœๆœŸ้™
      vulnerabilities[].vendorProject โ†’ ไพ›ๆ‡‰ๅ•†
      vulnerabilities[].product โ†’ ็”ขๅ“
      vulnerabilities[].knownRansomwareCampaignUse โ†’ ๆ˜ฏๅฆ่ขซๅ‹’็ดข่ปŸ้ซ”ๅˆฉ็”จ
      vulnerabilities[].shortDescription โ†’ ็ฐก็Ÿญๆ่ฟฐ
    """
    lookup = {}
    for vuln in raw_data.get("vulnerabilities", []):
        cve_id = vuln.get("cveID", "")
        if cve_id:
            lookup[cve_id] = {
                "date_added": vuln.get("dateAdded", ""),
                "due_date": vuln.get("dueDate", ""),
                "vendor": vuln.get("vendorProject", ""),
                "product": vuln.get("product", ""),
                "known_ransomware_use": vuln.get("knownRansomwareCampaignUse", "Unknown"),
                "short_description": vuln.get("shortDescription", ""),
            }
    return lookup


def _ensure_kev_loaded() -> None:
    """
    ็ขบไฟ KEV ๆŸฅ่ฉข่กจๅทฒ่ผ‰ๅ…ฅ๏ผˆLazy Loading๏ผ‰ใ€‚

    ้™็ดš็ญ–็•ฅ๏ผš
      1. ไธ‹่ผ‰็ทšไธŠ KEV JSON โ†’ ๆˆๅŠŸๅ‰‡ๅปบ็ซ‹ๆŸฅ่ฉข่กจ + ๆ›ดๆ–ฐๅฟซๅ–
      2. ไธ‹่ผ‰ๅคฑๆ•— โ†’ ่ฎ€้›ข็ทšๅฟซๅ–
      3. ๅฟซๅ–ไนŸๆฒ’ๆœ‰ โ†’ ๆŸฅ่ฉข่กจ็‚บ็ฉบ dict๏ผˆๆ‰€ๆœ‰ CVE ้ƒฝๅ›žๅ‚ณ in_kev=false๏ผ‰
    """
    global _kev_lookup, _kev_total_count, _kev_source

    if _kev_lookup is not None:
        return  # ๅทฒ่ผ‰ๅ…ฅ๏ผŒไธ้‡่ค‡ไธ‹่ผ‰

    # ๅ˜—่ฉฆ็ทšไธŠไธ‹่ผ‰
    raw_data = _download_kev_catalog()
    if raw_data is not None:
        _kev_lookup = _build_kev_lookup(raw_data)
        _kev_total_count = len(_kev_lookup)
        _kev_source = "CISA KEV (online)"
        _write_kev_cache(raw_data)
        logger.info("[OK] KEV lookup table built (online): %d entries", _kev_total_count)
        return

    # ้™็ดš๏ผš่ฎ€้›ข็ทšๅฟซๅ–
    cached = _read_kev_cache()
    if cached is not None:
        _kev_lookup = _build_kev_lookup(cached)
        _kev_total_count = len(_kev_lookup)
        _kev_source = "CISA KEV (cache)"
        logger.info("[OK] KEV lookup table built (cache): %d entries", _kev_total_count)
        return

    # ๆœ€็ต‚้™็ดš๏ผš็ฉบๆŸฅ่ฉข่กจ
    _kev_lookup = {}
    _kev_total_count = 0
    _kev_source = "CISA KEV (unavailable)"
    logger.warning("[WARN] KEV catalog unavailable (online + cache both failed), all queries return in_kev=false")


# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๆ ธๅฟƒๆŸฅ่ฉข้‚่ผฏ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•

def _check_kev_impl(cve_ids: str) -> str:
    """
    check_cisa_kev ็š„ๆ ธๅฟƒๅฏฆไฝœ๏ผˆ่ˆ‡ CrewAI @tool ่งฃ่€ฆ๏ผŒๆ–นไพฟๅ–ฎๅ…ƒๆธฌ่ฉฆ๏ผ‰ใ€‚

    ๆŽฅๆ”ถ้€—่™Ÿๅˆ†้š”็š„ CVE ID ๅญ—ไธฒ๏ผŒๅ›žๅ‚ณๆฏๅ€‹ CVE ็š„ KEV ็‹€ๆ…‹ JSONใ€‚

    ้™็ดš็ญ–็•ฅ๏ผš
      1. ็ทšไธŠ KEV โ†’ ไฝฟ็”จ + ๆ›ดๆ–ฐๅฟซๅ–
      2. ็ทšไธŠๅคฑๆ•— โ†’ ่ฎ€้›ข็ทšๅฟซๅ–
      3. ๅฟซๅ–ไนŸๆฒ’ๆœ‰ โ†’ in_kev: false๏ผˆไฟๅฎˆ้ ่จญ๏ผŒไธ crash๏ผ‰
      4. ไปปไฝ•ๆœช้ ๆœŸ้Œฏ่ชค โ†’ ๅ›žๅ‚ณๅฎ‰ๅ…จ็š„็ฉบ็ตๆžœ๏ผˆ็ต•ไธ crash๏ผ‰
    """
    try:
        # ็ขบไฟ KEV ๆŸฅ่ฉข่กจๅทฒ่ผ‰ๅ…ฅ
        _ensure_kev_loaded()

        # ่งฃๆž้€—่™Ÿๅˆ†้š”็š„ CVE ID
        raw_ids = [cid.strip() for cid in cve_ids.split(",") if cid.strip()]
        if not raw_ids:
            logger.warning("[WARN] KEV Tool received empty CVE ID input")
            return json.dumps({
                "source": _kev_source,
                "results": [],
                "kev_total_count": _kev_total_count,
                "error": "No CVE IDs provided",
            }, ensure_ascii=False, indent=2)

        logger.info("[QUERY] KEV check: %d CVEs -- %s", len(raw_ids), raw_ids)

        results = []
        for cve_id in raw_ids:
            # ๆญฃ่ฆๅŒ– CVE ID ๆ ผๅผ๏ผˆๅŽป้™ค็ฉบ็™ฝใ€็ตฑไธ€ๅคงๅฏซ๏ผ‰
            cve_id = cve_id.strip().upper()

            if cve_id in _kev_lookup:
                details = _kev_lookup[cve_id]
                results.append({
                    "cve_id": cve_id,
                    "in_kev": True,
                    "date_added": details["date_added"],
                    "due_date": details["due_date"],
                    "vendor": details["vendor"],
                    "product": details["product"],
                    "known_ransomware_use": details["known_ransomware_use"],
                    "short_description": details["short_description"],
                })
                logger.info("[ALERT] %s is in CISA KEV list! (confirmed wild exploitation)", cve_id)
            else:
                results.append({
                    "cve_id": cve_id,
                    "in_kev": False,
                })
                logger.info("[OK] %s is not in CISA KEV list", cve_id)

        kev_count = sum(1 for r in results if r["in_kev"])
        logger.info(
            "[OK] KEV check complete: %d queries, %d in KEV list",
            len(results), kev_count
        )

        return json.dumps({
            "source": _kev_source,
            "results": results,
            "kev_total_count": _kev_total_count,
        }, ensure_ascii=False, indent=2)

    except Exception as e:
        # ๆœ€ๅพŒไธ€้“้˜ฒ็ทš๏ผšไปปไฝ•ๆœช้ ๆœŸ้Œฏ่ชค้ƒฝไธ่ƒฝ่ฎ“ Agent crash
        logger.error("[FAIL] KEV Tool unexpected error: %s", e, exc_info=True)
        error_result = {
            "source": "CISA KEV (error)",
            "results": [],
            "kev_total_count": 0,
            "error": f"Unexpected error: {str(e)}",
        }
        return json.dumps(error_result, ensure_ascii=False, indent=2)


# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# CrewAI @tool ๅŒ…่ฃ๏ผˆAgent ๅ‘ผๅซ็”จ๏ผ‰
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•

# โš ๏ธ ้‡่ฆ๏ผšไฝฟ็”จใ€Œๅปถ้ฒ่ผ‰ๅ…ฅใ€ๆจกๅผ๏ผˆLazyToolLoader๏ผ‰
# ๅŽŸๅ› ๏ผš้ฟๅ…ๅœจ import ้šŽๆฎตๅฐฑ่งธ็™ผ CrewAI ็š„ tool ่จปๅ†Š
def _create_tool():
    """ๅปถ้ฒๅปบ็ซ‹ CrewAI Tool๏ผŒๅƒ…ๅœจ Agent ๅฏฆ้š›ไฝฟ็”จๆ™‚ๆ‰ import"""
    from crewai.tools import tool

    @tool("check_cisa_kev")
    def check_cisa_kev(cve_ids: str) -> str:
        """ๆŸฅ่ฉข CVE ๆ˜ฏๅฆๅœจ CISA KEV๏ผˆๅทฒ็Ÿฅ่ขซๅˆฉ็”จๆผๆดž๏ผ‰ๆธ…ๅ–ฎไธŠใ€‚
่ผธๅ…ฅไธ€ๆˆ–ๅคšๅ€‹ CVE ID๏ผˆ้€—่™Ÿๅˆ†้š”๏ผŒๅฆ‚ "CVE-2021-44228,CVE-2024-1234"๏ผ‰๏ผŒ
ๅ›žๅ‚ณๆฏๅ€‹ CVE ็š„ KEV ็‹€ๆ…‹๏ผŒๅŒ…ๅซๅŠ ๅ…ฅๆ—ฅๆœŸใ€ๅˆฐๆœŸๆ—ฅใ€ๆ˜ฏๅฆ่ขซๅ‹’็ดข่ปŸ้ซ”ๅˆฉ็”จ็ญ‰่ณ‡่จŠใ€‚
ๅœจ KEV ๆธ…ๅ–ฎไธŠ็š„ CVE = ๅทฒ็ขบ่ช่ขซ้‡Žๅค–ๅˆฉ็”จ = ้ขจ้šชๆฅต้ซ˜ใ€‚"""
        return _check_kev_impl(cve_ids)

    return check_cisa_kev


# โ”€โ”€ ๅปถ้ฒ่ผ‰ๅ…ฅๆฉŸๅˆถ๏ผˆ่ˆ‡ nvd_tool.py ็›ธๅŒๆจกๅผ๏ผ‰โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€

class _LazyToolLoader:
    def __init__(self):
        self._tool = None

    def _load(self):
        if self._tool is None:
            self._tool = _create_tool()

    @property
    def check_cisa_kev(self):
        self._load()
        return self._tool


_loader = _LazyToolLoader()


def __getattr__(name):
    """ๆจก็ต„ๅฑค็ดš __getattr__๏ผŒๆ”ฏๆด from tools.kev_tool import check_cisa_kev"""
    if name == "check_cisa_kev":
        return _loader.check_cisa_kev
    raise AttributeError(f"module 'tools.kev_tool' has no attribute {name!r}")