"""
Rate limiter for file transfers with clean separation of concerns.
NOTE: These rate limiters are NOT thread-safe by design.
Rate limiting operations are typically single-threaded, and removing
threading overhead improves performance.
"""
import time
from typing import Optional, Callable
class TokenBucketRateLimiter:
    """
    Token bucket rate limiter for controlling transfer speed with burst support.

    NOTE: This class is NOT thread-safe. Rate limiting operations are typically
    single-threaded, and removing threading overhead improves performance.

    The token bucket algorithm allows for:
    - Sustained rate limiting (tokens per second)
    - Burst control (bucket capacity)

    Tokens are bytes. The bucket refills continuously at ``rate`` tokens per
    second, up to ``capacity``. Each ``wait_if_needed`` call consumes
    ``chunk_size`` tokens. If that drives the balance negative, the call sleeps
    until the deficit has been earned back at the sustained rate. Allowing a
    temporary negative balance means chunks larger than ``capacity`` are
    throttled exactly once instead of being charged twice.

    Usage:
        limiter = TokenBucketRateLimiter(
            rate=1024*1024,        # 1MB/s sustained rate
            capacity=2*1024*1024   # 2MB burst capacity
        )
        limiter.start()            # required; limiting is inactive until started
        for chunk in data_chunks:
            limiter.wait_if_needed(len(chunk))
            # transfer chunk
    """

    def __init__(self,
                 rate: Optional[int] = None,
                 capacity: Optional[int] = None,
                 initial_tokens: Optional[int] = None):
        """
        Initialize token bucket rate limiter.

        Args:
            rate: Tokens (bytes) per second. None (or 0) means no rate limiting.
            capacity: Maximum bucket capacity in bytes. If None, defaults to rate.
            initial_tokens: Initial tokens in bucket. If None, defaults to capacity.
        """
        self.rate = rate
        self.capacity = capacity if capacity is not None else rate
        self.initial_tokens = initial_tokens if initial_tokens is not None else self.capacity
        # Current balance. It becomes a float once refilling starts, and it may be
        # negative while a deficit from a large or early chunk is being repaid.
        self.tokens = self.initial_tokens
        # Monotonic timestamp of the last refill; None until start() is called.
        self.last_update: Optional[float] = None
        self._started = False

    def start(self):
        """
        Start timing for rate limiting.

        Resets the balance to ``initial_tokens``. Does nothing when no rate is
        configured or when the limiter has already been started.
        """
        if self.rate and not self._started:
            self._started = True
            self.last_update = time.monotonic()
            self.tokens = self.initial_tokens

    def _refill(self, now: float) -> None:
        """Credit the tokens earned since the last refill, capped at capacity."""
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now

    def wait_if_needed(self, chunk_size: int):
        """
        Wait if necessary to maintain the specified transfer rate.

        Returns immediately when no rate is configured or start() has not
        been called.

        Args:
            chunk_size: Size of the chunk just transferred in bytes.
        """
        if not self.rate or self.last_update is None:
            return

        # Monotonic clock: wall-clock jumps must not create or destroy tokens.
        self._refill(time.monotonic())
        self.tokens -= chunk_size

        if self.tokens < 0:
            # Sleep until the deficit is earned back at the sustained rate.
            # last_update is still the pre-sleep timestamp, so the next refill
            # credits the sleep and brings the balance back to about zero.
            # Nothing is lost to the capacity clamp, even for chunks larger
            # than the bucket.
            time.sleep(-self.tokens / self.rate)
class LeakyBucketRateLimiter:
    """
    Leaky bucket rate limiter for strict rate limiting without burst support.

    NOTE: This class is NOT thread-safe. Rate limiting operations are typically
    single-threaded, and removing threading overhead improves performance.

    The leaky bucket algorithm provides:
    - Strict rate limiting (no burst)
    - Predictable output rate
    - Better for network protocols that can't handle bursts

    Each chunk of ``chunk_size`` bytes is allotted a slot of
    ``chunk_size / rate`` seconds, starting where the previous slot ended.
    If the caller finishes a chunk early, wait_if_needed() sleeps out the
    rest of the slot. If the caller is late, the schedule restarts from the
    current time, so idle time is not banked as burst credit.

    Usage:
        limiter = LeakyBucketRateLimiter(rate=1024*1024)  # 1MB/s strict rate
        limiter.start()  # required; limiting is inactive until started
        for chunk in data_chunks:
            limiter.wait_if_needed(len(chunk))
            # transfer chunk
    """

    def __init__(self, rate: Optional[int] = None):
        """
        Initialize leaky bucket rate limiter.

        Args:
            rate: Tokens (bytes) per second. None (or 0) means no rate limiting.
        """
        self.rate = rate
        # Monotonic end time of the previous chunk's slot; None until start().
        self.last_update: Optional[float] = None
        self._started = False

    def start(self):
        """
        Start timing for rate limiting.

        Does nothing when no rate is configured or when the limiter has
        already been started.
        """
        if self.rate and not self._started:
            # Mark as started so a repeated start() does not reset the schedule
            # (consistent with TokenBucketRateLimiter.start).
            self._started = True
            self.last_update = time.monotonic()

    def wait_if_needed(self, chunk_size: int):
        """
        Wait if necessary to maintain the specified transfer rate.

        Returns immediately when no rate is configured or start() has not
        been called.

        Args:
            chunk_size: Size of the chunk just transferred in bytes.
        """
        if not self.rate or self.last_update is None:
            return

        # Earliest moment this chunk's slot may end at the configured rate.
        slot_end = self.last_update + chunk_size / self.rate
        now = time.monotonic()

        if now < slot_end:
            time.sleep(slot_end - now)
            # Anchor to the scheduled slot end rather than the actual wake-up
            # time. Sleep overshoot is then absorbed by the next slot instead
            # of accumulating and dragging the achieved rate below `rate`.
            self.last_update = slot_end
        else:
            # Behind schedule: restart from now (no burst catch-up).
            self.last_update = now
class RateLimiter:
    """
    Single front end over the concrete rate limiting algorithms.

    NOTE: This class is NOT thread-safe. Rate limiting operations are typically
    single-threaded, and removing threading overhead improves performance.

    Supported algorithm names:
    - "token_bucket": backed by TokenBucketRateLimiter (bursts up to a capacity)
    - "leaky_bucket": backed by LeakyBucketRateLimiter (strict pacing, no burst)

    start() and wait_if_needed() are forwarded unchanged to the chosen backend.
    """

    def __init__(self,
                 bytes_per_second: Optional[int] = None,
                 algorithm: str = "token_bucket",
                 burst_capacity: Optional[int] = None):
        """
        Build the backend limiter for the requested algorithm.

        Args:
            bytes_per_second: Transfer rate limit in bytes per second.
            algorithm: Either "token_bucket" or "leaky_bucket".
            burst_capacity: Token bucket only; maximum burst size in bytes.
                Defaults to bytes_per_second when None. Ignored by the
                leaky bucket.

        Raises:
            ValueError: If ``algorithm`` is not one of the supported names.
        """
        if algorithm == "leaky_bucket":
            backend = LeakyBucketRateLimiter(bytes_per_second)
        elif algorithm == "token_bucket":
            backend = TokenBucketRateLimiter(
                rate=bytes_per_second,
                capacity=burst_capacity,
            )
        else:
            raise ValueError(
                f"Unknown algorithm: {algorithm}. Use 'token_bucket' or 'leaky_bucket'"
            )

        self._limiter = backend
        self.bytes_per_second = bytes_per_second
        self.algorithm = algorithm

    def start(self):
        """Begin timing on the backend limiter."""
        self._limiter.start()

    def wait_if_needed(self, chunk_size: int):
        """
        Delegate pacing of one chunk to the backend limiter.

        Args:
            chunk_size: Size of the chunk just transferred in bytes.
        """
        self._limiter.wait_if_needed(chunk_size)
class ProgressTracker:
    """
    Simple progress tracking with configurable update frequency.

    NOTE: This class is NOT thread-safe. Progress tracking operations are typically
    single-threaded, and removing threading overhead improves performance.

    update() forwards (transferred, total) to the callback at most once every
    ``update_interval`` seconds. The first update() after construction or
    start() always fires, and finish() always fires. Elapsed time is measured
    with time.monotonic(), so wall-clock adjustments cannot suppress updates
    or make them fire early.
    """

    def __init__(self,
                 callback: Optional[Callable[[int, int], None]] = None,
                 update_interval: float = 0.1):
        """
        Initialize progress tracker.

        Args:
            callback: Function to call with (transferred, total) progress
                updates. If None, no callback is ever invoked.
            update_interval: Minimum seconds between progress updates.
        """
        self.callback = callback
        self.update_interval = update_interval
        # Monotonic timestamp of the last reported update. -inf (rather than 0)
        # guarantees the first update fires, because the epoch of
        # time.monotonic() is undefined.
        self.last_update_time: float = float("-inf")

    def start(self):
        """Start progress tracking; the next update() is reported immediately."""
        self.last_update_time = float("-inf")

    def update(self, transferred: int, total: int):
        """
        Report progress if at least ``update_interval`` seconds have passed
        since the last reported update.

        Args:
            transferred: Bytes transferred so far
            total: Total bytes to transfer
        """
        if not self.callback:
            return
        now = time.monotonic()
        if now - self.last_update_time >= self.update_interval:
            self.callback(transferred, total)
            self.last_update_time = now

    def finish(self, total: int):
        """Force a final (total, total) update, ignoring the throttle interval."""
        if self.callback:
            self.callback(total, total)
|