| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| import binascii, base64, re, json, os, sys, time, io |
| import psutil |
| if "/www/server/panel/class" not in sys.path: |
| sys.path.insert(0, "/www/server/panel/class") |
| import public |
|
|
| class Base: |
|
|
| def __init__(self): |
| pass |
|
|
| def M(self, table_name): |
| import db |
| sql = db.Sql() |
| sql._Sql__DB_FILE = '/www/vmail/postfixadmin.db' |
| sql._Sql__encrypt_keys = [] |
| return sql.table(table_name) |
|
|
|
|
| |
| def _encode(self, data): |
| str2 = data.strip() |
| if sys.version_info[0] == 2: |
| b64_data = base64.b64encode(str2) |
| else: |
| b64_data = base64.b64encode(str2.encode('utf-8')) |
| return binascii.hexlify(b64_data).decode() |
|
|
| |
| def _decode(self, data): |
| b64_data = binascii.unhexlify(data.strip()) |
| return base64.b64decode(b64_data).decode() |
|
|
| |
| def _get_pubilc_ip(self): |
| import requests |
|
|
| try: |
| |
| url = 'https://ifconfig.me/ip' |
| opener = requests.get(url) |
| m_str = opener.text |
| ip_address = re.search(r'\d+.\d+.\d+.\d+', m_str).group(0) |
| c_ip = public.check_ip(ip_address) |
| if not c_ip: |
| a, e = public.ExecShell("curl ifconfig.me") |
| return a |
| return ip_address |
| except: |
| filename = '/www/server/panel/data/iplist.txt' |
| ip_address = public.readFile(filename).strip() |
| if public.check_ip(ip_address): |
| return ip_address |
| else: |
| return None |
|
|
| |
| def _check_email_address(self, email_address): |
| return True if re.match(r"^\w+([.-]?\w+)*@.*", email_address) else False |
| def _get_all_ip(self): |
| |
| public_ip = self._get_pubilc_ip() |
| net_info = psutil.net_if_addrs() |
| addr = [] |
| for i in net_info.values(): |
| addr.append(i[0].address) |
| locataddr = public.readFile('/www/server/panel/data/iplist.txt') |
| if not locataddr: |
| locataddr = "" |
| ip_address = locataddr.strip() |
| if ip_address not in addr: |
| addr.append(ip_address) |
| if public_ip not in addr: |
| addr.append(public_ip) |
| return addr |
| def _ipv6_to_ptr(self,ipv6_address): |
|
|
| parts = ipv6_address.split(':') |
| normalized_parts = [part.zfill(4) for part in parts] |
| |
| normalized_address = ''.join(normalized_parts) |
| |
| reversed_address = normalized_address[::-1] |
| |
| ptr_address_parts = list(reversed_address) |
| ptr_address = '.'.join(ptr_address_parts) |
| ptr_address += '.ip6.arpa' |
| |
|
|
| return ptr_address |
|
|
| def returnResult(self, status=True, msg="OK", data=None, timestamp=None, code=0, args=None): |
| ''' |
| 通用响应对象 |
| @param code: 0:成功 1:失败 2:警告 ... |
| @param status: |
| @param msg: 只传msg,不传需要前端处理的数据 |
| @param data: 只传需要前端处理的数据 |
| @param timestamp: 秒级时间戳 |
| @return: |
| |
| 使用示例: |
| 成功:return dp.returnResult(data=data) |
| 失败:return dp.returnResult(code=1, status=False, msg="获取失败!", data=[]) |
| 失败:return dp.returnResult(code=1, status=False, msg="获取失败!") |
| 警告:return dp.returnResult(code=2, status=False, msg="警告,xxxxxxxxxxx!") |
| ... |
| ''' |
| import time |
| if timestamp is None: |
| timestamp = int(time.time()) |
|
|
| try: |
| log_message = json.loads( |
| public.ReadFile('BTPanel/static/language/' + public.GetLanguage() + '/public.json')) |
| keys = log_message.keys() |
| except: |
| log_message = {} |
| keys = [] |
|
|
| if type(msg) == str: |
| if msg in keys: |
| msg = log_message[msg] |
| for i in range(len(args)): |
| rep = '{' + str(i + 1) + '}' |
| msg = msg.replace(rep, args[i]) |
|
|
| return { |
| "code": code, |
| "status": status, |
| "msg": msg, |
| "data": data, |
| "timestamp": timestamp |
| } |
|
|
| def return_msg(self, result): |
| if not isinstance(result, dict): |
| return self.returnResult(True, "", result, time.time(), 0, ) |
| status = result.get('status', True) |
| msg = result.get('msg', '') |
| data = result |
| timestamp = result.get('timestamp', time.time()) |
| code = 0 if status else 1 |
| return self.returnResult(status, msg, data, timestamp, code, ) |
| |
| |