Spaces:
Running on Zero
Running on Zero
File size: 8,236 Bytes
"""
Voice clone playground — single-engine TTS from a sample + text input.
This Space runs only ONE engine (s4_tts enforces TTS_ENGINE match), so the
endpoint accepts no engine parameter. The frontend is responsible for fanning
out to multiple Spaces when the user wants comparison output.
Long text is split into ~200-char chunks at sentence/word boundaries and
synthesised as multiple segments, then concatenated into one MP3.
"""
from __future__ import annotations
import os
import re
import subprocess
from pathlib import Path
from steps.s4_tts import synthesise_segments
# Lower-case file extensions (with the leading dot); presumably the uploads
# that count as plain audio rather than video.
# NOTE(review): nothing in the visible part of this file reads this set —
# confirm it is still used before relying on it.
_AUDIO_EXTS = {".wav", ".mp3", ".m4a", ".flac", ".ogg", ".aac"}
def _prepare_sample(sample_path: Path, out_dir: Path) -> Path:
"""Convert any uploaded sample (audio or video) to a clean 24kHz mono WAV.
TTS internals (s4_tts) call torchaudio.load via libsndfile, which only
understands WAV/FLAC. Anything else β including MP4 video, MP3, M4A β
has to be re-encoded first. We do this here so callers don't need to.
"""
out = out_dir / "sample_prepared.wav"
cmd = [
"ffmpeg", "-y", "-i", str(sample_path),
"-vn", # drop video stream if present
"-ac", "1", # mono
"-ar", "24000", # 24kHz β sweet spot for the TTS engines
"-acodec", "pcm_s16le",
str(out),
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
if result.returncode != 0:
raise ValueError(
"Couldn't read the uploaded sample. Use a clean audio file "
"(WAV, MP3, M4A) or a video with an audio track."
)
return out
def _isolate_vocals(prepared_sample: Path, out_dir: Path) -> Path:
"""Run Demucs source separation on the prepared sample and return a
vocals-only WAV (24kHz mono) suitable for TTS reference.
Mirrors what the dub pipeline (steps.s1b_separate) does so cloned voice
doesn't pick up music / ambient noise from the uploaded sample. Falls back
to the raw prepared sample if separation fails (model missing, oom, etc.)
rather than failing the whole clone request.
"""
try:
from steps.s1b_separate import separate_audio
except ImportError as e:
print(f"[voice_clone] Demucs unavailable, skipping vocal isolation: {e}")
return prepared_sample
separate_dir = out_dir / "separate"
separate_dir.mkdir(parents=True, exist_ok=True)
try:
vocals_16k_path, _accompaniment = separate_audio(str(prepared_sample), str(separate_dir))
except Exception as e:
print(f"[voice_clone] Demucs separation failed, using raw sample: {e}")
return prepared_sample
# Resample vocals from 16 kHz mono β 24 kHz mono for the TTS engines
vocals_24k = out_dir / "vocals_24k.wav"
cmd = [
"ffmpeg", "-y", "-i", vocals_16k_path,
"-ac", "1", "-ar", "24000",
"-acodec", "pcm_s16le",
str(vocals_24k),
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
print(f"[voice_clone] Vocals resample failed, using 16kHz: {result.stderr[-200:]}")
return Path(vocals_16k_path)
return vocals_24k
CHUNK_TARGET_CHARS = 200
CHUNK_HARD_MAX = 280 # under chatterbox's 300-char per-segment ceiling
def _split_text(text: str) -> list[str]:
"""Split into chunks of ~CHUNK_TARGET_CHARS at sentence then word boundaries."""
text = text.strip()
if not text:
return []
if len(text) <= CHUNK_HARD_MAX:
return [text]
# First pass: sentence boundaries
sentences = re.split(r"(?<=[.!?])\s+", text)
chunks: list[str] = []
current = ""
for sent in sentences:
if not sent.strip():
continue
if len(current) + 1 + len(sent) <= CHUNK_TARGET_CHARS:
current = f"{current} {sent}".strip() if current else sent
else:
if current:
chunks.append(current)
# Sentence itself may exceed target β break it on words
if len(sent) > CHUNK_HARD_MAX:
words = sent.split()
buf = ""
for w in words:
if len(buf) + 1 + len(w) > CHUNK_HARD_MAX:
if buf:
chunks.append(buf)
buf = w
else:
buf = f"{buf} {w}".strip() if buf else w
if buf:
current = buf
else:
current = ""
else:
current = sent
if current:
chunks.append(current)
return chunks
def _build_segments(chunks: list[str], chunk_secs: float = 8.0) -> list[dict]:
"""Construct segment dicts for synthesise_segments β fake timing windows."""
segs = []
cursor = 0.0
for text in chunks:
# Allocate a generous window so _trim_to_duration doesn't clip output.
# Headroom is 1.4Γ so 8s window allows up to ~11s of audio per chunk.
segs.append({
"start": cursor,
"end": cursor + chunk_secs,
"text": text,
"translated_text": text,
"tts_text": text,
})
cursor += chunk_secs
return segs
def _concat_wavs_to_mp3(wav_paths: list[Path], dest: Path) -> Path:
"""Concat in order via ffmpeg concat demuxer, then encode MP3."""
if not wav_paths:
raise RuntimeError("No TTS chunks to concatenate.")
if len(wav_paths) == 1:
cmd = [
"ffmpeg", "-y", "-i", str(wav_paths[0]),
"-codec:a", "libmp3lame", "-b:a", "192k",
str(dest),
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg encode failed: {result.stderr[-300:]}")
return dest
list_file = dest.with_suffix(".txt")
list_file.write_text(
"\n".join(f"file '{p.as_posix()}'" for p in wav_paths),
encoding="utf-8",
)
cmd = [
"ffmpeg", "-y",
"-f", "concat", "-safe", "0",
"-i", str(list_file),
"-codec:a", "libmp3lame", "-b:a", "192k",
str(dest),
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
list_file.unlink(missing_ok=True)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg concat failed: {result.stderr[-300:]}")
return dest
def clone_voice(
    *,
    sample_path: Path,
    text: str,
    out_dir: Path,
    language_id: str = "en",
) -> dict:
    """Synthesise ``text`` in the voice heard in ``sample_path``.

    Steps: split the text into chunks, give each chunk a synthetic timing
    window, convert the sample to WAV, try to isolate its vocals, run
    ``synthesise_segments`` over the chunks, then join the per-chunk WAVs
    into ``out_dir / "voice.mp3"``.

    Args:
        sample_path: Uploaded reference recording (audio or video).
        text: Text to speak; must contain non-whitespace characters.
        out_dir: Working/output directory for this request.
        language_id: Language code forwarded to the TTS step.

    Returns:
        {
            "filename": "voice.mp3",
            "engine": <TTS_ENGINE env var, lower-cased; "chatterbox" if unset>,
            "chunks": <number of text chunks>,
        }

    Raises:
        ValueError: ``text`` is empty/blank, or the sample cannot be converted.
        RuntimeError: TTS yielded no result, the result carries no audio
            paths, or the final encode failed.
    """
    cleaned = (text or "").strip()
    if len(cleaned) == 0:
        raise ValueError("Text is required.")

    pieces = _split_text(cleaned)
    windows = _build_segments(pieces)

    # Normalise the upload (video, mp3, m4a, ...) to a 24 kHz mono WAV, then
    # isolate the vocals so background music or ambient noise is not cloned.
    normalised = _prepare_sample(sample_path, out_dir)
    reference = _isolate_vocals(normalised, out_dir)

    tts_dir = out_dir / "tts"
    tts_dir.mkdir(parents=True, exist_ok=True)

    messages = synthesise_segments(
        segments=windows,
        reference_audio_path=str(reference),
        language_id=language_id,
        output_dir=str(tts_dir),
    )
    # Drain the whole stream; only dict messages carrying the result key
    # matter, and the last such message wins.
    final_result = None
    for message in messages:
        if not isinstance(message, dict):
            continue
        if "__TTS_RESULT__" not in message:
            continue
        final_result = message["__TTS_RESULT__"]
    if not final_result:
        raise RuntimeError("TTS produced no output.")

    rendered = []
    for entry in final_result:
        wav = entry.get("tts_path")
        if wav:
            rendered.append(Path(wav))
    if not rendered:
        raise RuntimeError("TTS result missing audio paths.")

    encoded = _concat_wavs_to_mp3(rendered, out_dir / "voice.mp3")
    engine = os.getenv("TTS_ENGINE", "chatterbox").lower()
    return {
        "filename": encoded.name,
        "engine": engine,
        "chunks": len(pieces),
    }
|