mekosotto committed on
Commit
0af6558
·
1 Parent(s): 0643e09

refactor(mri): use core.determinism + core.storage helpers

Browse files
Files changed (1) hide show
  1. src/pipelines/mri_pipeline.py +6 -25
src/pipelines/mri_pipeline.py CHANGED
@@ -11,27 +11,22 @@ traceability (in/out/dropped counts at INFO), and idempotent overwrite.
11
  """
12
  from __future__ import annotations
13
 
14
- import os
15
  from pathlib import Path
16
 
17
  import nibabel as nib
18
  import numpy as np
19
  import pandas as pd
20
- import pyarrow as pa
21
  from scipy import ndimage as scipy_ndimage
22
 
 
23
  from src.core.logger import get_logger
 
24
 
25
  logger = get_logger(__name__)
26
 
27
  # Pin BLAS / OpenMP / pyarrow to single-threaded mode so byte-determinism
28
- # (AGENTS.md §4 rule 3) holds across hardware. Without this, multi-threaded
29
- # floating-point reductions can reorder and produce non-bit-identical output.
30
- os.environ.setdefault("OMP_NUM_THREADS", "1")
31
- os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
32
- os.environ.setdefault("MKL_NUM_THREADS", "1")
33
- pa.set_cpu_count(1)
34
- pa.set_io_thread_count(1)
35
 
36
 
37
  def is_valid_volume(volume: np.ndarray | None) -> bool:
@@ -362,14 +357,7 @@ def run_pipeline(
362
  empty = pd.DataFrame(
363
  columns=["subject_id", "site", *feature_cols]
364
  ).astype({c: np.float64 for c in feature_cols})
365
- output_path.parent.mkdir(parents=True, exist_ok=True)
366
- if output_path.is_dir():
367
- raise IsADirectoryError(
368
- f"output_path must be a file, got a directory: {output_path}"
369
- )
370
- empty.to_parquet(
371
- output_path, index=False, engine="pyarrow", compression="snappy",
372
- )
373
  return
374
 
375
  raw_features = pd.DataFrame(rows)
@@ -417,16 +405,9 @@ def run_pipeline(
417
  n_total, len(final), n_dropped, 100.0 * n_dropped / max(n_total, 1),
418
  )
419
 
420
- output_path.parent.mkdir(parents=True, exist_ok=True)
421
- if output_path.is_dir():
422
- raise IsADirectoryError(
423
- f"output_path must be a file, got a directory: {output_path}"
424
- )
425
  # Parquet preserves dtypes (float64 features stay float64) and is
426
  # byte-deterministic with single-threaded snappy. AGENTS.md §6.
427
- final.to_parquet(
428
- output_path, index=False, engine="pyarrow", compression="snappy",
429
- )
430
  logger.info(
431
  "Wrote processed features to %s (rows=%d, cols=%d)",
432
  output_path, len(final), final.shape[1],
 
11
  """
12
  from __future__ import annotations
13
 
 
14
  from pathlib import Path
15
 
16
  import nibabel as nib
17
  import numpy as np
18
  import pandas as pd
 
19
  from scipy import ndimage as scipy_ndimage
20
 
21
+ from src.core.determinism import pin_threads
22
  from src.core.logger import get_logger
23
+ from src.core.storage import write_parquet
24
 
25
  logger = get_logger(__name__)
26
 
27
  # Pin BLAS / OpenMP / pyarrow to single-threaded mode so byte-determinism
28
+ # (AGENTS.md §4 rule 3) holds across hardware. See src.core.determinism.
29
+ pin_threads()
 
 
 
 
 
30
 
31
 
32
  def is_valid_volume(volume: np.ndarray | None) -> bool:
 
357
  empty = pd.DataFrame(
358
  columns=["subject_id", "site", *feature_cols]
359
  ).astype({c: np.float64 for c in feature_cols})
360
+ write_parquet(empty, output_path)
 
 
 
 
 
 
 
361
  return
362
 
363
  raw_features = pd.DataFrame(rows)
 
405
  n_total, len(final), n_dropped, 100.0 * n_dropped / max(n_total, 1),
406
  )
407
 
 
 
 
 
 
408
  # Parquet preserves dtypes (float64 features stay float64) and is
409
  # byte-deterministic with single-threaded snappy. AGENTS.md §6.
410
+ write_parquet(final, output_path)
 
 
411
  logger.info(
412
  "Wrote processed features to %s (rows=%d, cols=%d)",
413
  output_path, len(final), final.shape[1],