| |
| """ |
| MediAgent - Autonomous Multi-Agent Medical Imaging Analysis System |
| Production FastAPI Server & Orchestrator Entry Point |
| |
| New in v2.0: |
| - DICOM (.dcm) file support with automatic metadata extraction |
| - Real-time token-level streaming during report generation |
| - AMD GPU metrics endpoint (/metrics/gpu) |
| - Post-report clinical advisor chat (/chat/{report_id}) |
| - FHIR R4 DiagnosticReport export (/export/fhir/{report_id}) |
| """ |
|
|
| import json |
| import logging |
| import base64 |
| import uuid |
| import asyncio |
| import subprocess |
| import uvicorn |
| import os |
| from datetime import datetime |
| from typing import Dict, Optional, Any, AsyncGenerator |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, StreamingResponse |
| from fastapi.staticfiles import StaticFiles |
|
|
| from core.llm import LLMClient |
| from core.models import PatientInput, PipelineState, ChatRequest, ChatResponse |
| from core.pipeline import PipelineOrchestrator |
| from agents.intake import IntakeAgent |
| from agents.vision import VisionAgent |
| from agents.research import ResearchAgent |
| from agents.report import ReportAgent |
| from agents.critic import CriticAgent |
| from agents.advisor import ClinicalAdvisorAgent |
|
|
| |
| |
| |
# Configure root logging for the whole process. force=True removes and closes
# any handlers already attached to the root logger before installing this one.
logging.basicConfig(
    format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
    force=True,
)

# Logger shared by every endpoint and helper in this server module.
logger = logging.getLogger("mediagent.server")
|
|
| |
| |
| |
| LLM_BASE_URL = os.getenv("LLM_BASE_URL", "http://localhost:8000/v1") |
| DEMO_MODE = os.getenv("DEMO_MODE", "false").lower() == "true" |
|
|
| |
| |
| |
# The FastAPI application. Interactive docs live at /api/docs and /api/redoc.
app = FastAPI(
    title="MediAgent API",
    version="2.0.0",
    description="Autonomous Multi-Agent Medical Imaging Analysis System — AMD MI300X",
    docs_url="/api/docs",
    redoc_url="/api/redoc",
)

# Allowed CORS origins as a comma-separated list; defaults to "*" (any origin),
# which matches the previous hard-coded behaviour.
# NOTE(review): with allow_origins=["*"] and allow_credentials=True, Starlette
# reflects the caller's Origin for cookie-bearing requests, i.e. any site may
# make credentialed calls. Set CORS_ALLOW_ORIGINS explicitly in production.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# Upper bound on cached pipeline results; once this many are stored, the
# oldest (first-inserted) entry is evicted before a new one is added.
REGISTRY_MAX_SIZE = 200

# report_id -> PipelineState for analyses run by this process (in-memory only;
# insertion order doubles as age order for eviction).
pipeline_registry: Dict[str, PipelineState] = {}
|
|
|
|
| |
| |
| |
@app.on_event("startup")
async def startup_event():
    """Build the shared LLM client, all agents and the default orchestrator.

    Everything is attached to ``app.state`` for request handlers to use. Any
    failure is fatal: it is logged with its traceback and re-raised as
    ``SystemExit`` so the server does not start half-initialised.
    """
    logger.info("MediAgent v2.0 — System Startup")
    if DEMO_MODE:
        logger.info("DEMO MODE ACTIVE — no real inference will be performed")
    try:
        llm_client = LLMClient()
        app.state.llm_client = llm_client
        # Every agent shares the single client created above.
        app.state.intake_agent = IntakeAgent(llm_client=llm_client)
        app.state.vision_agent = VisionAgent(llm_client=llm_client)
        app.state.research_agent = ResearchAgent(llm_client=llm_client)
        app.state.report_agent = ReportAgent(llm_client=llm_client)
        app.state.critic_agent = CriticAgent(llm_client=llm_client)
        app.state.advisor_agent = ClinicalAdvisorAgent(llm_client=llm_client)

        def _default_cb(state: PipelineState) -> None:
            """No-op status callback: the default orchestrator reports no progress."""

        app.state.orchestrator = PipelineOrchestrator(
            intake_agent=app.state.intake_agent,
            vision_agent=app.state.vision_agent,
            research_agent=app.state.research_agent,
            report_agent=app.state.report_agent,
            critic_agent=app.state.critic_agent,
            on_status_update=_default_cb,
        )
        logger.info("All agents initialised. MediAgent v2.0 online.")
    except Exception as e:
        # exc_info=True keeps the traceback; the bare message alone is rarely enough.
        logger.critical("Startup failure: %s", e, exc_info=True)
        raise SystemExit(str(e)) from e
|
|
|
|
| |
| |
| |
|
|
| def _evict_registry(): |
| if len(pipeline_registry) >= REGISTRY_MAX_SIZE: |
| del pipeline_registry[next(iter(pipeline_registry))] |
|
|
|
|
| async def _read_and_encode_image(image: UploadFile): |
| image_bytes = await image.read() |
| if len(image_bytes) > 20 * 1024 * 1024: |
| raise HTTPException(status_code=413, detail="Image exceeds 20 MB size limit.") |
|
|
| filename = (image.filename or "").lower() |
| content_type = image.content_type or "" |
|
|
| is_dicom = ( |
| filename.endswith(".dcm") |
| or "dicom" in content_type |
| or image_bytes[:4] == b"\x00\x00\x00\x00" |
| or image_bytes[128:132] == b"DICM" |
| ) |
|
|
| if is_dicom: |
| try: |
| from core.dicom import parse_dicom |
| b64_image, dicom_meta = parse_dicom(image_bytes) |
| logger.info("DICOM parsed | meta keys: %s", list(dicom_meta.keys())) |
| return b64_image, dicom_meta |
| except Exception as e: |
| logger.warning("DICOM parse failed (%s), treating as regular image", e) |
|
|
| b64_data = base64.b64encode(image_bytes).decode("utf-8") |
| mime = content_type if content_type.startswith("image/") else "image/jpeg" |
| return f"data:{mime};base64,{b64_data}", None |
|
|
|
|
| |
| |
| |
|
|
@app.get("/", response_class=HTMLResponse)
async def serve_frontend():
    """Serve the single-page frontend.

    The path is resolved relative to the process's working directory.
    """
    index_page = "static/index.html"
    return FileResponse(index_page)
|
|
|
|
@app.get("/health")
async def health_check():
    """Return a health summary of the running server.

    ``agents_loaded`` is true once startup has attached the orchestrator to
    ``app.state``; ``active_sessions`` is the number of cached pipeline results.
    """
    # Local import: only ``datetime`` itself is imported at module level.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Build the same naive-UTC
    # ISO string (with a trailing "Z") from a timezone-aware timestamp instead.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "status": "healthy",
        "version": "2.0.0",
        "timestamp": now_utc.isoformat() + "Z",
        "system": "MediAgent",
        "infrastructure": "AMD Instinct MI300X / ROCm / vLLM",
        "agents_loaded": hasattr(app.state, "orchestrator"),
        "active_sessions": len(pipeline_registry),
        "demo_mode": DEMO_MODE,
        "features": ["dicom", "clinical_chat", "gpu_metrics"],
    }
|
|
|
|
| |
| |
| |
|
|
# Single worker thread: metric collection is serialised (amdsmi init/shutdown
# never overlap), just as it was when it ran directly on the event loop.
_GPU_METRICS_EXECUTOR = ThreadPoolExecutor(max_workers=1, thread_name_prefix="gpu-metrics")


def _pick_metric(fields: Dict[str, Any], *keys: str, default: float = 0) -> float:
    """Return the first of *keys* in *fields* whose value parses as a number.

    Values such as ``"42"`` or ``"42%"`` are accepted. ``None`` and unparseable
    values are skipped. Returns *default* if no key yields a number.
    """
    for key in keys:
        value = fields.get(key)
        if value is None:
            continue
        try:
            return float(str(value).replace("%", "").strip())
        except ValueError:
            continue
    return default


def _amdsmi_cards() -> list:
    """Collect per-GPU metrics through the ``amdsmi`` Python bindings.

    Propagates whatever ``import amdsmi`` / ``amdsmi_init`` raise when the
    library is unavailable. A device whose queries fail is logged and skipped.
    """
    import amdsmi

    amdsmi.amdsmi_init()
    try:
        cards = []
        for index, handle in enumerate(amdsmi.amdsmi_get_processor_handles()):
            try:
                activity = amdsmi.amdsmi_get_gpu_activity(handle)
                vram_used = amdsmi.amdsmi_get_gpu_memory_usage(handle, amdsmi.AmdSmiMemoryType.VRAM)
                vram_total = amdsmi.amdsmi_get_gpu_memory_total(handle, amdsmi.AmdSmiMemoryType.VRAM)
                temp = amdsmi.amdsmi_get_temp_metric(
                    handle,
                    amdsmi.AmdSmiTemperatureType.JUNCTION,
                    amdsmi.AmdSmiTemperatureMetric.CURRENT,
                )
                power = amdsmi.amdsmi_get_power_info(handle)
                clock = amdsmi.amdsmi_get_clk_freq(handle, amdsmi.AmdSmiClkType.GFX)
                cards.append({
                    "card": f"GPU {index}",
                    "gpu_use_pct": activity.get("gfx_activity", 0),
                    "vram_used_mb": round(vram_used / 1024 / 1024, 1),
                    "vram_total_mb": round(vram_total / 1024 / 1024, 1),
                    "temp_c": temp,
                    "power_w": round(power.get("current_socket_power", 0), 1),
                    "clk_mhz": clock.get("cur_clk", 0),
                })
            except Exception as exc:
                # One bad device must not hide the others, but don't fail silently.
                logger.debug("amdsmi query failed for GPU %d: %s", index, exc)
        return cards
    finally:
        # Always release the library, even if enumerating devices failed.
        amdsmi.amdsmi_shut_down()


def _rocm_smi_cards() -> list:
    """Collect per-GPU metrics by parsing ``rocm-smi --json`` output.

    Raises FileNotFoundError when rocm-smi is not installed. Other failures
    (timeout, bad JSON) propagate to the caller.
    """
    result = subprocess.run(
        ["rocm-smi", "--showuse", "--showmeminfo", "vram", "--showtemp", "--showpower", "--json"],
        capture_output=True, text=True, timeout=5,
    )
    if result.returncode != 0 or not result.stdout.strip():
        return []

    cards = []
    for card_name, fields in json.loads(result.stdout).items():
        if not isinstance(fields, dict):
            continue
        vram_used = _pick_metric(
            fields,
            "VRAM Total Used Memory (B)", "Used VRAM (B)", "vram_used",
            "VRAM Total Used Memory (MiB)", "Used VRAM (MiB)", "VRAM Total Memory Used (MiB)",
        )
        vram_total = _pick_metric(
            fields,
            "VRAM Total Memory (B)", "Total VRAM (B)", "vram_total",
            "VRAM Total Memory (MiB)", "Total VRAM (MiB)", "VRAM Total Memory Size (MiB)",
        )
        # The key lists mix byte and MiB fields, so a value above 1,000,000 is
        # taken to be bytes (as MiB it would mean roughly a terabyte of VRAM).
        if vram_used > 1_000_000:
            vram_used = round(vram_used / 1024 / 1024, 1)
        if vram_total > 1_000_000:
            vram_total = round(vram_total / 1024 / 1024, 1)

        cards.append({
            "card": card_name,
            "gpu_use_pct": _pick_metric(fields, "GPU use (%)", "GPU Use (%)", "GFX Activity (%)", "GPU activity (%)"),
            "vram_used_mb": vram_used,
            "vram_total_mb": vram_total,
            "temp_c": _pick_metric(fields, "Temperature (Sensor junction) (C)",
                                   "Temperature (Sensor HBM 0) (C)", "Junction Temperature (C)"),
            "power_w": _pick_metric(fields, "Current Socket Graphics Package Power (W)",
                                    "Average Graphics Package Power (W)", "Socket Power (W)"),
        })
    return cards


def _collect_gpu_metrics() -> Dict[str, Any]:
    """Blocking metric collection: try amdsmi, then rocm-smi, then report unavailability."""
    # Local import: only ``datetime`` itself is imported at module level.
    from datetime import timezone

    metrics: Dict[str, Any] = {
        "available": False,
        # Same naive-UTC "...Z" format as before, without deprecated utcnow().
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
    }

    try:
        cards = _amdsmi_cards()
    except Exception as exc:
        logger.debug("amdsmi unavailable: %s", exc)
        cards = []
    if cards:
        metrics.update(available=True, source="amdsmi", cards=cards)
        return metrics

    try:
        cards = _rocm_smi_cards()
    except FileNotFoundError:
        cards = []  # rocm-smi not installed; fall through to the "unavailable" note
    except Exception as exc:
        logger.debug("rocm-smi JSON failed: %s", exc)
        cards = []
    if cards:
        metrics.update(available=True, source="rocm-smi", cards=cards)
        return metrics

    metrics["note"] = "AMD GPU metrics unavailable — is ROCm installed?"
    return metrics


@app.get("/metrics/gpu")
async def get_gpu_metrics():
    """Return AMD GPU utilisation, VRAM, temperature and power per card.

    Tries the ``amdsmi`` bindings first, then ``rocm-smi --json``. If neither
    yields data, the response has ``available: False`` and a ``note``.
    """
    # amdsmi calls and the rocm-smi subprocess (up to a 5 s timeout) block, so
    # run them off the event loop instead of stalling every other request.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_GPU_METRICS_EXECUTOR, _collect_gpu_metrics)
|
|
|
|
| |
| |
| |
|
|
@app.post("/analyze")
async def analyze_image(
    image: UploadFile = File(...),
    symptoms: str = Form(default=""),
    age: Optional[int] = Form(default=None, ge=0, le=120),
    sex: Optional[str] = Form(default=None),
    clinical_context: str = Form(default=""),
):
    """Run the full agent pipeline on one upload and return the final report as JSON.

    Empty form fields are back-filled from DICOM metadata when available. The
    resulting pipeline state is cached in ``pipeline_registry`` under a freshly
    generated report id, which is also returned in the ``X-Report-ID`` header.

    Raises:
        HTTPException: 400/413 for unreadable, empty or oversized uploads; 500
            when the pipeline fails or finishes without a final report.
    """
    logger.info("[SYNC] New analysis request | file=%s", image.filename)
    report_id = f"REP-{uuid.uuid4().hex[:12].upper()}"

    try:
        b64_image, dicom_meta = await _read_and_encode_image(image)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Image processing failed: {e}") from e

    # Back-fill empty form fields from DICOM metadata, if any.
    if dicom_meta:
        if age is None and dicom_meta.get("age"):
            try:
                dicom_age = int(dicom_meta["age"])
            except (ValueError, TypeError):
                dicom_age = None
            # Apply the same 0-120 bounds the form field enforces.
            if dicom_age is not None and 0 <= dicom_age <= 120:
                age = dicom_age
        if not sex and dicom_meta.get("sex"):
            sex = dicom_meta["sex"]
        if not clinical_context and dicom_meta.get("study_description"):
            clinical_context = f"DICOM: {dicom_meta.get('study_description','')} | {dicom_meta.get('body_part','')}"

    patient_input = PatientInput(
        image_base64=b64_image, symptoms=symptoms, age=age,
        sex=sex, clinical_context=clinical_context
    )

    _evict_registry()
    # Placeholder entry registered before the run starts.
    pipeline_registry[report_id] = PipelineState()

    # Per-request orchestrator, as the streaming endpoint uses, so concurrent
    # runs in worker threads never share one orchestrator instance.
    orchestrator = PipelineOrchestrator(
        intake_agent=app.state.intake_agent,
        vision_agent=app.state.vision_agent,
        research_agent=app.state.research_agent,
        report_agent=app.state.report_agent,
        critic_agent=app.state.critic_agent,
        on_status_update=lambda _state: None,
    )

    try:
        # orchestrator.run() is synchronous and blocks for the whole multi-agent
        # run; executing it in a worker thread keeps the event loop responsive.
        loop = asyncio.get_running_loop()
        state = await loop.run_in_executor(None, orchestrator.run, patient_input)
        pipeline_registry[report_id] = state
        if not state.final_report:
            raise HTTPException(status_code=500, detail="Pipeline completed without final report.")
        report_dict = state.final_report.model_dump()
        if dicom_meta:
            report_dict["dicom_metadata"] = dicom_meta
        logger.info("[SYNC] Complete | report_id=%s", report_id)
        # The registry key is otherwise not visible to the client, which needs it
        # for /chat/{report_id} and /status/{report_id}.
        return JSONResponse(content=report_dict, headers={"X-Report-ID": report_id})
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Pipeline failure: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
@app.post("/analyze/stream")
async def analyze_stream(
    image: UploadFile = File(...),
    symptoms: str = Form(default=""),
    age: Optional[int] = Form(default=None, ge=0, le=120),
    sex: Optional[str] = Form(default=None),
    clinical_context: str = Form(default=""),
):
    """Run the agent pipeline and stream its progress as Server-Sent Events.

    Every event is one ``data: <json>`` line followed by a blank line:

    * ``{"agent": <name>, "status": <value>}`` whenever an agent's status changes;
    * ``{"type": "report", "data": <report>, "report_id": <id>}`` on success
      (the state is also cached in ``pipeline_registry`` under that id);
    * ``{"type": "error", "message": <text>}`` if the pipeline fails or yields
      no report.

    In DEMO_MODE a canned agent sequence and demo report are streamed instead.

    Raises:
        HTTPException: 400/413 for unreadable, empty or oversized uploads
            (raised before streaming starts).
    """
    logger.info("[STREAM] New streaming analysis request | file=%s", image.filename)

    try:
        b64_image, dicom_meta = await _read_and_encode_image(image)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Image processing failed: {e}") from e

    # Back-fill empty form fields from DICOM metadata, if any.
    if dicom_meta:
        if age is None and dicom_meta.get("age"):
            try:
                dicom_age = int(dicom_meta["age"])
            except (ValueError, TypeError):
                dicom_age = None
            # Apply the same 0-120 bounds the form field enforces.
            if dicom_age is not None and 0 <= dicom_age <= 120:
                age = dicom_age
        if not sex and dicom_meta.get("sex"):
            sex = dicom_meta["sex"]
        if not clinical_context and dicom_meta.get("study_description"):
            clinical_context = f"DICOM: {dicom_meta.get('study_description','')} | {dicom_meta.get('body_part','')}"

    patient_input = PatientInput(
        image_base64=b64_image, symptoms=symptoms, age=age,
        sex=sex, clinical_context=clinical_context
    )

    def _sse(payload: Dict[str, Any]) -> str:
        """Frame *payload* as one SSE ``data:`` event."""
        return f"data: {json.dumps(payload, default=str)}\n\n"

    async def event_generator() -> AsyncGenerator[str, None]:
        loop = asyncio.get_running_loop()
        queue: asyncio.Queue = asyncio.Queue()

        def _emit(item: Optional[str]) -> None:
            # Called from the worker thread. asyncio.Queue is not thread-safe, so
            # the put is scheduled on the loop's thread, which also wakes the
            # pending queue.get() below.
            loop.call_soon_threadsafe(queue.put_nowait, item)

        last_statuses: Dict[str, Any] = {}

        def _status_cb(state: PipelineState) -> None:
            # Forward only agents whose status changed since the previous callback.
            for agent_name, status in state.agent_statuses.items():
                if last_statuses.get(agent_name) != status:
                    last_statuses[agent_name] = status
                    _emit(_sse({"agent": agent_name, "status": status.value}))

        def _run_demo() -> None:
            """Stream a canned agent sequence and demo report; no inference runs."""
            import time

            for agent in ("INTAKE", "VISION", "RESEARCH", "REPORT", "CRITIC"):
                _emit(_sse({"agent": agent, "status": "RUNNING"}))
                time.sleep(1.2)
                _emit(_sse({"agent": agent, "status": "DONE"}))
            _emit(_sse({
                "type": "report",
                "report_id": "REP-DEMO0000001",
                "data": {
                    "report_id": "REP-DEMO0000001",
                    "overall_severity": "SIGNIFICANT",
                    "vision_summary": "Demo mode active — live inference runs on AMD Instinct MI300X via ROCm + vLLM.",
                    "research_summary": "This is a demonstration deployment. Real analysis requires the AMD MI300X inference backend.",
                    "agent_pipeline_status": "DEMO",
                    "sections": {
                        "clinical_history": "Demo submission — AMD Developer Hackathon 2026.",
                        "technique": "Demo mode active. No live GPU inference on this host.",
                        "findings": "This Space demonstrates the full MediAgent UI and pipeline architecture. Live multimodal analysis runs on AMD Instinct MI300X via ROCm + vLLM. See the video demo for live inference on real medical images.",
                        "impression": "1. Demo mode active — no real image analysis performed (85%)\n\nConfidence Level: N/A — Demo deployment",
                        "recommendations": "Deploy with LLM_BASE_URL pointed at a live vLLM ROCm endpoint to enable full analysis.\n\n[QUALITY ASSESSMENT]\nScore: N/A | Demo mode",
                        "disclaimer": "This analysis is AI-generated and must be reviewed by a licensed radiologist before any clinical decisions are made.",
                    },
                    "vision_findings": [
                        {"severity": "SIGNIFICANT", "confidence_score": 85, "description": "Demo mode — live analysis requires AMD MI300X backend"},
                        {"severity": "NORMAL", "confidence_score": 95, "description": "Demo mode — system operational"},
                    ],
                    "differential_diagnoses": [
                        {"condition_name": "Demo Mode Active", "match_probability": 100},
                        {"condition_name": "Requires AMD MI300X", "match_probability": 85},
                    ],
                },
            }))

        def _run_live() -> None:
            """Run the real pipeline, cache its state and emit the report (or an error)."""
            report_id = f"REP-{uuid.uuid4().hex[:12].upper()}"
            orchestrator = PipelineOrchestrator(
                intake_agent=app.state.intake_agent,
                vision_agent=app.state.vision_agent,
                research_agent=app.state.research_agent,
                report_agent=app.state.report_agent,
                critic_agent=app.state.critic_agent,
                on_status_update=_status_cb,
            )
            state = orchestrator.run(patient_input)
            _evict_registry()
            pipeline_registry[report_id] = state

            if not state.final_report:
                _emit(_sse({"type": "error", "message": "Pipeline produced no report"}))
                return

            report_dict = state.final_report.model_dump()
            if dicom_meta:
                report_dict["dicom_metadata"] = dicom_meta
            if state.vision_output:
                report_dict["vision_findings"] = [
                    {"severity": f.severity.value, "confidence_score": f.confidence_score, "description": f.description}
                    for f in state.vision_output.findings
                ]
            if state.research_output:
                report_dict["differential_diagnoses"] = [
                    {"condition_name": d.condition_name, "match_probability": d.match_probability}
                    for d in state.research_output.differential_diagnoses[:5]
                ]
            _emit(_sse({"type": "report", "data": report_dict, "report_id": report_id}))

        def _run_pipeline() -> None:
            try:
                if DEMO_MODE:
                    _run_demo()
                else:
                    _run_live()
            except Exception as e:
                logger.exception("Stream pipeline crashed: %s", e)
                _emit(_sse({"type": "error", "message": str(e)}))
            finally:
                # End-of-stream sentinel, emitted on every path so the consumer
                # loop below can never wait forever.
                _emit(None)

        # Dedicated thread per stream so long pipelines do not occupy the shared
        # default pool. shutdown(wait=False) lets the submitted job finish and
        # then releases the thread instead of leaving the executor dangling.
        executor = ThreadPoolExecutor(max_workers=1)
        loop.run_in_executor(executor, _run_pipeline)
        executor.shutdown(wait=False)

        while True:
            event = await queue.get()
            if event is None:
                break
            yield event

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )
|
|
|
|
| |
| |
| |
|
|
@app.post("/chat/{report_id}", response_model=ChatResponse)
async def clinical_chat(report_id: str, request: ChatRequest):
    """Answer a follow-up question about a completed report via the advisor agent.

    In DEMO_MODE a fixed explanatory answer is returned without any lookup.

    Raises:
        HTTPException: 404 if *report_id* is unknown or already evicted; 400 if
            the pipeline has no final report; 500 if the advisor agent fails.
    """
    if DEMO_MODE:
        return ChatResponse(
            answer="This is a demo deployment. Clinical Q&A is available when running on AMD Instinct MI300X with live inference. See the video demo for full functionality.",
            report_id=report_id
        )

    # Single lookup: the registry is also trimmed from the streaming endpoint's
    # worker thread, so an `in` check followed by indexing could raise KeyError.
    state = pipeline_registry.get(report_id)
    if state is None:
        raise HTTPException(status_code=404, detail="Report not found. Run analysis first.")
    if not state.final_report:
        raise HTTPException(status_code=400, detail="Report not yet complete.")

    # advisor_agent.answer is synchronous; run it off the event loop.
    loop = asyncio.get_running_loop()
    try:
        answer = await loop.run_in_executor(
            None,
            app.state.advisor_agent.answer,
            request.question,
            state.final_report,
        )
    except Exception as e:
        logger.exception("Clinical advisor failure | report_id=%s", report_id)
        raise HTTPException(status_code=500, detail=str(e)) from e

    return ChatResponse(answer=answer, report_id=report_id)
|
|
|
|
| |
| |
| |
|
|
@app.get("/status/{report_id}")
async def get_pipeline_status(report_id: str):
    """Return the current step, per-agent statuses and error log of a cached run.

    Raises:
        HTTPException: 404 if *report_id* is not (or no longer) in the registry.
    """
    # One .get() instead of `in` + indexing: eviction can run on a worker
    # thread between those two operations.
    state = pipeline_registry.get(report_id)
    if state is None:
        raise HTTPException(status_code=404, detail="Report ID not found.")
    return {
        "report_id": report_id,
        "current_step": state.current_step,
        "agent_statuses": {name: status.value for name, status in state.agent_statuses.items()},
        "error_log": state.error_log,
        "completed": state.current_step == "COMPLETE",
    }
|
|
|
|
| |
| |
| |
# Serve files from the ./static directory (relative to the working directory)
# under the /static URL prefix.
_static_assets = StaticFiles(directory="static")
app.mount("/static", _static_assets, name="static")
|
|
if __name__ == "__main__":
    # Port defaults to 8090; MEDIAGENT_PORT overrides it without a code change.
    port = int(os.getenv("MEDIAGENT_PORT", "8090"))
    logger.info("Starting MediAgent v2.0 on port %d", port)
    # Pass the app object rather than the "main:app" import string. With
    # reload=False uvicorn does not need to re-import this module, so this works
    # under any filename and module-level setup does not run a second time.
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info", reload=False)