# Uploaded via huggingface_hub by mmrech (commit c85f5af, verified)
"""
NPH Diagnostic Platform v3.0
Unified Gradio app combining intensity segmentation, YOLO detection,
dual-engine comparison, ensemble scoring, clinical calculator,
multi-slice batch analysis, quality assessment, and report generation.
Author: Matheus Rech, MD
"""
import gradio as gr
import numpy as np
from PIL import Image, ImageFilter, ImageEnhance, ImageOps, ImageDraw
from transformers import pipeline
import cv2
import tempfile
import os
import threading
import logging
import time
import json
from datetime import datetime
from segment_neuroimaging import (
segment_nph, segment_ventricles, compute_evans_index,
compute_callosal_angle, compute_temporal_horn_width,
compute_third_ventricle_width, score_pvh, assess_desh,
create_overlay, add_annotations, create_comparison,
preprocess_image, create_roi_mask, morphological_cleanup,
filter_by_area, Modality, VENTRICLE_THRESHOLDS, CSF_MODE,
CSFAppearance, COLORS
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- ML models (lazy-loaded) ----
# Each pipeline is created on first use and cached for the process lifetime.
_classifier = None  # transformers image-classification pipeline (see get_classifier)
_detector = None    # transformers object-detection pipeline (see get_detector)
_segmenter = None   # transformers image-segmentation pipeline (see get_segmenter)
def get_classifier():
    """Return the cached ViT image-classification pipeline, creating it on first use."""
    global _classifier
    if _classifier is not None:
        return _classifier
    _classifier = pipeline("image-classification", model="google/vit-base-patch16-224")
    return _classifier
def get_detector():
    """Return the cached DETR object-detection pipeline, creating it on first use."""
    global _detector
    if _detector is not None:
        return _detector
    _detector = pipeline("object-detection", model="facebook/detr-resnet-50")
    return _detector
def get_segmenter():
    """Return the cached DETR panoptic-segmentation pipeline, creating it on first use."""
    global _segmenter
    if _segmenter is not None:
        return _segmenter
    _segmenter = pipeline("image-segmentation", model="facebook/detr-resnet-50-panoptic")
    return _segmenter
# ---- YOLO model ----
YOLO_MODEL_PATH = "best.pt"     # fine-tuned ultralytics weights, loaded lazily
_yolo_model = None              # cached YOLO instance; stays None if weights are missing
_yolo_lock = threading.Lock()   # guards one-time model initialization
# Per-class colors used when drawing detection boxes onto the (RGB) numpy
# image; the hex table below mirrors these tuples in RGB order for
# HTML/markdown output.
YOLO_COLORS = {
    "ventricle": (0, 150, 255),
    "sylvian_fissure": (200, 100, 255),
    "tight_convexity": (255, 150, 100),
    "pvh": (255, 200, 0),
    "skull_inner": (200, 200, 200),
}
YOLO_COLOR_HEX = {
    "ventricle": "#0096FF",
    "sylvian_fissure": "#C864FF",
    "tight_convexity": "#FF9664",
    "pvh": "#FFC800",
    "skull_inner": "#C8C8C8",
}
def _get_yolo_model():
    """Lazily load and cache the YOLO model (thread-safe via double-checked locking).

    Returns:
        The cached ultralytics YOLO instance, or None when the weights file
        does not exist or loading failed — callers treat None as "YOLO
        engine unavailable" and fall back to the intensity engine only.
    """
    global _yolo_model
    if _yolo_model is None:
        with _yolo_lock:
            # Re-check under the lock so only one thread performs the load.
            if _yolo_model is None and os.path.exists(YOLO_MODEL_PATH):
                try:
                    from ultralytics import YOLO
                    _yolo_model = YOLO(YOLO_MODEL_PATH)
                    logger.info("YOLO model loaded from %s", YOLO_MODEL_PATH)
                except Exception as e:
                    # Best-effort: log and leave the model as None.
                    logger.error("Failed to load YOLO model: %s", e)
    return _yolo_model
# ===========================================================================
# Shared: NPH scoring
# ===========================================================================
def _compute_nph_score(data: dict) -> dict:
"""Weighted NPH scoring: VSR(40%) + EI(25%) + CA(20%) + DESH(10%) + Sylvian(5%)."""
score = 0.0
evans = data.get("evansIndex") or 0.0
callosal = data.get("callosalAngle")
desh = data.get("deshScore") or 0
sylvian = bool(data.get("sylvianDilation"))
vsr = data.get("vsr")
triad = data.get("triad") or []
atrophy = data.get("corticalAtrophy") or "unknown"
has_vsr = vsr is not None
has_callosal = callosal is not None
if has_vsr:
if vsr > 2.0:
score += 40 * min((vsr - 2.0) / 2.0, 1)
if evans > 0.3:
score += 25 * min((evans - 0.3) / 0.15, 1)
if has_callosal and callosal < 90:
score += 20 * min((90 - callosal) / 50, 1)
score += (desh / 3) * 10
if sylvian:
score += 5
else:
scale = 100 / 60
if evans > 0.3:
score += 25 * scale * min((evans - 0.3) / 0.15, 1)
if has_callosal and callosal < 90:
score += 20 * scale * min((90 - callosal) / 50, 1)
score += (desh / 3) * 10 * scale
if sylvian:
score += 5 * scale
triad_count = sum(1 for v in triad if v)
if triad_count == 3:
score = min(score * 1.15, 100)
elif triad_count == 2:
score = min(score * 1.05, 100)
if atrophy == "significant":
score *= 0.7
elif atrophy == "moderate":
score *= 0.85
score = int(round(min(score, 100)))
if score >= 75:
label, color = "Probable NPH", "#ef4444"
rec = "Strongly consider CSF tap test and neurosurgical referral for VP shunt evaluation."
elif score >= 50:
label, color = "Possible NPH", "#f59e0b"
rec = "CSF tap test recommended. Consider supplementary MRI for DESH confirmation."
elif score >= 30:
label, color = "Low Suspicion", "#3b82f6"
rec = "NPH less likely. Consider alternative diagnoses. Follow-up imaging in 6 months."
else:
label, color = "Unlikely NPH", "#6b7280"
rec = "Ventriculomegaly likely ex-vacuo or other etiology. Investigate alternative causes."
return {"score": score, "label": label, "color": color, "recommendation": rec}
# ===========================================================================
# Shared: Image quality assessment
# ===========================================================================
def assess_quality(gray):
    """Return a quality dict: sharpness, contrast, noise, overall grade.

    Args:
        gray: single-channel grayscale image as a numpy array.

    Returns:
        dict with 0-100 sub-scores ("sharpness", "contrast", "noise"),
        a weighted "overall" score, and a "grade" of Good/Acceptable/Poor.
    """
    # Sharpness proxy: variance of the Laplacian (higher = crisper edges).
    laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
    # Contrast proxy: global standard deviation of pixel intensities.
    contrast = float(gray.std())
    noise_est = 0
    if gray.shape[0] > 3 and gray.shape[1] > 3:
        # Fast noise estimate via a 3x3 difference kernel (Immerkaer-style:
        # sum of |response|, scaled by sqrt(pi/2) and normalized by the
        # number of interior pixels).
        kernel = np.array([[1, -2, 1], [-2, 4, -2], [1, -2, 1]])
        sigma = np.abs(cv2.filter2D(gray.astype(np.float64), -1, kernel)).sum()
        noise_est = sigma * np.sqrt(0.5 * np.pi) / (6 * (gray.shape[0] - 2) * (gray.shape[1] - 2))
    # Map raw measures onto 0-100 scales (empirically chosen divisors).
    sharpness_score = min(100, laplacian_var / 5)
    contrast_score = min(100, contrast * 2)
    noise_score = max(0, 100 - noise_est * 10)
    # Weighted blend: sharpness 40%, contrast 35%, noise 25%.
    overall = (sharpness_score * 0.4 + contrast_score * 0.35 + noise_score * 0.25)
    if overall >= 70:
        grade = "Good"
    elif overall >= 40:
        grade = "Acceptable"
    else:
        grade = "Poor"
    return {
        "sharpness": round(sharpness_score, 1),
        "contrast": round(contrast_score, 1),
        "noise": round(noise_score, 1),
        "overall": round(overall, 1),
        "grade": grade,
    }
def compute_symmetry_score(mask):
    """Return a 0-100 left/right symmetry score for a binary mask.

    The mask is split at the vertical midline, the right half is mirrored
    onto the left, and the two are compared with an intersection-over-union
    (Jaccard) measure expressed as a percentage rounded to one decimal.
    A mask with no foreground pixels in either half scores 0.0.
    """
    half = mask.shape[1] // 2
    lhs = mask[:, :half] > 0
    rhs = np.fliplr(mask[:, half:2 * half]) > 0
    # Defensive trim should the two halves ever disagree in width.
    if lhs.shape != rhs.shape:
        narrow = min(lhs.shape[1], rhs.shape[1])
        lhs, rhs = lhs[:, :narrow], rhs[:, :narrow]
    overlap = int(np.count_nonzero(lhs & rhs))
    combined = int(np.count_nonzero(lhs | rhs))
    if combined == 0:
        return 0.0
    return round(overlap / combined * 100, 1)
# ===========================================================================
# Tab 1: Dual-Engine NPH Analysis (the main innovation)
# ===========================================================================
def _run_intensity_engine(image, modality, sensitivity, pixel_spacing):
    """Run the intensity-based segmentation engine.

    Args:
        image: RGB numpy array of the uploaded slice.
        modality: UI modality label (e.g. "Axial FLAIR"); mapped to a
            segment_neuroimaging Modality enum member below.
        sensitivity: 0-100 slider value; 50 keeps the default CSF thresholds,
            other values shift them by up to +/-30 gray levels.
        pixel_spacing: mm-per-pixel, or None to auto-estimate from a nominal
            180 mm head width.

    Returns:
        dict with the annotated overlay image, Evans' Index and related
        biomarkers, DESH breakdown (0-6 total), PVH grade (FLAIR only),
        callosal angle (coronal only), and quality/symmetry metrics.
    """
    # The segmentation helpers expect a file path, so round-trip the array
    # through a temporary PNG (removed in the finally block).
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
        Image.fromarray(image).save(f.name)
        temp_path = f.name
    try:
        # UI label -> Modality enum name; unknown labels fall back to T1.
        modality_map = {
            "Axial FLAIR": "FLAIR", "Axial T1": "T1", "Axial T2": "T2",
            "Coronal T2": "T2", "Axial T2 FFE": "T2", "Sagittal T1": "T1",
            "CT Head": "CT_HEAD",
        }
        mod_key = modality_map.get(modality, "T1")
        mod = Modality[mod_key]
        # Callosal angle is only measured on coronal views.
        is_coronal = "Coronal" in modality
        img_rgb, gray, _ = preprocess_image(temp_path)
        h, w = gray.shape[:2]
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        roi_mask = create_roi_mask(blurred, threshold=30)
        # Shift the CSF threshold by the sensitivity slider (+/-30 gray
        # levels at the extremes); the direction depends on whether CSF is
        # dark (T1/CT) or bright (T2/FLAIR) for this modality.
        orig_thresh = dict(VENTRICLE_THRESHOLDS[mod])
        sens_adj = (sensitivity - 50) / 50.0
        custom_thresholds = dict(orig_thresh)
        if CSF_MODE[mod] == CSFAppearance.DARK:
            custom_thresholds["csf_high"] = max(20, min(120, int(orig_thresh["csf_high"] + sens_adj * 30)))
        else:
            custom_thresholds["csf_low"] = max(100, min(220, int(orig_thresh["csf_low"] - sens_adj * 30)))
        vent_mask = segment_ventricles(gray, mod, roi_mask, custom_thresholds=custom_thresholds)
        if pixel_spacing is None:
            # Rough fallback: assume ~180 mm of anatomy spans the image width.
            pixel_spacing = round(180.0 / max(w, 256), 2)
        ei_data = compute_evans_index(vent_mask, image_width=w, pixel_spacing_mm=pixel_spacing)
        th_data = compute_temporal_horn_width(vent_mask, pixel_spacing)
        tv_data = compute_third_ventricle_width(vent_mask, pixel_spacing)
        desh_data = assess_desh(vent_mask, gray, roi_mask, mod, pixel_spacing)
        # PVH grading is only meaningful on FLAIR sequences.
        pvh_data = score_pvh(gray, vent_mask) if mod == Modality.FLAIR else None
        ca_data = compute_callosal_angle(vent_mask) if is_coronal else {}
        vent_area = int((vent_mask > 0).sum())
        brain_area = int((roi_mask > 0).sum())
        vb_ratio = round(vent_area / brain_area, 4) if brain_area > 0 else 0
        quality = assess_quality(gray)
        symmetry = compute_symmetry_score(vent_mask)
        # Build overlay
        display_masks = {"ventricles": vent_mask}
        # Parenchyma = brain ROI minus the segmented ventricles.
        parenchyma = cv2.bitwise_and(roi_mask, cv2.bitwise_not(vent_mask))
        display_masks["parenchyma"] = parenchyma
        if pvh_data and mod == Modality.FLAIR:
            display_masks["pvh"] = pvh_data["pvh_mask"]
        if "sylvian_mask" in desh_data:
            display_masks["sylvian_fissures"] = desh_data["sylvian_mask"]
        if "convexity_mask" in desh_data:
            display_masks["high_convexity_sulci"] = desh_data["convexity_mask"]
        overlay = create_overlay(img_rgb, display_masks, alpha=0.45)
        # Collect the biomarkers shown in the on-image annotation banner.
        biomarkers = dict(ei_data)
        biomarkers.update(th_data)
        if pvh_data:
            biomarkers["pvh_grade"] = pvh_data["pvh_grade"]
        biomarkers["is_desh_positive"] = desh_data["is_desh_positive"]
        if ca_data.get("callosal_angle_deg") is not None:
            biomarkers["callosal_angle_deg"] = ca_data["callosal_angle_deg"]
        annotated = add_annotations(overlay, display_masks, f"{modality} -- Intensity Engine", biomarkers)
        # Draw Evans' index line
        row = ei_data.get("measurement_row", 0)
        if row > 0:
            cols = np.where(vent_mask[row, :] > 0)[0]
            if len(cols) > 0:
                # Yellow line spans the frontal horns at the measurement row.
                minX, maxX = int(cols[0]), int(cols[-1])
                cv2.line(annotated, (minX, row), (maxX, row), (255, 220, 0), 2)
                # Gray reference line shows the skull diameter just below it.
                skull_d = ei_data.get("skull_diameter_px", w)
                cx = w // 2
                hs = skull_d // 2
                cv2.line(annotated, (cx - hs, row + 8), (cx + hs, row + 8), (200, 200, 200), 1)
        return {
            "annotated": annotated,
            "evans_index": ei_data.get("evans_index", 0),
            "frontal_horn_mm": ei_data.get("frontal_horn_width_mm"),
            "skull_diameter_mm": ei_data.get("skull_diameter_mm"),
            "temporal_horn_px": th_data.get("temporal_horn_width_px", 0),
            "temporal_horn_mm": th_data.get("temporal_horn_width_mm"),
            "third_ventricle_px": tv_data.get("third_ventricle_width_px", 0),
            "third_ventricle_mm": tv_data.get("third_ventricle_width_mm"),
            "vb_ratio": vb_ratio,
            "vent_area": vent_area,
            "brain_area": brain_area,
            "desh_positive": desh_data.get("is_desh_positive", False),
            "desh_score": desh_data.get("total_score", 0),
            "desh_ventriculomegaly": desh_data.get("ventriculomegaly_score", 0),
            "desh_sylvian": desh_data.get("sylvian_dilation_score", 0),
            "desh_convexity": desh_data.get("convexity_tightness_score", 0),
            "pvh_grade": pvh_data["pvh_grade"] if pvh_data else None,
            "pvh_ratio": pvh_data["pvh_ratio"] if pvh_data else None,
            "callosal_angle": ca_data.get("callosal_angle_deg"),
            "quality": quality,
            "symmetry": symmetry,
        }
    finally:
        os.unlink(temp_path)
def _run_yolo_engine(image, conf_threshold=0.25):
    """Run the YOLO detection engine.

    Args:
        image: RGB numpy array.
        conf_threshold: minimum detection confidence to keep a box.

    Returns:
        dict with the annotated image, kept boxes, a box-derived Evans'
        Index estimate, DESH-related flags, and the detection count; or
        None when no YOLO model is available (missing weights/failed load).
    """
    model = _get_yolo_model()
    if model is None:
        return None
    # Inference takes a file path; round-trip through a temporary PNG.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
        Image.fromarray(image).save(f.name)
        temp_path = f.name
    try:
        results = model(temp_path, verbose=False)[0]
        h, w = image.shape[:2]
        annotated_img = image.copy()
        boxes = []
        for box in results.boxes:
            conf = float(box.conf[0])
            if conf < conf_threshold:
                continue
            x1, y1, x2, y2 = [int(round(v)) for v in box.xyxy[0].tolist()]
            cls_id = int(box.cls[0])
            cls_name = model.names.get(cls_id, str(cls_id))
            color = YOLO_COLORS.get(cls_name, (255, 255, 255))
            boxes.append({
                "class": cls_name, "x1": x1, "y1": y1, "x2": x2, "y2": y2,
                "confidence": round(conf, 4),
            })
            # Draw the bounding box plus a filled label strip above it.
            cv2.rectangle(annotated_img, (x1, y1), (x2, y2), color, 2)
            label = f"{cls_name} {conf:.0%}"
            (lw, lh), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            cv2.rectangle(annotated_img, (x1, y1 - lh - 8), (x1 + lw + 4, y1), color, -1)
            cv2.putText(annotated_img, label, (x1 + 2, y1 - 4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
        # Derive metrics
        ventricle = next((b for b in boxes if b["class"] == "ventricle"), None)
        skull = next((b for b in boxes if b["class"] == "skull_inner"), None)
        if ventricle and skull:
            # Evans' Index approximation: ventricle box width / inner-skull box width.
            vent_w = ventricle["x2"] - ventricle["x1"]
            skull_w = skull["x2"] - skull["x1"]
            ei = round(vent_w / skull_w, 4) if skull_w > 0 else 0
        elif ventricle:
            # No skull box detected: fall back to the full image width.
            ei = round((ventricle["x2"] - ventricle["x1"]) / w, 4)
        else:
            ei = 0
        # YOLO DESH score = number of distinct DESH-related classes seen (0-3).
        desh_classes = {"tight_convexity", "sylvian_fissure", "pvh"}
        detected_desh = {b["class"] for b in boxes if b["class"] in desh_classes}
        sylvian = any(b["class"] == "sylvian_fissure" for b in boxes)
        pvh = any(b["class"] == "pvh" for b in boxes)
        return {
            "annotated": annotated_img,
            "boxes": boxes,
            "evans_index": ei,
            "desh_score": len(detected_desh),
            "sylvian_dilation": sylvian,
            "pvh_detected": pvh,
            "n_detections": len(boxes),
        }
    finally:
        os.unlink(temp_path)
def dual_engine_analyze(image, modality, sensitivity, pixel_spacing_str, yolo_conf):
    """Run BOTH engines and produce comparison + ensemble score.

    Args:
        image: RGB numpy array from the Gradio image input.
        modality: UI modality label.
        sensitivity: 0-100 intensity-engine sensitivity slider value.
        pixel_spacing_str: free-text mm/px entry; blank or unparseable falls
            back to auto-estimation inside the intensity engine.
        yolo_conf: YOLO confidence threshold (0-1).

    Returns:
        (intensity_annotated_image, yolo_annotated_image, markdown_report).

    Raises:
        gr.Error: when no image was uploaded.
    """
    if image is None:
        raise gr.Error("Please upload a brain MRI or CT image first.")
    pixel_spacing = None
    if pixel_spacing_str and pixel_spacing_str.strip():
        try:
            pixel_spacing = float(pixel_spacing_str.strip())
        except ValueError:
            pass  # unparseable entry -> auto-estimate downstream
    # Run intensity engine
    t0 = time.time()
    intensity = _run_intensity_engine(image, modality, sensitivity, pixel_spacing)
    t_intensity = round(time.time() - t0, 2)
    # Run YOLO engine
    t0 = time.time()
    yolo = _run_yolo_engine(image, yolo_conf)  # None when no model is available
    t_yolo = round(time.time() - t0, 2)
    # Ensemble: average the Evans' Index from both engines
    ei_intensity = intensity["evans_index"]
    ei_yolo = yolo["evans_index"] if yolo else 0
    if yolo:
        # Fixed 60/40 weighting favoring the intensity engine.
        ei_ensemble = round((ei_intensity * 0.6 + ei_yolo * 0.4), 4)
    else:
        ei_ensemble = ei_intensity
    # DESH ensemble
    # NOTE(review): the intensity engine reports DESH on a 0-6 scale while
    # the YOLO engine uses 0-3; the max below mixes the two scales and
    # _compute_nph_score's DESH weight assumes 0-3 — verify clamping.
    desh_intensity_score = intensity["desh_score"]
    desh_yolo_score = yolo["desh_score"] if yolo else 0
    desh_ensemble = max(desh_intensity_score, desh_yolo_score)
    sylvian_ensemble = intensity["desh_sylvian"] > 0 or (yolo and yolo["sylvian_dilation"])
    # Compute ensemble NPH score
    score_input = {
        "evansIndex": ei_ensemble,
        "callosalAngle": intensity.get("callosal_angle"),
        "deshScore": desh_ensemble,
        "sylvianDilation": sylvian_ensemble,
        "vsr": None,
        "triad": [],
        "corticalAtrophy": "unknown",
    }
    nph_result = _compute_nph_score(score_input)
    # Build report (markdown: quality table, engine comparison, details, score)
    q = intensity["quality"]
    lines = []
    lines.append("# Dual-Engine NPH Analysis Report\n")
    lines.append(f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')} | **Modality:** {modality}\n")
    lines.append("---\n## Image Quality Assessment\n")
    lines.append(f"| Metric | Score |")
    lines.append(f"|---|---|")
    lines.append(f"| Sharpness | {q['sharpness']}/100 |")
    lines.append(f"| Contrast | {q['contrast']}/100 |")
    lines.append(f"| Noise | {q['noise']}/100 |")
    lines.append(f"| **Overall** | **{q['overall']}/100 ({q['grade']})** |")
    lines.append(f"| Symmetry | {intensity['symmetry']}% |")
    lines.append("\n---\n## Engine Comparison\n")
    lines.append("| Metric | Intensity Engine | YOLO Engine | Ensemble |")
    lines.append("|---|---|---|---|")
    ei_i_status = "abnormal" if ei_intensity > 0.3 else "normal"
    ei_y_status = ("abnormal" if ei_yolo > 0.3 else "normal") if yolo else "N/A"
    ei_e_status = "ABNORMAL" if ei_ensemble > 0.3 else "normal"
    lines.append(f"| Evans' Index | {ei_intensity:.3f} ({ei_i_status}) | {ei_yolo:.3f} ({ei_y_status}) | **{ei_ensemble:.3f} ({ei_e_status})** |")
    lines.append(f"| DESH Score | {desh_intensity_score}/6 | {desh_yolo_score}/3 | {desh_ensemble} (max) |")
    desh_pos_i = "Yes" if intensity["desh_positive"] else "No"
    desh_pos_y = ("Yes" if yolo and yolo["desh_score"] >= 2 else "No") if yolo else "N/A"
    lines.append(f"| DESH Positive | {desh_pos_i} | {desh_pos_y} | -- |")
    if yolo:
        lines.append(f"| Detections | -- | {yolo['n_detections']} objects | -- |")
    pvh_str = f"Grade {intensity['pvh_grade']}/3" if intensity["pvh_grade"] is not None else "N/A (not FLAIR)"
    pvh_y_str = ("Yes" if yolo and yolo["pvh_detected"] else "No") if yolo else "N/A"
    lines.append(f"| PVH | {pvh_str} | {pvh_y_str} | -- |")
    lines.append(f"| Processing Time | {t_intensity}s | {t_yolo}s | -- |")
    lines.append("\n---\n## Intensity Engine Details\n")
    if intensity.get("frontal_horn_mm"):
        lines.append(f"- Frontal horn width: {intensity['frontal_horn_mm']} mm")
    if intensity.get("skull_diameter_mm"):
        lines.append(f"- Skull diameter: {intensity['skull_diameter_mm']} mm")
    if intensity["temporal_horn_px"] > 0:
        mm_str = f" ({intensity['temporal_horn_mm']} mm)" if intensity.get("temporal_horn_mm") else ""
        lines.append(f"- Temporal horn width: {intensity['temporal_horn_px']} px{mm_str}")
    if intensity["third_ventricle_px"] > 0:
        mm_str = f" ({intensity['third_ventricle_mm']} mm)" if intensity.get("third_ventricle_mm") else ""
        lines.append(f"- Third ventricle width: {intensity['third_ventricle_px']} px{mm_str}")
    lines.append(f"- Ventricle/Brain ratio: {intensity['vb_ratio']:.4f} ({intensity['vent_area']}/{intensity['brain_area']} px)")
    lines.append(f"- DESH breakdown: Vent={intensity['desh_ventriculomegaly']}/2, Sylvian={intensity['desh_sylvian']}/2, Convexity={intensity['desh_convexity']}/2")
    if intensity.get("callosal_angle") is not None:
        ca = intensity["callosal_angle"]
        ca_str = "Suggestive" if ca < 90 else ("Indeterminate" if ca < 120 else "Normal")
        lines.append(f"- Callosal angle: {ca:.1f} deg ({ca_str})")
    if intensity["pvh_grade"] is not None:
        lines.append(f"- PVH: Grade {intensity['pvh_grade']}/3 (ratio: {intensity['pvh_ratio']:.4f})")
    if yolo:
        lines.append("\n---\n## YOLO Detection Details\n")
        for b in yolo["boxes"]:
            bw = b["x2"] - b["x1"]
            bh = b["y2"] - b["y1"]
            lines.append(f"- **{b['class']}**: {b['confidence']:.1%} conf, {bw}x{bh} px at ({b['x1']},{b['y1']})")
    lines.append(f"\n---\n## Ensemble NPH Score: **{nph_result['score']}/100 -- {nph_result['label']}**\n")
    lines.append(f"*{nph_result['recommendation']}*")
    report = "\n".join(lines)
    # When the YOLO engine is unavailable, show a black placeholder image.
    yolo_img = yolo["annotated"] if yolo else np.zeros_like(image)
    return intensity["annotated"], yolo_img, report
# ===========================================================================
# Tab 2: Multi-Slice Batch Analysis
# ===========================================================================
def batch_analyze(files, modality, sensitivity):
    """Analyze multiple slices and aggregate results.

    Args:
        files: list of Gradio file objects (each exposing a ``.name`` path).
        modality: UI modality label applied to every slice.
        sensitivity: 0-100 intensity-engine sensitivity slider value.

    Returns:
        Markdown report string with per-slice results, aggregate summary,
        and an NPH score based on the worst (max-EI) slice.

    Raises:
        gr.Error: when no files were uploaded.
    """
    if not files:
        raise gr.Error("Please upload one or more brain scan images.")
    results = []
    for f in files:
        img = np.array(Image.open(f.name).convert("RGB"))
        try:
            r = _run_intensity_engine(img, modality, sensitivity, None)
            results.append({
                "file": os.path.basename(f.name),
                "evans_index": r["evans_index"],
                "desh_positive": r["desh_positive"],
                "desh_score": r["desh_score"],
                "vb_ratio": r["vb_ratio"],
                "quality_grade": r["quality"]["grade"],
                "symmetry": r["symmetry"],
            })
        except Exception as e:
            # Best-effort batch: a failing slice is recorded as an error row
            # rather than aborting the whole run.
            results.append({
                "file": os.path.basename(f.name),
                "evans_index": 0,
                "desh_positive": False,
                "desh_score": 0,
                "vb_ratio": 0,
                "quality_grade": "Error",
                "symmetry": 0,
                "error": str(e),
            })
    # Aggregate
    valid = [r for r in results if "error" not in r]
    if not valid:
        return "All slices failed to process."
    ei_values = [r["evans_index"] for r in valid]
    max_ei = max(ei_values)
    max_ei_slice = valid[ei_values.index(max_ei)]["file"]
    mean_ei = np.mean(ei_values)
    any_desh = any(r["desh_positive"] for r in valid)
    max_desh = max(r["desh_score"] for r in valid)
    mean_vb = np.mean([r["vb_ratio"] for r in valid])
    # Score using the max Evans' Index (worst slice = most diagnostic)
    score_input = {
        "evansIndex": max_ei,
        "deshScore": min(max_desh, 3),  # clamp 0-6 engine scale to scorer's 0-3
        "sylvianDilation": any_desh,
        "corticalAtrophy": "unknown",
    }
    nph_result = _compute_nph_score(score_input)
    lines = ["# Multi-Slice NPH Analysis\n"]
    lines.append(f"**Slices analyzed:** {len(valid)} / {len(results)}\n")
    lines.append("---\n## Per-Slice Results\n")
    lines.append("| Slice | Evans' Index | V/B Ratio | DESH | Quality | Symmetry |")
    lines.append("|---|---|---|---|---|---|")
    for r in results:
        if "error" in r:
            lines.append(f"| {r['file']} | ERROR | -- | -- | -- | -- |")
        else:
            ei_flag = " **" if r["evans_index"] > 0.3 else ""
            desh_flag = "POS" if r["desh_positive"] else f"{r['desh_score']}/6"
            lines.append(f"| {r['file']} | {r['evans_index']:.3f}{ei_flag} | {r['vb_ratio']:.4f} | {desh_flag} | {r['quality_grade']} | {r['symmetry']}% |")
    lines.append(f"\n---\n## Aggregate Summary\n")
    lines.append(f"- **Max Evans' Index:** {max_ei:.3f} (slice: {max_ei_slice})" + (" -- ABNORMAL" if max_ei > 0.3 else ""))
    lines.append(f"- **Mean Evans' Index:** {mean_ei:.3f}")
    lines.append(f"- **Mean V/B Ratio:** {mean_vb:.4f}")
    lines.append(f"- **Max DESH Score:** {max_desh}/6")
    lines.append(f"- **Any DESH Positive:** {'Yes' if any_desh else 'No'}")
    lines.append(f"\n---\n## NPH Score: **{nph_result['score']}/100 -- {nph_result['label']}**\n")
    lines.append(f"*Based on worst-case slice (max EI). {nph_result['recommendation']}*")
    return "\n".join(lines)
# ===========================================================================
# Tab 3: Clinical Scoring Calculator
# ===========================================================================
def compute_clinical_score(
    evans_index, callosal_angle_str, desh_score, sylvian_dilation, vsr_str,
    gait, cognition, urinary, cortical_atrophy
):
    """Build a markdown NPH score summary from manually entered clinical values.

    Free-text numeric fields (callosal angle, VSR) are parsed leniently:
    blank or unparseable entries are treated as "not provided".
    """
    def _optional_float(text):
        # Lenient parse: None for blank/invalid entries.
        if text and text.strip():
            try:
                return float(text.strip())
            except ValueError:
                return None
        return None

    callosal = _optional_float(callosal_angle_str)
    vsr = _optional_float(vsr_str)
    triad = [gait, cognition, urinary]
    atrophy_map = {"None/Mild": "none", "Moderate": "moderate", "Significant": "significant"}
    result = _compute_nph_score({
        "evansIndex": evans_index,
        "callosalAngle": callosal,
        "deshScore": int(desh_score),
        "sylvianDilation": sylvian_dilation,
        "vsr": vsr,
        "triad": triad,
        "corticalAtrophy": atrophy_map.get(cortical_atrophy, "unknown"),
    })
    parts = [
        f"# NPH Score: {result['score']}/100",
        f"## {result['label']}\n",
        f"{result['recommendation']}\n",
        "---\n### Input Summary\n",
    ]
    parts.append(f"- **Evans' Index:** {evans_index:.3f}" + (" (>0.3 = abnormal)" if evans_index > 0.3 else ""))
    if callosal is not None:
        parts.append(f"- **Callosal Angle:** {callosal:.1f} deg")
    parts.append(f"- **DESH Score:** {int(desh_score)}/3")
    parts.append(f"- **Sylvian Dilation:** {'Yes' if sylvian_dilation else 'No'}")
    if vsr is not None:
        parts.append(f"- **VSR:** {vsr:.2f}" + (" (>2.0 = strong NPH indicator)" if vsr > 2.0 else ""))
    parts.append(f"- **Hakim Triad:** {sum(triad)}/3")
    parts.append(f"- **Cortical Atrophy:** {cortical_atrophy}")
    return "\n".join(parts)
# ===========================================================================
# Tab 4: Report Generator
# ===========================================================================
def generate_report(image, modality, sensitivity, pixel_spacing_str,
                    patient_id, patient_age, clinical_history,
                    gait, cognition, urinary):
    """Generate a structured clinical radiology report.

    Args:
        image: RGB numpy array of the slice to analyze.
        modality: UI modality label.
        sensitivity: 0-100 intensity-engine sensitivity slider value.
        pixel_spacing_str: free-text mm/px entry (blank -> auto-estimate).
        patient_id, patient_age, clinical_history: free-text demographics.
        gait, cognition, urinary: booleans for the Hakim triad symptoms.

    Returns:
        (annotated_image, markdown_report).

    Raises:
        gr.Error: when no image was uploaded.
    """
    if image is None:
        raise gr.Error("Please upload a brain scan first.")
    pixel_spacing = None
    if pixel_spacing_str and pixel_spacing_str.strip():
        try:
            pixel_spacing = float(pixel_spacing_str.strip())
        except ValueError:
            pass  # unparseable -> auto-estimate downstream
    intensity = _run_intensity_engine(image, modality, sensitivity, pixel_spacing)
    yolo = _run_yolo_engine(image, 0.25)
    ei = intensity["evans_index"]
    ei_y = yolo["evans_index"] if yolo else None
    # Same 60/40 ensemble weighting as the dual-engine tab.
    ei_ensemble = round((ei * 0.6 + ei_y * 0.4), 4) if ei_y else ei
    triad = [gait, cognition, urinary]
    triad_count = sum(triad)
    # NOTE(review): intensity desh_score is on a 0-6 scale but
    # _compute_nph_score's DESH weight assumes 0-3 — verify clamping.
    score_input = {
        "evansIndex": ei_ensemble,
        "callosalAngle": intensity.get("callosal_angle"),
        "deshScore": intensity["desh_score"],
        "sylvianDilation": intensity["desh_sylvian"] > 0,
        "triad": triad,
        "corticalAtrophy": "unknown",
    }
    nph_result = _compute_nph_score(score_input)
    # Assemble the report section by section (header, presentation,
    # findings, impression, disclaimer).
    lines = []
    lines.append("# NEURORADIOLOGY REPORT")
    lines.append("## Normal Pressure Hydrocephalus Assessment\n")
    lines.append("---\n")
    lines.append(f"**Patient ID:** {patient_id or 'Anonymous'}")
    lines.append(f"**Age:** {patient_age or 'Not specified'}")
    lines.append(f"**Date:** {datetime.now().strftime('%Y-%m-%d')}")
    lines.append(f"**Modality:** {modality}")
    lines.append(f"**Clinical History:** {clinical_history or 'Not provided'}\n")
    lines.append("---\n## CLINICAL PRESENTATION\n")
    symptoms = []
    if gait: symptoms.append("gait disturbance")
    if cognition: symptoms.append("cognitive impairment")
    if urinary: symptoms.append("urinary incontinence")
    if symptoms:
        lines.append(f"Patient presents with {', '.join(symptoms)} ({triad_count}/3 Hakim triad components).")
    else:
        lines.append("No specific Hakim triad symptoms reported.")
    lines.append("\n---\n## FINDINGS\n")
    lines.append("### Ventricular System")
    ei_word = "abnormally enlarged" if ei_ensemble > 0.3 else "within normal limits"
    lines.append(f"The lateral ventricles are {ei_word} with an Evans' Index of **{ei_ensemble:.3f}** (normal < 0.3).")
    if intensity.get("frontal_horn_mm"):
        lines.append(f"Frontal horn width measures {intensity['frontal_horn_mm']} mm with a biparietal skull diameter of {intensity['skull_diameter_mm']} mm.")
    lines.append(f"Ventricle-to-brain parenchyma ratio is {intensity['vb_ratio']:.4f}.")
    if intensity["temporal_horn_px"] > 0:
        mm_str = f" ({intensity['temporal_horn_mm']} mm)" if intensity.get("temporal_horn_mm") else ""
        lines.append(f"\nThe temporal horns measure {intensity['temporal_horn_px']} px{mm_str}.")
    if intensity["third_ventricle_px"] > 0:
        mm_str = f" ({intensity['third_ventricle_mm']} mm)" if intensity.get("third_ventricle_mm") else ""
        lines.append(f"Third ventricle width is {intensity['third_ventricle_px']} px{mm_str}.")
    if intensity.get("callosal_angle") is not None:
        ca = intensity["callosal_angle"]
        ca_word = "acutely narrowed, consistent with NPH" if ca < 90 else "within normal range"
        lines.append(f"\nThe callosal angle measures {ca:.1f} degrees ({ca_word}).")
    lines.append("\n### DESH Assessment")
    desh_word = "present" if intensity["desh_positive"] else "not fully met"
    lines.append(f"DESH pattern is **{desh_word}** (score: {intensity['desh_score']}/6).")
    lines.append(f"- Ventriculomegaly: {intensity['desh_ventriculomegaly']}/2")
    lines.append(f"- Sylvian fissure dilation: {intensity['desh_sylvian']}/2")
    lines.append(f"- High convexity tightness: {intensity['desh_convexity']}/2")
    if intensity["pvh_grade"] is not None:
        # Standard PVH grading descriptions keyed by grade.
        pvh_desc = {0: "absent", 1: "pencil-thin periventricular rim", 2: "smooth periventricular halo", 3: "irregular extension into deep white matter"}
        lines.append(f"\n### Periventricular Changes")
        lines.append(f"PVH Grade **{intensity['pvh_grade']}/3**: {pvh_desc.get(intensity['pvh_grade'], '')}.")
    lines.append(f"\n### Image Quality")
    q = intensity["quality"]
    lines.append(f"Image quality is {q['grade'].lower()} (score: {q['overall']}/100). Symmetry index: {intensity['symmetry']}%.")
    if yolo:
        lines.append(f"\n### AI Structure Detection (YOLO)")
        lines.append(f"{yolo['n_detections']} structures detected:")
        for b in yolo["boxes"]:
            lines.append(f"- {b['class']}: {b['confidence']:.0%} confidence")
    lines.append(f"\n---\n## IMPRESSION\n")
    lines.append(f"**NPH Assessment Score: {nph_result['score']}/100 -- {nph_result['label']}**\n")
    # Collect only the positive findings for the impression summary.
    findings = []
    if ei_ensemble > 0.3:
        findings.append(f"ventriculomegaly (EI={ei_ensemble:.3f})")
    if intensity["desh_positive"]:
        findings.append("DESH pattern")
    if intensity["pvh_grade"] is not None and intensity["pvh_grade"] >= 2:
        findings.append(f"periventricular hyperintensities (Grade {intensity['pvh_grade']})")
    if intensity.get("callosal_angle") is not None and intensity["callosal_angle"] < 90:
        findings.append(f"acute callosal angle ({intensity['callosal_angle']:.0f} deg)")
    if triad_count >= 2:
        findings.append(f"clinical Hakim triad ({triad_count}/3)")
    if findings:
        lines.append(f"Key findings: {', '.join(findings)}.")
    lines.append(f"\n{nph_result['recommendation']}")
    lines.append(f"\n---\n*This report was generated by the NPH Diagnostic Platform (v3.0). ")
    lines.append(f"Measurements from JPEG/PNG images are approximate. For clinical decisions, ")
    lines.append(f"correlate with DICOM-derived measurements and clinical examination.*")
    lines.append(f"\n*Matheus Rech, MD | {datetime.now().strftime('%Y-%m-%d %H:%M')}*")
    return intensity["annotated"], "\n".join(lines)
# ===========================================================================
# Tabs 5-8: Filters & ML Models
# ===========================================================================
def apply_filter(image, effect, intensity):
    """Apply a named visual effect to an RGB image array.

    Args:
        image: RGB numpy array (None raises a Gradio error).
        effect: one of "Grayscale", "Sepia", "Blur", "Sharpen",
            "Edge Detect", "Invert", "Brightness", "Contrast";
            any other value returns the image unchanged.
        intensity: 0.0-1.0 strength used to blend or scale the effect.

    Returns:
        The filtered image as a numpy array.
    """
    if image is None:
        raise gr.Error("Please upload an image first.")
    src = Image.fromarray(image)
    if effect == "Grayscale":
        out = ImageOps.grayscale(src).convert("RGB")
        if intensity < 1.0:
            out = Image.blend(src, out, intensity)
        return np.array(out)
    if effect == "Sepia":
        tinted = ImageOps.colorize(ImageOps.grayscale(src), "#704214", "#C0A080")
        return np.array(Image.blend(src, tinted, intensity))
    if effect == "Blur":
        return np.array(src.filter(ImageFilter.GaussianBlur(radius=max(1, int(intensity * 10)))))
    if effect == "Sharpen":
        return np.array(ImageEnhance.Sharpness(src).enhance(1 + intensity * 4))
    if effect == "Edge Detect":
        return np.array(Image.blend(src, src.filter(ImageFilter.FIND_EDGES), intensity))
    if effect == "Invert":
        return np.array(Image.blend(src, ImageOps.invert(src.convert("RGB")), intensity))
    if effect == "Brightness":
        return np.array(ImageEnhance.Brightness(src).enhance(0.5 + intensity * 1.5))
    if effect == "Contrast":
        return np.array(ImageEnhance.Contrast(src).enhance(0.5 + intensity * 2))
    # Unknown effect: pass the image through untouched.
    return np.array(src)
def classify_image(image):
    """Classify an image with the ViT pipeline; return {label: score} for gr.Label."""
    if image is None:
        raise gr.Error("Please upload an image first.")
    predictions = get_classifier()(Image.fromarray(image))
    scores = {}
    for pred in predictions:
        scores[pred["label"]] = pred["score"]
    return scores
def detect_objects(image, threshold):
    """Detect objects with DETR; return (image, annotations) for gr.AnnotatedImage."""
    if image is None:
        raise gr.Error("Please upload an image first.")
    detections = get_detector()(Image.fromarray(image), threshold=threshold)
    annotations = []
    for det in detections:
        box = det["box"]
        coords = (box["xmin"], box["ymin"], box["xmax"], box["ymax"])
        annotations.append((coords, f"{det['label']} ({det['score']:.0%})"))
    return (image, annotations)
def segment_image(image):
    """Run DETR panoptic segmentation on an uploaded image.

    Returns an ``(image, annotations)`` pair for ``gr.AnnotatedImage``, where
    each annotation is ``(mask_array, label)``.

    Raises:
        gr.Error: If no image was provided.
    """
    if image is None:
        raise gr.Error("Please upload an image first.")
    segments = get_segmenter()(Image.fromarray(image))
    annotations = []
    for seg in segments:
        annotations.append((np.array(seg["mask"]), seg["label"]))
    return (image, annotations)
# ===========================================================================
# Build the UI
# ===========================================================================
# Global CSS tweaks: center the title/subtitle, style the per-engine labels,
# and hide the default Gradio footer.
CUSTOM_CSS = """
.main-title { text-align: center; margin-bottom: 0.2em; }
.subtitle { text-align: center; color: #666; margin-top: 0; font-size: 0.9em; }
.engine-label { font-weight: 700; font-size: 0.85em; text-transform: uppercase; letter-spacing: 0.05em; }
footer { display: none !important; }
"""
# NOTE(review): indentation was lost in the uploaded source; the nesting below
# is a conventional reconstruction of the Gradio layout (inputs in the left
# column, outputs on the right, wiring at tab level) — verify against the
# rendered UI.
with gr.Blocks(theme=gr.themes.Soft(), css=CUSTOM_CSS, title="NPH Diagnostic Platform") as demo:
    gr.Markdown("# NPH Diagnostic Platform", elem_classes="main-title")
    gr.Markdown(
        "Dual-engine analysis (intensity segmentation + YOLO detection), ensemble scoring, "
        "multi-slice batch processing, clinical calculator, and structured report generation.",
        elem_classes="subtitle"
    )
    # ========== Tab 1: Dual-Engine Analysis ==========
    with gr.Tab("Dual-Engine Analysis"):
        gr.Markdown(
            "### Two Engines, One Diagnosis\n"
            "Runs **intensity-based segmentation** AND **YOLO deep learning detection** on the same image, "
            "compares results side-by-side, and produces an **ensemble NPH score** (weighted 60/40)."
        )
        with gr.Row():
            with gr.Column(scale=1):
                # Inputs: scan, acquisition parameters, and per-engine thresholds.
                de_input = gr.Image(label="Upload Brain Scan", type="numpy")
                de_modality = gr.Dropdown(
                    choices=["Axial FLAIR", "Axial T1", "Axial T2", "Coronal T2",
                             "Axial T2 FFE", "Sagittal T1", "CT Head"],
                    value="Axial FLAIR", label="Modality / Sequence"
                )
                de_sensitivity = gr.Slider(10, 90, value=50, step=5, label="Sensitivity (%)")
                de_yolo_conf = gr.Slider(0.1, 0.95, value=0.25, step=0.05, label="YOLO Confidence Threshold")
                de_spacing = gr.Textbox(label="Pixel Spacing (mm/px)", placeholder="auto-estimate", value="")
                de_btn = gr.Button("Run Dual-Engine Analysis", variant="primary", size="lg")
            with gr.Column(scale=2):
                # Outputs: one overlay per engine, side by side, plus a combined report.
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("**Intensity Engine**", elem_classes="engine-label")
                        de_intensity_out = gr.Image(label="Segmentation Overlay", type="numpy")
                    with gr.Column():
                        gr.Markdown("**YOLO Engine**", elem_classes="engine-label")
                        de_yolo_out = gr.Image(label="Detection Overlay", type="numpy")
                de_report = gr.Markdown(label="Dual-Engine Report")
        # NOTE(review): `dual_engine_analyze` is defined earlier in this file
        # (outside this chunk); input order must match its signature.
        de_btn.click(
            fn=dual_engine_analyze,
            inputs=[de_input, de_modality, de_sensitivity, de_spacing, de_yolo_conf],
            outputs=[de_intensity_out, de_yolo_out, de_report]
        )
        with gr.Accordion("How Ensemble Scoring Works", open=False):
            gr.Markdown(
                "The ensemble combines both engines:\n\n"
                "- **Evans' Index**: Weighted average (60% intensity + 40% YOLO)\n"
                "- **DESH Pattern**: Takes the maximum score from either engine\n"
                "- **Sylvian Dilation**: Positive if either engine detects it\n\n"
                "This approach is more robust than either engine alone -- intensity segmentation "
                "is better at precise boundary delineation, while YOLO is better at detecting "
                "spatial patterns and multiple structures simultaneously."
            )
    # ========== Tab 2: Multi-Slice Batch ==========
    with gr.Tab("Multi-Slice Batch"):
        gr.Markdown(
            "### Batch Analysis Across Multiple Slices\n"
            "Upload multiple axial slices from the same patient. Each slice is analyzed individually, "
            "then results are aggregated. The **worst-case slice** (highest Evans' Index) drives the NPH score."
        )
        with gr.Row():
            with gr.Column():
                batch_files = gr.File(
                    label="Upload Multiple Slices",
                    file_count="multiple",
                    file_types=["image"],
                )
                batch_modality = gr.Dropdown(
                    choices=["Axial FLAIR", "Axial T1", "Axial T2", "CT Head"],
                    value="Axial FLAIR", label="Modality"
                )
                batch_sensitivity = gr.Slider(10, 90, value=50, step=5, label="Sensitivity (%)")
                batch_btn = gr.Button("Analyze All Slices", variant="primary", size="lg")
        batch_report = gr.Markdown(label="Batch Report")
        batch_btn.click(fn=batch_analyze, inputs=[batch_files, batch_modality, batch_sensitivity], outputs=batch_report)
    # ========== Tab 3: NPH Score Calculator ==========
    with gr.Tab("NPH Score Calculator"):
        gr.Markdown(
            "### Clinical NPH Scoring Calculator\n"
            "Enter imaging biomarkers and clinical findings to compute a weighted NPH probability score."
        )
        with gr.Row():
            with gr.Column():
                gr.Markdown("#### Imaging Biomarkers")
                calc_evans = gr.Slider(0.0, 0.6, value=0.30, step=0.01, label="Evans' Index")
                calc_callosal = gr.Textbox(label="Callosal Angle (degrees)", placeholder="e.g. 85", value="")
                calc_desh = gr.Slider(0, 3, value=0, step=1, label="DESH Score (0-3)")
                calc_sylvian = gr.Checkbox(label="Sylvian Fissure Dilation", value=False)
                calc_vsr = gr.Textbox(label="VSR", placeholder="e.g. 2.5", value="")
            with gr.Column():
                gr.Markdown("#### Clinical Findings (Hakim Triad)")
                calc_gait = gr.Checkbox(label="Gait disturbance", value=False)
                calc_cognition = gr.Checkbox(label="Cognitive impairment", value=False)
                calc_urinary = gr.Checkbox(label="Urinary incontinence", value=False)
                gr.Markdown("#### Modifiers")
                calc_atrophy = gr.Radio(["None/Mild", "Moderate", "Significant"], value="None/Mild", label="Cortical Atrophy")
        calc_btn = gr.Button("Calculate NPH Score", variant="primary", size="lg")
        calc_report = gr.Markdown(label="Score Report")
        calc_btn.click(
            fn=compute_clinical_score,
            inputs=[calc_evans, calc_callosal, calc_desh, calc_sylvian, calc_vsr,
                    calc_gait, calc_cognition, calc_urinary, calc_atrophy],
            outputs=calc_report
        )
    # ========== Tab 4: Report Generator ==========
    with gr.Tab("Report Generator"):
        gr.Markdown(
            "### Structured Clinical Report\n"
            "Generates a formal neuroradiology-style NPH assessment report combining imaging analysis "
            "with clinical findings."
        )
        with gr.Row():
            with gr.Column(scale=1):
                rpt_input = gr.Image(label="Upload Brain Scan", type="numpy")
                rpt_modality = gr.Dropdown(
                    choices=["Axial FLAIR", "Axial T1", "Axial T2", "Coronal T2", "CT Head"],
                    value="Axial FLAIR", label="Modality"
                )
                rpt_sensitivity = gr.Slider(10, 90, value=50, step=5, label="Sensitivity (%)")
                rpt_spacing = gr.Textbox(label="Pixel Spacing (mm/px)", placeholder="auto-estimate", value="")
                gr.Markdown("#### Patient Info")
                rpt_id = gr.Textbox(label="Patient ID", placeholder="Anonymous")
                rpt_age = gr.Textbox(label="Age", placeholder="e.g. 72")
                rpt_history = gr.Textbox(label="Clinical History", lines=2, placeholder="e.g. Progressive gait instability...")
                gr.Markdown("#### Hakim Triad")
                rpt_gait = gr.Checkbox(label="Gait disturbance", value=False)
                rpt_cognition = gr.Checkbox(label="Cognitive impairment", value=False)
                rpt_urinary = gr.Checkbox(label="Urinary incontinence", value=False)
                rpt_btn = gr.Button("Generate Report", variant="primary", size="lg")
            with gr.Column(scale=2):
                rpt_overlay = gr.Image(label="Segmentation", type="numpy")
                rpt_text = gr.Markdown(label="Clinical Report")
        rpt_btn.click(
            fn=generate_report,
            inputs=[rpt_input, rpt_modality, rpt_sensitivity, rpt_spacing,
                    rpt_id, rpt_age, rpt_history, rpt_gait, rpt_cognition, rpt_urinary],
            outputs=[rpt_overlay, rpt_text]
        )
    # ========== Tab 5: Browser NPH Detector ==========
    with gr.Tab("NPH Detector (Browser)"):
        gr.Markdown(
            "### Client-Side NPH Pipeline\n"
            "Runs entirely in your browser via JavaScript Canvas API. Zero server dependency."
        )
        # Embeds a separate, fully client-side HF Space in an iframe.
        gr.HTML(
            value='<iframe src="https://mmrech-nph-detector-js.hf.space" '
                  'width="100%" height="900" frameborder="0" '
                  'allow="clipboard-write" '
                  'style="border-radius: 12px; border: 1px solid #333;"></iframe>',
        )
    # ========== Tab 6: Video Demo ==========
    with gr.Tab("Video Demo"):
        gr.Markdown("### Whole-Brain Segmentation Demo")
        gr.Video(value="examples/hydromorph_whole_brain_segmentation.mp4", label="NPH Segmentation Video", autoplay=False)
    # ========== Tab 7: Filters ==========
    with gr.Tab("Filters & Effects"):
        with gr.Row():
            with gr.Column():
                filter_input = gr.Image(label="Upload Image", type="numpy")
                filter_effect = gr.Dropdown(
                    choices=["Grayscale", "Sepia", "Blur", "Sharpen", "Edge Detect", "Invert", "Brightness", "Contrast"],
                    value="Sepia", label="Effect"
                )
                filter_intensity = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="Intensity")
                filter_btn = gr.Button("Apply Filter", variant="primary")
            with gr.Column():
                filter_output = gr.Image(label="Result", type="numpy")
        filter_btn.click(fn=apply_filter, inputs=[filter_input, filter_effect, filter_intensity], outputs=filter_output)
    # ========== Tab 8: Classification ==========
    with gr.Tab("Image Classification"):
        with gr.Row():
            with gr.Column():
                cls_input = gr.Image(label="Upload Image", type="numpy")
                cls_btn = gr.Button("Classify", variant="primary")
            with gr.Column():
                cls_output = gr.Label(label="Predictions", num_top_classes=5)
        cls_btn.click(fn=classify_image, inputs=cls_input, outputs=cls_output)
    # ========== Tab 9: Object Detection ==========
    with gr.Tab("Object Detection"):
        with gr.Row():
            with gr.Column():
                det_input = gr.Image(label="Upload Image", type="numpy")
                det_threshold = gr.Slider(0.1, 0.95, value=0.5, step=0.05, label="Confidence Threshold")
                det_btn = gr.Button("Detect Objects", variant="primary")
            with gr.Column():
                det_output = gr.AnnotatedImage(label="Detections")
        det_btn.click(fn=detect_objects, inputs=[det_input, det_threshold], outputs=det_output)
    # ========== Tab 10: Segmentation ==========
    with gr.Tab("Segmentation"):
        with gr.Row():
            with gr.Column():
                seg_input = gr.Image(label="Upload Image", type="numpy")
                seg_btn = gr.Button("Segment", variant="primary")
            with gr.Column():
                seg_output = gr.AnnotatedImage(label="Segmentation Map")
        seg_btn.click(fn=segment_image, inputs=seg_input, outputs=seg_output)
    # App-wide footer (the default Gradio footer is hidden via CUSTOM_CSS).
    gr.Markdown(
        "<center style='color: #888; font-size: 0.75em; margin-top: 20px;'>"
        "NPH Diagnostic Platform v3.0 | Matheus Rech, MD | "
        "Built with Gradio + YOLO + Transformers"
        "</center>"
    )
demo.launch()