Spaces:
Running on Zero
Running on Zero
| """ | |
| Subtitle generation: sidecar files (.srt/.vtt/.txt) and burn-in MP4. | |
| Reuses steps.s2_transcribe.transcribe and steps.s3_translate.translate as | |
| libraries. ffmpeg burn-in goes through subprocess (matches existing s5_sync | |
| pattern but without sharing code, since the styling needs are different). | |
| """ | |
| from __future__ import annotations | |
| import subprocess | |
| from pathlib import Path | |
| from typing import Literal | |
| from steps.s2_transcribe import transcribe | |
| from steps.s3_translate import translate | |
# Closed sets of string values accepted by the public entry point.
# They are used only as type hints; nothing here validates at runtime.
Format = Literal[
    "srt",
    "vtt",
    "txt",
    "mp4",
]
CaptionStyle = Literal[
    "tiktok",
    "youtube",
    "minimal",
]
Position = Literal[
    "top",
    "middle",
    "bottom",
]
HAlign = Literal[
    "left",
    "center",
    "right",
]
# Bounds for user-adjustable knobs. The backend clamps to these regardless
# of what the client sends.
FONT_SIZE_MIN, FONT_SIZE_MAX = 12, 40  # caption font size
MARGIN_V_MIN, MARGIN_V_MAX = 0, 240  # vertical margin
| # ISO-style short codes Whisper accepts. Names map to UI dropdown labels. | |
| _LANG_CODE = { | |
| "Auto-detect": "auto", | |
| "English": "en", "Spanish": "es", "French": "fr", "German": "de", | |
| "Portuguese": "pt", "Italian": "it", "Hindi": "hi", "Arabic": "ar", | |
| "Chinese": "zh", "Japanese": "ja", "Korean": "ko", "Russian": "ru", | |
| } | |
| def _is_video(path: Path) -> bool: | |
| return path.suffix.lower() in {".mp4", ".mov", ".webm", ".mkv", ".avi", ".m4v"} | |
def _extract_audio(input_path: Path, out_dir: Path) -> Path:
    """Pull a 16kHz mono WAV from the input - what whisper expects.

    Runs ffmpeg with video disabled and writes 16-bit PCM to
    ``out_dir / "audio.wav"``, overwriting any existing file.

    Args:
        input_path: Media file to read.
        out_dir: Directory that receives ``audio.wav``.

    Returns:
        Path of the WAV file that was written.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status; the message
            carries the last 300 characters of its stderr.
        subprocess.TimeoutExpired: ffmpeg ran longer than 180 seconds.
    """
    wav_path = out_dir / "audio.wav"
    ffmpeg_args = ["ffmpeg", "-y", "-i", str(input_path)]
    # -vn drops video; one channel at 16 kHz, signed 16-bit little-endian PCM.
    ffmpeg_args += ["-vn", "-ac", "1", "-ar", "16000"]
    ffmpeg_args += ["-acodec", "pcm_s16le", str(wav_path)]
    proc = subprocess.run(ffmpeg_args, capture_output=True, text=True, timeout=180)
    if proc.returncode == 0:
        return wav_path
    raise RuntimeError(f"ffmpeg audio extract failed: {proc.stderr[-300:]}")
def _resolve_lang(name: str) -> str:
    """Translate a UI language label into its short code.

    Labels missing from ``_LANG_CODE`` resolve to ``"auto"``.
    """
    try:
        return _LANG_CODE[name]
    except KeyError:
        return "auto"
# ── Caption format writers ─────────────────────────────────────────────
| def _seg_text(seg: dict, prefer_translation: bool) -> str: | |
| if prefer_translation: | |
| return (seg.get("translated_text") or seg.get("text") or "").strip() | |
| return (seg.get("text") or "").strip() | |
| def _format_timestamp_srt(t: float) -> str: | |
| h = int(t // 3600) | |
| m = int((t % 3600) // 60) | |
| s = int(t % 60) | |
| ms = int(round((t - int(t)) * 1000)) | |
| return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}" | |
def _format_timestamp_vtt(t: float) -> str:
    """Format *t* (seconds) as a WebVTT timestamp.

    Same text as the SRT form, with every comma swapped for a dot so the
    millisecond separator is ``.``.
    """
    srt_stamp = _format_timestamp_srt(t)
    return ".".join(srt_stamp.split(","))
def write_srt(segments: list[dict], dest: Path, prefer_translation: bool) -> Path:
    """Write *segments* to *dest* as a SubRip (.srt) file.

    Segments whose selected text is empty are skipped. Cue numbers are
    assigned only to cues that are actually written, so the numbering stays
    sequential (1, 2, 3, ...) even when segments are dropped.

    Args:
        segments: Segment mappings; each emitted segment must provide
            ``"start"`` and ``"end"`` offsets in seconds.
        dest: Output file path; written as UTF-8 and overwritten if present.
        prefer_translation: Forwarded to ``_seg_text`` to choose translated
            text over the original.

    Returns:
        *dest*, for convenient chaining.
    """
    lines: list[str] = []
    cue_number = 0
    for seg in segments:
        text = _seg_text(seg, prefer_translation)
        if not text:
            continue
        # Count only emitted cues; a skipped segment must not leave a gap.
        cue_number += 1
        start = _format_timestamp_srt(seg["start"])
        end = _format_timestamp_srt(seg["end"])
        lines += [str(cue_number), f"{start} --> {end}", text, ""]
    dest.write_text("\n".join(lines), encoding="utf-8")
    return dest
def write_vtt(segments: list[dict], dest: Path, prefer_translation: bool) -> Path:
    """Write *segments* to *dest* as a WebVTT (.vtt) file.

    The file opens with the ``WEBVTT`` header line followed by a blank line;
    each segment with non-empty text becomes an unnumbered cue. Segments
    whose selected text is empty are skipped.

    Args:
        segments: Segment mappings; each emitted segment must provide
            ``"start"`` and ``"end"`` offsets in seconds.
        dest: Output file path; written as UTF-8.
        prefer_translation: Forwarded to ``_seg_text``.

    Returns:
        *dest*.
    """
    cue_lines = ["WEBVTT", ""]
    for seg in segments:
        caption = _seg_text(seg, prefer_translation)
        if caption:
            start = _format_timestamp_vtt(seg["start"])
            end = _format_timestamp_vtt(seg["end"])
            cue_lines.extend((f"{start} --> {end}", caption, ""))
    dest.write_text("\n".join(cue_lines), encoding="utf-8")
    return dest
def write_txt(segments: list[dict], dest: Path, prefer_translation: bool) -> Path:
    """Write the plain-text transcript of *segments* to *dest*.

    The non-empty segment texts are joined with single spaces into one
    line, with no timestamps, and written as UTF-8.

    Args:
        segments: Segment mappings to read text from.
        dest: Output file path.
        prefer_translation: Forwarded to ``_seg_text``.

    Returns:
        *dest*.
    """
    pieces = []
    for seg in segments:
        piece = _seg_text(seg, prefer_translation)
        if piece:
            pieces.append(piece)
    dest.write_text(" ".join(pieces), encoding="utf-8")
    return dest
# ── Burn-in styling ────────────────────────────────────────────────────
| # ASS-format alignment codes (libass), arranged as row + column: | |
| # row: bottom=0, middle=3, top=6 | |
| # col: left=1, center=2, right=3 | |
| # So bottom-left=1, bottom-center=2, ..., top-right=9. | |
| _POSITION_ROW = {"bottom": 0, "middle": 3, "top": 6} | |
| _HALIGN_COL = {"left": 1, "center": 2, "right": 3} | |
| _DEFAULT_MARGIN_V = {"bottom": 60, "middle": 0, "top": 60} | |
| # Per-style baseline β font size, stroke/shadow choices. The user can override | |
| # the font size via the slider; everything else stays tied to the style preset. | |
| _STYLE_DEFAULTS: dict[CaptionStyle, dict] = { | |
| "tiktok": {"font_size": 22, "bold": 1, "border_style": 1, "outline": 3, "shadow": 1}, | |
| "youtube": {"font_size": 18, "bold": 0, "border_style": 4, "outline": 8, "shadow": 0}, | |
| "minimal": {"font_size": 16, "bold": 0, "border_style": 1, "outline": 1, "shadow": 0}, | |
| } | |
| def _clamp(value: int, lo: int, hi: int) -> int: | |
| return max(lo, min(hi, value)) | |
def _force_style_for(
    style: CaptionStyle,
    position: Position,
    h_align: HAlign = "center",
    font_size: int | None = None,
    margin_v: int | None = None,
) -> str:
    """Build the comma-separated ``Key=Value`` list for ffmpeg's ``force_style``.

    Args:
        style: Visual preset; selects weight, stroke, and shadow defaults.
        position: Vertical row: top / middle / bottom.
        h_align: Horizontal column: left / center / right.
        font_size: Overrides the preset's font size; clamped to
            FONT_SIZE_MIN..FONT_SIZE_MAX.
        margin_v: Overrides the vertical margin in pixels; clamped to
            MARGIN_V_MIN..MARGIN_V_MAX.

    Returns:
        A string such as ``"FontName=Arial,FontSize=22,..."`` ready to be
        placed inside ``force_style='...'``.

    Raises:
        KeyError: *style*, *position*, or *h_align* is not a known value.
    """
    preset = _STYLE_DEFAULTS[style]

    requested_size = preset["font_size"] if font_size is None else font_size
    size = _clamp(requested_size, FONT_SIZE_MIN, FONT_SIZE_MAX)

    requested_margin = _DEFAULT_MARGIN_V[position] if margin_v is None else margin_v
    margin = _clamp(requested_margin, MARGIN_V_MIN, MARGIN_V_MAX)

    # NOTE(review): some libass builds are reported to read a force_style
    # Alignment using legacy SSA numbering rather than numpad codes -
    # confirm top/middle placement against the deployed ffmpeg.
    alignment = _POSITION_ROW[position] + _HALIGN_COL[h_align]

    if style == "youtube":
        # White text on a translucent black box.
        colour = "BackColour=&HB8000000"
    elif style == "minimal":
        # Subtle semi-transparent stroke instead of hard black.
        colour = "OutlineColour=&H80000000"
    else:
        # tiktok: hard black stroke.
        colour = "OutlineColour=&H00000000"

    fields = [
        "FontName=Arial",
        f"FontSize={size}",
        f"Bold={preset['bold']}",
        "PrimaryColour=&H00FFFFFF",
        colour,
        f"BorderStyle={preset['border_style']}",
        f"Outline={preset['outline']}",
        f"Shadow={preset['shadow']}",
        f"Alignment={alignment}",
        f"MarginV={margin}",
        # Symmetric horizontal margins so left/right alignment has breathing room.
        "MarginL=40",
        "MarginR=40",
    ]
    return ",".join(fields)
def _burn_in(
    video_path: Path,
    srt_path: Path,
    dest: Path,
    style: CaptionStyle,
    position: Position,
    h_align: HAlign = "center",
    font_size: int | None = None,
    margin_v: int | None = None,
) -> Path:
    """Render captions into the video pixels via ffmpeg + libass.

    Video is re-encoded with libx264 (veryfast, CRF 22); the audio stream is
    copied unchanged.

    Args:
        video_path: Source video.
        srt_path: SRT file whose cues are drawn onto the frames.
        dest: Output video path; overwritten if present.
        style: Caption preset forwarded to ``_force_style_for``.
        position: Vertical placement forwarded to ``_force_style_for``.
        h_align: Horizontal placement forwarded to ``_force_style_for``.
        font_size: Optional font-size override.
        margin_v: Optional vertical-margin override.

    Returns:
        *dest*.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status; the message
            carries the last 300 characters of its stderr.
        subprocess.TimeoutExpired: ffmpeg ran longer than 600 seconds.
    """
    force_style = _force_style_for(style, position, h_align, font_size, margin_v)
    # The filter string is parsed twice by ffmpeg: once as a filtergraph
    # (where the single quotes we wrap the path in are removed and their
    # content is taken literally) and once as the filter's option list
    # (where "\", ":" and "'" are special again). Therefore:
    #   * "\" and ":" get a backslash for the option-level parse;
    #   * "'" cannot be escaped inside a quoted section, so we close the
    #     quote, emit an escaped backslash + escaped quote, and reopen it.
    # Backslashes are handled first so the ones added afterwards are not
    # doubled.
    srt_str = (
        str(srt_path)
        .replace("\\", "\\\\")
        .replace(":", r"\:")
        .replace("'", r"'\\\''")
    )
    vf = f"subtitles='{srt_str}':force_style='{force_style}'"
    cmd = [
        "ffmpeg", "-y",
        "-i", str(video_path),
        "-vf", vf,
        "-c:a", "copy",
        "-c:v", "libx264",
        "-preset", "veryfast",
        "-crf", "22",
        str(dest),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg burn-in failed: {result.stderr[-300:]}")
    return dest
# ── Public entry point ────────────────────────────────────────────────
def generate_subtitles(
    *,
    input_path: Path,
    out_dir: Path,
    source_lang_name: str,
    target_lang_name: str,
    fmt: Format,
    style: CaptionStyle = "tiktok",
    position: Position = "bottom",
    h_align: HAlign = "center",
    font_size: int | None = None,
    margin_v: int | None = None,
) -> dict:
    """Run the full subtitle pipeline and write the result into *out_dir*.

    Steps: extract audio (video inputs only), transcribe, optionally
    translate, then emit the requested format.

    Args:
        input_path: Audio or video file to caption.
        out_dir: Directory that receives all generated files.
        source_lang_name: UI label of the spoken language (e.g. "English",
            "Auto-detect"); unknown labels are treated as auto-detect.
        target_lang_name: UI label of the caption language. "Same as source"
            or a label equal to *source_lang_name* (case-insensitively)
            disables translation.
        fmt: Output format: "srt", "vtt", "txt", or "mp4" (burn-in).
        style: Caption preset, used only for "mp4".
        position: Vertical caption placement, used only for "mp4".
        h_align: Horizontal caption placement, used only for "mp4".
        font_size: Optional font-size override, used only for "mp4".
        margin_v: Optional vertical-margin override, used only for "mp4".

    Returns:
        {
            "format": "srt" | "vtt" | "txt" | "mp4",
            "filename": <name in out_dir>,
            "segments": <int>,   # segments returned by the pipeline,
                                 # including any with empty text
            "translated": <bool>,
        }

    Raises:
        ValueError: *fmt* is not a supported format, or "mp4" was requested
            for a non-video input.
        RuntimeError: Transcription produced no segments, or an ffmpeg step
            failed.
    """
    # Reject unknown formats up front; otherwise they would fall through to
    # the burn-in branch below without passing the video-input check.
    if fmt not in ("srt", "vtt", "txt", "mp4"):
        raise ValueError(f"Unsupported subtitle format: {fmt!r}")

    input_is_video = _is_video(input_path)
    if fmt == "mp4" and not input_is_video:
        raise ValueError("Burn-in requires a video file.")

    # 1. Extract audio (or use the input as-is when it is already audio).
    audio_path = _extract_audio(input_path, out_dir) if input_is_video else input_path

    # 2. Transcribe.
    src_code = _resolve_lang(source_lang_name)
    segments = transcribe(str(audio_path), language=src_code)
    if not segments:
        raise RuntimeError("Transcription produced no segments.")

    # 3. Translate if requested.
    translated = False
    same_as_source = (
        target_lang_name == "Same as source"
        or target_lang_name.lower() == source_lang_name.lower()
    )
    if not same_as_source:
        segments = translate(segments, target_lang_name)
        translated = True

    # 4. Emit.
    if fmt == "srt":
        out = write_srt(segments, out_dir / "captions.srt", translated)
    elif fmt == "vtt":
        out = write_vtt(segments, out_dir / "captions.vtt", translated)
    elif fmt == "txt":
        out = write_txt(segments, out_dir / "transcript.txt", translated)
    else:  # "mp4": the burn-in needs an SRT sidecar as the filter input
        srt_path = write_srt(segments, out_dir / "captions.srt", translated)
        out = _burn_in(
            input_path, srt_path, out_dir / "captioned.mp4",
            style, position, h_align, font_size, margin_v,
        )

    return {
        "format": fmt,
        "filename": out.name,
        "segments": len(segments),
        "translated": translated,
    }