| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import os |
| import uuid |
| import aiosmtplib |
|
|
| import aiosqlite |
| import pytz |
| from datetime import datetime, timedelta |
| from dotenv import load_dotenv |
|
|
| from langchain_core.messages import ( |
| AIMessage, AIMessageChunk, HumanMessage, RemoveMessage, |
| SystemMessage, ToolMessage, |
| ) |
| from langchain_core.tools import tool |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver |
| from langgraph.graph import END, START, StateGraph |
| from langgraph.graph.message import add_messages |
| from langgraph.prebuilt import ToolNode, tools_condition |
| from twilio.rest import Client |
| from typing import Annotated, TypedDict, Optional, AsyncGenerator |
| from email.message import EmailMessage |
| from dateparser.search import search_dates |
| from langchain_ollama import ChatOllama |
|
|
| load_dotenv() |
|
|
|
|
| |
| |
| |
| class ChatState(TypedDict): |
| messages: Annotated[list, add_messages] |
| summary: str |
|
|
|
|
| |
| |
| |
| def get_db_path() -> str: |
| return os.path.join(os.path.dirname(__file__), "daa.db") |
|
|
|
|
| def format_bd_number(num: str) -> str: |
| num = num.strip().replace(" ", "") |
| if num.startswith("01") and len(num) == 11: |
| return "+88" + num |
| if num.startswith("8801"): |
| return "+" + num |
| return num |
|
|
|
|
| def send_sms(to_number: str, message: str) -> None: |
| client = Client(os.getenv("TWILIO_ACCOUNT_SID"), os.getenv("TWILIO_AUTH_TOKEN")) |
| client.messages.create( |
| body=message, |
| from_=os.getenv("TWILIO_PHONE_NUMBER"), |
| to=to_number, |
| ) |
|
|
|
|
| async def send_mail(to_mail: str, subject: str, body: str): |
| email = EmailMessage() |
| email["From"] = "walidofficework@gmail.com" |
| email["To"] = to_mail |
| email["Subject"] = subject |
| email.set_content(body) |
|
|
| await aiosmtplib.send( |
| email, |
| hostname="smtp.gmail.com", |
| port=465, |
| username="walidofficework@gmail.com", |
| password="bajq dkqr qacs pehr", |
| use_tls=True, |
| ) |
|
|
| |
| |
| |
| @tool |
| def get_bd_time() -> str: |
| """Get current Bangladesh date and time along with the next 14 days.""" |
| |
| tz = pytz.timezone("Asia/Dhaka") |
| |
| now = datetime.now(tz) |
| |
| result = { |
| "CURRENT_DATETIME": now.strftime("%Y-%m-%d %H:%M:%S %Z"), |
| "TODAY": now.strftime("%A, %B %d, %Y"), |
| "TOMORROW": (now + timedelta(days=1)).strftime("%A, %B %d, %Y"), |
| "NEXT_14_DAYS": {} |
| } |
| |
| for i in range(1, 15): |
| future_date = now + timedelta(days=i) |
| result["NEXT_14_DAYS"][f"+{i}"] = future_date.strftime("%A, %B %d, %Y") |
| return json.dumps(result) |
|
|
| @tool |
| async def get_doctor_categories() -> str: |
| """ |
| Fetch all unique doctor categories from the database. |
| """ |
|
|
| db_path = get_db_path() |
|
|
| query = """ |
| SELECT DISTINCT category |
| FROM doctors |
| WHERE category IS NOT NULL |
| AND TRIM(category) != '' |
| ORDER BY category ASC |
| """ |
|
|
| async with aiosqlite.connect(db_path) as db: |
| db.row_factory = aiosqlite.Row |
|
|
| cursor = await db.execute(query) |
| rows = await cursor.fetchall() |
|
|
| categories = [row["category"] for row in rows] |
|
|
| return json.dumps({ |
| "success": True, |
| "count": len(categories), |
| "data": categories |
| }) |
|
|
| @tool |
| async def get_doctors_by_day( |
| visiting_day: str, |
| ) -> str: |
| """ |
| Get all doctors available on a specific visiting day. |
| Example inputs: |
| - Sunday |
| - Monday |
| - Friday |
| """ |
|
|
| db_path = get_db_path() |
|
|
| query = """ |
| SELECT * |
| FROM doctors |
| WHERE LOWER(visiting_days) LIKE ? |
| """ |
|
|
| param = [f"%{visiting_day.lower()}%"] |
|
|
| async with aiosqlite.connect(db_path) as db: |
| db.row_factory = aiosqlite.Row |
|
|
| cursor = await db.execute(query, param) |
| rows = await cursor.fetchall() |
|
|
| if not rows: |
| return json.dumps({ |
| "success": False, |
| "message": f"No doctors found for {visiting_day}.", |
| "data": [] |
| }) |
|
|
| doctors = [dict(row) for row in rows] |
|
|
| return json.dumps({ |
| "success": True, |
| "visiting_day": visiting_day, |
| "count": len(doctors), |
| "data": doctors |
| }, ensure_ascii=False) |
|
|
| @tool |
| async def search_doctor( |
| name: str = "", |
| category: str = "", |
| visiting_days: str = "", |
| ) -> str: |
| """ |
| Search doctors by name, category, or visiting_days from the database. |
| Any combination of filters is supported (OR logic across fields). |
| """ |
| db_path = get_db_path() |
| query = "SELECT * FROM doctors WHERE 1=1" |
| params: list = [] |
| conditions: list[str] = [] |
|
|
| if name: |
| conditions.append("LOWER(doctor_name) LIKE ?") |
| params.append(f"%{name.lower()}%") |
| if category: |
| conditions.append("LOWER(category) LIKE ?") |
| params.append(f"%{category.lower()}%") |
| if visiting_days: |
| conditions.append("LOWER(visiting_days) LIKE ?") |
| params.append(f"%{visiting_days.lower()}%") |
|
|
| if conditions: |
| query += " AND (" + " OR ".join(conditions) + ")" |
|
|
| async with aiosqlite.connect(db_path) as db: |
| db.row_factory = aiosqlite.Row |
| cursor = await db.execute(query, params) |
| rows = await cursor.fetchall() |
|
|
| if not rows: |
| return json.dumps({"success": False, "message": "No doctors found.", "data": []}) |
|
|
| return json.dumps({"success": True, "count": len(rows), "data": [dict(r) for r in rows]}) |
|
|
|
|
| @tool |
| async def search_appointment_by_phone(patient_num: str) -> str: |
| """Search all appointments using the patient's phone number.""" |
| db_path = get_db_path() |
| patient_num = format_bd_number(patient_num) |
|
|
| async with aiosqlite.connect(db_path) as db: |
| db.row_factory = aiosqlite.Row |
| cursor = await db.execute( |
| "SELECT * FROM patients WHERE patient_num = ? ORDER BY visiting_date ASC", |
| (patient_num,), |
| ) |
| rows = await cursor.fetchall() |
|
|
| if not rows: |
| return json.dumps({ |
| "success": False, |
| "message": "No appointments found for this phone number.", |
| "data": [], |
| }) |
| return json.dumps({"success": True, "count": len(rows), "data": [dict(r) for r in rows]}) |
|
|
|
|
| @tool |
| async def book_appointment( |
| doctor_id: int, |
| patient_name: str, |
| patient_age: str, |
| patient_num: str, |
| visiting_date: str, |
| patient_mail: str |
| ) -> str: |
| """ |
| Book a doctor appointment and save it to the patients table. |
| Args: |
| doctor_id: Doctor's ID from search_doctor results. |
| patient_name: Full name of the patient. |
| patient_age: Age of the patient (e.g. "32"). |
| patient_num: Contact phone number of the patient. |
| visiting_date: Date of visit in YYYY-MM-DD format (e.g. 2025-06-15). |
| patient_mail: Mail address for confirmation mail. |
| """ |
| db_path = get_db_path() |
| patient_num = format_bd_number(patient_num) |
|
|
| async with aiosqlite.connect(db_path) as db: |
| db.row_factory = aiosqlite.Row |
|
|
| cursor = await db.execute("SELECT * FROM doctors WHERE id = ?", (doctor_id,)) |
| doctor = await cursor.fetchone() |
| if not doctor: |
| return f"No doctor found with ID {doctor_id}. Please search for a doctor first." |
|
|
| doctor_data = dict(doctor) |
| doctor_name = doctor_data.get("doctor_name", "Unknown") |
| doctor_category = doctor_data.get("category", "Unknown") |
|
|
| cursor = await db.execute( |
| """SELECT id FROM patients |
| WHERE doctor_name = ? AND visiting_date = ? AND patient_num = ?""", |
| (doctor_name, visiting_date, patient_num), |
| ) |
| if await cursor.fetchone(): |
| return ( |
| f"A booking for {patient_name} with Dr. {doctor_name} " |
| f"on {visiting_date} already exists." |
| ) |
|
|
| await db.execute( |
| """INSERT INTO patients |
| (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, patient_mail) |
| VALUES (?, ?, ?, ?, ?, ?, ?)""", |
| (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, patient_mail), |
| ) |
| await db.commit() |
|
|
| |
| mail_message = ( |
| f"Doctor : {doctor_name}\n" |
| f"Patient : {patient_name}\n" |
| f"Visit Date : {visiting_date}\n" |
| f"Please arrive 10 minutes early." |
| ) |
| try: |
| await send_mail( |
| to_mail=patient_mail, |
| subject="β
Appointment Confirmed!", |
| body=mail_message, |
| ) |
| mail_status = "\nπ§ Mail confirmation sent." |
| except Exception as e: |
| mail_status = f"\nβ οΈ Mail failed: {str(e)}" |
|
|
| return ( |
| f"β
Appointment Booked!\n" |
| f"ββββββββββββββββββββββ\n" |
| f"Doctor : {doctor_name}\n" |
| f"Patient : {patient_name}\n" |
| f"Age : {patient_age}\n" |
| f"Date : {visiting_date}\n" |
| f"Contact : {patient_num}\n" |
| f"ββββββββββββββββββββββ\n" |
| f"Please arrive 10 minutes early." |
| f"{mail_status}" |
| ) |
|
|
|
|
| @tool |
| async def delete_appointment(patient_num: str, doctor_name: str) -> str: |
| """Delete an appointment using the patient's phone number and doctor name.""" |
| db_path = get_db_path() |
| patient_num = format_bd_number(patient_num) |
|
|
| async with aiosqlite.connect(db_path) as db: |
| db.row_factory = aiosqlite.Row |
|
|
| cursor = await db.execute( |
| """SELECT * FROM patients |
| WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?)""", |
| (patient_num, doctor_name), |
| ) |
| if not await cursor.fetchone(): |
| return json.dumps({"success": False, "message": "No matching appointment found."}) |
|
|
| await db.execute( |
| """DELETE FROM patients |
| WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?)""", |
| (patient_num, doctor_name), |
| ) |
| await db.commit() |
|
|
| return json.dumps({ |
| "success": True, |
| "message": f"Appointment with Dr. {doctor_name} deleted successfully.", |
| }) |
|
|
|
|
| |
| |
| |
| BASE_SYSTEM = """ |
| You are a Doctor Appointment Assistant AI. |
| |
| Your job is to help users manage medical appointments. |
| |
| CAPABILITIES: |
| - Book doctor appointments |
| - Reschedule appointments |
| - Cancel appointments |
| - Collect patient details |
| |
| STRICT RULES: |
| - You are NOT a doctor. |
| - NEVER diagnose diseases. |
| - NEVER recommend medicines or treatments. |
| |
| APPOINTMENT FLOW: |
| 1. Detect intent (book / cancel / reschedule / inquiry) |
| 2. Collect details |
| 3. Confirm all details before final booking |
| |
| STYLE: |
| - Be short, clear, structured |
| - Ask one question at a time when needed |
| - Focus on completing booking |
| |
| LANGUAGE RULE: |
| - Detect user language from latest message. |
| - If English β reply English. |
| - If Bangla β reply Bangla (বাΰ¦ΰ¦²ΰ¦Ύ). |
| - If Banglish β reply Bangla (বাΰ¦ΰ¦²ΰ¦Ύ). |
| - Never mix languages unless user mixes first. |
| |
| TOOLS: |
| - Use backend tools if available for scheduling |
| - Always confirm before final action |
| """ |
|
|
| SUMMARY_SYSTEM = ( |
| BASE_SYSTEM |
| + "\nYou also have a condensed memory of previous conversations:\n\n" |
| "{summary}\n\n" |
| "Use this memory for continuity. Do not repeat it unless asked." |
| ) |
|
|
|
|
| |
| |
| |
| class AIBackend: |
|
|
| |
| def __init__(self, use_gemini: bool = False, use_ollama: bool = True, use_fallback: bool = False): |
| self.use_gemini = use_gemini |
| self.use_ollama = use_ollama |
| self.use_fallback = use_fallback |
| os.environ.setdefault("LANGCHAIN_PROJECT", "Doctor Appointment Automation") |
|
|
| if use_gemini: |
| self.llm = ChatGoogleGenerativeAI( |
| model="gemini-2.0-flash", |
| temperature=0.3, |
| ) |
| elif use_ollama: |
| self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.2) |
| else: |
| |
| self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.2) |
|
|
| self.tools = [ |
| search_doctor, |
| book_appointment, |
| get_bd_time, |
| search_appointment_by_phone, |
| delete_appointment, |
| get_doctor_categories, |
| get_doctors_by_day |
| ] |
| self.tool_node = ToolNode(self.tools) |
| self.llm_with_tools = self.llm.bind_tools(self.tools) |
|
|
| |
| async def async_setup(self) -> None: |
| db_path = get_db_path() |
| self.conn = await aiosqlite.connect(db_path) |
| self.checkpointer = AsyncSqliteSaver(self.conn) |
| await self._create_tables() |
| self.graph = self._build_graph() |
| self.summary_graph = self._build_summary_graph() |
| print("[Backend] AIBackend ready β") |
|
|
| async def _create_tables(self) -> None: |
| await self.conn.execute(""" |
| CREATE TABLE IF NOT EXISTS userid_threadid ( |
| userId TEXT UNIQUE NOT NULL, |
| threadId TEXT UNIQUE NOT NULL |
| ) |
| """) |
| await self.conn.execute(""" |
| CREATE TABLE IF NOT EXISTS doctors ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| doctor_name TEXT, |
| category TEXT, |
| visiting_days TEXT, |
| visiting_time TEXT, |
| visiting_money INTEGER |
| ) |
| """) |
| await self.conn.execute(""" |
| CREATE TABLE IF NOT EXISTS patients ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| doctor_name TEXT, |
| doctor_category TEXT, |
| patient_name TEXT, |
| patient_age TEXT, |
| patient_num TEXT, |
| visiting_date TEXT, |
| patient_mail TEXT |
| ) |
| """) |
| await self.conn.commit() |
|
|
| |
| async def summarize_conversation(self, state: ChatState): |
| existing = state.get("summary", "") |
| messages = state["messages"] |
|
|
| if existing: |
| prompt = ( |
| f"Existing summary:\n{existing}\n\n" |
| "Update the summary with the new messages above. " |
| "Keep it concise, bullet-pointed, and information-dense. " |
| "Preserve unresolved issues and ongoing tasks." |
| ) |
| else: |
| prompt = ( |
| "Summarise this conversation. " |
| "Capture goals, decisions, preferences, and unresolved questions. " |
| "Be concise and use bullet points." |
| ) |
|
|
| response = await self.llm.ainvoke(messages + [HumanMessage(content=prompt)]) |
| return { |
| "summary": response.content, |
| "messages": [RemoveMessage(id=m.id) for m in messages[:-2]], |
| } |
|
|
| |
| async def chat_node(self, state: ChatState): |
| """ |
| Invokes the LLM with tool bindings and returns the AI response. |
| |
| Uses ainvoke() (not collect-all-then-return astream()) so the call is |
| clean and deterministic. Token-level streaming is handled by LangGraph |
| itself via stream_mode="messages" in ai_only_stream(), which intercepts |
| the underlying LLM streaming at the graph level. |
| """ |
| summary = state.get("summary", "") |
| messages = state["messages"] |
|
|
| print("#" * 50) |
| print(">>>>>>>>>> CHAT NODE START <<<<<<<<<<") |
| print(f"[SUMMARY]: {summary[:120] if summary else 'None'}") |
| for m in messages: |
| print(f" [{m.__class__.__name__}]: {str(m.content)[:160]}") |
| print("#" * 50) |
|
|
| sys_content = SUMMARY_SYSTEM.format(summary=summary) if summary else BASE_SYSTEM |
| full_messages = [SystemMessage(content=sys_content)] + list(messages) |
|
|
| response = await self.llm_with_tools.ainvoke(full_messages) |
|
|
| print(f"[AI]: {str(response.content)[:200]}") |
| print(">>>>>>>>>> CHAT NODE END <<<<<<<<<<") |
| return {"messages": [response]} |
|
|
| |
| def _build_graph(self): |
| g = StateGraph(ChatState) |
| g.add_node("chat_node", self.chat_node) |
| g.add_node("tools", self.tool_node) |
| g.add_edge(START, "chat_node") |
| g.add_conditional_edges("chat_node", tools_condition) |
| g.add_edge("tools", "chat_node") |
| return g.compile(checkpointer=self.checkpointer) |
|
|
| def _build_summary_graph(self): |
| g = StateGraph(ChatState) |
| g.add_node("summarize_node", self.summarize_conversation) |
| g.add_edge(START, "summarize_node") |
| g.add_edge("summarize_node", END) |
| return g.compile(checkpointer=self.checkpointer) |
|
|
| |
| async def ai_only_stream( |
| self, initial_state: dict, config: dict |
| ) -> AsyncGenerator[str, None]: |
| """ |
| Async generator β yields AI text tokens as they arrive. |
| |
| FIX-BUG9: narrowed isinstance check to exclude ToolMessage content |
| from being streamed to the user, and guards against non-str content |
| (e.g. multimodal list payloads from Ollama tool-call chunks). |
| """ |
| async for chunk, _meta in self.graph.astream( |
| initial_state, config=config, stream_mode="messages" |
| ): |
| |
| |
| |
| if ( |
| isinstance(chunk, (AIMessage, AIMessageChunk)) |
| and not isinstance(chunk, ToolMessage) |
| and isinstance(chunk.content, str) |
| and chunk.content |
| ): |
| yield chunk.content |
|
|
| |
| try: |
| current = await self.graph.aget_state(config) |
| if len(current.values.get("messages", [])) > 10: |
| asyncio.create_task( |
| self.summary_graph.ainvoke(current.values, config=config) |
| ) |
| print("@" * 20, "Summarisation triggered", "@" * 20) |
| except Exception as exc: |
| print(f"[Backend] Summarisation check failed: {exc}") |
|
|
| |
| @staticmethod |
| def generate_thread_id() -> str: |
| return str(uuid.uuid4()) |
|
|
| async def retrieve_all_threads(self) -> list[str]: |
| threads: set[str] = set() |
| async for cp in self.checkpointer.alist(None): |
| threads.add(cp.config["configurable"]["thread_id"]) |
| return list(threads) |
|
|
| |
| async def main(self, user_id: str, user_query: str) -> AsyncGenerator[str, None]: |
| """Return an async generator of AI text tokens.""" |
| async with self.conn.execute( |
| "SELECT threadId FROM userid_threadid WHERE userId = ?", (user_id,) |
| ) as cursor: |
| row = await cursor.fetchone() |
|
|
| if row is None: |
| thread_id = user_id + self.generate_thread_id() |
| await self.conn.execute( |
| "INSERT INTO userid_threadid (userId, threadId) VALUES (?, ?)", |
| (user_id, thread_id), |
| ) |
| await self.conn.commit() |
| else: |
| thread_id = row[0] |
|
|
| initial_state = {"messages": [HumanMessage(content=user_query)]} |
| config = { |
| "configurable": {"thread_id": thread_id}, |
| "metadata": {"thread_id": thread_id}, |
| "run_name": "chat_turn", |
| } |
| return self.ai_only_stream(initial_state, config) |
|
|