import collections import contextlib import json import os import re import sys import time import pymysql import werkzeug os.chdir("/www/server/panel") if 'class/' not in sys.path: sys.path.insert(0, 'class/') from public import * aap_t_simple_result = collections.namedtuple('aap_t_simple_result', ['success', 'msg']) aap_t_http_multipart = collections.namedtuple('aap_t_http_multipart', ['headers', 'body']) aap_t_mysql_dump_info = collections.namedtuple('aap_t_mysql_dump_info', ['db_name', 'file', 'dump_time']) search_sql_special_chars = re.compile(r'''(? @param res 响应数据 @param format_args 响应文本提示时的format参数 @return dict """ if isinstance(res, str): res = get_msg_gettext(res, format_args) return returnMsg(True, res) def fail_v2(res, format_args=()): if isinstance(res, str): res = get_msg_gettext(res, format_args) return returnMsg(False, res) def return_message(status, message, data=None): if status in(0, "0"): status = True elif status in(-1, "-1"): status = False return returnMsg(status, data) def lang(msg, *args): return msg.format(*args) def make_panel_tmp_path() -> str: tmp_path = '{}/temp/tmp_{}_{}'.format(get_panel_path(), int(time.time()), GetRandomString(32)) if not os.path.exists(tmp_path): os.makedirs(tmp_path, 0o755) return tmp_path # 创建临时目录(使用上下文管理器) @contextlib.contextmanager def make_panel_tmp_path_with_context(): tmp_path = make_panel_tmp_path() import shutil try: yield tmp_path finally: # 删除临时目录 shutil.rmtree(tmp_path) def back_file(file, act=None): """ @name 备份配置文件 @author zhwen @param file 需要备份的文件 @param act 如果存在,则备份一份作为默认配置 """ file_type = "_bak" if act: file_type = "_def" ExecShell("/usr/bin/cp -p {0} {1}".format(file, file + file_type)) def OfficialApiBase(): return 'https://www.aapanel.com' class HintException(Exception): pass class PanelError(Exception): ''' @name 面板通用异常对像 @author hwliang<2021-06-25> ''' def __init__(self, value): self.value = value def __str__(self): return ("An error occurred while the panel was running: {}".format(str(self.value))) def is_number(s) -> bool: """ @name 判断输入参数是否一个数字 @author Zhj<2022-07-18> @param s 输入参数 @return bool """ try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False class MysqlConn: # def __init__(self, db=None, user='root', password=None, port=3306, host='localhost'): # if user == 'root': # password = M('config').where('id=?', (1,)).getField('mysql_root') # self.conn = pymysql.connect(user=user, password=password, db=db, port=port, host=host) # # def find(self, sql): # cursor = self.conn.cursor() # cursor.execute(sql) # result = cursor.fetchall() # cursor.close() # return result def __init__(self, db_name=None, db_user: str = 'root', db_pwd=None, db_host: str = 'localhost'): self.__CONN = None self.__DB_NAME = db_name self.__HOST = db_host self.__PORT = 3306 self.__USERNAME = db_user self.__PASSWORD = db_pwd self.__CHARSET = 'utf8mb4' self.__CONNECT_TIMEOUT = 10 self.__UNIX_SOCK = None def __enter__(self): if self.__CONN: return self if self.__HOST in ('localhost', '127.0.0.1'): self.__UNIX_SOCK = '/tmp/mysql.sock' self.__CONNECT_TIMEOUT = 1 myconf = readFile('/etc/my.cnf') m = re.search(r"socket\s*=\s*(.+)", myconf) if m: self.__UNIX_SOCK = m.group(1) m = re.search(r"port\s*=\s*([0-9]+)", myconf) if m: self.__PORT = int(m.group(1)) if self.__USERNAME == 'root': self.__PASSWORD = M('config').where('id=?', (1,)).getField('mysql_root') import pymysql try: self.__CONN = pymysql.connect(host=self.__HOST, user=self.__USERNAME, passwd=self.__PASSWORD, port=self.__PORT, charset=self.__CHARSET, database=self.__DB_NAME, connect_timeout=self.__CONNECT_TIMEOUT, cursorclass=pymysql.cursors.DictCursor, unix_socket=self.__UNIX_SOCK) except pymysql.Error: if self.__HOST == 'localhost': self.__HOST = '127.0.0.1' self.__CONN = pymysql.connect(host=self.__HOST, user=self.__USERNAME, passwd=self.__PASSWORD, port=self.__PORT, charset=self.__CHARSET, database=self.__DB_NAME, connect_timeout=self.__CONNECT_TIMEOUT, cursorclass=pymysql.cursors.DictCursor, unix_socket=self.__UNIX_SOCK) raise return self def __del__(self): if self.__CONN: self.__CONN.close() self.__CONN = None def __exit__(self, exc_type, exc_val, exc_tb): self.__CONN.close() self.__CONN = None def query(self, sql): cur = self.__CONN.cursor() try: row_count = cur.execute(sql) if row_count == 0: return [] return cur.fetchall() finally: cur.close() # 查询单条 def find(self, sql): ret = self.query(sql) if len(ret) == 0: return None return ret[0] # 执行SQL def execute(self, sql): cur = self.__CONN.cursor() try: row_count = cur.execute(sql) self.__CONN.commit() return row_count finally: cur.close() import copy import re import json import socket import os import sys import typing import collections from types import MethodType os.chdir("/www/server/panel") if 'class/' not in sys.path: sys.path.insert(0, 'class/') import public re_key_match = re.compile(r'^[\w\s\[\]\-.]+$') re_key_match2 = re.compile(r'^\.?__[\w\s[\]\-]+__\.?$') key_filter_list = ['get', 'set', 'get_items', 'exists', '__contains__', '__setitem__', '__getitem__', '__delitem__', '__delattr__', '__setattr__', '__getattr__', '__class__', 'get_file'] # 匹配IP地址 match_ipv4 = re.compile(r'^(?:(?:25[0-5]|(?:2[0-4]|1?\d)?\d)\.){3}(?:25[0-5]|(?:2[0-4]|1?\d)?\d)$') match_ipv6 = re.compile(r'^(?:(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4})|(?:(?:[0-9a-fA-F]{1,4}:){1,7}:)|(?:(?:[0-9a-fA-F]{1,4}:){6}:[0-9a-fA-F]{1,4})|(?:(?:[0-9a-fA-F]{1,4}:){5}(?::[0-9a-fA-F]{1,4}){1,2})|(?:(?:[0-9a-fA-F]{1,4}:){4}(?::[0-9a-fA-F]{1,4}){1,3})|(?:(?:[0-9a-fA-F]{1,4}:){3}(?::[0-9a-fA-F]{1,4}){1,4})|(?:(?:[0-9a-fA-F]{1,4}:){2}(?::[0-9a-fA-F]{1,4}){1,5})|(?:(?:[0-9a-fA-F]{1,4}:){1}(?::[0-9a-fA-F]{1,4}){1,6})|(?::(?:(?::[0-9a-fA-F]{1,4}){1,7}|:))') # 安全文件路径 match_safe_path = re.compile(r'^[\w\s./\-]*$') # HOST基本格式 match_based_host = re.compile(r'^[\w.:\-]+$') # 匹配类私有属性名称 match_class_private_property = re.compile(r'^(?:_\w+)?__\w+') # 通用版本号格式验证 major.minor[.patch]/主版本.子版本[.修订号] match_general_version_format = re.compile(r'^\d+(?:\.\d+){1,2}$') # md5格式验证 match_md5_format = re.compile(r'^[a-fA-F0-9]{32}$') aap_t_simple_result = collections.namedtuple('aap_t_simple_result', ['success', 'msg']) class HintException(Exception): pass class Param: __VALIDATE_OPTS = [ '>', '<', '>=', '<=', '=', 'in', 'not in', ] def __init__(self, name: str): self.name: str = name self.__validate_rules: typing.List[_ValidateRule] = [] self.__filters: typing.List[callable] = [] # 验证器 Begin -----> def Require(self): """ 必选参数 @return: self """ self.__validate_rules.append(_RequireValidation(self.name)) return self def Date(self): """ 日期字符串 @return: self """ self.__validate_rules.append(_DateValidation(self.name)) return self def Timestamp(self): """ Unix时间戳 @return: self """ self.__validate_rules.append(_TimestampValidation(self.name)) return self def Url(self): """ URL @return: self """ self.__validate_rules.append(_UrlValidation(self.name)) return self def Ip(self): """ IP地址 @return: self """ self.__validate_rules.append(_IpValidation(self.name)) return self def Ipv4(self): """ IPv4地址 @return: self """ self.__validate_rules.append(_Ipv4Validation(self.name)) return self def Ipv6(self): """ IPv6地址 @return: self """ self.__validate_rules.append(_Ipv6Validation(self.name)) return self def Host(self): """ 主机地址(可以包含端口号) @return: self """ self.__validate_rules.append(_HostValidation(self.name)) return self def Port(self): """ 端口号 @return: self """ return self.Integer('between', [1, 65535]) def Json(self): """ JSON字符串 @return: self """ self.__validate_rules.append(_JsonValidation(self.name)) return self def Array(self): """ JSON-Array字符串 @return: self """ self.__validate_rules.append(_ArrayValidation(self.name)) return self def Object(self): """ JSON-Object字符串 @return: self """ self.__validate_rules.append(_ObjectValidation(self.name)) return self def List(self): """ 限制参数数据类型:list @return: self """ self.__validate_rules.append(_ListValidation(self.name)) return self def Tuple(self): """ 限制参数数据类型:tuple @return: self """ self.__validate_rules.append(_TupleValidation(self.name)) return self def Dict(self): """ 限制参数数据类型:dict @return: self """ self.__validate_rules.append(_DictValidation(self.name)) return self def Bool(self): """ 布尔值或boolean字符串 true/false @return: self """ self.__validate_rules.append(_BoolValidation(self.name)) return self def String(self, opt: typing.Optional[str] = None, length_or_list: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): """ 字符串 @param opt: str 运算符 @param length_or_list: int|list[int|str]|None 字符串长度或字符串集合 @return: self """ self.__validate_rules.append(_StringValidation(self.name, opt, length_or_list)) return self def Number(self, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[int, float, typing.List[typing.Union[int, float]]]] = None): """ 数值 @param opt: str 运算符 @param num: int 数值大小 @return: self """ self.__validate_rules.append(_NumberValidation(self.name, opt, num)) return self def Integer(self, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[typing.Union[int, typing.List[int]]]] = None): """ 整数 @param opt: str 运算符 @param num: int 数值大小 @return: self """ self.__validate_rules.append(_IntegerValidation(self.name, opt, num)) return self def Float(self, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[int, float, typing.List[typing.Union[int, float]]]] = None): """ 浮点数 @param opt: str 运算符 @param num: int 数值大小 @return: self """ self.__validate_rules.append(_FloatValidation(self.name, opt, num)) return self def Alpha(self, opt: typing.Optional[str] = None, length_or_list: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): """ 纯字母 @param opt: str 运算符 @param length_or_list: int|list[int|str]|None 字符串长度或字符串集合 @return: self """ self.__validate_rules.append(_AlphaValidation(self.name, opt, length_or_list)) return self def Alphanum(self, opt: typing.Optional[str] = None, length_or_list: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): """ 字母+数字 @param opt: str 运算符 @param length_or_list: int|list[int|str]|None 字符串长度或字符串集合 @return: self """ self.__validate_rules.append(_AlphanumValidation(self.name, opt, length_or_list)) return self def Mobile(self): """ (中国)手机号码 @return: self """ self.__validate_rules.append(_MobileValidation(self.name)) return self def Email(self): """ 邮箱地址 @return: self """ self.__validate_rules.append(_EmailValidation(self.name)) return self def Regexp(self, exp: str): """ 正则表达式 @param exp: str 正则表达式 @return: self """ self.__validate_rules.append(_RegexpValidation(self.name, exp)) return self def File(self): """ 文件上传 @return: self """ self.__validate_rules.append(_FileValidation(self.name)) return self def Size(self, opt: typing.Optional[str] = None, size: typing.Optional[typing.Union[int, typing.List[int]]] = None): """ 上传文件大小 @param opt: str 运算符 @param size: int 上传文件大小bytes @return: self """ self.__validate_rules.append(_SizeValidation(self.name, opt, size)) return self def Mime(self, opt: typing.Optional[str] = None, mime_type: typing.Optional[typing.Union[str, typing.List[str]]] = None): """ 上传文件Mimetype @param opt: str 运算符 @param mime_type: str 上传文件Mimetype @return: self """ self.__validate_rules.append(_MimeValidation(self.name, opt, mime_type)) return self def Ext(self, opt: typing.Optional[str] = None, ext: typing.Optional[typing.Union[str, typing.List[str]]] = None): """ 上传文件后缀名 @param opt: str 运算符 @param ext: str 上传文件后缀名 @return: self """ self.__validate_rules.append(_ExtValidation(self.name, opt, ext)) return self def SafePath(self): """ 文件路径 @return: self """ self.__validate_rules.append(_SafePathValidation(self.name)) return self # <------- 验证器 End # 过滤器 Begin ------> def Trim(self): """ 去除字符串两端空白字符 @return: self """ self.__filters.append(lambda x: str(x).strip()) return self def Xss(self): """ XSS过滤 @return: self """ self.__filters.append(_xssencode) return self def Filter(self, f: callable): """ 自定义参数过滤器 @param f: callable func(x: any) -> any @return: self """ self.__filters.append(f) return self # <------- 过滤器 End def do_validate(self, args: dict): """ 执行验证器 @param args: dict 请求参数列表 @return: self """ for v in self.__validate_rules: v.validate(args) return self def do_filter(self, val, extra_filters: typing.Union[typing.List[callable], typing.Tuple[callable]] = ()) -> any: """ 执行参数过滤器 @param val: any @param extra_filters: list[callable]|tuple[callable] @return: any """ from functools import reduce return reduce(lambda x, y: y(x), list(extra_filters) + self.__filters, val) class _ValidateRule: def validate(self, args: dict): raise NotImplementedError('method validate() not implemented.') class _RequireValidation(_ValidateRule): """ 必选参数验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} is required' def validate(self, args: dict): if self.name in args: return if 'FILES' in args and self.name in args['FILES']: return raise HintException(self.errmsg.format(self.name)) class _DateValidation(_ValidateRule): """ 日期字符串验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid datetime' def validate(self, args: dict): if self.name not in args: return if re.match(r'^(?:\d{2}-\d{2}-\d{2}|\d{2}/\d{2}/\d{2})(?: \d{2}:\d{2}(?::\d{2})?)?$', str(args[self.name]).strip()): return raise HintException(self.errmsg.format(self.name)) class _TimestampValidation(_ValidateRule): """ Unix时间戳验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid timestamp' def validate(self, args: dict): if self.name not in args: return if re.match(r'^\d{10}$', str(args[self.name]).strip()): return raise HintException(self.errmsg.format(self.name)) class _UrlValidation(_ValidateRule): """ URL地址验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid URL' def validate(self, args: dict): if self.name not in args: return regex_obj = re.compile( r'^(?:http|ftp)s?://' r'(?:(?:[A-Z0-9_](?:[A-Z0-9-_]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-_]{2,}\.?)|' r'localhost|' r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' r'(?::\d+)?' r'(?:/?|[/?]\S+)$', re.IGNORECASE) if regex_obj.match(str(args[self.name]).strip()): return raise HintException(self.errmsg.format(self.name)) class _IpValidation(_ValidateRule): """ IP地址验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid IP' def validate(self, args: dict): if self.name not in args: return ipstr = str(args[self.name]).strip() if _is_ipv4(ipstr) or _is_ipv6(ipstr): return raise HintException(self.errmsg.format(self.name)) class _Ipv4Validation(_ValidateRule): """ IPv4地址验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid IPv4' def validate(self, args: dict): if self.name not in args: return if _is_ipv4(str(args[self.name]).strip()): return raise HintException(self.errmsg.format(self.name)) class _Ipv6Validation(_ValidateRule): """ IPv6地址验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid IPv4' def validate(self, args: dict): if self.name not in args: return if _is_ipv6(str(args[self.name]).strip()): return raise HintException(self.errmsg.format(self.name)) class _HostValidation(_ValidateRule): """ 主机地址验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid HOST' def validate(self, args: dict): if self.name not in args: return if match_based_host.match(str(args[self.name]).strip()): return raise HintException(self.errmsg.format(self.name)) class _JsonValidation(_ValidateRule): """ JSON字符串验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid JSON' def validate(self, args: dict): if self.name not in args: return try: json.loads(str(args[self.name]).strip()) return except: pass raise HintException(self.errmsg.format(self.name)) class _ArrayValidation(_ValidateRule): """ JSON-Array字符串验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid JSON Array' def validate(self, args: dict): if self.name not in args: return try: obj = json.loads(str(args[self.name]).strip()) if isinstance(obj, list): return except: pass raise HintException(self.errmsg.format(self.name)) class _ObjectValidation(_ValidateRule): """ JSON-Object字符串验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid JSON Object' def validate(self, args: dict): if self.name not in args: return try: obj = json.loads(str(args[self.name]).strip()) if isinstance(obj, dict): return except: pass raise HintException(self.errmsg.format(self.name)) class _BoolValidation(_ValidateRule): """ bool字符串验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} must be bool' def validate(self, args: dict): if self.name not in args: return val = args[self.name] if isinstance(val, bool) or re.match(r'^true|false$', str(args[self.name]).strip(), re.IGNORECASE): return raise HintException(self.errmsg.format(self.name)) class _ListValidation(_ValidateRule): """ list数据类型验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} must be list' def validate(self, args: dict): if self.name not in args: return if isinstance(args[self.name], list): return raise HintException(self.errmsg.format(self.name)) class _TupleValidation(_ValidateRule): """ tuple数据类型验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} must be tuple' def validate(self, args: dict): if self.name not in args: return if isinstance(args[self.name], tuple): return raise HintException(self.errmsg.format(self.name)) class _DictValidation(_ValidateRule): """ dict数据类型验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} must be dict' def validate(self, args: dict): if self.name not in args: return if isinstance(args[self.name], dict): return raise HintException(self.errmsg.format(self.name)) class _OperationHelper: """ 运算辅助类 """ def __init__(self, name: str, opt: typing.Optional[str], operand: typing.Optional[typing.Union[str, int, float, typing.List[typing.Union[str, int, float]]]], data_type): self.name: str = name self.opt = opt self.operand = operand self.data_type = data_type self._opt_check() self._data_type_check() def do(self, val: typing.Union[str, int, float], data_type=None): val = str(val).strip() if data_type is not None: self.data_type = data_type self._data_type_check() if self.opt is None: return if self.operand is None: return if self.opt == '=': self._eq(self._calc_num(val)) elif self.opt == '>': self._gt(self._calc_num(val)) elif self.opt == '>=': self._gte(self._calc_num(val)) elif self.opt == '<': self._lt(self._calc_num(val)) elif self.opt == '<=': self._lte(self._calc_num(val)) elif self.opt == 'between': self._between(self._calc_num(val)) elif self.opt == 'in': self._in(self.data_type(val)) elif self.opt == 'not in': self._not_in(self.data_type(val)) def _calc_num(self, val: str) -> typing.Union[int, float]: if self.data_type is str: return len(val) return self.data_type(val) def _opt_check(self): if self.opt is None: return self.opt = self.opt.lower() if self.operand is None: return if self.opt in ['=', '>', '<', '>=', '<='] and not isinstance(self.operand, int): raise HintException('当运算符opt是 \'{}\' 时,运算数只能是int类型或float类型,当前类型 {}'.format(self.opt, type(self.operand))) if self.opt in ['in', 'not in', 'between'] and not isinstance(self.operand, list): raise HintException('当运算符opt是 \'{}\' 时,运算数只能是list类型,当前类型 {}'.format(self.opt, type(self.operand))) def _data_type_check(self): if self.data_type is str: return if self.data_type is int: return if self.data_type is float: return raise HintException('data_type只能是str、int、float 当前 {}'.format(self.data_type)) def _eq(self, num: typing.Union[int, float]): if num == self.operand: return raise HintException( '{}{} must equal {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', self.operand)) def _gt(self, num: typing.Union[int, float]): if num > self.operand: return raise HintException( '{}{} must greater than {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', self.operand)) def _gte(self, num: typing.Union[int, float]): if num >= self.operand: return raise HintException( '{}{} must greater than or equal {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', self.operand)) def _lt(self, num: typing.Union[int, float]): if num < self.operand: return raise HintException( '{}{} must less than {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', self.operand)) def _lte(self, num: typing.Union[int, float]): if num <= self.operand: return raise HintException( '{}{} must less than or equal {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', self.operand)) def _between(self, num: typing.Union[int, float]): if len(self.operand) != 2: raise HintException('当运算符opt是 \'between\' 时,运算数只能是list类型,并且list的长度只能是2,当前list长度 {}'.format(len(self.operand))) if num >= self.operand[0] and num <= self.operand[1]: return raise HintException( '{}{} must between {} and {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', self.operand[0], self.operand[1])) def _in(self, item: typing.Union[int, float, str]): if len(self.operand) < 1: raise HintException('当运算符opt是 \'{}\' 时,运算数只能是list类型,并且list的长度必须大于0,当前list长度 0'.format(self.opt)) if item in self.operand: return raise HintException('{} must in {}'.format(self.name, self.operand)) def _not_in(self, item: typing.Union[int, float, str]): if len(self.operand) < 1: raise HintException('当运算符opt是 \'{}\' 时,运算数只能是list类型,并且list的长度必须大于0,当前list长度 0'.format(self.opt)) if item in self.operand: raise HintException('{} must not in {}'.format(self.name, self.operand)) class _StringValidation(_ValidateRule): """ 字符串验证类 """ def __init__(self, name: str, opt: typing.Optional[str] = None, v: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): self.name: str = name self.errmsg: str = '{} must be string' self.op = _OperationHelper(name, opt, v, str) def validate(self, args: dict): if self.name not in args: return s = args[self.name] if isinstance(s, str): self.op.do(s) return raise HintException(self.errmsg.format(self.name)) class _NumberValidation(_ValidateRule): """ 数字验证类 """ def __init__(self, name: str, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[int, float, typing.List[typing.Union[int, float]]]] = None): self.name: str = name self.errmsg: str = '{} must be number' self.op = _OperationHelper(name, opt, num, float) def validate(self, args: dict): if self.name not in args: return num = args[self.name] if _is_number(num): self.op.do(num, _get_number_data_type(num)) return raise HintException(self.errmsg.format(self.name)) class _IntegerValidation(_ValidateRule): """ 整数验证类 """ def __init__(self, name: str, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[int, typing.List[int]]] = None): self.name: str = name self.errmsg: str = '{} must be integer' self.op = _OperationHelper(name, opt, num, int) def validate(self, args: dict): if self.name not in args: return num = args[self.name] if _is_int(num): self.op.do(num) return raise HintException(self.errmsg.format(self.name)) class _FloatValidation(_ValidateRule): """ 浮点数验证类 """ def __init__(self, name: str, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[float, typing.List[float]]] = None): self.name: str = name self.errmsg: str = '{} must be float' self.op = _OperationHelper(name, opt, num, float) def validate(self, args: dict): if self.name not in args: return num = args[self.name] if _is_float(num): self.op.do(num) return raise HintException(self.errmsg.format(self.name)) class _AlphaValidation(_ValidateRule): """ 纯字母验证类 """ def __init__(self, name: str, opt: typing.Optional[str] = None, v: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): self.name: str = name self.errmsg: str = '{} must be alpha' self.op = _OperationHelper(name, opt, v, str) def validate(self, args: dict): if self.name not in args: return s = str(args[self.name]).strip() if re.match(r'^[a-zA-Z]+$', s): self.op.do(s) return raise HintException(self.errmsg.format(self.name)) class _AlphanumValidation(_ValidateRule): """ 字母数字验证类 """ def __init__(self, name: str, opt: typing.Optional[str] = None, v: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): self.name: str = name self.errmsg: str = '{} must be alphanum' self.op = _OperationHelper(name, opt, v, str) def validate(self, args: dict): if self.name not in args: return s = str(args[self.name]).strip() if re.match(r'^[a-zA-Z0-9]+$', s): self.op.do(s) return raise HintException(self.errmsg.format(self.name)) class _MobileValidation(_ValidateRule): """ (中国)手机号码验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid mobile' def validate(self, args: dict): if self.name not in args: return s = str(args[self.name]).strip() if re.match(r'^1[3-9]\d{9}$', s): return raise HintException(self.errmsg.format(self.name)) class _EmailValidation(_ValidateRule): """ 邮箱地址验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid email' def validate(self, args: dict): if self.name not in args: return s = str(args[self.name]).strip() if re.match(r'^.+@(\[?)[a-zA-Z0-9\-.]+\.(?:[a-zA-Z]{2,}|\d{1,3})\1$', s): return raise HintException(self.errmsg.format(self.name)) class _RegexpValidation(_ValidateRule): """ 正则表达式验证类 """ def __init__(self, name: str, regexp: str): self.name: str = name self.errmsg: str = '{} not success verified by regexp' self.regexp: str = regexp def validate(self, args: dict): if self.name not in args: return s = str(args[self.name]).strip() if re.match(self.regexp, s): return raise HintException(self.errmsg.format(self.name)) class _FileValidation(_ValidateRule): """ 文件上传验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg: str = '{} not valid file' def validate(self, args: dict): if 'FILES' in args and self.name in args['FILES']: return raise HintException(self.errmsg.format(self.name)) class _SizeValidation(_ValidateRule): """ 文件大小验证类 """ def __init__(self, name: str, opt: typing.Optional[str] = None, size: typing.Optional[typing.Union[int, typing.List[int]]] = None): self.name: str = name self.op = _OperationHelper(name, opt, size, int) def validate(self, args: dict): if 'FILES' not in args or self.name not in args['FILES']: return self.op.do(args['FILES'][self.name].content_length) class _MimeValidation(_ValidateRule): """ 文件mimetype验证类 """ def __init__(self, name: str, opt: typing.Optional[str] = None, mime_type: typing.Optional[typing.Union[str, typing.List[str]]] = None): self.name: str = name self.op = _OperationHelper(name, opt, mime_type, str) def validate(self, args: dict): if 'FILES' not in args or self.name not in args['FILES']: return self.op.do(args['FILES'][self.name].mimetype) class _ExtValidation(_ValidateRule): """ 文件后缀名验证类 """ def __init__(self, name: str, opt: typing.Optional[str] = None, ext: typing.Optional[typing.Union[str, typing.List[str]]] = None): self.name: str = name self.op = _OperationHelper(name, opt, ext, str) def validate(self, args: dict): if 'FILES' not in args or self.name not in args['FILES']: return f = args['FILES'][self.name] self.op.do(os.path.splitext(f.filename)[-1]) class _SafePathValidation(_ValidateRule): """ 文件路径名验证类 """ def __init__(self, name: str): self.name: str = name self.errmsg = '{} not safe path' def validate(self, args: dict): if self.name not in args: return if _is_safe_path(str(args[self.name]).strip()): return raise HintException(self.errmsg.format(self.name)) def trim_filter() -> callable: """ 获取Trim参数过滤器 @return: callable """ return lambda x: str(x).strip() def xss_filter() -> callable: """ 获取XSS参数过滤器 @return: callable """ return _xssencode def _is_ipv4(ip: str) -> bool: ''' @name 是否是IPV4地址 @author hwliang @param ip IP地址 @return True/False ''' # 验证基本格式 if not match_ipv4.match(ip): return False # 验证每个段是否在合理范围 try: socket.inet_pton(socket.AF_INET, ip) except AttributeError: try: socket.inet_aton(ip) except socket.error: return False except socket.error: return False return True def _is_ipv6(ip: str) -> bool: ''' @name 是否为IPv6地址 @author hwliang @param ip 地址 @return True/False ''' # 验证基本格式 if not match_ipv6.match(ip): return False # 验证IPv6地址 try: socket.inet_pton(socket.AF_INET6, ip) except socket.error: return False return True def _xssencode(text: str) -> str: """ XSS过滤 @param text: str @return bool """ try: from cgi import html list = ['`', '~', '&', '#', '/', '*', '$', '@', '<', '>', '\"', '\'', ';', '%', ',', '.', '\\u'] ret = [] for i in text: if i in list: i = '' ret.append(i) str_convert = ''.join(ret) text2 = html.escape(str_convert, quote=True) return text2 except: return text.replace('&', '&').replace('"', '"').replace('<', '<').replace('>', '>') def _is_safe_path(path: str, force: bool = True) -> bool: """ 文件路径过滤 @param path: str @param force: bool @return: bool """ if len(path) > 256: return False checks = ['..', './', '\\', '%', '$', '^', '&', '*', '~', '"', "'", ';', '|', '{', '}', '`'] for c in checks: if path.find(c) > -1: return False if force: if not match_safe_path.match(path): return False return True def _is_number(s) -> bool: """ @name 判断输入参数是否一个数字 @author Zhj<2022-07-18> @param s 输入参数 @return bool """ try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False def _is_int(s) -> bool: """ 判断输入是否是整数 @param s: any @return bool """ try: int(s) return True except ValueError: pass return False def _is_float(s) -> bool: """ 判断输入是否是浮点数 @param s: any @return bool """ try: float(s) return True except ValueError: pass return False def _get_number_data_type(s): """ 获取数字的数据类型 @param s 输入参数 @return int|float """ try: int(s) return int except ValueError: pass return float # 参数验证器 class Validator: def __init__(self, rules: typing.Union[typing.Tuple[Param], typing.List[Param]], raise_exc: bool = True): self.__RULES = list(rules) self.__RAISE_EXC = raise_exc # 参数格式校验 def check(self, args: dict) -> aap_t_simple_result: try: for v in self.__RULES: v.do_validate(args) except Exception as e: if self.__RAISE_EXC: raise return aap_t_simple_result(False, str(e)) return aap_t_simple_result(True, 'ok') # 参数列表过滤 def filter(self, args: dict) -> typing.Dict: new_args = {} for v in self.__RULES: if v.name not in args: continue new_args = v.do_filter(args[v.name]) return new_args class dict_obj(dict_obj): def __init__(self): super().__init__() # 存放数据 self.__store = {} # 检测数据是否经过校验 self.__validated = set() def __contains__(self, key): return hasattr(self, key) def __setitem__(self, key, value): if key in key_filter_list: raise ValueError("wrong field name") if not re_key_match.match(key) or re_key_match2.match(key): raise ValueError("wrong field name") self.__store[key] = value def __getitem__(self, key): return getattr(self, key) def __delitem__(self, key): delattr(self, key) def __delattr__(self, key): delattr(self, key) def __setattr__(self, key, value): if match_class_private_property.match(key): object.__setattr__(self, key, value) return self.__store[key] = value def __getattr__(self, key): if key in self.__store: # 未经过校验的数据不允许获取 # if key not in self.__validated: # raise ValueError('参数值获取失败:参数 {} 尚未通过校验,请先调用 validate() 完成校验后再尝试重新获取参数值'.format(key)) return self.__store[key] raise AttributeError('\'{}\' object has no attribute \'{}\''.format(self.__class__.__name__, key)) @property def __dict__(self): return self.__store def get_items(self): return self.__store def validate(self, validate_rules: typing.List[Param], filters: typing.List[callable] = (trim_filter(),)) -> None: """ @name 验证请求参数 @param validate_rules: list[validate.Param] 参数验证规则 @param filters: list[callable] 参数过滤器 @raise Error """ filters = list(filters) for v in validate_rules: v.do_validate(self.__store) if v.name in self.__store: self.__store[v.name] = v.do_filter(self.__store[v.name], filters) self.__validated.add(v.name) def set(self, key, value): if not isinstance(value, str) or not isinstance(key, str): return False if key in key_filter_list: raise ValueError("wrong field name") if not re_key_match.match(key) or re_key_match2.match(key): raise ValueError("wrong field name") return setattr(self, key, value) def to_dict_obj(data: dict) -> dict_obj: ''' @name 将dict转换为dict_obj @author hwliang<2021-07-15> @param data 要被转换的数据 @return dict_obj ''' if not isinstance(data, dict): raise returnMsg(False, 'parameter error: only support transform dict to dict_obj.') pdata = dict_obj() for key in data.keys(): pdata[key] = data[key] return pdata def S(table_name: typing.Optional[str] = None, db_name: str = 'default'): from wptoolkitModel.sqlite_easy import Db query = Db(db_name).query() if table_name is not None and str(table_name).strip() != '': query.table(str(table_name).strip()) return query def OfficialDownloadBase(): return 'https://node.aapanel.com' def build_multipart(data: typing.Dict) -> aap_t_http_multipart: boundary = b'----AapanelFormBoundary' + GetRandomString(16).encode('utf-8') body = b'' # 标准的HTTP请求报文是使用\r\n换行 # \n换行也能被解析,可能存在兼容性问题 eol = b'\r\n' for k in data.keys(): v = data[k] # 二进制数据(文件上传)(bytes, filename) if isinstance(v, tuple) and len(v) == 2: bs, filename = v if isinstance(bs, bytes) and isinstance(filename, str): body += b'--' + boundary + eol + b'Content-Disposition: form-data; name="' + k.encode('utf-8') + b'"; filename="' + filename.encode('utf-8') + b'"' + eol + b'Content-Type: application/octet-stream' + eol + eol + bs + eol # 普通参数 else: # str/number 转 bytes if isinstance(v, str) or is_number(v): v = str(v).encode('utf-8') # 仅处理bytes if isinstance(v, bytes): body += b'--' + boundary + eol + b'Content-Disposition: form-data; name="' + k.encode('utf-8') + b'"' + eol + eol + v + eol body += b'--' + boundary + b'--' + eol return aap_t_http_multipart(headers={ 'Content-Type': 'multipart/form-data; boundary=' + boundary.decode('utf-8'), 'Content-Length': str(len(body)), }, body=body) def SqliteConn(db_name: str = 'default'): from wptoolkitModel.sqlite_easy import Db return Db(db_name) aap_t_simple_site_info = collections.namedtuple('aap_t_simple_site_info', ['site_id', 'database_id']) # 获取当前部署的Web服务器 def get_webserver(): if os.path.exists('{}/apache/bin/apachectl'.format(get_setup_path())): webserver = 'apache' elif os.path.exists('/usr/local/lsws/bin/lswsctrl'): webserver = 'openlitespeed' else: webserver = 'nginx' return webserver # 查询网站对应的PHP版本 def get_site_php_version(siteName: str) -> str: try: webserver = get_webserver() setup_path = get_setup_path() conf = readFile( '{setup_path}/panel/vhost/{webserver}/{siteName}.conf'.format(setup_path=setup_path, webserver=webserver, siteName=siteName)) if webserver == 'openlitespeed': conf = readFile(setup_path + '/panel/vhost/' + webserver + '/detail/' + siteName + '.conf') if webserver == 'nginx': rep = r"enable-php-(\w{2,5})[-\w]*\.conf" elif webserver == 'apache': rep = r"php-cgi-(\w{2,5})\.sock" else: rep = r"path\s*/usr/local/lsws/lsphp(\d+)/bin/lsphp" tmp = re.search(rep, conf).groups() if tmp[0] == '00': return 'Static' if tmp[0] == 'other': return 'Other' return tmp[0][0] + '.' + tmp[0][1] except: return 'Static' # 修复网站文件权限 def fix_permissions(site_root_path_or_site_file: str) -> aap_t_simple_result: """ :param site_root_path_or_site_file: str 网站根目录或者单一网站文件 :return: """ from files import files data = files().fix_permissions(to_dict_obj({'path': site_root_path_or_site_file})) if int(data.get('status', 0)) != 0: return aap_t_simple_result(False, data.get('msg', 'Failed to fix permission')) return aap_t_simple_result(True, data.get('msg', 'Fix permission successfully')) def run_plugin(plugin_name: str, def_name: str, args: dict_obj): import PluginLoader res = PluginLoader.plugin_run(plugin_name, def_name, args) if isinstance(res, dict): if 'status' in res and res['status'] == False and 'msg' in res: if isinstance(res['msg'], str): if res['msg'].find('Traceback ') != -1: raise PanelError(res['msg']) return res # 数据库导出 def dumpsql_with_aap(database_id: int, backup_path: typing.Optional[str] = None): import shlex db_find = M('databases').where("id=?", (database_id,)).find() if not isinstance(db_find, dict): raise HintException(get_msg_gettext('Table {} has been corrupted', ('databases',))) if backup_path is None: backup_path_tmp = M('config').order('`id` desc').limit(1).field('backup_path').find() if not isinstance(backup_path_tmp, dict): raise HintException(get_msg_gettext('Table {} has been corrupted', ('config',))) backup_path = os.path.join(str(backup_path_tmp['backup_path']), 'database') name = db_find['name'] fileName = name + '_' + time.strftime('%Y%m%d_%H%M%S', time.localtime()) + '.sql.gz' backupName = os.path.join(backup_path, fileName) mysqldump_bin = get_mysqldump_bin() from database import database database_obj = database() if db_find['db_type'] in ['0', 0]: # 本地数据库 # 测试数据库连接 with MysqlConn() as conn: conn.execute("show databases") root = M('config').where('id=?', (1,)).getField('mysql_root') if not os.path.exists(backup_path): os.makedirs(backup_path, 0o600) if not database_obj.mypass(True, root): raise HintException(get_msg_gettext("Database configuration file failed to get checked, please check " "if MySQL configuration file exists [/etc/my.cnf]")) try: password = M('config').where('id=?', (1,)).getField('mysql_root') if not password: raise HintException(get_msg_gettext("Database password cannot be empty")) password = shlex.quote(str(password)) os.environ["MYSQL_PWD"] = password ExecShell(mysqldump_bin + " -R -E --triggers=false --default-character-set=" + get_database_character(name) + " --force --opt \"" + name + "\" -u root -p" + password + " | gzip > " + backupName) finally: os.environ["MYSQL_PWD"] = "" database_obj.mypass(False, root) elif db_find['db_type'] in ['1', 1]: # 远程数据库 try: conn_config = json.loads(db_find['conn_config']) res = database_obj.CheckCloudDatabase(conn_config) if isinstance(res, dict): raise HintException(res.get('msg', get_msg_gettext('Cannot connect to remote MySQL'))) password = shlex.quote(str(conn_config['db_password'])) os.environ["MYSQL_PWD"] = password ExecShell(mysqldump_bin + " -h " + conn_config['db_host'] + " -P " + str(int(conn_config['db_port'])) + " -R -E --triggers=false --default-character-set=" + get_database_character(name) + " --force --opt \"" + str(db_find['name']) + "\" -u " + str(conn_config['db_user']) + " -p" + password + " | gzip > " + backupName) finally: os.environ["MYSQL_PWD"] = "" elif db_find['db_type'] in ['2', 2]: try: conn_config = M('database_servers').where('id=?', db_find['sid']).find() res = database_obj.CheckCloudDatabase(conn_config) if isinstance(res, dict): raise HintException(res.get('msg', get_msg_gettext('Cannot connect to remote MySQL'))) password = shlex.quote(str(conn_config['db_password'])) os.environ["MYSQL_PWD"] = password ExecShell(mysqldump_bin + " -h " + conn_config['db_host'] + " -P " + str(int(conn_config['db_port'])) + " -R -E --triggers=false --default-character-set=" + get_database_character(name) + " --force --opt \"" + str(db_find['name']) + "\" -u " + str(conn_config['db_user']) + " -p" + str(conn_config['db_password']) + " | gzip > " + backupName) finally: os.environ["MYSQL_PWD"] = "" else: raise HintException(get_msg_gettext("Unsupported database type")) if not os.path.exists(backupName): raise HintException(get_msg_gettext("Backup error")) # # 将备份信息添加到数据库中 # bak_id = M('backup').add('type,name,pid,filename,size,addtime', (1, fileName, id, backupName, 0, time.strftime('%Y-%m-%d %X', time.localtime()))) return aap_t_mysql_dump_info(db_name=str(db_find['name']), file=backupName, dump_time=int(time.time())) def restore(db_name: str, bak_file: str): from database import database data = database().InputSql(to_dict_obj({'name': db_name, 'file': bak_file})) return data def get_available_php_ver_shorts(without_static: bool = True) -> typing.List[str]: from panelSite import panelSite lst = panelSite().GetPHPVersion(to_dict_obj({})) if without_static: lst = filter(lambda x: x['version'] != '00', lst) return list(map(lambda x: x['version'], lst)) def create_php_site_with_mysql(domain: str, site_path: str, php_ver_short: str, db_user: str, db_pwd: str, another_domains: typing.List = ()) -> aap_t_simple_site_info: """ :param domain: str 网站主域名 :param site_path: str 网站根目录(绝对路径) :param php_ver_short: str PHP版本号缩写 54、74、80、81... :param db_user: str 数据库用户名 :param db_pwd: str 数据库用户密码 :param another_domains: list 网站其它解析域名 :return: aap_t_simple_site_info """ from panelSite import panelSite data = panelSite().AddSite(to_dict_obj({ 'webname': json.dumps({ 'domain': domain, 'domainlist': list(another_domains), 'count': 0, }), 'ftp': '0', 'type': 'PHP', 'version': php_ver_short, 'port': '80', 'path': site_path, 'sql': 'MySQL', 'datauser': db_user, 'datapassword': db_pwd, 'codeing': 'utf8mb4', 'ps': domain.replace('.', '_').replace('-', '_'), })) # if int(data.get('status', 0)) != 0: # raise HintException(data.get('message', {})['result']) # # data = data.get('message', {}) if "status" in data and not data["status"]: return data print(data) if int(data.get('databaseStatus', 0)) != 1: raise HintException('数据库创建失败,请检查mysql运行状态并重试') db_data = M('databases').where('pid=?', (data['siteId'],)).find() data['d_id'] = db_data['id'] return aap_t_simple_site_info(data['siteId'], data['d_id']) def remove_site(site_id: int) -> aap_t_simple_result: site_info = M('sites').where('`id` = ?', (site_id,)).field('name').find() if not isinstance(site_info, dict): return aap_t_simple_result(False, '没找到站点信息 {}'.format(site_id)) from panelSite import panelSite data = panelSite().DeleteSite(to_dict_obj({ 'id': site_id, 'webname': site_info['name'], 'ftp': '1', 'path': '1', 'database': '1', })) print(data) return data def escape_sql_str(s: str) -> str: return search_sql_special_chars.sub(r'\\\g<0>', s) def check_password(password): """ 密码强度: 0 弱 1 中 2 强 """ l = 0 low = False up = False symbol = False digit = False p_len = len(password) if p_len < 8: return l for i in password: if i.islower(): low = True if i.isupper(): up = True if i in ['~', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '_', '-', '=', '+', '<', '>', ',', '.', '/', '"', '|', '\\', "'", '?']: symbol = True if i.isdigit(): digit = True # 判断重复出现 tmp = len(set([i for i in password])) if tmp >= 2: if low and up and symbol and digit: l = 2 if p_len >= 11: l = 1 return l def restore_file(file, act=None): """ @name 还原配置文件 @author zhwen @param file 需要还原的文件 @param act 如果存在,则还原默认配置 """ file_type = "_bak" if act: file_type = "_def" ExecShell("/usr/bin/cp -p {1} {0}".format(file, file + file_type))