"""
Step 6: Audio sync — match synthesised segment durations to original timestamps.

For each segment:
  - Too long  → speed up using ffmpeg atempo filter
  - Too short → pad with silence at the end
Then stitch all segments into a single final audio track.
"""

import array
import math
import os
import shutil
import subprocess
import wave
from pathlib import Path


def _run_ffmpeg(cmd: list[str], what: str) -> None:
    """Run an ffmpeg command line and raise RuntimeError with its stderr on failure.

    `what` names the operation and is embedded in the error message
    ("ffmpeg <what> failed").
    """
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg {what} failed:\n{result.stderr}")


def _get_wav_duration(wav_path: str) -> float:
    """Return the duration of a WAV file in seconds (frame count / frame rate)."""
    with wave.open(wav_path, "r") as f:
        return f.getnframes() / float(f.getframerate())


def _speedup_audio(input_path: str, output_path: str, factor: float) -> None:
    """Change audio tempo by `factor` using chained ffmpeg atempo filters.

    A single atempo stage is only used here within 0.5–2.0, so larger or
    smaller factors are decomposed into a chain of 2.0 / 0.5 stages plus a
    final remainder stage.

    Raises:
        ValueError: if `factor` is not a positive finite number. Without this
            check the decomposition loops below would never terminate for
            0, negative or infinite factors.
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    if not (math.isfinite(factor) and factor > 0):
        raise ValueError(f"atempo factor must be a positive finite number, got {factor!r}")

    filters = []
    remaining = factor
    while remaining > 2.0:
        filters.append("atempo=2.0")
        remaining /= 2.0
    while remaining < 0.5:
        filters.append("atempo=0.5")
        remaining /= 0.5
    filters.append(f"atempo={remaining:.4f}")

    cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-filter:a", ",".join(filters),
        output_path,
    ]
    _run_ffmpeg(cmd, "atempo")


def _pad_silence(input_path: str, output_path: str, target_duration: float) -> None:
    """Pad audio with trailing silence so the output lasts `target_duration` seconds.

    The `-t` option also caps the output at `target_duration`, so an input
    that is already longer is cut to that length.
    """
    current = _get_wav_duration(input_path)
    pad_seconds = max(0, target_duration - current)
    cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-af", f"apad=pad_dur={pad_seconds:.4f}",
        "-t", str(target_duration),
        output_path,
    ]
    _run_ffmpeg(cmd, "apad")


def _trim_audio(input_path: str, output_path: str, duration: float) -> None:
    """Trim audio to `duration` seconds.

    Writes to a temporary sibling file first and then replaces `output_path`,
    so `input_path` and `output_path` may be the same file.
    """
    tmp = output_path + ".trim.wav"
    cmd = ["ffmpeg", "-y", "-i", input_path, "-t", str(duration), tmp]
    _run_ffmpeg(cmd, "trim")
    os.replace(tmp, output_path)


def _detect_pauses(words: list[dict], min_pause: float = 0.15) -> list[dict]:
    """Find gaps between consecutive words that are at least `min_pause` seconds.

    Each word is a dict with "start" and "end" keys. Returns a list of
    {after_word_idx, position, duration} in word order.
    """
    pauses = []
    for i, (current, following) in enumerate(zip(words, words[1:])):
        gap = following["start"] - current["end"]
        if gap >= min_pause:
            pauses.append({
                "after_word_idx": i,
                "position": current["end"],
                "duration": gap,
            })
    return pauses


def _find_tts_silences(wav_path: str, threshold_db: float = -35.0,
                       min_dur: float = 0.08) -> list[dict]:
    """Find silence regions in a TTS WAV using windowed RMS energy.

    The samples are interpreted as 16-bit signed values and sample indices are
    converted to time by dividing by the frame rate, so the timings are only
    correct for mono input.

    Returns a list of {start, end, duration} (seconds) for every region whose
    RMS stays below `threshold_db` (dBFS) for at least `min_dur` seconds.
    """
    samples, sample_rate = _read_wav_samples(wav_path)

    # 20 ms windows with 50% overlap. Both are clamped to at least one sample
    # so a very low sample rate cannot produce a zero step for range().
    window_size = max(1, int(0.02 * sample_rate))
    hop = max(1, window_size // 2)
    threshold_linear = 10 ** (threshold_db / 20.0) * 32768  # dBFS to linear amplitude

    silences: list[dict] = []
    in_silence = False
    silence_start = 0.0

    for pos in range(0, len(samples) - window_size, hop):
        chunk = samples[pos:pos + window_size]
        rms = math.sqrt(sum(s * s for s in chunk) / window_size)
        t = pos / sample_rate

        if rms < threshold_linear:
            if not in_silence:
                in_silence = True
                silence_start = t
        elif in_silence:
            dur = t - silence_start
            if dur >= min_dur:
                silences.append({"start": silence_start, "end": t, "duration": dur})
            in_silence = False

    # Close a silence region that runs to the end of the file.
    if in_silence:
        t_end = len(samples) / sample_rate
        dur = t_end - silence_start
        if dur >= min_dur:
            silences.append({"start": silence_start, "end": t_end, "duration": dur})

    return silences


def _read_wav_samples(wav_path: str) -> tuple[array.array, int]:
    """Read a mono 16-bit WAV and return (samples, sample_rate)."""
    with wave.open(wav_path, "r") as f:
        sr = f.getframerate()
        raw = f.readframes(f.getnframes())
    return array.array("h", raw), sr


def _write_wav_samples(samples: array.array, sample_rate: int, output_path: str) -> None:
    """Write 16-bit mono samples to a WAV file."""
    with wave.open(output_path, "w") as f:
        f.setnchannels(1)
        f.setsampwidth(2)
        f.setframerate(sample_rate)
        f.writeframes(samples.tobytes())


def _pause_aware_sync(tts_path: str, synced_path: str, target_duration: float,
                      words: list[dict], max_speed: float,
                      max_overflow: float = 0.0) -> None:
    """Sync TTS audio using a pause-aware strategy: compress silences first, then atempo.

    When TTS is too long: shrink detected silence regions before speeding up speech.
    When TTS is too short: distribute extra padding at natural pause points.

    `max_overflow`: extra seconds the synced output may exceed target_duration
    without trimming. The caller borrows this budget from the inter-segment
    silence that follows, so we never silently drop trailing words just to hit
    `target_duration` exactly.

    The result is written to `synced_path`. Audio is never slowed down: if the
    TTS already fits inside `target_duration + max_overflow` it is copied as-is.
    """
    tts_duration = _get_wav_duration(tts_path)
    original_pauses = _detect_pauses(words)
    tts_silences = _find_tts_silences(tts_path)
    total_tts_silence = sum(s["duration"] for s in tts_silences)

    hard_cap = target_duration + max_overflow
    overshoot_vs_cap = tts_duration - hard_cap

    if tts_duration > target_duration * 1.02:
        has_silence = bool(tts_silences) and total_tts_silence > 0

        if overshoot_vs_cap <= 0:
            # Already within hard_cap once the borrow budget is factored in —
            # keep TTS as-is. This also covers the no-silence case, where
            # dividing by hard_cap would give a factor < 1 and slow speech down.
            shutil.copy(tts_path, synced_path)
            if has_silence:
                print(f"[s5]   pause-aware: within +{max_overflow:.2f}s borrow, no compression")
            else:
                print(f"[s5]   pause-aware: within +{max_overflow:.2f}s borrow, no speedup")
            return

        if has_silence:
            # Never remove more than 90% of the silence, nor more than needed.
            removable = min(total_tts_silence * 0.9, overshoot_vs_cap)
            if removable >= overshoot_vs_cap:
                compression_ratio = 1.0 - (removable / total_tts_silence)
                _compress_silences(tts_path, synced_path, tts_silences, compression_ratio)
                print(f"[s5]   pause-aware: compressed silences (ratio {compression_ratio:.2f}, +{max_overflow:.2f}s borrow)")
            else:
                # Silence alone is not enough: keep 10% of it, then speed up the rest.
                _compress_silences(tts_path, synced_path, tts_silences, 0.1)
                remaining_dur = _get_wav_duration(synced_path)
                speed_factor = remaining_dur / hard_cap if hard_cap > 0 else max_speed
                speed_factor = max(speed_factor, 1.0)  # rounding must never cause a slowdown
                if speed_factor > max_speed:
                    print(f"[s5]   pause-aware: WARNING speed x{speed_factor:.2f} exceeds max, capping at x{max_speed} (will overflow next gap)")
                    speed_factor = max_speed
                print(f"[s5]   pause-aware: compressed silences + speedup x{speed_factor:.2f} (+{max_overflow:.2f}s borrow)")
                tmp = synced_path + ".tmp.wav"
                _speedup_audio(synced_path, tmp, speed_factor)
                os.replace(tmp, synced_path)
        else:
            # No silences detected — uniform speedup, but use hard_cap as the target.
            speed_factor = tts_duration / hard_cap if hard_cap > 0 else max_speed
            if speed_factor > max_speed:
                print(f"[s5]   pause-aware: WARNING speed x{speed_factor:.2f} exceeds max, capping at x{max_speed} (will overflow next gap)")
                speed_factor = max_speed
            print(f"[s5]   pause-aware: uniform speedup x{speed_factor:.2f} (no silences, +{max_overflow:.2f}s borrow)")
            _speedup_audio(tts_path, synced_path, speed_factor)

    elif tts_duration < target_duration * 0.98:
        shortfall = target_duration - tts_duration

        if tts_silences and original_pauses:
            # Distribute padding at detected silence positions.
            _distribute_padding(tts_path, synced_path, tts_silences, shortfall, target_duration)
            print(f"[s5]   pause-aware: distributed {shortfall:.2f}s padding across {len(tts_silences)} pause points")
        else:
            # No pause points — pad at end.
            _pad_silence(tts_path, synced_path, target_duration)
            print(f"[s5]   pause-aware: padded {shortfall:.2f}s at end (no pause points)")
    else:
        # Within ±2% of the target: use the TTS unchanged.
        shutil.copy(tts_path, synced_path)


def _compress_silences(input_path: str, output_path: str,
                       silences: list[dict], keep_ratio: float) -> None:
    """Rewrite a WAV with each silence region shortened to `keep_ratio` of its length.

    `silences` is a list of {start, end} (seconds), expected in ascending order
    as produced by `_find_tts_silences`. The leading part of each silence is
    kept; the rest is dropped.
    """
    samples, sr = _read_wav_samples(input_path)
    out = array.array("h")

    prev_end_sample = 0
    for sil in silences:
        sil_start = int(sil["start"] * sr)
        sil_end = int(sil["end"] * sr)

        # Copy speech before this silence.
        out.extend(samples[prev_end_sample:sil_start])

        # Keep only keep_ratio of the silence.
        kept_samples = int((sil_end - sil_start) * keep_ratio)
        if kept_samples > 0:
            out.extend(samples[sil_start:sil_start + kept_samples])

        prev_end_sample = sil_end

    # Copy remaining speech after the last silence.
    out.extend(samples[prev_end_sample:])
    _write_wav_samples(out, sr, output_path)


def _distribute_padding(input_path: str, output_path: str,
                        tts_silences: list[dict], shortfall: float,
                        target_duration: float) -> None:
    """Insert `shortfall` seconds of extra silence, split evenly across pause points.

    Extra zero samples are inserted at the end of every region in
    `tts_silences`. If `tts_silences` is empty the whole shortfall is appended
    at the end instead. When the result exceeds `target_duration` by more than
    2% it is trimmed back with ffmpeg.
    """
    samples, sr = _read_wav_samples(input_path)
    out = array.array("h")

    prev_end_sample = 0
    if tts_silences:
        pad_per_point = shortfall / len(tts_silences)
        extra_samples = int(pad_per_point * sr)
        for sil in tts_silences:
            sil_end = int(sil["end"] * sr)
            # Copy everything up to the end of this silence region...
            out.extend(samples[prev_end_sample:sil_end])
            # ...then lengthen the pause with digital silence.
            out.extend(array.array("h", [0]) * extra_samples)
            prev_end_sample = sil_end

    # Copy remaining audio.
    out.extend(samples[prev_end_sample:])

    if not tts_silences:
        # No pause points to stretch: put all the padding at the end.
        out.extend(array.array("h", [0]) * int(shortfall * sr))

    _write_wav_samples(out, sr, output_path)

    # Trim to the target if the padded result ended up noticeably over it.
    actual = len(out) / sr
    if actual > target_duration * 1.02:
        _trim_audio(output_path, output_path, target_duration)


def _generate_silence(output_path: str, duration: float, sample_rate: int = 16000) -> None:
    """Generate a silent 16-bit mono WAV file of the given duration in seconds."""
    num_samples = int(duration * sample_rate)
    with wave.open(output_path, "w") as f:
        f.setnchannels(1)
        f.setsampwidth(2)  # 16-bit
        f.setframerate(sample_rate)
        f.writeframes(b"\x00\x00" * num_samples)


def _uniform_sync(index: int, tts_path: str, synced_path: str, tts_duration: float,
                  target_duration: float, allowed_overflow: float,
                  max_speed: float) -> None:
    """Sync one segment that has no word timings: uniform speedup, end padding, or copy."""
    hard_cap = target_duration + allowed_overflow

    if tts_duration > target_duration * 1.02:
        # Speed up only as far as needed to land within hard_cap; if the
        # required factor exceeds max_speed, cap it and let it overflow —
        # the next gap will shrink to absorb it. Never trim.
        if tts_duration <= hard_cap:
            speed_factor = 1.0
        else:
            speed_factor = tts_duration / hard_cap if hard_cap > 0 else max_speed
            if speed_factor > max_speed:
                print(f"[s5] Seg {index}: WARNING speed x{speed_factor:.2f} exceeds max, capping at x{max_speed} (will overflow next gap)")
                speed_factor = max_speed

        if speed_factor > 1.001:
            print(f"[s5] Seg {index}: speeding up x{speed_factor:.2f} (+{allowed_overflow:.2f}s borrow)")
            _speedup_audio(tts_path, synced_path, speed_factor)
        else:
            shutil.copy(tts_path, synced_path)
            print(f"[s5] Seg {index}: within +{allowed_overflow:.2f}s borrow, no speedup")
    elif tts_duration < target_duration * 0.98:
        print(f"[s5] Seg {index}: padding {target_duration - tts_duration:.2f}s silence")
        _pad_silence(tts_path, synced_path, target_duration)
    else:
        shutil.copy(tts_path, synced_path)


def _write_concat_list(entries: list[str], list_path: str) -> None:
    """Write an ffmpeg concat-demuxer list file with one absolute path per entry.

    Paths are wrapped in single quotes; an embedded single quote is written as
    '\\'' (close quote, escaped quote, reopen quote) so it cannot terminate
    the quoted path early.
    """
    lines = []
    for entry in entries:
        abs_entry = os.path.abspath(entry).replace("'", "'\\''")
        lines.append(f"file '{abs_entry}'\n")
    with open(list_path, "w", encoding="utf-8") as f:
        f.writelines(lines)


def sync_and_stitch(
    segments: list[dict],
    output_path: str = "tmp/audio/final_audio.wav",
    synced_dir: str = "tmp/audio/tts_synced",
    max_speed: float = 1.8,
) -> str:
    """
    Sync each TTS segment to its original timestamp window and stitch into a single WAV.

    Args:
        segments: List of dicts with {start, end, tts_path}. An optional "words"
            list (dicts with start/end) with more than one entry enables
            pause-aware syncing for that segment.
        output_path: Where to write the final stitched audio.
        synced_dir: Temp directory for per-segment synced WAVs.
        max_speed: Maximum allowed speedup factor (default 1.8x to preserve naturalness).

    Returns:
        Path to the final stitched audio WAV.

    Raises:
        ValueError: if `segments` is empty.
        RuntimeError: if an ffmpeg invocation fails.
    """
    if not segments:
        raise ValueError("sync_and_stitch requires at least one segment")

    Path(synced_dir).mkdir(parents=True, exist_ok=True)
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Detect TTS sample rate from the first segment.
    with wave.open(segments[0]["tts_path"], "r") as f:
        tts_sample_rate = f.getframerate()
    print(f"[s5] TTS sample rate: {tts_sample_rate} Hz")

    concat_list_path = "tmp/concat_list.txt"
    # The list lives under tmp/ regardless of output_path/synced_dir, so make
    # sure that directory exists even when both point elsewhere.
    Path(concat_list_path).parent.mkdir(parents=True, exist_ok=True)
    concat_entries = []

    # Track the real wall-clock playback cursor. When a segment overflows its
    # original window, the cursor moves past the segment's nominal end, and the
    # next inter-segment silence shrinks accordingly — overflow is absorbed by
    # the following gap instead of being trimmed off the end of the audio.
    playback_cursor = 0.0

    for i, seg in enumerate(segments):
        start = seg["start"]
        end = seg["end"]
        target_duration = end - start
        tts_path = seg["tts_path"]

        # Fill gap before this segment with silence — but only as much as the
        # cursor is actually behind. If a prior segment overflowed past `start`,
        # `gap` goes negative and we skip the silence (and start slightly late).
        gap = start - playback_cursor
        if gap > 0.01:
            sil_path = os.path.join(synced_dir, f"silence_{i:04d}.wav")
            _generate_silence(sil_path, gap, sample_rate=tts_sample_rate)
            concat_entries.append(sil_path)
            playback_cursor += gap
        elif gap < -0.05:
            print(f"[s5] Seg {i}: running {-gap:.2f}s behind original timeline (prior overflow absorbed)")

        # Borrow budget: how much we may overflow `target_duration` without
        # trimming. We can use the silence between this segment's `end` and the
        # next segment's `start`. Last segment has no follower → 0.
        if i + 1 < len(segments):
            allowed_overflow = max(segments[i + 1]["start"] - end, 0.0)
        else:
            allowed_overflow = 0.0

        tts_duration = _get_wav_duration(tts_path)
        synced_path = os.path.join(synced_dir, f"synced_{i:04d}.wav")

        words = seg.get("words")
        if words and len(words) > 1:
            print(f"[s5] Seg {i}: pause-aware sync ({tts_duration:.2f}s -> {target_duration:.2f}s, +{allowed_overflow:.2f}s borrow)")
            _pause_aware_sync(tts_path, synced_path, target_duration, words, max_speed,
                              max_overflow=allowed_overflow)
        else:
            _uniform_sync(i, tts_path, synced_path, tts_duration, target_duration,
                          allowed_overflow, max_speed)

        concat_entries.append(synced_path)
        # Advance by what was actually produced, not by the nominal window.
        playback_cursor += _get_wav_duration(synced_path)

    # Write concat list for ffmpeg.
    _write_concat_list(concat_entries, concat_list_path)

    # Concatenate all segments (re-encode to normalize sample rates).
    cmd = [
        "ffmpeg", "-y",
        "-f", "concat",
        "-safe", "0",
        "-i", concat_list_path,
        "-ar", str(tts_sample_rate),
        "-ac", "1",
        "-acodec", "pcm_s16le",
        output_path,
    ]
    _run_ffmpeg(cmd, "concat")

    print(f"[s5] Audio sync complete → {output_path} ✓")
    return output_path