Ken Sang Tang commited on
Commit
643414d
·
verified ·
1 Parent(s): 0c7fe7f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +15 -9
app.py CHANGED
@@ -12,6 +12,7 @@ import asyncio
12
  import os
13
  import uuid
14
  import toml
 
15
  from datetime import datetime
16
  from json import dumps
17
 
@@ -121,24 +122,29 @@ async def task_events(task_id: str):
121
  queue = task_manager.queues[task_id]
122
  task = task_manager.tasks.get(task_id)
123
  if task:
124
- logger.info(f"Sending initial task status for: {task_id}")
125
  yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"
126
 
 
 
127
  while True:
128
  try:
129
- event = await queue.get()
130
- formatted_event = dumps(event)
131
- yield ": heartbeat\n\n"
132
- yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
133
-
134
- if event["type"] in ["complete", "error"]:
 
 
135
  break
 
 
 
 
136
 
137
  except asyncio.CancelledError:
138
- logger.warning("Client disconnected")
139
  break
140
  except Exception as e:
141
- logger.error(f"Event stream error: {e}")
142
  yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
143
  break
144
 
 
12
  import os
13
  import uuid
14
  import toml
15
+ import time
16
  from datetime import datetime
17
  from json import dumps
18
 
 
122
  queue = task_manager.queues[task_id]
123
  task = task_manager.tasks.get(task_id)
124
  if task:
 
125
  yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"
126
 
127
+ last_event_time = time.time()
128
+
129
  while True:
130
  try:
131
+ # wait up to 5 seconds for new events
132
+ try:
133
+ event = await asyncio.wait_for(queue.get(), timeout=5.0)
134
+ formatted_event = dumps(event)
135
+ yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
136
+ last_event_time = time.time()
137
+
138
+ if event["type"] in ["complete", "error"]:
139
  break
140
+ except asyncio.TimeoutError:
141
+ # Send heartbeat to keep connection alive
142
+ yield ": heartbeat\n\n"
143
+
144
 
145
  except asyncio.CancelledError:
 
146
  break
147
  except Exception as e:
 
148
  yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
149
  break
150