Spaces:
Running
Running
| """ | |
| pipeline/processor.py | |
| Cuts each highlight segment and converts it to 9:16 vertical MP4. | |
| Optionally burns word-level ASS captions into the frame. | |
| """ | |
| import textwrap | |
| from pathlib import Path | |
| from typing import Callable, List, Optional | |
| from utils import Segment, ReelOutput, log, run_cmd | |
| # ── Crop filter builder ────────────────────────────────────────────────────── | |
| def _build_crop_filter(src_w: int, src_h: int, out_w: int, out_h: int) -> str: | |
| """ | |
| Build an FFmpeg filtergraph that centre-crops any source aspect ratio | |
| into out_w Γ out_h with no black bars and no distortion. | |
| Strategy: | |
| 1. Scale so height = out_h (width may exceed out_w) | |
| 2. Crop the centre out_w pixels from the scaled frame | |
| """ | |
| src_ratio = src_w / src_h if src_h else 1 | |
| tgt_ratio = out_w / out_h | |
| if abs(src_ratio - tgt_ratio) < 0.01: | |
| return f"scale={out_w}:{out_h}:flags=lanczos" | |
| # Scale height to out_h, let width be whatever it needs to be | |
| scaled_w = round(src_w * out_h / src_h) | |
| scaled_w += scaled_w % 2 # must be even for yuv420p | |
| # Ensure we have at least out_w pixels to crop from | |
| if scaled_w < out_w: | |
| scaled_w = out_w | |
| return ( | |
| f"scale={scaled_w}:{out_h}:flags=lanczos," | |
| f"crop={out_w}:{out_h}:(in_w-{out_w})/2:0" | |
| ) | |
| # ── ASS caption builder ────────────────────────────────────────────────────── | |
| def _build_ass_captions( | |
| words : List[dict], | |
| seg_start : float, | |
| seg_end : float, | |
| out_w : int, | |
| font_size : int, | |
| font_color : str, | |
| ) -> str: | |
| """ | |
| Build an ASS subtitle file string for one reel segment. | |
| Groups words into lines of 4, timed to AssemblyAI word timestamps. | |
| """ | |
| # Filter to words inside this segment | |
| seg_words = [ | |
| w for w in words | |
| if w["start_ms"] / 1000 >= seg_start | |
| and w["end_ms"] / 1000 <= seg_end + 0.5 | |
| ] | |
| color_map = { | |
| "white" : "&H00FFFFFF", | |
| "yellow": "&H0000FFFF", | |
| "cyan" : "&H00FFFF00", | |
| } | |
| ass_color = color_map.get(font_color.lower(), "&H00FFFFFF") | |
| def _ts(ms: int) -> str: | |
| """Convert absolute ms timestamp β ASS time relative to segment start.""" | |
| total_s = max(0.0, (ms - int(seg_start * 1000)) / 1000) | |
| h = int(total_s // 3600) | |
| m = int((total_s % 3600) // 60) | |
| s = int(total_s % 60) | |
| cs = int((total_s * 100) % 100) | |
| return f"{h}:{m:02d}:{s:02d}.{cs:02d}" | |
| WORDS_PER_LINE = 4 | |
| events = [] | |
| for i in range(0, len(seg_words), WORDS_PER_LINE): | |
| chunk = seg_words[i : i + WORDS_PER_LINE] | |
| if not chunk: | |
| continue | |
| t_start = _ts(chunk[0]["start_ms"]) | |
| t_end = _ts(chunk[-1]["end_ms"]) | |
| text = " ".join(w["text"] for w in chunk) | |
| events.append(f"Dialogue: 0,{t_start},{t_end},Default,,0,0,0,,{text}") | |
| header = textwrap.dedent(f"""\ | |
| [Script Info] | |
| ScriptType: v4.00+ | |
| PlayResX: {out_w} | |
| WrapStyle: 0 | |
| [V4+ Styles] | |
| Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding | |
| Style: Default,Arial,{font_size},{ass_color},&H000000FF,&H00000000,&H99000000,-1,0,0,0,100,100,0,0,3,0,0,2,30,30,60,1 | |
| [Events] | |
| Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text | |
| """) | |
| return header + "\n".join(events) + "\n" | |
| # ── Main cut function ──────────────────────────────────────────────────────── | |
def _escape_filter_path(path: Path) -> str:
    """Escape a file path for use as an option value inside an FFmpeg filtergraph."""
    value = path.as_posix()
    # Level 1 - filter option value: backslash, quote and the ':' separator.
    # Backslash goes first so the escapes added afterwards are not re-escaped.
    for ch in ("\\", "'", ":"):
        value = value.replace(ch, "\\" + ch)
    # Level 2 - filtergraph description: backslash, quote and the characters
    # that delimit filters and link labels.
    for ch in ("\\", "'", "[", "]", ",", ";"):
        value = value.replace(ch, "\\" + ch)
    return value


def cut_and_convert_segment(
    video_path        : Path,
    segment           : Segment,
    output_path       : Path,
    src_w             : int,
    src_h             : int,
    out_w             : int,
    out_h             : int,
    words             : List[dict],
    add_captions      : bool = True,
    caption_font_size : int = 48,
    caption_color     : str = "white",
    temp_dir          : Optional[Path] = None,
) -> Path:
    """
    Cut one segment from the source video and encode it as an out_w x out_h MP4.

    The video filter chain is the crop/scale filter from ``_build_crop_filter``
    and, when captions are requested and ``words`` is non-empty, an ``ass``
    filter pointing at a caption file written to ``temp_dir``.

    Args:
        video_path: Source video file.
        segment: Segment to cut; ``index``, ``start`` and ``end`` are read.
        output_path: Destination MP4 path.
        src_w, src_h: Source frame size in pixels.
        out_w, out_h: Output frame size in pixels.
        words: Word-timestamp dicts forwarded to ``_build_ass_captions``.
        add_captions: Burn captions into the frame when True.
        caption_font_size: Caption font size.
        caption_color: Caption colour name.
        temp_dir: Directory for the temporary ``.ass`` file; defaults to the
            output file's directory.

    Returns:
        ``output_path``, after the encode has run.
    """
    if temp_dir is None:
        temp_dir = output_path.parent

    # Make sure FFmpeg has somewhere to write the result.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    crop_filter = _build_crop_filter(src_w, src_h, out_w, out_h)
    vf_parts = [crop_filter]

    # -- Captions ------------------------------------------------------------
    if add_captions and words:
        ass_content = _build_ass_captions(
            words      = words,
            seg_start  = segment.start,
            seg_end    = segment.end,
            out_w      = out_w,
            font_size  = caption_font_size,
            font_color = caption_color,
        )
        temp_dir.mkdir(parents=True, exist_ok=True)
        ass_path = temp_dir / f"caps_{segment.index}.ass"
        ass_path.write_text(ass_content, encoding="utf-8")
        # The path is embedded in the -vf string, so it needs FFmpeg's
        # two-level filtergraph escaping (not just ':' and '\').
        vf_parts.append(f"ass={_escape_filter_path(ass_path)}")

    vf_string = ",".join(vf_parts)

    cmd = [
        "ffmpeg", "-y",
        # -ss/-to placed before -i apply to the input, so output timestamps
        # (and therefore the caption times) start at zero for each reel.
        "-ss", str(segment.start),
        "-to", str(segment.end),
        "-i", str(video_path),
        "-vf", vf_string,
        "-c:v", "libx264",
        "-preset", "fast",          # good encode speed on CPU
        "-crf", "23",               # quality/size balance
        "-profile:v", "high",
        "-level", "4.1",
        "-pix_fmt", "yuv420p",      # max device compatibility
        "-movflags", "+faststart",
        "-c:a", "aac",
        "-b:a", "128k",
        "-ar", "44100",
        str(output_path),
    ]

    # NOTE(review): the "β" glyphs in the two messages below look like
    # mis-encoded characters (possibly an arrow and a status icon); they are
    # kept exactly as found - confirm the intended characters.
    run_cmd(cmd, f"Cutting reel #{segment.index} [{segment.start:.1f}s β {segment.end:.1f}s]")

    size_mb = output_path.stat().st_size / 1_048_576
    log("β ", f"Reel #{segment.index} β {output_path.name} ({size_mb:.1f} MB)")
    return output_path
| # ── Batch processing ───────────────────────────────────────────────────────── | |
def process_all_segments(
    video_path        : Path,
    segments          : List[Segment],
    reels_dir         : Path,
    temp_dir          : Path,
    src_w             : int,
    src_h             : int,
    out_w             : int,
    out_h             : int,
    words             : List[dict],
    add_captions      : bool = True,
    caption_font_size : int = 48,
    caption_color     : str = "white",
    progress_cb       : Optional[Callable[[str, int], None]] = None,
) -> List[ReelOutput]:
    """
    Cut and convert every segment in order, one after another.

    Before each segment the optional ``progress_cb`` is called with the stage
    name ``"cutting_reels"`` and the integer percentage of segments already
    finished; after the last segment it is called once more with 100.

    Each reel is written to ``reels_dir`` under a name built from the
    segment index and its whole-second start/end times.

    Returns:
        One ``ReelOutput`` per segment, in input order, carrying the segment,
        the reel path and its size in bytes.
    """
    segment_count = len(segments)
    outputs: List[ReelOutput] = []

    def _report(percent: int) -> None:
        # Progress reporting is optional; skip silently without a callback.
        if progress_cb:
            progress_cb("cutting_reels", percent)

    for position, current in enumerate(segments):
        _report(int((position / segment_count) * 100))

        target = reels_dir / (
            f"reel_{current.index:02d}_{int(current.start)}s-{int(current.end)}s.mp4"
        )

        cut_and_convert_segment(
            video_path=video_path,
            segment=current,
            output_path=target,
            src_w=src_w,
            src_h=src_h,
            out_w=out_w,
            out_h=out_h,
            words=words,
            add_captions=add_captions,
            caption_font_size=caption_font_size,
            caption_color=caption_color,
            temp_dir=temp_dir,
        )

        # Size is read after the encode so it reflects the finished file.
        reel = ReelOutput(
            index=current.index,
            path=target,
            segment=current,
            file_size=target.stat().st_size,
        )
        outputs.append(reel)

    _report(100)
    return outputs