transcript_service / src /utils /error_handler.py
PCNUSMSE's picture
Upload folder using huggingface_hub
4e37375 verified
raw
history blame
11.7 kB
"""错误处理和容错机制模块
提供统一的错误处理、重试逻辑和异常恢复功能。
"""
import asyncio
import functools
import time
from typing import Any, Callable, Dict, Optional, Type, Union
from enum import Enum
from ..core.config import get_config
from ..utils.logger import get_task_logger
class ErrorCode(Enum):
    """Machine-readable error codes for the transcript service.

    Each value is a string of the form ``<CATEGORY>_<NNN>``.
    """
    # File-related errors
    FILE_NOT_FOUND = "FILE_001"
    FILE_TOO_LARGE = "FILE_002"
    FILE_FORMAT_UNSUPPORTED = "FILE_003"
    FILE_CORRUPTED = "FILE_004"
    # Network-related errors
    NETWORK_TIMEOUT = "NET_001"
    NETWORK_CONNECTION_ERROR = "NET_002"
    NETWORK_DNS_ERROR = "NET_003"
    # API-related errors
    API_KEY_INVALID = "API_001"
    API_QUOTA_EXCEEDED = "API_002"
    API_SERVICE_UNAVAILABLE = "API_003"
    API_RATE_LIMITED = "API_004"
    # OSS-related errors
    OSS_ACCESS_DENIED = "OSS_001"
    OSS_BUCKET_NOT_FOUND = "OSS_002"
    OSS_UPLOAD_FAILED = "OSS_003"
    # System-related errors
    SYSTEM_OUT_OF_MEMORY = "SYS_001"
    SYSTEM_DISK_FULL = "SYS_002"
    SYSTEM_PERMISSION_DENIED = "SYS_003"
    # Generic errors
    UNKNOWN_ERROR = "GEN_001"
    TIMEOUT_ERROR = "GEN_002"
    VALIDATION_ERROR = "GEN_003"
class TranscriptServiceError(Exception):
    """Base class for the service's custom exceptions.

    Besides the message, each instance carries an ``ErrorCode``, a dict of
    extra details and the creation time as returned by ``time.time()``.
    """

    def __init__(
        self,
        message: str,
        error_code: ErrorCode = ErrorCode.UNKNOWN_ERROR,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Initialise the exception.

        Args:
            message: Human-readable error message; also passed to
                ``Exception`` so ``str(exc)`` returns it.
            error_code: Code classifying the failure.
            details: Optional extra details; ``None`` (or an empty dict)
                becomes a fresh empty dict.
        """
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        self.details = details or {}
        self.timestamp = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict representation of the error.

        Returns:
            Dict with ``error_code`` (the enum's string value), ``message``,
            ``details`` and ``timestamp``. ``details`` is a shallow copy so
            callers mutating the result do not mutate this exception.
        """
        return {
            'error_code': self.error_code.value,
            'message': self.message,
            'details': dict(self.details),
            'timestamp': self.timestamp,
        }
class FileValidationError(TranscriptServiceError):
    """Service error raised when an input file fails validation."""
class NetworkError(TranscriptServiceError):
    """Service error for network failures (timeouts, connection problems)."""
class APIError(TranscriptServiceError):
    """Service error for failed calls to an external API."""
class OSSError(TranscriptServiceError):
    """Service error for failed OSS operations."""
# NOTE(review): this name shadows the built-in ``SystemError`` inside this
# module (and in any module that imports it by name). It is kept for
# backward compatibility; ``except SystemError`` here catches this class,
# not the interpreter's.
class SystemError(TranscriptServiceError):
    """Service error for system-level failures (e.g. permission denied)."""
class RetryStrategy:
    """Retry policy: capped exponential backoff with optional jitter."""

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        """Initialise the retry strategy.

        Args:
            max_attempts: Maximum number of attempts, including the first
                call (not just the retries).
            base_delay: Delay before the first retry, in seconds.
            max_delay: Upper bound for any computed delay, in seconds.
            exponential_base: Growth factor of the delay per attempt.
            jitter: Whether to randomise delays (see ``calculate_delay``).
        """
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def calculate_delay(self, attempt: int) -> float:
        """Compute the delay to wait after a failed attempt.

        The delay is ``base_delay * exponential_base ** (attempt - 1)``,
        capped at ``max_delay`` and never negative.

        Args:
            attempt: Number of the attempt that just failed (1-based).

        Returns:
            Delay in seconds. With jitter enabled, the capped delay is
            scaled by a random factor in [0.5, 1.0).
        """
        import random

        try:
            delay = self.base_delay * (self.exponential_base ** (attempt - 1))
        except OverflowError:
            # The exponential term no longer fits in a float. The result
            # would have been capped by max_delay anyway, unless there is
            # no base delay at all.
            delay = self.max_delay if self.base_delay > 0 else 0.0
        # Clamp into [0, max_delay]; time.sleep() rejects negative values.
        delay = max(0.0, min(delay, self.max_delay))
        if self.jitter:
            # Jitter only shortens the delay (by up to 50%), so the result
            # never exceeds max_delay.
            delay *= (0.5 + random.random() * 0.5)
        return delay
class ErrorHandler:
    """Classifies, logs and judges the retryability of errors.

    Arbitrary exceptions are wrapped into ``TranscriptServiceError``
    subclasses tagged with an ``ErrorCode``; only a fixed set of codes is
    treated as retryable.
    """

    def __init__(self):
        """Initialise the handler.

        Loads the configuration, creates the ``transcript_service.error``
        logger and builds the lookup tables used by ``classify_error`` and
        ``is_retryable``.
        """
        self.config = get_config()
        self.logger = get_task_logger(logger_name="transcript_service.error")
        # Exception type -> (service exception class, error code).
        # Exact type matches win; subclasses of these types are resolved via
        # their MRO only after the message-based heuristics have been tried.
        self.error_mapping = {
            # File errors
            FileNotFoundError: (FileValidationError, ErrorCode.FILE_NOT_FOUND),
            PermissionError: (SystemError, ErrorCode.SYSTEM_PERMISSION_DENIED),
            # Network errors
            asyncio.TimeoutError: (NetworkError, ErrorCode.NETWORK_TIMEOUT),
            # Built-in TimeoutError (socket.timeout is an alias since 3.10).
            # On Python 3.11+ it is the same class as asyncio.TimeoutError.
            TimeoutError: (NetworkError, ErrorCode.NETWORK_TIMEOUT),
            ConnectionError: (NetworkError, ErrorCode.NETWORK_CONNECTION_ERROR),
            # Generic errors
            ValueError: (TranscriptServiceError, ErrorCode.VALIDATION_ERROR),
            RuntimeError: (TranscriptServiceError, ErrorCode.UNKNOWN_ERROR),
        }
        # Error codes that the retry decorators are allowed to retry.
        self.retryable_errors = {
            ErrorCode.NETWORK_TIMEOUT,
            ErrorCode.NETWORK_CONNECTION_ERROR,
            ErrorCode.API_RATE_LIMITED,
            ErrorCode.OSS_UPLOAD_FAILED,
            ErrorCode.API_SERVICE_UNAVAILABLE
        }

    @staticmethod
    def _wrap(exception_class, error_code: ErrorCode, error: Exception) -> TranscriptServiceError:
        """Build ``exception_class(str(error), error_code)`` chained to ``error``."""
        wrapped = exception_class(str(error), error_code)
        # Keep the original exception (and its traceback) reachable, e.g. for
        # callers of safe_execute that only receive the wrapped error.
        wrapped.__cause__ = error
        return wrapped

    def classify_error(self, error: Exception) -> TranscriptServiceError:
        """Classify and wrap an exception.

        Resolution order:
        1. ``TranscriptServiceError`` instances are returned unchanged.
        2. Exact type match in ``error_mapping``.
        3. Keyword heuristics on the lower-cased message.
        4. Nearest base class (by MRO) present in ``error_mapping``.
        5. Otherwise ``TranscriptServiceError`` with ``UNKNOWN_ERROR``.

        Args:
            error: The original exception.

        Returns:
            The classified service exception.
        """
        if isinstance(error, TranscriptServiceError):
            return error

        error_type = type(error)
        if error_type in self.error_mapping:
            exception_class, error_code = self.error_mapping[error_type]
            return self._wrap(exception_class, error_code, error)

        # Classify by message content.
        error_msg = str(error).lower()
        if "timeout" in error_msg:
            return self._wrap(NetworkError, ErrorCode.NETWORK_TIMEOUT, error)
        if "permission denied" in error_msg:
            return self._wrap(SystemError, ErrorCode.SYSTEM_PERMISSION_DENIED, error)
        if "api key" in error_msg:
            return self._wrap(APIError, ErrorCode.API_KEY_INVALID, error)
        if ("rate limit" in error_msg or "rate-limit" in error_msg
                or "too many requests" in error_msg):
            # Throttling is transient and retryable, unlike an exhausted quota;
            # checked before the broad "limit" match below.
            return self._wrap(APIError, ErrorCode.API_RATE_LIMITED, error)
        if "quota" in error_msg or "limit" in error_msg:
            return self._wrap(APIError, ErrorCode.API_QUOTA_EXCEEDED, error)

        # Subclasses of mapped types (e.g. ConnectionResetError ->
        # ConnectionError); the MRO yields the most specific base first.
        for base in error_type.__mro__[1:]:
            if base in self.error_mapping:
                exception_class, error_code = self.error_mapping[base]
                return self._wrap(exception_class, error_code, error)

        return self._wrap(TranscriptServiceError, ErrorCode.UNKNOWN_ERROR, error)

    def is_retryable(self, error: TranscriptServiceError) -> bool:
        """Return True if the error's code is in ``retryable_errors``.

        Args:
            error: A classified service exception.

        Returns:
            Whether the error may be retried.
        """
        return error.error_code in self.retryable_errors

    def handle_error(self, error: Exception, context: str = "") -> TranscriptServiceError:
        """Classify an error and log it.

        UNKNOWN_ERROR and SYSTEM_OUT_OF_MEMORY are logged via
        ``logger.exception``, which is meant to be called from inside an
        ``except`` block. Every other code is logged via ``logger.error``.

        Args:
            error: The original exception.
            context: Description of where the error happened (used in the log).

        Returns:
            The classified service exception.
        """
        classified_error = self.classify_error(error)
        log_msg = f"错误处理 - {context}: {classified_error.message}"
        if classified_error.error_code in [ErrorCode.UNKNOWN_ERROR, ErrorCode.SYSTEM_OUT_OF_MEMORY]:
            self.logger.exception(log_msg)
        else:
            self.logger.error(log_msg)
        return classified_error
# Module-wide ErrorHandler instance used by the retry decorators and the
# safe_execute helpers in this module.
# NOTE: created at import time, so importing this module calls get_config()
# and get_task_logger() (via ErrorHandler.__init__).
error_handler = ErrorHandler()
def retry_async(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Decorator that retries an async function on retryable errors.

    Every exception matching ``exceptions`` is classified with the module's
    ``error_handler``:
    - Retryable errors are retried after ``strategy.calculate_delay(attempt)``
      seconds.
    - Non-retryable errors, and the error from the last attempt, are raised
      as the classified ``TranscriptServiceError``, chained to the original.
    - Exceptions not listed in ``exceptions`` propagate unchanged.

    The wrapped function is always called at least once, even if
    ``strategy.max_attempts`` is below 1.

    Args:
        strategy: Retry strategy; a default ``RetryStrategy()`` when None.
        exceptions: Exception types that trigger classification/retry.
        context: Label used in log messages; defaults to the wrapped
            function's qualified name when empty.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        label = context or getattr(func, "__qualname__", repr(func))

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            # A non-positive max_attempts would otherwise skip the call entirely.
            max_attempts = max(1, strategy.max_attempts)
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    classified_error = error_handler.classify_error(e)
                    # Give up on the last attempt or on a non-retryable error.
                    if attempt == max_attempts or not error_handler.is_retryable(classified_error):
                        logger.error(f"{label} 最终失败 (尝试 {attempt}/{max_attempts}): {str(e)}")
                        if classified_error is e:
                            raise
                        raise classified_error from e
                    delay = strategy.calculate_delay(attempt)
                    logger.warning(f"{label} 第 {attempt} 次尝试失败,{delay:.1f}秒后重试: {str(e)}")
                    await asyncio.sleep(delay)
            # Unreachable: every loop iteration either returns or raises.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)
        return wrapper
    return decorator
def retry_sync(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Decorator that retries a synchronous function on retryable errors.

    Every exception matching ``exceptions`` is classified with the module's
    ``error_handler``:
    - Retryable errors are retried after ``strategy.calculate_delay(attempt)``
      seconds (blocking ``time.sleep``).
    - Non-retryable errors, and the error from the last attempt, are raised
      as the classified ``TranscriptServiceError``, chained to the original.
    - Exceptions not listed in ``exceptions`` propagate unchanged.

    The wrapped function is always called at least once, even if
    ``strategy.max_attempts`` is below 1.

    Args:
        strategy: Retry strategy; a default ``RetryStrategy()`` when None.
        exceptions: Exception types that trigger classification/retry.
        context: Label used in log messages; defaults to the wrapped
            function's qualified name when empty.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        label = context or getattr(func, "__qualname__", repr(func))

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            # A non-positive max_attempts would otherwise skip the call entirely.
            max_attempts = max(1, strategy.max_attempts)
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    classified_error = error_handler.classify_error(e)
                    # Give up on the last attempt or on a non-retryable error.
                    if attempt == max_attempts or not error_handler.is_retryable(classified_error):
                        logger.error(f"{label} 最终失败 (尝试 {attempt}/{max_attempts}): {str(e)}")
                        if classified_error is e:
                            raise
                        raise classified_error from e
                    delay = strategy.calculate_delay(attempt)
                    logger.warning(f"{label} 第 {attempt} 次尝试失败,{delay:.1f}秒后重试: {str(e)}")
                    time.sleep(delay)
            # Unreachable: every loop iteration either returns or raises.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)
        return wrapper
    return decorator
def safe_execute(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Call ``func(*args, **kwargs)`` without letting an ``Exception`` escape.

    Any ``Exception`` is classified and logged through the module's
    ``error_handler``. ``BaseException`` subclasses that are not
    ``Exception`` (e.g. ``KeyboardInterrupt``) still propagate.

    Args:
        func: Callable to execute (function, ``functools.partial``, or any
            callable object).
        *args: Positional arguments for ``func``.
        **kwargs: Keyword arguments for ``func``.

    Returns:
        ``(True, result, None)`` on success, ``(False, None, error)`` on failure.
    """
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        # functools.partial objects and many callable instances have no
        # __name__; fall back to repr() so the handler itself cannot raise.
        func_name = getattr(func, "__name__", repr(func))
        error = error_handler.handle_error(e, f"执行 {func_name}")
        return False, None, error
    return True, result, None
async def safe_execute_async(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Await ``func(*args, **kwargs)`` without letting an ``Exception`` escape.

    Any ``Exception`` is classified and logged through the module's
    ``error_handler``. ``BaseException`` subclasses that are not
    ``Exception`` (e.g. ``asyncio.CancelledError`` on Python 3.8+) still
    propagate.

    Args:
        func: Callable returning an awaitable (coroutine function,
            ``functools.partial`` of one, or any callable object).
        *args: Positional arguments for ``func``.
        **kwargs: Keyword arguments for ``func``.

    Returns:
        ``(True, result, None)`` on success, ``(False, None, error)`` on failure.
    """
    try:
        result = await func(*args, **kwargs)
    except Exception as e:
        # functools.partial objects and many callable instances have no
        # __name__; fall back to repr() so the handler itself cannot raise.
        func_name = getattr(func, "__name__", repr(func))
        error = error_handler.handle_error(e, f"执行 {func_name}")
        return False, None, error
    return True, result, None
def get_error_handler() -> ErrorHandler:
    """Return the module-wide ``ErrorHandler`` instance.

    Returns:
        The shared ``error_handler`` object created when this module was
        imported (the same object on every call).
    """
    shared_handler = error_handler
    return shared_handler