Spaces:
Sleeping
Sleeping
| """TimingFlakeGenerator β scenario family: timing_flake.""" | |
| from __future__ import annotations | |
| import random | |
| from ci_triage_env.data.clustering.archetypes import Archetype | |
| from ci_triage_env.data.generators._helpers import ( | |
| ArchetypedGenerator, | |
| _metric_samples, | |
| build_base_outputs, | |
| fake_short_sha, | |
| fake_timestamp, | |
| fill_template, | |
| make_failure_summary, | |
| pick_test_name, | |
| scenario_id_for, | |
| ) | |
| from ci_triage_env.schemas.diagnosis import DiagnosisLabel | |
| from ci_triage_env.schemas.scenario import ( | |
| GroundTruth, | |
| Scenario, | |
| ScenarioMetadata, | |
| TerminalActionSpec, | |
| ToolOutput, | |
| ) | |
class TimingFlakeGenerator(ArchetypedGenerator):
    """Generator for the ``timing_flake`` scenario family.

    Emits scenarios whose ground truth is ``DiagnosisLabel.TIMING_FLAKE``:
    tests that time out intermittently because the CI cluster is under
    load, not because of a code defect. The diagnostic evidence is
    planted in ``query_flake_history`` (an intermittent failure record)
    and ``cluster_metrics:queue_depth`` (elevated queue depth).
    """

    family_name = "timing_flake"
    label = DiagnosisLabel.TIMING_FLAKE

    def informative_tools(self) -> list[str]:
        """Return the tools whose output carries signal for this family."""
        return ["read_logs", "query_flake_history", "cluster_metrics"]

    def minimal_evidence_set(self) -> list[str]:
        """Return the smallest tool set sufficient to support the diagnosis."""
        return ["query_flake_history", "cluster_metrics"]

    def _default_archetypes(self) -> list[Archetype]:
        """Return the built-in failure-log archetypes for this family.

        Two log shapes are covered: a Go ``go test`` deadline-exceeded
        panic (``timing_flake_001``) and a pytest/asyncio
        ``TimeoutError`` failure (``timing_flake_002``).
        """
        return [
            Archetype(
                archetype_id="timing_flake_001",
                family="timing_flake",
                pattern_summary="Test timeout: deadline exceeded after N seconds",
                log_template=(
                    "--- FAIL: {TEST_FUNC} ({DURATION}s)\n"
                    " {TEST_MODULE}_test.go:{NUM}: context canceled: deadline exceeded\n"
                    "FAIL\t{TEST_MODULE}\t{DURATION}s\n"
                    "panic: test timed out after {TIMEOUT}s"
                ),
                slot_distributions={
                    "TEST_FUNC": ["TestAPIResponse", "TestDBQuery", "TestCacheLoad"],
                    "TEST_MODULE": ["github.com/org/repo/api", "github.com/org/repo/storage"],
                    "DURATION": ["30.001", "60.002", "120.000"],
                    "TIMEOUT": ["30", "60", "120"],
                    "NUM": ["47", "83", "124"],
                },
                informative_tools_hint=["read_logs:full", "query_flake_history", "cluster_metrics:queue_depth"],
                minimal_evidence_hint=["query_flake_history", "cluster_metrics:queue_depth"],
            ),
            Archetype(
                archetype_id="timing_flake_002",
                family="timing_flake",
                pattern_summary="Context canceled / timeout exceeded in async call",
                # NOTE(review): the "β" in the last template line looks like
                # mojibake (pytest's short summary uses "FAILED ... - msg");
                # confirm the intended character before changing it, since
                # generated logs may be compared against golden outputs.
                log_template=(
                    "FAILED {TEST_MODULE}::{TEST_FUNC} - TimeoutError\n"
                    " TimeoutError: Operation timed out after {DURATION} seconds\n"
                    " During handling of the above exception:\n"
                    " asyncio.exceptions.TimeoutError\n"
                    "short test summary info\n"
                    "FAILED {TEST_MODULE}::{TEST_FUNC} β timeout exceeded"
                ),
                slot_distributions={
                    "TEST_FUNC": ["test_async_call", "test_rpc_response", "test_batch_process"],
                    "TEST_MODULE": ["tests/integration/test_rpc", "tests/integration/test_batch"],
                    "DURATION": ["5.0", "10.0", "30.0"],
                },
                informative_tools_hint=["read_logs:full", "query_flake_history", "cluster_metrics:queue_depth"],
                minimal_evidence_hint=["cluster_metrics:queue_depth"],
            ),
        ]

    def generate(self, seed: int, source_log_hash: str | None = None) -> Scenario:
        """Build one deterministic timing-flake scenario from *seed*.

        Args:
            seed: RNG seed; the same seed always yields the same scenario.
            source_log_hash: Optional hash of the real log this scenario
                was derived from, recorded in the scenario metadata.

        Returns:
            A fully populated ``Scenario`` whose ground-truth label is
            ``DiagnosisLabel.TIMING_FLAKE``.
        """
        rng = random.Random(seed)
        archetype = self._pick_archetype(rng)
        log_text = fill_template(archetype.log_template, archetype.slot_distributions, rng)
        test_name = pick_test_name(rng)
        summary = make_failure_summary(
            self.family_name, rng, test_name=test_name, log_excerpt=log_text
        )
        branch = summary.branch
        # Rerun outcome keyed on raw seed parity (~50% pass rate across the
        # seed space). NOTE(review): this deliberately(?) bypasses `rng`, so
        # it is independent of upstream RNG consumption — confirm this is
        # intended rather than an oversight.
        rerun_passes = (seed % 2 == 0)  # passes ~50% of time
        outputs = build_base_outputs(
            test_name, branch, rng,
            log_lines=log_text.splitlines(),
            rerun_passes=rerun_passes,
        )
        # --- informative overrides ---
        # queue_depth elevated: CI under load explains the timeouts.
        outputs["cluster_metrics:queue_depth"] = ToolOutput(
            tool_name="cluster_metrics",
            payload={"samples": _metric_samples(rng, "queue_depth", elevated=True, n=6)},
            cost_units=0.003,
        )
        # Flake history: intermittent failures that correlate with CI load
        # spikes (the planted "CI queue was full" notes).
        total_runs = 30
        failures = rng.randint(5, 12)
        outputs[f"query_flake_history:{test_name}"] = ToolOutput(
            tool_name="query_flake_history",
            payload={
                "failure_count": failures,
                "pass_count": total_runs - failures,
                "recent_failures": [
                    {"run_id": fake_short_sha(rng), "at": fake_timestamp(rng),
                     "note": "CI queue was full"}
                    for _ in range(min(3, failures))
                ],
            },
            cost_units=0.002,
        )
        difficulty = rng.choice(["easy", "medium", "hard"])
        # NOTE(review): the "β" below looks like mojibake for a dash, and the
        # rationale cites `recent_commits`, which is not listed in
        # informative_tools() — presumably emitted by build_base_outputs;
        # confirm both against the helper/source data.
        rationale = (
            f"query_flake_history shows {failures}/{total_runs} intermittent failures. "
            f"cluster_metrics:queue_depth is elevated, indicating CI is under load. "
            f"Timeouts correlate with scheduler pressure β not a code bug. "
            f"recent_commits shows no test-touching changes."
        )
        return Scenario(
            schema_version="1.0",
            scenario_id=scenario_id_for(self.family_name, seed),
            family=self.family_name,
            seed=seed,
            ground_truth=GroundTruth(
                label=self.label,
                rationale=rationale,
                is_ambiguous=False,
                confidence_target=1.0,
            ),
            failure_summary=summary,
            tool_outputs=outputs,
            informative_tools=self.informative_tools(),
            minimal_evidence_set=self.minimal_evidence_set(),
            correct_terminal_action=TerminalActionSpec(
                primary="submit_diagnosis",
                args={
                    "diagnosis": self.label.value,
                    "confidence": 0.9,
                    "secondary_actions": [{"name": "rerun_test"}],
                },
                acceptable_alternatives=[
                    {"primary": "submit_diagnosis",
                     "args": {"diagnosis": "timing_flake", "confidence": 0.85}},
                ],
            ),
            metadata=ScenarioMetadata(
                generator_version="1.0",
                generated_at=fake_timestamp(rng),
                source_log_hash=source_log_hash,
                difficulty=difficulty,
            ),
        )