videovoice / tools_api /subtitles.py
github-actions[bot]
deploy: switch to chatterbox requirements @ 4319730
5b7cd5f
"""
Subtitle generation: sidecar files (.srt/.vtt/.txt) and burn-in MP4.
Reuses steps.s2_transcribe.transcribe and steps.s3_translate.translate as
libraries. ffmpeg burn-in goes through subprocess (matches existing s5_sync
pattern but without sharing code, since the styling needs are different).
"""
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Literal
from steps.s2_transcribe import transcribe
from steps.s3_translate import translate
# Output kind requested by the caller: one of three sidecar text formats, or
# "mp4" for a copy of the video with the captions rendered into the frames.
Format = Literal["srt", "vtt", "txt", "mp4"]
# Visual preset names; each one keys an entry in _STYLE_DEFAULTS.
CaptionStyle = Literal["tiktok", "youtube", "minimal"]
# Vertical row and horizontal column used to place burned-in captions.
Position = Literal["top", "middle", "bottom"]
HAlign = Literal["left", "center", "right"]
# Bounds for user-adjustable knobs. Backend clamps to these regardless of
# what the client sends.
FONT_SIZE_MIN = 12
FONT_SIZE_MAX = 40
MARGIN_V_MIN = 0
MARGIN_V_MAX = 240
# UI dropdown label -> ISO-style short code that Whisper accepts.
# "Auto-detect" maps to the special value "auto" rather than a language code.
_LANG_CODE = {
    "Auto-detect": "auto",
    "English": "en",
    "Spanish": "es",
    "French": "fr",
    "German": "de",
    "Portuguese": "pt",
    "Italian": "it",
    "Hindi": "hi",
    "Arabic": "ar",
    "Chinese": "zh",
    "Japanese": "ja",
    "Korean": "ko",
    "Russian": "ru",
}
def _is_video(path: Path) -> bool:
    """Report whether *path* has one of the recognised video file extensions.

    Only the suffix is inspected, case-insensitively; the file is never opened.
    """
    video_suffixes = (".mp4", ".mov", ".webm", ".mkv", ".avi", ".m4v")
    extension = path.suffix.lower()
    return extension in video_suffixes
def _extract_audio(input_path: Path, out_dir: Path) -> Path:
    """Extract a 16 kHz mono WAV from the input — what whisper expects.

    Runs ffmpeg on *input_path* and writes 16-bit PCM audio to
    ``out_dir / "audio.wav"``, overwriting any existing file.

    Returns:
        The path of the WAV file that was written.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status; the message carries
            the last 300 characters of its stderr.
        subprocess.TimeoutExpired: ffmpeg ran longer than 180 seconds.
    """
    wav_path = out_dir / "audio.wav"
    argv = ["ffmpeg", "-y", "-i", str(input_path)]
    # Drop the video stream, downmix to one channel, resample to 16 kHz.
    argv += ["-vn", "-ac", "1", "-ar", "16000"]
    argv += ["-acodec", "pcm_s16le", str(wav_path)]

    proc = subprocess.run(argv, capture_output=True, text=True, timeout=180)
    if proc.returncode == 0:
        return wav_path
    raise RuntimeError(f"ffmpeg audio extract failed: {proc.stderr[-300:]}")
def _resolve_lang(name: str) -> str:
    """Map a UI language label to its short code, falling back to "auto"."""
    try:
        return _LANG_CODE[name]
    except KeyError:
        # Unknown labels are treated the same as "Auto-detect".
        return "auto"
# ── Caption format writers ─────────────────────────────────────────────
def _seg_text(seg: dict, prefer_translation: bool) -> str:
    """Return the caption text of one segment, stripped of outer whitespace.

    With *prefer_translation* set, ``seg["translated_text"]`` is used when it
    is present and non-empty; otherwise (or when it is missing/empty) the
    original ``seg["text"]`` is used. Missing or ``None`` text yields ``""``.
    """
    chosen = seg.get("translated_text") if prefer_translation else None
    if not chosen:
        chosen = seg.get("text")
    return (chosen or "").strip()
def _format_timestamp_srt(t: float) -> str:
    """Format *t* seconds as a SubRip timestamp ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds once, and every field is then
    derived from that single integer. Rounding the fractional part on its own
    could produce a four-digit ``,1000`` field (e.g. for 1.9996 s) instead of
    carrying into the seconds. Negative inputs are clamped to zero because the
    format has no representation for them.
    """
    total_ms = max(0, round(t * 1000))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    seconds, millis = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"
def _format_timestamp_vtt(t: float) -> str:
    """Format *t* seconds as a WebVTT timestamp ``HH:MM:SS.mmm``."""
    srt_stamp = _format_timestamp_srt(t)
    # WebVTT separates the milliseconds with a dot where SRT uses a comma.
    return ".".join(srt_stamp.split(","))
def write_srt(segments: list[dict], dest: Path, prefer_translation: bool) -> Path:
    """Write *segments* to *dest* as a SubRip (.srt) file and return *dest*.

    Each segment supplies ``"start"`` and ``"end"`` times in seconds plus its
    text (the translated text when *prefer_translation* is set and available).
    Segments whose text is empty are skipped.

    Cue numbers are counted over the cues actually written, so the sequence
    stays 1, 2, 3, ... even when empty segments are dropped in between.
    The file is written as UTF-8.
    """
    lines: list[str] = []
    cue_number = 0
    for seg in segments:
        text = _seg_text(seg, prefer_translation)
        if not text:
            continue
        cue_number += 1
        start = _format_timestamp_srt(seg["start"])
        end = _format_timestamp_srt(seg["end"])
        # One cue = index line, timing line, text, blank separator.
        lines += [str(cue_number), f"{start} --> {end}", text, ""]
    dest.write_text("\n".join(lines), encoding="utf-8")
    return dest
def write_vtt(segments: list[dict], dest: Path, prefer_translation: bool) -> Path:
    """Write *segments* to *dest* as a WebVTT (.vtt) file and return *dest*.

    The file starts with the ``WEBVTT`` header line followed by a blank line.
    Every segment with non-empty text becomes a timing line, the text, and a
    blank separator; segments with empty text are skipped. Output is UTF-8.
    """
    body = ["WEBVTT", ""]
    for seg in segments:
        caption = _seg_text(seg, prefer_translation)
        if caption:
            begin = _format_timestamp_vtt(seg["start"])
            finish = _format_timestamp_vtt(seg["end"])
            body.extend((f"{begin} --> {finish}", caption, ""))
    dest.write_text("\n".join(body), encoding="utf-8")
    return dest
def write_txt(segments: list[dict], dest: Path, prefer_translation: bool) -> Path:
    """Write the plain transcript of *segments* to *dest* and return *dest*.

    The non-empty segment texts are joined with single spaces into one line
    (no timestamps) and written as UTF-8.
    """
    pieces = []
    for seg in segments:
        piece = _seg_text(seg, prefer_translation)
        if piece:
            pieces.append(piece)
    dest.write_text(" ".join(pieces), encoding="utf-8")
    return dest
# ── Burn-in styling ────────────────────────────────────────────────────
# ASS-format alignment codes (libass), arranged as row + column:
#   row: bottom=0, middle=3, top=6
#   col: left=1, center=2, right=3
# So bottom-left=1, bottom-center=2, ..., top-right=9 (numpad layout).
# NOTE(review): some ffmpeg/libass builds are reported to read an `Alignment`
# given through force_style with the legacy SSA numbering instead (top row
# 5-7, middle row 9-11) — confirm caption placement on the deployed ffmpeg.
_POSITION_ROW = {"bottom": 0, "middle": 3, "top": 6}
_HALIGN_COL = {"left": 1, "center": 2, "right": 3}
# Vertical margin applied for each row when the caller passes no margin_v.
_DEFAULT_MARGIN_V = {"bottom": 60, "middle": 0, "top": 60}
# Per-style baseline — font size, stroke/shadow choices. The user can override
# the font size via the slider; everything else stays tied to the style preset.
# _force_style_for emits these as the style fields FontSize, Bold,
# BorderStyle, Outline and Shadow respectively.
_STYLE_DEFAULTS: dict[CaptionStyle, dict] = {
    "tiktok": {"font_size": 22, "bold": 1, "border_style": 1, "outline": 3, "shadow": 1},
    "youtube": {"font_size": 18, "bold": 0, "border_style": 4, "outline": 8, "shadow": 0},
    "minimal": {"font_size": 16, "bold": 0, "border_style": 1, "outline": 1, "shadow": 0},
}
def _clamp(value: int, lo: int, hi: int) -> int:
    """Limit *value* to the inclusive range ``lo..hi``."""
    # Apply the upper bound first, then the lower bound, so the lower bound
    # wins if the two bounds are ever passed in the wrong order.
    capped = value if value < hi else hi
    return capped if capped > lo else lo
def _force_style_for(
    style: CaptionStyle,
    position: Position,
    h_align: HAlign = "center",
    font_size: int | None = None,
    margin_v: int | None = None,
) -> str:
    """Build the value for the ``force_style`` option of ffmpeg's subtitles filter.

    The result is a comma-separated list of ``Key=Value`` style overrides.

    Args:
        style: Visual preset — sets weight, stroke, shadow defaults.
        position: top / middle / bottom row.
        h_align: left / center / right column.
        font_size: Override the style's default font size (clamped to
            FONT_SIZE_MIN..FONT_SIZE_MAX).
        margin_v: Override vertical margin in pixels (clamped to
            MARGIN_V_MIN..MARGIN_V_MAX).

    Returns:
        The style string, without surrounding quotes.
    """
    preset = _STYLE_DEFAULTS[style]

    requested_size = preset["font_size"] if font_size is None else font_size
    size = _clamp(requested_size, FONT_SIZE_MIN, FONT_SIZE_MAX)

    requested_margin = _DEFAULT_MARGIN_V[position] if margin_v is None else margin_v
    margin = _clamp(requested_margin, MARGIN_V_MIN, MARGIN_V_MAX)

    # Alignment code is the row offset plus the column number.
    alignment = _POSITION_ROW[position] + _HALIGN_COL[h_align]

    if style == "youtube":
        # White on translucent black box
        colour = "BackColour=&HB8000000"
    elif style == "minimal":
        # Subtle semi-transparent stroke instead of hard black
        colour = "OutlineColour=&H80000000"
    else:
        # tiktok — hard black stroke
        colour = "OutlineColour=&H00000000"

    fields = [
        "FontName=Arial",
        f"FontSize={size}",
        f"Bold={preset['bold']}",
        "PrimaryColour=&H00FFFFFF",
        colour,
        f"BorderStyle={preset['border_style']}",
        f"Outline={preset['outline']}",
        f"Shadow={preset['shadow']}",
        f"Alignment={alignment}",
        f"MarginV={margin}",
        # Symmetric horizontal margins so left/right alignment has breathing room
        "MarginL=40",
        "MarginR=40",
    ]
    return ",".join(fields)
def _escape_subtitles_path(path: str) -> str:
    """Escape *path* for placement inside a single-quoted ffmpeg filter argument.

    ffmpeg unescapes a filter argument twice. The filtergraph parser strips the
    single quotes and keeps everything between them literally, backslashes
    included; the option parser then treats ``\\`` as an escape character and
    ``:`` as the option separator. Therefore:

    * ``\\`` is doubled so the option parser yields one literal backslash;
    * ``:`` becomes ``\\:`` so it is not read as an option separator;
    * ``'`` cannot appear inside quotes at all, so the quote is closed, an
      escaped quote is emitted, and the quote is reopened.

    Paths without any of these characters are returned unchanged.
    """
    escaped = path.replace("\\", "\\\\").replace(":", "\\:")
    # Close quote, emit \\\' (unescapes to \' and then to '), reopen quote.
    return escaped.replace("'", "'\\\\\\''")


def _burn_in(
    video_path: Path,
    srt_path: Path,
    dest: Path,
    style: CaptionStyle,
    position: Position,
    h_align: HAlign = "center",
    font_size: int | None = None,
    margin_v: int | None = None,
) -> Path:
    """Render captions into the video pixels via ffmpeg + libass.

    Args:
        video_path: Source video to caption.
        srt_path: SubRip file holding the captions to draw.
        dest: Output file; overwritten if it exists.
        style: Visual preset for the captions.
        position: top / middle / bottom row.
        h_align: left / center / right column.
        font_size: Optional font-size override (clamped by the style builder).
        margin_v: Optional vertical-margin override (clamped by the style builder).

    Returns:
        *dest*, after ffmpeg has written it.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status; the message carries
            the last 300 characters of its stderr.
        subprocess.TimeoutExpired: ffmpeg ran longer than 600 seconds.
    """
    force_style = _force_style_for(style, position, h_align, font_size, margin_v)
    # Both values are single-quoted so the commas in force_style and any
    # special characters in the path survive filtergraph parsing.
    srt_str = _escape_subtitles_path(str(srt_path))
    vf = f"subtitles='{srt_str}':force_style='{force_style}'"
    cmd = [
        "ffmpeg", "-y",
        "-i", str(video_path),
        "-vf", vf,
        # Audio is passed through untouched; only the video is re-encoded.
        "-c:a", "copy",
        "-c:v", "libx264",
        "-preset", "veryfast",
        "-crf", "22",
        str(dest),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg burn-in failed: {result.stderr[-300:]}")
    return dest
# ── Public entry point ────────────────────────────────────────────────
def generate_subtitles(
    *,
    input_path: Path,
    out_dir: Path,
    source_lang_name: str,
    target_lang_name: str,
    fmt: Format,
    style: CaptionStyle = "tiktok",
    position: Position = "bottom",
    h_align: HAlign = "center",
    font_size: int | None = None,
    margin_v: int | None = None,
) -> dict:
    """
    Run the full subtitle pipeline: extract audio, transcribe, optionally
    translate, then write the requested output into *out_dir*.

    Args:
        input_path: Audio or video file to caption.
        out_dir: Directory that receives every generated file.
        source_lang_name: UI label of the spoken language ("Auto-detect" allowed).
        target_lang_name: UI label of the caption language; "Same as source" or
            a label equal to *source_lang_name* (case-insensitive) skips translation.
        fmt: "srt", "vtt" or "txt" for a sidecar file, "mp4" for burn-in.
        style, position, h_align, font_size, margin_v: Burn-in styling; only
            used when *fmt* is "mp4".

    Returns:
        {
            "format": "srt" | "vtt" | "txt" | "mp4",
            "filename": <name in out_dir>,
            "segments": <int>,
            "translated": <bool>,
        }

    Raises:
        ValueError: *fmt* is not a supported format, or burn-in was requested
            for a non-video input.
        RuntimeError: transcription returned no segments, or an ffmpeg step failed.
    """
    # Reject unknown formats before any expensive work. Previously they fell
    # through to the burn-in branch after the whole transcription had run.
    # Keep this tuple in sync with the Format alias.
    if fmt not in ("srt", "vtt", "txt", "mp4"):
        raise ValueError(f"Unsupported subtitle format: {fmt!r}")

    input_is_video = _is_video(input_path)
    is_burn = fmt == "mp4"
    if is_burn and not input_is_video:
        raise ValueError("Burn-in requires a video file.")

    # 1. Extract audio (or use as-is)
    audio_path = _extract_audio(input_path, out_dir) if input_is_video else input_path

    # 2. Transcribe
    src_code = _resolve_lang(source_lang_name)
    segments = transcribe(str(audio_path), language=src_code)
    if not segments:
        raise RuntimeError("Transcription produced no segments.")

    # 3. Translate if requested
    translated = False
    same_as_source = (
        target_lang_name == "Same as source"
        or target_lang_name.lower() == source_lang_name.lower()
    )
    if not same_as_source:
        segments = translate(segments, target_lang_name)
        translated = True

    # 4. Emit
    if fmt == "srt":
        out = write_srt(segments, out_dir / "captions.srt", translated)
    elif fmt == "vtt":
        out = write_vtt(segments, out_dir / "captions.vtt", translated)
    elif fmt == "txt":
        out = write_txt(segments, out_dir / "transcript.txt", translated)
    else:  # mp4 — burn-in renders from an intermediate SRT file
        srt_path = write_srt(segments, out_dir / "captions.srt", translated)
        out = _burn_in(
            input_path, srt_path, out_dir / "captioned.mp4",
            style, position, h_align, font_size, margin_v,
        )

    return {
        "format": fmt,
        "filename": out.name,
        "segments": len(segments),
        "translated": translated,
    }