""" NPH Diagnostic Platform v3.0 Unified Gradio app combining intensity segmentation, YOLO detection, dual-engine comparison, ensemble scoring, clinical calculator, multi-slice batch analysis, quality assessment, and report generation. Author: Matheus Rech, MD """ import gradio as gr import numpy as np from PIL import Image, ImageFilter, ImageEnhance, ImageOps, ImageDraw from transformers import pipeline import cv2 import tempfile import os import threading import logging import time import json from datetime import datetime from segment_neuroimaging import ( segment_nph, segment_ventricles, compute_evans_index, compute_callosal_angle, compute_temporal_horn_width, compute_third_ventricle_width, score_pvh, assess_desh, create_overlay, add_annotations, create_comparison, preprocess_image, create_roi_mask, morphological_cleanup, filter_by_area, Modality, VENTRICLE_THRESHOLDS, CSF_MODE, CSFAppearance, COLORS ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ---- ML models (lazy-loaded) ---- _classifier = None _detector = None _segmenter = None def get_classifier(): global _classifier if _classifier is None: _classifier = pipeline("image-classification", model="google/vit-base-patch16-224") return _classifier def get_detector(): global _detector if _detector is None: _detector = pipeline("object-detection", model="facebook/detr-resnet-50") return _detector def get_segmenter(): global _segmenter if _segmenter is None: _segmenter = pipeline("image-segmentation", model="facebook/detr-resnet-50-panoptic") return _segmenter # ---- YOLO model ---- YOLO_MODEL_PATH = "best.pt" _yolo_model = None _yolo_lock = threading.Lock() YOLO_COLORS = { "ventricle": (0, 150, 255), "sylvian_fissure": (200, 100, 255), "tight_convexity": (255, 150, 100), "pvh": (255, 200, 0), "skull_inner": (200, 200, 200), } YOLO_COLOR_HEX = { "ventricle": "#0096FF", "sylvian_fissure": "#C864FF", "tight_convexity": "#FF9664", "pvh": "#FFC800", "skull_inner": "#C8C8C8", } def _get_yolo_model(): global _yolo_model if _yolo_model is None: with _yolo_lock: if _yolo_model is None and os.path.exists(YOLO_MODEL_PATH): try: from ultralytics import YOLO _yolo_model = YOLO(YOLO_MODEL_PATH) logger.info("YOLO model loaded from %s", YOLO_MODEL_PATH) except Exception as e: logger.error("Failed to load YOLO model: %s", e) return _yolo_model # =========================================================================== # Shared: NPH scoring # =========================================================================== def _compute_nph_score(data: dict) -> dict: """Weighted NPH scoring: VSR(40%) + EI(25%) + CA(20%) + DESH(10%) + Sylvian(5%).""" score = 0.0 evans = data.get("evansIndex") or 0.0 callosal = data.get("callosalAngle") desh = data.get("deshScore") or 0 sylvian = bool(data.get("sylvianDilation")) vsr = data.get("vsr") triad = data.get("triad") or [] atrophy = data.get("corticalAtrophy") or "unknown" has_vsr = vsr is not None has_callosal = callosal is not None if has_vsr: if vsr > 2.0: score += 40 * min((vsr - 2.0) / 2.0, 1) if evans > 0.3: score += 25 * min((evans - 0.3) / 0.15, 1) if has_callosal and callosal < 90: score += 20 * min((90 - callosal) / 50, 1) score += (desh / 3) * 10 if sylvian: score += 5 else: scale = 100 / 60 if evans > 0.3: score += 25 * scale * min((evans - 0.3) / 0.15, 1) if has_callosal and callosal < 90: score += 20 * scale * min((90 - callosal) / 50, 1) score += (desh / 3) * 10 * scale if sylvian: score += 5 * scale triad_count = sum(1 for v in triad if v) if triad_count == 3: score = min(score * 1.15, 100) elif triad_count == 2: score = min(score * 1.05, 100) if atrophy == "significant": score *= 0.7 elif atrophy == "moderate": score *= 0.85 score = int(round(min(score, 100))) if score >= 75: label, color = "Probable NPH", "#ef4444" rec = "Strongly consider CSF tap test and neurosurgical referral for VP shunt evaluation." elif score >= 50: label, color = "Possible NPH", "#f59e0b" rec = "CSF tap test recommended. Consider supplementary MRI for DESH confirmation." elif score >= 30: label, color = "Low Suspicion", "#3b82f6" rec = "NPH less likely. Consider alternative diagnoses. Follow-up imaging in 6 months." else: label, color = "Unlikely NPH", "#6b7280" rec = "Ventriculomegaly likely ex-vacuo or other etiology. Investigate alternative causes." return {"score": score, "label": label, "color": color, "recommendation": rec} # =========================================================================== # Shared: Image quality assessment # =========================================================================== def assess_quality(gray): """Return a quality dict: sharpness, contrast, noise, overall grade.""" laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() contrast = float(gray.std()) noise_est = 0 if gray.shape[0] > 3 and gray.shape[1] > 3: kernel = np.array([[1, -2, 1], [-2, 4, -2], [1, -2, 1]]) sigma = np.abs(cv2.filter2D(gray.astype(np.float64), -1, kernel)).sum() noise_est = sigma * np.sqrt(0.5 * np.pi) / (6 * (gray.shape[0] - 2) * (gray.shape[1] - 2)) sharpness_score = min(100, laplacian_var / 5) contrast_score = min(100, contrast * 2) noise_score = max(0, 100 - noise_est * 10) overall = (sharpness_score * 0.4 + contrast_score * 0.35 + noise_score * 0.25) if overall >= 70: grade = "Good" elif overall >= 40: grade = "Acceptable" else: grade = "Poor" return { "sharpness": round(sharpness_score, 1), "contrast": round(contrast_score, 1), "noise": round(noise_score, 1), "overall": round(overall, 1), "grade": grade, } def compute_symmetry_score(mask): """Score left-right symmetry of a binary mask (0-100).""" h, w = mask.shape[:2] mid = w // 2 left = mask[:, :mid] right = np.fliplr(mask[:, mid:mid + left.shape[1]]) if left.shape != right.shape: min_w = min(left.shape[1], right.shape[1]) left = left[:, :min_w] right = right[:, :min_w] intersection = np.logical_and(left > 0, right > 0).sum() union = np.logical_or(left > 0, right > 0).sum() if union == 0: return 0.0 return round(intersection / union * 100, 1) # =========================================================================== # Tab 1: Dual-Engine NPH Analysis (the main innovation) # =========================================================================== def _run_intensity_engine(image, modality, sensitivity, pixel_spacing): """Run the intensity-based segmentation engine.""" with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f: Image.fromarray(image).save(f.name) temp_path = f.name try: modality_map = { "Axial FLAIR": "FLAIR", "Axial T1": "T1", "Axial T2": "T2", "Coronal T2": "T2", "Axial T2 FFE": "T2", "Sagittal T1": "T1", "CT Head": "CT_HEAD", } mod_key = modality_map.get(modality, "T1") mod = Modality[mod_key] is_coronal = "Coronal" in modality img_rgb, gray, _ = preprocess_image(temp_path) h, w = gray.shape[:2] blurred = cv2.GaussianBlur(gray, (5, 5), 0) roi_mask = create_roi_mask(blurred, threshold=30) orig_thresh = dict(VENTRICLE_THRESHOLDS[mod]) sens_adj = (sensitivity - 50) / 50.0 custom_thresholds = dict(orig_thresh) if CSF_MODE[mod] == CSFAppearance.DARK: custom_thresholds["csf_high"] = max(20, min(120, int(orig_thresh["csf_high"] + sens_adj * 30))) else: custom_thresholds["csf_low"] = max(100, min(220, int(orig_thresh["csf_low"] - sens_adj * 30))) vent_mask = segment_ventricles(gray, mod, roi_mask, custom_thresholds=custom_thresholds) if pixel_spacing is None: pixel_spacing = round(180.0 / max(w, 256), 2) ei_data = compute_evans_index(vent_mask, image_width=w, pixel_spacing_mm=pixel_spacing) th_data = compute_temporal_horn_width(vent_mask, pixel_spacing) tv_data = compute_third_ventricle_width(vent_mask, pixel_spacing) desh_data = assess_desh(vent_mask, gray, roi_mask, mod, pixel_spacing) pvh_data = score_pvh(gray, vent_mask) if mod == Modality.FLAIR else None ca_data = compute_callosal_angle(vent_mask) if is_coronal else {} vent_area = int((vent_mask > 0).sum()) brain_area = int((roi_mask > 0).sum()) vb_ratio = round(vent_area / brain_area, 4) if brain_area > 0 else 0 quality = assess_quality(gray) symmetry = compute_symmetry_score(vent_mask) # Build overlay display_masks = {"ventricles": vent_mask} parenchyma = cv2.bitwise_and(roi_mask, cv2.bitwise_not(vent_mask)) display_masks["parenchyma"] = parenchyma if pvh_data and mod == Modality.FLAIR: display_masks["pvh"] = pvh_data["pvh_mask"] if "sylvian_mask" in desh_data: display_masks["sylvian_fissures"] = desh_data["sylvian_mask"] if "convexity_mask" in desh_data: display_masks["high_convexity_sulci"] = desh_data["convexity_mask"] overlay = create_overlay(img_rgb, display_masks, alpha=0.45) biomarkers = dict(ei_data) biomarkers.update(th_data) if pvh_data: biomarkers["pvh_grade"] = pvh_data["pvh_grade"] biomarkers["is_desh_positive"] = desh_data["is_desh_positive"] if ca_data.get("callosal_angle_deg") is not None: biomarkers["callosal_angle_deg"] = ca_data["callosal_angle_deg"] annotated = add_annotations(overlay, display_masks, f"{modality} -- Intensity Engine", biomarkers) # Draw Evans' index line row = ei_data.get("measurement_row", 0) if row > 0: cols = np.where(vent_mask[row, :] > 0)[0] if len(cols) > 0: minX, maxX = int(cols[0]), int(cols[-1]) cv2.line(annotated, (minX, row), (maxX, row), (255, 220, 0), 2) skull_d = ei_data.get("skull_diameter_px", w) cx = w // 2 hs = skull_d // 2 cv2.line(annotated, (cx - hs, row + 8), (cx + hs, row + 8), (200, 200, 200), 1) return { "annotated": annotated, "evans_index": ei_data.get("evans_index", 0), "frontal_horn_mm": ei_data.get("frontal_horn_width_mm"), "skull_diameter_mm": ei_data.get("skull_diameter_mm"), "temporal_horn_px": th_data.get("temporal_horn_width_px", 0), "temporal_horn_mm": th_data.get("temporal_horn_width_mm"), "third_ventricle_px": tv_data.get("third_ventricle_width_px", 0), "third_ventricle_mm": tv_data.get("third_ventricle_width_mm"), "vb_ratio": vb_ratio, "vent_area": vent_area, "brain_area": brain_area, "desh_positive": desh_data.get("is_desh_positive", False), "desh_score": desh_data.get("total_score", 0), "desh_ventriculomegaly": desh_data.get("ventriculomegaly_score", 0), "desh_sylvian": desh_data.get("sylvian_dilation_score", 0), "desh_convexity": desh_data.get("convexity_tightness_score", 0), "pvh_grade": pvh_data["pvh_grade"] if pvh_data else None, "pvh_ratio": pvh_data["pvh_ratio"] if pvh_data else None, "callosal_angle": ca_data.get("callosal_angle_deg"), "quality": quality, "symmetry": symmetry, } finally: os.unlink(temp_path) def _run_yolo_engine(image, conf_threshold=0.25): """Run the YOLO detection engine.""" model = _get_yolo_model() if model is None: return None with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f: Image.fromarray(image).save(f.name) temp_path = f.name try: results = model(temp_path, verbose=False)[0] h, w = image.shape[:2] annotated_img = image.copy() boxes = [] for box in results.boxes: conf = float(box.conf[0]) if conf < conf_threshold: continue x1, y1, x2, y2 = [int(round(v)) for v in box.xyxy[0].tolist()] cls_id = int(box.cls[0]) cls_name = model.names.get(cls_id, str(cls_id)) color = YOLO_COLORS.get(cls_name, (255, 255, 255)) boxes.append({ "class": cls_name, "x1": x1, "y1": y1, "x2": x2, "y2": y2, "confidence": round(conf, 4), }) cv2.rectangle(annotated_img, (x1, y1), (x2, y2), color, 2) label = f"{cls_name} {conf:.0%}" (lw, lh), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) cv2.rectangle(annotated_img, (x1, y1 - lh - 8), (x1 + lw + 4, y1), color, -1) cv2.putText(annotated_img, label, (x1 + 2, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA) # Derive metrics ventricle = next((b for b in boxes if b["class"] == "ventricle"), None) skull = next((b for b in boxes if b["class"] == "skull_inner"), None) if ventricle and skull: vent_w = ventricle["x2"] - ventricle["x1"] skull_w = skull["x2"] - skull["x1"] ei = round(vent_w / skull_w, 4) if skull_w > 0 else 0 elif ventricle: ei = round((ventricle["x2"] - ventricle["x1"]) / w, 4) else: ei = 0 desh_classes = {"tight_convexity", "sylvian_fissure", "pvh"} detected_desh = {b["class"] for b in boxes if b["class"] in desh_classes} sylvian = any(b["class"] == "sylvian_fissure" for b in boxes) pvh = any(b["class"] == "pvh" for b in boxes) return { "annotated": annotated_img, "boxes": boxes, "evans_index": ei, "desh_score": len(detected_desh), "sylvian_dilation": sylvian, "pvh_detected": pvh, "n_detections": len(boxes), } finally: os.unlink(temp_path) def dual_engine_analyze(image, modality, sensitivity, pixel_spacing_str, yolo_conf): """Run BOTH engines and produce comparison + ensemble score.""" if image is None: raise gr.Error("Please upload a brain MRI or CT image first.") pixel_spacing = None if pixel_spacing_str and pixel_spacing_str.strip(): try: pixel_spacing = float(pixel_spacing_str.strip()) except ValueError: pass # Run intensity engine t0 = time.time() intensity = _run_intensity_engine(image, modality, sensitivity, pixel_spacing) t_intensity = round(time.time() - t0, 2) # Run YOLO engine t0 = time.time() yolo = _run_yolo_engine(image, yolo_conf) t_yolo = round(time.time() - t0, 2) # Ensemble: average the Evans' Index from both engines ei_intensity = intensity["evans_index"] ei_yolo = yolo["evans_index"] if yolo else 0 if yolo: ei_ensemble = round((ei_intensity * 0.6 + ei_yolo * 0.4), 4) else: ei_ensemble = ei_intensity # DESH ensemble desh_intensity_score = intensity["desh_score"] desh_yolo_score = yolo["desh_score"] if yolo else 0 desh_ensemble = max(desh_intensity_score, desh_yolo_score) sylvian_ensemble = intensity["desh_sylvian"] > 0 or (yolo and yolo["sylvian_dilation"]) # Compute ensemble NPH score score_input = { "evansIndex": ei_ensemble, "callosalAngle": intensity.get("callosal_angle"), "deshScore": desh_ensemble, "sylvianDilation": sylvian_ensemble, "vsr": None, "triad": [], "corticalAtrophy": "unknown", } nph_result = _compute_nph_score(score_input) # Build report q = intensity["quality"] lines = [] lines.append("# Dual-Engine NPH Analysis Report\n") lines.append(f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')} | **Modality:** {modality}\n") lines.append("---\n## Image Quality Assessment\n") lines.append(f"| Metric | Score |") lines.append(f"|---|---|") lines.append(f"| Sharpness | {q['sharpness']}/100 |") lines.append(f"| Contrast | {q['contrast']}/100 |") lines.append(f"| Noise | {q['noise']}/100 |") lines.append(f"| **Overall** | **{q['overall']}/100 ({q['grade']})** |") lines.append(f"| Symmetry | {intensity['symmetry']}% |") lines.append("\n---\n## Engine Comparison\n") lines.append("| Metric | Intensity Engine | YOLO Engine | Ensemble |") lines.append("|---|---|---|---|") ei_i_status = "abnormal" if ei_intensity > 0.3 else "normal" ei_y_status = ("abnormal" if ei_yolo > 0.3 else "normal") if yolo else "N/A" ei_e_status = "ABNORMAL" if ei_ensemble > 0.3 else "normal" lines.append(f"| Evans' Index | {ei_intensity:.3f} ({ei_i_status}) | {ei_yolo:.3f} ({ei_y_status}) | **{ei_ensemble:.3f} ({ei_e_status})** |") lines.append(f"| DESH Score | {desh_intensity_score}/6 | {desh_yolo_score}/3 | {desh_ensemble} (max) |") desh_pos_i = "Yes" if intensity["desh_positive"] else "No" desh_pos_y = ("Yes" if yolo and yolo["desh_score"] >= 2 else "No") if yolo else "N/A" lines.append(f"| DESH Positive | {desh_pos_i} | {desh_pos_y} | -- |") if yolo: lines.append(f"| Detections | -- | {yolo['n_detections']} objects | -- |") pvh_str = f"Grade {intensity['pvh_grade']}/3" if intensity["pvh_grade"] is not None else "N/A (not FLAIR)" pvh_y_str = ("Yes" if yolo and yolo["pvh_detected"] else "No") if yolo else "N/A" lines.append(f"| PVH | {pvh_str} | {pvh_y_str} | -- |") lines.append(f"| Processing Time | {t_intensity}s | {t_yolo}s | -- |") lines.append("\n---\n## Intensity Engine Details\n") if intensity.get("frontal_horn_mm"): lines.append(f"- Frontal horn width: {intensity['frontal_horn_mm']} mm") if intensity.get("skull_diameter_mm"): lines.append(f"- Skull diameter: {intensity['skull_diameter_mm']} mm") if intensity["temporal_horn_px"] > 0: mm_str = f" ({intensity['temporal_horn_mm']} mm)" if intensity.get("temporal_horn_mm") else "" lines.append(f"- Temporal horn width: {intensity['temporal_horn_px']} px{mm_str}") if intensity["third_ventricle_px"] > 0: mm_str = f" ({intensity['third_ventricle_mm']} mm)" if intensity.get("third_ventricle_mm") else "" lines.append(f"- Third ventricle width: {intensity['third_ventricle_px']} px{mm_str}") lines.append(f"- Ventricle/Brain ratio: {intensity['vb_ratio']:.4f} ({intensity['vent_area']}/{intensity['brain_area']} px)") lines.append(f"- DESH breakdown: Vent={intensity['desh_ventriculomegaly']}/2, Sylvian={intensity['desh_sylvian']}/2, Convexity={intensity['desh_convexity']}/2") if intensity.get("callosal_angle") is not None: ca = intensity["callosal_angle"] ca_str = "Suggestive" if ca < 90 else ("Indeterminate" if ca < 120 else "Normal") lines.append(f"- Callosal angle: {ca:.1f} deg ({ca_str})") if intensity["pvh_grade"] is not None: lines.append(f"- PVH: Grade {intensity['pvh_grade']}/3 (ratio: {intensity['pvh_ratio']:.4f})") if yolo: lines.append("\n---\n## YOLO Detection Details\n") for b in yolo["boxes"]: bw = b["x2"] - b["x1"] bh = b["y2"] - b["y1"] lines.append(f"- **{b['class']}**: {b['confidence']:.1%} conf, {bw}x{bh} px at ({b['x1']},{b['y1']})") lines.append(f"\n---\n## Ensemble NPH Score: **{nph_result['score']}/100 -- {nph_result['label']}**\n") lines.append(f"*{nph_result['recommendation']}*") report = "\n".join(lines) yolo_img = yolo["annotated"] if yolo else np.zeros_like(image) return intensity["annotated"], yolo_img, report # =========================================================================== # Tab 2: Multi-Slice Batch Analysis # =========================================================================== def batch_analyze(files, modality, sensitivity): """Analyze multiple slices and aggregate results.""" if not files: raise gr.Error("Please upload one or more brain scan images.") results = [] for f in files: img = np.array(Image.open(f.name).convert("RGB")) try: r = _run_intensity_engine(img, modality, sensitivity, None) results.append({ "file": os.path.basename(f.name), "evans_index": r["evans_index"], "desh_positive": r["desh_positive"], "desh_score": r["desh_score"], "vb_ratio": r["vb_ratio"], "quality_grade": r["quality"]["grade"], "symmetry": r["symmetry"], }) except Exception as e: results.append({ "file": os.path.basename(f.name), "evans_index": 0, "desh_positive": False, "desh_score": 0, "vb_ratio": 0, "quality_grade": "Error", "symmetry": 0, "error": str(e), }) # Aggregate valid = [r for r in results if "error" not in r] if not valid: return "All slices failed to process." ei_values = [r["evans_index"] for r in valid] max_ei = max(ei_values) max_ei_slice = valid[ei_values.index(max_ei)]["file"] mean_ei = np.mean(ei_values) any_desh = any(r["desh_positive"] for r in valid) max_desh = max(r["desh_score"] for r in valid) mean_vb = np.mean([r["vb_ratio"] for r in valid]) # Score using the max Evans' Index (worst slice = most diagnostic) score_input = { "evansIndex": max_ei, "deshScore": min(max_desh, 3), "sylvianDilation": any_desh, "corticalAtrophy": "unknown", } nph_result = _compute_nph_score(score_input) lines = ["# Multi-Slice NPH Analysis\n"] lines.append(f"**Slices analyzed:** {len(valid)} / {len(results)}\n") lines.append("---\n## Per-Slice Results\n") lines.append("| Slice | Evans' Index | V/B Ratio | DESH | Quality | Symmetry |") lines.append("|---|---|---|---|---|---|") for r in results: if "error" in r: lines.append(f"| {r['file']} | ERROR | -- | -- | -- | -- |") else: ei_flag = " **" if r["evans_index"] > 0.3 else "" desh_flag = "POS" if r["desh_positive"] else f"{r['desh_score']}/6" lines.append(f"| {r['file']} | {r['evans_index']:.3f}{ei_flag} | {r['vb_ratio']:.4f} | {desh_flag} | {r['quality_grade']} | {r['symmetry']}% |") lines.append(f"\n---\n## Aggregate Summary\n") lines.append(f"- **Max Evans' Index:** {max_ei:.3f} (slice: {max_ei_slice})" + (" -- ABNORMAL" if max_ei > 0.3 else "")) lines.append(f"- **Mean Evans' Index:** {mean_ei:.3f}") lines.append(f"- **Mean V/B Ratio:** {mean_vb:.4f}") lines.append(f"- **Max DESH Score:** {max_desh}/6") lines.append(f"- **Any DESH Positive:** {'Yes' if any_desh else 'No'}") lines.append(f"\n---\n## NPH Score: **{nph_result['score']}/100 -- {nph_result['label']}**\n") lines.append(f"*Based on worst-case slice (max EI). {nph_result['recommendation']}*") return "\n".join(lines) # =========================================================================== # Tab 3: Clinical Scoring Calculator # =========================================================================== def compute_clinical_score( evans_index, callosal_angle_str, desh_score, sylvian_dilation, vsr_str, gait, cognition, urinary, cortical_atrophy ): callosal = None if callosal_angle_str and callosal_angle_str.strip(): try: callosal = float(callosal_angle_str.strip()) except ValueError: pass vsr = None if vsr_str and vsr_str.strip(): try: vsr = float(vsr_str.strip()) except ValueError: pass triad = [gait, cognition, urinary] atrophy_map = {"None/Mild": "none", "Moderate": "moderate", "Significant": "significant"} score_data = { "evansIndex": evans_index, "callosalAngle": callosal, "deshScore": int(desh_score), "sylvianDilation": sylvian_dilation, "vsr": vsr, "triad": triad, "corticalAtrophy": atrophy_map.get(cortical_atrophy, "unknown"), } result = _compute_nph_score(score_data) lines = [f"# NPH Score: {result['score']}/100", f"## {result['label']}\n", f"{result['recommendation']}\n"] lines.append("---\n### Input Summary\n") lines.append(f"- **Evans' Index:** {evans_index:.3f}" + (" (>0.3 = abnormal)" if evans_index > 0.3 else "")) if callosal is not None: lines.append(f"- **Callosal Angle:** {callosal:.1f} deg") lines.append(f"- **DESH Score:** {int(desh_score)}/3") lines.append(f"- **Sylvian Dilation:** {'Yes' if sylvian_dilation else 'No'}") if vsr is not None: lines.append(f"- **VSR:** {vsr:.2f}" + (" (>2.0 = strong NPH indicator)" if vsr > 2.0 else "")) triad_count = sum(triad) lines.append(f"- **Hakim Triad:** {triad_count}/3") lines.append(f"- **Cortical Atrophy:** {cortical_atrophy}") return "\n".join(lines) # =========================================================================== # Tab 4: Report Generator # =========================================================================== def generate_report(image, modality, sensitivity, pixel_spacing_str, patient_id, patient_age, clinical_history, gait, cognition, urinary): """Generate a structured clinical radiology report.""" if image is None: raise gr.Error("Please upload a brain scan first.") pixel_spacing = None if pixel_spacing_str and pixel_spacing_str.strip(): try: pixel_spacing = float(pixel_spacing_str.strip()) except ValueError: pass intensity = _run_intensity_engine(image, modality, sensitivity, pixel_spacing) yolo = _run_yolo_engine(image, 0.25) ei = intensity["evans_index"] ei_y = yolo["evans_index"] if yolo else None ei_ensemble = round((ei * 0.6 + ei_y * 0.4), 4) if ei_y else ei triad = [gait, cognition, urinary] triad_count = sum(triad) score_input = { "evansIndex": ei_ensemble, "callosalAngle": intensity.get("callosal_angle"), "deshScore": intensity["desh_score"], "sylvianDilation": intensity["desh_sylvian"] > 0, "triad": triad, "corticalAtrophy": "unknown", } nph_result = _compute_nph_score(score_input) lines = [] lines.append("# NEURORADIOLOGY REPORT") lines.append("## Normal Pressure Hydrocephalus Assessment\n") lines.append("---\n") lines.append(f"**Patient ID:** {patient_id or 'Anonymous'}") lines.append(f"**Age:** {patient_age or 'Not specified'}") lines.append(f"**Date:** {datetime.now().strftime('%Y-%m-%d')}") lines.append(f"**Modality:** {modality}") lines.append(f"**Clinical History:** {clinical_history or 'Not provided'}\n") lines.append("---\n## CLINICAL PRESENTATION\n") symptoms = [] if gait: symptoms.append("gait disturbance") if cognition: symptoms.append("cognitive impairment") if urinary: symptoms.append("urinary incontinence") if symptoms: lines.append(f"Patient presents with {', '.join(symptoms)} ({triad_count}/3 Hakim triad components).") else: lines.append("No specific Hakim triad symptoms reported.") lines.append("\n---\n## FINDINGS\n") lines.append("### Ventricular System") ei_word = "abnormally enlarged" if ei_ensemble > 0.3 else "within normal limits" lines.append(f"The lateral ventricles are {ei_word} with an Evans' Index of **{ei_ensemble:.3f}** (normal < 0.3).") if intensity.get("frontal_horn_mm"): lines.append(f"Frontal horn width measures {intensity['frontal_horn_mm']} mm with a biparietal skull diameter of {intensity['skull_diameter_mm']} mm.") lines.append(f"Ventricle-to-brain parenchyma ratio is {intensity['vb_ratio']:.4f}.") if intensity["temporal_horn_px"] > 0: mm_str = f" ({intensity['temporal_horn_mm']} mm)" if intensity.get("temporal_horn_mm") else "" lines.append(f"\nThe temporal horns measure {intensity['temporal_horn_px']} px{mm_str}.") if intensity["third_ventricle_px"] > 0: mm_str = f" ({intensity['third_ventricle_mm']} mm)" if intensity.get("third_ventricle_mm") else "" lines.append(f"Third ventricle width is {intensity['third_ventricle_px']} px{mm_str}.") if intensity.get("callosal_angle") is not None: ca = intensity["callosal_angle"] ca_word = "acutely narrowed, consistent with NPH" if ca < 90 else "within normal range" lines.append(f"\nThe callosal angle measures {ca:.1f} degrees ({ca_word}).") lines.append("\n### DESH Assessment") desh_word = "present" if intensity["desh_positive"] else "not fully met" lines.append(f"DESH pattern is **{desh_word}** (score: {intensity['desh_score']}/6).") lines.append(f"- Ventriculomegaly: {intensity['desh_ventriculomegaly']}/2") lines.append(f"- Sylvian fissure dilation: {intensity['desh_sylvian']}/2") lines.append(f"- High convexity tightness: {intensity['desh_convexity']}/2") if intensity["pvh_grade"] is not None: pvh_desc = {0: "absent", 1: "pencil-thin periventricular rim", 2: "smooth periventricular halo", 3: "irregular extension into deep white matter"} lines.append(f"\n### Periventricular Changes") lines.append(f"PVH Grade **{intensity['pvh_grade']}/3**: {pvh_desc.get(intensity['pvh_grade'], '')}.") lines.append(f"\n### Image Quality") q = intensity["quality"] lines.append(f"Image quality is {q['grade'].lower()} (score: {q['overall']}/100). Symmetry index: {intensity['symmetry']}%.") if yolo: lines.append(f"\n### AI Structure Detection (YOLO)") lines.append(f"{yolo['n_detections']} structures detected:") for b in yolo["boxes"]: lines.append(f"- {b['class']}: {b['confidence']:.0%} confidence") lines.append(f"\n---\n## IMPRESSION\n") lines.append(f"**NPH Assessment Score: {nph_result['score']}/100 -- {nph_result['label']}**\n") findings = [] if ei_ensemble > 0.3: findings.append(f"ventriculomegaly (EI={ei_ensemble:.3f})") if intensity["desh_positive"]: findings.append("DESH pattern") if intensity["pvh_grade"] is not None and intensity["pvh_grade"] >= 2: findings.append(f"periventricular hyperintensities (Grade {intensity['pvh_grade']})") if intensity.get("callosal_angle") is not None and intensity["callosal_angle"] < 90: findings.append(f"acute callosal angle ({intensity['callosal_angle']:.0f} deg)") if triad_count >= 2: findings.append(f"clinical Hakim triad ({triad_count}/3)") if findings: lines.append(f"Key findings: {', '.join(findings)}.") lines.append(f"\n{nph_result['recommendation']}") lines.append(f"\n---\n*This report was generated by the NPH Diagnostic Platform (v3.0). ") lines.append(f"Measurements from JPEG/PNG images are approximate. For clinical decisions, ") lines.append(f"correlate with DICOM-derived measurements and clinical examination.*") lines.append(f"\n*Matheus Rech, MD | {datetime.now().strftime('%Y-%m-%d %H:%M')}*") return intensity["annotated"], "\n".join(lines) # =========================================================================== # Tabs 5-8: Filters & ML Models # =========================================================================== def apply_filter(image, effect, intensity): if image is None: raise gr.Error("Please upload an image first.") img = Image.fromarray(image) if effect == "Grayscale": filtered = ImageOps.grayscale(img).convert("RGB") if intensity < 1.0: filtered = Image.blend(img, filtered, intensity) elif effect == "Sepia": gray = ImageOps.grayscale(img) sepia = ImageOps.colorize(gray, "#704214", "#C0A080") filtered = Image.blend(img, sepia, intensity) elif effect == "Blur": filtered = img.filter(ImageFilter.GaussianBlur(radius=max(1, int(intensity * 10)))) elif effect == "Sharpen": filtered = ImageEnhance.Sharpness(img).enhance(1 + intensity * 4) elif effect == "Edge Detect": filtered = Image.blend(img, img.filter(ImageFilter.FIND_EDGES), intensity) elif effect == "Invert": filtered = Image.blend(img, ImageOps.invert(img.convert("RGB")), intensity) elif effect == "Brightness": filtered = ImageEnhance.Brightness(img).enhance(0.5 + intensity * 1.5) elif effect == "Contrast": filtered = ImageEnhance.Contrast(img).enhance(0.5 + intensity * 2) else: filtered = img return np.array(filtered) def classify_image(image): if image is None: raise gr.Error("Please upload an image first.") return {r["label"]: r["score"] for r in get_classifier()(Image.fromarray(image))} def detect_objects(image, threshold): if image is None: raise gr.Error("Please upload an image first.") results = get_detector()(Image.fromarray(image), threshold=threshold) return (image, [((r["box"]["xmin"], r["box"]["ymin"], r["box"]["xmax"], r["box"]["ymax"]), f"{r['label']} ({r['score']:.0%})") for r in results]) def segment_image(image): if image is None: raise gr.Error("Please upload an image first.") results = get_segmenter()(Image.fromarray(image)) return (image, [(np.array(r["mask"]), r["label"]) for r in results]) # =========================================================================== # Build the UI # =========================================================================== CUSTOM_CSS = """ .main-title { text-align: center; margin-bottom: 0.2em; } .subtitle { text-align: center; color: #666; margin-top: 0; font-size: 0.9em; } .engine-label { font-weight: 700; font-size: 0.85em; text-transform: uppercase; letter-spacing: 0.05em; } footer { display: none !important; } """ with gr.Blocks(theme=gr.themes.Soft(), css=CUSTOM_CSS, title="NPH Diagnostic Platform") as demo: gr.Markdown("# NPH Diagnostic Platform", elem_classes="main-title") gr.Markdown( "Dual-engine analysis (intensity segmentation + YOLO detection), ensemble scoring, " "multi-slice batch processing, clinical calculator, and structured report generation.", elem_classes="subtitle" ) # ========== Tab 1: Dual-Engine Analysis ========== with gr.Tab("Dual-Engine Analysis"): gr.Markdown( "### Two Engines, One Diagnosis\n" "Runs **intensity-based segmentation** AND **YOLO deep learning detection** on the same image, " "compares results side-by-side, and produces an **ensemble NPH score** (weighted 60/40)." ) with gr.Row(): with gr.Column(scale=1): de_input = gr.Image(label="Upload Brain Scan", type="numpy") de_modality = gr.Dropdown( choices=["Axial FLAIR", "Axial T1", "Axial T2", "Coronal T2", "Axial T2 FFE", "Sagittal T1", "CT Head"], value="Axial FLAIR", label="Modality / Sequence" ) de_sensitivity = gr.Slider(10, 90, value=50, step=5, label="Sensitivity (%)") de_yolo_conf = gr.Slider(0.1, 0.95, value=0.25, step=0.05, label="YOLO Confidence Threshold") de_spacing = gr.Textbox(label="Pixel Spacing (mm/px)", placeholder="auto-estimate", value="") de_btn = gr.Button("Run Dual-Engine Analysis", variant="primary", size="lg") with gr.Column(scale=2): with gr.Row(): with gr.Column(): gr.Markdown("**Intensity Engine**", elem_classes="engine-label") de_intensity_out = gr.Image(label="Segmentation Overlay", type="numpy") with gr.Column(): gr.Markdown("**YOLO Engine**", elem_classes="engine-label") de_yolo_out = gr.Image(label="Detection Overlay", type="numpy") de_report = gr.Markdown(label="Dual-Engine Report") de_btn.click( fn=dual_engine_analyze, inputs=[de_input, de_modality, de_sensitivity, de_spacing, de_yolo_conf], outputs=[de_intensity_out, de_yolo_out, de_report] ) with gr.Accordion("How Ensemble Scoring Works", open=False): gr.Markdown( "The ensemble combines both engines:\n\n" "- **Evans' Index**: Weighted average (60% intensity + 40% YOLO)\n" "- **DESH Pattern**: Takes the maximum score from either engine\n" "- **Sylvian Dilation**: Positive if either engine detects it\n\n" "This approach is more robust than either engine alone -- intensity segmentation " "is better at precise boundary delineation, while YOLO is better at detecting " "spatial patterns and multiple structures simultaneously." ) # ========== Tab 2: Multi-Slice Batch ========== with gr.Tab("Multi-Slice Batch"): gr.Markdown( "### Batch Analysis Across Multiple Slices\n" "Upload multiple axial slices from the same patient. Each slice is analyzed individually, " "then results are aggregated. The **worst-case slice** (highest Evans' Index) drives the NPH score." ) with gr.Row(): with gr.Column(): batch_files = gr.File( label="Upload Multiple Slices", file_count="multiple", file_types=["image"], ) batch_modality = gr.Dropdown( choices=["Axial FLAIR", "Axial T1", "Axial T2", "CT Head"], value="Axial FLAIR", label="Modality" ) batch_sensitivity = gr.Slider(10, 90, value=50, step=5, label="Sensitivity (%)") batch_btn = gr.Button("Analyze All Slices", variant="primary", size="lg") batch_report = gr.Markdown(label="Batch Report") batch_btn.click(fn=batch_analyze, inputs=[batch_files, batch_modality, batch_sensitivity], outputs=batch_report) # ========== Tab 3: NPH Score Calculator ========== with gr.Tab("NPH Score Calculator"): gr.Markdown( "### Clinical NPH Scoring Calculator\n" "Enter imaging biomarkers and clinical findings to compute a weighted NPH probability score." ) with gr.Row(): with gr.Column(): gr.Markdown("#### Imaging Biomarkers") calc_evans = gr.Slider(0.0, 0.6, value=0.30, step=0.01, label="Evans' Index") calc_callosal = gr.Textbox(label="Callosal Angle (degrees)", placeholder="e.g. 85", value="") calc_desh = gr.Slider(0, 3, value=0, step=1, label="DESH Score (0-3)") calc_sylvian = gr.Checkbox(label="Sylvian Fissure Dilation", value=False) calc_vsr = gr.Textbox(label="VSR", placeholder="e.g. 2.5", value="") with gr.Column(): gr.Markdown("#### Clinical Findings (Hakim Triad)") calc_gait = gr.Checkbox(label="Gait disturbance", value=False) calc_cognition = gr.Checkbox(label="Cognitive impairment", value=False) calc_urinary = gr.Checkbox(label="Urinary incontinence", value=False) gr.Markdown("#### Modifiers") calc_atrophy = gr.Radio(["None/Mild", "Moderate", "Significant"], value="None/Mild", label="Cortical Atrophy") calc_btn = gr.Button("Calculate NPH Score", variant="primary", size="lg") calc_report = gr.Markdown(label="Score Report") calc_btn.click( fn=compute_clinical_score, inputs=[calc_evans, calc_callosal, calc_desh, calc_sylvian, calc_vsr, calc_gait, calc_cognition, calc_urinary, calc_atrophy], outputs=calc_report ) # ========== Tab 4: Report Generator ========== with gr.Tab("Report Generator"): gr.Markdown( "### Structured Clinical Report\n" "Generates a formal neuroradiology-style NPH assessment report combining imaging analysis " "with clinical findings." ) with gr.Row(): with gr.Column(scale=1): rpt_input = gr.Image(label="Upload Brain Scan", type="numpy") rpt_modality = gr.Dropdown( choices=["Axial FLAIR", "Axial T1", "Axial T2", "Coronal T2", "CT Head"], value="Axial FLAIR", label="Modality" ) rpt_sensitivity = gr.Slider(10, 90, value=50, step=5, label="Sensitivity (%)") rpt_spacing = gr.Textbox(label="Pixel Spacing (mm/px)", placeholder="auto-estimate", value="") gr.Markdown("#### Patient Info") rpt_id = gr.Textbox(label="Patient ID", placeholder="Anonymous") rpt_age = gr.Textbox(label="Age", placeholder="e.g. 72") rpt_history = gr.Textbox(label="Clinical History", lines=2, placeholder="e.g. Progressive gait instability...") gr.Markdown("#### Hakim Triad") rpt_gait = gr.Checkbox(label="Gait disturbance", value=False) rpt_cognition = gr.Checkbox(label="Cognitive impairment", value=False) rpt_urinary = gr.Checkbox(label="Urinary incontinence", value=False) rpt_btn = gr.Button("Generate Report", variant="primary", size="lg") with gr.Column(scale=2): rpt_overlay = gr.Image(label="Segmentation", type="numpy") rpt_text = gr.Markdown(label="Clinical Report") rpt_btn.click( fn=generate_report, inputs=[rpt_input, rpt_modality, rpt_sensitivity, rpt_spacing, rpt_id, rpt_age, rpt_history, rpt_gait, rpt_cognition, rpt_urinary], outputs=[rpt_overlay, rpt_text] ) # ========== Tab 5: Browser NPH Detector ========== with gr.Tab("NPH Detector (Browser)"): gr.Markdown( "### Client-Side NPH Pipeline\n" "Runs entirely in your browser via JavaScript Canvas API. Zero server dependency." ) gr.HTML( value='', ) # ========== Tab 6: Video Demo ========== with gr.Tab("Video Demo"): gr.Markdown("### Whole-Brain Segmentation Demo") gr.Video(value="examples/hydromorph_whole_brain_segmentation.mp4", label="NPH Segmentation Video", autoplay=False) # ========== Tab 7: Filters ========== with gr.Tab("Filters & Effects"): with gr.Row(): with gr.Column(): filter_input = gr.Image(label="Upload Image", type="numpy") filter_effect = gr.Dropdown( choices=["Grayscale", "Sepia", "Blur", "Sharpen", "Edge Detect", "Invert", "Brightness", "Contrast"], value="Sepia", label="Effect" ) filter_intensity = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="Intensity") filter_btn = gr.Button("Apply Filter", variant="primary") with gr.Column(): filter_output = gr.Image(label="Result", type="numpy") filter_btn.click(fn=apply_filter, inputs=[filter_input, filter_effect, filter_intensity], outputs=filter_output) # ========== Tab 8: Classification ========== with gr.Tab("Image Classification"): with gr.Row(): with gr.Column(): cls_input = gr.Image(label="Upload Image", type="numpy") cls_btn = gr.Button("Classify", variant="primary") with gr.Column(): cls_output = gr.Label(label="Predictions", num_top_classes=5) cls_btn.click(fn=classify_image, inputs=cls_input, outputs=cls_output) # ========== Tab 9: Object Detection ========== with gr.Tab("Object Detection"): with gr.Row(): with gr.Column(): det_input = gr.Image(label="Upload Image", type="numpy") det_threshold = gr.Slider(0.1, 0.95, value=0.5, step=0.05, label="Confidence Threshold") det_btn = gr.Button("Detect Objects", variant="primary") with gr.Column(): det_output = gr.AnnotatedImage(label="Detections") det_btn.click(fn=detect_objects, inputs=[det_input, det_threshold], outputs=det_output) # ========== Tab 10: Segmentation ========== with gr.Tab("Segmentation"): with gr.Row(): with gr.Column(): seg_input = gr.Image(label="Upload Image", type="numpy") seg_btn = gr.Button("Segment", variant="primary") with gr.Column(): seg_output = gr.AnnotatedImage(label="Segmentation Map") seg_btn.click(fn=segment_image, inputs=seg_input, outputs=seg_output) gr.Markdown( "