File size: 6,531 Bytes
81ff144
 
 
bea04ab
 
81ff144
 
 
 
 
bea04ab
 
 
 
81ff144
aeb2234
 
 
 
 
 
 
 
 
 
81ff144
 
 
aeb2234
bea04ab
 
 
81ff144
 
 
 
 
 
 
 
 
 
 
 
 
ffac2f3
81ff144
 
 
0cb1aa7
81ff144
 
0cb1aa7
 
81ff144
 
 
 
 
bea04ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81ff144
 
 
 
 
 
 
 
 
ffac2f3
81ff144
 
 
0797c65
81ff144
4aa9189
0797c65
 
4aa9189
81ff144
 
 
 
 
 
 
 
ffac2f3
81ff144
 
 
 
 
 
bea04ab
 
 
 
 
 
 
 
 
 
 
 
 
 
81ff144
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bea04ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81ff144
 
bea04ab
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import asyncio
import json
import logging
import os
from pathlib import Path

import sentry_sdk
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, Response

from services.orchestrator_service import orchestrator_service
from services.infrastructure_service import infrastructure_service
from services.config import settings
from worker import AubmWorker


def _load_app_version() -> str:
    """Return the application version string.

    The ``VERSION`` file in the parent of this module's directory wins when
    it exists and is non-blank after stripping. Otherwise the ``APP_VERSION``
    environment variable is used, defaulting to ``"0.7.0"``.
    """
    candidate = Path(__file__).resolve().parent.parent / "VERSION"
    file_version = ""
    if candidate.exists():
        file_version = candidate.read_text(encoding="utf-8").strip()
    # An empty/blank file is treated the same as a missing one.
    return file_version or os.getenv("APP_VERSION", "0.7.0")


# Load environment variables
# Kept ahead of _load_app_version(), which falls back to the APP_VERSION
# environment variable.
load_dotenv()
# Built frontend assets: <parent of this file's directory>/frontend/dist.
FRONTEND_DIST = Path(__file__).resolve().parent.parent / "frontend" / "dist"
APP_VERSION = _load_app_version()
logger = logging.getLogger("aubm.api")
# Embedded worker state: assigned by start_embedded_worker() and reset to
# None by stop_embedded_worker().
embedded_worker: AubmWorker | None = None
embedded_worker_task: asyncio.Task | None = None

# Sentry Initialization
# Sentry is only initialised when SENTRY_DSN is set in the environment.
SENTRY_DSN = os.getenv("SENTRY_DSN")
if SENTRY_DSN:
    sentry_sdk.init(
        dsn=SENTRY_DSN,
        # NOTE(review): a rate of 1.0 requests tracing/profiling for every
        # transaction -- confirm this is intended outside development.
        traces_sample_rate=1.0,
        profiles_sample_rate=1.0,
    )

# The ASGI application object; routes, middleware and lifecycle hooks in this
# module attach to it. The advertised version comes from APP_VERSION.
app = FastAPI(
    title="Aubm API",
    description="Enterprise-Grade AI Agent Orchestration & Collaboration Platform",
    version=APP_VERSION
)

# CORS Configuration
# ALLOWED_ORIGINS is a comma-separated list of origins. Each entry is stripped
# and blank entries are dropped, so values such as
# "https://a.example, https://b.example" or a trailing comma still yield
# origins that can match the request's Origin header exactly. A value of "*"
# is passed through unchanged as ["*"].
allowed_origins = [
    origin.strip()
    for origin in os.getenv(
        "ALLOWED_ORIGINS",
        "http://localhost:5173,http://localhost:3000,http://127.0.0.1:5173",
    ).split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    # Optional origin pattern; None when ALLOWED_ORIGIN_REGEX is unset.
    allow_origin_regex=os.getenv("ALLOWED_ORIGIN_REGEX"),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _log_embedded_worker_result(task: asyncio.Task) -> None:
    """Done-callback that logs the exception of a finished worker task.

    Cancelled tasks and tasks that finished without an exception are
    ignored; otherwise the exception is logged at ERROR level together
    with its traceback.
    """
    # task.exception() must not be queried on a cancelled task.
    failure = None if task.cancelled() else task.exception()
    if not failure:
        return

    logger.error(
        "Embedded worker stopped unexpectedly",
        exc_info=(type(failure), failure, failure.__traceback__),
    )


@app.on_event("startup")
async def start_embedded_worker() -> None:
    """Launch the in-process task worker when the application starts.

    Nothing happens unless ``settings.TASK_EXECUTION_MODE`` equals ``"queue"``
    and ``settings.TASK_QUEUE_EMBEDDED_WORKER`` is truthy. A worker task that
    was started earlier and has not finished is left running as is.
    """
    global embedded_worker, embedded_worker_task

    queue_mode = settings.TASK_EXECUTION_MODE == "queue"
    if not (queue_mode and settings.TASK_QUEUE_EMBEDDED_WORKER):
        return

    still_running = embedded_worker_task and not embedded_worker_task.done()
    if still_running:
        return

    embedded_worker = AubmWorker()
    embedded_worker_task = asyncio.create_task(embedded_worker.start())
    # Surface an unexpected worker failure in the log once the task ends.
    embedded_worker_task.add_done_callback(_log_embedded_worker_result)
    logger.info("Embedded task worker started: %s", embedded_worker.worker_id)


@app.on_event("shutdown")
async def stop_embedded_worker() -> None:
    """Stop the embedded worker when the application shuts down.

    Asks the worker to stop and waits up to 10 seconds for its task to
    finish, then sends a ``"stopping"`` heartbeat. On timeout the task is
    cancelled and a warning is logged. The module-level worker references
    are cleared in every case.
    """
    global embedded_worker, embedded_worker_task

    if not (embedded_worker and embedded_worker_task):
        return

    embedded_worker.stop()
    try:
        await asyncio.wait_for(embedded_worker_task, timeout=10)
        await embedded_worker.heartbeat("stopping")
    except asyncio.TimeoutError:
        embedded_worker_task.cancel()
        logger.warning("Embedded task worker did not stop before timeout")
    finally:
        # Always drop the references, even when the wait raised.
        embedded_worker = None
        embedded_worker_task = None


@app.get("/")
async def root():
    """Serve the built frontend's ``index.html``, or a JSON status payload.

    The JSON payload is returned only when ``FRONTEND_DIST/index.html`` does
    not exist.
    """
    spa_index = FRONTEND_DIST / "index.html"
    if not spa_index.exists():
        return {
            "status": "online",
            "message": "Aubm API is operational",
            "version": APP_VERSION
        }

    return FileResponse(spa_index)

# API routers, each mounted under its own /api/... prefix.
# NOTE(review): imported here rather than at the top of the file -- presumably
# so the router modules can be loaded after `app` exists; confirm before moving.
from routers import orchestrator, monitoring, agent_runner, generator

app.include_router(agent_runner.router, prefix="/api/tasks", tags=["Tasks"])
app.include_router(orchestrator.router, prefix="/api/orchestrator", tags=["orchestrator"])
app.include_router(generator.router, prefix="/api/generator", tags=["generator"])
app.include_router(monitoring.router, prefix="/api/monitoring", tags=["Monitoring"])

@app.get("/runtime-config.js", include_in_schema=False)
async def runtime_config():
    """Return a JavaScript snippet exposing runtime settings to the frontend.

    The script assigns a JSON object to ``window.__AUBM_CONFIG__``. Each
    value comes from the ``VITE_*`` environment variable, falling back to the
    unprefixed variable where one is defined, and finally to an empty string.
    """
    supabase_url = os.getenv("VITE_SUPABASE_URL", os.getenv("SUPABASE_URL", ""))
    supabase_anon_key = os.getenv("VITE_SUPABASE_ANON_KEY", os.getenv("SUPABASE_ANON_KEY", ""))
    sentry_dsn = os.getenv("VITE_SENTRY_DSN", os.getenv("SENTRY_DSN", ""))
    payload = json.dumps({
        "apiUrl": os.getenv("VITE_API_URL", ""),
        "supabaseUrl": supabase_url,
        "supabaseAnonKey": supabase_anon_key,
        "sentryDsn": sentry_dsn,
        "appVersion": APP_VERSION,
    })
    return Response(
        content=f"window.__AUBM_CONFIG__ = {payload};",
        media_type="application/javascript",
    )

# --- Infrastructure Management ---
# These routes are declared before the catch-all frontend route at the end of
# this section: routes are matched in registration order, so a GET route
# registered after "/{path:path}" would never be reached.

@app.post("/infrastructure/nodes/provision")
async def provision_node(name: str = "aubm-inference-node", size: str = "s-4vcpu-8gb-amd"):
    """Creates a new inference node on DigitalOcean.

    Args:
        name: Node name forwarded to the infrastructure service.
        size: Size identifier forwarded to the infrastructure service.

    Returns:
        The value returned by ``infrastructure_service.create_inference_node``.

    Raises:
        HTTPException: 500 when the service returns a falsy result.
    """
    node = await infrastructure_service.create_inference_node(name, size)
    if not node:
        raise HTTPException(status_code=500, detail="Failed to initiate node provisioning.")
    return node


@app.get("/infrastructure/nodes/{droplet_id}/ip")
async def get_node_ip(droplet_id: int):
    """Wait and return the public IP of a node.

    Returns:
        ``{"ip": <value>}`` with the value from ``infrastructure_service.wait_for_ip``.

    Raises:
        HTTPException: 404 when the service returns a falsy result.
    """
    ip = await infrastructure_service.wait_for_ip(droplet_id)
    if not ip:
        raise HTTPException(status_code=404, detail="IP not assigned or timed out.")
    return {"ip": ip}


@app.delete("/infrastructure/nodes/{droplet_id}")
async def terminate_node(droplet_id: int):
    """Destroy an inference node.

    Raises:
        HTTPException: 500 when the service reports failure (falsy result).
    """
    success = await infrastructure_service.terminate_node(droplet_id)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to terminate node.")
    return {"status": "termination_requested"}


# --- Frontend (must stay last: the catch-all matches every GET path) ---

def _resolve_frontend_file(path: str) -> Path | None:
    """Return the regular file under FRONTEND_DIST named by *path*, else None.

    The candidate is fully resolved and must remain inside FRONTEND_DIST, so
    ``..`` segments, absolute paths and symlinks cannot reach files outside
    the built frontend directory.
    """
    dist_root = FRONTEND_DIST.resolve()
    try:
        candidate = (dist_root / path).resolve()
    except (OSError, RuntimeError, ValueError):
        # Unresolvable input (e.g. embedded NUL byte, symlink loop).
        return None
    if not candidate.is_relative_to(dist_root):
        return None
    return candidate if candidate.is_file() else None


@app.get("/{path:path}", include_in_schema=False)
async def serve_frontend(path: str):
    """Serve a built frontend asset, falling back to the SPA entry point.

    If *path* names a file inside FRONTEND_DIST it is returned. Any other
    path (or a missing FRONTEND_DIST) is answered by ``root()``, which serves
    ``index.html`` when present and the JSON status payload otherwise.
    """
    if not FRONTEND_DIST.exists():
        return await root()

    asset = _resolve_frontend_file(path)
    if asset is not None:
        return FileResponse(asset)

    return await root()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=int(settings.PORT))