54627d8 feat(branch-b): B1 public dataset ingest — DeFlaker, iDFlakies, FlakeFlagger, LogHub loaders + CLI

"""Phase B1 dataset-loader tests.

The four loaders are exercised against tiny fixtures shipped alongside the
tests so the suite runs offline (matching §Implementation notes: "Caching is
content-addressable. Re-running is safe."). One test per loader covers each
behavior the phase doc requires:

- yields ≥ 1 ``FailureRecord``
- records validate against the schema (Pydantic raises if not)
- caches to ``cache_dir`` after a fetch
- a repeat ``load_or_fetch`` returns from the cache (we point the loader at a
  missing source between calls and verify the records still load)
"""

from __future__ import annotations

import csv
import json
import sys
from pathlib import Path

import pytest

from ci_triage_env.data.cli import build_parser
from ci_triage_env.data.cli import main as cli_main
from ci_triage_env.data.datasets import (
    LOADER_REGISTRY,
    DatasetLoader,
    DeFlakerLoader,
    FailureRecord,
    FlakeFlaggerLoader,
    IDFlakiesLoader,
    LogHubLoader,
)
from ci_triage_env.data.datasets._base import MissingArtifactError
from ci_triage_env.data.datasets.cache import (
    DEFAULT_CACHE_ROOT,
    cache_dir_for,
    is_cached,
    load_cached,
)

FIXTURES_ROOT = Path(__file__).parent / "fixtures"
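
# For orientation: the loader surface these tests exercise, sketched as a
# typing.Protocol. Attribute and method names are inferred from the
# assertions below; the real base class lives in
# ci_triage_env.data.datasets._base and its exact signatures may differ.
# Treat this as a hypothetical sketch, not the canonical contract.
from typing import Iterable, Iterator, Protocol


class _LoaderContract(Protocol):
    name: str
    env_var: str
    download_instructions: str

    def fetch(self) -> Iterator[FailureRecord]: ...

    def cache_records(self, records: Iterable[FailureRecord]) -> int: ...

    def load_or_fetch(self) -> Iterator[FailureRecord]: ...

    def info(self) -> dict: ...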

# ---------------------------------------------------------------------------
# FailureRecord round-trip + registry sanity
# ---------------------------------------------------------------------------


def test_failure_record_round_trip():
    rec = FailureRecord(
        record_id="rt-1",
        source_dataset="deflaker",
        project="x/y",
        test_name="t",
        failure_type_label="flaky",
        log_text="log",
        metadata={"k": "v"},
    )
    restored = FailureRecord.model_validate_json(rec.model_dump_json())
    assert restored == rec


def test_loader_registry_lists_all_four():
    assert set(LOADER_REGISTRY) == {"deflaker", "idflakies", "flakeflagger", "loghub"}


@pytest.mark.parametrize(
    "loader_cls, expected_name",
    [
        (DeFlakerLoader, "deflaker"),
        (IDFlakiesLoader, "idflakies"),
        (FlakeFlaggerLoader, "flakeflagger"),
        (LogHubLoader, "loghub"),
    ],
)
def test_each_loader_has_correct_name(loader_cls, expected_name):
    assert loader_cls.name == expected_name


@pytest.mark.parametrize(
    "loader_cls",
    [DeFlakerLoader, IDFlakiesLoader, FlakeFlaggerLoader, LogHubLoader],
)
def test_missing_data_path_raises_missing_artifact(loader_cls, tmp_path, monkeypatch):
    monkeypatch.delenv(loader_cls.env_var, raising=False)
    loader = loader_cls(cache_dir=tmp_path / "cache")
    with pytest.raises(MissingArtifactError) as exc_info:
        list(loader.fetch())
    assert loader_cls.download_instructions in str(exc_info.value)


# ---------------------------------------------------------------------------
# Per-loader fixture-driven tests (parametrized so the matrix is one place).
# ---------------------------------------------------------------------------

LOADER_CASES = [
    pytest.param(
        DeFlakerLoader,
        FIXTURES_ROOT / "deflaker" / "failures.csv",
        3,  # 4th row is malformed and skipped
        {"flaky", "real"},
        id="deflaker",
    ),
    pytest.param(
        IDFlakiesLoader,
        FIXTURES_ROOT / "idflakies" / "idflakies.csv",
        4,
        {"OD", "NOD", "OD-Brit", "OD-Vict"},
        id="idflakies",
    ),
    pytest.param(
        FlakeFlaggerLoader,
        FIXTURES_ROOT / "flakeflagger" / "features.csv",
        3,
        {"timing", "io", "thread"},
        id="flakeflagger",
    ),
    pytest.param(
        LogHubLoader,
        FIXTURES_ROOT / "loghub",
        7,  # 4 from HDFS_v1 + 3 from Linux
        {"normal", "anomaly", "unlabeled"},
        id="loghub",
    ),
]

# Shared decorator so the matrix really does live in one place.
loader_matrix = pytest.mark.parametrize(
    "loader_cls, data_path, expected_count, expected_labels", LOADER_CASES
)


@loader_matrix
def test_loader_fixture_yields_at_least_one_record(
    loader_cls, data_path, expected_count, expected_labels, tmp_path
):
    loader = loader_cls(data_path=data_path, cache_dir=tmp_path / "cache")
    records = list(loader.fetch())
    assert records, f"{loader_cls.name} produced no records"
    assert len(records) == expected_count
    for r in records:
        assert isinstance(r, FailureRecord)


@loader_matrix
def test_loader_records_validate_against_schema(
    loader_cls, data_path, expected_count, expected_labels, tmp_path
):
    loader = loader_cls(data_path=data_path, cache_dir=tmp_path / "cache")
    for record in loader.fetch():
        # Round-trip via Pydantic to exercise validation end-to-end.
        FailureRecord.model_validate_json(record.model_dump_json())


@loader_matrix
def test_loader_caches_to_disk(
    loader_cls, data_path, expected_count, expected_labels, tmp_path
):
    cache_dir = tmp_path / "cache"
    loader = loader_cls(data_path=data_path, cache_dir=cache_dir)
    records = list(loader.fetch())
    written = loader.cache_records(records)
    assert written == len(records)
    cached_files = list(cache_dir.glob("*.json"))
    assert len(cached_files) == len(records)
    # Sanity: every cached file parses back cleanly.
    for path in cached_files:
        FailureRecord.model_validate_json(path.read_text())


@loader_matrix
def test_loader_repeat_load_or_fetch_uses_cache(
    loader_cls, data_path, expected_count, expected_labels, tmp_path
):
    """Once the cache is populated, ``load_or_fetch`` reads from disk,
    even when the source artifact is no longer reachable."""
    cache_dir = tmp_path / "cache"
    loader = loader_cls(data_path=data_path, cache_dir=cache_dir)
    first = list(loader.load_or_fetch())
    assert len(first) == expected_count
    # Point the second loader at a missing source so this call would fail if
    # the cache wasn't honored.
    loader_after = loader_cls(data_path=tmp_path / "does_not_exist", cache_dir=cache_dir)
    second = list(loader_after.load_or_fetch())
    assert {r.record_id for r in first} == {r.record_id for r in second}


@loader_matrix
def test_loader_label_distribution_in_info(
    loader_cls, data_path, expected_count, expected_labels, tmp_path
):
    loader = loader_cls(data_path=data_path, cache_dir=tmp_path / "cache")
    info = loader.info()
    assert info["count"] == expected_count
    seen = set(info["label_distribution"]) - {None}
    # Some loaders may emit additional labels (e.g. LogHub mixes), but every
    # expected label must appear.
    assert expected_labels.issubset(seen)


# ---------------------------------------------------------------------------
# Loader-specific fine-grained checks
# ---------------------------------------------------------------------------


def test_deflaker_skips_malformed_rows(tmp_path):
    loader = DeFlakerLoader(
        data_path=FIXTURES_ROOT / "deflaker" / "failures.csv",
        cache_dir=tmp_path / "cache",
    )
    records = list(loader.fetch())
    assert all(r.test_name and r.metadata.get("commit_sha") for r in records)


def test_deflaker_rejects_csv_missing_required_columns(tmp_path):
    bad = tmp_path / "bad.csv"
    with open(bad, "w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(["project", "test"])  # missing label, commit_sha
        writer.writerow(["x", "y"])
    with pytest.raises(ValueError, match="missing required columns"):
        list(DeFlakerLoader(data_path=bad, cache_dir=tmp_path / "cache").fetch())
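
# For reference, a hypothetical failures.csv shaped like what these tests
# assert: the rejection test above implies the required columns, the skip
# test relies on one malformed data row, and the label set comes from
# LOADER_CASES. All values here are made up; the real fixture may differ.
#
#   project,test,label,commit_sha
#   apache/foo,FooTest.testA,flaky,abc1230
#   apache/foo,FooTest.testB,real,abc1231
#   apache/bar,BarTest.testC,flaky,abc1232
#   apache/bar,BarTest.testD,,            <- malformed: skipped by the loader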


def test_idflakies_record_ids_are_unique(tmp_path):
    """Two iDFlakies rows can share (project, test_name) under different
    flake-type prefixes; record_ids must still be unique."""
    loader = IDFlakiesLoader(
        data_path=FIXTURES_ROOT / "idflakies" / "idflakies.csv",
        cache_dir=tmp_path / "cache",
    )
    ids = [r.record_id for r in loader.fetch()]
    assert len(ids) == len(set(ids))


def test_flakeflagger_extra_columns_preserved_in_metadata(tmp_path):
    loader = FlakeFlaggerLoader(
        data_path=FIXTURES_ROOT / "flakeflagger" / "features.csv",
        cache_dir=tmp_path / "cache",
    )
    records = list(loader.fetch())
    timing = next(r for r in records if r.test_name.endswith("testTiming"))
    assert timing.metadata["timing_flag"] is True
    assert timing.metadata["io_flag"] is False
    assert timing.metadata["extra_feature"] == "seasonal"


def test_loghub_picks_up_anomaly_labels(tmp_path):
    loader = LogHubLoader(
        data_path=FIXTURES_ROOT / "loghub",
        cache_dir=tmp_path / "cache",
        sub_datasets=["HDFS_v1"],
    )
    records = list(loader.fetch())
    by_label = {r.metadata["line_no"]: r.failure_type_label for r in records}
    assert by_label[1] == "anomaly"
    assert by_label[3] == "anomaly"
    assert by_label[0] == "normal"


def test_loghub_per_subset_limit_caps_records(tmp_path):
    loader = LogHubLoader(
        data_path=FIXTURES_ROOT / "loghub",
        cache_dir=tmp_path / "cache",
        sub_datasets=["HDFS_v1"],
        per_subset_limit=2,
    )
    records = list(loader.fetch())
    assert len(records) == 2


def test_loghub_treats_missing_anomaly_csv_as_unlabeled(tmp_path):
    loader = LogHubLoader(
        data_path=FIXTURES_ROOT / "loghub",
        cache_dir=tmp_path / "cache",
        sub_datasets=["Linux"],
    )
    labels = {r.failure_type_label for r in loader.fetch()}
    assert labels == {"unlabeled"}


def test_loghub_skips_missing_subsets_silently(tmp_path):
    loader = LogHubLoader(
        data_path=FIXTURES_ROOT / "loghub",
        cache_dir=tmp_path / "cache",
        sub_datasets=["HDFS_v1", "Hadoop_does_not_exist"],
    )
    records = list(loader.fetch())
    assert all(r.metadata["sub_dataset"] == "HDFS_v1" for r in records)


# ---------------------------------------------------------------------------
# cache helpers
# ---------------------------------------------------------------------------


def test_cache_dir_for_respects_env_var(monkeypatch, tmp_path):
    monkeypatch.setenv("CI_TRIAGE_DATA_CACHE", str(tmp_path / "alt"))
    assert cache_dir_for("loghub") == tmp_path / "alt" / "loghub"


def test_cache_dir_for_default_root(monkeypatch):
    monkeypatch.delenv("CI_TRIAGE_DATA_CACHE", raising=False)
    assert cache_dir_for("loghub") == DEFAULT_CACHE_ROOT / "loghub"


def test_load_cached_yields_records(tmp_path, monkeypatch):
    monkeypatch.setenv("CI_TRIAGE_DATA_CACHE", str(tmp_path))
    target = tmp_path / "deflaker"
    target.mkdir()
    rec = FailureRecord(
        record_id="x",
        source_dataset="deflaker",
        project="p",
        test_name="t",
        log_text="",
    )
    (target / "x.json").write_text(rec.model_dump_json())
    assert is_cached("x", "deflaker")
    out = list(load_cached("deflaker"))
    assert out == [rec]
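
# A minimal sketch of the resolution the two cache_dir_for tests pin down
# (env override first, DEFAULT_CACHE_ROOT otherwise). The real implementation
# lives in ci_triage_env.data.datasets.cache; this helper is hypothetical and
# exists only to make the expected behavior explicit.
import os


def _cache_dir_for_sketch(dataset_name: str) -> Path:
    root = os.environ.get("CI_TRIAGE_DATA_CACHE")
    return (Path(root) if root else DEFAULT_CACHE_ROOT) / dataset_name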


# ---------------------------------------------------------------------------
# CLI surface
# ---------------------------------------------------------------------------


def test_cli_parser_lists_load_subcommand():
    parser = build_parser()
    parsed = parser.parse_args(["load", "deflaker"])
    assert parsed.cmd == "load"
    assert parsed.dataset == "deflaker"


def test_cli_load_writes_cache(tmp_path, capsys):
    rc = cli_main(
        [
            "load",
            "deflaker",
            "--data-path",
            str(FIXTURES_ROOT / "deflaker" / "failures.csv"),
            "--cache-dir",
            str(tmp_path / "cache"),
            "--summary",
        ]
    )
    assert rc == 0
    out = capsys.readouterr().out
    assert "loaded 3 records" in out
    summary = json.loads(out.split("\n", 1)[1])  # second line is the JSON
    assert summary["count"] == 3
    assert any((tmp_path / "cache").glob("*.json"))
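
# For reference, the cli_main call above corresponds to this shell invocation
# (paths are illustrative; the module-as-script entrypoint is smoke-tested
# further below):
#
#   python -m ci_triage_env.data.cli load deflaker \
#       --data-path tests/fixtures/deflaker/failures.csv \
#       --cache-dir /tmp/cache --summary
#
# The first stdout line is the human-readable summary ("loaded 3 records");
# the second line is the JSON summary parsed in the test.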


def test_cli_load_with_force_clears_cache(tmp_path, capsys):
    cache_dir = tmp_path / "cache"
    cache_dir.mkdir()
    stale = cache_dir / "stale-id.json"
    stale.write_text("{}")
    rc = cli_main(
        [
            "load",
            "deflaker",
            "--data-path",
            str(FIXTURES_ROOT / "deflaker" / "failures.csv"),
            "--cache-dir",
            str(cache_dir),
            "--force",
        ]
    )
    assert rc == 0
    assert not stale.exists()
    capsys.readouterr()  # drain


def test_cli_module_entrypoint_smoke():
    """Smoke: ``load --help`` parses cleanly and the CLI module re-imports
    (everything stays in-process; no subprocess is spawned)."""
    parser = build_parser()
    with pytest.raises(SystemExit) as exc_info:
        parser.parse_args(["load", "--help"])
    assert exc_info.value.code == 0  # argparse exits 0 on --help
    # Confirm we didn't accidentally break the module-as-script entrypoint.
    import importlib

    sys.modules.pop("ci_triage_env.data.cli", None)
    importlib.import_module("ci_triage_env.data.cli")


# ---------------------------------------------------------------------------
# Fixture presence (per phase doc §Tests required)
# ---------------------------------------------------------------------------


def test_fixtures_exist():
    for path in [
        FIXTURES_ROOT / "deflaker" / "failures.csv",
        FIXTURES_ROOT / "idflakies" / "idflakies.csv",
        FIXTURES_ROOT / "flakeflagger" / "features.csv",
        FIXTURES_ROOT / "loghub" / "HDFS_v1" / "HDFS_v1.log",
        FIXTURES_ROOT / "loghub" / "HDFS_v1" / "HDFS_v1_anomaly.csv",
        FIXTURES_ROOT / "loghub" / "Linux" / "Linux.log",
    ]:
        assert path.exists(), f"fixture missing: {path}"


# ---------------------------------------------------------------------------
# DatasetLoader is abstract (sanity for the contract)
# ---------------------------------------------------------------------------


def test_dataset_loader_is_abstract():
    with pytest.raises(TypeError):
        DatasetLoader(data_path=Path("/tmp"))  # type: ignore[abstract]