Spaces:
Sleeping
Sleeping
| """Best-effort render QA artifacts for finished shorts.""" | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import logging | |
| import math | |
| import re | |
| import shutil | |
| import subprocess | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| from PIL import Image, ImageDraw | |
| from humeo_core.schemas import Clip, LayoutInstruction, LayoutKind, TranscriptWord | |
| from humeo.transcript_align import clip_subtitle_words | |
| logger = logging.getLogger(__name__) | |
| _CONTACT_COLUMNS = 8 | |
| _CONTACT_ROWS = 5 | |
| _CONTACT_THUMB_W = 270 | |
| _DEBUG_FPS = 10 | |
| _PIXEL_QA_SAMPLES = 8 | |
| _PIXEL_QA_W = 360 | |
| _PIXEL_QA_CAPTION_MIN_Y_RATIO = 0.40 | |
| def _clamp(value: float, lo: float = 0.0, hi: float = 1.0) -> float: | |
| return max(lo, min(hi, value)) | |
| def _ensure_ffmpeg() -> str: | |
| exe = shutil.which("ffmpeg") | |
| if not exe: | |
| raise RuntimeError("ffmpeg not found on PATH") | |
| return exe | |
| def _ensure_ffprobe() -> str: | |
| exe = shutil.which("ffprobe") | |
| if not exe: | |
| raise RuntimeError("ffprobe not found on PATH") | |
| return exe | |
| def _run(cmd: list[str]) -> None: | |
| subprocess.run(cmd, check=True, capture_output=True) | |
def _probe_duration(path: Path) -> float | None:
    """Ask ffprobe for the container duration of ``path`` in seconds.

    Returns ``None`` on any failure (ffprobe missing, non-zero exit, or output
    that cannot be parsed as a float).
    """
    try:
        # Built inside the ``try`` so a missing ffprobe also yields ``None``.
        probe_cmd = [
            _ensure_ffprobe(),
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=nokey=1:noprint_wrappers=1",
            str(path),
        ]
        completed = subprocess.run(probe_cmd, check=True, capture_output=True, text=True)
        raw_value = (completed.stdout or "").strip()
        return float(raw_value)
    except Exception:
        return None
def _probe_size(path: Path) -> tuple[int, int] | None:
    """Ask ffprobe for the ``(width, height)`` of the first video stream.

    Returns ``None`` on any failure (ffprobe missing, non-zero exit, or output
    that is not exactly two comma-separated integers).
    """
    try:
        # Built inside the ``try`` so a missing ffprobe also yields ``None``.
        probe_cmd = [
            _ensure_ffprobe(),
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "stream=width,height",
            "-of",
            "csv=p=0",
            str(path),
        ]
        completed = subprocess.run(probe_cmd, check=True, capture_output=True, text=True)
        fields = (completed.stdout or "").strip().split(",")
        width_text, height_text = fields
        return int(width_text), int(height_text)
    except Exception:
        return None
def create_contact_sheet(
    video_path: Path,
    output_path: Path,
    *,
    columns: int = _CONTACT_COLUMNS,
    rows: int = _CONTACT_ROWS,
    thumb_width: int = _CONTACT_THUMB_W,
) -> Path:
    """Render one tiled contact-sheet image sampled across ``video_path``.

    The sampling rate is ``columns * rows`` frames spread over the probed
    duration, bounded to the range 0.1-4.0 fps. When the duration cannot be
    probed, 40 seconds is assumed.

    Returns:
        ``output_path``, after ffmpeg has written the sheet.

    Raises:
        RuntimeError: if ffmpeg is not on PATH.
        subprocess.CalledProcessError: if ffmpeg fails.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    duration = _probe_duration(video_path) or 40.0
    wanted_frames = max(1, columns * rows)
    unbounded_fps = wanted_frames / max(duration, 1.0)
    sample_fps = max(0.1, min(4.0, unbounded_fps))

    filter_chain = (
        f"fps={sample_fps:.6f},"
        f"scale={thumb_width}:-1,"
        f"tile={columns}x{rows}:padding=2:margin=0"
    )
    command = [
        _ensure_ffmpeg(),
        "-y",
        "-loglevel",
        "error",
        "-i",
        str(video_path),
        "-vf",
        filter_chain,
        "-frames:v",
        "1",
        str(output_path),
    ]
    _run(command)
    return output_path
def create_ab_compare(
    reference_video: Path,
    output_video: Path,
    compare_path: Path,
    *,
    fps: float = 4.0,
    columns: int = _CONTACT_COLUMNS,
    rows: int = _CONTACT_ROWS,
    thumb_width: int = _CONTACT_THUMB_W,
    output_seek_sec: float = 0.0,
) -> Path:
    """Build a reference-over-output comparison image.

    Two contact sheets are written next to ``compare_path`` (suffixes
    ``_reference.jpg`` and ``_output.jpg``) using the same filter chain, then
    stacked vertically into ``compare_path``. A positive ``output_seek_sec``
    is passed as an input ``-ss`` for the output video only.

    Returns:
        ``compare_path``.

    Raises:
        RuntimeError: if ffmpeg is not on PATH.
        subprocess.CalledProcessError: if any ffmpeg invocation fails.
    """
    compare_path.parent.mkdir(parents=True, exist_ok=True)

    stem = compare_path.stem
    ref_sheet = compare_path.with_name(stem + "_reference.jpg")
    out_sheet = compare_path.with_name(stem + "_output.jpg")

    sheet_filter = (
        f"fps={fps:.6f},scale={thumb_width}:-1,"
        f"tile={columns}x{rows}:padding=2:margin=0"
    )
    single_frame = ["-frames:v", "1"]

    # 1) Reference sheet.
    _run(
        [
            _ensure_ffmpeg(),
            "-y",
            "-loglevel",
            "error",
            "-i",
            str(reference_video),
            "-vf",
            sheet_filter,
            *single_frame,
            str(ref_sheet),
        ]
    )

    # 2) Output sheet, optionally seeking into the rendered video first.
    seek_args = ["-ss", f"{output_seek_sec:.3f}"] if output_seek_sec > 0.0 else []
    _run(
        [
            _ensure_ffmpeg(),
            "-y",
            "-loglevel",
            "error",
            *seek_args,
            "-i",
            str(output_video),
            "-vf",
            sheet_filter,
            *single_frame,
            str(out_sheet),
        ]
    )

    # 3) Stack the two sheets into the final comparison image.
    _run(
        [
            _ensure_ffmpeg(),
            "-y",
            "-loglevel",
            "error",
            "-i",
            str(ref_sheet),
            "-i",
            str(out_sheet),
            "-filter_complex",
            "[0:v][1:v]vstack=inputs=2",
            *single_frame,
            str(compare_path),
        ]
    )
    return compare_path
| def _even(value: int) -> int: | |
| return max(2, value - (value % 2)) | |
def _base_crop_size(src_w: int, src_h: int, target_aspect: float) -> tuple[int, int]:
    """Return the largest even-sized ``(width, height)`` crop with ``target_aspect``.

    A source at least as wide as the target keeps its full height; otherwise
    it keeps its full width. Both dimensions are passed through ``_even``.
    """
    source_is_wide_enough = src_w / src_h >= target_aspect
    if source_is_wide_enough:
        crop_h = src_h
        crop_w = int(round(crop_h * target_aspect))
    else:
        crop_w = src_w
        crop_h = int(round(crop_w / target_aspect))
    return _even(crop_w), _even(crop_h)
def _crop_size(src_w: int, src_h: int, zoom: float) -> tuple[int, int]:
    """Return the even-sized 9:16 crop for ``zoom``; zoom below 1.0 is treated as 1.0."""
    full_w, full_h = _base_crop_size(src_w, src_h, 9 / 16)
    factor = max(1.0, float(zoom))
    zoomed_w = int(round(full_w / factor))
    zoomed_h = int(round(full_h / factor))
    return _even(zoomed_w), _even(zoomed_h)
def _center_expr(layout: LayoutInstruction, src_w: int) -> str:
    """Build an ffmpeg expression for the speaker's horizontal centre in pixels.

    Without tracking points the result is the constant
    ``person_x_norm * src_w``. With tracking points (sorted by ``t_sec``) the
    result is a nested ``if(lt(t, threshold), value, ...)`` chain that switches
    between consecutive points at the midpoint of their timestamps. Commas are
    backslash-escaped so the expression can sit inside a filter argument.
    """
    points = sorted(layout.person_tracking, key=lambda p: p.t_sec)
    if not points:
        return f"{_clamp(layout.person_x_norm) * src_w:.3f}"

    # Start from the last point and wrap earlier points around it, innermost first.
    expr = f"{_clamp(points[-1].x_norm) * src_w:.3f}"
    consecutive = list(zip(points, points[1:]))
    for earlier, later in reversed(consecutive):
        threshold = (float(earlier.t_sec) + float(later.t_sec)) / 2.0
        value = _clamp(earlier.x_norm) * src_w
        expr = f"if(lt(t\\,{threshold:.3f})\\,{value:.3f}\\,{expr})"
    return expr
| def _raw_bbox_filter( | |
| raw_layout: dict[str, Any], | |
| key: str, | |
| *, | |
| src_w: int, | |
| src_h: int, | |
| color: str, | |
| ) -> str | None: | |
| box = raw_layout.get(key) | |
| if not isinstance(box, dict): | |
| return None | |
| try: | |
| x1 = float(box["x1"]) | |
| y1 = float(box["y1"]) | |
| x2 = float(box["x2"]) | |
| y2 = float(box["y2"]) | |
| except (KeyError, TypeError, ValueError): | |
| return None | |
| if max(abs(x1), abs(y1), abs(x2), abs(y2)) <= 1.5: | |
| x1, x2 = x1 * src_w, x2 * src_w | |
| y1, y2 = y1 * src_h, y2 * src_h | |
| x = max(0, min(src_w - 2, int(round(x1)))) | |
| y = max(0, min(src_h - 2, int(round(y1)))) | |
| w = max(2, min(src_w - x, int(round(x2 - x1)))) | |
| h = max(2, min(src_h - y, int(round(y2 - y1)))) | |
| return f"drawbox=x={x}:y={y}:w={w}:h={h}:color={color}:t=4" | |
def create_crop_debug_overlay(
    source_video: Path,
    output_path: Path,
    *,
    clip: Clip,
    layout: LayoutInstruction,
    raw_layout: dict[str, Any] | None = None,
) -> Path:
    """Create a low-res source preview with crop, speaker center, and bbox overlays.

    Args:
        source_video: Video the overlays are drawn on.
        output_path: Destination of the encoded preview.
        clip: Supplies ``duration_sec``, which limits how much input is read.
        layout: Supplies the layout kind, zoom and speaker position/tracking.
        raw_layout: Optional mapping of raw bounding boxes to draw as well.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: if ffmpeg is not on PATH.
        subprocess.CalledProcessError: if ffmpeg fails.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # Fall back to 1920x1080 when the source size cannot be probed.
    src_w, src_h = _probe_size(source_video) or (1920, 1080)

    is_zoom_call = layout.layout == LayoutKind.ZOOM_CALL_CENTER
    zoom = max(layout.zoom, 1.25) if is_zoom_call else max(layout.zoom, 1.0)
    cw, ch = _crop_size(src_w, src_h, zoom)

    center_y = 0.5 if is_zoom_call else 0.48
    top = max(0, min(src_h - ch, int(round(center_y * src_h - ch / 2))))
    # Snap the offset down to an even value. ``_even`` is not used here because
    # it floors at 2, which would shift a full-height crop (offset 0) down by
    # two pixels; the x expression below likewise allows an offset of 0.
    y = top - (top % 2)

    center = _center_expr(layout, src_w)
    max_x = max(0, src_w - cw)
    crop_x = f"floor(max(0\\,min({max_x}\\,({center})-{cw}/2))/2)*2"
    filters = [
        f"fps={_DEBUG_FPS}",
        # Crop rectangle.
        f"drawbox=x={crop_x}:y={y}:w={cw}:h={ch}:color=0x00FF00@0.85:t=6",
        # Vertical line at the speaker centre.
        f"drawbox=x=({center})-3:y=0:w=6:h=ih:color=0xA855F7@0.45:t=fill",
    ]

    raw_layout = raw_layout or {}
    for key, color in (
        ("person_bbox", "0x38BDF8@0.85"),
        ("face_bbox", "0xFACC15@0.9"),
        ("second_person_bbox", "0xFB923C@0.85"),
        ("second_face_bbox", "0xF97316@0.9"),
    ):
        bbox_filter = _raw_bbox_filter(raw_layout, key, src_w=src_w, src_h=src_h, color=color)
        if bbox_filter:
            filters.append(bbox_filter)
    # Downscale last so every overlay is drawn in source pixel coordinates.
    filters.append("scale=540:-2")

    duration = max(0.1, clip.duration_sec)
    _run(
        [
            _ensure_ffmpeg(),
            "-y",
            "-loglevel",
            "error",
            "-t",
            f"{duration:.3f}",
            "-i",
            str(source_video),
            "-vf",
            ",".join(filters),
            "-an",
            "-c:v",
            "libx264",
            "-preset",
            "ultrafast",
            "-crf",
            "26",
            "-movflags",
            "+faststart",
            str(output_path),
        ]
    )
    return output_path
| def _word_timing_metrics(words: list[TranscriptWord]) -> dict[str, Any]: | |
| invalid = 0 | |
| very_short = 0 | |
| very_long = 0 | |
| overlaps = 0 | |
| max_gap = 0.0 | |
| prev_end: float | None = None | |
| for word in words: | |
| start = float(word.start_time) | |
| end = float(word.end_time) | |
| duration = end - start | |
| if not (math.isfinite(start) and math.isfinite(end)) or duration <= 0.0: | |
| invalid += 1 | |
| if 0.0 < duration < 0.055: | |
| very_short += 1 | |
| if duration > 1.65: | |
| very_long += 1 | |
| if prev_end is not None: | |
| if start < prev_end - 0.06: | |
| overlaps += 1 | |
| max_gap = max(max_gap, start - prev_end) | |
| prev_end = end | |
| count = len(words) | |
| return { | |
| "word_count": count, | |
| "invalid_count": invalid, | |
| "very_short_count": very_short, | |
| "very_long_count": very_long, | |
| "overlap_count": overlaps, | |
| "max_gap_sec": round(max_gap, 3), | |
| } | |
| def _tracking_metrics(layout: LayoutInstruction) -> dict[str, Any]: | |
| points = sorted(layout.person_tracking, key=lambda p: p.t_sec) | |
| jumps = [ | |
| abs(float(points[idx].x_norm) - float(points[idx - 1].x_norm)) | |
| for idx in range(1, len(points)) | |
| ] | |
| edge_count = sum(1 for p in points if p.x_norm < 0.16 or p.x_norm > 0.84) | |
| return { | |
| "tracking_sample_count": len(points), | |
| "max_tracking_jump_norm": round(max(jumps) if jumps else 0.0, 4), | |
| "edge_sample_count": edge_count, | |
| } | |
| def _bbox_from_mask(mask: np.ndarray) -> tuple[int, int, int, int] | None: | |
| ys, xs = np.where(mask) | |
| if len(xs) == 0 or len(ys) == 0: | |
| return None | |
| return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1 | |
| def _expand_bbox( | |
| bbox: tuple[int, int, int, int], | |
| *, | |
| width: int, | |
| height: int, | |
| pad_x: int, | |
| pad_y: int, | |
| ) -> tuple[int, int, int, int]: | |
| x1, y1, x2, y2 = bbox | |
| return ( | |
| max(0, x1 - pad_x), | |
| max(0, y1 - pad_y), | |
| min(width, x2 + pad_x), | |
| min(height, y2 + pad_y), | |
| ) | |
| def _bbox_area(bbox: tuple[int, int, int, int] | None) -> int: | |
| if bbox is None: | |
| return 0 | |
| x1, y1, x2, y2 = bbox | |
| return max(0, x2 - x1) * max(0, y2 - y1) | |
| def _bbox_intersection_area( | |
| first: tuple[int, int, int, int] | None, | |
| second: tuple[int, int, int, int] | None, | |
| ) -> int: | |
| if first is None or second is None: | |
| return 0 | |
| ax1, ay1, ax2, ay2 = first | |
| bx1, by1, bx2, by2 = second | |
| return _bbox_area((max(ax1, bx1), max(ay1, by1), min(ax2, bx2), min(ay2, by2))) | |
def _sample_final_frames(
    video_path: Path,
    frames_dir: Path,
    *,
    sample_count: int = _PIXEL_QA_SAMPLES,
    width: int = _PIXEL_QA_W,
) -> list[tuple[float, Path]]:
    """Extract evenly spaced frames from ``video_path`` into ``frames_dir``.

    Timestamps are ``duration * k / (sample_count + 1)`` for ``k`` in
    ``1..sample_count``, so neither the first nor the last instant is sampled.
    A frame that cannot be extracted is logged and skipped.

    Args:
        video_path: Video to sample.
        frames_dir: Directory for the extracted JPEGs (created if needed).
        sample_count: Number of frames to request; values below 1 are treated as 1.
        width: Width the frames are scaled to.

    Returns:
        ``(time_sec, frame_path)`` pairs for the frames actually written, or
        an empty list when the duration cannot be probed.
    """
    duration = _probe_duration(video_path) or 0.0
    if duration <= 0.0:
        return []
    # Clamp once so the loop bound and the timestamp denominator agree; with
    # the raw value, 0 would seek to the very end and -1 would divide by zero.
    sample_count = max(1, sample_count)
    frames_dir.mkdir(parents=True, exist_ok=True)

    samples: list[tuple[float, Path]] = []
    for idx in range(sample_count):
        time_sec = duration * float(idx + 1) / float(sample_count + 1)
        frame_path = frames_dir / f"frame_{idx + 1:03d}.jpg"
        try:
            _run(
                [
                    _ensure_ffmpeg(),
                    "-y",
                    "-loglevel",
                    "error",
                    "-ss",
                    f"{time_sec:.3f}",
                    "-i",
                    str(video_path),
                    "-frames:v",
                    "1",
                    "-vf",
                    f"scale={width}:-2",
                    str(frame_path),
                ]
            )
        except Exception as exc:  # noqa: BLE001 - keep QA warning-based
            logger.warning(
                "Pixel QA frame sample failed for %s at %.2fs: %s",
                video_path,
                time_sec,
                exc,
            )
            continue
        # ffmpeg can exit cleanly without producing a frame; only report real files.
        if frame_path.is_file():
            samples.append((time_sec, frame_path))
    return samples
| def _caption_masks(arr: np.ndarray) -> tuple[np.ndarray, np.ndarray]: | |
| rgb = arr.astype(np.int16) | |
| r = rgb[:, :, 0] | |
| g = rgb[:, :, 1] | |
| b = rgb[:, :, 2] | |
| purple = ( | |
| (r >= 85) | |
| & (r <= 190) | |
| & (g >= 35) | |
| & (g <= 155) | |
| & (b >= 145) | |
| & ((b - r) >= 32) | |
| & ((r - g) >= 8) | |
| ) | |
| white = (r >= 205) & (g >= 205) & (b >= 205) | |
| return purple, white | |
def _frame_pixel_record(frame_path: Path, *, time_sec: float) -> dict[str, Any]:
    """Run the pixel-level checks on one sampled frame.

    The frame is flagged when it is nearly black or flat, when the detected
    caption touches the left, right or bottom edge, or when at least 18% of
    the caption box overlaps the fixed face safe zone.

    Args:
        frame_path: Image file to analyse.
        time_sec: Timestamp the frame was sampled at (recorded in the result).

    Returns:
        A dict with brightness/contrast, the caption and purple bounding boxes
        (``None`` when not found), the face safe zone, the overlap ratio and
        the list of flags.
    """
    # Close the file handle deterministically; the converted copy is in memory.
    with Image.open(frame_path) as raw_image:
        arr = np.asarray(raw_image.convert("RGB"))
    height, width = arr.shape[:2]

    brightness = float(arr.mean() / 255.0)
    contrast = float(arr.std() / 255.0)
    blank = brightness < 0.035 or contrast < 0.025

    purple, white = _caption_masks(arr)
    y_grid = np.arange(height)[:, None]
    x_grid = np.arange(width)[None, :]
    # Only purple pixels in the lower part of the frame count as caption.
    caption_region = y_grid >= int(round(height * _PIXEL_QA_CAPTION_MIN_Y_RATIO))
    purple = purple & caption_region
    purple_bbox = _bbox_from_mask(purple)

    caption_bbox = None
    if purple_bbox is not None:
        expanded = _expand_bbox(
            purple_bbox,
            width=width,
            height=height,
            pad_x=max(36, width // 8),
            pad_y=max(14, height // 34),
        )
        ex1, ey1, ex2, ey2 = expanded
        # Box bounds are half-open (x2/y2 exclusive), so the upper comparisons
        # are strict; ``<=`` would pull in one extra column and row.
        nearby_white = (
            white
            & (x_grid >= ex1)
            & (x_grid < ex2)
            & (y_grid >= ey1)
            & (y_grid < ey2)
        )
        caption_bbox = _bbox_from_mask(purple | nearby_white)
        if caption_bbox is not None:
            caption_bbox = _expand_bbox(
                caption_bbox,
                width=width,
                height=height,
                pad_x=4,
                pad_y=4,
            )

    face_safe_zone = (
        int(round(width * 0.10)),
        int(round(height * 0.06)),
        int(round(width * 0.90)),
        int(round(height * 0.52)),
    )
    caption_area = _bbox_area(caption_bbox)
    overlap_area = _bbox_intersection_area(caption_bbox, face_safe_zone)
    overlap_ratio = overlap_area / max(1, caption_area)

    edge_hit = False
    edge_bbox = purple_bbox or caption_bbox
    if edge_bbox is not None:
        x1, y1, x2, y2 = edge_bbox
        edge_margin_x = max(2, int(round(width * 0.015)))
        edge_margin_y = max(2, int(round(height * 0.01)))
        # The top edge is not tested.
        edge_hit = (
            x1 <= edge_margin_x
            or x2 >= width - edge_margin_x
            or y2 >= height - edge_margin_y
        )

    flags: list[str] = []
    if blank:
        flags.append("blank_or_flat_frame")
    if edge_hit:
        flags.append("caption_edge_clip_check")
    if caption_bbox is not None and overlap_ratio >= 0.18:
        flags.append("caption_face_safe_zone_check")

    return {
        "time_sec": round(time_sec, 3),
        "frame_path": str(frame_path),
        "brightness": round(brightness, 4),
        "contrast": round(contrast, 4),
        "caption_bbox": list(caption_bbox) if caption_bbox is not None else None,
        "purple_bbox": list(purple_bbox) if purple_bbox is not None else None,
        "face_safe_zone": list(face_safe_zone),
        "caption_face_safe_zone_overlap": round(overlap_ratio, 4),
        "flags": flags,
    }
def _draw_bbox(
    draw: ImageDraw.ImageDraw,
    bbox: list[int] | tuple[int, int, int, int] | None,
    *,
    color: str,
    width: int = 3,
) -> None:
    """Outline ``bbox`` on ``draw``; a missing or empty box is a no-op."""
    if bbox:
        corners = tuple(int(coord) for coord in bbox)
        draw.rectangle(corners, outline=color, width=width)
def _write_pixel_qa_sheet(records: list[dict[str, Any]], output_path: Path) -> Path | None:
    """Tile annotated sample frames into one image saved at ``output_path``.

    Each frame gets its face safe zone, its caption box (red when the record
    has flags, purple otherwise) and a black label strip with the timestamp
    and any flags. Records whose frame file no longer exists are skipped.

    Returns:
        ``output_path``, or ``None`` when there is nothing to draw.
    """
    if not records:
        return None

    annotated: list[Image.Image] = []
    for record in records:
        frame_path = Path(str(record.get("frame_path", "")))
        if not frame_path.is_file():
            continue
        img = Image.open(frame_path).convert("RGB")
        draw = ImageDraw.Draw(img)

        record_flags = record.get("flags")
        has_warning = bool(record_flags)
        caption_color = "#ef4444" if has_warning else "#a855f7"
        _draw_bbox(draw, record.get("face_safe_zone"), color="#22c55e", width=2)
        _draw_bbox(draw, record.get("caption_bbox"), color=caption_color)

        label = f"{record.get('time_sec', 0):.1f}s"
        if has_warning:
            label += " " + ",".join(str(flag) for flag in record_flags)
        draw.rectangle((0, 0, img.width, 24), fill=(0, 0, 0))
        draw.text((6, 5), label, fill=(255, 255, 255))
        annotated.append(img)

    if not annotated:
        return None

    # At most four tiles per row; each cell is sized for the largest frame.
    columns = min(4, len(annotated))
    rows = int(math.ceil(len(annotated) / columns))
    tile_w = max(tile.width for tile in annotated)
    tile_h = max(tile.height for tile in annotated)
    sheet = Image.new("RGB", (columns * tile_w, rows * tile_h), (12, 12, 12))
    for position, tile in enumerate(annotated):
        grid_row, grid_col = divmod(position, columns)
        sheet.paste(tile, (grid_col * tile_w, grid_row * tile_h))

    output_path.parent.mkdir(parents=True, exist_ok=True)
    sheet.save(output_path, quality=92)
    return output_path
def analyze_rendered_pixels(video_path: Path, qa_dir: Path, *, clip_id: str) -> dict[str, Any]:
    """Sample frames from a rendered short and aggregate the per-frame pixel checks.

    Frames are extracted into a per-clip directory under ``qa_dir`` that is
    always removed afterwards; only the annotated sheet is kept. The score
    starts at 1.0 and is reduced for blank frames, caption edge hits, face
    safe-zone hits and too few frames with a visible caption. It is 0.0 when
    no frame could be sampled.
    """
    frames_dir = qa_dir / f"short_{clip_id}_pixel_frames"
    records: list[dict[str, Any]] = []
    sheet: Path | None = None
    try:
        for time_sec, frame_path in _sample_final_frames(video_path, frames_dir):
            records.append(_frame_pixel_record(frame_path, time_sec=time_sec))
        sheet = _write_pixel_qa_sheet(records, qa_dir / f"short_{clip_id}_pixel_qa.jpg")
    finally:
        shutil.rmtree(frames_dir, ignore_errors=True)

    def _frames_flagged(flag: str) -> int:
        """Count sampled frames that carry ``flag``."""
        return sum(1 for record in records if flag in record.get("flags", []))

    sample_count = len(records)
    caption_seen = sum(1 for record in records if record.get("caption_bbox") is not None)
    blank_count = _frames_flagged("blank_or_flat_frame")
    edge_hits = _frames_flagged("caption_edge_clip_check")
    safe_zone_hits = _frames_flagged("caption_face_safe_zone_check")

    min_contrast = min((float(record.get("contrast", 0.0)) for record in records), default=0.0)
    if sample_count:
        brightness_total = sum(float(record.get("brightness", 0.0)) for record in records)
        mean_brightness = brightness_total / sample_count
    else:
        mean_brightness = 0.0

    # Minimum number of frames expected to show a caption.
    caption_floor = max(2, sample_count // 4)

    if sample_count == 0:
        score = 0.0
    else:
        missing_ratio = max(0.0, (caption_floor - caption_seen) / max(1, sample_count))
        score = (
            1.0
            - (blank_count / sample_count) * 0.55
            - (edge_hits / sample_count) * 0.28
            - (safe_zone_hits / sample_count) * 0.35
            - missing_ratio * 0.20
        )
    score = _clamp(score)

    flag_conditions = (
        ("pixel_qa_no_samples", sample_count == 0),
        ("blank_or_flat_frame", bool(blank_count)),
        ("caption_edge_clip_check", bool(edge_hits)),
        ("caption_face_safe_zone_check", bool(safe_zone_hits)),
        ("caption_pixels_sparse_check", bool(sample_count) and caption_seen < caption_floor),
    )
    flags: list[str] = [name for name, triggered in flag_conditions if triggered]

    frame_summaries = [
        {
            "time_sec": record["time_sec"],
            "caption_bbox": record["caption_bbox"],
            "flags": record["flags"],
        }
        for record in records
    ]
    return {
        "pixel_score": round(score, 3),
        "flags": flags,
        "sample_count": sample_count,
        "caption_seen_frames": caption_seen,
        "blank_frame_count": blank_count,
        "caption_edge_hit_count": edge_hits,
        "caption_face_safe_zone_hit_count": safe_zone_hits,
        "mean_brightness": round(mean_brightness, 4),
        "min_contrast": round(min_contrast, 4),
        "annotated_sheet": str(sheet) if sheet is not None else None,
        "frames": frame_summaries,
    }
def score_short(
    output_video: Path,
    *,
    clip: Clip,
    transcript: dict,
    layout: LayoutInstruction,
) -> dict[str, Any]:
    """Compute the QA scores for one rendered short.

    Four sub-scores in ``[0, 1]`` are combined with fixed weights: caption
    timing (0.35), speaker centering (0.30), crop jump (0.20) and video probe
    (0.15). A flag is added for each sub-score that falls below its threshold.
    """
    subtitle_words = clip_subtitle_words(transcript, clip).words
    word_metrics = _word_timing_metrics(subtitle_words)
    tracking = _tracking_metrics(layout)
    width_height = _probe_size(output_video)
    duration = _probe_duration(output_video)

    # Caption timing: a clip with no words gets a fixed low score.
    if word_metrics["word_count"] == 0:
        caption_score = 0.25
    else:
        word_count = max(1, int(word_metrics["word_count"]))
        caption_score = (
            1.0
            - (word_metrics["invalid_count"] / word_count) * 0.55
            - (word_metrics["very_short_count"] / word_count) * 0.22
            - (word_metrics["very_long_count"] / word_count) * 0.20
            - (word_metrics["overlap_count"] / word_count) * 0.28
        )
    caption_score = _clamp(caption_score)

    # Speaker centering: penalise edge samples and large tracking jumps.
    sample_count = max(1, int(tracking["tracking_sample_count"]))
    max_jump = float(tracking["max_tracking_jump_norm"])
    edge_penalty = (int(tracking["edge_sample_count"]) / sample_count) * 0.35
    jump_penalty = max(0.0, max_jump - 0.18) * 1.4
    speaker_score = 1.0 - edge_penalty - jump_penalty
    if layout.layout not in (LayoutKind.SIT_CENTER, LayoutKind.ZOOM_CALL_CENTER):
        # Other layouts never drop below the flag threshold on this check.
        speaker_score = max(0.82, speaker_score)
    speaker_score = _clamp(speaker_score)

    crop_jump_score = _clamp(1.0 - max(0.0, max_jump - 0.12) * 2.1)

    # Video probe: expect a 1080x1920 file with a positive duration.
    size_penalty = 0.18 if width_height != (1080, 1920) else 0.0
    duration_penalty = 0.35 if duration is None or duration <= 0.0 else 0.0
    video_score = _clamp(1.0 - size_penalty - duration_penalty)

    overall = (
        caption_score * 0.35
        + speaker_score * 0.30
        + crop_jump_score * 0.20
        + video_score * 0.15
    )

    thresholds = (
        (caption_score, 0.82, "caption_timing_check"),
        (speaker_score, 0.82, "speaker_centering_check"),
        (crop_jump_score, 0.82, "crop_jump_check"),
        (video_score, 0.9, "video_probe_check"),
    )
    flags: list[str] = [name for value, limit, name in thresholds if value < limit]

    return {
        "overall_score": round(overall, 3),
        "caption_score": round(caption_score, 3),
        "speaker_centering_score": round(speaker_score, 3),
        "crop_jump_score": round(crop_jump_score, 3),
        "video_score": round(video_score, 3),
        "flags": flags,
        "video": {
            "duration_sec": round(duration, 3) if duration is not None else None,
            "size": list(width_height) if width_height else None,
        },
        "word_timing": word_metrics,
        "tracking": tracking,
    }
| def _clip_id_from_output(path: Path) -> str: | |
| match = re.search(r"short_([^\\/]+?)\.mp4$", path.name, flags=re.IGNORECASE) | |
| return match.group(1) if match else path.stem | |
def qa_record_flags(record: dict[str, Any]) -> list[str]:
    """Collect the distinct flags of one manifest record, in first-seen order.

    Flags come from the ``score`` and ``pixel_qa`` sections (when those are
    dicts); empty flag strings are dropped. ``"qa_error"`` is added when the
    record lists any errors.
    """
    collected: list[str] = []
    for section_name in ("score", "pixel_qa"):
        section = record.get(section_name)
        if not isinstance(section, dict):
            continue
        for flag in section.get("flags", []):
            text = str(flag)
            if text:
                collected.append(text)
    if record.get("errors"):
        collected.append("qa_error")
    # dict keys keep insertion order, which de-duplicates without reordering.
    return list(dict.fromkeys(collected))
def qa_summary_lines(manifest_path: Path) -> list[str]:
    """Return one ``short_<id> OK`` / ``short_<id> WARN ...`` line per manifest record.

    This is a best-effort summary: a missing, unreadable or malformed manifest
    yields an empty list instead of raising.

    Args:
        manifest_path: Path of the QA manifest JSON file.

    Returns:
        Summary lines in manifest order. Entries that are not dicts or have a
        blank ``clip_id`` are skipped.
    """
    if not manifest_path.is_file():
        return []
    try:
        payload = json.loads(manifest_path.read_text(encoding="utf-8"))
    except Exception:  # noqa: BLE001 - summary must never raise
        return []
    # Valid JSON is not necessarily an object; a top-level list or scalar has
    # no ``.get`` and would otherwise raise AttributeError.
    if not isinstance(payload, dict):
        return []
    records = payload.get("shorts", [])
    if not isinstance(records, list):
        return []

    lines: list[str] = []
    for record in records:
        if not isinstance(record, dict):
            continue
        clip_id = str(record.get("clip_id", "")).strip()
        if not clip_id:
            continue
        flags = qa_record_flags(record)
        status = "WARN " + ", ".join(flags) if flags else "OK"
        lines.append(f"short_{clip_id} {status}")
    return lines
def run_render_qa(
    *,
    output_dir: Path,
    final_outputs: list[Path],
    render_clips_by_id: dict[str, Clip],
    transcripts_by_id: dict[str, dict],
    layouts_by_id: dict[str, LayoutInstruction],
    assembled_sources_by_id: dict[str, Path],
    raw_layouts_by_id: dict[str, dict[str, Any]] | None = None,
    reference_video: Path | None = None,
    debug_overlay: bool = True,
) -> Path:
    """Create QA artifacts for all rendered shorts and return the manifest path.

    For each output video this builds a contact sheet, an optional A/B compare
    image, an optional crop debug overlay, pixel QA and a score. Every step is
    best-effort: a failure is recorded in the record's ``errors`` list and
    logged as a warning, and processing continues. Records from an existing
    manifest are kept unless the same clip id is processed again.

    Args:
        output_dir: Directory under which ``render_qa/`` is created.
        final_outputs: Rendered videos to inspect.
        render_clips_by_id: Clip metadata keyed by clip id.
        transcripts_by_id: Transcripts keyed by clip id.
        layouts_by_id: Layout instructions keyed by clip id.
        assembled_sources_by_id: Source videos for the debug overlay, keyed by clip id.
        raw_layouts_by_id: Optional raw layout dicts with bounding boxes.
        reference_video: Optional reference video for the A/B compare image.
        debug_overlay: Whether to render the crop debug overlay.

    Returns:
        Path of the written ``qa_manifest.json``.
    """
    qa_dir = output_dir / "render_qa"
    qa_dir.mkdir(parents=True, exist_ok=True)
    raw_layouts_by_id = raw_layouts_by_id or {}
    manifest_path = qa_dir / "qa_manifest.json"

    # Seed with records from a previous run so partial re-renders keep the rest.
    records_by_id: dict[str, dict[str, Any]] = {}
    if manifest_path.is_file():
        try:
            existing = json.loads(manifest_path.read_text(encoding="utf-8"))
            for item in existing.get("shorts", []):
                if isinstance(item, dict) and item.get("clip_id"):
                    records_by_id[str(item["clip_id"])] = item
        except Exception as exc:  # noqa: BLE001 - stale QA should not block updates
            logger.warning("Ignoring stale QA manifest at %s: %s", manifest_path, exc)

    for video_path in final_outputs:
        clip_id = _clip_id_from_output(video_path)
        clip = render_clips_by_id.get(clip_id)
        transcript = transcripts_by_id.get(clip_id)
        layout = layouts_by_id.get(clip_id)
        record: dict[str, Any] = {
            "clip_id": clip_id,
            "output": str(video_path),
            "artifacts": {},
            "errors": [],
        }

        try:
            sheet = create_contact_sheet(video_path, qa_dir / f"short_{clip_id}_contact.jpg")
            record["artifacts"]["contact_sheet"] = str(sheet)
        except Exception as exc:  # noqa: BLE001 - QA must not fail the render
            record["errors"].append(f"contact_sheet: {exc}")
            logger.warning("Render QA contact sheet failed for %s: %s", clip_id, exc)

        if reference_video is not None and reference_video.is_file():
            try:
                compare = create_ab_compare(
                    reference_video,
                    video_path,
                    qa_dir / f"short_{clip_id}_ab_compare.jpg",
                )
                record["artifacts"]["ab_compare"] = str(compare)
            except Exception as exc:  # noqa: BLE001
                record["errors"].append(f"ab_compare: {exc}")
                logger.warning("Render QA A/B compare failed for %s: %s", clip_id, exc)

        if debug_overlay and clip is not None and layout is not None:
            source = assembled_sources_by_id.get(clip_id)
            if source is not None and source.is_file():
                try:
                    debug = create_crop_debug_overlay(
                        source,
                        qa_dir / f"short_{clip_id}_crop_debug.mp4",
                        clip=clip,
                        layout=layout,
                        raw_layout=raw_layouts_by_id.get(clip_id),
                    )
                    record["artifacts"]["crop_debug_overlay"] = str(debug)
                except Exception as exc:  # noqa: BLE001
                    record["errors"].append(f"crop_debug_overlay: {exc}")
                    logger.warning("Render QA crop debug failed for %s: %s", clip_id, exc)

        pixel_qa: dict[str, Any] | None = None
        try:
            pixel_qa = analyze_rendered_pixels(video_path, qa_dir, clip_id=clip_id)
            record["pixel_qa"] = pixel_qa
            if pixel_qa.get("annotated_sheet"):
                record["artifacts"]["pixel_qa_sheet"] = pixel_qa["annotated_sheet"]
        except Exception as exc:  # noqa: BLE001
            record["errors"].append(f"pixel_qa: {exc}")
            logger.warning("Render QA pixel checks failed for %s: %s", clip_id, exc)
            pixel_qa = None

        if clip is not None and transcript is not None and layout is not None:
            # Scoring is guarded like every other step; otherwise one bad
            # transcript would abort the loop before the manifest is written.
            try:
                score = score_short(
                    video_path,
                    clip=clip,
                    transcript=transcript,
                    layout=layout,
                )
                if isinstance(pixel_qa, dict):
                    pixel_score = float(pixel_qa.get("pixel_score", 0.0))
                    score["pixel_score"] = round(pixel_score, 3)
                    merged_flags = list(
                        dict.fromkeys(score.get("flags", []) + pixel_qa.get("flags", []))
                    )
                    score["flags"] = merged_flags
                    # Blend the pixel score into the overall score at 20% weight.
                    score["overall_score"] = round(
                        _clamp(float(score["overall_score"]) * 0.80 + pixel_score * 0.20),
                        3,
                    )
                record["score"] = score
            except Exception as exc:  # noqa: BLE001 - QA must not fail the render
                record["errors"].append(f"score: {exc}")
                logger.warning("Render QA scoring failed for %s: %s", clip_id, exc)
        else:
            record["errors"].append("score: missing clip, transcript, or layout")

        records_by_id[clip_id] = record

    manifest: dict[str, Any] = {
        "shorts": [records_by_id[key] for key in sorted(records_by_id)]
    }
    manifest_path.write_text(
        json.dumps(manifest, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    logger.info("Render QA manifest written: %s", manifest_path)
    logger.info("Render QA summary:")
    for line in qa_summary_lines(manifest_path):
        logger.info(" %s", line)
    return manifest_path
def _main() -> None:
    """Command-line entry point: build an A/B compare image and print its path."""
    parser = argparse.ArgumentParser(description="Create a reference/output A/B contact sheet.")
    required_paths = (
        ("--reference", "Reference video path."),
        ("--output-video", "Rendered output video path."),
        ("--out", "Compare image output path."),
    )
    for option, help_text in required_paths:
        parser.add_argument(option, type=Path, required=True, help=help_text)
    parser.add_argument("--fps", type=float, default=4.0, help="Contact-sheet sample FPS.")

    cli = parser.parse_args()
    create_ab_compare(cli.reference, cli.output_video, cli.out, fps=cli.fps)
    print(cli.out)


if __name__ == "__main__":
    _main()