"""
VoiceVerse AI — TTS Module.

Primary:  Qwen3-TTS via HF Inference API
Fallback: Edge-TTS (CPU, no key needed)

Voice + audio style per mode:
  Summary — neutral female voice, normal rate
  Podcast — HOST_1 (script tag ALEX) female (AriaNeural) / HOST_2 (script tag SAM) male (GuyNeural)
  Rap     — male voice, faster rate (+40%), bass boost via pydub
  Song    — female voice, normal rate
  Debate  — DEBATER_A (script tag MAYA) female (AriaNeural, +8%) / DEBATER_B (script tag RYAN) male (GuyNeural, -5%)
  Story   — female voice, slow rate (-30%), long silence gaps between sentences
"""
|
|
| import os |
| import re |
| import asyncio |
| from utils import logger, get_temp_filepath |
|
|
# Model id handed to the Hugging Face InferenceClient for the primary TTS path.
QWEN_TTS_MODEL: str = "Qwen/Qwen3-TTS"
# Text is cut to this many characters before every TTS request.
TTS_MAX_CHARS: int = 3000

# Default Edge-TTS voice (used when the caller does not pick one).
EDGE_VOICE_FEMALE: str = "en-US-AriaNeural"

# Podcast mode: one voice per host.
EDGE_VOICE_HOST_FEMALE: str = "en-US-AriaNeural"
EDGE_VOICE_HOST_MALE: str = "en-US-GuyNeural"

# Rap mode: voice and speaking rate.
EDGE_VOICE_RAP: str = "en-US-GuyNeural"
RAP_RATE: str = "+40%"

# Debate mode: one voice and one speaking rate per debater.
EDGE_VOICE_DEBATER_A: str = "en-US-AriaNeural"
EDGE_VOICE_DEBATER_B: str = "en-US-GuyNeural"
DEBATE_RATE_A: str = "+8%"
DEBATE_RATE_B: str = "-5%"

# Story mode: voice and speaking rate.
EDGE_VOICE_STORY: str = "en-US-AriaNeural"
STORY_RATE: str = "-30%"
|
|
|
|
| |
| |
| |
|
|
| def _qwen_tts(text: str) -> str | None: |
| token = os.environ.get("HF_TOKEN") |
| if not token: |
| return None |
| try: |
| from huggingface_hub import InferenceClient |
| client = InferenceClient(token=token) |
| audio_bytes = client.text_to_speech(text=text[:TTS_MAX_CHARS], model=QWEN_TTS_MODEL) |
| if not audio_bytes: |
| return None |
| path = get_temp_filepath(suffix=".wav") |
| with open(path, "wb") as f: |
| f.write(audio_bytes) |
| logger.info("Qwen TTS: %s (%d bytes)", path, len(audio_bytes)) |
| return path |
| except Exception as e: |
| logger.warning("Qwen TTS failed: %s", e) |
| return None |
|
|
|
|
def _edge_tts(text: str, voice: str = EDGE_VOICE_FEMALE, rate: str = "+0%", pitch: str = "+0Hz") -> str:
    """
    Generate audio via Edge-TTS and return the path of the written .mp3 file.

    text:  text to speak; only the first TTS_MAX_CHARS characters are used.
    voice: Edge-TTS voice name.
    rate:  SSML prosody rate string, e.g. "+40%" faster, "-30%" slower.
    pitch: SSML prosody pitch string, e.g. "+50Hz" higher, "-50Hz" lower.

    Raises RuntimeError if no (or an empty) audio file was produced; any
    exception raised by edge_tts itself propagates unchanged.
    """
    import edge_tts

    path = get_temp_filepath(suffix=".mp3")
    snippet = text[:TTS_MAX_CHARS]

    async def _run() -> None:
        communicate = edge_tts.Communicate(snippet, voice, rate=rate, pitch=pitch)
        await communicate.save(path)

    # Detect a running loop explicitly instead of relying on the deprecated
    # get_event_loop() behaviour. Keeping the synthesis call outside the
    # try-body also means a RuntimeError raised *by the synthesis itself*
    # is no longer mistaken for "no event loop" and silently retried.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_is_running = False
    else:
        loop_is_running = True

    if loop_is_running:
        # asyncio.run() cannot be nested inside a running loop, so the
        # coroutine is driven on a private loop in a worker thread.
        import concurrent.futures

        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            pool.submit(lambda: asyncio.run(_run())).result(timeout=120)
        finally:
            # wait=False so that a timeout actually returns control to the
            # caller instead of blocking until the worker finishes.
            pool.shutdown(wait=False)
    else:
        asyncio.run(_run())

    # Treat a missing file the same as an empty one.
    if not os.path.exists(path) or os.path.getsize(path) == 0:
        raise RuntimeError("Edge-TTS produced an empty audio file.")
    logger.info("Edge-TTS: %s (voice=%s rate=%s)", path, voice, rate)
    return path
|
|
|
|
| |
| |
| |
|
|
def _apply_rap_fx(path: str, cutoff_hz: int = 200, boost_db: float = 10) -> str:
    """
    Apply a bass boost to a rap audio file using pydub.

    The audio is split at ``cutoff_hz`` into a low band and a high band,
    the low band is amplified by ``boost_db`` decibels, and the two bands
    are mixed back together.

    Returns the path to the processed file (a new .mp3), or the original
    ``path`` unchanged if processing fails for any reason (logged as a
    warning).
    """
    try:
        from pydub import AudioSegment
        from pydub.effects import high_pass_filter, low_pass_filter

        audio = AudioSegment.from_file(path)

        # Split into bands with the two filters. Subtracting one
        # AudioSegment from another is not supported by pydub (it raises
        # TypeError), so the high band must come from high_pass_filter.
        bass = low_pass_filter(audio, cutoff_hz)
        highs = high_pass_filter(audio, cutoff_hz)

        # "+ number" on an AudioSegment applies gain in dB.
        boosted = (bass + boost_db).overlay(highs)

        out = get_temp_filepath(suffix=".mp3")
        boosted.export(out, format="mp3")
        logger.info("Rap bass boost applied β %s", out)
        return out
    except Exception as e:
        # Best-effort effect: never fail the whole generation over it.
        logger.warning("Rap FX failed (%s) β returning original audio", e)
        return path
|
|
|
|
| def _concat(paths: list[str], silence_ms: int = 300) -> str: |
| """Concatenate audio files with silence between each segment.""" |
| if len(paths) == 1: |
| return paths[0] |
| try: |
| from pydub import AudioSegment |
| combined = AudioSegment.empty() |
| silence = AudioSegment.silent(duration=silence_ms) |
| for p in paths: |
| combined += AudioSegment.from_file(p) + silence |
| out = get_temp_filepath(suffix=".mp3") |
| combined.export(out, format="mp3") |
| logger.info("Concatenated %d segments β %s", len(paths), out) |
| return out |
| except Exception as e: |
| logger.warning("pydub concat failed (%s) β trying ffmpeg fallback", e) |
| return _concat_ffmpeg(paths) |
|
|
|
|
| def _concat_ffmpeg(paths: list[str]) -> str: |
| """Fallback: concatenate audio files using ffmpeg directly via subprocess.""" |
| import subprocess |
| import tempfile |
|
|
| out = get_temp_filepath(suffix=".mp3") |
|
|
| |
| list_path = get_temp_filepath(suffix=".txt") |
| with open(list_path, "w") as f: |
| for p in paths: |
| f.write(f"file '{p}'\n") |
|
|
| try: |
| subprocess.run( |
| ["ffmpeg", "-y", "-f", "concat", "-safe", "0", |
| "-i", list_path, "-c", "copy", out], |
| check=True, capture_output=True, timeout=120, |
| ) |
| logger.info("ffmpeg concat: %d segments β %s", len(paths), out) |
| return out |
| except Exception as e2: |
| logger.warning("ffmpeg concat also failed (%s) β returning first segment", e2) |
| return paths[0] |
|
|
|
|
def _add_story_gaps(path: str, gap_ms: int = 600, min_pause_ms: int = 300) -> str:
    """
    Lengthen the pauses in story audio for an unhurried storyteller feel.

    Existing pauses of at least ``min_pause_ms`` are located with pydub's
    silence detection and ``gap_ms`` of extra silence is inserted in the
    middle of each one, so no speech is ever cut mid-word. Pauses at the
    very start or end of the file are left alone.

    NOTE(review): treating a pause of >= ``min_pause_ms`` as a sentence
    boundary is an approximation — tune against real TTS output.

    Returns the path to a new .mp3, or the original ``path`` when no
    interior pause is found or processing fails (logged as a warning).
    """
    try:
        from pydub import AudioSegment
        from pydub.silence import detect_silence

        audio = AudioSegment.from_file(path)
        total_ms = len(audio)

        # Threshold relative to the clip's own loudness rather than absolute.
        pauses = detect_silence(
            audio,
            min_silence_len=min_pause_ms,
            silence_thresh=audio.dBFS - 16,
            seek_step=10,
        )
        # Cut at the midpoint of each interior pause.
        cut_points = [
            (start + end) // 2
            for start, end in pauses
            if start > 0 and end < total_ms
        ]
        if not cut_points:
            logger.info("Story gaps skipped (no pauses detected): %s", path)
            return path

        gap = AudioSegment.silent(duration=gap_ms, frame_rate=audio.frame_rate)
        combined = AudioSegment.empty()
        cursor = 0
        for cut in cut_points:
            combined += audio[cursor:cut] + gap
            cursor = cut
        combined += audio[cursor:]

        out = get_temp_filepath(suffix=".mp3")
        combined.export(out, format="mp3")
        logger.info("Story gaps applied β %s", out)
        return out
    except Exception as e:
        logger.warning("Story gap insertion failed (%s) β returning original", e)
        return path
|
|
|
|
| |
| |
| |
|
|
| def _parse_dialogue(script: str, tag_a: str, tag_b: str) -> list[tuple[str, str]]: |
| """Parse a HOST_X / DEBATER_X tagged script into (speaker, text) segments.""" |
| segments: list[tuple[str, str]] = [] |
| prefix_a = f"{tag_a}:" |
| prefix_b = f"{tag_b}:" |
|
|
| for line in script.splitlines(): |
| line = line.strip() |
| if line.startswith(prefix_a): |
| text = line[len(prefix_a):].strip() |
| if text: |
| if segments and segments[-1][0] == tag_a: |
| segments[-1] = (tag_a, segments[-1][1] + " " + text) |
| else: |
| segments.append((tag_a, text)) |
| elif line.startswith(prefix_b): |
| text = line[len(prefix_b):].strip() |
| if text: |
| if segments and segments[-1][0] == tag_b: |
| segments[-1] = (tag_b, segments[-1][1] + " " + text) |
| else: |
| segments.append((tag_b, text)) |
| return segments |
|
|
|
|
| |
| |
| |
|
|
def generate_audio_podcast(script: str) -> tuple[str, str]:
    """
    Render a two-host podcast script to audio.

    Lines tagged ``ALEX:`` use the female host voice and lines tagged
    ``SAM:`` the male host voice, both at the normal rate; turns are
    joined with 300 ms of silence. A script without any ALEX/SAM tags is
    handed to ``generate_audio`` as single-voice text.

    Returns ``(audio_path, engine_label)``. Turns whose synthesis fails
    are skipped with a warning; RuntimeError is raised only if every turn
    fails.
    """
    turns = _parse_dialogue(script, "ALEX", "SAM")
    if not turns:
        logger.warning("No ALEX/SAM tags β falling back to single voice")
        return generate_audio(script)

    voices = {
        "ALEX": (EDGE_VOICE_HOST_FEMALE, "+0%"),
        "SAM": (EDGE_VOICE_HOST_MALE, "+0%"),
    }

    rendered = []
    for who, line in turns:
        tts_voice, tts_rate = voices[who]
        try:
            clip = _edge_tts(line, voice=tts_voice, rate=tts_rate)
        except Exception as exc:
            logger.warning("Podcast segment failed %s: %s", who, exc)
        else:
            rendered.append(clip)

    if rendered:
        return _concat(rendered, silence_ms=300), "Edge-TTS (Podcast)"
    raise RuntimeError("All podcast segments failed.")
|
|
|
|
def generate_audio_debate(script: str) -> tuple[str, str]:
    """
    Render a two-sided debate script to audio.

    Lines tagged ``MAYA:`` use debater A's voice and rate, lines tagged
    ``RYAN:`` debater B's; turns are joined with 400 ms of silence. A
    script without any MAYA/RYAN tags is handed to ``generate_audio`` as
    single-voice text.

    Returns ``(audio_path, engine_label)``. Turns whose synthesis fails
    are skipped with a warning; RuntimeError is raised only if every turn
    fails.
    """
    turns = _parse_dialogue(script, "MAYA", "RYAN")
    if not turns:
        logger.warning("No MAYA/RYAN tags β falling back to single voice")
        return generate_audio(script)

    voices = {
        "MAYA": (EDGE_VOICE_DEBATER_A, DEBATE_RATE_A),
        "RYAN": (EDGE_VOICE_DEBATER_B, DEBATE_RATE_B),
    }

    rendered = []
    for who, line in turns:
        tts_voice, tts_rate = voices[who]
        try:
            clip = _edge_tts(line, voice=tts_voice, rate=tts_rate)
        except Exception as exc:
            logger.warning("Debate segment failed %s: %s", who, exc)
        else:
            rendered.append(clip)

    if rendered:
        return _concat(rendered, silence_ms=400), "Edge-TTS (Debate)"
    raise RuntimeError("All debate segments failed.")
|
|
|
|
def generate_audio_rap(script: str) -> tuple[str, str]:
    """
    Rap: TTS each line separately with short pauses for rhythm,
    then concatenate and apply bass boost for a punchier sound.

    Returns ``(audio_path, engine_label)``.

    Raises ValueError if ``script`` is empty or whitespace-only (matching
    ``generate_audio``), and RuntimeError if every line fails to
    synthesize. For a single-line script, errors from the TTS call
    propagate unchanged.
    """
    # Reject empty input up front instead of sending an empty TTS request.
    if not script or not script.strip():
        raise ValueError("No text provided for audio generation.")

    lines = [ln.strip() for ln in script.splitlines() if ln.strip()]

    if len(lines) <= 1:
        # Single line: one TTS call, no concatenation needed.
        path = _edge_tts(script, voice=EDGE_VOICE_RAP, rate=RAP_RATE)
        path = _apply_rap_fx(path)
        return path, "Edge-TTS (Rap)"

    # One clip per line; a failed line is skipped rather than aborting.
    paths = []
    for line in lines:
        try:
            paths.append(_edge_tts(line, voice=EDGE_VOICE_RAP, rate=RAP_RATE))
        except Exception as e:
            logger.warning("Rap line TTS failed: %s", e)

    if not paths:
        raise RuntimeError("All rap line TTS failed.")

    # Short 200 ms pauses between lines, then the bass boost on the result.
    combined = _concat(paths, silence_ms=200)
    combined = _apply_rap_fx(combined)
    return combined, "Edge-TTS (Rap)"
|
|
|
|
def generate_audio_story(script: str) -> tuple[str, str]:
    """
    Story: female voice, slow rate (-30%), then sentence gaps widened via pydub.

    Returns ``(audio_path, engine_label)``.

    Raises ValueError if ``script`` is empty or whitespace-only (matching
    ``generate_audio``); errors from the TTS call propagate unchanged.
    """
    # Reject empty input up front instead of sending an empty TTS request.
    if not script or not script.strip():
        raise ValueError("No text provided for audio generation.")

    path = _edge_tts(script, voice=EDGE_VOICE_STORY, rate=STORY_RATE)
    path = _add_story_gaps(path)
    return path, "Edge-TTS (Story)"
|
|
|
|
| |
| |
| |
|
|
def apply_pitch_shift(path: str, pitch_semitones: float) -> str:
    """
    Shift the pitch of an audio file by ``pitch_semitones`` using pydub.

    Positive values raise the pitch, negative values lower it; shifts
    smaller than 0.1 semitone in magnitude return ``path`` untouched.
    The shift is done by reinterpreting the raw samples at a scaled frame
    rate and converting back to the original rate, so the clip's duration
    changes along with its pitch.

    Returns the path of a new .mp3, or the original ``path`` if processing
    fails (logged as a warning).
    """
    if -0.1 < pitch_semitones < 0.1:
        return path

    try:
        from pydub import AudioSegment

        source = AudioSegment.from_file(path)
        original_rate = source.frame_rate

        # One semitone is a frequency ratio of 2 ** (1 / 12).
        ratio = 2 ** (pitch_semitones / 12.0)
        relabelled = source._spawn(
            source.raw_data,
            overrides={"frame_rate": int(original_rate * ratio)},
        )
        # Convert back so the exported file keeps the original frame rate.
        resampled = relabelled.set_frame_rate(original_rate)

        out = get_temp_filepath(suffix=".mp3")
        resampled.export(out, format="mp3")
        logger.info("Pitch shifted by %.1f semitones β %s", pitch_semitones, out)
        return out
    except Exception as exc:
        logger.warning("Pitch shift failed (%s) β returning original", exc)
        return path
|
|
|
|
| def generate_audio(text: str, voice_id: str | None = None) -> tuple[str, str]: |
| """Single-voice TTS for Summary and Song modes. Tries Qwen first.""" |
| if not text or not text.strip(): |
| raise ValueError("No text provided for audio generation.") |
| path = _qwen_tts(text) |
| if path and os.path.exists(path): |
| return path, "Qwen3-TTS" |
| return _edge_tts(text, voice=voice_id or EDGE_VOICE_FEMALE), "Edge-TTS" |
|
|