# clipforge/src/humeo/pipeline.py
# Last change: fix caption edge clipping and real clip previews (f14fa4b, verified)
"""End-to-end product pipeline."""
import dataclasses
import json
import logging
import re
from pathlib import Path
from humeo_core.primitives.ingest import extract_keyframes
from humeo_core.schemas import Clip, LayoutInstruction, LayoutKind, RatingFeedback, RenderTheme, Scene
from humeo import interactive, session_state
from humeo.clip_assembly import apply_render_spans, assemble_clip, write_clip_plan
from humeo.clip_selection_cache import cache_valid, load_meta, transcript_fingerprint, write_artifacts
from humeo.clip_selector import (
clip_quality_priority_score,
load_clips,
renumber_clips_dense,
save_clips,
select_clips,
)
from humeo.config import MAX_CLIP_DURATION_SEC, MIN_CLIP_DURATION_SEC, PipelineConfig
from humeo.content_pruning import run_content_pruning_stage, snap_render_windows_to_sentence_boundaries
from humeo.cutter import generate_ass
from humeo.hook_detector import run_hook_detection_stage
from humeo.hook_library import resolve_hook_library_path
from humeo.ingest import (
download_video,
extract_audio,
stage_local_video,
transcript_cache_valid,
transcribe_whisperx,
)
from humeo.layout_vision import run_layout_vision_stage
from humeo.render_qa import qa_record_flags, run_render_qa
from humeo.render_window import clip_for_render
from humeo.reframe_ffmpeg import reframe_clip_ffmpeg
from humeo.transcript_align import clip_subtitle_words, group_words_to_cue_chunks
from humeo.video_cache import (
extract_youtube_video_id,
ingest_complete,
normalize_local_source_path,
read_youtube_info_json,
resolve_work_directory,
upsert_manifest_from_info,
)
logger = logging.getLogger(__name__)
# Filler opening words that mark a clip's hook as weak when one of them is
# the very first word of the hook window (see _filter_weak_hook_clips).
_WEAK_HOOK_START_WORDS = {
    "actually",
    "basically",
    "honestly",
    "look",
    "listen",
    "okay",
    "ok",
    "right",
    "so",
    "well",
    "yeah",
}
# Two-word filler openers checked the same way as the single words above.
_WEAK_HOOK_START_PHRASES = {"i mean", "kind of", "sort of", "you know"}
# A hook starting later than this many seconds into the clip counts as late.
_STRONG_HOOK_LATEST_START_SEC = 6.0
# Minimum (priority score - caption penalty) a clip needs to survive
# _filter_low_quality_clips.
_FINAL_QUALITY_THRESHOLD = 0.68
# NOTE(review): the next three geometry thresholds are not referenced anywhere
# in this module — presumably consumed elsewhere or leftover from an earlier
# layout heuristic; verify before removing.
_NATIVE_HIGHLIGHT_CHART_DOMINANCE_Y2 = 0.68
_NATIVE_HIGHLIGHT_MIN_PERSON_WIDTH = 0.42
_NATIVE_HIGHLIGHT_MAX_TOP_ANCHORED_PERSON_Y1 = 0.12
# Zoom floor applied when a split chart/person layout is demoted to a
# centered-person layout (_split_chart_person_to_center).
_NATIVE_HIGHLIGHT_SPLIT_TO_CENTER_MIN_ZOOM = 1.20
# Phrases indicating the speaker refers to on-screen material (slides, charts,
# axes). A match justifies keeping a split chart/person layout.
_PRESENTATION_REFERENCE_RE = re.compile(
    r"\b("
    r"as you can(?: also)? see|you can(?: also)? see|what you can(?: also)? see|look at|take a look|shown here|"
    r"shown on|on the screen|on this slide|this chart|the chart|this graph|"
    r"the graph|this slide|this matrix|the matrix|red line|yellow line|"
    r"blue line|green line|top there|bottom there|x-axis|y-axis"
    r")\b",
    flags=re.IGNORECASE,
)
def _split_chart_person_to_center(instruction: LayoutInstruction) -> LayoutInstruction:
    """Collapse a split chart/person layout into a single centered-person layout.

    Clears every split-region field and person tracking, enforces at least the
    minimum demotion zoom, and — when the original person region is known —
    recenters the frame on that person's horizontal center.
    """
    new_zoom = float(instruction.zoom)
    if new_zoom < _NATIVE_HIGHLIGHT_SPLIT_TO_CENTER_MIN_ZOOM:
        new_zoom = _NATIVE_HIGHLIGHT_SPLIT_TO_CENTER_MIN_ZOOM
    changes = {
        "layout": LayoutKind.SIT_CENTER,
        "zoom": new_zoom,
        "person_tracking": [],
        "split_chart_region": None,
        "split_person_region": None,
        "split_second_chart_region": None,
        "split_second_person_region": None,
        "chart_x_norm": 0.0,
        "top_band_ratio": 0.5,
    }
    person_region = instruction.split_person_region
    if person_region is not None:
        changes["person_x_norm"] = float(person_region.center_x)
    return instruction.model_copy(update=changes)
def _rerun_config(config: PipelineConfig, steering_notes: list[str]) -> PipelineConfig:
    """Derive a config for a steering-driven rerun.

    Copies the notes, and forces clip reselection plus output overwrite so the
    rerun cannot silently reuse the previous pass's artifacts.
    """
    notes_copy = list(steering_notes)
    return dataclasses.replace(
        config,
        force_clip_selection=True,
        overwrite_outputs=True,
        steering_notes=notes_copy,
    )
def _build_steering_from_feedback(feedback: RatingFeedback) -> str:
    """Translate structured rating feedback into a single steering-note string.

    Returns an empty string when nothing actionable was reported.
    """
    # Fixed order so the composed note is deterministic for identical feedback.
    issue_messages = (
        ("wrong_moments", "Previous selection picked the wrong moments. Reselect with different candidates."),
        (
            "bad_cuts",
            "Clip boundaries were bad. Prefer clips starting on clean sentence beginnings and ending on completed thoughts.",
        ),
        ("boring", "Previous selection lacked energy. Bias strongly toward high-emotion, high-hook moments."),
        ("confusing", "Previous clips needed too much context. Pick moments that make sense standalone."),
        ("length_off", "Clip durations felt off. Respect the duration bounds strictly."),
    )
    parts = [message for issue, message in issue_messages if issue in feedback.issues]
    # Layout overrides have no steering hook yet; log so the signal isn't lost.
    if "wrong_layout" in feedback.issues:
        logger.warning("Received wrong_layout feedback, but layout overrides are not available until Gate 2 ships.")
    if "other" in feedback.issues and feedback.free_text:
        parts.append(feedback.free_text)
    return " ".join(parts).strip()
def _ensure_work_dir(config: PipelineConfig) -> None:
    """Resolve ``config.work_dir`` when unset (per-video cache) or ensure it exists."""
    if config.work_dir is None:
        # No explicit work dir: derive one from the source URL and cache settings.
        config.work_dir = resolve_work_directory(
            youtube_url=config.youtube_url,
            explicit_work_dir=None,
            use_video_cache=config.use_video_cache,
            cache_root=config.cache_root,
        )
def _filter_render_valid_clips(clips: list, *, stage_label: str) -> list:
    """Drop clips whose actual render window violates the duration contract.

    Each clip is judged on its pruned render window (``clip_for_render``), not
    its raw selection window; every drop is logged with the stage label.
    """
    kept: list = []
    rejected = 0
    for candidate in clips:
        duration = clip_for_render(candidate).duration_sec
        if not (MIN_CLIP_DURATION_SEC <= duration <= MAX_CLIP_DURATION_SEC):
            rejected += 1
            logger.warning(
                "%s: dropping clip %s because render-window duration %.1fs is outside [%ds, %ds] "
                "(trim_start=%.1fs trim_end=%.1fs).",
                stage_label,
                candidate.clip_id,
                duration,
                MIN_CLIP_DURATION_SEC,
                MAX_CLIP_DURATION_SEC,
                candidate.trim_start_sec,
                candidate.trim_end_sec,
            )
            continue
        kept.append(candidate)
    if rejected:
        logger.warning("%s: dropped %d invalid render-window clip(s).", stage_label, rejected)
    return kept
def _hook_window_text(clip, transcript: dict) -> str:
if clip.hook_start_sec is None or clip.hook_end_sec is None:
return ""
abs_start = clip.start_time_sec + clip.hook_start_sec
abs_end = clip.start_time_sec + clip.hook_end_sec
parts: list[str] = []
for seg in transcript.get("segments", []) or []:
start = float(seg.get("start", 0.0))
end = float(seg.get("end", start))
if end <= abs_start or start >= abs_end:
continue
text = str(seg.get("text", "")).strip()
if text:
parts.append(text)
return " ".join(parts).strip()
def _filter_weak_hook_clips(clips: list, transcript: dict, *, min_kept: int) -> list:
    """Remove clips with late-starting or filler-opener hooks.

    Never drops below ``min_kept`` survivors: a clip is only droppable while
    the count of not-yet-dropped clips still exceeds the floor.
    """
    if len(clips) <= min_kept:
        return clips
    survivors: list = []
    drop_reasons: list[str] = []

    def _can_drop() -> bool:
        # Remaining candidates = total minus everything dropped so far.
        return len(clips) - len(drop_reasons) > min_kept

    for candidate in clips:
        hook_start = candidate.hook_start_sec
        # Late hook: the payoff starts too far into the clip.
        if hook_start is not None and hook_start > _STRONG_HOOK_LATEST_START_SEC and _can_drop():
            drop_reasons.append(
                f"{candidate.clip_id} (hook starts at {hook_start:.1f}s; target <= {_STRONG_HOOK_LATEST_START_SEC:.1f}s)"
            )
            continue
        # Weak opener: the hook text begins with a filler word or phrase.
        opener = _hook_window_text(candidate, transcript).lower()
        tokens = [tok.strip(".,!?;:'\"()[]{}") for tok in opener.split()]
        tokens = [tok for tok in tokens if tok]
        lead_word = tokens[0] if tokens else ""
        lead_pair = " ".join(tokens[:2])
        is_weak = lead_word in _WEAK_HOOK_START_WORDS or lead_pair in _WEAK_HOOK_START_PHRASES
        if is_weak and _can_drop():
            weak_text = lead_pair if lead_pair in _WEAK_HOOK_START_PHRASES else lead_word
            drop_reasons.append(f"{candidate.clip_id} (weak opener: {weak_text})")
            continue
        survivors.append(candidate)
    if drop_reasons:
        logger.info("Dropped %d weak-hook clip(s): %s", len(drop_reasons), ", ".join(drop_reasons))
    return survivors
def _caption_chunk_penalty(clip, transcript: dict, *, render_theme) -> float:
    """Score how awkwardly a clip's words chunk into subtitle cues (0.0-0.18).

    Uses per-theme cue limits mirroring the render settings, then penalizes
    orphaned single-word cues, full cues that flash by too fast, and cues
    that linger past the theme's time budget.
    """
    words = clip_subtitle_words(transcript, clip).words
    if not words:
        # No subtitle words at all is itself a mild quality signal.
        return 0.08
    theme_key = str(render_theme)
    if theme_key == "native_highlight":
        max_words, max_sec, punct_break, min_before_break = 6, 2.4, True, 4
    elif theme_key == "reference_lower_third":
        max_words, max_sec, punct_break, min_before_break = 10, 2.8, True, 5
    else:
        max_words, max_sec, punct_break, min_before_break = 10, 2.8, False, 1
    chunks = group_words_to_cue_chunks(
        words,
        max_words_per_cue=max_words,
        max_cue_sec=max_sec,
        prefer_break_on_punctuation=punct_break,
        min_words_before_break=min_before_break,
    )
    total = 0.0
    for chunk in chunks:
        span = chunk[-1].end_time - chunk[0].start_time
        if len(chunk) == 1 and len(chunks) > 1:
            total += 0.04  # orphaned single-word cue
        if len(chunk) >= max_words and span < 0.65:
            total += 0.04  # full cue flashing by too quickly
        if span > max_sec + 0.35:
            total += 0.03  # cue lingering past the theme's budget
    return min(0.18, total)
def _filter_low_quality_clips(clips: list, transcript: dict, *, min_kept: int, render_theme) -> list:
    """Keep clips whose (priority score - caption penalty) clears the final bar.

    Falls back to the top-ranked ``min_kept`` clips when the bar is too
    aggressive, and always returns densely renumbered clip ids.
    """
    if len(clips) <= min_kept:
        return renumber_clips_dense(clips)
    scored: list[tuple[float, object, float]] = []
    for candidate in clips:
        # Penalty is computed on the pruned render window, matching render output.
        windowed = clip_for_render(candidate)
        penalty = _caption_chunk_penalty(windowed, transcript, render_theme=render_theme)
        scored.append((clip_quality_priority_score(candidate) - penalty, candidate, penalty))
    scored.sort(key=lambda entry: entry[0], reverse=True)
    survivors = [entry[1] for entry in scored if entry[0] >= _FINAL_QUALITY_THRESHOLD]
    if len(survivors) < min_kept:
        # Threshold cut too deep; keep the best min_kept regardless of score.
        survivors = [entry[1] for entry in scored[:min_kept]]
    removed = [
        f"{candidate.clip_id} (score={score:.2f}, caption_penalty={penalty:.2f})"
        for score, candidate, penalty in scored
        if candidate not in survivors
    ]
    if removed:
        logger.info(
            "Dropped %d low-quality clip(s) after pruning: %s",
            len(removed),
            ", ".join(removed),
        )
    return renumber_clips_dense(survivors)
def _clip_references_presentation(clip) -> bool:
    """True when any of the clip's text fields mentions on-screen material.

    Concatenates hook, transcript, overlay title and topic, then tests the
    shared presentation-reference regex.
    """
    combined = " ".join(
        str(getattr(clip, attr, "") or "")
        for attr in ("viral_hook", "transcript", "suggested_overlay_title", "topic")
    )
    return _PRESENTATION_REFERENCE_RE.search(combined) is not None
def _normalize_layout_for_render(
    instruction: LayoutInstruction,
    *,
    render_theme: RenderTheme,
    clip=None,
) -> LayoutInstruction:
    """Demote unjustified split chart/person layouts to a centered layout.

    Only active for the native-highlight theme. A split survives only when
    the speech references on-screen material AND both split regions exist.
    """
    if render_theme != RenderTheme.NATIVE_HIGHLIGHT or instruction.layout != LayoutKind.SPLIT_CHART_PERSON:
        return instruction
    # Split layouts are only warranted when the speaker points at the screen.
    if clip is None or not _clip_references_presentation(clip):
        return _split_chart_person_to_center(instruction)
    # Both regions must be known for the split to be renderable at all.
    if instruction.split_chart_region is None or instruction.split_person_region is None:
        return _split_chart_person_to_center(instruction)
    return instruction
def _load_layout_raw_by_clip(work_dir: Path) -> dict[str, dict]:
path = work_dir / "layout_vision.json"
if not path.is_file():
return {}
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception as exc: # noqa: BLE001 - optional QA metadata
logger.warning("Could not read layout raw metadata for QA: %s", exc)
return {}
clips = payload.get("clips", {})
if not isinstance(clips, dict):
return {}
out: dict[str, dict] = {}
for clip_id, item in clips.items():
if isinstance(item, dict) and isinstance(item.get("raw"), dict):
out[str(clip_id)] = item["raw"]
return out
def _normalize_rerender_clip_id(raw: str) -> str:
    """Canonicalize a user-supplied clip id to the zero-padded 3-digit form.

    "clip_7" -> "007"; inputs without trailing digits pass through stripped.
    """
    cleaned = str(raw).strip()
    trailing_digits = re.search(r"(\d+)$", cleaned)
    if trailing_digits is None:
        return cleaned
    return f"{int(trailing_digits.group(1)):03d}"
def _warned_clip_ids_from_qa(output_dir: Path) -> set[str]:
manifest_path = output_dir / "render_qa" / "qa_manifest.json"
if not manifest_path.is_file():
return set()
try:
payload = json.loads(manifest_path.read_text(encoding="utf-8"))
except Exception as exc: # noqa: BLE001 - stale QA should not block renders
logger.warning("Could not read QA manifest for warned-only rerender: %s", exc)
return set()
warned: set[str] = set()
for record in payload.get("shorts", []):
if not isinstance(record, dict):
continue
clip_id = record.get("clip_id")
if clip_id and qa_record_flags(record):
warned.add(_normalize_rerender_clip_id(str(clip_id)))
return warned
def _load_layout_instruction_cache(work_dir: Path) -> dict[str, LayoutInstruction]:
    """Load previously computed layout instructions from ``layout_vision.json``.

    Individually invalid entries are skipped with a warning; a missing or
    unreadable file yields an empty mapping (callers fall back to vision).
    """
    source = work_dir / "layout_vision.json"
    if not source.is_file():
        return {}
    try:
        payload = json.loads(source.read_text(encoding="utf-8"))
    except Exception as exc:  # noqa: BLE001 - cache fallback
        logger.warning("Could not read cached layout instructions: %s", exc)
        return {}
    entries = payload.get("clips", {})
    if not isinstance(entries, dict):
        return {}
    cached: dict[str, LayoutInstruction] = {}
    for key, value in entries.items():
        if not isinstance(value, dict) or "instruction" not in value:
            continue
        try:
            cached[str(key)] = LayoutInstruction.model_validate(value["instruction"])
        except Exception as exc:  # noqa: BLE001
            logger.warning("Ignoring invalid cached layout for clip %s: %s", key, exc)
    return cached
def run_pipeline(config: PipelineConfig) -> list[Path]:
    """
    Execute the full podcast-to-shorts pipeline.

    Stage order: ingest (download/transcribe) -> clip selection -> hook
    detection -> content pruning -> assembly -> layout vision -> render ->
    optional render QA. In interactive mode this function may recurse with a
    steering-augmented config; recursion is bounded by
    ``config.max_iterations`` via the session-state iteration counter.

    Args:
        config: Pipeline configuration.
    Returns:
        List of paths to the final short-form MP4 files.
    """
    logger.info("=" * 60)
    logger.info("HUMEO PIPELINE START")
    logger.info("Source: %s", config.youtube_url)
    logger.info("Output: %s", config.output_dir)
    logger.info("=" * 60)
    _ensure_work_dir(config)
    assert config.work_dir is not None
    # Interactive sessions persist steering notes per source; fresh CLI notes
    # win over stored ones, otherwise stored notes force a full reselection.
    state = None
    if config.interactive:
        state = session_state.load_state(config.work_dir, config.youtube_url)
        if config.steering_notes:
            if list(config.steering_notes) != state.steering_notes:
                state.steering_notes = list(config.steering_notes)
                session_state.save_state(config.work_dir, state)
        elif state.steering_notes:
            config = dataclasses.replace(
                config,
                steering_notes=list(state.steering_notes),
                force_clip_selection=True,
                overwrite_outputs=True,
            )
            logger.info(
                "Loaded %d steering note(s) from session state for this source.",
                len(state.steering_notes),
            )
    # ------------------------------------------------------------------
    # Stage 1: Ingest
    # ------------------------------------------------------------------
    logger.info("--- STAGE 1: INGESTION ---")
    source_video = config.work_dir / "source.mp4"
    transcript_path = config.work_dir / "transcript.json"
    local_source_path = normalize_local_source_path(config.youtube_url)
    reuse_ingest = ingest_complete(config.work_dir, config.youtube_url)
    # Source acquisition: cached ingest > staged local file > prior download
    # > fresh download.
    if reuse_ingest:
        logger.info("Cached ingest found for this source (reusing source + transcript).")
    elif local_source_path is not None:
        source_video = stage_local_video(local_source_path, config.work_dir)
    elif source_video.exists():
        logger.info("Source video already downloaded, skipping download.")
    else:
        source_video = download_video(config.youtube_url, config.work_dir)
    # Transcript: reuse only when the cache matches current transcription
    # settings; otherwise re-extract audio and re-run WhisperX.
    transcript_reusable = transcript_cache_valid(config.work_dir)
    if reuse_ingest and transcript_reusable:
        logger.info("Transcript already exists, loading.")
        with open(transcript_path, "r", encoding="utf-8") as f:
            transcript = json.load(f)
    elif transcript_reusable and local_source_path is None:
        logger.info("Transcript already exists, loading.")
        with open(transcript_path, "r", encoding="utf-8") as f:
            transcript = json.load(f)
    else:
        if transcript_path.exists():
            logger.info("Transcript cache mismatch for current transcription settings; regenerating.")
        audio_path = extract_audio(source_video, config.work_dir)
        transcript = transcribe_whisperx(audio_path, config.work_dir)
    # Record this source in the video-cache manifest (YouTube sources only).
    if local_source_path is None:
        vid = extract_youtube_video_id(config.youtube_url)
        info = read_youtube_info_json(config.work_dir)
        if not info and vid:
            # Minimal manifest entry when yt-dlp info JSON is unavailable.
            info = {"id": vid, "webpage_url": config.youtube_url}
        if info:
            upsert_manifest_from_info(
                work_dir=config.work_dir,
                youtube_url=config.youtube_url,
                info=info,
                cache_root=config.cache_root,
            )
    # ------------------------------------------------------------------
    # Stage 2: Clip Selection
    # ------------------------------------------------------------------
    logger.info("--- STAGE 2: CLIP SELECTION ---")
    clips_path = config.work_dir / "clips.json"
    fp = transcript_fingerprint(transcript)
    meta = load_meta(config.work_dir)
    # Cache hit requires an existing clips.json, no forced reselection, and a
    # meta record that matches the transcript fingerprint + current config.
    cache_hit = (
        clips_path.is_file()
        and not config.force_clip_selection
        and meta is not None
        and cache_valid(meta, fp, config)
    )
    if cache_hit:
        clips = load_clips(clips_path)
        logger.info("Clip selection cache hit (transcript + provider/model unchanged); skipping LLM.")
    else:
        clips, raw = select_clips(
            transcript,
            gemini_model=config.gemini_model,
            hook_library_path=resolve_hook_library_path(config),
            candidate_count=config.clip_selection_candidate_count,
            quality_threshold=config.clip_selection_quality_threshold,
            min_kept=config.clip_selection_min_kept,
            max_kept=config.clip_selection_max_kept,
            steering_notes=config.steering_notes,
        )
        save_clips(clips, clips_path)
        # Persist the raw LLM response + config so future runs can validate
        # the cache against it.
        write_artifacts(
            config.work_dir,
            transcript=transcript,
            config=config,
            raw_response=raw,
        )
    logger.info("Selected %d clips:", len(clips))
    for clip in clips:
        logger.info(
            "  [%s] %.1fs-%.1fs (%.1fs) score=%.2f - %s",
            clip.clip_id,
            clip.start_time_sec,
            clip.end_time_sec,
            clip.duration_sec,
            clip.virality_score,
            clip.topic,
        )
    # ------------------------------------------------------------------
    # Stage 2.25: Hook Detection
    # ------------------------------------------------------------------
    # The clip selector is unreliable at localising the hook sentence and
    # tends to return the 0.0-3.0s placeholder verbatim, which would disable
    # start-trim in Stage 2.5. This stage asks Gemini to localise the real
    # hook per clip so Stage 2.5 can clamp against a real window.
    logger.info("--- STAGE 2.25: HOOK DETECTION (enabled=%s) ---", config.detect_hooks)
    clips = run_hook_detection_stage(
        config.work_dir,
        clips,
        transcript,
        transcript_fp=fp,
        config=config,
    )
    # Drop late/filler-opener hooks while respecting the min-kept floor.
    clips = _filter_weak_hook_clips(
        clips,
        transcript,
        min_kept=config.clip_selection_min_kept,
    )
    # ------------------------------------------------------------------
    # Stage 2.5: Content Pruning (HIVE-style inner-clip tightening)
    # ------------------------------------------------------------------
    # Tightens each candidate window by writing trim_start_sec / trim_end_sec
    # on the Clip models. keyframe extraction and layout vision below both
    # consume ``clip_for_render(clip)`` so they automatically operate on the
    # pruned window without further changes.
    logger.info("--- STAGE 2.5: CONTENT PRUNING (level=%s) ---", config.prune_level)
    clips = run_content_pruning_stage(
        config.work_dir,
        clips,
        transcript,
        transcript_fp=fp,
        config=config,
    )
    clips = snap_render_windows_to_sentence_boundaries(clips, transcript)
    # Enforce the duration contract on the (now pruned + snapped) windows.
    clips = _filter_render_valid_clips(clips, stage_label="Stage 2.5 guardrail")
    clips = _filter_low_quality_clips(
        clips,
        transcript,
        min_kept=config.clip_selection_min_kept,
        render_theme=config.render_theme,
    )
    # Rerender targeting: restrict this run to explicitly requested clip ids
    # and/or clips flagged by a previous QA pass.
    rerender_target_ids = {
        _normalize_rerender_clip_id(clip_id)
        for clip_id in config.rerender_clip_ids
    }
    if config.rerender_warned_only:
        rerender_target_ids.update(_warned_clip_ids_from_qa(config.output_dir))
    if rerender_target_ids:
        before_count = len(clips)
        clips = [clip for clip in clips if clip.clip_id in rerender_target_ids]
        missing = sorted(rerender_target_ids - {clip.clip_id for clip in clips})
        logger.info(
            "Rerender target filter: keeping %d / %d clip(s): %s",
            len(clips),
            before_count,
            ", ".join(clip.clip_id for clip in clips) or "(none)",
        )
        if missing:
            logger.warning("Requested rerender clip id(s) not found: %s", ", ".join(missing))
        if not clips:
            logger.warning("No clips matched rerender target filter; nothing to render.")
            return []
    # ------------------------------------------------------------------
    # Stage 2.75: Hard-cut assembly
    # ------------------------------------------------------------------
    logger.info("--- STAGE 2.75: CLIP ASSEMBLY ---")
    clips = apply_render_spans(clips, transcript)
    assembled_dir = config.work_dir / "assembled"
    # Each clip gets its own assembled source file + clip-local transcript.
    assembled_by_id = {
        clip.clip_id: assemble_clip(source_video, clip, transcript, assembled_dir)
        for clip in clips
    }
    # Assembly may rewrite clip timing; adopt the assembled Clip models.
    clips = [assembled_by_id[clip.clip_id].clip for clip in clips]
    assembled_clips_path = write_clip_plan(config.work_dir / "assembled_clips.json", clips)
    # Gate 1: interactive approval of the assembled clip plan.
    if config.interactive and state is not None:
        result = interactive.approve_clips(clips)
        if result.action == "quit":
            logger.info("Aborted by user at Gate 1.")
            return []
        if result.action == "refine":
            state.iteration += 1
            if result.steering_note:
                state.steering_notes.append(result.steering_note)
            state.last_selected_ids = None
            session_state.save_state(config.work_dir, state)
            if state.iteration >= config.max_iterations:
                logger.warning("Iteration cap hit. Proceeding with current clips.")
            else:
                # Recurse with the augmented steering notes (forces reselection).
                return run_pipeline(_rerun_config(config, state.steering_notes))
        elif result.action == "proceed":
            selected_ids = list(result.selected_ids or [])
            state.last_selected_ids = selected_ids
            session_state.save_state(config.work_dir, state)
            clip_by_id = {clip.clip_id: clip for clip in clips}
            # Keep only the user-selected subset, in the user's order.
            clips = [clip_by_id[clip_id] for clip_id in selected_ids]
        elif result.action == "accept_all":
            state.last_selected_ids = [clip.clip_id for clip in clips]
            session_state.save_state(config.work_dir, state)
    # ------------------------------------------------------------------
    # Stage 3: Clip layouts
    # ------------------------------------------------------------------
    logger.info("--- STAGE 3: CLIP LAYOUTS ---")
    keyframes_dir = config.work_dir / "keyframes"
    clip_scenes: list[Scene] = []
    source_videos_by_scene: dict[str, Path] = {}
    for clip in clips:
        assembled = assembled_by_id[clip.clip_id]
        # Scenes are built on the pruned render window, not the raw clip.
        rw = clip_for_render(clip)
        clip_scenes.append(
            Scene(scene_id=clip.clip_id, start_time=rw.start_time_sec, end_time=rw.end_time_sec)
        )
        source_videos_by_scene[clip.clip_id] = assembled.source_path
    layout_instructions: dict[str, LayoutInstruction] = {}
    # On a targeted rerender, reuse cached layout instructions when every
    # remaining clip has one — skips keyframe extraction + vision entirely.
    if rerender_target_ids:
        cached_layouts = _load_layout_instruction_cache(config.work_dir)
        if all(clip.clip_id in cached_layouts for clip in clips):
            layout_instructions = {
                clip.clip_id: cached_layouts[clip.clip_id]
                for clip in clips
            }
            logger.info(
                "Using cached layout instructions for rerender target(s): %s",
                ", ".join(layout_instructions),
            )
    if not layout_instructions:
        extracted_scenes: list[Scene] = []
        for scene in clip_scenes:
            extracted_scenes.extend(
                extract_keyframes(
                    str(source_videos_by_scene[scene.scene_id]),
                    [scene],
                    str(keyframes_dir / scene.scene_id),
                )
            )
        clip_scenes = extracted_scenes
        layout_instructions = run_layout_vision_stage(
            config.work_dir,
            clip_scenes,
            source_video=source_video,
            source_videos_by_scene=source_videos_by_scene,
            transcript_fp=fp,
            clips_path=assembled_clips_path,
            config=config,
        )
    # ------------------------------------------------------------------
    # Stage 4: Render
    # ------------------------------------------------------------------
    logger.info("--- STAGE 4: RENDER ---")
    final_outputs: list[Path] = []
    # Per-clip registries consumed by render QA after the render loop.
    render_clips_by_id: dict[str, Clip] = {}
    render_transcripts_by_id: dict[str, dict] = {}
    render_layouts_by_id: dict[str, LayoutInstruction] = {}
    render_sources_by_id: dict[str, Path] = {}
    subtitles_dir = config.work_dir / "subtitles"
    subtitles_dir.mkdir(parents=True, exist_ok=True)
    for clip in clips:
        assembled = assembled_by_id[clip.clip_id]
        instr = layout_instructions.get(clip.clip_id)
        if instr is None:
            # No vision result for this clip: fall back to its hint or center.
            hint = clip.layout_hint or LayoutKind.SIT_CENTER
            instr = LayoutInstruction(clip_id=clip.clip_id, layout=hint)
        instr = _normalize_layout_for_render(instr, render_theme=config.render_theme, clip=clip)
        clip.layout = instr.layout
        rclip = clip_for_render(clip)
        render_clips_by_id[clip.clip_id] = rclip
        render_transcripts_by_id[clip.clip_id] = assembled.transcript
        render_layouts_by_id[clip.clip_id] = instr
        render_sources_by_id[clip.clip_id] = assembled.source_path
        subtitle_path = None
        if config.burn_subtitles:
            # ASS (not SRT) so the caption file's PlayResY matches the output
            # resolution and libass' font/margin scaling is 1:1.
            subtitle_path = generate_ass(
                rclip,
                assembled.transcript,
                subtitles_dir,
                max_words_per_cue=config.subtitle_max_words_per_cue,
                max_cue_sec=config.subtitle_max_cue_sec,
                play_res_x=1080,
                play_res_y=1920,
                font_size=config.subtitle_font_size,
                margin_v=config.subtitle_margin_v,
                render_theme=config.render_theme,
                native_highlight_lead_sec=config.subtitle_highlight_lead_sec,
                native_highlight_min_dwell_sec=config.subtitle_highlight_min_dwell_sec,
                repair_word_timings=config.repair_subtitle_word_timings,
            )
        else:
            logger.info("Clip %s: subtitle burn disabled for this run.", clip.clip_id)
        final_path = config.output_dir / f"short_{clip.clip_id}.mp4"
        # Overwrite when globally requested or when this clip is a rerender target.
        should_overwrite_clip = config.overwrite_outputs or clip.clip_id in rerender_target_ids
        if final_path.exists() and not should_overwrite_clip:
            logger.info("Clip %s already rendered, skipping.", clip.clip_id)
            final_outputs.append(final_path)
            continue
        if final_path.exists() and should_overwrite_clip:
            logger.info("Clip %s exists; overwriting for this render pass.", clip.clip_id)
        # Font size and margin are already baked into the ASS file at
        # PlayResY=1920, so the compile primitive does not need to override
        # them -- but it still does, harmlessly, for single-source overrides.
        reframe_clip_ffmpeg(
            input_path=assembled.source_path,
            output_path=final_path,
            clip=rclip,
            layout_instruction=instr,
            subtitle_path=subtitle_path,
            subtitle_font_size=config.subtitle_font_size,
            subtitle_margin_v=config.subtitle_margin_v,
            title_text=clip.suggested_overlay_title,
            render_theme=config.render_theme,
        )
        final_outputs.append(final_path)
    # Stage 4.5: best-effort QA; failures are logged, never fatal.
    if config.render_qa and final_outputs:
        logger.info("--- STAGE 4.5: RENDER QA ---")
        try:
            run_render_qa(
                output_dir=config.output_dir,
                final_outputs=final_outputs,
                render_clips_by_id=render_clips_by_id,
                transcripts_by_id=render_transcripts_by_id,
                layouts_by_id=render_layouts_by_id,
                assembled_sources_by_id=render_sources_by_id,
                raw_layouts_by_id=_load_layout_raw_by_clip(config.work_dir),
                reference_video=config.qa_reference_video,
                debug_overlay=config.qa_debug_overlay,
            )
        except Exception as exc:  # noqa: BLE001 - QA must not fail delivery
            logger.warning("Render QA failed, leaving rendered shorts intact: %s", exc)
    # ------------------------------------------------------------------
    # Done
    # ------------------------------------------------------------------
    logger.info("=" * 60)
    logger.info("PIPELINE COMPLETE - %d shorts generated:", len(final_outputs))
    for p in final_outputs:
        logger.info("  -> %s", p)
    logger.info("=" * 60)
    # Gate: interactive rating; a non-"Great" actionable rating triggers a
    # steering-driven rerun (bounded by max_iterations).
    if config.interactive and final_outputs and state is not None:
        feedback = interactive.rate_output(final_outputs)
        state.last_rating = feedback
        session_state.save_state(config.work_dir, state)
        if feedback.rating == 3:
            logger.info("Rated Great. Shipped.")
            return final_outputs
        steering = _build_steering_from_feedback(feedback)
        if not steering:
            logger.warning("Interactive feedback recorded, but it is not actionable until a later gate ships.")
            return final_outputs
        state.iteration += 1
        state.steering_notes.append(steering)
        session_state.save_state(config.work_dir, state)
        if state.iteration >= config.max_iterations:
            logger.warning("Iteration cap hit. Source may not have a strong short.")
            return final_outputs
        return run_pipeline(_rerun_config(config, state.steering_notes))
    return final_outputs