"""
pipeline/processor.py

Cuts each highlight segment and converts it to 9:16 vertical MP4.
Optionally burns word-level ASS captions into the frame.
"""

import textwrap
from pathlib import Path
from typing import Callable, List, Optional

from utils import Segment, ReelOutput, log, run_cmd


# ── Crop filter builder ──────────────────────────────────────────────────────

def _build_crop_filter(src_w: int, src_h: int, out_w: int, out_h: int) -> str:
    """
    Build an FFmpeg filtergraph that centre-crops any source aspect ratio
    into out_w × out_h with no black bars and no distortion.

    Strategy:
      * Source (almost) the same shape as the target: plain scale.
      * Source wider than the target: scale so height = out_h, then crop the
        centre out_w columns.
      * Source narrower than the target: scale so width = out_w, then crop
        the centre out_h rows.
      * Source dimensions unknown (zero / negative): let FFmpeg work out the
        cover-scale itself via force_original_aspect_ratio=increase.

    Args:
        src_w: Source frame width in pixels.
        src_h: Source frame height in pixels.
        out_w: Output frame width in pixels.
        out_h: Output frame height in pixels.

    Returns:
        A filter string usable as (part of) an FFmpeg ``-vf`` argument.
    """
    if src_w <= 0 or src_h <= 0:
        # No usable probe data: computing a ratio would divide by zero, so
        # delegate the "scale until the frame covers the target" maths to
        # FFmpeg. crop centres itself when x/y are omitted.
        return (
            f"scale={out_w}:{out_h}"
            f":force_original_aspect_ratio=increase:flags=lanczos,"
            f"crop={out_w}:{out_h}"
        )

    src_ratio = src_w / src_h
    tgt_ratio = out_w / out_h

    if abs(src_ratio - tgt_ratio) < 0.01:
        return f"scale={out_w}:{out_h}:flags=lanczos"

    if src_ratio > tgt_ratio:
        # Wider than target: match the height, trim left and right.
        scaled_w = round(src_w * out_h / src_h)
        scaled_w += scaled_w % 2          # must be even for yuv420p
        scaled_w = max(scaled_w, out_w)   # never crop more than we have
        return (
            f"scale={scaled_w}:{out_h}:flags=lanczos,"
            f"crop={out_w}:{out_h}:(in_w-{out_w})/2:0"
        )

    # Narrower than target: match the width, trim top and bottom. Stretching
    # the width up to out_w while keeping height = out_h would distort.
    scaled_h = round(src_h * out_w / src_w)
    scaled_h += scaled_h % 2              # must be even for yuv420p
    scaled_h = max(scaled_h, out_h)
    return (
        f"scale={out_w}:{scaled_h}:flags=lanczos,"
        f"crop={out_w}:{out_h}:0:(in_h-{out_h})/2"
    )


# ── ASS caption builder ──────────────────────────────────────────────────────

def _build_ass_captions(
    words      : List[dict],
    seg_start  : float,
    seg_end    : float,
    out_w      : int,
    font_size  : int,
    font_color : str,
) -> str:
    """
    Build an ASS subtitle file string for one reel segment.
    Groups words into lines of 4, timed to AssemblyAI word timestamps.

    Args:
        words: Word dicts; each is read for the keys "start_ms", "end_ms"
            (absolute milliseconds in the source video) and "text".
        seg_start: Segment start in seconds (source timeline).
        seg_end: Segment end in seconds (source timeline).
        out_w: Output width, written as PlayResX.
        font_size: Font size written into the Default style.
        font_color: "white", "yellow" or "cyan" (case-insensitive); any other
            value falls back to white.

    Returns:
        The full ASS document as a string, with timestamps relative to the
        start of the segment.
    """
    # Keep words that start inside the segment and end at most 0.5 s past it
    seg_words = [
        w for w in words
        if w["start_ms"] / 1000 >= seg_start
        and w["end_ms"] / 1000 <= seg_end + 0.5
    ]

    color_map = {
        "white" : "&H00FFFFFF",
        "yellow": "&H0000FFFF",
        "cyan"  : "&H00FFFF00",
    }
    ass_color = color_map.get(font_color.lower(), "&H00FFFFFF")

    seg_start_ms = round(seg_start * 1000)

    def _ts(ms: int) -> str:
        """Convert absolute ms timestamp → ASS time relative to segment start."""
        # Integer arithmetic throughout: float maths such as 1.15 * 100
        # evaluates to 114.999… and would truncate to the wrong centisecond.
        rel_ms = max(0, int(ms) - seg_start_ms)
        total_s, rem_ms = divmod(rel_ms, 1000)
        h, rem_s = divmod(total_s, 3600)
        m, s = divmod(rem_s, 60)
        cs = rem_ms // 10
        return f"{h}:{m:02d}:{s:02d}.{cs:02d}"

    WORDS_PER_LINE = 4
    events = []
    for i in range(0, len(seg_words), WORDS_PER_LINE):
        chunk = seg_words[i : i + WORDS_PER_LINE]
        t_start = _ts(chunk[0]["start_ms"])
        t_end = _ts(chunk[-1]["end_ms"])
        text = " ".join(w["text"] for w in chunk)
        events.append(f"Dialogue: 0,{t_start},{t_end},Default,,0,0,0,,{text}")

    # NOTE(review): only PlayResX is declared, so the renderer picks the
    # script's vertical resolution itself; font_size is therefore probably
    # not in output pixels — confirm before relying on exact sizes.
    header = textwrap.dedent(f"""\
        [Script Info]
        ScriptType: v4.00+
        PlayResX: {out_w}
        WrapStyle: 0

        [V4+ Styles]
        Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
        Style: Default,Arial,{font_size},{ass_color},&H000000FF,&H00000000,&H99000000,-1,0,0,0,100,100,0,0,3,0,0,2,30,30,60,1

        [Events]
        Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
    """)
    return header + "\n".join(events) + "\n"


# ── FFmpeg path escaping ─────────────────────────────────────────────────────

def _escape_filter_path(path: Path) -> str:
    """
    Escape a filesystem path for use as a filter option inside ``-vf``.

    FFmpeg parses the string twice: once as a filtergraph (special
    characters: backslash, quote, brackets, comma, semicolon) and once as a
    filter option list (special characters: backslash, quote, colon). Both
    levels must be escaped or a Windows drive colon, or a comma in a
    directory name, splits the argument in the wrong place.
    """
    # Forward slashes are accepted on every platform and avoid a third
    # round of backslash escaping.
    posix = str(path).replace("\\", "/")

    # Level 1: filter option value
    option_level = posix.replace(":", "\\:").replace("'", "\\'")

    # Level 2: filtergraph description
    graph_specials = "\\'[],;"
    return "".join(
        "\\" + ch if ch in graph_specials else ch
        for ch in option_level
    )


# ── Main cut function ────────────────────────────────────────────────────────

def cut_and_convert_segment(
    video_path        : Path,
    segment           : Segment,
    output_path       : Path,
    src_w             : int,
    src_h             : int,
    out_w             : int,
    out_h             : int,
    words             : List[dict],
    add_captions      : bool = True,
    caption_font_size : int = 48,
    caption_color     : str = "white",
    temp_dir          : Optional[Path] = None,
) -> Path:
    """
    Cut one segment from the source video and convert to vertical 9:16 MP4.
    Optionally burns in word-level captions.

    Args:
        video_path: Source video file.
        segment: Segment to cut; its ``start``, ``end`` and ``index``
            attributes are used.
        output_path: Destination MP4 path.
        src_w, src_h: Source frame size in pixels.
        out_w, out_h: Output frame size in pixels.
        words: Word dicts used to build captions.
        add_captions: Burn captions in when True and ``words`` is non-empty.
        caption_font_size: Caption font size.
        caption_color: Caption colour name.
        temp_dir: Where the temporary ``caps_<index>.ass`` file is written;
            defaults to the directory of ``output_path``.

    Returns:
        ``output_path``.
    """
    if temp_dir is None:
        temp_dir = output_path.parent

    output_path.parent.mkdir(parents=True, exist_ok=True)

    crop_filter = _build_crop_filter(src_w, src_h, out_w, out_h)
    vf_parts = [crop_filter]

    # ── Captions ──────────────────────────────────────────────────────────────
    if add_captions and words:
        ass_content = _build_ass_captions(
            words      = words,
            seg_start  = segment.start,
            seg_end    = segment.end,
            out_w      = out_w,
            font_size  = caption_font_size,
            font_color = caption_color,
        )
        temp_dir.mkdir(parents=True, exist_ok=True)
        ass_path = temp_dir / f"caps_{segment.index}.ass"
        ass_path.write_text(ass_content, encoding="utf-8")
        vf_parts.append(f"ass={_escape_filter_path(ass_path)}")

    vf_string = ",".join(vf_parts)

    cmd = [
        "ffmpeg", "-y",
        # Seeking BEFORE -i is fast (keyframe-level) + accurate for H.264
        "-ss", str(segment.start),
        "-to", str(segment.end),
        "-i", str(video_path),
        "-vf", vf_string,
        "-c:v", "libx264",
        "-preset", "fast",        # good encode speed on CPU
        "-crf", "23",             # quality/size balance
        "-profile:v", "high",
        "-level", "4.1",
        "-pix_fmt", "yuv420p",    # max device compatibility
        "-movflags", "+faststart",
        "-c:a", "aac",
        "-b:a", "128k",
        "-ar", "44100",
        str(output_path),
    ]

    run_cmd(cmd, f"Cutting reel #{segment.index} [{segment.start:.1f}s → {segment.end:.1f}s]")

    size_mb = output_path.stat().st_size / 1_048_576
    log("✅", f"Reel #{segment.index} → {output_path.name} ({size_mb:.1f} MB)")
    return output_path


# ── Batch processing ─────────────────────────────────────────────────────────

def process_all_segments(
    video_path        : Path,
    segments          : List[Segment],
    reels_dir         : Path,
    temp_dir          : Path,
    src_w             : int,
    src_h             : int,
    out_w             : int,
    out_h             : int,
    words             : List[dict],
    add_captions      : bool = True,
    caption_font_size : int = 48,
    caption_color     : str = "white",
    progress_cb       : Optional[Callable[[str, int], None]] = None,
) -> List[ReelOutput]:
    """
    Process all segments sequentially, emitting progress for each.

    ``progress_cb`` (if given) is called with ("cutting_reels", percent)
    before each segment and once more with 100 after the last one.

    Returns list of ReelOutput objects, one per segment, in input order.
    """
    results: List[ReelOutput] = []
    total = len(segments)

    for position, seg in enumerate(segments):
        if progress_cb:
            progress_cb("cutting_reels", int((position / total) * 100))

        reel_filename = f"reel_{seg.index:02d}_{int(seg.start)}s-{int(seg.end)}s.mp4"
        reel_path = reels_dir / reel_filename

        cut_and_convert_segment(
            video_path        = video_path,
            segment           = seg,
            output_path       = reel_path,
            src_w             = src_w,
            src_h             = src_h,
            out_w             = out_w,
            out_h             = out_h,
            words             = words,
            add_captions      = add_captions,
            caption_font_size = caption_font_size,
            caption_color     = caption_color,
            temp_dir          = temp_dir,
        )

        results.append(ReelOutput(
            index     = seg.index,
            path      = reel_path,
            segment   = seg,
            file_size = reel_path.stat().st_size,
        ))

    if progress_cb:
        progress_cb("cutting_reels", 100)

    return results