| from __future__ import annotations |
|
|
| from collections import defaultdict |
| from dataclasses import dataclass, field |
| from typing import Any |
|
|
| from astrbot.api import sp |
| from astrbot.core import db_helper, logger |
| from astrbot.core.db.po import CommandConfig |
| from astrbot.core.star.filter.command import CommandFilter |
| from astrbot.core.star.filter.command_group import CommandGroupFilter |
| from astrbot.core.star.filter.permission import PermissionType, PermissionTypeFilter |
| from astrbot.core.star.star import star_map |
| from astrbot.core.star.star_handler import StarHandlerMetadata, star_handlers_registry |
|
|
|
|
| @dataclass |
| class CommandDescriptor: |
| handler: StarHandlerMetadata = field(repr=False) |
| filter_ref: CommandFilter | CommandGroupFilter | None = field( |
| default=None, |
| repr=False, |
| ) |
| handler_full_name: str = "" |
| handler_name: str = "" |
| plugin_name: str = "" |
| plugin_display_name: str | None = None |
| module_path: str = "" |
| description: str = "" |
| command_type: str = "command" |
| raw_command_name: str | None = None |
| current_fragment: str | None = None |
| parent_signature: str = "" |
| parent_group_handler: str = "" |
| original_command: str | None = None |
| effective_command: str | None = None |
| aliases: list[str] = field(default_factory=list) |
| permission: str = "everyone" |
| enabled: bool = True |
| is_group: bool = False |
| is_sub_command: bool = False |
| reserved: bool = False |
| config: CommandConfig | None = None |
| has_conflict: bool = False |
| sub_commands: list[CommandDescriptor] = field(default_factory=list) |
|
|
|
|
| async def sync_command_configs() -> None: |
| """同步指令配置,清理过期配置。""" |
| descriptors = _collect_descriptors(include_sub_commands=False) |
| config_records = await db_helper.get_command_configs() |
| config_map = _bind_configs_to_descriptors(descriptors, config_records) |
| live_handlers = {desc.handler_full_name for desc in descriptors} |
|
|
| stale_configs = [key for key in config_map if key not in live_handlers] |
| if stale_configs: |
| await db_helper.delete_command_configs(stale_configs) |
|
|
|
|
| async def toggle_command(handler_full_name: str, enabled: bool) -> CommandDescriptor: |
| descriptor = _build_descriptor_by_full_name(handler_full_name) |
| if not descriptor: |
| raise ValueError("指定的处理函数不存在或不是指令。") |
|
|
| existing_cfg = await db_helper.get_command_config(handler_full_name) |
| config = await db_helper.upsert_command_config( |
| handler_full_name=handler_full_name, |
| plugin_name=descriptor.plugin_name or "", |
| module_path=descriptor.module_path, |
| original_command=descriptor.original_command or descriptor.handler_name, |
| resolved_command=( |
| existing_cfg.resolved_command |
| if existing_cfg |
| else descriptor.current_fragment |
| ), |
| enabled=enabled, |
| keep_original_alias=False, |
| conflict_key=existing_cfg.conflict_key |
| if existing_cfg and existing_cfg.conflict_key |
| else descriptor.original_command, |
| resolution_strategy=existing_cfg.resolution_strategy if existing_cfg else None, |
| note=existing_cfg.note if existing_cfg else None, |
| extra_data=existing_cfg.extra_data if existing_cfg else None, |
| auto_managed=False, |
| ) |
| _bind_descriptor_with_config(descriptor, config) |
| await sync_command_configs() |
| return descriptor |
|
|
|
|
| async def rename_command( |
| handler_full_name: str, |
| new_fragment: str, |
| aliases: list[str] | None = None, |
| ) -> CommandDescriptor: |
| descriptor = _build_descriptor_by_full_name(handler_full_name) |
| if not descriptor: |
| raise ValueError("指定的处理函数不存在或不是指令。") |
|
|
| new_fragment = new_fragment.strip() |
| if not new_fragment: |
| raise ValueError("指令名不能为空。") |
|
|
| |
| candidate_full = _compose_command(descriptor.parent_signature, new_fragment) |
| if _is_command_in_use(handler_full_name, candidate_full): |
| raise ValueError(f"指令名 '{candidate_full}' 已被其他指令占用。") |
|
|
| |
| if aliases: |
| for alias in aliases: |
| alias = alias.strip() |
| if not alias: |
| continue |
| alias_full = _compose_command(descriptor.parent_signature, alias) |
| if _is_command_in_use(handler_full_name, alias_full): |
| raise ValueError(f"别名 '{alias_full}' 已被其他指令占用。") |
|
|
| existing_cfg = await db_helper.get_command_config(handler_full_name) |
| merged_extra = dict(existing_cfg.extra_data or {}) if existing_cfg else {} |
| merged_extra["resolved_aliases"] = aliases or [] |
|
|
| config = await db_helper.upsert_command_config( |
| handler_full_name=handler_full_name, |
| plugin_name=descriptor.plugin_name or "", |
| module_path=descriptor.module_path, |
| original_command=descriptor.original_command or descriptor.handler_name, |
| resolved_command=new_fragment, |
| enabled=True if descriptor.enabled else False, |
| keep_original_alias=False, |
| conflict_key=descriptor.original_command, |
| resolution_strategy="manual_rename", |
| note=None, |
| extra_data=merged_extra, |
| auto_managed=False, |
| ) |
| _bind_descriptor_with_config(descriptor, config) |
|
|
| await sync_command_configs() |
| return descriptor |
|
|
|
|
| async def update_command_permission( |
| handler_full_name: str, |
| permission_type: str, |
| ) -> CommandDescriptor: |
| descriptor = _build_descriptor_by_full_name(handler_full_name) |
| if not descriptor: |
| raise ValueError("指定的处理函数不存在或不是指令。") |
|
|
| if permission_type not in ["admin", "member"]: |
| raise ValueError("权限类型必须为 admin 或 member。") |
|
|
| handler = descriptor.handler |
| found_plugin = star_map.get(handler.handler_module_path) |
| if not found_plugin: |
| raise ValueError("未找到指令所属插件") |
|
|
| |
| alter_cmd_cfg = await sp.global_get("alter_cmd", {}) |
| plugin_ = alter_cmd_cfg.get(found_plugin.name, {}) |
| cfg = plugin_.get(handler.handler_name, {}) |
| cfg["permission"] = permission_type |
| plugin_[handler.handler_name] = cfg |
| alter_cmd_cfg[found_plugin.name] = plugin_ |
|
|
| await sp.global_put("alter_cmd", alter_cmd_cfg) |
|
|
| |
| found_permission_filter = False |
| target_perm_type = ( |
| PermissionType.ADMIN if permission_type == "admin" else PermissionType.MEMBER |
| ) |
|
|
| for filter_ in handler.event_filters: |
| if isinstance(filter_, PermissionTypeFilter): |
| filter_.permission_type = target_perm_type |
| found_permission_filter = True |
| break |
|
|
| if not found_permission_filter: |
| handler.event_filters.insert(0, PermissionTypeFilter(target_perm_type)) |
|
|
| |
| return _build_descriptor(handler) or descriptor |
|
|
|
|
| async def list_commands() -> list[dict[str, Any]]: |
| descriptors = _collect_descriptors(include_sub_commands=True) |
| config_records = await db_helper.get_command_configs() |
| _bind_configs_to_descriptors(descriptors, config_records) |
|
|
| conflict_groups = _group_conflicts(descriptors) |
| conflict_handler_names: set[str] = { |
| d.handler_full_name for group in conflict_groups.values() for d in group |
| } |
|
|
| |
| group_map: dict[str, CommandDescriptor] = {} |
| sub_commands: list[CommandDescriptor] = [] |
| root_commands: list[CommandDescriptor] = [] |
|
|
| for desc in descriptors: |
| desc.has_conflict = desc.handler_full_name in conflict_handler_names |
| if desc.is_group: |
| group_map[desc.handler_full_name] = desc |
| elif desc.is_sub_command: |
| sub_commands.append(desc) |
| else: |
| root_commands.append(desc) |
|
|
| for sub in sub_commands: |
| if sub.parent_group_handler and sub.parent_group_handler in group_map: |
| group_map[sub.parent_group_handler].sub_commands.append(sub) |
| else: |
| root_commands.append(sub) |
|
|
| |
| all_commands = list(group_map.values()) + root_commands |
| all_commands.sort(key=lambda d: (d.effective_command or "").lower()) |
|
|
| result = [_descriptor_to_dict(desc) for desc in all_commands] |
| return result |
|
|
|
|
| async def list_command_conflicts() -> list[dict[str, Any]]: |
| """列出所有冲突的指令组。""" |
| descriptors = _collect_descriptors(include_sub_commands=False) |
| config_records = await db_helper.get_command_configs() |
| _bind_configs_to_descriptors(descriptors, config_records) |
|
|
| conflict_groups = _group_conflicts(descriptors) |
| details = [ |
| { |
| "conflict_key": key, |
| "handlers": [ |
| { |
| "handler_full_name": item.handler_full_name, |
| "plugin": item.plugin_name, |
| "current_name": item.effective_command, |
| } |
| for item in group |
| ], |
| } |
| for key, group in conflict_groups.items() |
| ] |
| return details |
|
|
|
|
| |
|
|
|
|
| def _collect_descriptors(include_sub_commands: bool) -> list[CommandDescriptor]: |
| """收集指令,按需包含子指令。""" |
| descriptors: list[CommandDescriptor] = [] |
| for handler in star_handlers_registry: |
| try: |
| desc = _build_descriptor(handler) |
| if not desc: |
| continue |
| if not include_sub_commands and desc.is_sub_command: |
| continue |
| descriptors.append(desc) |
| except Exception as e: |
| logger.warning( |
| f"解析指令处理函数 {handler.handler_full_name} 失败,跳过该指令。原因: {e!s}" |
| ) |
| continue |
| return descriptors |
|
|
|
|
| def _build_descriptor(handler: StarHandlerMetadata) -> CommandDescriptor | None: |
| filter_ref = _locate_primary_filter(handler) |
| if filter_ref is None: |
| return None |
|
|
| plugin_meta = star_map.get(handler.handler_module_path) |
| plugin_name = ( |
| plugin_meta.name if plugin_meta else None |
| ) or handler.handler_module_path |
| plugin_display = plugin_meta.display_name if plugin_meta else None |
|
|
| is_sub_command = bool(handler.extras_configs.get("sub_command")) |
| parent_group_handler = "" |
|
|
| if isinstance(filter_ref, CommandFilter): |
| raw_fragment = getattr( |
| filter_ref, "_original_command_name", filter_ref.command_name |
| ) |
| current_fragment = filter_ref.command_name |
| parent_signature = (filter_ref.parent_command_names or [""])[0].strip() |
| |
| if is_sub_command and parent_signature: |
| parent_group_handler = _find_parent_group_handler( |
| handler.handler_module_path, parent_signature |
| ) |
| else: |
| raw_fragment = getattr( |
| filter_ref, "_original_group_name", filter_ref.group_name |
| ) |
| current_fragment = filter_ref.group_name |
| parent_signature = _resolve_group_parent_signature(filter_ref) |
|
|
| original_command = _compose_command(parent_signature, raw_fragment) |
| effective_command = _compose_command(parent_signature, current_fragment) |
|
|
| |
| if isinstance(filter_ref, CommandGroupFilter): |
| command_type = "group" |
| elif is_sub_command: |
| command_type = "sub_command" |
| else: |
| command_type = "command" |
|
|
| descriptor = CommandDescriptor( |
| handler=handler, |
| filter_ref=filter_ref, |
| handler_full_name=handler.handler_full_name, |
| handler_name=handler.handler_name, |
| plugin_name=plugin_name, |
| plugin_display_name=plugin_display, |
| module_path=handler.handler_module_path, |
| description=handler.desc or "", |
| command_type=command_type, |
| raw_command_name=raw_fragment, |
| current_fragment=current_fragment, |
| parent_signature=parent_signature, |
| parent_group_handler=parent_group_handler, |
| original_command=original_command, |
| effective_command=effective_command, |
| aliases=sorted(getattr(filter_ref, "alias", set())), |
| permission=_determine_permission(handler), |
| enabled=handler.enabled, |
| is_group=isinstance(filter_ref, CommandGroupFilter), |
| is_sub_command=is_sub_command, |
| reserved=plugin_meta.reserved if plugin_meta else False, |
| ) |
| return descriptor |
|
|
|
|
| def _build_descriptor_by_full_name(full_name: str) -> CommandDescriptor | None: |
| handler = star_handlers_registry.get_handler_by_full_name(full_name) |
| if not handler: |
| return None |
| return _build_descriptor(handler) |
|
|
|
|
| def _locate_primary_filter( |
| handler: StarHandlerMetadata, |
| ) -> CommandFilter | CommandGroupFilter | None: |
| for filter_ref in handler.event_filters: |
| if isinstance(filter_ref, CommandFilter | CommandGroupFilter): |
| return filter_ref |
| return None |
|
|
|
|
| def _determine_permission(handler: StarHandlerMetadata) -> str: |
| for filter_ref in handler.event_filters: |
| if isinstance(filter_ref, PermissionTypeFilter): |
| return ( |
| "admin" |
| if filter_ref.permission_type == PermissionType.ADMIN |
| else "member" |
| ) |
| return "everyone" |
|
|
|
|
| def _resolve_group_parent_signature(group_filter: CommandGroupFilter) -> str: |
| signatures: list[str] = [] |
| parent = group_filter.parent_group |
| while parent: |
| signatures.append(getattr(parent, "_original_group_name", parent.group_name)) |
| parent = parent.parent_group |
| return " ".join(reversed(signatures)).strip() |
|
|
|
|
| def _find_parent_group_handler(module_path: str, parent_signature: str) -> str: |
| """根据模块路径和父级签名,找到对应的指令组 handler_full_name。""" |
| parent_sig_normalized = parent_signature.strip() |
| for handler in star_handlers_registry: |
| if handler.handler_module_path != module_path: |
| continue |
| filter_ref = _locate_primary_filter(handler) |
| if not isinstance(filter_ref, CommandGroupFilter): |
| continue |
| |
| group_names = filter_ref.get_complete_command_names() |
| if parent_sig_normalized in group_names: |
| return handler.handler_full_name |
| return "" |
|
|
|
|
| def _compose_command(parent_signature: str, fragment: str | None) -> str: |
| fragment = (fragment or "").strip() |
| parent_signature = parent_signature.strip() |
| if not parent_signature: |
| return fragment |
| if not fragment: |
| return parent_signature |
| return f"{parent_signature} {fragment}" |
|
|
|
|
| def _bind_descriptor_with_config( |
| descriptor: CommandDescriptor, |
| config: CommandConfig, |
| ) -> None: |
| _apply_config_to_descriptor(descriptor, config) |
| _apply_config_to_runtime(descriptor, config) |
|
|
|
|
| def _apply_config_to_descriptor( |
| descriptor: CommandDescriptor, |
| config: CommandConfig, |
| ) -> None: |
| descriptor.config = config |
| descriptor.enabled = config.enabled |
|
|
| if config.original_command: |
| descriptor.original_command = config.original_command |
|
|
| new_fragment = config.resolved_command or descriptor.current_fragment |
| descriptor.current_fragment = new_fragment |
| descriptor.effective_command = _compose_command( |
| descriptor.parent_signature, |
| new_fragment, |
| ) |
|
|
| extra = config.extra_data or {} |
| resolved_aliases = extra.get("resolved_aliases") |
| if isinstance(resolved_aliases, list): |
| descriptor.aliases = [str(x) for x in resolved_aliases if str(x).strip()] |
|
|
|
|
| def _apply_config_to_runtime( |
| descriptor: CommandDescriptor, |
| config: CommandConfig, |
| ) -> None: |
| descriptor.handler.enabled = config.enabled |
| if descriptor.filter_ref: |
| if descriptor.current_fragment: |
| _set_filter_fragment(descriptor.filter_ref, descriptor.current_fragment) |
| extra = config.extra_data or {} |
| resolved_aliases = extra.get("resolved_aliases") |
| if isinstance(resolved_aliases, list): |
| _set_filter_aliases( |
| descriptor.filter_ref, |
| [str(x) for x in resolved_aliases if str(x).strip()], |
| ) |
|
|
|
|
| def _bind_configs_to_descriptors( |
| descriptors: list[CommandDescriptor], |
| config_records: list[CommandConfig], |
| ) -> dict[str, CommandConfig]: |
| config_map = {cfg.handler_full_name: cfg for cfg in config_records} |
| for desc in descriptors: |
| if cfg := config_map.get(desc.handler_full_name): |
| _bind_descriptor_with_config(desc, cfg) |
| return config_map |
|
|
|
|
| def _group_conflicts( |
| descriptors: list[CommandDescriptor], |
| ) -> dict[str, list[CommandDescriptor]]: |
| conflicts: dict[str, list[CommandDescriptor]] = defaultdict(list) |
| for desc in descriptors: |
| if desc.effective_command and desc.enabled: |
| conflicts[desc.effective_command].append(desc) |
| return {k: v for k, v in conflicts.items() if len(v) > 1} |
|
|
|
|
| def _set_filter_fragment( |
| filter_ref: CommandFilter | CommandGroupFilter, |
| fragment: str, |
| ) -> None: |
| attr = ( |
| "group_name" if isinstance(filter_ref, CommandGroupFilter) else "command_name" |
| ) |
| current_value = getattr(filter_ref, attr) |
| if fragment == current_value: |
| return |
| setattr(filter_ref, attr, fragment) |
| if hasattr(filter_ref, "_cmpl_cmd_names"): |
| filter_ref._cmpl_cmd_names = None |
|
|
|
|
| def _set_filter_aliases( |
| filter_ref: CommandFilter | CommandGroupFilter, |
| aliases: list[str], |
| ) -> None: |
| current_aliases = getattr(filter_ref, "alias", set()) |
| if set(aliases) == current_aliases: |
| return |
| setattr(filter_ref, "alias", set(aliases)) |
| if hasattr(filter_ref, "_cmpl_cmd_names"): |
| filter_ref._cmpl_cmd_names = None |
|
|
|
|
| def _is_command_in_use( |
| target_handler_full_name: str, |
| candidate_full_command: str, |
| ) -> bool: |
| candidate = candidate_full_command.strip() |
| for handler in star_handlers_registry: |
| if handler.handler_full_name == target_handler_full_name: |
| continue |
| filter_ref = _locate_primary_filter(handler) |
| if not filter_ref: |
| continue |
| names = {name.strip() for name in filter_ref.get_complete_command_names()} |
| if candidate in names: |
| return True |
| return False |
|
|
|
|
| def _descriptor_to_dict(desc: CommandDescriptor) -> dict[str, Any]: |
| result = { |
| "handler_full_name": desc.handler_full_name, |
| "handler_name": desc.handler_name, |
| "plugin": desc.plugin_name, |
| "plugin_display_name": desc.plugin_display_name, |
| "module_path": desc.module_path, |
| "description": desc.description, |
| "type": desc.command_type, |
| "parent_signature": desc.parent_signature, |
| "parent_group_handler": desc.parent_group_handler, |
| "original_command": desc.original_command, |
| "current_fragment": desc.current_fragment, |
| "effective_command": desc.effective_command, |
| "aliases": desc.aliases, |
| "permission": desc.permission, |
| "enabled": desc.enabled, |
| "is_group": desc.is_group, |
| "has_conflict": desc.has_conflict, |
| "reserved": desc.reserved, |
| } |
| |
| if desc.is_group and desc.sub_commands: |
| result["sub_commands"] = [_descriptor_to_dict(sub) for sub in desc.sub_commands] |
| else: |
| result["sub_commands"] = [] |
| return result |
|
|