"""SQL 語法風險審查器。 此工具只審查孤立 SQL corpus 裡的危險語法與測試 payload。 它不宣稱已驗證應用程式漏洞,因為缺少 application source/sink context。 """ from __future__ import annotations import re from dataclasses import dataclass from typing import Any @dataclass(frozen=True) class SqlPatternSpec: """SQL review pattern 的固定規格。""" syntax_pattern: str category: str regex: re.Pattern[str] risk_note: str SQL_REVIEW_PATTERNS: tuple[SqlPatternSpec, ...] = ( SqlPatternSpec( syntax_pattern="boolean_blind", category="sql_injection_payload", regex=re.compile(r"(?i)(?:\bOR\b|\bAND\b)\s+(?:1\s*=\s*1|'1'\s*=\s*'1')"), risk_note="Boolean-based SQL injection payload. Requires application input-flow context.", ), SqlPatternSpec( syntax_pattern="union_select", category="sql_injection_payload", regex=re.compile(r"(?i)\bUNION\s+(?:ALL\s+)?SELECT\b"), risk_note="UNION-based SQL injection payload. Requires application input-flow context.", ), SqlPatternSpec( syntax_pattern="time_based", category="sql_injection_payload", regex=re.compile(r"(?i)\b(?:SLEEP\s*\(|BENCHMARK\s*\(|WAITFOR\s+DELAY|pg_sleep\s*\()"), risk_note="Time-based SQL injection payload. Requires runtime and database context.", ), SqlPatternSpec( syntax_pattern="error_based", category="sql_injection_payload", regex=re.compile(r"(?i)\b(?:EXTRACTVALUE\s*\(|UPDATEXML\s*\(|CONVERT\s*\(|CAST\s*\()"), risk_note="Error-based SQL injection payload. Requires database error exposure context.", ), SqlPatternSpec( syntax_pattern="stacked_query", category="dangerous_sql_syntax", regex=re.compile(r"(?i);\s*(?:DROP|EXEC|CREATE\s+USER|GRANT\s+ALL|ALTER|TRUNCATE|DELETE)\b"), risk_note="Stacked query or destructive statement pattern. Requires execution context.", ), SqlPatternSpec( syntax_pattern="dynamic_sql", category="dangerous_sql_syntax", regex=re.compile(r"(?i)(?:\bEXEC\s*\(@?\w+\)|\bsp_executesql\b|\bEXECUTE\s+IMMEDIATE\b|\bPREPARE\s+\w+\s+FROM\b)"), risk_note="Dynamic SQL execution pattern. Requires parameterization and source context.", ), SqlPatternSpec( syntax_pattern="nosql_payload", category="nosql_injection_payload", regex=re.compile(r"(?i)(?:['\"]?\$(?:gt|gte|lt|lte|ne|in|nin|regex|where)['\"]?\s*:|\$where\b)"), risk_note="NoSQL-style injection payload. Requires datastore and request parsing context.", ), ) def _line_no(text: str, offset: int) -> int: """依字元 offset 計算 1-based 行號。""" return text[:offset].count("\n") + 1 def _snippet(value: str, limit: int = 160) -> str: """回傳短 snippet,避免 review output 過長。""" compact = " ".join(value.strip().split()) return compact[:limit] def review_sql_syntax(sql_text: str) -> dict[str, Any]: """ 審查 SQL corpus 中的危險語法與 injection payload。 Returns: `sql_syntax_review` 契約;所有 finding 都標示 `requires_application_context=true`,避免被下游包裝成 verified CWE。 """ if not isinstance(sql_text, str): sql_text = str(sql_text) findings: list[dict[str, Any]] = [] seen: set[tuple[str, int, str]] = set() for spec in SQL_REVIEW_PATTERNS: for match in spec.regex.finditer(sql_text): line_no = _line_no(sql_text, match.start()) snippet = _snippet(match.group(0)) key = (spec.syntax_pattern, line_no, snippet) if key in seen: continue seen.add(key) findings.append({ "syntax_pattern": spec.syntax_pattern, "line_no": line_no, "category": spec.category, "snippet": snippet, "risk_note": spec.risk_note, "requires_application_context": True, }) categories = sorted({item["syntax_pattern"] for item in findings}) return { "review_status": "completed", "review_type": "sql_syntax_review", "requires_application_context": True, "verified_application_vulnerability": False, "code_scan_excluded": True, "package_scan_allowed": False, "patterns": findings, "summary": { "total": len(findings), "patterns_detected": len(categories), "syntax_patterns": categories, }, }