Threat_Hunter / harness /entropy /until_clean_loop.py
EricChen2005's picture
Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B
c8d30bc
"""
柱 3:熵防管理 — UNTIL CLEAN 閉環驗證迴圈
==========================================
以「終止條件」驅動,而非「步驟列表」驅動。
三道關卡:arch-lint → entropy-scan → pytest
重複執行直到全部通過,或達到最大迭代次數後升級給工程師。
層級邊界:L3(最高層)— 可引用 L1, L2
"""
import sys
import logging
import subprocess
from pathlib import Path
from dataclasses import dataclass, field
from typing import Callable
# L3 可引用 L2
from harness.constraints.arch_linter import ArchLinter
from harness.entropy.entropy_scanner import EntropyScanner
logger = logging.getLogger("threathunter.harness.entropy")
MAX_ITERATIONS = 10 # 最大迭代次數
@dataclass
class GateResult:
"""單一關卡的執行結果"""
name: str
passed: bool
message: str
details: str = ""
@dataclass
class CleanStatus:
"""UNTIL CLEAN 迴圈的最終狀態"""
is_clean: bool
iterations: int
gate_results: list[GateResult] = field(default_factory=list)
escalation: bool = False
def summary(self) -> str:
status = "✅ CLEAN" if self.is_clean else "❌ DIRTY"
esc = " (⚠️ ESCALATION)" if self.escalation else ""
lines = [f"{status} — 迭代次數: {self.iterations}{esc}"]
for gate in self.gate_results:
icon = "✅" if gate.passed else "❌"
lines.append(f" {icon} {gate.name}: {gate.message}")
return "\n".join(lines)
class UntilCleanLoop:
"""
UNTIL CLEAN 閉環驗證迴圈
三道關卡(依序執行):
1. arch-lint → 邊界 Linter(零 error 違規)
2. entropy-scan → 熵防掃描(零 critical 指標)
3. pytest → 測試套件(全通過)
流程:
REPEAT
執行三道關卡
UNTIL all_pass == True OR iterations >= MAX_ITERATIONS
用法:
loop = UntilCleanLoop(project_root)
status = loop.run()
if not status.is_clean:
print("請工程師介入!")
"""
def __init__(self, project_root: Path, max_iterations: int = MAX_ITERATIONS):
self.project_root = project_root
self.max_iterations = max_iterations
self.linter = ArchLinter(project_root)
self.scanner = EntropyScanner(project_root)
def run(self, fix_callback: Callable[[list[GateResult]], None] | None = None) -> CleanStatus:
"""
執行 UNTIL CLEAN 迴圈
Args:
fix_callback: 可選的修復回呼函式,接收失敗的關卡結果。
若提供,每次迭代失敗後會呼叫此函式嘗試修復。
Returns:
CleanStatus 最終狀態
"""
for iteration in range(1, self.max_iterations + 1):
logger.info(f"{'─' * 40}")
logger.info(f" UNTIL CLEAN 迭代 {iteration}/{self.max_iterations}")
logger.info(f"{'─' * 40}")
gate_results = self._run_all_gates()
all_passed = all(g.passed for g in gate_results)
if all_passed:
logger.info(f"✅ SYSTEM STATUS: CLEAN(迭代 {iteration})")
return CleanStatus(
is_clean=True,
iterations=iteration,
gate_results=gate_results,
)
logger.warning(
f"迭代 {iteration}: "
f"{sum(1 for g in gate_results if not g.passed)} 道關卡未通過"
)
# 嘗試修復
if fix_callback:
failed_gates = [g for g in gate_results if not g.passed]
try:
fix_callback(failed_gates)
except Exception as e:
logger.error(f"修復回呼失敗:{e}")
# 達到最大迭代次數
logger.error(
f"❌ SYSTEM STATUS: DIRTY\n"
f"已達到最大迭代次數 ({self.max_iterations}),系統仍未 CLEAN。\n"
f"請通知工程師介入處理。(Escalation)"
)
return CleanStatus(
is_clean=False,
iterations=self.max_iterations,
gate_results=gate_results,
escalation=True,
)
def _run_all_gates(self) -> list[GateResult]:
"""依序執行三道關卡"""
results = []
# 關卡 1:arch-lint
results.append(self._gate_arch_lint())
# 關卡 2:entropy-scan
results.append(self._gate_entropy_scan())
# 關卡 3:pytest
results.append(self._gate_pytest())
return results
def _gate_arch_lint(self) -> GateResult:
"""關卡 1:邊界 Linter"""
try:
report = self.linter.lint_directory()
if report.is_clean:
return GateResult(
name="arch-lint",
passed=True,
message=f"CLEAN — {report.files_scanned} 檔案零違規",
)
else:
details = "\n".join(
f" {v.file_path}:{v.line_no} [{v.layer_name}] → {v.imported_module}"
for v in report.violations
)
return GateResult(
name="arch-lint",
passed=False,
message=f"{report.error_count} error, {report.warning_count} warning",
details=details,
)
except Exception as e:
return GateResult(
name="arch-lint",
passed=False,
message=f"執行失敗:{e}",
)
def _gate_entropy_scan(self) -> GateResult:
"""關卡 2:熵防掃描"""
try:
report = self.scanner.scan()
if report.is_clean:
return GateResult(
name="entropy-scan",
passed=True,
message=f"CLEAN — 熵分數: {report.entropy_score:.1f}",
)
else:
critical_items = [
i for i in report.indicators if i.severity == "critical"
]
details = "\n".join(f" ❌ {i.message}" for i in critical_items)
return GateResult(
name="entropy-scan",
passed=False,
message=f"熵分數: {report.entropy_score:.1f}",
details=details,
)
except Exception as e:
return GateResult(
name="entropy-scan",
passed=False,
message=f"執行失敗:{e}",
)
def _gate_pytest(self) -> GateResult:
"""關卡 3:測試套件(排除需要外部 LLM/API 的測試)"""
tests_dir = self.project_root / "tests"
if not tests_dir.exists() or not list(tests_dir.glob("test_*.py")):
return GateResult(
name="pytest",
passed=True,
message="無測試檔案,跳過(符合規範後應補測試)",
)
# 快速通道:排除需要外部 LLM 呼叫的測試(避免 rate limit 和超時)
# 這些測試標記為 @pytest.mark.llm 或在 test_redteam.py(每個測試呼叫 LLM)
fast_tests = [
str(tests_dir / "test_epss_tool.py"),
str(tests_dir / "test_security_guard.py"),
str(tests_dir / "test_intel_fusion.py"),
str(tests_dir / "test_memory_tool.py"),
str(tests_dir / "test_nvd_tool.py"),
str(tests_dir / "test_otx_tool.py"),
str(tests_dir / "test_harness.py"),
str(tests_dir / "test_pipeline_integration.py"),
]
# 只跑「快速通道」中存在的測試
existing_fast = [t for t in fast_tests if (self.project_root / t.lstrip(str(self.project_root))).exists() or Path(t).exists()]
try:
result = subprocess.run(
[sys.executable, "-m", "pytest"] + existing_fast + ["-v", "--tb=short", "-q"],
capture_output=True,
text=True,
cwd=str(self.project_root),
timeout=600, # 10 分鐘(排除 LLM 測試後,快速套件約需 5 分鐘)
env={**__import__('os').environ, "PYTHONUTF8": "1"},
)
if result.returncode == 0:
return GateResult(
name="pytest",
passed=True,
message="快速測試套件全部通過",
)
else:
# 取最後 20 行作為摘要
output_lines = (result.stdout + result.stderr).strip().split("\n")
tail = "\n".join(output_lines[-20:])
return GateResult(
name="pytest",
passed=False,
message=f"測試失敗(exit code: {result.returncode})",
details=tail,
)
except subprocess.TimeoutExpired:
return GateResult(
name="pytest",
passed=False,
message="快速測試超時(300 秒),可能有無限等待",
)
except Exception as e:
return GateResult(
name="pytest",
passed=False,
message=f"執行失敗:{e}",
)
def main() -> int:
"""CLI 入口點:until-clean"""
project_root = Path(__file__).parent.parent.parent
loop = UntilCleanLoop(project_root)
status = loop.run()
print(status.summary())
return 0 if status.is_clean else 1
if __name__ == "__main__":
sys.exit(main())