akseljoonas HF Staff Claude Opus 4.6 commited on
Commit
f97b6ec
·
1 Parent(s): f729708

feat: stream sandbox bash output to frontend in real-time

Browse files

Add streaming bash endpoint to sandbox server (NDJSON over HTTP),
a bash_stream() generator on the client, and a streaming tool handler
that emits tool_log events per line — same pattern as HF Jobs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

agent/tools/sandbox_client.py CHANGED
@@ -37,11 +37,12 @@ Tools: bash, read, write, edit, upload
37
  from __future__ import annotations
38
 
39
  import io
 
40
  import sys
41
  import time
42
  import uuid
43
  from dataclasses import dataclass, field
44
- from typing import Any, Callable
45
 
46
  import httpx
47
  from huggingface_hub import CommitOperationAdd, HfApi
@@ -97,8 +98,9 @@ CMD ["python", "sandbox_server.py"]
97
 
98
  _SANDBOX_SERVER = '''\
99
  """Minimal FastAPI server for sandbox operations."""
100
- import os, subprocess, pathlib
101
  from fastapi import FastAPI
 
102
  from pydantic import BaseModel
103
  from typing import Optional
104
  import uvicorn
@@ -148,6 +150,41 @@ def bash(req: BashReq):
148
  except Exception as e:
149
  return {"success": False, "output": "", "error": str(e)}
150
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
151
  @app.post("/api/read")
152
  def read(req: ReadReq):
153
  try:
@@ -492,6 +529,58 @@ class Sandbox:
492
  timeout=timeout,
493
  )
494
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
495
  def read(
496
  self, path: str, *, offset: int | None = None, limit: int | None = None
497
  ) -> ToolResult:
 
37
  from __future__ import annotations
38
 
39
  import io
40
+ import json
41
  import sys
42
  import time
43
  import uuid
44
  from dataclasses import dataclass, field
45
+ from typing import Any, Callable, Generator
46
 
47
  import httpx
48
  from huggingface_hub import CommitOperationAdd, HfApi
 
98
 
99
  _SANDBOX_SERVER = '''\
100
  """Minimal FastAPI server for sandbox operations."""
101
+ import os, subprocess, pathlib, asyncio, json
102
  from fastapi import FastAPI
103
+ from fastapi.responses import StreamingResponse
104
  from pydantic import BaseModel
105
  from typing import Optional
106
  import uvicorn
 
150
  except Exception as e:
151
  return {"success": False, "output": "", "error": str(e)}
152
 
153
+ @app.post("/api/bash/stream")
154
+ async def bash_stream(req: BashReq):
155
+ """Stream bash output line-by-line as NDJSON."""
156
+ async def generate():
157
+ try:
158
+ proc = await asyncio.create_subprocess_shell(
159
+ req.command,
160
+ stdout=asyncio.subprocess.PIPE,
161
+ stderr=asyncio.subprocess.STDOUT,
162
+ cwd=req.work_dir,
163
+ )
164
+ total_chars = 0
165
+ try:
166
+ line_bytes = await asyncio.wait_for(proc.stdout.readline(), timeout=req.timeout)
167
+ while line_bytes:
168
+ line = line_bytes.decode(errors="replace")
169
+ total_chars += len(line)
170
+ if total_chars <= 30000:
171
+ yield json.dumps({"type": "output", "data": line}) + "\\n"
172
+ elif total_chars - len(line) <= 30000:
173
+ yield json.dumps({"type": "output", "data": "... (truncated)\\n"}) + "\\n"
174
+ line_bytes = await asyncio.wait_for(proc.stdout.readline(), timeout=req.timeout)
175
+ except asyncio.TimeoutError:
176
+ try:
177
+ proc.kill()
178
+ except ProcessLookupError:
179
+ pass
180
+ yield json.dumps({"type": "error", "data": f"Timeout after {req.timeout}s"}) + "\\n"
181
+ return
182
+ rc = await proc.wait()
183
+ yield json.dumps({"type": "exit", "code": rc}) + "\\n"
184
+ except Exception as e:
185
+ yield json.dumps({"type": "error", "data": str(e)}) + "\\n"
186
+ return StreamingResponse(generate(), media_type="application/x-ndjson")
187
+
188
  @app.post("/api/read")
189
  def read(req: ReadReq):
190
  try:
 
529
  timeout=timeout,
530
  )
531
 
532
+ def bash_stream(
533
+ self,
534
+ command: str,
535
+ *,
536
+ work_dir: str | None = None,
537
+ timeout: int | None = None,
538
+ ) -> Generator[tuple[str, str | int], None, None]:
539
+ """Stream bash output line-by-line from the sandbox.
540
+
541
+ Yields:
542
+ (event_type, data) tuples:
543
+ - ("output", line_text)
544
+ - ("error", error_message)
545
+ - ("exit", return_code)
546
+ """
547
+ effective_timeout = min(timeout or self.timeout, MAX_TIMEOUT)
548
+ payload = {
549
+ "command": command,
550
+ "work_dir": work_dir or self.work_dir,
551
+ "timeout": effective_timeout,
552
+ }
553
+ try:
554
+ with self._client.stream(
555
+ "POST",
556
+ "bash/stream",
557
+ json=payload,
558
+ timeout=httpx.Timeout(effective_timeout + 30, connect=30),
559
+ ) as resp:
560
+ if resp.status_code != 200:
561
+ yield ("error", f"HTTP {resp.status_code}")
562
+ return
563
+ for raw_line in resp.iter_lines():
564
+ if not raw_line:
565
+ continue
566
+ try:
567
+ msg = json.loads(raw_line)
568
+ except json.JSONDecodeError:
569
+ continue
570
+ msg_type = msg.get("type", "")
571
+ if msg_type == "output":
572
+ yield ("output", msg.get("data", ""))
573
+ elif msg_type == "exit":
574
+ yield ("exit", msg.get("code", -1))
575
+ elif msg_type == "error":
576
+ yield ("error", msg.get("data", "unknown error"))
577
+ except httpx.TimeoutException:
578
+ yield ("error", f"Timeout after {effective_timeout}s")
579
+ except httpx.ConnectError:
580
+ yield ("error", f"Cannot connect to sandbox. Is {self.space_id} running?")
581
+ except Exception as e:
582
+ yield ("error", str(e))
583
+
584
  def read(
585
  self, path: str, *, offset: int | None = None, limit: int | None = None
586
  ) -> ToolResult:
agent/tools/sandbox_tool.py CHANGED
@@ -190,8 +190,77 @@ async def sandbox_create_handler(
190
  ), True
191
 
192
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
193
  def _make_tool_handler(sandbox_tool_name: str):
194
- """Factory: create a handler for a sandbox operation tool."""
195
 
196
  async def handler(args: dict[str, Any], session: Any = None) -> tuple[str, bool]:
197
  # Require sandbox to exist — user must approve sandbox_create first
@@ -235,12 +304,17 @@ def get_sandbox_tools():
235
  # Operation tools (auto-execute, no approval needed)
236
  for name in Sandbox.TOOLS.keys():
237
  spec = Sandbox.TOOLS[name]
 
 
 
 
 
238
  tools.append(
239
  ToolSpec(
240
  name=name,
241
  description=spec["description"],
242
  parameters=spec["parameters"],
243
- handler=_make_tool_handler(name),
244
  )
245
  )
246
 
 
190
  ), True
191
 
192
 
193
+ def _make_bash_streaming_handler():
194
+ """Create a bash handler that streams output via tool_log events."""
195
+
196
+ async def handler(args: dict[str, Any], session: Any = None) -> tuple[str, bool]:
197
+ if not session or not getattr(session, "sandbox", None):
198
+ return "No sandbox running. Call sandbox_create first to start one.", False
199
+
200
+ sb = session.sandbox
201
+ queue: asyncio.Queue = asyncio.Queue()
202
+ loop = asyncio.get_running_loop()
203
+
204
+ def producer():
205
+ try:
206
+ for event_type, data in sb.bash_stream(
207
+ args["command"],
208
+ work_dir=args.get("work_dir"),
209
+ timeout=args.get("timeout"),
210
+ ):
211
+ loop.call_soon_threadsafe(queue.put_nowait, (event_type, data))
212
+ loop.call_soon_threadsafe(queue.put_nowait, None)
213
+ except Exception as e:
214
+ loop.call_soon_threadsafe(queue.put_nowait, ("error", str(e)))
215
+ loop.call_soon_threadsafe(queue.put_nowait, None)
216
+
217
+ fut = loop.run_in_executor(None, producer)
218
+
219
+ all_output: list[str] = []
220
+ exit_code = 0
221
+ error_msg = ""
222
+
223
+ while True:
224
+ item = await queue.get()
225
+ if item is None:
226
+ break
227
+
228
+ event_type, data = item
229
+ if event_type == "output":
230
+ all_output.append(data)
231
+ if session:
232
+ await session.send_event(
233
+ Event(
234
+ event_type="tool_log",
235
+ data={"tool": "bash", "log": data.rstrip("\n")},
236
+ )
237
+ )
238
+ elif event_type == "exit":
239
+ exit_code = data
240
+ elif event_type == "error":
241
+ error_msg = data
242
+
243
+ await fut
244
+
245
+ output = "".join(all_output)
246
+ if not output and not error_msg:
247
+ output = "(no output)"
248
+
249
+ if error_msg:
250
+ if output:
251
+ return f"{output}\nERROR: {error_msg}", False
252
+ return f"ERROR: {error_msg}", False
253
+
254
+ if exit_code != 0:
255
+ return f"{output}\nExit code {exit_code}", False
256
+
257
+ return output, True
258
+
259
+ return handler
260
+
261
+
262
  def _make_tool_handler(sandbox_tool_name: str):
263
+ """Factory: create a handler for a sandbox operation tool (non-bash)."""
264
 
265
  async def handler(args: dict[str, Any], session: Any = None) -> tuple[str, bool]:
266
  # Require sandbox to exist — user must approve sandbox_create first
 
304
  # Operation tools (auto-execute, no approval needed)
305
  for name in Sandbox.TOOLS.keys():
306
  spec = Sandbox.TOOLS[name]
307
+ # Bash uses streaming handler; other tools use standard handler
308
+ if name == "bash":
309
+ handler = _make_bash_streaming_handler()
310
+ else:
311
+ handler = _make_tool_handler(name)
312
  tools.append(
313
  ToolSpec(
314
  name=name,
315
  description=spec["description"],
316
  parameters=spec["parameters"],
317
+ handler=handler,
318
  )
319
  )
320
 
tests/integration/tools/test_sandbox_streaming.py ADDED
@@ -0,0 +1,267 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Integration test for sandbox bash streaming.
4
+
5
+ Creates a real sandbox on HF, runs commands, verifies that:
6
+ 1. bash_stream() yields lines incrementally
7
+ 2. The streaming tool handler emits tool_log events
8
+ 3. The final output is collected correctly
9
+ 4. Error/exit codes propagate
10
+
11
+ Requires HF_TOKEN in environment.
12
+ """
13
+ import asyncio
14
+ import os
15
+ import sys
16
+
17
+ sys.path.insert(0, ".")
18
+
19
+ from agent.tools.sandbox_client import Sandbox
20
+
21
+ GREEN = "\033[92m"
22
+ RED = "\033[91m"
23
+ BLUE = "\033[94m"
24
+ YELLOW = "\033[93m"
25
+ RESET = "\033[0m"
26
+
27
+
28
+ def ok(msg):
29
+ print(f"{GREEN} OK{RESET} {msg}")
30
+
31
+
32
+ def fail(msg):
33
+ print(f"{RED}FAIL{RESET} {msg}")
34
+
35
+
36
+ def info(msg):
37
+ print(f"{BLUE}INFO{RESET} {msg}")
38
+
39
+
40
+ async def test_bash_stream_basic(sb: Sandbox):
41
+ """Test that bash_stream yields output lines and an exit event."""
42
+ info("bash_stream: echo test")
43
+ lines = []
44
+ exit_code = None
45
+ for event_type, data in sb.bash_stream("echo hello && echo world"):
46
+ if event_type == "output":
47
+ lines.append(data.rstrip("\n"))
48
+ elif event_type == "exit":
49
+ exit_code = data
50
+
51
+ if lines == ["hello", "world"]:
52
+ ok(f"Got expected lines: {lines}")
53
+ else:
54
+ fail(f"Unexpected lines: {lines}")
55
+ return False
56
+
57
+ if exit_code == 0:
58
+ ok(f"Exit code: {exit_code}")
59
+ else:
60
+ fail(f"Unexpected exit code: {exit_code}")
61
+ return False
62
+
63
+ return True
64
+
65
+
66
+ async def test_bash_stream_multiline(sb: Sandbox):
67
+ """Test streaming a script that prints multiple lines with delays."""
68
+ info("bash_stream: multi-line with sleep")
69
+ lines = []
70
+ for event_type, data in sb.bash_stream(
71
+ 'for i in 1 2 3; do echo "line $i"; sleep 0.2; done'
72
+ ):
73
+ if event_type == "output":
74
+ line = data.rstrip("\n")
75
+ lines.append(line)
76
+ info(f" streamed: {line}")
77
+
78
+ if len(lines) == 3 and lines == ["line 1", "line 2", "line 3"]:
79
+ ok(f"Got all 3 lines incrementally")
80
+ else:
81
+ fail(f"Unexpected lines: {lines}")
82
+ return False
83
+
84
+ return True
85
+
86
+
87
+ async def test_bash_stream_stderr(sb: Sandbox):
88
+ """Test that stderr is also streamed (merged into stdout)."""
89
+ info("bash_stream: stderr output")
90
+ lines = []
91
+ for event_type, data in sb.bash_stream("echo out && echo err >&2"):
92
+ if event_type == "output":
93
+ lines.append(data.rstrip("\n"))
94
+
95
+ if "out" in lines and "err" in lines:
96
+ ok(f"Both stdout and stderr captured: {lines}")
97
+ else:
98
+ fail(f"Missing output: {lines}")
99
+ return False
100
+
101
+ return True
102
+
103
+
104
+ async def test_bash_stream_exit_code(sb: Sandbox):
105
+ """Test that non-zero exit codes are reported."""
106
+ info("bash_stream: non-zero exit code")
107
+ exit_code = None
108
+ for event_type, data in sb.bash_stream("exit 42"):
109
+ if event_type == "exit":
110
+ exit_code = data
111
+
112
+ if exit_code == 42:
113
+ ok(f"Got expected exit code: {exit_code}")
114
+ else:
115
+ fail(f"Unexpected exit code: {exit_code}")
116
+ return False
117
+
118
+ return True
119
+
120
+
121
+ async def test_bash_stream_empty(sb: Sandbox):
122
+ """Test command with no output."""
123
+ info("bash_stream: no output command")
124
+ lines = []
125
+ exit_code = None
126
+ for event_type, data in sb.bash_stream("true"):
127
+ if event_type == "output":
128
+ lines.append(data)
129
+ elif event_type == "exit":
130
+ exit_code = data
131
+
132
+ if exit_code == 0:
133
+ ok(f"Exit code 0, output lines: {len(lines)}")
134
+ else:
135
+ fail(f"Unexpected exit code: {exit_code}")
136
+ return False
137
+
138
+ return True
139
+
140
+
141
+ async def test_bash_stream_tool_handler(sb: Sandbox):
142
+ """Test the full tool handler path with mock session that captures events."""
143
+ info("Tool handler: streaming bash with event capture")
144
+
145
+ from agent.core.session import Event
146
+ from agent.tools.sandbox_tool import _make_bash_streaming_handler
147
+
148
+ # Mock session with event capture
149
+ class MockSession:
150
+ def __init__(self, sandbox):
151
+ self.sandbox = sandbox
152
+ self.event_queue = asyncio.Queue()
153
+ self.events = []
154
+
155
+ async def send_event(self, event: Event):
156
+ self.events.append(event)
157
+ await self.event_queue.put(event)
158
+
159
+ session = MockSession(sb)
160
+ handler = _make_bash_streaming_handler()
161
+
162
+ output, success = await handler(
163
+ {"command": 'for i in a b c; do echo "$i"; sleep 0.1; done'},
164
+ session=session,
165
+ )
166
+
167
+ # Check tool_log events were emitted
168
+ log_events = [e for e in session.events if e.event_type == "tool_log"]
169
+ log_lines = [e.data["log"] for e in log_events]
170
+
171
+ if log_lines == ["a", "b", "c"]:
172
+ ok(f"Tool handler emitted {len(log_events)} tool_log events: {log_lines}")
173
+ else:
174
+ fail(f"Unexpected log events: {log_lines}")
175
+ return False
176
+
177
+ if success and "a" in output and "b" in output and "c" in output:
178
+ ok(f"Handler returned success with full output")
179
+ else:
180
+ fail(f"Handler returned success={success}, output={output!r}")
181
+ return False
182
+
183
+ return True
184
+
185
+
186
+ async def test_original_bash_still_works(sb: Sandbox):
187
+ """Verify the non-streaming bash endpoint still works."""
188
+ info("Original bash endpoint (non-streaming)")
189
+ result = await asyncio.to_thread(sb.bash, "echo legacy_test")
190
+ if result.success and "legacy_test" in result.output:
191
+ ok(f"Original bash works: {result.output.strip()}")
192
+ else:
193
+ fail(f"Original bash failed: {result}")
194
+ return False
195
+
196
+ return True
197
+
198
+
199
+ async def main():
200
+ print("=" * 60)
201
+ print(f"{BLUE}Sandbox Bash Streaming — Integration Tests{RESET}")
202
+ print("=" * 60)
203
+
204
+ token = os.environ.get("HF_TOKEN")
205
+ if not token:
206
+ fail("HF_TOKEN not set")
207
+ sys.exit(1)
208
+
209
+ from huggingface_hub import HfApi
210
+
211
+ api = HfApi(token=token)
212
+ owner = api.whoami().get("name", "")
213
+ info(f"HF user: {owner}")
214
+
215
+ sb = None
216
+ try:
217
+ info("Creating sandbox (this takes ~2-4 minutes)...")
218
+ sb = Sandbox.create(owner=owner, token=token, hardware="cpu-basic")
219
+ ok(f"Sandbox ready: {sb.space_id}")
220
+ print()
221
+
222
+ tests = [
223
+ test_bash_stream_basic,
224
+ test_bash_stream_multiline,
225
+ test_bash_stream_stderr,
226
+ test_bash_stream_exit_code,
227
+ test_bash_stream_empty,
228
+ test_bash_stream_tool_handler,
229
+ test_original_bash_still_works,
230
+ ]
231
+
232
+ passed = 0
233
+ failed = 0
234
+ for test in tests:
235
+ print()
236
+ try:
237
+ result = await test(sb)
238
+ if result:
239
+ passed += 1
240
+ else:
241
+ failed += 1
242
+ except Exception as e:
243
+ fail(f"{test.__name__}: {e}")
244
+ import traceback
245
+ traceback.print_exc()
246
+ failed += 1
247
+
248
+ print()
249
+ print("=" * 60)
250
+ if failed == 0:
251
+ print(f"{GREEN}All {passed} tests passed!{RESET}")
252
+ else:
253
+ print(f"{RED}{failed} failed{RESET}, {GREEN}{passed} passed{RESET}")
254
+ print("=" * 60)
255
+
256
+ finally:
257
+ if sb and sb._owns_space:
258
+ info(f"Cleaning up sandbox: {sb.space_id}")
259
+ try:
260
+ sb.delete()
261
+ ok("Sandbox deleted")
262
+ except Exception as e:
263
+ fail(f"Cleanup failed: {e}")
264
+
265
+
266
+ if __name__ == "__main__":
267
+ asyncio.run(main())