mekosotto commited on
Commit
0ce94e3
·
1 Parent(s): cb5d63e

feat(eeg): log run params, metrics, and parquet artifact to MLflow

Browse files
src/pipelines/eeg_pipeline.py CHANGED
@@ -12,6 +12,7 @@ a logged WARNING), determinism (seeded ICA + sklearn RNG), traceability
12
  """
13
  from __future__ import annotations
14
 
 
15
  from pathlib import Path
16
  from typing import Callable
17
 
@@ -25,6 +26,7 @@ from scipy import stats as scipy_stats
25
  from src.core.determinism import pin_threads
26
  from src.core.logger import get_logger
27
  from src.core.storage import write_parquet
 
28
 
29
  logger = get_logger(__name__)
30
 
@@ -439,6 +441,7 @@ def run_pipeline(
439
  if not input_path.exists():
440
  raise FileNotFoundError(f"Raw EEG file not found: {input_path}")
441
 
 
442
  logger.info("Reading raw EEG from %s", input_path)
443
  # Format dispatch: .edf via read_raw_edf, anything else (FIF, gzipped FIF)
444
  # via read_raw_fif. .bdf / .set / .vhdr support can be added here.
@@ -467,6 +470,26 @@ def run_pipeline(
467
  output_path, len(features), features.shape[1],
468
  )
469
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
470
 
471
  if __name__ == "__main__":
472
  # Day-2 CLI entrypoint — runs with default paths against `data/raw/eeg.fif`.
 
12
  """
13
  from __future__ import annotations
14
 
15
+ import time
16
  from pathlib import Path
17
  from typing import Callable
18
 
 
26
  from src.core.determinism import pin_threads
27
  from src.core.logger import get_logger
28
  from src.core.storage import write_parquet
29
+ from src.core.tracking import track_pipeline_run
30
 
31
  logger = get_logger(__name__)
32
 
 
441
  if not input_path.exists():
442
  raise FileNotFoundError(f"Raw EEG file not found: {input_path}")
443
 
444
+ started = time.perf_counter()
445
  logger.info("Reading raw EEG from %s", input_path)
446
  # Format dispatch: .edf via read_raw_edf, anything else (FIF, gzipped FIF)
447
  # via read_raw_fif. .bdf / .set / .vhdr support can be added here.
 
470
  output_path, len(features), features.shape[1],
471
  )
472
 
473
+ duration_sec = time.perf_counter() - started
474
+
475
+ with track_pipeline_run(
476
+ experiment_name="eeg_pipeline",
477
+ params={
478
+ "input_path": str(input_path),
479
+ "output_path": str(output_path),
480
+ "epoch_duration_s": epoch_duration_s,
481
+ "eog_ch_name": str(eog_ch_name) if eog_ch_name is not None else "None",
482
+ "n_components": n_components,
483
+ "random_state": random_state,
484
+ },
485
+ metrics={
486
+ "rows_out": float(len(features)),
487
+ "duration_sec": duration_sec,
488
+ },
489
+ artifact_path=output_path,
490
+ ):
491
+ pass
492
+
493
 
494
  if __name__ == "__main__":
495
  # Day-2 CLI entrypoint — runs with default paths against `data/raw/eeg.fif`.
tests/pipelines/test_eeg_pipeline.py CHANGED
@@ -427,3 +427,26 @@ class TestRunPipeline:
427
  epoch_duration_s=2.0, eog_ch_name="EOG061",
428
  n_components=4, random_state=97,
429
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
427
  epoch_duration_s=2.0, eog_ch_name="EOG061",
428
  n_components=4, random_state=97,
429
  )
430
+
431
+
432
+ import mlflow
433
+ from src.pipelines import eeg_pipeline as _eeg_for_mlflow_test
434
+
435
+
436
+ class TestEEGPipelineMLflow:
437
+ def test_run_pipeline_creates_mlflow_run(self, tmp_path):
438
+ from pathlib import Path
439
+ fixture = Path(__file__).resolve().parents[1] / "fixtures" / "eeg_sample.fif"
440
+ out = tmp_path / "out.parquet"
441
+ _eeg_for_mlflow_test.run_pipeline(
442
+ input_path=fixture, output_path=out,
443
+ epoch_duration_s=2.0, eog_ch_name="EOG061",
444
+ n_components=4, random_state=97,
445
+ )
446
+ runs = mlflow.search_runs(
447
+ experiment_names=["eeg_pipeline"],
448
+ order_by=["start_time DESC"],
449
+ )
450
+ assert len(runs) >= 1
451
+ assert "metrics.rows_out" in runs.columns
452
+ assert runs.iloc[0]["metrics.rows_out"] > 0