import os import json import time import logging from typing import Any, Dict, List, Optional, Tuple import gradio as gr import g4f logging.basicConfig(level=logging.INFO) logger = logging.getLogger("g4f-smart-router") # ===================================================== # COOKIES # ===================================================== def _load_cookies_raw() -> Dict[str, Any]: raw_env = (os.getenv("COOKIES_JSON") or "").strip() if raw_env: try: return json.loads(raw_env) except Exception as e: logger.warning(e) try: if os.path.exists("cookies.json"): with open("cookies.json", "r", encoding="utf-8") as f: return json.load(f) except Exception as e: logger.warning(e) return {} def load_cookies() -> str: data = _load_cookies_raw() if not data: return "⚠️ No Cookies" try: from g4f.cookies import set_cookies except Exception: return "⚠️ Cookies Found" for domain, vals in data.items(): try: dom = domain if "." in domain else f".{domain}.com" if isinstance(vals, list): vals = { x["name"]: x["value"] for x in vals if isinstance(x, dict) } if isinstance(vals, dict): set_cookies(dom, vals) except Exception as e: logger.warning(e) return "✅ Cookies Loaded" COOKIE_STATUS = load_cookies() # ===================================================== # PROVIDERS # ===================================================== def get_provider(name: str): try: return getattr(g4f.Provider, name) except: return None REAL_PROVIDERS: Dict[str, Any] = { "Perplexity": get_provider("Perplexity") or get_provider("PerplexityAi"), "Copilot": get_provider("Copilot"), "Qwen": get_provider("Qwen"), "Blackbox": get_provider("Blackbox"), "DeepSeek": get_provider("DeepSeek"), "HuggingFace": get_provider("HuggingFace"), "Cloudflare": get_provider("Cloudflare"), "You": get_provider("You"), "Bing": get_provider("Bing"), } REAL_PROVIDERS = {k: v for k, v in REAL_PROVIDERS.items() if v} # ===================================================== # MODELS # ===================================================== PROVIDER_MODELS_FALLBACK: Dict[str, List[str]] = { "Perplexity": ["sonar", "sonar-pro", "gpt-4o", "llama-3"], "Copilot": ["gpt-4o", "gpt-4", "turbo"], "Qwen": ["qwen-max", "qwen-plus", "qwen-turbo", "qwen"], "Blackbox": ["gpt-4o", "claude-3", "gemini-pro", "llama-3"], "DeepSeek": ["deepseek-chat", "deepseek-coder"], "HuggingFace": ["meta-llama", "mistral", "qwen", "gemma"], "Cloudflare": ["llama-3", "mistral", "qwen"], "You": ["gpt-4o", "claude", "llama-3"], "Bing": ["gpt-4o", "gpt-4"], } # ===================================================== # MODEL DISCOVERY # ===================================================== def _normalize_model_list(x: Any) -> List[str]: out: List[str] = [] if x is None: return out if isinstance(x, dict): out = [str(k) for k in x.keys()] elif isinstance(x, (list, tuple, set)): out = [str(i) for i in x] else: out = [str(x)] out = [m.strip() for m in out if m] seen = set() uniq = [] for m in out: if m not in seen: uniq.append(m) seen.add(m) return uniq def discover_provider_models(provider_obj: Any, provider_name: str) -> List[str]: candidates = [] for attr in ( "models", "model", "default_model", "available_models", "supported_models" ): try: if hasattr(provider_obj, attr): candidates += _normalize_model_list( getattr(provider_obj, attr) ) except: pass for fn in ("get_models", "models_list", "list_models"): try: if hasattr(provider_obj, fn): candidates += _normalize_model_list( getattr(provider_obj, fn)() ) except: pass if not candidates: candidates = PROVIDER_MODELS_FALLBACK.get( provider_name, ["gpt-4o"] ) seen = set() uniq = [] for m in candidates: if m not in seen: uniq.append(m) seen.add(m) return uniq # ===================================================== # STREAM CLEANER (محسّنة جداً - تحل مشاكل JSON والأكواد) # ===================================================== def clean_stream(chunk): try: # إذا كانت القطعة قاموساً (dict) مثل JSON if isinstance(chunk, dict): # استخراج النص من تنسيق Perplexity if 'choices' in chunk and chunk['choices']: delta = chunk['choices'][0].get('delta', {}) if 'content' in delta: return delta['content'] if 'text' in delta: return delta['text'] # تنسيقات أخرى if 'content' in chunk: return chunk['content'] if 'text' in chunk: return chunk['text'] return "" # إذا كانت القطعة نصاً if isinstance(chunk, str): # محاولة تفسيرها كـ JSON if chunk.strip().startswith("{") and chunk.strip().endswith("}"): try: data = json.loads(chunk) # استخراج النص من JSON if 'choices' in data and data['choices']: delta = data['choices'][0].get('delta', {}) if 'content' in delta: return delta['content'] if 'content' in data: return data['content'] if 'text' in data: return data['text'] except: pass # تنظيف علامات الترميز (للكود) chunk = chunk.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', ' ') chunk = chunk.replace('\\"', '"').replace("\\'", "'") return chunk # إذا كانت القطعة شيئاً آخر return str(chunk) except Exception as e: logger.warning(f"خطأ في clean_stream: {e}") return "" # ===================================================== # UI update # ===================================================== _PROVIDER_MODEL_CACHE = {} def update_models(provider_name): pobj = REAL_PROVIDERS.get(provider_name) if not pobj: arr = ["gpt-4o"] else: if provider_name not in _PROVIDER_MODEL_CACHE: _PROVIDER_MODEL_CACHE[provider_name] = discover_provider_models( pobj, provider_name ) arr = _PROVIDER_MODEL_CACHE[provider_name] return gr.update( choices=arr, value=arr[0] ) # ===================================================== # CACHE # ===================================================== CACHE = {} CACHE_TS = {} def _cache_get(key): if key not in CACHE: return None return CACHE[key] def _cache_set(key, val): CACHE[key] = val CACHE_TS[key] = time.time() # ===================================================== # CHAT (محسّنة - ذاكرة أطول وfallback أفضل) # ===================================================== def ask(message: str, history, provider_name: str, model_name: str): history = history or [] message = (message or "").strip() if not message: yield "" return key = f"{provider_name}|{model_name}|{message}" cached = _cache_get(key) if cached is not None: yield cached return # ================================================= # بناء الذاكرة (محسّن - يحفظ آخر 40/20 بدلاً من 16/8) # ================================================= msgs = [] try: if history: # format جديد if isinstance(history[0], dict): # حفظ آخر 40 رسالة (بدلاً من 16) for item in history[-40:]: role = item.get("role") content = item.get("content") if role and content: msgs.append({ "role": str(role), "content": str(content) }) # format قديم else: # حفظ آخر 20 محادثة (بدلاً من 8) for item in history[-20:]: if isinstance(item, (list, tuple)) and len(item) == 2: u = item[0] a = item[1] if u: msgs.append({ "role": "user", "content": str(u) }) if a: msgs.append({ "role": "assistant", "content": str(a) }) except Exception as e: logger.warning(e) msgs.append({ "role": "user", "content": message }) # ================================================= # تجربة المزودين مع fallback # ================================================= fallback_providers = [ provider_name, "Perplexity", "Copilot", "Blackbox", "DeepSeek", "Bing", "You", "Qwen" ] used = [] for pname in fallback_providers: if pname in used: continue used.append(pname) pobj = REAL_PROVIDERS.get(pname) if not pobj: continue if pname not in _PROVIDER_MODEL_CACHE: _PROVIDER_MODEL_CACHE[pname] = discover_provider_models( pobj, pname ) model_candidates = [model_name] + [ x for x in _PROVIDER_MODEL_CACHE[pname] if x != model_name ] for m in model_candidates[:12]: try: stream = g4f.ChatCompletion.create( model=m, provider=pobj, messages=msgs, stream=True ) text = "" for chunk in stream: c = clean_stream(chunk) if c is None or c == "": continue text += c yield text if text.strip(): _cache_set(key, text) return except Exception as e: logger.warning(e) continue yield "❌ Failed with all providers." # ===================================================== # UI # ===================================================== css = """ .gradio-container{ max-width:1200px!important; } """ providers = list(REAL_PROVIDERS.keys()) default_provider = ( "Perplexity" if "Perplexity" in providers else providers[0] ) default_models = discover_provider_models( REAL_PROVIDERS[default_provider], default_provider ) with gr.Blocks( theme=gr.themes.Soft(), css=css ) as demo: gr.Markdown("# 🤖 G4F Smart Router FINAL") gr.Markdown(COOKIE_STATUS) with gr.Row(): provider_dd = gr.Dropdown( choices=providers, value=default_provider, label="Provider" ) model_dd = gr.Dropdown( choices=default_models, value=default_models[0], label="Model" ) provider_dd.change( update_models, inputs=provider_dd, outputs=model_dd ) gr.ChatInterface( fn=ask, additional_inputs=[ provider_dd, model_dd ], title="", description="Choose provider → models update automatically." ) demo.queue().launch( server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")) )