File size: 9,397 Bytes
c4c8d1e
 
 
 
48cf9c9
c4c8d1e
80528e7
c4c8d1e
 
 
80528e7
 
b08a67c
80528e7
48cf9c9
80528e7
c4c8d1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0236e73
c4c8d1e
 
 
0236e73
b1bd8db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b08a67c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
049a352
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48cf9c9
 
 
915880e
48cf9c9
 
 
 
 
 
915880e
48cf9c9
 
 
 
 
 
915880e
48cf9c9
 
915880e
48cf9c9
 
 
 
 
915880e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48cf9c9
 
 
 
 
 
915880e
48cf9c9
 
 
 
 
 
 
 
 
 
 
 
 
915880e
48cf9c9
32d3a5f
 
 
 
 
 
 
 
 
 
 
 
 
cb5d63e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""Unit + integration tests for the BBB (SMILES → Morgan FP) pipeline."""
from __future__ import annotations

from pathlib import Path
import shutil

import numpy as np
import pandas as pd
import pytest

from src.pipelines.bbb_pipeline import (
    compute_morgan_fingerprint,
    extract_features_from_dataframe,
    is_valid_smiles,
    run_pipeline,
)


FIXTURE = Path(__file__).parent.parent / "fixtures" / "bbbp_sample.csv"


class TestIsValidSmiles:
    """Acceptance/rejection behavior of the SMILES validity gate."""

    def test_accepts_simple_alcohol(self) -> None:
        # n-propanol: the simplest kind of parseable linear molecule.
        assert is_valid_smiles("CCCO") is True

    def test_accepts_aromatic_ring(self) -> None:
        # Lowercase ring atoms are aromatic-benzene SMILES notation.
        assert is_valid_smiles("c1ccccc1") is True

    def test_rejects_garbage_string(self) -> None:
        assert is_valid_smiles("this_is_not_a_smiles") is False

    def test_rejects_empty_string(self) -> None:
        assert is_valid_smiles("") is False

    def test_rejects_none(self) -> None:
        assert is_valid_smiles(None) is False

    def test_rejects_nan(self) -> None:
        # NaN is what pandas yields for a missing CSV cell; it must be
        # rejected cleanly rather than crash the validity check.
        assert is_valid_smiles(float("nan")) is False


class TestComputeMorganFingerprint:
    """Contract tests for the Morgan fingerprint encoder."""

    def test_returns_numpy_array_of_correct_length(self) -> None:
        bits = compute_morgan_fingerprint("CCCO", n_bits=2048, radius=2)
        assert isinstance(bits, np.ndarray)
        assert bits.shape == (2048,)
        assert bits.dtype == np.uint8

    def test_only_zero_or_one(self) -> None:
        bits = compute_morgan_fingerprint("c1ccccc1", n_bits=1024, radius=2)
        # Every cell must be a pure bit — nothing outside {0, 1}.
        assert np.isin(bits, (0, 1)).all()

    def test_different_molecules_yield_different_fingerprints(self) -> None:
        propanol = compute_morgan_fingerprint("CCCO", n_bits=2048, radius=2)
        benzene = compute_morgan_fingerprint("c1ccccc1", n_bits=2048, radius=2)
        # At least one bit position must differ between distinct molecules.
        assert (propanol != benzene).any()

    def test_invalid_smiles_raises_value_error(self) -> None:
        with pytest.raises(ValueError, match="invalid SMILES"):
            compute_morgan_fingerprint("not_a_smiles", n_bits=2048, radius=2)


class TestExtractFeaturesFromDataFrame:
    """Behavior of the DataFrame → fingerprint-feature transform."""

    @staticmethod
    def _featurize_fixture() -> pd.DataFrame:
        """Load the sample CSV and run extraction with the standard test params."""
        frame = pd.read_csv(FIXTURE)
        return extract_features_from_dataframe(
            frame, smiles_col="smiles", n_bits=128, radius=2
        )

    def test_filters_invalid_smiles(self) -> None:
        frame = pd.read_csv(FIXTURE)
        # Sanity check on the fixture itself: 6 rows, 2 deliberately broken.
        assert len(frame) == 6

        result = extract_features_from_dataframe(
            frame, smiles_col="smiles", n_bits=128, radius=2
        )

        # Only the 4 chemically valid rows survive filtering.
        assert len(result) == 4

    def test_preserves_label_column(self) -> None:
        assert "p_np" in self._featurize_fixture().columns

    def test_expands_fingerprint_into_named_columns(self) -> None:
        result = self._featurize_fixture()
        bit_columns = [name for name in result.columns if name.startswith("fp_")]
        assert len(bit_columns) == 128
        # Every fingerprint cell must be a 0/1 integer.
        assert result[bit_columns].isin([0, 1]).all().all()

    def test_drops_smiles_string_after_expansion(self) -> None:
        """Once expanded to bits, the original SMILES string adds no signal."""
        assert "smiles" not in self._featurize_fixture().columns

    def test_resets_index(self) -> None:
        result = self._featurize_fixture()
        assert list(result.index) == list(range(len(result)))

    def test_raises_key_error_on_missing_smiles_col(self) -> None:
        frame_without_smiles = pd.DataFrame({"foo": [1, 2, 3]})
        with pytest.raises(KeyError, match="missing required column 'smiles'"):
            extract_features_from_dataframe(
                frame_without_smiles, smiles_col="smiles", n_bits=64
            )

    def test_returns_empty_dataframe_when_all_invalid(self) -> None:
        """All-invalid input must produce a typed empty result, not crash."""
        all_invalid = pd.DataFrame(
            {
                "p_np": [0, 0],
                "smiles": ["", "still_garbage"],
            }
        )
        out = extract_features_from_dataframe(all_invalid, smiles_col="smiles", n_bits=32)
        assert len(out) == 0
        assert "p_np" in out.columns
        assert sum(name.startswith("fp_") for name in out.columns) == 32
        assert "smiles" not in out.columns

    def test_emits_warning_and_info_logs(self) -> None:
        """AGENTS.md §4 traceability: log invalid drops + in/out/dropped counts."""
        import io
        import logging

        from src.core.logger import get_logger
        from src.pipelines import bbb_pipeline as mod

        # Swap the module logger's stream so we can capture output.
        logger = get_logger(mod.__name__, level=logging.INFO)
        handler = logger.handlers[0]
        captured = io.StringIO()
        saved_stream = handler.stream
        handler.stream = captured
        try:
            extract_features_from_dataframe(
                pd.read_csv(FIXTURE), smiles_col="smiles", n_bits=32
            )
        finally:
            # Restore the original stream even if extraction raises.
            handler.stream = saved_stream

        text = captured.getvalue()
        assert "Dropping 2/6 rows with invalid SMILES" in text
        assert "Feature extraction complete: in=6, out=4, dropped=2" in text


class TestRunPipeline:
    """Integration tests for ``run_pipeline`` (raw CSV in → processed Parquet out)."""

    @staticmethod
    def _stage_fixture(tmp_path: Path) -> tuple[Path, Path]:
        """Copy the fixture CSV into a synthetic raw/processed layout.

        Returns ``(input_path, output_path)`` ready for a ``run_pipeline`` call.
        Centralizes the arrange step that was previously copy-pasted into
        each test.
        """
        raw_dir = tmp_path / "data" / "raw"
        proc_dir = tmp_path / "data" / "processed"
        raw_dir.mkdir(parents=True)
        proc_dir.mkdir(parents=True)
        input_path = raw_dir / "bbbp.csv"
        shutil.copy(FIXTURE, input_path)
        return input_path, proc_dir / "bbbp_features.parquet"

    def test_end_to_end_writes_processed_parquet(self, tmp_path: Path) -> None:
        # Arrange
        input_path, output_path = self._stage_fixture(tmp_path)

        # Act
        run_pipeline(input_path=input_path, output_path=output_path, n_bits=128, radius=2)

        # Assert: file exists
        assert output_path.exists(), "pipeline must write processed Parquet"

        # Assert: content is correct
        out = pd.read_parquet(output_path)
        assert len(out) == 4  # 6 raw - 2 invalid
        assert "p_np" in out.columns
        assert sum(c.startswith("fp_") for c in out.columns) == 128
        assert "smiles" not in out.columns

    def test_run_pipeline_preserves_uint8_dtype(self, tmp_path: Path) -> None:
        """The Parquet round-trip must keep fp_* columns as uint8 (not widen to int64)."""
        input_path, output_path = self._stage_fixture(tmp_path)

        run_pipeline(input_path=input_path, output_path=output_path, n_bits=64, radius=2)
        out = pd.read_parquet(output_path)
        fp_cols = [c for c in out.columns if c.startswith("fp_")]
        # Guard: an empty fp_cols would make the loop below pass vacuously.
        assert fp_cols, "expected at least one fp_* column in the output"
        for col in fp_cols:
            assert out[col].dtype == np.uint8, f"{col} widened to {out[col].dtype}"

    def test_run_pipeline_is_idempotent(self, tmp_path: Path) -> None:
        """Re-running over the same input must reproduce identical output bytes."""
        input_path, output_path = self._stage_fixture(tmp_path)

        run_pipeline(input_path=input_path, output_path=output_path, n_bits=64, radius=2)
        first_bytes = output_path.read_bytes()
        run_pipeline(input_path=input_path, output_path=output_path, n_bits=64, radius=2)
        second_bytes = output_path.read_bytes()

        assert first_bytes == second_bytes, "pipeline output must be byte-deterministic"

    def test_run_pipeline_raises_when_input_missing(self, tmp_path: Path) -> None:
        with pytest.raises(FileNotFoundError):
            run_pipeline(
                input_path=tmp_path / "nope.csv",
                output_path=tmp_path / "out.parquet",
            )

    def test_run_pipeline_rejects_directory_as_output(self, tmp_path: Path) -> None:
        input_path, _ = self._stage_fixture(tmp_path)

        # output_path points at an existing directory, not a file
        bad_output = tmp_path / "out_dir"
        bad_output.mkdir()

        with pytest.raises(IsADirectoryError, match="must be a file"):
            run_pipeline(input_path=input_path, output_path=bad_output, n_bits=32)


# NOTE(review): mid-file imports violate PEP 8 (E402); consider moving them into
# the top-of-file import block. Kept in place here to avoid changing module-level
# side-effect ordering for any code past this view.
import mlflow
from src.pipelines import bbb_pipeline as _bbb_for_mlflow_test


class TestBBBPipelineMLflow:
    """MLflow tracking contract: every pipeline run must be recorded."""

    def test_run_pipeline_creates_mlflow_run(self, tmp_path: Path) -> None:
        out = tmp_path / "out.parquet"
        # Reuse the module-level FIXTURE constant — it resolves to the same
        # fixtures/bbbp_sample.csv that was previously re-derived by hand, and
        # the in-method `from pathlib import Path` re-import is unnecessary.
        _bbb_for_mlflow_test.run_pipeline(input_path=FIXTURE, output_path=out)

        runs = mlflow.search_runs(
            experiment_names=["bbb_pipeline"],
            order_by=["start_time DESC"],
        )
        # At least one tracked run, with a positive rows_out metric logged.
        assert len(runs) >= 1
        assert "metrics.rows_out" in runs.columns
        assert runs.iloc[0]["metrics.rows_out"] > 0