customeragent-api / server /tests /simulate_real_conversation.py
anasraza526's picture
Clean deploy to Hugging Face
ac90985
import asyncio
import logging
from typing import List, Dict
from app.services.medical_orchestrator import get_medical_orchestrator
from app.services.context_manager import EntryContext
# Quiet logs
logging.basicConfig(level=logging.ERROR)
async def simulate_conversation():
print("\n" + "="*60)
print("🗣️ REAL USER CONVERSATION SIMULATION")
print("="*60)
orchestrator = get_medical_orchestrator()
ctx = EntryContext(tenant_id="hospital_xyz", user_id="real_patient_01")
# Simulating a session history
history: List[Dict] = []
conversation_flow = [
"Hi, I haven't been feeling well today.",
"I have a throbbing headache and I feel a bit nauseous.",
"It started about 4 hours ago.",
"Is it safe to take ibuprofen for this?"
]
print(f"Patient ID: {ctx.user_id}")
print(f"Context: {ctx.tenant_id} (Healthcare)\n")
for user_msg in conversation_flow:
print(f"👤 User: {user_msg}")
# Process the query
response, conf, is_risk = await orchestrator.process_query(user_msg, ctx, session_history=history)
print(f"🤖 Bot: {response}")
print(f" [Internal: Conf={conf:.2f}, Risk={is_risk}]")
print("-" * 40)
# Update history (Mocking what the API layer would do)
history.append({"role": "user", "content": user_msg})
history.append({"role": "assistant", "content": response})
print("\n" + "="*60)
print("✅ SIMULATION COMPLETE")
print("="*60)
if __name__ == "__main__":
asyncio.run(simulate_conversation())