Spaces:
Sleeping
Sleeping
| """ | |
| NPH Neuroimaging Segmentation Module | |
| Segmentation and quantitative biomarker analysis for Normal Pressure Hydrocephalus | |
| Supports CT Head, MRI T1, T2, FLAIR. | |
| Computes: Evans' index, callosal angle, temporal horn width, z-Evans index, | |
| DESH pattern assessment, periventricular hyperintensity scoring. | |
| Author: Matheus Rech, MD | |
| Version: 2.0.0 (NPH-focused) | |
| """ | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| from typing import Dict, List, Tuple, Optional, Union | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| import warnings | |
| # --------------------------------------------------------------------------- | |
| # Enums and data classes | |
| # --------------------------------------------------------------------------- | |
class Modality(Enum):
    """Supported imaging modalities for NPH analysis.

    The enum value is the canonical string identifier used in metadata;
    the member is the key into CSF_MODE and VENTRICLE_THRESHOLDS.
    """
    CT_HEAD = "ct_head"
    T1 = "t1_weighted"
    T1_GD = "t1_gadolinium"
    T2 = "t2_weighted"
    FLAIR = "flair"
class CSFAppearance(Enum):
    """How CSF appears on each modality (needed for correct thresholding)."""
    DARK = "dark"      # CT, T1, FLAIR -- CSF occupies the low-intensity band
    BRIGHT = "bright"  # T2 -- CSF occupies the high-intensity band
@dataclass
class SegmentationResult:
    """Container for NPH segmentation results.

    Was declared with ``field(default_factory=...)`` fields but without the
    ``@dataclass`` decorator, so no ``__init__`` was generated and the
    "defaults" were raw ``Field`` objects; the decorator fixes both.

    Attributes:
        masks: Structure name -> binary mask (uint8 arrays).
        overlay: RGB visualization with masks blended in.
        contours: Structure name -> list of OpenCV contours (per-instance dict).
        metadata: Free-form auxiliary data, e.g. DICOM header values.
    """
    masks: Dict[str, np.ndarray]
    overlay: np.ndarray
    contours: Dict[str, List] = field(default_factory=dict)
    metadata: Dict = field(default_factory=dict)
| # --------------------------------------------------------------------------- | |
| # NPH color palette | |
| # --------------------------------------------------------------------------- | |
# RGB palette (0-255 per channel) for overlay rendering; keys match the mask
# names passed to create_overlay(). Several keys are deliberate aliases that
# share one color so callers can use either name:
#   lateral_ventricles / ventricles / csf, pvh / periventricular_hyperintensity,
#   skull / bone.
COLORS = {
    "lateral_ventricles": (0, 150, 255),
    "ventricles": (0, 150, 255),
    "third_ventricle": (0, 100, 200),
    "temporal_horns": (0, 200, 255),
    "csf": (0, 150, 255),
    "parenchyma": (100, 200, 100),
    "pvh": (255, 200, 0),
    "periventricular_hyperintensity": (255, 200, 0),
    "sylvian_fissures": (200, 100, 255),
    "high_convexity_sulci": (255, 150, 100),
    "skull": (255, 255, 200),
    "bone": (255, 255, 200),
    "aqueductal_flow_void": (255, 80, 80),
    "transependymal_flow": (255, 180, 0),
    "subdural_collection": (180, 60, 60),
    "hemorrhage": (200, 50, 100),
    "white_matter": (180, 180, 140),
    "gray_matter": (140, 160, 140),
}
| # --------------------------------------------------------------------------- | |
| # Modality-specific CSF behavior | |
| # --------------------------------------------------------------------------- | |
# CSF appearance per modality: dark (hypointense/hypodense) on CT, T1, T1+Gd,
# and FLAIR (fluid-suppressed), bright (hyperintense) on T2. The corresponding
# intensity bands live in VENTRICLE_THRESHOLDS.
CSF_MODE = {
    Modality.CT_HEAD: CSFAppearance.DARK,
    Modality.T1: CSFAppearance.DARK,
    Modality.T1_GD: CSFAppearance.DARK,
    Modality.T2: CSFAppearance.BRIGHT,
    Modality.FLAIR: CSFAppearance.DARK,  # fluid signal suppressed on FLAIR
}
# Default 8-bit thresholds for ventricle segmentation per modality.
# Each entry is an inclusive [csf_low, csf_high] band handed to cv2.inRange();
# the band itself encodes CSF polarity (low band where CSF is dark, high band
# on T2 where CSF is bright).
VENTRICLE_THRESHOLDS = {
    Modality.CT_HEAD: {"csf_low": 0, "csf_high": 55},  # Dark on brain window
    Modality.T1: {"csf_low": 0, "csf_high": 45},  # Hypointense
    Modality.T1_GD: {"csf_low": 0, "csf_high": 45},
    Modality.T2: {"csf_low": 170, "csf_high": 255},  # Hyperintense
    Modality.FLAIR: {"csf_low": 0, "csf_high": 50},  # Suppressed
}
# Periventricular hyperintensity threshold (FLAIR only), used by score_pvh().
PVH_THRESHOLD = 145  # Pixel intensity above which = hyperintense on FLAIR
# CT Hounsfield windows -- presumably (window center, window width) pairs
# matching the apply_ct_window(center, width) signature; confirm before reuse.
CT_WINDOWS = {
    "brain": (40, 80),
    "subdural": (75, 215),
    "bone": (400, 1800),
}
| # =========================================================================== | |
| # IMAGE LOADING AND PREPROCESSING | |
| # =========================================================================== | |
def preprocess_image(image_path: str) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Read an image from disk and prepare it for segmentation.

    Returns:
        (original_rgb, grayscale, blurred)
    Raises:
        FileNotFoundError: if OpenCV cannot read/decode the file.
    """
    bgr = cv2.imread(image_path)
    if bgr is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    grayscale = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)
    smoothed = cv2.GaussianBlur(grayscale, (5, 5), 0)
    return rgb, grayscale, smoothed
def apply_ct_window(hu_image: np.ndarray, center: float, width: float) -> np.ndarray:
    """Map HU values to 8-bit grayscale using a window center/width pair."""
    half = width / 2.0
    lo, hi = center - half, center + half
    clipped = np.clip(hu_image, lo, hi)
    # Linear rescale of the window to [0, 255]; truncate to uint8.
    return ((clipped - lo) / (hi - lo) * 255.0).astype(np.uint8)
def load_dicom(filepath: str) -> Tuple[np.ndarray, dict]:
    """
    Read a DICOM file and return (HU image, header metadata).

    Pixel values are rescaled to Hounsfield units via the header's
    RescaleSlope/RescaleIntercept (defaulting to 1/0 when absent).

    Raises:
        ImportError: if pydicom is not installed.
    """
    try:
        import pydicom
    except ImportError:
        raise ImportError("pydicom required for DICOM. Install: pip install pydicom")
    dataset = pydicom.dcmread(filepath)
    raw = dataset.pixel_array.astype(np.float64)
    rescale_slope = float(getattr(dataset, "RescaleSlope", 1))
    rescale_intercept = float(getattr(dataset, "RescaleIntercept", 0))
    hu_image = (raw * rescale_slope + rescale_intercept).astype(np.int16)
    row_col_spacing = list(getattr(dataset, "PixelSpacing", [1.0, 1.0]))
    metadata = {
        "patient_id": str(getattr(dataset, "PatientID", "")),
        "modality": str(getattr(dataset, "Modality", "")),
        "series_description": str(getattr(dataset, "SeriesDescription", "")),
        "slice_thickness": float(getattr(dataset, "SliceThickness", 0)),
        "pixel_spacing_mm": [float(s) for s in row_col_spacing],
        "rows": int(dataset.Rows),
        "columns": int(dataset.Columns),
    }
    return hu_image, metadata
| # =========================================================================== | |
| # CORE SEGMENTATION PRIMITIVES | |
| # =========================================================================== | |
def create_roi_mask(blurred: np.ndarray, threshold: int = 15) -> np.ndarray:
    """Binary ROI mask separating tissue from the dark background."""
    _, foreground = cv2.threshold(blurred, threshold, 255, cv2.THRESH_BINARY)
    # Aggressive closing fills internal holes so the ROI is one solid region.
    closing_kernel = np.ones((10, 10), np.uint8)
    return cv2.morphologyEx(foreground, cv2.MORPH_CLOSE, closing_kernel, iterations=3)
def morphological_cleanup(
    mask: np.ndarray,
    kernel_size: int = 5,
    close_iter: int = 2,
    open_iter: int = 2,
) -> np.ndarray:
    """Smooth a binary mask: close holes first, then open to drop speckle."""
    se = np.ones((kernel_size, kernel_size), np.uint8)
    closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, se, iterations=close_iter)
    return cv2.morphologyEx(closed, cv2.MORPH_OPEN, se, iterations=open_iter)
def filter_by_area(mask: np.ndarray, min_area: int = 300) -> np.ndarray:
    """Keep only connected components whose contour area exceeds min_area."""
    found, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    kept = np.zeros_like(mask)
    large = [c for c in found if cv2.contourArea(c) > min_area]
    if large:
        # Redraw the surviving components filled onto a fresh mask.
        cv2.drawContours(kept, large, -1, 255, -1)
    return kept
def segment_adaptive(
    gray: np.ndarray,
    block_size: int = 51,
    C: int = 10,
    roi_mask: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Adaptive (Gaussian) thresholding, robust to intensity inhomogeneity."""
    # adaptiveThreshold requires an odd neighborhood size.
    odd_block = block_size + 1 if block_size % 2 == 0 else block_size
    result = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, odd_block, C
    )
    return result if roi_mask is None else cv2.bitwise_and(result, roi_mask)
def segment_otsu(
    gray: np.ndarray,
    roi_mask: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, float]:
    """Global Otsu thresholding. Returns (binary mask, chosen threshold)."""
    smoothed = cv2.GaussianBlur(gray, (5, 5), 0)
    otsu_val, binary = cv2.threshold(smoothed, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    if roi_mask is not None:
        binary = cv2.bitwise_and(binary, roi_mask)
    return binary, float(otsu_val)
def region_growing(
    gray: np.ndarray,
    seed: Tuple[int, int],
    tolerance: int = 15,
    roi_mask: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Flood-fill style region growing around a seed pixel.

    Pixels whose intensity lies within +/- tolerance of the seed intensity
    (and inside roi_mask, if given) are accepted; growth proceeds with
    8-connectivity from accepted pixels only.

    Args:
        gray: Grayscale image (uint8)
        seed: (x, y) seed coordinate
        tolerance: Allowed intensity deviation from the seed value
        roi_mask: Optional mask restricting growth (0 = blocked)
    Returns:
        Binary mask (uint8, 0 or 255) of the grown region.
    """
    rows, cols = gray.shape[:2]
    seed_x, seed_y = seed
    center = int(gray[seed_y, seed_x])
    lo = max(0, center - tolerance)
    hi = min(255, center + tolerance)
    seen = np.zeros((rows, cols), dtype=np.uint8)
    region = np.zeros((rows, cols), dtype=np.uint8)
    offsets = ((-1, -1), (-1, 0), (-1, 1), (0, -1),
               (0, 1), (1, -1), (1, 0), (1, 1))
    frontier = [(seed_x, seed_y)]
    seen[seed_y, seed_x] = 1
    while frontier:
        x, y = frontier.pop()
        # Guard clauses: rejected pixels neither join the region nor expand.
        if not (lo <= int(gray[y, x]) <= hi):
            continue
        if roi_mask is not None and roi_mask[y, x] == 0:
            continue
        region[y, x] = 255
        for dx, dy in offsets:
            nx, ny = x + dx, y + dy
            if 0 <= nx < cols and 0 <= ny < rows and not seen[ny, nx]:
                seen[ny, nx] = 1
                frontier.append((nx, ny))
    return region
def watershed_segment(
    gray: np.ndarray,
    roi_mask: Optional[np.ndarray] = None,
    min_distance: int = 20,
    threshold_ratio: float = 0.5,
) -> Tuple[np.ndarray, int]:
    """Watershed segmentation. Returns (label_image, num_labels).

    Marker-based watershed: sure-foreground seeds come from thresholding the
    distance transform of the ROI, sure background from a dilation of the ROI,
    and the band in between is labeled 0 ("unknown") so watershed assigns it.
    Boundary pixels come back as -1 in the returned label image.

    NOTE(review): ``min_distance`` is currently unused -- seed separation is
    controlled only by ``threshold_ratio``. ``num_labels`` is the raw
    connectedComponents count and therefore includes the background label.
    """
    if roi_mask is None:
        # Fall back to an Otsu foreground mask when no ROI is supplied.
        _, roi_mask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    dist = cv2.distanceTransform(roi_mask, cv2.DIST_L2, 5)
    # Pixels far from any boundary (relative to the global max) are sure foreground.
    _, sure_fg = cv2.threshold(dist, threshold_ratio * dist.max(), 255, 0)
    sure_fg = sure_fg.astype(np.uint8)
    kernel = np.ones((3, 3), np.uint8)
    sure_bg = cv2.dilate(roi_mask, kernel, iterations=3)
    unknown = cv2.subtract(sure_bg, sure_fg)
    num_labels, markers = cv2.connectedComponents(sure_fg)
    # Shift labels so background becomes 1; cv2.watershed treats 0 as "unknown".
    markers = markers + 1
    markers[unknown == 255] = 0
    # watershed requires a 3-channel image.
    img_color = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
    markers = cv2.watershed(img_color, markers)
    return markers, num_labels
def detect_edges_canny(
    gray: np.ndarray, low: int = 50, high: int = 150,
    roi_mask: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Canny edges on a Gaussian-smoothed image, optionally ROI-restricted."""
    smoothed = cv2.GaussianBlur(gray, (5, 5), 0)
    edge_map = cv2.Canny(smoothed, low, high)
    if roi_mask is None:
        return edge_map
    return cv2.bitwise_and(edge_map, roi_mask)
def detect_edges_sobel(
    gray: np.ndarray, ksize: int = 3,
    roi_mask: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Sobel gradient magnitude normalized into uint8 [0, 255]."""
    smoothed = cv2.GaussianBlur(gray, (5, 5), 0)
    grad_x = cv2.Sobel(smoothed, cv2.CV_64F, 1, 0, ksize=ksize)
    grad_y = cv2.Sobel(smoothed, cv2.CV_64F, 0, 1, ksize=ksize)
    magnitude = np.sqrt(grad_x ** 2 + grad_y ** 2)
    peak = magnitude.max()
    if peak > 0:
        # Rescale so the strongest gradient maps to 255.
        magnitude = (magnitude / peak * 255).astype(np.uint8)
    else:
        magnitude = magnitude.astype(np.uint8)
    if roi_mask is not None:
        magnitude = cv2.bitwise_and(magnitude, magnitude, mask=roi_mask)
    return magnitude
| # =========================================================================== | |
| # VENTRICLE SEGMENTATION | |
| # =========================================================================== | |
def segment_ventricles(
    gray: np.ndarray,
    modality: Modality,
    roi_mask: Optional[np.ndarray] = None,
    custom_thresholds: Optional[Dict] = None,
) -> np.ndarray:
    """
    Segment ventricular CSF on any supported modality.

    The DARK/BRIGHT branches in the previous version executed the exact same
    inRange call, so the dead duplication has been collapsed: the per-modality
    bounds in VENTRICLE_THRESHOLDS already select the dark (CT/T1/FLAIR) or
    bright (T2) end of the intensity range.

    Args:
        gray: Grayscale image (uint8)
        modality: Imaging modality
        roi_mask: Optional brain ROI mask; derived from the image if omitted
        custom_thresholds: Optional dict with 'csf_low' and 'csf_high'
    Returns:
        Binary ventricle mask (uint8, 0 or 255)
    """
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    if roi_mask is None:
        roi_mask = create_roi_mask(blurred, threshold=15)
    thresh = custom_thresholds or VENTRICLE_THRESHOLDS[modality]
    # Single band threshold covers both CSF appearances (see docstring).
    mask = cv2.inRange(blurred, thresh["csf_low"], thresh["csf_high"])
    mask = cv2.bitwise_and(mask, roi_mask)
    mask = morphological_cleanup(mask, kernel_size=5, close_iter=3, open_iter=2)
    mask = filter_by_area(mask, min_area=300)
    return mask
def segment_skull(
    gray: np.ndarray,
    threshold: int = 200,
) -> np.ndarray:
    """
    Segment bright bone for skull-boundary / diameter measurement.

    Intended for CT (hyperdense bone) or any modality where the skull is
    among the brightest structures.
    """
    _, bright = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY)
    cleaned = morphological_cleanup(bright, kernel_size=7, close_iter=3, open_iter=1)
    return filter_by_area(cleaned, min_area=1000)
| # =========================================================================== | |
| # NPH BIOMARKER COMPUTATIONS | |
| # =========================================================================== | |
def compute_evans_index(
    ventricle_mask: np.ndarray,
    skull_mask: Optional[np.ndarray] = None,
    image_width: Optional[int] = None,
    pixel_spacing_mm: Optional[float] = None,
) -> Dict:
    """
    Compute Evans' Index (max frontal-horn width / inner skull diameter).

    When no skull mask is supplied, image_width (or the mask width) stands
    in for the skull diameter, which is less accurate.

    Args:
        ventricle_mask: Binary ventricle mask (uint8)
        skull_mask: Optional binary skull mask
        image_width: Fallback skull diameter in pixels
        pixel_spacing_mm: Optional spacing for mm conversion
    Returns:
        Dict with 'evans_index', 'frontal_horn_width_px',
        'skull_diameter_px', 'measurement_row', plus '_mm' keys if
        pixel_spacing_mm is provided.
    """
    h, w = ventricle_mask.shape[:2]
    # Widest horizontal ventricular extent across all rows approximates the
    # axial level of maximal frontal horn separation.
    best_width, best_row = 0, 0
    for y in range(h):
        xs = np.where(ventricle_mask[y, :] > 0)[0]
        if xs.size and xs[-1] - xs[0] > best_width:
            best_width = xs[-1] - xs[0]
            best_row = y
    # Skull diameter at the measurement row, with graceful fallbacks.
    skull_diameter = image_width or w
    if skull_mask is not None:
        skull_xs = np.where(skull_mask[best_row, :] > 0)[0]
        if skull_xs.size > 1:
            skull_diameter = skull_xs[-1] - skull_xs[0]
    if skull_diameter == 0:
        skull_diameter = w
    measurements = {
        "evans_index": round(best_width / skull_diameter, 4),
        "frontal_horn_width_px": best_width,
        "skull_diameter_px": skull_diameter,
        "measurement_row": best_row,
    }
    if pixel_spacing_mm is not None:
        measurements["frontal_horn_width_mm"] = round(best_width * pixel_spacing_mm, 2)
        measurements["skull_diameter_mm"] = round(skull_diameter * pixel_spacing_mm, 2)
    return measurements
| def compute_callosal_angle( | |
| ventricle_mask: np.ndarray, | |
| ) -> Dict: | |
| """ | |
| Estimate callosal angle from a coronal ventricle mask. | |
| Finds the two lateral ventricle peaks and the midline apex, | |
| then computes the angle between the two roof lines. | |
| Args: | |
| ventricle_mask: Binary ventricle mask on a coronal slice (uint8) | |
| Returns: | |
| Dict with 'callosal_angle_deg', 'apex_point', 'left_point', 'right_point' | |
| """ | |
| h, w = ventricle_mask.shape[:2] | |
| midline_x = w // 2 | |
| # Find the topmost ventricle row (highest point of ventricles) | |
| rows_with_csf = np.where(ventricle_mask.any(axis=1))[0] | |
| if len(rows_with_csf) == 0: | |
| return {"callosal_angle_deg": None, "error": "No ventricles detected"} | |
| top_row = rows_with_csf[0] | |
| # Find apex: topmost point near midline | |
| midline_band = ventricle_mask[:, max(0, midline_x - 20):min(w, midline_x + 20)] | |
| midline_rows = np.where(midline_band.any(axis=1))[0] | |
| if len(midline_rows) == 0: | |
| # No midline CSF -- use topmost row | |
| apex_y = top_row | |
| apex_x = midline_x | |
| else: | |
| apex_y = midline_rows[0] | |
| apex_col_in_band = np.where(midline_band[apex_y, :] > 0)[0] | |
| apex_x = midline_x - 20 + int(np.mean(apex_col_in_band)) | |
| # Find the topmost point of left ventricle (left of midline) | |
| left_mask = ventricle_mask[:, :midline_x] | |
| left_rows = np.where(left_mask.any(axis=1))[0] | |
| if len(left_rows) == 0: | |
| return {"callosal_angle_deg": None, "error": "Left ventricle not detected"} | |
| left_top_row = left_rows[0] | |
| left_cols = np.where(left_mask[left_top_row, :] > 0)[0] | |
| left_x = int(np.mean(left_cols)) | |
| left_y = left_top_row | |
| # Find the topmost point of right ventricle (right of midline) | |
| right_mask = ventricle_mask[:, midline_x:] | |
| right_rows = np.where(right_mask.any(axis=1))[0] | |
| if len(right_rows) == 0: | |
| return {"callosal_angle_deg": None, "error": "Right ventricle not detected"} | |
| right_top_row = right_rows[0] | |
| right_cols = np.where(right_mask[right_top_row, :] > 0)[0] | |
| right_x = midline_x + int(np.mean(right_cols)) | |
| right_y = right_top_row | |
| # Compute angle at apex between the two lines | |
| vec_left = np.array([left_x - apex_x, left_y - apex_y], dtype=float) | |
| vec_right = np.array([right_x - apex_x, right_y - apex_y], dtype=float) | |
| norm_left = np.linalg.norm(vec_left) | |
| norm_right = np.linalg.norm(vec_right) | |
| if norm_left == 0 or norm_right == 0: | |
| return {"callosal_angle_deg": None, "error": "Degenerate geometry"} | |
| cos_angle = np.dot(vec_left, vec_right) / (norm_left * norm_right) | |
| cos_angle = np.clip(cos_angle, -1.0, 1.0) | |
| angle_deg = np.degrees(np.arccos(cos_angle)) | |
| return { | |
| "callosal_angle_deg": round(angle_deg, 1), | |
| "apex_point": (apex_x, apex_y), | |
| "left_point": (left_x, left_y), | |
| "right_point": (right_x, right_y), | |
| } | |
def compute_temporal_horn_width(
    ventricle_mask: np.ndarray,
    pixel_spacing_mm: Optional[float] = None,
) -> Dict:
    """
    Estimate temporal horn width from an axial ventricle mask.

    Scans the 55%-85% vertical band (approximate temporal horn territory),
    measures the CSF span in each hemisphere per row, and reports the widest
    plausible span (rejecting spans too tiny or wider than a quarter image).

    Returns:
        Dict with 'temporal_horn_width_px' (plus '_mm' when spacing given).
    """
    h, w = ventricle_mask.shape[:2]
    band = ventricle_mask[int(h * 0.55):int(h * 0.85), :]
    half = w // 2
    widest = 0
    for y in range(band.shape[0]):
        xs = np.where(band[y, :] > 0)[0]
        if xs.size == 0:
            continue
        # Each hemisphere separately: temporal horns are small clusters.
        for side in (xs[xs < half], xs[xs >= half]):
            if side.size:
                span = side[-1] - side[0]
                if 3 < span < w // 4:
                    widest = max(widest, span)
    out = {"temporal_horn_width_px": widest}
    if pixel_spacing_mm is not None:
        out["temporal_horn_width_mm"] = round(widest * pixel_spacing_mm, 2)
    return out
def compute_third_ventricle_width(
    ventricle_mask: np.ndarray,
    pixel_spacing_mm: Optional[float] = None,
) -> Dict:
    """
    Estimate third ventricle width from an axial ventricle mask.

    Examines a 60-px midline strip for a narrow CSF structure and reports
    the widest plausible horizontal span (between 2 and 60 px).

    Returns:
        Dict with 'third_ventricle_width_px' (plus '_mm' when spacing given).
    """
    h, w = ventricle_mask.shape[:2]
    center = w // 2
    strip = ventricle_mask[:, max(0, center - 30):min(w, center + 30)]
    widest = 0
    for y in range(strip.shape[0]):
        xs = np.where(strip[y, :] > 0)[0]
        if xs.size:
            span = xs[-1] - xs[0]
            if 2 < span < 60:
                widest = max(widest, span)
    out = {"third_ventricle_width_px": widest}
    if pixel_spacing_mm is not None:
        out["third_ventricle_width_mm"] = round(widest * pixel_spacing_mm, 2)
    return out
def score_pvh(
    flair_gray: np.ndarray,
    ventricle_mask: np.ndarray,
    dilation_px: int = 15,
    pvh_threshold: int = PVH_THRESHOLD,
) -> Dict:
    """
    Grade periventricular hyperintensity (PVH) on a FLAIR slice.

    A periventricular rim is formed by dilating the ventricle mask and
    subtracting the ventricles; the hyperintense fraction of that rim
    determines a Fazekas-like 0-3 grade.

    Args:
        flair_gray: FLAIR grayscale image (uint8)
        ventricle_mask: Binary ventricle mask
        dilation_px: Rim thickness in pixels
        pvh_threshold: Intensity cutoff for "hyperintense"
    Returns:
        Dict with 'pvh_grade' (0-3), 'pvh_ratio', 'pvh_area_px',
        'periventricular_zone_area_px', and the binary 'pvh_mask'.
    """
    rim_kernel = np.ones((dilation_px, dilation_px), np.uint8)
    dilated = cv2.dilate(ventricle_mask, rim_kernel, iterations=1)
    rim = cv2.subtract(dilated, ventricle_mask)
    # Hyperintense pixels restricted to the periventricular rim.
    bright = cv2.inRange(flair_gray, pvh_threshold, 255)
    bright_in_rim = cv2.bitwise_and(bright, rim)
    rim_area = (rim > 0).sum()
    bright_area = (bright_in_rim > 0).sum()
    ratio = bright_area / rim_area if rim_area > 0 else 0.0
    # Fazekas-like grading by hyperintense fraction of the rim.
    if ratio < 0.05:
        grade = 0  # No significant PVH
    elif ratio < 0.15:
        grade = 1  # Pencil-thin rim
    elif ratio < 0.35:
        grade = 2  # Smooth halo
    else:
        grade = 3  # Irregular, extending into deep white matter
    return {
        "pvh_grade": grade,
        "pvh_ratio": round(ratio, 4),
        "pvh_area_px": int(bright_area),
        "periventricular_zone_area_px": int(rim_area),
        "pvh_mask": bright_in_rim,
    }
def assess_desh(
    ventricle_mask: np.ndarray,
    gray: np.ndarray,
    roi_mask: np.ndarray,
    modality: Modality,
    pixel_spacing_mm: Optional[float] = None,
) -> Dict:
    """
    Assess DESH (Disproportionately Enlarged Subarachnoid-space Hydrocephalus).

    Combines three components: ventriculomegaly (Evans' index), sylvian
    fissure CSF dilation, and high-convexity sulcal effacement.
    DESH-positive = ventriculomegaly + markedly disproportionate sylvian
    CSF + tight high convexity.

    Args:
        ventricle_mask: Binary ventricle mask
        gray: Grayscale image
        roi_mask: Brain ROI mask
        modality: Imaging modality (selects CSF thresholds)
        pixel_spacing_mm: Optional pixel spacing for mm conversion
    Returns:
        Dict with component scores, the overall DESH flag, and the sylvian /
        convexity CSF masks used for scoring.
    """
    h, w = gray.shape[:2]
    # --- Component 1: ventriculomegaly via Evans' index -------------------
    ei_data = compute_evans_index(ventricle_mask, image_width=w, pixel_spacing_mm=pixel_spacing_mm)
    ei = ei_data["evans_index"]
    if ei < 0.3:
        vm_score = 0   # normal
    elif ei <= 0.33:
        vm_score = 1   # borderline
    else:
        vm_score = 2   # clearly enlarged
    # --- CSF segmentation (ventricular + sulcal) --------------------------
    thresh = VENTRICLE_THRESHOLDS[modality]
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    all_csf = cv2.inRange(blurred, thresh["csf_low"], thresh["csf_high"])
    all_csf = cv2.bitwise_and(all_csf, roi_mask)
    # Sulcal CSF = all CSF minus the ventricles.
    sulcal_csf = cv2.subtract(all_csf, ventricle_mask)
    sulcal_csf = morphological_cleanup(sulcal_csf, kernel_size=3, close_iter=1, open_iter=1)
    # --- Component 2: sylvian fissure dilation ----------------------------
    # Sylvian region: lateral 30% strips in the middle third of the image.
    # (Previous version also painted narrower 15% strips first; those writes
    # were fully covered by the 30% strips and have been removed.)
    sylvian_region = np.zeros_like(gray, dtype=np.uint8)
    y_start, y_end = int(h * 0.35), int(h * 0.65)
    sylvian_region[y_start:y_end, :int(w * 0.3)] = 255
    sylvian_region[y_start:y_end, int(w * 0.7):] = 255
    sylvian_csf = cv2.bitwise_and(sulcal_csf, sylvian_region)
    sylvian_csf_area = (sylvian_csf > 0).sum()
    # High-convexity region: top 25% of the image.
    convexity_region = np.zeros_like(gray, dtype=np.uint8)
    convexity_region[:int(h * 0.25), :] = 255
    convexity_csf = cv2.bitwise_and(sulcal_csf, convexity_region)
    convexity_csf_area = (convexity_csf > 0).sum()
    # Sylvian/convexity ratio (inf when only sylvian CSF exists).
    if convexity_csf_area > 0:
        ratio = sylvian_csf_area / convexity_csf_area
    else:
        ratio = float("inf") if sylvian_csf_area > 0 else 0.0
    if ratio < 1.5:
        sylvian_score = 0   # Proportionate
    elif ratio < 3.0:
        sylvian_score = 1   # Mildly disproportionate
    else:
        sylvian_score = 2   # Markedly disproportionate (DESH pattern)
    # --- Component 3: high-convexity tightness ----------------------------
    brain_top_area = (roi_mask[:int(h * 0.25), :] > 0).sum()
    convexity_ratio = convexity_csf_area / brain_top_area if brain_top_area > 0 else 0
    if convexity_ratio > 0.1:
        convexity_score = 0   # Normal sulci
    elif convexity_ratio > 0.04:
        convexity_score = 1   # Mildly tight
    else:
        convexity_score = 2   # Effaced (tight high convexity)
    # DESH positive requires ventriculomegaly plus marked sylvian dilation
    # plus convexity effacement; total_score is reported as a softer metric.
    total_score = vm_score + sylvian_score + convexity_score
    is_desh_positive = (vm_score >= 1 and sylvian_score >= 2 and convexity_score >= 2)
    return {
        "is_desh_positive": is_desh_positive,
        "total_score": total_score,
        "ventriculomegaly_score": vm_score,
        "sylvian_dilation_score": sylvian_score,
        "convexity_tightness_score": convexity_score,
        "evans_index": ei,
        "sylvian_convexity_ratio": round(ratio, 2) if ratio != float("inf") else "inf",
        "sylvian_csf_area_px": int(sylvian_csf_area),
        "convexity_csf_area_px": int(convexity_csf_area),
        "sylvian_mask": sylvian_csf,
        "convexity_mask": convexity_csf,
    }
| # =========================================================================== | |
| # FOUNDATION MODEL WRAPPERS | |
| # =========================================================================== | |
def sam_segment(
    image: np.ndarray,
    points: List[Tuple[int, int]],
    labels: List[int],
    checkpoint: str = "sam_vit_h.pth",
    model_type: str = "vit_h",
) -> np.ndarray:
    """SAM point-prompt segmentation. Requires segment-anything.

    Args:
        image: Image array handed to SamPredictor.set_image.
        points: Prompt coordinates, passed through as point_coords.
        labels: Prompt labels, passed through as point_labels
            (per SAM's API: 1 = foreground, 0 = background).
        checkpoint: Path to the SAM checkpoint weights.
        model_type: Backbone key into sam_model_registry.
    Returns:
        Binary mask (uint8, 0/255) of the highest-scoring SAM proposal.
    Raises:
        ImportError: if segment-anything is not installed.

    NOTE(review): the model is rebuilt from the checkpoint on every call;
    cache the predictor if this is invoked repeatedly.
    """
    try:
        from segment_anything import SamPredictor, sam_model_registry
    except ImportError:
        raise ImportError("segment-anything required. See: https://github.com/facebookresearch/segment-anything")
    sam = sam_model_registry[model_type](checkpoint=checkpoint)
    predictor = SamPredictor(sam)
    predictor.set_image(image)
    masks, scores, _ = predictor.predict(
        point_coords=np.array(points), point_labels=np.array(labels), multimask_output=True
    )
    # Keep only the proposal SAM scored highest; scale boolean mask to 0/255.
    return (masks[np.argmax(scores)].astype(np.uint8) * 255)
def medsam_segment(
    image: np.ndarray,
    bbox: Tuple[int, int, int, int],
    checkpoint: Optional[str] = None,
) -> np.ndarray:
    """MedSAM bounding-box segmentation. Requires medsam.

    Args:
        image: Input image array, passed straight to the predictor.
        bbox: Bounding-box prompt -- presumably (x1, y1, x2, y2); confirm
            against the MedSAM API before relying on the ordering.
        checkpoint: Optional path to MedSAM weights.
    Returns:
        Binary mask (uint8, 0/255).
    Raises:
        ImportError: if MedSAM is not installed.
    """
    try:
        from medsam import MedSAMPredictor
    except ImportError:
        raise ImportError("MedSAM required. See: https://github.com/bowang-lab/MedSAM")
    predictor = MedSAMPredictor(checkpoint=checkpoint)
    return (predictor.predict(image, bbox).astype(np.uint8) * 255)
| # =========================================================================== | |
| # VISUALIZATION | |
| # =========================================================================== | |
def create_overlay(
    img_rgb: np.ndarray,
    masks: Dict[str, np.ndarray],
    alpha: float = 0.45,
    draw_contours: bool = True,
) -> np.ndarray:
    """Blend colored structure masks onto the image, optionally outlining them.

    Args:
        img_rgb: Source RGB image.
        masks: Structure name -> binary mask; colors looked up in COLORS
            (unknown names fall back to light gray).
        alpha: Blend weight of the color layer inside masked pixels.
        draw_contours: Whether to trace 2-px outlines around regions with
            contour area above 100 px.
    Returns:
        RGB image with masks alpha-blended and (optionally) outlined.
    """
    # Paint each structure's solid color onto a copy; later masks win overlaps.
    painted = img_rgb.copy()
    for name, mask in masks.items():
        painted[mask > 0] = COLORS.get(name, (200, 200, 200))
    # Union of all masks marks where blending applies.
    combined = np.zeros(img_rgb.shape[:2], dtype=np.uint8)
    for mask in masks.values():
        combined = cv2.bitwise_or(combined, mask)
    blended = img_rgb.copy()
    inside = combined > 0
    for channel in range(3):
        mixed = (alpha * painted[:, :, channel] + (1 - alpha) * img_rgb[:, :, channel]).astype(np.uint8)
        blended[:, :, channel] = np.where(inside, mixed, img_rgb[:, :, channel])
    if draw_contours:
        for name, mask in masks.items():
            outline_color = COLORS.get(name, (200, 200, 200))
            found, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            big = [c for c in found if cv2.contourArea(c) > 100]
            if big:
                cv2.drawContours(blended, big, -1, outline_color, 2)
    return blended
def add_annotations(
    image: np.ndarray,
    masks: Dict[str, np.ndarray],
    title: str = "NPH Segmentation",
    biomarkers: Optional[Dict] = None,
) -> np.ndarray:
    """Add title, legend, and optionally biomarker values to image.

    Draws with Pillow on the array: a title at the top, a color legend
    (one row per mask name) in the bottom-left, and -- when `biomarkers`
    is given -- a top-right panel summarizing whichever of Evans' index,
    callosal angle, temporal horn width, PVH grade, and DESH status are
    present in the dict.

    Args:
        image: RGB image to annotate.
        masks: Structure name -> mask; only the names are used here (to
            build legend rows with COLORS lookups).
        title: Text drawn at the top of the image.
        biomarkers: Optional dict with keys as produced by the compute_* /
            score_* / assess_* functions.
    Returns:
        Annotated image as a numpy array.
    """
    pil_img = Image.fromarray(image)
    draw = ImageDraw.Draw(pil_img)
    height, width = image.shape[:2]
    # Prefer DejaVu fonts (common Linux path); fall back to PIL's builtin.
    try:
        font_title = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 18)
        font_label = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 13)
        font_small = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 12)
    except Exception:
        font_title = font_label = font_small = ImageFont.load_default()
    # Title (fixed offset is a rough centering heuristic)
    draw.text((width // 2 - 80, 8), title, fill=(255, 255, 255), font=font_title)
    # Legend (bottom-left): 28 px reserved per mask entry.
    # NOTE(review): some fills carry an alpha component but the canvas is an
    # RGB (not RGBA) image, so no alpha compositing happens -- confirm the
    # intended translucency.
    num_items = len(masks)
    legend_x, legend_y = 12, height - 28 * num_items - 20
    box_height = 28 * num_items + 12
    draw.rectangle(
        [(legend_x, legend_y), (legend_x + 180, legend_y + box_height)],
        fill=(0, 0, 0, 180), outline=(150, 150, 150),
    )
    y = legend_y + 8
    for name in masks.keys():
        color = COLORS.get(name, (200, 200, 200))
        draw.rectangle([(legend_x + 8, y), (legend_x + 24, y + 14)], fill=color, outline=(200, 200, 200))
        draw.text((legend_x + 32, y - 1), name.replace("_", " ").title(), fill=(255, 255, 255), font=font_label)
        y += 24
    # Biomarker panel (top-right): only keys present in the dict are rendered.
    if biomarkers:
        bm_x = width - 220
        bm_y = 35
        bm_items = []
        if "evans_index" in biomarkers:
            ei = biomarkers["evans_index"]
            # Evans' index > 0.3 flagged as abnormal.
            status = "ABNORMAL" if ei > 0.3 else "normal"
            bm_items.append(f"Evans' Index: {ei:.3f} ({status})")
        if "callosal_angle_deg" in biomarkers and biomarkers["callosal_angle_deg"] is not None:
            ca = biomarkers["callosal_angle_deg"]
            bm_items.append(f"Callosal Angle: {ca:.1f} deg")
        if "temporal_horn_width_px" in biomarkers:
            thw = biomarkers["temporal_horn_width_px"]
            bm_items.append(f"Temporal Horn: {thw} px")
        if "pvh_grade" in biomarkers:
            bm_items.append(f"PVH Grade: {biomarkers['pvh_grade']}/3")
        if "is_desh_positive" in biomarkers:
            desh = "POSITIVE" if biomarkers["is_desh_positive"] else "negative"
            bm_items.append(f"DESH: {desh}")
        if bm_items:
            # Panel height grows with 20 px per rendered line.
            box_h = 20 * len(bm_items) + 12
            draw.rectangle(
                [(bm_x - 5, bm_y - 5), (bm_x + 210, bm_y + box_h)],
                fill=(0, 0, 0, 200), outline=(100, 200, 255),
            )
            for i, text in enumerate(bm_items):
                draw.text((bm_x + 3, bm_y + 3 + i * 20), text, fill=(220, 240, 255), font=font_small)
    return np.array(pil_img)
def create_comparison(
    original: np.ndarray,
    segmented: np.ndarray,
    title: str = "Original vs NPH Segmentation",
) -> np.ndarray:
    """Render the original and segmented images side by side on one canvas.

    A dark canvas is allocated with a 20-px gap between panels and a 60-px
    header strip that holds the title and the two panel captions.

    Args:
        original: RGB image (H, W, 3); the left panel.
        segmented: RGB image of the same size; the right panel.
        title: Caption drawn centered at the very top of the canvas.

    Returns:
        The composed comparison image as a uint8 RGB array.
    """
    h, w = original.shape[:2]
    margin = 20
    header = 60
    canvas = np.full((h + header, 2 * w + margin, 3), (25, 25, 30), dtype=np.uint8)
    # Panels start 55 px down, leaving the strip above free for text.
    canvas[55:55 + h, :w] = original
    canvas[55:55 + h, w + margin:] = segmented
    img = Image.fromarray(canvas)
    pen = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 18)
    except Exception:
        # Headless/containers without DejaVu fall back to PIL's bitmap font.
        font = ImageFont.load_default()
    pen.text((w // 2 - 30, 20), "Original", fill=(200, 200, 200), font=font)
    pen.text((w + margin + w // 2 - 60, 20), "NPH Analysis", fill=(200, 200, 200), font=font)
    pen.text(((2 * w + margin) // 2 - 100, 2), title, fill=(255, 255, 255), font=font)
    return np.array(img)
| # =========================================================================== | |
| # QUALITY METRICS | |
| # =========================================================================== | |
def dice_coefficient(pred: np.ndarray, gt: np.ndarray) -> float:
    """Dice similarity coefficient between a predicted and a reference mask.

    Any nonzero pixel counts as foreground, so uint8 {0, 255} masks and
    boolean masks behave identically.

    Args:
        pred: Predicted binary mask (any numeric dtype; nonzero = foreground).
        gt: Ground-truth binary mask, same shape as ``pred``.

    Returns:
        Dice score in [0, 1] as a plain ``float``. Two empty masks score
        1.0 (perfect agreement by convention, avoiding 0/0).
    """
    # `> 0` already yields a boolean array; the original `.astype(bool)`
    # was a no-op. count_nonzero returns Python ints, so the division
    # below produces a plain float.
    p = pred > 0
    g = gt > 0
    inter = np.count_nonzero(p & g)
    total = np.count_nonzero(p) + np.count_nonzero(g)
    return 2.0 * inter / total if total > 0 else 1.0
def iou_score(pred: np.ndarray, gt: np.ndarray) -> float:
    """Intersection over Union (Jaccard index) between two binary masks.

    Any nonzero pixel counts as foreground.

    Args:
        pred: Predicted binary mask (any numeric dtype; nonzero = foreground).
        gt: Ground-truth binary mask, same shape as ``pred``.

    Returns:
        IoU in [0, 1] as a plain ``float``. Two empty masks score 1.0
        (perfect agreement by convention, avoiding 0/0).
    """
    # `> 0` already yields a boolean array; the original `.astype(bool)`
    # was a no-op, and Python 3 `/` already performs true division so the
    # explicit float() casts were redundant.
    p = pred > 0
    g = gt > 0
    inter = np.count_nonzero(p & g)
    union = np.count_nonzero(p | g)
    return inter / union if union > 0 else 1.0
| # =========================================================================== | |
| # INTERNAL HELPERS | |
| # =========================================================================== | |
def _extract_contours(masks: Dict[str, np.ndarray], min_area: int = 200) -> Dict[str, List]:
    """Find external contours for each mask, keeping only those larger
    than ``min_area`` pixels, as JSON-serializable nested lists."""
    contours: Dict[str, List] = {}
    for label, binary in masks.items():
        found, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        kept = []
        for contour in found:
            # Tiny specks are almost certainly segmentation noise.
            if cv2.contourArea(contour) > min_area:
                kept.append(contour.tolist())
        contours[label] = kept
    return contours
| # =========================================================================== | |
| # MAIN NPH PIPELINE | |
| # =========================================================================== | |
def segment_nph(
    image_path: str,
    modality: str = "CT_HEAD",
    structures: Optional[List[str]] = None,
    output_path: Optional[str] = None,
    pixel_spacing_mm: Optional[float] = None,
    compute_biomarkers: bool = True,
) -> SegmentationResult:
    """
    Main entry point for NPH neuroimaging segmentation and analysis.

    Pipeline: load (DICOM or plain image) -> head ROI mask -> ventricle
    segmentation (always) -> optional parenchyma / PVH / skull masks ->
    NPH biomarkers (Evans' index, temporal horn width, third ventricle
    width, PVH grade, DESH assessment) -> annotated overlay + contours.

    Args:
        image_path: Path to input image (DICOM, PNG, JPG). DICOM is
            detected by a ".dcm"/".dicom" extension only.
        modality: 'CT_HEAD', 'T1', 'T1_GD', 'T2', 'FLAIR'
            (case-insensitive; must name a ``Modality`` enum member).
        structures: Optional list of structures to segment. Defaults to
            ["ventricles", "parenchyma"], plus "pvh" on FLAIR.
        output_path: Optional path to save a comparison image.
        pixel_spacing_mm: Pixel spacing for real-world measurements. For
            DICOM input, falls back to the header value when not given.
        compute_biomarkers: Whether to compute Evans' index, etc.

    Returns:
        SegmentationResult with masks, overlay, contours, and metadata
        including NPH biomarkers. The "skull" mask, when computed, is
        used internally for Evans' index but excluded from the result.

    Raises:
        KeyError: if ``modality`` does not name a ``Modality`` member.
    """
    mod = Modality[modality.upper()]  # KeyError on unrecognized modality
    # Load image
    is_dicom = image_path.lower().endswith((".dcm", ".dicom"))
    dicom_meta = {}
    if is_dicom:
        # Raw HU values plus header metadata; the display image is the
        # HU array rendered through the standard brain window.
        hu, dicom_meta = load_dicom(image_path)
        # Explicit argument wins; otherwise take the header spacing
        # (first component), defaulting to 1.0.
        pixel_spacing_mm = pixel_spacing_mm or dicom_meta.get("pixel_spacing_mm", [1.0])[0]
        center, width_hu = CT_WINDOWS["brain"]
        gray = apply_ct_window(hu, center, width_hu)
        img_rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
    else:
        img_rgb, gray, _ = preprocess_image(image_path)
    # Head ROI: blur suppresses noise before thresholding at 15
    # (presumably separating tissue from background — see create_roi_mask).
    roi_mask = create_roi_mask(cv2.GaussianBlur(gray, (5, 5), 0), threshold=15)
    # Default structures
    if structures is None:
        if mod == Modality.FLAIR:
            # PVH is only segmented on FLAIR (see the guard further down).
            structures = ["ventricles", "pvh", "parenchyma"]
        else:
            structures = ["ventricles", "parenchyma"]
    masks: Dict[str, np.ndarray] = {}
    # Always segment ventricles
    vent_mask = segment_ventricles(gray, mod, roi_mask)
    masks["ventricles"] = vent_mask
    # Parenchyma (brain tissue excluding ventricles)
    if "parenchyma" in structures:
        parenchyma = cv2.bitwise_and(roi_mask, cv2.bitwise_not(vent_mask))
        masks["parenchyma"] = parenchyma
    # PVH on FLAIR
    pvh_data = None
    if "pvh" in structures and mod == Modality.FLAIR:
        pvh_data = score_pvh(gray, vent_mask)
        masks["pvh"] = pvh_data["pvh_mask"]
    # Skull (for Evans' index) — only attempted for DICOM CT, where the
    # raw HU array allows a bone window.
    skull_mask = None
    if is_dicom and mod == Modality.CT_HEAD:
        bone_gray = apply_ct_window(hu, *CT_WINDOWS["bone"])
        skull_mask = segment_skull(bone_gray, threshold=180)
        masks["skull"] = skull_mask
    # Compute biomarkers
    biomarkers = {}
    if compute_biomarkers:
        # Image width is passed alongside skull_mask — presumably as a
        # fallback denominator when skull_mask is None; confirm in
        # compute_evans_index.
        ei_data = compute_evans_index(vent_mask, skull_mask, gray.shape[1], pixel_spacing_mm)
        biomarkers.update(ei_data)
        th_data = compute_temporal_horn_width(vent_mask, pixel_spacing_mm)
        biomarkers.update(th_data)
        tv_data = compute_third_ventricle_width(vent_mask, pixel_spacing_mm)
        biomarkers.update(tv_data)
        if pvh_data:
            biomarkers["pvh_grade"] = pvh_data["pvh_grade"]
            biomarkers["pvh_ratio"] = pvh_data["pvh_ratio"]
        # DESH assessment
        desh_data = assess_desh(vent_mask, gray, roi_mask, mod, pixel_spacing_mm)
        biomarkers["is_desh_positive"] = desh_data["is_desh_positive"]
        biomarkers["desh_total_score"] = desh_data["total_score"]
        # Keep only scalar DESH details; the mask entries are large
        # arrays and would bloat the metadata dict.
        biomarkers["desh_details"] = {
            k: v for k, v in desh_data.items()
            if k not in ("sylvian_mask", "convexity_mask")
        }
    # Remove non-display masks
    display_masks = {k: v for k, v in masks.items() if k != "skull"}
    # Visualization
    overlay = create_overlay(img_rgb, display_masks)
    annotated = add_annotations(overlay, display_masks, f"{modality} - NPH Analysis", biomarkers)
    contours = _extract_contours(display_masks)
    metadata = {
        "modality": modality,
        "structures_found": list(display_masks.keys()),
        "image_shape": img_rgb.shape,
        "pixel_spacing_mm": pixel_spacing_mm,
        "dicom_meta": dicom_meta,
    }
    # Biomarkers are flattened into metadata so callers can read them
    # directly from result.metadata.
    metadata.update(biomarkers)
    result = SegmentationResult(
        masks=display_masks, overlay=annotated, contours=contours, metadata=metadata
    )
    if output_path:
        comparison = create_comparison(img_rgb, annotated, f"{modality} - NPH Analysis")
        Image.fromarray(comparison).save(output_path)
        print(f"Saved: {output_path}")
    return result
# Backward-compatibility alias: older callers import ``segment_brain_image``.
segment_brain_image = segment_nph
| # =========================================================================== | |
| # CLI | |
| # =========================================================================== | |
| if __name__ == "__main__": | |
| import sys | |
| if len(sys.argv) < 2: | |
| print("Usage: python segment_neuroimaging.py <image_path> [modality] [output_path]") | |
| print(" modality: CT_HEAD, T1, T1_GD, T2, FLAIR (default: CT_HEAD)") | |
| sys.exit(1) | |
| image_path = sys.argv[1] | |
| modality = sys.argv[2] if len(sys.argv) > 2 else "CT_HEAD" | |
| output_path = sys.argv[3] if len(sys.argv) > 3 else image_path.replace(".", "_nph_analysis.") | |
| result = segment_nph(image_path, modality, output_path=output_path) | |
| print(f"\n--- NPH Analysis Results ---") | |
| print(f"Structures: {result.metadata['structures_found']}") | |
| ei = result.metadata.get("evans_index") | |
| if ei is not None: | |
| print(f"Evans' Index: {ei:.3f} ({'ABNORMAL (>0.3)' if ei > 0.3 else 'Normal'})") | |
| thw = result.metadata.get("temporal_horn_width_px") | |
| if thw is not None: | |
| print(f"Temporal Horn Width: {thw} px") | |
| pvh = result.metadata.get("pvh_grade") | |
| if pvh is not None: | |
| print(f"PVH Grade: {pvh}/3") | |
| desh = result.metadata.get("is_desh_positive") | |
| if desh is not None: | |
| print(f"DESH Pattern: {'POSITIVE' if desh else 'negative'}") | |