File size: 9,795 Bytes
c8d30bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
"""
柱 3:熵防管理 — UNTIL CLEAN 閉環驗證迴圈
==========================================

以「終止條件」驅動,而非「步驟列表」驅動。
三道關卡:arch-lint → entropy-scan → pytest
重複執行直到全部通過,或達到最大迭代次數後升級給工程師。

層級邊界:L3(最高層)— 可引用 L1, L2
"""

import sys
import logging
import subprocess
from pathlib import Path
from dataclasses import dataclass, field
from typing import Callable

# L3 可引用 L2
from harness.constraints.arch_linter import ArchLinter
from harness.entropy.entropy_scanner import EntropyScanner

logger = logging.getLogger("threathunter.harness.entropy")

MAX_ITERATIONS = 10  # 最大迭代次數


@dataclass
class GateResult:
    """單一關卡的執行結果"""
    name: str
    passed: bool
    message: str
    details: str = ""


@dataclass
class CleanStatus:
    """UNTIL CLEAN 迴圈的最終狀態"""
    is_clean: bool
    iterations: int
    gate_results: list[GateResult] = field(default_factory=list)
    escalation: bool = False

    def summary(self) -> str:
        status = "✅ CLEAN" if self.is_clean else "❌ DIRTY"
        esc = " (⚠️ ESCALATION)" if self.escalation else ""
        lines = [f"{status} — 迭代次數: {self.iterations}{esc}"]
        for gate in self.gate_results:
            icon = "✅" if gate.passed else "❌"
            lines.append(f"  {icon} {gate.name}: {gate.message}")
        return "\n".join(lines)


class UntilCleanLoop:
    """
    UNTIL CLEAN 閉環驗證迴圈

    三道關卡(依序執行):
    1. arch-lint  → 邊界 Linter(零 error 違規)
    2. entropy-scan → 熵防掃描(零 critical 指標)
    3. pytest     → 測試套件(全通過)

    流程:
    REPEAT
      執行三道關卡
    UNTIL all_pass == True OR iterations >= MAX_ITERATIONS

    用法:
        loop = UntilCleanLoop(project_root)
        status = loop.run()
        if not status.is_clean:
            print("請工程師介入!")
    """

    def __init__(self, project_root: Path, max_iterations: int = MAX_ITERATIONS):
        self.project_root = project_root
        self.max_iterations = max_iterations
        self.linter = ArchLinter(project_root)
        self.scanner = EntropyScanner(project_root)

    def run(self, fix_callback: Callable[[list[GateResult]], None] | None = None) -> CleanStatus:
        """
        執行 UNTIL CLEAN 迴圈

        Args:
            fix_callback: 可選的修復回呼函式,接收失敗的關卡結果。
                          若提供,每次迭代失敗後會呼叫此函式嘗試修復。

        Returns:
            CleanStatus 最終狀態
        """
        for iteration in range(1, self.max_iterations + 1):
            logger.info(f"{'─' * 40}")
            logger.info(f"  UNTIL CLEAN 迭代 {iteration}/{self.max_iterations}")
            logger.info(f"{'─' * 40}")

            gate_results = self._run_all_gates()
            all_passed = all(g.passed for g in gate_results)

            if all_passed:
                logger.info(f"✅ SYSTEM STATUS: CLEAN(迭代 {iteration})")
                return CleanStatus(
                    is_clean=True,
                    iterations=iteration,
                    gate_results=gate_results,
                )

            logger.warning(
                f"迭代 {iteration}: "
                f"{sum(1 for g in gate_results if not g.passed)} 道關卡未通過"
            )

            # 嘗試修復
            if fix_callback:
                failed_gates = [g for g in gate_results if not g.passed]
                try:
                    fix_callback(failed_gates)
                except Exception as e:
                    logger.error(f"修復回呼失敗:{e}")

        # 達到最大迭代次數
        logger.error(
            f"❌ SYSTEM STATUS: DIRTY\n"
            f"已達到最大迭代次數 ({self.max_iterations}),系統仍未 CLEAN。\n"
            f"請通知工程師介入處理。(Escalation)"
        )
        return CleanStatus(
            is_clean=False,
            iterations=self.max_iterations,
            gate_results=gate_results,
            escalation=True,
        )

    def _run_all_gates(self) -> list[GateResult]:
        """依序執行三道關卡"""
        results = []

        # 關卡 1:arch-lint
        results.append(self._gate_arch_lint())

        # 關卡 2:entropy-scan
        results.append(self._gate_entropy_scan())

        # 關卡 3:pytest
        results.append(self._gate_pytest())

        return results

    def _gate_arch_lint(self) -> GateResult:
        """關卡 1:邊界 Linter"""
        try:
            report = self.linter.lint_directory()
            if report.is_clean:
                return GateResult(
                    name="arch-lint",
                    passed=True,
                    message=f"CLEAN — {report.files_scanned} 檔案零違規",
                )
            else:
                details = "\n".join(
                    f"  {v.file_path}:{v.line_no} [{v.layer_name}] → {v.imported_module}"
                    for v in report.violations
                )
                return GateResult(
                    name="arch-lint",
                    passed=False,
                    message=f"{report.error_count} error, {report.warning_count} warning",
                    details=details,
                )
        except Exception as e:
            return GateResult(
                name="arch-lint",
                passed=False,
                message=f"執行失敗:{e}",
            )

    def _gate_entropy_scan(self) -> GateResult:
        """關卡 2:熵防掃描"""
        try:
            report = self.scanner.scan()
            if report.is_clean:
                return GateResult(
                    name="entropy-scan",
                    passed=True,
                    message=f"CLEAN — 熵分數: {report.entropy_score:.1f}",
                )
            else:
                critical_items = [
                    i for i in report.indicators if i.severity == "critical"
                ]
                details = "\n".join(f"  ❌ {i.message}" for i in critical_items)
                return GateResult(
                    name="entropy-scan",
                    passed=False,
                    message=f"熵分數: {report.entropy_score:.1f}",
                    details=details,
                )
        except Exception as e:
            return GateResult(
                name="entropy-scan",
                passed=False,
                message=f"執行失敗:{e}",
            )

    def _gate_pytest(self) -> GateResult:
        """關卡 3:測試套件(排除需要外部 LLM/API 的測試)"""
        tests_dir = self.project_root / "tests"
        if not tests_dir.exists() or not list(tests_dir.glob("test_*.py")):
            return GateResult(
                name="pytest",
                passed=True,
                message="無測試檔案,跳過(符合規範後應補測試)",
            )

        # 快速通道:排除需要外部 LLM 呼叫的測試(避免 rate limit 和超時)
        # 這些測試標記為 @pytest.mark.llm 或在 test_redteam.py(每個測試呼叫 LLM)
        fast_tests = [
            str(tests_dir / "test_epss_tool.py"),
            str(tests_dir / "test_security_guard.py"),
            str(tests_dir / "test_intel_fusion.py"),
            str(tests_dir / "test_memory_tool.py"),
            str(tests_dir / "test_nvd_tool.py"),
            str(tests_dir / "test_otx_tool.py"),
            str(tests_dir / "test_harness.py"),
            str(tests_dir / "test_pipeline_integration.py"),
        ]
        # 只跑「快速通道」中存在的測試
        existing_fast = [t for t in fast_tests if (self.project_root / t.lstrip(str(self.project_root))).exists() or Path(t).exists()]

        try:
            result = subprocess.run(
                [sys.executable, "-m", "pytest"] + existing_fast + ["-v", "--tb=short", "-q"],
                capture_output=True,
                text=True,
                cwd=str(self.project_root),
                timeout=600,  # 10 分鐘(排除 LLM 測試後,快速套件約需 5 分鐘)
                env={**__import__('os').environ, "PYTHONUTF8": "1"},
            )

            if result.returncode == 0:
                return GateResult(
                    name="pytest",
                    passed=True,
                    message="快速測試套件全部通過",
                )
            else:
                # 取最後 20 行作為摘要
                output_lines = (result.stdout + result.stderr).strip().split("\n")
                tail = "\n".join(output_lines[-20:])
                return GateResult(
                    name="pytest",
                    passed=False,
                    message=f"測試失敗(exit code: {result.returncode})",
                    details=tail,
                )

        except subprocess.TimeoutExpired:
            return GateResult(
                name="pytest",
                passed=False,
                message="快速測試超時(300 秒),可能有無限等待",
            )
        except Exception as e:
            return GateResult(
                name="pytest",
                passed=False,
                message=f"執行失敗:{e}",
            )


def main() -> int:
    """CLI 入口點:until-clean"""
    project_root = Path(__file__).parent.parent.parent
    loop = UntilCleanLoop(project_root)
    status = loop.run()
    print(status.summary())
    return 0 if status.is_clean else 1


if __name__ == "__main__":
    sys.exit(main())