"""``python -m ci_triage_env.data.cli`` entrypoint.
Phase B1 ships ``load <dataset>``; B3 adds ``cluster``; B5 will extend this
with ``generate`` and ``publish-hf`` without breaking the surface here.
"""
from __future__ import annotations

import argparse
import json
import os
import sys
from pathlib import Path

from ci_triage_env.data.clustering import ArchetypeExtractor, classify_all
from ci_triage_env.data.datasets import LOADER_REGISTRY
from ci_triage_env.data.datasets.cache import load_all_cached
from ci_triage_env.data.mining import (
    DEFAULT_REPOS,
    GhAuthError,
    GitHubActionsLogScraper,
    check_gh_auth,
)

DEFAULT_GHA_OUT_DIR = Path("data_artifacts/datasets_cache/github_actions")


def cmd_mine(args: argparse.Namespace) -> int:
    """Mine failed GitHub Actions logs and cache the resulting FailureRecords as JSON."""
    if not args.skip_auth_check:
        try:
            check_gh_auth()
        except GhAuthError as exc:
            print(f"error: {exc}", file=sys.stderr)
            return 2
    scraper_kwargs: dict = {"rate_limit_per_min": args.rate_limit}
    if args.cache_dir:
        scraper_kwargs["cache_dir"] = Path(args.cache_dir)
    scraper = GitHubActionsLogScraper(**scraper_kwargs)
    repos = [args.repo] if args.repo else list(DEFAULT_REPOS)
    out_dir = Path(args.out_dir) if args.out_dir else DEFAULT_GHA_OUT_DIR
    out_dir.mkdir(parents=True, exist_ok=True)
    total = 0
    for repo in repos:
        repo_records = list(scraper.mine_repo(repo, count=args.count))
        for record in repo_records:
            (out_dir / f"{record.record_id}.json").write_text(record.model_dump_json())
        total += len(repo_records)
        print(f" {repo}: {len(repo_records)} records")
    print(f"mined {total} records into {out_dir}")
    return 0


def cmd_load(args: argparse.Namespace) -> int:
    """Fetch a registered public dataset and cache its records locally."""
    loader_cls = LOADER_REGISTRY[args.dataset]
    kwargs: dict = {}
    if args.data_path:
        kwargs["data_path"] = Path(args.data_path)
    if args.cache_dir:
        kwargs["cache_dir"] = Path(args.cache_dir)
    loader = loader_cls(**kwargs)
    if args.force and loader.cache_dir.exists():
        for path in loader.cache_dir.glob("*.json"):
            path.unlink()
    records = list(loader.fetch())
    written = loader.cache_records(records)
    print(
        f"loaded {len(records)} records from {args.dataset}; "
        f"cached {written} into {loader.cache_dir}"
    )
    if args.summary:
        print(json.dumps(loader.info(), indent=2))
    return 0


def cmd_cluster(args: argparse.Namespace) -> int:
    """Classify cached FailureRecords into families and extract per-family archetypes."""
    records = load_all_cached()
    if not records:
        print("warning: no cached records found; run `cli load <dataset>` first", file=sys.stderr)
    api_key: str | None = os.environ.get("OPENAI_API_KEY") if not args.no_llm else None
    by_family = classify_all(records, openai_api_key=api_key)
    for family, recs in sorted(by_family.items()):
        print(f" {family}: {len(recs)} records")
    extractor = ArchetypeExtractor()
    out_dir = Path(args.out_dir) if args.out_dir else Path("data_artifacts/clustering")
    total_archetypes = 0
    for family, recs in by_family.items():
        if not recs:
            print(f"warning: {family} has no records; skipping archetype extraction", file=sys.stderr)
            continue
        archetypes = extractor.extract(recs, family, n_archetypes=args.n_archetypes)
        family_dir = out_dir / family
        family_dir.mkdir(parents=True, exist_ok=True)
        (family_dir / "archetypes.json").write_text(
            json.dumps([a.model_dump() for a in archetypes], indent=2)
        )
        total_archetypes += len(archetypes)
        print(f" wrote {len(archetypes)} archetypes → {family_dir}/archetypes.json")
    print(f"cluster complete: {total_archetypes} archetypes across {len(by_family)} families")
    return 0
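
# Output layout produced by `cluster` (family names come from classify_all and
# are shown generically here):
#
#   data_artifacts/clustering/
#       <family>/
#           archetypes.json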


def cmd_generate(args: argparse.Namespace) -> int:
    """Generate a scenario corpus with a deterministic train/val/held-out split."""
    from ci_triage_env.data.instantiation import CorpusBuilder

    ratios_raw = args.split.split("/")
    if len(ratios_raw) != 3:
        print("error: --split must be three slash-separated values, e.g. 70/15/15", file=sys.stderr)
        return 2
    try:
        r = [float(v) for v in ratios_raw]
    except ValueError:
        print("error: --split values must be numbers", file=sys.stderr)
        return 2
    total_r = sum(r)
    if total_r <= 0:
        print("error: --split values must sum to a positive number", file=sys.stderr)
        return 2
    split_ratios = (r[0] / total_r, r[1] / total_r, r[2] / total_r)
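    # Ratios are normalized by their sum, e.g. "70/15/15" -> (0.70, 0.15, 0.15)
    # and "2/1/1" -> (0.50, 0.25, 0.25).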
    out_dir = Path(args.output_dir) if args.output_dir else Path("data_artifacts/scenarios")
    builder = CorpusBuilder(
        total=args.total,
        split_ratios=split_ratios,
        base_seed=args.seed,
    )
    summary = builder.build(out_dir)
    print(json.dumps(summary, indent=2))
    print(f"corpus written to {out_dir}")
    return 0


def cmd_publish_hf(args: argparse.Namespace) -> int:
    """Publish a generated scenario corpus to the HuggingFace Hub."""
    from ci_triage_env.data.publish import publish_to_hf

    token: str | None = args.token or os.environ.get("HF_TOKEN")
    publish_to_hf(
        scenarios_dir=Path(args.scenarios_dir),
        dataset_name=args.dataset_name,
        token=token,
    )
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Build the argparse parser exposing the data subcommands."""
    parser = argparse.ArgumentParser(prog="ci_triage_env.data.cli")
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_load = sub.add_parser("load", help="Load a public dataset into the local cache.")
    p_load.add_argument("dataset", choices=sorted(LOADER_REGISTRY.keys()))
    p_load.add_argument(
        "--data-path",
        default=None,
        help="Local artifact path (overrides the dataset's env-var fallback).",
    )
    p_load.add_argument("--cache-dir", default=None, help="Override the cache directory.")
    p_load.add_argument(
        "--force",
        action="store_true",
        help="Wipe the cache before fetching.",
    )
    p_load.add_argument(
        "--summary",
        action="store_true",
        help="Print a JSON summary (record count, label distribution) after loading.",
    )
    p_load.set_defaults(func=cmd_load)

    p_mine = sub.add_parser(
        "mine",
        help="Mine failed GitHub Actions logs from public repos via the gh CLI.",
    )
    p_mine.add_argument(
        "--repo",
        default=None,
        help="owner/name (e.g. kubernetes/kubernetes). Omit to mine the default repo set.",
    )
    p_mine.add_argument("--count", type=int, default=30, help="Maximum failed runs per repo.")
    p_mine.add_argument(
        "--rate-limit",
        type=int,
        default=60,
        help="Outbound gh calls per minute (default 60, kept below GitHub's authenticated limit of roughly 83/min).",
    )
    p_mine.add_argument("--cache-dir", default=None, help="Override the raw-log cache directory.")
    p_mine.add_argument(
        "--out-dir",
        default=None,
        help="Override the FailureRecord output directory (default data_artifacts/datasets_cache/github_actions).",
    )
    p_mine.add_argument(
        "--skip-auth-check",
        action="store_true",
        help="Skip the `gh auth status` precheck (use only when running against a recorded fixture).",
    )
    p_mine.set_defaults(func=cmd_mine)

    p_cluster = sub.add_parser(
        "cluster",
        help="Classify cached FailureRecords into families and extract archetypes.",
    )
    p_cluster.add_argument(
        "--out-dir",
        default=None,
        help="Output directory for archetype JSON files (default data_artifacts/clustering).",
    )
    p_cluster.add_argument(
        "--n-archetypes",
        type=int,
        default=4,
        help="Max archetypes to extract per family (default 4).",
    )
    p_cluster.add_argument(
        "--no-llm",
        action="store_true",
        help="Skip LLM fallback even if OPENAI_API_KEY is set.",
    )
    p_cluster.set_defaults(func=cmd_cluster)

    p_generate = sub.add_parser(
        "generate",
        help="Generate a scenario corpus from all 7 failure family generators.",
    )
    p_generate.add_argument(
        "--total",
        type=int,
        default=200,
        help="Total number of scenarios to generate (default 200).",
    )
    p_generate.add_argument(
        "--split",
        default="70/15/15",
        help="Train/val/held-out split ratios as slash-separated values (default 70/15/15).",
    )
    p_generate.add_argument(
        "--seed",
        type=int,
        default=100_000,
        help="Base seed for deterministic generation (default 100000).",
    )
    p_generate.add_argument(
        "--output-dir",
        default=None,
        help="Output directory for scenario JSON files (default data_artifacts/scenarios).",
    )
    p_generate.set_defaults(func=cmd_generate)

    p_publish = sub.add_parser(
        "publish-hf",
        help="Publish a generated scenario corpus to the HuggingFace dataset hub.",
    )
    p_publish.add_argument(
        "scenarios_dir",
        help="Directory containing train/, val/, and held_out/ subdirectories.",
    )
    p_publish.add_argument(
        "dataset_name",
        help="HuggingFace repo id, e.g. 'your-org/ci-triage-scenarios'.",
    )
    p_publish.add_argument(
        "--token",
        default=None,
        help="HuggingFace API token (falls back to HF_TOKEN env var).",
    )
    p_publish.set_defaults(func=cmd_publish_hf)
    return parser


def main(argv: list[str] | None = None) -> int:
    parser = build_parser()
    args = parser.parse_args(argv)
    return args.func(args)


if __name__ == "__main__":
    sys.exit(main())
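
# The CLI can also be driven programmatically, e.g. from tests (the output
# directory below is only an example path):
#
#   from ci_triage_env.data.cli import main
#   exit_code = main(["generate", "--total", "5", "--output-dir", "/tmp/scenarios"])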