File size: 4,623 Bytes
c8d30bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""SQL 語法風險審查器。

此工具只審查孤立 SQL corpus 裡的危險語法與測試 payload。
它不宣稱已驗證應用程式漏洞,因為缺少 application source/sink context。
"""

from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SqlPatternSpec:
    """SQL review pattern 的固定規格。"""

    syntax_pattern: str
    category: str
    regex: re.Pattern[str]
    risk_note: str


SQL_REVIEW_PATTERNS: tuple[SqlPatternSpec, ...] = (
    SqlPatternSpec(
        syntax_pattern="boolean_blind",
        category="sql_injection_payload",
        regex=re.compile(r"(?i)(?:\bOR\b|\bAND\b)\s+(?:1\s*=\s*1|'1'\s*=\s*'1')"),
        risk_note="Boolean-based SQL injection payload. Requires application input-flow context.",
    ),
    SqlPatternSpec(
        syntax_pattern="union_select",
        category="sql_injection_payload",
        regex=re.compile(r"(?i)\bUNION\s+(?:ALL\s+)?SELECT\b"),
        risk_note="UNION-based SQL injection payload. Requires application input-flow context.",
    ),
    SqlPatternSpec(
        syntax_pattern="time_based",
        category="sql_injection_payload",
        regex=re.compile(r"(?i)\b(?:SLEEP\s*\(|BENCHMARK\s*\(|WAITFOR\s+DELAY|pg_sleep\s*\()"),
        risk_note="Time-based SQL injection payload. Requires runtime and database context.",
    ),
    SqlPatternSpec(
        syntax_pattern="error_based",
        category="sql_injection_payload",
        regex=re.compile(r"(?i)\b(?:EXTRACTVALUE\s*\(|UPDATEXML\s*\(|CONVERT\s*\(|CAST\s*\()"),
        risk_note="Error-based SQL injection payload. Requires database error exposure context.",
    ),
    SqlPatternSpec(
        syntax_pattern="stacked_query",
        category="dangerous_sql_syntax",
        regex=re.compile(r"(?i);\s*(?:DROP|EXEC|CREATE\s+USER|GRANT\s+ALL|ALTER|TRUNCATE|DELETE)\b"),
        risk_note="Stacked query or destructive statement pattern. Requires execution context.",
    ),
    SqlPatternSpec(
        syntax_pattern="dynamic_sql",
        category="dangerous_sql_syntax",
        regex=re.compile(r"(?i)(?:\bEXEC\s*\(@?\w+\)|\bsp_executesql\b|\bEXECUTE\s+IMMEDIATE\b|\bPREPARE\s+\w+\s+FROM\b)"),
        risk_note="Dynamic SQL execution pattern. Requires parameterization and source context.",
    ),
    SqlPatternSpec(
        syntax_pattern="nosql_payload",
        category="nosql_injection_payload",
        regex=re.compile(r"(?i)(?:['\"]?\$(?:gt|gte|lt|lte|ne|in|nin|regex|where)['\"]?\s*:|\$where\b)"),
        risk_note="NoSQL-style injection payload. Requires datastore and request parsing context.",
    ),
)


def _line_no(text: str, offset: int) -> int:
    """依字元 offset 計算 1-based 行號。"""
    return text[:offset].count("\n") + 1


def _snippet(value: str, limit: int = 160) -> str:
    """回傳短 snippet,避免 review output 過長。"""
    compact = " ".join(value.strip().split())
    return compact[:limit]


def review_sql_syntax(sql_text: str) -> dict[str, Any]:
    """
    審查 SQL corpus 中的危險語法與 injection payload。

    Returns:
        `sql_syntax_review` 契約;所有 finding 都標示
        `requires_application_context=true`,避免被下游包裝成 verified CWE。
    """
    if not isinstance(sql_text, str):
        sql_text = str(sql_text)

    findings: list[dict[str, Any]] = []
    seen: set[tuple[str, int, str]] = set()

    for spec in SQL_REVIEW_PATTERNS:
        for match in spec.regex.finditer(sql_text):
            line_no = _line_no(sql_text, match.start())
            snippet = _snippet(match.group(0))
            key = (spec.syntax_pattern, line_no, snippet)
            if key in seen:
                continue
            seen.add(key)
            findings.append({
                "syntax_pattern": spec.syntax_pattern,
                "line_no": line_no,
                "category": spec.category,
                "snippet": snippet,
                "risk_note": spec.risk_note,
                "requires_application_context": True,
            })

    categories = sorted({item["syntax_pattern"] for item in findings})
    return {
        "review_status": "completed",
        "review_type": "sql_syntax_review",
        "requires_application_context": True,
        "verified_application_vulnerability": False,
        "code_scan_excluded": True,
        "package_scan_allowed": False,
        "patterns": findings,
        "summary": {
            "total": len(findings),
            "patterns_detected": len(categories),
            "syntax_patterns": categories,
        },
    }