File size: 3,456 Bytes
2fd8593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""
Rate Limiter for OpenAI API to avoid hitting TPM (tokens per minute) limits.
"""
import time
from typing import List, Tuple

class RateLimiter:
    """Smart rate limiter that tracks token usage and sleeps only when necessary.

    Maintains a 60-second sliding window of (timestamp, tokens) records and
    blocks the caller just long enough for a pending request to fit under
    the configured tokens-per-minute budget.
    """

    def __init__(self, max_tokens_per_minute: int = 95000, buffer: int = 5000):
        """
        Initialize the rate limiter.

        Args:
            max_tokens_per_minute: Maximum tokens allowed per minute (default: 95K for safety)
            buffer: Safety buffer to stay under limit (default: 5K)
        """
        # Effective budget = hard limit minus safety buffer.
        self.max_tokens = max_tokens_per_minute - buffer
        self.tokens_used: List[Tuple[float, int]] = []  # [(timestamp, tokens), ...]
        self.total_waits = 0        # number of times we slept
        self.total_wait_time = 0.0  # cumulative seconds slept

    def _prune(self, now: float) -> None:
        """Drop usage records older than the 60-second sliding window."""
        self.tokens_used = [
            (ts, tok) for ts, tok in self.tokens_used
            if now - ts < 60
        ]

    def wait_if_needed(self, tokens_needed: int) -> None:
        """
        Check if we need to wait before making the next API call.

        Sleeps (possibly more than once) until the pending request fits in
        the current 60-second window, then records it. If ``tokens_needed``
        alone exceeds the budget and the window is empty, waiting cannot
        help, so the request is recorded and allowed through immediately
        (the previous implementation raised IndexError in that case).

        Args:
            tokens_needed: Estimated tokens for the next API call
        """
        now = time.time()
        self._prune(now)

        # Loop rather than a single sleep: expiring the oldest record may
        # not free enough budget for this request on its own.
        while (
            self.tokens_used
            and sum(tok for _, tok in self.tokens_used) + tokens_needed > self.max_tokens
        ):
            tokens_in_window = sum(tok for _, tok in self.tokens_used)
            # Records are appended chronologically, so index 0 is the oldest.
            oldest_timestamp = self.tokens_used[0][0]
            # Wait until the oldest record ages out of the window
            # (+1s safety margin); clamp so we never sleep a negative time.
            wait_time = max(0.0, 60 - (now - oldest_timestamp) + 1)

            print(f"⏰ Rate limit approaching ({tokens_in_window + tokens_needed}/{self.max_tokens} tokens)")
            print(f"   Waiting {wait_time:.1f}s for rate limit window to reset...")

            time.sleep(wait_time)
            self.total_waits += 1
            self.total_wait_time += wait_time

            # Re-evaluate the window after sleeping.
            now = time.time()
            self._prune(now)

        # Record this request in the window.
        self.tokens_used.append((now, tokens_needed))

    def get_stats(self) -> dict:
        """Return rate-limiting statistics.

        Returns:
            dict with 'total_waits', 'total_wait_time' (seconds), and
            'current_window_tokens' (tokens recorded in the last 60s).
        """
        return {
            'total_waits': self.total_waits,
            'total_wait_time': self.total_wait_time,
            'current_window_tokens': sum(tok for _, tok in self.tokens_used)
        }

    def print_stats(self) -> None:
        """Print rate limiting statistics."""
        stats = self.get_stats()
        print("\n" + "="*50)
        print("📊 Rate Limiter Statistics")
        print("="*50)
        print(f"Total waits: {stats['total_waits']}")
        print(f"Total wait time: {stats['total_wait_time']:.1f}s")
        print(f"Current window usage: {stats['current_window_tokens']} tokens")
        print("="*50 + "\n")


def estimate_tokens(text: str, overhead: int = 800) -> int:
    """
    Estimate the token count for a piece of text.

    Uses the rough heuristic that one token corresponds to about four
    characters, then adds a fixed allowance for system prompts and
    formatting.

    Args:
        text: Input text
        overhead: Additional tokens for system prompts, formatting, etc.

    Returns:
        Estimated token count
    """
    # ~4 characters per token, plus the fixed overhead allowance.
    return len(str(text)) // 4 + overhead