# videovoice / tools_api / voice_clone.py
# github-actions[bot]
# deploy: switch to chatterbox requirements @ 4319730
# 5b7cd5f
"""
Voice clone playground — single-engine TTS from a sample + text input.
This Space runs only ONE engine (s4_tts enforces TTS_ENGINE match), so the
endpoint accepts no engine parameter. The frontend is responsible for fanning
out to multiple Spaces when the user wants comparison output.
Long text is split into ~200-char chunks at sentence/word boundaries and
synthesised as multiple segments, then concatenated into one MP3.
"""
from __future__ import annotations
import os
import re
import subprocess
from pathlib import Path
from steps.s4_tts import synthesise_segments
# Audio file extensions (lowercase, with leading dot). Not referenced by any
# code visible in this file — presumably consulted elsewhere to classify
# uploads; TODO confirm it is still needed.
_AUDIO_EXTS = {".wav", ".mp3", ".m4a", ".flac", ".ogg", ".aac"}
def _prepare_sample(sample_path: Path, out_dir: Path) -> Path:
    """Re-encode an uploaded sample (audio or video) as 24 kHz mono 16-bit WAV.

    Per the original author's note, the TTS layer (s4_tts) loads its reference
    through torchaudio/libsndfile, which only handles WAV/FLAC, so every other
    container (MP4, MP3, M4A, ...) is normalised here on behalf of callers.

    Args:
        sample_path: The uploaded file to read.
        out_dir: Directory that receives ``sample_prepared.wav``.

    Returns:
        Path to the re-encoded WAV inside ``out_dir``.

    Raises:
        ValueError: ffmpeg exited with a non-zero status.
    """
    prepared_wav = out_dir / "sample_prepared.wav"

    ffmpeg_args = ["ffmpeg", "-y", "-i", str(sample_path)]
    ffmpeg_args.append("-vn")                   # discard any video stream
    ffmpeg_args.extend(["-ac", "1"])            # downmix to mono
    ffmpeg_args.extend(["-ar", "24000"])        # 24 kHz, the rate the TTS engines are fed
    ffmpeg_args.extend(["-acodec", "pcm_s16le"])
    ffmpeg_args.append(str(prepared_wav))

    proc = subprocess.run(ffmpeg_args, capture_output=True, text=True, timeout=180)
    if proc.returncode == 0:
        return prepared_wav

    raise ValueError(
        "Couldn't read the uploaded sample. Use a clean audio file "
        "(WAV, MP3, M4A) or a video with an audio track."
    )
def _isolate_vocals(prepared_sample: Path, out_dir: Path) -> Path:
    """Return a vocals-only reference WAV, falling back instead of failing.

    Runs the project's source-separation step (``steps.s1b_separate``) on the
    prepared sample so the cloned voice does not pick up music or ambient
    noise, then resamples the isolated vocals to 24 kHz mono for the TTS
    engines.

    This step is best-effort. Every failure mode degrades to the best audio
    available rather than aborting the clone request:

    * separation module not importable      -> ``prepared_sample``
    * separation (or its work dir) fails    -> ``prepared_sample``
    * resample fails, times out, or ffmpeg
      cannot be launched                    -> the un-resampled vocals file

    Args:
        prepared_sample: WAV produced by ``_prepare_sample``.
        out_dir: Working directory; ``separate/`` and ``vocals_24k.wav`` are
            created inside it.

    Returns:
        Path to the audio file that should be used as the TTS reference.
    """
    try:
        from steps.s1b_separate import separate_audio
    except ImportError as e:
        print(f"[voice_clone] Demucs unavailable, skipping vocal isolation: {e}")
        return prepared_sample

    separate_dir = out_dir / "separate"
    try:
        # Directory creation is inside the guarded region so a filesystem
        # error also falls back instead of escaping this best-effort step.
        separate_dir.mkdir(parents=True, exist_ok=True)
        vocals_16k_path, _accompaniment = separate_audio(
            str(prepared_sample), str(separate_dir)
        )
    except Exception as e:  # deliberate catch-all: isolation is optional
        print(f"[voice_clone] Demucs separation failed, using raw sample: {e}")
        return prepared_sample

    # Resample the separated vocals (16 kHz mono according to the original
    # author's note) to 24 kHz mono for the TTS engines.
    vocals_24k = out_dir / "vocals_24k.wav"
    cmd = [
        "ffmpeg", "-y", "-i", str(vocals_16k_path),
        "-ac", "1", "-ar", "24000",
        "-acodec", "pcm_s16le",
        str(vocals_24k),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except (subprocess.TimeoutExpired, OSError) as e:
        # subprocess.run raises on timeout / launch failure instead of
        # returning a non-zero code; honour the documented fallback here too.
        print(f"[voice_clone] Vocals resample failed, using 16kHz: {e}")
        return Path(vocals_16k_path)

    if result.returncode != 0:
        print(f"[voice_clone] Vocals resample failed, using 16kHz: {result.stderr[-200:]}")
        return Path(vocals_16k_path)
    return vocals_24k
CHUNK_TARGET_CHARS = 200
CHUNK_HARD_MAX = 280  # under chatterbox's 300-char per-segment ceiling


def _split_text(text: str) -> list[str]:
    """Split text into TTS-sized chunks, none longer than CHUNK_HARD_MAX.

    Strategy, in order of preference:

    1. Text that already fits in CHUNK_HARD_MAX is returned as one chunk.
    2. Otherwise sentences (split after ``.``, ``!`` or ``?`` followed by
       whitespace) are packed greedily up to CHUNK_TARGET_CHARS.
    3. A sentence longer than CHUNK_HARD_MAX is packed word by word up to
       CHUNK_HARD_MAX.
    4. A single whitespace-free token longer than CHUNK_HARD_MAX (a long URL,
       or text in a script written without spaces) is hard-sliced into
       CHUNK_HARD_MAX-sized pieces, so the per-segment ceiling is never
       exceeded.

    Args:
        text: Raw user text; surrounding whitespace is ignored.

    Returns:
        Chunks in reading order. Empty list for empty/blank input.
    """
    text = text.strip()
    if not text:
        return []
    if len(text) <= CHUNK_HARD_MAX:
        return [text]

    chunks: list[str] = []
    current = ""
    for sent in re.split(r"(?<=[.!?])\s+", text):
        if not sent.strip():
            continue

        # Greedy packing: keep appending sentences while under the soft target.
        if len(current) + 1 + len(sent) <= CHUNK_TARGET_CHARS:
            current = f"{current} {sent}".strip() if current else sent
            continue

        if current:
            chunks.append(current)

        if len(sent) <= CHUNK_HARD_MAX:
            current = sent
            continue

        # The sentence alone exceeds the hard limit: pack it word by word.
        buf = ""
        for word in sent.split():
            # Last resort for a token that can never fit: slice it by
            # character count (this may cut mid-word, but keeps every chunk
            # within the engine's limit).
            while len(word) > CHUNK_HARD_MAX:
                if buf:
                    chunks.append(buf)
                    buf = ""
                chunks.append(word[:CHUNK_HARD_MAX])
                word = word[CHUNK_HARD_MAX:]

            if buf and len(buf) + 1 + len(word) > CHUNK_HARD_MAX:
                chunks.append(buf)
                buf = word
            else:
                buf = f"{buf} {word}".strip() if buf else word

        # Leftover words seed the next chunk so later sentences can join them.
        current = buf

    if current:
        chunks.append(current)
    return chunks
def _build_segments(chunks: list[str], chunk_secs: float = 8.0) -> list[dict]:
    """Wrap text chunks in segment dicts with back-to-back synthetic timings.

    Each chunk gets a consecutive ``chunk_secs``-long window starting at 0.0,
    and its text is stored under ``text``, ``translated_text`` and
    ``tts_text`` alike — the shape handed to ``synthesise_segments``.

    The window is intentionally generous: according to the original author's
    note, downstream trimming (``_trim_to_duration``, not visible here) allows
    about 1.4x the window, i.e. roughly 11 s of audio for the default 8 s.

    Args:
        chunks: Text chunks in reading order.
        chunk_secs: Length of the fake timing window given to each chunk.

    Returns:
        One dict per chunk, in the same order.
    """
    segments: list[dict] = []
    window_start = 0.0
    for chunk_text in chunks:
        window_end = window_start + chunk_secs
        segments.append(
            dict(
                start=window_start,
                end=window_end,
                text=chunk_text,
                translated_text=chunk_text,
                tts_text=chunk_text,
            )
        )
        # The next window begins exactly where this one ends.
        window_start = window_end
    return segments
def _concat_wavs_to_mp3(wav_paths: list[Path], dest: Path) -> Path:
    """Join WAV files in order and encode the result as a 192 kbps MP3.

    A single input is encoded directly. Several inputs go through ffmpeg's
    concat demuxer, driven by a temporary list file written next to ``dest``
    (same name, ``.txt`` suffix) that is removed afterwards.

    Args:
        wav_paths: WAV files in playback order.
        dest: Output MP3 path.

    Returns:
        ``dest``.

    Raises:
        RuntimeError: ``wav_paths`` is empty, or ffmpeg exits non-zero.
    """
    if not wav_paths:
        raise RuntimeError("No TTS chunks to concatenate.")

    if len(wav_paths) == 1:
        cmd = [
            "ffmpeg", "-y", "-i", str(wav_paths[0]),
            "-codec:a", "libmp3lame", "-b:a", "192k",
            str(dest),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            raise RuntimeError(f"ffmpeg encode failed: {result.stderr[-300:]}")
        return dest

    list_lines = []
    for wav in wav_paths:
        # The concat demuxer resolves relative entries against the list
        # file's directory, not the process CWD, so write absolute paths.
        absolute = Path(wav).resolve().as_posix()
        # Inside single quotes ffmpeg takes everything literally; a quote in
        # the path must be written as '\'' (close, escaped quote, reopen).
        escaped = absolute.replace("'", "'\\''")
        list_lines.append(f"file '{escaped}'")

    list_file = dest.with_suffix(".txt")
    list_file.write_text("\n".join(list_lines), encoding="utf-8")
    cmd = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", str(list_file),
        "-codec:a", "libmp3lame", "-b:a", "192k",
        str(dest),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
    finally:
        # Remove the list file even when ffmpeg times out or cannot start.
        list_file.unlink(missing_ok=True)

    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg concat failed: {result.stderr[-300:]}")
    return dest
def clone_voice(
    *,
    sample_path: Path,
    text: str,
    out_dir: Path,
    language_id: str = "en",
) -> dict:
    """Synthesise ``text`` in the voice of ``sample_path`` and write an MP3.

    Pipeline: split the text into chunks, normalise the sample to WAV,
    isolate vocals (best-effort), synthesise every chunk with the configured
    engine, then concatenate the per-chunk WAVs into ``out_dir/voice.mp3``.

    Args:
        sample_path: Uploaded voice sample (audio, or video with audio).
        text: Text to speak; must be non-blank.
        out_dir: Working/output directory. Created if it does not exist.
        language_id: Language code forwarded to ``synthesise_segments``.

    Returns:
        {
            "filename": "voice.mp3",
            "engine": <current TTS_ENGINE, lower-cased; "chatterbox" if unset>,
            "chunks": <number of text chunks the input was split into>,
        }

    Raises:
        ValueError: ``text`` is empty/blank, or the sample cannot be decoded.
        RuntimeError: TTS yielded no result or no audio paths, or the final
            encode failed.
    """
    text = (text or "").strip()
    if not text:
        raise ValueError("Text is required.")

    chunks = _split_text(text)
    segments = _build_segments(chunks)

    # ffmpeg writes straight into out_dir; create it first so a missing
    # directory is not misreported as an unreadable sample.
    out_dir.mkdir(parents=True, exist_ok=True)

    # Normalise the sample (handles video, mp3, m4a, etc.) → 24kHz mono WAV
    prepared_sample = _prepare_sample(sample_path, out_dir)

    # Source separation → isolate vocals so the clone doesn't pick up
    # background music or ambient noise. Falls back to the raw sample.
    reference_for_tts = _isolate_vocals(prepared_sample, out_dir)

    seg_out_dir = out_dir / "tts"
    seg_out_dir.mkdir(parents=True, exist_ok=True)

    # synthesise_segments is a generator; only the message carrying the
    # "__TTS_RESULT__" key is needed, everything else it yields is ignored.
    tts_result = None
    for msg in synthesise_segments(
        segments=segments,
        reference_audio_path=str(reference_for_tts),
        language_id=language_id,
        output_dir=str(seg_out_dir),
    ):
        if isinstance(msg, dict) and "__TTS_RESULT__" in msg:
            tts_result = msg["__TTS_RESULT__"]

    if not tts_result:
        raise RuntimeError("TTS produced no output.")

    wav_paths = [Path(seg["tts_path"]) for seg in tts_result if seg.get("tts_path")]
    if not wav_paths:
        raise RuntimeError("TTS result missing audio paths.")

    # Segments without audio are still skipped (best-effort), but no longer
    # silently: the resulting MP3 will be missing part of the text.
    skipped = len(tts_result) - len(wav_paths)
    if skipped:
        print(
            f"[voice_clone] {skipped} of {len(tts_result)} TTS segments have "
            "no audio and were skipped; output is incomplete."
        )

    mp3_path = _concat_wavs_to_mp3(wav_paths, out_dir / "voice.mp3")

    return {
        "filename": mp3_path.name,
        "engine": os.getenv("TTS_ENGINE", "chatterbox").lower(),
        "chunks": len(chunks),
    }