File size: 9,005 Bytes
8ede856
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
from __future__ import annotations

import enum
from collections.abc import AsyncGenerator, Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any, Generic, Literal, TypeVar, overload

from .filter import HandlerFilter
from .star import star_map

T = TypeVar("T", bound="StarHandlerMetadata")


class StarHandlerRegistry(Generic[T]):
    """Registry of handler metadata, ordered from highest to lowest priority.

    Every handler is stored twice: in ``star_handlers_map`` keyed by its
    ``handler_full_name`` for direct lookup, and in ``_handlers``, the
    priority-ordered list that iteration and the query methods walk.
    """

    def __init__(self) -> None:
        self.star_handlers_map: dict[str, StarHandlerMetadata] = {}
        self._handlers: list[StarHandlerMetadata] = []

    def append(self, handler: StarHandlerMetadata) -> None:
        """Add a handler and keep the list ordered by priority.

        A handler whose ``extras_configs`` has no ``"priority"`` entry gets
        priority ``0`` written into it before it is stored.
        """
        handler.extras_configs.setdefault("priority", 0)
        self.star_handlers_map[handler.handler_full_name] = handler

        def descending_priority(entry):
            # Negated so the ascending, stable sort yields high-to-low order
            # while equal priorities keep their insertion order.
            return -entry.extras_configs["priority"]

        self._handlers.append(handler)
        self._handlers.sort(key=descending_priority)

    def _print_handlers(self) -> None:
        """Print the full name of every registered handler, one per line."""
        for full_name in (entry.handler_full_name for entry in self._handlers):
            print(full_name)

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnAstrBotLoadedEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata[Callable[..., Awaitable[Any]]]]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnPlatformLoadedEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata[Callable[..., Awaitable[Any]]]]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.AdapterMessageEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[
        StarHandlerMetadata[Callable[..., Awaitable[Any] | AsyncGenerator[Any]]]
    ]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnLLMRequestEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata[Callable[..., Awaitable[Any]]]]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnLLMResponseEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata[Callable[..., Awaitable[Any]]]]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnDecoratingResultEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata[Callable[..., Awaitable[Any]]]]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnCallingFuncToolEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[
        StarHandlerMetadata[Callable[..., Awaitable[Any] | AsyncGenerator[Any]]]
    ]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnAfterMessageSentEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata[Callable[..., Awaitable[Any]]]]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnPluginErrorEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata[Callable[..., Awaitable[Any]]]]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnPluginLoadedEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata[Callable[..., Awaitable[Any]]]]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: Literal[EventType.OnPluginUnloadedEvent],
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata[Callable[..., Awaitable[Any]]]]: ...

    @overload
    def get_handlers_by_event_type(
        self,
        event_type: EventType,
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[
        StarHandlerMetadata[Callable[..., Awaitable[Any] | AsyncGenerator[Any]]]
    ]: ...

    def get_handlers_by_event_type(
        self,
        event_type: EventType,
        only_activated=True,
        plugins_name: list[str] | None = None,
    ) -> list[StarHandlerMetadata]:
        """Return the enabled handlers for ``event_type`` in priority order.

        Args:
            event_type: Only handlers registered for this event type are kept.
            only_activated: When true, a handler is kept only if the plugin
                found in ``star_map`` under its ``handler_module_path`` exists
                and is activated.
            plugins_name: Optional plugin-name whitelist. ``None`` or exactly
                ``["*"]`` disables the whitelist. Otherwise the owning plugin
                must be present in ``star_map``, and the handler is kept when
                the plugin's name is listed, the event is one of the
                AstrBot/platform/plugin load-unload events, or the plugin is
                reserved.

        Returns:
            The matching handlers, in the registry's priority order.
        """
        selected: list[StarHandlerMetadata] = []
        for candidate in self._handlers:
            # Wrong event type, or the handler itself is switched off
            if candidate.event_type != event_type or not candidate.enabled:
                continue

            # Filter on the owning plugin's activation state
            if only_activated:
                owner = star_map.get(candidate.handler_module_path)
                if not owner or not owner.activated:
                    continue

            # Filter on the plugin whitelist
            if plugins_name is not None and plugins_name != ["*"]:
                owner = star_map.get(candidate.handler_module_path)
                if not owner:
                    continue
                passes_whitelist = (
                    owner.name in plugins_name
                    or event_type
                    in (
                        EventType.OnAstrBotLoadedEvent,
                        EventType.OnPlatformLoadedEvent,
                        EventType.OnPluginLoadedEvent,
                        EventType.OnPluginUnloadedEvent,
                    )
                    or owner.reserved
                )
                if not passes_whitelist:
                    continue

            selected.append(candidate)
        return selected

    def get_handler_by_full_name(self, full_name: str) -> StarHandlerMetadata | None:
        """Return the handler stored under ``full_name``, or ``None``."""
        return self.star_handlers_map.get(full_name)

    def get_handlers_by_module_name(
        self,
        module_name: str,
    ) -> list[StarHandlerMetadata]:
        """Return every handler whose ``handler_module_path`` equals ``module_name``."""
        same_module: list[StarHandlerMetadata] = []
        for entry in self._handlers:
            if entry.handler_module_path == module_name:
                same_module.append(entry)
        return same_module

    def clear(self) -> None:
        """Drop every handler from both the name map and the ordered list."""
        self.star_handlers_map.clear()
        self._handlers.clear()

    def remove(self, handler: StarHandlerMetadata) -> None:
        """Remove ``handler`` from the name map and the ordered list.

        The list is rebuilt without any entry that compares equal to
        ``handler``; the map entry is dropped by ``handler_full_name``.
        """
        self.star_handlers_map.pop(handler.handler_full_name, None)
        self._handlers = [kept for kept in self._handlers if kept != handler]

    def __iter__(self):
        """Iterate over the handlers in priority order."""
        return iter(self._handlers)

    def __len__(self) -> int:
        """Return the number of handlers in the ordered list."""
        return len(self._handlers)


# Module-level StarHandlerRegistry instance, created when this module is imported.
star_handlers_registry = StarHandlerRegistry()  # type: ignore


class EventType(enum.Enum):
    """Type of an internal AstrBot event, e.g. an adapter message event, an LLM
    request event, or the event fired before a message is sent.

    Used to group handlers by their role.
    """

    OnAstrBotLoadedEvent = enum.auto()  # AstrBot finished loading
    OnPlatformLoadedEvent = enum.auto()  # a platform finished loading

    AdapterMessageEvent = enum.auto()  # a message was received from an adapter
    OnWaitingLLMRequestEvent = enum.auto()  # waiting to call the LLM (before the lock is acquired; notification only)
    OnLLMRequestEvent = enum.auto()  # an LLM request was received (from a user or from a plugin)
    OnLLMResponseEvent = enum.auto()  # after the LLM has responded
    OnDecoratingResultEvent = enum.auto()  # before a message is sent
    OnCallingFuncToolEvent = enum.auto()  # calling a function tool
    OnUsingLLMToolEvent = enum.auto()  # using an LLM tool
    OnLLMToolRespondEvent = enum.auto()  # after a function tool has been called
    OnAfterMessageSentEvent = enum.auto()  # after a message has been sent
    OnPluginErrorEvent = enum.auto()  # a plugin raised an exception while handling a message
    OnPluginLoadedEvent = enum.auto()  # a plugin finished loading
    OnPluginUnloadedEvent = enum.auto()  # a plugin finished unloading


H = TypeVar("H", bound=Callable[..., Any])


@dataclass
class StarHandlerMetadata(Generic[H]):
    """Describes a single handler registered by a Star."""

    event_type: EventType
    """Event type the handler is registered for."""

    handler_full_name: str
    '''Formatted as f"{handler.__module__}_{handler.__name__}".'''

    handler_name: str
    """Name of the handler, i.e. the method name."""

    handler_module_path: str
    """Path of the module that contains the handler."""

    handler: H
    """The handler's function object; it should be an async function."""

    event_filters: list[HandlerFilter]
    """Adapter message event filters describing which adapter message events
    this handler can, and should, process."""

    desc: str = ""
    """Description of the handler."""

    extras_configs: dict = field(default_factory=dict)
    """Additional information registered by the plugin, such as priority."""

    enabled: bool = True
    """Whether the handler is enabled; defaults to True."""

    def __lt__(self, other: StarHandlerMetadata):
        """Compare by priority so instances can be used in a priority queue.

        A missing ``"priority"`` entry in ``extras_configs`` counts as ``0``
        on either side.
        """
        own_priority = self.extras_configs.get("priority", 0)
        other_priority = other.extras_configs.get("priority", 0)
        return own_priority < other_priority