"""FlakeFlagger (Alshammari et al., ICSE 2021) loader.
Source: https://github.com/AlshammariA/FlakeFlagger. ~800 flaky tests labeled
with rich features (timing, IO, threading, randomness, network, concurrency).
Their published feature CSVs are sufficient for clustering — we don't need
raw test runs (B2 mines raw logs).
Expected on-disk format: a CSV with at least these columns:
``project, test_name, flake_type, timing_flag, io_flag, threading_flag``
(boolean-ish flags as ``0/1`` or ``True/False``). Extra columns are kept on
``metadata`` so future B3 features can pick them up without a schema bump.
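
A minimal illustrative row (values invented for this example, not drawn
from the dataset)::

    project,test_name,flake_type,timing_flag,io_flag,threading_flag
    demo/proj,TestIO.testRead,io,0,1,0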
"""
from __future__ import annotations

import csv
import hashlib
from collections.abc import Iterator

from ci_triage_env.data.datasets._base import DatasetLoader, FailureRecord

_REQUIRED_COLUMNS = {"project", "test_name"}


def _truthy(v: str | None) -> bool:
    """Interpret common CSV spellings of truth for the flag columns."""
    if v is None:
        return False
    return v.strip().lower() in {"1", "true", "yes", "y", "t"}


class FlakeFlaggerLoader(DatasetLoader):
    name = "flakeflagger"
    env_var = "FLAKEFLAGGER_DATA_PATH"
    download_instructions = (
        "Download FlakeFlagger's published feature CSV from "
        "https://github.com/AlshammariA/FlakeFlagger (under data/ in the repo) "
        "and point $FLAKEFLAGGER_DATA_PATH at the file. Expected columns "
        "include (project, test_name, flake_type, timing_flag, io_flag, "
        "threading_flag); extras are preserved in metadata."
    )

    def fetch(self) -> Iterator[FailureRecord]:
        path = self._require_data_path()
        with open(path, newline="", encoding="utf-8") as fh:
            reader = csv.DictReader(fh)
            cols = set(reader.fieldnames or [])
            missing = _REQUIRED_COLUMNS - cols
            if missing:
                raise ValueError(
                    f"flakeflagger CSV at {path} is missing required columns: {sorted(missing)}"
                )
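            # Columns outside the core trio become metadata passthroughs; the
            # three known flag columns are normalized to bools separately.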
            extra_cols = cols - {"project", "test_name", "flake_type"}
            for row in reader:
                project = (row.get("project") or "").strip()
                test_name = (row.get("test_name") or "").strip()
                if not project or not test_name:
                    continue
                flake_type = (row.get("flake_type") or "").strip() or None
                metadata = {
                    "timing_flag": _truthy(row.get("timing_flag")),
                    "io_flag": _truthy(row.get("io_flag")),
                    "threading_flag": _truthy(row.get("threading_flag")),
                }
                # Preserve any extra columns verbatim so B3 can use them.
                # (`or ""` also covers short rows, where DictReader fills None.)
                for c in extra_cols - {"timing_flag", "io_flag", "threading_flag"}:
                    metadata[c] = row.get(c) or ""
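                # Stable, order-independent record ID: 8-hex-char SHA-1 prefix
                # of (project, test_name), so re-ingests reproduce the same IDs.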
                suffix = hashlib.sha1(f"{project}|{test_name}".encode()).hexdigest()[:8]
                yield FailureRecord(
                    record_id=f"flakeflagger-{suffix}",
                    source_dataset="flakeflagger",
                    project=project,
                    test_name=test_name,
                    failure_type_label=flake_type,
                    log_text="",
                    metadata=metadata,
                )
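

# Smoke-test sketch (not part of the loader). Assumptions worth flagging:
# FlakeFlaggerLoader() takes no constructor arguments, and
# _require_data_path() resolves $FLAKEFLAGGER_DATA_PATH at call time.
# The CSV row is invented; the extra `retries` column should surface in
# metadata unchanged.
if __name__ == "__main__":
    import os
    import tempfile

    with tempfile.NamedTemporaryFile(
        "w", suffix=".csv", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write(
            "project,test_name,flake_type,timing_flag,io_flag,threading_flag,retries\n"
            "demo/proj,TestIO.testRead,io,0,1,0,3\n"
        )
    os.environ["FLAKEFLAGGER_DATA_PATH"] = tmp.name
    for rec in FlakeFlaggerLoader().fetch():
        print(rec.record_id, rec.failure_type_label, rec.metadata)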