import asyncio
import json
import logging
import os
import uuid
from datetime import datetime
from typing import Annotated, TypedDict

import aiosqlite
import httpx
import pytz
from dotenv import load_dotenv
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
    RemoveMessage,
    SystemMessage,
    ToolMessage,
)
from langchain_core.tools import tool
from langchain_ollama import ChatOllama
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from twilio.rest import Client


class ChatState(TypedDict):
    """Graph state shared by the chat graph and the summary graph."""

    # Conversation history. `add_messages` is attached as the reducer
    # annotation for this key.
    messages: Annotated[list[BaseMessage], add_messages]
    # Condensed conversation memory: written by
    # AIBackend.summarize_conversation and read (with a "" fallback) by
    # AIBackend.chat_node.
    summary: str


def get_db_path():
    """Return the path of the ``daa.db`` SQLite file located beside this module."""
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, "daa.db")


def send_sms(to_number: str, message: str):
    """Send ``message`` as an SMS to ``to_number`` through the Twilio REST client.

    The account SID, auth token and sender number are read from the
    ``TWILIO_ACCOUNT_SID``, ``TWILIO_AUTH_TOKEN`` and ``TWILIO_PHONE_NUMBER``
    environment variables. Nothing is returned.
    """
    account_sid = os.getenv("TWILIO_ACCOUNT_SID")
    auth_token = os.getenv("TWILIO_AUTH_TOKEN")
    twilio = Client(account_sid, auth_token)

    sender = os.getenv("TWILIO_PHONE_NUMBER")
    twilio.messages.create(body=message, from_=sender, to=to_number)


def format_bd_number(num: str) -> str:
    """Normalise a Bangladeshi mobile number to ``+8801XXXXXXXXX`` form.

    The result is used as the lookup key for appointments, so the same
    phone number must normalise to the same string however it was typed.

    Args:
        num: Phone number as entered, e.g. ``"01712345678"``,
            ``"017-1234 5678"``, ``"8801712345678"`` or ``"008801712345678"``.

    Returns:
        The number with whitespace and the separators ``- ( ) .`` removed,
        and a ``+88`` / ``+`` prefix added when it is recognisably a local
        (``01`` + 9 digits) or country-code (``8801…`` / ``008801…``) form.
        Anything else is returned with only the separators removed.
    """
    # Strip every kind of whitespace plus common visual separators; the
    # previous implementation removed literal spaces only, so "017-1234-5678"
    # fell through unnormalised.
    compact = "".join(
        ch for ch in num if not ch.isspace() and ch not in "-()."
    )

    if compact.startswith("008801"):
        # "00" is the international dialling prefix written out instead of "+".
        return "+" + compact[2:]
    if compact.startswith("01") and len(compact) == 11:
        return "+88" + compact
    if compact.startswith("8801"):
        return "+" + compact
    return compact


@tool
def get_bd_time() -> str:
    """
    Get current Bangladesh time (Asia/Dhaka) with weekday name
    """
    # The docstring above is left untouched because the @tool wrapper
    # exposes it as the tool description.
    dhaka = pytz.timezone("Asia/Dhaka")
    stamp_format = "%Y-%m-%d %H:%M:%S (%A, Bangladesh Time)"
    return datetime.now(dhaka).strftime(stamp_format)


@tool
async def search_doctor(name: str = "", category: str = "", visiting_days: str = "") -> str:
    """
    Search doctors by name, category, or visiting_days from SQLite database.
    Any combination of filters is supported (OR logic for each field).
    """
    # Maintainer notes live in comments because the docstring above is what
    # the @tool wrapper exposes as the tool description.
    #
    # Returns a JSON string:
    #   hit  -> {"success": true,  "count": <n>, "data": [<row dict>, ...]}
    #   miss -> {"success": false, "message": "...", "data": []}
    # With no usable filter every row of `doctors` is returned.

    # (column, raw filter value). Column names are fixed literals, never
    # user input, so interpolating them into the SQL text below is safe.
    filters = (
        ("doctor_name", name),
        ("category", category),
        ("visiting_days", visiting_days),
    )

    conditions = []
    params = []
    for column, raw_value in filters:
        # Ignore empty / whitespace-only filters: a bare " " used to become
        # LIKE '% %', which (with OR logic) matched almost every doctor.
        term = (raw_value or "").strip().lower()
        if not term:
            continue
        # Escape LIKE metacharacters so "%" or "_" in the input are matched
        # literally instead of acting as wildcards.
        term = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        conditions.append(f"LOWER({column}) LIKE ? ESCAPE '\\'")
        params.append(f"%{term}%")

    query = "SELECT * FROM doctors"
    if conditions:
        query += " WHERE " + " OR ".join(conditions)

    async with aiosqlite.connect(get_db_path()) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(query, params)
        rows = await cursor.fetchall()

    if not rows:
        return json.dumps({
            "success": False,
            "message": "No doctors found matching your search.",
            "data": [],
        })

    return json.dumps({
        "success": True,
        "count": len(rows),
        "data": [dict(r) for r in rows],
    })


@tool
async def search_appointment_by_phone(patient_num: str) -> str:
    """
    Search all appointments using patient phone number.
    """
    # The docstring is kept verbatim: @tool exposes it as the tool description.
    # Result is a JSON string with "success", "data" and either "count"
    # (on a hit) or "message" (on a miss).
    database_file = get_db_path()
    # Normalise the number the same way book_appointment does before storing it.
    lookup_number = format_bd_number(patient_num)

    sql = """
        SELECT * FROM patients
        WHERE patient_num = ?
        ORDER BY visiting_date ASC
    """

    async with aiosqlite.connect(database_file) as conn:
        conn.row_factory = aiosqlite.Row
        result = await conn.execute(sql, (lookup_number,))
        found = await result.fetchall()

    if found:
        payload = {
            "success": True,
            "count": len(found),
            "data": [dict(record) for record in found],
        }
    else:
        payload = {
            "success": False,
            "message": "No appointments found for this phone number.",
            "data": [],
        }
    return json.dumps(payload)


@tool
async def book_appointment(doctor_id: int, patient_name: str, patient_age: str, patient_num: str, visiting_date: str) -> str:
    """
    Book a doctor appointment and save it to the patients table.

    Args:
        doctor_id: Doctor's ID from search_doctor results.
        patient_name: Full name of the patient.
        patient_age: Age of the patient (e.g. "32").
        patient_num: Contact phone number of the patient.
        visiting_date: Date of visit in YYYY-MM-DD format (e.g. 2025-06-15).

    Returns a plain-text booking confirmation, or a plain-text explanation
    when the date is malformed, the doctor does not exist, or the same
    booking already exists.
    """
    # Reject malformed dates before touching the database, and store the
    # canonical zero-padded form: search_appointment_by_phone sorts on this
    # column as text, which is only chronological for YYYY-MM-DD values.
    try:
        visiting_date = (
            datetime.strptime(visiting_date.strip(), "%Y-%m-%d").date().isoformat()
        )
    except ValueError:
        return (
            f"Invalid visiting date '{visiting_date}'. "
            f"Please provide the date in YYYY-MM-DD format (e.g. 2025-06-15)."
        )

    # Same normalisation as the lookup/delete paths so the phone keys match.
    patient_num = format_bd_number(patient_num)

    async with aiosqlite.connect(get_db_path()) as db:
        db.row_factory = aiosqlite.Row

        cursor = await db.execute("SELECT * FROM doctors WHERE id = ?", (doctor_id,))
        doctor = await cursor.fetchone()
        if not doctor:
            return f"No doctor found with ID {doctor_id}. Please search for a doctor first."

        doctor_data = dict(doctor)
        doctor_name = doctor_data.get("doctor_name", "Unknown")
        # search_doctor filters the doctors table on a `category` column, so
        # that is the column to read here; the old `doctor_category` key is
        # kept only as a fallback in case the schema carries that name.
        doctor_category = (
            doctor_data.get("category")
            or doctor_data.get("doctor_category")
            or "Unknown"
        )

        # Refuse an exact duplicate: same doctor, same day, same phone number.
        cursor = await db.execute(
            """SELECT id FROM patients
            WHERE doctor_name = ? AND visiting_date = ? AND patient_num = ?""",
            (doctor_name, visiting_date, patient_num),
        )
        conflict = await cursor.fetchone()
        if conflict:
            return (
                f"A booking for {patient_name} with Dr. {doctor_name} "
                f"on {visiting_date} already exists."
            )

        await db.execute(
            """INSERT INTO patients (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date)
            VALUES (?, ?, ?, ?, ?, ?)""",
            (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date),
        )
        await db.commit()

    # NOTE(review): no SMS is sent from here. The previous version built an
    # SMS body but never passed it to send_sms, so that dead local was
    # removed; wire send_sms in explicitly if confirmations are wanted.

    return (
        f"✅ Appointment Booked!\n"
        f"━━━━━━━━━━━━━━━━━━━━━━\n"
        f"Doctor : {doctor_name}\n"
        f"Patient : {patient_name}\n"
        f"Age : {patient_age}\n"
        f"Date : {visiting_date}\n"
        f"Contact : {patient_num}\n"
        f"━━━━━━━━━━━━━━━━━━━━━━\n"
        f"Please arrive 10 minutes early."
    )


async def delete_appointment(patient_num: str, doctor_name: str, visiting_date: str = "") -> str:
    """
    Delete an appointment using patient phone number and doctor name.

    Args:
        patient_num: Patient phone number used when the appointment was booked.
        doctor_name: Doctor's name (matched case-insensitively).
        visiting_date: Optional visit date in YYYY-MM-DD format. When given,
            only the appointment on that date is deleted; when omitted, every
            appointment of this patient with this doctor is deleted.

    Returns a JSON string with "success" and "message"; on success it also
    carries "deleted_count".
    """
    # NOTE(review): unlike the other tools in this file this function is not
    # wrapped with @tool even though AIBackend lists it among its tools. It
    # is left undecorated so direct callers keep getting a plain coroutine
    # function; confirm whether that is intentional.
    patient_num = format_bd_number(patient_num)
    doctor_name = doctor_name.strip()
    visiting_date = (visiting_date or "").strip()

    sql = (
        "DELETE FROM patients "
        "WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?)"
    )
    params = [patient_num, doctor_name]
    if visiting_date:
        sql += " AND visiting_date = ?"
        params.append(visiting_date)

    # One DELETE plus rowcount replaces the former SELECT-then-DELETE pair:
    # fewer round trips, and the existence check and the delete cannot
    # disagree with each other.
    async with aiosqlite.connect(get_db_path()) as db:
        cursor = await db.execute(sql, params)
        deleted_count = cursor.rowcount
        await db.commit()

    if deleted_count <= 0:
        return json.dumps({
            "success": False,
            "message": "No matching appointment found to delete.",
        })

    return json.dumps({
        "success": True,
        "message": f"Appointment with Dr. {doctor_name} deleted successfully.",
        "deleted_count": deleted_count,
    })


class AIBackend:
    """Chat backend for the doctor-appointment assistant.

    ``__init__`` binds the Ollama chat model to the appointment tools.
    ``async_setup`` must be awaited afterwards: it opens the SQLite
    connection, creates the checkpointer and compiles the graphs.
    ``main`` is the per-turn entry point; ``aclose`` releases the connection.
    """

    # A turn that leaves more than this many messages in the thread state
    # triggers a background summarisation.
    SUMMARY_TRIGGER = 10
    # Summarisation keeps at least this many trailing messages verbatim.
    KEEP_RECENT = 2

    _logger = logging.getLogger(__name__)

    # Prompt used when a summary already exists; {existing_summary} is filled in.
    _SUMMARY_UPDATE_PROMPT = (
        "\n"
        "You are maintaining a long-term conversation memory for a chatbot.\n"
        "\n"
        "Existing summary:\n"
        "{existing_summary}\n"
        "\n"
        "Update and extend the summary using ONLY the new conversation messages above.\n"
        "\n"
        "Instructions:\n"
        "- Preserve important existing context.\n"
        "- Add new facts, decisions, preferences, goals, issues, and ongoing tasks.\n"
        "- Keep technical details concise but meaningful.\n"
        "- Track unresolved problems or follow-up actions.\n"
        "- Avoid repetition and remove outdated or redundant information when appropriate.\n"
        "- Maintain chronological consistency.\n"
        "- Write the summary in clear bullet points.\n"
        "- Focus on information useful for future conversations and contextual continuity.\n"
        "- Do NOT include casual greetings or temporary small talk unless important.\n"
        "- Keep the summary compact but information-dense.\n"
    )

    # Prompt used for the very first summary of a thread.
    _SUMMARY_CREATE_PROMPT = (
        "\n"
        "You are creating a long-term conversation memory summary for a chatbot.\n"
        "\n"
        "Summarize the conversation above.\n"
        "\n"
        "Instructions:\n"
        "- Capture important user information, goals, preferences, projects, and decisions.\n"
        "- Include technical issues, debugging progress, and solutions discussed.\n"
        "- Track ongoing tasks or unresolved questions.\n"
        "- Ignore casual greetings and low-value chatter.\n"
        "- Write concise, structured bullet points.\n"
        "- Keep the summary compact but highly informative for future context retention.\n"
    )

    # System message that injects the stored summary into a chat turn.
    _MEMORY_PREAMBLE = (
        "You are provided with a condensed memory of previous conversations.\n\n"
        "Conversation Memory:\n{summary}\n\n"
        "Instructions:\n"
        "- Use this memory as long-term conversational context.\n"
        "- Maintain continuity with the user's previous discussions, projects, goals, and preferences.\n"
        "- Prioritize recent and relevant information when generating responses.\n"
        "- Do not repeat the summary unless necessary.\n"
        "- If new information conflicts with old memory, prefer the latest context.\n"
        "- Use the memory naturally to improve personalization, reasoning, and follow-up responses.\n"
        "- Treat unresolved issues, active projects, and pending tasks as ongoing unless stated otherwise."
    )

    def __init__(self):
        """Load environment settings and bind the chat model to the tools."""
        load_dotenv()
        os.environ["LANGCHAIN_PROJECT"] = "Doctor Appointment Automation"
        self.llm = ChatOllama(model="gemma4:e4b", streaming=True)
        self.tools = [search_doctor, book_appointment, get_bd_time, search_appointment_by_phone, delete_appointment]
        self.tool_node = ToolNode(self.tools)
        self.llm_with_tools = self.llm.bind_tools(self.tools)

        # Filled in by async_setup().
        self.conn = None
        self.checkpointer = None
        self.graph = None
        self.summary_graph = None
        # Strong references to fire-and-forget tasks (see _track_background_task).
        self._background_tasks = set()

    async def async_setup(self):
        """Open the database, create the checkpointer and compile both graphs."""
        # Same file the tool functions use; get_db_path() is the single
        # source of truth for its location.
        self.conn = await aiosqlite.connect(get_db_path())
        self.checkpointer = AsyncSqliteSaver(self.conn)
        await self._create_user_table()
        self.graph = self._build_graph()
        self.summary_graph = self._build_summary_graph()

    async def aclose(self):
        """Wait for pending background work, then close the SQLite connection.

        Safe to call when ``async_setup`` was never awaited or when the
        backend has already been closed.
        """
        if self._background_tasks:
            # return_exceptions=True: failures were already logged by the
            # done-callback and must not prevent the connection from closing.
            await asyncio.gather(*self._background_tasks, return_exceptions=True)
        if self.conn is not None:
            await self.conn.close()
            self.conn = None

    async def _create_user_table(self):
        """Create the user-id -> thread-id mapping table if it is missing."""
        await self.conn.execute("""
            CREATE TABLE IF NOT EXISTS userid_threadid (
                userId TEXT UNIQUE NOT NULL,
                threadId TEXT UNIQUE NOT NULL
            )
        """)
        await self.conn.commit()

    @classmethod
    def _first_kept_index(cls, messages) -> int:
        """Return the index of the first message to keep after summarising.

        Starts ``KEEP_RECENT`` messages from the end and walks backwards
        while the boundary sits on a ``ToolMessage``, so a kept tool result
        is never separated from the AI message that precedes it.
        """
        index = max(len(messages) - cls.KEEP_RECENT, 0)
        while index > 0 and isinstance(messages[index], ToolMessage):
            index -= 1
        return index

    async def summarize_conversation(self, state: "ChatState"):
        """Graph node: fold the conversation into ``summary`` and trim history.

        Returns a state update with the new summary text and a
        ``RemoveMessage`` for every message older than the kept tail.
        """
        existing_summary = state.get("summary", "")
        messages = state["messages"]

        if existing_summary:
            prompt = self._SUMMARY_UPDATE_PROMPT.format(existing_summary=existing_summary)
        else:
            prompt = self._SUMMARY_CREATE_PROMPT

        # The instruction goes last so "the conversation above" refers to
        # the real messages that precede it.
        response = await self.llm.ainvoke(messages + [HumanMessage(content=prompt)])

        keep_from = self._first_kept_index(messages)
        return {
            "summary": response.content,
            "messages": [RemoveMessage(id=m.id) for m in messages[:keep_from]],
        }

    async def should_summarize(self, state: "ChatState"):
        """Return the name of the node to route to, based on history length."""
        if len(state["messages"]) > self.SUMMARY_TRIGGER:
            return "summarize_node"
        return "chat_node"

    async def chat_node(self, state: "ChatState"):
        """Graph node: call the tool-enabled model on the current history.

        When a summary exists it is prepended as a system message for this
        call only; it is not written back into ``messages``.
        """
        summary = state.get("summary", "")
        messages = state["messages"]

        # Conversation content can include names and phone numbers, so it
        # is emitted at DEBUG level instead of being printed on every turn.
        debug_enabled = self._logger.isEnabledFor(logging.DEBUG)
        if debug_enabled:
            self._logger.debug("chat_node start; summary: %s", summary or "<none>")
            for m in messages:
                self._logger.debug("  [%s]: %s", type(m).__name__, str(m.content)[:200])

        if summary:
            summary_message = SystemMessage(
                content=self._MEMORY_PREAMBLE.format(summary=summary)
            )
            messages = [summary_message] + messages

        response = await self.llm_with_tools.ainvoke(messages)

        if debug_enabled:
            self._logger.debug(
                "chat_node end; [%s]: %s",
                type(response).__name__,
                str(response.content)[:200],
            )
        return {"messages": [response]}

    def _build_graph(self):
        """Compile the chat graph: chat_node <-> tools until no tool call remains."""
        g = StateGraph(ChatState)
        g.add_node("chat_node", self.chat_node)
        g.add_node("tools", self.tool_node)

        g.add_edge(START, "chat_node")
        g.add_conditional_edges("chat_node", tools_condition)
        g.add_edge("tools", "chat_node")

        return g.compile(checkpointer=self.checkpointer)

    def _build_summary_graph(self):
        """Compile the single-node summary graph on the same checkpointer."""
        g = StateGraph(ChatState)
        g.add_node("summarize_node", self.summarize_conversation)
        g.add_edge(START, "summarize_node")
        g.add_edge("summarize_node", END)
        return g.compile(checkpointer=self.checkpointer)

    def _track_background_task(self, task):
        """Hold a strong reference to ``task`` until it finishes.

        The event loop only keeps weak references to tasks, so a task whose
        handle is dropped can be garbage-collected before it completes.
        """
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)
        return task

    def _on_background_task_done(self, task):
        """Release a finished background task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            self._logger.error("Background summarisation failed", exc_info=error)

    async def ai_only_stream(self, initial_state: dict, config: dict):
        """Run one chat turn, yielding the content of each non-empty AI chunk.

        After the stream is exhausted, a summarisation run is scheduled in
        the background when the thread holds more than ``SUMMARY_TRIGGER``
        messages.
        """
        async for message_chunk, _metadata in self.graph.astream(
            initial_state, config=config, stream_mode="messages"
        ):
            if isinstance(message_chunk, AIMessage) and message_chunk.content:
                yield message_chunk.content

        current_state = await self.graph.aget_state(config)
        if len(current_state.values.get("messages", [])) > self.SUMMARY_TRIGGER:
            self._track_background_task(
                asyncio.create_task(
                    self.summary_graph.ainvoke(current_state.values, config=config)
                )
            )
            self._logger.info("Summarisation scheduled")

    @staticmethod
    def generate_thread_id() -> str:
        """Return a fresh random UUID4 string."""
        return str(uuid.uuid4())

    async def retrieve_all_threads(self):
        """Return the distinct thread ids found in the checkpointer."""
        all_threads = set()
        async for checkpoint in self.checkpointer.alist(None):
            all_threads.add(checkpoint.config["configurable"]["thread_id"])
        return list(all_threads)

    async def main(self, user_id: str, user_query: str):
        """Resolve (or create) the user's thread and start a chat turn.

        Returns the async generator produced by ``ai_only_stream``; the
        caller iterates it to receive the response chunks.
        """
        async with self.conn.execute(
            "SELECT userId, threadId FROM userid_threadid WHERE userId = ?", (user_id,)
        ) as cursor:
            result = await cursor.fetchone()

        if result is None:
            # First contact: mint a thread id and remember it for this user.
            thread_id = user_id + self.generate_thread_id()
            await self.conn.execute(
                "INSERT INTO userid_threadid (userId, threadId) VALUES (?, ?)",
                (user_id, thread_id),
            )
            await self.conn.commit()
        else:
            thread_id = result[1]

        initial_state = {"messages": [HumanMessage(content=user_query)]}
        config = {
            "configurable": {"thread_id": thread_id},
            "metadata": {"thread_id": thread_id},
            "run_name": "chat_turn",
        }
        return self.ai_only_stream(initial_state, config)