mekosotto committed on
Commit
0643e09
·
1 Parent(s): 4dab60f

refactor(eeg): use core.determinism + core.storage helpers

Browse files
Files changed (1) hide show
  1. src/pipelines/eeg_pipeline.py +5 -17
src/pipelines/eeg_pipeline.py CHANGED
@@ -12,30 +12,25 @@ a logged WARNING), determinism (seeded ICA + sklearn RNG), traceability
12
  """
13
  from __future__ import annotations
14
 
15
- import os
16
  from pathlib import Path
17
  from typing import Callable
18
 
19
  import mne
20
  import numpy as np
21
  import pandas as pd
22
- import pyarrow as pa
23
  from mne.preprocessing import ICA
24
  from scipy import signal as scipy_signal
25
  from scipy import stats as scipy_stats
26
 
 
27
  from src.core.logger import get_logger
 
28
 
29
  logger = get_logger(__name__)
30
 
31
  # Pin BLAS / OpenMP / pyarrow to single-threaded mode so byte-determinism
32
- # (AGENTS.md §4 rule 3) holds across hardware. Without this, multi-threaded
33
- # floating-point reductions can reorder and produce non-bit-identical output.
34
- os.environ.setdefault("OMP_NUM_THREADS", "1")
35
- os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
36
- os.environ.setdefault("MKL_NUM_THREADS", "1")
37
- pa.set_cpu_count(1)
38
- pa.set_io_thread_count(1)
39
 
40
  # Pearson-correlation threshold for EOG-component rejection in ICA.
41
  # Real-world EOG components typically score 0.8-0.95 against the EOG channel;
@@ -464,16 +459,9 @@ def run_pipeline(
464
  random_state=random_state,
465
  )
466
 
467
- output_path.parent.mkdir(parents=True, exist_ok=True)
468
- if output_path.is_dir():
469
- raise IsADirectoryError(
470
- f"output_path must be a file, got a directory: {output_path}"
471
- )
472
  # Parquet preserves dtypes (float64 features stay float64) and is
473
  # byte-deterministic with single-threaded snappy. AGENTS.md §6.
474
- features.to_parquet(
475
- output_path, index=False, engine="pyarrow", compression="snappy",
476
- )
477
  logger.info(
478
  "Wrote processed features to %s (rows=%d, cols=%d)",
479
  output_path, len(features), features.shape[1],
 
12
  """
13
  from __future__ import annotations
14
 
 
15
  from pathlib import Path
16
  from typing import Callable
17
 
18
  import mne
19
  import numpy as np
20
  import pandas as pd
 
21
  from mne.preprocessing import ICA
22
  from scipy import signal as scipy_signal
23
  from scipy import stats as scipy_stats
24
 
25
+ from src.core.determinism import pin_threads
26
  from src.core.logger import get_logger
27
+ from src.core.storage import write_parquet
28
 
29
  logger = get_logger(__name__)
30
 
31
  # Pin BLAS / OpenMP / pyarrow to single-threaded mode so byte-determinism
32
+ # (AGENTS.md §4 rule 3) holds across hardware. See src.core.determinism.
33
+ pin_threads()
 
 
 
 
 
34
 
35
  # Pearson-correlation threshold for EOG-component rejection in ICA.
36
  # Real-world EOG components typically score 0.8-0.95 against the EOG channel;
 
459
  random_state=random_state,
460
  )
461
 
 
 
 
 
 
462
  # Parquet preserves dtypes (float64 features stay float64) and is
463
  # byte-deterministic with single-threaded snappy. AGENTS.md §6.
464
+ write_parquet(features, output_path)
 
 
465
  logger.info(
466
  "Wrote processed features to %s (rows=%d, cols=%d)",
467
  output_path, len(features), features.shape[1],