""" services/tts.py — Ultra Low-Latency Dual TTS Backend FIX-ISSUE4 (Normal-speed TTS): • Default rate is now slightly faster than normal for a more natural conversational pace. • split_sentences() now splits on ALL clause delimiters (commas, colons, em-dashes) in addition to sentence endings, so synthesis tasks are smaller and start sooner. This pairs with streaming.py's 2–3 word flush threshold for maximum low-latency playback. • Parts are synthesised sequentially to guarantee word order in playback. """ from dotenv import load_dotenv import os, re, asyncio load_dotenv() USE_ELEVENLABS = True EDGE_VOICE = "bn-BD-NabanitaNeural" ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY", "") ELEVENLABS_VOICE_ID = os.getenv("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM") ELEVENLABS_MODEL_ID = os.getenv("ELEVENLABS_MODEL_ID", "eleven_multilingual_v2") def _clamp(v: float, lo: float, hi: float) -> float: return max(lo, min(hi, v)) def _parse_pct(text: str) -> float: """ Parse strings like '+10%', '-5%', '12%' into a multiplier delta. Returns 0.0 when empty/invalid. """ raw = (text or "").strip() if not raw: return 0.0 if raw.endswith("%"): raw = raw[:-1].strip() try: return float(raw) / 100.0 except Exception: return 0.0 # ElevenLabs speed configuration: # - `ELEVENLABS_SPEED` is the base speed (1.0 ≈ normal, >1.0 = faster). # - `ELEVENLABS_SPEED_PCT` is an optional relative adjustment like "+10%" / "-5%". # Effective speed = base * (1 + pct). # - `ELEVENLABS_SPEED_MAX` sets the upper clamp (default 3.0). If your ElevenLabs # model/voice rejects high values, lower this (e.g. 2.5). 
def _env_float(name: str, default: float) -> float:
    """Read a finite float from environment variable *name*.

    Falls back to *default* (with a warning) when the variable is unset,
    blank, not a number, or not finite, so a malformed setting cannot crash
    the import of this module.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        value = float(raw)
    except ValueError:
        value = float("nan")
    # The chained comparison is False for NaN and for +/-inf.
    if not (float("-inf") < value < float("inf")):
        print(f"[TTS] ignoring invalid {name}={raw!r}; using {default}")
        return default
    return value


# NOTE(review): the ElevenLabs API reference appears to document
# `voice_settings.speed` in a much narrower range (roughly 0.7–1.2) than the
# 2.8 default / 3.5 clamp used here — confirm the chosen model accepts these
# values; a rejected request makes every chunk fall through to the fallback.
_ELEVEN_BASE_SPEED = _env_float("ELEVENLABS_SPEED", 2.8)
_ELEVEN_SPEED_PCT = _parse_pct(os.getenv("ELEVENLABS_SPEED_PCT", "0%"))
_ELEVEN_SPEED_MAX = _env_float("ELEVENLABS_SPEED_MAX", 3.5)
ELEVENLABS_SPEED = _clamp(
    _ELEVEN_BASE_SPEED * (1.0 + _ELEVEN_SPEED_PCT), 0.5, _ELEVEN_SPEED_MAX
)

ELEVENLABS_OUTPUT_FORMAT = "mp3_22050_32"
ELEVENLABS_STABILITY = 0.45
ELEVENLABS_SIMILARITY = 0.80
ELEVENLABS_STYLE = 0.35
ELEVENLABS_SPEAKER_BOOST = True

# Default Edge-TTS speaking rate. Kept in one place so the startup log and the
# function defaults below cannot drift apart.
EDGE_RATE = "+22%"

try:
    import edge_tts  # type: ignore
    EDGE_TTS_AVAILABLE = True
except Exception:
    edge_tts = None
    EDGE_TTS_AVAILABLE = False
    print("[TTS] edge_tts not available; will fall back to ElevenLabs if possible")

if USE_ELEVENLABS and not ELEVENLABS_API_KEY:
    raise RuntimeError("[TTS] ELEVENLABS_API_KEY missing")

if not EDGE_TTS_AVAILABLE and not ELEVENLABS_API_KEY:
    raise RuntimeError("[TTS] Neither edge_tts nor ELEVENLABS_API_KEY is available")

print(
    f"[TTS] Backend: {'ElevenLabs' if USE_ELEVENLABS else 'Edge-TTS'} | "
    f"edge rate: {EDGE_RATE} | eleven speed: {ELEVENLABS_SPEED:.2f} "
    f"(base {_ELEVEN_BASE_SPEED:.2f}, pct {_ELEVEN_SPEED_PCT:+.0%})"
)

# Emotion/tone tags such as "[calm]" are meant for UI display only. The second
# and third patterns catch the dangling halves ("[calm" / "calm]") that remain
# when a tag has been cut in two by upstream phrase streaming.
_TAG_FULL_RE = re.compile(r"(?:(?<=^)|(?<=\s))\[[^\[\]\n]{1,24}\](?=\s|$)")
_TAG_OPEN_RE = re.compile(r"(?:(?<=^)|(?<=\s))\[[A-Za-z]{2,16}(?=\s|$)")
_TAG_CLOSE_RE = re.compile(r"(?:(?<=^)|(?<=\s))[A-Za-z]{2,16}\](?=\s|$)")
_MULTISPACE_RE = re.compile(r"[ \t]{2,}")
# The lookbehind keeps each delimiter attached to the chunk that precedes it.
_CHUNK_SPLIT_RE = re.compile(r"(?<=[।.!?,;:—–])\s+")
_TRAILING_PUNCT_RE = re.compile(r"[।.!?,;:—–]+$")


def split_sentences(text: str) -> list[str]:
    """Split *text* into small synthesis chunks for low-latency streaming.

    FIX-ISSUE4: the text is split on sentence boundaries AND clause
    boundaries (danda, '.', '!', '?', ',', ';', ':', em/en dash followed by
    whitespace) so each TTS task is a phrase rather than a full sentence.

    Emotion/tone tags like "[calm]" are removed first because they can
    degrade or break some TTS backends.

    Returns:
        The stripped chunks in their original order. Chunks that contain no
        letter or digit (punctuation-only fragments) are dropped because
        there is nothing to speak; single-character words are kept.
    """
    if not text:
        return []

    text = _TAG_FULL_RE.sub("", text)
    text = _TAG_OPEN_RE.sub("", text)
    text = _TAG_CLOSE_RE.sub("", text)
    text = _MULTISPACE_RE.sub(" ", text).strip("\n\r\t")
    if not text:
        return []

    chunks = (piece.strip() for piece in _CHUNK_SPLIT_RE.split(text))
    return [chunk for chunk in chunks if any(ch.isalnum() for ch in chunk)]


async def _edge_tts_stream(text: str, voice: str = EDGE_VOICE, rate: str = EDGE_RATE):
    """Stream Edge-TTS audio bytes for a single text chunk.

    The default rate is slightly faster than normal.

    Yields:
        Raw audio byte fragments as delivered by edge_tts.

    Raises:
        RuntimeError: if edge_tts is not installed.
        Exception: any error from edge_tts is logged and re-raised.
    """
    if edge_tts is None:
        raise RuntimeError("edge_tts is not installed")

    text = text.strip("\n\r\t")
    if not text:
        return

    try:
        async for chunk in edge_tts.Communicate(text, voice, rate=rate).stream():
            if chunk["type"] == "audio":
                yield chunk["data"]
                # Give other tasks a turn between fragments.
                await asyncio.sleep(0)
    except Exception as exc:
        print(f"[TTS][Edge] {exc}")
        raise


async def _elevenlabs_stream(
    text: str,
    voice_id: str = ELEVENLABS_VOICE_ID,
    model_id: str = ELEVENLABS_MODEL_ID,
    output_format: str = ELEVENLABS_OUTPUT_FORMAT,
    speed: float = ELEVENLABS_SPEED,
    stability: float = ELEVENLABS_STABILITY,
    similarity: float = ELEVENLABS_SIMILARITY,
    style: float = ELEVENLABS_STYLE,
    speaker_boost: bool = ELEVENLABS_SPEAKER_BOOST,
):
    """Stream ElevenLabs audio bytes for a single text chunk.

    Yields:
        Audio byte fragments (up to 512 bytes each) from the streaming
        text-to-speech endpoint.

    Raises:
        RuntimeError: on a non-200 response (the message includes a
            truncated response body when one is available) or when the
            response carries no audio at all.
        Exception: transport errors from httpx are logged and re-raised.
    """
    import httpx

    text = text.strip("\n\r\t")
    if not text:
        return

    # Reduce unnatural pauses for short streamed chunks.
    # ElevenLabs adds strong pauses on sentence-ending punctuation; for
    # low-latency streaming we prefer faster turn-taking.
    text = _TRAILING_PUNCT_RE.sub("", text).strip("\n\r\t")
    if not text:
        # The chunk was punctuation only; there is nothing to synthesise and
        # posting an empty `text` would just produce an API error.
        return

    url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}/stream"
    headers = {
        "xi-api-key": ELEVENLABS_API_KEY,
        "Content-Type": "application/json",
        "Accept": "audio/mpeg",
    }
    payload = {
        "text": text,
        "model_id": model_id,
        "voice_settings": {
            "stability": stability,
            "similarity_boost": similarity,
            "style": style,
            "use_speaker_boost": speaker_boost,
            "speed": speed,
        },
    }

    try:
        got_any = False
        # read=None disables the read timeout so a slow audio stream is never
        # cut off mid-phrase; connect and the other phases stay bounded.
        timeout = httpx.Timeout(10.0, connect=5.0, read=None)
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream(
                "POST",
                url,
                headers=headers,
                json=payload,
                params={"output_format": output_format},
            ) as resp:
                if resp.status_code != 200:
                    # Surface the provider's explanation (e.g. a rejected
                    # voice setting) instead of a bare status code.
                    body = await resp.aread()
                    detail = body.decode("utf-8", "replace").strip()[:200]
                    message = f"[TTS][ElevenLabs] HTTP {resp.status_code}"
                    if detail:
                        message = f"{message}: {detail}"
                    raise RuntimeError(message)
                async for chunk in resp.aiter_bytes(chunk_size=512):
                    if chunk:
                        got_any = True
                        yield chunk
                        await asyncio.sleep(0)
        if not got_any:
            raise RuntimeError("[TTS][ElevenLabs] No audio received")
    except Exception as exc:
        print(f"[TTS][ElevenLabs] {exc}")
        raise


async def _collect_audio(stream) -> bytes:
    """Drain an async byte stream and return its content as one blob."""
    buf = bytearray()
    async for chunk in stream:
        buf.extend(chunk)
    return bytes(buf)


def _fallback_stream(part: str, rate: str):
    """Return the stream to try after the primary backend failed, or None.

    The other backend is preferred; if it is unusable, the primary backend is
    retried once with its default voice.
    """
    if USE_ELEVENLABS:
        if EDGE_TTS_AVAILABLE:
            return _edge_tts_stream(part, voice=EDGE_VOICE, rate=rate)
        if ELEVENLABS_API_KEY:
            return _elevenlabs_stream(part, voice_id=ELEVENLABS_VOICE_ID)
    else:
        if ELEVENLABS_API_KEY:
            return _elevenlabs_stream(part, voice_id=ELEVENLABS_VOICE_ID)
        if EDGE_TTS_AVAILABLE:
            return _edge_tts_stream(part, voice=EDGE_VOICE, rate=rate)
    return None


async def _synthesize_part(part: str, voice: str, rate: str) -> bytes:
    """Synthesise one phrase and return its complete audio (b"" on failure).

    Errors are logged rather than raised so that one bad phrase does not
    abort playback of the remaining phrases.
    """
    if USE_ELEVENLABS:
        primary = _elevenlabs_stream(part, voice_id=voice)
    else:
        primary = _edge_tts_stream(part, voice=voice, rate=rate)

    try:
        return await _collect_audio(primary)
    except Exception as exc:
        print(f"[TTS] synth error: {exc}")

    # Primary backend failed. Try the other backend before giving up.
    audio = b""
    fallback = _fallback_stream(part, rate)
    if fallback is not None:
        try:
            audio = await _collect_audio(fallback)
        except Exception as fallback_exc:
            print(f"[TTS] fallback synth error: {fallback_exc}")
    if not audio:
        print(f"[TTS] no audio produced for chunk: {part[:60]!r}")
    return audio


async def text_to_speech_stream(
    text: str,
    voice: str | None = None,
    rate: str = EDGE_RATE,
):
    """Stream TTS audio for `text`.

    Splits text into small clause-level parts, synthesises each part in
    order, and yields one complete audio blob (bytes) per part in order.
    Parts for which no audio could be produced are skipped.

    Args:
        text: Text to speak.
        voice: Voice for the primary backend; defaults to the configured
            ElevenLabs voice id or Edge voice depending on USE_ELEVENLABS.
        rate: Edge-TTS rate string; not used by the ElevenLabs backend.

    IMPORTANT:
    The browser playback path uses decodeAudioData(), which expects a
    self-contained audio buffer. Forwarding provider stream fragments
    directly causes decode buffering/stalls on the client. We therefore
    accumulate each phrase's bytes and only emit it once the part is fully
    synthesised. The phrases are kept intentionally small by
    services/streaming.py, so latency remains low.
    """
    # Preserve normal spaces inside/around streamed phrase chunks; don't
    # aggressively trim because it can glue words across chunk boundaries
    # (e.g. "দিয়ে" + "আপনার" → "দিয়েআপনার").
    text = (text or "").strip("\n\r\t")
    if not text:
        return

    voice_to_use = voice or (ELEVENLABS_VOICE_ID if USE_ELEVENLABS else EDGE_VOICE)

    parts = split_sentences(text)
    if not parts:
        return

    # Sequential synthesis guarantees exact playback order.
    for part in parts:
        audio = await _synthesize_part(part, voice_to_use, rate)
        if audio:
            yield audio