"""FlakeFlagger (Alshammari et al., ICSE 2021) loader.

Source: https://github.com/AlshammariA/FlakeFlagger. ~800 flaky tests labeled
with rich features (timing, IO, threading, randomness, network, concurrency).
Their published feature CSVs are sufficient for clustering — we don't need
raw test runs (B2 mines raw logs).

Expected on-disk format: a CSV with columns
``project, test_name, flake_type, timing_flag, io_flag, threading_flag``
(boolean-ish flags as ``0/1`` or ``True/False``). Only ``project`` and
``test_name`` are strictly required; a missing ``flake_type`` becomes
``None`` and missing flags default to ``False``. Extra columns are kept on
``metadata`` so future B3 features can pick them up without a schema bump.
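
A minimal illustrative row (values are hypothetical, shown only to pin
down the expected shape)::

    project,test_name,flake_type,timing_flag,io_flag,threading_flag
    apache/commons-io,FileUtilsTest.testCopyLarge,timing,1,0,0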
"""

from __future__ import annotations

import csv
import hashlib
from collections.abc import Iterator

from ci_triage_env.data.datasets._base import DatasetLoader, FailureRecord

_REQUIRED_COLUMNS = {"project", "test_name"}


def _truthy(v: str | None) -> bool:
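    """Interpret the CSV's boolean-ish flag values.

    >>> _truthy("1"), _truthy("True"), _truthy(" yes ")
    (True, True, True)
    >>> _truthy(None), _truthy("0"), _truthy("")
    (False, False, False)
    """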
    if v is None:
        return False
    return v.strip().lower() in {"1", "true", "yes", "y", "t"}


class FlakeFlaggerLoader(DatasetLoader):
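    """Stream FlakeFlagger's published feature CSV as ``FailureRecord`` rows."""
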
    name = "flakeflagger"
    env_var = "FLAKEFLAGGER_DATA_PATH"
    download_instructions = (
        "Download FlakeFlagger's published feature CSV from "
        "https://github.com/AlshammariA/FlakeFlagger (under data/ in the repo) "
        "and point $FLAKEFLAGGER_DATA_PATH at the file. Expected columns "
        "include (project, test_name, flake_type, timing_flag, io_flag, "
        "threading_flag); extras are preserved in metadata."
    )

    def fetch(self) -> Iterator[FailureRecord]:
        path = self._require_data_path()
        with open(path, newline="", encoding="utf-8") as fh:
            reader = csv.DictReader(fh)
            cols = set(reader.fieldnames or [])
            missing = _REQUIRED_COLUMNS - cols
            if missing:
                raise ValueError(
                    f"flakeflagger CSV at {path} is missing required columns: {sorted(missing)}"
                )
            extra_cols = cols - {"project", "test_name", "flake_type"}
            for row in reader:
                project = (row.get("project") or "").strip()
                test_name = (row.get("test_name") or "").strip()
                if not project or not test_name:
                    continue
                flake_type = (row.get("flake_type") or "").strip() or None
                metadata = {
                    "timing_flag": _truthy(row.get("timing_flag")),
                    "io_flag": _truthy(row.get("io_flag")),
                    "threading_flag": _truthy(row.get("threading_flag")),
                }
                # Preserve any extra columns verbatim so B3 can use them.
                for c in extra_cols - {"timing_flag", "io_flag", "threading_flag"}:
                    metadata[c] = row.get(c, "")
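                # Stable short ID: the same (project, test_name) pair always
                # hashes to the same record_id across repeated loads.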
                suffix = hashlib.sha1(f"{project}|{test_name}".encode()).hexdigest()[:8]
                yield FailureRecord(
                    record_id=f"flakeflagger-{suffix}",
                    source_dataset="flakeflagger",
                    project=project,
                    test_name=test_name,
                    failure_type_label=flake_type,
                    log_text="",
                    metadata=metadata,
                )
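

if __name__ == "__main__":
    # Minimal smoke test, illustrative only: assumes $FLAKEFLAGGER_DATA_PATH
    # points at the feature CSV described in the module docstring, and that
    # FailureRecord exposes its fields as attributes.
    from itertools import islice

    for rec in islice(FlakeFlaggerLoader().fetch(), 5):
        print(rec.record_id, rec.project, rec.test_name, rec.failure_type_label)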