""" services/streaming.py — Production-grade parallel TTS streamer FIX-ISSUE4 (Natural, slow, small-chunk TTS): The previous code used character-count thresholds that produced large sentence-level chunks (25–65 chars), causing buffered, robotic-feeling speech with a burst of audio at once. New behaviour: • Flush at word boundaries (2–3 words) for voice-like pacing. • Flush threshold is ~15 chars first chunk, ~25 chars subsequent — which corresponds to roughly 2–3 average Bengali/English words. • Hard limit of 40 chars ensures no chunk ever gets too large. • Sentence-ending punctuation (।.!?) always flushes immediately regardless of length, giving natural pause points. • The TTS rate is set to "-35%" in tts.py (slightly slower than before). Result: audio arrives in small, fast, overlapping synthesis tasks, giving a low-latency, smooth, natural speech feel. FIX-BUG5 (TOCTOU race in stream_audio) — preserved from previous version. """ from __future__ import annotations import asyncio import re from dataclasses import dataclass, field from typing import AsyncGenerator from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE # ── Chunk size tuning ────────────────────────────────────────────────────────── # These character counts correspond roughly to: # FIRST_FLUSH_MIN ~2 words (get audio playing ASAP) # SUBSEQUENT_FLUSH_MIN ~3 words (natural conversational phrase) # HARD_LIMIT ~6 words (never accumulate more than this) # # At average Bengali word length ~4–5 chars + space: # 10 chars ≈ 2 words, 18 chars ≈ 3-4 words, 40 chars ≈ 7-8 words FIRST_FLUSH_MIN = 10 FIRST_FLUSH_HARD = 30 SUBSEQUENT_FLUSH_MIN = 18 SUBSEQUENT_FLUSH_HARD = 40 _backend_label = "ElevenLabs" if USE_ELEVENLABS else "Edge-TTS" print(f"[Streamer] TTS backend: {_backend_label} | chunk: {SUBSEQUENT_FLUSH_MIN}–{SUBSEQUENT_FLUSH_HARD} chars") MIN_CHARS = 2 SENTENCE_BOUNDARIES = frozenset(".!?।॥\n") CLAUSE_BOUNDARIES = frozenset(",;:—–") _SENTINEL = object() def _clean_for_tts(text: str) -> str: text = re.sub(r"\*{1,3}", "", text) text = re.sub(r"#+\s*", "", text) text = re.sub(r"^\s*[-•]\s*", "", text, flags=re.MULTILINE) text = re.sub(r"^\s*[\d০-৯]+[.)]\s*", "", text, flags=re.MULTILINE) text = re.sub(r"`+", "", text) text = re.sub(r"\n{2,}", "\n", text) return text.strip() def _should_flush(buffer: str, first_chunk: bool) -> bool: n = len(buffer) if n == 0: return False flush_min = FIRST_FLUSH_MIN if first_chunk else SUBSEQUENT_FLUSH_MIN hard_limit = FIRST_FLUSH_HARD if first_chunk else SUBSEQUENT_FLUSH_HARD # Hard limit — always flush regardless of boundary if n >= hard_limit: return True last_char = buffer[-1] # Sentence ending — flush immediately (natural pause point) if last_char in SENTENCE_BOUNDARIES and n >= flush_min: return True # Clause boundary — flush at ~75% of hard limit if last_char in CLAUSE_BOUNDARIES and n >= hard_limit * 0.70: return True # Word boundary (space after minimum words reached) if last_char == ' ' and n >= flush_min: return True return False @dataclass class _AudioSlot: index: int queue: asyncio.Queue = field(default_factory=lambda: asyncio.Queue()) done: bool = False def mark_done(self) -> None: self.done = True; self.queue.put_nowait(_SENTINEL) def mark_error(self) -> None: self.done = True; self.queue.put_nowait(_SENTINEL) class ParallelTTSStreamer: def __init__(self, voice: str | None = None) -> None: self.voice = voice self.buffer = "" self._cancelled = False self._first_chunk = True self._slot_index = 0 self._slots: list[_AudioSlot] = [] self._slots_lock = asyncio.Lock() self._tasks: list[asyncio.Task] = [] self._llm_done = asyncio.Event() self._slot_added = asyncio.Event() async def add_token(self, token: str) -> None: if not token or self._cancelled: return self.buffer += token if _should_flush(self.buffer, self._first_chunk): self._first_chunk = False await self._schedule_chunk() async def _schedule_chunk(self) -> None: if self._cancelled: self.buffer = "" return text = _clean_for_tts(self.buffer.strip()) self.buffer = "" if len(text) < MIN_CHARS: return async with self._slots_lock: slot = _AudioSlot(index=self._slot_index) self._slot_index += 1 self._slots.append(slot) self._slot_added.set() task = asyncio.create_task(self._synthesise(text, slot)) self._tasks.append(task) task.add_done_callback( lambda t: self._tasks.remove(t) if t in self._tasks else None ) async def _synthesise(self, text: str, slot: _AudioSlot) -> None: if self._cancelled: slot.mark_error() return try: async for chunk in text_to_speech_stream(text, voice=self.voice): if self._cancelled: break await slot.queue.put(chunk) except asyncio.CancelledError: pass except Exception as exc: print(f"[Streamer] TTS error for '{text[:50]}': {exc}") finally: slot.mark_done() async def flush(self) -> None: if self.buffer.strip(): await self._schedule_chunk() self._llm_done.set() async def cancel(self) -> None: self._cancelled = True tasks = list(self._tasks) self._tasks.clear() for t in tasks: t.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True) async with self._slots_lock: for slot in self._slots: if not slot.done: slot.mark_error() self._llm_done.set() self._slot_added.set() async def stream_audio(self) -> AsyncGenerator[bytes, None]: """ Deliver TTS audio chunks in slot order. FIX-BUG5 — double-check pattern eliminates TOCTOU race: 1. clear() the event 2. Re-check slot list under lock (slot may have been added between previous check and clear()) 3. Only then wait() — so we never miss a newly-added slot """ delivered = 0 while True: async with self._slots_lock: slot = self._slots[delivered] if delivered < len(self._slots) else None if slot is None: if self._llm_done.is_set(): async with self._slots_lock: total = len(self._slots) if delivered >= total: break # All slots consumed; done. # FIX-BUG5: clear → re-check → wait self._slot_added.clear() async with self._slots_lock: have_new = delivered < len(self._slots) if have_new: continue try: await asyncio.wait_for(self._slot_added.wait(), timeout=10.0) except asyncio.TimeoutError: print("[Streamer] Timeout waiting for TTS slot.") break continue # Drain this slot's audio queue in order while True: item = await slot.queue.get() if item is _SENTINEL: break if not self._cancelled: yield item delivered += 1 def reset(self) -> None: self._cancelled = False self._first_chunk = True self.buffer = "" self._slot_index = 0 self._slots.clear() self._tasks.clear() self._llm_done.clear() self._slot_added.clear()