| import os |
| import json |
| import time |
| import logging |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| import gradio as gr |
| import g4f |
|
|
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger("g4f-smart-router") |
|
|
| |
| |
| |
| def _load_cookies_raw() -> Dict[str, Any]: |
| raw_env = (os.getenv("COOKIES_JSON") or "").strip() |
|
|
| if raw_env: |
| try: |
| return json.loads(raw_env) |
| except Exception as e: |
| logger.warning(e) |
|
|
| try: |
| if os.path.exists("cookies.json"): |
| with open("cookies.json", "r", encoding="utf-8") as f: |
| return json.load(f) |
| except Exception as e: |
| logger.warning(e) |
|
|
| return {} |
|
|
| def load_cookies() -> str: |
| data = _load_cookies_raw() |
|
|
| if not data: |
| return "⚠️ No Cookies" |
|
|
| try: |
| from g4f.cookies import set_cookies |
| except Exception: |
| return "⚠️ Cookies Found" |
|
|
| for domain, vals in data.items(): |
| try: |
| dom = domain if "." in domain else f".{domain}.com" |
|
|
| if isinstance(vals, list): |
| vals = { |
| x["name"]: x["value"] |
| for x in vals |
| if isinstance(x, dict) |
| } |
|
|
| if isinstance(vals, dict): |
| set_cookies(dom, vals) |
|
|
| except Exception as e: |
| logger.warning(e) |
|
|
| return "✅ Cookies Loaded" |
|
|
| COOKIE_STATUS = load_cookies() |
|
|
| |
| |
| |
| def get_provider(name: str): |
| try: |
| return getattr(g4f.Provider, name) |
| except: |
| return None |
|
|
| REAL_PROVIDERS: Dict[str, Any] = { |
| "Perplexity": get_provider("Perplexity") or get_provider("PerplexityAi"), |
| "Copilot": get_provider("Copilot"), |
| "Qwen": get_provider("Qwen"), |
| "Blackbox": get_provider("Blackbox"), |
| "DeepSeek": get_provider("DeepSeek"), |
| "HuggingFace": get_provider("HuggingFace"), |
| "Cloudflare": get_provider("Cloudflare"), |
| "You": get_provider("You"), |
| "Bing": get_provider("Bing"), |
| } |
|
|
| REAL_PROVIDERS = {k: v for k, v in REAL_PROVIDERS.items() if v} |
|
|
| |
| |
| |
| PROVIDER_MODELS_FALLBACK: Dict[str, List[str]] = { |
| "Perplexity": ["sonar", "sonar-pro", "gpt-4o", "llama-3"], |
| "Copilot": ["gpt-4o", "gpt-4", "turbo"], |
| "Qwen": ["qwen-max", "qwen-plus", "qwen-turbo", "qwen"], |
| "Blackbox": ["gpt-4o", "claude-3", "gemini-pro", "llama-3"], |
| "DeepSeek": ["deepseek-chat", "deepseek-coder"], |
| "HuggingFace": ["meta-llama", "mistral", "qwen", "gemma"], |
| "Cloudflare": ["llama-3", "mistral", "qwen"], |
| "You": ["gpt-4o", "claude", "llama-3"], |
| "Bing": ["gpt-4o", "gpt-4"], |
| } |
|
|
| |
| |
| |
| def _normalize_model_list(x: Any) -> List[str]: |
| out: List[str] = [] |
|
|
| if x is None: |
| return out |
|
|
| if isinstance(x, dict): |
| out = [str(k) for k in x.keys()] |
| elif isinstance(x, (list, tuple, set)): |
| out = [str(i) for i in x] |
| else: |
| out = [str(x)] |
|
|
| out = [m.strip() for m in out if m] |
|
|
| seen = set() |
| uniq = [] |
|
|
| for m in out: |
| if m not in seen: |
| uniq.append(m) |
| seen.add(m) |
|
|
| return uniq |
|
|
| def discover_provider_models(provider_obj: Any, provider_name: str) -> List[str]: |
|
|
| candidates = [] |
|
|
| for attr in ( |
| "models", |
| "model", |
| "default_model", |
| "available_models", |
| "supported_models" |
| ): |
| try: |
| if hasattr(provider_obj, attr): |
| candidates += _normalize_model_list( |
| getattr(provider_obj, attr) |
| ) |
| except: |
| pass |
|
|
| for fn in ("get_models", "models_list", "list_models"): |
| try: |
| if hasattr(provider_obj, fn): |
| candidates += _normalize_model_list( |
| getattr(provider_obj, fn)() |
| ) |
| except: |
| pass |
|
|
| if not candidates: |
| candidates = PROVIDER_MODELS_FALLBACK.get( |
| provider_name, |
| ["gpt-4o"] |
| ) |
|
|
| seen = set() |
| uniq = [] |
|
|
| for m in candidates: |
| if m not in seen: |
| uniq.append(m) |
| seen.add(m) |
|
|
| return uniq |
|
|
| |
| |
| |
| def clean_stream(chunk): |
| try: |
| |
| if isinstance(chunk, dict): |
| |
| if 'choices' in chunk and chunk['choices']: |
| delta = chunk['choices'][0].get('delta', {}) |
| if 'content' in delta: |
| return delta['content'] |
| if 'text' in delta: |
| return delta['text'] |
| |
| if 'content' in chunk: |
| return chunk['content'] |
| if 'text' in chunk: |
| return chunk['text'] |
| return "" |
| |
| |
| if isinstance(chunk, str): |
| |
| if chunk.strip().startswith("{") and chunk.strip().endswith("}"): |
| try: |
| data = json.loads(chunk) |
| |
| if 'choices' in data and data['choices']: |
| delta = data['choices'][0].get('delta', {}) |
| if 'content' in delta: |
| return delta['content'] |
| if 'content' in data: |
| return data['content'] |
| if 'text' in data: |
| return data['text'] |
| except: |
| pass |
| |
| |
| chunk = chunk.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', ' ') |
| chunk = chunk.replace('\\"', '"').replace("\\'", "'") |
| return chunk |
| |
| |
| return str(chunk) |
| except Exception as e: |
| logger.warning(f"خطأ في clean_stream: {e}") |
| return "" |
|
|
| |
| |
| |
| _PROVIDER_MODEL_CACHE = {} |
|
|
| def update_models(provider_name): |
|
|
| pobj = REAL_PROVIDERS.get(provider_name) |
|
|
| if not pobj: |
| arr = ["gpt-4o"] |
| else: |
| if provider_name not in _PROVIDER_MODEL_CACHE: |
| _PROVIDER_MODEL_CACHE[provider_name] = discover_provider_models( |
| pobj, |
| provider_name |
| ) |
|
|
| arr = _PROVIDER_MODEL_CACHE[provider_name] |
|
|
| return gr.update( |
| choices=arr, |
| value=arr[0] |
| ) |
|
|
| |
| |
| |
| CACHE = {} |
| CACHE_TS = {} |
|
|
| def _cache_get(key): |
|
|
| if key not in CACHE: |
| return None |
|
|
| return CACHE[key] |
|
|
| def _cache_set(key, val): |
| CACHE[key] = val |
| CACHE_TS[key] = time.time() |
|
|
| |
| |
| |
| def ask(message: str, history, provider_name: str, model_name: str): |
|
|
| history = history or [] |
| message = (message or "").strip() |
|
|
| if not message: |
| yield "" |
| return |
|
|
| key = f"{provider_name}|{model_name}|{message}" |
|
|
| cached = _cache_get(key) |
|
|
| if cached is not None: |
| yield cached |
| return |
|
|
| |
| |
| |
| msgs = [] |
|
|
| try: |
| if history: |
| |
| if isinstance(history[0], dict): |
| |
| for item in history[-40:]: |
| role = item.get("role") |
| content = item.get("content") |
| if role and content: |
| msgs.append({ |
| "role": str(role), |
| "content": str(content) |
| }) |
| |
| else: |
| |
| for item in history[-20:]: |
| if isinstance(item, (list, tuple)) and len(item) == 2: |
| u = item[0] |
| a = item[1] |
| if u: |
| msgs.append({ |
| "role": "user", |
| "content": str(u) |
| }) |
| if a: |
| msgs.append({ |
| "role": "assistant", |
| "content": str(a) |
| }) |
| except Exception as e: |
| logger.warning(e) |
|
|
| msgs.append({ |
| "role": "user", |
| "content": message |
| }) |
|
|
| |
| |
| |
| fallback_providers = [ |
| provider_name, |
| "Perplexity", |
| "Copilot", |
| "Blackbox", |
| "DeepSeek", |
| "Bing", |
| "You", |
| "Qwen" |
| ] |
|
|
| used = [] |
|
|
| for pname in fallback_providers: |
|
|
| if pname in used: |
| continue |
|
|
| used.append(pname) |
|
|
| pobj = REAL_PROVIDERS.get(pname) |
|
|
| if not pobj: |
| continue |
|
|
| if pname not in _PROVIDER_MODEL_CACHE: |
| _PROVIDER_MODEL_CACHE[pname] = discover_provider_models( |
| pobj, |
| pname |
| ) |
|
|
| model_candidates = [model_name] + [ |
| x for x in _PROVIDER_MODEL_CACHE[pname] |
| if x != model_name |
| ] |
|
|
| for m in model_candidates[:12]: |
|
|
| try: |
| stream = g4f.ChatCompletion.create( |
| model=m, |
| provider=pobj, |
| messages=msgs, |
| stream=True |
| ) |
|
|
| text = "" |
|
|
| for chunk in stream: |
| c = clean_stream(chunk) |
|
|
| if c is None or c == "": |
| continue |
|
|
| text += c |
| yield text |
|
|
| if text.strip(): |
| _cache_set(key, text) |
| return |
|
|
| except Exception as e: |
| logger.warning(e) |
| continue |
|
|
| yield "❌ Failed with all providers." |
|
|
| |
| |
| |
| css = """ |
| .gradio-container{ |
| max-width:1200px!important; |
| } |
| """ |
|
|
| providers = list(REAL_PROVIDERS.keys()) |
|
|
| default_provider = ( |
| "Perplexity" |
| if "Perplexity" in providers |
| else providers[0] |
| ) |
|
|
| default_models = discover_provider_models( |
| REAL_PROVIDERS[default_provider], |
| default_provider |
| ) |
|
|
| with gr.Blocks( |
| theme=gr.themes.Soft(), |
| css=css |
| ) as demo: |
|
|
| gr.Markdown("# 🤖 G4F Smart Router FINAL") |
| gr.Markdown(COOKIE_STATUS) |
|
|
| with gr.Row(): |
|
|
| provider_dd = gr.Dropdown( |
| choices=providers, |
| value=default_provider, |
| label="Provider" |
| ) |
|
|
| model_dd = gr.Dropdown( |
| choices=default_models, |
| value=default_models[0], |
| label="Model" |
| ) |
|
|
| provider_dd.change( |
| update_models, |
| inputs=provider_dd, |
| outputs=model_dd |
| ) |
|
|
| gr.ChatInterface( |
| fn=ask, |
| additional_inputs=[ |
| provider_dd, |
| model_dd |
| ], |
| title="", |
| description="Choose provider → models update automatically." |
| ) |
|
|
| demo.queue().launch( |
| server_name="0.0.0.0", |
| server_port=int(os.getenv("PORT", "7860")) |
| ) |
|
|