"""FastAPI server for Hutter Prize (100MB) — Live.

The routes that do real work:

    GET  /api/messages    → JSON: {"items": [{"filename": "...", "content": "..."}], "count": N}
                            One round-trip for the whole message_board folder.
    POST /api/messages    → create a human-authored user message (requires sign-in).
    GET  /api/results     → same JSON shape, for the results folder.
    GET  /api/agents      → same JSON shape, for the agents folder.
    GET  /api/leaderboard → text/markdown: the contents of LEADERBOARD.md

Supporting routes: /api/health, /api/me, and the Hugging Face OAuth flow
(/login, /auth/callback, /logout). A small static mount serves the SPA from
`./static/`.

Two operating modes, picked from environment variables:

  • Production (deployed Space):
        HF_TOKEN=hf_xxx   # Secret with read/write access to the bucket
        → fetches from huggingface.co with Authorization: Bearer

  • Local development:
        LOCAL_BUCKET_DIR=/path/to/hutter-prize-collab
        → reads directly from disk, no network, no auth for reads

When neither is set, the bucket-backed API endpoints return 401 with a
helpful message.
"""
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| import os |
| import re |
| import secrets |
| import time |
| from contextlib import asynccontextmanager |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any |
| from urllib.parse import urlencode |
| from uuid import uuid4 |
|
|
| import httpx |
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.responses import RedirectResponse, Response |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel, Field |
| from starlette.middleware.sessions import SessionMiddleware |
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("hutter-prize-live")


# ---------------------------------------------------------------------------
# Bucket layout: the Hub bucket and the folders (prefixes) the API exposes.
# ---------------------------------------------------------------------------
BUCKET = os.environ.get("BUCKET", "ml-intern-explorers/hutter-prize-collab")
PREFIX = os.environ.get("PREFIX", "message_board")
RESULTS_PREFIX = os.environ.get("RESULTS_PREFIX", "results")
AGENTS_PREFIX = os.environ.get("AGENTS_PREFIX", "agents")
HUB = "https://huggingface.co"


# ---------------------------------------------------------------------------
# Operating mode: LOCAL_BUCKET_DIR (read from disk) takes precedence over
# HF_TOKEN (read from the Hub); with neither, the bucket endpoints return 401.
# ---------------------------------------------------------------------------
LOCAL_BUCKET_DIR = os.environ.get("LOCAL_BUCKET_DIR")
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")
# Seconds; used as the httpx timeout for every Hub request this server makes.
HUB_FETCH_TIMEOUT = float(os.environ.get("HUB_FETCH_TIMEOUT", "30.0"))


# ---------------------------------------------------------------------------
# Hugging Face OAuth + browser sessions. The client id/secret are expected to
# be supplied by the Space when `hf_oauth: true` is set in its README; without
# them /login answers 503, so nobody can sign in (and therefore post).
# ---------------------------------------------------------------------------
OAUTH_CLIENT_ID = os.environ.get("OAUTH_CLIENT_ID")
OAUTH_CLIENT_SECRET = os.environ.get("OAUTH_CLIENT_SECRET")
OAUTH_SCOPES = os.environ.get("OAUTH_SCOPES", "openid profile write-repos")
# Only members of this Hub org may sign in. Configurable like BUCKET so a
# deployment pointed at another org's bucket can require that org instead.
# An empty value disables the membership check (the OAuth callback only
# enforces it when this is truthy).
OAUTH_REQUIRED_ORG = os.environ.get("OAUTH_REQUIRED_ORG", "ml-intern-explorers")
# Key that signs the session cookie. Falls back to the OAuth client secret,
# then to a random per-process key; with that last fallback sessions are lost
# on restart and are not shared between worker processes.
SESSION_SECRET = (
    os.environ.get("SESSION_SECRET")
    or os.environ.get("OAUTH_CLIENT_SECRET")
    or secrets.token_hex(32)
)
# Upper bound, in characters, on a (stripped) human message body.
MAX_USER_MESSAGE_CHARS = int(os.environ.get("MAX_USER_MESSAGE_CHARS", "4000"))
# Accepted usernames: 1-32 chars from [A-Za-z0-9_.-], starting alphanumeric.
# The handle ends up inside message filenames, so "/" must never be allowed.
HANDLE_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]{0,31}$")
# A quoted-message reference: a bare "*.md" filename with no directory part.
REF_FILENAME_RE = re.compile(r"^[A-Za-z0-9_.-]+\.md$")
|
|
|
|
class MessagePost(BaseModel):
    """JSON body accepted by ``POST /api/messages``.

    Only types are enforced here; stripping, length limits and reference
    validation are done by ``_normalize_human_post`` / ``_normalize_refs``.
    """

    # Raw message text as typed by the user.
    body: str = ""
    # Filenames of quoted messages (the handler accepts at most one).
    refs: list[str] = Field(default_factory=list)
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the shared Hub HTTP client (and cache warm-up) for the app's lifetime.

    On startup:
      * creates ``app.state.client``, an ``httpx.AsyncClient`` that sends
        ``Authorization: Bearer HF_TOKEN`` by default when a token is set;
      * logs which operating mode is active;
      * in Hub mode, starts a background task that pre-fills the listing cache
        for PREFIX, RESULTS_PREFIX and AGENTS_PREFIX, logging each prefix that
        fails instead of claiming success.

    On shutdown the warm-up task is cancelled if still running (and awaited)
    before the client is closed, so it never uses a closed client.
    """
    headers: dict[str, str] = {}
    if HF_TOKEN:
        headers["Authorization"] = f"Bearer {HF_TOKEN}"
    # One pooled client shared by all request handlers; redirects are followed
    # so resolve/ downloads work even if the Hub redirects them.
    app.state.client = httpx.AsyncClient(
        headers=headers,
        timeout=httpx.Timeout(HUB_FETCH_TIMEOUT),
        follow_redirects=True,
        limits=httpx.Limits(max_connections=200, max_keepalive_connections=50),
    )

    warm_task = None
    if LOCAL_BUCKET_DIR:
        log.info("Local mode — reading from %s", LOCAL_BUCKET_DIR)
    elif HF_TOKEN:
        log.info("Hub mode — fetching from %s with HF_TOKEN", HUB)

        async def _warm_cache() -> None:
            # Fill the cache in the background so the first visitors don't pay
            # for the cold Hub fetches.
            prefixes = (PREFIX, RESULTS_PREFIX, AGENTS_PREFIX)
            outcomes = await asyncio.gather(
                *(_cached_list_md(p) for p in prefixes),
                return_exceptions=True,
            )
            # return_exceptions=True means gather never raises: inspect each
            # outcome instead of relying on an except clause.
            failures = [
                (p, outcome)
                for p, outcome in zip(prefixes, outcomes)
                if isinstance(outcome, BaseException)
            ]
            for p, err in failures:
                log.warning("Cache warm-up failed for %s: %s", p, err)
            if not failures:
                log.info("Cache warm-up complete.")

        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced task may vanish mid-execution.
        warm_task = asyncio.create_task(_warm_cache())
    else:
        log.warning(
            "Neither LOCAL_BUCKET_DIR nor HF_TOKEN is set. /api/* will 401."
        )
    try:
        yield
    finally:
        if warm_task is not None and not warm_task.done():
            warm_task.cancel()
            # gather(..., return_exceptions=True) waits for the cancelled task
            # without re-raising its CancelledError here.
            await asyncio.gather(warm_task, return_exceptions=True)
        await app.state.client.aclose()
|
|
|
|
app = FastAPI(title="Hutter Prize Live", lifespan=lifespan)

# Session cookie policy depends on whether OAuth sign-in is enabled. With
# OAuth on, the cookie is sent cross-site (SameSite=None), presumably because
# the Space is embedded cross-site (e.g. in an iframe) — confirm. Browsers
# only accept SameSite=None together with the Secure flag, which
# https_only=True sets. Without OAuth the default, safer "lax" policy is used.
_cross_site_cookies = bool(OAUTH_CLIENT_ID)
app.add_middleware(
    SessionMiddleware,
    secret_key=SESSION_SECRET,
    session_cookie="hp_session",
    max_age=30 * 24 * 60 * 60,  # 30 days, in seconds
    same_site="none" if _cross_site_cookies else "lax",
    https_only=_cross_site_cookies,
)
|
|
|
|
| |
| |
| |
@app.get("/api/health")
async def health() -> dict[str, Any]:
    """Report liveness plus the effective (non-secret) configuration.

    ``mode`` is "local" when LOCAL_BUCKET_DIR is set, otherwise "hub" when an
    HF token is available, otherwise "unconfigured".
    """
    if LOCAL_BUCKET_DIR:
        mode = "local"
    elif HF_TOKEN:
        mode = "hub"
    else:
        mode = "unconfigured"
    return dict(
        ok=True,
        mode=mode,
        bucket=BUCKET,
        prefix=PREFIX,
        results_prefix=RESULTS_PREFIX,
        agents_prefix=AGENTS_PREFIX,
        oauth=bool(OAUTH_CLIENT_ID),
    )
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
def _redirect_uri(request: Request) -> str:
    """Return the absolute OAuth callback URL as the browser sees this server.

    Behind a reverse proxy the app sees an internal scheme/host, so
    ``X-Forwarded-Proto`` and ``X-Forwarded-Host`` take priority, falling back
    to ``Host`` and then to the request URL itself. A chain of proxies may
    send comma-separated lists ("https, http"); only the first (client-facing)
    entry is used. Empty forwarded values fall through to the next source.
    """

    def _first(value: str | None) -> str:
        # Left-most entry of a possibly comma-separated proxy header.
        return (value or "").split(",", 1)[0].strip()

    scheme = _first(request.headers.get("x-forwarded-proto")) or request.url.scheme
    host = (
        _first(request.headers.get("x-forwarded-host"))
        or request.headers.get("host")
        or request.url.netloc
    )
    return f"{scheme}://{host}/auth/callback"
|
|
|
|
@app.get("/login")
async def login(request: Request):
    """Begin Hugging Face OAuth sign-in by redirecting to the Hub's authorize page.

    Query params:
        next: same-site path to return to after sign-in (default "/").

    Stores a fresh random ``state`` and the sanitized ``next`` path in the
    session for /auth/callback to verify and use. Responds 503 (text/plain)
    when the OAuth client id/secret are not configured.
    """
    if not (OAUTH_CLIENT_ID and OAUTH_CLIENT_SECRET):
        return Response(
            "OAuth is not configured on this server (set hf_oauth: true in the "
            "Space README and redeploy).\n",
            status_code=503,
            media_type="text/plain",
        )
    state = secrets.token_urlsafe(16)
    request.session["oauth_state"] = state
    next_url = request.query_params.get("next", "/")
    # Only same-site paths are allowed. A leading "/" alone is not enough:
    # browsers treat "//evil.example" and "/\evil.example" as protocol-relative
    # URLs, which would make ?next= an open redirect.
    if not next_url.startswith("/") or next_url.startswith(("//", "/\\")):
        next_url = "/"
    request.session["oauth_next"] = next_url
    params = urlencode({
        "response_type": "code",
        "client_id": OAUTH_CLIENT_ID,
        "redirect_uri": _redirect_uri(request),
        "scope": OAUTH_SCOPES,
        "state": state,
    })
    return RedirectResponse(f"{HUB}/oauth/authorize?{params}")
|
|
|
|
@app.get("/auth/callback")
async def oauth_callback(request: Request):
    """Finish the Hugging Face OAuth authorization-code flow started by /login.

    Steps:
      1. reject provider errors and ``state`` mismatches;
      2. exchange ``code`` for an access token at ``{HUB}/oauth/token``;
      3. resolve the user via ``{HUB}/api/whoami-v2``;
      4. require membership of OAUTH_REQUIRED_ORG (when it is set);
      5. store ``user``/``avatar``/``access_token`` in the session and
         redirect to the same-site path saved by /login.

    Every failure redirects to ``/?login_error=<reason>`` and logs a line
    tagged with a short random id so a user report can be matched to logs.
    """
    rid = secrets.token_hex(4)

    def fail(reason: str) -> RedirectResponse:
        # urlencode: ``reason`` may come from the provider and must not be able
        # to inject extra query parameters or a fragment into the redirect.
        return RedirectResponse("/?" + urlencode({"login_error": reason}))

    error = request.query_params.get("error")
    if error:
        log.warning(
            "[oauth %s] provider error=%s desc=%s",
            rid, error, request.query_params.get("error_description", "")[:200],
        )
        return fail(error)

    code = request.query_params.get("code")
    state = request.query_params.get("state")
    session_state = request.session.get("oauth_state")
    if not code or not state or state != session_state:
        # Log which piece was missing and whether any cookie arrived at all,
        # to tell a genuine state mismatch from a session cookie that never
        # came back to the server.
        log.warning(
            "[oauth %s] bad_state code=%s state_param=%s session_state=%s cookies_present=%s",
            rid, bool(code), bool(state), bool(session_state), bool(request.cookies),
        )
        return fail("bad_state")
    if not (OAUTH_CLIENT_ID and OAUTH_CLIENT_SECRET):
        log.warning("[oauth %s] server_unconfigured", rid)
        return fail("server_unconfigured")

    try:
        # A dedicated client on purpose: app.state.client carries the server's
        # HF_TOKEN as a default Authorization header, which should not be sent
        # along with the OAuth token exchange.
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(HUB_FETCH_TIMEOUT), follow_redirects=True
        ) as oauth_client:
            token_resp = await oauth_client.post(
                f"{HUB}/oauth/token",
                data={
                    "grant_type": "authorization_code",
                    "code": code,
                    "redirect_uri": _redirect_uri(request),
                    "client_id": OAUTH_CLIENT_ID,
                    "client_secret": OAUTH_CLIENT_SECRET,
                },
                headers={"Accept": "application/json"},
            )
            if not token_resp.is_success:
                log.warning(
                    "[oauth %s] token_exchange status=%s body=%s",
                    rid, token_resp.status_code, token_resp.text[:300],
                )
                return fail("token_exchange")
            access_token = token_resp.json().get("access_token")
            if not access_token:
                log.warning("[oauth %s] no_token body=%s", rid, token_resp.text[:200])
                return fail("no_token")

            me_resp = await oauth_client.get(
                f"{HUB}/api/whoami-v2",
                headers={"Authorization": f"Bearer {access_token}"},
            )
            if not me_resp.is_success:
                log.warning(
                    "[oauth %s] whoami status=%s body=%s",
                    rid, me_resp.status_code, me_resp.text[:200],
                )
                return fail("whoami")
            me = me_resp.json()
            username = me.get("name") or me.get("preferred_username")
            if not username:
                log.warning("[oauth %s] no_username keys=%s", rid, sorted(me.keys()))
                return fail("no_username")

            # Access control: only members of the required org may sign in.
            # Keep only string names so sorted() below cannot hit None.
            org_names = {
                o["name"]
                for o in (me.get("orgs") or [])
                if isinstance(o, dict) and isinstance(o.get("name"), str)
            }
            if OAUTH_REQUIRED_ORG and OAUTH_REQUIRED_ORG not in org_names:
                log.warning(
                    "[oauth %s] not_in_org user=%s orgs=%s",
                    rid, username, sorted(org_names),
                )
                return fail("not_in_org")

            request.session["user"] = username
            request.session["avatar"] = me.get("avatarUrl") or ""
            # Kept so post_message can write to the bucket with this user's token.
            # NOTE(review): Starlette's SessionMiddleware signs but does not
            # encrypt the cookie, so the token is readable by anyone holding
            # the cookie — confirm that is acceptable.
            request.session["access_token"] = access_token
            request.session.pop("oauth_state", None)
            next_url = request.session.pop("oauth_next", "/")
            log.info("[oauth %s] success user=%s", rid, username)
            # Same-site paths only; "//host" and "/\host" are protocol-relative.
            if not next_url.startswith("/") or next_url.startswith(("//", "/\\")):
                next_url = "/"
            return RedirectResponse(next_url)
    except Exception as e:
        log.warning("[oauth %s] exception %s: %s", rid, type(e).__name__, e)
        return fail("exception")
|
|
|
|
@app.get("/logout")
async def logout(request: Request):
    """Forget everything stored in the session and go back to the home page."""
    session = request.session
    session.clear()
    return RedirectResponse(url="/")
|
|
|
|
@app.get("/api/me")
async def api_me(request: Request) -> dict[str, Any]:
    """Describe the current visitor for the SPA.

    Signed in: ``{"logged_in": True, "user": ..., "avatar": ...}``.
    Anonymous: ``{"logged_in": False, "oauth_configured": <bool>}`` so the UI
    can decide whether to offer a sign-in button.
    """
    session = request.session
    user = session.get("user")
    if user:
        return {
            "logged_in": True,
            "user": user,
            "avatar": session.get("avatar") or "",
        }
    return {"logged_in": False, "oauth_configured": bool(OAUTH_CLIENT_ID)}
|
|
|
|
| |
| |
| |
def _list_md_local(prefix: str) -> list[dict[str, str]]:
    """Read every ``*.md`` file directly inside ``LOCAL_BUCKET_DIR/prefix``.

    README.md (any letter case) is skipped, and results are ordered by path.
    A missing folder yields ``[]``.

    Files that cannot be read, or are not valid UTF-8, are logged and skipped
    so one bad file cannot turn the whole listing into a 500. (Previously a
    ``UnicodeDecodeError`` escaped, and ``OSError`` was dropped silently.)

    Returns:
        ``[{"filename": <basename>, "content": <text>}, ...]``
    """
    folder = Path(LOCAL_BUCKET_DIR) / prefix
    if not folder.is_dir():
        return []
    items: list[dict[str, str]] = []
    for path in sorted(folder.glob("*.md")):
        if path.name.lower() == "readme.md":
            continue
        try:
            text = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            log.warning("Local read %s failed: %s", path, e)
            continue
        items.append({"filename": path.name, "content": text})
    return items
|
|
|
|
async def _list_md_hub(prefix: str) -> list[dict[str, str]]:
    """Fetch every ``*.md`` file under ``prefix`` from the Hub bucket.

    Lists the folder through the bucket tree API, then downloads each
    Markdown file (except README.md) concurrently via ``app.state.client``.

    Returns:
        ``[{"filename": <basename>, "content": <text>}, ...]`` ordered by
        path, matching the local listing. A missing folder (404) yields
        ``[]``. Individual downloads that fail are logged and skipped rather
        than failing the whole listing.

    Raises:
        HTTPException: 401 when HF_TOKEN is unset or lacks bucket access; the
            Hub's status code when the tree listing fails for another reason.
    """
    if not HF_TOKEN:
        raise HTTPException(401, "Server is not configured: set HF_TOKEN.")
    client: httpx.AsyncClient = app.state.client

    tree_resp = await client.get(f"{HUB}/api/buckets/{BUCKET}/tree/{prefix}")
    if tree_resp.status_code == 404:
        # Folder does not exist (yet): an empty listing, not an error.
        return []
    if tree_resp.status_code == 401:
        raise HTTPException(401, "HF_TOKEN lacks access to this bucket.")
    if not tree_resp.is_success:
        raise HTTPException(tree_resp.status_code, f"Hub tree fetch: {tree_resp.text[:200]}")

    # Skip README.md by exact basename (case-insensitive), like
    # _list_md_local. A plain suffix test would also drop files such as
    # "notes_readme.md".
    paths: list[str] = []
    for entry in tree_resp.json():
        path = entry.get("path", "")
        if entry.get("type") != "file" or not path.endswith(".md"):
            continue
        if path.rsplit("/", 1)[-1].lower() == "readme.md":
            continue
        paths.append(path)
    paths.sort()

    async def fetch_one(p: str) -> dict[str, str] | None:
        # Best effort: a failed download drops one file, not the listing.
        try:
            r = await client.get(f"{HUB}/buckets/{BUCKET}/resolve/{p}")
            if r.status_code != 200:
                log.warning("Fetch %s → %s", p, r.status_code)
                return None
            return {"filename": p.split("/")[-1], "content": r.text}
        except Exception as e:
            log.warning("Fetch %s failed: %s", p, e)
            return None

    # gather preserves input order, so the result stays sorted by path.
    results = await asyncio.gather(*(fetch_one(p) for p in paths))
    return [r for r in results if r is not None]
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Short-lived in-memory cache for Hub listings (local mode is never cached).
# _list_cache maps prefix -> (time.monotonic() at fetch, items); entries older
# than LIST_CACHE_TTL seconds are refetched. _list_locks holds one
# asyncio.Lock per prefix so concurrent misses share a single Hub fetch.
LIST_CACHE_TTL = float(os.environ.get("LIST_CACHE_TTL", "20.0"))
_list_cache: dict[str, tuple[float, list[dict[str, str]]]] = dict()
_list_locks: dict[str, asyncio.Lock] = dict()
|
|
|
|
async def _cached_list_md(prefix: str) -> list[dict[str, str]]:
    """Return the Markdown listing for ``prefix``.

    Local mode is not cached: every call re-reads the disk. The blocking
    reads run in a worker thread so they do not stall the event loop.

    Hub mode serves entries younger than LIST_CACHE_TTL seconds from
    ``_list_cache``. On a miss, a per-prefix lock makes concurrent callers
    wait for one ``_list_md_hub`` fetch instead of each issuing their own.

    The returned list is the object stored in the cache; callers must not
    mutate it.
    """
    if LOCAL_BUCKET_DIR:
        return await asyncio.to_thread(_list_md_local, prefix)

    cached = _list_cache.get(prefix)
    if cached is not None and time.monotonic() - cached[0] < LIST_CACHE_TTL:
        return cached[1]

    lock = _list_locks.get(prefix)
    if lock is None:
        # Created and stored with no await in between, so every caller for
        # this prefix ends up sharing the same lock.
        lock = _list_locks[prefix] = asyncio.Lock()
    async with lock:
        # Re-check: another caller may have refilled the cache while we waited.
        cached = _list_cache.get(prefix)
        if cached is not None and time.monotonic() - cached[0] < LIST_CACHE_TTL:
            return cached[1]
        items = await _list_md_hub(prefix)
        _list_cache[prefix] = (time.monotonic(), items)
        return items
|
|
|
|
def _invalidate_list_cache(prefix: str) -> None:
    """Forget the cached listing for ``prefix`` so the next read refetches it.

    A no-op when nothing is cached for that prefix.
    """
    if prefix in _list_cache:
        del _list_cache[prefix]
|
|
|
|
| |
| |
| |
@app.get("/api/messages")
async def messages() -> dict[str, Any]:
    """List the message board: every ``*.md`` file under PREFIX (README excluded)."""
    board = await _cached_list_md(PREFIX)
    return dict(items=board, count=len(board))
|
|
|
|
@app.get("/api/results")
async def results() -> dict[str, Any]:
    """List every ``*.md`` file under RESULTS_PREFIX (README excluded)."""
    entries = await _cached_list_md(RESULTS_PREFIX)
    return dict(items=entries, count=len(entries))
|
|
|
|
@app.get("/api/agents")
async def agents() -> dict[str, Any]:
    """List every ``*.md`` file under AGENTS_PREFIX (README excluded)."""
    profiles = await _cached_list_md(AGENTS_PREFIX)
    return dict(items=profiles, count=len(profiles))
|
|
|
|
def _normalize_refs(refs: list[str]) -> list[str]:
    """Reduce quoted-message references to bare, validated filenames.

    Blank entries are dropped and any directory part is discarded (only the
    text after the last "/" is kept).

    Raises:
        HTTPException(400): more than one reference remains, or the remaining
            one does not match REF_FILENAME_RE, or it is README.md (any case).
    """
    basenames: list[str] = []
    for raw in refs:
        trimmed = raw.strip()
        if trimmed:
            basenames.append(trimmed.rsplit("/", 1)[-1])
    if len(basenames) > 1:
        raise HTTPException(400, "Only one quoted message is supported.")
    invalid = any(
        REF_FILENAME_RE.fullmatch(name) is None or name.lower() == "readme.md"
        for name in basenames
    )
    if invalid:
        raise HTTPException(400, "Quoted message reference is invalid.")
    return basenames
|
|
|
|
def _normalize_human_post(post: MessagePost, username: str) -> tuple[str, str, list[str]]:
    """Validate a POST body for ``username``; return ``(handle, body, refs)``.

    The handle is the username itself, returned unchanged once it matches
    HANDLE_RE. The body is whitespace-stripped and must be non-empty and at
    most MAX_USER_MESSAGE_CHARS characters. Refs go through _normalize_refs.

    Raises:
        HTTPException(400): on any of the validation failures above.
    """
    if HANDLE_RE.fullmatch(username) is None:
        raise HTTPException(400, "Logged-in username failed handle validation.")
    text = post.body.strip()
    if text == "":
        raise HTTPException(400, "Message body is required.")
    if len(text) > MAX_USER_MESSAGE_CHARS:
        raise HTTPException(
            400,
            f"Message body must be {MAX_USER_MESSAGE_CHARS} characters or fewer.",
        )
    return username, text, _normalize_refs(post.refs)
|
|
|
|
def _format_user_message(username: str, body: str, refs: list[str]) -> tuple[str, str]:
    """Render a human message as ``(filename, markdown_with_frontmatter)``.

    The filename is ``YYYYmmdd-HHMMSS_human-<username>_<8 hex>.md`` (UTC); the
    random suffix keeps two posts in the same second from colliding. The
    YAML-style frontmatter records the author, ``type: user``, a minute-level
    UTC timestamp and, when given, only the first quoted reference.
    """
    stamp = datetime.now(timezone.utc)
    suffix = uuid4().hex[:8]
    filename = f"{stamp:%Y%m%d-%H%M%S}_human-{username}_{suffix}.md"

    header = (
        "---\n"
        f"agent: human:{username}\n"
        "type: user\n"
        f"timestamp: {stamp:%Y-%m-%d %H:%M UTC}\n"
    )
    if refs:
        header += f"refs: {refs[0]}\n"
    content = header + "---\n" + "\n" + body + "\n"
    return filename, content
|
|
|
|
def _write_message_local(filename: str, content: str) -> None:
    """Atomically create ``LOCAL_BUCKET_DIR/PREFIX/filename`` with ``content``.

    The text is first written to a hidden temp file whose name does not end
    in ".md", so the ``*.md`` listing never picks it up. The temp file is then
    renamed into place with ``os.replace``. Concurrent readers therefore see
    either no file or the complete file, never a partial write. The bytes are
    UTF-8 with "\\n" line endings on every platform, the same bytes the Hub
    upload path sends.

    Raises:
        OSError: if the directory cannot be created or the write/rename fails
            (any partial temp file is removed first).
    """
    msg_dir = Path(LOCAL_BUCKET_DIR) / PREFIX
    msg_dir.mkdir(parents=True, exist_ok=True)
    target = msg_dir / filename
    tmp_path = msg_dir / f".{filename}.tmp"
    try:
        tmp_path.write_bytes(content.encode("utf-8"))
        os.replace(tmp_path, target)
    except OSError:
        # Best-effort cleanup; the original error is what the caller needs.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
|
|
|
|
def _write_message_hub(filename: str, content: str, token: str | None = None) -> None:
    """Upload one message file to ``PREFIX/filename`` in the Hub bucket.

    Blocking; post_message runs it in a worker thread. The caller-supplied
    ``token`` (the signed-in user's OAuth token, per post_message) is used
    when present, otherwise the server's HF_TOKEN.

    Raises:
        RuntimeError: huggingface_hub is not installed, or no token exists.
        Exception: whatever ``batch_bucket_files`` raises on upload failure.
    """
    try:
        from huggingface_hub import batch_bucket_files
    except ImportError as exc:
        raise RuntimeError("Install huggingface_hub to enable bucket writes.") from exc

    effective_token = token if token else HF_TOKEN
    if not effective_token:
        raise RuntimeError("No token available for writing to the bucket.")

    remote_path = f"{PREFIX}/{filename}"
    payload = content.encode("utf-8")
    batch_bucket_files(BUCKET, add=[(payload, remote_path)], token=effective_token)
|
|
|
|
@app.post("/api/messages")
async def post_message(post: MessagePost, request: Request) -> dict[str, Any]:
    """Create a message authored by the signed-in Hugging Face user.

    The message is written to ``LOCAL_BUCKET_DIR/PREFIX`` in local mode, or
    uploaded to ``PREFIX/`` in the bucket otherwise. The upload uses the
    user's OAuth token and falls back to HF_TOKEN. The message listing cache
    is then invalidated so the new post shows up on the next fetch.

    Returns:
        ``{"item": {"filename": ..., "content": ...}}`` for the new file.

    Raises:
        HTTPException: 401 when not signed in or when no write token exists;
            415 when the request is not declared as JSON; 400 for invalid
            input; 500 / 502 when the local / Hub write fails.
    """
    username = request.session.get("user")
    if not username:
        raise HTTPException(401, "Not logged in. Sign in with Hugging Face to post.")

    # CSRF guard. With OAuth enabled the session cookie is SameSite=None, so
    # browsers attach it to cross-site requests. A cross-site page cannot send
    # a JSON Content-Type without a CORS preflight (no CORS middleware is set
    # up in this module). It *can* send a body with no Content-Type at all,
    # and FastAPI still parses that as JSON, so the type is checked here.
    media_type = request.headers.get("content-type", "").split(";", 1)[0].strip().lower()
    if media_type != "application/json" and not media_type.endswith("+json"):
        raise HTTPException(415, "Content-Type must be application/json.")

    user_token = request.session.get("access_token")
    handle, body, refs = _normalize_human_post(post, username)
    filename, content = _format_user_message(handle, body, refs)
    if LOCAL_BUCKET_DIR:
        try:
            _write_message_local(filename, content)
        except OSError as e:
            log.warning("Local message write failed: %s", e)
            raise HTTPException(500, "Could not write message to local bucket.") from e
    else:
        # Checked up front so a server with no usable token reports 401
        # rather than a 502 from the write helper's RuntimeError.
        if not (user_token or HF_TOKEN):
            raise HTTPException(401, "Server is not configured: set HF_TOKEN.")
        try:
            # _write_message_hub is blocking; keep it off the event loop.
            await asyncio.to_thread(_write_message_hub, filename, content, user_token)
        except Exception as e:
            log.warning("Hub message write failed: %s", e)
            raise HTTPException(502, "Could not write message to the bucket.") from e

    # Make the new message visible now rather than after LIST_CACHE_TTL.
    _invalidate_list_cache(PREFIX)
    return {"item": {"filename": filename, "content": content}}
|
|
|
|
| |
| |
| |
@app.get("/api/leaderboard")
async def leaderboard() -> Response:
    """Serve ``LEADERBOARD.md`` from the bucket root as ``text/markdown``.

    Local mode reads it from LOCAL_BUCKET_DIR (404 if absent). Hub mode
    downloads it through ``app.state.client``. A 401 from the Hub becomes a
    401 with an access hint; any other failure is passed through with the
    Hub's status code.
    """
    markdown = "text/markdown; charset=utf-8"
    if LOCAL_BUCKET_DIR:
        board = Path(LOCAL_BUCKET_DIR) / "LEADERBOARD.md"
        if board.is_file():
            return Response(content=board.read_text(encoding="utf-8"), media_type=markdown)
        raise HTTPException(404, "LEADERBOARD.md not found in LOCAL_BUCKET_DIR")

    if not HF_TOKEN:
        raise HTTPException(401, "Server is not configured: set HF_TOKEN.")
    client: httpx.AsyncClient = app.state.client
    resp = await client.get(f"{HUB}/buckets/{BUCKET}/resolve/LEADERBOARD.md")
    if resp.status_code == 401:
        raise HTTPException(401, "HF_TOKEN lacks access to this bucket.")
    if not resp.is_success:
        raise HTTPException(resp.status_code, f"Hub returned {resp.status_code}")
    return Response(content=resp.text, media_type=markdown)
|
|
|
|
| |
| |
| |
# Serve the single-page app. Registered last on purpose: Starlette matches
# routes in registration order, so this catch-all "/" mount only handles
# paths that no API route claimed. html=True serves index.html for "/".
_static_dir = Path(__file__).with_name("static")
app.mount("/", StaticFiles(directory=os.fspath(_static_dir), html=True), name="static")
|
|