| |
| |
| |
|
|
| import paramiko |
| import os |
| import time |
| import sys |
| import json |
| import re |
| import argparse |
| import logging |
| import socket |
| import datetime |
|
|
| if "/www/server/panel/class" not in sys.path: |
| sys.path.insert(0, "/www/server/panel/class") |
|
|
| if '/www/server/panel' not in sys.path: |
| sys.path.insert(0, '/www/server/panel') |
|
|
| import public |
|
|
| try: |
| from paramiko import SSHClient, AutoAddPolicy |
| from paramiko.sftp_client import SFTPClient |
| except: |
| public.ExecShell("btpip install paramiko") |
| try: |
| from paramiko import SSHClient, AutoAddPolicy |
| from paramiko.sftp_client import SFTPClient |
| except: |
| pass |
|
|
| |
| BACKUP_RESTORE_PATH = "/www/backup/backup_restore" |
| MIGRATION_TASK_JSON = '/www/backup/backup_restore/migration_task.json' |
| MIGRATION_LOG_FILE = '/www/backup/backup_restore/migration.log' |
| MIGRATION_PL_FILE = '/www/backup/backup_restore/migration.pl' |
| MIGRATION_SUCCESS_FILE = '/www/backup/backup_restore/migration_success.pl' |
|
|
| |
| MIGRATION_STATUS = { |
| 'PENDING': 0, |
| 'RUNNING': 1, |
| 'COMPLETED': 2, |
| 'FAILED': 3, |
| } |
|
|
| |
| MIGRATION_STAGES = { |
| 'INIT': { |
| 'code': 'init', |
| 'display': '初始化', |
| 'progress': 5, |
| }, |
| 'PANEL_INSTALL': { |
| 'code': 'panel_install', |
| 'display': '面板安装', |
| 'progress': 20, |
| }, |
| 'LOCAL_BACKUP': { |
| 'code': 'local_backup', |
| 'display': '本地备份', |
| 'progress': 40, |
| }, |
| 'FILE_UPLOAD': { |
| 'code': 'file_upload', |
| 'display': '文件上传', |
| 'progress': 70, |
| }, |
| 'RESTORE': { |
| 'code': 'restore', |
| 'display': '数据还原', |
| 'progress': 90, |
| }, |
| 'COMPLETED': { |
| 'code': 'completed', |
| 'display': '迁移完成', |
| 'progress': 100, |
| } |
| } |
|
|
| def write_migration_log(message, task_id=None, log_type='INFO'): |
| """写入迁移日志""" |
| timestamp = time.strftime('%Y-%m-%d %H:%M:%S') |
| log_message = f"[{timestamp}] [{log_type}] {message}\n" |
| |
| |
| log_dir = os.path.dirname(MIGRATION_LOG_FILE) |
| if not os.path.exists(log_dir): |
| os.makedirs(log_dir) |
| |
| |
| with open(MIGRATION_LOG_FILE, 'a+') as f: |
| f.write(log_message) |
| |
| |
| if task_id: |
| task_log_file = f"/www/backup/backup_restore/{task_id}_migration/migration.log" |
| task_log_dir = os.path.dirname(task_log_file) |
| if not os.path.exists(task_log_dir): |
| os.makedirs(task_log_dir) |
| with open(task_log_file, 'a+') as f: |
| f.write(log_message) |
| |
| return log_message.strip() |
|
|
| def update_migration_status(task_id, stage, status=MIGRATION_STATUS['RUNNING'], message=None, details=None): |
| """更新迁移任务状态""" |
| |
| task_dir = f"/www/backup/backup_restore/{task_id}_migration" |
| if not os.path.exists(task_dir): |
| os.makedirs(task_dir) |
| |
| |
| task_status_file = f"{task_dir}/status.json" |
| |
| |
| current_status = {} |
| if os.path.exists(task_status_file): |
| try: |
| with open(task_status_file, 'r') as f: |
| current_status = json.load(f) |
| except Exception as e: |
| write_migration_log(f"读取任务状态失败: {e}", task_id, 'ERROR') |
| |
| |
| current_time = time.strftime('%Y-%m-%d %H:%M:%S') |
| if 'start_time' not in current_status: |
| current_status['start_time'] = current_time |
| |
| current_status['task_id'] = task_id |
| current_status['last_update'] = current_time |
| current_status['server_ip'] = current_status.get('server_ip', '') |
| current_status['run_type'] = stage |
| current_status['run_status'] = status |
| current_status['step'] = list(MIGRATION_STAGES.keys()).index(stage) + 1 if stage in MIGRATION_STAGES else 1 |
| current_status['migrate_progress'] = MIGRATION_STAGES.get(stage, {}).get('progress', 0) |
| current_status['migrate_msg'] = message if message else MIGRATION_STAGES.get(stage, {}).get('display', stage) |
| |
| if details: |
| if 'task_info' not in current_status: |
| current_status['task_info'] = {} |
| current_status['task_info'].update(details) |
| |
| |
| if status == MIGRATION_STATUS['COMPLETED']: |
| current_status['end_time'] = current_time |
| if 'start_time' in current_status: |
| start_time = time.strptime(current_status['start_time'], '%Y-%m-%d %H:%M:%S') |
| end_time = time.strptime(current_time, '%Y-%m-%d %H:%M:%S') |
| total_seconds = time.mktime(end_time) - time.mktime(start_time) |
| current_status['total_time'] = total_seconds |
| |
| |
| try: |
| with open(task_status_file, 'w') as f: |
| json.dump(current_status, f, ensure_ascii=False, indent=2) |
| except Exception as e: |
| write_migration_log(f"保存任务状态失败: {e}", task_id, 'ERROR') |
| |
| |
| update_global_migration_tasks(task_id, current_status) |
| |
| return current_status |
|
|
| def update_global_migration_tasks(task_id, task_status): |
| """更新全局迁移任务记录 - 唯一任务运行模式""" |
| |
| try: |
| task_dir = os.path.dirname(MIGRATION_TASK_JSON) |
| if not os.path.exists(task_dir): |
| os.makedirs(task_dir) |
| |
| with open(MIGRATION_TASK_JSON, 'w') as f: |
| json.dump(task_status, f, ensure_ascii=False, indent=2) |
| except Exception as e: |
| write_migration_log(f"保存全局任务记录失败: {e}", task_id, 'ERROR') |
|
|
| def get_migration_status(task_id=None): |
| """获取迁移任务进度 |
| |
| Args: |
| task_id: 指定任务ID,如果为空则返回最新任务状态 |
| |
| Returns: |
| 任务状态信息 |
| """ |
| |
| if not task_id: |
| if os.path.exists(MIGRATION_PL_FILE): |
| try: |
| with open(MIGRATION_PL_FILE, 'r') as f: |
| task_id = f.read().strip() |
| except: |
| return {"status": False, "msg": "无法读取当前运行的任务ID"} |
| |
| |
| if not task_id and os.path.exists(MIGRATION_TASK_JSON): |
| try: |
| with open(MIGRATION_TASK_JSON, 'r') as f: |
| task_data = json.load(f) |
| return {"status": True, "msg": "获取任务状态成功", "data": task_data} |
| except Exception as e: |
| return {"status": False, "msg": f"读取任务状态失败: {e}"} |
| |
| if not task_id: |
| return {"status": False, "msg": "没有正在运行的任务"} |
| |
| |
| task_status_file = f"/www/backup/backup_restore/{task_id}_migration/status.json" |
| if os.path.exists(task_status_file): |
| try: |
| with open(task_status_file, 'r') as f: |
| task_data = json.load(f) |
| return {"status": True, "msg": "获取任务状态成功", "data": task_data} |
| except Exception as e: |
| return {"status": False, "msg": f"读取任务状态失败: {e}"} |
| else: |
| return {"status": False, "msg": f"任务 {task_id} 不存在"} |
|
|
| def create_migration_task(task_name, host, port=22, username='root', password=None, key_file=None, backup_file=None): |
| """创建新的迁移任务""" |
| task_id = str(int(time.time())) |
| |
| |
| if os.path.exists(MIGRATION_PL_FILE): |
| try: |
| with open(MIGRATION_PL_FILE, 'r') as f: |
| running_task_id = f.read().strip() |
| |
| |
| if running_task_id: |
| error_msg = f"已有任务正在运行,任务ID: {running_task_id}" |
| write_migration_log(error_msg) |
| return {"status": False, "msg": error_msg} |
| except: |
| pass |
| |
| |
| print(host) |
| task_data = { |
| 'task_id': task_id, |
| 'server_ip': host, |
| 'ssh_port': port, |
| 'ssh_user': username, |
| 'auth_type': 'password' if password else 'key', |
| 'password': password if password else '', |
| 'timestamp': int(time.time()), |
| 'run_type': 'INIT', |
| 'run_status': MIGRATION_STATUS['RUNNING'], |
| 'step': 1, |
| 'migrate_progress': MIGRATION_STAGES['INIT']['progress'], |
| 'migrate_msg': '迁移任务初始化中', |
| 'task_info': { |
| 'task_name': task_name, |
| 'backup_file': backup_file, |
| 'start_time': time.strftime('%Y-%m-%d %H:%M:%S') |
| } |
| } |
| |
| |
| task_dir = f"/www/backup/backup_restore/{task_id}_migration" |
| if not os.path.exists(task_dir): |
| os.makedirs(task_dir) |
| |
| |
| with open(f"{task_dir}/status.json", 'w') as f: |
| json.dump(task_data, f, ensure_ascii=False, indent=2) |
| |
| |
| update_global_migration_tasks(task_id, task_data) |
| |
| |
| with open(MIGRATION_PL_FILE, 'w') as f: |
| f.write(task_id) |
| |
| write_migration_log(f"创建迁移任务: {task_name} -> {host}", task_id) |
| |
| return {"status": True, "msg": "迁移任务创建成功", "task_id": task_id} |
|
|
| class BtInstallManager: |
| def __init__(self, host, port=22, username='root', password=None, key_file=None, |
| backup_file=None, panel_port=8888, max_retries=3, retry_interval=5, task_id=None): |
| """ |
| 初始化SSH连接管理器 |
| |
| Args: |
| host: 远程服务器IP |
| port: SSH端口,默认22 |
| username: SSH用户名,默认root |
| password: SSH密码,与key_file二选一 |
| key_file: SSH密钥文件路径,与password二选一 |
| backup_file: 本地备份文件路径 |
| panel_port: 宝塔面板端口,默认8888 |
| max_retries: 最大重试次数,默认3次 |
| retry_interval: 重试间隔,默认5秒 |
| task_id: 迁移任务ID,用于跟踪进度 |
| """ |
| self.host = host |
| self.port = port |
| self.username = username |
| self.password = password |
| self.key_file = key_file |
| self.backup_file = backup_file |
| self.panel_port = panel_port |
| self.ssh = None |
| self.sftp = None |
| self.remote_backup_path = '/www/backup/backup_restore' |
| self.max_retries = max_retries |
| self.retry_interval = retry_interval |
| self.task_id = task_id |
| |
| def connect(self): |
| """建立SSH连接""" |
| try: |
| if self.task_id: |
| write_migration_log(f"正在连接到服务器 {self.host}:{self.port}", self.task_id) |
| |
| self.ssh = SSHClient() |
| self.ssh.set_missing_host_key_policy(AutoAddPolicy()) |
| |
| |
| if self.key_file: |
| key = self._load_ssh_key(self.key_file) |
| self.ssh.connect(self.host, self.port, self.username, pkey=key) |
| else: |
| self.ssh.connect(self.host, self.port, self.username, self.password) |
| |
| self.sftp = self.ssh.open_sftp() |
| print(f"[+] 成功连接到服务器 {self.host}") |
| |
| if self.task_id: |
| write_migration_log(f"成功连接到服务器 {self.host}", self.task_id) |
| |
| return True |
| except paramiko.AuthenticationException: |
| error_msg = f"认证失败: 用户名或密码错误" |
| print(f"[!] {error_msg}") |
| |
| if self.task_id: |
| write_migration_log(error_msg, self.task_id, 'ERROR') |
| update_migration_status(self.task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg) |
| |
| return {"status": False, "msg": error_msg} |
| except paramiko.SSHException as e: |
| error_msg = f"SSH连接异常: {e}" |
| print(f"[!] {error_msg}") |
| |
| if self.task_id: |
| write_migration_log(error_msg, self.task_id, 'ERROR') |
| update_migration_status(self.task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg) |
| |
| return {"status": False, "msg": error_msg} |
| except socket.error as e: |
| error_msg = f"网络连接错误: {e}" |
| print(f"[!] {error_msg}") |
| |
| if self.task_id: |
| write_migration_log(error_msg, self.task_id, 'ERROR') |
| update_migration_status(self.task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg) |
| |
| return {"status": False, "msg": error_msg} |
| except Exception as e: |
| error_msg = f"连接服务器失败: {e}" |
| print(f"[!] {error_msg}") |
| |
| if self.task_id: |
| write_migration_log(error_msg, self.task_id, 'ERROR') |
| update_migration_status(self.task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg) |
| |
| return {"status": False, "msg": error_msg} |
| |
| def _load_ssh_key(self, key_file, password=None): |
| """根据密钥文件自动判断类型并加载""" |
| |
| with open(key_file, 'r') as f: |
| content = f.read() |
| |
| |
| key = None |
| errors = [] |
| |
| |
| if "BEGIN RSA PRIVATE KEY" in content: |
| try: |
| key = paramiko.RSAKey.from_private_key_file(key_file, password=password) |
| if self.task_id: |
| write_migration_log(f"使用RSA类型密钥连接", self.task_id) |
| return key |
| except Exception as e: |
| errors.append(f"RSA密钥加载失败: {str(e)}") |
| |
| elif "BEGIN DSA PRIVATE KEY" in content and hasattr(paramiko, "DSSKey"): |
| try: |
| key = paramiko.DSSKey.from_private_key_file(key_file, password=password) |
| if self.task_id: |
| write_migration_log(f"使用DSA类型密钥连接", self.task_id) |
| return key |
| except Exception as e: |
| errors.append(f"DSA密钥加载失败: {str(e)}") |
| |
| elif "BEGIN EC PRIVATE KEY" in content: |
| try: |
| key = paramiko.ECDSAKey.from_private_key_file(key_file, password=password) |
| if self.task_id: |
| write_migration_log(f"使用ECDSA类型密钥连接", self.task_id) |
| return key |
| except Exception as e: |
| errors.append(f"ECDSA密钥加载失败: {str(e)}") |
| |
| elif "BEGIN OPENSSH PRIVATE KEY" in content: |
| |
| key_types = [ |
| (paramiko.Ed25519Key, "Ed25519"), |
| (paramiko.RSAKey, "RSA"), |
| (paramiko.ECDSAKey, "ECDSA"), |
| ] |
| if hasattr(paramiko, "DSSKey"): |
| key_types.append((paramiko.DSSKey, "DSA")) |
|
|
| for key_class, key_name in key_types: |
| try: |
| key = key_class.from_private_key_file(key_file, password=password) |
| if self.task_id: |
| write_migration_log(f"使用{key_name}类型密钥连接", self.task_id) |
| return key |
| except Exception as e: |
| errors.append(f"{key_name}密钥加载失败: {str(e)}") |
| |
| |
| error_msg = "无法识别或加载密钥,请检查密钥文件格式是否正确" |
| if self.task_id: |
| write_migration_log(error_msg, self.task_id) |
| raise ValueError(error_msg) |
| |
| def reconnect(self): |
| """重新连接SSH""" |
| if self.ssh: |
| try: |
| self.ssh.close() |
| except: |
| pass |
| if self.sftp: |
| try: |
| self.sftp.close() |
| except: |
| pass |
| |
| self.ssh = None |
| self.sftp = None |
| |
| for attempt in range(self.max_retries): |
| print(f"[*] 尝试重新连接服务器 (尝试 {attempt+1}/{self.max_retries})...") |
| connection_result = self.connect() |
| if isinstance(connection_result, dict): |
| |
| if attempt == self.max_retries - 1: |
| return connection_result |
| elif connection_result: |
| return True |
| time.sleep(self.retry_interval) |
| |
| print(f"[!] 重新连接服务器失败,已达到最大重试次数 ({self.max_retries})") |
| return {"status": False, "msg": f"重新连接服务器失败,已达到最大重试次数 ({self.max_retries})"} |
| |
| def disconnect(self): |
| """关闭SSH连接""" |
| if self.sftp: |
| self.sftp.close() |
| if self.ssh: |
| self.ssh.close() |
| print(f"[+] 已断开与服务器 {self.host} 的连接") |
| |
| def exec_command(self, command, print_output=True, retry=True): |
| """ |
| 执行远程命令 |
| |
| Args: |
| command: 要执行的命令 |
| print_output: 是否打印输出 |
| retry: 连接断开时是否重试 |
| |
| Returns: |
| (stdout, stderr) |
| """ |
| if not self.ssh: |
| if retry: |
| reconnect_result = self.reconnect() |
| if isinstance(reconnect_result, dict): |
| return None, None |
| return self.exec_command(command, print_output, False) |
| print("[!] SSH连接未建立") |
| return None, None |
| |
| try: |
| print(f"[*] 执行命令: {command}") |
| stdin, stdout, stderr = self.ssh.exec_command(command) |
| stdout_content = stdout.read().decode('utf-8') |
| stderr_content = stderr.read().decode('utf-8') |
| |
| if print_output: |
| if stdout_content: |
| print("[+] 输出:", stdout_content) |
| if stderr_content: |
| print("[!] 错误:", stderr_content) |
| |
| return stdout_content, stderr_content |
| except (socket.error, paramiko.SSHException) as e: |
| print(f"[!] 执行命令时SSH连接断开: {e}") |
| if retry: |
| reconnect_result = self.reconnect() |
| if isinstance(reconnect_result, dict): |
| return None, None |
| print("[*] 重新连接成功,重试命令...") |
| return self.exec_command(command, print_output, False) |
| return None, None |
| |
| def check_os_type(self): |
| """检测服务器操作系统类型""" |
| print("[*] 检测服务器操作系统类型...") |
| os_type = None |
| |
| stdout, _ = self.exec_command("cat /etc/os-release") |
| if stdout is None: |
| return None |
| |
| if "CentOS" in stdout: |
| os_type = "centos" |
| elif "Ubuntu" in stdout: |
| os_type = "ubuntu" |
| elif "Debian" in stdout: |
| os_type = "debian" |
| else: |
| stdout, _ = self.exec_command("cat /etc/redhat-release", print_output=False) |
| if stdout and ("CentOS" in stdout or "Red Hat" in stdout): |
| os_type = "centos" |
| |
| if os_type: |
| print(f"[+] 操作系统类型: {os_type}") |
| else: |
| print("[!] 无法确定操作系统类型") |
| |
| return os_type |
| |
| def install_bt_panel(self): |
| """安装宝塔面板""" |
| if self.task_id: |
| update_migration_status(self.task_id, 'PANEL_INSTALL', message="开始安装宝塔面板") |
| |
| print("[*] 开始安装宝塔面板...") |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| install_cmd = "if [ -f /usr/bin/curl ];then curl -sSO http://download.bt.cn/install/install_panel_backup_last.sh;else wget -O install_panel_backup_last.sh http://download.bt.cn/install/install_panel_backup_last.sh;fi;nohup bash install_panel_backup_last.sh -y -P 8888 > /root/bt_install.log 2>&1 &" |
| |
| if self.task_id: |
| update_migration_status(self.task_id, 'PANEL_INSTALL', message=f"使用命令安装宝塔面板,请稍等...") |
| |
| print(f"[*] 使用命令安装宝塔面板: {install_cmd}") |
| stdout, stderr = self.exec_command(install_cmd) |
|
|
|
|
| |
| timeout = 900 |
| start_time = time.time() |
| |
|
|
| while time.time() - start_time < timeout: |
| get_install_progress_cmd = "ps -ef|grep bash|grep install_panel_backup_last.sh|grep -v grep" |
| stdout, stderr = self.exec_command(get_install_progress_cmd) |
|
|
| if stdout is None or stdout.strip() == "": |
| print("这里是pid输出",stdout) |
| get_install_log_cmd = "cat /root/bt_install.log" |
| stdout, stderr = self.exec_command(get_install_log_cmd) |
| if "安装完成" in stdout or "Installed successfully" in stdout: |
| message = "宝塔面板安装成功,正在启动备份任务..." |
| print(f"[+] {message}") |
| |
| |
| username_match = re.search(r"username: (.*)", stdout) |
| password_match = re.search(r"password: (.*)", stdout) |
|
|
| get_panel_admin_path_cmd = "cat /root/bt_install.log" |
| admin_path, stderr = self.exec_command(" cat /www/server/panel/data/admin_path.pl") |
| |
| panel_info = { |
| "panel_url": f"https://{self.host}:{self.panel_port}{admin_path}" |
| } |
| |
| if username_match and password_match: |
| username = username_match.group(1) |
| password = password_match.group(1) |
| panel_info["username"] = username |
| panel_info["password"] = password |
| print(f"[+] 面板用户名: {username}") |
| print(f"[+] 面板密码: {password}") |
| print(f"[+] 面板地址: https://{self.host}:{self.panel_port}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'PANEL_INSTALL', |
| MIGRATION_STATUS['COMPLETED'], |
| message=message, |
| details={"panel_info": panel_info} |
| ) |
| |
| |
| return {"status": True, "msg": message, "data": panel_info} |
| else: |
| write_migration_log(f"宝塔面板安装失败 {self.host}:{self.port} 错误信息: {stdout}", self.task_id) |
| error_msg = "宝塔面板安装失败" |
| print(f"[!] {error_msg}") |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'PANEL_INSTALL', |
| MIGRATION_STATUS['FAILED'], |
| message=error_msg, |
| details={"stderr": stderr} |
| ) |
| return {"status": False, "msg": error_msg, "error_msg": stderr} |
| else: |
| time.sleep(3) |
| get_install_log_cmd = "cat /root/bt_install.log" |
| stdout, stderr = self.exec_command(get_install_log_cmd) |
| write_migration_log(f"安装进度: {stdout}") |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| if "安装完成" in stdout or "Installed successfully" in stdout: |
| message = "宝塔面板安装成功,正在启动备份任务..." |
| print(f"[+] {message}") |
| |
| |
| username_match = re.search(r"username: (.*)", stdout) |
| password_match = re.search(r"password: (.*)", stdout) |
| |
| panel_info = { |
| "panel_url": f"https://{self.host}:{self.panel_port}" |
| } |
| |
| if username_match and password_match: |
| username = username_match.group(1) |
| password = password_match.group(1) |
| panel_info["username"] = username |
| panel_info["password"] = password |
| print(f"[+] 面板用户名: {username}") |
| print(f"[+] 面板密码: {password}") |
| print(f"[+] 面板地址: https://{self.host}:{self.panel_port}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'PANEL_INSTALL', |
| MIGRATION_STATUS['COMPLETED'], |
| message=message, |
| details={"panel_info": panel_info} |
| ) |
| |
| |
| return {"status": True, "msg": message, "data": panel_info} |
| else: |
| write_migration_log(f"宝塔面板安装失败 {self.host}:{self.port} 错误信息: {stdout}", self.task_id) |
| error_msg = "宝塔面板安装失败" |
| print(f"[!] {error_msg}") |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'PANEL_INSTALL', |
| MIGRATION_STATUS['FAILED'], |
| message=error_msg, |
| details={"stderr": stderr} |
| ) |
| return {"status": False, "msg": error_msg, "error_msg": stderr} |
| |
| def create_backup_dir(self): |
| """创建备份目录""" |
| print("[*] 创建备份目录...") |
| |
| |
| self.exec_command(f"mkdir -p {self.remote_backup_path}") |
| |
| stdout, _ = self.exec_command(f"test -d {self.remote_backup_path} && echo 'exists'") |
| if stdout is None: |
| return False |
| |
| if 'exists' in stdout: |
| print(f"[+] 备份目录 {self.remote_backup_path} 已创建") |
| return True |
| else: |
| print(f"[!] 备份目录 {self.remote_backup_path} 创建失败") |
| return False |
| |
| def get_remote_file_size(self, remote_path): |
| """获取远程文件大小""" |
| try: |
| if not self.sftp: |
| reconnect_result = self.reconnect() |
| if isinstance(reconnect_result, dict): |
| return -1 |
| |
| return self.sftp.stat(remote_path).st_size |
| except Exception as e: |
| |
| return -1 |
| |
| def write_backup_info(self,backup_file_path): |
| backup_task_json = "/www/backup/backup_restore/backup_task.json" |
| if os.path.exists(backup_task_json): |
| task_json_data=json.loads(public.ReadFile(backup_task_json)) |
| for item in task_json_data: |
| if item["backup_file"] == backup_file_path: |
| migrate_backup_info_path="/www/backup/backup_restore/migrate_backup_info.json" |
| public.WriteFile(migrate_backup_info_path,json.dumps(item)) |
| |
| def upload_backup_file(self): |
| """上传备份文件到服务器(支持断点续传)""" |
| if not self.backup_file or not os.path.exists(self.backup_file): |
| error_msg = f"备份文件 {self.backup_file} 不存在" |
| print(f"[!] {error_msg}") |
| if self.task_id: |
| update_migration_status(self.task_id, 'FILE_UPLOAD', MIGRATION_STATUS['FAILED'], message=error_msg) |
| return {"status": False, "msg": error_msg} |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| message=f"开始上传备份文件 {self.backup_file} 到服务器" |
| ) |
| |
| print(f"[*] 开始上传备份文件 {self.backup_file} 到服务器...") |
| backup_filename = os.path.basename(self.backup_file) |
| remote_file_path = f"{self.remote_backup_path}/{backup_filename}" |
| |
| |
| if not self.create_backup_dir(): |
| error_msg = "创建备份目录失败" |
| if self.task_id: |
| update_migration_status(self.task_id, 'FILE_UPLOAD', MIGRATION_STATUS['FAILED'], message=error_msg) |
| return {"status": False, "msg": error_msg} |
| |
| |
| local_file_size = os.path.getsize(self.backup_file) |
| file_size_mb = local_file_size / (1024 * 1024) |
| print(f"[*] 文件大小: {file_size_mb:.2f} MB") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| message=f"准备上传文件,大小: {file_size_mb:.2f} MB", |
| details={"upload": {"total_size": local_file_size, "size_mb": file_size_mb}} |
| ) |
| |
| |
| remote_file_size = self.get_remote_file_size(remote_file_path) |
| |
| |
| if remote_file_size == local_file_size: |
| message = "文件已完全上传,跳过上传步骤" |
| print(f"[+] {message}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| MIGRATION_STATUS['COMPLETED'], |
| message=message, |
| details={"upload": {"status": "completed", "remote_path": remote_file_path}} |
| ) |
| |
| return {"status": True, "msg": message, "data": {"remote_path": remote_file_path}} |
| |
| |
| if remote_file_size > 0: |
| print(f"[*] 检测到不完整的上传文件,尝试断点续传...") |
| print(f"[*] 已上传: {remote_file_size/local_file_size*100:.2f}% ({remote_file_size/(1024*1024):.2f}MB / {file_size_mb:.2f}MB)") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| message=f"检测到不完整的上传文件,从 {remote_file_size/(1024*1024):.2f}MB 继续上传", |
| details={ |
| "upload": { |
| "status": "resuming", |
| "progress": remote_file_size/local_file_size*100, |
| "uploaded": remote_file_size, |
| "total_size": local_file_size |
| } |
| } |
| ) |
| else: |
| remote_file_size = 0 |
| print(f"[*] 开始新的上传...") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| message="开始新的上传", |
| details={"upload": {"status": "starting", "progress": 0}} |
| ) |
| |
| max_attempts = 3 |
| attempt = 0 |
| offset = remote_file_size |
| chunk_size = 1024 * 1024 |
| |
| while offset < local_file_size and attempt < max_attempts: |
| try: |
| if not self.sftp: |
| reconnect_result = self.reconnect() |
| if isinstance(reconnect_result, dict): |
| return reconnect_result |
| |
| |
| with open(self.backup_file, 'rb') as local_file: |
| if offset > 0: |
| local_file.seek(offset) |
| |
| |
| if offset > 0: |
| remote_file = self.sftp.open(remote_file_path, 'ab') |
| else: |
| remote_file = self.sftp.open(remote_file_path, 'wb') |
| |
| with remote_file: |
| start_time = time.time() |
| current_offset = offset |
| last_update_time = start_time |
| last_progress_report = 0 |
| |
| |
| while current_offset < local_file_size: |
| data = local_file.read(chunk_size) |
| if not data: |
| break |
| |
| remote_file.write(data) |
| current_offset += len(data) |
| progress = current_offset / local_file_size * 100 |
| elapsed = time.time() - start_time |
| speed = (current_offset - offset) / elapsed / 1024 if elapsed > 0 else 0 |
| |
| |
| remaining_bytes = local_file_size - current_offset |
| if speed > 0: |
| remaining_time_seconds = remaining_bytes / (speed * 1024) |
| remaining_time_str = time.strftime("%H:%M:%S", time.gmtime(remaining_time_seconds)) |
| else: |
| remaining_time_str = "N/A" |
| |
| print(f"\r[*] 上传进度: {progress:.2f}% - {speed:.2f} KB/s - 剩余时间: {remaining_time_str}", end="") |
| write_migration_log(f"总大小{local_file_size/(1024*1024):.2f}MB 正在上传文件: {progress:.2f}% - {speed:.2f} KB/s - 剩余时间: {remaining_time_str}") |
| |
| |
| current_time = time.time() |
| if (current_time - last_update_time > 5 or progress - last_progress_report >= 5) and self.task_id: |
| last_update_time = current_time |
| last_progress_report = progress |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| message=f"总大小{local_file_size/(1024*1024):.2f}MB 正在上传文件: {progress:.2f}% - {speed:.2f} KB/s - 剩余时间: {remaining_time_str}", |
| details={ |
| "upload": { |
| "status": "uploading", |
| "progress": progress, |
| "speed": speed, |
| "elapsed": elapsed, |
| "uploaded": current_offset, |
| "total_size": local_file_size, |
| "remaining_time": remaining_time_str |
| } |
| } |
| ) |
| |
| |
| if current_offset % (chunk_size * 10) == 0: |
| remote_file.flush() |
| |
| |
| remote_file.flush() |
| print() |
| |
| |
| if current_offset >= local_file_size: |
| elapsed = time.time() - start_time |
| total_speed = (current_offset - offset) / elapsed / 1024 if elapsed > 0 else 0 |
| |
| message = f"上传完成,耗时 {elapsed:.2f} 秒,平均速度 {total_speed:.2f} KB/s" |
| print(f"[+] {message}") |
| |
| |
| final_size = self.get_remote_file_size(remote_file_path) |
| if final_size == local_file_size: |
| success_msg = f"文件大小验证通过: {final_size/(1024*1024):.2f}MB" |
| print(f"[+] {success_msg}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| MIGRATION_STATUS['COMPLETED'], |
| message=success_msg, |
| details={ |
| "upload": { |
| "status": "completed", |
| "remote_path": remote_file_path, |
| "file_size": final_size, |
| "elapsed_time": elapsed, |
| "speed": total_speed |
| } |
| } |
| ) |
| |
| return {"status": True, "msg": "文件上传成功", "data": { |
| "remote_path": remote_file_path, |
| "file_size": final_size, |
| "elapsed_time": elapsed, |
| "speed": total_speed |
| }} |
| else: |
| error_msg = f"文件大小不匹配: 本地 {local_file_size/(1024*1024):.2f}MB, 远程 {final_size/(1024*1024):.2f}MB" |
| print(f"[!] {error_msg}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| message=error_msg, |
| details={ |
| "upload": { |
| "status": "size_mismatch", |
| "local_size": local_file_size, |
| "remote_size": final_size |
| } |
| } |
| ) |
| |
| |
| offset = final_size |
| else: |
| |
| offset = current_offset |
| |
| except (socket.error, paramiko.SSHException, IOError) as e: |
| attempt += 1 |
| error_msg = f"上传中断: {e}" |
| print(f"\n[!] {error_msg}") |
| print(f"[*] 将尝试从断点 {offset/(1024*1024):.2f}MB 继续上传 (尝试 {attempt}/{max_attempts})") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| message=f"上传中断: {e},将尝试从断点 {offset/(1024*1024):.2f}MB 继续上传 (尝试 {attempt}/{max_attempts})", |
| details={ |
| "upload": { |
| "status": "interrupted", |
| "attempt": attempt, |
| "max_attempts": max_attempts, |
| "uploaded": offset, |
| "total_size": local_file_size |
| } |
| } |
| ) |
| |
| time.sleep(2) |
| |
| |
| reconnect_result = self.reconnect() |
| if isinstance(reconnect_result, dict): |
| continue |
| |
| if offset >= local_file_size: |
| message = "文件上传成功" |
| print(f"[+] {message}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| MIGRATION_STATUS['COMPLETED'], |
| message=message, |
| details={"upload": {"status": "completed", "remote_path": remote_file_path}} |
| ) |
| |
| return {"status": True, "msg": message, "data": {"remote_path": remote_file_path}} |
| else: |
| error_msg = "文件上传失败,已达到最大重试次数" |
| print(f"[!] {error_msg}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'FILE_UPLOAD', |
| MIGRATION_STATUS['FAILED'], |
| message=error_msg, |
| details={ |
| "upload": { |
| "status": "failed", |
| "attempts": attempt, |
| "max_attempts": max_attempts |
| } |
| } |
| ) |
| |
| return {"status": False, "msg": error_msg} |
| |
| def extract_backup(self, backup_filename): |
| """解压备份文件""" |
| print(f"[*] 开始解压备份文件 {backup_filename}...") |
| remote_file_path = f"{self.remote_backup_path}/{backup_filename}" |
| |
| |
| stdout, _ = self.exec_command(f"test -f {remote_file_path} && echo 'exists'") |
| if stdout is None or 'exists' not in stdout: |
| print(f"[!] 备份文件 {remote_file_path} 不存在") |
| return False |
| |
| |
| extract_cmd = f"cd {self.remote_backup_path} && tar -zxvf {backup_filename}" |
| stdout, stderr = self.exec_command(extract_cmd) |
| if stdout is None: |
| return False |
| |
| |
| timestamp_match = re.search(r"(\d+)_backup", backup_filename) |
| if not timestamp_match: |
| print("[!] 无法从备份文件名中提取时间戳") |
| return False |
| |
| timestamp = timestamp_match.group(1) |
| backup_dir = f"{self.remote_backup_path}/{timestamp}_backup" |
| |
| |
| stdout, _ = self.exec_command(f"test -d {backup_dir} && echo 'exists'") |
| if stdout is None or 'exists' not in stdout: |
| print(f"[!] 备份文件解压失败") |
| return False |
| |
| print(f"[+] 备份文件解压成功,解压目录: {backup_dir}") |
| return timestamp |
| |
| def restore_backup(self, timestamp): |
| """还原备份""" |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'RESTORE', |
| message=f"开始还原备份 (时间戳: {timestamp})" |
| ) |
| |
| print(f"[*] 开始还原备份 (时间戳: {timestamp})...") |
| |
| |
| print("[*] 等待宝塔面板服务启动...") |
| |
| |
| |
| |
| restore_script = "/www/server/panel/mod/project/backup_restore/restore_manager.py" |
| stdout, _ = self.exec_command(f"test -f {restore_script} && echo 'exists'") |
| if stdout is None or 'exists' not in stdout: |
| error_msg = f"还原模块不存在: {restore_script}" |
| print(f"[!] {error_msg}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'RESTORE', |
| MIGRATION_STATUS['FAILED'], |
| message=error_msg |
| ) |
| |
| return False |
| |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'RESTORE', |
| message="正在执行添加还原任务..." |
| ) |
| |
| print("[*] 执行还原操作...") |
| restore_cmd = "nohup btpython {restore_script} restore_data {timestamp} > /dev/null 2>&1 &".format(restore_script=restore_script,timestamp=timestamp) |
| print(restore_cmd) |
| stdout, stderr = self.exec_command(restore_cmd) |
| print(stdout) |
| print(stderr) |
|
|
| touch_pl_cmd = "echo 'True' > /www/server/panel/data/migration.pl" |
| touch_out, touch_err = self.exec_command(touch_pl_cmd) |
| print(touch_out) |
| print(touch_err) |
|
|
| message = "执行还原命令成功,请在新服务器查看还原进度" |
| print(f"[+] {message}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'RESTORE', |
| MIGRATION_STATUS['COMPLETED'], |
| message=message |
| ) |
| |
| return True |
|
|
| if stdout is None: |
| error_msg = "执行还原命令失败" |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'RESTORE', |
| MIGRATION_STATUS['FAILED'], |
| message=error_msg |
| ) |
| |
| return False |
| |
| if "还原完成" in stdout or "success" in stdout: |
| message = "备份还原成功" |
| print(f"[+] {message}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'RESTORE', |
| MIGRATION_STATUS['COMPLETED'], |
| message=message |
| ) |
| |
| return True |
| else: |
| error_msg = "备份还原失败,请检查日志" |
| print(f"[!] {error_msg}") |
| |
| if self.task_id: |
| update_migration_status( |
| self.task_id, |
| 'RESTORE', |
| MIGRATION_STATUS['FAILED'], |
| message=error_msg, |
| details={"stderr": stderr} |
| ) |
| |
| return False |
|
|
| |
| def migrate(self, task_id): |
| """执行完整的迁移流程""" |
| try: |
| if os.path.exists(MIGRATION_LOG_FILE): |
| public.ExecShell("rm -f {}".format(MIGRATION_LOG_FILE)) |
| |
| update_migration_status(task_id, 'INIT', message="开始迁移任务") |
| |
| |
| self.task_id = task_id |
| connection_result = self.connect() |
| if isinstance(connection_result, dict) and not connection_result.get("status", False): |
| return connection_result |
| |
| |
| update_migration_status(task_id, 'PANEL_INSTALL', message="准备安装宝塔面板") |
| write_migration_log("正在安装宝塔面板...预估5分钟....") |
| install_result = self.install_bt_panel() |
| if not install_result.get("status", False): |
| return install_result |
| write_migration_log("宝塔面板安装完成,正在启动备份任务...") |
| write_migration_log("请等待备份任务完成上传文件后,再登录面板操作...") |
| |
| update_migration_status(task_id, 'LOCAL_BACKUP', message="开始在本机执行备份任务") |
| write_migration_log("开始在本机执行备份任务") |
| |
| backup_manager = BackupRestoreManager( |
| host=self.host, |
| port=self.port, |
| username=self.username, |
| password=self.password, |
| key_file=self.key_file, |
| backup_file=None, |
| panel_port=self.panel_port, |
| max_retries=self.max_retries, |
| retry_interval=self.retry_interval, |
| task_id=task_id |
| ) |
| |
| |
| backup_manager.add_backup_task() |
| |
| |
| migrate_backup_success_pl = '/www/backup/backup_restore/migrate_backup_success.pl' |
| migrate_backup_pl = '/www/backup/backup_restore/migrate_backup.pl' |
| |
| |
| update_migration_status(task_id, 'LOCAL_BACKUP', message="正在等待本地备份完成") |
| |
| |
| timeout = 21600 |
| start_time = time.time() |
| |
| while not os.path.exists(migrate_backup_success_pl) and time.time() - start_time < timeout: |
| time.sleep(5) |
| if not os.path.exists(migrate_backup_pl): |
| error_msg = "备份任务已取消或失败" |
| update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg) |
| return {"status": False, "msg": error_msg} |
| |
| if not os.path.exists(migrate_backup_success_pl): |
| error_msg = "备份任务超时,未能在指定时间内完成" |
| update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg) |
| return {"status": False, "msg": error_msg} |
| |
| |
| timestamp = "" |
| if os.path.exists(migrate_backup_pl): |
| try: |
| timestamp = public.ReadFile(migrate_backup_pl).strip() |
| update_migration_status(task_id, 'LOCAL_BACKUP', message=f"获取到备份时间戳: {timestamp}") |
| except: |
| error_msg = "读取备份时间戳失败" |
| update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg) |
| return {"status": False, "msg": error_msg} |
| else: |
| error_msg = "备份时间戳文件不存在" |
| update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg) |
| return {"status": False, "msg": error_msg} |
| |
| |
| backup_file_path = "" |
| try: |
| backup_file_path = public.ReadFile(migrate_backup_success_pl).strip() |
| if not os.path.exists(backup_file_path): |
| |
| default_backup_path = f"/www/backup/backup_restore/{timestamp}_backup.tar.gz" |
| if os.path.exists(default_backup_path): |
| backup_file_path = default_backup_path |
| else: |
| error_msg = f"备份文件不存在: {backup_file_path}" |
| update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg) |
| return {"status": False, "msg": error_msg} |
| except: |
| error_msg = "读取备份文件路径失败" |
| update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg) |
| return {"status": False, "msg": error_msg} |
| |
| update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['COMPLETED'], |
| message=f"本地备份完成: {backup_file_path}") |
| if os.path.exists("/www/backup/backup_restore/backup.log"): |
| backup_log_data = public.ReadFile("/www/backup/backup_restore/backup.log") |
| write_migration_log(backup_log_data) |
| write_migration_log("本地备份完成") |
| |
| |
| write_migration_log("准备上传备份文件") |
| self.backup_file = backup_file_path |
| update_migration_status(task_id, 'FILE_UPLOAD', message=f"准备上传备份文件: {backup_file_path}") |
| upload_result = self.upload_backup_file() |
| if not upload_result.get("status", False): |
| return upload_result |
| |
| self.write_backup_info(backup_file_path) |
| |
| backup_task_json = '/www/backup/backup_restore/migrate_backup_info.json' |
| if os.path.exists(backup_task_json): |
| update_migration_status(task_id, 'FILE_UPLOAD', message="准备上传migrate_backup_info.json文件") |
| remote_task_json_path = f"{self.remote_backup_path}/migrate_backup_info.json" |
| |
| try: |
| if not self.sftp: |
| reconnect_result = self.reconnect() |
| if isinstance(reconnect_result, dict): |
| error_msg = "上传migrate_backup_info.json文件时连接断开" |
| update_migration_status(task_id, 'FILE_UPLOAD', message=error_msg) |
| return reconnect_result |
| |
| |
| if not self.create_backup_dir(): |
| error_msg = "创建备份目录失败" |
| update_migration_status(task_id, 'FILE_UPLOAD', message=error_msg) |
| return {"status": False, "msg": error_msg} |
| |
| |
| self.sftp.put(backup_task_json, remote_task_json_path) |
| update_migration_status(task_id, 'FILE_UPLOAD', message="migrate_backup_info.json文件上传成功") |
| except Exception as e: |
| error_msg = f"上传migrate_backup_info.json文件失败: {e}" |
| update_migration_status(task_id, 'FILE_UPLOAD', message=error_msg) |
| return {"status": False, "msg": error_msg} |
| write_migration_log("备份文件上传成功") |
| |
| backup_filename = os.path.basename(self.backup_file) |
| print(1) |
| print(backup_filename) |
|
|
| print(2) |
| update_migration_status(task_id, 'RESTORE', message=f"准备解压备份文件 {backup_filename}") |
| |
| |
| |
| |
| |
| |
| |
| print(timestamp) |
| extract_timestamp = timestamp |
| |
| |
| |
| |
| |
| remote_migrate_backup_pl = f"{self.remote_backup_path}/migrate_backup.pl" |
| self.exec_command(f"echo '{extract_timestamp}' > {remote_migrate_backup_pl}") |
| |
| |
| write_migration_log("准备添加还原备份任务") |
| update_migration_status(task_id, 'RESTORE', message=f"准备还原备份 (时间戳: {extract_timestamp})") |
| if not self.restore_backup(extract_timestamp): |
| error_msg = "执行备份还原时出现了异常" |
| update_migration_status(task_id, 'RESTORE', MIGRATION_STATUS['FAILED'], message=error_msg) |
| return {"status": False, "msg": error_msg} |
| write_migration_log("还原备份任务添加完成") |
| |
| |
| success_msg = "所有迁移操作已完成" |
| update_migration_status(task_id, 'COMPLETED', MIGRATION_STATUS['COMPLETED'], message=success_msg) |
| write_migration_log("迁移任务完成") |
| public.ExecShell("\\cp -rpa {} {}/{}_migration/migration.log".format(MIGRATION_LOG_FILE,BACKUP_RESTORE_PATH,task_id)) |
| |
| with open(MIGRATION_SUCCESS_FILE, 'w') as f: |
| f.write(task_id) |
| |
| print(f"[+] {success_msg}") |
| return {"status": True, "msg": success_msg} |
| |
| except Exception as e: |
| error_msg = f"迁移过程中发生错误: {e}" |
| print(f"[!] {error_msg}") |
| |
| if task_id: |
| update_migration_status(task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg) |
| |
| return {"status": False, "msg": error_msg} |
| finally: |
| self.disconnect() |
| |
| |
| if os.path.exists(MIGRATION_PL_FILE): |
| public.ExecShell(f"rm -f {MIGRATION_PL_FILE}") |
|
|
| |
| def verify_ssh_connection(self): |
| """验证SSH连接信息是否正常""" |
| print("[*] 验证SSH连接信息...") |
| connection_result = self.connect() |
| |
| if isinstance(connection_result, dict): |
| return connection_result |
| |
| if connection_result: |
| print("[+] SSH连接验证成功") |
| self.disconnect() |
| return {"status": True, "msg": "SSH连接验证成功"} |
| |
| return {"status": False, "msg": "SSH连接验证失败"} |
|
|
| |
| def install_panel(self): |
| """安装宝塔面板接口""" |
| print("[*] 安装宝塔面板...") |
| |
| |
| connection_result = self.connect() |
| if isinstance(connection_result, dict): |
| return connection_result |
| |
| |
| try: |
| install_result = self.install_bt_panel() |
| return install_result |
| finally: |
| self.disconnect() |
|
|
| |
| def upload_backup(self): |
| """上传备份文件接口""" |
| print("[*] 上传备份文件...") |
| |
| if not self.backup_file: |
| return {"status": False, "msg": "未指定备份文件路径"} |
| |
| |
| connection_result = self.connect() |
| if isinstance(connection_result, dict): |
| return connection_result |
| |
| |
| try: |
| upload_result = self.upload_backup_file() |
| return upload_result |
| finally: |
| self.disconnect() |
| |
| def run(self): |
| """执行全部安装和还原流程""" |
| try: |
| |
| connection_result = self.connect() |
| if isinstance(connection_result, dict): |
| return connection_result |
| |
| |
| |
| |
| |
| |
| |
| if self.backup_file: |
| |
| upload_result = self.upload_backup_file() |
| if not upload_result["status"]: |
| return upload_result |
| |
| |
| backup_filename = os.path.basename(self.backup_file) |
| timestamp = self.extract_backup(backup_filename) |
| if not timestamp: |
| return {"status": False, "msg": "备份文件解压失败"} |
| |
| |
| if not self.restore_backup(timestamp): |
| return {"status": False, "msg": "备份还原失败"} |
| |
| print("[+] 所有操作已完成") |
| return {"status": True, "msg": "所有操作已完成"} |
| |
| except Exception as e: |
| print(f"[!] 执行过程中发生错误: {e}") |
| return {"status": False, "msg": f"执行过程中发生错误: {e}"} |
| finally: |
| self.disconnect() |
|
|
| class BackupRestoreManager: |
| def __init__(self, host, port, username, password, key_file, backup_file, panel_port, max_retries, retry_interval, task_id): |
| self.host = host |
| self.port = port |
| self.username = username |
| self.password = password |
| self.key_file = key_file |
| self.base_path = '/www/backup/backup_restore' |
| self.bakcup_task_json = self.base_path + '/backup_task.json' |
| self.backup_pl_file = self.base_path + '/backup.pl' |
| self.migrate_backup_pl_file = self.base_path + '/migrate_backup.pl' |
| self.migrate_backup_success_file = self.base_path + '/migrate_backup_success.pl' |
| self.migrage_save_data_conf = self.base_path + '/migrate_save_data_conf.json' |
| |
| def add_backup_task(self): |
| backup_config = [] |
| if os.path.exists(self.bakcup_task_json): |
| backup_config=json.loads(public.ReadFile(self.bakcup_task_json)) |
|
|
| local_timestamp=int(time.time()) |
| backup_timestamp=local_timestamp |
| get_time=local_timestamp |
| |
| |
| print(get_time) |
|
|
| backup_now=True |
| |
| backup_conf = {} |
|
|
|
|
| backup_conf['backup_name'] = "migrate_backup" + str(get_time) |
| backup_conf['timestamp'] = get_time |
| backup_conf['create_time'] = datetime.datetime.fromtimestamp(int(local_timestamp)).strftime('%Y-%m-%d %H:%M:%S') |
| backup_conf['backup_time'] = datetime.datetime.fromtimestamp(int(backup_timestamp)).strftime('%Y-%m-%d %H:%M:%S') |
| backup_conf['storage_type'] = "local" |
| backup_conf['auto_exit'] = 0 |
| backup_conf['backup_status'] = 0 |
| backup_conf['restore_status'] = 0 |
| backup_conf['backup_path'] = self.base_path + "/" + str(get_time) + "_backup" |
| backup_conf['backup_file'] = "" |
| backup_conf['backup_file_sha256'] = "" |
| backup_conf['backup_file_size'] = "" |
| backup_conf['backup_count'] = {} |
| backup_conf['backup_count']['success'] = None |
| backup_conf['backup_count']['failed'] = None |
| backup_conf['total_time']=None |
| backup_conf['done_time']=None |
| |
| if os.path.exists(self.migrage_save_data_conf): |
| save_data_conf=json.loads(public.ReadFile(self.migrage_save_data_conf)) |
| backup_conf['backup_data'] = save_data_conf['backup_data'] |
| backup_conf['database_id'] = save_data_conf['database_id'] |
| backup_conf['site_id'] = save_data_conf['site_id'] |
| |
| backup_config.append(backup_conf) |
| public.WriteFile(self.bakcup_task_json,json.dumps(backup_config)) |
|
|
| if backup_now: |
| print("[*] 执行备份命令...") |
| public.ExecShell("nohup btpython /www/server/panel/mod/project/backup_restore/backup_manager.py backup_data {} > /dev/null 2>&1 &".format(int(get_time))) |
| |
| |
| public.WriteFile(self.migrate_backup_pl_file,str(get_time)) |
| print("[+] 备份任务添加成功") |
| |
| |
| timeout = 21600 |
| start_time = time.time() |
| print("[*] 等待备份任务完成...") |
| from mod.project.backup_restore.config_manager import ConfigManager |
|
|
| while time.time() - start_time < timeout: |
| time.sleep(1) |
| print(get_time) |
| sync_backup_config = ConfigManager().get_backup_conf(str(get_time)) |
| print(sync_backup_config) |
| if sync_backup_config['backup_status'] == 2: |
| backup_file = sync_backup_config['backup_file'] |
| if os.path.exists(backup_file): |
| |
| last_size = 0 |
| stable_count = 0 |
| for _ in range(3): |
| current_size = os.path.getsize(backup_file) |
| if current_size == last_size: |
| stable_count += 1 |
| else: |
| stable_count = 0 |
| last_size = current_size |
| time.sleep(2) |
| |
| if stable_count >= 2: |
| |
| public.WriteFile(self.migrate_backup_success_file, backup_file) |
| print(f"[+] 备份任务完成: {backup_file}") |
| return backup_file |
| |
| |
| if not os.path.exists(self.backup_pl_file) and not os.path.exists(backup_file): |
| |
| error_msg = "备份任务失败,未生成备份文件" |
| print(f"[!] {error_msg}") |
| return None |
| time.sleep(2) |
| |
| |
| if os.path.exists(backup_file): |
| |
| public.WriteFile(self.migrate_backup_success_file, backup_file) |
| print(f"[+] 备份任务完成(超时后检测到文件): {backup_file}") |
| return backup_file |
| else: |
| print("[!] 备份任务超时,未能在指定时间内完成") |
| return None |
|
|
| def parse_arguments(): |
| """解析命令行参数""" |
| parser = argparse.ArgumentParser(description='通过SSH连接新服务器,安装宝塔面板并进行备份还原') |
| |
| parser.add_argument('-H', '--host', required=False, help='远程服务器IP地址') |
| parser.add_argument('-P', '--port', type=int, default=22, help='SSH端口,默认22') |
| parser.add_argument('-u', '--username', default='root', help='SSH用户名,默认root') |
| parser.add_argument('-p', '--password', help='SSH密码,与密钥二选一') |
| parser.add_argument('-k', '--key-file', help='SSH密钥文件路径,与密码二选一') |
| parser.add_argument('-b', '--backup-file', help='本地备份文件路径') |
| parser.add_argument('--panel-port', type=int, default=8888, help='宝塔面板端口,默认8888') |
| parser.add_argument('-r', '--max-retries', type=int, default=3, help='连接断开时最大重试次数,默认3次') |
| parser.add_argument('-i', '--retry-interval', type=int, default=5, help='重试间隔秒数,默认5秒') |
| parser.add_argument('--task-id', help='迁移任务ID,用于跟踪进度') |
| parser.add_argument('--action', choices=['verify', 'install', 'upload', 'restore', 'migrate', 'all', 'status'], default='all', |
| help='要执行的操作: verify=验证SSH连接, install=安装宝塔面板, upload=上传备份文件, restore=还原备份, migrate=执行完整迁移, status=获取任务状态, all=全部执行') |
| parser.add_argument('--task-name', default='默认迁移任务', help='迁移任务名称') |
| |
| args = parser.parse_args() |
| |
| |
| if args.action != 'status' and args.host is None: |
| parser.error('必须提供远程服务器IP地址(-H/--host)') |
| |
| |
| if args.action != 'status' and not args.password and not args.key_file: |
| parser.error('必须提供SSH密码或密钥文件路径') |
| |
| return args |
|
|
| if __name__ == "__main__": |
| args = parse_arguments() |
| |
| |
| if args.action == 'status': |
| result = get_migration_status(args.task_id) |
| print(json.dumps(result, ensure_ascii=False, indent=2)) |
| sys.exit(0) |
| |
| |
| if args.action == 'migrate' and not args.task_id: |
| |
| task_result = create_migration_task( |
| task_name=args.task_name, |
| host=args.host, |
| port=args.port, |
| username=args.username, |
| password=args.password, |
| key_file=args.key_file, |
| backup_file=args.backup_file |
| ) |
| |
| if task_result.get("status", False): |
| args.task_id = task_result.get("task_id") |
| print(f"[+] 创建迁移任务成功,任务ID: {args.task_id}") |
| else: |
| print(json.dumps(task_result, ensure_ascii=False, indent=2)) |
| sys.exit(1) |
| |
| manager = BtInstallManager( |
| host=args.host, |
| port=args.port, |
| username=args.username, |
| password=args.password, |
| key_file=args.key_file, |
| backup_file=args.backup_file, |
| panel_port=args.panel_port, |
| max_retries=args.max_retries, |
| retry_interval=args.retry_interval, |
| task_id=args.task_id |
| ) |
| |
| |
| if args.action == 'verify': |
| result = manager.verify_ssh_connection() |
| print(json.dumps(result, ensure_ascii=False, indent=2)) |
| elif args.action == 'install': |
| result = manager.install_panel() |
| print(json.dumps(result, ensure_ascii=False, indent=2)) |
| elif args.action == 'upload': |
| result = manager.upload_backup() |
| print(json.dumps(result, ensure_ascii=False, indent=2)) |
| elif args.action == 'migrate': |
| if not args.task_id: |
| print(json.dumps({"status": False, "msg": "执行迁移任务需要提供task_id参数"}, ensure_ascii=False, indent=2)) |
| else: |
| result = manager.migrate(args.task_id) |
| print(json.dumps(result, ensure_ascii=False, indent=2)) |
| else: |
| result = manager.run() |
| print(json.dumps(result, ensure_ascii=False, indent=2)) |