Spaces:
Running
Running
| import gradio as gr | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import StreamingResponse | |
| import random | |
| import threading | |
| import uvicorn | |
| import aiohttp | |
| from src.config import get_api_keys | |
| from src.model_tester import ModelTester | |
| from src.scheduler import Scheduler | |
# Module-level singletons shared by the request handlers and the UI below.
model_tester = ModelTester()


def _run_scheduled_scan():
    """Scheduler callback: resolve ``model_tester`` at call time and scan all models."""
    return model_tester.scan_all_models()


scheduler = Scheduler(task_callback=_run_scheduled_scan)
fastapi_app = FastAPI(title="OpenRouter Free API")
async def startup_event():
    """Launch the scheduler on a daemon thread so this coroutine returns at once.

    NOTE(review): no registration of this coroutine as a startup hook on
    ``fastapi_app`` is visible in this view -- confirm it is wired up.
    """
    worker = threading.Thread(
        # The lambda defers the lookup of the module-level ``scheduler``
        # until the thread actually runs.
        target=lambda: scheduler.start(),
        daemon=True,
    )
    worker.start()
async def list_models():
    """Return every available model as an OpenAI-style ``list`` payload.

    Each entry carries the model id, fixed ``object``/``created``/``owned_by``
    fields, and a ``free`` flag telling whether the id is also reported by
    ``get_available_models(free_only=True)``.

    NOTE(review): no route decorator is visible in this view -- confirm how
    this handler is registered on ``fastapi_app``.

    Returns:
        dict: ``{"object": "list", "data": [...]}`` with one dict per model.
    """
    available = model_tester.get_available_models(free_only=False)
    # Build the lookup set once so the per-model "free" flag is an O(1)
    # membership test instead of a list scan on every iteration.
    free_ids = set(model_tester.get_available_models(free_only=True) or ())
    models = [
        {
            "id": model_id,
            "object": "model",
            "created": 1677610602,
            "owned_by": "openrouter",
            "free": model_id in free_ids,
        }
        for model_id in available
    ]
    return {"object": "list", "data": models}
async def proxy_request(body: dict):
    """Pass a chat-completion request through, replacing only the model and key.

    This is an async generator. It picks a random API key, refreshes the model
    list, chooses a free model, and POSTs the payload to OpenRouter.

    Model selection: the first free model whose short name (id without the
    ``:free`` suffix and without the vendor prefix) contains the requested
    ``model`` string, case-insensitively. When nothing matches, or the hint is
    missing or not a string, a random pick among the first five free models
    is used.

    Args:
        body: Parsed request payload. Its ``"model"`` entry is overwritten in
            place with the chosen model id before forwarding.

    Yields:
        When ``body["stream"]`` is truthy: the upstream response content,
        piece by piece as aiohttp delivers it. Otherwise exactly one item:
        the decoded JSON response.

    Raises:
        HTTPException: 503 when no API key is configured, 400 when no free
            model is available, 502 when a non-streaming upstream response
            is not valid JSON.
    """
    api_keys = get_api_keys()
    if not api_keys:
        # random.choice() on an empty sequence raises IndexError, which would
        # surface as an opaque 500.
        raise HTTPException(status_code=503, detail="No API key configured")
    api_key = random.choice(api_keys)

    model_tester.refresh_model_list()
    available_free = model_tester.get_all_free_models()

    model_hint = body.get("model")
    target_model = None
    # Only string hints can be matched; anything else falls through to the
    # random pick instead of crashing on ``.lower()``.
    if isinstance(model_hint, str) and model_hint and available_free:
        wanted = model_hint.lower()
        for candidate in available_free:
            short_name = candidate.replace(":free", "").split("/")[-1]
            if wanted in short_name.lower():
                target_model = candidate
                break

    if not target_model and available_free:
        target_model = random.choice(available_free[:5])

    if not target_model:
        raise HTTPException(status_code=400, detail="No available model")

    wants_stream = bool(body.get("stream"))
    body["model"] = target_model

    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    timeout = aiohttp.ClientTimeout(total=120)

    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=body, headers=headers, timeout=timeout) as response:
            if wants_stream:
                async for chunk in response.content:
                    yield chunk
            else:
                try:
                    # content_type=None skips aiohttp's Content-Type check so
                    # a mislabelled JSON body is still decoded; real decode
                    # failures are reported as a bad-gateway error.
                    payload = await response.json(content_type=None)
                except ValueError as exc:
                    raise HTTPException(
                        status_code=502,
                        detail="Upstream returned a non-JSON response",
                    ) from exc
                yield payload
async def chat_completions(request: Request):
    """Handle a chat-completion request, streaming or not.

    For ``"stream": true`` payloads the upstream chunks are relayed as
    ``text/event-stream``. Otherwise the single JSON result produced by
    ``proxy_request`` is returned.

    NOTE(review): no route decorator is visible in this view -- confirm how
    this handler is registered on ``fastapi_app``.

    Args:
        request: Incoming request whose body is the JSON payload to forward.

    Returns:
        A ``StreamingResponse`` in streaming mode, else the upstream JSON.

    Raises:
        HTTPException: 400 when the body is not a JSON object, when the
            upstream produced no result, or when the result carries an
            ``"error"`` entry. Errors raised by ``proxy_request`` propagate.
    """
    try:
        body = await request.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from exc
    if not isinstance(body, dict):
        raise HTTPException(
            status_code=400, detail="Request body must be a JSON object"
        )

    upstream = proxy_request(body)

    if body.get("stream"):
        # Pull the first chunk before building the response: key/model
        # selection errors are then raised while a proper error status can
        # still be sent, rather than after the stream has started.
        head = []
        try:
            head.append(await upstream.__anext__())
        except StopAsyncIteration:
            pass

        async def _relay():
            """Replay the primed chunk, then forward the rest of the stream."""
            try:
                for chunk in head:
                    yield chunk
                async for chunk in upstream:
                    yield chunk
            finally:
                # Release the upstream HTTP session even on client disconnect.
                await upstream.aclose()

        return StreamingResponse(_relay(), media_type="text/event-stream")

    result = None
    try:
        async for data in upstream:
            result = data
            break
    finally:
        # Leaving the loop early suspends the generator inside its
        # ``async with`` blocks; close it so the session is released now.
        await upstream.aclose()

    if not result:
        raise HTTPException(status_code=400, detail="Request failed")
    if isinstance(result, dict) and "error" in result:
        raise HTTPException(status_code=400, detail=result["error"])
    return result
async def health():
    """Liveness probe: always answers with an ``ok`` status."""
    return dict(status="ok")
def get_scan_status():
    """Summarise ``model_tester.scan_result`` as ``"Free: <n> | Total: <m>"``.

    Counters missing from the scan result are shown as 0.
    """
    counts = model_tester.scan_result
    free_count = counts.get("free_available", 0)
    total_count = counts.get("total_available", 0)
    return f"Free: {free_count} | Total: {total_count}"
def format_model_list(models):
    """Render model ids one per line, or a placeholder when there are none."""
    if not models:
        return "No models available"
    return "\n".join(models)
def _render_status():
    """Build the status banner from the current scan result."""
    return f"**Status: {get_scan_status()}**"


def _render_free_models():
    """Build the text listing of the currently available free models."""
    return format_model_list(model_tester.get_available_models(free_only=True))


# The status banner and the model list are passed as callables rather than
# precomputed strings: Gradio calls a callable ``value`` each time the page
# loads, so the UI reflects the latest scan instead of whatever was known at
# import time.
with gr.Blocks(title="OpenRouter Free API") as demo:
    gr.Markdown("# OpenRouter Free API")
    gr.Markdown("Standard OpenAI-compatible API with free model support")
    gr.Markdown(_render_status)
    gr.Markdown("## Available Free Models")
    gr.Textbox(value=_render_free_models, lines=15, interactive=False)
# Mount the Gradio UI at the root path of the FastAPI application; the
# combined ASGI app is what gets served.
app = gr.mount_gradio_app(app=fastapi_app, blocks=demo, path="/")

if __name__ == "__main__":
    # Direct execution: listen on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)