from __future__ import annotations import logging from asyncio import Queue from collections.abc import Awaitable, Callable from typing import TYPE_CHECKING, Any, Protocol from deprecated import deprecated from astrbot.core.agent.hooks import BaseAgentRunHooks from astrbot.core.agent.message import Message from astrbot.core.agent.runners.tool_loop_agent_runner import ToolLoopAgentRunner from astrbot.core.agent.tool import ToolSet from astrbot.core.astrbot_config_mgr import AstrBotConfigManager from astrbot.core.config.astrbot_config import AstrBotConfig from astrbot.core.conversation_mgr import ConversationManager from astrbot.core.db import BaseDatabase from astrbot.core.knowledge_base.kb_mgr import KnowledgeBaseManager from astrbot.core.message.message_event_result import MessageChain from astrbot.core.persona_mgr import PersonaManager from astrbot.core.platform import Platform from astrbot.core.platform.astr_message_event import AstrMessageEvent, MessageSesion from astrbot.core.platform_message_history_mgr import PlatformMessageHistoryManager from astrbot.core.provider.entities import LLMResponse, ProviderRequest, ProviderType from astrbot.core.provider.func_tool_manager import FunctionTool, FunctionToolManager from astrbot.core.provider.manager import ProviderManager from astrbot.core.provider.provider import ( EmbeddingProvider, Provider, RerankProvider, STTProvider, TTSProvider, ) from astrbot.core.star.filter.platform_adapter_type import ( ADAPTER_NAME_2_TYPE, PlatformAdapterType, ) from astrbot.core.subagent_orchestrator import SubAgentOrchestrator from ..exceptions import ProviderNotFoundError from .filter.command import CommandFilter from .filter.regex import RegexFilter from .star import StarMetadata, star_map, star_registry from .star_handler import EventType, StarHandlerMetadata, star_handlers_registry logger = logging.getLogger("astrbot") if TYPE_CHECKING: from astrbot.core.cron.manager import CronJobManager class PlatformManagerProtocol(Protocol): platform_insts: list[Platform] class Context: """暴露给插件的接口上下文。""" registered_web_apis: list = [] # 向后兼容的变量 _register_tasks: list[Awaitable] = [] _star_manager = None def __init__( self, event_queue: Queue, config: AstrBotConfig, db: BaseDatabase, provider_manager: ProviderManager, platform_manager: PlatformManagerProtocol, conversation_manager: ConversationManager, message_history_manager: PlatformMessageHistoryManager, persona_manager: PersonaManager, astrbot_config_mgr: AstrBotConfigManager, knowledge_base_manager: KnowledgeBaseManager, cron_manager: CronJobManager, subagent_orchestrator: SubAgentOrchestrator | None = None, ) -> None: self._event_queue = event_queue """事件队列。消息平台通过事件队列传递消息事件。""" self._config = config """AstrBot 默认配置""" self._db = db """AstrBot 数据库""" self.provider_manager = provider_manager """模型提供商管理器""" self.platform_manager = platform_manager """平台适配器管理器""" self.conversation_manager = conversation_manager """会话管理器""" self.message_history_manager = message_history_manager """平台消息历史管理器""" self.persona_manager = persona_manager """人格角色设定管理器""" self.astrbot_config_mgr = astrbot_config_mgr """配置文件管理器(非webui)""" self.kb_manager = knowledge_base_manager """知识库管理器""" self.cron_manager = cron_manager """Cron job manager, initialized by core lifecycle.""" self.subagent_orchestrator = subagent_orchestrator async def llm_generate( self, *, chat_provider_id: str, prompt: str | None = None, image_urls: list[str] | None = None, tools: ToolSet | None = None, system_prompt: str | None = None, contexts: list[Message] | None = None, **kwargs: Any, ) -> LLMResponse: """Call the LLM to generate a response. The method will not automatically execute tool calls. If you want to use tool calls, please use `tool_loop_agent()`. .. versionadded:: 4.5.7 (sdk) Args: chat_provider_id: The chat provider ID to use. prompt: The prompt to send to the LLM, if `contexts` and `prompt` are both provided, `prompt` will be appended as the last user message image_urls: List of image URLs to include in the prompt, if `contexts` and `prompt` are both provided, `image_urls` will be appended to the last user message tools: ToolSet of tools available to the LLM system_prompt: System prompt to guide the LLM's behavior, if provided, it will always insert as the first system message in the context contexts: context messages for the LLM **kwargs: Additional keyword arguments for LLM generation, OpenAI compatible Raises: ChatProviderNotFoundError: If the specified chat provider ID is not found Exception: For other errors during LLM generation """ prov = await self.provider_manager.get_provider_by_id(chat_provider_id) if not prov or not isinstance(prov, Provider): raise ProviderNotFoundError(f"Provider {chat_provider_id} not found") llm_resp = await prov.text_chat( prompt=prompt, image_urls=image_urls, func_tool=tools, contexts=contexts, system_prompt=system_prompt, **kwargs, ) return llm_resp async def tool_loop_agent( self, *, event: AstrMessageEvent, chat_provider_id: str, prompt: str | None = None, image_urls: list[str] | None = None, tools: ToolSet | None = None, system_prompt: str | None = None, contexts: list[Message] | None = None, max_steps: int = 30, tool_call_timeout: int = 60, **kwargs: Any, ) -> LLMResponse: """Run an agent loop that allows the LLM to call tools iteratively until a final answer is produced. If you do not pass the agent_context parameter, the method will recreate a new agent context. .. versionadded:: 4.5.7 (sdk) Args: chat_provider_id: The chat provider ID to use. prompt: The prompt to send to the LLM, if `contexts` and `prompt` are both provided, `prompt` will be appended as the last user message image_urls: List of image URLs to include in the prompt, if `contexts` and `prompt` are both provided, `image_urls` will be appended to the last user message tools: ToolSet of tools available to the LLM system_prompt: System prompt to guide the LLM's behavior, if provided, it will always insert as the first system message in the context contexts: context messages for the LLM max_steps: Maximum number of tool calls before stopping the loop **kwargs: Additional keyword arguments. The kwargs will not be passed to the LLM directly for now, but can include: stream: bool - whether to stream the LLM response agent_hooks: BaseAgentRunHooks[AstrAgentContext] - hooks to run during agent execution agent_context: AstrAgentContext - context to use for the agent other kwargs will be DIRECTLY passed to the runner.reset() method Returns: The final LLMResponse after tool calls are completed. Raises: ChatProviderNotFoundError: If the specified chat provider ID is not found Exception: For other errors during LLM generation """ # Import here to avoid circular imports from astrbot.core.astr_agent_context import ( AgentContextWrapper, AstrAgentContext, ) from astrbot.core.astr_agent_tool_exec import FunctionToolExecutor prov = await self.provider_manager.get_provider_by_id(chat_provider_id) if not prov or not isinstance(prov, Provider): raise ProviderNotFoundError(f"Provider {chat_provider_id} not found") agent_hooks = kwargs.get("agent_hooks") or BaseAgentRunHooks[AstrAgentContext]() agent_context = kwargs.get("agent_context") context_ = [] for msg in contexts or []: if isinstance(msg, Message): context_.append(msg.model_dump()) else: context_.append(msg) request = ProviderRequest( prompt=prompt, image_urls=image_urls or [], func_tool=tools, contexts=context_, system_prompt=system_prompt or "", ) if agent_context is None: agent_context = AstrAgentContext( context=self, event=event, ) agent_runner = ToolLoopAgentRunner() tool_executor = FunctionToolExecutor() streaming = kwargs.get("stream", False) other_kwargs = { k: v for k, v in kwargs.items() if k not in ["stream", "agent_hooks", "agent_context"] } await agent_runner.reset( provider=prov, request=request, run_context=AgentContextWrapper( context=agent_context, tool_call_timeout=tool_call_timeout, ), tool_executor=tool_executor, agent_hooks=agent_hooks, streaming=streaming, **other_kwargs, ) async for _ in agent_runner.step_until_done(max_steps): pass llm_resp = agent_runner.get_final_llm_resp() if not llm_resp: raise Exception("Agent did not produce a final LLM response") return llm_resp async def get_current_chat_provider_id(self, umo: str) -> str: """获取当前使用的聊天模型 Provider ID。 Args: umo: unified_message_origin。消息会话来源 ID。 Returns: 指定消息会话来源当前使用的聊天模型 Provider ID。 Raises: ProviderNotFoundError: 未找到。 """ prov = self.get_using_provider(umo) if not prov: raise ProviderNotFoundError("Provider not found") return prov.meta().id def get_registered_star(self, star_name: str) -> StarMetadata | None: """根据插件名获取插件的 Metadata""" for star in star_registry: if star.name == star_name: return star def get_all_stars(self) -> list[StarMetadata]: """获取当前载入的所有插件 Metadata 的列表""" return star_registry def get_llm_tool_manager(self) -> FunctionToolManager: """获取 LLM Tool Manager,其用于管理注册的所有的 Function-calling tools""" return self.provider_manager.llm_tools def activate_llm_tool(self, name: str) -> bool: """激活一个已经注册的函数调用工具。 Args: name: 工具名称。 Returns: 如果成功激活返回 True,如果没找到工具返回 False。 Note: 注册的工具默认是激活状态。 """ return self.provider_manager.llm_tools.activate_llm_tool(name, star_map) def deactivate_llm_tool(self, name: str) -> bool: """停用一个已经注册的函数调用工具。 Args: name: 工具名称。 Returns: 如果成功停用返回 True,如果没找到工具返回 False。 """ return self.provider_manager.llm_tools.deactivate_llm_tool(name) def get_provider_by_id( self, provider_id: str, ) -> ( Provider | TTSProvider | STTProvider | EmbeddingProvider | RerankProvider | None ): """通过 ID 获取对应的 LLM Provider。 Args: provider_id: 提供者 ID。 Returns: 提供者实例,如果未找到则返回 None。 Note: 如果提供者 ID 存在但未找到提供者,会记录警告日志。 """ prov = self.provider_manager.inst_map.get(provider_id) if provider_id and not prov: logger.warning( f"没有找到 ID 为 {provider_id} 的提供商,这可能是由于您修改了提供商(模型)ID 导致的。" ) return prov def get_all_providers(self) -> list[Provider]: """获取所有用于文本生成任务的 LLM Provider(Chat_Completion 类型)。""" return self.provider_manager.provider_insts def get_all_tts_providers(self) -> list[TTSProvider]: """获取所有用于 TTS 任务的 Provider。""" return self.provider_manager.tts_provider_insts def get_all_stt_providers(self) -> list[STTProvider]: """获取所有用于 STT 任务的 Provider。""" return self.provider_manager.stt_provider_insts def get_all_embedding_providers(self) -> list[EmbeddingProvider]: """获取所有用于 Embedding 任务的 Provider。""" return self.provider_manager.embedding_provider_insts def get_using_provider(self, umo: str | None = None) -> Provider | None: """获取当前使用的用于文本生成任务的 LLM Provider(Chat_Completion 类型)。 Args: umo: unified_message_origin 值,如果传入并且用户启用了提供商会话隔离, 则使用该会话偏好的对话模型(提供商)。 Returns: 当前使用的对话模型(提供商),如果未设置则返回 None。 Raises: ValueError: 该会话来源配置的的对话模型(提供商)的类型不正确。 """ prov = self.provider_manager.get_using_provider( provider_type=ProviderType.CHAT_COMPLETION, umo=umo, ) if prov is None: return None if not isinstance(prov, Provider): raise ValueError( f"该会话来源的对话模型(提供商)的类型不正确: {type(prov)}" ) return prov def get_using_tts_provider(self, umo: str | None = None) -> TTSProvider | None: """获取当前使用的用于 TTS 任务的 Provider。 Args: umo: unified_message_origin 值,如果传入,则使用该会话偏好的提供商。 Returns: 当前使用的 TTS 提供者,如果未设置则返回 None。 Raises: ValueError: 返回的提供者不是 TTSProvider 类型。 """ prov = self.provider_manager.get_using_provider( provider_type=ProviderType.TEXT_TO_SPEECH, umo=umo, ) if prov and not isinstance(prov, TTSProvider): raise ValueError("返回的 Provider 不是 TTSProvider 类型") return prov def get_using_stt_provider(self, umo: str | None = None) -> STTProvider | None: """获取当前使用的用于 STT 任务的 Provider。 Args: umo: unified_message_origin 值,如果传入,则使用该会话偏好的提供商。 Returns: 当前使用的 STT 提供者,如果未设置则返回 None。 Raises: ValueError: 返回的提供者不是 STTProvider 类型。 """ prov = self.provider_manager.get_using_provider( provider_type=ProviderType.SPEECH_TO_TEXT, umo=umo, ) if prov and not isinstance(prov, STTProvider): raise ValueError("返回的 Provider 不是 STTProvider 类型") return prov def get_config(self, umo: str | None = None) -> AstrBotConfig: """获取 AstrBot 的配置。 Args: umo: unified_message_origin 值,用于获取特定会话的配置。 Returns: AstrBot 配置对象。 Note: 如果不提供 umo 参数,将返回默认配置。 """ if not umo: # 使用默认配置 return self._config return self.astrbot_config_mgr.get_conf(umo) async def send_message( self, session: str | MessageSesion, message_chain: MessageChain, ) -> bool: """根据 session(unified_msg_origin) 主动发送消息。 Args: session: 消息会话。通过 event.session 或者 event.unified_msg_origin 获取。 message_chain: 消息链。 Returns: 是否找到匹配的平台。 Raises: ValueError: session 字符串不合法时抛出。 Note: 当 session 为字符串时,会尝试解析为 MessageSession 对象。(类名为MessageSesion是因为历史遗留拼写错误) qq_official(QQ 官方 API 平台) 不支持此方法。 """ if isinstance(session, str): try: session = MessageSesion.from_str(session) except BaseException as e: raise ValueError("不合法的 session 字符串: " + str(e)) for platform in self.platform_manager.platform_insts: if platform.meta().id == session.platform_name: await platform.send_by_session(session, message_chain) return True logger.warning( f"cannot find platform for session {str(session)}, message not sent" ) return False def add_llm_tools(self, *tools: FunctionTool) -> None: """添加 LLM 工具。 Args: *tools: 要添加的函数工具对象。 Note: 如果工具已存在,会替换已存在的工具。 """ tool_name = {tool.name for tool in self.provider_manager.llm_tools.func_list} module_path = "" for tool in tools: if not module_path: _parts = [] module_part = tool.__module__.split(".") flags = ["builtin_stars", "plugins"] for i, part in enumerate(module_part): _parts.append(part) if part in flags and i + 1 < len(module_part): _parts.append(module_part[i + 1]) module_part.append("main") break tool.handler_module_path = ".".join(_parts) module_path = tool.handler_module_path else: tool.handler_module_path = module_path logger.info( f"plugin(module_path {module_path}) added LLM tool: {tool.name}" ) if tool.name in tool_name: logger.warning("替换已存在的 LLM 工具: " + tool.name) self.provider_manager.llm_tools.remove_func(tool.name) self.provider_manager.llm_tools.func_list.append(tool) def register_web_api( self, route: str, view_handler: Awaitable, methods: list, desc: str, ) -> None: """注册 Web API。 Args: route: API 路由路径。 view_handler: 异步视图处理函数。 methods: HTTP 方法列表。 desc: API 描述。 Note: 如果相同路由和方法已注册,会替换现有的 API。 """ for idx, api in enumerate(self.registered_web_apis): if api[0] == route and methods == api[2]: self.registered_web_apis[idx] = (route, view_handler, methods, desc) return self.registered_web_apis.append((route, view_handler, methods, desc)) """ 以下的方法已经不推荐使用。请从 AstrBot 文档查看更好的注册方式。 """ def get_event_queue(self) -> Queue: """获取事件队列。""" return self._event_queue @deprecated(version="4.0.0", reason="Use get_platform_inst instead") def get_platform(self, platform_type: PlatformAdapterType | str) -> Platform | None: """获取指定类型的平台适配器。 Args: platform_type: 平台类型或平台名称。 Returns: 平台适配器实例,如果未找到则返回 None。 Note: 该方法已经过时,请使用 get_platform_inst 方法。(>= AstrBot v4.0.0) """ for platform in self.platform_manager.platform_insts: name = platform.meta().name if isinstance(platform_type, str): if name == platform_type: return platform elif ( name in ADAPTER_NAME_2_TYPE and ADAPTER_NAME_2_TYPE[name] & platform_type ): return platform def get_platform_inst(self, platform_id: str) -> Platform | None: """获取指定 ID 的平台适配器实例。 Args: platform_id: 平台适配器的唯一标识符。 Returns: 平台适配器实例,如果未找到则返回 None。 Note: 可以通过 event.get_platform_id() 获取平台 ID。 """ for platform in self.platform_manager.platform_insts: if platform.meta().id == platform_id: return platform def get_db(self) -> BaseDatabase: """获取 AstrBot 数据库。 Returns: 数据库实例。 """ return self._db def register_provider(self, provider: Provider) -> None: """注册一个 LLM Provider(Chat_Completion 类型)。 Args: provider: 提供者实例。 """ self.provider_manager.provider_insts.append(provider) def register_llm_tool( self, name: str, func_args: list, desc: str, func_obj: Callable[..., Awaitable[Any]], ) -> None: """[DEPRECATED]为函数调用(function-calling / tools-use)添加工具。 Args: name: 函数名。 func_args: 函数参数列表,格式为 [{"type": "string", "name": "arg_name", "description": "arg_description"}, ...]。 desc: 函数描述。 func_obj: 异步处理函数。 Note: 异步处理函数会接收到额外的关键词参数:event: AstrMessageEvent, context: Context。 该方法已弃用,请使用新的注册方式。 """ md = StarHandlerMetadata( event_type=EventType.OnLLMRequestEvent, handler_full_name=func_obj.__module__ + "_" + func_obj.__name__, handler_name=func_obj.__name__, handler_module_path=func_obj.__module__, handler=func_obj, event_filters=[], desc=desc, ) star_handlers_registry.append(md) self.provider_manager.llm_tools.add_func(name, func_args, desc, func_obj) def unregister_llm_tool(self, name: str) -> None: """[DEPRECATED]删除一个函数调用工具。 Args: name: 工具名称。 Note: 如果再要启用,需要重新注册。 该方法已弃用。 """ self.provider_manager.llm_tools.remove_func(name) def register_commands( self, star_name: str, command_name: str, desc: str, priority: int, awaitable: Callable[..., Awaitable[Any]], use_regex=False, ignore_prefix=False, ) -> None: """[DEPRECATED]注册一个命令。 Args: star_name: 插件(Star)名称。 command_name: 命令名称。 desc: 命令描述。 priority: 优先级。1-10。 awaitable: 异步处理函数。 use_regex: 是否使用正则表达式匹配命令。 ignore_prefix: 是否忽略命令前缀。 Note: 推荐使用装饰器注册指令。该方法将在未来的版本中被移除。 """ md = StarHandlerMetadata( event_type=EventType.AdapterMessageEvent, handler_full_name=awaitable.__module__ + "_" + awaitable.__name__, handler_name=awaitable.__name__, handler_module_path=awaitable.__module__, handler=awaitable, event_filters=[], desc=desc, ) if use_regex: md.event_filters.append(RegexFilter(regex=command_name)) else: md.event_filters.append( CommandFilter(command_name=command_name, handler_md=md), ) star_handlers_registry.append(md) def register_task(self, task: Awaitable, desc: str) -> None: """[DEPRECATED]注册一个异步任务。 Args: task: 异步任务。 desc: 任务描述。 Note: 该方法已弃用。 """ self._register_tasks.append(task)