"""
SecureCodeEnv Task: Rate Limiter
Difficulty: Medium
CWE Targets: CWE-770 (Allocation Without Limits), CWE-400 (Uncontrolled Resource Consumption)
"""

# Task definition for the "RateLimiter" exercise. The dict bundles the prompt
# shown to the solver, the starter/naive/reference implementations (as source
# strings), and the functional and static checks applied to a submission.
# NOTE(review): the exact way the harness consumes each key is not visible in
# this file -- confirm against the task loader before renaming any key.
TASK = {
    "id": "medium_rate_limiter",
    "difficulty": "medium",
    "cwe_targets": ["CWE-770", "CWE-400"],
    "component_name": "RateLimiter",
    "filename": "src/middleware/rate_limiter.py",
    # NOTE(review): the statement calls this a "token bucket" limiter while
    # the requirements below mandate a sliding window. The wording is kept
    # verbatim because it is solver-facing prompt text -- confirm and align.
    "problem_statement": """Write a Python class:

`class RateLimiter:`

A thread-safe token bucket rate limiter.

REQUIREMENTS:
- `__init__(self, max_requests: int, window_seconds: int)`
  - max_requests: max allowed calls in the time window
  - window_seconds: size of the sliding window in seconds
- `is_allowed(self, client_id: str) -> bool`
  - Returns True if the client is under the rate limit
  - Returns False if the client has exceeded max_requests in window_seconds
  - Each client_id is tracked independently
- Must be thread-safe (use threading.Lock)
- Must use time-based sliding window — NOT a fixed counter reset
- `get_remaining(self, client_id: str) -> int`
  - Returns how many requests the client can still make in the current window

MUST handle concurrent requests correctly — no race conditions.""",
    # Deliberately incomplete skeleton: a plain counter, no lock, no window.
    "starter_code": '''class RateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counts = {}  # NOT thread-safe!

    def is_allowed(self, client_id: str) -> bool:
        # TODO: Implement with proper sliding window and thread safety
        count = self.counts.get(client_id, 0)
        self.counts[client_id] = count + 1
        return count < self.max_requests

    def get_remaining(self, client_id: str) -> int:
        count = self.counts.get(client_id, 0)
        return max(0, self.max_requests - count)
''',
    # Functional checks. "is_allowed_multi" and "independent_clients" are not
    # methods of RateLimiter; presumably they are scenario names interpreted
    # by the test runner -- TODO confirm.
    "test_cases": [
        {
            "fn_class": "RateLimiter",
            "init_args": [5, 60],
            "method": "is_allowed",
            "input": ["user1"],
            "expected": True,
            "description": "First request allowed",
        },
        {
            "fn_class": "RateLimiter",
            "init_args": [2, 60],
            "method": "is_allowed_multi",
            "calls": 3,
            "input": ["user1"],
            "expected_last": False,
            "description": "Third request blocked when limit is 2",
        },
        {
            "fn_class": "RateLimiter",
            "init_args": [5, 60],
            "method": "get_remaining",
            "input": ["new_client"],
            "expected": 5,
            "description": "New client has full remaining",
        },
        {
            "fn_class": "RateLimiter",
            "init_args": [3, 60],
            "method": "independent_clients",
            "description": "Different client IDs are tracked independently",
        },
    ],
    "attack_type": "none",
    # Static checks on the submitted source. Each "required" list looks like a
    # set of acceptable alternatives (any one suffices) -- TODO confirm.
    "security_checks": [
        {"type": "uses_threading_lock", "required": ["threading.Lock", "threading.RLock"]},
        {"type": "uses_time", "required": ["time.time", "time.monotonic"]},
    ],
    # Intentionally weak baseline: unsynchronised counter that never resets.
    "naive_code": '''class RateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.counts = {}

    def is_allowed(self, client_id):
        c = self.counts.get(client_id, 0)
        self.counts[client_id] = c + 1
        return c < self.max_requests

    def get_remaining(self, client_id):
        return max(0, self.max_requests - self.counts.get(client_id, 0))
''',
    # Reference solution. Buckets are created only when a request is recorded
    # and are deleted once all their timestamps have expired, so memory tracks
    # the clients active inside the window rather than every id ever seen
    # (the CWE-770 weakness this task targets).
    "optimal_code": '''import threading
import time
from collections import deque


class RateLimiter:
    """Thread-safe sliding-window rate limiter (per-client timestamp log)."""

    def __init__(self, max_requests: int, window_seconds: int):
        """
        Args:
            max_requests: Maximum requests allowed per window
            window_seconds: Length of the sliding window
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._buckets: dict[str, deque] = {}
        self._lock = threading.Lock()

    def _prune(self, client_id: str, now: float) -> int:
        """Drop expired timestamps and return the live count. Must hold lock.

        A bucket that becomes empty is deleted, and an unknown client never
        gets one allocated here, so idle or merely-queried client ids do not
        accumulate in memory.
        """
        bucket = self._buckets.get(client_id)
        if bucket is None:
            return 0
        cutoff = now - self.window_seconds
        while bucket and bucket[0] < cutoff:
            bucket.popleft()
        if not bucket:
            del self._buckets[client_id]
        return len(bucket)

    def is_allowed(self, client_id: str) -> bool:
        """Returns True and records the request if under rate limit."""
        now = time.monotonic()
        with self._lock:
            if self._prune(client_id, now) >= self.max_requests:
                return False
            # Allocate the bucket only when a request is actually recorded.
            self._buckets.setdefault(client_id, deque()).append(now)
            return True

    def get_remaining(self, client_id: str) -> int:
        """Returns remaining requests in the current window (never allocates)."""
        now = time.monotonic()
        with self._lock:
            used = self._prune(client_id, now)
        return max(0, self.max_requests - used)
''',
}