Spaces:
Sleeping
Sleeping
| """Rule-based + optional LLM classifier for CI failure families. | |
| Rule-based handles ~70%+ of cases; LLM fallback is called only for records | |
| the rules cannot confidently classify, keeping OpenAI cost under $5. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| from typing import TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| pass | |
| from ci_triage_env.data.datasets._base import FailureRecord | |
# Closed set of labels a record can be bucketed under. ``"unknown"`` is
# deliberately absent: it is only a transient result meaning "not classified
# yet" and never a final bucket.
FAMILIES: list[str] = [
    "real_bug",
    "race_flake",
    "timing_flake",
    "infra_network",
    "infra_resource",
    "dependency_drift",
    "ambiguous",
]

# Log signatures per family. A family's score is the fraction of its patterns
# that match somewhere in the log text.
#
# Every pattern is case-insensitive: the same failure is capitalised
# differently depending on which toolchain printed it (for example
# "Connection refused" vs "connection refused", "Too many open files" vs
# "too many open files"), and a case-sensitive pattern silently misses one
# of the two spellings.
_RULES: dict[str, list[re.Pattern[str]]] = {
    "infra_resource": [
        re.compile(r"OOMKilled|out of memory|cannot allocate|killed by OS|137 SIGKILL", re.IGNORECASE),
        re.compile(r"no space left on device|disk full|ENOSPC", re.IGNORECASE),
        re.compile(r"too many open files|EMFILE", re.IGNORECASE),
    ],
    "infra_network": [
        re.compile(
            r"DNS resolution|unable to resolve|getaddrinfo failed|connection refused",
            re.IGNORECASE,
        ),
        re.compile(r"TLS handshake.*timeout|x509:.*certificate", re.IGNORECASE),
        re.compile(r"socket: connection reset|EHOSTUNREACH|ENETUNREACH", re.IGNORECASE),
    ],
    "race_flake": [
        re.compile(r"data race|race detected|WARNING: DATA RACE", re.IGNORECASE),
        re.compile(r"concurrent map writes|fatal error: concurrent", re.IGNORECASE),
        re.compile(r"deadlock detected", re.IGNORECASE),
    ],
    "timing_flake": [
        re.compile(r"deadline exceeded|context canceled|timeout exceeded", re.IGNORECASE),
        re.compile(r"test timed out after \d+", re.IGNORECASE),
    ],
    "dependency_drift": [
        # NOTE(review): "|" binds looser than concatenation, so only the
        # go.sum alternative requires "conflict"; any mention of Cargo.lock or
        # package-lock.json matches on its own. Confirm that is intended.
        re.compile(r"Cargo\.lock|package-lock\.json|go\.sum.*conflict", re.IGNORECASE),
        re.compile(r"npm ERR! peer dep|incompatible dependency", re.IGNORECASE),
        re.compile(r"version (mismatch|conflict)", re.IGNORECASE),
    ],
}

# Patterns that, if matched, strongly suggest a real bug (assertion, panic, etc.).
# These stay case-sensitive where the token is an identifier (exception class
# names, "FAIL", "panic: ") so that ordinary prose does not trigger them.
_REAL_BUG_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"AssertionError|assert .* failed|FAILED assert", re.IGNORECASE),
    re.compile(r"panic: "),
    re.compile(r"NullPointerException|AttributeError|TypeError|ValueError"),
    re.compile(r"FAIL\b.*\(.*s\)"),
]

# A family counts as a clear "hit" only when its score is strictly greater
# than this value.
_HIT_THRESHOLD = 0.3
class RuleBasedClassifier:
    """Classify clear-cut CI failures by regex matching on the log text.

    Records that match none of the rules come back as ``("unknown", 0.0)``;
    ``classify_all`` treats those as candidates for the LLM fallback.
    """

    def classify(self, record: FailureRecord) -> tuple[str, float]:
        """Return ``(family, confidence)`` with confidence in ``[0, 1]``.

        Confidence is the fraction of the winning family's patterns that were
        found in ``record.log_text``.
        """
        log = record.log_text

        # Score every family that matched at least once; families with no
        # match are left out of the mapping entirely.
        family_scores: dict[str, float] = {}
        for name, regexes in _RULES.items():
            hit_count = len([rx for rx in regexes if rx.search(log)])
            if hit_count > 0:
                family_scores[name] = hit_count / len(regexes)

        if family_scores:
            strong = [
                name for name, score in family_scores.items() if score > _HIT_THRESHOLD
            ]
            # Two or more families above the threshold: no single cause
            # dominates, so report the weakest of the competing scores.
            if len(strong) >= 2:
                return ("ambiguous", min(family_scores[name] for name in strong))
            top = max(family_scores, key=family_scores.get)
            return (top, family_scores[top])

        # No family rule fired; the real-bug heuristics are the last resort.
        real_bug_hits = len([rx for rx in _REAL_BUG_PATTERNS if rx.search(log)])
        if real_bug_hits == 0:
            return ("unknown", 0.0)
        return ("real_bug", real_bug_hits / len(_REAL_BUG_PATTERNS))
class LLMClassifier:
    """Fallback for records the rule-based classifier marked ``'unknown'``.

    Calls ``openai`` (must be installed) and stops issuing requests once the
    estimated spend reaches ``budget_usd``.

    Spend is estimated from the token usage reported on each response using
    ``INPUT_USD_PER_MTOK`` / ``OUTPUT_USD_PER_MTOK``. The defaults are the
    gpt-4o-mini prices; override them on a subclass or instance when passing
    a different ``model``, otherwise the budget check will be inaccurate.
    """

    SYSTEM_PROMPT = (
        "You are a CI failure classifier. Given a failure log, output exactly "
        "one label from: real_bug, race_flake, timing_flake, infra_network, "
        "infra_resource, dependency_drift, ambiguous.\n\n"
        "Choose ambiguous only if multiple causes are plausible and no single "
        "one dominates. Respond with the label only, no explanation."
    )

    # gpt-4o-mini pricing ($/1M tokens): input $0.15, output $0.60
    INPUT_USD_PER_MTOK: float = 0.15
    OUTPUT_USD_PER_MTOK: float = 0.60

    def __init__(
        self,
        api_key: str,
        model: str = "gpt-4o-mini",
        budget_usd: float = 5.0,
    ) -> None:
        """Create the OpenAI client and reset the spend counter.

        Args:
            api_key: OpenAI API key handed to the client.
            model: Chat-completions model name.
            budget_usd: Estimated spend at which further requests are skipped.

        Raises:
            ImportError: If the optional ``openai`` package is not installed.
        """
        from openai import OpenAI  # optional dependency — imported lazily

        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.budget = budget_usd
        self.spent: float = 0.0

    def classify_batch(self, records: list[FailureRecord]) -> list[tuple[str, float]]:
        """Classify ``records`` one request at a time.

        Returns one ``(label, confidence)`` tuple per input record, in order.
        A recognised label gets a fixed confidence of ``0.7``. A record is
        reported as ``("unknown", 0.0)`` when the budget is already exhausted
        or when the reply is not one of ``FAMILIES``.
        """
        results: list[tuple[str, float]] = []
        for record in records:
            if self.spent >= self.budget:
                results.append(("unknown", 0.0))
                continue

            # Only the first 3000 characters of the log are sent.
            log_excerpt = record.log_text[:3000]
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": log_excerpt},
                ],
                max_tokens=20,
            )
            # Account for the request before interpreting it: the tokens were
            # consumed whether or not the reply turns out to be usable.
            self.spent += self._estimate_cost(response)

            label = self._normalize_label(response.choices[0].message.content)
            if label in FAMILIES:
                results.append((label, 0.7))
            else:
                # An unusable reply carries no confidence, matching the
                # ("unknown", 0.0) convention used everywhere else.
                results.append(("unknown", 0.0))
        return results

    @staticmethod
    def _normalize_label(raw: str | None) -> str:
        """Reduce a raw model reply to a bare lowercase label.

        The message content may be ``None``, and models sometimes wrap the
        label in quotes/backticks or add a trailing period; all of these are
        tolerated. Returns ``""`` when there is no content.
        """
        if not raw:
            return ""
        return raw.strip().strip("\"'`.").strip().lower()

    def _estimate_cost(self, response: object) -> float:
        """Return the estimated USD cost of ``response`` from its token usage.

        Returns ``0.0`` when the response carries no usage information.
        """
        usage = getattr(response, "usage", None)
        if usage is None:
            return 0.0
        return (
            usage.prompt_tokens * self.INPUT_USD_PER_MTOK
            + usage.completion_tokens * self.OUTPUT_USD_PER_MTOK
        ) / 1_000_000
| def classify_all( | |
| records: list[FailureRecord], | |
| openai_api_key: str | None = None, | |
| ) -> dict[str, list[FailureRecord]]: | |
| """Classify all records into family buckets. | |
| Rule-based first; LLM fallback for residuals if ``openai_api_key`` given. | |
| Unresolvable residuals land in ``"real_bug"`` as a safe default. | |
| """ | |
| rule_clf = RuleBasedClassifier() | |
| by_family: dict[str, list[FailureRecord]] = {f: [] for f in FAMILIES} | |
| unknowns: list[FailureRecord] = [] | |
| for record in records: | |
| family, _conf = rule_clf.classify(record) | |
| if family == "unknown": | |
| unknowns.append(record) | |
| else: | |
| by_family[family].append(record) | |
| if unknowns and openai_api_key: | |
| llm = LLMClassifier(openai_api_key) | |
| llm_results = llm.classify_batch(unknowns) | |
| for record, (family, _conf) in zip(unknowns, llm_results, strict=False): | |
| target = family if family in by_family else "real_bug" | |
| by_family[target].append(record) | |
| unknowns_after_llm = [ | |
| r for r, (f, _) in zip(unknowns, llm_results, strict=False) if f == "unknown" | |
| ] | |
| unknowns = unknowns_after_llm | |
| print(f"LLM classified residuals; spent ~${llm.spent:.4f}") | |
| # Any remaining unknowns fall into real_bug | |
| by_family["real_bug"].extend(unknowns) | |
| return by_family | |