| import os |
| import json |
| import time |
| import logging |
| from typing import Any, Dict, List, Optional, Tuple |
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.responses import StreamingResponse, JSONResponse |
| from pydantic import BaseModel |
| import g4f |
|
|
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger("g4f-smart-router") |
|
|
| |
| |
| |
| def _load_cookies_raw() -> Dict[str, Any]: |
| raw_env = (os.getenv("COOKIES_JSON") or "").strip() |
|
|
| if raw_env: |
| try: |
| return json.loads(raw_env) |
| except Exception as e: |
| logger.warning(e) |
|
|
| try: |
| if os.path.exists("cookies.json"): |
| with open("cookies.json", "r", encoding="utf-8") as f: |
| return json.load(f) |
| except Exception as e: |
| logger.warning(e) |
|
|
| return {} |
|
|
| def load_cookies() -> str: |
| data = _load_cookies_raw() |
|
|
| if not data: |
| return "⚠️ No Cookies" |
|
|
| try: |
| from g4f.cookies import set_cookies |
| except Exception: |
| return "⚠️ Cookies Found" |
|
|
| for domain, vals in data.items(): |
| try: |
| dom = domain if "." in domain else f".{domain}.com" |
|
|
| if isinstance(vals, list): |
| vals = { |
| x["name"]: x["value"] |
| for x in vals |
| if isinstance(x, dict) |
| } |
|
|
| if isinstance(vals, dict): |
| set_cookies(dom, vals) |
|
|
| except Exception as e: |
| logger.warning(e) |
|
|
| return "✅ Cookies Loaded" |
|
|
| COOKIE_STATUS = load_cookies() |
|
|
| |
| |
| |
| def get_provider(name: str): |
| try: |
| return getattr(g4f.Provider, name) |
| except: |
| return None |
|
|
| REAL_PROVIDERS: Dict[str, Any] = { |
| "Perplexity": get_provider("Perplexity") or get_provider("PerplexityAi"), |
| "Copilot": get_provider("Copilot"), |
| "Qwen": get_provider("Qwen"), |
| "Blackbox": get_provider("Blackbox"), |
| "DeepSeek": get_provider("DeepSeek"), |
| "HuggingFace": get_provider("HuggingFace"), |
| "Cloudflare": get_provider("Cloudflare"), |
| "You": get_provider("You"), |
| "Bing": get_provider("Bing"), |
| } |
|
|
| REAL_PROVIDERS = {k: v for k, v in REAL_PROVIDERS.items() if v} |
|
|
| |
| |
| |
| PROVIDER_MODELS_FALLBACK: Dict[str, List[str]] = { |
| "Perplexity": ["sonar", "sonar-pro", "gpt-4o", "llama-3"], |
| "Copilot": ["gpt-4o", "gpt-4", "turbo"], |
| "Qwen": ["qwen-max", "qwen-plus", "qwen-turbo", "qwen"], |
| "Blackbox": ["gpt-4o", "claude-3", "gemini-pro", "llama-3"], |
| "DeepSeek": ["deepseek-chat", "deepseek-coder"], |
| "HuggingFace": ["meta-llama", "mistral", "qwen", "gemma"], |
| "Cloudflare": ["llama-3", "mistral", "qwen"], |
| "You": ["gpt-4o", "claude", "llama-3"], |
| "Bing": ["gpt-4o", "gpt-4"], |
| } |
|
|
| |
| |
| |
| def _normalize_model_list(x: Any) -> List[str]: |
| out: List[str] = [] |
|
|
| if x is None: |
| return out |
|
|
| if isinstance(x, dict): |
| out = [str(k) for k in x.keys()] |
| elif isinstance(x, (list, tuple, set)): |
| out = [str(i) for i in x] |
| else: |
| out = [str(x)] |
|
|
| out = [m.strip() for m in out if m] |
|
|
| seen = set() |
| uniq = [] |
|
|
| for m in out: |
| if m not in seen: |
| uniq.append(m) |
| seen.add(m) |
|
|
| return uniq |
|
|
| def discover_provider_models(provider_obj: Any, provider_name: str) -> List[str]: |
|
|
| candidates = [] |
|
|
| for attr in ( |
| "models", |
| "model", |
| "default_model", |
| "available_models", |
| "supported_models" |
| ): |
| try: |
| if hasattr(provider_obj, attr): |
| candidates += _normalize_model_list( |
| getattr(provider_obj, attr) |
| ) |
| except: |
| pass |
|
|
| for fn in ("get_models", "models_list", "list_models"): |
| try: |
| if hasattr(provider_obj, fn): |
| candidates += _normalize_model_list( |
| getattr(provider_obj, fn)() |
| ) |
| except: |
| pass |
|
|
| if not candidates: |
| candidates = PROVIDER_MODELS_FALLBACK.get( |
| provider_name, |
| ["gpt-4o"] |
| ) |
|
|
| seen = set() |
| uniq = [] |
|
|
| for m in candidates: |
| if m not in seen: |
| uniq.append(m) |
| seen.add(m) |
|
|
| return uniq |
|
|
| |
| |
| |
| def clean_stream(chunk): |
| try: |
| if isinstance(chunk, dict): |
| if 'choices' in chunk and chunk['choices']: |
| delta = chunk['choices'][0].get('delta', {}) |
| if 'content' in delta: |
| return delta['content'] |
| if 'text' in delta: |
| return delta['text'] |
| if 'content' in chunk: |
| return chunk['content'] |
| if 'text' in chunk: |
| return chunk['text'] |
| return "" |
|
|
| if isinstance(chunk, str): |
| if chunk.strip().startswith("{") and chunk.strip().endswith("}"): |
| try: |
| data = json.loads(chunk) |
| if 'choices' in data and data['choices']: |
| delta = data['choices'][0].get('delta', {}) |
| if 'content' in delta: |
| return delta['content'] |
| if 'content' in data: |
| return data['content'] |
| if 'text' in data: |
| return data['text'] |
| except: |
| pass |
|
|
| chunk = chunk.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', ' ') |
| chunk = chunk.replace('\\"', '"').replace("\\'", "'") |
| return chunk |
|
|
| return str(chunk) |
| except Exception as e: |
| logger.warning(f"خطأ في clean_stream: {e}") |
| return "" |
|
|
| |
| |
| |
| CACHE = {} |
| CACHE_TS = {} |
|
|
| def _cache_get(key): |
| if key not in CACHE: |
| return None |
| return CACHE[key] |
|
|
| def _cache_set(key, val): |
| CACHE[key] = val |
| CACHE_TS[key] = time.time() |
|
|
| |
| |
| |
| _PROVIDER_MODEL_CACHE = {} |
|
|
| def ask(message: str, history, provider_name: str, model_name: str): |
| history = history or [] |
| message = (message or "").strip() |
|
|
| if not message: |
| yield "" |
| return |
|
|
| key = f"{provider_name}|{model_name}|{message}" |
|
|
| cached = _cache_get(key) |
|
|
| if cached is not None: |
| yield cached |
| return |
|
|
| msgs = [] |
|
|
| try: |
| if history: |
| if isinstance(history[0], dict): |
| for item in history[-40:]: |
| role = item.get("role") |
| content = item.get("content") |
| if role and content: |
| msgs.append({ |
| "role": str(role), |
| "content": str(content) |
| }) |
| else: |
| for item in history[-20:]: |
| if isinstance(item, (list, tuple)) and len(item) == 2: |
| u = item[0] |
| a = item[1] |
| if u: |
| msgs.append({ |
| "role": "user", |
| "content": str(u) |
| }) |
| if a: |
| msgs.append({ |
| "role": "assistant", |
| "content": str(a) |
| }) |
| except Exception as e: |
| logger.warning(e) |
|
|
| msgs.append({ |
| "role": "user", |
| "content": message |
| }) |
|
|
| fallback_providers = [ |
| provider_name, |
| "Perplexity", |
| "Copilot", |
| "Blackbox", |
| "DeepSeek", |
| "Bing", |
| "You", |
| "Qwen" |
| ] |
|
|
| used = [] |
|
|
| for pname in fallback_providers: |
| if pname in used: |
| continue |
|
|
| used.append(pname) |
|
|
| pobj = REAL_PROVIDERS.get(pname) |
|
|
| if not pobj: |
| continue |
|
|
| if pname not in _PROVIDER_MODEL_CACHE: |
| _PROVIDER_MODEL_CACHE[pname] = discover_provider_models( |
| pobj, |
| pname |
| ) |
|
|
| model_candidates = [model_name] + [ |
| x for x in _PROVIDER_MODEL_CACHE[pname] |
| if x != model_name |
| ] |
|
|
| for m in model_candidates[:12]: |
| try: |
| stream = g4f.ChatCompletion.create( |
| model=m, |
| provider=pobj, |
| messages=msgs, |
| stream=True |
| ) |
|
|
| text = "" |
|
|
| for chunk in stream: |
| c = clean_stream(chunk) |
|
|
| if c is None or c == "": |
| continue |
|
|
| text += c |
| yield text |
|
|
| if text.strip(): |
| _cache_set(key, text) |
| return |
|
|
| except Exception as e: |
| logger.warning(e) |
| continue |
|
|
| yield "❌ Failed with all providers." |
|
|
| |
| |
| |
|
|
| app = FastAPI(title="G4F API", description="G4F Smart Router API") |
|
|
| API_KEY = os.getenv("API_KEY", "mysecretkey123") |
|
|
| class ChatRequest(BaseModel): |
| message: str |
| provider: str = "Perplexity" |
| model: str = "sonar" |
| history: List[Any] = [] |
|
|
| |
| |
| |
| def verify_api_key(request: Request): |
| |
| auth = request.headers.get("Authorization") |
| if auth and auth.startswith("Bearer "): |
| key = auth.replace("Bearer ", "") |
| if key == API_KEY: |
| return True |
| |
| |
| x_api_key = request.headers.get("X-API-Key") |
| if x_api_key and x_api_key == API_KEY: |
| return True |
| |
| |
| raise HTTPException( |
| status_code=401, |
| detail="Invalid or missing API key. Use 'Authorization: Bearer KEY' or 'X-API-Key: KEY'" |
| ) |
|
|
| |
| |
| |
|
|
| @app.get("/") |
| async def root(): |
| return { |
| "message": "G4F API is running", |
| "endpoints": { |
| "GET /": "هذه الصفحة", |
| "GET /health": "التحقق من صحة الخادم", |
| "GET /providers": "قائمة المزودين والنماذج (يتطلب مفتاح)", |
| "POST /chat": "إرسال رسالة والحصول على رد (يتطلب مفتاح)", |
| "POST /chat/stream": "إرسال رسالة والحصول على رد متدفق (يتطلب مفتاح)" |
| }, |
| "authentication": "Bearer YOUR_API_KEY or X-API-Key: YOUR_API_KEY", |
| "status": "✅ Server is working" |
| } |
|
|
| @app.get("/health") |
| async def health(): |
| return {"status": "ok", "cookies": COOKIE_STATUS, "providers": list(REAL_PROVIDERS.keys())} |
|
|
| @app.post("/chat") |
| async def chat(request: Request, chat_req: ChatRequest): |
| verify_api_key(request) |
| |
| full_response = "" |
| for chunk in ask(chat_req.message, chat_req.history, chat_req.provider, chat_req.model): |
| full_response = chunk |
| |
| return JSONResponse(content={"response": full_response}) |
|
|
| @app.post("/chat/stream") |
| async def chat_stream(request: Request, chat_req: ChatRequest): |
| verify_api_key(request) |
| |
| async def generate(): |
| for chunk in ask(chat_req.message, chat_req.history, chat_req.provider, chat_req.model): |
| yield f"data: {json.dumps({'chunk': chunk})}\n\n" |
| yield "data: [DONE]\n\n" |
| |
| return StreamingResponse(generate(), media_type="text/event-stream") |
|
|
| @app.get("/providers") |
| async def get_providers(request: Request): |
| verify_api_key(request) |
| |
| providers_info = {} |
| for pname, pobj in REAL_PROVIDERS.items(): |
| if pname not in _PROVIDER_MODEL_CACHE: |
| _PROVIDER_MODEL_CACHE[pname] = discover_provider_models(pobj, pname) |
| providers_info[pname] = _PROVIDER_MODEL_CACHE[pname] |
| |
| return JSONResponse(content={"providers": providers_info}) |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| port = int(os.getenv("PORT", "7860")) |
| uvicorn.run(app, host="0.0.0.0", port=port) |