"""Subtitle helpers for the product pipeline.""" import logging import math import os import re from pathlib import Path from humeo_core.schemas import Clip, RenderTheme, TranscriptWord from humeo.transcript_align import ( clip_subtitle_words, clip_words_to_srt_lines, format_ass, format_srt, group_words_to_cue_chunks, ) logger = logging.getLogger(__name__) _NATIVE_HIGHLIGHT_FONT_NAME = "League Spartan" _NATIVE_HIGHLIGHT_PURPLE = "&H00F65C8B" _NATIVE_HIGHLIGHT_LEAD_SEC = 0.06 _NATIVE_HIGHLIGHT_MIN_DWELL_SEC = 0.16 _NATIVE_HIGHLIGHT_MIN_VALID_WORD_SEC = 0.035 _NATIVE_HIGHLIGHT_MAX_VALID_WORD_SEC = 1.65 _NATIVE_HIGHLIGHT_MAX_LINE_WIDTH_RATIO = 0.62 _NATIVE_HIGHLIGHT_SAFE_MARGIN_X = 150 _NATIVE_HIGHLIGHT_ROUNDING_OVERRIDE = r"\blur3.0" _NATIVE_HIGHLIGHT_STOPWORDS = { "a", "all", "an", "and", "are", "as", "at", "be", "but", "by", "for", "from", "i", "if", "in", "is", "it", "of", "on", "or", "so", "that", "the", "their", "there", "they", "this", "to", "was", "we", "with", "you", "your", "has", "have", "had", "been", "being", } def _balance_reference_caption(text: str) -> str: words = text.split() if len(words) <= 5 and len(text) <= 28: return text best_idx = 1 best_delta = 10**9 for idx in range(1, len(words)): left = " ".join(words[:idx]) right = " ".join(words[idx:]) line_penalty = 0 if len(words[:idx]) < 2 or len(words[idx:]) < 2: line_penalty += 1000 delta = abs(len(left) - len(right)) + abs(len(words[:idx]) - len(words[idx:])) * 6 + line_penalty if delta < best_delta: best_delta = delta best_idx = idx return " ".join(words[:best_idx]) + "\n" + " ".join(words[best_idx:]) def _native_line_width(font, words) -> float: return _text_width(font, " ".join(word.word for word in words)) def _native_highlight_partition_penalty(lines, font, max_line_width: float) -> float: widths = [_native_line_width(font, line) for line in lines] overflow = sum(max(0.0, width - max_line_width) for width in widths) word_counts = [len(line) for line in lines] total_words = sum(word_counts) width_balance = (max(widths) - min(widths)) if len(widths) > 1 else 0.0 word_balance = (max(word_counts) - min(word_counts)) if len(word_counts) > 1 else 0 single_word_penalty = sum(260 for line in lines if len(line) == 1 and total_words > 3) return ( overflow * 80.0 + len(lines) * 120.0 + width_balance * 0.16 + word_balance * 120.0 + single_word_penalty ) def _candidate_native_highlight_partitions(words, max_lines: int): n = len(words) if n == 0: return [] if max_lines <= 1 or n == 1: return [[list(words)]] out = [[list(words)]] for first_break in range(1, n): out.append([list(words[:first_break]), list(words[first_break:])]) if max_lines >= 3 and n >= 3: for first_break in range(1, n - 1): for second_break in range(first_break + 1, n): out.append( [ list(words[:first_break]), list(words[first_break:second_break]), list(words[second_break:]), ] ) return out def _split_native_highlight_lines(words, *, font=None, max_line_width: float | None = None): if len(words) <= 3 and len(" ".join(word.word for word in words)) <= 22: return [list(words)] if len(words) < 2: return [list(words)] if font is not None and max_line_width is not None: candidates = _candidate_native_highlight_partitions(words, max_lines=3) return min( candidates, key=lambda lines: _native_highlight_partition_penalty( lines, font, max_line_width, ), ) best_idx = 1 best_delta = 10**9 for idx in range(1, len(words)): left_words = words[:idx] right_words = words[idx:] left = " ".join(word.word for word in left_words) right = " ".join(word.word for word in right_words) line_penalty = 0 if len(left_words) < 2 or len(right_words) < 2: line_penalty += 800 delta = abs(len(left) - len(right)) + abs(len(left_words) - len(right_words)) * 7 + line_penalty if delta < best_delta: best_delta = delta best_idx = idx return [list(words[:best_idx]), list(words[best_idx:])] def _clean_native_highlight_token(text: str) -> str: return re.sub(r"(^[^A-Za-z0-9$%#]+|[^A-Za-z0-9$%#]+$)", "", text or "") def _native_highlight_span_score(words) -> float: cleaned = [_clean_native_highlight_token(word.word) for word in words] cleaned = [token for token in cleaned if token] if not cleaned: return -1e9 if all(token.lower() in _NATIVE_HIGHLIGHT_STOPWORDS for token in cleaned): return -1e9 score = 0.0 for token in cleaned: lower = token.lower() if lower not in _NATIVE_HIGHLIGHT_STOPWORDS: score += 2.0 if any(ch.isdigit() for ch in token) or "$" in token or "%" in token: score += 3.0 if len(token) >= 6: score += 0.8 if token.isupper() and len(token) > 1: score += 0.6 if len(cleaned) == 2: score -= 0.55 if any(any(ch.isdigit() for ch in token) or "$" in token or "%" in token for token in cleaned): score += 1.1 elif cleaned[0].lower() in _NATIVE_HIGHLIGHT_STOPWORDS or cleaned[1].lower() in _NATIVE_HIGHLIGHT_STOPWORDS: score -= 0.6 else: score += 0.3 if len(" ".join(cleaned)) > 18: score -= 0.6 return score def _should_render_native_highlight_group(words) -> bool: cleaned = [_clean_native_highlight_token(word.word) for word in words] cleaned = [token for token in cleaned if token] if not cleaned: return False return any(token.lower() not in _NATIVE_HIGHLIGHT_STOPWORDS for token in cleaned) def _native_highlight_font_path() -> Path | None: try: import humeo_core bundled = ( Path(humeo_core.__file__).resolve().parent / "assets" / "fonts" / "LeagueSpartan-Bold.ttf" ) if bundled.is_file(): return bundled except Exception: pass windows_fonts = Path(os.environ.get("WINDIR", r"C:\Windows")) / "Fonts" for filename in ("arialbd.ttf", "Arialbd.ttf", "ARIALBD.TTF", "arial.ttf"): path = windows_fonts / filename if path.is_file(): return path return None def _text_width(font, text: str) -> float: if not text: return 0.0 if hasattr(font, "getlength"): return float(font.getlength(text)) bbox = font.getbbox(text) return float(bbox[2] - bbox[0]) def _text_height(font) -> int: bbox = font.getbbox("Ag") return max(1, int(round(bbox[3] - bbox[1]))) def _escape_ass_text(text: str) -> str: return ( text.replace("\\", r"\\") .replace("{", r"\{") .replace("}", r"\}") .replace("\n", r"\N") ) def _native_highlight_overlay_text(line_words, highlight_idx: int) -> str: parts: list[str] = [] for word_idx, word in enumerate(line_words): if word_idx == highlight_idx: parts.append( f"{{\\rHighlight{_NATIVE_HIGHLIGHT_ROUNDING_OVERRIDE}}}" f"{_escape_ass_text(word.word)}" "{\\rInvisible}" ) else: parts.append(_escape_ass_text(word.word)) return " ".join(parts) def _word_timing_weight(word: TranscriptWord) -> float: token = _clean_native_highlight_token(word.word) return max(0.65, min(2.2, len(token or word.word) / 5.5)) def _suspicious_native_highlight_timing( words: list[TranscriptWord], idx: int, *, clip_duration: float, ) -> bool: word = words[idx] start = float(word.start_time) end = float(word.end_time) if not (math.isfinite(start) and math.isfinite(end)): return True if start < -0.01 or end > clip_duration + 0.25: return True duration = end - start if duration < _NATIVE_HIGHLIGHT_MIN_VALID_WORD_SEC: return True if duration > _NATIVE_HIGHLIGHT_MAX_VALID_WORD_SEC: return True if idx > 0: prev = words[idx - 1] if start < float(prev.start_time) - 0.03: return True if start < float(prev.end_time) - 0.35: return True if idx + 1 < len(words): nxt = words[idx + 1] if float(nxt.start_time) < start - 0.03: return True return False def _repair_native_highlight_timings( words: list[TranscriptWord], *, clip_duration: float, ) -> list[TranscriptWord]: """Repair obvious ASR word timestamp glitches before per-word highlighting. This is intentionally conservative: clean Whisper/ElevenLabs timings pass through almost unchanged, while zero-length, reversed, huge, or badly overlapping word timings get interpolated between neighboring reliable words. """ if not words: return [] clip_duration = max(0.0, float(clip_duration)) records: list[dict[str, object]] = [] for idx, word in enumerate(words): start = max(0.0, min(clip_duration, float(word.start_time))) end = max(0.0, min(clip_duration, float(word.end_time))) records.append( { "word": word.word, "start": start, "end": end, "bad": _suspicious_native_highlight_timing( words, idx, clip_duration=clip_duration, ), "weight": _word_timing_weight(word), } ) idx = 0 while idx < len(records): if not records[idx]["bad"]: idx += 1 continue run_start = idx while idx < len(records) and records[idx]["bad"]: idx += 1 run_end = idx - 1 count = run_end - run_start + 1 left_time = ( float(records[run_start - 1]["end"]) if run_start > 0 else max(0.0, float(records[run_start]["start"])) ) right_time = ( float(records[run_end + 1]["start"]) if run_end + 1 < len(records) else min(clip_duration, max(left_time, float(records[run_end]["end"]))) ) weight_span = sum(float(r["weight"]) for r in records[run_start : run_end + 1]) * 0.13 min_span = max(0.11 * count, weight_span) if right_time <= left_time + min_span: right_time = min(clip_duration, left_time + min_span) if right_time <= left_time: right_time = min(clip_duration, left_time + max(0.08, 0.12 * count)) span = max(0.001, right_time - left_time) weights = [float(r["weight"]) for r in records[run_start : run_end + 1]] total_weight = max(0.001, sum(weights)) cursor = left_time for offset, weight in enumerate(weights): rec = records[run_start + offset] next_cursor = ( right_time if offset == count - 1 else cursor + span * (weight / total_weight) ) rec["start"] = cursor rec["end"] = max(cursor + 0.04, next_cursor) cursor = float(rec["end"]) repaired: list[TranscriptWord] = [] prev_end = 0.0 for rec in records: start = max(0.0, float(rec["start"])) end = max(start + 0.02, float(rec["end"])) if start < prev_end - 0.02: start = prev_end end = max(end, start + 0.04) if clip_duration > 0.0: end = min(clip_duration, end) if end <= start: start = max(0.0, min(start, clip_duration - 0.02)) end = min(clip_duration, start + 0.04) repaired.append(TranscriptWord(word=str(rec["word"]), start_time=start, end_time=end)) prev_end = max(prev_end, end) return repaired def _native_highlight_word_windows( words: list[TranscriptWord], *, lead_sec: float, min_dwell_sec: float, ) -> list[tuple[float, float]]: if not words: return [] lead_sec = max(0.0, float(lead_sec)) min_dwell_sec = max(0.02, float(min_dwell_sec)) cue_start = max(0.0, words[0].start_time - lead_sec) cue_end = max(words[-1].end_time, words[-1].start_time + min_dwell_sec) starts: list[float] = [] for idx, word in enumerate(words): start = max(cue_start, float(word.start_time) - lead_sec) if idx > 0: start = max(start, starts[-1] + 0.01) starts.append(start) windows: list[tuple[float, float]] = [] for idx, word in enumerate(words): start = starts[idx] natural_end = max(float(word.end_time), start + min_dwell_sec) limit = starts[idx + 1] if idx + 1 < len(starts) else cue_end end = min(natural_end, limit) if end <= start: end = min(limit, start + 0.01) windows.append((start, max(start + 0.01, end))) return windows def _fmt_ass_time(seconds: float) -> str: seconds = max(0.0, seconds) hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = seconds % 60 whole = int(secs) cs = int(round((secs - whole) * 100)) if cs >= 100: cs = 99 return f"{hours:d}:{minutes:02d}:{whole:02d}.{cs:02d}" def _format_native_highlight_ass( cue_chunks, *, play_res_x: int, play_res_y: int, font_size: int, margin_v: int, font_name: str, highlight_lead_sec: float = _NATIVE_HIGHLIGHT_LEAD_SEC, highlight_min_dwell_sec: float = _NATIVE_HIGHLIGHT_MIN_DWELL_SEC, ) -> str: from PIL import ImageFont font_path = _native_highlight_font_path() if font_path is not None: font = ImageFont.truetype(str(font_path), size=font_size) else: font = ImageFont.load_default() line_height = max(font_size, _text_height(font) + 6) line_gap = max(8, int(round(font_size * 0.08))) bottom_anchor = play_res_y - margin_v safe_margin_x = min( int(round(play_res_x * 0.12)), max(24, _NATIVE_HIGHLIGHT_SAFE_MARGIN_X), ) max_line_width = min( play_res_x * _NATIVE_HIGHLIGHT_MAX_LINE_WIDTH_RATIO, play_res_x - (safe_margin_x * 2), ) header = ( "[Script Info]\n" "ScriptType: v4.00+\n" f"PlayResX: {play_res_x}\n" f"PlayResY: {play_res_y}\n" "WrapStyle: 0\n" "ScaledBorderAndShadow: yes\n" "YCbCr Matrix: None\n" "\n" "[V4+ Styles]\n" "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, " "OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, " "ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, " "Alignment, MarginL, MarginR, MarginV, Encoding\n" f"Style: Base,{font_name},{font_size},&H00FFFFFF,&H000000FF,&H00101010,&H00000000,-1,0,0,0,100,100,-1,0,1,4,0,8,0,0,0,0\n" f"Style: Highlight,{font_name},{font_size},&H00FFFFFF,&H000000FF,{_NATIVE_HIGHLIGHT_PURPLE},&H00000000,-1,0,0,0,100,100,-1,0,3,4,0,8,0,0,0,0\n" f"Style: Invisible,{font_name},{font_size},&HFF000000,&H000000FF,&HFF000000,&HFF000000,-1,0,0,0,100,100,-1,0,1,0,0,8,0,0,0,0\n" "\n" "[Events]\n" "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n" ) events: list[str] = [] for cue_words in cue_chunks: if not cue_words: continue lines = _split_native_highlight_lines( cue_words, font=font, max_line_width=max_line_width, ) cue_windows = _native_highlight_word_windows( cue_words, lead_sec=highlight_lead_sec, min_dwell_sec=highlight_min_dwell_sec, ) block_height = len(lines) * line_height + max(0, len(lines) - 1) * line_gap block_top = bottom_anchor - block_height cue_start = cue_windows[0][0] if cue_windows else cue_words[0].start_time cue_end = cue_windows[-1][1] if cue_windows else cue_words[-1].end_time word_offset = 0 line_center_x = play_res_x / 2.0 for line_idx, line_words in enumerate(lines): if not line_words: continue line_text = " ".join(word.word for word in line_words) line_top = block_top + line_idx * (line_height + line_gap) events.append( "Dialogue: 1," f"{_fmt_ass_time(cue_start)},{_fmt_ass_time(cue_end)},Base,,0,0,0,," f"{{\\an8\\pos({line_center_x:.1f},{line_top:.1f})}}{_escape_ass_text(line_text)}" ) for word_idx, word in enumerate(line_words): cleaned = _clean_native_highlight_token(word.word) if not cleaned: continue word_start, word_end = cue_windows[word_offset + word_idx] events.append( "Dialogue: 0," f"{_fmt_ass_time(word_start)},{_fmt_ass_time(word_end)},Invisible,,0,0,0,," f"{{\\an8\\pos({line_center_x:.1f},{line_top:.1f})}}" f"{_native_highlight_overlay_text(line_words, word_idx)}" ) word_offset += len(line_words) return header + "\n".join(events) + ("\n" if events else "") def generate_srt( clip: Clip, transcript: dict, output_dir: Path, *, max_words_per_cue: int = 8, max_cue_sec: float = 4.0, ) -> Path: """ Build an SRT file from word-level ASR aligned to this clip's timeline. ``transcript`` is the persisted ``transcript.json`` (segments with optional per-word timestamps). Times are shifted so 0 = clip in-point. """ srt_path = output_dir / f"clip_{clip.clip_id}.srt" aligned = clip_subtitle_words(transcript, clip) lines = clip_words_to_srt_lines( aligned.words, max_words_per_cue=max_words_per_cue, max_cue_sec=max_cue_sec, ) srt_path.write_text(format_srt(lines), encoding="utf-8") logger.info("Generated SRT: %s (%d cues)", srt_path, len(lines)) return srt_path def generate_ass( clip: Clip, transcript: dict, output_dir: Path, *, max_words_per_cue: int = 4, max_cue_sec: float = 2.2, play_res_x: int = 1080, play_res_y: int = 1920, font_size: int = 48, margin_v: int = 160, margin_h: int = 60, font_name: str = "Arial", render_theme: RenderTheme = RenderTheme.LEGACY, native_highlight_lead_sec: float = _NATIVE_HIGHLIGHT_LEAD_SEC, native_highlight_min_dwell_sec: float = _NATIVE_HIGHLIGHT_MIN_DWELL_SEC, repair_word_timings: bool = True, ) -> Path: """Generate an ASS caption file tuned for direct libass rendering. Unlike SRT → libass (default PlayResY=288), an ASS file with ``PlayResY = output_height`` means libass' scale factor is 1.0, so the ``font_size`` / ``margin_v`` arguments below are honest output pixels. This is the root-cause fix for the "captions rendering in the middle of the frame, four times too large" bug the user reported. """ ass_path = output_dir / f"clip_{clip.clip_id}.ass" aligned = clip_subtitle_words(transcript, clip) cue_words = max_words_per_cue cue_sec = max_cue_sec cue_font_size = font_size cue_margin_v = margin_v prefer_break_on_punctuation = False min_words_before_break = 1 if render_theme == RenderTheme.REFERENCE_LOWER_THIRD: cue_words = max(max_words_per_cue, 7) cue_sec = max(max_cue_sec, 2.6) cue_font_size = max(font_size, 52) cue_margin_v = min(margin_v, 136) prefer_break_on_punctuation = True min_words_before_break = 5 elif render_theme == RenderTheme.NATIVE_HIGHLIGHT: cue_words = 4 cue_sec = 1.45 cue_font_size = max(font_size, 80) cue_margin_v = max(margin_v, 300) prefer_break_on_punctuation = True min_words_before_break = 3 aligned_words = aligned.words if render_theme == RenderTheme.NATIVE_HIGHLIGHT and repair_word_timings: aligned_words = _repair_native_highlight_timings( aligned_words, clip_duration=clip.duration_sec, ) cue_chunks = group_words_to_cue_chunks( aligned_words, max_words_per_cue=cue_words, max_cue_sec=cue_sec, prefer_break_on_punctuation=prefer_break_on_punctuation, min_words_before_break=min_words_before_break, ) lines = [ (chunk[0].start_time, chunk[-1].end_time, " ".join(word.word for word in chunk)) for chunk in cue_chunks ] if render_theme == RenderTheme.REFERENCE_LOWER_THIRD: lines = [(start, end, _balance_reference_caption(text)) for start, end, text in lines] ass_text = format_ass( lines, play_res_x=play_res_x, play_res_y=play_res_y, font_size=cue_font_size, margin_v=cue_margin_v, margin_h=margin_h, font_name="Source Sans 3", render_theme=render_theme, ) elif render_theme == RenderTheme.NATIVE_HIGHLIGHT: ass_text = _format_native_highlight_ass( cue_chunks, play_res_x=play_res_x, play_res_y=play_res_y, font_size=cue_font_size, margin_v=cue_margin_v, font_name=_NATIVE_HIGHLIGHT_FONT_NAME, highlight_lead_sec=native_highlight_lead_sec, highlight_min_dwell_sec=native_highlight_min_dwell_sec, ) else: ass_text = format_ass( lines, play_res_x=play_res_x, play_res_y=play_res_y, font_size=cue_font_size, margin_v=cue_margin_v, margin_h=margin_h, font_name=font_name, render_theme=render_theme, ) ass_path.write_text(ass_text, encoding="utf-8") logger.info("Generated ASS: %s (%d cues)", ass_path, len(lines)) return ass_path