import os import json import time import logging from typing import Any, Dict, List, Optional, Tuple from fastapi import FastAPI, HTTPException, Request from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel import g4f logging.basicConfig(level=logging.INFO) logger = logging.getLogger("g4f-smart-router") # ===================================================== # COOKIES # ===================================================== def _load_cookies_raw() -> Dict[str, Any]: raw_env = (os.getenv("COOKIES_JSON") or "").strip() if raw_env: try: return json.loads(raw_env) except Exception as e: logger.warning(e) try: if os.path.exists("cookies.json"): with open("cookies.json", "r", encoding="utf-8") as f: return json.load(f) except Exception as e: logger.warning(e) return {} def load_cookies() -> str: data = _load_cookies_raw() if not data: return "⚠️ No Cookies" try: from g4f.cookies import set_cookies except Exception: return "⚠️ Cookies Found" for domain, vals in data.items(): try: dom = domain if "." in domain else f".{domain}.com" if isinstance(vals, list): vals = { x["name"]: x["value"] for x in vals if isinstance(x, dict) } if isinstance(vals, dict): set_cookies(dom, vals) except Exception as e: logger.warning(e) return "✅ Cookies Loaded" COOKIE_STATUS = load_cookies() # ===================================================== # PROVIDERS # ===================================================== def get_provider(name: str): try: return getattr(g4f.Provider, name) except: return None REAL_PROVIDERS: Dict[str, Any] = { "Perplexity": get_provider("Perplexity") or get_provider("PerplexityAi"), "Copilot": get_provider("Copilot"), "Qwen": get_provider("Qwen"), "Blackbox": get_provider("Blackbox"), "DeepSeek": get_provider("DeepSeek"), "HuggingFace": get_provider("HuggingFace"), "Cloudflare": get_provider("Cloudflare"), "You": get_provider("You"), "Bing": get_provider("Bing"), } REAL_PROVIDERS = {k: v for k, v in REAL_PROVIDERS.items() if v} # ===================================================== # MODELS # ===================================================== PROVIDER_MODELS_FALLBACK: Dict[str, List[str]] = { "Perplexity": ["sonar", "sonar-pro", "gpt-4o", "llama-3"], "Copilot": ["gpt-4o", "gpt-4", "turbo"], "Qwen": ["qwen-max", "qwen-plus", "qwen-turbo", "qwen"], "Blackbox": ["gpt-4o", "claude-3", "gemini-pro", "llama-3"], "DeepSeek": ["deepseek-chat", "deepseek-coder"], "HuggingFace": ["meta-llama", "mistral", "qwen", "gemma"], "Cloudflare": ["llama-3", "mistral", "qwen"], "You": ["gpt-4o", "claude", "llama-3"], "Bing": ["gpt-4o", "gpt-4"], } # ===================================================== # MODEL DISCOVERY # ===================================================== def _normalize_model_list(x: Any) -> List[str]: out: List[str] = [] if x is None: return out if isinstance(x, dict): out = [str(k) for k in x.keys()] elif isinstance(x, (list, tuple, set)): out = [str(i) for i in x] else: out = [str(x)] out = [m.strip() for m in out if m] seen = set() uniq = [] for m in out: if m not in seen: uniq.append(m) seen.add(m) return uniq def discover_provider_models(provider_obj: Any, provider_name: str) -> List[str]: candidates = [] for attr in ( "models", "model", "default_model", "available_models", "supported_models" ): try: if hasattr(provider_obj, attr): candidates += _normalize_model_list( getattr(provider_obj, attr) ) except: pass for fn in ("get_models", "models_list", "list_models"): try: if hasattr(provider_obj, fn): candidates += _normalize_model_list( getattr(provider_obj, fn)() ) except: pass if not candidates: candidates = PROVIDER_MODELS_FALLBACK.get( provider_name, ["gpt-4o"] ) seen = set() uniq = [] for m in candidates: if m not in seen: uniq.append(m) seen.add(m) return uniq # ===================================================== # STREAM CLEANER # ===================================================== def clean_stream(chunk): try: if isinstance(chunk, dict): if 'choices' in chunk and chunk['choices']: delta = chunk['choices'][0].get('delta', {}) if 'content' in delta: return delta['content'] if 'text' in delta: return delta['text'] if 'content' in chunk: return chunk['content'] if 'text' in chunk: return chunk['text'] return "" if isinstance(chunk, str): if chunk.strip().startswith("{") and chunk.strip().endswith("}"): try: data = json.loads(chunk) if 'choices' in data and data['choices']: delta = data['choices'][0].get('delta', {}) if 'content' in delta: return delta['content'] if 'content' in data: return data['content'] if 'text' in data: return data['text'] except: pass chunk = chunk.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', ' ') chunk = chunk.replace('\\"', '"').replace("\\'", "'") return chunk return str(chunk) except Exception as e: logger.warning(f"خطأ في clean_stream: {e}") return "" # ===================================================== # CACHE # ===================================================== CACHE = {} CACHE_TS = {} def _cache_get(key): if key not in CACHE: return None return CACHE[key] def _cache_set(key, val): CACHE[key] = val CACHE_TS[key] = time.time() # ===================================================== # CHAT LOGIC # ===================================================== _PROVIDER_MODEL_CACHE = {} def ask(message: str, history, provider_name: str, model_name: str): history = history or [] message = (message or "").strip() if not message: yield "" return key = f"{provider_name}|{model_name}|{message}" cached = _cache_get(key) if cached is not None: yield cached return msgs = [] try: if history: if isinstance(history[0], dict): for item in history[-40:]: role = item.get("role") content = item.get("content") if role and content: msgs.append({ "role": str(role), "content": str(content) }) else: for item in history[-20:]: if isinstance(item, (list, tuple)) and len(item) == 2: u = item[0] a = item[1] if u: msgs.append({ "role": "user", "content": str(u) }) if a: msgs.append({ "role": "assistant", "content": str(a) }) except Exception as e: logger.warning(e) msgs.append({ "role": "user", "content": message }) fallback_providers = [ provider_name, "Perplexity", "Copilot", "Blackbox", "DeepSeek", "Bing", "You", "Qwen" ] used = [] for pname in fallback_providers: if pname in used: continue used.append(pname) pobj = REAL_PROVIDERS.get(pname) if not pobj: continue if pname not in _PROVIDER_MODEL_CACHE: _PROVIDER_MODEL_CACHE[pname] = discover_provider_models( pobj, pname ) model_candidates = [model_name] + [ x for x in _PROVIDER_MODEL_CACHE[pname] if x != model_name ] for m in model_candidates[:12]: try: stream = g4f.ChatCompletion.create( model=m, provider=pobj, messages=msgs, stream=True ) text = "" for chunk in stream: c = clean_stream(chunk) if c is None or c == "": continue text += c yield text if text.strip(): _cache_set(key, text) return except Exception as e: logger.warning(e) continue yield "❌ Failed with all providers." # ===================================================== # FASTAPI # ===================================================== app = FastAPI(title="G4F API", description="G4F Smart Router API") API_KEY = os.getenv("API_KEY", "mysecretkey123") class ChatRequest(BaseModel): message: str provider: str = "Perplexity" model: str = "sonar" history: List[Any] = [] # ===================================================== # تعديل دالة التحقق لدعم X-API-Key مع الاحتفاظ بـ Bearer # ===================================================== def verify_api_key(request: Request): # الطريقة الأولى: Authorization: Bearer XXX auth = request.headers.get("Authorization") if auth and auth.startswith("Bearer "): key = auth.replace("Bearer ", "") if key == API_KEY: return True # الطريقة الثانية: X-API-Key: XXX x_api_key = request.headers.get("X-API-Key") if x_api_key and x_api_key == API_KEY: return True # لا يوجد مفتاح صالح raise HTTPException( status_code=401, detail="Invalid or missing API key. Use 'Authorization: Bearer KEY' or 'X-API-Key: KEY'" ) # ===================================================== # نقاط النهاية (بدون تغيير) # ===================================================== @app.get("/") async def root(): return { "message": "G4F API is running", "endpoints": { "GET /": "هذه الصفحة", "GET /health": "التحقق من صحة الخادم", "GET /providers": "قائمة المزودين والنماذج (يتطلب مفتاح)", "POST /chat": "إرسال رسالة والحصول على رد (يتطلب مفتاح)", "POST /chat/stream": "إرسال رسالة والحصول على رد متدفق (يتطلب مفتاح)" }, "authentication": "Bearer YOUR_API_KEY or X-API-Key: YOUR_API_KEY", "status": "✅ Server is working" } @app.get("/health") async def health(): return {"status": "ok", "cookies": COOKIE_STATUS, "providers": list(REAL_PROVIDERS.keys())} @app.post("/chat") async def chat(request: Request, chat_req: ChatRequest): verify_api_key(request) full_response = "" for chunk in ask(chat_req.message, chat_req.history, chat_req.provider, chat_req.model): full_response = chunk return JSONResponse(content={"response": full_response}) @app.post("/chat/stream") async def chat_stream(request: Request, chat_req: ChatRequest): verify_api_key(request) async def generate(): for chunk in ask(chat_req.message, chat_req.history, chat_req.provider, chat_req.model): yield f"data: {json.dumps({'chunk': chunk})}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(generate(), media_type="text/event-stream") @app.get("/providers") async def get_providers(request: Request): verify_api_key(request) providers_info = {} for pname, pobj in REAL_PROVIDERS.items(): if pname not in _PROVIDER_MODEL_CACHE: _PROVIDER_MODEL_CACHE[pname] = discover_provider_models(pobj, pname) providers_info[pname] = _PROVIDER_MODEL_CACHE[pname] return JSONResponse(content={"providers": providers_info}) if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", "7860")) uvicorn.run(app, host="0.0.0.0", port=port)