"""
ui/server.py โ€” ThreatHunter FastAPI Backend
=============================================
ๆžถๆง‹๏ผšFastAPI + Server-Sent Events (SSE)
็ซฏ้ปž๏ผš
POST /api/scan ๆŽฅๆ”ถ tech_stack๏ผŒๅ•Ÿๅ‹• pipeline๏ผŒ่ฟ”ๅ›ž scan_id
GET /api/stream/{id} SSE ็ซฏ้ปž๏ผŒๅณๆ™‚ๆŽจ้€ agent ้€ฒๅบฆ
GET /api/result/{id} ่ฟ”ๅ›žๆœ€็ต‚ๅ ฑๅ‘Š JSON
GET /api/health ๅฅๅบทๆชขๆŸฅ
GET / ๅ›žๅ‚ณ้œๆ…‹ index.html
SSE ไบ‹ไปถ้กžๅž‹๏ผš
agent_start โ†’ { agent: str }
agent_log โ†’ { agent: str, message: str }
agent_done โ†’ { agent: str, status: str, detail: dict }
done โ†’ ๅฎŒๆ•ดๅ ฑๅ‘Š JSON
pipeline_error โ†’ { message: str }
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import queue
import sys
import threading
import uuid
import time
from pathlib import Path
from typing import Any
# ── Ensure the project root is on sys.path ──────────────────────
_HERE = Path(__file__).parent
_ROOT = _HERE.parent
sys.path.insert(0, str(_ROOT))
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
logger = logging.getLogger("ThreatHunter.server")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
# ══════════════════════════════════════════════════════════════
# Scan state management (in-memory; good enough for a demo)
# ══════════════════════════════════════════════════════════════
# scan_id -> { "queue": Queue, "result": dict|None, "error": str|None }
_scan_store: dict[str, dict[str, Any]] = {}
# ══════════════════════════════════════════════════════════════
# FastAPI App
# ══════════════════════════════════════════════════════════════
from contextlib import asynccontextmanager
@asynccontextmanager
async def _lifespan(application): # type: ignore[override]
"""Server ๅ•Ÿๅ‹•ๆ™‚๏ผš่‡ชๅ‹•ๆธ…ๆฝ” Memory ไธญ year < 2005 ็š„้ ๅค CVE ๆฑ™ๆŸ“ใ€‚"""
import sys as _sys
_sys.path.insert(0, str(_HERE.parent))
try:
from scripts.clean_memory_contamination import clean_memory_file
for _fname in ["memory/scout_memory.json", "memory/advisor_memory.json"]:
_path = str(_HERE.parent / _fname)
_r = clean_memory_file(_path)
if _r.get("status") == "CLEANED":
logger.info(
"[STARTUP] Memory cleaned: %s โ€” removed %d ancient CVEs, kept %d",
_fname, _r.get("removed", 0), _r.get("remaining", 0),
)
else:
logger.info("[STARTUP] Memory check: %s โ€” %s", _fname, _r.get("status", "OK"))
except Exception as _me:
logger.warning("[STARTUP] Memory cleanup skipped: %s", _me)
yield
app = FastAPI(
title="ThreatHunter API",
version="3.1",
description="AI ๅคš Agent ่ณ‡ๅฎ‰ๅจ่„…ๆƒ…ๅ ฑๅนณๅฐ",
lifespan=_lifespan,
)
# ── Mount static assets ─────────────────────────────────────────
_STATIC_DIR = _HERE / "static"
app.mount("/static", StaticFiles(directory=str(_STATIC_DIR)), name="static")
# ══════════════════════════════════════════════════════════════
# Request / Response models
# ══════════════════════════════════════════════════════════════
class ScanRequest(BaseModel):
tech_stack: str
input_type: str = "pkg" # v3.7: forwarded from frontend input-type detector
class ScanResponse(BaseModel):
scan_id: str
message: str = "Scan started"
_CANONICAL_INPUT_TYPES = {"pkg", "code", "config", "injection"}
_L0_INPUT_TYPE_MAP = {
"package_list": "pkg",
"source_code": "code",
"mixed": "code",
"config_file": "config",
"blocked": "injection",
}
def _canonical_scan_input_type(requested: str, text: str) -> str:
"""ไปฅๅพŒ็ซฏ L0 ๅตๆธฌ่ฃœๅผทๅ‰็ซฏ input_type๏ผŒ้ฟๅ… source code ่ขซ็•ถ package ๆŽƒๆใ€‚"""
req = (requested or "pkg").strip().lower()
if req not in _CANONICAL_INPUT_TYPES:
req = "pkg"
try:
from input_sanitizer import _infer_input_type
inferred = _L0_INPUT_TYPE_MAP.get(_infer_input_type(text), req)
except Exception:
inferred = req
if req == "pkg" and inferred in {"code", "config", "injection"}:
return inferred
return req
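# Illustrative behavior of the override above (the "code" outcome assumes the
# L0 detector classifies the text as source code; only a claimed "pkg" can be
# upgraded, explicit types always win):
#
#   _canonical_scan_input_type("pkg", "import os\nos.system(cmd)")  # -> "code"
#   _canonical_scan_input_type("config", "DEBUG=true")              # -> "config"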
# ══════════════════════════════════════════════════════════════
# Report assembly: enrich vulnerability detail from Scout memory
# ══════════════════════════════════════════════════════════════
def _summarize_vulnerabilities(vulns: list[dict[str, Any]]) -> dict[str, int]:
"""ๆŠŠๆผๆดžๆธ…ๅ–ฎๆ•ด็†ๆˆๅ‰็ซฏๆ‘˜่ฆๆŒ‡ๆจ™ใ€‚"""
summary = {
"total": 0,
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
"new": 0,
"new_since_last_scan": 0,
}
for vuln in vulns:
summary["total"] += 1
severity = str(vuln.get("severity", "LOW")).upper()
if severity == "CRITICAL":
summary["critical"] += 1
elif severity == "HIGH":
summary["high"] += 1
elif severity == "MEDIUM":
summary["medium"] += 1
else:
summary["low"] += 1
if vuln.get("is_new"):
summary["new"] += 1
summary["new_since_last_scan"] += 1
return summary
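# Illustrative input/output for the summarizer above: two HIGH findings, one
# of them flagged is_new, would yield
#   {"total": 2, "critical": 0, "high": 2, "medium": 0, "low": 0,
#    "new": 1, "new_since_last_scan": 1}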
def _is_code_finding_item(item: dict[str, Any]) -> bool:
"""ๅˆคๆ–ทไธ€็ญ† UI item ๆ˜ฏๅฆๅฑฌๆ–ผ source-code findingใ€‚"""
cve_id = str(item.get("cve_id") or item.get("cwe_id") or "").upper()
package = str(item.get("package") or "").lower()
finding_id = str(item.get("finding_id") or "").upper()
item_type = str(item.get("type") or "").lower()
return (
cve_id.startswith("CWE-")
or finding_id.startswith("CODE-")
or item_type in {"code_pattern", "hardcoded_secret"}
or bool(item.get("vulnerable_snippet") or item.get("fixed_snippet"))
or package == "code finding"
)
def _summarize_code_patterns(patterns: list[dict[str, Any]]) -> dict[str, int]:
"""ๆŠŠ Security Guard code patterns ๆ•ด็†ๆˆ Code Scan ๆ‘˜่ฆใ€‚"""
summary = {
"total": len(patterns),
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
"cwe_categories": 0,
"hardcoded_secrets": 0,
}
cwe_ids: set[str] = set()
for pattern in patterns:
severity = str(pattern.get("severity", "LOW")).upper()
if severity == "CRITICAL":
summary["critical"] += 1
elif severity == "HIGH":
summary["high"] += 1
elif severity == "MEDIUM":
summary["medium"] += 1
else:
summary["low"] += 1
cwe_id = str(pattern.get("cwe_id") or pattern.get("cve_id") or "").upper()
if cwe_id.startswith("CWE-"):
cwe_ids.add(cwe_id)
pattern_type = str(pattern.get("pattern_type") or "").upper()
if cwe_id == "CWE-798" or pattern_type == "HARDCODED_SECRET":
summary["hardcoded_secrets"] += 1
summary["cwe_categories"] = len(cwe_ids)
return summary
def _attach_split_summaries(result: dict[str, Any], vulns: list[dict[str, Any]]) -> None:
"""ๆไพ› UI package/code ๅ…ฉๅผตไธปๅกๆ‰€้œ€ๆ‘˜่ฆใ€‚"""
code_patterns = list(result.get("code_patterns_summary") or [])
package_vulns = [v for v in vulns if not _is_code_finding_item(v)]
result["package_vulnerability_summary"] = _summarize_vulnerabilities(package_vulns)
result["code_vulnerability_summary"] = _summarize_code_patterns(code_patterns)
def _extract_action_vulnerabilities(result: dict[str, Any]) -> list[dict[str, Any]]:
"""็•ถ pipeline ๆฒ’ๅธถๆผๆดžๆ˜Ž็ดฐๆ™‚๏ผŒๅพž Advisor actions ๅปบ็ซ‹ๆœ€ๅฐๅ‚™ๆด่ณ‡ๆ–™ใ€‚"""
vulns: list[dict[str, Any]] = []
seen: set[str] = set()
actions = result.get("actions", {})
for level in ["urgent", "important", "resolved"]:
for item in actions.get(level, []):
cve_id = item.get("cve_id", "")
if cve_id and cve_id not in seen:
seen.add(cve_id)
vulns.append({
"cve_id": cve_id,
"package": item.get("package") or "Package not provided",
"cvss_score": item.get("cvss_score", 0),
"severity": item.get("severity", "MEDIUM"),
"description": item.get("action", ""),
"is_new": item.get("is_new", False),
"source": "ADVISOR_ACTIONS",
"report_level": level.upper(),
})
return vulns
def _enrich_result(result: dict[str, Any]) -> dict[str, Any]:
"""
ๅ„ชๅ…ˆไฝฟ็”จๆœฌๆฌก scan ็š„ๆผๆดžๆ˜Ž็ดฐ๏ผŒๅชๆœ‰็ผบ่ณ‡ๆ–™ๆ™‚ๆ‰ๅ›ž้€€ๅˆฐ memory/actionsใ€‚
"""
if "vulnerability_detail" in result:
vulns = list(result.get("vulnerability_detail") or [])
result["vulnerability_summary"] = _summarize_vulnerabilities(vulns)
_attach_split_summaries(result, vulns)
sources = result.setdefault("report_sources", {})
sources.setdefault("vulnerability_detail", "pipeline_result")
result["vulnerability_detail"] = vulns
return result
scout_path = _ROOT / "memory" / "scout_memory.json"
vulns: list[dict[str, Any]] = []
action_vulns = _extract_action_vulnerabilities(result)
if action_vulns:
vulns = action_vulns
elif scout_path.exists():
try:
with open(scout_path, encoding="utf-8") as f:
raw = f.read().strip()
if raw:
scout_data = json.loads(raw)
                # scout_memory may itself be wrapped as {"data": {...}}
if isinstance(scout_data, dict):
inner = scout_data.get("data") or scout_data
if isinstance(inner, str):
inner = json.loads(inner)
vulns = inner.get("vulnerabilities", [])
except Exception as exc:
logger.warning("[ENRICH] Cannot read scout_memory: %s", exc)
if not vulns:
vulns = action_vulns
    # ── Last line of defense in the UI: CVE year filter (year < 2005 is never shown) ──
    # No matter which agent produced an old CVE, filter it out before it reaches the frontend
CVE_YEAR_MIN_UI = 2005
ancient_in_ui = []
fresh_ui_vulns = []
for v in vulns:
cve_id = v.get("cve_id", "")
if cve_id.startswith("GHSA-") or not cve_id.startswith("CVE-"):
fresh_ui_vulns.append(v)
continue
try:
yr = int(cve_id.split("-")[1])
if yr < CVE_YEAR_MIN_UI:
ancient_in_ui.append(cve_id)
logger.warning("[UI FILTER] Ancient CVE hidden from UI (year=%d): %s", yr, cve_id)
else:
fresh_ui_vulns.append(v)
except (IndexError, ValueError):
fresh_ui_vulns.append(v)
if ancient_in_ui:
logger.warning("[UI FILTER] Total ancient CVEs removed from UI: %d โ€” %s", len(ancient_in_ui), ancient_in_ui)
vulns = fresh_ui_vulns
    # ──────────────────────────────────────────────────────────────
result["vulnerability_detail"] = vulns
result["vulnerability_summary"] = _summarize_vulnerabilities(vulns)
_attach_split_summaries(result, vulns)
result["report_sources"] = {
"vulnerability_detail": "memory_or_actions_fallback",
"fallbacks": ["memory_or_actions"],
"layer1_state": "not_reported",
}
return result
# ══════════════════════════════════════════════════════════════
# Pipeline Worker (runs in a background thread)
# ══════════════════════════════════════════════════════════════
def _pipeline_worker(scan_id: str, tech_stack: str, input_type: str = "pkg") -> None:
"""
ๅœจ็จ็ซ‹ๅŸท่กŒ็ท’ไธญๅŸท่กŒๅฎŒๆ•ด Pipelineใ€‚
้€้Ž Queue ๆŽจ้€ SSE ไบ‹ไปถ็ตฆไธปๅŸท่กŒ็ท’ใ€‚
v3.7: ๆŽฅๅ— input_type ๅƒๆ•ธ๏ผŒๅ‚ณ็ตฆ run_pipeline ๅš Path-Aware Skills ่ทฏ็”ฑใ€‚
"""
store = _scan_store[scan_id]
q: queue.Queue = store["queue"]
def emit(event_type: str, data: dict) -> None:
q.put((event_type, data))
try:
from main import run_pipeline_with_callback
def on_progress(agent: str, status: str, detail: dict) -> None:
"""main.py ๅ‘ผๅซ็š„ callback๏ผŒ่ฝ‰ๆ›็‚บ SSE ไบ‹ไปถ"""
if status == "RUNNING":
emit("agent_start", {"agent": agent})
elif status == "COMPLETE":
agent_status = detail.get("status", "SUCCESS")
                # Detect the DEGRADED state: either _degraded=True or status == "DEGRADED" triggers it
is_degraded = (
detail.get("_degraded", False)
or str(agent_status).upper() == "DEGRADED"
)
if is_degraded:
agent_status = "DEGRADED"
# ๆๅ–้Œฏ่ชคๅŽŸๅ› ๏ผˆไพ›ๅ‰็ซฏ้กฏ็คบ๏ผ‰
error_msg = detail.get("_error", "")
if error_msg:
                    # Truncate to 200 chars to keep the SSE JSON payload compact
error_msg = str(error_msg)[:200]
emit("agent_done", {
"agent": agent,
"status": agent_status,
"detail": detail,
"error_msg": error_msg,
})
elif status == "LOG":
                # Some stages emit intermediate logs
emit("agent_log", {"agent": agent, "message": str(detail)})
logger.info("[SCAN:%s] Pipeline start | tech_stack=%s | input_type=%s", scan_id, tech_stack, input_type)
result = run_pipeline_with_callback(tech_stack, on_progress, input_type=input_type)
        # ── Assemble the full report: read vulnerability data from scout_memory.json ──
result = _enrich_result(result)
store["result"] = result
emit("done", result)
logger.info("[SCAN:%s] Pipeline DONE | risk=%s", scan_id, result.get("risk_score"))
except Exception as exc:
err_msg = str(exc)
logger.error("[SCAN:%s] Pipeline ERROR: %s", scan_id, err_msg)
store["error"] = err_msg
emit("pipeline_error", {"message": err_msg})
finally:
        # v3.6: store the checkpoint filename for Thinking Path API lookups
try:
from checkpoint import recorder
if recorder.current_filename:
store["checkpoint_file"] = recorder.current_filename
logger.info("[SCAN:%s] Checkpoint file: %s", scan_id, recorder.current_filename)
except Exception as ex:
logger.debug("[SCAN:%s] Cannot retrieve checkpoint filename: %s", scan_id, ex)
# ══════════════════════════════════════════════════════════════
# SSE Generator
# ══════════════════════════════════════════════════════════════
async def _sse_generator(scan_id: str):
"""
็•ฐๆญฅ SSE ็”Ÿๆˆๅ™จ๏ผš
- ๅพž Queue ๆ‹‰ๅ–ไบ‹ไปถ๏ผˆ็”ฑ pipeline worker ๆŽจๅ…ฅ๏ผ‰
- ๆ ผๅผๅŒ–็‚บ SSE ่ฆๆ ผ๏ผˆevent: xxx\\ndata: ...\\n\\n๏ผ‰
- ็›ดๅˆฐ done / pipeline_error ๆˆ– timeout (15min)
"""
if scan_id not in _scan_store:
yield _sse_fmt("pipeline_error", {"message": f"scan_id {scan_id} not found"})
return
store = _scan_store[scan_id]
q: queue.Queue = store["queue"]
    # Emit an initial event so the client knows the connection succeeded
yield _sse_fmt("connected", {"scan_id": scan_id})
deadline = time.time() + 900 # 15 min max
terminal_events = {"done", "pipeline_error"}
while time.time() < deadline:
try:
event_type, data = q.get(timeout=0.3)
except queue.Empty:
            # Heartbeat to keep the connection alive
yield ": ping\n\n"
await asyncio.sleep(0)
continue
yield _sse_fmt(event_type, data)
        # Terminal condition
if event_type in terminal_events:
break
await asyncio.sleep(0) # yield to event loop
    # Cleanup (optional: delay by 5 minutes so /api/result can still fetch the report)
    asyncio.get_running_loop().call_later(300, lambda: _scan_store.pop(scan_id, None))
def _sse_fmt(event: str, data: Any) -> str:
"""ๆ ผๅผๅŒ–็‚บๆจ™ๆบ– SSE ๅญ—ไธฒ"""
payload = json.dumps(data, ensure_ascii=False)
return f"event: {event}\ndata: {payload}\n\n"
def _bool_env(name: str, default: str = "false") -> bool:
return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"}
def _build_runtime_capabilities() -> dict[str, Any]:
"""ๅฝ™ๆ•ด Rust / Sandbox ๅฏ็”จ็‹€ๆ…‹๏ผŒ่ฎ“ไธป dashboard ๅฏไปฅ็›ดๆŽฅ้กฏ็คบใ€‚"""
try:
from checkpoint import get_checkpoint_writer_status
checkpoint_writer = get_checkpoint_writer_status()
except Exception as exc: # noqa: BLE001
checkpoint_writer = {
"available": False,
"active": False,
"preferred_backend": "rust_bufwriter",
"current_backend": "python_lock",
"fallback_backend": "python_lock",
"error": str(exc),
}
try:
import input_sanitizer as _input_sanitizer
wasm_enabled = bool(getattr(_input_sanitizer, "_WASM_ENABLED", False))
wasm_available = bool(getattr(_input_sanitizer, "_WASM_AVAILABLE", False))
wasm_error = ""
except Exception as exc: # noqa: BLE001
wasm_enabled = _bool_env("WASM_SANDBOX_ENABLED", "true")
wasm_available = False
wasm_error = str(exc)
docker_enabled = _bool_env("SANDBOX_ENABLED", "true")
docker_available = False
docker_image_ready = False
docker_error = ""
try:
from sandbox.docker_sandbox import SANDBOX_IMAGE, is_docker_available, is_sandbox_image_ready
docker_available = is_docker_available()
docker_image_ready = is_sandbox_image_ready() if docker_available else False
except Exception as exc: # noqa: BLE001
SANDBOX_IMAGE = os.getenv("SANDBOX_IMAGE", "threathunter-sandbox:latest")
docker_error = str(exc)
docker_status = "disabled"
if docker_enabled and docker_available and docker_image_ready:
docker_status = "enabled"
elif docker_enabled:
docker_status = "not_ready"
modules = {}
for key, module_name in {
"memory_sanitizer": "sandbox.memory_sanitizer",
"ast_guard": "sandbox.ast_guard",
}.items():
try:
__import__(module_name)
modules[key] = {"active": True, "module": module_name}
except Exception as exc: # noqa: BLE001
modules[key] = {"active": False, "module": module_name, "error": str(exc)}
return {
"status": "ok",
"defaults": {
"sandbox_enabled": docker_enabled,
"wasm_sandbox_enabled": wasm_enabled,
},
"checkpoint_writer": checkpoint_writer,
"wasm_prompt_sandbox": {
"enabled": wasm_enabled,
"available": wasm_available,
"status": "enabled" if wasm_enabled and wasm_available else ("fallback" if wasm_enabled else "disabled"),
"fallback": "python_l0_filter",
"error": wasm_error,
},
"docker_sandbox": {
"enabled": docker_enabled,
"available": docker_available,
"image_ready": docker_image_ready,
"image": SANDBOX_IMAGE,
"status": docker_status,
"fallback": "in_process_pipeline",
"error": docker_error,
},
"memory_sanitizer": modules["memory_sanitizer"],
"ast_guard": modules["ast_guard"],
}
# ══════════════════════════════════════════════════════════════
# API Endpoints
# ══════════════════════════════════════════════════════════════
@app.get("/", response_class=HTMLResponse)
async def serve_index():
"""ๅ›žๅ‚ณไธป้  HTML"""
index_path = _STATIC_DIR / "index.html"
if not index_path.exists():
raise HTTPException(status_code=404, detail="index.html not found")
return HTMLResponse(content=index_path.read_text(encoding="utf-8"))
@app.get("/api/health")
async def health():
"""ๅฅๅบทๆชขๆŸฅ็ซฏ้ปž"""
return JSONResponse({
"status": "ok",
"pipeline_version": "4.0",
"active_scans": len(_scan_store),
})
@app.get("/api/system-info")
async def system_info():
"""็ณป็ตฑ่ณ‡่จŠ็ซฏ้ปž๏ผšๅ›žๅ‚ณ็•ถๅ‰ GPUใ€LLM ๆจกๅž‹ใ€้™็ดš็‹€ๆ…‹ไพ› UI ้กฏ็คบใ€‚"""
import config as _cfg
    # Determine the GPU type
provider = _cfg.LLM_PROVIDER
if provider in {"amd", "vllm"}:
gpu_label = "AMD Instinct (ROCm)"
gpu_status = "connected" if _cfg._is_configured(_cfg.AMD_LLM_BASE_URL) else "not_configured"
elif provider == "openrouter":
gpu_label = "OpenRouter Cloud"
gpu_status = "connected" if _cfg._is_configured(_cfg.OPENROUTER_API_KEY) else "not_configured"
else:
gpu_label = f"{provider.upper()} API"
gpu_status = "connected" if _cfg._direct_provider_is_configured(provider) else "not_configured"
# ๅ–ๅพ—็•ถๅ‰ๅฏฆ้š›ไฝฟ็”จ็š„ๆจกๅž‹๏ผˆๅ˜—่ฉฆๅพž provider chain ๅ–็ฌฌไธ€ๅ€‹ๅฏ็”จ็š„๏ผ‰
chain = _cfg._build_provider_chain()
active_model = "No model available"
active_provider_label = "None"
if chain:
active_provider_label, active_config = chain[0]
active_model = active_config.get("model", "unknown")
        # Strip the LiteLLM routing prefix (e.g. openai/) so the UI shows a clean model name
if active_model.startswith("openai/"):
active_model = active_model[len("openai/"):]
    # Degradation state
deg = _cfg.degradation_status.to_dict()
    # Model statistics
stats_report = _cfg.model_stats.get_report()
return JSONResponse({
"provider": provider,
"gpu_label": gpu_label,
"gpu_status": gpu_status,
"active_model": active_model,
"active_provider_label": active_provider_label,
"base_url": _cfg.AMD_LLM_BASE_URL or _cfg.VLLM_BASE_URL or "",
"max_tokens": _cfg.AMD_LLM_MAX_TOKENS,
"degradation": deg,
"waterfall_depth": len(chain),
"model_stats": stats_report,
})
@app.get("/api/runtime-capabilities")
async def runtime_capabilities():
"""ไธป dashboard Runtime Protection panel ไฝฟ็”จ็š„่ƒฝๅŠ›็‹€ๆ…‹ใ€‚"""
return JSONResponse(_build_runtime_capabilities())
# ══════════════════════════════════════════════════════════════
# Checkpoint Dashboard API
# ══════════════════════════════════════════════════════════════
@app.get("/checkpoints", response_class=HTMLResponse)
async def serve_checkpoint_dashboard():
"""ๅ›žๅ‚ณ Checkpoint Dashboard ้ ้ข"""
cp_path = _STATIC_DIR / "checkpoint.html"
if not cp_path.exists():
raise HTTPException(status_code=404, detail="checkpoint.html not found")
return HTMLResponse(content=cp_path.read_text(encoding="utf-8"))
@app.get("/api/checkpoints")
async def list_checkpoint_files():
"""ๅˆ—ๅ‡บๆ‰€ๆœ‰ checkpoint JSONL ๆช”ๆกˆ๏ผˆๅซๅคงๅฐใ€ไฟฎๆ”นๆ™‚้–“ใ€ๆ่ฟฐๆ€งๆจ™็ฑค๏ผ‰"""
cp_dir = _ROOT / "logs" / "checkpoints"
files = []
if cp_dir.exists():
for f in sorted(cp_dir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True):
stat = f.stat()
            # Parse the first few lines to extract a descriptive label
label = _extract_scan_label(f)
files.append({
"name": f.name,
"size": stat.st_size,
"modified": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(stat.st_mtime)),
"label": label,
})
return JSONResponse({"files": files, "total": len(files)})
@app.get("/api/checkpoints/latest")
async def get_latest_checkpoint():
"""ๅ›žๅ‚ณๆœ€ๆ–ฐไธ€ๅ€‹ checkpoint JSONL ๆช”ๆกˆ็š„่ณ‡่จŠ๏ผˆไพ›ๅ‰็ซฏ่‡ชๅ‹•่ทณ่ฝ‰๏ผ‰"""
cp_dir = _ROOT / "logs" / "checkpoints"
if not cp_dir.exists():
return JSONResponse({"latest": None})
files = sorted(cp_dir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True)
if not files:
return JSONResponse({"latest": None})
f = files[0]
label = _extract_scan_label(f)
stat = f.stat()
return JSONResponse({
"latest": {
"name": f.name,
"size": stat.st_size,
"modified": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(stat.st_mtime)),
"label": label,
}
})
# ══════════════════════════════════════════════════════════════
# Thinking Path API (v3.6 Observability)
# ══════════════════════════════════════════════════════════════
# Role descriptions for each agent
_AGENT_META: dict[str, dict] = {
    "pipeline": {"role": "Pipeline management", "skill": None},
    "input_sanitizer": {"role": "L0 input sanitization", "skill": "security_guard.md"},
    "orchestrator": {"role": "Dynamic routing decisions", "skill": "orchestrator.md"},
    "security_guard": {"role": "Code weakness discovery", "skill": "security_guard.md"},
    "scout": {"role": "Package CVE discovery", "skill": "scout.md"},
    "layer1_parallel": {"role": "Discovery parallel layer", "skill": None},
    "intel_fusion": {"role": "Risk fusion and priority", "skill": "intel_fusion.md"},
    "analyst": {"role": "Vulnerability chain analysis", "skill": "analyst.md"},
    "critic": {"role": "ColMAD debate", "skill": "critic.md"},
    "advisor": {"role": "Action report generation", "skill": "advisor.md"},
    "feedback_loop": {"role": "Feedback loop", "skill": None},
}
def _build_thinking_path(cp_file: Path) -> dict:
"""
่ฎ€ๅ– JSONL checkpoint ๆช”ๆกˆ๏ผŒๅฐ‡ไบ‹ไปถไพ Agent ๅˆ†็ต„ใ€‚
ๅฐๆฏๅ€‹ Agent ่จˆ็ฎ—๏ผš
- skill_applied: ๆ˜ฏๅฆๆœ‰ LLM_RESULT ไธ” status=SUCCESS
- ๆ‰€ๆœ‰ๆ–นๅผไบ‹ไปถ๏ผˆLLM_CALL/LLM_RESULT/TOOL_CALL/STAGE_ENTER/STAGE_EXIT/HARNESS_CHECK/DEGRADATION๏ผ‰
"""
    # Event types shown to the user (irrelevant ones are excluded)
DISPLAY_EVENTS = {
"LLM_CALL", "LLM_RESULT", "LLM_RETRY", "LLM_ERROR",
"TOOL_CALL", "STAGE_ENTER", "STAGE_EXIT",
"HARNESS_CHECK", "DEGRADATION",
}
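    # An illustrative checkpoint line (field names match the parsing below;
    # the values are made up):
    #   {"seq": 12, "event": "LLM_RESULT", "agent": "scout",
    #    "ts": "2025-01-01T00:00:00", "data": {"status": "SUCCESS", "duration_ms": 1840}}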
agents: dict[str, dict] = {}
scan_meta: dict = {}
try:
with open(cp_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
evt = json.loads(line)
except json.JSONDecodeError:
continue
event_type = evt.get("event", "")
agent = evt.get("agent", "pipeline")
ts = evt.get("ts", "")
data = evt.get("data", {})
seq = evt.get("seq", 0)
# ๆๅ–ๆŽƒๆๅ…ƒ่ณ‡ๆ–™
if event_type == "SCAN_START":
scan_meta["scan_id"] = data.get("scan_id", "")
scan_meta["start_ts"] = ts
elif event_type == "SCAN_END":
scan_meta["end_ts"] = ts
scan_meta["duration_seconds"] = data.get("total_duration_seconds", 0)
scan_meta["total_events"] = data.get("total_checkpoints", seq)
scan_meta["event_summary"] = data.get("event_summary", {})
                # Attach display events to the corresponding agent
if event_type in DISPLAY_EVENTS:
if agent not in agents:
meta = _AGENT_META.get(agent, {"role": agent, "skill": None})
agents[agent] = {
"role": meta["role"],
"skill_name": meta["skill"],
"skill_file": None, # v3.7: raw .md filename from checkpoint
"input_type": None, # v3.7: pkg/code/injection/config
"skill_applied": False,
"llm_calls": 0,
"tool_calls": 0,
"total_duration_ms": 0,
"steps": [],
"agent_record": {
"input": None,
"output": None,
"tool_calls": [],
"llm_calls": [],
"status": "RUNNING",
"duration_ms": 0,
"skill_file": None,
"input_type": None,
"degraded": False,
"degradation_reason": "",
},
}
step = {"seq": seq, "event": event_type, "ts": ts, "data": data}
agents[agent]["steps"].append(step)
record = agents[agent]["agent_record"]
if event_type in {"LLM_CALL", "LLM_RESULT", "LLM_RETRY", "LLM_ERROR"}:
record["llm_calls"].append({"seq": seq, "event": event_type, "ts": ts, "data": data})
elif event_type == "TOOL_CALL":
record["tool_calls"].append({"seq": seq, "event": event_type, "ts": ts, "data": data})
# v3.7: extract skill_file + input_type from STAGE_ENTER
if event_type == "STAGE_ENTER":
sf = data.get("skill_file", "")
if sf:
agents[agent]["skill_file"] = sf # NEW: raw filename for badge
agents[agent]["skill_name"] = sf # legacy compat
agents[agent]["skill_applied"] = True
agents[agent]["agent_record"]["skill_file"] = sf
it = data.get("input_type", "")
if it:
agents[agent]["input_type"] = it
agents[agent]["agent_record"]["input_type"] = it
agents[agent]["agent_record"]["input"] = data
                # Aggregate per-event statistics
if event_type == "LLM_CALL":
agents[agent]["llm_calls"] += 1
elif event_type == "LLM_RESULT":
if data.get("status") == "SUCCESS":
agents[agent]["skill_applied"] = True
agents[agent]["total_duration_ms"] += data.get("duration_ms", 0)
elif event_type == "TOOL_CALL":
agents[agent]["tool_calls"] += 1
elif event_type == "STAGE_EXIT":
record = agents[agent]["agent_record"]
record["output"] = data
record["status"] = data.get("status", record["status"])
record["duration_ms"] = data.get("duration_ms", record["duration_ms"])
if record["status"] not in {"SUCCESS", "COMPLETE", "COMPLETED"}:
record["degraded"] = True
record["degradation_reason"] = data.get("error") or data.get("reason") or record["status"]
elif event_type == "LLM_ERROR":
record = agents[agent]["agent_record"]
record["degraded"] = True
record["degradation_reason"] = data.get("error", "LLM error")
elif event_type == "DEGRADATION":
# v3.7: DEGRADATION means skill was NOT properly applied
agents[agent]["skill_applied"] = False
record = agents[agent]["agent_record"]
record["degraded"] = True
record["degradation_reason"] = data.get("reason") or data.get("error") or "Degraded"
except Exception as e:
logger.warning("[THINKING] ่ฎ€ๅ– checkpoint ๅคฑๆ•—: %s", e)
    # Order the agents by the main pipeline sequence
order = ["input_sanitizer", "orchestrator", "security_guard", "scout",
"layer1_parallel", "intel_fusion", "analyst", "critic", "advisor", "feedback_loop"]
ordered_agents = {}
for a in order:
if a in agents:
ordered_agents[a] = agents[a]
    # Append any agents not covered by the expected order
for a, v in agents.items():
if a not in ordered_agents:
ordered_agents[a] = v
return {"scan_meta": scan_meta, "agents": ordered_agents}
@app.get("/api/thinking/{scan_id}")
async def get_thinking_path(scan_id: str):
"""
v3.6 Thinking Path API
ๅ›žๅ‚ณๆŒ‡ๅฎšๆŽƒๆ็š„ๅฎŒๆ•ดๆ€่€ƒ่ปŒ่ทก๏ผš
- ไพ Agent ๅˆ†็ต„็š„ LLM ๅ‘ผๅซ / Tool ๅ‘ผๅญ / Stage ๅฑ•ๅ…ไบ‹ไปถ
- ๆฏๅ€‹ Agent ็š„ skill_applied ็‹€ๆ…‹
่ณ‡ๆ–™ไพ†ๆบ๏ผš_scan_store[scan_id]["checkpoint_file"] ่จ˜้Œ„็š„ JSONL
Graceful Degradation๏ผšๅฐ‹ๆ‰พๆœ€ๆ–ฐ็š„ checkpoint ๆช”ไธฆๅ›žๅ‚ณ
"""
cp_dir = _ROOT / "logs" / "checkpoints"
cp_file: Path | None = None
    # Prefer the filename recorded in _scan_store
store = _scan_store.get(scan_id)
if store and store.get("checkpoint_file"):
candidate = cp_dir / store["checkpoint_file"]
if candidate.exists():
cp_file = candidate
    # Fallback: fuzzy-match on scan_id
if cp_file is None and cp_dir.exists():
        # scan_id format: pipe_{timestamp_int}; filename format: scan_pipe_{8chars}_{timestamp}.jsonl
short_id = scan_id[:8] if len(scan_id) >= 8 else scan_id
candidates = sorted(
[f for f in cp_dir.glob(f"scan_{short_id}*.jsonl")],
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if candidates:
cp_file = candidates[0]
    # Final fallback: the newest JSONL
if cp_file is None and cp_dir.exists():
all_files = sorted(cp_dir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True)
if all_files:
cp_file = all_files[0]
logger.warning("[THINKING] scan_id=%s ๆ‰พไธๅˆฐๅฐๆ‡‰ checkpoint๏ผŒไฝฟ็”จๆœ€ๆ–ฐ: %s", scan_id, cp_file.name)
if cp_file is None:
        raise HTTPException(status_code=404, detail="No checkpoint files yet")
thinking_data = _build_thinking_path(cp_file)
thinking_data["scan_id"] = scan_id
thinking_data["checkpoint_file"] = cp_file.name
return JSONResponse(thinking_data)
def _extract_scan_label(filepath: Path) -> str:
"""
ๅพž JSONL ๆช”ๆกˆๅ‰ 10 ่กŒๆๅ–ๆ่ฟฐๆ€งๆŽƒๆๆจ™็ฑคใ€‚
ๅฐ‹ๆ‰พ้ †ๅบ๏ผš
1. STAGE_ENTER(orchestrator) ็š„ tech_stack_preview โ†’ ๅ–ๅ‰ 60 ๅญ—ๅ…ƒ
2. SCAN_END ็š„ final_status + duration
3. SCAN_START ็š„ scan_id
ๅ›žๅ‚ณๆ ผๅผ็คบไพ‹๏ผšใ€ŒFlask CRUD + sqlite3 | Path B | 2.2mใ€
"""
try:
target_preview = ""
scan_path = ""
duration = ""
event_count = 0
with open(filepath, "r", encoding="utf-8") as fh:
for i, line in enumerate(fh):
if i > 30:
                    break  # only inspect the first 30 lines
line = line.strip()
if not line:
continue
try:
evt = json.loads(line)
except json.JSONDecodeError:
continue
event_count += 1
event_type = evt.get("event", "")
data = evt.get("data", {})
# ๆๅ–ๆŽƒๆ็›ฎๆจ™ๆ่ฟฐ
if event_type == "STAGE_ENTER" and evt.get("agent") == "orchestrator":
raw = data.get("tech_stack_preview", "")
if raw:
# ๅŒๆ™‚่™•็† real newline (\n) ๅ’Œ escaped \\n
lines = raw.replace("\\n", "\n").split("\n")
# ๅ–็ฌฌไธ€่กŒๆœ‰ๆ„็พฉ็š„ๅ…งๅฎน๏ผˆ่ทณ้Ž่จป่งฃ่กŒๅ’Œ็ฉบ่กŒ๏ผ‰
for text_line in lines:
text_line = text_line.strip()
if text_line and not text_line.startswith("#") and not text_line.startswith("//"):
target_preview = text_line[:60]
break
if not target_preview:
                            # If every line is a comment, take the first one and strip the comment prefix
first = lines[0].strip() if lines else ""
first = first.lstrip("#/ ").strip()
target_preview = first[:60] if first else ""
# ๆๅ–ๆŽƒๆ่ทฏๅพ‘
if event_type == "STAGE_EXIT" and evt.get("agent") == "orchestrator":
scan_path = data.get("scan_path", "")
# ๆๅ–ๆŒ็บŒๆ™‚้–“
if event_type == "SCAN_END":
dur_s = data.get("total_duration_seconds", 0)
if dur_s:
duration = f"{dur_s / 60:.1f}m" if dur_s >= 60 else f"{dur_s:.0f}s"
event_count = data.get("total_checkpoints", event_count)
        # Assemble the label (make sure it contains no newlines)
parts = []
if target_preview:
            # Strip newlines and extra whitespace
clean = target_preview.replace("\n", " ").replace("\r", "").strip()
if clean:
parts.append(clean)
if scan_path:
parts.append(f"Path {scan_path}")
if duration:
parts.append(duration)
if event_count:
parts.append(f"{event_count} events")
return " | ".join(parts) if parts else filepath.stem
except Exception:
return filepath.stem
@app.get("/api/checkpoints/{filename}")
async def get_checkpoint_events(filename: str):
"""่ฎ€ๅ–ๆŒ‡ๅฎš JSONL ๆช”ๆกˆ็š„ๅ…จ้ƒจไบ‹ไปถ๏ผˆไพ› Dashboard ๆธฒๆŸ“๏ผ‰"""
    # Security: only allow reading .jsonl files under the checkpoints directory
if not filename.endswith(".jsonl") or "/" in filename or "\\" in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
cp_file = _ROOT / "logs" / "checkpoints" / filename
if not cp_file.exists():
raise HTTPException(status_code=404, detail=f"File not found: {filename}")
events = []
try:
with open(cp_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
events.append(json.loads(line))
except json.JSONDecodeError:
                        pass  # ignore corrupted lines
except Exception as e:
raise HTTPException(status_code=500, detail=f"Read error: {e}")
return JSONResponse({"filename": filename, "events": events, "total": len(events)})
@app.post("/api/scan", response_model=ScanResponse)
async def start_scan(req: ScanRequest):
"""่งธ็™ผๆŽƒๆ๏ผšๅปบ็ซ‹ scan_id๏ผŒๅ•Ÿๅ‹•่ƒŒๆ™ฏๅŸท่กŒ็ท’"""
tech_stack = req.tech_stack.strip()
if not tech_stack:
raise HTTPException(status_code=422, detail="tech_stack cannot be empty")
input_type = _canonical_scan_input_type(req.input_type, tech_stack)
scan_id = str(uuid.uuid4())[:8]
    # Initialize the store
_scan_store[scan_id] = {
"queue": queue.Queue(),
"result": None,
"error": None,
"tech_stack": tech_stack,
"input_type": input_type,
}
    # Start the background thread
t = threading.Thread(
target=_pipeline_worker,
args=(scan_id, tech_stack, input_type),
daemon=True,
name=f"pipeline-{scan_id}",
)
t.start()
logger.info("[API] Scan started | scan_id=%s | input_type=%s | tech_stack=%s", scan_id, input_type, tech_stack)
return ScanResponse(scan_id=scan_id)
@app.get("/api/stream/{scan_id}")
async def stream_scan(scan_id: str):
"""SSE ไธฒๆต็ซฏ้ปž๏ผšๅณๆ™‚ๆŽจ้€ pipeline ้€ฒๅบฆ"""
if scan_id not in _scan_store:
raise HTTPException(status_code=404, detail=f"scan_id '{scan_id}' not found")
return StreamingResponse(
_sse_generator(scan_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Access-Control-Allow-Origin": "*",
"Connection": "keep-alive",
},
)
@app.get("/api/result/{scan_id}")
async def get_result(scan_id: str):
"""ๅ–ๅพ—ๆœ€็ต‚ๆŽƒๆ็ตๆžœ JSON"""
store = _scan_store.get(scan_id)
if not store:
raise HTTPException(status_code=404, detail=f"scan_id '{scan_id}' not found")
if store.get("error"):
raise HTTPException(status_code=500, detail=store["error"])
if store.get("result") is None:
raise HTTPException(status_code=202, detail="Scan still in progress")
return JSONResponse(store["result"])
# ══════════════════════════════════════════════════════════════
# Phase 4D: Skill hot-reload management API
# ══════════════════════════════════════════════════════════════
# Lazy-import SkillLoader (so a failure at import time cannot take down the whole server)
def _get_skill_loader():
"""ๅฎ‰ๅ…จๅ–ๅพ— SkillLoader ๅ–ฎไพ‹๏ผŒ่‹ฅไธๅฏ็”จๅ›žๅ‚ณ None"""
try:
from skills.skill_loader import skill_loader
return skill_loader
except Exception as exc:
logger.warning("[Skills API] SkillLoader ไธๅฏ็”จ: %s", exc)
return None
@app.get("/api/skills")
async def list_skills():
"""
ๅˆ—ๅ‡บๆ‰€ๆœ‰ Skills ๅŠๅ…ถ็‰ˆๆœฌ่ณ‡่จŠ๏ผˆmtimeใ€ๅฟซๅ–็‹€ๆ…‹๏ผ‰ใ€‚
ๅ›žๅ‚ณๆ ผๅผ๏ผš
{ "skills": [{ "name": str, "mtime": float, "cached": bool, "size": int }],
"total": int, "skill_loader": "available"|"unavailable" }
"""
loader = _get_skill_loader()
if loader is None:
return JSONResponse({
"skills": [],
"total": 0,
"skill_loader": "unavailable",
})
try:
registry_data = loader.get_registry()
skills_dir = _ROOT / "skills"
skills_list = []
for entry in registry_data.get("skills", []):
name = entry.get("filename", "")
skill_path = skills_dir / name
skills_list.append({
"name": name,
"mtime": entry.get("mtime", 0),
"cached": not entry.get("is_fallback", False),
"size": skill_path.stat().st_size if skill_path.exists() else 0,
"modified": time.strftime(
"%Y-%m-%dT%H:%M:%S",
time.localtime(entry.get("mtime", 0))
) if entry.get("mtime", 0) > 0 else None,
})
        # Also include .md files that exist in skills/ but are not yet cached
if skills_dir.exists():
cached_names = {s["name"] for s in skills_list}
for md_file in sorted(skills_dir.glob("*.md")):
if md_file.name not in cached_names:
stat = md_file.stat()
skills_list.append({
"name": md_file.name,
"mtime": stat.st_mtime,
"cached": False,
"size": stat.st_size,
"modified": time.strftime("%Y-%m-%dT%H:%M:%S",
time.localtime(stat.st_mtime)),
})
skills_list.sort(key=lambda s: s["name"])
return JSONResponse({
"skills": skills_list,
"total": len(skills_list),
"skill_loader": "available",
"cache_ttl": registry_data.get("cache_ttl_seconds"),
})
except Exception as exc:
logger.error("[Skills API] list_skills error: %s", exc)
raise HTTPException(status_code=500, detail=str(exc))
@app.get("/api/skills/{skill_name}")
async def get_skill_content(skill_name: str):
"""
ๅ–ๅพ—ๆŒ‡ๅฎš Skill ็š„ SOP ๅ…งๅฎนใ€‚
Args:
skill_name: .md ๆช”ๅ๏ผˆๅฆ‚ scout.md๏ผ‰
ๅ›žๅ‚ณๆ ผๅผ๏ผš
{ "name": str, "content": str, "cached": bool, "mtime": float }
"""
    # Security: only allow the .md extension, with no path separators
if not skill_name.endswith(".md") or "/" in skill_name or "\\" in skill_name:
raise HTTPException(status_code=400, detail="Invalid skill name")
skills_dir = _ROOT / "skills"
skill_path = skills_dir / skill_name
if not skill_path.exists():
raise HTTPException(status_code=404, detail=f"Skill not found: {skill_name}")
loader = _get_skill_loader()
content = ""
cached = False
if loader is not None:
try:
content = loader.load_skill(skill_name)
registry_data = loader.get_registry()
cached_entries = {e["filename"]: e for e in registry_data.get("skills", [])}
cached = not cached_entries.get(skill_name, {}).get("is_fallback", True)
except Exception as exc:
logger.warning("[Skills API] SkillLoader.load_skill failed: %s", exc)
    # Fallback: read the file directly
if not content:
try:
content = skill_path.read_text(encoding="utf-8").strip()
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Read error: {exc}")
stat = skill_path.stat()
return JSONResponse({
"name": skill_name,
"content": content,
"size": len(content),
"cached": cached,
"mtime": stat.st_mtime,
"modified": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(stat.st_mtime)),
})
class SkillReloadRequest(BaseModel):
    skill_name: str | None = None  # None -> force-reload everything
@app.post("/api/skills/reload")
async def reload_skills(req: SkillReloadRequest):
"""
ๅผทๅˆถ้‡่ผ‰ๆŒ‡ๅฎš Skill๏ผˆๆˆ–ๅ…จ้ƒจ๏ผ‰็š„ๅฟซๅ–ใ€‚
Body: { "skill_name": "scout.md" } โ† ๆŒ‡ๅฎšๅ–ฎไธ€
{ "skill_name": null } โ† ๅ…จ้ƒจ้‡่ผ‰
ๅ›žๅ‚ณๆ ผๅผ๏ผš
{ "reloaded": ["scout.md", ...], "errors": [...] }
"""
loader = _get_skill_loader()
if loader is None:
raise HTTPException(status_code=503, detail="SkillLoader unavailable")
reloaded = []
errors = []
try:
if req.skill_name:
            # Reload a single skill
if not req.skill_name.endswith(".md"):
raise HTTPException(status_code=400, detail="skill_name must end with .md")
loader.reload_skill(req.skill_name)
reloaded.append(req.skill_name)
logger.info("[Skills API] Force reloaded: %s", req.skill_name)
else:
            # Reload everything: refresh each cached skill so the next load_skill reads fresh content
skills_dir = _ROOT / "skills"
if skills_dir.exists():
for md_file in skills_dir.glob("*.md"):
try:
loader.reload_skill(md_file.name)
reloaded.append(md_file.name)
except Exception as exc:
errors.append({"name": md_file.name, "error": str(exc)})
logger.info("[Skills API] Force reloaded all: %d skills", len(reloaded))
except HTTPException:
raise
except Exception as exc:
logger.error("[Skills API] reload error: %s", exc)
raise HTTPException(status_code=500, detail=str(exc))
return JSONResponse({
"reloaded": reloaded,
"reloaded_count": len(reloaded),
"errors": errors,
"status": "ok" if not errors else "partial",
})
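# Illustrative reload call (assumes the server address from __main__ and that
# `requests` is installed; the response shape matches the JSON returned above):
#   import requests
#   requests.post("http://localhost:1000/api/skills/reload",
#                 json={"skill_name": "scout.md"}).json()
#   # -> {"reloaded": ["scout.md"], "reloaded_count": 1, "errors": [], "status": "ok"}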
# ══════════════════════════════════════════════════════════════
# Startup entry point
# ══════════════════════════════════════════════════════════════
if __name__ == "__main__":
import uvicorn
    # Pass the app object directly (not an import string), so it works whether run from the project root or from ui/
uvicorn.run(
app,
host="0.0.0.0",
port=1000,
reload=False,
log_level="info",
)