fe / bt-source /panel /mod /project /backup_restore /ssh_manager.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
3a5cf48 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 通过SSH连接新服务器,安装宝塔面板并进行备份还原
import paramiko
import os
import time
import sys
import json
import re
import argparse
import logging
import socket
import datetime
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
if '/www/server/panel' not in sys.path:
sys.path.insert(0, '/www/server/panel')
import public
try:
from paramiko import SSHClient, AutoAddPolicy
from paramiko.sftp_client import SFTPClient
except:
public.ExecShell("btpip install paramiko")
try:
from paramiko import SSHClient, AutoAddPolicy
from paramiko.sftp_client import SFTPClient
except:
pass
# 添加迁移进度跟踪相关的变量和工具函数
BACKUP_RESTORE_PATH = "/www/backup/backup_restore"
MIGRATION_TASK_JSON = '/www/backup/backup_restore/migration_task.json'
MIGRATION_LOG_FILE = '/www/backup/backup_restore/migration.log'
MIGRATION_PL_FILE = '/www/backup/backup_restore/migration.pl'
MIGRATION_SUCCESS_FILE = '/www/backup/backup_restore/migration_success.pl'
# 迁移状态码
MIGRATION_STATUS = {
'PENDING': 0, # 等待中
'RUNNING': 1, # 运行中
'COMPLETED': 2, # 已完成
'FAILED': 3, # 失败
}
# 迁移阶段
MIGRATION_STAGES = {
'INIT': {
'code': 'init',
'display': '初始化',
'progress': 5,
},
'PANEL_INSTALL': {
'code': 'panel_install',
'display': '面板安装',
'progress': 20,
},
'LOCAL_BACKUP': {
'code': 'local_backup',
'display': '本地备份',
'progress': 40,
},
'FILE_UPLOAD': {
'code': 'file_upload',
'display': '文件上传',
'progress': 70,
},
'RESTORE': {
'code': 'restore',
'display': '数据还原',
'progress': 90,
},
'COMPLETED': {
'code': 'completed',
'display': '迁移完成',
'progress': 100,
}
}
def write_migration_log(message, task_id=None, log_type='INFO'):
"""写入迁移日志"""
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
log_message = f"[{timestamp}] [{log_type}] {message}\n"
# 确保目录存在
log_dir = os.path.dirname(MIGRATION_LOG_FILE)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 写入主日志文件
with open(MIGRATION_LOG_FILE, 'a+') as f:
f.write(log_message)
# 如果指定了任务ID,同时写入任务特定的日志文件
if task_id:
task_log_file = f"/www/backup/backup_restore/{task_id}_migration/migration.log"
task_log_dir = os.path.dirname(task_log_file)
if not os.path.exists(task_log_dir):
os.makedirs(task_log_dir)
with open(task_log_file, 'a+') as f:
f.write(log_message)
return log_message.strip()
def update_migration_status(task_id, stage, status=MIGRATION_STATUS['RUNNING'], message=None, details=None):
"""更新迁移任务状态"""
# 确保目录存在
task_dir = f"/www/backup/backup_restore/{task_id}_migration"
if not os.path.exists(task_dir):
os.makedirs(task_dir)
# 任务状态文件路径
task_status_file = f"{task_dir}/status.json"
# 读取当前状态(如果存在)
current_status = {}
if os.path.exists(task_status_file):
try:
with open(task_status_file, 'r') as f:
current_status = json.load(f)
except Exception as e:
write_migration_log(f"读取任务状态失败: {e}", task_id, 'ERROR')
# 更新状态为新格式
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
if 'start_time' not in current_status:
current_status['start_time'] = current_time
current_status['task_id'] = task_id
current_status['last_update'] = current_time
current_status['server_ip'] = current_status.get('server_ip', '')
current_status['run_type'] = stage
current_status['run_status'] = status
current_status['step'] = list(MIGRATION_STAGES.keys()).index(stage) + 1 if stage in MIGRATION_STAGES else 1
current_status['migrate_progress'] = MIGRATION_STAGES.get(stage, {}).get('progress', 0)
current_status['migrate_msg'] = message if message else MIGRATION_STAGES.get(stage, {}).get('display', stage)
if details:
if 'task_info' not in current_status:
current_status['task_info'] = {}
current_status['task_info'].update(details)
# 如果任务完成,记录完成时间
if status == MIGRATION_STATUS['COMPLETED']:
current_status['end_time'] = current_time
if 'start_time' in current_status:
start_time = time.strptime(current_status['start_time'], '%Y-%m-%d %H:%M:%S')
end_time = time.strptime(current_time, '%Y-%m-%d %H:%M:%S')
total_seconds = time.mktime(end_time) - time.mktime(start_time)
current_status['total_time'] = total_seconds
# 保存状态
try:
with open(task_status_file, 'w') as f:
json.dump(current_status, f, ensure_ascii=False, indent=2)
except Exception as e:
write_migration_log(f"保存任务状态失败: {e}", task_id, 'ERROR')
# 同时更新全局任务记录
update_global_migration_tasks(task_id, current_status)
return current_status
def update_global_migration_tasks(task_id, task_status):
"""更新全局迁移任务记录 - 唯一任务运行模式"""
# 在唯一任务运行模式下,我们只保存最新的任务
try:
task_dir = os.path.dirname(MIGRATION_TASK_JSON)
if not os.path.exists(task_dir):
os.makedirs(task_dir)
# 直接写入当前任务作为全局任务
with open(MIGRATION_TASK_JSON, 'w') as f:
json.dump(task_status, f, ensure_ascii=False, indent=2)
except Exception as e:
write_migration_log(f"保存全局任务记录失败: {e}", task_id, 'ERROR')
def get_migration_status(task_id=None):
"""获取迁移任务进度
Args:
task_id: 指定任务ID,如果为空则返回最新任务状态
Returns:
任务状态信息
"""
# 如果没有指定task_id,则从migration_task.json获取当前运行的任务
if not task_id:
if os.path.exists(MIGRATION_PL_FILE):
try:
with open(MIGRATION_PL_FILE, 'r') as f:
task_id = f.read().strip()
except:
return {"status": False, "msg": "无法读取当前运行的任务ID"}
# 如果没有获取到任务ID,则直接读取全局任务文件
if not task_id and os.path.exists(MIGRATION_TASK_JSON):
try:
with open(MIGRATION_TASK_JSON, 'r') as f:
task_data = json.load(f)
return {"status": True, "msg": "获取任务状态成功", "data": task_data}
except Exception as e:
return {"status": False, "msg": f"读取任务状态失败: {e}"}
if not task_id:
return {"status": False, "msg": "没有正在运行的任务"}
# 获取指定任务的状态
task_status_file = f"/www/backup/backup_restore/{task_id}_migration/status.json"
if os.path.exists(task_status_file):
try:
with open(task_status_file, 'r') as f:
task_data = json.load(f)
return {"status": True, "msg": "获取任务状态成功", "data": task_data}
except Exception as e:
return {"status": False, "msg": f"读取任务状态失败: {e}"}
else:
return {"status": False, "msg": f"任务 {task_id} 不存在"}
def create_migration_task(task_name, host, port=22, username='root', password=None, key_file=None, backup_file=None):
"""创建新的迁移任务"""
task_id = str(int(time.time()))
# 检查是否存在正在运行的任务
if os.path.exists(MIGRATION_PL_FILE):
try:
with open(MIGRATION_PL_FILE, 'r') as f:
running_task_id = f.read().strip()
# 如果有运行中的任务,返回错误
if running_task_id:
error_msg = f"已有任务正在运行,任务ID: {running_task_id}"
write_migration_log(error_msg)
return {"status": False, "msg": error_msg}
except:
pass
# 按照comMod.py中的格式创建任务数据
print(host)
task_data = {
'task_id': task_id,
'server_ip': host,
'ssh_port': port,
'ssh_user': username,
'auth_type': 'password' if password else 'key',
'password': password if password else '',
'timestamp': int(time.time()),
'run_type': 'INIT',
'run_status': MIGRATION_STATUS['RUNNING'],
'step': 1,
'migrate_progress': MIGRATION_STAGES['INIT']['progress'],
'migrate_msg': '迁移任务初始化中',
'task_info': {
'task_name': task_name,
'backup_file': backup_file,
'start_time': time.strftime('%Y-%m-%d %H:%M:%S')
}
}
# 创建任务目录
task_dir = f"/www/backup/backup_restore/{task_id}_migration"
if not os.path.exists(task_dir):
os.makedirs(task_dir)
# 保存任务状态
with open(f"{task_dir}/status.json", 'w') as f:
json.dump(task_data, f, ensure_ascii=False, indent=2)
# 更新全局任务记录
update_global_migration_tasks(task_id, task_data)
# 创建进程锁文件
with open(MIGRATION_PL_FILE, 'w') as f:
f.write(task_id)
write_migration_log(f"创建迁移任务: {task_name} -> {host}", task_id)
return {"status": True, "msg": "迁移任务创建成功", "task_id": task_id}
class BtInstallManager:
def __init__(self, host, port=22, username='root', password=None, key_file=None,
backup_file=None, panel_port=8888, max_retries=3, retry_interval=5, task_id=None):
"""
初始化SSH连接管理器
Args:
host: 远程服务器IP
port: SSH端口,默认22
username: SSH用户名,默认root
password: SSH密码,与key_file二选一
key_file: SSH密钥文件路径,与password二选一
backup_file: 本地备份文件路径
panel_port: 宝塔面板端口,默认8888
max_retries: 最大重试次数,默认3次
retry_interval: 重试间隔,默认5秒
task_id: 迁移任务ID,用于跟踪进度
"""
self.host = host
self.port = port
self.username = username
self.password = password
self.key_file = key_file
self.backup_file = backup_file
self.panel_port = panel_port
self.ssh = None
self.sftp = None
self.remote_backup_path = '/www/backup/backup_restore'
self.max_retries = max_retries
self.retry_interval = retry_interval
self.task_id = task_id # 添加任务ID
def connect(self):
"""建立SSH连接"""
try:
if self.task_id:
write_migration_log(f"正在连接到服务器 {self.host}:{self.port}", self.task_id)
self.ssh = SSHClient()
self.ssh.set_missing_host_key_policy(AutoAddPolicy())
if self.key_file:
key = self._load_ssh_key(self.key_file)
self.ssh.connect(self.host, self.port, self.username, pkey=key)
else:
self.ssh.connect(self.host, self.port, self.username, self.password)
self.sftp = self.ssh.open_sftp()
print(f"[+] 成功连接到服务器 {self.host}")
if self.task_id:
write_migration_log(f"成功连接到服务器 {self.host}", self.task_id)
return True
except paramiko.AuthenticationException:
error_msg = f"认证失败: 用户名或密码错误"
print(f"[!] {error_msg}")
if self.task_id:
write_migration_log(error_msg, self.task_id, 'ERROR')
update_migration_status(self.task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
except paramiko.SSHException as e:
error_msg = f"SSH连接异常: {e}"
print(f"[!] {error_msg}")
if self.task_id:
write_migration_log(error_msg, self.task_id, 'ERROR')
update_migration_status(self.task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
except socket.error as e:
error_msg = f"网络连接错误: {e}"
print(f"[!] {error_msg}")
if self.task_id:
write_migration_log(error_msg, self.task_id, 'ERROR')
update_migration_status(self.task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
except Exception as e:
error_msg = f"连接服务器失败: {e}"
print(f"[!] {error_msg}")
if self.task_id:
write_migration_log(error_msg, self.task_id, 'ERROR')
update_migration_status(self.task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
def _load_ssh_key(self, key_file, password=None):
"""根据密钥文件自动判断类型并加载"""
# 读取文件内容
with open(key_file, 'r') as f:
content = f.read()
# 尝试不同的密钥类型
key = None
errors = []
# 判断明确标记的密钥类型
if "BEGIN RSA PRIVATE KEY" in content:
try:
key = paramiko.RSAKey.from_private_key_file(key_file, password=password)
if self.task_id:
write_migration_log(f"使用RSA类型密钥连接", self.task_id)
return key
except Exception as e:
errors.append(f"RSA密钥加载失败: {str(e)}")
elif "BEGIN DSA PRIVATE KEY" in content and hasattr(paramiko, "DSSKey"): # 兼容无DSSKey功能的paramiko版本
try:
key = paramiko.DSSKey.from_private_key_file(key_file, password=password)
if self.task_id:
write_migration_log(f"使用DSA类型密钥连接", self.task_id)
return key
except Exception as e:
errors.append(f"DSA密钥加载失败: {str(e)}")
elif "BEGIN EC PRIVATE KEY" in content:
try:
key = paramiko.ECDSAKey.from_private_key_file(key_file, password=password)
if self.task_id:
write_migration_log(f"使用ECDSA类型密钥连接", self.task_id)
return key
except Exception as e:
errors.append(f"ECDSA密钥加载失败: {str(e)}")
elif "BEGIN OPENSSH PRIVATE KEY" in content:
# 对于OPENSSH格式,尝试所有可能的类型
key_types = [
(paramiko.Ed25519Key, "Ed25519"),
(paramiko.RSAKey, "RSA"),
(paramiko.ECDSAKey, "ECDSA"),
]
if hasattr(paramiko, "DSSKey"): # 兼容无DSSKey功能的paramiko版本
key_types.append((paramiko.DSSKey, "DSA"))
for key_class, key_name in key_types:
try:
key = key_class.from_private_key_file(key_file, password=password)
if self.task_id:
write_migration_log(f"使用{key_name}类型密钥连接", self.task_id)
return key
except Exception as e:
errors.append(f"{key_name}密钥加载失败: {str(e)}")
# 如果以上方法都失败,抛出异常
error_msg = "无法识别或加载密钥,请检查密钥文件格式是否正确"
if self.task_id:
write_migration_log(error_msg, self.task_id)
raise ValueError(error_msg)
def reconnect(self):
"""重新连接SSH"""
if self.ssh:
try:
self.ssh.close()
except:
pass
if self.sftp:
try:
self.sftp.close()
except:
pass
self.ssh = None
self.sftp = None
for attempt in range(self.max_retries):
print(f"[*] 尝试重新连接服务器 (尝试 {attempt+1}/{self.max_retries})...")
connection_result = self.connect()
if isinstance(connection_result, dict):
# 连接失败,返回错误信息
if attempt == self.max_retries - 1:
return connection_result
elif connection_result:
return True
time.sleep(self.retry_interval)
print(f"[!] 重新连接服务器失败,已达到最大重试次数 ({self.max_retries})")
return {"status": False, "msg": f"重新连接服务器失败,已达到最大重试次数 ({self.max_retries})"}
def disconnect(self):
"""关闭SSH连接"""
if self.sftp:
self.sftp.close()
if self.ssh:
self.ssh.close()
print(f"[+] 已断开与服务器 {self.host} 的连接")
def exec_command(self, command, print_output=True, retry=True):
"""
执行远程命令
Args:
command: 要执行的命令
print_output: 是否打印输出
retry: 连接断开时是否重试
Returns:
(stdout, stderr)
"""
if not self.ssh:
if retry:
reconnect_result = self.reconnect()
if isinstance(reconnect_result, dict):
return None, None
return self.exec_command(command, print_output, False) # 重连成功后再次尝试,但不再重试
print("[!] SSH连接未建立")
return None, None
try:
print(f"[*] 执行命令: {command}")
stdin, stdout, stderr = self.ssh.exec_command(command)
stdout_content = stdout.read().decode('utf-8')
stderr_content = stderr.read().decode('utf-8')
if print_output:
if stdout_content:
print("[+] 输出:", stdout_content)
if stderr_content:
print("[!] 错误:", stderr_content)
return stdout_content, stderr_content
except (socket.error, paramiko.SSHException) as e:
print(f"[!] 执行命令时SSH连接断开: {e}")
if retry:
reconnect_result = self.reconnect()
if isinstance(reconnect_result, dict):
return None, None
print("[*] 重新连接成功,重试命令...")
return self.exec_command(command, print_output, False) # 重连成功后再次尝试,但不再重试
return None, None
def check_os_type(self):
"""检测服务器操作系统类型"""
print("[*] 检测服务器操作系统类型...")
os_type = None
stdout, _ = self.exec_command("cat /etc/os-release")
if stdout is None:
return None
if "CentOS" in stdout:
os_type = "centos"
elif "Ubuntu" in stdout:
os_type = "ubuntu"
elif "Debian" in stdout:
os_type = "debian"
else:
stdout, _ = self.exec_command("cat /etc/redhat-release", print_output=False)
if stdout and ("CentOS" in stdout or "Red Hat" in stdout):
os_type = "centos"
if os_type:
print(f"[+] 操作系统类型: {os_type}")
else:
print("[!] 无法确定操作系统类型")
return os_type
def install_bt_panel(self):
"""安装宝塔面板"""
if self.task_id:
update_migration_status(self.task_id, 'PANEL_INSTALL', message="开始安装宝塔面板")
print("[*] 开始安装宝塔面板...")
# 检查是否已安装宝塔面板
# stdout, _ = self.exec_command("test -d /www/server/panel && echo 'exists'")
# if stdout is None:
# error_msg = "执行命令失败,无法检查宝塔面板是否已安装"
# if self.task_id:
# update_migration_status(self.task_id, 'PANEL_INSTALL', MIGRATION_STATUS['FAILED'], message=error_msg)
# return {"status": False, "msg": error_msg}
# if 'exists' in stdout:
# message = "宝塔面板已安装,跳过安装步骤"
# print(f"[+] {message}")
# if self.task_id:
# update_migration_status(self.task_id, 'PANEL_INSTALL', MIGRATION_STATUS['COMPLETED'], message=message)
# return {"status": True, "msg": message}
# os_type = self.check_os_type()
# if not os_type:
# error_msg = "无法确定操作系统类型,无法安装宝塔面板"
# if self.task_id:
# update_migration_status(self.task_id, 'PANEL_INSTALL', MIGRATION_STATUS['FAILED'], message=error_msg)
# return {"status": False, "msg": error_msg}
# # 根据操作系统类型选择安装命令
# install_cmd = ""
# if os_type == "centos":
# install_cmd = "yum install -y wget && wget -O install.sh https://download.bt.cn/install/install_6.0.sh && bash install.sh -y -P 8888"
# elif os_type in ["ubuntu", "debian"]:
# install_cmd = "apt-get update && apt-get install -y wget && wget -O install.sh https://download.bt.cn/install/install-ubuntu_6.0.sh && echo 'y' | bash install.sh"
# install_cmd = "apt-get update && apt-get install -y wget && wget -O install.sh https://download.bt.cn/install/install-ubuntu_6.0.sh && bash install.sh -y -P 8888"
# else:
# error_msg = "不支持的操作系统类型"
# print(f"[!] {error_msg}")
# if self.task_id:
# update_migration_status(self.task_id, 'PANEL_INSTALL', MIGRATION_STATUS['FAILED'], message=error_msg)
# return {"status": False, "msg": error_msg}
install_cmd = "if [ -f /usr/bin/curl ];then curl -sSO http://download.bt.cn/install/install_panel_backup_last.sh;else wget -O install_panel_backup_last.sh http://download.bt.cn/install/install_panel_backup_last.sh;fi;nohup bash install_panel_backup_last.sh -y -P 8888 > /root/bt_install.log 2>&1 &"
if self.task_id:
update_migration_status(self.task_id, 'PANEL_INSTALL', message=f"使用命令安装宝塔面板,请稍等...")
print(f"[*] 使用命令安装宝塔面板: {install_cmd}")
stdout, stderr = self.exec_command(install_cmd)
#安装超时限制15分钟
timeout = 900
start_time = time.time()
while time.time() - start_time < timeout:
get_install_progress_cmd = "ps -ef|grep bash|grep install_panel_backup_last.sh|grep -v grep"
stdout, stderr = self.exec_command(get_install_progress_cmd)
if stdout is None or stdout.strip() == "":
print("这里是pid输出",stdout)
get_install_log_cmd = "cat /root/bt_install.log"
stdout, stderr = self.exec_command(get_install_log_cmd)
if "安装完成" in stdout or "Installed successfully" in stdout:
message = "宝塔面板安装成功,正在启动备份任务..."
print(f"[+] {message}")
# 提取面板信息
username_match = re.search(r"username: (.*)", stdout)
password_match = re.search(r"password: (.*)", stdout)
get_panel_admin_path_cmd = "cat /root/bt_install.log"
admin_path, stderr = self.exec_command(" cat /www/server/panel/data/admin_path.pl")
panel_info = {
"panel_url": f"https://{self.host}:{self.panel_port}{admin_path}"
}
if username_match and password_match:
username = username_match.group(1)
password = password_match.group(1)
panel_info["username"] = username
panel_info["password"] = password
print(f"[+] 面板用户名: {username}")
print(f"[+] 面板密码: {password}")
print(f"[+] 面板地址: https://{self.host}:{self.panel_port}")
if self.task_id:
update_migration_status(
self.task_id,
'PANEL_INSTALL',
MIGRATION_STATUS['COMPLETED'],
message=message,
details={"panel_info": panel_info}
)
#update_panel_cmd = "wget -O git_ol.sh http://downooad-test.bt.cn/git_ol.sh;bash git_ol.sh LinuxPanel-9.6.0-9.6.0.zip"
#stdoutd, stderrd = self.exec_command(update_panel_cmd)
return {"status": True, "msg": message, "data": panel_info}
else:
write_migration_log(f"宝塔面板安装失败 {self.host}:{self.port} 错误信息: {stdout}", self.task_id)
error_msg = "宝塔面板安装失败"
print(f"[!] {error_msg}")
if self.task_id:
update_migration_status(
self.task_id,
'PANEL_INSTALL',
MIGRATION_STATUS['FAILED'],
message=error_msg,
details={"stderr": stderr}
)
return {"status": False, "msg": error_msg, "error_msg": stderr}
else:
time.sleep(3)
get_install_log_cmd = "cat /root/bt_install.log"
stdout, stderr = self.exec_command(get_install_log_cmd)
write_migration_log(f"安装进度: {stdout}")
# if stdout is None:
# error_msg = "执行安装命令失败"
# if self.task_id:
# update_migration_status(self.task_id, 'PANEL_INSTALL', MIGRATION_STATUS['FAILED'], message=error_msg)
# return {"status": False, "msg": error_msg}
# 检查安装结果
if "安装完成" in stdout or "Installed successfully" in stdout:
message = "宝塔面板安装成功,正在启动备份任务..."
print(f"[+] {message}")
# 提取面板信息
username_match = re.search(r"username: (.*)", stdout)
password_match = re.search(r"password: (.*)", stdout)
panel_info = {
"panel_url": f"https://{self.host}:{self.panel_port}"
}
if username_match and password_match:
username = username_match.group(1)
password = password_match.group(1)
panel_info["username"] = username
panel_info["password"] = password
print(f"[+] 面板用户名: {username}")
print(f"[+] 面板密码: {password}")
print(f"[+] 面板地址: https://{self.host}:{self.panel_port}")
if self.task_id:
update_migration_status(
self.task_id,
'PANEL_INSTALL',
MIGRATION_STATUS['COMPLETED'],
message=message,
details={"panel_info": panel_info}
)
#update_panel_cmd = "wget -O git_ol.sh http://downooad-test.bt.cn/git_ol.sh;bash git_ol.sh LinuxPanel-9.6.0-9.6.0.zip"
#stdout, stderr = self.exec_command(get_install_progress_cmd)
return {"status": True, "msg": message, "data": panel_info}
else:
write_migration_log(f"宝塔面板安装失败 {self.host}:{self.port} 错误信息: {stdout}", self.task_id)
error_msg = "宝塔面板安装失败"
print(f"[!] {error_msg}")
if self.task_id:
update_migration_status(
self.task_id,
'PANEL_INSTALL',
MIGRATION_STATUS['FAILED'],
message=error_msg,
details={"stderr": stderr}
)
return {"status": False, "msg": error_msg, "error_msg": stderr}
def create_backup_dir(self):
"""创建备份目录"""
print("[*] 创建备份目录...")
# 检查备份目录是否存在,不存在则创建
self.exec_command(f"mkdir -p {self.remote_backup_path}")
stdout, _ = self.exec_command(f"test -d {self.remote_backup_path} && echo 'exists'")
if stdout is None:
return False
if 'exists' in stdout:
print(f"[+] 备份目录 {self.remote_backup_path} 已创建")
return True
else:
print(f"[!] 备份目录 {self.remote_backup_path} 创建失败")
return False
def get_remote_file_size(self, remote_path):
"""获取远程文件大小"""
try:
if not self.sftp:
reconnect_result = self.reconnect()
if isinstance(reconnect_result, dict):
return -1
return self.sftp.stat(remote_path).st_size
except Exception as e:
# 文件不存在或其他错误
return -1
def write_backup_info(self,backup_file_path):
backup_task_json = "/www/backup/backup_restore/backup_task.json"
if os.path.exists(backup_task_json):
task_json_data=json.loads(public.ReadFile(backup_task_json))
for item in task_json_data:
if item["backup_file"] == backup_file_path:
migrate_backup_info_path="/www/backup/backup_restore/migrate_backup_info.json"
public.WriteFile(migrate_backup_info_path,json.dumps(item))
def upload_backup_file(self):
"""上传备份文件到服务器(支持断点续传)"""
if not self.backup_file or not os.path.exists(self.backup_file):
error_msg = f"备份文件 {self.backup_file} 不存在"
print(f"[!] {error_msg}")
if self.task_id:
update_migration_status(self.task_id, 'FILE_UPLOAD', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
message=f"开始上传备份文件 {self.backup_file} 到服务器"
)
print(f"[*] 开始上传备份文件 {self.backup_file} 到服务器...")
backup_filename = os.path.basename(self.backup_file)
remote_file_path = f"{self.remote_backup_path}/{backup_filename}"
# 确保备份目录存在
if not self.create_backup_dir():
error_msg = "创建备份目录失败"
if self.task_id:
update_migration_status(self.task_id, 'FILE_UPLOAD', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
# 获取本地文件大小
local_file_size = os.path.getsize(self.backup_file)
file_size_mb = local_file_size / (1024 * 1024)
print(f"[*] 文件大小: {file_size_mb:.2f} MB")
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
message=f"准备上传文件,大小: {file_size_mb:.2f} MB",
details={"upload": {"total_size": local_file_size, "size_mb": file_size_mb}}
)
# 检查远程文件是否存在,存在则获取大小
remote_file_size = self.get_remote_file_size(remote_file_path)
# 如果远程文件存在且大小与本地相同,则认为已上传完成
if remote_file_size == local_file_size:
message = "文件已完全上传,跳过上传步骤"
print(f"[+] {message}")
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
MIGRATION_STATUS['COMPLETED'],
message=message,
details={"upload": {"status": "completed", "remote_path": remote_file_path}}
)
return {"status": True, "msg": message, "data": {"remote_path": remote_file_path}}
# 如果远程文件存在但大小不同,尝试断点续传
if remote_file_size > 0:
print(f"[*] 检测到不完整的上传文件,尝试断点续传...")
print(f"[*] 已上传: {remote_file_size/local_file_size*100:.2f}% ({remote_file_size/(1024*1024):.2f}MB / {file_size_mb:.2f}MB)")
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
message=f"检测到不完整的上传文件,从 {remote_file_size/(1024*1024):.2f}MB 继续上传",
details={
"upload": {
"status": "resuming",
"progress": remote_file_size/local_file_size*100,
"uploaded": remote_file_size,
"total_size": local_file_size
}
}
)
else:
remote_file_size = 0
print(f"[*] 开始新的上传...")
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
message="开始新的上传",
details={"upload": {"status": "starting", "progress": 0}}
)
max_attempts = 3
attempt = 0
offset = remote_file_size
chunk_size = 1024 * 1024 # 1MB 分块上传
while offset < local_file_size and attempt < max_attempts:
try:
if not self.sftp:
reconnect_result = self.reconnect()
if isinstance(reconnect_result, dict):
return reconnect_result
# 打开本地文件
with open(self.backup_file, 'rb') as local_file:
if offset > 0:
local_file.seek(offset)
# 如果文件已存在,则使用追加模式,否则创建新文件
if offset > 0:
remote_file = self.sftp.open(remote_file_path, 'ab')
else:
remote_file = self.sftp.open(remote_file_path, 'wb')
with remote_file:
start_time = time.time()
current_offset = offset
last_update_time = start_time
last_progress_report = 0
# 分块读取和上传
while current_offset < local_file_size:
data = local_file.read(chunk_size)
if not data:
break
remote_file.write(data)
current_offset += len(data)
progress = current_offset / local_file_size * 100
elapsed = time.time() - start_time
speed = (current_offset - offset) / elapsed / 1024 if elapsed > 0 else 0
# 计算剩余时间
remaining_bytes = local_file_size - current_offset
if speed > 0:
remaining_time_seconds = remaining_bytes / (speed * 1024) # speed is in KB/s
remaining_time_str = time.strftime("%H:%M:%S", time.gmtime(remaining_time_seconds))
else:
remaining_time_str = "N/A"
print(f"\r[*] 上传进度: {progress:.2f}% - {speed:.2f} KB/s - 剩余时间: {remaining_time_str}", end="")
write_migration_log(f"总大小{local_file_size/(1024*1024):.2f}MB 正在上传文件: {progress:.2f}% - {speed:.2f} KB/s - 剩余时间: {remaining_time_str}")
# 每5秒或进度增加5%更新一次状态
current_time = time.time()
if (current_time - last_update_time > 5 or progress - last_progress_report >= 5) and self.task_id:
last_update_time = current_time
last_progress_report = progress
update_migration_status(
self.task_id,
'FILE_UPLOAD',
message=f"总大小{local_file_size/(1024*1024):.2f}MB 正在上传文件: {progress:.2f}% - {speed:.2f} KB/s - 剩余时间: {remaining_time_str}",
details={
"upload": {
"status": "uploading",
"progress": progress,
"speed": speed,
"elapsed": elapsed,
"uploaded": current_offset,
"total_size": local_file_size,
"remaining_time": remaining_time_str
}
}
)
# 定期刷新缓冲区
if current_offset % (chunk_size * 10) == 0:
remote_file.flush()
# 确保最后一次刷新
remote_file.flush()
print() # 换行
# 成功完成上传
if current_offset >= local_file_size:
elapsed = time.time() - start_time
total_speed = (current_offset - offset) / elapsed / 1024 if elapsed > 0 else 0
message = f"上传完成,耗时 {elapsed:.2f} 秒,平均速度 {total_speed:.2f} KB/s"
print(f"[+] {message}")
# 验证文件大小
final_size = self.get_remote_file_size(remote_file_path)
if final_size == local_file_size:
success_msg = f"文件大小验证通过: {final_size/(1024*1024):.2f}MB"
print(f"[+] {success_msg}")
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
MIGRATION_STATUS['COMPLETED'],
message=success_msg,
details={
"upload": {
"status": "completed",
"remote_path": remote_file_path,
"file_size": final_size,
"elapsed_time": elapsed,
"speed": total_speed
}
}
)
return {"status": True, "msg": "文件上传成功", "data": {
"remote_path": remote_file_path,
"file_size": final_size,
"elapsed_time": elapsed,
"speed": total_speed
}}
else:
error_msg = f"文件大小不匹配: 本地 {local_file_size/(1024*1024):.2f}MB, 远程 {final_size/(1024*1024):.2f}MB"
print(f"[!] {error_msg}")
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
message=error_msg,
details={
"upload": {
"status": "size_mismatch",
"local_size": local_file_size,
"remote_size": final_size
}
}
)
# 更新偏移量,继续尝试
offset = final_size
else:
# 部分上传,更新偏移量
offset = current_offset
except (socket.error, paramiko.SSHException, IOError) as e:
attempt += 1
error_msg = f"上传中断: {e}"
print(f"\n[!] {error_msg}")
print(f"[*] 将尝试从断点 {offset/(1024*1024):.2f}MB 继续上传 (尝试 {attempt}/{max_attempts})")
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
message=f"上传中断: {e},将尝试从断点 {offset/(1024*1024):.2f}MB 继续上传 (尝试 {attempt}/{max_attempts})",
details={
"upload": {
"status": "interrupted",
"attempt": attempt,
"max_attempts": max_attempts,
"uploaded": offset,
"total_size": local_file_size
}
}
)
time.sleep(2)
# 尝试重新连接
reconnect_result = self.reconnect()
if isinstance(reconnect_result, dict):
continue
if offset >= local_file_size:
message = "文件上传成功"
print(f"[+] {message}")
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
MIGRATION_STATUS['COMPLETED'],
message=message,
details={"upload": {"status": "completed", "remote_path": remote_file_path}}
)
return {"status": True, "msg": message, "data": {"remote_path": remote_file_path}}
else:
error_msg = "文件上传失败,已达到最大重试次数"
print(f"[!] {error_msg}")
if self.task_id:
update_migration_status(
self.task_id,
'FILE_UPLOAD',
MIGRATION_STATUS['FAILED'],
message=error_msg,
details={
"upload": {
"status": "failed",
"attempts": attempt,
"max_attempts": max_attempts
}
}
)
return {"status": False, "msg": error_msg}
def extract_backup(self, backup_filename):
"""解压备份文件"""
print(f"[*] 开始解压备份文件 {backup_filename}...")
remote_file_path = f"{self.remote_backup_path}/{backup_filename}"
# 检查文件是否存在
stdout, _ = self.exec_command(f"test -f {remote_file_path} && echo 'exists'")
if stdout is None or 'exists' not in stdout:
print(f"[!] 备份文件 {remote_file_path} 不存在")
return False
# 解压文件
extract_cmd = f"cd {self.remote_backup_path} && tar -zxvf {backup_filename}"
stdout, stderr = self.exec_command(extract_cmd)
if stdout is None:
return False
# 提取备份文件中的时间戳
timestamp_match = re.search(r"(\d+)_backup", backup_filename)
if not timestamp_match:
print("[!] 无法从备份文件名中提取时间戳")
return False
timestamp = timestamp_match.group(1)
backup_dir = f"{self.remote_backup_path}/{timestamp}_backup"
# 检查解压是否成功
stdout, _ = self.exec_command(f"test -d {backup_dir} && echo 'exists'")
if stdout is None or 'exists' not in stdout:
print(f"[!] 备份文件解压失败")
return False
print(f"[+] 备份文件解压成功,解压目录: {backup_dir}")
return timestamp
def restore_backup(self, timestamp):
"""还原备份"""
if self.task_id:
update_migration_status(
self.task_id,
'RESTORE',
message=f"开始还原备份 (时间戳: {timestamp})"
)
print(f"[*] 开始还原备份 (时间戳: {timestamp})...")
# 等待宝塔面板服务启动
print("[*] 等待宝塔面板服务启动...")
#self.exec_command("systemctl restart btpanel")
#time.sleep(5)
# 检查还原模块是否存在
restore_script = "/www/server/panel/mod/project/backup_restore/restore_manager.py"
stdout, _ = self.exec_command(f"test -f {restore_script} && echo 'exists'")
if stdout is None or 'exists' not in stdout:
error_msg = f"还原模块不存在: {restore_script}"
print(f"[!] {error_msg}")
if self.task_id:
update_migration_status(
self.task_id,
'RESTORE',
MIGRATION_STATUS['FAILED'],
message=error_msg
)
return False
# 执行还原命令
if self.task_id:
update_migration_status(
self.task_id,
'RESTORE',
message="正在执行添加还原任务..."
)
print("[*] 执行还原操作...")
restore_cmd = "nohup btpython {restore_script} restore_data {timestamp} > /dev/null 2>&1 &".format(restore_script=restore_script,timestamp=timestamp)
print(restore_cmd)
stdout, stderr = self.exec_command(restore_cmd)
print(stdout)
print(stderr)
touch_pl_cmd = "echo 'True' > /www/server/panel/data/migration.pl"
touch_out, touch_err = self.exec_command(touch_pl_cmd)
print(touch_out)
print(touch_err)
message = "执行还原命令成功,请在新服务器查看还原进度"
print(f"[+] {message}")
if self.task_id:
update_migration_status(
self.task_id,
'RESTORE',
MIGRATION_STATUS['COMPLETED'],
message=message
)
return True
if stdout is None:
error_msg = "执行还原命令失败"
if self.task_id:
update_migration_status(
self.task_id,
'RESTORE',
MIGRATION_STATUS['FAILED'],
message=error_msg
)
return False
if "还原完成" in stdout or "success" in stdout:
message = "备份还原成功"
print(f"[+] {message}")
if self.task_id:
update_migration_status(
self.task_id,
'RESTORE',
MIGRATION_STATUS['COMPLETED'],
message=message
)
return True
else:
error_msg = "备份还原失败,请检查日志"
print(f"[!] {error_msg}")
if self.task_id:
update_migration_status(
self.task_id,
'RESTORE',
MIGRATION_STATUS['FAILED'],
message=error_msg,
details={"stderr": stderr}
)
return False
# 增加执行迁移任务的方法
def migrate(self, task_id):
"""执行完整的迁移流程"""
try:
if os.path.exists(MIGRATION_LOG_FILE):
public.ExecShell("rm -f {}".format(MIGRATION_LOG_FILE))
# 更新任务状态为开始
update_migration_status(task_id, 'INIT', message="开始迁移任务")
# 连接服务器
self.task_id = task_id
connection_result = self.connect()
if isinstance(connection_result, dict) and not connection_result.get("status", False):
return connection_result
# 安装宝塔面板
update_migration_status(task_id, 'PANEL_INSTALL', message="准备安装宝塔面板")
write_migration_log("正在安装宝塔面板...预估5分钟....")
install_result = self.install_bt_panel()
if not install_result.get("status", False):
return install_result
write_migration_log("宝塔面板安装完成,正在启动备份任务...")
write_migration_log("请等待备份任务完成上传文件后,再登录面板操作...")
# 在本机执行备份任务
update_migration_status(task_id, 'LOCAL_BACKUP', message="开始在本机执行备份任务")
write_migration_log("开始在本机执行备份任务")
# 创建备份管理器实例
backup_manager = BackupRestoreManager(
host=self.host,
port=self.port,
username=self.username,
password=self.password,
key_file=self.key_file,
backup_file=None,
panel_port=self.panel_port,
max_retries=self.max_retries,
retry_interval=self.retry_interval,
task_id=task_id
)
# 执行本地备份任务
backup_manager.add_backup_task()
# 等待备份任务完成,检查migrate_backup_success.pl文件
migrate_backup_success_pl = '/www/backup/backup_restore/migrate_backup_success.pl'
migrate_backup_pl = '/www/backup/backup_restore/migrate_backup.pl'
# 更新状态
update_migration_status(task_id, 'LOCAL_BACKUP', message="正在等待本地备份完成")
# 最多等待21600秒(6小时)
timeout = 21600
start_time = time.time()
while not os.path.exists(migrate_backup_success_pl) and time.time() - start_time < timeout:
time.sleep(5)
if not os.path.exists(migrate_backup_pl):
error_msg = "备份任务已取消或失败"
update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
if not os.path.exists(migrate_backup_success_pl):
error_msg = "备份任务超时,未能在指定时间内完成"
update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
# 从migrate_backup.pl文件中获取时间戳
timestamp = ""
if os.path.exists(migrate_backup_pl):
try:
timestamp = public.ReadFile(migrate_backup_pl).strip()
update_migration_status(task_id, 'LOCAL_BACKUP', message=f"获取到备份时间戳: {timestamp}")
except:
error_msg = "读取备份时间戳失败"
update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
else:
error_msg = "备份时间戳文件不存在"
update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
# 从migrate_backup_success.pl获取备份文件路径
backup_file_path = ""
try:
backup_file_path = public.ReadFile(migrate_backup_success_pl).strip()
if not os.path.exists(backup_file_path):
# 尝试默认路径
default_backup_path = f"/www/backup/backup_restore/{timestamp}_backup.tar.gz"
if os.path.exists(default_backup_path):
backup_file_path = default_backup_path
else:
error_msg = f"备份文件不存在: {backup_file_path}"
update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
except:
error_msg = "读取备份文件路径失败"
update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
update_migration_status(task_id, 'LOCAL_BACKUP', MIGRATION_STATUS['COMPLETED'],
message=f"本地备份完成: {backup_file_path}")
if os.path.exists("/www/backup/backup_restore/backup.log"):
backup_log_data = public.ReadFile("/www/backup/backup_restore/backup.log")
write_migration_log(backup_log_data)
write_migration_log("本地备份完成")
# 上传备份文件
write_migration_log("准备上传备份文件")
self.backup_file = backup_file_path
update_migration_status(task_id, 'FILE_UPLOAD', message=f"准备上传备份文件: {backup_file_path}")
upload_result = self.upload_backup_file()
if not upload_result.get("status", False):
return upload_result
self.write_backup_info(backup_file_path)
# 上传backup_task.json文件
backup_task_json = '/www/backup/backup_restore/migrate_backup_info.json'
if os.path.exists(backup_task_json):
update_migration_status(task_id, 'FILE_UPLOAD', message="准备上传migrate_backup_info.json文件")
remote_task_json_path = f"{self.remote_backup_path}/migrate_backup_info.json"
try:
if not self.sftp:
reconnect_result = self.reconnect()
if isinstance(reconnect_result, dict):
error_msg = "上传migrate_backup_info.json文件时连接断开"
update_migration_status(task_id, 'FILE_UPLOAD', message=error_msg)
return reconnect_result
# 确保备份目录存在
if not self.create_backup_dir():
error_msg = "创建备份目录失败"
update_migration_status(task_id, 'FILE_UPLOAD', message=error_msg)
return {"status": False, "msg": error_msg}
# 上传任务文件
self.sftp.put(backup_task_json, remote_task_json_path)
update_migration_status(task_id, 'FILE_UPLOAD', message="migrate_backup_info.json文件上传成功")
except Exception as e:
error_msg = f"上传migrate_backup_info.json文件失败: {e}"
update_migration_status(task_id, 'FILE_UPLOAD', message=error_msg)
return {"status": False, "msg": error_msg}
write_migration_log("备份文件上传成功")
# 解压备份文件
backup_filename = os.path.basename(self.backup_file)
print(1)
print(backup_filename)
print(2)
update_migration_status(task_id, 'RESTORE', message=f"准备解压备份文件 {backup_filename}")
# extract_timestamp = self.extract_backup(backup_filename)
# if not extract_timestamp:
# error_msg = "备份文件解压失败"
# update_migration_status(task_id, 'RESTORE', MIGRATION_STATUS['FAILED'], message=error_msg)
# return {"status": False, "msg": error_msg}
# 确保使用正确的时间戳(从migrate_backup.pl中获取)
print(timestamp)
extract_timestamp = timestamp
# if timestamp and timestamp != extract_timestamp:
# update_migration_status(task_id, 'RESTORE', message=f"注意: 使用时间戳 {timestamp} 而非解压得到的 {extract_timestamp}")
# extract_timestamp = timestamp
# 在远程服务器上创建migrate_backup.pl文件,写入时间戳
remote_migrate_backup_pl = f"{self.remote_backup_path}/migrate_backup.pl"
self.exec_command(f"echo '{extract_timestamp}' > {remote_migrate_backup_pl}")
# 还原备份
write_migration_log("准备添加还原备份任务")
update_migration_status(task_id, 'RESTORE', message=f"准备还原备份 (时间戳: {extract_timestamp})")
if not self.restore_backup(extract_timestamp):
error_msg = "执行备份还原时出现了异常"
update_migration_status(task_id, 'RESTORE', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
write_migration_log("还原备份任务添加完成")
# 完成迁移
success_msg = "所有迁移操作已完成"
update_migration_status(task_id, 'COMPLETED', MIGRATION_STATUS['COMPLETED'], message=success_msg)
write_migration_log("迁移任务完成")
public.ExecShell("\\cp -rpa {} {}/{}_migration/migration.log".format(MIGRATION_LOG_FILE,BACKUP_RESTORE_PATH,task_id))
# 创建成功标记
with open(MIGRATION_SUCCESS_FILE, 'w') as f:
f.write(task_id)
print(f"[+] {success_msg}")
return {"status": True, "msg": success_msg}
except Exception as e:
error_msg = f"迁移过程中发生错误: {e}"
print(f"[!] {error_msg}")
if task_id:
update_migration_status(task_id, 'INIT', MIGRATION_STATUS['FAILED'], message=error_msg)
return {"status": False, "msg": error_msg}
finally:
self.disconnect()
# 清理进程锁文件
if os.path.exists(MIGRATION_PL_FILE):
public.ExecShell(f"rm -f {MIGRATION_PL_FILE}")
# 接口1: 验证SSH连接信息
def verify_ssh_connection(self):
"""验证SSH连接信息是否正常"""
print("[*] 验证SSH连接信息...")
connection_result = self.connect()
if isinstance(connection_result, dict):
return connection_result
if connection_result:
print("[+] SSH连接验证成功")
self.disconnect()
return {"status": True, "msg": "SSH连接验证成功"}
return {"status": False, "msg": "SSH连接验证失败"}
# 接口2: 安装宝塔面板
def install_panel(self):
"""安装宝塔面板接口"""
print("[*] 安装宝塔面板...")
# 连接服务器
connection_result = self.connect()
if isinstance(connection_result, dict):
return connection_result
# 安装宝塔面板
try:
install_result = self.install_bt_panel()
return install_result
finally:
self.disconnect()
# 接口3: 上传备份文件
def upload_backup(self):
"""上传备份文件接口"""
print("[*] 上传备份文件...")
if not self.backup_file:
return {"status": False, "msg": "未指定备份文件路径"}
# 连接服务器
connection_result = self.connect()
if isinstance(connection_result, dict):
return connection_result
# 上传备份文件
try:
upload_result = self.upload_backup_file()
return upload_result
finally:
self.disconnect()
def run(self):
"""执行全部安装和还原流程"""
try:
# 连接服务器
connection_result = self.connect()
if isinstance(connection_result, dict):
return connection_result
# 安装宝塔面板
# install_result = self.install_bt_panel()
# if not install_result["status"]:
# return install_result
# 如果指定了备份文件,执行还原操作
if self.backup_file:
# 上传备份文件
upload_result = self.upload_backup_file()
if not upload_result["status"]:
return upload_result
# 解压备份文件
backup_filename = os.path.basename(self.backup_file)
timestamp = self.extract_backup(backup_filename)
if not timestamp:
return {"status": False, "msg": "备份文件解压失败"}
# 还原备份
if not self.restore_backup(timestamp):
return {"status": False, "msg": "备份还原失败"}
print("[+] 所有操作已完成")
return {"status": True, "msg": "所有操作已完成"}
except Exception as e:
print(f"[!] 执行过程中发生错误: {e}")
return {"status": False, "msg": f"执行过程中发生错误: {e}"}
finally:
self.disconnect()
class BackupRestoreManager:
def __init__(self, host, port, username, password, key_file, backup_file, panel_port, max_retries, retry_interval, task_id):
self.host = host
self.port = port
self.username = username
self.password = password
self.key_file = key_file
self.base_path = '/www/backup/backup_restore'
self.bakcup_task_json = self.base_path + '/backup_task.json'
self.backup_pl_file = self.base_path + '/backup.pl'
self.migrate_backup_pl_file = self.base_path + '/migrate_backup.pl'
self.migrate_backup_success_file = self.base_path + '/migrate_backup_success.pl'
self.migrage_save_data_conf = self.base_path + '/migrate_save_data_conf.json'
def add_backup_task(self):
backup_config = []
if os.path.exists(self.bakcup_task_json):
backup_config=json.loads(public.ReadFile(self.bakcup_task_json))
local_timestamp=int(time.time())
backup_timestamp=local_timestamp
get_time=local_timestamp
#get_time=1744611093
print(get_time)
backup_now=True
backup_conf = {}
backup_conf['backup_name'] = "migrate_backup" + str(get_time)
backup_conf['timestamp'] = get_time
backup_conf['create_time'] = datetime.datetime.fromtimestamp(int(local_timestamp)).strftime('%Y-%m-%d %H:%M:%S')
backup_conf['backup_time'] = datetime.datetime.fromtimestamp(int(backup_timestamp)).strftime('%Y-%m-%d %H:%M:%S')
backup_conf['storage_type'] = "local"
backup_conf['auto_exit'] = 0
backup_conf['backup_status'] = 0
backup_conf['restore_status'] = 0
backup_conf['backup_path'] = self.base_path + "/" + str(get_time) + "_backup"
backup_conf['backup_file'] = ""
backup_conf['backup_file_sha256'] = ""
backup_conf['backup_file_size'] = ""
backup_conf['backup_count'] = {}
backup_conf['backup_count']['success'] = None
backup_conf['backup_count']['failed'] = None
backup_conf['total_time']=None
backup_conf['done_time']=None
if os.path.exists(self.migrage_save_data_conf):
save_data_conf=json.loads(public.ReadFile(self.migrage_save_data_conf))
backup_conf['backup_data'] = save_data_conf['backup_data']
backup_conf['database_id'] = save_data_conf['database_id']
backup_conf['site_id'] = save_data_conf['site_id']
backup_config.append(backup_conf)
public.WriteFile(self.bakcup_task_json,json.dumps(backup_config))
if backup_now:
print("[*] 执行备份命令...")
public.ExecShell("nohup btpython /www/server/panel/mod/project/backup_restore/backup_manager.py backup_data {} > /dev/null 2>&1 &".format(int(get_time)))
public.WriteFile(self.migrate_backup_pl_file,str(get_time))
print("[+] 备份任务添加成功")
# 等待备份完成,最多等待21600秒(6小时)
timeout = 21600
start_time = time.time()
print("[*] 等待备份任务完成...")
from mod.project.backup_restore.config_manager import ConfigManager
while time.time() - start_time < timeout:
time.sleep(1)
print(get_time)
sync_backup_config = ConfigManager().get_backup_conf(str(get_time))
print(sync_backup_config)
if sync_backup_config['backup_status'] == 2:
backup_file = sync_backup_config['backup_file']
if os.path.exists(backup_file):
# 检查文件是否已经完成写入(检查文件大小是否稳定)
last_size = 0
stable_count = 0
for _ in range(3): # 检查3次,确保文件大小稳定
current_size = os.path.getsize(backup_file)
if current_size == last_size:
stable_count += 1
else:
stable_count = 0
last_size = current_size
time.sleep(2)
if stable_count >= 2: # 连续2次大小相同,认为文件写入完成
# 写入成功标记,包含备份文件路径
public.WriteFile(self.migrate_backup_success_file, backup_file)
print(f"[+] 备份任务完成: {backup_file}")
return backup_file
# 检查备份任务是否还在进行
if not os.path.exists(self.backup_pl_file) and not os.path.exists(backup_file):
# 备份过程已结束但未生成备份文件,可能失败
error_msg = "备份任务失败,未生成备份文件"
print(f"[!] {error_msg}")
return None
time.sleep(2)
# 超时处理
if os.path.exists(backup_file):
# 虽然超时,但文件存在,可以继续
public.WriteFile(self.migrate_backup_success_file, backup_file)
print(f"[+] 备份任务完成(超时后检测到文件): {backup_file}")
return backup_file
else:
print("[!] 备份任务超时,未能在指定时间内完成")
return None
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='通过SSH连接新服务器,安装宝塔面板并进行备份还原')
parser.add_argument('-H', '--host', required=False, help='远程服务器IP地址')
parser.add_argument('-P', '--port', type=int, default=22, help='SSH端口,默认22')
parser.add_argument('-u', '--username', default='root', help='SSH用户名,默认root')
parser.add_argument('-p', '--password', help='SSH密码,与密钥二选一')
parser.add_argument('-k', '--key-file', help='SSH密钥文件路径,与密码二选一')
parser.add_argument('-b', '--backup-file', help='本地备份文件路径')
parser.add_argument('--panel-port', type=int, default=8888, help='宝塔面板端口,默认8888')
parser.add_argument('-r', '--max-retries', type=int, default=3, help='连接断开时最大重试次数,默认3次')
parser.add_argument('-i', '--retry-interval', type=int, default=5, help='重试间隔秒数,默认5秒')
parser.add_argument('--task-id', help='迁移任务ID,用于跟踪进度')
parser.add_argument('--action', choices=['verify', 'install', 'upload', 'restore', 'migrate', 'all', 'status'], default='all',
help='要执行的操作: verify=验证SSH连接, install=安装宝塔面板, upload=上传备份文件, restore=还原备份, migrate=执行完整迁移, status=获取任务状态, all=全部执行')
parser.add_argument('--task-name', default='默认迁移任务', help='迁移任务名称')
args = parser.parse_args()
# 对于非status操作,验证必要参数
if args.action != 'status' and args.host is None:
parser.error('必须提供远程服务器IP地址(-H/--host)')
# 验证密码和密钥文件至少提供一个,除非是status操作
if args.action != 'status' and not args.password and not args.key_file:
parser.error('必须提供SSH密码或密钥文件路径')
return args
if __name__ == "__main__":
args = parse_arguments()
# 获取任务状态
if args.action == 'status':
result = get_migration_status(args.task_id)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0)
# 检查是否有正在运行的任务
if args.action == 'migrate' and not args.task_id:
# 创建新的迁移任务
task_result = create_migration_task(
task_name=args.task_name,
host=args.host,
port=args.port,
username=args.username,
password=args.password,
key_file=args.key_file,
backup_file=args.backup_file
)
if task_result.get("status", False):
args.task_id = task_result.get("task_id")
print(f"[+] 创建迁移任务成功,任务ID: {args.task_id}")
else:
print(json.dumps(task_result, ensure_ascii=False, indent=2))
sys.exit(1)
manager = BtInstallManager(
host=args.host,
port=args.port,
username=args.username,
password=args.password,
key_file=args.key_file,
backup_file=args.backup_file,
panel_port=args.panel_port,
max_retries=args.max_retries,
retry_interval=args.retry_interval,
task_id=args.task_id
)
# 根据指定的操作执行相应的功能
if args.action == 'verify':
result = manager.verify_ssh_connection()
print(json.dumps(result, ensure_ascii=False, indent=2))
elif args.action == 'install':
result = manager.install_panel()
print(json.dumps(result, ensure_ascii=False, indent=2))
elif args.action == 'upload':
result = manager.upload_backup()
print(json.dumps(result, ensure_ascii=False, indent=2))
elif args.action == 'migrate':
if not args.task_id:
print(json.dumps({"status": False, "msg": "执行迁移任务需要提供task_id参数"}, ensure_ascii=False, indent=2))
else:
result = manager.migrate(args.task_id)
print(json.dumps(result, ensure_ascii=False, indent=2))
else:
result = manager.run()
print(json.dumps(result, ensure_ascii=False, indent=2))