# app.py
"""Web front end that forwards prompts to an OpenManus endpoint.

Tasks are created over HTTP, executed in background asyncio tasks, and their
progress is streamed to clients as Server-Sent Events (SSE).
"""
import asyncio
import json
import os
import re
import uuid
from datetime import datetime
from json import dumps

from fastapi import Body, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from loguru import logger
import uvicorn
import aiohttp
import toml

app = FastAPI()

# The endpoint URL is read from the environment first, then from
# config/config.toml; startup fails if neither provides it.
OPENMANUS_ENDPOINT_URL = os.getenv("OPENMANUS_ENDPOINT_URL")
if not OPENMANUS_ENDPOINT_URL:
    config_path = "config/config.toml"
    if os.path.exists(config_path):
        config = toml.load(config_path)
        OPENMANUS_ENDPOINT_URL = config.get("OPENMANUS_ENDPOINT_URL")
if not OPENMANUS_ENDPOINT_URL:
    raise EnvironmentError("OPENMANUS_ENDPOINT_URL must be set in env or config/config.toml")

app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Strips everything up to and including the first " - " of a formatted log
# line, leaving only the text that follows it.
_LOG_PREFIX_RE = re.compile(r"^.*? - ")

# Events after which an SSE stream is closed.
_TERMINAL_EVENTS = ("complete", "error")

# Strong references to in-flight background tasks. The event loop only keeps
# weak references, so a task whose result is discarded may be garbage
# collected before it finishes.
_background_tasks = set()


class Task(BaseModel):
    """A single prompt submitted by a client, plus its recorded progress."""

    id: str
    prompt: str
    created_at: datetime
    status: str
    steps: list = []

    def model_dump(self, *args, **kwargs):
        """Dump the model with ``created_at`` rendered as an ISO-8601 string."""
        data = super().model_dump(*args, **kwargs)
        data["created_at"] = self.created_at.isoformat()
        return data


class TaskManager:
    """In-memory registry of tasks and their per-task event queues."""

    def __init__(self):
        self.tasks = {}   # task id -> Task
        self.queues = {}  # task id -> asyncio.Queue of event dicts

    def create_task(self, prompt: str) -> Task:
        """Register a new pending task for *prompt* and return it."""
        task_id = str(uuid.uuid4())
        task = Task(
            id=task_id, prompt=prompt, created_at=datetime.now(), status="pending"
        )
        self.tasks[task_id] = task
        self.queues[task_id] = asyncio.Queue()
        return task

    async def update_task_step(
        self, task_id: str, step: int, result: str, step_type: str = "step"
    ):
        """Record a step on the task and queue the step plus a status event.

        Unknown task ids are ignored.
        """
        if task_id not in self.tasks:
            return
        task = self.tasks[task_id]
        task.steps.append({"step": step, "result": result, "type": step_type})
        queue = self.queues[task_id]
        await queue.put({"type": step_type, "step": step, "result": result})
        await queue.put(
            {"type": "status", "status": task.status, "steps": task.steps}
        )

    async def complete_task(self, task_id: str):
        """Mark the task completed and queue a status and a ``complete`` event.

        Unknown task ids are ignored.
        """
        if task_id not in self.tasks:
            return
        task = self.tasks[task_id]
        task.status = "completed"
        queue = self.queues[task_id]
        await queue.put(
            {"type": "status", "status": task.status, "steps": task.steps}
        )
        await queue.put({"type": "complete"})

    async def fail_task(self, task_id: str, error: str):
        """Mark the task failed with *error* and queue an ``error`` event.

        Unknown task ids are ignored.
        """
        if task_id not in self.tasks:
            return
        self.tasks[task_id].status = f"failed: {error}"
        await self.queues[task_id].put({"type": "error", "message": error})


task_manager = TaskManager()


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the landing page template."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/chat", response_class=HTMLResponse)
async def chat(request: Request):
    """Render the chat page template."""
    return templates.TemplateResponse("chat.html", {"request": request})


@app.post("/tasks")
async def create_task(prompt: str = Body(..., embed=True)):
    """Create a task for *prompt*, start it in the background, return its id."""
    task = task_manager.create_task(prompt)
    background = asyncio.create_task(run_task(task.id, prompt))
    # Hold a reference until the task is done so it cannot be collected
    # mid-execution; drop it afterwards so the set does not grow forever.
    _background_tasks.add(background)
    background.add_done_callback(_background_tasks.discard)
    return {"task_id": task.id}


@app.get("/tasks")
async def get_tasks():
    """Return all known tasks as JSON, newest first."""
    sorted_tasks = sorted(
        task_manager.tasks.values(), key=lambda task: task.created_at, reverse=True
    )
    return JSONResponse(
        content=[task.model_dump() for task in sorted_tasks],
        headers={"Content-Type": "application/json"},
    )


@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    """Return a single task.

    Raises:
        HTTPException: 404 if *task_id* is unknown.
    """
    if task_id not in task_manager.tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_manager.tasks[task_id]


@app.get("/tasks/{task_id}/events")
async def task_events(task_id: str):
    """Stream the task's queued events to the client as Server-Sent Events.

    The stream opens with the task's current status, then relays each queued
    event under its own ``type`` as the SSE event name, and closes after a
    ``complete`` or ``error`` event. An unknown task id yields a single
    ``error`` event.
    """

    async def event_generator():
        if task_id not in task_manager.queues:
            yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n"
            return

        queue = task_manager.queues[task_id]
        task = task_manager.tasks.get(task_id)
        if task:
            yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"

        while True:
            try:
                event = await queue.get()
                formatted_event = dumps(event)

                # SSE comment line sent ahead of every event.
                yield ": heartbeat\n\n"

                event_type = event["type"]
                yield f"event: {event_type}\ndata: {formatted_event}\n\n"
                if event_type in _TERMINAL_EVENTS:
                    break
            except asyncio.CancelledError:
                break
            except Exception as e:
                yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
                break

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


@app.get("/download")
async def download_file(file_path: str):
    """Send the file at *file_path* as a download.

    Raises:
        HTTPException: 404 if *file_path* is not an existing regular file.
    """
    # SECURITY NOTE(review): file_path comes straight from the query string
    # and is not confined to any directory, so any file readable by this
    # process can be downloaded. Restrict it to a dedicated download root
    # before exposing this service to untrusted clients.
    #
    # isfile() rather than exists(): a directory path would pass exists() and
    # then fail inside FileResponse instead of producing a 404.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path, filename=os.path.basename(file_path))


async def _call_manus(url: str, prompt: str, log):
    """POST *prompt* to *url* and log the whole response body via *log*.

    Nothing is logged when the body is empty.
    NOTE(review): the HTTP status is not inspected, so an error response body
    is logged like a successful one — confirm this is intended.
    """
    generate_kwargs = {"prompt": prompt}
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url=url, json=generate_kwargs, timeout=aiohttp.ClientTimeout(total=3600)
        ) as response:
            # Collect raw lines and decode once instead of growing a string.
            chunks = [line async for line in response.content]
            body = b"".join(chunks).decode("utf-8")
            if body:
                log.info(body)


async def run_task(task_id: str, prompt: str):
    """Run one task: call the OpenManus endpoint and relay its output.

    The endpoint response is emitted through a logger bound to this task; a
    temporary log sink forwards those messages to the task's event queue.
    Any exception marks the task as failed instead of propagating.
    """
    sink_id = None
    try:
        task_manager.tasks[task_id].status = "running"

        async def forward_log(message):
            """Turn a formatted log line into a task step."""
            cleaned_message = _LOG_PREFIX_RE.sub("", message)
            if "Manus result:" in cleaned_message:
                cleaned_message = cleaned_message.replace("Manus result:", "")
                await task_manager.update_task_step(
                    task_id, 1, cleaned_message, "result"
                )
                return
            await task_manager.update_task_step(task_id, 0, cleaned_message, "log")

        # Only accept records bound to this task; otherwise output logged for
        # one task would be appended to every other task that is running.
        sink_id = logger.add(
            forward_log,
            filter=lambda record: record["extra"].get("task_id") == task_id,
        )
        task_logger = logger.bind(task_id=task_id)

        await _call_manus(OPENMANUS_ENDPOINT_URL, prompt, task_logger)

        # Coroutine sinks run as scheduled tasks; wait for them so the
        # forwarded output is queued before the final result/complete events.
        await logger.complete()

        await task_manager.update_task_step(task_id, 1, "", "result")
        await task_manager.complete_task(task_id)
    except Exception as e:
        logger.exception("Task {} failed", task_id)
        await task_manager.fail_task(task_id, str(e))
    finally:
        # Always detach the per-task sink; otherwise one sink per task would
        # stay registered on the global logger for the process lifetime.
        if sink_id is not None:
            logger.remove(sink_id)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)