| import shutil |
| import subprocess |
| from pathlib import Path |
| from typing import Callable |
|
|
| from app.core.config import Settings |
| from app.models.schemas import ChannelProfile, ClipCandidate, TranscriptSegment |
| from app.services.subtitles import write_single_caption_srt, write_srt, write_srt_from_cues |
| from app.storage import JobStore |
|
|
|
|
| class ClipGenerator: |
| def __init__(self, settings: Settings, store: JobStore) -> None: |
| self.settings = settings |
| self.store = store |
|
|
| def generate( |
| self, |
| job_id: str, |
| video_path: Path, |
| clips: list[ClipCandidate], |
| transcript: list[TranscriptSegment], |
| profile: ChannelProfile, |
| progress_callback: Callable[[int, int], None] | None = None, |
| ) -> list[ClipCandidate]: |
| rendered: list[ClipCandidate] = [] |
| total = len(clips) |
| for index, clip in enumerate(clips, start=1): |
| if progress_callback: |
| progress_callback(index, total) |
| rendered.append(self.render_one(job_id, video_path, clip, transcript, profile, index)) |
| return rendered |
|
|
| def render_one( |
| self, |
| job_id: str, |
| video_path: Path, |
| clip: ClipCandidate, |
| transcript: list[TranscriptSegment], |
| profile: ChannelProfile, |
| index: int = 1, |
| ) -> ClipCandidate: |
| job_dir = self.store.job_dir(job_id) |
| output_name = f"clip_{index:02}_{clip.id[:8]}.mp4" |
| subtitle_name = f"clip_{index:02}_{clip.id[:8]}.srt" |
| output_path = job_dir / output_name |
| subtitle_path = job_dir / subtitle_name |
|
|
| duration = max(1.0, clip.end_seconds - clip.start_seconds) |
| if clip.subtitle_cues: |
| subtitle_cues = write_srt_from_cues(subtitle_path, clip.subtitle_cues) |
| elif clip.subtitle_text.strip(): |
| subtitle_cues = write_single_caption_srt(subtitle_path, duration, clip.subtitle_text) |
| else: |
| subtitle_cues = write_srt(subtitle_path, clip.start_seconds, clip.end_seconds, transcript) |
| self._run_ffmpeg(video_path, output_path, subtitle_path, clip, profile) |
|
|
| clip.video_url = self.store.media_url(job_id, output_name) |
| clip.download_url = clip.video_url |
| clip.metadata["subtitle_file"] = self.store.media_url(job_id, subtitle_name) |
| clip.metadata["subtitle_cues"] = subtitle_cues |
| return clip |
|
|
| def _run_ffmpeg( |
| self, |
| video_path: Path, |
| output_path: Path, |
| subtitle_path: Path, |
| clip: ClipCandidate, |
| profile: ChannelProfile, |
| ) -> None: |
| ffmpeg = shutil.which(self.settings.ffmpeg_binary) |
| if not ffmpeg or not video_path.exists() or video_path.stat().st_size == 0: |
| output_path.write_bytes(b"") |
| return |
|
|
| keep_ranges = self._compute_keep_ranges(clip) |
| post_filters = [self._platform_filter(profile), self._subtitle_filter(subtitle_path)] |
| post_chain = ",".join(post_filters) |
|
|
| if len(keep_ranges) <= 1: |
| start, end = keep_ranges[0] |
| command = [ |
| ffmpeg, |
| "-y", |
| "-ss", |
| f"{start:.3f}", |
| "-i", |
| str(video_path), |
| "-t", |
| f"{max(0.5, end - start):.3f}", |
| "-vf", |
| post_chain, |
| "-c:v", |
| self.settings.ffmpeg_video_codec, |
| "-c:a", |
| "aac", |
| "-b:a", |
| "160k", |
| "-movflags", |
| "+faststart", |
| str(output_path), |
| ] |
| else: |
| |
| parts = [] |
| labels_v = [] |
| labels_a = [] |
| for i, (start, end) in enumerate(keep_ranges): |
| parts.append( |
| f"[0:v]trim=start={start:.3f}:end={end:.3f},setpts=PTS-STARTPTS[v{i}]" |
| ) |
| parts.append( |
| f"[0:a]atrim=start={start:.3f}:end={end:.3f},asetpts=PTS-STARTPTS[a{i}]" |
| ) |
| labels_v.append(f"[v{i}]") |
| labels_a.append(f"[a{i}]") |
| concat_inputs = "".join( |
| f"{labels_v[i]}{labels_a[i]}" for i in range(len(keep_ranges)) |
| ) |
| parts.append( |
| f"{concat_inputs}concat=n={len(keep_ranges)}:v=1:a=1[vc][ac]" |
| ) |
| parts.append(f"[vc]{post_chain}[vout]") |
| filter_complex = ";".join(parts) |
| command = [ |
| ffmpeg, |
| "-y", |
| "-i", |
| str(video_path), |
| "-filter_complex", |
| filter_complex, |
| "-map", |
| "[vout]", |
| "-map", |
| "[ac]", |
| "-c:v", |
| self.settings.ffmpeg_video_codec, |
| "-c:a", |
| "aac", |
| "-b:a", |
| "160k", |
| "-movflags", |
| "+faststart", |
| str(output_path), |
| ] |
|
|
| try: |
| subprocess.run(command, check=True, capture_output=True, text=True, timeout=180) |
| return |
| except Exception: |
| fallback = command.copy() |
| try: |
| fallback[fallback.index(self.settings.ffmpeg_video_codec)] = ( |
| self.settings.ffmpeg_cpu_codec |
| ) |
| except ValueError: |
| pass |
| try: |
| subprocess.run(fallback, check=True, capture_output=True, text=True, timeout=180) |
| return |
| except Exception: |
| output_path.write_bytes(b"") |
|
|
| def _compute_keep_ranges(self, clip: ClipCandidate) -> list[tuple[float, float]]: |
| """Return absolute video time ranges to keep, after subtracting skip_ranges.""" |
| clip_start = float(clip.start_seconds) |
| clip_end = float(clip.end_seconds) |
| if not clip.skip_ranges: |
| return [(clip_start, clip_end)] |
|
|
| |
| skips: list[tuple[float, float]] = [] |
| for skip in clip.skip_ranges: |
| s = clip_start + max(0.0, float(skip.start_seconds)) |
| e = clip_start + max(0.0, float(skip.end_seconds)) |
| if e > s: |
| skips.append((min(s, clip_end), min(e, clip_end))) |
| skips.sort() |
|
|
| |
| merged: list[tuple[float, float]] = [] |
| for s, e in skips: |
| if merged and s <= merged[-1][1]: |
| merged[-1] = (merged[-1][0], max(merged[-1][1], e)) |
| else: |
| merged.append((s, e)) |
|
|
| |
| keeps: list[tuple[float, float]] = [] |
| cursor = clip_start |
| for s, e in merged: |
| if s > cursor: |
| keeps.append((cursor, s)) |
| cursor = max(cursor, e) |
| if cursor < clip_end: |
| keeps.append((cursor, clip_end)) |
|
|
| return keeps if keeps else [(clip_start, clip_end)] |
|
|
| def _platform_filter(self, profile: ChannelProfile) -> str: |
| if profile.target_platform.value in {"tiktok", "youtube_shorts", "instagram_reels"}: |
| return "scale=1080:1920:force_original_aspect_ratio=increase,crop=1080:1920" |
| return "scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2" |
|
|
| def _subtitle_filter(self, subtitle_path: Path) -> str: |
| escaped = str(subtitle_path.resolve()).replace("\\", "/").replace(":", "\\:") |
| style = ( |
| "Fontname=Arial," |
| "Fontsize=22," |
| "PrimaryColour=&H00FFFFFF," |
| "OutlineColour=&H00000000," |
| "BorderStyle=1," |
| "Outline=2," |
| "Shadow=1," |
| "Alignment=2," |
| "MarginV=210" |
| ) |
| return f"subtitles='{escaped}':force_style='{style}'" |
|
|