File size: 13,440 Bytes
75ee53d 5dabf9d fc967af 5dabf9d 75ee53d b70a952 ed5b8b8 b70a952 4d2289b 75ee53d 5dabf9d 58fed26 5dabf9d 75ee53d 5dabf9d 4d2289b 16676c4 ed5b8b8 2d19124 f84481c 2d19124 16676c4 2d19124 ed5b8b8 75ee53d 4d2289b 5dabf9d 4d2289b 5dabf9d 75ee53d 5dabf9d 4d2289b 5dabf9d 4d2289b 5dabf9d 4d2289b 75ee53d 4d2289b f84481c ed5b8b8 b70a952 75ee53d 4d2289b 2d19124 75ee53d 4d2289b 75ee53d f84481c 58fed26 ed5b8b8 b70a952 ed5b8b8 58fed26 2d19124 4d2289b 2d19124 4d2289b 2d19124 4d2289b 58fed26 2d19124 58fed26 ed5b8b8 4d2289b b70a952 5dabf9d 2d19124 ed5b8b8 4d2289b f84481c 4d2289b b70a952 5dabf9d 4d2289b 5dabf9d 4d2289b 75ee53d 4d2289b 75ee53d 4d2289b 75ee53d 4d2289b 75ee53d 4d2289b 75ee53d 4d2289b ed5b8b8 4d2289b 75ee53d 4d2289b b70a952 f84481c 5dabf9d 75ee53d 4d2289b 5dabf9d 75ee53d f84481c ed5b8b8 75ee53d 5dabf9d 4d2289b ed5b8b8 4d2289b 75ee53d 5dabf9d 4d2289b 75ee53d 5dabf9d 75ee53d 5dabf9d 75ee53d 58fed26 75ee53d 58fed26 4d2289b 5dabf9d 75ee53d 5dabf9d 4d2289b 5dabf9d 2d19124 5dabf9d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 | """
services/streaming.py — Production-grade parallel TTS streamer
FIX-ISSUE4 (Natural, slow, small-chunk TTS):
The previous code used character-count thresholds that produced large
sentence-level chunks (25–65 chars), causing buffered, robotic-feeling
speech with a burst of audio at once.
New behaviour:
• Flush at word boundaries (2–3 words) for voice-like pacing.
• Flush threshold is ~10 chars for the first chunk and ~18 chars for
subsequent chunks (8 / 14 when ElevenLabs is the backend) — which
corresponds to roughly 2–3 average Bengali/English words.
• Hard limit of 40 chars (30 for the first chunk; 28 / 18 with ElevenLabs)
ensures no chunk ever gets too large.
• Sentence-ending punctuation (।.!?) flushes as soon as the minimum chunk
length has been reached, giving natural pause points.
• The TTS rate is slightly faster than neutral in tts.py for a more
conversational pace.
Result: audio arrives in small, fast, overlapping synthesis tasks,
giving a low-latency, smooth, natural speech feel.
FIX-BUG5 (TOCTOU race in stream_audio) — preserved from previous version.
"""
from __future__ import annotations
import asyncio
import re
from dataclasses import dataclass, field
from typing import AsyncGenerator
from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE
# ── Chunk size tuning ──────────────────────────────────────────────────────────
# These character counts correspond roughly to:
# FIRST_FLUSH_MIN ~2 words (get audio playing ASAP)
# SUBSEQUENT_FLUSH_MIN ~3 words (natural conversational phrase)
# FIRST_FLUSH_HARD / SUBSEQUENT_FLUSH_HARD ~5–8 words (never accumulate more than this)
#
# At average Bengali word length ~4–5 chars + space:
# 10 chars ≈ 2 words, 18 chars ≈ 3-4 words, 40 chars ≈ 7-8 words
# Flush thresholds, in characters, as (first-chunk minimum, first-chunk hard
# limit, subsequent minimum, subsequent hard limit).
#
# ElevenLabs per-chunk latency is higher; it gets smaller chunks so the first
# playable audio arrives sooner and pauses feel shorter.
(
    FIRST_FLUSH_MIN,
    FIRST_FLUSH_HARD,
    SUBSEQUENT_FLUSH_MIN,
    SUBSEQUENT_FLUSH_HARD,
) = (8, 18, 14, 28) if USE_ELEVENLABS else (10, 30, 18, 40)

# Announce the active backend and steady-state chunk window once at import.
_backend_label = "ElevenLabs" if USE_ELEVENLABS else "Edge-TTS"
print(f"[Streamer] TTS backend: {_backend_label} | chunk: {SUBSEQUENT_FLUSH_MIN}–{SUBSEQUENT_FLUSH_HARD} chars")

# Cleaned chunks shorter than this are dropped instead of being synthesised.
MIN_CHARS = 2

# Characters that end a sentence (Latin, Bengali danda / double danda, newline).
SENTENCE_BOUNDARIES = frozenset(".!?।॥\n")

# Characters that end a clause; these flush only once the buffer is fairly full.
CLAUSE_BOUNDARIES = frozenset(",;:—–")

# Unique end-of-slot marker placed on a slot's audio queue.
_SENTINEL = object()
_DIGIT_WORDS = {
"0": "শূন্য",
"1": "এক",
"2": "দুই",
"3": "তিন",
"4": "চার",
"5": "পাঁচ",
"6": "ছয়",
"7": "সাত",
"8": "আট",
"9": "নয়",
"০": "শূন্য",
"১": "এক",
"২": "দুই",
"৩": "তিন",
"৪": "চার",
"৫": "পাঁচ",
"৬": "ছয়",
"৭": "সাত",
"৮": "আট",
"৯": "নয়",
"٠": "শূন্য",
"١": "এক",
"٢": "দুই",
"٣": "তিন",
"٤": "চার",
"٥": "পাঁচ",
"٦": "ছয়",
"٧": "সাত",
"٨": "আট",
"٩": "নয়",
}
def _spoken_phone_text(text: str) -> str:
if not text:
return ""
def repl(match: re.Match[str]) -> str:
chunk = match.group(0)
digits = [ch for ch in chunk if ch in _DIGIT_WORDS]
if len(digits) < 10:
return chunk
spoken = " ".join(_DIGIT_WORDS[ch] for ch in digits)
prev_char = text[match.start() - 1] if match.start() > 0 else ""
next_char = text[match.end()] if match.end() < len(text) else ""
if prev_char and not prev_char.isspace() and prev_char not in "([<{\"'":
spoken = " " + spoken
if next_char and not next_char.isspace() and next_char not in ")]>.,!?;:}\"'":
spoken = spoken + " "
return spoken
out = re.sub(r"[+\d০-৯٠-٩][\d০-৯٠-٩\s().\-]{8,}[\d০-৯٠-٩]", repl, text)
return re.sub(r"[ \t]{2,}", " ", out)
def _clean_for_tts(text: str) -> str:
    """Strip UI-only markup from a text chunk before it is sent to TTS.

    The substitutions run in a fixed order; afterwards phone-number-like digit
    runs are spelled out via ``_spoken_phone_text``.

    Args:
        text: Raw chunk taken from the token buffer.

    Returns:
        The cleaned chunk.  Only leading/trailing newlines, carriage returns
        and tabs are trimmed — ordinary spaces are kept so that adjacent
        chunks do not glue words together.
    """
    # (pattern, replacement, flags) — applied top to bottom.
    rules = (
        # Stand-alone emotion/tone tags such as "[calm]" or "[happy]": useful
        # for the UI but they tend to degrade or break synthesis.
        (r"(?:(?<=^)|(?<=\s))\[[^\[\]\n]{1,24}\](?=\s|$)", "", 0),
        # Orphaned halves of a tag left behind when a flush happened mid-tag
        # (e.g. "[neutral" or "neutral]").
        (r"(?:(?<=^)|(?<=\s))\[[A-Za-z]{2,16}(?=\s|$)", "", 0),
        (r"(?:(?<=^)|(?<=\s))[A-Za-z]{2,16}\](?=\s|$)", "", 0),
        # Markdown emphasis markers.
        (r"\*{1,3}", "", 0),
        # Heading markers together with the whitespace after them.
        (r"#+\s*", "", 0),
        # Bullet markers at the start of a line.
        (r"^\s*[-•]\s*", "", re.MULTILINE),
        # Numbered-list prefixes ("1." / "২)") at the start of a line.
        (r"^\s*[\d০-৯]+[.)]\s*", "", re.MULTILINE),
        # Backticks from inline code / code fences.
        (r"`+", "", 0),
        # Blank lines collapse to a single newline.
        (r"\n{2,}", "\n", 0),
        # Runs of spaces/tabs (e.g. left behind by tag removal) collapse to one.
        (r"[ \t]{2,}", " ", 0),
    )
    for pattern, replacement, flags in rules:
        text = re.sub(pattern, replacement, text, flags=flags)

    text = _spoken_phone_text(text)
    return text.strip("\n\r\t")
def _flush_reason(buffer: str, first_chunk: bool) -> str | None:
    """Decide whether the token buffer should be flushed now, and why.

    Args:
        buffer: Text accumulated since the last flush.
        first_chunk: True while no chunk has been flushed yet; selects the
            ``FIRST_*`` thresholds instead of the ``SUBSEQUENT_*`` ones.

    Returns:
        ``"hard"``     – the buffer reached the hard size limit;
        ``"sentence"`` – it ends with a sentence boundary and reached the minimum;
        ``"clause"``   – it ends with a clause boundary and reached 70 % of the
                         hard limit;
        ``"space"``    – it ends with a space and reached the minimum (the
                         caller uses this to preserve the inter-word space);
        ``None``       – keep buffering (always the case for an empty buffer).
    """
    if not buffer:
        return None

    if first_chunk:
        soft_min, hard_max = FIRST_FLUSH_MIN, FIRST_FLUSH_HARD
    else:
        soft_min, hard_max = SUBSEQUENT_FLUSH_MIN, SUBSEQUENT_FLUSH_HARD

    size = len(buffer)
    if size >= hard_max:
        return "hard"

    tail = buffer[-1]
    reached_min = size >= soft_min
    if reached_min and tail in SENTENCE_BOUNDARIES:
        return "sentence"
    if tail in CLAUSE_BOUNDARIES and size >= hard_max * 0.70:
        return "clause"
    if reached_min and tail == " ":
        return "space"
    return None
def _should_flush(buffer: str, first_chunk: bool) -> bool:
    """Return True when the token buffer should be flushed to TTS now.

    Boolean view of ``_flush_reason``: the decision rules (hard limit,
    sentence boundary, clause boundary, word boundary) live there only, so
    the two functions cannot drift apart.

    Args:
        buffer: Text accumulated since the last flush.
        first_chunk: True while no chunk has been flushed yet.

    Returns:
        True if ``_flush_reason`` reports any reason to flush, else False
        (an empty buffer never flushes).
    """
    return _flush_reason(buffer, first_chunk) is not None
@dataclass
class _AudioSlot:
index: int
queue: asyncio.Queue = field(default_factory=lambda: asyncio.Queue())
done: bool = False
def mark_done(self) -> None: self.done = True; self.queue.put_nowait(_SENTINEL)
def mark_error(self) -> None: self.done = True; self.queue.put_nowait(_SENTINEL)
class ParallelTTSStreamer:
    """Turn a stream of LLM text tokens into ordered, parallel-synthesised audio.

    Producer side: feed tokens with ``add_token`` and call ``flush`` once the
    LLM has finished.  Every flushed chunk gets its own ``_AudioSlot`` and its
    own synthesis task, so several chunks are synthesised concurrently.

    Consumer side: iterate ``stream_audio``; audio is yielded slot by slot in
    the order the chunks were scheduled.

    ``cancel`` aborts outstanding work; ``reset`` re-arms the instance.
    """

    # Longest time (seconds) text of at least the minimum chunk length may sit
    # in the buffer without a natural boundary before it is flushed anyway.
    FORCE_FLUSH_AFTER_S: float = 0.8
    # How long (seconds) stream_audio waits for a new slot before logging.
    SLOT_WAIT_TIMEOUT_S: float = 30.0

    def __init__(self, voice: str | None = None) -> None:
        """Create an idle streamer.

        Args:
            voice: Passed unchanged to ``text_to_speech_stream`` as ``voice``.
        """
        self.voice = voice
        self.buffer = ""
        self._cancelled = False
        self._first_chunk = True
        self._carry_space = False
        self._slot_index = 0
        self._slots: list[_AudioSlot] = []
        self._slots_lock = asyncio.Lock()
        self._tasks: list[asyncio.Task] = []
        self._llm_done = asyncio.Event()
        self._slot_added = asyncio.Event()
        # Loop timestamps; 0.0 means "not started yet" (see add_token).
        self._last_flush_t: float = 0.0
        self._last_token_t: float = 0.0

    async def add_token(self, token: str) -> None:
        """Append one LLM token and schedule a TTS chunk when a flush is due.

        A flush happens when ``_flush_reason`` reports a boundary, or — as a
        safety valve for text without usable boundaries — when the buffer
        holds at least the minimum chunk length and ``FORCE_FLUSH_AFTER_S``
        seconds have passed since the previous flush (or since the first
        token, before anything has been flushed).

        Empty tokens and tokens arriving after ``cancel`` are ignored.
        """
        if not token or self._cancelled:
            return

        now = asyncio.get_running_loop().time()
        self._last_token_t = now
        if not self._last_flush_t:
            # First token since construction/reset: start the safety-valve
            # timer here.  Measuring from 0.0 made the valve fire immediately,
            # cutting the very first chunk at the minimum length mid-word.
            self._last_flush_t = now

        # If we flushed at a word boundary previously, the buffer holds the
        # single carried space; don't let the next token add a second one.
        if self.buffer == " " and token[:1].isspace():
            token = token.lstrip()
        self.buffer += token

        reason = _flush_reason(self.buffer, self._first_chunk)
        if reason is None:
            flush_min = FIRST_FLUSH_MIN if self._first_chunk else SUBSEQUENT_FLUSH_MIN
            overdue = (now - self._last_flush_t) >= self.FORCE_FLUSH_AFTER_S
            if len(self.buffer) < flush_min or not overdue:
                return

        self._first_chunk = False
        # Only a word-boundary flush carries a space into the next chunk; a
        # time-based flush (reason is None) never forces one.
        self._carry_space = reason == "space"
        await self._schedule_chunk()
        self._last_flush_t = now

    async def _schedule_chunk(self) -> None:
        """Move the buffer into a new slot and start synthesising it.

        The buffer is replaced by ``" "`` when the flush happened at a word
        boundary, otherwise by ``""``.  Chunks whose cleaned text is shorter
        than ``MIN_CHARS`` are dropped without creating a slot.
        """
        if self._cancelled:
            self.buffer = ""
            return

        raw = self.buffer
        # IMPORTANT: don't lose an inter-word space when the flush happened
        # exactly at a word boundary (buffer ended with " ").
        self.buffer = " " if self._carry_space else ""
        self._carry_space = False

        text = _clean_for_tts(raw)
        if len(text) < MIN_CHARS:
            return

        async with self._slots_lock:
            slot = _AudioSlot(index=self._slot_index)
            self._slot_index += 1
            self._slots.append(slot)
            self._slot_added.set()

        task = asyncio.create_task(self._synthesise(text, slot))
        self._tasks.append(task)
        task.add_done_callback(self._forget_task)

    def _forget_task(self, task: asyncio.Task) -> None:
        """Done-callback: drop a finished task from the bookkeeping list."""
        try:
            self._tasks.remove(task)
        except ValueError:
            # cancel()/reset() already emptied the list.
            pass

    async def _synthesise(self, text: str, slot: _AudioSlot) -> None:
        """Stream TTS audio for ``text`` into ``slot`` and always close the slot."""
        if self._cancelled:
            slot.mark_error()
            return
        try:
            async for chunk in text_to_speech_stream(text, voice=self.voice):
                if self._cancelled:
                    break
                await slot.queue.put(chunk)
        except asyncio.CancelledError:
            # Deliberately swallowed: the slot is closed in ``finally`` so the
            # consumer is released.
            pass
        except Exception as exc:
            # Best effort: one failed chunk must not stall the slots after it.
            print(f"[Streamer] TTS error for '{text[:50]}': {exc}")
        finally:
            slot.mark_done()

    async def flush(self) -> None:
        """Schedule any remaining buffered text and signal that the LLM is done."""
        if self.buffer.strip():
            await self._schedule_chunk()
        self._llm_done.set()
        # Wake a consumer parked on _slot_added.  Without this, a flush that
        # schedules nothing (empty or too-short buffer) left stream_audio
        # blocked until its wait timed out.
        self._slot_added.set()

    async def cancel(self) -> None:
        """Abort synthesis, close every open slot and release the consumer."""
        self._cancelled = True
        tasks = list(self._tasks)
        self._tasks.clear()
        for t in tasks:
            t.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        async with self._slots_lock:
            for slot in self._slots:
                if not slot.done:
                    slot.mark_error()
        self._llm_done.set()
        self._slot_added.set()

    async def stream_audio(self) -> AsyncGenerator[bytes, None]:
        """Deliver TTS audio chunks in slot order.

        Ends once ``flush``/``cancel`` has signalled completion and every
        slot has been drained.  After ``cancel`` the remaining slots are
        drained but their audio is not yielded.

        FIX-BUG5 — before waiting for a new slot the event is cleared and the
        slot list is checked again under the lock, so a slot added between
        the first check and ``clear()`` is never missed.
        """
        delivered = 0
        while True:
            async with self._slots_lock:
                slot = self._slots[delivered] if delivered < len(self._slots) else None

            if slot is None:
                if self._llm_done.is_set():
                    async with self._slots_lock:
                        if delivered >= len(self._slots):
                            break  # All slots consumed; done.

                # FIX-BUG5: clear → re-check → wait
                self._slot_added.clear()
                async with self._slots_lock:
                    have_new = delivered < len(self._slots)
                if have_new:
                    continue

                try:
                    await asyncio.wait_for(
                        self._slot_added.wait(), timeout=self.SLOT_WAIT_TIMEOUT_S
                    )
                except asyncio.TimeoutError:
                    # Don't abort the whole stream; LLM/TTS backends can stall.
                    # Keep waiting unless the LLM already finished.
                    if self._llm_done.is_set():
                        break
                    print("[Streamer] Timeout waiting for TTS slot (continuing)…")
                continue

            # Drain this slot's audio queue in order
            while True:
                item = await slot.queue.get()
                if item is _SENTINEL:
                    break
                if not self._cancelled:
                    yield item
            delivered += 1

    def reset(self) -> None:
        """Return the streamer to its initial state so it can be reused.

        NOTE: this only forgets tracked tasks and slots; it does not cancel
        tasks that are still running — call ``cancel`` first when synthesis
        may be in flight.
        """
        self._cancelled = False
        self._first_chunk = True
        self._carry_space = False
        self.buffer = ""
        self._slot_index = 0
        self._slots.clear()
        self._tasks.clear()
        self._llm_done.clear()
        self._slot_added.clear()
        # Re-arm the safety-valve timer; a stale timestamp from the previous
        # run would otherwise make the first flush fire immediately.
        self._last_flush_t = 0.0
        self._last_token_t = 0.0
|