| import logging |
| import asyncio |
| import json |
| import uuid |
| import os |
| from datetime import datetime |
| from zoneinfo import ZoneInfo |
| from typing import Annotated, Optional, AsyncIterable, Any, Dict |
| import random |
| import http.server |
| import socketserver |
| import threading |
|
|
| from dotenv import load_dotenv |
| from livekit import rtc |
| from livekit.agents import ( |
| AutoSubscribe, |
| JobContext, |
| JobProcess, |
| WorkerOptions, |
| cli, |
| llm, |
| AgentSession, |
| metrics, |
| MetricsCollectedEvent, |
| Agent, |
| ) |
| from livekit.agents.llm import function_tool |
| from livekit.agents.voice import ( |
| RunContext, |
| ModelSettings, |
| ) |
| from livekit.plugins import openai, deepgram, cartesia, silero, groq |
|
|
| |
| from groq import Groq as GroqClient |
|
|
| |
| import sentry_sdk |
| from logger import logger |
| from validators import validate_phone_number, validate_appointment_time, validate_purpose, validate_appointment_id |
|
|
| |
# Optional dependency: the Beyond Presence avatar plugin.
# BEY_AVAILABLE records whether the import succeeded.
try:
    from livekit.plugins import bey
except ImportError:
    BEY_AVAILABLE = False
    logging.warning("Beyond Presence plugin not available. Install with: pip install \"livekit-agents[bey]\"")
else:
    BEY_AVAILABLE = True
|
|
| from db import Database |
|
|
# Load variables from a .env file before any os.getenv() lookups below.
load_dotenv()

# Error tracking is opt-in: Sentry is only initialised when SENTRY_DSN is set.
_sentry_dsn = os.getenv("SENTRY_DSN")
if _sentry_dsn:
    sentry_sdk.init(
        dsn=_sentry_dsn,
        traces_sample_rate=0.1,
        environment=os.getenv("ENVIRONMENT", "production"),
    )
    print("✅ Sentry error tracking enabled")
|
|
# Module logger. Note that this rebinds the name `logger` that was imported
# from the local `logger` module at the top of the file.
logger = logging.getLogger("voice-agent")
logger.setLevel(logging.INFO)

# Per-library log levels for third-party loggers.
for _logger_name, _logger_level in (
    ("hpack", logging.WARNING),
    ("httpx", logging.WARNING),
    ("livekit", logging.INFO),
    ("urllib3", logging.WARNING),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)
|
|
def get_groq_api_key():
    """Return a Groq API key, rotating between several when available.

    Lookup order:
      1. ``GROQ_API_KEYS`` - comma-separated list; one non-empty entry is
         picked at random so load is spread across keys (rate limits).
      2. ``GROQ_API_KEY`` - a single key.

    Returns:
        The selected key, or ``None`` when neither variable holds a key.
    """
    # Same logger object as the module-level "voice-agent" logger.
    log = logging.getLogger("voice-agent")

    pool = [key.strip() for key in os.getenv("GROQ_API_KEYS", "").split(",") if key.strip()]
    if pool:
        # Never log key material (not even a prefix) - only how many we have.
        log.debug("Selected a Groq API key from a pool of %d", len(pool))
        return random.choice(pool)

    single_key = (os.getenv("GROQ_API_KEY") or "").strip()
    if single_key:
        log.debug("Using the single GROQ_API_KEY")
        return single_key

    log.warning("No Groq API key found (set GROQ_API_KEYS or GROQ_API_KEY)")
    return None
|
|
# Feature-flag client. `flagsmith` is None when the package is missing or the
# client cannot be constructed; the reason is logged instead of being dropped.
try:
    from flagsmith import Flagsmith
    flagsmith = Flagsmith(environment_key=os.getenv("FLAGSMITH_ENVIRONMENT_KEY", "default"))
except Exception as exc:
    flagsmith = None
    logging.warning("Flagsmith client unavailable; feature flags disabled: %s", exc)
|
|
| |
|
|
|
|
|
|
|
|
|
|
# System-prompt template, rendered with str.format(); placeholders are
# {name}, {status} and {goal_instruction}.
# The tool list must use the exact names of the @function_tool methods on
# Assistant, because the prompt forbids calling anything not listed.
SYSTEM_PROMPT = """
You are the SkyTask Clinic Assistant, a friendly and capable voice receptionist.

# User: {name} | Status: {status} | Goal: {goal_instruction}
# Rules
- Voice response: Plain text only. Natural and polite.
- Be warm: Use "Good morning", "Thank you", "Please".
- Length: 1-3 sentences, but don't be robotic.
- Speak nums: "five five five". No emojis/markdown.
- Address user by name if known.
# Flow
1. Identify user (ask phone/name).
2. Tools: identify_user, verify_identity, fetch_slots, book_appointment, retrieve_appointments, cancel_appointment, modify_appointment, summarize_call, end_conversation.
- STRICT: Only call these tools. Do NOT invent new tools.
- Do NOT speak tool names. Execute silently.
- summarize_call: When user asks "summarize" or "recap" - gives summary but continues call
- end_conversation: When user says "end call", "goodbye", "bye" - ends the call
3. Verify name mismatches.
# Guardrails
- Privacy protection active.
- Scope: Clinic appointments only.
"""
|
|
class Assistant(Agent):
    """Voice receptionist agent for clinic appointment calls.

    The class exposes the appointment tools (methods decorated with
    ``@function_tool()``), remembers the identified caller in
    ``user_context``, mirrors tool activity to the frontend over the room's
    data channel, and coordinates the end-of-call summary and disconnect.

    ``usage_collector``, ``assistant``, ``avatar_type`` and ``tts_provider``
    start as ``None`` and are expected to be assigned by the caller after
    construction.

    NOTE(review): the docstrings of the tool methods appear to be consumed by
    ``function_tool`` as the LLM-facing tool descriptions - confirm before
    rewording them.
    """

    def __init__(self, db: Database, user_context: dict, room):
        """Create the agent with "unidentified guest" instructions.

        Args:
            db: Database facade used by all appointment tools.
            user_context: Mutable dict shared with the caller; the tools store
                ``contact_number`` and ``user_name`` in it.
            room: Room object used for data messages and disconnecting.
        """
        current_time_ist = datetime.now(ZoneInfo("Asia/Kolkata")).strftime("%Y-%m-%d %I:%M %p")

        instructions = SYSTEM_PROMPT.format(
            name="Guest",
            status="Unidentified",
            goal_instruction="Ask for their phone number (and name) to pull up their file. Say: 'Hi! I'm the clinic assistant. May I have your phone number to get started?'",
        )
        instructions += f"\n\nCurrent time (IST): {current_time_ist}"

        super().__init__(instructions=instructions)
        self.db = db
        self.user_context = user_context
        self.room = room
        self.current_time_str = current_time_ist
        self.should_disconnect = False

        # Wired up by the caller after construction.
        self.usage_collector = None
        self.assistant = None
        self.start_time = datetime.now()
        self.avatar_type = None
        self.tts_provider = None

        # Set once a summary has been published, so it is not generated twice.
        self.summary_generated = False
        # True while end_conversation is running (its awaits can take seconds).
        self._ending = False
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references to tasks.
        self._detached_tasks = set()

        @room.on("data_received")
        def on_data_received(data_packet):
            # The frontend's "End Call" button arrives as a JSON data message.
            try:
                payload = data_packet.data.decode("utf-8")
                data = json.loads(payload)

                if data.get("type") == "request_end_call":
                    logger.info("🔴 Frontend requested end call via button - triggering end_conversation")
                    self._run_detached(self.end_conversation("User clicked End Call button"))
            except Exception as e:
                logger.warning(f"Error processing frontend data message: {e}")

    def _run_detached(self, coro):
        """Schedule ``coro`` as a background task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._detached_tasks.add(task)
        task.add_done_callback(self._detached_tasks.discard)
        return task

    def update_instructions_with_name(self, name: str):
        """Update the agent's instructions to include the user's name.

        Returns:
            ``True`` when the instructions were replaced, ``False`` on error.
        """
        try:
            new_instructions = SYSTEM_PROMPT.format(
                name=name,
                status="Authenticated",
                goal_instruction=f"Help {name} with appointments. Address them as {name}.",
            )
            full_instructions = f"{new_instructions}\n\nCurrent time (IST): {self.current_time_str}"

            # NOTE(review): writes a private attribute of the Agent base class;
            # presumably this is what the session reads - confirm on upgrade.
            self._instructions = full_instructions

            print(f"✅ Updated agent instructions with user name: {name}")
            print(f"🔍 DEBUG - NEW PROMPT:\n{new_instructions}")
            return True
        except Exception as e:
            print(f"Failed to update instructions: {e}")
            return False

    async def _emit_frontend_event(self, tool_name: str, status: str, args: Optional[dict] = None, result: Optional[Any] = None):
        """Publish a ``tool_call`` status message to the frontend (best effort).

        Failures (including non-JSON-serialisable payloads) are logged and
        swallowed so that UI telemetry can never break a tool call.
        """
        try:
            payload = json.dumps({
                "type": "tool_call",
                "tool": tool_name,
                "status": status,
                "args": args,
                "result": result,
            })
            await self.room.local_participant.publish_data(payload, reliable=True)
        except Exception as e:
            logger.error(f"Failed to emit frontend event: {e}")

    @function_tool()
    async def identify_user(
        self,
        contact_number: str
    ):
        """Identify the user by their phone number. Only call this when you have received a numeric phone number.

        Args:
            contact_number: The user's contact phone number (e.g. 555-0101). Do not provide an empty string.
        """
        # This is the only definition of identify_user: a second, older copy
        # further down the class used to shadow this one silently.
        if not contact_number or len(contact_number.strip()) < 3:
            return "Error: A valid contact number is required to identify the user."

        try:
            contact_number = validate_phone_number(contact_number)
        except ValueError as e:
            return f"Error: {str(e)}"

        await self._emit_frontend_event("identify_user", "started", {"contact_number": contact_number})
        logger.info(f"Identifying user with number: {contact_number}")
        user = self.db.get_user(contact_number)
        if not user:
            user = self.db.create_user(contact_number)
            is_new = True
        else:
            is_new = False

        # A stored name of None (e.g. a freshly created user) falls back to "User".
        name = user.get("name") or "User"

        self.user_context["contact_number"] = contact_number
        self.user_context["user_name"] = name

        self.update_instructions_with_name(name)

        # Best effort: also remind the model of the name through the chat
        # context. Any failure here is reported and ignored.
        if hasattr(self, 'chat_ctx') and self.chat_ctx:
            try:
                self.chat_ctx.items.append(
                    llm.ChatMessage(
                        role="system",
                        content=[f"IMPORTANT: The user's name is {name}. You MUST address them as {name} in all future responses. When they ask 'what's my name' or 'do you know my name', respond with 'Yes, {name}, your name is {name}.'"]
                    )
                )
                print(f"✅ Injected name '{name}' into chat context")
            except Exception as e:
                print(f"Could not inject into chat context: {e}")

        result_msg = f"User identified successfully. Their name is {name}. You MUST immediately respond by saying: 'Great to meet you, {name}! How can I help you today?' Use their name {name} in your response right now."
        await self._emit_frontend_event("identify_user", "success", result={"name": name, "is_new": is_new})
        return result_msg

    @function_tool()
    async def verify_identity(
        self,
        contact_number: str,
        stated_name: str
    ):
        """Verify the user's identity using both their phone number and stated name.
        Use this when the user provides both pieces of information.

        Args:
            contact_number: The user's phone number (numeric).
            stated_name: The name the user introduced themselves with.
        """
        if not contact_number or len(contact_number.strip()) < 3:
            return "Error: A valid contact number is required."

        try:
            contact_number = validate_phone_number(contact_number)
        except ValueError as e:
            return f"Error: {str(e)}"

        await self._emit_frontend_event("verify_identity", "started", {"contact_number": contact_number, "name": stated_name})
        logger.info(f"Verifying identity: {stated_name} with {contact_number}")

        user = self.db.get_user(contact_number)

        if not user:
            # Unknown number: register it under the name the caller gave.
            user = self.db.create_user(contact_number, name=stated_name)
            is_new = True
            db_name = stated_name
            match = True
        else:
            is_new = False
            db_name = user.get("name") or "User"

            # Loose comparison: either name may contain the other (first name
            # vs. full name). An empty stated name never counts as a match -
            # "" is a substring of every string.
            stated = (stated_name or "").strip().lower()
            known = db_name.strip().lower()
            match = bool(stated) and bool(known) and (stated in known or known in stated)

        # NOTE(review): the session is bound to this number even when the
        # names do not match - confirm that this is intended.
        self.user_context["contact_number"] = contact_number
        self.user_context["user_name"] = db_name

        self.update_instructions_with_name(db_name)

        if match:
            result_msg = f"Identity verified! The user is indeed {db_name}. Greet them naturally as {db_name}."
            await self._emit_frontend_event("verify_identity", "success", result={"name": db_name, "match": True})
            return result_msg

        result_msg = f"Identity Mismatch Warning: The phone number belongs to '{db_name}', but user said '{stated_name}'. politely ask: 'I have this number registered under {db_name}. Are you {db_name}?'"
        await self._emit_frontend_event("verify_identity", "warning", result={"db_name": db_name, "stated_name": stated_name, "match": False})
        return result_msg

    @function_tool()
    async def hello(self, response: str = ""):
        """This tool is used for greetings.

        Args:
            response: The greeting response.
        """
        return "Hello! How can I help you today?"

    @function_tool()
    async def fetch_slots(self, location: str):
        """Fetch available appointment slots.

        Args:
            location: The clinic location to check (e.g. 'main', 'downtown').
        """
        logger.info(f"Fetching available slots for {location}")
        await self._emit_frontend_event("fetch_slots", "started", {"location": location})

        # `location` is only logged and echoed to the frontend; the database
        # lookup below takes no location argument.
        available_slots = self.db.get_available_slots()
        slots_json = json.dumps(available_slots)

        await self._emit_frontend_event("fetch_slots", "success", result=available_slots)
        return slots_json

    @function_tool()
    async def book_appointment(
        self,
        time: str,
        purpose: str
    ):
        """Book an appointment for the identified user.

        Args:
            time: The ISO 8601 formatted date and time for the appointment.
            purpose: Purpose of the appointment.
        """
        await self._emit_frontend_event("book_appointment", "started", {"time": time, "purpose": purpose})
        contact_number = self.user_context.get("contact_number")
        if not contact_number:
            return "Error: User not identified. Please ask for phone number first."

        try:
            contact_number = validate_phone_number(contact_number)
        except ValueError as e:
            return f"Error validation phone: {str(e)}"

        # The model may hand over a malformed timestamp; report that as a
        # normal tool error instead of letting the exception escape.
        try:
            requested_time = datetime.fromisoformat(time)
        except (TypeError, ValueError):
            await self._emit_frontend_event("book_appointment", "failed")
            return "Error: Invalid appointment time. Please provide the date and time in ISO 8601 format."

        logger.info(f"Booking appointment for {contact_number} at {time}")

        is_available = self.db.check_slot_availability(requested_time)
        if not is_available:
            return "Error: Slot not available."

        result = self.db.book_appointment(contact_number, time, purpose)
        if result:
            await self._emit_frontend_event("book_appointment", "success", result=result)
            return f"Appointment booked successfully. ID: {result.get('id')}"

        await self._emit_frontend_event("book_appointment", "failed")
        return "Failed to book appointment."

    @function_tool()
    async def retrieve_appointments(self, user_confirmation: str):
        """Retrieve past and upcoming appointments for the identified user.

        Args:
            user_confirmation: The user's confirmation to see their appointments (e.g. 'show them', 'yes').
        """
        await self._emit_frontend_event("retrieve_appointments", "started")
        contact_number = self.user_context.get("contact_number")
        if not contact_number:
            return "Error: User not identified."

        try:
            contact_number = validate_phone_number(contact_number)
        except ValueError as e:
            return f"Error: {str(e)}"

        appointments = self.db.get_user_appointments(contact_number)
        if not appointments:
            await self._emit_frontend_event("retrieve_appointments", "success", result=[])
            return "No appointments found."

        await self._emit_frontend_event("retrieve_appointments", "success", result=appointments)
        return json.dumps(appointments)

    @function_tool()
    async def cancel_appointment(
        self,
        appointment_id: str
    ):
        """Cancel an appointment.

        Args:
            appointment_id: The ID of the appointment to cancel.
        """
        await self._emit_frontend_event("cancel_appointment", "started", {"appointment_id": appointment_id})
        success = self.db.cancel_appointment(appointment_id)
        if success:
            await self._emit_frontend_event("cancel_appointment", "success", result={"id": appointment_id})
            return "Appointment cancelled successfully."

        await self._emit_frontend_event("cancel_appointment", "failed")
        return "Failed to cancel appointment."

    @function_tool()
    async def modify_appointment(
        self,
        appointment_id: str,
        new_time: str
    ):
        """Modify the date/time of an appointment.

        Args:
            appointment_id: The ID of the appointment to modify.
            new_time: The new ISO 8601 formatted date and time.
        """
        await self._emit_frontend_event("modify_appointment", "started", {"appointment_id": appointment_id, "new_time": new_time})
        success = self.db.modify_appointment(appointment_id, new_time)
        if success:
            await self._emit_frontend_event("modify_appointment", "success", result={"id": appointment_id, "new_time": new_time})
            return "Appointment modified successfully."

        await self._emit_frontend_event("modify_appointment", "failed")
        return "Failed to modify appointment."

    def _snapshot_usage_stats(self) -> Optional[Dict[str, Any]]:
        """Return the usage counters collected so far, or ``None`` without a collector."""
        if self.usage_collector is None:
            return None
        summary = self.usage_collector.get_summary()
        return {
            "stt_duration": summary.stt_audio_duration,
            "llm_prompt_tokens": summary.llm_prompt_tokens,
            "llm_completion_tokens": summary.llm_completion_tokens,
            "tts_chars": summary.tts_characters_count,
        }

    async def _build_call_summary(self, contact: str):
        """Generate (and persist) a summary of the call so far for ``contact``.

        Shared by ``summarize_call`` and ``end_conversation``. May raise;
        both callers wrap it in ``try``.
        """
        duration = (datetime.now() - self.start_time).total_seconds()
        user_name = self.user_context.get("user_name", "the patient")
        # `assistant` is assigned by the caller; fall back to this agent's own
        # chat context when it has not been set.
        agent = self.assistant if self.assistant is not None else self
        return await generate_and_save_summary(
            self.db,
            agent.chat_ctx,
            contact,
            duration,
            self.avatar_type,
            self.tts_provider,
            user_name,
            self._snapshot_usage_stats(),
        )

    @function_tool()
    async def summarize_call(
        self,
        request: Annotated[str, "User's request for summary"] = "summarize"
    ) -> str:
        """Provide a summary of the current call without ending it.

        Use this when the user asks for a summary but wants to continue the conversation.
        Example triggers: "Can you summarize?", "What did we discuss?", "Recap please"

        Args:
            request: The user's request for a summary (e.g., "summarize", "recap")

        Returns:
            str: A spoken summary of the conversation so far.
        """
        logger.info(f"Generating mid-call summary (not ending): {request}")

        generic_reply = "So far, we've discussed your appointments. Is there anything else I can help you with?"

        contact = self.user_context.get("contact_number")
        if not contact:
            return generic_reply

        try:
            summary_data = await self._build_call_summary(contact)
            if summary_data and isinstance(summary_data, dict):
                spoken_summary = summary_data.get("spoken_text", "So far, we've discussed your appointments.")
                logger.info(f"Mid-call summary: {spoken_summary}")
                return spoken_summary
        except Exception as e:
            logger.error(f"Failed to generate mid-call summary: {e}")

        return generic_reply

    @function_tool()
    async def end_conversation(self, summary_request: str):
        """End the current conversation session and generate a final summary.

        Args:
            summary_request: The user's request to end or wrap up (e.g. 'bye', 'summarize', 'we're done').
        """
        logger.info("Ending conversation - generating summary first")

        if self.summary_generated:
            logger.warning("Summary already generated - skipping duplicate generation")
            return "Thank you for calling. Goodbye!"

        # Both the model and the frontend "End Call" button can trigger this
        # while an earlier invocation is still awaiting the summary.
        if self._ending:
            logger.warning("End of conversation already in progress - skipping duplicate request")
            return "Thank you for calling. Goodbye!"

        self._ending = True
        try:
            return await self._finish_call()
        finally:
            self._ending = False

    async def _finish_call(self) -> str:
        """Publish the final summary (or a fallback) and request the disconnect.

        Returns:
            The text the agent should speak as its closing line.
        """
        spoken_text = "Thank you for calling. Have a great day!"
        summary_sent = False

        contact = self.user_context.get("contact_number")
        if contact:
            try:
                summary_data = await self._build_call_summary(contact)
                if summary_data and isinstance(summary_data, dict):
                    spoken_text = summary_data.get("spoken_text", spoken_text)

                    payload = json.dumps({
                        "type": "summary",
                        "summary": summary_data,
                    })
                    await self.room.local_participant.publish_data(payload, reliable=True)
                    logger.info("Summary sent to frontend")
                    summary_sent = True
                    self.summary_generated = True

                    # Short pause between the summary and the close message.
                    await asyncio.sleep(0.1)
                    close_payload = json.dumps({"type": "close_session"})
                    await self.room.local_participant.publish_data(close_payload, reliable=True)
                    logger.info("✅ close_session sent - UI will auto-disconnect")
            except Exception as e:
                logger.error(f"Failed to process summary: {e}")

        if not summary_sent:
            logger.warning("Sending fallback summary with cost placeholder")
            fallback = {
                "content": "Call ended. See cost breakdown below.",
                "spoken_text": spoken_text,
                "costs": {"stt": 0.0, "tts": 0.0, "llm": 0.0, "avatar": 0.0, "total": 0.0},
                "status": "fallback",
            }
            try:
                payload = json.dumps({"type": "summary", "summary": fallback})
                await self.room.local_participant.publish_data(payload, reliable=True)
                logger.info("Fallback summary sent to frontend")
            except Exception as e:
                logger.error(f"Failed to send fallback: {e}")

        self.should_disconnect = True
        logger.info("Disconnect requested - waiting for speech to finish")

        # Backstop in case the normal disconnect path never runs.
        self._run_detached(self.safeguard_disconnect())

        return spoken_text

    async def safeguard_disconnect(self):
        """Force disconnect if normal flow fails."""
        logger.info("Safeguard: Timer started (10s)...")
        await asyncio.sleep(10.0)

        logger.info(f"Safeguard: Timeout reached. Room state is: {self.room.connection_state}")

        # `connection_state` is an enum value rather than a string, so it is
        # not compared against "connected"; ask the room directly instead.
        if not self.room.isconnected():
            logger.info("Safeguard: Room already disconnected, taking no action.")
            return

        logger.warning("Safeguard: Timed out. Sending close_session event.")
        try:
            payload = json.dumps({"type": "close_session"})
            await self.room.local_participant.publish_data(payload, reliable=True)
            logger.info("Safeguard: close_session event sent.")
        except Exception as e:
            logger.warning(f"Safeguard: Failed to send event: {e}")

        # Give the frontend a moment to leave on its own.
        await asyncio.sleep(3.0)

        if self.room.isconnected():
            logger.warning("Safeguard: Force disconnecting room now.")
            await self.room.disconnect()
|
|
def calculate_costs(duration_seconds: float, tts_chars: int, avatar_type: str, tts_provider: str, prompt_tokens: int = 0, completion_tokens: int = 0):
    """Estimate the cost of a call, broken down by service.

    Args:
        duration_seconds: Call length; STT and avatar are billed per minute of it.
        tts_chars: Number of characters sent to text-to-speech.
        avatar_type: ``'bey'`` is billed per minute; anything else is free.
        tts_provider: ``"cartesia"`` or ``"deepgram"``; any other value is
            priced at zero and labelled "Groq".
        prompt_tokens: LLM input tokens used.
        completion_tokens: LLM output tokens used. When both token counts are
            zero the LLM cost is estimated from the duration and ``tts_chars``.

    Returns:
        Dict with per-service costs and ``total`` (rounded to 6 decimals),
        ``currency`` and display ``labels``.
    """
    per_minute_stt = 0.006
    per_token_in = 0.15 / 1_000_000
    per_token_out = 0.60 / 1_000_000

    # Per-character TTS price and display label by provider.
    tts_pricing = {
        "cartesia": (0.050 / 1000, "Cartesia"),
        "deepgram": (0.015 / 1000, "Deepgram"),
    }
    per_char_tts, tts_label = tts_pricing.get(tts_provider, (0.000, "Groq"))

    uses_bey = avatar_type == 'bey'
    per_minute_avatar = 0.05 if uses_bey else 0

    minutes = duration_seconds / 60
    stt_cost = minutes * per_minute_stt
    tts_cost = tts_chars * per_char_tts

    if prompt_tokens == 0 and completion_tokens == 0:
        # No metered usage: assume 200 input tokens per minute and roughly
        # one output token per four spoken characters.
        billed_in = minutes * 200
        billed_out = tts_chars / 4
    else:
        billed_in = prompt_tokens
        billed_out = completion_tokens
    llm_cost = (billed_in * per_token_in) + (billed_out * per_token_out)

    avatar_cost = minutes * per_minute_avatar

    total = stt_cost + tts_cost + llm_cost + avatar_cost

    logger.info(f"Cost calculation: duration={duration_seconds}s, tts_chars={tts_chars}, provider={tts_provider}")
    logger.info(f"Costs: STT=${stt_cost:.6f}, TTS=${tts_cost:.6f}, LLM=${llm_cost:.6f}, Avatar=${avatar_cost:.6f}")

    breakdown = {
        "stt": round(stt_cost, 6),
        "tts": round(tts_cost, 6),
        "llm": round(llm_cost, 6),
        "avatar": round(avatar_cost, 6),
        "total": round(total, 6),
        "currency": "USD",
    }
    breakdown["labels"] = {
        "tts": tts_label,
        "stt": "Deepgram",
        "llm": "Groq/OpenAI",
        "avatar": "Beyond Presence" if uses_bey else "3D Avatar",
    }
    return breakdown
|
|
def _flatten_chat_history(chat_ctx):
    """Turn chat messages into ``(transcript_text, rows_to_persist)``.

    Best effort: on an unexpected error the problem is logged and whatever
    was collected up to that point is returned.
    """
    transcript_lines = []
    rows = []
    try:
        if hasattr(chat_ctx, 'items'):
            items = chat_ctx.items
        elif hasattr(chat_ctx, 'messages'):
            items = chat_ctx.messages
        else:
            items = []

        for item in items:
            if not isinstance(item, llm.ChatMessage):
                continue

            role = item.role
            content = item.content
            # Multi-part content is flattened into one space-separated string.
            if isinstance(content, list):
                content = " ".join(str(part) for part in content)
            if isinstance(content, str):
                transcript_lines.append(f"{role}: {content}\n")

            row = {
                "role": role,
                "content": content,
                "tool_name": None,
                "tool_args": None,
            }

            # Only the first tool call of a message is recorded; it may be a
            # plain dict or an object with a `.function` attribute.
            tool_calls = getattr(item, 'tool_calls', None)
            if tool_calls:
                try:
                    first_call = tool_calls[0]
                    if isinstance(first_call, dict):
                        fn = first_call.get('function', {})
                        row["tool_name"] = fn.get('name')
                        row["tool_args"] = fn.get('arguments')
                    else:
                        fn = getattr(first_call, 'function', None)
                        if fn:
                            row["tool_name"] = getattr(fn, 'name', None)
                            row["tool_args"] = getattr(fn, 'arguments', None)
                except Exception as e:
                    logger.debug(f"Could not read tool call metadata: {e}")

            if role == "tool":
                row["tool_name"] = getattr(item, 'name', getattr(item, 'tool_call_id', None))

            rows.append(row)
    except Exception as e:
        logger.error(f"Error extracting transcript: {e}")

    return "".join(transcript_lines), rows


def _parse_summary_payload(raw, default_spoken):
    """Extract ``(spoken, written)`` from the model's reply; never raises on bad JSON.

    Tries strict JSON first (after removing Markdown code fences), then falls
    back to regular expressions. Non-string values - e.g. ``written``
    returned as a list of bullet points - are converted to text.
    """
    import re

    def _as_text(value, default):
        if value is None:
            return default
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            return "\n".join(str(part) for part in value)
        return str(value)

    def _unescape(fragment):
        return fragment.replace("\\n", "\n").replace('\\"', '"')

    cleaned = (raw or "").replace("```json", "").replace("```", "").strip()
    braces = re.search(r"\{.*\}", cleaned, re.DOTALL)
    if braces:
        cleaned = braces.group(0)

    try:
        data = json.loads(cleaned)
        if not isinstance(data, dict):
            raise ValueError("summary JSON is not an object")
        return _as_text(data.get("spoken"), default_spoken), _as_text(data.get("written"), "")
    except ValueError as e:  # json.JSONDecodeError is a ValueError
        logger.warning(f"Failed to parse JSON summary (standard): {e}. Retrying with Regex Fallback.")

    # Regex fallback; `(?<!\\)"` stops at the first quote that is not escaped.
    spoken = default_spoken
    written = cleaned  # last resort: show the raw reply
    s_match = re.search(r'"spoken"\s*:\s*"(.*?)(?<!\\)"', cleaned, re.DOTALL)
    if s_match:
        spoken = _unescape(s_match.group(1))
    w_match = re.search(r'"written"\s*:\s*"(.*?)(?<!\\)"', cleaned, re.DOTALL)
    if w_match:
        written = _unescape(w_match.group(1))
    return spoken, written


def _request_summary_completion(api_key, prompt):
    """Blocking Groq chat-completion request; run it off the event loop."""
    client = GroqClient(api_key=api_key)
    return client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[
            {"role": "system", "content": "You are a helpful assistant. Output valid JSON only. Do not output markdown blocks."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.7,
        max_tokens=500
    )


async def generate_and_save_summary(db: Database, chat_ctx: llm.ChatContext, contact_number: str, duration: float, avatar_type: str, tts_provider: str, user_name: str = "the patient", usage_stats: Optional[dict] = None) -> Optional[Dict[str, Any]]:
    """Summarise the conversation with an LLM, persist it and price the call.

    Steps: flatten the chat history and save the transcript, compute the
    call costs, ask Groq for a spoken and a written summary (up to three
    attempts with exponential back-off), save the written summary and return
    everything the frontend needs.

    Args:
        db: Database facade (``save_chat_transcript`` and ``save_summary`` are used).
        chat_ctx: Chat context holding the conversation messages.
        contact_number: Caller's phone number; nothing is done without one.
        duration: Call duration in seconds.
        avatar_type: Passed through to ``calculate_costs``.
        tts_provider: Passed through to ``calculate_costs``.
        user_name: Name used in the summarisation prompt.
        usage_stats: Optional metered usage with the keys ``tts_chars``,
            ``llm_prompt_tokens`` and ``llm_completion_tokens``; without it
            the costs are estimated from the transcript length.

    Returns:
        ``None`` when ``contact_number`` is empty; otherwise a dict with
        ``text``, ``content``, ``spoken_text``, ``costs`` and ``status``
        (``"completed"`` or ``"failed"``).
    """
    if not contact_number:
        logger.warning("No contact number to save summary for.")
        return None

    logger.info("Generating conversation summary...")

    transcript, messages_to_save = _flatten_chat_history(chat_ctx)

    if messages_to_save:
        try:
            session_id = str(uuid.uuid4())
            db.save_chat_transcript(session_id, contact_number, messages_to_save)
        except Exception as e:
            logger.error(f"Failed to save chat transcript to DB: {e}")

    logger.info(f"Calculating costs with usage_stats: {usage_stats}")
    if usage_stats:
        tts_chars = usage_stats.get("tts_chars", 0)
        prompt_tokens = usage_stats.get("llm_prompt_tokens", 0)
        completion_tokens = usage_stats.get("llm_completion_tokens", 0)
        costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider, prompt_tokens, completion_tokens)
    else:
        # No metering available: estimate the spoken characters as half of
        # the transcript length.
        tts_chars = len(transcript) // 2
        costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider)

    logger.info(f"Calculated costs: {costs}")

    prompt = (
        f"Summarize the conversation with {user_name} in JSON format.\n"
        f"Transcript:\n{transcript}\n\n"
        "CRITICAL: Use natural time formats like '9 AM' or '2:30 PM', NOT 'nine zero zero hours'\n"
        "Return a valid JSON object with exactly two keys:\n"
        "1. 'spoken': A 1-2 sentence spoken closing for TTS. Natural, human-like, polite. No special chars. Start with 'To recap,'.\n"
        "2. 'written': A detailed bulleted summary for the user interface. Include topics, appointments booked, and outcome.\n"
        "IMPORTANT: Ensure the JSON is valid. Do NOT use unescaped newlines in the 'written' string or 'spoken' string. Use \\n for line breaks.\n"
    )

    max_retries = 3
    retry_delay = 1

    full_response = None
    summary_cost = 0.0

    for attempt in range(max_retries):
        try:
            # The key is resolved per attempt so that a retry can pick a
            # different key from the rotation pool.
            api_key = os.getenv("GROQ_API_KEY_SUMMARY") or get_groq_api_key()

            # The Groq client is synchronous; run it in a worker thread so
            # the event loop keeps running while waiting for the reply.
            response = await asyncio.to_thread(_request_summary_completion, api_key, prompt)

            full_response = response.choices[0].message.content

            summary_input_cost = response.usage.prompt_tokens * (0.59 / 1_000_000)
            summary_output_cost = response.usage.completion_tokens * (0.79 / 1_000_000)
            summary_cost = summary_input_cost + summary_output_cost

            logger.info(f"🔍 RAW LLM RESPONSE: {full_response}")
            logger.info(f"💰 Summary LLM cost: ${summary_cost:.6f} ({response.usage.prompt_tokens} + {response.usage.completion_tokens} tokens)")
            break
        except Exception as e:
            logger.warning(f"Summary generation attempt {attempt+1} failed: {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay * (2 ** attempt))
    else:
        # Every attempt raised.
        logger.error("All summary generation attempts failed.")
        return {
            "text": "Call summary unavailable.",
            "content": "Call summary unavailable.",
            "spoken_text": "Thank you for calling. Have a great day!",
            "costs": costs,
            "status": "failed"
        }

    default_spoken = "To recap, we discussed your appointments. Have a great day!"
    spoken, written = _parse_summary_payload(full_response, default_spoken)
    spoken = spoken.strip() or default_spoken
    written = written.strip()
    if not written:
        written = f"Summary: {spoken}"

    logger.info(f"Spoken Summary: {spoken}")
    logger.info(f"📝 WRITTEN SUMMARY:\n{written}")
    logger.info("=" * 80)

    # A storage failure must not trigger another (paid) LLM request or hide
    # the summary from the caller, so it is handled on its own.
    try:
        db.save_summary(contact_number, written)
    except Exception as e:
        logger.error(f"Failed to save summary to DB: {e}")

    # Add the cost of the summarisation request itself.
    costs['llm'] = round(costs['llm'] + summary_cost, 6)
    costs['total'] = round(costs['total'] + summary_cost, 6)

    summary_result = {
        "text": written,
        "content": written,
        "spoken_text": spoken,
        "costs": costs,
        "status": "completed"
    }
    logger.info(f"📊 Summary with costs: {summary_result}")

    print(f"\n{'='*80}")
    print("📋 CALL SUMMARY GENERATED")
    print(f"{'='*80}")
    print(f"Contact: {contact_number}")
    print(f"Summary: {written}")
    print(f"Costs: STT=${costs['stt']:.4f} | TTS=${costs['tts']:.4f} | LLM=${costs['llm']:.6f} | Total=${costs['total']:.4f}")
    print(f"{'='*80}\n")

    return summary_result
| |
|
|
def prewarm(proc: JobProcess):
    """Warm up per-process resources ahead of the first call.

    Stores a ``Database`` instance, a loaded Silero VAD, the currently
    available appointment slots and the shared cache object in
    ``proc.userdata`` under the keys ``db``, ``vad``, ``slots`` and
    ``cache``. Any failure is logged and swallowed; the steps after the
    failing one are skipped.
    """
    from logger import logger as log
    from db import Database

    log.info("Prewarming worker...")

    try:
        database = Database()
        proc.userdata["db"] = database
        log.info("✅ Database connection prewarmed")

        vad_model = silero.VAD.load()
        proc.userdata["vad"] = vad_model
        log.info("✅ VAD model prewarmed")

        open_slots = database.get_available_slots()
        proc.userdata["slots"] = open_slots
        log.info("✅ Appointment slots cached")

        from cache import cache as shared_cache
        proc.userdata["cache"] = shared_cache
        log.info(f"✅ Redis cache prewarmed (enabled: {shared_cache.enabled})")

        log.info("🚀 Worker prewarmed successfully - ready for calls!")

    except Exception as e:
        log.error(f"Prewarming failed: {e}", error=str(e))
|
|
async def entrypoint(ctx: JobContext):
    """Run a single voice-agent job from room connection to call teardown.

    Steps, in order:

    1. Connect to the room (audio only) and pick up the prewarmed
       ``Database`` from ``ctx.proc.userdata`` when present.
    2. Wait for a participant and read ``avatarType`` from its JSON metadata.
    3. Choose the TTS provider (``TTS_PROVIDER`` env var, overridden by the
       Flagsmith ``tts_provider`` feature value when available).
    4. Build and start the ``AgentSession`` with an ``Assistant`` agent and
       register metrics / speech event handlers.
    5. Optionally start the Beyond Presence avatar and wait for its video.
    6. Speak a time-of-day greeting and publish ``session_ready``.
    7. Wait until the room disconnects, then generate a backup call summary
       if a contact number was captured and no summary exists yet.

    Args:
        ctx: The job context supplying the room and the worker process data.
    """
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # The event loop only keeps weak references to tasks, so fire-and-forget
    # tasks must be strongly referenced until they finish.
    background_tasks = set()

    def _spawn(coro):
        """Schedule ``coro`` as a task that stays referenced until done."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        return task

    def _room_is_live():
        """Return True while the room has not reached the disconnected state.

        ``connection_state`` is an ``rtc.ConnectionState`` enum value (as the
        greeting check below relies on), so it must be compared against enum
        members; comparing it to the string "connected" is never true.
        """
        return ctx.room.connection_state != rtc.ConnectionState.CONN_DISCONNECTED

    # Reuse the connection created in prewarm() when it is available.
    if "db" in ctx.proc.userdata:
        db = ctx.proc.userdata["db"]
        logger.info("Using prewarmed Database connection")
    else:
        db = Database()
        logger.info("Initialized new Database connection")

    user_context = {}
    participant = await ctx.wait_for_participant()

    # Avatar preference comes from the participant's JSON metadata.
    # NOTE(review): the metadata may also carry 'ttsProvider'; the previous
    # code read it into a local that was never used, so it is not applied.
    avatar_type = '3d'
    if participant.metadata:
        try:
            metadata = json.loads(participant.metadata)
            avatar_type = metadata.get('avatarType', '3d')
        except Exception as e:
            logger.warning(f"Failed to parse participant metadata: {e}")

    logger.info(f"Avatar type requested by {participant.identity}: {avatar_type}")

    # TTS provider: environment default, optionally overridden by Flagsmith.
    tts_provider = os.getenv("TTS_PROVIDER", "deepgram")
    try:
        flags = flagsmith.get_environment_flags()
        tts_provider_flag = flags.get_feature_value("tts_provider")
        if tts_provider_flag:
            tts_provider = tts_provider_flag
            logger.info(f"Flagsmith: tts_provider={tts_provider}")
    except Exception as e:
        logger.warning(f"Failed to fetch feature flags from Flagsmith: {e}. Using default: {tts_provider}")

    if tts_provider == "cartesia":
        logger.info("Using Cartesia TTS")
        agent_tts = cartesia.TTS()
    elif tts_provider == "groq":
        logger.info("Using Groq TTS")
        agent_tts = groq.TTS(model="canopylabs/orpheus-v1-english")
    else:
        logger.info("Using Deepgram TTS (Default)")
        agent_tts = deepgram.TTS()

    # Accumulates usage metrics emitted by the session.
    usage_collector = metrics.UsageCollector()

    session = AgentSession(
        stt=deepgram.STT(),
        llm=groq.LLM(
            model="openai/gpt-oss-120b",
            api_key=get_groq_api_key(),
            temperature=0.5,
        ),
        tts=agent_tts,
        vad=silero.VAD.load(
            min_speech_duration=0.1,
            min_silence_duration=0.5,
            prefix_padding_duration=0.2,
        ),
    )

    @session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent):
        usage_collector.collect(ev.metrics)

    assistant = Assistant(db, user_context, ctx.room)
    start_time = datetime.now()
    assistant.usage_collector = usage_collector
    # NOTE(review): self-reference kept as-is; presumably read by Assistant's
    # tools - confirm against the Assistant class.
    assistant.assistant = assistant
    assistant.avatar_type = avatar_type
    assistant.tts_provider = tts_provider

    async def _disconnect_sequence():
        """Tell the frontend to close, then force-disconnect if it did not."""
        logger.info("Agent finished speaking. Sending close_session event then closing room.")
        try:
            payload = json.dumps({"type": "close_session"})
            await ctx.room.local_participant.publish_data(payload, reliable=True)
            logger.info("close_session event sent to frontend")
        except Exception as e:
            logger.warning(f"Failed to publish close_session: {e}")

        # Grace period for the frontend to leave on its own.
        await asyncio.sleep(2.0)

        if _room_is_live():
            logger.info("Frontend didn't disconnect, forcing disconnect")
            await ctx.room.disconnect()
        else:
            logger.info("Frontend disconnected gracefully")

    disconnect_task: Optional[asyncio.Task] = None

    # NOTE(review): confirm that the installed AgentSession actually emits the
    # "agent_speech_*" events these handlers subscribe to.
    @session.on("agent_speech_stopped")
    def _on_agent_speech_stopped(ev: Any = None):
        """Disconnect if the agent has finished speaking and a disconnect was requested."""
        nonlocal disconnect_task
        if not assistant.should_disconnect:
            return
        # The interrupted handler funnels into this one as well, so make sure
        # the teardown sequence is scheduled only once.
        if disconnect_task is not None:
            return
        disconnect_task = _spawn(_disconnect_sequence())

    @session.on("agent_speech_interrupted")
    def _on_agent_speech_interrupted(ev: Any = None):
        """Handle case where agent summary/goodbye is interrupted by noise/user."""
        if assistant.should_disconnect:
            logger.info("Agent speech interrupted during disconnect phase. Triggering disconnect sequence.")
            _on_agent_speech_stopped(ev)

    @session.on("agent_speech_started")
    def _on_agent_speech_started(ev: Any = None):
        logger.info("Agent speech STARTED.")

    await session.start(room=ctx.room, agent=assistant)

    # Optional Beyond Presence video avatar.
    if avatar_type == 'bey' and BEY_AVAILABLE:
        logger.info("Initializing Beyond Presence avatar...")

        async def send_init_signal():
            """Publish 'avatar_initializing' five times, 0.5s apart (best effort)."""
            init_payload = json.dumps({"type": "avatar_initializing"})
            for _ in range(5):
                try:
                    await ctx.room.local_participant.publish_data(
                        init_payload,
                        reliable=True
                    )
                except Exception as e:
                    # Still best effort, but no longer a bare except that
                    # hides the failure entirely.
                    logger.debug(f"Failed to publish avatar_initializing: {e}")
                await asyncio.sleep(0.5)

        _spawn(send_init_signal())

        try:
            bey_session = bey.AvatarSession(
                api_key=os.environ.get("BEYOND_PRESENCE_API_KEY"),
                avatar_id=os.environ.get("BEYOND_PRESENCE_AVATAR_ID", "b9be11b8-89fb-4227-8f86-4a881393cbdb"),
            )
            await bey_session.start(session, room=ctx.room)
            logger.info("Beyond Presence avatar started successfully (API level)")

            # Poll up to 40 times, one second apart, for the avatar
            # participant to publish a video track.
            logger.info("Waiting for avatar participant to join room...")
            avatar_joined = False
            for _ in range(40):
                avatar = ctx.room.remote_participants.get("bey-avatar-agent")
                if avatar and any(
                    pub.kind == rtc.TrackKind.KIND_VIDEO
                    for pub in avatar.track_publications.values()
                ):
                    logger.info("✅ Avatar participant joined and video track found!")
                    avatar_joined = True
                    break
                await asyncio.sleep(1)

            if not avatar_joined:
                logger.warning("Timed out waiting for avatar participant to join - proceeding anyway")

        except Exception as e:
            logger.error(f"Failed to start Beyond Presence avatar: {e}")
            logger.info("Falling back to audio-only mode")

    # Time-of-day greeting, evaluated in the Asia/Kolkata timezone.
    hour = datetime.now(ZoneInfo("Asia/Kolkata")).hour
    if 5 <= hour < 12:
        greeting = "Good morning"
    elif 12 <= hour < 17:
        greeting = "Good afternoon"
    else:
        greeting = "Good evening"

    # Only speak when the room is fully connected.
    if ctx.room.connection_state == rtc.ConnectionState.CONN_CONNECTED:
        try:
            logger.info(f"Speaking greeting: {greeting}...")
            await session.say(
                f"{greeting}, thank you for calling SkyTask Clinic. May I have your phone number?",
                allow_interruptions=True
            )
        except RuntimeError as e:
            logger.warning(f"Could not speak greeting - error: {e}")
    else:
        logger.warning("Session not running - skipping greeting (user may have disconnected)")

    # Let the frontend know the session is ready.
    try:
        payload = json.dumps({"type": "session_ready"})
        await ctx.room.local_participant.publish_data(payload, reliable=True)
        logger.info("✅ Session ready signal sent to frontend")
    except Exception as e:
        logger.warning(f"Failed to send session_ready: {e}")

    # Keep the job alive for the duration of the call.
    while _room_is_live():
        await asyncio.sleep(1)

    # Backup path: persist a summary if one was not already generated.
    contact_number = user_context.get("contact_number")
    if contact_number and not assistant.summary_generated:
        logger.info("Disconnect summary generation (backup)...")
        duration = (datetime.now() - start_time).total_seconds()
        user_name = user_context.get("user_name", "the patient")
        await generate_and_save_summary(db, assistant.chat_ctx, contact_number, duration, avatar_type, tts_provider, user_name)
|
|
|
|
def start_health_check_server():
    """Start a background HTTP server that answers health-check probes.

    The server listens on all interfaces on the port given by the ``PORT``
    environment variable (default 8080) and runs in a daemon thread.
    ``GET /`` and ``GET /health`` (any query string is ignored) answer
    ``200`` with the JSON body ``{"status": "healthy"}``; every other path
    answers ``404``. Request logging is suppressed.

    Returns:
        The running server object, so callers can ``shutdown()`` it, or
        ``None`` if the server could not be started. Start-up failures are
        printed and never raised.
    """

    class HealthCheckHandler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            # Probes sometimes append a query string; route on the path only.
            route = self.path.partition("?")[0]
            if route in ("/health", "/"):
                body = b'{"status": "healthy"}'
                self.send_response(200)
                self.send_header("Content-type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)
            else:
                self.send_response(404)
                self.send_header("Content-Length", "0")
                self.end_headers()

        def log_message(self, format, *args):
            # Keep health probes out of the logs.
            pass

    class _HealthCheckServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
        # Set on a private subclass so the process-wide
        # socketserver.TCPServer default is left untouched.
        allow_reuse_address = True
        # One thread per request, so a stalled client cannot block probes;
        # daemon threads never keep the process alive.
        daemon_threads = True

    httpd = None
    try:
        port = int(os.getenv("PORT", 8080))
        httpd = _HealthCheckServer(("", port), HealthCheckHandler)
        # Report the bound port (differs from PORT only when PORT is 0).
        print(f"✅ Health check server listening on port {httpd.server_address[1]}")

        thread = threading.Thread(target=httpd.serve_forever, daemon=True)
        thread.start()
        return httpd
    except Exception as e:
        if httpd is not None:
            # Do not leak the listening socket if the thread failed to start.
            httpd.server_close()
        print(f"⚠️ Failed to start health check server: {e}")
        return None
|
|
if __name__ == "__main__":
    # Bring up the HTTP health endpoint first so probes can succeed while
    # the worker itself is still starting.
    start_health_check_server()

    # Hand control to the agents CLI with this module's job entrypoint and
    # per-process prewarm hook.
    worker_options = WorkerOptions(
        entrypoint_fnc=entrypoint,
        prewarm_fnc=prewarm,
    )
    cli.run_app(worker_options)
|
|
| |
| |
| |
| |
| |
| |
| |
|
|