"""End-to-end product pipeline.""" import dataclasses import json import logging import re from pathlib import Path from humeo_core.primitives.ingest import extract_keyframes from humeo_core.schemas import Clip, LayoutInstruction, LayoutKind, RatingFeedback, RenderTheme, Scene from humeo import interactive, session_state from humeo.clip_assembly import apply_render_spans, assemble_clip, write_clip_plan from humeo.clip_selection_cache import cache_valid, load_meta, transcript_fingerprint, write_artifacts from humeo.clip_selector import ( clip_quality_priority_score, load_clips, renumber_clips_dense, save_clips, select_clips, ) from humeo.config import MAX_CLIP_DURATION_SEC, MIN_CLIP_DURATION_SEC, PipelineConfig from humeo.content_pruning import run_content_pruning_stage, snap_render_windows_to_sentence_boundaries from humeo.cutter import generate_ass from humeo.hook_detector import run_hook_detection_stage from humeo.hook_library import resolve_hook_library_path from humeo.ingest import ( download_video, extract_audio, stage_local_video, transcript_cache_valid, transcribe_whisperx, ) from humeo.layout_vision import run_layout_vision_stage from humeo.render_qa import qa_record_flags, run_render_qa from humeo.render_window import clip_for_render from humeo.reframe_ffmpeg import reframe_clip_ffmpeg from humeo.transcript_align import clip_subtitle_words, group_words_to_cue_chunks from humeo.video_cache import ( extract_youtube_video_id, ingest_complete, normalize_local_source_path, read_youtube_info_json, resolve_work_directory, upsert_manifest_from_info, ) logger = logging.getLogger(__name__) _WEAK_HOOK_START_WORDS = { "actually", "basically", "honestly", "look", "listen", "okay", "ok", "right", "so", "well", "yeah", } _WEAK_HOOK_START_PHRASES = {"i mean", "kind of", "sort of", "you know"} _STRONG_HOOK_LATEST_START_SEC = 6.0 _FINAL_QUALITY_THRESHOLD = 0.68 _NATIVE_HIGHLIGHT_CHART_DOMINANCE_Y2 = 0.68 _NATIVE_HIGHLIGHT_MIN_PERSON_WIDTH = 0.42 
# NOTE(review): this constant is not referenced in this module as visible here;
# presumably used by layout code elsewhere — confirm before removing.
_NATIVE_HIGHLIGHT_MAX_TOP_ANCHORED_PERSON_Y1 = 0.12
# Minimum zoom applied when a split chart/person layout is collapsed to a
# centered layout (see _split_chart_person_to_center).
_NATIVE_HIGHLIGHT_SPLIT_TO_CENTER_MIN_ZOOM = 1.20

# Phrases a speaker uses when referring to on-screen material (slides, charts,
# axes). A match means the clip genuinely needs the presentation visible, so a
# split chart/person layout should be preserved rather than collapsed.
_PRESENTATION_REFERENCE_RE = re.compile(
    r"\b("
    r"as you can(?: also)? see|you can(?: also)? see|what you can(?: also)? see|look at|take a look|shown here|"
    r"shown on|on the screen|on this slide|this chart|the chart|this graph|"
    r"the graph|this slide|this matrix|the matrix|red line|yellow line|"
    r"blue line|green line|top there|bottom there|x-axis|y-axis"
    r")\b",
    flags=re.IGNORECASE,
)


def _split_chart_person_to_center(instruction: LayoutInstruction) -> LayoutInstruction:
    """Collapse a split chart/person layout into a centered person-only layout.

    Returns a copy of ``instruction`` with all split-region fields cleared,
    the layout forced to ``SIT_CENTER``, and zoom bumped to at least
    ``_NATIVE_HIGHLIGHT_SPLIT_TO_CENTER_MIN_ZOOM``. When the instruction had a
    person region, its horizontal center is carried over so the reframe keeps
    the speaker in view.
    """
    updates = {
        "layout": LayoutKind.SIT_CENTER,
        "zoom": max(float(instruction.zoom), _NATIVE_HIGHLIGHT_SPLIT_TO_CENTER_MIN_ZOOM),
        "person_tracking": [],
        "split_chart_region": None,
        "split_person_region": None,
        "split_second_chart_region": None,
        "split_second_person_region": None,
        "chart_x_norm": 0.0,
        "top_band_ratio": 0.5,
    }
    if instruction.split_person_region is not None:
        # Keep the camera centered on where the person actually is.
        updates["person_x_norm"] = float(instruction.split_person_region.center_x)
    return instruction.model_copy(update=updates)


def _rerun_config(config: PipelineConfig, steering_notes: list[str]) -> PipelineConfig:
    """Build the config for a steering-driven rerun.

    Forces clip reselection and output overwriting so the new steering notes
    actually take effect instead of hitting caches.
    """
    return dataclasses.replace(
        config,
        steering_notes=list(steering_notes),
        force_clip_selection=True,
        overwrite_outputs=True,
    )


def _build_steering_from_feedback(feedback: RatingFeedback) -> str:
    """Translate structured rating feedback into a steering-note sentence.

    Each recognized issue code contributes a canned instruction; ``other``
    passes the user's free text through. Returns an empty string when nothing
    actionable was selected (``wrong_layout`` is logged but produces no note,
    since layout overrides are not yet supported).
    """
    parts: list[str] = []
    if "wrong_moments" in feedback.issues:
        parts.append("Previous selection picked the wrong moments. Reselect with different candidates.")
    if "bad_cuts" in feedback.issues:
        parts.append(
            "Clip boundaries were bad. Prefer clips starting on clean sentence beginnings and ending on completed thoughts."
        )
    if "boring" in feedback.issues:
        parts.append("Previous selection lacked energy. Bias strongly toward high-emotion, high-hook moments.")
    if "confusing" in feedback.issues:
        parts.append("Previous clips needed too much context. Pick moments that make sense standalone.")
    if "wrong_layout" in feedback.issues:
        logger.warning("Received wrong_layout feedback, but layout overrides are not available until Gate 2 ships.")
    if "length_off" in feedback.issues:
        parts.append("Clip durations felt off. Respect the duration bounds strictly.")
    if "other" in feedback.issues and feedback.free_text:
        parts.append(feedback.free_text)
    return " ".join(parts).strip()


def _ensure_work_dir(config: PipelineConfig) -> None:
    """Resolve ``config.work_dir`` when unset (per-video cache) or ensure it exists."""
    if config.work_dir is not None:
        return
    config.work_dir = resolve_work_directory(
        youtube_url=config.youtube_url,
        explicit_work_dir=None,
        use_video_cache=config.use_video_cache,
        cache_root=config.cache_root,
    )


def _filter_render_valid_clips(clips: list, *, stage_label: str) -> list:
    """Drop clips whose actual render window violates the duration contract."""
    valid: list = []
    dropped = 0
    for clip in clips:
        # Validate against the trimmed render window, not the raw clip bounds.
        render_clip = clip_for_render(clip)
        render_duration = render_clip.duration_sec
        if MIN_CLIP_DURATION_SEC <= render_duration <= MAX_CLIP_DURATION_SEC:
            valid.append(clip)
            continue
        dropped += 1
        logger.warning(
            "%s: dropping clip %s because render-window duration %.1fs is outside [%ds, %ds] "
            "(trim_start=%.1fs trim_end=%.1fs).",
            stage_label,
            clip.clip_id,
            render_duration,
            MIN_CLIP_DURATION_SEC,
            MAX_CLIP_DURATION_SEC,
            clip.trim_start_sec,
            clip.trim_end_sec,
        )
    if dropped:
        logger.warning("%s: dropped %d invalid render-window clip(s).", stage_label, dropped)
    return valid


def _hook_window_text(clip, transcript: dict) -> str:
    """Return the transcript text overlapping the clip's hook window.

    Hook offsets are clip-relative, so they are shifted by the clip's absolute
    start before intersecting with transcript segments. Returns "" when the
    clip has no hook window.
    """
    if clip.hook_start_sec is None or clip.hook_end_sec is None:
        return ""
    abs_start = clip.start_time_sec + clip.hook_start_sec
    abs_end = clip.start_time_sec + clip.hook_end_sec
    parts: list[str] = []
    for seg in transcript.get("segments", []) or []:
        start = float(seg.get("start", 0.0))
        end = float(seg.get("end", start))
        # Skip segments that end before the window or start after it.
        if end <= abs_start or start >= abs_end:
            continue
        text = str(seg.get("text", "")).strip()
        if text:
            parts.append(text)
    return " ".join(parts).strip()


def _filter_weak_hook_clips(clips: list, transcript: dict, *, min_kept: int) -> list:
    """Drop clips with weak hooks, never shrinking the pool below ``min_kept``.

    A hook is weak when it starts later than ``_STRONG_HOOK_LATEST_START_SEC``
    or when its first word/phrase is a filler opener. Drops are applied in
    input order, so earlier clips are culled before the ``min_kept`` floor
    protects the rest.
    """
    if len(clips) <= min_kept:
        return clips
    kept: list = []
    dropped: list[str] = []
    for clip in clips:
        hook_start = clip.hook_start_sec
        if (
            hook_start is not None
            and hook_start > _STRONG_HOOK_LATEST_START_SEC
            and len(clips) - len(dropped) > min_kept
        ):
            dropped.append(
                f"{clip.clip_id} (hook starts at {hook_start:.1f}s; target <= {_STRONG_HOOK_LATEST_START_SEC:.1f}s)"
            )
            continue
        hook_text = _hook_window_text(clip, transcript).lower()
        # Strip surrounding punctuation so "So," still matches "so".
        first_words = [word.strip(".,!?;:'\"()[]{}") for word in hook_text.split()]
        first_words = [word for word in first_words if word]
        first_word = first_words[0] if first_words else ""
        first_phrase = " ".join(first_words[:2])
        if (
            first_word in _WEAK_HOOK_START_WORDS
            or first_phrase in _WEAK_HOOK_START_PHRASES
        ) and len(clips) - len(dropped) > min_kept:
            weak_text = first_phrase if first_phrase in _WEAK_HOOK_START_PHRASES else first_word
            dropped.append(f"{clip.clip_id} (weak opener: {weak_text})")
            continue
        kept.append(clip)
    if dropped:
        logger.info("Dropped %d weak-hook clip(s): %s", len(dropped), ", ".join(dropped))
    return kept


def _caption_chunk_penalty(clip, transcript: dict, *, render_theme) -> float:
    """Score how awkwardly the clip's captions would chunk, in [0.0, 0.18].

    Simulates cue chunking with the same per-theme limits the renderer uses
    and penalizes orphan single-word cues, over-dense cues, and over-long
    cues. A clip with no subtitle words gets a flat 0.08 penalty.
    """
    words = clip_subtitle_words(transcript, clip).words
    if not words:
        return 0.08
    # Per-theme cue limits; mirrors the subtitle generation settings.
    # NOTE(review): assumes str(render_theme) yields the bare theme name
    # (e.g. a StrEnum) — confirm against RenderTheme's definition.
    if str(render_theme) == "native_highlight":
        cue_words = 6
        cue_sec = 2.4
        prefer_break_on_punctuation = True
        min_words_before_break = 4
    elif str(render_theme) == "reference_lower_third":
        cue_words = 10
        cue_sec = 2.8
        prefer_break_on_punctuation = True
        min_words_before_break = 5
    else:
        cue_words = 10
        cue_sec = 2.8
        prefer_break_on_punctuation = False
        min_words_before_break = 1
    cue_chunks = group_words_to_cue_chunks(
        words,
        max_words_per_cue=cue_words,
        max_cue_sec=cue_sec,
        prefer_break_on_punctuation=prefer_break_on_punctuation,
        min_words_before_break=min_words_before_break,
    )
    penalty = 0.0
    for chunk in cue_chunks:
        duration = chunk[-1].end_time - chunk[0].start_time
        if len(chunk) == 1 and len(cue_chunks) > 1:
            # Orphan one-word cue amid others.
            penalty += 0.04
        if len(chunk) >= cue_words and duration < 0.65:
            # Full cue flashed too quickly to read.
            penalty += 0.04
        if duration > cue_sec + 0.35:
            # Cue lingers well past the target duration.
            penalty += 0.03
    return min(0.18, penalty)


def _filter_low_quality_clips(clips: list, transcript: dict, *, min_kept: int, render_theme) -> list:
    """Keep clips scoring >= _FINAL_QUALITY_THRESHOLD after caption penalties.

    Scores each clip as quality priority minus its caption-chunk penalty
    (computed on the render window). If fewer than ``min_kept`` clips pass,
    falls back to the top ``min_kept`` by score. Survivors are renumbered
    densely.
    """
    if len(clips) <= min_kept:
        return renumber_clips_dense(clips)
    ranked: list[tuple[float, object, float]] = []
    for clip in clips:
        render_clip = clip_for_render(clip)
        caption_penalty = _caption_chunk_penalty(render_clip, transcript, render_theme=render_theme)
        score = clip_quality_priority_score(clip) - caption_penalty
        ranked.append((score, clip, caption_penalty))
    # Key on the score only; clip objects need not be orderable.
    ranked.sort(key=lambda item: item[0], reverse=True)
    kept = [clip for score, clip, _ in ranked if score >= _FINAL_QUALITY_THRESHOLD]
    if len(kept) < min_kept:
        kept = [clip for _score, clip, _penalty in ranked[:min_kept]]
    dropped = [
        f"{clip.clip_id} (score={score:.2f}, caption_penalty={caption_penalty:.2f})"
        for score, clip, caption_penalty in ranked
        if clip not in kept
    ]
    if dropped:
        logger.info(
            "Dropped %d low-quality clip(s) after pruning: %s",
            len(dropped),
            ", ".join(dropped),
        )
    return renumber_clips_dense(kept)


def _clip_references_presentation(clip) -> bool:
    """True when any of the clip's text fields references on-screen material."""
    text_parts = [
        getattr(clip, "viral_hook", ""),
        getattr(clip, "transcript", ""),
        getattr(clip, "suggested_overlay_title", ""),
        getattr(clip, "topic", ""),
    ]
    # `part or ""` guards against None fields before joining.
    text = " ".join(str(part or "") for part in text_parts)
    return bool(_PRESENTATION_REFERENCE_RE.search(text))


def _normalize_layout_for_render(
    instruction: LayoutInstruction,
    *,
    render_theme: RenderTheme,
    clip=None,
) -> LayoutInstruction:
    """Adjust a layout instruction for the native-highlight theme.

    Only SPLIT_CHART_PERSON layouts under NATIVE_HIGHLIGHT are touched: the
    split is kept only when the speaker verbally references the presentation
    AND both split regions are present; otherwise it is collapsed to a
    centered layout. All other theme/layout combinations pass through.
    """
    if render_theme != RenderTheme.NATIVE_HIGHLIGHT:
        return instruction
    if instruction.layout != LayoutKind.SPLIT_CHART_PERSON:
        return instruction
    if clip is None or not _clip_references_presentation(clip):
        return _split_chart_person_to_center(instruction)
    chart = instruction.split_chart_region
    person = instruction.split_person_region
    if chart is None or person is None:
        return _split_chart_person_to_center(instruction)
    return instruction


def _load_layout_raw_by_clip(work_dir: Path) -> dict[str, dict]:
    """Load the raw (model-output) layout metadata per clip, for QA.

    Best-effort: returns {} on a missing, unreadable, or malformed
    ``layout_vision.json``.
    """
    path = work_dir / "layout_vision.json"
    if not path.is_file():
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except Exception as exc:  # noqa: BLE001 - optional QA metadata
        logger.warning("Could not read layout raw metadata for QA: %s", exc)
        return {}
    clips = payload.get("clips", {})
    if not isinstance(clips, dict):
        return {}
    out: dict[str, dict] = {}
    for clip_id, item in clips.items():
        if isinstance(item, dict) and isinstance(item.get("raw"), dict):
            out[str(clip_id)] = item["raw"]
    return out


def _normalize_rerender_clip_id(raw: str) -> str:
    """Normalize a user-supplied clip id to the canonical zero-padded form.

    A trailing number is reformatted to three digits ("7" / "clip_7" -> "007");
    inputs without a trailing number are returned stripped but otherwise
    unchanged.
    """
    text = str(raw).strip()
    match = re.search(r"(\d+)$", text)
    if match:
        return f"{int(match.group(1)):03d}"
    return text


def _warned_clip_ids_from_qa(output_dir: Path) -> set[str]:
    """Return normalized ids of clips the last QA pass flagged with warnings.

    Best-effort: a missing or unreadable QA manifest yields an empty set so a
    stale manifest never blocks rendering.
    """
    manifest_path = output_dir / "render_qa" / "qa_manifest.json"
    if not manifest_path.is_file():
        return set()
    try:
        payload = json.loads(manifest_path.read_text(encoding="utf-8"))
    except Exception as exc:  # noqa: BLE001 - stale QA should not block renders
        logger.warning("Could not read QA manifest for warned-only rerender: %s", exc)
        return set()
    warned: set[str] = set()
    for record in payload.get("shorts", []):
        if not isinstance(record, dict):
            continue
        clip_id = record.get("clip_id")
        if clip_id and qa_record_flags(record):
            warned.add(_normalize_rerender_clip_id(str(clip_id)))
    return warned


def _load_layout_instruction_cache(work_dir: Path) -> dict[str, LayoutInstruction]:
    """Load previously computed layout instructions from ``layout_vision.json``.

    Best-effort cache read: returns {} when the file is missing/unreadable,
    and skips individual entries that fail LayoutInstruction validation.
    """
    path = work_dir / "layout_vision.json"
    if not path.is_file():
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except Exception as exc:  # noqa: BLE001 - cache fallback
        logger.warning("Could not read cached layout instructions: %s", exc)
        return {}
    clips = payload.get("clips", {})
    if not isinstance(clips, dict):
        return {}
    out: dict[str, LayoutInstruction] = {}
    for clip_id, item in clips.items():
        if not isinstance(item, dict) or "instruction" not in item:
            continue
        try:
            out[str(clip_id)] = LayoutInstruction.model_validate(item["instruction"])
        except Exception as exc:  # noqa: BLE001
            logger.warning("Ignoring invalid cached layout for clip %s: %s", clip_id, exc)
    return out


def run_pipeline(config: PipelineConfig) -> list[Path]:
    """
    Execute the full podcast-to-shorts pipeline.

    Args:
        config: Pipeline configuration.

    Returns:
        List of paths to the final short-form MP4 files.
    """
    logger.info("=" * 60)
    logger.info("HUMEO PIPELINE START")
    logger.info("Source: %s", config.youtube_url)
    logger.info("Output: %s", config.output_dir)
    logger.info("=" * 60)

    _ensure_work_dir(config)
    assert config.work_dir is not None

    # Interactive runs persist steering notes per source; CLI-supplied notes
    # win over stored ones, and stored notes force a fresh selection pass.
    state = None
    if config.interactive:
        state = session_state.load_state(config.work_dir, config.youtube_url)
        if config.steering_notes:
            if list(config.steering_notes) != state.steering_notes:
                state.steering_notes = list(config.steering_notes)
                session_state.save_state(config.work_dir, state)
        elif state.steering_notes:
            config = dataclasses.replace(
                config,
                steering_notes=list(state.steering_notes),
                force_clip_selection=True,
                overwrite_outputs=True,
            )
            logger.info(
                "Loaded %d steering note(s) from session state for this source.",
                len(state.steering_notes),
            )

    # ------------------------------------------------------------------
    # Stage 1: Ingest
    # ------------------------------------------------------------------
    logger.info("--- STAGE 1: INGESTION ---")
    source_video = config.work_dir / "source.mp4"
    transcript_path = config.work_dir / "transcript.json"
    local_source_path = normalize_local_source_path(config.youtube_url)
    reuse_ingest = ingest_complete(config.work_dir, config.youtube_url)
    if reuse_ingest:
        logger.info("Cached ingest found for this source (reusing source + transcript).")
    elif local_source_path is not None:
        # The "URL" was actually a local file; copy/link it into the work dir.
        source_video = stage_local_video(local_source_path, config.work_dir)
    elif source_video.exists():
        logger.info("Source video already downloaded, skipping download.")
    else:
        source_video = download_video(config.youtube_url, config.work_dir)

    # Reuse the transcript only when the cache matches current settings;
    # otherwise re-extract audio and re-transcribe.
    transcript_reusable = transcript_cache_valid(config.work_dir)
    if reuse_ingest and transcript_reusable:
        logger.info("Transcript already exists, loading.")
        with open(transcript_path, "r", encoding="utf-8") as f:
            transcript = json.load(f)
    elif transcript_reusable and local_source_path is None:
        logger.info("Transcript already exists, loading.")
        with open(transcript_path, "r", encoding="utf-8") as f:
            transcript = json.load(f)
    else:
        if transcript_path.exists():
            logger.info("Transcript cache mismatch for current transcription settings; regenerating.")
        audio_path = extract_audio(source_video, config.work_dir)
        transcript = transcribe_whisperx(audio_path, config.work_dir)

    # Record/refresh the video-cache manifest for YouTube sources.
    if local_source_path is None:
        vid = extract_youtube_video_id(config.youtube_url)
        info = read_youtube_info_json(config.work_dir)
        if not info and vid:
            # Minimal fallback metadata when yt-dlp's info JSON is absent.
            info = {"id": vid, "webpage_url": config.youtube_url}
        if info:
            upsert_manifest_from_info(
                work_dir=config.work_dir,
                youtube_url=config.youtube_url,
                info=info,
                cache_root=config.cache_root,
            )

    # ------------------------------------------------------------------
    # Stage 2: Clip Selection
    # ------------------------------------------------------------------
    logger.info("--- STAGE 2: CLIP SELECTION ---")
    clips_path = config.work_dir / "clips.json"
    fp = transcript_fingerprint(transcript)
    meta = load_meta(config.work_dir)
    cache_hit = (
        clips_path.is_file()
        and not config.force_clip_selection
        and meta is not None
        and cache_valid(meta, fp, config)
    )
    if cache_hit:
        clips = load_clips(clips_path)
        logger.info("Clip selection cache hit (transcript + provider/model unchanged); skipping LLM.")
    else:
        clips, raw = select_clips(
            transcript,
            gemini_model=config.gemini_model,
            hook_library_path=resolve_hook_library_path(config),
            candidate_count=config.clip_selection_candidate_count,
            quality_threshold=config.clip_selection_quality_threshold,
            min_kept=config.clip_selection_min_kept,
            max_kept=config.clip_selection_max_kept,
            steering_notes=config.steering_notes,
        )
        save_clips(clips, clips_path)
        write_artifacts(
            config.work_dir,
            transcript=transcript,
            config=config,
            raw_response=raw,
        )

    logger.info("Selected %d clips:", len(clips))
    for clip in clips:
        logger.info(
            " [%s] %.1fs-%.1fs (%.1fs) score=%.2f - %s",
            clip.clip_id,
            clip.start_time_sec,
            clip.end_time_sec,
            clip.duration_sec,
            clip.virality_score,
            clip.topic,
        )

    # ------------------------------------------------------------------
    # Stage 2.25: Hook Detection
    # ------------------------------------------------------------------
    # The clip selector is unreliable at localising the hook sentence and
    # tends to return the 0.0-3.0s placeholder verbatim, which would disable
    # start-trim in Stage 2.5. This stage asks Gemini to localise the real
    # hook per clip so Stage 2.5 can clamp against a real window.
    logger.info("--- STAGE 2.25: HOOK DETECTION (enabled=%s) ---", config.detect_hooks)
    clips = run_hook_detection_stage(
        config.work_dir,
        clips,
        transcript,
        transcript_fp=fp,
        config=config,
    )
    clips = _filter_weak_hook_clips(
        clips,
        transcript,
        min_kept=config.clip_selection_min_kept,
    )

    # ------------------------------------------------------------------
    # Stage 2.5: Content Pruning (HIVE-style inner-clip tightening)
    # ------------------------------------------------------------------
    # Tightens each candidate window by writing trim_start_sec / trim_end_sec
    # on the Clip models. keyframe extraction and layout vision below both
    # consume ``clip_for_render(clip)`` so they automatically operate on the
    # pruned window without further changes.
    logger.info("--- STAGE 2.5: CONTENT PRUNING (level=%s) ---", config.prune_level)
    clips = run_content_pruning_stage(
        config.work_dir,
        clips,
        transcript,
        transcript_fp=fp,
        config=config,
    )
    clips = snap_render_windows_to_sentence_boundaries(clips, transcript)
    clips = _filter_render_valid_clips(clips, stage_label="Stage 2.5 guardrail")
    clips = _filter_low_quality_clips(
        clips,
        transcript,
        min_kept=config.clip_selection_min_kept,
        render_theme=config.render_theme,
    )

    # Optional rerender filter: restrict the run to explicitly requested clip
    # ids and/or clips the previous QA pass flagged.
    rerender_target_ids = {
        _normalize_rerender_clip_id(clip_id) for clip_id in config.rerender_clip_ids
    }
    if config.rerender_warned_only:
        rerender_target_ids.update(_warned_clip_ids_from_qa(config.output_dir))
    if rerender_target_ids:
        before_count = len(clips)
        clips = [clip for clip in clips if clip.clip_id in rerender_target_ids]
        missing = sorted(rerender_target_ids - {clip.clip_id for clip in clips})
        logger.info(
            "Rerender target filter: keeping %d / %d clip(s): %s",
            len(clips),
            before_count,
            ", ".join(clip.clip_id for clip in clips) or "(none)",
        )
        if missing:
            logger.warning("Requested rerender clip id(s) not found: %s", ", ".join(missing))
        if not clips:
            logger.warning("No clips matched rerender target filter; nothing to render.")
            return []

    # ------------------------------------------------------------------
    # Stage 2.75: Hard-cut assembly
    # ------------------------------------------------------------------
    logger.info("--- STAGE 2.75: CLIP ASSEMBLY ---")
    clips = apply_render_spans(clips, transcript)
    assembled_dir = config.work_dir / "assembled"
    assembled_by_id = {
        clip.clip_id: assemble_clip(source_video, clip, transcript, assembled_dir)
        for clip in clips
    }
    # Assembly may rewrite clip timings; adopt the assembled clip models.
    clips = [assembled_by_id[clip.clip_id].clip for clip in clips]
    assembled_clips_path = write_clip_plan(config.work_dir / "assembled_clips.json", clips)

    # Gate 1: interactive clip approval. "refine" recurses with new steering
    # notes (bounded by max_iterations); "proceed" keeps only selected clips.
    if config.interactive and state is not None:
        result = interactive.approve_clips(clips)
        if result.action == "quit":
            logger.info("Aborted by user at Gate 1.")
            return []
        if result.action == "refine":
            state.iteration += 1
            if result.steering_note:
                state.steering_notes.append(result.steering_note)
            state.last_selected_ids = None
            session_state.save_state(config.work_dir, state)
            if state.iteration >= config.max_iterations:
                logger.warning("Iteration cap hit. Proceeding with current clips.")
            else:
                return run_pipeline(_rerun_config(config, state.steering_notes))
        elif result.action == "proceed":
            selected_ids = list(result.selected_ids or [])
            state.last_selected_ids = selected_ids
            session_state.save_state(config.work_dir, state)
            clip_by_id = {clip.clip_id: clip for clip in clips}
            clips = [clip_by_id[clip_id] for clip_id in selected_ids]
        elif result.action == "accept_all":
            state.last_selected_ids = [clip.clip_id for clip in clips]
            session_state.save_state(config.work_dir, state)

    # ------------------------------------------------------------------
    # Stage 3: Clip layouts
    # ------------------------------------------------------------------
    logger.info("--- STAGE 3: CLIP LAYOUTS ---")
    keyframes_dir = config.work_dir / "keyframes"
    clip_scenes: list[Scene] = []
    source_videos_by_scene: dict[str, Path] = {}
    for clip in clips:
        assembled = assembled_by_id[clip.clip_id]
        rw = clip_for_render(clip)
        clip_scenes.append(
            Scene(scene_id=clip.clip_id, start_time=rw.start_time_sec, end_time=rw.end_time_sec)
        )
        source_videos_by_scene[clip.clip_id] = assembled.source_path

    # On a targeted rerender, reuse cached layout instructions when every
    # target clip has one — skips keyframe extraction and layout vision.
    layout_instructions: dict[str, LayoutInstruction] = {}
    if rerender_target_ids:
        cached_layouts = _load_layout_instruction_cache(config.work_dir)
        if all(clip.clip_id in cached_layouts for clip in clips):
            layout_instructions = {
                clip.clip_id: cached_layouts[clip.clip_id] for clip in clips
            }
            logger.info(
                "Using cached layout instructions for rerender target(s): %s",
                ", ".join(layout_instructions),
            )
    if not layout_instructions:
        extracted_scenes: list[Scene] = []
        for scene in clip_scenes:
            extracted_scenes.extend(
                extract_keyframes(
                    str(source_videos_by_scene[scene.scene_id]),
                    [scene],
                    str(keyframes_dir / scene.scene_id),
                )
            )
        clip_scenes = extracted_scenes
        layout_instructions = run_layout_vision_stage(
            config.work_dir,
            clip_scenes,
            source_video=source_video,
            source_videos_by_scene=source_videos_by_scene,
            transcript_fp=fp,
            clips_path=assembled_clips_path,
            config=config,
        )

    # ------------------------------------------------------------------
    # Stage 4: Render
    # ------------------------------------------------------------------
    logger.info("--- STAGE 4: RENDER ---")
    final_outputs: list[Path] = []
    # Per-clip render inputs, retained for the QA stage below.
    render_clips_by_id: dict[str, Clip] = {}
    render_transcripts_by_id: dict[str, dict] = {}
    render_layouts_by_id: dict[str, LayoutInstruction] = {}
    render_sources_by_id: dict[str, Path] = {}
    subtitles_dir = config.work_dir / "subtitles"
    subtitles_dir.mkdir(parents=True, exist_ok=True)
    for clip in clips:
        assembled = assembled_by_id[clip.clip_id]
        instr = layout_instructions.get(clip.clip_id)
        if instr is None:
            # No vision result for this clip: fall back to its hint or center.
            hint = clip.layout_hint or LayoutKind.SIT_CENTER
            instr = LayoutInstruction(clip_id=clip.clip_id, layout=hint)
        instr = _normalize_layout_for_render(instr, render_theme=config.render_theme, clip=clip)
        clip.layout = instr.layout
        rclip = clip_for_render(clip)
        render_clips_by_id[clip.clip_id] = rclip
        render_transcripts_by_id[clip.clip_id] = assembled.transcript
        render_layouts_by_id[clip.clip_id] = instr
        render_sources_by_id[clip.clip_id] = assembled.source_path
        subtitle_path = None
        if config.burn_subtitles:
            # ASS (not SRT) so the caption file's PlayResY matches the output
            # resolution and libass' font/margin scaling is 1:1.
            subtitle_path = generate_ass(
                rclip,
                assembled.transcript,
                subtitles_dir,
                max_words_per_cue=config.subtitle_max_words_per_cue,
                max_cue_sec=config.subtitle_max_cue_sec,
                play_res_x=1080,
                play_res_y=1920,
                font_size=config.subtitle_font_size,
                margin_v=config.subtitle_margin_v,
                render_theme=config.render_theme,
                native_highlight_lead_sec=config.subtitle_highlight_lead_sec,
                native_highlight_min_dwell_sec=config.subtitle_highlight_min_dwell_sec,
                repair_word_timings=config.repair_subtitle_word_timings,
            )
        else:
            logger.info("Clip %s: subtitle burn disabled for this run.", clip.clip_id)
        final_path = config.output_dir / f"short_{clip.clip_id}.mp4"
        # Existing outputs are kept unless a global overwrite or a targeted
        # rerender asks for this specific clip.
        should_overwrite_clip = config.overwrite_outputs or clip.clip_id in rerender_target_ids
        if final_path.exists() and not should_overwrite_clip:
            logger.info("Clip %s already rendered, skipping.", clip.clip_id)
            final_outputs.append(final_path)
            continue
        if final_path.exists() and should_overwrite_clip:
            logger.info("Clip %s exists; overwriting for this render pass.", clip.clip_id)
        # Font size and margin are already baked into the ASS file at
        # PlayResY=1920, so the compile primitive does not need to override
        # them -- but it still does, harmlessly, for single-source overrides.
        reframe_clip_ffmpeg(
            input_path=assembled.source_path,
            output_path=final_path,
            clip=rclip,
            layout_instruction=instr,
            subtitle_path=subtitle_path,
            subtitle_font_size=config.subtitle_font_size,
            subtitle_margin_v=config.subtitle_margin_v,
            title_text=clip.suggested_overlay_title,
            render_theme=config.render_theme,
        )
        final_outputs.append(final_path)

    # Stage 4.5: best-effort QA over the rendered shorts; failures are logged
    # but never discard the delivered outputs.
    if config.render_qa and final_outputs:
        logger.info("--- STAGE 4.5: RENDER QA ---")
        try:
            run_render_qa(
                output_dir=config.output_dir,
                final_outputs=final_outputs,
                render_clips_by_id=render_clips_by_id,
                transcripts_by_id=render_transcripts_by_id,
                layouts_by_id=render_layouts_by_id,
                assembled_sources_by_id=render_sources_by_id,
                raw_layouts_by_id=_load_layout_raw_by_clip(config.work_dir),
                reference_video=config.qa_reference_video,
                debug_overlay=config.qa_debug_overlay,
            )
        except Exception as exc:  # noqa: BLE001 - QA must not fail delivery
            logger.warning("Render QA failed, leaving rendered shorts intact: %s", exc)

    # ------------------------------------------------------------------
    # Done
    # ------------------------------------------------------------------
    logger.info("=" * 60)
    logger.info("PIPELINE COMPLETE - %d shorts generated:", len(final_outputs))
    for p in final_outputs:
        logger.info(" -> %s", p)
    logger.info("=" * 60)

    # Gate 3: interactive rating. A non-"Great" rating with actionable issues
    # appends a steering note and reruns, bounded by max_iterations.
    if config.interactive and final_outputs and state is not None:
        feedback = interactive.rate_output(final_outputs)
        state.last_rating = feedback
        session_state.save_state(config.work_dir, state)
        if feedback.rating == 3:
            logger.info("Rated Great. Shipped.")
            return final_outputs
        steering = _build_steering_from_feedback(feedback)
        if not steering:
            logger.warning("Interactive feedback recorded, but it is not actionable until a later gate ships.")
            return final_outputs
        state.iteration += 1
        state.steering_notes.append(steering)
        session_state.save_state(config.work_dir, state)
        if state.iteration >= config.max_iterations:
            logger.warning("Iteration cap hit. Source may not have a strong short.")
            return final_outputs
        return run_pipeline(_rerun_config(config, state.steering_notes))

    return final_outputs