# coding: utf-8
# -------------------------------------------------------------------
# BaoTa Linux Panel
# -------------------------------------------------------------------
# Copyright (c) 2015-2017 BaoTa Software (http://bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: baozi
# -------------------------------------------------------------------

# WebDAV manager
# ------------------------------
import json
import os.path
import re
import shutil
import sys
from typing import Dict, Optional, List, Tuple

# The panel's helper modules are resolved relative to the panel root.
os.chdir("/www/server/panel")
if "class/" not in sys.path:
    sys.path.insert(0, "class/")

import public

# Directory that receives the per-site WebDAV access/error logs.
_WEBDAV_LOG_PATH = "/www/wwwlogs/webdav"


def is_ipv6() -> bool:
    """Return True when the panel's IPv6 marker file exists."""
    return os.path.exists('/www/server/panel/data/ipv6.pl')


def _ensure_webdav_log_dir() -> None:
    """Create the WebDAV log directory (owned by www:www) if it is missing."""
    if not os.path.exists(_WEBDAV_LOG_PATH):
        os.makedirs(_WEBDAV_LOG_PATH, 0o700)
        public.ExecShell("chown www:www {}".format(_WEBDAV_LOG_PATH))


class WebDav:
    """One WebDAV virtual host: renders, enables, disables and secures it.

    ``auth`` is either ``None`` (no basic auth) or a dict with the keys
    ``"auth_name"`` and ``"auth_value"`` as written by :meth:`set_auth`.
    """

    _WEBDAV_PATH = "{}/vhost/nginx/webdav".format(public.get_panel_path())
    _WEBDAV_DIR_AUTH_PATH = "{}/vhost/nginx/webdav/dir_auth/".format(public.get_panel_path())
    _WEBDAV_AUTH_PASS_PATH = "/www/server/pass/webdav_auth"
    # nginx server block; the comments inside are written to the vhost file.
    _TEMPLATE = """server {{
    listen {listen_port};{listen_ipv6}
    server_name {domain};

    location / {{
        # DIRECTORY-AUTH-START 身份认证相关配置,请勿删除
        include /www/server/panel/vhost/nginx/webdav/dir_auth/{site_name}.conf;
        # DIRECTORY-AUTH-END
        root {site_path}; #注意修改成自己的目录
        client_max_body_size {client_max_body_size}M; #大文件支持
        autoindex on;
        dav_methods PUT DELETE MKCOL COPY MOVE;
        # 需要 nginx-dav-ext-module 才有下面的选项
        dav_ext_methods PROPFIND OPTIONS LOCK UNLOCK;
        create_full_put_path on;
    }}
    access_log /www/wwwlogs/webdav/{site_name}.log;
    error_log /www/wwwlogs/webdav/{site_name}.error.log;
}}
"""

    def __init__(self, site_name: str, site_path: str, domain: str, port: str, status: bool,
                 auth: Optional[Dict], client_max_body_size: int = 50):
        self.domain = domain
        self.site_name = site_name
        self.site_path = site_path
        self.port = port
        self.auth = auth
        self.status = status
        self.client_max_body_size = client_max_body_size

    def _conf_file(self) -> str:
        """Path of this site's nginx vhost file."""
        return os.path.join(self._WEBDAV_PATH, "{}.conf".format(self.site_name))

    def _dir_auth_file(self) -> str:
        """Path of the auth snippet included from the vhost file."""
        return os.path.join(self._WEBDAV_DIR_AUTH_PATH, "{}.conf".format(self.site_name))

    def _auth_pass_file(self) -> str:
        """Path of the basic-auth password file referenced by the auth snippet."""
        return os.path.join(self._WEBDAV_AUTH_PASS_PATH, "{}.pass".format(self.site_name))

    def _ensure_dir_auth_dir(self) -> None:
        """Create the directory holding the auth snippets if it is missing."""
        if not os.path.exists(self._WEBDAV_DIR_AUTH_PATH):
            # A directory needs the execute bit to be traversable (was 0o600).
            os.makedirs(self._WEBDAV_DIR_AUTH_PATH, 0o700)

    def set_config(self) -> bool:
        """Render and install the vhost file, then reload the web server.

        Returns:
            True when ``public.checkWebConfig()`` accepted the new configuration
            and the service was reloaded; False otherwise. On failure the
            previous vhost file content is restored (or the new file is removed
            when there was none), so a rejected change never takes down a
            configuration that was already in place.
        """
        listen_ipv6 = "\n    listen [::]:%s;" % self.port if is_ipv6() else ''
        new_config = self._TEMPLATE.format(
            listen_port=self.port,
            listen_ipv6=listen_ipv6,
            domain=self.domain,
            site_name=self.site_name,
            site_path=self.site_path,
            client_max_body_size=self.client_max_body_size
        )

        # The template unconditionally includes the auth snippet, so it must
        # exist (possibly empty) before the configuration is checked.
        self._ensure_dir_auth_dir()
        dir_auth_file = self._dir_auth_file()
        if not os.path.isfile(dir_auth_file):
            public.writeFile(dir_auth_file, "")

        _ensure_webdav_log_dir()

        conf_file = self._conf_file()
        previous = public.readFile(conf_file) if os.path.isfile(conf_file) else None
        public.writeFile(conf_file, new_config)
        if public.checkWebConfig() is not True:
            if isinstance(previous, str):
                public.writeFile(conf_file, previous)
            elif os.path.exists(conf_file):
                os.remove(conf_file)
            return False

        public.serviceReload()
        return True

    def remove(self):
        """Delete the password file, auth snippet and vhost file, then reload."""
        for path in (self._auth_pass_file(), self._dir_auth_file(), self._conf_file()):
            if os.path.exists(path):
                os.remove(path)
        public.serviceReload()

    def close(self):
        """Disable the site by removing its vhost file; auth files are kept."""
        conf_file = self._conf_file()
        if os.path.exists(conf_file):
            os.remove(conf_file)
            public.serviceReload()
        self.status = False

    def open(self) -> bool:
        """Enable the site by (re)installing its vhost file."""
        if not self.set_config():
            return False
        self.status = True
        return True

    def set_auth(self, auth_name: str, auth_value: str) -> Tuple[bool, str]:
        """Enable basic auth with the given credentials.

        Returns:
            ``(ok, message)``; nothing is written when validation fails.
        """
        if not auth_name or not auth_value:
            return False, "验证名称或密码不能为空"

        # The password file stores "name:hash" on one line, so a colon or any
        # whitespace in the name would corrupt the entry.
        if re.search(r'[\s:]', auth_name):
            return False, "验证名称中不能包含空格或冒号"

        # Allowed characters: letters (both cases), digits and common symbols.
        if not re.fullmatch(r'[A-Za-z0-9!@#$%^&*\-_]+', auth_value):
            return False, "密码中只能包含字母(大小写)、数字、英文的常见符号(如 !, @, #, $, %, ^, &, *, -, _)。"

        self.auth = {
            "auth_name": auth_name,
            "auth_value": auth_value
        }
        self._set_auth()
        return True, "添加成功"

    def remove_auth(self) -> Tuple[bool, str]:
        """Disable basic auth: drop the password file and empty the snippet."""
        self.auth = None
        auth_pass_file = self._auth_pass_file()
        if os.path.exists(auth_pass_file):
            os.remove(auth_pass_file)
        # The vhost still includes the snippet, so it is emptied, not deleted.
        public.writeFile(self._dir_auth_file(), "")
        return True, "删除成功"

    def to_conf(self):
        """Return the JSON-serialisable dict stored by ``WebDavManager``."""
        return {
            "domain": self.domain,
            "site_name": self.site_name,
            "site_path": self.site_path,
            "port": self.port,
            "status": self.status,
            "auth": self.auth,
            "client_max_body_size": self.client_max_body_size
        }

    def _set_auth(self):
        """Write the password file and the auth snippet from ``self.auth``."""
        if not os.path.exists(self._WEBDAV_AUTH_PASS_PATH):
            os.makedirs(self._WEBDAV_AUTH_PASS_PATH, 0o755)
        # The snippet is the file dir_auth/<site_name>.conf; only its parent
        # directory has to exist (a per-site directory is never used).
        self._ensure_dir_auth_dir()

        auth_conf = """
    #AUTH_START
    auth_basic "Authorization";
    auth_basic_user_file /www/server/pass/webdav_auth/{site_name}.pass;
    #AUTH_END
""".format(
            site_name=self.site_name,
        )
        # NOTE(review): public.hasPwd presumably returns the hashed password
        # in a format nginx accepts - confirm against the helper.
        pass_str = "{}:{}".format(self.auth["auth_name"], public.hasPwd(self.auth["auth_value"]))
        public.writeFile(self._auth_pass_file(), pass_str)
        public.writeFile(self._dir_auth_file(), auth_conf)


class WebDavManager:
    """Persists the WebDAV site registry and validates the nginx environment."""

    _CONF_FILE = "{}/data/nginx_webdav.conf".format(public.get_panel_path())
    _WEBDAV_PATH = "{}/vhost/nginx/webdav".format(public.get_panel_path())
    _WEBDAV_DIR_AUTH_PATH = "{}/vhost/nginx/webdav/dir_auth/".format(public.get_panel_path())

    def __init__(self):
        self._config = None

    @property
    def config(self) -> Dict:
        """Registry keyed by site name, loaded lazily from ``_CONF_FILE``.

        Falls back to an empty dict when the file is missing, is not valid
        JSON, or does not contain a JSON object.
        """
        if self._config is not None:
            return self._config

        try:
            loaded = json.loads(public.readFile(self._CONF_FILE))
        except (json.JSONDecodeError, TypeError):
            loaded = None

        self._config = loaded if isinstance(loaded, dict) else dict()
        return self._config

    def save_config(self):
        """Write the registry back to ``_CONF_FILE`` as JSON."""
        public.writeFile(self._CONF_FILE, json.dumps(self.config))

    # Check that the main nginx configuration includes the webdav vhosts.
    @classmethod
    def check_webdav_conf(cls):
        """Make sure nginx.conf includes ``vhost/nginx/webdav/*.conf``.

        Raises:
            ValueError: nginx.conf cannot be read, the regular vhost include
                line (used as the insertion anchor) is not found, or the
                edited configuration fails ``public.checkWebConfig()`` (the
                original content is written back in that case).
        """
        if not os.path.isdir(cls._WEBDAV_PATH):
            # A directory needs the execute bit to be traversable (was 0o600).
            os.makedirs(cls._WEBDAV_PATH, 0o700)

        _ensure_webdav_log_dir()

        nginx_conf_path = "/www/server/nginx/conf/nginx.conf"
        nginx_conf = public.readFile(nginx_conf_path)
        if not isinstance(nginx_conf, str):
            raise ValueError("Nginx主配置文件丢失,无法操作")

        if "/www/server/panel/vhost/nginx/webdav/*.conf;" in nginx_conf:
            return None

        rep = re.compile(r"include +/www/server/panel/vhost/nginx/\*\.conf;\n")
        sun_conf_str = (
            "include /www/server/panel/vhost/nginx/*.conf;\n"
            "include /www/server/panel/vhost/nginx/webdav/*.conf;\n"
        )
        new_conf, replaced = rep.subn(sun_conf_str, nginx_conf, 1)
        if not replaced:
            # Without the anchor line nothing was inserted; reporting success
            # here would leave every WebDAV vhost silently unloaded.
            raise ValueError("Nginx主配置文件中未找到站点配置引入语句,无法操作")

        public.writeFile(nginx_conf_path, new_conf)
        if public.checkWebConfig() is not True:
            public.writeFile(nginx_conf_path, nginx_conf)
            raise ValueError("配置文件存在错误,无法操作")
        return None

    # Check that the installed nginx was built with the dav-ext module.
    @staticmethod
    def check_webdav_model():
        """Raise ValueError when ``nginx -V`` does not mention nginx-dav-ext-module."""
        out, _ = public.ExecShell("nginx -V 2>&1 | grep 'nginx-dav-ext-module'")
        if out == "":
            raise ValueError("当前的nginx未编译WebDav模块,请重新安装")
        return None

    @staticmethod
    def check_domain(domain: str, port: str) -> bool:
        """Return True when ``port`` is an integer in 1..65535 and
        ``public.is_domain(domain)`` accepts the domain."""
        try:
            int_port = int(port)
        except (ValueError, TypeError):
            return False
        if int_port < 1 or int_port > 65535:
            return False
        return public.is_domain(domain)

    @staticmethod
    def check_domain_exists(domain: str, port: str) -> bool:
        """Return True when the ``domain`` table already has this name/port pair.

        A string result from the query (presumably an error message - confirm
        against ``public.M``) is treated as "not found".
        """
        res = public.M('domain').where("name=? and port=?", (domain, int(port))).select()
        return isinstance(res, list) and len(res) > 0

    def get_web_dav(self, site_name) -> Optional[WebDav]:
        """Build the ``WebDav`` object stored for ``site_name``, or None."""
        if site_name not in self.config:
            return None
        info = self.config[site_name]
        return WebDav(
            site_name=info["site_name"],
            site_path=info["site_path"],
            domain=info["domain"],
            port=info["port"],
            auth=info.get("auth", None),
            status=info["status"],
            client_max_body_size=info.get("client_max_body_size", 50),
        )

    def _check_env(self) -> Tuple[bool, str]:
        """Run the nginx module and include checks; return ``(ok, error)``."""
        try:
            self.check_webdav_model()
            self.check_webdav_conf()
        except ValueError as e:
            return False, str(e)
        return True, ""

    def add_web_dav(self, site_id, domain: str, port: str, client_max_body_size: int = 50) -> Tuple[bool, str]:
        """Create and enable a WebDAV vhost for an existing panel site.

        Returns:
            ``(ok, message)``. The registry is only updated after the vhost
            file was installed successfully.
        """
        site_info = public.M("sites").where('id=?', (site_id,)).field('id,path,name').find()
        if isinstance(site_info, str):
            return False, "网站查询错误"

        flag, err = self._check_env()
        if not flag:
            return False, err

        if not isinstance(site_info, dict):
            return False, "没有该网站"

        if not self.check_domain(domain, port):
            return False, "域名错误"

        if self.check_domain_exists(domain, port):
            return False, "域名已存在"

        webdav = WebDav(
            site_name=site_info["name"],
            site_path=site_info["path"],
            domain=domain,
            port=port,
            auth=None,
            status=True,
            client_max_body_size=client_max_body_size
        )
        if not webdav.set_config():
            return False, "添加失败"

        self.config[webdav.site_name] = webdav.to_conf()
        self.save_config()
        return True, "添加成功"

    def remove_web_dav(self, site_name) -> Tuple[bool, str]:
        """Remove a site's WebDAV files and registry entry.

        Returns ``(False, message)`` instead of raising ``KeyError`` when the
        site has no WebDAV configuration.
        """
        webdav = self.get_web_dav(site_name)
        if not webdav:
            return False, "没有指定的WebDav配置"

        webdav.remove()
        del self.config[site_name]
        self.save_config()
        return True, "删除成功"

    def set_web_dav(self, site_name, option) -> Tuple[bool, str]:
        """Disable the site when ``option`` is "close"/"closed"/"0", else enable it."""
        webdav = self.get_web_dav(site_name)
        if not webdav:
            return False, "没有指定的WebDav配置"

        if option in ("close", "closed", "0"):
            webdav.close()
            self.config[site_name] = webdav.to_conf()
            self.save_config()
            return True, "关闭成功"

        if not webdav.open():
            return False, "开启失败"
        self.config[site_name] = webdav.to_conf()
        self.save_config()
        return True, "开启成功"


class main:
    """Panel-facing entry points; each takes the request object ``get`` and
    returns either a plain dict or the result of ``public.returnMsg``."""

    @staticmethod
    def get_webdav_conf(get):
        """Return the stored configuration, or ``{"need_create": True}``."""
        try:
            site_name = get.site_name.strip()
        except AttributeError:
            return public.returnMsg(False, "参数错误")

        webdav = WebDavManager().get_web_dav(site_name)
        if not webdav:
            return {"need_create": True}
        data = webdav.to_conf()
        data["need_create"] = False
        return data

    @staticmethod
    def add_webdav(get):
        """Create a WebDAV vhost from ``site_id``, ``domain`` ("host[:port]",
        port defaults to 80) and the optional ``client_max_body_size``."""
        try:
            # client_max_body_size is optional and must convert to an integer.
            client_max_body_size = int(getattr(get, 'client_max_body_size', 50))
            site_id = int(str(get.site_id).strip())
            tmp_domain = get.domain.strip()
        except (AttributeError, ValueError, TypeError):
            return public.returnMsg(False, "参数错误")

        # A negative size would render an invalid nginx directive.
        if client_max_body_size < 0:
            return public.returnMsg(False, "参数错误")

        tmp = tmp_domain.split(":")
        domain = tmp[0]
        if len(tmp) == 1 or tmp[1] == "":
            port = "80"
        else:
            port = tmp[1]

        return public.returnMsg(*WebDavManager().add_web_dav(site_id, domain, port, client_max_body_size))

    @staticmethod
    def set_webdav(get):
        """Enable or disable the WebDAV vhost of ``site_name`` per ``option``."""
        try:
            site_name = get.site_name.strip()
            option = get.option.strip()
        except (AttributeError, ValueError):
            return public.returnMsg(False, "参数错误")

        return public.returnMsg(*WebDavManager().set_web_dav(site_name, option))

    @staticmethod
    def remove_webdav(get):
        """Delete the WebDAV vhost of ``site_name``."""
        try:
            site_name = get.site_name.strip()
        except (AttributeError, ValueError):
            return public.returnMsg(False, "参数错误")

        return public.returnMsg(*WebDavManager().remove_web_dav(site_name))

    @staticmethod
    def set_client_max_body_size(get):
        """Change the upload size limit (in megabytes) of a WebDAV vhost.

        The vhost file is only regenerated for an enabled site; for a disabled
        site the value is stored and takes effect when it is opened again.
        The registry is updated only after the new configuration was accepted.
        """
        try:
            site_name = get.site_name.strip()
            # Upload size limit; accept both string and integer input.
            client_max_body_size = int(str(get.client_max_body_size).strip())
        except (AttributeError, ValueError):
            return public.returnMsg(False, "参数错误")

        # A negative size would render an invalid nginx directive.
        if client_max_body_size < 0:
            return public.returnMsg(False, "参数错误")

        try:
            manager = WebDavManager()
            webdav = manager.get_web_dav(site_name)
            if not webdav:
                return public.returnMsg(False, "没有指定的WebDav配置")

            webdav.client_max_body_size = client_max_body_size
            # set_config() reloads the service itself when it succeeds.
            if webdav.status and not webdav.set_config():
                return public.returnMsg(False, "设置失败")

            manager.config[site_name] = webdav.to_conf()
            manager.save_config()
            return public.returnMsg(True, "设置成功")
        except Exception as e:
            # Entry-point boundary: report the failure instead of raising.
            return public.returnMsg(False, str(e))

    @staticmethod
    def set_auth(get):
        """Enable basic auth for ``site_name`` with ``auth_name``/``auth_value``."""
        try:
            site_name = get.site_name.strip()
            auth_name = get.auth_name.strip()
            auth_value = get.auth_value.strip()
        except (AttributeError, ValueError):
            return public.returnMsg(False, "参数错误")

        if len(auth_value) < 3:
            return public.returnMsg(False, '密码不能少于3位')
        if len(auth_value) > 8:
            return public.returnMsg(False, '密码不能大于8位,超过8位的部分无法验证。')
        if re.search(r'\s', auth_value):
            return public.returnMsg(False, '密码不能存在空格')

        manager = WebDavManager()
        webdav = manager.get_web_dav(site_name)
        if not webdav:
            return public.returnMsg(False, "没有指定的WebDav配置")

        flag, msg = webdav.set_auth(
            auth_name=auth_name,
            auth_value=auth_value,
        )
        if flag:
            manager.config[site_name] = webdav.to_conf()
            manager.save_config()
            public.serviceReload()
        return public.returnMsg(flag, msg)

    @staticmethod
    def remove_auth(get):
        """Disable basic auth for ``site_name``."""
        try:
            site_name = get.site_name.strip()
        except (AttributeError, ValueError):
            return public.returnMsg(False, "参数错误")

        manager = WebDavManager()
        webdav = manager.get_web_dav(site_name)
        if not webdav:
            return public.returnMsg(False, "没有指定的WebDav配置")

        flag, msg = webdav.remove_auth()
        if flag:
            manager.config[site_name] = webdav.to_conf()
            manager.save_config()
            public.serviceReload()
        return public.returnMsg(flag, msg)