"""Map source-timeline ASR words to per-clip subtitle timings (t=0 at clip in-point).""" from __future__ import annotations from humeo_core.schemas import Clip, ClipSubtitleWords, RenderTheme, TranscriptWord # Whisper / WhisperX / OpenAI-normalized segment shapes _MAX_WORDS_PER_CUE = 8 _MAX_CUE_SEC = 4.0 _PUNCTUATION_BREAK_CHARS = (".", "?", "!", ";", ":") _SENTENCE_RESTART_WORDS = frozenset( { "And", "But", "Did", "Now", "So", "That", "Then", "This", "Those", "What", "When", "Where", "Why", } ) def _iter_words_from_segments(transcript: dict) -> list[TranscriptWord]: out: list[TranscriptWord] = [] for seg in transcript.get("segments", []) or []: words = seg.get("words") or [] if words: for raw in words: w = str(raw.get("word", "")).strip() if not w: continue out.append( TranscriptWord( word=w, start_time=float(raw["start"]), end_time=float(raw["end"]), ) ) continue # Segment-level only (no word list): treat whole segment as one token text = str(seg.get("text", "")).strip() if text: out.append( TranscriptWord( word=text, start_time=float(seg.get("start", 0.0)), end_time=float(seg.get("end", 0.0)), ) ) return out def clip_subtitle_words(transcript: dict, clip: Clip) -> ClipSubtitleWords: """Words overlapping ``clip`` with times shifted to start at 0 (clip-local).""" clip_start = clip.start_time_sec clip_end = clip.end_time_sec words = _iter_words_from_segments(transcript) local: list[TranscriptWord] = [] for w in words: if w.end_time <= clip_start or w.start_time >= clip_end: continue t0 = max(w.start_time, clip_start) - clip_start t1 = min(w.end_time, clip_end) - clip_start if t1 <= t0: continue local.append(TranscriptWord(word=w.word, start_time=t0, end_time=t1)) if local: return ClipSubtitleWords(words=local) return ClipSubtitleWords(words=_fallback_even_words(clip)) def _fallback_even_words(clip: Clip) -> list[TranscriptWord]: """Even split over clip duration when no word timestamps exist.""" text = (clip.transcript or "").strip() if not text: return [] parts = text.split() if not parts: return [] d = clip.duration_sec step = d / len(parts) out: list[TranscriptWord] = [] for i, p in enumerate(parts): out.append( TranscriptWord( word=p, start_time=i * step, end_time=(i + 1) * step if i < len(parts) - 1 else d, ) ) return out def _looks_like_sentence_restart(prev_word: str, next_word: str) -> bool: prev = prev_word.rstrip("\"')]}") nxt = next_word.lstrip("\"'([{") if not prev or not nxt: return False if nxt in _SENTENCE_RESTART_WORDS: return True return any(ch.isdigit() for ch in prev) and nxt[0].isupper() def clip_words_to_srt_lines( words: list[TranscriptWord], *, max_words_per_cue: int = _MAX_WORDS_PER_CUE, max_cue_sec: float = _MAX_CUE_SEC, prefer_break_on_punctuation: bool = False, min_words_before_break: int = 1, ) -> list[tuple[float, float, str]]: """Group words into SRT cues: max N words and max duration per cue.""" chunks = group_words_to_cue_chunks( words, max_words_per_cue=max_words_per_cue, max_cue_sec=max_cue_sec, prefer_break_on_punctuation=prefer_break_on_punctuation, min_words_before_break=min_words_before_break, ) return [ (chunk[0].start_time, chunk[-1].end_time, " ".join(w.word for w in chunk)) for chunk in chunks ] def group_words_to_cue_chunks( words: list[TranscriptWord], *, max_words_per_cue: int = _MAX_WORDS_PER_CUE, max_cue_sec: float = _MAX_CUE_SEC, prefer_break_on_punctuation: bool = False, min_words_before_break: int = 1, ) -> list[list[TranscriptWord]]: """Group words into timed cue chunks while preserving per-word timings.""" if not words: return [] max_words_per_cue = max(1, int(max_words_per_cue)) max_cue_sec = max(0.2, float(max_cue_sec)) min_words_before_break = max(1, int(min_words_before_break)) chunks_out: list[list[TranscriptWord]] = [] i = 0 n = len(words) while i < n: chunk: list[TranscriptWord] = [words[i]] t0 = words[i].start_time end_t = words[i].end_time j = i + 1 while j < n: w = words[j] if len(chunk) >= max_words_per_cue: break if w.start_time - t0 > max_cue_sec: break if ( prefer_break_on_punctuation and (len(chunk) >= 2 or end_t - t0 >= 0.45) and _looks_like_sentence_restart(chunk[-1].word, w.word) ): break chunk.append(w) end_t = w.end_time j += 1 if ( prefer_break_on_punctuation and len(chunk) >= min_words_before_break and chunk[-1].word.rstrip("\"')]}").endswith(_PUNCTUATION_BREAK_CHARS) ): break chunks_out.append(chunk) i = j return chunks_out def format_srt(lines: list[tuple[float, float, str]]) -> str: blocks: list[str] = [] for idx, (start, end, text) in enumerate(lines, start=1): blocks.append( f"{idx}\n{_fmt_time(start)} --> {_fmt_time(end)}\n{text}\n" ) return "\n".join(blocks) def _fmt_time(seconds: float) -> str: hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = int(seconds % 60) millis = int(round((seconds % 1) * 1000)) if millis >= 1000: millis = 999 return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}" # --------------------------------------------------------------------------- # ASS / SubStation Alpha output (the format libass natively renders) # --------------------------------------------------------------------------- def _fmt_ass_time(seconds: float) -> str: """ASS time format: ``H:MM:SS.cs`` (centiseconds).""" seconds = max(0.0, seconds) hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = seconds % 60 whole = int(secs) cs = int(round((secs - whole) * 100)) if cs >= 100: cs = 99 return f"{hours:d}:{minutes:02d}:{whole:02d}.{cs:02d}" def _escape_ass_text(text: str) -> str: """Escape characters that are significant to the ASS dialogue parser.""" return ( text.replace("\\", r"\\") .replace("{", r"\{") .replace("}", r"\}") .replace("\n", r"\N") ) def format_ass( lines: list[tuple[float, float, str]], *, play_res_x: int, play_res_y: int, font_size: int, margin_v: int, margin_h: int = 60, font_name: str = "Arial", render_theme: RenderTheme = RenderTheme.LEGACY, ) -> str: """Render captions as an ASS script whose PlayRes matches the output video. Why this exists: libass' font/margin scaling multiplies every pixel-ish value by ``video_height / PlayResY``. The default ``PlayResY=288`` blew ``FontSize=48`` up to ~320 output pixels and pushed ``MarginV`` to the middle of the frame. Pinning ``PlayResY`` to the actual output height makes that scale factor exactly 1.0, so ``font_size`` and ``margin_v`` below are honest output pixel values. """ if render_theme == RenderTheme.REFERENCE_LOWER_THIRD: style_line = ( f"Style: Default,{font_name},{font_size},&H00FFFFFF,&H000000FF," "&H00000000,&H00000000,-1,0,0,0,100,100,-1,0,1,3,0,2," f"{margin_h},{margin_h},{margin_v},0\n" ) else: style_line = ( f"Style: Default,{font_name},{font_size},&H00FFFFFF,&H000000FF," f"&H00000000,&H70000000,-1,0,0,0,100,100,0,0,4,0,0,2," f"{margin_h},{margin_h},{margin_v},0\n" ) header = ( "[Script Info]\n" "ScriptType: v4.00+\n" f"PlayResX: {play_res_x}\n" f"PlayResY: {play_res_y}\n" "WrapStyle: 0\n" "ScaledBorderAndShadow: yes\n" "YCbCr Matrix: None\n" "\n" "[V4+ Styles]\n" "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, " "OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, " "ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, " "Alignment, MarginL, MarginR, MarginV, Encoding\n" + style_line + "\n" "[Events]\n" "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n" ) events = [] for start, end, text in lines: events.append( f"Dialogue: 0,{_fmt_ass_time(start)},{_fmt_ass_time(end)},Default,," f"0,0,0,,{_escape_ass_text(text)}" ) return header + "\n".join(events) + ("\n" if events else "")