Arnavkumar01 committed on
Commit
6ee9d08
·
1 Parent(s): ccd9d86

Changes made in requirements.txt, dockerfile and main.py to handle twilio websocket connection

Browse files
Files changed (3) hide show
  1. Dockerfile +12 -11
  2. main.py +319 -99
  3. requirements.txt +1 -0
Dockerfile CHANGED
@@ -1,29 +1,30 @@
1
  # 1. Start with a lean and official Python base image
2
  FROM python:3.10-slim
3
 
4
- # Install dependencies needed for psycopg2
5
- RUN apt-get update && apt-get install -y libpq-dev && rm -rf /var/lib/apt/lists/*
6
 
7
  # 2. Set the working directory inside the container
8
  WORKDIR /app
9
 
10
- # 3. Create a non-root user and set up the cache
11
  RUN useradd -m -u 1000 user
12
  RUN mkdir -p /app/.cache && chown -R user:user /app/.cache
13
  ENV HF_HOME="/app/.cache"
14
  USER user
15
 
16
- # Add the local bin directory to the PATH (good practice)
17
  ENV PATH="/home/user/.local/bin:${PATH}"
18
 
19
- # 4. Copy and install requirements
20
  COPY --chown=user:user requirements.txt .
21
  RUN pip install --no-cache-dir -r requirements.txt
22
 
23
- # 5. Copy your application code
24
- COPY --chown=user:user main.py .
25
 
26
- # 6. --- START OF FIX ---
27
- # Define the command to run your application using the full path to gunicorn
28
- CMD /home/user/.local/bin/gunicorn --bind 0.0.0.0:7860 --workers 1 --worker-class uvicorn.workers.UvicornWorker main:app
29
- # --- END OF FIX ---
 
 
1
# 1. Start with a lean and official Python base image
FROM python:3.10-slim

# Install dependencies for psycopg2 and audio processing
RUN apt-get update && apt-get install -y libpq-dev ffmpeg && rm -rf /var/lib/apt/lists/*

# 2. Set the working directory inside the container
WORKDIR /app

# 3. Create a non-root user and set up cache
RUN useradd -m -u 1000 user
RUN mkdir -p /app/.cache && chown -R user:user /app/.cache
ENV HF_HOME="/app/.cache"
USER user

# Add local bin directory to PATH
ENV PATH="/home/user/.local/bin:${PATH}"

# Ensure Python writes logs straight to the container log stream
# (without this, stdout is block-buffered and log lines appear late or
# not at all when the process is killed).
ENV PYTHONUNBUFFERED=1

# 4. Copy and install dependencies
COPY --chown=user:user requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 5. Copy the app source code
COPY --chown=user:user . .

# 6. Expose the port used by Hugging Face Spaces
EXPOSE 7860

# 7. Run the FastAPI app using Uvicorn (better for WebSockets)
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7860"]
main.py CHANGED
@@ -4,8 +4,9 @@ import logging
4
  import json
5
  import re
6
  from contextlib import asynccontextmanager
7
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, status, Depends, Header, HTTPException
8
- from fastapi.concurrency import run_in_threadpool # Import for handling blocking calls
 
9
  from pydantic import BaseModel
10
  from dotenv import load_dotenv
11
  from openai import OpenAI
@@ -14,8 +15,14 @@ from langchain_huggingface import HuggingFaceEmbeddings
14
  from langchain_postgres.vectorstores import PGVector
15
  from sqlalchemy import create_engine
16
 
 
 
 
 
 
 
 
17
  # --- SETUP ---
18
- # Suppress noisy logs from underlying libraries
19
  os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
20
  logging.getLogger('tensorflow').setLevel(logging.ERROR)
21
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -42,15 +49,24 @@ TABLE_DESCRIPTIONS = """
42
  - "feedback_source": Customer feedback and ratings for projects.
43
  """
44
 
 
 
 
 
 
 
45
  # --- GLOBAL VARIABLES FOR LIFESPAN ---
46
- # These will be populated at startup
47
  embeddings = None
48
  vector_store = None
49
 
50
- # --- FASTAPI LIFESPAN MANAGEMENT ---
 
 
 
 
 
51
  @asynccontextmanager
52
  async def lifespan(app: FastAPI):
53
- # This code runs on startup
54
  global embeddings, vector_store
55
  logging.info(f"Initializing embedding model: '{EMBEDDING_MODEL}'...")
56
  embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
@@ -61,17 +77,15 @@ async def lifespan(app: FastAPI):
61
  vector_store = PGVector(
62
  connection=engine,
63
  collection_name=COLLECTION_NAME,
64
- embeddings=embeddings, # <-- CRITICAL FIX: Corrected parameter name
65
  )
66
  logging.info("Successfully connected to the vector store.")
67
  yield
68
- # This code would run on shutdown (if needed)
69
  logging.info("Application shutting down.")
70
 
71
- # --- INITIALIZE FastAPI APP WITH LIFESPAN ---
 
72
  app = FastAPI(lifespan=lifespan)
73
- client_openai = OpenAI(api_key=OPENAI_API_KEY)
74
- client_elevenlabs = ElevenLabs(api_key=ELEVENLABS_API_KEY)
75
 
76
 
77
  # --- PROMPTS ---
@@ -87,6 +101,7 @@ You are a query analysis agent. Your task is to transform a user's query into a
87
  4. If no specific status keywords are mentioned (e.g., the user asks generally about projects in a location), set the filter table to null.
88
  5. Respond ONLY with a JSON object containing "search_query" and "filter_table" (which should be the table name string or null).
89
  """
 
90
  ANSWER_SYSTEM_PROMPT = """
91
  You are an expert AI assistant for a premier real estate developer.
92
  ## YOUR PERSONA
@@ -101,127 +116,332 @@ You are an expert AI assistant for a premier real estate developer.
101
  3. **Stay on Topic:** Only answer questions related to real estate.
102
  """
103
 
104
- # --- HELPER FUNCTIONS ---
105
- def transcribe_audio(audio_bytes: bytes) -> str:
106
- """This is a blocking function."""
 
107
  try:
108
- with open("temp_audio.wav", "wb") as f: f.write(audio_bytes)
109
- with open("temp_audio.wav", "rb") as audio_file:
110
- transcript = client_openai.audio.transcriptions.create(model="whisper-1", file=audio_file)
111
- return transcript.text
 
 
 
 
112
  except Exception as e:
113
- logging.error(f"Error during transcription: {e}")
114
- return ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
  async def formulate_search_plan(user_query: str) -> dict:
117
  logging.info("Formulating search plan with Planner LLM...")
118
- try:
119
- response = client_openai.chat.completions.create( # This can be async if using an async client
120
- model=PLANNER_MODEL,
121
- messages=[{"role": "user", "content": QUERY_FORMULATION_PROMPT.format(user_query=user_query)}],
122
- response_format={"type": "json_object"},
123
- temperature=0.0
124
- )
125
- plan = json.loads(response.choices[0].message.content)
126
- logging.info(f"Search plan received: {plan}")
127
- return plan
128
- except Exception as e:
129
- logging.error(f"Error in Planner LLM call: {e}")
130
- return {"search_query": user_query, "filter_table": None}
 
 
 
131
 
132
  async def get_agent_response(user_text: str) -> str:
133
- """Runs the full RAG and generation logic for a given text query."""
134
- search_plan = await formulate_search_plan(user_text)
135
- search_query = search_plan.get("search_query", user_text)
136
- filter_table = search_plan.get("filter_table")
137
-
138
- # --- START OF MODIFICATION ---
139
- search_filter = {"source_table": filter_table} if filter_table else {}
140
- if search_filter:
141
- logging.info(f"Applying initial filter: {search_filter}")
142
-
143
- # First attempt: A specific, filtered search
144
- retrieved_docs = vector_store.similarity_search(search_query, k=3, filter=search_filter)
145
-
146
- # If the first attempt finds nothing, try a broader search
147
- if not retrieved_docs:
148
- logging.info("Initial search returned no results. Performing a broader fallback search.")
149
- retrieved_docs = vector_store.similarity_search(search_query, k=3) # No filter this time
150
- # --- END OF MODIFICATION ---
151
-
152
- context_text = "\n\n".join([doc.page_content for doc in retrieved_docs])
153
- logging.info(f"Retrieved Context: {context_text[:500]}...")
154
-
155
- final_prompt_messages = [
156
- {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
157
- {"role": "system", "content": f"Use the following CONTEXT to answer:\n{context_text}"},
158
- {"role": "user", "content": f"My original question was: '{user_text}'"}
159
- ]
160
- final_response = client_openai.chat.completions.create(
161
- model=ANSWERER_MODEL,
162
- messages=final_prompt_messages
163
- )
164
- return final_response.choices[0].message.content
165
 
166
- # --- Add this new function ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
167
  async def verify_token(x_auth_token: str = Header(...)):
168
- """A dependency to verify the shared secret token."""
169
  if not SHARED_SECRET or x_auth_token != SHARED_SECRET:
170
  logging.warning("Authentication failed for /test-text-query.")
171
  raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or missing authentication token")
172
  logging.info("Authentication successful for /test-text-query.")
173
 
 
174
  # --- API Endpoints ---
175
  class TextQuery(BaseModel):
176
  query: str
177
 
 
178
  @app.post("/test-text-query", dependencies=[Depends(verify_token)])
179
  async def test_text_query_endpoint(query: TextQuery):
180
- """Endpoint for text-based testing via Swagger UI."""
181
  logging.info(f"Received text query: {query.query}")
182
  response_text = await get_agent_response(query.query)
183
  logging.info(f"Generated text response: {response_text}")
184
  return {"response": response_text}
185
 
 
 
186
  @app.websocket("/listen")
187
  async def websocket_endpoint(websocket: WebSocket):
188
- auth_token = websocket.headers.get("x-auth-token")
189
- if not SHARED_SECRET or auth_token != SHARED_SECRET:
190
- logging.warning(f"Authentication failed. Closing connection.")
191
- await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
192
- return
193
-
194
  await websocket.accept()
195
- logging.info("Authentication successful. Call connected.")
 
 
196
  try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
  while True:
198
- message = await websocket.receive_json()
199
- audio_base64 = message.get('audio')
200
- if not audio_base64: continue
 
201
 
202
- # PERFORMANCE FIX: Run blocking transcription in a separate thread
203
- user_text = await run_in_threadpool(
204
- transcribe_audio, base64.b64decode(audio_base64)
205
- )
206
- logging.info(f"User said: {user_text}")
207
- if not user_text.strip(): continue
208
-
209
- agent_response_text = await get_agent_response(user_text)
210
- logging.info(f"AI Responded: {agent_response_text}")
211
-
212
- # PERFORMANCE FIX: Run blocking audio generation in a separate thread
213
- audio_output = await run_in_threadpool(
214
- client_elevenlabs.generate,
215
- text=agent_response_text,
216
- voice=ELEVENLABS_VOICE_NAME,
217
- model="eleven_multilingual_v2"
218
- )
219
- response_audio_base64 = base64.b64encode(audio_output).decode('utf-8')
220
- await websocket.send_json({'audio': response_audio_base64})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
221
 
222
  except WebSocketDisconnect:
223
- logging.info("Call disconnected.")
224
  except Exception as e:
225
  logging.error(f"An error occurred in the main loop: {e}", exc_info=True)
226
  finally:
227
- await websocket.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
  import json
5
  import re
6
  from contextlib import asynccontextmanager
7
+ from typing import Optional
8
+ from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status, Depends, Header, HTTPException
9
+ from fastapi.concurrency import run_in_threadpool
10
  from pydantic import BaseModel
11
  from dotenv import load_dotenv
12
  from openai import OpenAI
 
15
  from langchain_postgres.vectorstores import PGVector
16
  from sqlalchemy import create_engine
17
 
18
+ # --- NEW IMPORTS FOR TWILIO INTEGRATION ---
19
+ import asyncio
20
+ import audioop
21
+ import wave
22
+ import io
23
+ from pydub import AudioSegment
24
+
25
  # --- SETUP ---
 
26
  os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
27
  logging.getLogger('tensorflow').setLevel(logging.ERROR)
28
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
49
  - "feedback_source": Customer feedback and ratings for projects.
50
  """
51
 
52
+ # VAD Configuration
53
+ SILENCE_THRESHOLD = 1000 # RMS threshold for speech detection (tune based on testing)
54
+ MAX_AUDIO_BYTES = 80000 # Max buffer ~10s at 8kHz (prevent overflow)
55
+ # Max loop iterations to avoid infinite loops (safety)
56
+ MAX_LOOP_COUNT = 1200
57
+
58
  # --- GLOBAL VARIABLES FOR LIFESPAN ---
 
59
  embeddings = None
60
  vector_store = None
61
 
62
+ # Initialize clients (will be used after load_dotenv)
63
+ client_openai = OpenAI(api_key=OPENAI_API_KEY)
64
+ client_elevenlabs = ElevenLabs(api_key=ELEVENLABS_API_KEY)
65
+
66
+
67
+ # --- LIFESPAN / STARTUP ---
68
  @asynccontextmanager
69
  async def lifespan(app: FastAPI):
 
70
  global embeddings, vector_store
71
  logging.info(f"Initializing embedding model: '{EMBEDDING_MODEL}'...")
72
  embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
 
77
  vector_store = PGVector(
78
  connection=engine,
79
  collection_name=COLLECTION_NAME,
80
+ embeddings=embeddings,
81
  )
82
  logging.info("Successfully connected to the vector store.")
83
  yield
 
84
  logging.info("Application shutting down.")
85
 
86
+
87
+ # --- FASTAPI APP ---
88
  app = FastAPI(lifespan=lifespan)
 
 
89
 
90
 
91
  # --- PROMPTS ---
 
101
  4. If no specific status keywords are mentioned (e.g., the user asks generally about projects in a location), set the filter table to null.
102
  5. Respond ONLY with a JSON object containing "search_query" and "filter_table" (which should be the table name string or null).
103
  """
104
+
105
  ANSWER_SYSTEM_PROMPT = """
106
  You are an expert AI assistant for a premier real estate developer.
107
  ## YOUR PERSONA
 
116
  3. **Stay on Topic:** Only answer questions related to real estate.
117
  """
118
 
119
+
120
+ # --- HELPER FUNCTIONS (sync helpers executed in threadpool) ---
121
def convert_mulaw_to_wav_bytes(mulaw_bytes: bytes) -> bytes:
    """Wrap raw 8 kHz mu-law audio bytes in an in-memory 16-bit mono WAV file.

    Returns the complete WAV file as bytes, or b'' if conversion fails.
    """
    try:
        # Decode mu-law to 16-bit linear PCM, then write a standard WAV header.
        linear_pcm = audioop.ulaw2lin(mulaw_bytes, 2)
        buffer = io.BytesIO()
        with wave.open(buffer, 'wb') as out:
            out.setnchannels(1)      # mono
            out.setsampwidth(2)      # 16-bit samples
            out.setframerate(8000)   # Twilio telephony sample rate
            out.writeframes(linear_pcm)
        return buffer.getvalue()
    except Exception as e:
        logging.error(f"Error converting mulaw to WAV: {e}", exc_info=True)
        return b''
135
+
136
+
137
def transcribe_audio_sync(audio_wav_bytes: bytes) -> str:
    """Blocking Whisper transcription with up to three attempts.

    Intended to run inside run_in_threadpool. Returns the transcript text,
    transliterated to Roman script when Devanagari characters are detected,
    or "" after three consecutive failures.
    """
    attempts = 3
    for attempt in range(attempts):
        try:
            # The OpenAI client needs a file-like object with a filename.
            wav_stream = io.BytesIO(audio_wav_bytes)
            wav_stream.name = "stream.wav"
            text = client_openai.audio.transcriptions.create(model="whisper-1", file=wav_stream).text

            # Devanagari characters present -> ask the chat model for a
            # Roman-script (Hinglish) rendering of the same text.
            if re.search(r'[\u0900-\u097F]', text):
                translit_prompt = f"Transliterate this Hindi text to Roman script (Hinglish style): {text}"
                reply = client_openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": translit_prompt}],
                    temperature=0.0
                )
                text = reply.choices[0].message.content

            return text
        except Exception as e:
            logging.error(f"Error during transcription (attempt {attempt+1}): {e}", exc_info=True)
            if attempt == attempts - 1:
                return ""
161
+
162
+
163
def convert_audio_to_mulaw_sync(audio_bytes: bytes) -> bytes:
    """Blocking conversion of arbitrary audio bytes to 8 kHz mono mu-law.

    This is the wire format Twilio Media Streams expects. Retries up to three
    times; returns b'' if every attempt fails.
    """
    for attempt in range(3):
        try:
            # Normalise to 8 kHz mono PCM first, then mu-law encode the samples.
            segment = (
                AudioSegment.from_file(io.BytesIO(audio_bytes))
                .set_frame_rate(8000)
                .set_channels(1)
            )
            return audioop.lin2ulaw(segment.raw_data, 2)
        except Exception as e:
            logging.error(f"Error converting audio to mulaw (attempt {attempt+1}): {e}", exc_info=True)
            if attempt == 2:
                return b''
177
+
178
+
179
def generate_elevenlabs_sync(text: str, voice: str, model: str = "eleven_multilingual_v2", output_format: str = "mp3_44100_128") -> bytes:
    """Synchronous ElevenLabs generation wrapper for run_in_threadpool.

    Always returns the synthesized audio as a single ``bytes`` object
    (or ``b''`` after three failed attempts), so callers can safely call
    ``len()`` on it and feed it to pydub.
    """
    for attempt in range(3):
        try:
            # The ElevenLabs client call is synchronous in this codebase
            audio = client_elevenlabs.generate(
                text=text,
                voice=voice,
                model=model,
                output_format=output_format
            )
            # BUG FIX: several ElevenLabs SDK versions stream the result back
            # as an iterator of byte chunks rather than a bytes object.
            # Flatten it here so downstream truthiness checks and audio
            # decoding do not break on a generator.
            if isinstance(audio, (bytes, bytearray)):
                return bytes(audio)
            return b"".join(audio)
        except Exception as e:
            logging.error(f"Error in ElevenLabs generate (attempt {attempt+1}): {e}", exc_info=True)
            if attempt == 2:
                return b''
194
+
195
+
196
+ # --- LLM / RAG helpers (async, but will call sync via threadpool when appropriate) ---
197
async def formulate_search_plan(user_query: str) -> dict:
    """Ask the planner LLM to turn the raw user query into a search plan.

    Returns a dict with "search_query" and "filter_table" keys; on repeated
    failure falls back to the raw query with no table filter.
    """
    logging.info("Formulating search plan with Planner LLM...")
    last_attempt = 2
    for attempt in range(3):
        try:
            planner_messages = [
                {"role": "user", "content": QUERY_FORMULATION_PROMPT.format(user_query=user_query)}
            ]
            response = client_openai.chat.completions.create(
                model=PLANNER_MODEL,
                messages=planner_messages,
                response_format={"type": "json_object"},
                temperature=0.0
            )
            plan = json.loads(response.choices[0].message.content)
            logging.info(f"Search plan received: {plan}")
            return plan
        except Exception as e:
            logging.error(f"Error in Planner LLM call (attempt {attempt+1}): {e}", exc_info=True)
            if attempt == last_attempt:
                # Degraded mode: search with the user's own words, unfiltered.
                return {"search_query": user_query, "filter_table": None}
214
+
215
 
216
async def get_agent_response(user_text: str) -> str:
    """Run the plan -> retrieve -> generate RAG pipeline for one text query.

    Retries the whole pipeline up to three times; returns an apology string
    if every attempt fails.
    """
    for attempt in range(3):
        try:
            # Plan: let the planner LLM pick a refined query and optional table filter.
            plan = await formulate_search_plan(user_text)
            query = plan.get("search_query", user_text)
            table = plan.get("filter_table")

            search_filter = {"source_table": table} if table else {}
            if search_filter:
                logging.info(f"Applying initial filter: {search_filter}")

            # Retrieve: filtered search first, then an unfiltered fallback.
            docs = vector_store.similarity_search(query, k=3, filter=search_filter)
            if not docs:
                logging.info("Initial search returned no results. Performing a broader fallback search.")
                docs = vector_store.similarity_search(query, k=3)

            context_text = "\n\n".join(doc.page_content for doc in docs)
            logging.info(f"Retrieved Context (preview): {context_text[:500]}...")

            # Generate: answer grounded in the retrieved context.
            completion = client_openai.chat.completions.create(
                model=ANSWERER_MODEL,
                messages=[
                    {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                    {"role": "system", "content": f"Use the following CONTEXT to answer:\n{context_text}"},
                    {"role": "user", "content": f"My original question was: '{user_text}'"}
                ]
            )
            return completion.choices[0].message.content
        except Exception as e:
            logging.error(f"Error in get_agent_response (attempt {attempt+1}): {e}", exc_info=True)
            if attempt == 2:
                return "Sorry, I couldn't generate a response. Please try again."
251
+
252
+
253
+ # --- AUTH DEPENDENCY ---
254
async def verify_token(x_auth_token: str = Header(...)):
    """Dependency to verify the shared secret token.

    Raises HTTP 401 when SHARED_SECRET is unset or the header does not match.
    """
    token_ok = bool(SHARED_SECRET) and x_auth_token == SHARED_SECRET
    if not token_ok:
        logging.warning("Authentication failed for /test-text-query.")
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or missing authentication token")
    logging.info("Authentication successful for /test-text-query.")
260
 
261
+
262
  # --- API Endpoints ---
263
class TextQuery(BaseModel):
    """Request body for /test-text-query: a single free-text question."""
    query: str
265
 
266
+
267
@app.post("/test-text-query", dependencies=[Depends(verify_token)])
async def test_text_query_endpoint(query: TextQuery):
    """Text-only testing endpoint (e.g. via Swagger UI); auth enforced by verify_token."""
    logging.info(f"Received text query: {query.query}")
    answer = await get_agent_response(query.query)
    logging.info(f"Generated text response: {answer}")
    return {"response": answer}
273
 
274
+
275
+ # --- WEBHOOK / WEBSOCKET FOR TWILIO STREAMING ---
276
@app.websocket("/listen")
async def websocket_endpoint(websocket: WebSocket):
    """Twilio Media Streams WebSocket endpoint.

    Protocol: the first frame must be a 'start' event carrying the auth token
    and streamSid in customParameters; subsequent 'media' events carry base64
    mu-law audio chunks, and a 'stop' event ends the call. Speech is buffered
    with an RMS-based VAD; a 1 s receive timeout is treated as end-of-utterance
    and triggers process_audio_buffer(), which replies with synthesized audio.
    """
    await websocket.accept()
    logging.info("WebSocket connection accepted from Twilio.")
    stream_sid: Optional[str] = None

    try:
        first_message = await websocket.receive_json()
        event = first_message.get("event")

        if event != "start":
            logging.error("Expected 'start' message. Closing.")
            await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
            return

        start_data = first_message.get("start", {})
        custom_params = start_data.get("customParameters", {})
        if not custom_params:
            logging.error("Missing customParameters in start event. Closing.")
            await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
            return

        auth_token = custom_params.get("x-auth-token")
        stream_sid = start_data.get("streamSid")

        if not SHARED_SECRET or auth_token != SHARED_SECRET:
            logging.warning("Authentication failed. Invalid token. Closing connection.")
            await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
            return

        logging.info(f"Authentication successful. Stream SID: {stream_sid}")
        logging.debug(f"Full start message: {first_message}")

        # Main loop with VAD using timeout + RMS threshold
        accumulated_audio_mulaw = b''
        # BUG FIX: the counter previously incremented on EVERY received
        # message. Twilio sends a media frame roughly every 20 ms, so
        # MAX_LOOP_COUNT (1200) was reached after ~24 seconds of audio and
        # the call was dropped mid-conversation. Count only idle (timed-out)
        # iterations and reset on traffic, so the guard now means
        # "MAX_LOOP_COUNT seconds of continuous silence" — matching its
        # stated hang-prevention intent.
        idle_counter = 0

        while True:
            if idle_counter > MAX_LOOP_COUNT:
                logging.info("Max loop count reached. Exiting to prevent hang.")
                break

            try:
                message_str = await asyncio.wait_for(websocket.receive_text(), timeout=1.0)
                idle_counter = 0  # real traffic arrived; reset the idle guard
                message = json.loads(message_str)
                event = message.get("event")

                if event == "media":
                    payload = message['media']['payload']
                    mulaw_chunk = base64.b64decode(payload)

                    # Compute RMS to avoid buffering pure silence / static
                    try:
                        pcm_chunk = audioop.ulaw2lin(mulaw_chunk, 2)
                        rms = audioop.rms(pcm_chunk, 2)
                    except Exception as e:
                        logging.debug(f"Could not compute RMS on chunk: {e}")
                        rms = 0

                    if rms > SILENCE_THRESHOLD:
                        accumulated_audio_mulaw += mulaw_chunk
                        logging.debug(f"Buffered audio chunk; RMS={rms}, total_bytes={len(accumulated_audio_mulaw)}")
                    else:
                        logging.debug(f"Ignored low-energy chunk; RMS={rms}")

                    # Safety: if buffer too large, process it
                    if len(accumulated_audio_mulaw) > MAX_AUDIO_BYTES:
                        logging.info(f"Max audio buffer reached ({len(accumulated_audio_mulaw)} bytes). Processing buffer.")
                        await process_audio_buffer(websocket, stream_sid or "", accumulated_audio_mulaw)
                        accumulated_audio_mulaw = b''

                elif event == "stop":
                    logging.info("Twilio stream sent 'stop' event.")
                    # Process remaining buffered audio before breaking
                    if accumulated_audio_mulaw:
                        logging.info(f"Processing remaining audio on stop event ({len(accumulated_audio_mulaw)} bytes).")
                        await process_audio_buffer(websocket, stream_sid or "", accumulated_audio_mulaw)
                        accumulated_audio_mulaw = b''
                    break

                else:
                    logging.debug(f"Ignored unknown event type: {event}")

            except asyncio.TimeoutError:
                idle_counter += 1
                # VAD trigger: no new data within timeout -> treat as end-of-speech
                if accumulated_audio_mulaw:
                    logging.info(f"End of speech detected (timeout). Processing {len(accumulated_audio_mulaw)} bytes.")
                    await process_audio_buffer(websocket, stream_sid or "", accumulated_audio_mulaw)
                    accumulated_audio_mulaw = b''

            except (ValueError, json.JSONDecodeError) as e:
                logging.warning(f"Invalid message received: {e}. Skipping this message.")
            except WebSocketDisconnect:
                logging.info("WebSocket disconnected by client.")
                break

    except WebSocketDisconnect:
        logging.info("Call disconnected during start phase.")
    except Exception as e:
        logging.error(f"An error occurred in the main loop: {e}", exc_info=True)
    finally:
        # Close may already have happened (disconnect / auth failure) — ignore.
        try:
            await websocket.close()
        except Exception:
            pass
385
+
386
+
387
+ # --- PROCESS AUDIO BUFFER (async wrapper that uses sync helpers in threadpool) ---
388
async def process_audio_buffer(websocket: WebSocket, stream_sid: str, accumulated_audio_mulaw: bytes):
    """Full conversational turn: mu-law buffer -> WAV -> transcript -> LLM -> TTS -> Twilio.

    Blocking helpers run in the threadpool; each stage bails out early (with a
    log line) when it produces nothing, so a failed stage never cascades.
    """
    logging.info(f"Processing audio buffer of {len(accumulated_audio_mulaw)} bytes...")

    # Stage 1: wrap the raw mu-law bytes in a WAV container (CPU-bound, threadpool).
    wav_audio = await run_in_threadpool(convert_mulaw_to_wav_bytes, accumulated_audio_mulaw)
    if not wav_audio:
        logging.warning("WAV conversion produced no bytes. Skipping processing.")
        return

    # Stage 2: speech-to-text (blocking network call, threadpool).
    transcript = await run_in_threadpool(transcribe_audio_sync, wav_audio)
    if not transcript or not transcript.strip():
        logging.info("Transcription empty; skipping further processing.")
        return

    transcript = transcript.strip()
    logging.info(f"User said: {transcript}")

    # Stage 3: RAG + LLM answer (async).
    reply_text = await get_agent_response(transcript)
    logging.info(f"AI Responded (preview): {reply_text[:200]}")

    if not reply_text or not reply_text.strip():
        logging.warning("Agent generated empty response; skipping TTS.")
        return

    # Stage 4: text-to-speech via ElevenLabs (retries live inside the helper).
    tts_audio = await run_in_threadpool(generate_elevenlabs_sync, reply_text, ELEVENLABS_VOICE_NAME)
    if not tts_audio:
        logging.error("ElevenLabs returned no audio bytes; skipping sending audio.")
        return

    # Stage 5: re-encode to 8 kHz mu-law — the only format Twilio will play.
    twilio_payload = await run_in_threadpool(convert_audio_to_mulaw_sync, tts_audio)
    if not twilio_payload:
        logging.error("Conversion to mulaw failed; skipping sending audio.")
        return

    # Stage 6: base64-encode and ship the audio back over the media stream.
    try:
        await websocket.send_json({
            "event": "media",
            "streamSid": stream_sid,
            "media": {"payload": base64.b64encode(twilio_payload).decode('utf-8')}
        })
        logging.info("Sent AI audio response back to Twilio.")
    except Exception as e:
        logging.error(f"Failed to send AI audio to Twilio: {e}", exc_info=True)
        return

    # Stage 7: 'clear' tells Twilio to flush anything still queued for playback.
    try:
        await websocket.send_json({"event": "clear", "streamSid": stream_sid})
        logging.info("Sent clear event to Twilio.")
    except Exception as e:
        logging.error(f"Failed to send 'clear' event: {e}", exc_info=True)
445
+
446
+
447
+ # End of file
requirements.txt CHANGED
@@ -6,6 +6,7 @@ elevenlabs==2.17.0
6
  gunicorn==23.0.0
7
  psycopg2-binary==2.9.10
8
  pandas==2.2.3
 
9
  python-dotenv==1.1.0
10
  sentence-transformers==5.1.1
11
  langchain-huggingface==0.3.1
 
6
  gunicorn==23.0.0
7
  psycopg2-binary==2.9.10
8
  pandas==2.2.3
9
+ pydub==0.25.1
10
  python-dotenv==1.1.0
11
  sentence-transformers==5.1.1
12
  langchain-huggingface==0.3.1