# coding: utf-8 # ------------------------------------------------------------------- # 宝塔Linux面板 # ------------------------------------------------------------------- # Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved. # ------------------------------------------------------------------- # Author: baozi # ------------------------------------------------------------------- # ------------------------------ # Python模型 # ------------------------------ import os, re, json, time, shutil, psutil import sys import ssh_terminal import subprocess from projectModel.base import projectBase import public from typing import Union, Dict, TextIO, Optional, Tuple, List, Set, Callable if "/www/server/panel" not in sys.path: sys.path.insert(0, "/www/server/panel") from mod.base import json_response from mod.project.python.pyenv_tool import EnvironmentManager, PythonEnvironment from urllib3.util import Url, parse_url try: from BTPanel import cache, python_env_ssh from projectModel.btpyvm import PYVM except: PYVM = None pass def _init_gvm() -> None: panel_path = "/www/server/panel" pyvm_path = "/usr/bin/pyvm" bt_py_project_env_path = "/usr/bin/py-project-env" try: if not os.path.exists(pyvm_path): real_path = '{}/class/projectModel/btpyvm.py'.format(panel_path) os.chmod(real_path, mode=0o755) os.symlink(real_path, pyvm_path) if not os.path.exists(bt_py_project_env_path): real_path = '{}/script/btpyprojectenv.sh'.format(panel_path) os.chmod(real_path, mode=0o755) os.symlink(real_path, bt_py_project_env_path) except Exception: pass _init_gvm() del _init_gvm class main(projectBase): _panel_path = public.get_panel_path() _project_path = '/www/server/python_project' _log_name = '项目管理' _pyv_path = '/www/server/pyporject_evn' _tmp_path = '/var/tmp' _logs_path = '{}/vhost/logs'.format(_project_path) _script_path = '{}/vhost/scripts'.format(_project_path) _pid_path = '{}/vhost/pids'.format(_project_path) _env_path = '{}/vhost/env'.format(_project_path) _prep_path = '{}/prep'.format(_project_path) _activate_path = '{}/active_shell'.format(_project_path) _project_logs = '/www/wwwlogs/python' _vhost_path = '{}/vhost'.format(_panel_path) _pip_source = "https://mirrors.aliyun.com/pypi/simple/" __log_split_script_py = public.get_panel_path() + '/script/run_log_split.py' _project_conf = {} _pids = None pip_source_dict = { "阿里云": "https://mirrors.aliyun.com/pypi/simple/", "清华大学": "https://pypi.tuna.tsinghua.edu.cn/simple", "中国科技大学": "https://pypi.mirrors.ustc.edu.cn/simple/", "豆瓣": "https://pypi.douban.com/simple/", "腾讯云": "https://mirrors.cloud.tencent.com/pypi/simple", "华为云": "https://mirrors.huaweicloud.com/repository/pypi/simple", "网易": "https://mirrors.163.com/pypi/simple/" } def __init__(self): if not os.path.exists(self._project_path): os.makedirs(self._project_path, mode=0o755) if not os.path.exists(self._logs_path): os.makedirs(self._logs_path, mode=0o777) if not os.path.exists(self._project_logs): os.makedirs(self._project_logs, mode=0o777) if not os.path.exists(self._pyv_path): os.makedirs(self._pyv_path, mode=0o755) if not os.path.exists(self._script_path): os.makedirs(self._script_path, mode=0o755) if not os.path.exists(self._pid_path): os.makedirs(self._pid_path, mode=0o777) if not os.path.exists(self._prep_path): os.makedirs(self._prep_path, mode=0o755) if not os.path.exists(self._env_path): os.makedirs(self._env_path, mode=0o755) if not os.path.exists(self._activate_path): os.makedirs(self._activate_path, mode=0o755) self._pids = None self._pyvm_tool = None self._environment_manager: Optional[EnvironmentManager] = None @property def pyvm(self): if PYVM is None: return None if self._pyvm_tool is None: self._pyvm_tool = PYVM() return self._pyvm_tool @property def environment_manager(self): if self._environment_manager is None: self._environment_manager = EnvironmentManager() return self._environment_manager def need_update_project(self, update_name: str): tip_file = "{}/{}.pl".format(self._project_path, update_name) if os.path.exists(tip_file): return True return False # def get_cloud_version(self, get=None): # """从云端获取Python版本 # @author baozi <202-02-22> # @param: # @return list[str] : 可用python版本列表 # """ # res = public.httpGet(public.get_url() + '/install/plugin/pythonmamager/pyv.txt') # if not res: # return {"status": False, # "msg": "请求不到官方python版本数据,请切换节点试一试.
相关教程参考:
https://www.bt.cn/bbs/thread-87257-1-1.html"} # text = res.split('\n') # pyv_data = {"v": text, "time": int(time.time())} # public.writeFile('{}/pyproject_v.txt'.format(self._tmp_path), json.dumps(pyv_data)) # return text # # def get_pyv_can_install(self): # """获取那些版本的python能安装 # @author baozi <202-02-22> # @param: # @return list[str] : 可用python版本列表 # """ # pyv_data = public.readFile('{}/pyproject_v.txt'.format(self._tmp_path)) # if not pyv_data: # return self.get_cloud_version() # try: # res: dict = json.loads(pyv_data) # if time.time() - res["time"] > 60 * 60 * 24 * 30: # return self.get_cloud_version() # else: # return res["v"] # except: # return self.get_cloud_version() # # def GetCloudPython(self, get): # """显示可以安装的python版本 # # @author baozi <202-02-22> # @param: # get (dict ): 无请求信息 # @return msg : 返回Python版本的安装情况 # """ # data = self.get_pyv_can_install() # existpy = self._get_python_v(get) # if "status" in data: # return public.returnMsg(False, data["msg"]) # v = [] # l = {} # for i in data: # i = i.strip() # if re.match(r"[\d\.]+", i): # v.append(i) # for i in v: # if i.split()[0] in existpy: # l[i] = "1" # else: # l[i] = "0" # # l = sorted(l.items(), key=lambda d: [int(i) for i in d[0].split('.')], reverse=True) # for i, v in enumerate(l): # l[i] = {"version": v[0], "installed": v[1]} # return public.return_data(True, l) # def _get_python_v(self, get): # """获取已安装的Python版本 # @author baozi <202-02-22> # @param: # get (dict ): 无请求信息 # @return list[str] : 已安装python版本列表 # """ # if get is not None and "is_pypy" in get and get.is_pypy in ("1", "true", 1, True): # path = '{}/pypy_versions'.format(self._pyv_path) # else: # path = '{}/versions'.format(self._pyv_path) # if not os.path.exists(path): # return [] # data = os.listdir(path) # return data # def GetPythonVersion(self, get): # """获取已安装的Python版本 # @author baozi <202-02-22> # @param: # get (dict ): 无请求信息 # @return list[str] : 已安装python版本列表 # """ # return self._get_python_v(get) # def InstallPythonV(self, get): # """安装新的Python # @author baozi <202-02-22> # @param: # get (dict): 请求信息,包含版本号 # @return msg : 是否安装成功 # """ # can_install = self.get_pyv_can_install() # if get.version not in can_install: # return public.returnMsg(False, '该版本尚未支持,请到论坛反馈。') # if get.version in self._get_python_v(None): # return public.returnMsg(False, "该版本已安装过,无需重复安装") # _sh = f"bash {self._panel_path}/script/get_python.sh {get.version} {public.get_url()}&> {self._logs_path}/py.log" # public.ExecShell(_sh) # path = '{}/versions/{}/bin/'.format(self._pyv_path, get.version) # if "2.7" in get.version: # path = path + "python" # else: # path = path + "python3" # if os.path.exists(path): # # public.writeFile(f"{self._logs_path}/py.log", "") # return public.returnMsg(True, "安装成功!") # return public.returnMsg(False, "安装失败!{}".format(path)) # def install_pip(self, vpath, pyv, is_pypy=False): # """安装包管理工具pip # @author baozi <202-02-22> # @param: # vpath (str): Python环境地址 # pyv (str): Python版本 # @return # """ # if self.pyvm is not None: # self.pyvm.is_pypy = is_pypy # self.pyvm.re_install_pip_tools(pyv, vpath) # return # # # 以下部分不再使用,使用pyvm安装pip # if [int(i) for i in pyv.split('.')] > [3, 6]: # pyv = "3.6" # # _pyv = pyv.split('.')[1] # _sh = f'bash {self._panel_path}/script/get_python.sh {pyv} {public.get_url()} {vpath} &>> {self._logs_path}/py.log' # public.ExecShell(_sh) # res = public.ExecShell("{}/bin/pip3 -V".format(vpath))[0] # if res.find(vpath) == -1: # public.ExecShell("rm -rf {}/bin/pip*".format(vpath)) # public.ExecShell("rm -rf {}/lib/python{}/site-packages/pip*".format(vpath, pyv)) # public.ExecShell("rm -rf {}/bin/python3 -m ensurepip --default-pip") # def __write_log(self, pjname, msg): # """写日志 # @author baozi <202-02-22> # @param: # pjname ( str ) : 项目名称 # msg ( dict ): 需要写入的日志信息 # @return # """ # path = f"{self._logs_path}/{pjname}.log" # localtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) # if not os.path.exists(path): # public.ExecShell("touch %s" % path) # public.writeFile(path, localtime + "\n" + msg + "\n", "a+") def RemovePythonV(self, get): """卸载面板安装的Python @author baozi <202-02-22> @param: get ( dict ): 请求信息,包含要删除的版本信息 @return msg : 是否删除成功 """ v = get.version.split()[0] if "is_pypy" in get and get.is_pypy in ("1", "true", 1, True): path = '{}/pypy_versions'.format(self._pyv_path) else: path = '{}/versions'.format(self._pyv_path) if not os.path.exists(path): return public.returnMsg(True, "卸载Python成功") python_bin = "{}/{}/bin/python".format(path, v) if not os.path.exists(python_bin): python_bin = "{}/{}/bin/python3".format(path, v) if not os.path.exists(python_bin): return public.returnMsg(False, "Python版本不存在") res = EnvironmentManager().multi_remove_env(os.path.realpath(python_bin)) return res def _get_project_conf(self, name_id) -> Union[Dict, bool]: """获取项目的配置信息 @author baozi <202-02-22> @param: name_id ( str|id ): 项目名称或者项目id @return dict_onj: 项目信息 """ if isinstance(name_id, int): _id = name_id _name = None else: _id = None _name = name_id data = public.M('sites').where('project_type=? AND (name = ? OR id = ?)', ('Python', _name, _id)).field( 'name,path,status,project_config').find() if not data: return False project_conf = json.loads(data['project_config']) if "env_list" not in project_conf: project_conf["env_list"] = [] if "env_file" not in project_conf: project_conf["env_file"] = "" if "call_app" not in project_conf: project_conf["call_app"] = "" if not os.path.exists(data["path"]): self.__stop_project(project_conf) return project_conf def _get_vp_pip(self, vpath): """获取虚拟环境下的pip @author baozi <202-02-22> @param: vpath ( str ): 虚拟环境位置 @return str : pip 位置 """ if os.path.exists('{}/bin/pip'.format(vpath)): return '{}/bin/pip'.format(vpath) else: return '{}/bin/pip3'.format(vpath) def _get_vp_python(self, vpath): """获取虚拟环境下的python解释器 @author baozi <202-02-22> @param: vpath ( str ): 虚拟环境位置 @return str : python解释器 位置 """ if os.path.exists('{}/bin/python'.format(vpath)): return '{}/bin/python'.format(vpath) else: return '{}/bin/python3'.format(vpath) @staticmethod def _check_port(port: str): """检查端口是否合格 @author baozi <202-02-22> @param port ( str ): 端口号 @return [bool,msg]: 结果 + 错误信息 """ try: if 0 < int(port) < 65535: data = public.ExecShell("ss -nultp|grep ':%s '" % port)[0] if data: return False, "该端口已经被占用" else: return True, "" else: return False, "请输入正确的端口范围 1 < 端口 < 65535" except ValueError: return False, "端口请输入整数" @staticmethod def _check_project_exist(project_name): """检查项目是否存在 @author baozi <202-02-22> @param: pjname ( str ): 项目名称 path ( str ): 项目路径 @return bool : 返回验证结果 """ data = public.M('sites').where('name=?', (project_name,)).field('id').find() return bool(data) @staticmethod def _check_project_path_exist(path=None): """检查项目地址是否存在 @author baozi <202-02-22> @param: pjname ( str ): 项目名称 path ( str ): 项目路径 @return bool : 返回验证结果 """ data = public.M('sites').where('path=? ', (path,)).field('id').find() return bool(data) @staticmethod def __check_feasibility(values): """检查用户部署方式的可行性 @author baozi <202-02-22> @param: values ( dict ): 用户输入参数的规范化数据 @return msg """ re_v = re.compile(r"\s+(?P[23]\.\d+(\.\d+)?)\s*") version_res = re_v.search(values["version"]) if not version_res: return None version = version_res.group("ver") xsgi = values["xsgi"] framework = values["framework"] stype = values["stype"] if framework == "sanic" and [int(i) for i in version.split('.')[:2]] < [3, 7]: return "sanic框架不支持python3.7以下的版本" if xsgi == "asgi" and stype == "uwsgi": return "uWsgi服务框架不支持asgi协议" def simple_prep_env(self, values: dict) -> Optional[bool]: """ 准备python虚拟环境和服务器应用 """ log_path: str = "{}/{}.log".format(self._logs_path, values['pjname']) fd = open(log_path, 'w') fd.flush() py_env = EnvironmentManager().get_env_py_path(values.get("python_bin", "")) if not py_env: fd.write("|- 环境丢失,无法继续初始化python环境。") fd.flush() fd.close() return False def call_log(log: str) -> None: if log[-1] != "\n": log += "\n" fd.write(log) fd.flush() try: # 安装服务器依赖 call_log("\n|- 开始安装托管服务依赖库.\n") py_env.init_site_server_pkg(call_log=call_log) py_env.use2project(values['pjname']) # 安装第三方依赖 self.install_requirement(values, py_env, call_log=call_log) self.__prepare_start_conf(values, pyenv=py_env) call_log("\n|- 配置文件输出成功\n") initialize = values.get("initialize", '') if initialize: call_log("\n|- 开始执行项目初始化命令.......\n") if values.get("env_list", None) or values.get("env_file", None): env_file = "{}/{}.env".format(self._env_path, values["pjname"]) initialize = "source {env_file} \n".format(env_file=env_file) + initialize py_env.exec_shell(initialize, call_log=call_log, user=values.get("user", "root")) call_log("\n|- 项目初始化命令执行结束-------\n") # 先尝试启动 conf = self._get_project_conf(values['pjname']) self.__start_project(conf) call_log("\n|- 已尝试启动项目\n") for k, v in values.items(): # 更新配置文件 if k not in conf: conf[k] = v pdata = { "project_config": json.dumps(conf) } public.M('sites').where('name=?', (values['pjname'].strip(),)).update(pdata) fd.close() except: import traceback if not fd.closed: fd.write(traceback.format_exc()) fd.write("\n|- 环境准备失败\n") fd.close() return True def install_requirement(self, values: dict, pyenv: PythonEnvironment, call_log: Callable[[str], None]): if "requirement_path" in values and values["requirement_path"] is not None: call_log("\n|- 开始安装需求包....\n") requirement_data = public.read_rare_charset_file(values['requirement_path']) if not isinstance(requirement_data, str): call_log("\n|- 未识别到安装包信息\n") list_sh = [] list_normative_pkg = [] for i in requirement_data.split("\n"): tmp_data = i.strip() if not tmp_data or tmp_data.startswith("#"): continue if re.search(r"-e\s+\.{0,2}/", tmp_data): # 本地库依赖且为可编辑模式的不安装 continue tmp_env = "" if tmp_data.find("-e") != -1: tmp_env += "cd {}\n".format(values["path"]) if tmp_data.find("git+") != -1: tmp_sh = tmp_env + "{} install {}".format(pyenv.pip_bin(), tmp_data) rep_name_list = [re.compile(r"#egg=(?P\S+)"), re.compile(r"/(?P\S+\.git)")] name = tmp_data for tmp_rep in rep_name_list: tmp_name = tmp_rep.search(tmp_data) if tmp_name: name = tmp_name.group("name") break list_sh.append((name, tmp_sh)) elif tmp_data.find("file:") != -1: tmp_sh = tmp_env + "{} install {}".format(pyenv.pip_bin(), tmp_data) list_sh.append((tmp_data.split("file:", 1)[1], tmp_sh)) else: if tmp_data.find("==") != -1: pkg_name, pkg_version = tmp_data.split("==") elif tmp_data.find(">=") != -1: pkg_name, pkg_version = tmp_data.split(">=") else: pkg_name, pkg_version = tmp_data, "" list_normative_pkg.append((pkg_name, pkg_version)) length = len(list_sh) + len(list_normative_pkg) for idx, (name, tmp_sh) in enumerate(list_sh): call_log("\n|- ({}/{})开始安装【{}】...\n".format(idx + 1, length, name)) pyenv.exec_shell(tmp_sh, call_log=call_log) for idx, (name, pkg_version) in enumerate(list_normative_pkg): call_log("\n|- ({}/{})开始安装【{}】...\n".format(idx + len(list_sh) + 1, length, name)) pyenv.pip_install(name, pkg_version, call_log=call_log) call_log("\n|- 需求包安装执行完毕....\n") def re_prep_env(self, get: public.dict_obj): name = get.name.strip() project_info = self.get_project_find(name) if not project_info: return public.returnMsg(False, "项目不存在") project_conf = project_info['project_config'] prep_status = self.prep_status(project_conf) if prep_status == "complete": return public.returnMsg(False, "项目准备已完成,无需再次准备") if prep_status == "running": return public.returnMsg(False, "项目准备中,不能再次开启准备") self.run_simple_prep_env(project_info["id"], project_conf) time.sleep(0.5) return public.returnMsg(True, "已重新进行准备,请等待准备完成。") @staticmethod def exec_shell(sh_str: str, out: TextIO, timeout=None, user=None): if user: import pwd res = pwd.getpwnam(user) uid = res.pw_uid gid = res.pw_gid def preexec_fn(): os.setgid(gid) os.setuid(uid) else: preexec_fn = None p = subprocess.Popen(sh_str, stdout=out, stderr=out, shell=True, preexec_fn=preexec_fn) p.wait(timeout=timeout) return def run_simple_prep_env(self, project_id: int, project_conf: dict) -> Tuple[bool, str]: prep_pid_file = "{}/{}.pid".format(self._prep_path, project_conf["pjname"]) if os.path.exists(prep_pid_file): pid = public.readFile(prep_pid_file) try: ps = psutil.Process(int(pid)) if ps.is_running(): return False, "项目准备中,不能再次开启准备" except: pass os.remove(prep_pid_file) tmp_sh = "nohup {}/pyenv/bin/python3 {}/script/py_project_env.py {} &> /dev/null & \necho $! > {}".format( self._panel_path, self._panel_path, project_id, prep_pid_file ) res = public.ExecShell(tmp_sh) return True, "" def prep_status(self, project_conf: dict): try: prep_pid_file = "{}/{}.pid".format(self._prep_path, project_conf["pjname"]) pid = public.readFile(prep_pid_file) if isinstance(pid, str): ps = psutil.Process(int(pid)) if ps.is_running() and os.path.samefile(ps.exe(), "/www/server/panel/pyenv/bin/python3") and \ any("script/py_project_env.py" in tmp for tmp in ps.cmdline()): return "running" except: pass v_path = project_conf["vpath"] v_pip: str = self._get_vp_pip(v_path) v_python: str = self._get_vp_python(v_path) if not os.path.exists(v_path) or not os.path.exists(v_python) or not os.path.exists(v_pip): return "failure" return "complete" # 检查输入参数 def __check_args(self, get): """检查输入的参数 @author baozi <202-02-22> @param: get ( dict ): 创建Python项目时的请求 @return dict : 规范化的请求参数 参数列表: pjname port stype path user requirement_path env_list env_file framework 可能有: # venv_path # version venv_path 和 version 替换为 python_bin initialize project_cmd xsgi rfile call_app is_pypy logpath auto_run """ project_cmd = "" xsgi = "wsgi" rfile = "" call_app = "app" user = "root" initialize = "" try: pjname = get.pjname.strip() port = get.port stype = get.stype.strip() path = get.path.strip().rstrip("/") python_bin = get.get("python_bin/s", "") if not python_bin or not os.path.exists(python_bin): return False, public.returnMsg(False, "python环境选择错误") if "user" in get and get.user.strip(): user = get.user.strip() if "requirement_path" in get and get.requirement_path.strip(): requirement_path = get.requirement_path.strip() else: requirement_path = None if "env_list" in get and get.env_list: if isinstance(get.env_list, str): env_list = json.loads(get.env_list.strip()) else: env_list = get.env_list else: env_list = [] if "env_file" in get and get.env_file.strip(): env_file = get.env_file.strip() else: env_file = None if "framework" in get and get.framework.strip(): framework = get.framework.strip() else: framework = 'python' if "project_cmd" in get and get.project_cmd.strip(): project_cmd = get.project_cmd.strip() if "xsgi" in get and get.xsgi.strip(): if get.xsgi.strip() not in ("wsgi", "asgi"): xsgi = "wsgi" else: xsgi = get.xsgi.strip() if "rfile" in get and get.rfile.strip(): rfile = get.rfile.strip() if not os.path.exists: return False, public.returnMsg(False, "项目启动文件不存在") if "call_app" in get and get.call_app.strip(): call_app = get.call_app.strip() if "initialize" in get and get.initialize.strip(): initialize = get.initialize.strip() except: return False, public.returnMsg(False, "参数错误") danger_cmd_list = [ 'rm', 'rmi', 'kill', 'init', 'shutdown', 'reboot', 'chmod', 'chown', 'dd', 'fdisk', 'killall', 'mkfs', 'mkswap', 'mount', 'swapoff', 'swapon', 'umount', 'userdel', 'usermod', 'passwd', 'groupadd', 'groupdel', 'groupmod', 'chpasswd', 'chage', 'usermod', 'useradd', 'userdel', 'pkill' ] name_rep = re.compile(r"""[\\/:*<|>"'#&$^)(]+""") if name_rep.search(pjname): return False, public.returnMsg(False, "项目名称不能包含特殊字符") # 命令行启动跳过端口检测 flag, msg = (True, "") if stype == "command" and port == "" else self._check_port(port) if not flag: return False, public.returnMsg(False, msg) if stype not in ("uwsgi", "gunicorn", "command"): return False, public.returnMsg(False, "运行方式选择错误") if not os.path.isdir(path): return False, public.returnMsg(False, "项目路径不存在") if user not in self.get_system_user_list(): return False, public.returnMsg(False, "用户名不存在") if not isinstance(env_list, list): return False, public.returnMsg(False, "环境变量格式错误") if env_file and not os.path.isfile(env_file): return False, public.returnMsg(False, "环境变量文件不存在") if initialize: for d_cmd in danger_cmd_list: if re.search(r"\s+%s\s+" % d_cmd, project_cmd): return False, public.returnMsg(False, "当前初始化操作中存在危险命令:{}".format(d_cmd)) is_pypy = False if "is_pypy" in get: is_pypy = get.is_pypy in ("1", "true", 1, True, "True") em = EnvironmentManager() env = em.get_env_py_path(python_bin) if not env: return False, public.returnMsg(False, "未找到指定运行环境") auto_run = False if "auto_run" in get: auto_run = get.auto_run in ("1", "true", 1, True, "True") if "logpath" not in get or not get.logpath.strip(): logpath = os.path.join(self._project_logs, pjname) else: logpath = get.logpath.strip() if not os.path.exists(logpath): logpath = os.path.join(self._project_logs, pjname) # 对run_file 进行检查 if stype == "command": if not project_cmd: return False, public.returnMsg(False, "缺少必要的启动命令") else: if not xsgi or not rfile or not call_app: return False, public.returnMsg(False, "缺少必要的服务器托管启动参数") if requirement_path and not os.path.isfile(requirement_path): return False, public.returnMsg(False, "未找到指定依赖包文件【requirement.txt】") if self._check_project_exist(pjname): return False, public.returnMsg(False, "项目已经存在") if self._check_project_path_exist(path): return False, public.returnMsg(False, "该路径已存在其他项目") return True, { "pjname": pjname, "port": port, "stype": stype, "path": path, "user": user, "requirement_path": requirement_path, "env_list": env_list, "env_file": env_file, "framework": framework, "vpath": os.path.dirname(os.path.dirname(env.bin_path)), "version": env.version, "python_bin": env.bin_path, "project_cmd": project_cmd, "xsgi": xsgi, "rfile": rfile, "call_app": call_app, "auto_run": auto_run, "logpath": logpath, "is_pypy": is_pypy, "initialize": initialize, } def CreateProject(self, get): """创建Python项目 @author baozi <202-02-22> @param: get ( dict ): 请求信息 @return test : 创建情况 """ # 检查输入参数 flag, values = self.__check_args(get) if not flag: return values public.set_module_logs("create_python_project", "create") # 检查服务器部署的可行性 msg = self.__check_feasibility(values) if msg: return public.returnMsg(False, msg) # 默认不开启映射,不绑定外网 values["domains"], values["bind_extranet"] = [], 0 # 默认进程数与线程数 values["processes"], values["threads"] = 4, 2 # 默认日志等级info values["loglevel"] = "info" # 默认uwsgi使用http values['is_http'] = "is_http" p_data = { "name": values["pjname"], "path": values["path"], "ps": values["pjname"], "status": 1, 'type_id': 0, "project_type": "Python", "addtime": public.getDate(), "project_config": json.dumps(values) } res = public.M("sites").insert(p_data) if isinstance(res, str) and res.startswith("error"): return public.returnMsg(False, "项目记录失败,请联系官方") self.run_simple_prep_env(res, values) time.sleep(0.5) # 返回信息 public.WriteLog(self._log_name, "添加Python项目{}".format(values["pjname"])) flag, tip = self._release_firewall(get) tip = "" if flag else "
" + tip return public.returnMsg(True, "项目添加成功" + tip) def __prepare_start_conf(self, values, force=False, pyenv: Optional[PythonEnvironment]=None): """准备启动的配置文件,python运行不需要,uwsgi和gunicorn需要 @author baozi <202-02-22> @param: values ( dict ): 用户传入的参数 @return : """ # 加入默认配置 if pyenv is None: pyenv = EnvironmentManager().get_env_py_path(values.get("python_bin", values.get("vpath"))) public.print_log(pyenv.to_dict()) if not pyenv: return values["user"] = values['user'] if 'user' in values else 'root' values["processes"] = values['processes'] if 'processes' in values else 4 values["threads"] = values['threads'] if 'threads' in values else 2 if not os.path.isdir(values['logpath']): os.makedirs(values['logpath'], mode=0o777) env_file = "{}/{}.env".format(self._env_path, values["pjname"]) self._build_env_file(env_file, values) self.__prepare_uwsgi_start_conf(values, pyenv, force) self.__prepare_gunicorn_start_conf(values, pyenv, force) if "project_cmd" not in values: values["project_cmd"] = '' self.__prepare_cmd_start_conf(values, pyenv, force) self.__prepare_python_start_conf(values, pyenv, force) @staticmethod def _get_callable_app(project_config: dict): callable_app = "application" if project_config['framework'] == "django" else "app" data = public.read_rare_charset_file(project_config.get("rfile", "")) if isinstance(data, str): re_list = ( re.compile(r"\s*(?P\w+)\s*=\s*(make|create)_?app(lication)?", re.M | re.I), re.compile(r"\s*(?Papp|application)\s*=\s*", re.M | re.I), re.compile(r"\s*(?P\w+)\s*=\s*(Flask\(|flask\.Flask\()", re.M | re.I), re.compile(r"\s*(?P\w+)\s*=\s*(Sanic\(|sanic\.Sanic\()", re.M | re.I), re.compile(r"\s*(?P\w+)\s*=\s*get_wsgi_application\(\)", re.M | re.I), re.compile(r"\s*(?P\w+)\s*=\s*(FastAPI\(|fastapi\.FastAPI\()", re.M | re.I), re.compile(r"\s*(?P\w+)\s*=\s*.*web\.Application\(", re.M | re.I), re.compile(r"\s*(?Pserver|service|web|webserver|web_server|http_server|httpserver)\s*=\s*", re.M | re.I), ) for i in re_list: res = i.search(data) if not res: continue callable_app = res.group("app") break return callable_app def __prepare_uwsgi_start_conf(self, values, pyenv: PythonEnvironment, force=False): # uwsgi if not values["rfile"]: return uwsgi_file = "{}/uwsgi.ini".format(values['path']) cmd_file = "{}/{}_uwsgi.sh".format(self._script_path, values["pjname"]) if not force and os.path.exists(uwsgi_file) and os.path.exists(cmd_file): return template_file = "{}/template/python_project/uwsgi_conf.conf".format(self._vhost_path) values["is_http"] = values["is_http"] if "is_http" in values else True env_file = "{}/{}.env".format(self._env_path, values["pjname"]) if "call_app" not in values or not values["call_app"]: callable_app = self._get_callable_app(values) else: callable_app = values["call_app"] if not os.path.exists(uwsgi_file): config_body: str = public.readFile(template_file) config_body = config_body.format( path=values["path"], rfile=values["rfile"], processes=values["processes"], threads=values["threads"], is_http="" if values["is_http"] else "#", is_socket="#" if values["is_http"] else "", port=values["port"], user=values["user"], logpath=values['logpath'], app=callable_app, ) public.writeFile(uwsgi_file, config_body) pid_file = "{}/{}.pid".format(self._pid_path, values["pjname"]) _sh = "%s -d --ini %s/uwsgi.ini --pidfile='%s'" % (pyenv.uwsgi_bin() or "uwsgi", values['path'], pid_file) values["start_sh"] = _sh self._create_cmd_file( cmd_file=cmd_file, v_ptah_bin=os.path.dirname(self._get_vp_python(values['vpath'])), project_path=values["path"], command=_sh, log_file="{}/uwsgi.log".format(values["logpath"]), pid_file="/dev/null", env_file=env_file, activate_sh= pyenv.activate_shell(), evn_name=public.Md5(values["pjname"]), ) def __prepare_gunicorn_start_conf(self, values, pyenv: PythonEnvironment, force=False): # gunicorn if not values["rfile"]: return gconf_file = "{}/gunicorn_conf.py".format(values['path']) cmd_file = "{}/{}_gunicorn.sh".format(self._script_path, values["pjname"]) if not force and os.path.exists(gconf_file) and os.path.exists(cmd_file): return worker_class = "sync" if values["xsgi"] == "wsgi" else 'uvicorn.workers.UvicornWorker' template_file = "{}/template/python_project/gunicorn_conf.conf".format(self._vhost_path) values["loglevel"] = values["loglevel"] if "loglevel" in values else "info" if not os.path.exists(gconf_file): config_body: str = public.readFile(template_file) config_body = config_body.format( path=values["path"], processes=values["processes"], threads=values["threads"], user=values["user"], worker_class=worker_class, port=values["port"], logpath=values['logpath'], loglevel=values["loglevel"] ) public.writeFile(gconf_file, config_body) error_log = '{}/gunicorn_error.log'.format(values["logpath"]) access_log = '{}/gunicorn_acess.log'.format(values["logpath"]) if not os.path.isfile(error_log): public.writeFile(error_log, "") if not os.path.isfile(access_log): public.writeFile(access_log, "") self._pass_dir_for_user(values["logpath"], values["user"]) public.set_own(error_log, values["user"]) public.set_own(access_log, values["user"]) _app = values['rfile'].replace((values['path'] + "/"), "")[:-3] _app = _app.replace("/", ".") if "call_app" not in values or not values["call_app"]: callable_app = self._get_callable_app(values) else: callable_app = values["call_app"] _app += ":" + callable_app _sh = "%s -c %s/gunicorn_conf.py %s " % (pyenv.gunicorn_bin() or "gunicorn", values['path'], _app) values["start_sh"] = _sh pid_file = "{}/{}.pid".format(self._pid_path, values["pjname"]) env_file = "{}/{}.env".format(self._env_path, values["pjname"]) self._create_cmd_file( cmd_file=cmd_file, v_ptah_bin=os.path.dirname(self._get_vp_python(values['vpath'])), project_path=values["path"], command=_sh, log_file=error_log, pid_file=pid_file, env_file=env_file, activate_sh= pyenv.activate_shell(), evn_name=public.Md5(values["pjname"]), ) def __prepare_cmd_start_conf(self, values, pyenv: PythonEnvironment, force=False): if "project_cmd" not in values or not values["project_cmd"]: return cmd_file = "{}/{}_cmd.sh".format(self._script_path, values["pjname"]) if not force and os.path.exists(cmd_file): return pid_file = "{}/{}.pid".format(self._pid_path, values["pjname"]) log_file = values['logpath'] + "/error.log" env_file = "{}/{}.env".format(self._env_path, values["pjname"]) self._create_cmd_file( cmd_file=cmd_file, v_ptah_bin=os.path.dirname(self._get_vp_python(values['vpath'])), project_path=values["path"], command=values["project_cmd"], log_file=log_file, pid_file=pid_file, env_file=env_file, activate_sh= pyenv.activate_shell(), evn_name=public.Md5(values["pjname"]), ) values["start_sh"] = values["project_cmd"] def __prepare_python_start_conf(self, values, pyenv: PythonEnvironment, force=False): if not values["rfile"]: return cmd_file = "{}/{}_python.sh".format(self._script_path, values["pjname"]) if not force and os.path.exists(cmd_file): return pid_file = "{}/{}.pid".format(self._pid_path, values["pjname"]) env_file = "{}/{}.env".format(self._env_path, values["pjname"]) self._build_env_file(env_file, values) log_file = (values['logpath'] + "/error.log").replace("//", "/") v_python = self._get_vp_python(values['vpath']) command = "{vpath} -u {run_file} {parm} ".format( vpath=v_python, run_file=values['rfile'], parm=values.get("parm", "") ) self._create_cmd_file( cmd_file=cmd_file, v_ptah_bin=os.path.dirname(v_python), project_path=values["path"], command=command, log_file=log_file, pid_file=pid_file, env_file=env_file, activate_sh= pyenv.activate_shell(), evn_name=public.Md5(values["pjname"]), ) values["start_sh"] = command @staticmethod def _build_env_file(env_file: str, values: dict): env_body_list = [] if "env_file" in values and values["env_file"] and os.path.isfile(values["env_file"]): env_body_list.append("source {}\n".format(values["env_file"])) if "env_list" in values: for tmp in values["env_list"]: if "k" not in tmp or "v" not in tmp: continue env_body_list.append("export {}={}\n".format(tmp["k"], tmp["v"])) public.writeFile(env_file, "".join(env_body_list)) @staticmethod def _create_cmd_file(cmd_file, v_ptah_bin, project_path, command, log_file, pid_file, env_file, activate_sh='', evn_name=""): start_cmd = '''#!/bin/bash PATH={v_ptah_bin}:{project_path}:/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin export PATH export BT_PYTHON_SERVICE_SID={sid} {activate_sh} source {env_file} cd {project_path} nohup {command} &>> {log_file} & echo $! > {pid_file}'''.format( v_ptah_bin=v_ptah_bin, activate_sh=activate_sh, project_path=project_path, command=command, log_file=log_file, pid_file=pid_file, env_file=env_file, sid=evn_name, ) public.writeFile(cmd_file, start_cmd) def _get_cmd_file(self, project_conf): cmd_file_map = { "python": "_python.sh", "uwsgi": "_uwsgi.sh", "gunicorn": "_gunicorn.sh", "command": "_cmd.sh", } cmd_file = "{}/{}{}".format(self._script_path, project_conf["pjname"], cmd_file_map[project_conf["stype"]]) if project_conf["stype"] == "uwsgi": data = public.readFile(cmd_file) if data and "--pidfile" not in data: os.remove(cmd_file) return cmd_file @staticmethod def get_project_pids(pid): """ @name 获取项目进程pid列表 @author baozi<2021-08-10> @param pid: int 主进程pid @return list """ try: p = psutil.Process(pid) return [p.pid] + [c.pid for c in p.children(recursive=True) if p.status() != psutil.STATUS_ZOMBIE] except: return [] def get_project_run_state(self, project_name): ''' @name 获取项目运行状态 @author hwliang<2021-08-12> @param project_name 项目名称 @return bool ''' pid_file = "{}/{}.pid".format(self._pid_path, project_name) pid = public.readFile(pid_file) project_data = self.get_project_find(project_name) if isinstance(pid, str): try: pid = int(pid) psutil.Process(pid) except: pid = self._get_pid_by_env_name(project_data) if not pid: pid = self._get_pid_by_command(project_data) else: pid = self._get_pid_by_env_name(project_data) if not pid: pid = self._get_pid_by_command(project_data) if not pid: return [] pids = self.get_project_pids(pid=pid) if not pids: return [] return pids @staticmethod def other_service_pids(project_data: dict) -> Set[int]: from mod.project.python.serviceMod import ServiceManager s_mgr = ServiceManager(project_data["name"], project_data["project_config"]) return s_mgr.other_service_pids() def _get_pid_by_command(self, project_data: dict) -> Optional[int]: project_config = project_data["project_config"] v_path = project_config['vpath'] runfile = project_config['rfile'] path = project_config['path'] stype = project_config["stype"] pids = [] try: if stype == "python": for i in psutil.process_iter(['pid', 'exe', 'cmdline']): try: if i.status() == "zombie": continue if v_path in i.exe() and runfile in " ".join(i.cmdline()): pids.append(i.pid) except: pass elif stype in ("uwsgi", "gunicorn"): for i in psutil.process_iter(['pid', 'exe', 'cmdline']): try: if i.status() == "zombie": continue if v_path in i.exe() and stype in i.exe() and \ path in " ".join(i.cmdline()) and stype in " ".join(i.cmdline()): pids.append(i.pid) except: pass elif stype == "command": for i in psutil.process_iter(['pid', 'exe']): try: if i.status() == "zombie": continue if v_path in i.exe() and i.cwd().startswith(path.rstrip("/")): pids.append(i.pid) except: pass else: return None except: return None running_pid = [] other_service_pids = self.other_service_pids(project_data) for pid in pids: if pid in psutil.pids() and pid not in other_service_pids: running_pid.append(pid) if len(running_pid) == 1: pid_file = "{}/{}.pid".format(self._pid_path, project_data["name"]) public.writeFile(pid_file, str(running_pid[0])) return running_pid[0] main_pid = [] for pid in running_pid: try: p = psutil.Process(pid) if p.ppid() not in running_pid: main_pid.append(pid) except: pass if len(main_pid) == 1: pid_file = "{}/{}.pid".format(self._pid_path, project_data["name"]) public.writeFile(pid_file, str(main_pid[0])) return main_pid[0] return None def _get_pid_by_env_name(self, project_data: dict): env_key = "BT_PYTHON_SERVICE_SID={}".format(public.Md5(project_data["name"])) pid_file = "{}/{}.pid".format(self._pid_path, project_data["name"]) target_list = [] for p in psutil.pids(): try: data: str = public.readFile("/proc/{}/environ".format(p)) if data.rfind(env_key) != -1: target_list.append(p) except: continue main_pid = 0 for i in target_list: try: p = psutil.Process(i) if p.ppid() not in target_list: main_pid = i except: continue if main_pid: public.writeFile(pid_file, str(main_pid)) return main_pid return None def __start_project(self, project_conf, reconstruction=False): """启动 项目 @author baozi <202-02-22> @param: project_conf ( dict ): 站点配置 reconstruction ( bool ): 是否重写启动指令 @return bool : 是否启动成功 """ if self.get_project_run_state(project_name=project_conf["pjname"]): return True uwsgi_file = "{}/uwsgi.ini".format(project_conf['path']) gconf_file = "{}/gunicorn_conf.py".format(project_conf['path']) cmd_file = self._get_cmd_file(project_conf) if not os.path.exists(cmd_file) or not os.path.exists(uwsgi_file) or not os.path.exists(gconf_file): self.__prepare_start_conf(project_conf) pid_file = "{}/{}.pid".format(self._pid_path, project_conf["pjname"]) if os.path.exists(pid_file): os.remove(pid_file) run_user = project_conf["user"] public.ExecShell("chown -R {}:{} {}".format(run_user, run_user, project_conf["path"])) public.set_mode(cmd_file, 755) public.set_mode(self._pid_path, 777) public.set_own(cmd_file, run_user) # 处理日志文件 log_file = self._project_logfile(project_conf) if not os.path.exists(log_file): public.ExecShell("touch {}".format(log_file)) public.ExecShell("chown {}:{} {}".format(run_user, run_user, log_file)) self._pass_dir_for_user(os.path.dirname(log_file), run_user) # 让进程至少可以访问到日志文件 self._pass_dir_for_user(os.path.dirname(project_conf["path"]), run_user) # 让进程至少可以访问到程序文件 # 执行脚本文件 if project_conf["stype"] in ("uwsgi", "gunicorn"): res = public.ExecShell("{}".format(cmd_file), env=os.environ.copy()) else: res = public.ExecShell("{}".format(cmd_file), user=run_user, env=os.environ.copy()) time.sleep(1) if self._pids: self._pids = None # 清理缓存重新检查 if self.get_project_run_state(project_name=project_conf["pjname"]): return True return False def only_start_main_project(self, project_name): """启动项目api接口 @author baozi <202-02-22> @param: get ( dict ): 请求信息,包含name @return msg: 启动情况信息 """ project_conf = self._get_project_conf(name_id=project_name) if not project_conf: return public.returnMsg(False, "没有该项目,请尝试刷新页面") if self.prep_status(project_conf) == "running": return public.returnMsg(False, "项目环境安装制作中.....
请勿操作") if "port" in project_conf and project_conf["port"]: flag, msg = self._check_port(project_conf["port"]) if not flag: return public.returnMsg(False, msg) if not os.path.exists(project_conf["path"]): return public.returnMsg(False, "项目文件丢失,无法启动") flag = self.__start_project(project_conf) pdata = { "project_config": json.dumps(project_conf) } public.M('sites').where('name=?', (project_name,)).update(pdata) if flag: self.start_by_user(self.get_project_find(project_name)["id"]) return public.returnMsg(True, "项目启动成功") else: return public.returnMsg(False, "项目启动失败") def StartProject(self, get): project_name = None if hasattr(get, "name"): project_name = get.name.strip() if hasattr(get, "project_name"): project_name = get.project_name.strip() if not project_name: return public.returnMsg(False, "请选择要启动的项目") project_find = self.get_project_find(project_name) if not project_find: return public.returnMsg(False, "没有该项目,请尝试刷新页面后重试") # 2024.4.3 修复项目过期时间判断不对 mEdate = time.strftime('%Y-%m-%d', time.localtime()) if project_find['edate'] != "0000-00-00" and project_find['edate'] < mEdate: return public.return_error('当前项目已过期,请重新设置项目到期时间') from mod.project.python.serviceMod import ServiceManager s_mgr = ServiceManager.new_mgr(project_name) if isinstance(s_mgr, str): return public.returnMsg(False, s_mgr) s_mgr.start_project() return public.returnMsg(True, "启动指令已执行,请注意查看日志") def start_project(self, get): get.name = get.project_name return self.StartProject(get) def __stop_project(self, project_conf, reconstruction=False): """停止项目 @author baozi <202-02-22> @param: project_conf ( dict ): 站点配置 @return bool : 是否停止成功 """ project_name = project_conf["pjname"] if not self.get_project_run_state(project_name): return True pid_file = "{}/{}.pid".format(self._pid_path, project_conf["pjname"]) pid = int(public.readFile(pid_file)) pids = self.get_project_pids(pid=pid) if not pids: return True self.kill_pids(pids=pids) if os.path.exists(pid_file): os.remove(pid_file) return True @staticmethod def kill_pids(pids=None): """ @name 结束进程列表 @author hwliang<2021-08-10> @param pids: string<进程pid列表> @return dict """ if not pids: return public.return_data(True, '没有进程') pids = sorted(pids, reverse=True) for i in pids: try: p = psutil.Process(i) p.terminate() except: pass for i in pids: try: p = psutil.Process(i) p.kill() except: pass return public.return_data(True, '进程已全部结束') def StopProject(self, get): project_name = None if hasattr(get, "name"): project_name = get.name.strip() if hasattr(get, "project_name"): project_name = get.project_name.strip() if not project_name: return public.returnMsg(False, "请选择要停止的项目") project_find = self.get_project_find(project_name) if not project_find: return public.returnMsg(False, "没有该项目,请尝试刷新页面后重试") # 2024.4.3 修复项目过期时间判断不对 mEdate = time.strftime('%Y-%m-%d', time.localtime()) if project_find['edate'] != "0000-00-00" and project_find['edate'] < mEdate: return public.return_error('当前项目已过期,请重新设置项目到期时间') from mod.project.python.serviceMod import ServiceManager s_mgr = ServiceManager.new_mgr(project_name) if isinstance(s_mgr, str): return public.returnMsg(False, s_mgr) s_mgr.stop_project() return public.returnMsg(True, "停止指令已执行,请注意查看日志") def only_stop_main_project(self, project_name): """停止项目的api接口 @author baozi <202-02-22> @param: get ( dict ): 请求信息 @return msg : 返回停止操作的结果 """ project_find = self.get_project_find(project_name) if not project_find: return public.returnMsg(False, "没有该项目,请尝试刷新页面") project_conf = project_find["project_config"] if self.prep_status(project_conf) == "running": return public.returnMsg(False, "项目环境安装制作中.....
请勿操作") res = self.__stop_project(project_conf) pdata = { "project_config": json.dumps(project_conf) } public.M('sites').where('name=?', (project_name,)).update(pdata) if res: self.stop_by_user(self.get_project_find(project_name)["id"]) return public.returnMsg(True, "项目停止成功") else: return public.returnMsg(False, "项目停止失败") def restart_project(self, get): get.name = get.project_name return self.RestartProject(get) def RestartProject(self, get): name = get.name.strip() project_find = self.get_project_find(name) if not project_find: return public.returnMsg(False, "没有该项目,请尝试刷新页面后重试") # 2024.4.3 修复项目过期时间判断不对 mEdate = time.strftime('%Y-%m-%d', time.localtime()) if project_find['edate'] != "0000-00-00" and project_find['edate'] < mEdate: return public.return_error('当前项目已过期,请重新设置项目到期时间') conf = project_find["project_config"] if self.prep_status(conf) == "running": return public.returnMsg(False, "项目环境安装制作中.....
请勿操作") from mod.project.python.serviceMod import ServiceManager s_mgr = ServiceManager.new_mgr(name) if isinstance(s_mgr, str): return public.returnMsg(False, s_mgr) s_mgr.stop_project() s_mgr.start_project() return public.returnMsg(True, "项目重启指令已执行,请注意查看日志") def stop_project(self, get): get.name = get.project_name return self.StopProject(get) def remove_project(self, get): get.name = get.project_name get.remove_env = True return self.RemoveProject(get) def RemoveProject(self, get): """删除项目接口 @author baozi <202-02-22> @param: get ( dict ): 请求信息对象 @return msg : 是否删除成功 """ name = get.name.strip() project = self.get_project_find(name) if not project: return public.returnMsg(False, "没有该项目,请尝试刷新页面") conf = project["project_config"] if not conf: return public.returnMsg(False, "没有该项目,请尝试刷新页面") if self.prep_status(conf) == "running": return public.returnMsg(False, "项目环境安装制作中.....
请勿操作") pid = self.get_project_run_state(name) if pid: self.StopProject(get) self.del_crontab(name) self.remove_redirect_by_project_name(get.name) self.clear_config(get.name) logfile = self._logs_path + "/%s.log" % conf["pjname"] try: em = EnvironmentManager() python_bin = conf.get("python_bin", "") if not python_bin: python_bin_data = em.get_env_py_path(conf["vpath"]) if python_bin_data: python_bin = python_bin_data.bin_path if python_bin: em.multi_remove_env(python_bin) except Exception as e: pass if os.path.exists(logfile): os.remove(logfile) if os.path.exists(conf["path"] + "/uwsgi.ini"): os.remove(conf["path"] + "/uwsgi.ini") if os.path.exists(conf["path"] + "/gunicorn_conf.py"): os.remove(conf["path"] + "/gunicorn_conf.py") for suffix in ("_python.sh", "_uwsgi.sh", "_gunicorn.sh", "_cmd.sh"): cmd_file = os.path.join("{}/{}{}".format(self._script_path, conf["pjname"], suffix)) if os.path.exists(cmd_file): os.remove(cmd_file) from mod.base.web_conf import remove_sites_service_config remove_sites_service_config(get.name, "python_") public.M('domain').where('pid=?', (project['id'],)).delete() public.M('sites').where('name=?', (name,)).delete() public.WriteLog(self._log_name, '删除Python项目{}'.format(name)) return public.returnMsg(True, "删除成功") @staticmethod def _check_venv_path(v_path: str, project_id) -> bool: site_list = public.M('sites').where('project_type=?', ('Python',)).select() if not isinstance(site_list, list): return True for site in site_list: conf = json.loads(site["project_config"]) if conf["vpath"] == v_path and site["id"] != project_id: return False return True @staticmethod def xsssec(text): return text.replace('<', '<').replace('>', '>') @staticmethod def last_lines(filename, lines=1): block_size = 3145928 block = '' nl_count = 0 start = 0 fsock = open(filename, 'rU') try: fsock.seek(0, 2) curpos = fsock.tell() while (curpos > 0): curpos -= (block_size + len(block)) if curpos < 0: curpos = 0 fsock.seek(curpos) try: block = fsock.read() except: continue nl_count = block.count('\n') if nl_count >= lines: break for n in range(nl_count - lines + 1): start = block.find('\n', start) + 1 finally: fsock.close() return block[start:] @staticmethod def _project_logfile(project_conf): if project_conf["stype"] in ("python", "command"): log_file = project_conf["logpath"] + "/error.log" elif project_conf["stype"] == "gunicorn": log_file = project_conf["logpath"] + "/gunicorn_error.log" else: log_file = project_conf["logpath"] + "/uwsgi.log" return log_file def GetProjectLog(self, get): """获取项目日志api @author baozi <202-02-22> @param: get ( dict ): 请求信息,需要包含项目名称 @return msg : 日志信息 """ project_conf = self._get_project_conf(get.name.strip()) if not project_conf: return public.returnMsg(False, '项目不存在') log_file = self._project_logfile(project_conf) if not os.path.exists(log_file): return public.returnMsg(False, '日志文件不存在') log_file_size = os.path.getsize(log_file) if log_file_size > 3145928: return {"status": True, "path": log_file, "data": self.xsssec(self.last_lines(log_file, 3000)), "size": public.to_size(log_file_size)} return {"status": True, "path": log_file, "data": self.xsssec(public.GetNumLines(log_file, 3000)), "size": public.to_size(log_file_size)} def GetProjectList(self, get): """获取项目列表 @author baozi <202-02-22> @param: get ( dict ): 请求信息 @return msg : _description_ """ if not self.need_update_project("mod"): self.update_all_project() if not 'p' in get: get.p = 1 if not 'limit' in get: get.limit = 20 if not 'callback' in get: get.callback = '' if not 'order' in get: get.order = 'id desc' type_id = None if "type_id" in get: try: type_id = int(get.type_id) except: type_id = None if 'search' in get: get.project_name = get.search.strip() search = "%{}%".format(get.project_name) if type_id is None: count = public.M('sites').where('project_type=? AND (name LIKE ? OR ps LIKE ?)', ('Python', search, search)).count() data = public.get_page(count, int(get.p), int(get.limit), get.callback) data['data'] = public.M('sites').where('project_type=? AND (name LIKE ? OR ps LIKE ?)', ('Python', search, search)).limit(data['shift'] + ',' + data['row']).order(get.order).select() else: count = public.M('sites').where('project_type=? AND (name LIKE ? OR ps LIKE ?) AND type_id = ?', ('Python', search, search, type_id)).count() data = public.get_page(count, int(get.p), int(get.limit), get.callback) data['data'] = public.M('sites').where('project_type=? AND (name LIKE ? OR ps LIKE ?) AND type_id = ?', ('Python', search, search, type_id)).limit(data['shift'] + ',' + data['row']).order(get.order).select() else: if type_id is None: count = public.M('sites').where('project_type=?', 'Python').count() data = public.get_page(count, int(get.p), int(get.limit), get.callback) data['data'] = public.M('sites').where('project_type=?', 'Python').limit(data['shift'] + ',' + data['row']).order(get.order).select() else: count = public.M('sites').where('project_type=? AND type_id = ?', ('Python', type_id)).count() data = public.get_page(count, int(get.p), int(get.limit), get.callback) data['data'] = public.M('sites').where('project_type=? AND type_id = ?', ('Python', type_id)).limit(data['shift'] + ',' + data['row']).order(get.order).select() if isinstance(data["data"], str) and data["data"].startswith("error"): raise public.PanelError("数据库查询错误:" + data["data"]) for i in range(len(data['data'])): data['data'][i]["ssl"] = self.get_ssl_end_date(data['data'][i]["name"]) self._get_project_state(data['data'][i]) return data def _get_project_state(self, project_info): """获取项目详情信息 @author baozi <202-02-22> @param: project_info ( dict ): 项目详情 @return : 项目详情的列表 """ if not isinstance(project_info['project_config'], dict): project_info['project_config'] = json.loads(project_info['project_config']) pyenv = self.environment_manager.get_env_py_path( project_info['project_config'].get("python_bin", project_info['project_config']["vpath"]) ) if pyenv: project_info["shell_active"] = self.get_active_shell(project_info["name"], pyenv) project_info["pyenv_data"] = pyenv.to_dict() else: project_info["shell_active"] = "" project_info["pyenv_data"] = {} project_info["project_config"]["prep_status"] = self.prep_status(project_info['project_config']) if project_info["project_config"]["stype"] == "python": project_info["config_file"] = None elif project_info["project_config"]["stype"] == "uwsgi": project_info["config_file"] = '{}/uwsgi.ini'.format(project_info["project_config"]["path"]) else: project_info["config_file"] = '{}/gunicorn_conf.py'.format(project_info["project_config"]["path"]) pids = self.get_project_run_state(project_info["name"]) if not pids: project_info['run'], project_info['status'], project_info["project_config"]["status"] = False, 0, 0 project_info["listen"] = [] else: project_info['run'], project_info['status'], project_info["project_config"]["status"] = True, 1, 1 mem, cpu = self.get_mem_and_cpu(pids) project_info.update({"cpu": cpu, "mem": mem}) project_info["listen"] = self._list_listen(pids) project_info["pids"] = pids for i in ("start_sh", "stop_sh", "check_sh"): if i in project_info["project_config"]: project_info["project_config"].pop(i) def get_active_shell(self, p_name, pyenv) -> str: pyenv.use2project(p_name) env_file = os.path.join(self._env_path, "{}.env".format(p_name)) evn_sh = "\nsource {}\n".format(env_file) if pyenv.env_type == "conda": public.writeFile("{}/{}.sh".format(self._activate_path, p_name), pyenv.activate_shell() + evn_sh) else: return "unset _BT_PROJECT_ENV && source /www/server/panel/script/btpyprojectenv.sh {} {}".format(p_name, evn_sh) return "source {}/{}.sh".format(self._activate_path, p_name) @staticmethod def _list_listen(pids: List[int]) -> List[int]: res = set() try: for i in pids: try: p = psutil.Process(i) for conn in (p.net_connections() if hasattr(p, "net_connections") else p.connections()): if conn.status == "LISTEN": res.add(conn.laddr.port) except: pass except: pass return list(res) def ChangeProjectConf(self, get): """修改项目配置信息 @author baozi <202-02-22> @param: get ( dict ): 用户请求信息 包含name,data @return """ conf = self._get_project_conf(get.name.strip()) if not conf: return public.returnMsg("没有该项目") if self.prep_status(conf) == "running": return public.returnMsg(False, "项目环境安装制作中.....
请勿操作") if not os.path.exists(conf["path"]): return public.returnMsg(False, "项目文件丢失,请尝试移除本项目,重新建立") data: dict = get.data change_values = {} if "call_app" in data and data["call_app"] != conf["call_app"]: conf["call_app"] = data["call_app"] change_values["call_app"] = data["call_app"] try: if "env_list" in data and isinstance(data["env_list"], str): conf["env_list"] = json.loads(data["env_list"]) except: return public.returnMsg(False, "环境变量格式错误") if "env_list" in data and isinstance(data["env_list"], list): conf["env_list"] = data["env_list"] if "env_file" in data and isinstance(data["env_file"], str) and data["env_file"] != conf["env_file"]: conf["env_file"] = data["env_file"] # stype if "stype" in data and data["stype"] != conf["stype"]: if data["stype"] not in ("uwsgi", "gunicorn", "python", "command"): return public.returnMsg(False, "启动方式选择错误") else: self.__stop_project(conf) conf["stype"] = data["stype"] if "xsgi" in data and data["xsgi"] != conf["xsgi"]: if data["xsgi"] not in ("wsgi", "asgi"): return public.returnMsg(False, "网络协议选择错误") else: conf["xsgi"] = data["stype"] change_values["xsgi"] = data["stype"] # 检查服务器部署的可行性 msg = self.__check_feasibility(conf) if msg: return public.returnMsg(False, msg) # rfile if "rfile" in data and data["rfile"] != conf["rfile"]: if not data["rfile"].startswith(conf["path"]): return public.returnMsg(False, "启动文件不在项目目录下") change_values["rfile"] = data["rfile"] conf["rfile"] = data["rfile"] # parm if conf["stype"] == "python": conf["parm"] = data["parm"] if "parm" in data else conf["parm"] # project_cmd if conf["stype"] == "command": project_cmd = conf.get("project_cmd", "") if "project_cmd" in data: project_cmd = data.get("project_cmd", "") if not project_cmd: return public.returnMsg(False, "没有自定义启动命令") else: conf["project_cmd"] = project_cmd # processes and threads try: if "processes" in data and int(data["processes"]) != int(conf["processes"]): change_values["processes"], conf["processes"] = int(data["processes"]), int(data["processes"]) if "threads" in data and int(data["threads"]) != int(conf["threads"]): change_values["threads"], conf["threads"] = int(data["threads"]), int(data["threads"]) except ValueError: return public.returnMsg(False, "线程或进程数设置有误") # port 某些情况下可以关闭 if "port" in data and data["port"] != conf["port"] and data["port"]: # flag, msg = self._check_port(data["port"]) # if not flag: # return public.returnMsg(False, msg) change_values["port"] = data["port"] conf["port"] = data["port"] # user if "user" in data and data["user"] != conf["user"]: if data["user"] in self.get_system_user_list(): change_values["user"] = data["user"] conf["user"] = data["user"] # auto_run if "auto_run" in data and data["auto_run"] != conf["auto_run"]: if isinstance(data["auto_run"], bool): conf["auto_run"] = data["auto_run"] # logpath if "logpath" in data and data["logpath"].strip() and data["logpath"] != conf["logpath"]: data["logpath"] = data["logpath"].rstrip("/") if os.path.isfile(data["logpath"]): return public.returnMsg(False, "日志路径不应当是一个文件") if '\n' in data["logpath"].strip(): return public.returnMsg(False, "日志路径不能包含换行") change_values["logpath"] = data["logpath"] conf["logpath"] = data["logpath"] # 特殊 uwsgi和gunicorn 不需要修改启动的脚本,只需要修改配置文件 if conf["stype"] == "gunicorn": if "loglevel" in data and data["loglevel"] != conf["loglevel"]: if data["loglevel"] in ("debug", "info", "warning", "error", "critical"): change_values["loglevel"] = data["loglevel"] conf["loglevel"] = data["loglevel"] config_file = public.readFile(conf["path"] + "/gunicorn_conf.py") if config_file: config_file = self.__change_gunicorn_config_to_file(change_values, config_file) public.writeFile(conf["path"] + "/gunicorn_conf.py", config_file) if conf["stype"] == "uwsgi": if "is_http" in data and isinstance(data["is_http"], bool): change_values["is_http"] = data["is_http"] conf["is_http"] = data["is_http"] if "port" not in change_values: change_values["port"] = conf["port"] config_file = public.readFile(conf["path"] + "/uwsgi.ini") if config_file: config_file = self.__change_uwsgi_config_to_file(change_values, config_file) public.writeFile(conf["path"] + "/uwsgi.ini", config_file) self.__prepare_start_conf(conf, force=True) # 尝试重启项目 msg = '' if not self.__stop_project(conf, reconstruction=True): msg = "修改成功,但尝试重启时,项目停止失败" if not self.__start_project(conf, reconstruction=True): msg = "修改成功,但尝试重启时,项目启动失败" pdata = { "project_config": json.dumps(conf) } public.M('sites').where('name=?', (get.name.strip(),)).update(pdata) public.WriteLog(self._log_name, 'Python项目{}, 修改了启动配置项'.format(get.name.strip())) if msg: return public.returnMsg(False, msg) return public.returnMsg(True, "修改成功") @staticmethod def __change_uwsgi_config_to_file(changes, config_file): """修改配置信息 @author baozi <202-03-08> @param: changes ( dict ): 改变的项和值 config_file ( string ): 需要改变的文件 @return """ reps = { "rfile": (r'wsgi-file\s{0,3}=\s{0,3}[^#\n]*\n', lambda x: f"wsgi-file={x.strip()}\n"), "processes": (r'processes\s{0,3}=\s{0,3}[\d]*\n', lambda x: f"processes={x.strip()}\n"), "threads": (r'threads\s{0,3}=\s{0,3}[\d]*\n', lambda x: f"threads={x.strip()}\n"), "user": ( r'uid\s{0,3}=\s{0,3}[^\n]*\ngid\s{0,3}=\s{0,3}[^\n]*\n', lambda x: f"uid={x.strip()}\ngid={x.strip()}\n" ), "logpath": (r'daemonize\s{0,3}=\s{0,3}.*\n', lambda x: f"daemonize={x.strip().rstrip('/')}/uwsgi.log\n"), "call_app": (r'callable\s*=\s{0,3}.*\n', lambda x: f"callable={x.strip()}\n") } if "logpath" in changes and not os.path.exists(changes['logpath']): os.makedirs(changes['logpath'], mode=0o777) for k, (rep, fun) in reps.items(): if k not in changes: continue config_file = re.sub(rep, fun(str(changes[k])), config_file) if "port" in changes: # 被用户关闭了预设的通信方式 if config_file.find("\n#http") != -1 and config_file.find("\n#socket") != -1: pass elif "is_http" in changes: # 按照预设的方式修改 rep = r"\n#?http\s{0,3}=\s{0,3}((\d{0,3}\.){3}\d{0,3})?:\d{2,5}\n#?socket\s{0,3}=\s{0,3}((\d{0,3}\.){3}\d{0,3})?:\d{2,5}\n" is_http, is_socket = ("", "#") if changes["is_http"] else ("#", "") new = "\n{is_http}http=0.0.0.0:{port}\n{is_socket}socket=0.0.0.0:{port}\n".format( is_http=is_http, port=changes["port"], is_socket=is_socket) config_file = re.sub(rep, new, config_file) else: rpe_h = r'http\s{0,3}=\s{0,3}((\d{0,3}\.){3}\d{0,3})?:\d{2,5}\n' config_file = re.sub(rpe_h, f"http=0.0.0.0:{changes['port']}\n", config_file) rpe_s = r'socket\s{0,3}=\s{0,3}((\d{0,3}\.){3}\d{0,3})?:\d{2,5}\n' config_file = re.sub(rpe_s, f"socket=0.0.0.0:{changes['port']}\n", config_file) return config_file @staticmethod def __prevent_re(test_str): # 防正则转译 re_char = ['$', '(', ')', '*', '+', '.', '[', ']', '{', '}', '?', '^', '|', '\\'] res = "" for i in test_str: if i in re_char: res += "\\" + i else: res += i return res def __get_uwsgi_config_from_file(self, config_file, conf): """检查并从修改的配置信息获取必要信息 @author baozi <202-03-08> @param: changes ( dict ): 改变的项和值 config_file ( string ): 需要改变的文件 @return """ # 检查必要项目 check_reps = [ (r"\n\s?chdir\s{0,3}=\s{0,3}" + self.__prevent_re(conf["path"]) + r"[^\n]*\n", "不能修改项目路径"), (r"\n\s?pidfile\s{0,3}=\s{0,3}" + self.__prevent_re(conf["path"] + "/uwsgi.pid") + r"[^\n]*\n", "不能修改项目的pidfile文件位置"), (r"\n\s?master\s{0,3}=\s{0,3}true[^\n]*\n", "不能修改主进程相关配置"), ] for rep, msg in check_reps: if not re.search(rep, config_file): return False, msg get_reps = { "rfile": (r'\n\s?wsgi-file\s{0,3}=\s{0,3}(?P[^#\n]*)\n', None), "module": (r'\n\s?module\s{0,3}=\s{0,3}(?P[^\n/:])*:[^\n]*\n', None), "processes": (r'\n\s?processes\s{0,3}=\s{0,3}(?P[\d]*)\n', None), "threads": (r'\n\s?threads\s{0,3}=\s{0,3}(?P[\d]*)\n', None), "logpath": ( r'\n\s?daemonize\s{0,3}=\s{0,3}(?P[^\n]*)\n', "没有检查到。配置项:日志路径,请注意您的修改") } changes = {} for k, (rep, msg) in get_reps.items(): res = re.search(rep, config_file) if not res and msg: return False, msg elif res: changes[k] = res.group("target").strip() if "module" in changes: _rfile = conf["path"] + changes["module"].replace(".", "/") + ".py" if os.path.isfile(_rfile): changes["rfile"] = _rfile changes.pop("module") if "logpath" in changes: if not os.path.exists(changes['logpath']): os.makedirs(changes['logpath'], mode=0o777) if "/" in changes["logpath"]: _path, filename = changes["logpath"].rsplit("/", 1) if filename != "uwsgi.log": return False, "为方便日志管理,日志文件名称请使用 uwsgi.log " else: changes["logpath"] = _path else: if changes["logpath"] != "uwsgi.log": return False, "为方便日志管理,日志文件名称请使用 uwsgi.log" else: changes["logpath"] = conf["path"] # port 相关查询 rep_h = r'\n\s{0,3}http\s{0,3}=\s{0,3}((\d{0,3}\.){3}\d{0,3})?:(?P\d{2,5})[^\n]*\n' rep_s = r'\n\s{0,3}socket\s{0,3}=\s{0,3}((\d{0,3}\.){3}\d{0,3})?:(?P\d{2,5})[^\n]*\n' res_http = re.search(rep_h, config_file) res_socket = re.search(rep_s, config_file) if res_http: changes["port"] = res_http.group("target").strip() elif res_socket: changes["port"] = res_socket.group("target").strip() else: # 被用户关闭了预设的通信方式 changes["port"] = "" return True, changes @staticmethod def __change_gunicorn_config_to_file(changes, config_file): """修改配置信息 @author baozi <202-03-08> @param: changes ( dict ): 改变的项和值 config_file ( string ): 需要改变的文件 @return """ reps = { "processes": (r'workers\s{0,3}=\s{0,3}[^\n]*\n', lambda x: f"workers = {x.strip()}\n"), "threads": (r'threads\s{0,3}=\s{0,3}[\d]*\n', lambda x: f"threads = {x.strip()}\n"), "user": (r'user\s{0,3}=\s{0,3}[^\n]*\n', lambda x: f"user = '{x.strip()}'\n"), "loglevel": (r'loglevel\s{0,3}=\s{0,3}[^\n]*\n', lambda x: f"loglevel = '{x.strip()}'\n"), "port": (r'bind\s{0,3}=\s{0,3}[^\n]*\n', lambda x: f"bind = '0.0.0.0:{x.strip()}'\n"), } for k, (rep, fun) in reps.items(): if k not in changes: continue config_file = re.sub(rep, fun(str(changes[k])), config_file) if "logpath" in changes: if not os.path.exists(changes['logpath']): os.makedirs(changes['logpath'], mode=0o777) rpe_accesslog = r'''accesslog\s{0,3}=\s{0,3}['"](/[^/\n]*)*['"]\n''' config_file = re.sub(rpe_accesslog, "accesslog = '{}/gunicorn_acess.log'\n".format(changes['logpath']), config_file) rpe_errorlog = r'''errorlog\s{0,3}=\s{0,3}['"](/[^/\n]*)*['"]\n''' config_file = re.sub(rpe_errorlog, "errorlog = '{}/gunicorn_error.log'\n".format(changes['logpath']), config_file) return config_file def __get_gunicorn_config_from_file(self, config_file, conf): """修改配置信息 @author baozi <202-03-08> @param: config_file ( dict ): 被改变的文件 conf ( string ): 项目原配置 @return """ # 检查必要项目 check_reps = [ (r'''\n\s?chdir ?= ?["']''' + self.__prevent_re(conf["path"]) + '''["']\n''', "请不要修改项目路径"), (r'''\n\s?pidfile\s{0,3}=\s{0,3}['"]''' + self.__prevent_re( conf["path"] + "/gunicorn.pid") + r'''['"][^\n]*\n''', "不能修改项目的pidfile文件位置,这将导致我们无法准确监控项目运行情况"), (r'''\n\s?worker_class\s{0,3}=\s{0,3}((['"]sync['"])|(['"]uvicorn\.workers\.UvicornWorker['"]))[^\n]*\n''', "请不要修改worker_class相关配置"), ] for rep, msg in check_reps: if not re.findall(rep, config_file): return False, msg get_reps = { "port": (r'''\n\s?bind\s{0,3}=\s{0,3}['"]((\d{0,3}\.){3}\d{0,3})?:(?P\d{2,5})['"][^\n]*\n''', "没有检查到配置项:bind,请注意您的修改"), "processes": (r'\n\s?workers\s{0,3}=\s{0,3}(?P[^\n]*)[^\n]*\n', None), "threads": (r'\n\s?threads\s{0,3}=\s{0,3}(?P[\d]*)[^\n]*\n', None), "logpath": (r'''\n\s?errorlog\s{0,3}=\s{0,3}['"](?P[^"'\n]*)['"][^\n]*\n''', "没有检查到配置项:日志路径,请注意您的修改"), "loglevel": (r'''\n\s?loglevel\s{0,3}=\s{0,3}['"](?P[^'"\n]*)['"][^\n]*\n''', "没有检查到配置项:日志等级,请注意您的修改") } changes: Dict[str, str] = {} for k, (rep, msg) in get_reps.items(): res = re.search(rep, config_file) if not res and msg: return False, msg elif res: changes[k] = str(res.group("target").strip()) if "logpath" in changes: if not os.path.exists(changes['logpath']): os.makedirs(changes['logpath'], mode=0o777) if "/" in changes["logpath"]: _path, filename = changes["logpath"].rsplit("/", 1) if filename != "gunicorn_error.log": return False, "为方便日志管理,日志文件名称请使用 gunicorn_error.log" else: changes["logpath"] = _path else: if changes["logpath"] != "gunicorn_error.log": return False, "为方便日志管理,日志文件名称请使用 gunicorn_error.log" else: changes["logpath"] = conf["path"] rep_accesslog = r'''\n\s?accesslog\s{0,3}=\s{0,3}['"]''' + self.__prevent_re( changes["logpath"] + "/gunicorn_acess.log") + r'''['"][^\n]*\n''' if not re.search(rep_accesslog, config_file): return False, "为方便日志管理, 请将错误日志(errorlog) 与 访问日志(accesslog) 放到同一文件路径下" if "loglevel" in changes: if not changes["loglevel"] in ("debug", "info", "warning", "error", "critical"): return False, "日志等级配置错误" return True, changes @staticmethod def get_ssl_end_date(project_name): ''' @name 获取SSL信息 @author hwliang<2021-08-09> @param project_name 项目名称 @return dict ''' import data return data.data().get_site_ssl_info('python_{}'.format(project_name)) def GetProjectInfo(self, get): """获取项目所有信息 @author baozi <202-03-08> @param: get ( dict ): 请求信息,站点名称name @return """ project = self.get_project_find(get.name.strip()) if not project: return public.returnMsg(False, "没该项目") if self.prep_status(project["project_config"]) == "running": return public.returnMsg(False, "项目环境安装制作中.....
请勿操作") self._get_project_state(project) project_conf = project["project_config"] if project_conf["stype"] == "python": return project project_conf["processes"] = project_conf["processes"] if "processes" in project_conf else 4 project_conf["threads"] = project_conf["threads"] if "threads" in project_conf else 2 if project_conf["stype"] != "python": project_conf["is_http"] = bool(project_conf.get("is_http", True)) project["ssl"] = self.get_ssl_end_date(get.name.strip()) return project # 取文件配置 def GetConfFile(self, get): """获取项目配置文件信息 @author baozi <202-03-08> @param: get ( dict ): 用户请求信息 包含name @return 文件信息 """ project_conf = self._get_project_conf(get.name.strip()) if not project_conf: return public.return_error('项目不存在') import files if project_conf["stype"] in ("python", "command"): return public.returnMsg(False, "Python或自定义命令的启动方式没有配置文件可修改") elif project_conf["stype"] == "gunicorn": get.path = project_conf["path"] + "/gunicorn_conf.py" else: get.path = project_conf["path"] + "/uwsgi.ini" f = files.files() return f.GetFileBody(get) # 保存文件配置 def SaveConfFile(self, get): """修改项目配置文件信息 @author baozi <202-03-08> @param: get ( dict ): 用户请求信息 包含name,data,encoding @return 文件信息 """ project_conf = self._get_project_conf(get.name.strip()) if not project_conf: return public.return_error('项目不存在') import files data = get.data if project_conf["stype"] == "python": return public.returnMsg(False, "Python启动方式没有配置文件可修改") elif project_conf["stype"] == "gunicorn": get.path = project_conf["path"] + "/gunicorn_conf.py" flag, changes = self.__get_gunicorn_config_from_file(data, project_conf) if not flag: return public.returnMsg(False, changes) else: get.path = project_conf["path"] + "/uwsgi.ini" flag, changes = self.__get_uwsgi_config_from_file(data, project_conf) if not flag: return public.returnMsg(False, changes) project_conf.update(changes) f = files.files() get.encoding = "utf-8" result = f.SaveFileBody(get) if not result["status"]: return public.returnMsg(False, "保存失败") # 尝试重启项目 msg = '' if not self.__stop_project(project_conf, reconstruction=True): msg = "修改成功,但尝试重启时,项目停止失败" if not self.__start_project(project_conf, reconstruction=True): msg = "修改成功,但尝试重启时,项目启动失败" pdata = { "project_config": json.dumps(project_conf) } public.M('sites').where('name=?', (get.name.strip(),)).update(pdata) public.WriteLog(self._log_name, 'Python项目{}, 修改了启动配置项'.format(get.name.strip())) if msg: return public.returnMsg(False, msg) return public.returnMsg(True, "修改成功") # ——————————————————————————————————————————— # Nginx 与 Apache 相关的设置内容(包含SSL) | # ——————————————————————————————————————————— def exists_nginx_ssl(self, project_name): ''' @name 判断项目是否配置Nginx SSL配置 @author hwliang<2021-08-09> @param project_name: string<项目名称> @return tuple ''' config_file = "{}/nginx/python_{}.conf".format(public.get_vhost_path(), project_name) if not os.path.exists(config_file): return False, False config_body = public.readFile(config_file) if not config_body: return False, False is_ssl, is_force_ssl = False, False if config_body.find('ssl_certificate') != -1: is_ssl = True if config_body.find('HTTP_TO_HTTPS_START') != -1: is_force_ssl = True return is_ssl, is_force_ssl def exists_apache_ssl(self, project_name): ''' @name 判断项目是否配置Apache SSL配置 @author hwliang<2021-08-09> @param project_name: string<项目名称> @return bool ''' config_file = "{}/apache/python_{}.conf".format(public.get_vhost_path(), project_name) if not os.path.exists(config_file): return False, False config_body = public.readFile(config_file) if not config_body: return False, False is_ssl, is_force_ssl = False, False if config_body.find('SSLCertificateFile') != -1: is_ssl = True if config_body.find('HTTP_TO_HTTPS_START') != -1: is_force_ssl = True return is_ssl, is_force_ssl def set_apache_config(self, project): ''' @name 设置Apache配置 @author hwliang<2021-08-09> @param project: dict<项目信息> @return bool ''' project_name = project['name'] # 处理域名和端口 ports = [] domains = [] for d in project['project_config']['domains']: domain_tmp = d.rsplit(':', 1) if len(domain_tmp) == 1: domain_tmp.append(80) if not int(domain_tmp[1]) in ports: ports.append(int(domain_tmp[1])) if not domain_tmp[0] in domains: domains.append(domain_tmp[0]) config_file = "{}/apache/python_{}.conf".format(self._vhost_path, project_name) template_file = "{}/template/apache/python_http.conf".format(self._vhost_path) config_body = public.readFile(template_file) apache_config_body = '' # 旧的配置文件是否配置SSL is_ssl, is_force_ssl = self.exists_apache_ssl(project_name) if is_ssl: if not 443 in ports: ports.append(443) from panelSite import panelSite s = panelSite() proxy_port = project['project_config'].get("port", "") if not proxy_port: pids = self.get_project_run_state(project_name) if not pids: listen_port = self._list_listen(pids) if listen_port: proxy_port = listen_port[0] if not proxy_port: proxy_info = project['project_config'].get("proxy_info", []) if proxy_info: proxy_port = proxy_info[0].get("proxy_port", "") # 根据端口列表生成配置 for p in ports: # 生成SSL配置 ssl_config = '' if p == 443 and is_ssl: ssl_key_file = "{vhost_path}/cert/{project_name}/privkey.pem".format(project_name=project_name, vhost_path=public.get_vhost_path()) if not os.path.exists(ssl_key_file): continue # 不存在证书文件则跳过 ssl_config = '''#SSL SSLEngine On SSLCertificateFile {vhost_path}/cert/{project_name}/fullchain.pem SSLCertificateKeyFile {vhost_path}/cert/{project_name}/privkey.pem SSLCipherSuite EECDH+CHACHA20:EECDH+CHACHA20-draft:EECDH+AES128:RSA+AES128:EECDH+AES256:RSA+AES256:EECDH+3DES:RSA+3DES:!MD5 SSLProtocol All -SSLv2 -SSLv3 -TLSv1 SSLHonorCipherOrder On'''.format(project_name=project_name, vhost_path=public.get_vhost_path()) else: if is_force_ssl: ssl_config = '''#HTTP_TO_HTTPS_START RewriteEngine on RewriteCond %{SERVER_PORT} !^443$ RewriteRule (.*) https://%{SERVER_NAME}$1 [L,R=301] #HTTP_TO_HTTPS_END''' # 生成vhost主体配置 apache_config_body += config_body.format( site_path=project['path'], server_name='{}.{}'.format(p, project_name), domains=' '.join(domains), log_path=public.get_logs_path(), server_admin='admin@{}'.format(project_name), url='http://127.0.0.1:{}'.format(proxy_port), port=p, ssl_config=ssl_config, project_name=project_name ) apache_config_body += "\n" # 添加端口到主配置文件 if p not in [80]: s.apacheAddPort(p) # 写.htaccess rewrite_file = "{}/.htaccess".format(project['path']) if not os.path.exists(rewrite_file): public.writeFile(rewrite_file, '# 请将伪静态规则或自定义Apache配置填写到此处\n') from mod.base.web_conf import ap_ext apache_config_body = ap_ext.set_extension_by_config(project_name, apache_config_body) # 写配置文件 public.writeFile(config_file, apache_config_body) return True def set_nginx_config(self, project): ''' @name 设置Nginx配置 @author hwliang<2021-08-09> @param project: dict<项目信息> @return bool ''' project_name = project['name'] ports = [] domains = [] for d in project['project_config']['domains']: domain_tmp = d.rsplit(':', 1) if len(domain_tmp) == 1: domain_tmp.append(80) if not int(domain_tmp[1]) in ports: ports.append(int(domain_tmp[1])) if not domain_tmp[0] in domains: domains.append(domain_tmp[0]) listen_ipv6 = public.listen_ipv6() config_file = "{}/nginx/python_{}.conf".format(self._vhost_path, project_name) def creat_by_template(): listen_ports_list = [] for p in ports: listen_ports_list.append(" listen {};".format(p)) if listen_ipv6: listen_ports_list.append(" listen [::]:{};".format(p)) template_file = "{}/template/nginx/python_http.conf".format(self._vhost_path) listen_ports = "\n".join(listen_ports_list).strip() config_body_template = public.readFile(template_file) config_body = config_body_template.format( site_path=project['path'], domains=' '.join(domains), project_name=project_name, panel_path=self._panel_path, log_path=public.get_logs_path(), listen_ports=listen_ports, ssl_config='', proxy="" # 添加代理替换 ) rewrite_file = "{panel_path}/vhost/rewrite/python_{project_name}.conf".format( panel_path=self._panel_path, project_name=project_name) if not os.path.exists(rewrite_file): public.writeFile(rewrite_file, '# 请将伪静态规则或自定义NGINX配置填写到此处\n') if not os.path.exists("/www/server/panel/vhost/nginx/well-known"): os.makedirs("/www/server/panel/vhost/nginx/well-known", 0o600) apply_check = "{}/vhost/nginx/well-known/{}.conf".format(self._panel_path, project_name) if not os.path.exists(apply_check): public.writeFile(apply_check, '') from mod.base.web_conf import ng_ext config_body = ng_ext.set_extension_by_config(project_name, config_body) public.writeFile(config_file, config_body) def _modify_nginx_config(): proxy_info = project['project_config'].get('proxy_info', []) from mod.base import pynginx from mod.base.pynginx.extension import ServerTools, ConfigTools, ConfigFinder, LocationTools config_data = public.readFile(config_file) # 暂时禁用comment_line_count, 防止删除时误删除注释 pycn = pynginx.parse_string(config_data) ctools = ConfigTools(pycn) stools = ctools.get_mian_server() if not stools: raise Exception("未找到主配置") stools.modify_server_config( domains=domains, ports=ports, ) match_level = {"~*": 0, "~": 1, "=": 2,"": 3, "^~":4} l_list = stools.get_location(sub_directives=["proxy_pass"]) l_list.sort(key=lambda x: match_level[x.match] if x.match in match_level else -1, reverse=True) for l in l_list: proxy_pass = l.get_block().find_directives("proxy_pass")[0].get_parameters()[0] proxy_url = parse_url(proxy_pass) public.print_log(proxy_url.hostname) if proxy_url.hostname not in ("127.0.0.1", "localhost", "0.0.0.0", "::1"): continue proxy_path_data = [p["proxy_path"] for p in proxy_info if p["proxy_port"] == proxy_url.port] if len(proxy_path_data) == 0: # 不包含端口的本地代理,删除 stools.remove_location(l.match, l.modifier, hold_comments=("HTTP反向代理", )) # 保留注释 else: # 匹配成功,处理了的代理就重计划列中移除 l.match = proxy_path_data[0] l.parameters = [l.match] if not l.modifier else [l.modifier, l.match] proxy_info = [p for p in proxy_info if p["proxy_port"] != proxy_url.port] if len(proxy_info) > 0: for p in proxy_info: loc = stools.get_location(path=p["proxy_path"], modifier="", create=True)[0] ltools = LocationTools(loc) ltools.add_proxy("http://127.0.0.1:{}".format(p["proxy_port"])) public.writeFile(config_file, ctools.to_string()) error = public.checkWebConfig() if error is not True: public.print_log(error) public.writeFile(config_file, config_data) raise ValueError("Nginx配置文件有错误") if not os.path.isfile(config_file): creat_by_template() else: try: _modify_nginx_config() except : public.print_error() creat_by_template() return True def clear_nginx_config(self, project): ''' @name 清除nginx配置 @author hwliang<2021-08-09> @param project: dict<项目信息> @return bool ''' project_name = project['name'] config_file = "{}/nginx/python_{}.conf".format(self._vhost_path, project_name) if os.path.exists(config_file): os.remove(config_file) rewrite_file = "{panel_path}/vhost/rewrite/python_{project_name}.conf".format( panel_path=self._panel_path, project_name=project_name) if os.path.exists(rewrite_file): os.remove(rewrite_file) return True def clear_apache_config(self, project): ''' @name 清除apache配置 @author hwliang<2021-08-09> @param project_find: dict<项目信息> @return bool ''' project_name = project['name'] config_file = "{}/apache/python_{}.conf".format(self._vhost_path, project_name) if os.path.exists(config_file): os.remove(config_file) return True def get_project_find(self, project_name) -> Union[bool, dict]: ''' @name 获取指定项目配置 @author hwliang<2021-08-09> @param project_name 项目名称 @return dict ''' project_info = public.M('sites').where('project_type=? AND name=?', ('Python', project_name)).find() if isinstance(project_info, str): raise public.PanelError('数据库查询错误:' + project_info) if not project_info: return False project_info['project_config'] = json.loads(project_info['project_config']) if "env_list" not in project_info['project_config']: project_info['project_config']["env_list"] = [] if "env_file" not in project_info['project_config']: project_info['project_config']["env_file"] = "" return project_info def clear_config(self, project_name): ''' @name 清除项目配置 @author hwliang<2021-08-09> @param project_name: string<项目名称> @return bool ''' project_find = self.get_project_find(project_name) if not project_find: return False self.clear_nginx_config(project_find) self.clear_apache_config(project_find) public.serviceReload() return True def set_config(self, project_name): ''' @name 设置项目配置 @author hwliang<2021-08-09> @param project_name: string<项目名称> @return bool ''' project_find = self.get_project_find(project_name) if not project_find: return False if not project_find['project_config']: return False if not project_find['project_config']['bind_extranet']: return False if not project_find['project_config']['domains']: return False self.set_nginx_config(project_find) self.set_apache_config(project_find) public.serviceReload() return True def BindExtranet(self, get): ''' @name 绑定外网 @author hwliang<2021-08-09> @param get{ name: string<项目名称> } @return dict ''' res_msg = self._check_webserver() if res_msg: return public.return_error(res_msg) project_name = get.name.strip() project_find = self.get_project_find(project_name) if self.prep_status(project_find["project_config"]) == "running": return public.return_error("项目环境安装制作中.....
请勿操作") if not project_find: return public.return_error('项目不存在') if not project_find['project_config']['domains']: return public.return_error( '请先到【域名管理】选项中至少添加一个域名') project_find['project_config']['bind_extranet'] = 1 public.M('sites').where("id=?", (project_find['id'],)).setField('project_config', json.dumps(project_find['project_config'])) self.set_config(project_name) public.WriteLog(self._log_name, 'Python项目{}, 开启外网映射'.format(project_name)) return public.returnMsg(True, '开启外网映射成功') def unBindExtranet(self, get): ''' @name 解绑外网 @author hwliang<2021-08-09> @param get{ name: string<项目名称> } @return dict ''' project_name = get.name.strip() self.clear_config(project_name) public.serviceReload() project_find = self.get_project_find(project_name) project_find['project_config']['bind_extranet'] = 0 public.M('sites').where("id=?", (project_find['id'],)).setField( 'project_config', json.dumps(project_find['project_config'])) public.WriteLog(self._log_name, 'Python项目{}, 关闭外网映射'.format(project_name)) return public.returnMsg(True, '关闭成功') def GetProjectDomain(self, get): ''' @name 获取指定项目的域名列表 @author hwliang<2021-08-09> @param get{ name: string<项目名称> } @return dict ''' project_name = get.name.strip() project_id = public.M('sites').where('name=?', (project_name,)).getField('id') if not project_id: return public.returnMsg(False, '未查询到该网站') domains = public.M('domain').where('pid=?', (project_id,)).order('id desc').select() return domains def RemoveProjectDomain(self, get): ''' @name 为指定项目删除域名 @author hwliang<2021-08-09> @param get{ name: string<项目名称> domain: string<域名> } @return dict ''' project_name = get.name.strip() project_find = self.get_project_find(project_name) if not project_find: return public.return_error('指定项目不存在') domain_arr = get.domain.rsplit(':', 1) if len(domain_arr) == 1: domain_arr.append(80) # 从域名配置表中删除 project_id = public.M('sites').where('name=?', (project_name,)).getField('id') if len(project_find['project_config']['domains']) == 1: if int(project_find['project_config']['bind_extranet']): return public.returnMsg(False, '项目至少需要一个域名') domain_id = public.M('domain').where('name=? AND port=? AND pid=?', (domain_arr[0], domain_arr[1], project_id)).getField('id') if not domain_id: return public.returnMsg(False, '指定域名不存在') public.M('domain').where('id=?', (domain_id,)).delete() # 从 project_config 中删除 if get.domain in project_find['project_config']['domains']: project_find['project_config']['domains'].remove(get.domain) if get.domain + ":80" in project_find['project_config']['domains']: project_find['project_config']['domains'].remove(get.domain + ":80") public.M('sites').where('id=?', (project_id,)).save('project_config', json.dumps(project_find['project_config'])) public.WriteLog(self._log_name, '从项目:{},删除域名{}'.format(project_name, get.domain)) self.set_config(project_name) return public.returnMsg(True, '删除域名成功') def MultiRemoveProjectDomain(self, get): ''' @name 为指定项目删除域名 @author hwliang<2021-08-09> @param get{ name: string<项目名称> domain: string<域名> } @return dict ''' project_name = get.name.strip() project_find = self.get_project_find(project_name) if not project_find: return public.return_error('指定项目不存在') domain_ids: list = get.domain_ids try: if isinstance(domain_ids, str): domain_ids = json.loads(domain_ids) for i in range(len(domain_ids)): domain_ids[i] = int(domain_ids[i]) except: return public.returnMsg(False, '域名id参数错误') # 获取正确的IDS project_id = public.M('sites').where('name=?', (project_name,)).getField('id') _all_id = public.M('domain').where('pid=?', (project_id,)).field("id,name,port").select() if not isinstance(_all_id, list): return public.returnMsg(False, '网站数据错误') all_id = {i["id"]: (i["name"], i["port"]) for i in _all_id} # 从域名配置表中删除 for i in domain_ids: if i not in all_id: return public.returnMsg(False, '域名id参数不来自本站点') is_all = len(domain_ids) == len(all_id) not_del = None if is_all: domain_ids.sort(reverse=True) domain_ids, not_del = domain_ids[:-1], domain_ids[-1] if not_del: not_del = {"id": not_del, "name": all_id[not_del][0], "port": all_id[not_del][1]} public.M('domain').where(f'id IN ({",".join(["?"] * len(domain_ids))})', domain_ids).delete() del_domains = [] for i in domain_ids: # 从 project_config 中删除 d_n, d_p = all_id[i] del_domains.append(d_n + ':' + str(d_p)) if d_n in project_find['project_config']['domains']: project_find['project_config']['domains'].remove(d_n) if d_n + ':' + str(d_p) in project_find['project_config']['domains']: project_find['project_config']['domains'].remove(d_n + ':' + str(d_p)) public.M('sites').where('id=?', (project_id,)).save( 'project_config', json.dumps(project_find['project_config'])) public.WriteLog(self._log_name, '从项目:{},批量删除域名:'.format(project_name, del_domains)) self.set_config(project_name) if isinstance(not_del, dict): error_data = {not_del["name"]: "项目至少需要一个域名"} else: error_data = {} return { "status": True, "msg": "删除成功 :{}".format(del_domains), "error": error_data, "success": del_domains } def AddProjectDomain(self, get): ''' @name 为指定项目添加域名 @author hwliang<2021-08-09> @param get{ name: string<项目名称> domains: list<域名列表> } @return dict ''' project_name = get.name.strip() project_find = self.get_project_find(project_name) if not project_find: return public.return_error('指定项目不存在') project_id = project_find['id'] domains = get.domains check_cloud = False flag = False res_domains = [] for domain in domains: domain = domain.strip() if not domain: continue if "[" in domain and "]" in domain: # IPv6格式特殊处理 if "]:" in domain: domain_arr = domain.rsplit(":", 1) else: domain_arr = [domain] else: domain_arr = domain.split(':') domain_arr[0] = self.check_domain(domain_arr[0]) if domain_arr[0] is False: res_domains.append({"name": domain, "status": False, "msg": '域名格式错误'}) continue if len(domain_arr) == 1: domain_arr.append("") if domain_arr[1] == "": domain_arr[1] = 80 domain += ':80' try: if not (0 < int(domain_arr[1]) < 65535): res_domains.append({"name": domain, "status": False, "msg": '域名格式错误'}) continue except ValueError: res_domains.append({"name": domain, "status": False, "msg": '域名格式错误'}) continue if not public.M('domain').where('name=? AND port=?', (domain_arr[0], domain_arr[1])).count(): public.M('domain').add('name,pid,port,addtime', (domain_arr[0], project_id, domain_arr[1], public.getDate())) if not domain in project_find['project_config']['domains']: project_find['project_config']['domains'].append(domain) public.WriteLog(self._log_name, '成功添加域名{}到项目{}'.format(domain, project_name)) res_domains.append({"name": domain_arr[0], "status": True, "msg": '添加成功'}) if not check_cloud: public.check_domain_cloud(domain_arr[0]) check_cloud = True flag = True else: public.WriteLog(self._log_name, '添加域名错误,域名{}已存在'.format(domain)) res_domains.append( {"name": domain_arr[0], "status": False, "msg": '添加失败,域名{}已存在'.format(domain)}) if flag: public.M('sites').where('id=?', (project_id,)).save('project_config', json.dumps(project_find['project_config'])) self.set_config(project_name) return self._check_add_domain(project_name, res_domains) def auto_run(self): ''' @name 开机自动启动 ''' # 获取数据库信息 project_list = public.M('sites').where('project_type=?', ('Python',)).field('name,path,project_config').select() get = public.dict_obj() success_count = 0 error_count = 0 for project in project_list: project_config = json.loads(project['project_config']) if project_config['auto_run'] in [0, False, '0', None]: continue project_name = project['name'] project_state = self.get_project_run_state(project_name=project_name) if not project_state: get.name = project_name result = self.StartProject(get) if not result['status']: error_count += 1 error_msg = '自动启动Python项目[' + project_name + ']失败!' public.WriteLog(self._log_name, error_msg) else: success_count += 1 success_msg = '自动启动Python项目[' + project_name + ']成功!' public.WriteLog(self._log_name, success_msg) if (success_count + error_count) < 1: return False dene_msg = '共需要启动{}个Python项目,成功{}个,失败{}个'.format(success_count + error_count, success_count, error_count) public.WriteLog(self._log_name, dene_msg) return True # ————————————— # 日志切割 | # ————————————— def del_crontab(self, name): """ @name 删除项目日志切割任务 @auther hezhihong<2022-10-31> @return """ cron_name = '[勿删]Python项目[{}]运行日志切割'.format(name) cron_path = public.GetConfigValue('setup_path') + '/cron/' cron_list = public.M('crontab').where("name=?", (cron_name,)).select() if cron_list: for i in cron_list: if not i: continue cron_echo = public.M('crontab').where("id=?", (i['id'],)).getField('echo') args = {"id": i['id']} import crontab crontab.crontab().DelCrontab(args) del_cron_file = cron_path + cron_echo public.ExecShell("crontab -u root -l| grep -v '{}'|crontab -u root -".format(del_cron_file)) def add_crontab(self, name, log_conf, python_path): """ @name 构造站点运行日志切割任务 """ cron_name = f'[勿删]Python项目[{name}]运行日志切割' if not public.M('crontab').where('name=?', (cron_name,)).count(): cmd = '{pyenv} {script_path} {name}'.format( pyenv=python_path, script_path=self.__log_split_script_py, name=name ) args = { "name": cron_name, "type": 'day' if log_conf["log_size"] == 0 else "minute-n", "where1": "" if log_conf["log_size"] == 0 else log_conf["minute"], "hour": log_conf["hour"], "minute": log_conf["minute"], "sName": name, "sType": 'toShell', "notice": '0', "notice_channel": '', "save": str(log_conf["num"]), "save_local": '1', "backupTo": '', "sBody": cmd, "urladdress": '' } import crontab res = crontab.crontab().AddCrontab(args) if res and "id" in res.keys(): return True, "新建任务成功" return False, res["msg"] return True def change_cronta(self, name, log_conf): """ @name 更改站点运行日志切割任务 """ python_path = "/www/server/panel/pyenv/bin/python3" if not python_path: return False cronInfo = public.M('crontab').where('name=?', (f'[勿删]Python项目[{name}]运行日志切割',)).find() if not cronInfo: return self.add_crontab(name, log_conf, python_path) import crontab recrontabMode = crontab.crontab() id = cronInfo['id'] del (cronInfo['id']) del (cronInfo['addtime']) cronInfo['sBody'] = '{pyenv} {script_path} {name}'.format( pyenv=python_path, script_path=self.__log_split_script_py, name=name ) cronInfo['where_hour'] = log_conf['hour'] cronInfo['where_minute'] = log_conf['minute'] cronInfo['save'] = log_conf['num'] cronInfo['type'] = 'day' if log_conf["log_size"] == 0 else "minute-n" cronInfo['where1'] = '' if log_conf["log_size"] == 0 else log_conf['minute'] columns = 'where_hour,where_minute,sBody,save,type,where1' values = ( cronInfo['where_hour'], cronInfo['where_minute'], cronInfo['sBody'], cronInfo['save'], cronInfo['type'], cronInfo['where1']) recrontabMode.remove_for_crond(cronInfo['echo']) if cronInfo['status'] == 0: return False, '当前任务处于停止状态,请开启任务后再修改!' sync_res=recrontabMode.sync_to_crond(cronInfo) if not sync_res['status']: return False,sync_res['msg'] public.M('crontab').where('id=?', (id,)).save(columns, values) public.WriteLog('计划任务', '修改计划任务[' + cronInfo['name'] + ']成功') return True, '修改成功' def mamger_log_split(self, get): """管理日志切割任务 @author baozi <202-02-27> @param: get ( dict ): 包含name, mode, hour, minute @return """ name = get.name.strip() project_conf = self._get_project_conf(name_id=name) if not project_conf: return public.returnMsg(False, "没有该项目,请尝试刷新页面") try: _compress = False _log_size = float(get.log_size) if float(get.log_size) >= 0 else 0 _hour = get.hour.strip() if 0 <= int(get.hour) < 24 else "2" _minute = get.minute.strip() if 0 <= int(get.minute) < 60 else '0' _num = int(get.num) if 0 < int(get.num) <= 1800 else 180 if "compress" in get: _compress = int(get.compress) == 1 except (ValueError, AttributeError): _log_size = 0 _hour = "2" _minute = "0" _num = 180 _compress = False if _log_size != 0: _log_size = _log_size * 1024 * 1024 _hour = 0 _minute = 5 log_conf = { "log_size": _log_size, "hour": _hour, "minute": _minute, "num": _num, "compress": _compress, } flag, msg = self.change_cronta(name, log_conf) if flag: conf_path = '{}/data/run_log_split.conf'.format(public.get_panel_path()) if os.path.exists(conf_path): try: data = json.loads(public.readFile(conf_path)) except: data = {} else: data = {} data[name] = { "stype": "size" if bool(_log_size) else "day", "log_size": _log_size, "limit": _num, "compress": _compress, } public.writeFile(conf_path, json.dumps(data)) project_conf["log_conf"] = log_conf pdata = { "project_config": json.dumps(project_conf) } public.M('sites').where('name=?', (name,)).update(pdata) return public.returnMsg(flag, msg) def set_log_split(self, get): """设置日志计划任务状态 @author baozi <202-02-27> @param: get ( dict ): 包含项目名称name @return msg : 操作结果 """ name = get.name.strip() project_conf = self._get_project_conf(name_id=name) if not project_conf: return public.returnMsg(False, "没有该项目,请尝试刷新页面") cronInfo = public.M('crontab').where('name=?', (f'[勿删]Python项目[{name}]运行日志切割',)).find() if not cronInfo: return public.returnMsg(False, "该项目没有设置运行日志的切割任务") status_msg = ['停用', '启用'] status = 1 import crontab recrontabMode = crontab.crontab() if cronInfo['status'] == status: status = 0 recrontabMode.remove_for_crond(cronInfo['echo']) else: cronInfo['status'] = 1 sync_res=recrontabMode.sync_to_crond(cronInfo) if not sync_res['status']: return public.returnMsg(False, sync_res['msg']) public.M('crontab').where('id=?', (cronInfo["id"],)).setField('status', status) public.WriteLog('计划任务', '修改计划任务[' + cronInfo['name'] + ']状态为[' + status_msg[status] + ']') return public.returnMsg(True, '设置成功') def get_log_split(self, get): """获取站点的日志切割任务 @author baozi <202-02-27> @param: get ( dict ): name @return msg : 操作结果 """ name = get.name.strip() project_conf = self._get_project_conf(name_id=name) if not project_conf: return public.returnMsg(False, "没有该项目,请尝试刷新页面") cronInfo = public.M('crontab').where('name=?', (f'[勿删]Python项目[{name}]运行日志切割',)).find() if not cronInfo: return public.returnMsg(False, "该项目没有设置运行日志的切割任务") if "log_conf" not in project_conf: return public.returnMsg(False, "日志切割配置丢失,请尝试重新设置") res = project_conf["log_conf"] res["status"] = cronInfo["status"] return {"status": True, "data": res} # —————————————————————————————————————————————— # 对用户的项目目录进行预先读取, 获取有效信息 | # —————————————————————————————————————————————— def _get_requirements_by_readme_file(self, path) -> Optional[str]: readme_rep = re.compile("^[Rr][Ee][Aa][Dd][Mm][Ee]") readme_files = self.__search_file(readme_rep, path, this_type="file") if not readme_files: return None # 从README找安装依赖包文件 target_path = None requirements_rep = re.compile(r'pip\s+install\s+-r\s+(?P[A-z0-9_/.]*)') for i in readme_files: file_data = public.read_rare_charset_file(i) if not isinstance(file_data, str): continue target = re.search(requirements_rep, file_data) if target: requirements_path = os.path.join(path, target.group("target")) if os.path.exists(requirements_path) and os.path.isfile(requirements_path): target_path = str(requirements_path) break if not target_path: return None return target_path def _get_requirements_file_by_name(self, path) -> Optional[str]: requirements_rep = re.compile(r"^[rR]equirements\.txt$") requirements_path = self.__search_file(requirements_rep, path, this_type="file") if not requirements_path: requirements_rep2 = re.compile(r"^[Rr]equirements?") requirements_dir = self.__search_file(requirements_rep2, path, this_type="dir") if requirements_dir: for i in requirements_dir: tmp = self._get_requirements_file_by_name(i) if tmp: return tmp return None return requirements_path[0] def get_requirements_file(self, path: str) -> Optional[str]: requirement_path = self._get_requirements_file_by_name(path) if not requirement_path: requirement_path = self._get_requirements_by_readme_file(path) return requirement_path @staticmethod def _get_framework_by_requirements(requirements_path: str) -> Optional[str]: file_body = public.read_rare_charset_file(requirements_path) if not isinstance(file_body, str): return None rep_list = [ (r"[Dd]jango(\s*==|\s*\n)", "django"), (r"[Ff]lask(\s*==|\s*\n)", "flask"), (r"[Ss]anic(\s*==|\s*\n)", "sanic"), (r"[Ff]ast[Aa]pi(\s*==|\s*\n)", "fastapi"), (r"[Tt]ornado(\s*==|\s*\n)", "tornado"), (r"aiohttp(\s*==|\s*\n)", "aiohttp"), ] frameworks = set() for rep_str, framework in rep_list: if re.search(rep_str, file_body): frameworks.add(framework) if "aiohttp" in frameworks and len(frameworks) == 2: frameworks.remove("aiohttp") return frameworks.pop() if len(frameworks) == 1: return frameworks.pop() return None @staticmethod def _check_runfile_framework_xsgi( runfile_list: List[str], framework: str = None) -> Tuple[Optional[str], Optional[str], Optional[str]]: if not runfile_list: return None, None, None framework_check_dict = { "django": [ (re.compile(r"from\s+django\.core\.asgi\s+import\s+get_asgi_application"), "asgi"), (re.compile(r"get_asgi_application\(\)"), "asgi"), (re.compile(r"from\s+django\.core\.wsgi\s+import\s+get_wsgi_application"), "wsgi"), (re.compile(r"get_wsgi_application\(\)"), "wsgi"), ], "flask": [ (re.compile(r"from\s+flask\s+import(.*)Flask"), "wsgi"), (re.compile(r"\s*=\s*Flask\(.*\)"), "wsgi"), (re.compile(r"from\s+flask\s+import"), "wsgi"), ], "fastapi": [ (re.compile(r"from\s+fastapi\s+import(.*)FastAPI"), "asgi"), (re.compile(r"\s*=\s*FastAPI\(.*\)"), "asgi"), (re.compile(r"from\s+fastapi\s+import"), "asgi"), ], "sanic": [ (re.compile(r"from\s+sanic\s+import\s+Sanic"), "asgi"), (re.compile(r"\s*=\s*Sanic\(.*\)c"), "asgi"), (re.compile(r"from\s+sanic\s+import"), "asgi"), ], "tornado": [ (re.compile(r"import\s+tornado"), None), ], } if framework and framework in framework_check_dict: framework_check_dict = {framework: framework_check_dict[framework]} for i in runfile_list: file_data = public.read_rare_charset_file(i) if not isinstance(file_data, str): continue for tmp_framework, check_list in framework_check_dict.items(): for tmp_rep, xwgi in check_list: if re.search(tmp_rep, file_data): return i, tmp_framework, xwgi if framework: return runfile_list[0], framework, None if runfile_list: return runfile_list[0], None, None return None, None, None def _get_run_file_list(self, path, search_sub=False) -> List[str]: """ 常用的名称: manager,wsgi,asgi,app,main,run, server """ runfile_rep = re.compile(r"^(wsgi|asgi|app|main|manager|run|server)\.py$") maybe_runfile = self.__search_file(runfile_rep, path, this_type="file") if maybe_runfile: return maybe_runfile elif not search_sub: return [] for i in os.listdir(path): tmp_path = os.path.join(path, i) if os.path.isdir(tmp_path): maybe_runfile = self._get_run_file_list(tmp_path, search_sub=False) if maybe_runfile: return maybe_runfile return [] def get_info(self, get): """ 对用户的项目目录进行预先读取, 获取有效信息 @author baozi <202-03-10> @param: get ( dict ): 请求信息,包含path,路径 @return _type_ : _description_ """ if "path" not in get: return public.returnMsg(False, "没有选择项目路径信息") else: path = get.path.strip() if path[-1] == "/": path = path[:-1] if not os.path.exists(path): return public.returnMsg(False, "项目目录不存在") # 找requirement文件 requirement_path = self.get_requirements_file(path) maybe_runfile_list = self._get_run_file_list(path, search_sub=True) framework = None if requirement_path: framework = self._get_framework_by_requirements(requirement_path) runfile, framework, xsgi = self._check_runfile_framework_xsgi(maybe_runfile_list, framework) call_app = "app" if framework and runfile: values = { "framework": framework, "rfile": runfile, } call_app = self._get_callable_app(values) return { "framework": framework, "requirement_path": requirement_path, "runfile": runfile, "xsgi": xsgi, "call_app": call_app } @staticmethod def __search_file(name_rep: re.Pattern, path: str, this_type="file", exclude=None) -> List[str]: target_names = [] for f_name in os.listdir(path): f_name.encode('utf-8') target_name = name_rep.search(f_name) if target_name: target_names.append(f_name) res = [] for i in target_names: if exclude and i.find(exclude) != -1: continue _path = os.path.join(path, i) if this_type == "file" and os.path.isfile(_path): res.append(_path) continue if this_type == "dir" and not os.path.isfile(_path): res.append(_path) continue return res def get_info_by_runfile(self, get): """ 通过运行文件对用户的项目预先读取, 获取有效信息 @author baozi <202-03-10> @param: get ( dict ): 请求信息,包含path,路径 @return _type_ : _description_ """ if "runfile" not in get: return public.returnMsg(False, "没有选择项目路径信息") else: runfile = get.runfile.strip() if not os.path.isfile(runfile): return False, "项目运行文件错误" runfile, framework, xsgi = self._check_runfile_framework_xsgi([runfile]) if runfile is None: return { "framework": None, "xsgi": None, "call_app": None } values = { "framework": framework, "rfile": runfile, } call_app = self._get_callable_app(values) return { "framework": framework, "xsgi": xsgi, "call_app": call_app } def for_split(self, logsplit, project): """日志切割方法调用 @author baozi <202-03-20> @param: logsplit ( LogSplit ): 日志切割方法,传入 pjanme:项目名称 sfile:日志文件路径 log_prefix:产生的日志文件前缀 project ( dict ): 项目内容 @return """ if project['project_config']["stype"] == "uwsgi": # uwsgi 启动 log_file = project['project_config']["logpath"] + "/uwsgi.log" logsplit(project["name"], log_file, project["name"]) elif project['project_config']["stype"] == "gunicorn": # gunicorn 启动 log_file = project['project_config']["logpath"] + "/gunicorn_error.log" logsplit(project["name"], log_file, project["name"] + "_error") log_file2 = project['project_config']["logpath"] + "/gunicorn_acess.log" logsplit(project["name"], log_file2, project["name"] + "_acess") else: # 命令行启动或原本的python启动 log_file = project['project_config']["logpath"] + "/error.log" logsplit(project["name"], log_file, project["name"]) @staticmethod def _check_add_domain(site_name, domains): from panelSite import panelSite ssl_data = panelSite().GetSSL(type("get", tuple(), {"siteName": site_name})()) if not ssl_data["status"] or not ssl_data.get("cert_data", {}).get("dns", None): return {"domains": domains} domain_rep = [] for i in ssl_data["cert_data"]["dns"]: if i.startswith("*"): _rep = "^[^\.]+\." + i[2:].replace(".", "\.") else: _rep = "^" + i.replace(".", "\.") domain_rep.append(_rep) no_ssl = [] for domain in domains: if not domain["status"]: continue for _rep in domain_rep: if re.search(_rep, domain["name"]): break else: no_ssl.append(domain["name"]) if no_ssl: return { "domains": domains, "not_ssl": no_ssl, "tip": "本站点已启用SSL证书,但本次添加的域名:{},无法匹配当前证书,如有需求,请重新申请证书。".format( str(no_ssl)) } return {"domains": domains} # @staticmethod # def _get_pid_by_ps(check_sh: str) -> List[int]: # _check_sh = check_sh.rsplit("|", 1)[0] # _check_sh += "| awk '{print $2}'" # s, e = public.ExecShell(_check_sh) # pids = [int(i) for i in s.split("\n") if bool(i.strip())] # return pids def get_mem_and_cpu(self, pids: list): mem, cpusum = 0, 0 for pid in pids: res = self.get_process_info_by_pid(pid) if "memory_used" in res: mem += res["memory_used"] if "cpu_percent" in res: cpusum += res["cpu_percent"] return mem, cpusum @staticmethod def get_proc_rss(pid): status_path = '/proc/' + str(pid) + '/status' if not os.path.exists(status_path): return 0 status_file = public.readFile(status_path) if not status_file: return 0 rss = 0 try: rss = int(re.search(r'VmRSS:\s*(\d+)\s*kB', status_file).groups()[0]) except: pass rss = int(rss) * 1024 return rss def get_process_info_by_pid(self, pid): process_info = {} try: if not os.path.exists('/proc/{}'.format(pid)): return process_info p = psutil.Process(pid) status_ps = {'sleeping': '睡眠', 'running': '活动'} with p.oneshot(): process_info['memory_used'] = self.get_proc_rss(pid) process_info['cpu_percent'] = self.get_cpu_precent(p) return process_info except: return process_info def get_cpu_precent(self, p): ''' @name 获取进程cpu使用率 @author hwliang<2021-08-09> @param p: Process<进程对像> @return dict ''' skey = "cpu_pre_{}".format(p.pid) old_cpu_times = cache.get(skey) process_cpu_time = self.get_process_cpu_time(p.cpu_times()) if not old_cpu_times: cache.set(skey, [process_cpu_time, time.time()], 3600) old_cpu_times = cache.get(skey) process_cpu_time = self.get_process_cpu_time(p.cpu_times()) old_process_cpu_time = old_cpu_times[0] old_time = old_cpu_times[1] new_time = time.time() cache.set(skey, [process_cpu_time, new_time], 3600) percent = round(100.00 * (process_cpu_time - old_process_cpu_time) / (new_time - old_time) / psutil.cpu_count(), 2) return percent @staticmethod def get_process_cpu_time(cpu_times): cpu_time = 0.00 for s in cpu_times: cpu_time += s return cpu_time def get_project_status(self, project_id): # 仅使用在项目停止告警中 project_info = public.M('sites').where('project_type=? AND id=?', ('Python', project_id)).find() if not project_info: return None, project_info["name"] if self.is_stop_by_user(project_id): return True, project_info["name"] project_config = json.loads(project_info['project_config']) res = self.get_project_run_state(project_name=project_info["name"]) return bool(res), project_info["name"] @staticmethod def _serializer_of_list(s: list, installed: List[str]) -> List[Dict]: return [{ "version": v.version, "type": "stable", "installed": True if v.version in installed else False } for v in s] def list_py_version(self, get: public.dict_obj) -> Dict: """ 获取已安装的sdk,可安装的sdk """ if self.pyvm is None: return public.returnMsg(False, "Python版本管理工具丢失") force = False if "force" in get and get.force in ("1", "true"): force = True self.pyvm.async_version = True res = self.pyvm.python_versions(force) # res["command_path"] += self._project_env_path_list() install_data = public.M("tasks").where("status in (0, -1) and name LIKE ?", ("安装[Python-%",)).select() install_version = [] for i in install_data: install_version.append(i["name"].replace("安装[Python-", "").replace("]", "")) for i in res.get("sdk", {}).get("all", []): if i["version"] in install_version: i["is_install"] = True else: i["is_install"] = False for i in res.get("sdk", {}).get("streamline", []): if i["version"] in install_version: i["is_install"] = True else: i["is_install"] = False res.get("sdk", {}).get("all", []).sort(key=lambda x: (x["installed"], x["is_install"]), reverse=True) res.get("sdk", {}).get("streamline", []).sort(key=lambda x: (x["installed"], x["is_install"]), reverse=True) return res @staticmethod def _parser_version(version: str) -> Optional[str]: v_rep = re.compile(r"(?P\d+\.\d{1,2}(\.\d{1,2})?)") v_res = v_rep.search(version) if v_res: return v_res.group("target") return None def install_py_version(self, get: public.dict_obj) -> Dict: """ 安装一个版本的sdk """ if self.pyvm is None: return public.returnMsg(False, "Python包管理器错误") version = self._parser_version(getattr(get, "version", '')) if version is None: return public.returnMsg(False, "版本参数信息错误") is_pypy = False if "is_pypy" in get and get.is_pypy in ("1", "true"): is_pypy = True log_path = self._logs_path + "/py.log" out_err = open(log_path, "w") self.pyvm.set_std(out_err, out_err) self.pyvm.is_pypy = is_pypy flag, msg = self.pyvm.api_install(version) self.pyvm.set_std(sys.stdout, sys.stderr) time.sleep(0.1) out_err.close() if not flag: return public.returnMsg(False, msg) return public.returnMsg(True, "安装成功") def async_install_py_version(self, get: public.dict_obj) -> Dict: if not os.path.exists("{}/class/projectModel/btpyvm.py".format(public.get_panel_path())): return public.returnMsg(False, "Python包管理器错误, 请尝试修复面板") version = self._parser_version(getattr(get, "version", '')) if os.path.exists("{}/versions/{}".format(self._pyv_path, version)): return public.returnMsg(False, "该版本已经安装") if public.M("tasks").where("status in (0, -1) and name=?", ("安装[Python-{}]".format(version),)).find(): return public.returnMsg(False, "该版本的安装已加入任务队列, 请等待完成") extended = getattr(get, "extended", '') if version is None: return public.returnMsg(False, "版本参数信息错误") sh_str = "{}/pyenv/bin/python3 {}/class/projectModel/btpyvm.py install {} --extend='{}'".format( public.get_panel_path(), public.get_panel_path(), version, extended ) if not os.path.exists("/tmp/panelTask.pl"): # 如果当前任务队列并未执行,就把日志清空 public.writeFile('/tmp/panelExec.log', '') task_id = public.M('tasks').add( 'id,name,type,status,addtime,execstr', (None, '安装[Python-{}]'.format(version), 'execshell', '0', time.strftime('%Y-%m-%d %H:%M:%S'), sh_str)) self._create_install_wait_msg(task_id, version) return public.returnMsg(True, "任务已添加到队列") @staticmethod def _create_install_wait_msg(task_id: int, version: str): from panel_msg.msg_file import message_mgr file_path = "/tmp/panelExec.log" if not os.path.exists(file_path): public.writeFile(file_path, "") soft_name = "Python-{}".format(version) data = { "soft_name": "Python-{}".format(version), "install_status": "等待安装" + soft_name, "file_name": file_path, "self_type": "soft_install", "status": 0, "task_id": task_id } title = "等待安装" + soft_name res = message_mgr.collect_message(title, ["Python版本管理", soft_name], data) if isinstance(res, str): public.WriteLog("消息盒子", "安装信息收集失败") return None return res def uninstall_py_version(self, get: public.dict_obj) -> Dict: """ 卸载一个指定版本的sdk """ if self.pyvm is None: return public.returnMsg(False, "Python包管理器错误") version = self._parser_version(getattr(get, "version", '')) if version is None: return public.returnMsg(False, "版本参数信息错误") is_pypy = False if "is_pypy" in get and get.is_pypy in ("1", "true"): is_pypy = True self.pyvm.is_pypy = is_pypy flag, msg = self.pyvm.api_uninstall(version) if not flag: return public.returnMsg(False, msg) return public.returnMsg(True, "卸载成功") def update_all_project(self): all_project = public.M('sites').where('project_type=?', ('Python',)).select() if not isinstance(all_project, list): return for p in all_project: project_config = json.loads(p["project_config"]) if project_config["stype"] == "python": project_config["project_cmd"] = "{vpath} -u {run_file} {parm} ".format( vpath=self._get_vp_python(project_config["vpath"]), run_file=project_config['rfile'], parm=project_config['parm'] ) project_config["stype"] = "command" public.M("sites").where("id=?", (p["id"],)).update({"project_config": json.dumps(project_config)}) @staticmethod def _read_requirement_file(requirement_path): requirement_dict = {} requirement_data = public.read_rare_charset_file(requirement_path) if isinstance(requirement_data, str): for i in requirement_data.split("\n"): tmp_data = i.strip() if not tmp_data or tmp_data.startswith("#"): continue if re.search(r"-e\s+\.{0,2}/", tmp_data): # 本地库依赖且为可编辑模式的不安装 continue if tmp_data.find("git+") != -1: rep_name_list = [re.compile(r"#egg=(?P\S+)"), re.compile(r"/(?P\S+\.git)")] name = tmp_data for tmp_rep in rep_name_list: tmp_name = tmp_rep.search(tmp_data) if tmp_name: name = tmp_name.group("name") break ver = tmp_data for tmp_i in tmp_data.split(): if "git+" in tmp_i: ver = tmp_i requirement_dict[name] = ver elif tmp_data.find("file:") != -1: file = tmp_data.split("file:", 1)[1] name = os.path.basename(file) requirement_dict[name] = file else: if tmp_data.find("==") != -1: n, v = tmp_data.split("==", 1) requirement_dict[n.strip()] = v.strip() else: requirement_dict[tmp_data] = "--" return requirement_dict def get_env_info(self, get): force = False try: project_name = get.project_name.strip() if "force" in get: if isinstance(get.force, str): if get.force in ("1", "true"): force = True else: force = bool(get.force) except: return public.returnMsg(False, "参数错误") project_info = self.get_project_find(project_name) if not isinstance(project_info, dict): return public.returnMsg(False, "没有找到项目") conf = project_info["project_config"] pyenv = EnvironmentManager().get_env_py_path(conf.get("python_bin", conf.get("vpath"))) python_version = pyenv.version requirement_path = conf.get("requirement_path", "") if requirement_path and os.path.isfile(requirement_path): requirement_dict = self._read_requirement_file(requirement_path) else: requirement_dict = {} pip_list_data = pyenv.pip_list(force) pip_list = [] for p, v in pip_list_data: if p in requirement_dict: pip_list.append({"name": p, "version": v, "requirement": requirement_dict.pop(p)}) else: pip_list.append({"name": p, "version": v, "requirement": "--"}) for k, v in requirement_dict.items(): pip_list.append({"name": k, "version": "--", "requirement": v}) return { "python_version": python_version, "requirement_path": requirement_path, "pip_list": pip_list, "pip_source": self.pip_source_dict } def modify_requirement(self, get): try: project_name = get.project_name.strip() requirement_path = get.requirement_path.strip() except: return public.returnMsg(False, "参数错误") project_info = self.get_project_find(project_name) if not isinstance(project_info, dict): return public.returnMsg(False, "没有找到项目") conf = project_info["project_config"] if not os.path.isfile(requirement_path): return public.returnMsg(False, "requirement.txt文件不存在") conf["requirement_path"] = requirement_path public.M("sites").where("id=?", (project_info["id"],)).update({"project_config": json.dumps(conf)}) return public.returnMsg(True, "修改成功") def manage_package(self, get): """安装与卸载虚拟环境模块""" requirement_path = "" package_name = '' package_version = '' pip_source = "阿里云" active = "install" try: project_name = get.project_name.strip() if "package_name" in get and get.package_name.strip(): package_name = get.package_name.strip() if "package_version" in get and get.package_version.strip(): package_version = get.package_version.strip() if "requirement_path" in get and get.requirement_path.strip(): requirement_path = get.requirement_path.strip() if "active" in get and get.active.strip(): active = get.active.strip() if "pip_source" in get and get.pip_source.strip(): pip_source = get.pip_source.strip() if pip_source not in self.pip_source_dict: return public.returnMsg(False, "pip源错误") except: return public.returnMsg(False, "参数错误") log_file = "{}/pip_{}.log".format(self._logs_path, project_name) conf = self._get_project_conf(project_name) if not isinstance(conf, dict): return public.returnMsg(False, "没有该项目,请尝试刷新页面") pyenv = EnvironmentManager().get_env_py_path(conf.get("python_bin", conf.get("vpath", ""))) if not pyenv: return public.returnMsg(False, "没有找到python环境") public.writeFile(log_file, "") if self.prep_status(conf) == "running": return public.returnMsg(False, "项目环境安装制作中.....
请勿操作") if not (package_name or requirement_path): return public.returnMsg(False, "参数错误") if requirement_path: if not os.path.isfile(requirement_path): return public.returnMsg(False, "依赖包记录文件不存在") if active not in ("install", "uninstall"): return public.returnMsg(False, "操作参数错误") real_pip_source = self.pip_source_dict[pip_source] pyenv.set_pip_source(real_pip_source) log_file = "{}/pip_{}.log".format(self._logs_path, project_name) log_fd = open(log_file, "w") def call_log(log: str) -> None: if not log.endswith("\n"): log += "\n" log_fd.write(log) log_fd.flush() if requirement_path: conf["requirement_path"] = requirement_path public.M("sites").where("name=?", (project_name,)).update({"project_config": json.dumps(conf)}) self.install_requirement(conf, pyenv, call_log) log_fd.write("|-安装结束\n") log_fd.close() return public.returnMsg(True, "安装结束") if active == "install": res = pyenv.pip_install(package_name, version=package_version, call_log=call_log) log_fd.write("|-安装结束\n") log_fd.close() if res is None: return public.returnMsg(True, "安装成功") else: return public.returnMsg(False, "安装失败") else: if package_name == "pip": return public.returnMsg(False, "PIP不能卸载....") res = pyenv.pip_uninstall(package_name, call_log=call_log) log_fd.write("|-卸载结束\n") log_fd.close() if res is None: return public.returnMsg(True, "卸载成功") else: return public.returnMsg(False, "卸载失败") # # 虚拟终端环境启动 # def set_export(self, project_name): # conf = self._get_project_conf(project_name) # if not conf: # return False, "没有该项目\r\n" # v_path_bin = conf["vpath"] + "/bin" # if not os.path.exists(conf["path"]): # return False, "项目文件丢失\r\n" # if not os.path.exists(v_path_bin): # return False, "没有该虚拟环境\r\n" # pre_v_path_bin = self.__prevent_re(v_path_bin) # msg = "虚拟环境已就绪!" # 使用中文的感叹号 # _cd_sh = "clear\ncd %s\n" % conf["path"] # _sh = 'if [[ "$PATH" =~ "^%s:.*" ]]; then { echo "%s"; } else { export PATH="%s:${PATH}"; echo "%s"; } fi\n' % ( # pre_v_path_bin, msg, v_path_bin, msg # ) # return True, _sh + _cd_sh def get_port_status(self, get): try: conf = self.get_project_find(get.project_name.strip()) if not conf: return json_response(False, "未找到项目") except: return json_response(False, '参数错误') pids = self.get_project_run_state(get.project_name.strip()) if not pids: return json_response(False, "项目未启动") ports = self._list_listen(pids) if not ports: return json_response(False, "未找到端口") # 初始化结果字典 res: Dict[str, Dict] = {str(i): { "port": i, "fire_wall": None, "nginx_proxy": None, } for i in ports} # 获取端口规则列表 from firewallModel.comModel import main port_list = main().port_rules_list(get)['data'] # 更新防火墙信息 for i in port_list: if str(i["Port"]) in res: res[str(i["Port"])]['fire_wall'] = i try: # 读取配置文件 file_path = "{}/nginx/python_{}.conf".format(self._vhost_path, get.project_name) config_data = public.readFile(file_path) from mod.base import pynginx from mod.base.pynginx.extension import ServerTools, ConfigTools, ConfigFinder pync = pynginx.parse_string(config_data) if pync is None: return public.returnResult(status=False, msg="nginx配置文件解析失败!请尝试关闭外网映射并重新开启") ctool = ConfigTools(pync) stool = ctool.get_mian_server() if stool is None: return public.returnResult(status=False, msg="未找到nginx配置") locs = stool.get_location(create=False, sub_directives=["proxy_pass", ["proxy_set_header", "Host"]]) proxy_map = {} if locs: for loc in locs: proxy_pass = loc.top_find_directives("proxy_pass") pass_url = proxy_pass[0].get_parameters()[0] # 解析端口信息 res_url = parse_url(pass_url) if not res_url.hostname in ("127.0.0.1", "localhost", "0.0.0.0", "::1"): continue proxy_map[str(res_url.port)] = loc.match # 更新 nginx_proxy 信息 for i in res: if i in proxy_map: res[i]['nginx_proxy'] = { "proxy_dir": proxy_map[i], "status": True, "site_name": get.project_name, "proxy_port": i } return json_response(True, "获取成功", data=list(res.values())) except: return json_response(True, "获取成功", data=list(res.values())) @staticmethod def _project_domain_list(project_id: int): return public.M('domain').where('pid=?', (project_id,)).select() # 添加代理 def add_server_proxy(self, get): if not hasattr(get, "site_name") or not get.site_name.strip(): return json_response(status=False, msg="参数错误") project_data = self.get_project_find(get.site_name) if not project_data: return json_response(False, "未找到项目") status = get.get("status/d") proxy_path = get.get("proxy_path/s", "/") proxy_port = get.get("proxy_port/d", 0) if not 0 < proxy_port < 65535: return json_response(False, "请输入正确的端口范围") if not proxy_path.startswith("/"): proxy_path = "/" + proxy_path if not proxy_path.endswith("/"): proxy_path = proxy_path + "/" # 不能包含../ 这样的不安全路由路径 re_safe_uri = re.compile(r"\.\./") if re_safe_uri.search(proxy_path): return json_response(False, "请输入安全的路径") file_path = "{}/nginx/python_{}.conf".format(self._vhost_path, get.site_name) config_file = public.readFile(file_path) if not isinstance(config_file, str): return json_response(False, "未找到配置文件,请先开启外网映射") proxy_info = project_data["project_config"].setdefault("proxy_info", []) for p_info in proxy_info: if p_info["proxy_port"] == proxy_port: p_info["proxy_path"] = proxy_path p_info["status"] = status break else: proxy_info.append({ "proxy_path": proxy_path, "proxy_port": proxy_port, "status": status }) default_port = project_data["project_config"].get("port", "") try: default_ports = [int(default_port)] except: default_ports = [] listens = self._list_listen(self.get_project_run_state(get.site_name)) or default_ports for idx in range(len(proxy_info) - 1, -1, -1): p_info = proxy_info[idx] if not p_info["status"]: proxy_info.pop(idx) continue if p_info["proxy_port"] not in listens: proxy_info.pop(idx) pdata = { "project_config": json.dumps(project_data["project_config"]) } public.M('sites').where('name=?', (project_data["name"],)).update(pdata) self.set_config(project_data["name"]) return json_response(status=True, msg="成功更新代理配置") class PyenvSshTerminal(ssh_terminal.local_ssh_terminal): """ 实际上依靠前端执行 切换目录 + 设置环境变量 """ pass # _set_python_export = None # # def send(self): # ''' # @name 写入数据到缓冲区 # @author hwliang<2020-08-07> # @return void # ''' # try: # while self._ws.connected: # if self._s_code: # time.sleep(0.1) # continue # client_data = self._ws.receive() # if not client_data: continue # if client_data == '{}': continue # if len(client_data) > 10: # if client_data.find('{"host":"') != -1: # continue # if client_data.find('"resize":1') != -1: # self.resize(client_data) # continue # if client_data.find('{"pj_name"') != -1: # client_data = self.__set_export(client_data) # if not client_data: # continue # # self._ssh.send(client_data) # except Exception as ex: # ex = str(ex) # # if ex.find('_io.BufferedReader') != -1: # self.debug('从websocket读取数据发生错误,正在重新试') # self.send() # return # elif ex.find('closed') != -1: # self.debug('会话已中断') # else: # self.debug('写入数据到缓冲区发生错误: {}'.format(ex)) # # if not self._ws.connected: # self.debug('客户端已主动断开连接') # self.close() # # def __set_export(self, client_data): # _data = json.loads(client_data) # flag, msg = main().set_export(_data["pj_name"]) # if not flag: # self._ws.send(msg) # return None # return msg