| """ |
| Performance benchmarking and load testing utilities |
| """ |
| import time |
| import asyncio |
| import httpx |
| from typing import Dict, List, Any, Optional |
| from fastapi import APIRouter, HTTPException |
| from src.core.logging import structured_logger |
|
|
| router = APIRouter() |
|
|
| class PerformanceBenchmark: |
| """Performance benchmarking utilities""" |
|
|
| @staticmethod |
| async def benchmark_endpoint(endpoint: str, method: str = "GET", |
| headers: Optional[Dict[str, str]] = None, |
| data: Optional[Dict[str, Any]] = None, |
| auth_token: Optional[str] = None, |
| iterations: int = 100) -> Dict[str, Any]: |
| """ |
| Benchmark a single API endpoint |
| |
| Args: |
| endpoint: The API endpoint to benchmark |
| method: HTTP method (GET, POST, PUT, DELETE) |
| headers: Request headers |
| data: Request payload |
| auth_token: JWT token for authentication |
| iterations: Number of requests to send |
| |
| Returns: |
| Dictionary with benchmark results |
| """ |
| results = { |
| "endpoint": endpoint, |
| "method": method, |
| "iterations": iterations, |
| "total_time_ms": 0, |
| "average_time_ms": 0, |
| "min_time_ms": float('inf'), |
| "max_time_ms": 0, |
| "success_count": 0, |
| "error_count": 0, |
| "response_times": [], |
| "status_codes": {}, |
| "errors": [] |
| } |
|
|
| headers = headers or {} |
| if auth_token: |
| headers["Authorization"] = f"Bearer {auth_token}" |
|
|
| async with httpx.AsyncClient() as client: |
| tasks = [] |
|
|
| for i in range(iterations): |
| tasks.append( |
| PerformanceBenchmark._make_request( |
| client, endpoint, method, headers, data, results |
| ) |
| ) |
|
|
| await asyncio.gather(*tasks) |
|
|
| |
| if results["response_times"]: |
| results["average_time_ms"] = sum(results["response_times"]) / len(results["response_times"]) |
| results["min_time_ms"] = min(results["response_times"]) |
| results["max_time_ms"] = max(results["response_times"]) |
|
|
| return results |
|
|
| @staticmethod |
| async def _make_request(client: httpx.AsyncClient, endpoint: str, method: str, |
| headers: Dict[str, str], data: Optional[Dict[str, Any]], |
| results: Dict[str, Any]) -> None: |
| """Make a single HTTP request and record metrics""" |
| start_time = time.time() |
|
|
| try: |
| if method.upper() == "GET": |
| response = await client.get(endpoint, headers=headers) |
| elif method.upper() == "POST": |
| response = await client.post(endpoint, json=data, headers=headers) |
| elif method.upper() == "PUT": |
| response = await client.put(endpoint, json=data, headers=headers) |
| elif method.upper() == "DELETE": |
| response = await client.delete(endpoint, headers=headers) |
| else: |
| results["errors"].append(f"Unsupported method: {method}") |
| return |
|
|
| execution_time = (time.time() - start_time) * 1000 |
| results["response_times"].append(execution_time) |
| results["total_time_ms"] += execution_time |
|
|
| |
| status_code = response.status_code |
| results["status_codes"][status_code] = results["status_codes"].get(status_code, 0) + 1 |
|
|
| if 200 <= status_code < 300: |
| results["success_count"] += 1 |
| else: |
| results["error_count"] += 1 |
| results["errors"].append({ |
| "status_code": status_code, |
| "response": response.text[:200] |
| }) |
|
|
| except Exception as e: |
| execution_time = (time.time() - start_time) * 1000 |
| results["response_times"].append(execution_time) |
| results["total_time_ms"] += execution_time |
| results["error_count"] += 1 |
| results["errors"].append(str(e)) |
|
|
| @staticmethod |
| async def load_test(endpoints: List[Dict[str, Any]], concurrency: int = 10, |
| duration: int = 60) -> Dict[str, Any]: |
| """ |
| Perform load testing on multiple endpoints |
| |
| Args: |
| endpoints: List of endpoint configurations |
| concurrency: Number of concurrent requests |
| duration: Duration of the load test in seconds |
| |
| Returns: |
| Dictionary with load test results |
| """ |
| results = { |
| "test_duration": duration, |
| "concurrency": concurrency, |
| "total_requests": 0, |
| "successful_requests": 0, |
| "failed_requests": 0, |
| "average_response_time_ms": 0, |
| "min_response_time_ms": float('inf'), |
| "max_response_time_ms": 0, |
| "endpoint_stats": {}, |
| "errors": [] |
| } |
|
|
| async def worker(endpoint_config: Dict[str, Any]): |
| """Worker function for load testing""" |
| client = httpx.AsyncClient() |
| try: |
| while time.time() - start_time < duration: |
| await PerformanceBenchmark._make_request( |
| client, |
| endpoint_config["endpoint"], |
| endpoint_config["method"], |
| endpoint_config.get("headers", {}), |
| endpoint_config.get("data", None), |
| results |
| ) |
| await asyncio.sleep(0.01) |
| finally: |
| await client.aclose() |
|
|
| |
| start_time = time.time() |
| tasks = [] |
|
|
| for _ in range(concurrency): |
| for endpoint in endpoints: |
| tasks.append(worker(endpoint)) |
|
|
| await asyncio.gather(*tasks) |
|
|
| |
| if results["total_requests"] > 0: |
| results["average_response_time_ms"] = ( |
| sum(results["response_times"]) / results["total_requests"] |
| if results["response_times"] else 0 |
| ) |
| results["min_response_time_ms"] = min(results["response_times"]) if results["response_times"] else 0 |
| results["max_response_time_ms"] = max(results["response_times"]) if results["response_times"] else 0 |
|
|
| return results |
|
|
| class BenchmarkMiddleware: |
| """Middleware to automatically benchmark endpoints""" |
|
|
| def __init__(self, benchmark_config: Dict[str, Any]): |
| self.benchmark_config = benchmark_config |
| self.benchmarks = {} |
|
|
| async def __call__(self, request: Request, call_next: Callable) -> Response: |
| endpoint = request.url.path |
|
|
| |
| if endpoint in self.benchmark_config: |
| benchmark = self.benchmark_config[endpoint] |
| if benchmark.get("enabled", False): |
| return await self._benchmark_request(request, call_next, endpoint, benchmark) |
|
|
| return await call_next(request) |
|
|
| async def _benchmark_request(self, request: Request, call_next: Callable, |
| endpoint: str, benchmark: Dict[str, Any]) -> Response: |
| """Benchmark a single request""" |
| start_time = time.time() |
| user_id = request.headers.get("X-User-Id", "unknown") |
|
|
| try: |
| response = await call_next(request) |
| except Exception as e: |
| |
| structured_logger.log_metric( |
| "benchmark_error", |
| 1, |
| { |
| "endpoint": endpoint, |
| "user_id": user_id, |
| "error": str(e) |
| } |
| ) |
| raise |
|
|
| |
| execution_time_ms = (time.time() - start_time) * 1000 |
| status_code = response.status_code |
|
|
| |
| structured_logger.log_metric( |
| "endpoint_benchmark", |
| execution_time_ms, |
| { |
| "endpoint": endpoint, |
| "method": request.method, |
| "status_code": status_code, |
| "user_id": user_id, |
| "benchmark_name": benchmark.get("name", "default") |
| } |
| ) |
|
|
| return response |
|
|
| @router.post("/benchmark/endpoint") |
| async def benchmark_single_endpoint(benchmark_request: Dict[str, Any]) -> Dict[str, Any]: |
| """Endpoint to benchmark a single API endpoint""" |
| try: |
| |
| required_fields = ["endpoint", "method"] |
| for field in required_fields: |
| if field not in benchmark_request: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Missing required field: {field}" |
| ) |
|
|
| |
| endpoint = benchmark_request["endpoint"] |
| method = benchmark_request["method"] |
| headers = benchmark_request.get("headers", {}) |
| data = benchmark_request.get("data", None) |
| auth_token = benchmark_request.get("auth_token", None) |
| iterations = benchmark_request.get("iterations", 100) |
|
|
| |
| result = await PerformanceBenchmark.benchmark_endpoint( |
| endpoint, method, headers, data, auth_token, iterations |
| ) |
|
|
| |
| structured_logger.log_metric( |
| "benchmark_completed", |
| 1, |
| { |
| "endpoint": endpoint, |
| "iterations": iterations, |
| "success_rate": result["success_count"] / iterations if iterations > 0 else 0 |
| } |
| ) |
|
|
| return result |
|
|
| except Exception as e: |
| structured_logger.log_error(e, None, "benchmark") |
| raise HTTPException( |
| status_code=500, |
| detail=f"Benchmark failed: {str(e)}" |
| ) |
|
|
| @router.post("/benchmark/load") |
| async def load_test_endpoint(load_test_request: Dict[str, Any]) -> Dict[str, Any]: |
| """Endpoint to perform load testing""" |
| try: |
| |
| required_fields = ["endpoints", "concurrency", "duration"] |
| for field in required_fields: |
| if field not in load_test_request: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Missing required field: {field}" |
| ) |
|
|
| |
| endpoints = load_test_request["endpoints"] |
| concurrency = load_test_request["concurrency"] |
| duration = load_test_request["duration"] |
|
|
| |
| result = await PerformanceBenchmark.load_test(endpoints, concurrency, duration) |
|
|
| |
| structured_logger.log_metric( |
| "load_test_completed", |
| 1, |
| { |
| "endpoints_count": len(endpoints), |
| "concurrency": concurrency, |
| "duration": duration |
| } |
| ) |
|
|
| return result |
|
|
| except Exception as e: |
| structured_logger.log_error(e, None, "load_test") |
| raise HTTPException( |
| status_code=500, |
| detail=f"Load test failed: {str(e)}" |
| ) |
|
|
| @router.get("/benchmark/health") |
| async def benchmark_health_check() -> Dict[str, Any]: |
| """Health check for benchmarking system""" |
| try: |
| return { |
| "status": "healthy", |
| "timestamp": time.time(), |
| "benchmark_system": "operational", |
| "version": "1.0.0" |
| } |
| except Exception as e: |
| structured_logger.log_error(e, None, "benchmark_health") |
| raise HTTPException( |
| status_code=503, |
| detail="Benchmark system unavailable" |
| ) |