test / bt-source /panel /class /datalistModel /batchModel.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
08c964e verified
# coding: utf-8
# -------------------------------------------------------------------
# 宝塔Linux面板
# -------------------------------------------------------------------
# Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: wzz <wzz@bt.cn>
# -------------------------------------------------------------------
#
# ------------------------------
import json
import sys
import os
import panelSite
import public
from datalistModel.base import dataBase
if '/www/server/panel' not in sys.path:
sys.path.insert(0, '/www/server/panel')
class main(dataBase):
site_obj = None
def __init__(self):
self.site_obj = panelSite.panelSite()
# 2024/12/19 15:48 处理通用参数
def check_parm(self, get):
'''
@name 处理通用参数
'''
get.site_list = get.get("site_list", [])
if type(get.site_list) == str:
try:
get.site_list = json.loads(get.site_list)
except:
pass
get.all = get.get("all/d", 0)
if get.all != 0:
public.set_module_logs('全选设置网站', 'check_parm', 1)
get.exclude_ids = get.get("exclude_ids", [])
if type(get.exclude_ids) == str:
try:
get.exclude_ids = json.loads(get.exclude_ids)
except:
pass
get.project_type = get.get("project_type", None)
if get.project_type is None:
return public.returnResult(False, '请选择项目类型: [PHP,Node,proxy,WP2,Java,html,Go,net,Other]')
return public.returnResult(True, '检查完成!')
# 2024/12/19 14:16 批量删除网站
def batch_delete_sites(self, get):
'''
@name 批量删除网站
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.ftp = get.get("database/d", 0)
get.database = get.get("database/d", 0)
get.path = get.get("path/d", 0)
error_list = []
success_list = []
# 2024/12/19 15:07 如果不是全选所有数据的删除模式,就按照get.site_list的数据进行删除
if get.all == 0:
for site in get.site_list:
if "id" not in site:
error_list.append({"id": None, "name": site["name"], "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
if "name" not in site:
error_list.append({"id": site["id"], "name": None, "status": False, "msg": "name传参错误,请检查网站传参是否正确!"})
continue
get.id = site["id"]
get.webname = site["name"]
result = self.site_obj.DeleteSite(get, multiple=1)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "删除成功!"})
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.id = site["id"]
get.webname = site["name"]
result = self.site_obj.DeleteSite(get, multiple=1)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "删除成功!"})
public.serviceReload()
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/19 15:34 批量设置网站状态
def batch_set_site_status(self, get):
'''
@name 批量设置网站状态
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.option = get.get("option/d", 0)
error_list = []
success_list = []
if get.all == 0:
for site in get.site_list:
if "id" not in site:
error_list.append({"id": None, "name": site["name"], "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
if "name" not in site:
error_list.append({"id": site["id"], "name": None, "status": False, "msg": "name传参错误,请检查网站传参是否正确!"})
continue
get.id = site["id"]
get.name = site["name"]
if int(site["phpsync"]) == 1:
get.sitename = site["name"]
if get.option == 0:
get.project_action = "stop"
elif get.option == 1:
get.project_action = "start"
else:
get.project_action = "restart"
self.batch_set_phpsync_status(get, error_list, success_list)
continue
if get.option == 0:
result = self.site_obj.SiteStop(get)
else:
result = self.site_obj.SiteStart(get)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "设置成功!"})
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.id = site["id"]
get.name = site["name"]
project_config = {}
if "project_config" in site:
try:
project_config = json.loads(site["project_config"])
except:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": "项目配置文件解析失败!"})
continue
if "type" in project_config and project_config["type"] == "PHPMOD":
get.sitename = site["name"]
self.batch_set_phpsync_status(get, error_list, success_list)
continue
if get.option == 0:
result = self.site_obj.SiteStop(get)
else:
result = self.site_obj.SiteStart(get)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "设置成功!"})
public.serviceReload()
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/19 16:02 批量备份站点
def batch_site_backup(self, get):
'''
@name 批量备份站点
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
error_list = []
success_list = []
if get.all == 0:
for site in get.site_list:
if "id" not in site:
error_list.append({"id": None, "name": site["name"], "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
if "name" not in site:
error_list.append({"id": site["id"], "name": None, "status": False, "msg": "name传参错误,请检查网站传参是否正确!"})
continue
get.id = site["id"]
result = self.site_obj.ToBackup(get)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "备份成功!"})
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.id = site["id"]
result = self.site_obj.ToBackup(get)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "备份成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/19 16:08 批量设置到期时间
def batch_set_site_edate(self, get):
'''
@name 批量设置到期时间
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.edate = get.get("edate", "0000-00-00")
error_list = []
success_list = []
if get.all == 0:
for site in get.site_list:
if "id" not in site:
error_list.append({"id": None, "name": site["name"], "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
if "name" not in site:
error_list.append({"id": site["id"], "name": None, "status": False, "msg": "name传参错误,请检查网站传参是否正确!"})
continue
get.id = site["id"]
result = self.site_obj.SetEdate(get)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "设置成功!"})
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.id = site["id"]
result = self.site_obj.SetEdate(get)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "设置成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/19 16:11 批量设置php版本
def batch_set_php_version(self, get):
'''
@name 批量设置php版本
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.version = get.get("version", "00")
error_list = []
success_list = []
if get.all == 0:
for site in get.site_list:
if "id" not in site:
error_list.append({"id": None, "name": site["name"], "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
if "name" not in site:
error_list.append({"id": site["id"], "name": None, "status": False, "msg": "name传参错误,请检查网站传参是否正确!"})
continue
get.sites_id = site["id"]
get.siteName = site["name"]
result = self.site_obj.SetPHPVersion(get, multiple=1)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "设置成功!"})
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.sites_id = site["id"]
get.siteName = site["name"]
result = self.site_obj.SetPHPVersion(get, multiple=1)
if not result["status"]:
error_list.append({"id": site["id"], "name": site["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": site["id"], "name": site["name"], "status": True, "msg": "设置成功!"})
public.serviceReload()
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/19 16:18 批量设置防跨站
def batch_set_site_basedir(self, get):
'''
@name
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.stat = get.get("stat", "open")
error_list = []
get.site_ids = []
if get.all == 0:
for site in get.site_list:
if "id" not in site:
error_list.append({"id": None, "name": site["name"], "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
if "name" not in site:
error_list.append({"id": site["id"], "name": None, "status": False, "msg": "name传参错误,请检查网站传参是否正确!"})
continue
get.site_ids.append(site["id"])
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.site_ids.append(site["id"])
sites_info = public.M("sites").where(
"project_type=? AND id IN ({})".format(','.join(["?"]*len(get.site_ids))), ("PHP", *get.site_ids)
).field("id,name,path").select()
res = {
"error": error_list,
"success": [],
}
stat = get.stat
for site in sites_info:
get_obj = public.dict_obj()
get_obj.path = site["path"]
get_obj.id = site["id"]
run_path = self.site_obj.GetRunPath(get_obj)
if not run_path:
res["errors"].append(public.returnMsg(False, "运行目录获取失败"))
filename = (site["path"] + run_path + '/.user.ini').replace("//", "/")
if stat == "close": # 关闭
if os.path.exists(filename):
new_get = public.dict_obj()
new_get.path = site["path"]
tmp = self.site_obj.SetDirUserINI(new_get)
tmp["id"] = site["id"]
tmp["name"] = site["name"]
if tmp["status"] is False:
res["error"].append(tmp)
else:
res["success"].append(tmp)
else:
res["success"].append({"id": site["id"], "name":site["name"], "status": True, "msg": "防跨站文件不存在,跳过关闭"})
else:
if not os.path.exists(filename):
new_get = public.dict_obj()
new_get.path = site["path"]
tmp = self.site_obj.SetDirUserINI(new_get)
tmp["id"] = site["id"]
tmp["name"] = site["name"]
if tmp["status"] is False:
res["error"].append(tmp)
else:
res["success"].append(tmp)
else:
res["success"].append({"id": site["id"], "name":site["name"], "status": True, "msg": "防跨站文件已存在,跳过开启"})
return public.returnResult(True, '操作成功!', res)
# 2024/12/19 16:48 批量设置防盗链
def batch_site_referer(self, get):
'''
@name 批量设置防盗链
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.fix = get.get("fix", "jpg,jpeg,gif,png,js,css")
get.domains = get.get("domains", None)
if get.domains is None:
return public.returnResult(False, '请输入域名!')
get.return_rule = get.get("return_rule", "404")
get.http_status = get.get("http_status", True)
get.status = get.get("status", True)
error_list = []
get.site_ids = []
if get.all == 0:
for site in get.site_list:
if "id" not in site:
error_list.append({"id": None, "name": site["name"], "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
if "name" not in site:
error_list.append({"id": site["id"], "name": None, "status": False, "msg": "name传参错误,请检查网站传参是否正确!"})
continue
get.site_ids.append(site["id"])
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.site_ids.append(site["id"])
fix = get.fix
domains = get.domains
status = get.status
return_rule = get.return_rule
http_status = get.http_status
sites_info = public.M("sites").where(
"project_type=? AND id IN ({})".format(','.join(["?"]*len(get.site_ids))), ("PHP", *get.site_ids)
).field("id,name").select()
if len(domains) < 3:
domains = ""
sites_domains = {s["id"]: domains for s in sites_info}
domains_info = public.M("domain").where(
"pid IN ({})".format(','.join(["?"]*len(sites_domains))), list(sites_domains.keys())
).field("pid,name").select()
for domain in domains_info:
sites_domains[domain["pid"]] += ',' + domain["name"]
res = {
"error": error_list,
"success": [],
}
for site in sites_info:
get_obj = public.dict_obj()
get_obj.id = site["id"]
get_obj.name = site["name"]
get_obj.fix = fix
get_obj.domains = sites_domains[site["id"]].strip(",")
get_obj.status = status
get_obj.return_rule = return_rule
get_obj.http_status = http_status
tmp = self.site_obj.SetSecurity(get_obj)
tmp["id"] = site["id"]
tmp["name"] = site["name"]
if tmp["status"] is False:
res["error"].append(tmp)
else:
res["success"].append(tmp)
return public.returnResult(True, '操作成功!', res)
# 2024/12/19 17:02 批量设置流量限制
def batch_site_limit_net(self, get):
'''
@name 批量设置流量限制
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.value = get.get("value", False)
get.perserver = get.get("perserver", 300)
get.perip = get.get("perip", 25)
get.limit_rate = get.get("limit_rate", 512)
get.type = get.get("type", 0)
get.close_limit_net = get.get("close_limit_net", 0)
error_list = []
get.site_ids = []
if get.all == 0:
for site in get.site_list:
if "id" not in site:
error_list.append({"id": None, "name": site["name"], "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
if "name" not in site:
error_list.append({"id": site["id"], "name": None, "status": False, "msg": "name传参错误,请检查网站传参是否正确!"})
continue
get.site_ids.append(site["id"])
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.site_ids.append(site["id"])
perserver = get.perserver
perip = get.perip
limit_rate = get.limit_rate
close_limit_net = get.close_limit_net
sites_info = public.M("sites").where(
"project_type=? AND id IN ({})".format(','.join(["?"]*len(get.site_ids))), ("PHP", *get.site_ids)
).field("id,name,path").select()
res = {
"error": [],
"success": [],
}
if close_limit_net in ("true", "1", 1, True):
for site in sites_info:
get_obj = public.dict_obj()
get_obj.id = site["id"]
tmp = self.site_obj.CloseLimitNet(get_obj)
tmp["id"] = site["id"]
tmp["name"] = site["name"]
if tmp["status"] is False:
res["error"].append(tmp)
else:
res["success"].append(tmp)
return res
for site in sites_info:
get_obj = public.dict_obj()
get_obj.id = site["id"]
get_obj.perserver = perserver
get_obj.perip = perip
get_obj.limit_rate = limit_rate
tmp = self.site_obj.SetLimitNet(get_obj)
tmp["id"] = site["id"]
tmp["name"] = site["name"]
if tmp["status"] is False:
res["error"].append(tmp)
else:
res["success"].append(tmp)
return public.returnResult(True, '操作成功!', res)
# 2024/12/19 17:14 批量设置伪静态
def batch_set_site_rewrite(self, get):
'''
@name 批量设置伪静态
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.rewrite_data = get.get("rewrite_data", "")
if get.all == 0:
get.sites = json.dumps(get.site_list)
result = self.site_obj.SetRewriteLists(get)
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
get.sites = []
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.sites.append({"id": site["id"], "name": site["name"], "file":"/www/server/panel/vhost/rewrite/{}.conf".format(site["name"])})
get.sites = json.dumps(get.sites)
result = self.site_obj.SetRewriteLists(get)
res = {
"error": [],
"success": [],
}
if "status" in result and not result["status"]: return result
for site in result:
if site["status"] is False:
res["error"].append({"id": site["id"], "name": site["name"], "status": False, "msg": site["msg"], "file": site["file"]})
else:
res["success"].append({"id": site["id"], "name": site["name"], "status": True, "msg": "设置成功!", "file": site["file"]})
return public.returnResult(True, '操作成功!', res)
# 2024/12/19 17:45 批量设置网站分类
def batch_set_site_type(self, get):
'''
@name 批量设置网站分类
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.id = get.get("id", 0)
get.site_ids = []
if get.all == 0:
for site in get.site_list:
if "id" not in site: continue
if "name" not in site: continue
get.site_ids.append(site["id"])
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.site_ids.append(site["id"])
get.site_ids = json.dumps(get.site_ids)
self.site_obj.set_site_type(get)
return public.returnResult(True, '设置成功!')
# 2024/12/19 17:51 批量获取网站域名
def batch_get_site_domains(self, get):
'''
@name 批量获取网站域名
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.pids = []
if get.all == 0:
for site in get.site_list:
if "id" not in site: continue
if "name" not in site: continue
get.pids.append(site["id"])
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.pids.append(site["id"])
get.pids = json.dumps(get.pids)
result = self.site_obj.get_domains(get)
return public.returnResult(True, '获取成功!', result)
# 2024/12/19 17:58 批量设置nodejs项目状态
def batch_set_nodejs_project_status(self, get):
'''
@name 批量设置nodejs项目状态
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.status = get.get("status", "start")
error_list = []
success_list = []
if get.all == 0:
for site in get.site_list:
if "project_name" not in site:
error_list.append({"project_name": None, "project_type": site["project_type"], "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
if "project_type" not in site:
error_list.append({"project_name": site["project_name"], "project_type": None, "status": False, "msg": "name传参错误,请检查网站传参是否正确!"})
continue
get.project_name = site["project_name"]
get.project_type = site["project_type"]
get.pm2_name = site["pm2_name"] if "pm2_name" in site else None
from mod.project.nodejs.comMod import main as comMod
nodejs_obj = comMod()
result = nodejs_obj.set_project_status(get)
if not result["status"]:
error_list.append({"project_name": site["project_name"], "project_type": site["project_type"], "status": False, "msg": result["msg"]})
else:
success_list.append({"project_name": site["project_name"], "project_type": site["project_type"], "status": True, "msg": "设置成功!"})
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
if not "project_config" in site: continue
project_config = json.loads(site["project_config"])
get.project_name = site["name"]
get.project_type = project_config["project_type"] if "project_type" in project_config else "nodejs"
get.pm2_name = site["name"]
from mod.project.nodejs.comMod import main as comMod
nodejs_obj = comMod()
result = nodejs_obj.set_project_status(get)
if not result["status"]:
error_list.append({"project_name": site["name"], "project_type": site["project_type"], "status": False, "msg": result["msg"]})
else:
success_list.append({"project_name": site["name"], "project_type": site["project_type"], "status": True, "msg": "设置成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/19 18:31 批量设置java项目状态
def batch_set_java_project_status(self, get):
'''
@name 批量设置java项目状态
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
error_list = []
success_list = []
get.status = get.get("status", "start")
if get.all == 0:
for site in get.site_list:
if "project_name" not in site:
error_list.append({"project_name": None, "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
get.project_name = site["project_name"]
from mod.project.java.projectMod import main as comMod
java_obj = comMod()
if get.status == "start":
result = java_obj.start_project(get)
elif get.status == "stop":
result = java_obj.stop_project(get)
else:
result = java_obj.restart_project(get)
if not result["status"]:
error_list.append({"project_name": site["project_name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"project_name": site["project_name"], "status": True, "msg": "设置成功!"})
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
if not "project_config" in site: continue
get.project_name = site["name"]
from mod.project.java.projectMod import main as comMod
java_obj = comMod()
if get.status == "start":
result = java_obj.start_project(get)
elif get.status == "stop":
result = java_obj.stop_project(get)
else:
result = java_obj.restart_project(get)
if not result["status"]:
error_list.append({"project_name": site["name"], "project_type": site["project_type"], "status": False, "msg": result["msg"]})
else:
success_list.append({"project_name": site["name"], "project_type": site["project_type"], "status": True, "msg": "设置成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/20 14:15 批量设置go项目状态
def batch_set_go_project_status(self, get):
'''
@name 批量设置go项目状态
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
error_list = []
success_list = []
get.status = get.get("status", "start")
if get.all == 0:
for site in get.site_list:
if "project_name" not in site:
error_list.append({"project_name": None, "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
get.project_name = site["project_name"]
from projectModel.goModel import main as comMod
go_obj = comMod()
if get.status == "start":
result = go_obj.start_project(get)
elif get.status == "stop":
result = go_obj.stop_project(get)
else:
result = go_obj.restart_project(get)
if not result["status"]:
error_list.append({"project_name": site["project_name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"project_name": site["project_name"], "status": True, "msg": "设置成功!"})
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
if not "project_config" in site: continue
get.project_name = site["name"]
from projectModel.goModel import main as comMod
go_obj = comMod()
if get.status == "start":
result = go_obj.start_project(get)
elif get.status == "stop":
result = go_obj.stop_project(get)
else:
result = go_obj.restart_project(get)
if not result["status"]:
error_list.append({"project_name": site["name"], "project_type": site["project_type"], "status": False, "msg": result["msg"]})
else:
success_list.append({"project_name": site["name"], "project_type": site["project_type"], "status": True, "msg": "设置成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/20 14:21 批量设置python项目状态
def batch_set_python_project_status(self, get):
'''
@name 批量设置python项目状态
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
error_list = []
success_list = []
get.status = get.get("status", "start")
if get.all == 0:
for site in get.site_list:
if "project_name" not in site:
error_list.append({"project_name": None, "status": False, "msg": "ID传参错误,请检查网站传参是否正确!"})
continue
get.project_name = site["project_name"]
from projectModel.pythonModel import main as comMod
python_obj = comMod()
if get.status == "start":
result = python_obj.start_project(get)
elif get.status == "stop":
result = python_obj.stop_project(get)
else:
result = python_obj.restart_project(get)
if not result["status"]:
error_list.append({"project_name": site["project_name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"project_name": site["project_name"], "status": True, "msg": "设置成功!"})
else:
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
if not "project_config" in site: continue
get.project_name = site["name"]
from projectModel.pythonModel import main as comMod
python_obj = comMod()
if get.status == "start":
result = python_obj.start_project(get)
elif get.status == "stop":
result = python_obj.stop_project(get)
else:
result = python_obj.restart_project(get)
if not result["status"]:
error_list.append({"project_name": site["name"], "project_type": site["project_type"], "status": False, "msg": result["msg"]})
else:
success_list.append({"project_name": site["name"], "project_type": site["project_type"], "status": True, "msg": "设置成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/20 10:40 批量设置证书
def batch_set_site_ssl(self, get):
'''
@name 批量设置证书
'''
check_parm = self.check_parm(get)
if not check_parm["status"]: return check_parm
get.ssl_hash = get.get("ssl_hash", None)
if get.ssl_hash is None:
return public.returnResult(False, "请传入证书hash")
get.certName = get.get("certName", None)
if get.certName is None:
return public.returnResult(False, "请传入证书域名")
error_list = []
success_list = []
if get.all == 0:
for btinfo in get.site_list:
btinfo["ssl_hash"] = get.ssl_hash
btinfo["certName"] = get.certName
btinfo["siteName"] = btinfo["name"]
get.BatchInfo = get.site_list
else:
get.BatchInfo = []
all_sites_info = public.M('sites').where("project_type=?", (get.project_type,)).field('id,name,project_type').select()
for site in all_sites_info:
if not site: continue
if get.project_type != site["project_type"]: continue
if site["id"] in get.exclude_ids: continue
get.BatchInfo.append({"ssl_hash": get.ssl_hash, "siteName": site["name"], "certName": get.certName})
get.BatchInfo = json.dumps(get.BatchInfo)
import panelSSL
ssl_obj = panelSSL.panelSSL()
result = ssl_obj.SetBatchCertToSite(get)
if "status" in result and not result["status"]:
return public.returnResult(False, result["msg"])
for r in result["successList"]:
if r["status"] is False:
error_list.append({"certName": r["certName"], "siteName": r["siteName"], "status": False, "msg": r["error_msg"]})
else:
success_list.append({"certName": r["certName"], "siteName": r["siteName"], "status": True, "msg": "设置成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/23 17:58 批量设置php动态项目状态
def batch_set_phpsync_status(self, get, error_list, success_list):
'''
@name 批量设置php动态项目状态
'''
from mod.project.php.php_asyncMod import main as comMod
php_obj = comMod()
result = php_obj.modify_project_run_state(get)
if not result["status"]:
error_list.append({"id": get.id, "name": get.sitename, "status": False, "msg": result["msg"]})
else:
success_list.append({"id": get.id, "name": get.sitename, "status": True, "msg": "设置成功!"})
# 2024/12/20 10:40 数据库批量区域
# 2024/12/20 10:40 处理数据库通用参数
def check_db_parm(self, get):
'''
@name 处理数据库通用参数
'''
get.database_list = get.get("database_list", [])
if type(get.database_list) == str:
try:
get.database_list = json.loads(get.database_list)
except:
pass
get.all = get.get("all/d", 0)
if get.all != 0:
public.set_module_logs('全选设置数据库', 'check_db_parm', 1)
get.exclude_ids = get.get("exclude_ids", [])
if type(get.exclude_ids) == str:
try:
get.exclude_ids = json.loads(get.exclude_ids)
except:
pass
# get.project_type = get.get("project_type", None)
# if get.project_type is None:
# return public.returnResult(False, '请选择项目类型: [PHP,Node,proxy,WP2,Java,html,Go,net,Other]')
return public.returnResult(True, '检查完成!')
# 2024/12/20 10:40 批量删除数据库
def batch_delete_database(self, get):
'''
@name 批量删除数据库
'''
check_parm = self.check_db_parm(get)
if not check_parm["status"]: return check_parm
error_list = []
success_list = []
import database
db_obj = database.database()
if get.all == 0:
for db in get.database_list:
if "id" not in db:
error_list.append({"id": None, "name": db["name"], "status": False, "msg": "ID传参错误,请检查数据库传参是否正确!"})
continue
if "name" not in db:
error_list.append({"id": db["id"], "name": None, "status": False, "msg": "name传参错误,请检查数据库传参是否正确!"})
continue
get.id = db["id"]
get.name = db["name"]
result = db_obj.DeleteDatabase(get)
if not result["status"]:
error_list.append({"id": db["id"], "name": db["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": db["id"], "name": db["name"], "status": True, "msg": "删除成功!"})
else:
all_dbs_info = public.M('databases').field('id,name').select()
for db in all_dbs_info:
if not db: continue
if db["id"] in get.exclude_ids: continue
get.id = db["id"]
get.name = db["name"]
result = db_obj.DeleteDatabase(get)
if not result["status"]:
error_list.append({"id": db["id"], "name": db["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": db["id"], "name": db["name"], "status": True, "msg": "删除成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/20 11:07 批量设置mysql权限
def batch_set_mysql_access(self, get):
'''
@name 批量设置mysql权限
'''
check_parm = self.check_db_parm(get)
if not check_parm["status"]: return check_parm
get.dataAccess = get.get("dataAccess", "127.0.0.1")
get.access = get.get("access", "127.0.0.1")
get.address = get.get("address", "")
if get.all == 0:
get.name = []
for db in get.database_list:
get.name.append(db["name"])
else:
get.name = []
all_dbs_info = public.M('databases').field('id,name').select()
for db in all_dbs_info:
if not db: continue
if db["id"] in get.exclude_ids: continue
get.name.append(db["name"])
get.name = ",".join(get.name)
import database
db_obj = database.database()
return db_obj.SetDatabaseAccess(get)
# 2024/12/20 11:11 批量设置mysql的数据库分类
def batch_set_mysql_type(self, get):
'''
@name 批量设置mysql的数据库分类
'''
check_parm = self.check_db_parm(get)
if not check_parm["status"]: return check_parm
get.id = get.get("id/d", 0)
get.db_type = "mysql"
if get.all == 0:
get.database_names = []
for db in get.database_list:
get.database_names.append(db["name"])
else:
get.database_names = []
all_dbs_info = public.M('databases').field('id,name').select()
for db in all_dbs_info:
if not db: continue
if db["id"] in get.exclude_ids: continue
get.database_names.append(db["name"])
get.database_names = ",".join(get.database_names)
import database
db_obj = database.database()
return db_obj.set_database_type_by_name(get)
# 2024/12/20 11:20 批量同步数据库到服务器
def batch_sync_db_to_server(self, get):
'''
@name 批量同步数据库到服务器
'''
check_parm = self.check_db_parm(get)
if not check_parm["status"]: return check_parm
get.type = 1
if get.all == 0:
get.ids = []
for db in get.database_list:
get.ids.append(db["id"])
else:
get.ids = []
all_dbs_info = public.M('databases').field('id,name').select()
for db in all_dbs_info:
if not db: continue
if db["id"] in get.exclude_ids: continue
get.ids.append(db["id"])
get.ids = json.dumps(get.ids)
import database
db_obj = database.database()
return db_obj.SyncToDatabases(get)
# 2024/12/20 11:28 批量备份mysql数据库
def batch_backup_mysql(self, get):
'''
@name 批量备份mysql数据库
'''
check_parm = self.check_db_parm(get)
if not check_parm["status"]: return check_parm
import database
db_obj = database.database()
error_list = []
success_list = []
if get.all == 0:
for db in get.database_list:
get.id = db["id"]
result = db_obj.ToBackup(get)
if not result["status"]:
error_list.append({"id": db["id"], "name": db["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": db["id"], "name": db["name"], "status": True, "msg": "备份成功!"})
else:
all_dbs_info = public.M('databases').field('id,name').select()
for db in all_dbs_info:
if not db: continue
if db["id"] in get.exclude_ids: continue
get.id = db["id"]
result = db_obj.ToBackup(get)
if not result["status"]:
error_list.append({"id": db["id"], "name": db["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": db["id"], "name": db["name"], "status": True, "msg": "备份成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/20 11:32 批量删除redis数据库
def batch_delete_redis(self, get):
'''
@name 批量删除redis数据库
'''
check_parm = self.check_db_parm(get)
if not check_parm["status"]: return check_parm
get.db_idx = get.get("db_idx", None)
get.sid = get.get("sid", None)
if get.sid is None:
return public.returnResult(False, "请传入数据库sid")
error_list = []
success_list = []
from databaseModel.redisModel import main
redis_obj = main()
if get.all == 0:
for db in get.database_list:
get.key = db["key"]
result = redis_obj.del_redis_val(get)
if not result["status"]:
error_list.append({"id": get.sid, "name": db["key"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": get.sid, "name": db["key"], "status": True, "msg": "删除成功!"})
else:
db_keylist = redis_obj.get_db_keylist(get)["data"]
if not db_keylist:
return public.returnResult(False, "获取数据库列表失败")
for db in db_keylist:
if not db: continue
if db["name"] in get.exclude_ids: continue
get.key = db["name"]
result = redis_obj.del_redis_val(get)
if not result["status"]:
error_list.append({"id": db["name"], "name": db["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": db["name"], "name": db["name"], "status": True, "msg": "删除成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/20 11:48 批量删除指定sqlite
def batch_delete_sqlite(self, get):
'''
@name 批量删除指定sqlite
'''
check_parm = self.check_db_parm(get)
if not check_parm["status"]: return check_parm
from databaseModel.sqliteModel import main
sqlite_obj = main()
error_list = []
success_list = []
if get.all == 0:
for db in get.database_list:
get.path = db["path"]
result = sqlite_obj.DeleteDatabase(get)
if not result["status"]:
error_list.append({"id": db["name"], "name": db["path"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": db["name"], "name": db["path"], "status": True, "msg": "删除成功!"})
else:
database_list = sqlite_obj.get_list(get)
for db in database_list:
if not db: continue
if db["path"] in get.exclude_ids: continue
get.path = db["path"]
result = sqlite_obj.DeleteDatabase(get)
if not result["status"]:
error_list.append({"id": db["name"], "name": db["path"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": db["name"], "name": db["path"], "status": True, "msg": "删除成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/20 11:55 批量备份sqlite
def batch_backup_sqlite(self, get):
'''
@name 批量备份sqlite
'''
check_parm = self.check_db_parm(get)
if not check_parm["status"]: return check_parm
from databaseModel.sqliteModel import main
sqlite_obj = main()
error_list = []
success_list = []
if get.all == 0:
for db in get.database_list:
get.path = db["path"]
result = sqlite_obj.ToBackup(get)
if not result["status"]:
error_list.append({"id": db["name"], "name": db["path"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": db["name"], "name": db["path"], "status": True, "msg": "备份成功!"})
else:
database_list = sqlite_obj.get_list(get)
for db in database_list:
if not db: continue
if db["path"] in get.exclude_ids: continue
get.path = db["path"]
result = sqlite_obj.ToBackup(get)
if not result["status"]:
error_list.append({"id": db["name"], "name": db["path"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": db["name"], "name": db["path"], "status": True, "msg": "备份成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/20 12:00 FTP批量区域
# 2024/12/20 12:02 FTP通用参数检测
def check_ftp_parm(self, get):
'''
@name FTP通用参数检测
'''
get.ftp_list = get.get("ftp_list", [])
if type(get.ftp_list) == str:
try:
get.ftp_list = json.loads(get.ftp_list)
except:
pass
get.all = get.get("all/d", 0)
if get.all != 0:
public.set_module_logs('全选设置FTP', 'check_ftp_parm', 1)
get.exclude_ids = get.get("exclude_ids", [])
if type(get.exclude_ids) == str:
try:
get.exclude_ids = json.loads(get.exclude_ids)
except:
pass
return public.returnResult(True, '检查完成!')
# 2024/12/20 11:59 批量设置ftp启用状态
def batch_set_ftp_status(self, get):
'''
@name 批量设置ftp启用状态
'''
check_parm = self.check_ftp_parm(get)
if not check_parm["status"]: return check_parm
get.status = get.get("status/d", None)
if get.status is None:
return public.returnResult(False, "请传入状态")
error_list = []
success_list = []
import ftp
ftp_obj = ftp.ftp()
if get.all == 0:
for ftp in get.ftp_list:
get.id = ftp["id"]
get.username = ftp["name"]
result = ftp_obj.SetStatus(get)
if not result["status"]:
error_list.append({"id": ftp["id"], "name": ftp["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": ftp["id"], "name": ftp["name"], "status": True, "msg": "设置成功!"})
else:
all_ftp_info = public.M('ftps').field('id,name').select()
for ftp in all_ftp_info:
if not ftp: continue
if ftp["id"] in get.exclude_ids: continue
get.id = ftp["id"]
get.username = ftp["name"]
result = ftp_obj.SetStatus(get)
if not result["status"]:
error_list.append({"id": ftp["id"], "name": ftp["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": ftp["id"], "name": ftp["name"], "status": True, "msg": "设置成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})
# 2024/12/20 12:08 批量设置ftp密码
def batch_set_ftp_password(self, get):
'''
@name 批量设置ftp密码
'''
check_parm = self.check_ftp_parm(get)
if not check_parm["status"]: return check_parm
if get.all == 0:
get.data = get.ftp_list
else:
all_ftp_info = public.M('ftps').field('id,name').select()
get.data = []
for ftp in all_ftp_info:
import random
password = ''.join(random.sample('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', 10))
get.data.append({"id": ftp["id"], "ftp_username": ftp["name"], "new_password": password})
get.data = json.dumps(get.data)
import ftp
ftp_obj = ftp.ftp()
return ftp_obj.BatchSetUserPassword(get)
# 2024/12/20 12:14 批量设置ftp分类
def batch_set_ftp_type(self, get):
'''
@name 批量设置ftp分类
'''
check_parm = self.check_ftp_parm(get)
if not check_parm["status"]: return check_parm
get.id = get.get("id/d", 0)
if get.all == 0:
get.ftp_names = []
for ftp in get.ftp_list:
get.ftp_names.append(ftp["name"])
else:
get.ftp_names = []
all_ftp_info = public.M('ftps').field('id,name').select()
for ftp in all_ftp_info:
if not ftp: continue
if ftp["id"] in get.exclude_ids: continue
get.ftp_names.append(ftp["name"])
get.ftp_names = ",".join(get.ftp_names)
import ftp
ftp_obj = ftp.ftp()
return ftp_obj.set_ftp_type_by_id(get)
# 2024/12/24 12:11 批量删除FTP账户
def batch_delete_ftp(self, get):
'''
@name 批量删除FTP账户
'''
check_parm = self.check_ftp_parm(get)
if not check_parm["status"]: return check_parm
error_list = []
success_list = []
import ftp
ftp_obj = ftp.ftp()
if get.all == 0:
for f in get.ftp_list:
get.id = f["id"]
get.username = f["name"]
result = ftp_obj.DeleteUser(get)
if not result["status"]:
error_list.append({"id": f["id"], "name": f["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": f["id"], "name": f["name"], "status": True, "msg": "删除成功!"})
else:
all_ftp_info = public.M('ftps').field('id,name').select()
for ftp in all_ftp_info:
if not ftp: continue
if ftp["id"] in get.exclude_ids: continue
get.id = ftp["id"]
get.username = ftp["name"]
result = ftp_obj.DeleteUser(get)
if not result["status"]:
error_list.append({"id": ftp["id"], "name": ftp["name"], "status": False, "msg": result["msg"]})
else:
success_list.append({"id": ftp["id"], "name": ftp["name"], "status": True, "msg": "删除成功!"})
return public.returnResult(True, '操作成功!', {"error": error_list, "success": success_list})