import asyncio
import base64
import io
import os
from typing import Optional, Union

import httpx
import numpy as np
import pydicom
from dotenv import load_dotenv
from PIL import Image
|
|
| load_dotenv() |
|
|
# Credentials are read from the process environment.
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
Z_AI_API_KEY = os.getenv("Z_AI_API_KEY")
ROBOFLOW_API_KEY = os.getenv("ROBOFLOW_API_KEY")

# Only attach an Authorization header when a token is actually configured;
# formatting a missing token would otherwise send the literal "Bearer None".
HEADERS_HF = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
HEADERS_Z_AI = {"Content-Type": "application/json"}
if Z_AI_API_KEY:
    HEADERS_Z_AI["Authorization"] = f"Bearer {Z_AI_API_KEY}"
|
|
| |
# Registry mapping each logical model key (the `model_type` accepted by
# run_radiology_agent) to the identifier expected by its backend.
MODELS = {
    # Roboflow "project/version" identifier, passed to call_roboflow_api.
    "fracture": "bone-fracture-vqdiz/1",
    # Model identifiers passed to call_hf_api.
    "fracture_vlm": "AIRI-Institute/chexfract-maira2",
    "mammo": "ianpan/mammoscreen",
    "breast_tissue": "nateraw/breast-cancer-classification",
    # Model name used for the Z.AI chat-completions request.
    "glm": "glm-4.6v",
}
|
|
async def call_hf_api(model_id, payload):
    """Send an inference request to the Hugging Face Inference API.

    When ``payload["inputs"]`` is a string containing valid base64, it is
    decoded and posted as the raw request body. Any other payload is posted
    as JSON.

    Args:
        model_id: Model identifier appended to the inference endpoint URL.
        payload: Request payload, normally a dict with an ``"inputs"`` key.

    Returns:
        The decoded JSON response. A 503 status yields
        ``{"error": "Model is loading", "details": ...}``. A response body
        that is not valid JSON yields an error dict carrying the HTTP status
        and the raw response text instead of raising.
    """
    api_url = f"https://api-inference.huggingface.co/models/{model_id}"

    # Decide the body format BEFORE doing any network I/O, so a transport
    # error or task cancellation is never mistaken for "not base64" and
    # never triggers a second request.
    raw_body = None
    inputs = payload.get("inputs") if isinstance(payload, dict) else None
    if isinstance(inputs, str):
        try:
            # validate=True rejects strings with non-alphabet characters
            # instead of silently discarding them.
            raw_body = base64.b64decode(inputs, validate=True)
        except ValueError:
            # binascii.Error subclasses ValueError: plain text, send as JSON.
            raw_body = None

    async with httpx.AsyncClient(timeout=60.0) as client:
        if raw_body is not None:
            response = await client.post(api_url, headers=HEADERS_HF, content=raw_body)
        else:
            response = await client.post(api_url, headers=HEADERS_HF, json=payload)

    # Error pages are not guaranteed to be JSON; parse defensively.
    try:
        body = response.json()
        is_json = True
    except ValueError:
        body = response.text
        is_json = False

    if response.status_code == 503:
        return {"error": "Model is loading", "details": body}

    if not is_json:
        return {
            "error": "Invalid (non-JSON) response from Hugging Face",
            "status": response.status_code,
            "details": body,
        }

    return body
|
|
async def call_roboflow_api(model_id, image_bytes):
    """Call the LOCAL Roboflow inference server (Docker) on localhost:9001.

    Configuration follows the final RAG version of the project.

    Args:
        model_id: Roboflow ``"project/version"`` identifier. Falls back to
            ``"bone-fracture-vqdiz/1"`` when empty or ``None``.
        image_bytes: Image content, uploaded as a multipart ``file`` field
            declared as ``image/jpeg``.

    Returns:
        The server's JSON response on HTTP 200. Otherwise a dict with an
        ``"error"`` key: missing API key, non-200 status (with ``"status"``),
        or any exception raised while contacting the server.
    """
    # The key must come from the environment: credentials do not belong in
    # source code, so there is deliberately no hard-coded fallback.
    api_key = os.getenv("ROBOFLOW_API_KEY")
    if not api_key:
        return {
            "error": "Clé API Roboflow manquante.",
            "message": "Définissez la variable d'environnement ROBOFLOW_API_KEY.",
        }

    model_path = (model_id or "bone-fracture-vqdiz/1").strip("/")
    api_url = f"http://localhost:9001/{model_path}"

    files = {"file": ("image.jpg", image_bytes, "image/jpeg")}

    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            # The key travels as a query parameter, kept out of `api_url`
            # so it is never written to the debug log below.
            response = await client.post(
                api_url, params={"api_key": api_key}, files=files
            )

            print(f"DEBUG Roboflow Local: URL={api_url} Status={response.status_code}")

            if response.status_code != 200:
                print(f"DEBUG Roboflow Error Detail: {response.text}")
                return {"error": response.text, "status": response.status_code}

            return response.json()
        except Exception as e:
            print(f"DEBUG Roboflow Local Exception: {str(e)}")
            return {
                "error": "Serveur d'inférence local introuvable.",
                "message": "Assurez-vous que le conteneur Docker tourne (inference server start).",
                "details": str(e),
            }
|
|
async def call_z_ai_api(payload):
    """POST a chat-completions payload to the Z.AI API.

    Args:
        payload: JSON-serialisable request body (model name and messages).

    Returns:
        The decoded JSON response. If the body is not valid JSON, an error
        dict with the HTTP status and raw response text is returned instead
        of raising.
    """
    api_url = "https://api.z.ai/api/paas/v4/chat/completions"

    async with httpx.AsyncClient(timeout=90.0) as client:
        response = await client.post(api_url, headers=HEADERS_Z_AI, json=payload)

    try:
        return response.json()
    except ValueError:
        # Error pages are not guaranteed to be JSON.
        return {
            "error": "Invalid (non-JSON) response from Z.AI",
            "status": response.status_code,
            "details": response.text,
        }
|
|
def convert_dicom_to_jpg(dicom_bytes: bytes) -> Optional[bytes]:
    """Convert a DICOM file into JPEG bytes usable by the AI models.

    The pixel data is rescaled with RescaleSlope/RescaleIntercept when both
    are present, min-max normalised to 0-255, inverted for MONOCHROME1
    images, converted to RGB and encoded as JPEG (quality 95).

    Args:
        dicom_bytes: Raw content of the DICOM file.

    Returns:
        The JPEG-encoded image, or ``None`` if reading or converting fails
        (the error is printed).
    """
    try:
        ds = pydicom.dcmread(io.BytesIO(dicom_bytes))

        # Always work in floating point: doing the min-max arithmetic on
        # the stored integer dtype can overflow for signed pixel data.
        pixels = ds.pixel_array.astype(np.float32)

        if hasattr(ds, "RescaleIntercept") and hasattr(ds, "RescaleSlope"):
            # float() keeps the arithmetic valid whatever numeric type
            # pydicom used to represent these DS values.
            pixels = pixels * float(ds.RescaleSlope) + float(ds.RescaleIntercept)

        p_min, p_max = np.min(pixels), np.max(pixels)
        if p_max > p_min:
            scaled = (pixels - p_min) / (p_max - p_min) * 255
        else:
            # Constant image: no contrast to stretch. Emit black rather than
            # casting arbitrary out-of-range values to uint8.
            scaled = np.zeros_like(pixels)

        # MONOCHROME1 stores the minimum value as white; invert so the JPEG
        # uses the conventional "minimum is black" display.
        if getattr(ds, "PhotometricInterpretation", "") == "MONOCHROME1":
            scaled = 255 - scaled

        img = Image.fromarray(scaled.astype(np.uint8))
        if img.mode != "RGB":
            img = img.convert("RGB")

        buf = io.BytesIO()
        img.save(buf, format="JPEG", quality=95)
        return buf.getvalue()
    except Exception as e:
        print(f"Erreur conversion DICOM: {e}")
        return None
|
|
async def run_radiology_agent(
    image_bytes: bytes, model_type: str, question: Optional[str] = None
) -> Union[dict, str]:
    """Run one radiology model on an image and return its output.

    DICOM input (detected by the ``DICM`` marker at byte offset 128) is
    converted to JPEG first. The request is then routed by ``model_type``:

    * ``"glm"``: Z.AI chat completion with the image and a text prompt.
    * ``"fracture_vlm"``: Hugging Face model receiving image and text.
    * ``"fracture"``: local Roboflow inference server.
    * any other key of ``MODELS``: Hugging Face model receiving the image.

    Args:
        image_bytes: Raw image content (JPEG-compatible image or DICOM file).
        model_type: Key of ``MODELS`` selecting the backend.
        question: Optional prompt for the text-capable models; a default
            prompt is used when omitted or empty.

    Returns:
        For ``"glm"``, the text content of the first choice (``str``) when
        the response contains one, otherwise the raw response. For the other
        models, the backend's response dict. Invalid input (empty image,
        unknown model type, unreadable DICOM) yields a dict with ``"error"``.
    """
    if not image_bytes:
        return {"error": "Image vide ou manquante."}

    # Reject unknown keys up front instead of querying ".../models/None".
    if model_type not in MODELS:
        return {
            "error": f"Type de modèle inconnu : {model_type}",
            "available": sorted(MODELS),
        }
    model_id = MODELS[model_type]

    # Slicing is safe on short inputs, so no separate length check is needed.
    if image_bytes[128:132] == b"DICM":
        converted = convert_dicom_to_jpg(image_bytes)
        if not converted:
            # Never forward raw DICOM bytes labelled as JPEG to a model.
            return {"error": "Conversion DICOM impossible."}
        image_bytes = converted
        print("Image DICOM convertie avec succès.")

    # Roboflow takes the raw bytes; skip the base64 encoding it never uses.
    if model_type == "fracture":
        return await call_roboflow_api(model_id, image_bytes)

    img_str = base64.b64encode(image_bytes).decode("utf-8")

    if model_type == "glm":
        prompt = question if question else "Décrivez cette imagerie et les anomalies potentielles."
        payload = {
            "model": model_id,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": f"En tant qu'expert radiologue : {prompt}",
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/jpeg;base64,{img_str}"},
                        },
                    ],
                }
            ],
        }
        result = await call_z_ai_api(payload)
        # Guard against a missing or empty "choices" list (API error bodies).
        choices = result.get("choices") if isinstance(result, dict) else None
        if choices:
            return choices[0]["message"]["content"]
        return result

    if model_type == "fracture_vlm":
        payload = {
            "inputs": {
                "image": img_str,
                "text": question if question else "Describe any fractures in this X-ray and propose a BIRADS/Classification.",
            }
        }
        return await call_hf_api(model_id, payload)

    # Remaining models take the base64 image alone.
    return await call_hf_api(model_id, {"inputs": img_str})
|
|