import re import shutil from pathlib import Path import click from ..utils import ( PluginStatus, build_plug_list, check_astrbot_root, get_astrbot_root, get_git_repo, manage_plugin, ) @click.group() def plug() -> None: """Plugin management""" def _get_data_path() -> Path: base = get_astrbot_root() if not check_astrbot_root(base): raise click.ClickException( f"{base} is not a valid AstrBot root directory. Use 'astrbot init' to initialize", ) return (base / "data").resolve() def display_plugins(plugins, title=None, color=None) -> None: if title: click.echo(click.style(title, fg=color, bold=True)) click.echo( f"{'Name':<20} {'Version':<10} {'Status':<10} {'Author':<15} {'Description':<30}" ) click.echo("-" * 85) for p in plugins: desc = p["desc"][:30] + ("..." if len(p["desc"]) > 30 else "") click.echo( f"{p['name']:<20} {p['version']:<10} {p['status']:<10} " f"{p['author']:<15} {desc:<30}", ) @plug.command() @click.argument("name") def new(name: str) -> None: """Create a new plugin""" base_path = _get_data_path() plug_path = base_path / "plugins" / name if plug_path.exists(): raise click.ClickException(f"Plugin {name} already exists") author = click.prompt("Enter plugin author", type=str) desc = click.prompt("Enter plugin description", type=str) version = click.prompt("Enter plugin version", type=str) if not re.match(r"^\d+\.\d+(\.\d+)?$", version.lower().lstrip("v")): raise click.ClickException("Version must be in x.y or x.y.z format") repo = click.prompt("Enter plugin repository URL:", type=str) if not repo.startswith("http"): raise click.ClickException("Repository URL must start with http") click.echo("Downloading plugin template...") get_git_repo( "https://github.com/Soulter/helloworld", plug_path, ) click.echo("Rewriting plugin metadata...") # Rewrite metadata.yaml with open(plug_path / "metadata.yaml", "w", encoding="utf-8") as f: f.write( f"name: {name}\n" f"desc: {desc}\n" f"version: {version}\n" f"author: {author}\n" f"repo: {repo}\n", ) # Rewrite README.md with open(plug_path / "README.md", "w", encoding="utf-8") as f: f.write( f"# {name}\n\n{desc}\n\n# Support\n\n[Documentation](https://astrbot.app)\n" ) # Rewrite main.py with open(plug_path / "main.py", encoding="utf-8") as f: content = f.read() new_content = content.replace( '@register("helloworld", "YourName", "一个简单的 Hello World 插件", "1.0.0")', f'@register("{name}", "{author}", "{desc}", "{version}")', ) with open(plug_path / "main.py", "w", encoding="utf-8") as f: f.write(new_content) click.echo(f"Plugin {name} created successfully") @plug.command() @click.option("--all", "-a", is_flag=True, help="List uninstalled plugins") def list(all: bool) -> None: """List plugins""" base_path = _get_data_path() plugins = build_plug_list(base_path / "plugins") # Unpublished plugins not_published_plugins = [ p for p in plugins if p["status"] == PluginStatus.NOT_PUBLISHED ] if not_published_plugins: display_plugins(not_published_plugins, "Unpublished Plugins", "red") # Plugins needing update need_update_plugins = [ p for p in plugins if p["status"] == PluginStatus.NEED_UPDATE ] if need_update_plugins: display_plugins(need_update_plugins, "Plugins Needing Update", "yellow") # Installed plugins installed_plugins = [p for p in plugins if p["status"] == PluginStatus.INSTALLED] if installed_plugins: display_plugins(installed_plugins, "Installed Plugins", "green") # Uninstalled plugins not_installed_plugins = [ p for p in plugins if p["status"] == PluginStatus.NOT_INSTALLED ] if not_installed_plugins and all: display_plugins(not_installed_plugins, "Uninstalled Plugins", "blue") if ( not any([not_published_plugins, need_update_plugins, installed_plugins]) and not all ): click.echo("No plugins installed") @plug.command() @click.argument("name") @click.option("--proxy", help="Proxy server address") def install(name: str, proxy: str | None) -> None: """Install a plugin""" base_path = _get_data_path() plug_path = base_path / "plugins" plugins = build_plug_list(base_path / "plugins") plugin = next( ( p for p in plugins if p["name"] == name and p["status"] == PluginStatus.NOT_INSTALLED ), None, ) if not plugin: raise click.ClickException(f"Plugin {name} not found or already installed") manage_plugin(plugin, plug_path, is_update=False, proxy=proxy) @plug.command() @click.argument("name") def remove(name: str) -> None: """Uninstall a plugin""" base_path = _get_data_path() plugins = build_plug_list(base_path / "plugins") plugin = next((p for p in plugins if p["name"] == name), None) if not plugin or not plugin.get("local_path"): raise click.ClickException(f"Plugin {name} does not exist or is not installed") plugin_path = plugin["local_path"] click.confirm( f"Are you sure you want to uninstall plugin {name}?", default=False, abort=True ) try: shutil.rmtree(plugin_path) click.echo(f"Plugin {name} has been uninstalled") except Exception as e: raise click.ClickException(f"Failed to uninstall plugin {name}: {e}") @plug.command() @click.argument("name", required=False) @click.option("--proxy", help="GitHub proxy address") def update(name: str, proxy: str | None) -> None: """Update plugins""" base_path = _get_data_path() plug_path = base_path / "plugins" plugins = build_plug_list(base_path / "plugins") if name: plugin = next( ( p for p in plugins if p["name"] == name and p["status"] == PluginStatus.NEED_UPDATE ), None, ) if not plugin: raise click.ClickException( f"Plugin {name} does not need updating or cannot be updated" ) manage_plugin(plugin, plug_path, is_update=True, proxy=proxy) else: need_update_plugins = [ p for p in plugins if p["status"] == PluginStatus.NEED_UPDATE ] if not need_update_plugins: click.echo("No plugins need updating") return click.echo(f"Found {len(need_update_plugins)} plugin(s) needing update") for plugin in need_update_plugins: plugin_name = plugin["name"] click.echo(f"Updating plugin {plugin_name}...") manage_plugin(plugin, plug_path, is_update=True, proxy=proxy) @plug.command() @click.argument("query") def search(query: str) -> None: """Search for plugins""" base_path = _get_data_path() plugins = build_plug_list(base_path / "plugins") matched_plugins = [ p for p in plugins if query.lower() in p["name"].lower() or query.lower() in p["desc"].lower() or query.lower() in p["author"].lower() ] if not matched_plugins: click.echo(f"No plugins matching '{query}' found") return display_plugins(matched_plugins, f"Search results: '{query}'", "cyan")