Voice-AI-Agent / services /streaming.py
rakib72642's picture
adjusted mobile number problem + numbers problem fixed
16676c4
"""
services/streaming.py — Production-grade parallel TTS streamer
FIX-ISSUE4 (Natural, slow, small-chunk TTS):
The previous code used character-count thresholds that produced large
sentence-level chunks (25–65 chars), causing buffered, robotic-feeling
speech with a burst of audio at once.
New behaviour:
• Flush at word boundaries (2–3 words) for voice-like pacing.
• Flush thresholds are 10 chars for the first chunk and 18 chars for
subsequent chunks (8 / 14 with ElevenLabs) — roughly 2–3 average
Bengali/English words.
• Hard limits of 30 / 40 chars (18 / 28 with ElevenLabs) ensure no chunk
ever gets too large.
• Sentence-ending punctuation (।.!?) flushes as soon as the minimum length
is reached, giving natural pause points.
• The TTS rate is slightly faster than neutral in tts.py for a more
conversational pace.
Result: audio arrives in small, fast, overlapping synthesis tasks,
giving a low-latency, smooth, natural speech feel.
FIX-BUG5 (TOCTOU race in stream_audio) — preserved from previous version.
"""
from __future__ import annotations
import asyncio
import re
from dataclasses import dataclass, field
from typing import AsyncGenerator
from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE
# ── Chunk size tuning ──────────────────────────────────────────────────────────
# Per-backend flush thresholds, in characters:
#   *_FLUSH_MIN   flush at a sentence end or trailing space once the buffer
#                 holds at least this many chars (FIRST_* applies only to the
#                 first chunk of a reply, to get audio playing sooner)
#   *_FLUSH_HARD  always flush once the buffer reaches this many chars; clause
#                 punctuation flushes at 70% of this value
#
# At an average Bengali word length of ~4–5 chars + space:
#   10 chars ≈ 2 words, 18 chars ≈ 3–4 words, 40 chars ≈ 7–8 words
if USE_ELEVENLABS:
    # ElevenLabs per-chunk latency is higher; flush smaller chunks so the
    # first playable audio arrives sooner and pauses feel shorter.
    FIRST_FLUSH_MIN = 8
    FIRST_FLUSH_HARD = 18
    SUBSEQUENT_FLUSH_MIN = 14
    SUBSEQUENT_FLUSH_HARD = 28
else:
    FIRST_FLUSH_MIN = 10
    FIRST_FLUSH_HARD = 30
    SUBSEQUENT_FLUSH_MIN = 18
    SUBSEQUENT_FLUSH_HARD = 40

_backend_label = "ElevenLabs" if USE_ELEVENLABS else "Edge-TTS"
# Separate MIN and HARD in the banner; without a separator the two numbers
# were printed fused together (e.g. "1840 chars").
print(
    f"[Streamer] TTS backend: {_backend_label} | "
    f"chunk: {SUBSEQUENT_FLUSH_MIN}–{SUBSEQUENT_FLUSH_HARD} chars"
)
MIN_CHARS = 2
SENTENCE_BOUNDARIES = frozenset(".!?।॥\n")
CLAUSE_BOUNDARIES = frozenset(",;:—–")
_SENTINEL = object()
_DIGIT_WORDS = {
"0": "শূন্য",
"1": "এক",
"2": "দুই",
"3": "তিন",
"4": "চার",
"5": "পাঁচ",
"6": "ছয়",
"7": "সাত",
"8": "আট",
"9": "নয়",
"০": "শূন্য",
"১": "এক",
"২": "দুই",
"৩": "তিন",
"৪": "চার",
"৫": "পাঁচ",
"৬": "ছয়",
"৭": "সাত",
"৮": "আট",
"৯": "নয়",
"٠": "শূন্য",
"١": "এক",
"٢": "দুই",
"٣": "তিন",
"٤": "চার",
"٥": "পাঁচ",
"٦": "ছয়",
"٧": "সাত",
"٨": "আট",
"٩": "নয়",
}
def _spoken_phone_text(text: str) -> str:
if not text:
return ""
def repl(match: re.Match[str]) -> str:
chunk = match.group(0)
digits = [ch for ch in chunk if ch in _DIGIT_WORDS]
if len(digits) < 10:
return chunk
spoken = " ".join(_DIGIT_WORDS[ch] for ch in digits)
prev_char = text[match.start() - 1] if match.start() > 0 else ""
next_char = text[match.end()] if match.end() < len(text) else ""
if prev_char and not prev_char.isspace() and prev_char not in "([<{\"'":
spoken = " " + spoken
if next_char and not next_char.isspace() and next_char not in ")]>.,!?;:}\"'":
spoken = spoken + " "
return spoken
out = re.sub(r"[+\d০-৯٠-٩][\d০-৯٠-٩\s().\-]{8,}[\d০-৯٠-٩]", repl, text)
return re.sub(r"[ \t]{2,}", " ", out)
def _clean_for_tts(text: str) -> str:
    """Strip markup that a TTS engine would read aloud or fail on.

    This runs on each streamed chunk separately, not on the whole reply. The
    MULTILINE "^" anchors below therefore also match at the start of every
    chunk, which is usually mid-sentence; the list-marker patterns are kept
    narrow so ordinary numbers there survive.

    Args:
        text: One raw chunk of LLM output.

    Returns:
        The cleaned chunk. Tags, markdown emphasis/heading/code markers and
        list markers are removed, and runs of blank lines are reduced to one.
        Runs of spaces/tabs are collapsed, and long digit runs are spelled
        out by ``_spoken_phone_text``. Newlines, carriage returns and tabs
        are stripped from both ends; spaces at the ends are kept.
    """
    # Strip emotion/tone tags like "[calm]" "[neutral]" "[happy]" etc.
    # These are useful for UI but often degrade or break TTS synthesis.
    text = re.sub(r"(?:(?<=^)|(?<=\s))\[[^\[\]\n]{1,24}\](?=\s|$)", "", text)
    # Also strip orphaned tag fragments that can occur if the streamer flushes
    # mid-tag during token streaming (e.g. "[neutral" or "neutral]").
    text = re.sub(r"(?:(?<=^)|(?<=\s))\[[A-Za-z]{2,16}(?=\s|$)", "", text)
    text = re.sub(r"(?:(?<=^)|(?<=\s))[A-Za-z]{2,16}\](?=\s|$)", "", text)
    # Markdown emphasis and heading markers.
    text = re.sub(r"\*{1,3}", "", text)
    text = re.sub(r"#+\s*", "", text)
    # Bullet markers. A bare "-" only counts as a bullet when whitespace
    # follows it; "-5" at the start of a chunk is a minus sign, not a bullet.
    text = re.sub(r"^\s*(?:•|-(?=\s))\s*", "", text, flags=re.MULTILINE)
    # Numbered-list markers such as "1. " or "12) ". Limit to two digits and
    # require trailing whitespace: otherwise a chunk that merely *starts*
    # with a number ("1990. Then…", "3.50 taka") loses it.
    text = re.sub(r"^\s*[\d০-৯]{1,2}[.)]\s+", "", text, flags=re.MULTILINE)
    text = re.sub(r"`+", "", text)
    text = re.sub(r"\n{2,}", "\n", text)
    # Collapse runs of spaces introduced by tag removal.
    text = re.sub(r"[ \t]{2,}", " ", text)
    text = _spoken_phone_text(text)
    # Keep normal spaces so chunk boundaries don't glue words together.
    return text.strip("\n\r\t")
def _flush_reason(buffer: str, first_chunk: bool) -> str | None:
"""
Like _should_flush, but returns the reason so we can preserve spacing
when flushing at a word boundary.
"""
n = len(buffer)
if n == 0:
return None
flush_min = FIRST_FLUSH_MIN if first_chunk else SUBSEQUENT_FLUSH_MIN
hard_limit = FIRST_FLUSH_HARD if first_chunk else SUBSEQUENT_FLUSH_HARD
if n >= hard_limit:
return "hard"
last_char = buffer[-1]
if last_char in SENTENCE_BOUNDARIES and n >= flush_min:
return "sentence"
if last_char in CLAUSE_BOUNDARIES and n >= hard_limit * 0.70:
return "clause"
if last_char == " " and n >= flush_min:
return "space"
return None
def _should_flush(buffer: str, first_chunk: bool) -> bool:
n = len(buffer)
if n == 0:
return False
flush_min = FIRST_FLUSH_MIN if first_chunk else SUBSEQUENT_FLUSH_MIN
hard_limit = FIRST_FLUSH_HARD if first_chunk else SUBSEQUENT_FLUSH_HARD
# Hard limit — always flush regardless of boundary
if n >= hard_limit:
return True
last_char = buffer[-1]
# Sentence ending — flush immediately (natural pause point)
if last_char in SENTENCE_BOUNDARIES and n >= flush_min:
return True
# Clause boundary — flush at ~75% of hard limit
if last_char in CLAUSE_BOUNDARIES and n >= hard_limit * 0.70:
return True
# Word boundary (space after minimum words reached)
if last_char == ' ' and n >= flush_min:
return True
return False
@dataclass
class _AudioSlot:
index: int
queue: asyncio.Queue = field(default_factory=lambda: asyncio.Queue())
done: bool = False
def mark_done(self) -> None: self.done = True; self.queue.put_nowait(_SENTINEL)
def mark_error(self) -> None: self.done = True; self.queue.put_nowait(_SENTINEL)
class ParallelTTSStreamer:
    """Turn streamed LLM tokens into ordered, concurrently synthesised audio.

    Tokens accumulate in ``buffer``. When ``_flush_reason`` (or the
    time-based safety valve in ``add_token``) decides a chunk is ready, the
    chunk is cleaned with ``_clean_for_tts`` and given its own ``_AudioSlot``
    and synthesis task, so several chunks can be synthesised at once.
    ``stream_audio`` yields the audio strictly in slot order.

    Expected call pattern (inferred from this class; confirm against
    callers): ``add_token`` per LLM token, ``flush`` when the LLM finishes,
    consume ``stream_audio`` concurrently, ``cancel`` to abort, ``reset``
    before reusing the instance.
    """

    def __init__(self, voice: str | None = None) -> None:
        self.voice = voice  # forwarded to text_to_speech_stream
        self.buffer = ""  # text not yet scheduled for synthesis
        self._cancelled = False
        self._first_chunk = True  # selects the FIRST_FLUSH_* thresholds
        self._carry_space = False  # start the next buffer with " " after a space flush
        self._slot_index = 0
        self._slots: list[_AudioSlot] = []
        self._slots_lock = asyncio.Lock()
        self._tasks: list[asyncio.Task] = []
        self._llm_done = asyncio.Event()  # set by flush() and cancel()
        self._slot_added = asyncio.Event()  # wakes a waiting stream_audio
        # loop.time() of the last flush / last token; 0.0 means "none yet".
        self._last_flush_t: float = 0.0
        self._last_token_t: float = 0.0

    async def add_token(self, token: str) -> None:
        """Append one LLM token and schedule a TTS chunk if a flush is due.

        Does nothing for an empty token or after ``cancel``.
        """
        if not token or self._cancelled:
            return
        now = asyncio.get_running_loop().time()
        self._last_token_t = now

        # After a word-boundary flush the buffer is exactly the carried " ";
        # drop the token's own leading whitespace so spaces are not doubled.
        if self.buffer == " " and token[:1].isspace():
            token = token.lstrip()
        self.buffer += token

        reason = _flush_reason(self.buffer, self._first_chunk)
        if reason is not None:
            self._first_chunk = False
            self._carry_space = reason == "space"
            # A hard-limit flush can land inside a word (LLM tokens are often
            # sub-word pieces), so cut at the last space and keep the
            # unfinished word buffered for the next chunk.
            await self._schedule_chunk(split_word=reason == "hard")
            self._last_flush_t = now
            return

        # Safety valve: if tokens arrive without good boundaries, we can go a
        # long time without scheduling any TTS slots → streamer timeout/no
        # audio. Force a flush once enough text is buffered and 0.8 s have
        # passed since the last flush. While _last_flush_t is still 0.0 (no
        # flush since construction/reset) the elapsed time is simply
        # loop.time(), normally far above 0.8, so the first chunk goes out as
        # soon as it reaches FIRST_FLUSH_MIN.
        flush_min = FIRST_FLUSH_MIN if self._first_chunk else SUBSEQUENT_FLUSH_MIN
        if len(self.buffer) >= flush_min and (now - self._last_flush_t) >= 0.8:
            self._first_chunk = False
            # Time-based flush: don't force a carry space, but still avoid
            # cutting a word in half.
            self._carry_space = False
            await self._schedule_chunk(split_word=True)
            self._last_flush_t = now

    @staticmethod
    def _split_at_word_boundary(text: str) -> tuple[str, str]:
        """Split *text* just after its last space into ``(head, tail)``.

        Returns ``(text, "")`` when the only space is at index 0 (or there is
        none), or when *text* already ends with a space. A chunk is therefore
        never emptied by the split.
        """
        cut = text.rfind(" ")
        if cut <= 0:
            return text, ""
        return text[:cut + 1], text[cut + 1:]

    async def _schedule_chunk(self, *, split_word: bool = False) -> None:
        """Move buffered text into a new slot and start synthesising it.

        Args:
            split_word: When True, schedule only the text up to and including
                the last space. The unfinished word after it stays buffered,
                behind a single space, for the next chunk.
        """
        if self._cancelled:
            self.buffer = ""
            return
        raw, tail = self.buffer, ""
        if split_word:
            raw, tail = self._split_at_word_boundary(raw)
        if tail:
            self.buffer = " " + tail
        else:
            # IMPORTANT: don't lose an inter-word space when the flush happened
            # exactly at a word boundary (buffer ended with " ").
            self.buffer = " " if self._carry_space else ""
        self._carry_space = False

        text = _clean_for_tts(raw)
        if len(text) < MIN_CHARS:
            return  # nothing worth synthesising (e.g. only a stripped tag)

        async with self._slots_lock:
            slot = _AudioSlot(index=self._slot_index)
            self._slot_index += 1
            self._slots.append(slot)
            self._slot_added.set()
        task = asyncio.create_task(self._synthesise(text, slot))
        self._tasks.append(task)
        task.add_done_callback(
            lambda t: self._tasks.remove(t) if t in self._tasks else None
        )

    async def _synthesise(self, text: str, slot: _AudioSlot) -> None:
        """Stream TTS audio for *text* into *slot*, always closing the slot.

        A TTS error is logged and ends this slot early instead of failing the
        whole stream.
        """
        if self._cancelled:
            slot.mark_error()
            return
        try:
            async for chunk in text_to_speech_stream(text, voice=self.voice):
                if self._cancelled:
                    break
                await slot.queue.put(chunk)
        except asyncio.CancelledError:
            # Propagate so the task really ends as cancelled; the finally
            # clause still closes the slot for stream_audio.
            raise
        except Exception as exc:
            print(f"[Streamer] TTS error for '{text[:50]}': {exc}")
        finally:
            slot.mark_done()

    async def flush(self) -> None:
        """Schedule any remaining text and mark the LLM output as complete."""
        if self.buffer.strip():
            await self._schedule_chunk()
        self._llm_done.set()
        # Wake a stream_audio that is waiting on _slot_added. If this final
        # flush produced no new slot, it would otherwise notice _llm_done
        # only after its 30 s wait times out.
        self._slot_added.set()

    async def cancel(self) -> None:
        """Abort all synthesis and unblock stream_audio; later tokens are ignored."""
        self._cancelled = True
        tasks = list(self._tasks)
        self._tasks.clear()
        for t in tasks:
            t.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        # A task cancelled before it ever ran never enters its try/finally,
        # so close any slot that is still open.
        async with self._slots_lock:
            for slot in self._slots:
                if not slot.done:
                    slot.mark_error()
        self._llm_done.set()
        self._slot_added.set()

    async def stream_audio(self) -> AsyncGenerator[bytes, None]:
        """Yield synthesised audio chunks strictly in slot order.

        Ends once ``_llm_done`` is set and every slot has been drained. After
        ``cancel``, any queued audio is drained but not yielded.

        FIX-BUG5 — double-check pattern eliminates the TOCTOU race:
        1. clear() the event
        2. re-check the slot list (and _llm_done) under the lock, since a slot
           may have been added between the previous check and clear()
        3. only then wait(), so a newly-added slot is never missed
        """
        delivered = 0
        while True:
            async with self._slots_lock:
                slot = self._slots[delivered] if delivered < len(self._slots) else None

            if slot is None:
                if self._llm_done.is_set():
                    async with self._slots_lock:
                        total = len(self._slots)
                    if delivered >= total:
                        break  # All slots consumed; done.
                # FIX-BUG5: clear → re-check → wait. Re-check _llm_done as
                # well so a completion signalled just before clear() is not
                # missed.
                self._slot_added.clear()
                async with self._slots_lock:
                    have_new = delivered < len(self._slots)
                if have_new or self._llm_done.is_set():
                    continue
                try:
                    await asyncio.wait_for(self._slot_added.wait(), timeout=30.0)
                except asyncio.TimeoutError:
                    # Don't abort the whole stream; LLM/TTS backends can stall.
                    # Keep waiting unless the LLM already finished.
                    if self._llm_done.is_set():
                        break
                    print("[Streamer] Timeout waiting for TTS slot (continuing)…")
                continue

            # Drain this slot's audio queue in order.
            while True:
                item = await slot.queue.get()
                if item is _SENTINEL:
                    break
                if not self._cancelled:
                    yield item
            delivered += 1

    def reset(self) -> None:
        """Return the streamer to its initial state so it can be reused.

        Slots and task references are dropped, but tasks are not cancelled
        here. Call ``cancel`` first if synthesis may still be running.
        """
        self._cancelled = False
        self._first_chunk = True
        self._carry_space = False
        self.buffer = ""
        self._slot_index = 0
        self._slots.clear()
        self._tasks.clear()
        self._llm_done.clear()
        self._slot_added.clear()
        self._last_flush_t = 0.0
        self._last_token_t = 0.0