pybackend / main.py
vish85521's picture
Upload 17 files
dffa8c2 verified
"""
AgentSociety Marketing Platform - FastAPI Backend
"""
import os
import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from sqlalchemy import text
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan events"""
from app.config import get_settings
settings = get_settings()
# Startup
logger.info("Starting AgentSociety API...")
# Create upload directory
os.makedirs(settings.upload_dir, exist_ok=True)
# Start results listener (receives simulation results from Ray worker)
from app.results_listener import start_results_listener
start_results_listener(redis_url=settings.redis_url)
logger.info("Results listener started - listening for Ray worker results")
yield
# Shutdown
from app.results_listener import stop_results_listener
stop_results_listener()
logger.info("Shutting down AgentSociety API...")
# Create FastAPI app - disable redirect_slashes to prevent 307 that strips auth headers
app = FastAPI(
title="AgentSociety Marketing Platform",
description="AI-powered marketing simulation platform that simulates 1,000+ AI agents reacting to video advertisements",
version="1.0.0",
lifespan=lifespan,
redirect_slashes=False
)
# CORS configuration - explicit origins required when credentials=True
origins = [
"http://localhost:3000",
"http://127.0.0.1:3000",
"http://localhost:8000",
"http://127.0.0.1:8000",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
allow_headers=["*"],
expose_headers=["*"],
)
# Register routers
from app.routers import auth_router, projects_router, simulations_router
app.include_router(auth_router)
app.include_router(projects_router)
app.include_router(simulations_router)
@app.get("/")
async def root():
"""Health check endpoint"""
return {
"status": "healthy",
"service": "AgentSociety API",
"version": "1.0.0"
}
@app.get("/health")
async def health_check():
"""Detailed health check"""
from app.config import get_settings
settings = get_settings()
health = {
"api": "healthy",
"database": "unknown",
"redis": "unknown"
}
# Check database
try:
from app.database import engine
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
health["database"] = "healthy"
except Exception as e:
health["database"] = f"unhealthy: {str(e)}"
# Check Redis
try:
import redis
import ssl as ssl_module
redis_kwargs = {}
if settings.redis_url.startswith("rediss://"):
redis_kwargs["ssl_cert_reqs"] = ssl_module.CERT_REQUIRED
r = redis.from_url(settings.redis_url, **redis_kwargs)
r.ping()
health["redis"] = "healthy"
except Exception as e:
health["redis"] = f"unhealthy: {str(e)}"
return health