File size: 6,531 Bytes
81ff144 bea04ab 81ff144 bea04ab 81ff144 aeb2234 81ff144 aeb2234 bea04ab 81ff144 ffac2f3 81ff144 0cb1aa7 81ff144 0cb1aa7 81ff144 bea04ab 81ff144 ffac2f3 81ff144 0797c65 81ff144 4aa9189 0797c65 4aa9189 81ff144 ffac2f3 81ff144 bea04ab 81ff144 bea04ab 81ff144 bea04ab | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, Response
import asyncio
import logging
import os
import json
from pathlib import Path
from dotenv import load_dotenv
import sentry_sdk
from services.orchestrator_service import orchestrator_service
from services.infrastructure_service import infrastructure_service
from services.config import settings
from worker import AubmWorker
def _load_app_version() -> str:
version_file = Path(__file__).resolve().parent.parent / "VERSION"
if version_file.exists():
value = version_file.read_text(encoding="utf-8").strip()
if value:
return value
return os.getenv("APP_VERSION", "0.7.0")
# Load environment variables
load_dotenv()
FRONTEND_DIST = Path(__file__).resolve().parent.parent / "frontend" / "dist"
APP_VERSION = _load_app_version()
logger = logging.getLogger("aubm.api")
embedded_worker: AubmWorker | None = None
embedded_worker_task: asyncio.Task | None = None
# Sentry Initialization
SENTRY_DSN = os.getenv("SENTRY_DSN")
if SENTRY_DSN:
sentry_sdk.init(
dsn=SENTRY_DSN,
traces_sample_rate=1.0,
profiles_sample_rate=1.0,
)
app = FastAPI(
title="Aubm API",
description="Enterprise-Grade AI Agent Orchestration & Collaboration Platform",
version=APP_VERSION
)
# CORS Configuration
allowed_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:5173,http://localhost:3000,http://127.0.0.1:5173").split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins if allowed_origins != ["*"] else ["*"],
allow_origin_regex=os.getenv("ALLOWED_ORIGIN_REGEX"),
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def _log_embedded_worker_result(task: asyncio.Task) -> None:
if task.cancelled():
return
exc = task.exception()
if exc:
logger.error(
"Embedded worker stopped unexpectedly",
exc_info=(type(exc), exc, exc.__traceback__),
)
@app.on_event("startup")
async def start_embedded_worker() -> None:
global embedded_worker, embedded_worker_task
if settings.TASK_EXECUTION_MODE != "queue" or not settings.TASK_QUEUE_EMBEDDED_WORKER:
return
if embedded_worker_task and not embedded_worker_task.done():
return
embedded_worker = AubmWorker()
embedded_worker_task = asyncio.create_task(embedded_worker.start())
embedded_worker_task.add_done_callback(_log_embedded_worker_result)
logger.info("Embedded task worker started: %s", embedded_worker.worker_id)
@app.on_event("shutdown")
async def stop_embedded_worker() -> None:
global embedded_worker, embedded_worker_task
if not embedded_worker or not embedded_worker_task:
return
embedded_worker.stop()
try:
await asyncio.wait_for(embedded_worker_task, timeout=10)
await embedded_worker.heartbeat("stopping")
except asyncio.TimeoutError:
embedded_worker_task.cancel()
logger.warning("Embedded task worker did not stop before timeout")
finally:
embedded_worker = None
embedded_worker_task = None
@app.get("/")
async def root():
index_path = FRONTEND_DIST / "index.html"
if index_path.exists():
return FileResponse(index_path)
return {
"status": "online",
"message": "Aubm API is operational",
"version": APP_VERSION
}
# Placeholder for routers
from routers import orchestrator, monitoring, agent_runner, generator
app.include_router(agent_runner.router, prefix="/api/tasks", tags=["Tasks"])
app.include_router(orchestrator.router, prefix="/api/orchestrator", tags=["orchestrator"])
app.include_router(generator.router, prefix="/api/generator", tags=["generator"])
app.include_router(monitoring.router, prefix="/api/monitoring", tags=["Monitoring"])
@app.get("/runtime-config.js", include_in_schema=False)
async def runtime_config():
config = {
"apiUrl": os.getenv("VITE_API_URL", ""),
"supabaseUrl": os.getenv("VITE_SUPABASE_URL", os.getenv("SUPABASE_URL", "")),
"supabaseAnonKey": os.getenv("VITE_SUPABASE_ANON_KEY", os.getenv("SUPABASE_ANON_KEY", "")),
"sentryDsn": os.getenv("VITE_SENTRY_DSN", os.getenv("SENTRY_DSN", "")),
"appVersion": APP_VERSION,
}
return Response(
content=f"window.__AUBM_CONFIG__ = {json.dumps(config)};",
media_type="application/javascript",
)
@app.get("/{path:path}", include_in_schema=False)
async def serve_frontend(path: str):
if not FRONTEND_DIST.exists():
return await root()
requested_path = FRONTEND_DIST / path
if requested_path.is_file():
return FileResponse(requested_path)
return Response(
content=f"window.__AUBM_CONFIG__ = {json.dumps(config)};",
media_type="application/javascript",
)
@app.get("/{path:path}", include_in_schema=False)
async def serve_frontend(path: str):
if not FRONTEND_DIST.exists():
return await root()
requested_path = FRONTEND_DIST / path
if requested_path.is_file():
return FileResponse(requested_path)
index_path = FRONTEND_DIST / "index.html"
if index_path.exists():
return FileResponse(index_path)
return await root()
# --- Infrastructure Management ---
@app.post("/infrastructure/nodes/provision")
async def provision_node(name: str = "aubm-inference-node", size: str = "s-4vcpu-8gb-amd"):
"""Creates a new inference node on DigitalOcean."""
node = await infrastructure_service.create_inference_node(name, size)
if not node:
raise HTTPException(status_code=500, detail="Failed to initiate node provisioning.")
return node
@app.get("/infrastructure/nodes/{droplet_id}/ip")
async def get_node_ip(droplet_id: int):
"""Wait and return the public IP of a node."""
ip = await infrastructure_service.wait_for_ip(droplet_id)
if not ip:
raise HTTPException(status_code=404, detail="IP not assigned or timed out.")
return {"ip": ip}
@app.delete("/infrastructure/nodes/{droplet_id}")
async def terminate_node(droplet_id: int):
"""Destroy an inference node."""
success = await infrastructure_service.terminate_node(droplet_id)
if not success:
raise HTTPException(status_code=500, detail="Failed to terminate node.")
return {"status": "termination_requested"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(settings.PORT))
|