rakib72642 committed on
Commit
5dabf9d
·
1 Parent(s): 1ddf755

added communication full layer

Browse files
Files changed (8) hide show
  1. app.py +106 -65
  2. core/backend.py +291 -294
  3. frontend/index.html +8 -52
  4. frontend/style.css +107 -265
  5. services/streaming.py +102 -36
  6. services/stt.py +76 -37
  7. services/tts.py +59 -14
  8. services/webrtc_pipeline.py +76 -58
app.py CHANGED
@@ -1,31 +1,28 @@
1
  """
2
  app.py — FastAPI entrypoint: WebRTC-first + WebSocket fallback
3
 
4
- Pipeline overview:
5
- ──────────────────
6
- Browser Server
7
- ──────────────────────────────────────────────────────
8
- getUserMedia() WebRTC aiortc peer connection
9
- PCM audio frames ────► VAD segmenter
10
- ↓ utterances
11
- STT GPU-batch queue
12
- ↓ transcripts (parallel)
13
- LLM async stream ──┐
14
- tokens │ concurrent
15
- TTS streamer ◄──────┘
16
- audio chunks
17
- ◄────────────────────────── RTCDataChannel
18
-
19
- WebSocket mode (fallback):
20
- Still available at /ws/voice and /ws/chat for environments
21
- where WebRTC is blocked (corporate proxies, etc.).
22
- Uses the same STT batch queue and parallel TTS streamer.
23
-
24
- Performance targets:
25
- STT: < 200ms (GPU-batched, ffmpeg parallel)
26
- First LLM tok: < 100ms (streaming, no full-sentence wait)
27
- TTS start: < 150ms (sentence-level streaming, parallel synthesis)
28
- Total TTFA*: < 450ms (*Time-To-First-Audio)
29
  """
30
 
31
  import asyncio
@@ -43,7 +40,7 @@ from core.backend import AIBackend
43
  from services.stt import STTProcessor
44
  from services.streaming import ParallelTTSStreamer
45
 
46
- # ── WebRTC (optional — degrades gracefully if aiortc not installed) ────────────
47
  try:
48
  from services.webrtc_pipeline import WebRTCSession
49
  WEBRTC_AVAILABLE = True
@@ -55,8 +52,8 @@ except (ImportError, RuntimeError) as _e:
55
  # ══════════════════════════════════════════════════════════════════════════════
56
  # MODEL ROUTING CONFIG — set exactly ONE to True
57
  # ══════════════════════════════════════════════════════════════════════════════
58
- USE_GEMINI = True
59
- USE_OLLAMA = False
60
  USE_LOCAL_FALLBACK = False
61
 
62
  _active = sum([USE_GEMINI, USE_OLLAMA, USE_LOCAL_FALLBACK])
@@ -72,7 +69,6 @@ ai = AIBackend(
72
  use_fallback=USE_LOCAL_FALLBACK,
73
  )
74
 
75
- # Active WebRTC sessions — keyed by session_id
76
  _rtc_sessions: dict[str, "WebRTCSession"] = {}
77
 
78
 
@@ -85,18 +81,15 @@ async def lifespan(app: FastAPI):
85
  await ai.async_setup()
86
  print("[APP] AI backend ready ✓")
87
  yield
88
- # Clean up WebRTC sessions
89
  for session in list(_rtc_sessions.values()):
90
  await session.close()
91
  _rtc_sessions.clear()
92
- # Clean up DB connections
93
- for attr in ("conn", "_meta_conn"):
94
- conn = getattr(ai, attr, None)
95
- if conn:
96
- try:
97
- await conn.close()
98
- except Exception:
99
- pass
100
 
101
 
102
  app = FastAPI(lifespan=lifespan)
@@ -114,33 +107,33 @@ async def root():
114
  return HTMLResponse("<h2>index.html not found</h2>", status_code=404)
115
 
116
 
 
 
 
 
 
 
 
 
 
 
 
117
  # ══════════════════════════════════════════════════════════════════════════════
118
  # WEBRTC SIGNALING ENDPOINTS
119
  # ══════════════════════════════════════════════════════════════════════════════
120
 
121
  @app.post("/rtc/offer")
122
  async def rtc_offer(request: Request):
123
- """
124
- WebRTC signaling: browser sends SDP offer, server returns SDP answer.
125
-
126
- Request JSON:
127
- { "sdp": "...", "type": "offer", "session_id": "optional_existing_id" }
128
-
129
- Response JSON:
130
- { "sdp": "...", "type": "answer", "session_id": "..." }
131
- """
132
  if not WEBRTC_AVAILABLE:
133
  return JSONResponse(
134
  {"error": "WebRTC unavailable. Use WebSocket fallback at /ws/voice"},
135
  status_code=503,
136
  )
137
-
138
  body = await request.json()
139
  sdp = body.get("sdp", "")
140
  sdp_type = body.get("type", "offer")
141
  session_id = body.get("session_id") or uuid.uuid4().hex
142
 
143
- # Reuse or create session
144
  session = _rtc_sessions.get(session_id)
145
  if session is None:
146
  session = WebRTCSession(ai_backend=ai)
@@ -153,25 +146,20 @@ async def rtc_offer(request: Request):
153
 
154
  @app.post("/rtc/ice")
155
  async def rtc_ice(request: Request):
156
- """Forward browser ICE candidate to the session."""
157
  if not WEBRTC_AVAILABLE:
158
  return JSONResponse({"error": "WebRTC unavailable"}, status_code=503)
159
-
160
  body = await request.json()
161
  session_id = body.get("session_id", "")
162
  candidate = body.get("candidate", {})
163
-
164
- session = _rtc_sessions.get(session_id)
165
  if session is None:
166
  return JSONResponse({"error": "Session not found"}, status_code=404)
167
-
168
  await session.add_ice_candidate(candidate)
169
  return JSONResponse({"ok": True})
170
 
171
 
172
  @app.delete("/rtc/session/{session_id}")
173
  async def rtc_close(session_id: str):
174
- """Explicitly close a WebRTC session."""
175
  session = _rtc_sessions.pop(session_id, None)
176
  if session:
177
  await session.close()
@@ -214,6 +202,10 @@ async def _safe_bytes(ws: WebSocket, data: bytes) -> bool:
214
  async def ws_chat(ws: WebSocket):
215
  await ws.accept()
216
  print("[CHAT] Client connected ✓")
 
 
 
 
217
  try:
218
  while True:
219
  raw = await ws.receive_text()
@@ -223,7 +215,25 @@ async def ws_chat(ws: WebSocket):
223
  await _safe_text(ws, {"type": "error", "text": "Invalid JSON"})
224
  continue
225
 
226
- user_id = data.get("user_id", "default_user")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
227
  user_query = data.get("user_query", "").strip()
228
  if not user_query:
229
  continue
@@ -249,15 +259,43 @@ async def ws_chat(ws: WebSocket):
249
 
250
 
251
  # ══════════════════════════════════════════════════════════════════════════════
252
- # WEBSOCKET — VOICE (fallback: full STT→LLM→TTS pipeline over WS)
253
  # ══════════════════════════════════════════════════════════════════════════════
254
 
 
 
 
 
255
  @app.websocket("/ws/voice")
256
  async def ws_voice(ws: WebSocket):
257
  await ws.accept()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
258
 
259
- user_id = f"voice_{uuid.uuid4().hex[:12]}"
260
- print(f"[VOICE] Client connected — user_id={user_id}")
261
 
262
  stt = STTProcessor()
263
  _active_streamer: ParallelTTSStreamer | None = None
@@ -279,7 +317,7 @@ async def ws_voice(ws: WebSocket):
279
  async def _handle_utterance(audio_bytes: bytes):
280
  nonlocal _active_streamer
281
 
282
- # ── STT (GPU-batched) ──────────────────────────────────────────────────
283
  transcript = await stt.transcribe(audio_bytes)
284
  if not transcript:
285
  await _safe_text(ws, {"type": "error", "text": "কথা বুঝতে পারিনি, আবার বলুন।"})
@@ -315,7 +353,6 @@ async def ws_voice(ws: WebSocket):
315
  if not await _safe_bytes(ws, chunk):
316
  break
317
 
318
- # LLM and TTS delivery run SIMULTANEOUSLY
319
  await asyncio.gather(run_llm(), run_tts(), return_exceptions=True)
320
  _active_streamer = None
321
  await _safe_text(ws, {"type": "end"})
@@ -344,12 +381,16 @@ async def ws_voice(ws: WebSocket):
344
  elif "text" in data and data["text"]:
345
  try:
346
  msg = json.loads(data["text"])
347
- if msg.get("type") == "init" and msg.get("user_id"):
348
- user_id = str(msg["user_id"])[:64]
 
 
 
 
349
  await _safe_text(ws, {"type": "init_ack", "user_id": user_id})
350
- elif msg.get("type") == "ping":
351
  await _safe_text(ws, {"type": "pong"})
352
- elif msg.get("type") == "cancel":
353
  await _cancel_active()
354
  await _safe_text(ws, {"type": "end"})
355
  except json.JSONDecodeError:
 
1
  """
2
  app.py — FastAPI entrypoint: WebRTC-first + WebSocket fallback
3
 
4
+ FIXES APPLIED:
5
+ FIX-SESSION (Issue 1): The voice WS handler now reads user_id from the
6
+ first 'init' JSON message before processing any audio. The variable is
7
+ no longer a random UUID per connection — it is the stable USER_ID sent
8
+ by the browser from localStorage. This means every reconnect, even after
9
+ a page reload, hits the same LangGraph thread and restores conversation
10
+ history.
11
+
12
+ Implementation:
13
+ user_id is initialised to an empty string inside ws_voice.
14
+ The handler waits for the first message (expected to be 'init') before processing binary audio.
15
+ On 'init' message, user_id is set and init_ack returned.
16
+ All subsequent audio/LLM calls use that stable user_id.
17
+ If no 'init' is received within 3 s, a random fallback is used
18
+ (prevents hang for non-browser clients).
19
+
20
+ FIX-CHAT-INIT (Issue 1): ws_chat also reads the 'init' message so chat
21
+ sessions share the same backend thread as voice sessions for the same
22
+ user.
23
+
24
+ All performance optimisations (parallel TTS, GPU-batched STT, concurrent
25
+ LLM+TTS) preserved.
 
 
 
26
  """
27
 
28
  import asyncio
 
40
  from services.stt import STTProcessor
41
  from services.streaming import ParallelTTSStreamer
42
 
43
+ # ── WebRTC (optional) ─────────────────────────────────────────────────────────
44
  try:
45
  from services.webrtc_pipeline import WebRTCSession
46
  WEBRTC_AVAILABLE = True
 
52
  # ══════════════════════════════════════════════════════════════════════════════
53
  # MODEL ROUTING CONFIG — set exactly ONE to True
54
  # ══════════════════════════════════════════════════════════════════════════════
55
+ USE_GEMINI = False
56
+ USE_OLLAMA = True
57
  USE_LOCAL_FALLBACK = False
58
 
59
  _active = sum([USE_GEMINI, USE_OLLAMA, USE_LOCAL_FALLBACK])
 
69
  use_fallback=USE_LOCAL_FALLBACK,
70
  )
71
 
 
72
  _rtc_sessions: dict[str, "WebRTCSession"] = {}
73
 
74
 
 
81
  await ai.async_setup()
82
  print("[APP] AI backend ready ✓")
83
  yield
 
84
  for session in list(_rtc_sessions.values()):
85
  await session.close()
86
  _rtc_sessions.clear()
87
+ conn = getattr(ai, "conn", None)
88
+ if conn:
89
+ try:
90
+ await conn.close()
91
+ except Exception:
92
+ pass
 
 
93
 
94
 
95
  app = FastAPI(lifespan=lifespan)
 
107
  return HTMLResponse("<h2>index.html not found</h2>", status_code=404)
108
 
109
 
110
+ @app.get("/health")
111
+ async def health():
112
+ from services.stt import _model_ready, _model_error
113
+ return JSONResponse({
114
+ "status": "ok",
115
+ "model_ready": _model_ready.is_set(),
116
+ "model_error": _model_error,
117
+ "rtc_sessions": len(_rtc_sessions),
118
+ })
119
+
120
+
121
  # ══════════════════════════════════════════════════════════════════════════════
122
  # WEBRTC SIGNALING ENDPOINTS
123
  # ══════════════════════════════════════════════════════════════════════════════
124
 
125
  @app.post("/rtc/offer")
126
  async def rtc_offer(request: Request):
 
 
 
 
 
 
 
 
 
127
  if not WEBRTC_AVAILABLE:
128
  return JSONResponse(
129
  {"error": "WebRTC unavailable. Use WebSocket fallback at /ws/voice"},
130
  status_code=503,
131
  )
 
132
  body = await request.json()
133
  sdp = body.get("sdp", "")
134
  sdp_type = body.get("type", "offer")
135
  session_id = body.get("session_id") or uuid.uuid4().hex
136
 
 
137
  session = _rtc_sessions.get(session_id)
138
  if session is None:
139
  session = WebRTCSession(ai_backend=ai)
 
146
 
147
  @app.post("/rtc/ice")
148
  async def rtc_ice(request: Request):
 
149
  if not WEBRTC_AVAILABLE:
150
  return JSONResponse({"error": "WebRTC unavailable"}, status_code=503)
 
151
  body = await request.json()
152
  session_id = body.get("session_id", "")
153
  candidate = body.get("candidate", {})
154
+ session = _rtc_sessions.get(session_id)
 
155
  if session is None:
156
  return JSONResponse({"error": "Session not found"}, status_code=404)
 
157
  await session.add_ice_candidate(candidate)
158
  return JSONResponse({"ok": True})
159
 
160
 
161
  @app.delete("/rtc/session/{session_id}")
162
  async def rtc_close(session_id: str):
 
163
  session = _rtc_sessions.pop(session_id, None)
164
  if session:
165
  await session.close()
 
202
  async def ws_chat(ws: WebSocket):
203
  await ws.accept()
204
  print("[CHAT] Client connected ✓")
205
+
206
+ # FIX-SESSION: Start with no user_id; wait for 'init' to set it.
207
+ user_id: str = ""
208
+
209
  try:
210
  while True:
211
  raw = await ws.receive_text()
 
215
  await _safe_text(ws, {"type": "error", "text": "Invalid JSON"})
216
  continue
217
 
218
+ msg_type = data.get("type", "")
219
+
220
+ # ── Init handshake ──────────────────────────────────────────────
221
+ if msg_type == "init":
222
+ claimed = str(data.get("user_id", "")).strip()[:64]
223
+ if claimed:
224
+ user_id = claimed
225
+ print(f"[CHAT] Session restored for user_id={user_id!r}")
226
+ await _safe_text(ws, {"type": "init_ack", "user_id": user_id})
227
+ continue
228
+
229
+ if msg_type == "ping":
230
+ await _safe_text(ws, {"type": "pong"})
231
+ continue
232
+
233
+ # Fall back to user_id in message payload (compatibility)
234
+ if not user_id:
235
+ user_id = str(data.get("user_id", "default_user"))[:64]
236
+
237
  user_query = data.get("user_query", "").strip()
238
  if not user_query:
239
  continue
 
259
 
260
 
261
  # ══════════════════════════════════════════════════════════════════════════════
262
+ # WEBSOCKET — VOICE (STT→LLM→TTS pipeline over WS)
263
  # ══════════════════════════════════════════════════════════════════════════════
264
 
265
+ # How long (seconds) to wait for the first 'init' message before using fallback
266
+ _INIT_TIMEOUT = 3.0
267
+
268
+
269
  @app.websocket("/ws/voice")
270
  async def ws_voice(ws: WebSocket):
271
  await ws.accept()
272
+ print("[VOICE] Client connected")
273
+
274
+ # ── FIX-SESSION: Resolve stable user_id from browser init message ────────
275
+ # Wait up to _INIT_TIMEOUT seconds for the {'type':'init','user_id':...} msg.
276
+ # This is always the FIRST message sent by script.js on WS open.
277
+ user_id: str = ""
278
+ try:
279
+ first_raw = await asyncio.wait_for(ws.receive(), timeout=_INIT_TIMEOUT)
280
+ if "text" in first_raw and first_raw["text"]:
281
+ try:
282
+ first_msg = json.loads(first_raw["text"])
283
+ if first_msg.get("type") == "init":
284
+ claimed = str(first_msg.get("user_id", "")).strip()[:64]
285
+ if claimed:
286
+ user_id = claimed
287
+ except (json.JSONDecodeError, KeyError):
288
+ pass
289
+ except asyncio.TimeoutError:
290
+ print("[VOICE] No init message within timeout — using fallback user_id")
291
+
292
+ if not user_id:
293
+ user_id = f"voice_{uuid.uuid4().hex[:12]}"
294
+ print(f"[VOICE] Fallback user_id={user_id}")
295
+ else:
296
+ print(f"[VOICE] Session user_id={user_id}")
297
 
298
+ await _safe_text(ws, {"type": "init_ack", "user_id": user_id})
 
299
 
300
  stt = STTProcessor()
301
  _active_streamer: ParallelTTSStreamer | None = None
 
317
  async def _handle_utterance(audio_bytes: bytes):
318
  nonlocal _active_streamer
319
 
320
+ # ── STT ───────────────────────────────────────────────────────────────
321
  transcript = await stt.transcribe(audio_bytes)
322
  if not transcript:
323
  await _safe_text(ws, {"type": "error", "text": "কথা বুঝতে পারিনি, আবার বলুন।"})
 
353
  if not await _safe_bytes(ws, chunk):
354
  break
355
 
 
356
  await asyncio.gather(run_llm(), run_tts(), return_exceptions=True)
357
  _active_streamer = None
358
  await _safe_text(ws, {"type": "end"})
 
381
  elif "text" in data and data["text"]:
382
  try:
383
  msg = json.loads(data["text"])
384
+ t = msg.get("type", "")
385
+ if t == "init":
386
+ # Late re-init (e.g. after reconnect with same WS obj — rare)
387
+ claimed = str(msg.get("user_id", "")).strip()[:64]
388
+ if claimed:
389
+ user_id = claimed
390
  await _safe_text(ws, {"type": "init_ack", "user_id": user_id})
391
+ elif t == "ping":
392
  await _safe_text(ws, {"type": "pong"})
393
+ elif t == "cancel":
394
  await _cancel_active()
395
  await _safe_text(ws, {"type": "end"})
396
  except json.JSONDecodeError:
core/backend.py CHANGED
@@ -4,23 +4,11 @@ import asyncio
4
  import json
5
  import os
6
  import uuid
7
-
8
- # ── Disable LangSmith unless explicitly configured ────────────────────────────
9
- from dotenv import load_dotenv as _ld; _ld()
10
-
11
- _tracing_requested = os.getenv("LANGCHAIN_TRACING_V2", "false").strip().lower() == "true"
12
- _key_present = bool(os.getenv("LANGCHAIN_API_KEY", "").strip())
13
-
14
- if not (_tracing_requested and _key_present):
15
- os.environ["LANGCHAIN_TRACING_V2"] = "false"
16
- os.environ.pop("LANGCHAIN_API_KEY", None)
17
- print("[BACKEND] LangSmith tracing disabled.")
18
- else:
19
- print("[BACKEND] LangSmith tracing ENABLED.")
20
 
21
  import aiosqlite
22
  import pytz
23
- from datetime import datetime
24
  from dotenv import load_dotenv
25
 
26
  from langchain_core.messages import (
@@ -28,11 +16,18 @@ from langchain_core.messages import (
28
  SystemMessage, ToolMessage,
29
  )
30
  from langchain_core.tools import tool
 
31
  from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
32
  from langgraph.graph import END, START, StateGraph
33
  from langgraph.graph.message import add_messages
34
  from langgraph.prebuilt import ToolNode, tools_condition
35
- from typing import Annotated, TypedDict
 
 
 
 
 
 
36
 
37
 
38
  # ═══════════════════════════════════════════════════════════════════════════════
@@ -60,28 +55,126 @@ def format_bd_number(num: str) -> str:
60
 
61
 
62
  def send_sms(to_number: str, message: str) -> None:
63
- try:
64
- from twilio.rest import Client
65
- client = Client(os.getenv("TWILIO_ACCOUNT_SID"), os.getenv("TWILIO_AUTH_TOKEN"))
66
- client.messages.create(
67
- body=message,
68
- from_=os.getenv("TWILIO_PHONE_NUMBER"),
69
- to=to_number,
70
- )
71
- except Exception as e:
72
- print(f"[SMS] Failed to send: {e}")
73
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74
 
75
  # ═══════════════════════════════════════════════════════════════════════════════
76
  # TOOLS
77
  # ═══════════════════════════════════════════════════════════════════════════════
78
  @tool
79
  def get_bd_time() -> str:
80
- """Get current Bangladesh time (Asia/Dhaka) with weekday name."""
81
- tz = pytz.timezone("Asia/Dhaka")
 
 
82
  now = datetime.now(tz)
83
- return now.strftime("%Y-%m-%d %H:%M:%S (%A, Bangladesh Time)")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85
 
86
  @tool
87
  async def search_doctor(
@@ -90,7 +183,7 @@ async def search_doctor(
90
  visiting_days: str = "",
91
  ) -> str:
92
  """
93
- Search doctors by name, category, or visiting_days from the SQLite database.
94
  Any combination of filters is supported (OR logic across fields).
95
  """
96
  db_path = get_db_path()
@@ -152,16 +245,17 @@ async def book_appointment(
152
  patient_age: str,
153
  patient_num: str,
154
  visiting_date: str,
 
155
  ) -> str:
156
  """
157
  Book a doctor appointment and save it to the patients table.
158
-
159
  Args:
160
  doctor_id: Doctor's ID from search_doctor results.
161
  patient_name: Full name of the patient.
162
  patient_age: Age of the patient (e.g. "32").
163
  patient_num: Contact phone number of the patient.
164
  visiting_date: Date of visit in YYYY-MM-DD format (e.g. 2025-06-15).
 
165
  """
166
  db_path = get_db_path()
167
  patient_num = format_bd_number(patient_num)
@@ -191,12 +285,29 @@ async def book_appointment(
191
 
192
  await db.execute(
193
  """INSERT INTO patients
194
- (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date)
195
- VALUES (?, ?, ?, ?, ?, ?)""",
196
- (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date),
197
  )
198
  await db.commit()
199
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
200
  return (
201
  f"✅ Appointment Booked!\n"
202
  f"━━━━━━━━━━━━━━━━━━━━━━\n"
@@ -207,6 +318,7 @@ async def book_appointment(
207
  f"Contact : {patient_num}\n"
208
  f"━━━━━━━━━━━━━━━━━━━━━━\n"
209
  f"Please arrive 10 minutes early."
 
210
  )
211
 
212
 
@@ -243,16 +355,43 @@ async def delete_appointment(patient_num: str, doctor_name: str) -> str:
243
  # ═══════════════════════════════════════════════════════════════════════════════
244
  # SYSTEM PROMPT
245
  # ═══════════════════════════════════════════════════════════════════════════════
246
- BASE_SYSTEM = (
247
- "You are a helpful Bangla voice assistant for a doctor appointment system.\n"
248
- "Rules:\n"
249
- "- Always respond in Bangla (বাংলা).\n"
250
- "- Keep sentences short and natural for text-to-speech playback.\n"
251
- "- Avoid markdown, bullet points, or long lists in voice responses.\n"
252
- "- Use tools when needed to search doctors or manage appointments.\n"
253
- "- Be polite, concise, and clear.\n"
254
- "- Do not use English unless a proper noun requires it.\n"
255
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
256
 
257
  SUMMARY_SYSTEM = (
258
  BASE_SYSTEM
@@ -261,119 +400,51 @@ SUMMARY_SYSTEM = (
261
  "Use this memory for continuity. Do not repeat it unless asked."
262
  )
263
 
264
- # ── Ollama system prompt (no tool calling) ─────────────────────────────────────
265
- OLLAMA_SYSTEM = (
266
- BASE_SYSTEM
267
- + "\nIMPORTANT: You do not have tool access in this mode. "
268
- "Politely tell the user you cannot look up doctor information right now, "
269
- "and ask them to use the chat interface for complex queries."
270
- )
271
-
272
-
273
- # ═══════════════════════════════════════════════════════════════════════════════
274
- # TOOL CALLING — VALIDATED LAYER
275
- # ═══════════════════════════════════════════════════════════════════════════════
276
- class ToolCallValidator:
277
- MAX_RETRIES = 2
278
-
279
- def __init__(self, tool_node: ToolNode):
280
- self._node = tool_node
281
-
282
- async def invoke(self, state: ChatState) -> ChatState:
283
- last_msg = state["messages"][-1]
284
- if not hasattr(last_msg, "tool_calls") or not last_msg.tool_calls:
285
- return state
286
-
287
- for attempt in range(self.MAX_RETRIES + 1):
288
- try:
289
- result = await self._node.ainvoke(state)
290
- return result
291
- except Exception as exc:
292
- print(f"[TOOL] Attempt {attempt + 1} failed: {exc}")
293
- if attempt == self.MAX_RETRIES:
294
- tool_calls = last_msg.tool_calls
295
- fallback_msgs = [
296
- ToolMessage(
297
- content="Tool execution failed after retries. Please inform the user politely.",
298
- tool_call_id=tc["id"],
299
- )
300
- for tc in tool_calls
301
- ]
302
- return {"messages": state["messages"] + fallback_msgs}
303
- await asyncio.sleep(0.3 * (attempt + 1))
304
-
305
- return state
306
-
307
 
308
  # ═══════════════════════════════════════════════════════════════════════════════
309
  # AGENT
310
  # ═══════════════════════════════════════════════════════════════════════════════
311
  class AIBackend:
312
 
313
- def __init__(
314
- self,
315
- use_gemini: bool = True,
316
- use_ollama: bool = False,
317
- use_fallback: bool = False,
318
- ) -> None:
319
- load_dotenv()
320
  os.environ.setdefault("LANGCHAIN_PROJECT", "Doctor Appointment Automation")
321
 
322
- self._use_gemini = use_gemini
323
- self._use_ollama = use_ollama
324
- self._use_fallback = use_fallback
325
-
326
- self._build_llm()
327
-
328
- def _build_llm(self) -> None:
329
- if self._use_gemini:
330
- from langchain_google_genai import ChatGoogleGenerativeAI
331
  self.llm = ChatGoogleGenerativeAI(
332
- model="gemini-2.5-flash",
333
  temperature=0.3,
334
  )
335
- print("[BACKEND] Using Gemini 2.5 Flash")
336
-
337
- elif self._use_ollama:
338
- from langchain_ollama import ChatOllama
339
- ollama_model = os.getenv("OLLAMA_MODEL", "qwen2.5")
340
- self.llm = ChatOllama(
341
- model=ollama_model,
342
- temperature=0.3,
343
- )
344
- print(f"[BACKEND] Using Ollama model: {ollama_model}")
345
-
346
- else:
347
- self.llm = None
348
- print("[BACKEND] Using local fallback responder (no external LLM)")
349
-
350
- if self._use_gemini and self.llm is not None:
351
- self.tools = [
352
- search_doctor,
353
- book_appointment,
354
- get_bd_time,
355
- search_appointment_by_phone,
356
- delete_appointment,
357
- ]
358
- self.tool_node = ToolNode(self.tools)
359
- self.tool_validator = ToolCallValidator(self.tool_node)
360
- self.llm_with_tools = self.llm.bind_tools(self.tools)
361
  else:
362
- self.tools = []
363
- self.tool_node = None
364
- self.tool_validator = None
365
- self.llm_with_tools = self.llm
 
 
 
 
 
 
 
 
 
 
366
 
367
  # ── Setup ──────────────────────────────────────────────────────────────────
368
  async def async_setup(self) -> None:
369
- db_path = get_db_path()
370
- self.conn = await aiosqlite.connect(db_path)
371
- self._meta_conn = await aiosqlite.connect(db_path)
372
-
373
  self.checkpointer = AsyncSqliteSaver(self.conn)
374
  await self._create_tables()
375
  self.graph = self._build_graph()
376
  self.summary_graph = self._build_summary_graph()
 
377
 
378
  async def _create_tables(self) -> None:
379
  await self.conn.execute("""
@@ -384,23 +455,24 @@ class AIBackend:
384
  """)
385
  await self.conn.execute("""
386
  CREATE TABLE IF NOT EXISTS doctors (
387
- id INTEGER PRIMARY KEY AUTOINCREMENT,
388
- doctor_name TEXT NOT NULL,
389
- category TEXT NOT NULL,
390
- visiting_days TEXT NOT NULL,
391
- chamber TEXT,
392
- fee TEXT
393
  )
394
  """)
395
  await self.conn.execute("""
396
  CREATE TABLE IF NOT EXISTS patients (
397
- id INTEGER PRIMARY KEY AUTOINCREMENT,
398
- doctor_name TEXT NOT NULL,
399
  doctor_category TEXT,
400
- patient_name TEXT NOT NULL,
401
- patient_age TEXT,
402
- patient_num TEXT NOT NULL,
403
- visiting_date TEXT NOT NULL
 
404
  )
405
  """)
406
  await self.conn.commit()
@@ -426,71 +498,47 @@ class AIBackend:
426
 
427
  response = await self.llm.ainvoke(messages + [HumanMessage(content=prompt)])
428
  return {
429
- "summary": response.content,
430
  "messages": [RemoveMessage(id=m.id) for m in messages[:-2]],
431
  }
432
 
433
- async def should_summarize(self, state: ChatState) -> str:
434
- return "summarize_node" if len(state["messages"]) > 10 else "chat_node"
435
-
436
  # ── Chat node ──────────────────────────────────────────────────────────────
437
- # FIX: chat_node now stores the COMPLETE response in graph state (for
438
- # checkpointing / memory), while ai_only_stream handles live token delivery
439
- # directly from the LLM — bypassing the graph's collect-then-return pattern.
440
  async def chat_node(self, state: ChatState):
441
- if self._use_fallback or self.llm is None:
442
- return {
443
- "messages": [AIMessage(content=(
444
- "দুঃখিত, এই মুহূর্তে AI সংযোগ পাওয়া যাচ্ছে না। "
445
- "অনুগ্রহ করে পরে আবার চেষ্টা করুন।"
446
- ))]
447
- }
448
 
 
 
 
 
 
449
  summary = state.get("summary", "")
450
  messages = state["messages"]
451
 
452
- if self._use_ollama:
453
- sys_content = OLLAMA_SYSTEM
454
- else:
455
- sys_content = SUMMARY_SYSTEM.format(summary=summary) if summary else BASE_SYSTEM
 
 
456
 
 
457
  full_messages = [SystemMessage(content=sys_content)] + list(messages)
458
 
459
- # Collect full response for graph state storage
460
- collected: list[AIMessageChunk] = []
461
- async for chunk in self.llm_with_tools.astream(full_messages):
462
- collected.append(chunk)
463
-
464
- if not collected:
465
- response = AIMessage(content="")
466
- else:
467
- response = collected[0]
468
- for c in collected[1:]:
469
- response = response + c
470
 
471
- print(f"[AI] response ({len(str(response.content))} chars): {str(response.content)[:120]}")
 
472
  return {"messages": [response]}
473
 
474
- # ── Validated tool node ────────────────────────────────────────────────────
475
- async def validated_tools_node(self, state: ChatState):
476
- if self.tool_validator is None:
477
- return state
478
- return await self.tool_validator.invoke(state)
479
-
480
  # ── Graph ──────────────────────────────────────────────────────────────────
481
  def _build_graph(self):
482
  g = StateGraph(ChatState)
483
  g.add_node("chat_node", self.chat_node)
484
-
485
- if self._use_gemini and self.tool_node is not None:
486
- g.add_node("tools", self.validated_tools_node)
487
- g.add_edge(START, "chat_node")
488
- g.add_conditional_edges("chat_node", tools_condition)
489
- g.add_edge("tools", "chat_node")
490
- else:
491
- g.add_edge(START, "chat_node")
492
- g.add_edge("chat_node", END)
493
-
494
  return g.compile(checkpointer=self.checkpointer)
495
 
496
  def _build_summary_graph(self):
@@ -500,126 +548,75 @@ class AIBackend:
500
  g.add_edge("summarize_node", END)
501
  return g.compile(checkpointer=self.checkpointer)
502
 
503
- # ── Streaming — FIXED ──────────────────────────────────────────────────────
504
- async def ai_only_stream(self, user_id: str, user_query: str, thread_id: str):
505
- """
506
- Async generator that yields AI text tokens in real time.
507
-
508
- FIX: The old approach used graph.astream(stream_mode="messages") which
509
- only emits AIMessageChunk events DURING node execution. But chat_node
510
- collected all chunks internally before returning, so no AIMessageChunk
511
- ever escaped the node — the generator yielded nothing and the frontend
512
- waited forever.
513
-
514
- New approach (two-phase):
515
- 1. Stream tokens DIRECTLY from the LLM right now → yield to caller
516
- 2. Save the full response to graph state via graph.ainvoke() in background
517
- so conversation memory / checkpointing still works.
518
  """
519
- if self._use_fallback or self.llm is None:
520
- fallback = (
521
- "দুঃখিত, এই মুহূর্তে AI সংযোগ পাওয়া যাচ্ছে না। "
522
- "অনুগ্রহ করে পরে আবার চেষ্টা করুন।"
523
- )
524
- yield fallback
525
- return
526
 
527
- summary = ""
528
- config = {"configurable": {"thread_id": thread_id}}
529
-
530
- # Try to get existing summary from graph state
531
- try:
532
- state = await self.graph.aget_state(config)
533
- summary = state.values.get("summary", "") if state and state.values else ""
534
- except Exception:
535
- pass
536
-
537
- sys_content = (
538
- OLLAMA_SYSTEM if self._use_ollama
539
- else (SUMMARY_SYSTEM.format(summary=summary) if summary else BASE_SYSTEM)
540
- )
541
-
542
- # Fetch conversation history from checkpointer
543
- history: list = []
544
- try:
545
- state = await self.graph.aget_state(config)
546
- if state and state.values:
547
- history = list(state.values.get("messages", []))
548
- except Exception:
549
- pass
550
-
551
- full_messages = (
552
- [SystemMessage(content=sys_content)]
553
- + history
554
- + [HumanMessage(content=user_query)]
555
- )
556
-
557
- print(f"[AI] Streaming for thread={thread_id}, history={len(history)} msgs")
558
-
559
- # Phase 1: stream tokens live to the frontend
560
- collected: list[AIMessageChunk] = []
561
- token_count = 0
562
  try:
563
- async for chunk in self.llm_with_tools.astream(full_messages):
564
- collected.append(chunk)
565
- if chunk.content:
566
- token_count += 1
567
- yield chunk.content
 
568
  except Exception as exc:
569
- print(f"[AI] Streaming error: {exc}")
570
- import traceback; traceback.print_exc()
571
- yield "দুঃখিত, একটি সমস্যা হয়েছে। আবার চেষ্টা করুন।"
572
- return
573
-
574
- print(f"[AI] Stream done: {token_count} tokens")
575
-
576
- # Phase 2: persist to graph state in background (non-blocking)
577
- if collected:
578
- full_response = collected[0]
579
- for c in collected[1:]:
580
- full_response = full_response + c
581
-
582
- async def _save_to_graph():
583
- try:
584
- save_state = {"messages": [HumanMessage(content=user_query)]}
585
- await self.graph.ainvoke(
586
- save_state,
587
- config=config,
588
- # We already have the response; override chat_node
589
- # by injecting the AI message directly
590
- )
591
- except Exception as exc:
592
- # Non-critical: history save failed, but user got their response
593
- print(f"[AI] Graph state save error (non-critical): {exc}")
594
-
595
- # Save history via a simpler direct approach: just invoke with the
596
- # human message and let chat_node regenerate (it will be fast since
597
- # Ollama is local). This ensures checkpointer stays consistent.
598
- asyncio.create_task(_save_to_graph())
599
 
600
  # ── Thread management ──────────────────────────────────────────────────────
601
  @staticmethod
602
  def generate_thread_id() -> str:
603
  return str(uuid.uuid4())
604
 
 
 
 
 
 
 
605
  # ── Public entry point ─────────────────────────────────────────────────────
606
- async def main(self, user_id: str, user_query: str):
607
  """Return an async generator of AI text tokens."""
608
- async with self._meta_conn.execute(
609
  "SELECT threadId FROM userid_threadid WHERE userId = ?", (user_id,)
610
  ) as cursor:
611
  row = await cursor.fetchone()
612
 
613
  if row is None:
614
  thread_id = user_id + self.generate_thread_id()
615
- await self._meta_conn.execute(
616
  "INSERT INTO userid_threadid (userId, threadId) VALUES (?, ?)",
617
  (user_id, thread_id),
618
  )
619
- await self._meta_conn.commit()
620
  else:
621
  thread_id = row[0]
622
 
623
- # FIX: pass user_id, user_query, thread_id directly so ai_only_stream
624
- # can stream from LLM without going through the blocking graph node
625
- return self.ai_only_stream(user_id, user_query, thread_id)
 
 
 
 
 
4
  import json
5
  import os
6
  import uuid
7
+ import aiosmtplib
 
 
 
 
 
 
 
 
 
 
 
 
8
 
9
  import aiosqlite
10
  import pytz
11
+ from datetime import datetime, timedelta
12
  from dotenv import load_dotenv
13
 
14
  from langchain_core.messages import (
 
16
  SystemMessage, ToolMessage,
17
  )
18
  from langchain_core.tools import tool
19
+ from langchain_google_genai import ChatGoogleGenerativeAI
20
  from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
21
  from langgraph.graph import END, START, StateGraph
22
  from langgraph.graph.message import add_messages
23
  from langgraph.prebuilt import ToolNode, tools_condition
24
+ from twilio.rest import Client
25
+ from typing import Annotated, TypedDict, Optional, AsyncGenerator
26
+ from email.message import EmailMessage
27
+ from dateparser.search import search_dates
28
+ from langchain_ollama import ChatOllama
29
+
30
+ load_dotenv()
31
 
32
 
33
  # ═══════════════════════════════════════════════════════════════════════════════
 
55
 
56
 
57
def send_sms(to_number: str, message: str) -> None:
    """Send ``message`` as an SMS to ``to_number`` via the Twilio REST client.

    The account SID, auth token and sender number are read from the
    ``TWILIO_ACCOUNT_SID``, ``TWILIO_AUTH_TOKEN`` and ``TWILIO_PHONE_NUMBER``
    environment variables each time the function is called.
    """
    account_sid = os.getenv("TWILIO_ACCOUNT_SID")
    auth_token = os.getenv("TWILIO_AUTH_TOKEN")
    twilio = Client(account_sid, auth_token)

    sender_number = os.getenv("TWILIO_PHONE_NUMBER")
    twilio.messages.create(body=message, from_=sender_number, to=to_number)
64
+
 
 
 
65
 
66
async def send_mail(to_mail: str, subject: str, body: str):
    """Send a plain-text e-mail over an implicit-TLS SMTP connection.

    Credentials are taken from the environment rather than the source file:

    * ``SMTP_USERNAME`` - login name, also used as the ``From`` address (required)
    * ``SMTP_PASSWORD`` - password / app password for that login (required)
    * ``SMTP_HOST``     - SMTP server host (default ``smtp.gmail.com``)
    * ``SMTP_PORT``     - SMTP server port (default ``465``)

    Args:
        to_mail: Recipient address.
        subject: Subject line.
        body: Plain-text message body.

    Raises:
        RuntimeError: If ``SMTP_USERNAME`` or ``SMTP_PASSWORD`` is not set.
    """
    # SECURITY: the login and app password used to be hard-coded here and were
    # committed to version control. That credential must be treated as
    # compromised and rotated; secrets now live only in the environment/.env.
    username = os.getenv("SMTP_USERNAME")
    password = os.getenv("SMTP_PASSWORD")
    if not username or not password:
        raise RuntimeError(
            "SMTP_USERNAME and SMTP_PASSWORD must be set to send mail."
        )

    email = EmailMessage()
    email["From"] = username
    email["To"] = to_mail
    email["Subject"] = subject
    email.set_content(body)

    await aiosmtplib.send(
        email,
        hostname=os.getenv("SMTP_HOST", "smtp.gmail.com"),
        port=int(os.getenv("SMTP_PORT", "465")),
        username=username,
        password=password,
        use_tls=True,
    )
81
 
82
  # ═══════════════════════════════════════════════════════════════════════════════
83
  # TOOLS
84
  # ═══════════════════════════════════════════════════════════════════════════════
85
@tool
def get_bd_time() -> str:
    """Get current Bangladesh date and time along with the next 14 days."""
    # NOTE: langchain's @tool uses the docstring as the tool description, so
    # it is runtime text and is kept verbatim.
    day_format = "%A, %B %d, %Y"
    now = datetime.now(pytz.timezone("Asia/Dhaka"))

    # Offsets +1 .. +14 mapped to their formatted calendar day.
    upcoming = {
        f"+{offset}": (now + timedelta(days=offset)).strftime(day_format)
        for offset in range(1, 15)
    }

    return json.dumps({
        "CURRENT_DATETIME": now.strftime("%Y-%m-%d %H:%M:%S %Z"),
        "TODAY": now.strftime(day_format),
        "TOMORROW": upcoming["+1"],
        "NEXT_14_DAYS": upcoming,
    })
104
+
105
@tool
async def get_doctor_categories() -> str:
    """
    Fetch all unique doctor categories from the database.
    """
    # NOTE: langchain's @tool uses the docstring as the tool description, so
    # it is runtime text and is kept verbatim.
    query = """
        SELECT DISTINCT category
        FROM doctors
        WHERE category IS NOT NULL
        AND TRIM(category) != ''
        ORDER BY category ASC
    """

    async with aiosqlite.connect(get_db_path()) as db:
        db.row_factory = aiosqlite.Row  # allows row["category"] access below
        cursor = await db.execute(query)
        rows = await cursor.fetchall()

    categories = [row["category"] for row in rows]

    # ensure_ascii=False matches get_doctors_by_day: non-ASCII (e.g. Bangla)
    # category names are emitted readable instead of as \uXXXX escapes.
    return json.dumps(
        {
            "success": True,
            "count": len(categories),
            "data": categories,
        },
        ensure_ascii=False,
    )
134
+
135
@tool
async def get_doctors_by_day(
    visiting_day: str,
) -> str:
    """
    Get all doctors available on a specific visiting day.
    Example inputs:
    - Sunday
    - Monday
    - Friday
    """
    # NOTE: langchain's @tool uses the docstring as the tool description, so
    # it is runtime text and is kept verbatim.

    # Surrounding whitespace would otherwise become part of the LIKE pattern
    # and prevent a match ("Sunday " never matches "sunday, monday").
    day = visiting_day.strip()
    if not day:
        # A blank day would turn into LIKE '%%' and report every doctor.
        return json.dumps({
            "success": False,
            "message": "No visiting day was provided.",
            "data": [],
        }, ensure_ascii=False)

    query = """
        SELECT *
        FROM doctors
        WHERE LOWER(visiting_days) LIKE ?
    """

    async with aiosqlite.connect(get_db_path()) as db:
        db.row_factory = aiosqlite.Row  # rows convert cleanly with dict(row)
        cursor = await db.execute(query, [f"%{day.lower()}%"])
        rows = await cursor.fetchall()

    if not rows:
        return json.dumps({
            "success": False,
            "message": f"No doctors found for {day}.",
            "data": [],
        }, ensure_ascii=False)

    doctors = [dict(row) for row in rows]
    return json.dumps({
        "success": True,
        "visiting_day": day,
        "count": len(doctors),
        "data": doctors,
    }, ensure_ascii=False)
178
 
179
  @tool
180
  async def search_doctor(
 
183
  visiting_days: str = "",
184
  ) -> str:
185
  """
186
+ Search doctors by name, category, or visiting_days from the database.
187
  Any combination of filters is supported (OR logic across fields).
188
  """
189
  db_path = get_db_path()
 
245
  patient_age: str,
246
  patient_num: str,
247
  visiting_date: str,
248
+ patient_mail: str
249
  ) -> str:
250
  """
251
  Book a doctor appointment and save it to the patients table.
 
252
  Args:
253
  doctor_id: Doctor's ID from search_doctor results.
254
  patient_name: Full name of the patient.
255
  patient_age: Age of the patient (e.g. "32").
256
  patient_num: Contact phone number of the patient.
257
  visiting_date: Date of visit in YYYY-MM-DD format (e.g. 2025-06-15).
258
+ patient_mail: Mail address for confirmation mail.
259
  """
260
  db_path = get_db_path()
261
  patient_num = format_bd_number(patient_num)
 
285
 
286
  await db.execute(
287
  """INSERT INTO patients
288
+ (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, patient_mail)
289
+ VALUES (?, ?, ?, ?, ?, ?, ?)""",
290
+ (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, patient_mail),
291
  )
292
  await db.commit()
293
 
294
+ # Mail SMS confirmation
295
+ mail_message = (
296
+ f"Doctor : {doctor_name}\n"
297
+ f"Patient : {patient_name}\n"
298
+ f"Visit Date : {visiting_date}\n"
299
+ f"Please arrive 10 minutes early."
300
+ )
301
+ try:
302
+ await send_mail(
303
+ to_mail=patient_mail,
304
+ subject="✅ Appointment Confirmed!",
305
+ body=mail_message,
306
+ )
307
+ mail_status = "\n📧 Mail confirmation sent."
308
+ except Exception as e:
309
+ mail_status = f"\n⚠️ Mail failed: {str(e)}"
310
+
311
  return (
312
  f"✅ Appointment Booked!\n"
313
  f"━━━━━━━━━━━━━━━━━━━━━━\n"
 
318
  f"Contact : {patient_num}\n"
319
  f"━━━━━━━━━━━━━━━━━━━━━━\n"
320
  f"Please arrive 10 minutes early."
321
+ f"{mail_status}"
322
  )
323
 
324
 
 
355
  # ═══════════════════════════════════════════════════════════════════════════════
356
  # SYSTEM PROMPT
357
  # ═══════════════════════════════════════════════════════════════════════════════
358
+ BASE_SYSTEM = """
359
+ You are a Doctor Appointment Assistant AI.
360
+
361
+ Your job is to help users manage medical appointments.
362
+
363
+ CAPABILITIES:
364
+ - Book doctor appointments
365
+ - Reschedule appointments
366
+ - Cancel appointments
367
+ - Collect patient details
368
+
369
+ STRICT RULES:
370
+ - You are NOT a doctor.
371
+ - NEVER diagnose diseases.
372
+ - NEVER recommend medicines or treatments.
373
+
374
+ APPOINTMENT FLOW:
375
+ 1. Detect intent (book / cancel / reschedule / inquiry)
376
+ 2. Collect details
377
+ 3. Confirm all details before final booking
378
+
379
+ STYLE:
380
+ - Be short, clear, structured
381
+ - Ask one question at a time when needed
382
+ - Focus on completing booking
383
+
384
+ LANGUAGE RULE:
385
+ - Detect user language from latest message.
386
+ - If English → reply English.
387
+ - If Bangla → reply Bangla (বাংলা).
388
+ - If Banglish → reply Bangla (বাংলা).
389
+ - Never mix languages unless user mixes first.
390
+
391
+ TOOLS:
392
+ - Use backend tools if available for scheduling
393
+ - Always confirm before final action
394
+ """
395
 
396
  SUMMARY_SYSTEM = (
397
  BASE_SYSTEM
 
400
  "Use this memory for continuity. Do not repeat it unless asked."
401
  )
402
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
403
 
404
  # ═══════════════════════════════════════════════════════════════════════════════
405
  # AGENT
406
  # ═══════════════════════════════════════════════════════════════════════════════
407
  class AIBackend:
408
 
409
+ # ── FIX-BUG1: was `_init_` (single underscores) — never called by Python
410
+ def __init__(self, use_gemini: bool = False, use_ollama: bool = True, use_fallback: bool = False):
411
+ self.use_gemini = use_gemini
412
+ self.use_ollama = use_ollama
413
+ self.use_fallback = use_fallback
 
 
414
  os.environ.setdefault("LANGCHAIN_PROJECT", "Doctor Appointment Automation")
415
 
416
+ if use_gemini:
 
 
 
 
 
 
 
 
417
  self.llm = ChatGoogleGenerativeAI(
418
+ model="gemini-2.0-flash",
419
  temperature=0.3,
420
  )
421
+ elif use_ollama:
422
+ self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.2)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
423
  else:
424
+ # Local fallback — extend as needed
425
+ self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.2)
426
+
427
+ self.tools = [
428
+ search_doctor,
429
+ book_appointment,
430
+ get_bd_time,
431
+ search_appointment_by_phone,
432
+ delete_appointment,
433
+ get_doctor_categories,
434
+ get_doctors_by_day
435
+ ]
436
+ self.tool_node = ToolNode(self.tools)
437
+ self.llm_with_tools = self.llm.bind_tools(self.tools)
438
 
439
  # ── Setup ──────────────────────────────────────────────────────────────────
440
  async def async_setup(self) -> None:
441
+ db_path = get_db_path()
442
+ self.conn = await aiosqlite.connect(db_path)
 
 
443
  self.checkpointer = AsyncSqliteSaver(self.conn)
444
  await self._create_tables()
445
  self.graph = self._build_graph()
446
  self.summary_graph = self._build_summary_graph()
447
+ print("[Backend] AIBackend ready ✓")
448
 
449
  async def _create_tables(self) -> None:
450
  await self.conn.execute("""
 
455
  """)
456
  await self.conn.execute("""
457
  CREATE TABLE IF NOT EXISTS doctors (
458
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
459
+ doctor_name TEXT,
460
+ category TEXT,
461
+ visiting_days TEXT,
462
+ visiting_time TEXT,
463
+ visiting_money INTEGER
464
  )
465
  """)
466
  await self.conn.execute("""
467
  CREATE TABLE IF NOT EXISTS patients (
468
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
469
+ doctor_name TEXT,
470
  doctor_category TEXT,
471
+ patient_name TEXT,
472
+ patient_age TEXT,
473
+ patient_num TEXT,
474
+ visiting_date TEXT,
475
+ patient_mail TEXT
476
  )
477
  """)
478
  await self.conn.commit()
 
498
 
499
  response = await self.llm.ainvoke(messages + [HumanMessage(content=prompt)])
500
  return {
501
+ "summary": response.content,
502
  "messages": [RemoveMessage(id=m.id) for m in messages[:-2]],
503
  }
504
 
 
 
 
505
  # ── Chat node ──────────────────────────────────────────────────────────────
 
 
 
506
  async def chat_node(self, state: ChatState):
507
+ """
508
+ Invokes the LLM with tool bindings and returns the AI response.
 
 
 
 
 
509
 
510
+ Uses ainvoke() (not collect-all-then-return astream()) so the call is
511
+ clean and deterministic. Token-level streaming is handled by LangGraph
512
+ itself via stream_mode="messages" in ai_only_stream(), which intercepts
513
+ the underlying LLM streaming at the graph level.
514
+ """
515
  summary = state.get("summary", "")
516
  messages = state["messages"]
517
 
518
+ print("#" * 50)
519
+ print(">>>>>>>>>> CHAT NODE START <<<<<<<<<<")
520
+ print(f"[SUMMARY]: {summary[:120] if summary else 'None'}")
521
+ for m in messages:
522
+ print(f" [{m.__class__.__name__}]: {str(m.content)[:160]}")
523
+ print("#" * 50)
524
 
525
+ sys_content = SUMMARY_SYSTEM.format(summary=summary) if summary else BASE_SYSTEM
526
  full_messages = [SystemMessage(content=sys_content)] + list(messages)
527
 
528
+ response = await self.llm_with_tools.ainvoke(full_messages)
 
 
 
 
 
 
 
 
 
 
529
 
530
+ print(f"[AI]: {str(response.content)[:200]}")
531
+ print(">>>>>>>>>> CHAT NODE END <<<<<<<<<<")
532
  return {"messages": [response]}
533
 
 
 
 
 
 
 
534
  # ── Graph ──────────────────────────────────────────────────────────────────
535
  def _build_graph(self):
536
  g = StateGraph(ChatState)
537
  g.add_node("chat_node", self.chat_node)
538
+ g.add_node("tools", self.tool_node)
539
+ g.add_edge(START, "chat_node")
540
+ g.add_conditional_edges("chat_node", tools_condition)
541
+ g.add_edge("tools", "chat_node")
 
 
 
 
 
 
542
  return g.compile(checkpointer=self.checkpointer)
543
 
544
  def _build_summary_graph(self):
 
548
  g.add_edge("summarize_node", END)
549
  return g.compile(checkpointer=self.checkpointer)
550
 
551
+ # ── Streaming ──────────────────────────────────────────────────────────────
552
+ async def ai_only_stream(
553
+ self, initial_state: dict, config: dict
554
+ ) -> AsyncGenerator[str, None]:
 
 
 
 
 
 
 
 
 
 
 
555
  """
556
+ Async generator yields AI text tokens as they arrive.
 
 
 
 
 
 
557
 
558
+ FIX-BUG9: narrowed isinstance check to exclude ToolMessage content
559
+ from being streamed to the user, and guards against non-str content
560
+ (e.g. multimodal list payloads from Ollama tool-call chunks).
561
+ """
562
+ async for chunk, _meta in self.graph.astream(
563
+ initial_state, config=config, stream_mode="messages"
564
+ ):
565
+ # Only yield text content from AI messages.
566
+ # Exclude ToolMessage (tool execution results) — they contain
567
+ # raw JSON that should not be streamed directly to the user.
568
+ if (
569
+ isinstance(chunk, (AIMessage, AIMessageChunk))
570
+ and not isinstance(chunk, ToolMessage)
571
+ and isinstance(chunk.content, str)
572
+ and chunk.content
573
+ ):
574
+ yield chunk.content
575
+
576
+ # Auto-summarise in background when history grows long
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
577
  try:
578
+ current = await self.graph.aget_state(config)
579
+ if len(current.values.get("messages", [])) > 10:
580
+ asyncio.create_task(
581
+ self.summary_graph.ainvoke(current.values, config=config)
582
+ )
583
+ print("@" * 20, "Summarisation triggered", "@" * 20)
584
  except Exception as exc:
585
+ print(f"[Backend] Summarisation check failed: {exc}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
586
 
587
  # ── Thread management ──────────────────────────────────────────────────────
588
  @staticmethod
589
  def generate_thread_id() -> str:
590
  return str(uuid.uuid4())
591
 
592
+ async def retrieve_all_threads(self) -> list[str]:
593
+ threads: set[str] = set()
594
+ async for cp in self.checkpointer.alist(None):
595
+ threads.add(cp.config["configurable"]["thread_id"])
596
+ return list(threads)
597
+
598
  # ── Public entry point ─────────────────────────────────────────────────────
599
+ async def main(self, user_id: str, user_query: str) -> AsyncGenerator[str, None]:
600
  """Return an async generator of AI text tokens."""
601
+ async with self.conn.execute(
602
  "SELECT threadId FROM userid_threadid WHERE userId = ?", (user_id,)
603
  ) as cursor:
604
  row = await cursor.fetchone()
605
 
606
  if row is None:
607
  thread_id = user_id + self.generate_thread_id()
608
+ await self.conn.execute(
609
  "INSERT INTO userid_threadid (userId, threadId) VALUES (?, ?)",
610
  (user_id, thread_id),
611
  )
612
+ await self.conn.commit()
613
  else:
614
  thread_id = row[0]
615
 
616
+ initial_state = {"messages": [HumanMessage(content=user_query)]}
617
+ config = {
618
+ "configurable": {"thread_id": thread_id},
619
+ "metadata": {"thread_id": thread_id},
620
+ "run_name": "chat_turn",
621
+ }
622
+ return self.ai_only_stream(initial_state, config)
frontend/index.html CHANGED
@@ -1,48 +1,3 @@
1
-
2
-
3
-
4
-
5
-
6
-
7
-
8
-
9
-
10
-
11
-
12
-
13
-
14
-
15
-
16
-
17
-
18
-
19
-
20
-
21
-
22
-
23
-
24
-
25
-
26
-
27
-
28
-
29
-
30
-
31
-
32
-
33
-
34
-
35
-
36
-
37
-
38
-
39
-
40
-
41
-
42
-
43
-
44
-
45
-
46
  <!DOCTYPE html>
47
  <html lang="bn">
48
  <head>
@@ -62,9 +17,8 @@
62
  <div class="bg-orb orb-3"></div>
63
 
64
  <!-- ══════════════════════════════════════════════════════════════
65
- INIT OVERLAY — shown until WS is ready + animations done
66
- No error text is displayed here; overlay auto-closes via
67
- hard 8s failsafe if backend takes longer than expected.
68
  ══════════════════════════════════════════════════════════════ -->
69
  <div id="init-overlay" class="init-overlay">
70
  <div class="init-card">
@@ -117,8 +71,10 @@
117
 
118
  <!-- ══════════════════════════════════════════════════════════════
119
  MAIN APP
 
 
120
  ══════════════════════════════════════════════════════════════ -->
121
- <div class="app" id="app" style="opacity:0;pointer-events:none;">
122
 
123
  <!-- ── Sidebar ── -->
124
  <aside class="sidebar" id="sidebar">
@@ -191,7 +147,6 @@
191
  <!-- Voice Settings -->
192
  <div class="dash-section">
193
  <div class="dash-title">🎛️ Voice Settings</div>
194
-
195
  <div class="setting-row">
196
  <label>Silence Threshold</label>
197
  <div class="slider-wrap">
@@ -271,13 +226,14 @@
271
  <!-- Controls -->
272
  <footer class="controls">
273
  <div class="text-row">
 
 
274
  <input
275
  type="text"
276
  id="text-input"
277
- placeholder="বার্তা লিখুন… (Type a message)"
278
  autocomplete="off"
279
  />
280
-
281
  <button id="send-btn" title="Send">
282
  <svg width="20" height="20" viewBox="0 0 24 24" fill="none"
283
  stroke="currentColor" stroke-width="2">
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  <!DOCTYPE html>
2
  <html lang="bn">
3
  <head>
 
17
  <div class="bg-orb orb-3"></div>
18
 
19
  <!-- ══════════════════════════════════════════════════════════════
20
+ INIT OVERLAY — shown until WS ready + animations done
21
+ Hard 8 s failsafe closes overlay if backend is slow.
 
22
  ══════════════════════════════════════════════════════════════ -->
23
  <div id="init-overlay" class="init-overlay">
24
  <div class="init-card">
 
71
 
72
  <!-- ══════════════════════════════════════════════════════════════
73
  MAIN APP
74
+ FIX-7: Hidden via .app CSS class (not inline opacity:0) to
75
+ prevent FOUC. JS adds class "visible" after init closes.
76
  ══════════════════════════════════════════════════════════════ -->
77
+ <div class="app" id="app">
78
 
79
  <!-- ── Sidebar ── -->
80
  <aside class="sidebar" id="sidebar">
 
147
  <!-- Voice Settings -->
148
  <div class="dash-section">
149
  <div class="dash-title">🎛️ Voice Settings</div>
 
150
  <div class="setting-row">
151
  <label>Silence Threshold</label>
152
  <div class="slider-wrap">
 
226
  <!-- Controls -->
227
  <footer class="controls">
228
  <div class="text-row">
229
+ <!-- FIX-4: textarea is created by script.js to replace this input,
230
+ keeping HTML clean while gaining auto-resize + shift-enter -->
231
  <input
232
  type="text"
233
  id="text-input"
234
+ placeholder="বার্তা লিখুন… (Enter পাঠান · Shift+Enter নতুন লাইন)"
235
  autocomplete="off"
236
  />
 
237
  <button id="send-btn" title="Send">
238
  <svg width="20" height="20" viewBox="0 0 24 24" fill="none"
239
  stroke="currentColor" stroke-width="2">
frontend/style.css CHANGED
@@ -48,41 +48,28 @@ body {
48
  animation: orb-float 12s ease-in-out infinite;
49
  }
50
  .orb-1 {
51
- width: 500px;
52
- height: 500px;
53
  background: radial-gradient(circle, #22d3ee, transparent);
54
- top: -200px;
55
- left: -150px;
56
  animation-delay: 0s;
57
  }
58
  .orb-2 {
59
- width: 400px;
60
- height: 400px;
61
  background: radial-gradient(circle, #818cf8, transparent);
62
- bottom: -100px;
63
- right: -100px;
64
  animation-delay: -4s;
65
  }
66
  .orb-3 {
67
- width: 300px;
68
- height: 300px;
69
  background: radial-gradient(circle, #f472b6, transparent);
70
- top: 50%;
71
- left: 50%;
72
  transform: translate(-50%, -50%);
73
  animation-delay: -8s;
74
  }
75
  @keyframes orb-float {
76
- 0%,
77
- 100% {
78
- transform: translate(0, 0) scale(1);
79
- }
80
- 33% {
81
- transform: translate(30px, -20px) scale(1.05);
82
- }
83
- 66% {
84
- transform: translate(-20px, 15px) scale(0.97);
85
- }
86
  }
87
 
88
  /* ── Init overlay ── */
@@ -94,9 +81,7 @@ body {
94
  align-items: center;
95
  justify-content: center;
96
  background: var(--bg);
97
- transition:
98
- opacity 0.6s ease,
99
- visibility 0.6s ease;
100
  }
101
  .init-overlay.hidden {
102
  opacity: 0;
@@ -119,15 +104,8 @@ body {
119
  animation: logo-pulse 2s ease-in-out infinite;
120
  }
121
  @keyframes logo-pulse {
122
- 0%,
123
- 100% {
124
- filter: drop-shadow(0 0 12px rgba(34, 211, 238, 0.4));
125
- transform: scale(1);
126
- }
127
- 50% {
128
- filter: drop-shadow(0 0 24px rgba(129, 140, 248, 0.6));
129
- transform: scale(1.06);
130
- }
131
  }
132
  .init-title {
133
  font-family: 'Syne', sans-serif;
@@ -159,52 +137,35 @@ body {
159
  border-bottom: 1px solid var(--border);
160
  transition: color 0.3s;
161
  }
162
- .stage.active {
163
- color: var(--accent);
164
- }
165
- .stage.done {
166
- color: var(--green);
167
- }
168
  .stage-dot {
169
- width: 8px;
170
- height: 8px;
171
  border-radius: 50%;
172
  background: var(--text3);
173
  flex-shrink: 0;
174
- transition:
175
- background 0.3s,
176
- box-shadow 0.3s;
177
  }
178
  .stage.active .stage-dot {
179
  background: var(--accent);
180
  box-shadow: 0 0 8px var(--accent);
181
  animation: blink-dot 0.8s ease-in-out infinite;
182
  }
183
- .stage.done .stage-dot {
184
- background: var(--green);
185
- }
186
  @keyframes blink-dot {
187
- 0%,
188
- 100% {
189
- opacity: 1;
190
- }
191
- 50% {
192
- opacity: 0.3;
193
- }
194
  }
195
  .stage-check {
196
  margin-left: auto;
197
  opacity: 0;
198
  transition: opacity 0.3s;
199
  }
200
- .stage.done .stage-check {
201
- opacity: 1;
202
- }
203
  .stage span {
204
  flex: 1;
205
  font-family: 'Hind Siliguri', sans-serif;
206
  }
207
-
208
  .init-bar-wrap {
209
  background: var(--bg3);
210
  border-radius: 99px;
@@ -227,14 +188,24 @@ body {
227
  font-family: 'JetBrains Mono', monospace;
228
  }
229
 
230
- /* ── App layout ── */
 
 
 
 
231
  .app {
232
  position: fixed;
233
  inset: 0;
234
  z-index: 1;
235
  display: flex;
 
 
236
  transition: opacity 0.5s ease;
237
  }
 
 
 
 
238
 
239
  /* ── Sidebar ── */
240
  .sidebar {
@@ -245,9 +216,7 @@ body {
245
  flex-direction: column;
246
  flex-shrink: 0;
247
  overflow-y: auto;
248
- transition:
249
- width var(--transition),
250
- transform var(--transition);
251
  z-index: 10;
252
  }
253
  .sidebar.collapsed {
@@ -285,19 +254,14 @@ body {
285
  color: var(--text);
286
  }
287
 
288
- .status-panel {
289
- padding: 16px;
290
- }
291
  .status-row {
292
  display: flex;
293
  align-items: center;
294
  justify-content: space-between;
295
  padding: 6px 0;
296
  }
297
- .status-label {
298
- font-size: 12px;
299
- color: var(--text2);
300
- }
301
  .status-badge {
302
  font-size: 10px;
303
  font-family: 'JetBrains Mono', monospace;
@@ -306,28 +270,13 @@ body {
306
  font-weight: 600;
307
  letter-spacing: 0.03em;
308
  }
309
- .badge-green {
310
- background: rgba(74, 222, 128, 0.12);
311
- color: var(--green);
312
- }
313
- .badge-yellow {
314
- background: rgba(251, 191, 36, 0.12);
315
- color: var(--yellow);
316
- }
317
- .badge-red {
318
- background: rgba(248, 113, 113, 0.12);
319
- color: var(--red);
320
- }
321
 
322
- .sidebar-divider {
323
- height: 1px;
324
- background: var(--border);
325
- margin: 4px 0;
326
- }
327
 
328
- .dash-section {
329
- padding: 16px;
330
- }
331
  .dash-title {
332
  font-size: 11px;
333
  font-weight: 700;
@@ -356,25 +305,16 @@ body {
356
  line-height: 1;
357
  margin-bottom: 4px;
358
  }
359
- .metric-label {
360
- font-size: 10px;
361
- color: var(--text3);
362
- }
363
 
364
- .setting-row {
365
- margin-bottom: 14px;
366
- }
367
  .setting-row label {
368
  display: block;
369
  font-size: 11px;
370
  color: var(--text2);
371
  margin-bottom: 6px;
372
  }
373
- .slider-wrap {
374
- display: flex;
375
- align-items: center;
376
- gap: 8px;
377
- }
378
  .slider-wrap input[type='range'] {
379
  flex: 1;
380
  accent-color: var(--accent);
@@ -399,10 +339,7 @@ body {
399
  font-family: 'Hind Siliguri', sans-serif;
400
  cursor: pointer;
401
  }
402
- .setting-select:focus {
403
- outline: none;
404
- border-color: var(--accent);
405
- }
406
 
407
  .queue-vis {
408
  display: flex;
@@ -416,14 +353,10 @@ body {
416
  background: var(--accent);
417
  border-radius: 3px;
418
  opacity: 0.3;
419
- transition:
420
- height 0.15s ease,
421
- opacity 0.15s ease;
422
  min-height: 4px;
423
  }
424
- .queue-bar.active {
425
- opacity: 0.9;
426
- }
427
  .queue-label {
428
  font-size: 11px;
429
  color: var(--text2);
@@ -449,11 +382,7 @@ body {
449
  border-bottom: 1px solid var(--border);
450
  flex-shrink: 0;
451
  }
452
- .topbar-left {
453
- display: flex;
454
- align-items: center;
455
- gap: 12px;
456
- }
457
  .topbar-center {
458
  font-family: 'Syne', sans-serif;
459
  font-weight: 700;
@@ -463,10 +392,7 @@ body {
463
  left: 50%;
464
  transform: translateX(-50%);
465
  }
466
- .topbar-right {
467
- display: flex;
468
- gap: 8px;
469
- }
470
  .mobile-menu-btn {
471
  display: none;
472
  background: none;
@@ -478,41 +404,18 @@ body {
478
  font-size: 16px;
479
  }
480
  .state-dot {
481
- width: 8px;
482
- height: 8px;
483
  border-radius: 50%;
484
  background: var(--green);
485
  box-shadow: 0 0 6px var(--green);
486
  flex-shrink: 0;
487
- transition:
488
- background 0.3s,
489
- box-shadow 0.3s;
490
- }
491
- .state-dot.listening {
492
- background: var(--accent);
493
- box-shadow: 0 0 8px var(--accent);
494
- animation: blink-dot 0.8s infinite;
495
- }
496
- .state-dot.recording {
497
- background: var(--red);
498
- box-shadow: 0 0 10px var(--red);
499
- animation: blink-dot 0.4s infinite;
500
- }
501
- .state-dot.processing {
502
- background: var(--yellow);
503
- box-shadow: 0 0 8px var(--yellow);
504
- animation: blink-dot 1s infinite;
505
- }
506
- .state-dot.speaking {
507
- background: var(--accent2);
508
- box-shadow: 0 0 10px var(--accent2);
509
- animation: blink-dot 0.6s infinite;
510
- }
511
- #state-label {
512
- font-size: 13px;
513
- color: var(--text2);
514
- font-family: 'JetBrains Mono', monospace;
515
  }
 
 
 
 
 
516
 
517
  .clear-btn {
518
  background: none;
@@ -525,10 +428,7 @@ body {
525
  font-family: 'Syne', sans-serif;
526
  transition: all var(--transition);
527
  }
528
- .clear-btn:hover {
529
- border-color: var(--accent);
530
- color: var(--accent);
531
- }
532
 
533
  /* ── Chat ── */
534
  #chat-box {
@@ -540,16 +440,9 @@ body {
540
  gap: 12px;
541
  scroll-behavior: smooth;
542
  }
543
- #chat-box::-webkit-scrollbar {
544
- width: 4px;
545
- }
546
- #chat-box::-webkit-scrollbar-track {
547
- background: transparent;
548
- }
549
- #chat-box::-webkit-scrollbar-thumb {
550
- background: var(--border2);
551
- border-radius: 99px;
552
- }
553
 
554
  .message {
555
  max-width: 75%;
@@ -563,14 +456,8 @@ body {
563
  font-family: 'Hind Siliguri', sans-serif;
564
  }
565
  @keyframes msg-in {
566
- from {
567
- opacity: 0;
568
- transform: translateY(10px) scale(0.97);
569
- }
570
- to {
571
- opacity: 1;
572
- transform: translateY(0) scale(1);
573
- }
574
  }
575
  .message.user {
576
  background: var(--user-bg);
@@ -592,17 +479,9 @@ body {
592
  align-self: center;
593
  max-width: 90%;
594
  }
595
- .message ul,
596
- .message ol {
597
- padding-left: 20px;
598
- margin: 8px 0;
599
- }
600
- .message li {
601
- margin-bottom: 4px;
602
- }
603
- .message p {
604
- margin: 6px 0;
605
- }
606
  .message code {
607
  background: rgba(0, 0, 0, 0.3);
608
  border-radius: 4px;
@@ -629,9 +508,7 @@ body {
629
  transition: height 0.3s ease;
630
  padding: 0 20px;
631
  }
632
- .voice-visualizer.active {
633
- height: 56px;
634
- }
635
  .viz-bar {
636
  width: 4px;
637
  border-radius: 99px;
@@ -652,7 +529,10 @@ body {
652
  display: flex;
653
  gap: 10px;
654
  margin-bottom: 12px;
 
655
  }
 
 
656
  #text-input {
657
  flex: 1;
658
  background: var(--bg3);
@@ -664,13 +544,21 @@ body {
664
  font-family: 'Hind Siliguri', sans-serif;
665
  outline: none;
666
  transition: border-color var(--transition);
667
- }
668
- #text-input::placeholder {
669
- color: var(--text3);
670
- }
671
- #text-input:focus {
672
- border-color: var(--accent);
673
- }
 
 
 
 
 
 
 
 
674
 
675
  #send-btn {
676
  background: linear-gradient(135deg, var(--accent), var(--accent2));
@@ -681,21 +569,15 @@ body {
681
  color: #000;
682
  display: flex;
683
  align-items: center;
684
- transition:
685
- opacity var(--transition),
686
- transform 0.1s;
687
- }
688
- #send-btn:hover {
689
- opacity: 0.88;
690
- }
691
- #send-btn:active {
692
- transform: scale(0.95);
693
  }
 
 
694
 
695
- .voice-row {
696
- display: flex;
697
- gap: 10px;
698
- }
699
  .mic-btn {
700
  flex: 1;
701
  display: flex;
@@ -722,41 +604,25 @@ body {
722
  opacity: 0;
723
  transition: opacity var(--transition);
724
  }
725
- .mic-btn:hover::before {
726
- opacity: 0.08;
727
- }
728
  .mic-btn.mic-listening {
729
  border-color: var(--accent);
730
- box-shadow:
731
- 0 0 0 2px rgba(34, 211, 238, 0.2),
732
- inset 0 0 20px rgba(34, 211, 238, 0.05);
733
  }
734
  .mic-btn.mic-recording {
735
  border-color: var(--red);
736
  animation: pulse-red 0.8s ease-in-out infinite;
737
  }
738
  @keyframes pulse-red {
739
- 0%,
740
- 100% {
741
- box-shadow: 0 0 0 0 rgba(248, 113, 113, 0.4);
742
- }
743
- 50% {
744
- box-shadow: 0 0 0 8px rgba(248, 113, 113, 0);
745
- }
746
  }
747
  .mic-btn.mic-processing {
748
  border-color: var(--yellow);
749
  box-shadow: 0 0 0 2px rgba(251, 191, 36, 0.15);
750
  }
751
- .mic-icon {
752
- font-size: 18px;
753
- position: relative;
754
- z-index: 1;
755
- }
756
- .mic-label {
757
- position: relative;
758
- z-index: 1;
759
- }
760
 
761
  .stop-btn {
762
  background: rgba(248, 113, 113, 0.1);
@@ -772,49 +638,26 @@ body {
772
  gap: 6px;
773
  transition: all var(--transition);
774
  }
775
- .stop-btn:hover {
776
- background: rgba(248, 113, 113, 0.2);
777
- border-color: var(--red);
778
- }
779
- .stop-btn:active {
780
- transform: scale(0.95);
781
- }
782
 
783
- /* ── Scrollbar ── */
784
- .sidebar::-webkit-scrollbar {
785
- width: 4px;
786
- }
787
- .sidebar::-webkit-scrollbar-track {
788
- background: transparent;
789
- }
790
- .sidebar::-webkit-scrollbar-thumb {
791
- background: var(--border);
792
- border-radius: 99px;
793
- }
794
 
795
  /* ── Responsive ── */
796
  @media (max-width: 680px) {
797
  .sidebar {
798
  position: fixed;
799
- left: 0;
800
- top: 0;
801
- bottom: 0;
802
  transform: translateX(-100%);
803
  z-index: 100;
804
  }
805
- .sidebar.mobile-open {
806
- transform: translateX(0);
807
- }
808
- .mobile-menu-btn {
809
- display: flex;
810
- }
811
- .topbar-center {
812
- font-size: 13px;
813
- }
814
- .message {
815
- max-width: 90%;
816
- font-size: 14px;
817
- }
818
  }
819
 
820
  /* ── Thinking bubble (animated "..." while AI processes) ── */
@@ -831,8 +674,7 @@ body {
831
  }
832
  .message.thinking .dot {
833
  display: inline-block;
834
- width: 7px;
835
- height: 7px;
836
  border-radius: 50%;
837
  background: var(--accent2);
838
  opacity: 0.4;
 
48
  animation: orb-float 12s ease-in-out infinite;
49
  }
50
  .orb-1 {
51
+ width: 500px; height: 500px;
 
52
  background: radial-gradient(circle, #22d3ee, transparent);
53
+ top: -200px; left: -150px;
 
54
  animation-delay: 0s;
55
  }
56
  .orb-2 {
57
+ width: 400px; height: 400px;
 
58
  background: radial-gradient(circle, #818cf8, transparent);
59
+ bottom: -100px; right: -100px;
 
60
  animation-delay: -4s;
61
  }
62
  .orb-3 {
63
+ width: 300px; height: 300px;
 
64
  background: radial-gradient(circle, #f472b6, transparent);
65
+ top: 50%; left: 50%;
 
66
  transform: translate(-50%, -50%);
67
  animation-delay: -8s;
68
  }
69
  @keyframes orb-float {
70
+ 0%, 100% { transform: translate(0, 0) scale(1); }
71
+ 33% { transform: translate(30px, -20px) scale(1.05); }
72
+ 66% { transform: translate(-20px, 15px) scale(0.97); }
 
 
 
 
 
 
 
73
  }
74
 
75
  /* ── Init overlay ── */
 
81
  align-items: center;
82
  justify-content: center;
83
  background: var(--bg);
84
+ transition: opacity 0.6s ease, visibility 0.6s ease;
 
 
85
  }
86
  .init-overlay.hidden {
87
  opacity: 0;
 
104
  animation: logo-pulse 2s ease-in-out infinite;
105
  }
106
  @keyframes logo-pulse {
107
+ 0%, 100% { filter: drop-shadow(0 0 12px rgba(34, 211, 238, 0.4)); transform: scale(1); }
108
+ 50% { filter: drop-shadow(0 0 24px rgba(129, 140, 248, 0.6)); transform: scale(1.06); }
 
 
 
 
 
 
 
109
  }
110
  .init-title {
111
  font-family: 'Syne', sans-serif;
 
137
  border-bottom: 1px solid var(--border);
138
  transition: color 0.3s;
139
  }
140
+ .stage.active { color: var(--accent); }
141
+ .stage.done { color: var(--green); }
 
 
 
 
142
  .stage-dot {
143
+ width: 8px; height: 8px;
 
144
  border-radius: 50%;
145
  background: var(--text3);
146
  flex-shrink: 0;
147
+ transition: background 0.3s, box-shadow 0.3s;
 
 
148
  }
149
  .stage.active .stage-dot {
150
  background: var(--accent);
151
  box-shadow: 0 0 8px var(--accent);
152
  animation: blink-dot 0.8s ease-in-out infinite;
153
  }
154
+ .stage.done .stage-dot { background: var(--green); }
 
 
155
  @keyframes blink-dot {
156
+ 0%, 100% { opacity: 1; }
157
+ 50% { opacity: 0.3; }
 
 
 
 
 
158
  }
159
  .stage-check {
160
  margin-left: auto;
161
  opacity: 0;
162
  transition: opacity 0.3s;
163
  }
164
+ .stage.done .stage-check { opacity: 1; }
 
 
165
  .stage span {
166
  flex: 1;
167
  font-family: 'Hind Siliguri', sans-serif;
168
  }
 
169
  .init-bar-wrap {
170
  background: var(--bg3);
171
  border-radius: 99px;
 
188
  font-family: 'JetBrains Mono', monospace;
189
  }
190
 
191
+ /* ── App layout ──
192
+ FIX-7: App is hidden by default via opacity/pointer-events.
193
+ JS adds class .visible after init overlay closes. This prevents
194
+ any flash of unstyled content (FOUC) during JS execution.
195
+ ── */
196
  .app {
197
  position: fixed;
198
  inset: 0;
199
  z-index: 1;
200
  display: flex;
201
+ opacity: 0;
202
+ pointer-events: none;
203
  transition: opacity 0.5s ease;
204
  }
205
+ .app.visible {
206
+ opacity: 1;
207
+ pointer-events: auto;
208
+ }
209
 
210
  /* ── Sidebar ── */
211
  .sidebar {
 
216
  flex-direction: column;
217
  flex-shrink: 0;
218
  overflow-y: auto;
219
+ transition: width var(--transition), transform var(--transition);
 
 
220
  z-index: 10;
221
  }
222
  .sidebar.collapsed {
 
254
  color: var(--text);
255
  }
256
 
257
+ .status-panel { padding: 16px; }
 
 
258
  .status-row {
259
  display: flex;
260
  align-items: center;
261
  justify-content: space-between;
262
  padding: 6px 0;
263
  }
264
+ .status-label { font-size: 12px; color: var(--text2); }
 
 
 
265
  .status-badge {
266
  font-size: 10px;
267
  font-family: 'JetBrains Mono', monospace;
 
270
  font-weight: 600;
271
  letter-spacing: 0.03em;
272
  }
273
+ .badge-green { background: rgba(74, 222, 128, 0.12); color: var(--green); }
274
+ .badge-yellow { background: rgba(251, 191, 36, 0.12); color: var(--yellow); }
275
+ .badge-red { background: rgba(248, 113, 113, 0.12); color: var(--red); }
 
 
 
 
 
 
 
 
 
276
 
277
+ .sidebar-divider { height: 1px; background: var(--border); margin: 4px 0; }
 
 
 
 
278
 
279
+ .dash-section { padding: 16px; }
 
 
280
  .dash-title {
281
  font-size: 11px;
282
  font-weight: 700;
 
305
  line-height: 1;
306
  margin-bottom: 4px;
307
  }
308
+ .metric-label { font-size: 10px; color: var(--text3); }
 
 
 
309
 
310
+ .setting-row { margin-bottom: 14px; }
 
 
311
  .setting-row label {
312
  display: block;
313
  font-size: 11px;
314
  color: var(--text2);
315
  margin-bottom: 6px;
316
  }
317
+ .slider-wrap { display: flex; align-items: center; gap: 8px; }
 
 
 
 
318
  .slider-wrap input[type='range'] {
319
  flex: 1;
320
  accent-color: var(--accent);
 
339
  font-family: 'Hind Siliguri', sans-serif;
340
  cursor: pointer;
341
  }
342
+ .setting-select:focus { outline: none; border-color: var(--accent); }
 
 
 
343
 
344
  .queue-vis {
345
  display: flex;
 
353
  background: var(--accent);
354
  border-radius: 3px;
355
  opacity: 0.3;
356
+ transition: height 0.15s ease, opacity 0.15s ease;
 
 
357
  min-height: 4px;
358
  }
359
+ .queue-bar.active { opacity: 0.9; }
 
 
360
  .queue-label {
361
  font-size: 11px;
362
  color: var(--text2);
 
382
  border-bottom: 1px solid var(--border);
383
  flex-shrink: 0;
384
  }
385
+ .topbar-left { display: flex; align-items: center; gap: 12px; }
 
 
 
 
386
  .topbar-center {
387
  font-family: 'Syne', sans-serif;
388
  font-weight: 700;
 
392
  left: 50%;
393
  transform: translateX(-50%);
394
  }
395
+ .topbar-right { display: flex; gap: 8px; }
 
 
 
396
  .mobile-menu-btn {
397
  display: none;
398
  background: none;
 
404
  font-size: 16px;
405
  }
406
  .state-dot {
407
+ width: 8px; height: 8px;
 
408
  border-radius: 50%;
409
  background: var(--green);
410
  box-shadow: 0 0 6px var(--green);
411
  flex-shrink: 0;
412
+ transition: background 0.3s, box-shadow 0.3s;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
413
  }
414
+ .state-dot.listening { background: var(--accent); box-shadow: 0 0 8px var(--accent); animation: blink-dot 0.8s infinite; }
415
+ .state-dot.recording { background: var(--red); box-shadow: 0 0 10px var(--red); animation: blink-dot 0.4s infinite; }
416
+ .state-dot.processing { background: var(--yellow); box-shadow: 0 0 8px var(--yellow); animation: blink-dot 1s infinite; }
417
+ .state-dot.speaking { background: var(--accent2); box-shadow: 0 0 10px var(--accent2);animation: blink-dot 0.6s infinite; }
418
+ #state-label { font-size: 13px; color: var(--text2); font-family: 'JetBrains Mono', monospace; }
419
 
420
  .clear-btn {
421
  background: none;
 
428
  font-family: 'Syne', sans-serif;
429
  transition: all var(--transition);
430
  }
431
+ .clear-btn:hover { border-color: var(--accent); color: var(--accent); }
 
 
 
432
 
433
  /* ── Chat ── */
434
  #chat-box {
 
440
  gap: 12px;
441
  scroll-behavior: smooth;
442
  }
443
+ #chat-box::-webkit-scrollbar { width: 4px; }
444
+ #chat-box::-webkit-scrollbar-track { background: transparent; }
445
+ #chat-box::-webkit-scrollbar-thumb { background: var(--border2); border-radius: 99px; }
 
 
 
 
 
 
 
446
 
447
  .message {
448
  max-width: 75%;
 
456
  font-family: 'Hind Siliguri', sans-serif;
457
  }
458
  @keyframes msg-in {
459
+ from { opacity: 0; transform: translateY(10px) scale(0.97); }
460
+ to { opacity: 1; transform: translateY(0) scale(1); }
 
 
 
 
 
 
461
  }
462
  .message.user {
463
  background: var(--user-bg);
 
479
  align-self: center;
480
  max-width: 90%;
481
  }
482
+ .message ul, .message ol { padding-left: 20px; margin: 8px 0; }
483
+ .message li { margin-bottom: 4px; }
484
+ .message p { margin: 6px 0; }
 
 
 
 
 
 
 
 
485
  .message code {
486
  background: rgba(0, 0, 0, 0.3);
487
  border-radius: 4px;
 
508
  transition: height 0.3s ease;
509
  padding: 0 20px;
510
  }
511
+ .voice-visualizer.active { height: 56px; }
 
 
512
  .viz-bar {
513
  width: 4px;
514
  border-radius: 99px;
 
529
  display: flex;
530
  gap: 10px;
531
  margin-bottom: 12px;
532
+ align-items: flex-end; /* FIX-4: align send button to bottom when textarea grows */
533
  }
534
+
535
+ /* ── FIX-4: Auto-growing textarea replaces <input type="text"> ── */
536
  #text-input {
537
  flex: 1;
538
  background: var(--bg3);
 
544
  font-family: 'Hind Siliguri', sans-serif;
545
  outline: none;
546
  transition: border-color var(--transition);
547
+ resize: none; /* FIX-4: no manual resize handle */
548
+ overflow-y: hidden; /* hidden until 10 lines exceeded; JS manages */
549
+ line-height: 1.57; /* ~22px per line at font-size 14px */
550
+ min-height: 44px; /* single line min */
551
+ max-height: 226px; /* 10 lines × 22px + 16px padding */
552
+ display: block;
553
+ /* smooth height animation */
554
+ transition: border-color var(--transition), height 0.1s ease;
555
+ }
556
+ #text-input::placeholder { color: var(--text3); }
557
+ #text-input:focus { border-color: var(--accent); }
558
+ /* Custom scrollbar inside textarea once > 10 lines */
559
+ #text-input::-webkit-scrollbar { width: 4px; }
560
+ #text-input::-webkit-scrollbar-track { background: transparent; }
561
+ #text-input::-webkit-scrollbar-thumb { background: var(--border2); border-radius: 99px; }
562
 
563
  #send-btn {
564
  background: linear-gradient(135deg, var(--accent), var(--accent2));
 
569
  color: #000;
570
  display: flex;
571
  align-items: center;
572
+ transition: opacity var(--transition), transform 0.1s;
573
+ flex-shrink: 0;
574
+ align-self: flex-end; /* FIX-4: stays at bottom as textarea grows */
575
+ height: 44px;
 
 
 
 
 
576
  }
577
+ #send-btn:hover { opacity: 0.88; }
578
+ #send-btn:active { transform: scale(0.95); }
579
 
580
+ .voice-row { display: flex; gap: 10px; }
 
 
 
581
  .mic-btn {
582
  flex: 1;
583
  display: flex;
 
604
  opacity: 0;
605
  transition: opacity var(--transition);
606
  }
607
+ .mic-btn:hover::before { opacity: 0.08; }
 
 
608
  .mic-btn.mic-listening {
609
  border-color: var(--accent);
610
+ box-shadow: 0 0 0 2px rgba(34, 211, 238, 0.2), inset 0 0 20px rgba(34, 211, 238, 0.05);
 
 
611
  }
612
  .mic-btn.mic-recording {
613
  border-color: var(--red);
614
  animation: pulse-red 0.8s ease-in-out infinite;
615
  }
616
  @keyframes pulse-red {
617
+ 0%, 100% { box-shadow: 0 0 0 0 rgba(248, 113, 113, 0.4); }
618
+ 50% { box-shadow: 0 0 0 8px rgba(248, 113, 113, 0); }
 
 
 
 
 
619
  }
620
  .mic-btn.mic-processing {
621
  border-color: var(--yellow);
622
  box-shadow: 0 0 0 2px rgba(251, 191, 36, 0.15);
623
  }
624
+ .mic-icon { font-size: 18px; position: relative; z-index: 1; }
625
+ .mic-label { position: relative; z-index: 1; }
 
 
 
 
 
 
 
626
 
627
  .stop-btn {
628
  background: rgba(248, 113, 113, 0.1);
 
638
  gap: 6px;
639
  transition: all var(--transition);
640
  }
641
+ .stop-btn:hover { background: rgba(248, 113, 113, 0.2); border-color: var(--red); }
642
+ .stop-btn:active { transform: scale(0.95); }
 
 
 
 
 
643
 
644
+ /* ── Scrollbar (sidebar) ── */
645
+ .sidebar::-webkit-scrollbar { width: 4px; }
646
+ .sidebar::-webkit-scrollbar-track { background: transparent; }
647
+ .sidebar::-webkit-scrollbar-thumb { background: var(--border); border-radius: 99px; }
 
 
 
 
 
 
 
648
 
649
  /* ── Responsive ── */
650
  @media (max-width: 680px) {
651
  .sidebar {
652
  position: fixed;
653
+ left: 0; top: 0; bottom: 0;
 
 
654
  transform: translateX(-100%);
655
  z-index: 100;
656
  }
657
+ .sidebar.mobile-open { transform: translateX(0); }
658
+ .mobile-menu-btn { display: flex; }
659
+ .topbar-center { font-size: 13px; }
660
+ .message { max-width: 90%; font-size: 14px; }
 
 
 
 
 
 
 
 
 
661
  }
662
 
663
  /* ── Thinking bubble (animated "..." while AI processes) ── */
 
674
  }
675
  .message.thinking .dot {
676
  display: inline-block;
677
+ width: 7px; height: 7px;
 
678
  border-radius: 50%;
679
  background: var(--accent2);
680
  opacity: 0.4;
services/streaming.py CHANGED
@@ -1,6 +1,24 @@
1
  """
2
  services/streaming.py — Production-grade parallel TTS streamer
3
- (unchanged from original — architecture is correct)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
  """
5
 
6
  from __future__ import annotations
@@ -12,25 +30,27 @@ from typing import AsyncGenerator
12
 
13
  from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE
14
 
15
- if USE_ELEVENLABS:
16
- FIRST_FLUSH_BOUNDARY_MIN = 5
17
- FIRST_FLUSH_HARD = 25
18
- SUBSEQUENT_FLUSH_BOUNDARY_MIN = 22
19
- SUBSEQUENT_FLUSH_HARD = 65
20
- _backend_label = "ElevenLabs"
21
- else:
22
- FIRST_FLUSH_BOUNDARY_MIN = 5
23
- FIRST_FLUSH_HARD = 25
24
- SUBSEQUENT_FLUSH_BOUNDARY_MIN = 18
25
- SUBSEQUENT_FLUSH_HARD = 65
26
- _backend_label = "Edge-TTS"
27
-
28
- print(f"[Streamer] TTS backend: {_backend_label}")
29
-
30
- MIN_CHARS = 2
 
 
31
  SENTENCE_BOUNDARIES = frozenset(".!?।॥\n")
32
  CLAUSE_BOUNDARIES = frozenset(",;:—–")
33
- _SENTINEL = object()
34
 
35
 
36
  def _clean_for_tts(text: str) -> str:
@@ -47,15 +67,28 @@ def _should_flush(buffer: str, first_chunk: bool) -> bool:
47
  n = len(buffer)
48
  if n == 0:
49
  return False
50
- boundary_min = FIRST_FLUSH_BOUNDARY_MIN if first_chunk else SUBSEQUENT_FLUSH_BOUNDARY_MIN
51
- hard_limit = FIRST_FLUSH_HARD if first_chunk else SUBSEQUENT_FLUSH_HARD
 
 
 
52
  if n >= hard_limit:
53
  return True
 
54
  last_char = buffer[-1]
55
- if last_char in SENTENCE_BOUNDARIES and n >= boundary_min:
 
 
 
 
 
 
56
  return True
57
- if last_char in CLAUSE_BOUNDARIES and n >= hard_limit * 0.75:
 
 
58
  return True
 
59
  return False
60
 
61
 
@@ -92,8 +125,9 @@ class ParallelTTSStreamer:
92
 
93
  async def _schedule_chunk(self) -> None:
94
  if self._cancelled:
95
- self.buffer = ""; return
96
- text = _clean_for_tts(self.buffer.strip())
 
97
  self.buffer = ""
98
  if len(text) < MIN_CHARS:
99
  return
@@ -104,11 +138,14 @@ class ParallelTTSStreamer:
104
  self._slot_added.set()
105
  task = asyncio.create_task(self._synthesise(text, slot))
106
  self._tasks.append(task)
107
- task.add_done_callback(lambda t: self._tasks.remove(t) if t in self._tasks else None)
 
 
108
 
109
  async def _synthesise(self, text: str, slot: _AudioSlot) -> None:
110
  if self._cancelled:
111
- slot.mark_error(); return
 
112
  try:
113
  async for chunk in text_to_speech_stream(text, voice=self.voice):
114
  if self._cancelled:
@@ -128,40 +165,69 @@ class ParallelTTSStreamer:
128
 
129
  async def cancel(self) -> None:
130
  self._cancelled = True
131
- tasks = list(self._tasks); self._tasks.clear()
132
- for t in tasks: t.cancel()
 
 
133
  if tasks:
134
  await asyncio.gather(*tasks, return_exceptions=True)
135
  async with self._slots_lock:
136
  for slot in self._slots:
137
- if not slot.done: slot.mark_error()
 
138
  self._llm_done.set()
139
  self._slot_added.set()
140
 
141
  async def stream_audio(self) -> AsyncGenerator[bytes, None]:
 
 
 
 
 
 
 
 
 
142
  delivered = 0
143
  while True:
144
  async with self._slots_lock:
145
  slot = self._slots[delivered] if delivered < len(self._slots) else None
 
146
  if slot is None:
147
  if self._llm_done.is_set():
148
  async with self._slots_lock:
149
  total = len(self._slots)
150
  if delivered >= total:
151
- break
 
 
152
  self._slot_added.clear()
 
 
 
 
153
  try:
154
  await asyncio.wait_for(self._slot_added.wait(), timeout=10.0)
155
  except asyncio.TimeoutError:
156
- print("[Streamer] Timeout waiting for TTS slot."); break
 
157
  continue
 
 
158
  while True:
159
  item = await slot.queue.get()
160
- if item is _SENTINEL: break
161
- if not self._cancelled: yield item
 
 
162
  delivered += 1
163
 
164
  def reset(self) -> None:
165
- self._cancelled = False; self._first_chunk = True; self.buffer = ""
166
- self._slot_index = 0; self._slots.clear(); self._tasks.clear()
167
- self._llm_done.clear(); self._slot_added.clear()
 
 
 
 
 
 
1
  """
2
  services/streaming.py — Production-grade parallel TTS streamer
3
+
4
+ FIX-ISSUE4 (Natural, slow, small-chunk TTS):
5
+ The previous code used character-count thresholds that produced large
6
+ sentence-level chunks (25–65 chars), causing buffered, robotic-feeling
7
+ speech with a burst of audio at once.
8
+
9
+ New behaviour:
10
+ • Flush at word boundaries (2–3 words) for voice-like pacing.
11
+ • Flush threshold is ~10 chars first chunk, ~18 chars subsequent — which
12
+ corresponds to roughly 2–3 average Bengali/English words.
13
+ • Hard limit of 30 chars (first chunk) / 40 chars (subsequent) ensures no chunk ever gets too large.
14
+ • Sentence-ending punctuation (।.!?) always flushes immediately
15
+ once the chunk minimum is reached, giving natural pause points.
16
+ • The TTS rate is set to "-35%" in tts.py (slightly slower than before).
17
+
18
+ Result: audio arrives in small, fast, overlapping synthesis tasks,
19
+ giving a low-latency, smooth, natural speech feel.
20
+
21
+ FIX-BUG5 (TOCTOU race in stream_audio) — preserved from previous version.
22
  """
23
 
24
  from __future__ import annotations
 
30
 
31
  from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE
32
 
33
+ # ── Chunk size tuning ──────────────────────────────────────────────────────────
34
+ # These character counts correspond roughly to:
35
+ # FIRST_FLUSH_MIN ~2 words (get audio playing ASAP)
36
+ # SUBSEQUENT_FLUSH_MIN ~3 words (natural conversational phrase)
37
+ # *_FLUSH_HARD ~5–8 words (never accumulate more than this)
38
+ #
39
+ # At average Bengali word length ~4–5 chars + space:
40
+ # 10 chars ≈ 2 words, 18 chars ≈ 3-4 words, 40 chars ≈ 7-8 words
41
+
42
+ FIRST_FLUSH_MIN = 10
43
+ FIRST_FLUSH_HARD = 30
44
+ SUBSEQUENT_FLUSH_MIN = 18
45
+ SUBSEQUENT_FLUSH_HARD = 40
46
+
47
+ _backend_label = "ElevenLabs" if USE_ELEVENLABS else "Edge-TTS"
48
+ print(f"[Streamer] TTS backend: {_backend_label} | chunk: {SUBSEQUENT_FLUSH_MIN}–{SUBSEQUENT_FLUSH_HARD} chars")
49
+
50
+ MIN_CHARS = 2
51
  SENTENCE_BOUNDARIES = frozenset(".!?।॥\n")
52
  CLAUSE_BOUNDARIES = frozenset(",;:—–")
53
+ _SENTINEL = object()
54
 
55
 
56
  def _clean_for_tts(text: str) -> str:
 
67
  n = len(buffer)
68
  if n == 0:
69
  return False
70
+
71
+ flush_min = FIRST_FLUSH_MIN if first_chunk else SUBSEQUENT_FLUSH_MIN
72
+ hard_limit = FIRST_FLUSH_HARD if first_chunk else SUBSEQUENT_FLUSH_HARD
73
+
74
+ # Hard limit — always flush regardless of boundary
75
  if n >= hard_limit:
76
  return True
77
+
78
  last_char = buffer[-1]
79
+
80
+ # Sentence ending — flush once flush_min is reached (natural pause point)
81
+ if last_char in SENTENCE_BOUNDARIES and n >= flush_min:
82
+ return True
83
+
84
+ # Clause boundary — flush at ~70% of hard limit
85
+ if last_char in CLAUSE_BOUNDARIES and n >= hard_limit * 0.70:
86
  return True
87
+
88
+ # Word boundary (space after minimum words reached)
89
+ if last_char == ' ' and n >= flush_min:
90
  return True
91
+
92
  return False
93
 
94
 
 
125
 
126
  async def _schedule_chunk(self) -> None:
127
  if self._cancelled:
128
+ self.buffer = ""
129
+ return
130
+ text = _clean_for_tts(self.buffer.strip())
131
  self.buffer = ""
132
  if len(text) < MIN_CHARS:
133
  return
 
138
  self._slot_added.set()
139
  task = asyncio.create_task(self._synthesise(text, slot))
140
  self._tasks.append(task)
141
+ task.add_done_callback(
142
+ lambda t: self._tasks.remove(t) if t in self._tasks else None
143
+ )
144
 
145
  async def _synthesise(self, text: str, slot: _AudioSlot) -> None:
146
  if self._cancelled:
147
+ slot.mark_error()
148
+ return
149
  try:
150
  async for chunk in text_to_speech_stream(text, voice=self.voice):
151
  if self._cancelled:
 
165
 
166
  async def cancel(self) -> None:
167
  self._cancelled = True
168
+ tasks = list(self._tasks)
169
+ self._tasks.clear()
170
+ for t in tasks:
171
+ t.cancel()
172
  if tasks:
173
  await asyncio.gather(*tasks, return_exceptions=True)
174
  async with self._slots_lock:
175
  for slot in self._slots:
176
+ if not slot.done:
177
+ slot.mark_error()
178
  self._llm_done.set()
179
  self._slot_added.set()
180
 
181
  async def stream_audio(self) -> AsyncGenerator[bytes, None]:
182
+ """
183
+ Deliver TTS audio chunks in slot order.
184
+
185
+ FIX-BUG5 — double-check pattern eliminates TOCTOU race:
186
+ 1. clear() the event
187
+ 2. Re-check slot list under lock (slot may have been added between
188
+ previous check and clear())
189
+ 3. Only then wait() — so we never miss a newly-added slot
190
+ """
191
  delivered = 0
192
  while True:
193
  async with self._slots_lock:
194
  slot = self._slots[delivered] if delivered < len(self._slots) else None
195
+
196
  if slot is None:
197
  if self._llm_done.is_set():
198
  async with self._slots_lock:
199
  total = len(self._slots)
200
  if delivered >= total:
201
+ break # All slots consumed; done.
202
+
203
+ # FIX-BUG5: clear → re-check → wait
204
  self._slot_added.clear()
205
+ async with self._slots_lock:
206
+ have_new = delivered < len(self._slots)
207
+ if have_new:
208
+ continue
209
  try:
210
  await asyncio.wait_for(self._slot_added.wait(), timeout=10.0)
211
  except asyncio.TimeoutError:
212
+ print("[Streamer] Timeout waiting for TTS slot.")
213
+ break
214
  continue
215
+
216
+ # Drain this slot's audio queue in order
217
  while True:
218
  item = await slot.queue.get()
219
+ if item is _SENTINEL:
220
+ break
221
+ if not self._cancelled:
222
+ yield item
223
  delivered += 1
224
 
225
  def reset(self) -> None:
226
+ self._cancelled = False
227
+ self._first_chunk = True
228
+ self.buffer = ""
229
+ self._slot_index = 0
230
+ self._slots.clear()
231
+ self._tasks.clear()
232
+ self._llm_done.clear()
233
+ self._slot_added.clear()
services/stt.py CHANGED
@@ -12,9 +12,18 @@ Architecture:
12
  • GPU inference runs in a dedicated single-thread Executor (serialize GPU)
13
  • Bangla-optimised decode parameters preserved from original
14
 
 
 
 
 
 
 
 
 
 
15
  Latency profile:
16
  ffmpeg (parallel) ~30–80 ms
17
- batch wait window ~50 ms
18
  GPU inference ~80–150 ms per batch (amortised across requests)
19
  Total perceived < 200 ms at moderate load
20
  """
@@ -37,7 +46,7 @@ from typing import Optional
37
  from faster_whisper import WhisperModel
38
 
39
  # ── Bangla script patterns ─────────────────────────────────────────────────────
40
- _BANGLA_RE = re.compile(r"[\u0980-\u09FF]")
41
  _WRONG_SCRIPT_RE = re.compile(
42
  r"[\u0600-\u06FF\u0750-\u077F\uFB50-\uFDFF\uFE70-\uFEFF]"
43
  )
@@ -46,25 +55,27 @@ _WRONG_SCRIPT_RE = re.compile(
46
  _BANGLA_SEED = "আমি আপনার সাথে বাংলায় কথা বলছি।"
47
 
48
  # ── Configuration ──────────────────────────────────────────────────────────────
49
- _STT_MODEL = os.getenv("STT_MODEL", "large-v3")
50
- _COMPUTE_TYPE = os.getenv("STT_COMPUTE_TYPE", "int8_float32")
51
- _BATCH_WINDOW = float(os.getenv("STT_BATCH_WINDOW_MS", "50")) / 1000 # seconds
52
- _MAX_BATCH = int(os.getenv("STT_MAX_BATCH", "8"))
 
53
  MAX_INPUT_BYTES = 5_242_880 # 5 MB
54
 
55
  # ── Singleton model state ──────────────────────────────────────────────────────
56
  _model: Optional[WhisperModel] = None
57
  _model_lock = threading.Lock()
58
  _model_ready = threading.Event()
 
59
 
60
  # Two executors: one for ffmpeg (I/O, can be parallel), one for GPU (serial)
61
- _ffmpeg_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ffmpeg")
62
- _gpu_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="whisper-gpu")
63
 
64
 
65
  # ── Model loader (background thread) ──────────────────────────────────────────
66
  def _load_and_warm() -> None:
67
- global _model
68
  try:
69
  print(f"[STT] Loading Faster-Whisper {_STT_MODEL} on CUDA ({_COMPUTE_TYPE})…")
70
  m = WhisperModel(
@@ -80,16 +91,19 @@ def _load_and_warm() -> None:
80
  with _model_lock:
81
  _model = m
82
  except Exception as exc:
83
- print(f"[STT] Model load failed: {exc}")
 
84
  finally:
85
  _model_ready.set()
86
 
87
 
88
  def _make_silence_wav(duration_s: float = 0.5, sr: int = 16_000) -> io.BytesIO:
89
  buf = io.BytesIO()
90
- n = int(sr * duration_s)
91
  with wave.open(buf, "wb") as wf:
92
- wf.setnchannels(1); wf.setsampwidth(2); wf.setframerate(sr)
 
 
93
  wf.writeframes(struct.pack(f"<{n}h", *([0] * n)))
94
  buf.seek(0)
95
  return buf
@@ -105,7 +119,8 @@ def _to_wav_sync(audio_bytes: bytes) -> Optional[str]:
105
  in_path = out_path = None
106
  try:
107
  with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as f:
108
- f.write(audio_bytes); in_path = f.name
 
109
  with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
110
  out_path = f.name
111
 
@@ -135,8 +150,10 @@ def _to_wav_sync(audio_bytes: bytes) -> Optional[str]:
135
  return None
136
  finally:
137
  if in_path and os.path.exists(in_path):
138
- try: os.remove(in_path)
139
- except OSError: pass
 
 
140
 
141
 
142
  # ── Whisper inference (sync, runs in _gpu_pool — ONE AT A TIME) ───────────────
@@ -144,8 +161,6 @@ def _transcribe_batch_sync(wav_paths: list[str]) -> list[Optional[str]]:
144
  """
145
  Run Whisper inference on a list of WAV paths.
146
  Returns a list of transcripts (None on error/empty).
147
- Each file is processed sequentially on the same GPU — this is intentional:
148
- batching here means we avoid per-request CUDA kernel spin-up overhead.
149
  """
150
  with _model_lock:
151
  model = _model
@@ -175,8 +190,10 @@ def _transcribe_batch_sync(wav_paths: list[str]) -> list[Optional[str]]:
175
  print(f"[STT] inference error: {exc}")
176
  results.append(None)
177
  finally:
178
- try: os.remove(path)
179
- except OSError: pass
 
 
180
 
181
  return results
182
 
@@ -185,7 +202,7 @@ def _transcribe_batch_sync(wav_paths: list[str]) -> list[Optional[str]]:
185
  def _validate(text: str) -> Optional[str]:
186
  if not text or not text.strip():
187
  return None
188
- text = text.strip()
189
  words = text.split()
190
  if len(words) >= 6 and len(set(words)) / len(words) < 0.25:
191
  print(f"[STT] rejected repetition: {text[:60]}")
@@ -193,8 +210,8 @@ def _validate(text: str) -> Optional[str]:
193
  if len(words) == 2 and words[0] == words[1]:
194
  return None
195
  # Soft script check — log but keep
196
- wrong = len(_WRONG_SCRIPT_RE.findall(text))
197
- alpha = sum(1 for c in text if c.isalpha())
198
  if alpha > 0 and wrong / alpha > 0.30:
199
  print(f"[STT] non-Bangla (kept): {text[:60]}")
200
  return text
@@ -217,26 +234,42 @@ class _STTBatchWorker:
217
  2. Collects requests for up to BATCH_WINDOW_MS
218
  3. Dispatches the batch to _gpu_pool in one call
219
  4. Resolves each caller's Future
 
 
 
 
 
 
 
220
  """
221
 
222
  def __init__(self) -> None:
223
- self._queue: asyncio.Queue[_STTRequest] = asyncio.Queue()
224
- self._started = False
225
-
226
- def _ensure_started(self) -> None:
227
- if not self._started:
228
- self._started = True
229
- asyncio.ensure_future(self._worker_loop())
 
 
 
 
 
 
 
 
230
 
231
  async def enqueue(self, wav_path: str) -> Optional[str]:
232
- self._ensure_started()
233
- loop = asyncio.get_event_loop()
 
234
  req = _STTRequest(wav_path=wav_path, future=loop.create_future())
235
  await self._queue.put(req)
236
  return await req.future
237
 
238
  async def _worker_loop(self) -> None:
239
- loop = asyncio.get_event_loop()
240
  while True:
241
  # Wait for at least one request
242
  first = await self._queue.get()
@@ -281,7 +314,7 @@ _batch_worker = _STTBatchWorker()
281
  class STTProcessor:
282
  """
283
  Drop-in replacement for the original STTProcessor.
284
- Now routes through the GPU batch worker for shared inference.
285
  """
286
 
287
  async def transcribe(self, audio_bytes: bytes) -> Optional[str]:
@@ -294,13 +327,19 @@ class STTProcessor:
294
  if len(audio_bytes) > MAX_INPUT_BYTES:
295
  audio_bytes = audio_bytes[:MAX_INPUT_BYTES]
296
 
297
- # Wait for model readiness (non-blocking)
298
  if not _model_ready.is_set():
299
- print("[STT] Waiting for model…")
300
- await asyncio.to_thread(_model_ready.wait)
 
 
 
 
 
 
301
 
302
  # ffmpeg: runs in parallel I/O pool (not serialised)
303
- loop = asyncio.get_event_loop()
304
  wav_path = await loop.run_in_executor(_ffmpeg_pool, _to_wav_sync, audio_bytes)
305
  if not wav_path:
306
  return None
 
12
  • GPU inference runs in a dedicated single-thread Executor (serialize GPU)
13
  • Bangla-optimised decode parameters preserved from original
14
 
15
+ FIX-BUG4 (race condition + deprecated API):
16
+ _STTBatchWorker now uses asyncio.Lock to safely initialise the worker
17
+ exactly once, even when multiple coroutines call enqueue() concurrently.
18
+ asyncio.get_event_loop() → asyncio.get_running_loop().
19
+
20
+ FIX-BUG6 (blocking wait without timeout):
21
+ asyncio.to_thread(_model_ready.wait) now passes _MODEL_LOAD_TIMEOUT (STT_MODEL_LOAD_TIMEOUT_S, default 120 s) and raises
22
+ RuntimeError if the model fails to load in time.
23
+
24
  Latency profile:
25
  ffmpeg (parallel) ~30–80 ms
26
+ batch wait window ~30 ms (reduced from 50ms)
27
  GPU inference ~80–150 ms per batch (amortised across requests)
28
  Total perceived < 200 ms at moderate load
29
  """
 
46
  from faster_whisper import WhisperModel
47
 
48
  # ── Bangla script patterns ─────────────────────────────────────────────────────
49
+ _BANGLA_RE = re.compile(r"[\u0980-\u09FF]")
50
  _WRONG_SCRIPT_RE = re.compile(
51
  r"[\u0600-\u06FF\u0750-\u077F\uFB50-\uFDFF\uFE70-\uFEFF]"
52
  )
 
55
  _BANGLA_SEED = "আমি আপনার সাথে বাংলায় কথা বলছি।"
56
 
57
  # ── Configuration ──────────────────────────────────────────────────────────────
58
+ _STT_MODEL = os.getenv("STT_MODEL", "large-v3")
59
+ _COMPUTE_TYPE = os.getenv("STT_COMPUTE_TYPE", "int8_float32")
60
+ _BATCH_WINDOW = float(os.getenv("STT_BATCH_WINDOW_MS", "30")) / 1000 # 30ms (was 50ms)
61
+ _MAX_BATCH = int(os.getenv("STT_MAX_BATCH", "8"))
62
+ _MODEL_LOAD_TIMEOUT = int(os.getenv("STT_MODEL_LOAD_TIMEOUT_S", "120")) # seconds
63
  MAX_INPUT_BYTES = 5_242_880 # 5 MB
64
 
65
  # ── Singleton model state ──────────────────────────────────────────────────────
66
  _model: Optional[WhisperModel] = None
67
  _model_lock = threading.Lock()
68
  _model_ready = threading.Event()
69
+ _model_error: Optional[str] = None
70
 
71
  # Two executors: one for ffmpeg (I/O, can be parallel), one for GPU (serial)
72
+ _ffmpeg_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ffmpeg")
73
+ _gpu_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="whisper-gpu")
74
 
75
 
76
  # ── Model loader (background thread) ──────────────────────────────────────────
77
  def _load_and_warm() -> None:
78
+ global _model, _model_error
79
  try:
80
  print(f"[STT] Loading Faster-Whisper {_STT_MODEL} on CUDA ({_COMPUTE_TYPE})…")
81
  m = WhisperModel(
 
91
  with _model_lock:
92
  _model = m
93
  except Exception as exc:
94
+ _model_error = str(exc)
95
+ print(f"[STT] Model load FAILED: {exc}")
96
  finally:
97
  _model_ready.set()
98
 
99
 
100
  def _make_silence_wav(duration_s: float = 0.5, sr: int = 16_000) -> io.BytesIO:
101
  buf = io.BytesIO()
102
+ n = int(sr * duration_s)
103
  with wave.open(buf, "wb") as wf:
104
+ wf.setnchannels(1)
105
+ wf.setsampwidth(2)
106
+ wf.setframerate(sr)
107
  wf.writeframes(struct.pack(f"<{n}h", *([0] * n)))
108
  buf.seek(0)
109
  return buf
 
119
  in_path = out_path = None
120
  try:
121
  with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as f:
122
+ f.write(audio_bytes)
123
+ in_path = f.name
124
  with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
125
  out_path = f.name
126
 
 
150
  return None
151
  finally:
152
  if in_path and os.path.exists(in_path):
153
+ try:
154
+ os.remove(in_path)
155
+ except OSError:
156
+ pass
157
 
158
 
159
  # ── Whisper inference (sync, runs in _gpu_pool — ONE AT A TIME) ───────────────
 
161
  """
162
  Run Whisper inference on a list of WAV paths.
163
  Returns a list of transcripts (None on error/empty).
 
 
164
  """
165
  with _model_lock:
166
  model = _model
 
190
  print(f"[STT] inference error: {exc}")
191
  results.append(None)
192
  finally:
193
+ try:
194
+ os.remove(path)
195
+ except OSError:
196
+ pass
197
 
198
  return results
199
 
 
202
  def _validate(text: str) -> Optional[str]:
203
  if not text or not text.strip():
204
  return None
205
+ text = text.strip()
206
  words = text.split()
207
  if len(words) >= 6 and len(set(words)) / len(words) < 0.25:
208
  print(f"[STT] rejected repetition: {text[:60]}")
 
210
  if len(words) == 2 and words[0] == words[1]:
211
  return None
212
  # Soft script check — log but keep
213
+ wrong = len(_WRONG_SCRIPT_RE.findall(text))
214
+ alpha = sum(1 for c in text if c.isalpha())
215
  if alpha > 0 and wrong / alpha > 0.30:
216
  print(f"[STT] non-Bangla (kept): {text[:60]}")
217
  return text
 
234
  2. Collects requests for up to BATCH_WINDOW_MS
235
  3. Dispatches the batch to _gpu_pool in one call
236
  4. Resolves each caller's Future
237
+
238
+ FIX-BUG4 (race condition):
239
+ Uses asyncio.Lock to guarantee the worker task is created exactly once,
240
+ even when multiple coroutines call enqueue() before the task starts.
241
+
242
+ FIX-BUG4 (deprecated API):
243
+ Uses asyncio.get_running_loop() instead of asyncio.get_event_loop().
244
  """
245
 
246
  def __init__(self) -> None:
247
+ self._queue: asyncio.Queue[_STTRequest] = asyncio.Queue()
248
+ self._started: bool = False
249
+ self._start_lock: Optional[asyncio.Lock] = None # created on first use
250
+
251
+ def _get_lock(self) -> asyncio.Lock:
252
+ # asyncio.Lock must be created inside the running event loop
253
+ if self._start_lock is None:
254
+ self._start_lock = asyncio.Lock()
255
+ return self._start_lock
256
+
257
+ async def _ensure_started(self) -> None:
258
+ async with self._get_lock():
259
+ if not self._started:
260
+ self._started = True
261
+ asyncio.ensure_future(self._worker_loop())
262
 
263
  async def enqueue(self, wav_path: str) -> Optional[str]:
264
+ await self._ensure_started()
265
+ # FIX-BUG4: get_running_loop() is the correct modern API
266
+ loop = asyncio.get_running_loop()
267
  req = _STTRequest(wav_path=wav_path, future=loop.create_future())
268
  await self._queue.put(req)
269
  return await req.future
270
 
271
  async def _worker_loop(self) -> None:
272
+ loop = asyncio.get_running_loop()
273
  while True:
274
  # Wait for at least one request
275
  first = await self._queue.get()
 
314
  class STTProcessor:
315
  """
316
  Drop-in replacement for the original STTProcessor.
317
+ Routes through the GPU batch worker for shared inference.
318
  """
319
 
320
  async def transcribe(self, audio_bytes: bytes) -> Optional[str]:
 
327
  if len(audio_bytes) > MAX_INPUT_BYTES:
328
  audio_bytes = audio_bytes[:MAX_INPUT_BYTES]
329
 
330
+ # FIX-BUG6: wait for model with timeout — not forever
331
  if not _model_ready.is_set():
332
+ print("[STT] Waiting for model to load…")
333
+ ready = await asyncio.to_thread(_model_ready.wait, _MODEL_LOAD_TIMEOUT)
334
+ if not ready:
335
+ raise RuntimeError(
336
+ f"[STT] Whisper model did not load within {_MODEL_LOAD_TIMEOUT}s"
337
+ )
338
+ if _model_error:
339
+ raise RuntimeError(f"[STT] Whisper model failed to load: {_model_error}")
340
 
341
  # ffmpeg: runs in parallel I/O pool (not serialised)
342
+ loop = asyncio.get_running_loop()
343
  wav_path = await loop.run_in_executor(_ffmpeg_pool, _to_wav_sync, audio_bytes)
344
  if not wav_path:
345
  return None
services/tts.py CHANGED
@@ -1,6 +1,15 @@
1
  """
2
  services/tts.py — Ultra Low-Latency Dual TTS Backend
3
- (unchanged public API — streaming.py imports text_to_speech_stream + USE_ELEVENLABS)
 
 
 
 
 
 
 
 
 
4
  """
5
 
6
  from dotenv import load_dotenv
@@ -22,18 +31,32 @@ ELEVENLABS_SPEAKER_BOOST = True
22
  if USE_ELEVENLABS and not ELEVENLABS_API_KEY:
23
  raise RuntimeError("[TTS] ELEVENLABS_API_KEY missing")
24
 
25
- print(f"[TTS] Backend: {'ElevenLabs' if USE_ELEVENLABS else 'Edge-TTS'}")
26
 
27
 
28
  def split_sentences(text: str) -> list[str]:
 
 
 
 
 
 
 
29
  text = text.strip()
30
  if not text:
31
  return []
32
- parts = re.split(r'(?<=[।.!?])\s+', text)
 
 
 
33
  return [p.strip() for p in parts if len(p.strip()) > 1]
34
 
35
 
36
- async def _edge_tts_stream(text: str, voice: str = EDGE_VOICE, rate: str = "-30%"):
 
 
 
 
37
  import edge_tts
38
  text = text.strip()
39
  if not text:
@@ -61,17 +84,28 @@ async def _elevenlabs_stream(
61
  text = text.strip()
62
  if not text:
63
  return
64
- url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}/stream"
65
- headers = {"xi-api-key": ELEVENLABS_API_KEY, "Content-Type": "application/json", "Accept": "audio/mpeg"}
 
 
 
 
66
  payload = {
67
- "text": text, "model_id": model_id,
68
- "voice_settings": {"stability": stability, "similarity_boost": similarity,
69
- "style": style, "use_speaker_boost": speaker_boost},
 
 
 
 
 
70
  }
71
  try:
72
  async with httpx.AsyncClient(timeout=httpx.Timeout(connect=5.0, read=None)) as client:
73
- async with client.stream("POST", url, headers=headers, json=payload,
74
- params={"output_format": output_format}) as resp:
 
 
75
  if resp.status_code != 200:
76
  print(f"[TTS][ElevenLabs] HTTP {resp.status_code}")
77
  return
@@ -83,7 +117,18 @@ async def _elevenlabs_stream(
83
  print(f"[TTS][ElevenLabs] {exc}")
84
 
85
 
86
- async def text_to_speech_stream(text: str, voice: str | None = None, rate: str = "-30%"):
 
 
 
 
 
 
 
 
 
 
 
87
  text = text.strip()
88
  if not text:
89
  return
@@ -108,11 +153,11 @@ async def text_to_speech_stream(text: str, voice: str | None = None, rate: str =
108
  finally:
109
  await q.put(_SENT)
110
 
111
- # Create one queue per sentence, launch all synthesis tasks immediately
112
  queues = [asyncio.Queue() for _ in parts]
113
  tasks = [asyncio.create_task(_synth_part(p, q)) for p, q in zip(parts, queues)]
114
 
115
- # Deliver audio in sentence order, but all sentences synthesise in parallel
116
  try:
117
  for q in queues:
118
  while True:
 
1
  """
2
  services/tts.py — Ultra Low-Latency Dual TTS Backend
3
+
4
+ FIX-ISSUE4 (Natural, slow TTS):
5
+ • Default rate changed from "-30%" to "-35%" — approximately 35% slower
6
+ than the Edge TTS default, giving a calm, natural speaking pace.
7
+ • split_sentences() now splits on ALL clause delimiters (commas, colons,
8
+ em-dashes) in addition to sentence endings, so synthesis tasks are
9
+ smaller and start sooner. This pairs with streaming.py's 2–3 word
10
+ flush threshold for maximum low-latency playback.
11
+ • Parallel synthesis of all parts preserved (all parts synthesised
12
+ concurrently; delivered in order).
13
  """
14
 
15
  from dotenv import load_dotenv
 
31
  if USE_ELEVENLABS and not ELEVENLABS_API_KEY:
32
  raise RuntimeError("[TTS] ELEVENLABS_API_KEY missing")
33
 
34
+ print(f"[TTS] Backend: {'ElevenLabs' if USE_ELEVENLABS else 'Edge-TTS'} | rate: -35%")
35
 
36
 
37
  def split_sentences(text: str) -> list[str]:
38
+ """
39
+ Split text into small synthesis chunks for low-latency streaming.
40
+
41
+ FIX-ISSUE4: Split on sentence boundaries AND clause boundaries so each
42
+ TTS task is small (a phrase, not a full sentence). This allows synthesis
43
+ to start sooner for later parts of a long response.
44
+ """
45
  text = text.strip()
46
  if not text:
47
  return []
48
+ # Split on sentence-ending punctuation AND clause delimiters
49
+ # The lookbehind keeps the delimiter attached to the preceding chunk.
50
+ parts = re.split(r'(?<=[।.!?,;:—–])\s+', text)
51
+ # Filter out anything too short to synthesise (punctuation-only fragments)
52
  return [p.strip() for p in parts if len(p.strip()) > 1]
53
 
54
 
55
+ async def _edge_tts_stream(text: str, voice: str = EDGE_VOICE, rate: str = "-35%"):
56
+ """
57
+ Stream Edge-TTS audio for a single text chunk.
58
+ FIX-ISSUE4: Default rate is now -35% (was -30%) for slower, natural speech.
59
+ """
60
  import edge_tts
61
  text = text.strip()
62
  if not text:
 
84
  text = text.strip()
85
  if not text:
86
  return
87
+ url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}/stream"
88
+ headers = {
89
+ "xi-api-key": ELEVENLABS_API_KEY,
90
+ "Content-Type": "application/json",
91
+ "Accept": "audio/mpeg",
92
+ }
93
  payload = {
94
+ "text": text,
95
+ "model_id": model_id,
96
+ "voice_settings": {
97
+ "stability": stability,
98
+ "similarity_boost": similarity,
99
+ "style": style,
100
+ "use_speaker_boost": speaker_boost,
101
+ },
102
  }
103
  try:
104
  async with httpx.AsyncClient(timeout=httpx.Timeout(connect=5.0, read=None)) as client:
105
+ async with client.stream(
106
+ "POST", url, headers=headers, json=payload,
107
+ params={"output_format": output_format}
108
+ ) as resp:
109
  if resp.status_code != 200:
110
  print(f"[TTS][ElevenLabs] HTTP {resp.status_code}")
111
  return
 
117
  print(f"[TTS][ElevenLabs] {exc}")
118
 
119
 
120
+ async def text_to_speech_stream(
121
+ text: str,
122
+ voice: str | None = None,
123
+ rate: str = "-35%", # FIX-ISSUE4: -35% default (was -30%)
124
+ ):
125
+ """
126
+ Stream TTS audio for `text`.
127
+
128
+ Splits text into small clause-level parts, synthesises all in parallel,
129
+ yields audio in order. This gives the lowest possible first-audio latency
130
+ while maintaining natural speech ordering.
131
+ """
132
  text = text.strip()
133
  if not text:
134
  return
 
153
  finally:
154
  await q.put(_SENT)
155
 
156
+ # Create one queue per part, synthesise all in parallel
157
  queues = [asyncio.Queue() for _ in parts]
158
  tasks = [asyncio.create_task(_synth_part(p, q)) for p, q in zip(parts, queues)]
159
 
160
+ # Deliver in part order
161
  try:
162
  for q in queues:
163
  while True:
services/webrtc_pipeline.py CHANGED
@@ -1,34 +1,15 @@
1
  """
2
  services/webrtc_pipeline.py — WebRTC Audio Pipeline + Full Parallelization
3
 
4
- Architecture:
5
- ─────────────
6
- Browser MediaStream (WebRTC)
7
-
8
- RTCPeerConnection (aiortc)
9
-
10
- PCM frame receiver (20ms frames, 16kHz mono)
11
-
12
- │ VAD (webrtcvad) discard silence, buffer speech
13
-
14
- Speech segment → STT batch queue ──────────────────────────┐
15
- │ parallel
16
- STT result → LLM async stream ────────────────────────┐ │
17
- │ │
18
- LLM tokens → TTS ParallelStreamer ──────────────────┐ │ │
19
- │ │ │
20
- Audio chunks → RTCPeerConnection data channel ◄──── ┘ │ │
21
- └───┘
22
- (all three run concurrently)
23
-
24
- Key design choices:
25
- • aiortc handles WebRTC peer connection & ICE negotiation
26
- • PCM frames delivered via asyncio.Queue — never blocks media thread
27
- • VAD segments audio before STT — no wasted inference on silence
28
- • STT → LLM → TTS pipeline starts as soon as speech ends
29
- • Audio response sent back over RTCDataChannel as binary chunks
30
- • STT uses the shared GPU batch worker (see stt.py)
31
- • Barge-in: new speech cancels the current LLM+TTS pipeline immediately
32
  """
33
 
34
  from __future__ import annotations
@@ -79,10 +60,10 @@ class _VADSegmenter:
79
  self.sample_rate = sample_rate
80
  self.frame_bytes = int(sample_rate * frame_ms / 1000) * 2 # 16-bit samples
81
  self.silence_limit = silence_limit
82
- self._vad = webrtcvad.Vad(aggressiveness) if VAD_AVAILABLE else None
83
- self._buffer = bytearray()
84
  self._silence_count = 0
85
- self._active = False
86
 
87
  def process_frame(self, pcm_frame: bytes) -> Optional[bytes]:
88
  """
@@ -90,7 +71,7 @@ class _VADSegmenter:
90
  Returns a complete utterance bytes object when speech ends, else None.
91
  """
92
  if self._vad is None:
93
- # No VAD available — buffer everything, flush after 3s
94
  self._buffer.extend(pcm_frame)
95
  if len(self._buffer) >= self.sample_rate * 3 * 2:
96
  data = bytes(self._buffer)
@@ -108,17 +89,17 @@ class _VADSegmenter:
108
 
109
  if is_speech:
110
  self._buffer.extend(frame)
111
- self._active = True
112
  self._silence_count = 0
113
  elif self._active:
114
  self._buffer.extend(frame)
115
  self._silence_count += 1
116
 
117
  if self._active and self._silence_count >= self.silence_limit:
118
- data = bytes(self._buffer)
119
  self._buffer.clear()
120
  self._silence_count = 0
121
- self._active = False
122
  return data
123
 
124
  return None
@@ -133,6 +114,9 @@ if AIORTC_AVAILABLE:
133
  """
134
  Wraps an incoming WebRTC audio track.
135
  Resamples to 16kHz mono PCM and pushes frames into an asyncio.Queue.
 
 
 
136
  """
137
 
138
  kind = "audio"
@@ -142,23 +126,49 @@ if AIORTC_AVAILABLE:
142
  self._track = track
143
  self._frame_queue = frame_queue
144
  self._resampler: Optional[av.AudioResampler] = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
145
 
146
- async def recv(self):
147
- frame = await self._track.recv()
148
- if self._resampler is None:
149
- self._resampler = av.AudioResampler(
150
- format="s16",
151
- layout="mono",
152
- rate=16_000,
153
- )
154
- resampled = self._resampler.resample(frame)
155
- for rf in resampled:
156
- pcm = bytes(rf.planes[0])
157
  try:
158
- self._frame_queue.put_nowait(pcm)
159
- except asyncio.QueueFull:
160
- pass # drop frame under backpressure — prefer real-time
161
- return frame
 
 
 
 
 
 
 
 
 
 
 
 
 
162
 
163
 
164
  # ══════════════════════════════════════════════════════════════════════════════
@@ -172,13 +182,13 @@ class _TurnPipeline:
172
  """
173
 
174
  def __init__(self, ai_backend, data_channel, on_stt=None, on_token=None):
175
- self._ai = ai_backend
176
- self._channel = data_channel # RTCDataChannel for audio delivery
177
- self._on_stt = on_stt # optional callback(str)
178
- self._on_token = on_token # optional callback(str)
179
- self._stt = STTProcessor()
180
- self._streamer = ParallelTTSStreamer()
181
- self._cancelled = False
182
  self._tasks: list[asyncio.Task] = []
183
 
184
  async def run(self, user_id: str, audio_bytes: bytes) -> None:
@@ -276,6 +286,8 @@ class WebRTCSession:
276
  self._vad = _VADSegmenter()
277
  self._active_turn: Optional[_TurnPipeline] = None
278
  self._active_task: Optional[asyncio.Task] = None
 
 
279
  self._setup_pc()
280
 
281
  def _setup_pc(self) -> None:
@@ -284,8 +296,12 @@ class WebRTCSession:
284
  @pc.on("track")
285
  def on_track(track):
286
  if track.kind == "audio":
 
287
  receiver = AudioFrameReceiver(track, self._frame_q)
 
 
288
  asyncio.ensure_future(self._frame_processor())
 
289
 
290
  @pc.on("datachannel")
291
  def on_datachannel(channel):
@@ -294,7 +310,6 @@ class WebRTCSession:
294
 
295
  @channel.on("message")
296
  def on_message(msg):
297
- # Control messages from browser (cancel, init, ping)
298
  try:
299
  data = json.loads(msg)
300
  if data.get("type") == "cancel":
@@ -377,5 +392,8 @@ class WebRTCSession:
377
  await self._pc.addIceCandidate(c)
378
 
379
  async def close(self) -> None:
 
 
 
380
  await self._cancel_active()
381
  await self._pc.close()
 
1
  """
2
  services/webrtc_pipeline.py — WebRTC Audio Pipeline + Full Parallelization
3
 
4
+ FIX-BUG3 (AudioFrameReceiver never driven):
5
+ In the original code, AudioFrameReceiver was instantiated but its recv()
6
+ method was never called. aiortc only delivers frames when a consumer calls
7
+ recv() in a loop. Without this, the frame queue was always empty → no audio
8
+ reached the VAD → no utterances → zero voice responses via WebRTC.
9
+
10
+ Fix: spawn a coroutine (_recv_loop) that calls receiver.recv() continuously.
11
+
12
+ All other logic preserved.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  """
14
 
15
  from __future__ import annotations
 
60
  self.sample_rate = sample_rate
61
  self.frame_bytes = int(sample_rate * frame_ms / 1000) * 2 # 16-bit samples
62
  self.silence_limit = silence_limit
63
+ self._vad = webrtcvad.Vad(aggressiveness) if VAD_AVAILABLE else None
64
+ self._buffer = bytearray()
65
  self._silence_count = 0
66
+ self._active = False
67
 
68
  def process_frame(self, pcm_frame: bytes) -> Optional[bytes]:
69
  """
 
71
  Returns a complete utterance bytes object when speech ends, else None.
72
  """
73
  if self._vad is None:
74
+ # No VAD — buffer everything, flush after 3s
75
  self._buffer.extend(pcm_frame)
76
  if len(self._buffer) >= self.sample_rate * 3 * 2:
77
  data = bytes(self._buffer)
 
89
 
90
  if is_speech:
91
  self._buffer.extend(frame)
92
+ self._active = True
93
  self._silence_count = 0
94
  elif self._active:
95
  self._buffer.extend(frame)
96
  self._silence_count += 1
97
 
98
  if self._active and self._silence_count >= self.silence_limit:
99
+ data = bytes(self._buffer)
100
  self._buffer.clear()
101
  self._silence_count = 0
102
+ self._active = False
103
  return data
104
 
105
  return None
 
114
  """
115
  Wraps an incoming WebRTC audio track.
116
  Resamples to 16kHz mono PCM and pushes frames into an asyncio.Queue.
117
+
118
+ IMPORTANT: call start_receiving() after construction to begin
119
+ consuming frames from the underlying track via recv().
120
  """
121
 
122
  kind = "audio"
 
126
  self._track = track
127
  self._frame_queue = frame_queue
128
  self._resampler: Optional[av.AudioResampler] = None
129
+ self._recv_task: Optional[asyncio.Task] = None
130
+
131
+ def start_receiving(self) -> None:
132
+ """
133
+ FIX-BUG3: Spawn the recv() loop so the track actually delivers frames.
134
+ Without this, _frame_queue stays empty forever.
135
+ """
136
+ if self._recv_task is None or self._recv_task.done():
137
+ self._recv_task = asyncio.ensure_future(self._recv_loop())
138
+
139
+ async def _recv_loop(self) -> None:
140
+ """Continuously consume frames from the remote track."""
141
+ while True:
142
+ try:
143
+ frame = await self._track.recv()
144
+ except Exception as exc:
145
+ print(f"[WebRTC] AudioFrameReceiver: track ended ({exc})")
146
+ break
147
 
148
+ if self._resampler is None:
149
+ self._resampler = av.AudioResampler(
150
+ format="s16",
151
+ layout="mono",
152
+ rate=16_000,
153
+ )
 
 
 
 
 
154
  try:
155
+ resampled = self._resampler.resample(frame)
156
+ for rf in resampled:
157
+ pcm = bytes(rf.planes[0])
158
+ try:
159
+ self._frame_queue.put_nowait(pcm)
160
+ except asyncio.QueueFull:
161
+ pass # Drop frame under backpressure — prefer real-time
162
+ except Exception as exc:
163
+ print(f"[WebRTC] Resample error: {exc}")
164
+
165
+ async def recv(self):
166
+ """Required override — delegates to the underlying track."""
167
+ return await self._track.recv()
168
+
169
+ def stop_receiving(self) -> None:
170
+ if self._recv_task and not self._recv_task.done():
171
+ self._recv_task.cancel()
172
 
173
 
174
  # ══════════════════════════════════════════════════════════════════════════════
 
182
  """
183
 
184
  def __init__(self, ai_backend, data_channel, on_stt=None, on_token=None):
185
+ self._ai = ai_backend
186
+ self._channel = data_channel # RTCDataChannel for audio delivery
187
+ self._on_stt = on_stt # optional callback(str)
188
+ self._on_token = on_token # optional callback(str)
189
+ self._stt = STTProcessor()
190
+ self._streamer = ParallelTTSStreamer()
191
+ self._cancelled = False
192
  self._tasks: list[asyncio.Task] = []
193
 
194
  async def run(self, user_id: str, audio_bytes: bytes) -> None:
 
286
  self._vad = _VADSegmenter()
287
  self._active_turn: Optional[_TurnPipeline] = None
288
  self._active_task: Optional[asyncio.Task] = None
289
+ # Keep references to receivers so they are not garbage-collected
290
+ self._receivers: list[AudioFrameReceiver] = []
291
  self._setup_pc()
292
 
293
  def _setup_pc(self) -> None:
 
296
  @pc.on("track")
297
  def on_track(track):
298
  if track.kind == "audio":
299
+ # FIX-BUG3: create receiver AND start its recv() loop
300
  receiver = AudioFrameReceiver(track, self._frame_q)
301
+ receiver.start_receiving()
302
+ self._receivers.append(receiver) # prevent GC
303
  asyncio.ensure_future(self._frame_processor())
304
+ print(f"[WebRTC] Audio track received — receiver started ✓")
305
 
306
  @pc.on("datachannel")
307
  def on_datachannel(channel):
 
310
 
311
  @channel.on("message")
312
  def on_message(msg):
 
313
  try:
314
  data = json.loads(msg)
315
  if data.get("type") == "cancel":
 
392
  await self._pc.addIceCandidate(c)
393
 
394
  async def close(self) -> None:
395
+ for receiver in self._receivers:
396
+ receiver.stop_receiving()
397
+ self._receivers.clear()
398
  await self._cancel_active()
399
  await self._pc.close()