feat(bbb): add run_pipeline orchestrator + CLI entrypoint with idempotent writes
Browse files
src/pipelines/bbb_pipeline.py
CHANGED
|
@@ -11,6 +11,7 @@ traceability (row count in / out / dropped), and idempotent output.
|
|
| 11 |
from __future__ import annotations
|
| 12 |
|
| 13 |
import math
|
|
|
|
| 14 |
|
| 15 |
import numpy as np
|
| 16 |
import pandas as pd
|
|
@@ -177,3 +178,60 @@ def extract_features_from_dataframe(
|
|
| 177 |
n_total, len(out), n_invalid, 100.0 * n_invalid / max(n_total, 1),
|
| 178 |
)
|
| 179 |
return out
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 11 |
from __future__ import annotations
|
| 12 |
|
| 13 |
import math
|
| 14 |
+
from pathlib import Path
|
| 15 |
|
| 16 |
import numpy as np
|
| 17 |
import pandas as pd
|
|
|
|
| 178 |
n_total, len(out), n_invalid, 100.0 * n_invalid / max(n_total, 1),
|
| 179 |
)
|
| 180 |
return out
|
| 181 |
+
|
| 182 |
+
|
| 183 |
+
DEFAULT_INPUT = Path("data/raw/bbbp.csv")
|
| 184 |
+
DEFAULT_OUTPUT = Path("data/processed/bbbp_features.csv")
|
| 185 |
+
|
| 186 |
+
|
| 187 |
+
def run_pipeline(
|
| 188 |
+
input_path: Path = DEFAULT_INPUT,
|
| 189 |
+
output_path: Path = DEFAULT_OUTPUT,
|
| 190 |
+
smiles_col: str = "smiles",
|
| 191 |
+
n_bits: int = 2048,
|
| 192 |
+
radius: int = 2,
|
| 193 |
+
) -> None:
|
| 194 |
+
"""Run the BBB pipeline end-to-end: raw CSV → processed feature CSV.
|
| 195 |
+
|
| 196 |
+
Reads the Kaggle BBBP CSV at `input_path`, validates and converts
|
| 197 |
+
SMILES into Morgan fingerprints, and writes the model-ready table
|
| 198 |
+
to `output_path`. Output is overwritten on every run (idempotent).
|
| 199 |
+
|
| 200 |
+
Args:
|
| 201 |
+
input_path: Path to the raw BBBP CSV (must include `smiles_col`).
|
| 202 |
+
output_path: Where to write the processed feature CSV. Parent
|
| 203 |
+
directory is created if missing.
|
| 204 |
+
smiles_col: SMILES column name in the raw CSV.
|
| 205 |
+
n_bits: Morgan fingerprint length.
|
| 206 |
+
radius: Morgan radius.
|
| 207 |
+
|
| 208 |
+
Raises:
|
| 209 |
+
FileNotFoundError: if `input_path` does not exist.
|
| 210 |
+
KeyError: if `smiles_col` is missing from the CSV.
|
| 211 |
+
"""
|
| 212 |
+
input_path = Path(input_path)
|
| 213 |
+
output_path = Path(output_path)
|
| 214 |
+
|
| 215 |
+
if not input_path.exists():
|
| 216 |
+
raise FileNotFoundError(f"Raw BBBP file not found: {input_path}")
|
| 217 |
+
|
| 218 |
+
logger.info("Reading raw BBBP from %s", input_path)
|
| 219 |
+
df = pd.read_csv(input_path)
|
| 220 |
+
logger.info("Loaded %d rows, columns=%s", len(df), list(df.columns))
|
| 221 |
+
|
| 222 |
+
features = extract_features_from_dataframe(
|
| 223 |
+
df, smiles_col=smiles_col, n_bits=n_bits, radius=radius,
|
| 224 |
+
)
|
| 225 |
+
|
| 226 |
+
output_path.parent.mkdir(parents=True, exist_ok=True)
|
| 227 |
+
features.to_csv(output_path, index=False)
|
| 228 |
+
logger.info(
|
| 229 |
+
"Wrote processed features to %s (rows=%d, cols=%d)",
|
| 230 |
+
output_path, len(features), features.shape[1],
|
| 231 |
+
)
|
| 232 |
+
|
| 233 |
+
|
| 234 |
+
if __name__ == "__main__":
|
| 235 |
+
# Production-ready CLI entrypoint:
|
| 236 |
+
# python -m src.pipelines.bbb_pipeline
|
| 237 |
+
run_pipeline()
|
tests/pipelines/test_bbb_pipeline.py
CHANGED
|
@@ -2,6 +2,7 @@
|
|
| 2 |
from __future__ import annotations
|
| 3 |
|
| 4 |
from pathlib import Path
|
|
|
|
| 5 |
|
| 6 |
import numpy as np
|
| 7 |
import pandas as pd
|
|
@@ -11,6 +12,7 @@ from src.pipelines.bbb_pipeline import (
|
|
| 11 |
compute_morgan_fingerprint,
|
| 12 |
extract_features_from_dataframe,
|
| 13 |
is_valid_smiles,
|
|
|
|
| 14 |
)
|
| 15 |
|
| 16 |
|
|
@@ -136,3 +138,51 @@ class TestExtractFeaturesFromDataFrame:
|
|
| 136 |
output = buf.getvalue()
|
| 137 |
assert "Dropping 2/6 rows with invalid SMILES" in output
|
| 138 |
assert "Feature extraction complete: in=6, out=4, dropped=2" in output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 2 |
from __future__ import annotations
|
| 3 |
|
| 4 |
from pathlib import Path
|
| 5 |
+
import shutil
|
| 6 |
|
| 7 |
import numpy as np
|
| 8 |
import pandas as pd
|
|
|
|
| 12 |
compute_morgan_fingerprint,
|
| 13 |
extract_features_from_dataframe,
|
| 14 |
is_valid_smiles,
|
| 15 |
+
run_pipeline,
|
| 16 |
)
|
| 17 |
|
| 18 |
|
|
|
|
| 138 |
output = buf.getvalue()
|
| 139 |
assert "Dropping 2/6 rows with invalid SMILES" in output
|
| 140 |
assert "Feature extraction complete: in=6, out=4, dropped=2" in output
|
| 141 |
+
|
| 142 |
+
|
| 143 |
+
class TestRunPipeline:
|
| 144 |
+
def test_end_to_end_writes_processed_csv(self, tmp_path: Path) -> None:
|
| 145 |
+
# Arrange: copy fixture into a synthetic raw layout.
|
| 146 |
+
raw_dir = tmp_path / "data" / "raw"
|
| 147 |
+
proc_dir = tmp_path / "data" / "processed"
|
| 148 |
+
raw_dir.mkdir(parents=True)
|
| 149 |
+
proc_dir.mkdir(parents=True)
|
| 150 |
+
input_path = raw_dir / "bbbp.csv"
|
| 151 |
+
output_path = proc_dir / "bbbp_features.csv"
|
| 152 |
+
shutil.copy(FIXTURE, input_path)
|
| 153 |
+
|
| 154 |
+
# Act
|
| 155 |
+
run_pipeline(input_path=input_path, output_path=output_path, n_bits=128, radius=2)
|
| 156 |
+
|
| 157 |
+
# Assert: file exists
|
| 158 |
+
assert output_path.exists(), "pipeline must write processed CSV"
|
| 159 |
+
|
| 160 |
+
# Assert: content is correct
|
| 161 |
+
out = pd.read_csv(output_path)
|
| 162 |
+
assert len(out) == 4 # 6 raw - 2 invalid
|
| 163 |
+
assert "p_np" in out.columns
|
| 164 |
+
assert sum(c.startswith("fp_") for c in out.columns) == 128
|
| 165 |
+
assert "smiles" not in out.columns
|
| 166 |
+
|
| 167 |
+
def test_run_pipeline_is_idempotent(self, tmp_path: Path) -> None:
|
| 168 |
+
raw_dir = tmp_path / "data" / "raw"
|
| 169 |
+
proc_dir = tmp_path / "data" / "processed"
|
| 170 |
+
raw_dir.mkdir(parents=True)
|
| 171 |
+
proc_dir.mkdir(parents=True)
|
| 172 |
+
input_path = raw_dir / "bbbp.csv"
|
| 173 |
+
output_path = proc_dir / "bbbp_features.csv"
|
| 174 |
+
shutil.copy(FIXTURE, input_path)
|
| 175 |
+
|
| 176 |
+
run_pipeline(input_path=input_path, output_path=output_path, n_bits=64, radius=2)
|
| 177 |
+
first_bytes = output_path.read_bytes()
|
| 178 |
+
run_pipeline(input_path=input_path, output_path=output_path, n_bits=64, radius=2)
|
| 179 |
+
second_bytes = output_path.read_bytes()
|
| 180 |
+
|
| 181 |
+
assert first_bytes == second_bytes, "pipeline output must be byte-deterministic"
|
| 182 |
+
|
| 183 |
+
def test_run_pipeline_raises_when_input_missing(self, tmp_path: Path) -> None:
|
| 184 |
+
with pytest.raises(FileNotFoundError):
|
| 185 |
+
run_pipeline(
|
| 186 |
+
input_path=tmp_path / "nope.csv",
|
| 187 |
+
output_path=tmp_path / "out.csv",
|
| 188 |
+
)
|