"""
Voice clone playground — single-engine TTS from a sample + text input.

This Space runs only ONE engine (s4_tts enforces TTS_ENGINE match), so the
endpoint accepts no engine parameter. The frontend is responsible for fanning
out to multiple Spaces when the user wants comparison output.

Long text is split into ~200-char chunks at sentence/word boundaries and
synthesised as multiple segments, then concatenated into one MP3.
"""
from __future__ import annotations

import os
import re
import subprocess
from pathlib import Path

from steps.s4_tts import synthesise_segments

_AUDIO_EXTS = {".wav", ".mp3", ".m4a", ".flac", ".ogg", ".aac"}


def _prepare_sample(sample_path: Path, out_dir: Path) -> Path:
    """Convert any uploaded sample (audio or video) to a clean 24kHz mono WAV.

    TTS internals (s4_tts) call torchaudio.load via libsndfile, which only
    understands WAV/FLAC. Anything else — including MP4 video, MP3, M4A — has
    to be re-encoded first. We do this here so callers don't need to.

    Args:
        sample_path: Uploaded audio or video file.
        out_dir: Directory that receives ``sample_prepared.wav``; created if
            it does not exist yet.

    Returns:
        Path to the re-encoded WAV inside ``out_dir``.

    Raises:
        ValueError: If ffmpeg exits with a non-zero status.
    """
    # Without this, a missing output directory makes ffmpeg fail and the user
    # gets the misleading "couldn't read the sample" message below.
    out_dir.mkdir(parents=True, exist_ok=True)
    out = out_dir / "sample_prepared.wav"
    cmd = [
        "ffmpeg", "-y", "-i", str(sample_path),
        "-vn",                    # drop video stream if present
        "-ac", "1",               # mono
        "-ar", "24000",           # 24kHz — sweet spot for the TTS engines
        "-acodec", "pcm_s16le",
        str(out),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
    if result.returncode != 0:
        # Keep the user-facing message generic, but leave the real reason in
        # the server log so failures can be diagnosed.
        print(f"[voice_clone] Sample transcode failed: {result.stderr[-300:]}")
        raise ValueError(
            "Couldn't read the uploaded sample. Use a clean audio file "
            "(WAV, MP3, M4A) or a video with an audio track."
        )
    return out


def _isolate_vocals(prepared_sample: Path, out_dir: Path) -> Path:
    """Run Demucs source separation on the prepared sample and return a
    vocals-only WAV (24kHz mono) suitable for TTS reference.

    Mirrors what the dub pipeline (steps.s1b_separate) does so cloned voice
    doesn't pick up music / ambient noise from the uploaded sample.

    Falls back to the raw prepared sample if separation fails (model missing,
    oom, etc.) rather than failing the whole clone request. If only the final
    resample step fails (non-zero exit, timeout, ffmpeg not launchable), the
    16 kHz vocals file is returned instead.

    Args:
        prepared_sample: WAV produced by ``_prepare_sample``.
        out_dir: Working directory for separation and resample outputs.

    Returns:
        Path to the best available reference audio.
    """
    try:
        from steps.s1b_separate import separate_audio
    except ImportError as e:
        print(f"[voice_clone] Demucs unavailable, skipping vocal isolation: {e}")
        return prepared_sample

    separate_dir = out_dir / "separate"
    separate_dir.mkdir(parents=True, exist_ok=True)
    try:
        vocals_16k_path, _accompaniment = separate_audio(
            str(prepared_sample), str(separate_dir)
        )
    except Exception as e:
        # Deliberately broad: any separation failure degrades to the raw
        # sample instead of aborting the clone request.
        print(f"[voice_clone] Demucs separation failed, using raw sample: {e}")
        return prepared_sample

    # Resample vocals from 16 kHz mono → 24 kHz mono for the TTS engines
    vocals_24k = out_dir / "vocals_24k.wav"
    cmd = [
        "ffmpeg", "-y", "-i", str(vocals_16k_path),
        "-ac", "1", "-ar", "24000", "-acodec", "pcm_s16le",
        str(vocals_24k),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except (subprocess.TimeoutExpired, OSError) as e:
        # Honour the best-effort contract: a hung or unlaunchable ffmpeg must
        # not fail the whole request when usable vocals already exist.
        print(f"[voice_clone] Vocals resample failed, using 16kHz: {e}")
        return Path(vocals_16k_path)
    if result.returncode != 0:
        print(f"[voice_clone] Vocals resample failed, using 16kHz: {result.stderr[-200:]}")
        return Path(vocals_16k_path)
    return vocals_24k


CHUNK_TARGET_CHARS = 200
CHUNK_HARD_MAX = 280  # under chatterbox's 300-char per-segment ceiling

# Split point: whitespace that follows sentence-ending punctuation.
_SENTENCE_BOUNDARY = re.compile(r"(?<=[.!?])\s+")


def _split_text(text: str) -> list[str]:
    """Split into chunks of ~CHUNK_TARGET_CHARS at sentence then word boundaries.

    Text that already fits within CHUNK_HARD_MAX is returned as a single
    chunk. Otherwise sentences are packed greedily up to CHUNK_TARGET_CHARS;
    a sentence longer than CHUNK_HARD_MAX is broken on whitespace, and a
    single token longer than CHUNK_HARD_MAX is sliced so that no chunk ever
    exceeds the hard ceiling.

    Args:
        text: Raw input text.

    Returns:
        Ordered list of non-empty chunks; empty list for blank input.
    """
    text = text.strip()
    if not text:
        return []
    if len(text) <= CHUNK_HARD_MAX:
        return [text]

    chunks: list[str] = []
    current = ""
    # First pass: sentence boundaries
    for sent in _SENTENCE_BOUNDARY.split(text):
        if not sent.strip():
            continue
        if len(current) + 1 + len(sent) <= CHUNK_TARGET_CHARS:
            current = f"{current} {sent}".strip() if current else sent
            continue

        if current:
            chunks.append(current)
        if len(sent) <= CHUNK_HARD_MAX:
            current = sent
            continue

        # Sentence itself exceeds the hard max — break it on words
        buf = ""
        for word in sent.split():
            if len(word) > CHUNK_HARD_MAX:
                # A token with no whitespace (URL, long identifier, text in a
                # script without spaces) cannot be split on words; slice it so
                # the per-segment ceiling still holds.
                if buf:
                    chunks.append(buf)
                pieces = [
                    word[i:i + CHUNK_HARD_MAX]
                    for i in range(0, len(word), CHUNK_HARD_MAX)
                ]
                chunks.extend(pieces[:-1])
                buf = pieces[-1]
            elif len(buf) + 1 + len(word) > CHUNK_HARD_MAX:
                if buf:
                    chunks.append(buf)
                buf = word
            else:
                buf = f"{buf} {word}".strip() if buf else word
        # The tail of the broken sentence may still absorb following sentences.
        current = buf

    if current:
        chunks.append(current)
    return chunks


def _build_segments(chunks: list[str], chunk_secs: float = 8.0) -> list[dict]:
    """Construct segment dicts for synthesise_segments — fake timing windows.

    Args:
        chunks: Text chunks in playback order.
        chunk_secs: Length of the back-to-back window assigned to each chunk.

    Returns:
        One dict per chunk with ``start``/``end`` and the chunk text under
        ``text``, ``translated_text`` and ``tts_text``.
    """
    segs = []
    cursor = 0.0
    for text in chunks:
        # Allocate a generous window so _trim_to_duration doesn't clip output.
        # Headroom is 1.4× so 8s window allows up to ~11s of audio per chunk.
        segs.append({
            "start": cursor,
            "end": cursor + chunk_secs,
            "text": text,
            "translated_text": text,
            "tts_text": text,
        })
        cursor += chunk_secs
    return segs


def _concat_wavs_to_mp3(wav_paths: list[Path], dest: Path) -> Path:
    """Concat in order via ffmpeg concat demuxer, then encode MP3.

    A single input skips the demuxer and is encoded directly.

    Args:
        wav_paths: WAV files in playback order.
        dest: Output MP3 path; a sibling ``.txt`` list file is created next to
            it temporarily when more than one input is given.

    Returns:
        ``dest``.

    Raises:
        RuntimeError: If ``wav_paths`` is empty or ffmpeg exits non-zero.
    """
    if not wav_paths:
        raise RuntimeError("No TTS chunks to concatenate.")

    if len(wav_paths) == 1:
        cmd = [
            "ffmpeg", "-y", "-i", str(wav_paths[0]),
            "-codec:a", "libmp3lame", "-b:a", "192k",
            str(dest),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            raise RuntimeError(f"ffmpeg encode failed: {result.stderr[-300:]}")
        return dest

    def _list_entry(path: Path) -> str:
        # The concat demuxer resolves relative entries against the list
        # file's directory (not the CWD), so write absolute paths. A literal
        # single quote inside a quoted entry is written as '\''.
        escaped = path.resolve().as_posix().replace("'", "'\\''")
        return f"file '{escaped}'"

    list_file = dest.with_suffix(".txt")
    list_file.write_text(
        "\n".join(_list_entry(p) for p in wav_paths),
        encoding="utf-8",
    )
    cmd = [
        "ffmpeg", "-y", "-f", "concat", "-safe", "0",
        "-i", str(list_file),
        "-codec:a", "libmp3lame", "-b:a", "192k",
        str(dest),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
    finally:
        # Remove the temp list even when ffmpeg times out or cannot start.
        list_file.unlink(missing_ok=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg concat failed: {result.stderr[-300:]}")
    return dest


def clone_voice(
    *,
    sample_path: Path,
    text: str,
    out_dir: Path,
    language_id: str = "en",
) -> dict:
    """
    Run TTS on `text` using the voice from `sample_path`.

    Args:
        sample_path: Uploaded voice sample (audio, or video with audio).
        text: Text to synthesise.
        out_dir: Working directory; intermediates and ``voice.mp3`` land here.
        language_id: Language code forwarded to ``synthesise_segments``.

    Returns:
        A dict with:
            "filename": name of the generated MP3 ("voice.mp3"),
            "engine":   lower-cased TTS_ENGINE env value ("chatterbox" if unset),
            "chunks":   number of text chunks that were synthesised.

    Raises:
        ValueError: If `text` is empty or the sample cannot be transcoded.
        RuntimeError: If TTS yields no result or no audio paths, or the final
            ffmpeg encode fails.
    """
    text = (text or "").strip()
    if not text:
        raise ValueError("Text is required.")

    chunks = _split_text(text)
    segments = _build_segments(chunks)

    # Normalise the sample (handles video, mp3, m4a, etc.) → 24kHz mono WAV
    prepared_sample = _prepare_sample(sample_path, out_dir)

    # Demucs source separation → isolate vocals so the clone doesn't pick up
    # background music or ambient noise. Same step the dub pipeline uses.
    reference_for_tts = _isolate_vocals(prepared_sample, out_dir)

    seg_out_dir = out_dir / "tts"
    seg_out_dir.mkdir(parents=True, exist_ok=True)

    # synthesise_segments is consumed as a message stream; only the message
    # carrying the "__TTS_RESULT__" key is of interest here.
    tts_result = None
    for msg in synthesise_segments(
        segments=segments,
        reference_audio_path=str(reference_for_tts),
        language_id=language_id,
        output_dir=str(seg_out_dir),
    ):
        if isinstance(msg, dict) and "__TTS_RESULT__" in msg:
            tts_result = msg["__TTS_RESULT__"]

    if not tts_result:
        raise RuntimeError("TTS produced no output.")

    wav_paths: list[Path] = []
    skipped = 0
    for seg in tts_result:
        tts_path = seg.get("tts_path")
        if tts_path:
            wav_paths.append(Path(tts_path))
        else:
            skipped += 1
    if not wav_paths:
        raise RuntimeError("TTS result missing audio paths.")
    if skipped:
        # Still best-effort, but make the gap in the output visible in logs.
        print(f"[voice_clone] {skipped} segment(s) had no audio and were skipped")

    mp3_path = _concat_wavs_to_mp3(wav_paths, out_dir / "voice.mp3")

    return {
        "filename": mp3_path.name,
        "engine": os.getenv("TTS_ENGINE", "chatterbox").lower(),
        "chunks": len(chunks),
    }