# Test-ckod / app.py
# bahi-bh's picture
# Create app.py
# 88a6b83 verified
import asyncio
import hmac
import json
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List

import g4f
from fastapi import FastAPI, HTTPException, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
# =====================================
# CONFIG
# =====================================
# Shared secret that clients must send as "Authorization: Bearer <key>".
# Read from the API_KEY environment variable so the real key does not have to
# be committed with the source; when the variable is unset or empty the
# original placeholder value is used, so existing deployments keep working.
API_KEY = os.environ.get("API_KEY") or "sk-your-secret-key"
# Fallback chain: generate() and stream_generate() try these providers in
# list order and move on to the next one when a provider raises.
PROVIDERS = [g4f.Provider.DuckDuckGo, g4f.Provider.Blackbox]

# Thread pool used with loop.run_in_executor() to run the g4f calls off the
# event-loop thread; capped at five worker threads.
executor = ThreadPoolExecutor(5)
# =====================================
# APP
# =====================================
# The ASGI application object; routes below are registered on it.
app = FastAPI(title="HuggingFace AI Gateway")

# CORS is left fully open: any origin, any method, any request header.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
# =====================================
# MODELS
# =====================================
class Message(BaseModel):
    # One chat turn as supplied by the client in ChatRequest.messages.

    # Speaker label for this turn. Only the type is validated here, so any
    # string is accepted.
    role: str
    # Text of the turn.
    content: str
class ChatRequest(BaseModel):
    # Request body accepted by POST /v1/chat/completions.

    # Model name passed through to g4f; "gpt-4o-mini" when the client omits it.
    model: str = "gpt-4o-mini"
    # Conversation to complete. Required field (no default).
    messages: List[Message]
    # True -> the endpoint answers with a text/event-stream response;
    # False (default) -> a single JSON body.
    stream: bool = False
# =====================================
# AUTH
# =====================================
def verify_api_key(auth: str):
    """Validate an ``Authorization`` header value against ``API_KEY``.

    Args:
        auth: Raw header value, expected in the form ``"Bearer <token>"``.

    Raises:
        HTTPException: 401 ("Invalid Authorization") when the value is
            missing/empty or does not start with ``"Bearer "``; 403
            ("Invalid API Key") when the token does not equal ``API_KEY``.
    """
    prefix = "Bearer "
    if not auth or not auth.startswith(prefix):
        raise HTTPException(
            status_code=401,
            detail="Invalid Authorization"
        )

    # Drop only the leading scheme. str.replace() would also delete any later
    # "Bearer " occurrence, so e.g. "Bearer Bearer <key>" used to be accepted.
    token = auth[len(prefix):]

    # Constant-time comparison so response timing does not reveal how much of
    # the key matched. Compared as bytes because compare_digest() rejects
    # non-ASCII str arguments, which a client-controlled header may contain.
    if not hmac.compare_digest(token.encode("utf-8"), API_KEY.encode("utf-8")):
        raise HTTPException(
            status_code=403,
            detail="Invalid API Key"
        )
# =====================================
# GENERATE
# =====================================
def generate(model, messages):
    """Return a completion from the first provider in ``PROVIDERS`` that works.

    Providers are tried in list order; a provider that raises is logged to
    stdout and skipped.

    Args:
        model: Model name passed through to ``g4f.ChatCompletion.create``.
        messages: Chat messages passed through unchanged.

    Returns:
        Whatever ``g4f.ChatCompletion.create`` returned for the first provider
        that did not raise.

    Raises:
        RuntimeError: When every provider raised (the message is the last
            provider's error text and that error is chained as the cause),
            or when ``PROVIDERS`` is empty.
    """
    last_error = None
    for provider in PROVIDERS:
        try:
            return g4f.ChatCompletion.create(
                model=model,
                provider=provider,
                messages=messages,
            )
        except Exception as e:  # any provider failure means "try the next one"
            print(f"{provider.__name__}: {e}")
            last_error = e

    if last_error is None:
        # Nothing was attempted; previously this surfaced as Exception("None").
        raise RuntimeError("No providers configured")
    # Keep the original message text but preserve the traceback chain.
    raise RuntimeError(str(last_error)) from last_error
# =====================================
# STREAM
# =====================================
async def stream_generate(model, messages):
    """Yield Server-Sent-Events lines for a streamed completion.

    Providers in ``PROVIDERS`` are tried in order. Each non-empty chunk is
    emitted as ``data: {"choices": [{"delta": {"content": <chunk>}}]}``
    followed by a blank line; a successful stream ends with ``data: [DONE]``.

    Args:
        model: Model name passed through to ``g4f.ChatCompletion.create``.
        messages: Chat messages passed through unchanged.

    Yields:
        str: One complete SSE event per item (terminated by a blank line).
        If a provider fails before producing output the next provider is
        tried; if it fails after output was already sent, an error event is
        emitted and the stream ends; if all providers fail, a single
        ``{"error": "All providers failed"}`` event is emitted.
    """
    loop = asyncio.get_running_loop()
    end_of_stream = object()  # sentinel: StopIteration cannot cross a Future

    for provider in PROVIDERS:
        sent_any = False
        try:
            response = await loop.run_in_executor(
                executor,
                lambda provider=provider: g4f.ChatCompletion.create(
                    model=model,
                    provider=provider,
                    messages=messages,
                    stream=True,
                ),
            )
            chunks = iter(response)
            while True:
                # Pull each chunk in the worker pool: advancing a synchronous
                # iterator directly here would block the event loop for as
                # long as the provider takes to produce the next chunk.
                chunk = await loop.run_in_executor(
                    executor, next, chunks, end_of_stream
                )
                if chunk is end_of_stream:
                    break
                if not chunk:
                    continue
                payload = {"choices": [{"delta": {"content": chunk}}]}
                # SSE events are delimited by a real blank line ("\n\n").
                yield f"data: {json.dumps(payload)}\n\n"
                sent_any = True
            yield "data: [DONE]\n\n"
            return
        except Exception as e:
            print(f"Streaming Error: {e}")
            if sent_any:
                # Part of an answer already reached the client; restarting on
                # another provider would append a second answer to it.
                interrupted = json.dumps({'error': 'Stream interrupted'})
                yield f"data: {interrupted}\n\n"
                return
            continue

    yield f"data: {json.dumps({'error':'All providers failed'})}\n\n"
# =====================================
# API
# =====================================
@app.get("/")
async def home():
    # Root endpoint: a fixed JSON body that shows the service is reachable.
    return dict(status="online")
@app.post("/v1/chat/completions")
async def chat(
    req: ChatRequest,
    authorization: str = Header(None)
):
    """Chat completion endpoint (streaming or single response).

    Requires an ``Authorization: Bearer <key>`` header. With ``stream`` set,
    the reply is a ``text/event-stream``; otherwise a JSON object with one
    assistant message under ``choices``.

    Raises:
        HTTPException: 401 when the header is missing or malformed, 403 when
            the key is wrong, 500 (detail = error text) when generation fails.
    """
    if not authorization:
        raise HTTPException(
            status_code=401,
            detail="Missing Authorization"
        )
    verify_api_key(authorization)

    # g4f receives plain dicts, not pydantic models.
    messages = [m.model_dump() for m in req.messages]

    # =================================
    # STREAM
    # =================================
    if req.stream:
        return StreamingResponse(
            stream_generate(req.model, messages),
            media_type="text/event-stream",
        )

    # =================================
    # NORMAL
    # =================================
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine.
    loop = asyncio.get_running_loop()
    try:
        # generate() blocks, so it runs in the worker pool.
        response = await loop.run_in_executor(
            executor, generate, req.model, messages
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=str(e)
        ) from e

    return {
        "choices": [
            {
                "message": {
                    "role": "assistant",
                    "content": response
                }
            }
        ]
    }