Spaces:
Running
Running
File size: 5,787 Bytes
dc71cad | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | """
telemetry/rate_limiter.py
=========================
Rate limiting + request queue depth monitoring for the FastAPI API.
Uses slowapi (Starlette-compatible limiter) with Redis backend.
Falls back to in-memory storage when Redis is unavailable.
Limits:
POST /api/solve: 5 requests/minute per IP
GET /api/task/{id}: 60 requests/minute per IP
WS /ws/{id}: 10 connections/minute per IP
GET /api/metrics: 30 requests/minute per IP
Why rate limiting?
- GPT-4o costs $5/1M tokens; unconstrained API = runaway costs
- Sandbox execution uses CPU + memory; need to bound concurrency
- Public demo must be resilient to abuse/crawlers
"""
from __future__ import annotations

import logging
import threading
import time
from collections import defaultdict, deque
from typing import Optional
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
# -- Sliding window rate limiter (in-memory fallback) ------------------------
class SlidingWindowRateLimiter:
    """
    Sliding-window (timestamp log) rate limiter keyed by an arbitrary string.

    For every key the limiter keeps the ``time.monotonic()`` timestamps of its
    accepted requests. A new request is allowed while fewer than ``limit`` of
    those timestamps fall inside the trailing ``window`` seconds.

    All state lives in process memory and is guarded by a ``threading.Lock``,
    so one instance may be shared by the threads of a single process. For
    multi-process deployments, use the Redis-backed slowapi limiter instead.

    Args:
        requests: Maximum number of accepted requests per key within one window.
        window_seconds: Length of the sliding window, in seconds.
    """

    def __init__(self, requests: int, window_seconds: float):
        self.limit = requests
        self.window = window_seconds
        # key -> non-decreasing timestamps of accepted requests
        self._buckets: dict[str, deque[float]] = defaultdict(deque)
        self._lock = threading.Lock()
        # When idle keys were last swept (see _sweep_idle_keys).
        self._last_sweep = time.monotonic()

    def is_allowed(self, key: str) -> bool:
        """Record a request for *key*; return True if allowed, False if rate-limited.

        Rejected requests are not recorded, so they do not extend the penalty.
        """
        with self._lock:
            # Read the clock under the lock so timestamps are appended in
            # non-decreasing order even when several threads race here.
            now = time.monotonic()
            cutoff = now - self.window
            self._sweep_idle_keys(now, cutoff)
            bucket = self._buckets[key]
            # Buckets are ordered, so expired timestamps sit at the left end.
            while bucket and bucket[0] < cutoff:
                bucket.popleft()
            if len(bucket) >= self.limit:
                return False
            bucket.append(now)
            return True

    def remaining(self, key: str) -> int:
        """Return how many more requests *key* may make in the current window.

        This is a read-only query: asking about an unknown key does not start
        tracking it.
        """
        with self._lock:
            cutoff = time.monotonic() - self.window
            # .get() instead of [] so the defaultdict does not insert a bucket.
            bucket = self._buckets.get(key, ())
            # Same boundary as is_allowed: a timestamp equal to the cutoff is
            # still inside the window.
            active = sum(1 for t in bucket if t >= cutoff)
        return max(0, self.limit - active)

    def reset_for(self, key: str) -> None:
        """Forget every recorded request for *key* (admin use)."""
        with self._lock:
            self._buckets.pop(key, None)

    def stats(self) -> dict:
        """Return the configured limit/window and the number of tracked keys."""
        with self._lock:
            tracked = len(self._buckets)
        return {
            "limit": self.limit,
            "window_seconds": self.window,
            "tracked_keys": tracked,
        }

    def _sweep_idle_keys(self, now: float, cutoff: float) -> None:
        """Drop keys with no request inside the window; caller must hold the lock.

        Runs at most once per window so the scan over all tracked keys is
        amortised. Without it, every key that ever made a request would keep
        a bucket forever.
        """
        if now - self._last_sweep < self.window:
            return
        self._last_sweep = now
        idle = [k for k, b in self._buckets.items() if not b or b[-1] < cutoff]
        for k in idle:
            del self._buckets[k]
# -- Shared limiters ---------------------------------------------------------
# In-memory fallback limiters (intended for when Redis/slowapi is not
# available). All of them use a one-minute window; only the per-key request
# budget differs, mirroring the limits listed in the module docstring.
_ONE_MINUTE = 60
SOLVE_LIMITER = SlidingWindowRateLimiter(window_seconds=_ONE_MINUTE, requests=5)
QUERY_LIMITER = SlidingWindowRateLimiter(window_seconds=_ONE_MINUTE, requests=60)
WS_LIMITER = SlidingWindowRateLimiter(window_seconds=_ONE_MINUTE, requests=10)
METRICS_LIMITER = SlidingWindowRateLimiter(window_seconds=_ONE_MINUTE, requests=30)
# -- SlowAPI integration helper ----------------------------------------------
def setup_slowapi(app, redis_url: Optional[str] = "redis://localhost:6379/2") -> Optional[object]:
    """
    Attach a slowapi rate limiter to a FastAPI/Starlette app.

    Sets ``app.state.limiter`` and registers slowapi's handler for
    ``RateLimitExceeded``. Every route gets a default limit of 100/minute per
    client address.

    Args:
        app: The application to configure.
        redis_url: Storage URI for the limiter's counters. A falsy value
            selects slowapi's in-process ``"memory://"`` storage.

    Returns:
        The ``slowapi.Limiter`` instance, or ``None`` if slowapi cannot be
        imported.
    """
    # Only the optional imports belong in the try: an ImportError raised
    # while configuring the app is a real error, not "slowapi missing".
    try:
        from slowapi import Limiter, _rate_limit_exceeded_handler
        from slowapi.errors import RateLimitExceeded
        from slowapi.util import get_remote_address
    except ImportError:
        logger.debug("slowapi not installed - using in-memory rate limiter")
        return None

    storage_uri = redis_url or "memory://"
    limiter = Limiter(
        key_func=get_remote_address,
        default_limits=["100/minute"],
        storage_uri=storage_uri,
        # If the configured storage (e.g. Redis) goes down, keep limiting with
        # per-process in-memory counters instead of failing requests.
        in_memory_fallback_enabled=True,
    )
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
    logger.info("SlowAPI rate limiter attached (storage: %s)", storage_uri)
    return limiter
# -- Queue depth monitor -----------------------------------------------------
class QueueDepthMonitor:
    """
    Counters for running, queued, completed and rejected tasks.

    The monitor only counts. Callers report each task's lifecycle through
    ``task_queued``, ``task_started`` and ``task_finished``. ``snapshot()``
    returns the counters as a plain dict (intended for the Grafana dashboard).

    No locking is done. NOTE(review): confirm that all callers run on a single
    thread (e.g. the asyncio event loop).

    Args:
        max_concurrent: Number of running tasks at which new submissions are
            rejected.
    """

    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self._running: int = 0
        self._queued: int = 0
        self._completed: int = 0
        self._rejected: int = 0

    def task_queued(self) -> bool:
        """Record a submission; return True if accepted, False if at capacity.

        An accepted task is counted as queued; a refused one as rejected.
        """
        # NOTE(review): only *running* tasks are compared with the capacity.
        # Tasks that are queued but not yet started do not count, so a burst
        # of submissions can all be accepted. Confirm this is intended.
        if self.is_at_capacity:
            self._rejected += 1
            return False
        self._queued += 1
        return True

    def task_started(self) -> None:
        """Move one task from queued to running (queued never goes below 0)."""
        self._queued = max(0, self._queued - 1)
        self._running += 1

    def task_finished(self) -> None:
        """Mark one running task as completed (running never goes below 0)."""
        self._running = max(0, self._running - 1)
        self._completed += 1

    @property
    def is_at_capacity(self) -> bool:
        """True when the running count has reached ``max_concurrent``."""
        return self._running >= self.max_concurrent

    def snapshot(self) -> dict:
        """Return the current counters plus capacity and utilisation (percent)."""
        if self.max_concurrent > 0:
            utilisation = round(self._running / self.max_concurrent * 100, 1)
        else:
            # A non-positive capacity is always "at capacity" (see
            # is_at_capacity). Report it as fully utilised rather than
            # dividing by zero.
            utilisation = 100.0
        return {
            "running": self._running,
            "queued": self._queued,
            "completed": self._completed,
            "rejected": self._rejected,
            "capacity": self.max_concurrent,
            "utilisation_pct": utilisation,
        }
# Module-level shared instance: new submissions are rejected once five tasks
# are running.
QUEUE_MONITOR = QueueDepthMonitor(5)
|