File size: 8,532 Bytes
17e971c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# WP Toolkit security
# author: hzh<2024-07-02>
import public_wp as public
from public_wp import Param


# WP安全模块类
class wp_security:
        
    # 开启文件防护
    def open_file_protection(self, get):
        """Enable tamper protection for the given paths.

        Args:
            get: Request object; must provide the string field ``paths``,
                which is forwarded unchanged to the ``tamper_core`` plugin.

        Returns:
            The value of ``public.return_message``: status 0 once the
            'WordPress Normal' rule group has been assigned to the first
            created path, status -1 with an error message otherwise.
        """
        # Validate request parameters.
        try:
            get.validate([
                Param('paths').Require().String(),
            ], [
                public.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        tamper_core_status = public.run_plugin('tamper_core', 'get_service_status', public.to_dict_obj({}))
        # A reply without the expected keys (presumably an error reply from the
        # plugin layer) is treated the same as "service not running" so the
        # caller gets the intended message instead of a KeyError.
        try:
            service_ready = bool(
                tamper_core_status["kernel_module_status"]
                and tamper_core_status["controller_status"]
            )
        except (KeyError, TypeError):
            service_ready = False
        if not service_ready:
            return public.return_message(-1, 0, public.lang("打开失败请到应用商店-企业级防篡改查看详情"))

        result = public.run_plugin('tamper_core', 'multi_create', public.to_dict_obj({
            'paths': get.paths,
        }))

        # A dict carrying 'status' is the plugin's error reply; its message
        # lives directly on the dict (indexing it with [0] raised KeyError).
        if isinstance(result, dict) and 'status' in result:
            return public.return_message(-1, 0, result.get('msg', public.lang("开启失败")))

        # Anything other than a non-empty list cannot carry a created path id.
        if not isinstance(result, list) or not result:
            return public.return_message(-1, 0, public.lang("开启失败"))

        # Only the first created entry receives the rule group, as before.
        public.run_plugin('tamper_core', 'assign_rule_to_directory', public.to_dict_obj({
            'rule_group_name': 'WordPress Normal',
            'path_id': result[0]['pid'],
        }))
        return public.return_message(0, 0, public.lang("开启成功"))
    
    #取安全防护配置
    def get_security_info(self, get):
        """Collect the protection status summary for one site.

        Args:
            get: Request object; must provide the string fields ``path`` and
                ``site_name``. ``get.path`` is normalised in place to end
                with '/'.

        Returns:
            The value of ``public.return_message``. On success the payload is
            a dict with the keys ``hotlink_status``, ``file_status``,
            ``firewall_status``, ``file_count`` and ``firewall_count``; every
            value stays 0 when the corresponding plugin lookup fails. On a
            validation error the payload is the error text with status -1.
        """
        # Validate request parameters.
        try:
            get.validate([
                Param('path').Require().String(),
                Param('site_name').Require().String(),
            ], [
                public.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        return_result = {
            'hotlink_status': 0,
            'file_status': 0,
            'firewall_status': 0,
            'file_count': 0,
            'firewall_count': 0,
        }

        # endswith() is safe on an empty string, unlike indexing with [-1].
        if get.path and not get.path.endswith('/'):
            get.path += '/'

        # Per-operation counters that are added up into file_count.
        tamper_ops = (
            'create', 'modify', 'unlink', 'rename', 'mkdir',
            'rmdir', 'chmod', 'chown', 'link',
        )

        # File protection status. Best effort: any plugin failure or an
        # unexpected reply shape leaves the defaults in place.
        try:
            tamper_core_status = public.run_plugin('tamper_core', 'get_service_status', public.to_dict_obj({}))
            if tamper_core_status["kernel_module_status"] and tamper_core_status["controller_status"]:
                tamper_paths = public.run_plugin('tamper_core', 'get_tamper_paths', public.to_dict_obj({}))
                if isinstance(tamper_paths, list):
                    for entry in tamper_paths:
                        if entry['path'] == get.path and entry['status'] == 1:
                            return_result['file_status'] = 1
                            today = entry['total']['today']
                            return_result['file_count'] = sum(int(today[op]) for op in tamper_ops)
                            break
        except Exception:
            # Deliberately ignored; Exception (not a bare except) so that
            # KeyboardInterrupt / SystemExit still propagate.
            pass

        # Firewall on/off state. The plugin call is inside the try so that a
        # firewall plugin failure degrades the same way as the tamper lookup.
        try:
            site_config = public.run_plugin('btwaf', 'get_site_config_byname', public.to_dict_obj({'siteName': get.site_name}))
            if site_config['open']:
                return_result['firewall_status'] = 1
        except Exception:
            pass

        # Firewall counters: add up every 'value' listed for this site.
        try:
            waf_sites = public.run_plugin('btwaf', 'get_site_config3', public.to_dict_obj({'siteName': get.site_name}))
            if isinstance(waf_sites['data'], list):
                for site in waf_sites['data']:
                    if site['siteName'] != get.site_name:
                        continue
                    if isinstance(site['total'], list):
                        for total in site['total']:
                            return_result['firewall_count'] += int(total['value'])
        except Exception:
            pass

        return public.return_message(0, 0, return_result)

    # Disable file protection
    def close_file_protection(self, get):
        """Remove the tamper-protection entry identified by ``get.path_id``.

        Args:
            get: Request object; must provide the string field ``path_id``.

        Returns:
            The value of ``public.return_message``. After the plugin call the
            status is 0 when the plugin reply's ``status`` is truthy and -1
            otherwise, and the message is the plugin reply's ``msg``.
        """
        # Validate request parameters.
        try:
            rules = [Param('path_id').Require().String()]
            filters = [public.trim_filter()]
            get.validate(rules, filters)
        except Exception as err:
            public.print_log("error info: {}".format(err))
            return public.return_message(-1, 0, str(err))

        # Both the kernel module and the controller must be reported as up.
        service = public.run_plugin('tamper_core', 'get_service_status', public.to_dict_obj({}))
        if not service["kernel_module_status"] or not service["controller_status"]:
            return public.return_message(-1, 0, public.lang("打开失败请到应用商店-企业级防篡改查看详情"))

        removal_args = public.to_dict_obj({
            'path_id': get.path_id,
        })
        removal = public.run_plugin('tamper_core', 'remove_path_config', removal_args)

        # Map the plugin's boolean status onto the 0 / -1 status code.
        code = 0 if removal['status'] else -1
        return public.return_message(code, 0, removal['msg'])
        
        
        
    # 开启防火墙防护
    def open_firewall_protection(self, get):
        """Turn on the btwaf firewall globally (if needed) and for one site.

        Args:
            get: Request object; must provide the string fields ``site_name``
                and ``obj``. ``obj`` is forwarded to the plugin's
                ``set_site_obj_open`` call.

        Returns:
            The value of ``public.return_message``: status 0 on success,
            status -1 when validation fails or when either switch call
            reports a falsy ``status``.
        """
        # Validate request parameters.
        try:
            rules = [
                Param('site_name').Require().String(),
                Param('obj').Require().String(),
            ]
            get.validate(rules, [public.trim_filter()])
        except Exception as err:
            public.print_log("error info: {}".format(err))
            return public.return_message(-1, 0, str(err))

        def call_waf(action, args):
            # Thin wrapper around the btwaf plugin entry point.
            return public.run_plugin('btwaf', action, public.to_dict_obj(args))

        # Step 1: the global switch; only touched when it is currently off.
        overview = call_waf('get_total_all', {})
        if not overview['open']:
            switched = call_waf('set_open', {})
            if not switched['status']:
                return public.return_message(-1, 0, public.lang("开启失败"))

        # Step 2: the per-site switch; only touched when it is currently off.
        site_config = call_waf('get_site_config_byname', {'siteName': get.site_name})
        if not site_config['open']:
            switched = call_waf('set_site_obj_open', {
                'siteName': get.site_name,
                'obj': get.obj,
            })
            if not switched['status']:
                return public.return_message(-1, 0, public.lang("开启失败"))

        return public.return_message(0, 0, public.lang("开启成功"))
        
        
    # 关闭防火墙防护
    def close_firewall_protection(self, get):
        """Turn off the btwaf firewall for one site.

        Args:
            get: Request object; must provide the string field ``site_name``.

        Returns:
            The value of ``public.return_message``: status 0 when the site is
            already off or the switch call reports a truthy ``status``,
            status -1 on a validation error or a failed switch call.
        """
        # Validate request parameters.
        try:
            get.validate([
                Param('site_name').Require().String(),
            ], [
                public.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        # NOTE(review): 'set_site_obj_open' is the same call that
        # open_firewall_protection uses to switch a site ON (only when it is
        # currently off), so it appears to flip the current state - confirm
        # against the btwaf plugin. Calling it on a site that is already off
        # would then switch the firewall back on, so check the state first.
        site_config = public.run_plugin('btwaf', 'get_site_config_byname', public.to_dict_obj({
            'siteName': get.site_name,
        }))
        try:
            already_closed = not site_config['open']
        except (KeyError, TypeError):
            # State unknown: fall through to the unconditional call below.
            already_closed = False
        if already_closed:
            return public.return_message(0, 0, public.lang("关闭成功"))

        result = public.run_plugin('btwaf', 'set_site_obj_open', public.to_dict_obj({
            'siteName': get.site_name,
            'obj': 'open',
        }))
        if not result['status']:
            return public.return_message(-1, 0, public.lang("关闭失败"))
        return public.return_message(0, 0, public.lang("关闭成功"))
        
    # 获取防火墙防护配置
    def get_firewall_info(self, get):
        """Fetch the btwaf configuration of the site named ``get.site_name``.

        Args:
            get: Request object; must provide the string field ``site_name``.

        Returns:
            The value of ``public.return_message``: status 0 with the plugin
            reply as payload, or status -1 with the error text when parameter
            validation fails.
        """
        # Validate request parameters.
        try:
            rules = [Param('site_name').Require().String()]
            get.validate(rules, [public.trim_filter()])
        except Exception as err:
            public.print_log("error info: {}".format(err))
            return public.return_message(-1, 0, str(err))

        # The plugin reply is handed back to the caller untouched.
        site_config = public.run_plugin(
            'btwaf',
            'get_site_config_byname',
            public.to_dict_obj({'siteName': get.site_name}),
        )
        return public.return_message(0, 0, site_config)
        
        
    # 获取文件防护配置
    def get_file_info(self, get):
        """Fetch the tamper-protection entry whose path equals ``get.path``.

        Args:
            get: Request object; must provide the string field ``path``.
                ``get.path`` is normalised in place to end with '/'.

        Returns:
            The value of ``public.return_message``: status 0 with the
            matching entry as payload, status 0 with an empty dict when no
            entry matches or the plugin reply is not a list, or status -1
            with the error text when parameter validation fails.
        """
        # Validate request parameters.
        try:
            get.validate([
                Param('path').Require().String(),
            ], [
                public.trim_filter(),
            ])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.return_message(-1, 0, str(ex))

        # endswith() is safe on an empty string, unlike indexing with [-1].
        if get.path and not get.path.endswith('/'):
            get.path += '/'

        result = public.run_plugin('tamper_core', 'get_tamper_paths', public.to_dict_obj({
            'path': get.path,
        }))

        # Only a list reply is iterated (same guard as get_security_info);
        # iterating a dict reply would index its string keys and raise.
        if isinstance(result, list):
            for entry in result:
                if entry['path'] == get.path:
                    return public.return_message(0, 0, entry)
        return public.return_message(0, 0, {})