fe / bt-source /panel /mod /base /ssh_executor /rate_limiter.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
3a5cf48 verified
"""
Rate limiter for file transfers with clean separation of concerns.
NOTE: These rate limiters are NOT thread-safe by design.
Rate limiting operations are typically single-threaded, and removing
threading overhead improves performance.
"""
import time
from typing import Optional, Callable
class TokenBucketRateLimiter:
"""
Token bucket rate limiter for controlling transfer speed with burst support.
NOTE: This class is NOT thread-safe. Rate limiting operations are typically
single-threaded, and removing threading overhead improves performance.
The token bucket algorithm allows for:
- Sustained rate limiting (tokens per second)
- Burst control (bucket capacity)
- Peak rate control
Usage:
limiter = TokenBucketRateLimiter(
rate=1024*1024, # 1MB/s sustained rate
capacity=2*1024*1024 # 2MB burst capacity
)
for chunk in data_chunks:
limiter.wait_if_needed(len(chunk))
# transfer chunk
"""
def __init__(self,
rate: Optional[int] = None,
capacity: Optional[int] = None,
initial_tokens: Optional[int] = None):
"""
Initialize token bucket rate limiter.
Args:
rate: Tokens (bytes) per second. None means no rate limiting.
capacity: Maximum bucket capacity in bytes. If None, defaults to rate.
initial_tokens: Initial tokens in bucket. If None, defaults to capacity.
"""
self.rate = rate
self.capacity = capacity if capacity is not None else rate
self.initial_tokens = initial_tokens if initial_tokens is not None else self.capacity
# Current state
self.tokens = self.initial_tokens
self.last_update = None
self._started = False
def start(self):
"""Start timing for rate limiting."""
if self.rate and not self._started:
self._started = True
self.last_update = time.time()
self.tokens = self.initial_tokens
def wait_if_needed(self, chunk_size: int):
"""
Wait if necessary to maintain the specified transfer rate.
Args:
chunk_size: Size of the chunk just transferred in bytes.
"""
if not self.rate or not self.last_update:
return
# Refill tokens based on elapsed time
now = time.time()
elapsed = now - self.last_update
tokens_to_add = elapsed * self.rate
# Add tokens up to capacity
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_update = now
# Check if we have enough tokens
if self.tokens < chunk_size:
# Calculate wait time to get enough tokens
tokens_needed = chunk_size - self.tokens
wait_time = tokens_needed / self.rate
time.sleep(wait_time)
# Refill tokens after waiting
now = time.time()
elapsed = now - self.last_update
tokens_to_add = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_update = now
# Consume tokens
self.tokens -= chunk_size
class LeakyBucketRateLimiter:
"""
Leaky bucket rate limiter for strict rate limiting without burst support.
NOTE: This class is NOT thread-safe. Rate limiting operations are typically
single-threaded, and removing threading overhead improves performance.
The leaky bucket algorithm provides:
- Strict rate limiting (no burst)
- Predictable output rate
- Better for network protocols that can't handle bursts
Usage:
limiter = LeakyBucketRateLimiter(rate=1024*1024) # 1MB/s strict rate
for chunk in data_chunks:
limiter.wait_if_needed(len(chunk))
# transfer chunk
"""
def __init__(self, rate: Optional[int] = None):
"""
Initialize leaky bucket rate limiter.
Args:
rate: Tokens (bytes) per second. None means no rate limiting.
"""
self.rate = rate
self.last_update = None
self._started = False
def start(self):
"""Start timing for rate limiting."""
if self.rate and not self._started:
self.last_update = time.time()
def wait_if_needed(self, chunk_size: int):
"""
Wait if necessary to maintain the specified transfer rate.
Args:
chunk_size: Size of the chunk just transferred in bytes.
"""
if not self.rate or not self.last_update:
return
now = time.time()
elapsed = now - self.last_update
# Calculate minimum time needed for this chunk
min_time = chunk_size / self.rate
if elapsed < min_time:
wait_time = min_time - elapsed
time.sleep(wait_time)
self.last_update = time.time()
class RateLimiter:
"""
Main rate limiter class that provides a unified interface.
NOTE: This class is NOT thread-safe. Rate limiting operations are typically
single-threaded, and removing threading overhead improves performance.
This class can use different underlying algorithms:
- TokenBucketRateLimiter: For burst-capable rate limiting
- LeakyBucketRateLimiter: For strict rate limiting
"""
def __init__(self,
bytes_per_second: Optional[int] = None,
algorithm: str = "token_bucket",
burst_capacity: Optional[int] = None):
"""
Initialize rate limiter.
Args:
bytes_per_second: Transfer rate limit in bytes per second.
algorithm: Rate limiting algorithm ("token_bucket" or "leaky_bucket")
burst_capacity: For token bucket, maximum burst capacity in bytes.
If None, defaults to bytes_per_second.
"""
if algorithm == "token_bucket":
self._limiter = TokenBucketRateLimiter(
rate=bytes_per_second,
capacity=burst_capacity
)
elif algorithm == "leaky_bucket":
self._limiter = LeakyBucketRateLimiter(bytes_per_second)
else:
raise ValueError(f"Unknown algorithm: {algorithm}. Use 'token_bucket' or 'leaky_bucket'")
self.bytes_per_second = bytes_per_second
self.algorithm = algorithm
def start(self):
"""Start timing for rate limiting."""
self._limiter.start()
def wait_if_needed(self, chunk_size: int):
"""
Wait if necessary to maintain the specified transfer rate.
Args:
chunk_size: Size of the chunk just transferred in bytes.
"""
self._limiter.wait_if_needed(chunk_size)
class ProgressTracker:
"""
Simple progress tracking with configurable update frequency.
NOTE: This class is NOT thread-safe. Progress tracking operations are typically
single-threaded, and removing threading overhead improves performance.
"""
def __init__(self,
callback: Optional[Callable[[int, int], None]] = None,
update_interval: float = 0.1):
"""
Initialize progress tracker.
Args:
callback: Function to call with (transferred, total) progress updates
update_interval: Minimum seconds between progress updates
"""
self.callback = callback
self.update_interval = update_interval
self.last_update_time = 0
def start(self):
"""Start progress tracking."""
self.last_update_time = 0
def update(self, transferred: int, total: int):
"""
Update progress if enough time has passed since last update.
Args:
transferred: Bytes transferred so far
total: Total bytes to transfer
"""
if not self.callback:
return
current_time = time.time()
if current_time - self.last_update_time >= self.update_interval:
self.callback(transferred, total)
self.last_update_time = current_time
def finish(self, total: int):
"""Force final progress update."""
if self.callback:
self.callback(total, total)