at41rv-ai-api / app.py
At41rv
Update app.py
6ec3094 verified
from fastapi import FastAPI, HTTPException, Request, UploadFile, File, Depends
from fastapi.responses import StreamingResponse, HTMLResponse, Response
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
import httpx
import os
import json
import logging
from datetime import datetime
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(
title="At41rvAPI Services",
description="Professional AI Services - Chat Completions, Image Generation, Web Search, Speech-To-Text, and Text-to-Speech",
version="1.0.0",
servers=[
{
"url": "https://at41rv-at41rv-ai-api.hf.space",
"description": "Production server"
}
],
swagger_ui_parameters={"defaultModelsExpandDepth": -1}
)
# Get HF token from environment variable
HF_TOKEN = os.getenv("HF_TOKEN")
BACKEND_URL = "https://at41rv-at41rv-ai-api.hf.space" # This is your Hugging Face Space URL itself
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN", "") # You might want to use this as one of your valid keys or remove it.
if not HF_TOKEN:
logger.warning("HF_TOKEN not found in environment variables. Access token validation might be affected.")
security = HTTPBearer()
# Define the set of hardcoded valid API keys
# Users must use one of these keys to authenticate
VALID_HARDCODED_KEYS = {
"at41rv-2010",
"at41rv-admin",
"at41rv-public",
"at41rv-special"
}
# Optional: Add ACCESS_TOKEN from environment variables as a valid key if it exists
# This makes it easier if ACCESS_TOKEN is used for deployment/testing and you want it to be valid too.
if ACCESS_TOKEN:
VALID_HARDCODED_KEYS.add(ACCESS_TOKEN)
logger.info(f"Added ACCESS_TOKEN from environment variables to valid keys.")
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
Verify the access token from the Authorization header against predefined keys.
"""
token = credentials.credentials
if token in VALID_HARDCODED_KEYS:
return token
else:
logger.error(f"Invalid access token provided: {token}")
raise HTTPException(status_code=401, detail="Invalid access token. Please use a valid predefined key.")
# Request timeout configuration
REQUEST_TIMEOUT = 60.0
# Pydantic models for requests (OpenAI Compatible)
class Message(BaseModel):
role: str = Field(..., description="Role of the message sender", example="user")
content: str = Field(..., description="Content of the message", example="Hello, how are you?")
class SimpleChatRequest(BaseModel):
model: str = Field(..., description="AI model to use for completion")
prompt: Optional[str] = Field(None, description="Text prompt for the AI to respond to (legacy)")
messages: Optional[List[Message]] = Field(None, description="List of messages for OpenAI-compatible chat")
system_prompt: Optional[str] = Field(
default="You are a helpful AI assistant.",
description="System prompt to set AI behavior and personality"
)
max_tokens: Optional[int] = Field(default=2048, description="Maximum tokens to generate")
temperature: Optional[float] = Field(default=0.7, description="Temperature for response randomness")
stream: Optional[bool] = Field(default=False, description="Whether to stream the response")
class GenerationRequest(BaseModel):
prompt: str
model: str
size: Optional[str] = "1024x1024"
class SearchRequest(BaseModel):
query: str
max_results: Optional[int] = 10
region: Optional[str] = "us"
safesearch: Optional[str] = "moderate"
max_chars: Optional[int] = 2000
class TTSRequest(BaseModel):
text: str
provider: str
voice: str
# These Pydantic models are no longer needed as key management endpoints are removed from frontend and backend logic.
# class APIKeyCreateRequest(BaseModel):
# name: str = Field(..., min_length=1, max_length=50, description="Name for the API key")
# class APIKeyResponse(BaseModel):
# success: bool
# api_key: str
# name: Optional[str] = ""
# message: str
# created_at: str
# expires: str = "Never"
# usage_note: str
# class APIKeyInfo(BaseModel):
# key: str
# name: str
# created_at: str
# last_used: Optional[str] = None
# usage_count: int
# key_preview: str
# class UserAPIKeysResponse(BaseModel):
# success: bool
# keys: List[APIKeyInfo]
# total_keys: int
# max_keys: int = 3
# remaining_slots: int
# class APIKeyDeleteResponse(BaseModel):
# success: bool
# message: str
async def make_backend_request(
endpoint: str,
method: str = "GET",
data: dict = None,
params: dict = None,
files: dict = None,
timeout: float = REQUEST_TIMEOUT,
# New parameter: token to be used for authentication in the backend request
auth_token: Optional[str] = None
) -> httpx.Response:
"""Make authenticated request to backend API"""
headers = {
# Use the provided auth_token, fallback to HF_TOKEN if not provided
"Authorization": f"Bearer {auth_token if auth_token else HF_TOKEN}",
"User-Agent": "OMNIAPI-Proxy/1.0"
}
# Only add Content-Type for JSON requests; httpx handles multipart for files
if not files:
headers["Content-Type"] = "application/json"
try:
async with httpx.AsyncClient(timeout=timeout) as client:
if method.upper() == "GET":
response = await client.get(
f"{BACKEND_URL}{endpoint}",
headers=headers,
params=params
)
elif method.upper() == "POST":
if files:
# For file uploads, don't set Content-Type (let httpx handle multipart)
headers.pop("Content-Type", None)
response = await client.post(
f"{BACKEND_URL}{endpoint}",
headers=headers,
files=files,
params=params
)
else:
response = await client.post(
f"{BACKEND_URL}{endpoint}",
headers=headers,
json=data,
params=params
)
elif method.upper() == "DELETE":
response = await client.delete(
f"{BACKEND_URL}{endpoint}",
headers=headers,
params=params
)
else:
raise HTTPException(status_code=405, detail="Method not allowed")
return response
except httpx.TimeoutException:
logger.error(f"Request timeout to {endpoint}")
raise HTTPException(status_code=504, detail="Backend request timeout")
except httpx.RequestError as e:
logger.error(f"Request error to {endpoint}: {str(e)}")
raise HTTPException(status_code=502, detail="Backend connection error")
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/")
async def root():
"""Root endpoint returning OpenAPI specification"""
return {
"openapi": "3.0.0",
"info": {
"title": "OMNIAPI Services",
"description": "Professional AI Services - Chat Completions, Image Generation, Web Search, Speech-To-Text, and Text-to-Speech",
"version": "1.0.0"
},
"servers": [
{
# Dynamically set the server URL to match the current backend URL
"url": BACKEND_URL,
"description": "Production server"
}
],
"paths": {
"/": {
"get": {
"summary": "Root endpoint returning OpenAPI specification",
"responses": {
"200": {
"description": "OpenAPI specification",
"content": {
"application/json": {
"schema": {
"type": "object"
}
}
}
}
}
}
},
"/health": {
"get": {
"summary": "Health check endpoint",
# No security required for health check unless explicitly desired
"responses": {
"200": {
"description": "Service health status"
}
}
}
},
"/models": {
"get": {
"summary": "Get available chat models",
"security": [{"bearerAuth": []}],
"responses": {
"200": {
"description": "List of available models",
"content": { # Added content here
"application/json": {}
}
}
}
}
},
"/chat/completions": {
"post": {
"summary": "Chat completions (OpenAI Compatible)",
"security": [{"bearerAuth": []}],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"model": {"type": "string"},
"messages": {
"type": "array",
"items": {
"type": "object",
"properties": {
"role": {"type": "string"},
"content": {"type": "string"}
}
}
},
"max_tokens": {"type": "integer"},
"temperature": {"type": "number"},
"stream": {"type": "boolean"}
}
}
}
}
},
"responses": {
"200": {
"description": "Chat completion response",
"content": { # Added content here
"application/json": {}
}
}
}
}
},
"/image/generate": {
"post": {
"summary": "Generate images",
"security": [{"bearerAuth": []}],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"prompt": {"type": "string"},
"model": {"type": "string"},
"size": {"type": "string"}
}
}
}
}
},
"responses": {
"200": {
"description": "Generated image response",
"content": { # Added content here
"application/json": {}
}
}
}
}
},
"/web/search": {
"post": {
"summary": "Web search",
"security": [{"bearerAuth": []}],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer"},
"region": {"type": "string"},
"safesearch": {"type": "string"}
}
}
}
}
},
"responses": {
"200": {
"description": "Search results",
"content": { # Added content here
"application/json": {}
}
}
}
}
},
"/image/search": {
"post": {
"summary": "Image search",
"security": [{"bearerAuth": []}],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer"},
"region": {"type": "string"},
"safesearch": {"type": "string"}
}
}
}
}
},
"responses": {
"200": {
"description": "Image search results",
"content": { # Added content here
"application/json": {}
}
}
}
}
},
"/videos/search": {
"post": {
"summary": "Video search",
"security": [{"bearerAuth": []}],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer"},
"region": {"type": "string"},
"safesearch": {"type": "string"}
}
}
}
}
},
"responses": {
"200": {
"description": "Video search results",
"content": { # Added content here
"application/json": {}
}
}
}
}
},
"/tts/{provider}/voices": {
"get": {
"summary": "Get TTS voices for provider",
"security": [{"bearerAuth": []}],
"parameters": [
{
"name": "provider",
"in": "path",
"required": True,
"schema": {"type": "string"}
}
],
"responses": {
"200": {
"description": "Available voices",
"content": { # Added content here
"application/json": {}
}
}
}
}
},
"/tts/generate": {
"post": {
"summary": "Generate TTS audio",
"security": [{"bearerAuth": []}],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"text": {"type": "string"},
"provider": {"type": "string"},
"voice": {"type": "string"}
}
}
}
}
},
"responses": {
"200": {
"description": "Generated audio response",
"content": { # Added content here
"application/json": {}
}
}
}
}
},
"/transcribe": {
"post": {
"summary": "Audio transcription",
"security": [{"bearerAuth": []}],
"requestBody": {
"required": True,
"content": {
"multipart/form-data": {
"schema": {
"type": "object",
"properties": {
"audio_file": {
"type": "string",
"format": "binary"
}
}
}
}
}
},
"responses": {
"200": {
"description": "Transcription result",
"content": { # Added content here
"application/json": {}
}
}
}
}
},
"/audio/{audio_id}": {
"get": {
"summary": "Serve audio file",
"security": [{"bearerAuth": []}],
"parameters": [
{
"name": "audio_id",
"in": "path",
"required": True,
"schema": {"type": "string"}
}
],
"responses": {
"200": {
"description": "Audio file",
"content": {
"audio/mpeg": {
"schema": {
"type": "string",
"format": "binary"
}
}
}
}
}
}
},
"/image/{image_id}": {
"get": {
"summary": "Serve image file",
"security": [{"bearerAuth": []}],
"parameters": [
{
"name": "image_id",
"in": "path",
"required": True,
"schema": {"type": "string"}
}
],
"responses": {
"200": {
"description": "Image file",
"content": {
"image/jpeg": {
"schema": {
"type": "string",
"format": "binary"
}
}
}
}
}
}
},
# Removed /generate-api-key, /api-keys/list, /api-keys/{api_key} from OpenAPI paths
},
"components": {
"securitySchemes": {
"bearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
}
}
}
@app.get("/health")
async def health_check():
"""Health check endpoint"""
try:
# Note: Health check typically does not require a token for external clients.
# If your backend /health endpoint *does* require the HF_TOKEN, keep the auth_token here.
# Otherwise, remove `auth_token=HF_TOKEN` from the make_backend_request call.
response = await make_backend_request("/health", auth_token=HF_TOKEN)
backend_status = "healthy" if response.status_code == 200 else "unhealthy"
return {
"status": "healthy",
"backend_status": backend_status,
"backend_url": BACKEND_URL,
"hf_token_configured": bool(HF_TOKEN),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
"backend_url": BACKEND_URL,
"hf_token_configured": bool(HF_TOKEN),
"timestamp": datetime.now().isoformat()
}
# ==================== CHAT COMPLETIONS ====================
@app.get("/models")
async def get_chat_models(token: str = Depends(verify_token)):
"""Get available chat models"""
# Pass the verified token to the backend request
response = await make_backend_request("/models", auth_token=token)
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail="Failed to fetch models")
@app.post("/chat/completions")
async def chat_completions(request: SimpleChatRequest, token: str = Depends(verify_token)):
"""Chat completions proxy (OpenAI Compatible)"""
# Validate input - either messages or prompt must be provided
if not request.messages and not request.prompt:
raise HTTPException(status_code=400, detail="Either 'messages' or 'prompt' must be provided")
# Convert request to dict and handle None values properly
request_data = request.dict(exclude_none=True)
# Pass the verified token to the backend request
response = await make_backend_request("/chat/completions", "POST", request_data, auth_token=token)
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
# ==================== IMAGE GENERATION ====================
@app.post("/image/generate")
async def generate_image(request: GenerationRequest, token: str = Depends(verify_token)):
"""Image generation proxy"""
# Pass the verified token to the backend request
response = await make_backend_request("/image/generate", "POST", request.dict(), auth_token=token)
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
# ==================== WEB SEARCH ====================
@app.post("/web/search")
async def web_search(request: SearchRequest, token: str = Depends(verify_token)):
"""Web search proxy"""
# Pass the verified token to the backend request
response = await make_backend_request("/web/search", "POST", request.dict(), auth_token=token)
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
@app.post("/image/search")
async def image_search(request: SearchRequest, token: str = Depends(verify_token)):
"""Image search proxy"""
# Pass the verified token to the backend request
response = await make_backend_request("/image/search", "POST", request.dict(), auth_token=token)
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
@app.post("/videos/search")
async def video_search(request: SearchRequest, token: str = Depends(verify_token)):
"""Video search proxy"""
# Pass the verified token to the backend request
response = await make_backend_request("/videos/search", "POST", request.dict(), auth_token=token)
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
# ==================== TEXT-TO-SPEECH ====================
@app.get("/tts/{provider}/voices")
async def get_tts_voices(provider: str, token: str = Depends(verify_token)):
"""Get TTS voices for provider"""
# Pass the verified token to the backend request
response = await make_backend_request(f"/tts/{provider}/voices", auth_token=token)
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
@app.post("/tts/generate")
async def generate_tts(request: TTSRequest, token: str = Depends(verify_token)):
"""Generate TTS audio"""
# Pass the verified token to the backend request
response = await make_backend_request("/tts/generate", "POST", request.dict(), auth_token=token)
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
@app.get("/audio/{audio_id}")
async def serve_audio(audio_id: str, token: str = Depends(verify_token)):
"""
Proxy audio file serving
To access audio files, use: https://at41rv-at41rv-ai-api.hf.space/v1/audio/{audio_id}
Example: https://at41rv-at41rv-ai-api.hf.space/v1/audio/GesseritTTS_Emma_1750428123_abc12345
"""
try:
# Pass the verified token to the backend request
response = await make_backend_request(f"/audio/{audio_id}", auth_token=token)
if response.status_code == 200:
return StreamingResponse(
iter([response.content]),
media_type="audio/mpeg",
headers={"Content-Disposition": f"inline; filename={audio_id}.mp3"}
)
else:
raise HTTPException(status_code=response.status_code, detail="Audio file not found")
except Exception as e:
logger.error(f"Audio serving error: {str(e)}")
raise HTTPException(status_code=404, detail="Audio file not found")
@app.get("/image/{image_id}")
async def serve_image(image_id: str, token: str = Depends(verify_token)):
"""
Proxy image file serving
To access image files, use: https://at41rv-at41rv-ai-api.hf.space/v1/image/{image_id}
Example: https://at41rv-at41rv-ai-api.hf.space/v1/image/img_1750428123_abc12345
"""
try:
# Pass the verified token to the backend request
response = await make_backend_request(f"/image/{image_id}", auth_token=token)
if response.status_code == 200:
return StreamingResponse(
iter([response.content]),
media_type="image/jpeg",
headers={"Content-Disposition": f"inline; filename={image_id}.jpg"}
)
else:
raise HTTPException(status_code=response.status_code, detail="Image file not found")
except Exception as e:
logger.error(f"Image serving error: {str(e)}")
raise HTTPException(status_code=404, detail="Image file not found")
@app.post("/transcribe")
async def transcribe_audio(audio_file: UploadFile = File(...), token: str = Depends(verify_token)):
"""Audio transcription proxy"""
try:
logger.info(f"Proxy: Received transcription request for file: {audio_file.filename}")
logger.info(f"Proxy: Content type: {audio_file.content_type}")
# Read file content
file_content = await audio_file.read()
logger.info(f"Proxy: File size: {len(file_content)} bytes")
if len(file_content) == 0:
return {
"success": False,
"message": "Empty file uploaded",
"transcription": None,
"filename": audio_file.filename,
"file_size": 0
}
# Check file size (limit to 50MB)
max_size = 50 * 1024 * 1024 # 50MB
if len(file_content) > max_size:
return {
"success": False,
"message": "File too large. Maximum size is 50MB",
"transcription": None,
"filename": audio_file.filename,
"file_size": len(file_content)
}
# Reset file pointer for forwarding
await audio_file.seek(0)
# Prepare file for backend request
files = {
"audio_file": (audio_file.filename or "audio.mp3", file_content, audio_file.content_type or "audio/mpeg")
}
logger.info(f"Proxy: Forwarding request to backend...")
# Use longer timeout for transcription
# Pass the verified token to the backend request
response = await make_backend_request("/transcribe", "POST", files=files, timeout=120.0, auth_token=token)
logger.info(f"Proxy: Backend responded with status: {response.status_code}")
if response.status_code == 200:
result = response.json()
logger.info("Proxy: Successfully received transcription from backend")
return result
else:
logger.error(f"Proxy: Backend error: {response.text}")
return {
"success": False,
"message": f"Backend error: {response.status_code}",
"transcription": None,
"filename": audio_file.filename,
"file_size": len(file_content)
}
except Exception as e:
logger.error(f"Proxy: Transcription error: {str(e)}")
return {
"success": False,
"message": f"Proxy error: {str(e)}",
"transcription": None,
"filename": audio_file.filename if audio_file else None,
"file_size": None
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7862)