# lmarena-bridge / Dockerfile
# Source: Hugging Face Space "overwrite69/lmarena-bridge", commit 57f5cba.
# (The lines above were scraped page chrome, not valid Dockerfile syntax;
#  they are preserved here as comments.)
FROM python:3.11-slim

# Runtime configuration: no .pyc files, unbuffered logs; the server binds
# 0.0.0.0:7860 (the Hugging Face Spaces convention).
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PORT=7860 \
    HOST=0.0.0.0

# Build tooling needed to compile Python wheels; drop the apt cache in the
# same layer so it never reaches the image.
RUN apt-get update && apt-get install -y --no-install-recommends \
        build-essential \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Install pinned Python dependencies, sorted for diffability.
# huggingface_hub/pydantic are capped to work around Gradio schema bugs;
# the [socks] extras enable SOCKS5 proxy support in httpx/requests.
RUN pip install --no-cache-dir \
    cloudscraper==1.2.71 \
    fastapi==0.109.0 \
    gradio==4.44.0 \
    "httpx[socks]==0.26.0" \
    "huggingface_hub<0.27.0" \
    "pydantic<2.10" \
    "requests[socks]==2.31.0" \
    uvicorn==0.27.0

# Documentation only: the FastAPI/Gradio server listens on this port.
EXPOSE 7860
RUN cat > /app/app.py << 'PYTHON_EOF'
#!/usr/bin/env python3
"""
LMArena Bridge - Single File Version for Hugging Face Spaces
A bridge to interact with LM Arena providing an OpenAI-compatible API.
This version:
- Uses Gradio for the web interface
- Integrates TunnelBear VPN proxy for anonymity
- Allows manual input of auth tokens and cf_clearance cookies
- Uses httpx/cloudscraper for HTTP requests via SOCKS5 proxy
"""
import asyncio
import json
import os
import re
import time
import uuid
import base64
import secrets
import mimetypes
import threading
import socket
import socketserver
import ssl
import struct
import logging
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, List, Any
from contextlib import asynccontextmanager
# FastAPI and web server
import uvicorn
from fastapi import FastAPI, HTTPException, Request, Depends, Form
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse, HTMLResponse, RedirectResponse
# HTTP clients
import httpx
import cloudscraper
import requests
# Gradio for HF Spaces UI
import gradio as gr
# ============================================================
# TUNNELBEAR VPN PROXY (Embedded from proxy.py)
# ============================================================
# TunnelBear account credentials.
# SECURITY FIX: credentials were previously hardcoded in this file (a leaked
# secret committed to source); they are now read from the environment. The
# previously-committed account password must be rotated. With no credentials
# set, proxy startup fails gracefully and the app runs without VPN.
TB_EMAIL = os.environ.get("TB_EMAIL", "")
TB_PASSWORD = os.environ.get("TB_PASSWORD", "")
# API endpoints
DASHBOARD_API = "https://prod-api-dashboard.tunnelbear.com/dashboard/web"
TB_API = "https://api.tunnelbear.com"
PB_API = "https://api.polargrizzly.com"
URL_TOKEN = f"{DASHBOARD_API}/v2/token"
URL_TOKEN_COOKIE = f"{DASHBOARD_API}/v2/tokenCookie"
URL_TB_COOKIE_TOKEN = f"{TB_API}/v2/cookieToken"
URL_PB_AUTH = f"{PB_API}/auth"
URL_PB_USER = f"{PB_API}/user"
URL_PB_VPNS = f"{PB_API}/vpns"
# Client identity reported to the TunnelBear API
APP_VERSION = "3.6.1"
APP_ID = "com.tunnelbear.browser"
# Local SOCKS5 listener defaults
DEFAULT_SOCKS5_HOST = "127.0.0.1"
DEFAULT_SOCKS5_PORT = 1080
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
proxy_log = logging.getLogger("TBProxy")
class TunnelBearClient:
    """TunnelBear API client for authentication and server discovery.

    Auth chain (see authenticate()): dashboard login -> session cookie ->
    cookie token -> PolarGrizzly auth -> user record (vpn_token) -> servers.
    Every step returns a bool; failures are logged (or swallowed) rather than
    raised.
    """
    def __init__(self):
        # Shared requests session with a browser-like UA for all API calls.
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "en-US,en;q=0.5",
            "Content-Type": "application/json",
        })
        # Fresh pseudo-device identifier per client instance.
        self.device_id = f"browser-{uuid.uuid4()}"
        # Tokens collected along the auth chain.
        self.dashboard_token = ""
        self.tb_access_token = ""
        self.pb_auth_token = ""
        self.vpn_token = ""
        self.user_data = {}
        # [{"url": ..., "protocol": "tcp"|"udp"}] populated by get_servers().
        self.servers = []
        self.region_name = ""
        self.country_iso = ""

    def login(self, email: str, password: str) -> bool:
        """Password login against the dashboard API; stores dashboard_token."""
        try:
            resp = self.session.post(
                URL_TOKEN,
                json={
                    "username": email,
                    "password": password,
                    "grant_type": "password",
                    "device": self.device_id,
                },
            )
            if resp.status_code != 200:
                proxy_log.error(f"Login failed (HTTP {resp.status_code})")
                return False
            data = resp.json()
            self.dashboard_token = data.get("access_token", "")
            if not self.dashboard_token:
                return False
            proxy_log.info(f"Logged in. Token: {self.dashboard_token[:30]}...")
            return True
        except Exception as e:
            proxy_log.error(f"Login error: {e}")
            return False

    def get_session_cookie(self) -> bool:
        """Exchange the dashboard token for a session cookie (kept in self.session)."""
        try:
            resp = self.session.post(
                URL_TOKEN_COOKIE,
                json={},
                headers={"Authorization": f"Bearer {self.dashboard_token}"},
            )
            return resp.status_code == 200
        except Exception:
            return False

    def get_cookie_token(self) -> bool:
        """Fetch a TunnelBear API access token using the session cookie."""
        try:
            resp = self.session.post(
                URL_TB_COOKIE_TOKEN,
                headers={
                    "Authorization": f"Bearer {self.dashboard_token}",
                    "device": self.device_id,
                    "tunnelbear-app-id": APP_ID,
                    "tunnelbear-app-version": APP_VERSION,
                    "tunnelbear-platform": "Firefox",
                    "tunnelbear-platform-version": "Firefox",
                },
            )
            if resp.status_code != 200:
                return False
            data = resp.json()
            self.tb_access_token = data.get("access_token", "")
            return bool(self.tb_access_token)
        except Exception:
            return False

    def authenticate_polarbear(self) -> bool:
        """Trade the TB access token for a PolarGrizzly bearer token.

        The token is returned in the response's "authorization" header,
        not the body.
        """
        try:
            resp = self.session.post(
                URL_PB_AUTH,
                json={"partner": "tunnelbear", "token": self.tb_access_token},
            )
            if resp.status_code != 200:
                return False
            auth_header = resp.headers.get("authorization", "")
            if auth_header.startswith("Bearer "):
                self.pb_auth_token = auth_header[7:]
                return bool(self.pb_auth_token)
            return False
        except Exception:
            return False

    def get_user(self) -> bool:
        """Fetch the account record and extract vpn_token (used for CONNECT auth)."""
        try:
            resp = self.session.get(
                URL_PB_USER,
                headers={"Authorization": f"Bearer {self.pb_auth_token}"},
            )
            if resp.status_code != 200:
                return False
            data = resp.json()
            self.user_data = data
            self.vpn_token = data.get("vpn_token", "")
            return bool(self.vpn_token)
        except Exception:
            return False

    def get_servers(self, country: str = None) -> bool:
        """List VPN servers (optionally for one country); populates self.servers."""
        try:
            url = f"{URL_PB_VPNS}/countries/{country}" if country else URL_PB_VPNS
            resp = self.session.get(
                url,
                headers={"Authorization": f"Bearer {self.pb_auth_token}"},
            )
            if resp.status_code != 200:
                return False
            data = resp.json()
            vpns = data.get("vpns", [])
            if not vpns:
                return False
            self.region_name = data.get("region_name", "?")
            self.country_iso = data.get("country_iso", "?")
            self.servers = []
            for v in vpns:
                # Entries without a "url" field are skipped.
                if "url" in v:
                    self.servers.append({
                        "url": v["url"],
                        "protocol": v.get("protocol", "udp"),
                    })
            proxy_log.info(f"Found {len(self.servers)} servers in {self.region_name}")
            return bool(self.servers)
        except Exception:
            return False

    def pick_best_server(self) -> dict:
        """Prefer the first TCP server; else the first server; None when empty."""
        tcp_servers = [s for s in self.servers if s["protocol"] == "tcp"]
        if tcp_servers:
            return tcp_servers[0]
        return self.servers[0] if self.servers else None

    def authenticate(self, email: str, password: str, country: str = None) -> bool:
        """Run the full auth chain; short-circuits on the first failing step."""
        return (
            self.login(email, password) and
            self.get_session_cookie() and
            self.get_cookie_token() and
            self.authenticate_polarbear() and
            self.get_user() and
            self.get_servers(country)
        )
class HTTPSConnectProxy:
    """HTTPS CONNECT proxy through TunnelBear's infrastructure.

    Opens a TLS connection to the VPN server's port 8080 and issues an HTTP
    CONNECT for each target, authenticated with the account vpn_token via
    Basic auth (token used as both username and password).
    """
    def __init__(self, server_url: str, vpn_token: str):
        self.server_host = server_url
        self.server_port = 8080  # TunnelBear CONNECT endpoint port
        self.vpn_token = vpn_token
        # NOTE(review): certificate verification is fully disabled here, so
        # the upstream proxy's identity is never checked — confirm this is an
        # accepted risk for this deployment.
        self._ssl_ctx = ssl.create_default_context()
        self._ssl_ctx.check_hostname = False
        self._ssl_ctx.verify_mode = ssl.CERT_NONE

    def _make_ssl_socket(self) -> ssl.SSLSocket:
        """Open a TLS-wrapped TCP socket to the proxy server (30s connect timeout)."""
        raw_sock = socket.create_connection(
            (self.server_host, self.server_port), timeout=30
        )
        try:
            return self._ssl_ctx.wrap_socket(
                raw_sock, server_hostname=self.server_host
            )
        except Exception:
            # Don't leak the raw socket if the TLS handshake fails.
            raw_sock.close()
            raise

    def _read_http_response(self, sock: ssl.SSLSocket) -> tuple:
        """Read an HTTP response head from sock.

        Returns (status_code, lowercase-header dict, bytes received after the
        head). Raises ConnectionError if the peer closes before "\\r\\n\\r\\n".
        """
        response = b""
        while b"\r\n\r\n" not in response:
            chunk = sock.recv(4096)
            if not chunk:
                raise ConnectionError("Proxy closed connection")
            response += chunk
        header_end = response.index(b"\r\n\r\n")
        header_data = response[:header_end].decode("utf-8", errors="ignore")
        body_remainder = response[header_end + 4:]
        lines = header_data.split("\r\n")
        status_line = lines[0]
        # Status line: "HTTP/1.1 200 ..." -> take the numeric code.
        parts = status_line.split(" ", 2)
        status_code = int(parts[1]) if len(parts) >= 2 else 0
        headers = {}
        for line in lines[1:]:
            if ":" in line:
                key, val = line.split(":", 1)
                headers[key.strip().lower()] = val.strip()
        return status_code, headers, body_remainder

    def connect(self, target_host: str, target_port: int) -> ssl.SSLSocket:
        """CONNECT to target through the proxy; returns the tunneled socket.

        Raises ConnectionError on any non-200 proxy response.
        NOTE(review): bytes the proxy sends after its response head
        (body_remainder) are discarded here — verify the upstream never
        pipelines tunnel data with the CONNECT reply.
        """
        auth_b64 = base64.b64encode(
            f"{self.vpn_token}:{self.vpn_token}".encode()
        ).decode()
        sock = self._make_ssl_socket()
        request = (
            f"CONNECT {target_host}:{target_port} HTTP/1.1\r\n"
            f"Host: {target_host}:{target_port}\r\n"
            f"Proxy-Authorization: Basic {auth_b64}\r\n"
            f"User-Agent: TunnelBear/{APP_VERSION}\r\n"
            f"Proxy-Connection: Keep-Alive\r\n"
            f"\r\n"
        )
        sock.sendall(request.encode())
        status_code, headers, remainder = self._read_http_response(sock)
        if status_code == 200:
            return sock
        sock.close()
        raise ConnectionError(f"CONNECT failed: HTTP {status_code}")
class SOCKS5Handler(socketserver.StreamRequestHandler):
    """SOCKS5 server handler.

    Accepts local SOCKS5 clients and tunnels each CONNECT through the shared
    HTTPSConnectProxy stored on the class attribute `proxy`.
    """
    # Shared upstream proxy; assigned once by start_tunnelbear_proxy().
    proxy = None

    def handle(self):
        """Per-connection entry point; any error just closes the socket."""
        try:
            self._handle_socks5()
        except Exception:
            pass
        finally:
            try:
                self.connection.close()
            except OSError:
                pass

    def _handle_socks5(self):
        """Run the SOCKS5 handshake (RFC 1928) and then relay traffic.

        Supports "no auth" (0x00) and username/password (0x02 — credentials
        are read and discarded, any pair is accepted). Only the CONNECT
        command (0x01) is supported.
        """
        conn = self.connection
        # Greeting: version byte + number of offered auth methods.
        ver_nmethods = self._recv_exact(conn, 2)
        if ver_nmethods[0] != 0x05:
            return
        n_methods = ver_nmethods[1]
        methods = self._recv_exact(conn, n_methods)
        if 0x00 in methods:
            conn.sendall(b"\x05\x00")  # no authentication required
        elif 0x02 in methods:
            conn.sendall(b"\x05\x02")  # username/password subnegotiation
            auth_data = self._recv_exact(conn, 2)  # subneg version + username length
            ulen = auth_data[1]
            self._recv_exact(conn, ulen)           # username (ignored)
            plen = self._recv_exact(conn, 1)[0]
            self._recv_exact(conn, plen)           # password (ignored)
            conn.sendall(b"\x01\x00")              # always report success
        else:
            conn.sendall(b"\x05\xFF")  # no acceptable auth method
            return
        # Request: VER CMD RSV ATYP.
        header = self._recv_exact(conn, 4)
        _, cmd, _, addr_type = header
        if cmd != 0x01:
            self._reply(conn, 0x07)  # command not supported
            return
        if addr_type == 0x01:        # IPv4
            target_host = socket.inet_ntoa(self._recv_exact(conn, 4))
        elif addr_type == 0x03:      # domain name (length-prefixed)
            dlen = self._recv_exact(conn, 1)[0]
            target_host = self._recv_exact(conn, dlen).decode()
        elif addr_type == 0x04:      # IPv6
            target_host = socket.inet_ntop(socket.AF_INET6, self._recv_exact(conn, 16))
        else:
            self._reply(conn, 0x08)  # address type not supported
            return
        target_port = struct.unpack("!H", self._recv_exact(conn, 2))[0]
        proxy_log.debug(f"SOCKS5 -> {target_host}:{target_port}")
        try:
            remote = self.proxy.connect(target_host, target_port)
        except Exception:
            self._reply(conn, 0x05)  # connection refused
            return
        self._reply(conn, 0x00)      # succeeded
        self._relay(conn, remote)

    def _relay(self, a, b):
        """Bidirectionally pump bytes between two sockets until either side closes."""
        a.settimeout(300)
        b.settimeout(300)

        def pump(src, dst):
            # Copy until EOF or error, then half-close dst's write side so the
            # peer sees EOF too.
            try:
                while True:
                    data = src.recv(8192)
                    if not data:
                        break
                    dst.sendall(data)
            except (socket.timeout, OSError):
                pass
            finally:
                try:
                    dst.shutdown(socket.SHUT_WR)
                except OSError:
                    pass

        t1 = threading.Thread(target=pump, args=(a, b), daemon=True)
        t2 = threading.Thread(target=pump, args=(b, a), daemon=True)
        t1.start()
        t2.start()
        t1.join(timeout=300)
        t2.join(timeout=300)

    def _reply(self, conn, code):
        """Send a SOCKS5 reply with the given code and a zeroed IPv4 bind address."""
        conn.sendall(b"\x05" + bytes([code, 0, 1, 0, 0, 0, 0, 0, 0, 0]))

    @staticmethod
    def _recv_exact(sock, n):
        """Read exactly n bytes or raise ConnectionError on premature EOF."""
        buf = bytearray()
        while len(buf) < n:
            chunk = sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError("Connection closed")
            buf.extend(chunk)
        return bytes(buf)
class ThreadedSOCKS5Server(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each SOCKS5 client on its own daemon thread."""
    allow_reuse_address = True  # allow quick restart on the same port
    daemon_threads = True       # open tunnels don't block interpreter exit
# Global proxy state (module-level singletons, mutated by the start/stop
# functions below)
_proxy_client = None                 # TunnelBearClient once authenticated
_proxy_server = None                 # ThreadedSOCKS5Server while running
_proxy_running = False               # True after the listener thread starts
_socks5_port = DEFAULT_SOCKS5_PORT   # local SOCKS5 bind port
_socks5_host = DEFAULT_SOCKS5_HOST   # local SOCKS5 bind host
def start_tunnelbear_proxy(country: str = None) -> bool:
    """Start TunnelBear VPN SOCKS5 proxy.

    Authenticates with TunnelBear, picks a server (TCP preferred), and serves
    a local SOCKS5 listener on a daemon thread. Idempotent: returns True
    immediately when already running; returns False on any failure.

    :param country: optional country filter passed to server discovery.
    """
    global _proxy_client, _proxy_server, _proxy_running, _socks5_port
    if _proxy_running:
        proxy_log.info("Proxy already running")
        return True
    try:
        _proxy_client = TunnelBearClient()
        if not _proxy_client.authenticate(TB_EMAIL, TB_PASSWORD, country):
            proxy_log.error("Failed to authenticate with TunnelBear")
            return False
        best = _proxy_client.pick_best_server()
        if not best:
            proxy_log.error("No servers available")
            return False
        server_url = best["url"]
        server_proto = best["protocol"]
        proxy_log.info(f"Using server: {server_url}:8080 ({server_proto})")
        # All SOCKS5 connections share one upstream CONNECT proxy (class attr).
        proxy = HTTPSConnectProxy(server_url, _proxy_client.vpn_token)
        SOCKS5Handler.proxy = proxy
        _proxy_server = ThreadedSOCKS5Server((_socks5_host, _socks5_port), SOCKS5Handler)
        proxy_thread = threading.Thread(target=_proxy_server.serve_forever, daemon=True)
        proxy_thread.start()
        _proxy_running = True
        proxy_log.info(f"SOCKS5 proxy running at {_socks5_host}:{_socks5_port}")
        return True
    except Exception as e:
        proxy_log.error(f"Failed to start proxy: {e}")
        return False
def stop_tunnelbear_proxy():
    """Shut down the local SOCKS5 listener (if any) and clear the running flag."""
    global _proxy_server, _proxy_running
    server = _proxy_server
    if server:
        try:
            server.shutdown()
        except Exception:
            # Best effort — the server thread may already be gone.
            pass
        _proxy_server = None
    _proxy_running = False
    proxy_log.info("Proxy stopped")
def get_proxy_url() -> Optional[str]:
    """Return the local SOCKS5 proxy URL, or None when the proxy is not running."""
    if not _proxy_running:
        return None
    return f"socks5://{_socks5_host}:{_socks5_port}"
# ============================================================
# CONSTANTS
# ============================================================
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
PORT = int(os.environ.get("PORT", 7860))   # HF Spaces default port
HOST = os.environ.get("HOST", "0.0.0.0")
# reCAPTCHA constants (public site keys, not secrets)
RECAPTCHA_SITEKEY = "6Led_uYrAAAAAKjxDIF58fgFtX3t8loNAK85bW9I"
RECAPTCHA_ACTION = "chat_submit"
RECAPTCHA_V2_SITEKEY = "6Ld7ePYrAAAAAB34ovoFoDau1fqCJ6IyOjFEQaMn"
TURNSTILE_SITEKEY = "0x4AAAAAAA65vWDmG-O_lPtT"
# LMArena URLs
LMARENA_ORIGIN = "https://lmarena.ai"
ARENA_ORIGIN = "https://arena.ai"  # NOTE(review): unused in the visible code — confirm before removing
STREAM_CREATE_EVALUATION_PATH = "/nextjs-api/stream/create-evaluation"
# Timeouts (seconds) and default per-key rate limit (requests/minute)
DEFAULT_REQUEST_TIMEOUT = 120
DEFAULT_RATE_LIMIT_RPM = 60
# File paths (in HF Spaces, /tmp is the writable scratch area; contents do
# not survive a Space restart)
CONFIG_FILE = "/tmp/config.json"
MODELS_FILE = "/tmp/models.json"
# Cookie names
CF_CLEARANCE_COOKIE = "cf_clearance"
ARENA_AUTH_COOKIE = "arena-auth-prod-v1"
# Image support
MAX_IMAGE_SIZE_BYTES = 10 * 1024 * 1024  # 10MB
SUPPORTED_IMAGE_MIME_TYPES = {
    "image/png",
    "image/jpeg",
    "image/gif",
    "image/webp",
    "image/svg+xml",
}
# Default User-Agent sent upstream when none is configured
DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)
# ============================================================
# UTILITY FUNCTIONS
# ============================================================
def debug_print(*args, **kwargs):
    """Forward to print() only when the module-level DEBUG flag is set."""
    if not DEBUG:
        return
    print(*args, **kwargs)
def safe_print(*args, **kwargs):
    """Print without crashing on encoding issues.

    Falls back to a replacement-character rendering when the output stream
    cannot encode the message (e.g. a non-UTF-8 console).
    """
    try:
        print(*args, **kwargs)
    except UnicodeEncodeError:
        # FIX: removed an unused `import sys` that the fallback never used.
        text = " ".join(str(a) for a in args)
        print(text.encode("utf-8", errors="replace").decode("utf-8"))
def uuid7():
    """Generate a UUIDv7 string (48-bit Unix-ms timestamp + random bits)."""
    unix_ms = int(time.time() * 1000)
    # Layout: timestamp(48) | version nibble 0x7 + 12 random | variant 0b10 + 62 random
    value = (
        (unix_ms << 80)
        | ((0x7000 | secrets.randbits(12)) << 64)
        | (0x8000000000000000 | secrets.randbits(62))
    )
    h = f"{value:032x}"
    return "-".join((h[:8], h[8:12], h[12:16], h[16:20], h[20:]))
# ============================================================
# CONFIG MANAGEMENT
# ============================================================
def get_default_config() -> dict:
    """Return a fresh dict of baseline configuration values."""
    defaults = {
        "password": "admin",
        "auth_token": "",
        "auth_tokens": [],
        "cf_clearance": "",
        "api_keys": [],
        "usage_stats": {},
        "user_agent": DEFAULT_USER_AGENT,
        "use_proxy": True,
        "proxy_country": "",
    }
    return defaults
def get_config() -> dict:
    """Load configuration from CONFIG_FILE, filling in any missing defaults.

    Any read/parse failure (missing file, bad JSON, permissions) falls back
    to the default configuration rather than raising.
    """
    try:
        with open(CONFIG_FILE, "r") as f:
            config = json.load(f)
    except Exception:
        # FIX: the previous (FileNotFoundError, JSONDecodeError) clause was
        # fully shadowed by an identical bare-Exception clause; one handler
        # suffices.
        config = get_default_config()
    # FIX: guard against a config file containing valid JSON that is not an
    # object (e.g. a list), which previously crashed on setdefault().
    if not isinstance(config, dict):
        config = get_default_config()
    # Keep the default key set in one place (get_default_config) instead of
    # a second hand-maintained list of setdefault calls.
    for key, value in get_default_config().items():
        config.setdefault(key, value)
    return config
def save_config(config: dict):
    """Persist configuration to CONFIG_FILE; failures are logged, not raised."""
    try:
        serialized = json.dumps(config, indent=4)
        with open(CONFIG_FILE, "w") as f:
            f.write(serialized)
    except Exception as e:
        debug_print(f"Error saving config: {e}")
def get_models() -> list:
    """Read the cached model list from MODELS_FILE; returns [] on any failure."""
    try:
        with open(MODELS_FILE, "r") as f:
            return json.load(f)
    except Exception:
        # Missing file, bad JSON, permissions — all degrade to an empty list.
        return []
def save_models(models: list):
    """Write the model list to MODELS_FILE; failures are logged, not raised."""
    try:
        serialized = json.dumps(models, indent=2)
        with open(MODELS_FILE, "w") as f:
            f.write(serialized)
    except Exception as e:
        debug_print(f"Error saving models: {e}")
# ============================================================
# AUTH TOKEN MANAGEMENT
# ============================================================
def _decode_arena_auth_session_token(token: str) -> Optional[dict]:
"""Decode arena-auth-prod-v1 cookie value when stored as base64-<json>"""
token = str(token or "").strip()
if not token.startswith("base64-"):
return None
b64 = token[len("base64-"):]
if not b64:
return None
try:
b64 += "=" * ((4 - (len(b64) % 4)) % 4)
raw = base64.b64decode(b64.encode("utf-8"))
obj = json.loads(raw.decode("utf-8"))
except Exception:
return None
if isinstance(obj, dict):
return obj
return None
def _decode_jwt_payload(token: str) -> Optional[dict]:
"""Decode JWT payload"""
token = str(token or "").strip()
if token.count(".") < 2:
return None
parts = token.split(".")
if len(parts) < 2:
return None
payload_b64 = str(parts[1] or "")
if not payload_b64:
return None
try:
payload_b64 += "=" * ((4 - (len(payload_b64) % 4)) % 4)
raw = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
obj = json.loads(raw.decode("utf-8"))
except Exception:
return None
if isinstance(obj, dict):
return obj
return None
def get_arena_auth_token_expiry_epoch(token: str) -> Optional[int]:
    """Best-effort expiry (Unix epoch seconds) for an arena-auth token.

    Checks the base64-session cookie's "expires_at" first, then falls back
    to the "exp" claim of the embedded access-token JWT. Returns None when
    no expiry can be determined.
    """
    session = _decode_arena_auth_session_token(token)
    if not isinstance(session, dict):
        return None
    try:
        expires_at = session.get("expires_at")
        if expires_at is not None:
            return int(expires_at)
    except Exception:
        pass
    access = str(session.get("access_token") or "").strip()
    if access:
        payload = _decode_jwt_payload(access)
        if isinstance(payload, dict):
            try:
                exp = payload.get("exp")
                if exp is not None:
                    return int(exp)
            except Exception:
                pass
    return None
def is_arena_auth_token_expired(token: str, *, skew_seconds: int = 30) -> bool:
    """True when the token's detected expiry is within skew_seconds of now.

    Tokens whose expiry cannot be determined are treated as NOT expired.
    """
    exp = get_arena_auth_token_expiry_epoch(token)
    if exp is None:
        return False
    threshold = float(exp) - float(max(0, skew_seconds))
    return time.time() >= threshold
def is_probably_valid_arena_auth_token(token: str) -> bool:
    """Heuristic shape + expiry check for an arena-auth token.

    Accepts either a "base64-<json>" session cookie wrapping a JWT access
    token, or a bare JWT of plausible length; either form must be unexpired.
    """
    value = str(token or "").strip()
    if not value:
        return False
    if value.startswith("base64-"):
        session = _decode_arena_auth_session_token(value)
        if not isinstance(session, dict):
            return False
        access = str(session.get("access_token") or "").strip()
        if access.count(".") < 2:
            return False
        return not is_arena_auth_token_expired(value)
    if value.count(".") >= 2:
        # Bare JWTs shorter than 100 chars are assumed to be junk.
        return len(value) >= 100 and not is_arena_auth_token_expired(value)
    return False
# ============================================================
# HTTP REQUEST HELPERS (WITH PROXY SUPPORT)
# ============================================================
def get_request_headers_with_token(token: str) -> dict:
    """Build LMArena request headers (including Cookie) for the given auth token."""
    config = get_config()
    cf_clearance = str(config.get("cf_clearance") or "").strip()
    user_agent = str(config.get("user_agent") or DEFAULT_USER_AGENT).strip()
    headers = {
        "Content-Type": "text/plain;charset=UTF-8",
        "Origin": LMARENA_ORIGIN,
        "Referer": f"{LMARENA_ORIGIN}/?mode=direct",
        "User-Agent": user_agent,
    }
    cookies = []
    if cf_clearance:
        cookies.append(f"cf_clearance={cf_clearance}")
    if token:
        cookies.append(f"arena-auth-prod-v1={token}")
    if cookies:
        headers["Cookie"] = "; ".join(cookies)
    return headers
def get_next_auth_token(exclude_tokens: set = None) -> str:
    """Select the next auth token via round-robin.

    Expired non-base64 tokens are dropped; tokens in exclude_tokens are
    skipped. Falls back to the legacy single "auth_token" field when the
    list is empty. Raises HTTPException(500) when nothing usable remains.
    """
    config = get_config()
    auth_tokens = config.get("auth_tokens", [])
    if not isinstance(auth_tokens, list):
        auth_tokens = []
    # Normalize and drop empty entries.
    auth_tokens = [str(t or "").strip() for t in auth_tokens if str(t or "").strip()]
    # Drop plainly-expired JWT tokens; base64 session cookies are kept as-is.
    filtered_tokens = []
    for t in auth_tokens:
        if t.startswith("base64-"):
            filtered_tokens.append(t)
            continue
        try:
            if is_arena_auth_token_expired(t):
                continue
        except Exception:
            pass
        filtered_tokens.append(t)
    auth_tokens = filtered_tokens
    # Back-compat: fall back to the single-token field.
    if not auth_tokens:
        single_token = str(config.get("auth_token") or "").strip()
        if single_token and not is_arena_auth_token_expired(single_token):
            auth_tokens = [single_token]
    if not auth_tokens:
        raise HTTPException(status_code=500, detail="No auth tokens configured. Please add your arena-auth-prod-v1 token.")
    # Filter out excluded tokens.
    if exclude_tokens:
        available_tokens = [t for t in auth_tokens if t not in exclude_tokens]
        if not available_tokens:
            raise HTTPException(status_code=500, detail="No more auth tokens available")
    else:
        available_tokens = auth_tokens
    # Simple round-robin.
    global current_token_index
    token = available_tokens[current_token_index % len(available_tokens)]
    # BUG FIX: advance the cursor modulo the list actually being indexed
    # (available_tokens); the old code used len(auth_tokens), which skewed
    # the rotation whenever exclude_tokens removed entries.
    current_token_index = (current_token_index + 1) % len(available_tokens)
    return token

# Round-robin cursor shared across requests.
current_token_index = 0
def get_proxied_http_client() -> httpx.AsyncClient:
    """Create an httpx.AsyncClient, routed through the SOCKS5 proxy when
    the proxy is enabled in config and currently running."""
    config = get_config()
    proxy = None
    if config.get("use_proxy", True):
        proxy_url = get_proxy_url()
        if proxy_url:
            proxy = proxy_url
            debug_print(f"Using proxy: {proxy_url}")
    return httpx.AsyncClient(timeout=DEFAULT_REQUEST_TIMEOUT, proxy=proxy)
# ============================================================
# MODEL MANAGEMENT
# ============================================================
async def fetch_models_from_lmarena() -> list:
    """Fetch the model list by scraping the LMArena front page.

    Runs a synchronous cloudscraper GET on a worker thread (optionally via
    the local SOCKS5 proxy) and extracts the JSON embedded as escaped
    "initialModels" data in the page source. Returns [] on any failure.
    """
    debug_print("Fetching models from LMArena...")
    config = get_config()
    cf_clearance = str(config.get("cf_clearance") or "").strip()
    user_agent = str(config.get("user_agent") or DEFAULT_USER_AGENT).strip()
    use_proxy = config.get("use_proxy", True)
    cookies = {}
    if cf_clearance:
        cookies["cf_clearance"] = cf_clearance
    try:
        proxy_url = get_proxy_url() if use_proxy else None

        def _fetch():
            # Synchronous request body; executed via asyncio.to_thread below.
            scraper = cloudscraper.create_scraper()
            scraper.headers.update({"User-Agent": user_agent})
            proxies = None
            if proxy_url:
                proxies = {"http": proxy_url, "https": proxy_url}
            return scraper.get(
                f"{LMARENA_ORIGIN}/?mode=direct",
                cookies=cookies if cookies else None,
                timeout=30,
                proxies=proxies
            )

        response = await asyncio.to_thread(_fetch)
        if response.status_code != 200:
            debug_print(f"Failed to fetch models: HTTP {response.status_code}")
            return []
        # Extract models from page content. The model JSON is itself escaped
        # inside the page source, hence the \" sequences in the pattern.
        page_body = response.text
        match = re.search(r'{\\"initialModels\\":(\[.*?\]),\\"initialModel[A-Z]Id', page_body, re.DOTALL)
        if match:
            # NOTE(review): unicode_escape can mangle non-ASCII characters in
            # model metadata (it assumes latin-1-ish input) — verify against
            # live page data before relying on non-ASCII names.
            models_json = match.group(1).encode().decode('unicode_escape')
            models = json.loads(models_json)
            debug_print(f"Found {len(models)} models")
            return models
        else:
            debug_print("Could not find models in page")
            return []
    except Exception as e:
        debug_print(f"Error fetching models: {e}")
        return []
# ============================================================
# CHAT COMPLETION (WITH PROXY)
# ============================================================
async def stream_chat_completion(
    model_id: str,
    model_public_name: str,
    conversation_id: str,
    prompt: str,
    auth_token: str,
    experimental_attachments: list = None,
    recaptcha_token: str = "",
    modality: str = "chat"
):
    """Stream a chat completion from LMArena, yielding SSE-formatted strings.

    Async generator: POSTs the create-evaluation request (optionally through
    the SOCKS5 proxy) and translates the upstream stream into OpenAI-style
    "data: {...}" lines, ending with "data: [DONE]". Errors are yielded as
    "data: {"error": ...}" events rather than raised.
    """
    config = get_config()
    user_agent = str(config.get("user_agent") or DEFAULT_USER_AGENT).strip()
    cf_clearance = str(config.get("cf_clearance") or "").strip()
    # Build request payload
    payload = {
        "id": conversation_id,
        "modelId": model_id,
        "messages": [
            {
                "role": "user",
                "content": prompt,
                "experimental_attachments": experimental_attachments or []
            }
        ],
        "modality": modality,
        "recaptchaV3Token": recaptcha_token,
    }
    # Build headers
    headers = {
        "Content-Type": "text/plain;charset=UTF-8",
        "Origin": LMARENA_ORIGIN,
        "Referer": f"{LMARENA_ORIGIN}/?mode=direct",
        "User-Agent": user_agent,
    }
    # Add cookies
    cookie_parts = []
    if cf_clearance:
        cookie_parts.append(f"cf_clearance={cf_clearance}")
    if auth_token:
        cookie_parts.append(f"arena-auth-prod-v1={auth_token}")
    if cookie_parts:
        headers["Cookie"] = "; ".join(cookie_parts)
    url = f"{LMARENA_ORIGIN}{STREAM_CREATE_EVALUATION_PATH}"
    debug_print(f"Streaming request to: {url}")
    debug_print(f"Model: {model_public_name}")
    debug_print(f"Conversation ID: {conversation_id}")
    try:
        async with get_proxied_http_client() as client:
            async with client.stream(
                "POST",
                url,
                headers=headers,
                content=json.dumps(payload),
            ) as response:
                debug_print(f"Response status: {response.status_code}")
                if response.status_code != 200:
                    # Surface the (truncated) upstream error body as an SSE event.
                    error_text = await response.aread()
                    error_msg = error_text.decode("utf-8", errors="replace")
                    debug_print(f"Error response: {error_msg[:500]}")
                    error_data = {"error": f"HTTP {response.status_code}: {error_msg[:200]}"}
                    yield f"data: {json.dumps(error_data)}\n\n"
                    return
                # Stream SSE events
                async for line in response.aiter_lines():
                    if line:
                        if line.startswith("data: "):
                            # Already SSE-shaped: pass through unchanged.
                            yield line + "\n"
                        elif line.startswith("0:"):
                            # "0:"-prefixed lines carry a text delta, usually
                            # a JSON-encoded string after the prefix.
                            try:
                                content = json.loads(line[2:])
                                yield f"data: {json.dumps({'choices': [{'delta': {'content': content}}]})}\n\n"
                            except Exception:
                                # Not valid JSON: fall back to stripping quotes.
                                content = line[2:].strip('"')
                                yield f"data: {json.dumps({'choices': [{'delta': {'content': content}}]})}\n\n"
                        else:
                            yield line + "\n"
                yield "data: [DONE]\n\n"
    except httpx.TimeoutException:
        debug_print("Request timed out")
        yield f"data: {json.dumps({'error': 'Request timed out'})}\n\n"
    except Exception as e:
        debug_print(f"Stream error: {e}")
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
async def chat_completion_non_stream(
    model_id: str,
    model_public_name: str,
    conversation_id: str,
    prompt: str,
    auth_token: str,
    experimental_attachments: list = None,
    recaptcha_token: str = "",
    modality: str = "chat"
) -> dict:
    """Drain stream_chat_completion and package an OpenAI-style
    chat.completion response, or {"error": ...} when the stream reports one.
    """
    pieces = []
    error_msg = None
    async for chunk in stream_chat_completion(
        model_id, model_public_name, conversation_id, prompt,
        auth_token, experimental_attachments, recaptcha_token, modality
    ):
        # Only "data: {json}" events matter; skip [DONE] and raw pass-throughs.
        if not chunk.startswith("data: ") or chunk.startswith("data: [DONE]"):
            continue
        try:
            data = json.loads(chunk[6:].strip())
        except json.JSONDecodeError:
            continue
        if "error" in data:
            error_msg = data["error"]
            break
        if "choices" in data:
            for choice in data["choices"]:
                text = choice.get("delta", {}).get("content", "")
                if text:
                    pieces.append(text)
    if error_msg:
        return {"error": error_msg}
    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model_public_name,
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": "".join(pieces)
                },
                "finish_reason": "stop"
            }
        ]
    }
# ============================================================
# IMAGE HANDLING
# ============================================================
async def upload_image_to_lmarena(image_data: bytes, mime_type: str, filename: str) -> Optional[tuple]:
    """Upload an image to LMArena R2 storage.

    Not implemented in this single-file build: always returns None so
    callers degrade to text-only requests.
    """
    # FIX: the old message was an f-string with no placeholders containing a
    # literal "(unknown)"; report the actual filename instead.
    debug_print(f"Image upload not supported in this version: {filename}")
    return None
def _coerce_message_content_to_text(content) -> str:
"""Coerce message content to plain text"""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for part in content:
if isinstance(part, dict):
if part.get("type") == "text":
parts.append(str(part.get("text", "")))
elif "text" in part:
parts.append(str(part.get("text", "")))
elif isinstance(part, str):
parts.append(part)
return "\n".join(parts).strip()
return str(content)
# ============================================================
# FASTAPI APPLICATION
# ============================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler.

    Startup: optionally starts the TunnelBear SOCKS5 proxy (config
    "use_proxy"), then best-effort fetches and caches the LMArena model
    list. Shutdown: stops the proxy. All failures are non-fatal.
    """
    global _proxy_running
    debug_print("Starting LMArena Bridge...")
    # Start TunnelBear proxy (failure is non-fatal; continue without VPN).
    config = get_config()
    if config.get("use_proxy", True):
        country = config.get("proxy_country", "").strip() or None
        if start_tunnelbear_proxy(country):
            debug_print("TunnelBear VPN proxy started successfully")
        else:
            debug_print("Warning: Failed to start TunnelBear proxy, continuing without VPN")
    # Try to fetch models on startup; a failure just leaves the cache empty.
    try:
        models = await fetch_models_from_lmarena()
        if models:
            save_models(models)
            debug_print(f"Saved {len(models)} models")
    except Exception as e:
        debug_print(f"Could not fetch models on startup: {e}")
    yield
    # Cleanup on shutdown.
    stop_tunnelbear_proxy()
# FastAPI application; proxy/model startup handled by the lifespan above.
app = FastAPI(
    title="LMArena Bridge",
    description="OpenAI-compatible API for LM Arena with VPN proxy support",
    version="1.0.0",
    lifespan=lifespan
)
# CORS middleware
# NOTE(review): the regex matches ANY http(s) origin while
# allow_credentials=True, i.e. credentialed CORS for every origin. Confirm
# this is intentional before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origin_regex=r"https?://.*",
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# In-memory usage stats (reset on process restart)
model_usage_stats = defaultdict(int)  # model public name -> request count
api_key_usage = defaultdict(list)     # api key -> request timestamps (sliding window)
# API Key dependency
async def rate_limit_api_key(request: Request):
    """FastAPI dependency: authenticate the API key and enforce its per-key
    requests-per-minute limit.

    Returns the matched key record (dict with "key"/"name"/"rpm"), or a
    synthetic anonymous record when no keys are configured. Raises 401 for
    an unknown key and 429 when the rate limit is exceeded.
    """
    config = get_config()
    api_keys = config.get("api_keys", [])
    # If no API keys configured, allow anonymous access
    if not api_keys:
        return {"key": "anonymous", "name": "Anonymous", "rpm": 9999}
    auth_header = request.headers.get("Authorization", "")
    api_key_str = None
    if auth_header.startswith("Bearer "):
        api_key_str = auth_header[7:].strip()
    # If keys configured but none provided, use first key
    # NOTE(review): this silently grants the first configured key to
    # unauthenticated callers, which defeats the point of requiring a key —
    # confirm whether this fallback is intentional.
    if not api_key_str and api_keys:
        api_key_str = api_keys[0]["key"]
    key_data = next((k for k in api_keys if k["key"] == api_key_str), None)
    if not key_data:
        raise HTTPException(status_code=401, detail="Invalid API Key")
    # Rate limiting: sliding 60-second window of request timestamps per key.
    rate_limit = key_data.get("rpm", 60)
    current_time = time.time()
    # Clean up old timestamps
    api_key_usage[api_key_str] = [t for t in api_key_usage[api_key_str] if current_time - t < 60]
    if len(api_key_usage[api_key_str]) >= rate_limit:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    api_key_usage[api_key_str].append(current_time)
    return key_data
# Health check
@app.get("/api/v1/health")
async def health_check():
    """Report service health: token/model/proxy readiness plus a UTC timestamp."""
    models = get_models()
    config = get_config()
    checks = {
        "cf_clearance": bool(config.get("cf_clearance")),
        "auth_tokens": bool(config.get("auth_tokens") or config.get("auth_token")),
        "models_loaded": len(models) > 0,
        "model_count": len(models),
        "vpn_proxy_active": _proxy_running,
    }
    # "healthy" requires at least one auth token and a loaded model list.
    healthy = checks["auth_tokens"] and checks["models_loaded"]
    return {
        "status": "healthy" if healthy else "degraded",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "checks": checks,
    }
# List models
@app.get("/api/v1/models")
async def list_models(api_key: dict = Depends(rate_limit_api_key)):
    """List available models in OpenAI format.

    Keeps only models that have an organization and at least one usable
    output capability (text, search, or image) and a publicName.
    """
    def _usable(m):
        caps = m.get('capabilities', {}).get('outputCapabilities', {})
        return (caps.get('text') or caps.get('search') or caps.get('image')) and m.get('organization')

    data = []
    for model in get_models():
        if not _usable(model):
            continue
        public_name = model.get("publicName")
        if not public_name:
            continue
        data.append({
            "id": public_name,
            "object": "model",
            "created": int(time.time()),
            "owned_by": model.get("organization", "lmarena")
        })
    return {"object": "list", "data": data}
# Chat completions
@app.post("/api/v1/chat/completions")
async def api_chat_completions(request: Request, api_key: dict = Depends(rate_limit_api_key)):
    """OpenAI-compatible chat completions endpoint.

    Expects a JSON body with "model" (an LMArena publicName), "messages"
    (OpenAI role/content list), and an optional "stream" flag.  All system
    messages are concatenated and prepended to the LAST user message; earlier
    conversation turns are NOT forwarded upstream (each request starts a
    fresh conversation — note the empty history list passed below).

    Raises:
        HTTPException: 400 on malformed body or missing fields, 404 on
            unknown model, 500 when no auth token is configured or the
            upstream call reports an error.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON in request body")
    model_public_name = body.get("model")
    messages = body.get("messages", [])
    stream = body.get("stream", False)
    if not model_public_name:
        raise HTTPException(status_code=400, detail="Missing 'model' in request body")
    if not messages:
        raise HTTPException(status_code=400, detail="Missing 'messages' in request body")
    # Find model: resolve the public name to LMArena's internal model id.
    models = get_models()
    model_id = None
    model_capabilities = {}
    for m in models:
        if m.get("publicName") == model_public_name:
            model_id = m.get("id")
            model_capabilities = m.get("capabilities", {})
            break
    if not model_id:
        raise HTTPException(status_code=404, detail=f"Model '{model_public_name}' not found")
    # Determine modality from advertised output capabilities;
    # image takes precedence over search, falling back to plain chat.
    if model_capabilities.get("outputCapabilities", {}).get("image"):
        modality = "image"
    elif model_capabilities.get("outputCapabilities", {}).get("search"):
        modality = "search"
    else:
        modality = "chat"
    # Update usage stats and persist them via the shared config store.
    model_usage_stats[model_public_name] += 1
    config = get_config()
    config["usage_stats"] = dict(model_usage_stats)
    save_config(config)
    # Extract system prompt: join every system message with blank lines.
    system_prompt = ""
    system_messages = [m for m in messages if m.get("role") == "system"]
    if system_messages:
        system_prompt = "\n\n".join([
            _coerce_message_content_to_text(m.get("content", ""))
            for m in system_messages
        ])
    # Get last user message; prior turns are intentionally dropped.
    last_message = messages[-1]
    prompt = _coerce_message_content_to_text(last_message.get("content", ""))
    if system_prompt:
        prompt = f"{system_prompt}\n\n{prompt}"
    # Generate a fresh conversation ID per request (no upstream reuse).
    conversation_id = str(uuid.uuid4())
    # Get auth token (get_next_auth_token raises HTTPException when none
    # are configured — translated here into a clearer 500 for the caller).
    try:
        auth_token = get_next_auth_token()
    except HTTPException:
        raise HTTPException(status_code=500, detail="No auth tokens configured. Please add your arena-auth-prod-v1 token in the Settings tab.")
    if stream:
        # Return streaming response as server-sent events.
        return StreamingResponse(
            stream_chat_completion(
                model_id, model_public_name, conversation_id, prompt,
                auth_token, [], "", modality
            ),
            media_type="text/event-stream"
        )
    else:
        # Return non-streaming response; upstream errors become HTTP 500s.
        result = await chat_completion_non_stream(
            model_id, model_public_name, conversation_id, prompt,
            auth_token, [], "", modality
        )
        if "error" in result:
            raise HTTPException(status_code=500, detail=result["error"])
        return result
# ============================================================
# GRADIO INTERFACE
# ============================================================
def get_model_list():
    """Return the sorted public names of chat-capable (text/search) models."""
    names = []
    for entry in get_models():
        caps = entry.get('capabilities', {}).get('outputCapabilities', {})
        # Image-only models are excluded from the chat dropdown.
        if not (caps.get('text') or caps.get('search')):
            continue
        if entry.get('organization') and entry.get('publicName'):
            names.append(entry.get('publicName'))
    return sorted(names)
def update_config(auth_token, cf_clearance, user_agent, use_proxy, proxy_country):
    """Persist UI settings into the shared config store.

    Args:
        auth_token: newline-separated arena-auth-prod-v1 tokens; blank lines
            are dropped.  An empty value leaves stored tokens untouched.
        cf_clearance: optional Cloudflare clearance cookie value.
        user_agent: optional User-Agent override.
        use_proxy: whether requests should go through the VPN proxy.
        proxy_country: optional country code for the proxy.

    Returns:
        A short status string for display in the UI.
    """
    config = get_config()
    # Gradio may deliver None for cleared textboxes; normalize before
    # calling .strip() so a cleared field cannot raise AttributeError.
    auth_token = (auth_token or "").strip()
    if auth_token:
        config["auth_tokens"] = [t.strip() for t in auth_token.split("\n") if t.strip()]
    cf_clearance = (cf_clearance or "").strip()
    if cf_clearance:
        config["cf_clearance"] = cf_clearance
    user_agent = (user_agent or "").strip()
    if user_agent:
        config["user_agent"] = user_agent
    config["use_proxy"] = use_proxy
    config["proxy_country"] = (proxy_country or "").strip()
    save_config(config)
    return "Configuration saved!"
def toggle_proxy(use_proxy, proxy_country):
    """Persist the proxy preference, then start or stop the VPN proxy."""
    cfg = get_config()
    country = proxy_country.strip()
    cfg["use_proxy"] = use_proxy
    cfg["proxy_country"] = country
    save_config(cfg)
    if not use_proxy:
        stop_tunnelbear_proxy()
        return "🔴 VPN Proxy stopped."
    # Empty country means auto-select on the proxy side.
    started = start_tunnelbear_proxy(country or None)
    if started:
        return "✅ VPN Proxy started successfully!"
    return "❌ Failed to start VPN Proxy. Check logs."
def get_proxy_status():
    """Render a one-line human-readable status string for the VPN proxy."""
    # Reading module globals needs no `global` declaration.
    if not (_proxy_running and _proxy_client):
        return "🔴 Inactive"
    return (
        f"🟢 Active - Region: {_proxy_client.region_name} ({_proxy_client.country_iso})"
        f"\nSOCKS5: {_socks5_host}:{_socks5_port}"
    )
async def refresh_models():
    """Fetch the model catalog from LMArena and cache it locally."""
    try:
        fetched = await fetch_models_from_lmarena()
        if not fetched:
            return "No models found. Make sure cf_clearance cookie is valid."
        save_models(fetched)
        return f"Successfully loaded {len(fetched)} models!"
    except Exception as e:
        # Surface the failure as a UI message rather than crashing the app.
        return f"Error refreshing models: {str(e)}"
async def chat_with_model(model_name, message, history):
    """Handle one Gradio chat turn.

    Note: only the latest user message is sent upstream; prior history is
    displayed in the UI but never forwarded (each turn opens a brand-new
    upstream conversation).  The previous implementation built an
    OpenAI-style `messages` list from the history that was never used —
    that dead code has been removed.

    Args:
        model_name: publicName of the selected model.
        message: the user's new message text.
        history: list of (user, assistant) tuples from gr.Chatbot.

    Returns:
        (updated_history, status_string) — status is "" on success.
    """
    if not model_name:
        return history, "Please select a model first."
    if not message.strip():
        return history, "Please enter a message."
    config = get_config()
    if not config.get("auth_tokens") and not config.get("auth_token"):
        return history, "No auth tokens configured. Please add your arena-auth-prod-v1 token in Settings."
    try:
        # Resolve the selected public name to LMArena's internal model id.
        model_id = None
        model_capabilities = {}
        for m in get_models():
            if m.get("publicName") == model_name:
                model_id = m.get("id")
                model_capabilities = m.get("capabilities", {})
                break
        if not model_id:
            return history, f"Model '{model_name}' not found."
        # Determine modality from advertised output capabilities.
        if model_capabilities.get("outputCapabilities", {}).get("image"):
            modality = "image"
        elif model_capabilities.get("outputCapabilities", {}).get("search"):
            modality = "search"
        else:
            modality = "chat"
        # Get auth token (raises if none configured; caught below).
        auth_token = get_next_auth_token()
        # Run chat completion with a fresh conversation ID and empty history.
        result = await chat_completion_non_stream(
            model_id, model_name, str(uuid.uuid4()), message,
            auth_token, [], "", modality
        )
        if "error" in result:
            return history, f"Error: {result['error']}"
        # Extract the assistant reply from the OpenAI-style payload.
        response = ""
        if "choices" in result and result["choices"]:
            response = result["choices"][0].get("message", {}).get("content", "")
        # Update history shown in the Chatbot widget.
        history.append((message, response))
        return history, ""
    except Exception as e:
        return history, f"Error: {str(e)}"
def create_gradio_interface():
    """Create Gradio interface.

    Builds a four-tab Blocks app (Chat, VPN Proxy, Settings, Stats).
    NOTE(review): the Settings save button wires in `use_proxy_check` and
    `proxy_country_input` from the VPN Proxy tab — components defined in
    one tab may be used as event inputs from another within the same
    Blocks context.
    """
    with gr.Blocks(title="LMArena Bridge", theme=gr.themes.Soft()) as interface:
        gr.Markdown(
            """
            # 🚀 LMArena Bridge with VPN Proxy
            A bridge to interact with LM Arena models via an OpenAI-compatible API.
            **Now with TunnelBear VPN proxy for anonymous access!**
            **Quick Start:**
            1. VPN Proxy starts automatically (check Proxy tab for status)
            2. Go to **Settings** tab and add your `arena-auth-prod-v1` token
            3. Click **Refresh Models** to load available models
            4. Start chatting in the **Chat** tab!
            """
        )
        with gr.Tabs():
            # --- Chat tab: model picker, chatbot widget, and send wiring ---
            with gr.TabItem("💬 Chat"):
                with gr.Row():
                    with gr.Column(scale=3):
                        model_dropdown = gr.Dropdown(
                            label="Model",
                            choices=get_model_list(),
                            value=None,
                            interactive=True
                        )
                        chatbot = gr.Chatbot(
                            label="Conversation",
                            height=400,
                            show_copy_button=True
                        )
                        with gr.Row():
                            message_input = gr.Textbox(
                                label="Message",
                                placeholder="Type your message here...",
                                scale=4
                            )
                            send_btn = gr.Button("Send", variant="primary", scale=1)
                        status_output = gr.Textbox(label="Status", interactive=False)
                        refresh_models_btn = gr.Button("🔄 Refresh Models", variant="secondary")
                async def on_refresh():
                    # Re-fetch the model cache, then repopulate the dropdown.
                    status = await refresh_models()
                    return status, gr.Dropdown(choices=get_model_list())
                refresh_models_btn.click(
                    fn=on_refresh,
                    outputs=[status_output, model_dropdown]
                )
                # Send via button click; the chained .then clears the input box.
                send_btn.click(
                    fn=chat_with_model,
                    inputs=[model_dropdown, message_input, chatbot],
                    outputs=[chatbot, status_output]
                ).then(
                    fn=lambda: "",
                    outputs=message_input
                )
                # Same wiring for pressing Enter in the message textbox.
                message_input.submit(
                    fn=chat_with_model,
                    inputs=[model_dropdown, message_input, chatbot],
                    outputs=[chatbot, status_output]
                ).then(
                    fn=lambda: "",
                    outputs=message_input
                )
            # --- VPN Proxy tab: status display and proxy toggling ---
            with gr.TabItem("🔒 VPN Proxy"):
                config = get_config()
                gr.Markdown("### TunnelBear VPN Proxy Status")
                proxy_status_display = gr.Textbox(
                    label="Proxy Status",
                    value=get_proxy_status(),
                    interactive=False
                )
                refresh_proxy_btn = gr.Button("🔄 Refresh Status")
                refresh_proxy_btn.click(fn=get_proxy_status, outputs=proxy_status_display)
                gr.Markdown("### Proxy Settings")
                use_proxy_check = gr.Checkbox(
                    label="Use VPN Proxy",
                    value=config.get("use_proxy", True)
                )
                proxy_country_input = gr.Textbox(
                    label="Proxy Country (optional)",
                    placeholder="US, CA, DE, GB, etc. Leave empty for auto-select",
                    value=config.get("proxy_country", "")
                )
                toggle_proxy_btn = gr.Button("Apply Proxy Settings", variant="primary")
                proxy_result = gr.Textbox(label="Result", interactive=False)
                # Toggle the proxy, then refresh the status box afterwards.
                toggle_proxy_btn.click(
                    fn=toggle_proxy,
                    inputs=[use_proxy_check, proxy_country_input],
                    outputs=proxy_result
                ).then(
                    fn=get_proxy_status,
                    outputs=proxy_status_display
                )
                gr.Markdown(
                    """
                    ### About the VPN Proxy
                    This bridge uses TunnelBear's free VPN service to route requests anonymously.
                    - **SOCKS5 Proxy**: `127.0.0.1:1080`
                    - **Auto-select**: Chooses closest server automatically
                    - **Country selection**: You can specify a country code (US, CA, DE, etc.)
                    The VPN proxy helps avoid rate limits and IP blocks from LMArena.
                    """
                )
            # --- Settings tab: auth tokens, cookies, and User-Agent ---
            with gr.TabItem("⚙️ Settings"):
                config = get_config()
                gr.Markdown("### Authentication")
                gr.Markdown(
                    """
                    **Auth Tokens:** Add your `arena-auth-prod-v1` cookie value (one per line).
                    This is required to use the bridge. Get it from lmarena.ai DevTools > Application > Cookies.
                    """
                )
                auth_token_input = gr.Textbox(
                    label="Auth Tokens (arena-auth-prod-v1)",
                    placeholder="base64-...",
                    value="\n".join(config.get("auth_tokens", [])),
                    lines=5
                )
                gr.Markdown(
                    """
                    **Cloudflare Clearance:** Optional but recommended. Get `cf_clearance` cookie from lmarena.ai.
                    """
                )
                cf_clearance_input = gr.Textbox(
                    label="Cloudflare Clearance (cf_clearance)",
                    placeholder="cf_clearance value...",
                    value=config.get("cf_clearance", "")
                )
                gr.Markdown("### Advanced")
                user_agent_input = gr.Textbox(
                    label="User-Agent",
                    value=config.get("user_agent", DEFAULT_USER_AGENT)
                )
                save_btn = gr.Button("💾 Save Configuration", variant="primary")
                config_status = gr.Textbox(label="Status", interactive=False)
                # NOTE: inputs include proxy components from the VPN Proxy tab.
                save_btn.click(
                    fn=update_config,
                    inputs=[auth_token_input, cf_clearance_input, user_agent_input, use_proxy_check, proxy_country_input],
                    outputs=config_status
                )
                gr.Markdown("### API Information")
                gr.Markdown(
                    f"""
                    **API Base URL:** `http://localhost:{PORT}/api/v1`
                    **OpenAI-Compatible Endpoints:**
                    - `GET /api/v1/models` - List available models
                    - `POST /api/v1/chat/completions` - Chat completions
                    - `GET /api/v1/health` - Health check (includes proxy status)
                    """
                )
            # --- Stats tab: per-model request counters from the config file ---
            with gr.TabItem("📊 Stats"):
                config = get_config()
                stats = config.get("usage_stats", {})
                gr.Markdown("### Usage Statistics")
                if stats:
                    stats_text = "\n".join([f"- **{model}:** {count} requests" for model, count in sorted(stats.items(), key=lambda x: x[1], reverse=True)])
                    gr.Markdown(stats_text)
                else:
                    gr.Markdown("No usage statistics yet. Start chatting to see stats!")
                refresh_stats_btn = gr.Button("🔄 Refresh Stats")
                stats_display = gr.Markdown()
                def refresh_stats():
                    # Re-read stats from the config store on demand.
                    config = get_config()
                    stats = config.get("usage_stats", {})
                    if stats:
                        return "\n".join([f"- **{model}:** {count} requests" for model, count in sorted(stats.items(), key=lambda x: x[1], reverse=True)])
                    return "No usage statistics yet."
                refresh_stats_btn.click(fn=refresh_stats, outputs=stats_display)
    return interface
# ============================================================
# MAIN ENTRY POINT
# ============================================================
def run_both():
    """Serve the FastAPI API with the Gradio UI mounted on the same port."""
    ui = create_gradio_interface()
    # Mount Gradio onto the existing FastAPI app so UI and API coexist.
    combined = gr.mount_gradio_app(app, ui, path="/")
    uvicorn.run(combined, host=HOST, port=PORT, log_level="info")
def run_fastapi():
    """Serve only the FastAPI API, without mounting the Gradio UI."""
    uvicorn.run(app, host=HOST, port=PORT, log_level="info")
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="LMArena Bridge with VPN Proxy")
    parser.add_argument("--mode", choices=["fastapi", "both"], default="both",
                        help="Run mode: fastapi (API only) or both (API + Gradio UI). Default is both.")
    parser.add_argument("--port", type=int, default=PORT, help="Port to run on")
    parser.add_argument("--host", default=HOST, help="Host to bind to")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    parser.add_argument("--no-proxy", action="store_true", help="Disable VPN proxy")
    args = parser.parse_args()
    # Override module-level settings from CLI flags before launching.
    if args.debug:
        DEBUG = True
    if args.port:
        PORT = args.port
    if args.host:
        HOST = args.host
    if args.no_proxy:
        # Persist the opt-out so the proxy stays off across restarts.
        config = get_config()
        config["use_proxy"] = False
        save_config(config)
    launcher = run_fastapi if args.mode == "fastapi" else run_both
    launcher()
PYTHON_EOF
# Expose port (documentation only; matches ENV PORT=7860 set above)
EXPOSE 7860
# Run the application (exec form so the Python process is PID 1)
CMD ["python", "/app/app.py"]