Spaces:
Sleeping
Sleeping
| """ | |
| VoiceVerse AI β TTS Module. | |
| Primary: Qwen3-TTS via HF Inference API | |
| Fallback: Edge-TTS (CPU, no key needed) | |
| Voice + audio style per mode: | |
| Summary β neutral female voice, normal rate | |
| Podcast β HOST_1 female (AriaNeural) / HOST_2 male (GuyNeural) | |
| Rap β male voice, faster rate (+40%), bass boost via pydub | |
| Song β female voice, normal rate | |
| Debate β DEBATER_A female (JennyNeural, +5%) / DEBATER_B male (DavisNeural, -5%) | |
| Story β female voice, slow rate (-30%), long silence gaps between sentences | |
| """ | |
| import os | |
| import re | |
| import asyncio | |
| from utils import logger, get_temp_filepath | |
| QWEN_TTS_MODEL = "Qwen/Qwen3-TTS" | |
| TTS_MAX_CHARS = 3000 | |
| # ββ Voice assignments βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Summary / Song / Story β single female voice | |
| EDGE_VOICE_FEMALE = "en-US-AriaNeural" | |
| # Podcast | |
| EDGE_VOICE_HOST_FEMALE = "en-US-AriaNeural" # HOST_1 β female | |
| EDGE_VOICE_HOST_MALE = "en-US-GuyNeural" # HOST_2 β male | |
| # Rap β male voice reads the rap | |
| EDGE_VOICE_RAP = "en-US-GuyNeural" | |
| RAP_RATE = "+40%" # fast delivery | |
| # Debate | |
| EDGE_VOICE_DEBATER_A = "en-US-JennyNeural" # female, pro β assertive | |
| EDGE_VOICE_DEBATER_B = "en-US-DavisNeural" # male, con β skeptical | |
| DEBATE_RATE_A = "+8%" # slightly faster | |
| DEBATE_RATE_B = "-5%" # slightly slower, deliberate | |
| # Story β slow, warm delivery | |
| EDGE_VOICE_STORY = "en-US-AriaNeural" | |
| STORY_RATE = "-30%" # noticeably slower | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Low-level TTS helpers | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _qwen_tts(text: str) -> str | None: | |
| token = os.environ.get("HF_TOKEN") | |
| if not token: | |
| return None | |
| try: | |
| from huggingface_hub import InferenceClient | |
| client = InferenceClient(token=token) | |
| audio_bytes = client.text_to_speech(text=text[:TTS_MAX_CHARS], model=QWEN_TTS_MODEL) | |
| if not audio_bytes: | |
| return None | |
| path = get_temp_filepath(suffix=".wav") | |
| with open(path, "wb") as f: | |
| f.write(audio_bytes) | |
| logger.info("Qwen TTS: %s (%d bytes)", path, len(audio_bytes)) | |
| return path | |
| except Exception as e: | |
| logger.warning("Qwen TTS failed: %s", e) | |
| return None | |
def _edge_tts(text: str, voice: str = EDGE_VOICE_FEMALE, rate: str = "+0%") -> str:
    """
    Generate audio via Edge-TTS.

    Args:
        text:  Text to synthesize (truncated to TTS_MAX_CHARS).
        voice: Edge neural voice name, e.g. "en-US-AriaNeural".
        rate:  SSML prosody rate string, e.g. "+40%" faster, "-30%" slower.

    Returns:
        Path to the generated .mp3 file.

    Raises:
        RuntimeError: if Edge-TTS produced an empty audio file.
    """
    import edge_tts

    path = get_temp_filepath(suffix=".mp3")
    snippet = text[:TTS_MAX_CHARS]

    async def _run() -> None:
        communicate = edge_tts.Communicate(snippet, voice, rate=rate)
        await communicate.save(path)

    # asyncio.get_event_loop() is deprecated when no loop is running (3.10+),
    # so probe for a running loop explicitly instead.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop in this thread — drive one directly.
        asyncio.run(_run())
    else:
        # Already inside a running loop (e.g. async web framework): run the
        # coroutine on its own loop in a worker thread to avoid re-entrancy.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            pool.submit(asyncio.run, _run()).result(timeout=120)

    if os.path.getsize(path) == 0:
        raise RuntimeError("Edge-TTS produced an empty audio file.")
    logger.info("Edge-TTS: %s (voice=%s rate=%s)", path, voice, rate)
    return path
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Audio post-processing | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _apply_rap_fx(path: str) -> str:
    """
    Apply a bass boost to a rap audio file using pydub.

    Low-frequency boost makes it sound punchier and more rap-like.
    Returns the path of a new processed file, or the original path if
    processing fails for any reason (best-effort degradation).

    Fix: pydub's AudioSegment does not support segment-minus-segment
    (``audio - low_pass_filter(...)`` raises TypeError), so the previous
    implementation always fell into the except branch and returned the
    unprocessed audio. The high band is now isolated with high_pass_filter.
    """
    try:
        from pydub import AudioSegment
        from pydub.effects import high_pass_filter, low_pass_filter

        audio = AudioSegment.from_file(path)
        # Split into bass (below ~200 Hz) and mid/high frequencies.
        bass = low_pass_filter(audio, 200)
        highs = high_pass_filter(audio, 200)
        # Boost bass by 6 dB, keep highs as-is, mix the bands back together.
        boosted = (bass + 6).overlay(highs)
        out = get_temp_filepath(suffix=".mp3")
        boosted.export(out, format="mp3")
        logger.info("Rap bass boost applied β %s", out)
        return out
    except Exception as e:
        logger.warning("Rap FX failed (%s) β returning original audio", e)
        return path
| def _concat(paths: list[str], silence_ms: int = 300) -> str: | |
| """Concatenate audio files with silence between each segment.""" | |
| if len(paths) == 1: | |
| return paths[0] | |
| try: | |
| from pydub import AudioSegment | |
| combined = AudioSegment.empty() | |
| silence = AudioSegment.silent(duration=silence_ms) | |
| for p in paths: | |
| combined += AudioSegment.from_file(p) + silence | |
| out = get_temp_filepath(suffix=".mp3") | |
| combined.export(out, format="mp3") | |
| logger.info("Concatenated %d segments β %s", len(paths), out) | |
| return out | |
| except Exception as e: | |
| logger.warning("pydub concat failed (%s) β trying ffmpeg fallback", e) | |
| return _concat_ffmpeg(paths) | |
def _concat_ffmpeg(paths: list[str]) -> str:
    """Fallback: concatenate audio files with the ffmpeg concat demuxer.

    Returns the combined file path, or the first input path if ffmpeg fails
    for any reason (best-effort degradation). Removed an unused ``tempfile``
    import from the previous version.
    """
    import subprocess

    out = get_temp_filepath(suffix=".mp3")
    # Write a concat list file for ffmpeg. A single quote inside a quoted
    # filename must be escaped as '\'' per the concat demuxer syntax.
    list_path = get_temp_filepath(suffix=".txt")
    with open(list_path, "w") as f:
        for p in paths:
            f.write("file '{}'\n".format(p.replace("'", "'\\''")))
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
             "-i", list_path, "-c", "copy", out],
            check=True, capture_output=True, timeout=120,
        )
        logger.info("ffmpeg concat: %d segments β %s", len(paths), out)
        return out
    except Exception as e2:
        logger.warning("ffmpeg concat also failed (%s) β returning first segment", e2)
        return paths[0]
def _add_story_gaps(path: str) -> str:
    """
    Insert 600 ms silence gaps into story audio for an unhurried feel.

    NOTE: this does not detect sentence boundaries — it slices the audio
    into fixed ~5-second chunks and re-joins them with silence in between
    (the previous comment overstated this as "natural pauses"). Gaps are
    placed between chunks only; no trailing silence is appended. On any
    failure (e.g. pydub/ffmpeg missing) the original path is returned.
    """
    try:
        from pydub import AudioSegment

        audio = AudioSegment.from_file(path)
        gap = AudioSegment.silent(duration=600)  # 600 ms pause
        chunk_ms = 5000
        chunks = [audio[i:i + chunk_ms] for i in range(0, len(audio), chunk_ms)]
        combined = chunks[0]
        for chunk in chunks[1:]:
            combined += gap + chunk
        out = get_temp_filepath(suffix=".mp3")
        combined.export(out, format="mp3")
        logger.info("Story gaps applied β %s", out)
        return out
    except Exception as e:
        logger.warning("Story gap insertion failed (%s) β returning original", e)
        return path
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Dialogue script parser | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _parse_dialogue(script: str, tag_a: str, tag_b: str) -> list[tuple[str, str]]: | |
| """Parse a HOST_X / DEBATER_X tagged script into (speaker, text) segments.""" | |
| segments: list[tuple[str, str]] = [] | |
| prefix_a = f"{tag_a}:" | |
| prefix_b = f"{tag_b}:" | |
| for line in script.splitlines(): | |
| line = line.strip() | |
| if line.startswith(prefix_a): | |
| text = line[len(prefix_a):].strip() | |
| if text: | |
| if segments and segments[-1][0] == tag_a: | |
| segments[-1] = (tag_a, segments[-1][1] + " " + text) | |
| else: | |
| segments.append((tag_a, text)) | |
| elif line.startswith(prefix_b): | |
| text = line[len(prefix_b):].strip() | |
| if text: | |
| if segments and segments[-1][0] == tag_b: | |
| segments[-1] = (tag_b, segments[-1][1] + " " + text) | |
| else: | |
| segments.append((tag_b, text)) | |
| return segments | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Per-mode audio generators | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def generate_audio_podcast(script: str) -> tuple[str, str]:
    """
    Render a two-host podcast script to audio.

    HOST_1 = female (AriaNeural), HOST_2 = male (GuyNeural), both at normal
    conversational rate, with 300 ms of silence between turns. Scripts with
    no HOST tags fall back to the single-voice path. Segments whose TTS call
    fails are skipped; raises RuntimeError only if every segment fails.
    """
    segments = _parse_dialogue(script, "HOST_1", "HOST_2")
    if not segments:
        logger.warning("No HOST tags β falling back to single voice")
        return generate_audio(script)
    voices = {
        "HOST_1": (EDGE_VOICE_HOST_FEMALE, "+0%"),
        "HOST_2": (EDGE_VOICE_HOST_MALE, "+0%"),
    }
    rendered: list[str] = []
    for speaker, line in segments:
        spk_voice, spk_rate = voices[speaker]
        try:
            rendered.append(_edge_tts(line, voice=spk_voice, rate=spk_rate))
        except Exception as e:
            logger.warning("Podcast segment failed %s: %s", speaker, e)
    if not rendered:
        raise RuntimeError("All podcast segments failed.")
    return _concat(rendered, silence_ms=300), "Edge-TTS (Podcast)"
def generate_audio_debate(script: str) -> tuple[str, str]:
    """
    Render a two-sided debate script to audio.

    DEBATER_A = female (JennyNeural) at an assertive +8%, DEBATER_B = male
    (DavisNeural) at a deliberate -5%, with 400 ms of silence between turns.
    Scripts with no DEBATER tags fall back to the single-voice path. Failed
    segments are skipped; raises RuntimeError only if every segment fails.
    """
    segments = _parse_dialogue(script, "DEBATER_A", "DEBATER_B")
    if not segments:
        logger.warning("No DEBATER tags β falling back to single voice")
        return generate_audio(script)
    voices = {
        "DEBATER_A": (EDGE_VOICE_DEBATER_A, DEBATE_RATE_A),
        "DEBATER_B": (EDGE_VOICE_DEBATER_B, DEBATE_RATE_B),
    }
    rendered: list[str] = []
    for speaker, line in segments:
        spk_voice, spk_rate = voices[speaker]
        try:
            rendered.append(_edge_tts(line, voice=spk_voice, rate=spk_rate))
        except Exception as e:
            logger.warning("Debate segment failed %s: %s", speaker, e)
    if not rendered:
        raise RuntimeError("All debate segments failed.")
    return _concat(rendered, silence_ms=400), "Edge-TTS (Debate)"
def generate_audio_rap(script: str) -> tuple[str, str]:
    """
    Rap mode: male voice at a fast +40% rate, then a pydub bass boost
    for extra punch.
    """
    raw_path = _edge_tts(script, voice=EDGE_VOICE_RAP, rate=RAP_RATE)
    return _apply_rap_fx(raw_path), "Edge-TTS (Rap)"
def generate_audio_story(script: str) -> tuple[str, str]:
    """
    Story mode: female voice at a slow -30% rate, then wider silence gaps
    inserted via pydub for a storyteller feel.
    """
    narration = _edge_tts(script, voice=EDGE_VOICE_STORY, rate=STORY_RATE)
    return _add_story_gaps(narration), "Edge-TTS (Story)"
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Unified public interface | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def generate_audio(text: str, voice_id: str | None = None) -> tuple[str, str]: | |
| """Single-voice TTS for Summary and Song modes. Tries Qwen first.""" | |
| if not text or not text.strip(): | |
| raise ValueError("No text provided for audio generation.") | |
| path = _qwen_tts(text) | |
| if path and os.path.exists(path): | |
| return path, "Qwen3-TTS" | |
| return _edge_tts(text, voice=voice_id or EDGE_VOICE_FEMALE), "Edge-TTS" | |