# ci-triage-env/tests/data/test_mining.py
"""Phase B2 mining tests.
All ``gh`` invocations are monkeypatched so the suite is fully offline. The
mining module itself never imports network libraries, so the only seam to
fake is ``subprocess.run`` — we route every call through a recording shim.
"""
from __future__ import annotations
import json
from collections.abc import Iterable
from pathlib import Path
import pytest
from ci_triage_env.data.cli import build_parser
from ci_triage_env.data.cli import main as cli_main
from ci_triage_env.data.datasets._base import FailureRecord
from ci_triage_env.data.mining import github_actions as gha_mod
from ci_triage_env.data.mining.anonymizer import anonymize, hash_short
from ci_triage_env.data.mining.cache import DEFAULT_MINING_CACHE, mining_cache_dir
from ci_triage_env.data.mining.github_actions import (
DEFAULT_REPOS,
GhAuthError,
GitHubActionsLogScraper,
check_gh_auth,
)
# ---------------------------------------------------------------------------
# Anonymizer
# ---------------------------------------------------------------------------
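# The tests below pin the anonymizer's observable contract (the exact
# replacement tokens live in anonymizer.py; this is only what the assertions
# require): emails -> a token containing "EMAIL"; 40-hex SHAs -> a single
# "sha-" token; 8-char hex fragments -> a stable "hex-" token via hash_short;
# /home/<user>/ paths -> "/PATH/USER/"; @mentions -> "@USER"; IPv4 -> a token
# containing "IP". anonymize() is also idempotent and structure-preserving.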
def test_anonymizer_replaces_emails():
out = anonymize("contact bob@example.com for help")
assert "EMAIL" in out
assert "bob@example.com" not in out
def test_anonymizer_replaces_full_shas():
full_sha = "abc123def456abc123def456abc123def456abcd" # 40 hex
out = anonymize(f"commit {full_sha} landed")
assert "sha-" in out
assert full_sha not in out
def test_anonymizer_replaces_short_hex():
"""8-char hex tokens get hashed to a stable short token."""
out = anonymize("ref deadbeef in tree")
assert "hex-" in out
assert "deadbeef" not in out
def test_anonymizer_full_sha_takes_precedence_over_short_hex():
"""A 40-hex SHA must replace as one full token, not split into hex- segments."""
full = "0" * 40
out = anonymize(full)
assert out.count("sha-") == 1
assert "hex-" not in out
def test_anonymizer_replaces_user_paths():
out = anonymize("Traceback in /home/alice/project/main.py")
assert "/home/alice/" not in out
assert "/PATH/USER/" in out
def test_anonymizer_replaces_user_mentions():
out = anonymize("blame @alice and @bob-eng for the change")
assert "@alice" not in out
assert "@bob-eng" not in out
assert "@USER" in out
def test_anonymizer_replaces_ipv4():
out = anonymize("Connection refused: 192.168.1.10")
assert "192.168.1.10" not in out
assert "IP" in out
def test_anonymizer_idempotent():
text = "user @alice (alice@example.com) on /home/alice/foo at 192.168.1.1"
once = anonymize(text)
twice = anonymize(once)
assert once == twice
def test_anonymizer_preserves_log_structure():
raw = "ERROR: failure\n at line 12\n\tindented\nESC[31mred\nbye"
out = anonymize(raw)
assert out.count("\n") == raw.count("\n")
assert "\t" in out
assert "ESC[31m" in out
def test_hash_short_is_deterministic():
assert hash_short("abc") == hash_short("abc")
assert len(hash_short("abc")) == 8
# ---------------------------------------------------------------------------
# Cache helper
# ---------------------------------------------------------------------------
def test_mining_cache_dir_default(monkeypatch):
monkeypatch.delenv("CI_TRIAGE_MINING_CACHE", raising=False)
assert mining_cache_dir() == DEFAULT_MINING_CACHE
def test_mining_cache_dir_env_override(monkeypatch, tmp_path):
monkeypatch.setenv("CI_TRIAGE_MINING_CACHE", str(tmp_path / "alt"))
assert mining_cache_dir() == tmp_path / "alt"
# ---------------------------------------------------------------------------
# Subprocess shim used by every scraper test below.
# ---------------------------------------------------------------------------
class _FakeProc:
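    """Minimal stand-in for ``subprocess.CompletedProcess``: just the three
    attributes the scraper reads."""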
def __init__(self, returncode: int = 0, stdout: str = "", stderr: str = ""):
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
def _make_run_recorder(responses: dict[tuple[str, ...], _FakeProc]):
"""Return (fn, calls) where ``fn`` matches subprocess.run's signature.
``responses`` keys on the *tuple of relevant tokens* in the gh argv.
Anything not matched returns an empty success.
"""
calls: list[list[str]] = []
def _run(argv, capture_output=True, text=True, check=False, **kw):
calls.append(list(argv))
for keys, proc in responses.items():
if all(k in argv for k in keys):
return proc
return _FakeProc(returncode=0, stdout="", stderr="")
return _run, calls
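# Illustrative self-check of the shim itself (everything here follows from the
# code above): matching is on token subsets of the argv, and any unmatched
# invocation falls through to an empty success while still being recorded.
def test_make_run_recorder_matches_on_token_subsets():
    run, calls = _make_run_recorder({("run", "list"): _FakeProc(stdout="[]")})
    assert run(["gh", "run", "list", "--limit", "5"]).stdout == "[]"
    assert run(["gh", "auth", "status"]).stdout == ""  # fall-through default
    assert len(calls) == 2  # every call is recorded, matched or not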
@pytest.fixture
def patched_subprocess(monkeypatch):
def _install(responses):
run, calls = _make_run_recorder(responses)
monkeypatch.setattr(gha_mod.subprocess, "run", run)
# Also disable real time.sleep so throttle tests don't actually wait.
sleeps: list[float] = []
monkeypatch.setattr(gha_mod.time, "sleep", lambda s: sleeps.append(s))
return calls, sleeps
return _install
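# Tests unpack the fixture result as ``calls, sleeps = patched_subprocess(...)``:
# ``calls`` accumulates every gh argv issued, ``sleeps`` every throttle wait.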
# ---------------------------------------------------------------------------
# Scraper happy path
# ---------------------------------------------------------------------------
def _gh_run_list_payload(items: Iterable[dict]) -> str:
return json.dumps(list(items))
def _gh_run_view_payload(jobs: Iterable[dict]) -> str:
return json.dumps({"jobs": list(jobs)})
def test_mine_repo_with_fixture_subprocess_yields_failure_records(patched_subprocess, tmp_path):
runs = _gh_run_list_payload([
{
"databaseId": 111,
"workflowName": "test",
"headBranch": "main",
"createdAt": "2026-04-25T12:00:00Z",
}
])
jobs = _gh_run_view_payload([
{"databaseId": 222, "name": "build/x", "conclusion": "failure"},
{"databaseId": 333, "name": "skip/me", "conclusion": "success"},
])
raw_log = "ERROR build failed at sha 0123456789abcdef0123456789abcdef01234567 by @alice (alice@x.com)"
responses = {
("run", "list"): _FakeProc(stdout=runs),
("run", "view", "111", "--json"): _FakeProc(stdout=jobs),
("run", "view", "111", "--log"): _FakeProc(stdout=raw_log),
}
patched_subprocess(responses)
scraper = GitHubActionsLogScraper(rate_limit_per_min=1000, cache_dir=tmp_path)
records = list(scraper.mine_repo("kubernetes/kubernetes", count=5))
assert len(records) == 1
rec = records[0]
assert rec.source_dataset == "github_actions"
assert rec.project == "kubernetes/kubernetes"
assert rec.test_name == "build/x"
assert rec.metadata["run_id"] == 111
assert rec.metadata["job_id"] == 222
    # log_text is the anonymized version; the raw log is in the cache.
assert "@alice" not in rec.log_text
assert "EMAIL" in rec.log_text
assert "sha-" in rec.log_text
cached = (tmp_path / "kubernetes__kubernetes" / "111__222.txt").read_text()
assert "@alice" in cached # raw cache stays un-anonymized for re-derivation
def test_scraper_uses_cache_on_repeat_fetch(patched_subprocess, tmp_path):
"""A second mine_repo call hits the cache for logs (no new --log subprocess)."""
runs = _gh_run_list_payload([
{"databaseId": 1, "workflowName": "test", "headBranch": "main", "createdAt": "t"}
])
jobs = _gh_run_view_payload([{"databaseId": 2, "name": "j", "conclusion": "failure"}])
responses = {
("run", "list"): _FakeProc(stdout=runs),
("run", "view", "1", "--json"): _FakeProc(stdout=jobs),
("run", "view", "1", "--log"): _FakeProc(stdout="hello world"),
}
calls, _ = patched_subprocess(responses)
scraper = GitHubActionsLogScraper(rate_limit_per_min=1000, cache_dir=tmp_path)
list(scraper.mine_repo("x/y", count=1))
log_calls_first = sum(1 for c in calls if "--log" in c)
assert log_calls_first == 1
list(scraper.mine_repo("x/y", count=1))
log_calls_second = sum(1 for c in calls if "--log" in c) - log_calls_first
assert log_calls_second == 0 # second pass served from cache
def test_scraper_throttles_when_rate_limit_exceeded(patched_subprocess, tmp_path, monkeypatch):
# Many failed jobs across runs, low rate limit → throttle must kick in.
runs = _gh_run_list_payload([
{"databaseId": i, "workflowName": "test", "headBranch": "main", "createdAt": "t"}
for i in range(20)
])
jobs = _gh_run_view_payload([{"databaseId": 99, "name": "j", "conclusion": "failure"}])
responses = {
("run", "list"): _FakeProc(stdout=runs),
("--json", "jobs"): _FakeProc(stdout=jobs),
("--log",): _FakeProc(stdout="log"),
}
_, sleeps = patched_subprocess(responses)
# Pin time.time so the sliding window doesn't expire spontaneously.
fake_now = [1000.0]
monkeypatch.setattr(gha_mod.time, "time", lambda: fake_now[0])
scraper = GitHubActionsLogScraper(rate_limit_per_min=3, cache_dir=tmp_path)
list(scraper.mine_repo("x/y", count=20))
assert any(s > 0 for s in sleeps), "expected at least one throttle sleep"
def test_scraper_skips_empty_log(patched_subprocess, tmp_path):
runs = _gh_run_list_payload([
{"databaseId": 1, "workflowName": "test", "headBranch": "main", "createdAt": "t"}
])
jobs = _gh_run_view_payload([{"databaseId": 2, "name": "j", "conclusion": "failure"}])
responses = {
("run", "list"): _FakeProc(stdout=runs),
("--json", "jobs"): _FakeProc(stdout=jobs),
("--log",): _FakeProc(returncode=1, stdout="", stderr="boom"),
}
patched_subprocess(responses)
scraper = GitHubActionsLogScraper(rate_limit_per_min=1000, cache_dir=tmp_path)
records = list(scraper.mine_repo("x/y", count=1))
assert records == []
def test_scraper_skips_docs_and_lint_workflows(patched_subprocess, tmp_path):
runs = _gh_run_list_payload([
{"databaseId": 1, "workflowName": "Docs build", "headBranch": "main", "createdAt": "t"},
{"databaseId": 2, "workflowName": "Spelling", "headBranch": "main", "createdAt": "t"},
{"databaseId": 3, "workflowName": "test-suite", "headBranch": "main", "createdAt": "t"},
])
jobs = _gh_run_view_payload([{"databaseId": 99, "name": "j", "conclusion": "failure"}])
responses = {
("run", "list"): _FakeProc(stdout=runs),
("--json", "jobs"): _FakeProc(stdout=jobs),
("--log",): _FakeProc(stdout="log"),
}
calls, _ = patched_subprocess(responses)
scraper = GitHubActionsLogScraper(rate_limit_per_min=1000, cache_dir=tmp_path)
records = list(scraper.mine_repo("x/y", count=3))
assert len(records) == 1
# Only the test-suite run should have triggered jobs/log fetches.
job_view_calls = [c for c in calls if "--json" in c and "jobs" in c]
assert len(job_view_calls) == 1
def test_log_truncated_to_max_bytes(patched_subprocess, tmp_path):
big = "x" * (5 * 1024 * 1024) # 5MB
runs = _gh_run_list_payload([
{"databaseId": 1, "workflowName": "test", "headBranch": "main", "createdAt": "t"}
])
jobs = _gh_run_view_payload([{"databaseId": 2, "name": "j", "conclusion": "failure"}])
responses = {
("run", "list"): _FakeProc(stdout=runs),
("--json", "jobs"): _FakeProc(stdout=jobs),
("--log",): _FakeProc(stdout=big),
}
patched_subprocess(responses)
scraper = GitHubActionsLogScraper(
rate_limit_per_min=1000, cache_dir=tmp_path, max_log_bytes=200_000
)
list(scraper.mine_repo("x/y", count=1))
cache_path = tmp_path / "x__y" / "1__2.txt"
assert cache_path.exists()
assert len(cache_path.read_bytes()) <= 200_000
def test_scraper_returns_empty_when_run_list_fails(patched_subprocess, tmp_path):
responses = {
("run", "list"): _FakeProc(returncode=1, stderr="auth issue"),
}
patched_subprocess(responses)
scraper = GitHubActionsLogScraper(rate_limit_per_min=1000, cache_dir=tmp_path)
assert list(scraper.mine_repo("x/y", count=1)) == []
def test_scraper_returns_empty_on_invalid_json(patched_subprocess, tmp_path):
responses = {
("run", "list"): _FakeProc(stdout="not json"),
}
patched_subprocess(responses)
scraper = GitHubActionsLogScraper(rate_limit_per_min=1000, cache_dir=tmp_path)
assert list(scraper.mine_repo("x/y", count=1)) == []
# ---------------------------------------------------------------------------
# gh auth check
# ---------------------------------------------------------------------------
def test_check_gh_auth_raises_when_not_installed(monkeypatch):
def _raise(*a, **k):
raise FileNotFoundError("no gh")
monkeypatch.setattr(gha_mod.subprocess, "run", _raise)
with pytest.raises(GhAuthError, match="gh CLI not found"):
check_gh_auth()
def test_check_gh_auth_raises_when_not_authenticated(monkeypatch):
monkeypatch.setattr(
gha_mod.subprocess,
"run",
lambda *a, **k: _FakeProc(returncode=1, stderr="not logged in"),
)
with pytest.raises(GhAuthError, match="not authenticated"):
check_gh_auth()
def test_check_gh_auth_passes_when_authenticated(monkeypatch):
monkeypatch.setattr(
gha_mod.subprocess,
"run",
lambda *a, **k: _FakeProc(returncode=0, stdout="ok"),
)
check_gh_auth() # no exception
# ---------------------------------------------------------------------------
# CLI mine subcommand
# ---------------------------------------------------------------------------
def test_cli_mine_parser_parses_mine_subcommand():
parser = build_parser()
parsed = parser.parse_args(["mine", "--repo", "x/y", "--count", "3", "--skip-auth-check"])
assert parsed.cmd == "mine"
assert parsed.repo == "x/y"
assert parsed.count == 3
assert parsed.skip_auth_check is True
def test_cli_mine_writes_failure_records(monkeypatch, tmp_path, capsys):
runs = _gh_run_list_payload([
{"databaseId": 5, "workflowName": "test", "headBranch": "main", "createdAt": "t"}
])
jobs = _gh_run_view_payload([{"databaseId": 6, "name": "build", "conclusion": "failure"}])
responses = {
("run", "list"): _FakeProc(stdout=runs),
("--json", "jobs"): _FakeProc(stdout=jobs),
("--log",): _FakeProc(stdout="failure log alice@example.com"),
}
run, _ = _make_run_recorder(responses)
monkeypatch.setattr(gha_mod.subprocess, "run", run)
monkeypatch.setattr(gha_mod.time, "sleep", lambda s: None)
cache_dir = tmp_path / "raw"
out_dir = tmp_path / "records"
rc = cli_main(
[
"mine",
"--repo",
"k8s/k8s",
"--count",
"1",
"--rate-limit",
"1000",
"--cache-dir",
str(cache_dir),
"--out-dir",
str(out_dir),
"--skip-auth-check",
]
)
assert rc == 0
out = capsys.readouterr().out
assert "k8s/k8s: 1 records" in out
assert "mined 1 records" in out
written = list(out_dir.glob("*.json"))
assert len(written) == 1
rec = FailureRecord.model_validate_json(written[0].read_text())
assert rec.source_dataset == "github_actions"
assert "EMAIL" in rec.log_text
def test_cli_mine_default_repos_constant_matches():
"""Sanity: the CLI's "no --repo" branch hits the same DEFAULT_REPOS list."""
assert DEFAULT_REPOS # non-empty
assert "kubernetes/kubernetes" in DEFAULT_REPOS
def test_cli_mine_propagates_auth_failure(monkeypatch, capsys):
monkeypatch.setattr(
gha_mod.subprocess,
"run",
lambda *a, **k: _FakeProc(returncode=1, stderr="please login"),
)
rc = cli_main(["mine", "--repo", "x/y", "--count", "1"])
assert rc == 2
captured = capsys.readouterr()
assert "not authenticated" in captured.err
# ---------------------------------------------------------------------------
# Sanity: the default cache root is under the gitignored data_artifacts dir.
# ---------------------------------------------------------------------------
def test_default_mining_cache_under_data_artifacts():
    assert (
        Path("data_artifacts") in DEFAULT_MINING_CACHE.parents
        or DEFAULT_MINING_CACHE.parts[0] == "data_artifacts"
    )