| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import json |
| import os.path |
| import re |
| import shutil |
| import sys |
| from typing import Dict, Optional, List, Tuple |
|
|
| os.chdir("/www/server/panel") |
| if "class/" not in sys.path: |
| sys.path.insert(0, "class/") |
|
|
| import public |
|
|
|
|
| def is_ipv6() -> bool: |
| return os.path.exists('/www/server/panel/data/ipv6.pl') |
|
|
|
|
| class WebDav: |
| _WEBDAV_PATH = "{}/vhost/nginx/webdav".format(public.get_panel_path()) |
| _WEBDAV_DIR_AUTH_PATH = "{}/vhost/nginx/webdav/dir_auth/".format(public.get_panel_path()) |
| _WEBDAV_AUTH_PASS_PATH = "/www/server/pass/webdav_auth" |
| _TEMPLATE = """server |
| {{ |
| listen {listen_port};{listen_ipv6} |
| server_name {domain}; |
| |
| location / {{ |
| # DIRECTORY-AUTH-START 身份认证相关配置,请勿删除 |
| include /www/server/panel/vhost/nginx/webdav/dir_auth/{site_name}.conf; |
| # DIRECTORY-AUTH-END |
| root {site_path}; #注意修改成自己的目录 |
| client_max_body_size {client_max_body_size}M; #大文件支持 |
| autoindex on; |
| dav_methods PUT DELETE MKCOL COPY MOVE; |
| # 需要 nginx-dav-ext-module 才有下面的选项 |
| dav_ext_methods PROPFIND OPTIONS LOCK UNLOCK; |
| create_full_put_path on; |
| }} |
| |
| access_log /www/wwwlogs/webdav/{site_name}.log; |
| error_log /www/wwwlogs/webdav/{site_name}.error.log; |
| }} |
| """ |
|
|
| def __init__(self, site_name: str, site_path: str, domain: str, port: str, status: bool, |
| auth: Optional[Dict],client_max_body_size:int = 50): |
| self.domain = domain |
| self.site_name = site_name |
| self.site_path = site_path |
| self.port = port |
| self.auth = auth |
| self.status = status |
| self.client_max_body_size=client_max_body_size |
| |
| |
| |
| |
| |
| |
|
|
| def set_config(self) -> bool: |
| listen_ipv6 = '' |
| if is_ipv6(): |
| listen_ipv6 = "\n listen [::]:%s;" % self.port |
| new_config = self._TEMPLATE.format( |
| listen_port=self.port, |
| listen_ipv6=listen_ipv6, |
| domain=self.domain, |
| site_name=self.site_name, |
| site_path=self.site_path, |
| client_max_body_size=self.client_max_body_size |
| ) |
| webdav_dir_auth_path = "{}/{}.conf".format(self._WEBDAV_DIR_AUTH_PATH, self.site_name) |
| if not os.path.exists(self._WEBDAV_DIR_AUTH_PATH): |
| os.makedirs(self._WEBDAV_DIR_AUTH_PATH, 0o600) |
| webdav_conf_path = "{}/{}.conf".format(self._WEBDAV_PATH, self.site_name) |
| if not os.path.isfile(webdav_dir_auth_path): |
| public.writeFile(webdav_dir_auth_path, "") |
|
|
| webdav_log_path = "/www/wwwlogs/webdav" |
| if not os.path.exists(webdav_log_path): |
| os.makedirs(webdav_log_path, 0o700) |
| public.ExecShell("chown www:www {}".format(webdav_log_path)) |
|
|
| public.writeFile(webdav_conf_path, new_config) |
| if public.checkWebConfig() is not True: |
| os.remove(webdav_conf_path) |
| return False |
|
|
| public.serviceReload() |
| return True |
|
|
| def remove(self): |
| auth_pass_file = "{}/{}.pass".format(self._WEBDAV_AUTH_PASS_PATH, self.site_name) |
| dir_auth_file = "{}/{}.conf".format(self._WEBDAV_DIR_AUTH_PATH, self.site_name) |
| webdav_conf_file = "{}/{}.conf".format(self._WEBDAV_PATH, self.site_name) |
| if os.path.exists(auth_pass_file): |
| os.remove(auth_pass_file) |
|
|
| if os.path.exists(dir_auth_file): |
| os.remove(dir_auth_file) |
|
|
| if os.path.exists(webdav_conf_file): |
| os.remove(webdav_conf_file) |
| public.serviceReload() |
|
|
| def close(self): |
| webdav_conf_file = "{}/{}.conf".format(self._WEBDAV_PATH, self.site_name) |
| if os.path.exists(webdav_conf_file): |
| os.remove(webdav_conf_file) |
| public.serviceReload() |
| self.status = False |
|
|
| def open(self) -> bool: |
| if not self.set_config(): |
| return False |
| self.status = True |
| return True |
|
|
| def set_auth(self, auth_name: str, auth_value: str) -> Tuple[bool, str]: |
| if not auth_name or not auth_value: |
| return False, "验证名称或密码不能为空" |
| |
| if not re.match(r'^[A-Za-z0-9!@#$%^&*\-_]+$', auth_value): |
| return False, "密码中只能包含字母(大小写)、数字、英文的常见符号(如 !, @, #, $, %, ^, &, *, -, _)。" |
| self.auth = { |
| "auth_name": auth_name, |
| "auth_value": auth_value |
| } |
|
|
| self._set_auth() |
| return True, "添加成功" |
|
|
| def remove_auth(self) -> Tuple[bool, str]: |
|
|
| self.auth = None |
|
|
| auth_pass_file = "{}/{}.pass".format(self._WEBDAV_AUTH_PASS_PATH, self.site_name) |
| dir_auth_file = "{}/{}.conf".format(self._WEBDAV_DIR_AUTH_PATH, self.site_name) |
|
|
| if os.path.exists(auth_pass_file): |
| os.remove(auth_pass_file) |
| public.writeFile(dir_auth_file, "") |
|
|
| return True, "删除成功" |
|
|
| def to_conf(self): |
| return { |
| "domain": self.domain, |
| "site_name": self.site_name, |
| "site_path": self.site_path, |
| "port": self.port, |
| "status": self.status, |
| "auth": self.auth, |
| "client_max_body_size": self.client_max_body_size |
| } |
|
|
| def _set_auth(self): |
| auth_pass_file = "{}/{}.pass".format(self._WEBDAV_AUTH_PASS_PATH, self.site_name) |
| if not os.path.exists(self._WEBDAV_AUTH_PASS_PATH): |
| os.makedirs(self._WEBDAV_AUTH_PASS_PATH, 0o755) |
| dir_auth_path = "{}/{}".format(self._WEBDAV_DIR_AUTH_PATH, self.site_name) |
| if not os.path.exists(dir_auth_path): |
| os.makedirs(dir_auth_path, 0o755) |
|
|
| auth_conf = """ |
| #AUTH_START |
| auth_basic "Authorization"; |
| auth_basic_user_file /www/server/pass/webdav_auth/{site_name}.pass; |
| #AUTH_END |
| """.format( |
| site_name=self.site_name, |
| ) |
| pass_str = "{}:{}".format(self.auth["auth_name"], public.hasPwd(self.auth["auth_value"])) |
| public.writeFile(auth_pass_file, pass_str) |
| public.writeFile("{}/{}.conf".format(self._WEBDAV_DIR_AUTH_PATH, self.site_name), auth_conf) |
|
|
|
|
| class WebDavManager: |
| _CONF_FILE = "{}/data/nginx_webdav.conf".format(public.get_panel_path()) |
| _WEBDAV_PATH = "{}/vhost/nginx/webdav".format(public.get_panel_path()) |
| _WEBDAV_DIR_AUTH_PATH = "{}/vhost/nginx/webdav/dir_auth/".format(public.get_panel_path()) |
|
|
| def __init__(self): |
| self._config = None |
|
|
| @property |
| def config(self) -> Dict: |
| if self._config is not None: |
| return self._config |
| default_config = dict() |
| try: |
| self._config = json.loads(public.readFile(self._CONF_FILE)) |
| except (json.JSONDecodeError, TypeError): |
| pass |
| if isinstance(self._config, dict): |
| return self._config |
| self._config = default_config |
| return self._config |
|
|
| def save_config(self): |
| public.writeFile(self._CONF_FILE, json.dumps(self.config)) |
|
|
| |
| @classmethod |
| def check_webdav_conf(cls): |
| if not os.path.isdir(cls._WEBDAV_PATH): |
| os.makedirs(cls._WEBDAV_PATH, 0o600) |
| webdav_log_path = "/www/wwwlogs/webdav" |
| if not os.path.exists(webdav_log_path): |
| os.makedirs(webdav_log_path, 0o700) |
| public.ExecShell("chown www:www {}".format(webdav_log_path)) |
| nginx_conf_path = "/www/server/nginx/conf/nginx.conf" |
| nginx_conf = public.readFile(nginx_conf_path) |
| if not isinstance(nginx_conf, str): |
| raise ValueError("Nginx主配置文件丢失,无法操作") |
| if nginx_conf.find("/www/server/panel/vhost/nginx/webdav/*.conf;") == -1: |
| rep = re.compile("include +/www/server/panel/vhost/nginx/\*\.conf;\n") |
| sun_conf_str = ( |
| "include /www/server/panel/vhost/nginx/*.conf;\n" |
| "include /www/server/panel/vhost/nginx/webdav/*.conf;\n" |
| ) |
| new_conf = rep.sub(sun_conf_str, nginx_conf, 1) |
| public.writeFile(nginx_conf_path, new_conf) |
| if public.checkWebConfig() is not True: |
| public.writeFile(nginx_conf_path, nginx_conf) |
| raise ValueError("配置文件存在错误,无法操作") |
| return None |
|
|
| |
| @staticmethod |
| def check_webdav_model(): |
| out, _ = public.ExecShell("nginx -V 2>&1 | grep 'nginx-dav-ext-module'") |
| if out == "": |
| raise ValueError("当前的nginx未编译WebDav模块,请重新安装") |
|
|
| return None |
|
|
| @staticmethod |
| def check_domain(domain: str, port: str) -> bool: |
| try: |
| int_port = int(port) |
| if int_port < 1 or int_port > 65535: |
| return False |
| except ValueError: |
| return False |
|
|
| return public.is_domain(domain) |
|
|
| @staticmethod |
| def check_domain_exists(domain: str, port: str) -> bool: |
| res = public.M('domain').where("name=? and port=?", (domain, int(port))).select() |
| if isinstance(res, str): |
| return False |
| if isinstance(res, list) and len(res) > 0: |
| return True |
| return False |
|
|
| def get_web_dav(self, site_name) -> Optional[WebDav]: |
| if site_name not in self.config: |
| return None |
| info = self.config[site_name] |
| return WebDav( |
| site_name=info["site_name"], |
| site_path=info["site_path"], |
| domain=info["domain"], |
| port=info["port"], |
| auth=info.get("auth", None), |
| status=info["status"], |
| client_max_body_size=info.get("client_max_body_size", 50), |
| ) |
|
|
| def _check_env(self) -> Tuple[bool, str]: |
| try: |
| self.check_webdav_model() |
| self.check_webdav_conf() |
| except ValueError as e: |
| return False, str(e) |
| return True, "" |
|
|
| def add_web_dav(self, site_id, domain: str, port: str,client_max_body_size:int=50) -> Tuple[bool, str]: |
| site_info = public.M("sites").where('id=?', (site_id,)).field('id,path,name').find() |
| if isinstance(site_info, str): |
| return False, "网站查询错误" |
|
|
| flag, err = self._check_env() |
| if not flag: |
| return False, err |
|
|
| if not isinstance(site_info, dict): |
| return False, "没有该网站" |
|
|
| if not self.check_domain(domain, port): |
| return False, "域名错误" |
|
|
| if not self.check_domain(domain, port): |
| return False, "域名错误" |
|
|
| if self.check_domain_exists(domain, port): |
| return False, "域名已存在" |
|
|
| webdav = WebDav( |
| site_name=site_info["name"], |
| site_path=site_info["path"], |
| domain=domain, |
| port=port, |
| auth=None, |
| status=True, |
| client_max_body_size=client_max_body_size |
| ) |
|
|
| flag = webdav.set_config() |
| if not flag: |
| return False, "添加失败" |
|
|
| self.config[webdav.site_name] = webdav.to_conf() |
| self.save_config() |
| |
| return True, "添加成功" |
|
|
| def remove_web_dav(self, site_name) -> Tuple[bool, str]: |
| webdav = self.get_web_dav(site_name) |
| if webdav: |
| webdav.remove() |
| del self.config[site_name] |
| self.save_config() |
| return True, "删除成功" |
|
|
| def set_web_dav(self, site_name, option) -> Tuple[bool, str]: |
| webdav = self.get_web_dav(site_name) |
| if webdav: |
| if option in ("close", "closed", "0"): |
| webdav.close() |
| self.config[site_name] = webdav.to_conf() |
| self.save_config() |
| return True, "关闭成功" |
| else: |
| if not webdav.open(): |
| return False, "开启失败" |
| else: |
| self.config[site_name] = webdav.to_conf() |
| self.save_config() |
| return True, "开启成功" |
| else: |
| return False, "没有指定的WebDav配置" |
|
|
|
|
| class main: |
|
|
| @staticmethod |
| def get_webdav_conf(get): |
| try: |
| site_name = get.site_name.strip() |
| except AttributeError: |
| return public.returnMsg(False, "参数错误") |
|
|
| webdav = WebDavManager().get_web_dav(site_name) |
| if not webdav: |
| return {"need_create": True} |
|
|
| data = webdav.to_conf() |
| data["need_create"] = False |
| return data |
|
|
| @staticmethod |
| def add_webdav(get): |
| try: |
| |
| |
| if not hasattr(get, 'client_max_body_size'): |
| get.client_max_body_size = 50 |
| client_max_body_size = int(get.client_max_body_size) |
| |
| site_id = int(get.site_id.strip()) |
| tmp_domain = get.domain.strip() |
| except (AttributeError, ValueError): |
| return public.returnMsg(False, "参数错误") |
| tmp = tmp_domain.split(":") |
| if len(tmp) == 1 or tmp[1] == "": |
| domain = tmp[0] |
| port = "80" |
| else: |
| domain = tmp[0] |
| port = tmp[1] |
|
|
| return public.returnMsg(*WebDavManager().add_web_dav(site_id, domain, port,client_max_body_size)) |
|
|
| @staticmethod |
| def set_webdav(get): |
| try: |
| site_name = get.site_name.strip() |
| option = get.option.strip() |
| except (AttributeError, ValueError): |
| return public.returnMsg(False, "参数错误") |
|
|
| return public.returnMsg(*WebDavManager().set_web_dav(site_name, option)) |
|
|
| @staticmethod |
| def remove_webdav(get): |
| try: |
| site_name = get.site_name.strip() |
| except (AttributeError, ValueError): |
| return public.returnMsg(False, "参数错误") |
|
|
| return public.returnMsg(*WebDavManager().remove_web_dav(site_name)) |
|
|
| @staticmethod |
| def set_client_max_body_size(get): |
| try: |
| try: |
| site_name = get.site_name.strip() |
| client_max_body_size = int(get.client_max_body_size.strip()) |
| |
| except (AttributeError, ValueError): |
| return public.returnMsg(False, "参数错误") |
|
|
|
|
| manager = WebDavManager() |
| webdav = manager.get_web_dav(site_name) |
| if not webdav: |
| return public.returnMsg(False, "没有指定的WebDav配置") |
| webdav.client_max_body_size = client_max_body_size |
| manager.config[site_name] = webdav.to_conf() |
| manager.save_config() |
| webdav.set_config() |
| public.serviceReload() |
|
|
| return public.returnMsg(True, "设置成功") |
| except Exception as e: |
| return public.returnMsg(False, str(e)) |
| @staticmethod |
| def set_auth(get): |
| try: |
| site_name = get.site_name.strip() |
| auth_name = get.auth_name.strip() |
| auth_value = get.auth_value.strip() |
| except (AttributeError, ValueError): |
| return public.returnMsg(False, "参数错误") |
|
|
| if len(auth_value) < 3: |
| return public.returnMsg(False, '密码不能少于3位') |
| if len(auth_value) > 8: |
| return public.returnMsg(False, '密码不能大于8位,超过8位的部分无法验证。') |
| if re.search('\s', auth_value): |
| return public.returnMsg(False, '密码不能存在空格') |
|
|
| manager = WebDavManager() |
| webdav = manager.get_web_dav(site_name) |
| if not webdav: |
| return public.returnMsg(False, "没有指定的WebDav配置") |
| flag, msg = webdav.set_auth( |
| auth_name=auth_name, |
| auth_value=auth_value, |
| ) |
| if flag: |
| manager.config[site_name] = webdav.to_conf() |
| manager.save_config() |
| public.serviceReload() |
|
|
| return public.returnMsg(flag, msg) |
|
|
| @staticmethod |
| def remove_auth(get): |
| try: |
| site_name = get.site_name.strip() |
| except (AttributeError, ValueError): |
| return public.returnMsg(False, "参数错误") |
|
|
| manager = WebDavManager() |
| webdav = manager.get_web_dav(site_name) |
| if not webdav: |
| return public.returnMsg(False, "没有指定的WebDav配置") |
| flag, msg = webdav.remove_auth() |
| if flag: |
| manager.config[site_name] = webdav.to_conf() |
| manager.save_config() |
| public.serviceReload() |
|
|
| return public.returnMsg(flag, msg) |
|
|
|
|
|
|