action / bt-source /panel /mod /project /node /redis_slaveMod.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
020c337 verified
# -*- coding: utf-8 -*-
"""
Redis 主从复制模块 - 前端接口层
负责处理前端请求,调用具体的业务逻辑实现
"""
import json
import os
import sys
import time
import socket
# Make the panel packages importable. Missing directories are spliced onto the
# front of sys.path in one step; '/www/server/panel' ends up ahead of
# '/www/server/panel/class', and entries already present are left where they are.
sys.path[:0] = [
    panel_dir
    for panel_dir in ('/www/server/panel', '/www/server/panel/class')
    if panel_dir not in sys.path
]
import public
from mod.project.node.redis_slave_util.config_manager import ConfigManager
from mod.project.node.redis_slave_util.redis_manager import RedisManager
from mod.project.node.redis_slave_util.slave_manager import SlaveManager
from mod.project.node.redis_slave_util.sync_service import SyncService
from mod.project.node.redis_slave_util.monitor_service import MonitorService
class main:
"""Redis 主从复制主模块"""
def __init__(self):
    """Create the helper objects every request handler in this class delegates to."""
    # Replication-group configuration and per-group logs.
    self.config_manager = ConfigManager()
    # Redis version/service checks and replication cleanup on nodes.
    self.redis_manager = RedisManager()
    # Slave-side operations (check, reconnect, remove, stop replication).
    self.slave_manager = SlaveManager()
    # Group creation workflow and adding slaves to existing groups.
    self.sync_service = SyncService()
    # Status, detail, alert and background monitoring queries.
    self.monitor_service = MonitorService()
# ==================== Version and checks ====================
def get_redis_version(self, get=None):
    """Report the Redis version detected by the Redis manager.

    Args:
        get: Unused; accepted so the method matches the handler signature.

    Returns:
        ``public.returnMsg(True, version)`` when a version is detected,
        otherwise ``public.returnMsg(False, ...)`` with the unsupported-version
        message.
    """
    detected = self.redis_manager.get_redis_version()
    if detected:
        return public.returnMsg(True, detected)
    return public.returnMsg(False, "此功能仅支持Redis-3.2以上版本使用!")
def check_slave(self, get):
    """Validate the request and delegate the slave check to the slave manager.

    Args:
        get: Request object expected to carry ``panel_addr``, ``panel_key``,
            ``master_ip`` and ``slave_ip`` attributes.

    Returns:
        ``public.returnMsg(False, ...)`` naming the first missing attribute,
        otherwise whatever ``self.slave_manager.check_slave`` returns.
    """
    # Stop at the first absent attribute so the message names exactly one.
    missing = next(
        (name for name in ('panel_addr', 'panel_key', 'master_ip', 'slave_ip')
         if not hasattr(get, name)),
        None,
    )
    if missing is not None:
        return public.returnMsg(False, "缺少参数! {}".format(missing))

    manager = self.slave_manager
    return manager.check_slave(
        get.panel_addr, get.panel_key, get.master_ip, get.slave_ip
    )
def check_redis_port(self, server_ip=None, port=6379):
    """Return True when a TCP connection to ``server_ip:port`` can be opened.

    Args:
        server_ip: Host name or IPv4 address to probe.
        port: TCP port as an int or a numeric string; defaults to 6379.

    Returns:
        bool: True if the connection succeeds within the 3-second timeout;
        False on refusal, timeout, an unparsable port, or any other error.
    """
    try:
        # Callers may hand the port over as a string.
        if isinstance(port, str):
            port = int(port)
        # The context manager closes the socket even when connect_ex() raises
        # (it still raises for problems such as an unresolvable or non-string
        # host), which the previous explicit close() did not cover.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(3)
            # connect_ex() returns 0 on success and an error code otherwise.
            return sock.connect_ex((server_ip, port)) == 0
    except Exception:
        # Best-effort probe: every failure is reported as "not reachable".
        return False
# ==================== Replication group management ====================
def replication_group_list(self, get=None):
    """List replication groups with optional search and pagination.

    Args:
        get: Optional request object. Recognised attributes:
            ``search`` (substring matched case-insensitively against the
            group name and the ``ip:port`` master address), ``p`` (1-based
            page number, default 1) and ``limit`` (page size, default 20;
            values outside 1..100 fall back to 20). Missing, ``None`` or
            non-numeric values fall back to the defaults.

    Returns:
        dict: ``status``, ``msg``, ``data`` (groups on the requested page),
        ``page`` (pagination details), ``search`` and ``total``. When the
        Redis version check fails, ``public.returnMsg(False, ...)`` is
        returned instead. On an unexpected error the dict has
        ``status=False`` and empty ``data``/``page``.
    """
    def _as_int(raw, default):
        # Bad client input degrades to the default instead of failing the call.
        try:
            return int(raw)
        except (TypeError, ValueError):
            return default

    try:
        # 1. The feature requires a detectable Redis version.
        if not self.redis_manager.get_redis_version():
            return public.returnMsg(False, "此功能仅支持Redis-3.2以上版本使用!")

        # 2. Read and sanitise paging/search parameters (get may be None).
        search = str(getattr(get, 'search', '') or '').strip()
        p = _as_int(getattr(get, 'p', 1), 1)
        limit = _as_int(getattr(get, 'limit', 20), 20)
        if p < 1:
            p = 1
        if limit < 1 or limit > 100:
            limit = 20

        # 3. Load every configured group.
        all_data = self.config_manager.get_all_replication_groups()
        if not all_data:
            return {
                "status": True,
                "msg": "success",
                "data": [],
                "page": {
                    "current_page": p,
                    "total_pages": 0,
                    "total_count": 0,
                    "per_page": limit,
                    "has_prev": False,
                    "has_next": False,
                    # Same key set as the non-empty response below.
                    "prev_page": None,
                    "next_page": None,
                    "start_item": 0,
                    "end_item": 0,
                    "showing": "显示第 0 到 0 项,共 0 项"
                },
                "search": search,
                "total": 0
            }

        # 4. Attach monitoring data; the searchable 'master_server_ip' field
        #    only exists after enrichment, so filtering has to come second.
        enriched_data = [self.enrich_group_data(group) for group in all_data]

        # 5. Case-insensitive substring filter on name or master address.
        filtered_data = enriched_data
        if search:
            needle = search.lower()
            filtered_data = [
                item for item in enriched_data
                if needle in str(item.get('group_name') or '').lower()
                or needle in str(item.get('master_server_ip') or '').lower()
            ]

        # 6. Pagination arithmetic.
        total_count = len(filtered_data)
        total_pages = (total_count + limit - 1) // limit if total_count > 0 else 0
        offset = (p - 1) * limit
        current_page_data = filtered_data[offset:offset + limit]

        # 7. Item range shown on this page; a page past the end shows 0..0
        #    rather than a start index larger than the end index.
        if current_page_data:
            start_item = offset + 1
            end_item = offset + len(current_page_data)
        else:
            start_item = 0
            end_item = 0

        page_info = {
            "current_page": p,
            "total_pages": total_pages,
            "total_count": total_count,
            "per_page": limit,
            "has_prev": p > 1,
            "has_next": p < total_pages,
            "prev_page": p - 1 if p > 1 else None,
            "next_page": p + 1 if p < total_pages else None,
            "start_item": start_item,
            "end_item": end_item,
            "showing": "显示第 {} 到 {} 项,共 {} 项".format(start_item, end_item, total_count)
        }

        return {
            "status": True,
            "msg": "success",
            "data": current_page_data,
            "page": page_info,
            "search": search,
            "total": total_count
        }
    except Exception as e:
        error_msg = "获取主从复制组列表失败: {}".format(str(e))
        public.WriteLog("TYPE_REDIS", error_msg)
        return {
            "status": False,
            "msg": error_msg,
            "data": [],
            "page": {},
            "search": "",
            "total": 0
        }
def enrich_group_data(self, group):
    """Build the list-view record for one replication group.

    Combines the stored group configuration with the master's monitoring
    figures and an online/lag summary of its slaves.

    Args:
        group: Group configuration mapping; ``group_id``, ``group_name``,
            ``master_node_id``, ``master_ip`` and ``master_port`` are read
            by key, ``slave_nodes``, ``created_time`` and ``task_id`` via
            ``get``.

    Returns:
        dict: The enriched record, or the unmodified ``group`` if anything
        raises while collecting the data (the failure is logged).
    """
    try:
        master_stats = self.monitor_service.get_master_status(group['master_node_id'])

        slave_ids = group.get('slave_nodes', [])
        slave_count = len(slave_ids)

        online_slaves = 0
        lag_samples = []
        for node_id in slave_ids:
            state = self.monitor_service.get_slave_status(node_id)
            if not state.get('is_online', False):
                continue
            online_slaves += 1
            # NOTE(review): the source lost its indentation; lag is assumed to
            # be sampled only for online slaves — confirm against the original.
            lag_value = state.get('lag')
            if lag_value is not None:
                lag_samples.append(lag_value)

        # Mean lag over the slaves that reported one, 0 when none did.
        if lag_samples:
            avg_lag = round(sum(lag_samples) / len(lag_samples), 2)
        else:
            avg_lag = 0

        # 'normal' only when every configured slave is online.
        health = 'normal' if online_slaves == slave_count else 'warning'

        return {
            'group_id': group['group_id'],
            'group_name': group['group_name'],
            'master_server_ip': "{}:{}".format(group['master_ip'], group['master_port']),
            'memory_usage': master_stats.get('memory_usage_percent', 0),
            'qps': master_stats.get('qps', 0),
            'connections': master_stats.get('connections', 0),
            'slave_count': slave_count,
            'normal_slaves': online_slaves,
            'avg_lag': avg_lag,
            'created_time': group.get('created_time', ''),
            'status': health,
            'task_id': group.get('task_id', '')
        }
    except Exception as exc:
        public.WriteLog("TYPE_REDIS", "丰富复制组数据失败: {}".format(str(exc)))
        return group
# ==================== Create replication group ====================
def create_replication_group(self, get):
    """Validate a creation request and start the replication-group workflow.

    Args:
        get: Request object carrying ``group_name``, ``master_node_id``,
            ``slave_node_ids`` (a list, or a JSON string encoding one),
            ``master_ip`` and ``redis_password``.

    Returns:
        ``public.returnMsg(False, ...)`` describing the first validation
        failure, otherwise whatever
        ``self.sync_service.create_replication_group`` returns.
    """
    required_params = ['group_name', 'master_node_id', 'slave_node_ids', 'master_ip', 'redis_password']
    for param in required_params:
        if not hasattr(get, param):
            return public.returnMsg(False, "缺少参数! {}".format(param))

    # Group name must be non-blank and unused.
    group_name = get.group_name.strip()
    if not group_name:
        return public.returnMsg(False, "主从名称不能为空!")
    if self.config_manager.group_name_exists(group_name):
        return public.returnMsg(False, "主从复制组名称已存在!")

    # Resolve the master node record.
    master_node = public.M("node").where("id = ?", (get.master_node_id,)).find()
    if not master_node:
        return public.returnMsg(False, "未找到主节点信息")

    # Slave ids arrive either as a list or as a JSON string; malformed input
    # is reported to the caller instead of raising out of the handler.
    slave_node_ids = get.slave_node_ids
    if isinstance(slave_node_ids, str):
        try:
            slave_node_ids = json.loads(slave_node_ids)
        except ValueError:
            return public.returnMsg(False, "slave_node_ids 参数格式错误!")
    if not isinstance(slave_node_ids, (list, tuple)):
        return public.returnMsg(False, "slave_node_ids 参数格式错误!")

    slave_nodes = []
    for slave_id in slave_node_ids:
        slave_node = public.M("node").where("id = ?", (slave_id,)).find()
        if not slave_node:
            return public.returnMsg(False, "未找到从节点信息: {}".format(slave_id))
        slave_nodes.append(slave_node)

    # The master address is taken from the request, not from the node record.
    master_ip = get.master_ip
    # This check does not depend on any slave, so it runs once up front.
    if master_ip == "127.0.0.1":
        return public.returnMsg(False, "主库不能是127.0.0.1!")
    for slave_node in slave_nodes:
        if master_ip == slave_node.get("server_ip"):
            return public.returnMsg(False, "主库和从库不能是同一个IP!")

    # Minimum password length.
    if len(get.redis_password) < 8:
        return public.returnMsg(False, "Redis密码长度不能少于8位!")

    # Master and every slave must accept TCP connections on the default port.
    ip_list = [master_ip] + [slave_node.get("server_ip") for slave_node in slave_nodes]
    for ip in ip_list:
        if not self.check_redis_port(ip):
            return public.returnMsg(False, "连接{} redis失败,请指定ip放行6379端口,并设置redis外网访问权限(注意设置密码)".format(ip))

    # Ensure the configuration directory exists before the workflow starts.
    try:
        os.makedirs("/www/server/panel/config/redis_slave_info", exist_ok=True)
    except OSError as e:
        return public.returnMsg(False, "创建配置目录失败: {}".format(str(e)))

    # Hand over to the creation workflow.
    return self.sync_service.create_replication_group(
        group_name, master_node, slave_nodes, get.redis_password, master_ip
    )
def get_creation_status(self, get):
    """Return the status of a creation task identified by ``get.task_id``.

    Returns:
        Whatever ``self.sync_service.get_creation_status`` returns, or
        ``public.returnMsg(False, ...)`` when ``task_id`` is absent.
    """
    if hasattr(get, 'task_id'):
        return self.sync_service.get_creation_status(get.task_id)
    return public.returnMsg(False, "缺少参数! task_id")
def get_group_creation_status(self, get):
    """Return the creation status of the group identified by ``get.group_id``.

    The group id is first mapped to its creation task id through the config
    manager; the task status is then fetched from the sync service.

    Returns:
        The sync service's status result, or ``public.returnMsg(False, ...)``
        when ``group_id`` is absent, no task id is recorded, or an error
        occurs (errors are also logged).
    """
    if not hasattr(get, 'group_id'):
        return public.returnMsg(False, "缺少参数! group_id")

    try:
        creation_task = self.config_manager.get_group_task_id(get.group_id)
        if creation_task:
            return self.sync_service.get_creation_status(creation_task)
        return public.returnMsg(False, "未找到该复制组的创建任务ID")
    except Exception as exc:
        failure = "获取复制组创建状态失败: {}".format(str(exc))
        public.WriteLog("TYPE_REDIS", failure)
        return public.returnMsg(False, failure)
# ==================== Replication group details ====================
def get_group_detail(self, get):
    """Return the configuration and node details of one replication group.

    Args:
        get: Request object carrying ``group_id``.

    Returns:
        dict: ``status``/``msg`` plus ``data`` holding ``group_info``,
        ``master_detail`` and ``slave_details``; or
        ``public.returnMsg(False, ...)`` when ``group_id`` is absent, the
        group is unknown, or an error occurs (errors are also logged).
    """
    if not hasattr(get, 'group_id'):
        return public.returnMsg(False, "缺少参数! group_id")

    try:
        config = self.config_manager.get_group_config(get.group_id)
        if not config:
            return public.returnMsg(False, "未找到复制组配置")

        monitor = self.monitor_service
        # Master first, then each slave in configured order.
        master_detail = monitor.get_master_detail_info(config['master_node_id'])
        slave_details = [
            monitor.get_slave_detail_info(node_id)
            for node_id in config.get('slave_nodes', [])
        ]

        return {
            "status": True,
            "msg": "success",
            "data": {
                "group_info": config,
                "master_detail": master_detail,
                "slave_details": slave_details
            }
        }
    except Exception as exc:
        failure = "获取复制组详细信息失败: {}".format(str(exc))
        public.WriteLog("TYPE_REDIS", failure)
        return public.returnMsg(False, failure)
# ==================== Slave management ====================
def get_slave_status(self, get):
    """Return the monitor service's status for the slave ``get.slave_node_id``.

    Returns:
        Whatever ``self.monitor_service.get_slave_status`` returns, or
        ``public.returnMsg(False, ...)`` when ``slave_node_id`` is absent.
    """
    if hasattr(get, 'slave_node_id'):
        return self.monitor_service.get_slave_status(get.slave_node_id)
    return public.returnMsg(False, "缺少参数! slave_node_id")
def reconnect_slave(self, get):
    """Ask the slave manager to reconnect one slave of a replication group.

    Args:
        get: Request object carrying ``group_id`` and ``slave_node_id``.

    Returns:
        ``public.returnMsg(False, ...)`` naming the first missing attribute,
        otherwise whatever ``self.slave_manager.reconnect_slave`` returns.
    """
    absent = next(
        (field for field in ('group_id', 'slave_node_id') if not hasattr(get, field)),
        None,
    )
    if absent is not None:
        return public.returnMsg(False, "缺少参数! {}".format(absent))
    return self.slave_manager.reconnect_slave(get.group_id, get.slave_node_id)
def remove_slave(self, get):
    """Ask the slave manager to remove one slave from a replication group.

    Args:
        get: Request object carrying ``group_id`` and ``slave_node_id``.

    Returns:
        ``public.returnMsg(False, ...)`` naming the first missing attribute,
        otherwise whatever ``self.slave_manager.remove_slave`` returns.
    """
    absent = next(
        (field for field in ('group_id', 'slave_node_id') if not hasattr(get, field)),
        None,
    )
    if absent is not None:
        return public.returnMsg(False, "缺少参数! {}".format(absent))
    return self.slave_manager.remove_slave(get.group_id, get.slave_node_id)
def add_slave_to_group(self, get):
    """Attach an additional slave node to an existing replication group.

    Args:
        get: Request object carrying ``group_id`` and ``new_slave_node_id``.

    Returns:
        ``public.returnMsg(False, ...)`` naming the first missing attribute,
        otherwise whatever
        ``self.sync_service.add_slave_to_existing_group`` returns.
    """
    absent = next(
        (field for field in ('group_id', 'new_slave_node_id') if not hasattr(get, field)),
        None,
    )
    if absent is not None:
        return public.returnMsg(False, "缺少参数! {}".format(absent))
    return self.sync_service.add_slave_to_existing_group(get.group_id, get.new_slave_node_id)
# ==================== Replication group operations ====================
def delete_replication_group(self, get):
    """Tear down a replication group and remove its stored configuration.

    Cleanup on the nodes is best-effort: failures on individual slaves or on
    the master are collected and reported, but do not stop the group's
    configuration from being removed.

    Args:
        get: Request object carrying ``group_id``.

    Returns:
        The result of ``public.returnMsg``: success (optionally listing up to
        three cleanup problems), or failure when ``group_id`` is absent, the
        group is unknown, the configuration could not be removed, or an
        unexpected error occurred.
    """
    if not hasattr(get, 'group_id'):
        return public.returnMsg(False, "缺少参数! group_id")
    group_id = get.group_id
    group_config = self.config_manager.get_group_config(group_id)
    if not group_config:
        return public.returnMsg(False, "未找到复制组配置信息")
    try:
        cleanup_errors = []
        self.config_manager.write_group_log(group_id, "开始删除主从复制组", "INFO")

        # 1. Stop replication on every slave and clean its Redis configuration.
        for slave_node_id in group_config.get('slave_nodes', []):
            try:
                stop_result = self.slave_manager.stop_slave_replication(slave_node_id)
                if not stop_result.get("status"):
                    cleanup_errors.append(f"停止从库 {slave_node_id} 复制失败: {stop_result.get('msg')}")
                clean_result = self.redis_manager.stop_replication(slave_node_id)
                if not clean_result.get("status"):
                    cleanup_errors.append(f"清理从库 {slave_node_id} 配置失败: {clean_result.get('msg')}")
                else:
                    self.config_manager.write_group_log(group_id, f"从库 {slave_node_id} 配置已清理", "INFO")
            except Exception as e:
                # One failing slave must not block the remaining cleanup.
                cleanup_errors.append(f"处理从库 {slave_node_id} 异常: {str(e)}")

        # 2. Clean replication-related settings on the master.
        master_node_id = group_config.get('master_node_id')
        if master_node_id:
            try:
                master_cleanup = self.redis_manager.cleanup_replication_users(master_node_id)
                if not master_cleanup:
                    cleanup_errors.append(f"清理主库 {master_node_id} 配置失败")
                else:
                    self.config_manager.write_group_log(group_id, f"主库 {master_node_id} 配置已清理", "INFO")
            except Exception as e:
                cleanup_errors.append(f"清理主库 {master_node_id} 异常: {str(e)}")

        # 3. Record collected problems, then remove the stored configuration.
        self.config_manager.write_group_log(group_id, "删除配置文件和日志", "INFO")
        for error in cleanup_errors:
            self.config_manager.write_group_log(group_id, f"清理错误: {error}", "WARNING")

        remove_result = self.config_manager.remove_group_config(group_id)
        if not remove_result:
            return public.returnMsg(False, "删除配置文件失败")

        if cleanup_errors:
            # Keep the message short: show at most the first three problems.
            error_summary = "; ".join(cleanup_errors[:3])
            if len(cleanup_errors) > 3:
                error_summary += f" 等共{len(cleanup_errors)}个错误"
            return public.returnMsg(True, f"删除成功,但存在清理问题: {error_summary}")
        return public.returnMsg(True, "删除成功!")
    except Exception as e:
        error_msg = "删除复制组失败: {}".format(str(e))
        public.WriteLog("TYPE_REDIS", error_msg)
        # Still try to drop the stored configuration so a broken group does
        # not linger; a failure here is logged instead of silently ignored,
        # and only ordinary exceptions are caught (not SystemExit and the like).
        try:
            self.config_manager.remove_group_config(group_id)
        except Exception as cleanup_exc:
            public.WriteLog("TYPE_REDIS", "删除复制组配置文件失败: {}".format(str(cleanup_exc)))
        return public.returnMsg(False, error_msg)
# ==================== Monitoring ====================
def get_monitor_data(self, get=None):
    """Return the monitor service's full monitoring snapshot.

    Args:
        get: Unused; accepted so the method matches the handler signature.

    Returns:
        Whatever ``self.monitor_service.get_all_monitor_data`` returns, or
        ``public.returnMsg(False, ...)`` if it raises (the error is logged).
    """
    try:
        snapshot = self.monitor_service.get_all_monitor_data()
    except Exception as exc:
        failure = "获取监控数据失败: {}".format(str(exc))
        public.WriteLog("TYPE_REDIS", failure)
        return public.returnMsg(False, failure)
    return snapshot
def get_alert_info(self, get=None):
    """Collect offline alerts for every master and slave of every group.

    Only alerts whose ``type`` is ``"master_offline"`` (from the master
    check) or ``"slave_offline"`` (from the slave checks) are kept.

    Args:
        get: Unused; accepted so the method matches the handler signature.

    Returns:
        dict: ``status``, ``msg``, ``data`` (the kept alerts, masters before
        slaves within each group) and ``count``; or
        ``public.returnMsg(False, ...)`` if anything raises (logged).
    """
    def _of_type(alerts, wanted):
        # A falsy result (None, empty list) contributes nothing.
        if not alerts:
            return []
        return [entry for entry in alerts if entry.get("type") == wanted]

    try:
        monitor = self.monitor_service
        collected = []
        for group in self.config_manager.get_all_replication_groups():
            collected.extend(
                _of_type(monitor.check_master_alerts(group['master_node_id']), "master_offline")
            )
            for slave_node_id in group.get('slave_nodes', []):
                collected.extend(
                    _of_type(monitor.check_slave_alerts(slave_node_id), "slave_offline")
                )
        return {
            "status": True,
            "msg": "success",
            "data": collected,
            "count": len(collected)
        }
    except Exception as exc:
        failure = "获取告警信息失败: {}".format(str(exc))
        public.WriteLog("TYPE_REDIS", failure)
        return public.returnMsg(False, failure)
# ==================== Utility methods ====================
def generate_redis_password(self, get=None):
    """Generate a random 16-character Redis password.

    Characters are drawn from ASCII letters, digits and ``!@#$%^&*`` using
    the ``secrets`` module, because the value is a credential and the
    ``random`` module is not suitable for security purposes.

    Args:
        get: Unused; accepted so the method matches the handler signature.

    Returns:
        ``public.returnMsg(True, password)``.
    """
    import secrets
    import string

    alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
    password = ''.join(secrets.choice(alphabet) for _ in range(16))
    return public.returnMsg(True, password)
def validate_redis_connection(self, get):
    """Test a Redis connection to a node with the supplied password.

    Args:
        get: Request object carrying ``node_id`` and ``password``.

    Returns:
        ``public.returnMsg(False, ...)`` naming the first missing attribute,
        otherwise whatever ``self.redis_manager.test_connection`` returns.
    """
    absent = next(
        (field for field in ('node_id', 'password') if not hasattr(get, field)),
        None,
    )
    if absent is not None:
        return public.returnMsg(False, "缺少参数! {}".format(absent))
    return self.redis_manager.test_connection(get.node_id, get.password)
def get_available_nodes(self, get=None):
    """List nodes on which a Redis version could be detected.

    Each entry reports the node id, remarks, address, Redis service status
    and version, and whether the node already belongs to a replication
    group (with that group's id, name and the node's role).

    Args:
        get: Unused; accepted so the method matches the handler signature.

    Returns:
        dict: ``status``, ``msg`` and ``data`` (list of node entries); or
        ``public.returnMsg(False, ...)`` if the overall lookup raises.
        A failure while probing a single node is logged and that node is
        skipped.
    """
    try:
        nodes = public.M("node").select()

        # Map node id -> membership info; later groups overwrite earlier ones
        # and a slave entry overwrites a master entry for the same id.
        membership = {}
        for group in self.config_manager.get_all_replication_groups():
            master_id = group.get('master_node_id')
            if master_id:
                membership[master_id] = {
                    'group_id': group['group_id'],
                    'group_name': group['group_name'],
                    'role': 'master'
                }
            membership.update({
                slave_id: {
                    'group_id': group['group_id'],
                    'group_name': group['group_name'],
                    'role': 'slave'
                }
                for slave_id in group.get('slave_nodes', [])
            })

        available_nodes = []
        for node in nodes:
            try:
                # NOTE: a per-node reachability pre-check (status cache file /
                # API client test) is commented out in the original and stays
                # disabled here.
                redis_version = self.redis_manager.get_redis_version_by_node(node['id'])
                if not redis_version:
                    # Nodes without a detectable Redis are not listed.
                    continue

                service_state = self.redis_manager.check_redis_service(node['id'])
                if service_state.get('status', False):
                    node_status = 'online'
                else:
                    node_status = 'offline'

                in_group = node['id'] in membership
                group_info = membership.get(node['id'], None)

                # The loopback address is replaced with the panel's local IP.
                server_ip = node.get('server_ip', '')
                if server_ip == "127.0.0.1":
                    server_ip = public.GetLocalIp()

                available_nodes.append({
                    'id': node['id'],
                    'name': node.get('remarks', ''),
                    'server_ip': server_ip,
                    'status': node_status,
                    'redis_version': redis_version,
                    'type': 'standalone',  # always reported as standalone
                    'is_in_replication_group': in_group,
                    'group_info': group_info  # None when not in any group
                })
            except Exception as e:
                # Probe failures are per-node: log and move on.
                public.WriteLog("TYPE_REDIS", f"检测节点 {node.get('id')} Redis状态失败: {str(e)}")
                continue

        return {
            "status": True,
            "msg": "success",
            "data": available_nodes
        }
    except Exception as exc:
        failure = "获取可用节点失败: {}".format(str(exc))
        public.WriteLog("TYPE_REDIS", failure)
        return public.returnMsg(False, failure)
# ==================== Background task methods ====================
def auto_monitor_task(self):
    """Run the monitor service's automatic monitoring task and return its result.

    Intended to be invoked by a background task, per the original docstring.
    """
    monitor = self.monitor_service
    return monitor.auto_monitor_task()
def health_check_task(self):
    """Run the monitor service's health-check task and return its result.

    Intended to be invoked by a background task, per the original docstring.
    """
    monitor = self.monitor_service
    return monitor.health_check_task()