| |
| |
| |
| |
| |
| |
|
|
| import json |
| import os |
| import sys |
| import time |
| import datetime |
| import threading |
| from typing import Dict, List, Any, Optional |
|
|
| if "/www/server/panel/class" not in sys.path: |
| sys.path.insert(0, "/www/server/panel/class") |
|
|
| if '/www/server/panel' not in sys.path: |
| sys.path.insert(0, '/www/server/panel') |
|
|
| import public |
|
|
| class CleanupManager: |
| def __init__(self, backup_manager, config_manager): |
| self.backup_manager = backup_manager |
| self.config_manager = config_manager |
| self.base_path = '/www/backup/mysql_binlog_backup' |
| self.log_file = os.path.join(self.base_path, 'cleanup.log') |
| self.lock = threading.Lock() |
| |
| |
| if not os.path.exists(self.base_path): |
| os.makedirs(self.base_path, exist_ok=True) |
|
|
| def cleanup_database_backups(self, database_name: str, keep_days: int) -> Dict[str, Any]: |
| """清理指定数据库的备份文件""" |
| try: |
| self._write_log(f"开始清理数据库 {database_name} 的备份,保留 {keep_days} 天") |
| |
| |
| all_backups = self.backup_manager.get_backup_files_list(database_name) |
| if not all_backups: |
| msg = f"数据库 {database_name} 没有备份文件" |
| self._write_log(msg) |
| return {"status": True, "msg": msg, "deleted_count": 0, "size_freed": 0} |
| |
| all_backups.sort(key=lambda x: x['backup_time']) |
| |
| |
| cutoff_date = datetime.datetime.now() - datetime.timedelta(days=keep_days) |
| cutoff_date_str = cutoff_date.strftime('%Y-%m-%d %H:%M:%S') |
| |
| self._write_log(f"保留截止时间: {cutoff_date_str}") |
| |
| |
| analysis = self._analyze_backup_structure(all_backups, cutoff_date_str) |
| |
| |
| if not analysis['full_backups']: |
| msg = f"数据库 {database_name} 没有全量备份,跳过清理" |
| self._write_log(msg, 'warning') |
| return {"status": True, "msg": msg, "deleted_count": 0, "size_freed": 0} |
| |
| |
| can_delete = self._determine_deletable_backups(analysis) |
| |
| |
| result = self._execute_cleanup(can_delete, database_name) |
| |
| self._write_log(f"数据库 {database_name} 清理完成: {result['msg']}") |
| |
| return result |
| |
| except Exception as e: |
| error_msg = f"清理数据库 {database_name} 备份失败: {str(e)}" |
| self._write_log(error_msg, 'error') |
| return {"status": False, "msg": error_msg, "deleted_count": 0, "size_freed": 0} |
|
|
| def cleanup_all_backups(self, override_keep_days: Optional[int] = None) -> Dict[str, Any]: |
| """清理所有数据库的备份""" |
| try: |
| self._write_log("开始清理所有数据库的备份") |
| |
| all_tasks = self.config_manager.get_backup_task_list() |
| if not all_tasks: |
| msg = "没有找到备份任务" |
| self._write_log(msg, 'warning') |
| return {"status": True, "msg": msg, "total_deleted": 0, "total_size_freed": 0, "databases": []} |
| |
| total_deleted = 0 |
| total_size_freed = 0 |
| cleanup_results = [] |
| |
| for task in all_tasks: |
| db_name = task['database_name'] |
| keep_days = override_keep_days or task.get('keep_days', 30) |
| |
| result = self.cleanup_database_backups(db_name, keep_days) |
| if result['status']: |
| total_deleted += result['deleted_count'] |
| total_size_freed += result['size_freed'] |
| cleanup_results.append({ |
| 'database': db_name, |
| 'keep_days': keep_days, |
| 'deleted_count': result['deleted_count'], |
| 'size_freed': result['size_freed'], |
| 'formatted_size_freed': self._format_size(result['size_freed']) |
| }) |
| else: |
| self._write_log(f"清理数据库 {db_name} 失败: {result['msg']}", 'error') |
| |
| summary = { |
| 'total_deleted': total_deleted, |
| 'total_size_freed': total_size_freed, |
| 'formatted_size_freed': self._format_size(total_size_freed), |
| 'databases': cleanup_results |
| } |
| |
| msg = f"所有数据库清理完成,共删除 {total_deleted} 个备份,释放 {summary['formatted_size_freed']} 空间" |
| self._write_log(msg) |
| |
| return {"status": True, "msg": msg, "data": summary} |
| |
| except Exception as e: |
| error_msg = f"清理所有备份失败: {str(e)}" |
| self._write_log(error_msg, 'error') |
| return {"status": False, "msg": error_msg} |
|
|
| def _analyze_backup_structure(self, all_backups: List[Dict], cutoff_date_str: str) -> Dict[str, Any]: |
| """分析备份结构""" |
| analysis = { |
| 'recent_backups': [], |
| 'old_backups': [], |
| 'full_backups': [], |
| 'incremental_backups': [] |
| } |
| |
| for backup in all_backups: |
| |
| if backup['backup_time'] >= cutoff_date_str: |
| analysis['recent_backups'].append(backup) |
| else: |
| analysis['old_backups'].append(backup) |
| |
| |
| if backup['backup_type'] == 'full': |
| analysis['full_backups'].append(backup) |
| else: |
| analysis['incremental_backups'].append(backup) |
| |
| |
| analysis['full_backups'].sort(key=lambda x: x['backup_time']) |
| analysis['incremental_backups'].sort(key=lambda x: x['backup_time']) |
| |
| return analysis |
|
|
| def _determine_deletable_backups(self, analysis: Dict[str, Any]) -> List[Dict[str, Any]]: |
| """确定可以删除的备份(智能策略)""" |
| can_delete = [] |
| |
| |
| latest_full_backup = max(analysis['full_backups'], key=lambda x: x['backup_time']) |
| |
| self._write_log(f"最新全量备份: {latest_full_backup['backup_id']} ({latest_full_backup['backup_time']})") |
| |
| |
| for backup in analysis['old_backups']: |
| if backup['backup_type'] == 'incremental': |
| |
| if backup['backup_time'] < latest_full_backup['backup_time']: |
| can_delete.append(backup) |
| self._write_log(f"可删除增量备份: {backup['backup_id']} (有更新的全量备份)") |
| else: |
| self._write_log(f"保留增量备份: {backup['backup_id']} (依赖最新全量备份)") |
| |
| elif backup['backup_type'] == 'full': |
| |
| if backup['backup_id'] != latest_full_backup['backup_id']: |
| can_delete.append(backup) |
| self._write_log(f"可删除旧全量备份: {backup['backup_id']}") |
| |
| |
| dependent_incrementals = self._find_dependent_incrementals( |
| backup, analysis['incremental_backups'], analysis['full_backups'] |
| ) |
| can_delete.extend(dependent_incrementals) |
| |
| for dep in dependent_incrementals: |
| self._write_log(f"可删除依赖增量备份: {dep['backup_id']} (依赖已删除的全量备份)") |
| else: |
| self._write_log(f"保留最新全量备份: {backup['backup_id']}") |
| |
| return can_delete |
|
|
| def _find_dependent_incrementals(self, full_backup: Dict, all_incrementals: List[Dict], |
| all_full_backups: List[Dict]) -> List[Dict[str, Any]]: |
| """找到依赖指定全量备份的增量备份""" |
| dependent = [] |
| full_backup_time = full_backup['backup_time'] |
| |
| |
| next_full_time = None |
| for backup in all_full_backups: |
| if backup['backup_time'] > full_backup_time: |
| if not next_full_time or backup['backup_time'] < next_full_time: |
| next_full_time = backup['backup_time'] |
| |
| |
| for backup in all_incrementals: |
| backup_time = backup['backup_time'] |
| if backup_time > full_backup_time: |
| if not next_full_time or backup_time < next_full_time: |
| dependent.append(backup) |
| |
| return dependent |
|
|
| def _execute_cleanup(self, can_delete: List[Dict], database_name: str) -> Dict[str, Any]: |
| """执行实际的删除操作""" |
| deleted_count = 0 |
| total_size_freed = 0 |
| failed_deletes = [] |
| |
| for backup in can_delete: |
| try: |
| result = self.backup_manager.delete_backup_file(backup['backup_id']) |
| if result['status']: |
| deleted_count += 1 |
| total_size_freed += backup['file_size'] |
| self._write_log(f"已删除备份: {backup['backup_id']} ({self._format_size(backup['file_size'])})") |
| else: |
| failed_deletes.append(backup['backup_id']) |
| self._write_log(f"删除失败: {backup['backup_id']} - {result['msg']}", 'error') |
| except Exception as e: |
| failed_deletes.append(backup['backup_id']) |
| self._write_log(f"删除异常: {backup['backup_id']} - {str(e)}", 'error') |
| |
| msg = f"数据库 {database_name} 清理完成,删除 {deleted_count} 个备份,释放 {self._format_size(total_size_freed)} 空间" |
| if failed_deletes: |
| msg += f",{len(failed_deletes)} 个备份删除失败" |
| |
| return { |
| "status": True, |
| "msg": msg, |
| "deleted_count": deleted_count, |
| "size_freed": total_size_freed, |
| "failed_deletes": failed_deletes |
| } |
|
|
| def get_cleanup_preview(self, database_name: str, keep_days: int) -> Dict[str, Any]: |
| """预览清理效果(不实际删除)""" |
| try: |
| |
| all_backups = self.backup_manager.get_backup_files_list(database_name) |
| if not all_backups: |
| return {"status": True, "msg": "没有备份文件", "preview": {"will_delete": [], "will_keep": []}} |
| |
| all_backups.sort(key=lambda x: x['backup_time']) |
| |
| |
| cutoff_date = datetime.datetime.now() - datetime.timedelta(days=keep_days) |
| cutoff_date_str = cutoff_date.strftime('%Y-%m-%d %H:%M:%S') |
| |
| |
| analysis = self._analyze_backup_structure(all_backups, cutoff_date_str) |
| |
| if not analysis['full_backups']: |
| return {"status": True, "msg": "没有全量备份", "preview": {"will_delete": [], "will_keep": all_backups}} |
| |
| |
| can_delete = self._determine_deletable_backups(analysis) |
| will_keep = [b for b in all_backups if b not in can_delete] |
| |
| |
| delete_size = sum(b['file_size'] for b in can_delete) |
| keep_size = sum(b['file_size'] for b in will_keep) |
| |
| preview = { |
| "will_delete": [{ |
| "backup_id": b['backup_id'], |
| "backup_type": b['backup_type'], |
| "backup_time": b['backup_time'], |
| "file_size": b['file_size'], |
| "formatted_size": self._format_size(b['file_size']) |
| } for b in can_delete], |
| "will_keep": [{ |
| "backup_id": b['backup_id'], |
| "backup_type": b['backup_type'], |
| "backup_time": b['backup_time'], |
| "file_size": b['file_size'], |
| "formatted_size": self._format_size(b['file_size']) |
| } for b in will_keep], |
| "summary": { |
| "total_backups": len(all_backups), |
| "will_delete_count": len(can_delete), |
| "will_keep_count": len(will_keep), |
| "delete_size": delete_size, |
| "keep_size": keep_size, |
| "formatted_delete_size": self._format_size(delete_size), |
| "formatted_keep_size": self._format_size(keep_size), |
| "cutoff_date": cutoff_date_str |
| } |
| } |
| |
| return {"status": True, "msg": "预览生成成功", "preview": preview} |
| |
| except Exception as e: |
| return {"status": False, "msg": f"生成预览失败: {str(e)}"} |
|
|
| def get_cleanup_logs(self, limit: int = 100) -> List[str]: |
| """获取清理日志""" |
| try: |
| if not os.path.exists(self.log_file): |
| return [] |
| |
| logs = [] |
| with open(self.log_file, 'r', encoding='utf-8') as f: |
| lines = f.readlines() |
| |
| |
| for line in reversed(lines[-limit:]): |
| line = line.strip() |
| if line: |
| logs.append(line) |
| |
| return logs |
| |
| except Exception as e: |
| return [f"获取清理日志失败: {str(e)}"] |
|
|
| def _format_size(self, size_bytes: int) -> str: |
| """格式化文件大小""" |
| if size_bytes == 0: |
| return "0 B" |
| |
| size_names = ["B", "KB", "MB", "GB", "TB"] |
| i = 0 |
| while size_bytes >= 1024 and i < len(size_names) - 1: |
| size_bytes /= 1024.0 |
| i += 1 |
| |
| return f"{size_bytes:.2f} {size_names[i]}" |
|
|
| def _write_log(self, message: str, level: str = 'info'): |
| """写入清理日志""" |
| try: |
| with self.lock: |
| timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') |
| log_message = f"[{timestamp}] [{level.upper()}] {message}\n" |
| |
| with open(self.log_file, 'a', encoding='utf-8') as f: |
| f.write(log_message) |
| |
| |
| print(log_message.strip()) |
| |
| except Exception as e: |
| print(f"写入清理日志失败: {e}") |
|
|
| if __name__ == '__main__': |
| |
| print("CleanupManager 清理管理模块已创建") |