Voice-AI-Agent / services /streaming.py
rakib72642's picture
added communication full layer
5dabf9d
raw
history blame
8.28 kB
"""
services/streaming.py — Production-grade parallel TTS streamer
FIX-ISSUE4 (Natural, slow, small-chunk TTS):
The previous code used character-count thresholds that produced large
sentence-level chunks (25–65 chars), causing buffered, robotic-feeling
speech with a burst of audio at once.
New behaviour:
• Flush at word boundaries (2–3 words) for voice-like pacing.
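• Flush threshold is ~10 chars for the first chunk and ~18 chars for
subsequent chunks (FIRST_FLUSH_MIN / SUBSEQUENT_FLUSH_MIN) — which
corresponds to roughly 2–3 average Bengali/English words.
• Hard limit of 30 chars (first chunk) / 40 chars (subsequent chunks) forces
a flush so no chunk keeps growing.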
• Sentence-ending punctuation (।.!?) flushes as soon as the minimum chunk
length is reached, giving natural pause points.
• The TTS rate is set to "-35%" in tts.py (slightly slower than before).
Result: audio arrives in small, fast, overlapping synthesis tasks,
giving a low-latency, smooth, natural speech feel.
FIX-BUG5 (TOCTOU race in stream_audio) — preserved from previous version.
"""
from __future__ import annotations
import asyncio
import re
from dataclasses import dataclass, field
from typing import AsyncGenerator
from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE
# ── Chunk size tuning ──────────────────────────────────────────────────────────
# Character thresholds used by _should_flush:
#   *_FLUSH_MIN   minimum buffer length before a word or sentence boundary
#                 triggers a flush
#   *_FLUSH_HARD  buffer length at which a flush is forced even without a
#                 boundary
# The FIRST_* pair applies to the first chunk of a response (smaller, so audio
# starts playing sooner); the SUBSEQUENT_* pair applies to every later chunk.
#
# At an average Bengali word length of ~4–5 chars + space:
#   10 chars ≈ 2 words, 18 chars ≈ 3-4 words,
#   30 chars ≈ 5-6 words, 40 chars ≈ 7-8 words
FIRST_FLUSH_MIN = 10
FIRST_FLUSH_HARD = 30
SUBSEQUENT_FLUSH_MIN = 18
SUBSEQUENT_FLUSH_HARD = 40

_backend_label = "ElevenLabs" if USE_ELEVENLABS else "Edge-TTS"
# The two thresholds are printed as a "min-hard" range; without the separator
# the values ran together (e.g. "1840 chars").
print(f"[Streamer] TTS backend: {_backend_label} | chunk: {SUBSEQUENT_FLUSH_MIN}-{SUBSEQUENT_FLUSH_HARD} chars")

# Cleaned chunks shorter than this are dropped instead of being synthesised.
MIN_CHARS = 2
# Characters treated as the end of a sentence (Latin and Bengali/Devanagari
# punctuation plus newline).
SENTENCE_BOUNDARIES = frozenset(".!?।॥\n")
# Characters treated as a pause inside a sentence.
CLAUSE_BOUNDARIES = frozenset(",;:—–")
# Unique marker queued after the last audio chunk of a slot.
_SENTINEL = object()
# Markdown-stripping rules applied, in order, by _clean_for_tts. Compiled once
# at import time because the function runs for every scheduled chunk.
_TTS_CLEANUP_RULES = (
    # Bold / italic markers.
    (re.compile(r"\*{1,3}"), ""),
    # Heading markers and the whitespace that follows them.
    (re.compile(r"#+\s*"), ""),
    # Bullet at the start of a line. "-" only counts as a bullet when it is
    # followed by a space/tab or ends the line, so a negative number such as
    # "-5" keeps its sign.
    (re.compile(r"^\s*(?:•\s*|-(?:[ \t]+|$))", re.MULTILINE), ""),
    # Numbered-list marker ("1." / "২)") at the start of a line. The marker
    # must be followed by a space/tab or end the line, so a decimal such as
    # "3.5" is not truncated to "5".
    (re.compile(r"^\s*[\d০-৯]+[.)](?:[ \t]+|$)", re.MULTILINE), ""),
    # Inline-code / code-fence backticks.
    (re.compile(r"`+"), ""),
    # Runs of blank lines collapse to a single newline.
    (re.compile(r"\n{2,}"), "\n"),
)


def _clean_for_tts(text: str) -> str:
    """Strip Markdown decoration from *text* so it is not read aloud.

    Removes emphasis asterisks, heading hashes, leading bullets, leading
    numbered-list markers (ASCII or Bengali digits) and backticks, collapses
    consecutive newlines into one, and trims surrounding whitespace.

    Args:
        text: Raw text accumulated from the LLM token stream.

    Returns:
        The cleaned text; may be empty.
    """
    for pattern, replacement in _TTS_CLEANUP_RULES:
        text = pattern.sub(replacement, text)
    return text.strip()
def _should_flush(buffer: str, first_chunk: bool) -> bool:
    """Decide whether the accumulated text should be sent to TTS now.

    Args:
        buffer: Text accumulated since the last flush.
        first_chunk: True while no chunk has been flushed yet; selects the
            FIRST_* thresholds instead of the SUBSEQUENT_* ones.

    Returns:
        True when the buffer has reached its hard limit, or when it ends on a
        sentence boundary, clause boundary or space and is long enough for
        that kind of boundary; False otherwise (including an empty buffer).
    """
    if not buffer:
        return False

    if first_chunk:
        soft_min, ceiling = FIRST_FLUSH_MIN, FIRST_FLUSH_HARD
    else:
        soft_min, ceiling = SUBSEQUENT_FLUSH_MIN, SUBSEQUENT_FLUSH_HARD

    size = len(buffer)
    # Past the hard limit the chunk goes out no matter how it ends.
    if size >= ceiling:
        return True

    tail = buffer[-1]
    # Sentence endings and plain word gaps share the same minimum length.
    at_pause = tail == ' ' or tail in SENTENCE_BOUNDARIES
    if at_pause and size >= soft_min:
        return True

    # A clause mark only triggers a flush once the buffer has reached 70% of
    # the hard limit.
    return tail in CLAUSE_BOUNDARIES and size >= ceiling * 0.70
@dataclass
class _AudioSlot:
    """Holder for the audio produced from one scheduled text chunk.

    Audio chunks are pushed onto ``queue``; a single ``_SENTINEL`` object is
    queued after them to tell the reader that nothing more will arrive.
    """

    # Sequence number assigned by the streamer when the slot is created.
    index: int
    # Audio chunks for this slot, followed by the end marker.
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    # Set once the end marker has been queued.
    done: bool = False

    def mark_done(self) -> None:
        """Flag the slot as finished and queue the end marker."""
        self.done = True
        self.queue.put_nowait(_SENTINEL)

    def mark_error(self) -> None:
        """Close the slot after a failure; identical in effect to mark_done."""
        self.mark_done()
class ParallelTTSStreamer:
    """Turn a stream of LLM tokens into TTS audio delivered in text order.

    Tokens are buffered by ``add_token`` until ``_should_flush`` reports a
    flush point. Each flushed chunk gets its own ``_AudioSlot`` and its own
    synthesis task, so several chunks can be synthesised concurrently.
    ``stream_audio`` drains the slots strictly in the order they were
    created.
    """

    def __init__(self, voice: str | None = None) -> None:
        """Create an idle streamer.

        Args:
            voice: Passed through as ``voice=`` to ``text_to_speech_stream``.
        """
        self.voice = voice
        self.buffer = ""
        self._cancelled = False
        self._first_chunk = True
        self._slot_index = 0
        self._slots: list[_AudioSlot] = []
        self._slots_lock = asyncio.Lock()
        self._tasks: list[asyncio.Task] = []
        # Set by flush()/cancel(): no further chunks will be scheduled.
        self._llm_done = asyncio.Event()
        # Set whenever the consumer in stream_audio() should re-check state.
        self._slot_added = asyncio.Event()

    async def add_token(self, token: str) -> None:
        """Append *token* to the buffer and schedule a chunk at a flush point.

        Empty tokens, and any token received after ``cancel()``, are ignored.
        """
        if not token or self._cancelled:
            return
        self.buffer += token
        if _should_flush(self.buffer, self._first_chunk):
            self._first_chunk = False
            await self._schedule_chunk()

    async def _schedule_chunk(self) -> None:
        """Move the buffered text into a new slot and start synthesising it.

        The buffer is always emptied. Nothing is scheduled when the streamer
        is cancelled or the cleaned text is shorter than ``MIN_CHARS``.
        """
        if self._cancelled:
            self.buffer = ""
            return
        text = _clean_for_tts(self.buffer.strip())
        self.buffer = ""
        if len(text) < MIN_CHARS:
            return
        async with self._slots_lock:
            slot = _AudioSlot(index=self._slot_index)
            self._slot_index += 1
            self._slots.append(slot)
        self._slot_added.set()
        task = asyncio.create_task(self._synthesise(text, slot))
        self._tasks.append(task)
        task.add_done_callback(self._forget_task)

    def _forget_task(self, task: asyncio.Task) -> None:
        """Drop a finished task from the tracking list, if still present."""
        try:
            self._tasks.remove(task)
        except ValueError:
            # cancel() / reset() already emptied the list.
            pass

    async def _synthesise(self, text: str, slot: _AudioSlot) -> None:
        """Synthesise *text* and push the resulting audio into *slot*.

        The slot always receives its end marker, whether synthesis finishes,
        fails or is cancelled. Synthesis errors are printed and otherwise
        ignored so one failed chunk does not stop the remaining ones.
        """
        if self._cancelled:
            slot.mark_error()
            return
        try:
            async for chunk in text_to_speech_stream(text, voice=self.voice):
                if self._cancelled:
                    break
                await slot.queue.put(chunk)
        except asyncio.CancelledError:
            # Let cancellation propagate so the task ends up in the cancelled
            # state; the finally clause below still closes the slot.
            raise
        except Exception as exc:
            print(f"[Streamer] TTS error for '{text[:50]}': {exc}")
        finally:
            slot.mark_done()

    async def flush(self) -> None:
        """Schedule any remaining buffered text and mark the input finished."""
        if self.buffer.strip():
            await self._schedule_chunk()
        self._llm_done.set()
        # Wake a consumer parked in stream_audio(). Without this, a flush that
        # schedules nothing leaves the consumer waiting for its 10-second
        # timeout even though no more slots can arrive.
        self._slot_added.set()

    async def cancel(self) -> None:
        """Stop all synthesis and unblock the consumer.

        Cancels and awaits every tracked task, closes every slot that has not
        been closed yet, then sets both events so ``stream_audio`` can finish.
        """
        self._cancelled = True
        tasks = list(self._tasks)
        self._tasks.clear()
        for t in tasks:
            t.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        async with self._slots_lock:
            for slot in self._slots:
                # A task cancelled before it first ran never closed its slot.
                if not slot.done:
                    slot.mark_error()
        self._llm_done.set()
        self._slot_added.set()

    async def stream_audio(self) -> AsyncGenerator[bytes, None]:
        """Yield TTS audio chunks in slot order.

        The generator ends when the input has been marked finished and every
        slot has been drained, or when no new slot appears within 10 seconds.
        After ``cancel()`` remaining audio is drained but no longer yielded.

        FIX-BUG5 — avoiding a missed wake-up while waiting for a slot:
          1. clear() the event
          2. re-check the slot list under the lock (a slot may have been
             added between the previous check and clear())
          3. only then wait() on the event
        """
        delivered = 0
        while True:
            async with self._slots_lock:
                slot = self._slots[delivered] if delivered < len(self._slots) else None

            if slot is None:
                if self._llm_done.is_set():
                    async with self._slots_lock:
                        total = len(self._slots)
                    if delivered >= total:
                        break  # All slots consumed; done.
                # FIX-BUG5: clear → re-check → wait
                self._slot_added.clear()
                async with self._slots_lock:
                    have_new = delivered < len(self._slots)
                if have_new:
                    continue
                try:
                    await asyncio.wait_for(self._slot_added.wait(), timeout=10.0)
                except asyncio.TimeoutError:
                    print("[Streamer] Timeout waiting for TTS slot.")
                    break
                continue

            # Drain this slot's audio queue in order
            while True:
                item = await slot.queue.get()
                if item is _SENTINEL:
                    break
                if not self._cancelled:
                    yield item
            delivered += 1

    def reset(self) -> None:
        """Return the streamer to its initial state for a new response.

        Synthesis tasks that are still tracked are cancelled rather than left
        running unreferenced, and slots that were never closed receive their
        end marker so a reader blocked on one of them is released.
        """
        for task in self._tasks:
            task.cancel()
        self._tasks.clear()
        for slot in self._slots:
            if not slot.done:
                slot.mark_error()
        self._slots.clear()
        self._cancelled = False
        self._first_chunk = True
        self.buffer = ""
        self._slot_index = 0
        self._llm_done.clear()
        self._slot_added.clear()