| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import logging |
| import re |
| from typing import Any |
|
|
# Module-level logger shared by every function in this file. The logger name
# is a fixed string rather than __name__.
logger = logging.getLogger("ThreatHunter.package_extractor")
|
|
| |
| |
| |
| |
|
|
# Python standard-library module names that must never be reported as
# third-party packages.
#
# extract_third_party_packages() looks names up by their *lowercased*
# top-level component, so:
#   * entries must be lowercase to match ("cprofile" is listed next to the
#     original "cProfile" for exactly that reason);
#   * dotted entries such as "os.path" can never match through that lookup;
#     they are kept only so that other users of this set see no change.
STDLIB_BLACKLIST: frozenset[str] = frozenset({
    # Interpreter-level pseudo modules
    "__future__", "__main__", "builtins",
    # Text processing
    "string", "re", "difflib", "textwrap", "unicodedata", "readline",
    "rlcompleter", "codecs", "encodings",
    # Data types
    "datetime", "calendar", "collections", "heapq", "bisect",
    "array", "weakref", "types", "copy", "pprint", "reprlib",
    "enum", "graphlib", "dataclasses",
    # Numeric and mathematical
    "numbers", "math", "cmath", "decimal", "fractions", "random",
    "statistics",
    # Functional programming
    "itertools", "functools", "operator",
    # Files, OS services, persistence and config formats
    "io", "time", "logging", "os", "os.path", "pathlib",
    "fileinput", "stat", "filecmp", "shutil", "tempfile",
    "glob", "fnmatch", "linecache", "pickle", "shelve",
    "marshal", "dbm", "sqlite3", "csv", "configparser",
    "tomllib", "netrc", "plistlib",
    # Compression and archiving
    "zlib", "gzip", "bz2", "lzma", "zipfile", "tarfile",
    # Structured data formats
    "json", "html", "html.parser", "xml", "xml.etree",
    "xml.etree.ElementTree", "xml.dom", "xml.sax", "struct",
    # Cryptographic services
    "hashlib", "hmac", "secrets",
    # Runtime, import system and language services
    "sys", "sysconfig", "warnings", "contextlib",
    "abc", "atexit", "traceback", "gc", "inspect", "site",
    "codeop", "code", "zipimport", "pkgutil", "modulefinder",
    "importlib", "ast", "dis", "py_compile",
    # Concurrency and low-level OS interfaces
    "threading", "multiprocessing", "concurrent",
    "concurrent.futures", "subprocess", "sched", "queue",
    "asyncio", "socket", "ssl", "select", "selectors",
    "signal", "mmap", "ctypes",
    # Internet protocols and data handling
    "urllib", "urllib.parse", "urllib.request", "urllib.error",
    "urllib.response", "urllib.robotparser",
    "http", "http.client", "http.server", "http.cookies",
    "http.cookiejar", "ftplib", "poplib", "imaplib",
    "smtplib", "uuid", "socketserver", "xmlrpc",
    "email", "mailbox", "mimetypes",
    # Testing, debugging and profiling
    "unittest", "doctest", "pdb", "profile", "cProfile", "cprofile",
    "timeit", "trace", "tracemalloc",
    # Typing
    # NOTE(review): typing_extensions is distributed on PyPI rather than with
    # the interpreter; confirm that excluding it from scanning is intended.
    "typing", "typing_extensions",
    # Miscellaneous
    "platform", "errno", "locale", "gettext",
    "argparse", "getopt", "getpass", "curses", "turtle",
    "base64", "binascii", "quopri", "uu",
})
|
|
| |
# Judging by its name, the "module" values that denote a relative import.
# NOTE(review): nothing in the visible part of this file reads this constant.
_RELATIVE_IMPORT_MODULE_PREFIXES = frozenset({"", None})


# Default value of extract_third_party_packages()'s max_packages parameter.
MAX_PACKAGES: int = 8


# Limits applied by _is_valid_package_name(): a name is rejected when it is
# shorter than the minimum length or contains any character other than ASCII
# letters, digits, "_" and "-".
_MIN_PACKAGE_NAME_LEN: int = 2
_INVALID_NAME_RE = re.compile(r"[^a-zA-Z0-9_\-]")
|
|
| |
| |
# Node.js built-in module names. extract_third_party_packages() drops an
# import whose lowercased top-level name appears here. Entries are kept in
# alphabetical order; "node:fs" and "node:path" are the only "node:"-prefixed
# spellings listed.
NODEJS_BUILTIN_BLACKLIST: frozenset[str] = frozenset((
    "assert",
    "async_hooks",
    "buffer",
    "child_process",
    "cluster",
    "console",
    "crypto",
    "dgram",
    "diagnostics_channel",
    "dns",
    "domain",
    "events",
    "fs",
    "http",
    "http2",
    "https",
    "inspector",
    "module",
    "net",
    "node:fs",
    "node:path",
    "os",
    "path",
    "perf_hooks",
    "process",
    "punycode",
    "querystring",
    "readline",
    "repl",
    "stream",
    "string_decoder",
    "timers",
    "tls",
    "trace_events",
    "tty",
    "url",
    "util",
    "v8",
    "vm",
    "wasi",
    "worker_threads",
    "zlib",
))
|
|
| |
| |
| |
# Go standard-library import paths. extract_third_party_packages() drops an
# import when either the stripped module string or its lowercased top-level
# name appears here.
_GO_STDLIB_ROOTS = (
    "archive", "builtin", "bytes", "cmp", "compress", "container",
    "context", "crypto", "database", "debug", "embed", "encoding",
    "errors", "expvar", "flag", "fmt", "go", "hash", "html", "image",
    "index", "internal", "io", "iter", "log", "maps", "math", "mime",
    "net", "os", "path", "plugin", "reflect", "regexp", "runtime",
    "slices", "sort", "strconv", "strings", "sync", "testing", "text",
    "time", "unicode", "unsafe",
)

# Explicitly listed sub-packages (slash-separated import paths).
_GO_STDLIB_SUBPACKAGES = (
    "crypto/md5", "crypto/rand", "crypto/sha256", "crypto/tls",
    "database/sql",
    "encoding/base64", "encoding/csv", "encoding/json", "encoding/xml",
    "html/template",
    "io/ioutil",
    "log/slog",
    "net/http", "net/url",
    "os/exec", "os/signal",
    "path/filepath",
    "sync/atomic",
    "testing/fstest",
    "text/template",
)

GO_STDLIB_BLACKLIST: frozenset[str] = frozenset(
    _GO_STDLIB_ROOTS + _GO_STDLIB_SUBPACKAGES
)
|
|
| |
| |
| |
# Java / JVM platform namespaces. extract_third_party_packages() drops an
# import when its lowercased top-level name, or the module string with any
# trailing "." / "*" characters removed, appears here.
JAVA_STDLIB_BLACKLIST: frozenset[str] = frozenset((
    # Namespace roots
    "java",
    "javax",
    # java.* packages
    "java.applet",
    "java.awt",
    "java.beans",
    "java.io",
    "java.lang",
    "java.management",
    "java.math",
    "java.net",
    "java.nio",
    "java.rmi",
    "java.security",
    "java.sql",
    "java.swing",
    "java.text",
    "java.time",
    "java.util",
    # javax.* packages
    "javax.crypto",
    "javax.naming",
    "javax.net",
    "javax.security",
    "javax.sql",
    "javax.swing",
    "javax.xml",
    # Other platform roots
    "android",
    "dalvik",
    "kotlin",
))
|
|
# Dotted prefixes treated as JVM platform namespaces:
# extract_third_party_packages() drops any import whose cleaned module string
# starts with one of these.
JVM_STDLIB_PREFIXES: tuple[str, ...] = (
    "java.",
    "javax.",
    "org.w3c.",
    "org.xml.sax.",
)
|
|
| |
| |
# Reverse-domain style roots. extract_third_party_packages() drops a dotted
# import whose top-level component is one of these (e.g. "com.example.Foo"),
# because the root alone does not identify a package.
GENERIC_NAMESPACE_ROOTS: frozenset[str] = frozenset({
    "com", "org", "net", "io", "edu", "gov", "mil",
})
|
|
def _is_valid_package_name(name: str) -> bool:
    """Tell whether *name* looks like a plausible PyPI/npm package name.

    A name is accepted only when all of the following hold:

    - it is at least ``_MIN_PACKAGE_NAME_LEN`` characters long;
    - it is not made up purely of digits;
    - it contains nothing but ASCII letters, digits, ``_`` and ``-``.

    Args:
        name: Candidate package name.

    Returns:
        True when the name passes every rule, otherwise False.
    """
    too_short = not name or len(name) < _MIN_PACKAGE_NAME_LEN
    if too_short or name.isdigit():
        return False
    # The pattern matches any single forbidden character.
    return _INVALID_NAME_RE.search(name) is None
|
|
|
|
| def _normalize_package_name(module_str: str) -> str | None: |
| """ |
| ๅฐๆจก็ต่ทฏๅพๆญฃ่ฆๅ็บ้ ๅฑคๅฅไปถๅ็จฑใ |
| |
| ไพๅฆ๏ผ |
| "flask.views" โ "flask" |
| "PIL.Image" โ "PIL" |
| "requests" โ "requests" |
| "os.path" โ "os"๏ผๅพ็บ็ฑ้ปๅๅฎ้ๆฟพ๏ผ |
| "" โ None๏ผ็ธๅฐๅฏๅ
ฅ๏ผ |
| """ |
| if not module_str: |
| return None |
| |
| module_str = module_str.strip().split()[0] |
| |
| top_level = module_str.split(".")[0].strip() |
| if not top_level: |
| return None |
| return top_level.lower() |
|
|
|
|
def extract_third_party_packages(
    imports: list[dict[str, Any]],
    max_packages: int = MAX_PACKAGES,
) -> list[str]:
    """Extract third-party package names from a Security Guard import list.

    The logic is fully deterministic (no LLM involved). Entries are processed
    in order; each one is reduced to its lowercased top-level name and then
    dropped when it is a relative import, a Python / Node.js / Go / Java
    built-in, a Go module path on a known hosting domain, a generic
    reverse-domain namespace root, or not a valid package name. Malformed
    entries are logged and skipped instead of raising.

    Args:
        imports: Import descriptors as dicts. The keys read here are
            ``"module"`` (the imported module path) and the optional
            ``"level"`` (a value greater than 0 marks a relative import).
            Non-dict elements are ignored.
        max_packages: Maximum number of names to return. Values of 0 or
            below yield an empty list. ``None`` is tolerated and means
            "no cap".

    Returns:
        De-duplicated, lowercased package names in first-seen order,
        e.g. ``["requests", "flask", "pymysql"]``.
    """
    if not imports:
        logger.info("[PKG_EX] No imports provided, returning empty list")
        return []

    # The cap used to be checked only after appending, so a non-positive cap
    # still let one package through.
    if max_packages is not None and max_packages <= 0:
        logger.info(
            "[PKG_EX] max_packages=%s allows no results, returning empty list",
            max_packages,
        )
        return []

    go_module_hosts = (
        "github.com/", "gitlab.com/", "bitbucket.org/", "gopkg.in/",
        "golang.org/", "google.golang.org/",
    )

    seen: set[str] = set()
    packages: list[str] = []

    for imp in imports:
        try:
            if not isinstance(imp, dict):
                continue

            module_raw: str = imp.get("module", "") or ""
            module_name = module_raw.strip()
            # Drop trailing "." / "*" characters (e.g. "java.util.*").
            module_clean = module_name.rstrip(".*")

            # Relative imports never name an installable package.
            level = imp.get("level", 0)
            if level and level > 0:
                continue
            if not module_name:
                continue

            top_level = _normalize_package_name(module_name)
            if top_level is None:
                continue

            if top_level in STDLIB_BLACKLIST:
                logger.debug("[PKG_EX] Filtered Python stdlib: %s", top_level)
                continue

            if top_level in NODEJS_BUILTIN_BLACKLIST:
                logger.debug("[PKG_EX] Filtered Node.js builtin: %s", top_level)
                continue

            if module_name in GO_STDLIB_BLACKLIST or top_level in GO_STDLIB_BLACKLIST:
                logger.debug("[PKG_EX] Filtered Go stdlib: %s (raw: %s)", top_level, module_raw)
                continue

            # Go module paths hosted on well-known domains carry no version
            # and would otherwise collapse to a meaningless host fragment.
            if "/" in module_clean and module_clean.startswith(go_module_hosts):
                logger.debug("[PKG_EX] Filtered unversioned Go module import: %s", module_clean)
                continue

            if (
                top_level in JAVA_STDLIB_BLACKLIST
                or module_clean in JAVA_STDLIB_BLACKLIST
                or module_clean.startswith(JVM_STDLIB_PREFIXES)
            ):
                logger.debug("[PKG_EX] Filtered Java stdlib: %s (raw: %s)", top_level, module_raw)
                continue

            # "com.example.Foo" would reduce to "com", which identifies nothing.
            if "." in module_clean and top_level in GENERIC_NAMESPACE_ROOTS:
                logger.debug("[PKG_EX] Filtered generic JVM namespace root: %s (raw: %s)", top_level, module_raw)
                continue

            if not _is_valid_package_name(top_level):
                logger.debug("[PKG_EX] Filtered invalid name: %s", top_level)
                continue

            if top_level in seen:
                continue

            seen.add(top_level)
            packages.append(top_level)

            if max_packages is not None and len(packages) >= max_packages:
                logger.info("[PKG_EX] Reached max_packages=%d, truncating", max_packages)
                break

        except Exception as exc:
            # Deliberate best effort: one malformed entry (e.g. a non-string
            # "module" value) must not abort the whole extraction.
            logger.warning("[PKG_EX] Failed to parse import entry %r: %s", imp, exc)
            continue

    logger.info(
        "[PKG_EX] Extracted %d third-party packages from %d imports: %s",
        len(packages), len(imports), packages,
    )
    return packages
|
|
|
|
def packages_from_security_guard(sg_result: dict[str, Any]) -> list[str]:
    """Convenience wrapper: pull the package list out of a Security Guard result.

    Args:
        sg_result: Result dict whose ``"imports"`` entry holds the import
            list to analyse.

    Returns:
        The names produced by ``extract_third_party_packages`` for that
        import list, or an empty list when *sg_result* is empty, not a dict,
        or its ``"imports"`` value is not a list (a warning is logged in
        those cases).
    """
    if not (sg_result and isinstance(sg_result, dict)):
        logger.warning("[PKG_EX] Invalid sg_result type: %s", type(sg_result))
        return []

    import_entries = sg_result.get("imports", [])
    if isinstance(import_entries, list):
        return extract_third_party_packages(import_entries)

    logger.warning("[PKG_EX] sg_result.imports is not a list: %s", type(import_entries))
    return []
|
|
|
|
def format_packages_for_intel_fusion(packages: list[str]) -> str:
    """Render a package list as the comma-separated string Intel Fusion takes.

    Example:
        ["requests", "flask"] -> "requests, flask"

    Args:
        packages: Package names.

    Returns:
        The names joined by ", ", or an empty string when *packages* is
        empty or None.
    """
    if not packages:
        return ""
    separator = ", "
    return separator.join(packages)
|
|
|
|
| |
| |
| |
|
|
| def extract_packages_with_versions(source_text: str, filename: str = "") -> list[dict]: |
| """ |
| ๅพไพ่ณดๆไปถ๏ผrequirements.txt / package.json / pom.xml / Pipfile๏ผ |
| ๆๅๅฅไปถๅ็จฑ + ็ๆฌ่ใ |
| |
| ่ฅ็ๆฌๆช็ฅ๏ผไพๅฆ็ดๆฅๅพ import ๆๅ๏ผ๏ผ |
| ๅๅณ version=None, version_known=Falseใ |
| |
| Args: |
| source_text: ๆไปถๅ
งๅฎน |
| filename: ๆไปถๅ็จฑ๏ผ็จๆผๅคๆทๆ ผๅผ๏ผ |
| |
| Returns: |
| list[dict]: [{"package": "requests", "version": "2.28.0", "version_known": True}, ...] |
| """ |
| results = [] |
| fname = filename.lower() |
|
|
| |
| if "requirements" in fname or fname.endswith(".txt"): |
| for line in source_text.splitlines(): |
| line = line.strip() |
| if not line or line.startswith("#"): |
| continue |
| |
| m = re.match(r"^([a-zA-Z0-9_.-]+)\s*(?:==|>=|<=|~=|!=|>|<)\s*([^\s;]+)", line) |
| if m: |
| pkg, ver = m.group(1), m.group(2) |
| results.append({"package": pkg.lower(), "version": ver, "version_known": True}) |
| else: |
| |
| m2 = re.match(r"^([a-zA-Z0-9_.-]+)\s*$", line) |
| if m2: |
| results.append({"package": m2.group(1).lower(), "version": None, "version_known": False}) |
|
|
| |
| elif fname.endswith("package.json"): |
| import json as _json |
| try: |
| data = _json.loads(source_text) |
| for section in ["dependencies", "devDependencies"]: |
| for pkg, ver in data.get(section, {}).items(): |
| |
| clean_ver = re.sub(r"^[^0-9]*", "", ver) if ver else None |
| known = bool(clean_ver and re.match(r"^\d", clean_ver)) |
| results.append({"package": pkg.lower(), "version": clean_ver if known else ver, "version_known": known}) |
| except Exception: |
| pass |
|
|
| |
| elif fname.endswith("pom.xml"): |
| |
| deps = re.findall( |
| r"<dependency>.*?<artifactId>([^<]+)</artifactId>.*?(?:<version>([^<]+)</version>)?.*?</dependency>", |
| source_text, |
| re.DOTALL, |
| ) |
| for art, ver in deps: |
| if art.strip() and not art.strip().startswith("$"): |
| results.append({ |
| "package": art.strip().lower(), |
| "version": ver.strip() if ver and not ver.strip().startswith("$") else None, |
| "version_known": bool(ver and not ver.strip().startswith("$")), |
| }) |
|
|
| |
| elif fname == "pipfile" or fname.endswith("pipfile"): |
| for line in source_text.splitlines(): |
| m = re.match(r'''(?x)^([a-zA-Z0-9_.\-]+)\s*=\s*["\']?([^"\' \t]+)["\']?''', line.strip()) |
| if m: |
| pkg, ver = m.group(1), m.group(2) |
| clean = re.sub(r"^[^0-9]*", "", ver) |
| known = bool(clean and re.match(r"^\d", clean)) |
| results.append({"package": pkg.lower(), "version": clean if known else ver, "version_known": known}) |
|
|
| return results |
|
|
|
|
| def build_version_disclaimer(package: str, version: str | None) -> str: |
| """ |
| ็บ Intel Fusion ็ CVE ่ผธๅบ็ๆ็ๆฌๅ
่ฒฌ่ฒๆใ |
| |
| Args: |
| package: ๅฅไปถๅ็จฑ |
| version: ็ๆฌ่๏ผNone ่กจ็คบๆช็ฅ๏ผ |
| |
| Returns: |
| ๅ
่ฒฌ่ฒๆๅญไธฒ๏ผ่ฅ็ๆฌๅทฒ็ฅๅ็บ็ฉบๅญไธฒ๏ผ |
| """ |
| if version: |
| return "" |
| return ( |
| f"[็ๆฌๆช็ฅ] ็กๆณ็ขบ่ช {package} ็็ขบๅ็ๆฌใ" |
| f"ไปฅไธ CVE ็บ่ฉฒๅฅไปถ็ๆๆๅทฒ็ฅๆผๆด๏ผ่ซ็ขบ่ชไฝ ็็ๆฌๆฏๅฆ่ฝๅจๅๅฝฑ้ฟ็ฏๅๅ
งๅๆกๅ่กๅใ" |
| ) |
|
|