File size: 18,816 Bytes
7e0ed24 0ce94e3 c4c7642 8c4e3e2 c4c7642 c743c0a 7e0ed24 32e13cf 1a86a6e a1ab9ac 7e0ed24 0643e09 7e0ed24 0643e09 0ce94e3 7e0ed24 1a15285 0643e09 1a15285 9931366 7e0ed24 c4c7642 e3c6c58 7e0ed24 e3c6c58 7e0ed24 e3c6c58 7e0ed24 c743c0a 32359e5 c743c0a 32359e5 c743c0a 32359e5 c743c0a 32359e5 c743c0a 1a86a6e 9931366 1a86a6e 9931366 1a86a6e 9931366 1a86a6e 9931366 1a86a6e 32e13cf 9931366 1a86a6e 9931366 1a86a6e 9931366 1a86a6e a1ab9ac 8da57c6 8c4e3e2 8da57c6 a1ab9ac 8da57c6 0d591d4 8da57c6 a1ab9ac 8da57c6 a1ab9ac 8da57c6 0d591d4 32e13cf 0d591d4 32e13cf c26c6a2 32e13cf 0d591d4 c26c6a2 0d591d4 32e13cf c26c6a2 32e13cf c26c6a2 32e13cf c26c6a2 32e13cf c26c6a2 32e13cf c4c7642 0ce94e3 c4c7642 ea055f0 c4c7642 0643e09 c4c7642 0ce94e3 c4c7642 ea055f0 c4c7642 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 | """EEG (electroencephalography) pipeline.
Loads raw recordings (FIF/EDF), bandpass-filters, removes EOG artifacts via
ICA, slices into fixed-duration epochs, computes per-band PSD + statistical
features, flattens to a 2D table, and writes a model-ready Parquet at
`data/processed/eeg_features.parquet`.
Follows the Data Readiness contract in AGENTS.md §4 and the Parquet storage
convention in §6: schema validity, domain validity (drop NaN/inf epochs with
a logged WARNING), determinism (seeded ICA + sklearn RNG), traceability
(in/out/dropped counts at INFO), and idempotent overwrite output.
"""
from __future__ import annotations
import time
from pathlib import Path
from typing import Callable
import mne
import numpy as np
import pandas as pd
from mne.preprocessing import ICA
from scipy import signal as scipy_signal
from scipy import stats as scipy_stats
from src.core.determinism import pin_threads
from src.core.logger import get_logger
from src.core.storage import write_parquet
from src.core.tracking import track_pipeline_run
logger = get_logger(__name__)
# Pin BLAS / OpenMP / pyarrow to single-threaded mode so byte-determinism
# (AGENTS.md §4 rule 3) holds across hardware. See src.core.determinism.
pin_threads()
# Pearson-correlation threshold for EOG-component rejection in ICA.
# Real-world EOG components typically score 0.8-0.95 against the EOG channel;
# 0.9 is a conservative floor that avoids false positives at the cost of
# missing weak artifacts. Lower (0.7-0.8) for noisier recordings.
_EOG_CORR_THRESHOLD: float = 0.9
# Default I/O paths for the EEG pipeline. Override via run_pipeline() args.
DEFAULT_INPUT = Path("data/raw/eeg.fif")
DEFAULT_OUTPUT = Path("data/processed/eeg_features.parquet")
def is_valid_epoch(epoch: np.ndarray | None) -> bool:
"""Return True iff `epoch` is a non-empty 2-D numeric array with no NaN/inf.
The annotation is the *expected* input class; the implementation defensively
rejects any other garbage (lists, scalars, string dtypes, zero-sized arrays)
without raising — matching the BBB pipeline's `is_valid_smiles` pattern.
"""
if not isinstance(epoch, np.ndarray):
return False
if epoch.ndim != 2:
return False
if epoch.size == 0:
return False
if not np.issubdtype(epoch.dtype, np.number):
return False
if not np.all(np.isfinite(epoch)):
return False
return True
def bandpass_filter(
raw: mne.io.BaseRaw,
l_freq: float = 1.0,
h_freq: float = 40.0,
) -> mne.io.BaseRaw:
"""Apply a non-mutating bandpass filter to an MNE Raw.
Default 1-40 Hz removes drift below 1 Hz and high-frequency noise / line
artifacts above 40 Hz. Returns a copy; the input `raw` is unchanged.
Args:
raw: Loaded `mne.io.BaseRaw` (call `.load_data()` first if from disk).
l_freq: Low-cut frequency in Hz. Must be strictly less than `h_freq`.
h_freq: High-cut frequency in Hz.
Returns:
A filtered copy of `raw`.
Raises:
ValueError: if `l_freq >= h_freq`. MNE silently produces a corrupted
band-stop-like result on inverted inputs, so we guard up front.
"""
if l_freq >= h_freq:
raise ValueError(
f"l_freq ({l_freq}) must be strictly less than h_freq ({h_freq})"
)
out = raw.copy()
# picks="all" includes the EOG channel so the ICA step in
# remove_artifacts_with_ica sees a consistently-filtered EOG reference.
out.filter(l_freq=l_freq, h_freq=h_freq, picks="all", verbose="ERROR")
logger.info("Bandpass filter applied: %.1f-%.1f Hz", l_freq, h_freq)
return out
def remove_artifacts_with_ica(
raw: mne.io.BaseRaw,
eog_ch_name: str | None = None,
n_components: int = 15,
random_state: int = 97,
) -> mne.io.BaseRaw:
"""Remove EOG-like artifacts using MNE's ICA + EOG correlation.
Fits an ICA decomposition on `raw`, finds components whose time courses
correlate (Pearson) with the named EOG channel via `find_bads_eog` using
`measure="correlation"`, marks them as "bad" and reconstructs the signal
without them. Returns a copy; the input `raw` is unchanged.
If `eog_ch_name` is None or not present in the recording's channels,
ICA is skipped entirely and a copy of `raw` is returned unchanged.
Args:
raw: Loaded, ideally bandpass-filtered, `mne.io.BaseRaw`.
eog_ch_name: Name of the EOG channel for correlation-based detection.
None disables auto-rejection; a string that is not in the recording's
channel list logs a WARNING and skips ICA.
n_components: Cap on ICA components. If this exceeds the number of EEG
channels, MNE raises ValueError, so the implementation internally
caps it at `max(n_eeg - 1, 1)` before fitting.
random_state: Seed for ICA's underlying solver. Required for §4
Determinism.
Returns:
A copy of `raw` with EOG-correlated ICA components removed (or an
unchanged copy if ICA was skipped).
Raises:
ValueError: if the EEG data is rank-deficient (all-zero or constant
channels) and `mne.preprocessing.ICA.fit` cannot converge.
"""
out = raw.copy()
if eog_ch_name is None:
logger.info("ICA skipped: eog_ch_name not provided")
return out
if eog_ch_name not in out.ch_names:
logger.warning(
"ICA skipped: eog_ch_name=%r not found in channels %s",
eog_ch_name, out.ch_names,
)
return out
# Guard: ICA.fit cannot handle NaN/inf in the data (scipy SVD will raise).
# If the raw contains non-finite samples, skip ICA so the NaN propagates
# to the epoch-level validity check in extract_features_from_recording
# where it will be cleanly dropped with a WARNING.
eeg_picks_check = mne.pick_types(out.info, eeg=True, meg=False)
if not np.all(np.isfinite(out.get_data(picks=eeg_picks_check))):
logger.warning(
"ICA skipped: EEG data contains NaN/inf values; "
"invalid epochs will be dropped downstream"
)
return out
# Cap n_components at rank-1. Average reference (if applied) reduces rank
# to n_eeg - 1; using that as the ceiling is safe for both referenced and
# unreferenced data and avoids ValueError from ICA.fit on small recordings.
n_eeg = len(mne.pick_types(out.info, eeg=True, meg=False))
safe_n = min(n_components, max(n_eeg - 1, 1))
ica = ICA(
n_components=safe_n,
random_state=random_state,
max_iter="auto",
method="fastica",
verbose="ERROR",
)
ica.fit(out, picks="eeg", verbose="ERROR")
# Use raw correlation (not z-score) so we can reliably flag artifact
# components on small recordings where n_components < 10 makes the
# default z-score threshold algebraically unreachable.
bad_idx, _ = ica.find_bads_eog(
out,
ch_name=eog_ch_name,
measure="correlation",
threshold=_EOG_CORR_THRESHOLD,
verbose="ERROR",
)
ica.exclude = list(bad_idx)
logger.info(
"ICA fit: n_components=%d, EOG-correlated rejected=%d (indices=%s)",
safe_n, len(ica.exclude), ica.exclude,
)
ica.apply(out, verbose="ERROR")
return out
EEG_BANDS: dict[str, tuple[float, float]] = {
"delta": (1.0, 4.0),
"theta": (4.0, 8.0),
"alpha": (8.0, 13.0),
"beta": (13.0, 30.0),
"gamma": (30.0, 40.0),
}
def _band_power(freqs: np.ndarray, psd: np.ndarray, lo: float, hi: float) -> float:
"""Mean PSD value within the [lo, hi) frequency band."""
mask = (freqs >= lo) & (freqs < hi)
if not mask.any():
return 0.0
return float(psd[mask].mean())
# Statistical-moment functions, bound to their column-label names. The
# `STATS` tuple below is derived from this list so labels and computations
# can never drift out of sync (a class of bug the original parallel-list
# design was vulnerable to).
_StatFn = Callable[[np.ndarray], float]
_STATS_FUNCS: tuple[tuple[str, _StatFn], ...] # populated below
def _stat_mean(x: np.ndarray) -> float:
return float(np.mean(x))
def _stat_std(x: np.ndarray) -> float:
return float(np.std(x))
def _stat_var(x: np.ndarray) -> float:
return float(np.var(x))
def _stat_skew(x: np.ndarray) -> float:
return float(scipy_stats.skew(x))
def _stat_kurtosis(x: np.ndarray) -> float:
return float(scipy_stats.kurtosis(x))
_STATS_FUNCS = (
("mean", _stat_mean),
("std", _stat_std),
("var", _stat_var),
("skew", _stat_skew),
("kurtosis", _stat_kurtosis),
)
STATS: tuple[str, ...] = tuple(name for name, _ in _STATS_FUNCS)
def compute_features_from_epoch(epoch: np.ndarray, sfreq: float) -> np.ndarray:
"""Compute PSD-band + statistical features for one epoch.
Per channel, the feature block is:
[psd_delta, psd_theta, psd_alpha, psd_beta, psd_gamma,
mean, std, var, skew, kurtosis]
Channels are stacked in their input order. The resulting 1-D vector has
length ``n_channels * (len(EEG_BANDS) + len(STATS))``.
PSD uses Welch's method (`scipy.signal.welch`, `nperseg=min(256, n_samples)`).
For meaningful Welch averaging, the epoch should contain at least
`2 * nperseg` samples (e.g. ≥2 seconds at 256 Hz); shorter epochs degrade
to a single-segment periodogram with high estimation variance.
Statistical conventions:
- ``mean``, ``std``, ``var`` use NumPy with ``ddof=0`` (biased / population
estimators). For sample statistics callers must apply ``ddof=1`` adjustment
downstream.
- ``skew`` uses ``scipy.stats.skew(bias=True)`` (biased estimator).
- ``kurtosis`` uses ``scipy.stats.kurtosis(fisher=True, bias=True)`` —
Fisher's *excess* kurtosis (Gaussian → 0, not 3). Add 3 if Pearson
kurtosis is required downstream.
- For constant-valued channels (zero variance), ``skew`` and
``kurtosis`` are mathematically undefined and scipy returns NaN.
We post-process the feature vector with ``np.nan_to_num`` to map
any NaN/inf to 0.0, preserving the "no NaN survives" Parquet
contract from AGENTS.md §6.
Precondition: `epoch` must be finite (no NaN/inf). Filter via
`is_valid_epoch` before calling — feature values are NaN-propagating.
Args:
epoch: A 2-D array shape (n_channels, n_samples), all-finite.
sfreq: Sampling rate in Hz.
Returns:
A 1-D `np.ndarray` of dtype float64.
"""
n_channels, n_samples = epoch.shape
nperseg = min(256, n_samples)
feats: list[float] = []
for ch in range(n_channels):
x = epoch[ch]
freqs, psd = scipy_signal.welch(x, fs=sfreq, nperseg=nperseg)
for _band, (lo, hi) in EEG_BANDS.items():
feats.append(_band_power(freqs, psd, lo, hi))
for _name, fn in _STATS_FUNCS:
feats.append(fn(x))
arr = np.asarray(feats, dtype=np.float64)
# Constant-valued / zero-variance channels (e.g., disconnected electrodes)
# make scipy.stats.skew / kurtosis return NaN. Map those to 0.0 so the
# downstream Parquet contract ("no NaN in feature table") holds.
return np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)
def _build_feature_columns(eeg_ch_names: list[str]) -> list[str]:
"""Generate the deterministic, in-channel-order column ordering."""
cols: list[str] = []
for ch in eeg_ch_names:
for band in EEG_BANDS:
cols.append(f"feat_{ch}_psd_{band}")
for stat in STATS:
cols.append(f"feat_{ch}_{stat}")
return cols
def extract_features_from_recording(
raw: mne.io.BaseRaw,
epoch_duration_s: float = 2.0,
eog_ch_name: str | None = None,
n_components: int = 15,
random_state: int = 97,
) -> pd.DataFrame:
"""Run the EEG pipeline on a Raw and return a 2-D feature DataFrame.
Steps:
1. Bandpass filter (1-40 Hz).
2. ICA-based EOG artifact rejection (skipped if `eog_ch_name` is None).
3. Slice into fixed-duration epochs.
4. Drop any epoch with NaN/inf samples (logged WARNING).
5. Compute features per epoch and stack into a DataFrame whose columns
are `feat_<channel>_psd_<band>` and `feat_<channel>_<stat>`.
Args:
raw: Loaded `mne.io.BaseRaw` (must be `.load_data()`'d).
epoch_duration_s: Length of each fixed-duration epoch in seconds.
eog_ch_name: Name of EOG reference channel for ICA. None disables ICA.
n_components: Cap on ICA components.
random_state: Seed for ICA's solver (determinism).
Returns:
A `pd.DataFrame` with one row per valid epoch and ``n_eeg_channels *
(len(EEG_BANDS) + len(STATS))`` ``feat_*`` columns.
Raises:
ValueError: if `epoch_duration_s * sfreq` rounds to less than 1 sample.
(Other ValueError sources can propagate from `bandpass_filter`
and `remove_artifacts_with_ica`; see their respective docstrings.)
"""
filtered = bandpass_filter(raw, l_freq=1.0, h_freq=40.0)
cleaned = remove_artifacts_with_ica(
filtered,
eog_ch_name=eog_ch_name,
n_components=n_components,
random_state=random_state,
)
sfreq = float(cleaned.info["sfreq"])
n_samples_per_epoch = int(round(epoch_duration_s * sfreq))
if n_samples_per_epoch < 1:
raise ValueError(
f"epoch_duration_s={epoch_duration_s!r} at sfreq={sfreq} Hz produces "
f"{n_samples_per_epoch} samples per epoch (must be >= 1)"
)
eeg_picks = mne.pick_types(cleaned.info, eeg=True, meg=False, eog=False)
eeg_names = [cleaned.ch_names[i] for i in eeg_picks]
data = cleaned.get_data(picks=eeg_picks) # shape (n_eeg, n_times)
_, n_times = data.shape
n_total_epochs = n_times // n_samples_per_epoch
feature_cols = _build_feature_columns(eeg_names)
rows: list[np.ndarray] = []
invalid_indices: list[int] = []
for ep in range(n_total_epochs):
start = ep * n_samples_per_epoch
end = start + n_samples_per_epoch
epoch = data[:, start:end]
if not is_valid_epoch(epoch):
invalid_indices.append(ep)
continue
rows.append(compute_features_from_epoch(epoch, sfreq=sfreq))
n_dropped = len(invalid_indices)
if n_dropped:
display = invalid_indices[:10]
suffix = (
f"... (+{n_dropped - 10} more)" if n_dropped > 10 else ""
)
logger.warning(
"Dropping %d/%d epochs with invalid samples (indices=%s%s)",
n_dropped, n_total_epochs, display, suffix,
)
if not rows:
logger.info(
"Feature extraction complete: in=%d, out=0, dropped=%d (%.2f%%)",
n_total_epochs, n_dropped,
100.0 * n_dropped / max(n_total_epochs, 1),
)
return pd.DataFrame(columns=feature_cols).astype(np.float64)
matrix = np.vstack(rows)
out = pd.DataFrame(matrix, columns=feature_cols, dtype=np.float64)
logger.info(
"Feature extraction complete: in=%d, out=%d, dropped=%d (%.2f%%)",
n_total_epochs, len(out), n_dropped,
100.0 * n_dropped / max(n_total_epochs, 1),
)
return out
def run_pipeline(
input_path: Path = DEFAULT_INPUT,
output_path: Path = DEFAULT_OUTPUT,
epoch_duration_s: float = 2.0,
eog_ch_name: str | None = None,
n_components: int = 15,
random_state: int = 97,
) -> None:
"""Run the EEG pipeline end-to-end: raw FIF/EDF -> processed feature Parquet.
Reads `input_path` via MNE, applies bandpass + ICA + epoching + feature
extraction, then writes a model-ready Parquet at `output_path` (preserves
float64 dtype; satisfies AGENTS.md §6).
Args:
input_path: Path to the raw recording (.fif or .edf).
output_path: Where to write the processed feature Parquet file.
Parent directory is created if missing.
epoch_duration_s: Length of each fixed-duration epoch (seconds).
eog_ch_name: Name of the EOG channel for ICA-based artifact rejection.
None disables ICA.
n_components: Cap on ICA components.
random_state: Seed for ICA's solver. Required for §4 Determinism.
Raises:
FileNotFoundError: if `input_path` does not exist.
IsADirectoryError: if `output_path` resolves to an existing directory.
"""
input_path = Path(input_path)
output_path = Path(output_path)
if not input_path.exists():
raise FileNotFoundError(f"Raw EEG file not found: {input_path}")
started = time.perf_counter()
logger.info("Reading raw EEG from %s", input_path)
# Format dispatch: .edf via read_raw_edf, anything else (FIF, gzipped FIF)
# via read_raw_fif. .bdf / .set / .vhdr support can be added here.
if input_path.suffix.lower() == ".edf":
raw = mne.io.read_raw_edf(input_path, preload=True, verbose="ERROR")
else:
raw = mne.io.read_raw_fif(input_path, preload=True, verbose="ERROR")
logger.info(
"Loaded %d channels, sfreq=%.1f Hz, n_times=%d",
len(raw.ch_names), raw.info["sfreq"], raw.n_times,
)
features = extract_features_from_recording(
raw,
epoch_duration_s=epoch_duration_s,
eog_ch_name=eog_ch_name,
n_components=n_components,
random_state=random_state,
)
# Parquet preserves dtypes (float64 features stay float64) and is
# byte-deterministic with single-threaded snappy. AGENTS.md §6.
write_parquet(features, output_path)
logger.info(
"Wrote processed features to %s (rows=%d, cols=%d)",
output_path, len(features), features.shape[1],
)
duration_sec = time.perf_counter() - started
with track_pipeline_run(
experiment_name="eeg_pipeline",
params={
"input_path": str(input_path),
"output_path": str(output_path),
"epoch_duration_s": epoch_duration_s,
"eog_ch_name": str(eog_ch_name) if eog_ch_name is not None else "None",
"n_components": n_components,
"random_state": random_state,
},
metrics={
"rows_out": float(len(features)),
"duration_sec": duration_sec,
},
artifact_path=output_path,
):
pass
if __name__ == "__main__":
# Day-2 CLI entrypoint — runs with default paths against `data/raw/eeg.fif`.
# Defaults to `eog_ch_name=None` (ICA disabled). Pass an EOG channel
# name programmatically via run_pipeline(eog_ch_name=...) to enable
# artifact rejection. Argument parsing (argparse / click) lands later.
# python -m src.pipelines.eeg_pipeline
run_pipeline()
|