File size: 5,787 Bytes
dc71cad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
telemetry/rate_limiter.py
──────────────────────────
Rate limiting + request queue depth monitoring for the FastAPI API.

Uses slowapi (Starlette-compatible limiter) with Redis backend.
Falls back to in-memory storage when Redis is unavailable.

Limits:
  POST /api/solve:        5 requests/minute per IP
  GET  /api/task/{id}:   60 requests/minute per IP
  WS   /ws/{id}:         10 connections/minute per IP
  GET  /api/metrics:      30 requests/minute per IP

Why rate limiting?
  - GPT-4o costs $5/1M tokens; unconstrained API = runaway costs
  - Sandbox execution uses CPU + memory; need to bound concurrency
  - Public demo must be resilient to abuse/crawlers
"""
from __future__ import annotations

import logging
import threading
import time
from collections import defaultdict, deque
from typing import Optional

# Module-level logger; setup_slowapi reports which rate-limiter backend is in use through it.
logger = logging.getLogger(__name__)


# ── Sliding window rate limiter (in-memory fallback) ──────────────────────────

class SlidingWindowRateLimiter:
    """
    Sliding-window-log rate limiter keyed by an arbitrary string (e.g. client IP).

    For each key, the monotonic timestamps of accepted requests are kept in a deque.
    A request is allowed while fewer than ``requests`` timestamps are younger
    than ``window_seconds``. A timestamp whose age is >= the window is expired.

    All public methods hold an internal lock, so one instance can be shared by
    the threads of a single process. State is not shared between processes; for
    multi-process deployments use the Redis-backed slowapi limiter instead.
    """

    def __init__(self, requests: int, window_seconds: int):
        self.limit = requests
        self.window = window_seconds
        self._buckets: dict[str, deque] = defaultdict(deque)
        # Guards _buckets. Without it, two threads could both see a free slot
        # in is_allowed and both append, exceeding the limit.
        self._lock = threading.Lock()

    def _prune(self, bucket: deque, now: float) -> None:
        """Drop timestamps whose age is >= the window (caller holds the lock)."""
        cutoff = now - self.window
        # Timestamps are appended in non-decreasing order, so expired ones are
        # always at the left end.
        while bucket and bucket[0] <= cutoff:
            bucket.popleft()

    def is_allowed(self, key: str) -> bool:
        """
        Record a request for ``key`` if it fits in the current window.

        Returns:
            True if the request is allowed (and counted), False if rate-limited.
        """
        with self._lock:
            # Sample the clock under the lock so appended timestamps stay ordered.
            now = time.monotonic()
            bucket = self._buckets[key]
            self._prune(bucket, now)
            allowed = len(bucket) < self.limit
            if allowed:
                bucket.append(now)
            elif not bucket:
                # Only reachable when limit <= 0: don't retain an empty entry
                # for a key that can never be admitted.
                del self._buckets[key]
            return allowed

    def remaining(self, key: str) -> int:
        """Return how many more requests ``key`` may make in the current window."""
        with self._lock:
            now = time.monotonic()
            # .get() avoids the defaultdict inserting an entry for unseen keys.
            bucket = self._buckets.get(key)
            if bucket is None:
                return max(0, self.limit)
            self._prune(bucket, now)
            if not bucket:
                del self._buckets[key]
            return max(0, self.limit - len(bucket))

    def reset_for(self, key: str) -> None:
        """Clear the rate-limit history for ``key`` (admin use)."""
        with self._lock:
            self._buckets.pop(key, None)

    def purge_expired(self) -> int:
        """
        Remove every key whose window no longer holds any timestamps.

        ``is_allowed`` keeps a bucket for each key it has admitted, so keys that
        stop sending requests otherwise stay in memory indefinitely. Call this
        periodically to bound memory use.

        Returns:
            The number of keys removed.
        """
        with self._lock:
            now = time.monotonic()
            stale = []
            for key, bucket in self._buckets.items():
                self._prune(bucket, now)
                if not bucket:
                    stale.append(key)
            for key in stale:
                del self._buckets[key]
            return len(stale)

    def stats(self) -> dict:
        """Return the configured limit, window and number of tracked keys."""
        with self._lock:
            return {
                "limit": self.limit,
                "window_seconds": self.window,
                "tracked_keys": len(self._buckets),
            }


# ── Shared limiters ───────────────────────────────────────────────────────────

# Per-endpoint in-memory limiters, used when Redis/slowapi is not available.
# Positional arguments are (requests allowed, window length in seconds).
SOLVE_LIMITER = SlidingWindowRateLimiter(5, 60)     # POST /api/solve
QUERY_LIMITER = SlidingWindowRateLimiter(60, 60)    # GET  /api/task/{id}
WS_LIMITER = SlidingWindowRateLimiter(10, 60)       # WS   /ws/{id}
METRICS_LIMITER = SlidingWindowRateLimiter(30, 60)  # GET  /api/metrics


# ── SlowAPI integration helper ─────────────────────────────────────────────────

def setup_slowapi(
    app,
    redis_url: Optional[str] = "redis://localhost:6379/2",
) -> Optional[object]:
    """
    Attach a slowapi rate limiter to a FastAPI app.

    Args:
        app: The application. The limiter is stored on ``app.state.limiter``,
            and a handler for ``RateLimitExceeded`` is registered on it.
        redis_url: Storage URI for the limiter's counters. A falsy value
            (``""`` or ``None``) selects slowapi's in-process ``memory://`` storage.

    Returns:
        The ``Limiter`` instance, or ``None`` if slowapi is not installed. In
        that case the module-level in-memory ``*_LIMITER`` objects are the
        intended fallback.
    """
    try:
        from slowapi import Limiter, _rate_limit_exceeded_handler
        from slowapi.util import get_remote_address
        from slowapi.errors import RateLimitExceeded
    except ImportError:
        logger.debug("slowapi not installed — using in-memory rate limiter")
        return None

    storage_uri = redis_url if redis_url else "memory://"
    limiter = Limiter(
        key_func=get_remote_address,
        default_limits=["100/minute"],
        storage_uri=storage_uri,
        # If the Redis storage becomes unreachable, switch to in-process
        # counters instead of failing requests, as the module docstring promises.
        in_memory_fallback_enabled=True,
    )
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
    logger.info("SlowAPI rate limiter attached (storage: %s)", storage_uri)
    return limiter


# ── Queue depth monitor ───────────────────────────────────────────────────────

class QueueDepthMonitor:
    """
    Counts queued, running, completed and rejected tasks.

    ``snapshot()`` exposes the counters as a dict, intended for the Grafana
    dashboard. The expected call sequence per task is ``task_queued()``, then
    (if it was accepted) ``task_started()``, then ``task_finished()``.

    The counters are plain ints updated with ``+=``; no locking is done here.
    """

    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self._running: int = 0
        self._queued: int = 0
        self._completed: int = 0
        self._rejected: int = 0

    def task_queued(self) -> bool:
        """
        Register a new task for admission.

        Returns:
            True if the task was accepted (counted as queued), False if it was
            rejected because the running tasks are already at capacity.
        """
        # NOTE(review): admission only looks at running tasks, not at tasks
        # already queued. A burst accepted before any of them starts can
        # therefore push _running past max_concurrent. Confirm whether queued
        # tasks should count toward capacity.
        if self.is_at_capacity:
            self._rejected += 1
            return False
        self._queued += 1
        return True

    def task_started(self) -> None:
        """Move one task from queued to running."""
        self._queued = max(0, self._queued - 1)
        self._running += 1

    def task_finished(self) -> None:
        """Mark one running task as completed."""
        self._running = max(0, self._running - 1)
        self._completed += 1

    @property
    def is_at_capacity(self) -> bool:
        """True when the number of running tasks has reached ``max_concurrent``."""
        return self._running >= self.max_concurrent

    def snapshot(self) -> dict:
        """
        Return the current counters and the running-slot utilisation percentage.

        ``utilisation_pct`` is 0.0 when ``max_concurrent`` is not positive,
        rather than raising ``ZeroDivisionError``.
        """
        capacity = self.max_concurrent
        utilisation = (
            round(self._running / capacity * 100, 1) if capacity > 0 else 0.0
        )
        return {
            "running": self._running,
            "queued": self._queued,
            "completed": self._completed,
            "rejected": self._rejected,
            "capacity": capacity,
            "utilisation_pct": utilisation,
        }


# Module-level shared monitor (one per process), with a capacity of 5 running tasks.
QUEUE_MONITOR = QueueDepthMonitor(5)