File size: 13,440 Bytes
75ee53d
 
5dabf9d
 
 
 
 
 
 
 
 
 
 
 
 
fc967af
 
5dabf9d
 
 
 
 
75ee53d
 
b70a952
ed5b8b8
 
b70a952
4d2289b
75ee53d
 
 
 
5dabf9d
 
 
 
 
 
 
 
 
58fed26
 
 
 
 
 
 
 
 
 
 
 
5dabf9d
 
 
 
 
75ee53d
 
5dabf9d
4d2289b
16676c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed5b8b8
 
2d19124
 
 
 
 
 
 
 
f84481c
 
 
 
 
 
2d19124
 
16676c4
2d19124
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed5b8b8
 
75ee53d
 
4d2289b
 
5dabf9d
 
 
 
 
4d2289b
 
5dabf9d
75ee53d
5dabf9d
 
 
 
 
 
 
4d2289b
5dabf9d
 
 
4d2289b
5dabf9d
4d2289b
 
 
 
 
75ee53d
 
 
4d2289b
f84481c
 
ed5b8b8
b70a952
75ee53d
 
 
 
 
4d2289b
2d19124
75ee53d
4d2289b
 
 
75ee53d
f84481c
58fed26
 
ed5b8b8
 
b70a952
ed5b8b8
58fed26
 
 
2d19124
 
 
 
4d2289b
2d19124
 
 
4d2289b
2d19124
4d2289b
58fed26
 
 
 
 
 
 
 
 
2d19124
 
58fed26
 
ed5b8b8
4d2289b
b70a952
5dabf9d
 
2d19124
 
 
 
 
 
ed5b8b8
 
4d2289b
 
 
 
f84481c
4d2289b
b70a952
5dabf9d
 
 
4d2289b
 
 
5dabf9d
 
4d2289b
75ee53d
4d2289b
75ee53d
 
4d2289b
75ee53d
4d2289b
75ee53d
4d2289b
75ee53d
4d2289b
ed5b8b8
4d2289b
 
75ee53d
4d2289b
b70a952
f84481c
5dabf9d
 
 
 
75ee53d
 
4d2289b
 
5dabf9d
 
75ee53d
f84481c
ed5b8b8
75ee53d
5dabf9d
 
 
 
 
 
 
 
 
4d2289b
ed5b8b8
4d2289b
75ee53d
5dabf9d
4d2289b
75ee53d
 
 
 
5dabf9d
 
 
75ee53d
5dabf9d
 
 
 
75ee53d
58fed26
75ee53d
58fed26
 
 
 
 
 
4d2289b
5dabf9d
 
75ee53d
 
5dabf9d
 
 
 
4d2289b
 
 
5dabf9d
 
2d19124
5dabf9d
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
"""
services/streaming.py — Production-grade parallel TTS streamer

FIX-ISSUE4 (Natural, slow, small-chunk TTS):
  The previous code used character-count thresholds that produced large
  sentence-level chunks (25–65 chars), causing buffered, robotic-feeling
  speech with a burst of audio at once.

  New behaviour:
    • Flush at word boundaries (2–3 words) for voice-like pacing.
    • Flush threshold is ~10 chars for the first chunk and ~18 chars for
      subsequent chunks (8 and 14 with ElevenLabs) — which corresponds to
      roughly 2–3 average Bengali/English words.
    • A hard limit (30 chars for the first chunk, 40 afterwards; 18 and 28
      with ElevenLabs) ensures no chunk ever gets too large.
    • Sentence-ending punctuation (।.!?) flushes as soon as the flush
      threshold above is reached, giving natural pause points.
    • The TTS rate is slightly faster than neutral in tts.py for a more
      conversational pace.

  Result: audio arrives in small, fast, overlapping synthesis tasks,
  giving a low-latency, smooth, natural speech feel.

FIX-BUG5 (TOCTOU race in stream_audio) — preserved from previous version.
"""

from __future__ import annotations

import asyncio
import re
from dataclasses import dataclass, field
from typing import AsyncGenerator

from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE

# ── Chunk size tuning ──────────────────────────────────────────────────────────
# These character counts correspond roughly to:
#   FIRST_FLUSH_MIN       ~2 words  (get audio playing ASAP)
#   SUBSEQUENT_FLUSH_MIN  ~3 words  (natural conversational phrase)
#   HARD_LIMIT            ~6 words  (never accumulate more than this)
#
# At average Bengali word length ~4–5 chars + space:
#   10 chars ≈ 2 words, 18 chars ≈ 3-4 words, 40 chars ≈ 7-8 words

# Per-backend flush thresholds, in characters of buffered text.
#   *_FLUSH_MIN  – minimum buffer length before a sentence/word boundary
#                  triggers a flush.
#   *_FLUSH_HARD – buffer length at which a flush happens regardless of
#                  where the text currently ends.
# The FIRST_* pair applies until the first chunk has been flushed; the
# SUBSEQUENT_* pair applies afterwards.
if USE_ELEVENLABS:
    # ElevenLabs per-chunk latency is higher; flush smaller chunks so the
    # first playable audio arrives sooner and pauses feel shorter.
    FIRST_FLUSH_MIN        = 8
    FIRST_FLUSH_HARD       = 18
    SUBSEQUENT_FLUSH_MIN   = 14
    SUBSEQUENT_FLUSH_HARD  = 28
else:
    FIRST_FLUSH_MIN        = 10
    FIRST_FLUSH_HARD       = 30
    SUBSEQUENT_FLUSH_MIN   = 18
    SUBSEQUENT_FLUSH_HARD  = 40

_backend_label = "ElevenLabs" if USE_ELEVENLABS else "Edge-TTS"
# Startup banner: active backend plus the subsequent-chunk window as
# "min-hard".  The separator was previously missing, which rendered the two
# numbers as one (e.g. "1840 chars").
print(
    f"[Streamer] TTS backend: {_backend_label} | "
    f"chunk: {SUBSEQUENT_FLUSH_MIN}-{SUBSEQUENT_FLUSH_HARD} chars"
)

# Cleaned chunks shorter than this are dropped instead of being sent to TTS.
MIN_CHARS = 2

# Characters that end a sentence (Latin punctuation, danda, double danda,
# newline) and characters that end a clause (comma, semicolon, colon,
# em dash, en dash).  Both sets are consulted when deciding whether to flush.
SENTENCE_BOUNDARIES = frozenset({".", "!", "?", "।", "॥", "\n"})
CLAUSE_BOUNDARIES = frozenset({",", ";", ":", "—", "–"})

# Unique marker object placed on a slot queue to signal "no more audio".
_SENTINEL = object()

# Maps every ASCII, Bengali and Arabic-Indic digit character to the Bengali
# word for that digit.  Each script's digits occupy ten consecutive code
# points, so the table is generated from the code point of its zero:
#   U+0030 ASCII "0", U+09E6 Bengali "০", U+0660 Arabic-Indic "٠".
_DIGIT_WORDS = {
    chr(zero_code_point + value): word
    for zero_code_point in (0x0030, 0x09E6, 0x0660)
    for value, word in enumerate(
        (
            "শূন্য",
            "এক",
            "দুই",
            "তিন",
            "চার",
            "পাঁচ",
            "ছয়",
            "সাত",
            "আট",
            "নয়",
        )
    )
}


def _spoken_phone_text(text: str) -> str:
    """Spell out phone-number-like digit runs one digit word at a time.

    A candidate run starts with ``+`` or a digit, ends with a digit, and may
    contain digits, whitespace, parentheses, dots and hyphens in between.
    Only candidates holding at least ten digits known to ``_DIGIT_WORDS``
    are rewritten; anything shorter is left exactly as written.

    Args:
        text: Text that may contain phone numbers.

    Returns:
        ``text`` with qualifying runs replaced by space-separated digit
        words and runs of spaces/tabs collapsed to one space.  Falsy input
        yields ``""``.
    """
    if not text:
        return ""

    # A replacement is padded with a space unless the neighbouring character
    # is whitespace or one of these opening / closing marks.
    openers = "([<{\"'"
    closers = ")]>.,!?;:}\"'"

    pieces: list[str] = []
    cursor = 0
    for hit in re.finditer(r"[+\d০-৯٠-٩][\d০-৯٠-٩\s().\-]{8,}[\d০-৯٠-٩]", text):
        begin, end = hit.span()
        pieces.append(text[cursor:begin])
        cursor = end

        candidate = hit.group(0)
        words = [_DIGIT_WORDS[ch] for ch in candidate if ch in _DIGIT_WORDS]
        if len(words) < 10:
            # Too few digits to be a phone number: keep the original text.
            pieces.append(candidate)
            continue

        before = text[begin - 1] if begin > 0 else ""
        after = text[end] if end < len(text) else ""
        lead = " " if before and not (before.isspace() or before in openers) else ""
        trail = " " if after and not (after.isspace() or after in closers) else ""
        pieces.append(lead + " ".join(words) + trail)
    pieces.append(text[cursor:])

    return re.sub(r"[ \t]{2,}", " ", "".join(pieces))


def _clean_for_tts(text: str) -> str:
    """Strip markup that should not be spoken and normalise whitespace.

    The substitutions run in the order listed below, then phone-number-like
    digit runs are spelled out via ``_spoken_phone_text``.

    Args:
        text: Raw chunk of text about to be synthesised.

    Returns:
        The cleaned text with leading/trailing newlines, carriage returns
        and tabs removed.  Ordinary spaces at the edges are kept so that
        adjacent chunks do not glue words together.
    """
    # (pattern, replacement, flags) — order matters.
    substitutions = (
        # Whole emotion/tone tags such as "[calm]" or "[neutral]".
        (r"(?:(?<=^)|(?<=\s))\[[^\[\]\n]{1,24}\](?=\s|$)", "", 0),
        # Orphaned tag halves left when a flush split a tag: "[neutral" ...
        (r"(?:(?<=^)|(?<=\s))\[[A-Za-z]{2,16}(?=\s|$)", "", 0),
        # ... or "neutral]".
        (r"(?:(?<=^)|(?<=\s))[A-Za-z]{2,16}\](?=\s|$)", "", 0),
        # Markdown emphasis asterisks.
        (r"\*{1,3}", "", 0),
        # Markdown heading hashes plus following whitespace.
        (r"#+\s*", "", 0),
        # Bullet markers at the start of a line.
        (r"^\s*[-•]\s*", "", re.MULTILINE),
        # Numbered-list markers ("1." / "২)") at the start of a line.
        (r"^\s*[\d০-৯]+[.)]\s*", "", re.MULTILINE),
        # Backticks.
        (r"`+", "", 0),
        # Blank lines collapse to a single newline.
        (r"\n{2,}", "\n", 0),
        # Runs of spaces/tabs (e.g. left behind by tag removal) collapse.
        (r"[ \t]{2,}", " ", 0),
    )

    cleaned = text
    for pattern, replacement, flags in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned, flags=flags)

    cleaned = _spoken_phone_text(cleaned)
    return cleaned.strip("\n\r\t")


def _flush_reason(buffer: str, first_chunk: bool) -> str | None:
    """Decide whether ``buffer`` should be flushed to TTS now, and why.

    The reason is returned (rather than a bare bool) so the caller can
    preserve spacing when the flush happened at a word boundary.

    Args:
        buffer: Text accumulated since the last flush.
        first_chunk: ``True`` while no chunk has been flushed yet; selects
            the ``FIRST_*`` thresholds instead of the ``SUBSEQUENT_*`` ones.

    Returns:
        ``"hard"``      – the buffer reached the hard length limit;
        ``"sentence"``  – it ends in a sentence boundary and has reached the
                          minimum flush length;
        ``"clause"``    – it ends in a clause boundary and has reached 70 %
                          of the hard limit;
        ``"space"``     – it ends in a space and has reached the minimum
                          flush length;
        ``None``        – keep buffering (always the case for an empty
                          buffer).
    """
    n = len(buffer)
    if n == 0:
        return None

    flush_min  = FIRST_FLUSH_MIN  if first_chunk else SUBSEQUENT_FLUSH_MIN
    hard_limit = FIRST_FLUSH_HARD if first_chunk else SUBSEQUENT_FLUSH_HARD

    # Hard limit — always flush regardless of boundary.
    if n >= hard_limit:
        return "hard"

    last_char = buffer[-1]

    # Sentence ending — natural pause point once enough text is buffered.
    if last_char in SENTENCE_BOUNDARIES and n >= flush_min:
        return "sentence"

    # Clause boundary — flush at 70% of the hard limit.
    if last_char in CLAUSE_BOUNDARIES and n >= hard_limit * 0.70:
        return "clause"

    # Word boundary (space after the minimum length has been reached).
    if last_char == " " and n >= flush_min:
        return "space"

    return None


def _should_flush(buffer: str, first_chunk: bool) -> bool:
    """Return ``True`` when ``_flush_reason`` reports any reason to flush.

    Delegates to ``_flush_reason`` so the two helpers cannot drift apart.

    Args:
        buffer: Text accumulated since the last flush.
        first_chunk: ``True`` while no chunk has been flushed yet.
    """
    return _flush_reason(buffer, first_chunk) is not None


@dataclass
class _AudioSlot:
    """Holder for the audio produced from one flushed text chunk.

    Audio items are pushed onto ``queue``; the end of the slot is signalled
    by enqueuing ``_SENTINEL``.
    """

    # Position of this slot in creation order.
    index: int
    # Per-slot queue; every instance gets its own fresh queue.
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    # True once the end-of-slot sentinel has been enqueued.
    done: bool = False

    def mark_done(self) -> None:
        """Flag the slot as finished and enqueue the end-of-slot sentinel."""
        self.done = True
        self.queue.put_nowait(_SENTINEL)

    def mark_error(self) -> None:
        """Close the slot after a failure; consumers see the same sentinel."""
        self.done = True
        self.queue.put_nowait(_SENTINEL)


class ParallelTTSStreamer:
    """Turn a stream of text tokens into ordered, concurrently synthesised audio.

    Tokens are accumulated by :meth:`add_token`.  Whenever a flush point is
    reached the buffered text is cleaned and handed to its own TTS task,
    which writes audio into a per-chunk slot.  :meth:`stream_audio` drains
    the slots strictly in creation order, so synthesis of later chunks can
    overlap with delivery of earlier ones.

    Typical call order: ``add_token`` (repeatedly) → ``flush`` once the text
    source is exhausted, with ``stream_audio`` consumed concurrently.
    ``cancel`` aborts everything; ``reset`` prepares the instance for reuse.
    """

    def __init__(self, voice: str | None = None) -> None:
        """Create an idle streamer.

        Args:
            voice: Voice identifier forwarded unchanged to
                ``text_to_speech_stream`` for every chunk.
        """
        self.voice        = voice
        self.buffer       = ""      # text accumulated since the last flush
        self._cancelled   = False   # set by cancel(); checked everywhere
        self._first_chunk = True    # selects FIRST_* vs SUBSEQUENT_* limits
        self._carry_space = False   # seed the next buffer with one space
        self._slot_index  = 0       # index handed to the next slot
        self._slots: list[_AudioSlot] = []
        self._slots_lock  = asyncio.Lock()       # guards _slots/_slot_index
        self._tasks: list[asyncio.Task] = []     # in-flight synthesis tasks
        self._llm_done    = asyncio.Event()      # set by flush()/cancel()
        self._slot_added  = asyncio.Event()      # wakes stream_audio()
        # Loop-clock timestamps; 0.0 means "not started yet".
        self._last_flush_t: float = 0.0
        self._last_token_t: float = 0.0

    async def add_token(self, token: str) -> None:
        """Append ``token`` to the buffer and flush it if a flush point is hit.

        A flush happens when ``_flush_reason`` reports one, or — as a safety
        valve — when at least the minimum flush length is buffered and 0.8 s
        have passed since the last flush (or since the first token, if
        nothing has been flushed yet).

        Args:
            token: Next piece of text; empty tokens and tokens arriving
                after cancellation are ignored.
        """
        if not token or self._cancelled:
            return
        loop = asyncio.get_running_loop()
        now  = loop.time()
        self._last_token_t = now
        if self._last_flush_t == 0.0:
            # Start the safety-valve clock at the first token.  Comparing the
            # loop clock against the 0.0 placeholder would make the valve
            # look expired immediately and split the first chunk mid-word.
            self._last_flush_t = now
        # If we flushed at a word boundary previously, preserve a single
        # inter-word space so Bengali/English words don't get glued together.
        if self.buffer == " " and token[:1].isspace():
            token = token.lstrip()
        self.buffer += token

        reason = _flush_reason(self.buffer, self._first_chunk)
        if reason is not None:
            self._first_chunk = False
            self._carry_space = (reason == "space")
            await self._schedule_chunk()
            self._last_flush_t = now
            return

        # Safety valve: if tokens arrive without good boundaries, we can go a
        # long time without scheduling any TTS slots → streamer timeout/no audio.
        # Force a flush after a short delay once we have enough text.
        flush_min = FIRST_FLUSH_MIN if self._first_chunk else SUBSEQUENT_FLUSH_MIN
        if len(self.buffer) >= flush_min and (now - self._last_flush_t) >= 0.8:
            self._first_chunk = False
            # Time-based flush: don't force a carry space.
            self._carry_space = False
            await self._schedule_chunk()
            self._last_flush_t = now

    async def _schedule_chunk(self) -> None:
        """Hand the current buffer to a new synthesis task and reset the buffer.

        The buffer is cleaned with ``_clean_for_tts``; results shorter than
        ``MIN_CHARS`` are dropped without creating a slot.  After
        cancellation the buffer is simply discarded.
        """
        if self._cancelled:
            self.buffer = ""
            return
        raw = self.buffer
        # IMPORTANT: don't lose an inter-word space when the flush happened
        # exactly at a word boundary (buffer ended with " ").
        self.buffer = " " if self._carry_space else ""
        self._carry_space = False
        text = _clean_for_tts(raw)
        if len(text) < MIN_CHARS:
            return
        async with self._slots_lock:
            slot = _AudioSlot(index=self._slot_index)
            self._slot_index += 1
            self._slots.append(slot)
            self._slot_added.set()
        task = asyncio.create_task(self._synthesise(text, slot))
        self._tasks.append(task)
        # Drop the task from the bookkeeping list once it finishes; the
        # membership check covers lists already emptied by cancel()/reset().
        task.add_done_callback(
            lambda t: self._tasks.remove(t) if t in self._tasks else None
        )

    async def _synthesise(self, text: str, slot: _AudioSlot) -> None:
        """Stream TTS output for ``text`` into ``slot`` and always close the slot.

        Args:
            text: Cleaned text to synthesise.
            slot: Destination slot; receives every item produced by
                ``text_to_speech_stream`` followed by the end sentinel.
        """
        if self._cancelled:
            slot.mark_error()
            return
        try:
            async for chunk in text_to_speech_stream(text, voice=self.voice):
                if self._cancelled:
                    break
                await slot.queue.put(chunk)
        except asyncio.CancelledError:
            # Cancellation is deliberately absorbed here: the slot is closed
            # in ``finally`` and the task finishes normally.
            pass
        except Exception as exc:
            # Best effort: a failed chunk is logged and skipped so the rest
            # of the stream can continue.
            print(f"[Streamer] TTS error for '{text[:50]}': {exc}")
        finally:
            # Guarantees the consumer is never left waiting on this slot.
            slot.mark_done()

    async def flush(self) -> None:
        """Schedule any remaining non-blank buffer and signal end of input."""
        if self.buffer.strip():
            await self._schedule_chunk()
        self._llm_done.set()

    async def cancel(self) -> None:
        """Abort all synthesis and unblock any waiting consumer.

        In-flight tasks are cancelled and awaited, every slot still open is
        closed, and both events are set so :meth:`stream_audio` can exit.
        """
        self._cancelled = True
        tasks = list(self._tasks)
        self._tasks.clear()
        for t in tasks:
            t.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        async with self._slots_lock:
            for slot in self._slots:
                if not slot.done:
                    slot.mark_error()
        self._llm_done.set()
        self._slot_added.set()

    async def stream_audio(self) -> AsyncGenerator[bytes, None]:
        """
        Deliver TTS audio chunks in slot order.

        Yields the items produced by the synthesis tasks, slot by slot, and
        finishes once input has ended (``flush``/``cancel``) and every slot
        has been consumed.  Items are swallowed rather than yielded after
        cancellation.

        FIX-BUG5 — double-check pattern eliminates TOCTOU race:
          1. clear() the event
          2. Re-check slot list under lock (slot may have been added between
             previous check and clear())
          3. Only then wait() — so we never miss a newly-added slot
        """
        delivered = 0
        while True:
            async with self._slots_lock:
                slot = self._slots[delivered] if delivered < len(self._slots) else None

            if slot is None:
                if self._llm_done.is_set():
                    async with self._slots_lock:
                        total = len(self._slots)
                    if delivered >= total:
                        break  # All slots consumed; done.

                # FIX-BUG5: clear → re-check → wait
                self._slot_added.clear()
                async with self._slots_lock:
                    have_new = delivered < len(self._slots)
                if have_new:
                    continue
                try:
                    await asyncio.wait_for(self._slot_added.wait(), timeout=30.0)
                except asyncio.TimeoutError:
                    # Don't abort the whole stream; LLM/TTS backends can stall.
                    # Keep waiting unless the LLM already finished.
                    if self._llm_done.is_set():
                        break
                    print("[Streamer] Timeout waiting for TTS slot (continuing)…")
                    continue
                continue

            # Drain this slot's audio queue in order
            while True:
                item = await slot.queue.get()
                if item is _SENTINEL:
                    break
                if not self._cancelled:
                    yield item
            delivered += 1

    def reset(self) -> None:
        """Return the streamer to its initial, idle state for reuse.

        Clears the buffer, slots, task list, flags and events.  The flush
        timers are cleared too, so a stale timestamp from the previous run
        cannot trip the safety valve on the next first chunk.  Tasks still
        running are only forgotten, not cancelled — call :meth:`cancel`
        first if synthesis may still be in flight.
        """
        self._cancelled   = False
        self._first_chunk = True
        self._carry_space = False
        self.buffer       = ""
        self._slot_index  = 0
        self._slots.clear()
        self._tasks.clear()
        self._llm_done.clear()
        self._slot_added.clear()
        self._last_flush_t = 0.0
        self._last_token_t = 0.0