File size: 6,757 Bytes
8ede856 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | import os
import sys
import time
import psutil
from astrbot.core import logger
from astrbot.core.config.default import VERSION
from astrbot.core.utils.astrbot_path import get_astrbot_path
from astrbot.core.utils.io import download_file
from .zip_updator import ReleaseInfo, RepoZipUpdator
class AstrBotUpdator(RepoZipUpdator):
"""AstrBot 更新器,继承自 RepoZipUpdator 类
该类用于处理 AstrBot 的更新操作
功能包括检查更新、下载更新文件、解压缩更新文件等
"""
def __init__(self, repo_mirror: str = "") -> None:
super().__init__(repo_mirror)
self.MAIN_PATH = get_astrbot_path()
self.ASTRBOT_RELEASE_API = "https://api.soulter.top/releases"
def terminate_child_processes(self) -> None:
"""终止当前进程的所有子进程
使用 psutil 库获取当前进程的所有子进程,并尝试终止它们
"""
try:
parent = psutil.Process(os.getpid())
children = parent.children(recursive=True)
logger.info(f"正在终止 {len(children)} 个子进程。")
for child in children:
logger.info(f"正在终止子进程 {child.pid}")
child.terminate()
try:
child.wait(timeout=3)
except psutil.NoSuchProcess:
continue
except psutil.TimeoutExpired:
logger.info(f"子进程 {child.pid} 没有被正常终止, 正在强行杀死。")
child.kill()
except psutil.NoSuchProcess:
pass
@staticmethod
def _is_option_arg(arg: str) -> bool:
return arg.startswith("-")
@classmethod
def _collect_flag_values(cls, argv: list[str], flag: str) -> str | None:
try:
idx = argv.index(flag)
except ValueError:
return None
if idx + 1 >= len(argv):
return None
value_parts: list[str] = []
for arg in argv[idx + 1 :]:
if cls._is_option_arg(arg):
break
if arg:
value_parts.append(arg)
if not value_parts:
return None
return " ".join(value_parts).strip() or None
@classmethod
def _resolve_webui_dir_arg(cls, argv: list[str]) -> str | None:
return cls._collect_flag_values(argv, "--webui-dir")
def _build_frozen_reboot_args(self) -> list[str]:
argv = list(sys.argv[1:])
webui_dir = self._resolve_webui_dir_arg(argv)
if not webui_dir:
webui_dir = os.environ.get("ASTRBOT_WEBUI_DIR")
if webui_dir:
return ["--webui-dir", webui_dir]
return []
@staticmethod
def _reset_pyinstaller_environment() -> None:
if not getattr(sys, "frozen", False):
return
os.environ["PYINSTALLER_RESET_ENVIRONMENT"] = "1"
for key in list(os.environ.keys()):
if key.startswith("_PYI_"):
os.environ.pop(key, None)
def _build_reboot_argv(self, executable: str) -> list[str]:
if os.environ.get("ASTRBOT_CLI") == "1":
args = sys.argv[1:]
return [executable, "-m", "astrbot.cli.__main__", *args]
if getattr(sys, "frozen", False):
args = self._build_frozen_reboot_args()
return [executable, *args]
return [executable, *sys.argv]
@staticmethod
def _exec_reboot(executable: str, argv: list[str]) -> None:
if os.name == "nt" and getattr(sys, "frozen", False):
quoted_executable = f'"{executable}"' if " " in executable else executable
quoted_args = [f'"{arg}"' if " " in arg else arg for arg in argv[1:]]
os.execl(executable, quoted_executable, *quoted_args)
return
os.execv(executable, argv)
def _reboot(self, delay: int = 3) -> None:
"""重启当前程序
在指定的延迟后,终止所有子进程并重新启动程序
这里只能使用 os.exec* 来重启程序
"""
time.sleep(delay)
self.terminate_child_processes()
executable = sys.executable
try:
self._reset_pyinstaller_environment()
reboot_argv = self._build_reboot_argv(executable)
self._exec_reboot(executable, reboot_argv)
except Exception as e:
logger.error(f"重启失败({executable}, {e}),请尝试手动重启。")
raise e
async def check_update(
self,
url: str | None,
current_version: str | None,
consider_prerelease: bool = True,
) -> ReleaseInfo | None:
"""检查更新"""
return await super().check_update(
self.ASTRBOT_RELEASE_API,
VERSION,
consider_prerelease,
)
async def get_releases(self) -> list:
return await self.fetch_release_info(self.ASTRBOT_RELEASE_API)
async def update(self, reboot=False, latest=True, version=None, proxy="") -> None:
update_data = await self.fetch_release_info(self.ASTRBOT_RELEASE_API, latest)
file_url = None
if os.environ.get("ASTRBOT_CLI") or os.environ.get("ASTRBOT_LAUNCHER"):
raise Exception(
"Error: You are running AstrBot via CLI, please use `pip` or `uv tool upgrade` to update AstrBot."
) # 避免版本管理混乱
if latest:
latest_version = update_data[0]["tag_name"]
if self.compare_version(VERSION, latest_version) >= 0:
raise Exception("当前已经是最新版本。")
file_url = update_data[0]["zipball_url"]
elif str(version).startswith("v"):
# 更新到指定版本
for data in update_data:
if data["tag_name"] == version:
file_url = data["zipball_url"]
if not file_url:
raise Exception(f"未找到版本号为 {version} 的更新文件。")
else:
if len(str(version)) != 40:
raise Exception("commit hash 长度不正确,应为 40")
file_url = f"https://github.com/AstrBotDevs/AstrBot/archive/{version}.zip"
logger.info(f"准备更新至指定版本的 AstrBot Core: {version}")
if proxy:
proxy = proxy.removesuffix("/")
file_url = f"{proxy}/{file_url}"
try:
await download_file(file_url, "temp.zip")
logger.info("下载 AstrBot Core 更新文件完成,正在执行解压...")
self.unzip_file("temp.zip", self.MAIN_PATH)
except BaseException as e:
raise e
if reboot:
self._reboot()
|