# caulde / app.py
# Uploaded by bahi-bh - commit 024f4e3 (verified): "Update app.py"
# File size: 28.2 kB
import os
import json
import time
import logging
import asyncio
import threading
from typing import Any, Dict, List, Optional
from collections import OrderedDict
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse, JSONResponse, Response
from pydantic import BaseModel
import g4f
# =====================================================
# LOGGING
# =====================================================
# Process-wide logging: INFO and above, with timestamp, logger name and level.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger shared by every function in this module.
logger = logging.getLogger("g4f-smart-router")
# =====================================================
# COOKIES
# =====================================================
def _load_cookies_raw() -> Dict[str, Any]:
raw_env = (os.getenv("COOKIES_JSON") or "").strip()
if raw_env:
try:
return json.loads(raw_env)
except Exception as e:
logger.warning(f"Failed to load cookies from env: {e}")
try:
if os.path.exists("cookies.json"):
with open("cookies.json", "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load cookies from file: {e}")
return {}
def load_cookies() -> str:
    """Register the configured cookies with g4f and report the outcome.

    Returns:
        A short status string: no cookies configured, cookies present but
        ``g4f.cookies`` could not be imported, or cookies loaded.
    """
    cookie_map = _load_cookies_raw()
    if not cookie_map:
        return "⚠️ No Cookies"

    try:
        from g4f.cookies import set_cookies
    except Exception:
        return "⚠️ Cookies Found"

    for domain, cookies in cookie_map.items():
        try:
            # Bare names such as "example" are expanded to ".example.com".
            target = f".{domain}.com" if "." not in domain else domain
            if isinstance(cookies, list):
                # Convert a list of {"name": ..., "value": ...} entries into a dict.
                cookies = {
                    entry["name"]: entry["value"]
                    for entry in cookies
                    if isinstance(entry, dict)
                }
            if isinstance(cookies, dict):
                set_cookies(target, cookies)
        except Exception as e:
            # One bad domain must not prevent the others from loading.
            logger.warning(f"Cookie error for {domain}: {e}")

    return "✅ Cookies Loaded"


# Evaluated once at import time; reported by the "/" and "/health" endpoints.
COOKIE_STATUS = load_cookies()
# =====================================================
# CACHE
# =====================================================
class TTLCache:
    """Thread-safe LRU cache whose entries expire after ``ttl_seconds``.

    Entries are stored in an ``OrderedDict`` as ``key -> (value, stored_at)``
    with the least recently used key first. Expired entries are never
    returned by :meth:`get`; they are removed lazily on access and in bulk
    (at most once per ``_cleanup_interval`` seconds) on :meth:`set`.
    """

    def __init__(self, max_size: int = 100, ttl_seconds: int = 300):
        self.cache: OrderedDict = OrderedDict()
        self.max_size = max_size
        self.ttl = ttl_seconds
        self._lock = threading.Lock()
        self._last_cleanup = time.time()
        self._cleanup_interval = 60

    def _clean_expired(self):
        """Drop all expired entries; rate-limited. Caller must hold the lock."""
        now = time.time()
        if now - self._last_cleanup < self._cleanup_interval:
            return
        self._last_cleanup = now
        expired = [k for k, (_, ts) in self.cache.items() if now - ts > self.ttl]
        for k in expired:
            del self.cache[k]

    def get(self, key: str) -> Optional[str]:
        """Return the live value stored under *key*, or ``None``.

        A hit marks the key as most recently used. An entry older than the
        TTL is deleted and reported as a miss.
        """
        with self._lock:
            entry = self.cache.get(key)
            if entry is None:
                return None
            value, stored_at = entry
            if time.time() - stored_at > self.ttl:
                # The periodic sweep may not have run yet; enforce the TTL here.
                del self.cache[key]
                return None
            self.cache.move_to_end(key)
            return value

    def set(self, key: str, value: str):
        """Store *value* under *key*, evicting the LRU entry when full."""
        with self._lock:
            self._clean_expired()
            if key in self.cache:
                # Overwriting does not grow the cache, so nothing is evicted;
                # drop the old entry so the new one lands at the MRU end.
                del self.cache[key]
            else:
                # ``self.cache and`` guards popitem() against max_size <= 0.
                while self.cache and len(self.cache) >= self.max_size:
                    self.cache.popitem(last=False)
            self.cache[key] = (value, time.time())


# Shared response cache: up to 100 replies, each valid for five minutes.
CACHE = TTLCache(max_size=100, ttl_seconds=300)
# =====================================================
# PROVIDERS - Qwen only
# =====================================================
def get_provider(name: str):
    """Return the attribute of ``g4f.Provider`` called *name*, or ``None``.

    Any ordinary lookup failure is treated as "provider unavailable".
    ``Exception`` is caught rather than using a bare ``except`` so that
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate.
    """
    try:
        return getattr(g4f.Provider, name)
    except Exception:
        return None


# Providers that actually resolved; names whose lookup returned a falsy
# value are dropped.
REAL_PROVIDERS = {
    "Qwen": get_provider("Qwen"),
}
REAL_PROVIDERS = {k: v for k, v in REAL_PROVIDERS.items() if v}
# =====================================================
# MODELS - all Qwen models (collected from the sources cited below)
# =====================================================
# Static model names per provider, used by discover_provider_models() when a
# provider object exposes no model list of its own.
# NOTE(review): the names come from the sources cited in the comments below;
# nothing in this file verifies them against the provider - confirm before
# relying on any individual entry.
PROVIDER_MODELS_FALLBACK = {
    "Qwen": [
        # =====================================
        # Qwen 3.6 SERIES (latest releases according to Alibaba Cloud, 2026-04-02)
        # Source: https://www.alibabacloud.com/help/en/model-studio/newly-released-models
        # =====================================
        "qwen3.6-plus", # main Qwen3.6-Plus
        "qwen3.6-plus-2026-04-02", # dated release
        "qwen3.6-35b-a3b", # open-source 35B MoE model
        # =====================================
        # Qwen 3.5 SERIES
        # Source: Alibaba Cloud Model Catalog
        # =====================================
        "qwen3.5-plus", # main Qwen3.5-Plus
        "qwen3.5-plus-2026-02-15", # dated release
        "qwen3.5-flash", # for fast responses
        "qwen3.5-flash-2026-02-23", # dated release
        "qwen3.5-122b-a10b", # 122B with 10B active
        "qwen3.5-27b", # 27B dense
        "qwen3.5-35b-a3b", # 35B MoE dense
        # =====================================
        # Qwen 3 MAX & FLAGSHIP
        # Source: Promptfoo Documentation
        # =====================================
        "qwen3-max", # next-generation flagship
        "qwen3-max-preview", # preview version
        "qwen3-max-2025-09-23", # dated release
        "qwen-max", # original flagship
        "qwen-max-latest", # always up to date
        "qwen-max-2025-01-25", # dated release
        # =====================================
        # Qwen PLUS & TURBO (general purpose)
        # =====================================
        "qwen-plus", # well balanced
        "qwen-plus-latest", # always up to date
        "qwen-plus-2025-09-11", # dated release
        "qwen-plus-2025-07-28", # dated release
        "qwen-plus-2025-07-14", # dated release
        "qwen-plus-2025-04-28", # dated release
        "qwen-plus-2025-01-25", # dated release
        "qwen-turbo", # fast and cost-effective
        "qwen-turbo-latest", # always up to date
        "qwen-turbo-2025-04-28", # dated release
        "qwen-turbo-2024-11-01", # dated release
        "qwen-flash", # optimized for latency (replaces turbo)
        "qwen-flash-2025-07-28", # dated release
        # =====================================
        # Qwen LONG (ultra-long context)
        # =====================================
        "qwen-long-latest", # 1M context
        "qwen-long-2025-01-25", # dated release
        # =====================================
        # Qwen 3 CODER (specialized coding models)
        # Source: Alibaba Cloud + Promptfoo
        # =====================================
        "qwen3-coder-plus", # professional coding with tool calling
        "qwen3-coder-plus-2025-09-23", # dated release
        "qwen3-coder-plus-2025-07-22", # dated release
        "qwen3-coder-flash", # fast code generation
        "qwen3-coder-flash-2025-07-28", # dated release
        "qwen3-coder-next", # next-generation coder
        "qwen3-coder-480b-a35b-instruct", # 480B open code model
        "qwen3-coder-30b-a3b-instruct", # 30B open code model
        # =====================================
        # Qwen 3 VL (vision and video)
        # Source: Promptfoo + DashScope
        # =====================================
        "qwen3-vl-plus", # flagship vision
        "qwen3-vl-plus-2025-09-23", # dated release
        "qwen3-vl-flash", # fast vision model
        "qwen3-vl-flash-2025-10-15", # dated release
        "qwen3-vl-235b-a22b-thinking", # 235B with thinking
        "qwen3-vl-235b-a22b-instruct", # 235B instruct
        "qwen3-vl-32b-thinking", # 32B with thinking
        "qwen3-vl-32b-instruct", # 32B instruct
        "qwen3-vl-30b-a3b-thinking", # 30B with thinking
        "qwen3-vl-30b-a3b-instruct", # 30B instruct
        "qwen3-vl-8b-thinking", # 8B with thinking
        "qwen3-vl-8b-instruct", # 8B instruct
        "qwen-vl-max", # VL Max
        "qwen-vl-plus", # VL Plus
        "qwen-vl-ocr", # optimized for OCR
        # =====================================
        # Qwen 3 OMNI (vision + speech)
        # =====================================
        "qwen3-omni-flash", # multimodal
        "qwen3-omni-flash-2025-09-15", # dated release
        "qwen3-omni-flash-realtime", # real-time streaming
        "qwen3-omni-flash-realtime-2025-09-15", # dated release
        "qwen3-omni-30b-a3b-captioner", # audio captioning
        # =====================================
        # QwQ REASONING (reasoning and research)
        # Source: Promptfoo
        # =====================================
        "qwq-plus", # commercial reasoning model
        "qwq-32b", # open QwQ 32B
        "qwen-deep-research", # research assistant with web search
        "qvq-max", # visual reasoning
        "qvq-max-latest", # always up to date
        "qvq-max-2025-03-25", # dated release
        "qvq-72b-preview", # 72B visual reasoning
        # =====================================
        # Qwen 3 OPEN-SOURCE (open source)
        # Source: Alibaba Cloud
        # =====================================
        "qwen3-next-80b-a3b-thinking", # next-gen 80B with thinking
        "qwen3-next-80b-a3b-instruct", # next-gen 80B instruct
        "qwen3-235b-a22b-thinking-2507", # 235B July 2025 with thinking
        "qwen3-235b-a22b-instruct-2507", # 235B July 2025 instruct
        "qwen3-30b-a3b-thinking-2507", # 30B July 2025 with thinking
        "qwen3-30b-a3b-instruct-2507", # 30B July 2025 instruct
        "qwen3-235b-a22b", # 235B dual mode
        "qwen3-32b", # 32B dual mode
        "qwen3-30b-a3b", # 30B dual mode
        "qwen3-14b", # 14B dual mode
        "qwen3-8b", # 8B dual mode
        "qwen3-4b", # 4B dual mode
        "qwen3-1.7b", # 1.7B for edge devices
        "qwen3-0.6b", # 0.6B for edge devices
        # =====================================
        # Qwen 2.5 SERIES (stable and proven)
        # =====================================
        "qwen2.5-72b-instruct", # 72B dense model
        "qwen2.5-32b-instruct", # 32B dense model
        "qwen2.5-14b-instruct", # 14B dense model
        "qwen2.5-7b-instruct", # 7B dense model
        "qwen2.5-1.5b-instruct", # 1.5B small model
        "qwen2.5-0.5b-instruct", # 0.5B very small model
        "qwen2.5-7b-instruct-1m", # 7B with 1M context
        "qwen2.5-14b-instruct-1m", # 14B with 1M context
        "qwen2.5-coder-32b-instruct", # 32B code
        "qwen2.5-coder-14b-instruct", # 14B code
        "qwen2.5-coder-7b-instruct", # 7B code
        "qwen2.5-coder-1.5b-instruct", # 1.5B code
        "qwen2.5-coder-0.5b-instruct", # 0.5B code
        # =====================================
        # QWEN MATH (math models)
        # =====================================
        "qwen-math-plus", # math Plus
        "qwen-math-plus-latest", # always up to date
        "qwen-math-plus-2024-09-19", # dated release
        "qwen-math-plus-2024-08-16", # dated release
        "qwen-math-turbo", # math Turbo
        "qwen-math-turbo-latest", # always up to date
        "qwen-math-turbo-2024-09-19", # dated release
        "qwen2.5-math-72b-instruct", # 72B for math
        "qwen2.5-math-7b-instruct", # 7B for math
        # =====================================
        # QWEN TRANSLATION (translation)
        # =====================================
        "qwen-mt-plus", # translation Plus
        "qwen-mt-turbo", # translation Turbo
        # =====================================
        # QWEN DOCUMENT (document extraction)
        # =====================================
        "qwen-doc-turbo", # structured document extraction
        # =====================================
        # QWEN IMAGE GENERATION (image generation)
        # =====================================
        "qwen-image-plus", # text-to-image with complex text rendering
        # =====================================
        # ALIASES (short names)
        # =====================================
        "qwen",
        "qwen2.5",
        "qwen-coder",
        "qwen-vl",
        "qwq",
        "qvq",
    ],
}
# =====================================================
# MODEL DISCOVERY
# =====================================================
# provider name -> list of model names, filled lazily on first use.
_PROVIDER_MODEL_CACHE = {}


def discover_provider_models(provider_obj: Any, provider_name: str) -> List[str]:
    """Collect the model names advertised by a provider object.

    Several conventional attribute names are probed on *provider_obj*. Dict
    attributes contribute their keys, list/tuple/set/frozenset attributes
    contribute their items, and any other truthy value contributes its
    string form. If nothing is found, the static list in
    ``PROVIDER_MODELS_FALLBACK`` for *provider_name* is used (or
    ``["qwen-max"]`` when the provider has no entry there).

    Args:
        provider_obj: Provider object to introspect.
        provider_name: Key used for the fallback lookup.

    Returns:
        Model names with duplicates removed, first occurrence order kept.
    """
    candidates: List[str] = []
    for attr in ("models", "model", "default_model", "available_models", "supported_models"):
        try:
            val = getattr(provider_obj, attr, None)
            if isinstance(val, dict):
                candidates.extend(str(k) for k in val)
            elif isinstance(val, (list, tuple, set, frozenset)):
                candidates.extend(str(i) for i in val)
            elif val:
                candidates.append(str(val))
        except Exception:
            # Best effort: an attribute that fails to load or convert is
            # skipped. KeyboardInterrupt/SystemExit still propagate.
            continue

    if not candidates:
        candidates = PROVIDER_MODELS_FALLBACK.get(provider_name, ["qwen-max"])

    # dict.fromkeys() de-duplicates while preserving insertion order.
    return list(dict.fromkeys(candidates))
# =====================================================
# STREAM CLEANER
# =====================================================
def _extract_chunk_text(payload: dict):
    """Return the text carried by one decoded stream payload.

    ``choices[0].delta.content`` is preferred, then ``choices[0].delta.text``,
    then the top-level ``content`` / ``text`` keys. A ``None`` delta value is
    normalised to ``""``.
    """
    choices = payload.get('choices')
    if choices:
        delta = choices[0].get('delta', {})
        for field in ('content', 'text'):
            if field in delta:
                value = delta[field]
                return "" if value is None else value
    return payload.get('content') or payload.get('text') or ""


def clean_stream(chunk):
    """Normalise one item yielded by a provider stream into plain text.

    Args:
        chunk: A dict payload, a string (raw text or a JSON-encoded payload),
            or any other object.

    Returns:
        The extracted text. Dict payloads and JSON strings go through the
        same extraction, so both honour ``delta.text``. Other strings are
        returned with literal ``\\n`` / ``\\r`` / ``\\t`` escape sequences
        replaced; other objects are returned as ``str(chunk)``. Any
        unexpected error yields ``""``.
    """
    try:
        if isinstance(chunk, dict):
            return _extract_chunk_text(chunk)

        if isinstance(chunk, str):
            if chunk.startswith('{') and chunk.endswith('}'):
                try:
                    data = json.loads(chunk)
                    if isinstance(data, dict):
                        return _extract_chunk_text(data)
                except Exception:
                    # Not a usable JSON payload: treat the chunk as raw text.
                    pass
            if '\\' in chunk:
                # Replace escape sequences that arrive as literal
                # backslash pairs.
                chunk = (
                    chunk.replace('\\n', '\n')
                    .replace('\\r', '\r')
                    .replace('\\t', ' ')
                )
            return chunk

        return str(chunk)
    except Exception:
        return ""
# =====================================================
# CHAT LOGIC - Qwen only, with a fixed fallback
# =====================================================
def _history_to_messages(history) -> list:
    """Convert a chat history into a list of ``{"role", "content"}`` dicts.

    Two shapes are accepted, chosen by the type of the first element: a list
    of dicts with ``role`` / ``content`` keys (last 40 entries used) or a
    list of ``(user, assistant)`` pairs (last 20 pairs used). Entries with an
    empty role or content are dropped. A conversion error is logged and the
    messages converted so far are returned.
    """
    msgs = []
    try:
        if history:
            if isinstance(history[0], dict):
                for item in history[-40:]:
                    role, content = item.get("role"), item.get("content")
                    if role and content:
                        msgs.append({"role": str(role), "content": str(content)})
            else:
                for item in history[-20:]:
                    if isinstance(item, (list, tuple)) and len(item) == 2:
                        user_text, assistant_text = item
                        if user_text:
                            msgs.append({"role": "user", "content": str(user_text)})
                        if assistant_text:
                            msgs.append({"role": "assistant", "content": str(assistant_text)})
    except Exception as e:
        logger.warning(f"History error: {e}")
    return msgs


def ask(message: str, history, provider_name: str, model_name: str, stop_flag=None):
    """Stream a chat reply, falling back across models and providers.

    Args:
        message: The new user message; ``None`` or blank yields a single ``""``.
        history: Prior conversation, in either shape accepted by
            ``_history_to_messages``.
        provider_name: Preferred provider; ``"Qwen"`` is always tried as well.
        model_name: Preferred model; tried first for each provider.
        stop_flag: Optional object with ``is_set()``; when set, generation
            stops silently.

    Yields:
        Incremental text deltas. Callers that need the whole reply must
        concatenate them. A cache hit yields the full reply as one item.
        If every candidate fails, one error string is yielded.
    """
    message = (message or "").strip()
    if not message:
        yield ""
        return

    msgs = _history_to_messages(history)
    msgs.append({"role": "user", "content": message})

    # The reply depends on the whole conversation, so the cache key covers
    # the history too; keying on the last message alone would hand a reply
    # from one conversation to another.
    key = f"{provider_name}|{model_name}|{json.dumps(msgs, ensure_ascii=False)}"
    cached = CACHE.get(key)
    if cached:
        yield cached
        return

    # Fallback strategy: the requested provider first, then Qwen
    # (dict.fromkeys() removes the duplicate when they are the same).
    for pname in dict.fromkeys((provider_name, "Qwen")):
        pobj = REAL_PROVIDERS.get(pname)
        if not pobj:
            continue
        if pname not in _PROVIDER_MODEL_CACHE:
            _PROVIDER_MODEL_CACHE[pname] = discover_provider_models(pobj, pname)
        model_candidates = [model_name] + [
            x for x in _PROVIDER_MODEL_CACHE[pname] if x != model_name
        ]

        # Cap the attempts per provider so a dead provider cannot stall the
        # request for the whole model list.
        for m in model_candidates[:12]:
            buffer = []
            try:
                stream = g4f.ChatCompletion.create(
                    model=m,
                    provider=pobj,
                    messages=msgs,
                    stream=True,
                    timeout=30
                )
                for chunk in stream:
                    if stop_flag and stop_flag.is_set():
                        return
                    c = clean_stream(chunk)
                    if not c:
                        continue
                    if not isinstance(c, str):
                        c = str(c)
                    buffer.append(c)
                    yield c
            except Exception as e:
                logger.warning(f"Provider {pname} model {m} failed: {e}")
                if buffer:
                    # Text already reached the caller and cannot be taken
                    # back; retrying would append a second answer to it.
                    return
                continue

            full = "".join(buffer)
            if full.strip():
                CACHE.set(key, full)
            if buffer:
                return
            # Nothing was produced: try the next candidate.

    yield "❌ Failed with all providers."
# =====================================================
# FASTAPI
# =====================================================
# ASGI application object; served by uvicorn as "app:app".
app = FastAPI(title="G4F Smart Router", description="AI Gateway - Qwen Only")
# Shared secret checked by verify_api_key().
# NOTE(review): when the API_KEY environment variable is unset this falls
# back to a literal that is visible in the source - set API_KEY in any real
# deployment.
API_KEY = os.getenv("API_KEY", "mysecretkey123")
# Request body accepted by the legacy POST /chat and POST /chat/stream
# endpoints.
class ChatRequest(BaseModel):
    # New user message (required).
    message: str
    # Provider name passed to ask() as the preferred provider.
    provider: str = "Qwen"
    # Model name passed to ask() as the preferred model.
    model: str = "qwen-max"
    # Prior conversation; ask() accepts role/content dicts or
    # (user, assistant) pairs.
    history: List[Any] = []
# =====================================================
# VERIFY API KEY
# =====================================================
def verify_api_key(request: Request):
    """Authorise a request against ``API_KEY``.

    The key may be supplied as ``Authorization: Bearer <key>`` (scheme
    matched case-insensitively) or as ``X-API-Key: <key>``.

    Args:
        request: Incoming request whose headers are inspected.

    Returns:
        ``True`` when a supplied key matches.

    Raises:
        HTTPException: 401 when no supplied key matches.
    """
    import hmac

    expected = API_KEY.encode("utf-8")
    supplied = []

    scheme, _, token = request.headers.get("Authorization", "").strip().partition(" ")
    if scheme.lower() == "bearer":
        supplied.append(token.strip())

    # Header lookup on a Starlette request is case-insensitive, so a single
    # lookup also covers "x-api-key".
    supplied.append(request.headers.get("X-API-Key", "").strip())

    for candidate in supplied:
        # compare_digest avoids leaking, through timing, how much of the key
        # matched; comparing bytes also keeps non-ASCII input from raising.
        if candidate and hmac.compare_digest(candidate.encode("utf-8"), expected):
            return True

    raise HTTPException(status_code=401, detail="Invalid API key. Use 'Authorization: Bearer KEY' or 'X-API-Key: KEY'")
# =====================================================
# SUPPORT HEAD METHOD
# =====================================================
@app.head("/")
async def head_root():
    """Answer HEAD / with an empty 200 response."""
    empty_ok = Response(status_code=200)
    return empty_ok
@app.head("/health")
async def head_health():
    """Answer HEAD /health with an empty 200 response."""
    empty_ok = Response(status_code=200)
    return empty_ok
@app.head("/v1/models")
async def head_models():
    """Answer HEAD /v1/models with an empty 200 response."""
    empty_ok = Response(status_code=200)
    return empty_ok
# =====================================================
# CLAUDE-COMPATIBLE ENDPOINTS
# =====================================================
@app.get("/v1/models")
async def v1_models(request: Request):
    """List the available Qwen models (no authentication required).

    At most the first 30 discovered models of each provider are reported.
    If no provider yields any model, a short built-in list is returned.
    """
    listing = []
    for pname, pobj in REAL_PROVIDERS.items():
        if pname not in _PROVIDER_MODEL_CACHE:
            _PROVIDER_MODEL_CACHE[pname] = discover_provider_models(pobj, pname)
        listing.extend(
            {"id": model_id, "type": "model", "display_name": f"{pname} - {model_id}"}
            for model_id in _PROVIDER_MODEL_CACHE[pname][:30]
        )

    if listing:
        return {"data": listing}

    # Nothing was discovered: advertise a minimal built-in set.
    defaults = (
        ("qwen3.6-plus", "Qwen 3.6 - Plus"),
        ("qwen3-max", "Qwen 3 - Max"),
        ("qwen-plus", "Qwen - Plus"),
        ("qwen3-coder-plus", "Qwen 3 - Coder Plus"),
        ("qwen3-vl-plus", "Qwen 3 - Vision Plus"),
    )
    return {
        "data": [
            {"id": model_id, "type": "model", "display_name": label}
            for model_id, label in defaults
        ]
    }
def _anthropic_text(content: Any) -> str:
    """Flatten a Messages-API ``content`` / ``system`` value into plain text.

    A string is returned unchanged. A list is treated as content blocks:
    plain strings and blocks whose type is ``"text"`` (or missing) contribute
    their text, joined by newlines; other block types are ignored. ``None``
    becomes ``""`` and anything else is stringified.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and block.get("type", "text") == "text":
                parts.append(str(block.get("text") or ""))
        return "\n".join(p for p in parts if p)
    return str(content)


async def _read_messages_request(request: Request):
    """Parse and validate a ``/v1/messages`` style request body.

    Returns:
        ``(prompt, history, model, user_text)``: the prompt sent to the
        model (the system prompt is prefixed when present), the prior turns
        as role/content dicts, the requested model name, and the text of the
        last message.

    Raises:
        HTTPException: 400 for malformed JSON, a non-object body, missing
            messages, or a message that is not an object.
    """
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body is not valid JSON")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    messages = body.get("messages", [])
    if not messages or not isinstance(messages, list):
        raise HTTPException(status_code=400, detail="No messages provided")
    if not all(isinstance(m, dict) for m in messages):
        raise HTTPException(status_code=400, detail="Each message must be an object")

    user_text = _anthropic_text(messages[-1].get("content", ""))
    model = body.get("model", "qwen-max")
    system_prompt = _anthropic_text(body.get("system", ""))

    history = [
        {"role": m.get("role", "user"), "content": _anthropic_text(m.get("content", ""))}
        for m in messages[:-1]
    ]

    prompt = user_text
    if system_prompt:
        prompt = f"[System: {system_prompt}]\n\n{user_text}"
    return prompt, history, model, user_text


def _sse_event(event: str, payload: dict) -> str:
    """Serialise one server-sent event; json.dumps handles all escaping."""
    return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"


@app.post("/v1/messages")
async def v1_messages(request: Request):
    """Claude Desktop compatible endpoint (non-streaming).

    Returns the complete assistant reply as a single message. Token counts
    in ``usage`` are rough estimates (characters // 4).
    """
    verify_api_key(request)
    prompt, history, model, user_text = await _read_messages_request(request)

    def collect() -> str:
        # ask() yields incremental deltas; the reply is their concatenation.
        return "".join(ask(prompt, history, "Qwen", model))

    # ask() performs blocking I/O, so it runs in a worker thread to keep the
    # event loop free.
    full_response = await asyncio.get_running_loop().run_in_executor(None, collect)

    return {
        "id": f"msg_{int(time.time())}_{os.urandom(4).hex()}",
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": full_response}],
        "model": model,
        "stop_reason": "end_turn",
        "stop_sequence": None,
        "usage": {
            "input_tokens": len(user_text) // 4,
            "output_tokens": len(full_response) // 4
        }
    }


@app.post("/v1/messages/stream")
async def v1_messages_stream(request: Request):
    """Claude Desktop compatible endpoint (server-sent events).

    Emits ``message_start``, ``content_block_start``, one
    ``content_block_delta`` per text delta, ``content_block_stop``,
    ``message_delta`` and ``message_stop``. Earlier turns of the request are
    forwarded as history, matching the non-streaming endpoint.
    """
    verify_api_key(request)
    prompt, history, model, user_text = await _read_messages_request(request)

    # A plain (non-async) generator: Starlette iterates it in a worker
    # thread, so the blocking ask() call does not stall the event loop.
    def generate_stream():
        message_id = f"msg_{int(time.time())}_{os.urandom(4).hex()}"
        produced_chars = 0

        yield _sse_event("message_start", {
            "type": "message_start",
            "message": {
                "id": message_id,
                "type": "message",
                "role": "assistant",
                "content": [],
                "model": model,
                "stop_reason": None,
                "stop_sequence": None,
                "usage": {"input_tokens": len(user_text) // 4, "output_tokens": 0},
            },
        })
        yield _sse_event("content_block_start", {
            "type": "content_block_start",
            "index": 0,
            "content_block": {"type": "text", "text": ""},
        })

        for chunk in ask(prompt, history, "Qwen", model):
            text = chunk if isinstance(chunk, str) else str(chunk)
            produced_chars += len(text)
            yield _sse_event("content_block_delta", {
                "type": "content_block_delta",
                "index": 0,
                "delta": {"type": "text_delta", "text": text},
            })

        yield _sse_event("content_block_stop", {"type": "content_block_stop", "index": 0})
        yield _sse_event("message_delta", {
            "type": "message_delta",
            "delta": {"stop_reason": "end_turn", "stop_sequence": None},
            # Same characters // 4 estimate as the non-streaming endpoint.
            "usage": {"output_tokens": produced_chars // 4},
        })
        yield _sse_event("message_stop", {"type": "message_stop"})

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
# =====================================================
# ORIGINAL ENDPOINTS
# =====================================================
@app.get("/")
async def root():
    """Describe the service: endpoints, auth scheme, cookie status, model count."""
    endpoint_help = {
        "GET /": "Home page",
        "GET /health": "Health check",
        "GET /v1/models": "List models (NO AUTH)",
        "POST /v1/messages": "Send message (REQUIRES AUTH)",
        "POST /v1/messages/stream": "Stream message (REQUIRES AUTH)",
        "GET /providers": "List providers (REQUIRES AUTH)",
        "POST /chat": "Legacy chat (REQUIRES AUTH)",
        "POST /chat/stream": "Legacy stream (REQUIRES AUTH)"
    }
    # Size of the static fallback list, not of the discovered models.
    qwen_model_count = len(PROVIDER_MODELS_FALLBACK.get("Qwen", []))

    info = {
        "message": "G4F Smart Router is running (Qwen Only - Full Models)",
        "provider": "Qwen",
        "endpoints": endpoint_help,
        "authentication": "Bearer YOUR_API_KEY or X-API-Key: YOUR_API_KEY",
        "cookies": COOKIE_STATUS,
        "models_count": qwen_model_count,
        "status": "✅ Server is working"
    }
    return info
@app.get("/health")
async def health():
    """Report liveness, the cookie status and the resolved provider names."""
    provider_names = [name for name in REAL_PROVIDERS]
    return {
        "status": "ok",
        "cookies": COOKIE_STATUS,
        "providers": provider_names,
    }
@app.post("/chat")
async def chat(request: Request, chat_req: ChatRequest):
    """Legacy non-streaming chat endpoint.

    Returns ``{"response": <full reply>}``.

    Raises:
        HTTPException: 401 when the API key is missing or wrong.
    """
    verify_api_key(request)

    def collect() -> str:
        # ask() yields incremental deltas, so the reply is their
        # concatenation; keeping only the last item would return just the
        # final fragment.
        return "".join(
            ask(chat_req.message, chat_req.history, chat_req.provider, chat_req.model)
        )

    # ask() performs blocking I/O; run it in a worker thread so the event
    # loop keeps serving other requests.
    result = await asyncio.get_running_loop().run_in_executor(None, collect)
    return JSONResponse({"response": result})
@app.post("/chat/stream")
async def chat_stream(request: Request, chat_req: ChatRequest):
    """Legacy streaming chat endpoint (server-sent events).

    Each event carries ``{"delta": <text>}``; the stream ends with
    ``data: [DONE]``.

    Raises:
        HTTPException: 401 when the API key is missing or wrong.
    """
    verify_api_key(request)

    # Deliberately a plain generator, not ``async def``: Starlette iterates
    # synchronous iterators in a worker thread, so the blocking ask() call
    # does not stall the event loop.
    def generate():
        for chunk in ask(chat_req.message, chat_req.history, chat_req.provider, chat_req.model):
            yield f"data: {json.dumps({'delta': chunk}, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
@app.get("/providers")
async def get_providers(request: Request):
    """Return every resolved provider with its full list of model names.

    Raises:
        HTTPException: 401 when the API key is missing or wrong.
    """
    verify_api_key(request)

    # Fill the discovery cache for any provider not seen yet.
    for pname, pobj in REAL_PROVIDERS.items():
        if pname not in _PROVIDER_MODEL_CACHE:
            _PROVIDER_MODEL_CACHE[pname] = discover_provider_models(pobj, pname)

    providers_info = {pname: _PROVIDER_MODEL_CACHE[pname] for pname in REAL_PROVIDERS}
    return JSONResponse({"providers": providers_info})
# =====================================================
# RUN
# =====================================================
if __name__ == "__main__":
    import uvicorn

    # Listen on the port given by the PORT environment variable, or 7860.
    listen_port = int(os.getenv("PORT", 7860))
    server_options = {"host": "0.0.0.0", "port": listen_port, "reload": False}
    uvicorn.run("app:app", **server_options)