# Manus3 / app.py
# Ken Sang Tang
# Update app.py
# 3f67da4 verified
from fastapi import FastAPI, Body, Request, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.requests import Request
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from loguru import logger
import aiohttp
import uvicorn
import asyncio
import os
import uuid
import toml
import time
from datetime import datetime
from json import dumps
# Resolve the OpenManus endpoint. The environment variable takes priority;
# when it is unset or empty, fall back to the same key in config/config.toml
# (only if that file exists).
OPENMANUS_ENDPOINT_URL = os.getenv("OPENMANUS_ENDPOINT_URL")
if not OPENMANUS_ENDPOINT_URL:
    config_path = "config/config.toml"
    if os.path.exists(config_path):
        config = toml.load(config_path)
        OPENMANUS_ENDPOINT_URL = config.get("OPENMANUS_ENDPOINT_URL")

# Neither source produced a value: refuse to start.
if not OPENMANUS_ENDPOINT_URL:
    raise EnvironmentError("OPENMANUS_ENDPOINT_URL must be set in env or config/config.toml")
# Application object that every route below is registered on.
app = FastAPI()

# NOTE(review): wildcard origins, methods and headers combined with
# allow_credentials=True is a very permissive CORS policy — confirm this is
# intended outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Template loader used by the homepage route; templates live in ./templates.
templates = Jinja2Templates(directory="templates")

# Serve files from ./static under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
class Task(BaseModel):
    """A submitted prompt together with its progress.

    ``steps`` holds the step dictionaries appended while the task runs.
    """

    id: str
    prompt: str
    created_at: datetime
    status: str
    steps: list = []

    def model_dump(self, *args, **kwargs):
        """Return the model as a dict with ``created_at`` as an ISO 8601 string.

        All arguments are forwarded unchanged to ``BaseModel.model_dump``;
        the ``created_at`` entry is then set to ``self.created_at.isoformat()``.
        """
        return {
            **super().model_dump(*args, **kwargs),
            "created_at": self.created_at.isoformat(),
        }
class TaskManager:
    """In-memory registry of tasks and their per-task event queues.

    Each task created through :meth:`create_task` gets its own
    ``asyncio.Queue``. The coroutines below push event dictionaries onto
    that queue. Entries are never removed by this class.
    """

    def __init__(self):
        # task_id -> Task
        self.tasks = {}
        # task_id -> asyncio.Queue of event dicts
        self.queues = {}

    @staticmethod
    def _status_event(task) -> dict:
        """Build a ``status`` event from the task's current state.

        ``steps`` is copied so that an event already sitting in the queue
        does not change when further steps are appended to the task later.
        """
        return {"type": "status", "status": task.status, "steps": list(task.steps)}

    def create_task(self, prompt: str) -> "Task":
        """Register a new ``pending`` task for *prompt* and return it."""
        task_id = str(uuid.uuid4())
        task = Task(
            id=task_id, prompt=prompt, created_at=datetime.now(), status="pending"
        )
        self.tasks[task_id] = task
        self.queues[task_id] = asyncio.Queue()
        return task

    async def update_task_step(self, task_id: str, step: int, result: str, step_type: str = "step"):
        """Record a step on the task and publish it.

        Two events are queued: the step itself (typed *step_type*) followed
        by a ``status`` snapshot. Unknown task ids are ignored.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return
        task.steps.append({"step": step, "result": result, "type": step_type})
        queue = self.queues[task_id]
        await queue.put({"type": step_type, "step": step, "result": result})
        await queue.put(self._status_event(task))

    async def complete_task(self, task_id: str):
        """Mark the task ``completed`` and publish ``status`` then ``complete``.

        Unknown task ids are ignored.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return
        task.status = "completed"
        queue = self.queues[task_id]
        await queue.put(self._status_event(task))
        await queue.put({"type": "complete"})

    async def fail_task(self, task_id: str, error: str):
        """Mark the task as failed with *error* and publish an ``error`` event.

        Unknown task ids are ignored.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return
        task.status = f"failed: {error}"
        await self.queues[task_id].put({"type": "error", "message": error})


# Single shared manager used by the route handlers in this module.
task_manager = TaskManager()
# Strong references to in-flight background jobs. The event loop itself only
# keeps weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_background_tasks = set()


@app.post("/tasks")
async def create_task(prompt: str = Body(..., embed=True)):
    """Create a task for *prompt* and start processing it in the background.

    The request body is a JSON object with a ``prompt`` field. The response
    is a JSON object carrying the new task's ``task_id``; processing
    continues after the response is returned.
    """
    task = task_manager.create_task(prompt)
    background = asyncio.create_task(run_task(task.id, prompt))
    # Hold the task until it is done, then drop the reference.
    _background_tasks.add(background)
    background.add_done_callback(_background_tasks.discard)
    return {"task_id": task.id}
async def run_task(task_id: str, prompt: str):
    """Run the (currently simulated) processing for one task.

    Sets the task's status to ``running``, sleeps for two seconds, records a
    single ``result`` step echoing the prompt and then completes the task.
    Any exception raised along the way is logged and reported through
    ``task_manager.fail_task``.
    """
    try:
        logger.info(f"Simulating task: {task_id} with prompt: {prompt}")
        task_manager.tasks[task_id].status = "running"

        # Stand-in for real work.
        await asyncio.sleep(2)

        await task_manager.update_task_step(
            task_id,
            0,
            f"Simulated response for prompt: '{prompt}'",
            "result",
        )
        await task_manager.complete_task(task_id)
    except Exception as exc:
        logger.error(f"Simulated task failed: {exc}")
        await task_manager.fail_task(task_id, str(exc))
@app.get("/tasks/{task_id}/events")
async def task_events(task_id: str):
    """Stream a task's events to the client as Server-Sent Events.

    The stream opens with the task's current status, then relays every
    event taken from the task's queue. A comment-only heartbeat frame is
    sent whenever no event arrives within five seconds. The stream ends
    after a ``complete`` or ``error`` event. For an unknown task id the
    stream consists of a single ``error`` event.
    """
    logger.info(f"Client subscribed to events for task: {task_id}")

    async def event_generator():
        if task_id not in task_manager.queues:
            yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n"
            return

        queue = task_manager.queues[task_id]
        task = task_manager.tasks.get(task_id)
        if task:
            # Let a freshly connected client see the current state at once.
            yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"

        while True:
            try:
                try:
                    # Wait up to 5 seconds for the next event.
                    event = await asyncio.wait_for(queue.get(), timeout=5.0)
                except asyncio.TimeoutError:
                    # Nothing arrived: send a heartbeat to keep the connection alive.
                    yield ": heartbeat\n\n"
                    continue

                yield f"event: {event['type']}\ndata: {dumps(event)}\n\n"
                if event["type"] in ("complete", "error"):
                    break
            except asyncio.CancelledError:
                break
            except Exception as e:
                yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
                break

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
@app.get("/", response_class=HTMLResponse)
async def homepage(request: Request):
    """Render the ``index.html`` template for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)