# Voice-AI-Agent / core / backend.py
# rakib72642's picture
# project init
# f2ea5fc
# raw
# history blame
# 16.9 kB
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langchain_ollama import ChatOllama
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, RemoveMessage, SystemMessage
import aiosqlite, uuid, os, httpx, asyncio
from twilio.rest import Client
from dotenv import load_dotenv
import json, pytz
from datetime import datetime
######################### STATE #########################
class ChatState(TypedDict):
    """State dictionary shared by the nodes of the chat and summary graphs."""

    # Conversation messages; `add_messages` is attached as the reducer for this key.
    messages: Annotated[list[BaseMessage], add_messages]
    # Condensed conversation memory produced by the summarize node.
    summary: str
######################### TOOLS #########################
def get_db_path():
    """Return the path of the ``daa.db`` SQLite file located next to this module."""
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, "daa.db")
def send_sms(to_number: str, message: str):
    """Send ``message`` as an SMS to ``to_number`` through the Twilio REST client.

    The account SID, auth token and sender number are read from the
    ``TWILIO_ACCOUNT_SID``, ``TWILIO_AUTH_TOKEN`` and ``TWILIO_PHONE_NUMBER``
    environment variables.
    """
    account_sid = os.getenv("TWILIO_ACCOUNT_SID")
    auth_token = os.getenv("TWILIO_AUTH_TOKEN")
    twilio_client = Client(account_sid, auth_token)
    sender_number = os.getenv("TWILIO_PHONE_NUMBER")
    twilio_client.messages.create(body=message, from_=sender_number, to=to_number)
def format_bd_number(num: str) -> str:
    """Normalize a Bangladeshi mobile number to ``+8801XXXXXXXXX`` form.

    All whitespace and the common visual separators ``-``, ``.``, ``(`` and
    ``)`` are removed first, so ``"017-1234 5678"`` and ``"01712345678"``
    normalize to the same value. This matters because the appointment
    queries compare phone numbers with exact equality.

    Args:
        num: Phone number as typed or dictated by the user.

    Returns:
        ``+88`` + number for an 11-character local number starting with ``01``;
        ``+`` + number for a number starting with ``8801``; otherwise the
        cleaned input unchanged (already formatted or unknown).
    """
    # Strip separators, then collapse every kind of whitespace (spaces, tabs, newlines).
    without_separators = num.translate(str.maketrans("", "", "-.()"))
    cleaned = "".join(without_separators.split())

    if cleaned.startswith("01") and len(cleaned) == 11:
        return "+88" + cleaned
    if cleaned.startswith("8801"):
        return "+" + cleaned
    return cleaned  # already formatted or unknown
@tool
def get_bd_time() -> str:
    """
    Get current Bangladesh time (Asia/Dhaka) with weekday name
    """
    # NOTE(review): the docstring is presumably what the LLM receives as the tool
    # description (LangChain @tool convention), so its wording is left untouched.
    dhaka_now = datetime.now(pytz.timezone("Asia/Dhaka"))
    return dhaka_now.strftime("%Y-%m-%d %H:%M:%S (%A, Bangladesh Time)")
@tool
async def search_doctor(name: str = "", category: str = "", visiting_days: str = "") -> str:
    """
    Search doctors by name, category, or visiting_days from SQLite database.
    Any combination of filters is supported (OR logic for each field).
    """
    # Pair each searchable column with the value supplied for it; empty values
    # are skipped so they add no condition.
    column_filters = (
        ("doctor_name", name),
        ("category", category),
        ("visiting_days", visiting_days),
    )
    active_filters = [(column, value) for column, value in column_filters if value]

    # Column names are fixed constants; user text only travels as bound parameters.
    clauses = [f"LOWER({column}) LIKE ?" for column, _ in active_filters]
    bound_values = [f"%{value.lower()}%" for _, value in active_filters]

    sql = "SELECT * FROM doctors WHERE 1=1"
    if clauses:
        sql += " AND (" + " OR ".join(clauses) + ")"

    async with aiosqlite.connect(get_db_path()) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(sql, bound_values)
        matches = await cursor.fetchall()
        if matches:
            return json.dumps({
                "success": True,
                "count": len(matches),
                "data": [dict(row) for row in matches]
            })
        return json.dumps({
            "success": False,
            "message": "No doctors found matching your search.",
            "data": []
        })
@tool
async def search_appointment_by_phone(patient_num: str) -> str:
    """
    Search all appointments using patient phone number.
    """
    # Normalize first so the exact-match lookup uses the same form as stored numbers.
    normalized_num = format_bd_number(patient_num)
    lookup_sql = (
        "\n"
        "SELECT * FROM patients\n"
        "WHERE patient_num = ?\n"
        "ORDER BY visiting_date ASC\n"
    )

    async with aiosqlite.connect(get_db_path()) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(lookup_sql, (normalized_num,))
        appointments = await cursor.fetchall()
        if appointments:
            return json.dumps({
                "success": True,
                "count": len(appointments),
                "data": [dict(row) for row in appointments]
            })
        return json.dumps({
            "success": False,
            "message": "No appointments found for this phone number.",
            "data": []
        })
@tool
async def book_appointment(doctor_id: int, patient_name: str, patient_age: str, patient_num: str, visiting_date: str) -> str:
    """
    Book a doctor appointment and save it to the patients table.
    Args:
    doctor_id: Doctor's ID from search_doctor results.
    patient_name: Full name of the patient.
    patient_age: Age of the patient (e.g. "32").
    patient_num: Contact phone number of the patient.
    visiting_date: Date of visit in YYYY-MM-DD format (e.g. 2025-06-15).
    Returns a booking confirmation with the new record ID.
    """
    # Reject anything that is not a real calendar date in the documented format,
    # and store it zero-padded so equality checks and ORDER BY on visiting_date
    # behave consistently.
    try:
        parsed_date = datetime.strptime(visiting_date.strip(), "%Y-%m-%d")
    except ValueError:
        return (
            f"Invalid visiting date '{visiting_date}'. "
            "Please provide the date in YYYY-MM-DD format (e.g. 2025-06-15)."
        )
    visiting_date = parsed_date.date().isoformat()

    patient_num = format_bd_number(patient_num)

    async with aiosqlite.connect(get_db_path()) as db:
        db.row_factory = aiosqlite.Row

        # Verify doctor exists
        cursor = await db.execute("SELECT * FROM doctors WHERE id = ?", (doctor_id,))
        doctor = await cursor.fetchone()
        if not doctor:
            return f"No doctor found with ID {doctor_id}. Please search for a doctor first."

        doctor_data = dict(doctor)
        doctor_name = doctor_data.get("doctor_name", "Unknown")
        # search_doctor filters the doctors table on a `category` column, so fall
        # back to it when the row has no `doctor_category` key; otherwise the
        # stored category would always be "Unknown".
        if "doctor_category" in doctor_data:
            doctor_category = doctor_data["doctor_category"]
        else:
            doctor_category = doctor_data.get("category", "Unknown")

        # Check for a duplicate booking (same doctor + same date + same patient phone)
        cursor = await db.execute(
            "SELECT id FROM patients\n"
            "WHERE doctor_name = ? AND visiting_date = ? AND patient_num = ?",
            (doctor_name, visiting_date, patient_num),
        )
        conflict = await cursor.fetchone()
        if conflict:
            return (
                f"A booking for {patient_name} with Dr. {doctor_name} "
                f"on {visiting_date} already exists."
            )

        # Insert into patients table
        cursor = await db.execute(
            "INSERT INTO patients (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date)\n"
            "VALUES (?, ?, ?, ?, ?, ?)",
            (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date),
        )
        # Row id assigned by SQLite to the row just inserted; the docstring
        # promises the new record ID in the confirmation.
        booking_id = cursor.lastrowid
        await db.commit()

    # NOTE: SMS confirmation through send_sms() is currently disabled; when it is
    # re-enabled, wrap the call in try/except so an SMS failure cannot undo a
    # booking that has already been committed.
    return (
        f"✅ Appointment Booked!\n"
        f"━━━━━━━━━━━━━━━━━━━━━━\n"
        f"Booking ID : {booking_id}\n"
        f"Doctor : {doctor_name}\n"
        f"Patient : {patient_name}\n"
        f"Age : {patient_age}\n"
        f"Date : {visiting_date}\n"
        f"Contact : {patient_num}\n"
        f"━━━━━━━━━━━━━━━━━━━━━━\n"
        f"Please arrive 10 minutes early."
    )
async def delete_appointment(patient_num: str, doctor_name: str) -> str:
    """
    Delete an appointment using patient phone number and doctor name.
    """
    # Normalize the phone number so it matches the stored form.
    match_args = (format_bd_number(patient_num), doctor_name)

    find_sql = (
        "\n"
        "SELECT * FROM patients\n"
        "WHERE patient_num = ?\n"
        "AND LOWER(doctor_name) = LOWER(?)\n"
    )
    remove_sql = (
        "\n"
        "DELETE FROM patients\n"
        "WHERE patient_num = ?\n"
        "AND LOWER(doctor_name) = LOWER(?)\n"
    )

    async with aiosqlite.connect(get_db_path()) as db:
        db.row_factory = aiosqlite.Row

        # Look the appointment up first so a miss can be reported explicitly.
        cursor = await db.execute(find_sql, match_args)
        existing = await cursor.fetchone()
        if not existing:
            return json.dumps({
                "success": False,
                "message": "No matching appointment found to delete."
            })

        # Removes every row matching this phone number and doctor name.
        await db.execute(remove_sql, match_args)
        await db.commit()
        return json.dumps({
            "success": True,
            "message": f"Appointment with Dr. {doctor_name} deleted successfully."
        })
######################### MAIN AGENT CLASS #########################
class AIBackend:
    """Chat agent backend: an LLM with tools, SQLite checkpointing and summarization.

    Usage: construct the object, ``await async_setup()`` once, then call
    ``await main(user_id, user_query)`` to obtain an async generator that yields
    the assistant's streamed text.
    """

    # Message count above which a conversation is summarized and trimmed.
    SUMMARIZE_AFTER_MESSAGES = 10

    def __init__(self):
        """Load environment variables and create the LLM, tool list and tool node."""
        load_dotenv()
        os.environ["LANGCHAIN_PROJECT"] = "Doctor Appointment Automation"
        self.llm = ChatOllama(model="gemma4:e4b", streaming=True)  # qwen2.5:3b, gemma4:e4b
        self.tools = [search_doctor, book_appointment, get_bd_time, search_appointment_by_phone, delete_appointment]
        self.tool_node = ToolNode(self.tools)
        self.llm_with_tools = self.llm.bind_tools(self.tools)
        # Strong references to fire-and-forget summarization tasks. The event loop
        # only keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    async def async_setup(self):
        """Open the SQLite connection, create the checkpointer and compile both graphs.

        Must be awaited before ``main`` is used: it sets ``self.conn``,
        ``self.checkpointer``, ``self.graph`` and ``self.summary_graph``.
        """
        db_path = os.path.join(os.path.dirname(__file__), "daa.db")
        self.conn = await aiosqlite.connect(db_path)
        self.checkpointer = AsyncSqliteSaver(self.conn)
        await self._create_user_table()
        self.graph = self._build_graph()
        self.summary_graph = self._build_summary_graph()

    async def _create_user_table(self):
        """Create the user-id to thread-id mapping table if it does not exist."""
        await self.conn.execute(
            "\n"
            "CREATE TABLE IF NOT EXISTS userid_threadid (\n"
            "userId TEXT UNIQUE NOT NULL,\n"
            "threadId TEXT UNIQUE NOT NULL\n"
            ")\n"
        )
        await self.conn.commit()

    ######################### SUMMARIZE NODE #########################
    async def summarize_conversation(self, state: ChatState):
        """Summarize the conversation and drop all but the last two messages.

        Returns a state update holding the new summary text and a
        ``RemoveMessage`` for every message except the final two.
        """
        existing_summary = state.get("summary", "")
        messages = state["messages"]

        if existing_summary:
            # Extend the memory that already exists.
            prompt = (
                "\n"
                "You are maintaining a long-term conversation memory for a chatbot.\n"
                "Existing summary:\n"
                f"{existing_summary}\n"
                "Update and extend the summary using ONLY the new conversation messages above.\n"
                "Instructions:\n"
                "- Preserve important existing context.\n"
                "- Add new facts, decisions, preferences, goals, issues, and ongoing tasks.\n"
                "- Keep technical details concise but meaningful.\n"
                "- Track unresolved problems or follow-up actions.\n"
                "- Avoid repetition and remove outdated or redundant information when appropriate.\n"
                "- Maintain chronological consistency.\n"
                "- Write the summary in clear bullet points.\n"
                "- Focus on information useful for future conversations and contextual continuity.\n"
                "- Do NOT include casual greetings or temporary small talk unless important.\n"
                "- Keep the summary compact but information-dense.\n"
            )
        else:
            # First summary for this thread.
            prompt = (
                "\n"
                "You are creating a long-term conversation memory summary for a chatbot.\n"
                "Summarize the conversation above.\n"
                "Instructions:\n"
                "- Capture important user information, goals, preferences, projects, and decisions.\n"
                "- Include technical issues, debugging progress, and solutions discussed.\n"
                "- Track ongoing tasks or unresolved questions.\n"
                "- Ignore casual greetings and low-value chatter.\n"
                "- Write concise, structured bullet points.\n"
                "- Keep the summary compact but highly informative for future context retention.\n"
            )

        # The instruction goes last so "the conversation above" refers to the history.
        messages_for_summary = messages + [HumanMessage(content=prompt)]
        response = await self.llm.ainvoke(messages_for_summary)
        return {
            "summary": response.content,
            "messages": [RemoveMessage(id=m.id) for m in messages[:-2]],
        }

    async def should_summarize(self, state: ChatState):
        """Return the name of the node to run next, based on the message count."""
        if len(state["messages"]) > self.SUMMARIZE_AFTER_MESSAGES:
            return "summarize_node"
        return "chat_node"

    ######################### CHAT NODE #########################
    async def chat_node(self, state: ChatState):
        """Call the tool-enabled LLM on the current messages, with the summary prepended.

        Prints the summary, the first 200 characters of each message and of the
        response for debugging, and returns the response as a message update.
        """
        summary = state.get("summary", "")
        messages = state["messages"]

        print('#'*50)
        print(">>>>>>>>>> CHAT NODE START <<<<<<<<<<")
        if summary:
            print(f"[SUMMARY]:\n{summary}\n")
        else:
            print("[NO SUMMARY YET]\n")
        print('$'*50)
        print("[MESSAGES]:")
        for m in messages:
            role = m.__class__.__name__
            print(f" [{role}]: {m.content[:200]}")
        print('$'*50,'\n')

        if summary:
            # The memory is injected per call only; it is not added to the stored messages.
            summary_message = SystemMessage(
                content=(
                    "You are provided with a condensed memory of previous conversations.\n\n"
                    f"Conversation Memory:\n{summary}\n\n"
                    "Instructions:\n"
                    "- Use this memory as long-term conversational context.\n"
                    "- Maintain continuity with the user's previous discussions, projects, goals, and preferences.\n"
                    "- Prioritize recent and relevant information when generating responses.\n"
                    "- Do not repeat the summary unless necessary.\n"
                    "- If new information conflicts with old memory, prefer the latest context.\n"
                    "- Use the memory naturally to improve personalization, reasoning, and follow-up responses.\n"
                    "- Treat unresolved issues, active projects, and pending tasks as ongoing unless stated otherwise."
                )
            )
            messages = [summary_message] + messages

        response = await self.llm_with_tools.ainvoke(messages)
        print(f"Final [{response.__class__.__name__}]: {response.content[:200]}")
        print(">>>>>>>>>> CHAT NODE END <<<<<<<<<<")
        print('#'*50)
        return {"messages": [response]}

    ######################### GRAPH #########################
    def _build_graph(self):
        """Compile the chat graph: chat node, tool node, and the edges between them."""
        g = StateGraph(ChatState)
        g.add_node("chat_node", self.chat_node)
        g.add_node("tools", self.tool_node)
        g.add_edge(START, "chat_node")
        g.add_conditional_edges("chat_node", tools_condition)
        g.add_edge("tools", "chat_node")
        return g.compile(checkpointer=self.checkpointer)

    def _build_summary_graph(self):
        """Compile the single-node graph that runs ``summarize_conversation``."""
        g = StateGraph(ChatState)
        g.add_node("summarize_node", self.summarize_conversation)
        g.add_edge(START, "summarize_node")
        g.add_edge("summarize_node", END)
        return g.compile(checkpointer=self.checkpointer)

    ######################### STREAMING #########################
    def _on_summary_task_done(self, task):
        """Release a finished summarization task and report its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f"[SUMMARY TASK FAILED]: {error!r}")

    async def ai_only_stream(self, initial_state: dict, config: dict):
        """Run the chat graph and yield the non-empty content of streamed AI messages.

        After the stream ends, a background summarization is scheduled when the
        thread holds more than ``SUMMARIZE_AFTER_MESSAGES`` messages.
        """
        async for message_chunk, metadata in self.graph.astream(initial_state, config=config, stream_mode="messages"):
            if isinstance(message_chunk, AIMessage) and message_chunk.content:
                yield message_chunk.content

        # Auto Summarization Execute
        current_state = await self.graph.aget_state(config)
        if len(current_state.values.get("messages", [])) > self.SUMMARIZE_AFTER_MESSAGES:
            task = asyncio.create_task(
                self.summary_graph.ainvoke(current_state.values, config=config)
            )
            # Hold a reference until the task completes; see __init__.
            self._background_tasks.add(task)
            task.add_done_callback(self._on_summary_task_done)
            print('@'*20,'Summarization Execute','@'*20)

    ######################### THREAD ID #########################
    @staticmethod
    def generate_thread_id() -> str:
        """Return a new random UUID4 string."""
        return str(uuid.uuid4())

    ######################### RETRIEVE ALL THREADS #########################
    async def retrieve_all_threads(self):
        """Return the distinct thread ids found in the checkpointer, in no particular order."""
        all_threads = {
            checkpoint.config["configurable"]["thread_id"]
            async for checkpoint in self.checkpointer.alist(None)
        }
        return list(all_threads)

    ######################### MAIN ENTRY POINT #########################
    async def main(self, user_id: str, user_query: str):
        """Resolve the user's thread and return the response stream for ``user_query``.

        A user seen for the first time gets a new thread id (``user_id`` followed
        by a UUID4) stored in ``userid_threadid``; later calls reuse it.

        Returns:
            The async generator created by ``ai_only_stream``.
        """
        async with self.conn.execute(
            "SELECT userId, threadId FROM userid_threadid WHERE userId = ?", (user_id,)
        ) as cursor:
            result = await cursor.fetchone()

        if result is None:
            thread_id = user_id + self.generate_thread_id()
            await self.conn.execute(
                "INSERT INTO userid_threadid (userId, threadId) VALUES (?, ?)",
                (user_id, thread_id),
            )
            await self.conn.commit()
        else:
            thread_id = result[1]

        initial_state = {"messages": [HumanMessage(content=user_query)]}
        config = {
            "configurable": {"thread_id": thread_id},
            "metadata": {"thread_id": thread_id},
            "run_name": "chat_turn",
        }
        return self.ai_only_stream(initial_state, config)