from langgraph.graph import StateGraph, START, END from typing import TypedDict, Annotated from langchain_core.messages import BaseMessage from langgraph.graph.message import add_messages from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver from langchain_ollama import ChatOllama from langgraph.prebuilt import ToolNode, tools_condition from langchain_community.tools import DuckDuckGoSearchRun from langchain_core.tools import tool from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, RemoveMessage, SystemMessage import aiosqlite, uuid, os, httpx, asyncio from twilio.rest import Client from dotenv import load_dotenv import json, pytz from datetime import datetime ######################### STATE ######################### class ChatState(TypedDict): messages: Annotated[list[BaseMessage], add_messages] summary: str ######################### TOOLS ######################### def get_db_path(): return os.path.join(os.path.dirname(__file__), "daa.db") def send_sms(to_number: str, message: str): client = Client(os.getenv("TWILIO_ACCOUNT_SID"), os.getenv("TWILIO_AUTH_TOKEN")) client.messages.create( body=message, from_=os.getenv("TWILIO_PHONE_NUMBER"), to=to_number ) def format_bd_number(num: str) -> str: num = num.strip().replace(" ", "") if num.startswith("01") and len(num) == 11: return "+88" + num if num.startswith("8801"): return "+" + num return num # already formatted or unknown @tool def get_bd_time() -> str: """ Get current Bangladesh time (Asia/Dhaka) with weekday name """ tz = pytz.timezone("Asia/Dhaka") now = datetime.now(tz) return now.strftime("%Y-%m-%d %H:%M:%S (%A, Bangladesh Time)") @tool async def search_doctor(name: str = "", category: str = "", visiting_days: str = "") -> str: """ Search doctors by name, category, or visiting_days from SQLite database. Any combination of filters is supported (OR logic for each field). """ db_path = get_db_path() query = "SELECT * FROM doctors WHERE 1=1" params = [] conditions = [] if name: conditions.append("LOWER(doctor_name) LIKE ?") params.append(f"%{name.lower()}%") if category: conditions.append("LOWER(category) LIKE ?") params.append(f"%{category.lower()}%") if visiting_days: conditions.append("LOWER(visiting_days) LIKE ?") params.append(f"%{visiting_days.lower()}%") if conditions: query += " AND (" + " OR ".join(conditions) + ")" async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(query, params) rows = await cursor.fetchall() if not rows: return json.dumps({ "success": False, "message": "No doctors found matching your search.", "data": [] }) return json.dumps({ "success": True, "count": len(rows), "data": [dict(r) for r in rows] }) @tool async def search_appointment_by_phone(patient_num: str) -> str: """ Search all appointments using patient phone number. """ db_path = get_db_path() patient_num = format_bd_number(patient_num) async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(""" SELECT * FROM patients WHERE patient_num = ? ORDER BY visiting_date ASC """, (patient_num,)) rows = await cursor.fetchall() if not rows: return json.dumps({ "success": False, "message": "No appointments found for this phone number.", "data": [] }) return json.dumps({ "success": True, "count": len(rows), "data": [dict(r) for r in rows] }) @tool async def book_appointment(doctor_id: int, patient_name: str, patient_age: str, patient_num: str, visiting_date: str) -> str: """ Book a doctor appointment and save it to the patients table. Args: doctor_id: Doctor's ID from search_doctor results. patient_name: Full name of the patient. patient_age: Age of the patient (e.g. "32"). patient_num: Contact phone number of the patient. visiting_date: Date of visit in YYYY-MM-DD format (e.g. 2025-06-15). Returns a booking confirmation with the new record ID. """ db_path = get_db_path() async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row patient_num = format_bd_number(patient_num) # Verify doctor exists cursor = await db.execute("SELECT * FROM doctors WHERE id = ?", (doctor_id,)) doctor = await cursor.fetchone() if not doctor: return f"No doctor found with ID {doctor_id}. Please search for a doctor first." doctor_data = dict(doctor) doctor_name = doctor_data.get("doctor_name", "Unknown") doctor_category = doctor_data.get("doctor_category", "Unknown") # Check for conflicting booking (same doctor + same date) cursor = await db.execute( """SELECT id FROM patients WHERE doctor_name = ? AND visiting_date = ? AND patient_num = ?""", (doctor_name, visiting_date, patient_num), ) conflict = await cursor.fetchone() if conflict: return ( f"A booking for {patient_name} with Dr. {doctor_name} " f"on {visiting_date} already exists." ) # Insert into patients table cursor = await db.execute( """INSERT INTO patients (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date) VALUES (?, ?, ?, ?, ?, ?)""", (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date), ) await db.commit() # Send SMS confirmation sms_message = ( f"✅ Appointment Confirmed!\n" f"Doctor : {doctor_name}\n" f"Patient : {patient_name}\n" f"Visit Date : {visiting_date}\n" f"Please arrive 10 minutes early." ) # try: # send_sms(to_number=patient_num, message=sms_message) # sms_status = "📱 SMS confirmation sent." # except Exception as e: # sms_status = f"⚠️ SMS failed: {str(e)}" return ( f"✅ Appointment Booked!\n" f"━━━━━━━━━━━━━━━━━━━━━━\n" f"Doctor : {doctor_name}\n" f"Patient : {patient_name}\n" f"Age : {patient_age}\n" f"Date : {visiting_date}\n" f"Contact : {patient_num}\n" f"━━━━━━━━━━━━━━━━━━━━━━\n" f"Please arrive 10 minutes early." # f"{sms_status}" ) async def delete_appointment(patient_num: str, doctor_name: str) -> str: """ Delete an appointment using patient phone number and doctor name. """ db_path = get_db_path() # normalize phone number patient_num = format_bd_number(patient_num) async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row # check if appointment exists first cursor = await db.execute(""" SELECT * FROM patients WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?) """, (patient_num, doctor_name)) row = await cursor.fetchone() if not row: return json.dumps({ "success": False, "message": "No matching appointment found to delete." }) # delete appointment await db.execute(""" DELETE FROM patients WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?) """, (patient_num, doctor_name)) await db.commit() return json.dumps({ "success": True, "message": f"Appointment with Dr. {doctor_name} deleted successfully." }) ######################### MAIN AGENT CLASS ######################### class AIBackend: def __init__(self): load_dotenv() os.environ["LANGCHAIN_PROJECT"] = "Doctor Appointment Automation" self.llm = ChatOllama(model="gemma4:e4b", streaming=True) # qwen2.5:3b, gemma4:e4b self.tools = [search_doctor, book_appointment, get_bd_time, search_appointment_by_phone, delete_appointment] self.tool_node = ToolNode(self.tools) self.llm_with_tools = self.llm.bind_tools(self.tools) async def async_setup(self): db_path = os.path.join(os.path.dirname(__file__), "daa.db") self.conn = await aiosqlite.connect(db_path) self.checkpointer = AsyncSqliteSaver(self.conn) await self._create_user_table() self.graph = self._build_graph() self.summary_graph = self._build_summary_graph() async def _create_user_table(self): await self.conn.execute(""" CREATE TABLE IF NOT EXISTS userid_threadid ( userId TEXT UNIQUE NOT NULL, threadId TEXT UNIQUE NOT NULL ) """) await self.conn.commit() ######################### SUMMARIZE NODE ######################### async def summarize_conversation(self, state: ChatState): existing_summary = state.get("summary", "") messages = state["messages"] prompt = ( f""" You are maintaining a long-term conversation memory for a chatbot. Existing summary: {existing_summary} Update and extend the summary using ONLY the new conversation messages above. Instructions: - Preserve important existing context. - Add new facts, decisions, preferences, goals, issues, and ongoing tasks. - Keep technical details concise but meaningful. - Track unresolved problems or follow-up actions. - Avoid repetition and remove outdated or redundant information when appropriate. - Maintain chronological consistency. - Write the summary in clear bullet points. - Focus on information useful for future conversations and contextual continuity. - Do NOT include casual greetings or temporary small talk unless important. - Keep the summary compact but information-dense. """ if existing_summary else """ You are creating a long-term conversation memory summary for a chatbot. Summarize the conversation above. Instructions: - Capture important user information, goals, preferences, projects, and decisions. - Include technical issues, debugging progress, and solutions discussed. - Track ongoing tasks or unresolved questions. - Ignore casual greetings and low-value chatter. - Write concise, structured bullet points. - Keep the summary compact but highly informative for future context retention. """ ) messages_for_summary = messages + [HumanMessage(content=prompt)] response = await self.llm.ainvoke(messages_for_summary) return { "summary": response.content, "messages": [RemoveMessage(id=m.id) for m in messages[:-2]], } async def should_summarize(self, state: ChatState): if len(state["messages"]) > 10: return "summarize_node" return "chat_node" ######################### CHAT NODE ######################### async def chat_node(self, state: ChatState): summary = state.get("summary", "") messages = state["messages"] print('#'*50) print(">>>>>>>>>> CHAT NODE START <<<<<<<<<<") if summary: print(f"[SUMMARY]:\n{summary}\n") else: print("[NO SUMMARY YET]\n") print('$'*50) print("[MESSAGES]:") for m in messages: role = m.__class__.__name__ print(f" [{role}]: {m.content[:200]}") print('$'*50,'\n') if summary: summary_message = SystemMessage( content=( "You are provided with a condensed memory of previous conversations.\n\n" f"Conversation Memory:\n{summary}\n\n" "Instructions:\n" "- Use this memory as long-term conversational context.\n" "- Maintain continuity with the user's previous discussions, projects, goals, and preferences.\n" "- Prioritize recent and relevant information when generating responses.\n" "- Do not repeat the summary unless necessary.\n" "- If new information conflicts with old memory, prefer the latest context.\n" "- Use the memory naturally to improve personalization, reasoning, and follow-up responses.\n" "- Treat unresolved issues, active projects, and pending tasks as ongoing unless stated otherwise." ) ) messages = [summary_message] + messages response = await self.llm_with_tools.ainvoke(messages) print(f"Final [{response.__class__.__name__}]: {response.content[:200]}") print(">>>>>>>>>> CHAT NODE END <<<<<<<<<<") print('#'*50) return {"messages": [response]} ######################### GRAPH ######################### def _build_graph(self): g = StateGraph(ChatState) g.add_node("chat_node", self.chat_node) g.add_node("tools", self.tool_node) g.add_edge(START, "chat_node") g.add_conditional_edges("chat_node", tools_condition) g.add_edge("tools", "chat_node") return g.compile(checkpointer=self.checkpointer) def _build_summary_graph(self): g = StateGraph(ChatState) g.add_node("summarize_node", self.summarize_conversation) g.add_edge(START, "summarize_node") g.add_edge("summarize_node", END) return g.compile(checkpointer=self.checkpointer) ######################### STREAMING ######################### async def ai_only_stream(self, initial_state: dict, config: dict): async for message_chunk, metadata in self.graph.astream(initial_state, config=config, stream_mode="messages"): if isinstance(message_chunk, AIMessage) and message_chunk.content: yield message_chunk.content # Auto Summarization Execute current_state = await self.graph.aget_state(config) if len(current_state.values.get("messages", [])) > 10: asyncio.create_task( self.summary_graph.ainvoke(current_state.values, config=config) ) print('@'*20,'Summarization Execute','@'*20) ######################### THREAD ID ######################### @staticmethod def generate_thread_id() -> str: return str(uuid.uuid4()) ######################### RETRIEVE ALL THREADS ######################### async def retrieve_all_threads(self): all_threads = set() async for checkpoint in self.checkpointer.alist(None): all_threads.add(checkpoint.config["configurable"]["thread_id"]) return list(all_threads) ######################### MAIN ENTRY POINT ######################### async def main(self, user_id: str, user_query: str): async with self.conn.execute( "SELECT userId, threadId FROM userid_threadid WHERE userId = ?", (user_id,) ) as cursor: result = await cursor.fetchone() if result is None: thread_id = user_id + self.generate_thread_id() await self.conn.execute( "INSERT INTO userid_threadid (userId, threadId) VALUES (?, ?)", (user_id, thread_id), ) await self.conn.commit() else: thread_id = result[1] initial_state = {"messages": [HumanMessage(content=user_query)]} config = { "configurable": {"thread_id": thread_id}, "metadata": {"thread_id": thread_id}, "run_name": "chat_turn", } return self.ai_only_stream(initial_state, config)