"""Parse and retrieve viral hook examples from a local zip or directory."""
from __future__ import annotations
import hashlib
import os
import re
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
from humeo.config import PipelineConfig
# Matches one numbered library entry on a single markdown line, e.g.
#   "1. Hook: ...<br>Example: ...<br>Psychology: ..."
# The three named groups capture the entry's fields; matching is case-insensitive.
_ENTRY_RE = re.compile(
    r"^\s*\d+\.\s*Hook:\s*(?P<hook>.+?)<br>Example:\s*(?P<example>.+?)<br>Psychology:\s*(?P<psychology>.+?)\s*$",
    re.IGNORECASE,
)
# Runs of lowercase letters, digits, and apostrophes; used by the tokenizers below
# (input is lowercased before matching, so no uppercase range is needed).
_TOKEN_RE = re.compile(r"[a-z0-9']+")
@dataclass(frozen=True)
class HookExample:
    """One parsed hook entry from the library's markdown files."""

    # Category derived from the markdown file name (e.g. "Curiosity" from "Curiosity_Hooks.md").
    category: str
    # The hook template/headline text.
    hook: str
    # A concrete example sentence using the hook.
    example: str
    # Why the hook works, taken from the entry's "Psychology:" field.
    psychology: str
_LIB_CACHE: dict[str, list[HookExample]] = {}
def resolve_hook_library_path(config: PipelineConfig | None = None) -> Path | None:
    """Locate the hook library, preferring explicit config over the environment.

    Returns the configured path when present, otherwise the
    ``HUMEO_HOOK_LIBRARY_PATH`` environment variable (whitespace-stripped,
    ``~``-expanded), otherwise ``None``.
    """
    if config is not None and config.hook_library_path is not None:
        return Path(config.hook_library_path)
    env_value = os.environ.get("HUMEO_HOOK_LIBRARY_PATH", "").strip()
    if not env_value:
        return None
    return Path(env_value).expanduser()
def require_hook_library_path(config: PipelineConfig | None = None) -> Path:
    """Return the hook library path, failing loudly when it is unset or absent.

    Raises:
        FileNotFoundError: when no path is configured at all, or when the
            resolved path does not exist on disk.
    """
    resolved = resolve_hook_library_path(config)
    if resolved is None:
        raise FileNotFoundError(
            "HUMEO_HOOK_LIBRARY_PATH is required for the hook retrieval workflow."
        )
    if not resolved.exists():
        raise FileNotFoundError(f"Hook library path does not exist: {resolved}")
    return resolved
def hook_library_fingerprint(path: Path | None) -> str:
    """Return a SHA-256 hex digest identifying the library content, or "" if absent.

    A single file (e.g. a zip) is hashed whole. A directory hashes every
    ``*.md`` file's relative name plus bytes in sorted order, so both renames
    and content edits change the digest.
    """
    if path is None or not path.exists():
        return ""
    digest = hashlib.sha256()
    if path.is_file():
        digest.update(path.read_bytes())
    else:
        for md_file in sorted(p for p in path.rglob("*.md") if p.is_file()):
            digest.update(str(md_file.relative_to(path)).encode("utf-8"))
            digest.update(md_file.read_bytes())
    return digest.hexdigest()
def _tokenize(text: str) -> set[str]:
return {m.group(0) for m in _TOKEN_RE.finditer(text.lower()) if len(m.group(0)) > 2}
def _ordered_tokens(text: str) -> list[str]:
return [m.group(0) for m in _TOKEN_RE.finditer(text.lower()) if len(m.group(0)) > 2]
def _iter_markdown_files(path: Path) -> Iterable[tuple[str, str]]:
if path.is_file():
with zipfile.ZipFile(path) as zf:
for name in sorted(n for n in zf.namelist() if n.endswith(".md")):
yield name, zf.read(name).decode("utf-8", errors="replace")
return
for md_path in sorted(p for p in path.rglob("*.md") if p.is_file()):
yield str(md_path.relative_to(path)).replace("\\", "/"), md_path.read_text(
encoding="utf-8", errors="replace"
)
def _category_from_name(name: str) -> str:
stem = Path(name).stem
stem = stem.replace("_Hooks", "").replace("_", " ").strip()
return stem
def _parse_examples(path: Path) -> list[HookExample]:
    """Parse every markdown file under *path* into :class:`HookExample` records.

    Only numbered lines matching the ``Hook: ...<br>Example: ...<br>Psychology: ...``
    layout are kept; all other lines are silently skipped.
    """
    results: list[HookExample] = []
    for name, content in _iter_markdown_files(path):
        category = _category_from_name(name)
        for raw in content.splitlines():
            candidate = raw.strip()
            # Cheap pre-filter: every real entry begins with its list number.
            if not candidate or not candidate[0].isdigit():
                continue
            parsed = _ENTRY_RE.match(candidate)
            if parsed is None:
                continue
            results.append(
                HookExample(
                    category=category,
                    hook=parsed.group("hook").strip(),
                    example=parsed.group("example").strip(),
                    psychology=parsed.group("psychology").strip(),
                )
            )
    return results
def load_hook_library(path: Path | None) -> list[HookExample]:
    """Load (and memoize) all hook examples found at *path*.

    Results are cached per content fingerprint, so edits to the library on
    disk invalidate stale entries automatically. Returns an empty list when
    the path is unset or its fingerprint cannot be computed.
    """
    if path is None:
        return []
    key = hook_library_fingerprint(path)
    if not key:
        return []
    try:
        return _LIB_CACHE[key]
    except KeyError:
        examples = _parse_examples(path)
        _LIB_CACHE[key] = examples
        return examples
def retrieve_hook_examples(
    query_text: str,
    *,
    topic: str = "",
    path: Path | None,
    limit: int = 8,
) -> list[HookExample]:
    """Return up to *limit* hook examples ranked by lexical overlap with the query.

    Scoring: bigram phrase matches inside the example text weigh heaviest (x5),
    then example-token overlap (x3), then hook/category overlap (x1);
    psychology overlap only participates in tie-breaking. Falls back to the
    first *limit* items when the query yields no usable tokens or nothing
    overlaps at all.
    """
    items = load_hook_library(path)
    if not items:
        return []

    # Build and tokenize the combined query exactly once. (The original
    # rebuilt the f-string three times and ran _ordered_tokens twice.)
    combined = f"{topic} {query_text}"
    query_tokens = _tokenize(combined)
    ordered = _ordered_tokens(combined)
    query_phrases = [" ".join(pair) for pair in zip(ordered, ordered[1:])]

    if not query_tokens:
        return items[:limit]

    # Each score is a 4-tuple (the original annotation claimed 3 — fixed):
    # (weighted total, phrase bonus, example overlap, category+psychology tiebreak).
    scored: list[tuple[tuple[int, int, int, int], HookExample]] = []
    for item in items:
        hook_overlap = len(query_tokens & _tokenize(item.hook))
        example_overlap = len(query_tokens & _tokenize(item.example))
        category_overlap = len(query_tokens & _tokenize(item.category))
        if hook_overlap + example_overlap + category_overlap == 0:
            continue
        # Psychology text is scored lazily — only for items that already match.
        psychology_overlap = len(query_tokens & _tokenize(item.psychology))
        example_lower = item.example.lower()  # hoisted out of the phrase loop
        phrase_bonus = sum(1 for phrase in query_phrases if phrase in example_lower)
        scored.append(
            (
                (
                    phrase_bonus * 5 + example_overlap * 3 + hook_overlap + category_overlap,
                    phrase_bonus,
                    example_overlap,
                    category_overlap + psychology_overlap,
                ),
                item,
            )
        )
    if not scored:
        return items[:limit]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [item for _, item in scored[:limit]]
def format_hook_examples(examples: list[HookExample]) -> str:
    """Render *examples* as a numbered, human-readable block (one entry per 3 lines)."""
    rendered = [
        f"{idx}. [{item.category}] Hook: {item.hook}\n"
        f"   Example: {item.example}\n"
        f"   Psychology: {item.psychology}"
        for idx, item in enumerate(examples, start=1)
    ]
    # join() of an empty list is "", which covers the no-examples case.
    return "\n".join(rendered)