File size: 8,847 Bytes
8ede856 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 | import shutil
import tempfile
from enum import Enum
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
import click
import httpx
import yaml
from .version_comparator import VersionComparator
class PluginStatus(str, Enum):
    """Installation state of a plugin as reported in the plugin list.

    The enum mixes in ``str``, so every member compares equal to its raw
    string value (for example ``PluginStatus.INSTALLED == "installed"``).
    """

    # Plugin is present in the local plugins directory.
    INSTALLED = "installed"

    # Local version compares lower than the version listed online.
    NEED_UPDATE = "needs-update"

    # Plugin is listed online but has no local copy.
    NOT_INSTALLED = "not-installed"

    # Plugin exists locally but does not appear in the online list.
    NOT_PUBLISHED = "unpublished"
def get_git_repo(url: str, target_path: Path, proxy: str | None = None) -> None:
"""Download code from a Git repository and extract to the specified path"""
temp_dir = Path(tempfile.mkdtemp())
try:
# Parse repository info
repo_namespace = url.split("/")[-2:]
author = repo_namespace[0]
repo = repo_namespace[1]
# Try to get the latest release
release_url = f"https://api.github.com/repos/{author}/{repo}/releases"
try:
with httpx.Client(
proxy=proxy if proxy else None,
follow_redirects=True,
) as client:
resp = client.get(release_url)
resp.raise_for_status()
releases = resp.json()
if releases:
# Use the latest release
download_url = releases[0]["zipball_url"]
else:
# No release found, use default branch
click.echo(f"Downloading {author}/{repo} from default branch")
download_url = f"https://github.com/{author}/{repo}/archive/refs/heads/master.zip"
except Exception as e:
click.echo(f"Failed to get release info: {e}. Using provided URL directly")
download_url = url
# Apply proxy
if proxy:
download_url = f"{proxy}/{download_url}"
# Download and extract
with httpx.Client(
proxy=proxy if proxy else None,
follow_redirects=True,
) as client:
resp = client.get(download_url)
if (
resp.status_code == 404
and "archive/refs/heads/master.zip" in download_url
):
alt_url = download_url.replace("master.zip", "main.zip")
click.echo("Branch 'master' not found, trying 'main' branch")
resp = client.get(alt_url)
resp.raise_for_status()
else:
resp.raise_for_status()
zip_content = BytesIO(resp.content)
with ZipFile(zip_content) as z:
z.extractall(temp_dir)
namelist = z.namelist()
root_dir = Path(namelist[0]).parts[0] if namelist else ""
if target_path.exists():
shutil.rmtree(target_path)
shutil.move(temp_dir / root_dir, target_path)
finally:
if temp_dir.exists():
shutil.rmtree(temp_dir, ignore_errors=True)
def load_yaml_metadata(plugin_dir: Path) -> dict:
    """Load plugin metadata from metadata.yaml file

    Args:
        plugin_dir: Plugin directory path

    Returns:
        dict: Dictionary containing metadata. An empty dict is returned when
        the file is missing, cannot be read or parsed, is empty, or does not
        contain a mapping at the top level.
    """
    yaml_path = plugin_dir / "metadata.yaml"
    if not yaml_path.exists():
        return {}

    try:
        loaded = yaml.safe_load(yaml_path.read_text(encoding="utf-8"))
    except Exception as e:
        # Best effort: a broken metadata file must not abort plugin listing.
        click.echo(f"Failed to read {yaml_path}: {e}", err=True)
        return {}

    if not loaded:
        # Empty document (or any other falsy value) means "no metadata".
        return {}

    if not isinstance(loaded, dict):
        # A top-level list or scalar would break callers that index by key.
        click.echo(
            f"Failed to read {yaml_path}: expected a mapping, "
            f"got {type(loaded).__name__}",
            err=True,
        )
        return {}

    return loaded
def build_plug_list(plugins_dir: Path) -> list:
    """Build plugin list containing local and online plugin information

    Local plugins are read from each sub-directory's metadata and start out
    as ``INSTALLED``. If the online list is fetched successfully, each local
    plugin is re-labelled ``NEED_UPDATE`` (local version compares lower than
    the online one) or ``NOT_PUBLISHED`` (name absent online). If the fetch
    fails, local statuses are left as ``INSTALLED`` because nothing is known
    about the online state.

    Args:
        plugins_dir (Path): Plugin directory path

    Returns:
        list: List of dicts containing plugin information, with the keys
        ``name``, ``desc``, ``version``, ``author``, ``repo``, ``status`` and
        ``local_path``. Local plugins come first, followed by online plugins
        that are not installed.
    """
    required_keys = ("name", "desc", "version", "author", "repo")

    # Get local plugin info
    result = []
    if plugins_dir.exists():
        for plugin_dir in plugins_dir.glob("*"):
            if not plugin_dir.is_dir():
                continue

            # Load metadata from metadata.yaml
            metadata = load_yaml_metadata(plugin_dir)
            if not isinstance(metadata, dict):
                # Defensive: skip anything that is not a key/value mapping.
                continue

            # Accept "description" as an alias when "desc" is missing
            if "desc" not in metadata and "description" in metadata:
                metadata["desc"] = metadata["description"]

            # Only plugins with complete metadata are listed
            if not all(key in metadata for key in required_keys):
                continue

            entry = {key: str(metadata[key]) for key in required_keys}
            entry["status"] = PluginStatus.INSTALLED
            entry["local_path"] = str(plugin_dir)
            result.append(entry)

    # Get online plugin list
    online_plugins = []
    online_fetched = False
    try:
        with httpx.Client() as client:
            resp = client.get("https://api.soulter.top/astrbot/plugins")
            resp.raise_for_status()
            data = resp.json()

        online_plugins = [
            {
                "name": str(plugin_id),
                "desc": str(plugin_info.get("desc", "")),
                "version": str(plugin_info.get("version", "")),
                "author": str(plugin_info.get("author", "")),
                "repo": str(plugin_info.get("repo", "")),
                "status": PluginStatus.NOT_INSTALLED,
                "local_path": None,
            }
            for plugin_id, plugin_info in data.items()
        ]
        online_fetched = True
    except Exception as e:
        # Best effort: the local list is still useful without online data.
        click.echo(f"Failed to get online plugin list: {e}", err=True)

    # Compare with online plugins and update status. Skipped when the fetch
    # failed, otherwise every local plugin would be reported as unpublished.
    if online_fetched:
        online_by_name = {plugin["name"]: plugin for plugin in online_plugins}
        for local_plugin in result:
            online_plugin = online_by_name.get(local_plugin["name"])
            if online_plugin is None:
                # Local plugin is not published online
                local_plugin["status"] = PluginStatus.NOT_PUBLISHED
            elif (
                VersionComparator.compare_version(
                    local_plugin["version"],
                    online_plugin["version"],
                )
                < 0
            ):
                local_plugin["status"] = PluginStatus.NEED_UPDATE

    # Add uninstalled online plugins
    local_names = {plugin["name"] for plugin in result}
    result.extend(
        plugin for plugin in online_plugins if plugin["name"] not in local_names
    )

    return result
def manage_plugin(
    plugin: dict,
    plugins_dir: Path,
    is_update: bool = False,
    proxy: str | None = None,
) -> None:
    """Install or update a plugin

    For updates, the current plugin directory is copied to
    ``<target>_backup`` first and restored if the download fails. For fresh
    installs, a partially created target directory is removed on failure; a
    directory that already existed before the call is left untouched.

    Args:
        plugin (dict): Plugin info dict; ``name`` and ``repo`` are required,
            ``local_path`` is used for updates when present
        plugins_dir (Path): Plugins directory
        is_update (bool, optional): Whether this is an update operation. Defaults to False
        proxy (str, optional): Proxy server address

    Raises:
        click.ClickException: If the plugin to update is not installed, or if
            downloading/installing fails (the original error is chained).
    """
    plugin_name = plugin["name"]
    repo_url = plugin["repo"]

    # If updating and local path exists, use it directly
    if is_update and plugin.get("local_path"):
        target_path = Path(plugin["local_path"])
    else:
        target_path = plugins_dir / plugin_name

    # Remember the initial state so a failed install never deletes data that
    # was already there before this call.
    existed_before = target_path.exists()

    # Check if plugin exists
    if is_update and not existed_before:
        raise click.ClickException(
            f"Plugin {plugin_name} is not installed and cannot be updated"
        )

    # Backup existing plugin
    backup_path = None
    if is_update:
        backup_path = Path(f"{target_path}_backup")
        if backup_path.exists():
            # Drop a stale backup left over from an earlier run.
            shutil.rmtree(backup_path)
        shutil.copytree(target_path, backup_path)

    try:
        click.echo(
            f"{'Updating' if is_update else 'Downloading'} plugin {plugin_name} from {repo_url}...",
        )
        get_git_repo(repo_url, target_path, proxy)

        # Update succeeded, delete backup
        if backup_path is not None and backup_path.exists():
            shutil.rmtree(backup_path)
        click.echo(
            f"Plugin {plugin_name} {'updated' if is_update else 'installed'} successfully"
        )
    except Exception as e:
        if backup_path is not None and backup_path.exists():
            # Roll back: replace whatever is at the target with the backup.
            shutil.rmtree(target_path, ignore_errors=True)
            shutil.move(backup_path, target_path)
        elif not existed_before:
            # Fresh install: remove anything partially created by this call.
            shutil.rmtree(target_path, ignore_errors=True)
        raise click.ClickException(
            f"Error {'updating' if is_update else 'installing'} plugin {plugin_name}: {e}",
        ) from e
|