# Author: Prasham.Jain
# feat(data): Phase B3 — failure clustering and archetype extraction
# commit: cd61817
"""Rule-based + optional LLM classifier for CI failure families.
Rule-based handles ~70%+ of cases; LLM fallback is called only for records
the rules cannot confidently classify, keeping OpenAI cost under $5.
"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
if TYPE_CHECKING:
pass
from ci_triage_env.data.datasets._base import FailureRecord
# Canonical failure-family labels; order is the bucket order used downstream.
FAMILIES: list[str] = [
    "real_bug",
    "race_flake",
    "timing_flake",
    "infra_network",
    "infra_resource",
    "dependency_drift",
    "ambiguous",
]

# Per-family signature patterns. A family's score is the fraction of its
# patterns that match the log, so families with more patterns need more
# corroborating evidence to reach the same score.
_RULES: dict[str, list[re.Pattern[str]]] = {
    # Host ran out of memory / disk / file descriptors.
    "infra_resource": [
        re.compile(r"OOMKilled|out of memory|cannot allocate|killed by OS|137 SIGKILL", re.IGNORECASE),
        re.compile(r"no space left on device|disk full|ENOSPC", re.IGNORECASE),
        re.compile(r"too many open files|EMFILE"),
    ],
    # DNS / TLS / connectivity failures.
    "infra_network": [
        re.compile(r"DNS resolution|unable to resolve|getaddrinfo failed|connection refused"),
        re.compile(r"TLS handshake.*timeout|x509:.*certificate"),
        re.compile(r"socket: connection reset|EHOSTUNREACH|ENETUNREACH"),
    ],
    # Concurrency hazards reported by race detectors / runtimes.
    "race_flake": [
        re.compile(r"data race|race detected|WARNING: DATA RACE", re.IGNORECASE),
        re.compile(r"concurrent map writes|fatal error: concurrent"),
        re.compile(r"deadlock detected"),
    ],
    # Deadline / timeout style failures.
    "timing_flake": [
        re.compile(r"deadline exceeded|context canceled|timeout exceeded"),
        re.compile(r"test timed out after \d+", re.IGNORECASE),
    ],
    # Lockfile / version conflicts.
    # NOTE(review): in the first pattern the alternation binds tighter than
    # ".*conflict", so a bare mention of Cargo.lock matches — confirm intended.
    "dependency_drift": [
        re.compile(r"Cargo\.lock|package-lock\.json|go\.sum.*conflict"),
        re.compile(r"npm ERR! peer dep|incompatible dependency"),
        re.compile(r"version (mismatch|conflict)"),
    ],
}

# Patterns that, if matched, strongly suggest a real bug (assertion, panic, etc.)
_REAL_BUG_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"AssertionError|assert .* failed|FAILED assert", re.IGNORECASE),
    re.compile(r"panic: "),
    re.compile(r"NullPointerException|AttributeError|TypeError|ValueError"),
    re.compile(r"FAIL\b.*\(.*s\)"),
]

# Confidence threshold above which a family is counted as a clear "hit".
# Every family has <= 3 patterns, so a single match (score >= 1/3) clears it.
_HIT_THRESHOLD = 0.3


class RuleBasedClassifier:
    """Keyword + regex matching to classify obvious CI failure cases.

    Returns ``("unknown", 0.0)`` for records that match no rules; these
    become candidates for the LLM fallback in ``classify_all``.
    """

    def classify(self, record: FailureRecord) -> tuple[str, float]:
        """Return ``(family, confidence)`` where confidence is in ``[0, 1]``."""
        log = record.log_text
        # Score = fraction of a family's patterns that fire on this log.
        family_scores = {
            family: hit_count / len(patterns)
            for family, patterns in _RULES.items()
            if (hit_count := sum(1 for pat in patterns if pat.search(log)))
        }
        if not family_scores:
            # No family fired — try the real_bug heuristics as a last resort.
            bug_hits = sum(1 for pat in _REAL_BUG_PATTERNS if pat.search(log))
            if not bug_hits:
                return ("unknown", 0.0)
            return ("real_bug", bug_hits / len(_REAL_BUG_PATTERNS))
        confident = [fam for fam, score in family_scores.items() if score > _HIT_THRESHOLD]
        if len(confident) > 1:
            # Several families look plausible: punt to "ambiguous", reporting
            # the weakest of the competing scores as the confidence.
            return ("ambiguous", min(family_scores[fam] for fam in confident))
        top = max(family_scores, key=family_scores.__getitem__)
        return (top, family_scores[top])
class LLMClassifier:
    """Fallback for records the rule-based classifier marked ``'unknown'``.

    Calls ``openai`` (must be installed) and stops once ``budget_usd`` is spent.
    """

    SYSTEM_PROMPT = (
        "You are a CI failure classifier. Given a failure log, output exactly "
        "one label from: real_bug, race_flake, timing_flake, infra_network, "
        "infra_resource, dependency_drift, ambiguous.\n\n"
        "Choose ambiguous only if multiple causes are plausible and no single "
        "one dominates. Respond with the label only, no explanation."
    )

    def __init__(
        self,
        api_key: str,
        model: str = "gpt-4o-mini",
        budget_usd: float = 5.0,
    ) -> None:
        """Create a classifier bound to one API key, model, and spend cap."""
        from openai import OpenAI  # optional dependency — imported lazily

        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.budget = budget_usd
        # Running estimate of dollars spent; compared against ``budget``.
        self.spent: float = 0.0

    def classify_batch(self, records: list[FailureRecord]) -> list[tuple[str, float]]:
        """Classify each record; returns one ``(family, confidence)`` per record.

        Records reached after the budget is exhausted are passed through as
        ``("unknown", 0.0)`` without an API call. The budget is checked before
        each call, so the cap may be overshot by at most one request.
        """
        results: list[tuple[str, float]] = []
        for record in records:
            if self.spent >= self.budget:
                results.append(("unknown", 0.0))
                continue
            # Truncate: the leading portion of a CI log usually carries the
            # failure signature, and this bounds per-request token cost.
            log_excerpt = record.log_text[:3000]
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": log_excerpt},
                ],
                max_tokens=20,
            )
            # Bug fix: ``message.content`` is Optional in the OpenAI SDK;
            # calling ``.strip()`` on None raised AttributeError.
            raw = response.choices[0].message.content or ""
            label = raw.strip().lower()
            self.spent += self._estimate_cost(response)
            if label in FAMILIES:
                results.append((label, 0.7))
            else:
                # Bug fix: an off-vocabulary reply was previously reported
                # with confidence 0.7 despite being coerced to "unknown".
                results.append(("unknown", 0.0))
        return results

    def _estimate_cost(self, response: object) -> float:
        """Estimate the dollar cost of one completion from its token usage."""
        usage = response.usage  # type: ignore[attr-defined]
        # gpt-4o-mini pricing ($/1M tokens): input $0.15, output $0.60
        return (usage.prompt_tokens * 0.15 + usage.completion_tokens * 0.60) / 1_000_000
def classify_all(
    records: list[FailureRecord],
    openai_api_key: str | None = None,
) -> dict[str, list[FailureRecord]]:
    """Classify all records into family buckets.

    Rule-based first; LLM fallback for residuals if ``openai_api_key`` given.
    Unresolvable residuals land in ``"real_bug"`` as a safe default.

    Each input record appears in exactly one bucket of the returned dict.
    """
    rule_clf = RuleBasedClassifier()
    by_family: dict[str, list[FailureRecord]] = {f: [] for f in FAMILIES}
    unknowns: list[FailureRecord] = []
    for record in records:
        family, _conf = rule_clf.classify(record)
        if family == "unknown":
            unknowns.append(record)
        else:
            by_family[family].append(record)
    if unknowns and openai_api_key:
        llm = LLMClassifier(openai_api_key)
        # classify_batch returns exactly one result per record, so strict
        # zip is safe and catches any length mismatch early.
        llm_results = llm.classify_batch(unknowns)
        residuals: list[FailureRecord] = []
        for record, (family, _conf) in zip(unknowns, llm_results, strict=True):
            if family in by_family:
                by_family[family].append(record)
            else:
                # Bug fix: records the LLM left as "unknown" were previously
                # appended to real_bug here AND again by the final extend,
                # double-counting them. Now they flow through once, below.
                residuals.append(record)
        unknowns = residuals
        print(f"LLM classified residuals; spent ~${llm.spent:.4f}")
    # Any remaining unknowns fall into real_bug
    by_family["real_bug"].extend(unknowns)
    return by_family