rakib72642 commited on
Commit
b70a952
Β·
1 Parent(s): ed5b8b8

updated js+stream+stt+app

Browse files
Files changed (4) hide show
  1. app.py +62 -34
  2. frontend/script.js +89 -19
  3. services/streaming.py +116 -206
  4. services/stt.py +162 -93
app.py CHANGED
@@ -1,3 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
 
2
  import asyncio
3
  import json
@@ -40,9 +55,8 @@ async def root():
40
  return HTMLResponse("<h2>index.html not found</h2>", status_code=404)
41
 
42
 
43
- # ── Helpers ───────────────────────────────────────────────────────────────────
44
  def _ws_open(ws: WebSocket) -> bool:
45
- """Return True if the WebSocket connection is still alive."""
46
  return ws.client_state == WebSocketState.CONNECTED
47
 
48
 
@@ -66,7 +80,7 @@ async def _safe_bytes(ws: WebSocket, data: bytes) -> bool:
66
  return False
67
 
68
 
69
- # ── Text chat WebSocket ───────────────────────────────────────────────────────
70
  @app.websocket("/ws/chat")
71
  async def ws_chat(ws: WebSocket):
72
  await ws.accept()
@@ -91,34 +105,31 @@ async def ws_chat(ws: WebSocket):
91
  async for token in stream:
92
  full_response += token
93
  await _safe_text(ws, {"type": "chat", "text": full_response})
94
- except Exception as e:
95
- print(f"[CHAT] AI error: {e}")
96
- await _safe_text(ws, {"type": "error", "text": str(e)})
97
 
98
  await _safe_text(ws, {"type": "end"})
99
 
100
  except WebSocketDisconnect:
101
  print("[CHAT] Client disconnected")
102
- except Exception as e:
103
- if "disconnect" not in str(e).lower():
104
- print(f"[CHAT] WS error: {e}")
105
 
106
 
107
- # ── Voice WebSocket ───────────────────────────────────────────────────────────
108
  @app.websocket("/ws/voice")
109
  async def ws_voice(ws: WebSocket):
110
  await ws.accept()
111
  print("[VOICE] Client connected")
112
 
113
- stt = STTProcessor()
114
- user_id = "voice_user"
 
115
 
116
  try:
117
  while True:
118
- # ── FIX: Check connection state before every receive ──────────────
119
- # The previous crash "Cannot call receive once a disconnect message
120
- # has been received" happened because we called ws.receive() after
121
- # the client had already disconnected. Now we check first.
122
  if not _ws_open(ws):
123
  print("[VOICE] Connection dropped, exiting handler.")
124
  break
@@ -128,28 +139,32 @@ async def ws_voice(ws: WebSocket):
128
  except WebSocketDisconnect:
129
  print("[VOICE] Client disconnected.")
130
  break
131
- except Exception as e:
132
- # Catches starlette's internal disconnect errors gracefully
133
- if "disconnect" in str(e).lower():
134
  print("[VOICE] Client disconnected (recv error).")
135
  else:
136
- print(f"[VOICE] Receive error: {e}")
137
  break
138
 
139
- # ── Audio blob from VAD ───────────────────────────────────────────
140
  if "bytes" in data and data["bytes"]:
141
  audio_bytes = data["bytes"]
142
  print(f"[VOICE] Received utterance: {len(audio_bytes):,} bytes")
143
 
144
- # 1. STT β€” in thread so event loop isn't blocked
145
- transcript = await asyncio.to_thread(stt.transcribe, audio_bytes)
 
 
 
 
 
 
146
 
147
  if not transcript:
148
  await _safe_text(ws, {
149
  "type": "error",
150
  "text": "কΰ¦₯ΰ¦Ύ বুঝঀে ΰ¦ͺারিনি, আবার বলুনΰ₯€"
151
  })
152
- # Send 'end' so client's isProcessing resets and VAD resumes
153
  await _safe_text(ws, {"type": "end"})
154
  continue
155
 
@@ -158,9 +173,10 @@ async def ws_voice(ws: WebSocket):
158
  break
159
 
160
  # 2. AI + TTS pipeline
161
- tts_streamer = ParallelTTSStreamer()
 
162
 
163
- async def run_ai_and_tts():
164
  try:
165
  stream = await ai.main(user_id, transcript)
166
  async for token in stream:
@@ -169,34 +185,46 @@ async def ws_voice(ws: WebSocket):
169
  if not await _safe_text(ws, {"type": "llm_token", "token": token}):
170
  break
171
  await tts_streamer.add_token(token)
172
- except Exception as e:
173
- print(f"[VOICE] AI error: {e}")
174
  finally:
175
  await tts_streamer.flush()
176
 
177
- async def stream_tts_audio():
178
  async for chunk in tts_streamer.stream_audio():
179
  if not await _safe_bytes(ws, chunk):
180
  break
181
 
182
  await asyncio.gather(run_ai_and_tts(), stream_tts_audio())
 
183
 
184
- # Signal end of turn β†’ client resumes VAD
185
  await _safe_text(ws, {"type": "end"})
186
 
187
- # ── Control messages ──────────────────────────────────────────────
188
  elif "text" in data and data["text"]:
189
  try:
190
  msg = json.loads(data["text"])
191
  if msg.get("type") == "ping":
192
  await _safe_text(ws, {"type": "pong"})
 
 
 
 
 
 
 
 
 
193
  except json.JSONDecodeError:
194
  pass
195
 
196
  except WebSocketDisconnect:
197
  print("[VOICE] Client disconnected (outer)")
198
- except Exception as e:
199
- if "disconnect" not in str(e).lower():
200
- print(f"[VOICE] WS error: {e}")
201
  finally:
 
 
202
  print("[VOICE] Handler exiting cleanly.")
 
1
+ """
2
+ app.py β€” FastAPI entry point
3
+
4
+ Fixes applied
5
+ ─────────────
6
+ 1. STT is now fully async (stt.transcribe is a coroutine) β€” no more
7
+ asyncio.to_thread wrapper needed in the WS handler.
8
+ 2. BARGE-IN: when the client sends a new audio blob while TTS is still
9
+ playing, the running tts_streamer is cancelled before starting a new
10
+ turn. The client enforces isProcessing so this should be rare, but
11
+ the server now handles it gracefully.
12
+ 3. Per-session cancel token stored in `_active_streamer` so any new
13
+ utterance from the same WS cleanly aborts the previous one.
14
+ 4. All other logic (ping/pong, safe send helpers, chat WS) is unchanged.
15
+ """
16
 
17
  import asyncio
18
  import json
 
55
  return HTMLResponse("<h2>index.html not found</h2>", status_code=404)
56
 
57
 
58
+ # ── Helpers ────────────────────────────────────────────────────────────────────
59
  def _ws_open(ws: WebSocket) -> bool:
 
60
  return ws.client_state == WebSocketState.CONNECTED
61
 
62
 
 
80
  return False
81
 
82
 
83
+ # ── Text chat WebSocket ────────────────────────────────────────────────────────
84
  @app.websocket("/ws/chat")
85
  async def ws_chat(ws: WebSocket):
86
  await ws.accept()
 
105
  async for token in stream:
106
  full_response += token
107
  await _safe_text(ws, {"type": "chat", "text": full_response})
108
+ except Exception as exc:
109
+ print(f"[CHAT] AI error: {exc}")
110
+ await _safe_text(ws, {"type": "error", "text": str(exc)})
111
 
112
  await _safe_text(ws, {"type": "end"})
113
 
114
  except WebSocketDisconnect:
115
  print("[CHAT] Client disconnected")
116
+ except Exception as exc:
117
+ if "disconnect" not in str(exc).lower():
118
+ print(f"[CHAT] WS error: {exc}")
119
 
120
 
121
+ # ── Voice WebSocket ────────────────────────────────────────────────────────────
122
  @app.websocket("/ws/voice")
123
  async def ws_voice(ws: WebSocket):
124
  await ws.accept()
125
  print("[VOICE] Client connected")
126
 
127
+ stt = STTProcessor()
128
+ user_id = "voice_user"
129
+ _active_streamer: ParallelTTSStreamer | None = None # barge-in handle
130
 
131
  try:
132
  while True:
 
 
 
 
133
  if not _ws_open(ws):
134
  print("[VOICE] Connection dropped, exiting handler.")
135
  break
 
139
  except WebSocketDisconnect:
140
  print("[VOICE] Client disconnected.")
141
  break
142
+ except Exception as exc:
143
+ if "disconnect" in str(exc).lower():
 
144
  print("[VOICE] Client disconnected (recv error).")
145
  else:
146
+ print(f"[VOICE] Receive error: {exc}")
147
  break
148
 
149
+ # ── Audio blob from client VAD ──────────────────────────────────
150
  if "bytes" in data and data["bytes"]:
151
  audio_bytes = data["bytes"]
152
  print(f"[VOICE] Received utterance: {len(audio_bytes):,} bytes")
153
 
154
+ # ── Barge-in: cancel any running TTS turn ───────────────────
155
+ if _active_streamer is not None:
156
+ print("[VOICE] Barge-in β€” cancelling previous TTS.")
157
+ await _active_streamer.cancel()
158
+ _active_streamer = None
159
+
160
+ # 1. STT β€” now a native coroutine (GPU semaphore inside)
161
+ transcript = await stt.transcribe(audio_bytes)
162
 
163
  if not transcript:
164
  await _safe_text(ws, {
165
  "type": "error",
166
  "text": "কΰ¦₯ΰ¦Ύ বুঝঀে ΰ¦ͺারিনি, আবার বলুনΰ₯€"
167
  })
 
168
  await _safe_text(ws, {"type": "end"})
169
  continue
170
 
 
173
  break
174
 
175
  # 2. AI + TTS pipeline
176
+ tts_streamer = ParallelTTSStreamer()
177
+ _active_streamer = tts_streamer
178
 
179
+ async def run_ai_and_tts() -> None:
180
  try:
181
  stream = await ai.main(user_id, transcript)
182
  async for token in stream:
 
185
  if not await _safe_text(ws, {"type": "llm_token", "token": token}):
186
  break
187
  await tts_streamer.add_token(token)
188
+ except Exception as exc:
189
+ print(f"[VOICE] AI error: {exc}")
190
  finally:
191
  await tts_streamer.flush()
192
 
193
+ async def stream_tts_audio() -> None:
194
  async for chunk in tts_streamer.stream_audio():
195
  if not await _safe_bytes(ws, chunk):
196
  break
197
 
198
  await asyncio.gather(run_ai_and_tts(), stream_tts_audio())
199
+ _active_streamer = None
200
 
201
+ # Signal end-of-turn β†’ client resumes VAD
202
  await _safe_text(ws, {"type": "end"})
203
 
204
+ # ── Control messages ────────────────────────────────────────────
205
  elif "text" in data and data["text"]:
206
  try:
207
  msg = json.loads(data["text"])
208
  if msg.get("type") == "ping":
209
  await _safe_text(ws, {"type": "pong"})
210
+
211
+ # Client can send {"type":"cancel"} to abort TTS mid-turn
212
+ elif msg.get("type") == "cancel":
213
+ if _active_streamer is not None:
214
+ print("[VOICE] Client cancel signal received.")
215
+ await _active_streamer.cancel()
216
+ _active_streamer = None
217
+ await _safe_text(ws, {"type": "end"})
218
+
219
  except json.JSONDecodeError:
220
  pass
221
 
222
  except WebSocketDisconnect:
223
  print("[VOICE] Client disconnected (outer)")
224
+ except Exception as exc:
225
+ if "disconnect" not in str(exc).lower():
226
+ print(f"[VOICE] WS error: {exc}")
227
  finally:
228
+ if _active_streamer is not None:
229
+ await _active_streamer.cancel()
230
  print("[VOICE] Handler exiting cleanly.")
frontend/script.js CHANGED
@@ -1,3 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  const chatBox = document.getElementById('chat-box');
2
  const sendBtn = document.getElementById('send-btn');
3
  const textInput = document.getElementById('text-input');
@@ -20,14 +39,15 @@ let isListening = false;
20
  let isSpeaking = false;
21
  let silenceTimer = null;
22
  let vadInterval = null;
23
- let isProcessing = false; // true while server is processing an utterance
24
 
25
  let currentAIMessage = null;
 
26
  let playbackChain = Promise.resolve();
27
 
28
  // ── VAD config ────────────────────────────────────────────────────────────────
29
- const SILENCE_THRESHOLD_DB = -45; // dBFS; lower = more sensitive
30
- const SILENCE_TIMEOUT_MS = 3000; // 3 s silence β†’ send utterance
31
  const VAD_POLL_MS = 100;
32
 
33
  // ── Text chat ─────────────────────────────────────────────────────────────────
@@ -44,7 +64,6 @@ function sendTextMessage() {
44
  textInput.value = '';
45
  }
46
 
47
- // Chat WS now sends JSON: {"type":"chat","text":"..."} or {"type":"end"}
48
  chatSocket.onmessage = (e) => {
49
  let msg;
50
  try {
@@ -54,7 +73,6 @@ chatSocket.onmessage = (e) => {
54
  }
55
  if (msg.type === 'chat' && msg.text) appendMessage(msg.text, 'ai');
56
  if (msg.type === 'error') appendMessage('⚠️ ' + msg.text, 'system');
57
- // 'end' β€” nothing to do for text chat
58
  };
59
  chatSocket.onerror = (e) => console.error('Chat WS error:', e);
60
  chatSocket.onclose = () => console.log('Chat WS closed');
@@ -83,20 +101,27 @@ voiceSocket.onmessage = (event) => {
83
 
84
  switch (msg.type) {
85
  case 'stt':
86
- // Show transcribed Bangla text as user bubble
87
  appendMessage('🎀 ' + msg.text, 'user');
88
  currentAIMessage = null;
89
  break;
90
 
91
  case 'llm_token':
92
- // Stream AI tokens into growing bubble
93
- if (!currentAIMessage) currentAIMessage = appendMessage('', 'ai');
94
- currentAIMessage.textContent += msg.token;
 
 
 
 
 
95
  chatBox.scrollTop = chatBox.scrollHeight;
96
  break;
97
 
98
  case 'end':
99
- // Server finished this turn β†’ resume VAD listening
 
 
 
100
  currentAIMessage = null;
101
  isProcessing = false;
102
  if (isListening) setMicStatus('listening');
@@ -104,7 +129,6 @@ voiceSocket.onmessage = (event) => {
104
 
105
  case 'error':
106
  appendMessage('⚠️ ' + msg.text, 'system');
107
- // Still need to reset so VAD resumes
108
  isProcessing = false;
109
  if (isListening) setMicStatus('listening');
110
  break;
@@ -117,18 +141,26 @@ voiceSocket.onmessage = (event) => {
117
  }
118
  };
119
 
120
- // ── Audio playback: sequential, no overlap ────────────────────────────────────
121
  function enqueueAudio(buffer) {
122
  playbackChain = playbackChain.then(() => playBuffer(buffer));
123
  }
124
 
125
  function playBuffer(buffer) {
126
  return new Promise((resolve) => {
 
 
 
 
 
127
  const blob = new Blob([buffer], { type: 'audio/mpeg' });
128
  const url = URL.createObjectURL(blob);
129
  const audio = new Audio(url);
 
 
130
  const done = () => {
131
  URL.revokeObjectURL(url);
 
132
  resolve();
133
  };
134
  audio.onended = done;
@@ -140,6 +172,27 @@ function playBuffer(buffer) {
140
  });
141
  }
142
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
  // ── Mic button ────────────────────────────────────────────────────────────────
144
  micBtn.onclick = async () => {
145
  if (!isListening) await startListening();
@@ -172,7 +225,6 @@ async function startListening() {
172
 
173
  isListening = true;
174
  setMicStatus('listening');
175
-
176
  vadInterval = setInterval(vadTick, VAD_POLL_MS);
177
  }
178
 
@@ -184,6 +236,8 @@ function stopListening() {
184
 
185
  if (isSpeaking) stopRecorder(true); // discard in-progress utterance
186
 
 
 
187
  micStream?.getTracks().forEach((t) => t.stop());
188
  audioContext?.close();
189
  micStream = audioContext = analyser = null;
@@ -194,17 +248,24 @@ function stopListening() {
194
 
195
  // ── VAD polling ───────────────────────────────────────────────────────────────
196
  function vadTick() {
197
- if (!analyser || isProcessing) return;
198
 
199
  const data = new Float32Array(analyser.frequencyBinCount);
200
  analyser.getFloatTimeDomainData(data);
201
 
202
- // RMS β†’ dBFS
203
  const rms = Math.sqrt(data.reduce((s, v) => s + v * v, 0) / data.length);
204
  const db = rms > 0 ? 20 * Math.log10(rms) : -Infinity;
205
  const speaking = db > SILENCE_THRESHOLD_DB;
206
 
207
  if (speaking) {
 
 
 
 
 
 
 
 
208
  clearTimeout(silenceTimer);
209
  silenceTimer = null;
210
 
@@ -218,8 +279,11 @@ function vadTick() {
218
  silenceTimer = setTimeout(() => {
219
  silenceTimer = null;
220
  isSpeaking = false;
 
 
 
221
  isProcessing = true;
222
- stopRecorder(false); // send the utterance
223
  setMicStatus('processing');
224
  }, SILENCE_TIMEOUT_MS);
225
  }
@@ -236,7 +300,6 @@ function startRecorder() {
236
  : 'audio/webm';
237
 
238
  mediaRecorder = new MediaRecorder(micStream, { mimeType });
239
-
240
  mediaRecorder.ondataavailable = (e) => {
241
  if (e.data.size > 0) audioChunks.push(e.data);
242
  };
@@ -272,7 +335,7 @@ function stopRecorder(discard = false) {
272
  mediaRecorder = null;
273
  }
274
 
275
- // ── UI ────────────────────────────────────────────────────────────────────────
276
  function setMicStatus(state) {
277
  const labels = {
278
  off: '🎀 Start Voice',
@@ -287,7 +350,14 @@ function setMicStatus(state) {
287
  function appendMessage(text, sender) {
288
  const div = document.createElement('div');
289
  div.className = `message ${sender}`;
290
- div.textContent = text;
 
 
 
 
 
 
 
291
  chatBox.appendChild(div);
292
  chatBox.scrollTop = chatBox.scrollHeight;
293
  return div;
 
1
+ /* ─────────────────────────────────────────────────────────────────────────────
2
+ script.js β€” Voice + text chat client
3
+
4
+ Fixes applied
5
+ ─────────────
6
+ 1. DOUBLE-SEND BUG: silenceTimer is now explicitly cleared whenever
7
+ isProcessing is set to true, so a timer that was already ticking
8
+ can't fire a second stopRecorder() call.
9
+ 2. TTS INTERRUPT / BARGE-IN: stopAllAudio() cancels the current
10
+ HTMLAudioElement and sends {"type":"cancel"} to the server so the
11
+ TTS pipeline also aborts server-side.
12
+ 3. MARKDOWN RENDERING: AI bubble uses marked.parse() instead of
13
+ textContent so Bangla markdown (bold, lists, headings) renders
14
+ correctly in the chat.
15
+ 4. VAD barge-in path: if the user starts speaking while TTS is playing
16
+ the audio stops immediately, isProcessing resets, and the new
17
+ utterance is captured normally.
18
+ ───────────────────────────────────────────────────────────────────────────── */
19
+
20
  const chatBox = document.getElementById('chat-box');
21
  const sendBtn = document.getElementById('send-btn');
22
  const textInput = document.getElementById('text-input');
 
39
  let isSpeaking = false;
40
  let silenceTimer = null;
41
  let vadInterval = null;
42
+ let isProcessing = false; // true while server is processing / TTS playing
43
 
44
  let currentAIMessage = null;
45
+ let currentAudio = null; // the HTMLAudioElement currently playing
46
  let playbackChain = Promise.resolve();
47
 
48
  // ── VAD config ────────────────────────────────────────────────────────────────
49
+ const SILENCE_THRESHOLD_DB = -45; // dBFS
50
+ const SILENCE_TIMEOUT_MS = 3000; // ms of silence before sending utterance
51
  const VAD_POLL_MS = 100;
52
 
53
  // ── Text chat ─────────────────────────────────────────────────────────────────
 
64
  textInput.value = '';
65
  }
66
 
 
67
  chatSocket.onmessage = (e) => {
68
  let msg;
69
  try {
 
73
  }
74
  if (msg.type === 'chat' && msg.text) appendMessage(msg.text, 'ai');
75
  if (msg.type === 'error') appendMessage('⚠️ ' + msg.text, 'system');
 
76
  };
77
  chatSocket.onerror = (e) => console.error('Chat WS error:', e);
78
  chatSocket.onclose = () => console.log('Chat WS closed');
 
101
 
102
  switch (msg.type) {
103
  case 'stt':
 
104
  appendMessage('🎀 ' + msg.text, 'user');
105
  currentAIMessage = null;
106
  break;
107
 
108
  case 'llm_token':
109
+ // FIX: stream tokens into a div; final markdown render happens on 'end'
110
+ if (!currentAIMessage) {
111
+ currentAIMessage = appendMessage('', 'ai');
112
+ currentAIMessage._raw = '';
113
+ }
114
+ currentAIMessage._raw += msg.token;
115
+ // Live preview: render markdown progressively
116
+ currentAIMessage.innerHTML = marked.parse(currentAIMessage._raw);
117
  chatBox.scrollTop = chatBox.scrollHeight;
118
  break;
119
 
120
  case 'end':
121
+ // Ensure final markdown render
122
+ if (currentAIMessage && currentAIMessage._raw) {
123
+ currentAIMessage.innerHTML = marked.parse(currentAIMessage._raw);
124
+ }
125
  currentAIMessage = null;
126
  isProcessing = false;
127
  if (isListening) setMicStatus('listening');
 
129
 
130
  case 'error':
131
  appendMessage('⚠️ ' + msg.text, 'system');
 
132
  isProcessing = false;
133
  if (isListening) setMicStatus('listening');
134
  break;
 
141
  }
142
  };
143
 
144
+ // ── Audio playback ─────────────────────────────────────────────────────────────
145
  function enqueueAudio(buffer) {
146
  playbackChain = playbackChain.then(() => playBuffer(buffer));
147
  }
148
 
149
  function playBuffer(buffer) {
150
  return new Promise((resolve) => {
151
+ if (isProcessing === false) {
152
+ resolve();
153
+ return;
154
+ } // cancelled mid-chain
155
+
156
  const blob = new Blob([buffer], { type: 'audio/mpeg' });
157
  const url = URL.createObjectURL(blob);
158
  const audio = new Audio(url);
159
+ currentAudio = audio;
160
+
161
  const done = () => {
162
  URL.revokeObjectURL(url);
163
+ currentAudio = null;
164
  resolve();
165
  };
166
  audio.onended = done;
 
172
  });
173
  }
174
 
175
+ /**
176
+ * Stop all queued and current audio immediately.
177
+ * Also sends a cancel signal to the server so TTS generation stops.
178
+ */
179
+ function stopAllAudio() {
180
+ // Replace the chain with an already-resolved promise so queued buffers
181
+ // that haven't started yet are silently dropped.
182
+ playbackChain = Promise.resolve();
183
+
184
+ if (currentAudio) {
185
+ currentAudio.pause();
186
+ currentAudio.src = '';
187
+ currentAudio = null;
188
+ }
189
+
190
+ // Tell server to abort TTS pipeline
191
+ if (voiceSocket.readyState === WebSocket.OPEN) {
192
+ voiceSocket.send(JSON.stringify({ type: 'cancel' }));
193
+ }
194
+ }
195
+
196
  // ── Mic button ────────────────────────────────────────────────────────────────
197
  micBtn.onclick = async () => {
198
  if (!isListening) await startListening();
 
225
 
226
  isListening = true;
227
  setMicStatus('listening');
 
228
  vadInterval = setInterval(vadTick, VAD_POLL_MS);
229
  }
230
 
 
236
 
237
  if (isSpeaking) stopRecorder(true); // discard in-progress utterance
238
 
239
+ stopAllAudio();
240
+
241
  micStream?.getTracks().forEach((t) => t.stop());
242
  audioContext?.close();
243
  micStream = audioContext = analyser = null;
 
248
 
249
  // ── VAD polling ───────────────────────────────────────────────────────────────
250
  function vadTick() {
251
+ if (!analyser) return;
252
 
253
  const data = new Float32Array(analyser.frequencyBinCount);
254
  analyser.getFloatTimeDomainData(data);
255
 
 
256
  const rms = Math.sqrt(data.reduce((s, v) => s + v * v, 0) / data.length);
257
  const db = rms > 0 ? 20 * Math.log10(rms) : -Infinity;
258
  const speaking = db > SILENCE_THRESHOLD_DB;
259
 
260
  if (speaking) {
261
+ // FIX: barge-in β€” user started talking while TTS is playing
262
+ if (isProcessing) {
263
+ console.log('[VAD] Barge-in detected β€” stopping TTS.');
264
+ stopAllAudio();
265
+ isProcessing = false;
266
+ }
267
+
268
+ // FIX: clear any pending silence timer so it can't double-fire
269
  clearTimeout(silenceTimer);
270
  silenceTimer = null;
271
 
 
279
  silenceTimer = setTimeout(() => {
280
  silenceTimer = null;
281
  isSpeaking = false;
282
+
283
+ // FIX: set isProcessing *before* stopping the recorder so that
284
+ // if vadTick fires again during onstop it sees the flag and skips.
285
  isProcessing = true;
286
+ stopRecorder(false);
287
  setMicStatus('processing');
288
  }, SILENCE_TIMEOUT_MS);
289
  }
 
300
  : 'audio/webm';
301
 
302
  mediaRecorder = new MediaRecorder(micStream, { mimeType });
 
303
  mediaRecorder.ondataavailable = (e) => {
304
  if (e.data.size > 0) audioChunks.push(e.data);
305
  };
 
335
  mediaRecorder = null;
336
  }
337
 
338
+ // ── UI helpers ────────────────────────────────────────────────────────────────
339
  function setMicStatus(state) {
340
  const labels = {
341
  off: '🎀 Start Voice',
 
350
  function appendMessage(text, sender) {
351
  const div = document.createElement('div');
352
  div.className = `message ${sender}`;
353
+
354
+ if (sender === 'ai' && typeof marked !== 'undefined') {
355
+ // FIX: render Bangla markdown (bold, lists, headings) properly
356
+ div.innerHTML = marked.parse(text);
357
+ } else {
358
+ div.textContent = text;
359
+ }
360
+
361
  chatBox.appendChild(div);
362
  chatBox.scrollTop = chatBox.scrollHeight;
363
  return div;
services/streaming.py CHANGED
@@ -1,217 +1,98 @@
1
- # import asyncio
2
- # import edge_tts
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
 
4
- # # ── Voice ─────────────────────────────────────────────────────────────────────
5
- # VOICE = "bn-BD-NabanitaNeural"
6
-
7
- # # Flush when buffer reaches this many characters (even without punctuation)
8
- # FLUSH_LEN = 50
9
-
10
- # # Don't send a TTS request for fewer than this many characters
11
- # MIN_CHARS = 3
12
-
13
- # # Punctuation marks that trigger an immediate flush
14
- # FLUSH_TRIGGERS = frozenset(".!?ΰ₯€,;:\n—–")
15
-
16
-
17
- # class ParallelTTSStreamer:
18
- # """
19
- # Collects LLM tokens, splits them into prosodic chunks, and converts each
20
- # chunk to audio via edge-tts.
21
-
22
- # FIX: Audio chunks are now guaranteed to arrive IN ORDER by chaining each
23
- # TTS task so it only writes to the queue after the previous task finishes.
24
- # This prevents audio chunks from chunk-2 overtaking chunk-1 during playback.
25
- # """
26
-
27
- # def __init__(self, voice: str = VOICE):
28
- # self.voice = voice
29
- # self.buffer = ""
30
- # self.queue: asyncio.Queue[bytes | None] = asyncio.Queue()
31
- # # Tracks the last scheduled task so each new task waits for it first
32
- # self._prev_task: asyncio.Task | None = None
33
- # self._flush_lock = asyncio.Lock()
34
-
35
- # async def add_token(self, token: str) -> None:
36
- # """Feed a single LLM output token into the streamer."""
37
- # if not token:
38
- # return
39
-
40
- # self.buffer += token
41
-
42
- # should_flush = (
43
- # any(ch in FLUSH_TRIGGERS for ch in token)
44
- # or len(self.buffer) >= FLUSH_LEN
45
- # )
46
-
47
- # if should_flush:
48
- # await self._schedule_flush()
49
-
50
- # async def _schedule_flush(self) -> None:
51
- # """Snapshot the buffer and schedule an ordered TTS task."""
52
- # async with self._flush_lock:
53
- # text = self.buffer.strip()
54
- # self.buffer = ""
55
-
56
- # if len(text) < MIN_CHARS:
57
- # return
58
-
59
- # # Each task waits for the previous one before pushing to queue,
60
- # # guaranteeing in-order audio delivery.
61
- # prev = self._prev_task
62
- # task = asyncio.create_task(self._tts_ordered(text, prev))
63
- # self._prev_task = task
64
-
65
- # async def _tts_ordered(self, text: str, wait_for: asyncio.Task | None) -> None:
66
- # """
67
- # 1. First synthesise audio bytes (can run in parallel with other chunks).
68
- # 2. Then wait for the previous chunk to finish writing to queue.
69
- # 3. Then push our bytes to the queue in order.
70
- # """
71
- # # Step 1: synthesise concurrently (no queue writes yet)
72
- # audio_chunks: list[bytes] = []
73
- # try:
74
- # communicate = edge_tts.Communicate(text, self.voice)
75
- # async for chunk in communicate.stream():
76
- # if chunk["type"] == "audio":
77
- # audio_chunks.append(chunk["data"])
78
- # except Exception as e:
79
- # print(f"[TTS] edge-tts error for '{text[:30]}…': {e}")
80
- # # Still need to chain β€” wait for prev even on error
81
- # if wait_for and not wait_for.done():
82
- # await wait_for
83
- # return
84
-
85
- # # Step 2: wait for the previous chunk to have finished queuing
86
- # if wait_for and not wait_for.done():
87
- # try:
88
- # await wait_for
89
- # except Exception:
90
- # pass
91
-
92
- # # Step 3: push our audio bytes in order
93
- # for data in audio_chunks:
94
- # await self.queue.put(data)
95
-
96
- # async def flush(self) -> None:
97
- # """
98
- # Flush remaining buffer, wait for all in-flight TTS tasks, then
99
- # signal end-of-stream with sentinel None.
100
- # """
101
- # await self._schedule_flush()
102
-
103
- # # Wait for the last chained task (which transitively waits for all)
104
- # if self._prev_task:
105
- # try:
106
- # await self._prev_task
107
- # except Exception:
108
- # pass
109
-
110
- # await self.queue.put(None)
111
-
112
- # async def stream_audio(self):
113
- # """
114
- # Async generator that yields audio bytes in order.
115
- # Stops when the sentinel None is received.
116
- # """
117
- # while True:
118
- # chunk = await self.queue.get()
119
- # if chunk is None:
120
- # break
121
- # yield chunk
122
-
123
-
124
-
125
-
126
-
127
-
128
-
129
-
130
-
131
-
132
-
133
-
134
-
135
-
136
-
137
-
138
-
139
-
140
-
141
-
142
-
143
-
144
-
145
-
146
-
147
-
148
-
149
-
150
-
151
-
152
-
153
-
154
- import re
155
  import asyncio
 
 
156
  import edge_tts
157
 
158
- VOICE = "bn-BD-NabanitaNeural"
159
- FLUSH_LEN = 80 # chars before forced flush (longer = more natural speech)
160
- MIN_CHARS = 5 # don't TTS tiny fragments
161
  FLUSH_TRIGGERS = frozenset(".!?ΰ₯€,;:\n—–")
162
 
163
 
 
164
  def _clean_for_tts(text: str) -> str:
165
- """
166
- Strip markdown and other non-speech symbols before sending to edge-tts.
167
-
168
- The LLM outputs markdown (**, *, #, -, numbers+dot lists) which edge-tts
169
- either reads aloud awkwardly ("asterisk asterisk") or returns 'No audio
170
- was received' on punctuation-only chunks like '**' or '-)'.
171
- """
172
- # Remove bold/italic markers
173
- text = re.sub(r'\*{1,3}', '', text)
174
- # Remove heading markers
175
- text = re.sub(r'#+\s*', '', text)
176
- # Remove list markers like "ΰ§§.", "1.", "-", "β€’"
177
- text = re.sub(r'^\s*[-β€’]\s*', '', text, flags=re.MULTILINE)
178
- text = re.sub(r'^\s*[\d০-৯]+[.)]\s*', '', text, flags=re.MULTILINE)
179
- # Remove leftover backticks
180
- text = re.sub(r'`+', '', text)
181
- # Collapse extra whitespace / blank lines
182
- text = re.sub(r'\n{2,}', '\n', text)
183
- text = text.strip()
184
- return text
185
 
186
 
 
187
  class ParallelTTSStreamer:
188
  """
189
- Collects LLM tokens β†’ splits into prosodic chunks β†’ converts to audio
190
- via edge-tts in parallel β†’ streams audio bytes IN ORDER.
191
 
192
- Audio ordering is guaranteed by a task chain: each chunk task synthesises
193
- audio freely (parallel) but only writes to the queue after the previous
194
- chunk finishes, so the client always hears chunk-1 before chunk-2.
 
 
 
 
 
 
 
 
 
 
 
195
  """
196
 
197
- def __init__(self, voice: str = VOICE):
198
- self.voice = voice
199
- self.buffer = ""
200
  self.queue: asyncio.Queue[bytes | None] = asyncio.Queue()
201
- self._prev_task: asyncio.Task | None = None
202
  self._flush_lock = asyncio.Lock()
 
 
203
 
 
204
  async def add_token(self, token: str) -> None:
205
- if not token:
206
  return
207
- self.buffer += token
208
- if any(ch in FLUSH_TRIGGERS for ch in token) or len(self.buffer) >= FLUSH_LEN:
 
 
 
 
 
 
 
 
209
  await self._schedule_flush()
210
 
 
211
  async def _schedule_flush(self) -> None:
 
 
 
212
  async with self._flush_lock:
213
- raw = self.buffer.strip()
214
- self.buffer = ""
215
 
216
  text = _clean_for_tts(raw)
217
  if len(text) < MIN_CHARS:
@@ -220,41 +101,70 @@ class ParallelTTSStreamer:
220
  prev = self._prev_task
221
  task = asyncio.create_task(self._tts_ordered(text, prev))
222
  self._prev_task = task
 
 
223
 
 
224
  async def _tts_ordered(self, text: str, wait_for: asyncio.Task | None) -> None:
225
- """Synthesise audio (parallel), then write to queue in order."""
226
- # Step 1 β€” synthesise concurrently
227
  audio_chunks: list[bytes] = []
228
- try:
229
- communicate = edge_tts.Communicate(text, self.voice)
230
- async for chunk in communicate.stream():
231
- if chunk["type"] == "audio":
232
- audio_chunks.append(chunk["data"])
233
- except Exception as e:
234
- print(f"[TTS] edge-tts error for '{text[:40]}': {e}")
235
-
236
- # Step 2 β€” wait for previous chunk to finish queuing
 
 
 
237
  if wait_for and not wait_for.done():
238
  try:
239
  await wait_for
240
  except Exception:
241
  pass
242
 
243
- # Step 3 β€” write to queue in order
244
- for data in audio_chunks:
245
- await self.queue.put(data)
 
246
 
 
247
  async def flush(self) -> None:
248
- """Flush remaining buffer, await all tasks, send end sentinel."""
249
  await self._schedule_flush()
250
  if self._prev_task:
251
  try:
252
  await self._prev_task
253
  except Exception:
254
  pass
255
- await self.queue.put(None)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
256
 
 
257
  async def stream_audio(self):
 
258
  while True:
259
  chunk = await self.queue.get()
260
  if chunk is None:
 
1
+ """
2
+ services/streaming.py β€” Parallel + ordered TTS streamer
3
+
4
+ Fixes applied
5
+ ─────────────
6
+ 1. BUFFER RACE β€” self.buffer is now only mutated while holding
7
+ self._flush_lock, so add_token() and _schedule_flush() can never
8
+ interleave partial writes.
9
+ 2. CANCELLATION β€” ParallelTTSStreamer.cancel() drops all pending tasks
10
+ and poisons the queue with a sentinel so stream_audio() exits
11
+ immediately. app.py calls cancel() when the user starts speaking
12
+ mid-playback, giving true barge-in / interrupt behaviour.
13
+ 3. Markdown stripping (_clean_for_tts) is unchanged.
14
+ 4. Audio ordering guarantee is unchanged (task-chain pattern).
15
+ """
16
+
17
+ from __future__ import annotations
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  import asyncio
20
+ import re
21
+
22
  import edge_tts
23
 
24
+ VOICE = "bn-BD-NabanitaNeural"
25
+ FLUSH_LEN = 80 # chars before forced flush
26
+ MIN_CHARS = 5 # skip tiny fragments
27
  FLUSH_TRIGGERS = frozenset(".!?ΰ₯€,;:\n—–")
28
 
29
 
30
+ # ── Markdown β†’ plain text ──────────────────────────────────────────────────────
31
  def _clean_for_tts(text: str) -> str:
32
+ text = re.sub(r"\*{1,3}", "", text)
33
+ text = re.sub(r"#+\s*", "", text)
34
+ text = re.sub(r"^\s*[-β€’]\s*", "", text, flags=re.MULTILINE)
35
+ text = re.sub(r"^\s*[\d০-৯]+[.)]\s*", "", text, flags=re.MULTILINE)
36
+ text = re.sub(r"`+", "", text)
37
+ text = re.sub(r"\n{2,}", "\n", text)
38
+ return text.strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
39
 
40
 
41
+ # ── Streamer ───────────────────────────────────────────────────────────────────
42
  class ParallelTTSStreamer:
43
  """
44
+ Collects LLM tokens β†’ prosodic chunks β†’ parallel edge-tts calls β†’
45
+ serialised audio queue.
46
 
47
+ Usage
48
+ ─────
49
+ streamer = ParallelTTSStreamer()
50
+
51
+ # producer
52
+ await streamer.add_token(token)
53
+ await streamer.flush() # call once when LLM finishes
54
+
55
+ # consumer (run concurrently with producer)
56
+ async for chunk in streamer.stream_audio():
57
+ await ws.send_bytes(chunk)
58
+
59
+ # interrupt (call from any coroutine)
60
+ await streamer.cancel()
61
  """
62
 
63
+ def __init__(self, voice: str = VOICE) -> None:
64
+ self.voice = voice
65
+ self.buffer = ""
66
  self.queue: asyncio.Queue[bytes | None] = asyncio.Queue()
67
+ self._prev_task: asyncio.Task | None = None
68
  self._flush_lock = asyncio.Lock()
69
+ self._cancelled = False
70
+ self._tasks: list[asyncio.Task] = [] # track all live tasks
71
 
72
+ # ── Token intake ───────────────────────────────────────────────────────────
73
  async def add_token(self, token: str) -> None:
74
+ if not token or self._cancelled:
75
  return
76
+
77
+ # FIX: hold the lock for the buffer write too, not just the flush
78
+ async with self._flush_lock:
79
+ self.buffer += token
80
+ should_flush = (
81
+ any(ch in FLUSH_TRIGGERS for ch in token)
82
+ or len(self.buffer) >= FLUSH_LEN
83
+ )
84
+
85
+ if should_flush:
86
  await self._schedule_flush()
87
 
88
+ # ── Flush scheduling ───────────────────────────────────────────────────────
89
  async def _schedule_flush(self) -> None:
90
+ if self._cancelled:
91
+ return
92
+
93
  async with self._flush_lock:
94
+ raw = self.buffer.strip()
95
+ self.buffer = ""
96
 
97
  text = _clean_for_tts(raw)
98
  if len(text) < MIN_CHARS:
 
101
  prev = self._prev_task
102
  task = asyncio.create_task(self._tts_ordered(text, prev))
103
  self._prev_task = task
104
+ self._tasks.append(task)
105
+ task.add_done_callback(lambda t: self._tasks.remove(t) if t in self._tasks else None)
106
 
107
+ # ── Ordered TTS task ───────────────────────────────────────────────────────
108
  async def _tts_ordered(self, text: str, wait_for: asyncio.Task | None) -> None:
109
+ # Step 1 β€” synthesise (may run in parallel with other chunks)
 
110
  audio_chunks: list[bytes] = []
111
+ if not self._cancelled:
112
+ try:
113
+ communicate = edge_tts.Communicate(text, self.voice)
114
+ async for chunk in communicate.stream():
115
+ if self._cancelled:
116
+ break
117
+ if chunk["type"] == "audio":
118
+ audio_chunks.append(chunk["data"])
119
+ except Exception as exc:
120
+ print(f"[TTS] edge-tts error for '{text[:40]}': {exc}")
121
+
122
+ # Step 2 β€” wait for predecessor to finish queuing (preserves order)
123
  if wait_for and not wait_for.done():
124
  try:
125
  await wait_for
126
  except Exception:
127
  pass
128
 
129
+ # Step 3 β€” write to queue (skipped if cancelled)
130
+ if not self._cancelled:
131
+ for data in audio_chunks:
132
+ await self.queue.put(data)
133
 
134
+ # ── Flush remaining buffer ─────────────────────────────────────────────────
135
  async def flush(self) -> None:
136
+ """Call once after the LLM stream ends."""
137
  await self._schedule_flush()
138
  if self._prev_task:
139
  try:
140
  await self._prev_task
141
  except Exception:
142
  pass
143
+ await self.queue.put(None) # end-of-stream sentinel
144
+
145
+ # ── Interrupt / barge-in ───────────────────────────────────────────────────
146
+ async def cancel(self) -> None:
147
+ """
148
+ Immediately abort all in-flight TTS tasks and unblock stream_audio().
149
+ Safe to call from any coroutine while stream_audio() is running.
150
+ """
151
+ self._cancelled = True
152
+
153
+ # Cancel all pending asyncio tasks
154
+ for task in list(self._tasks):
155
+ task.cancel()
156
+
157
+ # Drain and poison the queue so stream_audio() exits
158
+ while not self.queue.empty():
159
+ try:
160
+ self.queue.get_nowait()
161
+ except asyncio.QueueEmpty:
162
+ break
163
+ await self.queue.put(None) # sentinel β†’ stream_audio exits
164
 
165
+ # ── Audio consumer ─────────────────────────────────────────────────────────
166
  async def stream_audio(self):
167
+ """Async generator β€” yields ordered audio bytes until cancelled/done."""
168
  while True:
169
  chunk = await self.queue.get()
170
  if chunk is None:
services/stt.py CHANGED
@@ -1,58 +1,104 @@
1
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  import os
3
  import re
4
  import subprocess
5
  import tempfile
 
6
 
7
  from faster_whisper import WhisperModel
8
 
9
-
10
- model = WhisperModel("large-v3", device="cuda", compute_type="int8_float32")
11
-
12
-
13
- BANGLA_PATTERN = re.compile(r'[\u0980-\u09FF]')
14
-
15
- # Scripts we consider "wrong" β€” Arabic, Urdu, Devanagari (when expecting Bangla)
16
  WRONG_SCRIPT_PATTERN = re.compile(
17
- r'[\u0600-\u06FF' # Arabic / Urdu
18
- r'\u0750-\u077F' # Arabic Supplement
19
- r'\uFB50-\uFDFF' # Arabic Presentation Forms
20
- r'\uFE70-\uFEFF]' # Arabic Presentation Forms-B
21
  )
22
 
 
 
 
 
 
23
 
24
- def _is_valid_bangla(text: str) -> bool:
25
- """
26
- Return True if the transcript looks like real Bangla.
27
 
28
- A valid transcript must:
29
- 1. Contain at least one Bangla Unicode character, OR be very short
30
- (some valid responses are single digits/punctuation).
31
- 2. NOT be dominated by Arabic/Urdu script (Whisper wrong-script error).
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  """
33
- bangla_chars = len(BANGLA_PATTERN.findall(text))
34
- wrong_chars = len(WRONG_SCRIPT_PATTERN.findall(text))
35
- total_alpha = sum(1 for c in text if c.isalpha())
 
 
 
 
 
 
 
 
36
 
37
  if total_alpha == 0:
38
- return True # digits/punctuation only β€” let it through
39
 
40
- # If more than 30% of alphabetic chars are Arabic/Urdu script, reject
41
- if total_alpha > 0 and (wrong_chars / total_alpha) > 0.30:
42
  return False
43
 
44
- # Must have at least some Bangla characters for long responses
45
- if total_alpha > 5 and bangla_chars == 0:
46
  return False
47
 
48
  return True
49
 
50
 
 
51
  class STTProcessor:
52
  MIN_INPUT_BYTES = 3_000
53
 
54
- def _to_wav(self, audio_bytes: bytes) -> str | None:
55
- """Convert browser WebM/opus to 16 kHz mono WAV with loudness normalization."""
 
 
 
 
 
56
  in_path = out_path = None
57
  try:
58
  with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as f:
@@ -61,13 +107,17 @@ class STTProcessor:
61
  with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
62
  out_path = f.name
63
 
64
- result = subprocess.run([
65
- "ffmpeg", "-y", "-loglevel", "warning",
66
- "-i", in_path,
67
- "-ar", "16000", "-ac", "1",
68
- "-af", "loudnorm",
69
- "-f", "wav", out_path,
70
- ], stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
 
 
 
 
71
 
72
  if result.returncode != 0:
73
  print("[STT] ffmpeg error:", result.stderr.decode(errors="replace").strip())
@@ -78,71 +128,90 @@ class STTProcessor:
78
 
79
  print(f"[STT] WAV ready: {os.path.getsize(out_path):,} bytes")
80
  return out_path
81
- except Exception as e:
82
- print(f"[STT] _to_wav: {e}")
 
83
  return None
84
  finally:
85
  if in_path and os.path.exists(in_path):
86
- try: os.remove(in_path)
87
- except OSError: pass
88
-
89
- def transcribe(self, audio_bytes: bytes) -> str | None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
  if len(audio_bytes) < self.MIN_INPUT_BYTES:
91
  print(f"[STT] Too short ({len(audio_bytes)} B), skipping.")
92
  return None
93
 
94
- wav_path = None
95
- try:
96
- wav_path = self._to_wav(audio_bytes)
97
- if not wav_path:
98
- return None
99
-
100
-
101
- # segments, info = model.transcribe(wav_path, language="bn", task="translate", beam_size=5)
102
-
103
- segments, info = model.transcribe(
104
- wav_path,
105
- language="bn",
106
- beam_size=5,
107
- vad_filter=False, # loudnorm handles quiet audio
108
- condition_on_previous_text=False,
109
- temperature=0,
110
- suppress_tokens=[-1],
111
- no_speech_threshold=0.5,
112
- log_prob_threshold=-1.0,
113
- # task="translate"
114
- # NO initial_prompt β€” causes hallucination loops on base model
115
- )
116
-
117
- text = " ".join(seg.text.strip() for seg in segments).strip()
118
- print(f"[STT] Lang={info.language} prob={info.language_probability:.2f}")
119
-
120
- if not text:
121
- print("[STT] Empty transcript.")
122
- return None
123
-
124
- # ── Hallucination guard: repeated words ───────────────────────────
125
- words = text.split()
126
- if len(words) > 5 and (len(set(words)) / len(words)) < 0.25:
127
- print(f"[STT] Hallucination (repetition) discarded: {text[:60]}")
128
- return None
129
-
130
- # ── Script validation: must be Bangla Unicode ─────────────────────
131
- if not _is_valid_bangla(text):
132
- print(f"[STT] Wrong script (Arabic/Urdu output from base model) "
133
- f"discarded: {text[:60]}")
134
- print("[STT] ⚠ If this keeps happening, ensure you're using "
135
- "model='small' not 'base'.")
136
- return None
137
-
138
- print(f"[STT] Transcript: {text}")
139
- return text
140
 
141
- except Exception as e:
142
- print(f"[STT] transcribe: {e}")
 
 
 
 
143
  import traceback; traceback.print_exc()
144
  return None
145
  finally:
146
- if wav_path and os.path.exists(wav_path):
147
- try: os.remove(wav_path)
148
- except OSError: pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ services/stt.py β€” GPU-safe Faster-Whisper STT processor
3
+
4
+ Fixes applied
5
+ ─────────────
6
+ 1. LAZY model initialisation β€” WhisperModel is loaded once on first use,
7
+ not at import time, so FastAPI starts instantly.
8
+ 2. CUDA semaphore (max 1) β€” only one transcription runs on the GPU at a
9
+ time. Concurrent requests queue here instead of racing on the CUDA
10
+ context, which caused OOM and silent hangs on RTX 3060 (12 GB).
11
+ 3. ffmpeg runs in the same thread as the model call (both inside
12
+ asyncio.to_thread), keeping the async event-loop completely free.
13
+ 4. Hallucination guards and Bangla script validation are unchanged.
14
+ """
15
+
16
+ from __future__ import annotations
17
+
18
+ import asyncio
19
  import os
20
  import re
21
  import subprocess
22
  import tempfile
23
+ from threading import Lock
24
 
25
  from faster_whisper import WhisperModel
26
 
27
+ # ── Bangla / wrong-script patterns ────────────────────────────────────────────
28
+ BANGLA_PATTERN = re.compile(r"[\u0980-\u09FF]")
 
 
 
 
 
29
  WRONG_SCRIPT_PATTERN = re.compile(
30
+ r"[\u0600-\u06FF" # Arabic / Urdu
31
+ r"\u0750-\u077F" # Arabic Supplement
32
+ r"\uFB50-\uFDFF" # Arabic Presentation Forms
33
+ r"\uFE70-\uFEFF]" # Arabic Presentation Forms-B
34
  )
35
 
36
+ # ── Lazy singleton ─────────────────────────────────────────────────────────────
37
+ _model: WhisperModel | None = None
38
+ _model_lock = Lock() # protects the one-time initialisation
39
+ # Semaphore lives in the event-loop thread; created on first async use.
40
+ _gpu_semaphore: asyncio.Semaphore | None = None
41
 
 
 
 
42
 
43
+ def _get_model() -> WhisperModel:
44
+ """
45
+ Load WhisperModel on first call, return the cached instance thereafter.
46
+ Thread-safe via a threading.Lock (called from worker threads).
47
+ """
48
+ global _model
49
+ if _model is None:
50
+ with _model_lock:
51
+ if _model is None: # double-checked locking
52
+ print("[STT] Loading Faster-Whisper large-v3 on CUDA …")
53
+ _model = WhisperModel(
54
+ "large-v3",
55
+ device="cuda",
56
+ compute_type="int8_float32",
57
+ )
58
+ print("[STT] Model ready.")
59
+ return _model
60
+
61
+
62
+ def _get_semaphore() -> asyncio.Semaphore:
63
+ """
64
+ Return (or create) a per-event-loop asyncio.Semaphore(1).
65
+ Must be called from the async context (event-loop thread).
66
  """
67
+ global _gpu_semaphore
68
+ if _gpu_semaphore is None:
69
+ _gpu_semaphore = asyncio.Semaphore(1)
70
+ return _gpu_semaphore
71
+
72
+
73
+ # ── Script validation ──────────────────────────────────────────────────────────
74
+ def _is_valid_bangla(text: str) -> bool:
75
+ bangla_chars = len(BANGLA_PATTERN.findall(text))
76
+ wrong_chars = len(WRONG_SCRIPT_PATTERN.findall(text))
77
+ total_alpha = sum(1 for c in text if c.isalpha())
78
 
79
  if total_alpha == 0:
80
+ return True # digits / punctuation β€” allow
81
 
82
+ if (wrong_chars / total_alpha) > 0.30: # >30 % Arabic/Urdu β†’ reject
 
83
  return False
84
 
85
+ if total_alpha > 5 and bangla_chars == 0: # long but zero Bangla β†’ reject
 
86
  return False
87
 
88
  return True
89
 
90
 
91
+ # ── Core processor ─────────────────────────────────────────────────────────────
92
  class STTProcessor:
93
  MIN_INPUT_BYTES = 3_000
94
 
95
+ # ── ffmpeg helper ──────────────────────────────────────────────────────────
96
+ @staticmethod
97
+ def _to_wav(audio_bytes: bytes) -> str | None:
98
+ """
99
+ Convert browser WebM/Opus blob β†’ 16 kHz mono WAV with loudnorm.
100
+ Runs in a worker thread (called via asyncio.to_thread).
101
+ """
102
  in_path = out_path = None
103
  try:
104
  with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as f:
 
107
  with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
108
  out_path = f.name
109
 
110
+ result = subprocess.run(
111
+ [
112
+ "ffmpeg", "-y", "-loglevel", "warning",
113
+ "-i", in_path,
114
+ "-ar", "16000", "-ac", "1",
115
+ "-af", "loudnorm",
116
+ "-f", "wav", out_path,
117
+ ],
118
+ stdout=subprocess.DEVNULL,
119
+ stderr=subprocess.PIPE,
120
+ )
121
 
122
  if result.returncode != 0:
123
  print("[STT] ffmpeg error:", result.stderr.decode(errors="replace").strip())
 
128
 
129
  print(f"[STT] WAV ready: {os.path.getsize(out_path):,} bytes")
130
  return out_path
131
+
132
+ except Exception as exc:
133
+ print(f"[STT] _to_wav: {exc}")
134
  return None
135
  finally:
136
  if in_path and os.path.exists(in_path):
137
+ try:
138
+ os.remove(in_path)
139
+ except OSError:
140
+ pass
141
+
142
+ # ── Synchronous transcription (runs in worker thread) ─────────────────────
143
+ @staticmethod
144
+ def _transcribe_sync(wav_path: str) -> str | None:
145
+ """
146
+ Whisper inference. Called inside asyncio.to_thread so it never
147
+ blocks the event loop. The GPU semaphore is acquired *before*
148
+ this function is dispatched, so only one call executes at a time.
149
+ """
150
+ model = _get_model()
151
+
152
+ segments, info = model.transcribe(
153
+ wav_path,
154
+ language="bn",
155
+ beam_size=5,
156
+ vad_filter=False,
157
+ condition_on_previous_text=False,
158
+ temperature=0,
159
+ suppress_tokens=[-1],
160
+ no_speech_threshold=0.5,
161
+ log_prob_threshold=-1.0,
162
+ )
163
+
164
+ text = " ".join(seg.text.strip() for seg in segments).strip()
165
+ print(f"[STT] Lang={info.language} prob={info.language_probability:.2f}")
166
+ return text
167
+
168
+ # ── Public async entry-point ───────────────────────────────────────────────
169
+ async def transcribe(self, audio_bytes: bytes) -> str | None:
170
+ """
171
+ Full pipeline: validate β†’ ffmpeg β†’ GPU inference.
172
+
173
+ Awaitable from the async WS handler. GPU access is serialised
174
+ via an asyncio.Semaphore so concurrent sessions queue here
175
+ instead of crashing the CUDA context.
176
+ """
177
  if len(audio_bytes) < self.MIN_INPUT_BYTES:
178
  print(f"[STT] Too short ({len(audio_bytes)} B), skipping.")
179
  return None
180
 
181
+ # ffmpeg conversion (CPU-bound, off event loop)
182
+ wav_path = await asyncio.to_thread(self._to_wav, audio_bytes)
183
+ if not wav_path:
184
+ return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
 
186
+ sem = _get_semaphore()
187
+ try:
188
+ async with sem: # serialise GPU access
189
+ text = await asyncio.to_thread(self._transcribe_sync, wav_path)
190
+ except Exception as exc:
191
+ print(f"[STT] transcribe error: {exc}")
192
  import traceback; traceback.print_exc()
193
  return None
194
  finally:
195
+ if os.path.exists(wav_path):
196
+ try:
197
+ os.remove(wav_path)
198
+ except OSError:
199
+ pass
200
+
201
+ if not text:
202
+ print("[STT] Empty transcript.")
203
+ return None
204
+
205
+ # ── Hallucination guard ────────────────────────────────────────────────
206
+ words = text.split()
207
+ if len(words) > 5 and (len(set(words)) / len(words)) < 0.25:
208
+ print(f"[STT] Hallucination (repetition) discarded: {text[:60]}")
209
+ return None
210
+
211
+ # ── Script validation ──────────────────────────────────────────────────
212
+ if not _is_valid_bangla(text):
213
+ print(f"[STT] Wrong script discarded: {text[:60]}")
214
+ return None
215
+
216
+ print(f"[STT] Transcript: {text}")
217
+ return text