"""Per-clip layout + bbox via Gemini vision (no pixel heuristics in the product pipeline).""" from __future__ import annotations import base64 import hashlib import json import logging import os import re import struct import subprocess from collections.abc import Iterable from io import BytesIO from pathlib import Path from typing import Any from google import genai from google.genai import types from openai import OpenAI from humeo_core.schemas import ( BoundingBox, LayoutInstruction, LayoutKind, Scene, SceneClassification, SceneRegions, TimedCenterPoint, ) from humeo_core.primitives.vision import layout_instruction_from_regions from humeo.config import GEMINI_MODEL, GEMINI_VISION_MODEL, PipelineConfig from humeo.env import ( OPENROUTER_BASE_URL, current_llm_provider, model_name_for_provider, openrouter_default_headers, resolve_gemini_api_key, resolve_llm_provider, resolve_openrouter_api_keys, ) from humeo.gemini_generate import gemini_generate_config logger = logging.getLogger(__name__) LAYOUT_VISION_CACHE_VERSION = 9 LAYOUT_VISION_META = "layout_vision.meta.json" LAYOUT_VISION_JSON = "layout_vision.json" TRACKING_SAMPLE_FRACTIONS = tuple(i / 10.0 for i in range(1, 10)) TRACKING_MIN_SPREAD_NORM = 0.08 TRACKING_OUTLIER_DELTA_NORM = 0.16 TRACKING_OUTLIER_NEIGHBOR_MAX_NORM = 0.10 TRACKING_DEADBAND_NORM = 0.025 TRACKING_MIN_USABLE_POINTS = 5 TRACKING_UNSTABLE_JUMP_NORM = 0.18 FOCUS_SWITCH_LEAD_SEC = 0.35 SPEAKER_FOLLOW_MAX_INTERVAL_SEC = 2.0 TWO_SPEAKER_ACTIVE_ZOOM = 1.28 TWO_SPEAKER_BOTH_ZOOM = 1.0 TWO_SPEAKER_WIDE_ACTIVE_ZOOM = 1.12 TWO_SPEAKER_BOTH_FIT_MARGIN = 0.88 REPLICATE_SAM2_VIDEO_PINNED = ( "meta/sam-2-video:2d7219877ca847f463d749d9b224e62f7b078fe035d60a74b58889b455d5cbad" ) _MIN_SPLIT_STRIP_FRAC = 0.2 _SPLIT_TOP_RATIO_MIN = 0.32 _SPLIT_TOP_RATIO_MAX = 0.48 _SPLIT_FACE_REGION_MIN_HEIGHT = 0.62 _SPLIT_FACE_REGION_HEIGHT_MULT = 2.0 _SPLIT_FACE_TOP_PAD_MULT = 0.30 GEMINI_LAYOUT_VISION_PROMPT = """You are framing a vertical short (9:16) from a 16:9 video frame. HARD RULE: the final short shows AT MOST TWO on-screen items. An "item" is one of person (a human speaker) or chart (slide, graph, data visual, screenshare). That gives exactly five layouts to choose from. Return ONLY a JSON object with this exact shape: { "layout": "zoom_call_center" | "sit_center" | "split_chart_person" | "split_two_persons" | "split_two_charts", "person_bbox": {"x1": 0.0, "y1": 0.0, "x2": 1.0, "y2": 1.0} | null, "face_bbox": {"x1": 0.0, "y1": 0.0, "x2": 1.0, "y2": 1.0} | null, "chart_bbox": {"x1": 0.0, "y1": 0.0, "x2": 1.0, "y2": 1.0} | null, "second_person_bbox": {"x1": 0.0, "y1": 0.0, "x2": 1.0, "y2": 1.0} | null, "second_face_bbox": {"x1": 0.0, "y1": 0.0, "x2": 1.0, "y2": 1.0} | null, "second_chart_bbox": {"x1": 0.0, "y1": 0.0, "x2": 1.0, "y2": 1.0} | null, "reason": "short rationale" } Bbox rules: - All bbox coordinates are normalized 0..1 (left/top = 0, right/bottom = 1). Require x2 > x1 and y2 > y1 when a bbox is non-null. - person_bbox / second_person_bbox: tight box around each speaker's head AND upper body. If two speakers are visible, ``person_bbox`` is the LEFT speaker and ``second_person_bbox`` is the RIGHT speaker (by x-center). - face_bbox / second_face_bbox: TIGHT box around the SPEAKER'S FACE ONLY (forehead to chin, ear to ear). This is NOT the full body — exclude torso, arms, shoulders, tank top, mug, table. The face bbox drives horizontal framing in the 9:16 crop, so putting torso or arms in it will push the face off-screen. * If the subject is shown in profile, the face_bbox still surrounds only the visible half of the head (ear to nose, forehead to chin). It should be roughly square-ish, not a tall body rectangle. * ``face_bbox`` matches ``person_bbox`` (same speaker), ``second_face_bbox`` matches ``second_person_bbox``. * Set face bbox to null ONLY if no face is visible at all (back of head, occluded, off-frame). - chart_bbox / second_chart_bbox: slide, chart, graph, or large on-screen graphic. If two charts are visible, ``chart_bbox`` is the LEFT chart and ``second_chart_bbox`` is the RIGHT chart. - The two bboxes of the same kind must not overlap meaningfully; they should partition the source frame into distinct regions. Layout selection (pick exactly one): - zoom_call_center: ONE person, tight webcam / video-call headshot filling much of the frame. person_bbox + face_bbox set; others null. - sit_center: ONE person, interview / seated framing, or when unsure. person_bbox + face_bbox set; others null. - split_chart_person: ONE chart + ONE person in distinct regions (webinar / explainer). person_bbox + face_bbox + chart_bbox set; second_* null. - split_two_persons: TWO visible speakers (interview two-up, podcast panel). person_bbox + face_bbox AND second_person_bbox + second_face_bbox set; chart bboxes null. - split_two_charts: TWO charts / slides side-by-side. chart_bbox AND second_chart_bbox set; person/face bboxes null. When in doubt prefer ``sit_center``. Never output more than two of {person, chart} items in total. No markdown. JSON only.""" ACTIVE_SPEAKER_VISION_PROMPT = """You are analyzing a single frame from a two-person talking video. Return ONLY a JSON object: { "speaker": "left" | "right" | "both" | "unclear", "reason": "short rationale" } Rules: - "left" means the LEFT visible person appears to be the one speaking in this exact frame. - "right" means the RIGHT visible person appears to be the one speaking in this exact frame. - Use visible cues only: open mouth mid-word, facial expression while talking, hand gesture timing, body engagement. - If both appear to be talking at once, return "both". - If it is impossible to tell from this frame, return "unclear". - No markdown. JSON only.""" def _openai_message_text(content: object) -> str: if isinstance(content, str): return content if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, dict) and item.get("type") == "text": text = item.get("text") if isinstance(text, str): parts.append(text) return "".join(parts) return "" def _json_object_from_vision_response(raw: object) -> dict[str, Any]: if isinstance(raw, dict): return raw if isinstance(raw, list): for item in raw: if isinstance(item, dict): return item if isinstance(raw, str): text = raw.strip() if text.startswith("```"): text = re.sub(r"^```(?:json)?\s*", "", text, flags=re.IGNORECASE) text = re.sub(r"\s*```$", "", text) text = "".join(ch if ch >= " " or ch in "\r\n\t" else " " for ch in text) starts = [idx for idx in (text.find("{"), text.find("[")) if idx >= 0] if starts: decoder = json.JSONDecoder() for start in sorted(starts): try: parsed, _ = decoder.raw_decode(text[start:]) except json.JSONDecodeError: continue return _json_object_from_vision_response(parsed) raise TypeError(f"Expected vision JSON object, got {type(raw).__name__}") def _clips_fingerprint(clips_path: Path) -> str: if not clips_path.is_file(): return "" return hashlib.sha256(clips_path.read_bytes()).hexdigest() def layout_cache_valid( work_dir: Path, *, transcript_fp: str, clips_fp: str, vision_model: str, segmentation_provider: str = "off", segmentation_model: str = "meta/sam-2-video", ) -> bool: meta_path = work_dir / LAYOUT_VISION_META data_path = work_dir / LAYOUT_VISION_JSON if not meta_path.is_file() or not data_path.is_file(): return False try: meta: dict[str, Any] = json.loads(meta_path.read_text(encoding="utf-8")) except json.JSONDecodeError: return False return ( meta.get("layout_vision_cache_version") == LAYOUT_VISION_CACHE_VERSION and meta.get("transcript_sha256") == transcript_fp and meta.get("clips_sha256") == clips_fp and meta.get("gemini_vision_model") == vision_model and meta.get("segmentation_provider", "off") == segmentation_provider and meta.get("segmentation_model", "meta/sam-2-video") == segmentation_model and ( current_llm_provider() is None or ( current_llm_provider() == "google" and meta.get("llm_backend") in (None, "google") ) or meta.get("llm_backend") == current_llm_provider() ) ) def load_layout_cache(work_dir: Path) -> dict[str, dict[str, Any]] | None: p = work_dir / LAYOUT_VISION_JSON if not p.is_file(): return None try: data = json.loads(p.read_text(encoding="utf-8")) except json.JSONDecodeError: return None clips = data.get("clips") return clips if isinstance(clips, dict) else None def write_layout_cache( work_dir: Path, *, transcript_fp: str, clips_fp: str, vision_model: str, clips_payload: dict[str, dict[str, Any]], segmentation_provider: str = "off", segmentation_model: str = "meta/sam-2-video", ) -> None: work_dir.mkdir(parents=True, exist_ok=True) meta = { "layout_vision_cache_version": LAYOUT_VISION_CACHE_VERSION, "transcript_sha256": transcript_fp, "clips_sha256": clips_fp, "gemini_vision_model": vision_model, "llm_backend": current_llm_provider() or "google", "segmentation_provider": segmentation_provider, "segmentation_model": segmentation_model, } (work_dir / LAYOUT_VISION_META).write_text( json.dumps(meta, indent=2) + "\n", encoding="utf-8" ) (work_dir / LAYOUT_VISION_JSON).write_text( json.dumps({"clips": clips_payload}, indent=2, ensure_ascii=False) + "\n", encoding="utf-8", ) logger.info("Wrote %s and %s", LAYOUT_VISION_META, LAYOUT_VISION_JSON) def _png_dims(path: Path) -> tuple[int, int] | None: try: with path.open("rb") as f: head = f.read(24) if head[:8] != b"\x89PNG\r\n\x1a\n": return None width, height = struct.unpack(">II", head[16:24]) return int(width), int(height) except Exception: return None def _jpeg_dims(path: Path) -> tuple[int, int] | None: try: with path.open("rb") as f: if f.read(2) != b"\xff\xd8": return None sof_markers = { 0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF, } while True: marker_start = f.read(1) if not marker_start: return None if marker_start != b"\xff": continue marker = f.read(1) while marker == b"\xff": marker = f.read(1) if not marker: return None marker_byte = marker[0] if marker_byte in (0xD8, 0xD9, 0x01) or 0xD0 <= marker_byte <= 0xD7: continue seg_len_bytes = f.read(2) if len(seg_len_bytes) != 2: return None seg_len = struct.unpack(">H", seg_len_bytes)[0] if seg_len < 2: return None if marker_byte in sof_markers: frame_header = f.read(5) if len(frame_header) != 5: return None _, height, width = struct.unpack(">BHH", frame_header) return int(width), int(height) f.seek(seg_len - 2, 1) except Exception: return None def _keyframe_dimensions(keyframe_path: str) -> tuple[int, int] | None: path = Path(keyframe_path) try: from PIL import Image # type: ignore with Image.open(path) as img: width, height = img.size return int(width), int(height) except Exception: pass png_dims = _png_dims(path) if png_dims is not None: return png_dims return _jpeg_dims(path) def _normalize_bbox_payload( raw: dict[str, Any], image_size: tuple[int, int] | None ) -> dict[str, Any]: if image_size is None: return dict(raw) width, height = image_size normalized = dict(raw) x_values = [ float(normalized[key]) for key in ("x1", "x2") if isinstance(normalized.get(key), (int, float)) ] y_values = [ float(normalized[key]) for key in ("y1", "y2") if isinstance(normalized.get(key), (int, float)) ] if not x_values and not y_values: return normalized use_thousand_grid = False if any(v > 1.0 for v in x_values + y_values): max_coord = max(x_values + y_values) fits_image_pixels = ( all(v <= float(width) for v in x_values) and all(v <= float(height) for v in y_values) ) if max_coord <= 1000.0 and not fits_image_pixels: use_thousand_grid = True x_scale = 1000.0 if use_thousand_grid else float(width) y_scale = 1000.0 if use_thousand_grid else float(height) axis_scales = { "x1": x_scale, "x2": x_scale, "y1": y_scale, "y2": y_scale, } for key, axis_scale in axis_scales.items(): value = normalized.get(key) if not isinstance(value, (int, float)): continue coord = float(value) if coord > 1.0 and axis_scale > 0.0: coord = coord / axis_scale normalized[key] = max(0.0, min(coord, 1.0)) return normalized def _parse_bbox( raw: object, *, image_size: tuple[int, int] | None = None ) -> BoundingBox | None: if not raw or not isinstance(raw, dict): return None try: return BoundingBox.model_validate(_normalize_bbox_payload(raw, image_size)) except Exception: return None def _instruction_from_gemini_json( scene_id: str, data: dict[str, Any], *, image_size: tuple[int, int] | None = None, ) -> LayoutInstruction: """Translate Gemini's JSON into a validated :class:`LayoutInstruction`. Falls back to ``sit_center`` whenever the LLM returns something the contract doesn't support, so a bad vision call can never crash the pipeline. Also downgrades "two-item" layouts when the second bbox is missing -- e.g. ``split_two_persons`` with only one person_bbox drops to ``sit_center`` rather than rendering a silently-broken split. """ layout_str = str(data.get("layout", "sit_center")).strip() try: kind = LayoutKind(layout_str) except ValueError: kind = LayoutKind.SIT_CENTER pb = _parse_bbox(data.get("person_bbox"), image_size=image_size) fb = _parse_bbox(data.get("face_bbox"), image_size=image_size) cb = _parse_bbox(data.get("chart_bbox"), image_size=image_size) p2 = _parse_bbox(data.get("second_person_bbox"), image_size=image_size) f2 = _parse_bbox(data.get("second_face_bbox"), image_size=image_size) c2 = _parse_bbox(data.get("second_chart_bbox"), image_size=image_size) reason = str(data.get("reason", ""))[:400] # Downgrade any split that is missing its required bboxes, so we never # emit a split layout that will render as garbage. if kind == LayoutKind.SPLIT_CHART_PERSON and (pb is None or cb is None): kind = LayoutKind.SIT_CENTER if pb is not None else LayoutKind.SIT_CENTER if kind == LayoutKind.SPLIT_TWO_PERSONS and (pb is None or p2 is None): kind = LayoutKind.SIT_CENTER if kind == LayoutKind.SPLIT_TWO_CHARTS and (cb is None or c2 is None): kind = LayoutKind.SIT_CENTER regions = SceneRegions( scene_id=scene_id, person_bbox=pb, chart_bbox=cb, raw_reason=reason ) classification = SceneClassification( scene_id=scene_id, layout=kind, confidence=1.0, reason=reason ) instr = layout_instruction_from_regions( regions, classification, clip_id=scene_id ) updates: dict[str, Any] = {} # CENTERING FIX: the single-person 9:16 crop is driven by ``person_x_norm``. # A ``person_bbox`` that spans head + torso + arms is fine for framing # *extent* but its center_x can drift far from the actual face when the # subject is in profile or asymmetric (one arm up, mug on the table, etc). # Prefer the tight ``face_bbox`` center when the model gave us one so the # face lands in the visual center of the vertical crop instead of the # torso doing. face_center = _face_center_x(fb, pb) if face_center is not None: updates["person_x_norm"] = face_center if kind == LayoutKind.SPLIT_CHART_PERSON and pb is not None and cb is not None: render_person = _render_safe_split_person_region(pb, fb) updates["split_chart_region"] = cb updates["split_person_region"] = render_person updates["top_band_ratio"] = _split_chart_person_top_band_ratio(cb, render_person) elif kind == LayoutKind.SPLIT_TWO_PERSONS and pb is not None and p2 is not None: # Order by x-center so ``split_person_region`` is always the LEFT speaker. left, right = sorted((pb, p2), key=lambda b: b.center_x) updates["split_person_region"] = left updates["split_second_person_region"] = right elif kind == LayoutKind.SPLIT_TWO_CHARTS and cb is not None and c2 is not None: left, right = sorted((cb, c2), key=lambda b: b.center_x) updates["split_chart_region"] = left updates["split_second_chart_region"] = right if updates: instr = instr.model_copy(update=updates) return instr def _face_center_x( face: BoundingBox | None, person: BoundingBox | None ) -> float | None: """Pick a horizontal center to aim the 9:16 crop at. Priority: 1. ``face`` bbox center when it looks reasonable (narrow, plausibly inside the matching person bbox). 2. No override (caller keeps the person-bbox center, or the default 0.5 when neither was provided). We sanity-check the face box because Gemini sometimes echoes the full person bbox into ``face_bbox``. If the face bbox is as wide as the person bbox, it gives us nothing new; fall back to the person center rather than pretending we have a tighter signal. """ if face is None: return None face_w = max(0.0, face.x2 - face.x1) if face_w <= 0.0: return None # A real face in a 16:9 frame is rarely wider than ~35% of frame width, # even for tight webcam framing. A face "bbox" that's wider than that # almost certainly includes torso and is no better than person_bbox. if face_w > 0.40: return None # If we have a person bbox too, require the face center to sit inside it # — otherwise the model got confused and matched the wrong subject. if person is not None: if not (person.x1 - 0.02 <= face.center_x <= person.x2 + 0.02): return None return float(face.center_x) def _render_safe_split_person_region( person: BoundingBox, face: BoundingBox | None, ) -> BoundingBox: """Bias split speaker crops toward head-and-shoulders instead of torso.""" if face is None or _face_center_x(face, person) is None: return person face_h = max(0.0, face.y2 - face.y1) if face_h <= 0.0: return person target_h = min( person.y2 - person.y1, max(_SPLIT_FACE_REGION_MIN_HEIGHT, face_h * _SPLIT_FACE_REGION_HEIGHT_MULT), ) top = max(0.0, min(person.y1, face.y1 - face_h * _SPLIT_FACE_TOP_PAD_MULT)) bottom = min(person.y2, top + target_h) if bottom - top < target_h: top = max(0.0, bottom - target_h) if bottom - top <= face_h: return person return person.model_copy(update={"y1": top, "y2": bottom}) def _split_chart_person_top_band_ratio( chart: BoundingBox, person: BoundingBox, ) -> float: """Allocate top/bottom band height from the chart/person aspect needs.""" seam = (chart.x2 + person.x1) / 2.0 seam = max(_MIN_SPLIT_STRIP_FRAC, min(1.0 - _MIN_SPLIT_STRIP_FRAC, seam)) chart_w = max(1e-6, seam) person_w = max(1e-6, 1.0 - seam) chart_need = max(1e-6, (chart.y2 - chart.y1) / chart_w) person_need = max(1e-6, (person.y2 - person.y1) / person_w) ratio = chart_need / (chart_need + person_need) return round(max(_SPLIT_TOP_RATIO_MIN, min(_SPLIT_TOP_RATIO_MAX, ratio)), 3) def _person_center_x_from_data( data: dict[str, Any], image_size: tuple[int, int] | None = None ) -> float | None: person_bbox = _parse_bbox(data.get("person_bbox"), image_size=image_size) face_bbox = _parse_bbox(data.get("face_bbox"), image_size=image_size) face_center = _face_center_x(face_bbox, person_bbox) if face_center is not None: return face_center if person_bbox is not None: return float(person_bbox.center_x) return None def _tracking_sample_times(duration_sec: float) -> list[float]: seen: set[float] = set() out: list[float] = [] for fraction in TRACKING_SAMPLE_FRACTIONS: t_sec = max(0.0, min(duration_sec, duration_sec * fraction)) key = round(t_sec, 3) if key in seen: continue seen.add(key) out.append(t_sec) return out def _tracking_points_from_centers( duration_sec: float, centers: list[tuple[float, float]] ) -> list[TimedCenterPoint]: deduped: list[tuple[float, float]] = [] for t_sec, x_norm in sorted(centers, key=lambda item: item[0]): clamped_t = max(0.0, min(duration_sec, float(t_sec))) clamped_x = max(0.0, min(1.0, float(x_norm))) if deduped and abs(clamped_t - deduped[-1][0]) < 1e-6: deduped[-1] = (clamped_t, clamped_x) else: deduped.append((clamped_t, clamped_x)) if len(deduped) < 2: return [] filtered = list(deduped) for idx in range(1, len(filtered) - 1): prev_x = filtered[idx - 1][1] curr_t, curr_x = filtered[idx] next_x = filtered[idx + 1][1] if ( abs(prev_x - next_x) <= TRACKING_OUTLIER_NEIGHBOR_MAX_NORM and abs(curr_x - prev_x) >= TRACKING_OUTLIER_DELTA_NORM and abs(curr_x - next_x) >= TRACKING_OUTLIER_DELTA_NORM ): filtered[idx] = (curr_t, (prev_x + next_x) / 2.0) smoothed = list(filtered) for idx in range(1, len(filtered) - 1): prev_x = filtered[idx - 1][1] curr_t, curr_x = filtered[idx] next_x = filtered[idx + 1][1] median_x = sorted((prev_x, curr_x, next_x))[1] if abs(curr_x - median_x) > TRACKING_DEADBAND_NORM: smoothed[idx] = (curr_t, median_x) if len(smoothed) >= 5: wider_smoothed = list(smoothed) for idx in range(1, len(smoothed) - 1): window = smoothed[max(0, idx - 2) : min(len(smoothed), idx + 3)] median_x = sorted(x for _, x in window)[len(window) // 2] curr_t, curr_x = smoothed[idx] if abs(curr_x - median_x) >= TRACKING_OUTLIER_DELTA_NORM: wider_smoothed[idx] = (curr_t, median_x) smoothed = wider_smoothed filtered = list(smoothed) for idx in range(1, len(filtered)): prev_t, prev_x = filtered[idx - 1] curr_t, curr_x = filtered[idx] if abs(curr_x - prev_x) < TRACKING_DEADBAND_NORM: filtered[idx] = (curr_t, prev_x) spread = max(x for _, x in filtered) - min(x for _, x in filtered) if spread < TRACKING_MIN_SPREAD_NORM: stable_x = sum(x for _, x in filtered) / len(filtered) return [ TimedCenterPoint(t_sec=0.0, x_norm=stable_x), TimedCenterPoint(t_sec=duration_sec, x_norm=stable_x), ] if filtered[0][0] > 0.0: filtered.insert(0, (0.0, filtered[0][1])) else: filtered[0] = (0.0, filtered[0][1]) if filtered[-1][0] < duration_sec: filtered.append((duration_sec, filtered[-1][1])) else: filtered[-1] = (duration_sec, filtered[-1][1]) return [TimedCenterPoint(t_sec=t_sec, x_norm=x_norm) for t_sec, x_norm in filtered] def _tracking_is_unstable(points: list[TimedCenterPoint]) -> bool: if len(points) < TRACKING_MIN_USABLE_POINTS: return True return any( abs(points[idx].x_norm - points[idx - 1].x_norm) > TRACKING_UNSTABLE_JUMP_NORM for idx in range(1, len(points)) ) def _interpolate_tracking_x(points: list[TimedCenterPoint], t_sec: float) -> float | None: if not points: return None if t_sec <= points[0].t_sec: return float(points[0].x_norm) if t_sec >= points[-1].t_sec: return float(points[-1].x_norm) for idx in range(1, len(points)): left = points[idx - 1] right = points[idx] if right.t_sec < t_sec: continue span = right.t_sec - left.t_sec if span <= 1e-6: return float(right.x_norm) alpha = (t_sec - left.t_sec) / span return float(left.x_norm + (right.x_norm - left.x_norm) * alpha) return float(points[-1].x_norm) def _speaker_seed_boxes( data: dict[str, Any], image_size: tuple[int, int] | None ) -> tuple[BoundingBox, BoundingBox] | None: first_person = _parse_bbox(data.get("person_bbox"), image_size=image_size) first_face = _parse_bbox(data.get("face_bbox"), image_size=image_size) second_person = _parse_bbox(data.get("second_person_bbox"), image_size=image_size) second_face = _parse_bbox(data.get("second_face_bbox"), image_size=image_size) left = first_face or first_person right = second_face or second_person if left is None or right is None: return None ordered = sorted((left, right), key=lambda box: box.center_x) return ordered[0], ordered[1] def _nearest_seed_side( center_x: float, *, left_seed: BoundingBox, right_seed: BoundingBox, ) -> str: left_delta = abs(center_x - left_seed.center_x) right_delta = abs(center_x - right_seed.center_x) return "left" if left_delta <= right_delta else "right" def _focus_frame_visible_speaker_centers( data: dict[str, Any] | None, image_size: tuple[int, int] | None, *, left_seed: BoundingBox, right_seed: BoundingBox, ) -> tuple[dict[str, float], bool]: if not data: return {}, False first_person = _parse_bbox(data.get("person_bbox"), image_size=image_size) first_face = _parse_bbox(data.get("face_bbox"), image_size=image_size) second_person = _parse_bbox(data.get("second_person_bbox"), image_size=image_size) second_face = _parse_bbox(data.get("second_face_bbox"), image_size=image_size) visible_boxes = [box for box in (first_face or first_person, second_face or second_person) if box] if not visible_boxes: return {}, False if len(visible_boxes) >= 2: ordered = sorted(visible_boxes, key=lambda box: box.center_x) return {"left": ordered[0].center_x, "right": ordered[1].center_x}, True only_box = visible_boxes[0] side = _nearest_seed_side(only_box.center_x, left_seed=left_seed, right_seed=right_seed) return {side: only_box.center_x}, False def _two_speaker_full_width_span_norm(image_size: tuple[int, int] | None) -> float: if image_size is None: return 1.0 width, height = image_size if width <= 0 or height <= 0: return 1.0 target_aspect = 9 / 16 if width / height >= target_aspect: return min(1.0, (height * target_aspect) / width) return 1.0 def _can_fit_both_speakers( left_x: float, right_x: float, *, image_size: tuple[int, int] | None, ) -> bool: span = abs(right_x - left_x) allowed = _two_speaker_full_width_span_norm(image_size) * TWO_SPEAKER_BOTH_FIT_MARGIN return span <= allowed def _speaker_follow_sample_times(duration_sec: float) -> list[float]: seen: set[float] = set() out: list[float] = [] dense_times: list[float] = [] if duration_sec > 0: steps = max(1, int(duration_sec / SPEAKER_FOLLOW_MAX_INTERVAL_SEC)) dense_times = [ min(duration_sec, idx * SPEAKER_FOLLOW_MAX_INTERVAL_SEC) for idx in range(1, steps + 1) ] for t_sec in [0.0, *_tracking_sample_times(duration_sec), *dense_times, duration_sec]: key = round(max(0.0, min(duration_sec, t_sec)), 3) if key in seen: continue seen.add(key) out.append(key) return out def _resolve_speaker_focus_samples( samples: list[tuple[float, str]], *, default_side: str = "left", ) -> list[tuple[float, str]]: normalized: list[tuple[float, str | None]] = [] allowed = {"left", "right", "both"} for t_sec, side in samples: normalized.append((float(t_sec), side if side in allowed else None)) out: list[tuple[float, str]] = [] for idx, (t_sec, side) in enumerate(normalized): if side is not None: out.append((t_sec, side)) continue prev_side = out[-1][1] if out else None next_side: str | None = None for _, future_side in normalized[idx + 1 :]: if future_side is not None: next_side = future_side break resolved_side: str if prev_side is not None and next_side is not None: resolved_side = prev_side if prev_side == next_side else "both" else: resolved_side = prev_side or next_side or default_side out.append((t_sec, resolved_side)) return out def _tracking_points_from_focus_states( duration_sec: float, framings: list[tuple[float, float, float]], ) -> list[TimedCenterPoint]: deduped: list[tuple[float, float, float]] = [] for t_sec, x_norm, zoom in sorted(framings, key=lambda item: item[0]): clamped_t = max(0.0, min(duration_sec, float(t_sec))) clamped_x = max(0.0, min(1.0, float(x_norm))) clamped_zoom = max(1.0, min(4.0, float(zoom))) if deduped and abs(clamped_t - deduped[-1][0]) < 1e-6: deduped[-1] = (clamped_t, clamped_x, clamped_zoom) else: deduped.append((clamped_t, clamped_x, clamped_zoom)) if len(deduped) < 2: return [] if deduped[0][0] > 0.0: deduped.insert(0, (0.0, deduped[0][1], deduped[0][2])) else: deduped[0] = (0.0, deduped[0][1], deduped[0][2]) if deduped[-1][0] < duration_sec: deduped.append((duration_sec, deduped[-1][1], deduped[-1][2])) else: deduped[-1] = (duration_sec, deduped[-1][1], deduped[-1][2]) expanded: list[tuple[float, float, float]] = [deduped[0]] for t_sec, x_norm, zoom in deduped[1:]: prev_t, prev_x, prev_zoom = expanded[-1] switch_changed = ( abs(x_norm - prev_x) > TRACKING_DEADBAND_NORM or abs(zoom - prev_zoom) > 0.05 ) if switch_changed: hold_t = max(prev_t, min(t_sec, t_sec - FOCUS_SWITCH_LEAD_SEC)) if hold_t - prev_t > 1e-6: expanded.append((hold_t, prev_x, prev_zoom)) if abs(t_sec - expanded[-1][0]) < 1e-6: expanded[-1] = (t_sec, x_norm, zoom) else: expanded.append((t_sec, x_norm, zoom)) return [ TimedCenterPoint(t_sec=t_sec, x_norm=x_norm, zoom=zoom) for t_sec, x_norm, zoom in expanded ] def _nearest_non_both_focus_side( resolved_focus: list[tuple[float, str]], start_idx: int, *, step: int, ) -> str | None: idx = start_idx while 0 <= idx < len(resolved_focus): side = resolved_focus[idx][1] if side in ("left", "right"): return side idx += step return None def _extract_frame_at_time(source_path: Path, time_sec: float, output_path: Path) -> Path: output_path.parent.mkdir(parents=True, exist_ok=True) subprocess.run( [ "ffmpeg", "-y", "-loglevel", "error", "-ss", f"{time_sec:.3f}", "-i", str(source_path), "-frames:v", "1", "-q:v", "2", str(output_path), ], check=True, capture_output=True, ) return output_path def _probe_video_fps(source_path: Path) -> float: result = subprocess.run( [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=r_frame_rate", "-of", "default=noprint_wrappers=1:nokey=1", str(source_path), ], check=False, capture_output=True, text=True, ) rate = (result.stdout or "").strip() if "/" in rate: num, den = rate.split("/", 1) try: return max(1.0, float(num) / max(float(den), 1.0)) except ValueError: return 30.0 try: return max(1.0, float(rate)) except ValueError: return 30.0 def _segmentation_center_x_from_url(mask_url: str) -> float | None: try: import httpx from PIL import Image # type: ignore except ImportError: return None response = httpx.get(mask_url, timeout=120.0) response.raise_for_status() with Image.open(BytesIO(response.content)) as image: image = image.convert("L") width, height = image.size pixels = image.load() xs: list[int] = [] for y in range(height): for x in range(width): if pixels[x, y] > 16: xs.append(x) if not xs or width <= 0: return None return float(sum(xs) / len(xs) / width) def _segmentation_mask_urls(output: object) -> list[str]: def _coerce_urls(items: Iterable[object]) -> list[str]: urls: list[str] = [] for item in items: if item is None: continue if isinstance(item, (str, Path)): text = str(item).strip() else: url = getattr(item, "url", None) text = str(url).strip() if isinstance(url, str) else str(item).strip() if text: urls.append(text) return urls if isinstance(output, dict): for key in ("black_white_masks", "masks", "output"): value = output.get(key) if isinstance(value, (str, bytes, bytearray)) or value is None: continue try: urls = _coerce_urls(value) except TypeError: continue if urls: return urls return [] if isinstance(output, (str, bytes, bytearray)) or output is None: return [] try: return _coerce_urls(output) except TypeError: return [] def _infer_person_tracking_with_segmentation( scene: Scene, *, source_video: Path, segmentation_model: str, initial_data: dict[str, Any] | None = None, initial_image_size: tuple[int, int] | None = None, seed_bbox: BoundingBox | None = None, object_id: str = "speaker", ) -> tuple[list[TimedCenterPoint], dict[str, Any] | None]: token = (os.environ.get("REPLICATE_API_TOKEN") or "").strip() if not token: raise RuntimeError("REPLICATE_API_TOKEN is not set") if initial_image_size is None: raise RuntimeError("Segmentation fallback requires the keyframe dimensions") if seed_bbox is None and initial_data is None: raise RuntimeError("Segmentation fallback requires an initial vision bbox") if seed_bbox is None: face_bbox = _parse_bbox(initial_data.get("face_bbox"), image_size=initial_image_size) person_bbox = _parse_bbox(initial_data.get("person_bbox"), image_size=initial_image_size) seed_bbox = face_bbox or person_bbox if seed_bbox is None: raise RuntimeError("No seed bbox available for segmentation fallback") try: import replicate except ImportError as exc: raise RuntimeError("replicate package is not installed") from exc width, height = initial_image_size fps = _probe_video_fps(source_video) midpoint_frame = max(0, int(round((scene.duration / 2.0) * fps))) output_frame_interval = max(1, int(round(max(1.0, scene.duration * fps) / 10.0))) click_x = int(round(seed_bbox.center_x * width)) click_y = int(round(seed_bbox.center_y * height)) prompt_frames = [0] if midpoint_frame > 0: prompt_frames.append(midpoint_frame) prompt_coordinates = ",".join(f"[{click_x},{click_y}]" for _ in prompt_frames) prompt_labels = ",".join("1" for _ in prompt_frames) prompt_frame_str = ",".join(str(frame_idx) for frame_idx in prompt_frames) prompt_object_ids = ",".join(object_id for _ in prompt_frames) run_input = { "input_video": None, "click_coordinates": prompt_coordinates, "click_labels": prompt_labels, "click_frames": prompt_frame_str, "click_object_ids": prompt_object_ids, "mask_type": "binary", "annotation_type": "mask", "output_video": False, "output_format": "png", "output_frame_interval": output_frame_interval, } with source_video.open("rb") as handle: client = replicate.Client(api_token=token) run_input["input_video"] = handle try: output = client.run(segmentation_model, input=run_input) resolved_model = segmentation_model except Exception as exc: if ":" in segmentation_model or "404" not in str(exc): raise handle.seek(0) output = client.run(REPLICATE_SAM2_VIDEO_PINNED, input=run_input) resolved_model = REPLICATE_SAM2_VIDEO_PINNED urls = _segmentation_mask_urls(output) if not urls: raise RuntimeError("Segmentation fallback returned no masks") centers: list[tuple[float, float]] = [] for idx, mask_url in enumerate(urls): center_x = _segmentation_center_x_from_url(mask_url) if center_x is None: continue rel_time = min(scene.duration, (idx * output_frame_interval) / fps) centers.append((rel_time, center_x)) points = _tracking_points_from_centers(scene.duration, centers) detail = { "provider": "replicate", "model": resolved_model, "seed_point_px": [click_x, click_y], "seed_frame": midpoint_frame, "prompt_frames": prompt_frames, "output_frame_interval": output_frame_interval, "mask_count": len(urls), } return points, detail def _infer_two_speaker_focus_tracking_with_segmentation( scene: Scene, *, source_video: Path, tracking_dir: Path, model_name: str, segmentation_model: str, initial_data: dict[str, Any], initial_image_size: tuple[int, int] | None, ) -> tuple[list[TimedCenterPoint], dict[str, Any] | None]: seeds = _speaker_seed_boxes(initial_data, initial_image_size) if seeds is None: raise RuntimeError("Two-speaker SAM follow requires both speaker bboxes") left_seed, right_seed = seeds left_points, left_detail = _infer_person_tracking_with_segmentation( scene, source_video=source_video, segmentation_model=segmentation_model, initial_data=initial_data, initial_image_size=initial_image_size, seed_bbox=left_seed, object_id="left_speaker", ) right_points, right_detail = _infer_person_tracking_with_segmentation( scene, source_video=source_video, segmentation_model=segmentation_model, initial_data=initial_data, initial_image_size=initial_image_size, seed_bbox=right_seed, object_id="right_speaker", ) if not left_points or not right_points: raise RuntimeError("Two-speaker SAM follow did not return both speaker tracks") focus_dir = tracking_dir / scene.scene_id / "speaker_focus" focus_samples: list[dict[str, Any]] = [] focus_choices: list[tuple[float, str]] = [] for rel_time in _speaker_follow_sample_times(max(0.0, scene.duration)): abs_time = scene.start_time + rel_time frame_path = focus_dir / f"{scene.scene_id}_{int(round(rel_time * 1000)):06d}.jpg" visible_centers: dict[str, float] = {} both_visible = False try: _extract_frame_at_time(source_video, abs_time, frame_path) frame_image_size = _keyframe_dimensions(str(frame_path)) layout_data: dict[str, Any] | None = None layout_error: str | None = None try: layout_data = _call_gemini_vision(str(frame_path), model_name) visible_centers, both_visible = _focus_frame_visible_speaker_centers( layout_data, frame_image_size, left_seed=left_seed, right_seed=right_seed, ) except Exception as exc: layout_error = str(exc) data = _call_active_speaker_vision(str(frame_path), model_name) speaker = str(data.get("speaker", "unclear")).strip().lower() if speaker not in ("left", "right", "both", "unclear"): speaker = "unclear" if speaker == "unclear" and len(visible_centers) == 1 and not both_visible: speaker = next(iter(visible_centers)) focus_choices.append((rel_time, speaker)) sample = { "time_sec": rel_time, "frame_path": str(frame_path), "speaker": speaker, "raw": data, "visible_centers": visible_centers, "both_visible": both_visible, } if layout_data is not None: sample["layout_raw"] = layout_data if layout_error: sample["layout_error"] = layout_error focus_samples.append(sample) except Exception as exc: focus_choices.append((rel_time, "unclear")) focus_samples.append( { "time_sec": rel_time, "frame_path": str(frame_path), "speaker": "unclear", "visible_centers": visible_centers, "both_visible": both_visible, "error": str(exc), } ) resolved_focus = _resolve_speaker_focus_samples(focus_choices, default_side="left") framings: list[tuple[float, float, float]] = [] for idx, (rel_time, speaker) in enumerate(resolved_focus): sample = focus_samples[idx] if idx < len(focus_samples) else {} sample_visible_centers = sample.get("visible_centers", {}) frame_left_x = ( float(sample_visible_centers["left"]) if isinstance(sample_visible_centers, dict) and "left" in sample_visible_centers else None ) frame_right_x = ( float(sample_visible_centers["right"]) if isinstance(sample_visible_centers, dict) and "right" in sample_visible_centers else None ) both_visible = bool(sample.get("both_visible")) left_x = frame_left_x if frame_left_x is not None else _interpolate_tracking_x(left_points, rel_time) right_x = ( frame_right_x if frame_right_x is not None else _interpolate_tracking_x(right_points, rel_time) ) if left_x is None: left_x = left_seed.center_x if right_x is None: right_x = right_seed.center_x prev_side = _nearest_non_both_focus_side(resolved_focus, idx - 1, step=-1) next_side = _nearest_non_both_focus_side(resolved_focus, idx + 1, step=1) should_widen = False if both_visible and _can_fit_both_speakers(left_x, right_x, image_size=initial_image_size): if speaker == "both": should_widen = True elif ( prev_side is not None and next_side is not None and prev_side != next_side ): should_widen = True if should_widen: x_norm = (left_x + right_x) / 2.0 zoom = TWO_SPEAKER_BOTH_ZOOM elif speaker == "left": x_norm = left_x zoom = TWO_SPEAKER_ACTIVE_ZOOM elif speaker == "right": x_norm = right_x zoom = TWO_SPEAKER_ACTIVE_ZOOM else: fallback_side = prev_side or next_side or "left" x_norm = left_x if fallback_side == "left" else right_x zoom = TWO_SPEAKER_WIDE_ACTIVE_ZOOM framings.append((rel_time, x_norm, zoom)) points = _tracking_points_from_focus_states(scene.duration, framings) detail = { "mode": "two_speaker_follow", "left_segmentation": left_detail, "right_segmentation": right_detail, "focus_samples": focus_samples, "resolved_focus": [ {"time_sec": rel_time, "speaker": speaker} for rel_time, speaker in resolved_focus ], "framing_samples": [ {"time_sec": rel_time, "x_norm": x_norm, "zoom": zoom} for rel_time, x_norm, zoom in framings ], } return points, detail def _infer_person_tracking( scene: Scene, *, source_video: Path, tracking_dir: Path, model_name: str, initial_data: dict[str, Any] | None = None, initial_image_size: tuple[int, int] | None = None, ) -> tuple[list[TimedCenterPoint], list[dict[str, Any]]]: duration_sec = max(0.0, scene.duration) if duration_sec <= 0.0: return [], [] midpoint_rel = duration_sec / 2.0 centers: list[tuple[float, float]] = [] samples: list[dict[str, Any]] = [] if initial_data is not None: center_x = _person_center_x_from_data(initial_data, image_size=initial_image_size) samples.append( { "sample_kind": "midpoint_keyframe", "time_sec": midpoint_rel, "frame_path": scene.keyframe_path, "center_x_norm": center_x, "raw": initial_data, } ) if center_x is not None: centers.append((midpoint_rel, center_x)) scene_tracking_dir = tracking_dir / scene.scene_id for rel_time in _tracking_sample_times(duration_sec): if abs(rel_time - midpoint_rel) < 1e-3: continue abs_time = scene.start_time + rel_time frame_path = scene_tracking_dir / f"{scene.scene_id}_{int(round(rel_time * 1000)):06d}.jpg" try: _extract_frame_at_time(source_video, abs_time, frame_path) data = _call_gemini_vision(str(frame_path), model_name) image_size = _keyframe_dimensions(str(frame_path)) center_x = _person_center_x_from_data(data, image_size=image_size) samples.append( { "sample_kind": "tracking_frame", "time_sec": rel_time, "frame_path": str(frame_path), "center_x_norm": center_x, "raw": data, } ) if center_x is not None: centers.append((rel_time, center_x)) except Exception as e: logger.warning( "Speaker tracking sample failed for %s at %.2fs: %s", scene.scene_id, rel_time, e, ) samples.append( { "sample_kind": "tracking_frame", "time_sec": rel_time, "frame_path": str(frame_path), "error": str(e), } ) return _tracking_points_from_centers(duration_sec, centers), samples def _call_vision_json(keyframe_path: str, model_name: str, prompt: str) -> dict[str, Any]: path = Path(keyframe_path) data = path.read_bytes() mime = "image/jpeg" if path.suffix.lower() in (".jpg", ".jpeg") else "image/png" provider = resolve_llm_provider() resolved_model = model_name_for_provider(model_name, provider) if provider == "google": client = genai.Client(api_key=resolve_gemini_api_key()) response = client.models.generate_content( model=resolved_model, contents=[ types.Part.from_text(text=prompt), types.Part.from_bytes(data=data, mime_type=mime), ], config=gemini_generate_config( temperature=0.2, response_mime_type="application/json", ), ) if not response.text: raise RuntimeError("Gemini vision returned empty response") return _json_object_from_vision_response(response.text) data_url = f"data:{mime};base64,{base64.b64encode(data).decode('ascii')}" keys = resolve_openrouter_api_keys() last_error: Exception | None = None for key_idx, api_key in enumerate(keys, start=1): try: client = OpenAI( api_key=api_key, base_url=OPENROUTER_BASE_URL, default_headers=openrouter_default_headers(), ) response = client.chat.completions.create( model=resolved_model, messages=[ {"role": "system", "content": prompt}, { "role": "user", "content": [ {"type": "text", "text": "Analyze this keyframe and return only JSON."}, {"type": "image_url", "image_url": {"url": data_url}}, ], }, ], temperature=0.2, response_format={"type": "json_object"}, ) text = _openai_message_text(response.choices[0].message.content) if not text: raise RuntimeError("OpenRouter vision returned empty response") if key_idx > 1: logger.info("OpenRouter vision succeeded with fallback key %d/%d", key_idx, len(keys)) return _json_object_from_vision_response(text) except Exception as exc: last_error = exc if key_idx < len(keys): logger.warning( "OpenRouter vision failed with key %d/%d: %s; trying fallback", key_idx, len(keys), exc, ) assert last_error is not None raise last_error def _call_gemini_vision(keyframe_path: str, model_name: str) -> dict[str, Any]: return _call_vision_json(keyframe_path, model_name, GEMINI_LAYOUT_VISION_PROMPT) def _call_active_speaker_vision(frame_path: str, model_name: str) -> dict[str, Any]: return _call_vision_json(frame_path, model_name, ACTIVE_SPEAKER_VISION_PROMPT) def infer_layout_instructions( scenes: list[Scene], *, gemini_vision_model: str, source_video: Path, tracking_dir: Path, source_videos_by_scene: dict[str, Path] | None = None, segmentation_provider: str = "off", segmentation_model: str = "meta/sam-2-video", ) -> tuple[dict[str, LayoutInstruction], dict[str, dict[str, Any]]]: """Return ``(clip_id -> LayoutInstruction, clip_id -> raw_gemini_json)``.""" out: dict[str, LayoutInstruction] = {} raw_by_clip: dict[str, dict[str, Any]] = {} model_name = gemini_vision_model.strip() for s in scenes: sid = s.scene_id if not s.keyframe_path: logger.warning("No keyframe for %s; using sit_center.", sid) out[sid] = LayoutInstruction(clip_id=sid, layout=LayoutKind.SIT_CENTER) raw_by_clip[sid] = {"error": "no keyframe", "layout": "sit_center"} continue try: logger.info("Layout vision for %s (model=%s)...", sid, model_name) data = _call_gemini_vision(s.keyframe_path, model_name) image_size = _keyframe_dimensions(s.keyframe_path) instr = _instruction_from_gemini_json( sid, data, image_size=image_size, ) raw_data = dict(data) speaker_follow_applied = False tracking_source = ( source_videos_by_scene.get(sid, source_video) if source_videos_by_scene else source_video ) if instr.layout == LayoutKind.SPLIT_TWO_PERSONS and segmentation_provider == "replicate": try: focus_points, focus_detail = _infer_two_speaker_focus_tracking_with_segmentation( s, source_video=tracking_source, tracking_dir=tracking_dir, model_name=model_name, segmentation_model=segmentation_model, initial_data=data, initial_image_size=image_size, ) if focus_detail: raw_data["speaker_follow_tracking"] = focus_detail if focus_points: instr = LayoutInstruction( clip_id=sid, layout=LayoutKind.SIT_CENTER, zoom=focus_points[0].zoom or 1.0, person_x_norm=focus_points[0].x_norm, person_tracking=focus_points, ) speaker_follow_applied = True except Exception as exc: raw_data["speaker_follow_tracking"] = {"error": str(exc)} if instr.layout in (LayoutKind.SIT_CENTER, LayoutKind.ZOOM_CALL_CENTER): if speaker_follow_applied: raw_by_clip[sid] = raw_data out[sid] = instr continue # Single-speaker studio/podcast clips should feel locked-off, not # like the crop is hunting around background props. Active speaker # tracking still happens above for true two-person follow shots. raw_data["single_speaker_crop_lock"] = { "mode": "static_keyframe_center", "person_x_norm": round(float(instr.person_x_norm), 4), } raw_by_clip[sid] = raw_data out[sid] = instr except Exception as e: logger.warning("Gemini vision failed for %s: %s — defaulting sit_center", sid, e) out[sid] = LayoutInstruction(clip_id=sid, layout=LayoutKind.SIT_CENTER) raw_by_clip[sid] = {"error": str(e), "layout": "sit_center"} return out, raw_by_clip def _apply_layout_hint_fallbacks( instructions: dict[str, LayoutInstruction], raw_by_clip: dict[str, dict[str, Any]], layout_hints_by_clip: dict[str, LayoutKind], ) -> None: for clip_id, hint in layout_hints_by_clip.items(): instr = instructions.get(clip_id) raw = raw_by_clip.get(clip_id) if instr is None or raw is None or "error" not in raw: continue if instr.layout != LayoutKind.SIT_CENTER: continue if hint == LayoutKind.SPLIT_CHART_PERSON: updated_raw = dict(raw) updated_raw["layout_hint_fallback_rejected"] = hint.value updated_raw["layout_hint_rejected_reason"] = "vision_failed_without_regions" raw_by_clip[clip_id] = updated_raw continue instructions[clip_id] = instr.model_copy(update={"layout": hint}) updated_raw = dict(raw) updated_raw["layout"] = hint.value updated_raw["layout_hint_fallback"] = hint.value raw_by_clip[clip_id] = updated_raw def resolved_vision_model(config: PipelineConfig) -> str: if config.gemini_vision_model: return config.gemini_vision_model.strip() if GEMINI_VISION_MODEL: return GEMINI_VISION_MODEL return (config.gemini_model or GEMINI_MODEL).strip() def run_layout_vision_stage( work_dir: Path, scenes: list[Scene], *, source_video: Path, source_videos_by_scene: dict[str, Path] | None = None, transcript_fp: str, clips_path: Path, config: PipelineConfig, ) -> dict[str, LayoutInstruction]: """Load cache or call Gemini vision for each keyframe; persist JSON artifacts.""" from humeo.clip_selector import load_clips clips_fp = _clips_fingerprint(clips_path) vm = resolved_vision_model(config) layout_hints_by_clip = { clip.clip_id: hint for clip in load_clips(clips_path) if (hint := (clip.layout_hint or clip.layout)) is not None } if ( not config.force_layout_vision and layout_cache_valid( work_dir, transcript_fp=transcript_fp, clips_fp=clips_fp, vision_model=vm, segmentation_provider=config.segmentation_provider, segmentation_model=config.segmentation_model, ) ): cached = load_layout_cache(work_dir) if cached: logger.info("Layout vision cache hit; skipping Gemini vision calls.") return { k: LayoutInstruction.model_validate(v["instruction"]) for k, v in cached.items() if isinstance(v, dict) and "instruction" in v } instructions, raw_by_clip = infer_layout_instructions( scenes, gemini_vision_model=vm, source_video=source_video, tracking_dir=work_dir / "layout_tracking", source_videos_by_scene=source_videos_by_scene, segmentation_provider=config.segmentation_provider, segmentation_model=config.segmentation_model, ) _apply_layout_hint_fallbacks(instructions, raw_by_clip, layout_hints_by_clip) payload: dict[str, dict[str, Any]] = {} for sid, instr in instructions.items(): payload[sid] = { "instruction": json.loads(instr.model_dump_json()), "raw": raw_by_clip.get(sid, {}), } write_layout_cache( work_dir, transcript_fp=transcript_fp, clips_fp=clips_fp, vision_model=vm, clips_payload=payload, segmentation_provider=config.segmentation_provider, segmentation_model=config.segmentation_model, ) return instructions