""" Step 1 - Ingestion: Download video and generate word-level transcript. Responsibilities: - Download source video from YouTube using yt-dlp. - Extract audio track for transcription. - Generate word-level timestamped transcript. """ import json import logging import os import shutil import subprocess import base64 from math import ceil from pathlib import Path import httpx from humeo.video_cache import local_source_matches, write_local_source_info logger = logging.getLogger(__name__) OPENAI_MAX_UPLOAD_BYTES = 25 * 1024 * 1024 OPENAI_TARGET_UPLOAD_BYTES = 20 * 1024 * 1024 OPENAI_MIN_CHUNK_SEC = 300.0 ELEVENLABS_TRANSCRIBE_URL = "https://api.elevenlabs.io/v1/speech-to-text" TRANSCRIPT_META_FILENAME = "transcript.meta.json" ELEVENLABS_SCRIBE_MODEL = "scribe_v2" _ELEVENLABS_SEGMENT_MAX_GAP_SEC = 0.65 _ELEVENLABS_SEGMENT_MAX_DURATION_SEC = 6.0 _ELEVENLABS_SEGMENT_MAX_WORDS = 18 YTDLP_BROWSER_USER_AGENT = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" ) def _decode_cookie_secret(raw: str) -> str: text = raw.strip() if "\\n" in text and "\n" not in text: text = text.replace("\\n", "\n") return text def _yt_dlp_cookie_file(output_dir: Path) -> Path | None: raw = ( os.environ.get("YTDLP_COOKIES") or os.environ.get("YOUTUBE_COOKIES") or "" ).strip() encoded = ( os.environ.get("YTDLP_COOKIES_B64") or os.environ.get("YOUTUBE_COOKIES_B64") or "" ).strip() if not raw and encoded: try: raw = base64.b64decode(encoded).decode("utf-8") except Exception as exc: raise RuntimeError("Could not decode YTDLP_COOKIES_B64.") from exc if not raw: return None cookie_path = output_dir / "yt-dlp-cookies.txt" cookie_path.write_text(_decode_cookie_secret(raw).rstrip() + "\n", encoding="utf-8") try: cookie_path.chmod(0o600) except OSError: pass return cookie_path def _yt_dlp_impersonate_target() -> str | None: target = (os.environ.get("YTDLP_IMPERSONATE") or "chrome").strip() if target.lower() in {"", "0", "false", "no", "off", "none"}: return None return target def _yt_dlp_ip_family_flag() -> str | None: value = (os.environ.get("YTDLP_IP_FAMILY") or "").strip().lower() if value in {"4", "ipv4"}: return "--force-ipv4" if value in {"6", "ipv6"}: return "--force-ipv6" return None def _yt_dlp_error(exc: subprocess.CalledProcessError) -> RuntimeError: stdout = (exc.stdout or "").strip() stderr = (exc.stderr or "").strip() details = stderr or stdout or str(exc) lowered = details.lower() hint = "" if any(token in lowered for token in ("sign in", "not a bot", "confirm you're not a bot", "cookies")): hint = ( "\n\nYouTube blocked the Hugging Face downloader. Add a Space secret named " "YTDLP_COOKIES_B64 containing a base64 encoded Netscape cookies.txt export " "from a logged-in browser, or upload the MP4 directly." ) elif "unexpected_eof_while_reading" in lowered or "ssl" in lowered: hint = ( "\n\nYouTube closed the TLS connection from Hugging Face. The app will use " "browser TLS impersonation when curl_cffi is installed; if this persists, " "upload the MP4 directly or add YTDLP_COOKIES_B64." ) return RuntimeError(f"yt-dlp failed to download the YouTube video:\n{details}{hint}") def stage_local_video(source: str | Path, output_dir: Path) -> Path: """ Copy a local source video into ``output_dir/source.mp4`` for cacheable reruns. """ source_path = Path(source).expanduser().resolve(strict=False) if not source_path.is_file(): raise FileNotFoundError(f"Local source video does not exist: {source_path}") output_dir.mkdir(parents=True, exist_ok=True) staged_path = output_dir / "source.mp4" staged_resolved = staged_path.resolve(strict=False) if source_path == staged_resolved: logger.info("Using local source video in place: %s", source_path) write_local_source_info(output_dir, source_path) return staged_path if staged_path.exists() and local_source_matches(output_dir, str(source_path)): logger.info("Local source already staged at: %s", staged_path) return staged_path if source_path.suffix.lower() != ".mp4": logger.warning( "Local source uses %s; staging it as source.mp4 anyway.", source_path.suffix or "", ) action = "Replacing" if staged_path.exists() else "Staging" logger.info("%s local video: %s -> %s", action, source_path, staged_path) shutil.copy2(source_path, staged_path) write_local_source_info(output_dir, source_path) return staged_path def download_video(youtube_url: str, output_dir: Path) -> Path: """ Download the best quality video+audio from YouTube. Returns the path to the downloaded MP4 file. """ output_template = str(output_dir / "source.%(ext)s") cmd = [ "yt-dlp", "--format", "bv*[ext=mp4]+ba[ext=m4a]/bv*+ba/best[ext=mp4]/best", "--merge-output-format", "mp4", "--output", output_template, "--no-playlist", "--write-info-json", "--retries", "5", "--fragment-retries", "5", "--extractor-retries", "3", "--socket-timeout", "30", "--user-agent", YTDLP_BROWSER_USER_AGENT, "--extractor-args", (os.environ.get("YTDLP_EXTRACTOR_ARGS") or "youtube:player_client=default,web_creator"), "--quiet", ] ip_family_flag = _yt_dlp_ip_family_flag() if ip_family_flag: cmd.append(ip_family_flag) impersonate_target = _yt_dlp_impersonate_target() if impersonate_target: cmd.extend(["--impersonate", impersonate_target]) if shutil.which("node"): cmd.extend(["--js-runtimes", "node", "--remote-components", "ejs:github"]) cookie_path = _yt_dlp_cookie_file(output_dir) if cookie_path is not None: cmd.extend(["--cookies", str(cookie_path)]) cmd.append(youtube_url) logger.info("Downloading video: %s", youtube_url) try: result = subprocess.run(cmd, check=True, capture_output=True, text=True) except subprocess.CalledProcessError as exc: raise _yt_dlp_error(exc) from exc if result.stderr: logger.warning(result.stderr.strip()) # yt-dlp should produce source.mp4 video_path = output_dir / "source.mp4" if not video_path.exists(): # Fallback: find any mp4 in the output dir mp4_files = list(output_dir.glob("source.*")) if mp4_files: video_path = mp4_files[0] else: raise FileNotFoundError(f"Download failed - no output found in {output_dir}") logger.info("Downloaded to: %s", video_path) return video_path def extract_audio(video_path: Path, output_dir: Path) -> Path: """ Extract audio track from video as WAV (required by most ASR models). """ audio_path = output_dir / "source_audio.wav" cmd = [ "ffmpeg", "-y", "-i", str(video_path), "-vn", # no video "-acodec", "pcm_s16le", # raw PCM "-ar", "16000", # 16kHz sample rate (standard for ASR) "-ac", "1", # mono str(audio_path), ] logger.info("Extracting audio to: %s", audio_path) subprocess.run(cmd, check=True, capture_output=True) return audio_path def _resolve_elevenlabs_api_key() -> str: key = (os.environ.get("ELEVENLABS_API_KEY") or "").strip() if key: return key raise ValueError("Set ELEVENLABS_API_KEY to use ElevenLabs Scribe v2 transcription.") def _elevenlabs_no_verbatim_enabled() -> bool: raw = (os.environ.get("ELEVENLABS_NO_VERBATIM") or "true").strip().lower() return raw not in {"0", "false", "no", "off"} def resolved_transcribe_settings() -> dict[str, object]: provider = (os.environ.get("HUMEO_TRANSCRIBE_PROVIDER") or "elevenlabs").strip().lower() if provider in ("", "auto"): if (os.environ.get("ELEVENLABS_API_KEY") or "").strip(): provider = "elevenlabs" else: provider = "openai" if provider in ("api",): provider = "openai" if provider in ("local",): provider = "whisperx" settings: dict[str, object] = {"provider": provider} if provider == "elevenlabs": settings.update( { "model_id": ELEVENLABS_SCRIBE_MODEL, "no_verbatim": _elevenlabs_no_verbatim_enabled(), } ) return settings def transcript_cache_valid(output_dir: Path) -> bool: transcript_path = output_dir / "transcript.json" meta_path = output_dir / TRANSCRIPT_META_FILENAME if not transcript_path.is_file() or not meta_path.is_file(): return False try: meta = json.loads(meta_path.read_text(encoding="utf-8")) except Exception: return False return meta == resolved_transcribe_settings() def _write_transcript(output_dir: Path, transcript: dict) -> None: transcript_path = output_dir / "transcript.json" with open(transcript_path, "w", encoding="utf-8") as f: json.dump(transcript, f, indent=2, ensure_ascii=False) with open(output_dir / TRANSCRIPT_META_FILENAME, "w", encoding="utf-8") as f: json.dump(resolved_transcribe_settings(), f, indent=2, ensure_ascii=False) f.write("\n") def _normalize_elevenlabs_word(raw_word: dict) -> dict | None: if not isinstance(raw_word, dict): return None if str(raw_word.get("type", "word")).strip().lower() not in {"word", ""}: return None text = str(raw_word.get("text", raw_word.get("word", ""))).strip() if not text: return None try: start = float(raw_word["start"]) end = float(raw_word["end"]) except (KeyError, TypeError, ValueError): return None if end <= start: return None return {"word": text, "start": start, "end": end} def _segment_words_into_transcript(words: list[dict], *, language: str) -> dict: segments: list[dict] = [] chunk: list[dict] = [] def flush() -> None: if not chunk: return segments.append( { "start": chunk[0]["start"], "end": chunk[-1]["end"], "text": " ".join(str(word["word"]) for word in chunk).strip(), "words": list(chunk), } ) chunk.clear() for word in words: if chunk: gap = float(word["start"]) - float(chunk[-1]["end"]) dur = float(word["end"]) - float(chunk[0]["start"]) if ( gap >= _ELEVENLABS_SEGMENT_MAX_GAP_SEC or dur >= _ELEVENLABS_SEGMENT_MAX_DURATION_SEC or len(chunk) >= _ELEVENLABS_SEGMENT_MAX_WORDS ): flush() chunk.append(word) flush() return {"segments": segments, "language": language} def _normalize_elevenlabs_response(data: dict) -> dict: words = [ word for raw_word in data.get("words", []) or [] if (word := _normalize_elevenlabs_word(raw_word)) is not None ] language = str( data.get("language_code") or data.get("language") or "en" ).strip() or "en" return _segment_words_into_transcript(words, language=language) def _transcribe_elevenlabs_scribe(audio_path: Path) -> dict: headers = {"xi-api-key": _resolve_elevenlabs_api_key()} form = { "model_id": ELEVENLABS_SCRIBE_MODEL, "timestamps_granularity": "word", "diarize": "false", "tag_audio_events": "false", "file_format": "pcm_s16le_16", "no_verbatim": "true" if _elevenlabs_no_verbatim_enabled() else "false", } with audio_path.open("rb") as handle: files = {"file": (audio_path.name, handle, "audio/wav")} response = httpx.post( ELEVENLABS_TRANSCRIBE_URL, headers=headers, data=form, files=files, timeout=600.0, ) response.raise_for_status() return _normalize_elevenlabs_response(response.json()) def _transcribe_whisperx_local(audio_path: Path) -> dict: """Word-level transcript via WhisperX (local). Raises ImportError if not installed.""" import whisperx logger.info("Transcribing with WhisperX...") device = "cpu" # Use "cuda" if GPU available model = whisperx.load_model("base", device=device, compute_type="int8") audio = whisperx.load_audio(str(audio_path)) result = model.transcribe(audio, batch_size=16) align_model, metadata = whisperx.load_align_model( language_code=result["language"], device=device ) result = whisperx.align( result["segments"], align_model, metadata, audio, device, return_char_alignments=False, ) logger.info("Transcription complete: %d segments", len(result["segments"])) return result def transcribe_whisperx(audio_path: Path, output_dir: Path) -> dict: """ Transcribe audio for word-level timestamps. Provider is controlled by **HUMEO_TRANSCRIBE_PROVIDER** (default ``auto``): - ``auto`` — WhisperX if installed, else OpenAI Whisper API. - ``openai`` / ``api`` — OpenAI Whisper API (uses ``OPENAI_API_KEY``), even when WhisperX is installed. - ``whisperx`` / ``local`` — WhisperX only; fails clearly if not installed. The result is written to ``output_dir / "transcript.json"``. Re-runs with an existing transcript are skipped by the pipeline before this function runs. """ settings = resolved_transcribe_settings() provider = str(settings["provider"]) if provider == "elevenlabs": logger.info( "Transcribing with ElevenLabs Scribe v2 (no_verbatim=%s).", bool(settings.get("no_verbatim", False)), ) result = _transcribe_elevenlabs_scribe(audio_path) elif provider == "openai": logger.info( "Transcribing with OpenAI Whisper API (HUMEO_TRANSCRIBE_PROVIDER=%s).", provider, ) result = _transcribe_openai_api(audio_path) elif provider == "whisperx": try: result = _transcribe_whisperx_local(audio_path) except ImportError as e: raise RuntimeError( "WhisperX requested (HUMEO_TRANSCRIBE_PROVIDER=whisperx) but whisperx is not installed. " "Install with: uv sync --extra whisper" ) from e else: raise RuntimeError( f"Unknown HUMEO_TRANSCRIBE_PROVIDER={provider!r}. " "Use elevenlabs, openai, or whisperx." ) _write_transcript(output_dir, result) return result def _transcribe_openai_api(audio_path: Path) -> dict: """ Fallback transcription using OpenAI's Whisper API. Requires OPENAI_API_KEY environment variable. """ from openai import OpenAI client = OpenAI() work_dir = audio_path.parent / "openai_transcribe" work_dir.mkdir(parents=True, exist_ok=True) duration_sec = _probe_media_duration(audio_path) chunk_ranges = _plan_openai_chunk_ranges( duration_sec=duration_sec, file_size_bytes=audio_path.stat().st_size, ) if len(chunk_ranges) == 1: return _transcribe_openai_file(client, audio_path) logger.info("Audio exceeds OpenAI upload limit; transcribing in %d chunks.", len(chunk_ranges)) chunk_transcripts: list[dict] = [] for idx, (offset_sec, chunk_duration_sec) in enumerate(chunk_ranges, start=1): chunk_path = work_dir / f"{audio_path.stem}_part_{idx:03d}.wav" if not chunk_path.exists(): _extract_openai_audio_chunk( input_path=audio_path, output_path=chunk_path, offset_sec=offset_sec, duration_sec=chunk_duration_sec, ) logger.info( "Transcribing chunk %d/%d (%.1fs-%.1fs)", idx, len(chunk_ranges), offset_sec, offset_sec + chunk_duration_sec, ) chunk_transcript = _transcribe_openai_file(client, chunk_path) chunk_transcripts.append(_offset_transcript_timestamps(chunk_transcript, offset_sec)) return _merge_transcripts(chunk_transcripts) def _extract_openai_audio_chunk( input_path: Path, output_path: Path, offset_sec: float, duration_sec: float, ) -> Path: cmd = [ "ffmpeg", "-y", "-loglevel", "error", "-ss", f"{offset_sec:.3f}", "-t", f"{duration_sec:.3f}", "-i", str(input_path), "-vn", "-acodec", "pcm_s16le", "-ac", "1", "-ar", "16000", str(output_path), ] subprocess.run(cmd, check=True, capture_output=True) return output_path def _probe_media_duration(media_path: Path) -> float: cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "json", str(media_path), ] result = subprocess.run(cmd, check=True, capture_output=True, text=True) data = json.loads(result.stdout) return float(data["format"]["duration"]) def _plan_openai_chunk_ranges( *, duration_sec: float, file_size_bytes: int, max_upload_bytes: int = OPENAI_MAX_UPLOAD_BYTES, target_upload_bytes: int = OPENAI_TARGET_UPLOAD_BYTES, ) -> list[tuple[float, float]]: if file_size_bytes <= max_upload_bytes: return [(0.0, duration_sec)] chunk_sec = max( OPENAI_MIN_CHUNK_SEC, duration_sec * (target_upload_bytes / file_size_bytes), ) chunk_count = max(2, ceil(duration_sec / chunk_sec)) exact_chunk_sec = duration_sec / chunk_count ranges: list[tuple[float, float]] = [] for idx in range(chunk_count): start = idx * exact_chunk_sec end = min(duration_sec, (idx + 1) * exact_chunk_sec) ranges.append((round(start, 3), round(end - start, 3))) return ranges def _transcribe_openai_file(client, audio_path: Path) -> dict: with open(audio_path, "rb") as f: response = client.audio.transcriptions.create( model="whisper-1", file=f, response_format="verbose_json", timestamp_granularities=["word", "segment"], ) return _normalize_openai_response(response) def _normalize_openai_response(response: object) -> dict: data = response.model_dump() if hasattr(response, "model_dump") else response if not isinstance(data, dict): raise TypeError(f"Unexpected transcription payload type: {type(data)!r}") top_words = [_normalize_word(word) for word in data.get("words", []) or []] segments: list[dict] = [] word_index = 0 for raw_segment in data.get("segments", []) or []: segment = raw_segment.model_dump() if hasattr(raw_segment, "model_dump") else raw_segment if not isinstance(segment, dict): continue start = float(segment.get("start", 0.0)) end = float(segment.get("end", 0.0)) text = str(segment.get("text", "")).strip() segment_words = [_normalize_word(word) for word in segment.get("words", []) or []] if not segment_words and top_words: while word_index < len(top_words) and top_words[word_index]["end"] <= start: word_index += 1 probe_index = word_index while probe_index < len(top_words) and top_words[probe_index]["start"] < end: word = top_words[probe_index] if word["end"] > start: segment_words.append(word) probe_index += 1 word_index = probe_index segments.append( { "start": start, "end": end, "text": text, "words": segment_words, } ) if not segments and top_words: segments.append( { "start": top_words[0]["start"], "end": top_words[-1]["end"], "text": " ".join(word["word"] for word in top_words).strip(), "words": top_words, } ) return { "segments": segments, "language": str(data.get("language", "en") or "en"), } def _normalize_word(raw_word: object) -> dict: word = raw_word.model_dump() if hasattr(raw_word, "model_dump") else raw_word if not isinstance(word, dict): return {"word": "", "start": 0.0, "end": 0.0} return { "word": str(word.get("word", "")).strip(), "start": float(word.get("start", 0.0)), "end": float(word.get("end", 0.0)), } def _offset_transcript_timestamps(transcript: dict, offset_sec: float) -> dict: shifted_segments = [] for segment in transcript.get("segments", []): shifted_segments.append( { "start": float(segment["start"]) + offset_sec, "end": float(segment["end"]) + offset_sec, "text": segment["text"], "words": [ { "word": word["word"], "start": float(word["start"]) + offset_sec, "end": float(word["end"]) + offset_sec, } for word in segment.get("words", []) ], } ) return { "segments": shifted_segments, "language": transcript.get("language", "en"), } def _merge_transcripts(transcripts: list[dict]) -> dict: merged_segments = [] language = "en" for transcript in transcripts: merged_segments.extend(transcript.get("segments", [])) if transcript.get("language"): language = transcript["language"] return { "segments": merged_segments, "language": language, }