"""
Subtitle generation: sidecar files (.srt/.vtt/.txt) and burn-in MP4.

Reuses steps.s2_transcribe.transcribe and steps.s3_translate.translate as
libraries. ffmpeg burn-in goes through subprocess (matches existing s5_sync
pattern but without sharing code, since the styling needs are different).
"""

from __future__ import annotations

import subprocess
from pathlib import Path
from typing import Literal

from steps.s2_transcribe import transcribe
from steps.s3_translate import translate

Format = Literal["srt", "vtt", "txt", "mp4"]
CaptionStyle = Literal["tiktok", "youtube", "minimal"]
Position = Literal["top", "middle", "bottom"]
HAlign = Literal["left", "center", "right"]

# Bounds for user-adjustable knobs. Backend clamps to these regardless of
# what the client sends.
FONT_SIZE_MIN = 12
FONT_SIZE_MAX = 40
MARGIN_V_MIN = 0
MARGIN_V_MAX = 240

# ISO-style short codes Whisper accepts. Names map to UI dropdown labels.
_LANG_CODE = {
    "Auto-detect": "auto",
    "English": "en",
    "Spanish": "es",
    "French": "fr",
    "German": "de",
    "Portuguese": "pt",
    "Italian": "it",
    "Hindi": "hi",
    "Arabic": "ar",
    "Chinese": "zh",
    "Japanese": "ja",
    "Korean": "ko",
    "Russian": "ru",
}


def _is_video(path: Path) -> bool:
    """Return True when the file extension is one of the known video containers."""
    return path.suffix.lower() in {".mp4", ".mov", ".webm", ".mkv", ".avi", ".m4v"}


def _extract_audio(input_path: Path, out_dir: Path) -> Path:
    """Pull a 16kHz mono WAV from the input — what whisper expects.

    Writes ``audio.wav`` into ``out_dir`` (overwriting any existing file) and
    returns its path.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status.
        subprocess.TimeoutExpired: ffmpeg ran longer than 180 seconds.
    """
    audio_path = out_dir / "audio.wav"
    cmd = [
        "ffmpeg", "-y",
        "-i", str(input_path),
        "-vn",
        "-ac", "1",
        "-ar", "16000",
        "-acodec", "pcm_s16le",
        str(audio_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
    if result.returncode != 0:
        # Only the tail of stderr: ffmpeg puts the actual error last.
        raise RuntimeError(f"ffmpeg audio extract failed: {result.stderr[-300:]}")
    return audio_path


def _resolve_lang(name: str) -> str:
    """Map a UI language label to its short code; unknown labels become "auto"."""
    return _LANG_CODE.get(name, "auto")


# ── Caption format writers ─────────────────────────────────────────────

def _seg_text(seg: dict, prefer_translation: bool) -> str:
    """Return the stripped caption text for one segment.

    With ``prefer_translation`` the ``translated_text`` key wins and ``text``
    is the fallback; a missing or empty value yields "".
    """
    if prefer_translation:
        return (seg.get("translated_text") or seg.get("text") or "").strip()
    return (seg.get("text") or "").strip()


def _format_timestamp_srt(t: float) -> str:
    """Format seconds as an SRT timestamp, ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds once, up front, so the rounding
    carries into seconds/minutes/hours (1.9996 -> ``00:00:02,000``) instead of
    producing a four-digit millisecond field. Negative inputs clamp to zero.
    """
    total_ms = max(0, int(round(t * 1000)))
    total_s, ms = divmod(total_ms, 1000)
    h, rem = divmod(total_s, 3600)
    m, s = divmod(rem, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def _format_timestamp_vtt(t: float) -> str:
    """Format seconds as a WebVTT timestamp (SRT form with a "." separator)."""
    return _format_timestamp_srt(t).replace(",", ".")


def write_srt(segments: list[dict], dest: Path, prefer_translation: bool) -> Path:
    """Write ``segments`` to ``dest`` as SubRip and return ``dest``.

    Segments with no text are skipped. Cue numbers count only the cues that
    are actually written, so the numbering stays sequential.
    """
    lines: list[str] = []
    index = 0
    for seg in segments:
        text = _seg_text(seg, prefer_translation)
        if not text:
            continue
        index += 1
        lines.append(str(index))
        lines.append(
            f"{_format_timestamp_srt(seg['start'])} --> {_format_timestamp_srt(seg['end'])}"
        )
        lines.append(text)
        lines.append("")
    dest.write_text("\n".join(lines), encoding="utf-8")
    return dest


def write_vtt(segments: list[dict], dest: Path, prefer_translation: bool) -> Path:
    """Write ``segments`` to ``dest`` as WebVTT and return ``dest``.

    Segments with no text are skipped.
    """
    lines = ["WEBVTT", ""]
    for seg in segments:
        text = _seg_text(seg, prefer_translation)
        if not text:
            continue
        lines.append(
            f"{_format_timestamp_vtt(seg['start'])} --> {_format_timestamp_vtt(seg['end'])}"
        )
        lines.append(text)
        lines.append("")
    dest.write_text("\n".join(lines), encoding="utf-8")
    return dest


def write_txt(segments: list[dict], dest: Path, prefer_translation: bool) -> Path:
    """Write the non-empty segment texts, space-joined, to ``dest``; return ``dest``."""
    texts = (_seg_text(seg, prefer_translation) for seg in segments)
    dest.write_text(" ".join(t for t in texts if t), encoding="utf-8")
    return dest


# ── Burn-in styling ────────────────────────────────────────────────────

# ASS-format alignment codes (libass), arranged as row + column:
#   row: bottom=0, middle=3, top=6
#   col: left=1, center=2, right=3
# So bottom-left=1, bottom-center=2, ..., top-right=9.
_POSITION_ROW = {"bottom": 0, "middle": 3, "top": 6}
_HALIGN_COL = {"left": 1, "center": 2, "right": 3}
_DEFAULT_MARGIN_V = {"bottom": 60, "middle": 0, "top": 60}

# Per-style baseline — font size, stroke/shadow choices. The user can override
# the font size via the slider; everything else stays tied to the style preset.
_STYLE_DEFAULTS: dict[CaptionStyle, dict] = {
    "tiktok": {"font_size": 22, "bold": 1, "border_style": 1, "outline": 3, "shadow": 1},
    "youtube": {"font_size": 18, "bold": 0, "border_style": 4, "outline": 8, "shadow": 0},
    "minimal": {"font_size": 16, "bold": 0, "border_style": 1, "outline": 1, "shadow": 0},
}


def _clamp(value: int, lo: int, hi: int) -> int:
    """Restrict ``value`` to the inclusive range ``lo..hi``."""
    return max(lo, min(hi, value))


def _force_style_for(
    style: CaptionStyle,
    position: Position,
    h_align: HAlign = "center",
    font_size: int | None = None,
    margin_v: int | None = None,
) -> str:
    """Return the comma-separated value for the ffmpeg subtitles ``force_style`` option.

    Args:
        style: Visual preset — sets weight, stroke, shadow defaults.
        position: top / middle / bottom row.
        h_align: left / center / right column.
        font_size: Override the style's default font size (clamped to FONT_SIZE_MIN..MAX).
        margin_v: Override vertical margin in pixels (clamped to MARGIN_V_MIN..MAX).

    Raises:
        KeyError: ``style``, ``position`` or ``h_align`` is not a known value.
    """
    defaults = _STYLE_DEFAULTS[style]
    fs = _clamp(
        font_size if font_size is not None else defaults["font_size"],
        FONT_SIZE_MIN,
        FONT_SIZE_MAX,
    )
    mv = _clamp(
        margin_v if margin_v is not None else _DEFAULT_MARGIN_V[position],
        MARGIN_V_MIN,
        MARGIN_V_MAX,
    )
    align = _POSITION_ROW[position] + _HALIGN_COL[h_align]

    parts = [
        "FontName=Arial",
        f"FontSize={fs}",
        f"Bold={defaults['bold']}",
        "PrimaryColour=&H00FFFFFF",
    ]

    if style == "youtube":
        # White on translucent black box
        parts.append("BackColour=&HB8000000")
    elif style == "minimal":
        # Subtle semi-transparent stroke instead of hard black
        parts.append("OutlineColour=&H80000000")
    else:
        # tiktok — hard black stroke
        parts.append("OutlineColour=&H00000000")

    parts += [
        f"BorderStyle={defaults['border_style']}",
        f"Outline={defaults['outline']}",
        f"Shadow={defaults['shadow']}",
        f"Alignment={align}",
        f"MarginV={mv}",
        # Symmetric horizontal margins so left/right alignment has breathing room
        "MarginL=40",
        "MarginR=40",
    ]
    return ",".join(parts)


def _escape_filter_path(path: Path) -> str:
    """Escape ``path`` for use inside a single-quoted ffmpeg filter option value.

    The value is parsed twice by ffmpeg. The filtergraph parser takes
    everything between single quotes literally, and the option parser then
    treats ``\\`` as the escape character and ``:`` as the option separator.
    So:
      * backslashes are doubled first, so the option parser yields one back;
      * colons are backslash-escaped for the option parser;
      * a single quote cannot appear inside a quoted string, so it is written
        as: a backslash (still quoted), close quote, escaped quote, reopen.
    """
    escaped = str(path).replace("\\", "\\\\").replace(":", r"\:")
    return escaped.replace("'", r"\'\''")


def _burn_in(
    video_path: Path,
    srt_path: Path,
    dest: Path,
    style: CaptionStyle,
    position: Position,
    h_align: HAlign = "center",
    font_size: int | None = None,
    margin_v: int | None = None,
) -> Path:
    """Render captions into the video pixels via ffmpeg + libass.

    Video is re-encoded with libx264; the audio stream is copied unchanged.
    Returns ``dest``.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status.
        subprocess.TimeoutExpired: ffmpeg ran longer than 600 seconds.
    """
    force_style = _force_style_for(style, position, h_align, font_size, margin_v)

    # Both option values are single-quoted so the commas inside force_style
    # and any special characters in the path do not split the filtergraph.
    srt_str = _escape_filter_path(srt_path)
    vf = f"subtitles='{srt_str}':force_style='{force_style}'"

    cmd = [
        "ffmpeg", "-y",
        "-i", str(video_path),
        "-vf", vf,
        "-c:a", "copy",
        "-c:v", "libx264",
        "-preset", "veryfast",
        "-crf", "22",
        str(dest),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
    if result.returncode != 0:
        # Only the tail of stderr: ffmpeg puts the actual error last.
        raise RuntimeError(f"ffmpeg burn-in failed: {result.stderr[-300:]}")
    return dest


# ── Public entry point ────────────────────────────────────────────────

def generate_subtitles(
    *,
    input_path: Path,
    out_dir: Path,
    source_lang_name: str,
    target_lang_name: str,
    fmt: Format,
    style: CaptionStyle = "tiktok",
    position: Position = "bottom",
    h_align: HAlign = "center",
    font_size: int | None = None,
    margin_v: int | None = None,
) -> dict:
    """
    Run the full subtitle pipeline.

    Extracts audio when the input is a video, transcribes it, translates the
    segments unless the target language equals the source (or is
    "Same as source"), then writes the requested output into ``out_dir``.

    Returns:
        {
          "format": "srt" | "vtt" | "txt" | "mp4",
          "filename": name of the file written inside out_dir,
          "segments": number of segments,
          "translated": True when translation was applied,
        }

    Raises:
        ValueError: ``fmt`` is not a supported format, or burn-in was
            requested for a non-video input.
        RuntimeError: transcription produced no segments, or ffmpeg failed.
    """
    # Reject unknown formats up front; otherwise they would fall through to
    # the burn-in branch below after the expensive transcription step.
    if fmt not in ("srt", "vtt", "txt", "mp4"):
        raise ValueError(f"Unsupported format: {fmt!r}")

    is_video = _is_video(input_path)
    if fmt == "mp4" and not is_video:
        raise ValueError("Burn-in requires a video file.")

    # 1. Extract audio (or use as-is)
    audio_path = _extract_audio(input_path, out_dir) if is_video else input_path

    # 2. Transcribe
    src_code = _resolve_lang(source_lang_name)
    segments = transcribe(str(audio_path), language=src_code)
    if not segments:
        raise RuntimeError("Transcription produced no segments.")

    # 3. Translate if requested
    translated = False
    same_as_source = (
        target_lang_name == "Same as source"
        or target_lang_name.lower() == source_lang_name.lower()
    )
    if not same_as_source:
        segments = translate(segments, target_lang_name)
        translated = True

    # 4. Emit
    if fmt == "srt":
        out = write_srt(segments, out_dir / "captions.srt", translated)
    elif fmt == "vtt":
        out = write_vtt(segments, out_dir / "captions.vtt", translated)
    elif fmt == "txt":
        out = write_txt(segments, out_dir / "transcript.txt", translated)
    else:  # mp4
        srt_path = write_srt(segments, out_dir / "captions.srt", translated)
        out = _burn_in(
            input_path,
            srt_path,
            out_dir / "captioned.mp4",
            style,
            position,
            h_align,
            font_size,
            margin_v,
        )

    return {
        "format": fmt,
        "filename": out.name,
        "segments": len(segments),
        "translated": translated,
    }