"""
checkpoint.py — Pipeline execution checkpoint recorder
======================================================

Design principles:
- Zero blocking: I/O is kept as light as possible (append-only JSONL)
- Zero failure: every recording error is swallowed silently and never
  affects the pipeline
- Structured: every record is a queryable JSON object
- Thread-safe:
    Phase 4A: prefers the Rust threathunter_checkpoint_writer
              (parking_lot::Mutex + BufWriter<File>; no contention under
              high-frequency SSE)
    Fallback: Python threading.Lock + TextIO (the original implementation)

Output format (JSONL):
    logs/checkpoints/scan_{id}_{timestamp}.jsonl

Event types:
    SCAN_START / SCAN_END
    STAGE_ENTER / STAGE_EXIT
    LLM_CALL / LLM_RESULT / LLM_RETRY / LLM_ERROR
    TOOL_CALL / HARNESS_CHECK / DEGRADATION

Complies with: AGENTS.md + project_CONSTITUTION.md
"""
|
|
| import hashlib |
| import json |
| import logging |
| import os |
| import re |
| import threading |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, TextIO |
|
|
logger = logging.getLogger("ThreatHunter.checkpoint")

# Phase 4A: prefer the optional Rust-backed writer extension. Only the import
# itself is guarded; when the module is missing, the pure-Python writer in
# CheckpointRecorder is used instead.
# NOTE(review): the two log messages below were mojibake in the reviewed copy
# and have been restored to the Traditional-Chinese text the bytes decode to;
# confirm against version control.
try:
    import threathunter_checkpoint_writer as _cw
except ImportError:
    _cw = None
    _RUST_WRITER_AVAILABLE = False
    logger.debug("[CHECKPOINT] Phase 4A: Rust BufWriter 不可用，使用 Python fallback")
else:
    _RUST_WRITER_AVAILABLE = True
    logger.info("[CHECKPOINT] Phase 4A: Rust BufWriter 啟用 ✓")
|
|
| |
# Master switch: recording is on unless the CHECKPOINT_ENABLED environment
# variable equals "false" (case-insensitive). Unset or any other value = on.
ENABLED = os.environ.get("CHECKPOINT_ENABLED", "true").lower() != "false"


# Case-insensitive patterns for secrets that _redact() masks: "sk-" /
# "sk-proj-" style keys, "ghp_" tokens, and api_key / password / secret
# assignments written with ":" or "=".
_SENSITIVE_PATTERNS = [
    re.compile(expression, re.IGNORECASE)
    for expression in (
        r"(sk(?:-proj)?-[a-zA-Z0-9\-_]{10,})",
        r"(ghp_[a-zA-Z0-9]{36,})",
        r"(api[_-]?key\s*[:=]\s*['\"]?)([^'\"\s,]{8,})",
        r"(password\s*[:=]\s*['\"]?)([^'\"\s,]{4,})",
        r"(secret\s*[:=]\s*['\"]?)([^'\"\s,]{8,})",
    )
]


# Default character limit applied by _truncate().
MAX_DATA_LENGTH = 2000
# Character limit for the LLM "thinking" preview stored by llm_result().
MAX_THINKING_LENGTH = 1000
|
|
|
|
def _redact(text: str) -> str:
    """Mask sensitive data (API keys, passwords, etc.) in *text*.

    Every match of a pattern in ``_SENSITIVE_PATTERNS`` is replaced by its
    first four characters followed by ``***REDACTED***``.

    Args:
        text: The text to scan. A non-string value is converted with
            ``str()`` and then scanned as well, so secrets inside e.g. an
            exception object are masked too (previously such values were
            returned unmasked).

    Returns:
        The text with every match masked.
    """
    result = text if isinstance(text, str) else str(text)
    for pattern in _SENSITIVE_PATTERNS:
        # Keep a 4-character prefix so the kind of secret stays recognizable.
        result = pattern.sub(lambda m: m.group(0)[:4] + "***REDACTED***", result)
    return result
|
|
|
|
def _truncate(value: Any, max_len: int = MAX_DATA_LENGTH) -> str:
    """Return *value* as text, cut to at most *max_len* characters.

    Non-string values are converted with ``str()``. When the text is longer
    than *max_len*, the first *max_len* characters are kept and a marker
    stating the full length is appended.
    """
    text = value if isinstance(value, str) else str(value)
    total = len(text)
    if total <= max_len:
        return text
    return text[:max_len] + f"...[truncated, total={total}]"
|
|
|
|
| def _safe_hash(text: str) -> str: |
| """่จ็ฎ่ผธๅ
ฅ็็ญ hash๏ผ็จๆผ่ฟฝ่นคๅไธ่ผธๅ
ฅ็ๅคๆฌกๅท่ก๏ผ""" |
| return hashlib.sha256(text.encode("utf-8", errors="replace")).hexdigest()[:12] |
|
|
|
|
| class CheckpointRecorder: |
| """ |
| Pipeline ๅท่กๆชขๆฅ้ป่จ้ๅจใ |
| |
| ๆฏๆฌกๆๆๅผๅซ start_scan() ๅๅงๅไธๅ JSONL ๆชๆก๏ผ |
| ๅพ็บๆๆ checkpoint() ๅผๅซ่ฟฝๅ ไธ่ก JSONใ |
| ๆๆ็ตๆๅผๅซ end_scan() ้้ๆชๆกใ |
| |
| ๆๆๅ
ฌ้ๆนๆณ้ฝๆฏใ้้ปๆจกๅผใ๏ผ |
| - ไปปไฝๅ
ง้จ้ฏ่ชค่ขซ try-except ๆๆ |
| - ๅ
่จ้ๅฐ logger.debug๏ผไธๆๅบไพๅค |
| - Pipeline ไธปๆต็จๅฎๅ
จไธๅๅฝฑ้ฟ |
| """ |
|
|
| def __init__(self, logs_dir: Path | str | None = None): |
| if logs_dir is None: |
| logs_dir = Path(__file__).parent / "logs" |
| self._logs_dir = Path(logs_dir) |
| self._checkpoints_dir = self._logs_dir / "checkpoints" |
| self._errors_dir = self._logs_dir / "errors" |
| self._scan_id: str = "unknown" |
| self._seq: int = 0 |
| self._file: TextIO | None = None |
| self._lock = threading.Lock() |
| self._event_counts: dict[str, int] = {} |
| self._scan_start_time: float = 0.0 |
| self._current_filename: str = "" |
| |
| self._rust_writer_active: bool = False |
|
|
| @property |
| def current_filename(self) -> str: |
| """ๅๅณๆฌๆฌกๆๆ็ JSONL ๆชๅ๏ผไพ server.py Thinking Path API ไฝฟ็จ๏ผ""" |
| return self._current_filename |
|
|
| |
| |
| |
|
|
| def start_scan(self, scan_id: str) -> None: |
| """ๅๅงๅๆฐๆๆ็ checkpoint ๆชๆก""" |
| if not ENABLED: |
| return |
| try: |
| self._checkpoints_dir.mkdir(parents=True, exist_ok=True) |
| self._errors_dir.mkdir(parents=True, exist_ok=True) |
|
|
| self._scan_id = scan_id |
| self._seq = 0 |
| self._event_counts = {} |
| self._scan_start_time = time.time() |
| self._rust_writer_active = False |
|
|
| ts = datetime.now().strftime("%Y%m%d_%H%M%S") |
| filename = f"scan_{scan_id[:8]}_{ts}.jsonl" |
| filepath = self._checkpoints_dir / filename |
| self._current_filename = filename |
|
|
| |
| if _RUST_WRITER_AVAILABLE: |
| try: |
| _cw.open_writer(str(filepath)) |
| self._rust_writer_active = True |
| logger.info( |
| "[CHECKPOINT] Phase 4A: Rust BufWriter ้ๅ: %s", filepath.name |
| ) |
| except Exception as rust_err: |
| logger.warning( |
| "[CHECKPOINT] Phase 4A: Rust BufWriter ้ๅๅคฑๆ๏ผๅ้ Python: %s", rust_err |
| ) |
| self._rust_writer_active = False |
|
|
| |
| if not self._rust_writer_active: |
| if self._file and not self._file.closed: |
| try: |
| self._file.close() |
| except Exception: |
| pass |
| self._file = open(filepath, "a", encoding="utf-8") |
|
|
| self.checkpoint("SCAN_START", "pipeline", { |
| "scan_id": scan_id, |
| "writer_backend": "rust_bufwriter" if self._rust_writer_active else "python_lock", |
| }) |
| logger.info("[CHECKPOINT] ๆๆ่จ้้ๅง: %s", filepath.name) |
|
|
| except Exception as e: |
| logger.debug("[CHECKPOINT] start_scan failed: %s", e) |
|
|
| def end_scan(self, final_status: str, total_duration: float) -> None: |
| """ๆๆ็ตๆ๏ผๅฏซๅ
ฅๆ่ฆไธฆ้้ๆชๆก""" |
| if not ENABLED: |
| return |
| try: |
| self.checkpoint("SCAN_END", "pipeline", { |
| "final_status": final_status, |
| "total_duration_seconds": round(total_duration, 2), |
| "total_checkpoints": self._seq, |
| "event_summary": dict(self._event_counts), |
| "writer_backend": "rust_bufwriter" if self._rust_writer_active else "python_lock", |
| }) |
| |
| if self._rust_writer_active and _RUST_WRITER_AVAILABLE: |
| try: |
| _cw.flush_writer() |
| _cw.close_writer() |
| logger.debug( |
| "[CHECKPOINT] Phase 4A: Rust BufWriter ๅทฒ้้๏ผๅ
ฑๅฏซๅ
ฅ %d ่ก", |
| _cw.get_lines_written(), |
| ) |
| except Exception as e: |
| logger.debug("[CHECKPOINT] Phase 4A: Rust close ๅคฑๆ: %s", e) |
| finally: |
| self._rust_writer_active = False |
| |
| if self._file and not self._file.closed: |
| self._file.close() |
| self._file = None |
| logger.info( |
| "[CHECKPOINT] ๆๆ่จ้็ตๆ: %d ๆข checkpoint | %.1fs", |
| self._seq, total_duration, |
| ) |
| except Exception as e: |
| logger.debug("[CHECKPOINT] end_scan failed: %s", e) |
|
|
| |
| |
| |
|
|
| def checkpoint(self, event: str, agent: str, data: dict | None = None) -> None: |
| """ |
| ๅฏซๅ
ฅไธๆข checkpoint ่จ้๏ผๅท่ก็ทๅฎๅ
จ๏ผใ |
| |
| Phase 4A๏ผๅชๅ
ไฝฟ็จ Rust BufWriter๏ผparking_lot::Mutex๏ผ็ก GIL ็ซถ็ญ๏ผใ |
| Fallback๏ผPython threading.Lock + TextIO๏ผๅๆๅฏฆไฝ๏ผๅฎๅ
จ็ญๆ๏ผใ |
| |
| Args: |
| event: ไบไปถ้กๅ๏ผๅฆ STAGE_ENTER, LLM_CALL ็ญ๏ผ |
| agent: Agent ๅ็จฑ๏ผๅฆ scout, security_guard๏ผ |
| data: ้ๅ ่ณๆๅญๅ
ธ |
| """ |
| if not ENABLED: |
| return |
| try: |
| with self._lock: |
| self._seq += 1 |
| self._event_counts[event] = self._event_counts.get(event, 0) + 1 |
|
|
| record = { |
| "seq": self._seq, |
| "ts": datetime.now(timezone.utc).isoformat(), |
| "scan_id": self._scan_id, |
| "event": event, |
| "agent": agent, |
| "data": self._sanitize_data(data or {}), |
| } |
|
|
| line = json.dumps(record, ensure_ascii=False, default=str) |
|
|
| |
| if self._rust_writer_active and _RUST_WRITER_AVAILABLE: |
| try: |
| _cw.write_line(line) |
| |
| if event in ( |
| "SCAN_START", |
| "SCAN_END", |
| "STAGE_ENTER", |
| "STAGE_EXIT", |
| "LLM_ERROR", |
| "DEGRADATION", |
| ): |
| _cw.flush_writer() |
| return |
| except Exception as rust_err: |
| |
| logger.warning( |
| "[CHECKPOINT] Phase 4A Rust write ๅคฑๆ๏ผๅๆ Python: %s", rust_err |
| ) |
| self._rust_writer_active = False |
|
|
| |
| if self._file and not self._file.closed: |
| self._file.write(line + "\n") |
| self._file.flush() |
|
|
| except Exception as e: |
| logger.debug("[CHECKPOINT] write failed: %s", e) |
|
|
| def _sanitize_data(self, data: dict) -> dict: |
| """ๆธ
ๆด่ณๆ๏ผๆชๆท + ้ฎ็ฝฉๆๆ่ณ่จ""" |
| sanitized = {} |
| for key, value in data.items(): |
| if isinstance(value, dict): |
| sanitized[key] = self._sanitize_data(value) |
| elif isinstance(value, (list, tuple)): |
| sanitized[key] = _truncate(str(value)) |
| elif isinstance(value, str): |
| sanitized[key] = _redact(_truncate(value)) |
| elif isinstance(value, (int, float, bool)): |
| sanitized[key] = value |
| else: |
| sanitized[key] = _truncate(str(value)) |
| return sanitized |
|
|
| |
| |
| |
|
|
| def stage_enter( |
| self, |
| agent: str, |
| input_data: Any = None, |
| skill_file: str = "", |
| input_type: str = "", |
| ) -> None: |
| """Stage ้ฒๅ
ฅ้ป checkpoint โ v3.7: ๅ ๅ
ฅ skill_file / input_type ๆฌไฝไพ Thinking Path UI ไฝฟ็จ""" |
| try: |
| data: dict[str, Any] = {} |
| |
| if skill_file: |
| data["skill_file"] = skill_file |
| if input_type: |
| data["input_type"] = input_type |
| |
| if isinstance(input_data, dict): |
| data["input_keys"] = list(input_data.keys())[:20] |
| if "tech_stack" in input_data: |
| data["tech_stack_preview"] = _truncate( |
| str(input_data["tech_stack"]), 200 |
| ) |
| if "vulnerabilities" in input_data: |
| data["vuln_count"] = len(input_data.get("vulnerabilities", [])) |
| data["input_hash"] = _safe_hash(json.dumps(input_data, default=str)) |
| elif isinstance(input_data, str): |
| data["input_preview"] = _truncate(input_data, 200) |
| data["input_hash"] = _safe_hash(input_data) |
| data["input_length"] = len(input_data) |
| self.checkpoint("STAGE_ENTER", agent, data) |
| except Exception: |
| pass |
|
|
| def stage_exit( |
| self, |
| agent: str, |
| status: str, |
| output_data: Any = None, |
| duration_ms: int = 0, |
| ) -> None: |
| """Stage ้ข้้ป checkpoint""" |
| try: |
| data: dict[str, Any] = { |
| "status": status, |
| "duration_ms": duration_ms, |
| } |
| if isinstance(output_data, dict): |
| data["output_keys"] = list(output_data.keys())[:20] |
| if "vulnerabilities" in output_data: |
| data["vuln_count"] = len(output_data.get("vulnerabilities", [])) |
| if "risk_score" in output_data: |
| data["risk_score"] = output_data["risk_score"] |
| if "verdict" in output_data: |
| data["verdict"] = output_data["verdict"] |
| if output_data.get("_degraded"): |
| data["degraded"] = True |
| if "scan_path" in output_data: |
| data["scan_path"] = output_data["scan_path"] |
| self.checkpoint("STAGE_EXIT", agent, data) |
|
|
| |
| |
| |
| if isinstance(output_data, dict) and output_data.get("_degraded"): |
| error_msg = str(output_data.get("_error", "Unknown degradation reason")) |
| strategy = f"status={status}, duration={duration_ms}ms" |
| self.checkpoint("DEGRADATION", agent, { |
| "reason": _truncate(error_msg, 400), |
| "fallback_strategy": strategy, |
| "error": _truncate(error_msg, 400), |
| "source": "stage_exit_auto", |
| }) |
| except Exception: |
| pass |
|
|
| |
| |
| |
|
|
| def llm_call( |
| self, |
| agent: str, |
| model: str, |
| provider: str = "openrouter", |
| task_preview: str = "", |
| ) -> None: |
| """LLM ๅผๅซๅ checkpoint""" |
| try: |
| self.checkpoint("LLM_CALL", agent, { |
| "model": model, |
| "provider": provider, |
| "task_preview": _truncate(task_preview, 300), |
| }) |
| except Exception: |
| pass |
|
|
| def llm_result( |
| self, |
| agent: str, |
| model: str, |
| status: str, |
| output_len: int, |
| duration_ms: int, |
| thinking: str = "", |
| ) -> None: |
| """LLM ๅผๅซๅพ checkpoint๏ผๅซๆ่้็จๆ่ฆ๏ผ""" |
| try: |
| data: dict[str, Any] = { |
| "model": model, |
| "status": status, |
| "output_length": output_len, |
| "duration_ms": duration_ms, |
| } |
| if thinking: |
| data["thinking_preview"] = _redact( |
| _truncate(thinking, MAX_THINKING_LENGTH) |
| ) |
| self.checkpoint("LLM_RESULT", agent, data) |
| except Exception: |
| pass |
|
|
| def llm_retry( |
| self, |
| agent: str, |
| model: str, |
| error: str, |
| retry_count: int, |
| next_model: str, |
| ) -> None: |
| """LLM ้่ฉฆ checkpoint""" |
| try: |
| self.checkpoint("LLM_RETRY", agent, { |
| "failed_model": model, |
| "error": _truncate(error, 300), |
| "retry_count": retry_count, |
| "next_model": next_model, |
| }) |
| except Exception: |
| pass |
|
|
| def llm_error(self, agent: str, model: str, error: str) -> None: |
| """LLM ๅคฑๆ checkpoint๏ผๅๆๅฏซๅ
ฅ error log๏ผ""" |
| try: |
| self.checkpoint("LLM_ERROR", agent, { |
| "model": model, |
| "error": _truncate(error, 500), |
| }) |
| |
| self._write_error_log(agent, model, error) |
| except Exception: |
| pass |
|
|
| def _write_error_log(self, agent: str, model: str, error: str) -> None: |
| """ๅฐ LLM ้ฏ่ชคๅฏซๅ
ฅ็จ็ซ็ error log ๆชๆก""" |
| try: |
| self._errors_dir.mkdir(parents=True, exist_ok=True) |
| ts = datetime.now().strftime("%Y%m%d") |
| error_file = self._errors_dir / f"errors_{ts}.log" |
| with open(error_file, "a", encoding="utf-8") as f: |
| now = datetime.now(timezone.utc).isoformat() |
| f.write( |
| f"[{now}] scan={self._scan_id} agent={agent} " |
| f"model={model} error={error[:300]}\n" |
| ) |
| except Exception: |
| pass |
|
|
| |
| |
| |
|
|
| def tool_call( |
| self, |
| agent: str, |
| tool_name: str, |
| tool_input: str, |
| tool_output: str, |
| status: str = "SUCCESS", |
| ) -> None: |
| """ๅทฅๅ
ทๅผๅซ checkpoint""" |
| try: |
| self.checkpoint("TOOL_CALL", agent, { |
| "tool_name": tool_name, |
| "input": _truncate(tool_input, 500), |
| "output_preview": _truncate(tool_output, 500), |
| "status": status, |
| }) |
| except Exception: |
| pass |
|
|
| def harness_check( |
| self, |
| agent: str, |
| layer: str, |
| check_name: str, |
| result: str, |
| action: str = "", |
| details: dict | None = None, |
| ) -> None: |
| """Harness ไฟ้ๅฑค่งธ็ผ checkpoint""" |
| try: |
| data: dict[str, Any] = { |
| "layer": layer, |
| "check_name": check_name, |
| "result": result, |
| } |
| if action: |
| data["corrective_action"] = action |
| if details: |
| data.update(details) |
| self.checkpoint("HARNESS_CHECK", agent, data) |
| except Exception: |
| pass |
|
|
| def degradation( |
| self, |
| agent: str, |
| reason: str, |
| fallback_strategy: str = "", |
| ) -> None: |
| """้็ด่งธ็ผ checkpoint""" |
| try: |
| self.checkpoint("DEGRADATION", agent, { |
| "reason": _truncate(reason, 300), |
| "fallback_strategy": fallback_strategy, |
| }) |
| except Exception: |
| pass |
|
|
| |
| |
| |
|
|
| def get_summary(self) -> dict: |
| """ๅๅณ็ถๅๆๆ็็ตฑ่จๆ่ฆ""" |
| try: |
| elapsed = time.time() - self._scan_start_time if self._scan_start_time else 0 |
| return { |
| "scan_id": self._scan_id, |
| "total_checkpoints": self._seq, |
| "event_counts": dict(self._event_counts), |
| "elapsed_seconds": round(elapsed, 2), |
| "enabled": ENABLED, |
| } |
| except Exception: |
| return {"error": "summary unavailable"} |
|
|
|
|
| |
| |
| |
|
|
# Module-level singleton shared by the whole pipeline; its logs live in the
# "logs" directory next to this module.
_project_root = Path(__file__).parents[0]
recorder = CheckpointRecorder(logs_dir=_project_root.joinpath("logs"))
|
|
|
|
def get_checkpoint_writer_status() -> dict[str, Any]:
    """Return the checkpoint writer backend status, for UI diagnostics.

    Returns:
        A dict with ``available`` (the Rust module was importable),
        ``active`` (the module-level recorder is currently using it), and
        the names of the preferred, current and fallback backends.
    """
    using_rust = bool(getattr(recorder, "_rust_writer_active", False))
    if using_rust:
        current = "rust_bufwriter"
    else:
        current = "python_lock"
    return {
        "available": _RUST_WRITER_AVAILABLE,
        "active": using_rust,
        "preferred_backend": "rust_bufwriter",
        "current_backend": current,
        "fallback_backend": "python_lock",
    }
|
|