File size: 8,847 Bytes
8ede856
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import shutil
import tempfile
from enum import Enum
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile

import click
import httpx
import yaml

from .version_comparator import VersionComparator


class PluginStatus(str, Enum):
    INSTALLED = "installed"
    NEED_UPDATE = "needs-update"
    NOT_INSTALLED = "not-installed"
    NOT_PUBLISHED = "unpublished"


def get_git_repo(url: str, target_path: Path, proxy: str | None = None) -> None:
    """Download code from a Git repository and extract to the specified path"""
    temp_dir = Path(tempfile.mkdtemp())
    try:
        # Parse repository info
        repo_namespace = url.split("/")[-2:]
        author = repo_namespace[0]
        repo = repo_namespace[1]

        # Try to get the latest release
        release_url = f"https://api.github.com/repos/{author}/{repo}/releases"
        try:
            with httpx.Client(
                proxy=proxy if proxy else None,
                follow_redirects=True,
            ) as client:
                resp = client.get(release_url)
                resp.raise_for_status()
                releases = resp.json()

                if releases:
                    # Use the latest release
                    download_url = releases[0]["zipball_url"]
                else:
                    # No release found, use default branch
                    click.echo(f"Downloading {author}/{repo} from default branch")
                    download_url = f"https://github.com/{author}/{repo}/archive/refs/heads/master.zip"
        except Exception as e:
            click.echo(f"Failed to get release info: {e}. Using provided URL directly")
            download_url = url

        # Apply proxy
        if proxy:
            download_url = f"{proxy}/{download_url}"

        # Download and extract
        with httpx.Client(
            proxy=proxy if proxy else None,
            follow_redirects=True,
        ) as client:
            resp = client.get(download_url)
            if (
                resp.status_code == 404
                and "archive/refs/heads/master.zip" in download_url
            ):
                alt_url = download_url.replace("master.zip", "main.zip")
                click.echo("Branch 'master' not found, trying 'main' branch")
                resp = client.get(alt_url)
                resp.raise_for_status()
            else:
                resp.raise_for_status()
            zip_content = BytesIO(resp.content)
        with ZipFile(zip_content) as z:
            z.extractall(temp_dir)
            namelist = z.namelist()
            root_dir = Path(namelist[0]).parts[0] if namelist else ""
            if target_path.exists():
                shutil.rmtree(target_path)
            shutil.move(temp_dir / root_dir, target_path)
    finally:
        if temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)


def load_yaml_metadata(plugin_dir: Path) -> dict:
    """Load plugin metadata from metadata.yaml file

    Args:
        plugin_dir: Plugin directory path

    Returns:
        dict: Dictionary containing metadata, or empty dict if loading fails

    """
    yaml_path = plugin_dir / "metadata.yaml"
    if yaml_path.exists():
        try:
            return yaml.safe_load(yaml_path.read_text(encoding="utf-8")) or {}
        except Exception as e:
            click.echo(f"Failed to read {yaml_path}: {e}", err=True)
    return {}


def build_plug_list(plugins_dir: Path) -> list:
    """Build plugin list containing local and online plugin information

    Args:
        plugins_dir (Path): Plugin directory path

    Returns:
        list: List of dicts containing plugin information

    """
    # Get local plugin info
    result = []
    if plugins_dir.exists():
        for plugin_name in [d.name for d in plugins_dir.glob("*") if d.is_dir()]:
            plugin_dir = plugins_dir / plugin_name

            # Load metadata from metadata.yaml
            metadata = load_yaml_metadata(plugin_dir)

            if "desc" not in metadata and "description" in metadata:
                metadata["desc"] = metadata["description"]

            # If metadata loaded successfully, add to result list
            if metadata and all(
                k in metadata for k in ["name", "desc", "version", "author", "repo"]
            ):
                result.append(
                    {
                        "name": str(metadata.get("name", "")),
                        "desc": str(metadata.get("desc", "")),
                        "version": str(metadata.get("version", "")),
                        "author": str(metadata.get("author", "")),
                        "repo": str(metadata.get("repo", "")),
                        "status": PluginStatus.INSTALLED,
                        "local_path": str(plugin_dir),
                    },
                )

    # Get online plugin list
    online_plugins = []
    try:
        with httpx.Client() as client:
            resp = client.get("https://api.soulter.top/astrbot/plugins")
            resp.raise_for_status()
            data = resp.json()
            for plugin_id, plugin_info in data.items():
                online_plugins.append(
                    {
                        "name": str(plugin_id),
                        "desc": str(plugin_info.get("desc", "")),
                        "version": str(plugin_info.get("version", "")),
                        "author": str(plugin_info.get("author", "")),
                        "repo": str(plugin_info.get("repo", "")),
                        "status": PluginStatus.NOT_INSTALLED,
                        "local_path": None,
                    },
                )
    except Exception as e:
        click.echo(f"Failed to get online plugin list: {e}", err=True)

    # Compare with online plugins and update status
    online_plugin_names = {plugin["name"] for plugin in online_plugins}
    for local_plugin in result:
        if local_plugin["name"] in online_plugin_names:
            # Find the corresponding online plugin
            online_plugin = next(
                p for p in online_plugins if p["name"] == local_plugin["name"]
            )
            if (
                VersionComparator.compare_version(
                    local_plugin["version"],
                    online_plugin["version"],
                )
                < 0
            ):
                local_plugin["status"] = PluginStatus.NEED_UPDATE
        else:
            # Local plugin is not published online
            local_plugin["status"] = PluginStatus.NOT_PUBLISHED

    # Add uninstalled online plugins
    for online_plugin in online_plugins:
        if not any(plugin["name"] == online_plugin["name"] for plugin in result):
            result.append(online_plugin)

    return result


def manage_plugin(
    plugin: dict,
    plugins_dir: Path,
    is_update: bool = False,
    proxy: str | None = None,
) -> None:
    """Install or update a plugin

    Args:
        plugin (dict): Plugin info dict
        plugins_dir (Path): Plugins directory
        is_update (bool, optional): Whether this is an update operation. Defaults to False
        proxy (str, optional): Proxy server address

    """
    plugin_name = plugin["name"]
    repo_url = plugin["repo"]

    # If updating and local path exists, use it directly
    if is_update and plugin.get("local_path"):
        target_path = Path(plugin["local_path"])
    else:
        target_path = plugins_dir / plugin_name

    backup_path = Path(f"{target_path}_backup") if is_update else None

    # Check if plugin exists
    if is_update and not target_path.exists():
        raise click.ClickException(
            f"Plugin {plugin_name} is not installed and cannot be updated"
        )

    # Backup existing plugin
    if is_update and backup_path is not None and backup_path.exists():
        shutil.rmtree(backup_path)
    if is_update and backup_path is not None:
        shutil.copytree(target_path, backup_path)

    try:
        click.echo(
            f"{'Updating' if is_update else 'Downloading'} plugin {plugin_name} from {repo_url}...",
        )
        get_git_repo(repo_url, target_path, proxy)

        # Update succeeded, delete backup
        if is_update and backup_path is not None and backup_path.exists():
            shutil.rmtree(backup_path)
        click.echo(
            f"Plugin {plugin_name} {'updated' if is_update else 'installed'} successfully"
        )
    except Exception as e:
        if target_path.exists():
            shutil.rmtree(target_path, ignore_errors=True)
        if is_update and backup_path is not None and backup_path.exists():
            shutil.move(backup_path, target_path)
        raise click.ClickException(
            f"Error {'updating' if is_update else 'installing'} plugin {plugin_name}: {e}",
        )