File size: 7,621 Bytes
8ede856
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import re
import shutil
from pathlib import Path

import click

from ..utils import (
    PluginStatus,
    build_plug_list,
    check_astrbot_root,
    get_astrbot_root,
    get_git_repo,
    manage_plugin,
)


@click.group()
def plug() -> None:
    """Plugin management"""


def _get_data_path() -> Path:
    base = get_astrbot_root()
    if not check_astrbot_root(base):
        raise click.ClickException(
            f"{base} is not a valid AstrBot root directory. Use 'astrbot init' to initialize",
        )
    return (base / "data").resolve()


def display_plugins(plugins, title=None, color=None) -> None:
    if title:
        click.echo(click.style(title, fg=color, bold=True))

    click.echo(
        f"{'Name':<20} {'Version':<10} {'Status':<10} {'Author':<15} {'Description':<30}"
    )
    click.echo("-" * 85)

    for p in plugins:
        desc = p["desc"][:30] + ("..." if len(p["desc"]) > 30 else "")
        click.echo(
            f"{p['name']:<20} {p['version']:<10} {p['status']:<10} "
            f"{p['author']:<15} {desc:<30}",
        )


@plug.command()
@click.argument("name")
def new(name: str) -> None:
    """Create a new plugin"""
    base_path = _get_data_path()
    plug_path = base_path / "plugins" / name

    if plug_path.exists():
        raise click.ClickException(f"Plugin {name} already exists")

    author = click.prompt("Enter plugin author", type=str)
    desc = click.prompt("Enter plugin description", type=str)
    version = click.prompt("Enter plugin version", type=str)
    if not re.match(r"^\d+\.\d+(\.\d+)?$", version.lower().lstrip("v")):
        raise click.ClickException("Version must be in x.y or x.y.z format")
    repo = click.prompt("Enter plugin repository URL:", type=str)
    if not repo.startswith("http"):
        raise click.ClickException("Repository URL must start with http")

    click.echo("Downloading plugin template...")
    get_git_repo(
        "https://github.com/Soulter/helloworld",
        plug_path,
    )

    click.echo("Rewriting plugin metadata...")
    # Rewrite metadata.yaml
    with open(plug_path / "metadata.yaml", "w", encoding="utf-8") as f:
        f.write(
            f"name: {name}\n"
            f"desc: {desc}\n"
            f"version: {version}\n"
            f"author: {author}\n"
            f"repo: {repo}\n",
        )

    # Rewrite README.md
    with open(plug_path / "README.md", "w", encoding="utf-8") as f:
        f.write(
            f"# {name}\n\n{desc}\n\n# Support\n\n[Documentation](https://astrbot.app)\n"
        )

    # Rewrite main.py
    with open(plug_path / "main.py", encoding="utf-8") as f:
        content = f.read()

    new_content = content.replace(
        '@register("helloworld", "YourName", "一个简单的 Hello World 插件", "1.0.0")',
        f'@register("{name}", "{author}", "{desc}", "{version}")',
    )

    with open(plug_path / "main.py", "w", encoding="utf-8") as f:
        f.write(new_content)

    click.echo(f"Plugin {name} created successfully")


@plug.command()
@click.option("--all", "-a", is_flag=True, help="List uninstalled plugins")
def list(all: bool) -> None:
    """List plugins"""
    base_path = _get_data_path()
    plugins = build_plug_list(base_path / "plugins")

    # Unpublished plugins
    not_published_plugins = [
        p for p in plugins if p["status"] == PluginStatus.NOT_PUBLISHED
    ]
    if not_published_plugins:
        display_plugins(not_published_plugins, "Unpublished Plugins", "red")

    # Plugins needing update
    need_update_plugins = [
        p for p in plugins if p["status"] == PluginStatus.NEED_UPDATE
    ]
    if need_update_plugins:
        display_plugins(need_update_plugins, "Plugins Needing Update", "yellow")

    # Installed plugins
    installed_plugins = [p for p in plugins if p["status"] == PluginStatus.INSTALLED]
    if installed_plugins:
        display_plugins(installed_plugins, "Installed Plugins", "green")

    # Uninstalled plugins
    not_installed_plugins = [
        p for p in plugins if p["status"] == PluginStatus.NOT_INSTALLED
    ]
    if not_installed_plugins and all:
        display_plugins(not_installed_plugins, "Uninstalled Plugins", "blue")

    if (
        not any([not_published_plugins, need_update_plugins, installed_plugins])
        and not all
    ):
        click.echo("No plugins installed")


@plug.command()
@click.argument("name")
@click.option("--proxy", help="Proxy server address")
def install(name: str, proxy: str | None) -> None:
    """Install a plugin"""
    base_path = _get_data_path()
    plug_path = base_path / "plugins"
    plugins = build_plug_list(base_path / "plugins")

    plugin = next(
        (
            p
            for p in plugins
            if p["name"] == name and p["status"] == PluginStatus.NOT_INSTALLED
        ),
        None,
    )

    if not plugin:
        raise click.ClickException(f"Plugin {name} not found or already installed")

    manage_plugin(plugin, plug_path, is_update=False, proxy=proxy)


@plug.command()
@click.argument("name")
def remove(name: str) -> None:
    """Uninstall a plugin"""
    base_path = _get_data_path()
    plugins = build_plug_list(base_path / "plugins")
    plugin = next((p for p in plugins if p["name"] == name), None)

    if not plugin or not plugin.get("local_path"):
        raise click.ClickException(f"Plugin {name} does not exist or is not installed")

    plugin_path = plugin["local_path"]

    click.confirm(
        f"Are you sure you want to uninstall plugin {name}?", default=False, abort=True
    )

    try:
        shutil.rmtree(plugin_path)
        click.echo(f"Plugin {name} has been uninstalled")
    except Exception as e:
        raise click.ClickException(f"Failed to uninstall plugin {name}: {e}")


@plug.command()
@click.argument("name", required=False)
@click.option("--proxy", help="GitHub proxy address")
def update(name: str, proxy: str | None) -> None:
    """Update plugins"""
    base_path = _get_data_path()
    plug_path = base_path / "plugins"
    plugins = build_plug_list(base_path / "plugins")

    if name:
        plugin = next(
            (
                p
                for p in plugins
                if p["name"] == name and p["status"] == PluginStatus.NEED_UPDATE
            ),
            None,
        )

        if not plugin:
            raise click.ClickException(
                f"Plugin {name} does not need updating or cannot be updated"
            )

        manage_plugin(plugin, plug_path, is_update=True, proxy=proxy)
    else:
        need_update_plugins = [
            p for p in plugins if p["status"] == PluginStatus.NEED_UPDATE
        ]

        if not need_update_plugins:
            click.echo("No plugins need updating")
            return

        click.echo(f"Found {len(need_update_plugins)} plugin(s) needing update")
        for plugin in need_update_plugins:
            plugin_name = plugin["name"]
            click.echo(f"Updating plugin {plugin_name}...")
            manage_plugin(plugin, plug_path, is_update=True, proxy=proxy)


@plug.command()
@click.argument("query")
def search(query: str) -> None:
    """Search for plugins"""
    base_path = _get_data_path()
    plugins = build_plug_list(base_path / "plugins")

    matched_plugins = [
        p
        for p in plugins
        if query.lower() in p["name"].lower()
        or query.lower() in p["desc"].lower()
        or query.lower() in p["author"].lower()
    ]

    if not matched_plugins:
        click.echo(f"No plugins matching '{query}' found")
        return

    display_plugins(matched_plugins, f"Search results: '{query}'", "cyan")