"""Parse and retrieve viral hook examples from a local zip or directory.""" from __future__ import annotations import hashlib import os import re import zipfile from dataclasses import dataclass from pathlib import Path from typing import Iterable from humeo.config import PipelineConfig _ENTRY_RE = re.compile( r"^\s*\d+\.\s*Hook:\s*(?P.+?)
Example:\s*(?P.+?)
Psychology:\s*(?P.+?)\s*$", re.IGNORECASE, ) _TOKEN_RE = re.compile(r"[a-z0-9']+") @dataclass(frozen=True) class HookExample: category: str hook: str example: str psychology: str _LIB_CACHE: dict[str, list[HookExample]] = {} def resolve_hook_library_path(config: PipelineConfig | None = None) -> Path | None: if config is not None and config.hook_library_path is not None: return Path(config.hook_library_path) raw = (os.environ.get("HUMEO_HOOK_LIBRARY_PATH") or "").strip() if raw: return Path(raw).expanduser() return None def require_hook_library_path(config: PipelineConfig | None = None) -> Path: path = resolve_hook_library_path(config) if path is None: raise FileNotFoundError( "HUMEO_HOOK_LIBRARY_PATH is required for the hook retrieval workflow." ) if not path.exists(): raise FileNotFoundError(f"Hook library path does not exist: {path}") return path def hook_library_fingerprint(path: Path | None) -> str: if path is None: return "" if not path.exists(): return "" hasher = hashlib.sha256() if path.is_file(): hasher.update(path.read_bytes()) return hasher.hexdigest() for md_path in sorted(p for p in path.rglob("*.md") if p.is_file()): hasher.update(str(md_path.relative_to(path)).encode("utf-8")) hasher.update(md_path.read_bytes()) return hasher.hexdigest() def _tokenize(text: str) -> set[str]: return {m.group(0) for m in _TOKEN_RE.finditer(text.lower()) if len(m.group(0)) > 2} def _ordered_tokens(text: str) -> list[str]: return [m.group(0) for m in _TOKEN_RE.finditer(text.lower()) if len(m.group(0)) > 2] def _iter_markdown_files(path: Path) -> Iterable[tuple[str, str]]: if path.is_file(): with zipfile.ZipFile(path) as zf: for name in sorted(n for n in zf.namelist() if n.endswith(".md")): yield name, zf.read(name).decode("utf-8", errors="replace") return for md_path in sorted(p for p in path.rglob("*.md") if p.is_file()): yield str(md_path.relative_to(path)).replace("\\", "/"), md_path.read_text( encoding="utf-8", errors="replace" ) def _category_from_name(name: str) -> str: stem = Path(name).stem stem = stem.replace("_Hooks", "").replace("_", " ").strip() return stem def _parse_examples(path: Path) -> list[HookExample]: examples: list[HookExample] = [] for name, content in _iter_markdown_files(path): category = _category_from_name(name) for raw_line in content.splitlines(): line = raw_line.strip() if not line or not line[0].isdigit(): continue match = _ENTRY_RE.match(line) if not match: continue examples.append( HookExample( category=category, hook=match.group("hook").strip(), example=match.group("example").strip(), psychology=match.group("psychology").strip(), ) ) return examples def load_hook_library(path: Path | None) -> list[HookExample]: if path is None: return [] fingerprint = hook_library_fingerprint(path) if not fingerprint: return [] cached = _LIB_CACHE.get(fingerprint) if cached is not None: return cached parsed = _parse_examples(path) _LIB_CACHE[fingerprint] = parsed return parsed def retrieve_hook_examples( query_text: str, *, topic: str = "", path: Path | None, limit: int = 8, ) -> list[HookExample]: items = load_hook_library(path) if not items: return [] query_tokens = _tokenize(f"{topic} {query_text}") query_phrases = [ " ".join(pair) for pair in zip(_ordered_tokens(f"{topic} {query_text}"), _ordered_tokens(f"{topic} {query_text}")[1:]) ] if not query_tokens: return items[:limit] scored: list[tuple[tuple[int, int, int], HookExample]] = [] for item in items: hook_tokens = _tokenize(item.hook) example_tokens = _tokenize(item.example) category_tokens = _tokenize(item.category) 
        hook_overlap = len(query_tokens & hook_tokens)
        example_overlap = len(query_tokens & example_tokens)
        category_overlap = len(query_tokens & category_tokens)
        overlap = hook_overlap + example_overlap + category_overlap
        if overlap == 0:
            continue
        psychology_overlap = len(query_tokens & _tokenize(item.psychology))
        phrase_bonus = sum(1 for phrase in query_phrases if phrase in item.example.lower())
        # Sort key: weighted composite score first, then phrase hits, example
        # overlap, and category/psychology overlap as tie-breakers.
        scored.append(
            (
                (
                    phrase_bonus * 5 + example_overlap * 3 + hook_overlap + category_overlap,
                    phrase_bonus,
                    example_overlap,
                    category_overlap + psychology_overlap,
                ),
                item,
            )
        )
    if not scored:
        return items[:limit]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [item for _, item in scored[:limit]]


def format_hook_examples(examples: list[HookExample]) -> str:
    """Render examples as a numbered, human-readable block."""
    if not examples:
        return ""
    lines: list[str] = []
    for idx, item in enumerate(examples, start=1):
        lines.append(
            f"{idx}. [{item.category}] Hook: {item.hook}\n"
            f"   Example: {item.example}\n"
            f"   Psychology: {item.psychology}"
        )
    return "\n".join(lines)
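

if __name__ == "__main__":
    # Minimal usage sketch, not part of the pipeline: the query and topic below
    # are illustrative, and HUMEO_HOOK_LIBRARY_PATH must point at a real library
    # (zip or directory) for any results to print.
    demo_path = resolve_hook_library_path()
    for demo_item in retrieve_hook_examples(
        "how to stop doomscrolling before bed",
        topic="digital wellness",
        path=demo_path,
        limit=3,
    ):
        print(f"[{demo_item.category}] {demo_item.hook} -> {demo_item.example}")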