| import re |
| import shutil |
| from pathlib import Path |
|
|
| import click |
|
|
| from ..utils import ( |
| PluginStatus, |
| build_plug_list, |
| check_astrbot_root, |
| get_astrbot_root, |
| get_git_repo, |
| manage_plugin, |
| ) |
|
|
|
|
# Root click group; the subcommands in this module attach to it through
# ``@plug.command()``.
@click.group()
def plug() -> None:
    """Plugin management"""
|
|
|
|
def _get_data_path() -> Path:
    """Return the resolved path of the ``data`` directory under the AstrBot root.

    The root is obtained from ``get_astrbot_root()`` and checked with
    ``check_astrbot_root()`` before it is used.

    Raises:
        click.ClickException: If the root directory fails the check.
    """
    root_dir = get_astrbot_root()
    if check_astrbot_root(root_dir):
        return (root_dir / "data").resolve()

    raise click.ClickException(
        f"{root_dir} is not a valid AstrBot root directory. Use 'astrbot init' to initialize",
    )
|
|
|
|
def display_plugins(plugins, title=None, color=None) -> None:
    """Print a fixed-width table of plugins with ``click.echo``.

    Args:
        plugins: Iterable of mappings that provide the ``name``, ``version``,
            ``status``, ``author`` and ``desc`` keys.
        title: Optional heading printed in bold above the table; skipped when
            falsy.
        color: Foreground colour handed to ``click.style`` for the heading.
    """
    if title:
        heading = click.style(title, fg=color, bold=True)
        click.echo(heading)

    # Column labels and their left-aligned widths, joined by single spaces.
    columns = (
        ("Name", 20),
        ("Version", 10),
        ("Status", 10),
        ("Author", 15),
        ("Description", 30),
    )
    click.echo(" ".join(f"{label:<{width}}" for label, width in columns))
    click.echo("-" * 85)

    for entry in plugins:
        # Descriptions longer than 30 characters are cut and marked with "...".
        short_desc = entry["desc"][:30]
        if len(entry["desc"]) > 30:
            short_desc = short_desc + "..."

        cells = (
            f"{entry['name']:<20}",
            f"{entry['version']:<10}",
            f"{entry['status']:<10}",
            f"{entry['author']:<15}",
            f"{short_desc:<30}",
        )
        click.echo(" ".join(cells))
|
|
|
|
def _quote_literal(value: str) -> str:
    """Return *value* as a double-quoted, escaped string literal.

    JSON string syntax is used because the result is accepted both as a YAML
    double-quoted scalar and as a Python string literal, so quotes, colons,
    backslashes or ``#`` in user input cannot corrupt the generated files.
    """
    import json  # local import so this block does not depend on the file header

    # ensure_ascii=False keeps non-ASCII text (e.g. CJK descriptions) readable.
    return json.dumps(value, ensure_ascii=False)


# NOTE: the docstring below is shown as the command's help text by click, so
# it is intentionally kept to a single short line.
@plug.command()
@click.argument("name")
def new(name: str) -> None:
    """Create a new plugin"""
    # Steps: refuse to overwrite an existing plugin directory, prompt for the
    # metadata, fetch the template repository into the plugin directory, then
    # rewrite metadata.yaml, README.md and the @register(...) line in main.py.
    #
    # Raises click.ClickException when the plugin already exists, the version
    # is not x.y / x.y.z (an optional leading "v" is tolerated), or the
    # repository URL is not an http(s) URL.
    base_path = _get_data_path()
    plug_path = base_path / "plugins" / name

    if plug_path.exists():
        raise click.ClickException(f"Plugin {name} already exists")

    author = click.prompt("Enter plugin author", type=str)
    desc = click.prompt("Enter plugin description", type=str)
    version = click.prompt("Enter plugin version", type=str)
    if not re.match(r"^\d+\.\d+(\.\d+)?$", version.lower().lstrip("v")):
        raise click.ClickException("Version must be in x.y or x.y.z format")
    # click.prompt appends its own ": " suffix, so the text carries no colon.
    repo = click.prompt("Enter plugin repository URL", type=str)
    # Require a real scheme; a bare "http" prefix would also accept "httpfoo".
    if not repo.startswith(("http://", "https://")):
        raise click.ClickException(
            "Repository URL must start with http:// or https://"
        )

    click.echo("Downloading plugin template...")
    get_git_repo(
        "https://github.com/Soulter/helloworld",
        plug_path,
    )

    click.echo("Rewriting plugin metadata...")

    # Every value is written as a quoted scalar so free-form user text cannot
    # break the YAML structure (and a version such as 1.0 stays a string).
    metadata = (
        f"name: {_quote_literal(name)}\n"
        f"desc: {_quote_literal(desc)}\n"
        f"version: {_quote_literal(version)}\n"
        f"author: {_quote_literal(author)}\n"
        f"repo: {_quote_literal(repo)}\n"
    )
    (plug_path / "metadata.yaml").write_text(metadata, encoding="utf-8")

    (plug_path / "README.md").write_text(
        f"# {name}\n\n{desc}\n\n# Support\n\n[Documentation](https://astrbot.app)\n",
        encoding="utf-8",
    )

    # Swap the template's registration line for one carrying the new metadata.
    # The arguments are emitted as escaped literals so the result stays valid
    # Python even when a value contains quotes or backslashes.
    main_file = plug_path / "main.py"
    content = main_file.read_text(encoding="utf-8")
    new_content = content.replace(
        '@register("helloworld", "YourName", "一个简单的 Hello World 插件", "1.0.0")',
        "@register("
        f"{_quote_literal(name)}, {_quote_literal(author)}, "
        f"{_quote_literal(desc)}, {_quote_literal(version)})",
    )
    main_file.write_text(new_content, encoding="utf-8")

    click.echo(f"Plugin {name} created successfully")
|
|
|
|
# NOTE: the function name ``list`` and the parameter ``all`` shadow builtins;
# both are kept because they are this command's existing interface (``all``
# is the name click passes for the ``--all`` flag). The docstring is the
# command's help text and is kept as-is.
@plug.command()
@click.option("--all", "-a", is_flag=True, help="List uninstalled plugins")
def list(all: bool) -> None:
    """List plugins"""
    catalog = build_plug_list(_get_data_path() / "plugins")

    def with_status(status):
        # Entries of the catalog whose "status" equals the given value.
        return [entry for entry in catalog if entry["status"] == status]

    # Sections that are always printed when non-empty, in display order.
    sections = (
        (PluginStatus.NOT_PUBLISHED, "Unpublished Plugins", "red"),
        (PluginStatus.NEED_UPDATE, "Plugins Needing Update", "yellow"),
        (PluginStatus.INSTALLED, "Installed Plugins", "green"),
    )

    anything_local = False
    for status, heading, colour in sections:
        group = with_status(status)
        if group:
            display_plugins(group, heading, colour)
            anything_local = True

    if all:
        # Uninstalled plugins are only listed on request.
        uninstalled = with_status(PluginStatus.NOT_INSTALLED)
        if uninstalled:
            display_plugins(uninstalled, "Uninstalled Plugins", "blue")
        return

    if not anything_local:
        click.echo("No plugins installed")
|
|
|
|
# The docstring is the command's help text, so it stays a single short line.
# Raises click.ClickException when no entry with the given name is in the
# NOT_INSTALLED state.
@plug.command()
@click.argument("name")
@click.option("--proxy", help="Proxy server address")
def install(name: str, proxy: str | None) -> None:
    """Install a plugin"""
    plugins_dir = _get_data_path() / "plugins"

    # Pick the first entry with a matching name that is not installed yet.
    candidate = None
    for entry in build_plug_list(plugins_dir):
        if entry["name"] == name and entry["status"] == PluginStatus.NOT_INSTALLED:
            candidate = entry
            break

    if not candidate:
        raise click.ClickException(f"Plugin {name} not found or already installed")

    manage_plugin(candidate, plugins_dir, is_update=False, proxy=proxy)
|
|
|
|
# The docstring is the command's help text, so it stays a single short line.
# Raises click.ClickException when the plugin is unknown, has no local path,
# or its directory cannot be deleted; declining the confirmation aborts.
@plug.command()
@click.argument("name")
def remove(name: str) -> None:
    """Uninstall a plugin"""
    base_path = _get_data_path()
    plugins = build_plug_list(base_path / "plugins")
    plugin = next((p for p in plugins if p["name"] == name), None)

    # Only entries that carry a "local_path" can be removed from disk.
    if not plugin or not plugin.get("local_path"):
        raise click.ClickException(f"Plugin {name} does not exist or is not installed")

    plugin_path = plugin["local_path"]

    # abort=True makes click stop the command when the user answers "no".
    click.confirm(
        f"Are you sure you want to uninstall plugin {name}?", default=False, abort=True
    )

    # Keep only the deletion inside the try block so that a failure while
    # printing the success message is not misreported as an uninstall failure.
    try:
        shutil.rmtree(plugin_path)
    except Exception as e:
        # Chain the original error so the underlying cause is preserved.
        raise click.ClickException(f"Failed to uninstall plugin {name}: {e}") from e

    click.echo(f"Plugin {name} has been uninstalled")
|
|
|
|
# The docstring is the command's help text, so it stays a single short line.
# ``name`` is optional on the command line (required=False), hence
# ``str | None``: with a name only that plugin is updated, without one every
# plugin in the NEED_UPDATE state is updated.
@plug.command()
@click.argument("name", required=False)
@click.option("--proxy", help="GitHub proxy address")
def update(name: str | None, proxy: str | None) -> None:
    """Update plugins"""
    plug_path = _get_data_path() / "plugins"
    plugins = build_plug_list(plug_path)

    # Both modes only ever act on plugins flagged as needing an update.
    need_update_plugins = [
        p for p in plugins if p["status"] == PluginStatus.NEED_UPDATE
    ]

    if name:
        plugin = next((p for p in need_update_plugins if p["name"] == name), None)
        if not plugin:
            raise click.ClickException(
                f"Plugin {name} does not need updating or cannot be updated"
            )
        manage_plugin(plugin, plug_path, is_update=True, proxy=proxy)
        return

    if not need_update_plugins:
        click.echo("No plugins need updating")
        return

    click.echo(f"Found {len(need_update_plugins)} plugin(s) needing update")
    for plugin in need_update_plugins:
        plugin_name = plugin["name"]
        click.echo(f"Updating plugin {plugin_name}...")
        manage_plugin(plugin, plug_path, is_update=True, proxy=proxy)
|
|
|
|
# The docstring is the command's help text, so it stays a single short line.
# Matching is a case-insensitive substring test against each plugin's name,
# description and author, checked in that order.
@plug.command()
@click.argument("query")
def search(query: str) -> None:
    """Search for plugins"""
    catalog = build_plug_list(_get_data_path() / "plugins")
    needle = query.lower()
    searched_fields = ("name", "desc", "author")

    hits = []
    for entry in catalog:
        # any() stops at the first field that contains the query.
        if any(needle in entry[field].lower() for field in searched_fields):
            hits.append(entry)

    if hits:
        display_plugins(hits, f"Search results: '{query}'", "cyan")
    else:
        click.echo(f"No plugins matching '{query}' found")
|
|