# astrbbbb / astrbot /core /log.py
# qa1145's picture
# Upload 1245 files
# 8ede856 verified
"""日志系统,统一将标准 logging 输出转发到 loguru。"""
import asyncio
import logging
import os
import sys
import time
from asyncio import Queue
from collections import deque
from typing import TYPE_CHECKING
from loguru import logger as _raw_loguru_logger
from astrbot.core.config.default import VERSION
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
# Capacity of LogBroker's in-memory log cache; each subscriber queue is sized
# CACHED_SIZE + 10.
CACHED_SIZE = 500
if TYPE_CHECKING:
from loguru import Record
class _RecordEnricherFilter(logging.Filter):
    """Inject AstrBot-specific fields into every ``logging.LogRecord``."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Decorate *record* in place and always let it through.

        Adds ``plugin_tag``, ``short_levelname``, ``astrbot_version_tag``,
        ``source_file``, ``source_line`` and ``is_trace`` attributes.
        """
        from_plugin = _is_plugin_path(record.pathname)
        at_least_warning = record.levelno >= logging.WARNING

        record.plugin_tag = "[Plug]" if from_plugin else "[Core]"
        record.short_levelname = _get_short_level_name(record.levelname)
        # The version tag is only attached to WARNING and above.
        if at_least_warning:
            record.astrbot_version_tag = f" [v{VERSION}]"
        else:
            record.astrbot_version_tag = ""
        record.source_file = _build_source_file(record.pathname)
        record.source_line = record.lineno
        # Records emitted by the dedicated trace logger are flagged so sinks
        # can route them separately.
        record.is_trace = record.name == "astrbot.trace"
        return True
class _QueueAnsiColorFilter(logging.Filter):
    """Attach ANSI color prefix/reset attributes for WebUI console rendering."""

    # ANSI escape sequence used as the line prefix for each standard level.
    _LEVEL_COLOR = {
        "DEBUG": "\u001b[1;34m",
        "INFO": "\u001b[1;36m",
        "WARNING": "\u001b[1;33m",
        "ERROR": "\u001b[31m",
        "CRITICAL": "\u001b[1;31m",
    }

    def filter(self, record: logging.LogRecord) -> bool:
        """Set ``ansi_prefix`` and ``ansi_reset`` on *record*; never rejects it."""
        reset_sequence = "\u001b[0m"
        try:
            prefix = self._LEVEL_COLOR[record.levelname]
        except KeyError:
            # Unknown level names get the reset sequence as their prefix.
            prefix = reset_sequence
        record.ansi_prefix = prefix
        record.ansi_reset = reset_sequence
        return True
def _is_plugin_path(pathname: str | None) -> bool:
if not pathname:
return False
norm_path = os.path.normpath(pathname)
return ("data/plugins" in norm_path) or ("astrbot/builtin_stars/" in norm_path)
def _get_short_level_name(level_name: str) -> str:
    """Return the four-character tag used for *level_name* in log lines.

    The five standard level names have fixed abbreviations; any other name
    is truncated to its first four characters and upper-cased.
    """
    abbreviations = {
        "DEBUG": "DBUG",
        "INFO": "INFO",
        "WARNING": "WARN",
        "ERROR": "ERRO",
        "CRITICAL": "CRIT",
    }
    if level_name in abbreviations:
        return abbreviations[level_name]
    truncated = level_name[:4]
    return truncated.upper()
def _build_source_file(pathname: str | None) -> str:
if not pathname:
return "unknown"
dirname = os.path.dirname(pathname)
return (
os.path.basename(dirname) + "." + os.path.basename(pathname).replace(".py", "")
)
def _patch_record(record: "Record") -> None:
    """Fill in missing AstrBot fields in a loguru record's ``extra`` dict.

    Keys that are already present in ``extra`` are left untouched, so values
    supplied by the caller take precedence over these defaults.
    """
    extra = record["extra"]
    level = record["level"]
    defaults = {
        "plugin_tag": "[Core]",
        "short_levelname": _get_short_level_name(level.name),
        # 30 is the numeric value of logging.WARNING; the version tag is only
        # shown from that severity upwards.
        "astrbot_version_tag": f" [v{VERSION}]" if level.no >= 30 else "",
        "source_file": _build_source_file(record["file"].path),
        "source_line": record["line"],
        "is_trace": False,
    }
    for key, value in defaults.items():
        extra.setdefault(key, value)
# Module-wide loguru logger with _patch_record attached as its record patcher;
# the rest of this module logs through this object rather than the raw logger.
_loguru = _raw_loguru_logger.patch(_patch_record)
class _LoguruInterceptHandler(logging.Handler):
    """Forward standard ``logging`` records to loguru."""

    def emit(self, record: logging.LogRecord) -> None:
        """Re-emit *record* through loguru with the AstrBot extra fields bound.

        Fields already set on the record (for example by the enricher filter)
        are reused; missing ones are computed from the record itself.
        """
        try:
            target_level: str | int = _loguru.level(record.levelname).name
        except ValueError:
            # The level name lookup failed; fall back to the numeric level.
            target_level = record.levelno

        fallbacks = {
            "plugin_tag": "[Core]",
            "short_levelname": _get_short_level_name(record.levelname),
            "astrbot_version_tag": "",
            "source_file": _build_source_file(record.pathname),
            "source_line": record.lineno,
            "is_trace": record.name == "astrbot.trace",
        }
        bound_fields = {
            field: getattr(record, field, default)
            for field, default in fallbacks.items()
        }

        bound_logger = _loguru.bind(**bound_fields)
        bound_logger.opt(exception=record.exc_info).log(
            target_level,
            record.getMessage(),
        )
class LogBroker:
    """Cache recent log entries and distribute them to subscriber queues."""

    def __init__(self) -> None:
        """Create an empty bounded cache and an empty subscriber list."""
        self.log_cache = deque(maxlen=CACHED_SIZE)
        self.subscribers: list[Queue] = []

    def register(self) -> Queue:
        """Create a new bounded subscriber queue, track it and return it."""
        subscriber = Queue(maxsize=CACHED_SIZE + 10)
        self.subscribers.append(subscriber)
        return subscriber

    def unregister(self, q: Queue) -> None:
        """Stop delivering to *q*.

        Raises:
            ValueError: if *q* is not currently registered.
        """
        self.subscribers.remove(q)

    def publish(self, log_entry: dict) -> None:
        """Append *log_entry* to the cache and offer it to every subscriber."""
        self.log_cache.append(log_entry)
        for subscriber in self.subscribers:
            try:
                subscriber.put_nowait(log_entry)
            except asyncio.QueueFull:
                # The entry is dropped for a subscriber whose queue is full.
                continue
class LogQueueHandler(logging.Handler):
    """Logging handler that formats records and publishes them to a LogBroker."""

    def __init__(self, log_broker: "LogBroker") -> None:
        """Remember the broker that formatted entries are published to."""
        super().__init__()
        self.log_broker = log_broker

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and publish it as a ``level``/``time``/``data`` dict.

        Failures while formatting or publishing are routed through
        ``handleError`` instead of propagating into the code that issued the
        log call (for example when a record lacks a field the formatter
        expects).
        """
        try:
            log_entry = self.format(record)
            self.log_broker.publish(
                {
                    "level": record.levelname,
                    "time": time.time(),
                    "data": log_entry,
                },
            )
        except RecursionError:
            # Same convention as the stdlib handlers: never mask recursion
            # errors.
            raise
        except Exception:
            self.handleError(record)
class LogManager:
_LOGGER_HANDLER_FLAG = "_astrbot_loguru_handler"
_ENRICH_FILTER_FLAG = "_astrbot_enrich_filter"
_configured = False
_console_sink_id: int | None = None
_file_sink_id: int | None = None
_trace_sink_id: int | None = None
_NOISY_LOGGER_LEVELS: dict[str, int] = {
"aiosqlite": logging.WARNING,
"filelock": logging.WARNING,
"asyncio": logging.WARNING,
"tzlocal": logging.WARNING,
"apscheduler": logging.WARNING,
}
@classmethod
def _default_log_path(cls) -> str:
return os.path.join(get_astrbot_data_path(), "logs", "astrbot.log")
@classmethod
def _resolve_log_path(cls, configured_path: str | None) -> str:
if not configured_path:
return cls._default_log_path()
if os.path.isabs(configured_path):
return configured_path
return os.path.join(get_astrbot_data_path(), configured_path)
@classmethod
def _setup_loguru(cls) -> None:
if cls._configured:
return
_loguru.remove()
cls._console_sink_id = _loguru.add(
sys.stdout,
level="DEBUG",
colorize=True,
filter=lambda record: not record["extra"].get("is_trace", False),
format=(
"<green>[{time:HH:mm:ss.SSS}]</green> {extra[plugin_tag]} "
"<level>[{extra[short_levelname]}]</level>{extra[astrbot_version_tag]} "
"[{extra[source_file]}:{extra[source_line]}]: <level>{message}</level>"
),
)
cls._configured = True
@classmethod
def _setup_root_bridge(cls) -> None:
root_logger = logging.getLogger()
has_handler = any(
getattr(handler, cls._LOGGER_HANDLER_FLAG, False)
for handler in root_logger.handlers
)
if not has_handler:
handler = _LoguruInterceptHandler()
setattr(handler, cls._LOGGER_HANDLER_FLAG, True)
root_logger.addHandler(handler)
root_logger.setLevel(logging.DEBUG)
for name, level in cls._NOISY_LOGGER_LEVELS.items():
logging.getLogger(name).setLevel(level)
@classmethod
def _ensure_logger_enricher_filter(cls, logger: logging.Logger) -> None:
has_filter = any(
getattr(existing_filter, cls._ENRICH_FILTER_FLAG, False)
for existing_filter in logger.filters
)
if not has_filter:
enrich_filter = _RecordEnricherFilter()
setattr(enrich_filter, cls._ENRICH_FILTER_FLAG, True)
logger.addFilter(enrich_filter)
@classmethod
def _ensure_logger_intercept_handler(cls, logger: logging.Logger) -> None:
has_handler = any(
getattr(handler, cls._LOGGER_HANDLER_FLAG, False)
for handler in logger.handlers
)
if not has_handler:
handler = _LoguruInterceptHandler()
setattr(handler, cls._LOGGER_HANDLER_FLAG, True)
logger.addHandler(handler)
@classmethod
def GetLogger(cls, log_name: str = "default") -> logging.Logger:
cls._setup_loguru()
cls._setup_root_bridge()
logger = logging.getLogger(log_name)
cls._ensure_logger_enricher_filter(logger)
cls._ensure_logger_intercept_handler(logger)
logger.setLevel(logging.DEBUG)
logger.propagate = False
return logger
@classmethod
def set_queue_handler(cls, logger: logging.Logger, log_broker: LogBroker) -> None:
cls._ensure_logger_enricher_filter(logger)
for handler in logger.handlers:
if isinstance(handler, LogQueueHandler):
return
handler = LogQueueHandler(log_broker)
handler.setLevel(logging.DEBUG)
handler.addFilter(_QueueAnsiColorFilter())
handler.setFormatter(
logging.Formatter(
"%(ansi_prefix)s[%(asctime)s.%(msecs)03d] %(plugin_tag)s [%(short_levelname)s]%(astrbot_version_tag)s "
"[%(source_file)s:%(source_line)d]: %(message)s%(ansi_reset)s",
datefmt="%Y-%m-%d %H:%M:%S",
),
)
logger.addHandler(handler)
@classmethod
def _remove_sink(cls, sink_id: int | None) -> None:
if sink_id is None:
return
try:
_loguru.remove(sink_id)
except ValueError:
pass
@classmethod
def _add_file_sink(
cls,
*,
file_path: str,
level: int,
max_mb: int | None,
backup_count: int,
trace: bool,
) -> int:
os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)
rotation = f"{max_mb} MB" if max_mb and max_mb > 0 else None
retention = (
backup_count if rotation and backup_count and backup_count > 0 else None
)
if trace:
return _loguru.add(
file_path,
level="INFO",
format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {message}",
encoding="utf-8",
rotation=rotation,
retention=retention,
enqueue=True,
filter=lambda record: record["extra"].get("is_trace", False),
)
logging_level_name = logging.getLevelName(level)
if isinstance(logging_level_name, int):
logging_level_name = "INFO"
return _loguru.add(
file_path,
level=logging_level_name,
format=(
"[{time:YYYY-MM-DD HH:mm:ss.SSS}] {extra[plugin_tag]} "
"[{extra[short_levelname]}]{extra[astrbot_version_tag]} "
"[{extra[source_file]}:{extra[source_line]}]: {message}"
),
encoding="utf-8",
rotation=rotation,
retention=retention,
enqueue=True,
filter=lambda record: not record["extra"].get("is_trace", False),
)
@classmethod
def configure_logger(
cls,
logger: logging.Logger,
config: dict | None,
override_level: str | None = None,
) -> None:
if not config:
return
level = override_level or config.get("log_level")
if level:
try:
logger.setLevel(level)
except Exception:
logger.setLevel(logging.INFO)
if "log_file" in config:
file_conf = config.get("log_file") or {}
enable_file = bool(file_conf.get("enable", False))
file_path = file_conf.get("path")
max_mb = file_conf.get("max_mb")
else:
enable_file = bool(config.get("log_file_enable", False))
file_path = config.get("log_file_path")
max_mb = config.get("log_file_max_mb")
cls._remove_sink(cls._file_sink_id)
cls._file_sink_id = None
if not enable_file:
return
try:
cls._file_sink_id = cls._add_file_sink(
file_path=cls._resolve_log_path(file_path),
level=logger.level,
max_mb=max_mb,
backup_count=3,
trace=False,
)
except Exception as e:
logger.error(f"Failed to add file sink: {e}")
@classmethod
def configure_trace_logger(cls, config: dict | None) -> None:
if not config:
return
enable = bool(
config.get("trace_log_enable")
or (config.get("log_file", {}) or {}).get("trace_enable", False)
)
path = config.get("trace_log_path")
max_mb = config.get("trace_log_max_mb")
if "log_file" in config:
legacy = config.get("log_file") or {}
path = path or legacy.get("trace_path")
max_mb = max_mb or legacy.get("trace_max_mb")
trace_logger = logging.getLogger("astrbot.trace")
cls._ensure_logger_enricher_filter(trace_logger)
cls._ensure_logger_intercept_handler(trace_logger)
trace_logger.setLevel(logging.INFO)
trace_logger.propagate = False
cls._remove_sink(cls._trace_sink_id)
cls._trace_sink_id = None
if not enable:
return
cls._trace_sink_id = cls._add_file_sink(
file_path=cls._resolve_log_path(path or "logs/astrbot.trace.log"),
level=logging.INFO,
max_mb=max_mb,
backup_count=3,
trace=True,
)