Spaces:
Running
Running
| FROM python:3.11-slim | |
| # Set environment variables | |
| ENV PYTHONDONTWRITEBYTECODE=1 | |
| ENV PYTHONUNBUFFERED=1 | |
| ENV PORT=7860 | |
| ENV HOST=0.0.0.0 | |
| # Install system dependencies | |
| RUN apt-get update && apt-get install -y --no-install-recommends \ | |
| build-essential \ | |
| && rm -rf /var/lib/apt/lists/* | |
| # Set working directory | |
| WORKDIR /app | |
| # Install Python dependencies (with fixes for Pydantic schema bugs and SOCKS proxy support) | |
| RUN pip install --no-cache-dir \ | |
| fastapi==0.109.0 \ | |
| uvicorn==0.27.0 \ | |
| "httpx[socks]==0.26.0" \ | |
| cloudscraper==1.2.71 \ | |
| "requests[socks]==2.31.0" \ | |
| gradio==4.44.0 \ | |
| "huggingface_hub<0.27.0" \ | |
| "pydantic<2.10" | |
| # Create the application file | |
| RUN cat > /app/app.py << 'PYTHON_EOF' | |
| #!/usr/bin/env python3 | |
| """ | |
| LMArena Bridge - Single File Version for Hugging Face Spaces | |
| A bridge to interact with LM Arena providing an OpenAI-compatible API. | |
| This version: | |
| - Uses Gradio for the web interface | |
| - Integrates TunnelBear VPN proxy for anonymity | |
| - Allows manual input of auth tokens and cf_clearance cookies | |
| - Uses httpx/cloudscraper for HTTP requests via SOCKS5 proxy | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import time | |
| import uuid | |
| import base64 | |
| import secrets | |
| import mimetypes | |
| import threading | |
| import socket | |
| import socketserver | |
| import ssl | |
| import struct | |
| import logging | |
| from collections import defaultdict | |
| from datetime import datetime, timezone, timedelta | |
| from typing import Optional, Dict, List, Any | |
| from contextlib import asynccontextmanager | |
| # FastAPI and web server | |
| import uvicorn | |
| from fastapi import FastAPI, HTTPException, Request, Depends, Form | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from starlette.responses import StreamingResponse, HTMLResponse, RedirectResponse | |
| # HTTP clients | |
| import httpx | |
| import cloudscraper | |
| import requests | |
| # Gradio for HF Spaces UI | |
| import gradio as gr | |
| # ============================================================ | |
| # TUNNELBEAR VPN PROXY (Embedded from proxy.py) | |
| # ============================================================ | |
| # TunnelBear credentials (hardcoded) | |
| TB_EMAIL = "overwrite249@gmail.com" | |
| TB_PASSWORD = "zaLV3uDsS_E+6VN" | |
| # API endpoints | |
| DASHBOARD_API = "https://prod-api-dashboard.tunnelbear.com/dashboard/web" | |
| TB_API = "https://api.tunnelbear.com" | |
| PB_API = "https://api.polargrizzly.com" | |
| URL_TOKEN = f"{DASHBOARD_API}/v2/token" | |
| URL_TOKEN_COOKIE = f"{DASHBOARD_API}/v2/tokenCookie" | |
| URL_TB_COOKIE_TOKEN = f"{TB_API}/v2/cookieToken" | |
| URL_PB_AUTH = f"{PB_API}/auth" | |
| URL_PB_USER = f"{PB_API}/user" | |
| URL_PB_VPNS = f"{PB_API}/vpns" | |
| APP_VERSION = "3.6.1" | |
| APP_ID = "com.tunnelbear.browser" | |
| DEFAULT_SOCKS5_HOST = "127.0.0.1" | |
| DEFAULT_SOCKS5_PORT = 1080 | |
| # Setup logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(message)s", | |
| datefmt="%H:%M:%S", | |
| ) | |
| proxy_log = logging.getLogger("TBProxy") | |
| class TunnelBearClient: | |
| """TunnelBear API client for authentication and server discovery""" | |
| def __init__(self): | |
| self.session = requests.Session() | |
| self.session.headers.update({ | |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0", | |
| "Accept": "application/json, text/plain, */*", | |
| "Accept-Language": "en-US,en;q=0.5", | |
| "Content-Type": "application/json", | |
| }) | |
| self.device_id = f"browser-{uuid.uuid4()}" | |
| self.dashboard_token = "" | |
| self.tb_access_token = "" | |
| self.pb_auth_token = "" | |
| self.vpn_token = "" | |
| self.user_data = {} | |
| self.servers = [] | |
| self.region_name = "" | |
| self.country_iso = "" | |
| def login(self, email: str, password: str) -> bool: | |
| try: | |
| resp = self.session.post( | |
| URL_TOKEN, | |
| json={ | |
| "username": email, | |
| "password": password, | |
| "grant_type": "password", | |
| "device": self.device_id, | |
| }, | |
| ) | |
| if resp.status_code != 200: | |
| proxy_log.error(f"Login failed (HTTP {resp.status_code})") | |
| return False | |
| data = resp.json() | |
| self.dashboard_token = data.get("access_token", "") | |
| if not self.dashboard_token: | |
| return False | |
| proxy_log.info(f"Logged in. Token: {self.dashboard_token[:30]}...") | |
| return True | |
| except Exception as e: | |
| proxy_log.error(f"Login error: {e}") | |
| return False | |
| def get_session_cookie(self) -> bool: | |
| try: | |
| resp = self.session.post( | |
| URL_TOKEN_COOKIE, | |
| json={}, | |
| headers={"Authorization": f"Bearer {self.dashboard_token}"}, | |
| ) | |
| return resp.status_code == 200 | |
| except Exception: | |
| return False | |
| def get_cookie_token(self) -> bool: | |
| try: | |
| resp = self.session.post( | |
| URL_TB_COOKIE_TOKEN, | |
| headers={ | |
| "Authorization": f"Bearer {self.dashboard_token}", | |
| "device": self.device_id, | |
| "tunnelbear-app-id": APP_ID, | |
| "tunnelbear-app-version": APP_VERSION, | |
| "tunnelbear-platform": "Firefox", | |
| "tunnelbear-platform-version": "Firefox", | |
| }, | |
| ) | |
| if resp.status_code != 200: | |
| return False | |
| data = resp.json() | |
| self.tb_access_token = data.get("access_token", "") | |
| return bool(self.tb_access_token) | |
| except Exception: | |
| return False | |
| def authenticate_polarbear(self) -> bool: | |
| try: | |
| resp = self.session.post( | |
| URL_PB_AUTH, | |
| json={"partner": "tunnelbear", "token": self.tb_access_token}, | |
| ) | |
| if resp.status_code != 200: | |
| return False | |
| auth_header = resp.headers.get("authorization", "") | |
| if auth_header.startswith("Bearer "): | |
| self.pb_auth_token = auth_header[7:] | |
| return bool(self.pb_auth_token) | |
| return False | |
| except Exception: | |
| return False | |
| def get_user(self) -> bool: | |
| try: | |
| resp = self.session.get( | |
| URL_PB_USER, | |
| headers={"Authorization": f"Bearer {self.pb_auth_token}"}, | |
| ) | |
| if resp.status_code != 200: | |
| return False | |
| data = resp.json() | |
| self.user_data = data | |
| self.vpn_token = data.get("vpn_token", "") | |
| return bool(self.vpn_token) | |
| except Exception: | |
| return False | |
| def get_servers(self, country: str = None) -> bool: | |
| try: | |
| url = f"{URL_PB_VPNS}/countries/{country}" if country else URL_PB_VPNS | |
| resp = self.session.get( | |
| url, | |
| headers={"Authorization": f"Bearer {self.pb_auth_token}"}, | |
| ) | |
| if resp.status_code != 200: | |
| return False | |
| data = resp.json() | |
| vpns = data.get("vpns", []) | |
| if not vpns: | |
| return False | |
| self.region_name = data.get("region_name", "?") | |
| self.country_iso = data.get("country_iso", "?") | |
| self.servers = [] | |
| for v in vpns: | |
| if "url" in v: | |
| self.servers.append({ | |
| "url": v["url"], | |
| "protocol": v.get("protocol", "udp"), | |
| }) | |
| proxy_log.info(f"Found {len(self.servers)} servers in {self.region_name}") | |
| return bool(self.servers) | |
| except Exception: | |
| return False | |
| def pick_best_server(self) -> dict: | |
| tcp_servers = [s for s in self.servers if s["protocol"] == "tcp"] | |
| if tcp_servers: | |
| return tcp_servers[0] | |
| return self.servers[0] if self.servers else None | |
| def authenticate(self, email: str, password: str, country: str = None) -> bool: | |
| return ( | |
| self.login(email, password) and | |
| self.get_session_cookie() and | |
| self.get_cookie_token() and | |
| self.authenticate_polarbear() and | |
| self.get_user() and | |
| self.get_servers(country) | |
| ) | |
| class HTTPSConnectProxy: | |
| """HTTPS CONNECT proxy through TunnelBear's infrastructure""" | |
| def __init__(self, server_url: str, vpn_token: str): | |
| self.server_host = server_url | |
| self.server_port = 8080 | |
| self.vpn_token = vpn_token | |
| self._ssl_ctx = ssl.create_default_context() | |
| self._ssl_ctx.check_hostname = False | |
| self._ssl_ctx.verify_mode = ssl.CERT_NONE | |
| def _make_ssl_socket(self) -> ssl.SSLSocket: | |
| raw_sock = socket.create_connection( | |
| (self.server_host, self.server_port), timeout=30 | |
| ) | |
| try: | |
| return self._ssl_ctx.wrap_socket( | |
| raw_sock, server_hostname=self.server_host | |
| ) | |
| except Exception: | |
| raw_sock.close() | |
| raise | |
| def _read_http_response(self, sock: ssl.SSLSocket) -> tuple: | |
| response = b"" | |
| while b"\r\n\r\n" not in response: | |
| chunk = sock.recv(4096) | |
| if not chunk: | |
| raise ConnectionError("Proxy closed connection") | |
| response += chunk | |
| header_end = response.index(b"\r\n\r\n") | |
| header_data = response[:header_end].decode("utf-8", errors="ignore") | |
| body_remainder = response[header_end + 4:] | |
| lines = header_data.split("\r\n") | |
| status_line = lines[0] | |
| parts = status_line.split(" ", 2) | |
| status_code = int(parts[1]) if len(parts) >= 2 else 0 | |
| headers = {} | |
| for line in lines[1:]: | |
| if ":" in line: | |
| key, val = line.split(":", 1) | |
| headers[key.strip().lower()] = val.strip() | |
| return status_code, headers, body_remainder | |
| def connect(self, target_host: str, target_port: int) -> ssl.SSLSocket: | |
| auth_b64 = base64.b64encode( | |
| f"{self.vpn_token}:{self.vpn_token}".encode() | |
| ).decode() | |
| sock = self._make_ssl_socket() | |
| request = ( | |
| f"CONNECT {target_host}:{target_port} HTTP/1.1\r\n" | |
| f"Host: {target_host}:{target_port}\r\n" | |
| f"Proxy-Authorization: Basic {auth_b64}\r\n" | |
| f"User-Agent: TunnelBear/{APP_VERSION}\r\n" | |
| f"Proxy-Connection: Keep-Alive\r\n" | |
| f"\r\n" | |
| ) | |
| sock.sendall(request.encode()) | |
| status_code, headers, remainder = self._read_http_response(sock) | |
| if status_code == 200: | |
| return sock | |
| sock.close() | |
| raise ConnectionError(f"CONNECT failed: HTTP {status_code}") | |
| class SOCKS5Handler(socketserver.StreamRequestHandler): | |
| """SOCKS5 server handler""" | |
| proxy = None | |
| def handle(self): | |
| try: | |
| self._handle_socks5() | |
| except Exception: | |
| pass | |
| finally: | |
| try: | |
| self.connection.close() | |
| except OSError: | |
| pass | |
| def _handle_socks5(self): | |
| conn = self.connection | |
| ver_nmethods = self._recv_exact(conn, 2) | |
| if ver_nmethods[0] != 0x05: | |
| return | |
| n_methods = ver_nmethods[1] | |
| methods = self._recv_exact(conn, n_methods) | |
| if 0x00 in methods: | |
| conn.sendall(b"\x05\x00") | |
| elif 0x02 in methods: | |
| conn.sendall(b"\x05\x02") | |
| auth_data = self._recv_exact(conn, 2) | |
| ulen = auth_data[1] | |
| self._recv_exact(conn, ulen) | |
| plen = self._recv_exact(conn, 1)[0] | |
| self._recv_exact(conn, plen) | |
| conn.sendall(b"\x01\x00") | |
| else: | |
| conn.sendall(b"\x05\xFF") | |
| return | |
| header = self._recv_exact(conn, 4) | |
| _, cmd, _, addr_type = header | |
| if cmd != 0x01: | |
| self._reply(conn, 0x07) | |
| return | |
| if addr_type == 0x01: | |
| target_host = socket.inet_ntoa(self._recv_exact(conn, 4)) | |
| elif addr_type == 0x03: | |
| dlen = self._recv_exact(conn, 1)[0] | |
| target_host = self._recv_exact(conn, dlen).decode() | |
| elif addr_type == 0x04: | |
| target_host = socket.inet_ntop(socket.AF_INET6, self._recv_exact(conn, 16)) | |
| else: | |
| self._reply(conn, 0x08) | |
| return | |
| target_port = struct.unpack("!H", self._recv_exact(conn, 2))[0] | |
| proxy_log.debug(f"SOCKS5 -> {target_host}:{target_port}") | |
| try: | |
| remote = self.proxy.connect(target_host, target_port) | |
| except Exception: | |
| self._reply(conn, 0x05) | |
| return | |
| self._reply(conn, 0x00) | |
| self._relay(conn, remote) | |
| def _relay(self, a, b): | |
| a.settimeout(300) | |
| b.settimeout(300) | |
| def pump(src, dst): | |
| try: | |
| while True: | |
| data = src.recv(8192) | |
| if not data: | |
| break | |
| dst.sendall(data) | |
| except (socket.timeout, OSError): | |
| pass | |
| finally: | |
| try: | |
| dst.shutdown(socket.SHUT_WR) | |
| except OSError: | |
| pass | |
| t1 = threading.Thread(target=pump, args=(a, b), daemon=True) | |
| t2 = threading.Thread(target=pump, args=(b, a), daemon=True) | |
| t1.start() | |
| t2.start() | |
| t1.join(timeout=300) | |
| t2.join(timeout=300) | |
| def _reply(self, conn, code): | |
| conn.sendall(b"\x05" + bytes([code, 0, 1, 0, 0, 0, 0, 0, 0, 0])) | |
| @staticmethod | |
| def _recv_exact(sock, n): | |
| buf = bytearray() | |
| while len(buf) < n: | |
| chunk = sock.recv(n - len(buf)) | |
| if not chunk: | |
| raise ConnectionError("Connection closed") | |
| buf.extend(chunk) | |
| return bytes(buf) | |
| class ThreadedSOCKS5Server(socketserver.ThreadingMixIn, socketserver.TCPServer): | |
| allow_reuse_address = True | |
| daemon_threads = True | |
| # Global proxy state | |
| _proxy_client = None | |
| _proxy_server = None | |
| _proxy_running = False | |
| _socks5_port = DEFAULT_SOCKS5_PORT | |
| _socks5_host = DEFAULT_SOCKS5_HOST | |
| def start_tunnelbear_proxy(country: str = None) -> bool: | |
| """Start TunnelBear VPN SOCKS5 proxy""" | |
| global _proxy_client, _proxy_server, _proxy_running, _socks5_port | |
| if _proxy_running: | |
| proxy_log.info("Proxy already running") | |
| return True | |
| try: | |
| _proxy_client = TunnelBearClient() | |
| if not _proxy_client.authenticate(TB_EMAIL, TB_PASSWORD, country): | |
| proxy_log.error("Failed to authenticate with TunnelBear") | |
| return False | |
| best = _proxy_client.pick_best_server() | |
| if not best: | |
| proxy_log.error("No servers available") | |
| return False | |
| server_url = best["url"] | |
| server_proto = best["protocol"] | |
| proxy_log.info(f"Using server: {server_url}:8080 ({server_proto})") | |
| proxy = HTTPSConnectProxy(server_url, _proxy_client.vpn_token) | |
| SOCKS5Handler.proxy = proxy | |
| _proxy_server = ThreadedSOCKS5Server((_socks5_host, _socks5_port), SOCKS5Handler) | |
| proxy_thread = threading.Thread(target=_proxy_server.serve_forever, daemon=True) | |
| proxy_thread.start() | |
| _proxy_running = True | |
| proxy_log.info(f"SOCKS5 proxy running at {_socks5_host}:{_socks5_port}") | |
| return True | |
| except Exception as e: | |
| proxy_log.error(f"Failed to start proxy: {e}") | |
| return False | |
| def stop_tunnelbear_proxy(): | |
| """Stop TunnelBear VPN SOCKS5 proxy""" | |
| global _proxy_server, _proxy_running | |
| if _proxy_server: | |
| try: | |
| _proxy_server.shutdown() | |
| except Exception: | |
| pass | |
| _proxy_server = None | |
| _proxy_running = False | |
| proxy_log.info("Proxy stopped") | |
| def get_proxy_url() -> Optional[str]: | |
| """Get SOCKS5 proxy URL if proxy is running""" | |
| global _proxy_running | |
| if _proxy_running: | |
| return f"socks5://{_socks5_host}:{_socks5_port}" | |
| return None | |
| # ============================================================ | |
| # CONSTANTS | |
| # ============================================================ | |
| DEBUG = os.environ.get("DEBUG", "false").lower() == "true" | |
| PORT = int(os.environ.get("PORT", 7860)) | |
| HOST = os.environ.get("HOST", "0.0.0.0") | |
| # reCAPTCHA constants | |
| RECAPTCHA_SITEKEY = "6Led_uYrAAAAAKjxDIF58fgFtX3t8loNAK85bW9I" | |
| RECAPTCHA_ACTION = "chat_submit" | |
| RECAPTCHA_V2_SITEKEY = "6Ld7ePYrAAAAAB34ovoFoDau1fqCJ6IyOjFEQaMn" | |
| TURNSTILE_SITEKEY = "0x4AAAAAAA65vWDmG-O_lPtT" | |
| # LMArena URLs | |
| LMARENA_ORIGIN = "https://lmarena.ai" | |
| ARENA_ORIGIN = "https://arena.ai" | |
| STREAM_CREATE_EVALUATION_PATH = "/nextjs-api/stream/create-evaluation" | |
| # Timeouts | |
| DEFAULT_REQUEST_TIMEOUT = 120 | |
| DEFAULT_RATE_LIMIT_RPM = 60 | |
| # File paths (in HF Spaces, use /tmp for persistence) | |
| CONFIG_FILE = "/tmp/config.json" | |
| MODELS_FILE = "/tmp/models.json" | |
| # Cookie names | |
| CF_CLEARANCE_COOKIE = "cf_clearance" | |
| ARENA_AUTH_COOKIE = "arena-auth-prod-v1" | |
| # Image support | |
| MAX_IMAGE_SIZE_BYTES = 10 * 1024 * 1024 # 10MB | |
| SUPPORTED_IMAGE_MIME_TYPES = { | |
| "image/png", | |
| "image/jpeg", | |
| "image/gif", | |
| "image/webp", | |
| "image/svg+xml", | |
| } | |
| # Default User-Agent | |
| DEFAULT_USER_AGENT = ( | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " | |
| "Chrome/120.0.0.0 Safari/537.36" | |
| ) | |
| # ============================================================ | |
| # UTILITY FUNCTIONS | |
| # ============================================================ | |
| def debug_print(*args, **kwargs): | |
| """Print debug messages only if DEBUG is True""" | |
| if DEBUG: | |
| print(*args, **kwargs) | |
| def safe_print(*args, **kwargs): | |
| """Print without crashing on encoding issues""" | |
| try: | |
| print(*args, **kwargs) | |
| except UnicodeEncodeError: | |
| import sys | |
| text = " ".join(str(a) for a in args) | |
| print(text.encode("utf-8", errors="replace").decode("utf-8")) | |
| def uuid7(): | |
| """Generate a UUIDv7 using Unix epoch""" | |
| timestamp_ms = int(time.time() * 1000) | |
| rand_a = secrets.randbits(12) | |
| rand_b = secrets.randbits(62) | |
| uuid_int = timestamp_ms << 80 | |
| uuid_int |= (0x7000 | rand_a) << 64 | |
| uuid_int |= (0x8000000000000000 | rand_b) | |
| hex_str = f"{uuid_int:032x}" | |
| return f"{hex_str[0:8]}-{hex_str[8:12]}-{hex_str[12:16]}-{hex_str[16:20]}-{hex_str[20:32]}" | |
| # ============================================================ | |
| # CONFIG MANAGEMENT | |
| # ============================================================ | |
| def get_default_config() -> dict: | |
| """Get default configuration values""" | |
| return { | |
| "password": "admin", | |
| "auth_token": "", | |
| "auth_tokens": [], | |
| "cf_clearance": "", | |
| "api_keys": [], | |
| "usage_stats": {}, | |
| "user_agent": DEFAULT_USER_AGENT, | |
| "use_proxy": True, | |
| "proxy_country": "", | |
| } | |
| def get_config() -> dict: | |
| """Load configuration from file with defaults""" | |
| try: | |
| with open(CONFIG_FILE, "r") as f: | |
| config = json.load(f) | |
| except (FileNotFoundError, json.JSONDecodeError): | |
| config = get_default_config() | |
| except Exception: | |
| config = get_default_config() | |
| # Ensure default keys exist | |
| config.setdefault("password", "admin") | |
| config.setdefault("auth_token", "") | |
| config.setdefault("auth_tokens", []) | |
| config.setdefault("cf_clearance", "") | |
| config.setdefault("api_keys", []) | |
| config.setdefault("usage_stats", {}) | |
| config.setdefault("user_agent", DEFAULT_USER_AGENT) | |
| config.setdefault("use_proxy", True) | |
| config.setdefault("proxy_country", "") | |
| return config | |
| def save_config(config: dict): | |
| """Save configuration to file""" | |
| try: | |
| with open(CONFIG_FILE, "w") as f: | |
| json.dump(config, f, indent=4) | |
| except Exception as e: | |
| debug_print(f"Error saving config: {e}") | |
| def get_models() -> list: | |
| """Load models from file""" | |
| try: | |
| with open(MODELS_FILE, "r") as f: | |
| return json.load(f) | |
| except (FileNotFoundError, json.JSONDecodeError): | |
| return [] | |
| except Exception: | |
| return [] | |
| def save_models(models: list): | |
| """Save models to file""" | |
| try: | |
| with open(MODELS_FILE, "w") as f: | |
| json.dump(models, f, indent=2) | |
| except Exception as e: | |
| debug_print(f"Error saving models: {e}") | |
| # ============================================================ | |
| # AUTH TOKEN MANAGEMENT | |
| # ============================================================ | |
| def _decode_arena_auth_session_token(token: str) -> Optional[dict]: | |
| """Decode arena-auth-prod-v1 cookie value when stored as base64-<json>""" | |
| token = str(token or "").strip() | |
| if not token.startswith("base64-"): | |
| return None | |
| b64 = token[len("base64-"):] | |
| if not b64: | |
| return None | |
| try: | |
| b64 += "=" * ((4 - (len(b64) % 4)) % 4) | |
| raw = base64.b64decode(b64.encode("utf-8")) | |
| obj = json.loads(raw.decode("utf-8")) | |
| except Exception: | |
| return None | |
| if isinstance(obj, dict): | |
| return obj | |
| return None | |
| def _decode_jwt_payload(token: str) -> Optional[dict]: | |
| """Decode JWT payload""" | |
| token = str(token or "").strip() | |
| if token.count(".") < 2: | |
| return None | |
| parts = token.split(".") | |
| if len(parts) < 2: | |
| return None | |
| payload_b64 = str(parts[1] or "") | |
| if not payload_b64: | |
| return None | |
| try: | |
| payload_b64 += "=" * ((4 - (len(payload_b64) % 4)) % 4) | |
| raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8")) | |
| obj = json.loads(raw.decode("utf-8")) | |
| except Exception: | |
| return None | |
| if isinstance(obj, dict): | |
| return obj | |
| return None | |
| def get_arena_auth_token_expiry_epoch(token: str) -> Optional[int]: | |
| """Best-effort expiry detection for arena-auth tokens""" | |
| session = _decode_arena_auth_session_token(token) | |
| if isinstance(session, dict): | |
| try: | |
| exp = session.get("expires_at") | |
| if exp is not None: | |
| return int(exp) | |
| except Exception: | |
| pass | |
| access = str(session.get("access_token") or "").strip() | |
| if access: | |
| payload = _decode_jwt_payload(access) | |
| if isinstance(payload, dict): | |
| try: | |
| exp = payload.get("exp") | |
| if exp is not None: | |
| return int(exp) | |
| except Exception: | |
| pass | |
| return None | |
| def is_arena_auth_token_expired(token: str, *, skew_seconds: int = 30) -> bool: | |
| """Return True if token is expired or about to expire""" | |
| exp = get_arena_auth_token_expiry_epoch(token) | |
| if exp is None: | |
| return False | |
| now = time.time() | |
| return now >= (float(exp) - float(max(0, skew_seconds))) | |
| def is_probably_valid_arena_auth_token(token: str) -> bool: | |
| """Check if token is a valid arena-auth token format""" | |
| token = str(token or "").strip() | |
| if not token: | |
| return False | |
| if token.startswith("base64-"): | |
| session = _decode_arena_auth_session_token(token) | |
| if not isinstance(session, dict): | |
| return False | |
| access = str(session.get("access_token") or "").strip() | |
| if access.count(".") < 2: | |
| return False | |
| return not is_arena_auth_token_expired(token) | |
| if token.count(".") >= 2: | |
| if len(token) < 100: | |
| return False | |
| return not is_arena_auth_token_expired(token) | |
| return False | |
| # ============================================================ | |
| # HTTP REQUEST HELPERS (WITH PROXY SUPPORT) | |
| # ============================================================ | |
| def get_request_headers_with_token(token: str) -> dict: | |
| """Get request headers with a specific auth token""" | |
| config = get_config() | |
| cf_clearance = str(config.get("cf_clearance") or "").strip() | |
| user_agent = str(config.get("user_agent") or DEFAULT_USER_AGENT).strip() | |
| cookie_parts = [] | |
| if cf_clearance: | |
| cookie_parts.append(f"cf_clearance={cf_clearance}") | |
| if token: | |
| cookie_parts.append(f"arena-auth-prod-v1={token}") | |
| headers = { | |
| "Content-Type": "text/plain;charset=UTF-8", | |
| "Origin": LMARENA_ORIGIN, | |
| "Referer": f"{LMARENA_ORIGIN}/?mode=direct", | |
| "User-Agent": user_agent, | |
| } | |
| if cookie_parts: | |
| headers["Cookie"] = "; ".join(cookie_parts) | |
| return headers | |
| def get_next_auth_token(exclude_tokens: set = None) -> str: | |
| """Get next auth token using round-robin selection""" | |
| config = get_config() | |
| auth_tokens = config.get("auth_tokens", []) | |
| if not isinstance(auth_tokens, list): | |
| auth_tokens = [] | |
| # Normalize and filter tokens | |
| auth_tokens = [str(t or "").strip() for t in auth_tokens if str(t or "").strip()] | |
| # Filter out expired non-base64 tokens | |
| filtered_tokens = [] | |
| for t in auth_tokens: | |
| if t.startswith("base64-"): | |
| filtered_tokens.append(t) | |
| continue | |
| try: | |
| if is_arena_auth_token_expired(t): | |
| continue | |
| except Exception: | |
| pass | |
| filtered_tokens.append(t) | |
| auth_tokens = filtered_tokens | |
| # Back-compat: single token | |
| if not auth_tokens: | |
| single_token = str(config.get("auth_token") or "").strip() | |
| if single_token and not is_arena_auth_token_expired(single_token): | |
| auth_tokens = [single_token] | |
| if not auth_tokens: | |
| raise HTTPException(status_code=500, detail="No auth tokens configured. Please add your arena-auth-prod-v1 token.") | |
| # Filter out excluded tokens | |
| if exclude_tokens: | |
| available_tokens = [t for t in auth_tokens if t not in exclude_tokens] | |
| if not available_tokens: | |
| raise HTTPException(status_code=500, detail="No more auth tokens available") | |
| else: | |
| available_tokens = auth_tokens | |
| # Simple round-robin | |
| global current_token_index | |
| token = available_tokens[current_token_index % len(available_tokens)] | |
| current_token_index = (current_token_index + 1) % len(auth_tokens) | |
| return token | |
| # Global token index | |
| current_token_index = 0 | |
| def get_proxied_http_client() -> httpx.AsyncClient: | |
| """Get httpx client configured with proxy if available""" | |
| config = get_config() | |
| use_proxy = config.get("use_proxy", True) | |
| proxy_url = get_proxy_url() | |
| proxy = None | |
| if use_proxy and proxy_url: | |
| proxy = proxy_url | |
| debug_print(f"Using proxy: {proxy_url}") | |
| return httpx.AsyncClient( | |
| timeout=DEFAULT_REQUEST_TIMEOUT, | |
| proxy=proxy | |
| ) | |
| # ============================================================ | |
| # MODEL MANAGEMENT | |
| # ============================================================ | |
| async def fetch_models_from_lmarena() -> list: | |
| """Fetch models from LMArena website""" | |
| debug_print("Fetching models from LMArena...") | |
| config = get_config() | |
| cf_clearance = str(config.get("cf_clearance") or "").strip() | |
| user_agent = str(config.get("user_agent") or DEFAULT_USER_AGENT).strip() | |
| use_proxy = config.get("use_proxy", True) | |
| cookies = {} | |
| if cf_clearance: | |
| cookies["cf_clearance"] = cf_clearance | |
| try: | |
| proxy_url = get_proxy_url() if use_proxy else None | |
| def _fetch(): | |
| scraper = cloudscraper.create_scraper() | |
| scraper.headers.update({"User-Agent": user_agent}) | |
| proxies = None | |
| if proxy_url: | |
| proxies = {"http": proxy_url, "https": proxy_url} | |
| return scraper.get( | |
| f"{LMARENA_ORIGIN}/?mode=direct", | |
| cookies=cookies if cookies else None, | |
| timeout=30, | |
| proxies=proxies | |
| ) | |
| response = await asyncio.to_thread(_fetch) | |
| if response.status_code != 200: | |
| debug_print(f"Failed to fetch models: HTTP {response.status_code}") | |
| return [] | |
| # Extract models from page content | |
| page_body = response.text | |
| match = re.search(r'{\\"initialModels\\":(\[.*?\]),\\"initialModel[A-Z]Id', page_body, re.DOTALL) | |
| if match: | |
| models_json = match.group(1).encode().decode('unicode_escape') | |
| models = json.loads(models_json) | |
| debug_print(f"Found {len(models)} models") | |
| return models | |
| else: | |
| debug_print("Could not find models in page") | |
| return [] | |
| except Exception as e: | |
| debug_print(f"Error fetching models: {e}") | |
| return [] | |
| # ============================================================ | |
| # CHAT COMPLETION (WITH PROXY) | |
| # ============================================================ | |
| async def stream_chat_completion( | |
| model_id: str, | |
| model_public_name: str, | |
| conversation_id: str, | |
| prompt: str, | |
| auth_token: str, | |
| experimental_attachments: list = None, | |
| recaptcha_token: str = "", | |
| modality: str = "chat" | |
| ): | |
| """Stream chat completion from LMArena via proxy""" | |
| config = get_config() | |
| user_agent = str(config.get("user_agent") or DEFAULT_USER_AGENT).strip() | |
| cf_clearance = str(config.get("cf_clearance") or "").strip() | |
| # Build request payload | |
| payload = { | |
| "id": conversation_id, | |
| "modelId": model_id, | |
| "messages": [ | |
| { | |
| "role": "user", | |
| "content": prompt, | |
| "experimental_attachments": experimental_attachments or [] | |
| } | |
| ], | |
| "modality": modality, | |
| "recaptchaV3Token": recaptcha_token, | |
| } | |
| # Build headers | |
| headers = { | |
| "Content-Type": "text/plain;charset=UTF-8", | |
| "Origin": LMARENA_ORIGIN, | |
| "Referer": f"{LMARENA_ORIGIN}/?mode=direct", | |
| "User-Agent": user_agent, | |
| } | |
| # Add cookies | |
| cookie_parts = [] | |
| if cf_clearance: | |
| cookie_parts.append(f"cf_clearance={cf_clearance}") | |
| if auth_token: | |
| cookie_parts.append(f"arena-auth-prod-v1={auth_token}") | |
| if cookie_parts: | |
| headers["Cookie"] = "; ".join(cookie_parts) | |
| url = f"{LMARENA_ORIGIN}{STREAM_CREATE_EVALUATION_PATH}" | |
| debug_print(f"Streaming request to: {url}") | |
| debug_print(f"Model: {model_public_name}") | |
| debug_print(f"Conversation ID: {conversation_id}") | |
| try: | |
| async with get_proxied_http_client() as client: | |
| async with client.stream( | |
| "POST", | |
| url, | |
| headers=headers, | |
| content=json.dumps(payload), | |
| ) as response: | |
| debug_print(f"Response status: {response.status_code}") | |
| if response.status_code != 200: | |
| error_text = await response.aread() | |
| error_msg = error_text.decode("utf-8", errors="replace") | |
| debug_print(f"Error response: {error_msg[:500]}") | |
| error_data = {"error": f"HTTP {response.status_code}: {error_msg[:200]}"} | |
| yield f"data: {json.dumps(error_data)}\n\n" | |
| return | |
| # Stream SSE events | |
| async for line in response.aiter_lines(): | |
| if line: | |
| if line.startswith("data: "): | |
| yield line + "\n" | |
| elif line.startswith("0:"): | |
| try: | |
| content = json.loads(line[2:]) | |
| yield f"data: {json.dumps({'choices': [{'delta': {'content': content}}]})}\n\n" | |
| except Exception: | |
| content = line[2:].strip('"') | |
| yield f"data: {json.dumps({'choices': [{'delta': {'content': content}}]})}\n\n" | |
| else: | |
| yield line + "\n" | |
| yield "data: [DONE]\n\n" | |
| except httpx.TimeoutException: | |
| debug_print("Request timed out") | |
| yield f"data: {json.dumps({'error': 'Request timed out'})}\n\n" | |
| except Exception as e: | |
| debug_print(f"Stream error: {e}") | |
| yield f"data: {json.dumps({'error': str(e)})}\n\n" | |
| async def chat_completion_non_stream( | |
| model_id: str, | |
| model_public_name: str, | |
| conversation_id: str, | |
| prompt: str, | |
| auth_token: str, | |
| experimental_attachments: list = None, | |
| recaptcha_token: str = "", | |
| modality: str = "chat" | |
| ) -> dict: | |
| """Non-streaming chat completion""" | |
| full_content = "" | |
| error_msg = None | |
| async for chunk in stream_chat_completion( | |
| model_id, model_public_name, conversation_id, prompt, | |
| auth_token, experimental_attachments, recaptcha_token, modality | |
| ): | |
| if chunk.startswith("data: ") and not chunk.startswith("data: [DONE]"): | |
| try: | |
| data = json.loads(chunk[6:].strip()) | |
| if "error" in data: | |
| error_msg = data["error"] | |
| break | |
| if "choices" in data: | |
| for choice in data["choices"]: | |
| delta = choice.get("delta", {}) | |
| content = delta.get("content", "") | |
| if content: | |
| full_content += content | |
| except json.JSONDecodeError: | |
| pass | |
| if error_msg: | |
| return {"error": error_msg} | |
| return { | |
| "id": f"chatcmpl-{uuid.uuid4()}", | |
| "object": "chat.completion", | |
| "created": int(time.time()), | |
| "model": model_public_name, | |
| "choices": [ | |
| { | |
| "index": 0, | |
| "message": { | |
| "role": "assistant", | |
| "content": full_content | |
| }, | |
| "finish_reason": "stop" | |
| } | |
| ] | |
| } | |
| # ============================================================ | |
| # IMAGE HANDLING | |
| # ============================================================ | |
| async def upload_image_to_lmarena(image_data: bytes, mime_type: str, filename: str) -> Optional[tuple]: | |
| """Upload an image to LMArena R2 storage""" | |
| debug_print(f"Image upload not supported in this version: {filename}") | |
| return None | |
| def _coerce_message_content_to_text(content) -> str: | |
| """Coerce message content to plain text""" | |
| if content is None: | |
| return "" | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| parts = [] | |
| for part in content: | |
| if isinstance(part, dict): | |
| if part.get("type") == "text": | |
| parts.append(str(part.get("text", ""))) | |
| elif "text" in part: | |
| parts.append(str(part.get("text", ""))) | |
| elif isinstance(part, str): | |
| parts.append(part) | |
| return "\n".join(parts).strip() | |
| return str(content) | |
| # ============================================================ | |
| # FASTAPI APPLICATION | |
| # ============================================================ | |
| @asynccontextmanager | |
| async def lifespan(app: FastAPI): | |
| """Application lifespan handler""" | |
| global _proxy_running | |
| debug_print("Starting LMArena Bridge...") | |
| # Start TunnelBear proxy | |
| config = get_config() | |
| if config.get("use_proxy", True): | |
| country = config.get("proxy_country", "").strip() or None | |
| if start_tunnelbear_proxy(country): | |
| debug_print("TunnelBear VPN proxy started successfully") | |
| else: | |
| debug_print("Warning: Failed to start TunnelBear proxy, continuing without VPN") | |
| # Try to fetch models on startup | |
| try: | |
| models = await fetch_models_from_lmarena() | |
| if models: | |
| save_models(models) | |
| debug_print(f"Saved {len(models)} models") | |
| except Exception as e: | |
| debug_print(f"Could not fetch models on startup: {e}") | |
| yield | |
| # Cleanup | |
| stop_tunnelbear_proxy() | |
| app = FastAPI( | |
| title="LMArena Bridge", | |
| description="OpenAI-compatible API for LM Arena with VPN proxy support", | |
| version="1.0.0", | |
| lifespan=lifespan | |
| ) | |
| # CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origin_regex=r"https?://.*", | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Usage stats | |
| model_usage_stats = defaultdict(int) | |
| api_key_usage = defaultdict(list) | |
| # API Key dependency | |
| async def rate_limit_api_key(request: Request): | |
| """Rate limit API key""" | |
| config = get_config() | |
| api_keys = config.get("api_keys", []) | |
| # If no API keys configured, allow anonymous access | |
| if not api_keys: | |
| return {"key": "anonymous", "name": "Anonymous", "rpm": 9999} | |
| auth_header = request.headers.get("Authorization", "") | |
| api_key_str = None | |
| if auth_header.startswith("Bearer "): | |
| api_key_str = auth_header[7:].strip() | |
| # If keys configured but none provided, use first key | |
| if not api_key_str and api_keys: | |
| api_key_str = api_keys[0]["key"] | |
| key_data = next((k for k in api_keys if k["key"] == api_key_str), None) | |
| if not key_data: | |
| raise HTTPException(status_code=401, detail="Invalid API Key") | |
| # Rate limiting | |
| rate_limit = key_data.get("rpm", 60) | |
| current_time = time.time() | |
| # Clean up old timestamps | |
| api_key_usage[api_key_str] = [t for t in api_key_usage[api_key_str] if current_time - t < 60] | |
| if len(api_key_usage[api_key_str]) >= rate_limit: | |
| raise HTTPException(status_code=429, detail="Rate limit exceeded") | |
| api_key_usage[api_key_str].append(current_time) | |
| return key_data | |
| # Health check | |
| @app.get("/api/v1/health") | |
| async def health_check(): | |
| """Health check endpoint""" | |
| models = get_models() | |
| config = get_config() | |
| has_cf_clearance = bool(config.get("cf_clearance")) | |
| has_auth_tokens = bool(config.get("auth_tokens") or config.get("auth_token")) | |
| has_models = len(models) > 0 | |
| proxy_active = _proxy_running | |
| status = "healthy" if (has_auth_tokens and has_models) else "degraded" | |
| return { | |
| "status": status, | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| "checks": { | |
| "cf_clearance": has_cf_clearance, | |
| "auth_tokens": has_auth_tokens, | |
| "models_loaded": has_models, | |
| "model_count": len(models), | |
| "vpn_proxy_active": proxy_active | |
| } | |
| } | |
| # List models | |
| @app.get("/api/v1/models") | |
| async def list_models(api_key: dict = Depends(rate_limit_api_key)): | |
| """List available models""" | |
| models = get_models() | |
| # Filter for valid models | |
| valid_models = [ | |
| m for m in models | |
| if (m.get('capabilities', {}).get('outputCapabilities', {}).get('text') | |
| or m.get('capabilities', {}).get('outputCapabilities', {}).get('search') | |
| or m.get('capabilities', {}).get('outputCapabilities', {}).get('image')) | |
| and m.get('organization') | |
| ] | |
| return { | |
| "object": "list", | |
| "data": [ | |
| { | |
| "id": model.get("publicName"), | |
| "object": "model", | |
| "created": int(time.time()), | |
| "owned_by": model.get("organization", "lmarena") | |
| } | |
| for model in valid_models | |
| if model.get("publicName") | |
| ] | |
| } | |
| # Chat completions | |
| @app.post("/api/v1/chat/completions") | |
| async def api_chat_completions(request: Request, api_key: dict = Depends(rate_limit_api_key)): | |
| """Chat completions endpoint""" | |
| try: | |
| body = await request.json() | |
| except Exception: | |
| raise HTTPException(status_code=400, detail="Invalid JSON in request body") | |
| model_public_name = body.get("model") | |
| messages = body.get("messages", []) | |
| stream = body.get("stream", False) | |
| if not model_public_name: | |
| raise HTTPException(status_code=400, detail="Missing 'model' in request body") | |
| if not messages: | |
| raise HTTPException(status_code=400, detail="Missing 'messages' in request body") | |
| # Find model | |
| models = get_models() | |
| model_id = None | |
| model_capabilities = {} | |
| for m in models: | |
| if m.get("publicName") == model_public_name: | |
| model_id = m.get("id") | |
| model_capabilities = m.get("capabilities", {}) | |
| break | |
| if not model_id: | |
| raise HTTPException(status_code=404, detail=f"Model '{model_public_name}' not found") | |
| # Determine modality | |
| if model_capabilities.get("outputCapabilities", {}).get("image"): | |
| modality = "image" | |
| elif model_capabilities.get("outputCapabilities", {}).get("search"): | |
| modality = "search" | |
| else: | |
| modality = "chat" | |
| # Update usage stats | |
| model_usage_stats[model_public_name] += 1 | |
| config = get_config() | |
| config["usage_stats"] = dict(model_usage_stats) | |
| save_config(config) | |
| # Extract system prompt | |
| system_prompt = "" | |
| system_messages = [m for m in messages if m.get("role") == "system"] | |
| if system_messages: | |
| system_prompt = "\n\n".join([ | |
| _coerce_message_content_to_text(m.get("content", "")) | |
| for m in system_messages | |
| ]) | |
| # Get last user message | |
| last_message = messages[-1] | |
| prompt = _coerce_message_content_to_text(last_message.get("content", "")) | |
| if system_prompt: | |
| prompt = f"{system_prompt}\n\n{prompt}" | |
| # Generate conversation ID | |
| conversation_id = str(uuid.uuid4()) | |
| # Get auth token | |
| try: | |
| auth_token = get_next_auth_token() | |
| except HTTPException: | |
| raise HTTPException(status_code=500, detail="No auth tokens configured. Please add your arena-auth-prod-v1 token in the Settings tab.") | |
| if stream: | |
| # Return streaming response | |
| return StreamingResponse( | |
| stream_chat_completion( | |
| model_id, model_public_name, conversation_id, prompt, | |
| auth_token, [], "", modality | |
| ), | |
| media_type="text/event-stream" | |
| ) | |
| else: | |
| # Return non-streaming response | |
| result = await chat_completion_non_stream( | |
| model_id, model_public_name, conversation_id, prompt, | |
| auth_token, [], "", modality | |
| ) | |
| if "error" in result: | |
| raise HTTPException(status_code=500, detail=result["error"]) | |
| return result | |
| # ============================================================ | |
| # GRADIO INTERFACE | |
| # ============================================================ | |
| def get_model_list(): | |
| """Get list of available models""" | |
| models = get_models() | |
| valid_models = [ | |
| m for m in models | |
| if (m.get('capabilities', {}).get('outputCapabilities', {}).get('text') | |
| or m.get('capabilities', {}).get('outputCapabilities', {}).get('search')) | |
| and m.get('organization') | |
| and m.get('publicName') | |
| ] | |
| return sorted([m.get('publicName') for m in valid_models]) | |
| def update_config(auth_token, cf_clearance, user_agent, use_proxy, proxy_country): | |
| """Update configuration""" | |
| config = get_config() | |
| if auth_token.strip(): | |
| tokens = [t.strip() for t in auth_token.split("\n") if t.strip()] | |
| config["auth_tokens"] = tokens | |
| if cf_clearance.strip(): | |
| config["cf_clearance"] = cf_clearance.strip() | |
| if user_agent.strip(): | |
| config["user_agent"] = user_agent.strip() | |
| config["use_proxy"] = use_proxy | |
| config["proxy_country"] = proxy_country.strip() | |
| save_config(config) | |
| return "Configuration saved!" | |
| def toggle_proxy(use_proxy, proxy_country): | |
| """Toggle proxy on/off""" | |
| config = get_config() | |
| config["use_proxy"] = use_proxy | |
| config["proxy_country"] = proxy_country.strip() | |
| save_config(config) | |
| if use_proxy: | |
| country = proxy_country.strip() or None | |
| if start_tunnelbear_proxy(country): | |
| return "✅ VPN Proxy started successfully!" | |
| else: | |
| return "❌ Failed to start VPN Proxy. Check logs." | |
| else: | |
| stop_tunnelbear_proxy() | |
| return "🔴 VPN Proxy stopped." | |
| def get_proxy_status(): | |
| """Get current proxy status""" | |
| global _proxy_running, _proxy_client | |
| if _proxy_running and _proxy_client: | |
| return f"🟢 Active - Region: {_proxy_client.region_name} ({_proxy_client.country_iso})\nSOCKS5: {_socks5_host}:{_socks5_port}" | |
| return "🔴 Inactive" | |
| async def refresh_models(): | |
| """Refresh models from LMArena""" | |
| try: | |
| models = await fetch_models_from_lmarena() | |
| if models: | |
| save_models(models) | |
| return f"Successfully loaded {len(models)} models!" | |
| else: | |
| return "No models found. Make sure cf_clearance cookie is valid." | |
| except Exception as e: | |
| return f"Error refreshing models: {str(e)}" | |
| async def chat_with_model(model_name, message, history): | |
| """Chat with a model""" | |
| if not model_name: | |
| return history, "Please select a model first." | |
| if not message.strip(): | |
| return history, "Please enter a message." | |
| config = get_config() | |
| if not config.get("auth_tokens") and not config.get("auth_token"): | |
| return history, "No auth tokens configured. Please add your arena-auth-prod-v1 token in Settings." | |
| try: | |
| # Build messages from history | |
| messages = [] | |
| for user_msg, assistant_msg in history: | |
| messages.append({"role": "user", "content": user_msg}) | |
| if assistant_msg: | |
| messages.append({"role": "assistant", "content": assistant_msg}) | |
| messages.append({"role": "user", "content": message}) | |
| # Find model | |
| models = get_models() | |
| model_id = None | |
| model_capabilities = {} | |
| for m in models: | |
| if m.get("publicName") == model_name: | |
| model_id = m.get("id") | |
| model_capabilities = m.get("capabilities", {}) | |
| break | |
| if not model_id: | |
| return history, f"Model '{model_name}' not found." | |
| # Determine modality | |
| if model_capabilities.get("outputCapabilities", {}).get("image"): | |
| modality = "image" | |
| elif model_capabilities.get("outputCapabilities", {}).get("search"): | |
| modality = "search" | |
| else: | |
| modality = "chat" | |
| # Get auth token | |
| auth_token = get_next_auth_token() | |
| # Run chat completion | |
| result = await chat_completion_non_stream( | |
| model_id, model_name, str(uuid.uuid4()), message, | |
| auth_token, [], "", modality | |
| ) | |
| if "error" in result: | |
| return history, f"Error: {result['error']}" | |
| # Extract response | |
| response = "" | |
| if "choices" in result and result["choices"]: | |
| response = result["choices"][0].get("message", {}).get("content", "") | |
| # Update history | |
| history.append((message, response)) | |
| return history, "" | |
| except Exception as e: | |
| return history, f"Error: {str(e)}" | |
| def create_gradio_interface(): | |
| """Create Gradio interface""" | |
| with gr.Blocks(title="LMArena Bridge", theme=gr.themes.Soft()) as interface: | |
| gr.Markdown( | |
| """ | |
| # 🚀 LMArena Bridge with VPN Proxy | |
| A bridge to interact with LM Arena models via an OpenAI-compatible API. | |
| **Now with TunnelBear VPN proxy for anonymous access!** | |
| **Quick Start:** | |
| 1. VPN Proxy starts automatically (check Proxy tab for status) | |
| 2. Go to **Settings** tab and add your `arena-auth-prod-v1` token | |
| 3. Click **Refresh Models** to load available models | |
| 4. Start chatting in the **Chat** tab! | |
| """ | |
| ) | |
| with gr.Tabs(): | |
| with gr.TabItem("💬 Chat"): | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| model_dropdown = gr.Dropdown( | |
| label="Model", | |
| choices=get_model_list(), | |
| value=None, | |
| interactive=True | |
| ) | |
| chatbot = gr.Chatbot( | |
| label="Conversation", | |
| height=400, | |
| show_copy_button=True | |
| ) | |
| with gr.Row(): | |
| message_input = gr.Textbox( | |
| label="Message", | |
| placeholder="Type your message here...", | |
| scale=4 | |
| ) | |
| send_btn = gr.Button("Send", variant="primary", scale=1) | |
| status_output = gr.Textbox(label="Status", interactive=False) | |
| refresh_models_btn = gr.Button("🔄 Refresh Models", variant="secondary") | |
| async def on_refresh(): | |
| status = await refresh_models() | |
| return status, gr.Dropdown(choices=get_model_list()) | |
| refresh_models_btn.click( | |
| fn=on_refresh, | |
| outputs=[status_output, model_dropdown] | |
| ) | |
| send_btn.click( | |
| fn=chat_with_model, | |
| inputs=[model_dropdown, message_input, chatbot], | |
| outputs=[chatbot, status_output] | |
| ).then( | |
| fn=lambda: "", | |
| outputs=message_input | |
| ) | |
| message_input.submit( | |
| fn=chat_with_model, | |
| inputs=[model_dropdown, message_input, chatbot], | |
| outputs=[chatbot, status_output] | |
| ).then( | |
| fn=lambda: "", | |
| outputs=message_input | |
| ) | |
| with gr.TabItem("🔒 VPN Proxy"): | |
| config = get_config() | |
| gr.Markdown("### TunnelBear VPN Proxy Status") | |
| proxy_status_display = gr.Textbox( | |
| label="Proxy Status", | |
| value=get_proxy_status(), | |
| interactive=False | |
| ) | |
| refresh_proxy_btn = gr.Button("🔄 Refresh Status") | |
| refresh_proxy_btn.click(fn=get_proxy_status, outputs=proxy_status_display) | |
| gr.Markdown("### Proxy Settings") | |
| use_proxy_check = gr.Checkbox( | |
| label="Use VPN Proxy", | |
| value=config.get("use_proxy", True) | |
| ) | |
| proxy_country_input = gr.Textbox( | |
| label="Proxy Country (optional)", | |
| placeholder="US, CA, DE, GB, etc. Leave empty for auto-select", | |
| value=config.get("proxy_country", "") | |
| ) | |
| toggle_proxy_btn = gr.Button("Apply Proxy Settings", variant="primary") | |
| proxy_result = gr.Textbox(label="Result", interactive=False) | |
| toggle_proxy_btn.click( | |
| fn=toggle_proxy, | |
| inputs=[use_proxy_check, proxy_country_input], | |
| outputs=proxy_result | |
| ).then( | |
| fn=get_proxy_status, | |
| outputs=proxy_status_display | |
| ) | |
| gr.Markdown( | |
| """ | |
| ### About the VPN Proxy | |
| This bridge uses TunnelBear's free VPN service to route requests anonymously. | |
| - **SOCKS5 Proxy**: `127.0.0.1:1080` | |
| - **Auto-select**: Chooses closest server automatically | |
| - **Country selection**: You can specify a country code (US, CA, DE, etc.) | |
| The VPN proxy helps avoid rate limits and IP blocks from LMArena. | |
| """ | |
| ) | |
| with gr.TabItem("⚙️ Settings"): | |
| config = get_config() | |
| gr.Markdown("### Authentication") | |
| gr.Markdown( | |
| """ | |
| **Auth Tokens:** Add your `arena-auth-prod-v1` cookie value (one per line). | |
| This is required to use the bridge. Get it from lmarena.ai DevTools > Application > Cookies. | |
| """ | |
| ) | |
| auth_token_input = gr.Textbox( | |
| label="Auth Tokens (arena-auth-prod-v1)", | |
| placeholder="base64-...", | |
| value="\n".join(config.get("auth_tokens", [])), | |
| lines=5 | |
| ) | |
| gr.Markdown( | |
| """ | |
| **Cloudflare Clearance:** Optional but recommended. Get `cf_clearance` cookie from lmarena.ai. | |
| """ | |
| ) | |
| cf_clearance_input = gr.Textbox( | |
| label="Cloudflare Clearance (cf_clearance)", | |
| placeholder="cf_clearance value...", | |
| value=config.get("cf_clearance", "") | |
| ) | |
| gr.Markdown("### Advanced") | |
| user_agent_input = gr.Textbox( | |
| label="User-Agent", | |
| value=config.get("user_agent", DEFAULT_USER_AGENT) | |
| ) | |
| save_btn = gr.Button("💾 Save Configuration", variant="primary") | |
| config_status = gr.Textbox(label="Status", interactive=False) | |
| save_btn.click( | |
| fn=update_config, | |
| inputs=[auth_token_input, cf_clearance_input, user_agent_input, use_proxy_check, proxy_country_input], | |
| outputs=config_status | |
| ) | |
| gr.Markdown("### API Information") | |
| gr.Markdown( | |
| f""" | |
| **API Base URL:** `http://localhost:{PORT}/api/v1` | |
| **OpenAI-Compatible Endpoints:** | |
| - `GET /api/v1/models` - List available models | |
| - `POST /api/v1/chat/completions` - Chat completions | |
| - `GET /api/v1/health` - Health check (includes proxy status) | |
| """ | |
| ) | |
| with gr.TabItem("📊 Stats"): | |
| config = get_config() | |
| stats = config.get("usage_stats", {}) | |
| gr.Markdown("### Usage Statistics") | |
| if stats: | |
| stats_text = "\n".join([f"- **{model}:** {count} requests" for model, count in sorted(stats.items(), key=lambda x: x[1], reverse=True)]) | |
| gr.Markdown(stats_text) | |
| else: | |
| gr.Markdown("No usage statistics yet. Start chatting to see stats!") | |
| refresh_stats_btn = gr.Button("🔄 Refresh Stats") | |
| stats_display = gr.Markdown() | |
| def refresh_stats(): | |
| config = get_config() | |
| stats = config.get("usage_stats", {}) | |
| if stats: | |
| return "\n".join([f"- **{model}:** {count} requests" for model, count in sorted(stats.items(), key=lambda x: x[1], reverse=True)]) | |
| return "No usage statistics yet." | |
| refresh_stats_btn.click(fn=refresh_stats, outputs=stats_display) | |
| return interface | |
| # ============================================================ | |
| # MAIN ENTRY POINT | |
| # ============================================================ | |
| def run_both(): | |
| """Run both FastAPI and Gradio UI cooperatively on the same port""" | |
| interface = create_gradio_interface() | |
| # Safely mounts Gradio to run simultaneously alongside the API endpoints. | |
| app_with_ui = gr.mount_gradio_app(app, interface, path="/") | |
| uvicorn.run( | |
| app_with_ui, | |
| host=HOST, | |
| port=PORT, | |
| log_level="info" | |
| ) | |
| def run_fastapi(): | |
| """Run pure FastAPI API without the UI overlay""" | |
| uvicorn.run( | |
| app, | |
| host=HOST, | |
| port=PORT, | |
| log_level="info" | |
| ) | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser(description="LMArena Bridge with VPN Proxy") | |
| parser.add_argument("--mode", choices=["fastapi", "both"], default="both", | |
| help="Run mode: fastapi (API only) or both (API + Gradio UI). Default is both.") | |
| parser.add_argument("--port", type=int, default=PORT, help="Port to run on") | |
| parser.add_argument("--host", default=HOST, help="Host to bind to") | |
| parser.add_argument("--debug", action="store_true", help="Enable debug mode") | |
| parser.add_argument("--no-proxy", action="store_true", help="Disable VPN proxy") | |
| args = parser.parse_args() | |
| if args.debug: | |
| DEBUG = True | |
| if args.port: | |
| PORT = args.port | |
| if args.host: | |
| HOST = args.host | |
| if args.no_proxy: | |
| config = get_config() | |
| config["use_proxy"] = False | |
| save_config(config) | |
| if args.mode == "fastapi": | |
| run_fastapi() | |
| else: | |
| run_both() | |
| PYTHON_EOF | |
| # Expose port | |
| EXPOSE 7860 | |
| # Run the application | |
| CMD ["python", "/app/app.py"] |