| """Astrbot 核心生命周期管理类, 负责管理 AstrBot 的启动、停止、重启等操作. |
| |
| 该类负责初始化各个组件, 包括 ProviderManager、PlatformManager、ConversationManager、PluginManager、PipelineScheduler、EventBus等。 |
| 该类还负责加载和执行插件, 以及处理事件总线的分发。 |
| |
| 工作流程: |
| 1. 初始化所有组件 |
| 2. 启动事件总线和任务, 所有任务都在这里运行 |
| 3. 执行启动完成事件钩子 |
| """ |
|
|
| import asyncio |
| import os |
| import threading |
| import time |
| import traceback |
| from asyncio import Queue |
|
|
| from astrbot.api import logger, sp |
| from astrbot.core import LogBroker, LogManager |
| from astrbot.core.astrbot_config_mgr import AstrBotConfigManager |
| from astrbot.core.config.default import VERSION |
| from astrbot.core.conversation_mgr import ConversationManager |
| from astrbot.core.cron import CronJobManager |
| from astrbot.core.db import BaseDatabase |
| from astrbot.core.knowledge_base.kb_mgr import KnowledgeBaseManager |
| from astrbot.core.persona_mgr import PersonaManager |
| from astrbot.core.pipeline.scheduler import PipelineContext, PipelineScheduler |
| from astrbot.core.platform.manager import PlatformManager |
| from astrbot.core.platform_message_history_mgr import PlatformMessageHistoryManager |
| from astrbot.core.provider.manager import ProviderManager |
| from astrbot.core.star.context import Context |
| from astrbot.core.star.star_handler import EventType, star_handlers_registry, star_map |
| from astrbot.core.star.star_manager import PluginManager |
| from astrbot.core.subagent_orchestrator import SubAgentOrchestrator |
| from astrbot.core.umop_config_router import UmopConfigRouter |
| from astrbot.core.updator import AstrBotUpdator |
| from astrbot.core.utils.llm_metadata import update_llm_metadata |
| from astrbot.core.utils.migra_helper import migra |
| from astrbot.core.utils.temp_dir_cleaner import TempDirCleaner |
|
|
| from . import astrbot_config, html_renderer |
| from .event_bus import EventBus |
|
|
|
|
class AstrBotCoreLifecycle:
    """Core lifecycle manager for AstrBot: initialize, start, stop and restart.

    The class wires together the components created in ``initialize``
    (ProviderManager, PlatformManager, ConversationManager, PluginManager,
    PipelineScheduler, EventBus, ...), runs the long-lived background tasks
    and fires the "AstrBot loaded" plugin hooks.

    Typical usage is ``await initialize()`` followed by ``await start()``;
    ``stop()`` and ``restart()`` tear the components down again.
    """

    def __init__(self, log_broker: LogBroker, db: BaseDatabase) -> None:
        """Store the injected dependencies and apply the HTTP proxy settings.

        Args:
            log_broker: Log broker kept on the instance as ``self.log_broker``.
            db: Database handle shared with the managers built in ``initialize``.

        Side effects:
            Sets or removes the ``https_proxy`` / ``http_proxy`` / ``no_proxy``
            environment variables according to the ``http_proxy`` config value.
        """
        self.log_broker = log_broker
        self.astrbot_config = astrbot_config
        self.db = db

        # Optional components; they stay None until ``initialize`` creates them.
        self.subagent_orchestrator: SubAgentOrchestrator | None = None
        self.cron_manager: CronJobManager | None = None
        self.temp_dir_cleaner: TempDirCleaner | None = None

        # Strong reference to the fire-and-forget metadata refresh task
        # created in ``initialize`` (the event loop only keeps weak references).
        self._llm_metadata_task: asyncio.Task | None = None

        proxy_config = self.astrbot_config.get("http_proxy", "")
        if proxy_config != "":
            # Export the configured proxy for this process.
            os.environ["https_proxy"] = proxy_config
            os.environ["http_proxy"] = proxy_config
            logger.debug(f"Using proxy: {proxy_config}")

            no_proxy_list = self.astrbot_config.get("no_proxy", [])
            os.environ["no_proxy"] = ",".join(no_proxy_list)
        else:
            # No proxy configured: drop any proxy variables already present
            # in the environment.
            for env_key in ("https_proxy", "http_proxy", "no_proxy"):
                os.environ.pop(env_key, None)
            logger.debug("HTTP proxy cleared")

    async def _init_or_reload_subagent_orchestrator(self) -> None:
        """Create (if needed) and reload the subagent orchestrator from config.

        This keeps lifecycle wiring in one place while allowing the orchestrator
        to manage enable/disable and tool registration details. Any failure is
        logged and swallowed so it does not abort the caller.
        """
        try:
            if self.subagent_orchestrator is None:
                self.subagent_orchestrator = SubAgentOrchestrator(
                    self.provider_manager.llm_tools,
                    self.persona_mgr,
                )
            await self.subagent_orchestrator.reload_from_config(
                self.astrbot_config.get("subagent_orchestrator", {}),
            )
        except Exception as e:
            logger.error(f"Subagent orchestrator init failed: {e}", exc_info=True)

    async def initialize(self) -> None:
        """Build and initialize every core component.

        Components are created in dependency order: logging, database, HTML
        renderer, config routing/management, migration, personas, providers,
        platforms, conversations, message history, knowledge base, cron,
        subagent orchestrator, plugin context and plugins, pipeline
        schedulers, updater and event bus.
        """
        # Configure logging; the TESTING environment variable forces DEBUG.
        logger.info("AstrBot v" + VERSION)
        if os.environ.get("TESTING", ""):
            LogManager.configure_logger(
                logger, self.astrbot_config, override_level="DEBUG"
            )
        else:
            LogManager.configure_logger(logger, self.astrbot_config)
        LogManager.configure_trace_logger(self.astrbot_config)

        await self.db.initialize()

        await html_renderer.initialize()

        # Config router, then the config manager that depends on it.
        self.umop_config_router = UmopConfigRouter(sp=sp)
        await self.umop_config_router.initialize()

        self.astrbot_config_mgr = AstrBotConfigManager(
            default_config=self.astrbot_config,
            ucr=self.umop_config_router,
            sp=sp,
        )
        # The size limit is read lazily through the getter on each call.
        self.temp_dir_cleaner = TempDirCleaner(
            max_size_getter=lambda: self.astrbot_config_mgr.default_conf.get(
                TempDirCleaner.CONFIG_KEY,
                TempDirCleaner.DEFAULT_MAX_SIZE,
            ),
        )

        # Run migrations; a failure is logged but does not abort startup.
        try:
            await migra(
                self.db,
                self.astrbot_config_mgr,
                self.umop_config_router,
                self.astrbot_config_mgr,
            )
        except Exception as e:
            logger.error(f"AstrBot migration failed: {e!s}")
            logger.error(traceback.format_exc())

        # Queue shared by the platform manager, plugin context and event bus.
        self.event_queue = Queue()

        self.persona_mgr = PersonaManager(self.db, self.astrbot_config_mgr)
        await self.persona_mgr.initialize()

        self.provider_manager = ProviderManager(
            self.astrbot_config_mgr,
            self.db,
            self.persona_mgr,
        )

        self.platform_manager = PlatformManager(self.astrbot_config, self.event_queue)

        self.conversation_manager = ConversationManager(self.db)

        self.platform_message_history_manager = PlatformMessageHistoryManager(self.db)

        self.kb_manager = KnowledgeBaseManager(self.provider_manager)

        self.cron_manager = CronJobManager(self.db)

        await self._init_or_reload_subagent_orchestrator()

        # Context object handed to plugins.
        self.star_context = Context(
            self.event_queue,
            self.astrbot_config,
            self.db,
            self.provider_manager,
            self.platform_manager,
            self.conversation_manager,
            self.platform_message_history_manager,
            self.persona_mgr,
            self.astrbot_config_mgr,
            self.kb_manager,
            self.cron_manager,
            self.subagent_orchestrator,
        )

        self.plugin_manager = PluginManager(self.star_context, self.astrbot_config)

        # Plugins are loaded before the providers are initialized.
        await self.plugin_manager.reload()

        await self.provider_manager.initialize()

        await self.kb_manager.initialize()

        # One pipeline scheduler per configuration.
        self.pipeline_scheduler_mapping = await self.load_pipeline_scheduler()

        self.astrbot_updator = AstrBotUpdator()

        self.event_bus = EventBus(
            self.event_queue,
            self.pipeline_scheduler_mapping,
            self.astrbot_config_mgr,
        )

        self.start_time = int(time.time())

        # Supervised background tasks, filled in by ``_load``.
        self.curr_tasks: list[asyncio.Task] = []

        await self.platform_manager.initialize()

        # Set by ``stop`` / ``restart`` to signal dashboard shutdown.
        self.dashboard_shutdown_event = asyncio.Event()

        # Fire-and-forget refresh. Keep a reference so the task cannot be
        # garbage-collected before it completes.
        self._llm_metadata_task = asyncio.create_task(update_llm_metadata())

    def _load(self) -> None:
        """Create the background tasks and register them in ``curr_tasks``.

        Starts the event bus dispatcher, the cron manager and temp-dir cleaner
        (when present) and every coroutine registered on the plugin context.
        Each task is wrapped by ``_task_wrapper`` so failures are logged.
        """
        event_bus_task = asyncio.create_task(
            self.event_bus.dispatch(),
            name="event_bus",
        )
        cron_task = None
        if self.cron_manager:
            cron_task = asyncio.create_task(
                self.cron_manager.start(self.star_context),
                name="cron_manager",
            )
        temp_dir_cleaner_task = None
        if self.temp_dir_cleaner:
            temp_dir_cleaner_task = asyncio.create_task(
                self.temp_dir_cleaner.run(),
                name="temp_dir_cleaner",
            )

        # Tasks registered through the plugin context.
        extra_tasks = [
            asyncio.create_task(task, name=task.__name__)
            for task in self.star_context._register_tasks
        ]

        tasks_ = [event_bus_task, *extra_tasks]
        if cron_task:
            tasks_.append(cron_task)
        if temp_dir_cleaner_task:
            tasks_.append(temp_dir_cleaner_task)
        for task in tasks_:
            self.curr_tasks.append(
                asyncio.create_task(self._task_wrapper(task), name=task.get_name()),
            )

        self.start_time = int(time.time())

    async def _task_wrapper(self, task: asyncio.Task) -> None:
        """Await ``task`` and log, rather than propagate, any exception.

        Cancellation is treated as a normal exit and is not logged.

        Args:
            task: The background task to supervise.
        """
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"------- 任务 {task.get_name()} 发生错误: {e}")
            for line in traceback.format_exc().split("\n"):
                logger.error(f"| {line}")
            logger.error("-------")

    async def start(self) -> None:
        """Start the background tasks, run the startup hooks, then block.

        Calls ``_load``, invokes every handler registered for
        ``EventType.OnAstrBotLoadedEvent`` and finally waits until all
        supervised tasks have finished.
        """
        self._load()
        logger.info("AstrBot 启动完成。")

        handlers = star_handlers_registry.get_handlers_by_event_type(
            EventType.OnAstrBotLoadedEvent,
        )
        for handler in handlers:
            try:
                logger.info(
                    f"hook(on_astrbot_loaded) -> {star_map[handler.handler_module_path].name} - {handler.handler_name}",
                )
                await handler.handler()
            except Exception:
                # A failing hook must not prevent the remaining hooks from
                # running. Only ``Exception`` is caught so that cancellation
                # and interpreter-exit signals still propagate.
                logger.error(traceback.format_exc())

        await asyncio.gather(*self.curr_tasks, return_exceptions=True)

    async def stop(self) -> None:
        """Cancel all supervised tasks and terminate every manager."""
        if self.temp_dir_cleaner:
            await self.temp_dir_cleaner.stop()

        # Request cancellation first; the tasks are awaited at the end.
        for task in self.curr_tasks:
            task.cancel()

        if self.cron_manager:
            await self.cron_manager.shutdown()

        for plugin in self.plugin_manager.context.get_all_stars():
            try:
                await self.plugin_manager._terminate_plugin(plugin)
            except Exception as e:
                logger.warning(traceback.format_exc())
                logger.warning(
                    f"插件 {plugin.name} 未被正常终止 {e!s}, 可能会导致资源泄露等问题。",
                )

        await self.provider_manager.terminate()
        await self.platform_manager.terminate()
        await self.kb_manager.terminate()
        self.dashboard_shutdown_event.set()

        # Wait for the cancelled tasks to actually finish.
        for task in self.curr_tasks:
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                logger.error(f"任务 {task.get_name()} 发生错误: {e}")

    async def restart(self) -> None:
        """Terminate the managers and trigger a reboot.

        Terminates the provider, platform and knowledge-base managers, sets
        the dashboard shutdown event and runs the updater's ``_reboot`` on a
        daemon thread.
        """
        await self.provider_manager.terminate()
        await self.platform_manager.terminate()
        await self.kb_manager.terminate()
        self.dashboard_shutdown_event.set()
        threading.Thread(
            target=self.astrbot_updator._reboot,
            name="restart",
            daemon=True,
        ).start()

    def load_platform(self) -> list[asyncio.Task]:
        """Start every platform instance and return the created tasks.

        Returns:
            One task per platform instance, named ``"<id>(<name>)"`` from the
            instance's metadata.
        """
        return [
            asyncio.create_task(
                platform_inst.run(),
                name=f"{platform_inst.meta().id}({platform_inst.meta().name})",
            )
            for platform_inst in self.platform_manager.get_insts()
        ]

    async def load_pipeline_scheduler(self) -> dict[str, PipelineScheduler]:
        """Create and initialize a pipeline scheduler for every configuration.

        Returns:
            Mapping from configuration ID to its initialized scheduler.
        """
        mapping: dict[str, PipelineScheduler] = {}
        for conf_id, ab_config in self.astrbot_config_mgr.confs.items():
            scheduler = PipelineScheduler(
                PipelineContext(ab_config, self.plugin_manager, conf_id),
            )
            await scheduler.initialize()
            mapping[conf_id] = scheduler
        return mapping

    async def reload_pipeline_scheduler(self, conf_id: str) -> None:
        """Rebuild the pipeline scheduler of a single configuration in place.

        Args:
            conf_id: ID of the configuration whose scheduler is replaced in
                ``self.pipeline_scheduler_mapping``.

        Raises:
            ValueError: If no (non-empty) configuration exists for ``conf_id``.
        """
        ab_config = self.astrbot_config_mgr.confs.get(conf_id)
        if not ab_config:
            raise ValueError(f"配置文件 {conf_id} 不存在")
        scheduler = PipelineScheduler(
            PipelineContext(ab_config, self.plugin_manager, conf_id),
        )
        await scheduler.initialize()
        self.pipeline_scheduler_mapping[conf_id] = scheduler
|
|