bh / app.py
bahi-bh's picture
Create app.py
b8f9b0d verified
import os
import json
import time
import logging
from typing import Any, Dict, List, Optional, Tuple
import gradio as gr
import g4f
# Configure root logging at INFO so warnings from the helpers in this file are emitted.
logging.basicConfig(level=logging.INFO)
# Module-wide logger shared by the functions in this file.
logger = logging.getLogger("g4f-smart-router")
# =====================================================
# COOKIES
# =====================================================
def _load_cookies_raw() -> Dict[str, Any]:
    """Return the raw cookie mapping from the environment or ``cookies.json``.

    Sources are tried in order:

    1. the ``COOKIES_JSON`` environment variable (a JSON document), then
    2. a ``cookies.json`` file in the current working directory.

    A source is accepted only when it decodes to a JSON object. Invalid JSON,
    a top-level list/string/number, or an unreadable file is logged and
    skipped, so callers can rely on always receiving a ``dict``.

    Returns:
        The decoded mapping, or an empty dict when no usable source exists.
    """
    raw_env = (os.getenv("COOKIES_JSON") or "").strip()
    if raw_env:
        try:
            data = json.loads(raw_env)
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            logger.warning("Ignoring COOKIES_JSON: invalid JSON (%s)", e)
        else:
            if isinstance(data, dict):
                return data
            logger.warning(
                "Ignoring COOKIES_JSON: expected a JSON object, got %s",
                type(data).__name__,
            )

    try:
        with open("cookies.json", "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # A missing cookie file is the normal "no cookies" case, not an error.
        return {}
    except (OSError, ValueError) as e:
        logger.warning("Ignoring cookies.json: %s", e)
        return {}

    if isinstance(data, dict):
        return data
    logger.warning(
        "Ignoring cookies.json: expected a JSON object, got %s",
        type(data).__name__,
    )
    return {}
def load_cookies() -> str:
    """Register stored cookies with g4f and return a one-line status string.

    The raw mapping comes from ``_load_cookies_raw()``. Each top-level key is
    treated as a domain; a key without a dot is expanded to ``.<key>.com``.
    A value may be either a ``{name: value}`` dict or a list of
    ``{"name": ..., "value": ...}`` dicts (list entries missing either key are
    skipped individually). Each domain is passed to
    ``g4f.cookies.set_cookies``; a failure for one domain is logged and does
    not stop the others.

    Returns:
        ``"⚠️ No Cookies"`` when there is nothing to load,
        ``"⚠️ Cookies Found"`` when cookies exist but ``g4f.cookies`` cannot
        be imported, otherwise ``"✅ Cookies Loaded"``.
    """
    data = _load_cookies_raw()
    # Guard against a non-mapping payload so ``.items()`` below cannot crash
    # the module import.
    if not data or not isinstance(data, dict):
        return "⚠️ No Cookies"

    try:
        from g4f.cookies import set_cookies
    except Exception:
        # Best effort: cookies were found but this g4f install cannot take them.
        return "⚠️ Cookies Found"

    for domain, vals in data.items():
        try:
            dom = domain if "." in domain else f".{domain}.com"
            if isinstance(vals, list):
                # Skip malformed entries one by one instead of letting a single
                # KeyError discard every cookie for the domain.
                vals = {
                    x["name"]: x["value"]
                    for x in vals
                    if isinstance(x, dict) and "name" in x and "value" in x
                }
            if isinstance(vals, dict):
                set_cookies(dom, vals)
        except Exception as e:
            logger.warning("Could not register cookies for %r: %s", domain, e)
    return "✅ Cookies Loaded"
# Load cookies once at import time and keep the resulting status string.
COOKIE_STATUS = load_cookies()
# =====================================================
# PROVIDERS
# =====================================================
def get_provider(name: str):
    """Look up an attribute of ``g4f.Provider`` by name.

    Args:
        name: Attribute name to resolve, e.g. ``"Copilot"``.

    Returns:
        The attribute value, or ``None`` when ``g4f.Provider`` has no
        attribute called ``name``.
    """
    # getattr's default covers the missing-attribute case without a bare
    # ``except`` that would also swallow KeyboardInterrupt/SystemExit.
    return getattr(g4f.Provider, name, None)
# Display name -> resolved provider object. Names that do not resolve to a
# truthy provider are left out, so every value in this mapping is usable.
# "Perplexity" falls back to the alternative attribute name "PerplexityAi".
REAL_PROVIDERS: Dict[str, Any] = {
    label: provider
    for label, provider in (
        ("Perplexity", get_provider("Perplexity") or get_provider("PerplexityAi")),
        ("Copilot", get_provider("Copilot")),
        ("Qwen", get_provider("Qwen")),
        ("Blackbox", get_provider("Blackbox")),
        ("DeepSeek", get_provider("DeepSeek")),
        ("HuggingFace", get_provider("HuggingFace")),
        ("Cloudflare", get_provider("Cloudflare")),
        ("You", get_provider("You")),
        ("Bing", get_provider("Bing")),
    )
    if provider
}
# =====================================================
# MODELS
# =====================================================
# Static model names per provider, used when nothing can be discovered from
# the provider object itself. Each entry is written as a space-separated
# string and split into a list.
PROVIDER_MODELS_FALLBACK: Dict[str, List[str]] = {
    provider: names.split()
    for provider, names in (
        ("Perplexity", "sonar sonar-pro gpt-4o llama-3"),
        ("Copilot", "gpt-4o gpt-4 turbo"),
        ("Qwen", "qwen-max qwen-plus qwen-turbo qwen"),
        ("Blackbox", "gpt-4o claude-3 gemini-pro llama-3"),
        ("DeepSeek", "deepseek-chat deepseek-coder"),
        ("HuggingFace", "meta-llama mistral qwen gemma"),
        ("Cloudflare", "llama-3 mistral qwen"),
        ("You", "gpt-4o claude llama-3"),
        ("Bing", "gpt-4o gpt-4"),
    )
}
# =====================================================
# MODEL DISCOVERY
# =====================================================
def _normalize_model_list(x: Any) -> List[str]:
    """Coerce a model declaration into a clean list of model names.

    Args:
        x: ``None``, a dict (its keys are taken as the names), a
            list/tuple/set/frozenset of names, or a single value.

    Returns:
        Names converted with ``str()``, stripped of surrounding whitespace,
        with empty results and ``None`` items removed, de-duplicated in
        first-seen order.
    """
    if x is None:
        return []
    if isinstance(x, dict):
        items = list(x.keys())
    elif isinstance(x, (list, tuple, set, frozenset)):
        items = list(x)
    else:
        items = [x]

    # Strip *before* filtering so whitespace-only entries do not survive as "",
    # and skip None so it never turns into the literal name "None".
    stripped = (str(item).strip() for item in items if item is not None)
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(name for name in stripped if name))
def discover_provider_models(provider_obj: Any, provider_name: str) -> List[str]:
    """Collect the model names a provider object advertises.

    Two kinds of attributes are probed on ``provider_obj``:

    * data attributes (``models``, ``model``, ``default_model``,
      ``available_models``, ``supported_models``), read as-is, and
    * zero-argument callables (``get_models``, ``models_list``,
      ``list_models``), which are invoked.

    Every value found is passed through ``_normalize_model_list``. If nothing
    is found, the static ``PROVIDER_MODELS_FALLBACK`` entry for
    ``provider_name`` is used, or ``["gpt-4o"]`` when there is none.

    Args:
        provider_obj: The provider object to inspect.
        provider_name: Key used for the fallback table and in log messages.

    Returns:
        De-duplicated model names in discovery order.
    """
    candidates: List[str] = []

    for attr in (
        "models",
        "model",
        "default_model",
        "available_models",
        "supported_models",
    ):
        try:
            value = getattr(provider_obj, attr, None)
        except Exception as e:  # attribute access may run arbitrary code
            logger.debug("Reading %s.%s failed: %s", provider_name, attr, e)
            continue
        # A callable here is a method, not a model list; stringifying it
        # would produce a bogus name like "<bound method ...>".
        if value is None or callable(value):
            continue
        candidates += _normalize_model_list(value)

    for fn in ("get_models", "models_list", "list_models"):
        try:
            getter = getattr(provider_obj, fn, None)
            if not callable(getter):
                continue
            candidates += _normalize_model_list(getter())
        except Exception as e:
            # Discovery is best effort: a failing getter must not block the
            # remaining probes or the static fallback.
            logger.debug("Calling %s.%s() failed: %s", provider_name, fn, e)

    if not candidates:
        candidates = PROVIDER_MODELS_FALLBACK.get(
            provider_name,
            ["gpt-4o"],
        )

    # Order-preserving de-duplication across all probed sources.
    return list(dict.fromkeys(candidates))
# =====================================================
# STREAM CLEANER (heavily improved - fixes JSON and code-block issues)
# =====================================================
def _extract_stream_text(payload: Any) -> Tuple[bool, Any]:
    """Find the text field inside a decoded stream payload.

    Keys are checked in this order: ``choices[0]["delta"]["content"]``,
    ``choices[0]["delta"]["text"]``, top-level ``"content"``, top-level
    ``"text"``.

    Returns:
        ``(True, value)`` for the first key present (the value is returned
        unchanged, whatever its type), otherwise ``(False, None)``.
    """
    if not isinstance(payload, dict):
        return False, None

    choices = payload.get("choices")
    if isinstance(choices, (list, tuple)) and choices and isinstance(choices[0], dict):
        delta = choices[0].get("delta")
        if isinstance(delta, dict):
            for key in ("content", "text"):
                if key in delta:
                    return True, delta[key]

    for key in ("content", "text"):
        if key in payload:
            return True, payload[key]
    return False, None


def clean_stream(chunk):
    """Reduce one streamed chunk to the text that should be shown to the user.

    * ``dict`` chunks: the text field is extracted (see
      ``_extract_stream_text``); a dict without any text field yields ``""``.
    * ``str`` chunks that look like a JSON object are decoded and treated the
      same way; if decoding fails or no text field exists the string is kept
      as ordinary text. Ordinary text then has the literal two-character
      escapes ``\\n``, ``\\r``, ``\\t``, ``\\"`` and ``\\'`` rewritten.
    * anything else is converted with ``str()``.

    Returns:
        The extracted text (possibly ``None`` or a non-string when the payload
        stored such a value under a text key), or ``""`` if an unexpected
        error occurred.
    """
    try:
        if isinstance(chunk, dict):
            found, value = _extract_stream_text(chunk)
            return value if found else ""

        if isinstance(chunk, str):
            stripped = chunk.strip()
            if stripped.startswith("{") and stripped.endswith("}"):
                try:
                    data = json.loads(chunk)
                except (ValueError, RecursionError):
                    # Not JSON after all (e.g. a code snippet in braces):
                    # fall through and treat it as plain text.
                    data = None
                found, value = _extract_stream_text(data)
                if found:
                    return value

            # Rewrite literal backslash escapes into the characters they name
            # (note: a literal backslash-t becomes a single space).
            # NOTE(review): this also rewrites escapes that legitimately appear
            # inside streamed source code — confirm this is intended.
            chunk = chunk.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', ' ')
            chunk = chunk.replace('\\"', '"').replace("\\'", "'")
            return chunk

        # Any other chunk type: fall back to its string form.
        return str(chunk)
    except Exception as e:
        logger.warning(f"خطأ في clean_stream: {e}")
        return ""
# =====================================================
# UI update
# =====================================================
# Provider name -> model names already discovered for it (filled lazily).
_PROVIDER_MODEL_CACHE: Dict[str, List[str]] = {}


def update_models(provider_name):
    """Build the dropdown update for the models of ``provider_name``.

    A provider missing from ``REAL_PROVIDERS`` gets the single choice
    ``"gpt-4o"``. Otherwise the provider's models are discovered on first use
    and remembered in ``_PROVIDER_MODEL_CACHE``.

    Returns:
        A ``gr.update`` carrying the model choices with the first one selected.
    """
    provider = REAL_PROVIDERS.get(provider_name)
    if provider:
        try:
            models = _PROVIDER_MODEL_CACHE[provider_name]
        except KeyError:
            models = discover_provider_models(provider, provider_name)
            _PROVIDER_MODEL_CACHE[provider_name] = models
    else:
        models = ["gpt-4o"]

    return gr.update(choices=models, value=models[0])
# =====================================================
# CACHE
# =====================================================
# Response cache: key -> cached value, plus the time each key was written.
CACHE: Dict[str, Any] = {}
CACHE_TS: Dict[str, float] = {}

# Upper bound on stored entries so a long-running server cannot grow the
# cache without limit; the least recently written entry is evicted first.
CACHE_MAX_ENTRIES = 512
# Optional entry lifetime in seconds; ``None`` (the default) disables expiry.
CACHE_TTL_SECONDS: Optional[float] = None


def _cache_get(key):
    """Return the cached value for ``key``, or ``None`` if absent or expired."""
    if key not in CACHE:
        return None
    if CACHE_TTL_SECONDS is not None:
        age = time.time() - CACHE_TS.get(key, 0.0)
        if age > CACHE_TTL_SECONDS:
            # Drop the stale entry so it stops occupying a slot.
            CACHE.pop(key, None)
            CACHE_TS.pop(key, None)
            return None
    return CACHE[key]


def _cache_set(key, val):
    """Store ``val`` under ``key`` and record the write time.

    When the cache is full, the entries written longest ago are evicted until
    there is room for the new one.
    """
    # Remove first so re-writing a key moves it to the end of the dict, which
    # keeps dict order equal to write recency.
    CACHE.pop(key, None)
    CACHE_TS.pop(key, None)
    while CACHE and len(CACHE) >= CACHE_MAX_ENTRIES:
        oldest = next(iter(CACHE))
        del CACHE[oldest]
        CACHE_TS.pop(oldest, None)
    CACHE[key] = val
    CACHE_TS[key] = time.time()
# =====================================================
# CHAT (improved - longer memory and better fallback)
# =====================================================
def _history_to_messages(history) -> List[Dict[str, str]]:
    """Convert chat history into a list of ``{"role", "content"}`` messages.

    Two history layouts are accepted, chosen by the type of the first item:

    * a list of dicts with ``role``/``content`` keys — the last 40 are used;
    * a list of ``(user, assistant)`` pairs — the last 20 pairs are used.

    Items with missing/empty fields are skipped. If the history is malformed,
    the problem is logged and whatever was converted so far is returned.
    """
    msgs: List[Dict[str, str]] = []
    if not history:
        return msgs
    try:
        if isinstance(history[0], dict):
            for item in history[-40:]:
                role = item.get("role")
                content = item.get("content")
                if role and content:
                    msgs.append({"role": str(role), "content": str(content)})
        else:
            for item in history[-20:]:
                if isinstance(item, (list, tuple)) and len(item) == 2:
                    user_text, assistant_text = item
                    if user_text:
                        msgs.append({"role": "user", "content": str(user_text)})
                    if assistant_text:
                        msgs.append(
                            {"role": "assistant", "content": str(assistant_text)}
                        )
    except Exception as e:
        logger.warning(e)
    return msgs


def _conversation_cache_key(provider_name, model_name, msgs) -> str:
    """Build a cache key covering provider, model and the whole conversation."""
    import hashlib  # local import: only this helper needs it

    # Default ensure_ascii keeps the payload pure ASCII, so encoding cannot fail.
    payload = json.dumps(msgs, sort_keys=True).encode("ascii")
    return f"{provider_name}|{model_name}|{hashlib.sha256(payload).hexdigest()}"


def ask(message: str, history, provider_name: str, model_name: str):
    """Stream a chat reply, falling back across providers and models.

    Args:
        message: The new user message; blank input yields ``""`` and stops.
        history: Previous turns, as message dicts or ``(user, assistant)``
            pairs (see ``_history_to_messages``).
        provider_name: Preferred provider; tried before the built-in
            fallback order.
        model_name: Preferred model; tried first on every provider.

    Yields:
        The accumulated reply text after each non-empty chunk. A cached reply
        is yielded once. If every provider/model attempt fails, the final
        value is ``"❌ Failed with all providers."``.
    """
    message = (message or "").strip()
    if not message:
        yield ""
        return

    msgs = _history_to_messages(history)
    msgs.append({"role": "user", "content": message})

    # The key must include the conversation, not just the last message:
    # otherwise "continue" or "yes" in a different chat returns a stale reply.
    key = _conversation_cache_key(provider_name, model_name, msgs)
    cached = _cache_get(key)
    if cached is not None:
        yield cached
        return

    fallback_providers = [
        provider_name,
        "Perplexity",
        "Copilot",
        "Blackbox",
        "DeepSeek",
        "Bing",
        "You",
        "Qwen",
    ]

    # dict.fromkeys keeps order while dropping the duplicate created when the
    # preferred provider is also in the built-in list.
    for pname in dict.fromkeys(fallback_providers):
        pobj = REAL_PROVIDERS.get(pname)
        if not pobj:
            continue

        if pname not in _PROVIDER_MODEL_CACHE:
            _PROVIDER_MODEL_CACHE[pname] = discover_provider_models(pobj, pname)

        # Requested model first, then the provider's own models; falsy names
        # (e.g. an unset dropdown) are not worth a request.
        model_candidates = list(
            dict.fromkeys(
                m for m in [model_name, *_PROVIDER_MODEL_CACHE[pname]] if m
            )
        )

        for m in model_candidates[:12]:
            try:
                stream = g4f.ChatCompletion.create(
                    model=m,
                    provider=pobj,
                    messages=msgs,
                    stream=True,
                )
                text = ""
                for chunk in stream:
                    c = clean_stream(chunk)
                    if c is None or c == "":
                        continue
                    # clean_stream may hand back a non-string payload value.
                    text += c if isinstance(c, str) else str(c)
                    yield text
                if text.strip():
                    _cache_set(key, text)
                    return
            except Exception as e:
                # Any failure just moves on to the next model/provider.
                logger.warning(
                    "Provider %s with model %s failed: %s", pname, m, e
                )
                continue

    yield "❌ Failed with all providers."
# =====================================================
# UI
# =====================================================
# Custom CSS passed to gr.Blocks: caps the .gradio-container width at 1200px.
css = """
.gradio-container{
max-width:1200px!important;
}
"""
# Names of the providers that resolved at import time, in declaration order.
providers = list(REAL_PROVIDERS.keys())

if providers:
    # Prefer Perplexity when available, otherwise the first resolved provider.
    default_provider = "Perplexity" if "Perplexity" in providers else providers[0]
    default_models = _PROVIDER_MODEL_CACHE.get(
        default_provider
    ) or discover_provider_models(
        REAL_PROVIDERS[default_provider],
        default_provider,
    )
    # Remember the startup discovery so later lookups reuse the same list
    # instead of probing the provider a second time.
    _PROVIDER_MODEL_CACHE.setdefault(default_provider, default_models)
else:
    # No provider resolved: start with an empty provider list instead of
    # failing on providers[0]; "gpt-4o" mirrors update_models' fallback.
    logger.warning("No g4f providers could be resolved; the provider list is empty.")
    default_provider = None
    default_models = ["gpt-4o"]
# Build the UI: header, cookie status line, provider/model selectors and the chat.
with gr.Blocks(
    theme=gr.themes.Soft(),
    css=css
) as demo:
    gr.Markdown("# 🤖 G4F Smart Router FINAL")
    # Status string produced by load_cookies() at import time.
    gr.Markdown(COOKIE_STATUS)
    with gr.Row():
        provider_dd = gr.Dropdown(
            choices=providers,
            value=default_provider,
            label="Provider"
        )
        model_dd = gr.Dropdown(
            choices=default_models,
            value=default_models[0],
            label="Model"
        )
    # Changing the provider refreshes the model dropdown via update_models.
    provider_dd.change(
        update_models,
        inputs=provider_dd,
        outputs=model_dd
    )
    # The two dropdowns are handed to ask() as its provider_name / model_name
    # arguments, after the message and history.
    gr.ChatInterface(
        fn=ask,
        additional_inputs=[
            provider_dd,
            model_dd
        ],
        title="",
        description="Choose provider → models update automatically."
    )
# Enable queuing and serve on all interfaces; the PORT environment variable
# overrides the default port 7860.
demo.queue().launch(
    server_name="0.0.0.0",
    server_port=int(os.environ.get("PORT", "7860")),
)