"""
input_sanitizer.py — L0 確定性輸入淨化器
==========================================
架構位置:Pipeline 最前端(在 CrewAI 啟動前執行)
層級防禦架構(Phase 4C 更新):
L0.5 WASM Sandbox ← wasmtime 沙盒過濾(Prompt Injection / Unicode / 超門檻)
L0 Python 正則掃描 ← SQL/OS/模板 Injection 標記
L1 Blocklist ← 高信心惡意模式(直接拒絕)
依據:
- FINAL_PLAN.md §3a:[⚙️ input_sanitizer.py] ← 確定性基礎設施(OWASP LLM01:2025)
- OWASP LLM01:2025 Prompt Injection — 不可信輸入在進入 LLM 前必須先過濾
設計原則:
- 純確定性運算(無 LLM、無外部 API)
- 與 security_guard.py 的分工:
input_sanitizer → 守門(Pipeline 前):截斷 + L0 正則掃描 + 禁區關鍵字過濾
security_guard → 提取(CrewAI 內):AST/正則程式碼結構提取,不做判斷
層級邊界:應用層(不引用 harness/entropy 層)
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger("threathunter.input_sanitizer")

# ══════════════════════════════════════════════════════════════
# Phase 4C: L0.5 WASM sandbox loading (graceful degradation)
# ══════════════════════════════════════════════════════════════
# Environment switch: WASM_SANDBOX_ENABLED=false (or "0" / "no") disables the layer globally.
_WASM_ENABLED = os.getenv("WASM_SANDBOX_ENABLED", "true").lower() not in ("false", "0", "no")

# Safe defaults; they are only flipped once the sandbox module has imported successfully.
_wasm_mod = None  # type: ignore
_WASM_AVAILABLE = False

if not _WASM_ENABLED:
    logger.info("[InputSanitizer] WASM_SANDBOX_ENABLED=false, 跳過 L0.5 層")
else:
    try:
        import threathunter_prompt_sandbox as _wasm_mod
    except ImportError:
        # Optional native module is missing: fall back to the pure-Python L0 filter.
        _wasm_mod = None  # type: ignore
        logger.warning(
            "[InputSanitizer] threathunter_prompt_sandbox 不可用(未編譯),"
            "降級為純 Python L0 過濾"
        )
    else:
        _WASM_AVAILABLE = True
        # The version is only used for this log line, so a failing version probe
        # must not abort the import of this module.
        try:
            _wasm_version = _wasm_mod.sandbox_version()
        except Exception:  # noqa: BLE001
            _wasm_version = "unknown"
        logger.info("[InputSanitizer] Phase 4C: WASM Sandbox 啟用 (v%s)", _wasm_version)
def _wasm_eval(text: str) -> dict[str, Any]:
    """Ask the WASM sandbox to evaluate *text* and return its verdict.

    Returns:
        A dict shaped like
        ``{"code": int, "verdict": str, "reason": str, "engine": str}``.
        When the sandbox is unavailable the verdict is
        ``{"code": 0, "verdict": "ALLOW", "reason": "wasm_unavailable", "engine": "none"}``.
        Any sandbox failure, including a payload that does not decode to a
        JSON object, is logged and also reported as ``ALLOW`` with a
        ``wasm_error:...`` reason, so this layer never raises.
    """
    if not _WASM_AVAILABLE or _wasm_mod is None:
        return {"code": 0, "verdict": "ALLOW", "reason": "wasm_unavailable", "engine": "none"}

    try:
        result = json.loads(_wasm_mod.sandbox_eval(text))
    except Exception as exc:  # noqa: BLE001
        logger.warning("[InputSanitizer] WASM eval 異常: %s", exc)
        return {"code": 0, "verdict": "ALLOW", "reason": f"wasm_error:{exc}", "engine": "none"}

    # Callers use dict methods on the verdict; valid JSON that is not an object
    # (list, string, number, null) is treated like any other sandbox error.
    if not isinstance(result, dict):
        problem = f"non-object payload ({type(result).__name__})"
        logger.warning("[InputSanitizer] WASM eval 異常: %s", problem)
        return {"code": 0, "verdict": "ALLOW", "reason": f"wasm_error:{problem}", "engine": "none"}

    return result
# ══════════════════════════════════════════════════════════════
# Constants
# ══════════════════════════════════════════════════════════════
MAX_INPUT_LENGTH = 50_000 # Inputs longer than this many characters are truncated (guards against prompt flooding)
MAX_LINE_COUNT = 2_000 # Inputs with more lines than this are truncated (guards against over-long function dumps)
# L0 regex scan: deterministic detection of suspicious patterns
# (SQL / command injection, hard-coded credentials, eval, ...).
# These are *reports*, not *blocks*: processing continues and the hits are
# attached to the result for the Security Guard.
L0_PATTERNS: list[tuple[str, str, str]] = [
    # (name, regex_pattern, description)
    (
        "sql_injection",
        r"(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|CREATE|ALTER)\s+.{0,100}?\s*['\";]",
        "SQL 語句疑似拼接(SQL Injection 風險)",
    ),
    (
        # NOTE(review): because `exec\(` / `eval\(` already consume the "(",
        # the trailing class means `eval(x)` with a bare name is not matched,
        # only `eval("...")`, `eval((...`, `eval([...`. Confirm this is intended.
        "os_command",
        r"(?i)(os\.system|subprocess\.call|subprocess\.run|popen|exec\(|eval\()\s*[\(\['\"]",
        "危險系統呼叫(OS Command Injection 風險)",
    ),
    (
        "hardcoded_secret",
        r"(?i)(password|passwd|pwd|secret|api_key|apikey|token|auth)\s*=\s*['\"][^'\"]{4,}['\"]",
        "硬編碼憑證(Credential Exposure 風險)",
    ),
    (
        "path_traversal",
        r"\.{2,}/|\.{2,}\\",
        "路徑穿越嘗試(Path Traversal 風險)",
    ),
    (
        "template_injection",
        r"\{\{.{0,100}?\}\}|\{%.*?%\}",
        "模板語法(Template Injection 風險)",
    ),
    # NOTE(review): the two entries below were fused into one invalid regex
    # (unbalanced ")") in the reviewed text, apparently because "<...>" spans
    # had been stripped. The XML pattern and its description are a conservative
    # reconstruction; restore the authoritative version from version control.
    (
        "xml_entity",
        r"(?i)<!ENTITY\s|<!DOCTYPE\s[^>]*\[",
        "XML 外部實體宣告(XXE 風險)",
    ),
    (
        # Only the chat-template markers could be recovered; any natural-language
        # alternatives that preceded them were lost and are not guessed here.
        "prompt_injection",
        r"(?i)("
        r"<\|im_start\|>system|<\|system\|>"
        r"|<<SYS>>|<</SYS>>"
        r")",
        "Prompt Injection 嘗試(OWASP LLM01)",
    ),
    (
        "jailbreak",
        r"(?i)("
        r"\bDAN\b"
        r"|jailbreak"
        r"|do\s+anything\s+now"
        r"|no\s+restrictions?\s+mode"
        r"|developer\s+mode"
        r"|unrestricted\s+mode"
        r"|pretend\s+(you\s+have\s+no|to\s+be)"
        r"|unethical\s+twin"
        r"|without\s+(?:moral|ethical|safety)\s+(?:limits?|filters?|restrictions?)"
        r"|(?:constitution|guidelines)\s+(?:is\s+)?disabled"
        # Restored from the degraded "<>" residue.
        r"|\[INST\]\s*<<SYS>>"
        r")",
        "越獄嘗試(OWASP LLM01)",
    ),
]
# Patterns that are never allowed through (stricter than the L0 regex scan).
# They are high-confidence malicious signatures: a hit rejects the input
# outright instead of merely flagging it.
BLOCKLIST_PATTERNS: list[tuple[str, str]] = [
    # (regex_pattern, rejection reason)
    (
        r"(?i)\bDROP\s+TABLE\b",
        "偵測到 DROP TABLE 語句",
    ),
    (
        r"(?i)\bxp_cmdshell\b",
        "SQL Server 命令執行指令",
    ),
    (
        r"(?i)\bSHUTDOWN\s+WITH\s+NOWAIT\b",
        "資料庫關機指令",
    ),
    (
        # <number or quoted literal> OR <tautology> followed by UNION or a "--" comment
        r"(?i)\b(\d+|'[^']*')\s+OR\s+('?\d+'?\s*=\s*'?\d+'?|\w+\s*=\s*\w+)\s*(UNION|--)",
        "SQL Boolean-based OR 注入(高信心)",
    ),
    (
        r"(?i)UNION\s+(?:ALL\s+)?SELECT\s+\*\s+FROM\s+\w+",
        "SQL UNION SELECT 注入(高信心)",
    ),
]
# ══════════════════════════════════════════════════════════════
# 資料結構
# ══════════════════════════════════════════════════════════════
@dataclass
class L0Finding:
    """One hit recorded by the L0 scan."""

    # Name of the rule that produced the hit.
    pattern_name: str
    # Human-readable description attached to that rule.
    description: str
    # Line number reported for the hit.
    line_no: int
    # Snippet capped at 100 characters so malicious content is not echoed in full.
    matched_text: str
    # "WARNING" | "INFO"
    severity: str
@dataclass
class SanitizeResult:
    """Outcome of sanitizing one input.

    Attributes:
        safe: Whether the input may enter the pipeline (reject when False).
        sanitized_input: The (possibly truncated) text handed to the pipeline.
        truncated: Whether the input was cut down.
        original_length: Length of the input before any truncation.
        l0_findings: L0 scan hits; flagged, but the input may still be allowed.
        blocked_reason: Explanation when ``safe`` is False.
        input_hash: First 16 hex characters of the SHA-256 digest
            (for de-duplication / log correlation).
        input_type: Inferred kind of input.
        wasm_verdict: Raw verdict of the L0.5 WASM sandbox (Phase 4C).
    """

    # Required, positional fields.
    safe: bool
    sanitized_input: str
    truncated: bool
    original_length: int

    # Optional fields; mutable defaults go through a factory so instances never share them.
    l0_findings: list[L0Finding] = field(default_factory=list)
    blocked_reason: str = ""
    input_hash: str = ""
    input_type: str = "unknown"
    wasm_verdict: dict = field(default_factory=dict)
# ══════════════════════════════════════════════════════════════
# 輸入類型推斷
# ══════════════════════════════════════════════════════════════
def _infer_input_type(text: str) -> str:
    """Infer the kind of input so the orchestrator can route it.

    v3.1: recognises code in several languages
    (Python/JS/TS/Java/Go/PHP/Ruby/Rust/C/C++/C#).

    NOTE(review): in the reviewed text the package/config signal lists and the
    three score sums were truncated mid-line and did not parse. They are
    reconstructed below as conservative heuristics; the code signals and the
    final decision cascade are unchanged. Verify against version control.

    Returns:
        "sql_review"   -> isolated SQL corpus / SQL syntax review (path C-like)
        "config_file"  -> configuration file (path C)
        "source_code"  -> program code (path B)
        "package_list" -> package list (path A); also the fallback
    """
    if _looks_like_sql_review(text):
        return "sql_review"

    # Code features (multi-language).
    code_signals = [
        # Python
        bool(re.search(r"^\s*(def |class |import |from \w+\s+import )", text, re.MULTILINE)),
        # JavaScript / TypeScript
        bool(re.search(r"(?:const|let|var)\s+\w+\s*=|=>\s*\{|require\s*\(|export\s+(?:default|const|function)", text, re.MULTILINE)),
        # Java
        bool(re.search(r"(?:public|private|protected)\s+(?:static\s+)?(?:class|void|int|String)\s+\w+", text, re.MULTILINE)),
        # Go
        bool(re.search(r"^(?:package\s+\w+|func\s+\w+|:=)", text, re.MULTILINE)),
        # PHP
        bool(re.search(r"<\?php|\$\w+\s*=", text)),
        # Ruby
        bool(re.search(r"^\s*(?:def\s+\w+|require\s+['\"]|module\s+\w+|class\s+\w+\s*<)", text, re.MULTILINE)),
        # Rust
        bool(re.search(r"(?:fn\s+\w+|let\s+mut\s+|impl\s+\w+|use\s+\w+::)", text, re.MULTILINE)),
        # C / C++
        bool(re.search(r"#include\s*[<\"]|int\s+main\s*\(|printf\s*\(|std::", text, re.MULTILINE)),
        # C#
        bool(re.search(r"using\s+System(?:\.\w+)?\s*;|namespace\s+\w+|public\s+class\s+\w+", text, re.MULTILINE)),
        # Shebang
        bool(re.search(r"^#!\/", text)),
        # Generic: brace/paren/semicolon syntax spread over several lines
        bool(re.search(r"[{}();]", text)) and text.count("\n") > 5,
    ]

    # Package-list features: `name==version`, `name>=version`, `name version`,
    # or a plain list of names (reconstructed, see docstring).
    content_lines = [
        stripped
        for stripped in (line.strip() for line in text.splitlines())
        if stripped and not stripped.startswith("#")
    ]
    entry_pattern = (
        r"[A-Za-z0-9][\w\-\.]*(?:\[[\w,\- ]*\])?"
        r"(?:\s*(?:[><=!~]=|[><])\s*v?\d[\w\.\-+*]*|\s+v?\d[\w\.\-+]*)?"
    )
    entry_hits = sum(1 for line in content_lines if re.fullmatch(entry_pattern, line))
    pkg_signals = [
        # At least one line that starts with a version specifier entry.
        bool(re.search(r"^[\w\-\.]+[>=<~!]=?\s*v?\d", text, re.MULTILINE)),
        # At least half of the non-comment lines look like package entries.
        bool(content_lines) and entry_hits * 2 >= len(content_lines),
        # A "name version" pair on its own line.
        bool(re.search(r"^[\w\-\.]+[ \t]+v?\d[\w\.\-+]*[ \t]*$", text, re.MULTILINE)),
    ]

    # Configuration-file features (reconstructed, see docstring).
    config_signals = [
        # YAML / properties style "key: value" at column 0 (no "=" in the value,
        # which keeps annotated assignments such as "x: int = 1" out).
        bool(re.search(r"^[A-Za-z_][\w\-\.]*:[ \t]+[^=\n]+$", text, re.MULTILINE)),
        # INI / TOML section header.
        bool(re.search(r"^\[[\w\-\. ]+\][ \t]*$", text, re.MULTILINE)),
        # Env-file style assignment without surrounding spaces.
        bool(re.search(r"^[A-Z][A-Z0-9_]*=\S", text, re.MULTILINE)),
        # Dockerfile instruction or XML prolog.
        bool(re.search(r"^(?:FROM|EXPOSE|ENTRYPOINT|WORKDIR)[ \t]+\S|^\s*<\?xml\b", text, re.MULTILINE)),
    ]

    code_score = sum(code_signals)
    pkg_score = sum(pkg_signals)
    config_score = sum(config_signals)

    # Decision cascade (unchanged): strong config evidence wins first, then
    # strong code, then strong package evidence, then weak code; anything
    # else is treated as a package list.
    if config_score >= 2 or (config_score >= 1 and code_score == 0 and pkg_score < 2):
        return "config_file"
    if code_score >= 2:
        return "source_code"
    if pkg_score >= 2:
        return "package_list"
    if code_score >= 1:
        return "source_code"
    return "package_list"
def _looks_like_sql_review(text: str) -> bool:
    """Return True when *text* reads like an isolated SQL corpus rather than application code.

    Exploitability of SQL injection needs an application source/sink; a bare
    ``.sql`` text can only be reviewed for syntax and payloads, so it is routed
    separately at L0.

    The text qualifies when it shows no application-code signal and either has
    at least two lines starting with a SQL statement keyword, or has one such
    line plus at least two payload/comment markers.
    """
    if not text or not text.strip():
        return False

    sql_statement_hits = len(re.findall(
        r"(?im)^\s*(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|WITH|EXEC|GRANT)\b",
        text,
    ))
    # Word boundaries are applied per alternative. A blanket \b...\b could not
    # match "$gt"/"$where" after a quote or brace, and rejected "SLEEP(" /
    # "EXEC(" unless a word character followed the parenthesis.
    sql_payload_hits = len(re.findall(
        r"(?i)(?:"
        r"\b(?:UNION\s+SELECT\b|OR\s+1\s*=\s*1\b|SLEEP\s*\(|WAITFOR\s+DELAY\b|"
        r"EXTRACTVALUE\s*\(|UPDATEXML\s*\(|xp_cmdshell\b|sp_executesql\b|EXEC\s*\(|"
        r"CREATE\s+USER\b|GRANT\s+ALL\b)"
        r"|\$(?:gt|where)\b)",
        text,
    ))
    sql_comment_hits = len(re.findall(r"(?m)^\s*(?:--|/\*)", text))

    # Any single application-language marker disqualifies the text.
    application_code_signals = [
        bool(re.search(r"^\s*(?:def |class |import |from \w+\s+import )", text, re.MULTILINE)),
        bool(re.search(r"(?:const|let|var)\s+\w+\s*=|function\s+\w+\s*\(", text)),
        bool(re.search(r"(?:public|private|protected)\s+(?:static\s+)?(?:class|void|int|String)\s+\w+", text)),
        bool(re.search(r"^(?:package\s+\w+|func\s+\w+)", text, re.MULTILINE)),
        bool(re.search(r"<\?php|\$\w+\s*=", text)),
        bool(re.search(r"#include\s*[<\"]|int\s+main\s*\(", text)),
        bool(re.search(r"using\s+System(?:\.\w+)?\s*;|public\s+class\s+\w+", text, re.MULTILINE)),
    ]
    if any(application_code_signals):
        return False

    return sql_statement_hits >= 2 or (
        sql_statement_hits >= 1 and (sql_payload_hits + sql_comment_hits) >= 2
    )
def _wasm_block_finding(reason: str, text: str) -> L0Finding:
    """Keep a WASM L0.5 block signal as an auditable finding.

    The finding name is derived from keywords in *reason* (first matching rule
    wins): prompt/instruction -> ``wasm_prompt_injection``, jailbreak ->
    ``wasm_jailbreak``, code/command -> ``wasm_code_injection``, otherwise
    ``wasm_l0_block``. The finding always reports line 1 and the first 100
    characters of *text*.
    """
    normalized = (reason or "wasm_block").lower()
    # "unicode" contains the substring "code"; remove it before the keyword
    # test so Unicode-related blocks are not mislabelled as code injection.
    code_probe = normalized.replace("unicode", "")

    if "prompt" in normalized or "instruction" in normalized:
        pattern_name = "wasm_prompt_injection"
    elif "jailbreak" in normalized:
        pattern_name = "wasm_jailbreak"
    elif "code" in code_probe or "command" in normalized:
        pattern_name = "wasm_code_injection"
    else:
        pattern_name = "wasm_l0_block"

    return L0Finding(
        pattern_name=pattern_name,
        description=f"WASM L0.5 flagged input before type-aware review: {reason}",
        line_no=1,
        matched_text=(text or "")[:100],
        severity="WARNING",
    )
def _extract_safe_targets_from_blocked_text(text: str) -> str:
    """Keep only scannable ``package version`` targets from input mixed with prompt injection.

    Every ``name <sep> version`` pair is examined together with up to 80
    characters of context on each side. A pair whose context contains an
    injection keyword is dropped unless its name contains a well-known
    product name.

    Returns:
        Unique ``"package version"`` entries in first-seen order, one per
        line; an empty string when nothing qualifies.
    """
    if not text:
        return ""

    target_pattern = re.compile(
        r"\b(?!CVE\b)([A-Za-z][A-Za-z0-9_.+-]{1,40})\s*"
        r"(?:==|>=|<=|~=|=|\s+)\s*(v?\d[\w.+-]{0,30})\b"
    )
    # "dan" is anchored as a whole word: unanchored it also matched inside
    # ordinary names such as "pandas" or "standard" and dropped valid targets.
    blocked_words = re.compile(
        r"(?i)(ignore|forget|instruction|system|rule|constitution|chatbot|"
        r"hacked|developer\s+mode|jailbreak|\bdan\b|say\s+['\"]|output)"
    )
    known_products = re.compile(
        r"(?i)(django|postgresql|postgres|redis|nginx|flask|express|spring|openssl|apache)"
    )

    # A dict preserves insertion order and de-duplicates in O(1).
    safe_targets: dict[str, None] = {}
    for match in target_pattern.finditer(text):
        package, version = match.groups()
        start = max(0, match.start() - 80)
        end = min(len(text), match.end() + 80)
        context = text[start:end]
        if blocked_words.search(context) and not known_products.search(package):
            continue
        safe_targets.setdefault(f"{package} {version}".strip(), None)
    return "\n".join(safe_targets)
# ══════════════════════════════════════════════════════════════
# 核心淨化函式
# ══════════════════════════════════════════════════════════════
def sanitize_input(raw_input: str) -> SanitizeResult:
    """Deterministically sanitize raw user input before it enters the pipeline.

    Steps:
        1. Compute ``input_hash`` (log correlation).
        2. Evaluate the input with the L0.5 WASM sandbox.
        3. Truncate over-long input (characters, then lines).
        4. Blocklist scan: a hit rejects the input, unless the text looks like
           an isolated SQL review corpus, in which case it is only flagged.
        5. L0 regex scan: hits are recorded, the input still passes.
        6. Infer the input type.
        7. Apply a WASM BLOCK verdict: code/config/SQL-review inputs proceed
           with an extra finding; other inputs are reduced to their safe
           ``package version`` targets, or rejected when none remain.

    Args:
        raw_input: Raw user input; non-string values are converted with ``str()``.

    Returns:
        SanitizeResult describing the decision and the sanitized text.
    """
    if not isinstance(raw_input, str):
        raw_input = str(raw_input)
    original_length = len(raw_input)

    # ── Step 1: hash ─────────────────────────────────────────────
    input_hash = hashlib.sha256(raw_input.encode("utf-8", errors="replace")).hexdigest()[:16]
    logger.debug("[SANITIZE] hash=%s original_len=%d", input_hash, original_length)

    # ── Step 1.5: L0.5 WASM sandbox (Phase 4C) ───────────────────
    wasm_verdict = _wasm_eval(raw_input)
    if not isinstance(wasm_verdict, dict):
        # Defensive: a malformed verdict must not crash the gatekeeper.
        wasm_verdict = {}
    wasm_code = wasm_verdict.get("code", 0)
    wasm_reason = wasm_verdict.get("reason", "ok")
    wasm_block_msg = ""
    if wasm_code == 1:  # BLOCK
        wasm_block_msg = f"[WASM-L0.5] BLOCK: {wasm_reason}"
        logger.warning("[SANITIZE][%s] %s", input_hash, wasm_block_msg)
    elif wasm_code == 3:  # TRUNCATE: advisory only, processing continues
        # No slicing here: step 2 applies the same MAX_INPUT_LENGTH cut and,
        # unlike a pre-slice, records it in ``truncated``.
        logger.info("[SANITIZE][%s] WASM TRUNCATE 建議", input_hash)

    # ── Step 2: truncation ───────────────────────────────────────
    truncated = False
    text = raw_input
    if len(text) > MAX_INPUT_LENGTH:
        text = text[:MAX_INPUT_LENGTH]
        truncated = True
        logger.warning(
            "[SANITIZE][%s] Input truncated: %d → %d chars",
            input_hash, original_length, MAX_INPUT_LENGTH,
        )
    # Also cap the number of lines.
    lines = text.splitlines()
    if len(lines) > MAX_LINE_COUNT:
        text = "\n".join(lines[:MAX_LINE_COUNT])
        truncated = True
        logger.warning(
            "[SANITIZE][%s] Input truncated to %d lines", input_hash, MAX_LINE_COUNT
        )

    l0_findings: list[L0Finding] = []
    preliminary_sql_review = _looks_like_sql_review(text)

    # ── Step 3: blocklist scan (direct rejection) ────────────────
    for block_pattern, reason in BLOCKLIST_PATTERNS:
        if not re.search(block_pattern, text):
            continue
        if preliminary_sql_review:
            # A SQL review corpus is expected to contain payload syntax:
            # record one finding and stop scanning the blocklist.
            l0_findings.append(
                L0Finding(
                    pattern_name="sql_review_payload",
                    description=f"SQL review corpus contains blocked payload syntax: {reason}",
                    line_no=1,
                    matched_text=text[:100],
                    severity="WARNING",
                )
            )
            break
        logger.warning("[SANITIZE][%s] BLOCKED: %s", input_hash, reason)
        return SanitizeResult(
            safe=False,
            sanitized_input="",
            truncated=truncated,
            original_length=original_length,
            blocked_reason=reason,
            input_hash=input_hash,
            input_type="blocked",
            wasm_verdict=wasm_verdict,
        )

    # ── Step 4: L0 regex scan (flag, never reject) ───────────────
    for pattern_name, pattern, description in L0_PATTERNS:
        try:
            for match in re.finditer(pattern, text):
                # 1-based line number of the match start.
                line_no = text[: match.start()].count("\n") + 1
                # Cap the snippet so malicious content is not echoed in full.
                matched_snippet = match.group(0)[:100]
                l0_findings.append(
                    L0Finding(
                        pattern_name=pattern_name,
                        description=description,
                        line_no=line_no,
                        matched_text=matched_snippet,
                        severity="WARNING" if "injection" in pattern_name or "jailbreak" in pattern_name else "INFO",
                    )
                )
                logger.info(
                    "[SANITIZE][%s] L0 finding: %s @ line %d",
                    input_hash, pattern_name, line_no,
                )
        except re.error as e:
            logger.error("[SANITIZE] Regex error for pattern %s: %s", pattern_name, e)

    # ── Step 5: infer the input type ─────────────────────────────
    input_type = _infer_input_type(text)

    if wasm_block_msg:
        l0_findings.append(_wasm_block_finding(str(wasm_reason), text))

        def _wasm_rejection() -> SanitizeResult:
            # Shared rejection path for a WASM BLOCK verdict.
            logger.warning(
                "[SANITIZE][%s] Result: safe=False type=%s reason=%s l0_count=%d",
                input_hash, input_type, wasm_block_msg, len(l0_findings),
            )
            return SanitizeResult(
                safe=False,
                sanitized_input="",
                truncated=truncated,
                original_length=original_length,
                l0_findings=l0_findings,
                blocked_reason=wasm_block_msg,
                input_hash=input_hash,
                input_type="blocked",
                wasm_verdict=wasm_verdict,
            )

        if input_type not in {"source_code", "config_file", "mixed", "sql_review"}:
            # Not a reviewable artefact: keep only the scannable targets.
            safe_targets = _extract_safe_targets_from_blocked_text(text)
            if not safe_targets:
                return _wasm_rejection()
            text = safe_targets
            input_type = _infer_input_type(text)
            logger.warning(
                "[SANITIZE][%s] WASM block sanitized to safe targets: %s",
                input_hash, text,
            )
        if input_type not in {"source_code", "config_file", "mixed", "package_list", "sql_review"}:
            return _wasm_rejection()

    logger.info(
        "[SANITIZE][%s] Result: safe=True type=%s truncated=%s l0_count=%d",
        input_hash, input_type, truncated, len(l0_findings),
    )
    return SanitizeResult(
        safe=True,
        sanitized_input=text,
        truncated=truncated,
        original_length=original_length,
        l0_findings=l0_findings,
        input_hash=input_hash,
        input_type=input_type,
        wasm_verdict=wasm_verdict,
    )
def format_l0_report(result: SanitizeResult) -> dict[str, Any]:
    """Flatten a SanitizeResult into a plain dict for the pipeline.

    Returns:
        {
            "safe": bool,
            "input_type": str,
            "truncated": bool,
            "input_hash": str,
            "blocked_reason": str,
            "wasm_verdict": dict,   # Phase 4C: L0.5 WASM verdict
            "l0_findings": [{"pattern": str, "description": str, "line_no": int, "severity": str}],
            "l0_warning_count": int,
        }
    """
    serialized_findings = []
    warning_total = 0
    for finding in result.l0_findings:
        serialized_findings.append(
            {
                "pattern": finding.pattern_name,
                "description": finding.description,
                "line_no": finding.line_no,
                "severity": finding.severity,
            }
        )
        if finding.severity == "WARNING":
            warning_total += 1

    report: dict[str, Any] = {}
    report["safe"] = result.safe
    report["input_type"] = result.input_type
    report["truncated"] = result.truncated
    report["input_hash"] = result.input_hash
    report["blocked_reason"] = result.blocked_reason
    report["wasm_verdict"] = result.wasm_verdict
    report["l0_findings"] = serialized_findings
    report["l0_warning_count"] = warning_total
    return report