Ken Sang Tang commited on
Commit
c374e99
·
verified ·
1 Parent(s): c5340db

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +72 -199
app.py CHANGED
@@ -32,7 +32,6 @@ app.add_middleware(
32
  allow_headers=["*"],
33
  )
34
 
35
-
36
  class Task(BaseModel):
37
  id: str
38
  prompt: str
@@ -45,7 +44,6 @@ class Task(BaseModel):
45
  data["created_at"] = self.created_at.isoformat()
46
  return data
47
 
48
-
49
  class TaskManager:
50
  def __init__(self):
51
  self.tasks = {}
@@ -60,26 +58,18 @@ class TaskManager:
60
  self.queues[task_id] = asyncio.Queue()
61
  return task
62
 
63
- async def update_task_step(
64
- self, task_id: str, step: int, result: str, step_type: str = "step"
65
- ):
66
  if task_id in self.tasks:
67
  task = self.tasks[task_id]
68
  task.steps.append({"step": step, "result": result, "type": step_type})
69
- await self.queues[task_id].put(
70
- {"type": step_type, "step": step, "result": result}
71
- )
72
- await self.queues[task_id].put(
73
- {"type": "status", "status": task.status, "steps": task.steps}
74
- )
75
 
76
  async def complete_task(self, task_id: str):
77
  if task_id in self.tasks:
78
  task = self.tasks[task_id]
79
  task.status = "completed"
80
- await self.queues[task_id].put(
81
- {"type": "status", "status": task.status, "steps": task.steps}
82
- )
83
  await self.queues[task_id].put({"type": "complete"})
84
 
85
  async def fail_task(self, task_id: str, error: str):
@@ -87,19 +77,15 @@ class TaskManager:
87
  self.tasks[task_id].status = f"failed: {error}"
88
  await self.queues[task_id].put({"type": "error", "message": error})
89
 
90
-
91
  task_manager = TaskManager()
92
 
93
  def make_sse_handler(task_id):
94
- handler = SSELogHandler(task_id)
95
-
96
- def sink(message):
97
- asyncio.create_task(handler(str(message)))
98
-
99
- return sink
100
 
101
  def get_available_themes():
102
- """扫描themes目录获取所有可用主题"""
103
  themes_dir = "static/themes"
104
  if not os.path.exists(themes_dir):
105
  return [{"id": "openmanus", "name": "Manus", "description": "默认主题"}]
@@ -108,7 +94,6 @@ def get_available_themes():
108
  for item in os.listdir(themes_dir):
109
  theme_path = os.path.join(themes_dir, item)
110
  if os.path.isdir(theme_path):
111
- # 验证主题文件夹是否包含必要的文件
112
  templates_dir = os.path.join(theme_path, "templates")
113
  static_dir = os.path.join(theme_path, "static")
114
  config_file = os.path.join(theme_path, "theme.json")
@@ -116,35 +101,24 @@ def get_available_themes():
116
  if os.path.exists(templates_dir) and os.path.exists(static_dir):
117
  if os.path.exists(os.path.join(templates_dir, "chat.html")):
118
  theme_info = {"id": item, "name": item, "description": ""}
119
-
120
- # 如果有配置文件,读取主题名称和描述
121
  if os.path.exists(config_file):
122
  try:
123
  with open(config_file, "r", encoding="utf-8") as f:
124
  config = json.load(f)
125
  theme_info["name"] = config.get("name", item)
126
- theme_info["description"] = config.get(
127
- "description", ""
128
- )
129
  except Exception as e:
130
  print(f"读取主题配置文件出错: {str(e)}")
131
-
132
  themes.append(theme_info)
133
 
134
- # 确保Normal主题始终存在
135
- normal_exists = any(theme["id"] == "openmanus" for theme in themes)
136
- if not normal_exists:
137
  themes.append({"id": "openmanus", "name": "Manus", "description": "默认主题"})
138
 
139
  return themes
140
 
141
-
142
  @app.get("/", response_class=HTMLResponse)
143
  async def index(request: Request):
144
- # 获取可用主题列表
145
  themes = get_available_themes()
146
-
147
- # 对主题进行排序:Normal在前,cyberpunk在后,其他主题按原顺序
148
  sorted_themes = []
149
  normal_theme = None
150
  cyberpunk_theme = None
@@ -158,28 +132,21 @@ async def index(request: Request):
158
  else:
159
  other_themes.append(theme)
160
 
161
- # 按照指定顺序组合主题
162
  if normal_theme:
163
  sorted_themes.append(normal_theme)
164
  if cyberpunk_theme:
165
  sorted_themes.append(cyberpunk_theme)
166
  sorted_themes.extend(other_themes)
167
 
168
- return templates.TemplateResponse(
169
- "index.html", {"request": request, "themes": sorted_themes}
170
- )
171
-
172
 
173
  @app.get("/chat", response_class=HTMLResponse)
174
  async def chat(request: Request):
175
  theme = request.query_params.get("theme", "openmanus")
176
- # 尝试从主题文件夹加载chat.html
177
  theme_chat_path = f"static/themes/{theme}/templates/chat.html"
178
  if os.path.exists(theme_chat_path):
179
  with open(theme_chat_path, "r", encoding="utf-8") as f:
180
  content = f.read()
181
-
182
- # 读取主题配置文件
183
  theme_config_path = f"static/themes/{theme}/theme.json"
184
  theme_name = theme
185
  if os.path.exists(theme_config_path):
@@ -189,15 +156,9 @@ async def chat(request: Request):
189
  theme_name = config.get("name", theme)
190
  except Exception:
191
  pass
192
-
193
- # 将主题名称添加到HTML标题中
194
- content = content.replace(
195
- "<title>Manus</title>", f"<title>Manus - {theme_name}</title>"
196
- )
197
  return HTMLResponse(content=content)
198
- else:
199
- # 默认使用templates中的chat.html
200
- return templates.TemplateResponse("chat.html", {"request": request})
201
 
202
  @app.get("/debug", response_class=HTMLResponse)
203
  async def debug_sse_page(request: Request):
@@ -207,169 +168,100 @@ async def debug_sse_page(request: Request):
207
  async def download_file(file_path: str):
208
  if not os.path.exists(file_path):
209
  raise HTTPException(status_code=404, detail="File not found")
210
-
211
  return FileResponse(file_path, filename=os.path.basename(file_path))
212
 
213
-
214
  @app.post("/tasks")
215
  async def create_task(prompt: str = Body(..., embed=True)):
216
  task = task_manager.create_task(prompt)
217
  asyncio.create_task(run_task(task.id, prompt))
218
  return {"task_id": task.id}
219
 
 
 
 
220
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
221
 
222
  async def run_task(task_id: str, prompt: str):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
223
  try:
224
  task_manager.tasks[task_id].status = "running"
225
- logger.info("✨ Manus's thoughts: testing SSE event emission")
226
-
227
- # Attach SSE logger
228
- def make_sse_handler(task_id):
229
- handler = SSELogHandler(task_id)
230
-
231
- def sink(message):
232
- asyncio.create_task(handler(str(message)))
233
-
234
- return sink
235
-
236
  logger.add(make_sse_handler(task_id))
237
-
238
- # Simulate external call delay and emit result manually
239
  await asyncio.sleep(2)
240
  logger.info("🎉 Manus result: Hello from mock")
241
-
242
  await task_manager.update_task_step(task_id, 1, "Hello from mock", "result")
243
  await task_manager.complete_task(task_id)
244
-
 
245
  except Exception as e:
246
  await task_manager.fail_task(task_id, str(e))
247
 
248
- async def on_think(thought):
249
- await task_manager.update_task_step(task_id, 0, thought, "think")
250
-
251
- async def on_tool_execute(tool, input):
252
- await task_manager.update_task_step(
253
- task_id, 0, f"Executing tool: {tool}\nInput: {input}", "tool"
254
- )
255
-
256
- async def on_action(action):
257
- await task_manager.update_task_step(
258
- task_id, 0, f"Executing action: {action}", "act"
259
- )
260
-
261
- async def on_run(step, result):
262
- await task_manager.update_task_step(task_id, step, result, "run")
263
-
264
- class SSELogHandler:
265
- def __init__(self, task_id):
266
- self.task_id = task_id
267
-
268
- async def __call__(self, message):
269
- import re
270
-
271
- # Extract - Subsequent Content
272
- cleaned_message = re.sub(r"^.*? - ", "", message)
273
- cleaned_message = re.sub(r"^.*? - ", "", cleaned_message)
274
-
275
- event_type = "log"
276
- if "✨ Manus's thoughts:" in cleaned_message:
277
- event_type = "think"
278
- elif "🛠️ Manus selected" in cleaned_message:
279
- event_type = "tool"
280
- elif "🎯 Tool" in cleaned_message:
281
- event_type = "act"
282
- elif "📝 Oops!" in cleaned_message:
283
- event_type = "error"
284
- elif "🏁 Special tool" in cleaned_message:
285
- event_type = "complete"
286
- elif "🎉 Manus result:" in cleaned_message:
287
- event_type = "result"
288
- cleaned_message = cleaned_message.replace("🎉 Manus result:", "")
289
-
290
- await task_manager.update_task_step(
291
- self.task_id, 1, cleaned_message, event_type
292
- )
293
- return
294
-
295
- await task_manager.update_task_step(
296
- self.task_id, 0, cleaned_message, event_type
297
- )
298
-
299
- import re
300
- def has_log_prefix(message):
301
- # 检查字符串是否包含两个 "|" 且以 " - " 分割前缀和内容
302
- return re.match(r"^.*?\|.*?\|.*? - ", message) is not None
303
-
304
- async def call_manus(url: str, prompt: str):
305
- generate_kwargs = {
306
- "prompt": prompt,
307
- }
308
- async with aiohttp.ClientSession() as session:
309
- async with session.post(
310
- url=url,
311
- json=generate_kwargs,
312
- timeout=aiohttp.ClientTimeout(total=3600)
313
- ) as response:
314
- buffer = ""
315
- async for line in response.content:
316
- decode_line = line.decode('utf-8')
317
-
318
- if has_log_prefix(decode_line) and len(buffer)>0:
319
- logger.info(buffer)
320
- buffer = ""
321
- else:
322
- buffer += decode_line
323
-
324
- if buffer:
325
- logger.info(buffer)
326
-
327
- print("Calling external Manus API...")
328
- await call_manus(OPENMANUS_ENDPOINT_URL, prompt)
329
- print("Finished calling Manus API.")
330
-
331
- await task_manager.update_task_step(task_id, 1, "", "result")
332
- await task_manager.complete_task(task_id)
333
- except Exception as e:
334
- await task_manager.fail_task(task_id, str(e))
335
-
336
-
337
  @app.get("/tasks/{task_id}/events")
338
  async def task_events(task_id: str):
339
  async def event_generator():
340
  if task_id not in task_manager.queues:
341
  yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n"
342
  return
343
-
344
  queue = task_manager.queues[task_id]
345
-
346
  task = task_manager.tasks.get(task_id)
347
  if task:
348
  yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"
349
-
350
  while True:
351
  try:
352
  event = await queue.get()
353
  formatted_event = dumps(event)
354
-
355
  yield ": heartbeat\n\n"
356
-
357
- if event["type"] == "complete":
358
- yield f"event: complete\ndata: {formatted_event}\n\n"
359
- break
360
- elif event["type"] == "error":
361
- yield f"event: error\ndata: {formatted_event}\n\n"
362
  break
363
- elif event["type"] == "step":
364
  task = task_manager.tasks.get(task_id)
365
  if task:
366
  yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"
367
- yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
368
- elif event["type"] in ["think", "tool", "act", "run"]:
369
- yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
370
- else:
371
- yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
372
-
373
  except asyncio.CancelledError:
374
  print(f"Client disconnected for task {task_id}")
375
  break
@@ -377,28 +269,12 @@ async def task_events(task_id: str):
377
  print(f"Error in event stream: {str(e)}")
378
  yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
379
  break
380
-
381
- return StreamingResponse(
382
- event_generator(),
383
- media_type="text/event-stream",
384
- headers={
385
- "Cache-Control": "no-cache",
386
- "Connection": "keep-alive",
387
- "X-Accel-Buffering": "no",
388
- },
389
- )
390
-
391
 
392
  @app.get("/tasks")
393
  async def get_tasks():
394
- sorted_tasks = sorted(
395
- task_manager.tasks.values(), key=lambda task: task.created_at, reverse=True
396
- )
397
- return JSONResponse(
398
- content=[task.model_dump() for task in sorted_tasks],
399
- headers={"Content-Type": "application/json"},
400
- )
401
-
402
 
403
  @app.get("/tasks/{task_id}")
404
  async def get_task(task_id: str):
@@ -406,12 +282,9 @@ async def get_task(task_id: str):
406
  raise HTTPException(status_code=404, detail="Task not found")
407
  return task_manager.tasks[task_id]
408
 
409
-
410
  @app.exception_handler(Exception)
411
  async def generic_exception_handler(request: Request, exc: Exception):
412
- return JSONResponse(
413
- status_code=500, content={"message": f"Server error: {str(exc)}"}
414
- )
415
 
416
  if __name__ == "__main__":
417
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
32
  allow_headers=["*"],
33
  )
34
 
 
35
  class Task(BaseModel):
36
  id: str
37
  prompt: str
 
44
  data["created_at"] = self.created_at.isoformat()
45
  return data
46
 
 
47
  class TaskManager:
48
  def __init__(self):
49
  self.tasks = {}
 
58
  self.queues[task_id] = asyncio.Queue()
59
  return task
60
 
61
+ async def update_task_step(self, task_id: str, step: int, result: str, step_type: str = "step"):
 
 
62
  if task_id in self.tasks:
63
  task = self.tasks[task_id]
64
  task.steps.append({"step": step, "result": result, "type": step_type})
65
+ await self.queues[task_id].put({"type": step_type, "step": step, "result": result})
66
+ await self.queues[task_id].put({"type": "status", "status": task.status, "steps": task.steps})
 
 
 
 
67
 
68
  async def complete_task(self, task_id: str):
69
  if task_id in self.tasks:
70
  task = self.tasks[task_id]
71
  task.status = "completed"
72
+ await self.queues[task_id].put({"type": "status", "status": task.status, "steps": task.steps})
 
 
73
  await self.queues[task_id].put({"type": "complete"})
74
 
75
  async def fail_task(self, task_id: str, error: str):
 
77
  self.tasks[task_id].status = f"failed: {error}"
78
  await self.queues[task_id].put({"type": "error", "message": error})
79
 
 
80
  task_manager = TaskManager()
81
 
82
  def make_sse_handler(task_id):
83
+ handler = SSELogHandler(task_id)
84
+ def sink(message):
85
+ asyncio.create_task(handler(str(message)))
86
+ return sink
 
 
87
 
88
  def get_available_themes():
 
89
  themes_dir = "static/themes"
90
  if not os.path.exists(themes_dir):
91
  return [{"id": "openmanus", "name": "Manus", "description": "默认主题"}]
 
94
  for item in os.listdir(themes_dir):
95
  theme_path = os.path.join(themes_dir, item)
96
  if os.path.isdir(theme_path):
 
97
  templates_dir = os.path.join(theme_path, "templates")
98
  static_dir = os.path.join(theme_path, "static")
99
  config_file = os.path.join(theme_path, "theme.json")
 
101
  if os.path.exists(templates_dir) and os.path.exists(static_dir):
102
  if os.path.exists(os.path.join(templates_dir, "chat.html")):
103
  theme_info = {"id": item, "name": item, "description": ""}
 
 
104
  if os.path.exists(config_file):
105
  try:
106
  with open(config_file, "r", encoding="utf-8") as f:
107
  config = json.load(f)
108
  theme_info["name"] = config.get("name", item)
109
+ theme_info["description"] = config.get("description", "")
 
 
110
  except Exception as e:
111
  print(f"读取主题配置文件出错: {str(e)}")
 
112
  themes.append(theme_info)
113
 
114
+ if not any(theme["id"] == "openmanus" for theme in themes):
 
 
115
  themes.append({"id": "openmanus", "name": "Manus", "description": "默认主题"})
116
 
117
  return themes
118
 
 
119
  @app.get("/", response_class=HTMLResponse)
120
  async def index(request: Request):
 
121
  themes = get_available_themes()
 
 
122
  sorted_themes = []
123
  normal_theme = None
124
  cyberpunk_theme = None
 
132
  else:
133
  other_themes.append(theme)
134
 
 
135
  if normal_theme:
136
  sorted_themes.append(normal_theme)
137
  if cyberpunk_theme:
138
  sorted_themes.append(cyberpunk_theme)
139
  sorted_themes.extend(other_themes)
140
 
141
+ return templates.TemplateResponse("index.html", {"request": request, "themes": sorted_themes})
 
 
 
142
 
143
  @app.get("/chat", response_class=HTMLResponse)
144
  async def chat(request: Request):
145
  theme = request.query_params.get("theme", "openmanus")
 
146
  theme_chat_path = f"static/themes/{theme}/templates/chat.html"
147
  if os.path.exists(theme_chat_path):
148
  with open(theme_chat_path, "r", encoding="utf-8") as f:
149
  content = f.read()
 
 
150
  theme_config_path = f"static/themes/{theme}/theme.json"
151
  theme_name = theme
152
  if os.path.exists(theme_config_path):
 
156
  theme_name = config.get("name", theme)
157
  except Exception:
158
  pass
159
+ content = content.replace("<title>Manus</title>", f"<title>Manus - {theme_name}</title>")
 
 
 
 
160
  return HTMLResponse(content=content)
161
+ return templates.TemplateResponse("chat.html", {"request": request})
 
 
162
 
163
  @app.get("/debug", response_class=HTMLResponse)
164
  async def debug_sse_page(request: Request):
 
168
  async def download_file(file_path: str):
169
  if not os.path.exists(file_path):
170
  raise HTTPException(status_code=404, detail="File not found")
 
171
  return FileResponse(file_path, filename=os.path.basename(file_path))
172
 
 
173
  @app.post("/tasks")
174
  async def create_task(prompt: str = Body(..., embed=True)):
175
  task = task_manager.create_task(prompt)
176
  asyncio.create_task(run_task(task.id, prompt))
177
  return {"task_id": task.id}
178
 
179
+ class SSELogHandler:
180
+ def __init__(self, task_id):
181
+ self.task_id = task_id
182
 
183
+ async def __call__(self, message):
184
+ import re
185
+ cleaned_message = re.sub(r"^.*? - ", "", message)
186
+ cleaned_message = re.sub(r"^.*? - ", "", cleaned_message)
187
+ event_type = "log"
188
+ if "✨ Manus's thoughts:" in cleaned_message:
189
+ event_type = "think"
190
+ elif "🛠️ Manus selected" in cleaned_message:
191
+ event_type = "tool"
192
+ elif "🎯 Tool" in cleaned_message:
193
+ event_type = "act"
194
+ elif "📝 Oops!" in cleaned_message:
195
+ event_type = "error"
196
+ elif "🏁 Special tool" in cleaned_message:
197
+ event_type = "complete"
198
+ elif "🎉 Manus result:" in cleaned_message:
199
+ event_type = "result"
200
+ cleaned_message = cleaned_message.replace("🎉 Manus result:", "")
201
+ await task_manager.update_task_step(self.task_id, 1, cleaned_message, event_type)
202
+ return
203
+ await task_manager.update_task_step(self.task_id, 0, cleaned_message, event_type)
204
 
205
  async def run_task(task_id: str, prompt: str):
206
+ def has_log_prefix(message):
207
+ import re
208
+ return re.match(r"^.*?\|.*?\|.*? - ", message) is not None
209
+
210
+ async def call_manus(url: str, prompt: str):
211
+ generate_kwargs = {"prompt": prompt}
212
+ async with aiohttp.ClientSession() as session:
213
+ async with session.post(
214
+ url=url,
215
+ json=generate_kwargs,
216
+ timeout=aiohttp.ClientTimeout(total=3600)
217
+ ) as response:
218
+ buffer = ""
219
+ async for line in response.content:
220
+ decode_line = line.decode('utf-8')
221
+ if has_log_prefix(decode_line) and len(buffer) > 0:
222
+ logger.info(buffer)
223
+ buffer = ""
224
+ else:
225
+ buffer += decode_line
226
+ if buffer:
227
+ logger.info(buffer)
228
+
229
  try:
230
  task_manager.tasks[task_id].status = "running"
 
 
 
 
 
 
 
 
 
 
 
231
  logger.add(make_sse_handler(task_id))
232
+ logger.info("✨ Manus's thoughts: testing SSE event emission")
 
233
  await asyncio.sleep(2)
234
  logger.info("🎉 Manus result: Hello from mock")
 
235
  await task_manager.update_task_step(task_id, 1, "Hello from mock", "result")
236
  await task_manager.complete_task(task_id)
237
+ # Uncomment for real call:
238
+ # await call_manus(OPENMANUS_ENDPOINT_URL, prompt)
239
  except Exception as e:
240
  await task_manager.fail_task(task_id, str(e))
241
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
242
  @app.get("/tasks/{task_id}/events")
243
  async def task_events(task_id: str):
244
  async def event_generator():
245
  if task_id not in task_manager.queues:
246
  yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n"
247
  return
 
248
  queue = task_manager.queues[task_id]
 
249
  task = task_manager.tasks.get(task_id)
250
  if task:
251
  yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"
 
252
  while True:
253
  try:
254
  event = await queue.get()
255
  formatted_event = dumps(event)
 
256
  yield ": heartbeat\n\n"
257
+ if event["type"] in ["complete", "error"]:
258
+ yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
 
 
 
 
259
  break
260
+ if event["type"] == "step":
261
  task = task_manager.tasks.get(task_id)
262
  if task:
263
  yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"
264
+ yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
 
 
 
 
 
265
  except asyncio.CancelledError:
266
  print(f"Client disconnected for task {task_id}")
267
  break
 
269
  print(f"Error in event stream: {str(e)}")
270
  yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
271
  break
272
+ return StreamingResponse(event_generator(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"})
 
 
 
 
 
 
 
 
 
 
273
 
274
  @app.get("/tasks")
275
  async def get_tasks():
276
+ sorted_tasks = sorted(task_manager.tasks.values(), key=lambda task: task.created_at, reverse=True)
277
+ return JSONResponse(content=[task.model_dump() for task in sorted_tasks], headers={"Content-Type": "application/json"})
 
 
 
 
 
 
278
 
279
  @app.get("/tasks/{task_id}")
280
  async def get_task(task_id: str):
 
282
  raise HTTPException(status_code=404, detail="Task not found")
283
  return task_manager.tasks[task_id]
284
 
 
285
  @app.exception_handler(Exception)
286
  async def generic_exception_handler(request: Request, exc: Exception):
287
+ return JSONResponse(status_code=500, content={"message": f"Server error: {str(exc)}"})
 
 
288
 
289
  if __name__ == "__main__":
290
+ uvicorn.run(app, host="0.0.0.0", port=7860)