from __future__ import annotations import asyncio import json import os import uuid import aiosqlite import pytz from datetime import datetime, timedelta from dotenv import load_dotenv import re from langchain_core.messages import ( AIMessage, AIMessageChunk, HumanMessage, RemoveMessage, SystemMessage, ToolMessage, ) from langchain_core.tools import tool from langchain_google_genai import ChatGoogleGenerativeAI from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver from langgraph.graph import END, START, StateGraph from langgraph.graph.message import add_messages from langgraph.prebuilt import ToolNode, tools_condition from twilio.rest import Client from typing import Annotated, TypedDict, Optional, AsyncGenerator from email.message import EmailMessage from dateparser.search import search_dates from langchain_ollama import ChatOllama load_dotenv() # ═══════════════════════════════════════════════════════════════════════════════ # STATE # ═══════════════════════════════════════════════════════════════════════════════ class ChatState(TypedDict): messages: Annotated[list, add_messages] summary: str # ═══════════════════════════════════════════════════════════════════════════════ # HELPERS # ═══════════════════════════════════════════════════════════════════════════════ def get_db_path() -> str: return os.path.join(os.path.dirname(__file__), "daa.db") def format_bd_number(num: str) -> str: num = _normalize_digits(num).replace(" ", "") if num.startswith("01") and len(num) == 11: return "+88" + num if num.startswith("8801"): return "+" + num return num def _clean_text(text: str) -> str: return re.sub(r"\s+", " ", (text or "").strip()) _DIGIT_TRANSLATION = str.maketrans({ "০": "0", "১": "1", "২": "2", "৩": "3", "৪": "4", "৫": "5", "৬": "6", "৭": "7", "৮": "8", "৯": "9", "٠": "0", "١": "1", "٢": "2", "٣": "3", "٤": "4", "٥": "5", "٦": "6", "٧": "7", "٨": "8", "٩": "9", }) def _normalize_digits(text: str) -> str: return _clean_text(text).translate(_DIGIT_TRANSLATION) DAY_ALIASES = { "sunday": "Sunday", "monday": "Monday", "tuesday": "Tuesday", "wednesday": "Wednesday", "thursday": "Thursday", "friday": "Friday", "saturday": "Saturday", "রবিবার": "Sunday", "সোমবার": "Monday", "মঙ্গলবার": "Tuesday", "বুধবার": "Wednesday", "বৃহস্পতিবার": "Thursday", "শুক্রবার": "Friday", "শনিবার": "Saturday", } BOOKING_CONFIRM_WORDS = ( "জি", "ঠিক আছে", "ঠিক", "হ্যাঁ", "yes", "okay", "ok", "তথ্য ঠিক", "সব ঠিক", ) TOOL_INTENT_WORDS = ( "book", "booking", "appointment", "অ্যাপয়েন্ট", "অ্যাপয়েন্টমেন্ট", "বুক", "slot", "স্লট", "available", "availability", "ডাক্তার", "doctor", "দেখাতে", "কেনসেল", "cancel", "বাতিল", "delete", "খালি", "কোন ডাক্তার", ) SPECIALTY_ALIASES = { "চক্ষু": ["ophthalmologist", "eye specialist", "eye doctor", "ophthalmology", "eye"], "আই": ["ophthalmologist", "eye specialist", "eye doctor", "ophthalmology", "eye"], "চোখ": ["ophthalmologist", "eye specialist", "eye doctor", "ophthalmology", "eye"], "হৃদরোগ": ["cardiologist", "heart", "cardio", "cardiology"], "কার্ডিও": ["cardiologist", "heart", "cardio", "cardiology"], "মেডিসিন": ["medicine", "internal medicine", "physician", "general medicine"], "নিউরো": ["neurologist", "neurology", "brain"], "স্নায়ু": ["neurologist", "neurology", "brain"], "নাক": ["ent", "otolaryngologist", "ear nose throat"], "কান": ["ent", "otolaryngologist", "ear nose throat"], "গলা": ["ent", "otolaryngologist", "ear nose throat"], "চর্ম": ["dermatologist", "skin", "dermatology"], "স্কিন": ["dermatologist", "skin", "dermatology"], "ডেন্টাল": ["dentist", "dental", "teeth"], "দাঁত": ["dentist", "dental", "teeth"], "গাইনী": ["gynecologist", "gynaecologist", "obgyn", "women"], "মহিলা": ["gynecologist", "gynaecologist", "obgyn", "women"], "শিশু": ["pediatrician", "child", "children"], "পেডিয়াট্রিক": ["pediatrician", "child", "children"], "অর্থো": ["orthopedic", "orthopaedic", "bone"], "হাড়": ["orthopedic", "orthopaedic", "bone"], "বক্ষ": ["chest", "pulmonologist", "respiratory"], "শ্বাস": ["pulmonologist", "respiratory", "chest"], "কিডনি": ["nephrologist", "kidney", "renal"], "গ্যাস্ট্রো": ["gastroenterologist", "stomach", "digestive"], "পেট": ["gastroenterologist", "stomach", "digestive"], } def _normalize_day(term: str) -> str: raw = _clean_text(term) if not raw: return "" lower = raw.lower() return DAY_ALIASES.get(lower, DAY_ALIASES.get(raw, raw)) def _expand_search_terms(text: str) -> list[str]: """ Expand Bangla/Banglish doctor-search text into English-friendly terms. """ raw = _clean_text(text) if not raw: return [] terms: set[str] = {raw.lower()} raw_lower = raw.lower() for bangla_key, aliases in SPECIALTY_ALIASES.items(): if bangla_key in raw or bangla_key.lower() in raw_lower: terms.update(a.lower() for a in aliases) if raw_lower in DAY_ALIASES: terms.add(DAY_ALIASES[raw_lower].lower()) # Keep the individual tokens too, because users may mix Bangla and English. for token in re.split(r"[,\s/|]+", raw_lower): token = token.strip() if token: terms.add(token) return sorted(terms) def _parse_visit_date(text: str) -> Optional[str]: """ Parse a user-facing date into YYYY-MM-DD in Bangladesh time. Accepts ISO, English relative dates, and many natural-language variants. """ text = _clean_text(text) if not text: return None if re.fullmatch(r"\d{4}-\d{2}-\d{2}", text): return text tz = pytz.timezone("Asia/Dhaka") now = datetime.now(tz) lower = text.lower() if text in {"আজ", "today"}: return now.strftime("%Y-%m-%d") if text in {"আগামীকাল", "tomorrow"}: return (now + timedelta(days=1)).strftime("%Y-%m-%d") if text in {"পরশু", "day after tomorrow"}: return (now + timedelta(days=2)).strftime("%Y-%m-%d") day_name = _normalize_day(text) if day_name in DAY_ALIASES.values(): target_idx = [ "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday" ].index(day_name) current_idx = now.weekday() delta = (target_idx - current_idx) % 7 if delta == 0: delta = 7 return (now + timedelta(days=delta)).strftime("%Y-%m-%d") try: found = search_dates( text, settings={ "PREFER_DATES_FROM": "future", "TIMEZONE": "Asia/Dhaka", "RETURN_AS_TIMEZONE_AWARE": False, "RELATIVE_BASE": now.replace(tzinfo=None), }, ) if found: return found[0][1].strftime("%Y-%m-%d") except Exception: pass return None def _message_text(content) -> str: if isinstance(content, str): return content if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, dict): if item.get("type") == "text": parts.append(str(item.get("text", ""))) elif "text" in item: parts.append(str(item.get("text", ""))) else: parts.append(str(item)) return _clean_text(" ".join(parts)) return _clean_text(str(content)) def _last_human_text(messages) -> str: for message in reversed(messages): if isinstance(message, HumanMessage): return _message_text(message.content) return "" def _previous_ai_text(messages) -> str: seen_human = False for message in reversed(messages): if isinstance(message, HumanMessage) and not seen_human: seen_human = True continue if seen_human and isinstance(message, AIMessage): return _message_text(message.content) return "" def _has_tool_calls(message: AIMessage) -> bool: tool_calls = getattr(message, "tool_calls", None) if tool_calls: return True additional_kwargs = getattr(message, "additional_kwargs", {}) or {} return bool(additional_kwargs.get("tool_calls")) def _looks_like_tool_turn(text: str) -> bool: lowered = _clean_text(text).lower() if not lowered: return False return any(token.lower() in lowered for token in TOOL_INTENT_WORDS) or any( token.lower() in lowered for token in BOOKING_CONFIRM_WORDS ) def send_sms(to_number: str, message: str) -> None: client = Client(os.getenv("TWILIO_ACCOUNT_SID"), os.getenv("TWILIO_AUTH_TOKEN")) client.messages.create( body=message, from_=os.getenv("TWILIO_PHONE_NUMBER"), to=to_number, ) async def send_mail(to_mail: str, subject: str, body: str): try: import aiosmtplib # type: ignore except Exception as exc: raise RuntimeError("Email sending is not configured (aiosmtplib missing).") from exc smtp_user = os.getenv("SMTP_USER", "rakib.hedigital@gmail.com").strip() smtp_pass = os.getenv("SMTP_PASSWORD", "").strip() if not smtp_pass: raise RuntimeError("Email sending is not configured (SMTP_PASSWORD missing).") email = EmailMessage() email["From"] = smtp_user email["To"] = to_mail email["Subject"] = subject # Plain-text fallback email.set_content(body) # Professional HTML version try: html = _format_email_html(subject=subject, body_text=body) email.add_alternative(html, subtype="html") except Exception: pass await aiosmtplib.send( email, hostname="smtp.gmail.com", port=465, username=smtp_user, password=smtp_pass, use_tls=True, ) def _format_email_html(subject: str, body_text: str) -> str: """ Render a simple, professional HTML email. Input body_text should be plain text with newlines. """ safe = ( (body_text or "") .replace("&", "&") .replace("<", "<") .replace(">", ">") ) safe = safe.replace("\n", "
") return f"""\
{subject}
Aasha • Hospital Assistant
{safe}
This is an automated message. If you did not request this, please ignore it.
""" def _format_appt_email_text( action: str, doctor_name: str, patient_name: str, patient_num: str, visiting_date: str, visiting_day: str, visiting_time: str, extra: str = "", ) -> str: action_line = { "booked": "✅ Appointment Confirmed", "updated": "✅ Appointment Updated", "cancelled": "✅ Appointment Cancelled", }.get(action, "✅ Appointment Update") lines = [ action_line, "", f"Doctor : {doctor_name}", f"Patient : {patient_name}", f"Contact : {patient_num}", f"Visit Date : {visiting_date}", f"Visit Day : {visiting_day}", f"Visit Time : {visiting_time}", ] if extra: lines.extend(["", extra.strip()]) lines.extend(["", "Thank you.", "Aasha • Hospital Assistant"]) return "\n".join(lines) # ═══════════════════════════════════════════════════════════════════════════════ # TOOLS # ═══════════════════════════════════════════════════════════════════════════════ @tool def get_bd_time() -> str: """Get current Bangladesh date and time along with the next 14 days.""" # Bangladesh timezone tz = pytz.timezone("Asia/Dhaka") # Current datetime now = datetime.now(tz) # Create result dictionary result = { "CURRENT_DATETIME": now.strftime("%Y-%m-%d %H:%M:%S %Z"), "TODAY": now.strftime("%A, %B %d, %Y"), "TOMORROW": (now + timedelta(days=1)).strftime("%A, %B %d, %Y"), "NEXT_14_DAYS": {} } # Generate next 14 days for i in range(1, 15): future_date = now + timedelta(days=i) result["NEXT_14_DAYS"][f"+{i}"] = future_date.strftime("%A, %B %d, %Y") return json.dumps(result) @tool async def get_categories_by_day(visiting_day: str = "") -> str: """ Fetch unique doctor categories. If visiting_day is provided → filter by that day If empty → return all categories """ db_path = get_db_path() query = """ SELECT DISTINCT category FROM doctors WHERE category IS NOT NULL AND TRIM(category) != '' """ params = [] # Optional filter if visiting_day: visiting_day = _normalize_day(visiting_day) query += " AND LOWER(visiting_days) LIKE ?" params.append(f"%{visiting_day.lower()}%") query += " ORDER BY category ASC" async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(query, params) rows = await cursor.fetchall() categories = [row["category"] for row in rows] if not categories: return json.dumps({ "success": False, "message": "No categories found.", "data": [] }, ensure_ascii=False) return json.dumps({ "success": True, "visiting_day": visiting_day if visiting_day else "ALL", "count": len(categories), "data": categories }, ensure_ascii=False) @tool async def get_doctors_by_day(visiting_day: str = "") -> str: """ Get doctors by visiting day. If visiting_day is provided → filter by that day If empty → return all doctors Example: - "Sunday" - "Monday" - "" """ db_path = get_db_path() query = """ SELECT * FROM doctors WHERE 1=1 """ params = [] # Optional filter if visiting_day: visiting_day = _normalize_day(visiting_day) query += " AND LOWER(visiting_days) LIKE ?" params.append(f"%{visiting_day.lower()}%") async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(query, params) rows = await cursor.fetchall() if not rows: return json.dumps({ "success": False, "message": f"No doctors found for {visiting_day if visiting_day else 'ALL days'}.", "data": [] }, ensure_ascii=False) doctors = [dict(row) for row in rows] return json.dumps({ "success": True, "visiting_day": visiting_day if visiting_day else "ALL", "count": len(doctors), "data": doctors }, ensure_ascii=False) @tool async def find_doctors(query: str = "", visiting_day: str = "") -> str: """ Flexible doctor search for Bangla, Banglish, or English queries. Use this for questions like: - "চক্ষু ডাক্তার" - "আজ কোন cardiologist আছে?" - "মঙ্গলবার available pediatric doctor" """ db_path = get_db_path() query_text = _clean_text(query) day_text = _normalize_day(visiting_day) terms = _expand_search_terms(query_text) sql = "SELECT * FROM doctors WHERE 1=1" params: list[str] = [] conditions: list[str] = [] if day_text: conditions.append("LOWER(visiting_days) LIKE ?") params.append(f"%{day_text.lower()}%") if terms: term_clauses = [] for term in terms: term_clauses.append("(LOWER(doctor_name) LIKE ? OR LOWER(category) LIKE ? OR LOWER(visiting_days) LIKE ?)") params.extend([f"%{term}%", f"%{term}%", f"%{term}%"]) conditions.append("(" + " OR ".join(term_clauses) + ")") if conditions: sql += " AND " + " AND ".join(conditions) async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(sql, params) rows = await cursor.fetchall() if not rows: return json.dumps({ "success": False, "message": "No doctors found.", "query": query_text, "visiting_day": day_text or "ALL", "data": [], }, ensure_ascii=False) doctors = [dict(row) for row in rows] return json.dumps({ "success": True, "count": len(doctors), "query": query_text, "visiting_day": day_text or "ALL", "data": doctors, }, ensure_ascii=False) @tool async def search_doctor( name: str = "", category: str = "", visiting_days: str = "", ) -> str: """ Search doctors by name, category, or visiting_days from the database. Any combination of filters is supported (OR logic across fields). """ db_path = get_db_path() name = _clean_text(name) category = _clean_text(category) visiting_days = _clean_text(visiting_days) name_terms = _expand_search_terms(name) category_terms = _expand_search_terms(category) day_text = _normalize_day(visiting_days) if visiting_days else "" query = "SELECT * FROM doctors WHERE 1=1" params: list = [] conditions: list[str] = [] if name_terms: name_clauses = [] for term in name_terms: name_clauses.append("LOWER(doctor_name) LIKE ?") params.append(f"%{term}%") conditions.append("(" + " OR ".join(name_clauses) + ")") if category_terms: category_clauses = [] for term in category_terms: category_clauses.append("LOWER(category) LIKE ?") params.append(f"%{term}%") conditions.append("(" + " OR ".join(category_clauses) + ")") if day_text: conditions.append("LOWER(visiting_days) LIKE ?") params.append(f"%{day_text.lower()}%") if conditions: query += " AND (" + " OR ".join(conditions) + ")" async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(query, params) rows = await cursor.fetchall() if not rows: return json.dumps({"success": False, "message": "No doctors found.", "data": []}, ensure_ascii=False) return json.dumps({"success": True, "count": len(rows), "data": [dict(r) for r in rows]}, ensure_ascii=False) @tool async def search_appointment_by_phone(patient_num: str) -> str: """Search all appointments using the patient's phone number.""" db_path = get_db_path() patient_num = format_bd_number(patient_num) async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM patients WHERE patient_num = ? ORDER BY visiting_date ASC", (patient_num,), ) rows = await cursor.fetchall() if not rows: return json.dumps({ "success": False, "message": "No appointments found for this phone number.", "data": [], }) return json.dumps({"success": True, "count": len(rows), "data": [dict(r) for r in rows]}) @tool async def book_appointment( doctor_id: int = 0, doctor_name: str = "", category: str = "", patient_name: str = "", patient_age: str = "", patient_num: str = "", visiting_date: str = "", visiting_day: str = "", visiting_time: str = "", patient_mail: str = "" ) -> str: """ Book a doctor appointment and save it to the patients table. Args: doctor_id: Doctor's ID from search_doctor results (preferred). doctor_name: Doctor name if doctor_id is not available. category: Optional doctor category if doctor_id is not available. patient_name: Full name of the patient. patient_age: Age of the patient (e.g. "32"). patient_num: Contact phone number of the patient. visiting_date: Date of visit in YYYY-MM-DD format or natural text (optional if visiting_day is provided). visiting_day: Day of visit (e.g. "Sunday", "রবিবার", "today") — required if visiting_date is not provided. visiting_time: Time of visit (e.g. "6pm-9pm") — required (can be auto-filled from doctor record if missing). patient_mail: Required email address for confirmation mail. """ db_path = get_db_path() patient_num = format_bd_number(patient_num) patient_name = _clean_text(patient_name) patient_age = _clean_text(patient_age) doctor_name = _clean_text(doctor_name) category = _clean_text(category) visiting_date = _clean_text(visiting_date) visiting_day = _clean_text(visiting_day) visiting_time = _clean_text(visiting_time) patient_mail = _clean_text(patient_mail) if visiting_date: parsed_date = _parse_visit_date(visiting_date) if parsed_date: visiting_date = parsed_date elif visiting_day: parsed_date = _parse_visit_date(visiting_day) if parsed_date: visiting_date = parsed_date # Mandatory fields if not patient_name: return "Missing booking details. Need patient name." if not patient_age: return "Missing booking details. Need patient age." if not patient_num: return "Missing booking details. Need patient phone number." if not (doctor_id or doctor_name): return "Missing booking details. Need doctor name." if not visiting_date: return "Missing booking details. Need day/date to visit the doctor." if not patient_mail: return "Missing booking details. Need email address for confirmation." async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row doctor = None if doctor_id: cursor = await db.execute("SELECT * FROM doctors WHERE id = ?", (doctor_id,)) doctor = await cursor.fetchone() if doctor is None and doctor_name: cursor = await db.execute( "SELECT * FROM doctors WHERE LOWER(doctor_name) = LOWER(?)", (doctor_name,), ) doctor = await cursor.fetchone() if doctor is None and category: cursor = await db.execute( "SELECT * FROM doctors WHERE LOWER(category) LIKE ? ORDER BY id LIMIT 1", (f"%{category.lower()}%",), ) doctor = await cursor.fetchone() if not doctor: return ( "No doctor found. Please search first and provide either " "doctor_id, doctor_name, or category." ) doctor_data = dict(doctor) doctor_name = doctor_data.get("doctor_name", "Unknown") doctor_category = doctor_data.get("category", "Unknown") doctor_visiting_days = doctor_data.get("visiting_days", "") or "" doctor_visiting_time = doctor_data.get("visiting_time", "") or "" # Auto-fill visiting_time from doctor record if caller didn't provide it if not visiting_time: visiting_time = doctor_visiting_time.strip() if not visiting_time: return "Missing booking details. Need time to visit the doctor." # Keep visiting_day if provided; otherwise derive from date (English day) if not visiting_day and visiting_date: try: import datetime as _dt y, m, d = [int(x) for x in visiting_date.split("-")] visiting_day = _dt.date(y, m, d).strftime("%A") except Exception: visiting_day = "" cursor = await db.execute( """SELECT id FROM patients WHERE doctor_name = ? AND visiting_date = ? AND patient_num = ?""", (doctor_name, visiting_date, patient_num), ) if await cursor.fetchone(): return ( f"A booking for {patient_name} with Dr. {doctor_name} " f"on {visiting_date} already exists." ) # Create booking await db.execute( """INSERT INTO patients (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, visiting_day, visiting_time, patient_mail) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", (doctor_name, doctor_category, patient_name, patient_age, patient_num, visiting_date, visiting_day, visiting_time, patient_mail), ) await db.commit() # Mail confirmation is mandatory. mail_message = ( f"Doctor : {doctor_name}\n" f"Patient : {patient_name}\n" f"Visit Date : {visiting_date}\n" f"Visit Day : {visiting_day}\n" f"Visit Time : {visiting_time}\n" f"Please arrive on time." ) try: await send_mail( to_mail=patient_mail, subject="✅ Appointment Confirmed!", body=mail_message, ) mail_status = "\n📧 Confirmation mail sent." except Exception as e: mail_status = f"\n⚠️ Mail failed: {str(e)}" return ( f"✅ Appointment Booked!\n" f"━━━━━━━━━━━━━━━━━━━━━━\n" f"Doctor : {doctor_name}\n" f"Patient : {patient_name}\n" f"Age : {patient_age}\n" f"Date : {visiting_date}\n" f"Day : {visiting_day}\n" f"Time : {visiting_time}\n" f"Contact : {patient_num}\n" f"Email : {patient_mail}\n" f"━━━━━━━━━━━━━━━━━━━━━━\n" f"Please arrive on time." f"{mail_status}" ) @tool async def update_appointment( patient_num: str, doctor_name: str = "", doctor_id: int = 0, new_visiting_date: str = "", new_doctor_name: str = "", new_patient_num: str = "", new_patient_mail: str = "", ) -> str: """ Update an existing appointment found by phone number. You can update: - visit date/day - doctor (by name or id) - phone number - email Rules: - patient_num is required for lookup. - If multiple appointments exist for the phone number, provide doctor_name (or doctor_id) to select which one to update. - A confirmation email is REQUIRED for updates: either the existing appointment has an email, or provide new_patient_mail. """ db_path = get_db_path() patient_num_norm = format_bd_number(patient_num) selector_name = _clean_text(doctor_name) new_doctor_name = _clean_text(new_doctor_name) new_patient_num = format_bd_number(new_patient_num) if new_patient_num else "" new_patient_mail = _clean_text(new_patient_mail) new_visiting_date = _clean_text(new_visiting_date) parsed_date = _parse_visit_date(new_visiting_date) if new_visiting_date else None if parsed_date: new_visiting_date = parsed_date if not patient_num_norm: return "Missing details. Need patient phone number." if not any([new_visiting_date, new_doctor_name, new_patient_num, new_patient_mail, doctor_id]): return "Nothing to update. Provide new date, doctor, phone, or email." async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row # Find matching appointments params = [patient_num_norm] q = "SELECT * FROM patients WHERE patient_num = ?" if selector_name: q += " AND LOWER(doctor_name) = LOWER(?)" params.append(selector_name) rows = await (await db.execute(q, params)).fetchall() if not rows: return "No appointment found for this phone number." if len(rows) > 1 and not selector_name: return ( "Multiple appointments found for this phone number. " "Please specify the doctor name to update." ) appt = dict(rows[0]) # Resolve new doctor if requested resolved_doctor = None if doctor_id: d = await (await db.execute("SELECT * FROM doctors WHERE id = ?", (doctor_id,))).fetchone() if d: resolved_doctor = dict(d) if resolved_doctor is None and new_doctor_name: d = await (await db.execute( "SELECT * FROM doctors WHERE LOWER(doctor_name) = LOWER(?)", (new_doctor_name,), )).fetchone() if d: resolved_doctor = dict(d) else: return f"No doctor found with name '{new_doctor_name}'." # Build updated fields updated_doctor_name = appt.get("doctor_name", "") updated_doctor_category = appt.get("doctor_category", "") updated_visiting_time = appt.get("visiting_time", "") updated_visiting_day = appt.get("visiting_day", "") updated_visiting_date = appt.get("visiting_date", "") updated_patient_num = appt.get("patient_num", "") updated_patient_mail = appt.get("patient_mail", "") if resolved_doctor: updated_doctor_name = resolved_doctor.get("doctor_name", updated_doctor_name) updated_doctor_category = resolved_doctor.get("category", updated_doctor_category) updated_visiting_time = (resolved_doctor.get("visiting_time") or updated_visiting_time).strip() updated_visiting_day = (resolved_doctor.get("visiting_days") or updated_visiting_day).strip() if new_visiting_date: updated_visiting_date = new_visiting_date # Derive English day name from date for consistency try: import datetime as _dt y, m, d = [int(x) for x in updated_visiting_date.split("-")] updated_visiting_day = _dt.date(y, m, d).strftime("%A") except Exception: pass if new_patient_num: updated_patient_num = new_patient_num if new_patient_mail: updated_patient_mail = new_patient_mail if not updated_patient_mail: return "Email is required to update an appointment. Please provide an email address." await db.execute( """UPDATE patients SET doctor_name = ?, doctor_category = ?, patient_num = ?, visiting_date = ?, visiting_day = ?, visiting_time = ?, patient_mail = ? WHERE id = ?""", ( updated_doctor_name, updated_doctor_category, updated_patient_num, updated_visiting_date, updated_visiting_day, updated_visiting_time, updated_patient_mail, appt["id"], ), ) await db.commit() # Send confirmation email patient_name = appt.get("patient_name", "Patient") email_text = _format_appt_email_text( action="updated", doctor_name=updated_doctor_name, patient_name=patient_name, patient_num=updated_patient_num, visiting_date=updated_visiting_date, visiting_day=updated_visiting_day, visiting_time=updated_visiting_time, extra="Your appointment details have been updated successfully.", ) try: await send_mail( to_mail=updated_patient_mail, subject="Appointment Updated", body=email_text, ) mail_status = "📧 Confirmation mail sent." except Exception as e: mail_status = f"⚠️ Mail failed: {str(e)}" return ( "✅ Appointment Updated!\n" "━━━━━━━━━━━━━━━━━━━━━━\n" f"Doctor : {updated_doctor_name}\n" f"Patient : {patient_name}\n" f"Date : {updated_visiting_date}\n" f"Day : {updated_visiting_day}\n" f"Time : {updated_visiting_time}\n" f"Contact : {updated_patient_num}\n" f"Email : {updated_patient_mail}\n" "━━━━━━━━━━━━━━━━━━━━━━\n" f"{mail_status}" ) @tool async def delete_appointment( patient_num: str, doctor_name: str = "", doctor_id: int = 0, patient_mail: str = "", ) -> str: """ Cancel (delete) an appointment. Sends a confirmation email to the patient (required). """ db_path = get_db_path() patient_num = format_bd_number(patient_num) doctor_name = _clean_text(doctor_name) patient_mail = _clean_text(patient_mail) async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row appt_row = None if not doctor_name and doctor_id: cursor = await db.execute("SELECT doctor_name FROM doctors WHERE id = ?", (doctor_id,)) row = await cursor.fetchone() if row: doctor_name = row["doctor_name"] if doctor_name: cursor = await db.execute( """SELECT * FROM patients WHERE patient_num = ? AND LOWER(doctor_name) = LOWER(?)""", (patient_num, doctor_name), ) appt_row = await cursor.fetchone() if not appt_row: return json.dumps({"success": False, "message": "No matching appointment found."}) else: cursor = await db.execute( """SELECT * FROM patients WHERE patient_num = ? ORDER BY visiting_date ASC, id ASC""", (patient_num,), ) rows = await cursor.fetchall() if not rows: return json.dumps({"success": False, "message": "No matching appointment found."}) if len(rows) > 1: return json.dumps({ "success": False, "message": "Multiple appointments found. Please specify the doctor name to cancel.", "count": len(rows), "data": [dict(row) for row in rows], }, ensure_ascii=False) appt_row = rows[0] doctor_name = appt_row["doctor_name"] or doctor_name # Resolve email (required) appt = dict(appt_row) if appt_row is not None else {} appt_email = _clean_text(appt.get("patient_mail", "")) or patient_mail if not appt_email: return json.dumps({ "success": False, "message": "Email is required to cancel an appointment. Please provide the email address.", }, ensure_ascii=False) # Delete after we have all details for mail await db.execute("DELETE FROM patients WHERE id = ?", (appt["id"],)) await db.commit() # Send cancellation email email_text = _format_appt_email_text( action="cancelled", doctor_name=appt.get("doctor_name", doctor_name), patient_name=appt.get("patient_name", "Patient"), patient_num=appt.get("patient_num", patient_num), visiting_date=appt.get("visiting_date", ""), visiting_day=appt.get("visiting_day", ""), visiting_time=appt.get("visiting_time", ""), extra="Your appointment has been cancelled successfully.", ) try: await send_mail( to_mail=appt_email, subject="Appointment Cancelled", body=email_text, ) mail_status = "Confirmation mail sent." except Exception as e: mail_status = f"Mail failed: {str(e)}" return json.dumps({ "success": True, "message": f"Appointment with Dr. {doctor_name} cancelled successfully. {mail_status}", }, ensure_ascii=False) # ═══════════════════════════════════════════════════════════════════════════════ # SYSTEM PROMPT # ═══════════════════════════════════════════════════════════════════════════════ BASE_SYSTEM = """ You are Aasha, a warm, Bangla-first hospital phone-call assistant and medical appointment concierge. Your job is to help people find doctors, check availability, and manage appointments. PERSONA (Voice & Vibe) - Sound like a professional, polite, and friendly Bangla female call-support assistant, like a real appointment booking or customer service executive. - Maintain a calm, warm, and naturally cheerful tone — smooth, confident, and service-oriented. - Speak like a native Bangla speaker in a phone conversation, using natural, human-like phrasing (not robotic or overly scripted). - Keep a soft “jolly” positivity, but controlled and professional — suitable for customer support, booking, and service interactions. - Be clear, structured, and helpful when giving information, confirming details, or handling requests. - Stay patient, respectful, and reassuring when the user is confused, frustrated, or stressed. - Show polite enthusiasm when confirming bookings, completing tasks, or successfully helping the user. - Use simple, conversational Bangla with a natural flow, like real call-center communication. - Keep responses short, clear, and easy to understand, as in real phone support conversations. - Avoid slang, exaggeration, or overly social-media-style expressions. - Avoid flirting, romance, sexual behavior, or emotionally manipulative language. - Never claim to be human; clearly remain an AI assistant. - Always prioritize clarity, professionalism, helpfulness, and a calm positive tone suitable for real customer support and appointment handling. CORE BEHAVIOR: - Speak friendly, cheerful, well-behaved young female, naturally, politely, and engagingly (short sentences, warm tone). - Default to Bangla when the user speaks Bangla or Banglish. - Keep replies short, helpful, and one step at a time (avoid big paragraphs). - Use gentle acknowledgements: e.g., “বুঝতে পেরেছি”, “চিন্তা করবেন না”, “আমি আছি”. - Ask 1 clear question at a time; confirm important details before actions. - If the database fields are English, translate the user's Bangla intent into English before calling tools. - Never answer doctor availability or booking questions from memory when a tool can verify it. STRICT SAFETY: - You are NOT a doctor. - Never diagnose diseases. - Never recommend medicines or treatments. - If the user asks medical/health advice, politely redirect to a doctor and offer appointment help. APPOINTMENT FLOW: 1. Understand the user's intent. 2. Use tools to find the right doctor or appointment record. 3. Ask only for missing details. 4. Confirm important details before booking or deleting. TOOL RULES: - Use `find_doctors` first for doctor search, specialty search, and availability search. - Use `get_doctors_by_day` or `get_categories_by_day` when the user asks about a day directly. - Use `book_appointment` only after identifying the doctor and required patient details. - Use `update_appointment` when the user wants to change an existing appointment. - Never invent `doctor_id`. Get it from tool results or resolve by doctor_name/category. - If the user gives a Bangla date like "আগামীকাল" or "পরশু", convert it to a real date before booking. - Email is REQUIRED for booking and must be used to send a confirmation mail. - If the user already provided name, age, phone, and date and then confirms, call `book_appointment` immediately. - If the user asks to cancel and only gives a phone number, cancel the single matching appointment if there is exactly one. LANGUAGE RULE: - Respond in the user’s language. - If the user uses Bangla, reply in clear Bangla. - If the user uses Banglish, reply in Bangla unless they clearly prefer English. - Always generate numbers in english - Time and dates should be written in spoken Bangla style when applicable, for example: [দশটা ২৮ মিনিট, চারটা বেজে তিরিশ মিনিট, দশটা ১২ বাজে, এখন টাইম হচ্ছে সাতটা তিরিশ] DATA RULE: - Doctor names, categories, and days in the database are English. - Bangla terms such as চক্ষু/কার্ডিও/শিশু/চর্ম must be translated to English search terms before tool calls. RESPONSE STYLE: - Be concise. - Be reassuring. - Be jolly and encouraging, but not over-the-top. - Ask one clear question when more information is needed. """ SUMMARY_SYSTEM = ( BASE_SYSTEM + "\nYou also have a condensed memory of previous conversations:\n\n" "{summary}\n\n" "Use this memory for continuity. Do not repeat it unless asked." ) FORCED_TOOL_SYSTEM = """ The previous assistant turn failed to use a tool even though the user intent is clear. You must now choose the correct tool instead of answering in prose: - Use `find_doctors` or `search_doctor` for doctor/specialty/availability questions. - Use `get_doctors_by_day` or `get_categories_by_day` for day-based availability. - Use `book_appointment` when the user is confirming a booking. - Use `update_appointment` when the user wants to update an appointment. - Use `delete_appointment` when the user is cancelling a booking. Important booking rules: - Email is REQUIRED. Do not book without an email address. - Visiting time is REQUIRED. If the doctor record has a visiting_time, use it and confirm it with the user. - If the user already gave name, age, phone, doctor name, visit day/date, visit time, and email, do not ask again. - If the user has already confirmed the details, book immediately. Important cancellation rules: - If the user gave only a phone number and there is exactly one matching appointment, cancel it directly. - If multiple appointments match, ask only for the doctor name. - Email is REQUIRED to cancel or update. If missing, ask for email. Do not give a normal conversational answer before the tool call. """ # ═══════════════════════════════════════════════════════════════════════════════ # AGENT # ═══════════════════════════════════════════════════════════════════════════════ class AIBackend: # ── FIX-BUG1: was `_init_` (single underscores) — never called by Python def __init__(self, use_gemini: bool = False, use_ollama: bool = True, use_fallback: bool = False): self.use_gemini = use_gemini self.use_ollama = use_ollama self.use_fallback = use_fallback os.environ.setdefault("LANGCHAIN_PROJECT", "Doctor Appointment Automation") if use_gemini: self.llm = ChatGoogleGenerativeAI( model="gemini-2.5-flash", temperature=0.01, ) elif use_ollama: self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.01) else: # Local fallback — extend as needed self.llm = ChatOllama(model="gemma4:e4b", streaming=True, temperature=0.01) self.tools = [ find_doctors, search_doctor, book_appointment, get_bd_time, search_appointment_by_phone, update_appointment, delete_appointment, get_categories_by_day, get_doctors_by_day ] self.tool_node = ToolNode(self.tools) self.llm_with_tools = self.llm.bind_tools(self.tools) # ── Setup ────────────────────────────────────────────────────────────────── async def async_setup(self) -> None: db_path = get_db_path() self.conn = await aiosqlite.connect(db_path) self.checkpointer = AsyncSqliteSaver(self.conn) await self._create_tables() self.graph = self._build_graph() self.summary_graph = self._build_summary_graph() print("[Backend] AIBackend ready ✓") async def _create_tables(self) -> None: await self.conn.execute(""" CREATE TABLE IF NOT EXISTS userid_threadid ( userId TEXT UNIQUE NOT NULL, threadId TEXT UNIQUE NOT NULL ) """) await self.conn.execute(""" CREATE TABLE IF NOT EXISTS doctors ( id INTEGER PRIMARY KEY AUTOINCREMENT, doctor_name TEXT, category TEXT, visiting_days TEXT, visiting_time TEXT, visiting_money INTEGER ) """) await self.conn.execute(""" CREATE TABLE IF NOT EXISTS patients ( id INTEGER PRIMARY KEY AUTOINCREMENT, doctor_name TEXT, doctor_category TEXT, patient_name TEXT, patient_age TEXT, patient_num TEXT, visiting_date TEXT, visiting_day TEXT, visiting_time TEXT, patient_mail TEXT ) """) await self.conn.commit() # Lightweight migrations for older DBs async def _ensure_column(table: str, col: str, col_type: str) -> None: async with self.conn.execute(f"PRAGMA table_info({table})") as cur: rows = await cur.fetchall() existing = {r[1] for r in rows} # (cid,name,type,notnull,dflt,pk) if col in existing: return await self.conn.execute(f"ALTER TABLE {table} ADD COLUMN {col} {col_type}") await self.conn.commit() await _ensure_column("patients", "visiting_day", "TEXT") await _ensure_column("patients", "visiting_time", "TEXT") # ── Summarise node ───────────────────────────────────────────────────────── async def summarize_conversation(self, state: ChatState): existing = state.get("summary", "") messages = state["messages"] if existing: prompt = ( f"Existing summary:\n{existing}\n\n" "Update the summary with the new messages above. " "Keep it concise, bullet-pointed, and information-dense. " "Preserve unresolved issues and ongoing tasks." ) else: prompt = ( "Summarise this conversation. " "Capture goals, decisions, preferences, and unresolved questions. " "Be concise and use bullet points." ) response = await self.llm.ainvoke(messages + [HumanMessage(content=prompt)]) return { "summary": response.content, "messages": [RemoveMessage(id=m.id) for m in messages[:-2]], } def _should_retry_tool_call(self, state: ChatState, response: AIMessage) -> bool: if _has_tool_calls(response): return False messages = state["messages"] latest_user = _last_human_text(messages) previous_ai = _previous_ai_text(messages) if not _looks_like_tool_turn(latest_user): return False previous_ai_lower = previous_ai.lower() booking_clues = ( "name", "patient", "age", "phone", "email", "নাম", "বয়স", "ফোন", "ইমেইল", ) cancellation_clues = ( "cancel", "বাতিল", "delete", "cancel করার", "কেনসেল", "appointment", "অ্যাপয়েন্ট", ) if any(clue in latest_user.lower() for clue in booking_clues): return True if any(clue in previous_ai_lower for clue in booking_clues): return True if any(clue in latest_user.lower() for clue in cancellation_clues): return True if any(clue in previous_ai_lower for clue in cancellation_clues): return True return True # ── Chat node ────────────────────────────────────────────────────────────── async def chat_node(self, state: ChatState): """ Invokes the LLM with tool bindings and returns the AI response. Uses ainvoke() (not collect-all-then-return astream()) so the call is clean and deterministic. Token-level streaming is handled by LangGraph itself via stream_mode="messages" in ai_only_stream(), which intercepts the underlying LLM streaming at the graph level. """ summary = state.get("summary", "") messages = state["messages"] print("#" * 50) print(">>>>>>>>>> CHAT NODE START <<<<<<<<<<") print(f"[SUMMARY]: {summary[:120] if summary else 'None'}") for m in messages: print(f" [{m.__class__.__name__}]: {str(m.content)[:160]}") print("#" * 50) sys_content = SUMMARY_SYSTEM.format(summary=summary) if summary else BASE_SYSTEM full_messages = [SystemMessage(content=sys_content)] + list(messages) response = await self.llm_with_tools.ainvoke(full_messages) if self._should_retry_tool_call(state, response): retry_messages = full_messages + [ AIMessage(content=_message_text(response.content)), SystemMessage(content=FORCED_TOOL_SYSTEM), ] retry_response = await self.llm_with_tools.ainvoke(retry_messages) if _has_tool_calls(retry_response): response = retry_response print(f"[AI]: {str(response.content)[:200]}") print(">>>>>>>>>> CHAT NODE END <<<<<<<<<<") return {"messages": [response]} # ── Graph ────────────────────────────────────────────────────────────────── def _build_graph(self): g = StateGraph(ChatState) g.add_node("chat_node", self.chat_node) g.add_node("tools", self.tool_node) g.add_edge(START, "chat_node") g.add_conditional_edges("chat_node", tools_condition) g.add_edge("tools", "chat_node") return g.compile(checkpointer=self.checkpointer) def _build_summary_graph(self): g = StateGraph(ChatState) g.add_node("summarize_node", self.summarize_conversation) g.add_edge(START, "summarize_node") g.add_edge("summarize_node", END) return g.compile(checkpointer=self.checkpointer) # ── Streaming ────────────────────────────────────────────────────────────── async def ai_only_stream( self, initial_state: dict, config: dict ) -> AsyncGenerator[str, None]: """ Async generator — yields AI text tokens as they arrive. FIX-BUG9: narrowed isinstance check to exclude ToolMessage content from being streamed to the user, and guards against non-str content (e.g. multimodal list payloads from Ollama tool-call chunks). """ async for chunk, _meta in self.graph.astream( initial_state, config=config, stream_mode="messages" ): # Only yield text content from AI messages. # Exclude ToolMessage (tool execution results) — they contain # raw JSON that should not be streamed directly to the user. if ( isinstance(chunk, (AIMessage, AIMessageChunk)) and not isinstance(chunk, ToolMessage) and isinstance(chunk.content, str) and chunk.content ): yield chunk.content # Auto-summarise in background when history grows long try: current = await self.graph.aget_state(config) if len(current.values.get("messages", [])) > 10: asyncio.create_task( self.summary_graph.ainvoke(current.values, config=config) ) print("@" * 20, "Summarisation triggered", "@" * 20) except Exception as exc: print(f"[Backend] Summarisation check failed: {exc}") # ── Thread management ────────────────────────────────────────────────────── @staticmethod def generate_thread_id() -> str: return str(uuid.uuid4()) async def retrieve_all_threads(self) -> list[str]: threads: set[str] = set() async for cp in self.checkpointer.alist(None): threads.add(cp.config["configurable"]["thread_id"]) return list(threads) async def ensure_user_thread(self, user_id: str) -> str: """Create a DB-backed thread for a user if it does not already exist.""" user_id = _clean_text(user_id)[:64] if not user_id: raise ValueError("user_id is required") async with self.conn.execute( "SELECT threadId FROM userid_threadid WHERE userId = ?", (user_id,), ) as cursor: row = await cursor.fetchone() if row is not None: return row[0] thread_id = user_id + self.generate_thread_id() await self.conn.execute( "INSERT INTO userid_threadid (userId, threadId) VALUES (?, ?)", (user_id, thread_id), ) await self.conn.commit() return thread_id # ── Public entry point ───────────────────────────────────────────────────── async def main(self, user_id: str, user_query: str) -> AsyncGenerator[str, None]: """Return an async generator of AI text tokens.""" thread_id = await self.ensure_user_thread(user_id) initial_state = {"messages": [HumanMessage(content=user_query)]} config = { "configurable": {"thread_id": thread_id}, "metadata": {"thread_id": thread_id}, "run_name": "chat_turn", } return self.ai_only_stream(initial_state, config)