| """ |
| 柱 3:熵防管理 — UNTIL CLEAN 閉環驗證迴圈 |
| ========================================== |
| |
| 以「終止條件」驅動,而非「步驟列表」驅動。 |
| 三道關卡:arch-lint → entropy-scan → pytest |
| 重複執行直到全部通過,或達到最大迭代次數後升級給工程師。 |
| |
| 層級邊界:L3(最高層)— 可引用 L1, L2 |
| """ |
|
|
| import sys |
| import logging |
| import subprocess |
| from pathlib import Path |
| from dataclasses import dataclass, field |
| from typing import Callable |
|
|
| |
| from harness.constraints.arch_linter import ArchLinter |
| from harness.entropy.entropy_scanner import EntropyScanner |
|
|
| logger = logging.getLogger("threathunter.harness.entropy") |
|
|
| MAX_ITERATIONS = 10 |
|
|
|
|
| @dataclass |
| class GateResult: |
| """單一關卡的執行結果""" |
| name: str |
| passed: bool |
| message: str |
| details: str = "" |
|
|
|
|
| @dataclass |
| class CleanStatus: |
| """UNTIL CLEAN 迴圈的最終狀態""" |
| is_clean: bool |
| iterations: int |
| gate_results: list[GateResult] = field(default_factory=list) |
| escalation: bool = False |
|
|
| def summary(self) -> str: |
| status = "✅ CLEAN" if self.is_clean else "❌ DIRTY" |
| esc = " (⚠️ ESCALATION)" if self.escalation else "" |
| lines = [f"{status} — 迭代次數: {self.iterations}{esc}"] |
| for gate in self.gate_results: |
| icon = "✅" if gate.passed else "❌" |
| lines.append(f" {icon} {gate.name}: {gate.message}") |
| return "\n".join(lines) |
|
|
|
|
| class UntilCleanLoop: |
| """ |
| UNTIL CLEAN 閉環驗證迴圈 |
| |
| 三道關卡(依序執行): |
| 1. arch-lint → 邊界 Linter(零 error 違規) |
| 2. entropy-scan → 熵防掃描(零 critical 指標) |
| 3. pytest → 測試套件(全通過) |
| |
| 流程: |
| REPEAT |
| 執行三道關卡 |
| UNTIL all_pass == True OR iterations >= MAX_ITERATIONS |
| |
| 用法: |
| loop = UntilCleanLoop(project_root) |
| status = loop.run() |
| if not status.is_clean: |
| print("請工程師介入!") |
| """ |
|
|
| def __init__(self, project_root: Path, max_iterations: int = MAX_ITERATIONS): |
| self.project_root = project_root |
| self.max_iterations = max_iterations |
| self.linter = ArchLinter(project_root) |
| self.scanner = EntropyScanner(project_root) |
|
|
| def run(self, fix_callback: Callable[[list[GateResult]], None] | None = None) -> CleanStatus: |
| """ |
| 執行 UNTIL CLEAN 迴圈 |
| |
| Args: |
| fix_callback: 可選的修復回呼函式,接收失敗的關卡結果。 |
| 若提供,每次迭代失敗後會呼叫此函式嘗試修復。 |
| |
| Returns: |
| CleanStatus 最終狀態 |
| """ |
| for iteration in range(1, self.max_iterations + 1): |
| logger.info(f"{'─' * 40}") |
| logger.info(f" UNTIL CLEAN 迭代 {iteration}/{self.max_iterations}") |
| logger.info(f"{'─' * 40}") |
|
|
| gate_results = self._run_all_gates() |
| all_passed = all(g.passed for g in gate_results) |
|
|
| if all_passed: |
| logger.info(f"✅ SYSTEM STATUS: CLEAN(迭代 {iteration})") |
| return CleanStatus( |
| is_clean=True, |
| iterations=iteration, |
| gate_results=gate_results, |
| ) |
|
|
| logger.warning( |
| f"迭代 {iteration}: " |
| f"{sum(1 for g in gate_results if not g.passed)} 道關卡未通過" |
| ) |
|
|
| |
| if fix_callback: |
| failed_gates = [g for g in gate_results if not g.passed] |
| try: |
| fix_callback(failed_gates) |
| except Exception as e: |
| logger.error(f"修復回呼失敗:{e}") |
|
|
| |
| logger.error( |
| f"❌ SYSTEM STATUS: DIRTY\n" |
| f"已達到最大迭代次數 ({self.max_iterations}),系統仍未 CLEAN。\n" |
| f"請通知工程師介入處理。(Escalation)" |
| ) |
| return CleanStatus( |
| is_clean=False, |
| iterations=self.max_iterations, |
| gate_results=gate_results, |
| escalation=True, |
| ) |
|
|
| def _run_all_gates(self) -> list[GateResult]: |
| """依序執行三道關卡""" |
| results = [] |
|
|
| |
| results.append(self._gate_arch_lint()) |
|
|
| |
| results.append(self._gate_entropy_scan()) |
|
|
| |
| results.append(self._gate_pytest()) |
|
|
| return results |
|
|
| def _gate_arch_lint(self) -> GateResult: |
| """關卡 1:邊界 Linter""" |
| try: |
| report = self.linter.lint_directory() |
| if report.is_clean: |
| return GateResult( |
| name="arch-lint", |
| passed=True, |
| message=f"CLEAN — {report.files_scanned} 檔案零違規", |
| ) |
| else: |
| details = "\n".join( |
| f" {v.file_path}:{v.line_no} [{v.layer_name}] → {v.imported_module}" |
| for v in report.violations |
| ) |
| return GateResult( |
| name="arch-lint", |
| passed=False, |
| message=f"{report.error_count} error, {report.warning_count} warning", |
| details=details, |
| ) |
| except Exception as e: |
| return GateResult( |
| name="arch-lint", |
| passed=False, |
| message=f"執行失敗:{e}", |
| ) |
|
|
| def _gate_entropy_scan(self) -> GateResult: |
| """關卡 2:熵防掃描""" |
| try: |
| report = self.scanner.scan() |
| if report.is_clean: |
| return GateResult( |
| name="entropy-scan", |
| passed=True, |
| message=f"CLEAN — 熵分數: {report.entropy_score:.1f}", |
| ) |
| else: |
| critical_items = [ |
| i for i in report.indicators if i.severity == "critical" |
| ] |
| details = "\n".join(f" ❌ {i.message}" for i in critical_items) |
| return GateResult( |
| name="entropy-scan", |
| passed=False, |
| message=f"熵分數: {report.entropy_score:.1f}", |
| details=details, |
| ) |
| except Exception as e: |
| return GateResult( |
| name="entropy-scan", |
| passed=False, |
| message=f"執行失敗:{e}", |
| ) |
|
|
| def _gate_pytest(self) -> GateResult: |
| """關卡 3:測試套件(排除需要外部 LLM/API 的測試)""" |
| tests_dir = self.project_root / "tests" |
| if not tests_dir.exists() or not list(tests_dir.glob("test_*.py")): |
| return GateResult( |
| name="pytest", |
| passed=True, |
| message="無測試檔案,跳過(符合規範後應補測試)", |
| ) |
|
|
| |
| |
| fast_tests = [ |
| str(tests_dir / "test_epss_tool.py"), |
| str(tests_dir / "test_security_guard.py"), |
| str(tests_dir / "test_intel_fusion.py"), |
| str(tests_dir / "test_memory_tool.py"), |
| str(tests_dir / "test_nvd_tool.py"), |
| str(tests_dir / "test_otx_tool.py"), |
| str(tests_dir / "test_harness.py"), |
| str(tests_dir / "test_pipeline_integration.py"), |
| ] |
| |
| existing_fast = [t for t in fast_tests if (self.project_root / t.lstrip(str(self.project_root))).exists() or Path(t).exists()] |
|
|
| try: |
| result = subprocess.run( |
| [sys.executable, "-m", "pytest"] + existing_fast + ["-v", "--tb=short", "-q"], |
| capture_output=True, |
| text=True, |
| cwd=str(self.project_root), |
| timeout=600, |
| env={**__import__('os').environ, "PYTHONUTF8": "1"}, |
| ) |
|
|
| if result.returncode == 0: |
| return GateResult( |
| name="pytest", |
| passed=True, |
| message="快速測試套件全部通過", |
| ) |
| else: |
| |
| output_lines = (result.stdout + result.stderr).strip().split("\n") |
| tail = "\n".join(output_lines[-20:]) |
| return GateResult( |
| name="pytest", |
| passed=False, |
| message=f"測試失敗(exit code: {result.returncode})", |
| details=tail, |
| ) |
|
|
| except subprocess.TimeoutExpired: |
| return GateResult( |
| name="pytest", |
| passed=False, |
| message="快速測試超時(300 秒),可能有無限等待", |
| ) |
| except Exception as e: |
| return GateResult( |
| name="pytest", |
| passed=False, |
| message=f"執行失敗:{e}", |
| ) |
|
|
|
|
| def main() -> int: |
| """CLI 入口點:until-clean""" |
| project_root = Path(__file__).parent.parent.parent |
| loop = UntilCleanLoop(project_root) |
| status = loop.run() |
| print(status.summary()) |
| return 0 if status.is_clean else 1 |
|
|
|
|
| if __name__ == "__main__": |
| sys.exit(main()) |
|
|