| |
| import subprocess |
| import requests |
| import os |
| import logging |
| import tempfile |
|
|
|
|
| class UpdateSiteTotal: |
| |
| DEFAULT_USER_AGENT = "BT-Panel/10.0" |
| INSTALL_SCRIPT_TIMEOUT = 600 |
| VERSION_CHECK_TIMEOUT = 10 |
| SCRIPT_DOWNLOAD_TIMEOUT = 15 |
|
|
| site_total_path = '/www/server/site_total' |
| download_url = 'https://download.bt.cn/site_total/' |
| version_url = download_url + 'version.txt' |
| install_sh = download_url + 'install.sh' |
| site_total_bin = os.path.join(site_total_path, 'site_total') |
|
|
| def __init__(self): |
| """初始化更新工具 |
| """ |
|
|
| |
| self._init_logger() |
|
|
| def _init_logger(self): |
| """初始化日志配置(封装为独立方法,便于维护)""" |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(levelname)s - %(message)s' |
| ) |
|
|
| |
| |
| |
| def get_current_version(self): |
| """获取当前安装的版本号(返回字符串格式,如"1.0")""" |
| if not self.is_installed(): |
| self._log_warning(f"未检测到 {self.site_total_bin},无法获取当前版本") |
| return None |
|
|
| try: |
| |
| result = subprocess.run( |
| [self.site_total_bin, 'version'], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| text=True, |
| check=True, |
| timeout=self.VERSION_CHECK_TIMEOUT |
| ) |
| except (subprocess.CalledProcessError, subprocess.TimeoutExpired, Exception) as e: |
| return self._handle_version_cmd_error(e) |
|
|
| |
| return self._parse_version_output(result) |
|
|
| def _parse_version_output(self, result): |
| """解析版本命令的输出结果,提取版本号""" |
| |
| output_lines = result.stdout.splitlines() + result.stderr.splitlines() |
| |
| for line in output_lines: |
| if "Version:" in line: |
| |
| raw_version = line.split(":")[-1].strip() |
| return self._format_version(raw_version) |
| |
| self._log_warning(f"无法从输出中解析版本号: {output_lines}") |
| return None |
|
|
| def _format_version(self, raw_version): |
| """将原始版本号格式化为x.y的字符串格式""" |
| try: |
| float_version = float(raw_version) |
| return f"{float_version:.1f}" |
| except ValueError: |
| self._log_error(f"版本号格式异常,原始值: {raw_version}") |
| return None |
|
|
| def _handle_version_cmd_error(self, error): |
| """处理版本命令执行中的异常(统一错误处理)""" |
| if isinstance(error, subprocess.CalledProcessError): |
| self._log_error(f"版本命令执行失败: {error.stderr.strip()}") |
| elif isinstance(error, subprocess.TimeoutExpired): |
| self._log_error("版本命令执行超时") |
| else: |
| self._log_error(f"获取版本号时发生错误: {str(error)}") |
| return None |
|
|
| def get_latest_version(self): |
| """获取最新版本号(返回字符串格式)""" |
| try: |
| response = requests.get( |
| self.version_url, |
| timeout=self.VERSION_CHECK_TIMEOUT, |
| headers={"User-Agent": self.DEFAULT_USER_AGENT} |
| ) |
| response.raise_for_status() |
| latest_version = response.text.strip() |
| |
| if not latest_version: |
| self._log_warning("获取到的最新版本号为空") |
| return None |
| return latest_version |
| except requests.RequestException as e: |
| self._log_error(f"获取最新版本失败: {str(e)}") |
| return None |
|
|
| def check_update_available(self): |
| """检查是否有可用更新(基于float格式版本号比较)""" |
| current_version_str = self.get_current_version() |
| latest_version_str = self.get_latest_version() |
|
|
| |
| if not current_version_str: |
| return False, "无法获取当前版本" |
| if not latest_version_str: |
| return False, "无法获取最新版本" |
|
|
| |
| try: |
| current_version = float(current_version_str) |
| latest_version = float(latest_version_str) |
| except ValueError as e: |
| error_msg = f"版本号格式错误(应为x.y): {str(e)}" |
| self._log_error(f"{error_msg}(当前: {current_version_str}, 最新: {latest_version_str})") |
| return False, error_msg |
|
|
| if latest_version > current_version: |
| return True, f"当前版本: {current_version_str}, 最新版本: {latest_version_str}" |
| else: |
| return False, f"当前版本: {current_version_str}, 最新版本: {latest_version_str}" |
|
|
| |
| |
| |
| def is_installed(self): |
| """检查site_total是否已安装且可执行""" |
| return os.path.exists(self.site_total_bin) and os.access(self.site_total_bin, os.X_OK) |
|
|
| def install_or_update(self): |
| """执行安装或更新操作""" |
| |
| install_script = self._download_install_script() |
| if not install_script: |
| return False, "下载安装脚本失败" |
|
|
| |
| return self._execute_install_script(install_script) |
|
|
| def _download_install_script(self): |
| """下载安装脚本(提取为独立方法,职责单一)""" |
| try: |
| response = requests.get( |
| self.install_sh, |
| timeout=self.SCRIPT_DOWNLOAD_TIMEOUT, |
| headers={"User-Agent": self.DEFAULT_USER_AGENT} |
| ) |
| response.raise_for_status() |
| return response.text |
| except requests.RequestException as e: |
| self._log_error(f"下载安装脚本失败: {str(e)}") |
| return None |
|
|
| def _execute_install_script(self, script_content): |
| """执行安装脚本(提取为独立方法,职责单一)""" |
| temp_script = None |
| try: |
| |
| temp_script = self._create_temp_script(script_content) |
| |
| |
| result = subprocess.run( |
| ["bash", temp_script], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| text=True, |
| timeout=self.INSTALL_SCRIPT_TIMEOUT |
| ) |
|
|
| |
| if result.returncode != 0: |
| error_msg = (f"安装脚本执行失败(返回码: {result.returncode})\n" |
| f"错误输出: {result.stderr.strip()[:500]}") |
| self._log_error(error_msg) |
| return False, error_msg |
|
|
| if not self.is_installed(): |
| return False, "安装完成但未找到可执行文件" |
|
|
| return True, "安装/更新成功" |
|
|
| except subprocess.TimeoutExpired: |
| return False, "安装脚本执行超时" |
| except Exception as e: |
| return False, f"执行安装过程中出错: {str(e)}" |
| finally: |
| |
| self._cleanup_temp_script(temp_script) |
|
|
| def _create_temp_script(self, content): |
| """创建临时脚本文件(辅助方法,封装文件操作)""" |
| with tempfile.NamedTemporaryFile( |
| mode='w', |
| suffix='.sh', |
| delete=False, |
| dir='/tmp' |
| ) as f: |
| f.write(content) |
| return f.name |
|
|
| def _cleanup_temp_script(self, script_path): |
| """清理临时脚本文件(辅助方法,确保资源释放)""" |
| if script_path and os.path.exists(script_path): |
| try: |
| os.remove(script_path) |
| except OSError as e: |
| self._log_warning(f"无法删除临时文件 {script_path}: {str(e)}") |
|
|
| |
| |
| |
| def update_if_needed(self): |
| """有更新时执行更新,未安装时执行安装(主逻辑入口)""" |
| if not self.is_installed(): |
| self._log_info("未检测到安装,开始执行安装") |
| return self.install_or_update() |
|
|
| has_update, msg = self.check_update_available() |
| if not has_update: |
| return False, f"无需更新: {msg}" |
|
|
| self._log_info(f"检测到更新: {msg},开始执行更新") |
| return self.install_or_update() |
|
|
| |
| @staticmethod |
| def _log_info(msg): |
| logging.info(msg) |
|
|
| @staticmethod |
| def _log_warning(msg): |
| logging.warning(msg) |
|
|
| @staticmethod |
| def _log_error(msg): |
| logging.error(msg) |
|
|
|
|
| if __name__ == "__main__": |
| updater = UpdateSiteTotal() |
| success, message = updater.update_if_needed() |
| logging.info(f"操作结果: {'成功' if success else '失败'} - {message}") |
|
|