Dffdfg / app.py
bahi-bh's picture
Update app.py
2d2e230 verified
import os
import json
import time
import logging
from typing import Any, Dict, List, Optional, Tuple
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel
import g4f
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("g4f-smart-router")
# =====================================================
# COOKIES
# =====================================================
def _load_cookies_raw() -> Dict[str, Any]:
raw_env = (os.getenv("COOKIES_JSON") or "").strip()
if raw_env:
try:
return json.loads(raw_env)
except Exception as e:
logger.warning(e)
try:
if os.path.exists("cookies.json"):
with open("cookies.json", "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.warning(e)
return {}
def load_cookies() -> str:
data = _load_cookies_raw()
if not data:
return "⚠️ No Cookies"
try:
from g4f.cookies import set_cookies
except Exception:
return "⚠️ Cookies Found"
for domain, vals in data.items():
try:
dom = domain if "." in domain else f".{domain}.com"
if isinstance(vals, list):
vals = {
x["name"]: x["value"]
for x in vals
if isinstance(x, dict)
}
if isinstance(vals, dict):
set_cookies(dom, vals)
except Exception as e:
logger.warning(e)
return "✅ Cookies Loaded"
COOKIE_STATUS = load_cookies()
# =====================================================
# PROVIDERS
# =====================================================
def get_provider(name: str):
try:
return getattr(g4f.Provider, name)
except:
return None
REAL_PROVIDERS: Dict[str, Any] = {
"Perplexity": get_provider("Perplexity") or get_provider("PerplexityAi"),
"Copilot": get_provider("Copilot"),
"Qwen": get_provider("Qwen"),
"Blackbox": get_provider("Blackbox"),
"DeepSeek": get_provider("DeepSeek"),
"HuggingFace": get_provider("HuggingFace"),
"Cloudflare": get_provider("Cloudflare"),
"You": get_provider("You"),
"Bing": get_provider("Bing"),
}
REAL_PROVIDERS = {k: v for k, v in REAL_PROVIDERS.items() if v}
# =====================================================
# MODELS
# =====================================================
PROVIDER_MODELS_FALLBACK: Dict[str, List[str]] = {
"Perplexity": ["sonar", "sonar-pro", "gpt-4o", "llama-3"],
"Copilot": ["gpt-4o", "gpt-4", "turbo"],
"Qwen": ["qwen-max", "qwen-plus", "qwen-turbo", "qwen"],
"Blackbox": ["gpt-4o", "claude-3", "gemini-pro", "llama-3"],
"DeepSeek": ["deepseek-chat", "deepseek-coder"],
"HuggingFace": ["meta-llama", "mistral", "qwen", "gemma"],
"Cloudflare": ["llama-3", "mistral", "qwen"],
"You": ["gpt-4o", "claude", "llama-3"],
"Bing": ["gpt-4o", "gpt-4"],
}
# =====================================================
# MODEL DISCOVERY
# =====================================================
def _normalize_model_list(x: Any) -> List[str]:
out: List[str] = []
if x is None:
return out
if isinstance(x, dict):
out = [str(k) for k in x.keys()]
elif isinstance(x, (list, tuple, set)):
out = [str(i) for i in x]
else:
out = [str(x)]
out = [m.strip() for m in out if m]
seen = set()
uniq = []
for m in out:
if m not in seen:
uniq.append(m)
seen.add(m)
return uniq
def discover_provider_models(provider_obj: Any, provider_name: str) -> List[str]:
candidates = []
for attr in (
"models",
"model",
"default_model",
"available_models",
"supported_models"
):
try:
if hasattr(provider_obj, attr):
candidates += _normalize_model_list(
getattr(provider_obj, attr)
)
except:
pass
for fn in ("get_models", "models_list", "list_models"):
try:
if hasattr(provider_obj, fn):
candidates += _normalize_model_list(
getattr(provider_obj, fn)()
)
except:
pass
if not candidates:
candidates = PROVIDER_MODELS_FALLBACK.get(
provider_name,
["gpt-4o"]
)
seen = set()
uniq = []
for m in candidates:
if m not in seen:
uniq.append(m)
seen.add(m)
return uniq
# =====================================================
# STREAM CLEANER
# =====================================================
def clean_stream(chunk):
try:
if isinstance(chunk, dict):
if 'choices' in chunk and chunk['choices']:
delta = chunk['choices'][0].get('delta', {})
if 'content' in delta:
return delta['content']
if 'text' in delta:
return delta['text']
if 'content' in chunk:
return chunk['content']
if 'text' in chunk:
return chunk['text']
return ""
if isinstance(chunk, str):
if chunk.strip().startswith("{") and chunk.strip().endswith("}"):
try:
data = json.loads(chunk)
if 'choices' in data and data['choices']:
delta = data['choices'][0].get('delta', {})
if 'content' in delta:
return delta['content']
if 'content' in data:
return data['content']
if 'text' in data:
return data['text']
except:
pass
chunk = chunk.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', ' ')
chunk = chunk.replace('\\"', '"').replace("\\'", "'")
return chunk
return str(chunk)
except Exception as e:
logger.warning(f"خطأ في clean_stream: {e}")
return ""
# =====================================================
# CACHE
# =====================================================
CACHE = {}
CACHE_TS = {}
def _cache_get(key):
if key not in CACHE:
return None
return CACHE[key]
def _cache_set(key, val):
CACHE[key] = val
CACHE_TS[key] = time.time()
# =====================================================
# CHAT LOGIC
# =====================================================
_PROVIDER_MODEL_CACHE = {}
def ask(message: str, history, provider_name: str, model_name: str):
history = history or []
message = (message or "").strip()
if not message:
yield ""
return
key = f"{provider_name}|{model_name}|{message}"
cached = _cache_get(key)
if cached is not None:
yield cached
return
msgs = []
try:
if history:
if isinstance(history[0], dict):
for item in history[-40:]:
role = item.get("role")
content = item.get("content")
if role and content:
msgs.append({
"role": str(role),
"content": str(content)
})
else:
for item in history[-20:]:
if isinstance(item, (list, tuple)) and len(item) == 2:
u = item[0]
a = item[1]
if u:
msgs.append({
"role": "user",
"content": str(u)
})
if a:
msgs.append({
"role": "assistant",
"content": str(a)
})
except Exception as e:
logger.warning(e)
msgs.append({
"role": "user",
"content": message
})
fallback_providers = [
provider_name,
"Perplexity",
"Copilot",
"Blackbox",
"DeepSeek",
"Bing",
"You",
"Qwen"
]
used = []
for pname in fallback_providers:
if pname in used:
continue
used.append(pname)
pobj = REAL_PROVIDERS.get(pname)
if not pobj:
continue
if pname not in _PROVIDER_MODEL_CACHE:
_PROVIDER_MODEL_CACHE[pname] = discover_provider_models(
pobj,
pname
)
model_candidates = [model_name] + [
x for x in _PROVIDER_MODEL_CACHE[pname]
if x != model_name
]
for m in model_candidates[:12]:
try:
stream = g4f.ChatCompletion.create(
model=m,
provider=pobj,
messages=msgs,
stream=True
)
text = ""
for chunk in stream:
c = clean_stream(chunk)
if c is None or c == "":
continue
text += c
yield text
if text.strip():
_cache_set(key, text)
return
except Exception as e:
logger.warning(e)
continue
yield "❌ Failed with all providers."
# =====================================================
# FASTAPI
# =====================================================
app = FastAPI(title="G4F API", description="G4F Smart Router API")
API_KEY = os.getenv("API_KEY", "mysecretkey123")
class ChatRequest(BaseModel):
message: str
provider: str = "Perplexity"
model: str = "sonar"
history: List[Any] = []
# =====================================================
# تعديل دالة التحقق لدعم X-API-Key مع الاحتفاظ بـ Bearer
# =====================================================
def verify_api_key(request: Request):
# الطريقة الأولى: Authorization: Bearer XXX
auth = request.headers.get("Authorization")
if auth and auth.startswith("Bearer "):
key = auth.replace("Bearer ", "")
if key == API_KEY:
return True
# الطريقة الثانية: X-API-Key: XXX
x_api_key = request.headers.get("X-API-Key")
if x_api_key and x_api_key == API_KEY:
return True
# لا يوجد مفتاح صالح
raise HTTPException(
status_code=401,
detail="Invalid or missing API key. Use 'Authorization: Bearer KEY' or 'X-API-Key: KEY'"
)
# =====================================================
# نقاط النهاية (بدون تغيير)
# =====================================================
@app.get("/")
async def root():
return {
"message": "G4F API is running",
"endpoints": {
"GET /": "هذه الصفحة",
"GET /health": "التحقق من صحة الخادم",
"GET /providers": "قائمة المزودين والنماذج (يتطلب مفتاح)",
"POST /chat": "إرسال رسالة والحصول على رد (يتطلب مفتاح)",
"POST /chat/stream": "إرسال رسالة والحصول على رد متدفق (يتطلب مفتاح)"
},
"authentication": "Bearer YOUR_API_KEY or X-API-Key: YOUR_API_KEY",
"status": "✅ Server is working"
}
@app.get("/health")
async def health():
return {"status": "ok", "cookies": COOKIE_STATUS, "providers": list(REAL_PROVIDERS.keys())}
@app.post("/chat")
async def chat(request: Request, chat_req: ChatRequest):
verify_api_key(request)
full_response = ""
for chunk in ask(chat_req.message, chat_req.history, chat_req.provider, chat_req.model):
full_response = chunk
return JSONResponse(content={"response": full_response})
@app.post("/chat/stream")
async def chat_stream(request: Request, chat_req: ChatRequest):
verify_api_key(request)
async def generate():
for chunk in ask(chat_req.message, chat_req.history, chat_req.provider, chat_req.model):
yield f"data: {json.dumps({'chunk': chunk})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
@app.get("/providers")
async def get_providers(request: Request):
verify_api_key(request)
providers_info = {}
for pname, pobj in REAL_PROVIDERS.items():
if pname not in _PROVIDER_MODEL_CACHE:
_PROVIDER_MODEL_CACHE[pname] = discover_provider_models(pobj, pname)
providers_info[pname] = _PROVIDER_MODEL_CACHE[pname]
return JSONResponse(content={"providers": providers_info})
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "7860"))
uvicorn.run(app, host="0.0.0.0", port=port)