File size: 5,470 Bytes
3a5cf48 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 | # coding: utf-8
# -------------------------------------------------------------------
# aapanel
# -------------------------------------------------------------------
# Copyright (c) 2014-2099 aapanel(http://www.aapanel.com) All rights reserved.
# -------------------------------------------------------------------
# Author: wzz <wzz@bt.cn>
# -------------------------------------------------------------------
# ------------------------------
# 邮局 - 底层基类
# ------------------------------
import binascii, base64, re, json, os, sys, time, io
import psutil
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
import public
class Base:
def __init__(self):
pass
def M(self, table_name):
import db
sql = db.Sql()
sql._Sql__DB_FILE = '/www/vmail/postfixadmin.db'
sql._Sql__encrypt_keys = []
return sql.table(table_name)
# 加密数据
def _encode(self, data):
str2 = data.strip()
if sys.version_info[0] == 2:
b64_data = base64.b64encode(str2)
else:
b64_data = base64.b64encode(str2.encode('utf-8'))
return binascii.hexlify(b64_data).decode()
# 解密数据
def _decode(self, data):
b64_data = binascii.unhexlify(data.strip())
return base64.b64decode(b64_data).decode()
# 获取公网ip
def _get_pubilc_ip(self):
import requests
try:
#url = 'http://pv.sohu.com/cityjson?ie=utf-8'
url = 'https://ifconfig.me/ip'
opener = requests.get(url)
m_str = opener.text
ip_address = re.search(r'\d+.\d+.\d+.\d+', m_str).group(0)
c_ip = public.check_ip(ip_address)
if not c_ip:
a, e = public.ExecShell("curl ifconfig.me")
return a
return ip_address
except:
filename = '/www/server/panel/data/iplist.txt'
ip_address = public.readFile(filename).strip()
if public.check_ip(ip_address):
return ip_address
else:
return None
# 检查邮箱合法性
def _check_email_address(self, email_address):
return True if re.match(r"^\w+([.-]?\w+)*@.*", email_address) else False
def _get_all_ip(self):
# import psutil
public_ip = self._get_pubilc_ip()
net_info = psutil.net_if_addrs()
addr = []
for i in net_info.values():
addr.append(i[0].address)
locataddr = public.readFile('/www/server/panel/data/iplist.txt')
if not locataddr:
locataddr = ""
ip_address = locataddr.strip()
if ip_address not in addr:
addr.append(ip_address)
if public_ip not in addr:
addr.append(public_ip)
return addr
def _ipv6_to_ptr(self,ipv6_address):
parts = ipv6_address.split(':')
normalized_parts = [part.zfill(4) for part in parts]
# 去掉冒号
normalized_address = ''.join(normalized_parts)
# 反转字符串
reversed_address = normalized_address[::-1]
# 加上点号
ptr_address_parts = list(reversed_address)
ptr_address = '.'.join(ptr_address_parts)
ptr_address += '.ip6.arpa'
# public.print_log("ptr_address ^--{}".format(ptr_address))
return ptr_address
def returnResult(self, status=True, msg="OK", data=None, timestamp=None, code=0, args=None):
'''
通用响应对象
@param code: 0:成功 1:失败 2:警告 ...
@param status:
@param msg: 只传msg,不传需要前端处理的数据
@param data: 只传需要前端处理的数据
@param timestamp: 秒级时间戳
@return:
使用示例:
成功:return dp.returnResult(data=data)
失败:return dp.returnResult(code=1, status=False, msg="获取失败!", data=[])
失败:return dp.returnResult(code=1, status=False, msg="获取失败!")
警告:return dp.returnResult(code=2, status=False, msg="警告,xxxxxxxxxxx!")
...
'''
import time
if timestamp is None:
timestamp = int(time.time())
try:
log_message = json.loads(
public.ReadFile('BTPanel/static/language/' + public.GetLanguage() + '/public.json'))
keys = log_message.keys()
except:
log_message = {}
keys = []
if type(msg) == str:
if msg in keys:
msg = log_message[msg]
for i in range(len(args)):
rep = '{' + str(i + 1) + '}'
msg = msg.replace(rep, args[i])
return {
"code": code,
"status": status,
"msg": msg,
"data": data,
"timestamp": timestamp
}
def return_msg(self, result):
if not isinstance(result, dict):
return self.returnResult(True, "", result, time.time(), 0, )
status = result.get('status', True)
msg = result.get('msg', '')
data = result
timestamp = result.get('timestamp', time.time())
code = 0 if status else 1
return self.returnResult(status, msg, data, timestamp, code, )
# return {'status': result.get('status', False), 'message': result, 'timestamp': time.time()}
# return result |