akseljoonas HF Staff commited on
Commit
571b292
·
1 Parent(s): 1de37c8

replace websocket transport with SSE

Browse files
backend/dependencies.py CHANGED
@@ -1,6 +1,5 @@
1
  """Authentication dependencies for FastAPI routes.
2
 
3
- Provides auth validation for both REST and WebSocket endpoints.
4
  - In dev mode (OAUTH_CLIENT_ID not set): auth is bypassed, returns a default "dev" user.
5
  - In production: validates Bearer tokens or cookies against HF OAuth.
6
  """
@@ -11,7 +10,7 @@ import time
11
  from typing import Any
12
 
13
  import httpx
14
- from fastapi import HTTPException, Request, WebSocket, status
15
 
16
  logger = logging.getLogger(__name__)
17
 
@@ -142,31 +141,3 @@ async def get_current_user(request: Request) -> dict[str, Any]:
142
  )
143
 
144
 
145
- async def get_ws_user(websocket: WebSocket) -> dict[str, Any] | None:
146
- """Extract and validate user from WebSocket connection.
147
-
148
- WebSocket doesn't support custom headers from browser, so we check:
149
- 1. ?token= query parameter
150
- 2. hf_access_token cookie (sent automatically for same-origin)
151
-
152
- Returns user dict or None if not authenticated.
153
- In dev mode, returns the default dev user.
154
- """
155
- if not AUTH_ENABLED:
156
- return DEV_USER
157
-
158
- # Try query param
159
- token = websocket.query_params.get("token")
160
- if token:
161
- user = await _extract_user_from_token(token)
162
- if user:
163
- return user
164
-
165
- # Try cookie (works for same-origin WebSocket)
166
- token = websocket.cookies.get("hf_access_token")
167
- if token:
168
- user = await _extract_user_from_token(token)
169
- if user:
170
- return user
171
-
172
- return None
 
1
  """Authentication dependencies for FastAPI routes.
2
 
 
3
  - In dev mode (OAUTH_CLIENT_ID not set): auth is bypassed, returns a default "dev" user.
4
  - In production: validates Bearer tokens or cookies against HF OAuth.
5
  """
 
10
  from typing import Any
11
 
12
  import httpx
13
+ from fastapi import HTTPException, Request, status
14
 
15
  logger = logging.getLogger(__name__)
16
 
 
141
  )
142
 
143
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
backend/routes/agent.py CHANGED
@@ -1,25 +1,22 @@
1
- """Agent API routes - WebSocket and REST endpoints.
2
 
3
  All routes (except /health) require authentication via the get_current_user
4
  dependency. In dev mode (no OAUTH_CLIENT_ID), auth is bypassed automatically.
5
  """
6
 
 
7
  import logging
8
- import os
9
  from typing import Any
10
 
11
- from dependencies import get_current_user, get_ws_user
12
  from fastapi import (
13
  APIRouter,
14
  Depends,
15
  HTTPException,
16
  Request,
17
- WebSocket,
18
- WebSocketDisconnect,
19
  )
 
20
  from litellm import acompletion
21
-
22
- from agent.core.agent_loop import _resolve_hf_router_params
23
  from models import (
24
  ApprovalRequest,
25
  HealthResponse,
@@ -29,7 +26,8 @@ from models import (
29
  SubmitRequest,
30
  )
31
  from session_manager import MAX_SESSIONS, SessionCapacityError, session_manager
32
- from websocket import manager as ws_manager
 
33
 
34
  logger = logging.getLogger(__name__)
35
 
@@ -286,6 +284,84 @@ async def submit_approval(
286
  return {"status": "submitted", "session_id": request.session_id}
287
 
288
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289
  @router.post("/interrupt/{session_id}")
290
  async def interrupt_session(
291
  session_id: str, user: dict = Depends(get_current_user)
@@ -344,77 +420,3 @@ async def shutdown_session(
344
  return {"status": "shutdown_requested", "session_id": session_id}
345
 
346
 
347
- @router.websocket("/ws/{session_id}")
348
- async def websocket_endpoint(websocket: WebSocket, session_id: str) -> None:
349
- """WebSocket endpoint for real-time events.
350
-
351
- Authentication is done via:
352
- - ?token= query parameter (for browsers that can't send WS headers)
353
- - Cookie (automatic for same-origin connections)
354
- - Dev mode bypass (when OAUTH_CLIENT_ID is not set)
355
-
356
- NOTE: We must accept() before close() so the browser receives our custom
357
- close codes (4001, 4003, 4004). If we close() before accept(), Starlette
358
- sends HTTP 403 and the browser only sees code 1006 (abnormal closure).
359
- """
360
- logger.info(f"WebSocket connection request for session {session_id}")
361
-
362
- # Authenticate the WebSocket connection
363
- user = await get_ws_user(websocket)
364
- if not user:
365
- logger.warning(
366
- f"WebSocket rejected: authentication failed for session {session_id}"
367
- )
368
- await websocket.accept()
369
- await websocket.close(code=4001, reason="Authentication required")
370
- return
371
-
372
- # Verify session exists
373
- info = session_manager.get_session_info(session_id)
374
- if not info:
375
- logger.warning(f"WebSocket rejected: session {session_id} not found")
376
- await websocket.accept()
377
- await websocket.close(code=4004, reason="Session not found")
378
- return
379
-
380
- # Verify user owns the session
381
- if not session_manager.verify_session_access(session_id, user["user_id"]):
382
- logger.warning(
383
- f"WebSocket rejected: user {user['user_id']} denied access to session {session_id}"
384
- )
385
- await websocket.accept()
386
- await websocket.close(code=4003, reason="Access denied")
387
- return
388
-
389
- had_buffered = await ws_manager.connect(websocket, session_id)
390
-
391
- # Send "ready" on fresh connections so the frontend knows the session
392
- # is alive. Skip it when buffered events were flushed — those already
393
- # contain the correct state and a ready would incorrectly reset
394
- # isProcessing on the frontend.
395
- if not had_buffered:
396
- try:
397
- await websocket.send_json(
398
- {
399
- "event_type": "ready",
400
- "data": {"message": "Agent initialized"},
401
- }
402
- )
403
- except Exception as e:
404
- logger.error(f"Failed to send ready event for session {session_id}: {e}")
405
-
406
- try:
407
- while True:
408
- # Keep connection alive, handle ping/pong
409
- data = await websocket.receive_json()
410
-
411
- # Handle client messages (e.g., ping)
412
- if data.get("type") == "ping":
413
- await websocket.send_json({"type": "pong"})
414
-
415
- except WebSocketDisconnect:
416
- logger.info(f"WebSocket disconnected for session {session_id}")
417
- except Exception as e:
418
- logger.error(f"WebSocket error for session {session_id}: {e}")
419
- finally:
420
- ws_manager.disconnect(session_id)
 
1
+ """Agent API routes REST + SSE endpoints.
2
 
3
  All routes (except /health) require authentication via the get_current_user
4
  dependency. In dev mode (no OAUTH_CLIENT_ID), auth is bypassed automatically.
5
  """
6
 
7
+ import json
8
  import logging
 
9
  from typing import Any
10
 
11
+ from dependencies import get_current_user
12
  from fastapi import (
13
  APIRouter,
14
  Depends,
15
  HTTPException,
16
  Request,
 
 
17
  )
18
+ from fastapi.responses import StreamingResponse
19
  from litellm import acompletion
 
 
20
  from models import (
21
  ApprovalRequest,
22
  HealthResponse,
 
26
  SubmitRequest,
27
  )
28
  from session_manager import MAX_SESSIONS, SessionCapacityError, session_manager
29
+
30
+ from agent.core.agent_loop import _resolve_hf_router_params
31
 
32
  logger = logging.getLogger(__name__)
33
 
 
284
  return {"status": "submitted", "session_id": request.session_id}
285
 
286
 
287
+ @router.post("/chat/{session_id}")
288
+ async def chat_sse(
289
+ session_id: str,
290
+ request: Request,
291
+ user: dict = Depends(get_current_user),
292
+ ) -> StreamingResponse:
293
+ """SSE endpoint: submit input or approval, then stream events until turn ends."""
294
+ _check_session_access(session_id, user)
295
+
296
+ agent_session = session_manager.sessions.get(session_id)
297
+ if not agent_session or not agent_session.is_active:
298
+ raise HTTPException(status_code=404, detail="Session not found or inactive")
299
+
300
+ # Parse body
301
+ body = await request.json()
302
+
303
+ # Subscribe BEFORE submitting so we never miss events — even if the
304
+ # agent loop processes the submission before this coroutine continues.
305
+ broadcaster = agent_session.broadcaster
306
+ sub_id, event_queue = broadcaster.subscribe()
307
+
308
+ # Submit the operation
309
+ text = body.get("text")
310
+ approvals = body.get("approvals")
311
+
312
+ try:
313
+ if approvals:
314
+ formatted = [
315
+ {
316
+ "tool_call_id": a["tool_call_id"],
317
+ "approved": a["approved"],
318
+ "feedback": a.get("feedback"),
319
+ "edited_script": a.get("edited_script"),
320
+ }
321
+ for a in approvals
322
+ ]
323
+ success = await session_manager.submit_approval(session_id, formatted)
324
+ elif text is not None:
325
+ success = await session_manager.submit_user_input(session_id, text)
326
+ else:
327
+ broadcaster.unsubscribe(sub_id)
328
+ raise HTTPException(status_code=400, detail="Must provide 'text' or 'approvals'")
329
+
330
+ if not success:
331
+ broadcaster.unsubscribe(sub_id)
332
+ raise HTTPException(status_code=404, detail="Session not found or inactive")
333
+ except HTTPException:
334
+ raise
335
+ except Exception:
336
+ broadcaster.unsubscribe(sub_id)
337
+ raise
338
+
339
+ # Terminal events that end the SSE stream
340
+ TERMINAL_EVENTS = {"turn_complete", "approval_required", "error", "interrupted", "shutdown"}
341
+
342
+ async def event_generator():
343
+ try:
344
+ while True:
345
+ msg = await event_queue.get()
346
+ event_type = msg.get("event_type", "")
347
+ # Format as SSE
348
+ yield f"data: {json.dumps(msg)}\n\n"
349
+ if event_type in TERMINAL_EVENTS:
350
+ break
351
+ finally:
352
+ broadcaster.unsubscribe(sub_id)
353
+
354
+ return StreamingResponse(
355
+ event_generator(),
356
+ media_type="text/event-stream",
357
+ headers={
358
+ "Cache-Control": "no-cache",
359
+ "Connection": "keep-alive",
360
+ "X-Accel-Buffering": "no",
361
+ },
362
+ )
363
+
364
+
365
  @router.post("/interrupt/{session_id}")
366
  async def interrupt_session(
367
  session_id: str, user: dict = Depends(get_current_user)
 
420
  return {"status": "shutdown_requested", "session_id": session_id}
421
 
422
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
backend/session_manager.py CHANGED
@@ -8,8 +8,6 @@ from datetime import datetime
8
  from pathlib import Path
9
  from typing import Any, Optional
10
 
11
- from websocket import manager as ws_manager
12
-
13
  from agent.config import load_config
14
  from agent.core.agent_loop import process_submission
15
  from agent.core.session import Event, OpType, Session
@@ -40,6 +38,44 @@ class Submission:
40
  logger = logging.getLogger(__name__)
41
 
42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  @dataclass
44
  class AgentSession:
45
  """Wrapper for an agent session with its associated resources."""
@@ -53,6 +89,7 @@ class AgentSession:
53
  task: asyncio.Task | None = None
54
  created_at: datetime = field(default_factory=datetime.utcnow)
55
  is_active: bool = True
 
56
 
57
 
58
  class SessionCapacityError(Exception):
@@ -126,7 +163,7 @@ class SessionManager:
126
 
127
  # Run blocking constructors in a thread to keep the event loop responsive.
128
  # Without this, Session.__init__ → ContextManager → litellm.get_max_tokens()
129
- # blocks all HTTP/WebSocket handling.
130
  import time as _time
131
 
132
  def _create_session_sync():
@@ -182,7 +219,7 @@ class SessionManager:
182
  event_queue: asyncio.Queue,
183
  tool_router: ToolRouter,
184
  ) -> None:
185
- """Run the agent loop for a session and forward events to WebSocket."""
186
  agent_session = self.sessions.get(session_id)
187
  if not agent_session:
188
  logger.error(f"Session {session_id} not found")
@@ -190,10 +227,10 @@ class SessionManager:
190
 
191
  session = agent_session.session
192
 
193
- # Start event forwarder task
194
- event_forwarder = asyncio.create_task(
195
- self._forward_events(session_id, event_queue)
196
- )
197
 
198
  try:
199
  async with tool_router:
@@ -223,9 +260,9 @@ class SessionManager:
223
  )
224
 
225
  finally:
226
- event_forwarder.cancel()
227
  try:
228
- await event_forwarder
229
  except asyncio.CancelledError:
230
  pass
231
 
@@ -237,19 +274,6 @@ class SessionManager:
237
 
238
  logger.info(f"Session {session_id} ended")
239
 
240
- async def _forward_events(
241
- self, session_id: str, event_queue: asyncio.Queue
242
- ) -> None:
243
- """Forward events from the agent to the WebSocket."""
244
- while True:
245
- try:
246
- event: Event = await event_queue.get()
247
- await ws_manager.send_event(session_id, event.event_type, event.data)
248
- except asyncio.CancelledError:
249
- break
250
- except Exception as e:
251
- logger.error(f"Error forwarding event for {session_id}: {e}")
252
-
253
  async def submit(self, session_id: str, operation: Operation) -> bool:
254
  """Submit an operation to a session."""
255
  async with self._lock:
@@ -320,8 +344,6 @@ class SessionManager:
320
  if not agent_session:
321
  return False
322
 
323
- ws_manager.clear_buffer(session_id)
324
-
325
  # Clean up sandbox Space before cancelling the task
326
  await self._cleanup_sandbox(agent_session.session)
327
 
 
8
  from pathlib import Path
9
  from typing import Any, Optional
10
 
 
 
11
  from agent.config import load_config
12
  from agent.core.agent_loop import process_submission
13
  from agent.core.session import Event, OpType, Session
 
38
  logger = logging.getLogger(__name__)
39
 
40
 
41
+ class EventBroadcaster:
42
+ """Reads from the agent's event queue and fans out to SSE subscribers.
43
+
44
+ Events that arrive when no subscribers are listening are discarded.
45
+ With SSE each turn is a separate request, so there is no reconnect
46
+ scenario that would need buffered replay.
47
+ """
48
+
49
+ def __init__(self, event_queue: asyncio.Queue):
50
+ self._source = event_queue
51
+ self._subscribers: dict[int, asyncio.Queue] = {}
52
+ self._counter = 0
53
+
54
+ def subscribe(self) -> tuple[int, asyncio.Queue]:
55
+ """Create a new subscriber. Returns (id, queue)."""
56
+ self._counter += 1
57
+ sub_id = self._counter
58
+ q: asyncio.Queue = asyncio.Queue()
59
+ self._subscribers[sub_id] = q
60
+ return sub_id, q
61
+
62
+ def unsubscribe(self, sub_id: int) -> None:
63
+ self._subscribers.pop(sub_id, None)
64
+
65
+ async def run(self) -> None:
66
+ """Main loop — reads from source queue and broadcasts."""
67
+ while True:
68
+ try:
69
+ event: Event = await self._source.get()
70
+ msg = {"event_type": event.event_type, "data": event.data}
71
+ for q in self._subscribers.values():
72
+ await q.put(msg)
73
+ except asyncio.CancelledError:
74
+ break
75
+ except Exception as e:
76
+ logger.error(f"EventBroadcaster error: {e}")
77
+
78
+
79
  @dataclass
80
  class AgentSession:
81
  """Wrapper for an agent session with its associated resources."""
 
89
  task: asyncio.Task | None = None
90
  created_at: datetime = field(default_factory=datetime.utcnow)
91
  is_active: bool = True
92
+ broadcaster: Any = None
93
 
94
 
95
  class SessionCapacityError(Exception):
 
163
 
164
  # Run blocking constructors in a thread to keep the event loop responsive.
165
  # Without this, Session.__init__ → ContextManager → litellm.get_max_tokens()
166
+ # blocks all HTTP/SSE handling.
167
  import time as _time
168
 
169
  def _create_session_sync():
 
219
  event_queue: asyncio.Queue,
220
  tool_router: ToolRouter,
221
  ) -> None:
222
+ """Run the agent loop for a session and broadcast events via EventBroadcaster."""
223
  agent_session = self.sessions.get(session_id)
224
  if not agent_session:
225
  logger.error(f"Session {session_id} not found")
 
227
 
228
  session = agent_session.session
229
 
230
+ # Start event broadcaster task
231
+ broadcaster = EventBroadcaster(event_queue)
232
+ agent_session.broadcaster = broadcaster
233
+ broadcast_task = asyncio.create_task(broadcaster.run())
234
 
235
  try:
236
  async with tool_router:
 
260
  )
261
 
262
  finally:
263
+ broadcast_task.cancel()
264
  try:
265
+ await broadcast_task
266
  except asyncio.CancelledError:
267
  pass
268
 
 
274
 
275
  logger.info(f"Session {session_id} ended")
276
 
 
 
 
 
 
 
 
 
 
 
 
 
 
277
  async def submit(self, session_id: str, operation: Operation) -> bool:
278
  """Submit an operation to a session."""
279
  async with self._lock:
 
344
  if not agent_session:
345
  return False
346
 
 
 
347
  # Clean up sandbox Space before cancelling the task
348
  await self._cleanup_sandbox(agent_session.session)
349
 
backend/websocket.py DELETED
@@ -1,105 +0,0 @@
1
- """WebSocket connection manager for real-time communication."""
2
-
3
- import logging
4
- from collections import deque
5
- from typing import Any
6
-
7
- from fastapi import WebSocket
8
-
9
- logger = logging.getLogger(__name__)
10
-
11
- # Max events buffered per session while WS is disconnected.
12
- MAX_EVENT_BUFFER = 500
13
-
14
-
15
- class ConnectionManager:
16
- """Manages WebSocket connections for multiple sessions."""
17
-
18
- def __init__(self) -> None:
19
- # session_id -> WebSocket
20
- self.active_connections: dict[str, WebSocket] = {}
21
- # session_id -> events buffered while WS was disconnected
22
- self._event_buffers: dict[str, deque[dict[str, Any]]] = {}
23
-
24
- async def connect(self, websocket: WebSocket, session_id: str) -> bool:
25
- """Accept a WebSocket connection, register it, and flush buffered events.
26
-
27
- Returns True if buffered events were flushed (i.e. this is a reconnect
28
- that had missed events).
29
- """
30
- logger.info(f"Attempting to accept WebSocket for session {session_id}")
31
- await websocket.accept()
32
- self.active_connections[session_id] = websocket
33
-
34
- # Flush events that were buffered while the WS was disconnected
35
- buffered = self._event_buffers.pop(session_id, None)
36
- if buffered:
37
- logger.info(
38
- f"Flushing {len(buffered)} buffered events for session {session_id}"
39
- )
40
- for message in buffered:
41
- try:
42
- await websocket.send_json(message)
43
- except Exception:
44
- logger.error(
45
- f"Error flushing buffered event for session {session_id}"
46
- )
47
- break
48
-
49
- logger.info(f"WebSocket connected and registered for session {session_id}")
50
- return bool(buffered)
51
-
52
- def disconnect(self, session_id: str) -> None:
53
- """Remove a WebSocket connection."""
54
- if session_id in self.active_connections:
55
- del self.active_connections[session_id]
56
- logger.info(f"WebSocket disconnected for session {session_id}")
57
-
58
- def clear_buffer(self, session_id: str) -> None:
59
- """Clear the event buffer for a session (e.g. on session delete)."""
60
- self._event_buffers.pop(session_id, None)
61
-
62
- async def send_event(
63
- self, session_id: str, event_type: str, data: dict[str, Any] | None = None
64
- ) -> None:
65
- """Send an event to a specific session's WebSocket.
66
-
67
- If no WebSocket is connected, the event is buffered so it can be
68
- replayed when the client reconnects.
69
- """
70
- message: dict[str, Any] = {"event_type": event_type}
71
- if data is not None:
72
- message["data"] = data
73
-
74
- if session_id not in self.active_connections:
75
- buf = self._event_buffers.setdefault(
76
- session_id, deque(maxlen=MAX_EVENT_BUFFER)
77
- )
78
- buf.append(message)
79
- return
80
-
81
- try:
82
- await self.active_connections[session_id].send_json(message)
83
- except Exception as e:
84
- logger.error(f"Error sending to session {session_id}: {e}")
85
- self.disconnect(session_id)
86
- # Buffer the event that failed to send
87
- buf = self._event_buffers.setdefault(
88
- session_id, deque(maxlen=MAX_EVENT_BUFFER)
89
- )
90
- buf.append(message)
91
-
92
- async def broadcast(
93
- self, event_type: str, data: dict[str, Any] | None = None
94
- ) -> None:
95
- """Broadcast an event to all connected sessions."""
96
- for session_id in list(self.active_connections.keys()):
97
- await self.send_event(session_id, event_type, data)
98
-
99
- def is_connected(self, session_id: str) -> bool:
100
- """Check if a session has an active WebSocket connection."""
101
- return session_id in self.active_connections
102
-
103
-
104
- # Global connection manager instance
105
- manager = ConnectionManager()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
frontend/src/components/SessionChat.tsx CHANGED
@@ -2,8 +2,8 @@
2
  * Per-session chat component.
3
  *
4
  * Each session renders its own SessionChat. The hook (useAgentChat) always
5
- * runs — keeping the WebSocket alive and processing events — but only the
6
- * active session renders visible UI (MessageList + ChatInput).
7
  */
8
  import { useCallback, useEffect, useRef } from 'react';
9
  import { useAgentChat } from '@/hooks/useAgentChat';
@@ -25,7 +25,7 @@ export default function SessionChat({ sessionId, isActive, onSessionDead }: Sess
25
  const { isConnected, isProcessing, setProcessing, activityStatus } = useAgentStore();
26
  const { updateSessionTitle } = useSessionStore();
27
 
28
- const { messages, sendMessage, stop, undoLastTurn, approveTools, transport } = useAgentChat({
29
  sessionId,
30
  isActive,
31
  onReady: () => logger.log(`Session ${sessionId} ready`),
@@ -38,16 +38,10 @@ export default function SessionChat({ sessionId, isActive, onSessionDead }: Sess
38
  const prevActiveRef = useRef(isActive);
39
  useEffect(() => {
40
  if (isActive && !prevActiveRef.current) {
41
- // Force reconnect if WS is dead (e.g. retries exhausted while on another session)
42
- if (transport && !transport.isWebSocketConnected()) {
43
- transport.connectToSession(sessionId);
44
- }
45
-
46
  const store = useAgentStore.getState();
47
 
48
- // Sync WebSocket connection state
49
- const wsConnected = transport?.isWebSocketConnected() ?? false;
50
- store.setConnected(wsConnected);
51
 
52
  // Check if this session has pending approvals in its messages
53
  const lastAssistant = [...messages].reverse().find(m => m.role === 'assistant');
@@ -109,7 +103,7 @@ export default function SessionChat({ sessionId, isActive, onSessionDead }: Sess
109
  }
110
  }
111
  prevActiveRef.current = isActive;
112
- }, [isActive, messages, transport]);
113
 
114
  const handleSendMessage = useCallback(
115
  async (text: string) => {
 
2
  * Per-session chat component.
3
  *
4
  * Each session renders its own SessionChat. The hook (useAgentChat) always
5
+ * runs — processing events — but only the active session renders visible
6
+ * UI (MessageList + ChatInput).
7
  */
8
  import { useCallback, useEffect, useRef } from 'react';
9
  import { useAgentChat } from '@/hooks/useAgentChat';
 
25
  const { isConnected, isProcessing, setProcessing, activityStatus } = useAgentStore();
26
  const { updateSessionTitle } = useSessionStore();
27
 
28
+ const { messages, sendMessage, stop, undoLastTurn, approveTools } = useAgentChat({
29
  sessionId,
30
  isActive,
31
  onReady: () => logger.log(`Session ${sessionId} ready`),
 
38
  const prevActiveRef = useRef(isActive);
39
  useEffect(() => {
40
  if (isActive && !prevActiveRef.current) {
 
 
 
 
 
41
  const store = useAgentStore.getState();
42
 
43
+ // SSE transport has no persistent connection — always connected
44
+ store.setConnected(true);
 
45
 
46
  // Check if this session has pending approvals in its messages
47
  const lastAssistant = [...messages].reverse().find(m => m.role === 'assistant');
 
103
  }
104
  }
105
  prevActiveRef.current = isActive;
106
+ }, [isActive, messages]);
107
 
108
  const handleSendMessage = useCallback(
109
  async (text: string) => {
frontend/src/hooks/useAgentChat.ts CHANGED
@@ -1,6 +1,6 @@
1
  /**
2
- * Central hook wiring the Vercel AI SDK's useChat with our custom
3
- * WebSocketChatTransport.
4
  *
5
  * In the per-session architecture, each session mounts its own instance
6
  * of this hook. The `isActive` flag controls whether side-channel
@@ -9,8 +9,8 @@
9
  */
10
  import { useCallback, useEffect, useMemo, useRef } from 'react';
11
  import { useChat } from '@ai-sdk/react';
12
- import type { UIMessage } from 'ai';
13
- import { WebSocketChatTransport, type SideChannelCallbacks } from '@/lib/ws-chat-transport';
14
  import { loadMessages, saveMessages } from '@/lib/chat-message-store';
15
  import { llmMessagesToUIMessages } from '@/lib/convert-llm-messages';
16
  import { apiFetch } from '@/utils/api';
@@ -47,7 +47,6 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
47
  const { setSessionActive, setNeedsAttention } = useSessionStore();
48
 
49
  // -- Build side-channel callbacks (stable ref) --------------------------
50
- // These check isActiveRef to decide whether to update global UI state.
51
  const sideChannel = useMemo<SideChannelCallbacks>(
52
  () => ({
53
  onReady: () => {
@@ -83,19 +82,9 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
83
  }
84
  },
85
  onUndoComplete: () => {
 
 
86
  if (isActiveRef.current) setProcessing(false);
87
- // Remove the last turn (user msg + assistant response) from useChat state
88
- const setMsgs = chatActionsRef.current.setMessages;
89
- const msgs = chatActionsRef.current.messages;
90
- if (setMsgs && msgs.length > 0) {
91
- let lastUserIdx = -1;
92
- for (let i = msgs.length - 1; i >= 0; i--) {
93
- if (msgs[i].role === 'user') { lastUserIdx = i; break; }
94
- }
95
- const updated = lastUserIdx > 0 ? msgs.slice(0, lastUserIdx) : [];
96
- setMsgs(updated);
97
- saveMessages(sessionId, updated);
98
- }
99
  },
100
  onCompacted: (oldTokens: number, newTokens: number) => {
101
  logger.log(`Context compacted: ${oldTokens} -> ${newTokens} tokens`);
@@ -132,11 +121,7 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
132
  },
133
  onApprovalRequired: (tools) => {
134
  if (!tools.length) return;
135
-
136
- // Always mark the session as needing attention
137
  setNeedsAttention(sessionId, true);
138
-
139
- // Only update global UI if this is the active session
140
  if (!isActiveRef.current) return;
141
 
142
  setActivityStatus({ type: 'waiting-approval' });
@@ -199,30 +184,21 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
199
  if (isActiveRef.current) setActivityStatus({ type: 'tool', toolName, description });
200
  },
201
  }),
202
- // sessionId is the only real dependency — Zustand setters are stable
203
  // eslint-disable-next-line react-hooks/exhaustive-deps
204
  [sessionId],
205
  );
206
 
207
  // -- Create transport (one per session, stable for lifetime) ------------
208
- const transportRef = useRef<WebSocketChatTransport | null>(null);
209
  if (!transportRef.current) {
210
- transportRef.current = new WebSocketChatTransport({ sideChannel });
211
  }
212
 
213
- // Keep side-channel callbacks in sync (they capture isActiveRef)
214
  useEffect(() => {
215
  transportRef.current?.updateSideChannel(sideChannel);
216
  }, [sideChannel]);
217
 
218
- // Connect WebSocket on mount, disconnect on unmount
219
- useEffect(() => {
220
- transportRef.current?.connectToSession(sessionId);
221
- return () => {
222
- transportRef.current?.connectToSession(null);
223
- };
224
- }, [sessionId]);
225
-
226
  // Destroy transport on unmount
227
  useEffect(() => {
228
  return () => {
@@ -249,6 +225,10 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
249
  messages: initialMessages,
250
  transport: transportRef.current!,
251
  experimental_throttle: 80,
 
 
 
 
252
  onError: (error) => {
253
  logger.error('useChat error:', error);
254
  if (isActiveRef.current) {
@@ -267,15 +247,12 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
267
  let cancelled = false;
268
  (async () => {
269
  try {
270
- // Fetch messages and session info (for pending approval) in parallel
271
  const [msgsRes, infoRes] = await Promise.all([
272
  apiFetch(`/api/session/${sessionId}/messages`),
273
  apiFetch(`/api/session/${sessionId}`),
274
  ]);
275
-
276
  if (cancelled) return;
277
 
278
- // Extract pending approval tool IDs from session info
279
  let pendingIds: Set<string> | undefined;
280
  if (infoRes.ok) {
281
  const info = await infoRes.json();
@@ -315,25 +292,46 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
315
  }
316
  }, [sessionId, chat.messages]);
317
 
318
- // -- Undo last turn -----------------------------------------------------
 
 
 
319
  const undoLastTurn = useCallback(async () => {
320
  try {
321
  const res = await apiFetch(`/api/undo/${sessionId}`, { method: 'POST' });
322
  if (!res.ok) {
323
  logger.error('Undo API returned', res.status);
 
 
 
 
 
 
 
 
 
 
 
 
 
324
  }
 
325
  } catch (e) {
326
  logger.error('Undo failed:', e);
327
  }
328
- }, [sessionId]);
329
 
330
- // -- Approve tools via transport ----------------------------------------
331
  const approveTools = useCallback(
332
  async (approvals: Array<{ tool_call_id: string; approved: boolean; feedback?: string | null; edited_script?: string | null }>) => {
333
- if (!transportRef.current) return false;
 
 
 
 
 
334
 
335
- // Transition SDK tool state from approval-requested approval-responded
336
- // so the approval UI disappears immediately (survives session switches).
337
  for (const a of approvals) {
338
  chat.addToolApprovalResponse({
339
  id: `approval-${a.tool_call_id}`,
@@ -342,25 +340,26 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
342
  });
343
  }
344
 
345
- const ok = await transportRef.current.approveTools(sessionId, approvals);
346
- if (ok) {
347
- // Clear needsAttention since user has responded
348
- setNeedsAttention(sessionId, false);
349
- const hasApproved = approvals.some(a => a.approved);
350
- if (hasApproved && isActiveRef.current) setProcessing(true);
351
- }
352
- return ok;
353
  },
354
  [sessionId, chat, setProcessing, setNeedsAttention],
355
  );
356
 
 
 
 
 
 
 
357
  return {
358
  messages: chat.messages,
359
  sendMessage: chat.sendMessage,
360
- stop: chat.stop,
361
  status: chat.status,
362
  undoLastTurn,
363
  approveTools,
364
- transport: transportRef.current,
365
  };
366
  }
 
1
  /**
2
+ * Central hook wiring the Vercel AI SDK's useChat with our SSE-based
3
+ * ChatTransport.
4
  *
5
  * In the per-session architecture, each session mounts its own instance
6
  * of this hook. The `isActive` flag controls whether side-channel
 
9
  */
10
  import { useCallback, useEffect, useMemo, useRef } from 'react';
11
  import { useChat } from '@ai-sdk/react';
12
+ import { type UIMessage, lastAssistantMessageIsCompleteWithApprovalResponses } from 'ai';
13
+ import { SSEChatTransport, type SideChannelCallbacks } from '@/lib/sse-chat-transport';
14
  import { loadMessages, saveMessages } from '@/lib/chat-message-store';
15
  import { llmMessagesToUIMessages } from '@/lib/convert-llm-messages';
16
  import { apiFetch } from '@/utils/api';
 
47
  const { setSessionActive, setNeedsAttention } = useSessionStore();
48
 
49
  // -- Build side-channel callbacks (stable ref) --------------------------
 
50
  const sideChannel = useMemo<SideChannelCallbacks>(
51
  () => ({
52
  onReady: () => {
 
82
  }
83
  },
84
  onUndoComplete: () => {
85
+ // Undo is handled client-side in undoLastTurn(). With SSE, undo_complete
86
+ // events are discarded (no subscriber listening between turns).
87
  if (isActiveRef.current) setProcessing(false);
 
 
 
 
 
 
 
 
 
 
 
 
88
  },
89
  onCompacted: (oldTokens: number, newTokens: number) => {
90
  logger.log(`Context compacted: ${oldTokens} -> ${newTokens} tokens`);
 
121
  },
122
  onApprovalRequired: (tools) => {
123
  if (!tools.length) return;
 
 
124
  setNeedsAttention(sessionId, true);
 
 
125
  if (!isActiveRef.current) return;
126
 
127
  setActivityStatus({ type: 'waiting-approval' });
 
184
  if (isActiveRef.current) setActivityStatus({ type: 'tool', toolName, description });
185
  },
186
  }),
 
187
  // eslint-disable-next-line react-hooks/exhaustive-deps
188
  [sessionId],
189
  );
190
 
191
  // -- Create transport (one per session, stable for lifetime) ------------
192
+ const transportRef = useRef<SSEChatTransport | null>(null);
193
  if (!transportRef.current) {
194
+ transportRef.current = new SSEChatTransport(sessionId, sideChannel);
195
  }
196
 
197
+ // Keep side-channel callbacks in sync
198
  useEffect(() => {
199
  transportRef.current?.updateSideChannel(sideChannel);
200
  }, [sideChannel]);
201
 
 
 
 
 
 
 
 
 
202
  // Destroy transport on unmount
203
  useEffect(() => {
204
  return () => {
 
225
  messages: initialMessages,
226
  transport: transportRef.current!,
227
  experimental_throttle: 80,
228
+ // After all approval responses are set, auto-send to continue the agent loop.
229
+ // Without this, addToolApprovalResponse only updates the UI — it won't trigger
230
+ // sendMessages on the transport.
231
+ sendAutomaticallyWhen: lastAssistantMessageIsCompleteWithApprovalResponses,
232
  onError: (error) => {
233
  logger.error('useChat error:', error);
234
  if (isActiveRef.current) {
 
247
  let cancelled = false;
248
  (async () => {
249
  try {
 
250
  const [msgsRes, infoRes] = await Promise.all([
251
  apiFetch(`/api/session/${sessionId}/messages`),
252
  apiFetch(`/api/session/${sessionId}`),
253
  ]);
 
254
  if (cancelled) return;
255
 
 
256
  let pendingIds: Set<string> | undefined;
257
  if (infoRes.ok) {
258
  const info = await infoRes.json();
 
292
  }
293
  }, [sessionId, chat.messages]);
294
 
295
+ // -- Undo last turn (REST call + client-side message removal) -----------
296
+ // With SSE there's no persistent connection to receive the undo_complete
297
+ // event, so we handle message removal on the frontend after a successful
298
+ // REST call to the backend.
299
  const undoLastTurn = useCallback(async () => {
300
  try {
301
  const res = await apiFetch(`/api/undo/${sessionId}`, { method: 'POST' });
302
  if (!res.ok) {
303
  logger.error('Undo API returned', res.status);
304
+ return;
305
+ }
306
+ // Remove the last user turn + assistant response from the UI
307
+ const msgs = chatActionsRef.current.messages;
308
+ const setMsgs = chatActionsRef.current.setMessages;
309
+ if (setMsgs && msgs.length > 0) {
310
+ let lastUserIdx = -1;
311
+ for (let i = msgs.length - 1; i >= 0; i--) {
312
+ if (msgs[i].role === 'user') { lastUserIdx = i; break; }
313
+ }
314
+ const updated = lastUserIdx > 0 ? msgs.slice(0, lastUserIdx) : [];
315
+ setMsgs(updated);
316
+ saveMessages(sessionId, updated);
317
  }
318
+ if (isActiveRef.current) setProcessing(false);
319
  } catch (e) {
320
  logger.error('Undo failed:', e);
321
  }
322
+ }, [sessionId, setProcessing]);
323
 
324
+ // -- Approve tools ------------------------------------------------------
325
  const approveTools = useCallback(
326
  async (approvals: Array<{ tool_call_id: string; approved: boolean; feedback?: string | null; edited_script?: string | null }>) => {
327
+ // Store edited scripts so the transport can read them when sendMessages is called
328
+ for (const a of approvals) {
329
+ if (a.edited_script) {
330
+ useAgentStore.getState().setEditedScript(a.tool_call_id, a.edited_script);
331
+ }
332
+ }
333
 
334
+ // Update SDK tool state this triggers sendMessages() via the transport
 
335
  for (const a of approvals) {
336
  chat.addToolApprovalResponse({
337
  id: `approval-${a.tool_call_id}`,
 
340
  });
341
  }
342
 
343
+ setNeedsAttention(sessionId, false);
344
+ const hasApproved = approvals.some(a => a.approved);
345
+ if (hasApproved && isActiveRef.current) setProcessing(true);
346
+ return true;
 
 
 
 
347
  },
348
  [sessionId, chat, setProcessing, setNeedsAttention],
349
  );
350
 
351
+ // -- Stop (abort SSE stream + interrupt backend agent loop) ---------------
352
+ const stop = useCallback(() => {
353
+ chat.stop();
354
+ apiFetch(`/api/interrupt/${sessionId}`, { method: 'POST' }).catch(() => {});
355
+ }, [sessionId, chat]);
356
+
357
  return {
358
  messages: chat.messages,
359
  sendMessage: chat.sendMessage,
360
+ stop,
361
  status: chat.status,
362
  undoLastTurn,
363
  approveTools,
 
364
  };
365
  }
frontend/src/lib/sse-chat-transport.ts ADDED
@@ -0,0 +1,361 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ /**
2
+ * SSE-based ChatTransport that bridges our backend event protocol
3
+ * to the Vercel AI SDK's UIMessageChunk streaming interface.
4
+ *
5
+ * Each sendMessages() call does a POST → SSE response.
6
+ * One request per turn phase (initial message, or approval continuation).
7
+ */
8
+ import type { ChatTransport, UIMessage, UIMessageChunk, ChatRequestOptions } from 'ai';
9
+ import { apiFetch } from '@/utils/api';
10
+ import { logger } from '@/utils/logger';
11
+ import type { AgentEvent } from '@/types/events';
12
+ import { useAgentStore } from '@/store/agentStore';
13
+
14
+ // ---------------------------------------------------------------------------
15
+ // Side-channel callback interface (non-chat events forwarded to the store)
16
+ // ---------------------------------------------------------------------------
17
+ export interface SideChannelCallbacks {
18
+ onReady: () => void;
19
+ onShutdown: () => void;
20
+ onError: (error: string) => void;
21
+ onProcessing: () => void;
22
+ onProcessingDone: () => void;
23
+ onUndoComplete: () => void;
24
+ onCompacted: (oldTokens: number, newTokens: number) => void;
25
+ onPlanUpdate: (plan: Array<{ id: string; content: string; status: string }>) => void;
26
+ onToolLog: (tool: string, log: string) => void;
27
+ onConnectionChange: (connected: boolean) => void;
28
+ onSessionDead: (sessionId: string) => void;
29
+ onApprovalRequired: (tools: Array<{ tool: string; arguments: Record<string, unknown>; tool_call_id: string }>) => void;
30
+ onToolCallPanel: (tool: string, args: Record<string, unknown>) => void;
31
+ onToolOutputPanel: (tool: string, toolCallId: string, output: string, success: boolean) => void;
32
+ onStreaming: () => void;
33
+ onToolRunning: (toolName: string, description?: string) => void;
34
+ }
35
+
36
+ // ---------------------------------------------------------------------------
37
+ // Helpers
38
+ // ---------------------------------------------------------------------------
39
+ let partIdCounter = 0;
40
+ function nextPartId(prefix: string): string {
41
+ return `${prefix}-${Date.now()}-${++partIdCounter}`;
42
+ }
43
+
44
+ /** Parse an SSE text stream into AgentEvent objects. */
45
+ function createSSEParserStream(): TransformStream<string, AgentEvent> {
46
+ let buffer = '';
47
+ return new TransformStream<string, AgentEvent>({
48
+ transform(chunk, controller) {
49
+ buffer += chunk;
50
+ const lines = buffer.split('\n');
51
+ // Keep the last (possibly incomplete) line in the buffer
52
+ buffer = lines.pop() || '';
53
+ for (const line of lines) {
54
+ const trimmed = line.trim();
55
+ if (trimmed.startsWith('data: ')) {
56
+ try {
57
+ const json = JSON.parse(trimmed.slice(6));
58
+ controller.enqueue(json as AgentEvent);
59
+ } catch {
60
+ logger.warn('SSE parse error:', trimmed);
61
+ }
62
+ }
63
+ }
64
+ },
65
+ flush(controller) {
66
+ // Process any remaining data in buffer
67
+ if (buffer.trim().startsWith('data: ')) {
68
+ try {
69
+ const json = JSON.parse(buffer.trim().slice(6));
70
+ controller.enqueue(json as AgentEvent);
71
+ } catch { /* ignore incomplete */ }
72
+ }
73
+ },
74
+ });
75
+ }
76
+
77
+ /** Transform AgentEvent objects into UIMessageChunk objects for the Vercel AI SDK. */
78
+ function createEventToChunkStream(sideChannel: SideChannelCallbacks): TransformStream<AgentEvent, UIMessageChunk> {
79
+ let textPartId: string | null = null;
80
+
81
+ function endTextPart(controller: TransformStreamDefaultController<UIMessageChunk>) {
82
+ if (textPartId) {
83
+ controller.enqueue({ type: 'text-end', id: textPartId });
84
+ textPartId = null;
85
+ }
86
+ }
87
+
88
+ return new TransformStream<AgentEvent, UIMessageChunk>({
89
+ transform(event, controller) {
90
+ switch (event.event_type) {
91
+ // -- Side-channel only events ----------------------------------------
92
+ case 'ready':
93
+ sideChannel.onReady();
94
+ break;
95
+
96
+ case 'shutdown':
97
+ sideChannel.onShutdown();
98
+ break;
99
+
100
+ case 'interrupted':
101
+ endTextPart(controller);
102
+ controller.enqueue({ type: 'finish-step' });
103
+ controller.enqueue({ type: 'finish', finishReason: 'stop' });
104
+ sideChannel.onProcessingDone();
105
+ break;
106
+
107
+ case 'undo_complete':
108
+ endTextPart(controller);
109
+ sideChannel.onUndoComplete();
110
+ break;
111
+
112
+ case 'compacted':
113
+ sideChannel.onCompacted(
114
+ (event.data?.old_tokens as number) || 0,
115
+ (event.data?.new_tokens as number) || 0,
116
+ );
117
+ break;
118
+
119
+ case 'plan_update':
120
+ sideChannel.onPlanUpdate(
121
+ (event.data?.plan as Array<{ id: string; content: string; status: string }>) || [],
122
+ );
123
+ break;
124
+
125
+ case 'tool_log':
126
+ sideChannel.onToolLog(
127
+ (event.data?.tool as string) || '',
128
+ (event.data?.log as string) || '',
129
+ );
130
+ break;
131
+
132
+ // -- Chat stream events ----------------------------------------------
133
+ case 'processing':
134
+ sideChannel.onProcessing();
135
+ controller.enqueue({ type: 'start', messageMetadata: { createdAt: new Date().toISOString() } });
136
+ controller.enqueue({ type: 'start-step' });
137
+ break;
138
+
139
+ case 'assistant_chunk': {
140
+ const delta = (event.data?.content as string) || '';
141
+ if (!delta) break;
142
+ if (!textPartId) {
143
+ textPartId = nextPartId('text');
144
+ controller.enqueue({ type: 'text-start', id: textPartId });
145
+ sideChannel.onStreaming();
146
+ }
147
+ controller.enqueue({ type: 'text-delta', id: textPartId, delta });
148
+ break;
149
+ }
150
+
151
+ case 'assistant_stream_end':
152
+ endTextPart(controller);
153
+ break;
154
+
155
+ case 'assistant_message': {
156
+ const content = (event.data?.content as string) || '';
157
+ if (!content) break;
158
+ const id = nextPartId('text');
159
+ controller.enqueue({ type: 'text-start', id });
160
+ controller.enqueue({ type: 'text-delta', id, delta: content });
161
+ controller.enqueue({ type: 'text-end', id });
162
+ break;
163
+ }
164
+
165
+ case 'tool_call': {
166
+ const toolName = (event.data?.tool as string) || 'unknown';
167
+ const toolCallId = (event.data?.tool_call_id as string) || '';
168
+ const args = (event.data?.arguments as Record<string, unknown>) || {};
169
+ if (toolName === 'plan_tool') break;
170
+
171
+ endTextPart(controller);
172
+ controller.enqueue({ type: 'tool-input-start', toolCallId, toolName, dynamic: true });
173
+ controller.enqueue({ type: 'tool-input-available', toolCallId, toolName, input: args, dynamic: true });
174
+
175
+ sideChannel.onToolRunning(toolName, (args as Record<string, unknown>)?.description as string | undefined);
176
+ sideChannel.onToolCallPanel(toolName, args as Record<string, unknown>);
177
+ break;
178
+ }
179
+
180
+ case 'tool_output': {
181
+ const toolCallId = (event.data?.tool_call_id as string) || '';
182
+ const output = (event.data?.output as string) || '';
183
+ const success = event.data?.success as boolean;
184
+ const toolName = (event.data?.tool as string) || '';
185
+ if (toolName === 'plan_tool' || toolCallId.startsWith('plan_tool')) break;
186
+
187
+ if (success) {
188
+ controller.enqueue({ type: 'tool-output-available', toolCallId, output, dynamic: true });
189
+ } else {
190
+ controller.enqueue({ type: 'tool-output-error', toolCallId, errorText: output, dynamic: true });
191
+ }
192
+ sideChannel.onToolOutputPanel(toolName, toolCallId, output, success);
193
+ break;
194
+ }
195
+
196
+ case 'approval_required': {
197
+ const tools = event.data?.tools as Array<{
198
+ tool: string;
199
+ arguments: Record<string, unknown>;
200
+ tool_call_id: string;
201
+ }>;
202
+ if (!tools) break;
203
+
204
+ endTextPart(controller);
205
+ for (const t of tools) {
206
+ controller.enqueue({ type: 'tool-input-start', toolCallId: t.tool_call_id, toolName: t.tool, dynamic: true });
207
+ controller.enqueue({ type: 'tool-input-available', toolCallId: t.tool_call_id, toolName: t.tool, input: t.arguments, dynamic: true });
208
+ controller.enqueue({ type: 'tool-approval-request', approvalId: `approval-${t.tool_call_id}`, toolCallId: t.tool_call_id });
209
+ }
210
+ sideChannel.onApprovalRequired(tools);
211
+ // DON'T emit finish here — the stream will close naturally and the SDK
212
+ // will see there's a pending approval. The SDK calls sendMessages again
213
+ // after addToolApprovalResponse.
214
+ break;
215
+ }
216
+
217
+ case 'tool_state_change': {
218
+ const tcId = (event.data?.tool_call_id as string) || '';
219
+ const state = (event.data?.state as string) || '';
220
+ const toolName = (event.data?.tool as string) || '';
221
+ const jobUrl = (event.data?.jobUrl as string) || undefined;
222
+
223
+ if (tcId.startsWith('plan_tool')) break;
224
+
225
+ if (jobUrl && tcId) {
226
+ useAgentStore.getState().setJobUrl(tcId, jobUrl);
227
+ }
228
+ if (state === 'running' && toolName) {
229
+ sideChannel.onToolRunning(toolName);
230
+ }
231
+ if (state === 'rejected' || state === 'abandoned') {
232
+ controller.enqueue({ type: 'tool-output-denied', toolCallId: tcId });
233
+ }
234
+ break;
235
+ }
236
+
237
+ case 'turn_complete':
238
+ endTextPart(controller);
239
+ controller.enqueue({ type: 'finish-step' });
240
+ controller.enqueue({ type: 'finish', finishReason: 'stop' });
241
+ sideChannel.onProcessingDone();
242
+ break;
243
+
244
+ case 'error': {
245
+ const errorMsg = (event.data?.error as string) || 'Unknown error';
246
+ sideChannel.onError(errorMsg);
247
+ controller.enqueue({ type: 'error', errorText: errorMsg });
248
+ sideChannel.onProcessingDone();
249
+ break;
250
+ }
251
+
252
+ default:
253
+ logger.log('SSE transport: unknown event', event);
254
+ }
255
+ },
256
+ });
257
+ }
258
+
259
+ // ---------------------------------------------------------------------------
260
+ // Transport implementation
261
+ // ---------------------------------------------------------------------------
262
+ export class SSEChatTransport implements ChatTransport<UIMessage> {
263
+ private sessionId: string;
264
+ private sideChannel: SideChannelCallbacks;
265
+
266
+ constructor(sessionId: string, sideChannel: SideChannelCallbacks) {
267
+ this.sessionId = sessionId;
268
+ this.sideChannel = sideChannel;
269
+ // Mark as connected immediately — no persistent connection to establish
270
+ sideChannel.onConnectionChange(true);
271
+ }
272
+
273
+ updateSideChannel(sideChannel: SideChannelCallbacks): void {
274
+ this.sideChannel = sideChannel;
275
+ }
276
+
277
+ destroy(): void {
278
+ // Nothing to clean up — no persistent connections
279
+ }
280
+
281
+ // -- ChatTransport interface ---------------------------------------------
282
+
283
+ async sendMessages(
284
+ options: {
285
+ trigger: 'submit-message' | 'regenerate-message';
286
+ chatId: string;
287
+ messageId: string | undefined;
288
+ messages: UIMessage[];
289
+ abortSignal: AbortSignal | undefined;
290
+ } & ChatRequestOptions,
291
+ ): Promise<ReadableStream<UIMessageChunk>> {
292
+ const sessionId = this.sessionId;
293
+
294
+ // Detect: is this an approval continuation or a new user message?
295
+ // After addToolApprovalResponse, the SDK calls sendMessages again.
296
+ // The last assistant message will have tool parts in 'approval-responded' state.
297
+ const lastAssistant = [...options.messages].reverse().find(m => m.role === 'assistant');
298
+ const approvedParts = lastAssistant?.parts.filter(
299
+ (p) => p.type === 'dynamic-tool' && p.state === 'approval-responded'
300
+ ) || [];
301
+
302
+ let body: Record<string, unknown>;
303
+ if (approvedParts.length > 0) {
304
+ // Approval continuation — extract approval decisions
305
+ const approvals = approvedParts.map((p) => {
306
+ if (p.type !== 'dynamic-tool') return null;
307
+ const approved = p.approval?.approved ?? true;
308
+ // Get edited script from agentStore if available
309
+ const editedScript = useAgentStore.getState().getEditedScript(p.toolCallId);
310
+ return {
311
+ tool_call_id: p.toolCallId,
312
+ approved,
313
+ feedback: approved ? null : (p.approval?.reason || 'Rejected by user'),
314
+ edited_script: editedScript ?? null,
315
+ };
316
+ }).filter(Boolean);
317
+ body = { approvals };
318
+ } else {
319
+ // Normal user message
320
+ const lastUserMsg = [...options.messages].reverse().find(m => m.role === 'user');
321
+ const text = lastUserMsg
322
+ ? lastUserMsg.parts
323
+ .filter((p): p is Extract<typeof p, { type: 'text' }> => p.type === 'text')
324
+ .map(p => p.text)
325
+ .join('')
326
+ : '';
327
+ body = { text };
328
+ }
329
+
330
+ // POST to SSE endpoint
331
+ const response = await apiFetch(`/api/chat/${sessionId}`, {
332
+ method: 'POST',
333
+ body: JSON.stringify(body),
334
+ signal: options.abortSignal,
335
+ headers: {
336
+ 'Content-Type': 'application/json',
337
+ 'Accept': 'text/event-stream',
338
+ },
339
+ });
340
+
341
+ if (!response.ok) {
342
+ const errorText = await response.text().catch(() => 'Request failed');
343
+ throw new Error(`Chat request failed: ${response.status} ${errorText}`);
344
+ }
345
+
346
+ if (!response.body) {
347
+ throw new Error('No response body');
348
+ }
349
+
350
+ // Pipe: response bytes → text → SSE events → UIMessageChunks
351
+ return response.body
352
+ .pipeThrough(new TextDecoderStream())
353
+ .pipeThrough(createSSEParserStream())
354
+ .pipeThrough(createEventToChunkStream(this.sideChannel));
355
+ }
356
+
357
+ async reconnectToStream(): Promise<ReadableStream<UIMessageChunk> | null> {
358
+ // Not needed — each turn is a separate request
359
+ return null;
360
+ }
361
+ }
frontend/src/lib/ws-chat-transport.ts DELETED
@@ -1,599 +0,0 @@
1
- /**
2
- * Custom ChatTransport that bridges our WebSocket-based backend protocol
3
- * to the Vercel AI SDK's UIMessageChunk streaming interface.
4
- *
5
- * Each instance manages a single session's WebSocket connection.
6
- * In the per-session architecture, every session owns its own transport.
7
- */
8
- import type { ChatTransport, UIMessage, UIMessageChunk, ChatRequestOptions } from 'ai';
9
- import { apiFetch, getWebSocketUrl } from '@/utils/api';
10
- import { logger } from '@/utils/logger';
11
- import type { AgentEvent } from '@/types/events';
12
- import { useAgentStore } from '@/store/agentStore';
13
-
14
- // ---------------------------------------------------------------------------
15
- // Side-channel callback interface (non-chat events forwarded to the store)
16
- // ---------------------------------------------------------------------------
17
- export interface SideChannelCallbacks {
18
- onReady: () => void;
19
- onShutdown: () => void;
20
- onError: (error: string) => void;
21
- onProcessing: () => void;
22
- onProcessingDone: () => void;
23
- onUndoComplete: () => void;
24
- onCompacted: (oldTokens: number, newTokens: number) => void;
25
- onPlanUpdate: (plan: Array<{ id: string; content: string; status: string }>) => void;
26
- onToolLog: (tool: string, log: string) => void;
27
- onConnectionChange: (connected: boolean) => void;
28
- onSessionDead: (sessionId: string) => void;
29
- /** Called when approval_required arrives — lets the store manage panels */
30
- onApprovalRequired: (tools: Array<{ tool: string; arguments: Record<string, unknown>; tool_call_id: string }>) => void;
31
- /** Called when a tool_call arrives with panel-relevant args */
32
- onToolCallPanel: (tool: string, args: Record<string, unknown>) => void;
33
- /** Called when tool_output arrives with panel-relevant data */
34
- onToolOutputPanel: (tool: string, toolCallId: string, output: string, success: boolean) => void;
35
- /** Called when assistant text starts streaming */
36
- onStreaming: () => void;
37
- /** Called when a tool starts running (non-plan) */
38
- onToolRunning: (toolName: string, description?: string) => void;
39
- }
40
-
41
- // ---------------------------------------------------------------------------
42
- // Transport options
43
- // ---------------------------------------------------------------------------
44
- export interface WebSocketChatTransportOptions {
45
- sideChannel: SideChannelCallbacks;
46
- }
47
-
48
- // ---------------------------------------------------------------------------
49
- // Constants
50
- // ---------------------------------------------------------------------------
51
- const WS_RECONNECT_DELAY = 1000;
52
- const WS_MAX_RECONNECT_DELAY = 30000;
53
- const WS_MAX_RETRIES = 5;
54
- const WS_PING_INTERVAL = 30000;
55
-
56
- let partIdCounter = 0;
57
- function nextPartId(prefix: string): string {
58
- return `${prefix}-${Date.now()}-${++partIdCounter}`;
59
- }
60
-
61
- // ---------------------------------------------------------------------------
62
- // Transport implementation
63
- // ---------------------------------------------------------------------------
64
- export class WebSocketChatTransport implements ChatTransport<UIMessage> {
65
- private ws: WebSocket | null = null;
66
- private currentSessionId: string | null = null;
67
- private sideChannel: SideChannelCallbacks;
68
-
69
- private streamController: ReadableStreamDefaultController<UIMessageChunk> | null = null;
70
- private streamGeneration = 0;
71
- private abortedGeneration = 0;
72
- private textPartId: string | null = null;
73
- private awaitingProcessing = false;
74
-
75
- private connectTimeout: ReturnType<typeof setTimeout> | null = null;
76
- private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
77
- private reconnectDelay = WS_RECONNECT_DELAY;
78
- private retries = 0;
79
- private pingInterval: ReturnType<typeof setInterval> | null = null;
80
- private boundVisibilityHandler: (() => void) | null = null;
81
-
82
- constructor({ sideChannel }: WebSocketChatTransportOptions) {
83
- this.sideChannel = sideChannel;
84
- this.setupVisibilityHandler();
85
- }
86
-
87
- private setupVisibilityHandler(): void {
88
- this.boundVisibilityHandler = () => {
89
- if (document.visibilityState === 'hidden') {
90
- return;
91
- }
92
-
93
- if (document.visibilityState === 'visible' && this.currentSessionId) {
94
- const wsState = this.ws?.readyState;
95
- if (wsState !== WebSocket.OPEN && wsState !== WebSocket.CONNECTING) {
96
- logger.log('Tab visible: WS is dead, reconnecting immediately');
97
- this.retries = 0;
98
- this.reconnectDelay = WS_RECONNECT_DELAY;
99
- this.createWebSocket(this.currentSessionId);
100
- } else if (wsState === WebSocket.OPEN) {
101
- this.ws!.send(JSON.stringify({ type: 'ping' }));
102
- }
103
- }
104
- };
105
- document.addEventListener('visibilitychange', this.boundVisibilityHandler);
106
- }
107
-
108
- /** Update side-channel callbacks (e.g. when isActive changes). */
109
- updateSideChannel(sideChannel: SideChannelCallbacks): void {
110
- this.sideChannel = sideChannel;
111
- }
112
-
113
- /** Check if the WebSocket is currently connected. */
114
- isWebSocketConnected(): boolean {
115
- return this.ws?.readyState === WebSocket.OPEN;
116
- }
117
-
118
- // -- Public API ----------------------------------------------------------
119
-
120
- /** Connect (or reconnect) to a session's WebSocket. */
121
- connectToSession(sessionId: string | null): void {
122
- if (this.connectTimeout) {
123
- clearTimeout(this.connectTimeout);
124
- this.connectTimeout = null;
125
- }
126
-
127
- // Same session — no-op
128
- if (sessionId === this.currentSessionId && this.ws?.readyState === WebSocket.OPEN) {
129
- return;
130
- }
131
-
132
- this.disconnectWebSocket();
133
- this.currentSessionId = sessionId;
134
-
135
- if (sessionId) {
136
- this.retries = 0;
137
- this.reconnectDelay = WS_RECONNECT_DELAY;
138
- this.connectTimeout = setTimeout(() => {
139
- this.connectTimeout = null;
140
- if (this.currentSessionId === sessionId) {
141
- this.createWebSocket(sessionId);
142
- }
143
- }, 100);
144
- }
145
- }
146
-
147
- /** Approve / reject tools. Called directly from the UI. */
148
- async approveTools(
149
- sessionId: string,
150
- approvals: Array<{ tool_call_id: string; approved: boolean; feedback?: string | null; edited_script?: string | null }>,
151
- ): Promise<boolean> {
152
- try {
153
- const res = await apiFetch('/api/approve', {
154
- method: 'POST',
155
- body: JSON.stringify({ session_id: sessionId, approvals }),
156
- });
157
- return res.ok;
158
- } catch (e) {
159
- logger.error('Approval request failed:', e);
160
- return false;
161
- }
162
- }
163
-
164
- /** Clean up everything. */
165
- destroy(): void {
166
- if (this.connectTimeout) {
167
- clearTimeout(this.connectTimeout);
168
- this.connectTimeout = null;
169
- }
170
- if (this.boundVisibilityHandler) {
171
- document.removeEventListener('visibilitychange', this.boundVisibilityHandler);
172
- this.boundVisibilityHandler = null;
173
- }
174
- this.disconnectWebSocket();
175
- this.closeActiveStream();
176
- }
177
-
178
- // -- ChatTransport interface ---------------------------------------------
179
-
180
- async sendMessages(
181
- options: {
182
- trigger: 'submit-message' | 'regenerate-message';
183
- chatId: string;
184
- messageId: string | undefined;
185
- messages: UIMessage[];
186
- abortSignal: AbortSignal | undefined;
187
- } & ChatRequestOptions,
188
- ): Promise<ReadableStream<UIMessageChunk>> {
189
- const sessionId = options.chatId;
190
-
191
- // Close any previously active stream (e.g. user sent new msg during approval)
192
- this.closeActiveStream();
193
-
194
- // Track generation to protect against late cancel from a stale stream
195
- const gen = ++this.streamGeneration;
196
- logger.log(`sendMessages: gen=${gen}, awaitingProcessing=${this.awaitingProcessing}, abortedGen=${this.abortedGeneration}`);
197
-
198
- // Wire up abort signal to interrupt the backend and close the stream
199
- if (options.abortSignal) {
200
- const onAbort = () => {
201
- if (this.streamGeneration !== gen) return;
202
- logger.log(`Stream aborted by user (gen=${gen})`);
203
- this.interruptBackend(sessionId);
204
- this.endTextPart();
205
- if (this.streamController) {
206
- this.enqueue({ type: 'finish-step' });
207
- this.enqueue({ type: 'finish', finishReason: 'stop' });
208
- this.closeActiveStream();
209
- }
210
- this.awaitingProcessing = true;
211
- this.abortedGeneration = this.streamGeneration;
212
- logger.log(`Abort complete: awaitingProcessing=true, abortedGen=${this.abortedGeneration}`);
213
- this.sideChannel.onProcessingDone();
214
- };
215
- if (options.abortSignal.aborted) {
216
- onAbort();
217
- } else {
218
- options.abortSignal.addEventListener('abort', onAbort, { once: true });
219
- }
220
- }
221
-
222
- // Create the stream BEFORE the POST so WebSocket events arriving
223
- // while the HTTP request is in-flight are captured immediately.
224
- const stream = new ReadableStream<UIMessageChunk>({
225
- start: (controller) => {
226
- this.streamController = controller;
227
- this.textPartId = null;
228
- },
229
- cancel: () => {
230
- if (this.streamGeneration === gen) {
231
- this.streamController = null;
232
- this.textPartId = null;
233
- }
234
- },
235
- });
236
-
237
- // Extract the latest user text from the messages array
238
- const lastUserMsg = [...options.messages].reverse().find(m => m.role === 'user');
239
- const text = lastUserMsg
240
- ? lastUserMsg.parts
241
- .filter((p): p is Extract<typeof p, { type: 'text' }> => p.type === 'text')
242
- .map(p => p.text)
243
- .join('')
244
- : '';
245
-
246
- // POST to the existing backend endpoint
247
- try {
248
- await apiFetch('/api/submit', {
249
- method: 'POST',
250
- body: JSON.stringify({ session_id: sessionId, text }),
251
- });
252
- } catch (e) {
253
- logger.error('Submit failed:', e);
254
- this.enqueue({ type: 'error', errorText: 'Failed to send message' });
255
- this.closeActiveStream();
256
- }
257
-
258
- return stream;
259
- }
260
-
261
- async reconnectToStream(): Promise<ReadableStream<UIMessageChunk> | null> {
262
- return null;
263
- }
264
-
265
- /** Ask the backend to interrupt the current generation. Fire-and-forget. */
266
- private interruptBackend(sessionId: string): void {
267
- apiFetch(`/api/interrupt/${sessionId}`, { method: 'POST' }).catch((e) =>
268
- logger.warn('Interrupt request failed:', e),
269
- );
270
- }
271
-
272
- // -- WebSocket lifecycle -------------------------------------------------
273
-
274
- private createWebSocket(sessionId: string): void {
275
- if (this.ws?.readyState === WebSocket.OPEN || this.ws?.readyState === WebSocket.CONNECTING) {
276
- return;
277
- }
278
-
279
- const wsUrl = getWebSocketUrl(sessionId);
280
- logger.log('WS transport connecting:', wsUrl);
281
- const ws = new WebSocket(wsUrl);
282
-
283
- ws.onopen = () => {
284
- logger.log('WS transport connected');
285
- this.sideChannel.onConnectionChange(true);
286
- this.reconnectDelay = WS_RECONNECT_DELAY;
287
- this.retries = 0;
288
- this.startPing();
289
- };
290
-
291
- ws.onmessage = (evt) => {
292
- try {
293
- const raw = JSON.parse(evt.data);
294
- if (raw.type === 'pong') return;
295
- this.handleEvent(raw as AgentEvent);
296
- } catch (e) {
297
- logger.error('WS parse error:', e);
298
- }
299
- };
300
-
301
- ws.onerror = (err) => logger.error('WS error:', err);
302
-
303
- ws.onclose = (evt) => {
304
- logger.log('WS closed', evt.code, evt.reason);
305
- this.sideChannel.onConnectionChange(false);
306
- this.stopPing();
307
-
308
- const noRetry = [1000, 4001, 4003, 4004];
309
- if (evt.code === 4004 && sessionId) {
310
- this.sideChannel.onSessionDead(sessionId);
311
- return;
312
- }
313
- if (!noRetry.includes(evt.code) && this.currentSessionId === sessionId) {
314
- this.retries += 1;
315
- if (this.retries > WS_MAX_RETRIES) {
316
- logger.warn('WS max retries reached, will reconnect on session switch');
317
- return;
318
- }
319
- this.reconnectTimeout = setTimeout(() => {
320
- this.reconnectDelay = Math.min(this.reconnectDelay * 2, WS_MAX_RECONNECT_DELAY);
321
- this.createWebSocket(sessionId);
322
- }, this.reconnectDelay);
323
- }
324
- };
325
-
326
- this.ws = ws;
327
- }
328
-
329
- private disconnectWebSocket(): void {
330
- if (this.reconnectTimeout) {
331
- clearTimeout(this.reconnectTimeout);
332
- this.reconnectTimeout = null;
333
- }
334
- this.stopPing();
335
- if (this.ws) {
336
- this.ws.close();
337
- this.ws = null;
338
- }
339
- this.sideChannel.onConnectionChange(false);
340
- }
341
-
342
- private startPing(): void {
343
- this.stopPing();
344
- this.pingInterval = setInterval(() => {
345
- if (this.ws?.readyState === WebSocket.OPEN) {
346
- this.ws.send(JSON.stringify({ type: 'ping' }));
347
- }
348
- }, WS_PING_INTERVAL);
349
- }
350
-
351
- private stopPing(): void {
352
- if (this.pingInterval) {
353
- clearInterval(this.pingInterval);
354
- this.pingInterval = null;
355
- }
356
- }
357
-
358
- // -- Stream helpers ------------------------------------------------------
359
-
360
- private closeActiveStream(): void {
361
- if (this.streamController) {
362
- try {
363
- this.streamController.close();
364
- } catch {
365
- // already closed
366
- }
367
- this.streamController = null;
368
- this.textPartId = null;
369
- }
370
- }
371
-
372
- private enqueue(chunk: UIMessageChunk): void {
373
- try {
374
- this.streamController?.enqueue(chunk);
375
- } catch {
376
- // stream already closed
377
- }
378
- }
379
-
380
- private endTextPart(): void {
381
- if (this.textPartId) {
382
- this.enqueue({ type: 'text-end', id: this.textPartId });
383
- this.textPartId = null;
384
- }
385
- }
386
-
387
- // -- Event -> UIMessageChunk mapping ------------------------------------
388
-
389
- private static readonly STREAM_EVENTS = new Set([
390
- 'assistant_chunk', 'assistant_stream_end', 'assistant_message',
391
- 'tool_call', 'tool_output', 'approval_required', 'tool_state_change',
392
- 'turn_complete', 'error',
393
- ]);
394
-
395
- private handleEvent(event: AgentEvent): void {
396
- // After an abort, ignore stale stream events until the next 'processing'
397
- if (this.awaitingProcessing && WebSocketChatTransport.STREAM_EVENTS.has(event.event_type)) {
398
- logger.log(`Filtering stale "${event.event_type}" (gen=${this.streamGeneration}, aborted=${this.abortedGeneration})`);
399
- return;
400
- }
401
-
402
- switch (event.event_type) {
403
- // -- Side-channel only events ----------------------------------------
404
- case 'ready':
405
- if (this.streamController) {
406
- // Reconnect during an active turn — don't reset processing state,
407
- // just signal that the connection is back.
408
- this.sideChannel.onConnectionChange(true);
409
- } else {
410
- this.sideChannel.onReady();
411
- }
412
- break;
413
-
414
- case 'shutdown':
415
- this.sideChannel.onShutdown();
416
- this.closeActiveStream();
417
- break;
418
-
419
- case 'interrupted':
420
- this.sideChannel.onProcessingDone();
421
- break;
422
-
423
- case 'undo_complete':
424
- this.endTextPart();
425
- this.closeActiveStream();
426
- this.sideChannel.onUndoComplete();
427
- break;
428
-
429
- case 'compacted':
430
- this.sideChannel.onCompacted(
431
- (event.data?.old_tokens as number) || 0,
432
- (event.data?.new_tokens as number) || 0,
433
- );
434
- break;
435
-
436
- case 'plan_update':
437
- this.sideChannel.onPlanUpdate(
438
- (event.data?.plan as Array<{ id: string; content: string; status: string }>) || [],
439
- );
440
- break;
441
-
442
- case 'tool_log':
443
- this.sideChannel.onToolLog(
444
- (event.data?.tool as string) || '',
445
- (event.data?.log as string) || '',
446
- );
447
- break;
448
-
449
- // -- Chat stream events ----------------------------------------------
450
- case 'processing':
451
- if (this.awaitingProcessing) {
452
- if (this.streamGeneration <= this.abortedGeneration) {
453
- logger.log(`Ignoring stale "processing" (gen=${this.streamGeneration} <= aborted=${this.abortedGeneration})`);
454
- break;
455
- }
456
- logger.log(`Accepting "processing" for new generation (gen=${this.streamGeneration}, aborted=${this.abortedGeneration})`);
457
- this.awaitingProcessing = false;
458
- }
459
- this.sideChannel.onProcessing();
460
- if (this.streamController) {
461
- this.enqueue({
462
- type: 'start',
463
- messageMetadata: { createdAt: new Date().toISOString() },
464
- });
465
- this.enqueue({ type: 'start-step' });
466
- }
467
- break;
468
-
469
- case 'assistant_chunk': {
470
- const delta = (event.data?.content as string) || '';
471
- if (!delta || !this.streamController) break;
472
-
473
- if (!this.textPartId) {
474
- this.textPartId = nextPartId('text');
475
- this.enqueue({ type: 'text-start', id: this.textPartId });
476
- this.sideChannel.onStreaming();
477
- }
478
- this.enqueue({ type: 'text-delta', id: this.textPartId, delta });
479
- break;
480
- }
481
-
482
- case 'assistant_stream_end':
483
- this.endTextPart();
484
- break;
485
-
486
- case 'assistant_message': {
487
- const content = (event.data?.content as string) || '';
488
- if (!content || !this.streamController) break;
489
- const id = nextPartId('text');
490
- this.enqueue({ type: 'text-start', id });
491
- this.enqueue({ type: 'text-delta', id, delta: content });
492
- this.enqueue({ type: 'text-end', id });
493
- break;
494
- }
495
-
496
- case 'tool_call': {
497
- if (!this.streamController) break;
498
- const toolName = (event.data?.tool as string) || 'unknown';
499
- const toolCallId = (event.data?.tool_call_id as string) || '';
500
- const args = (event.data?.arguments as Record<string, unknown>) || {};
501
-
502
- if (toolName === 'plan_tool') break;
503
-
504
- this.endTextPart();
505
- this.enqueue({ type: 'tool-input-start', toolCallId, toolName, dynamic: true });
506
- this.enqueue({ type: 'tool-input-available', toolCallId, toolName, input: args, dynamic: true });
507
-
508
- this.sideChannel.onToolRunning(toolName, (args as Record<string, unknown>)?.description as string | undefined);
509
- this.sideChannel.onToolCallPanel(toolName, args as Record<string, unknown>);
510
- break;
511
- }
512
-
513
- case 'tool_output': {
514
- if (!this.streamController) break;
515
- const toolCallId = (event.data?.tool_call_id as string) || '';
516
- const output = (event.data?.output as string) || '';
517
- const success = event.data?.success as boolean;
518
- const toolName = (event.data?.tool as string) || '';
519
-
520
- if (toolName === 'plan_tool' || toolCallId.startsWith('plan_tool')) break;
521
-
522
- if (success) {
523
- this.enqueue({ type: 'tool-output-available', toolCallId, output, dynamic: true });
524
- } else {
525
- this.enqueue({ type: 'tool-output-error', toolCallId, errorText: output, dynamic: true });
526
- }
527
-
528
- this.sideChannel.onToolOutputPanel(toolName, toolCallId, output, success);
529
- break;
530
- }
531
-
532
- case 'approval_required': {
533
- const tools = event.data?.tools as Array<{
534
- tool: string;
535
- arguments: Record<string, unknown>;
536
- tool_call_id: string;
537
- }>;
538
- if (!tools || !this.streamController) break;
539
-
540
- this.endTextPart();
541
-
542
- for (const t of tools) {
543
- this.enqueue({ type: 'tool-input-start', toolCallId: t.tool_call_id, toolName: t.tool, dynamic: true });
544
- this.enqueue({ type: 'tool-input-available', toolCallId: t.tool_call_id, toolName: t.tool, input: t.arguments, dynamic: true });
545
- this.enqueue({ type: 'tool-approval-request', approvalId: `approval-${t.tool_call_id}`, toolCallId: t.tool_call_id });
546
- }
547
-
548
- this.sideChannel.onApprovalRequired(tools);
549
- this.sideChannel.onProcessingDone();
550
- break;
551
- }
552
-
553
- case 'tool_state_change': {
554
- const tcId = (event.data?.tool_call_id as string) || '';
555
- const state = (event.data?.state as string) || '';
556
- const toolName = (event.data?.tool as string) || '';
557
- const jobUrl = (event.data?.jobUrl as string) || undefined;
558
-
559
- if (tcId.startsWith('plan_tool')) break;
560
-
561
- if (jobUrl && tcId) {
562
- useAgentStore.getState().setJobUrl(tcId, jobUrl);
563
- }
564
-
565
- if (state === 'running' && toolName) {
566
- this.sideChannel.onToolRunning(toolName);
567
- }
568
-
569
- if (this.streamController && (state === 'rejected' || state === 'abandoned')) {
570
- this.enqueue({ type: 'tool-output-denied', toolCallId: tcId });
571
- }
572
- break;
573
- }
574
-
575
- case 'turn_complete':
576
- this.endTextPart();
577
- if (this.streamController) {
578
- this.enqueue({ type: 'finish-step' });
579
- this.enqueue({ type: 'finish', finishReason: 'stop' });
580
- this.closeActiveStream();
581
- }
582
- this.sideChannel.onProcessingDone();
583
- break;
584
-
585
- case 'error': {
586
- const errorMsg = (event.data?.error as string) || 'Unknown error';
587
- this.sideChannel.onError(errorMsg);
588
- if (this.streamController) {
589
- this.enqueue({ type: 'error', errorText: errorMsg });
590
- }
591
- this.sideChannel.onProcessingDone();
592
- break;
593
- }
594
-
595
- default:
596
- logger.log('WS transport: unknown event', event);
597
- }
598
- }
599
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
frontend/src/utils/api.ts CHANGED
@@ -38,10 +38,4 @@ export async function apiFetch(
38
  }
39
 
40
  return response;
41
- }
42
-
43
- /** Build the WebSocket URL for a session. */
44
- export function getWebSocketUrl(sessionId: string): string {
45
- const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
46
- return `${protocol}//${window.location.host}/api/ws/${sessionId}`;
47
- }
 
38
  }
39
 
40
  return response;
41
+ }