"""Universal AI Gateway.

A small FastAPI service exposing OpenAI-style (``/v1/chat/completions``,
``/v1/models``) and Anthropic-style (``/v1/messages``) endpoints, all of which
delegate text generation to ``g4f.client.Client``.
"""

import hmac
import json
import logging
import time
import uuid
from typing import Any, Iterator, List, Optional

import g4f  # noqa: F401  (kept: imported by the original module)
from fastapi import FastAPI, Request, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from g4f.client import Client
from pydantic import BaseModel

# =====================================================
# LOGGING
# =====================================================

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# =====================================================
# CONFIG
# =====================================================

# NOTE(review): placeholder secret committed in source; consider loading it
# from the environment before deploying.
API_KEY = "sk-your-secret-key"

# Headers attached to every server-sent-events response.
_SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}

# Model ids advertised by GET /v1/models.
_VISIBLE_MODELS = (
    # GPT
    "gpt-4",
    "gpt-4-turbo",
    "gpt-4o",
    "gpt-4o-mini",
    "gpt-3.5-turbo",
    # Claude
    "claude-sonnet-4-6",
    "claude-opus-4",
    "claude-3-7-sonnet",
    # Gemini
    "gemini-2.5-pro",
    "gemini-2.5-flash",
    # Cohere
    "command-r",
    "command-r-plus",
    # Other
    "grok-4",
    "llama-3-70b",
    "mistral-7b",
)

# Names that map to themselves (after lower-casing) in normalize_model().
_EXACT_MODELS = frozenset({
    "gpt-4",
    "gpt-4o",
    "gpt-4o-mini",
    "gpt-3.5-turbo",
    "command-r",
    "command-r-plus",
})

# (substring, backend model) pairs, checked in order by normalize_model().
_MODEL_FAMILIES = (
    ("claude", "gpt-4"),
    ("gemini", "gemini"),
    ("grok", "gpt-4"),
    ("llama", "llama-3-70b"),
    ("mistral", "mistral-7b"),
)

# =====================================================
# FASTAPI
# =====================================================

app = FastAPI(
    title="Universal AI Gateway",
    version="5.3.0"
)

# =====================================================
# CORS
# =====================================================

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# =====================================================
# MODELS
# =====================================================


class Message(BaseModel):
    """One chat message in an OpenAI-style request."""

    role: str
    content: str


class ChatRequest(BaseModel):
    """Request body accepted by POST /v1/chat/completions."""

    model: str
    messages: List[Message]
    stream: bool = False
    # Accepted for client compatibility; this module does not forward
    # temperature or max_tokens to the backend client.
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 4096


# =====================================================
# AUTH
# =====================================================


def verify_api_key(req: Request) -> bool:
    """Validate the API key carried by a request, if one is supplied.

    The key is read from ``x-api-key`` (preferred) or from the second
    whitespace-separated part of ``Authorization`` (e.g. ``Bearer <key>``).
    Requests that carry no key at all are allowed through.

    Args:
        req: Incoming request whose headers are inspected.

    Returns:
        ``True`` when the request may proceed.

    Raises:
        HTTPException: 403 when a key is supplied and differs from ``API_KEY``.
    """
    auth = req.headers.get("Authorization")
    x_api_key = req.headers.get("x-api-key")

    # Allow requests without any key (testing convenience).
    if not auth and not x_api_key:
        return True

    token = None
    if x_api_key:
        token = x_api_key.strip()
    else:
        parts = auth.split(" ", 1)
        if len(parts) == 2:
            token = parts[1].strip()

    # A header with no extractable token is treated like "no key supplied".
    if not token:
        return True

    # Constant-time comparison so the check does not leak key prefixes.
    if API_KEY and not hmac.compare_digest(
        token.encode("utf-8"), API_KEY.encode("utf-8")
    ):
        raise HTTPException(
            status_code=403,
            detail="Invalid API Key"
        )

    return True


# =====================================================
# ROOT
# =====================================================


@app.get("/")
async def root():
    """Return a static service-status document."""
    return {
        "status": "online",
        "service": "Universal AI Gateway",
        "version": "5.3.0"
    }


@app.head("/")
async def root_head():
    """Answer HEAD / with an empty 200 response."""
    return JSONResponse(
        status_code=200,
        content={}
    )


# =====================================================
# MODEL NORMALIZER
# =====================================================


def normalize_model(model: str):
    """Map a client-requested model name to the name sent to the backend.

    Args:
        model: Requested model name; an empty or missing value selects
            ``"gpt-4"``.

    Returns:
        The lower-cased name for the exactly-recognised models, a fixed
        backend name for the claude/gemini/grok/llama/mistral families, or
        ``model`` unchanged when nothing matches.
    """
    if not model:
        return "gpt-4"

    model_lower = model.lower()

    if model_lower in _EXACT_MODELS:
        return model_lower

    for needle, target in _MODEL_FAMILIES:
        if needle in model_lower:
            return target

    return model


# =====================================================
# SAFE CLIENT
# =====================================================


def create_client():
    """Return a new g4f ``Client`` instance."""
    return Client()


# =====================================================
# SSE HELPERS
# =====================================================


def _sse_data(payload: Any) -> str:
    """Serialize *payload* as one ``data:`` server-sent-events frame."""
    return "data: " + json.dumps(payload, ensure_ascii=False) + "\n\n"


def _sse_event(event: str, payload: Any) -> str:
    """Serialize *payload* as one named server-sent-events frame."""
    return f"event: {event}\n" + _sse_data(payload)


def _delta_text(chunk: Any) -> str:
    """Return ``chunk.choices[0].delta.content``, or ``""`` if any part is missing."""
    choices = getattr(chunk, "choices", None)
    if not choices:
        return ""
    delta = getattr(choices[0], "delta", None)
    if not delta:
        return ""
    return getattr(delta, "content", None) or ""


def _flatten_content(content: Any) -> Any:
    """Join the text parts of a list-form content value; pass other values through."""
    if not isinstance(content, list):
        return content
    return "\n".join(
        str(part.get("text") or "")
        for part in content
        if isinstance(part, dict) and part.get("type") == "text"
    )


# =====================================================
# MODELS
# =====================================================


@app.get("/v1/models")
async def get_models():
    """List the advertised model ids in OpenAI ``list`` format."""
    created = int(time.time())
    return {
        "object": "list",
        "data": [
            {
                "id": model,
                "object": "model",
                "created": created,
                "owned_by": "openai",
                "permission": [],
                "root": model,
                "parent": None
            }
            for model in _VISIBLE_MODELS
        ]
    }


# =====================================================
# CHAT COMPLETIONS
# =====================================================


def _openai_stream(model: str, messages: List[dict]) -> Iterator[str]:
    """Yield OpenAI-style ``chat.completion.chunk`` SSE frames for one request."""
    chunk_id = f"chatcmpl-{uuid.uuid4().hex}"

    try:
        client = create_client()
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True
        )

        has_content = False

        for chunk in response:
            # A single malformed chunk is logged and skipped rather than
            # aborting the whole stream.
            try:
                text = _delta_text(chunk)
                if not text:
                    continue
                frame = _sse_data({
                    "id": chunk_id,
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": model,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {"content": text},
                            "finish_reason": None
                        }
                    ]
                })
            except Exception as chunk_error:
                logger.exception("Chunk error: %s", chunk_error)
                continue

            has_content = True
            yield frame

        if not has_content:
            yield _sse_data({"error": {"message": "Empty response"}})

        yield _sse_data({
            "id": chunk_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "delta": {},
                    "finish_reason": "stop"
                }
            ]
        })
        yield "data: [DONE]\n\n"

    except Exception as e:
        # The response has already started, so the failure is reported
        # in-band as an SSE frame.
        logger.exception("Streaming error: %s", e)
        yield _sse_data({"error": {"message": str(e)}})
        yield "data: [DONE]\n\n"


@app.post("/v1/chat/completions")
async def chat_completions(
    req: Request,
    body: ChatRequest
):
    """Handle an OpenAI-style chat completion request.

    Args:
        req: Incoming request, used for API-key verification.
        body: Parsed request body.

    Returns:
        A ``StreamingResponse`` of SSE chunks when ``body.stream`` is true,
        otherwise a ``JSONResponse`` holding a single ``chat.completion``.

    Raises:
        HTTPException: 403 for a wrong API key; 500 when the backend call
            fails in non-streaming mode.
    """
    verify_api_key(req)

    model = normalize_model(body.model)
    messages = [
        {"role": m.role, "content": m.content}
        for m in body.messages
    ]

    logger.info("Request model=%s stream=%s", model, body.stream)

    if body.stream:
        return StreamingResponse(
            _openai_stream(model, messages),
            media_type="text/event-stream",
            headers=dict(_SSE_HEADERS)
        )

    try:
        client = create_client()
        # The client call is synchronous; run it in a worker thread so it
        # does not block the event loop for other requests.
        response = await run_in_threadpool(
            client.chat.completions.create,
            model=model,
            messages=messages,
            stream=False
        )

        try:
            assistant_message = response.choices[0].message.content
        except (AttributeError, IndexError, TypeError):
            # Unexpected response shape: fall back to its string form.
            assistant_message = str(response)

        return JSONResponse({
            "id": f"chatcmpl-{uuid.uuid4().hex}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": assistant_message
                    },
                    "finish_reason": "stop"
                }
            ],
            # Token usage is not measured; zeros are reported.
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            }
        })

    except Exception as e:
        logger.exception("Chat error: %s", e)
        raise HTTPException(
            status_code=500,
            detail=str(e)
        ) from e


# =====================================================
# ANTHROPIC MESSAGES
# =====================================================


def _anthropic_stream(
    model: str,
    requested_model: Optional[str],
    messages: List[dict]
) -> Iterator[str]:
    """Yield Anthropic-style message SSE events for one request.

    Event order: message_start, ping, content_block_start, zero or more
    content_block_delta, content_block_stop, message_delta, message_stop.
    An ``error`` event ends the stream early on failure or empty output.
    """
    message_id = f"msg_{uuid.uuid4().hex}"

    try:
        client = create_client()
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True
        )

        # =========================================
        # message_start
        # =========================================
        yield _sse_event("message_start", {
            "type": "message_start",
            "message": {
                "id": message_id,
                "type": "message",
                "role": "assistant",
                # Echo the name the client asked for, not the backend name.
                "model": requested_model,
                "content": [],
                "stop_reason": None,
                "stop_sequence": None,
                "usage": {
                    "input_tokens": 0,
                    "output_tokens": 0
                }
            }
        })

        # =========================================
        # ping
        # =========================================
        yield _sse_event("ping", {"type": "ping"})

        # =========================================
        # content_block_start
        # =========================================
        yield _sse_event("content_block_start", {
            "type": "content_block_start",
            "index": 0,
            "content_block": {
                "type": "text",
                "text": ""
            }
        })

        has_content = False

        for chunk in response:
            # A single malformed chunk is logged and skipped rather than
            # aborting the whole stream.
            try:
                text = _delta_text(chunk)
                if not text:
                    continue
                frame = _sse_event("content_block_delta", {
                    "type": "content_block_delta",
                    "index": 0,
                    "delta": {
                        "type": "text_delta",
                        "text": text
                    }
                })
            except Exception as e:
                logger.exception("Anthropic chunk error: %s", e)
                continue

            has_content = True
            yield frame

        if not has_content:
            yield _sse_event("error", {
                "type": "error",
                "error": {
                    "type": "api_error",
                    "message": "Empty response"
                }
            })
            return

        # =========================================
        # content_block_stop
        # =========================================
        yield _sse_event("content_block_stop", {
            "type": "content_block_stop",
            "index": 0
        })

        # =========================================
        # message_delta
        # =========================================
        yield _sse_event("message_delta", {
            "type": "message_delta",
            "delta": {
                "stop_reason": "end_turn",
                "stop_sequence": None
            },
            "usage": {
                "output_tokens": 0
            }
        })

        # =========================================
        # message_stop
        # =========================================
        yield _sse_event("message_stop", {"type": "message_stop"})

    except Exception as e:
        logger.exception("Anthropic stream error: %s", e)
        yield _sse_event("error", {
            "type": "error",
            "error": {
                "type": "api_error",
                "message": str(e)
            }
        })


@app.post("/v1/messages")
async def anthropic_messages(
    req: Request,
    body: dict
):
    """Handle an Anthropic-style messages request; the reply is always streamed.

    List-form message content is reduced to its text parts joined by
    newlines. A top-level ``system`` value, when present, is forwarded as a
    leading ``system``-role message.

    Args:
        req: Incoming request, used for API-key verification.
        body: Raw JSON request body.

    Returns:
        A ``StreamingResponse`` of Anthropic-style SSE events.

    Raises:
        HTTPException: 403 for a wrong API key; 400 when ``model`` is not a
            string or ``messages`` is not a list of objects.
    """
    verify_api_key(req)

    requested_model = body.get("model")
    if requested_model is not None and not isinstance(requested_model, str):
        raise HTTPException(
            status_code=400,
            detail="'model' must be a string"
        )

    model = normalize_model(requested_model)

    messages = body.get("messages") or []
    if not isinstance(messages, list) or not all(
        isinstance(m, dict) for m in messages
    ):
        raise HTTPException(
            status_code=400,
            detail="'messages' must be a list of objects"
        )

    logger.info("Anthropic request model=%s stream=True", model)

    converted_messages = []

    # Anthropic carries the system prompt outside "messages"; forward it so
    # it is not silently dropped.
    system_text = _flatten_content(body.get("system"))
    if system_text:
        converted_messages.append({
            "role": "system",
            "content": system_text
        })

    converted_messages.extend(
        {
            "role": m.get("role", "user"),
            "content": _flatten_content(m.get("content", ""))
        }
        for m in messages
    )

    return StreamingResponse(
        _anthropic_stream(model, requested_model, converted_messages),
        media_type="text/event-stream",
        headers=dict(_SSE_HEADERS)
    )


# =====================================================
# RUN
# =====================================================

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info"
    )