Test-p4g / app.py
bahi-bh's picture
Update app.py
7aa112f verified
import time
import random
import traceback
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
from g4f.client import Client
from g4f.Provider import __providers__
MODEL = "gpt-4o-mini"
MAX_RETRIES = 10
TIMEOUT = 60
SAFE_PROVIDERS = []
for provider in __providers__:
try:
name = provider.__name__.lower()
if not getattr(provider, "working", False):
continue
if getattr(provider, "needs_auth", False):
continue
if getattr(provider, "use_nodriver", False):
continue
blocked = [
"openai",
"qwen",
"copilot",
"gemini",
"claude",
]
if any(x in name for x in blocked):
continue
SAFE_PROVIDERS.append(provider)
except:
pass
random.shuffle(SAFE_PROVIDERS)
print(f"[+] SAFE PROVIDERS: {len(SAFE_PROVIDERS)}")
client = Client()
class SmartG4F:
def __init__(self):
self.good = []
self.bad = {}
def mark_bad(self, provider, cooldown=300):
self.bad[provider.__name__] = time.time() + cooldown
def is_bad(self, provider):
expire = self.bad.get(provider.__name__)
if not expire:
return False
if time.time() > expire:
del self.bad[provider.__name__]
return False
return True
def provider_pool(self):
providers = []
providers.extend(self.good)
for p in SAFE_PROVIDERS:
if p not in providers and not self.is_bad(p):
providers.append(p)
random.shuffle(providers)
return providers
def ask(self, prompt):
errors = []
for attempt in range(MAX_RETRIES):
pool = self.provider_pool()
if not pool:
return {
"success": False,
"error": "No providers available"
}
provider = random.choice(pool)
print(f"[TRY {attempt+1}] {provider.__name__}")
try:
response = client.chat.completions.create(
model=MODEL,
provider=provider,
messages=[
{
"role": "user",
"content": prompt
}
],
timeout=TIMEOUT,
)
text = response.choices[0].message.content
if not text:
raise Exception("Empty response")
if provider not in self.good:
self.good.append(provider)
return {
"success": True,
"provider": provider.__name__,
"response": text
}
except Exception as e:
err = str(e).lower()
traceback.print_exc()
errors.append({
"provider": provider.__name__,
"error": str(e)
})
if "429" in err:
self.mark_bad(provider, 600)
time.sleep(random.randint(10, 20))
elif "cloudflare" in err:
self.mark_bad(provider, 1200)
time.sleep(20)
elif "timeout" in err:
self.mark_bad(provider, 300)
else:
self.mark_bad(provider, 180)
continue
return {
"success": False,
"errors": errors
}
app = FastAPI()
ai = SmartG4F()
class Query(BaseModel):
prompt: str
@app.get("/")
async def home():
return {
"status": "running",
"providers": len(SAFE_PROVIDERS)
}
@app.post("/ask")
async def ask(query: Query):
return ai.ask(query.prompt)
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=7860
)