raazkumar committed on
Commit
a8c86bb
·
verified ·
1 Parent(s): 2266b31

feat: add proactive rate-limit guard for NIM 40 req/min

Browse files
Files changed (1) hide show
  1. agent/core/agent_loop.py +2056 -0
agent/core/agent_loop.py ADDED
@@ -0,0 +1,2056 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """loop
2
+ Main agent implementation with integrated tool system and MCP support
3
+ """
4
+
5
+ import asyncio
6
+ import json
7
+ import logging
8
+ import time
9
+ from dataclasses import dataclass, field
10
+ from typing import Any
11
+
12
+ from litellm import (
13
+ ChatCompletionMessageToolCall,
14
+ Message,
15
+ acompletion,
16
+ stream_chunk_builder,
17
+ )
18
+ from litellm.exceptions import ContextWindowExceededError
19
+
20
+ from agent.config import Config
21
+ from agent.core.approval_policy import (
22
+ is_scheduled_operation,
23
+ normalize_tool_operation,
24
+ )
25
+ from agent.core.cost_estimation import CostEstimate, estimate_tool_cost
26
+ from agent.messaging.gateway import NotificationGateway
27
+ from agent.core import telemetry
28
+ from agent.core.doom_loop import check_for_doom_loop
29
+ from agent.core.llm_params import _resolve_llm_params
30
+ from agent.core.prompt_caching import with_prompt_caching
31
+ from agent.core.session import Event, OpType, Session
32
+ from agent.core.tools import ToolRouter
33
+ from agent.tools.jobs_tool import CPU_FLAVORS
34
+ from agent.tools.sandbox_tool import DEFAULT_CPU_SANDBOX_HARDWARE
35
+
36
+ logger = logging.getLogger(__name__)
37
+
38
+ ToolCall = ChatCompletionMessageToolCall
39
+
40
# Markers that bracket the tool name inside a malformed-JSON tool-result text.
_MALFORMED_TOOL_PREFIX = "ERROR: Tool call to '"
_MALFORMED_TOOL_SUFFIX = "' had malformed JSON arguments"


def _malformed_tool_name(message: Message) -> str | None:
    """Extract the tool name from a malformed-JSON tool-result message.

    Args:
        message: A conversation item; only ``role`` and ``content`` are read.

    Returns:
        The text between ``_MALFORMED_TOOL_PREFIX`` and the first following
        ``_MALFORMED_TOOL_SUFFIX``. ``None`` when the message is not a
        ``"tool"`` message, its content is not a string, or the content does
        not carry both markers.
    """
    if getattr(message, "role", None) != "tool":
        return None

    text = getattr(message, "content", None)
    if not isinstance(text, str) or not text.startswith(_MALFORMED_TOOL_PREFIX):
        return None

    name_start = len(_MALFORMED_TOOL_PREFIX)
    name_end = text.find(_MALFORMED_TOOL_SUFFIX, name_start)
    return None if name_end == -1 else text[name_start:name_end]
57
+
58
+
59
def _detect_repeated_malformed(
    items: list[Message],
    threshold: int = 2,
) -> str | None:
    """Find a tool that keeps producing malformed-JSON results at the tail.

    The conversation is scanned from newest to oldest. Messages whose role is
    not ``"tool"`` are ignored. The scan stops at the first tool result that
    is not malformed, or that is malformed for a different tool than the one
    already being counted.

    Args:
        items: Conversation messages, oldest first.
        threshold: Number of consecutive malformed results required.

    Returns:
        The tool name once ``threshold`` consecutive malformed results for it
        have been seen, otherwise ``None`` (also for ``threshold <= 0``).
    """
    if threshold <= 0:
        return None

    repeated: str | None = None
    count = 0

    for entry in reversed(items):
        if getattr(entry, "role", None) != "tool":
            # Assistant/user turns neither extend nor break the streak.
            continue

        name = _malformed_tool_name(entry)
        if name is None or (repeated is not None and name != repeated):
            break

        repeated = name
        count += 1
        if count >= threshold:
            return repeated

    return None
95
+
96
+
97
def _validate_tool_args(tool_args: dict) -> tuple[bool, str | None]:
    """Check that the optional ``"args"`` entry of a tool call is well-formed.

    A missing key, ``None``, or a dict is accepted. Anything else is rejected
    with a message describing what was passed.

    Returns:
        ``(is_valid, error_message)``; ``error_message`` is ``None`` when valid.
    """
    payload = tool_args.get("args", {})

    if payload is None or isinstance(payload, dict):
        return True, None

    # Sometimes the LLM passes args as a JSON string instead of an object.
    if isinstance(payload, str):
        return (
            False,
            f"Tool call error: 'args' must be a JSON object, not a string. You passed: {payload!r}",
        )

    return (
        False,
        f"Tool call error: 'args' must be a JSON object. You passed type: {type(payload).__name__}",
    )
117
+
118
+
119
# Normalized hf_jobs operations treated as "run now" (as opposed to scheduled).
_IMMEDIATE_HF_JOB_RUNS = {"run", "uv"}
120
+
121
+
122
+ @dataclass(frozen=True)
123
+ class ApprovalDecision:
124
+ requires_approval: bool
125
+ auto_approved: bool = False
126
+ auto_approval_blocked: bool = False
127
+ block_reason: str | None = None
128
+ estimated_cost_usd: float | None = None
129
+ remaining_cap_usd: float | None = None
130
+ billable: bool = False
131
+
132
+
133
def _operation(tool_args: dict) -> str:
    """Return the call's ``operation`` argument after normalization.

    A missing key is forwarded to ``normalize_tool_operation`` as ``None``.
    """
    raw_operation = tool_args.get("operation")
    return normalize_tool_operation(raw_operation)
135
+
136
+
137
def _is_immediate_hf_job_run(tool_name: str, tool_args: dict) -> bool:
    """Return True for an ``hf_jobs`` call whose operation is an immediate run.

    "Immediate" means the normalized operation is in ``_IMMEDIATE_HF_JOB_RUNS``.
    """
    if tool_name != "hf_jobs":
        return False
    return _operation(tool_args) in _IMMEDIATE_HF_JOB_RUNS
139
+
140
+
141
def _is_scheduled_hf_job_run(tool_name: str, tool_args: dict) -> bool:
    """Return whether this is an ``hf_jobs`` call with a scheduled operation.

    The scheduled/non-scheduled classification is delegated to
    ``is_scheduled_operation`` on the normalized operation name.
    """
    if tool_name != "hf_jobs":
        return False
    return is_scheduled_operation(_operation(tool_args))
143
+
144
+
145
def _is_budgeted_auto_approval_target(tool_name: str, tool_args: dict) -> bool:
    """Return True for calls whose auto-approval is subject to cost budgeting.

    That is every ``sandbox_create`` call and every immediate ``hf_jobs`` run.
    """
    if tool_name == "sandbox_create":
        return True
    return _is_immediate_hf_job_run(tool_name, tool_args)
149
+
150
+
151
def _base_needs_approval(
    tool_name: str, tool_args: dict, config: Config | None = None
) -> bool:
    """Decide whether a tool call needs approval, before any YOLO policy.

    Args:
        tool_name: Name of the tool being invoked.
        tool_args: Parsed arguments of the call.
        config: Optional config; ``confirm_cpu_jobs`` and ``auto_file_upload``
            are consulted when present.

    Returns:
        True when the call should be confirmed by a human under the base
        policy, False otherwise (including when the arguments are malformed).
    """
    # Malformed args never prompt: the validation error is shown later instead.
    is_valid, _ = _validate_tool_args(tool_args)
    if not is_valid:
        return False

    if tool_name == "sandbox_create":
        # Only a non-default hardware choice needs confirmation.
        requested = tool_args.get("hardware") or DEFAULT_CPU_SANDBOX_HARDWARE
        return requested != DEFAULT_CPU_SANDBOX_HARDWARE

    if tool_name == "hf_jobs":
        job_operation = _operation(tool_args)
        if is_scheduled_operation(job_operation):
            return True
        if job_operation not in _IMMEDIATE_HF_JOB_RUNS:
            return False

        # The flavor sits at the top level of tool_args, not nested in "args".
        flavor = (
            tool_args.get("hardware_flavor")
            or tool_args.get("flavor")
            or tool_args.get("hardware")
            or "cpu-basic"
        )
        cpu_only = flavor in CPU_FLAVORS
        # CPU jobs may be waved through via config; every other run prompts.
        return not (cpu_only and config and not config.confirm_cpu_jobs)

    if tool_name == "hf_private_repos":
        repo_operation = tool_args.get("operation", "")
        if repo_operation == "upload_file":
            # Uploads prompt unless the config opts into automatic uploads.
            return not (config and config.auto_file_upload)
        return repo_operation in ("create_repo",)

    if tool_name == "hf_repo_files":
        # Upload can overwrite and delete is destructive.
        return tool_args.get("operation", "") in ("upload", "delete")

    if tool_name == "hf_repo_git":
        # Destructive or repo-level git operations.
        return tool_args.get("operation", "") in (
            "delete_branch",
            "delete_tag",
            "merge_pr",
            "create_repo",
            "update_repo",
        )

    return False
219
+
220
+
221
def _needs_approval(
    tool_name: str, tool_args: dict, config: Config | None = None
) -> bool:
    """Legacy synchronous approval predicate (used by tests and CLI display).

    Scheduled ``hf_jobs`` calls always need approval. Otherwise
    ``config.yolo_mode`` disables approval entirely, and without it the base
    policy decides.
    """
    scheduled = _is_scheduled_hf_job_run(tool_name, tool_args)
    if scheduled:
        return True

    yolo = bool(config and config.yolo_mode)
    return False if yolo else _base_needs_approval(tool_name, tool_args, config)
230
+
231
+
232
def _session_auto_approval_enabled(session: Session | None) -> bool:
    """Return True when ``session`` exists and has auto-approval switched on.

    A session without an ``auto_approval_enabled`` attribute counts as off.
    """
    if not session:
        return False
    return bool(getattr(session, "auto_approval_enabled", False))
234
+
235
+
236
def _effective_yolo_enabled(session: Session | None, config: Config | None) -> bool:
    """Return True when either source of auto-approval is active.

    The two sources are the legacy ``config.yolo_mode`` flag and the
    per-session auto-approval switch.
    """
    if config and config.yolo_mode:
        return True
    return _session_auto_approval_enabled(session)
240
+
241
+
242
def _remaining_budget_after_reservations(
    session: Session | None, reserved_spend_usd: float
) -> float | None:
    """Compute what is left of the session's auto-approval cost cap.

    Args:
        session: Session carrying ``auto_approval_cost_cap_usd`` and
            ``auto_approval_estimated_spend_usd`` (both optional).
        reserved_spend_usd: Spend already earmarked by the caller that is not
            yet reflected on the session.

    Returns:
        ``None`` when there is no session or no cap. Otherwise the remaining
        budget, never negative, rounded to four decimals.
    """
    if not session:
        return None

    cap_setting = getattr(session, "auto_approval_cost_cap_usd", None)
    if cap_setting is None:
        return None

    cap = float(cap_setting or 0.0)
    already_spent = float(
        getattr(session, "auto_approval_estimated_spend_usd", 0.0) or 0.0
    )
    headroom = cap - already_spent - reserved_spend_usd
    return round(max(0.0, headroom), 4)
250
+
251
+
252
def _budget_block_reason(
    estimate: CostEstimate,
    *,
    remaining_cap_usd: float | None,
) -> str | None:
    """Explain why an estimate cannot be auto-approved, if it cannot.

    Args:
        estimate: Cost estimate; ``estimated_cost_usd`` and ``block_reason``
            are read.
        remaining_cap_usd: Budget still available, or ``None`` for no cap.

    Returns:
        A human-readable reason when the cost is unknown or exceeds the
        remaining cap; ``None`` when the call fits the budget.
    """
    cost = estimate.estimated_cost_usd
    if cost is None:
        # No number to compare: prefer the estimator's own explanation.
        return estimate.block_reason or "Could not estimate the cost safely."

    over_cap = remaining_cap_usd is not None and cost > remaining_cap_usd
    if not over_cap:
        return None

    return (
        f"Estimated cost ${cost:.2f} exceeds "
        f"remaining YOLO cap ${remaining_cap_usd:.2f}."
    )
268
+
269
+
270
async def _approval_decision(
    tool_name: str,
    tool_args: dict,
    session: Session,
    *,
    reserved_spend_usd: float = 0.0,
) -> ApprovalDecision:
    """Return the approval decision for one parsed tool call.

    Args:
        tool_name: Name of the tool being invoked.
        tool_args: Parsed arguments of the call.
        session: Session supplying the config and auto-approval state.
        reserved_spend_usd: Spend already earmarked by the caller, subtracted
            from the remaining budget.

    Returns:
        An ``ApprovalDecision``. Cost fields are populated only when the call
        went through the budgeted session auto-approval path.
    """
    config = session.config
    prompts_by_default = _base_needs_approval(tool_name, tool_args, config)

    # Scheduled jobs are recurring/unbounded enough that YOLO never bypasses
    # the human confirmation, including legacy config.yolo_mode.
    if _is_scheduled_hf_job_run(tool_name, tool_args):
        return ApprovalDecision(
            requires_approval=True,
            auto_approval_blocked=_effective_yolo_enabled(session, config),
            block_reason="Scheduled HF jobs always require manual approval.",
        )

    yolo_on = _effective_yolo_enabled(session, config)
    is_budgeted = _is_budgeted_auto_approval_target(tool_name, tool_args)
    # Cost caps are a session-scoped web policy. Legacy config.yolo_mode
    # remains uncapped for CLI/headless, except for scheduled jobs above.
    session_yolo_on = _session_auto_approval_enabled(session)

    if not (yolo_on and is_budgeted and session_yolo_on):
        # Unbudgeted path: YOLO simply waves through what would have prompted.
        if prompts_by_default and yolo_on:
            return ApprovalDecision(requires_approval=False, auto_approved=True)
        return ApprovalDecision(requires_approval=prompts_by_default)

    estimate = await estimate_tool_cost(tool_name, tool_args, session=session)
    remaining = _remaining_budget_after_reservations(session, reserved_spend_usd)
    cost_fields = {
        "estimated_cost_usd": estimate.estimated_cost_usd,
        "remaining_cap_usd": remaining,
        "billable": estimate.billable,
    }

    blocked_because = _budget_block_reason(estimate, remaining_cap_usd=remaining)
    if blocked_because:
        return ApprovalDecision(
            requires_approval=True,
            auto_approval_blocked=True,
            block_reason=blocked_because,
            **cost_fields,
        )

    # Within budget: flag it as auto-approved only if it would have prompted.
    return ApprovalDecision(
        requires_approval=False,
        auto_approved=bool(prompts_by_default),
        **cost_fields,
    )
328
+
329
+
330
def _record_estimated_spend(session: Session, decision: ApprovalDecision) -> None:
    """Add a decision's estimated cost to the session's running spend.

    Nothing is recorded unless the decision is billable and has an estimate.
    The session's ``add_auto_approval_estimated_spend`` method is used when it
    exists. Otherwise ``auto_approval_estimated_spend_usd`` is updated in
    place, rounded to four decimals.
    """
    if not (decision.billable and decision.estimated_cost_usd is not None):
        return

    if not hasattr(session, "add_auto_approval_estimated_spend"):
        previous = float(
            getattr(session, "auto_approval_estimated_spend_usd", 0.0) or 0.0
        )
        session.auto_approval_estimated_spend_usd = round(
            previous + float(decision.estimated_cost_usd),
            4,
        )
        return

    session.add_auto_approval_estimated_spend(decision.estimated_cost_usd)
341
+
342
+
343
async def _record_manual_approved_spend_if_needed(
    session: Session,
    tool_name: str,
    tool_args: dict,
) -> None:
    """Count a manually approved call toward the session's estimated spend.

    Only applies when session auto-approval is enabled and the call is a
    budgeted target. The cost is re-estimated and recorded through
    ``_record_estimated_spend``.
    """
    tracked = _session_auto_approval_enabled(
        session
    ) and _is_budgeted_auto_approval_target(tool_name, tool_args)
    if not tracked:
        return

    estimate = await estimate_tool_cost(tool_name, tool_args, session=session)
    spend = ApprovalDecision(
        requires_approval=False,
        billable=estimate.billable,
        estimated_cost_usd=estimate.estimated_cost_usd,
    )
    _record_estimated_spend(session, spend)
361
+
362
+
363
# -- LLM retry constants --------------------------------------------------
# Maximum number of attempts for a single LLM call.
_MAX_LLM_RETRIES = 3
# Back-off schedule for transient failures, indexed by attempt.
_LLM_RETRY_DELAYS = [5, 15, 30]  # seconds between retries
# Longer back-off schedule for rate-limit failures, indexed by attempt.
_LLM_RATE_LIMIT_RETRY_DELAYS = [30, 60]  # exceed Bedrock's ~60s TPM bucket window
367
+
368
+
369
def _is_rate_limit_error(error: Exception) -> bool:
    """Return True for rate-limit / quota-bucket style provider errors.

    Two signals are accepted:

    * a ``status_code`` attribute equal to 429 on the exception, so a
      rate-limit error is recognized even when its message text contains
      none of the keywords below;
    * a case-insensitive keyword match on ``str(error)``.

    Args:
        error: The exception raised by the LLM call.

    Returns:
        True when either signal identifies the error as a rate limit.
    """
    # Structured signal first: it does not depend on provider wording.
    if getattr(error, "status_code", None) == 429:
        return True

    message = str(error).lower()
    rate_limit_markers = (
        "429",
        "rate limit",
        "rate_limit",
        "too many requests",
        "too many tokens",
        "request limit",
        "throttl",
    )
    return any(marker in message for marker in rate_limit_markers)
382
+
383
+
384
def _is_context_overflow_error(error: Exception) -> bool:
    """Return True when the prompt exceeded the model's context window.

    Recognizes litellm's ``ContextWindowExceededError`` directly, and
    otherwise falls back to a case-insensitive keyword match on
    ``str(error)``.
    """
    if isinstance(error, ContextWindowExceededError):
        return True

    lowered = str(error).lower()
    for marker in (
        "context window exceeded",
        "maximum context length",
        "max context length",
        "prompt is too long",
        "context length exceeded",
        "too many input tokens",
        "input is too long",
    ):
        if marker in lowered:
            return True
    return False
400
+
401
+
402
def _retry_delay_for(error: Exception, attempt_index: int) -> int | None:
    """Return the back-off delay in seconds for this attempt, or None to stop.

    Rate-limit errors use ``_LLM_RATE_LIMIT_RETRY_DELAYS``, other transient
    errors use ``_LLM_RETRY_DELAYS``, and anything else is not retried. An
    ``attempt_index`` past the end of the chosen schedule also yields None.
    """
    # Rate limits are checked first because they are also "transient".
    if _is_rate_limit_error(error):
        delays = _LLM_RATE_LIMIT_RETRY_DELAYS
    elif _is_transient_error(error):
        delays = _LLM_RETRY_DELAYS
    else:
        return None

    return delays[attempt_index] if attempt_index < len(delays) else None
414
+
415
+
416
def _is_transient_error(error: Exception) -> bool:
    """Return True for errors that are likely transient and worth retrying.

    Every rate-limit error counts as transient. Otherwise the decision is a
    case-insensitive keyword match on ``str(error)``.
    """
    lowered = str(error).lower()
    if _is_rate_limit_error(error):
        return True

    transient_markers = (
        "timeout",
        "timed out",
        "503",
        "service unavailable",
        "502",
        "bad gateway",
        "500",
        "internal server error",
        "overloaded",
        "capacity",
        "connection reset",
        "connection refused",
        "connection error",
        "eof",
        "broken pipe",
    )
    return any(marker in lowered for marker in transient_markers)
439
+
440
+
441
def _is_effort_config_error(error: Exception) -> bool:
    """Detect the two 400s that the effort probe also handles.

    Those are: thinking is unsupported for this model, or the specific effort
    level is invalid.

    This is the safety net for the case where ``/effort`` was changed
    mid-conversation (which clears the probe cache) and the new level does
    not work for the current model. The caller heals the cache and retries
    once.
    """
    # Imported lazily, as in the rest of this module's effort-probe helpers.
    from agent.core.effort_probe import _is_invalid_effort, _is_thinking_unsupported

    thinking_unsupported = _is_thinking_unsupported(error)
    return thinking_unsupported or _is_invalid_effort(error)
452
+
453
+
454
async def _heal_effort_and_rebuild_params(
    session: Session,
    error: Exception,
    llm_params: dict,
) -> dict:
    """Repair the session's effort cache after ``error`` and rebuild params.

    Intended to be called only when ``_is_effort_config_error(error)`` is True.

    Outcomes written to ``session.model_effective_effort`` for the model:
      • thinking unsupported → ``None``, so the next call strips thinking
      • invalid effort → the result of re-running the cascade probe
      • probe inconclusive → ``None`` as a safe fallback

    Args:
        session: Session whose config, token and effort cache are used.
        error: The provider error that triggered healing.
        llm_params: Previous parameters; not read here, new ones are resolved
            from scratch.

    Returns:
        Freshly resolved LLM parameters for the session's model.
    """
    from agent.core.effort_probe import (
        ProbeInconclusive,
        _is_thinking_unsupported,
        probe_effort,
    )

    model_name = session.config.model_name

    if _is_thinking_unsupported(error):
        session.model_effective_effort[model_name] = None
        logger.info("healed: %s doesn't support thinking — stripped", model_name)
    else:
        try:
            probe_result = await probe_effort(
                model_name,
                session.config.reasoning_effort,
                session.hf_token,
                session=session,
            )
        except ProbeInconclusive:
            # Transient during healing — strip thinking for safety; the next
            # call will either succeed or surface the real error.
            session.model_effective_effort[model_name] = None
            logger.info("healed: %s probe inconclusive — stripped", model_name)
        else:
            session.model_effective_effort[model_name] = probe_result.effective_effort
            logger.info(
                "healed: %s effort cascade → %s",
                model_name,
                probe_result.effective_effort,
            )

    return _resolve_llm_params(
        model_name,
        session.hf_token,
        reasoning_effort=session.effective_effort_for(model_name),
    )
503
+
504
+
505
def _friendly_error_message(error: Exception) -> str | None:
    """Map a known error to a user-facing explanation.

    Matching is a case-insensitive keyword search on ``str(error)``, checked
    in this order: authentication, insufficient credits, provider mismatch,
    model not found.

    Returns:
        The friendly message, or ``None`` so the caller can fall back to the
        traceback.
    """
    text = str(error).lower()

    def mentions(*needles: str) -> bool:
        """Return True when any of ``needles`` occurs in the error text."""
        return any(needle in text for needle in needles)

    if mentions("authentication", "unauthorized", "invalid x-api-key"):
        return (
            "Authentication failed — your API key is missing or invalid.\n\n"
            "To fix this, set the API key for your model provider:\n"
            " • Anthropic: export ANTHROPIC_API_KEY=sk-...\n"
            " • OpenAI: export OPENAI_API_KEY=sk-...\n"
            " • HF Router: export HF_TOKEN=hf_...\n\n"
            "You can also add it to a .env file in the project root.\n"
            "To switch models, use the /model command."
        )

    if mentions("insufficient") and mentions("credit"):
        return (
            "Insufficient API credits. Please check your account balance "
            "at your model provider's dashboard."
        )

    if mentions("not supported by provider", "no provider supports"):
        return (
            "The model isn't served by the provider you pinned.\n\n"
            "Drop the ':<provider>' suffix to let the HF router auto-pick a "
            "provider, or use '/model' (no arg) to see which providers host "
            "which models."
        )

    if mentions("model_not_found") or (
        mentions("model") and mentions("not found", "does not exist")
    ):
        return (
            "Model not found. Use '/model' to list suggestions, or paste an "
            "HF model id like 'MiniMaxAI/MiniMax-M2.7'. Availability is shown "
            "when you switch."
        )

    return None
548
+
549
+
550
async def _compact_and_notify(session: Session) -> None:
    """Run context compaction and emit an event if usage changed.

    ``CompactionFailedError`` is caught here and ends the session cleanly
    instead of letting the caller retry. Pre-2026-05-04 the caller looped on
    ContextWindowExceededError → compact → re-trigger, burning Bedrock budget
    at ~$3/Opus retry while the session never reached the upload path (so the
    cost was invisible in the dataset).

    Side effects:
        * sends a ``session_terminated`` event and sets
          ``session.is_running = False`` when compaction fails;
        * sends a ``compacted`` event when the running context usage differs
          after compaction.
    """
    from agent.context_manager.manager import CompactionFailedError

    ctx = session.context_manager
    usage_before = ctx.running_context_usage
    logger.debug(
        "Compaction check: usage=%d, max=%d, threshold=%d, needs_compact=%s",
        usage_before,
        ctx.model_max_tokens,
        ctx.compaction_threshold,
        ctx.needs_compaction,
    )

    try:
        await ctx.compact(
            model_name=session.config.model_name,
            tool_specs=session.tool_router.get_tool_specs_for_llm(),
            hf_token=session.hf_token,
            session=session,
        )
    except CompactionFailedError as failure:
        logger.error(
            "Compaction failed for session %s: %s — terminating session",
            session.session_id,
            failure,
        )
        # Persist the failure event so the dataset has a record of WHY this
        # session ended (and the cost it incurred up to that point) even if
        # save_and_upload_detached has issues downstream.
        termination = Event(
            event_type="session_terminated",
            data={
                "reason": "compaction_failed",
                "context_usage": ctx.running_context_usage,
                "context_threshold": ctx.compaction_threshold,
                "error": str(failure)[:300],
                "user_message": (
                    "Your conversation has grown too large to continue. "
                    "The work you've done is saved — start a new session to keep going."
                ),
            },
        )
        await session.send_event(termination)
        # Stop the agent loop; the finally in _run_session will fire
        # cleanup_sandbox + save_trajectory so the dataset captures
        # everything that did happen.
        session.is_running = False
        return

    usage_after = ctx.running_context_usage
    if usage_after == usage_before:
        return

    logger.warning(
        "Context compacted: %d -> %d tokens (max=%d, %d messages)",
        usage_before,
        usage_after,
        ctx.model_max_tokens,
        len(ctx.items),
    )
    await session.send_event(
        Event(
            event_type="compacted",
            data={"old_tokens": usage_before, "new_tokens": usage_after},
        )
    )
622
+
623
+
624
async def _cleanup_on_cancel(session: Session) -> None:
    """Kill sandbox processes and cancel HF jobs when the user interrupts.

    Both steps are best-effort: failures are logged as warnings and never
    propagate. The blocking sandbox/Hub calls run in worker threads via
    ``asyncio.to_thread``.
    """
    # Step 1: stop whatever is still running inside the sandbox.
    active_sandbox = getattr(session, "sandbox", None)
    if active_sandbox:
        try:
            await asyncio.to_thread(active_sandbox.kill_all)
            logger.info("Killed sandbox processes on cancel")
        except Exception as exc:
            logger.warning("Failed to kill sandbox processes: %s", exc)

    # Step 2: cancel HF jobs. Iterate over a snapshot because the tracked
    # collection is cleared afterwards.
    pending_jobs = list(session._running_job_ids)
    if not pending_jobs:
        return

    from huggingface_hub import HfApi

    hub = HfApi(token=session.hf_token)
    for pending in pending_jobs:
        try:
            await asyncio.to_thread(hub.cancel_job, job_id=pending)
            logger.info("Cancelled HF job %s on interrupt", pending)
        except Exception as exc:
            logger.warning("Failed to cancel HF job %s: %s", pending, exc)
    session._running_job_ids.clear()
648
+
649
+
650
+ @dataclass
651
+ class LLMResult:
652
+ """Result from an LLM call (streaming or non-streaming)."""
653
+
654
+ content: str | None
655
+ tool_calls_acc: dict[int, dict]
656
+ token_count: int
657
+ finish_reason: str | None
658
+ usage: dict = field(default_factory=dict)
659
+ thinking_blocks: list[dict[str, Any]] | None = None
660
+ reasoning_content: str | None = None
661
+
662
+
663
def _extract_thinking_state(
    message: Any,
) -> tuple[list[dict[str, Any]] | None, str | None]:
    """Return provider reasoning fields that must be replayed after tool calls.

    Each field is taken from the message attribute of the same name. If that
    is missing or falsy, the ``provider_specific_fields`` dict is consulted.
    Falsy results are normalized to ``None``.

    Returns:
        ``(thinking_blocks, reasoning_content)``.
    """
    extras = getattr(message, "provider_specific_fields", None)
    if not isinstance(extras, dict):
        extras = {}

    def lookup(field_name: str) -> Any:
        """Prefer the direct attribute, then the provider-specific entry."""
        return getattr(message, field_name, None) or extras.get(field_name) or None

    return lookup("thinking_blocks"), lookup("reasoning_content")
682
+
683
+
684
def _should_replay_thinking_state(model_name: str | None) -> bool:
    """Return True only for model names that start with ``"anthropic/"``.

    Only Anthropic's native adapter accepts replayed thinking metadata.
    """
    if not model_name:
        return False
    return model_name.startswith("anthropic/")
687
+
688
+
689
def _is_invalid_thinking_signature_error(exc: Exception) -> bool:
    """Return True when Anthropic rejected replayed extended-thinking state.

    The check is a case-sensitive substring match on ``str(exc)`` against the
    two known spellings of the rejection message.
    """
    detail = str(exc)
    known_spellings = (
        "Invalid `signature` in `thinking` block",
        "Invalid signature in thinking block",
    )
    return any(spelling in detail for spelling in known_spellings)
696
+
697
+
698
def _strip_thinking_state_from_messages(messages: list[Any]) -> int:
    """Remove replayed thinking metadata from assistant history messages.

    Messages may be dicts or attribute-style objects and are mutated in
    place. Only messages whose role is ``"assistant"`` are touched. Removed:

    * top-level ``thinking_blocks`` / ``reasoning_content``;
    * the same two keys inside a ``provider_specific_fields`` dict (the dict
      is replaced by a cleaned copy);
    * ``thinking`` / ``redacted_thinking`` blocks inside list-valued content.

    Returns:
        The number of non-None fields and content blocks that were removed.
    """
    thinking_keys = ("thinking_blocks", "reasoning_content")
    removed = 0

    for msg in messages:
        is_mapping = isinstance(msg, dict)
        role = msg.get("role") if is_mapping else getattr(msg, "role", None)
        if role != "assistant":
            continue

        # Top-level fields: dicts lose the key, objects get it reset to None.
        for key in thinking_keys:
            if is_mapping:
                if msg.pop(key, None) is not None:
                    removed += 1
            elif getattr(msg, key, None) is not None:
                setattr(msg, key, None)
                removed += 1

        if is_mapping:
            extras = msg.get("provider_specific_fields")
            body = msg.get("content")
        else:
            extras = getattr(msg, "provider_specific_fields", None)
            body = getattr(msg, "content", None)

        if isinstance(extras, dict):
            # Work on a copy so the original dict object is never mutated.
            kept_extras = dict(extras)
            for key in thinking_keys:
                if kept_extras.pop(key, None) is not None:
                    removed += 1
            if kept_extras != extras:
                if is_mapping:
                    msg["provider_specific_fields"] = kept_extras
                else:
                    msg.provider_specific_fields = kept_extras

        if isinstance(body, list):
            visible_blocks = [
                part
                for part in body
                if not (
                    isinstance(part, dict)
                    and part.get("type") in {"thinking", "redacted_thinking"}
                )
            ]
            dropped = len(body) - len(visible_blocks)
            if dropped:
                removed += dropped
                if is_mapping:
                    msg["content"] = visible_blocks
                else:
                    msg.content = visible_blocks

    return removed
757
+
758
+
759
async def _maybe_heal_invalid_thinking_signature(
    session: Session,
    messages: list[Any],
    exc: Exception,
    *,
    already_healed: bool,
) -> bool:
    """Strip thinking metadata after a signature rejection, once per call.

    Args:
        session: Session used to emit the informational ``tool_log`` event.
        messages: History to clean; mutated in place.
        exc: The error raised by the LLM call.
        already_healed: True when healing was already attempted, in which case
            nothing is done.

    Returns:
        True when metadata was actually removed and the caller should retry,
        False otherwise.
    """
    if already_healed:
        return False
    if not _is_invalid_thinking_signature_error(exc):
        return False

    # Nothing removed means a retry would send the very same request.
    if not _strip_thinking_state_from_messages(messages):
        return False

    notice = Event(
        event_type="tool_log",
        data={
            "tool": "system",
            "log": (
                "Anthropic rejected stale thinking signatures; retrying "
                "without replayed thinking metadata."
            ),
        },
    )
    await session.send_event(notice)
    return True
786
+
787
+
788
def _assistant_message_from_result(
    llm_result: LLMResult,
    *,
    model_name: str | None,
    tool_calls: list[ToolCall] | None = None,
) -> Message:
    """Build an assistant history message without dropping reasoning state.

    Args:
        llm_result: Result of the LLM call to convert.
        model_name: Model that produced the result; decides whether thinking
            metadata is attached (see ``_should_replay_thinking_state``).
        tool_calls: Tool calls to attach; omitted from the message when None.

    Returns:
        A ``Message`` with role ``"assistant"``.
    """
    fields: dict[str, Any] = {"role": "assistant", "content": llm_result.content}

    if tool_calls is not None:
        fields["tool_calls"] = tool_calls

    if _should_replay_thinking_state(model_name):
        # Attach only the reasoning fields that actually carry data.
        for attr in ("thinking_blocks", "reasoning_content"):
            value = getattr(llm_result, attr)
            if value:
                fields[attr] = value

    return Message(**fields)
807
+
808
+
809
async def _call_llm_streaming(
    session: Session, messages, tools, llm_params
) -> LLMResult:
    """Call the LLM with streaming, emitting assistant_chunk events.

    The request is attempted up to ``_MAX_LLM_RETRIES`` times. Between
    attempts the function may (once each) heal an unsupported reasoning-effort
    configuration or strip stale thinking signatures, and it sleeps before
    retrying errors for which ``_retry_delay_for`` returns a delay.

    Args:
        session: Session used for events, cancellation and telemetry.
        messages: Chat messages to send (prompt caching is applied first).
        tools: Tool specs to expose to the model.
        llm_params: Extra keyword arguments forwarded to ``acompletion``.

    Returns:
        An ``LLMResult`` with the accumulated text, tool-call deltas keyed by
        index, token count, finish reason, usage and any thinking state.

    Raises:
        ContextWindowExceededError: When the provider reports context overflow.
        Exception: The last provider error when no attempt succeeded.
        RuntimeError: When ``_MAX_LLM_RETRIES`` allows no attempt at all.
    """
    response = None
    last_error: Exception | None = None
    _healed_effort = False  # one-shot safety net per call
    _healed_thinking_signature = False
    messages, tools = with_prompt_caching(messages, tools, llm_params.get("model"))
    t_start = time.monotonic()
    for _llm_attempt in range(_MAX_LLM_RETRIES):
        try:
            response = await acompletion(
                messages=messages,
                tools=tools,
                tool_choice="auto",
                stream=True,
                stream_options={"include_usage": True},
                timeout=600,
                **llm_params,
            )
            break
        except ContextWindowExceededError:
            raise
        except Exception as e:
            # Keep a reference: ``e`` is unbound once the except block exits.
            last_error = e
            if _is_context_overflow_error(e):
                raise ContextWindowExceededError(str(e)) from e
            if not _healed_effort and _is_effort_config_error(e):
                _healed_effort = True
                llm_params = await _heal_effort_and_rebuild_params(
                    session, e, llm_params
                )
                await session.send_event(
                    Event(
                        event_type="tool_log",
                        data={
                            "tool": "system",
                            "log": "Reasoning effort not supported for this model — adjusting and retrying.",
                        },
                    )
                )
                continue
            if await _maybe_heal_invalid_thinking_signature(
                session,
                messages,
                e,
                already_healed=_healed_thinking_signature,
            ):
                _healed_thinking_signature = True
                continue
            _delay = _retry_delay_for(e, _llm_attempt)
            if _llm_attempt < _MAX_LLM_RETRIES - 1 and _delay is not None:
                logger.warning(
                    "Transient LLM error (attempt %d/%d): %s — retrying in %ds",
                    _llm_attempt + 1,
                    _MAX_LLM_RETRIES,
                    e,
                    _delay,
                )
                await session.send_event(
                    Event(
                        event_type="tool_log",
                        data={
                            "tool": "system",
                            "log": f"LLM connection error, retrying in {_delay}s...",
                        },
                    )
                )
                await asyncio.sleep(_delay)
                continue
            raise

    if response is None:
        # The heal branches above ``continue`` and therefore consume attempts.
        # If one fires on the final attempt the loop ends without a response;
        # surface the real provider error instead of failing later on
        # ``async for chunk in None``.
        if last_error is not None:
            raise last_error
        raise RuntimeError("LLM call was not attempted (_MAX_LLM_RETRIES < 1)")

    full_content = ""
    tool_calls_acc: dict[int, dict] = {}
    token_count = 0
    finish_reason = None
    final_usage_chunk = None
    chunks = []
    should_replay_thinking = _should_replay_thinking_state(llm_params.get("model"))

    async for chunk in response:
        # Raw chunks are only needed to rebuild thinking state afterwards, so
        # avoid buffering the whole stream when that will not happen.
        if should_replay_thinking:
            chunks.append(chunk)
        if session.is_cancelled:
            # Partial tool-call deltas are unusable after a cancel.
            tool_calls_acc.clear()
            break

        choice = chunk.choices[0] if chunk.choices else None
        if not choice:
            # Usage-only chunk (no choices): record usage and move on.
            if hasattr(chunk, "usage") and chunk.usage:
                token_count = chunk.usage.total_tokens
                final_usage_chunk = chunk
            continue

        delta = choice.delta
        if choice.finish_reason:
            finish_reason = choice.finish_reason

        if delta.content:
            full_content += delta.content
            await session.send_event(
                Event(event_type="assistant_chunk", data={"content": delta.content})
            )

        if delta.tool_calls:
            # Tool calls arrive as fragments keyed by index; concatenate the
            # name/argument pieces until the stream ends.
            for tc_delta in delta.tool_calls:
                idx = tc_delta.index
                if idx not in tool_calls_acc:
                    tool_calls_acc[idx] = {
                        "id": "",
                        "type": "function",
                        "function": {"name": "", "arguments": ""},
                    }
                if tc_delta.id:
                    tool_calls_acc[idx]["id"] = tc_delta.id
                if tc_delta.function:
                    if tc_delta.function.name:
                        tool_calls_acc[idx]["function"]["name"] += (
                            tc_delta.function.name
                        )
                    if tc_delta.function.arguments:
                        tool_calls_acc[idx]["function"]["arguments"] += (
                            tc_delta.function.arguments
                        )

        if hasattr(chunk, "usage") and chunk.usage:
            token_count = chunk.usage.total_tokens
            final_usage_chunk = chunk

    usage = await telemetry.record_llm_call(
        session,
        model=llm_params.get("model", session.config.model_name),
        response=final_usage_chunk,
        latency_ms=int((time.monotonic() - t_start) * 1000),
        finish_reason=finish_reason,
    )
    thinking_blocks = None
    reasoning_content = None
    if chunks and should_replay_thinking:
        try:
            rebuilt = stream_chunk_builder(chunks, messages=messages)
            if rebuilt and getattr(rebuilt, "choices", None):
                rebuilt_msg = rebuilt.choices[0].message
                thinking_blocks, reasoning_content = _extract_thinking_state(
                    rebuilt_msg
                )
        except Exception:
            # Best effort: losing thinking state must not fail the whole call.
            logger.debug("Failed to rebuild streaming thinking state", exc_info=True)

    return LLMResult(
        content=full_content or None,
        tool_calls_acc=tool_calls_acc,
        token_count=token_count,
        finish_reason=finish_reason,
        usage=usage,
        thinking_blocks=thinking_blocks,
        reasoning_content=reasoning_content,
    )
965
+
966
+
967
async def _call_llm_non_streaming(
    session: Session, messages, tools, llm_params
) -> LLMResult:
    """Call the LLM without streaming, emit assistant_message at the end.

    Uses the same retry and one-shot healing policy as the streaming variant:
    up to ``_MAX_LLM_RETRIES`` attempts, one reasoning-effort heal, one
    thinking-signature heal, and a sleep before retrying errors for which
    ``_retry_delay_for`` returns a delay.

    Args:
        session: Session used for events and telemetry.
        messages: Chat messages to send (prompt caching is applied first).
        tools: Tool specs to expose to the model.
        llm_params: Extra keyword arguments forwarded to ``acompletion``.

    Returns:
        An ``LLMResult`` whose ``tool_calls_acc`` uses the same index-keyed
        dict shape as the streaming path.

    Raises:
        ContextWindowExceededError: When the provider reports context overflow.
        Exception: The last provider error when no attempt succeeded.
        RuntimeError: When ``_MAX_LLM_RETRIES`` allows no attempt at all.
    """
    response = None
    last_error: Exception | None = None
    _healed_effort = False
    _healed_thinking_signature = False
    messages, tools = with_prompt_caching(messages, tools, llm_params.get("model"))
    t_start = time.monotonic()
    for _llm_attempt in range(_MAX_LLM_RETRIES):
        try:
            response = await acompletion(
                messages=messages,
                tools=tools,
                tool_choice="auto",
                stream=False,
                timeout=600,
                **llm_params,
            )
            break
        except ContextWindowExceededError:
            raise
        except Exception as e:
            # Keep a reference: ``e`` is unbound once the except block exits.
            last_error = e
            if _is_context_overflow_error(e):
                raise ContextWindowExceededError(str(e)) from e
            if not _healed_effort and _is_effort_config_error(e):
                _healed_effort = True
                llm_params = await _heal_effort_and_rebuild_params(
                    session, e, llm_params
                )
                await session.send_event(
                    Event(
                        event_type="tool_log",
                        data={
                            "tool": "system",
                            "log": "Reasoning effort not supported for this model — adjusting and retrying.",
                        },
                    )
                )
                continue
            if await _maybe_heal_invalid_thinking_signature(
                session,
                messages,
                e,
                already_healed=_healed_thinking_signature,
            ):
                _healed_thinking_signature = True
                continue
            _delay = _retry_delay_for(e, _llm_attempt)
            if _llm_attempt < _MAX_LLM_RETRIES - 1 and _delay is not None:
                logger.warning(
                    "Transient LLM error (attempt %d/%d): %s — retrying in %ds",
                    _llm_attempt + 1,
                    _MAX_LLM_RETRIES,
                    e,
                    _delay,
                )
                await session.send_event(
                    Event(
                        event_type="tool_log",
                        data={
                            "tool": "system",
                            "log": f"LLM connection error, retrying in {_delay}s...",
                        },
                    )
                )
                await asyncio.sleep(_delay)
                continue
            raise

    if response is None:
        # The heal branches above ``continue`` and therefore consume attempts.
        # If one fires on the final attempt the loop ends without a response;
        # surface the real provider error instead of an AttributeError on
        # ``None.choices`` below.
        if last_error is not None:
            raise last_error
        raise RuntimeError("LLM call was not attempted (_MAX_LLM_RETRIES < 1)")

    choice = response.choices[0]
    message = choice.message
    content = message.content or None
    finish_reason = choice.finish_reason
    token_count = response.usage.total_tokens if response.usage else 0
    thinking_blocks, reasoning_content = _extract_thinking_state(message)

    # Build tool_calls_acc in the same format as streaming
    tool_calls_acc: dict[int, dict] = {}
    if message.tool_calls:
        for idx, tc in enumerate(message.tool_calls):
            tool_calls_acc[idx] = {
                "id": tc.id,
                "type": "function",
                "function": {
                    "name": tc.function.name,
                    "arguments": tc.function.arguments,
                },
            }

    # Emit the full message as a single event
    if content:
        await session.send_event(
            Event(event_type="assistant_message", data={"content": content})
        )

    usage = await telemetry.record_llm_call(
        session,
        model=llm_params.get("model", session.config.model_name),
        response=response,
        latency_ms=int((time.monotonic() - t_start) * 1000),
        finish_reason=finish_reason,
    )

    return LLMResult(
        content=content,
        tool_calls_acc=tool_calls_acc,
        token_count=token_count,
        finish_reason=finish_reason,
        usage=usage,
        thinking_blocks=thinking_blocks,
        reasoning_content=reasoning_content,
    )
1080
+
1081
+
1082
+ class Handlers:
1083
+ """Handler functions for each operation type"""
1084
+
1085
+ @staticmethod
1086
async def _abandon_pending_approval(session: Session) -> None:
    """Drop tools that were awaiting approval because the user moved on.

    For every pending tool call a rejection tool-result message is appended
    to the LLM context (each tool_call needs a matching tool result for the
    history to stay valid), and a ``tool_state_change`` event with state
    ``"abandoned"`` is sent to the frontend. Afterwards
    ``session.pending_approval`` is cleared.
    """
    pending_calls = session.pending_approval.get("tool_calls", [])
    abandon_msg = "Task abandoned — user continued the conversation without approving."

    for call in pending_calls:
        name = call.function.name

        # Keep LLM context valid: every tool_call needs a tool result
        session.context_manager.add_message(
            Message(
                role="tool",
                content=abandon_msg,
                tool_call_id=call.id,
                name=name,
            )
        )

        state_event = Event(
            event_type="tool_state_change",
            data={"tool_call_id": call.id, "tool": name, "state": "abandoned"},
        )
        await session.send_event(state_event)

    session.pending_approval = None
    logger.info("Abandoned %d pending approval tool(s)", len(pending_calls))
1122
+
1123
+ @staticmethod
1124
async def run_agent(
    session: Session,
    text: str,
) -> str | None:
    """
    Handle user input (like user_input_or_turn in codex.rs:1291) and drive
    the agentic loop for one turn.

    Each loop iteration compacts the context if needed, injects recovery
    prompts for detected doom loops / repeated malformed tool arguments,
    calls the LLM (streaming or not, per ``session.stream``), and then either
    finishes (no tool calls), executes auto-approved tools, or pauses for
    user approval.

    Args:
        session: Session holding context, tool router, config and event sink.
        text: New user message; a falsy value continues without adding one.

    Returns:
        The final assistant response content, if any. ``None`` is also
        returned early when tools are waiting for user approval (in which
        case no ``turn_complete`` event is sent and the turn counter is not
        incremented here).
    """
    # Clear any stale cancellation flag from a previous run
    session.reset_cancel()

    # If there's a pending approval and the user sent a new message,
    # abandon the pending tools so the LLM context stays valid.
    if text and session.pending_approval:
        await Handlers._abandon_pending_approval(session)

    # Add user message to history only if there's actual content
    if text:
        user_msg = Message(role="user", content=text)
        session.context_manager.add_message(user_msg)

    # Send event that we're processing
    await session.send_event(
        Event(event_type="processing", data={"message": "Processing user input"})
    )

    # Agentic loop - continue until model doesn't call tools or max iterations is reached
    iteration = 0
    final_response = None
    errored = False
    # A max_iterations of -1 disables the iteration cap (see loop condition).
    max_iterations = session.config.max_iterations

    while max_iterations == -1 or iteration < max_iterations:
        # ── Cancellation check: before LLM call ──
        if session.is_cancelled:
            break

        # Compact before calling the LLM if context is near the limit.
        # When _compact_and_notify catches CompactionFailedError it sets
        # session.is_running = False; we MUST exit the loop here, otherwise
        # the LLM call below fires with an over-threshold context, hits
        # ContextWindowExceededError, and we end up looping again on the
        # except path — exactly the bug this PR is supposed to fix.
        await _compact_and_notify(session)
        if not session.is_running:
            break

        # Doom-loop detection: break out of repeated tool call patterns
        doom_prompt = check_for_doom_loop(session.context_manager.items)
        if doom_prompt:
            session.context_manager.add_message(
                Message(role="user", content=doom_prompt)
            )

        # Repeated malformed tool arguments: steer the model to another
        # strategy and tell the frontend why.
        malformed_tool = _detect_repeated_malformed(session.context_manager.items)
        if malformed_tool:
            recovery_prompt = (
                "[SYSTEM: Repeated malformed tool arguments detected for "
                f"'{malformed_tool}'. Stop retrying the same tool call shape. "
                "Use a different strategy that produces smaller, valid JSON. "
                "For large file writes, prefer bash with a heredoc or split the "
                "edit into multiple smaller tool calls.]"
            )
            session.context_manager.add_message(
                Message(role="user", content=recovery_prompt)
            )
            await session.send_event(
                Event(
                    event_type="tool_log",
                    data={
                        "tool": "system",
                        "log": (
                            "Repeated malformed tool arguments detected — "
                            f"forcing a different strategy for {malformed_tool}"
                        ),
                    },
                )
            )

        messages = session.context_manager.get_messages()
        tools = session.tool_router.get_tool_specs_for_llm()
        try:
            # ── Call the LLM (streaming or non-streaming) ──
            # Pull the per-model probed effort from the session cache when
            # available; fall back to the raw preference for models we
            # haven't probed yet (e.g. research sub-model).
            llm_params = _resolve_llm_params(
                session.config.model_name,
                session.hf_token,
                reasoning_effort=session.effective_effort_for(
                    session.config.model_name
                ),
            )
            if session.stream:
                llm_result = await _call_llm_streaming(
                    session, messages, tools, llm_params
                )
            else:
                llm_result = await _call_llm_non_streaming(
                    session, messages, tools, llm_params
                )

            content = llm_result.content
            tool_calls_acc = llm_result.tool_calls_acc
            token_count = llm_result.token_count
            finish_reason = llm_result.finish_reason

            # If output was truncated, all tool call args are garbage.
            # Inject a system hint so the LLM retries with smaller content.
            if finish_reason == "length" and tool_calls_acc:
                dropped_names = [
                    tc["function"]["name"]
                    for tc in tool_calls_acc.values()
                    if tc["function"]["name"]
                ]
                logger.warning(
                    "Output truncated (finish_reason=length) — dropping tool calls: %s",
                    dropped_names,
                )
                tool_calls_acc.clear()

                # Tell the agent what happened so it can retry differently
                truncation_hint = (
                    "Your previous response was truncated because the output hit the "
                    "token limit. The following tool calls were lost: "
                    f"{dropped_names}. "
                    "IMPORTANT: Do NOT retry with the same large content. Instead:\n"
                    " • For 'write': use bash with cat<<'HEREDOC' to write the file, "
                    "or split into several smaller edit calls.\n"
                    " • For other tools: reduce the size of your arguments or use bash."
                )
                # Any text produced before the truncation is kept in history
                # (without the dropped tool calls).
                if content:
                    assistant_msg = _assistant_message_from_result(
                        llm_result,
                        model_name=llm_params.get("model"),
                    )
                    session.context_manager.add_message(assistant_msg, token_count)
                session.context_manager.add_message(
                    Message(role="user", content=f"[SYSTEM: {truncation_hint}]")
                )
                if session.stream:
                    await session.send_event(
                        Event(event_type="assistant_stream_end", data={})
                    )
                await session.send_event(
                    Event(
                        event_type="tool_log",
                        data={
                            "tool": "system",
                            "log": f"Output truncated — retrying with smaller content ({dropped_names})",
                        },
                    )
                )
                # The retry still counts against max_iterations.
                iteration += 1
                continue  # retry this iteration

            # Build tool_calls list from accumulated deltas
            tool_calls: list[ToolCall] = []
            for idx in sorted(tool_calls_acc.keys()):
                tc_data = tool_calls_acc[idx]
                tool_calls.append(
                    ToolCall(
                        id=tc_data["id"],
                        type="function",
                        function={
                            "name": tc_data["function"]["name"],
                            "arguments": tc_data["function"]["arguments"],
                        },
                    )
                )

            # Signal end of streaming to the frontend
            if session.stream:
                await session.send_event(
                    Event(event_type="assistant_stream_end", data={})
                )

            # If no tool calls, add assistant message and we're done
            if not tool_calls:
                logger.debug(
                    "Agent loop ending: no tool calls. "
                    "finish_reason=%s, token_count=%d, "
                    "usage=%d, model_max_tokens=%d, "
                    "iteration=%d/%d, "
                    "response_text=%s",
                    finish_reason,
                    token_count,
                    session.context_manager.running_context_usage,
                    session.context_manager.model_max_tokens,
                    iteration,
                    max_iterations,
                    (content or "")[:500],
                )
                if content:
                    assistant_msg = _assistant_message_from_result(
                        llm_result,
                        model_name=llm_params.get("model"),
                    )
                    session.context_manager.add_message(assistant_msg, token_count)
                    final_response = content
                break

            # Validate tool call args (one json.loads per call, once)
            # and split into good vs bad
            good_tools: list[tuple[ToolCall, str, dict]] = []
            bad_tools: list[ToolCall] = []
            for tc in tool_calls:
                try:
                    args = json.loads(tc.function.arguments)
                    good_tools.append((tc, tc.function.name, args))
                except (json.JSONDecodeError, TypeError, ValueError):
                    logger.warning(
                        "Malformed arguments for tool_call %s (%s) — skipping",
                        tc.id,
                        tc.function.name,
                    )
                    # Replace the unparseable payload so the tool call stored
                    # in history carries valid JSON.
                    tc.function.arguments = "{}"
                    bad_tools.append(tc)

            # Add assistant message with all tool calls to context
            assistant_msg = _assistant_message_from_result(
                llm_result,
                model_name=llm_params.get("model"),
                tool_calls=tool_calls,
            )
            session.context_manager.add_message(assistant_msg, token_count)

            # Add error results for bad tool calls so the LLM
            # knows what happened and can retry differently
            for tc in bad_tools:
                error_msg = (
                    f"ERROR: Tool call to '{tc.function.name}' had malformed JSON "
                    f"arguments and was NOT executed. Retry with smaller content — "
                    f"for 'write', split into multiple smaller writes using 'edit'."
                )
                session.context_manager.add_message(
                    Message(
                        role="tool",
                        content=error_msg,
                        tool_call_id=tc.id,
                        name=tc.function.name,
                    )
                )
                # Mirror the failed call on the frontend as a call/output pair.
                await session.send_event(
                    Event(
                        event_type="tool_call",
                        data={
                            "tool": tc.function.name,
                            "arguments": {},
                            "tool_call_id": tc.id,
                        },
                    )
                )
                await session.send_event(
                    Event(
                        event_type="tool_output",
                        data={
                            "tool": tc.function.name,
                            "tool_call_id": tc.id,
                            "output": error_msg,
                            "success": False,
                        },
                    )
                )

            # ── Cancellation check: before tool execution ──
            if session.is_cancelled:
                break

            # Separate good tools into approval-required vs auto-execute.
            # Track reserved spend while classifying a batch so two
            # auto-approved jobs in one model response cannot jointly
            # exceed the remaining session cap.
            approval_required_tools: list[
                tuple[ToolCall, str, dict, ApprovalDecision]
            ] = []
            non_approval_tools: list[
                tuple[ToolCall, str, dict, ApprovalDecision]
            ] = []
            reserved_auto_spend_usd = 0.0
            for tc, tool_name, tool_args in good_tools:
                decision = await _approval_decision(
                    tool_name,
                    tool_args,
                    session,
                    reserved_spend_usd=reserved_auto_spend_usd,
                )
                if decision.requires_approval:
                    approval_required_tools.append(
                        (tc, tool_name, tool_args, decision)
                    )
                else:
                    non_approval_tools.append((tc, tool_name, tool_args, decision))
                    if (
                        decision.auto_approved
                        and decision.billable
                        and decision.estimated_cost_usd is not None
                    ):
                        reserved_auto_spend_usd += decision.estimated_cost_usd

            # Execute non-approval tools (in parallel when possible)
            if non_approval_tools:
                # 1. Validate args upfront
                parsed_tools: list[
                    tuple[ToolCall, str, dict, ApprovalDecision, bool, str]
                ] = []
                for tc, tool_name, tool_args, decision in non_approval_tools:
                    args_valid, error_msg = _validate_tool_args(tool_args)
                    parsed_tools.append(
                        (tc, tool_name, tool_args, decision, args_valid, error_msg)
                    )

                # 2. Send all tool_call events upfront (so frontend shows them all)
                for (
                    tc,
                    tool_name,
                    tool_args,
                    _decision,
                    args_valid,
                    _,
                ) in parsed_tools:
                    if args_valid:
                        await session.send_event(
                            Event(
                                event_type="tool_call",
                                data={
                                    "tool": tool_name,
                                    "arguments": tool_args,
                                    "tool_call_id": tc.id,
                                },
                            )
                        )

                # 3. Execute all valid tools in parallel, cancellable
                # Invalid args short-circuit to a failed result carrying the
                # validation error; billable calls record estimated spend
                # before the tool runs.
                async def _exec_tool(
                    tc: ToolCall,
                    name: str,
                    args: dict,
                    decision: ApprovalDecision,
                    valid: bool,
                    err: str,
                ) -> tuple[ToolCall, str, dict, str, bool]:
                    if not valid:
                        return (tc, name, args, err, False)
                    if decision.billable:
                        _record_estimated_spend(session, decision)
                    out, ok = await session.tool_router.call_tool(
                        name, args, session=session, tool_call_id=tc.id
                    )
                    return (tc, name, args, out, ok)

                # NOTE(review): gather() is used without return_exceptions, so
                # an exception raised by any tool surfaces from
                # gather_task.result() and lands in the generic
                # ``except Exception`` handler below.
                gather_task = asyncio.ensure_future(
                    asyncio.gather(
                        *[
                            _exec_tool(tc, name, args, decision, valid, err)
                            for tc, name, args, decision, valid, err in parsed_tools
                        ]
                    )
                )
                cancel_task = asyncio.ensure_future(session._cancelled.wait())

                # Race tool execution against the session's cancel signal.
                done, _ = await asyncio.wait(
                    [gather_task, cancel_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )

                if cancel_task in done:
                    gather_task.cancel()
                    try:
                        await gather_task
                    except asyncio.CancelledError:
                        pass
                    # Notify frontend that in-flight tools were cancelled
                    for tc, name, _args, _decision, valid, _ in parsed_tools:
                        if valid:
                            await session.send_event(
                                Event(
                                    event_type="tool_state_change",
                                    data={
                                        "tool_call_id": tc.id,
                                        "tool": name,
                                        "state": "cancelled",
                                    },
                                )
                            )
                    await _cleanup_on_cancel(session)
                    break

                # Tools finished first: stop waiting on the cancel signal.
                cancel_task.cancel()
                results = gather_task.result()

                # 4. Record results and send outputs (order preserved)
                for tc, tool_name, tool_args, output, success in results:
                    tool_msg = Message(
                        role="tool",
                        content=output,
                        tool_call_id=tc.id,
                        name=tool_name,
                    )
                    session.context_manager.add_message(tool_msg)

                    await session.send_event(
                        Event(
                            event_type="tool_output",
                            data={
                                "tool": tool_name,
                                "tool_call_id": tc.id,
                                "output": output,
                                "success": success,
                            },
                        )
                    )

            # If there are tools requiring approval, ask for batch approval
            if approval_required_tools:
                # Prepare batch approval data
                tools_data = []
                blocked_payloads = []
                for tc, tool_name, tool_args, decision in approval_required_tools:
                    # Resolve sandbox file paths for hf_jobs scripts so the
                    # frontend can display & edit the actual file content.
                    if tool_name == "hf_jobs" and isinstance(
                        tool_args.get("script"), str
                    ):
                        from agent.tools.sandbox_tool import resolve_sandbox_script

                        sandbox = getattr(session, "sandbox", None)
                        resolved, _ = await resolve_sandbox_script(
                            sandbox, tool_args["script"]
                        )
                        if resolved:
                            # Copy rather than mutate: only the payload shown
                            # to the frontend gets the resolved script.
                            tool_args = {**tool_args, "script": resolved}

                    tool_payload = {
                        "tool": tool_name,
                        "arguments": tool_args,
                        "tool_call_id": tc.id,
                    }
                    if decision.auto_approval_blocked:
                        tool_payload.update(
                            {
                                "auto_approval_blocked": True,
                                "block_reason": decision.block_reason,
                                "estimated_cost_usd": decision.estimated_cost_usd,
                                "remaining_cap_usd": decision.remaining_cap_usd,
                            }
                        )
                        blocked_payloads.append(tool_payload)
                    tools_data.append(tool_payload)

                event_data = {"tools": tools_data, "count": len(tools_data)}
                if blocked_payloads:
                    # The event-level block details come from the first
                    # blocked tool only.
                    first = blocked_payloads[0]
                    event_data.update(
                        {
                            "auto_approval_blocked": True,
                            "block_reason": first.get("block_reason"),
                            "estimated_cost_usd": first.get("estimated_cost_usd"),
                            "remaining_cap_usd": first.get("remaining_cap_usd"),
                        }
                    )
                await session.send_event(
                    Event(
                        event_type="approval_required",
                        data=event_data,
                    )
                )

                # Store all approval-requiring tools (ToolCall objects for execution)
                session.pending_approval = {
                    "tool_calls": [tc for tc, _, _, _ in approval_required_tools],
                }

                # Return early - wait for EXEC_APPROVAL operation
                return None

            iteration += 1

        except ContextWindowExceededError:
            # Force compact and retry this iteration.
            cm = session.context_manager
            logger.warning(
                "ContextWindowExceededError at iteration %d — forcing compaction "
                "(usage=%d, model_max_tokens=%d, messages=%d)",
                iteration,
                cm.running_context_usage,
                cm.model_max_tokens,
                len(cm.items),
            )
            # Push the recorded usage past the limit before compacting.
            cm.running_context_usage = cm.model_max_tokens + 1
            await _compact_and_notify(session)
            # Same guard as the top of the loop: if compaction couldn't
            # bring us under threshold, _compact_and_notify has already
            # emitted session_terminated and set is_running=False. Continue
            # would just re-call the LLM with the same too-big context.
            if not session.is_running:
                break
            continue

        except Exception as e:
            import traceback

            # Prefer a friendly message; otherwise fall back to the raw
            # error text plus traceback.
            error_msg = _friendly_error_message(e)
            if error_msg is None:
                error_msg = str(e) + "\n" + traceback.format_exc()

            await session.send_event(
                Event(
                    event_type="error",
                    data={"error": error_msg},
                )
            )
            errored = True
            break

    # Exactly one terminal event: "interrupted" on cancel, "turn_complete" on
    # normal completion, none after an error (the error event was already sent).
    if session.is_cancelled:
        await _cleanup_on_cancel(session)
        await session.send_event(Event(event_type="interrupted"))
    elif not errored:
        await session.send_event(
            Event(
                event_type="turn_complete",
                data={
                    "history_size": len(session.context_manager.items),
                    "final_response": final_response
                    if isinstance(final_response, str)
                    else None,
                },
            )
        )

    # Increment turn counter and check for auto-save
    session.increment_turn()
    await session.auto_save_if_needed()

    return final_response
1660
+
1661
+ @staticmethod
1662
async def undo(session: Session) -> None:
    """Undo the most recent turn and tell the frontend it is done.

    A warning is logged when ``undo_last_turn`` reports that nothing was
    removed; the ``undo_complete`` event is sent in either case.
    """
    if not session.context_manager.undo_last_turn():
        logger.warning("Undo: no user message found to remove")
    await session.send_event(Event(event_type="undo_complete"))
1668
+
1669
+ @staticmethod
1670
+ async def exec_approval(session: Session, approvals: list[dict]) -> None:
1671
+ """Handle batch job execution approval"""
1672
+ if not session.pending_approval:
1673
+ await session.send_event(
1674
+ Event(
1675
+ event_type="error",
1676
+ data={"error": "No pending approval to process"},
1677
+ )
1678
+ )
1679
+ return
1680
+
1681
+ tool_calls = session.pending_approval.get("tool_calls", [])
1682
+ if not tool_calls:
1683
+ await session.send_event(
1684
+ Event(
1685
+ event_type="error",
1686
+ data={"error": "No pending tool calls found"},
1687
+ )
1688
+ )
1689
+ return
1690
+
1691
+ # Create a map of tool_call_id -> approval decision
1692
+ approval_map = {a["tool_call_id"]: a for a in approvals}
1693
+ for a in approvals:
1694
+ if a.get("edited_script"):
1695
+ logger.info(
1696
+ f"Received edited script for tool_call {a['tool_call_id']} ({len(a['edited_script'])} chars)"
1697
+ )
1698
+
1699
+ # Separate approved and rejected tool calls
1700
+ approved_tasks = []
1701
+ rejected_tasks = []
1702
+
1703
+ for tc in tool_calls:
1704
+ tool_name = tc.function.name
1705
+ try:
1706
+ tool_args = json.loads(tc.function.arguments)
1707
+ except (json.JSONDecodeError, TypeError) as e:
1708
+ # Malformed arguments — treat as failed, notify agent
1709
+ logger.warning(f"Malformed tool arguments for {tool_name}: {e}")
1710
+ tool_msg = Message(
1711
+ role="tool",
1712
+ content=f"Malformed arguments: {e}",
1713
+ tool_call_id=tc.id,
1714
+ name=tool_name,
1715
+ )
1716
+ session.context_manager.add_message(tool_msg)
1717
+ await session.send_event(
1718
+ Event(
1719
+ event_type="tool_output",
1720
+ data={
1721
+ "tool": tool_name,
1722
+ "tool_call_id": tc.id,
1723
+ "output": f"Malformed arguments: {e}",
1724
+ "success": False,
1725
+ },
1726
+ )
1727
+ )
1728
+ continue
1729
+
1730
+ approval_decision = approval_map.get(tc.id, {"approved": False})
1731
+
1732
+ if approval_decision.get("approved", False):
1733
+ edited_script = approval_decision.get("edited_script")
1734
+ was_edited = False
1735
+ if edited_script and "script" in tool_args:
1736
+ tool_args["script"] = edited_script
1737
+ was_edited = True
1738
+ logger.info(f"Using user-edited script for {tool_name} ({tc.id})")
1739
+ selected_namespace = approval_decision.get("namespace")
1740
+ if selected_namespace and tool_name == "hf_jobs":
1741
+ tool_args["namespace"] = selected_namespace
1742
+ approved_tasks.append((tc, tool_name, tool_args, was_edited))
1743
+ else:
1744
+ rejected_tasks.append((tc, tool_name, approval_decision))
1745
+
1746
+ # Clear pending approval immediately so a page refresh during
1747
+ # execution won't re-show the approval dialog.
1748
+ session.pending_approval = None
1749
+
1750
+ # Notify frontend of approval decisions immediately (before execution)
1751
+ for tc, tool_name, tool_args, _was_edited in approved_tasks:
1752
+ await session.send_event(
1753
+ Event(
1754
+ event_type="tool_state_change",
1755
+ data={
1756
+ "tool_call_id": tc.id,
1757
+ "tool": tool_name,
1758
+ "state": "approved",
1759
+ },
1760
+ )
1761
+ )
1762
+ for tc, tool_name, approval_decision in rejected_tasks:
1763
+ await session.send_event(
1764
+ Event(
1765
+ event_type="tool_state_change",
1766
+ data={
1767
+ "tool_call_id": tc.id,
1768
+ "tool": tool_name,
1769
+ "state": "rejected",
1770
+ },
1771
+ )
1772
+ )
1773
+
1774
+ # Execute all approved tools concurrently
1775
+ async def execute_tool(tc, tool_name, tool_args, was_edited):
1776
+ """Execute a single tool and return its result.
1777
+
1778
+ The TraceLog already exists on the frontend (created by
1779
+ approval_required), so we send tool_state_change instead of
1780
+ tool_call to avoid creating a duplicate.
1781
+ """
1782
+ await session.send_event(
1783
+ Event(
1784
+ event_type="tool_state_change",
1785
+ data={
1786
+ "tool_call_id": tc.id,
1787
+ "tool": tool_name,
1788
+ "state": "running",
1789
+ },
1790
+ )
1791
+ )
1792
+
1793
+ await _record_manual_approved_spend_if_needed(session, tool_name, tool_args)
1794
+
1795
+ output, success = await session.tool_router.call_tool(
1796
+ tool_name, tool_args, session=session, tool_call_id=tc.id
1797
+ )
1798
+
1799
+ return (tc, tool_name, output, success, was_edited)
1800
+
1801
+ # Execute all approved tools concurrently (cancellable)
1802
+ if approved_tasks:
1803
+ gather_task = asyncio.ensure_future(
1804
+ asyncio.gather(
1805
+ *[
1806
+ execute_tool(tc, tool_name, tool_args, was_edited)
1807
+ for tc, tool_name, tool_args, was_edited in approved_tasks
1808
+ ],
1809
+ return_exceptions=True,
1810
+ )
1811
+ )
1812
+ cancel_task = asyncio.ensure_future(session._cancelled.wait())
1813
+
1814
+ done, _ = await asyncio.wait(
1815
+ [gather_task, cancel_task],
1816
+ return_when=asyncio.FIRST_COMPLETED,
1817
+ )
1818
+
1819
+ if cancel_task in done:
1820
+ gather_task.cancel()
1821
+ try:
1822
+ await gather_task
1823
+ except asyncio.CancelledError:
1824
+ pass
1825
+ # Notify frontend that approved tools were cancelled
1826
+ for tc, tool_name, _args, _was_edited in approved_tasks:
1827
+ await session.send_event(
1828
+ Event(
1829
+ event_type="tool_state_change",
1830
+ data={
1831
+ "tool_call_id": tc.id,
1832
+ "tool": tool_name,
1833
+ "state": "cancelled",
1834
+ },
1835
+ )
1836
+ )
1837
+ await _cleanup_on_cancel(session)
1838
+ await session.send_event(Event(event_type="interrupted"))
1839
+ session.increment_turn()
1840
+ await session.auto_save_if_needed()
1841
+ return
1842
+
1843
+ cancel_task.cancel()
1844
+ results = gather_task.result()
1845
+
1846
+ # Process results and add to context
1847
+ for result in results:
1848
+ if isinstance(result, Exception):
1849
+ # Handle execution error
1850
+ logger.error(f"Tool execution error: {result}")
1851
+ continue
1852
+
1853
+ tc, tool_name, output, success, was_edited = result
1854
+
1855
+ if was_edited:
1856
+ output = f"[Note: The user edited the script before execution. The output below reflects the user-modified version, not your original script.]\n\n{output}"
1857
+
1858
+ # Add tool result to context
1859
+ tool_msg = Message(
1860
+ role="tool",
1861
+ content=output,
1862
+ tool_call_id=tc.id,
1863
+ name=tool_name,
1864
+ )
1865
+ session.context_manager.add_message(tool_msg)
1866
+
1867
+ await session.send_event(
1868
+ Event(
1869
+ event_type="tool_output",
1870
+ data={
1871
+ "tool": tool_name,
1872
+ "tool_call_id": tc.id,
1873
+ "output": output,
1874
+ "success": success,
1875
+ },
1876
+ )
1877
+ )
1878
+
1879
+ # Process rejected tools
1880
+ for tc, tool_name, approval_decision in rejected_tasks:
1881
+ rejection_msg = "Job execution cancelled by user"
1882
+ user_feedback = approval_decision.get("feedback")
1883
+ if user_feedback:
1884
+ # Ensure feedback is a string and sanitize any problematic characters
1885
+ feedback_str = str(user_feedback).strip()
1886
+ # Remove any control characters that might break JSON parsing
1887
+ feedback_str = "".join(
1888
+ char for char in feedback_str if ord(char) >= 32 or char in "\n\t"
1889
+ )
1890
+ rejection_msg += f". User feedback: {feedback_str}"
1891
+
1892
+ # Ensure rejection_msg is a clean string
1893
+ rejection_msg = str(rejection_msg).strip()
1894
+
1895
+ tool_msg = Message(
1896
+ role="tool",
1897
+ content=rejection_msg,
1898
+ tool_call_id=tc.id,
1899
+ name=tool_name,
1900
+ )
1901
+ session.context_manager.add_message(tool_msg)
1902
+
1903
+ await session.send_event(
1904
+ Event(
1905
+ event_type="tool_output",
1906
+ data={
1907
+ "tool": tool_name,
1908
+ "tool_call_id": tc.id,
1909
+ "output": rejection_msg,
1910
+ "success": False,
1911
+ },
1912
+ )
1913
+ )
1914
+
1915
+ # Continue agent loop with empty input to process the tool results
1916
+ await Handlers.run_agent(session, "")
1917
+
1918
@staticmethod
async def shutdown(session: Session) -> bool:
    """Handle shutdown (like shutdown in codex.rs:1329).

    Starts the detached session save when ``session.config.save_sessions``
    is set, marks the session as no longer running, and emits a
    ``shutdown`` event.

    Args:
        session: The session being shut down.

    Returns:
        Always True.
    """
    # Save session trajectory if enabled (fire-and-forget, returns immediately)
    if session.config.save_sessions:
        logger.info("Saving session...")
        try:
            session.save_and_upload_detached(session.config.session_dataset_repo)
        except Exception as exc:
            # A failed save must not block shutdown: without this guard the
            # session would stay marked as running and no shutdown event
            # would be emitted.
            logger.exception("Session save failed during shutdown: %s", exc)

    session.is_running = False
    await session.send_event(Event(event_type="shutdown"))
    return True
1930
+
1931
+
1932
async def process_submission(session: Session, submission) -> bool:
    """
    Dispatch one submission to the handler matching its operation type.

    Args:
        session: Session the handlers operate on.
        submission: Object whose ``operation`` carries ``op_type`` and ``data``.

    Returns:
        bool: True to continue, False to shutdown
    """
    operation = submission.operation
    kind = operation.op_type
    logger.debug("Received operation: %s", kind.value)

    keep_running = True

    if kind == OpType.USER_INPUT:
        payload = operation.data
        user_text = payload.get("text", "") if payload else ""
        await Handlers.run_agent(session, user_text)
    elif kind == OpType.COMPACT:
        await _compact_and_notify(session)
    elif kind == OpType.UNDO:
        await Handlers.undo(session)
    elif kind == OpType.EXEC_APPROVAL:
        payload = operation.data
        decisions = payload.get("approvals", []) if payload else []
        await Handlers.exec_approval(session, decisions)
    elif kind == OpType.SHUTDOWN:
        # The shutdown handler reports success with True; invert it so a
        # completed shutdown stops the loop.
        keep_running = not await Handlers.shutdown(session)
    else:
        logger.warning(f"Unknown operation: {kind}")

    return keep_running
1965
+
1966
+
1967
async def submission_loop(
    submission_queue: asyncio.Queue,
    event_queue: asyncio.Queue,
    config: Config,
    tool_router: ToolRouter | None = None,
    session_holder: list | None = None,
    hf_token: str | None = None,
    user_id: str | None = None,
    local_mode: bool = False,
    stream: bool = True,
    notification_gateway: NotificationGateway | None = None,
    notification_destinations: list[str] | None = None,
    defer_turn_complete_notification: bool = False,
) -> None:
    """
    Main agent loop - processes submissions and dispatches to handlers.
    This is the core of the agent (like submission_loop in codex.rs:1259-1340)

    Args:
        submission_queue: Queue the loop awaits submissions from.
        event_queue: Queue handed to the ``Session`` constructor.
        config: Passed to the session; ``save_sessions`` and
            ``session_dataset_repo`` are read here.
        tool_router: Passed to the session and entered as an async context
            manager for the lifetime of the loop.
        session_holder: Optional list used to hand the created session back
            to the caller; slot 0 is overwritten, or the session is appended
            when the list is empty.
        hf_token, user_id, local_mode, stream, notification_gateway,
        notification_destinations, defer_turn_complete_notification:
            Forwarded unchanged to the ``Session`` constructor.
    """

    # Create session with tool router
    session = Session(
        event_queue,
        config=config,
        tool_router=tool_router,
        hf_token=hf_token,
        user_id=user_id,
        local_mode=local_mode,
        stream=stream,
        notification_gateway=notification_gateway,
        notification_destinations=notification_destinations,
        defer_turn_complete_notification=defer_turn_complete_notification,
    )
    if session_holder is not None:
        if session_holder:
            session_holder[0] = session
        else:
            # An empty holder has no slot 0 to assign to; append instead of
            # raising IndexError.
            session_holder.append(session)
    logger.info("Agent loop started")

    # Retry any failed uploads from previous sessions (fire-and-forget).
    # Includes the personal trace repo when enabled so a session that failed
    # to publish to the user's HF dataset gets a fresh attempt on next run.
    if config and config.save_sessions:
        Session.retry_failed_uploads_detached(
            directory="session_logs",
            repo_id=config.session_dataset_repo,
            personal_repo_id=session._personal_trace_repo_id(),
        )

    try:
        # Main processing loop
        # NOTE(review): tool_router defaults to None but is used
        # unconditionally below - presumably callers always pass one; confirm.
        async with tool_router:
            # Emit ready event after initialization
            await session.send_event(
                Event(
                    event_type="ready",
                    data={
                        "message": "Agent initialized",
                        "tool_count": len(tool_router.tools),
                    },
                )
            )

            while session.is_running:
                submission = await submission_queue.get()

                try:
                    should_continue = await process_submission(session, submission)
                    if not should_continue:
                        break
                except asyncio.CancelledError:
                    logger.warning("Agent loop cancelled")
                    break
                except Exception as e:
                    # logger.exception keeps the traceback; the message text
                    # is unchanged.
                    logger.exception("Error in agent loop: %s", e)
                    await session.send_event(
                        Event(event_type="error", data={"error": str(e)})
                    )

            logger.info("Agent loop exited")

    finally:
        # Emergency save if session saving is enabled and shutdown wasn't called properly
        if session.config.save_sessions and session.is_running:
            logger.info("Emergency save: preserving session before exit...")
            try:
                local_path = session.save_and_upload_detached(
                    session.config.session_dataset_repo
                )
                if local_path:
                    logger.info("Emergency save successful, upload in progress")
            except Exception as e:
                logger.exception("Emergency save failed: %s", e)