| |
| |
| |
| |
| |
| |
| import json |
| import os |
| import sys |
| import time |
| import datetime |
| import re |
|
|
| if "/www/server/panel/class" not in sys.path: |
| sys.path.insert(0, "/www/server/panel/class") |
|
|
| if '/www/server/panel' not in sys.path: |
| sys.path.insert(0, '/www/server/panel') |
|
|
| import public |
| import panelMysql |
| import db_mysql |
| import database |
| from mod.project.mysql_binlog_backup.config_manager import ConfigManager |
| from mod.project.mysql_binlog_backup.backup_manager import BackupManager |
| from mod.project.mysql_binlog_backup.restore_manager import RestoreManager |
| from mod.project.mysql_binlog_backup.cleanup_manager import CleanupManager |
|
|
| class main: |
| def __init__(self): |
| self.base_path = '/www/backup/mysql_binlog_backup' |
| self.config_manager = ConfigManager() |
| self.backup_manager = BackupManager() |
| self.restore_manager = RestoreManager() |
| self.cleanup_manager = CleanupManager(self.backup_manager, self.config_manager) |
| |
| self._init_db_connections() |
|
|
| |
| if not os.path.exists(self.base_path): |
| os.makedirs(self.base_path, exist_ok=True) |
| |
| def _init_db_connections(self): |
| """初始化数据库连接""" |
| self.mysql_obj = db_mysql.panelMysql() |
|
|
| def return_data(self, status: bool, msg=None, data=None): |
| """统一返回数据格式""" |
| return { |
| "status": status, |
| "msg": msg, |
| "data": data |
| } |
| |
| def _id_get_database_name(self, get): |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少ID") |
| db_info = public.M('databases').where('id=? AND type=?', (get.id, 'MySQL')).find() |
| if not db_info: |
| return self.return_data(False, "MySQL数据库ID {} 不存在".format(get.id)) |
| try: |
| if db_info['db_type'] != 0: |
| return self.return_data(False, "只支持本地本机数据库,不支持docker或远程数据库!") |
| except: |
| return self.return_data(False, "只支持本地本机数据库,不支持docker或远程数据库!") |
| return db_info['name'] |
|
|
| def add_binlog_backup_task(self, get): |
| """添加binlog备份任务""" |
| try: |
| |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少数据库ID") |
| |
| if not hasattr(get, 'schedule_type') or not get.schedule_type: |
| return self.return_data(False, "缺少调度类型") |
| |
| if not hasattr(get, 'incremental_backup_interval') or not get.incremental_backup_interval: |
| return self.return_data(False, "缺少增量备份间隔(分钟)") |
|
|
| |
| keep_days = int(getattr(get, 'keep_days', 30)) |
| if keep_days < 1: |
| return self.return_data(False, "保留天数必须大于0") |
|
|
| |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
|
|
| |
| if not self.backup_manager.check_binlog_enabled(): |
| return self.return_data(False, "请先开启MySQL的binlog功能") |
|
|
| try: |
| incremental_backup_interval=int(get.incremental_backup_interval) |
| except: |
| return self.return_data(False, "增量备份间隔请设置为合理的数值") |
|
|
| |
| schedule_calculator = self.config_manager.schedule_calculator |
| try: |
| schedule_config = schedule_calculator.parse_schedule_from_request(get) |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| |
| task_config = { |
| 'database_name': database_name, |
| 'full_backup_schedule': schedule_config, |
| 'incremental_backup_interval': incremental_backup_interval, |
| 'keep_days': keep_days, |
| 'enabled': True, |
| 'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), |
| 'last_full_backup': None, |
| 'last_incremental_backup': None, |
| 'next_full_backup': None, |
| 'next_incremental_backup': None |
| } |
|
|
| |
| result = self.config_manager.save_backup_task_config(task_config) |
| if result['status']: |
| |
| self._add_cron_task() |
| public.set_module_logs('mysql_binlog_backup', 'add_binlog_backup_task', 1) |
| return self.return_data(True, "添加备份任务成功", result['data']) |
| else: |
| return self.return_data(False, result['msg']) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def get_schedule_options(self, get=None): |
| """获取支持的调度选项""" |
| try: |
| options = { |
| "schedule_types": [ |
| { |
| "value": "hours", |
| "label": "每N小时", |
| "description": "传统的小时间隔模式", |
| "params": ["interval_hours"] |
| }, |
| { |
| "value": "daily", |
| "label": "每天固定时间", |
| "description": "每天在指定时间执行", |
| "params": ["time"] |
| }, |
| { |
| "value": "weekly", |
| "label": "每周固定时间", |
| "description": "每周指定天的指定时间执行", |
| "params": ["time", "weekday"] |
| }, |
| { |
| "value": "interval", |
| "label": "自定义间隔", |
| "description": "每隔N天的指定时间执行", |
| "params": ["time", "interval_days", "start_date"] |
| } |
| ], |
| "weekdays": [ |
| {"value": 0, "label": "周日"}, |
| {"value": 1, "label": "周一"}, |
| {"value": 2, "label": "周二"}, |
| {"value": 3, "label": "周三"}, |
| {"value": 4, "label": "周四"}, |
| {"value": 5, "label": "周五"}, |
| {"value": 6, "label": "周六"} |
| ], |
| "examples": { |
| "daily": { |
| "description": "每天凌晨1:30执行", |
| "config": { |
| "schedule_type": "daily", |
| "schedule_time": "01:30:00" |
| } |
| }, |
| "weekly": { |
| "description": "每周日凌晨1:30执行", |
| "config": { |
| "schedule_type": "weekly", |
| "schedule_time": "01:30:00", |
| "weekday": 0 |
| } |
| }, |
| "interval": { |
| "description": "每3天的凌晨1:01执行", |
| "config": { |
| "schedule_type": "interval", |
| "schedule_time": "01:01:00", |
| "interval_days": 3, |
| "start_date": "2025-07-08" |
| } |
| }, |
| "hours": { |
| "description": "每24小时执行", |
| "config": { |
| "schedule_type": "hours", |
| "interval_hours": 24 |
| } |
| } |
| } |
| } |
| return self.return_data(True, "获取调度选项成功", options) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def get_backup_task_list(self, get): |
| """获取备份任务配置""" |
| try: |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少数据库ID") |
|
|
| |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
|
|
| |
| task = self.config_manager.get_backup_task_config(database_name) |
| space_free = self.backup_manager.disk_free_check() |
| if task: |
| if not space_free: |
| task['space_free'] = False |
| else: |
| task['space_free'] = True |
|
|
| if task['last_incremental_backup'] == None and task['last_full_backup'] != None: |
| task['last_exec_time'] = task['last_full_backup'] |
| elif task['last_incremental_backup'] != None and task['last_full_backup'] != None: |
| try: |
| incremental_time = datetime.datetime.strptime(task['last_incremental_backup'], '%Y-%m-%d %H:%M:%S') |
| full_backup_time = datetime.datetime.strptime(task['last_full_backup'], '%Y-%m-%d %H:%M:%S') |
| |
| if incremental_time > full_backup_time: |
| task['last_exec_time'] = task['last_incremental_backup'] |
| else: |
| task['last_exec_time'] = task['last_full_backup'] |
| except (ValueError, TypeError) as e: |
| task['last_exec_time'] = task['last_full_backup'] |
| else: |
| task['last_exec_time'] = None |
|
|
| task['next_exec_time'] = task['next_full_backup'] |
|
|
| incremental_time = datetime.datetime.strptime(task['next_incremental_backup'], '%Y-%m-%d %H:%M:%S') |
| full_backup_time = datetime.datetime.strptime(task['next_full_backup'], '%Y-%m-%d %H:%M:%S') |
| if incremental_time > full_backup_time: |
| task['next_exec_time'] = task['next_incremental_backup'] |
| else: |
| task['next_exec_time'] = task['last_full_backup'] |
|
|
| return self.return_data(True, "获取成功", task) |
| else: |
| if not space_free: |
| data = { |
| "enabled": False, |
| "space_free": False |
| } |
| else: |
| data = { |
| "enabled": False, |
| "space_free": True |
| } |
| return self.return_data(True, "获取成功", data) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def get_all_backup_tasks(self, get=None): |
| """获取所有备份任务列表""" |
| try: |
| tasks = self.config_manager.get_backup_task_list() |
| return self.return_data(True, "获取成功", tasks) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def get_backup_files_list(self, get): |
| """获取备份文件列表(支持分页、日期筛选、类型筛选)""" |
| try: |
| |
| database_name = None |
| if hasattr(get, 'id') and get.id: |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
| |
| backup_type = getattr(get, 'backup_type', 'all') |
| date = getattr(get, 'date', None) |
| page = int(getattr(get, 'page', 1)) |
| limit = int(getattr(get, 'limit', 20)) |
| |
| |
| if page < 1: |
| page = 1 |
| if limit < 1 or limit > 100: |
| limit = 20 |
| |
| |
| result = self.backup_manager.get_backup_files_list_with_filter( |
| database_name=database_name, |
| backup_type=backup_type, |
| date=date, |
| page=page, |
| limit=limit |
| ) |
| |
| return self.return_data(True, "获取成功", result) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def update_backup_task_config(self, get): |
| """更新备份任务配置""" |
| try: |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少数据库ID") |
|
|
| |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
|
|
| update_data = {} |
| if hasattr(get, 'full_backup_interval'): |
| update_data['full_backup_interval'] = int(get.full_backup_interval) |
| if hasattr(get, 'incremental_backup_interval'): |
| update_data['incremental_backup_interval'] = int(get.incremental_backup_interval) |
| if hasattr(get, 'keep_days'): |
| keep_days = int(get.keep_days) |
| if keep_days < 1: |
| return self.return_data(False, "保留天数必须大于0") |
| update_data['keep_days'] = keep_days |
| if hasattr(get, 'enabled'): |
| update_data['enabled'] = bool(get.enabled) |
|
|
| result = self.config_manager.update_backup_task_config(database_name, update_data) |
| if result['status']: |
| return self.return_data(True, "更新配置成功") |
| else: |
| return self.return_data(False, result['msg']) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def set_backup_task_status(self, get): |
| """设置备份任务状态(启用/禁用)""" |
| try: |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少数据库ID") |
| |
| if not hasattr(get, 'enabled'): |
| return self.return_data(False, "缺少状态参数(enabled)") |
|
|
| |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
|
|
| |
| task_config = self.config_manager.get_backup_task_config(database_name) |
| if not task_config: |
| return self.return_data(False, "数据库 {} 的备份任务不存在".format(database_name)) |
|
|
| |
| enabled = False |
| if str(get.enabled).lower() in ['true', '1', 'yes', 'on']: |
| enabled = True |
| elif str(get.enabled).lower() in ['false', '0', 'no', 'off']: |
| enabled = False |
| else: |
| return self.return_data(False, "状态参数无效,请使用 true/false 或 1/0") |
|
|
| |
| update_data = {'enabled': enabled} |
| result = self.config_manager.update_backup_task_config(database_name, update_data) |
| |
| if result['status']: |
| status_text = "启用" if enabled else "禁用" |
| |
| public.set_module_logs('mysql_binlog_backup', 'set_backup_task_status', 1) |
| |
| return self.return_data(True, "备份任务已{}".format(status_text), { |
| 'database_name': database_name, |
| 'enabled': enabled, |
| 'status_text': status_text |
| }) |
| else: |
| return self.return_data(False, result['msg']) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def del_binlog_backup_task(self, get): |
| """删除binlog备份任务""" |
| try: |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少数据库ID") |
|
|
| |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
|
|
| |
| result = self.config_manager.delete_backup_task_config(database_name) |
| if result['status']: |
| |
| remaining_tasks = self.config_manager.get_backup_task_list() |
| if not remaining_tasks: |
| self._remove_cron_task() |
| |
| public.set_module_logs('mysql_binlog_backup', 'del_binlog_backup_task', 1) |
| return self.return_data(True, "设置成功!") |
| else: |
| return self.return_data(False, result['msg']) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def get_backup_logs(self, get): |
| """查看备份任务日志""" |
| try: |
| database_name = None |
| if hasattr(get, 'id') and get.id: |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
| |
| log_type = getattr(get, 'log_type', 'all') |
| limit = getattr(get, 'limit', 100) |
| |
| logs = self.backup_manager.get_backup_logs(database_name, log_type, limit) |
| return self.return_data(True, "获取日志成功", logs) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def delete_backup_file(self, get): |
| """删除备份文件""" |
| try: |
| if not hasattr(get, 'backup_id') or not get.backup_id: |
| return self.return_data(False, "缺少备份ID") |
|
|
| result = self.backup_manager.delete_backup_file(get.backup_id) |
| if result['status']: |
| return self.return_data(True, "删除备份文件成功") |
| else: |
| return self.return_data(False, result['msg']) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def restore_binlog_data(self, get): |
| """还原备份数据""" |
| try: |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少数据库ID") |
| |
| if not hasattr(get, 'restore_time') or not get.restore_time: |
| return self.return_data(False, "缺少还原时间点") |
|
|
| |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
|
|
| |
| result = self.restore_manager.start_restore_task(database_name, get.restore_time) |
| if result['status']: |
| public.set_module_logs('mysql_binlog_backup', 'restore_binlog_data', 1) |
| return self.return_data(True, "还原任务已启动", result['data']) |
| else: |
| return self.return_data(False, result['msg']) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def get_restore_progress(self, get): |
| """获取还原进度""" |
| try: |
| task_id = getattr(get, 'task_id', None) |
| progress = self.restore_manager.get_restore_progress(task_id) |
| return self.return_data(True, "获取进度成功", progress) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def get_backup_status(self, get=None): |
| """获取备份状态""" |
| try: |
| status = self.backup_manager.get_backup_status() |
| return self.return_data(True, "获取状态成功", status) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
| |
| def get_db_info(self, get=None): |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少数据库ID") |
| db_name = self._id_get_database_name(get) |
| if isinstance(db_name, dict): |
| return db_name |
| |
| db_size = 0 |
| try: |
| table_list = self.mysql_obj.query("show tables from `{}`".format(db_name)) |
| db_size=self.mysql_obj.query("SELECT SUM(data_length + index_length) AS db_size_bytes FROM information_schema.tables WHERE table_schema = '{}'".format(db_name)) |
| db_size=int(db_size[0][0]) |
| except: |
| db_size = public.ExecShell("du -sb /www/server/data/{}".format(db_name))[0].split("\t")[0] |
| if not db_size: |
| db_size=0 |
| elif int(db_size) < 100: |
| db_size=0 |
| |
| try: |
| table_count=len(table_list) |
| except: |
| return self.return_data(False, "获取数据库信息失败,请检查mysql是否正常启动/root密码是否正确后再进行增量备份设置") |
| |
| log_bin_status=self.mysql_obj.query("SHOW VARIABLES LIKE 'log_bin'") |
| if log_bin_status[0][1].find("ON") != -1: |
| log_bin_status=True |
| else: |
| log_bin_status=False |
|
|
| if log_bin_status == False: |
| return self.return_data(False, "二进制日志已关闭,请开启二进制日志后再设置增量备份!") |
|
|
| data={ |
| "db_size": db_size, |
| "table_count": table_count |
| } |
| |
| return self.return_data(True, "获取数据库信息成功", data) |
| |
| def cleanup_all_backups(self, get=None): |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少数据库ID") |
| |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
| |
| """清理所有备份文件""" |
| try: |
| result = self.backup_manager.cleanup_all_backups(database_name) |
| if result['status']: |
| return self.return_data(True, result['msg']) |
| else: |
| return self.return_data(False, result['msg']) |
| except: |
| return self.return_data(False, "清理失败") |
|
|
| def cleanup_old_backups(self, get): |
| """清理旧备份文件""" |
| try: |
| |
| database_name = None |
| if hasattr(get, 'id') and get.id: |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
| |
| |
| keep_days = None |
| if hasattr(get, 'keep_days') and get.keep_days: |
| keep_days = int(get.keep_days) |
| if keep_days < 1: |
| return self.return_data(False, "保留天数必须大于0") |
| |
| |
| if database_name: |
| |
| if not keep_days: |
| task_config = self.config_manager.get_backup_task_config(database_name) |
| if task_config and 'keep_days' in task_config: |
| keep_days = task_config['keep_days'] |
| else: |
| keep_days = 30 |
| |
| result = self.cleanup_manager.cleanup_database_backups(database_name, keep_days) |
| if result['status']: |
| return self.return_data(True, result['msg'], result) |
| else: |
| return self.return_data(False, result['msg']) |
| else: |
| |
| result = self.cleanup_manager.cleanup_all_backups(keep_days) |
| if result['status']: |
| return self.return_data(True, result['msg'], result.get('data', {})) |
| else: |
| return self.return_data(False, result['msg']) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def get_cleanup_preview(self, get): |
| """预览清理效果(不实际删除)""" |
| try: |
| if not hasattr(get, 'id') or not get.id: |
| return self.return_data(False, "缺少数据库ID") |
|
|
| |
| database_name = self._id_get_database_name(get) |
| if isinstance(database_name, dict): |
| return database_name |
|
|
| |
| keep_days = int(getattr(get, 'keep_days', 30)) |
| if keep_days < 1: |
| return self.return_data(False, "保留天数必须大于0") |
|
|
| result = self.cleanup_manager.get_cleanup_preview(database_name, keep_days) |
| if result['status']: |
| return self.return_data(True, result['msg'], result.get('preview', {})) |
| else: |
| return self.return_data(False, result['msg']) |
|
|
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def get_cleanup_logs(self, get=None): |
| """获取清理日志""" |
| try: |
| limit = int(getattr(get, 'limit', 100)) if get else 100 |
| logs = self.cleanup_manager.get_cleanup_logs(limit) |
| return self.return_data(True, "获取清理日志成功", logs) |
| |
| except Exception as e: |
| return self.return_data(False, str(e)) |
|
|
| def _add_cron_task(self): |
| """添加计划任务""" |
| import crontab |
| try: |
| |
| cmd = "btpython {}/task_scheduler.py".format(os.path.dirname(__file__)) |
| |
| |
| cron_list = public.M('crontab').where('name=?', ('[勿删]MySQL 增量备份任务',)).select() |
| if not cron_list: |
| pdata = { |
| 'name': '[勿删]MySQL 增量备份任务', |
| 'sBody': cmd, |
| 'sType': 'toShell', |
| 'sName': '', |
| 'backupTo': '', |
| 'save': '', |
| 'urladdress': '', |
| 'save_local': 0, |
| 'notice': 0, |
| 'notice_channel': '', |
| 'datab_name': '', |
| 'tables_name': '', |
| 'flock': 1, |
| 'version': '', |
| 'user': '', |
| 'stop_site': 0, |
| 'type': 'minute-n', |
| 'week': 1, |
| 'hour': 1, |
| 'minute': 1, |
| 'where1': 1, |
| 'timeSet': 1, |
| 'timeType': "sday", |
| } |
| bt_syssafe_stop=False |
| if os.path.exists("/etc/init.d/bt_syssafe"): |
| bt_syssafe_status = public.ExecShell("/etc/init.d/bt_syssafe status") |
| if bt_syssafe_status[0].find("already running") != -1: |
| public.ExecShell("/etc/init.d/bt_syssafe stop") |
| bt_syssafe_stop=True |
| time.sleep(1) |
|
|
| crontab.crontab().AddCrontab(pdata) |
|
|
| if bt_syssafe_stop: |
| public.ExecShell("/etc/init.d/bt_syssafe start") |
| time.sleep(1) |
| except Exception as e: |
| print("添加计划任务失败: {}".format(e)) |
|
|
| def _remove_cron_task(self): |
| """移除计划任务""" |
| try: |
| public.M('crontab').where('echo=?', ('mysql_binlog_backup',)).delete() |
| except Exception as e: |
| print("移除计划任务失败: {}".format(e)) |
|
|