| """ |
| Queue Server - hosted on Hugging Face Spaces |
| ============================================ |
| Acts as the neutral relay between the public GUI and local mirrors. |
| |
| Run with: |
| pip install fastapi uvicorn |
| uvicorn server:app --host 0.0.0.0 --port 7860 |
| |
| Environment variables: |
| QUEUE_API_KEY - shared secret for authenticating mirrors and GUI clients |
| JOB_TTL - seconds before a pending job is considered timed out (default 300) |
| POLL_INTERVAL - seconds between mirror poll cycles, informational only (default 2) |
| """ |
|
|
| import asyncio |
| import os |
| import time |
| import logging |
| from contextlib import asynccontextmanager |
| from typing import Optional |
| import json |
| from fastapi import Request, Response |
| import asyncio |
| from fastapi.responses import StreamingResponse |
|
|
| from fastapi import FastAPI, HTTPException, Header, BackgroundTasks |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
| |
| _stream_queues: dict[str, asyncio.Queue] = {} |
| _stream_lock = asyncio.Lock() |
|
|
# Lower-case header names that are dropped both when a caller's request
# headers are copied into a job and when a mirror's response headers are
# copied back to the caller.
STRIP_HEADERS = {
    # Message framing / connection-level headers and the queue's own API key.
    "content-length", "content-encoding", "transfer-encoding",
    "connection", "host", "x-api-key",
    # Forwarding / tracing headers (presumably added by intermediaries).
    "x-forwarded-for", "x-forwarded-proto", "x-forwarded-port",
    "x-amzn-trace-id", "x-direct-url", "x-ip-token", "x-request-id",
    "x-real-ip", "x-scheme", "x-envoy-expected-rq-timeout-ms",
}
|
|
|
|
| |
| |
| |
| from models import APIJob, ClaimRequest, CompleteRequest, JobStatus |
|
|
# Module logger; records are emitted under the "queue-server" name, with the
# root logger configured at INFO level.
log = logging.getLogger("queue-server")
logging.basicConfig(level=logging.INFO)
|
|
| |
| |
| |
# In-memory table of every job the server knows about, keyed by job_id.
_jobs: dict[str, APIJob] = dict()

# Guards every read and write of ``_jobs`` and of the jobs stored in it.
_lock = asyncio.Lock()
|
|
# Shared secret expected in the X-API-Key header or as a Bearer token.
# NOTE(review): the "changeme" fallback is a well-known value; set
# QUEUE_API_KEY explicitly in any real deployment.
API_KEY = os.environ.get("QUEUE_API_KEY", "changeme")

# Seconds a job may remain unfinished before it is treated as timed out.
# Read from the JOB_TTL environment variable (the module docstring lists it
# as configurable); the fallback stays at the previously hard-coded 300 so
# behaviour is unchanged when the variable is not set. A non-integer value
# fails fast at import time with ValueError.
JOB_TTL = int(os.environ.get("JOB_TTL", "300"))
|
|
|
|
| |
| |
| |
async def _reaper():
    """Background loop that marks overdue jobs as timed out.

    Every 5 seconds, while holding ``_lock``, each job that is still PENDING
    or CLAIMED and whose age (measured from ``created_at``) exceeds its own
    ``ttl`` is switched to ``JobStatus.TIMEOUT``. The loop never returns on
    its own; it ends only when the task is cancelled.
    """
    unfinished = (JobStatus.PENDING, JobStatus.CLAIMED)
    while True:
        await asyncio.sleep(5)
        checked_at = time.time()
        async with _lock:
            # Iterate over a snapshot rather than the live dict view.
            for entry in tuple(_jobs.values()):
                if entry.status not in unfinished:
                    continue
                elapsed = checked_at - entry.created_at
                if elapsed > entry.ttl:
                    entry.status = JobStatus.TIMEOUT
                    log.info(f"Job {entry.job_id} timed out after {elapsed:.1f}s")
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the job reaper for as long as the application is alive.

    Starts ``_reaper()`` as a background task before the application begins
    serving and stops it again on shutdown.

    Args:
        app: The FastAPI application (unused; required by the lifespan
            protocol).
    """
    task = asyncio.create_task(_reaper())
    try:
        yield
    finally:
        # Cancel even when shutdown is triggered by an exception, and wait for
        # the cancellation to land so the task is not left pending when the
        # event loop closes.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception:
            # The reaper died earlier on its own; surface why instead of
            # letting the error vanish with the task.
            log.exception("Job reaper exited with an error")
|
|
|
|
# The ASGI application. CORS is fully open: any origin, method and header.
app = FastAPI(lifespan=lifespan, title="API Proxy Queue")
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
|
|
|
|
| |
| |
| |
def _check_auth(x_api_key: Optional[str], authorization: Optional[str] = None):
    """Reject the request unless it carries the shared API key.

    The key is accepted either verbatim in ``x_api_key`` or as a
    ``Bearer <key>`` value in ``authorization``.

    Args:
        x_api_key: Value of the ``X-API-Key`` header, if any.
        authorization: Value of the ``Authorization`` header, if any.

    Raises:
        HTTPException: 401 when neither credential equals ``API_KEY``.
    """
    scheme = "Bearer "
    bearer = None
    if authorization and authorization.startswith(scheme):
        bearer = authorization[len(scheme):]

    if x_api_key == API_KEY or bearer == API_KEY:
        return

    # This path used to return as well, which let every request through
    # regardless of the key it presented.
    raise HTTPException(status_code=401, detail="Invalid or missing API key")
| |
|
|
|
|
| |
| |
| |
|
|
@app.post("/jobs", response_model=APIJob, summary="Submit a new API job")
async def submit_job(
    job_in: APIJob,
    x_api_key: Optional[str] = Header(default=None),
):
    """Store a client-supplied job as pending and echo it back.

    The job's ``ttl`` and ``status`` are always overwritten with the server's
    ``JOB_TTL`` and ``JobStatus.PENDING``. A job already stored under the same
    ``job_id`` is replaced.
    """
    _check_auth(x_api_key)

    # Server-side values win over whatever the client sent.
    job_in.status = JobStatus.PENDING
    job_in.ttl = JOB_TTL

    new_id = job_in.job_id
    async with _lock:
        _jobs[new_id] = job_in

    log.info(f"Job {new_id} submitted: {job_in.method} {job_in.endpoint}")
    return job_in
|
|
|
|
@app.get("/jobs/{job_id}", response_model=APIJob, summary="Poll for a job's result")
async def get_job(
    job_id: str,
    x_api_key: Optional[str] = Header(default=None),
):
    """Return the current state of one job.

    Raises:
        HTTPException: 404 when no job is stored under ``job_id``.
    """
    _check_auth(x_api_key)
    async with _lock:
        found = _jobs.get(job_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Job not found")
|
|
|
|
@app.get("/jobs/{job_id}/wait", response_model=APIJob, summary="Long-poll until job completes or times out")
async def wait_for_job(
    job_id: str,
    timeout: float = 25.0,
    x_api_key: Optional[str] = Header(default=None),
):
    """Block until the job reaches a final state or the wait budget runs out.

    The job is re-checked roughly every 0.3 seconds. It is returned as soon as
    its status is COMPLETED, FAILED or TIMEOUT; otherwise it is returned in
    whatever state it is in once ``min(timeout, JOB_TTL)`` seconds have
    passed.

    Args:
        job_id: Identifier of the job to wait for.
        timeout: Maximum number of seconds to wait, capped at ``JOB_TTL``.

    Raises:
        HTTPException: 404 when no job is stored under ``job_id``. This holds
            for every ``timeout`` value, including zero or negative ones,
            which previously fell through to an unguarded dict lookup.
    """
    _check_auth(x_api_key)
    finished = (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.TIMEOUT)
    deadline = time.time() + min(timeout, JOB_TTL)

    while True:
        async with _lock:
            job = _jobs.get(job_id)
        if job is None:
            raise HTTPException(status_code=404, detail="Job not found")
        # Always look the job up at least once, so an expired or non-positive
        # budget still yields the current state (or a 404) rather than a crash.
        if job.status in finished or time.time() >= deadline:
            return job
        await asyncio.sleep(0.3)
|
|
|
|
| |
| |
| |
|
|
@app.post("/mirror/claim", response_model=Optional[APIJob], summary="Mirror claims a pending job it can serve")
async def claim_job(
    claim: ClaimRequest,
    x_api_key: Optional[str] = Header(default=None),
):
    """Hand the calling mirror the first pending job it is able to serve.

    A job qualifies when it is PENDING, is not older than its ``ttl``, is not
    pinned to a different mirror via ``target_mirror``, and its endpoint
    starts with one of the prefixes in ``claim.available_endpoints``. The
    scan and the status change happen under ``_lock``.

    Returns:
        The job, now marked CLAIMED, or ``None`` when nothing qualifies.
    """
    _check_auth(x_api_key)
    now = time.time()

    def can_serve(job) -> bool:
        """Tell whether this mirror may take ``job`` right now."""
        if job.status != JobStatus.PENDING:
            return False
        if now - job.created_at > job.ttl:
            return False
        if job.target_mirror and job.target_mirror != claim.mirror_id:
            return False
        return any(job.endpoint.startswith(prefix) for prefix in claim.available_endpoints)

    async with _lock:
        picked = next((job for job in _jobs.values() if can_serve(job)), None)
        if picked is None:
            return None
        picked.status = JobStatus.CLAIMED
        picked.claimed_at = now
        log.info(f"Job {picked.job_id} claimed by mirror '{claim.mirror_id}'")
        return picked
|
|
|
|
@app.post("/mirror/complete", summary="Mirror posts the result of a completed job")
async def complete_job(
    result: CompleteRequest,
    x_api_key: Optional[str] = Header(default=None),
):
    """Record a mirror's result on a previously claimed job.

    The job becomes FAILED when ``result.error`` is truthy and COMPLETED
    otherwise; the response status code, headers, body and error are copied
    onto it together with a completion timestamp.

    Raises:
        HTTPException: 404 when the job is unknown, 409 when it is not
            currently in the CLAIMED state.
    """
    _check_auth(x_api_key)

    async with _lock:
        job = _jobs.get(result.job_id)
        if not job:
            raise HTTPException(status_code=404, detail="Job not found")
        if job.status != JobStatus.CLAIMED:
            raise HTTPException(status_code=409, detail=f"Job is in state '{job.status}', cannot complete")

        outcome = JobStatus.COMPLETED
        if result.error:
            outcome = JobStatus.FAILED

        job.status = outcome
        job.completed_at = time.time()
        job.error = result.error
        job.response_body = result.response_body
        job.response_headers = result.response_headers
        job.response_status_code = result.response_status_code

    log.info(f"Job {result.job_id} completed by mirror '{result.mirror_id}' → {result.response_status_code}")
    return {"ok": True}
|
|
@app.post("/mirror/poll", response_model=Optional[APIJob], summary="Long-poll: blocks until a job is available")
async def long_poll(
    claim: ClaimRequest,
    timeout: float = 20.0,
    x_api_key: Optional[str] = Header(default=None),
):
    """Keep the request open until a job can be claimed or time runs out.

    The job table is re-scanned roughly every 0.3 seconds for at most
    ``min(timeout, 250.0)`` seconds. A job qualifies when it is PENDING, not
    older than its ``ttl``, not pinned to a different mirror, and its endpoint
    starts with one of ``claim.available_endpoints``.

    Returns:
        The first qualifying job, now marked CLAIMED, or ``None`` on timeout.
    """
    _check_auth(x_api_key)

    def matches(job, at: float) -> bool:
        """Tell whether the polling mirror may take ``job`` at time ``at``."""
        if job.status != JobStatus.PENDING:
            return False
        if at - job.created_at > job.ttl:
            return False
        if job.target_mirror and job.target_mirror != claim.mirror_id:
            return False
        return any(job.endpoint.startswith(prefix) for prefix in claim.available_endpoints)

    give_up_at = time.time() + min(timeout, 250.0)
    while time.time() < give_up_at:
        now = time.time()
        async with _lock:
            picked = next((job for job in _jobs.values() if matches(job, now)), None)
            if picked is not None:
                picked.status = JobStatus.CLAIMED
                picked.claimed_at = now
                log.info(f"Job {picked.job_id} claimed by '{claim.mirror_id}' via long-poll")
                return picked
        await asyncio.sleep(0.3)

    return None
|
|
|
|
@app.api_route("/proxy/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy(
    path: str,
    request: Request,
    x_api_key: Optional[str] = Header(default=None),
    authorization: Optional[str] = Header(default=None),
):
    """Relay an arbitrary HTTP call through the job queue and return its result.

    The incoming request is stored as a pending job. The handler then checks
    the job roughly every 0.3 seconds until a mirror completes it, it fails or
    is marked timed out, or ``JOB_TTL`` seconds have elapsed.

    Example:
        requests.post("https://your-space.hf.space/proxy/api/giulia/infer", json=...)

    Args:
        path: Everything after ``/proxy/``; becomes the job endpoint with a
            leading slash.
        request: The incoming request (method, headers, query string, body).
        x_api_key: Value of the ``X-API-Key`` header, passed to ``_check_auth``.
        authorization: Value of the ``Authorization`` header, passed to
            ``_check_auth``.

    Returns:
        A JSON ``Response`` carrying the mirror's body, status code and
        headers (minus ``STRIP_HEADERS``).

    Raises:
        HTTPException: 502 when the job failed or was marked timed out, 504
            when the local deadline passes first.
    """
    _check_auth(x_api_key, authorization)

    # Best effort: a missing or non-JSON body is forwarded as None.
    try:
        body = await request.json()
    except Exception:
        body = None

    clean_headers = {
        k: v for k, v in request.headers.items()
        if k.lower() not in STRIP_HEADERS
    }

    job = APIJob(
        method=request.method,
        endpoint="/" + path,
        headers=clean_headers,
        query_params=dict(request.query_params),
        body=body,
        ttl=JOB_TTL,
    )

    async with _lock:
        _jobs[job.job_id] = job

    log.info(f"Proxy job {job.job_id}: {job.method} /{path}")

    # NOTE(review): nothing visible in this file removes the job from `_jobs`
    # once the response has been sent — confirm whether that is intended.
    deadline = time.time() + JOB_TTL
    while time.time() < deadline:
        async with _lock:
            j = _jobs[job.job_id]

        if j.status == JobStatus.COMPLETED:
            # Defensive: tolerate a completion that left the headers or the
            # status code unset, instead of failing with a 500 while building
            # the response.
            upstream_headers = j.response_headers or {}
            return Response(
                content=json.dumps(j.response_body),
                status_code=j.response_status_code or 200,
                headers={k: v for k, v in upstream_headers.items()
                         if k.lower() not in STRIP_HEADERS},
                media_type="application/json",
            )

        if j.status in (JobStatus.FAILED, JobStatus.TIMEOUT):
            raise HTTPException(
                status_code=502,
                detail=j.error or "Job timed out waiting for a mirror"
            )

        await asyncio.sleep(0.3)

    raise HTTPException(status_code=504, detail="Gateway timeout")
|
|
|
|
@app.api_route("/proxy_stream/{path:path}", methods=["GET", "POST"])
async def proxy_stream(
    path: str,
    request: Request,
    x_api_key: Optional[str] = Header(default=None),
    authorization: Optional[str] = Header(default=None),
):
    """Queue a request and stream a mirror's chunks back as server-sent events.

    The request is stored as a pending job together with a dedicated queue in
    ``_stream_queues``. Chunks pushed through ``/mirror/stream_chunk`` are
    yielded to the caller in order. The stream ends when a ``None`` sentinel
    arrives (``/mirror/stream_end``) or when no chunk shows up within
    ``JOB_TTL`` seconds; the queue is unregistered when the generator exits.

    Args:
        path: Everything after ``/proxy_stream/``; becomes the job endpoint
            with a leading slash.
        request: The incoming request (method, headers, query string, body).
        x_api_key: Value of the ``X-API-Key`` header, passed to ``_check_auth``.
        authorization: Value of the ``Authorization`` header, passed to
            ``_check_auth``.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    _check_auth(x_api_key, authorization)

    # Best effort: a missing or non-JSON body is forwarded as None.
    try:
        body = await request.json()
    except Exception:
        body = None

    clean_headers = {
        k: v for k, v in request.headers.items()
        if k.lower() not in STRIP_HEADERS
    }

    job = APIJob(
        method=request.method,
        endpoint="/" + path,
        headers=clean_headers,
        query_params=dict(request.query_params),
        body=body,
        ttl=JOB_TTL,
    )

    queue = asyncio.Queue()
    # Register the stream queue BEFORE the job becomes claimable. With the
    # opposite order a mirror could claim the job and push its first chunk
    # while the queue was still missing, and get a 404 from
    # /mirror/stream_chunk.
    async with _stream_lock:
        _stream_queues[job.job_id] = queue
    async with _lock:
        _jobs[job.job_id] = job

    log.info(f"Stream proxy job {job.job_id}: {job.method} /{path}")

    async def event_generator():
        """Yield chunks until the end sentinel or an idle timeout."""
        try:
            while True:
                try:
                    chunk = await asyncio.wait_for(queue.get(), timeout=JOB_TTL)
                except asyncio.TimeoutError:
                    break
                if chunk is None:
                    break
                yield chunk
        finally:
            # Runs on normal end, timeout and generator close alike.
            async with _stream_lock:
                _stream_queues.pop(job.job_id, None)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )
|
|
|
|
@app.post("/mirror/stream_chunk", summary="Mirror pushes an SSE chunk")
async def stream_chunk(
    request: Request,
    job_id: str,
    x_api_key: Optional[str] = Header(default=None),
):
    """Append one chunk, taken from the raw request body, to a job's stream.

    Raises:
        HTTPException: 404 when no stream queue is registered for ``job_id``.
    """
    _check_auth(x_api_key)

    raw = await request.body()
    chunk = raw.decode()

    async with _stream_lock:
        target = _stream_queues.get(job_id)
    if target is None:
        raise HTTPException(status_code=404, detail="No active stream for this job")

    await target.put(chunk)
    return {"ok": True}
|
|
|
|
@app.post("/mirror/stream_end", summary="Mirror signals stream is complete")
async def stream_end(
    job_id: str,
    x_api_key: Optional[str] = Header(default=None),
):
    """Push the ``None`` end-of-stream sentinel onto a job's stream queue.

    Succeeds with ``{"ok": True}`` even when no queue is registered for
    ``job_id``.
    """
    _check_auth(x_api_key)

    async with _stream_lock:
        target = _stream_queues.get(job_id)
    if target is not None:
        await target.put(None)

    return {"ok": True}
|
|
| |
| |
| |
| |
|
|
@app.get("/health")
async def health():
    """Report liveness plus the number of stored jobs in each status.

    Every ``JobStatus`` value appears in the result, with 0 when no job is in
    that state. No authentication is required.
    """
    tally = dict.fromkeys((state.value for state in JobStatus), 0)
    async with _lock:
        for job in _jobs.values():
            tally[job.status.value] += 1
    return {"status": "ok", "jobs": tally}
|
|
|
|
@app.get("/jobs", summary="List all jobs (debug)")
async def list_jobs(
    x_api_key: Optional[str] = Header(default=None),
):
    """Return every stored job as a list (debugging aid)."""
    _check_auth(x_api_key)
    async with _lock:
        snapshot = [*_jobs.values()]
    return snapshot
|
|