| |
| """ |
| Redis 主从复制模块 - 前端接口层 |
| 负责处理前端请求,调用具体的业务逻辑实现 |
| """ |
|
|
| import json |
| import os |
| import sys |
| import time |
| import socket |
| |
| if '/www/server/panel/class' not in sys.path: |
| sys.path.insert(0, '/www/server/panel/class') |
| if '/www/server/panel' not in sys.path: |
| sys.path.insert(0, '/www/server/panel') |
|
|
| import public |
| from mod.project.node.redis_slave_util.config_manager import ConfigManager |
| from mod.project.node.redis_slave_util.redis_manager import RedisManager |
| from mod.project.node.redis_slave_util.slave_manager import SlaveManager |
| from mod.project.node.redis_slave_util.sync_service import SyncService |
| from mod.project.node.redis_slave_util.monitor_service import MonitorService |
|
|
|
|
|
|
class main:
    """Front-end interface layer of the Redis master/slave replication module.

    Each public method validates the request object it receives and then
    delegates the real work to one of the manager/service objects created in
    ``__init__``.
    """

    def __init__(self):
        self.config_manager = ConfigManager()
        self.redis_manager = RedisManager()
        self.slave_manager = SlaveManager()
        self.sync_service = SyncService()
        self.monitor_service = MonitorService()

    @staticmethod
    def _missing_param(get, names):
        """Return the first name in *names* that *get* lacks, or ``None``."""
        return next((name for name in names if not hasattr(get, name)), None)

    def get_redis_version(self, get=None):
        """Return the Redis version reported by the Redis manager.

        A falsy version is reported to the caller as an "unsupported" error.
        """
        version = self.redis_manager.get_redis_version()
        if not version:
            return public.returnMsg(False, "此功能仅支持Redis-3.2以上版本使用!")
        return public.returnMsg(True, version)

    def check_slave(self, get):
        """Validate the request and check the slave's connection and config.

        Requires ``panel_addr``, ``panel_key``, ``master_ip`` and ``slave_ip``
        on *get*; the check itself is delegated to the slave manager.
        """
        missing = self._missing_param(
            get, ('panel_addr', 'panel_key', 'master_ip', 'slave_ip')
        )
        if missing is not None:
            return public.returnMsg(False, "缺少参数! {}".format(missing))

        return self.slave_manager.check_slave(
            get.panel_addr, get.panel_key, get.master_ip, get.slave_ip
        )

    def check_redis_port(self, server_ip=None, port=6379):
        """Return ``True`` when a TCP connection to ``server_ip:port`` succeeds.

        Args:
            server_ip: Host to connect to.
            port: TCP port, as an int or a numeric string.

        Returns:
            ``True`` if the connection is accepted within 3 seconds; ``False``
            on refusal, timeout, or any error (invalid port, bad host, ...).
        """
        try:
            if isinstance(port, str):
                port = int(port)

            # The context manager closes the socket even when connect_ex
            # raises (e.g. unresolvable host or out-of-range port).
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(3)
                return sock.connect_ex((server_ip, port)) == 0
        except Exception:
            return False

    def replication_group_list(self, get=None):
        """Return the replication groups with search and pagination applied.

        Optional request attributes on *get*:
            search: Case-insensitive substring matched against the group name
                and the ``ip:port`` of the master.
            p: 1-based page number (values below 1 become 1).
            limit: Page size (values outside 1..100 become 20).

        Returns:
            A dict with ``status``, ``msg``, ``data`` (the current page),
            ``page`` (paging metadata), ``search`` and ``total``.
        """
        try:
            if not self.redis_manager.get_redis_version():
                return public.returnMsg(False, "此功能仅支持Redis-3.2以上版本使用!")

            # Coerce to str so a None/non-string search value cannot break
            # the whole listing.
            search = str(getattr(get, 'search', '') or '').strip() if get else ''
            p = int(get.p) if get and hasattr(get, 'p') else 1
            limit = int(get.limit) if get and hasattr(get, 'limit') else 20

            if p < 1:
                p = 1
            if limit < 1 or limit > 100:
                limit = 20

            all_data = self.config_manager.get_all_replication_groups()
            if not all_data:
                return {
                    "status": True,
                    "msg": "success",
                    "data": [],
                    "page": {
                        "current_page": p,
                        "total_pages": 0,
                        "total_count": 0,
                        "per_page": limit,
                        "has_prev": False,
                        "has_next": False,
                        # Same key set as the non-empty response below.
                        "prev_page": None,
                        "next_page": None,
                        "start_item": 0,
                        "end_item": 0,
                        "showing": "显示第 0 到 0 项,共 0 项"
                    },
                    "search": search,
                    "total": 0
                }

            enriched_data = [self.enrich_group_data(group) for group in all_data]

            filtered_data = enriched_data
            if search:
                needle = search.lower()
                filtered_data = [
                    item for item in enriched_data
                    if needle in str(item.get('group_name') or '').lower()
                    or needle in str(item.get('master_server_ip') or '').lower()
                ]

            total_count = len(filtered_data)
            total_pages = (total_count + limit - 1) // limit if total_count > 0 else 0
            offset = (p - 1) * limit
            current_page_data = filtered_data[offset:offset + limit]

            # Derive the displayed range from what is actually on the page so
            # a page past the end reports 0..0 instead of an inverted range.
            if current_page_data:
                start_item = offset + 1
                end_item = offset + len(current_page_data)
            else:
                start_item = end_item = 0

            page_info = {
                "current_page": p,
                "total_pages": total_pages,
                "total_count": total_count,
                "per_page": limit,
                "has_prev": p > 1,
                "has_next": p < total_pages,
                "prev_page": p - 1 if p > 1 else None,
                "next_page": p + 1 if p < total_pages else None,
                "start_item": start_item,
                "end_item": end_item,
                "showing": "显示第 {} 到 {} 项,共 {} 项".format(start_item, end_item, total_count)
            }

            return {
                "status": True,
                "msg": "success",
                "data": current_page_data,
                "page": page_info,
                "search": search,
                "total": total_count
            }

        except Exception as e:
            error_msg = "获取主从复制组列表失败: {}".format(str(e))
            public.WriteLog("TYPE_REDIS", error_msg)
            return {
                "status": False,
                "msg": error_msg,
                "data": [],
                "page": {},
                "search": "",
                "total": 0
            }

    def enrich_group_data(self, group):
        """Build the list-view record for *group*, adding monitoring data.

        Combines the stored group config with the master's status and the
        per-slave status reported by the monitor service. On any error the
        failure is logged and the unmodified *group* is returned instead.
        """
        try:
            master_info = self.monitor_service.get_master_status(group['master_node_id'])

            slave_ids = group.get('slave_nodes', [])
            slave_count = len(slave_ids)
            normal_slaves = 0
            total_lag = 0
            lag_count = 0

            for slave_node_id in slave_ids:
                slave_status = self.monitor_service.get_slave_status(slave_node_id)
                if slave_status.get('is_online', False):
                    normal_slaves += 1
                # Slaves without a lag value are left out of the average.
                if slave_status.get('lag') is not None:
                    total_lag += slave_status['lag']
                    lag_count += 1

            avg_lag = round(total_lag / lag_count, 2) if lag_count > 0 else 0

            return {
                'group_id': group['group_id'],
                'group_name': group['group_name'],
                'master_server_ip': "{}:{}".format(group['master_ip'], group['master_port']),
                'memory_usage': master_info.get('memory_usage_percent', 0),
                'qps': master_info.get('qps', 0),
                'connections': master_info.get('connections', 0),
                'slave_count': slave_count,
                'normal_slaves': normal_slaves,
                'avg_lag': avg_lag,
                'created_time': group.get('created_time', ''),
                'status': 'normal' if normal_slaves == slave_count else 'warning',
                'task_id': group.get('task_id', '')
            }
        except Exception as e:
            public.WriteLog("TYPE_REDIS", "丰富复制组数据失败: {}".format(str(e)))
            return group

    def create_replication_group(self, get):
        """Validate the request and create a master/slave replication group.

        Requires ``group_name``, ``master_node_id``, ``slave_node_ids`` (a
        list or its JSON encoding), ``master_ip`` and ``redis_password`` on
        *get*. Validation stops at the first failure and returns an error
        message; on success the creation is delegated to the sync service.
        """
        missing = self._missing_param(
            get,
            ('group_name', 'master_node_id', 'slave_node_ids', 'master_ip', 'redis_password'),
        )
        if missing is not None:
            return public.returnMsg(False, "缺少参数! {}".format(missing))

        group_name = get.group_name.strip()
        if not group_name:
            return public.returnMsg(False, "主从名称不能为空!")

        if self.config_manager.group_name_exists(group_name):
            return public.returnMsg(False, "主从复制组名称已存在!")

        master_node = public.M("node").where("id = ?", (get.master_node_id,)).find()
        if not master_node:
            return public.returnMsg(False, "未找到主节点信息")

        # slave_node_ids may arrive JSON-encoded; reject malformed input with
        # a message instead of letting the decode error escape.
        slave_node_ids = get.slave_node_ids
        if isinstance(slave_node_ids, str):
            try:
                slave_node_ids = json.loads(slave_node_ids)
            except ValueError:
                return public.returnMsg(False, "参数 slave_node_ids 格式错误!")
        if not isinstance(slave_node_ids, (list, tuple)):
            return public.returnMsg(False, "参数 slave_node_ids 格式错误!")

        slave_nodes = []
        for slave_id in slave_node_ids:
            slave_node = public.M("node").where("id = ?", (slave_id,)).find()
            if not slave_node:
                return public.returnMsg(False, "未找到从节点信息: {}".format(slave_id))
            slave_nodes.append(slave_node)

        master_ip = get.master_ip
        slave_ips = [slave_node.get("server_ip") for slave_node in slave_nodes]
        if master_ip in slave_ips:
            return public.returnMsg(False, "主库和从库不能是同一个IP!")

        if master_ip == "127.0.0.1":
            return public.returnMsg(False, "主库不能是127.0.0.1!")

        if len(get.redis_password) < 8:
            return public.returnMsg(False, "Redis密码长度不能少于8位!")

        # Every participating host must accept connections on the default
        # Redis port before the group is created.
        for ip in [master_ip] + slave_ips:
            if not self.check_redis_port(ip):
                return public.returnMsg(False, "连接{} redis失败,请指定ip放行6379端口,并设置redis外网访问权限(注意设置密码)".format(ip))

        if not os.path.exists("/www/server/panel/config/redis_slave_info"):
            public.ExecShell("mkdir -p /www/server/panel/config/redis_slave_info")

        return self.sync_service.create_replication_group(
            group_name, master_node, slave_nodes, get.redis_password, master_ip
        )

    def get_creation_status(self, get):
        """Return the creation status of the task identified by ``get.task_id``."""
        if not hasattr(get, 'task_id'):
            return public.returnMsg(False, "缺少参数! task_id")

        return self.sync_service.get_creation_status(get.task_id)

    def get_group_creation_status(self, get):
        """Return the creation status of the group identified by ``get.group_id``.

        The group's task id is looked up in the config manager first; errors
        are logged and returned as a failure message.
        """
        if not hasattr(get, 'group_id'):
            return public.returnMsg(False, "缺少参数! group_id")

        try:
            task_id = self.config_manager.get_group_task_id(get.group_id)
            if not task_id:
                return public.returnMsg(False, "未找到该复制组的创建任务ID")

            return self.sync_service.get_creation_status(task_id)

        except Exception as e:
            error_msg = "获取复制组创建状态失败: {}".format(str(e))
            public.WriteLog("TYPE_REDIS", error_msg)
            return public.returnMsg(False, error_msg)

    def get_group_detail(self, get):
        """Return the config of a group plus master and per-slave details.

        Requires ``group_id`` on *get*. Errors are logged and returned as a
        failure message.
        """
        if not hasattr(get, 'group_id'):
            return public.returnMsg(False, "缺少参数! group_id")

        try:
            group_config = self.config_manager.get_group_config(get.group_id)
            if not group_config:
                return public.returnMsg(False, "未找到复制组配置")

            master_detail = self.monitor_service.get_master_detail_info(group_config['master_node_id'])
            slave_details = [
                self.monitor_service.get_slave_detail_info(slave_node_id)
                for slave_node_id in group_config.get('slave_nodes', [])
            ]

            return {
                "status": True,
                "msg": "success",
                "data": {
                    "group_info": group_config,
                    "master_detail": master_detail,
                    "slave_details": slave_details
                }
            }

        except Exception as e:
            error_msg = "获取复制组详细信息失败: {}".format(str(e))
            public.WriteLog("TYPE_REDIS", error_msg)
            return public.returnMsg(False, error_msg)

    def get_slave_status(self, get):
        """Return the monitor service's status for ``get.slave_node_id``."""
        if not hasattr(get, 'slave_node_id'):
            return public.returnMsg(False, "缺少参数! slave_node_id")

        return self.monitor_service.get_slave_status(get.slave_node_id)

    def reconnect_slave(self, get):
        """Ask the slave manager to reconnect a slave of a group.

        Requires ``group_id`` and ``slave_node_id`` on *get*.
        """
        missing = self._missing_param(get, ('group_id', 'slave_node_id'))
        if missing is not None:
            return public.returnMsg(False, "缺少参数! {}".format(missing))

        return self.slave_manager.reconnect_slave(get.group_id, get.slave_node_id)

    def remove_slave(self, get):
        """Ask the slave manager to remove a slave from a group.

        Requires ``group_id`` and ``slave_node_id`` on *get*.
        """
        missing = self._missing_param(get, ('group_id', 'slave_node_id'))
        if missing is not None:
            return public.returnMsg(False, "缺少参数! {}".format(missing))

        return self.slave_manager.remove_slave(get.group_id, get.slave_node_id)

    def add_slave_to_group(self, get):
        """Ask the sync service to add a new slave node to an existing group.

        Requires ``group_id`` and ``new_slave_node_id`` on *get*.
        """
        missing = self._missing_param(get, ('group_id', 'new_slave_node_id'))
        if missing is not None:
            return public.returnMsg(False, "缺少参数! {}".format(missing))

        return self.sync_service.add_slave_to_existing_group(get.group_id, get.new_slave_node_id)

    def delete_replication_group(self, get):
        """Tear down a replication group and remove its stored config.

        Requires ``group_id`` on *get*. Cleanup of the slaves and the master
        is best-effort: individual failures are collected, written to the
        group log and summarised in the success message, but do not stop the
        deletion. Only a failure to remove the config itself (or an
        unexpected exception) is reported as an error.
        """
        if not hasattr(get, 'group_id'):
            return public.returnMsg(False, "缺少参数! group_id")

        group_id = get.group_id
        group_config = self.config_manager.get_group_config(group_id)
        if not group_config:
            return public.returnMsg(False, "未找到复制组配置信息")

        try:
            cleanup_errors = []

            self.config_manager.write_group_log(group_id, "开始删除主从复制组", "INFO")

            for slave_node_id in group_config.get('slave_nodes', []):
                try:
                    stop_result = self.slave_manager.stop_slave_replication(slave_node_id)
                    if not stop_result.get("status"):
                        cleanup_errors.append(f"停止从库 {slave_node_id} 复制失败: {stop_result.get('msg')}")

                    clean_result = self.redis_manager.stop_replication(slave_node_id)
                    if not clean_result.get("status"):
                        cleanup_errors.append(f"清理从库 {slave_node_id} 配置失败: {clean_result.get('msg')}")
                    else:
                        self.config_manager.write_group_log(group_id, f"从库 {slave_node_id} 配置已清理", "INFO")

                except Exception as e:
                    cleanup_errors.append(f"处理从库 {slave_node_id} 异常: {str(e)}")

            master_node_id = group_config.get('master_node_id')
            if master_node_id:
                try:
                    master_cleanup = self.redis_manager.cleanup_replication_users(master_node_id)
                    if not master_cleanup:
                        cleanup_errors.append(f"清理主库 {master_node_id} 配置失败")
                    else:
                        self.config_manager.write_group_log(group_id, f"主库 {master_node_id} 配置已清理", "INFO")
                except Exception as e:
                    cleanup_errors.append(f"清理主库 {master_node_id} 异常: {str(e)}")

            self.config_manager.write_group_log(group_id, "删除配置文件和日志", "INFO")

            # Record the collected errors before the config is removed.
            for error in cleanup_errors:
                self.config_manager.write_group_log(group_id, f"清理错误: {error}", "WARNING")

            if not self.config_manager.remove_group_config(group_id):
                return public.returnMsg(False, "删除配置文件失败")

            if not cleanup_errors:
                return public.returnMsg(True, "删除成功!")

            # Report at most three errors verbatim, then a total count.
            error_summary = "; ".join(cleanup_errors[:3])
            if len(cleanup_errors) > 3:
                error_summary += f" 等共{len(cleanup_errors)}个错误"
            return public.returnMsg(True, f"删除成功,但存在清理问题: {error_summary}")

        except Exception as e:
            error_msg = "删除复制组失败: {}".format(str(e))
            public.WriteLog("TYPE_REDIS", error_msg)

            # Still try to drop the config; a failure here is logged rather
            # than silently discarded, and must not mask the original error.
            try:
                self.config_manager.remove_group_config(group_id)
            except Exception as cleanup_err:
                public.WriteLog("TYPE_REDIS", "删除复制组配置失败: {}".format(str(cleanup_err)))

            return public.returnMsg(False, error_msg)

    def get_monitor_data(self, get=None):
        """Return all monitoring data from the monitor service.

        Errors are logged and returned as a failure message.
        """
        try:
            return self.monitor_service.get_all_monitor_data()
        except Exception as e:
            error_msg = "获取监控数据失败: {}".format(str(e))
            public.WriteLog("TYPE_REDIS", error_msg)
            return public.returnMsg(False, error_msg)

    def get_alert_info(self, get=None):
        """Collect the offline alerts of every replication group.

        Only ``master_offline`` alerts of masters and ``slave_offline``
        alerts of slaves are kept; other alert types are ignored.

        Returns:
            A dict with ``status``, ``msg``, ``data`` (list of alerts) and
            ``count``; on error, a failure message.
        """
        try:
            alert_data = []

            # A falsy group list (None/empty) means there is nothing to check.
            for group in self.config_manager.get_all_replication_groups() or []:
                master_alerts = self.monitor_service.check_master_alerts(group['master_node_id'])
                alert_data.extend(
                    alert for alert in master_alerts or []
                    if alert.get("type") == "master_offline"
                )

                for slave_node_id in group.get('slave_nodes', []):
                    slave_alerts = self.monitor_service.check_slave_alerts(slave_node_id)
                    alert_data.extend(
                        alert for alert in slave_alerts or []
                        if alert.get("type") == "slave_offline"
                    )

            return {
                "status": True,
                "msg": "success",
                "data": alert_data,
                "count": len(alert_data)
            }

        except Exception as e:
            error_msg = "获取告警信息失败: {}".format(str(e))
            public.WriteLog("TYPE_REDIS", error_msg)
            return public.returnMsg(False, error_msg)

    def generate_redis_password(self, get=None):
        """Generate a random 16-character Redis password.

        Characters are drawn from ASCII letters, digits and ``!@#$%^&*``
        using the ``secrets`` module, since the value is a credential.
        """
        import secrets
        import string

        chars = string.ascii_letters + string.digits + "!@#$%^&*"
        password = ''.join(secrets.choice(chars) for _ in range(16))

        return public.returnMsg(True, password)

    def validate_redis_connection(self, get):
        """Test the Redis connection of a node with the given password.

        Requires ``node_id`` and ``password`` on *get*.
        """
        missing = self._missing_param(get, ('node_id', 'password'))
        if missing is not None:
            return public.returnMsg(False, "缺少参数! {}".format(missing))

        return self.redis_manager.test_connection(get.node_id, get.password)

    def get_available_nodes(self, get=None):
        """List the nodes for which a Redis version can be detected.

        Each entry carries the node's id, name, IP, online/offline status,
        Redis version and, when the node already belongs to a replication
        group, that group's id, name and the node's role in it. Nodes whose
        detection raises are logged and skipped.
        """
        try:
            nodes = public.M("node").select()
            available_nodes = []

            # Map node id -> membership info for every node already in a
            # group. A falsy group list is treated as "no groups".
            node_group_map = {}
            for group in self.config_manager.get_all_replication_groups() or []:
                master_id = group.get('master_node_id')
                if master_id:
                    node_group_map[master_id] = {
                        'group_id': group['group_id'],
                        'group_name': group['group_name'],
                        'role': 'master'
                    }

                for slave_id in group.get('slave_nodes', []):
                    node_group_map[slave_id] = {
                        'group_id': group['group_id'],
                        'group_name': group['group_name'],
                        'role': 'slave'
                    }

            for node in nodes:
                try:
                    redis_version = self.redis_manager.get_redis_version_by_node(node['id'])
                    if not redis_version:
                        continue

                    service_status = self.redis_manager.check_redis_service(node['id'])
                    node_status = 'online' if service_status.get('status', False) else 'offline'

                    # The loopback address is replaced by the local IP
                    # returned by public.GetLocalIp().
                    server_ip = node.get('server_ip', '')
                    if server_ip == "127.0.0.1":
                        server_ip = public.GetLocalIp()

                    available_nodes.append({
                        'id': node['id'],
                        'name': node.get('remarks', ''),
                        'server_ip': server_ip,
                        'status': node_status,
                        'redis_version': redis_version,
                        'type': 'standalone',
                        'is_in_replication_group': node['id'] in node_group_map,
                        'group_info': node_group_map.get(node['id'], None)
                    })

                except Exception as e:
                    public.WriteLog("TYPE_REDIS", f"检测节点 {node.get('id')} Redis状态失败: {str(e)}")
                    continue

            return {
                "status": True,
                "msg": "success",
                "data": available_nodes
            }

        except Exception as e:
            error_msg = "获取可用节点失败: {}".format(str(e))
            public.WriteLog("TYPE_REDIS", error_msg)
            return public.returnMsg(False, error_msg)

    def auto_monitor_task(self):
        """Run the monitor service's automatic monitoring task (background job)."""
        return self.monitor_service.auto_monitor_task()

    def health_check_task(self):
        """Run the monitor service's health-check task (background job)."""
        return self.monitor_service.health_check_task()
| |
|
|