""" Universal AI Gateway - G4F Backend with FastAPI Fully compatible with n8n - Returns correct response format """ from fastapi import FastAPI, Request, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel, Field from typing import List, Optional from dataclasses import dataclass from contextlib import asynccontextmanager import json import time import uuid import logging import os import g4f from g4f.client import Client # ===================================================== # LOGGING # ===================================================== logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)-8s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) logger = logging.getLogger("ai-gateway") # ===================================================== # CONFIG # ===================================================== HOST: str = os.getenv("HOST", "0.0.0.0") PORT: int = int(os.getenv("PORT", "7860")) # ===================================================== # WORKING MODELS # ===================================================== WORKING_MODELS: List[str] = [ "command-a", "command-r-plus", "kimi-k2", "aria", "gpt-4", ] # ===================================================== # FALLBACK MODELS # ===================================================== FALLBACK_MODELS: List[str] = WORKING_MODELS.copy() # ===================================================== # Lifespan # ===================================================== @asynccontextmanager async def lifespan(app: FastAPI): logger.info(f"🚀 AI Gateway starting on {HOST}:{PORT}") logger.info(f"📋 Available models: {', '.join(WORKING_MODELS)}") yield logger.info("👋 AI Gateway shutting down") # ===================================================== # FASTAPI APP # ===================================================== app = FastAPI( title="Universal AI Gateway", version="6.0.0", lifespan=lifespan, ) # ===================================================== # CORS # ===================================================== app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ===================================================== # PYDANTIC MODELS # ===================================================== class Message(BaseModel): role: str content: str class ChatRequest(BaseModel): model: str messages: List[Message] stream: bool = False # ===================================================== # MODEL HELPER # ===================================================== def create_chat(messages: list, model: str, stream: bool = False): models_to_try = [model] + [m for m in FALLBACK_MODELS if m != model] for model_name in models_to_try: try: client = Client() logger.info(f"🔄 Trying model: {model_name}") response = client.chat.completions.create( model=model_name, messages=messages, stream=stream, ) logger.info(f"✅ Success with model: {model_name}") return response, model_name except Exception as e: logger.warning(f"❌ Model {model_name} failed: {str(e)[:100]}") continue raise Exception("All models failed") # ===================================================== # ENDPOINTS # ===================================================== @app.get("/") async def root(): return {"status": "online", "models": WORKING_MODELS} @app.get("/v1/models") async def get_models(): models_data = [{"id": model} for model in WORKING_MODELS] return {"data": models_data} # ===================================================== # MAIN CHAT ENDPOINT # ===================================================== @app.post("/v1/chat/completions") async def chat_completions(request: Request, body: ChatRequest): messages = [{"role": m.role, "content": m.content} for m in body.messages] request_id = f"chatcmpl-{uuid.uuid4().hex[:24]}" logger.info(f"📨 Request [{request_id}] model={body.model}") # NON-STREAMING if not body.stream: try: response, used_model = create_chat(messages, body.model, stream=False) assistant_message = response.choices[0].message.content return { "id": request_id, "object": "chat.completion", "created": int(time.time()), "model": used_model, "choices": [{ "index": 0, "message": { "role": "assistant", "content": assistant_message }, "finish_reason": "stop" }] } except Exception as e: logger.error(f"Error: {e}") raise HTTPException(status_code=500, detail=str(e)) # STREAMING async def generate(): try: response, used_model = create_chat(messages, body.model, stream=True) for chunk in response: if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content: content = chunk.choices[0].delta.content data = { "id": request_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": used_model, "choices": [{ "index": 0, "delta": {"content": content}, "finish_reason": None }] } yield f"data: {json.dumps(data)}\n\n" yield f"data: {json.dumps({'choices': [{'delta': {}, 'finish_reason': 'stop'}]})}\n\n" yield "data: [DONE]\n\n" except Exception as e: logger.error(f"Stream error: {e}") yield f"data: {json.dumps({'error': str(e)})}\n\n" return StreamingResponse(generate(), media_type="text/event-stream") # ===================================================== # RESPONSES API - FOR n8n # ===================================================== @app.post("/v1/responses") async def responses_endpoint(request: Request): try: body = await request.json() except: body = {} logger.info(f"🔄 Responses request from {request.client.host}") # Extract messages from responses format messages = [] if "input" in body: if isinstance(body["input"], str): messages = [{"role": "user", "content": body["input"]}] elif isinstance(body["input"], list): for item in body["input"]: if isinstance(item, str): messages.append({"role": "user", "content": item}) elif isinstance(item, dict) and "content" in item: messages.append({"role": item.get("role", "user"), "content": item["content"]}) elif "messages" in body: messages = body["messages"] elif "prompt" in body: messages = [{"role": "user", "content": body["prompt"]}] if not messages: messages = [{"role": "user", "content": "Say hello"}] model_name = body.get("model", "command-a") if model_name not in WORKING_MODELS: model_name = "command-a" # Convert to chat format chat_messages = [Message(role=m.get("role", "user"), content=m.get("content", "")) for m in messages] stream = body.get("stream", False) chat_body = ChatRequest(model=model_name, messages=chat_messages, stream=stream) # Return response in format n8n expects result = await chat_completions(request, chat_body) # For non-streaming, ensure it's JSON if not stream and isinstance(result, dict): return JSONResponse(content=result) return result # ===================================================== # RUN # ===================================================== if __name__ == "__main__": import uvicorn uvicorn.run("app:app", host=HOST, port=PORT)