""" services/streaming.py — Production-grade parallel TTS streamer FIX-ISSUE4 (Natural, slow, small-chunk TTS): The previous code used character-count thresholds that produced large sentence-level chunks (25–65 chars), causing buffered, robotic-feeling speech with a burst of audio at once. New behaviour: • Flush at word boundaries (2–3 words) for voice-like pacing. • Flush threshold is ~15 chars first chunk, ~25 chars subsequent — which corresponds to roughly 2–3 average Bengali/English words. • Hard limit of 40 chars ensures no chunk ever gets too large. • Sentence-ending punctuation (।.!?) always flushes immediately regardless of length, giving natural pause points. • The TTS rate is slightly faster than neutral in tts.py for a more conversational pace. Result: audio arrives in small, fast, overlapping synthesis tasks, giving a low-latency, smooth, natural speech feel. FIX-BUG5 (TOCTOU race in stream_audio) — preserved from previous version. """ from __future__ import annotations import asyncio import re from dataclasses import dataclass, field from typing import AsyncGenerator from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE # ── Chunk size tuning ────────────────────────────────────────────────────────── # These character counts correspond roughly to: # FIRST_FLUSH_MIN ~2 words (get audio playing ASAP) # SUBSEQUENT_FLUSH_MIN ~3 words (natural conversational phrase) # HARD_LIMIT ~6 words (never accumulate more than this) # # At average Bengali word length ~4–5 chars + space: # 10 chars ≈ 2 words, 18 chars ≈ 3-4 words, 40 chars ≈ 7-8 words if USE_ELEVENLABS: # ElevenLabs per-chunk latency is higher; flush smaller chunks so the # first playable audio arrives sooner and pauses feel shorter. FIRST_FLUSH_MIN = 8 FIRST_FLUSH_HARD = 18 SUBSEQUENT_FLUSH_MIN = 14 SUBSEQUENT_FLUSH_HARD = 28 else: FIRST_FLUSH_MIN = 10 FIRST_FLUSH_HARD = 30 SUBSEQUENT_FLUSH_MIN = 18 SUBSEQUENT_FLUSH_HARD = 40 _backend_label = "ElevenLabs" if USE_ELEVENLABS else "Edge-TTS" print(f"[Streamer] TTS backend: {_backend_label} | chunk: {SUBSEQUENT_FLUSH_MIN}–{SUBSEQUENT_FLUSH_HARD} chars") MIN_CHARS = 2 SENTENCE_BOUNDARIES = frozenset(".!?।॥\n") CLAUSE_BOUNDARIES = frozenset(",;:—–") _SENTINEL = object() _DIGIT_WORDS = { "0": "শূন্য", "1": "এক", "2": "দুই", "3": "তিন", "4": "চার", "5": "পাঁচ", "6": "ছয়", "7": "সাত", "8": "আট", "9": "নয়", "০": "শূন্য", "১": "এক", "২": "দুই", "৩": "তিন", "৪": "চার", "৫": "পাঁচ", "৬": "ছয়", "৭": "সাত", "৮": "আট", "৯": "নয়", "٠": "শূন্য", "١": "এক", "٢": "দুই", "٣": "তিন", "٤": "চার", "٥": "পাঁচ", "٦": "ছয়", "٧": "সাত", "٨": "আট", "٩": "নয়", } def _spoken_phone_text(text: str) -> str: if not text: return "" def repl(match: re.Match[str]) -> str: chunk = match.group(0) digits = [ch for ch in chunk if ch in _DIGIT_WORDS] if len(digits) < 10: return chunk spoken = " ".join(_DIGIT_WORDS[ch] for ch in digits) prev_char = text[match.start() - 1] if match.start() > 0 else "" next_char = text[match.end()] if match.end() < len(text) else "" if prev_char and not prev_char.isspace() and prev_char not in "([<{\"'": spoken = " " + spoken if next_char and not next_char.isspace() and next_char not in ")]>.,!?;:}\"'": spoken = spoken + " " return spoken out = re.sub(r"[+\d০-৯٠-٩][\d০-৯٠-٩\s().\-]{8,}[\d০-৯٠-٩]", repl, text) return re.sub(r"[ \t]{2,}", " ", out) def _clean_for_tts(text: str) -> str: # Strip emotion/tone tags like "[calm]" "[neutral]" "[happy]" etc. # These are useful for UI but often degrade or break TTS synthesis. # Remove them wherever they appear, then normalize whitespace. text = re.sub(r"(?:(?<=^)|(?<=\s))\[[^\[\]\n]{1,24}\](?=\s|$)", "", text) # Also strip orphaned tag fragments that can occur if the streamer flushes # mid-tag during token streaming (e.g. "[neutral" or "neutral]"). text = re.sub(r"(?:(?<=^)|(?<=\s))\[[A-Za-z]{2,16}(?=\s|$)", "", text) text = re.sub(r"(?:(?<=^)|(?<=\s))[A-Za-z]{2,16}\](?=\s|$)", "", text) text = re.sub(r"\*{1,3}", "", text) text = re.sub(r"#+\s*", "", text) text = re.sub(r"^\s*[-•]\s*", "", text, flags=re.MULTILINE) text = re.sub(r"^\s*[\d০-৯]+[.)]\s*", "", text, flags=re.MULTILINE) text = re.sub(r"`+", "", text) text = re.sub(r"\n{2,}", "\n", text) # Collapse runs of spaces introduced by tag removal. text = re.sub(r"[ \t]{2,}", " ", text) text = _spoken_phone_text(text) # Keep normal spaces so chunk boundaries don't glue words together. return text.strip("\n\r\t") def _flush_reason(buffer: str, first_chunk: bool) -> str | None: """ Like _should_flush, but returns the reason so we can preserve spacing when flushing at a word boundary. """ n = len(buffer) if n == 0: return None flush_min = FIRST_FLUSH_MIN if first_chunk else SUBSEQUENT_FLUSH_MIN hard_limit = FIRST_FLUSH_HARD if first_chunk else SUBSEQUENT_FLUSH_HARD if n >= hard_limit: return "hard" last_char = buffer[-1] if last_char in SENTENCE_BOUNDARIES and n >= flush_min: return "sentence" if last_char in CLAUSE_BOUNDARIES and n >= hard_limit * 0.70: return "clause" if last_char == " " and n >= flush_min: return "space" return None def _should_flush(buffer: str, first_chunk: bool) -> bool: n = len(buffer) if n == 0: return False flush_min = FIRST_FLUSH_MIN if first_chunk else SUBSEQUENT_FLUSH_MIN hard_limit = FIRST_FLUSH_HARD if first_chunk else SUBSEQUENT_FLUSH_HARD # Hard limit — always flush regardless of boundary if n >= hard_limit: return True last_char = buffer[-1] # Sentence ending — flush immediately (natural pause point) if last_char in SENTENCE_BOUNDARIES and n >= flush_min: return True # Clause boundary — flush at ~75% of hard limit if last_char in CLAUSE_BOUNDARIES and n >= hard_limit * 0.70: return True # Word boundary (space after minimum words reached) if last_char == ' ' and n >= flush_min: return True return False @dataclass class _AudioSlot: index: int queue: asyncio.Queue = field(default_factory=lambda: asyncio.Queue()) done: bool = False def mark_done(self) -> None: self.done = True; self.queue.put_nowait(_SENTINEL) def mark_error(self) -> None: self.done = True; self.queue.put_nowait(_SENTINEL) class ParallelTTSStreamer: def __init__(self, voice: str | None = None) -> None: self.voice = voice self.buffer = "" self._cancelled = False self._first_chunk = True self._carry_space = False self._slot_index = 0 self._slots: list[_AudioSlot] = [] self._slots_lock = asyncio.Lock() self._tasks: list[asyncio.Task] = [] self._llm_done = asyncio.Event() self._slot_added = asyncio.Event() self._last_flush_t: float = 0.0 self._last_token_t: float = 0.0 async def add_token(self, token: str) -> None: if not token or self._cancelled: return loop = asyncio.get_running_loop() now = loop.time() self._last_token_t = now # If we flushed at a word boundary previously, preserve a single # inter-word space so Bengali/English words don't get glued together. if self.buffer == " " and token[:1].isspace(): token = token.lstrip() self.buffer += token reason = _flush_reason(self.buffer, self._first_chunk) if reason is not None: self._first_chunk = False self._carry_space = (reason == "space") await self._schedule_chunk() self._last_flush_t = now return # Safety valve: if tokens arrive without good boundaries, we can go a # long time without scheduling any TTS slots → streamer timeout/no audio. # Force a flush after a short delay once we have enough text. flush_min = FIRST_FLUSH_MIN if self._first_chunk else SUBSEQUENT_FLUSH_MIN if len(self.buffer) >= flush_min and (now - self._last_flush_t) >= 0.8: self._first_chunk = False # Time-based flush: don't force a carry space. self._carry_space = False await self._schedule_chunk() self._last_flush_t = now async def _schedule_chunk(self) -> None: if self._cancelled: self.buffer = "" return raw = self.buffer self.buffer = " " if self._carry_space else "" self._carry_space = False # IMPORTANT: don't lose an inter-word space when the flush happened # exactly at a word boundary (buffer ended with " "). text = _clean_for_tts(raw) if len(text) < MIN_CHARS: return async with self._slots_lock: slot = _AudioSlot(index=self._slot_index) self._slot_index += 1 self._slots.append(slot) self._slot_added.set() task = asyncio.create_task(self._synthesise(text, slot)) self._tasks.append(task) task.add_done_callback( lambda t: self._tasks.remove(t) if t in self._tasks else None ) async def _synthesise(self, text: str, slot: _AudioSlot) -> None: if self._cancelled: slot.mark_error() return try: async for chunk in text_to_speech_stream(text, voice=self.voice): if self._cancelled: break await slot.queue.put(chunk) except asyncio.CancelledError: pass except Exception as exc: print(f"[Streamer] TTS error for '{text[:50]}': {exc}") finally: slot.mark_done() async def flush(self) -> None: if self.buffer.strip(): await self._schedule_chunk() self._llm_done.set() async def cancel(self) -> None: self._cancelled = True tasks = list(self._tasks) self._tasks.clear() for t in tasks: t.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True) async with self._slots_lock: for slot in self._slots: if not slot.done: slot.mark_error() self._llm_done.set() self._slot_added.set() async def stream_audio(self) -> AsyncGenerator[bytes, None]: """ Deliver TTS audio chunks in slot order. FIX-BUG5 — double-check pattern eliminates TOCTOU race: 1. clear() the event 2. Re-check slot list under lock (slot may have been added between previous check and clear()) 3. Only then wait() — so we never miss a newly-added slot """ delivered = 0 while True: async with self._slots_lock: slot = self._slots[delivered] if delivered < len(self._slots) else None if slot is None: if self._llm_done.is_set(): async with self._slots_lock: total = len(self._slots) if delivered >= total: break # All slots consumed; done. # FIX-BUG5: clear → re-check → wait self._slot_added.clear() async with self._slots_lock: have_new = delivered < len(self._slots) if have_new: continue try: await asyncio.wait_for(self._slot_added.wait(), timeout=30.0) except asyncio.TimeoutError: # Don't abort the whole stream; LLM/TTS backends can stall. # Keep waiting unless the LLM already finished. if self._llm_done.is_set(): break print("[Streamer] Timeout waiting for TTS slot (continuing)…") continue continue # Drain this slot's audio queue in order while True: item = await slot.queue.get() if item is _SENTINEL: break if not self._cancelled: yield item delivered += 1 def reset(self) -> None: self._cancelled = False self._first_chunk = True self._carry_space = False self.buffer = "" self._slot_index = 0 self._slots.clear() self._tasks.clear() self._llm_done.clear() self._slot_added.clear()