# Commit 89e1dc4 by JakgritB — feat(editor): subtitle-first editor + AI subtitle pipeline
import shutil
import subprocess
from pathlib import Path
from typing import Callable
from app.core.config import Settings
from app.models.schemas import ChannelProfile, ClipCandidate, TranscriptSegment
from app.services.subtitles import write_single_caption_srt, write_srt, write_srt_from_cues
from app.storage import JobStore
class ClipGenerator:
    """Render clip videos from a source recording with burned-in subtitles.

    For each candidate clip the generator writes an SRT file next to the
    output, cuts the requested time range out of the source video with
    ffmpeg (optionally dropping mid-clip ``skip_ranges`` via a concat
    filter graph), scales/crops the result for the target platform, and
    records the resulting media URLs on the clip.
    """

    def __init__(self, settings: Settings, store: JobStore) -> None:
        self.settings = settings
        self.store = store

    def generate(
        self,
        job_id: str,
        video_path: Path,
        clips: list[ClipCandidate],
        transcript: list[TranscriptSegment],
        profile: ChannelProfile,
        progress_callback: Callable[[int, int], None] | None = None,
    ) -> list[ClipCandidate]:
        """Render every clip in order.

        ``progress_callback``, when given, is invoked with
        ``(1-based index, total)`` before each clip is rendered.
        """
        rendered: list[ClipCandidate] = []
        total = len(clips)
        for index, clip in enumerate(clips, start=1):
            if progress_callback:
                progress_callback(index, total)
            rendered.append(self.render_one(job_id, video_path, clip, transcript, profile, index))
        return rendered

    def render_one(
        self,
        job_id: str,
        video_path: Path,
        clip: ClipCandidate,
        transcript: list[TranscriptSegment],
        profile: ChannelProfile,
        index: int = 1,
    ) -> ClipCandidate:
        """Render a single clip plus its SRT file and return the updated clip.

        Subtitle source priority: explicit ``subtitle_cues`` on the clip,
        then the clip's free-form ``subtitle_text`` (rendered as a single
        caption spanning the clip), then a slice of the full transcript.
        Mutates ``clip`` in place (URLs and subtitle metadata) and returns it.
        """
        job_dir = self.store.job_dir(job_id)
        output_name = f"clip_{index:02}_{clip.id[:8]}.mp4"
        subtitle_name = f"clip_{index:02}_{clip.id[:8]}.srt"
        output_path = job_dir / output_name
        subtitle_path = job_dir / subtitle_name
        # Guard against zero/negative-length clips so the single-caption
        # fallback always has a positive duration.
        duration = max(1.0, clip.end_seconds - clip.start_seconds)
        if clip.subtitle_cues:
            subtitle_cues = write_srt_from_cues(subtitle_path, clip.subtitle_cues)
        elif clip.subtitle_text.strip():
            subtitle_cues = write_single_caption_srt(subtitle_path, duration, clip.subtitle_text)
        else:
            subtitle_cues = write_srt(subtitle_path, clip.start_seconds, clip.end_seconds, transcript)
        self._run_ffmpeg(video_path, output_path, subtitle_path, clip, profile)
        clip.video_url = self.store.media_url(job_id, output_name)
        clip.download_url = clip.video_url
        clip.metadata["subtitle_file"] = self.store.media_url(job_id, subtitle_name)
        clip.metadata["subtitle_cues"] = subtitle_cues
        return clip

    def _run_ffmpeg(
        self,
        video_path: Path,
        output_path: Path,
        subtitle_path: Path,
        clip: ClipCandidate,
        profile: ChannelProfile,
    ) -> None:
        """Cut and encode one clip; writes an empty placeholder on failure.

        Tries the configured (possibly hardware-accelerated) video codec
        first and retries once with the CPU codec if the first run fails.
        Never raises: encoding problems degrade to an empty output file.
        """
        ffmpeg = shutil.which(self.settings.ffmpeg_binary)
        if not ffmpeg or not video_path.exists() or video_path.stat().st_size == 0:
            # No encoder or no usable input: leave an empty placeholder so
            # downstream URL bookkeeping still has a file to point at.
            output_path.write_bytes(b"")
            return
        keep_ranges = self._compute_keep_ranges(clip)
        # Platform scaling first, then subtitle burn-in, applied after any cuts.
        post_chain = ",".join([self._platform_filter(profile), self._subtitle_filter(subtitle_path)])
        if len(keep_ranges) <= 1:
            command = self._single_range_command(ffmpeg, video_path, output_path, keep_ranges[0], post_chain)
        else:
            command = self._concat_command(ffmpeg, video_path, output_path, keep_ranges, post_chain)
        try:
            subprocess.run(command, check=True, capture_output=True, text=True, timeout=180)
            return
        except (subprocess.SubprocessError, OSError):
            pass
        # Retry with the CPU codec (e.g. when a hardware encoder is unavailable).
        fallback = command.copy()
        # The codec value always directly follows the "-c:v" flag; replacing
        # by position cannot clobber an unrelated argument that happens to
        # share the codec's text.
        fallback[fallback.index("-c:v") + 1] = self.settings.ffmpeg_cpu_codec
        try:
            subprocess.run(fallback, check=True, capture_output=True, text=True, timeout=180)
        except (subprocess.SubprocessError, OSError):
            output_path.write_bytes(b"")

    def _single_range_command(
        self,
        ffmpeg: str,
        video_path: Path,
        output_path: Path,
        keep: tuple[float, float],
        post_chain: str,
    ) -> list[str]:
        """Build the ffmpeg command for a clip with one contiguous range."""
        start, end = keep
        return [
            ffmpeg,
            "-y",
            # -ss before -i: fast input seek; output timestamps restart at 0,
            # keeping them aligned with the clip-relative SRT cues.
            "-ss",
            f"{start:.3f}",
            "-i",
            str(video_path),
            "-t",
            f"{max(0.5, end - start):.3f}",
            "-vf",
            post_chain,
            "-c:v",
            self.settings.ffmpeg_video_codec,
            "-c:a",
            "aac",
            "-b:a",
            "160k",
            "-movflags",
            "+faststart",
            str(output_path),
        ]

    def _concat_command(
        self,
        ffmpeg: str,
        video_path: Path,
        output_path: Path,
        keep_ranges: list[tuple[float, float]],
        post_chain: str,
    ) -> list[str]:
        """Build an ffmpeg command keeping several ranges and skipping the rest.

        Each kept range is trimmed from the video and audio streams, the
        pieces are concatenated, and the post-processing chain (scaling +
        subtitles) is applied to the joined video.
        """
        parts: list[str] = []
        pair_labels: list[str] = []
        for i, (start, end) in enumerate(keep_ranges):
            parts.append(
                f"[0:v]trim=start={start:.3f}:end={end:.3f},setpts=PTS-STARTPTS[v{i}]"
            )
            parts.append(
                f"[0:a]atrim=start={start:.3f}:end={end:.3f},asetpts=PTS-STARTPTS[a{i}]"
            )
            pair_labels.append(f"[v{i}][a{i}]")
        parts.append(
            f"{''.join(pair_labels)}concat=n={len(keep_ranges)}:v=1:a=1[vc][ac]"
        )
        parts.append(f"[vc]{post_chain}[vout]")
        return [
            ffmpeg,
            "-y",
            "-i",
            str(video_path),
            "-filter_complex",
            ";".join(parts),
            "-map",
            "[vout]",
            "-map",
            "[ac]",
            "-c:v",
            self.settings.ffmpeg_video_codec,
            "-c:a",
            "aac",
            "-b:a",
            "160k",
            "-movflags",
            "+faststart",
            str(output_path),
        ]

    def _compute_keep_ranges(self, clip: ClipCandidate) -> list[tuple[float, float]]:
        """Return absolute video time ranges to keep, after subtracting skip_ranges."""
        clip_start = float(clip.start_seconds)
        clip_end = float(clip.end_seconds)
        if not clip.skip_ranges:
            return [(clip_start, clip_end)]
        # Skip ranges are relative to clip start. Convert to absolute, clamp
        # to the clip window, drop empty/inverted ranges, then sort.
        skips: list[tuple[float, float]] = []
        for skip in clip.skip_ranges:
            s = clip_start + max(0.0, float(skip.start_seconds))
            e = clip_start + max(0.0, float(skip.end_seconds))
            if e > s:
                skips.append((min(s, clip_end), min(e, clip_end)))
        skips.sort()
        # Merge overlapping/touching skip ranges.
        merged: list[tuple[float, float]] = []
        for s, e in skips:
            if merged and s <= merged[-1][1]:
                merged[-1] = (merged[-1][0], max(merged[-1][1], e))
            else:
                merged.append((s, e))
        # Walk the merged skips, keeping the gaps between them.
        keeps: list[tuple[float, float]] = []
        cursor = clip_start
        for s, e in merged:
            if s > cursor:
                keeps.append((cursor, s))
            cursor = max(cursor, e)
        if cursor < clip_end:
            keeps.append((cursor, clip_end))
        # Degenerate skips could consume the whole clip; fall back to keeping it.
        return keeps if keeps else [(clip_start, clip_end)]

    def _platform_filter(self, profile: ChannelProfile) -> str:
        """Return the scale/crop filter for the profile's target platform."""
        # Vertical 9:16 platforms get a cover-crop; everything else is
        # letterboxed into 16:9.
        if profile.target_platform.value in {"tiktok", "youtube_shorts", "instagram_reels"}:
            return "scale=1080:1920:force_original_aspect_ratio=increase,crop=1080:1920"
        return "scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2"

    def _subtitle_filter(self, subtitle_path: Path) -> str:
        """Return the ffmpeg ``subtitles`` filter burning the SRT into the video."""
        # Normalize to forward slashes and escape colons so Windows drive
        # letters survive ffmpeg's filtergraph parsing.
        escaped = str(subtitle_path.resolve()).replace("\\", "/").replace(":", "\\:")
        style = (
            "Fontname=Arial,"
            "Fontsize=22,"
            "PrimaryColour=&H00FFFFFF,"
            "OutlineColour=&H00000000,"
            "BorderStyle=1,"
            "Outline=2,"
            "Shadow=1,"
            "Alignment=2,"
            "MarginV=210"
        )
        return f"subtitles='{escaped}':force_style='{style}'"