rakib72642's picture
Enhance text processing for TTS: strip emotion tags and improve whitespace handling; add full text recovery for LLM responses.
2d19124
"""
services/tts.py — Ultra Low-Latency Dual TTS Backend
FIX-ISSUE4 (Normal-speed TTS):
• Default rate is now slightly faster than normal for a more natural
conversational pace.
• split_sentences() now splits on ALL clause delimiters (commas, colons,
em-dashes) in addition to sentence endings, so synthesis tasks are
smaller and start sooner. This pairs with streaming.py's 2–3 word
flush threshold for maximum low-latency playback.
• Parts are synthesised sequentially to guarantee word order in playback.
"""
from dotenv import load_dotenv
import os, re, asyncio
load_dotenv()
# Backend selector: when True, synthesis tries ElevenLabs first and Edge-TTS
# second; when False, the order is reversed.
USE_ELEVENLABS = True
# Voice name passed to Edge-TTS (also used when Edge-TTS acts as the fallback).
EDGE_VOICE = "bn-BD-NabanitaNeural"
# ElevenLabs credentials / voice / model, read from the environment after
# load_dotenv(). An empty API key is treated as "not configured".
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY", "")
ELEVENLABS_VOICE_ID = os.getenv("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM")
ELEVENLABS_MODEL_ID = os.getenv("ELEVENLABS_MODEL_ID", "eleven_multilingual_v2")
def _clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
def _parse_pct(text: str) -> float:
"""
Parse strings like '+10%', '-5%', '12%' into a multiplier delta.
Returns 0.0 when empty/invalid.
"""
raw = (text or "").strip()
if not raw:
return 0.0
if raw.endswith("%"):
raw = raw[:-1].strip()
try:
return float(raw) / 100.0
except Exception:
return 0.0
# ElevenLabs speed configuration:
# - `ELEVENLABS_SPEED` is the base speed (1.0 ≈ normal, >1.0 = faster); default "2.8".
# - `ELEVENLABS_SPEED_PCT` is an optional relative adjustment like "+10%" / "-5%".
#   Effective speed = base * (1 + pct).
# - `ELEVENLABS_SPEED_MAX` sets the upper clamp (default 3.5). If your ElevenLabs
#   model/voice rejects high values, lower this (e.g. 2.5).
# The effective speed is clamped to [0.5, ELEVENLABS_SPEED_MAX].
# NOTE: float() raises ValueError at import time when ELEVENLABS_SPEED or
# ELEVENLABS_SPEED_MAX is not numeric; only the PCT value is parsed leniently.
# NOTE(review): the defaults (base 2.8, cap 3.5) are far above 1.0 — confirm
# against the ElevenLabs voice-settings documentation that the API accepts
# such values for the configured model.
_ELEVEN_BASE_SPEED = float(os.getenv("ELEVENLABS_SPEED", "2.8"))
_ELEVEN_SPEED_PCT = _parse_pct(os.getenv("ELEVENLABS_SPEED_PCT", "0%"))
_ELEVEN_SPEED_MAX = float(os.getenv("ELEVENLABS_SPEED_MAX", "3.5"))
ELEVENLABS_SPEED = _clamp(_ELEVEN_BASE_SPEED * (1.0 + _ELEVEN_SPEED_PCT), 0.5, _ELEVEN_SPEED_MAX)
# Sent as the `output_format` query parameter of the ElevenLabs stream request.
ELEVENLABS_OUTPUT_FORMAT = "mp3_22050_32"
# Defaults for the `voice_settings` object of the ElevenLabs request payload.
ELEVENLABS_STABILITY = 0.45
ELEVENLABS_SIMILARITY = 0.80
ELEVENLABS_STYLE = 0.35
ELEVENLABS_SPEAKER_BOOST = True
# Edge-TTS is an optional dependency: record whether it imported so the rest
# of the module can decide which backends are usable.
try:
    import edge_tts  # type: ignore
    EDGE_TTS_AVAILABLE = True
except Exception:
    edge_tts = None
    EDGE_TTS_AVAILABLE = False
    print("[TTS] edge_tts not available; will fall back to ElevenLabs if possible")

# Fail fast at import time when the configured backend cannot work at all.
if USE_ELEVENLABS and not ELEVENLABS_API_KEY:
    raise RuntimeError("[TTS] ELEVENLABS_API_KEY missing")
if not EDGE_TTS_AVAILABLE and not ELEVENLABS_API_KEY:
    raise RuntimeError("[TTS] Neither edge_tts nor ELEVENLABS_API_KEY is available")

# Startup banner. The Edge rate shown here is the default `rate` argument of
# the streaming functions in this module ("+22%").
print(
    f"[TTS] Backend: {'ElevenLabs' if USE_ELEVENLABS else 'Edge-TTS'} | "
    f"edge rate: +22% | eleven speed: {ELEVENLABS_SPEED:.2f} (base {_ELEVEN_BASE_SPEED:.2f}, pct {_ELEVEN_SPEED_PCT:+.0%})"
)
def split_sentences(text: str) -> list[str]:
    """
    Split text into small synthesis chunks for low-latency streaming.

    FIX-ISSUE4: Split on sentence boundaries AND clause boundaries so each
    TTS task is small (a phrase, not a full sentence). This allows synthesis
    to start sooner for later parts of a long response.

    Bracketed emotion/tone tags (and dangling halves of such tags) are
    removed before splitting. Chunks without a single letter or digit are
    dropped because there is nothing to pronounce; one-character words such
    as "I", "5" or "ও" are kept.

    Args:
        text: Raw text to split.

    Returns:
        Stripped chunks in their original order, each keeping its trailing
        delimiter. Empty list when nothing speakable remains.
    """
    # Strip any emotion/tone tags like "[calm]" "[neutral]" etc. These are
    # intended for UI display and can degrade/break some TTS backends.
    text = re.sub(r"(?:(?<=^)|(?<=\s))\[[^\[\]\n]{1,24}\](?=\s|$)", "", text)
    # Also remove half-open remnants such as "[calm" or "calm]".
    text = re.sub(r"(?:(?<=^)|(?<=\s))\[[A-Za-z]{2,16}(?=\s|$)", "", text)
    text = re.sub(r"(?:(?<=^)|(?<=\s))[A-Za-z]{2,16}\](?=\s|$)", "", text)
    # Collapse the double spaces left behind by tag removal.
    text = re.sub(r"[ \t]{2,}", " ", text).strip("\n\r\t")
    if not text:
        return []
    # Split on sentence-ending punctuation AND clause delimiters.
    # The lookbehind keeps the delimiter attached to the preceding chunk.
    parts = re.split(r'(?<=[।.!?,;:—–])\s+', text)
    # Keep only chunks with something to pronounce. A length test would drop
    # real one-character words and still let "..." through.
    chunks = (p.strip() for p in parts)
    return [c for c in chunks if any(ch.isalnum() for ch in c)]
async def _edge_tts_stream(text: str, voice: str = EDGE_VOICE, rate: str = "+22%"):
    """
    Yield Edge-TTS audio bytes for one text chunk.

    Only stream events whose type is "audio" are forwarded; everything else
    is skipped. The default rate is slightly faster than normal speech.

    Raises:
        RuntimeError: when the edge_tts package could not be imported.
        Exception: any error from the Edge-TTS stream is logged and re-raised.
    """
    if edge_tts is None:
        raise RuntimeError("edge_tts is not installed")

    # Only newlines/tabs are trimmed; ordinary spaces are left in place.
    cleaned = text.strip("\n\r\t")
    if not cleaned:
        return

    try:
        communicator = edge_tts.Communicate(cleaned, voice, rate=rate)
        async for event in communicator.stream():
            if event["type"] != "audio":
                continue
            yield event["data"]
            # Hand control back to the event loop between audio fragments.
            await asyncio.sleep(0)
    except Exception as exc:
        print(f"[TTS][Edge] {exc}")
        raise
async def _elevenlabs_stream(
    text: str,
    voice_id: str = ELEVENLABS_VOICE_ID,
    model_id: str = ELEVENLABS_MODEL_ID,
    output_format: str = ELEVENLABS_OUTPUT_FORMAT,
    speed: float = ELEVENLABS_SPEED,
    stability: float = ELEVENLABS_STABILITY,
    similarity: float = ELEVENLABS_SIMILARITY,
    style: float = ELEVENLABS_STYLE,
    speaker_boost: bool = ELEVENLABS_SPEAKER_BOOST,
):
    """
    Yield ElevenLabs audio bytes for one text chunk.

    Trailing sentence/clause punctuation is removed before the request. If
    nothing but whitespace remains afterwards (e.g. the chunk was "..."),
    the generator ends without contacting the API.

    Args:
        text: Chunk to synthesise.
        voice_id: ElevenLabs voice id, used in the request URL.
        model_id: ElevenLabs model id.
        output_format: Value of the `output_format` query parameter.
        speed, stability, similarity, style, speaker_boost: Values placed in
            the request's `voice_settings` object.

    Raises:
        RuntimeError: on a non-200 response, or when the response carried
            no audio bytes.
        Exception: transport errors from httpx are logged and re-raised.
    """
    import httpx

    text = text.strip("\n\r\t")
    if not text:
        return
    # Reduce unnatural pauses for short streamed chunks.
    # ElevenLabs adds strong pauses on sentence-ending punctuation; for
    # low-latency streaming we prefer faster turn-taking.
    text = re.sub(r"[।.!?,;:—–]+$", "", text).strip("\n\r\t")
    # A punctuation-only chunk is empty (or just spaces) by now. Posting it
    # would only burn a request and trigger a pointless backend fallback.
    if not text.strip():
        return

    url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}/stream"
    headers = {
        "xi-api-key": ELEVENLABS_API_KEY,
        "Content-Type": "application/json",
        "Accept": "audio/mpeg",
    }
    payload = {
        "text": text,
        "model_id": model_id,
        "voice_settings": {
            "stability": stability,
            "similarity_boost": similarity,
            "style": style,
            "use_speaker_boost": speaker_boost,
            "speed": speed,
        },
    }
    try:
        got_any = False
        # No read timeout: the audio body is streamed for as long as
        # synthesis takes; connect and other phases stay bounded.
        async with httpx.AsyncClient(timeout=httpx.Timeout(10.0, connect=5.0, read=None)) as client:
            async with client.stream(
                "POST", url, headers=headers, json=payload,
                params={"output_format": output_format}
            ) as resp:
                if resp.status_code != 200:
                    raise RuntimeError(f"[TTS][ElevenLabs] HTTP {resp.status_code}")
                async for chunk in resp.aiter_bytes(chunk_size=512):
                    if chunk:
                        got_any = True
                        yield chunk
                        # Hand control back to the event loop between fragments.
                        await asyncio.sleep(0)
        if not got_any:
            raise RuntimeError("[TTS][ElevenLabs] No audio received")
    except Exception as exc:
        print(f"[TTS][ElevenLabs] {exc}")
        raise
async def text_to_speech_stream(
    text: str,
    voice: str | None = None,
    rate: str = "+22%",
):
    """
    Stream TTS audio for `text`, one complete audio blob per phrase.

    The text is cut into clause-sized phrases; each phrase is synthesised
    to completion before the next one starts, and its bytes are emitted as
    a single blob, so playback order always matches text order.

    IMPORTANT:
    The browser playback path uses decodeAudioData(), which expects a
    self-contained audio buffer. Forwarding provider stream fragments
    directly causes decode buffering/stalls on the client, hence the
    per-phrase accumulation. The phrases are kept intentionally small by
    services/streaming.py, so latency remains low.

    If the primary backend raises for a phrase, the other backend is tried
    once with its default voice. A phrase that yields no audio is skipped.
    """
    # Trim only newlines/tabs. Ordinary spaces must survive, otherwise words
    # can be glued together across streamed chunk boundaries
    # (e.g. "দিয়ে" + "আপনার" → "দিয়েআপনার").
    text = text.strip("\n\r\t")
    if not text:
        return
    voice_to_use = voice or (ELEVENLABS_VOICE_ID if USE_ELEVENLABS else EDGE_VOICE)
    phrases = split_sentences(text)
    if not phrases:
        return

    def _primary_source(phrase: str):
        # Configured backend, with the caller's voice.
        if USE_ELEVENLABS:
            return _elevenlabs_stream(phrase, voice_id=voice_to_use)
        return _edge_tts_stream(phrase, voice=voice_to_use, rate=rate)

    def _fallback_source(phrase: str):
        # Prefer the *other* backend; retry the same backend with its
        # default voice only when the other one is unavailable.
        edge = lambda: _edge_tts_stream(phrase, voice=EDGE_VOICE, rate=rate)
        eleven = lambda: _elevenlabs_stream(phrase, voice_id=ELEVENLABS_VOICE_ID)
        if USE_ELEVENLABS:
            candidates = ((EDGE_TTS_AVAILABLE, edge), (ELEVENLABS_API_KEY, eleven))
        else:
            candidates = ((ELEVENLABS_API_KEY, eleven), (EDGE_TTS_AVAILABLE, edge))
        for usable, make in candidates:
            if usable:
                return make()
        return None

    async def _render(phrase: str) -> bytes | None:
        # Synthesise one phrase; return its audio, or None when none was produced.
        audio = bytearray()
        succeeded = False
        blob = None
        try:
            async for piece in _primary_source(phrase):
                audio.extend(piece)
            succeeded = True
            if audio:
                blob = bytes(audio)
        except Exception as exc:
            print(f"[TTS] synth error: {exc}")
            # Primary backend failed. Try the other backend before giving up.
            try:
                audio.clear()
                alternative = _fallback_source(phrase)
                if alternative is not None:
                    async for piece in alternative:
                        audio.extend(piece)
                succeeded = bool(audio)
                if audio:
                    blob = bytes(audio)
            except Exception as fallback_exc:
                print(f"[TTS] fallback synth error: {fallback_exc}")
        finally:
            if not succeeded and not audio:
                print(f"[TTS] no audio produced for chunk: {phrase[:60]!r}")
        return blob

    # Sequential synthesis guarantees exact playback order.
    for phrase in phrases:
        rendered = await _render(phrase)
        if rendered is not None:
            yield rendered