"""Astrbot 核心生命周期管理类, 负责管理 AstrBot 的启动、停止、重启等操作. 该类负责初始化各个组件, 包括 ProviderManager、PlatformManager、ConversationManager、PluginManager、PipelineScheduler、EventBus等。 该类还负责加载和执行插件, 以及处理事件总线的分发。 工作流程: 1. 初始化所有组件 2. 启动事件总线和任务, 所有任务都在这里运行 3. 执行启动完成事件钩子 """ import asyncio import os import threading import time import traceback from asyncio import Queue from astrbot.api import logger, sp from astrbot.core import LogBroker, LogManager from astrbot.core.astrbot_config_mgr import AstrBotConfigManager from astrbot.core.config.default import VERSION from astrbot.core.conversation_mgr import ConversationManager from astrbot.core.cron import CronJobManager from astrbot.core.db import BaseDatabase from astrbot.core.knowledge_base.kb_mgr import KnowledgeBaseManager from astrbot.core.persona_mgr import PersonaManager from astrbot.core.pipeline.scheduler import PipelineContext, PipelineScheduler from astrbot.core.platform.manager import PlatformManager from astrbot.core.platform_message_history_mgr import PlatformMessageHistoryManager from astrbot.core.provider.manager import ProviderManager from astrbot.core.star.context import Context from astrbot.core.star.star_handler import EventType, star_handlers_registry, star_map from astrbot.core.star.star_manager import PluginManager from astrbot.core.subagent_orchestrator import SubAgentOrchestrator from astrbot.core.umop_config_router import UmopConfigRouter from astrbot.core.updator import AstrBotUpdator from astrbot.core.utils.llm_metadata import update_llm_metadata from astrbot.core.utils.migra_helper import migra from astrbot.core.utils.temp_dir_cleaner import TempDirCleaner from . import astrbot_config, html_renderer from .event_bus import EventBus class AstrBotCoreLifecycle: """AstrBot 核心生命周期管理类, 负责管理 AstrBot 的启动、停止、重启等操作. 该类负责初始化各个组件, 包括 ProviderManager、PlatformManager、ConversationManager、PluginManager、PipelineScheduler、 EventBus 等。 该类还负责加载和执行插件, 以及处理事件总线的分发。 """ def __init__(self, log_broker: LogBroker, db: BaseDatabase) -> None: self.log_broker = log_broker # 初始化日志代理 self.astrbot_config = astrbot_config # 初始化配置 self.db = db # 初始化数据库 self.subagent_orchestrator: SubAgentOrchestrator | None = None self.cron_manager: CronJobManager | None = None self.temp_dir_cleaner: TempDirCleaner | None = None # 设置代理 proxy_config = self.astrbot_config.get("http_proxy", "") if proxy_config != "": os.environ["https_proxy"] = proxy_config os.environ["http_proxy"] = proxy_config logger.debug(f"Using proxy: {proxy_config}") # 设置 no_proxy no_proxy_list = self.astrbot_config.get("no_proxy", []) os.environ["no_proxy"] = ",".join(no_proxy_list) else: # 清空代理环境变量 if "https_proxy" in os.environ: del os.environ["https_proxy"] if "http_proxy" in os.environ: del os.environ["http_proxy"] if "no_proxy" in os.environ: del os.environ["no_proxy"] logger.debug("HTTP proxy cleared") async def _init_or_reload_subagent_orchestrator(self) -> None: """Create (if needed) and reload the subagent orchestrator from config. This keeps lifecycle wiring in one place while allowing the orchestrator to manage enable/disable and tool registration details. """ try: if self.subagent_orchestrator is None: self.subagent_orchestrator = SubAgentOrchestrator( self.provider_manager.llm_tools, self.persona_mgr, ) await self.subagent_orchestrator.reload_from_config( self.astrbot_config.get("subagent_orchestrator", {}), ) except Exception as e: logger.error(f"Subagent orchestrator init failed: {e}", exc_info=True) async def initialize(self) -> None: """初始化 AstrBot 核心生命周期管理类. 负责初始化各个组件, 包括 ProviderManager、PlatformManager、ConversationManager、PluginManager、PipelineScheduler、EventBus、AstrBotUpdator等。 """ # 初始化日志代理 logger.info("AstrBot v" + VERSION) if os.environ.get("TESTING", ""): LogManager.configure_logger( logger, self.astrbot_config, override_level="DEBUG" ) LogManager.configure_trace_logger(self.astrbot_config) else: LogManager.configure_logger(logger, self.astrbot_config) LogManager.configure_trace_logger(self.astrbot_config) await self.db.initialize() await html_renderer.initialize() # 初始化 UMOP 配置路由器 self.umop_config_router = UmopConfigRouter(sp=sp) await self.umop_config_router.initialize() # 初始化 AstrBot 配置管理器 self.astrbot_config_mgr = AstrBotConfigManager( default_config=self.astrbot_config, ucr=self.umop_config_router, sp=sp, ) self.temp_dir_cleaner = TempDirCleaner( max_size_getter=lambda: self.astrbot_config_mgr.default_conf.get( TempDirCleaner.CONFIG_KEY, TempDirCleaner.DEFAULT_MAX_SIZE, ), ) # apply migration try: await migra( self.db, self.astrbot_config_mgr, self.umop_config_router, self.astrbot_config_mgr, ) except Exception as e: logger.error(f"AstrBot migration failed: {e!s}") logger.error(traceback.format_exc()) # 初始化事件队列 self.event_queue = Queue() # 初始化人格管理器 self.persona_mgr = PersonaManager(self.db, self.astrbot_config_mgr) await self.persona_mgr.initialize() # 初始化供应商管理器 self.provider_manager = ProviderManager( self.astrbot_config_mgr, self.db, self.persona_mgr, ) # 初始化平台管理器 self.platform_manager = PlatformManager(self.astrbot_config, self.event_queue) # 初始化对话管理器 self.conversation_manager = ConversationManager(self.db) # 初始化平台消息历史管理器 self.platform_message_history_manager = PlatformMessageHistoryManager(self.db) # 初始化知识库管理器 self.kb_manager = KnowledgeBaseManager(self.provider_manager) # 初始化 CronJob 管理器 self.cron_manager = CronJobManager(self.db) # Dynamic subagents (handoff tools) from config. await self._init_or_reload_subagent_orchestrator() # 初始化提供给插件的上下文 self.star_context = Context( self.event_queue, self.astrbot_config, self.db, self.provider_manager, self.platform_manager, self.conversation_manager, self.platform_message_history_manager, self.persona_mgr, self.astrbot_config_mgr, self.kb_manager, self.cron_manager, self.subagent_orchestrator, ) # 初始化插件管理器 self.plugin_manager = PluginManager(self.star_context, self.astrbot_config) # 扫描、注册插件、实例化插件类 await self.plugin_manager.reload() # 根据配置实例化各个 Provider await self.provider_manager.initialize() await self.kb_manager.initialize() # 初始化消息事件流水线调度器 self.pipeline_scheduler_mapping = await self.load_pipeline_scheduler() # 初始化更新器 self.astrbot_updator = AstrBotUpdator() # 初始化事件总线 self.event_bus = EventBus( self.event_queue, self.pipeline_scheduler_mapping, self.astrbot_config_mgr, ) # 记录启动时间 self.start_time = int(time.time()) # 初始化当前任务列表 self.curr_tasks: list[asyncio.Task] = [] # 根据配置实例化各个平台适配器 await self.platform_manager.initialize() # 初始化关闭控制面板的事件 self.dashboard_shutdown_event = asyncio.Event() asyncio.create_task(update_llm_metadata()) def _load(self) -> None: """加载事件总线和任务并初始化.""" # 创建一个异步任务来执行事件总线的 dispatch() 方法 # dispatch是一个无限循环的协程, 从事件队列中获取事件并处理 event_bus_task = asyncio.create_task( self.event_bus.dispatch(), name="event_bus", ) cron_task = None if self.cron_manager: cron_task = asyncio.create_task( self.cron_manager.start(self.star_context), name="cron_manager", ) temp_dir_cleaner_task = None if self.temp_dir_cleaner: temp_dir_cleaner_task = asyncio.create_task( self.temp_dir_cleaner.run(), name="temp_dir_cleaner", ) # 把插件中注册的所有协程函数注册到事件总线中并执行 extra_tasks = [] for task in self.star_context._register_tasks: extra_tasks.append(asyncio.create_task(task, name=task.__name__)) # type: ignore tasks_ = [event_bus_task, *(extra_tasks if extra_tasks else [])] if cron_task: tasks_.append(cron_task) if temp_dir_cleaner_task: tasks_.append(temp_dir_cleaner_task) for task in tasks_: self.curr_tasks.append( asyncio.create_task(self._task_wrapper(task), name=task.get_name()), ) self.start_time = int(time.time()) async def _task_wrapper(self, task: asyncio.Task) -> None: """异步任务包装器, 用于处理异步任务执行中出现的各种异常. Args: task (asyncio.Task): 要执行的异步任务 """ try: await task except asyncio.CancelledError: pass # 任务被取消, 静默处理 except Exception as e: # 获取完整的异常堆栈信息, 按行分割并记录到日志中 logger.error(f"------- 任务 {task.get_name()} 发生错误: {e}") for line in traceback.format_exc().split("\n"): logger.error(f"| {line}") logger.error("-------") async def start(self) -> None: """启动 AstrBot 核心生命周期管理类. 用load加载事件总线和任务并初始化, 执行启动完成事件钩子 """ self._load() logger.info("AstrBot 启动完成。") # 执行启动完成事件钩子 handlers = star_handlers_registry.get_handlers_by_event_type( EventType.OnAstrBotLoadedEvent, ) for handler in handlers: try: logger.info( f"hook(on_astrbot_loaded) -> {star_map[handler.handler_module_path].name} - {handler.handler_name}", ) await handler.handler() except BaseException: logger.error(traceback.format_exc()) # 同时运行curr_tasks中的所有任务 await asyncio.gather(*self.curr_tasks, return_exceptions=True) async def stop(self) -> None: """停止 AstrBot 核心生命周期管理类, 取消所有当前任务并终止各个管理器.""" if self.temp_dir_cleaner: await self.temp_dir_cleaner.stop() # 请求停止所有正在运行的异步任务 for task in self.curr_tasks: task.cancel() if self.cron_manager: await self.cron_manager.shutdown() for plugin in self.plugin_manager.context.get_all_stars(): try: await self.plugin_manager._terminate_plugin(plugin) except Exception as e: logger.warning(traceback.format_exc()) logger.warning( f"插件 {plugin.name} 未被正常终止 {e!s}, 可能会导致资源泄露等问题。", ) await self.provider_manager.terminate() await self.platform_manager.terminate() await self.kb_manager.terminate() self.dashboard_shutdown_event.set() # 再次遍历curr_tasks等待每个任务真正结束 for task in self.curr_tasks: try: await task except asyncio.CancelledError: pass except Exception as e: logger.error(f"任务 {task.get_name()} 发生错误: {e}") async def restart(self) -> None: """重启 AstrBot 核心生命周期管理类, 终止各个管理器并重新加载平台实例""" await self.provider_manager.terminate() await self.platform_manager.terminate() await self.kb_manager.terminate() self.dashboard_shutdown_event.set() threading.Thread( target=self.astrbot_updator._reboot, name="restart", daemon=True, ).start() def load_platform(self) -> list[asyncio.Task]: """加载平台实例并返回所有平台实例的异步任务列表""" tasks = [] platform_insts = self.platform_manager.get_insts() for platform_inst in platform_insts: tasks.append( asyncio.create_task( platform_inst.run(), name=f"{platform_inst.meta().id}({platform_inst.meta().name})", ), ) return tasks async def load_pipeline_scheduler(self) -> dict[str, PipelineScheduler]: """加载消息事件流水线调度器. Returns: dict[str, PipelineScheduler]: 平台 ID 到流水线调度器的映射 """ mapping = {} for conf_id, ab_config in self.astrbot_config_mgr.confs.items(): scheduler = PipelineScheduler( PipelineContext(ab_config, self.plugin_manager, conf_id), ) await scheduler.initialize() mapping[conf_id] = scheduler return mapping async def reload_pipeline_scheduler(self, conf_id: str) -> None: """重新加载消息事件流水线调度器. Returns: dict[str, PipelineScheduler]: 平台 ID 到流水线调度器的映射 """ ab_config = self.astrbot_config_mgr.confs.get(conf_id) if not ab_config: raise ValueError(f"配置文件 {conf_id} 不存在") scheduler = PipelineScheduler( PipelineContext(ab_config, self.plugin_manager, conf_id), ) await scheduler.initialize() self.pipeline_scheduler_mapping[conf_id] = scheduler