| import collections |
| import contextlib |
| import json |
| import os |
| import re |
| import sys |
| import time |
|
|
| import pymysql |
| import werkzeug |
|
|
| os.chdir("/www/server/panel") |
| if 'class/' not in sys.path: |
| sys.path.insert(0, 'class/') |
| from public import * |
|
|
| aap_t_simple_result = collections.namedtuple('aap_t_simple_result', ['success', 'msg']) |
| aap_t_http_multipart = collections.namedtuple('aap_t_http_multipart', ['headers', 'body']) |
| aap_t_mysql_dump_info = collections.namedtuple('aap_t_mysql_dump_info', ['db_name', 'file', 'dump_time']) |
| search_sql_special_chars = re.compile(r'''(?<!\\)(?:[%_]|\\(?![^\\abfnrtvxuUN'"0-7]))''') |
| search_php_first_fatal_error = re.compile(r'PHP Fatal error: \s*([^\r\n]+)') |
|
|
|
|
| def get_msg_gettext(msg, args=()): |
| try: |
| msg = msg.format(*args) |
| except: |
| pass |
| finally: |
| return msg |
|
|
| def return_msg_gettext(status, msg, args=()): |
| """ |
| @name 取通用dict返回 |
| @author hwliang |
| @date 2022.9.20 |
| """ |
| msg = get_msg_gettext(msg, args) |
| if status in(0, "0"): |
| status = True |
| elif status in(-1, "-1"): |
| status = False |
| return {'status': status, 'msg': msg} |
|
|
| def success_v2(res, format_args=()): |
| """ |
| @name V2版本的成功响应函数 |
| @author Zhj<2024-06-05> |
| @param res<any> 响应数据 |
| @param format_args<tuple> 响应文本提示时的format参数 |
| @return dict |
| """ |
| if isinstance(res, str): |
| res = get_msg_gettext(res, format_args) |
|
|
| return returnMsg(True, res) |
|
|
| def fail_v2(res, format_args=()): |
| if isinstance(res, str): |
| res = get_msg_gettext(res, format_args) |
|
|
| return returnMsg(False, res) |
|
|
| def return_message(status, message, data=None): |
| if status in(0, "0"): |
| status = True |
| elif status in(-1, "-1"): |
| status = False |
| return returnMsg(status, data) |
|
|
| def lang(msg, *args): |
| return msg.format(*args) |
|
|
| def make_panel_tmp_path() -> str: |
| tmp_path = '{}/temp/tmp_{}_{}'.format(get_panel_path(), int(time.time()), GetRandomString(32)) |
| if not os.path.exists(tmp_path): |
| os.makedirs(tmp_path, 0o755) |
| return tmp_path |
|
|
|
|
| |
| @contextlib.contextmanager |
| def make_panel_tmp_path_with_context(): |
| tmp_path = make_panel_tmp_path() |
|
|
| import shutil |
|
|
| try: |
| yield tmp_path |
| finally: |
| |
| shutil.rmtree(tmp_path) |
|
|
| def back_file(file, act=None): |
| """ |
| @name 备份配置文件 |
| @author zhwen<zhw@aapanel.com> |
| @param file 需要备份的文件 |
| @param act 如果存在,则备份一份作为默认配置 |
| """ |
| file_type = "_bak" |
| if act: |
| file_type = "_def" |
| ExecShell("/usr/bin/cp -p {0} {1}".format(file, file + file_type)) |
|
|
| def OfficialApiBase(): |
| return 'https://www.aapanel.com' |
|
|
| class HintException(Exception): |
| pass |
|
|
| class PanelError(Exception): |
| ''' |
| @name 面板通用异常对像 |
| @author hwliang<2021-06-25> |
| ''' |
|
|
| def __init__(self, value): |
| self.value = value |
|
|
| def __str__(self): |
| return ("An error occurred while the panel was running: {}".format(str(self.value))) |
|
|
| def is_number(s) -> bool: |
| """ |
| @name 判断输入参数是否一个数字 |
| @author Zhj<2022-07-18> |
| @param s<string|integer|float> 输入参数 |
| @return bool |
| """ |
| try: |
| float(s) |
| return True |
| except ValueError: |
| pass |
|
|
| try: |
| import unicodedata |
| unicodedata.numeric(s) |
| return True |
| except (TypeError, ValueError): |
| pass |
|
|
| return False |
|
|
| class MysqlConn: |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| def __init__(self, db_name=None, db_user: str = 'root', db_pwd=None, db_host: str = 'localhost'): |
| self.__CONN = None |
| self.__DB_NAME = db_name |
| self.__HOST = db_host |
| self.__PORT = 3306 |
| self.__USERNAME = db_user |
| self.__PASSWORD = db_pwd |
| self.__CHARSET = 'utf8mb4' |
| self.__CONNECT_TIMEOUT = 10 |
| self.__UNIX_SOCK = None |
|
|
| def __enter__(self): |
| if self.__CONN: |
| return self |
|
|
| if self.__HOST in ('localhost', '127.0.0.1'): |
| self.__UNIX_SOCK = '/tmp/mysql.sock' |
| self.__CONNECT_TIMEOUT = 1 |
|
|
| myconf = readFile('/etc/my.cnf') |
| m = re.search(r"socket\s*=\s*(.+)", myconf) |
| if m: |
| self.__UNIX_SOCK = m.group(1) |
|
|
| m = re.search(r"port\s*=\s*([0-9]+)", myconf) |
| if m: |
| self.__PORT = int(m.group(1)) |
|
|
| if self.__USERNAME == 'root': |
| self.__PASSWORD = M('config').where('id=?', (1,)).getField('mysql_root') |
|
|
| import pymysql |
|
|
| try: |
| self.__CONN = pymysql.connect(host=self.__HOST, user=self.__USERNAME, passwd=self.__PASSWORD, |
| port=self.__PORT, charset=self.__CHARSET, database=self.__DB_NAME, |
| connect_timeout=self.__CONNECT_TIMEOUT, |
| cursorclass=pymysql.cursors.DictCursor, unix_socket=self.__UNIX_SOCK) |
| except pymysql.Error: |
| if self.__HOST == 'localhost': |
| self.__HOST = '127.0.0.1' |
| self.__CONN = pymysql.connect(host=self.__HOST, user=self.__USERNAME, passwd=self.__PASSWORD, |
| port=self.__PORT, charset=self.__CHARSET, database=self.__DB_NAME, |
| connect_timeout=self.__CONNECT_TIMEOUT, |
| cursorclass=pymysql.cursors.DictCursor, unix_socket=self.__UNIX_SOCK) |
| raise |
|
|
| return self |
|
|
| def __del__(self): |
| if self.__CONN: |
| self.__CONN.close() |
| self.__CONN = None |
|
|
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.__CONN.close() |
| self.__CONN = None |
|
|
| def query(self, sql): |
| cur = self.__CONN.cursor() |
|
|
| try: |
| row_count = cur.execute(sql) |
|
|
| if row_count == 0: |
| return [] |
|
|
| return cur.fetchall() |
| finally: |
| cur.close() |
|
|
| |
| def find(self, sql): |
| ret = self.query(sql) |
|
|
| if len(ret) == 0: |
| return None |
|
|
| return ret[0] |
|
|
| |
| def execute(self, sql): |
| cur = self.__CONN.cursor() |
|
|
| try: |
| row_count = cur.execute(sql) |
|
|
| self.__CONN.commit() |
|
|
| return row_count |
| finally: |
| cur.close() |
|
|
| import copy |
| import re |
| import json |
| import socket |
| import os |
| import sys |
| import typing |
| import collections |
| from types import MethodType |
|
|
| os.chdir("/www/server/panel") |
| if 'class/' not in sys.path: |
| sys.path.insert(0, 'class/') |
| import public |
|
|
| re_key_match = re.compile(r'^[\w\s\[\]\-.]+$') |
| re_key_match2 = re.compile(r'^\.?__[\w\s[\]\-]+__\.?$') |
| key_filter_list = ['get', 'set', 'get_items', 'exists', '__contains__', '__setitem__', '__getitem__', '__delitem__', |
| '__delattr__', '__setattr__', '__getattr__', '__class__', 'get_file'] |
| |
| match_ipv4 = re.compile(r'^(?:(?:25[0-5]|(?:2[0-4]|1?\d)?\d)\.){3}(?:25[0-5]|(?:2[0-4]|1?\d)?\d)$') |
| match_ipv6 = re.compile(r'^(?:(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4})|(?:(?:[0-9a-fA-F]{1,4}:){1,7}:)|(?:(?:[0-9a-fA-F]{1,4}:){6}:[0-9a-fA-F]{1,4})|(?:(?:[0-9a-fA-F]{1,4}:){5}(?::[0-9a-fA-F]{1,4}){1,2})|(?:(?:[0-9a-fA-F]{1,4}:){4}(?::[0-9a-fA-F]{1,4}){1,3})|(?:(?:[0-9a-fA-F]{1,4}:){3}(?::[0-9a-fA-F]{1,4}){1,4})|(?:(?:[0-9a-fA-F]{1,4}:){2}(?::[0-9a-fA-F]{1,4}){1,5})|(?:(?:[0-9a-fA-F]{1,4}:){1}(?::[0-9a-fA-F]{1,4}){1,6})|(?::(?:(?::[0-9a-fA-F]{1,4}){1,7}|:))') |
| |
| match_safe_path = re.compile(r'^[\w\s./\-]*$') |
| |
| match_based_host = re.compile(r'^[\w.:\-]+$') |
| |
| match_class_private_property = re.compile(r'^(?:_\w+)?__\w+') |
| |
| match_general_version_format = re.compile(r'^\d+(?:\.\d+){1,2}$') |
| |
| match_md5_format = re.compile(r'^[a-fA-F0-9]{32}$') |
|
|
| aap_t_simple_result = collections.namedtuple('aap_t_simple_result', ['success', 'msg']) |
|
|
| class HintException(Exception): |
| pass |
|
|
|
|
| class Param: |
| __VALIDATE_OPTS = [ |
| '>', |
| '<', |
| '>=', |
| '<=', |
| '=', |
| 'in', |
| 'not in', |
| ] |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.__validate_rules: typing.List[_ValidateRule] = [] |
| self.__filters: typing.List[callable] = [] |
|
|
| |
|
|
| def Require(self): |
| """ |
| 必选参数 |
| @return: self |
| """ |
| self.__validate_rules.append(_RequireValidation(self.name)) |
| return self |
|
|
| def Date(self): |
| """ |
| 日期字符串 |
| @return: self |
| """ |
| self.__validate_rules.append(_DateValidation(self.name)) |
| return self |
|
|
| def Timestamp(self): |
| """ |
| Unix时间戳 |
| @return: self |
| """ |
| self.__validate_rules.append(_TimestampValidation(self.name)) |
| return self |
|
|
| def Url(self): |
| """ |
| URL |
| @return: self |
| """ |
| self.__validate_rules.append(_UrlValidation(self.name)) |
| return self |
|
|
| def Ip(self): |
| """ |
| IP地址 |
| @return: self |
| """ |
| self.__validate_rules.append(_IpValidation(self.name)) |
| return self |
|
|
| def Ipv4(self): |
| """ |
| IPv4地址 |
| @return: self |
| """ |
| self.__validate_rules.append(_Ipv4Validation(self.name)) |
| return self |
|
|
| def Ipv6(self): |
| """ |
| IPv6地址 |
| @return: self |
| """ |
| self.__validate_rules.append(_Ipv6Validation(self.name)) |
| return self |
|
|
| def Host(self): |
| """ |
| 主机地址(可以包含端口号) |
| @return: self |
| """ |
| self.__validate_rules.append(_HostValidation(self.name)) |
| return self |
|
|
| def Port(self): |
| """ |
| 端口号 |
| @return: self |
| """ |
| return self.Integer('between', [1, 65535]) |
|
|
| def Json(self): |
| """ |
| JSON字符串 |
| @return: self |
| """ |
| self.__validate_rules.append(_JsonValidation(self.name)) |
| return self |
|
|
| def Array(self): |
| """ |
| JSON-Array字符串 |
| @return: self |
| """ |
| self.__validate_rules.append(_ArrayValidation(self.name)) |
| return self |
|
|
| def Object(self): |
| """ |
| JSON-Object字符串 |
| @return: self |
| """ |
| self.__validate_rules.append(_ObjectValidation(self.name)) |
| return self |
|
|
| def List(self): |
| """ |
| 限制参数数据类型:list |
| @return: self |
| """ |
| self.__validate_rules.append(_ListValidation(self.name)) |
| return self |
|
|
| def Tuple(self): |
| """ |
| 限制参数数据类型:tuple |
| @return: self |
| """ |
| self.__validate_rules.append(_TupleValidation(self.name)) |
| return self |
|
|
| def Dict(self): |
| """ |
| 限制参数数据类型:dict |
| @return: self |
| """ |
| self.__validate_rules.append(_DictValidation(self.name)) |
| return self |
|
|
| def Bool(self): |
| """ |
| 布尔值或boolean字符串 true/false |
| @return: self |
| """ |
| self.__validate_rules.append(_BoolValidation(self.name)) |
| return self |
|
|
| def String(self, opt: typing.Optional[str] = None, length_or_list: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): |
| """ |
| 字符串 |
| @param opt: str 运算符 |
| @param length_or_list: int|list[int|str]|None 字符串长度或字符串集合 |
| @return: self |
| """ |
| self.__validate_rules.append(_StringValidation(self.name, opt, length_or_list)) |
| return self |
|
|
| def Number(self, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[int, float, typing.List[typing.Union[int, float]]]] = None): |
| """ |
| 数值 |
| @param opt: str 运算符 |
| @param num: int 数值大小 |
| @return: self |
| """ |
| self.__validate_rules.append(_NumberValidation(self.name, opt, num)) |
| return self |
|
|
| def Integer(self, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[typing.Union[int, typing.List[int]]]] = None): |
| """ |
| 整数 |
| @param opt: str 运算符 |
| @param num: int 数值大小 |
| @return: self |
| """ |
| self.__validate_rules.append(_IntegerValidation(self.name, opt, num)) |
| return self |
|
|
| def Float(self, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[int, float, typing.List[typing.Union[int, float]]]] = None): |
| """ |
| 浮点数 |
| @param opt: str 运算符 |
| @param num: int 数值大小 |
| @return: self |
| """ |
| self.__validate_rules.append(_FloatValidation(self.name, opt, num)) |
| return self |
|
|
| def Alpha(self, opt: typing.Optional[str] = None, length_or_list: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): |
| """ |
| 纯字母 |
| @param opt: str 运算符 |
| @param length_or_list: int|list[int|str]|None 字符串长度或字符串集合 |
| @return: self |
| """ |
| self.__validate_rules.append(_AlphaValidation(self.name, opt, length_or_list)) |
| return self |
|
|
| def Alphanum(self, opt: typing.Optional[str] = None, length_or_list: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): |
| """ |
| 字母+数字 |
| @param opt: str 运算符 |
| @param length_or_list: int|list[int|str]|None 字符串长度或字符串集合 |
| @return: self |
| """ |
| self.__validate_rules.append(_AlphanumValidation(self.name, opt, length_or_list)) |
| return self |
|
|
| def Mobile(self): |
| """ |
| (中国)手机号码 |
| @return: self |
| """ |
| self.__validate_rules.append(_MobileValidation(self.name)) |
| return self |
|
|
| def Email(self): |
| """ |
| 邮箱地址 |
| @return: self |
| """ |
| self.__validate_rules.append(_EmailValidation(self.name)) |
| return self |
|
|
| def Regexp(self, exp: str): |
| """ |
| 正则表达式 |
| @param exp: str 正则表达式 |
| @return: self |
| """ |
| self.__validate_rules.append(_RegexpValidation(self.name, exp)) |
| return self |
|
|
| def File(self): |
| """ |
| 文件上传 |
| @return: self |
| """ |
| self.__validate_rules.append(_FileValidation(self.name)) |
| return self |
|
|
| def Size(self, opt: typing.Optional[str] = None, size: typing.Optional[typing.Union[int, typing.List[int]]] = None): |
| """ |
| 上传文件大小 |
| @param opt: str 运算符 |
| @param size: int 上传文件大小bytes |
| @return: self |
| """ |
| self.__validate_rules.append(_SizeValidation(self.name, opt, size)) |
| return self |
|
|
| def Mime(self, opt: typing.Optional[str] = None, mime_type: typing.Optional[typing.Union[str, typing.List[str]]] = None): |
| """ |
| 上传文件Mimetype |
| @param opt: str 运算符 |
| @param mime_type: str 上传文件Mimetype |
| @return: self |
| """ |
| self.__validate_rules.append(_MimeValidation(self.name, opt, mime_type)) |
| return self |
|
|
| def Ext(self, opt: typing.Optional[str] = None, ext: typing.Optional[typing.Union[str, typing.List[str]]] = None): |
| """ |
| 上传文件后缀名 |
| @param opt: str 运算符 |
| @param ext: str 上传文件后缀名 |
| @return: self |
| """ |
| self.__validate_rules.append(_ExtValidation(self.name, opt, ext)) |
| return self |
|
|
| def SafePath(self): |
| """ |
| 文件路径 |
| @return: self |
| """ |
| self.__validate_rules.append(_SafePathValidation(self.name)) |
| return self |
|
|
| |
|
|
| |
|
|
| def Trim(self): |
| """ |
| 去除字符串两端空白字符 |
| @return: self |
| """ |
| self.__filters.append(lambda x: str(x).strip()) |
| return self |
|
|
| def Xss(self): |
| """ |
| XSS过滤 |
| @return: self |
| """ |
| self.__filters.append(_xssencode) |
| return self |
|
|
| def Filter(self, f: callable): |
| """ |
| 自定义参数过滤器 |
| @param f: callable func(x: any) -> any |
| @return: self |
| """ |
| self.__filters.append(f) |
| return self |
|
|
| |
|
|
| def do_validate(self, args: dict): |
| """ |
| 执行验证器 |
| @param args: dict 请求参数列表 |
| @return: self |
| """ |
| for v in self.__validate_rules: |
| v.validate(args) |
|
|
| return self |
|
|
| def do_filter(self, val, extra_filters: typing.Union[typing.List[callable], typing.Tuple[callable]] = ()) -> any: |
| """ |
| 执行参数过滤器 |
| @param val: any |
| @param extra_filters: list[callable]|tuple[callable] |
| @return: any |
| """ |
| from functools import reduce |
| return reduce(lambda x, y: y(x), list(extra_filters) + self.__filters, val) |
|
|
|
|
| class _ValidateRule: |
| def validate(self, args: dict): |
| raise NotImplementedError('method validate() not implemented.') |
|
|
|
|
| class _RequireValidation(_ValidateRule): |
| """ |
| 必选参数验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} is required' |
|
|
| def validate(self, args: dict): |
| if self.name in args: |
| return |
|
|
| if 'FILES' in args and self.name in args['FILES']: |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _DateValidation(_ValidateRule): |
| """ |
| 日期字符串验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid datetime' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| if re.match(r'^(?:\d{2}-\d{2}-\d{2}|\d{2}/\d{2}/\d{2})(?: \d{2}:\d{2}(?::\d{2})?)?$', |
| str(args[self.name]).strip()): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _TimestampValidation(_ValidateRule): |
| """ |
| Unix时间戳验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid timestamp' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| if re.match(r'^\d{10}$', str(args[self.name]).strip()): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _UrlValidation(_ValidateRule): |
| """ |
| URL地址验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid URL' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| regex_obj = re.compile( |
| r'^(?:http|ftp)s?://' |
| r'(?:(?:[A-Z0-9_](?:[A-Z0-9-_]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-_]{2,}\.?)|' |
| r'localhost|' |
| r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' |
| r'(?::\d+)?' |
| r'(?:/?|[/?]\S+)$', re.IGNORECASE) |
|
|
| if regex_obj.match(str(args[self.name]).strip()): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _IpValidation(_ValidateRule): |
| """ |
| IP地址验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid IP' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| ipstr = str(args[self.name]).strip() |
|
|
| if _is_ipv4(ipstr) or _is_ipv6(ipstr): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _Ipv4Validation(_ValidateRule): |
| """ |
| IPv4地址验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid IPv4' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| if _is_ipv4(str(args[self.name]).strip()): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _Ipv6Validation(_ValidateRule): |
| """ |
| IPv6地址验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid IPv4' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| if _is_ipv6(str(args[self.name]).strip()): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _HostValidation(_ValidateRule): |
| """ |
| 主机地址验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid HOST' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| if match_based_host.match(str(args[self.name]).strip()): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _JsonValidation(_ValidateRule): |
| """ |
| JSON字符串验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid JSON' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| try: |
| json.loads(str(args[self.name]).strip()) |
| return |
| except: |
| pass |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _ArrayValidation(_ValidateRule): |
| """ |
| JSON-Array字符串验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid JSON Array' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| try: |
| obj = json.loads(str(args[self.name]).strip()) |
|
|
| if isinstance(obj, list): |
| return |
| except: |
| pass |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _ObjectValidation(_ValidateRule): |
| """ |
| JSON-Object字符串验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid JSON Object' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| try: |
| obj = json.loads(str(args[self.name]).strip()) |
|
|
| if isinstance(obj, dict): |
| return |
| except: |
| pass |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _BoolValidation(_ValidateRule): |
| """ |
| bool字符串验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} must be bool' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| val = args[self.name] |
|
|
| if isinstance(val, bool) or re.match(r'^true|false$', str(args[self.name]).strip(), re.IGNORECASE): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _ListValidation(_ValidateRule): |
| """ |
| list数据类型验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} must be list' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| if isinstance(args[self.name], list): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _TupleValidation(_ValidateRule): |
| """ |
| tuple数据类型验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} must be tuple' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| if isinstance(args[self.name], tuple): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _DictValidation(_ValidateRule): |
| """ |
| dict数据类型验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} must be dict' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| if isinstance(args[self.name], dict): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _OperationHelper: |
| """ |
| 运算辅助类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str], operand: typing.Optional[typing.Union[str, int, float, typing.List[typing.Union[str, int, float]]]], data_type): |
| self.name: str = name |
| self.opt = opt |
| self.operand = operand |
| self.data_type = data_type |
| self._opt_check() |
| self._data_type_check() |
|
|
| def do(self, val: typing.Union[str, int, float], data_type=None): |
| val = str(val).strip() |
|
|
| if data_type is not None: |
| self.data_type = data_type |
| self._data_type_check() |
|
|
| if self.opt is None: |
| return |
|
|
| if self.operand is None: |
| return |
|
|
| if self.opt == '=': |
| self._eq(self._calc_num(val)) |
| elif self.opt == '>': |
| self._gt(self._calc_num(val)) |
| elif self.opt == '>=': |
| self._gte(self._calc_num(val)) |
| elif self.opt == '<': |
| self._lt(self._calc_num(val)) |
| elif self.opt == '<=': |
| self._lte(self._calc_num(val)) |
| elif self.opt == 'between': |
| self._between(self._calc_num(val)) |
| elif self.opt == 'in': |
| self._in(self.data_type(val)) |
| elif self.opt == 'not in': |
| self._not_in(self.data_type(val)) |
|
|
| def _calc_num(self, val: str) -> typing.Union[int, float]: |
| if self.data_type is str: |
| return len(val) |
|
|
| return self.data_type(val) |
|
|
| def _opt_check(self): |
| if self.opt is None: |
| return |
|
|
| self.opt = self.opt.lower() |
|
|
| if self.operand is None: |
| return |
|
|
| if self.opt in ['=', '>', '<', '>=', '<='] and not isinstance(self.operand, int): |
| raise HintException('当运算符opt是 \'{}\' 时,运算数只能是int类型或float类型,当前类型 {}'.format(self.opt, type(self.operand))) |
|
|
| if self.opt in ['in', 'not in', 'between'] and not isinstance(self.operand, list): |
| raise HintException('当运算符opt是 \'{}\' 时,运算数只能是list类型,当前类型 {}'.format(self.opt, type(self.operand))) |
|
|
| def _data_type_check(self): |
| if self.data_type is str: |
| return |
|
|
| if self.data_type is int: |
| return |
|
|
| if self.data_type is float: |
| return |
|
|
| raise HintException('data_type只能是str、int、float 当前 {}'.format(self.data_type)) |
|
|
| def _eq(self, num: typing.Union[int, float]): |
| if num == self.operand: |
| return |
|
|
| raise HintException( |
| '{}{} must equal {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', self.operand)) |
|
|
| def _gt(self, num: typing.Union[int, float]): |
| if num > self.operand: |
| return |
|
|
| raise HintException( |
| '{}{} must greater than {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', |
| self.operand)) |
|
|
| def _gte(self, num: typing.Union[int, float]): |
| if num >= self.operand: |
| return |
|
|
| raise HintException( |
| '{}{} must greater than or equal {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', |
| self.operand)) |
|
|
| def _lt(self, num: typing.Union[int, float]): |
| if num < self.operand: |
| return |
|
|
| raise HintException( |
| '{}{} must less than {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', |
| self.operand)) |
|
|
| def _lte(self, num: typing.Union[int, float]): |
| if num <= self.operand: |
| return |
|
|
| raise HintException( |
| '{}{} must less than or equal {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', |
| self.operand)) |
|
|
| def _between(self, num: typing.Union[int, float]): |
| if len(self.operand) != 2: |
| raise HintException('当运算符opt是 \'between\' 时,运算数只能是list类型,并且list的长度只能是2,当前list长度 {}'.format(len(self.operand))) |
|
|
| if num >= self.operand[0] and num <= self.operand[1]: |
| return |
|
|
| raise HintException( |
| '{}{} must between {} and {}'.format(self.name, ' length' if isinstance(self.data_type, str) else '', |
| self.operand[0], self.operand[1])) |
|
|
| def _in(self, item: typing.Union[int, float, str]): |
| if len(self.operand) < 1: |
| raise HintException('当运算符opt是 \'{}\' 时,运算数只能是list类型,并且list的长度必须大于0,当前list长度 0'.format(self.opt)) |
|
|
| if item in self.operand: |
| return |
|
|
| raise HintException('{} must in {}'.format(self.name, self.operand)) |
|
|
| def _not_in(self, item: typing.Union[int, float, str]): |
| if len(self.operand) < 1: |
| raise HintException('当运算符opt是 \'{}\' 时,运算数只能是list类型,并且list的长度必须大于0,当前list长度 0'.format(self.opt)) |
|
|
| if item in self.operand: |
| raise HintException('{} must not in {}'.format(self.name, self.operand)) |
|
|
|
|
| class _StringValidation(_ValidateRule): |
| """ |
| 字符串验证类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str] = None, v: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): |
| self.name: str = name |
| self.errmsg: str = '{} must be string' |
| self.op = _OperationHelper(name, opt, v, str) |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| s = args[self.name] |
|
|
| if isinstance(s, str): |
| self.op.do(s) |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _NumberValidation(_ValidateRule): |
| """ |
| 数字验证类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[int, float, typing.List[typing.Union[int, float]]]] = None): |
| self.name: str = name |
| self.errmsg: str = '{} must be number' |
| self.op = _OperationHelper(name, opt, num, float) |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| num = args[self.name] |
|
|
| if _is_number(num): |
| self.op.do(num, _get_number_data_type(num)) |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _IntegerValidation(_ValidateRule): |
| """ |
| 整数验证类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[int, typing.List[int]]] = None): |
| self.name: str = name |
| self.errmsg: str = '{} must be integer' |
| self.op = _OperationHelper(name, opt, num, int) |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| num = args[self.name] |
|
|
| if _is_int(num): |
| self.op.do(num) |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _FloatValidation(_ValidateRule): |
| """ |
| 浮点数验证类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str] = None, num: typing.Optional[typing.Union[float, typing.List[float]]] = None): |
| self.name: str = name |
| self.errmsg: str = '{} must be float' |
| self.op = _OperationHelper(name, opt, num, float) |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| num = args[self.name] |
|
|
| if _is_float(num): |
| self.op.do(num) |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _AlphaValidation(_ValidateRule): |
| """ |
| 纯字母验证类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str] = None, v: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): |
| self.name: str = name |
| self.errmsg: str = '{} must be alpha' |
| self.op = _OperationHelper(name, opt, v, str) |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| s = str(args[self.name]).strip() |
|
|
| if re.match(r'^[a-zA-Z]+$', s): |
| self.op.do(s) |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _AlphanumValidation(_ValidateRule): |
| """ |
| 字母数字验证类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str] = None, v: typing.Optional[typing.Union[int, typing.List[typing.Union[int, str]]]] = None): |
| self.name: str = name |
| self.errmsg: str = '{} must be alphanum' |
| self.op = _OperationHelper(name, opt, v, str) |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| s = str(args[self.name]).strip() |
|
|
| if re.match(r'^[a-zA-Z0-9]+$', s): |
| self.op.do(s) |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _MobileValidation(_ValidateRule): |
| """ |
| (中国)手机号码验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid mobile' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| s = str(args[self.name]).strip() |
|
|
| if re.match(r'^1[3-9]\d{9}$', s): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _EmailValidation(_ValidateRule): |
| """ |
| 邮箱地址验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid email' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| s = str(args[self.name]).strip() |
|
|
| if re.match(r'^.+@(\[?)[a-zA-Z0-9\-.]+\.(?:[a-zA-Z]{2,}|\d{1,3})\1$', s): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _RegexpValidation(_ValidateRule): |
| """ |
| 正则表达式验证类 |
| """ |
|
|
| def __init__(self, name: str, regexp: str): |
| self.name: str = name |
| self.errmsg: str = '{} not success verified by regexp' |
| self.regexp: str = regexp |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| s = str(args[self.name]).strip() |
|
|
| if re.match(self.regexp, s): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _FileValidation(_ValidateRule): |
| """ |
| 文件上传验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg: str = '{} not valid file' |
|
|
| def validate(self, args: dict): |
| if 'FILES' in args and self.name in args['FILES']: |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| class _SizeValidation(_ValidateRule): |
| """ |
| 文件大小验证类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str] = None, size: typing.Optional[typing.Union[int, typing.List[int]]] = None): |
| self.name: str = name |
| self.op = _OperationHelper(name, opt, size, int) |
|
|
| def validate(self, args: dict): |
| if 'FILES' not in args or self.name not in args['FILES']: |
| return |
|
|
| self.op.do(args['FILES'][self.name].content_length) |
|
|
|
|
| class _MimeValidation(_ValidateRule): |
| """ |
| 文件mimetype验证类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str] = None, mime_type: typing.Optional[typing.Union[str, typing.List[str]]] = None): |
| self.name: str = name |
| self.op = _OperationHelper(name, opt, mime_type, str) |
|
|
| def validate(self, args: dict): |
| if 'FILES' not in args or self.name not in args['FILES']: |
| return |
|
|
| self.op.do(args['FILES'][self.name].mimetype) |
|
|
|
|
| class _ExtValidation(_ValidateRule): |
| """ |
| 文件后缀名验证类 |
| """ |
|
|
| def __init__(self, name: str, opt: typing.Optional[str] = None, ext: typing.Optional[typing.Union[str, typing.List[str]]] = None): |
| self.name: str = name |
| self.op = _OperationHelper(name, opt, ext, str) |
|
|
| def validate(self, args: dict): |
| if 'FILES' not in args or self.name not in args['FILES']: |
| return |
|
|
| f = args['FILES'][self.name] |
|
|
| self.op.do(os.path.splitext(f.filename)[-1]) |
|
|
|
|
| class _SafePathValidation(_ValidateRule): |
| """ |
| 文件路径名验证类 |
| """ |
|
|
| def __init__(self, name: str): |
| self.name: str = name |
| self.errmsg = '{} not safe path' |
|
|
| def validate(self, args: dict): |
| if self.name not in args: |
| return |
|
|
| if _is_safe_path(str(args[self.name]).strip()): |
| return |
|
|
| raise HintException(self.errmsg.format(self.name)) |
|
|
|
|
| def trim_filter() -> callable: |
| """ |
| 获取Trim参数过滤器 |
| @return: callable |
| """ |
| return lambda x: str(x).strip() |
|
|
|
|
| def xss_filter() -> callable: |
| """ |
| 获取XSS参数过滤器 |
| @return: callable |
| """ |
| return _xssencode |
|
|
|
|
| def _is_ipv4(ip: str) -> bool: |
| ''' |
| @name 是否是IPV4地址 |
| @author hwliang |
| @param ip<string> IP地址 |
| @return True/False |
| ''' |
| |
| if not match_ipv4.match(ip): |
| return False |
|
|
| |
| try: |
| socket.inet_pton(socket.AF_INET, ip) |
| except AttributeError: |
| try: |
| socket.inet_aton(ip) |
| except socket.error: |
| return False |
| except socket.error: |
| return False |
| return True |
|
|
|
|
| def _is_ipv6(ip: str) -> bool: |
| ''' |
| @name 是否为IPv6地址 |
| @author hwliang |
| @param ip<string> 地址 |
| @return True/False |
| ''' |
| |
| if not match_ipv6.match(ip): |
| return False |
|
|
| |
| try: |
| socket.inet_pton(socket.AF_INET6, ip) |
| except socket.error: |
| return False |
| return True |
|
|
|
|
| def _xssencode(text: str) -> str: |
| """ |
| XSS过滤 |
| @param text: str |
| @return bool |
| """ |
| try: |
| from cgi import html |
| list = ['`', '~', '&', '#', '/', '*', '$', '@', '<', '>', '\"', '\'', ';', '%', ',', '.', '\\u'] |
| ret = [] |
| for i in text: |
| if i in list: |
| i = '' |
| ret.append(i) |
| str_convert = ''.join(ret) |
| text2 = html.escape(str_convert, quote=True) |
| return text2 |
| except: |
| return text.replace('&', '&').replace('"', '"').replace('<', '<').replace('>', '>') |
|
|
|
|
| def _is_safe_path(path: str, force: bool = True) -> bool: |
| """ |
| 文件路径过滤 |
| @param path: str |
| @param force: bool |
| @return: bool |
| """ |
| if len(path) > 256: |
| return False |
|
|
| checks = ['..', './', '\\', '%', '$', '^', '&', '*', '~', '"', "'", ';', '|', '{', '}', '`'] |
|
|
| for c in checks: |
| if path.find(c) > -1: |
| return False |
|
|
| if force: |
| if not match_safe_path.match(path): |
| return False |
|
|
| return True |
|
|
|
|
| def _is_number(s) -> bool: |
| """ |
| @name 判断输入参数是否一个数字 |
| @author Zhj<2022-07-18> |
| @param s<string|integer|float> 输入参数 |
| @return bool |
| """ |
| try: |
| float(s) |
| return True |
| except ValueError: |
| pass |
|
|
| try: |
| import unicodedata |
| unicodedata.numeric(s) |
| return True |
| except (TypeError, ValueError): |
| pass |
|
|
| return False |
|
|
|
|
| def _is_int(s) -> bool: |
| """ |
| 判断输入是否是整数 |
| @param s: any |
| @return bool |
| """ |
| try: |
| int(s) |
| return True |
| except ValueError: |
| pass |
|
|
| return False |
|
|
|
|
| def _is_float(s) -> bool: |
| """ |
| 判断输入是否是浮点数 |
| @param s: any |
| @return bool |
| """ |
| try: |
| float(s) |
| return True |
| except ValueError: |
| pass |
|
|
| return False |
|
|
|
|
| def _get_number_data_type(s): |
| """ |
| 获取数字的数据类型 |
| @param s<string> 输入参数 |
| @return int|float |
| """ |
| try: |
| int(s) |
| return int |
| except ValueError: |
| pass |
|
|
| return float |
|
|
|
|
| |
| class Validator: |
| def __init__(self, rules: typing.Union[typing.Tuple[Param], typing.List[Param]], raise_exc: bool = True): |
| self.__RULES = list(rules) |
| self.__RAISE_EXC = raise_exc |
|
|
| |
| def check(self, args: dict) -> aap_t_simple_result: |
| try: |
| for v in self.__RULES: |
| v.do_validate(args) |
| except Exception as e: |
| if self.__RAISE_EXC: |
| raise |
|
|
| return aap_t_simple_result(False, str(e)) |
|
|
| return aap_t_simple_result(True, 'ok') |
|
|
| |
| def filter(self, args: dict) -> typing.Dict: |
| new_args = {} |
|
|
| for v in self.__RULES: |
| if v.name not in args: |
| continue |
|
|
| new_args = v.do_filter(args[v.name]) |
|
|
| return new_args |
|
|
| class dict_obj(dict_obj): |
| def __init__(self): |
| super().__init__() |
| |
| self.__store = {} |
|
|
| |
| self.__validated = set() |
|
|
| def __contains__(self, key): |
| return hasattr(self, key) |
|
|
| def __setitem__(self, key, value): |
| if key in key_filter_list: |
| raise ValueError("wrong field name") |
|
|
| if not re_key_match.match(key) or re_key_match2.match(key): |
| raise ValueError("wrong field name") |
|
|
| self.__store[key] = value |
|
|
| def __getitem__(self, key): |
| return getattr(self, key) |
|
|
| def __delitem__(self, key): |
| delattr(self, key) |
|
|
| def __delattr__(self, key): |
| delattr(self, key) |
|
|
| def __setattr__(self, key, value): |
| if match_class_private_property.match(key): |
| object.__setattr__(self, key, value) |
| return |
|
|
| self.__store[key] = value |
|
|
| def __getattr__(self, key): |
| if key in self.__store: |
| |
| |
| |
| return self.__store[key] |
|
|
| raise AttributeError('\'{}\' object has no attribute \'{}\''.format(self.__class__.__name__, key)) |
|
|
| @property |
| def __dict__(self): |
| return self.__store |
|
|
| def get_items(self): |
| return self.__store |
|
|
| def validate(self, validate_rules: typing.List[Param], filters: typing.List[callable] = (trim_filter(),)) -> None: |
| """ |
| @name 验证请求参数 |
| @param validate_rules: list[validate.Param] 参数验证规则 |
| @param filters: list[callable] 参数过滤器 |
| @raise Error |
| """ |
| filters = list(filters) |
|
|
| for v in validate_rules: |
| v.do_validate(self.__store) |
|
|
| if v.name in self.__store: |
| self.__store[v.name] = v.do_filter(self.__store[v.name], filters) |
|
|
| self.__validated.add(v.name) |
|
|
| def set(self, key, value): |
| if not isinstance(value, str) or not isinstance(key, str): return False |
| if key in key_filter_list: |
| raise ValueError("wrong field name") |
| if not re_key_match.match(key) or re_key_match2.match(key): |
| raise ValueError("wrong field name") |
| return setattr(self, key, value) |
|
|
| def to_dict_obj(data: dict) -> dict_obj: |
| ''' |
| @name 将dict转换为dict_obj |
| @author hwliang<2021-07-15> |
| @param data<dict> 要被转换的数据 |
| @return dict_obj |
| ''' |
| if not isinstance(data, dict): |
| raise returnMsg(False, 'parameter error: only support transform dict to dict_obj.') |
| pdata = dict_obj() |
| for key in data.keys(): |
| pdata[key] = data[key] |
| return pdata |
|
|
| def S(table_name: typing.Optional[str] = None, db_name: str = 'default'): |
| from wptoolkitModel.sqlite_easy import Db |
|
|
| query = Db(db_name).query() |
|
|
| if table_name is not None and str(table_name).strip() != '': |
| query.table(str(table_name).strip()) |
|
|
| return query |
|
|
| def OfficialDownloadBase(): |
| return 'https://node.aapanel.com' |
|
|
| def build_multipart(data: typing.Dict) -> aap_t_http_multipart: |
| boundary = b'----AapanelFormBoundary' + GetRandomString(16).encode('utf-8') |
| body = b'' |
|
|
| |
| |
| eol = b'\r\n' |
|
|
| for k in data.keys(): |
| v = data[k] |
|
|
| |
| if isinstance(v, tuple) and len(v) == 2: |
| bs, filename = v |
|
|
| if isinstance(bs, bytes) and isinstance(filename, str): |
| body += b'--' + boundary + eol + b'Content-Disposition: form-data; name="' + k.encode('utf-8') + b'"; filename="' + filename.encode('utf-8') + b'"' + eol + b'Content-Type: application/octet-stream' + eol + eol + bs + eol |
| |
| else: |
| |
| if isinstance(v, str) or is_number(v): |
| v = str(v).encode('utf-8') |
|
|
| |
| if isinstance(v, bytes): |
| body += b'--' + boundary + eol + b'Content-Disposition: form-data; name="' + k.encode('utf-8') + b'"' + eol + eol + v + eol |
|
|
| body += b'--' + boundary + b'--' + eol |
|
|
| return aap_t_http_multipart(headers={ |
| 'Content-Type': 'multipart/form-data; boundary=' + boundary.decode('utf-8'), |
| 'Content-Length': str(len(body)), |
| }, body=body) |
|
|
| def SqliteConn(db_name: str = 'default'): |
| from wptoolkitModel.sqlite_easy import Db |
| return Db(db_name) |
|
|
| aap_t_simple_site_info = collections.namedtuple('aap_t_simple_site_info', ['site_id', 'database_id']) |
|
|
|
|
| |
| def get_webserver(): |
| if os.path.exists('{}/apache/bin/apachectl'.format(get_setup_path())): |
| webserver = 'apache' |
| elif os.path.exists('/usr/local/lsws/bin/lswsctrl'): |
| webserver = 'openlitespeed' |
| else: |
| webserver = 'nginx' |
| return webserver |
|
|
|
|
| |
| def get_site_php_version(siteName: str) -> str: |
| try: |
| webserver = get_webserver() |
| setup_path = get_setup_path() |
|
|
| conf = readFile( |
| '{setup_path}/panel/vhost/{webserver}/{siteName}.conf'.format(setup_path=setup_path, webserver=webserver, |
| siteName=siteName)) |
| if webserver == 'openlitespeed': |
| conf = readFile(setup_path + '/panel/vhost/' + webserver + '/detail/' + siteName + '.conf') |
| if webserver == 'nginx': |
| rep = r"enable-php-(\w{2,5})[-\w]*\.conf" |
| elif webserver == 'apache': |
| rep = r"php-cgi-(\w{2,5})\.sock" |
| else: |
| rep = r"path\s*/usr/local/lsws/lsphp(\d+)/bin/lsphp" |
|
|
| tmp = re.search(rep, conf).groups() |
|
|
| if tmp[0] == '00': |
| return 'Static' |
|
|
| if tmp[0] == 'other': |
| return 'Other' |
|
|
| return tmp[0][0] + '.' + tmp[0][1] |
| except: |
| return 'Static' |
|
|
|
|
| |
| def fix_permissions(site_root_path_or_site_file: str) -> aap_t_simple_result: |
| """ |
| :param site_root_path_or_site_file: str 网站根目录或者单一网站文件 |
| :return: |
| """ |
| from files import files |
| data = files().fix_permissions(to_dict_obj({'path': site_root_path_or_site_file})) |
|
|
| if int(data.get('status', 0)) != 0: |
| return aap_t_simple_result(False, data.get('msg', 'Failed to fix permission')) |
|
|
| return aap_t_simple_result(True, data.get('msg', 'Fix permission successfully')) |
|
|
|
|
| def run_plugin(plugin_name: str, def_name: str, args: dict_obj): |
| import PluginLoader |
| res = PluginLoader.plugin_run(plugin_name, def_name, args) |
| if isinstance(res, dict): |
| if 'status' in res and res['status'] == False and 'msg' in res: |
| if isinstance(res['msg'], str): |
| if res['msg'].find('Traceback ') != -1: |
| raise PanelError(res['msg']) |
| return res |
|
|
| |
| def dumpsql_with_aap(database_id: int, backup_path: typing.Optional[str] = None): |
| import shlex |
| db_find = M('databases').where("id=?", (database_id,)).find() |
|
|
| if not isinstance(db_find, dict): |
| raise HintException(get_msg_gettext('Table {} has been corrupted', ('databases',))) |
|
|
| if backup_path is None: |
| backup_path_tmp = M('config').order('`id` desc').limit(1).field('backup_path').find() |
|
|
| if not isinstance(backup_path_tmp, dict): |
| raise HintException(get_msg_gettext('Table {} has been corrupted', ('config',))) |
|
|
| backup_path = os.path.join(str(backup_path_tmp['backup_path']), 'database') |
|
|
| name = db_find['name'] |
| fileName = name + '_' + time.strftime('%Y%m%d_%H%M%S', time.localtime()) + '.sql.gz' |
| backupName = os.path.join(backup_path, fileName) |
| mysqldump_bin = get_mysqldump_bin() |
|
|
| from database import database |
| database_obj = database() |
|
|
| if db_find['db_type'] in ['0', 0]: |
| |
| |
| with MysqlConn() as conn: |
| conn.execute("show databases") |
|
|
| root = M('config').where('id=?', (1,)).getField('mysql_root') |
| if not os.path.exists(backup_path): |
| os.makedirs(backup_path, 0o600) |
|
|
| if not database_obj.mypass(True, root): |
| raise HintException(get_msg_gettext("Database configuration file failed to get checked, please check " |
| "if MySQL configuration file exists [/etc/my.cnf]")) |
|
|
| try: |
| password = M('config').where('id=?', (1,)).getField('mysql_root') |
| if not password: |
| raise HintException(get_msg_gettext("Database password cannot be empty")) |
|
|
| password = shlex.quote(str(password)) |
| os.environ["MYSQL_PWD"] = password |
| ExecShell(mysqldump_bin + " -R -E --triggers=false --default-character-set=" + get_database_character(name) + " --force --opt \"" + name + "\" -u root -p" + password + " | gzip > " + backupName) |
| finally: |
| os.environ["MYSQL_PWD"] = "" |
|
|
| database_obj.mypass(False, root) |
|
|
| elif db_find['db_type'] in ['1', 1]: |
| |
| try: |
| conn_config = json.loads(db_find['conn_config']) |
| res = database_obj.CheckCloudDatabase(conn_config) |
| if isinstance(res, dict): |
| raise HintException(res.get('msg', get_msg_gettext('Cannot connect to remote MySQL'))) |
|
|
| password = shlex.quote(str(conn_config['db_password'])) |
| os.environ["MYSQL_PWD"] = password |
| ExecShell(mysqldump_bin + " -h " + conn_config['db_host'] + " -P " + str(int(conn_config['db_port'])) + " -R -E --triggers=false --default-character-set=" + get_database_character(name) + " --force --opt \"" + str(db_find['name']) + "\" -u " + str(conn_config['db_user']) + " -p" + password + " | gzip > " + backupName) |
| finally: |
| os.environ["MYSQL_PWD"] = "" |
|
|
| elif db_find['db_type'] in ['2', 2]: |
| try: |
| conn_config = M('database_servers').where('id=?', db_find['sid']).find() |
| res = database_obj.CheckCloudDatabase(conn_config) |
| if isinstance(res, dict): |
| raise HintException(res.get('msg', get_msg_gettext('Cannot connect to remote MySQL'))) |
|
|
| password = shlex.quote(str(conn_config['db_password'])) |
| os.environ["MYSQL_PWD"] = password |
| ExecShell(mysqldump_bin + " -h " + conn_config['db_host'] + " -P " + str(int(conn_config['db_port'])) + " -R -E --triggers=false --default-character-set=" + get_database_character(name) + " --force --opt \"" + str(db_find['name']) + "\" -u " + str(conn_config['db_user']) + " -p" + str(conn_config['db_password']) + " | gzip > " + backupName) |
| finally: |
| os.environ["MYSQL_PWD"] = "" |
|
|
| else: |
| raise HintException(get_msg_gettext("Unsupported database type")) |
|
|
| if not os.path.exists(backupName): |
| raise HintException(get_msg_gettext("Backup error")) |
|
|
| |
| |
|
|
| return aap_t_mysql_dump_info(db_name=str(db_find['name']), file=backupName, dump_time=int(time.time())) |
|
|
| def restore(db_name: str, bak_file: str): |
| from database import database |
| data = database().InputSql(to_dict_obj({'name': db_name, 'file': bak_file})) |
| return data |
|
|
| def get_available_php_ver_shorts(without_static: bool = True) -> typing.List[str]: |
| from panelSite import panelSite |
| lst = panelSite().GetPHPVersion(to_dict_obj({})) |
|
|
| if without_static: |
| lst = filter(lambda x: x['version'] != '00', lst) |
|
|
| return list(map(lambda x: x['version'], lst)) |
|
|
| def create_php_site_with_mysql(domain: str, site_path: str, php_ver_short: str, db_user: str, db_pwd: str, another_domains: typing.List = ()) -> aap_t_simple_site_info: |
| """ |
| :param domain: str 网站主域名 |
| :param site_path: str 网站根目录(绝对路径) |
| :param php_ver_short: str PHP版本号缩写 54、74、80、81... |
| :param db_user: str 数据库用户名 |
| :param db_pwd: str 数据库用户密码 |
| :param another_domains: list 网站其它解析域名 |
| :return: aap_t_simple_site_info |
| """ |
| from panelSite import panelSite |
| data = panelSite().AddSite(to_dict_obj({ |
| 'webname': json.dumps({ |
| 'domain': domain, |
| 'domainlist': list(another_domains), |
| 'count': 0, |
| }), |
| 'ftp': '0', |
| 'type': 'PHP', |
| 'version': php_ver_short, |
| 'port': '80', |
| 'path': site_path, |
| 'sql': 'MySQL', |
| 'datauser': db_user, |
| 'datapassword': db_pwd, |
| 'codeing': 'utf8mb4', |
| 'ps': domain.replace('.', '_').replace('-', '_'), |
| })) |
|
|
| |
| |
| |
| |
| if "status" in data and not data["status"]: |
| return data |
| print(data) |
| if int(data.get('databaseStatus', 0)) != 1: |
| raise HintException('数据库创建失败,请检查mysql运行状态并重试') |
| db_data = M('databases').where('pid=?', (data['siteId'],)).find() |
| data['d_id'] = db_data['id'] |
| return aap_t_simple_site_info(data['siteId'], data['d_id']) |
|
|
| def remove_site(site_id: int) -> aap_t_simple_result: |
| site_info = M('sites').where('`id` = ?', (site_id,)).field('name').find() |
|
|
| if not isinstance(site_info, dict): |
| return aap_t_simple_result(False, '没找到站点信息 {}'.format(site_id)) |
|
|
| from panelSite import panelSite |
| data = panelSite().DeleteSite(to_dict_obj({ |
| 'id': site_id, |
| 'webname': site_info['name'], |
| 'ftp': '1', |
| 'path': '1', |
| 'database': '1', |
| })) |
| print(data) |
|
|
| return data |
|
|
| def escape_sql_str(s: str) -> str: |
| return search_sql_special_chars.sub(r'\\\g<0>', s) |
|
|
| def check_password(password): |
| """ |
| 密码强度: |
| 0 弱 |
| 1 中 |
| 2 强 |
| """ |
| l = 0 |
| low = False |
| up = False |
| symbol = False |
| digit = False |
| p_len = len(password) |
| if p_len < 8: |
| return l |
| for i in password: |
| if i.islower(): |
| low = True |
| if i.isupper(): |
| up = True |
| if i in ['~', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '_', '-', '=', '+', '<', '>', ',', '.', '/', |
| '"', '|', '\\', "'", '?']: |
| symbol = True |
| if i.isdigit(): |
| digit = True |
| |
| tmp = len(set([i for i in password])) |
| if tmp >= 2: |
| if low and up and symbol and digit: |
| l = 2 |
| if p_len >= 11: |
| l = 1 |
| return l |
|
|
| def restore_file(file, act=None): |
| """ |
| @name 还原配置文件 |
| @author zhwen<zhw@aapanel.com> |
| @param file 需要还原的文件 |
| @param act 如果存在,则还原默认配置 |
| """ |
| file_type = "_bak" |
| if act: |
| file_type = "_def" |
| ExecShell("/usr/bin/cp -p {1} {0}".format(file, file + file_type)) |