videovoice / steps /s5_sync.py
github-actions[bot]
deploy: switch to chatterbox requirements @ 9fe21f0
82e7376
"""
Step 6: Audio sync — match synthesised segment durations to original timestamps.
For each segment:
- Too long → speed up using ffmpeg atempo filter
- Too short → pad with silence at the end
Then stitch all segments into a single final audio track.
"""
import array
import math
import os
import subprocess
import wave
from pathlib import Path
def _get_wav_duration(wav_path: str) -> float:
    """Return the length of a WAV file in seconds (frame count / frame rate)."""
    with wave.open(wav_path, 'r') as wav_file:
        frame_count, frame_rate = wav_file.getnframes(), wav_file.getframerate()
    return frame_count / float(frame_rate)
def _speedup_audio(input_path: str, output_path: str, factor: float) -> None:
    """Change the tempo of an audio file by ``factor`` using ffmpeg's atempo filter.

    Each atempo stage is kept within 0.5-2.0; factors outside that range are
    reached by chaining several stages whose product equals ``factor``.

    Args:
        input_path: Audio file passed to ffmpeg as input.
        output_path: File ffmpeg writes (overwritten because of ``-y``).
        factor: Tempo multiplier; must be a positive, finite number.

    Raises:
        ValueError: If ``factor`` is zero, negative, infinite or NaN. Without
            this check the chaining loops below never terminate for 0,
            negative or infinite values.
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    # The chained comparison is False for NaN, so NaN is rejected as well.
    if not (0 < factor < float("inf")):
        raise ValueError(f"atempo factor must be a positive finite number, got {factor!r}")

    # atempo supports 0.5 to 2.0, chain filters for larger/smaller factors
    filters = []
    remaining = factor
    while remaining > 2.0:
        filters.append("atempo=2.0")
        remaining /= 2.0
    while remaining < 0.5:
        filters.append("atempo=0.5")
        remaining /= 0.5
    filters.append(f"atempo={remaining:.4f}")
    filter_str = ",".join(filters)

    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-filter:a", filter_str,
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg atempo failed:\n{result.stderr}")
def _pad_silence(input_path: str, output_path: str, target_duration: float) -> None:
    """Extend a WAV with trailing silence so it lasts ``target_duration`` seconds.

    The amount of padding is the difference between the target and the input's
    current duration (never negative); ffmpeg's ``-t`` then limits the output
    to the target length.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    missing = max(0, target_duration - _get_wav_duration(input_path))
    proc = subprocess.run(
        [
            "ffmpeg", "-y", "-i", input_path,
            "-af", f"apad=pad_dur={missing:.4f}",
            "-t", str(target_duration),
            output_path,
        ],
        capture_output=True,
        text=True,
    )
    if proc.returncode == 0:
        return
    raise RuntimeError(f"ffmpeg apad failed:\n{proc.stderr}")
def _trim_audio(input_path: str, output_path: str, duration: float) -> None:
    """Cut an audio file down to ``duration`` seconds.

    ffmpeg writes to a sibling temp file first, which is then moved over
    ``output_path``; this lets callers pass the same path as input and output.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    scratch = output_path + ".trim.wav"
    proc = subprocess.run(
        ["ffmpeg", "-y", "-i", input_path, "-t", str(duration), scratch],
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        raise RuntimeError(f"ffmpeg trim failed:\n{proc.stderr}")
    os.replace(scratch, output_path)
def _detect_pauses(words: list[dict], min_pause: float = 0.15) -> list[dict]:
    """Collect the gaps between neighbouring words that last at least ``min_pause`` seconds.

    Each word dict must provide ``start`` and ``end``. The result holds one
    ``{after_word_idx, position, duration}`` dict per qualifying gap, in word
    order: the index of the word before the gap, that word's end time, and
    the gap length.
    """
    return [
        {
            "after_word_idx": idx,
            "position": current["end"],
            "duration": gap,
        }
        for idx, (current, following) in enumerate(zip(words, words[1:]))
        if (gap := following["start"] - current["end"]) >= min_pause
    ]
def _find_tts_silences(wav_path: str, threshold_db: float = -35.0,
                       min_dur: float = 0.08) -> list[dict]:
    """Find silence regions in a TTS WAV using windowed RMS energy.

    The audio is scanned with 20 ms windows at 50% overlap. A window counts as
    silent when its RMS falls below ``threshold_db`` (taken relative to 32768).
    Consecutive silent windows are merged, and runs shorter than ``min_dur``
    seconds are dropped.

    Samples are decoded as 16-bit signed integers (assumes the file really is
    16-bit PCM; other sample widths are not checked here). Multi-channel files
    are handled by measuring positions in frames and computing RMS over all
    channels of a window, so timestamps stay in seconds regardless of the
    channel count.

    Args:
        wav_path: Path of the WAV file to analyse.
        threshold_db: Silence threshold in dB; lower means stricter.
        min_dur: Minimum length in seconds for a region to be reported.

    Returns:
        List of ``{start, end, duration}`` dicts (seconds), in file order.
    """
    with wave.open(wav_path, "r") as f:
        n_channels = f.getnchannels()
        sample_rate = f.getframerate()
        raw = f.readframes(f.getnframes())

    # Convert raw bytes to 16-bit signed samples (channels are interleaved).
    samples = array.array("h", raw)
    total_frames = len(samples) // n_channels

    # 20 ms windows; clamp to at least one frame so very low frame rates
    # cannot produce a zero hop (which range() rejects).
    window_frames = max(1, int(0.02 * sample_rate))
    hop_frames = max(1, window_frames // 2)
    threshold_linear = 10 ** (threshold_db / 20.0) * 32768  # dB to linear amplitude

    silences: list[dict] = []
    in_silence = False
    silence_start = 0.0

    for frame_pos in range(0, total_frames - window_frames, hop_frames):
        lo = frame_pos * n_channels
        chunk = samples[lo:lo + window_frames * n_channels]
        rms = math.sqrt(sum(s * s for s in chunk) / len(chunk))
        t = frame_pos / sample_rate  # window start time in seconds

        if rms < threshold_linear:
            if not in_silence:
                in_silence = True
                silence_start = t
        elif in_silence:
            dur = t - silence_start
            if dur >= min_dur:
                silences.append({"start": silence_start, "end": t, "duration": dur})
            in_silence = False

    # Close trailing silence: it runs to the end of the file.
    if in_silence:
        t_end = total_frames / sample_rate
        dur = t_end - silence_start
        if dur >= min_dur:
            silences.append({"start": silence_start, "end": t_end, "duration": dur})

    return silences
def _read_wav_samples(wav_path: str) -> tuple[array.array, int]:
    """Load a WAV file and return ``(samples, sample_rate)``.

    The frame data is decoded as signed 16-bit values, so the file is expected
    to be mono 16-bit PCM.
    """
    with wave.open(wav_path, "r") as wav_file:
        rate = wav_file.getframerate()
        payload = wav_file.readframes(wav_file.getnframes())
    pcm = array.array("h", payload)
    return pcm, rate
def _write_wav_samples(samples: array.array, sample_rate: int, output_path: str) -> None:
    """Store ``samples`` as a mono, 16-bit WAV file at ``output_path``."""
    with wave.open(output_path, "w") as wav_out:
        # (nchannels, sampwidth, framerate, nframes, comptype, compname);
        # the frame count is left at 0 and filled in from the data written.
        wav_out.setparams((1, 2, sample_rate, 0, "NONE", "not compressed"))
        wav_out.writeframes(samples.tobytes())
def _pause_aware_sync(tts_path: str, synced_path: str, target_duration: float,
                      words: list[dict], max_speed: float,
                      max_overflow: float = 0.0) -> None:
    """Sync TTS audio using a pause-aware strategy: adjust silences before touching speech.

    When TTS is too long (more than 2% over ``target_duration``):
      * if it already fits within ``target_duration + max_overflow`` it is
        copied unchanged, whether or not silences were detected;
      * otherwise detected silence regions are shrunk first, and only if that
        is not enough is the audio additionally sped up with atempo;
      * with no detected silences the whole clip is sped up uniformly.
    The speed factor is capped at ``max_speed``; if the cap is hit the output
    stays longer than the budget rather than being trimmed.

    When TTS is too short (more than 2% under): extra silence is distributed
    over the detected pause points, or appended at the end if there are none.

    Otherwise the clip is copied unchanged.

    Args:
        tts_path: Synthesised input WAV.
        synced_path: Where the synced WAV is written.
        target_duration: Length of the original segment window in seconds.
        words: Word dicts with ``start``/``end`` from the original audio.
        max_speed: Upper bound for the atempo speed-up factor.
        max_overflow: Extra seconds the synced output may exceed
            ``target_duration`` without being compressed. The caller borrows
            this budget from the inter-segment silence that follows, so
            trailing words are never dropped just to hit ``target_duration``.
    """
    import shutil  # not imported at module level in this file

    tts_duration = _get_wav_duration(tts_path)
    original_pauses = _detect_pauses(words)
    tts_silences = _find_tts_silences(tts_path)
    total_tts_silence = sum(s["duration"] for s in tts_silences)
    hard_cap = target_duration + max_overflow
    overshoot_vs_cap = tts_duration - hard_cap

    if tts_duration > target_duration * 1.02:
        if overshoot_vs_cap <= 0:
            # Already within hard_cap once the borrow budget is counted: keep
            # the TTS as-is. Checking this before looking at silences matters:
            # with no silences, tts_duration / hard_cap would be below 1 and
            # the clip would be slowed down to fill the borrowed gap.
            shutil.copy(tts_path, synced_path)
            print(f"[s5] pause-aware: within +{max_overflow:.2f}s borrow, no compression")
        elif tts_silences and total_tts_silence > 0:
            # At most 90% of the detected silence may be removed.
            removable = min(total_tts_silence * 0.9, overshoot_vs_cap)
            if removable >= overshoot_vs_cap:
                # Shrinking silences alone closes the gap.
                compression_ratio = 1.0 - (removable / total_tts_silence)
                _compress_silences(tts_path, synced_path, tts_silences, compression_ratio)
                print(f"[s5] pause-aware: compressed silences (ratio {compression_ratio:.2f}, +{max_overflow:.2f}s borrow)")
            else:
                # Not enough silence: squeeze it to the minimum, then speed up the rest.
                _compress_silences(tts_path, synced_path, tts_silences, 0.1)  # keep 10%
                remaining_dur = _get_wav_duration(synced_path)
                speed_factor = remaining_dur / hard_cap if hard_cap > 0 else max_speed
                if speed_factor > max_speed:
                    print(f"[s5] pause-aware: WARNING speed x{speed_factor:.2f} exceeds max, capping at x{max_speed} (will overflow next gap)")
                    speed_factor = max_speed
                print(f"[s5] pause-aware: compressed silences + speedup x{speed_factor:.2f} (+{max_overflow:.2f}s borrow)")
                tmp = synced_path + ".tmp.wav"
                _speedup_audio(synced_path, tmp, speed_factor)
                os.replace(tmp, synced_path)
        else:
            # No silences detected: uniform speedup, with hard_cap as the target.
            speed_factor = tts_duration / hard_cap if hard_cap > 0 else max_speed
            if speed_factor > max_speed:
                print(f"[s5] pause-aware: WARNING speed x{speed_factor:.2f} exceeds max, capping at x{max_speed} (will overflow next gap)")
                speed_factor = max_speed
            print(f"[s5] pause-aware: uniform speedup x{speed_factor:.2f} (no silences, +{max_overflow:.2f}s borrow)")
            _speedup_audio(tts_path, synced_path, speed_factor)
    elif tts_duration < target_duration * 0.98:
        shortfall = target_duration - tts_duration
        if tts_silences and original_pauses:
            # Distribute padding at detected silence positions
            _distribute_padding(tts_path, synced_path, tts_silences, shortfall, target_duration)
            print(f"[s5] pause-aware: distributed {shortfall:.2f}s padding across {len(tts_silences)} pause points")
        else:
            # No pause points: pad at end
            _pad_silence(tts_path, synced_path, target_duration)
            print(f"[s5] pause-aware: padded {shortfall:.2f}s at end (no pause points)")
    else:
        # Within 2% of the target either way: use the clip unchanged.
        shutil.copy(tts_path, synced_path)
def _compress_silences(input_path: str, output_path: str,
                       silences: list[dict], keep_ratio: float) -> None:
    """Write a copy of the WAV in which every listed silence is shortened.

    For each ``{start, end}`` region (seconds) only the leading ``keep_ratio``
    fraction of its samples is retained; audio outside the regions is copied
    through untouched.
    """
    pcm, rate = _read_wav_samples(input_path)
    squeezed = array.array("h")
    cursor = 0  # index of the first sample not yet handled

    for region in silences:
        first = int(region["start"] * rate)
        last = int(region["end"] * rate)
        keep = int((last - first) * keep_ratio)

        # Speech leading up to the silence.
        squeezed += pcm[cursor:first]
        # The part of the silence that survives.
        if keep > 0:
            squeezed += pcm[first:first + keep]
        cursor = last

    # Whatever follows the final silence.
    squeezed += pcm[cursor:]
    _write_wav_samples(squeezed, rate, output_path)
def _distribute_padding(input_path: str, output_path: str,
                        tts_silences: list[dict], shortfall: float,
                        target_duration: float) -> None:
    """Lengthen a WAV by adding an equal share of ``shortfall`` after each silence region.

    The extra zero samples are inserted at the end of every region in
    ``tts_silences``. If the result ends up more than 2% longer than
    ``target_duration`` it is trimmed back to the target.
    """
    pcm, rate = _read_wav_samples(input_path)
    share = shortfall / len(tts_silences)
    filler = array.array("h", [0] * int(share * rate))

    stretched = array.array("h")
    cursor = 0
    for region in tts_silences:
        region_end = int(region["end"] * rate)
        # Everything up to the end of this silence, then the added pause.
        stretched.extend(pcm[cursor:region_end])
        stretched.extend(filler)
        cursor = region_end
    # Tail after the last pause point.
    stretched.extend(pcm[cursor:])

    _write_wav_samples(stretched, rate, output_path)

    # Guard against overshooting the target.
    if len(stretched) / rate > target_duration * 1.02:
        _trim_audio(output_path, output_path, target_duration)
def _generate_silence(output_path: str, duration: float, sample_rate: int = 16000) -> None:
    """Write a mono 16-bit WAV containing ``duration`` seconds of zero samples."""
    frame_total = int(duration * sample_rate)
    with wave.open(output_path, "w") as wav_out:
        # (nchannels, sampwidth, framerate, nframes, comptype, compname)
        wav_out.setparams((1, 2, sample_rate, 0, "NONE", "not compressed"))
        # Two zero bytes per 16-bit sample; a negative count yields no data.
        wav_out.writeframes(bytes(2 * max(frame_total, 0)))
def sync_and_stitch(
    segments: list[dict],
    output_path: str = "tmp/audio/final_audio.wav",
    synced_dir: str = "tmp/audio/tts_synced",
    max_speed: float = 1.8,
) -> str:
    """
    Sync each TTS segment to its original timestamp window and stitch into a single WAV.

    Segments are processed in order. Before each one, silence is inserted to
    bring the playback cursor up to the segment's ``start``. A segment that is
    too long may spill into the silence before the next segment instead of
    being trimmed; the following gap then shrinks to absorb the overflow.

    Args:
        segments: Non-empty list of dicts with {start, end, tts_path}. An
            optional "words" list (more than one entry) enables pause-aware sync.
        output_path: Where to write the final stitched audio.
        synced_dir: Temp directory for per-segment synced WAVs.
        max_speed: Maximum allowed speedup factor (default 1.8x to preserve naturalness).

    Returns:
        Path to the final stitched audio WAV.

    Raises:
        ValueError: If ``segments`` is empty.
        RuntimeError: If the final ffmpeg concat fails.
    """
    import shutil  # not imported at module level in this file

    # Fail early, before creating any directories or files.
    if not segments:
        raise ValueError("sync_and_stitch requires at least one segment")

    Path(synced_dir).mkdir(parents=True, exist_ok=True)
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Detect TTS sample rate from the first segment
    with wave.open(segments[0]["tts_path"], 'r') as f:
        tts_sample_rate = f.getframerate()
    print(f"[s5] TTS sample rate: {tts_sample_rate} Hz")

    concat_list_path = "tmp/concat_list.txt"
    # The list lives at a fixed location that custom output paths do not
    # create, so make sure its directory exists.
    Path(concat_list_path).parent.mkdir(parents=True, exist_ok=True)
    concat_entries = []

    # Track the real wall-clock playback cursor. When a segment overflows its
    # original window, the cursor moves past the segment's nominal end, and the
    # next inter-segment silence shrinks accordingly: overflow is absorbed by
    # the following gap instead of being trimmed off the end of the audio.
    playback_cursor = 0.0

    for i, seg in enumerate(segments):
        start = seg["start"]
        end = seg["end"]
        target_duration = end - start
        tts_path = seg["tts_path"]

        # Fill gap before this segment with silence, but only as much as the
        # cursor is actually behind. If a prior segment overflowed past `start`,
        # `gap` goes negative and we skip the silence (and start slightly late).
        gap = start - playback_cursor
        if gap > 0.01:
            sil_path = os.path.join(synced_dir, f"silence_{i:04d}.wav")
            _generate_silence(sil_path, gap, sample_rate=tts_sample_rate)
            concat_entries.append(sil_path)
            # Advance by what was actually written: the silence is truncated
            # to whole samples, so using `gap` would let the cursor drift
            # ahead of the real audio position.
            playback_cursor += _get_wav_duration(sil_path)
        elif gap < -0.05:
            print(f"[s5] Seg {i}: running {-gap:.2f}s behind original timeline (prior overflow absorbed)")

        # Borrow budget: how much we may overflow `target_duration` without
        # trimming. We can use the silence between this segment's `end` and the
        # next segment's `start`. Last segment has no follower, so 0.
        if i + 1 < len(segments):
            allowed_overflow = max(segments[i + 1]["start"] - end, 0.0)
        else:
            allowed_overflow = 0.0

        tts_duration = _get_wav_duration(tts_path)
        synced_path = os.path.join(synced_dir, f"synced_{i:04d}.wav")
        hard_cap = target_duration + allowed_overflow

        words = seg.get("words")
        if words and len(words) > 1:
            print(f"[s5] Seg {i}: pause-aware sync ({tts_duration:.2f}s -> {target_duration:.2f}s, +{allowed_overflow:.2f}s borrow)")
            _pause_aware_sync(tts_path, synced_path, target_duration, words, max_speed,
                              max_overflow=allowed_overflow)
        elif tts_duration > target_duration * 1.02:
            # Speed up only as far as needed to land within hard_cap; if the
            # required factor exceeds max_speed, cap it and let it overflow.
            # The next gap will shrink to absorb it. Never trim.
            if tts_duration <= hard_cap:
                speed_factor = 1.0
            else:
                speed_factor = tts_duration / hard_cap if hard_cap > 0 else max_speed
                if speed_factor > max_speed:
                    print(f"[s5] Seg {i}: WARNING speed x{speed_factor:.2f} exceeds max, capping at x{max_speed} (will overflow next gap)")
                    speed_factor = max_speed
            if speed_factor > 1.001:
                print(f"[s5] Seg {i}: speeding up x{speed_factor:.2f} (+{allowed_overflow:.2f}s borrow)")
                _speedup_audio(tts_path, synced_path, speed_factor)
            else:
                shutil.copy(tts_path, synced_path)
                print(f"[s5] Seg {i}: within +{allowed_overflow:.2f}s borrow, no speedup")
        elif tts_duration < target_duration * 0.98:
            print(f"[s5] Seg {i}: padding {target_duration - tts_duration:.2f}s silence")
            _pad_silence(tts_path, synced_path, target_duration)
        else:
            # Within 2% of the window: use the clip unchanged.
            shutil.copy(tts_path, synced_path)

        concat_entries.append(synced_path)
        playback_cursor += _get_wav_duration(synced_path)

    # Write concat list for ffmpeg. Paths are single-quoted, so a literal
    # single quote inside a path must be written as '\'' or the list is
    # unparsable.
    with open(concat_list_path, "w", encoding="utf-8") as f:
        for entry in concat_entries:
            abs_entry = os.path.abspath(entry).replace("'", "'\\''")
            f.write(f"file '{abs_entry}'\n")

    # Concatenate all segments (re-encode to normalize sample rates)
    cmd = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", concat_list_path,
        "-ar", str(tts_sample_rate),
        "-ac", "1",
        "-acodec", "pcm_s16le",
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg concat failed:\n{result.stderr}")

    print(f"[s5] Audio sync complete → {output_path} ✓")
    return output_path