| """ |
| Structured logging configuration for the AI chat backend |
| """ |
import logging
import logging.config
import os
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from fastapi import Request, Response

from src.core.config import settings
|
|
class StructuredLogger:
    """Wrapper around a named ``logging.Logger`` that emits typed events.

    Each ``log_*`` method builds a dict of fields and passes it to the logger
    through ``extra``, so every field becomes an attribute of the resulting
    ``LogRecord``. All ``log_*`` methods are best-effort: any exception raised
    while building or emitting an event is caught and reported as a single
    "Failed to log ..." error line instead of propagating to the caller.

    NOTE(review): the format string installed by ``setup_logging`` renders only
    asctime/name/levelname/message, so the ``extra`` fields are visible only to
    handlers or formatters that read record attributes themselves.
    """

    # Lower-cased header names whose values are credentials; their values are
    # replaced before the header dict is attached to a log record.
    _SENSITIVE_HEADERS = frozenset(
        {"authorization", "proxy-authorization", "cookie", "set-cookie", "x-api-key"}
    )
    _REDACTED = "[REDACTED]"

    def __init__(self, logger_name: str = "chat_backend"):
        """Bind to the logger called *logger_name* and configure its handlers."""
        self.logger_name = logger_name
        self.logger = logging.getLogger(logger_name)
        self.setup_logging()

    def setup_logging(self) -> None:
        """Configure structured logging with appropriate formatters and handlers.

        Replaces every handler currently attached to the logger with a console
        handler and, when the ``LOG_FILE`` environment variable is set, a file
        handler writing to that path.
        """
        # Detach AND close the old handlers: clearing the list alone would
        # leave a previously installed FileHandler holding its file open.
        for stale_handler in list(self.logger.handlers):
            self.logger.removeHandler(stale_handler)
            stale_handler.close()

        log_level = logging.DEBUG if settings.DEBUG else logging.INFO
        self.logger.setLevel(log_level)

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        console_handler = logging.StreamHandler()
        console_handler.setLevel(log_level)
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)

        log_file = os.getenv("LOG_FILE")
        if log_file:
            file_handler = logging.FileHandler(log_file)
            # The file handler stays at INFO even when DEBUG is enabled.
            file_handler.setLevel(logging.INFO)
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an offset-less ISO-8601 string."""
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the exact string format utcnow().isoformat() produced.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _client_host(request: Request) -> str:
        """Return the peer host of *request*, or "unknown" when unavailable."""
        # request.client may be None when no peer address is reported.
        client = request.client
        return client.host if client is not None else "unknown"

    @classmethod
    def _redact_headers(cls, headers) -> Dict[str, Any]:
        """Return a plain-dict copy of *headers* with credential values masked."""
        return {
            name: cls._REDACTED if name.lower() in cls._SENSITIVE_HEADERS else value
            for name, value in dict(headers).items()
        }

    def log_request(self, request: Request, user_id: Optional[str] = None) -> None:
        """Log incoming request with structured data.

        Args:
            request: The incoming request; method, path, client, headers and
                query parameters are read from it.
            user_id: Identifier of the caller, if known.
        """
        try:
            log_data = {
                "timestamp": self._utc_timestamp(),
                "level": "INFO",
                "type": "request",
                "method": request.method,
                "path": request.url.path,
                "client_ip": self._client_host(request),
                "user_agent": request.headers.get("user-agent", "unknown"),
                # Credentials (Authorization, Cookie, ...) must not reach logs.
                "headers": self._redact_headers(request.headers),
                "query_params": dict(request.query_params),
                "user_id": user_id
            }
            self.logger.info("Incoming Request", extra=log_data)
        except Exception as e:
            self.logger.error(f"Failed to log request: {e}")

    def log_response(self, response: Response, request: Request, processing_time_ms: float) -> None:
        """Log outgoing response with structured data.

        Args:
            response: The response being returned. ``response_size`` is the
                length of its ``body``; it is ``None`` when the response has
                no ``body`` attribute (streaming-style responses) and ``0``
                when the body is empty or ``None``.
            request: The request that produced the response.
            processing_time_ms: Handling time in milliseconds.
        """
        try:
            if hasattr(response, "body"):
                response_size = len(response.body or b"")
            else:
                # Size is unknown without a materialized body.
                response_size = None
            log_data = {
                "timestamp": self._utc_timestamp(),
                "level": "INFO",
                "type": "response",
                "status_code": response.status_code,
                "method": request.method,
                "path": request.url.path,
                "processing_time_ms": processing_time_ms,
                "response_size": response_size,
                "user_id": response.headers.get("X-User-Id", "unknown")
            }
            self.logger.info("Outgoing Response", extra=log_data)
        except Exception as e:
            self.logger.error(f"Failed to log response: {e}")

    def log_error(self, error: Exception, request: Request, user_id: Optional[str] = None) -> None:
        """Log errors with structured data.

        Args:
            error: The exception to report; its class name and ``str()`` are logged.
            request: The request during which the error occurred.
            user_id: Identifier of the caller, if known.
        """
        try:
            log_data = {
                "timestamp": self._utc_timestamp(),
                "level": "ERROR",
                "type": "error",
                "error_type": type(error).__name__,
                "error_message": str(error),
                "method": request.method,
                "path": request.url.path,
                "client_ip": self._client_host(request),
                "user_id": user_id
            }
            self.logger.error("Error Occurred", extra=log_data)
        except Exception as e:
            self.logger.error(f"Failed to log error: {e}")

    def log_database_query(self, query: str, execution_time_ms: float, user_id: Optional[str] = None) -> None:
        """Log database queries with performance metrics.

        Args:
            query: Query text; only its first 200 characters are logged.
            execution_time_ms: Execution time in milliseconds.
            user_id: Identifier of the caller, if known.
        """
        try:
            log_data = {
                "timestamp": self._utc_timestamp(),
                "level": "INFO",
                "type": "database_query",
                "query": query[:200],
                "execution_time_ms": execution_time_ms,
                "user_id": user_id
            }
            self.logger.info("Database Query", extra=log_data)
        except Exception as e:
            self.logger.error(f"Failed to log database query: {e}")

    def log_rate_limit_event(self, user_id: str, limit_type: str, current_count: int, max_limit: int) -> None:
        """Log rate limiting events.

        Args:
            user_id: Identifier of the rate-limited caller.
            limit_type: Name of the limit being tracked.
            current_count: Current usage count.
            max_limit: Configured maximum for the limit.
        """
        try:
            log_data = {
                "timestamp": self._utc_timestamp(),
                "level": "INFO",
                "type": "rate_limit",
                "user_id": user_id,
                "limit_type": limit_type,
                "current_count": current_count,
                "max_limit": max_limit
            }
            self.logger.info("Rate Limit Event", extra=log_data)
        except Exception as e:
            self.logger.error(f"Failed to log rate limit event: {e}")

    def log_health_check(self, status: str, response_time_ms: float) -> None:
        """Log health check events.

        Args:
            status: Health status to record.
            response_time_ms: Health check duration in milliseconds.
        """
        try:
            log_data = {
                "timestamp": self._utc_timestamp(),
                "level": "INFO",
                "type": "health_check",
                "status": status,
                "response_time_ms": response_time_ms
            }
            self.logger.info("Health Check", extra=log_data)
        except Exception as e:
            self.logger.error(f"Failed to log health check: {e}")

    def log_metric(self, metric_name: str, value: float, tags: Optional[Dict[str, str]] = None) -> None:
        """Log custom metrics.

        Args:
            metric_name: Name of the metric.
            value: Metric value.
            tags: Optional tags; the ``tags`` field is omitted when this is
                ``None`` or empty.
        """
        try:
            log_data: Dict[str, Any] = {
                "timestamp": self._utc_timestamp(),
                "level": "INFO",
                "type": "metric",
                "metric_name": metric_name,
                "value": value
            }
            if tags:
                log_data["tags"] = tags
            self.logger.info("Custom Metric", extra=log_data)
        except Exception as e:
            self.logger.error(f"Failed to log metric: {e}")
|
|
| |
# Shared module-level instance. Constructing it runs setup_logging() on the
# "chat_backend" logger as an import-time side effect; the module-level
# log_* helper functions in this file emit through this instance's logger.
structured_logger = StructuredLogger()
|
|
|
|
# Keys that ``Logger.makeRecord`` rejects in ``extra`` with a KeyError: every
# attribute a fresh LogRecord already carries, plus "message" and "asctime".
_RESERVED_LOG_RECORD_KEYS = frozenset(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


def _safe_extra(fields):
    """Return a copy of *fields* that can always be passed as ``extra``.

    Keys that collide with LogRecord attributes (``name``, ``filename``,
    ``module``, ``message``, ...) are renamed to ``extra_<key>`` so a caller's
    choice of keyword name can never make the logging call raise. A key the
    caller supplied explicitly wins over a renamed one with the same name.
    """
    renamed = {
        f"extra_{key}": value
        for key, value in fields.items()
        if key in _RESERVED_LOG_RECORD_KEYS
    }
    kept = {
        key: value
        for key, value in fields.items()
        if key not in _RESERVED_LOG_RECORD_KEYS
    }
    return {**renamed, **kept}


def log_operation(operation: str, **kwargs):
    """
    Log an operation at INFO level with optional parameters

    Args:
        operation: Name of the operation being logged (used as the message)
        **kwargs: Additional parameters attached to the log record; names
            that clash with LogRecord attributes are stored as ``extra_<name>``
    """
    structured_logger.logger.info(str(operation), extra=_safe_extra(kwargs))


def log_error(error: str, **kwargs):
    """
    Log an error at ERROR level with optional parameters

    Args:
        error: Description of the error (used as the message)
        **kwargs: Additional parameters attached to the log record; names
            that clash with LogRecord attributes are stored as ``extra_<name>``
    """
    structured_logger.logger.error(error, extra=_safe_extra(kwargs))


def log_authorization_decision(user_id: str, resource: str, action: str, allowed: bool, **kwargs):
    """
    Log an authorization decision at INFO level

    Args:
        user_id: ID of the user requesting access
        resource: Resource being accessed
        action: Action being performed
        allowed: Whether access was granted (rendered as ALLOWED / DENIED)
        **kwargs: Additional parameters attached to the log record; names
            that clash with LogRecord attributes are stored as ``extra_<name>``
    """
    decision = "ALLOWED" if allowed else "DENIED"
    message = f"AUTHORIZATION_DECISION: user={user_id}, resource={resource}, action={action}, decision={decision}"
    structured_logger.logger.info(message, extra=_safe_extra(kwargs))


def log_token_validation_result(event: str, user_id: str = "unknown", is_valid: bool = True, reason: str = "", **kwargs):
    """
    Log a token validation result at INFO level

    Args:
        event: The event type (CREATED, VALID, INVALID, EXPIRED, etc.)
        user_id: The user ID associated with the token
        is_valid: Whether the token is valid
        reason: Reason for validation result
        **kwargs: Additional parameters attached to the log record; names
            that clash with LogRecord attributes are stored as ``extra_<name>``
    """
    message = f"TOKEN_VALIDATION_RESULT: event={event}, user={user_id}, is_valid={is_valid}, reason={reason}"
    structured_logger.logger.info(message, extra=_safe_extra(kwargs))


def log_token_lifecycle_event(event: str, user_id: str, token_type: str = "", **kwargs):
    """
    Log a token lifecycle event at INFO level

    Args:
        event: Type of event (creation, refresh, expiry, etc.)
        user_id: The user ID associated with the token
        token_type: Type of token
        **kwargs: Additional parameters attached to the log record; names
            that clash with LogRecord attributes are stored as ``extra_<name>``
    """
    message = f"TOKEN_LIFECYCLE_EVENT: event={event}, user={user_id}, type={token_type}"
    structured_logger.logger.info(message, extra=_safe_extra(kwargs))