""" input_sanitizer.py — L0 確定性輸入净化器 ========================================== 架構位置:Pipeline 最前端(在 CrewAI 啟動前執行) 層級防御架構(Phase 4C 更新): L0.5 WASM Sandbox ← wasmtime 沙盒過濾(Prompt Injection / Unicode / 超門檻) L0 Python 正則掃描 ← SQL/OS/模板 Injection 標記 L1 Blocklist ← 高信心惡意模式(直接拒絕) 依據: - FINAL_PLAN.md §3a:[⚙️ input_sanitizer.py] ← 確定性基礎設施(OWASP LLM01:2025) - OWASP LLM01:2025 Prompt Injection — 不可信輸入在進入 LLM 前必須先過濾 設計原則: - 純確定性運算(無 LLM、無外部 API) - 與 security_guard.py 的分工: input_sanitizer → 守門(Pipeline 前):截斷 + L0 正則掃描 + 禁區關鍵字過濾 security_guard → 提取(CrewAI 內):AST/正則程式碼結構提取,不做判斷 層級邊界:應用層(不引用 harness/entropy 層) """ from __future__ import annotations import hashlib import json import logging import os import re from dataclasses import dataclass, field from typing import Any logger = logging.getLogger("threathunter.input_sanitizer") # ══════════════════════════════════════════════════════════════ # Phase 4C: L0.5 WASM Sandbox 載入(Graceful Degradation) # ══════════════════════════════════════════════════════════════ # 環境變數控制:WASM_SANDBOX_ENABLED=false 可全局停用 _WASM_ENABLED = os.getenv("WASM_SANDBOX_ENABLED", "true").lower() not in ("false", "0", "no") try: if _WASM_ENABLED: import threathunter_prompt_sandbox as _wasm_mod _WASM_AVAILABLE = True logger.info("[InputSanitizer] Phase 4C: WASM Sandbox 啟用 (v%s)", _wasm_mod.sandbox_version()) else: _wasm_mod = None # type: ignore _WASM_AVAILABLE = False logger.info("[InputSanitizer] WASM_SANDBOX_ENABLED=false, 跳過 L0.5 層") except ImportError: _wasm_mod = None # type: ignore _WASM_AVAILABLE = False logger.warning( "[InputSanitizer] threathunter_prompt_sandbox 不可用(未編譯)," "降級為純 Python L0 過濾" ) def _wasm_eval(text: str) -> dict[str, Any]: """ 呼叫 WASM Sandbox 評估輸入安全性。 Returns: {"code": int, "verdict": str, "reason": str, "engine": str} 若 WASM 不可用,回傳 {"code": 0, "verdict": "ALLOW", "reason": "wasm_unavailable"} """ if not _WASM_AVAILABLE or _wasm_mod is None: return {"code": 0, "verdict": "ALLOW", "reason": "wasm_unavailable", "engine": "none"} try: raw 
= _wasm_mod.sandbox_eval(text) result = json.loads(raw) return result except Exception as exc: # noqa: BLE001 logger.warning("[InputSanitizer] WASM eval 異常: %s", exc) return {"code": 0, "verdict": "ALLOW", "reason": f"wasm_error:{exc}", "engine": "none"} # ══════════════════════════════════════════════════════════════ # 常數 # ══════════════════════════════════════════════════════════════ MAX_INPUT_LENGTH = 50_000 # 超過此長度截斷(避免 prompt flooding) MAX_LINE_COUNT = 2_000 # 超過此行數截斷(避免超長函式轟炸) # L0 正則掃描:確定性找可疑模式(SQL/Command Injection / 硬編碼憑證 / eval) # 這些是「通報」而非「阻擋」——仍然繼續處理,但標記給 Security Guard L0_PATTERNS: list[tuple[str, str, str]] = [ # (name, regex_pattern, description) ( "sql_injection", r"(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|CREATE|ALTER)\s+.{0,100}?\s*['\";]", "SQL 語句疑似拼接(SQL Injection 風險)", ), ( "os_command", r"(?i)(os\.system|subprocess\.call|subprocess\.run|popen|exec\(|eval\()\s*[\(\['\"]", "危險系統呼叫(OS Command Injection 風險)", ), ( "hardcoded_secret", r"(?i)(password|passwd|pwd|secret|api_key|apikey|token|auth)\s*=\s*['\"][^'\"]{4,}['\"]", "硬編碼憑證(Credential Exposure 風險)", ), ( "path_traversal", r"\.{2,}/|\.{2,}\\", "路徑穿越嘗試(Path Traversal 風險)", ), ( "template_injection", r"\{\{.{0,100}?\}\}|\{%.*?%\}", "模板語法(Template Injection 風險)", ), ( "xml_entity", r"system|<\|system\|>" r"|<>|<>" r")", "Prompt Injection 嘗試(OWASP LLM01)", ), ( "jailbreak", r"(?i)(" r"\bDAN\b" r"|jailbreak" r"|do\s+anything\s+now" r"|no\s+restrictions?\s+mode" r"|developer\s+mode" r"|unrestricted\s+mode" r"|pretend\s+(you\s+have\s+no|to\s+be)" r"|unethical\s+twin" r"|without\s+(?:moral|ethical|safety)\s+(?:limits?|filters?|restrictions?)" r"|(?:constitution|guidelines)\s+(?:is\s+)?disabled" r"|\[INST\]\s*<>" r")", "越獄嘗試(OWASP LLM01)", ), ] # 完全禁止通過的關鍵字(比 L0 正則更嚴格) # 這些是高信心惡意模式,直接拒絕而非標記 BLOCKLIST_PATTERNS: list[tuple[str, str]] = [ (r"(?i)\bDROP\s+TABLE\b", "偵測到 DROP TABLE 語句"), (r"(?i)\bxp_cmdshell\b", "SQL Server 命令執行指令"), (r"(?i)\bSHUTDOWN\s+WITH\s+NOWAIT\b", "資料庫關機指令"), 
(r"(?i)\b(\d+|'[^']*')\s+OR\s+('?\d+'?\s*=\s*'?\d+'?|\w+\s*=\s*\w+)\s*(UNION|--)", "SQL Boolean-based OR 注入(高信心)"), (r"(?i)UNION\s+(?:ALL\s+)?SELECT\s+\*\s+FROM\s+\w+", "SQL UNION SELECT 注入(高信心)"), ] # ══════════════════════════════════════════════════════════════ # 資料結構 # ══════════════════════════════════════════════════════════════ @dataclass class L0Finding: """L0 掃描的單一發現""" pattern_name: str description: str line_no: int matched_text: str # 截斷至 100 字符,避免回顯惡意內容 severity: str # "WARNING" | "INFO" @dataclass class SanitizeResult: """ 淨化結果。 Attributes: safe: 是否允許進入 Pipeline(False 時應拒絕) blocked_reason: 若 safe=False,說明原因 truncated: 輸入是否被截斷 original_length: 原始長度 sanitized_input: 截斷後的淨化輸入(供 Pipeline 使用) l0_findings: L0 正則掃描發現(WARNING 級別,仍允許進入但標記) input_hash: SHA-256 前 16 字元(用於去重 / 日誌追蹤) input_type: 推斷的輸入類型 wasm_verdict: L0.5 WASM Sandbox 評估結果(Phase 4C) """ safe: bool sanitized_input: str truncated: bool original_length: int l0_findings: list[L0Finding] = field(default_factory=list) blocked_reason: str = "" input_hash: str = "" input_type: str = "unknown" wasm_verdict: dict = field(default_factory=dict) # ══════════════════════════════════════════════════════════════ # 輸入類型推斷 # ══════════════════════════════════════════════════════════════ def _infer_input_type(text: str) -> str: """ 推斷輸入類型,供 Orchestrator 路由決策參考。 v3.1:支援多語言程式碼偵測(Python/JS/TS/Java/Go/PHP/Ruby/Rust/C/C++)。 Returns: "package_list" → 套件清單(路徑 A) "source_code" → 程式碼(路徑 B) "config_file" → 配置文件(路徑 C) "sql_review" → 孤立 SQL corpus / SQL 語法審查(路徑 C-like) "mixed" → 混合(預設路徑 B) """ if _looks_like_sql_review(text): return "sql_review" # 程式碼特徵(多語言) code_signals = [ # Python bool(re.search(r"^\s*(def |class |import |from \w+\s+import )", text, re.MULTILINE)), # JavaScript / TypeScript bool(re.search(r"(?:const|let|var)\s+\w+\s*=|=>\s*\{|require\s*\(|export\s+(?:default|const|function)", text, re.MULTILINE)), # Java bool(re.search(r"(?:public|private|protected)\s+(?:static\s+)?(?:class|void|int|String)\s+\w+", text, 
re.MULTILINE)), # Go bool(re.search(r"^(?:package\s+\w+|func\s+\w+|:=)", text, re.MULTILINE)), # PHP bool(re.search(r"<\?php|\$\w+\s*=", text)), # Ruby bool(re.search(r"^\s*(?:def\s+\w+|require\s+['\"]|module\s+\w+|class\s+\w+\s*<)", text, re.MULTILINE)), # Rust bool(re.search(r"(?:fn\s+\w+|let\s+mut\s+|impl\s+\w+|use\s+\w+::)", text, re.MULTILINE)), # C / C++ bool(re.search(r"#include\s*[<\"]|int\s+main\s*\(|printf\s*\(|std::", text, re.MULTILINE)), # C# bool(re.search(r"using\s+System(?:\.\w+)?\s*;|namespace\s+\w+|public\s+class\s+\w+", text, re.MULTILINE)), # Shebang bool(re.search(r"^#!\/", text)), # 通用:大括號語言 + 多行 bool(re.search(r"[{}();]", text)) and text.count("\n") > 5, ] # 套件清單特徵:`name==version` 或 `name>=version` 或單純名稱列表 pkg_signals = [ bool(re.search(r"^[\w\-\.]+[>== 2 or (config_score >= 1 and code_score == 0 and pkg_score < 2): return "config_file" if code_score >= 2: return "source_code" if pkg_score >= 2: return "package_list" if code_score >= 1: return "source_code" return "package_list" def _looks_like_sql_review(text: str) -> bool: """ 判斷輸入是否更像孤立 SQL corpus,而不是應用程式 source code。 SQL Injection 的可利用性需要 application sink/source;孤立 `.sql` 文本只能做語法與 payload review,因此在 L0 先分流。 """ if not text or not text.strip(): return False sql_statement_hits = len(re.findall( r"(?im)^\s*(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|WITH|EXEC|GRANT)\b", text, )) sql_payload_hits = len(re.findall( r"(?i)\b(?:UNION\s+SELECT|OR\s+1\s*=\s*1|SLEEP\s*\(|WAITFOR\s+DELAY|" r"EXTRACTVALUE\s*\(|UPDATEXML\s*\(|xp_cmdshell|sp_executesql|EXEC\s*\(|" r"CREATE\s+USER|GRANT\s+ALL|\$gt|\$where)\b", text, )) sql_comment_hits = len(re.findall(r"(?m)^\s*(?:--|/\*)", text)) application_code_signals = [ bool(re.search(r"^\s*(?:def |class |import |from \w+\s+import )", text, re.MULTILINE)), bool(re.search(r"(?:const|let|var)\s+\w+\s*=|function\s+\w+\s*\(", text)), bool(re.search(r"(?:public|private|protected)\s+(?:static\s+)?(?:class|void|int|String)\s+\w+", text)), 
bool(re.search(r"^(?:package\s+\w+|func\s+\w+)", text, re.MULTILINE)), bool(re.search(r"<\?php|\$\w+\s*=", text)), bool(re.search(r"#include\s*[<\"]|int\s+main\s*\(", text)), bool(re.search(r"using\s+System(?:\.\w+)?\s*;|public\s+class\s+\w+", text, re.MULTILINE)), ] if sum(application_code_signals) >= 1: return False return sql_statement_hits >= 2 or (sql_statement_hits >= 1 and (sql_payload_hits + sql_comment_hits) >= 2) def _wasm_block_finding(reason: str, text: str) -> L0Finding: """將 WASM L0.5 封鎖訊號保留成可稽核 finding。""" normalized = (reason or "wasm_block").lower() if "prompt" in normalized or "instruction" in normalized: pattern_name = "wasm_prompt_injection" elif "jailbreak" in normalized: pattern_name = "wasm_jailbreak" elif "code" in normalized or "command" in normalized: pattern_name = "wasm_code_injection" else: pattern_name = "wasm_l0_block" return L0Finding( pattern_name=pattern_name, description=f"WASM L0.5 flagged input before type-aware review: {reason}", line_no=1, matched_text=(text or "")[:100], severity="WARNING", ) def _extract_safe_targets_from_blocked_text(text: str) -> str: """從混入 prompt injection 的輸入中只保留可掃描目標。""" target_pattern = re.compile( r"\b(?!CVE\b)([A-Za-z][A-Za-z0-9_.+-]{1,40})\s*" r"(?:==|>=|<=|~=|=|\s+)\s*(v?\d[\w.+-]{0,30})\b" ) blocked_words = re.compile( r"(?i)(ignore|forget|instruction|system|rule|constitution|chatbot|" r"hacked|developer\s+mode|jailbreak|dan|say\s+['\"]|output)" ) safe_targets: list[str] = [] for match in target_pattern.finditer(text): package, version = match.groups() start = max(0, match.start() - 80) end = min(len(text), match.end() + 80) context = text[start:end] if blocked_words.search(context) and not re.search(r"(?i)(django|postgresql|postgres|redis|nginx|flask|express|spring|openssl|apache)", package): continue normalized = f"{package} {version}".strip() if normalized not in safe_targets: safe_targets.append(normalized) return "\n".join(safe_targets) # 
# ══════════════════════════════════════════════════════════════
# Core sanitization
# ══════════════════════════════════════════════════════════════

def _truncate_text(text: str, max_chars: int, max_lines: int) -> tuple[str, bool, bool]:
    """Clip *text* to a character budget, then to a line budget.

    Args:
        text: The text to clip.
        max_chars: Maximum number of characters to keep.
        max_lines: Maximum number of lines (as split by ``str.splitlines``) to keep.

    Returns:
        ``(clipped_text, chars_clipped, lines_clipped)``. When the line budget
        is not exceeded the text is returned as-is (it is not re-joined), so
        original line endings are preserved in that case.
    """
    chars_clipped = len(text) > max_chars
    if chars_clipped:
        text = text[:max_chars]

    lines = text.splitlines()
    lines_clipped = len(lines) > max_lines
    if lines_clipped:
        text = "\n".join(lines[:max_lines])

    return text, chars_clipped, lines_clipped


def _scan_l0_patterns(text: str, input_hash: str) -> list[L0Finding]:
    """Run every ``L0_PATTERNS`` regex over *text* and collect the matches.

    Matches are reported, never rejected. A pattern that fails to compile is
    logged and skipped so one bad regex cannot disable the whole scan.
    """
    findings: list[L0Finding] = []
    for pattern_name, pattern, description in L0_PATTERNS:
        # Severity depends only on the pattern name, so compute it once.
        is_warning = "injection" in pattern_name or "jailbreak" in pattern_name
        severity = "WARNING" if is_warning else "INFO"
        try:
            for match in re.finditer(pattern, text):
                # Count newlines in place instead of copying the prefix.
                line_no = text.count("\n", 0, match.start()) + 1
                findings.append(
                    L0Finding(
                        pattern_name=pattern_name,
                        description=description,
                        line_no=line_no,
                        # Clipped so hostile content is not echoed in full.
                        matched_text=match.group(0)[:100],
                        severity=severity,
                    )
                )
                logger.info(
                    "[SANITIZE][%s] L0 finding: %s @ line %d",
                    input_hash, pattern_name, line_no,
                )
        except re.error as e:
            logger.error("[SANITIZE] Regex error for pattern %s: %s", pattern_name, e)
    return findings


def sanitize_input(raw_input: str) -> SanitizeResult:
    """Deterministically sanitize raw user input before it enters the pipeline.

    Steps:
        1. Compute a short SHA-256 prefix of the input for log correlation.
        2. Ask the L0.5 WASM sandbox for a verdict (code 1 = block,
           code 3 = truncate suggestion; anything else is treated as allow).
        3. Truncate to ``MAX_INPUT_LENGTH`` characters and ``MAX_LINE_COUNT`` lines.
        4. Blocklist scan: a hit rejects the input, unless the text looks like
           a standalone SQL review corpus, in which case it is only flagged.
        5. L0 regex scan: findings are recorded but never reject.
        6. Infer the input type.
        7. If the sandbox asked for a block, code-like input is still let
           through (with a finding attached); other input is reduced to the
           package/version targets that can be extracted, or rejected when
           none can.

    Args:
        raw_input: Raw user input. Non-string values are converted with ``str()``.

    Returns:
        A ``SanitizeResult``. When ``safe`` is False, ``sanitized_input`` is
        empty, ``input_type`` is ``"blocked"`` and ``blocked_reason`` explains why.
    """
    if not isinstance(raw_input, str):
        raw_input = str(raw_input)

    original_length = len(raw_input)

    # ── Step 1: hash ──────────────────────────────────────────
    input_hash = hashlib.sha256(raw_input.encode("utf-8", errors="replace")).hexdigest()[:16]
    logger.debug("[SANITIZE] hash=%s original_len=%d", input_hash, original_length)

    # ── Step 1.5: L0.5 WASM sandbox (Phase 4C) ────────────────
    wasm_verdict = _wasm_eval(raw_input)
    if not isinstance(wasm_verdict, dict):
        # The sandbox reply is parsed JSON and could be a list or scalar.
        # Fail open, the same policy _wasm_eval applies to its own errors.
        logger.warning(
            "[SANITIZE][%s] WASM verdict is not a JSON object; ignoring", input_hash
        )
        wasm_verdict = {
            "code": 0,
            "verdict": "ALLOW",
            "reason": "wasm_malformed_verdict",
            "engine": "none",
        }
    wasm_code = wasm_verdict.get("code", 0)
    wasm_reason = wasm_verdict.get("reason", "ok")
    wasm_block_msg = ""

    if wasm_code == 1:  # BLOCK
        wasm_block_msg = f"[WASM-L0.5] BLOCK: {wasm_reason}"
        logger.warning("[SANITIZE][%s] %s", input_hash, wasm_block_msg)
    elif wasm_code == 3:  # TRUNCATE suggestion; processing continues
        # Deliberately no slicing here: step 2 applies the same
        # MAX_INPUT_LENGTH cut and is the one place that sets `truncated`.
        # Slicing early left `truncated` False for input that had been cut.
        logger.info("[SANITIZE][%s] WASM TRUNCATE 建議", input_hash)

    # ── Step 2: truncate ──────────────────────────────────────
    text, chars_clipped, lines_clipped = _truncate_text(
        raw_input, MAX_INPUT_LENGTH, MAX_LINE_COUNT
    )
    truncated = chars_clipped or lines_clipped
    if chars_clipped:
        logger.warning(
            "[SANITIZE][%s] Input truncated: %d → %d chars",
            input_hash, original_length, MAX_INPUT_LENGTH,
        )
    if lines_clipped:
        logger.warning(
            "[SANITIZE][%s] Input truncated to %d lines", input_hash, MAX_LINE_COUNT
        )

    def reject(reason: str, findings: list[L0Finding]) -> SanitizeResult:
        """Build the rejection result shared by every blocking path."""
        return SanitizeResult(
            safe=False,
            sanitized_input="",
            truncated=truncated,
            original_length=original_length,
            l0_findings=findings,
            blocked_reason=reason,
            input_hash=input_hash,
            input_type="blocked",
            wasm_verdict=wasm_verdict,
        )

    # ── Step 3: blocklist scan (reject outright) ──────────────
    l0_findings: list[L0Finding] = []
    sql_review_corpus = _looks_like_sql_review(text)
    for block_pattern, reason in BLOCKLIST_PATTERNS:
        if not re.search(block_pattern, text):
            continue
        if not sql_review_corpus:
            logger.warning("[SANITIZE][%s] BLOCKED: %s", input_hash, reason)
            return reject(reason, [])
        # A standalone SQL corpus is reviewed rather than rejected: record
        # the first blocklist hit as a finding and stop scanning.
        l0_findings.append(
            L0Finding(
                pattern_name="sql_review_payload",
                description=f"SQL review corpus contains blocked payload syntax: {reason}",
                line_no=1,
                matched_text=text[:100],
                severity="WARNING",
            )
        )
        break

    # ── Step 4: L0 regex scan (flag, never reject) ────────────
    l0_findings.extend(_scan_l0_patterns(text, input_hash))

    # ── Step 5: infer input type ──────────────────────────────
    input_type = _infer_input_type(text)

    if wasm_block_msg:
        # Keep the sandbox signal as an auditable finding even when the
        # input is allowed through for type-aware review.
        l0_findings.append(_wasm_block_finding(str(wasm_reason), text))
        if input_type not in {"source_code", "config_file", "mixed", "sql_review"}:
            safe_targets = _extract_safe_targets_from_blocked_text(text)
            if safe_targets:
                text = safe_targets
                input_type = _infer_input_type(text)
                logger.warning(
                    "[SANITIZE][%s] WASM block sanitized to safe targets: %s",
                    input_hash, text,
                )
            else:
                logger.warning(
                    "[SANITIZE][%s] Result: safe=False type=%s reason=%s l0_count=%d",
                    input_hash, input_type, wasm_block_msg, len(l0_findings),
                )
                return reject(wasm_block_msg, l0_findings)

        # Defensive: reject if the (possibly re-inferred) type is not one the
        # pipeline can route.
        # NOTE(review): nesting under the WASM branch is inferred from the use
        # of wasm_block_msg as the reason; confirm against the original file.
        if input_type not in {"source_code", "config_file", "mixed", "package_list", "sql_review"}:
            logger.warning(
                "[SANITIZE][%s] Result: safe=False type=%s reason=%s l0_count=%d",
                input_hash, input_type, wasm_block_msg, len(l0_findings),
            )
            return reject(wasm_block_msg, l0_findings)

    logger.info(
        "[SANITIZE][%s] Result: safe=True type=%s truncated=%s l0_count=%d",
        input_hash, input_type, truncated, len(l0_findings),
    )

    return SanitizeResult(
        safe=True,
        sanitized_input=text,
        truncated=truncated,
        original_length=original_length,
        l0_findings=l0_findings,
        input_hash=input_hash,
        input_type=input_type,
        wasm_verdict=wasm_verdict,
    )


def format_l0_report(result: SanitizeResult) -> dict[str, Any]:
    """Convert a ``SanitizeResult`` into a plain dictionary.

    The matched text of each finding is not copied into the report; only the
    pattern name, description, line number and severity are.

    Args:
        result: The result produced by ``sanitize_input``.

    Returns:
        {
            "safe": bool,
            "input_type": str,
            "truncated": bool,
            "input_hash": str,
            "blocked_reason": str,
            "wasm_verdict": dict,
            "l0_findings": [{"pattern": str, "description": str,
                             "line_no": int, "severity": str}],
            "l0_warning_count": int,   # findings whose severity is "WARNING"
        }
    """
    findings = [
        {
            "pattern": f.pattern_name,
            "description": f.description,
            "line_no": f.line_no,
            "severity": f.severity,
        }
        for f in result.l0_findings
    ]
    return {
        "safe": result.safe,
        "input_type": result.input_type,
        "truncated": result.truncated,
        "input_hash": result.input_hash,
        "blocked_reason": result.blocked_reason,
        "wasm_verdict": result.wasm_verdict,  # Phase 4C: L0.5 WASM verdict
        "l0_findings": findings,
        "l0_warning_count": sum(1 for f in result.l0_findings if f.severity == "WARNING"),
    }