ai / bt-source /panel /mod /project /node /mysql_slaveMod.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
17e971c verified
# -*- coding: utf-8 -*-
"""
MySQL 主从复制模块 - 前端接口层
负责处理前端请求,调用具体的业务逻辑实现
"""
import json
import os
import sys
import time
# 添加路径
if '/www/server/panel/class' not in sys.path:
sys.path.insert(0, '/www/server/panel/class')
if '/www/server/panel' not in sys.path:
sys.path.insert(0, '/www/server/panel')
import public
from mod.project.node.mysql_slave_util.config_manager import ConfigManager
from mod.project.node.mysql_slave_util.mysql_manager import MySQLManager
from mod.project.node.mysql_slave_util.slave_manager import SlaveManager
from mod.project.node.mysql_slave_util.sync_service import SyncService
from mod.project.node.mysql_slave_util.firewall_manager import FirewallManager
class main:
"""MySQL 主从复制主模块"""
def __init__(self):
self.config_manager = ConfigManager()
self.mysql_manager = MySQLManager()
self.slave_manager = SlaveManager()
self.sync_service = SyncService()
self.firewall_manager = FirewallManager()
# ==================== 版本和检查相关 ====================
def get_mysql_version(self, get=None):
"""获取MySQL版本"""
version = self.mysql_manager.get_mysql_version()
if not version:
return public.returnMsg(False, "此功能仅支持mysql-5.7以上版本使用!")
return public.returnMsg(True, version)
def check_slave(self, get):
"""检查从库连接和配置"""
required_params = ['panel_addr', 'panel_key', 'master_ip', 'slave_ip']
for param in required_params:
if not hasattr(get, param):
return public.returnMsg(False, "缺少参数! {}".format(param))
return self.slave_manager.check_slave(
get.panel_addr, get.panel_key, get.master_ip, get.slave_ip
)
# ==================== 主库相关 ====================
def database_list(self, get=None):
"""获取主库数据库列表(带分页和搜索功能)"""
try:
# 1. 检查MySQL版本
version_check = self.mysql_manager.get_mysql_version()
if not version_check:
return public.returnMsg(False, "此功能仅支持mysql-5.7以上版本使用!")
# 2. 获取并验证分页参数
search = getattr(get, 'search', '').strip() if get else ''
p = int(getattr(get, 'p', 1)) if get and hasattr(get, 'p') else 1
limit = int(getattr(get, 'limit', 20)) if get and hasattr(get, 'limit') else 20
# 参数验证
if p < 1:
p = 1
if limit < 1 or limit > 100:
limit = 20
# 3. 获取所有数据库列表
all_data = self.mysql_manager.database_list()
if not all_data:
return {
"status": True,
"msg": "success",
"data": [],
"page": {
"current_page": p,
"total_pages": 0,
"total_count": 0,
"per_page": limit,
"has_prev": False,
"has_next": False,
"start_item": 0,
"end_item": 0,
"showing": "显示第 0 到 0 项,共 0 项"
},
"search": search,
"total": 0
}
# 4. 搜索过滤
filtered_data = all_data
if search:
filtered_data = []
for item in all_data:
# 支持按数据库名称搜索(如果item是字符串)
if isinstance(item, str):
if search.lower() in item.lower():
filtered_data.append(item)
# 如果item是字典,支持多字段搜索
elif isinstance(item, dict):
search_fields = ['name', 'database', 'db_name', 'schema_name']
match_found = False
for field in search_fields:
if field in item and item[field] and search.lower() in str(item[field]).lower():
match_found = True
break
if match_found:
filtered_data.append(item)
else:
# 其他类型转换为字符串进行搜索
if search.lower() in str(item).lower():
filtered_data.append(item)
# 5. 计算分页参数
total_count = len(filtered_data)
total_pages = (total_count + limit - 1) // limit if total_count > 0 else 0
offset = (p - 1) * limit
# 6. 获取当前页数据
current_page_data = filtered_data[offset:offset + limit]
# 7. 构建分页信息
start_item = offset + 1 if total_count > 0 else 0
end_item = min(offset + limit, total_count)
page_info = {
"current_page": p,
"total_pages": total_pages,
"total_count": total_count,
"per_page": limit,
"has_prev": p > 1,
"has_next": p < total_pages,
"prev_page": p - 1 if p > 1 else None,
"next_page": p + 1 if p < total_pages else None,
"start_item": start_item,
"end_item": end_item,
"showing": "显示第 {} 到 {} 项,共 {} 项".format(start_item, end_item, total_count)
}
# 8. 返回结果
result = {
"status": True,
"msg": "success",
"data": current_page_data,
"page": page_info,
"search": search,
"total": total_count
}
return result
except Exception as e:
import traceback
error_msg = "获取数据库列表失败: {}".format(str(e))
public.WriteLog("TYPE_DATABASE", error_msg)
return {
"status": False,
"msg": error_msg,
"data": [],
"page": {},
"search": "",
"total": 0
}
def get_master_sql_list(self, get=None):
"""获取主库备份文件列表"""
data = self.mysql_manager.get_master_sql_list()
result = {
"status": True,
"msg": "success",
"data": data
}
return result
def del_master_sql(self, get):
"""删除主库备份文件"""
if not hasattr(get, 'path'):
return public.returnMsg(False, "缺少参数! path")
return self.mysql_manager.del_master_sql(get.path)
# ==================== 从库管理 ====================
def slave_list(self, get=None):
"""获取从库列表"""
# 检查MySQL版本
version_check = self.mysql_manager.get_mysql_version()
if not version_check:
return public.returnMsg(False, "此功能仅支持mysql-5.7以上版本使用!")
# 检查旧版本兼容性
try:
old_config_json = "/www/server/panel/plugin/mysql_replicate/config.json.0"
if os.path.exists(old_config_json):
old_config_json = json.loads(public.ReadFile(old_config_json))
slave_list = old_config_json.get("slave")
master_list = old_config_json.get("master")
if slave_list and len(slave_list) > 0 and master_list and len(master_list) > 0:
return public.returnMsg(False, "当前主从版本不支持旧版主从同步,如需还原旧版本请在终端执行以下命令还原(不影响当前主从配置) 命令:wget -O remysql_slave.sh https://download.bt.cn/tools/remysql_slave.sh && bash remysql_slave.sh")
except:
pass
all_configs = self.config_manager.get_all_slave_configs()
if not all_configs:
return []
search_ip=None
if get and get.search:
search_ip=get.search
data = []
for item in all_configs:
info = {
"slave_ip": item["slave_ip"],
"panel_addr": item["panel_addr"],
"panel_key": item["panel_key"],
"err_code": item["err_code"],
"sync_type": item["sync_type"],
"config_status": item["config_status"],
"run_status": item["run_status"],
"db_name": item["db_name"]
}
check_slave_result = self.slave_manager.check_slave(item.get("panel_addr"), item.get("panel_key"), item.get("master_ip"), item.get("slave_ip"))
slave_ip = item.get("slave_ip")
slave_ip = item.get("slave_ip")
if item.get("config_status") == "done" and check_slave_result.get("status") is True:
try:
slave_status = json.loads(self.slave_manager.get_slave_info(slave_ip))
io_status = slave_status.get("Slave_IO_Running")
sql_status = slave_status.get("Slave_SQL_Running")
if io_status and sql_status:
info["io_status"] = io_status
info["sql_status"] = sql_status
if io_status != "Yes" or sql_status != "Yes":
info["error_msg"] = {
"Last_IO_Error_Timestamp": slave_status.get("Last_IO_Error_Timestamp"),
"Last_IO_Errno": slave_status.get("Last_IO_Errno"),
"Last_IO_Error": slave_status.get("Last_IO_Error"),
"Last_SQL_Error_Timestamp": slave_status.get("Last_SQL_Error_Timestamp"),
"Last_SQL_Errno": slave_status.get("Last_SQL_Errno"),
"Last_SQL_Error": slave_status.get("Last_SQL_Error")
}
except:
info["io_status"] = "None"
info["sql_status"] = "None"
else:
info["io_status"] = "api_error"
info["sql_status"] = "api_error"
api_status_result = self.slave_manager.get_slave_mysql_status(slave_ip)
info["api_status"] = api_status_result.get("status")
info["api_msg"] = api_status_result.get("msg")
data.append(info)
if search_ip:
data = [item for item in data if search_ip in item["slave_ip"]]
result = {
"status": True,
"msg": "success",
"data": data
}
return result
def add_slave_server(self, get):
"""添加主从配置"""
required_params = ['node_id', 'db_name', 'err_code', 'sync_type', 'master_ip', 'slave_ip']
#required_params = ['panel_addr', 'panel_key', 'db_name', 'err_code', 'sync_type', 'master_ip', 'slave_ip']
for param in required_params:
if not hasattr(get, param):
return public.returnMsg(False, "缺少参数! {}".format(param))
node_id=get.node_id
node_data = public.M("node").where("id = ?", (node_id,)).find()
if not node_data:
return public.returnMsg(False, "未找到节点信息")
if get.master_ip == get.slave_ip:
return public.returnMsg(False, "主库和从库不能是同一个IP!")
public.set_module_logs("nodes_mysql_slave_9", "add_slave_server")
if not os.path.exists("/www/server/panel/config/mysql_slave_info"):
public.ExecShell("mkdir -p /www/server/panel/config/mysql_slave_info")
panel_addr = node_data.get("address")
panel_key = node_data.get("api_key") or node_data.get("app_key")
ip_validation_result = self.validate_ip_network_type(get.master_ip, get.slave_ip)
if not ip_validation_result.get("status"):
return public.returnMsg(False, ip_validation_result.get("msg"))
check_slave_result = self.slave_manager.check_slave(
panel_addr, panel_key, get.master_ip, get.slave_ip
)
if check_slave_result.get("status") is False:
return public.returnMsg(False, check_slave_result.get("msg"))
if os.path.exists("/www/server/panel/config/mysql_slave_info/log/{}.log".format(get.master_ip)):
public.ExecShell("rm -rf /www/server/panel/config/mysql_slave_info/log/{}.log".format(get.master_ip))
default_err_code="1007,1050"
if get.err_code == "":
err_code=default_err_code
else:
err_code=get.err_code + "," + default_err_code
return self.sync_service.add_slave_server(
panel_addr, panel_key, get.master_ip,
get.slave_ip, get.db_name, err_code, get.sync_type
)
def del_slave_server(self, get):
"""删除从库服务器"""
if not hasattr(get, 'slave_ip'):
return public.returnMsg(False, "缺少参数! slave_ip")
slave_ip = get.slave_ip
slave_info = self.config_manager.get_slave_config(slave_ip)
if not slave_info:
return public.returnMsg(False, "未找到从库配置信息")
# 停止从库同步
self.slave_manager.exec_shell_sql(slave_ip, "stop slave;")
self.slave_manager.exec_shell_sql(slave_ip, "RESET SLAVE ALL;")
# 删除MySQL用户
master_user = slave_info.get("master_user")
if master_user:
self.mysql_manager.drop_slave_user(master_user, slave_ip)
# 删除配置
self.config_manager.remove_slave_config(slave_ip)
return public.returnMsg(True, "删除成功!")
def set_slave_status(self, get):
"""设置从库状态"""
required_params = ['slave_ip', 'status']
for param in required_params:
if not hasattr(get, param):
return public.returnMsg(False, "缺少参数! {}".format(param))
return self.slave_manager.set_slave_status(get.slave_ip, get.status)
def get_slave_info(self, get):
"""获取从库信息(面板用)"""
if not hasattr(get, 'slave_ip'):
return public.returnMsg(False, "缺少参数! slave_ip")
return self.slave_manager.get_slave_info_dict(get.slave_ip)
# ==================== 同步相关 ====================
def sync_slave_config(self, get):
"""手动同步从库配置"""
if not hasattr(get, 'slave_ip'):
return public.returnMsg(False, "缺少参数! slave_ip")
return self.sync_service.sync_slave_config(get.slave_ip)
def get_sync_status(self, get):
"""获取同步状态"""
if not hasattr(get, 'slave_ip'):
return public.returnMsg(False, "缺少参数! slave_ip")
slave_ip = get.slave_ip
info = self.config_manager.get_slave_config(slave_ip)
if not info:
return public.returnMsg(False, "未找到同步信息,可能未开始配置,请删除重建主从")
# 添加日志信息
log_path = self.config_manager.master_slave_log_path + "/" + slave_ip + ".log"
if os.path.exists(log_path):
info["data"]["logs"] = public.ReadFile(log_path)
else:
info["data"]["logs"] = ""
result = {
"status": True,
"msg": "success",
"data": info
}
return result
def get_slave_error_info(self, get):
slave_data=self.slave_list(None)
error_data=[]
for item in slave_data["data"]:
io_status = item.get("io_status")
sql_status = item.get("sql_status")
if not io_status or not sql_status:
error_time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
data={
"slave_ip": item["slave_ip"],
"error_time": error_time,
"io_status": io_status,
"sql_status": sql_status,
"error_msg": "无法获取到从库状态,请检查从库mysql是否正常运行"
}
error_data.append(data)
else:
if io_status != "Yes" or sql_status != "Yes":
error_msg = item.get("error_msg", {})
if error_msg.get("Last_IO_Error_Timestamp"):
error_time = error_msg["Last_IO_Error_Timestamp"]
elif error_msg.get("Last_SQL_Error_Timestamp"):
error_time = error_msg["Last_SQL_Error_Timestamp"]
else:
error_time = ""
data={
"slave_ip": item["slave_ip"],
"error_time": error_time,
"io_status": io_status,
"sql_status": sql_status,
"error_msg": error_msg
}
error_data.append(data)
return error_data
# ==================== 兼容性方法 ====================
def get_old_slave_list(self, get=None):
"""获取旧版从库列表(兼容性)"""
# 这个方法主要用于兼容旧版本,可以根据需要实现
return False
def info_test(self, slave_ip):
"""测试信息(兼容性)"""
info = self.config_manager.get_slave_config(slave_ip)
if info:
for step in info["data"]["steps"]:
if step["name"] == "导入数据":
step["name"] = "从库数据库已创建好,等待手动将数据导入从库...."
self.config_manager.save_slave_config(slave_ip, info)
# ==================== 后台任务方法 ====================
def auto_sync_data(self, slave_ip):
"""自动同步数据(后台任务调用)"""
return self.sync_service.auto_sync_data(slave_ip)
def manual_sync_data(self, slave_ip):
"""手动同步数据(后台任务调用)"""
return self.sync_service.manual_sync_data(slave_ip)
def is_private_ip(self, ip_address):
"""
判断IP地址是否为内网IP
:param ip_address: IP地址字符串
:return: True表示内网IP,False表示公网IP
"""
import ipaddress
try:
ip = ipaddress.ip_address(ip_address)
# 判断是否为私有地址
return ip.is_private or ip.is_loopback or ip.is_link_local
except ValueError:
# IP地址格式不正确,返回False
return False
def validate_ip_network_type(self, master_ip, slave_ip):
"""
验证主从IP是否为同一网络类型(都是内网或都是公网)
:param master_ip: 主库IP
:param slave_ip: 从库IP
:return: dict 包含验证结果和消息
"""
master_is_private = self.is_private_ip(master_ip)
slave_is_private = self.is_private_ip(slave_ip)
# 如果都是内网IP或都是公网IP,则通过验证
if master_is_private == slave_is_private:
network_type = "内网" if master_is_private else "公网"
return {
"status": True,
"msg": "IP网络类型验证通过,主从库均为{}IP".format(network_type),
"master_type": "内网" if master_is_private else "公网",
"slave_type": "内网" if slave_is_private else "公网"
}
else:
master_type = "内网" if master_is_private else "公网"
slave_type = "内网" if slave_is_private else "公网"
return {
"status": False,
"msg": "主从库IP网络类型不匹配!主库({})为{}IP,从库({})为{}IP。请使用相同网络类型的IP地址确保可连接。".format(
master_ip, master_type, slave_ip, slave_type
),
"master_type": master_type,
"slave_type": slave_type
}