"""Rule-based + optional LLM classifier for CI failure families.
Rule-based handles ~70%+ of cases; LLM fallback is called only for records
the rules cannot confidently classify, keeping OpenAI cost under $5.
"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
if TYPE_CHECKING:
pass
from ci_triage_env.data.datasets._base import FailureRecord
# Every label a record can be bucketed under. ``classify_all`` creates one
# output bucket per entry and ``LLMClassifier`` rejects any answer not listed.
FAMILIES: list[str] = [
    "real_bug",
    # flaky-test families
    "race_flake", "timing_flake",
    # infrastructure families
    "infra_network", "infra_resource",
    "dependency_drift",
    # more than one family matched clearly
    "ambiguous",
]
# Family name -> regexes that signal that family. ``RuleBasedClassifier``
# scores a family by the fraction of its patterns found in the log text.
#
# Every pattern is compiled case-insensitively: the same error is spelled with
# different capitalisation depending on which tool printed it (libc/Python emit
# "Connection refused" and "Too many open files", Go emits the lowercase forms,
# gRPC emits "Deadline Exceeded"), and a miss here sends the record to the paid
# LLM fallback.
#
# NOTE(review): in the first ``dependency_drift`` pattern only ``go.sum`` is
# required to be followed by "conflict"; a bare mention of Cargo.lock or
# package-lock.json matches on its own. Confirm that is intended.
_RULES: dict[str, list[re.Pattern[str]]] = {
    "infra_resource": [
        re.compile(
            r"OOMKilled|out of memory|cannot allocate|killed by OS|137 SIGKILL",
            re.IGNORECASE,
        ),
        re.compile(r"no space left on device|disk full|ENOSPC", re.IGNORECASE),
        re.compile(r"too many open files|EMFILE", re.IGNORECASE),
    ],
    "infra_network": [
        re.compile(
            r"DNS resolution|unable to resolve|getaddrinfo failed|connection refused",
            re.IGNORECASE,
        ),
        re.compile(r"TLS handshake.*timeout|x509:.*certificate", re.IGNORECASE),
        re.compile(
            r"socket: connection reset|EHOSTUNREACH|ENETUNREACH", re.IGNORECASE
        ),
    ],
    "race_flake": [
        re.compile(r"data race|race detected|WARNING: DATA RACE", re.IGNORECASE),
        re.compile(r"concurrent map writes|fatal error: concurrent", re.IGNORECASE),
        re.compile(r"deadlock detected", re.IGNORECASE),
    ],
    "timing_flake": [
        re.compile(
            r"deadline exceeded|context canceled|timeout exceeded", re.IGNORECASE
        ),
        re.compile(r"test timed out after \d+", re.IGNORECASE),
    ],
    "dependency_drift": [
        re.compile(r"Cargo\.lock|package-lock\.json|go\.sum.*conflict", re.IGNORECASE),
        re.compile(r"npm ERR! peer dep|incompatible dependency", re.IGNORECASE),
        re.compile(r"version (mismatch|conflict)", re.IGNORECASE),
    ],
}
# Signatures of a genuine defect (failed assertions, panics, common exception
# names, failing-test summary lines). ``RuleBasedClassifier`` consults these
# only when none of the family rules matched.
_REAL_BUG_PATTERNS: list[re.Pattern[str]] = [
    re.compile(source, flags)
    for source, flags in (
        (r"AssertionError|assert .* failed|FAILED assert", re.IGNORECASE),
        (r"panic: ", 0),
        (r"NullPointerException|AttributeError|TypeError|ValueError", 0),
        (r"FAIL\b.*\(.*s\)", 0),
    )
]
# A family's score is the fraction of its patterns that matched the log. The
# score must be strictly greater than this value for the family to count as a
# clear "hit"; two or more hits make the record "ambiguous".
_HIT_THRESHOLD = 0.3
class RuleBasedClassifier:
    """Classify obvious CI failures by regex matching on the log text.

    A record that triggers no rule at all is reported as ``("unknown", 0.0)``
    so that ``classify_all`` can pass it on to the LLM fallback.
    """

    def classify(self, record: FailureRecord) -> tuple[str, float]:
        """Return ``(family, confidence)`` with confidence in ``[0, 1]``.

        The confidence is the fraction of the winning family's patterns that
        matched. When several families clear ``_HIT_THRESHOLD`` the result is
        ``"ambiguous"`` with the lowest of their scores.
        """
        log = record.log_text

        # Per-family match ratio; families with zero matches are left out.
        family_scores: dict[str, float] = {}
        for name, regexes in _RULES.items():
            hit_count = len([rx for rx in regexes if rx.search(log)])
            if hit_count:
                family_scores[name] = hit_count / len(regexes)

        if family_scores:
            strong = [
                name
                for name in family_scores
                if family_scores[name] > _HIT_THRESHOLD
            ]
            if len(strong) > 1:
                return ("ambiguous", min(family_scores[name] for name in strong))
            top = max(family_scores, key=family_scores.get)
            return (top, family_scores[top])

        # No family rule fired: fall back to the real-bug heuristics.
        bug_hits = len([rx for rx in _REAL_BUG_PATTERNS if rx.search(log)])
        if bug_hits == 0:
            return ("unknown", 0.0)
        return ("real_bug", bug_hits / len(_REAL_BUG_PATTERNS))
class LLMClassifier:
    """Fallback for records the rule-based classifier marked ``'unknown'``.

    Calls ``openai`` (must be installed) and stops issuing requests once the
    estimated spend reaches ``budget_usd``. The budget is checked before each
    request, so the final request may push ``spent`` slightly past it.
    """

    SYSTEM_PROMPT = (
        "You are a CI failure classifier. Given a failure log, output exactly "
        "one label from: real_bug, race_flake, timing_flake, infra_network, "
        "infra_resource, dependency_drift, ambiguous.\n\n"
        "Choose ambiguous only if multiple causes are plausible and no single "
        "one dominates. Respond with the label only, no explanation."
    )

    def __init__(
        self,
        api_key: str,
        model: str = "gpt-4o-mini",
        budget_usd: float = 5.0,
    ) -> None:
        """Create the OpenAI client and reset the spend counter.

        Args:
            api_key: OpenAI API key.
            model: Chat model name sent with every request.
            budget_usd: Estimated spend at which further requests are skipped.
        """
        from openai import OpenAI  # optional dependency — imported lazily

        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.budget = budget_usd
        self.spent: float = 0.0

    def classify_batch(self, records: list[FailureRecord]) -> list[tuple[str, float]]:
        """Classify ``records`` one request at a time.

        Returns one ``(family, confidence)`` pair per record, in order.
        Recognised labels get a fixed confidence of ``0.7``. A record is
        reported as ``("unknown", 0.0)`` when the budget is already exhausted
        or when the model's answer is empty or not one of ``FAMILIES``.
        """
        results: list[tuple[str, float]] = []
        for record in records:
            if self.spent >= self.budget:
                results.append(("unknown", 0.0))
                continue
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    # Only the first 3000 characters of the log are sent.
                    {"role": "user", "content": record.log_text[:3000]},
                ],
                max_tokens=20,
            )
            # Account for the request before looking at the answer: the tokens
            # were consumed whether or not the label turns out to be usable.
            self.spent += self._estimate_cost(response)
            results.append(self._parse_label(response))
        return results

    @staticmethod
    def _parse_label(response: object) -> tuple[str, float]:
        """Extract a ``(family, confidence)`` pair from a chat completion."""
        choices = getattr(response, "choices", None) or []
        # ``message.content`` may be None (e.g. a refusal), so guard before
        # calling string methods on it.
        content = choices[0].message.content if choices else None
        # Tolerate quotes/backticks or a trailing period around the label.
        label = (content or "").strip().strip("\"'`.").strip().lower()
        if label in FAMILIES:
            return (label, 0.7)
        # Same shape as every other "could not classify" result.
        return ("unknown", 0.0)

    def _estimate_cost(self, response: object) -> float:
        """Return the estimated USD cost of ``response`` from its token usage.

        Returns ``0.0`` when the response carries no usage information.
        """
        usage = getattr(response, "usage", None)
        if usage is None:
            return 0.0
        # gpt-4o-mini pricing ($/1M tokens): input $0.15, output $0.60
        # NOTE: these rates are applied regardless of ``self.model``.
        return (usage.prompt_tokens * 0.15 + usage.completion_tokens * 0.60) / 1_000_000
def classify_all(
    records: list[FailureRecord],
    openai_api_key: str | None = None,
) -> dict[str, list[FailureRecord]]:
    """Classify all records into family buckets.

    Rule-based first; LLM fallback for residuals if ``openai_api_key`` given.
    Unresolvable residuals land in ``"real_bug"`` as a safe default. Every
    classified record is placed in exactly one bucket.

    Args:
        records: Failure records to classify.
        openai_api_key: Enables the LLM fallback when truthy.

    Returns:
        A dict with one key per entry of ``FAMILIES`` mapping to the records
        assigned to that family.
    """
    rule_clf = RuleBasedClassifier()
    by_family: dict[str, list[FailureRecord]] = {f: [] for f in FAMILIES}
    unknowns: list[FailureRecord] = []
    for record in records:
        family, _conf = rule_clf.classify(record)
        if family == "unknown":
            unknowns.append(record)
        else:
            by_family[family].append(record)

    if unknowns and openai_api_key:
        llm = LLMClassifier(openai_api_key)
        llm_results = llm.classify_batch(unknowns)
        still_unknown: list[FailureRecord] = []
        for record, (family, _conf) in zip(unknowns, llm_results, strict=False):
            if family in by_family:
                by_family[family].append(record)
            else:
                # Leave it for the single default-bucket step below, so it is
                # not added to "real_bug" twice.
                still_unknown.append(record)
        unknowns = still_unknown
        print(f"LLM classified residuals; spent ~${llm.spent:.4f}")

    # Any remaining unknowns fall into real_bug
    by_family["real_bug"].extend(unknowns)
    return by_family
|