import shutil
import subprocess
from pathlib import Path
from typing import Callable

from app.core.config import Settings
from app.models.schemas import ChannelProfile, ClipCandidate, TranscriptSegment
from app.services.subtitles import write_single_caption_srt, write_srt, write_srt_from_cues
from app.storage import JobStore


class ClipGenerator:
    """Render short-form clips (with burned-in subtitles) from a source video.

    All encoding goes through ffmpeg subprocess calls. The generator is
    deliberately best-effort: when ffmpeg is missing, the source file is
    unusable, or every encode attempt fails, it writes an empty output file
    instead of raising, so one bad clip cannot abort a whole job.
    """

    # Hard cap on a single ffmpeg invocation, in seconds.
    FFMPEG_TIMEOUT = 180

    def __init__(self, settings: Settings, store: JobStore) -> None:
        self.settings = settings
        self.store = store

    def generate(
        self,
        job_id: str,
        video_path: Path,
        clips: list[ClipCandidate],
        transcript: list[TranscriptSegment],
        profile: ChannelProfile,
        progress_callback: Callable[[int, int], None] | None = None,
    ) -> list[ClipCandidate]:
        """Render every candidate clip in order.

        Args:
            job_id: Storage key used to resolve the job's media directory.
            video_path: Source video to cut clips from.
            clips: Candidates to render; each is mutated in place.
            transcript: Full transcript, used when a clip has no subtitles.
            profile: Target-platform profile (controls output geometry).
            progress_callback: Optional ``(index, total)`` hook, invoked
                before each clip is rendered (1-based index).

        Returns:
            The same candidates, with media URLs and metadata attached.
        """
        rendered: list[ClipCandidate] = []
        total = len(clips)
        for index, clip in enumerate(clips, start=1):
            if progress_callback:
                progress_callback(index, total)
            rendered.append(
                self.render_one(job_id, video_path, clip, transcript, profile, index)
            )
        return rendered

    def render_one(
        self,
        job_id: str,
        video_path: Path,
        clip: ClipCandidate,
        transcript: list[TranscriptSegment],
        profile: ChannelProfile,
        index: int = 1,
    ) -> ClipCandidate:
        """Render a single clip plus its SRT sidecar and attach URLs to it.

        Mutates and returns the same ``ClipCandidate`` instance.
        """
        job_dir = self.store.job_dir(job_id)
        base_name = f"clip_{index:02}_{clip.id[:8]}"
        output_name = f"{base_name}.mp4"
        subtitle_name = f"{base_name}.srt"
        output_path = job_dir / output_name
        subtitle_path = job_dir / subtitle_name
        duration = max(1.0, clip.end_seconds - clip.start_seconds)

        # Subtitle source priority: explicit cues, then a single free-text
        # caption, then captions derived from the transcript window.
        if clip.subtitle_cues:
            subtitle_cues = write_srt_from_cues(subtitle_path, clip.subtitle_cues)
        elif clip.subtitle_text.strip():
            subtitle_cues = write_single_caption_srt(subtitle_path, duration, clip.subtitle_text)
        else:
            subtitle_cues = write_srt(
                subtitle_path, clip.start_seconds, clip.end_seconds, transcript
            )

        self._run_ffmpeg(video_path, output_path, subtitle_path, clip, profile)

        clip.video_url = self.store.media_url(job_id, output_name)
        clip.download_url = clip.video_url
        clip.metadata["subtitle_file"] = self.store.media_url(job_id, subtitle_name)
        clip.metadata["subtitle_cues"] = subtitle_cues
        return clip

    def _run_ffmpeg(
        self,
        video_path: Path,
        output_path: Path,
        subtitle_path: Path,
        clip: ClipCandidate,
        profile: ChannelProfile,
    ) -> None:
        """Encode the clip with ffmpeg, retrying on the CPU codec on failure.

        Writes an empty output file when ffmpeg is unavailable, the source is
        empty/missing, or both encode attempts fail — callers treat that as a
        best-effort placeholder rather than an error.
        """
        ffmpeg = shutil.which(self.settings.ffmpeg_binary)
        if not ffmpeg or not video_path.exists() or video_path.stat().st_size == 0:
            output_path.write_bytes(b"")
            return

        keep_ranges = self._compute_keep_ranges(clip)
        # Scaling/cropping first, then subtitle burn-in on the scaled frame.
        post_chain = ",".join(
            [self._platform_filter(profile), self._subtitle_filter(subtitle_path)]
        )

        if len(keep_ranges) <= 1:
            command = self._single_range_command(
                ffmpeg, video_path, output_path, keep_ranges[0], post_chain
            )
        else:
            command = self._concat_command(
                ffmpeg, video_path, output_path, keep_ranges, post_chain
            )

        try:
            subprocess.run(
                command, check=True, capture_output=True, text=True,
                timeout=self.FFMPEG_TIMEOUT,
            )
            return
        except (subprocess.SubprocessError, OSError):
            # Covers CalledProcessError, TimeoutExpired, and exec failures;
            # anything else (a programming error) should propagate.
            pass

        # Retry with the CPU codec. Replace the value that follows the "-c:v"
        # flag rather than searching for the codec string itself, which could
        # also appear inside a filter expression or a file path.
        fallback = command.copy()
        try:
            fallback[fallback.index("-c:v") + 1] = self.settings.ffmpeg_cpu_codec
        except (ValueError, IndexError):
            pass
        try:
            subprocess.run(
                fallback, check=True, capture_output=True, text=True,
                timeout=self.FFMPEG_TIMEOUT,
            )
        except (subprocess.SubprocessError, OSError):
            output_path.write_bytes(b"")

    def _single_range_command(
        self,
        ffmpeg: str,
        video_path: Path,
        output_path: Path,
        keep_range: tuple[float, float],
        post_chain: str,
    ) -> list[str]:
        """Build the ffmpeg command for a clip with a single contiguous range."""
        start, end = keep_range
        return [
            ffmpeg,
            "-y",
            # Seek before -i for fast input seeking; output timestamps start at 0.
            "-ss", f"{start:.3f}",
            "-i", str(video_path),
            "-t", f"{max(0.5, end - start):.3f}",
            "-vf", post_chain,
            "-c:v", self.settings.ffmpeg_video_codec,
            "-c:a", "aac",
            "-b:a", "160k",
            "-movflags", "+faststart",
            str(output_path),
        ]

    def _concat_command(
        self,
        ffmpeg: str,
        video_path: Path,
        output_path: Path,
        keep_ranges: list[tuple[float, float]],
        post_chain: str,
    ) -> list[str]:
        """Build an ffmpeg command that keeps several segments and drops the gaps.

        Uses trim/atrim + concat in a filter graph so the skipped middle ranges
        are removed in a single pass.
        """
        parts: list[str] = []
        concat_inputs = []
        for i, (start, end) in enumerate(keep_ranges):
            # setpts/asetpts reset each segment's timestamps so concat can
            # splice them back-to-back.
            parts.append(
                f"[0:v]trim=start={start:.3f}:end={end:.3f},setpts=PTS-STARTPTS[v{i}]"
            )
            parts.append(
                f"[0:a]atrim=start={start:.3f}:end={end:.3f},asetpts=PTS-STARTPTS[a{i}]"
            )
            concat_inputs.append(f"[v{i}][a{i}]")
        parts.append(
            f"{''.join(concat_inputs)}concat=n={len(keep_ranges)}:v=1:a=1[vc][ac]"
        )
        parts.append(f"[vc]{post_chain}[vout]")
        return [
            ffmpeg,
            "-y",
            "-i", str(video_path),
            "-filter_complex", ";".join(parts),
            "-map", "[vout]",
            "-map", "[ac]",
            "-c:v", self.settings.ffmpeg_video_codec,
            "-c:a", "aac",
            "-b:a", "160k",
            "-movflags", "+faststart",
            str(output_path),
        ]

    def _compute_keep_ranges(self, clip: ClipCandidate) -> list[tuple[float, float]]:
        """Return absolute video time ranges to keep, after subtracting skip_ranges.

        Skip ranges are expressed relative to the clip start; they are converted
        to absolute times, clamped to the clip, merged when overlapping or
        touching, and then inverted into keep segments. Always returns at least
        one range.
        """
        clip_start = float(clip.start_seconds)
        clip_end = float(clip.end_seconds)
        if not clip.skip_ranges:
            return [(clip_start, clip_end)]

        # Convert to absolute, clamp to the clip, and drop empty/inverted
        # ranges *after* clamping (a skip lying entirely past the clip end
        # would otherwise survive as a degenerate zero-length entry).
        skips: list[tuple[float, float]] = []
        for skip in clip.skip_ranges:
            s = min(clip_start + max(0.0, float(skip.start_seconds)), clip_end)
            e = min(clip_start + max(0.0, float(skip.end_seconds)), clip_end)
            if e > s:
                skips.append((s, e))
        skips.sort()

        # Merge overlapping or touching skip ranges.
        merged: list[tuple[float, float]] = []
        for s, e in skips:
            if merged and s <= merged[-1][1]:
                merged[-1] = (merged[-1][0], max(merged[-1][1], e))
            else:
                merged.append((s, e))

        # Invert the merged skips into keep segments.
        keeps: list[tuple[float, float]] = []
        cursor = clip_start
        for s, e in merged:
            if s > cursor:
                keeps.append((cursor, s))
            cursor = max(cursor, e)
        if cursor < clip_end:
            keeps.append((cursor, clip_end))
        # If the skips swallowed the whole clip, fall back to the full range.
        return keeps if keeps else [(clip_start, clip_end)]

    def _platform_filter(self, profile: ChannelProfile) -> str:
        """Return the scale/crop filter for the profile's target platform.

        Vertical platforms get a 1080x1920 center-crop fill; everything else
        gets 1280x720 letterboxed with padding.
        """
        if profile.target_platform.value in {"tiktok", "youtube_shorts", "instagram_reels"}:
            return "scale=1080:1920:force_original_aspect_ratio=increase,crop=1080:1920"
        return "scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2"

    def _subtitle_filter(self, subtitle_path: Path) -> str:
        """Return the ffmpeg subtitles filter with a forced caption style.

        Escapes backslashes and colons in the path for ffmpeg's filter-graph
        syntax. NOTE(review): single quotes inside the path are not escaped
        and would break the filter string — assumed not to occur in job dirs.
        """
        escaped = str(subtitle_path.resolve()).replace("\\", "/").replace(":", "\\:")
        style = (
            "Fontname=Arial,"
            "Fontsize=22,"
            "PrimaryColour=&H00FFFFFF,"
            "OutlineColour=&H00000000,"
            "BorderStyle=1,"
            "Outline=2,"
            "Shadow=1,"
            "Alignment=2,"
            "MarginV=210"
        )
        return f"subtitles='{escaped}':force_style='{style}'"