studiox-reel-cutter / processor.py
rajank18
error fixed
5639f16
"""
pipeline/processor.py
Cuts each highlight segment and converts it to 9:16 vertical MP4.
Optionally burns word-level ASS captions into the frame.
"""
import textwrap
from pathlib import Path
from typing import Callable, List, Optional
from utils import Segment, ReelOutput, log, run_cmd
# ── Crop filter builder ──────────────────────────────────────────────────────
def _build_crop_filter(src_w: int, src_h: int, out_w: int, out_h: int) -> str:
    """
    Build an FFmpeg filter chain that centre-crops any source aspect ratio
    into out_w × out_h with no black bars and no distortion ("cover" fit).

    Strategy:
      1. Source already at the target ratio (within 0.01): plain scale.
      2. Source wider than the target: scale so height = out_h, then crop
         the centre out_w columns.
      3. Source narrower than the target: scale so width = out_w, then crop
         the centre out_h rows.
      4. Source dimensions unknown (zero or negative): let FFmpeg compute
         the covering scale itself, then centre-crop.

    Args:
        src_w, src_h: Source frame size in pixels.
        out_w, out_h: Desired output frame size in pixels.

    Returns:
        A filter string suitable for FFmpeg's ``-vf`` option.

    Raises:
        ValueError: If out_w or out_h is not positive.
    """
    if out_w <= 0 or out_h <= 0:
        raise ValueError(
            f"output size must be positive, got {out_w}x{out_h}"
        )

    # Without usable source dimensions the scaled size cannot be computed
    # here; force_original_aspect_ratio=increase makes FFmpeg pick the
    # smallest aspect-preserving size that covers out_w x out_h.
    if src_w <= 0 or src_h <= 0:
        return (
            f"scale={out_w}:{out_h}"
            f":force_original_aspect_ratio=increase:flags=lanczos,"
            f"crop={out_w}:{out_h}:(in_w-{out_w})/2:(in_h-{out_h})/2"
        )

    src_ratio = src_w / src_h
    tgt_ratio = out_w / out_h

    if abs(src_ratio - tgt_ratio) < 0.01:
        return f"scale={out_w}:{out_h}:flags=lanczos"

    if src_ratio > tgt_ratio:
        # Wider than the target: match heights, trim left and right.
        scaled_w = round(src_w * out_h / src_h)
        scaled_w += scaled_w % 2  # keep the intermediate width even (yuv420p)
        scaled_w = max(scaled_w, out_w)  # rounding must never undershoot
        return (
            f"scale={scaled_w}:{out_h}:flags=lanczos,"
            f"crop={out_w}:{out_h}:(in_w-{out_w})/2:0"
        )

    # Narrower than the target: match widths, trim top and bottom.
    # Stretching the width here instead would distort the picture.
    scaled_h = round(src_h * out_w / src_w)
    scaled_h += scaled_h % 2  # keep the intermediate height even (yuv420p)
    scaled_h = max(scaled_h, out_h)
    return (
        f"scale={out_w}:{scaled_h}:flags=lanczos,"
        f"crop={out_w}:{out_h}:0:(in_h-{out_h})/2"
    )
# ── ASS caption builder ──────────────────────────────────────────────────────
def _build_ass_captions(
    words: List[dict],
    seg_start: float,
    seg_end: float,
    out_w: int,
    font_size: int,
    font_color: str,
) -> str:
    """
    Build the text of an ASS subtitle file for one reel segment.

    Words are grouped four to a caption line. Each line is shown from the
    start of its first word to the end of its last word, with times
    expressed relative to the segment start.

    Args:
        words: Word dicts; this function reads the keys ``start_ms``,
            ``end_ms`` (absolute milliseconds) and ``text``.
        seg_start: Segment start in seconds.
        seg_end: Segment end in seconds.
        out_w: Output frame width, written to the header as PlayResX.
        font_size: Font size for the Default style.
        font_color: "white", "yellow" or "cyan" (case-insensitive); any
            other value falls back to white.

    Returns:
        The complete ASS document as a string, terminated by a newline.
    """
    words_per_line = 4

    # Keep only words that start inside the segment; words may end up to
    # half a second past the segment end.
    seg_words = [
        w for w in words
        if w["start_ms"] / 1000 >= seg_start
        and w["end_ms"] / 1000 <= seg_end + 0.5
    ]

    # ASS colours are written &HAABBGGRR.
    color_map = {
        "white": "&H00FFFFFF",
        "yellow": "&H0000FFFF",
        "cyan": "&H00FFFF00",
    }
    ass_color = color_map.get(font_color.lower(), "&H00FFFFFF")

    # round() rather than int(): seg_start * 1000 can land a hair below the
    # intended integer (1.001 * 1000 -> 1000.9999...), and truncating would
    # shift every caption by one millisecond.
    seg_start_ms = round(seg_start * 1000)

    def _ts(ms: int) -> str:
        """Convert an absolute ms timestamp → ASS time relative to segment start."""
        # Integer arithmetic throughout: the float form
        # int((seconds * 100) % 100) truncates values such as 0.57 s to
        # 56 centiseconds.
        rel_ms = max(0, int(ms - seg_start_ms))
        total_s, cs = divmod(rel_ms // 10, 100)
        total_m, s = divmod(total_s, 60)
        h, m = divmod(total_m, 60)
        return f"{h}:{m:02d}:{s:02d}.{cs:02d}"

    events = []
    for i in range(0, len(seg_words), words_per_line):
        chunk = seg_words[i : i + words_per_line]
        t_start = _ts(chunk[0]["start_ms"])
        t_end = _ts(chunk[-1]["end_ms"])
        text = " ".join(w["text"] for w in chunk)
        events.append(f"Dialogue: 0,{t_start},{t_end},Default,,0,0,0,,{text}")

    # NOTE(review): only PlayResX is declared; PlayResY is left to the
    # renderer's default. Confirm the resulting caption scale is intended.
    header_lines = [
        "[Script Info]",
        "ScriptType: v4.00+",
        f"PlayResX: {out_w}",
        "WrapStyle: 0",
        "[V4+ Styles]",
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
        f"Style: Default,Arial,{font_size},{ass_color},&H000000FF,&H00000000,&H99000000,-1,0,0,0,100,100,0,0,3,0,0,2,30,30,60,1",
        "[Events]",
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
    ]
    header = "\n".join(header_lines) + "\n"
    return header + "\n".join(events) + "\n"
# ── Main cut function ────────────────────────────────────────────────────────
def _escape_filter_path(path: Path) -> str:
    """Escape a file path for use as an option value inside an FFmpeg -vf string."""
    # FFmpeg accepts forward slashes on Windows, which avoids having to
    # escape every path separator.
    text = str(path).replace("\\", "/")
    # Level 1: the filter's own option parser (':' separates options).
    for ch in ("\\", "'", ":"):
        text = text.replace(ch, "\\" + ch)
    # Level 2: the filtergraph parser, which strips one layer of backslash
    # escaping and treats [ ] , ; as syntax. Without this layer a Windows
    # drive colon ("C:") reaches the option parser unescaped.
    for ch in ("\\", "'", "[", "]", ",", ";"):
        text = text.replace(ch, "\\" + ch)
    return text


def cut_and_convert_segment(
    video_path: Path,
    segment: Segment,
    output_path: Path,
    src_w: int,
    src_h: int,
    out_w: int,
    out_h: int,
    words: List[dict],
    add_captions: bool = True,
    caption_font_size: int = 48,
    caption_color: str = "white",
    temp_dir: Optional[Path] = None,
) -> Path:
    """
    Cut one segment from the source video and convert it to a vertical MP4.
    Optionally burns word-level captions into the frame.

    Args:
        video_path: Source video file.
        segment: Segment to cut; ``index``, ``start`` and ``end`` are read.
        output_path: Destination MP4 file.
        src_w, src_h: Source frame size, used to build the crop filter.
        out_w, out_h: Output frame size.
        words: Word dicts handed to ``_build_ass_captions``.
        add_captions: Burn captions when True and ``words`` is non-empty.
        caption_font_size: Caption font size.
        caption_color: Caption colour name.
        temp_dir: Directory for the generated .ass file; defaults to the
            directory containing ``output_path``.

    Returns:
        ``output_path``.
    """
    if temp_dir is None:
        temp_dir = output_path.parent

    vf_parts = [_build_crop_filter(src_w, src_h, out_w, out_h)]

    # ── Captions ──────────────────────────────────────────────────────────
    if add_captions and words:
        ass_content = _build_ass_captions(
            words=words,
            seg_start=segment.start,
            seg_end=segment.end,
            out_w=out_w,
            font_size=caption_font_size,
            font_color=caption_color,
        )
        temp_dir.mkdir(parents=True, exist_ok=True)
        ass_path = temp_dir / f"caps_{segment.index}.ass"
        ass_path.write_text(ass_content, encoding="utf-8")
        vf_parts.append(f"ass={_escape_filter_path(ass_path)}")

    vf_string = ",".join(vf_parts)

    # FFmpeg does not create missing output directories.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    cmd = [
        "ffmpeg", "-y",
        # -ss/-to placed before -i are input options: FFmpeg seeks in the
        # input rather than decoding and discarding the leading frames.
        "-ss", str(segment.start),
        "-to", str(segment.end),
        "-i", str(video_path),
        "-vf", vf_string,
        "-c:v", "libx264",
        "-preset", "fast",        # good encode speed on CPU
        "-crf", "23",             # quality/size balance
        "-profile:v", "high",
        "-level", "4.1",
        "-pix_fmt", "yuv420p",    # max device compatibility
        "-movflags", "+faststart",
        "-c:a", "aac",
        "-b:a", "128k",
        "-ar", "44100",
        str(output_path),
    ]
    run_cmd(
        cmd,
        f"Cutting reel #{segment.index} "
        f"[{segment.start:.1f}s → {segment.end:.1f}s]",
    )

    # stat() raises FileNotFoundError if FFmpeg produced no output file.
    size_mb = output_path.stat().st_size / 1_048_576
    log("✅", f"Reel #{segment.index} → {output_path.name} ({size_mb:.1f} MB)")
    return output_path
# ── Batch processing ─────────────────────────────────────────────────────────
def process_all_segments(
    video_path: Path,
    segments: List[Segment],
    reels_dir: Path,
    temp_dir: Path,
    src_w: int,
    src_h: int,
    out_w: int,
    out_h: int,
    words: List[dict],
    add_captions: bool = True,
    caption_font_size: int = 48,
    caption_color: str = "white",
    progress_cb: Optional[Callable[[str, int], None]] = None,
) -> List[ReelOutput]:
    """
    Cut every segment in order and collect one ReelOutput per segment.

    When ``progress_cb`` is given it is called with the stage name
    "cutting_reels" and a percentage: before each segment with the share
    of segments already finished, and once more with 100 at the end.

    Returns:
        The ReelOutput objects, in the same order as ``segments``.
    """

    def _report(percent: int) -> None:
        # Progress reporting is optional; do nothing without a callback.
        if progress_cb:
            progress_cb("cutting_reels", percent)

    segment_count = len(segments)

    # Arguments that are the same for every segment.
    shared_kwargs = dict(
        video_path=video_path,
        src_w=src_w,
        src_h=src_h,
        out_w=out_w,
        out_h=out_h,
        words=words,
        add_captions=add_captions,
        caption_font_size=caption_font_size,
        caption_color=caption_color,
        temp_dir=temp_dir,
    )

    outputs: List[ReelOutput] = []
    for position, seg in enumerate(segments):
        _report(int((position / segment_count) * 100))

        # File name carries the reel index and its whole-second time range.
        reel_path = reels_dir / (
            f"reel_{seg.index:02d}_{int(seg.start)}s-{int(seg.end)}s.mp4"
        )
        cut_and_convert_segment(
            segment=seg,
            output_path=reel_path,
            **shared_kwargs,
        )

        outputs.append(
            ReelOutput(
                index=seg.index,
                path=reel_path,
                segment=seg,
                file_size=reel_path.stat().st_size,
            )
        )

    _report(100)
    return outputs