# Threat_Hunter / checkpoint.py
# EricChen2005's picture
# Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B
# c8d30bc
"""
checkpoint.py — Pipeline execution checkpoint recorder
======================================================
Design principles:
- Zero blocking: I/O is kept as lightweight as possible (append-only JSONL)
- Zero failure: any recording error is silently swallowed and never affects the pipeline
- Structured: every record is a queryable JSON object
- Thread-safe:
    Phase 4A: prefer the Rust threathunter_checkpoint_writer
              (parking_lot::Mutex + BufWriter<File>; no contention under high-frequency SSE)
    Fallback: Python threading.Lock + TextIO (the original implementation)
Output format (JSONL):
    logs/checkpoints/scan_{id}_{timestamp}.jsonl
Event types:
    SCAN_START / SCAN_END
    STAGE_ENTER / STAGE_EXIT
    LLM_CALL / LLM_RESULT / LLM_RETRY / LLM_ERROR
    TOOL_CALL / HARNESS_CHECK / DEGRADATION
Complies with: AGENTS.md + project_CONSTITUTION.md
"""
import hashlib
import json
import logging
import os
import re
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, TextIO
logger = logging.getLogger("ThreatHunter.checkpoint")

# -- Phase 4A: Rust BufWriter integration ------------------------------------
# Prefer the Rust crate; when it cannot be imported (not compiled, non-Windows,
# etc.) degrade automatically to the pure-Python writer.
try:
    import threathunter_checkpoint_writer as _cw

    _RUST_WRITER_AVAILABLE = True
    logger.info("[CHECKPOINT] Phase 4A: Rust BufWriter 啟用 ✓")
except ImportError:
    _cw = None  # type: ignore[assignment]
    _RUST_WRITER_AVAILABLE = False
    logger.debug("[CHECKPOINT] Phase 4A: Rust BufWriter 不可用，使用 Python fallback")

# -- Environment switch (rollback strategy, level 3) -------------------------
# Checkpointing is on unless CHECKPOINT_ENABLED is set to an explicit "off"
# spelling.  Besides "false", the common spellings "0", "no" and "off" are
# honoured (case-insensitive, surrounding whitespace ignored) so the kill
# switch cannot be defeated by an alternative spelling; every other value,
# including an unset variable, keeps checkpointing enabled.
ENABLED = os.getenv("CHECKPOINT_ENABLED", "true").strip().lower() not in {
    "false",
    "0",
    "no",
    "off",
}

# -- Sensitive-data masking patterns -----------------------------------------
# Applied in order by _redact(); the last three capture a "name[:=]" prefix
# followed by the secret value.
_SENSITIVE_PATTERNS = [
    re.compile(r"(sk(?:-proj)?-[a-zA-Z0-9\-_]{10,})", re.IGNORECASE),  # OpenAI-style keys
    re.compile(r"(ghp_[a-zA-Z0-9]{36,})", re.IGNORECASE),  # GitHub token
    re.compile(r"(api[_-]?key\s*[:=]\s*['\"]?)([^'\"\s,]{8,})", re.IGNORECASE),
    re.compile(r"(password\s*[:=]\s*['\"]?)([^'\"\s,]{4,})", re.IGNORECASE),
    re.compile(r"(secret\s*[:=]\s*['\"]?)([^'\"\s,]{8,})", re.IGNORECASE),
]

# -- Truncation limits --------------------------------------------------------
MAX_DATA_LENGTH = 2000  # maximum length of a single field value
MAX_THINKING_LENGTH = 1000  # maximum length of a thinking-process preview
def _redact(text: str) -> str:
    """Mask sensitive data (API keys, passwords, ...) in *text*.

    Every match of a pattern in ``_SENSITIVE_PATTERNS`` is replaced by the
    first four characters of the match followed by ``***REDACTED***``.

    Args:
        text: The string to scrub.  A non-string argument is not scanned; it
            is simply converted with ``str()`` and returned.

    Returns:
        The scrubbed string.
    """
    if not isinstance(text, str):
        return str(text)

    def _mask(match):
        # Keep a short prefix so the kind of secret stays recognisable in logs.
        return match.group(0)[:4] + "***REDACTED***"

    result = text
    for pattern in _SENSITIVE_PATTERNS:
        result = pattern.sub(_mask, result)
    return result
def _truncate(value: Any, max_len: int = MAX_DATA_LENGTH) -> str:
    """Return *value* as a string, cut to at most *max_len* characters.

    Args:
        value: Any object; non-strings are converted with ``str()`` first.
        max_len: Number of leading characters to keep.

    Returns:
        The string unchanged when it fits, otherwise its first *max_len*
        characters followed by a ``...[truncated, total=N]`` marker, where N
        is the length of the untruncated string.
    """
    s = value if isinstance(value, str) else str(value)
    if len(s) > max_len:
        return s[:max_len] + f"...[truncated, total={len(s)}]"
    return s
def _safe_hash(text: str) -> str:
    """Return a short hash of *text* (used to track repeated runs of the same input).

    The text is encoded as UTF-8 with ``errors="replace"`` so that characters
    which cannot be encoded (e.g. lone surrogates) do not raise.

    Returns:
        The first 12 hexadecimal characters of the SHA-256 digest.
    """
    return hashlib.sha256(text.encode("utf-8", errors="replace")).hexdigest()[:12]
class CheckpointRecorder:
"""
Pipeline ๅŸท่กŒๆชขๆŸฅ้ปž่จ˜้Œ„ๅ™จใ€‚
ๆฏๆฌกๆŽƒๆๅ‘ผๅซ start_scan() ๅˆๅง‹ๅŒ–ไธ€ๅ€‹ JSONL ๆช”ๆกˆ๏ผŒ
ๅพŒ็บŒๆ‰€ๆœ‰ checkpoint() ๅ‘ผๅซ่ฟฝๅŠ ไธ€่กŒ JSONใ€‚
ๆŽƒๆ็ตๆŸๅ‘ผๅซ end_scan() ้—œ้–‰ๆช”ๆกˆใ€‚
ๆ‰€ๆœ‰ๅ…ฌ้–‹ๆ–นๆณ•้ƒฝๆ˜ฏใ€Œ้œ้ป˜ๆจกๅผใ€๏ผš
- ไปปไฝ•ๅ…ง้ƒจ้Œฏ่ชค่ขซ try-except ๆ•ๆ‰
- ๅƒ…่จ˜้Œ„ๅˆฐ logger.debug๏ผŒไธๆ‹‹ๅ‡บไพ‹ๅค–
- Pipeline ไธปๆต็จ‹ๅฎŒๅ…จไธๅ—ๅฝฑ้Ÿฟ
"""
def __init__(self, logs_dir: Path | str | None = None):
if logs_dir is None:
logs_dir = Path(__file__).parent / "logs"
self._logs_dir = Path(logs_dir)
self._checkpoints_dir = self._logs_dir / "checkpoints"
self._errors_dir = self._logs_dir / "errors"
self._scan_id: str = "unknown"
self._seq: int = 0
self._file: TextIO | None = None # Python fallback writer
self._lock = threading.Lock() # Python fallback lock
self._event_counts: dict[str, int] = {}
self._scan_start_time: float = 0.0
self._current_filename: str = "" # v3.6: Thinking Path API ไฝฟ็”จ
# Phase 4A๏ผš่ฟฝ่นค Rust writer ๆ˜ฏๅฆๅฐๆœฌๆฌกๆŽƒๆ้–‹ๅ•Ÿ
self._rust_writer_active: bool = False
@property
def current_filename(self) -> str:
"""ๅ›žๅ‚ณๆœฌๆฌกๆŽƒๆ็š„ JSONL ๆช”ๅ๏ผˆไพ› server.py Thinking Path API ไฝฟ็”จ๏ผ‰"""
return self._current_filename
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๆŽƒๆ็”Ÿๅ‘ฝ้€ฑๆœŸ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def start_scan(self, scan_id: str) -> None:
"""ๅˆๅง‹ๅŒ–ๆ–ฐๆŽƒๆ็š„ checkpoint ๆช”ๆกˆ"""
if not ENABLED:
return
try:
self._checkpoints_dir.mkdir(parents=True, exist_ok=True)
self._errors_dir.mkdir(parents=True, exist_ok=True)
self._scan_id = scan_id
self._seq = 0
self._event_counts = {}
self._scan_start_time = time.time()
self._rust_writer_active = False
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"scan_{scan_id[:8]}_{ts}.jsonl"
filepath = self._checkpoints_dir / filename
self._current_filename = filename # v3.6: ไพ› Thinking Path API ๆŸฅ่ฉข
# โ”€โ”€ Phase 4A๏ผšๅ„ชๅ…ˆๅ˜—่ฉฆ Rust BufWriter โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if _RUST_WRITER_AVAILABLE:
try:
_cw.open_writer(str(filepath))
self._rust_writer_active = True
logger.info(
"[CHECKPOINT] Phase 4A: Rust BufWriter ้–‹ๅ•Ÿ: %s", filepath.name
)
except Exception as rust_err:
logger.warning(
"[CHECKPOINT] Phase 4A: Rust BufWriter ้–‹ๅ•Ÿๅคฑๆ•—๏ผŒๅ›ž้€€ Python: %s", rust_err
)
self._rust_writer_active = False
# โ”€โ”€ Fallback๏ผšPython TextIO โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if not self._rust_writer_active:
if self._file and not self._file.closed:
try:
self._file.close()
except Exception:
pass
self._file = open(filepath, "a", encoding="utf-8")
self.checkpoint("SCAN_START", "pipeline", {
"scan_id": scan_id,
"writer_backend": "rust_bufwriter" if self._rust_writer_active else "python_lock",
})
logger.info("[CHECKPOINT] ๆŽƒๆ่จ˜้Œ„้–‹ๅง‹: %s", filepath.name)
except Exception as e:
logger.debug("[CHECKPOINT] start_scan failed: %s", e)
def end_scan(self, final_status: str, total_duration: float) -> None:
"""ๆŽƒๆ็ตๆŸ๏ผŒๅฏซๅ…ฅๆ‘˜่ฆไธฆ้—œ้–‰ๆช”ๆกˆ"""
if not ENABLED:
return
try:
self.checkpoint("SCAN_END", "pipeline", {
"final_status": final_status,
"total_duration_seconds": round(total_duration, 2),
"total_checkpoints": self._seq,
"event_summary": dict(self._event_counts),
"writer_backend": "rust_bufwriter" if self._rust_writer_active else "python_lock",
})
# โ”€โ”€ Phase 4A๏ผš้—œ้–‰ Rust writer โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if self._rust_writer_active and _RUST_WRITER_AVAILABLE:
try:
_cw.flush_writer()
_cw.close_writer()
logger.debug(
"[CHECKPOINT] Phase 4A: Rust BufWriter ๅทฒ้—œ้–‰๏ผŒๅ…ฑๅฏซๅ…ฅ %d ่กŒ",
_cw.get_lines_written(),
)
except Exception as e:
logger.debug("[CHECKPOINT] Phase 4A: Rust close ๅคฑๆ•—: %s", e)
finally:
self._rust_writer_active = False
# โ”€โ”€ Fallback๏ผšPython TextIO ้—œ้–‰ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if self._file and not self._file.closed:
self._file.close()
self._file = None
logger.info(
"[CHECKPOINT] ๆŽƒๆ่จ˜้Œ„็ตๆŸ: %d ๆข checkpoint | %.1fs",
self._seq, total_duration,
)
except Exception as e:
logger.debug("[CHECKPOINT] end_scan failed: %s", e)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๆ ธๅฟƒๅฏซๅ…ฅ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def checkpoint(self, event: str, agent: str, data: dict | None = None) -> None:
"""
ๅฏซๅ…ฅไธ€ๆข checkpoint ่จ˜้Œ„๏ผˆๅŸท่กŒ็ท’ๅฎ‰ๅ…จ๏ผ‰ใ€‚
Phase 4A๏ผšๅ„ชๅ…ˆไฝฟ็”จ Rust BufWriter๏ผˆparking_lot::Mutex๏ผŒ็„ก GIL ็ซถ็ˆญ๏ผ‰ใ€‚
Fallback๏ผšPython threading.Lock + TextIO๏ผˆๅŽŸๆœ‰ๅฏฆไฝœ๏ผŒๅฎŒๅ…จ็ญ‰ๆ•ˆ๏ผ‰ใ€‚
Args:
event: ไบ‹ไปถ้กžๅž‹๏ผˆๅฆ‚ STAGE_ENTER, LLM_CALL ็ญ‰๏ผ‰
agent: Agent ๅ็จฑ๏ผˆๅฆ‚ scout, security_guard๏ผ‰
data: ้™„ๅŠ ่ณ‡ๆ–™ๅญ—ๅ…ธ
"""
if not ENABLED:
return
try:
with self._lock:
self._seq += 1
self._event_counts[event] = self._event_counts.get(event, 0) + 1
record = {
"seq": self._seq,
"ts": datetime.now(timezone.utc).isoformat(),
"scan_id": self._scan_id,
"event": event,
"agent": agent,
"data": self._sanitize_data(data or {}),
}
line = json.dumps(record, ensure_ascii=False, default=str)
# โ”€โ”€ Phase 4A๏ผšRust BufWriter ๅฏซๅ…ฅ่ทฏๅพ‘ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if self._rust_writer_active and _RUST_WRITER_AVAILABLE:
try:
_cw.write_line(line)
# ้ซ˜ๅ„ชๅ…ˆ็ดšไบ‹ไปถ๏ผˆLLM ้Œฏ่ชค / ๆŽƒๆ้‚Š็•Œ๏ผ‰็ซ‹ๅณ flush
if event in (
"SCAN_START",
"SCAN_END",
"STAGE_ENTER",
"STAGE_EXIT",
"LLM_ERROR",
"DEGRADATION",
):
_cw.flush_writer()
return
except Exception as rust_err:
# Rust ๅฏซๅ…ฅๅคฑๆ•— โ†’ ๅ›ž้€€๏ผŒไธฆ็ฆ็”จ Rust writer ้ฟๅ…ๅพŒ็บŒ้‡่ฉฆ
logger.warning(
"[CHECKPOINT] Phase 4A Rust write ๅคฑๆ•—๏ผŒๅˆ‡ๆ› Python: %s", rust_err
)
self._rust_writer_active = False
# โ”€โ”€ Fallback๏ผšPython TextIO โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if self._file and not self._file.closed:
self._file.write(line + "\n")
self._file.flush() # ๅณๆ™‚ๅฏซๅ…ฅ๏ผŒdebug ๆ™‚ๆ›ดๆ˜“่ฟฝ่นค
except Exception as e:
logger.debug("[CHECKPOINT] write failed: %s", e)
def _sanitize_data(self, data: dict) -> dict:
"""ๆธ…ๆด—่ณ‡ๆ–™๏ผšๆˆชๆ–ท + ้ฎ็ฝฉๆ•ๆ„Ÿ่ณ‡่จŠ"""
sanitized = {}
for key, value in data.items():
if isinstance(value, dict):
sanitized[key] = self._sanitize_data(value)
elif isinstance(value, (list, tuple)):
sanitized[key] = _truncate(str(value))
elif isinstance(value, str):
sanitized[key] = _redact(_truncate(value))
elif isinstance(value, (int, float, bool)):
sanitized[key] = value
else:
sanitized[key] = _truncate(str(value))
return sanitized
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Stage ๅฑคไพฟๆทๆ–นๆณ•
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def stage_enter(
self,
agent: str,
input_data: Any = None,
skill_file: str = "",
input_type: str = "",
) -> None:
"""Stage ้€ฒๅ…ฅ้ปž checkpoint โ€” v3.7: ๅŠ ๅ…ฅ skill_file / input_type ๆฌ„ไฝไพ› Thinking Path UI ไฝฟ็”จ"""
try:
data: dict[str, Any] = {}
# v3.7: Path-Aware Skills ่ฟฝ่นค่ณ‡ๆ–™
if skill_file:
data["skill_file"] = skill_file
if input_type:
data["input_type"] = input_type
# ่ผธๅ…ฅๆ‘˜่ฆ
if isinstance(input_data, dict):
data["input_keys"] = list(input_data.keys())[:20]
if "tech_stack" in input_data:
data["tech_stack_preview"] = _truncate(
str(input_data["tech_stack"]), 200
)
if "vulnerabilities" in input_data:
data["vuln_count"] = len(input_data.get("vulnerabilities", []))
data["input_hash"] = _safe_hash(json.dumps(input_data, default=str))
elif isinstance(input_data, str):
data["input_preview"] = _truncate(input_data, 200)
data["input_hash"] = _safe_hash(input_data)
data["input_length"] = len(input_data)
self.checkpoint("STAGE_ENTER", agent, data)
except Exception:
pass
def stage_exit(
self,
agent: str,
status: str,
output_data: Any = None,
duration_ms: int = 0,
) -> None:
"""Stage ้›ข้–‹้ปž checkpoint"""
try:
data: dict[str, Any] = {
"status": status,
"duration_ms": duration_ms,
}
if isinstance(output_data, dict):
data["output_keys"] = list(output_data.keys())[:20]
if "vulnerabilities" in output_data:
data["vuln_count"] = len(output_data.get("vulnerabilities", []))
if "risk_score" in output_data:
data["risk_score"] = output_data["risk_score"]
if "verdict" in output_data:
data["verdict"] = output_data["verdict"]
if output_data.get("_degraded"):
data["degraded"] = True
if "scan_path" in output_data:
data["scan_path"] = output_data["scan_path"]
self.checkpoint("STAGE_EXIT", agent, data)
# โ”€โ”€ ่‡ชๅ‹•ๆณจๅ…ฅ DEGRADATION checkpoint โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ็•ถ stage ่ผธๅ‡บๅŒ…ๅซ _degraded=True ๆ™‚๏ผŒ็ซ‹ๅณ่ฃœๅฏซไธ€ๆข DEGRADATION
# ไบ‹ไปถ๏ผŒ่ฎ“ Thinking Path ้ขๆฟ่ƒฝๆญฃ็ขบ้กฏ็คบ้™็ดšๅŽŸๅ› ๏ผŒไธ่ฎ“้–‹็™ผ่€…็žŽ็Œœ
if isinstance(output_data, dict) and output_data.get("_degraded"):
error_msg = str(output_data.get("_error", "Unknown degradation reason"))
strategy = f"status={status}, duration={duration_ms}ms"
self.checkpoint("DEGRADATION", agent, {
"reason": _truncate(error_msg, 400),
"fallback_strategy": strategy,
"error": _truncate(error_msg, 400), # ไพ›ๅ‰็ซฏ tp-error-text ไฝฟ็”จ
"source": "stage_exit_auto",
})
except Exception:
pass
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# LLM ๅฑคไพฟๆทๆ–นๆณ•
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def llm_call(
self,
agent: str,
model: str,
provider: str = "openrouter",
task_preview: str = "",
) -> None:
"""LLM ๅ‘ผๅซๅ‰ checkpoint"""
try:
self.checkpoint("LLM_CALL", agent, {
"model": model,
"provider": provider,
"task_preview": _truncate(task_preview, 300),
})
except Exception:
pass
def llm_result(
self,
agent: str,
model: str,
status: str,
output_len: int,
duration_ms: int,
thinking: str = "",
) -> None:
"""LLM ๅ‘ผๅซๅพŒ checkpoint๏ผˆๅซๆ€่€ƒ้Ž็จ‹ๆ‘˜่ฆ๏ผ‰"""
try:
data: dict[str, Any] = {
"model": model,
"status": status,
"output_length": output_len,
"duration_ms": duration_ms,
}
if thinking:
data["thinking_preview"] = _redact(
_truncate(thinking, MAX_THINKING_LENGTH)
)
self.checkpoint("LLM_RESULT", agent, data)
except Exception:
pass
def llm_retry(
self,
agent: str,
model: str,
error: str,
retry_count: int,
next_model: str,
) -> None:
"""LLM ้‡่ฉฆ checkpoint"""
try:
self.checkpoint("LLM_RETRY", agent, {
"failed_model": model,
"error": _truncate(error, 300),
"retry_count": retry_count,
"next_model": next_model,
})
except Exception:
pass
def llm_error(self, agent: str, model: str, error: str) -> None:
"""LLM ๅคฑๆ•— checkpoint๏ผˆๅŒๆ™‚ๅฏซๅ…ฅ error log๏ผ‰"""
try:
self.checkpoint("LLM_ERROR", agent, {
"model": model,
"error": _truncate(error, 500),
})
# ๅŒๆญฅๅฏซๅ…ฅ error log
self._write_error_log(agent, model, error)
except Exception:
pass
def _write_error_log(self, agent: str, model: str, error: str) -> None:
"""ๅฐ‡ LLM ้Œฏ่ชคๅฏซๅ…ฅ็จ็ซ‹็š„ error log ๆช”ๆกˆ"""
try:
self._errors_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d")
error_file = self._errors_dir / f"errors_{ts}.log"
with open(error_file, "a", encoding="utf-8") as f:
now = datetime.now(timezone.utc).isoformat()
f.write(
f"[{now}] scan={self._scan_id} agent={agent} "
f"model={model} error={error[:300]}\n"
)
except Exception:
pass
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ๅทฅๅ…ท / Harness ๅฑคไพฟๆทๆ–นๆณ•
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def tool_call(
self,
agent: str,
tool_name: str,
tool_input: str,
tool_output: str,
status: str = "SUCCESS",
) -> None:
"""ๅทฅๅ…ทๅ‘ผๅซ checkpoint"""
try:
self.checkpoint("TOOL_CALL", agent, {
"tool_name": tool_name,
"input": _truncate(tool_input, 500),
"output_preview": _truncate(tool_output, 500),
"status": status,
})
except Exception:
pass
def harness_check(
self,
agent: str,
layer: str,
check_name: str,
result: str,
action: str = "",
details: dict | None = None,
) -> None:
"""Harness ไฟ้šœๅฑค่งธ็™ผ checkpoint"""
try:
data: dict[str, Any] = {
"layer": layer,
"check_name": check_name,
"result": result,
}
if action:
data["corrective_action"] = action
if details:
data.update(details)
self.checkpoint("HARNESS_CHECK", agent, data)
except Exception:
pass
def degradation(
self,
agent: str,
reason: str,
fallback_strategy: str = "",
) -> None:
"""้™็ดš่งธ็™ผ checkpoint"""
try:
self.checkpoint("DEGRADATION", agent, {
"reason": _truncate(reason, 300),
"fallback_strategy": fallback_strategy,
})
except Exception:
pass
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ตฑ่จˆ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def get_summary(self) -> dict:
"""ๅ›žๅ‚ณ็•ถๅ‰ๆŽƒๆ็š„็ตฑ่จˆๆ‘˜่ฆ"""
try:
elapsed = time.time() - self._scan_start_time if self._scan_start_time else 0
return {
"scan_id": self._scan_id,
"total_checkpoints": self._seq,
"event_counts": dict(self._event_counts),
"elapsed_seconds": round(elapsed, 2),
"enabled": ENABLED,
}
except Exception:
return {"error": "summary unavailable"}
# ======================================================================
# Global instance (singleton pattern)
# ======================================================================
# Module-level recorder; its logs are written to the "logs" directory that
# sits next to this file.
_project_root = Path(__file__).parent
recorder = CheckpointRecorder(logs_dir=_project_root / "logs")
def get_checkpoint_writer_status() -> dict[str, Any]:
    """Return the checkpoint writer backend status for UI diagnostics.

    Returns:
        A dict with:
        - ``available``: whether the Rust writer module was imported,
        - ``active``: whether the global ``recorder`` is currently using it,
        - ``preferred_backend`` / ``fallback_backend``: the fixed backend names,
        - ``current_backend``: ``"rust_bufwriter"`` when active, otherwise
          ``"python_lock"``.
    """
    # getattr with a default keeps this diagnostic safe even if the recorder
    # object lacks the attribute.
    rust_active = bool(getattr(recorder, "_rust_writer_active", False))
    return {
        "available": _RUST_WRITER_AVAILABLE,
        "active": rust_active,
        "preferred_backend": "rust_bufwriter",
        "current_backend": "rust_bufwriter" if rust_active else "python_lock",
        "fallback_backend": "python_lock",
    }