Spaces:
Running on Zero
Running on Zero
| """ | |
| Voice clone playground — single-engine TTS from a sample + text input. | |
| This Space runs only ONE engine (s4_tts enforces TTS_ENGINE match), so the | |
| endpoint accepts no engine parameter. The frontend is responsible for fanning | |
| out to multiple Spaces when the user wants comparison output. | |
| Long text is split into ~200-char chunks at sentence/word boundaries and | |
| synthesised as multiple segments, then concatenated into one MP3. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import re | |
| import subprocess | |
| from pathlib import Path | |
| from steps.s4_tts import synthesise_segments | |
# File extensions treated as plain-audio containers.
# NOTE(review): not referenced anywhere in the visible part of this module —
# confirm whether another module imports it before removing.
_AUDIO_EXTS = {
    ".wav",
    ".mp3",
    ".m4a",
    ".flac",
    ".ogg",
    ".aac",
}
def _prepare_sample(sample_path: Path, out_dir: Path) -> Path:
    """Convert any uploaded sample (audio or video) to a clean 24kHz mono WAV.

    Per the original author's note, the TTS internals (s4_tts) load the
    reference through torchaudio/libsndfile, which only understands WAV/FLAC,
    so anything else — MP4 video, MP3, M4A — has to be re-encoded first. Doing
    it here means callers don't need to.

    Args:
        sample_path: The uploaded audio or video file.
        out_dir: Directory that receives ``sample_prepared.wav``. It is
            created if it does not exist yet.

    Returns:
        Path to the converted 16-bit PCM, mono, 24 kHz WAV inside ``out_dir``.

    Raises:
        ValueError: ffmpeg exited with a non-zero status (unreadable file, no
            audio track, ...). The message is safe to show to the end user.
        subprocess.TimeoutExpired: The conversion took longer than 180 s.
    """
    # ffmpeg never creates missing parent directories; without this a fresh
    # out_dir would be reported to the user as an unreadable sample.
    out_dir.mkdir(parents=True, exist_ok=True)

    out = out_dir / "sample_prepared.wav"
    cmd = [
        "ffmpeg", "-y", "-i", str(sample_path),
        "-vn",                   # drop video stream if present
        "-ac", "1",              # mono
        "-ar", "24000",          # 24kHz — sweet spot for the TTS engines
        "-acodec", "pcm_s16le",
        str(out),
    ]
    # errors="replace": ffmpeg echoes file names/metadata that may not be
    # valid in the locale encoding; a decode error must not mask the result.
    result = subprocess.run(
        cmd, capture_output=True, text=True, errors="replace", timeout=180,
    )
    if result.returncode != 0:
        # Keep the technical detail in the server log; the user only gets the
        # friendly message below.
        print(f"[voice_clone] Sample conversion failed: {result.stderr[-300:]}")
        raise ValueError(
            "Couldn't read the uploaded sample. Use a clean audio file "
            "(WAV, MP3, M4A) or a video with an audio track."
        )
    return out
def _isolate_vocals(prepared_sample: Path, out_dir: Path) -> Path:
    """Run Demucs source separation on the prepared sample and return a
    vocals-only WAV suitable as the TTS reference.

    Mirrors what the dub pipeline (steps.s1b_separate) does so the cloned
    voice doesn't pick up music / ambient noise from the uploaded sample.
    This step is strictly best-effort: every failure falls back to a usable
    file instead of failing the whole clone request.

    Args:
        prepared_sample: The normalised WAV produced by the preparation step.
        out_dir: Working directory; ``separate/`` and ``vocals_24k.wav`` are
            written below it.

    Returns:
        * ``out_dir / "vocals_24k.wav"`` when separation and resampling work;
        * the separator's own vocals file when only the resample step fails
          (non-zero exit, timeout, or ffmpeg not runnable);
        * ``prepared_sample`` unchanged when the separator cannot be imported
          or separation itself fails.
    """
    try:
        from steps.s1b_separate import separate_audio
    except ImportError as e:
        print(f"[voice_clone] Demucs unavailable, skipping vocal isolation: {e}")
        return prepared_sample

    separate_dir = out_dir / "separate"
    try:
        # Directory creation is part of the best-effort step too: an OSError
        # here should degrade to the raw sample, not abort the request.
        separate_dir.mkdir(parents=True, exist_ok=True)
        vocals_16k_path, _accompaniment = separate_audio(
            str(prepared_sample), str(separate_dir)
        )
    except Exception as e:  # deliberate catch-all: model missing, OOM, etc.
        print(f"[voice_clone] Demucs separation failed, using raw sample: {e}")
        return prepared_sample

    # Resample vocals to 24 kHz mono for the TTS engines. The separator's
    # output is presumably 16 kHz mono (per the original naming) — verify
    # against steps.s1b_separate.
    vocals_24k = out_dir / "vocals_24k.wav"
    cmd = [
        "ffmpeg", "-y", "-i", str(vocals_16k_path),
        "-ac", "1", "-ar", "24000",
        "-acodec", "pcm_s16le",
        str(vocals_24k),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except (subprocess.TimeoutExpired, OSError) as e:
        # A hung or missing ffmpeg must not defeat the fallback contract.
        print(f"[voice_clone] Vocals resample failed, using 16kHz: {e}")
        return Path(vocals_16k_path)
    if result.returncode != 0:
        print(f"[voice_clone] Vocals resample failed, using 16kHz: {result.stderr[-200:]}")
        return Path(vocals_16k_path)
    return vocals_24k
CHUNK_TARGET_CHARS = 200
CHUNK_HARD_MAX = 280  # under chatterbox's 300-char per-segment ceiling


def _split_text(text: str) -> list[str]:
    """Split text into chunks of ~CHUNK_TARGET_CHARS, never above CHUNK_HARD_MAX.

    Strategy, in order of preference:

    1. Text that already fits in CHUNK_HARD_MAX is returned as a single chunk.
    2. Otherwise the text is cut after ``.``, ``!`` or ``?`` followed by
       whitespace, and consecutive sentences are packed together (joined by a
       single space) while the result stays within CHUNK_TARGET_CHARS.
    3. A sentence longer than CHUNK_HARD_MAX is re-packed word by word up to
       CHUNK_HARD_MAX.
    4. A single whitespace-free token longer than CHUNK_HARD_MAX (long URL,
       text in a script that does not use spaces) is hard-sliced into
       CHUNK_HARD_MAX-sized pieces so no chunk can exceed the limit.

    A sentence between the target and the hard maximum is kept intact as its
    own chunk.

    Args:
        text: Raw user text; surrounding whitespace is ignored.

    Returns:
        Non-empty chunks in reading order; ``[]`` for empty/blank input.
    """
    text = text.strip()
    if not text:
        return []
    if len(text) <= CHUNK_HARD_MAX:
        return [text]

    chunks: list[str] = []
    current = ""
    # First pass: sentence boundaries.
    for sent in re.split(r"(?<=[.!?])\s+", text):
        if not sent.strip():
            continue

        if len(current) + 1 + len(sent) <= CHUNK_TARGET_CHARS:
            current = f"{current} {sent}".strip() if current else sent
            continue

        # The sentence does not fit: close the chunk being built.
        if current:
            chunks.append(current)

        if len(sent) <= CHUNK_HARD_MAX:
            current = sent
            continue

        # Sentence itself exceeds the hard limit — break it on words.
        buf = ""
        for word in sent.split():
            # A word longer than the limit is sliced; normal words yield
            # exactly one piece (themselves).
            for start in range(0, len(word), CHUNK_HARD_MAX):
                piece = word[start:start + CHUNK_HARD_MAX]
                if buf and len(buf) + 1 + len(piece) > CHUNK_HARD_MAX:
                    chunks.append(buf)
                    buf = piece
                else:
                    buf = f"{buf} {piece}" if buf else piece
        # The tail of the long sentence may still absorb following sentences.
        current = buf

    if current:
        chunks.append(current)
    return chunks
def _build_segments(chunks: list[str], chunk_secs: float = 8.0) -> list[dict]:
    """Wrap text chunks in the segment dicts expected by synthesise_segments.

    The timing windows are synthetic: chunk *i* is given a back-to-back slot
    of ``chunk_secs`` seconds. The same string is stored under ``text``,
    ``translated_text`` and ``tts_text``.

    Per the original author's note, the window is deliberately generous so
    the downstream duration trimming (with its ~1.4x headroom, i.e. roughly
    11 s for an 8 s window) does not clip the audio — that behaviour lives in
    the TTS step and is not visible from here.

    Args:
        chunks: Text chunks in playback order.
        chunk_secs: Length of each fake timing window, in seconds.

    Returns:
        One dict per chunk with keys ``start``, ``end``, ``text``,
        ``translated_text`` and ``tts_text``.
    """

    def _windows():
        window_start = 0.0
        for chunk_text in chunks:
            window_end = window_start + chunk_secs
            yield {
                "start": window_start,
                "end": window_end,
                "text": chunk_text,
                "translated_text": chunk_text,
                "tts_text": chunk_text,
            }
            # Next slot begins exactly where this one ended.
            window_start = window_end

    return list(_windows())
def _concat_wavs_to_mp3(wav_paths: list[Path], dest: Path) -> Path:
    """Join WAV files in the given order and encode the result as one MP3.

    A single input is encoded directly. Several inputs go through ffmpeg's
    concat demuxer, driven by a temporary list file written next to ``dest``
    (same name, ``.txt`` suffix) and removed again afterwards.

    Args:
        wav_paths: WAV files in playback order.
        dest: Path of the MP3 to write (192 kbit/s, libmp3lame).

    Returns:
        ``dest``.

    Raises:
        RuntimeError: ``wav_paths`` is empty, or ffmpeg exited non-zero (the
            message carries the tail of ffmpeg's stderr).
        subprocess.TimeoutExpired: ffmpeg exceeded its time budget.
    """
    if not wav_paths:
        raise RuntimeError("No TTS chunks to concatenate.")

    # Encoder settings shared by both code paths.
    encode_args = ["-codec:a", "libmp3lame", "-b:a", "192k", str(dest)]

    if len(wav_paths) == 1:
        cmd = ["ffmpeg", "-y", "-i", str(wav_paths[0]), *encode_args]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            raise RuntimeError(f"ffmpeg encode failed: {result.stderr[-300:]}")
        return dest

    entries = []
    for wav in wav_paths:
        # The concat demuxer resolves relative entries against the list
        # file's directory, not the process CWD, so always write absolute
        # paths. Inside single quotes a literal ' must be written as '\''.
        absolute = Path(wav).resolve().as_posix()
        escaped = absolute.replace("'", "'\\''")
        entries.append(f"file '{escaped}'")

    list_file = dest.with_suffix(".txt")
    list_file.write_text("\n".join(entries), encoding="utf-8")
    cmd = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", str(list_file),
        *encode_args,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
    finally:
        # Clean up even when ffmpeg times out or cannot be started.
        list_file.unlink(missing_ok=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg concat failed: {result.stderr[-300:]}")
    return dest
def clone_voice(
    *,
    sample_path: Path,
    text: str,
    out_dir: Path,
    language_id: str = "en",
) -> dict:
    """
    Run TTS on `text` using the voice from `sample_path`.

    Steps: validate the text, split it into chunks, normalise the sample to
    a WAV, try to isolate the vocals, synthesise every chunk, then join the
    chunk audio into ``out_dir / "voice.mp3"``.

    Args:
        sample_path: Uploaded voice sample (audio or video).
        text: Text to speak; must contain non-whitespace characters.
        out_dir: Working/output directory. Created if missing.
        language_id: Language code forwarded to the TTS step.

    Returns:
        {
            "filename": "voice.mp3",
            "engine": <current TTS_ENGINE>,
            "chunks": <int>,
        }
        ``chunks`` is the number of text chunks that were requested.

    Raises:
        ValueError: `text` is empty, or the sample could not be decoded.
        RuntimeError: TTS yielded no result / no audio paths, or the final
            MP3 encoding failed.
    """
    text = (text or "").strip()
    if not text:
        raise ValueError("Text is required.")

    # Everything below writes into out_dir; make sure it exists before the
    # first ffmpeg call, which would otherwise fail with a misleading
    # "couldn't read the sample" error.
    out_dir.mkdir(parents=True, exist_ok=True)

    chunks = _split_text(text)
    segments = _build_segments(chunks)

    # Normalise the sample (handles video, mp3, m4a, etc.) → 24kHz mono WAV
    prepared_sample = _prepare_sample(sample_path, out_dir)

    # Demucs source separation → isolate vocals so the clone doesn't pick up
    # background music or ambient noise. Same step the dub pipeline uses.
    reference_for_tts = _isolate_vocals(prepared_sample, out_dir)

    seg_out_dir = out_dir / "tts"
    seg_out_dir.mkdir(parents=True, exist_ok=True)

    # synthesise_segments is a generator; only the message carrying the
    # "__TTS_RESULT__" key matters here (the last one wins), everything else
    # it yields is ignored.
    tts_result = None
    for msg in synthesise_segments(
        segments=segments,
        reference_audio_path=str(reference_for_tts),
        language_id=language_id,
        output_dir=str(seg_out_dir),
    ):
        if isinstance(msg, dict) and "__TTS_RESULT__" in msg:
            tts_result = msg["__TTS_RESULT__"]
    if not tts_result:
        raise RuntimeError("TTS produced no output.")

    wav_paths = [Path(seg["tts_path"]) for seg in tts_result if seg.get("tts_path")]
    if not wav_paths:
        raise RuntimeError("TTS result missing audio paths.")

    # Segments without audio are tolerated, but leave a trace: the final MP3
    # will be missing the corresponding text.
    skipped = len(tts_result) - len(wav_paths)
    if skipped:
        print(f"[voice_clone] {skipped} TTS segment(s) had no audio and were skipped")

    mp3_path = _concat_wavs_to_mp3(wav_paths, out_dir / "voice.mp3")
    return {
        "filename": mp3_path.name,
        "engine": os.getenv("TTS_ENGINE", "chatterbox").lower(),
        "chunks": len(chunks),
    }