"""BBB (Blood-Brain Barrier) molecule pipeline.

Reads the Kaggle BBBP dataset (SMILES strings + binary penetration label),
filters chemically invalid SMILES, computes Morgan circular fingerprints with
RDKit, and writes a model-ready Parquet feature table to `data/processed/`.

This module follows the Data Readiness contract in AGENTS.md §4:
schema validity, domain validity (drop invalid SMILES), determinism,
traceability (row count in / out / dropped), and idempotent output.
"""
from __future__ import annotations

import math
import time
from pathlib import Path

import numpy as np
import pandas as pd
from rdkit import Chem, RDLogger
from rdkit.Chem import AllChem
from rdkit.DataStructs import ConvertToNumpyArray

from src.core.determinism import pin_threads
from src.core.logger import get_logger
from src.core.storage import write_parquet
from src.core.tracking import track_pipeline_run

logger = get_logger(__name__)

# Pin BLAS / OpenMP / pyarrow to single-threaded mode so byte-determinism
# (AGENTS.md §4 rule 3) holds across hardware. See src.core.determinism.
pin_threads()
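
# A note on timing: thread caps must be applied at runtime here, because the
# BLAS pools are already initialized by the time numpy is imported above, so
# setting env vars like OMP_NUM_THREADS at this point would be too late.
# Purely as an illustration (NOT this project's actual implementation), a
# pin_threads() along these lines would do the job:
#
#     from threadpoolctl import threadpool_limits
#     import pyarrow as pa
#     threadpool_limits(limits=1)  # cap BLAS / OpenMP pools at one thread
#     pa.set_cpu_count(1)          # pin pyarrow's internal thread pool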

# Suppress RDKit's noisy C++-level warning stream; we surface our own
# structured warnings via the project logger when a SMILES fails to parse.
#
# IMPORTANT: this call is process-global and irreversible from this module's
# import. Any other code (other pipelines, the FastAPI surface, tests) that
# relies on RDKit warnings will be affected. If a future modality needs
# fine-grained RDKit log control, move this into an explicit
# `configure_rdkit_logging()` helper invoked from `run_pipeline()` instead.
RDLogger.DisableLog("rdApp.*")


# Default I/O paths for the BBB pipeline. Override via run_pipeline() args.
DEFAULT_INPUT = Path("data/raw/bbbp.csv")
DEFAULT_OUTPUT = Path("data/processed/bbbp_features.parquet")


def is_valid_smiles(smiles: str | float | None) -> bool:
    """Return True iff `smiles` is a non-empty string parseable by RDKit.

    Handles the full set of garbage we expect from real CSVs:
    None, NaN floats, empty strings, and unparseable text.
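
    Example:
        >>> is_valid_smiles("CCO")            # ethanol parses fine
        True
        >>> is_valid_smiles("not-a-smiles")   # unparseable text
        False
        >>> is_valid_smiles(float("nan"))     # NaN from a sparse CSV column
        False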
    """
    if smiles is None:
        return False
    if isinstance(smiles, float) and math.isnan(smiles):
        return False
    if not isinstance(smiles, str) or not smiles.strip():
        return False
    return Chem.MolFromSmiles(smiles) is not None


def compute_morgan_fingerprint(
    smiles: str,
    n_bits: int = 2048,
    radius: int = 2,
) -> np.ndarray:
    """Compute the Morgan (ECFP-like) circular fingerprint for a SMILES.

    Args:
        smiles: A SMILES string already known to be valid. Pass through
            `is_valid_smiles` first if the source is untrusted.
        n_bits: Length of the bit vector. 2048 is the de facto default
            for downstream scikit-learn classifiers.
        radius: Morgan radius (2 ≈ ECFP4). Passed to RDKit's modern
            MorganGenerator API.

    Returns:
        A 1-D `np.ndarray` of length `n_bits` and dtype `uint8`, where
        each element is 0 or 1.

    Raises:
        ValueError: if `smiles` cannot be parsed by RDKit.
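
    Example:
        A doctest-style sketch; which bits are set depends on the installed
        RDKit version, so only shape and dtype are asserted here.

        >>> fp = compute_morgan_fingerprint("CCO", n_bits=1024)
        >>> fp.shape, fp.dtype
        ((1024,), dtype('uint8'))
        >>> int(fp.sum()) > 0
        True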
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        raise ValueError(f"invalid SMILES: {smiles!r}")

    generator = AllChem.GetMorganGenerator(radius=radius, fpSize=n_bits)
    bit_vect = generator.GetFingerprint(mol)
    arr = np.zeros((n_bits,), dtype=np.uint8)
    ConvertToNumpyArray(bit_vect, arr)
    return arr


def _compute_fingerprint_matrix(
    valid_smiles: list[str],
    n_bits: int,
    radius: int,
) -> np.ndarray:
    """Stack Morgan fingerprints into a (N, n_bits) uint8 matrix.

    Caller must guarantee `valid_smiles` is non-empty and every entry has
    already passed `is_valid_smiles`.
    """
    return np.stack(
        [
            compute_morgan_fingerprint(s, n_bits=n_bits, radius=radius)
            for s in valid_smiles
        ],
        axis=0,
    )


def extract_features_from_dataframe(
    df: pd.DataFrame,
    smiles_col: str = "smiles",
    n_bits: int = 2048,
    radius: int = 2,
) -> pd.DataFrame:
    """Convert a DataFrame of (SMILES + metadata) into model-ready features.

    Steps:
      1. Validate every SMILES with `is_valid_smiles`. Invalid rows are
         logged at WARNING with their original index and dropped.
      2. Compute the Morgan fingerprint for each remaining SMILES.
      3. Expand the bit vector into `n_bits` integer columns named
         `fp_0 ... fp_{n_bits - 1}` and concatenate with the surviving
         non-SMILES metadata.

    On empty input or when every row is invalid, returns a DataFrame with
    the expected columns and zero rows (rather than raising), so callers
    downstream see a well-typed result instead of an exception.

    Args:
        df: Raw DataFrame; must contain `smiles_col`.
        smiles_col: Name of the SMILES column (default `"smiles"`).
        n_bits: Fingerprint length.
        radius: Morgan radius.

    Returns:
        A new DataFrame with the SMILES column dropped and `n_bits` new
        `fp_*` columns appended. Index is reset to 0..N-1.

    Raises:
        KeyError: if `smiles_col` is missing from `df`.
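
    Example:
        A minimal sketch with a tiny bit width; `p_np` stands in for
        whatever label column the raw CSV carries. The invalid second row
        is dropped, leaving one row of metadata plus `fp_*` columns.

        >>> raw = pd.DataFrame({"smiles": ["CCO", "xyz"], "p_np": [1, 0]})
        >>> out = extract_features_from_dataframe(raw, n_bits=16)
        >>> len(out), list(out.columns)[:3]
        (1, ['p_np', 'fp_0', 'fp_1'])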
    """
    if smiles_col not in df.columns:
        raise KeyError(f"DataFrame is missing required column {smiles_col!r}")

    fp_columns = [f"fp_{i}" for i in range(n_bits)]
    metadata_columns = [c for c in df.columns if c != smiles_col]

    n_total = len(df)
    valid_mask = df[smiles_col].apply(is_valid_smiles)
    n_invalid = int((~valid_mask).sum())

    if n_invalid:
        invalid_indices = df.index[~valid_mask].tolist()
        display = invalid_indices[:10]
        suffix = (
            f"... (+{len(invalid_indices) - 10} more)"
            if len(invalid_indices) > 10
            else ""
        )
        logger.warning(
            "Dropping %d/%d rows with invalid SMILES (indices=%s%s)",
            n_invalid, n_total, display, suffix,
        )

    valid_df = df.loc[valid_mask].reset_index(drop=True)

    if len(valid_df) == 0:
        logger.info(
            "Feature extraction complete: in=%d, out=0, dropped=%d (%.2f%%)",
            n_total, n_invalid, 100.0 * n_invalid / max(n_total, 1),
        )
        return pd.DataFrame(columns=metadata_columns + fp_columns)

    fingerprints = _compute_fingerprint_matrix(
        valid_df[smiles_col].tolist(), n_bits=n_bits, radius=radius,
    )
    fp_df = pd.DataFrame(fingerprints, columns=fp_columns, dtype=np.uint8)
    metadata = valid_df.drop(columns=[smiles_col])
    out = pd.concat([metadata, fp_df], axis=1)

    logger.info(
        "Feature extraction complete: in=%d, out=%d, dropped=%d (%.2f%%)",
        n_total, len(out), n_invalid, 100.0 * n_invalid / max(n_total, 1),
    )
    return out


def run_pipeline(
    input_path: Path = DEFAULT_INPUT,
    output_path: Path = DEFAULT_OUTPUT,
    smiles_col: str = "smiles",
    n_bits: int = 2048,
    radius: int = 2,
) -> None:
    """Run the BBB pipeline end-to-end: raw CSV → processed feature Parquet.

    Reads the Kaggle BBBP CSV at `input_path`, validates and converts
    SMILES into Morgan fingerprints, and writes the model-ready table
    as a Parquet file at `output_path`. Output is overwritten on every
    run (idempotent) and preserves the uint8 dtype of fingerprint columns.

    Args:
        input_path: Path to the raw BBBP CSV (must include `smiles_col`).
        output_path: Where to write the processed feature Parquet file.
            Parent directory is created if missing.
        smiles_col: SMILES column name in the raw CSV.
        n_bits: Morgan fingerprint length.
        radius: Morgan radius.

    Raises:
        FileNotFoundError: if `input_path` does not exist.
        IsADirectoryError: if `output_path` resolves to an existing directory.
        KeyError: if `smiles_col` is missing from the CSV.
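
    Example:
        Programmatic invocation with the module defaults spelled out
        (skipped under doctest because it reads and writes the filesystem):

        >>> run_pipeline(
        ...     input_path=Path("data/raw/bbbp.csv"),
        ...     output_path=Path("data/processed/bbbp_features.parquet"),
        ... )  # doctest: +SKIP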
    """
    input_path = Path(input_path)
    output_path = Path(output_path)

    if not input_path.exists():
        raise FileNotFoundError(f"Raw BBBP file not found: {input_path}")

    started = time.perf_counter()
    logger.info("Reading raw BBBP from %s", input_path)
    df = pd.read_csv(input_path)
    logger.info("Loaded %d rows, %d columns", len(df), len(df.columns))

    features = extract_features_from_dataframe(
        df, smiles_col=smiles_col, n_bits=n_bits, radius=radius,
    )

    # Parquet preserves dtypes (uint8 stays uint8) and is byte-deterministic
    # when compression is fixed. Used across BBB / EEG / MRI pipelines.
    write_parquet(features, output_path)
    logger.info(
        "Wrote processed features to %s (rows=%d, cols=%d)",
        output_path, len(features), features.shape[1],
    )

    duration_sec = time.perf_counter() - started

    with track_pipeline_run(
        experiment_name="bbb_pipeline",
        params={
            "input_path": str(input_path),
            "output_path": str(output_path),
            "n_bits": n_bits,
            "radius": radius,
        },
        metrics={
            "rows_in": float(len(df)),
            "rows_out": float(len(features)),
            "rows_dropped": float(len(df) - len(features)),
            "duration_sec": duration_sec,
        },
        artifact_path=output_path,
    ):
        pass


if __name__ == "__main__":
    # Day-1 CLI entrypoint — runs with default paths against `data/raw/bbbp.csv`.
    # Argument parsing (argparse / click) will land in a later task.
    #   python -m src.pipelines.bbb_pipeline
    run_pipeline()