"""Shared utility functions for the skill engine. Provides: - YAML frontmatter parsing/manipulation (unified across registry, evolver, etc.) - LLM output cleaning (markdown fence stripping, change summary extraction) - Skill content safety checking (regex-based moderation) - Skill directory validation - Text truncation """ from __future__ import annotations import re from pathlib import Path from typing import Any, Dict, List, Optional from openspace.utils.logging import Logger logger = Logger.get_logger(__name__) SKILL_FILENAME = "SKILL.md" _SAFETY_RULES = [ ("blocked.malware", re.compile(r"(ClawdAuthenticatorTool)", re.IGNORECASE)), ("suspicious.keyword", re.compile(r"(malware|stealer|phish|phishing|keylogger)", re.IGNORECASE)), ("suspicious.secrets", re.compile(r"(api[-_ ]?key|token|password|private key|secret)", re.IGNORECASE)), ("suspicious.crypto", re.compile(r"(wallet|seed phrase|mnemonic|crypto)", re.IGNORECASE)), ("suspicious.webhook", re.compile(r"(discord\.gg|webhook|hooks\.slack)", re.IGNORECASE)), ("suspicious.script", re.compile(r"(curl[^\n]+\|\s*(sh|bash))", re.IGNORECASE)), ("suspicious.url_shortener", re.compile(r"(bit\.ly|tinyurl\.com|t\.co|goo\.gl|is\.gd)", re.IGNORECASE)), ] _BLOCKING_FLAGS = frozenset({"blocked.malware"}) def check_skill_safety(text: str) -> List[str]: """Check *text* against safety rules, return list of triggered flag names. Returns an empty list if no rules match (= safe). """ return [flag for flag, pat in _SAFETY_RULES if pat.search(text)] def is_skill_safe(flags: List[str]) -> bool: """Return True if *flags* contain no blocking flag. ``suspicious.*`` flags are informational (logged / attached to search results) but do NOT block. Only ``blocked.*`` flags cause rejection. """ return not any(f in _BLOCKING_FLAGS for f in flags) _FRONTMATTER_RE = re.compile(r"^---\n(.*?)\n---", re.DOTALL) # Characters that require YAML value quoting (colon-space, hash-space, # or values starting with special YAML indicators). 
_YAML_NEEDS_QUOTE_RE = re.compile(r"[:\#\[\]{}&*!|>'\"%@`\n\r]")
# Line breaks are part of the class above: written raw, a multi-line value
# would spill onto extra frontmatter lines (or even close the frontmatter
# early with an embedded "---").

# Characters escaped inside a double-quoted scalar, and the inverse mapping.
_YAML_ESCAPE_RE = re.compile(r'[\\"\n\r]')
_YAML_ESCAPES = {"\\": "\\\\", '"': '\\"', "\n": "\\n", "\r": "\\r"}
_YAML_UNESCAPE_RE = re.compile(r'\\([\\"nr])')
_YAML_UNESCAPES = {"\\": "\\", '"': '"', "n": "\n", "r": "\r"}


def _yaml_quote(value: str) -> str:
    """Quote a YAML scalar value if it contains special characters.

    Empty values and values without any special character are returned
    unchanged.  Otherwise the value is wrapped in double quotes with
    backslashes, double quotes and line breaks escaped, so the result always
    fits on a single frontmatter line.
    """
    if not value or not _YAML_NEEDS_QUOTE_RE.search(value):
        return value
    escaped = _YAML_ESCAPE_RE.sub(lambda m: _YAML_ESCAPES[m.group(0)], value)
    return f'"{escaped}"'


def _yaml_unquote(value: str) -> str:
    """Strip surrounding quotes and unescape a YAML scalar value.

    Single-quoted values only lose their quotes.  Double-quoted values also
    have ``\\\\``, ``\\"``, ``\\n`` and ``\\r`` decoded; the decoding is done in
    one left-to-right pass so an escaped backslash followed by ``n`` is not
    mistaken for a newline escape.  Unquoted values are returned unchanged.
    """
    if len(value) < 2 or value[0] != value[-1] or value[0] not in "\"'":
        return value
    inner = value[1:-1]
    if value[0] == '"':
        inner = _YAML_UNESCAPE_RE.sub(lambda m: _YAML_UNESCAPES[m.group(1)], inner)
    return inner


def parse_frontmatter(content: str) -> Dict[str, Any]:
    """Parse YAML frontmatter into a flat dict.

    Simple line-by-line parser (no PyYAML dependency).  Handles both quoted
    and unquoted values.  Every line containing a colon is treated as
    ``key: value`` (split on the first colon); lines without a colon or with
    an empty key are ignored, and a repeated key keeps its last value.

    Returns ``{}`` if no valid frontmatter is found.
    """
    if not content.startswith("---"):
        return {}
    match = _FRONTMATTER_RE.match(content)
    if not match:
        return {}

    fields: Dict[str, Any] = {}
    for line in match.group(1).split("\n"):
        key, sep, raw_value = line.partition(":")
        key = key.strip()
        if sep and key:
            fields[key] = _yaml_unquote(raw_value.strip())
    return fields


def get_frontmatter_field(content: str, field_name: str) -> Optional[str]:
    """Extract a single field value from YAML frontmatter.

    The first line whose key equals *field_name* wins.

    Returns ``None`` if the field is absent or content has no frontmatter.
    """
    if not content.startswith("---"):
        return None
    match = _FRONTMATTER_RE.match(content)
    if not match:
        return None

    for line in match.group(1).split("\n"):
        key, sep, raw_value = line.partition(":")
        if sep and key.strip() == field_name:
            return _yaml_unquote(raw_value.strip())
    return None


def set_frontmatter_field(content: str, field_name: str, value: str) -> str:
    """Set (or insert) a field in YAML frontmatter.

    Values containing YAML special characters (``:``, ``#``, line breaks,
    etc.) are automatically double-quoted to produce valid single-line YAML.

    If *content* has no frontmatter, a new one is prepended.  If *content*
    starts with ``---`` but the closing delimiter is missing, it is returned
    unchanged.  Every existing line for *field_name* is replaced; when there
    is none, the field is appended at the end of the frontmatter.
    """
    quoted = _yaml_quote(value)
    if not content.startswith("---"):
        return f"---\n{field_name}: {quoted}\n---\n{content}"

    match = _FRONTMATTER_RE.match(content)
    if not match:
        return content

    new_line = f"{field_name}: {quoted}"
    replaced = False
    out_lines = []
    for line in match.group(1).split("\n"):
        key, sep, _ = line.partition(":")
        if sep and key.strip() == field_name:
            out_lines.append(new_line)
            replaced = True
        else:
            out_lines.append(line)
    if not replaced:
        out_lines.append(new_line)

    new_fm = "\n".join(out_lines)
    return f"---\n{new_fm}\n---{content[match.end():]}"


def normalize_frontmatter(content: str) -> str:
    """Re-serialize frontmatter with proper YAML quoting.

    Parses the existing frontmatter, then re-writes each value through
    :func:`_yaml_quote` so that colons, hashes, and other special characters
    are safely double-quoted.  The body after ``---`` is preserved verbatim.

    Only what :func:`parse_frontmatter` retains is written back, so
    frontmatter lines that are not ``key: value`` pairs are dropped.

    Returns *content* unchanged if no frontmatter is found.
    """
    if not content.startswith("---"):
        return content
    match = _FRONTMATTER_RE.match(content)
    if not match:
        return content

    fields = parse_frontmatter(content)
    if not fields:
        return content

    new_fm = "\n".join(f"{key}: {_yaml_quote(val)}" for key, val in fields.items())
    return f"---\n{new_fm}\n---{content[match.end():]}"


# Frontmatter block including its closing delimiter and one trailing newline.
_FRONTMATTER_BLOCK_RE = re.compile(r"^---\n.*?\n---\n?", re.DOTALL)


def strip_frontmatter(content: str) -> str:
    """Remove YAML frontmatter from markdown content.

    When frontmatter is found, the remaining body is returned with
    surrounding whitespace stripped; otherwise *content* is returned as is.
    """
    if content.startswith("---"):
        match = _FRONTMATTER_BLOCK_RE.match(content)
        if match:
            return content[match.end():].strip()
    return content


# Opening ``` with an optional well-known language tag, content, closing ```.
_TAGGED_FENCE_RE = re.compile(
    r"^```(?:markdown|md|text|yaml|diff|patch)?\s*\n(.*?)\n```\s*$",
    re.DOTALL,
)
# Any fence of three or more backticks with an arbitrary language tag.
_GENERIC_FENCE_RE = re.compile(
    r"^`{3,}(?:\w+)?\s*\n(.*?)\n`{3,}\s*$",
    re.DOTALL,
)


def strip_markdown_fences(text: str) -> str:
    """Remove surrounding markdown code fences if present.

    Handles common LLM wrapping patterns:
      - A triple-backtick fence tagged ``markdown``, ``md``, ``text``,
        ``yaml``, ``diff`` or ``patch``, or carrying no tag at all
      - Fences of three or more backticks with any language tag
      - Nested triple-backtick pairs (outermost only)
      - Leading/trailing whitespace around fences

    Returns the stripped inner content, or the stripped *text* itself when it
    is not wrapped in a fence.
    """
    text = text.strip()
    for fence_re in (_TAGGED_FENCE_RE, _GENERIC_FENCE_RE):
        m = fence_re.match(text)
        if m:
            return m.group(1).strip()
    return text


# Accepts "CHANGE_SUMMARY", "CHANGE SUMMARY", "CHANGE-SUMMARY" or
# "CHANGESUMMARY" (any case), optionally preceded by markdown emphasis
# markers, followed by an ASCII or fullwidth (U+FF1A) colon.
_CHANGE_SUMMARY_RE = re.compile(
    r"^[\s*_]*(?:CHANGE[\s_-]?SUMMARY)\s*[:\uff1a]\s*(.+)",
    re.IGNORECASE,
)


def extract_change_summary(content: str) -> tuple[str, str]:
    """Extract ``CHANGE_SUMMARY`` from LLM output.

    Only the first non-blank line is inspected.  If it carries a change
    summary, that line and the blank lines following it are removed.

    Returns ``(clean_content, change_summary)``; when no summary is found the
    result is ``(content, "")`` with *content* untouched.
    """
    lines = content.split("\n")

    # Index of the first non-blank line, or None when everything is blank.
    first_nonblank = next(
        (i for i, line in enumerate(lines) if line.strip()),
        None,
    )
    if first_nonblank is None:
        return content, ""

    m = _CHANGE_SUMMARY_RE.match(lines[first_nonblank])
    if not m:
        return content, ""

    # Strip markdown bold/italic markers (** or __) from both ends.
    summary = m.group(1).strip().strip("*_").strip()

    # Skip blank lines after the summary line to find content start.
    content_start = first_nonblank + 1
    while content_start < len(lines) and not lines[content_start].strip():
        content_start += 1

    rest = "\n".join(lines[content_start:])
    return rest.strip(), summary


def validate_skill_dir(skill_dir: Path) -> Optional[str]:
    """Validate a skill directory after edit application.

    Returns None if valid, or an error message string.

    Checks:
      1. Directory exists (and really is a directory)
      2. SKILL.md exists and is non-empty
      3. SKILL.md has valid YAML frontmatter with ``name`` field
      4. No empty files (warning-level, not blocking)
    """
    if not skill_dir.exists():
        return f"Skill directory does not exist: {skill_dir}"
    if not skill_dir.is_dir():
        return f"Skill path is not a directory: {skill_dir}"

    skill_file = skill_dir / SKILL_FILENAME
    if not skill_file.exists():
        return f"SKILL.md not found in {skill_dir}"

    try:
        content = skill_file.read_text(encoding="utf-8")
    except (OSError, ValueError) as e:
        # ValueError also covers UnicodeDecodeError for non-UTF-8 files.
        return f"Cannot read SKILL.md: {e}"

    if not content.strip():
        return "SKILL.md is empty"

    # Check frontmatter
    if not content.startswith("---"):
        return "SKILL.md missing YAML frontmatter (should start with '---')"
    if not _FRONTMATTER_RE.match(content):
        return "SKILL.md has malformed YAML frontmatter (missing closing '---')"

    # Check for required 'name' field in frontmatter (absent or empty fails).
    if not get_frontmatter_field(content, "name"):
        return "SKILL.md frontmatter missing 'name' field"

    # Non-blocking checks: log warnings for empty auxiliary files
    for p in skill_dir.rglob("*"):
        if p.is_file() and p != skill_file:
            try:
                if p.stat().st_size == 0:
                    logger.warning(f"Validation: empty auxiliary file: {p.relative_to(skill_dir)}")
            except OSError:
                # Best effort: a file that cannot be stat'ed is skipped.
                pass

    return None


def truncate(text: str, max_chars: int) -> str:
    """Truncate *text* to *max_chars* with an ellipsis marker.

    Text that already fits is returned unchanged.  A negative *max_chars* is
    treated as 0 instead of slicing from the end of the text.
    """
    if len(text) <= max_chars:
        return text
    limit = max(max_chars, 0)
    return text[:limit] + f"\n\n... [truncated at {limit} chars]"