| """ |
| skills/skill_loader.py โ ๅๆ
Skill ็ฑ่ผๅ
ฅ็ณป็ตฑ (Phase 4D) |
| ============================================================ |
| |
| ่จญ่จ็ฎๆจ๏ผ |
| - ็ก้้ๅ FastAPI ๆๅๅณๅฏๆดๆฐ SOP .md ๆชๆก |
| - mtime ๅ LRU Cache๏ผ่ฎๅ + ๅฟซๅ๏ผไฟฎๆนๅพ่ชๅๅคฑๆ |
| - ๅท่ก็ทๅฎๅ
จ๏ผๆๆๅ
ฌ้ๆนๆณๅๅ threading.RLock ไฟ่ญท |
| - Graceful Degradation๏ผๆชๆก้บๅคฑๆๅ้ๅฐๅตๅ
ฅๅผ fallback SOP |
| - ๅฏ่งๆธฌๆง๏ผๆไพๅฎๆด็ registry API ไพ /api/skills ็ซฏ้ปไฝฟ็จ |
| |
| ๆถๆง๏ผ |
| SkillLoader๏ผๅฎไพ๏ผ |
| โโโ _load_with_mtime() ่ฎๅ .md โ ๅฟซๅ (content, mtime, load_time) |
| โโโ load_skill() ๅ
ฌ้ๅๅพไป้ข๏ผmtime ้ฉ่ญ๏ผ้ๆ่ชๅ reload๏ผ |
| โโโ reload_skill() ๅผทๅถ้่ผ๏ผไธ็ฎก mtime๏ผ |
| โโโ reload_all() ๅผทๅถ้่ผๅ
จ้จ |
| โโโ get_registry() ๅๅบๆๆๅทฒๅฟซๅ็ skill + ็ๆฌ่ณ่จ |
| |
| ็ธๅฎนๆง๏ผ |
| - ๆๆ็พๆ Agent ็ _load_skill() ๅฏ็ก็ธซๆฟๆ็บ |
| skill_loader.load_skill(filename) |
| - ๆฐๅข server.py API ็ซฏ้ปไฝฟ็จ skill_loader.get_registry() |
| |
| ้ตๅฎ๏ผproject_CONSTITUTION.md + AGENTS.md + HARNESS_ENGINEERING.md |
| """ |
|
|
| import logging |
| import os |
| import threading |
| import time |
| from pathlib import Path |
| from typing import Optional |
|
|
| logger = logging.getLogger("ThreatHunter.skill_loader") |
|
|
| |
| _PROJECT_ROOT = Path(__file__).parent.parent |
| _SKILLS_DIR = Path(__file__).parent |
|
|
| |
| |
| |
| CACHE_TTL_SECONDS: float = float(os.getenv("SKILL_CACHE_TTL", "5.0")) |
|
|
| |
| _FALLBACK_SOPS: dict[str, str] = { |
| "threat_intel.md": """ |
| # Skill: Threat Intel Scout (fallback) |
| ## SOP |
| 1. read_memory(agent_name="scout") |
| 2. search_nvd ๆฅ่ฉขๆฏๅๅฅไปถ |
| 3. CVSS >= 7.0 โ search_otx |
| 4. ๆฏๅฐๆญทๅฒๆจ่จ is_new |
| 5. write_memory ๅฏซๅ
ฅ |
| 6. ่ผธๅบ็ด JSON |
| """.strip(), |
| "source_code_audit.md": """ |
| # Skill: Source Code Audit (fallback) |
| ## SOP |
| 1. Identify imported packages from code |
| 2. search_nvd for each package |
| 3. Flag hardcoded secrets (OWASP A07) |
| 4. write_memory, output JSON |
| """.strip(), |
| "ai_security_audit.md": """ |
| # Skill: AI Security Audit (fallback) |
| ## SOP |
| 1. Classify input: prompt injection / jailbreak / data poisoning |
| 2. Map to OWASP LLM Top10 |
| 3. Rate severity 1-10 |
| 4. Output JSON (no CVE calls needed) |
| """.strip(), |
| "config_audit.md": """ |
| # Skill: Config Audit (fallback) |
| ## SOP |
| 1. Check for hardcoded secrets |
| 2. Validate against CIS Benchmark |
| 3. Flag misconfigurations |
| 4. Output JSON |
| """.strip(), |
| } |
|
|
| _DEFAULT_FALLBACK = """ |
| # Skill SOP (generic fallback) |
| Follow security analysis best practices. |
| Output structured JSON with findings. |
| Do not fabricate CVE IDs. |
| """.strip() |
|
|
|
|
| class _CacheEntry: |
| """ๅฎไธ Skill ็ๅฟซๅๆข็ฎ""" |
|
|
| __slots__ = ("content", "mtime", "load_time", "filename", "size_bytes") |
|
|
| def __init__(self, filename: str, content: str, mtime: float): |
| self.filename = filename |
| self.content = content |
| self.mtime = mtime |
| self.load_time = time.time() |
| self.size_bytes = len(content.encode("utf-8")) |
|
|
|
|
| class SkillLoader: |
| """ |
| ๅท่ก็ทๅฎๅ
จ็ Skill ็ฑ่ผๅ
ฅๅจ๏ผๅฎไพๆจ่ฆ๏ผใ |
| |
| ไฝฟ็จ็ฏไพ๏ผ |
| from skills.skill_loader import skill_loader |
| sop = skill_loader.load_skill("threat_intel.md") |
| |
| API๏ผ |
| load_skill(filename) โ str ๏ผๅฟซๅ + ่ชๅๅคฑๆ๏ผ |
| reload_skill(filename) โ str ๏ผๅผทๅถ้่ผ๏ผ |
| reload_all() โ dict ๏ผ้่ผๅ
จ้จๅทฒๅฟซๅ๏ผ |
| get_registry() โ dict ๏ผๅๅบๆๆๅฟซๅๅ
งๅฎน๏ผ |
| invalidate(filename) โ None ๏ผ็งป้คๅฎไธๅฟซๅๆข็ฎ๏ผ |
| invalidate_all() โ None ๏ผๆธ
็ฉบๅ
จ้จๅฟซๅ๏ผ |
| """ |
|
|
| def __init__(self, skills_dir: Path | str | None = None): |
| self._skills_dir = Path(skills_dir) if skills_dir else _SKILLS_DIR |
| self._cache: dict[str, _CacheEntry] = {} |
| self._lock = threading.RLock() |
| logger.info("[SkillLoader] ๅๅงๅๅฎๆ | skills_dir=%s", self._skills_dir) |
|
|
| |
| |
| |
|
|
| def _get_mtime(self, filepath: Path) -> Optional[float]: |
| """ๅๅพๆชๆก็ mtime๏ผ่ฅไธๅญๅจๅๅๅณ None๏ผ""" |
| try: |
| return filepath.stat().st_mtime |
| except (OSError, FileNotFoundError): |
| return None |
|
|
| def _read_file(self, filepath: Path) -> Optional[str]: |
| """ๅ่ฉฆๅค็จฎ็ทจ็ขผ่ฎๅ .md ๆชๆก๏ผๅคฑๆๅๅณ None""" |
| for enc in ("utf-8", "utf-8-sig", "latin-1"): |
| try: |
| content = filepath.read_text(encoding=enc).strip() |
| if content: |
| return content |
| except (OSError, UnicodeDecodeError): |
| continue |
| return None |
|
|
| def _load_with_mtime(self, filename: str) -> _CacheEntry: |
| """ |
| ๅพ็ฃ็ข่ฎๅ skill ไธฆๅปบ็ซๅฟซๅๆข็ฎใ |
| ่ฅ่ฎๅๅคฑๆ๏ผไฝฟ็จ fallback SOP ๅปบ็ซๆข็ฎ๏ผmtime=-1 ๆจ่ญ็บ fallback๏ผใ |
| """ |
| filepath = self._skills_dir / filename |
| mtime = self._get_mtime(filepath) |
|
|
| if mtime is not None: |
| content = self._read_file(filepath) |
| if content: |
| logger.info("[SkillLoader] ่ผๅ
ฅ: %s (%d chars)", filename, len(content)) |
| return _CacheEntry(filename, content, mtime) |
| else: |
| logger.warning("[SkillLoader] ๆชๆก็บ็ฉบ: %s๏ผไฝฟ็จ fallback", filename) |
| else: |
| logger.warning("[SkillLoader] ๆพไธๅฐๆชๆก: %s๏ผไฝฟ็จ fallback", filename) |
|
|
| |
| fallback_content = _FALLBACK_SOPS.get(filename, _DEFAULT_FALLBACK) |
| return _CacheEntry(filename, fallback_content, -1.0) |
|
|
| |
| |
| |
|
|
| def load_skill(self, filename: str) -> str: |
| """ |
| ๅๅพ Skill SOP ๅ
งๅฎน๏ผๅฟซๅๅชๅ
๏ผmtime ้ฉ่ญ่ชๅๅคฑๆ๏ผใ |
| |
| ้่ผฏ๏ผ |
| 1. ่ฅๅฟซๅไธญ็กๆญคๆช โ ๅพ็ฃ็ข่ผๅ
ฅ โ ๅฟซๅ |
| 2. ่ฅๅฟซๅๅญๅจ + TTL ๅ
ง โ ็ดๆฅๅๅณ๏ผๆ้ซๆ๏ผ |
| 3. ่ฅๅฟซๅๅญๅจ + TTL ้ๆ โ ้ฉ่ญ mtime๏ผ |
| mtime ไธ่ฎ โ ๆดๆฐ load_time๏ผ็นผ็บไฝฟ็จ |
| mtime ๆน่ฎ โ ้ๆฐๅพ็ฃ็ข่ผๅ
ฅ๏ผ็ฑ่ผๅ
ฅ๏ผ๏ผ |
| 4. fallback entry๏ผmtime=-1๏ผโ ๆฏๆฌก้่ฉฆ็ฃ็ข็ขบ่ชๆฏๅฆๅทฒๅปบ็ซ |
| |
| Args: |
| filename: Skill .md ๆไปถๅ๏ผไธๅซ่ทฏๅพ๏ผ๏ผๅฆ "threat_intel.md" |
| |
| Returns: |
| str: Skill ๆไปถๅ
งๅฎน๏ผๆ fallback SOP๏ผ |
| """ |
| with self._lock: |
| entry = self._cache.get(filename) |
|
|
| |
| if entry is None: |
| entry = self._load_with_mtime(filename) |
| self._cache[filename] = entry |
| return entry.content |
|
|
| |
| age = time.time() - entry.load_time |
| if age < CACHE_TTL_SECONDS: |
| return entry.content |
|
|
| |
| current_mtime = self._get_mtime(self._skills_dir / filename) |
|
|
| if current_mtime is None: |
| |
| if entry.mtime == -1.0: |
| entry.load_time = time.time() |
| else: |
| logger.warning("[SkillLoader] ็ฑ่ผๅ
ฅๅตๆธฌ๏ผ%s ๅทฒๅช้ค๏ผๅๆ fallback", filename) |
| entry = self._load_with_mtime(filename) |
| self._cache[filename] = entry |
| return entry.content |
|
|
| if current_mtime == entry.mtime: |
| |
| entry.load_time = time.time() |
| return entry.content |
|
|
| |
| logger.info( |
| "[SkillLoader] ๐ ็ฑ่ผๅ
ฅ %s (่ mtime=%.3f โ ๆฐ mtime=%.3f)", |
| filename, entry.mtime, current_mtime, |
| ) |
| entry = self._load_with_mtime(filename) |
| self._cache[filename] = entry |
| return entry.content |
|
|
| def reload_skill(self, filename: str) -> str: |
| """ |
| ๅผทๅถ้่ผๆๅฎ Skill๏ผไธ็ฎก mtime ๅ TTL๏ผใ |
| ้ฉ็จๆผ๏ผ/api/skills/reload API ่ขซๅผๅซๆใ |
| |
| Returns: |
| str: ้ๆฐ่ผๅ
ฅๅพ็ Skill ๅ
งๅฎน |
| """ |
| with self._lock: |
| logger.info("[SkillLoader] ๅผทๅถ้่ผ: %s", filename) |
| entry = self._load_with_mtime(filename) |
| self._cache[filename] = entry |
| return entry.content |
|
|
| def reload_all(self) -> dict[str, str]: |
| """ |
| ๅผทๅถ้่ผๆๆๅทฒๅฟซๅ็ Skillใ |
| |
| Returns: |
| dict[filename โ new_content]๏ผๅ
ๅซ fallback entry๏ผ |
| """ |
| with self._lock: |
| results = {} |
| for filename in list(self._cache.keys()): |
| entry = self._load_with_mtime(filename) |
| self._cache[filename] = entry |
| results[filename] = entry.content |
| logger.info("[SkillLoader] ๅ
จ้จ้่ผๅฎๆ, %d ๅ skill", len(results)) |
| return results |
|
|
| def invalidate(self, filename: str) -> None: |
| """็งป้คๅฎไธๅฟซๅๆข็ฎ๏ผไธๆฌก load_skill ๆ้ๆฐ่ฎๅ๏ผ""" |
| with self._lock: |
| removed = self._cache.pop(filename, None) |
| if removed: |
| logger.info("[SkillLoader] ๅฟซๅๅคฑๆ: %s", filename) |
|
|
| def invalidate_all(self) -> None: |
| """ๆธ
็ฉบๅ
จ้จๅฟซๅ๏ผไธๆฌก load_skill ๆ้ๆฐ่ฎๅๆๆ๏ผ""" |
| with self._lock: |
| count = len(self._cache) |
| self._cache.clear() |
| logger.info("[SkillLoader] ๅ
จ้จๅฟซๅๆธ
็ฉบ (%d ๅ)", count) |
|
|
| def get_registry(self) -> dict: |
| """ |
| ๅๅณๆๆๅทฒๅฟซๅ็ Skill ็ๆ
๏ผไพ /api/skills ็ซฏ้ปไฝฟ็จใ |
| |
| Returns: |
| dict: |
| { |
| "skills_dir": str, |
| "cache_ttl_seconds": float, |
| "total": int, |
| "skills": [ |
| { |
| "filename": str, |
| "size_bytes": int, |
| "mtime": float, # -1 = fallback SOP |
| "load_time": float, |
| "age_seconds": float, |
| "is_fallback": bool, |
| "content_preview": str # ๅ 200 ๅญๅ
|
| } |
| ] |
| } |
| """ |
| with self._lock: |
| now = time.time() |
| skills_list = [] |
| for filename, entry in self._cache.items(): |
| skills_list.append({ |
| "filename": filename, |
| "size_bytes": entry.size_bytes, |
| "mtime": entry.mtime, |
| "load_time": entry.load_time, |
| "age_seconds": round(now - entry.load_time, 2), |
| "is_fallback": entry.mtime == -1.0, |
| "content_preview": entry.content[:200], |
| }) |
| return { |
| "skills_dir": str(self._skills_dir), |
| "cache_ttl_seconds": CACHE_TTL_SECONDS, |
| "total": len(skills_list), |
| "skills": skills_list, |
| } |
|
|
| def get_skill_content(self, filename: str) -> Optional[str]: |
| """ |
| ๅๅณๅทฒๅฟซๅ็ Skill ๅๅงๅ
งๅฎน๏ผ่ฅๅฐๆชๅฟซๅๅๅ
่ผๅ
ฅ๏ผใ |
| ไพ /api/skills/{name} ็ซฏ้ปไฝฟ็จใ |
| """ |
| return self.load_skill(filename) |
|
|
| def list_available_skills(self) -> list[str]: |
| """ |
| ๆๆ skills/ ็ฎ้๏ผๅๅณๆๆๅฏ็จ็ .md ๆชๆกๆธ
ๅฎใ |
| ๏ผๅ
ๅซๆชๅฟซๅ็ๆชๆก๏ผ |
| """ |
| try: |
| return sorted( |
| f.name for f in self._skills_dir.iterdir() |
| if f.is_file() and f.suffix == ".md" |
| ) |
| except OSError as e: |
| logger.warning("[SkillLoader] ็กๆณๆๆ skills/ ็ฎ้: %s", e) |
| return list(_FALLBACK_SOPS.keys()) |
|
|
| def get_stats(self) -> dict: |
| """ |
| ๅๅณ SkillLoader ็ๆ่ฝ็ตฑ่จใ |
| """ |
| with self._lock: |
| return { |
| "cached_skills": len(self._cache), |
| "fallback_count": sum(1 for e in self._cache.values() if e.mtime == -1.0), |
| "skills_dir": str(self._skills_dir), |
| "cache_ttl_seconds": CACHE_TTL_SECONDS, |
| } |
|
|
|
|
| |
| |
| |
|
|
| skill_loader = SkillLoader() |
|
|
|
|
| |
| |
| |
|
|
| def load_skill(filename: str) -> str: |
| """ |
| ๅ
จๅไพฟๅฉๅฝๅผ๏ผ็ญๅๆผ skill_loader.load_skill(filename)ใ |
| Agent ๅฏ็ดๆฅ from skills.skill_loader import load_skill ไฝฟ็จใ |
| """ |
| return skill_loader.load_skill(filename) |
|
|
|
|
| def reload_skill(filename: str) -> str: |
| """ๅผทๅถ้่ผๅฎไธ Skill""" |
| return skill_loader.reload_skill(filename) |
|
|
|
|
| def get_registry() -> dict: |
| """ๅๅพๆๆ Skill ็ๅฟซๅ็ๆ
""" |
| return skill_loader.get_registry() |
|
|