File size: 15,424 Bytes
c8d30bc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 | """
skills/skill_loader.py — 動態 Skill 熱載入系統 (Phase 4D)
============================================================
設計目標:
- 無需重啟 FastAPI 服務即可更新 SOP .md 檔案
- mtime 型 LRU Cache:讀取 + 快取,修改後自動失效
- 執行緒安全:所有公開方法均受 threading.RLock 保護
- Graceful Degradation:檔案遺失時回退到嵌入式 fallback SOP
- 可觀測性:提供完整的 registry API 供 /api/skills 端點使用
架構:
SkillLoader(單例)
├── _load_with_mtime() 讀取 .md → 快取 (content, mtime, load_time)
├── load_skill() 公開取得介面(mtime 驗證,過期自動 reload)
├── reload_skill() 強制重載(不管 mtime)
├── reload_all() 強制重載全部
└── get_registry() 列出所有已快取的 skill + 版本資訊
相容性:
- 所有現有 Agent 的 _load_skill() 可無縫替換為
skill_loader.load_skill(filename)
- 新增 server.py API 端點使用 skill_loader.get_registry()
遵守:project_CONSTITUTION.md + AGENTS.md + HARNESS_ENGINEERING.md
"""
import logging
import os
import threading
import time
from pathlib import Path
from typing import Optional
logger = logging.getLogger("ThreatHunter.skill_loader")
# ── Skill 目錄定位 ───────────────────────────────────────────────
_PROJECT_ROOT = Path(__file__).parent.parent
_SKILLS_DIR = Path(__file__).parent # skills/ 目錄本身
# ── 快取 TTL(以秒計):已修改的檔案在此時間後強制 reload ─────────
# 設置為 0 代表每次都驗證 mtime(最即時但性能稍差)
# 設置為正數代表 stall 期間,TTL 內不 check mtime(高性能)
CACHE_TTL_SECONDS: float = float(os.getenv("SKILL_CACHE_TTL", "5.0"))
# ── 所有 skill 檔案的對應 fallback SOP ──────────────────────────
_FALLBACK_SOPS: dict[str, str] = {
"threat_intel.md": """
# Skill: Threat Intel Scout (fallback)
## SOP
1. read_memory(agent_name="scout")
2. search_nvd 查詢每個套件
3. CVSS >= 7.0 → search_otx
4. 比對歷史標記 is_new
5. write_memory 寫入
6. 輸出純 JSON
""".strip(),
"source_code_audit.md": """
# Skill: Source Code Audit (fallback)
## SOP
1. Identify imported packages from code
2. search_nvd for each package
3. Flag hardcoded secrets (OWASP A07)
4. write_memory, output JSON
""".strip(),
"ai_security_audit.md": """
# Skill: AI Security Audit (fallback)
## SOP
1. Classify input: prompt injection / jailbreak / data poisoning
2. Map to OWASP LLM Top10
3. Rate severity 1-10
4. Output JSON (no CVE calls needed)
""".strip(),
"config_audit.md": """
# Skill: Config Audit (fallback)
## SOP
1. Check for hardcoded secrets
2. Validate against CIS Benchmark
3. Flag misconfigurations
4. Output JSON
""".strip(),
}
_DEFAULT_FALLBACK = """
# Skill SOP (generic fallback)
Follow security analysis best practices.
Output structured JSON with findings.
Do not fabricate CVE IDs.
""".strip()
class _CacheEntry:
"""單一 Skill 的快取條目"""
__slots__ = ("content", "mtime", "load_time", "filename", "size_bytes")
def __init__(self, filename: str, content: str, mtime: float):
self.filename = filename
self.content = content
self.mtime = mtime # 磁碟上的 mtime(float,Unix timestamp)
self.load_time = time.time() # 本次 reload 的時間
self.size_bytes = len(content.encode("utf-8"))
class SkillLoader:
"""
執行緒安全的 Skill 熱載入器(單例推薦)。
使用範例:
from skills.skill_loader import skill_loader
sop = skill_loader.load_skill("threat_intel.md")
API:
load_skill(filename) → str (快取 + 自動失效)
reload_skill(filename) → str (強制重載)
reload_all() → dict (重載全部已快取)
get_registry() → dict (列出所有快取內容)
invalidate(filename) → None (移除單一快取條目)
invalidate_all() → None (清空全部快取)
"""
def __init__(self, skills_dir: Path | str | None = None):
self._skills_dir = Path(skills_dir) if skills_dir else _SKILLS_DIR
self._cache: dict[str, _CacheEntry] = {}
self._lock = threading.RLock()
logger.info("[SkillLoader] 初始化完成 | skills_dir=%s", self._skills_dir)
# ══════════════════════════════════════════════════════════════
# 核心讀取
# ══════════════════════════════════════════════════════════════
def _get_mtime(self, filepath: Path) -> Optional[float]:
"""取得檔案的 mtime(若不存在則回傳 None)"""
try:
return filepath.stat().st_mtime
except (OSError, FileNotFoundError):
return None
def _read_file(self, filepath: Path) -> Optional[str]:
"""嘗試多種編碼讀取 .md 檔案,失敗回傳 None"""
for enc in ("utf-8", "utf-8-sig", "latin-1"):
try:
content = filepath.read_text(encoding=enc).strip()
if content:
return content
except (OSError, UnicodeDecodeError):
continue
return None
def _load_with_mtime(self, filename: str) -> _CacheEntry:
"""
從磁碟讀取 skill 並建立快取條目。
若讀取失敗,使用 fallback SOP 建立條目(mtime=-1 標識為 fallback)。
"""
filepath = self._skills_dir / filename
mtime = self._get_mtime(filepath)
if mtime is not None:
content = self._read_file(filepath)
if content:
logger.info("[SkillLoader] 載入: %s (%d chars)", filename, len(content))
return _CacheEntry(filename, content, mtime)
else:
logger.warning("[SkillLoader] 檔案為空: %s,使用 fallback", filename)
else:
logger.warning("[SkillLoader] 找不到檔案: %s,使用 fallback", filename)
# Fallback:使用嵌入式 SOP
fallback_content = _FALLBACK_SOPS.get(filename, _DEFAULT_FALLBACK)
return _CacheEntry(filename, fallback_content, -1.0)
# ══════════════════════════════════════════════════════════════
# 公開 API
# ══════════════════════════════════════════════════════════════
def load_skill(self, filename: str) -> str:
"""
取得 Skill SOP 內容(快取優先,mtime 驗證自動失效)。
邏輯:
1. 若快取中無此檔 → 從磁碟載入 → 快取
2. 若快取存在 + TTL 內 → 直接回傳(最高效)
3. 若快取存在 + TTL 過期 → 驗證 mtime:
mtime 不變 → 更新 load_time,繼續使用
mtime 改變 → 重新從磁碟載入(熱載入!)
4. fallback entry(mtime=-1)→ 每次重試磁碟確認是否已建立
Args:
filename: Skill .md 文件名(不含路徑),如 "threat_intel.md"
Returns:
str: Skill 文件內容(或 fallback SOP)
"""
with self._lock:
entry = self._cache.get(filename)
# 情況 1:尚未快取 → 載入
if entry is None:
entry = self._load_with_mtime(filename)
self._cache[filename] = entry
return entry.content
# 情況 2:TTL 內 → 直接回傳
age = time.time() - entry.load_time
if age < CACHE_TTL_SECONDS:
return entry.content
# 情況 3 / 4:TTL 過期,驗證 mtime
current_mtime = self._get_mtime(self._skills_dir / filename)
if current_mtime is None:
# 檔案消失了 → 若是 fallback 就繼續用,否則切換 fallback
if entry.mtime == -1.0:
entry.load_time = time.time() # 刷新 TTL
else:
logger.warning("[SkillLoader] 熱載入偵測:%s 已刪除,切換 fallback", filename)
entry = self._load_with_mtime(filename) # 會走 fallback 路徑
self._cache[filename] = entry
return entry.content
if current_mtime == entry.mtime:
# 檔案未更動 → 更新 load_time 刷新 TTL
entry.load_time = time.time()
return entry.content
# 檔案已更新!熱載入
logger.info(
"[SkillLoader] 🔄 熱載入 %s (舊 mtime=%.3f → 新 mtime=%.3f)",
filename, entry.mtime, current_mtime,
)
entry = self._load_with_mtime(filename)
self._cache[filename] = entry
return entry.content
def reload_skill(self, filename: str) -> str:
"""
強制重載指定 Skill(不管 mtime 和 TTL)。
適用於:/api/skills/reload API 被呼叫時。
Returns:
str: 重新載入後的 Skill 內容
"""
with self._lock:
logger.info("[SkillLoader] 強制重載: %s", filename)
entry = self._load_with_mtime(filename)
self._cache[filename] = entry
return entry.content
def reload_all(self) -> dict[str, str]:
"""
強制重載所有已快取的 Skill。
Returns:
dict[filename → new_content](包含 fallback entry)
"""
with self._lock:
results = {}
for filename in list(self._cache.keys()):
entry = self._load_with_mtime(filename)
self._cache[filename] = entry
results[filename] = entry.content
logger.info("[SkillLoader] 全部重載完成, %d 個 skill", len(results))
return results
def invalidate(self, filename: str) -> None:
"""移除單一快取條目(下次 load_skill 時重新讀取)"""
with self._lock:
removed = self._cache.pop(filename, None)
if removed:
logger.info("[SkillLoader] 快取失效: %s", filename)
def invalidate_all(self) -> None:
"""清空全部快取(下次 load_skill 時重新讀取所有)"""
with self._lock:
count = len(self._cache)
self._cache.clear()
logger.info("[SkillLoader] 全部快取清空 (%d 個)", count)
def get_registry(self) -> dict:
"""
回傳所有已快取的 Skill 狀態,供 /api/skills 端點使用。
Returns:
dict:
{
"skills_dir": str,
"cache_ttl_seconds": float,
"total": int,
"skills": [
{
"filename": str,
"size_bytes": int,
"mtime": float, # -1 = fallback SOP
"load_time": float,
"age_seconds": float,
"is_fallback": bool,
"content_preview": str # 前 200 字元
}
]
}
"""
with self._lock:
now = time.time()
skills_list = []
for filename, entry in self._cache.items():
skills_list.append({
"filename": filename,
"size_bytes": entry.size_bytes,
"mtime": entry.mtime,
"load_time": entry.load_time,
"age_seconds": round(now - entry.load_time, 2),
"is_fallback": entry.mtime == -1.0,
"content_preview": entry.content[:200],
})
return {
"skills_dir": str(self._skills_dir),
"cache_ttl_seconds": CACHE_TTL_SECONDS,
"total": len(skills_list),
"skills": skills_list,
}
def get_skill_content(self, filename: str) -> Optional[str]:
"""
回傳已快取的 Skill 原始內容(若尚未快取則先載入)。
供 /api/skills/{name} 端點使用。
"""
return self.load_skill(filename)
def list_available_skills(self) -> list[str]:
"""
掃描 skills/ 目錄,回傳所有可用的 .md 檔案清單。
(包含未快取的檔案)
"""
try:
return sorted(
f.name for f in self._skills_dir.iterdir()
if f.is_file() and f.suffix == ".md"
)
except OSError as e:
logger.warning("[SkillLoader] 無法掃描 skills/ 目錄: %s", e)
return list(_FALLBACK_SOPS.keys())
def get_stats(self) -> dict:
"""
回傳 SkillLoader 的效能統計。
"""
with self._lock:
return {
"cached_skills": len(self._cache),
"fallback_count": sum(1 for e in self._cache.values() if e.mtime == -1.0),
"skills_dir": str(self._skills_dir),
"cache_ttl_seconds": CACHE_TTL_SECONDS,
}
# ══════════════════════════════════════════════════════════════════
# 全域單例(供所有 Agent 使用的共享實例)
# ══════════════════════════════════════════════════════════════════
skill_loader = SkillLoader()
# ══════════════════════════════════════════════════════════════════
# 便利函式(向後相容 —— 取代 agents/ 中的 _load_skill())
# ══════════════════════════════════════════════════════════════════
def load_skill(filename: str) -> str:
"""
全域便利函式,等同於 skill_loader.load_skill(filename)。
Agent 可直接 from skills.skill_loader import load_skill 使用。
"""
return skill_loader.load_skill(filename)
def reload_skill(filename: str) -> str:
"""強制重載單一 Skill"""
return skill_loader.reload_skill(filename)
def get_registry() -> dict:
"""取得所有 Skill 的快取狀態"""
return skill_loader.get_registry()
|