raazkumar commited on
Commit
736e99b
·
verified ·
1 Parent(s): 57b2e21

Upload agent/core/effort_probe.py

Browse files
Files changed (1) hide show
  1. agent/core/effort_probe.py +298 -0
agent/core/effort_probe.py ADDED
@@ -0,0 +1,298 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Probe-and-cascade for reasoning effort on /model switch.
2
+
3
+ We don't maintain a per-model capability table. Instead, the first time a
4
+ user picks a model we fire a 1-token ping with the same params we'd use
5
+ for real and walk down a cascade (``max`` → ``xhigh`` → ``high`` → …)
6
+ until the provider stops rejecting us. The result is cached per-model on
7
+ the session, so real messages don't pay the probe cost again.
8
+
9
+ Three outcomes, classified from the 400 error text:
10
+
11
+ * success → cache the effort that worked
12
+ * ``"thinking ... not supported"`` → model doesn't do thinking at all;
13
+ cache ``None`` so we stop sending thinking params
14
+ * ``"effort ... invalid"`` / synonyms → cascade walks down and retries
15
+
16
+ Transient errors (5xx, timeout, connection reset) bubble out as
17
+ ``ProbeInconclusive`` so the caller can complete the switch with a
18
+ warning instead of blocking on a flaky provider.
19
+ """
20
+
21
+ from __future__ import annotations
22
+
23
+ import asyncio
24
+ import logging
25
+ import time
26
+ from dataclasses import dataclass
27
+ from typing import Any
28
+
29
+ from litellm import acompletion
30
+
31
+ from agent.core.llm_params import UnsupportedEffortError, _resolve_llm_params
32
+
33
+ logger = logging.getLogger(__name__)
34
+
35
+
36
+ # Cascade: for each user-stated preference, the ordered list of levels to
37
+ # try. First success wins. ``max`` is Anthropic-only; ``xhigh`` is also
38
+ # supported on current OpenAI GPT-5 models. Providers that don't accept a
39
+ # requested level raise ``UnsupportedEffortError`` synchronously (no wasted
40
+ # network round-trip) and we advance to the next level.
41
+ _EFFORT_CASCADE: dict[str, list[str]] = {
42
+ "max": ["max", "xhigh", "high", "medium", "low"],
43
+ "xhigh": ["xhigh", "high", "medium", "low"],
44
+ "high": ["high", "medium", "low"],
45
+ "medium": ["medium", "low"],
46
+ "minimal": ["minimal", "low"],
47
+ "low": ["low"],
48
+ }
49
+
50
+ _PROBE_TIMEOUT = 15.0
51
+ # Keep the probe cheap, but high enough that frontier reasoning models can
52
+ # finish a trivial reply instead of tripping a false "output limit reached"
53
+ # error during capability detection.
54
+ _PROBE_MAX_TOKENS = 64
55
+
56
+
57
class ProbeInconclusive(Exception):
    """Raised when the probe cannot reach a verdict.

    This signals a transient network or provider-side failure rather than
    a definitive answer about the model. The caller is expected to finish
    the model switch and show a warning; if the problem is persistent, the
    next real call will surface the same error again.
    """
63
+
64
+
65
@dataclass
class ProbeOutcome:
    """Summary of what a probe run learned about a model.

    ``effective_effort`` uses the same convention as the per-model cache:

    * a string → send this effort level
    * ``None`` → the model doesn't support thinking; strip the params
    """

    # Effort level to use from now on, or None to send no thinking params.
    effective_effort: str | None
    # Number of probe requests that were actually sent to the provider.
    attempts: int
    # Time spent probing, in milliseconds.
    elapsed_ms: int
    # Optional human-readable remark, e.g. "max not supported, falling back".
    note: str | None = None
77
+
78
+
79
+ def _is_thinking_unsupported(e: Exception) -> bool:
80
+ """Model rejected any thinking config.
81
+
82
+ Matches Anthropic's 'thinking.type.enabled is not supported for this
83
+ model' as well as the adaptive variant. Substring-match because the
84
+ exact wording shifts across API versions.
85
+ """
86
+ s = str(e).lower()
87
+ return "thinking" in s and "not supported" in s
88
+
89
+
90
def _is_invalid_effort(e: Exception) -> bool:
    """Tell whether the error means this effort level isn't accepted.

    Two sources are covered: provider responses (Anthropic/OpenAI 400s
    saying "invalid", "must be one of", and so on) and LiteLLM's local
    validation, which fires *before* the request is sent (e.g.
    "effort='max' is only supported by Claude Opus 4.6"). The cascade
    steps down to the next level in both cases.

    Errors that are really about thinking itself are deliberately excluded.
    Anthropic's 4.7 error, for instance, mentions ``output_config.effort``
    in its fix hint even though the actual failure is
    ``thinking.type.enabled`` being unsupported; that case belongs to
    ``_is_thinking_unsupported``.
    """
    # Thinking-level rejections take precedence over effort-level ones.
    if _is_thinking_unsupported(e):
        return False

    text = str(e).lower()
    mentions_effort = "effort" in text or "output_config" in text
    if not mentions_effort:
        return False

    rejection_phrases = (
        "invalid",
        "not supported",
        "must be one of",
        "not a valid",
        "unrecognized",
        "unknown",
        # LiteLLM's own pre-flight validation phrasing.
        "only supported by",
        "is only supported",
    )
    for phrase in rejection_phrases:
        if phrase in text:
            return True
    return False
123
+
124
+
125
+ def _is_transient(e: Exception) -> bool:
126
+ """Network / provider-side flake. Keep in sync with agent_loop's list.
127
+
128
+ Also matches by type for ``asyncio.TimeoutError`` — its ``str(e)`` is
129
+ empty, so substring matching alone misses it.
130
+ """
131
+ if isinstance(e, (asyncio.TimeoutError, TimeoutError)):
132
+ return True
133
+ s = str(e).lower()
134
+ return any(
135
+ p in s
136
+ for p in (
137
+ "timeout",
138
+ "timed out",
139
+ "429",
140
+ "rate limit",
141
+ "503",
142
+ "service unavailable",
143
+ "502",
144
+ "bad gateway",
145
+ "500",
146
+ "internal server error",
147
+ "overloaded",
148
+ "capacity",
149
+ "connection reset",
150
+ "connection refused",
151
+ "connection error",
152
+ "eof",
153
+ "broken pipe",
154
+ )
155
+ )
156
+
157
+
158
async def probe_effort(
    model_name: str,
    preference: str | None,
    hf_token: str | None,
    session: Any = None,
) -> ProbeOutcome:
    """Walk the effort cascade for ``preference`` on ``model_name``.

    Each level is tried with a tiny non-streaming "ping" completion until
    the provider accepts one.

    Args:
        model_name: Model identifier. The part before the first ``/`` is
            treated as the provider prefix.
        preference: Effort level the user asked for. A falsy value means
            effort is turned off and nothing is probed.
        hf_token: Token forwarded to ``_resolve_llm_params``.
        session: Optional. When provided, each successful probe is recorded
            via ``telemetry.record_llm_call(kind="effort_probe")`` so the
            cost shows up in the session's ``total_cost_usd``. Probes the
            provider rejects typically aren't billed, so only successes
            are recorded.

    Returns:
        A ``ProbeOutcome`` whose ``effective_effort`` is the first level
        the provider accepted, or ``None`` when effort is off, the provider
        is local, the model rejects thinking altogether, or no level in the
        cascade was accepted.

    Raises:
        ProbeInconclusive: On transient errors (5xx, timeout, ...). The
            message is never empty, even for timeouts.
        Exception: Persistent errors unrelated to thinking/effort (auth,
            model-not-found, quota, ...) propagate unchanged so callers
            can surface them.
    """
    if not preference:
        # User explicitly turned effort off — nothing to probe. A bare
        # ping with no thinking params is pointless; just report "off".
        return ProbeOutcome(effective_effort=None, attempts=0, elapsed_ms=0)

    # Local / self-hosted providers rarely support reasoning effort.
    # Skip the probe to avoid wasting time on a cascade that will fail.
    local_prefixes = frozenset(
        {"llamacpp", "lmstudio", "mlx", "nim", "local", "ollama", "vllm", "tgi"}
    )
    if model_name.split("/", 1)[0] in local_prefixes:
        return ProbeOutcome(
            effective_effort=None,
            attempts=0,
            elapsed_ms=0,
            note="local provider — reasoning effort skipped",
        )

    # We are inside a coroutine, so a running loop is guaranteed.
    loop = asyncio.get_running_loop()
    start = loop.time()

    def _elapsed_ms() -> int:
        return int((loop.time() - start) * 1000)

    attempts = 0
    # Unknown preferences get a single-entry cascade of themselves.
    cascade = _EFFORT_CASCADE.get(preference, [preference])
    skipped: list[str] = []  # levels the provider rejected synchronously
    last_error: Exception | None = None

    for effort in cascade:
        try:
            params = _resolve_llm_params(
                model_name,
                hf_token,
                reasoning_effort=effort,
                strict=True,
            )
        except UnsupportedEffortError:
            # Provider can't even accept this effort name (e.g. "max" on
            # HF router). Skip without a network call.
            skipped.append(effort)
            continue

        attempts += 1
        try:
            call_started = time.monotonic()
            response = await asyncio.wait_for(
                acompletion(
                    messages=[{"role": "user", "content": "ping"}],
                    max_tokens=_PROBE_MAX_TOKENS,
                    stream=False,
                    **params,
                ),
                timeout=_PROBE_TIMEOUT,
            )
            if session is not None:
                # Best-effort telemetry — never let a logging blip propagate
                # out of the probe and break model switching.
                try:
                    from agent.core import telemetry

                    await telemetry.record_llm_call(
                        session,
                        model=model_name,
                        response=response,
                        latency_ms=int((time.monotonic() - call_started) * 1000),
                        finish_reason=response.choices[0].finish_reason
                        if response.choices
                        else None,
                        kind="effort_probe",
                    )
                except Exception as telem_err:
                    logger.debug("effort_probe telemetry failed: %s", telem_err)
        except Exception as e:
            last_error = e
            if _is_thinking_unsupported(e):
                return ProbeOutcome(
                    effective_effort=None,
                    attempts=attempts,
                    elapsed_ms=_elapsed_ms(),
                    note="model doesn't support reasoning, dropped",
                )
            if _is_invalid_effort(e):
                logger.debug(
                    "probe: %s rejected effort=%s, trying next", model_name, effort
                )
                continue
            if _is_transient(e):
                # str() of a timeout error is empty; fall back to the type
                # name so the caller's warning is never blank.
                raise ProbeInconclusive(str(e) or type(e).__name__) from e
            # Persistent non-thinking 4xx (auth, quota, model-not-found) —
            # let the caller classify & surface.
            raise
        else:
            note = None
            if effort != preference:
                note = f"{preference} not supported, using {effort}"
            return ProbeOutcome(
                effective_effort=effort,
                attempts=attempts,
                elapsed_ms=_elapsed_ms(),
                note=note,
            )

    # Cascade exhausted without a success. This only happens when every
    # level was either rejected synchronously (``UnsupportedEffortError``)
    # or the provider 400'd ``invalid effort`` on every level.
    elapsed = _elapsed_ms()
    # Defensive: the loop only keeps going on invalid-effort errors, so any
    # other kind of error should already have been raised above.
    if last_error is not None and not _is_invalid_effort(last_error):
        raise last_error
    note = (
        "no effort level accepted — proceeding without thinking"
        if not skipped
        else f"provider rejected all efforts ({', '.join(skipped)})"
    )
    return ProbeOutcome(
        effective_effort=None,
        attempts=attempts,
        elapsed_ms=elapsed,
        note=note,
    )