Coowork / app.py
bahi-bh's picture
Update app.py
b275930 verified
import asyncio
import functools
import hmac
import json
import logging
import os
import time
import uuid
from typing import List, Optional

import g4f
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from g4f.client import Client
from pydantic import BaseModel
# =====================================================
# LOGGING
# =====================================================
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# =====================================================
# CONFIG
# =====================================================
# Shared secret that verify_api_key compares client keys against.
# Read from the API_KEY environment variable so the real key does not have
# to live in source control; the historical placeholder is kept as the
# fallback so existing deployments behave exactly as before.
# An empty value disables the key comparison in verify_api_key.
API_KEY = os.environ.get("API_KEY", "sk-your-secret-key")
# =====================================================
# FASTAPI
# =====================================================
# The ASGI application object; every route below is registered on it.
app = FastAPI(title="Universal AI Gateway", version="5.3.0")
# =====================================================
# CORS
# =====================================================
# Fully permissive cross-origin policy: any origin, method and header.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation - confirm whether
# credentialed browser requests are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# =====================================================
# MODELS
# =====================================================
class Message(BaseModel):
    """A single chat message as sent to /v1/chat/completions."""
    # Speaker of the message; forwarded to the backend unchanged.
    role: str
    # Plain-text body of the message (only strings are accepted here).
    content: str
class ChatRequest(BaseModel):
    """Request body of the /v1/chat/completions endpoint."""
    # Client-requested model name; passed through normalize_model before use.
    model: str
    # Conversation so far, in order.
    messages: List[Message]
    # When true the reply is sent as server-sent events instead of one JSON body.
    stream: bool = False
    # Accepted for client compatibility.
    # NOTE(review): not forwarded to the backend by chat_completions.
    temperature: Optional[float] = 0.7
    # Accepted for client compatibility.
    # NOTE(review): not forwarded to the backend by chat_completions.
    max_tokens: Optional[int] = 4096
# =====================================================
# AUTH
# =====================================================
def verify_api_key(req: Request):
    """Check the API key carried by an incoming request.

    The key is taken from the ``x-api-key`` header when present, otherwise
    from the credentials part of an ``Authorization: <scheme> <credentials>``
    header.

    Requests that carry no usable key are allowed through on purpose, so the
    gateway can be tried without credentials; only a key that is present and
    wrong is rejected. The check is also skipped when ``API_KEY`` is empty.

    Args:
        req: Incoming request whose headers are inspected.

    Returns:
        ``True`` when the request may proceed.

    Raises:
        HTTPException: 403 when a key is supplied and does not match
            ``API_KEY``.
    """
    auth = req.headers.get("Authorization")
    x_api_key = req.headers.get("x-api-key")

    token = None
    if x_api_key:
        token = x_api_key.strip()
    elif auth:
        # Expect "<scheme> <credentials>"; a header without a space yields
        # no token and is therefore treated like a missing key.
        parts = auth.split(" ", 1)
        if len(parts) == 2:
            token = parts[1].strip()

    # Allow requests without a key (kept for testing), and skip the check
    # entirely when no server-side key is configured.
    if not token or not API_KEY:
        return True

    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Compare bytes: compare_digest rejects non-ASCII str.
    supplied = token.encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=403,
            detail="Invalid API Key"
        )
    return True
# =====================================================
# ROOT
# =====================================================
@app.get("/")
async def root():
    """Return a small status document naming the service and its version."""
    status_report = dict(
        status="online",
        service="Universal AI Gateway",
        version="5.3.0",
    )
    return status_report
@app.head("/")
async def root_head():
    """Answer HEAD requests on the root path with an empty 200 JSON reply."""
    empty_body = {}
    return JSONResponse(content=empty_body, status_code=200)
# =====================================================
# MODEL NORMALIZER
# =====================================================
def normalize_model(model: str):
    """Map a client-supplied model name onto the name sent to the backend.

    Matching is case-insensitive and ignores surrounding whitespace.

    * A missing, blank or non-string value falls back to ``"gpt-4"``.
    * A handful of names are recognised exactly and returned lower-cased.
    * Names containing a known family keyword are collapsed onto one
      backend model (for example anything containing ``"claude"`` or
      ``"grok"`` is served by ``"gpt-4"``).
    * Anything else is passed through (whitespace-trimmed, case preserved).

    Args:
        model: Requested model name. Values that are not strings are
            tolerated because callers may pass raw JSON values.

    Returns:
        The backend model name as a string.
    """
    default_model = "gpt-4"

    # Guard against None, numbers, lists, ... coming from untyped JSON bodies.
    if not isinstance(model, str):
        return default_model

    cleaned = model.strip()
    if not cleaned:
        return default_model

    lowered = cleaned.lower()

    # Names that are forwarded as-is (in canonical lower case).
    exact_names = (
        "gpt-4",
        "gpt-4o",
        "gpt-4o-mini",
        "gpt-3.5-turbo",
        "command-r",
        "command-r-plus",
    )
    if lowered in exact_names:
        return lowered

    # Family keyword -> backend model. Order matters when a name contains
    # more than one keyword: the first match wins.
    family_targets = (
        ("claude", "gpt-4"),
        ("gemini", "gemini"),
        ("grok", "gpt-4"),
        ("llama", "llama-3-70b"),
        ("mistral", "mistral-7b"),
    )
    for keyword, target in family_targets:
        if keyword in lowered:
            return target

    return cleaned
# =====================================================
# SAFE CLIENT
# =====================================================
def create_client():
    """Build a new g4f ``Client``; each caller gets its own instance."""
    fresh_client = Client()
    return fresh_client
# =====================================================
# MODELS
# =====================================================
@app.get("/v1/models")
async def get_models():
    """List the model names this gateway advertises, in OpenAI list format.

    Every entry carries the current time as ``created`` and ``"openai"`` as
    ``owned_by``, regardless of the model family.
    """
    advertised = (
        # GPT
        "gpt-4",
        "gpt-4-turbo",
        "gpt-4o",
        "gpt-4o-mini",
        "gpt-3.5-turbo",
        # Claude
        "claude-sonnet-4-6",
        "claude-opus-4",
        "claude-3-7-sonnet",
        # Gemini
        "gemini-2.5-pro",
        "gemini-2.5-flash",
        # Cohere
        "command-r",
        "command-r-plus",
        # Other
        "grok-4",
        "llama-3-70b",
        "mistral-7b",
    )

    def describe(model_id):
        # One OpenAI-style model descriptor.
        return {
            "id": model_id,
            "object": "model",
            "created": int(time.time()),
            "owned_by": "openai",
            "permission": [],
            "root": model_id,
            "parent": None,
        }

    return {
        "object": "list",
        "data": [describe(model_id) for model_id in advertised],
    }
# =====================================================
# CHAT COMPLETIONS
# =====================================================
@app.post("/v1/chat/completions")
async def chat_completions(
    req: Request,
    body: ChatRequest
):
    """Serve an OpenAI-style chat completion, streamed or as one JSON reply.

    The request is checked with ``verify_api_key``, the model name is mapped
    with ``normalize_model`` and the conversation is forwarded to a client
    obtained from ``create_client``.

    * ``body.stream`` true: returns a ``text/event-stream`` response made of
      ``chat.completion.chunk`` events, closed by a ``finish_reason: "stop"``
      chunk and ``data: [DONE]``. Backend failures are reported as an
      ``{"error": ...}`` event inside the stream, since the HTTP status has
      already been sent by then.
    * otherwise: returns a single ``chat.completion`` JSON object.

    ``body.temperature`` and ``body.max_tokens`` are accepted but not
    forwarded to the backend, and token usage is always reported as zero.

    Args:
        req: Incoming request, used for API-key verification.
        body: Parsed chat request.

    Raises:
        HTTPException: 403 from ``verify_api_key``; 500 when the
            non-streaming backend call fails.
    """
    verify_api_key(req)
    model = normalize_model(body.model)
    messages = [
        {"role": m.role, "content": m.content}
        for m in body.messages
    ]
    logger.info("Request model=%s stream=%s", model, body.stream)

    def sse(payload):
        # Encode one server-sent event carrying a JSON payload.
        return "data: " + json.dumps(payload, ensure_ascii=False) + "\n\n"

    def chunk_payload(chunk_id, delta, finish_reason):
        # Build one OpenAI ``chat.completion.chunk`` object.
        return {
            "id": chunk_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "delta": delta,
                    "finish_reason": finish_reason
                }
            ]
        }

    if body.stream:
        # A plain (sync) generator: the blocking backend iteration happens
        # while the response is being streamed, not in this coroutine.
        def generate_stream():
            chunk_id = f"chatcmpl-{uuid.uuid4().hex}"
            try:
                client = create_client()
                response = client.chat.completions.create(
                    model=model,
                    messages=messages,
                    stream=True
                )
                has_content = False
                for chunk in response:
                    try:
                        choices = getattr(chunk, "choices", None)
                        delta = (
                            getattr(choices[0], "delta", None)
                            if choices else None
                        )
                        content = (
                            getattr(delta, "content", None)
                            if delta else None
                        )
                        if not content:
                            continue
                        event = sse(
                            chunk_payload(
                                chunk_id,
                                {"content": content},
                                None
                            )
                        )
                    except Exception as chunk_error:
                        # Best effort: one malformed chunk must not abort
                        # the whole stream.
                        logger.error("Chunk error: %s", chunk_error)
                        continue
                    has_content = True
                    yield event

                if not has_content:
                    yield sse({"error": {"message": "Empty response"}})

                yield sse(chunk_payload(chunk_id, {}, "stop"))
                yield "data: [DONE]\n\n"
            except Exception as e:
                logger.exception("Streaming error: %s", e)
                yield sse({"error": {"message": str(e)}})
                yield "data: [DONE]\n\n"

        return StreamingResponse(
            generate_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no"
            }
        )

    try:
        client = create_client()
        # The backend call is synchronous; run it in the default executor so
        # it does not stall the event loop (and every other request) while
        # the completion is being generated.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            functools.partial(
                client.chat.completions.create,
                model=model,
                messages=messages,
                stream=False
            )
        )
        try:
            assistant_message = response.choices[0].message.content
        except (AttributeError, IndexError, KeyError, TypeError):
            # Unexpected response shape: fall back to its text form.
            assistant_message = str(response)
        if assistant_message is None:
            assistant_message = ""

        return JSONResponse({
            "id": f"chatcmpl-{uuid.uuid4().hex}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": assistant_message
                    },
                    "finish_reason": "stop"
                }
            ],
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            }
        })
    except Exception as e:
        logger.exception("Chat error: %s", e)
        raise HTTPException(
            status_code=500,
            detail=str(e)
        ) from e
# =====================================================
# ANTHROPIC MESSAGES
# =====================================================
@app.post("/v1/messages")
async def anthropic_messages(
    req: Request,
    body: dict
):
    """Serve an Anthropic-style Messages request as a server-sent event stream.

    The Anthropic request is converted to a plain role/content conversation
    (text blocks are joined with newlines, other block types are dropped),
    the optional top-level ``system`` prompt is prepended as a ``system``
    message, and the result is forwarded to a client from ``create_client``.

    The reply is emitted as the event sequence ``message_start``, ``ping``,
    ``content_block_start``, zero or more ``content_block_delta``,
    ``content_block_stop``, ``message_delta``, ``message_stop``. If the
    backend yields no text, or fails, an ``error`` event is sent instead and
    the stream ends. Token usage is always reported as zero.

    NOTE(review): the response is always streamed; a ``stream`` field in the
    request body is not consulted.

    Args:
        req: Incoming request, used for API-key verification.
        body: Raw JSON request body.

    Raises:
        HTTPException: 403 from ``verify_api_key``; 400 when ``messages`` is
            not a list of JSON objects.
    """
    verify_api_key(req)
    requested_model = body.get("model")
    model = normalize_model(requested_model)

    messages = body.get("messages", [])
    if (
        not isinstance(messages, list)
        or not all(isinstance(m, dict) for m in messages)
    ):
        raise HTTPException(
            status_code=400,
            detail="'messages' must be a list of objects"
        )

    logger.info("Anthropic request model=%s stream=True", model)

    def flatten_text(content):
        # Reduce Anthropic content (string or list of blocks) to plain text.
        if content is None:
            return ""
        if isinstance(content, list):
            return "\n".join(
                part["text"]
                for part in content
                if isinstance(part, dict)
                and part.get("type") == "text"
                and isinstance(part.get("text"), str)
            )
        return content

    converted_messages = []

    # Anthropic carries the system prompt outside ``messages``; without this
    # it would be silently lost.
    system_text = flatten_text(body.get("system"))
    if isinstance(system_text, str) and system_text:
        converted_messages.append({
            "role": "system",
            "content": system_text
        })

    for m in messages:
        converted_messages.append({
            "role": m.get("role", "user"),
            "content": flatten_text(m.get("content", ""))
        })

    def sse(event, payload):
        # Encode one named server-sent event carrying a JSON payload.
        return (
            f"event: {event}\n"
            "data: "
            + json.dumps(payload, ensure_ascii=False)
            + "\n\n"
        )

    def error_event(message):
        return sse("error", {
            "type": "error",
            "error": {
                "type": "api_error",
                "message": message
            }
        })

    def generate_stream():
        message_id = f"msg_{uuid.uuid4().hex}"
        try:
            client = create_client()
            response = client.chat.completions.create(
                model=model,
                messages=converted_messages,
                stream=True
            )

            yield sse("message_start", {
                "type": "message_start",
                "message": {
                    "id": message_id,
                    "type": "message",
                    "role": "assistant",
                    # Echo the name the client asked for, not the mapped one.
                    "model": requested_model,
                    "content": [],
                    "stop_reason": None,
                    "stop_sequence": None,
                    "usage": {
                        "input_tokens": 0,
                        "output_tokens": 0
                    }
                }
            })
            yield sse("ping", {"type": "ping"})
            yield sse("content_block_start", {
                "type": "content_block_start",
                "index": 0,
                "content_block": {
                    "type": "text",
                    "text": ""
                }
            })

            has_content = False
            for chunk in response:
                try:
                    choices = getattr(chunk, "choices", None)
                    delta = (
                        getattr(choices[0], "delta", None)
                        if choices else None
                    )
                    content = (
                        getattr(delta, "content", None)
                        if delta else None
                    )
                    if not content:
                        continue
                    event = sse("content_block_delta", {
                        "type": "content_block_delta",
                        "index": 0,
                        "delta": {
                            "type": "text_delta",
                            "text": content
                        }
                    })
                except Exception as chunk_error:
                    # Best effort: skip a malformed chunk, keep streaming.
                    logger.error(
                        "Anthropic chunk error: %s", chunk_error
                    )
                    continue
                has_content = True
                yield event

            if not has_content:
                yield error_event("Empty response")
                return

            yield sse("content_block_stop", {
                "type": "content_block_stop",
                "index": 0
            })
            yield sse("message_delta", {
                "type": "message_delta",
                "delta": {
                    "stop_reason": "end_turn",
                    "stop_sequence": None
                },
                "usage": {
                    "output_tokens": 0
                }
            })
            yield sse("message_stop", {"type": "message_stop"})
        except Exception as e:
            logger.exception("Anthropic stream error: %s", e)
            yield error_event(str(e))

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
# =====================================================
# RUN
# =====================================================
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    import uvicorn

    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "log_level": "info",
    }
    uvicorn.run(app, **server_options)