File size: 6,769 Bytes
7a658b7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""TimingFlakeGenerator — scenario family: timing_flake."""

from __future__ import annotations

import random

from ci_triage_env.data.clustering.archetypes import Archetype
from ci_triage_env.data.generators._helpers import (
    ArchetypedGenerator,
    _metric_samples,
    build_base_outputs,
    fake_short_sha,
    fake_timestamp,
    fill_template,
    make_failure_summary,
    pick_test_name,
    scenario_id_for,
)
from ci_triage_env.schemas.diagnosis import DiagnosisLabel
from ci_triage_env.schemas.scenario import (
    GroundTruth,
    Scenario,
    ScenarioMetadata,
    TerminalActionSpec,
    ToolOutput,
)


class TimingFlakeGenerator(ArchetypedGenerator):
    """Scenario generator for the ``timing_flake`` family.

    Every generated scenario combines a timeout-style failure log, an
    elevated ``queue_depth`` metric series and an intermittent flake history,
    and carries ``DiagnosisLabel.TIMING_FLAKE`` as its ground-truth label.
    """

    family_name = "timing_flake"
    label = DiagnosisLabel.TIMING_FLAKE

    def informative_tools(self) -> list[str]:
        """Return the tool names stored as ``informative_tools`` on scenarios."""
        tools = ["read_logs", "query_flake_history", "cluster_metrics"]
        return tools

    def minimal_evidence_set(self) -> list[str]:
        """Return the tool names stored as ``minimal_evidence_set`` on scenarios."""
        evidence = ["query_flake_history", "cluster_metrics"]
        return evidence

    def _default_archetypes(self) -> list[Archetype]:
        """Return the two built-in archetypes for this family.

        The first renders a Go test timeout log, the second a Python asyncio
        ``TimeoutError`` log. ``{SLOT}`` placeholders are filled from the
        matching ``slot_distributions`` entry.
        """
        go_timeout_template = "\n".join(
            [
                "--- FAIL: {TEST_FUNC} ({DURATION}s)",
                "    {TEST_MODULE}_test.go:{NUM}: context canceled: deadline exceeded",
                "FAIL\t{TEST_MODULE}\t{DURATION}s",
                "panic: test timed out after {TIMEOUT}s",
            ]
        )
        go_timeout = Archetype(
            archetype_id="timing_flake_001",
            family="timing_flake",
            pattern_summary="Test timeout: deadline exceeded after N seconds",
            log_template=go_timeout_template,
            # Key order is kept as-is in case slot filling depends on it.
            slot_distributions={
                "TEST_FUNC": ["TestAPIResponse", "TestDBQuery", "TestCacheLoad"],
                "TEST_MODULE": ["github.com/org/repo/api", "github.com/org/repo/storage"],
                "DURATION": ["30.001", "60.002", "120.000"],
                "TIMEOUT": ["30", "60", "120"],
                "NUM": ["47", "83", "124"],
            },
            informative_tools_hint=[
                "read_logs:full",
                "query_flake_history",
                "cluster_metrics:queue_depth",
            ],
            minimal_evidence_hint=["query_flake_history", "cluster_metrics:queue_depth"],
        )

        async_timeout_template = "\n".join(
            [
                "FAILED {TEST_MODULE}::{TEST_FUNC} - TimeoutError",
                "  TimeoutError: Operation timed out after {DURATION} seconds",
                "  During handling of the above exception:",
                "  asyncio.exceptions.TimeoutError",
                "short test summary info",
                "FAILED {TEST_MODULE}::{TEST_FUNC} — timeout exceeded",
            ]
        )
        async_timeout = Archetype(
            archetype_id="timing_flake_002",
            family="timing_flake",
            pattern_summary="Context canceled / timeout exceeded in async call",
            log_template=async_timeout_template,
            slot_distributions={
                "TEST_FUNC": ["test_async_call", "test_rpc_response", "test_batch_process"],
                "TEST_MODULE": ["tests/integration/test_rpc", "tests/integration/test_batch"],
                "DURATION": ["5.0", "10.0", "30.0"],
            },
            informative_tools_hint=[
                "read_logs:full",
                "query_flake_history",
                "cluster_metrics:queue_depth",
            ],
            minimal_evidence_hint=["cluster_metrics:queue_depth"],
        )

        return [go_timeout, async_timeout]

    def generate(self, seed: int, source_log_hash: str | None = None) -> Scenario:
        """Assemble one timing-flake scenario from ``seed``.

        A single ``random.Random(seed)`` is created here and handed to every
        helper, so the sequence of calls below is kept in a fixed order.

        Args:
            seed: Seeds the RNG and is stored on the scenario; its parity also
                decides the ``rerun_passes`` flag given to ``build_base_outputs``.
            source_log_hash: Copied unchanged into the scenario metadata.

        Returns:
            The populated ``Scenario``.
        """
        prng = random.Random(seed)
        chosen = self._pick_archetype(prng)
        rendered_log = fill_template(chosen.log_template, chosen.slot_distributions, prng)
        test_name = pick_test_name(prng)

        failure_summary = make_failure_summary(
            self.family_name,
            prng,
            test_name=test_name,
            log_excerpt=rendered_log,
        )

        # Even seeds get a passing rerun, odd seeds a failing one.
        tool_outputs = build_base_outputs(
            test_name,
            failure_summary.branch,
            prng,
            log_lines=rendered_log.splitlines(),
            rerun_passes=seed % 2 == 0,
        )

        # Informative override: elevated queue depth (CI under load, hence timeouts).
        queue_samples = _metric_samples(prng, "queue_depth", elevated=True, n=6)
        tool_outputs["cluster_metrics:queue_depth"] = ToolOutput(
            tool_name="cluster_metrics",
            payload={"samples": queue_samples},
            cost_units=0.003,
        )

        # Informative override: intermittent flake history out of a fixed 30-run window.
        total_runs = 30
        failures = prng.randint(5, 12)
        recent_failures = []
        for _ in range(min(3, failures)):
            # Draw the run id before the timestamp to keep the RNG sequence stable.
            run_id = fake_short_sha(prng)
            failed_at = fake_timestamp(prng)
            recent_failures.append(
                {"run_id": run_id, "at": failed_at, "note": "CI queue was full"}
            )
        tool_outputs[f"query_flake_history:{test_name}"] = ToolOutput(
            tool_name="query_flake_history",
            payload={
                "failure_count": failures,
                "pass_count": total_runs - failures,
                "recent_failures": recent_failures,
            },
            cost_units=0.002,
        )

        difficulty = prng.choice(["easy", "medium", "hard"])
        rationale = " ".join(
            [
                f"query_flake_history shows {failures}/{total_runs} intermittent failures.",
                "cluster_metrics:queue_depth is elevated, indicating CI is under load.",
                "Timeouts correlate with scheduler pressure — not a code bug.",
                "recent_commits shows no test-touching changes.",
            ]
        )

        scenario_id = scenario_id_for(self.family_name, seed)
        ground_truth = GroundTruth(
            label=self.label,
            rationale=rationale,
            is_ambiguous=False,
            confidence_target=1.0,
        )
        terminal_action = TerminalActionSpec(
            primary="submit_diagnosis",
            args={
                "diagnosis": self.label.value,
                "confidence": 0.9,
                "secondary_actions": [{"name": "rerun_test"}],
            },
            acceptable_alternatives=[
                {
                    "primary": "submit_diagnosis",
                    "args": {"diagnosis": "timing_flake", "confidence": 0.85},
                },
            ],
        )
        # Final RNG draw: the metadata timestamp.
        metadata = ScenarioMetadata(
            generator_version="1.0",
            generated_at=fake_timestamp(prng),
            source_log_hash=source_log_hash,
            difficulty=difficulty,
        )

        return Scenario(
            schema_version="1.0",
            scenario_id=scenario_id,
            family=self.family_name,
            seed=seed,
            ground_truth=ground_truth,
            failure_summary=failure_summary,
            tool_outputs=tool_outputs,
            informative_tools=self.informative_tools(),
            minimal_evidence_set=self.minimal_evidence_set(),
            correct_terminal_action=terminal_action,
            metadata=metadata,
        )