mekosotto commited on
Commit
4dab60f
·
1 Parent(s): c70b852

refactor(bbb): use core.determinism + core.storage helpers

Browse files
Files changed (1) hide show
  1. src/pipelines/bbb_pipeline.py +5 -15
src/pipelines/bbb_pipeline.py CHANGED
@@ -11,28 +11,23 @@ traceability (row count in / out / dropped), and idempotent output.
11
  from __future__ import annotations
12
 
13
  import math
14
- import os
15
  from pathlib import Path
16
 
17
  import numpy as np
18
  import pandas as pd
19
- import pyarrow as pa
20
  from rdkit import Chem, RDLogger
21
  from rdkit.Chem import AllChem
22
  from rdkit.DataStructs import ConvertToNumpyArray
23
 
 
24
  from src.core.logger import get_logger
 
25
 
26
  logger = get_logger(__name__)
27
 
28
  # Pin BLAS / OpenMP / pyarrow to single-threaded mode so byte-determinism
29
- # (AGENTS.md §4 rule 3) holds across hardware. Without this, multi-threaded
30
- # floating-point reductions can reorder and produce non-bit-identical output.
31
- os.environ.setdefault("OMP_NUM_THREADS", "1")
32
- os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
33
- os.environ.setdefault("MKL_NUM_THREADS", "1")
34
- pa.set_cpu_count(1)
35
- pa.set_io_thread_count(1)
36
 
37
  # Suppress RDKit's noisy C++-level warning stream; we surface our own
38
  # structured warnings via the project logger when a SMILES fails to parse.
@@ -237,14 +232,9 @@ def run_pipeline(
237
  df, smiles_col=smiles_col, n_bits=n_bits, radius=radius,
238
  )
239
 
240
- output_path.parent.mkdir(parents=True, exist_ok=True)
241
- if output_path.is_dir():
242
- raise IsADirectoryError(
243
- f"output_path must be a file, got a directory: {output_path}"
244
- )
245
  # Parquet preserves dtypes (uint8 stays uint8) and is byte-deterministic
246
  # when compression is fixed. Used across BBB / EEG / MRI pipelines.
247
- features.to_parquet(output_path, index=False, engine="pyarrow", compression="snappy")
248
  logger.info(
249
  "Wrote processed features to %s (rows=%d, cols=%d)",
250
  output_path, len(features), features.shape[1],
 
11
  from __future__ import annotations
12
 
13
  import math
 
14
  from pathlib import Path
15
 
16
  import numpy as np
17
  import pandas as pd
 
18
  from rdkit import Chem, RDLogger
19
  from rdkit.Chem import AllChem
20
  from rdkit.DataStructs import ConvertToNumpyArray
21
 
22
+ from src.core.determinism import pin_threads
23
  from src.core.logger import get_logger
24
+ from src.core.storage import write_parquet
25
 
26
  logger = get_logger(__name__)
27
 
28
  # Pin BLAS / OpenMP / pyarrow to single-threaded mode so byte-determinism
29
+ # (AGENTS.md §4 rule 3) holds across hardware.
30
+ pin_threads()
 
 
 
 
 
31
 
32
  # Suppress RDKit's noisy C++-level warning stream; we surface our own
33
  # structured warnings via the project logger when a SMILES fails to parse.
 
232
  df, smiles_col=smiles_col, n_bits=n_bits, radius=radius,
233
  )
234
 
 
 
 
 
 
235
  # Parquet preserves dtypes (uint8 stays uint8) and is byte-deterministic
236
  # when compression is fixed. Used across BBB / EEG / MRI pipelines.
237
+ write_parquet(features, output_path)
238
  logger.info(
239
  "Wrote processed features to %s (rows=%d, cols=%d)",
240
  output_path, len(features), features.shape[1],