# Spctest / app.py
# bahi-bh's picture
# Update app.py
# 001f551 verified
"""
Universal AI Gateway - G4F Backend with FastAPI
Enhanced version with production-grade features
"""
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional
from dataclasses import dataclass
from enum import Enum
from contextlib import asynccontextmanager
import json
import time
import uuid
import logging
import os
import asyncio
import g4f
from g4f.client import Client
# =====================================================
# LOGGING - سجلات مُحسّنة مع ألوان في الـ terminal
# =====================================================
# Layout of every log line: timestamp | level padded to 8 columns | message.
_LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Configure logging once at import time; records at INFO and above are emitted.
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
)

# Named logger used by the rest of this module.
logger = logging.getLogger("ai-gateway")
# =====================================================
# CONFIG - قراءة من متغيرات البيئة
# =====================================================
def _env_int(name: str, default: str) -> int:
    """Read environment variable *name* (or *default*) and parse it as an int."""
    return int(os.getenv(name, default))


# Bearer token expected from clients; an empty value disables authentication.
API_KEY: str = os.getenv("API_KEY", "")
# Upstream request timeout (presumably seconds - confirm against its consumers).
REQUEST_TIMEOUT: int = _env_int("REQUEST_TIMEOUT", "120")
# Number of additional models attempted after the requested one fails.
MAX_RETRIES: int = _env_int("MAX_RETRIES", "2")
# Upper bound on the number of messages accepted in one chat request.
MAX_MESSAGES: int = _env_int("MAX_MESSAGES", "50")
# Address and port the server binds to when this file is run as a script.
HOST: str = os.getenv("HOST", "0.0.0.0")
PORT: int = _env_int("PORT", "7860")
# =====================================================
# FALLBACK MODELS - نماذج بديلة عند الفشل
# =====================================================
# Ordered list of substitute models tried when the requested model fails.
FALLBACK_MODELS: List[str] = [
    "gpt-4o-mini", "gpt-4o",
    "gemini-2.0-flash", "llama-3.1-70b",
]
# =====================================================
# Lifespan - إدارة دورة حياة التطبيق
# =====================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log a startup banner before the app serves and a farewell after it stops.

    Args:
        app: The FastAPI application being managed (not used by the body).

    Yields:
        None. Control is handed back to the framework while the app runs.
    """
    startup_banner = f"🚀 AI Gateway starting on {HOST}:{PORT}"
    logger.info(startup_banner)
    yield
    shutdown_banner = "👋 AI Gateway shutting down"
    logger.info(shutdown_banner)
# =====================================================
# FASTAPI APP
# =====================================================
# Descriptive metadata handed to FastAPI when the application is created.
_APP_METADATA = {
    "title": "Universal AI Gateway",
    "version": "6.0.0",
    "description": "OpenAI-compatible API powered by G4F",
}

# The ASGI application; `lifespan` logs startup and shutdown.
app = FastAPI(lifespan=lifespan, **_APP_METADATA)
# =====================================================
# CORS - مُحسّن
# =====================================================
# Comma-separated list of allowed origins from the environment; "*" (the
# default) allows every origin.
allowed_origins = os.getenv("ALLOWED_ORIGINS", "*")
origins_list = (
    ["*"]
    if allowed_origins.strip() == "*"
    # Skip blank entries such as the one produced by a trailing comma.
    else [o.strip() for o in allowed_origins.split(",") if o.strip()]
)

# Credentialed CORS must not be combined with a wildcard origin: the browser
# rules forbid it and it would let any site issue credentialed requests.
# Credentials are therefore enabled only for an explicit origin allow-list.
_cors_allow_credentials = "*" not in origins_list

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins_list,
    allow_credentials=_cors_allow_credentials,
    allow_methods=["*"],
    allow_headers=["*"],
)
# =====================================================
# RATE LIMITER - مُحدد الطلبات
# =====================================================
@dataclass
class RateLimiter:
    """Per-client sliding-window request limiter held in process memory.

    Attributes:
        requests_per_minute: Maximum number of accepted requests per client
            within any 60-second window.
        requests: Mapping of client key to the timestamps of its accepted
            requests. A fresh dict is created when none is supplied.
    """

    requests_per_minute: int = 30
    requests: Optional[dict] = None

    def __post_init__(self):
        # Create the store per instance (never share a mutable default), but
        # keep a caller-supplied mapping instead of discarding it.
        if self.requests is None:
            self.requests = {}
        # Time of the last sweep that removed idle clients.
        self._last_sweep = 0.0

    def check(self, client_ip: str, now: Optional[float] = None) -> bool:
        """Record a request for *client_ip* if it is within the limit.

        Args:
            client_ip: Key identifying the client.
            now: Current time in seconds; defaults to ``time.time()``. Exposed
                so callers and tests can supply their own clock.

        Returns:
            True when the request is accepted (and recorded), False when the
            client already has ``requests_per_minute`` requests in the window.
        """
        if now is None:
            now = time.time()
        window_start = now - 60

        # At most once per window, forget clients with no recent requests so
        # the mapping does not grow forever with one-off client keys.
        if now - self._last_sweep >= 60:
            self._drop_idle_clients(window_start)
            self._last_sweep = now

        # Keep only this client's timestamps that are still inside the window.
        recent = [t for t in self.requests.get(client_ip, ()) if t > window_start]
        self.requests[client_ip] = recent
        if len(recent) >= self.requests_per_minute:
            return False
        recent.append(now)
        return True

    def _drop_idle_clients(self, window_start: float) -> None:
        """Delete every client whose timestamps all fall outside the window."""
        idle = [
            key
            for key, stamps in self.requests.items()
            if all(t <= window_start for t in stamps)
        ]
        for key in idle:
            del self.requests[key]


# Module-wide limiter instance: 60 requests per minute per client key.
rate_limiter = RateLimiter(requests_per_minute=60)
# =====================================================
# PYDANTIC MODELS
# =====================================================
class Message(BaseModel):
    """A single chat message: who is speaking and what was said."""

    # Must be exactly one of: system, user, assistant.
    role: str = Field(..., pattern="^(system|user|assistant)$", description="Message role")
    # Required, non-empty, at most 32000 characters.
    content: str = Field(..., min_length=1, max_length=32000, description="Message content")
class ChatRequest(BaseModel):
    """Request body accepted by the chat completions endpoint."""

    # Required, non-empty model name.
    model: str = Field(..., min_length=1, description="Model identifier")
    # Between 1 and MAX_MESSAGES messages.
    messages: List[Message] = Field(
        ..., min_length=1, max_length=MAX_MESSAGES, description="Chat messages"
    )
    # When true the reply is sent as a stream instead of one JSON document.
    stream: bool = Field(default=False, description="Enable streaming")
    # Optional; defaults to 0.7 and must lie within [0.0, 2.0].
    temperature: Optional[float] = Field(
        default=0.7, ge=0.0, le=2.0, description="Sampling temperature"
    )
    # Optional; defaults to 4096 and must lie within [1, 128000].
    max_tokens: Optional[int] = Field(
        default=4096, ge=1, le=128000, description="Maximum tokens"
    )
# =====================================================
# AUTHENTICATION
# =====================================================
def _auth_error(status_code: int, message: str) -> HTTPException:
    """Build an HTTPException carrying the gateway's authentication error body."""
    return HTTPException(
        status_code=status_code,
        detail={
            "error": {
                "message": message,
                "type": "authentication_error",
            }
        },
    )


def verify_api_key(request: Request) -> bool:
    """Check the request's Bearer token against the configured API key.

    Args:
        request: Incoming request whose ``Authorization`` header is inspected.

    Returns:
        True when no API key is configured (open access) or the token matches.

    Raises:
        HTTPException: 401 when the header is missing or is not a Bearer
            credential; 403 when the token does not match the API key.
    """
    # Imported locally so this function does not depend on the file's import block.
    import hmac

    # No key configured means everyone is allowed.
    if not API_KEY:
        return True

    auth_header = request.headers.get("Authorization")
    if not auth_header:
        raise _auth_error(401, "Authorization header required")

    # HTTP authentication scheme names are case-insensitive, so accept
    # "bearer", "Bearer", "BEARER", ...
    if auth_header[:7].lower() != "bearer ":
        raise _auth_error(401, "Invalid authorization format. Use: Bearer <key>")

    token = auth_header[7:].strip()

    # Constant-time comparison so response timing does not reveal how many
    # leading characters of the key were guessed correctly. Bytes are compared
    # because compare_digest rejects non-ASCII str arguments.
    supplied = token.encode("utf-8", "surrogateescape")
    expected = API_KEY.encode("utf-8", "surrogateescape")
    if not hmac.compare_digest(supplied, expected):
        raise _auth_error(403, "Invalid API key")
    return True
# =====================================================
# MODEL HELPER - طلب بديل عند الفشل
# =====================================================
def create_chat(
    messages: list,
    model: str,
    stream: bool = False,
    max_retries: int = MAX_RETRIES,
) -> tuple:
    """Send a chat request, falling back to alternative models on failure.

    The requested model is tried first, followed by the entries of
    ``FALLBACK_MODELS`` that differ from it, for at most ``1 + max_retries``
    attempts in total.

    Args:
        messages: Chat messages passed through to the provider client.
        model: Model requested by the caller.
        stream: Passed through to the provider client.
        max_retries: Number of fallback attempts after the requested model.
            Negative values are treated as 0, so the requested model is always
            tried once.

    Returns:
        A ``(response, model_name)`` tuple, where ``model_name`` is the model
        whose call succeeded.

    Raises:
        Exception: The error raised by the last attempted model when every
            attempt fails.

    NOTE(review): with ``stream=True`` the provider presumably returns a lazy
    iterator, in which case errors raised while iterating would not trigger
    this fallback - confirm against the g4f client.
    """
    candidates = [model] + [m for m in FALLBACK_MODELS if m != model]
    # Clamp so a negative max_retries cannot produce an empty (or oddly
    # truncated) attempt list, which previously ended in `raise None`.
    attempts = candidates[: max(max_retries, 0) + 1]

    last_error = None
    for model_name in attempts:
        try:
            # A new client per attempt, created inside the try so that a
            # construction failure also moves on to the next model.
            client = Client()
            logger.info("Trying model: %s", model_name)
            response = client.chat.completions.create(
                model=model_name,
                messages=messages,
                stream=stream,
            )
        except Exception as e:
            last_error = e
            logger.warning("❌ Model %s failed: %s", model_name, e)
        else:
            logger.info("✅ Success with model: %s", model_name)
            return response, model_name

    # `attempts` always holds at least the requested model, so an error was
    # recorded whenever this line is reached.
    raise last_error
# =====================================================
# ROOT
# =====================================================
@app.get("/")
async def root():
    """Return a small banner with the service name, version and status."""
    banner = {
        "service": "Universal AI Gateway",
        "version": "6.0.0",
        "status": "online",
    }
    return banner
# =====================================================
# HEALTH CHECK
# =====================================================
@app.get("/health")
async def health_check():
    """Report service health together with the active configuration.

    Returns:
        A dict with a fixed "healthy" status, the current Unix timestamp, the
        version string, and a "config" section that states whether an API key
        is required plus the timeout, retry and message limits.
    """
    current_time = int(time.time())
    # Only whether a key is set is exposed, never the key itself.
    active_config = {
        "auth_required": bool(API_KEY),
        "timeout": REQUEST_TIMEOUT,
        "max_retries": MAX_RETRIES,
        "max_messages": MAX_MESSAGES,
    }
    return {
        "status": "healthy",
        "timestamp": current_time,
        "version": "6.0.0",
        "config": active_config,
    }
# =====================================================
# LOAD AVAILABLE MODELS
# =====================================================
@app.get("/v1/models")
async def get_models():
    """List the models known to g4f in an OpenAI-style model list.

    Returns:
        ``{"object": "list", "data": [...]}`` with one entry per model. The
        list is empty when the g4f registry is missing or cannot be read;
        both situations are logged rather than raised.
    """
    # One timestamp for the whole listing instead of one clock read per model.
    created = int(time.time())
    models_data = []
    try:
        registry = getattr(g4f.models, "_all_models", None)
        if registry is None:
            # Make the otherwise silent empty result visible in the logs.
            logger.warning(
                "g4f.models has no _all_models registry; returning an empty model list"
            )
        else:
            models_data = [
                {
                    "id": str(model),
                    "object": "model",
                    "created": created,
                    "owned_by": "g4f",
                }
                for model in registry
            ]
    except Exception as e:
        # Best effort: an unreadable registry yields an empty list, not a 500.
        logger.error(f"Failed to load models: {e}")
    return {"object": "list", "data": models_data}
# =====================================================
# CHAT COMPLETIONS - نقطة النهاية الرئيسية
# =====================================================
def _sse_frame(payload: dict, ensure_ascii: bool = True) -> str:
    """Serialise *payload* as a single Server-Sent-Events ``data:`` frame."""
    return f"data: {json.dumps(payload, ensure_ascii=ensure_ascii)}\n\n"


def _chunk_text(chunk) -> str:
    """Return the delta text of a streamed chunk, or an empty string if none."""
    choices = getattr(chunk, "choices", None)
    if not choices:
        return ""
    delta = getattr(choices[0], "delta", None)
    if not delta:
        return ""
    return delta.content or ""


def _completion_text(response) -> str:
    """Extract the assistant text from a non-streamed provider response.

    Falls back to ``str(response)`` when the response lacks the expected
    ``choices[0].message.content`` shape, and to an empty string when the
    content is ``None``.
    """
    try:
        content = response.choices[0].message.content
    except (AttributeError, IndexError, TypeError):
        return str(response)
    return content or ""


def _stream_chat_events(messages: list, model: str, request_id: str):
    """Yield the SSE frames for one streamed chat completion.

    Emits one ``chat.completion.chunk`` frame per non-empty provider chunk,
    an error frame if the provider produced no content, then a final chunk
    with ``finish_reason="stop"`` followed by ``[DONE]``. Any failure is
    reported as an error frame followed by ``[DONE]`` instead of being raised.
    """
    try:
        response, used_model = create_chat(
            messages=messages,
            model=model,
            stream=True,
        )
        has_content = False
        for chunk in response:
            try:
                content = _chunk_text(chunk)
                if not content:
                    continue
                has_content = True
                payload = {
                    "id": request_id,
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": used_model,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {"content": content},
                            "finish_reason": None,
                        }
                    ],
                }
                # Content frames keep non-ASCII text unescaped.
                yield _sse_frame(payload, ensure_ascii=False)
            except Exception as chunk_err:
                # A malformed chunk is skipped; the stream continues.
                logger.warning(f"Chunk processing error: {chunk_err}")
                continue

        if not has_content:
            yield _sse_frame(
                {
                    "error": {
                        "message": "Provider returned empty stream",
                        "type": "empty_stream",
                    }
                }
            )

        # End-of-stream marker. It reports the model that actually answered,
        # matching the content chunks, not the model that was requested.
        yield _sse_frame(
            {
                "id": request_id,
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": used_model,
                "choices": [
                    {
                        "index": 0,
                        "delta": {},
                        "finish_reason": "stop",
                    }
                ],
            }
        )
        yield "data: [DONE]\n\n"
        logger.info(f"✅ Stream completed [{request_id}]")
    except Exception as e:
        logger.error(f"Stream error [{request_id}]: {e}")
        yield _sse_frame(
            {
                "error": {
                    "message": str(e),
                    "type": "server_error",
                }
            }
        )
        yield "data: [DONE]\n\n"


@app.post("/v1/chat/completions")
async def chat_completions(request: Request, body: ChatRequest):
    """Handle a chat completion request and return the model's reply.

    Steps: authenticate, apply the per-client rate limit, then either stream
    the reply as Server-Sent Events or return a single JSON completion.

    Args:
        request: The incoming HTTP request (headers and client address).
        body: The validated chat request.

    Returns:
        A ``StreamingResponse`` of SSE frames when ``body.stream`` is true,
        otherwise a ``JSONResponse`` shaped like an OpenAI chat completion
        (token usage is always reported as zero).

    Raises:
        HTTPException: 401/403 from authentication, 429 when rate limited,
            504 when the provider call exceeds ``REQUEST_TIMEOUT`` seconds,
            500 for any other failure.

    NOTE(review): ``body.temperature`` and ``body.max_tokens`` are validated
    but not forwarded to ``create_chat``.
    """
    # 1. Authentication.
    verify_api_key(request)

    # 2. Rate limiting. `request.client` can be None (no peer address), so
    #    fall back to a shared bucket instead of failing with a 500.
    client_ip = request.client.host if request.client else "unknown"
    if not rate_limiter.check(client_ip):
        raise HTTPException(
            status_code=429,
            detail={
                "error": {
                    "message": "Rate limit exceeded. Try again later.",
                    "type": "rate_limit_error",
                }
            },
        )

    # 3. Convert the validated messages to plain dicts for the provider.
    messages = [
        {"role": m.role, "content": m.content}
        for m in body.messages
    ]
    request_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
    logger.info(
        f"📨 Request [{request_id}] "
        f"model={body.model} stream={body.stream}"
    )

    # Streaming response: the synchronous generator is iterated by the
    # response object, so the provider call happens while streaming.
    if body.stream:
        return StreamingResponse(
            _stream_chat_events(messages, body.model, request_id),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
                "X-Request-Id": request_id,
            },
        )

    # Normal (non-stream) response.
    try:
        # create_chat is a blocking call; run it in a worker thread so the
        # event loop keeps serving other requests, and bound the wait by the
        # configured timeout. On timeout the worker thread cannot be
        # interrupted and finishes in the background.
        loop = asyncio.get_running_loop()
        timeout = REQUEST_TIMEOUT if REQUEST_TIMEOUT > 0 else None
        response, used_model = await asyncio.wait_for(
            loop.run_in_executor(
                None,
                lambda: create_chat(
                    messages=messages,
                    model=body.model,
                    stream=False,
                ),
            ),
            timeout=timeout,
        )

        assistant_message = _completion_text(response)
        logger.info(
            f"✅ Response [{request_id}] "
            f"length={len(assistant_message)}"
        )
        return JSONResponse({
            "id": request_id,
            "object": "chat.completion",
            "created": int(time.time()),
            "model": used_model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": assistant_message,
                    },
                    "finish_reason": "stop",
                }
            ],
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0,
            },
        })
    except asyncio.TimeoutError:
        logger.error(f"Chat timeout [{request_id}] after {REQUEST_TIMEOUT}s")
        raise HTTPException(
            status_code=504,
            detail={
                "error": {
                    "message": f"Provider did not respond within {REQUEST_TIMEOUT} seconds",
                    "type": "timeout_error",
                }
            },
        )
    except Exception as e:
        logger.error(f"Chat error [{request_id}]: {e}")
        raise HTTPException(
            status_code=500,
            detail={
                "error": {
                    "message": str(e),
                    "type": "server_error",
                }
            },
        )
# =====================================================
# RUN
# =====================================================
def _serve() -> None:
    """Run the application under uvicorn on the configured host and port."""
    # Imported here so uvicorn is only needed when the file is run directly.
    import uvicorn

    server_options = {
        "host": HOST,
        "port": PORT,
        "log_level": "info",
        "reload": False,
    }
    uvicorn.run("app:app", **server_options)


if __name__ == "__main__":
    _serve()