mekosotto commited on
Commit
cb5d63e
·
1 Parent(s): 8f586ea

feat(bbb): log run params, metrics, and parquet artifact to MLflow

Browse files
src/pipelines/bbb_pipeline.py CHANGED
@@ -11,6 +11,7 @@ traceability (row count in / out / dropped), and idempotent output.
11
  from __future__ import annotations
12
 
13
  import math
 
14
  from pathlib import Path
15
 
16
  import numpy as np
@@ -22,6 +23,7 @@ from rdkit.DataStructs import ConvertToNumpyArray
22
  from src.core.determinism import pin_threads
23
  from src.core.logger import get_logger
24
  from src.core.storage import write_parquet
 
25
 
26
  logger = get_logger(__name__)
27
 
@@ -224,6 +226,7 @@ def run_pipeline(
224
  if not input_path.exists():
225
  raise FileNotFoundError(f"Raw BBBP file not found: {input_path}")
226
 
 
227
  logger.info("Reading raw BBBP from %s", input_path)
228
  df = pd.read_csv(input_path)
229
  logger.info("Loaded %d rows, %d columns", len(df), len(df.columns))
@@ -240,6 +243,26 @@ def run_pipeline(
240
  output_path, len(features), features.shape[1],
241
  )
242
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
243
 
244
  if __name__ == "__main__":
245
  # Day-1 CLI entrypoint — runs with default paths against `data/raw/bbbp.csv`.
 
11
  from __future__ import annotations
12
 
13
  import math
14
+ import time
15
  from pathlib import Path
16
 
17
  import numpy as np
 
23
  from src.core.determinism import pin_threads
24
  from src.core.logger import get_logger
25
  from src.core.storage import write_parquet
26
+ from src.core.tracking import track_pipeline_run
27
 
28
  logger = get_logger(__name__)
29
 
 
226
  if not input_path.exists():
227
  raise FileNotFoundError(f"Raw BBBP file not found: {input_path}")
228
 
229
+ started = time.perf_counter()
230
  logger.info("Reading raw BBBP from %s", input_path)
231
  df = pd.read_csv(input_path)
232
  logger.info("Loaded %d rows, %d columns", len(df), len(df.columns))
 
243
  output_path, len(features), features.shape[1],
244
  )
245
 
246
+ duration_sec = time.perf_counter() - started
247
+
248
+ with track_pipeline_run(
249
+ experiment_name="bbb_pipeline",
250
+ params={
251
+ "input_path": str(input_path),
252
+ "output_path": str(output_path),
253
+ "n_bits": n_bits,
254
+ "radius": radius,
255
+ },
256
+ metrics={
257
+ "rows_in": float(len(df)),
258
+ "rows_out": float(len(features)),
259
+ "rows_dropped": float(len(df) - len(features)),
260
+ "duration_sec": duration_sec,
261
+ },
262
+ artifact_path=output_path,
263
+ ):
264
+ pass
265
+
266
 
267
  if __name__ == "__main__":
268
  # Day-1 CLI entrypoint — runs with default paths against `data/raw/bbbp.csv`.
tests/pipelines/test_bbb_pipeline.py CHANGED
@@ -215,3 +215,22 @@ class TestRunPipeline:
215
 
216
  with pytest.raises(IsADirectoryError, match="must be a file"):
217
  run_pipeline(input_path=input_path, output_path=bad_output, n_bits=32)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
215
 
216
  with pytest.raises(IsADirectoryError, match="must be a file"):
217
  run_pipeline(input_path=input_path, output_path=bad_output, n_bits=32)
218
+
219
+
220
import mlflow
from pathlib import Path

from src.pipelines import bbb_pipeline as _bbb_for_mlflow_test


class TestBBBPipelineMLflow:
    """Integration check that run_pipeline logs a run to MLflow.

    Guards the tracking contract added in this commit: one MLflow run per
    pipeline invocation under the ``bbb_pipeline`` experiment, carrying the
    row-count metrics and path params.
    """

    def test_run_pipeline_creates_mlflow_run(self, tmp_path):
        fixture = Path(__file__).resolve().parents[1] / "fixtures" / "bbbp_sample.csv"
        out = tmp_path / "out.parquet"

        # Snapshot the experiment's run count first: the tracking store is
        # global, so a run left over from an earlier session must not be
        # able to satisfy the assertions on its own.
        runs_before = mlflow.search_runs(experiment_names=["bbb_pipeline"])

        _bbb_for_mlflow_test.run_pipeline(input_path=fixture, output_path=out)

        runs_after = mlflow.search_runs(
            experiment_names=["bbb_pipeline"],
            order_by=["start_time DESC"],
        )
        # Exactly one new run was created by this invocation.
        assert len(runs_after) == len(runs_before) + 1

        latest = runs_after.iloc[0]
        # The newest run must be ours, not a stale one: its logged
        # output_path param is unique to this test's tmp_path.
        assert latest["params.output_path"] == str(out)
        assert latest["metrics.rows_out"] > 0
        assert latest["metrics.rows_in"] >= latest["metrics.rows_out"]