from fastapi import FastAPI, Request, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel from typing import List, Optional import json import time import uuid import logging import g4f from g4f.client import Client # ===================================================== # LOGGING # ===================================================== logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ===================================================== # CONFIG # ===================================================== API_KEY = "sk-your-secret-key" # ===================================================== # FASTAPI # ===================================================== app = FastAPI( title="Universal AI Gateway", version="5.3.0" ) # ===================================================== # CORS # ===================================================== app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ===================================================== # MODELS # ===================================================== class Message(BaseModel): role: str content: str class ChatRequest(BaseModel): model: str messages: List[Message] stream: bool = False temperature: Optional[float] = 0.7 max_tokens: Optional[int] = 4096 # ===================================================== # AUTH # ===================================================== def verify_api_key(req: Request): auth = req.headers.get("Authorization") # السماح بدون مفتاح للاختبار if not auth: return True if not auth.startswith("Bearer "): raise HTTPException( status_code=401, detail="Invalid Authorization Format" ) token = auth.replace("Bearer ", "").strip() if token != API_KEY: raise HTTPException( status_code=403, detail="Invalid API Key" ) return True # ===================================================== # ROOT # ===================================================== @app.get("/") async def root(): return { "status": "online", "service": "Universal AI Gateway", "version": "5.3.0" } # ===================================================== # MODELS # ===================================================== @app.get("/v1/models") async def get_models(): models_data = [] try: if hasattr(g4f.models, "_all_models"): all_models = list(g4f.models._all_models) for model in all_models: models_data.append({ "id": str(model), "object": "model", "created": int(time.time()), "owned_by": "g4f" }) except Exception as e: logger.error(f"Models error: {e}") return { "object": "list", "data": models_data } # ===================================================== # CHAT COMPLETIONS # ===================================================== @app.post("/v1/chat/completions") async def chat_completions( req: Request, body: ChatRequest ): verify_api_key(req) messages = [ { "role": m.role, "content": m.content } for m in body.messages ] logger.info( f"Request model={body.model} stream={body.stream}" ) # ================================================= # STREAMING # ================================================= if body.stream: def generate_stream(): chunk_id = f"chatcmpl-{uuid.uuid4().hex}" try: client = Client() response = client.chat.completions.create( model=body.model, messages=messages, stream=True ) has_content = False for chunk in response: try: content = "" if ( hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0 and hasattr(chunk.choices[0], "delta") and chunk.choices[0].delta and chunk.choices[0].delta.content ): content = chunk.choices[0].delta.content if not content: continue has_content = True payload = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": body.model, "choices": [ { "index": 0, "delta": { "content": content }, "finish_reason": None } ] } yield ( "data: " + json.dumps( payload, ensure_ascii=False ) + "\n\n" ) except Exception as chunk_error: logger.error( f"Chunk error: {chunk_error}" ) if not has_content: error_payload = { "error": { "message": "Provider returned empty stream", "type": "empty_stream" } } yield ( "data: " + json.dumps(error_payload) + "\n\n" ) final_payload = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": body.model, "choices": [ { "index": 0, "delta": {}, "finish_reason": "stop" } ] } yield ( "data: " + json.dumps(final_payload) + "\n\n" ) yield "data: [DONE]\n\n" except Exception as e: logger.error( f"Streaming error: {e}" ) error_payload = { "error": { "message": str(e), "type": "server_error" } } yield ( "data: " + json.dumps(error_payload) + "\n\n" ) yield "data: [DONE]\n\n" return StreamingResponse( generate_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) # ================================================= # NORMAL RESPONSE # ================================================= try: client = Client() response = client.chat.completions.create( model=body.model, messages=messages, stream=False ) assistant_message = "" try: assistant_message = ( response .choices[0] .message .content ) except Exception: assistant_message = str(response) return JSONResponse({ "id": ( f"chatcmpl-{uuid.uuid4().hex}" ), "object": "chat.completion", "created": int(time.time()), "model": body.model, "choices": [ { "index": 0, "message": { "role": "assistant", "content": assistant_message }, "finish_reason": "stop" } ], "usage": { "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0 } }) except Exception as e: logger.error(f"Chat error: {e}") raise HTTPException( status_code=500, detail=str(e) ) # ===================================================== # RUN # ===================================================== if __name__ == "__main__": import uvicorn uvicorn.run( app, host="0.0.0.0", port=7860, log_level="info" )