akseljoonas HF Staff Claude Opus 4.6 commited on
Commit
47d6895
·
1 Parent(s): aa0f54a

fix: live event streaming after reconnection + LiteLLM timeout

Browse files

Frontend: visibility change handler now re-hydrates messages, opens a
live event stream via GET /api/events, and polls messages every 3s to
keep the chat in sync. Terminal events trigger a final hydration and
clean up both the stream and poll.

Backend: add timeout=600 to litellm acompletion() call — long tool-use
turns were being killed by the default timeout.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

agent/core/agent_loop.py CHANGED
@@ -255,6 +255,7 @@ class Handlers:
255
  tool_choice="auto",
256
  stream=True,
257
  stream_options={"include_usage": True},
 
258
  **llm_params,
259
  )
260
 
 
255
  tool_choice="auto",
256
  stream=True,
257
  stream_options={"include_usage": True},
258
+ timeout=600, # 10 min — long tool-use turns can take a while
259
  **llm_params,
260
  )
261
 
frontend/src/hooks/useAgentChat.ts CHANGED
@@ -339,51 +339,184 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
339
  return () => { cancelled = true; };
340
  }, [sessionId]); // eslint-disable-line react-hooks/exhaustive-deps
341
 
342
- // -- Re-hydrate on wake from sleep (SSE stream may have died) -----------
343
- const rehydratingRef = useRef(false);
 
 
 
 
 
 
 
 
 
344
  useEffect(() => {
345
- const onVisible = async () => {
346
- if (document.visibilityState !== 'visible') return;
347
- if (rehydratingRef.current) return;
348
- rehydratingRef.current = true;
349
  try {
350
  const [msgsRes, infoRes] = await Promise.all([
351
  apiFetch(`/api/session/${sessionId}/messages`),
352
  apiFetch(`/api/session/${sessionId}`),
353
  ]);
354
- if (!msgsRes.ok || !infoRes.ok) return;
355
- const info = await infoRes.json();
356
  const data = await msgsRes.json();
357
- if (!Array.isArray(data) || data.length === 0) return;
358
 
359
- // Rebuild pending-approval set
360
  let pendingIds: Set<string> | undefined;
361
- if (info.pending_approval && Array.isArray(info.pending_approval)) {
362
- pendingIds = new Set(
363
- info.pending_approval.map((t: { tool_call_id: string }) => t.tool_call_id)
364
- );
365
- if (pendingIds.size > 0) setNeedsAttention(sessionId, true);
 
 
 
 
366
  }
 
 
 
 
 
367
 
368
- const uiMsgs = llmMessagesToUIMessages(data, pendingIds);
369
- if (uiMsgs.length > 0) {
370
- chat.setMessages(uiMsgs);
371
- saveMessages(sessionId, uiMsgs);
372
- }
 
 
 
 
373
 
374
- // If the backend is still processing but we lost the SSE stream,
375
- // mark the UI as busy so the chat input stays disabled.
376
- if (info.is_processing) {
377
- updateSession(sessionId, { isProcessing: true, activityStatus: { type: 'thinking' } });
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
378
  }
379
  } catch {
380
- /* ignore backend may be briefly unreachable */
381
- } finally {
382
- rehydratingRef.current = false;
383
  }
384
  };
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
385
  document.addEventListener('visibilitychange', onVisible);
386
- return () => document.removeEventListener('visibilitychange', onVisible);
 
 
 
387
  }, [sessionId]); // eslint-disable-line react-hooks/exhaustive-deps
388
 
389
  // -- Persist messages ---------------------------------------------------
 
339
  return () => { cancelled = true; };
340
  }, [sessionId]); // eslint-disable-line react-hooks/exhaustive-deps
341
 
342
+ // -- Re-hydrate + reconnect on wake from sleep ----------------------------
343
+ // The Vercel AI SDK only calls reconnectToStream() on mount, NOT on
344
+ // visibility change. So when the browser wakes from sleep and the SSE
345
+ // stream is dead, we must manually:
346
+ // 1. Re-hydrate messages (one-shot fetch from backend)
347
+ // 2. Subscribe to live events via GET /api/events/{sessionId}
348
+ // 3. Pipe those events through the side-channel callbacks for real-time UI
349
+ // 4. Poll messages every few seconds so chat.setMessages stays in sync
350
+ const reconnectAbortRef = useRef<AbortController | null>(null);
351
+ const pollTimerRef = useRef<ReturnType<typeof setInterval> | null>(null);
352
+
353
  useEffect(() => {
354
+ /** Fetch latest messages from backend and push into the SDK. */
355
+ const hydrateMessages = async () => {
 
 
356
  try {
357
  const [msgsRes, infoRes] = await Promise.all([
358
  apiFetch(`/api/session/${sessionId}/messages`),
359
  apiFetch(`/api/session/${sessionId}`),
360
  ]);
361
+ if (!msgsRes.ok) return null;
 
362
  const data = await msgsRes.json();
363
+ if (!Array.isArray(data) || data.length === 0) return null;
364
 
 
365
  let pendingIds: Set<string> | undefined;
366
+ if (infoRes.ok) {
367
+ const info = await infoRes.json();
368
+ if (info.pending_approval && Array.isArray(info.pending_approval)) {
369
+ pendingIds = new Set(
370
+ info.pending_approval.map((t: { tool_call_id: string }) => t.tool_call_id)
371
+ );
372
+ if (pendingIds.size > 0) setNeedsAttention(sessionId, true);
373
+ }
374
+ return { data, pendingIds, info };
375
  }
376
+ return { data, pendingIds, info: null };
377
+ } catch {
378
+ return null;
379
+ }
380
+ };
381
 
382
+ /** Stop any running reconnection (event stream + poll). */
383
+ const stopReconnect = () => {
384
+ reconnectAbortRef.current?.abort();
385
+ reconnectAbortRef.current = null;
386
+ if (pollTimerRef.current) {
387
+ clearInterval(pollTimerRef.current);
388
+ pollTimerRef.current = null;
389
+ }
390
+ };
391
 
392
+ /** Read the event stream from GET /api/events and forward to side-channel. */
393
+ const consumeEventStream = async (signal: AbortSignal) => {
394
+ try {
395
+ const res = await apiFetch(`/api/events/${sessionId}`, {
396
+ headers: { 'Accept': 'text/event-stream' },
397
+ signal,
398
+ });
399
+ if (!res.ok || !res.body) return;
400
+
401
+ const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
402
+ let buf = '';
403
+ while (true) {
404
+ const { value, done } = await reader.read();
405
+ if (done || signal.aborted) break;
406
+ buf += value;
407
+ const lines = buf.split('\n');
408
+ buf = lines.pop() || '';
409
+ for (const line of lines) {
410
+ const trimmed = line.trim();
411
+ if (!trimmed.startsWith('data: ')) continue;
412
+ try {
413
+ const event = JSON.parse(trimmed.slice(6));
414
+ // Forward to side-channel for real-time UI updates
415
+ const et = event.event_type as string;
416
+ if (et === 'processing') sideChannel.onProcessing();
417
+ else if (et === 'assistant_chunk') sideChannel.onStreaming();
418
+ else if (et === 'tool_call') {
419
+ const t = event.data?.tool as string;
420
+ const d = event.data?.arguments?.description as string | undefined;
421
+ sideChannel.onToolRunning(t, d);
422
+ sideChannel.onToolCallPanel(t, (event.data?.arguments || {}) as Record<string, unknown>);
423
+ } else if (et === 'tool_output') {
424
+ sideChannel.onToolOutputPanel(
425
+ event.data?.tool as string,
426
+ event.data?.tool_call_id as string,
427
+ event.data?.output as string,
428
+ event.data?.success as boolean,
429
+ );
430
+ } else if (et === 'tool_state_change') {
431
+ const state = event.data?.state as string;
432
+ const toolName = event.data?.tool as string;
433
+ if (state === 'running' && toolName) sideChannel.onToolRunning(toolName);
434
+ } else if (et === 'turn_complete' || et === 'error' || et === 'interrupted') {
435
+ sideChannel.onProcessingDone();
436
+ stopReconnect();
437
+ // Final hydration to get the complete message state
438
+ const result = await hydrateMessages();
439
+ if (result) {
440
+ const uiMsgs = llmMessagesToUIMessages(result.data, result.pendingIds);
441
+ if (uiMsgs.length > 0) {
442
+ chat.setMessages(uiMsgs);
443
+ saveMessages(sessionId, uiMsgs);
444
+ }
445
+ }
446
+ return;
447
+ } else if (et === 'approval_required') {
448
+ sideChannel.onApprovalRequired(
449
+ (event.data?.tools || []) as Array<{ tool: string; arguments: Record<string, unknown>; tool_call_id: string }>,
450
+ );
451
+ stopReconnect();
452
+ const result = await hydrateMessages();
453
+ if (result) {
454
+ const uiMsgs = llmMessagesToUIMessages(result.data, result.pendingIds);
455
+ if (uiMsgs.length > 0) {
456
+ chat.setMessages(uiMsgs);
457
+ saveMessages(sessionId, uiMsgs);
458
+ }
459
+ }
460
+ return;
461
+ }
462
+ } catch { /* ignore parse errors */ }
463
+ }
464
  }
465
  } catch {
466
+ /* stream ended or aborted */
 
 
467
  }
468
  };
469
+
470
+ const onVisible = async () => {
471
+ if (document.visibilityState !== 'visible') return;
472
+
473
+ // Always re-hydrate messages on wake
474
+ const result = await hydrateMessages();
475
+ if (!result) return;
476
+
477
+ const { data, pendingIds, info } = result;
478
+ const uiMsgs = llmMessagesToUIMessages(data, pendingIds);
479
+ if (uiMsgs.length > 0) {
480
+ chat.setMessages(uiMsgs);
481
+ saveMessages(sessionId, uiMsgs);
482
+ }
483
+
484
+ // If the backend is still processing, reconnect to the live event stream
485
+ if (info?.is_processing) {
486
+ updateSession(sessionId, { isProcessing: true, activityStatus: { type: 'thinking' } });
487
+
488
+ // Stop any previous reconnection
489
+ stopReconnect();
490
+
491
+ // Start live event subscription
492
+ const abort = new AbortController();
493
+ reconnectAbortRef.current = abort;
494
+ consumeEventStream(abort.signal);
495
+
496
+ // Poll messages every 3 s so the chat message list stays up-to-date
497
+ // (the event stream gives us real-time status but not full message diffs)
498
+ pollTimerRef.current = setInterval(async () => {
499
+ const fresh = await hydrateMessages();
500
+ if (!fresh) return;
501
+ const msgs = llmMessagesToUIMessages(fresh.data, fresh.pendingIds);
502
+ if (msgs.length > 0) {
503
+ chat.setMessages(msgs);
504
+ saveMessages(sessionId, msgs);
505
+ }
506
+ // If backend stopped processing, clean up
507
+ if (fresh.info && !fresh.info.is_processing) {
508
+ updateSession(sessionId, { isProcessing: false });
509
+ stopReconnect();
510
+ }
511
+ }, 3000);
512
+ }
513
+ };
514
+
515
  document.addEventListener('visibilitychange', onVisible);
516
+ return () => {
517
+ document.removeEventListener('visibilitychange', onVisible);
518
+ stopReconnect();
519
+ };
520
  }, [sessionId]); // eslint-disable-line react-hooks/exhaustive-deps
521
 
522
  // -- Persist messages ---------------------------------------------------