| """ |
input_sanitizer.py - L0 deterministic input sanitizer
==========================================
Position in the architecture: the very front of the pipeline (runs before CrewAI starts).

Layered defense architecture (Phase 4C update):
  L0.5  WASM Sandbox      - wasmtime sandbox filtering (prompt injection / Unicode / over-length checks)
  L0    Python regex scan - SQL / OS / template injection flagging
  L1    Blocklist         - high-confidence malicious patterns (rejected outright)

Basis:
  - FINAL_PLAN.md section 3a: [input_sanitizer.py] - deterministic infrastructure (OWASP LLM01:2025)
  - OWASP LLM01:2025 Prompt Injection - untrusted input must be pre-filtered before it reaches the LLM

Design principles:
  - Purely deterministic computation (no LLM, no external API)
  - Division of labor with security_guard.py:
      input_sanitizer -> gatekeeping (before the pipeline): truncation + L0 regex scan + banned-keyword filtering
      security_guard  -> extraction (inside CrewAI): AST/regex extraction of code structure, no verdicts

Layering rule: application layer; must not import the harness/entropy layers.
| """ |
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import logging |
| import os |
| import re |
| from dataclasses import dataclass, field |
| from typing import Any |
|
|
logger = logging.getLogger("threathunter.input_sanitizer")

# Phase 4C feature switch: the L0.5 sandbox is on unless the environment
# variable is explicitly set to one of "false" / "0" / "no" (case-insensitive).
_WASM_ENABLED = os.getenv("WASM_SANDBOX_ENABLED", "true").lower() not in ("false", "0", "no")

# Start from the "sandbox absent" state; only a successful import flips it.
# NOTE(review): the non-ASCII text in the log messages below appears
# mis-encoded in this copy of the file; it is reproduced as found.
_wasm_mod = None
_WASM_AVAILABLE = False

if not _WASM_ENABLED:
    logger.info("[InputSanitizer] WASM_SANDBOX_ENABLED=false, ่ทณ้ L0.5 ๅฑค")
else:
    try:
        import threathunter_prompt_sandbox as _wasm_mod
        _WASM_AVAILABLE = True
        logger.info("[InputSanitizer] Phase 4C: WASM Sandbox ๅ็จ (v%s)", _wasm_mod.sandbox_version())
    except ImportError:
        # Optional module is missing: fall back to the pure-Python L0 scan.
        _wasm_mod = None
        _WASM_AVAILABLE = False
        logger.warning(
            "[InputSanitizer] threathunter_prompt_sandbox ไธๅฏ็จ๏ผๆช็ทจ่ญฏ๏ผ๏ผ"
            "้็ด็บ็ด Python L0 ้ๆฟพ"
        )
|
|
|
|
def _wasm_eval(text: str) -> dict[str, Any]:
    """
    Ask the L0.5 WASM sandbox to evaluate ``text``.

    Args:
        text: Raw input to evaluate.

    Returns:
        The sandbox verdict decoded from JSON, expected to look like
        ``{"code": int, "verdict": str, "reason": str, "engine": str}``.
        When the sandbox is unavailable, raises, or returns JSON that is not
        an object, an ``ALLOW`` verdict with ``code == 0`` is returned instead
        (``reason`` is ``"wasm_unavailable"`` or starts with ``"wasm_error:"``).
    """
    if not _WASM_AVAILABLE or _wasm_mod is None:
        return {"code": 0, "verdict": "ALLOW", "reason": "wasm_unavailable", "engine": "none"}

    try:
        result = json.loads(_wasm_mod.sandbox_eval(text))
    except Exception as exc:
        # Deliberate best-effort: a sandbox failure degrades to the pure-Python
        # layers rather than crashing the pipeline.
        # NOTE(review): non-ASCII log text appears mis-encoded in this copy; kept as found.
        logger.warning("[InputSanitizer] WASM eval ็ฐๅธธ: %s", exc)
        return {"code": 0, "verdict": "ALLOW", "reason": f"wasm_error:{exc}", "engine": "none"}

    if not isinstance(result, dict):
        # Valid JSON but not an object (list, string, null, ...): callers use
        # .get() on the verdict, so treat this like any other sandbox error.
        logger.warning(
            "[InputSanitizer] WASM eval returned non-object JSON (%s)",
            type(result).__name__,
        )
        return {
            "code": 0,
            "verdict": "ALLOW",
            "reason": f"wasm_error:unexpected_result_type:{type(result).__name__}",
            "engine": "none",
        }

    return result
|
|
|
|
| |
| |
| |
|
|
# Hard limits applied by sanitize_input before any scanning.
MAX_INPUT_LENGTH = 50_000  # maximum number of characters kept
MAX_LINE_COUNT = 2_000     # maximum number of lines kept


# L0 regex rules as (pattern_name, regex, description) tuples.
# sanitize_input records every match as an L0Finding; a match here does not
# reject the input by itself.
# NOTE(review): the non-ASCII description strings appear mis-encoded in this
# copy of the file; they are reproduced as found.
L0_PATTERNS: list[tuple[str, str, str]] = [
    # SQL keyword followed (within ~100 chars) by a quote or semicolon.
    (
        "sql_injection",
        r"(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|CREATE|ALTER)\s+.{0,100}?\s*['\";]",
        "SQL ่ชๅฅ็ไผผๆผๆฅ๏ผSQL Injection ้ขจ้ช๏ผ",
    ),
    # Process-spawning / dynamic-evaluation calls.
    (
        "os_command",
        r"(?i)(os\.system|subprocess\.call|subprocess\.run|popen|exec\(|eval\()\s*[\(\['\"]",
        "ๅฑ้ช็ณป็ตฑๅผๅซ๏ผOS Command Injection ้ขจ้ช๏ผ",
    ),
    # Credential-like name assigned a quoted literal of 4+ characters.
    (
        "hardcoded_secret",
        r"(?i)(password|passwd|pwd|secret|api_key|apikey|token|auth)\s*=\s*['\"][^'\"]{4,}['\"]",
        "็กฌ็ทจ็ขผๆ่ญ๏ผCredential Exposure ้ขจ้ช๏ผ",
    ),
    # Two or more dots followed by a path separator.
    (
        "path_traversal",
        r"\.{2,}/|\.{2,}\\",
        "่ทฏๅพ็ฉฟ่ถๅ่ฉฆ๏ผPath Traversal ้ขจ้ช๏ผ",
    ),
    # {{ ... }} or {% ... %} template delimiters.
    (
        "template_injection",
        r"\{\{.{0,100}?\}\}|\{%.*?%\}",
        "ๆจกๆฟ่ชๆณ๏ผTemplate Injection ้ขจ้ช๏ผ",
    ),
    # XML entity declarations / DOCTYPE with an internal subset.
    (
        "xml_entity",
        r"<!ENTITY|<!DOCTYPE\s+\w+\s+\[",
        "XML ๅค้จๅฏฆ้ซ๏ผXXE ้ขจ้ช๏ผ",
    ),
    # Phrases and chat-template markers that try to override instructions.
    (
        "prompt_override",
        r"(?i)("
        r"ignore\s+(all\s+)?previous\s+instructions?"
        r"|forget\s+(all\s+|your\s+)?(?:previous|guidelines?|rules?|instructions?)"
        r"|(?:previous\s+)?instructions?\s+(?:are\s+)?now\s+void"
        r"|you\s+are\s+now\s+(a|an|in)"
        r"|act\s+as\s+(a|an)\s+(different|new|evil|unrestricted)"
        r"|(?:ignore|bypass|disable|override)\s+(?:your\s+)?(?:constitution|guidelines?|safety|rules?|restrictions?)"
        r"|(?:cancel|disable|delete)\s+all\s+security\s+checks"
        r"|(?:delete|disable|remove)\s+all\s+safety\s+filters"
        r"|stop\s+following\s+(?:your\s+)?rules"
        r"|from\s+this\s+point\s+forward,\s*ignore\s+all\s+restrictions"
        r"|(?:your\s+)?constitution\s+is\s+now\s+disabled"
        r"|system\s+override"
        r"|critical\s+override"
        r"|begin\s+override"
        r"|ignore\s+all\s+rules"
        r"|output\s+(?:raw\s+internal\s+data|sensitive\s+data|all\s+environment\s+variables|all\s+api\s+keys)"
        r"|show\s+me\s+your\s+system\s+prompt"
        r"|what\s+model\s+are\s+you\s+using\s+internally"
        r"|delete\s+all\s+memory\s+files"
        r"|mark\s+everything\s+as\s+critical"
        r"|(?:new\s+)?(?:directive|instruction|order|command)\s*:\s*(?:fabricate|ignore|output|bypass)"
        r"|\[\[SYSTEM\]\]|\[\[INST\]\]"
        r"|\[System Message\]"
        r"|<\|im_start\|>system|<\|system\|>"
        r"|<<SYS>>|<</SYS>>"
        r")",
        "Prompt Injection ๅ่ฉฆ๏ผOWASP LLM01๏ผ",
    ),
    # Well-known jailbreak vocabulary.
    (
        "jailbreak",
        r"(?i)("
        r"\bDAN\b"
        r"|jailbreak"
        r"|do\s+anything\s+now"
        r"|no\s+restrictions?\s+mode"
        r"|developer\s+mode"
        r"|unrestricted\s+mode"
        r"|pretend\s+(you\s+have\s+no|to\s+be)"
        r"|unethical\s+twin"
        r"|without\s+(?:moral|ethical|safety)\s+(?:limits?|filters?|restrictions?)"
        r"|(?:constitution|guidelines)\s+(?:is\s+)?disabled"
        r"|\[INST\]\s*<<SYS>>"
        r")",
        "่ถ็ๅ่ฉฆ๏ผOWASP LLM01๏ผ",
    ),
]
|
|
| |
| |
# High-confidence malicious patterns as (regex, reason) tuples.
# sanitize_input rejects the input on the first match, unless the input looks
# like a standalone SQL review corpus, in which case the hit is recorded as a
# finding instead.
# NOTE(review): the non-ASCII reason strings appear mis-encoded in this copy of
# the file. Two of them contain a character that this copy renders as a line
# break; it is written here as the escape \x85 so the literal stays on one
# line. Restore the text from the canonical UTF-8 source.
BLOCKLIST_PATTERNS: list[tuple[str, str]] = [
    (r"(?i)\bDROP\s+TABLE\b", "ๅตๆธฌๅฐ DROP TABLE ่ชๅฅ"),
    (r"(?i)\bxp_cmdshell\b", "SQL Server ๅฝไปคๅท่กๆไปค"),
    (r"(?i)\bSHUTDOWN\s+WITH\s+NOWAIT\b", "่ณๆๅบซ้ๆฉๆไปค"),
    # Boolean-based "<value> OR <tautology>" followed by UNION or a comment.
    (r"(?i)\b(\d+|'[^']*')\s+OR\s+('?\d+'?\s*=\s*'?\d+'?|\w+\s*=\s*\w+)\s*(UNION|--)",
     "SQL Boolean-based OR ๆณจๅ\x85ฅ๏ผ้ซไฟกๅฟ๏ผ"),
    # UNION [ALL] SELECT * FROM <table>.
    (r"(?i)UNION\s+(?:ALL\s+)?SELECT\s+\*\s+FROM\s+\w+",
     "SQL UNION SELECT ๆณจๅ\x85ฅ๏ผ้ซไฟกๅฟ๏ผ"),
]
|
|
|
|
| |
| |
| |
|
|
@dataclass
class L0Finding:
    """A single finding produced by the L0 scan."""
    pattern_name: str  # name of the rule that fired (e.g. an L0_PATTERNS name)
    description: str   # human-readable explanation of the rule
    line_no: int       # 1-based line number reported for the match
    matched_text: str  # matched snippet; producers in this file cap it at 100 chars
    severity: str      # producers in this file use "WARNING" or "INFO"
|
|
|
|
@dataclass
class SanitizeResult:
    """
    Outcome of sanitizing one input.

    Attributes:
        safe:            Whether the input may enter the pipeline (False means rejected).
        blocked_reason:  When safe is False, the reason for the rejection.
        truncated:       Whether the input was truncated.
        original_length: Length of the original input.
        sanitized_input: The (possibly truncated) sanitized input for the pipeline;
                         empty when the input is rejected.
        l0_findings:     Findings from the L0 regex scan; findings flag the input
                         but do not by themselves reject it.
        input_hash:      First 16 hex characters of the SHA-256 of the input
                         (for de-duplication / log tracing).
        input_type:      Inferred input type, or "blocked" for rejected input.
        wasm_verdict:    Verdict returned by the L0.5 WASM sandbox (Phase 4C).
    """
    safe: bool
    sanitized_input: str
    truncated: bool
    original_length: int
    l0_findings: list[L0Finding] = field(default_factory=list)
    blocked_reason: str = ""
    input_hash: str = ""
    input_type: str = "unknown"
    wasm_verdict: dict = field(default_factory=dict)
|
|
|
|
| |
| |
| |
|
|
def _infer_input_type(text: str) -> str:
    """
    Guess what kind of input ``text`` is, as a routing hint for the orchestrator.

    Three groups of heuristic signals (code, package list, config file) are
    counted and compared.

    Returns:
        "sql_review"   - standalone SQL corpus (checked first)
        "config_file"  - configuration file
        "source_code"  - program source
        "package_list" - package list; also the fallback when nothing matches
    """
    if _looks_like_sql_review(text):
        return "sql_review"

    def seen(pattern: str, flags: int = 0, subject: str = text) -> bool:
        """Return True when ``pattern`` occurs anywhere in ``subject``."""
        return re.search(pattern, subject, flags) is not None

    ml = re.MULTILINE

    # One signal per language family, plus a generic punctuation heuristic.
    code_score = sum((
        # Python
        seen(r"^\s*(def |class |import |from \w+\s+import )", ml),
        # JavaScript / TypeScript
        seen(r"(?:const|let|var)\s+\w+\s*=|=>\s*\{|require\s*\(|export\s+(?:default|const|function)", ml),
        # Java-style member declarations
        seen(r"(?:public|private|protected)\s+(?:static\s+)?(?:class|void|int|String)\s+\w+", ml),
        # Go
        seen(r"^(?:package\s+\w+|func\s+\w+|:=)", ml),
        # PHP
        seen(r"<\?php|\$\w+\s*="),
        # Ruby
        seen(r"^\s*(?:def\s+\w+|require\s+['\"]|module\s+\w+|class\s+\w+\s*<)", ml),
        # Rust
        seen(r"(?:fn\s+\w+|let\s+mut\s+|impl\s+\w+|use\s+\w+::)", ml),
        # C / C++
        seen(r"#include\s*[<\"]|int\s+main\s*\(|printf\s*\(|std::", ml),
        # C#-style
        seen(r"using\s+System(?:\.\w+)?\s*;|namespace\s+\w+|public\s+class\s+\w+", ml),
        # Shebang on the very first line
        seen(r"^#!\/"),
        # Generic: code punctuation in a text with more than five newlines
        seen(r"[{}();]") and text.count("\n") > 5,
    ))

    pkg_score = sum((
        # A line that is just "name[op][version]" (leading/trailing blanks ignored)
        seen(r"^[\w\-\.]+[>=<!~^]{0,2}[\d\.]*$", ml, text.strip()),
        # Packaging vocabulary
        seen(r"(requirements|package|dependency|pip install|npm install|go get)", re.IGNORECASE),
        # Single-line comma-separated list
        "," in text and "\n" not in text,
    ))

    config_score = sum((
        # INI-style [section] line
        seen(r"^\[.*\]$", ml),
        # "key: value" line
        seen(r"^[\w\-]+:\s+\S", ml),
        # XML prolog
        "<?xml" in text.lower(),
        # Dockerfile FROM / other Dockerfile instructions
        seen(r"^FROM\s+\S+", ml),
        seen(r"^(WORKDIR|EXPOSE|ENV|ARG|CMD|RUN|COPY|ADD)\s+", ml),
    ))

    if config_score >= 2:
        return "config_file"
    if config_score == 1 and code_score == 0 and pkg_score < 2:
        return "config_file"
    # Two code signals always win; a single one wins only when the package
    # evidence is weak (fewer than two signals).
    if code_score >= 2 or (code_score == 1 and pkg_score < 2):
        return "source_code"
    return "package_list"
|
|
|
|
def _looks_like_sql_review(text: str) -> bool:
    """
    Decide whether ``text`` looks like a standalone SQL corpus rather than
    application source code.

    Exploiting SQL injection needs an application source/sink; an isolated
    ``.sql`` text can only be reviewed for syntax and payloads, so it is routed
    separately at L0.

    Returns:
        False for empty/blank input or when any application-code signal is
        present; otherwise True when at least two lines start with a SQL
        statement keyword, or one does and there are at least two payload or
        comment hits.
    """
    if not text or not text.strip():
        return False

    # Any hint of a host language means this is application code, not a corpus.
    app_code_probes = (
        (r"^\s*(?:def |class |import |from \w+\s+import )", re.MULTILINE),
        (r"(?:const|let|var)\s+\w+\s*=|function\s+\w+\s*\(", 0),
        (r"(?:public|private|protected)\s+(?:static\s+)?(?:class|void|int|String)\s+\w+", 0),
        (r"^(?:package\s+\w+|func\s+\w+)", re.MULTILINE),
        (r"<\?php|\$\w+\s*=", 0),
        (r"#include\s*[<\"]|int\s+main\s*\(", 0),
        (r"using\s+System(?:\.\w+)?\s*;|public\s+class\s+\w+", re.MULTILINE),
    )
    if any(re.search(probe, text, flags) for probe, flags in app_code_probes):
        return False

    def count(pattern: str) -> int:
        """Number of non-overlapping matches of ``pattern`` in ``text``."""
        return sum(1 for _ in re.finditer(pattern, text))

    statements = count(
        r"(?im)^\s*(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|WITH|EXEC|GRANT)\b"
    )
    if statements >= 2:
        return True
    if statements < 1:
        return False

    payloads = count(
        r"(?i)\b(?:UNION\s+SELECT|OR\s+1\s*=\s*1|SLEEP\s*\(|WAITFOR\s+DELAY|"
        r"EXTRACTVALUE\s*\(|UPDATEXML\s*\(|xp_cmdshell|sp_executesql|EXEC\s*\(|"
        r"CREATE\s+USER|GRANT\s+ALL|\$gt|\$where)\b"
    )
    comments = count(r"(?m)^\s*(?:--|/\*)")
    return payloads + comments >= 2
|
|
|
|
def _wasm_block_finding(reason: str, text: str) -> L0Finding:
    """
    Turn a WASM L0.5 block verdict into an auditable ``L0Finding``.

    The finding name is derived from keywords in ``reason`` (first rule that
    matches wins); the finding always has severity "WARNING", line 1, and the
    first 100 characters of ``text`` as its snippet.
    """
    lowered = (reason or "wasm_block").lower()

    # (keywords, finding name) in priority order.
    keyword_rules = (
        (("prompt", "instruction"), "wasm_prompt_injection"),
        (("jailbreak",), "wasm_jailbreak"),
        (("code", "command"), "wasm_code_injection"),
    )
    finding_name = "wasm_l0_block"
    for keywords, candidate in keyword_rules:
        if any(keyword in lowered for keyword in keywords):
            finding_name = candidate
            break

    snippet = (text or "")[:100]
    return L0Finding(
        pattern_name=finding_name,
        description=f"WASM L0.5 flagged input before type-aware review: {reason}",
        line_no=1,
        matched_text=snippet,
        severity="WARNING",
    )
|
|
|
|
def _extract_safe_targets_from_blocked_text(text: str) -> str:
    """
    Keep only scannable "package version" targets from input that was mixed
    with prompt-injection text.

    Each ``name <op> version`` pair found in ``text`` is kept unless suspicious
    wording appears within 80 characters on either side of it; pairs whose
    package name contains a well-known product name are kept regardless.

    Args:
        text: The flagged input.

    Returns:
        The surviving targets, normalized to ``"package version"``, de-duplicated
        in first-seen order and joined with newlines ("" when none survive).
    """
    target_pattern = re.compile(
        r"\b(?!CVE\b)([A-Za-z][A-Za-z0-9_.+-]{1,40})\s*"
        r"(?:==|>=|<=|~=|=|\s+)\s*(v?\d[\w.+-]{0,30})\b"
    )
    # "dan" is anchored on word boundaries (as in the L0 jailbreak rule) so that
    # ordinary names containing it, e.g. "pandas", are not treated as suspicious.
    blocked_words = re.compile(
        r"(?i)(ignore|forget|instruction|system|rule|constitution|chatbot|"
        r"hacked|developer\s+mode|jailbreak|\bdan\b|say\s+['\"]|output)"
    )
    well_known_packages = re.compile(
        r"(?i)(django|postgresql|postgres|redis|nginx|flask|express|spring|openssl|apache)"
    )

    # dict keys give ordered de-duplication without a linear membership scan.
    safe_targets: dict[str, None] = {}
    for match in target_pattern.finditer(text):
        package, version = match.groups()
        context = text[max(0, match.start() - 80):match.end() + 80]
        if blocked_words.search(context) and not well_known_packages.search(package):
            continue
        safe_targets.setdefault(f"{package} {version}".strip(), None)
    return "\n".join(safe_targets)
|
|
|
|
| |
| |
| |
|
|
def sanitize_input(raw_input: str) -> SanitizeResult:
    """
    Deterministically sanitize raw user input.

    Steps:
        1. Compute ``input_hash`` (used to correlate log lines).
        2. Evaluate the input with the L0.5 WASM sandbox.
        3. Truncate over-long input (characters, then lines).
        4. Blocklist scan: a high-confidence hit rejects the input, unless the
           input looks like a standalone SQL review corpus, in which case the
           hit is recorded as a finding.
        5. L0 regex scan: matches are recorded as findings; input still passes.
        6. Infer the input type.
        7. If the sandbox asked for a block, keep code/config/SQL input for
           type-aware review; otherwise reduce the text to extractable
           "package version" targets, or reject when none remain.

    Args:
        raw_input: Raw user input; non-string values are converted with ``str``.

    Returns:
        A ``SanitizeResult``. Rejected results have ``safe=False``, an empty
        ``sanitized_input`` and ``input_type="blocked"``.
    """
    if not isinstance(raw_input, str):
        raw_input = str(raw_input)

    original_length = len(raw_input)
    input_hash = hashlib.sha256(raw_input.encode("utf-8", errors="replace")).hexdigest()[:16]
    logger.debug("[SANITIZE] hash=%s original_len=%d", input_hash, original_length)

    # --- L0.5: WASM sandbox verdict -------------------------------------
    wasm_verdict = _wasm_eval(raw_input)
    wasm_code = wasm_verdict.get("code", 0)
    wasm_reason = wasm_verdict.get("reason", "ok")
    wasm_block_msg = ""

    if wasm_code == 1:
        wasm_block_msg = f"[WASM-L0.5] BLOCK: {wasm_reason}"
        logger.warning("[SANITIZE][%s] %s", input_hash, wasm_block_msg)
    elif wasm_code == 3:
        # The cut itself happens in the length check below, so that
        # ``truncated`` is reported correctly.
        # NOTE(review): non-ASCII log text appears mis-encoded in this copy; kept as found.
        logger.info("[SANITIZE][%s] WASM TRUNCATE ๅปบ่ญฐ", input_hash)

    # --- Truncation -----------------------------------------------------
    truncated = False
    text = raw_input

    if len(text) > MAX_INPUT_LENGTH:
        text = text[:MAX_INPUT_LENGTH]
        truncated = True
        logger.warning(
            "[SANITIZE][%s] Input truncated: %d โ %d chars",
            input_hash, original_length, MAX_INPUT_LENGTH,
        )

    lines = text.splitlines()
    if len(lines) > MAX_LINE_COUNT:
        text = "\n".join(lines[:MAX_LINE_COUNT])
        truncated = True
        logger.warning(
            "[SANITIZE][%s] Input truncated to %d lines", input_hash, MAX_LINE_COUNT
        )

    l0_findings: list[L0Finding] = []

    def rejected(reason: str):
        """Build the rejection result shared by every blocking path."""
        return SanitizeResult(
            safe=False,
            sanitized_input="",
            truncated=truncated,
            original_length=original_length,
            l0_findings=l0_findings,
            blocked_reason=reason,
            input_hash=input_hash,
            input_type="blocked",
            wasm_verdict=wasm_verdict,
        )

    # --- L1: blocklist ---------------------------------------------------
    preliminary_sql_review = _looks_like_sql_review(text)

    for block_pattern, reason in BLOCKLIST_PATTERNS:
        if not re.search(block_pattern, text):
            continue
        if preliminary_sql_review:
            # A SQL corpus is expected to contain payload syntax: record the
            # first hit and let the input through for review.
            l0_findings.append(
                L0Finding(
                    pattern_name="sql_review_payload",
                    description=f"SQL review corpus contains blocked payload syntax: {reason}",
                    line_no=1,
                    matched_text=text[:100],
                    severity="WARNING",
                )
            )
            break
        logger.warning("[SANITIZE][%s] BLOCKED: %s", input_hash, reason)
        return rejected(reason)

    # --- L0: regex scan (flag only) --------------------------------------
    for pattern_name, pattern, description in L0_PATTERNS:
        # Severity is derived from the rule name.
        # NOTE(review): rules whose names contain neither substring (for
        # example "prompt_override", "os_command") are reported as INFO;
        # confirm this is intended.
        severity = (
            "WARNING"
            if "injection" in pattern_name or "jailbreak" in pattern_name
            else "INFO"
        )
        try:
            for match in re.finditer(pattern, text):
                line_no = text[: match.start()].count("\n") + 1
                l0_findings.append(
                    L0Finding(
                        pattern_name=pattern_name,
                        description=description,
                        line_no=line_no,
                        matched_text=match.group(0)[:100],
                        severity=severity,
                    )
                )
                logger.info(
                    "[SANITIZE][%s] L0 finding: %s @ line %d",
                    input_hash, pattern_name, line_no,
                )
        except re.error as e:
            logger.error("[SANITIZE] Regex error for pattern %s: %s", pattern_name, e)

    # --- Type inference --------------------------------------------------
    input_type = _infer_input_type(text)

    # --- Apply the WASM block, type-aware --------------------------------
    if wasm_block_msg:
        l0_findings.append(_wasm_block_finding(str(wasm_reason), text))

        if input_type not in {"source_code", "config_file", "mixed", "sql_review"}:
            safe_targets = _extract_safe_targets_from_blocked_text(text)
            if not safe_targets:
                logger.warning(
                    "[SANITIZE][%s] Result: safe=False type=%s reason=%s l0_count=%d",
                    input_hash, input_type, wasm_block_msg, len(l0_findings),
                )
                return rejected(wasm_block_msg)
            # Only the extracted targets continue into the pipeline.
            text = safe_targets
            input_type = _infer_input_type(text)
            logger.warning(
                "[SANITIZE][%s] WASM block sanitized to safe targets: %s",
                input_hash, text,
            )

        if input_type not in {"source_code", "config_file", "mixed", "package_list", "sql_review"}:
            logger.warning(
                "[SANITIZE][%s] Result: safe=False type=%s reason=%s l0_count=%d",
                input_hash, input_type, wasm_block_msg, len(l0_findings),
            )
            return rejected(wasm_block_msg)

    logger.info(
        "[SANITIZE][%s] Result: safe=True type=%s truncated=%s l0_count=%d",
        input_hash, input_type, truncated, len(l0_findings),
    )

    return SanitizeResult(
        safe=True,
        sanitized_input=text,
        truncated=truncated,
        original_length=original_length,
        l0_findings=l0_findings,
        input_hash=input_hash,
        input_type=input_type,
        wasm_verdict=wasm_verdict,
    )
|
|
|
|
def format_l0_report(result: SanitizeResult) -> dict[str, Any]:
    """
    Convert a ``SanitizeResult`` into a plain dictionary for the pipeline
    (for example, to feed the orchestrator's routing decision).

    Returns:
        {
            "safe": bool,
            "input_type": str,
            "truncated": bool,
            "input_hash": str,
            "blocked_reason": str,
            "wasm_verdict": dict,
            "l0_findings": [{"pattern": str, "description": str, "line_no": int, "severity": str}],
            "l0_warning_count": int,   # findings whose severity is "WARNING"
        }
    """
    findings_payload = []
    warning_total = 0
    for finding in result.l0_findings:
        findings_payload.append(
            {
                "pattern": finding.pattern_name,
                "description": finding.description,
                "line_no": finding.line_no,
                "severity": finding.severity,
            }
        )
        if finding.severity == "WARNING":
            warning_total += 1

    report: dict[str, Any] = {
        "safe": result.safe,
        "input_type": result.input_type,
        "truncated": result.truncated,
        "input_hash": result.input_hash,
        "blocked_reason": result.blocked_reason,
        "wasm_verdict": result.wasm_verdict,
        "l0_findings": findings_payload,
        "l0_warning_count": warning_total,
    }
    return report
|
|