import os import time from hashlib import md5 from typing import Optional, List, Union, Dict, Any from .util import DB, ExecShell, write_file, write_log from .versions_tool import VersionTool class BackupTool: def __init__(self): self._backup_path: Optional[str] = None self._sub_dir_name: str = "" self.exec_log_file = "/tmp/mod_backup_exec.log" @staticmethod def _hash_src_name(name: Union[str, bytes]) -> str: if isinstance(name, str): name = name.encode('utf-8') md5_obj = md5() md5_obj.update(name) return md5_obj.hexdigest() @property def backup_path(self) -> str: if self._backup_path is None: config_data = DB("config").where("id=?", (1,)).select() if isinstance(config_data, dict): path = config_data["backup_path"] else: # 查询出错 path = "/www/backup" self._backup_path = path return self._backup_path # sub_dir 可以设置为多级子目录, 如 "site/aaa", 或使用列表传递如:["site", "aaa"] def set_sub_dir(self, sub_dir: Union[str, List[str]]) -> Optional[str]: if isinstance(sub_dir, str): self._sub_dir_name = sub_dir.strip("./") elif isinstance(sub_dir, list): self._sub_dir_name = "/".join(filter(None, [i.strip("./") for i in sub_dir])) else: return "不支持的类型设置" def backup(self, src: str, # 源文件位置 backup_path: Optional[str] = None, # 备份位置 sub_dir: Union[str, List[str]] = None, # 备份目录的子目录 site_info: Dict[str, Any] = None, # 关联的站点信息, 必须包含 id 和 name sync=False # 是否同步执行, 默认异步由单独的线程放入后台执行 ) -> Optional[str]: # 返回执行错误的信息 if not os.path.exists(src): return "源路径不存在" if backup_path is None: backup_path = self.backup_path if not os.path.exists(backup_path): return "备份目录不存在" if sub_dir is not None: set_res = self.set_sub_dir(sub_dir) if set_res is not None: return set_res target_path = os.path.join(backup_path, self._sub_dir_name) if not os.path.isdir(target_path): os.makedirs(target_path) zip_name = "{}_{}.tar.gz".format(os.path.basename(src), time.strftime('%Y%m%d_%H%M%S', time.localtime())) if sync: return self._sync_backup(src, target_path, zip_name, site_info) else: return self._async_backup(src, target_path, zip_name, site_info) def _sync_backup(self, src: str, target_path: str, zip_name: str, site_info: dict): try: write_file(self.exec_log_file, "") execStr = ("cd {} && " "tar -zcvf '{}' --exclude=.user.ini ./ 2>&1 > {} \n" "echo '---备份执行完成---' >> {}" ).format(src, os.path.join(target_path, zip_name), self.exec_log_file, self.exec_log_file) ExecShell(execStr) if site_info is not None and "id" in site_info and "name" in site_info: DB('backup').add( 'type,name,pid,filename,size,addtime', (0, zip_name, site_info["id"], os.path.join(target_path, zip_name), 0, self.get_date()) ) write_log('TYPE_SITE', 'SITE_BACKUP_SUCCESS', (site_info["name"],)) except: return "备份执行失败" def _async_backup(self, src: str, target_path: str, zip_name: str, site_info: dict): import threading hash_name = self._hash_src_name(src) backup_tip_path = "/tmp/mod_backup_tip" if os.path.exists(backup_tip_path): os.makedirs(backup_tip_path) tip_file = os.path.join(backup_tip_path, hash_name) if os.path.isfile(tip_file): mtime = os.stat(tip_file).st_mtime if time.time() - mtime > 60 * 20: # 20 分钟未执行,认为出现在不可抗力,导致备份失败,允许再次备份 os.remove(tip_file) else: return "备份进行中,请勿继续操作" write_file(tip_file, "") def _back_p(): try: write_file(self.exec_log_file, "") execStr = ("cd {} && " "tar -zcvf '{}' --exclude=.user.ini ./ 2>&1 > {} \n" "echo '---备份执行完成---' >> {}" ).format(src, os.path.join(target_path, zip_name), self.exec_log_file, self.exec_log_file) ExecShell(execStr) if site_info is not None and "id" in site_info and "name" in site_info: DB('backup').add( 'type,name,pid,filename,size,addtime', (0, zip_name, site_info["id"], os.path.join(target_path, zip_name), 0, self.get_date()) ) write_log('TYPE_SITE', 'SITE_BACKUP_SUCCESS', (site_info["name"],)) except: pass finally: if os.path.exists(tip_file): os.remove(tip_file) t = threading.Thread(target=_back_p) t.start() @staticmethod def get_date(): # 取格式时间 return time.strftime('%Y-%m-%d %X', time.localtime())