| |
| """ |
| MySQL 主从复制模块 - 前端接口层 |
| 负责处理前端请求,调用具体的业务逻辑实现 |
| """ |
|
|
| import json |
| import os |
| import sys |
| import time |
| |
| if '/www/server/panel/class' not in sys.path: |
| sys.path.insert(0, '/www/server/panel/class') |
| if '/www/server/panel' not in sys.path: |
| sys.path.insert(0, '/www/server/panel') |
|
|
| import public |
| from mod.project.node.mysql_slave_util.config_manager import ConfigManager |
| from mod.project.node.mysql_slave_util.mysql_manager import MySQLManager |
| from mod.project.node.mysql_slave_util.slave_manager import SlaveManager |
| from mod.project.node.mysql_slave_util.sync_service import SyncService |
| from mod.project.node.mysql_slave_util.firewall_manager import FirewallManager |
|
|
|
|
| class main: |
| """MySQL 主从复制主模块""" |
| |
| def __init__(self): |
| self.config_manager = ConfigManager() |
| self.mysql_manager = MySQLManager() |
| self.slave_manager = SlaveManager() |
| self.sync_service = SyncService() |
| self.firewall_manager = FirewallManager() |
|
|
| |
| def get_mysql_version(self, get=None): |
| """获取MySQL版本""" |
| version = self.mysql_manager.get_mysql_version() |
| if not version: |
| return public.returnMsg(False, "此功能仅支持mysql-5.7以上版本使用!") |
| return public.returnMsg(True, version) |
|
|
| def check_slave(self, get): |
| """检查从库连接和配置""" |
| required_params = ['panel_addr', 'panel_key', 'master_ip', 'slave_ip'] |
| for param in required_params: |
| if not hasattr(get, param): |
| return public.returnMsg(False, "缺少参数! {}".format(param)) |
| |
| return self.slave_manager.check_slave( |
| get.panel_addr, get.panel_key, get.master_ip, get.slave_ip |
| ) |
|
|
| |
| def database_list(self, get=None): |
| """获取主库数据库列表(带分页和搜索功能)""" |
| try: |
| |
| version_check = self.mysql_manager.get_mysql_version() |
| if not version_check: |
| return public.returnMsg(False, "此功能仅支持mysql-5.7以上版本使用!") |
| |
| |
| search = getattr(get, 'search', '').strip() if get else '' |
| p = int(getattr(get, 'p', 1)) if get and hasattr(get, 'p') else 1 |
| limit = int(getattr(get, 'limit', 20)) if get and hasattr(get, 'limit') else 20 |
| |
| |
| if p < 1: |
| p = 1 |
| if limit < 1 or limit > 100: |
| limit = 20 |
| |
| |
| all_data = self.mysql_manager.database_list() |
| if not all_data: |
| return { |
| "status": True, |
| "msg": "success", |
| "data": [], |
| "page": { |
| "current_page": p, |
| "total_pages": 0, |
| "total_count": 0, |
| "per_page": limit, |
| "has_prev": False, |
| "has_next": False, |
| "start_item": 0, |
| "end_item": 0, |
| "showing": "显示第 0 到 0 项,共 0 项" |
| }, |
| "search": search, |
| "total": 0 |
| } |
| |
| |
| filtered_data = all_data |
| if search: |
| filtered_data = [] |
| for item in all_data: |
| |
| if isinstance(item, str): |
| if search.lower() in item.lower(): |
| filtered_data.append(item) |
| |
| elif isinstance(item, dict): |
| search_fields = ['name', 'database', 'db_name', 'schema_name'] |
| match_found = False |
| for field in search_fields: |
| if field in item and item[field] and search.lower() in str(item[field]).lower(): |
| match_found = True |
| break |
| if match_found: |
| filtered_data.append(item) |
| else: |
| |
| if search.lower() in str(item).lower(): |
| filtered_data.append(item) |
| |
| |
| total_count = len(filtered_data) |
| total_pages = (total_count + limit - 1) // limit if total_count > 0 else 0 |
| offset = (p - 1) * limit |
| |
| |
| current_page_data = filtered_data[offset:offset + limit] |
| |
| |
| start_item = offset + 1 if total_count > 0 else 0 |
| end_item = min(offset + limit, total_count) |
| |
| page_info = { |
| "current_page": p, |
| "total_pages": total_pages, |
| "total_count": total_count, |
| "per_page": limit, |
| "has_prev": p > 1, |
| "has_next": p < total_pages, |
| "prev_page": p - 1 if p > 1 else None, |
| "next_page": p + 1 if p < total_pages else None, |
| "start_item": start_item, |
| "end_item": end_item, |
| "showing": "显示第 {} 到 {} 项,共 {} 项".format(start_item, end_item, total_count) |
| } |
| |
| |
| result = { |
| "status": True, |
| "msg": "success", |
| "data": current_page_data, |
| "page": page_info, |
| "search": search, |
| "total": total_count |
| } |
| return result |
| |
| except Exception as e: |
| import traceback |
| error_msg = "获取数据库列表失败: {}".format(str(e)) |
| public.WriteLog("TYPE_DATABASE", error_msg) |
| return { |
| "status": False, |
| "msg": error_msg, |
| "data": [], |
| "page": {}, |
| "search": "", |
| "total": 0 |
| } |
|
|
| def get_master_sql_list(self, get=None): |
| """获取主库备份文件列表""" |
| data = self.mysql_manager.get_master_sql_list() |
| result = { |
| "status": True, |
| "msg": "success", |
| "data": data |
| } |
| return result |
|
|
| def del_master_sql(self, get): |
| """删除主库备份文件""" |
| if not hasattr(get, 'path'): |
| return public.returnMsg(False, "缺少参数! path") |
| |
| return self.mysql_manager.del_master_sql(get.path) |
|
|
| |
| def slave_list(self, get=None): |
| """获取从库列表""" |
| |
| version_check = self.mysql_manager.get_mysql_version() |
| if not version_check: |
| return public.returnMsg(False, "此功能仅支持mysql-5.7以上版本使用!") |
|
|
| |
| try: |
| old_config_json = "/www/server/panel/plugin/mysql_replicate/config.json.0" |
| if os.path.exists(old_config_json): |
| old_config_json = json.loads(public.ReadFile(old_config_json)) |
| slave_list = old_config_json.get("slave") |
| master_list = old_config_json.get("master") |
| if slave_list and len(slave_list) > 0 and master_list and len(master_list) > 0: |
| return public.returnMsg(False, "当前主从版本不支持旧版主从同步,如需还原旧版本请在终端执行以下命令还原(不影响当前主从配置) 命令:wget -O remysql_slave.sh https://download.bt.cn/tools/remysql_slave.sh && bash remysql_slave.sh") |
| except: |
| pass |
|
|
| all_configs = self.config_manager.get_all_slave_configs() |
| if not all_configs: |
| return [] |
| |
| search_ip=None |
| if get and get.search: |
| search_ip=get.search |
|
|
| data = [] |
| for item in all_configs: |
| info = { |
| "slave_ip": item["slave_ip"], |
| "panel_addr": item["panel_addr"], |
| "panel_key": item["panel_key"], |
| "err_code": item["err_code"], |
| "sync_type": item["sync_type"], |
| "config_status": item["config_status"], |
| "run_status": item["run_status"], |
| "db_name": item["db_name"] |
| } |
| check_slave_result = self.slave_manager.check_slave(item.get("panel_addr"), item.get("panel_key"), item.get("master_ip"), item.get("slave_ip")) |
| slave_ip = item.get("slave_ip") |
| slave_ip = item.get("slave_ip") |
| if item.get("config_status") == "done" and check_slave_result.get("status") is True: |
| try: |
| slave_status = json.loads(self.slave_manager.get_slave_info(slave_ip)) |
| io_status = slave_status.get("Slave_IO_Running") |
| sql_status = slave_status.get("Slave_SQL_Running") |
| if io_status and sql_status: |
| info["io_status"] = io_status |
| info["sql_status"] = sql_status |
| if io_status != "Yes" or sql_status != "Yes": |
| info["error_msg"] = { |
| "Last_IO_Error_Timestamp": slave_status.get("Last_IO_Error_Timestamp"), |
| "Last_IO_Errno": slave_status.get("Last_IO_Errno"), |
| "Last_IO_Error": slave_status.get("Last_IO_Error"), |
| "Last_SQL_Error_Timestamp": slave_status.get("Last_SQL_Error_Timestamp"), |
| "Last_SQL_Errno": slave_status.get("Last_SQL_Errno"), |
| "Last_SQL_Error": slave_status.get("Last_SQL_Error") |
| } |
| except: |
| info["io_status"] = "None" |
| info["sql_status"] = "None" |
| else: |
| info["io_status"] = "api_error" |
| info["sql_status"] = "api_error" |
|
|
| api_status_result = self.slave_manager.get_slave_mysql_status(slave_ip) |
| info["api_status"] = api_status_result.get("status") |
| info["api_msg"] = api_status_result.get("msg") |
| data.append(info) |
|
|
| if search_ip: |
| data = [item for item in data if search_ip in item["slave_ip"]] |
|
|
| result = { |
| "status": True, |
| "msg": "success", |
| "data": data |
| } |
| return result |
|
|
| def add_slave_server(self, get): |
| """添加主从配置""" |
| required_params = ['node_id', 'db_name', 'err_code', 'sync_type', 'master_ip', 'slave_ip'] |
| |
| for param in required_params: |
| if not hasattr(get, param): |
| return public.returnMsg(False, "缺少参数! {}".format(param)) |
|
|
| node_id=get.node_id |
| node_data = public.M("node").where("id = ?", (node_id,)).find() |
| if not node_data: |
| return public.returnMsg(False, "未找到节点信息") |
|
|
| if get.master_ip == get.slave_ip: |
| return public.returnMsg(False, "主库和从库不能是同一个IP!") |
|
|
| public.set_module_logs("nodes_mysql_slave_9", "add_slave_server") |
| if not os.path.exists("/www/server/panel/config/mysql_slave_info"): |
| public.ExecShell("mkdir -p /www/server/panel/config/mysql_slave_info") |
|
|
| panel_addr = node_data.get("address") |
| panel_key = node_data.get("api_key") or node_data.get("app_key") |
|
|
| |
| ip_validation_result = self.validate_ip_network_type(get.master_ip, get.slave_ip) |
| if not ip_validation_result.get("status"): |
| return public.returnMsg(False, ip_validation_result.get("msg")) |
| |
| check_slave_result = self.slave_manager.check_slave( |
| panel_addr, panel_key, get.master_ip, get.slave_ip |
| ) |
| if check_slave_result.get("status") is False: |
| return public.returnMsg(False, check_slave_result.get("msg")) |
| |
| if os.path.exists("/www/server/panel/config/mysql_slave_info/log/{}.log".format(get.master_ip)): |
| public.ExecShell("rm -rf /www/server/panel/config/mysql_slave_info/log/{}.log".format(get.master_ip)) |
| |
| default_err_code="1007,1050" |
| if get.err_code == "": |
| err_code=default_err_code |
| else: |
| err_code=get.err_code + "," + default_err_code |
|
|
| return self.sync_service.add_slave_server( |
| panel_addr, panel_key, get.master_ip, |
| get.slave_ip, get.db_name, err_code, get.sync_type |
| ) |
|
|
| def del_slave_server(self, get): |
| """删除从库服务器""" |
| if not hasattr(get, 'slave_ip'): |
| return public.returnMsg(False, "缺少参数! slave_ip") |
|
|
| slave_ip = get.slave_ip |
| slave_info = self.config_manager.get_slave_config(slave_ip) |
| if not slave_info: |
| return public.returnMsg(False, "未找到从库配置信息") |
|
|
| |
| self.slave_manager.exec_shell_sql(slave_ip, "stop slave;") |
| self.slave_manager.exec_shell_sql(slave_ip, "RESET SLAVE ALL;") |
|
|
| |
| master_user = slave_info.get("master_user") |
| if master_user: |
| self.mysql_manager.drop_slave_user(master_user, slave_ip) |
|
|
| |
| self.config_manager.remove_slave_config(slave_ip) |
|
|
| return public.returnMsg(True, "删除成功!") |
|
|
| def set_slave_status(self, get): |
| """设置从库状态""" |
| required_params = ['slave_ip', 'status'] |
| for param in required_params: |
| if not hasattr(get, param): |
| return public.returnMsg(False, "缺少参数! {}".format(param)) |
|
|
| return self.slave_manager.set_slave_status(get.slave_ip, get.status) |
|
|
| def get_slave_info(self, get): |
| """获取从库信息(面板用)""" |
| if not hasattr(get, 'slave_ip'): |
| return public.returnMsg(False, "缺少参数! slave_ip") |
|
|
| return self.slave_manager.get_slave_info_dict(get.slave_ip) |
|
|
| |
| def sync_slave_config(self, get): |
| """手动同步从库配置""" |
| if not hasattr(get, 'slave_ip'): |
| return public.returnMsg(False, "缺少参数! slave_ip") |
|
|
| return self.sync_service.sync_slave_config(get.slave_ip) |
|
|
| def get_sync_status(self, get): |
| """获取同步状态""" |
| if not hasattr(get, 'slave_ip'): |
| return public.returnMsg(False, "缺少参数! slave_ip") |
|
|
| slave_ip = get.slave_ip |
| info = self.config_manager.get_slave_config(slave_ip) |
| if not info: |
| return public.returnMsg(False, "未找到同步信息,可能未开始配置,请删除重建主从") |
| |
| |
| log_path = self.config_manager.master_slave_log_path + "/" + slave_ip + ".log" |
| if os.path.exists(log_path): |
| info["data"]["logs"] = public.ReadFile(log_path) |
| else: |
| info["data"]["logs"] = "" |
| |
| result = { |
| "status": True, |
| "msg": "success", |
| "data": info |
| } |
| return result |
| |
| def get_slave_error_info(self, get): |
| slave_data=self.slave_list(None) |
| error_data=[] |
| for item in slave_data["data"]: |
| io_status = item.get("io_status") |
| sql_status = item.get("sql_status") |
| |
| if not io_status or not sql_status: |
| error_time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))) |
| data={ |
| "slave_ip": item["slave_ip"], |
| "error_time": error_time, |
| "io_status": io_status, |
| "sql_status": sql_status, |
| "error_msg": "无法获取到从库状态,请检查从库mysql是否正常运行" |
| } |
| error_data.append(data) |
| else: |
| if io_status != "Yes" or sql_status != "Yes": |
| error_msg = item.get("error_msg", {}) |
| if error_msg.get("Last_IO_Error_Timestamp"): |
| error_time = error_msg["Last_IO_Error_Timestamp"] |
| elif error_msg.get("Last_SQL_Error_Timestamp"): |
| error_time = error_msg["Last_SQL_Error_Timestamp"] |
| else: |
| error_time = "" |
| data={ |
| "slave_ip": item["slave_ip"], |
| "error_time": error_time, |
| "io_status": io_status, |
| "sql_status": sql_status, |
| "error_msg": error_msg |
| } |
| error_data.append(data) |
| return error_data |
|
|
|
|
| |
| def get_old_slave_list(self, get=None): |
| """获取旧版从库列表(兼容性)""" |
| |
| return False |
|
|
| def info_test(self, slave_ip): |
| """测试信息(兼容性)""" |
| info = self.config_manager.get_slave_config(slave_ip) |
| if info: |
| for step in info["data"]["steps"]: |
| if step["name"] == "导入数据": |
| step["name"] = "从库数据库已创建好,等待手动将数据导入从库...." |
| self.config_manager.save_slave_config(slave_ip, info) |
|
|
| |
| def auto_sync_data(self, slave_ip): |
| """自动同步数据(后台任务调用)""" |
| return self.sync_service.auto_sync_data(slave_ip) |
|
|
| def manual_sync_data(self, slave_ip): |
| """手动同步数据(后台任务调用)""" |
| return self.sync_service.manual_sync_data(slave_ip) |
|
|
| def is_private_ip(self, ip_address): |
| """ |
| 判断IP地址是否为内网IP |
| :param ip_address: IP地址字符串 |
| :return: True表示内网IP,False表示公网IP |
| """ |
| import ipaddress |
| try: |
| ip = ipaddress.ip_address(ip_address) |
| |
| return ip.is_private or ip.is_loopback or ip.is_link_local |
| except ValueError: |
| |
| return False |
|
|
| def validate_ip_network_type(self, master_ip, slave_ip): |
| """ |
| 验证主从IP是否为同一网络类型(都是内网或都是公网) |
| :param master_ip: 主库IP |
| :param slave_ip: 从库IP |
| :return: dict 包含验证结果和消息 |
| """ |
| master_is_private = self.is_private_ip(master_ip) |
| slave_is_private = self.is_private_ip(slave_ip) |
| |
| |
| if master_is_private == slave_is_private: |
| network_type = "内网" if master_is_private else "公网" |
| return { |
| "status": True, |
| "msg": "IP网络类型验证通过,主从库均为{}IP".format(network_type), |
| "master_type": "内网" if master_is_private else "公网", |
| "slave_type": "内网" if slave_is_private else "公网" |
| } |
| else: |
| master_type = "内网" if master_is_private else "公网" |
| slave_type = "内网" if slave_is_private else "公网" |
| return { |
| "status": False, |
| "msg": "主从库IP网络类型不匹配!主库({})为{}IP,从库({})为{}IP。请使用相同网络类型的IP地址确保可连接。".format( |
| master_ip, master_type, slave_ip, slave_type |
| ), |
| "master_type": master_type, |
| "slave_type": slave_type |
| } |
| |