File size: 7,954 Bytes
eda316b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""Video ingest cache: YouTube id → work directory + manifest on disk."""

from __future__ import annotations

import hashlib
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field

from humeo.env import default_humeo_cache_root

logger = logging.getLogger(__name__)

# YouTube URL shapes that carry an 11-character video id (captured as group 1):
#   - youtube.com/watch?v=<id>, also when ``v`` is not the first query
#     parameter (e.g. ``watch?feature=share&v=<id>``)
#   - youtu.be/<id>
#   - youtube.com/embed/<id>, /v/<id>, /shorts/<id>, /live/<id>
# The pattern is unanchored and meant for ``search``, so the scheme and any
# subdomain prefix (www., m., music.) are optional.
_YOUTUBE_ID_RE = re.compile(
    # ``(?:[^#\s&]+&)*?`` lazily skips whole ``key=value&`` pairs before ``v=``,
    # so a leading ``v=`` still wins exactly as it did before.
    r"(?:youtube\.com/watch\?(?:[^#\s&]+&)*?v=|youtu\.be/"
    r"|youtube\.com/(?:embed|v|shorts|live)/)([a-zA-Z0-9_-]{11})"
)

# Default ``version`` value stamped on a new ``VideoCacheManifest``.
MANIFEST_VERSION = 1
# File name of the global manifest inside the cache root (see ``manifest_path``).
MANIFEST_NAME = "video_cache_manifest.json"
# Per-work-dir sidecar recording which local file ``source.mp4`` was taken from
# (written by ``write_local_source_info``).
LOCAL_SOURCE_INFO_NAME = "source.local.json"


class VideoCacheEntry(BaseModel):
    """One row in the global cache manifest (machine-checkable, Pydantic-only).

    All fields are plain strings; paths are stored as strings, not ``Path``
    objects. ``video_id``, ``work_dir``, ``source_mp4`` and ``transcript_json``
    are required, the remaining fields default to ``""``. The field comments
    describe how ``upsert_manifest_from_info`` fills each value.
    """

    video_id: str  # id the entry is stored under in ``VideoCacheManifest.entries``
    url: str = ""  # ``webpage_url`` from the info dict, else the URL passed by the caller
    title: str = ""  # ``title`` from the info dict
    channel: str = ""  # ``channel`` from the info dict, falling back to ``uploader``
    work_dir: str  # resolved work directory of the ingest
    source_mp4: str  # resolved path of ``source.mp4`` inside ``work_dir``
    transcript_json: str  # resolved path of ``transcript.json`` inside ``work_dir``
    downloaded_at: str = ""  # ISO 8601 UTC when ingest completed


class VideoCacheManifest(BaseModel):
    """Top-level document of the manifest file.

    Parsed by ``load_manifest`` and serialized by ``save_manifest``.
    """

    version: int = MANIFEST_VERSION  # version stamp; defaults to ``MANIFEST_VERSION``
    entries: dict[str, VideoCacheEntry] = Field(default_factory=dict)  # video id -> entry


def extract_youtube_video_id(url: str) -> str | None:
    """Pull the 11-character video id out of ``url``.

    The id is the first capture group of ``_YOUTUBE_ID_RE``; ``None`` is
    returned when that pattern does not occur anywhere in ``url``.
    """
    match = _YOUTUBE_ID_RE.search(url)
    if match is None:
        return None
    return match.group(1)


def looks_like_local_source(source: str) -> bool:
    """Decide whether ``source`` names a local file rather than a remote URL.

    A source counts as local when no YouTube video id can be extracted from it
    and it contains no ``://`` scheme separator.
    """
    is_youtube = bool(extract_youtube_video_id(source))
    return not is_youtube and "://" not in source


def normalize_local_source_path(source: str) -> Path | None:
    """Turn a local ``source`` into an absolute path.

    Returns ``None`` for anything ``looks_like_local_source`` rejects.
    Otherwise ``~`` is expanded and the path is resolved non-strictly, so the
    file does not have to exist.
    """
    if looks_like_local_source(source):
        expanded = Path(source).expanduser()
        return expanded.resolve(strict=False)
    return None


def local_source_cache_key(source: str) -> str | None:
    """Build a deterministic ``<slug>-<digest>`` cache key for a local source.

    ``<slug>`` is the file stem with every run of non-alphanumeric characters
    collapsed to ``-``, trimmed and lower-cased (``video`` when nothing is
    left). ``<digest>`` is the first 16 hex characters of the SHA-256 of the
    absolute path string. Returns ``None`` when ``source`` is not a local path.
    """
    resolved = normalize_local_source_path(source)
    if resolved is None:
        return None

    slug = re.sub(r"[^a-zA-Z0-9]+", "-", resolved.stem).strip("-").lower()
    if not slug:
        slug = "video"

    path_hash = hashlib.sha256(str(resolved).encode("utf-8")).hexdigest()
    return f"{slug}-{path_hash[:16]}"


def _local_source_info_path(work_dir: Path) -> Path:
    """Return where the local-source sidecar file lives inside ``work_dir``."""
    return work_dir.joinpath(LOCAL_SOURCE_INFO_NAME)


def read_local_source_info(work_dir: Path) -> dict[str, str]:
    """Read ``source.local.json`` when present.

    Returns the sidecar's top-level object with keys and values coerced to
    ``str``. Returns ``{}`` when the file is missing, cannot be read or
    decoded, or does not contain a JSON object.

    An unreadable sidecar (for example one truncated by an interrupted write)
    is deliberately treated like a missing one: callers such as
    ``local_source_matches`` then report "no match" instead of crashing the
    cache check.
    """
    path = _local_source_info_path(work_dir)
    if not path.is_file():
        return {}
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Ignoring unreadable local source info %s: %s", path, exc)
        return {}
    if not isinstance(data, dict):
        return {}
    return {str(k): str(v) for k, v in data.items()}


def write_local_source_info(work_dir: Path, source_path: Path) -> Path:
    """Record which local file ``source.mp4`` was produced from.

    Creates ``work_dir`` if needed and writes a one-key JSON object
    (``local_source_path``: the absolute, ``~``-expanded source path) followed
    by a newline. Returns the path of the sidecar file that was written.
    """
    work_dir.mkdir(parents=True, exist_ok=True)
    info_path = _local_source_info_path(work_dir)

    absolute_source = Path(source_path).expanduser().resolve(strict=False)
    document = json.dumps({"local_source_path": str(absolute_source)}, indent=2)

    info_path.write_text(document + "\n", encoding="utf-8")
    return info_path


def local_source_matches(work_dir: Path, source: str) -> bool:
    """Tell whether ``work_dir`` was populated from the local file ``source``.

    Compares the normalized absolute path of ``source`` with the
    ``local_source_path`` recorded in the work directory's sidecar file.
    Always ``False`` when ``source`` is not a local path.
    """
    expected = normalize_local_source_path(source)
    if expected is None:
        return False
    recorded = read_local_source_info(work_dir).get("local_source_path")
    return recorded == str(expected)


def manifest_path(cache_root: Path | None = None) -> Path:
    """Return the manifest file path under ``cache_root``.

    Falls back to ``default_humeo_cache_root()`` when ``cache_root`` is
    ``None``. The root directory is created as a side effect; the manifest
    file itself is not.
    """
    if cache_root is None:
        cache_root = default_humeo_cache_root()
    cache_root.mkdir(parents=True, exist_ok=True)
    return cache_root / MANIFEST_NAME


def load_manifest(cache_root: Path | None = None) -> VideoCacheManifest:
    """Load the cache manifest, or an empty one when no manifest file exists.

    The file content is parsed as JSON and validated against
    ``VideoCacheManifest``; parse and validation errors propagate to the
    caller.
    """
    manifest_file = manifest_path(cache_root)
    if manifest_file.exists():
        with open(manifest_file, encoding="utf-8") as handle:
            raw: Any = json.load(handle)
        return VideoCacheManifest.model_validate(raw)
    return VideoCacheManifest()


def save_manifest(manifest: VideoCacheManifest, cache_root: Path | None = None) -> Path:
    """Write ``manifest`` as indented JSON to the manifest file and return its path.

    The JSON is produced before anything on disk is touched, and it is written
    to a sibling ``.tmp`` file that then replaces the manifest. A failure while
    serializing or writing therefore leaves the previous manifest intact
    rather than a truncated file that ``load_manifest`` cannot parse.

    Raises:
        OSError: if the temporary file cannot be written or moved into place.
    """
    path = manifest_path(cache_root)
    path.parent.mkdir(parents=True, exist_ok=True)

    # Serialize first: opening the target with "w" would truncate it before
    # we know the dump succeeds.
    payload = manifest.model_dump_json(indent=2)

    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        # Same-directory rename: readers see either the old or the new file.
        tmp_path.replace(path)
    except OSError:
        tmp_path.unlink(missing_ok=True)
        raise
    return path


def resolve_work_directory(
    *,
    youtube_url: str,
    explicit_work_dir: Path | None,
    use_video_cache: bool,
    cache_root: Path | None,
) -> Path:
    """Choose (and create) the directory holding ``source.mp4``, ``transcript.json``, etc.

    The first rule that applies wins:

    1. ``explicit_work_dir`` given (CLI ``--work-dir``): that directory.
    2. Video cache disabled: ``.humeo_work`` under the current directory.
    3. Source is a local file path: ``<cache_root>/local/<source_key>/``.
    4. Source carries no YouTube id: ``.humeo_work``.
    5. Otherwise: ``<cache_root>/videos/<video_id>/``.

    ``cache_root`` falls back to ``default_humeo_cache_root()`` when ``None``.
    The returned path is resolved and exists on return (parents are created).
    """

    def _ready(directory: Path) -> Path:
        # Resolve first, then create, so the returned path is the one created.
        resolved = directory.resolve()
        resolved.mkdir(parents=True, exist_ok=True)
        return resolved

    def _cache_base() -> Path:
        # Looked up lazily: only the cache-backed branches need the root.
        return cache_root if cache_root is not None else default_humeo_cache_root()

    if explicit_work_dir is not None:
        return _ready(Path(explicit_work_dir))

    if not use_video_cache:
        return _ready(Path(".humeo_work"))

    source_key = local_source_cache_key(youtube_url)
    if source_key:
        return _ready(_cache_base() / "local" / source_key)

    video_id = extract_youtube_video_id(youtube_url)
    if not video_id:
        return _ready(Path(".humeo_work"))

    return _ready(_cache_base() / "videos" / video_id)


def ingest_complete(work_dir: Path, source: str | None = None) -> bool:
    """Report whether ``work_dir`` already holds a finished ingest for ``source``.

    Both ``source.mp4`` and ``transcript.json`` must exist as files. When
    ``source`` is a local path, the sidecar written at ingest time must also
    name that same path; for ``None`` or non-local sources the two files alone
    are enough.
    """
    for required in ("source.mp4", "transcript.json"):
        if not (work_dir / required).is_file():
            return False

    if source is None or normalize_local_source_path(source) is None:
        return True
    return local_source_matches(work_dir, source)


def read_youtube_info_json(work_dir: Path) -> dict[str, Any]:
    """Read ``source.info.json`` written by yt-dlp ``--write-info-json``.

    Returns the parsed top-level JSON object, or ``{}`` when the file is
    missing or its top-level value is not an object. The non-object guard
    keeps the declared return type honest for callers that use ``.get`` on
    the result, mirroring ``read_local_source_info``.

    Raises:
        ValueError: if the file is not valid JSON (``json.JSONDecodeError``).
    """
    p = work_dir / "source.info.json"
    if not p.is_file():
        return {}
    with open(p, encoding="utf-8") as f:
        data: Any = json.load(f)
    if not isinstance(data, dict):
        return {}
    return data


def upsert_manifest_from_info(
    *,
    work_dir: Path,
    youtube_url: str,
    info: dict[str, Any],
    cache_root: Path | None = None,
) -> None:
    """Record a finished ingest in the global manifest.

    The entry is keyed by ``info["id"]`` or, failing that, by the id extracted
    from ``youtube_url``; an existing entry under the same id is replaced.
    When no id can be determined, nothing is written.

    Args:
        work_dir: Directory holding ``source.mp4`` and ``transcript.json``.
        youtube_url: Source URL; used as the entry URL when ``info`` has no
            ``webpage_url``.
        info: Metadata dict (``id``, ``webpage_url``, ``title``, ``channel``,
            ``uploader`` are read; all are optional).
        cache_root: Cache root passed through to ``load_manifest`` and
            ``save_manifest``.
    """
    video_id = (info.get("id") or extract_youtube_video_id(youtube_url) or "").strip()
    if not video_id:
        logger.debug("No video id for manifest; skipping.")
        return

    # Second-precision UTC timestamp with a trailing "Z" instead of "+00:00".
    stamp = datetime.now(timezone.utc).replace(microsecond=0)
    downloaded_at = stamp.isoformat().replace("+00:00", "Z")

    resolved_dir = work_dir.resolve()
    page_url = info.get("webpage_url") or youtube_url
    channel_name = info.get("channel") or info.get("uploader") or ""

    entry = VideoCacheEntry(
        video_id=video_id,
        url=str(page_url),
        title=str(info.get("title") or ""),
        channel=str(channel_name),
        work_dir=str(resolved_dir),
        source_mp4=str((resolved_dir / "source.mp4").resolve()),
        transcript_json=str((resolved_dir / "transcript.json").resolve()),
        downloaded_at=downloaded_at,
    )

    manifest = load_manifest(cache_root)
    manifest.entries[video_id] = entry
    written_to = save_manifest(manifest, cache_root)
    logger.info("Updated video cache manifest: %s", written_to)