""" Performance optimization utilities for API calls and UI rendering """ import time import functools from typing import Callable, Any from src.core.logging import log_operation def measure_execution_time(func: Callable) -> Callable: """ Decorator to measure and log the execution time of functions """ @functools.wraps(func) def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) end_time = time.time() execution_time_ms = (end_time - start_time) * 1000 # Log the execution time log_operation( f"EXECUTION_TIME_{func.__name__.upper()}", task_id=int(execution_time_ms) if execution_time_ms < 1000000 else None # Only if it fits as task_id ) print(f"{func.__name__} executed in {execution_time_ms:.2f} ms") # Log warning if execution time is too high if execution_time_ms > 200: # Threshold of 200ms log_operation( f"SLOW_EXECUTION_{func.__name__.upper()}", details=f"Execution took {execution_time_ms:.2f} ms" ) return result return wrapper def cache_result(expiration_time: int = 300): """ Decorator to cache function results for a specified time (in seconds) """ def decorator(func: Callable) -> Callable: cache = {} @functools.wraps(func) def wrapper(*args, **kwargs): # Create a cache key based on function name and arguments cache_key = f"{func.__name__}_{hash(str(args) + str(kwargs))}" current_time = time.time() # Check if result is cached and not expired if cache_key in cache: result, timestamp = cache[cache_key] if current_time - timestamp < expiration_time: return result # Execute the function and cache the result result = func(*args, **kwargs) cache[cache_key] = (result, current_time) return result return wrapper return decorator def batch_process(items: list, batch_size: int = 10): """ Process items in batches to optimize performance """ for i in range(0, len(items), batch_size): yield items[i:i + batch_size] def optimize_database_queries(): """ Utility function to apply database query optimizations """ # This would typically configure database connection pooling, query optimization settings # For now, we'll just return a success message log_operation("DATABASE_OPTIMIZATIONS_APPLIED") return { "connection_pooling": "enabled", "query_batching": "available", "caching": "configured" } def throttle_requests(max_requests_per_minute: int = 1000): """ Decorator to throttle requests to prevent overwhelming the system """ def decorator(func: Callable) -> Callable: request_times = [] @functools.wraps(func) def wrapper(*args, **kwargs): current_time = time.time() # Remove requests older than 1 minute request_times[:] = [req_time for req_time in request_times if current_time - req_time < 60] # Check if we've exceeded the limit if len(request_times) >= max_requests_per_minute: raise Exception(f"Rate limit exceeded: {max_requests_per_minute} requests per minute") # Add current request time request_times.append(current_time) return func(*args, **kwargs) return wrapper return decorator def lazy_load_data(load_func: Callable, threshold: int = 100): """ Utility to implement lazy loading for large datasets """ def wrapper(*args, **kwargs): # If the dataset is small, load everything result = load_func(*args, **kwargs) if isinstance(result, list) and len(result) > threshold: # For large datasets, implement pagination or chunking return { "data": result[:threshold], # Return first chunk "has_more": True, "total_count": len(result) } else: return result return wrapper def debounce(wait_time: float = 0.3): """ Decorator to debounce function calls (useful for UI events) """ def decorator(func: Callable) -> Callable: timer = None @functools.wraps(func) def debounced(*args, **kwargs): nonlocal timer if timer: # Cancel previous timer timer.cancel() # Set new timer import threading timer = threading.Timer(wait_time, lambda: func(*args, **kwargs)) timer.start() return debounced return decorator def memoize(func: Callable) -> Callable: """ Simple memoization decorator to cache function results based on arguments """ cache = {} @functools.wraps(func) def wrapper(*args, **kwargs): # Create a key from the function arguments key = str(args) + str(sorted(kwargs.items())) if key in cache: return cache[key] result = func(*args, **kwargs) cache[key] = result return result return wrapper def apply_performance_optimizations(): """ Apply all performance optimizations to the application """ log_operation("APPLYING_PERFORMANCE_OPTIMIZATIONS") optimizations = { "execution_time_monitoring": "enabled", "result_caching": "configured", "request_throttling": "set_to_1000_per_minute", "database_optimizations": optimize_database_queries(), "lazy_loading_threshold": 100, "debounce_defaults": 0.3 } log_operation("PERFORMANCE_OPTIMIZATIONS_APPLIED") return optimizations