mekosotto Claude Sonnet 4.6 commited on
Commit
915880e
·
1 Parent(s): 32d3a5f

feat(bbb): switch processed output to Parquet for dtype preservation

Browse files
AGENTS.md CHANGED
@@ -31,7 +31,7 @@ All experiment runs are tracked in **MLflow**. All services ship as **Docker** i
31
  ├── pytest.ini
32
  ├── data/
33
  │ ├── raw/ # Untouched source data. NEVER train on this directly.
34
- │ └── processed/ # Pipeline output. Model-ready outputs (overwritten on each run; see §4).
35
  ├── src/
36
  │ ├── api/ # FastAPI routers, request/response schemas
37
  │ ├── pipelines/ # One file per modality. Pure functions + a `run_pipeline()` entry.
@@ -85,3 +85,12 @@ refactored into a pipeline.
85
  5. Write deterministic output to `output_path`.
86
  6. Document any new dependency in `requirements.txt` (pinned).
87
  7. Add a one-line entry to this file's pipeline table.
 
 
 
 
 
 
 
 
 
 
31
  ├── pytest.ini
32
  ├── data/
33
  │ ├── raw/ # Untouched source data. NEVER train on this directly.
34
+ │ └── processed/ # Pipeline output as Parquet (preserves dtypes; overwritten each run; see §4).
35
  ├── src/
36
  │ ├── api/ # FastAPI routers, request/response schemas
37
  │ ├── pipelines/ # One file per modality. Pure functions + a `run_pipeline()` entry.
 
85
  5. Write deterministic output to `output_path`.
86
  6. Document any new dependency in `requirements.txt` (pinned).
87
  7. Add a one-line entry to this file's pipeline table.
88
+
89
+ ## 6. Storage Format Convention
90
+
91
+ All `data/processed/` outputs MUST be **Parquet** (`pyarrow` engine, `compression="snappy"`):
92
+ - Preserves dtypes (uint8 fingerprints stay uint8; float32 EEG features stay float32) — CSV silently widens numeric columns and is unsuitable for the high-dimensional float arrays produced by the EEG and MRI pipelines.
93
+ - Byte-deterministic with fixed compression and single-threaded writes (satisfies §4 Determinism).
94
+ - Read with `pd.read_parquet(path)`; no dtype hints required.
95
+
96
+ The raw `data/raw/` inputs may be in any vendor-supplied format (CSV for BBBP, EDF/FIF for EEG, NIfTI for MRI).
requirements.txt CHANGED
@@ -1,4 +1,4 @@
1
- # Requires Python 3.10–3.12 (rdkit / numpy / pandas / scipy / scikit-learn pins ship cp310–cp312 wheels only).
2
  # See AGENTS.md §3 for the full coding-standards contract.
3
 
4
  # --- Web / API layer ---
@@ -9,6 +9,7 @@ pydantic==2.9.2
9
  # --- Core data stack ---
10
  numpy==1.26.4
11
  pandas==2.2.2
 
12
  scipy==1.13.1
13
  scikit-learn==1.5.1
14
 
 
1
+ # Requires Python 3.10–3.12 (rdkit / numpy / pandas / pyarrow / scipy / scikit-learn pins ship cp310–cp312 wheels only).
2
  # See AGENTS.md §3 for the full coding-standards contract.
3
 
4
  # --- Web / API layer ---
 
9
  # --- Core data stack ---
10
  numpy==1.26.4
11
  pandas==2.2.2
12
+ pyarrow==17.0.0
13
  scipy==1.13.1
14
  scikit-learn==1.5.1
15
 
src/pipelines/bbb_pipeline.py CHANGED
@@ -36,7 +36,7 @@ RDLogger.DisableLog("rdApp.*")
36
 
37
  # Default I/O paths for the BBB pipeline. Override via run_pipeline() args.
38
  DEFAULT_INPUT = Path("data/raw/bbbp.csv")
39
- DEFAULT_OUTPUT = Path("data/processed/bbbp_features.csv")
40
 
41
 
42
  def is_valid_smiles(smiles: str | float | None) -> bool:
@@ -196,12 +196,13 @@ def run_pipeline(
196
 
197
  Reads the Kaggle BBBP CSV at `input_path`, validates and converts
198
  SMILES into Morgan fingerprints, and writes the model-ready table
199
- to `output_path`. Output is overwritten on every run (idempotent).
 
200
 
201
  Args:
202
  input_path: Path to the raw BBBP CSV (must include `smiles_col`).
203
- output_path: Where to write the processed feature CSV. Parent
204
- directory is created if missing.
205
  smiles_col: SMILES column name in the raw CSV.
206
  n_bits: Morgan fingerprint length.
207
  radius: Morgan radius.
@@ -230,7 +231,9 @@ def run_pipeline(
230
  raise IsADirectoryError(
231
  f"output_path must be a file, got a directory: {output_path}"
232
  )
233
- features.to_csv(output_path, index=False, lineterminator="\n")
 
 
234
  logger.info(
235
  "Wrote processed features to %s (rows=%d, cols=%d)",
236
  output_path, len(features), features.shape[1],
 
36
 
37
  # Default I/O paths for the BBB pipeline. Override via run_pipeline() args.
38
  DEFAULT_INPUT = Path("data/raw/bbbp.csv")
39
+ DEFAULT_OUTPUT = Path("data/processed/bbbp_features.parquet")
40
 
41
 
42
  def is_valid_smiles(smiles: str | float | None) -> bool:
 
196
 
197
  Reads the Kaggle BBBP CSV at `input_path`, validates and converts
198
  SMILES into Morgan fingerprints, and writes the model-ready table
199
+ as a Parquet file at `output_path`. Output is overwritten on every
200
+ run (idempotent) and preserves the uint8 dtype of fingerprint columns.
201
 
202
  Args:
203
  input_path: Path to the raw BBBP CSV (must include `smiles_col`).
204
+ output_path: Where to write the processed feature Parquet file.
205
+ Parent directory is created if missing.
206
  smiles_col: SMILES column name in the raw CSV.
207
  n_bits: Morgan fingerprint length.
208
  radius: Morgan radius.
 
231
  raise IsADirectoryError(
232
  f"output_path must be a file, got a directory: {output_path}"
233
  )
234
+ # Parquet preserves dtypes (uint8 stays uint8) and is byte-deterministic
235
+ # when compression is fixed. Used across BBB / EEG / MRI pipelines.
236
+ features.to_parquet(output_path, index=False, engine="pyarrow", compression="snappy")
237
  logger.info(
238
  "Wrote processed features to %s (rows=%d, cols=%d)",
239
  output_path, len(features), features.shape[1],
tests/pipelines/test_bbb_pipeline.py CHANGED
@@ -141,36 +141,52 @@ class TestExtractFeaturesFromDataFrame:
141
 
142
 
143
  class TestRunPipeline:
144
- def test_end_to_end_writes_processed_csv(self, tmp_path: Path) -> None:
145
  # Arrange: copy fixture into a synthetic raw layout.
146
  raw_dir = tmp_path / "data" / "raw"
147
  proc_dir = tmp_path / "data" / "processed"
148
  raw_dir.mkdir(parents=True)
149
  proc_dir.mkdir(parents=True)
150
  input_path = raw_dir / "bbbp.csv"
151
- output_path = proc_dir / "bbbp_features.csv"
152
  shutil.copy(FIXTURE, input_path)
153
 
154
  # Act
155
  run_pipeline(input_path=input_path, output_path=output_path, n_bits=128, radius=2)
156
 
157
  # Assert: file exists
158
- assert output_path.exists(), "pipeline must write processed CSV"
159
 
160
  # Assert: content is correct
161
- out = pd.read_csv(output_path)
162
  assert len(out) == 4 # 6 raw - 2 invalid
163
  assert "p_np" in out.columns
164
  assert sum(c.startswith("fp_") for c in out.columns) == 128
165
  assert "smiles" not in out.columns
166
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
167
  def test_run_pipeline_is_idempotent(self, tmp_path: Path) -> None:
168
  raw_dir = tmp_path / "data" / "raw"
169
  proc_dir = tmp_path / "data" / "processed"
170
  raw_dir.mkdir(parents=True)
171
  proc_dir.mkdir(parents=True)
172
  input_path = raw_dir / "bbbp.csv"
173
- output_path = proc_dir / "bbbp_features.csv"
174
  shutil.copy(FIXTURE, input_path)
175
 
176
  run_pipeline(input_path=input_path, output_path=output_path, n_bits=64, radius=2)
@@ -184,7 +200,7 @@ class TestRunPipeline:
184
  with pytest.raises(FileNotFoundError):
185
  run_pipeline(
186
  input_path=tmp_path / "nope.csv",
187
- output_path=tmp_path / "out.csv",
188
  )
189
 
190
  def test_run_pipeline_rejects_directory_as_output(self, tmp_path: Path) -> None:
 
141
 
142
 
143
  class TestRunPipeline:
144
+ def test_end_to_end_writes_processed_parquet(self, tmp_path: Path) -> None:
145
  # Arrange: copy fixture into a synthetic raw layout.
146
  raw_dir = tmp_path / "data" / "raw"
147
  proc_dir = tmp_path / "data" / "processed"
148
  raw_dir.mkdir(parents=True)
149
  proc_dir.mkdir(parents=True)
150
  input_path = raw_dir / "bbbp.csv"
151
+ output_path = proc_dir / "bbbp_features.parquet"
152
  shutil.copy(FIXTURE, input_path)
153
 
154
  # Act
155
  run_pipeline(input_path=input_path, output_path=output_path, n_bits=128, radius=2)
156
 
157
  # Assert: file exists
158
+ assert output_path.exists(), "pipeline must write processed Parquet"
159
 
160
  # Assert: content is correct
161
+ out = pd.read_parquet(output_path)
162
  assert len(out) == 4 # 6 raw - 2 invalid
163
  assert "p_np" in out.columns
164
  assert sum(c.startswith("fp_") for c in out.columns) == 128
165
  assert "smiles" not in out.columns
166
 
167
+ def test_run_pipeline_preserves_uint8_dtype(self, tmp_path: Path) -> None:
168
+ """The Parquet round-trip must keep fp_* columns as uint8 (not widen to int64)."""
169
+ raw_dir = tmp_path / "data" / "raw"
170
+ proc_dir = tmp_path / "data" / "processed"
171
+ raw_dir.mkdir(parents=True)
172
+ proc_dir.mkdir(parents=True)
173
+ input_path = raw_dir / "bbbp.csv"
174
+ output_path = proc_dir / "bbbp_features.parquet"
175
+ shutil.copy(FIXTURE, input_path)
176
+
177
+ run_pipeline(input_path=input_path, output_path=output_path, n_bits=64, radius=2)
178
+ out = pd.read_parquet(output_path)
179
+ fp_cols = [c for c in out.columns if c.startswith("fp_")]
180
+ for col in fp_cols:
181
+ assert out[col].dtype == np.uint8, f"{col} widened to {out[col].dtype}"
182
+
183
  def test_run_pipeline_is_idempotent(self, tmp_path: Path) -> None:
184
  raw_dir = tmp_path / "data" / "raw"
185
  proc_dir = tmp_path / "data" / "processed"
186
  raw_dir.mkdir(parents=True)
187
  proc_dir.mkdir(parents=True)
188
  input_path = raw_dir / "bbbp.csv"
189
+ output_path = proc_dir / "bbbp_features.parquet"
190
  shutil.copy(FIXTURE, input_path)
191
 
192
  run_pipeline(input_path=input_path, output_path=output_path, n_bits=64, radius=2)
 
200
  with pytest.raises(FileNotFoundError):
201
  run_pipeline(
202
  input_path=tmp_path / "nope.csv",
203
+ output_path=tmp_path / "out.parquet",
204
  )
205
 
206
  def test_run_pipeline_rejects_directory_as_output(self, tmp_path: Path) -> None: