"""Video ingest cache: YouTube id → work directory + manifest on disk.""" from __future__ import annotations import hashlib import json import logging import re from datetime import datetime, timezone from pathlib import Path from typing import Any from pydantic import BaseModel, Field from humeo.env import default_humeo_cache_root logger = logging.getLogger(__name__) # Typical watch / short / embed URLs (11-char id). _YOUTUBE_ID_RE = re.compile( r"(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/|youtube\.com/v/)([a-zA-Z0-9_-]{11})" ) MANIFEST_VERSION = 1 MANIFEST_NAME = "video_cache_manifest.json" LOCAL_SOURCE_INFO_NAME = "source.local.json" class VideoCacheEntry(BaseModel): """One row in the global cache manifest (machine-checkable, Pydantic-only).""" video_id: str url: str = "" title: str = "" channel: str = "" work_dir: str source_mp4: str transcript_json: str downloaded_at: str = "" # ISO 8601 UTC when ingest completed class VideoCacheManifest(BaseModel): version: int = MANIFEST_VERSION entries: dict[str, VideoCacheEntry] = Field(default_factory=dict) def extract_youtube_video_id(url: str) -> str | None: """Return the 11-character video id, or None if not a recognized YouTube URL.""" m = _YOUTUBE_ID_RE.search(url) return m.group(1) if m else None def looks_like_local_source(source: str) -> bool: """Return True when ``source`` should be treated as a local file path.""" if extract_youtube_video_id(source): return False return "://" not in source def normalize_local_source_path(source: str) -> Path | None: """Return an absolute local path for ``source`` when it is file-like.""" if not looks_like_local_source(source): return None return Path(source).expanduser().resolve(strict=False) def local_source_cache_key(source: str) -> str | None: """Return a stable cache key for a local source path.""" path = normalize_local_source_path(source) if path is None: return None stem = re.sub(r"[^a-zA-Z0-9]+", "-", path.stem).strip("-").lower() or "video" digest = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:16] return f"{stem}-{digest}" def _local_source_info_path(work_dir: Path) -> Path: return work_dir / LOCAL_SOURCE_INFO_NAME def read_local_source_info(work_dir: Path) -> dict[str, str]: """Read ``source.local.json`` when present.""" path = _local_source_info_path(work_dir) if not path.is_file(): return {} with open(path, encoding="utf-8") as f: data = json.load(f) if not isinstance(data, dict): return {} return {str(k): str(v) for k, v in data.items()} def write_local_source_info(work_dir: Path, source_path: Path) -> Path: """Persist the original local source path used for ``source.mp4``.""" work_dir.mkdir(parents=True, exist_ok=True) path = _local_source_info_path(work_dir) payload = {"local_source_path": str(Path(source_path).expanduser().resolve(strict=False))} with open(path, "w", encoding="utf-8") as f: json.dump(payload, f, indent=2) f.write("\n") return path def local_source_matches(work_dir: Path, source: str) -> bool: """Return True when ``work_dir`` already contains the same local source.""" path = normalize_local_source_path(source) if path is None: return False info = read_local_source_info(work_dir) return info.get("local_source_path") == str(path) def manifest_path(cache_root: Path | None = None) -> Path: root = cache_root if cache_root is not None else default_humeo_cache_root() root.mkdir(parents=True, exist_ok=True) return root / MANIFEST_NAME def load_manifest(cache_root: Path | None = None) -> VideoCacheManifest: path = manifest_path(cache_root) if not path.exists(): return VideoCacheManifest() with open(path, encoding="utf-8") as f: data: Any = json.load(f) return VideoCacheManifest.model_validate(data) def save_manifest(manifest: VideoCacheManifest, cache_root: Path | None = None) -> Path: path = manifest_path(cache_root) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8") as f: f.write(manifest.model_dump_json(indent=2)) return path def resolve_work_directory( *, youtube_url: str, explicit_work_dir: Path | None, use_video_cache: bool, cache_root: Path | None, ) -> Path: """Pick the directory for ``source.mp4``, ``transcript.json``, ``clips.json``, etc. - If ``explicit_work_dir`` is set (CLI ``--work-dir``), use it. - Else if video cache is disabled, use ``.humeo_work``. - Else if the source is a local file path, use ``/local//``. - Else if the source has no YouTube id, use ``.humeo_work``. - Else use ``/videos//`` (creates parents as needed). """ if explicit_work_dir is not None: p = Path(explicit_work_dir).resolve() p.mkdir(parents=True, exist_ok=True) return p if not use_video_cache: p = Path(".humeo_work").resolve() p.mkdir(parents=True, exist_ok=True) return p local_key = local_source_cache_key(youtube_url) if local_key: root = cache_root if cache_root is not None else default_humeo_cache_root() p = (root / "local" / local_key).resolve() p.mkdir(parents=True, exist_ok=True) return p vid = extract_youtube_video_id(youtube_url) if not vid: p = Path(".humeo_work").resolve() p.mkdir(parents=True, exist_ok=True) return p root = cache_root if cache_root is not None else default_humeo_cache_root() p = (root / "videos" / vid).resolve() p.mkdir(parents=True, exist_ok=True) return p def ingest_complete(work_dir: Path, source: str | None = None) -> bool: """Return True if both video and transcript exist and match the current source.""" complete = (work_dir / "source.mp4").is_file() and (work_dir / "transcript.json").is_file() if not complete: return False if source is None: return True local_path = normalize_local_source_path(source) if local_path is None: return True return local_source_matches(work_dir, source) def read_youtube_info_json(work_dir: Path) -> dict[str, Any]: """Read ``source.info.json`` written by yt-dlp ``--write-info-json``.""" p = work_dir / "source.info.json" if not p.is_file(): return {} with open(p, encoding="utf-8") as f: return json.load(f) def upsert_manifest_from_info( *, work_dir: Path, youtube_url: str, info: dict[str, Any], cache_root: Path | None = None, ) -> None: """Merge or add a manifest entry after successful ingest.""" vid = (info.get("id") or extract_youtube_video_id(youtube_url) or "").strip() if not vid: logger.debug("No video id for manifest; skipping.") return now = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") wd = work_dir.resolve() entry = VideoCacheEntry( video_id=vid, url=str(info.get("webpage_url") or youtube_url), title=str(info.get("title") or ""), channel=str(info.get("channel") or info.get("uploader") or ""), work_dir=str(wd), source_mp4=str((wd / "source.mp4").resolve()), transcript_json=str((wd / "transcript.json").resolve()), downloaded_at=now, ) manifest = load_manifest(cache_root) manifest.entries[vid] = entry path = save_manifest(manifest, cache_root) logger.info("Updated video cache manifest: %s", path)