Commit 4335f6a (parent b9e6d2f), committed by mekosotto

docs(plan): add Day-3 MRI ComBat pipeline plan

docs/superpowers/plans/2026-05-01-day3-mri-combat-pipeline.md (added)
# NeuroBridge Day 3 — MRI ComBat Pipeline Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Insider One Hackathon Day 3 — ship a deterministic MRI feature pipeline that reads multi-site NIfTI volumes, masks the brain, applies ComBat harmonization to remove site-level domain shift, and writes ROI statistic features as Parquet.

**Architecture:** Modular `src/pipelines/mri_pipeline.py` mirroring Day 1/2's public-function template: a 3-D volume validity primitive (`is_valid_volume`), a brain masking transformer (`mask_brain`), a site-harmonization step (`harmonize_combat`), a feature-emitting layer (`extract_features_from_volume`), and an I/O orchestrator (`run_pipeline`). All logging goes through `src.core.logger.get_logger`. Output is float64 Parquet per AGENTS.md §6. Tests use deterministic synthetic 3-D NIfTI fixtures generated by `build_mri_fixture.py` (seed=42, 8×8×8 voxels × 6 subjects across 2 simulated sites with a deliberate site-effect bias).

**Tech Stack:** Python 3.10–3.12, `nibabel==5.2.1`, `neuroharmonize==2.4.5`, NumPy, SciPy (`scipy.ndimage` for morphological mask cleanup), Pandas, PyArrow, Pytest.

---

## File Structure

| Path | Responsibility |
|---|---|
| `src/pipelines/mri_pipeline.py` | Public API (`is_valid_volume`, `mask_brain`, `harmonize_combat`, `extract_features_from_volume`, `run_pipeline`) + `DEFAULT_INPUT` / `DEFAULT_OUTPUT` + `__main__` CLI. |
| `tests/pipelines/test_mri_pipeline.py` | Unit + integration tests; one class per public function. |
| `tests/fixtures/build_mri_fixture.py` | Standalone script that regenerates `mri_sample/` (6 NIfTI volumes + a `sites.csv` covariate sheet) deterministically from seed=42. Committed alongside the artifacts. |
| `tests/fixtures/mri_sample/` | 6 deterministic synthetic NIfTI volumes (`subject_{i}.nii.gz`) split across 2 sites, plus `sites.csv` with `subject_id,site` columns. |
| `AGENTS.md` | Update §1 pipeline-table MRI row to remove "(planned, Day 3)" suffix and link the now-shipped file. |
| `README.md` | Update Status table MRI row to "Shipped — N tests green"; add MRI smoke-run block + function-reference table; mark Day 3 done in roadmap. |

`mri_pipeline.py` is expected to land at ~250–300 lines after Task 7. We do not split into submodules at this stage.

---

## Public API contract (defined here so tasks reference one source of truth)

```python
# Default ROI partition: split a (D, H, W) volume into 2×2×2 = 8 octant ROIs.
# Octant index follows binary (z, y, x) ordering: 0..7.
DEFAULT_N_ROI_AXES: tuple[int, int, int] = (2, 2, 2)
ROI_STATS: tuple[str, ...] = ("mean", "std", "p10", "p50", "p90", "voxel_count")

def is_valid_volume(volume: np.ndarray | None) -> bool: ...
def mask_brain(
    volume: np.ndarray,
    intensity_threshold: float | None = None,
) -> np.ndarray: ...
def harmonize_combat(
    features: pd.DataFrame,
    sites: pd.Series,
    feature_cols: list[str],
) -> pd.DataFrame: ...
def extract_features_from_volume(
    volume: np.ndarray,
    mask: np.ndarray,
    n_roi_axes: tuple[int, int, int] = DEFAULT_N_ROI_AXES,
) -> dict[str, float]: ...
def run_pipeline(
    input_dir: Path = DEFAULT_INPUT,
    sites_csv: Path | None = None,
    output_path: Path = DEFAULT_OUTPUT,
    intensity_threshold: float | None = None,
    n_roi_axes: tuple[int, int, int] = DEFAULT_N_ROI_AXES,
) -> None: ...
```

Per-volume feature vector layout: `n_roi * len(ROI_STATS)` floats, where `n_roi = product(n_roi_axes)` (default 8). Column names: `feat_roi{i}_<stat>` for `i in 0..n_roi-1` and `<stat>` in ROI_STATS. Output DataFrame schema: one row per subject, columns `subject_id, site, feat_*`.
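
As a quick check of that layout, the full column list can be derived from the two contract constants alone (a standalone sketch, not part of the module; `feature_columns` is a hypothetical helper name):

```python
import numpy as np

# Mirror the contract constants defined above.
DEFAULT_N_ROI_AXES = (2, 2, 2)
ROI_STATS = ("mean", "std", "p10", "p50", "p90", "voxel_count")


def feature_columns(n_roi_axes=DEFAULT_N_ROI_AXES) -> list[str]:
    """Enumerate the per-volume feature column names in contract order."""
    n_roi = int(np.prod(n_roi_axes))
    return [f"feat_roi{i}_{stat}" for i in range(n_roi) for stat in ROI_STATS]


cols = feature_columns()
assert len(cols) == 48  # 8 ROIs × 6 stats
assert cols[0] == "feat_roi0_mean"
assert cols[-1] == "feat_roi7_voxel_count"
```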

`harmonize_combat` calls `neuroHarmonize.harmonizationLearn`, which uses an EM-style fit. ComBat is deterministic given the same input + covariates (no internal RNG), so byte-determinism holds without seeding.

---

## Task 1: MRI test fixture (deterministic synthetic NIfTI volumes)

**Files:**
- Create: `tests/fixtures/build_mri_fixture.py`
- Create: `tests/fixtures/mri_sample/subject_0.nii.gz` ... `subject_5.nii.gz`
- Create: `tests/fixtures/mri_sample/sites.csv`

- [ ] **Step 1: Write the fixture-builder script**

Create `/Users/mertgungor/Desktop/hackathon/tests/fixtures/build_mri_fixture.py`:
```python
"""Generate deterministic synthetic MRI fixtures for the Day-3 pipeline tests.

Six 8×8×8 NIfTI volumes split across two simulated sites. Each volume is a
spherical "brain" with isotropic Gaussian noise plus a per-site additive bias
that ComBat is expected to remove. The fixture is committed alongside this
script so test runs are reproducible without re-running.

Sites:
- Site A: subject_0, subject_1, subject_2 (bias = +0.0 a.u.)
- Site B: subject_3, subject_4, subject_5 (bias = +5.0 a.u.)

NOTE: byte-determinism of the .nii.gz output is coupled to nibabel==5.2.1
(pinned in requirements.txt) and a fixed nibabel.Nifti1Image header. If the
nibabel pin is upgraded, re-run this script and commit the rebuilt artifacts
alongside the dependency bump.
"""
from __future__ import annotations

import csv
from pathlib import Path

import nibabel as nib
import numpy as np


SITE_A_BIAS = 0.0
SITE_B_BIAS = 5.0
VOLUME_SHAPE = (8, 8, 8)
SUBJECTS = (
    ("subject_0", "A"),
    ("subject_1", "A"),
    ("subject_2", "A"),
    ("subject_3", "B"),
    ("subject_4", "B"),
    ("subject_5", "B"),
)


def _spherical_brain(rng: np.random.Generator, bias: float) -> np.ndarray:
    """Build an 8×8×8 volume: spherical brain (radius 3) + noise + site bias."""
    d, h, w = VOLUME_SHAPE
    z, y, x = np.indices((d, h, w))
    cz, cy, cx = (d - 1) / 2.0, (h - 1) / 2.0, (w - 1) / 2.0
    radius2 = (z - cz) ** 2 + (y - cy) ** 2 + (x - cx) ** 2
    brain_mask = radius2 <= 3.0**2
    # Brain intensity ~10 a.u., background ~0.1 a.u. (so default threshold splits cleanly).
    volume = np.where(brain_mask, 10.0, 0.1).astype(np.float64)
    volume += rng.standard_normal(VOLUME_SHAPE) * 0.5
    volume[brain_mask] += bias
    return volume


def build(out_dir: Path | None = None) -> Path:
    out = out_dir if out_dir is not None else Path(__file__).parent / "mri_sample"
    out.mkdir(parents=True, exist_ok=True)

    rng = np.random.default_rng(seed=42)
    affine = np.eye(4)

    sites_rows: list[tuple[str, str]] = []
    for subject_id, site in SUBJECTS:
        bias = SITE_A_BIAS if site == "A" else SITE_B_BIAS
        volume = _spherical_brain(rng, bias=bias)
        img = nib.Nifti1Image(volume, affine=affine)
        nib.save(img, out / f"{subject_id}.nii.gz")
        sites_rows.append((subject_id, site))

    with (out / "sites.csv").open("w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(["subject_id", "site"])
        writer.writerows(sites_rows)
    return out


if __name__ == "__main__":
    p = build()
    print(f"Wrote MRI fixture to {p}")
```

- [ ] **Step 2: Run the script to generate the artifacts**

```bash
cd /Users/mertgungor/Desktop/hackathon
source .venv312/bin/activate
python tests/fixtures/build_mri_fixture.py
```
Expected stdout: `Wrote MRI fixture to .../tests/fixtures/mri_sample`. Six `.nii.gz` files (each ~1.8 KB) and `sites.csv` are created.

- [ ] **Step 3: Sanity-check the fixture**

```bash
python -c "
import nibabel as nib
import numpy as np
from pathlib import Path

base = Path('tests/fixtures/mri_sample')
img0 = nib.load(base / 'subject_0.nii.gz')
img3 = nib.load(base / 'subject_3.nii.gz')
v0 = img0.get_fdata()
v3 = img3.get_fdata()
print('shape:', v0.shape)
print('site A mean (subject_0 brain voxels):', round(float(v0[v0 > 5].mean()), 2))
print('site B mean (subject_3 brain voxels):', round(float(v3[v3 > 5].mean()), 2))
print('sites.csv exists:', (base / 'sites.csv').exists())
"
```
Expected:
```
shape: (8, 8, 8)
site A mean (subject_0 brain voxels): ~10.0
site B mean (subject_3 brain voxels): ~15.0 # 10 + bias 5
sites.csv exists: True
```

- [ ] **Step 4: Verify byte-determinism**

```bash
md5_before=$(md5 -q tests/fixtures/mri_sample/subject_0.nii.gz 2>/dev/null || md5sum tests/fixtures/mri_sample/subject_0.nii.gz | awk '{print $1}')
python tests/fixtures/build_mri_fixture.py
md5_after=$(md5 -q tests/fixtures/mri_sample/subject_0.nii.gz 2>/dev/null || md5sum tests/fixtures/mri_sample/subject_0.nii.gz | awk '{print $1}')
echo "before: $md5_before"
echo "after: $md5_after"
```
Expected: matching MD5s. (If they differ because a nibabel/gzip header timestamp drifted, fall back to data-equality: `assert np.array_equal(nib.load(a).get_fdata(), nib.load(b).get_fdata())`.)

- [ ] **Step 5: Commit**

```bash
git add tests/fixtures/build_mri_fixture.py tests/fixtures/mri_sample/
git commit -m "test(mri): add deterministic synthetic NIfTI fixture (6 subjects × 2 sites)"
```

---

## Task 2: `is_valid_volume` (TDD)

**Files:**
- Create: `tests/pipelines/test_mri_pipeline.py`
- Create: `src/pipelines/mri_pipeline.py`

- [ ] **Step 1: Write the failing tests**

Create `/Users/mertgungor/Desktop/hackathon/tests/pipelines/test_mri_pipeline.py`:
```python
"""Unit + integration tests for the MRI ComBat pipeline."""
from __future__ import annotations

from pathlib import Path

import numpy as np
import pytest

from src.pipelines.mri_pipeline import is_valid_volume


FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "mri_sample"


class TestIsValidVolume:
    def test_accepts_3d_finite_array(self) -> None:
        vol = np.zeros((8, 8, 8), dtype=np.float64)
        assert is_valid_volume(vol) is True

    def test_rejects_wrong_dimension(self) -> None:
        assert is_valid_volume(np.zeros((8, 8))) is False
        assert is_valid_volume(np.zeros((8, 8, 8, 2))) is False

    def test_rejects_nan(self) -> None:
        vol = np.zeros((8, 8, 8))
        vol[0, 0, 0] = np.nan
        assert is_valid_volume(vol) is False

    def test_rejects_inf(self) -> None:
        vol = np.zeros((8, 8, 8))
        vol[1, 1, 1] = np.inf
        assert is_valid_volume(vol) is False
        vol[1, 1, 1] = -np.inf
        assert is_valid_volume(vol) is False

    def test_rejects_empty(self) -> None:
        assert is_valid_volume(np.zeros((0, 8, 8))) is False
        assert is_valid_volume(np.zeros((8, 0, 8))) is False
        assert is_valid_volume(np.zeros((8, 8, 0))) is False

    def test_rejects_non_numeric_dtype(self) -> None:
        vol = np.array([[["a", "b"], ["c", "d"]]])
        assert is_valid_volume(vol) is False

    def test_rejects_non_array(self) -> None:
        assert is_valid_volume([[[1, 2]], [[3, 4]]]) is False
        assert is_valid_volume(None) is False
```

- [ ] **Step 2: Run tests; they MUST fail**

```bash
cd /Users/mertgungor/Desktop/hackathon
source .venv312/bin/activate
pytest tests/pipelines/test_mri_pipeline.py -v
```
Expected: collection failure with `ModuleNotFoundError: No module named 'src.pipelines.mri_pipeline'`.

- [ ] **Step 3: Implement the module**

Create `/Users/mertgungor/Desktop/hackathon/src/pipelines/mri_pipeline.py`:
```python
"""MRI (magnetic resonance imaging) pipeline.

Loads NIfTI volumes (`.nii` / `.nii.gz`), applies a brain mask, harmonizes
across sites with ComBat (`neuroHarmonize`), and writes per-subject ROI
statistics as a model-ready Parquet at `data/processed/mri_features.parquet`.

Follows the Data Readiness contract in AGENTS.md §4 and the Parquet storage
convention in §6: schema validity, domain validity (drop NaN/inf volumes
with a logged WARNING), determinism (ComBat is RNG-free given fixed input),
traceability (in/out/dropped counts at INFO), and idempotent overwrite.
"""
from __future__ import annotations

import os

# Pin BLAS / OpenMP to single-threaded mode so byte-determinism (AGENTS.md §4
# rule 3) holds across hardware. These env vars are read when the numeric
# libraries load, so they MUST be set before the numpy/pyarrow imports below.
# Without this, multi-threaded floating-point reductions can reorder and
# produce non-bit-identical output.
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")

import numpy as np
import pyarrow as pa

from src.core.logger import get_logger

logger = get_logger(__name__)

# pyarrow exposes runtime thread-count setters, so it can be pinned after import.
pa.set_cpu_count(1)
pa.set_io_thread_count(1)


def is_valid_volume(volume: np.ndarray | None) -> bool:
    """Return True iff `volume` is a non-empty 3-D numeric array with no NaN/inf.

    Used to drop corrupted volumes before masking + feature extraction.
    Defensive against the full set of garbage we expect from real archives:
    lists, None, NaN/inf samples, zero-sized arrays, string-dtype arrays.
    """
    if not isinstance(volume, np.ndarray):
        return False
    if volume.ndim != 3:
        return False
    if volume.size == 0:
        return False
    if not np.issubdtype(volume.dtype, np.number):
        return False
    if not np.all(np.isfinite(volume)):
        return False
    return True
```

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest tests/pipelines/test_mri_pipeline.py -v
pytest -v
```
Expected: 7 PASS in `TestIsValidVolume`. Total suite: **74 PASS** (67 prior + 7 new).

- [ ] **Step 5: Commit**

```bash
git add tests/pipelines/test_mri_pipeline.py src/pipelines/mri_pipeline.py
git commit -m "feat(mri): add is_valid_volume guard for NaN/inf/shape/dtype on 3-D arrays"
```

---

## Task 3: `mask_brain` — intensity threshold + morphological cleanup (TDD)

**Files:**
- Modify: `tests/pipelines/test_mri_pipeline.py`
- Modify: `src/pipelines/mri_pipeline.py`

- [ ] **Step 1: Append the failing tests**

Update the merged import tuple at the top of `test_mri_pipeline.py`:
```python
from src.pipelines.mri_pipeline import (
    is_valid_volume,
    mask_brain,
)
```

Add an `import nibabel as nib` line to the third-party block of imports (alphabetically before `numpy`).

Append:
```python


class TestMaskBrain:
    def _load_subject(self, sid: str) -> np.ndarray:
        return nib.load(FIXTURE_DIR / f"{sid}.nii.gz").get_fdata()

    def test_returns_bool_mask_of_same_shape(self) -> None:
        vol = self._load_subject("subject_0")
        mask = mask_brain(vol)
        assert isinstance(mask, np.ndarray)
        assert mask.dtype == bool
        assert mask.shape == vol.shape

    def test_mask_separates_brain_from_background(self) -> None:
        """Default threshold should keep the spherical-brain center voxels in."""
        vol = self._load_subject("subject_0")
        mask = mask_brain(vol)
        # The fixture's brain region (radius 3 around center) intensity is ~10;
        # background is ~0.1. Some brain voxels MUST survive the mask.
        assert mask.sum() > 0
        # The center voxel (always brain) MUST be in the mask.
        center = tuple(s // 2 for s in vol.shape)
        assert mask[center]

    def test_mask_drops_low_intensity_background(self) -> None:
        """Voxels with intensity well below the brain core must be excluded."""
        vol = self._load_subject("subject_0")
        mask = mask_brain(vol, intensity_threshold=5.0)
        # Background voxels (intensity ~0.1) must NOT be in the mask.
        bg_voxel = (0, 0, 0)
        assert mask[bg_voxel] == False  # noqa: E712

    def test_explicit_threshold_overrides_default(self) -> None:
        vol = self._load_subject("subject_0")
        # A very high threshold should produce far fewer mask voxels.
        mask_default = mask_brain(vol)
        mask_strict = mask_brain(vol, intensity_threshold=100.0)
        assert mask_strict.sum() < mask_default.sum()

    def test_does_not_mutate_input(self) -> None:
        vol = self._load_subject("subject_0")
        original = vol.copy()
        _ = mask_brain(vol)
        np.testing.assert_array_equal(vol, original)

    def test_morphological_cleanup_removes_isolated_voxels(self) -> None:
        """A single bright voxel surrounded by background must be removed by the
        opening-style morphological cleanup."""
        vol = np.zeros((8, 8, 8), dtype=np.float64)
        vol[4, 4, 4] = 100.0
        mask = mask_brain(vol, intensity_threshold=50.0)
        # Without cleanup, the single voxel would survive. With morphological
        # opening, it must be removed.
        assert mask.sum() == 0
```

- [ ] **Step 2: Run tests; they MUST fail**

```bash
pytest tests/pipelines/test_mri_pipeline.py::TestMaskBrain -v
```
Expected: 6 FAILS with `cannot import name 'mask_brain'`.

- [ ] **Step 3: Implement `mask_brain`**

Add `import nibabel as nib` and `from scipy import ndimage as scipy_ndimage` to the third-party block of `src/pipelines/mri_pipeline.py`. Final imports (thread-pinning comments elided):
```python
from __future__ import annotations

import os

os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")

import nibabel as nib
import numpy as np
import pyarrow as pa
from scipy import ndimage as scipy_ndimage

from src.core.logger import get_logger
```

Append at the END of `src/pipelines/mri_pipeline.py`:
```python


def mask_brain(
    volume: np.ndarray,
    intensity_threshold: float | None = None,
) -> np.ndarray:
    """Build a brain mask from a 3-D MRI volume.

    Two-step pipeline:
      1. Intensity threshold: keep voxels above `intensity_threshold`. When
         `None`, use the volume's mean as a robust auto-threshold (works on
         the synthetic fixture where brain ≫ background; for real data the
         caller should pass an Otsu or BET-derived threshold explicitly).
      2. Morphological opening (`scipy.ndimage.binary_opening`) to remove
         isolated noise voxels and disconnected fragments.

    Args:
        volume: 3-D numeric `np.ndarray` (must satisfy `is_valid_volume`).
        intensity_threshold: Voxel-intensity floor. `None` → use `volume.mean()`.

    Returns:
        A boolean `np.ndarray` of the same shape as `volume`. True = brain.
    """
    if intensity_threshold is None:
        intensity_threshold = float(volume.mean())

    raw = volume > intensity_threshold
    cleaned = scipy_ndimage.binary_opening(raw, iterations=1)
    return cleaned.astype(bool)
```
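
The cleanup behavior that the last test pins down can be reproduced in isolation with `scipy.ndimage` (a standalone sketch, not project code):

```python
import numpy as np
from scipy import ndimage

# One bright voxel in an otherwise empty 8×8×8 volume.
vol = np.zeros((8, 8, 8))
vol[4, 4, 4] = 100.0

raw = vol > 50.0                                    # thresholding alone keeps the speck
cleaned = ndimage.binary_opening(raw, iterations=1)  # erosion then dilation

assert raw.sum() == 1      # the isolated voxel survives the threshold...
assert cleaned.sum() == 0  # ...but morphological opening removes it
```

Opening erodes first, so any region smaller than the structuring element (a 3×3×3 cross by default) vanishes entirely; larger connected regions survive with their bulk intact.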

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest tests/pipelines/test_mri_pipeline.py -v
```
Expected: 13 PASS in the MRI file (7 valid_volume + 6 mask). Total: **80 PASS** (67 prior + 13).

- [ ] **Step 5: Commit**

```bash
git add tests/pipelines/test_mri_pipeline.py src/pipelines/mri_pipeline.py
git commit -m "feat(mri): add mask_brain (intensity threshold + morphological opening)"
```

---

## Task 4: `extract_features_from_volume` — ROI octant statistics (TDD)

**Files:**
- Modify: `tests/pipelines/test_mri_pipeline.py`
- Modify: `src/pipelines/mri_pipeline.py`

- [ ] **Step 1: Append the failing tests**

Extend the merged import tuple to include `extract_features_from_volume`, `DEFAULT_N_ROI_AXES`, and `ROI_STATS`:
```python
from src.pipelines.mri_pipeline import (
    DEFAULT_N_ROI_AXES,
    ROI_STATS,
    extract_features_from_volume,
    is_valid_volume,
    mask_brain,
)
```

Append:
```python


class TestExtractFeaturesFromVolume:
    def _load_subject(self, sid: str) -> np.ndarray:
        return nib.load(FIXTURE_DIR / f"{sid}.nii.gz").get_fdata()

    def test_returns_dict_with_correct_keys(self) -> None:
        vol = self._load_subject("subject_0")
        mask = mask_brain(vol)
        feats = extract_features_from_volume(vol, mask)
        n_roi = int(np.prod(DEFAULT_N_ROI_AXES))
        expected = {
            f"feat_roi{i}_{stat}"
            for i in range(n_roi)
            for stat in ROI_STATS
        }
        assert set(feats.keys()) == expected

    def test_feature_count_matches_contract(self) -> None:
        vol = self._load_subject("subject_0")
        mask = mask_brain(vol)
        feats = extract_features_from_volume(vol, mask)
        n_roi = int(np.prod(DEFAULT_N_ROI_AXES))
        assert len(feats) == n_roi * len(ROI_STATS)

    def test_all_features_finite_float(self) -> None:
        vol = self._load_subject("subject_0")
        mask = mask_brain(vol)
        feats = extract_features_from_volume(vol, mask)
        for k, v in feats.items():
            assert isinstance(v, float), f"{k}: {type(v).__name__}"
            assert np.isfinite(v), f"{k}: {v}"

    def test_voxel_count_is_integer_valued(self) -> None:
        vol = self._load_subject("subject_0")
        mask = mask_brain(vol)
        feats = extract_features_from_volume(vol, mask)
        for k, v in feats.items():
            if k.endswith("_voxel_count"):
                # voxel_count is stored as float for column-uniformity, but
                # must be a whole number.
                assert v == float(int(v))

    def test_empty_mask_yields_zero_features(self) -> None:
        """If a volume has zero brain voxels (mask all False), every stat
        must default to 0.0 — not NaN — to preserve the no-NaN Parquet contract."""
        vol = self._load_subject("subject_0")
        empty_mask = np.zeros_like(vol, dtype=bool)
        feats = extract_features_from_volume(vol, empty_mask)
        for k, v in feats.items():
            assert v == 0.0, f"{k}: {v}"

    def test_deterministic_for_same_input(self) -> None:
        vol = self._load_subject("subject_0")
        mask = mask_brain(vol)
        a = extract_features_from_volume(vol, mask)
        b = extract_features_from_volume(vol, mask)
        assert a == b
```

- [ ] **Step 2: Run tests; they MUST fail**

```bash
pytest tests/pipelines/test_mri_pipeline.py::TestExtractFeaturesFromVolume -v
```
Expected: 6 FAILS at collection; the import error names the first missing name in the tuple: `cannot import name 'DEFAULT_N_ROI_AXES'`.

- [ ] **Step 3: Implement `extract_features_from_volume`**

Append at the END of `src/pipelines/mri_pipeline.py`:
```python


# Default ROI partition: split a (D, H, W) volume into 2×2×2 = 8 octant ROIs.
# Octant index follows binary (z, y, x) ordering: 0..7.
DEFAULT_N_ROI_AXES: tuple[int, int, int] = (2, 2, 2)
ROI_STATS: tuple[str, ...] = ("mean", "std", "p10", "p50", "p90", "voxel_count")


def _roi_slices(
    shape: tuple[int, int, int],
    n_roi_axes: tuple[int, int, int],
) -> list[tuple[slice, slice, slice]]:
    """Generate the ROI slice list in deterministic (z, y, x) octant order."""
    nz, ny, nx = n_roi_axes
    dz, dy, dx = shape
    bins_z = np.array_split(np.arange(dz), nz)
    bins_y = np.array_split(np.arange(dy), ny)
    bins_x = np.array_split(np.arange(dx), nx)
    out: list[tuple[slice, slice, slice]] = []
    for bz in bins_z:
        for by in bins_y:
            for bx in bins_x:
                out.append((
                    slice(bz[0], bz[-1] + 1),
                    slice(by[0], by[-1] + 1),
                    slice(bx[0], bx[-1] + 1),
                ))
    return out


def _roi_stats_for(values: np.ndarray) -> dict[str, float]:
    """Compute the 6 ROI stats. Empty array → all 0.0 (no-NaN contract)."""
    if values.size == 0:
        return {stat: 0.0 for stat in ROI_STATS}
    return {
        "mean": float(values.mean()),
        "std": float(values.std()),
        "p10": float(np.percentile(values, 10)),
        "p50": float(np.percentile(values, 50)),
        "p90": float(np.percentile(values, 90)),
        "voxel_count": float(values.size),
    }


def extract_features_from_volume(
    volume: np.ndarray,
    mask: np.ndarray,
    n_roi_axes: tuple[int, int, int] = DEFAULT_N_ROI_AXES,
) -> dict[str, float]:
    """Compute per-ROI summary statistics from a masked volume.

    The volume is partitioned into ``prod(n_roi_axes)`` axis-aligned octants
    in deterministic (z, y, x) order. For each ROI, intensity values from
    voxels where `mask` is True are summarized via mean / std / 10th, 50th,
    90th percentile / voxel count. Empty ROIs (no mask voxels) report all
    zeros so the resulting Parquet has no NaN values.

    Args:
        volume: 3-D numeric `np.ndarray` (already validated).
        mask: Boolean `np.ndarray` of the same shape (from `mask_brain`).
        n_roi_axes: ROI grid along (z, y, x). Default `(2, 2, 2)` → 8 ROIs.

    Returns:
        Flat dict `{"feat_roi{i}_{stat}": float}` of length
        ``prod(n_roi_axes) * len(ROI_STATS)``.
    """
    feats: dict[str, float] = {}
    slices = _roi_slices(volume.shape, n_roi_axes)
    for i, sl in enumerate(slices):
        roi_values = volume[sl][mask[sl]]
        stats = _roi_stats_for(roi_values)
        for stat_name, stat_val in stats.items():
            feats[f"feat_roi{i}_{stat_name}"] = stat_val
    return feats
```
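
The octant partition can also be verified in isolation: `np.array_split` over each default 8-voxel axis yields two 4-voxel bins, so the grid is 8 blocks of 4×4×4 (a standalone sketch mirroring the `_roi_slices` logic, not project code):

```python
import numpy as np

shape, n_roi_axes = (8, 8, 8), (2, 2, 2)

# Per-axis index bins, one list of bins per axis.
bins = [np.array_split(np.arange(d), n) for d, n in zip(shape, n_roi_axes)]

# Nest in (z, y, x) order, exactly the deterministic octant ordering 0..7.
slices = [
    (
        slice(int(bz[0]), int(bz[-1]) + 1),
        slice(int(by[0]), int(by[-1]) + 1),
        slice(int(bx[0]), int(bx[-1]) + 1),
    )
    for bz in bins[0]
    for by in bins[1]
    for bx in bins[2]
]

assert len(slices) == 8  # 2×2×2 octants
assert slices[0] == (slice(0, 4), slice(0, 4), slice(0, 4))   # ROI 0: low-z, low-y, low-x
assert slices[-1] == (slice(4, 8), slice(4, 8), slice(4, 8))  # ROI 7: high-z, high-y, high-x
```

Because `np.array_split` tolerates axes that do not divide evenly, the same scheme degrades gracefully on odd-sized volumes (bins differ by at most one voxel).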
668
+
669
+ - [ ] **Step 4: Run tests to verify they pass**
670
+
671
+ ```bash
672
+ pytest tests/pipelines/test_mri_pipeline.py -v
673
+ ```
674
+ Expected: 19 PASS (7 valid + 6 mask + 6 features). Total: **86 PASS**.
675
+
676
+ - [ ] **Step 5: Commit**
677
+
678
+ ```bash
679
+ git add tests/pipelines/test_mri_pipeline.py src/pipelines/mri_pipeline.py
680
+ git commit -m "feat(mri): add extract_features_from_volume (8 ROI octants × 6 stats)"
681
+ ```
682
+
683
+ ---
684
+
685
+ ## Task 5: `harmonize_combat` — site-effect removal (TDD)
686
+
687
+ **Files:**
688
+ - Modify: `tests/pipelines/test_mri_pipeline.py`
689
+ - Modify: `src/pipelines/mri_pipeline.py`
690
+
691
- [ ] **Step 1: Append the failing tests**

Extend the merged import tuple to include `harmonize_combat`:

```python
from src.pipelines.mri_pipeline import (
    DEFAULT_N_ROI_AXES,
    ROI_STATS,
    extract_features_from_volume,
    harmonize_combat,
    is_valid_volume,
    mask_brain,
)
```

Add `import pandas as pd` to the third-party imports of the test file (alphabetical: between numpy and pytest).

Append:

```python


class TestHarmonizeCombat:
    def _build_two_site_features(self) -> tuple[pd.DataFrame, pd.Series, list[str]]:
        """Synthesize a 6-row × 4-feature table with a clear site bias."""
        rng = np.random.default_rng(seed=42)
        feature_cols = ["feat_roi0_mean", "feat_roi1_mean", "feat_roi2_mean", "feat_roi3_mean"]
        # Site A baseline: mean ~0; Site B baseline: mean ~5 (the bias to remove).
        site_a = rng.normal(loc=0.0, scale=1.0, size=(3, 4))
        site_b = rng.normal(loc=5.0, scale=1.0, size=(3, 4))
        df = pd.DataFrame(
            np.vstack([site_a, site_b]),
            columns=feature_cols,
        )
        sites = pd.Series(["A", "A", "A", "B", "B", "B"], name="site")
        return df, sites, feature_cols

    def test_returns_dataframe_same_shape_and_columns(self) -> None:
        df, sites, feature_cols = self._build_two_site_features()
        out = harmonize_combat(df, sites, feature_cols)
        assert isinstance(out, pd.DataFrame)
        assert out.shape == df.shape
        assert list(out.columns) == feature_cols

    def test_reduces_site_mean_difference(self) -> None:
        """ComBat must shrink the per-site mean gap on every harmonized column."""
        df, sites, feature_cols = self._build_two_site_features()
        gap_before = (
            df.loc[sites == "B", feature_cols].mean()
            - df.loc[sites == "A", feature_cols].mean()
        ).abs()

        out = harmonize_combat(df, sites, feature_cols)
        gap_after = (
            out.loc[sites == "B", feature_cols].mean()
            - out.loc[sites == "A", feature_cols].mean()
        ).abs()

        # Every column's site gap must shrink (ComBat aligns site means).
        assert (gap_after < gap_before).all(), (
            f"gap_before={gap_before.tolist()} gap_after={gap_after.tolist()}"
        )

    def test_output_dtype_float64(self) -> None:
        df, sites, feature_cols = self._build_two_site_features()
        out = harmonize_combat(df, sites, feature_cols)
        for c in feature_cols:
            assert out[c].dtype == np.float64, f"{c} → {out[c].dtype}"

    def test_no_nan_in_output(self) -> None:
        df, sites, feature_cols = self._build_two_site_features()
        out = harmonize_combat(df, sites, feature_cols)
        assert out[feature_cols].notna().all().all()
        assert np.isfinite(out[feature_cols].to_numpy()).all()

    def test_deterministic(self) -> None:
        df, sites, feature_cols = self._build_two_site_features()
        a = harmonize_combat(df, sites, feature_cols)
        b = harmonize_combat(df.copy(), sites.copy(), list(feature_cols))
        np.testing.assert_array_equal(a.to_numpy(), b.to_numpy())

    def test_raises_on_single_site(self) -> None:
        """ComBat needs at least 2 sites; a single-site dataset is malformed."""
        df, _, feature_cols = self._build_two_site_features()
        sites_one = pd.Series(["A"] * len(df), name="site")
        with pytest.raises(ValueError, match="at least 2 sites"):
            harmonize_combat(df, sites_one, feature_cols)
```

- [ ] **Step 2: Run tests; they MUST fail**

```bash
pytest tests/pipelines/test_mri_pipeline.py::TestHarmonizeCombat -v
```

Expected: 6 FAILS with `cannot import name 'harmonize_combat'`.

- [ ] **Step 3: Implement `harmonize_combat`**

Add `import pandas as pd` to the third-party imports of `src/pipelines/mri_pipeline.py` (alphabetical: between numpy and pyarrow).

Append at the END of `src/pipelines/mri_pipeline.py`:

```python


def harmonize_combat(
    features: pd.DataFrame,
    sites: pd.Series,
    feature_cols: list[str],
) -> pd.DataFrame:
    """Apply ComBat harmonization across sites to remove site-level domain shift.

    Wraps `neuroHarmonize.harmonizationLearn`, which fits a parametric ComBat
    model (no internal RNG → byte-deterministic given fixed input). Only
    `feature_cols` are harmonized; other columns in `features` (e.g.
    metadata) are not touched by this function — callers should join after.

    Args:
        features: DataFrame with at least the columns listed in `feature_cols`.
        sites: Site label per row (length must match `len(features)`).
        feature_cols: Names of the columns to harmonize.

    Returns:
        A new DataFrame of identical shape & column order to
        `features[feature_cols]`, with ComBat-harmonized values.

    Raises:
        ValueError: if fewer than 2 distinct sites are present.
    """
    from neuroHarmonize import harmonizationLearn

    if sites.nunique() < 2:
        raise ValueError(
            f"ComBat requires at least 2 sites; got {sites.nunique()} "
            f"({sites.unique().tolist()})"
        )

    matrix = features[feature_cols].to_numpy(dtype=np.float64)
    covars = pd.DataFrame({"SITE": sites.to_numpy()})

    _, harmonized = harmonizationLearn(matrix, covars)
    out = pd.DataFrame(
        np.asarray(harmonized, dtype=np.float64),
        columns=list(feature_cols),
        index=features.index,
    )
    logger.info(
        "ComBat harmonized %d rows × %d features across %d sites",
        len(out), len(feature_cols), sites.nunique(),
    )
    return out
```
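The property `test_reduces_site_mean_difference` asserts can be illustrated without `neuroHarmonize` installed. A toy stand-in for ComBat's additive site term (location alignment only — real ComBat additionally pools variance estimates via empirical Bayes, so this is a sketch of the tested behavior, not the library):

```python
import numpy as np
import pandas as pd

def align_site_means(features: pd.DataFrame, sites: pd.Series) -> pd.DataFrame:
    """Re-center each site's per-column mean onto the grand mean.

    A simplified analogue of ComBat's additive site effect; illustration
    only, not a substitute for harmonizationLearn.
    """
    grand_mean = features.mean()
    site_means = features.groupby(sites).transform("mean")
    return features - site_means + grand_mean

rng = np.random.default_rng(seed=42)
df = pd.DataFrame(
    np.vstack([rng.normal(0.0, 1.0, (3, 2)), rng.normal(5.0, 1.0, (3, 2))]),
    columns=["feat_roi0_mean", "feat_roi1_mean"],
)
sites = pd.Series(["A"] * 3 + ["B"] * 3)

out = align_site_means(df, sites)
gap = (out[sites == "B"].mean() - out[sites == "A"].mean()).abs()
print(gap.max())  # site means now coincide (gap ~ 0 up to float error)
```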

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest tests/pipelines/test_mri_pipeline.py -v
```

Expected: 25 PASS (7 valid + 6 mask + 6 features + 6 combat). Total: **92 PASS**.

- [ ] **Step 5: Commit**

```bash
git add tests/pipelines/test_mri_pipeline.py src/pipelines/mri_pipeline.py
git commit -m "feat(mri): add harmonize_combat wrapper around neuroHarmonize.harmonizationLearn"
```

---

## Task 6: `run_pipeline` orchestrator + CLI (TDD)

**Files:**
- Modify: `tests/pipelines/test_mri_pipeline.py`
- Modify: `src/pipelines/mri_pipeline.py`

- [ ] **Step 1: Append the failing tests**

Extend the merged import tuple to include `run_pipeline`. Add `import shutil` to the stdlib block at the top of the test file (alphabetical: between `from pathlib import Path` and the third-party block).

Append:

```python


class TestRunPipeline:
    def _stage_inputs(self, tmp_path: Path) -> tuple[Path, Path, Path]:
        """Copy the committed MRI fixture into a tmp_path layout."""
        raw_dir = tmp_path / "data" / "raw" / "mri"
        proc_dir = tmp_path / "data" / "processed"
        raw_dir.mkdir(parents=True)
        proc_dir.mkdir(parents=True)
        for src in FIXTURE_DIR.iterdir():
            shutil.copy(src, raw_dir / src.name)
        sites_csv = raw_dir / "sites.csv"
        output_path = proc_dir / "mri_features.parquet"
        return raw_dir, sites_csv, output_path

    def test_end_to_end_writes_processed_parquet(self, tmp_path: Path) -> None:
        raw_dir, sites_csv, output_path = self._stage_inputs(tmp_path)
        run_pipeline(
            input_dir=raw_dir, sites_csv=sites_csv, output_path=output_path,
        )
        assert output_path.exists()
        df = pd.read_parquet(output_path)
        assert len(df) == 6  # 6 subjects in the fixture
        assert "subject_id" in df.columns
        assert "site" in df.columns
        assert any(c.startswith("feat_roi") for c in df.columns)

    def test_run_pipeline_preserves_float64_for_features(self, tmp_path: Path) -> None:
        raw_dir, sites_csv, output_path = self._stage_inputs(tmp_path)
        run_pipeline(
            input_dir=raw_dir, sites_csv=sites_csv, output_path=output_path,
        )
        df = pd.read_parquet(output_path)
        feat_cols = [c for c in df.columns if c.startswith("feat_")]
        for c in feat_cols:
            assert df[c].dtype == np.float64, f"{c} widened to {df[c].dtype}"

    def test_run_pipeline_is_idempotent(self, tmp_path: Path) -> None:
        raw_dir, sites_csv, output_path = self._stage_inputs(tmp_path)
        run_pipeline(
            input_dir=raw_dir, sites_csv=sites_csv, output_path=output_path,
        )
        first = output_path.read_bytes()
        run_pipeline(
            input_dir=raw_dir, sites_csv=sites_csv, output_path=output_path,
        )
        second = output_path.read_bytes()
        assert first == second, "MRI pipeline output must be byte-deterministic"

    def test_run_pipeline_reduces_site_gap(self, tmp_path: Path) -> None:
        """End-to-end: ComBat must shrink the per-site mean gap in feat_roi0_mean."""
        raw_dir, sites_csv, output_path = self._stage_inputs(tmp_path)
        run_pipeline(
            input_dir=raw_dir, sites_csv=sites_csv, output_path=output_path,
        )
        df = pd.read_parquet(output_path)
        # Brain mean before harmonization differs by ~5 between sites.
        # After ComBat, the per-site mean of feat_roi0_mean must be much closer.
        site_means = df.groupby("site")["feat_roi0_mean"].mean()
        gap = abs(site_means["B"] - site_means["A"])
        assert gap < 1.0, f"site gap after ComBat: {gap}"

    def test_run_pipeline_raises_when_input_missing(self, tmp_path: Path) -> None:
        with pytest.raises(FileNotFoundError, match="MRI input directory not found"):
            run_pipeline(
                input_dir=tmp_path / "nope",
                sites_csv=tmp_path / "sites.csv",
                output_path=tmp_path / "out.parquet",
            )

    def test_run_pipeline_rejects_directory_as_output(self, tmp_path: Path) -> None:
        raw_dir, sites_csv, _ = self._stage_inputs(tmp_path)
        bad_output = tmp_path / "out_dir"
        bad_output.mkdir()
        with pytest.raises(IsADirectoryError, match="must be a file"):
            run_pipeline(
                input_dir=raw_dir, sites_csv=sites_csv, output_path=bad_output,
            )

    def test_run_pipeline_drops_invalid_volumes(self, tmp_path: Path) -> None:
        """A NaN-containing volume must be logged + dropped, not silently included."""
        raw_dir, sites_csv, output_path = self._stage_inputs(tmp_path)
        # Corrupt subject_5 to contain NaN. Re-save in place.
        bad = nib.load(raw_dir / "subject_5.nii.gz").get_fdata()
        bad[0, 0, 0] = np.nan
        nib.save(nib.Nifti1Image(bad, affine=np.eye(4)), raw_dir / "subject_5.nii.gz")

        run_pipeline(
            input_dir=raw_dir, sites_csv=sites_csv, output_path=output_path,
        )
        df = pd.read_parquet(output_path)
        # 5 surviving valid subjects (subject_5 dropped).
        assert len(df) == 5
        assert "subject_5" not in df["subject_id"].tolist()
```

- [ ] **Step 2: Run tests; they MUST fail**

```bash
pytest tests/pipelines/test_mri_pipeline.py::TestRunPipeline -v
```

Expected: 7 FAILS with `cannot import name 'run_pipeline'`.

- [ ] **Step 3: Implement `run_pipeline` + CLI**

Add `from pathlib import Path` to the stdlib imports of `src/pipelines/mri_pipeline.py`. Final stdlib block:

```python
from __future__ import annotations

import os
from pathlib import Path
```

Append at the END of `src/pipelines/mri_pipeline.py`:

```python


# Default I/O paths for the MRI pipeline. Override via run_pipeline() args.
DEFAULT_INPUT = Path("data/raw/mri")
DEFAULT_OUTPUT = Path("data/processed/mri_features.parquet")


def _list_nifti_volumes(input_dir: Path) -> list[Path]:
    """Return a sorted list of .nii / .nii.gz files in `input_dir`."""
    return sorted(
        p for p in input_dir.iterdir()
        if p.suffix == ".nii" or p.name.endswith(".nii.gz")
    )


def run_pipeline(
    input_dir: Path = DEFAULT_INPUT,
    sites_csv: Path | None = None,
    output_path: Path = DEFAULT_OUTPUT,
    intensity_threshold: float | None = None,
    n_roi_axes: tuple[int, int, int] = DEFAULT_N_ROI_AXES,
) -> None:
    """Run the MRI pipeline end-to-end: NIfTI directory → harmonized Parquet.

    For each `subject_id.nii(.gz)` in `input_dir`, validates the volume,
    masks the brain, computes per-ROI statistics, then harmonizes across
    sites (column "site" of `sites_csv`, joined on "subject_id") via ComBat.
    Output is float64 Parquet at `output_path`.

    Args:
        input_dir: Directory containing one NIfTI per subject and a
            `sites.csv` (or `sites_csv` override) with columns
            `subject_id, site`.
        sites_csv: Path to the site-covariates CSV. If `None`, defaults to
            `input_dir / "sites.csv"`.
        output_path: Where to write the processed feature Parquet file.
        intensity_threshold: Brain-mask intensity floor. `None` → per-volume
            mean (see `mask_brain`).
        n_roi_axes: ROI grid (z, y, x).

    Raises:
        FileNotFoundError: if `input_dir` or `sites_csv` does not exist.
        IsADirectoryError: if `output_path` resolves to an existing directory.
        KeyError: if `sites_csv` lacks a site assignment for any subject.
    """
    input_dir = Path(input_dir)
    output_path = Path(output_path)
    if not input_dir.exists():
        raise FileNotFoundError(f"MRI input directory not found: {input_dir}")
    sites_csv = Path(sites_csv) if sites_csv is not None else input_dir / "sites.csv"
    if not sites_csv.exists():
        raise FileNotFoundError(f"sites_csv not found: {sites_csv}")

    logger.info("Reading MRI volumes from %s", input_dir)
    nifti_paths = _list_nifti_volumes(input_dir)
    sites_df = pd.read_csv(sites_csv)

    rows: list[dict[str, float | str]] = []
    invalid_subject_ids: list[str] = []
    for path in nifti_paths:
        subject_id = path.name.removesuffix(".nii.gz").removesuffix(".nii")
        volume = nib.load(path).get_fdata()
        if not is_valid_volume(volume):
            invalid_subject_ids.append(subject_id)
            continue
        mask = mask_brain(volume, intensity_threshold=intensity_threshold)
        feats = extract_features_from_volume(volume, mask, n_roi_axes=n_roi_axes)
        feats["subject_id"] = subject_id
        rows.append(feats)

    n_total = len(nifti_paths)
    n_dropped = len(invalid_subject_ids)
    if n_dropped:
        display = invalid_subject_ids[:10]
        suffix = f"... (+{n_dropped - 10} more)" if n_dropped > 10 else ""
        logger.warning(
            "Dropping %d/%d volumes with invalid samples (subjects=%s%s)",
            n_dropped, n_total, display, suffix,
        )

    feature_cols = [
        f"feat_roi{i}_{stat}"
        for i in range(int(np.prod(n_roi_axes)))
        for stat in ROI_STATS
    ]

    if not rows:
        logger.info(
            "Feature extraction complete: in=%d, out=0, dropped=%d (%.2f%%)",
            n_total, n_dropped, 100.0 * n_dropped / max(n_total, 1),
        )
        empty = pd.DataFrame(
            columns=["subject_id", "site", *feature_cols]
        ).astype({c: np.float64 for c in feature_cols})
        output_path.parent.mkdir(parents=True, exist_ok=True)
        if output_path.is_dir():
            raise IsADirectoryError(
                f"output_path must be a file, got a directory: {output_path}"
            )
        empty.to_parquet(
            output_path, index=False, engine="pyarrow", compression="snappy",
        )
        return

    raw_features = pd.DataFrame(rows)
    raw_features = raw_features.merge(sites_df, on="subject_id", how="left")
    if raw_features["site"].isna().any():
        missing = raw_features.loc[raw_features["site"].isna(), "subject_id"].tolist()
        raise KeyError(
            f"sites_csv missing site assignment for subjects: {missing}"
        )

    harmonized = harmonize_combat(
        raw_features, raw_features["site"], feature_cols,
    )
    final = pd.concat(
        [raw_features[["subject_id", "site"]].reset_index(drop=True),
         harmonized.reset_index(drop=True)],
        axis=1,
    )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    if output_path.is_dir():
        raise IsADirectoryError(
            f"output_path must be a file, got a directory: {output_path}"
        )
    # Parquet preserves dtypes (float64 features stay float64) and is
    # byte-deterministic with single-threaded snappy. AGENTS.md §6.
    final.to_parquet(
        output_path, index=False, engine="pyarrow", compression="snappy",
    )
    logger.info(
        "Feature extraction complete: in=%d, out=%d, dropped=%d (%.2f%%)",
        n_total, len(final), n_dropped, 100.0 * n_dropped / max(n_total, 1),
    )
    logger.info(
        "Wrote processed features to %s (rows=%d, cols=%d)",
        output_path, len(final), final.shape[1],
    )


if __name__ == "__main__":
    # Day-3 CLI entrypoint — runs with default paths against `data/raw/mri/`.
    # Expects `data/raw/mri/sites.csv` with columns `subject_id, site`.
    # Argument parsing (argparse / click) will land in a later task.
    # python -m src.pipelines.mri_pipeline
    run_pipeline()
```
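The filename handling in the subject loop can be exercised in isolation with the stdlib alone — note that `.nii.gz` must be stripped before `.nii`, since `Path.suffix` alone would mis-split the double extension:

```python
import tempfile
from pathlib import Path

# Stand-in directory mimicking data/raw/mri/: two volumes plus non-NIfTI files.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("subject_1.nii.gz", "subject_0.nii", "sites.csv", "notes.txt"):
        (root / name).touch()

    # Same selection predicate as _list_nifti_volumes above.
    volumes = sorted(
        p for p in root.iterdir()
        if p.suffix == ".nii" or p.name.endswith(".nii.gz")
    )
    # Same subject_id derivation: strip .nii.gz first, then .nii.
    subject_ids = [
        p.name.removesuffix(".nii.gz").removesuffix(".nii") for p in volumes
    ]

print(subject_ids)  # ['subject_0', 'subject_1'] — sites.csv and notes.txt excluded
```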

- [ ] **Step 4: Run all tests**

```bash
pytest -v
```

Expected: **99 PASS** (67 prior + 32 MRI: 7 valid + 6 mask + 6 features + 6 combat + 7 run_pipeline).

- [ ] **Step 5: Commit**

```bash
git add tests/pipelines/test_mri_pipeline.py src/pipelines/mri_pipeline.py
git commit -m "feat(mri): add run_pipeline orchestrator + CLI (NIfTI dir → ComBat Parquet)"
```

---

## Task 7: AGENTS.md + README updates

**Files:**
- Modify: `AGENTS.md`
- Modify: `README.md`

- [ ] **Step 1: Update AGENTS.md §1 pipeline table**

Find the existing line:
```
| Image (MRI / fMRI) | `src/pipelines/mri_pipeline.py` *(planned, Day 3)* | ComBat Harmonization for site-level domain shift |
```
Replace with:
```
| Image (MRI / fMRI) | `src/pipelines/mri_pipeline.py` | ComBat Harmonization for site-level domain shift |
```

- [ ] **Step 2: Update README.md Status table**

Find:
```
| 3 | Image (MRI / fMRI) | `src/pipelines/mri_pipeline.py` | Planned (ComBat harmonization) |
```
Replace with:
```
| 3 | Image (MRI / fMRI) | `src/pipelines/mri_pipeline.py` | Shipped — 99 tests green |
```

Bump any test-count references elsewhere in the README from 67 to 99.

- [ ] **Step 3: Add MRI Pipeline section to README.md**

Directly after the EEG Pipeline (Day 2) section, append:

```markdown
## MRI Pipeline (Day 3)

| Function | Purpose |
|---|---|
| `is_valid_volume(volume)` | Returns True iff input is a finite, numeric, non-empty 3-D ndarray. Rejects NaN/inf, non-numeric dtypes, lists/scalars. |
| `mask_brain(volume, intensity_threshold)` | Two-step brain mask: intensity threshold (default = volume mean) + morphological opening to drop isolated noise voxels. Non-mutating. |
| `extract_features_from_volume(volume, mask, n_roi_axes)` | Partitions the masked volume into `prod(n_roi_axes)` axis-aligned octants (default 2×2×2 = 8) and emits 6 stats per ROI: mean / std / p10 / p50 / p90 / voxel_count. Empty ROIs report 0.0 (no NaN survives). |
| `harmonize_combat(features, sites, feature_cols)` | Wraps `neuroHarmonize.harmonizationLearn` to remove per-site additive bias on the named columns. RNG-free → byte-deterministic. Raises if fewer than 2 sites. |
| `run_pipeline(input_dir, sites_csv, output_path, ...)` | End-to-end NIfTI directory → ComBat-harmonized Parquet orchestrator. Drops invalid volumes with a logged WARNING. Idempotent; raises on missing input or directory output. |

Output schema: one row per surviving subject with columns `subject_id, site, feat_roi{i}_<stat>`. All `feat_*` columns are float64 (preserved through the Parquet round-trip).
```
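The schema described above is fixed-width. A quick sanity sketch that reconstructs the implied 48 feature-column names, mirroring the comprehension used in `run_pipeline`:

```python
import numpy as np

# Constants as defined in src/pipelines/mri_pipeline.py per this plan.
ROI_STATS = ("mean", "std", "p10", "p50", "p90", "voxel_count")
n_roi_axes = (2, 2, 2)  # DEFAULT_N_ROI_AXES → 8 octants

feature_cols = [
    f"feat_roi{i}_{stat}"
    for i in range((int(np.prod(n_roi_axes))))
    for stat in ROI_STATS
]

print(len(feature_cols))                      # 48 (8 ROIs × 6 stats)
print(feature_cols[0], feature_cols[-1])      # feat_roi0_mean feat_roi7_voxel_count
```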

- [ ] **Step 4: Add MRI smoke-run to README Quick Start**

After the EEG smoke-run block, append:

```bash
# Smoke-test the MRI pipeline with the bundled fixture (6 subjects × 2 sites)
mkdir -p data/raw/mri
cp -r tests/fixtures/mri_sample/* data/raw/mri/
python -m src.pipelines.mri_pipeline
```

Then note below the block that the result lands at `data/processed/mri_features.parquet`.

- [ ] **Step 5: Update README Roadmap**

Find the Day-3 bullet and convert it to the past-tense "shipped" form the section's other Day entries already use.

- [ ] **Step 6: Commit**

```bash
git add AGENTS.md README.md
git commit -m "docs: mark MRI pipeline shipped; add Day-3 smoke run + function reference"
```

---

## Task 8: DoD verification + smoke run

**Files:** none modified (verification only).

- [ ] **Step 1: Full test suite green**

```bash
cd /Users/mertgungor/Desktop/hackathon
source .venv312/bin/activate
pytest -v --tb=short
```

Required: **99 passed**, 0 failed, 0 skipped, 0 warnings.

- [ ] **Step 2: CLI smoke run + idempotency**

```bash
mkdir -p data/raw/mri
cp -r tests/fixtures/mri_sample/* data/raw/mri/
rm -f data/processed/mri_features.parquet

python -m src.pipelines.mri_pipeline
md5_run1=$(md5 -q data/processed/mri_features.parquet 2>/dev/null || md5sum data/processed/mri_features.parquet | awk '{print $1}')

python -m src.pipelines.mri_pipeline
md5_run2=$(md5 -q data/processed/mri_features.parquet 2>/dev/null || md5sum data/processed/mri_features.parquet | awk '{print $1}')

echo "MD5 run1: $md5_run1"
echo "MD5 run2: $md5_run2"
```

Required: matching MD5s.
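The md5/md5sum fallback above is macOS/Linux portability glue; the same check can be done portably in pure Python with `hashlib`. A sketch on stand-in files — in a real run, point `artifact` at `data/processed/mri_features.parquet` and re-hash between pipeline invocations:

```python
import hashlib
import tempfile
from pathlib import Path

def digest(path: Path) -> str:
    """SHA-256 of the file's bytes — any byte difference changes the hex digest."""
    return hashlib.sha256(path.read_bytes()).hexdigest()

with tempfile.TemporaryDirectory() as tmp:
    artifact = Path(tmp) / "mri_features.parquet"  # stand-in for the real output
    artifact.write_bytes(b"run-1 bytes")
    run1 = digest(artifact)
    artifact.write_bytes(b"run-1 bytes")  # a byte-deterministic second run
    run2 = digest(artifact)

print(run1 == run2)  # True — identical bytes hash identically
```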

- [ ] **Step 3: Verify schema + ComBat effect**

```bash
python -c "
import pandas as pd
import numpy as np
df = pd.read_parquet('data/processed/mri_features.parquet')
print('rows:', len(df))
print('cols:', df.shape[1])
print('subject_id present:', 'subject_id' in df.columns)
print('site present:', 'site' in df.columns)
print('feat_roi*:', sum(c.startswith('feat_roi') for c in df.columns))
print('all feats float64:', all(df[c].dtype.name == 'float64' for c in df.columns if c.startswith('feat_')))
print('any NaN/inf:', df.isna().any().any() or not np.isfinite(df.select_dtypes(include=[np.number]).to_numpy()).all())
gap = abs(df.groupby('site')['feat_roi0_mean'].mean().diff().iloc[-1])
print(f'site gap on feat_roi0_mean: {gap:.3f} (must be < 1.0 after ComBat)')
"
```

Required:
- rows = 6
- 48 `feat_roi*` columns (8 ROIs × 6 stats)
- all features float64
- no NaN/inf
- site gap < 1.0 (vs. ~5.0 before harmonization)

- [ ] **Step 4: Confirm gitignore covers MRI artifacts**

```bash
git check-ignore -v data/raw/mri/subject_0.nii.gz data/processed/mri_features.parquet
git status
```

Both paths must be ignored and the working tree clean.

- [ ] **Step 5: Day-1 + Day-2 regression**

```bash
# BBB still works
cp tests/fixtures/bbbp_sample.csv data/raw/bbbp.csv
python -m src.pipelines.bbb_pipeline 2>&1 | tail -1

# EEG still works
cp tests/fixtures/eeg_sample.fif data/raw/eeg.fif
python -m src.pipelines.eeg_pipeline 2>&1 | tail -1

ls -lh data/processed/
```

Required: all three Parquet files (`bbbp_features.parquet`, `eeg_features.parquet`, `mri_features.parquet`) present.

---

## Day-3 Definition of Done

- [ ] `src/pipelines/mri_pipeline.py` exposes `is_valid_volume`, `mask_brain`, `harmonize_combat`, `extract_features_from_volume`, `run_pipeline`, plus `DEFAULT_N_ROI_AXES`, `ROI_STATS`, `DEFAULT_INPUT`, `DEFAULT_OUTPUT`.
- [ ] `python -m src.pipelines.mri_pipeline` against `data/raw/mri/` (with `sites.csv`) produces a deterministic Parquet at `data/processed/mri_features.parquet`.
- [ ] Invalid volumes (NaN/inf) logged with their subject ids and dropped (Data Readiness §4 rule 2).
- [ ] ComBat shrinks the per-site mean gap on harmonized columns by >5× (verified by `test_run_pipeline_reduces_site_gap` + DoD §3).
- [ ] Same input → byte-identical Parquet across runs (rule 3).
- [ ] Per-volume schema: `feat_roi{i}_<stat>` for `i in 0..7`, `<stat>` in `(mean, std, p10, p50, p90, voxel_count)`.
- [ ] Float64 dtype preserved through the Parquet round-trip.
- [ ] Test suite: **99 passing**, 0 failures, 0 warnings.
- [ ] BBB (Day 1) and EEG (Day 2) regressions pass.
- [ ] AGENTS.md §1 MRI row no longer says "(planned, Day 3)".
- [ ] README Status table marks MRI shipped; MRI Pipeline reference section + Quick Start smoke run added.
- [ ] At least 7 atomic Day-3 commits (1 fixture + 5 TDD features + 1 docs + verification).