"""
services/streaming.py — Production-grade parallel TTS streamer

FIX-ISSUE4 (Natural, slow, small-chunk TTS):
The previous code used character-count thresholds that produced large
sentence-level chunks (25–65 chars). This made speech sound buffered and
robotic, with audio arriving in a single burst.

New behaviour:
• Flush at word boundaries (roughly 2–3 words) for voice-like pacing.
• A word-boundary flush needs at least 10 chars for the first chunk and
  18 chars for later chunks, which is roughly 2–3 average Bengali/English
  words.
• Hard limits of 30 chars (first chunk) and 40 chars (later chunks) ensure
  no chunk ever gets too large.
• Sentence-ending punctuation (। ॥ . ! ? or a newline) flushes as soon as
  the chunk has reached the minimum length above, giving natural pause
  points.
• Clause punctuation (, ; : — –) flushes once the chunk reaches 70% of the
  hard limit.
• The TTS rate is set to "-35%" in tts.py (slightly slower than before).

Result: audio arrives in small, fast, overlapping synthesis tasks,
giving a low-latency, smooth, natural speech feel.

FIX-BUG5 (TOCTOU race in stream_audio) — preserved from previous version.
"""
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import re |
| from dataclasses import dataclass, field |
| from typing import AsyncGenerator |
|
|
| from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
# Flush thresholds, in characters, consumed by _should_flush(). The first
# chunk of a response uses smaller values so it is handed to TTS sooner.
FIRST_FLUSH_MIN, FIRST_FLUSH_HARD = 10, 30
SUBSEQUENT_FLUSH_MIN, SUBSEQUENT_FLUSH_HARD = 18, 40
|
|
# Announce the active TTS backend and chunk-size range on stdout once, at import time.
if USE_ELEVENLABS:
    _backend_label = "ElevenLabs"
else:
    _backend_label = "Edge-TTS"
print(
    f"[Streamer] TTS backend: {_backend_label} | "
    f"chunk: {SUBSEQUENT_FLUSH_MIN}–{SUBSEQUENT_FLUSH_HARD} chars"
)
|
|
# Cleaned chunks shorter than this are dropped instead of being sent to TTS.
MIN_CHARS = 2
# A buffer ending in one of these characters is at a sentence end:
# Latin terminators, danda (।), double danda (॥), and newline.
SENTENCE_BOUNDARIES = frozenset({".", "!", "?", "।", "॥", "\n"})
# A buffer ending in one of these characters is at a clause break.
CLAUSE_BOUNDARIES = frozenset({",", ";", ":", "—", "–"})
# Unique end-of-audio marker placed on each slot's queue.
_SENTINEL = object()
|
|
|
|
def _clean_for_tts(text: str) -> str:
    """Strip Markdown decoration from *text* so the TTS engine does not read it aloud.

    The function removes:
    - emphasis asterisks, ``#`` heading marks and backticks;
    - bullet and numbered-list markers;
    - runs of blank lines, which are collapsed to a single newline.

    This is applied to every small streamed chunk, not to the whole response.
    Each chunk boundary therefore looks like a line start to the ``^``
    anchors. To avoid mangling ordinary text that happens to begin a chunk,
    list markers are only stripped when followed by whitespace or end of
    line, as Markdown requires. Numbered markers are also limited to 1–3
    digits, so a year such as ``"2024. "`` is not taken for a list marker.
    Examples:
    - ``"- item"`` becomes ``"item"``, but ``"-5 degrees"`` is kept.
    - ``"1. item"`` becomes ``"item"``, but ``"3.14 is pi"`` is kept.

    Args:
        text: Raw chunk text produced by the LLM.

    Returns:
        The cleaned text with leading and trailing whitespace removed.
    """
    text = re.sub(r"\*{1,3}", "", text)
    text = re.sub(r"#+\s*", "", text)
    # Bullet markers ("- " / "• "), but not a leading minus sign such as "-5".
    text = re.sub(r"^\s*[-•](?:\s+|$)", "", text, flags=re.MULTILINE)
    # Numbered markers ("1. ", "২) "), but not decimals ("3.14") or years ("2024.").
    text = re.sub(
        r"^\s*[\d০-৯]{1,3}[.)](?:\s+|$)", "", text, flags=re.MULTILINE
    )
    text = re.sub(r"`+", "", text)
    text = re.sub(r"\n{2,}", "\n", text)
    return text.strip()
|
|
|
|
def _should_flush(buffer: str, first_chunk: bool) -> bool:
    """Return True when *buffer* has reached a good point to hand off to TTS.

    The decision uses only the buffer length and its final character:

    * at or above the hard limit: always flush;
    * ends in sentence punctuation or a space, and is at least the minimum
      length: flush;
    * ends in clause punctuation, and is at least 70% of the hard limit:
      flush;
    * otherwise, including an empty buffer: keep buffering.

    The first chunk of a response uses the smaller ``FIRST_FLUSH_*``
    thresholds. Later chunks use ``SUBSEQUENT_FLUSH_*``.

    NOTE(review): the word-gap rule only fires when a token *ends* with a
    space. If the LLM tokenizer puts spaces at the start of tokens, most
    flushes will come from punctuation or the hard limit instead. Confirm
    against the tokenizer in use.
    """
    size = len(buffer)
    if not size:
        return False

    if first_chunk:
        minimum, ceiling = FIRST_FLUSH_MIN, FIRST_FLUSH_HARD
    else:
        minimum, ceiling = SUBSEQUENT_FLUSH_MIN, SUBSEQUENT_FLUSH_HARD

    if size >= ceiling:
        return True

    tail = buffer[-1]
    # Sentence ends and word gaps only need the minimum length.
    if (tail in SENTENCE_BOUNDARIES or tail == " ") and size >= minimum:
        return True
    # Clause punctuation is a weaker pause, so wait for a fuller chunk.
    return tail in CLAUSE_BOUNDARIES and size >= ceiling * 0.70
|
|
|
|
@dataclass
class _AudioSlot:
    """Holder for the audio of one scheduled text chunk.

    The synthesis task pushes audio items onto ``queue`` and finishes by
    putting ``_SENTINEL`` there. The consumer drains the queue until it sees
    the sentinel. ``done`` records whether the sentinel has been queued.
    """

    index: int  # position of the chunk in scheduling order
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    done: bool = False

    def mark_done(self) -> None:
        """Flag the slot finished and enqueue the end-of-audio sentinel."""
        self.done = True
        self.queue.put_nowait(_SENTINEL)

    def mark_error(self) -> None:
        """Finish the slot after a failure or cancellation (same effect as mark_done)."""
        self.mark_done()
|
|
|
|
class ParallelTTSStreamer:
    """Turn streamed LLM tokens into ordered, concurrently synthesised audio.

    Producer side: ``add_token()`` accumulates tokens. Whenever
    ``_should_flush()`` reports a flush point, it starts a TTS task for the
    buffered text. Each task writes into its own ``_AudioSlot``, so several
    chunks are synthesised in parallel. Call ``flush()`` once the LLM has
    finished producing tokens.

    Consumer side: ``stream_audio()`` yields audio items slot by slot, in the
    order the chunks were scheduled.

    ``cancel()`` stops synthesis and releases the consumer. ``reset()``
    prepares the instance for the next response.
    """

    def __init__(self, voice: str | None = None) -> None:
        self.voice = voice  # forwarded to text_to_speech_stream()
        self.buffer = ""  # text received but not yet scheduled for TTS
        self._cancelled = False
        self._first_chunk = True  # first chunk uses the FIRST_FLUSH_* thresholds
        self._slot_index = 0
        self._slots: list[_AudioSlot] = []  # scheduling order == playback order
        self._slots_lock = asyncio.Lock()
        self._tasks: list[asyncio.Task] = []  # synthesis tasks not yet finished
        self._llm_done = asyncio.Event()  # set by flush() / cancel()
        self._slot_added = asyncio.Event()  # wakes a waiting stream_audio()

    async def add_token(self, token: str) -> None:
        """Append *token* and schedule a TTS chunk if a flush point is reached.

        Empty tokens, and tokens received after ``cancel()``, are ignored.
        """
        if not token or self._cancelled:
            return
        self.buffer += token
        if _should_flush(self.buffer, self._first_chunk):
            self._first_chunk = False
            await self._schedule_chunk()

    async def _schedule_chunk(self) -> None:
        """Move the buffer into a new slot and start synthesising it.

        The buffer is always emptied. Nothing is scheduled when the streamer
        is cancelled, or when the cleaned text is shorter than ``MIN_CHARS``.
        """
        if self._cancelled:
            self.buffer = ""
            return
        text = _clean_for_tts(self.buffer.strip())
        self.buffer = ""
        if len(text) < MIN_CHARS:
            return
        async with self._slots_lock:
            slot = _AudioSlot(index=self._slot_index)
            self._slot_index += 1
            self._slots.append(slot)
            self._slot_added.set()
        task = asyncio.create_task(self._synthesise(text, slot))
        self._tasks.append(task)
        # Drop finished tasks so _tasks only holds work that is still running.
        task.add_done_callback(
            lambda t: self._tasks.remove(t) if t in self._tasks else None
        )

    async def _synthesise(self, text: str, slot: _AudioSlot) -> None:
        """Stream TTS audio for *text* into *slot*, then finish the slot.

        TTS errors are printed and end the slot early (best effort). The slot
        is finished in every case, so the consumer never waits on it forever.
        Cancellation is propagated rather than swallowed.
        """
        if self._cancelled:
            slot.mark_error()
            return
        try:
            async for chunk in text_to_speech_stream(text, voice=self.voice):
                if self._cancelled:
                    break
                await slot.queue.put(chunk)
        except asyncio.CancelledError:
            # Re-raise so the task is reported as cancelled, as asyncio
            # requires. The finally clause below still releases the slot.
            raise
        except Exception as exc:
            print(f"[Streamer] TTS error for '{text[:50]}': {exc}")
        finally:
            slot.mark_done()

    async def flush(self) -> None:
        """Schedule any remaining buffered text and signal end of LLM output."""
        if self.buffer.strip():
            await self._schedule_chunk()
        self._llm_done.set()
        # Wake a consumer already blocked in stream_audio(). If no final slot
        # was scheduled above, nothing else would set this event, and the
        # consumer would sit out the full wait timeout before returning.
        self._slot_added.set()

    async def cancel(self) -> None:
        """Stop all synthesis and release any consumer.

        Cancels and awaits every running synthesis task, then finishes every
        unfinished slot and signals end of input. A running ``stream_audio()``
        then drains the remaining slots without yielding their audio and
        returns.
        """
        self._cancelled = True
        tasks = list(self._tasks)
        self._tasks.clear()
        for t in tasks:
            t.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        async with self._slots_lock:
            for slot in self._slots:
                if not slot.done:
                    slot.mark_error()
        self._llm_done.set()
        self._slot_added.set()

    async def stream_audio(self) -> AsyncGenerator[bytes, None]:
        """
        Deliver TTS audio chunks in slot order.

        The generator ends once ``flush()`` or ``cancel()`` has signalled end
        of input and every slot has been drained. It also ends if no new slot
        appears within 10 s while input is still open.

        FIX-BUG5 — double-check pattern eliminates TOCTOU race:
          1. clear() the event
          2. Re-check the slot list and the end-of-input flag (either may
             have changed between the previous check and clear())
          3. Only then wait() — so we never miss a newly-added slot or the
             end-of-input signal
        """
        delivered = 0
        while True:
            async with self._slots_lock:
                slot = self._slots[delivered] if delivered < len(self._slots) else None

            if slot is None:
                if self._llm_done.is_set():
                    async with self._slots_lock:
                        if delivered >= len(self._slots):
                            break
                    # A final slot was added alongside end of input: deliver it.
                    continue

                self._slot_added.clear()
                async with self._slots_lock:
                    have_new = delivered < len(self._slots)
                if have_new or self._llm_done.is_set():
                    continue
                try:
                    await asyncio.wait_for(self._slot_added.wait(), timeout=10.0)
                except asyncio.TimeoutError:
                    print("[Streamer] Timeout waiting for TTS slot.")
                    break
                continue

            # Drain this slot until its end marker. After cancel(), queued
            # audio is discarded rather than yielded.
            while True:
                item = await slot.queue.get()
                if item is _SENTINEL:
                    break
                if not self._cancelled:
                    yield item
            delivered += 1

    def reset(self) -> None:
        """Prepare the streamer for a new response.

        Synthesis tasks still running from the previous response are
        cancelled, and unfinished slots are finished. This stops orphaned
        tasks from continuing to call the TTS backend, and means a reader
        still blocked on an old slot is released. Await ``cancel()`` first if
        you need those tasks to have fully stopped.
        """
        for task in self._tasks:
            task.cancel()
        for slot in self._slots:
            if not slot.done:
                slot.mark_error()
        self._cancelled = False
        self._first_chunk = True
        self.buffer = ""
        self._slot_index = 0
        self._slots.clear()
        self._tasks.clear()
        self._llm_done.clear()
        self._slot_added.clear()
|
|