File size: 9,605 Bytes
17e971c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 | # encoding utf-8
import subprocess
import requests
import os
import logging
import tempfile
class UpdateSiteTotal:
# 类级别的常量定义,集中管理固定配置
DEFAULT_USER_AGENT = "BT-Panel/10.0"
INSTALL_SCRIPT_TIMEOUT = 600 # 安装脚本超时时间(秒)
VERSION_CHECK_TIMEOUT = 10 # 版本检查超时时间(秒)
SCRIPT_DOWNLOAD_TIMEOUT = 15 # 脚本下载超时时间(秒)
site_total_path = '/www/server/site_total'
download_url = 'https://download.bt.cn/site_total/'
version_url = download_url + 'version.txt'
install_sh = download_url + 'install.sh'
site_total_bin = os.path.join(site_total_path, 'site_total')
def __init__(self):
"""初始化更新工具
"""
# 初始化日志配置
self._init_logger()
def _init_logger(self):
"""初始化日志配置(封装为独立方法,便于维护)"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# ------------------------------
# 版本相关方法
# ------------------------------
def get_current_version(self):
"""获取当前安装的版本号(返回字符串格式,如"1.0")"""
if not self.is_installed():
self._log_warning(f"未检测到 {self.site_total_bin},无法获取当前版本")
return None
try:
# 执行版本命令
result = subprocess.run(
[self.site_total_bin, 'version'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
timeout=self.VERSION_CHECK_TIMEOUT
)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, Exception) as e:
return self._handle_version_cmd_error(e)
# 解析版本号(调用辅助方法)
return self._parse_version_output(result)
def _parse_version_output(self, result):
"""解析版本命令的输出结果,提取版本号"""
# 合并stdout和stderr,避免版本信息输出到错误流
output_lines = result.stdout.splitlines() + result.stderr.splitlines()
for line in output_lines:
if "Version:" in line:
# 提取版本号并格式化(确保x.y格式)
raw_version = line.split(":")[-1].strip()
return self._format_version(raw_version)
self._log_warning(f"无法从输出中解析版本号: {output_lines}")
return None
def _format_version(self, raw_version):
"""将原始版本号格式化为x.y的字符串格式"""
try:
float_version = float(raw_version)
return f"{float_version:.1f}"
except ValueError:
self._log_error(f"版本号格式异常,原始值: {raw_version}")
return None
def _handle_version_cmd_error(self, error):
"""处理版本命令执行中的异常(统一错误处理)"""
if isinstance(error, subprocess.CalledProcessError):
self._log_error(f"版本命令执行失败: {error.stderr.strip()}")
elif isinstance(error, subprocess.TimeoutExpired):
self._log_error("版本命令执行超时")
else:
self._log_error(f"获取版本号时发生错误: {str(error)}")
return None
def get_latest_version(self):
"""获取最新版本号(返回字符串格式)"""
try:
response = requests.get(
self.version_url,
timeout=self.VERSION_CHECK_TIMEOUT,
headers={"User-Agent": self.DEFAULT_USER_AGENT}
)
response.raise_for_status()
latest_version = response.text.strip()
if not latest_version:
self._log_warning("获取到的最新版本号为空")
return None
return latest_version
except requests.RequestException as e:
self._log_error(f"获取最新版本失败: {str(e)}")
return None
def check_update_available(self):
"""检查是否有可用更新(基于float格式版本号比较)"""
current_version_str = self.get_current_version()
latest_version_str = self.get_latest_version()
# 检查版本号获取结果
if not current_version_str:
return False, "无法获取当前版本"
if not latest_version_str:
return False, "无法获取最新版本"
# 版本号比较
try:
current_version = float(current_version_str)
latest_version = float(latest_version_str)
except ValueError as e:
error_msg = f"版本号格式错误(应为x.y): {str(e)}"
self._log_error(f"{error_msg}(当前: {current_version_str}, 最新: {latest_version_str})")
return False, error_msg
if latest_version > current_version:
return True, f"当前版本: {current_version_str}, 最新版本: {latest_version_str}"
else:
return False, f"当前版本: {current_version_str}, 最新版本: {latest_version_str}"
# ------------------------------
# 安装/更新相关方法
# ------------------------------
def is_installed(self):
"""检查site_total是否已安装且可执行"""
return os.path.exists(self.site_total_bin) and os.access(self.site_total_bin, os.X_OK)
def install_or_update(self):
"""执行安装或更新操作"""
# 下载安装脚本
install_script = self._download_install_script()
if not install_script:
return False, "下载安装脚本失败"
# 执行安装脚本
return self._execute_install_script(install_script)
def _download_install_script(self):
"""下载安装脚本(提取为独立方法,职责单一)"""
try:
response = requests.get(
self.install_sh,
timeout=self.SCRIPT_DOWNLOAD_TIMEOUT,
headers={"User-Agent": self.DEFAULT_USER_AGENT}
)
response.raise_for_status()
return response.text
except requests.RequestException as e:
self._log_error(f"下载安装脚本失败: {str(e)}")
return None
def _execute_install_script(self, script_content):
"""执行安装脚本(提取为独立方法,职责单一)"""
temp_script = None
try:
# 创建临时脚本文件
temp_script = self._create_temp_script(script_content)
# 执行脚本
result = subprocess.run(
["bash", temp_script],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=self.INSTALL_SCRIPT_TIMEOUT
)
# 检查执行结果
if result.returncode != 0:
error_msg = (f"安装脚本执行失败(返回码: {result.returncode})\n"
f"错误输出: {result.stderr.strip()[:500]}")
self._log_error(error_msg)
return False, error_msg
if not self.is_installed():
return False, "安装完成但未找到可执行文件"
return True, "安装/更新成功"
except subprocess.TimeoutExpired:
return False, "安装脚本执行超时"
except Exception as e:
return False, f"执行安装过程中出错: {str(e)}"
finally:
# 清理临时文件
self._cleanup_temp_script(temp_script)
def _create_temp_script(self, content):
"""创建临时脚本文件(辅助方法,封装文件操作)"""
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.sh',
delete=False,
dir='/tmp'
) as f:
f.write(content)
return f.name
def _cleanup_temp_script(self, script_path):
"""清理临时脚本文件(辅助方法,确保资源释放)"""
if script_path and os.path.exists(script_path):
try:
os.remove(script_path)
except OSError as e:
self._log_warning(f"无法删除临时文件 {script_path}: {str(e)}")
# ------------------------------
# 主逻辑与工具方法
# ------------------------------
def update_if_needed(self):
"""有更新时执行更新,未安装时执行安装(主逻辑入口)"""
if not self.is_installed():
self._log_info("未检测到安装,开始执行安装")
return self.install_or_update()
has_update, msg = self.check_update_available()
if not has_update:
return False, f"无需更新: {msg}"
self._log_info(f"检测到更新: {msg},开始执行更新")
return self.install_or_update()
# 日志工具方法(封装日志调用,便于统一管理)
@staticmethod
def _log_info(msg):
logging.info(msg)
@staticmethod
def _log_warning(msg):
logging.warning(msg)
@staticmethod
def _log_error(msg):
logging.error(msg)
if __name__ == "__main__":
updater = UpdateSiteTotal()
success, message = updater.update_if_needed()
logging.info(f"操作结果: {'成功' if success else '失败'} - {message}")
|