| import abc |
| import uuid |
| from asyncio import Queue |
| from collections.abc import Coroutine |
| from dataclasses import dataclass, field |
| from datetime import datetime |
| from enum import Enum |
| from typing import Any |
|
|
| from astrbot.core.message.message_event_result import MessageChain |
| from astrbot.core.utils.metrics import Metric |
|
|
| from .astr_message_event import AstrMessageEvent |
| from .message_session import MessageSesion |
| from .platform_metadata import PlatformMetadata |
|
|
|
|
| class PlatformStatus(Enum): |
| """平台运行状态""" |
|
|
| PENDING = "pending" |
| RUNNING = "running" |
| ERROR = "error" |
| STOPPED = "stopped" |
|
|
|
|
| @dataclass |
| class PlatformError: |
| """平台错误信息""" |
|
|
| message: str |
| timestamp: datetime = field(default_factory=datetime.now) |
| traceback: str | None = None |
|
|
|
|
| class Platform(abc.ABC): |
| def __init__(self, config: dict, event_queue: Queue) -> None: |
| super().__init__() |
| |
| self.config = config |
| |
| self._event_queue = event_queue |
| self.client_self_id = uuid.uuid4().hex |
|
|
| |
| self._status: PlatformStatus = PlatformStatus.PENDING |
| self._errors: list[PlatformError] = [] |
| self._started_at: datetime | None = None |
|
|
| @property |
| def status(self) -> PlatformStatus: |
| """获取平台运行状态""" |
| return self._status |
|
|
| @status.setter |
| def status(self, value: PlatformStatus) -> None: |
| """设置平台运行状态""" |
| self._status = value |
| if value == PlatformStatus.RUNNING and self._started_at is None: |
| self._started_at = datetime.now() |
|
|
| @property |
| def errors(self) -> list[PlatformError]: |
| """获取错误列表""" |
| return self._errors |
|
|
| @property |
| def last_error(self) -> PlatformError | None: |
| """获取最近的错误""" |
| return self._errors[-1] if self._errors else None |
|
|
| def record_error(self, message: str, traceback_str: str | None = None) -> None: |
| """记录一个错误""" |
| self._errors.append(PlatformError(message=message, traceback=traceback_str)) |
| self._status = PlatformStatus.ERROR |
|
|
| def clear_errors(self) -> None: |
| """清除错误记录""" |
| self._errors.clear() |
| if self._status == PlatformStatus.ERROR: |
| self._status = PlatformStatus.RUNNING |
|
|
| def unified_webhook(self) -> bool: |
| """是否正在使用统一 Webhook 模式""" |
| return bool( |
| self.config.get("unified_webhook_mode", False) |
| and self.config.get("webhook_uuid") |
| ) |
|
|
| def get_stats(self) -> dict: |
| """获取平台统计信息""" |
| meta = self.meta() |
| meta_info = { |
| "id": meta.id, |
| "name": meta.name, |
| "display_name": meta.adapter_display_name or meta.name, |
| "description": meta.description, |
| "support_streaming_message": meta.support_streaming_message, |
| "support_proactive_message": meta.support_proactive_message, |
| } |
| return { |
| "id": meta.id or self.config.get("id"), |
| "type": meta.name, |
| "display_name": meta.adapter_display_name or meta.name, |
| "status": self._status.value, |
| "started_at": self._started_at.isoformat() if self._started_at else None, |
| "error_count": len(self._errors), |
| "last_error": { |
| "message": self.last_error.message, |
| "timestamp": self.last_error.timestamp.isoformat(), |
| "traceback": self.last_error.traceback, |
| } |
| if self.last_error |
| else None, |
| "unified_webhook": self.unified_webhook(), |
| "meta": meta_info, |
| } |
|
|
| @abc.abstractmethod |
| def run(self) -> Coroutine[Any, Any, None]: |
| """得到一个平台的运行实例,需要返回一个协程对象。""" |
| raise NotImplementedError |
|
|
| async def terminate(self) -> None: |
| """终止一个平台的运行实例。""" |
|
|
| @abc.abstractmethod |
| def meta(self) -> PlatformMetadata: |
| """得到一个平台的元数据。""" |
| raise NotImplementedError |
|
|
| async def send_by_session( |
| self, |
| session: MessageSesion, |
| message_chain: MessageChain, |
| ) -> None: |
| """通过会话发送消息。该方法旨在让插件能够直接通过**可持久化的会话数据**发送消息,而不需要保存 event 对象。 |
| |
| 异步方法。 |
| """ |
| await Metric.upload(msg_event_tick=1, adapter_name=self.meta().name) |
|
|
| def commit_event(self, event: AstrMessageEvent) -> None: |
| """提交一个事件到事件队列。""" |
| self._event_queue.put_nowait(event) |
|
|
| def get_client(self) -> object: |
| """获取平台的客户端对象。""" |
|
|
| async def webhook_callback(self, request: Any) -> Any: |
| """统一 Webhook 回调入口。 |
| |
| 支持统一 Webhook 模式的平台需要实现此方法。 |
| 当 Dashboard 收到 /api/platform/webhook/{uuid} 请求时,会调用此方法。 |
| |
| Args: |
| request: Quart 请求对象 |
| |
| Returns: |
| 响应内容,格式取决于具体平台的要求 |
| |
| Raises: |
| NotImplementedError: 平台未实现统一 Webhook 模式 |
| """ |
| raise NotImplementedError(f"平台 {self.meta().name} 未实现统一 Webhook 模式") |
|
|