Threat_Hunter / input_sanitizer.py
EricChen2005's picture
Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B
c8d30bc
"""
input_sanitizer.py โ€” L0 ็ขบๅฎšๆ€ง่ผธๅ…ฅๅ‡€ๅŒ–ๅ™จ
==========================================
ๆžถๆง‹ไฝ็ฝฎ๏ผšPipeline ๆœ€ๅ‰็ซฏ๏ผˆๅœจ CrewAI ๅ•Ÿๅ‹•ๅ‰ๅŸท่กŒ๏ผ‰
ๅฑค็ดš้˜ฒๅพกๆžถๆง‹๏ผˆPhase 4C ๆ›ดๆ–ฐ๏ผ‰๏ผš
L0.5 WASM Sandbox โ† wasmtime ๆฒ™็›’้Žๆฟพ๏ผˆPrompt Injection / Unicode / ่ถ…้–€ๆชป๏ผ‰
L0 Python ๆญฃๅ‰‡ๆŽƒๆ โ† SQL/OS/ๆจกๆฟ Injection ๆจ™่จ˜
L1 Blocklist โ† ้ซ˜ไฟกๅฟƒๆƒกๆ„ๆจกๅผ๏ผˆ็›ดๆŽฅๆ‹’็ต•๏ผ‰
ไพๆ“š๏ผš
- FINAL_PLAN.md ยง3a๏ผš[โš™๏ธ input_sanitizer.py] โ† ็ขบๅฎšๆ€งๅŸบ็คŽ่จญๆ–ฝ๏ผˆOWASP LLM01:2025๏ผ‰
- OWASP LLM01:2025 Prompt Injection โ€” ไธๅฏไฟก่ผธๅ…ฅๅœจ้€ฒๅ…ฅ LLM ๅ‰ๅฟ…้ ˆๅ…ˆ้Žๆฟพ
่จญ่จˆๅŽŸๅ‰‡๏ผš
- ็ด”็ขบๅฎšๆ€ง้‹็ฎ—๏ผˆ็„ก LLMใ€็„กๅค–้ƒจ API๏ผ‰
- ่ˆ‡ security_guard.py ็š„ๅˆ†ๅทฅ๏ผš
input_sanitizer โ†’ ๅฎˆ้–€๏ผˆPipeline ๅ‰๏ผ‰๏ผšๆˆชๆ–ท + L0 ๆญฃๅ‰‡ๆŽƒๆ + ็ฆๅ€้—œ้ตๅญ—้Žๆฟพ
security_guard โ†’ ๆๅ–๏ผˆCrewAI ๅ…ง๏ผ‰๏ผšAST/ๆญฃๅ‰‡็จ‹ๅผ็ขผ็ตๆง‹ๆๅ–๏ผŒไธๅšๅˆคๆ–ท
ๅฑค็ดš้‚Š็•Œ๏ผšๆ‡‰็”จๅฑค๏ผˆไธๅผ•็”จ harness/entropy ๅฑค๏ผ‰
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger("threathunter.input_sanitizer")
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Phase 4C: L0.5 WASM Sandbox ่ผ‰ๅ…ฅ๏ผˆGraceful Degradation๏ผ‰
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็’ฐๅขƒ่ฎŠๆ•ธๆŽงๅˆถ๏ผšWASM_SANDBOX_ENABLED=false ๅฏๅ…จๅฑ€ๅœ็”จ
# Optional L0.5 WASM sandbox bootstrap.
# The sandbox is considered enabled unless WASM_SANDBOX_ENABLED is set to
# "false", "0" or "no" (compared case-insensitively).
_WASM_ENABLED = os.getenv("WASM_SANDBOX_ENABLED", "true").lower() not in ("false", "0", "no")
try:
    if _WASM_ENABLED:
        # Imported lazily so that a missing build degrades to the pure-Python path.
        import threathunter_prompt_sandbox as _wasm_mod
        _WASM_AVAILABLE = True
        logger.info("[InputSanitizer] Phase 4C: WASM Sandbox ๅ•Ÿ็”จ (v%s)", _wasm_mod.sandbox_version())
    else:
        # Explicitly disabled through the environment variable.
        _wasm_mod = None  # type: ignore
        _WASM_AVAILABLE = False
        logger.info("[InputSanitizer] WASM_SANDBOX_ENABLED=false, ่ทณ้Ž L0.5 ๅฑค")
except ImportError:
    # Module not installed / not compiled: fall back to the Python-only L0 filter.
    # NOTE(review): only ImportError is handled here; any other exception raised
    # while importing the module or calling sandbox_version() propagates and
    # aborts the import of this file — confirm that is intended.
    _wasm_mod = None  # type: ignore
    _WASM_AVAILABLE = False
    logger.warning(
        "[InputSanitizer] threathunter_prompt_sandbox ไธๅฏ็”จ๏ผˆๆœช็ทจ่ญฏ๏ผ‰๏ผŒ"
        "้™็ดš็‚บ็ด” Python L0 ้Žๆฟพ"
    )
def _wasm_eval(text: str) -> dict[str, Any]:
    """
    Ask the L0.5 WASM sandbox for a verdict on ``text``.

    The function fails open: when the sandbox is unavailable, raises, or
    returns something that is not a JSON object, an ``ALLOW`` verdict with
    ``code`` 0 is returned so the remaining Python checks still run.

    Args:
        text: Raw input to evaluate.

    Returns:
        The decoded sandbox verdict. The fallback verdicts built here carry
        the keys ``code``, ``verdict``, ``reason`` and ``engine``; a verdict
        coming from the sandbox is presumably shaped the same way (the caller
        only reads ``code`` and ``reason``, both with defaults).
    """
    if not _WASM_AVAILABLE or _wasm_mod is None:
        return {"code": 0, "verdict": "ALLOW", "reason": "wasm_unavailable", "engine": "none"}
    try:
        result = json.loads(_wasm_mod.sandbox_eval(text))
        # json.loads may legally yield a list/str/number/None; callers use
        # dict.get on the verdict, so anything else is treated as a failure.
        if not isinstance(result, dict):
            raise TypeError(
                f"sandbox verdict is {type(result).__name__}, expected a JSON object"
            )
    except Exception as exc:  # noqa: BLE001
        # Deliberate best-effort: a broken sandbox must not take the pipeline down.
        logger.warning("[InputSanitizer] WASM eval ็•ฐๅธธ: %s", exc)
        return {"code": 0, "verdict": "ALLOW", "reason": f"wasm_error:{exc}", "engine": "none"}
    return result
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๅธธๆ•ธ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
MAX_INPUT_LENGTH = 50_000  # input longer than this many characters is truncated (guards against prompt flooding)
MAX_LINE_COUNT = 2_000  # input with more lines than this is truncated (guards against floods of over-long functions)
# L0 ๆญฃๅ‰‡ๆŽƒๆ๏ผš็ขบๅฎšๆ€งๆ‰พๅฏ็–‘ๆจกๅผ๏ผˆSQL/Command Injection / ็กฌ็ทจ็ขผๆ†‘่ญ‰ / eval๏ผ‰
# ้€™ไบ›ๆ˜ฏใ€Œ้€šๅ ฑใ€่€Œ้žใ€Œ้˜ปๆ“‹ใ€โ€”โ€”ไป็„ถ็นผ็บŒ่™•็†๏ผŒไฝ†ๆจ™่จ˜็ตฆ Security Guard
# L0 regex scan table. Each entry is (name, regex_pattern, description).
# sanitize_input runs every pattern with re.finditer and records each match as
# an L0Finding; these patterns flag input, they do not reject it.
# The description strings are runtime data and are kept verbatim.
L0_PATTERNS: list[tuple[str, str, str]] = [
    # (name, regex_pattern, description)
    (
        # An SQL keyword followed within ~100 characters by a quote or semicolon.
        "sql_injection",
        r"(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|CREATE|ALTER)\s+.{0,100}?\s*['\";]",
        "SQL ่ชžๅฅ็–‘ไผผๆ‹ผๆŽฅ๏ผˆSQL Injection ้ขจ้šช๏ผ‰",
    ),
    (
        # Process-spawning / dynamic-evaluation calls followed by an opening
        # parenthesis, bracket or quote.
        "os_command",
        r"(?i)(os\.system|subprocess\.call|subprocess\.run|popen|exec\(|eval\()\s*[\(\['\"]",
        "ๅฑ้šช็ณป็ตฑๅ‘ผๅซ๏ผˆOS Command Injection ้ขจ้šช๏ผ‰",
    ),
    (
        # Credential-like name assigned a quoted literal of at least 4 characters.
        "hardcoded_secret",
        r"(?i)(password|passwd|pwd|secret|api_key|apikey|token|auth)\s*=\s*['\"][^'\"]{4,}['\"]",
        "็กฌ็ทจ็ขผๆ†‘่ญ‰๏ผˆCredential Exposure ้ขจ้šช๏ผ‰",
    ),
    (
        # Two or more dots followed by a forward slash or backslash.
        "path_traversal",
        r"\.{2,}/|\.{2,}\\",
        "่ทฏๅพ‘็ฉฟ่ถŠๅ˜—่ฉฆ๏ผˆPath Traversal ้ขจ้šช๏ผ‰",
    ),
    (
        # {{ ... }} (up to 100 characters inside) or {% ... %} template markers.
        "template_injection",
        r"\{\{.{0,100}?\}\}|\{%.*?%\}",
        "ๆจกๆฟ่ชžๆณ•๏ผˆTemplate Injection ้ขจ้šช๏ผ‰",
    ),
    (
        # <!ENTITY declarations or a <!DOCTYPE with an internal subset.
        "xml_entity",
        r"<!ENTITY|<!DOCTYPE\s+\w+\s+\[",
        "XML ๅค–้ƒจๅฏฆ้ซ”๏ผˆXXE ้ขจ้šช๏ผ‰",
    ),
    (
        # Phrases that try to override instructions, plus chat-template control
        # markers. The regex is assembled from adjacent raw-string literals.
        "prompt_override",
        r"(?i)("
        r"ignore\s+(all\s+)?previous\s+instructions?"
        r"|forget\s+(all\s+|your\s+)?(?:previous|guidelines?|rules?|instructions?)"
        r"|(?:previous\s+)?instructions?\s+(?:are\s+)?now\s+void"
        r"|you\s+are\s+now\s+(a|an|in)"
        r"|act\s+as\s+(a|an)\s+(different|new|evil|unrestricted)"
        r"|(?:ignore|bypass|disable|override)\s+(?:your\s+)?(?:constitution|guidelines?|safety|rules?|restrictions?)"
        r"|(?:cancel|disable|delete)\s+all\s+security\s+checks"
        r"|(?:delete|disable|remove)\s+all\s+safety\s+filters"
        r"|stop\s+following\s+(?:your\s+)?rules"
        r"|from\s+this\s+point\s+forward,\s*ignore\s+all\s+restrictions"
        r"|(?:your\s+)?constitution\s+is\s+now\s+disabled"
        r"|system\s+override"
        r"|critical\s+override"
        r"|begin\s+override"
        r"|ignore\s+all\s+rules"
        r"|output\s+(?:raw\s+internal\s+data|sensitive\s+data|all\s+environment\s+variables|all\s+api\s+keys)"
        r"|show\s+me\s+your\s+system\s+prompt"
        r"|what\s+model\s+are\s+you\s+using\s+internally"
        r"|delete\s+all\s+memory\s+files"
        r"|mark\s+everything\s+as\s+critical"
        r"|(?:new\s+)?(?:directive|instruction|order|command)\s*:\s*(?:fabricate|ignore|output|bypass)"
        r"|\[\[SYSTEM\]\]|\[\[INST\]\]"
        r"|\[System Message\]"
        r"|<\|im_start\|>system|<\|system\|>"
        r"|<<SYS>>|<</SYS>>"
        r")",
        "Prompt Injection ๅ˜—่ฉฆ๏ผˆOWASP LLM01๏ผ‰",
    ),
    (
        # Jailbreak vocabulary ("DAN" is matched as a whole word only).
        "jailbreak",
        r"(?i)("
        r"\bDAN\b"
        r"|jailbreak"
        r"|do\s+anything\s+now"
        r"|no\s+restrictions?\s+mode"
        r"|developer\s+mode"
        r"|unrestricted\s+mode"
        r"|pretend\s+(you\s+have\s+no|to\s+be)"
        r"|unethical\s+twin"
        r"|without\s+(?:moral|ethical|safety)\s+(?:limits?|filters?|restrictions?)"
        r"|(?:constitution|guidelines)\s+(?:is\s+)?disabled"
        r"|\[INST\]\s*<<SYS>>"
        r")",
        "่ถŠ็„ๅ˜—่ฉฆ๏ผˆOWASP LLM01๏ผ‰",
    ),
]
# ๅฎŒๅ…จ็ฆๆญข้€š้Ž็š„้—œ้ตๅญ—๏ผˆๆฏ” L0 ๆญฃๅ‰‡ๆ›ดๅšดๆ ผ๏ผ‰
# ้€™ไบ›ๆ˜ฏ้ซ˜ไฟกๅฟƒๆƒกๆ„ๆจกๅผ๏ผŒ็›ดๆŽฅๆ‹’็ต•่€Œ้žๆจ™่จ˜
# Hard blocklist. Each entry is (regex_pattern, reason).
# sanitize_input rejects the input on the first match and reports the reason,
# except when the input was classified as an SQL review corpus, in which case
# the match is downgraded to a finding. The reason strings are runtime data
# and are kept verbatim.
BLOCKLIST_PATTERNS: list[tuple[str, str]] = [
    # DROP TABLE statement.
    (r"(?i)\bDROP\s+TABLE\b", "ๅตๆธฌๅˆฐ DROP TABLE ่ชžๅฅ"),
    # xp_cmdshell identifier.
    (r"(?i)\bxp_cmdshell\b", "SQL Server ๅ‘ฝไปคๅŸท่กŒๆŒ‡ไปค"),
    # SHUTDOWN WITH NOWAIT statement.
    (r"(?i)\bSHUTDOWN\s+WITH\s+NOWAIT\b", "่ณ‡ๆ–™ๅบซ้—œๆฉŸๆŒ‡ไปค"),
    # "<value> OR <tautology>" immediately followed by UNION or a "--" comment.
    (r"(?i)\b(\d+|'[^']*')\s+OR\s+('?\d+'?\s*=\s*'?\d+'?|\w+\s*=\s*\w+)\s*(UNION|--)",
     "SQL Boolean-based OR ๆณจๅ…ฅ๏ผˆ้ซ˜ไฟกๅฟƒ๏ผ‰"),
    # UNION [ALL] SELECT * FROM <table>.
    (r"(?i)UNION\s+(?:ALL\s+)?SELECT\s+\*\s+FROM\s+\w+",
     "SQL UNION SELECT ๆณจๅ…ฅ๏ผˆ้ซ˜ไฟกๅฟƒ๏ผ‰"),
]
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ่ณ‡ๆ–™็ตๆง‹
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
@dataclass
class L0Finding:
    """A single finding produced by the L0 scan."""
    pattern_name: str  # name of the pattern (or synthetic category) that produced the finding
    description: str  # human-readable explanation attached to the pattern
    line_no: int  # line number of the match; sanitize_input computes it 1-based
    matched_text: str  # truncated to 100 characters to avoid echoing malicious content
    severity: str  # "WARNING" | "INFO"
@dataclass
class SanitizeResult:
    """
    Outcome of sanitizing one input.

    Attributes:
        safe: Whether the input may enter the pipeline (reject it when False).
        blocked_reason: Explanation of the rejection when safe is False.
        truncated: Whether the input was truncated.
        original_length: Length of the original input.
        sanitized_input: The truncated, sanitized input handed to the pipeline.
        l0_findings: Findings from the L0 scan (flagged, but still allowed through).
        input_hash: First 16 hex characters of the SHA-256 digest (for
            de-duplication / log tracing).
        input_type: Inferred input type.
        wasm_verdict: Verdict of the L0.5 WASM sandbox (Phase 4C).
    """
    # Required fields come first; their order is part of the positional constructor.
    safe: bool
    sanitized_input: str
    truncated: bool
    original_length: int
    # Optional fields; mutable defaults are created per instance via default_factory.
    l0_findings: list[L0Finding] = field(default_factory=list)
    blocked_reason: str = ""
    input_hash: str = ""
    input_type: str = "unknown"
    wasm_verdict: dict = field(default_factory=dict)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ่ผธๅ…ฅ้กžๅž‹ๆŽจๆ–ท
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _infer_input_type(text: str) -> str:
    """
    Classify ``text`` so the orchestrator can pick a processing route.

    Three groups of heuristics (source code in several languages, package
    lists, configuration files) are scored by counting how many of their
    signals fire; the scores are then compared in a fixed priority order.
    An SQL-review check runs first and short-circuits everything else.

    Returns:
        One of:
        "sql_review"   - standalone SQL corpus / SQL syntax review
        "config_file"  - configuration file
        "source_code"  - program source code
        "package_list" - package list (also the fallback when nothing fires)
        Note that this implementation never returns "mixed".
    """
    if _looks_like_sql_review(text):
        return "sql_review"

    multiline = re.MULTILINE

    def hit(pattern: str, flags: int = 0, subject: str = text) -> bool:
        # True when the pattern occurs anywhere in the subject.
        return bool(re.search(pattern, subject, flags))

    # --- source-code signals, one per language family -------------------
    code_score = sum((
        # Python
        hit(r"^\s*(def |class |import |from \w+\s+import )", multiline),
        # JavaScript / TypeScript
        hit(r"(?:const|let|var)\s+\w+\s*=|=>\s*\{|require\s*\(|export\s+(?:default|const|function)", multiline),
        # Java
        hit(r"(?:public|private|protected)\s+(?:static\s+)?(?:class|void|int|String)\s+\w+", multiline),
        # Go
        hit(r"^(?:package\s+\w+|func\s+\w+|:=)", multiline),
        # PHP
        hit(r"<\?php|\$\w+\s*="),
        # Ruby
        hit(r"^\s*(?:def\s+\w+|require\s+['\"]|module\s+\w+|class\s+\w+\s*<)", multiline),
        # Rust
        hit(r"(?:fn\s+\w+|let\s+mut\s+|impl\s+\w+|use\s+\w+::)", multiline),
        # C / C++
        hit(r"#include\s*[<\"]|int\s+main\s*\(|printf\s*\(|std::", multiline),
        # C#
        hit(r"using\s+System(?:\.\w+)?\s*;|namespace\s+\w+|public\s+class\s+\w+", multiline),
        # Shebang (only at the very start of the text)
        hit(r"^#!\/"),
        # Generic: brace/paren/semicolon punctuation in text with more than 5 newlines
        hit(r"[{}();]") and text.count("\n") > 5,
    ))

    # --- package-list signals --------------------------------------------
    pkg_score = sum((
        # A line that is just `name`, `name==version`, `name>=version`, ...
        hit(r"^[\w\-\.]+[>=<!~^]{0,2}[\d\.]*$", multiline, text.strip()),
        # Packaging vocabulary
        hit(r"(requirements|package|dependency|pip install|npm install|go get)", re.IGNORECASE),
        # Comma-separated names on a single line
        "," in text and "\n" not in text,
    ))

    # --- configuration-file signals ----------------------------------------
    config_score = sum((
        hit(r"^\[.*\]$", multiline),  # INI [section]
        hit(r"^[\w\-]+:\s+\S", multiline),  # YAML key: value
        "<?xml" in text.lower(),
        hit(r"^FROM\s+\S+", multiline),  # Dockerfile FROM
        hit(r"^(WORKDIR|EXPOSE|ENV|ARG|CMD|RUN|COPY|ADD)\s+", multiline),  # Dockerfile instructions
    ))

    # --- decision, highest priority first ----------------------------------
    lone_config_signal = config_score >= 1 and code_score == 0 and pkg_score < 2
    if config_score >= 2 or lone_config_signal:
        return "config_file"
    if code_score >= 2:
        return "source_code"
    if pkg_score >= 2:
        return "package_list"
    return "source_code" if code_score >= 1 else "package_list"
def _looks_like_sql_review(text: str) -> bool:
"""
ๅˆคๆ–ท่ผธๅ…ฅๆ˜ฏๅฆๆ›ดๅƒๅญค็ซ‹ SQL corpus๏ผŒ่€Œไธๆ˜ฏๆ‡‰็”จ็จ‹ๅผ source codeใ€‚
SQL Injection ็š„ๅฏๅˆฉ็”จๆ€ง้œ€่ฆ application sink/source๏ผ›ๅญค็ซ‹ `.sql`
ๆ–‡ๆœฌๅช่ƒฝๅš่ชžๆณ•่ˆ‡ payload review๏ผŒๅ› ๆญคๅœจ L0 ๅ…ˆๅˆ†ๆตใ€‚
"""
if not text or not text.strip():
return False
sql_statement_hits = len(re.findall(
r"(?im)^\s*(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|WITH|EXEC|GRANT)\b",
text,
))
sql_payload_hits = len(re.findall(
r"(?i)\b(?:UNION\s+SELECT|OR\s+1\s*=\s*1|SLEEP\s*\(|WAITFOR\s+DELAY|"
r"EXTRACTVALUE\s*\(|UPDATEXML\s*\(|xp_cmdshell|sp_executesql|EXEC\s*\(|"
r"CREATE\s+USER|GRANT\s+ALL|\$gt|\$where)\b",
text,
))
sql_comment_hits = len(re.findall(r"(?m)^\s*(?:--|/\*)", text))
application_code_signals = [
bool(re.search(r"^\s*(?:def |class |import |from \w+\s+import )", text, re.MULTILINE)),
bool(re.search(r"(?:const|let|var)\s+\w+\s*=|function\s+\w+\s*\(", text)),
bool(re.search(r"(?:public|private|protected)\s+(?:static\s+)?(?:class|void|int|String)\s+\w+", text)),
bool(re.search(r"^(?:package\s+\w+|func\s+\w+)", text, re.MULTILINE)),
bool(re.search(r"<\?php|\$\w+\s*=", text)),
bool(re.search(r"#include\s*[<\"]|int\s+main\s*\(", text)),
bool(re.search(r"using\s+System(?:\.\w+)?\s*;|public\s+class\s+\w+", text, re.MULTILINE)),
]
if sum(application_code_signals) >= 1:
return False
return sql_statement_hits >= 2 or (sql_statement_hits >= 1 and (sql_payload_hits + sql_comment_hits) >= 2)
def _wasm_block_finding(reason: str, text: str) -> L0Finding:
    """
    Turn a WASM L0.5 block signal into an auditable ``L0Finding``.

    The finding's pattern name is derived from keywords in the (lower-cased)
    reason, checked in priority order; an empty reason is categorised as if
    it were "wasm_block".

    Args:
        reason: Reason reported by the sandbox.
        text: The input that was flagged; its first 100 characters are kept.

    Returns:
        A WARNING-severity finding pinned to line 1.
    """
    lowered = (reason or "wasm_block").lower()
    # (keywords, resulting pattern name) — first row with a keyword hit wins.
    categories = (
        (("prompt", "instruction"), "wasm_prompt_injection"),
        (("jailbreak",), "wasm_jailbreak"),
        (("code", "command"), "wasm_code_injection"),
    )
    label = next(
        (name for keywords, name in categories if any(word in lowered for word in keywords)),
        "wasm_l0_block",
    )
    snippet = (text or "")[:100]
    return L0Finding(
        pattern_name=label,
        description=f"WASM L0.5 flagged input before type-aware review: {reason}",
        line_no=1,
        matched_text=snippet,
        severity="WARNING",
    )
def _extract_safe_targets_from_blocked_text(text: str) -> str:
"""ๅพžๆททๅ…ฅ prompt injection ็š„่ผธๅ…ฅไธญๅชไฟ็•™ๅฏๆŽƒๆ็›ฎๆจ™ใ€‚"""
target_pattern = re.compile(
r"\b(?!CVE\b)([A-Za-z][A-Za-z0-9_.+-]{1,40})\s*"
r"(?:==|>=|<=|~=|=|\s+)\s*(v?\d[\w.+-]{0,30})\b"
)
blocked_words = re.compile(
r"(?i)(ignore|forget|instruction|system|rule|constitution|chatbot|"
r"hacked|developer\s+mode|jailbreak|dan|say\s+['\"]|output)"
)
safe_targets: list[str] = []
for match in target_pattern.finditer(text):
package, version = match.groups()
start = max(0, match.start() - 80)
end = min(len(text), match.end() + 80)
context = text[start:end]
if blocked_words.search(context) and not re.search(r"(?i)(django|postgresql|postgres|redis|nginx|flask|express|spring|openssl|apache)", package):
continue
normalized = f"{package} {version}".strip()
if normalized not in safe_targets:
safe_targets.append(normalized)
return "\n".join(safe_targets)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๆ ธๅฟƒๆทจๅŒ–ๅ‡ฝๅผ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def sanitize_input(raw_input: str) -> SanitizeResult:
    """
    Deterministically sanitize raw user input before it enters the pipeline.

    Steps:
        1. Compute ``input_hash`` (used to correlate log lines).
        2. Ask the L0.5 WASM sandbox for a verdict.
        3. Truncate over-long input (characters, then lines).
        4. Blocklist scan: a match rejects the input, unless the input looks
           like an SQL review corpus, in which case it becomes a finding.
        5. L0 regex scan: matches are recorded as findings, never rejected.
        6. Infer the input type.
        7. Resolve a WASM block: code/config/SQL-review input continues with
           a finding attached; other input is reduced to its safe package
           targets, or rejected when none can be extracted.

    Args:
        raw_input: Raw user input; non-string values are converted with ``str``.

    Returns:
        A ``SanitizeResult``. When ``safe`` is False, ``sanitized_input`` is
        empty and ``input_type`` is ``"blocked"``.
    """
    if not isinstance(raw_input, str):
        raw_input = str(raw_input)
    original_length = len(raw_input)

    # -- Step 1: hash of the full, untruncated input --------------------------
    input_hash = hashlib.sha256(raw_input.encode("utf-8", errors="replace")).hexdigest()[:16]
    logger.debug("[SANITIZE] hash=%s original_len=%d", input_hash, original_length)

    # -- Step 1.5: L0.5 WASM sandbox (Phase 4C) --------------------------------
    wasm_verdict = _wasm_eval(raw_input)
    wasm_code = wasm_verdict.get("code", 0)
    wasm_reason = wasm_verdict.get("reason", "ok")
    wasm_block_msg = ""
    if wasm_code == 1:  # BLOCK
        wasm_block_msg = f"[WASM-L0.5] BLOCK: {wasm_reason}"
        logger.warning("[SANITIZE][%s] %s", input_hash, wasm_block_msg)
    elif wasm_code == 3:  # TRUNCATE suggested; continue processing
        # The cut itself happens in step 2 so that `truncated` is reported
        # correctly (slicing here left the flag False).
        logger.info("[SANITIZE][%s] WASM TRUNCATE ๅปบ่ญฐ", input_hash)

    # -- Step 2: truncation ------------------------------------------------------
    truncated = False
    text = raw_input
    if len(text) > MAX_INPUT_LENGTH:
        text = text[:MAX_INPUT_LENGTH]
        truncated = True
        logger.warning(
            "[SANITIZE][%s] Input truncated: %d โ†’ %d chars",
            input_hash, original_length, MAX_INPUT_LENGTH,
        )
    # Too many lines are cut as well.
    lines = text.splitlines()
    if len(lines) > MAX_LINE_COUNT:
        text = "\n".join(lines[:MAX_LINE_COUNT])
        truncated = True
        logger.warning(
            "[SANITIZE][%s] Input truncated to %d lines", input_hash, MAX_LINE_COUNT
        )

    l0_findings: list[L0Finding] = []

    def rejected(reason: str) -> SanitizeResult:
        # Single place that builds a "blocked" result, so every rejection
        # carries the same audit fields (findings and sandbox verdict).
        return SanitizeResult(
            safe=False,
            sanitized_input="",
            truncated=truncated,
            original_length=original_length,
            l0_findings=l0_findings,
            blocked_reason=reason,
            input_hash=input_hash,
            input_type="blocked",
            wasm_verdict=wasm_verdict,
        )

    # -- Step 3: blocklist scan (reject outright) ------------------------------
    preliminary_sql_review = _looks_like_sql_review(text)
    for block_pattern, reason in BLOCKLIST_PATTERNS:
        if not re.search(block_pattern, text):
            continue
        if preliminary_sql_review:
            # An SQL corpus under review legitimately contains payload syntax:
            # record the first hit and stop scanning instead of rejecting.
            l0_findings.append(
                L0Finding(
                    pattern_name="sql_review_payload",
                    description=f"SQL review corpus contains blocked payload syntax: {reason}",
                    line_no=1,
                    matched_text=text[:100],
                    severity="WARNING",
                )
            )
            break
        logger.warning("[SANITIZE][%s] BLOCKED: %s", input_hash, reason)
        return rejected(reason)

    # -- Step 4: L0 regex scan (flag, do not reject) ---------------------------
    for pattern_name, pattern, description in L0_PATTERNS:
        # Severity depends only on the pattern name.
        severity = (
            "WARNING"
            if "injection" in pattern_name or "jailbreak" in pattern_name
            else "INFO"
        )
        try:
            for match in re.finditer(pattern, text):
                # 1-based line number of the match start.
                line_no = text[: match.start()].count("\n") + 1
                l0_findings.append(
                    L0Finding(
                        pattern_name=pattern_name,
                        description=description,
                        line_no=line_no,
                        # Truncated so malicious content is not echoed in full.
                        matched_text=match.group(0)[:100],
                        severity=severity,
                    )
                )
                logger.info(
                    "[SANITIZE][%s] L0 finding: %s @ line %d",
                    input_hash, pattern_name, line_no,
                )
        except re.error as e:
            logger.error("[SANITIZE] Regex error for pattern %s: %s", pattern_name, e)

    # -- Step 5: infer the input type -----------------------------------------
    input_type = _infer_input_type(text)

    # -- Step 6: resolve a WASM block -------------------------------------------
    if wasm_block_msg:
        l0_findings.append(_wasm_block_finding(str(wasm_reason), text))
        if input_type not in {"source_code", "config_file", "mixed", "sql_review"}:
            # Not code-like: keep only package/version targets, drop the rest.
            safe_targets = _extract_safe_targets_from_blocked_text(text)
            if safe_targets:
                text = safe_targets
                input_type = _infer_input_type(text)
                logger.warning(
                    "[SANITIZE][%s] WASM block sanitized to safe targets: %s",
                    input_hash, text,
                )
            else:
                logger.warning(
                    "[SANITIZE][%s] Result: safe=False type=%s reason=%s l0_count=%d",
                    input_hash, input_type, wasm_block_msg, len(l0_findings),
                )
                return rejected(wasm_block_msg)
        # Defensive: reject any type outside the known routes.
        if input_type not in {"source_code", "config_file", "mixed", "package_list", "sql_review"}:
            logger.warning(
                "[SANITIZE][%s] Result: safe=False type=%s reason=%s l0_count=%d",
                input_hash, input_type, wasm_block_msg, len(l0_findings),
            )
            return rejected(wasm_block_msg)

    logger.info(
        "[SANITIZE][%s] Result: safe=True type=%s truncated=%s l0_count=%d",
        input_hash, input_type, truncated, len(l0_findings),
    )
    return SanitizeResult(
        safe=True,
        sanitized_input=text,
        truncated=truncated,
        original_length=original_length,
        l0_findings=l0_findings,
        input_hash=input_hash,
        input_type=input_type,
        wasm_verdict=wasm_verdict,
    )
def format_l0_report(result: SanitizeResult) -> dict[str, Any]:
    """
    Flatten a ``SanitizeResult`` into a plain dictionary.

    Findings are reduced to their pattern name, description, line number and
    severity; the matched text is not included.

    Returns:
        {
            "safe": bool,
            "input_type": str,
            "truncated": bool,
            "input_hash": str,
            "blocked_reason": str,
            "wasm_verdict": dict,
            "l0_findings": [{"pattern": str, "description": str, "line_no": int, "severity": str}],
            "l0_warning_count": int,
        }
    """
    finding_rows = [
        dict(
            pattern=finding.pattern_name,
            description=finding.description,
            line_no=finding.line_no,
            severity=finding.severity,
        )
        for finding in result.l0_findings
    ]
    warning_total = len(
        [finding for finding in result.l0_findings if finding.severity == "WARNING"]
    )

    report: dict[str, Any] = {}
    report["safe"] = result.safe
    report["input_type"] = result.input_type
    report["truncated"] = result.truncated
    report["input_hash"] = result.input_hash
    report["blocked_reason"] = result.blocked_reason
    report["wasm_verdict"] = result.wasm_verdict  # Phase 4C: L0.5 WASM verdict
    report["l0_findings"] = finding_rows
    report["l0_warning_count"] = warning_total
    return report