from __future__ import annotations import asyncio import json import os import uuid import aiosmtplib import aiosqlite import pytz from datetime import datetime, timedelta from dotenv import load_dotenv from langchain_core.messages import ( AIMessage, AIMessageChunk, HumanMessage, RemoveMessage, SystemMessage, ToolMessage, ) from langchain_core.tools import tool from langchain_google_genai import ChatGoogleGenerativeAI from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver from langgraph.graph import END, START, StateGraph from langgraph.graph.message import add_messages from langgraph.prebuilt import ToolNode, tools_condition from twilio.rest import Client from typing import Annotated, TypedDict, Optional, AsyncGenerator from email.message import EmailMessage from dateparser.search import search_dates from langchain_ollama import ChatOllama load_dotenv() # ═══════════════════════════════════════════════════════════════════════════════ # STATE # ═══════════════════════════════════════════════════════════════════════════════ class ChatState(TypedDict): messages: Annotated[list, add_messages] summary: str # ═══════════════════════════════════════════════════════════════════════════════ # HELPERS # ═══════════════════════════════════════════════════════════════════════════════ def get_db_path() -> str: return os.path.join(os.path.dirname(__file__), "daa.db") def format_bd_number(num: str) -> str: num = num.strip().replace(" ", "") if num.startswith("01") and len(num) == 11: return "+88" + num if num.startswith("8801"): return "+" + num return num def send_sms(to_number: str, message: str) -> None: client = Client(os.getenv("TWILIO_ACCOUNT_SID"), os.getenv("TWILIO_AUTH_TOKEN")) client.messages.create( body=message, from_=os.getenv("TWILIO_PHONE_NUMBER"), to=to_number, ) async def send_mail(to_mail: str, subject: str, body: str): email = EmailMessage() email["From"] = "walidofficework@gmail.com" email["To"] = to_mail email["Subject"] = subject email.set_content(body) await aiosmtplib.send( email, hostname="smtp.gmail.com", port=465, username="walidofficework@gmail.com", password="bajq dkqr qacs pehr", use_tls=True, ) # ═══════════════════════════════════════════════════════════════════════════════ # TOOLS # ═══════════════════════════════════════════════════════════════════════════════ @tool def get_bd_time() -> str: """Get current Bangladesh date and time along with the next 14 days.""" # Bangladesh timezone tz = pytz.timezone("Asia/Dhaka") # Current datetime now = datetime.now(tz) # Create result dictionary result = { "CURRENT_DATETIME": now.strftime("%Y-%m-%d %H:%M:%S %Z"), "TODAY": now.strftime("%A, %B %d, %Y"), "TOMORROW": (now + timedelta(days=1)).strftime("%A, %B %d, %Y"), "NEXT_14_DAYS": {} } # Generate next 14 days for i in range(1, 15): future_date = now + timedelta(days=i) result["NEXT_14_DAYS"][f"+{i}"] = future_date.strftime("%A, %B %d, %Y") return json.dumps(result) @tool async def get_doctor_categories() -> str: """ Fetch all unique doctor categories from the database. """ db_path = get_db_path() query = """ SELECT DISTINCT category FROM doctors WHERE category IS NOT NULL AND TRIM(category) != '' ORDER BY category ASC """ async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(query) rows = await cursor.fetchall() categories = [row["category"] for row in rows] return json.dumps({ "success": True, "count": len(categories), "data": categories }) @tool async def get_doctors_by_day( visiting_day: str, ) -> str: """ Get all doctors available on a specific visiting day. Example inputs: - Sunday - Monday - Friday """ db_path = get_db_path() query = """ SELECT * FROM doctors WHERE LOWER(visiting_days) LIKE ? """ param = [f"%{visiting_day.lower()}%"] async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(query, param) rows = await cursor.fetchall() if not rows: return json.dumps({ "success": False, "message": f"No doctors found for {visiting_day}.", "data": [] }) doctors = [dict(row) for row in rows] return json.dumps({ "success": True, "visiting_day": visiting_day, "count": len(doctors), "data": doctors }, ensure_ascii=False) @tool async def search_doctor( name: str = "", category: str = "", visiting_days: str = "", ) -> str: """ Search doctors by name, category, or visiting_days from the database. Any combination of filters is supported (OR logic across fields). """ db_path = get_db_path() query = "SELECT * FROM doctors WHERE 1=1" params: list = [] conditions: list[str] = [] if name: conditions.append("LOWER(doctor_name) LIKE ?") params.append(f"%{name.lower()}%") if category: conditions.append("LOWER(category) LIKE ?") params.append(f"%{category.lower()}%") if visiting_days: conditions.append("LOWER(visiting_days) LIKE ?") params.append(f"%{visiting_days.lower()}%") if conditions: query += " AND (" + " OR ".join(conditions) + ")" async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(query, params) rows = await cursor.fetchall() if not rows: return json.dumps({"success": False, "message": "No doctors found.", "data": []}) return json.dumps({"success": True, "count": len(rows), "data": [dict(r) for r in rows]}) @tool async def search_appointment_by_phone(patient_num: str) -> str: """Search all appointments using the patient's phone number.""" db_path = get_db_path() patient_num = format_bd_number(patient_num) async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM patients WHERE patient_num = ? ORDER BY visiting_date ASC", (patient_num,), ) rows = await cursor.fetchall() if not rows: return json.dumps({ "success": False, "message": "No appointments found for this phone number.", "data": [], }) return json.dumps({"success": True, "count": len(rows), "data": [dict(r) for r in rows]}) @tool async def book_appointment( doctor_id: int, patient_name: str, patient_age: str, patient_num: str, visiting_date: str, patient_mail: str ) -> str: """ Book a doctor appointment and save it to the patients table. Args: doctor_id: Doctor's ID from search_doctor results. patient_name: Full name of the patient. patient_age: Age of the patient (e.g. "32"). patient_num: Contact phone number of the patient. visiting_date: Date of visit in YYYY-MM-DD format (e.g. 2025-06-15). patient_mail: Mail address for confirmation mail. """ db_path = get_db_path() patient_num = format_bd_number(patient_num) async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute("SELECT * FROM doctors WHERE id = ?", (doctor_id,)) doctor = await cursor.fetchone() if not doctor: return f"No doctor found with ID {doctor_id}. Please search for a doctor first." doctor_data = dict(doctor) doctor_name = doctor_data.get("doctor_name", "Unknown") doctor_category = doctor_data.get("category", "Unknown") cursor = await db.execute( """SELECT id FROM patients WHERE doctor_name = ? AND visiting_date = ? AND patient_num = ?""", (doctor_name, visiting_date, patient_num), ) if await cursor.fetchone(): return ( f"A booking for {patient_name} with Dr. {doctor_name} " f"on {visiting_date} already exists." ) await db.execute( """INSERT INTO patients (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, patient_mail) VALUES (?, ?, ?, ?, ?, ?, ?)""", (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, patient_mail), ) await db.commit() # Mail SMS confirmation mail_message = ( f"Doctor : {doctor_name}\n" f"Patient : {patient_name}\n" f"Visit Date : {visiting_date}\n" f"Please arrive 10 minutes early." ) try: await send_mail( to_mail=patient_mail, subject="✅ Appointment Confirmed!", body=mail_message, ) mail_status = "\n📧 Mail confirmation sent." except Exception as e: mail_status = f"\n⚠️ Mail failed: {str(e)}" return ( f"✅ Appointment Booked!\n" f"━━━━━━━━━━━━━━━━━━━━━━\n" f"Doctor : {doctor_name}\n" f"Patient : {patient_name}\n" f"Age : {patient_age}\n" f"Date : {visiting_date}\n" f"Contact : {patient_num}\n" f"━━━━━━━━━━━━━━━━━━━━━━\n" f"Please arrive 10 minutes early." f"{mail_status}" ) @tool async def delete_appointment(patient_num: str, doctor_name: str) -> str: """Delete an appointment using the patient's phone number and doctor name.""" db_path = get_db_path() patient_num = format_bd_number(patient_num) async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( """SELECT * FROM patients WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?)""", (patient_num, doctor_name), ) if not await cursor.fetchone(): return json.dumps({"success": False, "message": "No matching appointment found."}) await db.execute( """DELETE FROM patients WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?)""", (patient_num, doctor_name), ) await db.commit() return json.dumps({ "success": True, "message": f"Appointment with Dr. {doctor_name} deleted successfully.", }) # ═══════════════════════════════════════════════════════════════════════════════ # SYSTEM PROMPT # ═══════════════════════════════════════════════════════════════════════════════ BASE_SYSTEM = """ You are a Doctor Appointment Assistant AI. Your job is to help users manage medical appointments. CAPABILITIES: - Book doctor appointments - Reschedule appointments - Cancel appointments - Collect patient details STRICT RULES: - You are NOT a doctor. - NEVER diagnose diseases. - NEVER recommend medicines or treatments. APPOINTMENT FLOW: 1. Detect intent (book / cancel / reschedule / inquiry) 2. Collect details 3. Confirm all details before final booking STYLE: - Be short, clear, structured - Ask one question at a time when needed - Focus on completing booking LANGUAGE RULE: - Detect user language from latest message. - If English → reply English. - If Bangla → reply Bangla (বাংলা). - If Banglish → reply Bangla (বাংলা). - Never mix languages unless user mixes first. TOOLS: - Use backend tools if available for scheduling - Always confirm before final action """ SUMMARY_SYSTEM = ( BASE_SYSTEM + "\nYou also have a condensed memory of previous conversations:\n\n" "{summary}\n\n" "Use this memory for continuity. Do not repeat it unless asked." ) # ═══════════════════════════════════════════════════════════════════════════════ # AGENT # ═══════════════════════════════════════════════════════════════════════════════ class AIBackend: # ── FIX-BUG1: was `_init_` (single underscores) — never called by Python def __init__(self, use_gemini: bool = False, use_ollama: bool = True, use_fallback: bool = False): self.use_gemini = use_gemini self.use_ollama = use_ollama self.use_fallback = use_fallback os.environ.setdefault("LANGCHAIN_PROJECT", "Doctor Appointment Automation") if use_gemini: self.llm = ChatGoogleGenerativeAI( model="gemini-2.0-flash", temperature=0.3, ) elif use_ollama: self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.2) else: # Local fallback — extend as needed self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.2) self.tools = [ search_doctor, book_appointment, get_bd_time, search_appointment_by_phone, delete_appointment, get_doctor_categories, get_doctors_by_day ] self.tool_node = ToolNode(self.tools) self.llm_with_tools = self.llm.bind_tools(self.tools) # ── Setup ────────────────────────────────────────────────────────────────── async def async_setup(self) -> None: db_path = get_db_path() self.conn = await aiosqlite.connect(db_path) self.checkpointer = AsyncSqliteSaver(self.conn) await self._create_tables() self.graph = self._build_graph() self.summary_graph = self._build_summary_graph() print("[Backend] AIBackend ready ✓") async def _create_tables(self) -> None: await self.conn.execute(""" CREATE TABLE IF NOT EXISTS userid_threadid ( userId TEXT UNIQUE NOT NULL, threadId TEXT UNIQUE NOT NULL ) """) await self.conn.execute(""" CREATE TABLE IF NOT EXISTS doctors ( id INTEGER PRIMARY KEY AUTOINCREMENT, doctor_name TEXT, category TEXT, visiting_days TEXT, visiting_time TEXT, visiting_money INTEGER ) """) await self.conn.execute(""" CREATE TABLE IF NOT EXISTS patients ( id INTEGER PRIMARY KEY AUTOINCREMENT, doctor_name TEXT, doctor_category TEXT, patient_name TEXT, patient_age TEXT, patient_num TEXT, visiting_date TEXT, patient_mail TEXT ) """) await self.conn.commit() # ── Summarise node ───────────────────────────────────────────────────────── async def summarize_conversation(self, state: ChatState): existing = state.get("summary", "") messages = state["messages"] if existing: prompt = ( f"Existing summary:\n{existing}\n\n" "Update the summary with the new messages above. " "Keep it concise, bullet-pointed, and information-dense. " "Preserve unresolved issues and ongoing tasks." ) else: prompt = ( "Summarise this conversation. " "Capture goals, decisions, preferences, and unresolved questions. " "Be concise and use bullet points." ) response = await self.llm.ainvoke(messages + [HumanMessage(content=prompt)]) return { "summary": response.content, "messages": [RemoveMessage(id=m.id) for m in messages[:-2]], } # ── Chat node ────────────────────────────────────────────────────────────── async def chat_node(self, state: ChatState): """ Invokes the LLM with tool bindings and returns the AI response. Uses ainvoke() (not collect-all-then-return astream()) so the call is clean and deterministic. Token-level streaming is handled by LangGraph itself via stream_mode="messages" in ai_only_stream(), which intercepts the underlying LLM streaming at the graph level. """ summary = state.get("summary", "") messages = state["messages"] print("#" * 50) print(">>>>>>>>>> CHAT NODE START <<<<<<<<<<") print(f"[SUMMARY]: {summary[:120] if summary else 'None'}") for m in messages: print(f" [{m.__class__.__name__}]: {str(m.content)[:160]}") print("#" * 50) sys_content = SUMMARY_SYSTEM.format(summary=summary) if summary else BASE_SYSTEM full_messages = [SystemMessage(content=sys_content)] + list(messages) response = await self.llm_with_tools.ainvoke(full_messages) print(f"[AI]: {str(response.content)[:200]}") print(">>>>>>>>>> CHAT NODE END <<<<<<<<<<") return {"messages": [response]} # ── Graph ────────────────────────────────────────────────────────────────── def _build_graph(self): g = StateGraph(ChatState) g.add_node("chat_node", self.chat_node) g.add_node("tools", self.tool_node) g.add_edge(START, "chat_node") g.add_conditional_edges("chat_node", tools_condition) g.add_edge("tools", "chat_node") return g.compile(checkpointer=self.checkpointer) def _build_summary_graph(self): g = StateGraph(ChatState) g.add_node("summarize_node", self.summarize_conversation) g.add_edge(START, "summarize_node") g.add_edge("summarize_node", END) return g.compile(checkpointer=self.checkpointer) # ── Streaming ────────────────────────────────────────────────────────────── async def ai_only_stream( self, initial_state: dict, config: dict ) -> AsyncGenerator[str, None]: """ Async generator — yields AI text tokens as they arrive. FIX-BUG9: narrowed isinstance check to exclude ToolMessage content from being streamed to the user, and guards against non-str content (e.g. multimodal list payloads from Ollama tool-call chunks). """ async for chunk, _meta in self.graph.astream( initial_state, config=config, stream_mode="messages" ): # Only yield text content from AI messages. # Exclude ToolMessage (tool execution results) — they contain # raw JSON that should not be streamed directly to the user. if ( isinstance(chunk, (AIMessage, AIMessageChunk)) and not isinstance(chunk, ToolMessage) and isinstance(chunk.content, str) and chunk.content ): yield chunk.content # Auto-summarise in background when history grows long try: current = await self.graph.aget_state(config) if len(current.values.get("messages", [])) > 10: asyncio.create_task( self.summary_graph.ainvoke(current.values, config=config) ) print("@" * 20, "Summarisation triggered", "@" * 20) except Exception as exc: print(f"[Backend] Summarisation check failed: {exc}") # ── Thread management ────────────────────────────────────────────────────── @staticmethod def generate_thread_id() -> str: return str(uuid.uuid4()) async def retrieve_all_threads(self) -> list[str]: threads: set[str] = set() async for cp in self.checkpointer.alist(None): threads.add(cp.config["configurable"]["thread_id"]) return list(threads) # ── Public entry point ───────────────────────────────────────────────────── async def main(self, user_id: str, user_query: str) -> AsyncGenerator[str, None]: """Return an async generator of AI text tokens.""" async with self.conn.execute( "SELECT threadId FROM userid_threadid WHERE userId = ?", (user_id,) ) as cursor: row = await cursor.fetchone() if row is None: thread_id = user_id + self.generate_thread_id() await self.conn.execute( "INSERT INTO userid_threadid (userId, threadId) VALUES (?, ?)", (user_id, thread_id), ) await self.conn.commit() else: thread_id = row[0] initial_state = {"messages": [HumanMessage(content=user_query)]} config = { "configurable": {"thread_id": thread_id}, "metadata": {"thread_id": thread_id}, "run_name": "chat_turn", } return self.ai_only_stream(initial_state, config)