Spaces:
Runtime error
Runtime error
| # app.py | |
| import asyncio | |
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| from json import dumps | |
| from fastapi import Body, FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from pydantic import BaseModel | |
| from loguru import logger | |
| import uvicorn | |
| import aiohttp | |
| import toml | |
# Application instance that the static mount and middleware below attach to.
app = FastAPI()

# Resolve the upstream OpenManus endpoint: the environment variable is tried
# first, then the top-level OPENMANUS_ENDPOINT_URL key of config/config.toml.
# Import fails with EnvironmentError when neither provides a non-empty value.
OPENMANUS_ENDPOINT_URL = os.getenv("OPENMANUS_ENDPOINT_URL")
if not OPENMANUS_ENDPOINT_URL:
    config_path = "config/config.toml"
    if os.path.exists(config_path):
        config = toml.load(config_path)
        OPENMANUS_ENDPOINT_URL = config.get("OPENMANUS_ENDPOINT_URL")
    if not OPENMANUS_ENDPOINT_URL:
        raise EnvironmentError("OPENMANUS_ENDPOINT_URL must be set in env or config/config.toml")

# Files from ./static are served under /static; templates load from ./templates.
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# NOTE(review): every origin, method and header is allowed and credentials are
# enabled at the same time -- confirm this permissive CORS policy is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class Task(BaseModel):
    """One submitted prompt and the progress recorded for it.

    ``status`` is a free-form string and ``steps`` collects the step records
    appended while the task runs.
    """

    id: str
    prompt: str
    created_at: datetime
    status: str
    steps: list = []

    def model_dump(self, *args, **kwargs):
        """Dump the model to a dict, with ``created_at`` as an ISO-8601 string.

        All arguments are forwarded unchanged to ``BaseModel.model_dump``; the
        ``created_at`` entry is then overwritten (or added) so the result can
        be passed straight to a JSON encoder.
        """
        return {
            **super().model_dump(*args, **kwargs),
            "created_at": self.created_at.isoformat(),
        }
class TaskManager:
    """In-memory registry of tasks and of the event queue belonging to each.

    Producers push event dicts onto a task's queue through the ``async``
    methods below; a consumer drains the queue to stream progress. Calls that
    name an unknown task id are silently ignored.
    """

    def __init__(self):
        self.tasks = {}   # task id -> Task
        self.queues = {}  # task id -> asyncio.Queue of event dicts

    @staticmethod
    def _status_event(task) -> dict:
        """Build a status event holding a snapshot of the task's steps.

        The list is copied so that an event already sitting in the queue does
        not change when further steps are appended before it is consumed.
        """
        return {"type": "status", "status": task.status, "steps": list(task.steps)}

    def create_task(self, prompt: str) -> "Task":
        """Register a new ``pending`` task for *prompt* and return it.

        A fresh UUID4 string is used as the id and an empty event queue is
        created under the same id.
        """
        task_id = str(uuid.uuid4())
        task = Task(
            id=task_id, prompt=prompt, created_at=datetime.now(), status="pending"
        )
        self.tasks[task_id] = task
        self.queues[task_id] = asyncio.Queue()
        return task

    async def update_task_step(
        self, task_id: str, step: int, result: str, step_type: str = "step"
    ):
        """Record one step on the task and publish it.

        Two events are queued in order: the step itself (typed *step_type*)
        followed by a status event carrying all steps recorded so far.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return
        task.steps.append({"step": step, "result": result, "type": step_type})
        queue = self.queues[task_id]
        await queue.put({"type": step_type, "step": step, "result": result})
        await queue.put(self._status_event(task))

    async def complete_task(self, task_id: str):
        """Mark the task ``completed`` and queue a final status + ``complete`` event."""
        task = self.tasks.get(task_id)
        if task is None:
            return
        task.status = "completed"
        queue = self.queues[task_id]
        await queue.put(self._status_event(task))
        await queue.put({"type": "complete"})

    async def fail_task(self, task_id: str, error: str):
        """Set the status to ``failed: <error>`` and queue an ``error`` event."""
        task = self.tasks.get(task_id)
        if task is None:
            return
        task.status = f"failed: {error}"
        await self.queues[task_id].put({"type": "error", "message": error})


# Process-wide registry shared by the request handlers and the task runner.
task_manager = TaskManager()
async def index(request: Request):
    """Render the ``index.html`` template for *request*."""
    # NOTE(review): no route decorator is visible on this handler -- confirm
    # how it is registered with the app.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
async def chat(request: Request):
    """Render the ``chat.html`` template for *request*."""
    # NOTE(review): no route decorator is visible on this handler -- confirm
    # how it is registered with the app.
    context = {"request": request}
    return templates.TemplateResponse("chat.html", context)
# Strong references to in-flight background jobs. The event loop itself only
# keeps weak references to tasks, so a job whose handle is dropped can be
# garbage-collected before it finishes.
_background_tasks = set()


async def create_task(prompt: str = Body(..., embed=True)):
    """Register a task for *prompt*, start it in the background, return its id.

    The prompt is read from the JSON body under the ``prompt`` key. The job is
    scheduled on the running event loop and this handler returns immediately
    with ``{"task_id": <id>}``; it does not wait for the job to finish.
    """
    # NOTE(review): no route decorator is visible on this handler -- confirm
    # how it is registered with the app.
    task = task_manager.create_task(prompt)
    job = asyncio.create_task(run_task(task.id, prompt))
    # Hold the job until it is done, then let the callback release it.
    _background_tasks.add(job)
    job.add_done_callback(_background_tasks.discard)
    return {"task_id": task.id}
async def get_tasks():
    """Return every known task as a JSON array, most recently created first."""
    newest_first = list(task_manager.tasks.values())
    newest_first.sort(key=lambda item: item.created_at, reverse=True)
    payload = [item.model_dump() for item in newest_first]
    return JSONResponse(
        content=payload,
        headers={"Content-Type": "application/json"},
    )
async def get_task(task_id: str):
    """Return the task stored under *task_id*.

    Raises:
        HTTPException: 404 with detail ``Task not found`` for an unknown id.
    """
    try:
        return task_manager.tasks[task_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Task not found") from None
async def task_events(task_id: str):
    """Stream the events of one task as Server-Sent Events.

    The stream opens with the task's current status, then relays each event
    taken from the task's queue as ``event: <type>`` / ``data: <json>``. It
    ends after a ``complete`` or ``error`` event. An unknown *task_id* yields
    a single ``error`` event. While the queue is idle an SSE comment line is
    sent periodically so the connection is not treated as dead.
    """
    heartbeat_interval = 15.0  # seconds of silence before a keep-alive comment
    heartbeat = ": heartbeat\n\n"

    async def event_generator():
        if task_id not in task_manager.queues:
            yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n"
            return

        queue = task_manager.queues[task_id]
        task = task_manager.tasks.get(task_id)
        if task:
            yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"

        while True:
            try:
                try:
                    event = await asyncio.wait_for(
                        queue.get(), timeout=heartbeat_interval
                    )
                except asyncio.TimeoutError:
                    # Nothing was produced in time: keep the stream alive.
                    yield heartbeat
                    continue

                formatted_event = dumps(event)
                # The comment line sent ahead of every event is kept so the
                # wire format for existing clients does not change.
                yield heartbeat
                yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
                if event["type"] in ("complete", "error"):
                    break
            except asyncio.CancelledError:
                break
            except Exception as e:
                yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
                break

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
async def download_file(file_path: str):
    """Send the file at *file_path* as a download named after its basename.

    Raises:
        HTTPException: 404 when *file_path* is not an existing regular file.
    """
    # NOTE(review): file_path is used verbatim, so any file the server process
    # can read may be requested. If this handler is reachable by untrusted
    # clients, restrict it to an agreed download directory.
    # isfile() rather than exists(): a directory would pass exists() and then
    # fail inside FileResponse instead of producing a clean 404.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path, filename=os.path.basename(file_path))
async def run_task(task_id: str, prompt: str):
    """Send *prompt* to the OpenManus endpoint and publish the outcome.

    The task is marked ``running`` and a temporary loguru sink turns log
    records into task events. The prompt is POSTed to
    ``OPENMANUS_ENDPOINT_URL`` and the whole response body is logged once it
    has been read. Afterwards an empty ``result`` step is recorded and the
    task is completed. Any ``Exception`` raised along the way marks the task
    as failed with the exception text; nothing is re-raised.
    """
    import re  # local import: only this function needs it

    log_prefix = re.compile(r"^.*? - ")  # everything up to the first " - "
    result_marker = "Manus result:"

    class SSELogHandler:
        """Loguru sink that forwards each record to the task's event queue."""

        def __init__(self, task_id):
            self.task_id = task_id

        async def __call__(self, message):
            cleaned_message = log_prefix.sub("", message)
            if result_marker in cleaned_message:
                # Result lines become step 1 / "result" with the marker removed.
                await task_manager.update_task_step(
                    self.task_id,
                    1,
                    cleaned_message.replace(result_marker, ""),
                    "result",
                )
            else:
                await task_manager.update_task_step(
                    self.task_id, 0, cleaned_message, "log"
                )

    async def call_manus(url: str, prompt: str):
        """POST the prompt and log the full response body if it is non-empty."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url=url,
                json={"prompt": prompt},
                timeout=aiohttp.ClientTimeout(total=3600),
            ) as response:
                chunks = [line.decode("utf-8") async for line in response.content]
        body = "".join(chunks)
        if body:
            logger.info(body)

    try:
        task_manager.tasks[task_id].status = "running"
        # NOTE(review): the logger is process-wide, so while this sink is
        # installed it also sees records logged on behalf of other tasks.
        sink_id = logger.add(SSELogHandler(task_id))
        try:
            await call_manus(OPENMANUS_ENDPOINT_URL, prompt)
        finally:
            # Let pending async sink calls finish so their events are queued
            # ahead of the closing events, then detach the sink so it does not
            # outlive the task.
            await logger.complete()
            logger.remove(sink_id)
        await task_manager.update_task_step(task_id, 1, "", "result")
        await task_manager.complete_task(task_id)
    except Exception as e:
        await task_manager.fail_task(task_id, str(e))
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)