akseljoonas HF Staff commited on
Commit
b94b18b
·
1 Parent(s): 100eb8d

Revert "feat: stream sandbox bash output to frontend in real-time"

Browse files

This reverts commit f97b6ec34f93edfa05992637e0ff1e3ec7a79f5a.

agent/tools/sandbox_client.py CHANGED
@@ -37,12 +37,11 @@ Tools: bash, read, write, edit, upload
37
  from __future__ import annotations
38
 
39
  import io
40
- import json
41
  import sys
42
  import time
43
  import uuid
44
  from dataclasses import dataclass, field
45
- from typing import Any, Callable, Generator
46
 
47
  import httpx
48
  from huggingface_hub import CommitOperationAdd, HfApi
@@ -98,9 +97,8 @@ CMD ["python", "sandbox_server.py"]
98
 
99
  _SANDBOX_SERVER = '''\
100
  """Minimal FastAPI server for sandbox operations."""
101
- import os, subprocess, pathlib, asyncio, json
102
  from fastapi import FastAPI
103
- from fastapi.responses import StreamingResponse
104
  from pydantic import BaseModel
105
  from typing import Optional
106
  import uvicorn
@@ -150,41 +148,6 @@ def bash(req: BashReq):
150
  except Exception as e:
151
  return {"success": False, "output": "", "error": str(e)}
152
 
153
- @app.post("/api/bash/stream")
154
- async def bash_stream(req: BashReq):
155
- """Stream bash output line-by-line as NDJSON."""
156
- async def generate():
157
- try:
158
- proc = await asyncio.create_subprocess_shell(
159
- req.command,
160
- stdout=asyncio.subprocess.PIPE,
161
- stderr=asyncio.subprocess.STDOUT,
162
- cwd=req.work_dir,
163
- )
164
- total_chars = 0
165
- try:
166
- line_bytes = await asyncio.wait_for(proc.stdout.readline(), timeout=req.timeout)
167
- while line_bytes:
168
- line = line_bytes.decode(errors="replace")
169
- total_chars += len(line)
170
- if total_chars <= 30000:
171
- yield json.dumps({"type": "output", "data": line}) + "\\n"
172
- elif total_chars - len(line) <= 30000:
173
- yield json.dumps({"type": "output", "data": "... (truncated)\\n"}) + "\\n"
174
- line_bytes = await asyncio.wait_for(proc.stdout.readline(), timeout=req.timeout)
175
- except asyncio.TimeoutError:
176
- try:
177
- proc.kill()
178
- except ProcessLookupError:
179
- pass
180
- yield json.dumps({"type": "error", "data": f"Timeout after {req.timeout}s"}) + "\\n"
181
- return
182
- rc = await proc.wait()
183
- yield json.dumps({"type": "exit", "code": rc}) + "\\n"
184
- except Exception as e:
185
- yield json.dumps({"type": "error", "data": str(e)}) + "\\n"
186
- return StreamingResponse(generate(), media_type="application/x-ndjson")
187
-
188
  @app.post("/api/read")
189
  def read(req: ReadReq):
190
  try:
@@ -529,58 +492,6 @@ class Sandbox:
529
  timeout=timeout,
530
  )
531
 
532
- def bash_stream(
533
- self,
534
- command: str,
535
- *,
536
- work_dir: str | None = None,
537
- timeout: int | None = None,
538
- ) -> Generator[tuple[str, str | int], None, None]:
539
- """Stream bash output line-by-line from the sandbox.
540
-
541
- Yields:
542
- (event_type, data) tuples:
543
- - ("output", line_text)
544
- - ("error", error_message)
545
- - ("exit", return_code)
546
- """
547
- effective_timeout = min(timeout or self.timeout, MAX_TIMEOUT)
548
- payload = {
549
- "command": command,
550
- "work_dir": work_dir or self.work_dir,
551
- "timeout": effective_timeout,
552
- }
553
- try:
554
- with self._client.stream(
555
- "POST",
556
- "bash/stream",
557
- json=payload,
558
- timeout=httpx.Timeout(effective_timeout + 30, connect=30),
559
- ) as resp:
560
- if resp.status_code != 200:
561
- yield ("error", f"HTTP {resp.status_code}")
562
- return
563
- for raw_line in resp.iter_lines():
564
- if not raw_line:
565
- continue
566
- try:
567
- msg = json.loads(raw_line)
568
- except json.JSONDecodeError:
569
- continue
570
- msg_type = msg.get("type", "")
571
- if msg_type == "output":
572
- yield ("output", msg.get("data", ""))
573
- elif msg_type == "exit":
574
- yield ("exit", msg.get("code", -1))
575
- elif msg_type == "error":
576
- yield ("error", msg.get("data", "unknown error"))
577
- except httpx.TimeoutException:
578
- yield ("error", f"Timeout after {effective_timeout}s")
579
- except httpx.ConnectError:
580
- yield ("error", f"Cannot connect to sandbox. Is {self.space_id} running?")
581
- except Exception as e:
582
- yield ("error", str(e))
583
-
584
  def read(
585
  self, path: str, *, offset: int | None = None, limit: int | None = None
586
  ) -> ToolResult:
 
37
  from __future__ import annotations
38
 
39
  import io
 
40
  import sys
41
  import time
42
  import uuid
43
  from dataclasses import dataclass, field
44
+ from typing import Any, Callable
45
 
46
  import httpx
47
  from huggingface_hub import CommitOperationAdd, HfApi
 
97
 
98
  _SANDBOX_SERVER = '''\
99
  """Minimal FastAPI server for sandbox operations."""
100
+ import os, subprocess, pathlib
101
  from fastapi import FastAPI
 
102
  from pydantic import BaseModel
103
  from typing import Optional
104
  import uvicorn
 
148
  except Exception as e:
149
  return {"success": False, "output": "", "error": str(e)}
150
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
151
  @app.post("/api/read")
152
  def read(req: ReadReq):
153
  try:
 
492
  timeout=timeout,
493
  )
494
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
495
  def read(
496
  self, path: str, *, offset: int | None = None, limit: int | None = None
497
  ) -> ToolResult:
agent/tools/sandbox_tool.py CHANGED
@@ -190,77 +190,8 @@ async def sandbox_create_handler(
190
  ), True
191
 
192
 
193
- def _make_bash_streaming_handler():
194
- """Create a bash handler that streams output via tool_log events."""
195
-
196
- async def handler(args: dict[str, Any], session: Any = None) -> tuple[str, bool]:
197
- if not session or not getattr(session, "sandbox", None):
198
- return "No sandbox running. Call sandbox_create first to start one.", False
199
-
200
- sb = session.sandbox
201
- queue: asyncio.Queue = asyncio.Queue()
202
- loop = asyncio.get_running_loop()
203
-
204
- def producer():
205
- try:
206
- for event_type, data in sb.bash_stream(
207
- args["command"],
208
- work_dir=args.get("work_dir"),
209
- timeout=args.get("timeout"),
210
- ):
211
- loop.call_soon_threadsafe(queue.put_nowait, (event_type, data))
212
- loop.call_soon_threadsafe(queue.put_nowait, None)
213
- except Exception as e:
214
- loop.call_soon_threadsafe(queue.put_nowait, ("error", str(e)))
215
- loop.call_soon_threadsafe(queue.put_nowait, None)
216
-
217
- fut = loop.run_in_executor(None, producer)
218
-
219
- all_output: list[str] = []
220
- exit_code = 0
221
- error_msg = ""
222
-
223
- while True:
224
- item = await queue.get()
225
- if item is None:
226
- break
227
-
228
- event_type, data = item
229
- if event_type == "output":
230
- all_output.append(data)
231
- if session:
232
- await session.send_event(
233
- Event(
234
- event_type="tool_log",
235
- data={"tool": "bash", "log": data.rstrip("\n")},
236
- )
237
- )
238
- elif event_type == "exit":
239
- exit_code = data
240
- elif event_type == "error":
241
- error_msg = data
242
-
243
- await fut
244
-
245
- output = "".join(all_output)
246
- if not output and not error_msg:
247
- output = "(no output)"
248
-
249
- if error_msg:
250
- if output:
251
- return f"{output}\nERROR: {error_msg}", False
252
- return f"ERROR: {error_msg}", False
253
-
254
- if exit_code != 0:
255
- return f"{output}\nExit code {exit_code}", False
256
-
257
- return output, True
258
-
259
- return handler
260
-
261
-
262
  def _make_tool_handler(sandbox_tool_name: str):
263
- """Factory: create a handler for a sandbox operation tool (non-bash)."""
264
 
265
  async def handler(args: dict[str, Any], session: Any = None) -> tuple[str, bool]:
266
  # Require sandbox to exist — user must approve sandbox_create first
@@ -304,17 +235,12 @@ def get_sandbox_tools():
304
  # Operation tools (auto-execute, no approval needed)
305
  for name in Sandbox.TOOLS.keys():
306
  spec = Sandbox.TOOLS[name]
307
- # Bash uses streaming handler; other tools use standard handler
308
- if name == "bash":
309
- handler = _make_bash_streaming_handler()
310
- else:
311
- handler = _make_tool_handler(name)
312
  tools.append(
313
  ToolSpec(
314
  name=name,
315
  description=spec["description"],
316
  parameters=spec["parameters"],
317
- handler=handler,
318
  )
319
  )
320
 
 
190
  ), True
191
 
192
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
193
  def _make_tool_handler(sandbox_tool_name: str):
194
+ """Factory: create a handler for a sandbox operation tool."""
195
 
196
  async def handler(args: dict[str, Any], session: Any = None) -> tuple[str, bool]:
197
  # Require sandbox to exist — user must approve sandbox_create first
 
235
  # Operation tools (auto-execute, no approval needed)
236
  for name in Sandbox.TOOLS.keys():
237
  spec = Sandbox.TOOLS[name]
 
 
 
 
 
238
  tools.append(
239
  ToolSpec(
240
  name=name,
241
  description=spec["description"],
242
  parameters=spec["parameters"],
243
+ handler=_make_tool_handler(name),
244
  )
245
  )
246
 
tests/integration/tools/test_sandbox_streaming.py DELETED
@@ -1,267 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Integration test for sandbox bash streaming.
4
-
5
- Creates a real sandbox on HF, runs commands, verifies that:
6
- 1. bash_stream() yields lines incrementally
7
- 2. The streaming tool handler emits tool_log events
8
- 3. The final output is collected correctly
9
- 4. Error/exit codes propagate
10
-
11
- Requires HF_TOKEN in environment.
12
- """
13
- import asyncio
14
- import os
15
- import sys
16
-
17
- sys.path.insert(0, ".")
18
-
19
- from agent.tools.sandbox_client import Sandbox
20
-
21
- GREEN = "\033[92m"
22
- RED = "\033[91m"
23
- BLUE = "\033[94m"
24
- YELLOW = "\033[93m"
25
- RESET = "\033[0m"
26
-
27
-
28
- def ok(msg):
29
- print(f"{GREEN} OK{RESET} {msg}")
30
-
31
-
32
- def fail(msg):
33
- print(f"{RED}FAIL{RESET} {msg}")
34
-
35
-
36
- def info(msg):
37
- print(f"{BLUE}INFO{RESET} {msg}")
38
-
39
-
40
- async def test_bash_stream_basic(sb: Sandbox):
41
- """Test that bash_stream yields output lines and an exit event."""
42
- info("bash_stream: echo test")
43
- lines = []
44
- exit_code = None
45
- for event_type, data in sb.bash_stream("echo hello && echo world"):
46
- if event_type == "output":
47
- lines.append(data.rstrip("\n"))
48
- elif event_type == "exit":
49
- exit_code = data
50
-
51
- if lines == ["hello", "world"]:
52
- ok(f"Got expected lines: {lines}")
53
- else:
54
- fail(f"Unexpected lines: {lines}")
55
- return False
56
-
57
- if exit_code == 0:
58
- ok(f"Exit code: {exit_code}")
59
- else:
60
- fail(f"Unexpected exit code: {exit_code}")
61
- return False
62
-
63
- return True
64
-
65
-
66
- async def test_bash_stream_multiline(sb: Sandbox):
67
- """Test streaming a script that prints multiple lines with delays."""
68
- info("bash_stream: multi-line with sleep")
69
- lines = []
70
- for event_type, data in sb.bash_stream(
71
- 'for i in 1 2 3; do echo "line $i"; sleep 0.2; done'
72
- ):
73
- if event_type == "output":
74
- line = data.rstrip("\n")
75
- lines.append(line)
76
- info(f" streamed: {line}")
77
-
78
- if len(lines) == 3 and lines == ["line 1", "line 2", "line 3"]:
79
- ok(f"Got all 3 lines incrementally")
80
- else:
81
- fail(f"Unexpected lines: {lines}")
82
- return False
83
-
84
- return True
85
-
86
-
87
- async def test_bash_stream_stderr(sb: Sandbox):
88
- """Test that stderr is also streamed (merged into stdout)."""
89
- info("bash_stream: stderr output")
90
- lines = []
91
- for event_type, data in sb.bash_stream("echo out && echo err >&2"):
92
- if event_type == "output":
93
- lines.append(data.rstrip("\n"))
94
-
95
- if "out" in lines and "err" in lines:
96
- ok(f"Both stdout and stderr captured: {lines}")
97
- else:
98
- fail(f"Missing output: {lines}")
99
- return False
100
-
101
- return True
102
-
103
-
104
- async def test_bash_stream_exit_code(sb: Sandbox):
105
- """Test that non-zero exit codes are reported."""
106
- info("bash_stream: non-zero exit code")
107
- exit_code = None
108
- for event_type, data in sb.bash_stream("exit 42"):
109
- if event_type == "exit":
110
- exit_code = data
111
-
112
- if exit_code == 42:
113
- ok(f"Got expected exit code: {exit_code}")
114
- else:
115
- fail(f"Unexpected exit code: {exit_code}")
116
- return False
117
-
118
- return True
119
-
120
-
121
- async def test_bash_stream_empty(sb: Sandbox):
122
- """Test command with no output."""
123
- info("bash_stream: no output command")
124
- lines = []
125
- exit_code = None
126
- for event_type, data in sb.bash_stream("true"):
127
- if event_type == "output":
128
- lines.append(data)
129
- elif event_type == "exit":
130
- exit_code = data
131
-
132
- if exit_code == 0:
133
- ok(f"Exit code 0, output lines: {len(lines)}")
134
- else:
135
- fail(f"Unexpected exit code: {exit_code}")
136
- return False
137
-
138
- return True
139
-
140
-
141
- async def test_bash_stream_tool_handler(sb: Sandbox):
142
- """Test the full tool handler path with mock session that captures events."""
143
- info("Tool handler: streaming bash with event capture")
144
-
145
- from agent.core.session import Event
146
- from agent.tools.sandbox_tool import _make_bash_streaming_handler
147
-
148
- # Mock session with event capture
149
- class MockSession:
150
- def __init__(self, sandbox):
151
- self.sandbox = sandbox
152
- self.event_queue = asyncio.Queue()
153
- self.events = []
154
-
155
- async def send_event(self, event: Event):
156
- self.events.append(event)
157
- await self.event_queue.put(event)
158
-
159
- session = MockSession(sb)
160
- handler = _make_bash_streaming_handler()
161
-
162
- output, success = await handler(
163
- {"command": 'for i in a b c; do echo "$i"; sleep 0.1; done'},
164
- session=session,
165
- )
166
-
167
- # Check tool_log events were emitted
168
- log_events = [e for e in session.events if e.event_type == "tool_log"]
169
- log_lines = [e.data["log"] for e in log_events]
170
-
171
- if log_lines == ["a", "b", "c"]:
172
- ok(f"Tool handler emitted {len(log_events)} tool_log events: {log_lines}")
173
- else:
174
- fail(f"Unexpected log events: {log_lines}")
175
- return False
176
-
177
- if success and "a" in output and "b" in output and "c" in output:
178
- ok(f"Handler returned success with full output")
179
- else:
180
- fail(f"Handler returned success={success}, output={output!r}")
181
- return False
182
-
183
- return True
184
-
185
-
186
- async def test_original_bash_still_works(sb: Sandbox):
187
- """Verify the non-streaming bash endpoint still works."""
188
- info("Original bash endpoint (non-streaming)")
189
- result = await asyncio.to_thread(sb.bash, "echo legacy_test")
190
- if result.success and "legacy_test" in result.output:
191
- ok(f"Original bash works: {result.output.strip()}")
192
- else:
193
- fail(f"Original bash failed: {result}")
194
- return False
195
-
196
- return True
197
-
198
-
199
- async def main():
200
- print("=" * 60)
201
- print(f"{BLUE}Sandbox Bash Streaming — Integration Tests{RESET}")
202
- print("=" * 60)
203
-
204
- token = os.environ.get("HF_TOKEN")
205
- if not token:
206
- fail("HF_TOKEN not set")
207
- sys.exit(1)
208
-
209
- from huggingface_hub import HfApi
210
-
211
- api = HfApi(token=token)
212
- owner = api.whoami().get("name", "")
213
- info(f"HF user: {owner}")
214
-
215
- sb = None
216
- try:
217
- info("Creating sandbox (this takes ~2-4 minutes)...")
218
- sb = Sandbox.create(owner=owner, token=token, hardware="cpu-basic")
219
- ok(f"Sandbox ready: {sb.space_id}")
220
- print()
221
-
222
- tests = [
223
- test_bash_stream_basic,
224
- test_bash_stream_multiline,
225
- test_bash_stream_stderr,
226
- test_bash_stream_exit_code,
227
- test_bash_stream_empty,
228
- test_bash_stream_tool_handler,
229
- test_original_bash_still_works,
230
- ]
231
-
232
- passed = 0
233
- failed = 0
234
- for test in tests:
235
- print()
236
- try:
237
- result = await test(sb)
238
- if result:
239
- passed += 1
240
- else:
241
- failed += 1
242
- except Exception as e:
243
- fail(f"{test.__name__}: {e}")
244
- import traceback
245
- traceback.print_exc()
246
- failed += 1
247
-
248
- print()
249
- print("=" * 60)
250
- if failed == 0:
251
- print(f"{GREEN}All {passed} tests passed!{RESET}")
252
- else:
253
- print(f"{RED}{failed} failed{RESET}, {GREEN}{passed} passed{RESET}")
254
- print("=" * 60)
255
-
256
- finally:
257
- if sb and sb._owns_space:
258
- info(f"Cleaning up sandbox: {sb.space_id}")
259
- try:
260
- sb.delete()
261
- ok("Sandbox deleted")
262
- except Exception as e:
263
- fail(f"Cleanup failed: {e}")
264
-
265
-
266
- if __name__ == "__main__":
267
- asyncio.run(main())