Voice-AI-Agent / core /backend.py
rakib72642's picture
added communication full layer
5dabf9d
raw
history blame
23.6 kB
from __future__ import annotations
import asyncio
import json
import os
import uuid
import aiosmtplib
import aiosqlite
import pytz
from datetime import datetime, timedelta
from dotenv import load_dotenv
from langchain_core.messages import (
AIMessage, AIMessageChunk, HumanMessage, RemoveMessage,
SystemMessage, ToolMessage,
)
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from twilio.rest import Client
from typing import Annotated, TypedDict, Optional, AsyncGenerator
from email.message import EmailMessage
from dateparser.search import search_dates
from langchain_ollama import ChatOllama
load_dotenv()
# ═══════════════════════════════════════════════════════════════════════════════
# STATE
# ═══════════════════════════════════════════════════════════════════════════════
class ChatState(TypedDict):
messages: Annotated[list, add_messages]
summary: str
# ═══════════════════════════════════════════════════════════════════════════════
# HELPERS
# ═══════════════════════════════════════════════════════════════════════════════
def get_db_path() -> str:
return os.path.join(os.path.dirname(__file__), "daa.db")
def format_bd_number(num: str) -> str:
num = num.strip().replace(" ", "")
if num.startswith("01") and len(num) == 11:
return "+88" + num
if num.startswith("8801"):
return "+" + num
return num
def send_sms(to_number: str, message: str) -> None:
client = Client(os.getenv("TWILIO_ACCOUNT_SID"), os.getenv("TWILIO_AUTH_TOKEN"))
client.messages.create(
body=message,
from_=os.getenv("TWILIO_PHONE_NUMBER"),
to=to_number,
)
async def send_mail(to_mail: str, subject: str, body: str):
email = EmailMessage()
email["From"] = "walidofficework@gmail.com"
email["To"] = to_mail
email["Subject"] = subject
email.set_content(body)
await aiosmtplib.send(
email,
hostname="smtp.gmail.com",
port=465,
username="walidofficework@gmail.com",
password="bajq dkqr qacs pehr",
use_tls=True,
)
# ═══════════════════════════════════════════════════════════════════════════════
# TOOLS
# ═══════════════════════════════════════════════════════════════════════════════
@tool
def get_bd_time() -> str:
"""Get current Bangladesh date and time along with the next 14 days."""
# Bangladesh timezone
tz = pytz.timezone("Asia/Dhaka")
# Current datetime
now = datetime.now(tz)
# Create result dictionary
result = {
"CURRENT_DATETIME": now.strftime("%Y-%m-%d %H:%M:%S %Z"),
"TODAY": now.strftime("%A, %B %d, %Y"),
"TOMORROW": (now + timedelta(days=1)).strftime("%A, %B %d, %Y"),
"NEXT_14_DAYS": {}
}
# Generate next 14 days
for i in range(1, 15):
future_date = now + timedelta(days=i)
result["NEXT_14_DAYS"][f"+{i}"] = future_date.strftime("%A, %B %d, %Y")
return json.dumps(result)
@tool
async def get_doctor_categories() -> str:
"""
Fetch all unique doctor categories from the database.
"""
db_path = get_db_path()
query = """
SELECT DISTINCT category
FROM doctors
WHERE category IS NOT NULL
AND TRIM(category) != ''
ORDER BY category ASC
"""
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(query)
rows = await cursor.fetchall()
categories = [row["category"] for row in rows]
return json.dumps({
"success": True,
"count": len(categories),
"data": categories
})
@tool
async def get_doctors_by_day(
visiting_day: str,
) -> str:
"""
Get all doctors available on a specific visiting day.
Example inputs:
- Sunday
- Monday
- Friday
"""
db_path = get_db_path()
query = """
SELECT *
FROM doctors
WHERE LOWER(visiting_days) LIKE ?
"""
param = [f"%{visiting_day.lower()}%"]
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(query, param)
rows = await cursor.fetchall()
if not rows:
return json.dumps({
"success": False,
"message": f"No doctors found for {visiting_day}.",
"data": []
})
doctors = [dict(row) for row in rows]
return json.dumps({
"success": True,
"visiting_day": visiting_day,
"count": len(doctors),
"data": doctors
}, ensure_ascii=False)
@tool
async def search_doctor(
name: str = "",
category: str = "",
visiting_days: str = "",
) -> str:
"""
Search doctors by name, category, or visiting_days from the database.
Any combination of filters is supported (OR logic across fields).
"""
db_path = get_db_path()
query = "SELECT * FROM doctors WHERE 1=1"
params: list = []
conditions: list[str] = []
if name:
conditions.append("LOWER(doctor_name) LIKE ?")
params.append(f"%{name.lower()}%")
if category:
conditions.append("LOWER(category) LIKE ?")
params.append(f"%{category.lower()}%")
if visiting_days:
conditions.append("LOWER(visiting_days) LIKE ?")
params.append(f"%{visiting_days.lower()}%")
if conditions:
query += " AND (" + " OR ".join(conditions) + ")"
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(query, params)
rows = await cursor.fetchall()
if not rows:
return json.dumps({"success": False, "message": "No doctors found.", "data": []})
return json.dumps({"success": True, "count": len(rows), "data": [dict(r) for r in rows]})
@tool
async def search_appointment_by_phone(patient_num: str) -> str:
"""Search all appointments using the patient's phone number."""
db_path = get_db_path()
patient_num = format_bd_number(patient_num)
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM patients WHERE patient_num = ? ORDER BY visiting_date ASC",
(patient_num,),
)
rows = await cursor.fetchall()
if not rows:
return json.dumps({
"success": False,
"message": "No appointments found for this phone number.",
"data": [],
})
return json.dumps({"success": True, "count": len(rows), "data": [dict(r) for r in rows]})
@tool
async def book_appointment(
doctor_id: int,
patient_name: str,
patient_age: str,
patient_num: str,
visiting_date: str,
patient_mail: str
) -> str:
"""
Book a doctor appointment and save it to the patients table.
Args:
doctor_id: Doctor's ID from search_doctor results.
patient_name: Full name of the patient.
patient_age: Age of the patient (e.g. "32").
patient_num: Contact phone number of the patient.
visiting_date: Date of visit in YYYY-MM-DD format (e.g. 2025-06-15).
patient_mail: Mail address for confirmation mail.
"""
db_path = get_db_path()
patient_num = format_bd_number(patient_num)
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute("SELECT * FROM doctors WHERE id = ?", (doctor_id,))
doctor = await cursor.fetchone()
if not doctor:
return f"No doctor found with ID {doctor_id}. Please search for a doctor first."
doctor_data = dict(doctor)
doctor_name = doctor_data.get("doctor_name", "Unknown")
doctor_category = doctor_data.get("category", "Unknown")
cursor = await db.execute(
"""SELECT id FROM patients
WHERE doctor_name = ? AND visiting_date = ? AND patient_num = ?""",
(doctor_name, visiting_date, patient_num),
)
if await cursor.fetchone():
return (
f"A booking for {patient_name} with Dr. {doctor_name} "
f"on {visiting_date} already exists."
)
await db.execute(
"""INSERT INTO patients
(doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, patient_mail)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, patient_mail),
)
await db.commit()
# Mail SMS confirmation
mail_message = (
f"Doctor : {doctor_name}\n"
f"Patient : {patient_name}\n"
f"Visit Date : {visiting_date}\n"
f"Please arrive 10 minutes early."
)
try:
await send_mail(
to_mail=patient_mail,
subject="βœ… Appointment Confirmed!",
body=mail_message,
)
mail_status = "\nπŸ“§ Mail confirmation sent."
except Exception as e:
mail_status = f"\n⚠️ Mail failed: {str(e)}"
return (
f"βœ… Appointment Booked!\n"
f"━━━━━━━━━━━━━━━━━━━━━━\n"
f"Doctor : {doctor_name}\n"
f"Patient : {patient_name}\n"
f"Age : {patient_age}\n"
f"Date : {visiting_date}\n"
f"Contact : {patient_num}\n"
f"━━━━━━━━━━━━━━━━━━━━━━\n"
f"Please arrive 10 minutes early."
f"{mail_status}"
)
@tool
async def delete_appointment(patient_num: str, doctor_name: str) -> str:
"""Delete an appointment using the patient's phone number and doctor name."""
db_path = get_db_path()
patient_num = format_bd_number(patient_num)
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"""SELECT * FROM patients
WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?)""",
(patient_num, doctor_name),
)
if not await cursor.fetchone():
return json.dumps({"success": False, "message": "No matching appointment found."})
await db.execute(
"""DELETE FROM patients
WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?)""",
(patient_num, doctor_name),
)
await db.commit()
return json.dumps({
"success": True,
"message": f"Appointment with Dr. {doctor_name} deleted successfully.",
})
# ═══════════════════════════════════════════════════════════════════════════════
# SYSTEM PROMPT
# ═══════════════════════════════════════════════════════════════════════════════
BASE_SYSTEM = """
You are a Doctor Appointment Assistant AI.
Your job is to help users manage medical appointments.
CAPABILITIES:
- Book doctor appointments
- Reschedule appointments
- Cancel appointments
- Collect patient details
STRICT RULES:
- You are NOT a doctor.
- NEVER diagnose diseases.
- NEVER recommend medicines or treatments.
APPOINTMENT FLOW:
1. Detect intent (book / cancel / reschedule / inquiry)
2. Collect details
3. Confirm all details before final booking
STYLE:
- Be short, clear, structured
- Ask one question at a time when needed
- Focus on completing booking
LANGUAGE RULE:
- Detect user language from latest message.
- If English β†’ reply English.
- If Bangla β†’ reply Bangla (বাংলা).
- If Banglish β†’ reply Bangla (বাংলা).
- Never mix languages unless user mixes first.
TOOLS:
- Use backend tools if available for scheduling
- Always confirm before final action
"""
SUMMARY_SYSTEM = (
BASE_SYSTEM
+ "\nYou also have a condensed memory of previous conversations:\n\n"
"{summary}\n\n"
"Use this memory for continuity. Do not repeat it unless asked."
)
# ═══════════════════════════════════════════════════════════════════════════════
# AGENT
# ═══════════════════════════════════════════════════════════════════════════════
class AIBackend:
# ── FIX-BUG1: was `_init_` (single underscores) β€” never called by Python
def __init__(self, use_gemini: bool = False, use_ollama: bool = True, use_fallback: bool = False):
self.use_gemini = use_gemini
self.use_ollama = use_ollama
self.use_fallback = use_fallback
os.environ.setdefault("LANGCHAIN_PROJECT", "Doctor Appointment Automation")
if use_gemini:
self.llm = ChatGoogleGenerativeAI(
model="gemini-2.0-flash",
temperature=0.3,
)
elif use_ollama:
self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.2)
else:
# Local fallback β€” extend as needed
self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.2)
self.tools = [
search_doctor,
book_appointment,
get_bd_time,
search_appointment_by_phone,
delete_appointment,
get_doctor_categories,
get_doctors_by_day
]
self.tool_node = ToolNode(self.tools)
self.llm_with_tools = self.llm.bind_tools(self.tools)
# ── Setup ──────────────────────────────────────────────────────────────────
async def async_setup(self) -> None:
db_path = get_db_path()
self.conn = await aiosqlite.connect(db_path)
self.checkpointer = AsyncSqliteSaver(self.conn)
await self._create_tables()
self.graph = self._build_graph()
self.summary_graph = self._build_summary_graph()
print("[Backend] AIBackend ready βœ“")
async def _create_tables(self) -> None:
await self.conn.execute("""
CREATE TABLE IF NOT EXISTS userid_threadid (
userId TEXT UNIQUE NOT NULL,
threadId TEXT UNIQUE NOT NULL
)
""")
await self.conn.execute("""
CREATE TABLE IF NOT EXISTS doctors (
id INTEGER PRIMARY KEY AUTOINCREMENT,
doctor_name TEXT,
category TEXT,
visiting_days TEXT,
visiting_time TEXT,
visiting_money INTEGER
)
""")
await self.conn.execute("""
CREATE TABLE IF NOT EXISTS patients (
id INTEGER PRIMARY KEY AUTOINCREMENT,
doctor_name TEXT,
doctor_category TEXT,
patient_name TEXT,
patient_age TEXT,
patient_num TEXT,
visiting_date TEXT,
patient_mail TEXT
)
""")
await self.conn.commit()
# ── Summarise node ─────────────────────────────────────────────────────────
async def summarize_conversation(self, state: ChatState):
existing = state.get("summary", "")
messages = state["messages"]
if existing:
prompt = (
f"Existing summary:\n{existing}\n\n"
"Update the summary with the new messages above. "
"Keep it concise, bullet-pointed, and information-dense. "
"Preserve unresolved issues and ongoing tasks."
)
else:
prompt = (
"Summarise this conversation. "
"Capture goals, decisions, preferences, and unresolved questions. "
"Be concise and use bullet points."
)
response = await self.llm.ainvoke(messages + [HumanMessage(content=prompt)])
return {
"summary": response.content,
"messages": [RemoveMessage(id=m.id) for m in messages[:-2]],
}
# ── Chat node ──────────────────────────────────────────────────────────────
async def chat_node(self, state: ChatState):
"""
Invokes the LLM with tool bindings and returns the AI response.
Uses ainvoke() (not collect-all-then-return astream()) so the call is
clean and deterministic. Token-level streaming is handled by LangGraph
itself via stream_mode="messages" in ai_only_stream(), which intercepts
the underlying LLM streaming at the graph level.
"""
summary = state.get("summary", "")
messages = state["messages"]
print("#" * 50)
print(">>>>>>>>>> CHAT NODE START <<<<<<<<<<")
print(f"[SUMMARY]: {summary[:120] if summary else 'None'}")
for m in messages:
print(f" [{m.__class__.__name__}]: {str(m.content)[:160]}")
print("#" * 50)
sys_content = SUMMARY_SYSTEM.format(summary=summary) if summary else BASE_SYSTEM
full_messages = [SystemMessage(content=sys_content)] + list(messages)
response = await self.llm_with_tools.ainvoke(full_messages)
print(f"[AI]: {str(response.content)[:200]}")
print(">>>>>>>>>> CHAT NODE END <<<<<<<<<<")
return {"messages": [response]}
# ── Graph ──────────────────────────────────────────────────────────────────
def _build_graph(self):
g = StateGraph(ChatState)
g.add_node("chat_node", self.chat_node)
g.add_node("tools", self.tool_node)
g.add_edge(START, "chat_node")
g.add_conditional_edges("chat_node", tools_condition)
g.add_edge("tools", "chat_node")
return g.compile(checkpointer=self.checkpointer)
def _build_summary_graph(self):
g = StateGraph(ChatState)
g.add_node("summarize_node", self.summarize_conversation)
g.add_edge(START, "summarize_node")
g.add_edge("summarize_node", END)
return g.compile(checkpointer=self.checkpointer)
# ── Streaming ──────────────────────────────────────────────────────────────
async def ai_only_stream(
self, initial_state: dict, config: dict
) -> AsyncGenerator[str, None]:
"""
Async generator β€” yields AI text tokens as they arrive.
FIX-BUG9: narrowed isinstance check to exclude ToolMessage content
from being streamed to the user, and guards against non-str content
(e.g. multimodal list payloads from Ollama tool-call chunks).
"""
async for chunk, _meta in self.graph.astream(
initial_state, config=config, stream_mode="messages"
):
# Only yield text content from AI messages.
# Exclude ToolMessage (tool execution results) β€” they contain
# raw JSON that should not be streamed directly to the user.
if (
isinstance(chunk, (AIMessage, AIMessageChunk))
and not isinstance(chunk, ToolMessage)
and isinstance(chunk.content, str)
and chunk.content
):
yield chunk.content
# Auto-summarise in background when history grows long
try:
current = await self.graph.aget_state(config)
if len(current.values.get("messages", [])) > 10:
asyncio.create_task(
self.summary_graph.ainvoke(current.values, config=config)
)
print("@" * 20, "Summarisation triggered", "@" * 20)
except Exception as exc:
print(f"[Backend] Summarisation check failed: {exc}")
# ── Thread management ──────────────────────────────────────────────────────
@staticmethod
def generate_thread_id() -> str:
return str(uuid.uuid4())
async def retrieve_all_threads(self) -> list[str]:
threads: set[str] = set()
async for cp in self.checkpointer.alist(None):
threads.add(cp.config["configurable"]["thread_id"])
return list(threads)
# ── Public entry point ─────────────────────────────────────────────────────
async def main(self, user_id: str, user_query: str) -> AsyncGenerator[str, None]:
"""Return an async generator of AI text tokens."""
async with self.conn.execute(
"SELECT threadId FROM userid_threadid WHERE userId = ?", (user_id,)
) as cursor:
row = await cursor.fetchone()
if row is None:
thread_id = user_id + self.generate_thread_id()
await self.conn.execute(
"INSERT INTO userid_threadid (userId, threadId) VALUES (?, ?)",
(user_id, thread_id),
)
await self.conn.commit()
else:
thread_id = row[0]
initial_state = {"messages": [HumanMessage(content=user_query)]}
config = {
"configurable": {"thread_id": thread_id},
"metadata": {"thread_id": thread_id},
"run_name": "chat_turn",
}
return self.ai_only_stream(initial_state, config)