File size: 6,837 Bytes
17e971c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 | #coding: utf-8
# +-------------------------------------------------------------------
# | 宝塔Linux面板
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2020 宝塔软件(http://www.bt.cn) All rights reserved.
# +-------------------------------------------------------------------
# | Author: 沐落 <cjx@bt.cn>
# | Author: lx
# | 消息通道邮箱模块
# +-------------------------------------------------------------------
import os, sys, public, base64, json, re
import sys, os
panelPath = "/www/server/panel"
os.chdir(panelPath)
sys.path.insert(0,panelPath + "/class/")
import public
class sms_msg:
_APIURL = 'http://www.bt.cn/api/wmsg'
__UPATH = panelPath + '/data/userInfo.json'
conf_path = panelPath + '/data/sms_main.json'
#构造方法
def __init__(self):
self.setupPath = public.GetConfigValue('setup_path')
pdata = {}
data = {}
if os.path.exists(self.__UPATH):
try:
self.__userInfo = json.loads(public.readFile(self.__UPATH));
if self.__userInfo:
pdata['access_key'] = self.__userInfo['access_key'];
data['secret_key'] = self.__userInfo['secret_key'];
except :
self.__userInfo = None
else:
pdata['access_key'] = 'test'
data['secret_key'] = '123456'
pdata['data'] = data
self.__PDATA = pdata
self.__module_name = self.__class__.__name__.replace('_msg','')
def get_version_info(self,get):
"""
获取版本信息
"""
data = {}
data['ps'] = '宝塔短信消息通道,用于接收面板消息推送'
data['version'] = '1.1'
data['date'] = '2022-08-02'
data['author'] = '宝塔'
data['title'] = '短信'
data['help'] = 'http://www.bt.cn'
return data
def get_config(self,get):
result = {}
data = {}
skey = 'sms_count_{}'.format(public.get_user_info()['username'])
try:
from BTPanel import cache
result = cache.get(skey)
except:
cache = None
if not result:
result = self.request('get_user_sms')
if cache: cache.set(skey,result,3600)
try:
data = json.loads(public.readFile(self.conf_path))
except :pass
for key in data.keys():
result[key] = data[key]
return result
def is_strong_password(self,password):
"""判断密码复杂度是否安全
非弱口令标准:长度大于等于9,分别包含数字、小写。
@return: True/False
@author: linxiao<2020-9-19>
"""
if len(password) < 6:return False
import re
digit_reg = "[0-9]" # 匹配数字 +1
lower_case_letters_reg = "[a-z]" # 匹配小写字母 +1
special_characters_reg = r"((?=[\x21-\x7e]+)[^A-Za-z0-9])" # 匹配特殊字符 +1
regs = [digit_reg,lower_case_letters_reg,special_characters_reg]
grade = 0
for reg in regs:
if re.search(reg, password):
grade += 1
if grade >= 2 or (grade == 1 and len(password) >= 9):
return True
return False
def __check_auth_path(self):
auth = public.readFile('data/admin_path.pl')
if not auth: return False
slist = ['/','/123456','/admin123','/111111','/bt','/login','/cloudtencent','/tencentcloud','/admin','/admin888','/test']
if auth in slist: return False
if not self.is_strong_password(auth.strip('/')):
return False
return True
def set_config(self,get):
data = {}
try:
data = json.loads(public.readFile(self.conf_path))
except :pass
if 'login' in get:
is_login = int(get['login'])
if is_login and not self.__check_auth_path(): return public.returnMsg(False,'安全入口过于简单,存在安全隐患. <br>1、长度不得少于9位<br>2、英文+数字组合.')
data['login'] = is_login
public.writeFile(self.conf_path,json.dumps(data));
return public.returnMsg(True, '操作成功!')
"""
@发送短信
@sm_type 预警类型, ssl_end|宝塔SSL到期提醒
@sm_args 预警参数
"""
def send_msg(self,sm_type = None,sm_args = None):
s_type = sm_type
title = '宝塔告警提醒'
tmps = sm_type.split('|')
if len(tmps) >= 2:
s_type = tmps[0]
title = tmps[1]
self.__PDATA['data']['sm_type'] = s_type
self.__PDATA['data']['sm_args'] = sm_args
result = self.request('send_msg')
try:
res = {}
uinfo = public.get_user_info()
u_key = '{}****{}'.format(uinfo['username'][0:3],uinfo['username'][-3:])
res[u_key] = 0
if result['status']:
res[u_key] = 1
skey = 'sms_count_{}'.format(public.get_user_info()['username'])
try:
from BTPanel import cache
except:
cache = None
if not result:
result = self.request('get_user_sms')
if cache: cache.set(skey,result,3600)
public.write_push_log(self.__module_name,title,res)
except:pass
return result
def canonical_data(self, args):
"""规范数据内容
Args:
args(dict): 消息原始参数
Returns:
new args: 替换后的消息参数
"""
if not type(args) == dict: return args
new_args = {}
for param, value in args.items():
if type(value) != str:
new_str = str(value)
else:
new_str = value.replace(".", "_").replace("+", "+")
new_args[param] = new_str
return new_args
def push_data(self,data):
sm_args = self.canonical_data(data['sm_args'])
return self.send_msg(data['sm_type'],sm_args)
#发送请求
def request(self,dname):
pdata = {}
pdata['access_key'] = self.__PDATA['access_key']
pdata['data'] = json.dumps(self.__PDATA['data'])
try:
result = public.httpPost(self._APIURL + '/' + dname,pdata)
result = json.loads(result)
# print("发送result:")
# print(result)
return result
except Exception as e:
# print("短信发送异常:")
# print(e)
return public.returnMsg(False,public.get_error_info())
def uninstall(self):
if os.path.exists(self.conf_path):
os.remove(self.conf_path) |