File size: 6,837 Bytes
020c337
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#coding: utf-8
# +-------------------------------------------------------------------
# | 宝塔Linux面板
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2020 宝塔软件(http://www.bt.cn) All rights reserved.
# +-------------------------------------------------------------------
# | Author: 沐落 <cjx@bt.cn>
# | Author: lx
# | 消息通道邮箱模块 
# +-------------------------------------------------------------------

import os, sys, public, base64, json, re
import sys, os
panelPath = "/www/server/panel"
os.chdir(panelPath)
sys.path.insert(0,panelPath + "/class/")
import public

class sms_msg:

    _APIURL = 'http://www.bt.cn/api/wmsg'
    __UPATH = panelPath + '/data/userInfo.json'
    conf_path = panelPath + '/data/sms_main.json'

        #构造方法
    def __init__(self):
        self.setupPath = public.GetConfigValue('setup_path')
        pdata = {}
        data = {}
        if os.path.exists(self.__UPATH):
            try:
                self.__userInfo = json.loads(public.readFile(self.__UPATH));

                if self.__userInfo:
                    pdata['access_key'] = self.__userInfo['access_key'];
                    data['secret_key'] = self.__userInfo['secret_key'];
            except :
                self.__userInfo = None
        else:
            pdata['access_key'] = 'test'
            data['secret_key'] = '123456'

        pdata['data'] = data
        self.__PDATA = pdata
        self.__module_name = self.__class__.__name__.replace('_msg','')


    def get_version_info(self,get):
        """
        获取版本信息
        """
        data = {}
        data['ps'] = '宝塔短信消息通道,用于接收面板消息推送'
        data['version'] = '1.1'
        data['date'] = '2022-08-02'
        data['author'] = '宝塔'
        data['title'] = '短信'
        data['help'] = 'http://www.bt.cn'
        return data

    def get_config(self,get):
        result = {}
        data = {}
        skey = 'sms_count_{}'.format(public.get_user_info()['username'])
        try:
            from BTPanel import cache
            result = cache.get(skey)
        except:
            cache = None
        if not result:
            result = self.request('get_user_sms')
            if cache: cache.set(skey,result,3600)
        try:
            data = json.loads(public.readFile(self.conf_path))
        except :pass

        for key in data.keys():
            result[key] = data[key]
        return result


    def is_strong_password(self,password):
        """判断密码复杂度是否安全
        非弱口令标准:长度大于等于9,分别包含数字、小写。
        @return: True/False
        @author: linxiao<2020-9-19>
        """
        if len(password) < 6:return False

        import re
        digit_reg = "[0-9]"  # 匹配数字 +1
        lower_case_letters_reg = "[a-z]"  # 匹配小写字母 +1
        special_characters_reg = r"((?=[\x21-\x7e]+)[^A-Za-z0-9])"  # 匹配特殊字符 +1

        regs = [digit_reg,lower_case_letters_reg,special_characters_reg]
        grade = 0
        for reg in regs:
            if re.search(reg, password):
                grade += 1

        if grade >= 2 or (grade == 1 and len(password) >= 9):
            return True
        return False

    def __check_auth_path(self):

        auth = public.readFile('data/admin_path.pl')
        if not auth: return False

        slist = ['/','/123456','/admin123','/111111','/bt','/login','/cloudtencent','/tencentcloud','/admin','/admin888','/test']
        if auth in slist: return False

        if not self.is_strong_password(auth.strip('/')):
            return False
        return True

    def set_config(self,get):

        data = {}
        try:
            data = json.loads(public.readFile(self.conf_path))
        except :pass

        if 'login' in get:
            is_login = int(get['login'])
            if is_login and not self.__check_auth_path(): return public.returnMsg(False,'安全入口过于简单,存在安全隐患. <br>1、长度不得少于9位<br>2、英文+数字组合.')
            data['login'] = is_login

        public.writeFile(self.conf_path,json.dumps(data));
        return public.returnMsg(True, '操作成功!')

    """
    @发送短信
    @sm_type 预警类型, ssl_end|宝塔SSL到期提醒
    @sm_args 预警参数
    """
    def send_msg(self,sm_type = None,sm_args = None):

        s_type = sm_type
        title = '宝塔告警提醒'
        tmps = sm_type.split('|')
        if len(tmps) >= 2:
            s_type = tmps[0]
            title = tmps[1]

        self.__PDATA['data']['sm_type'] = s_type
        self.__PDATA['data']['sm_args'] = sm_args
        result = self.request('send_msg')

        try:

            res = {}
            uinfo = public.get_user_info()
            u_key = '{}****{}'.format(uinfo['username'][0:3],uinfo['username'][-3:])

            res[u_key] = 0
            if result['status']:
                res[u_key] = 1

                skey = 'sms_count_{}'.format(public.get_user_info()['username'])
                try:
                    from BTPanel import cache
                except:
                    cache = None
                if not result:
                    result = self.request('get_user_sms')
                    if cache: cache.set(skey,result,3600)

            public.write_push_log(self.__module_name,title,res)
        except:pass

        return result

    def canonical_data(self, args):
        """规范数据内容

        Args:
            args(dict): 消息原始参数

        Returns:
            new args: 替换后的消息参数
        """

        if not type(args) == dict: return args
        new_args = {}
        for param, value in args.items():
            if type(value) != str:
                new_str = str(value)
            else:
                new_str = value.replace(".", "_").replace("+", "+")
            new_args[param] = new_str
        return new_args

    def push_data(self,data):
        sm_args = self.canonical_data(data['sm_args'])
        return self.send_msg(data['sm_type'],sm_args)

    #发送请求
    def request(self,dname):

        pdata = {}
        pdata['access_key'] = self.__PDATA['access_key']
        pdata['data'] = json.dumps(self.__PDATA['data'])
        try:
            result = public.httpPost(self._APIURL + '/' + dname,pdata)
            result = json.loads(result)
            # print("发送result:")
            # print(result)
            return result
        except Exception as e:
            # print("短信发送异常:")
            # print(e)
            return public.returnMsg(False,public.get_error_info())

    def uninstall(self):
        if os.path.exists(self.conf_path):
            os.remove(self.conf_path)