File size: 11,662 Bytes
3a5cf48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
import os
import re
import json
from typing import Tuple, Optional, Union
from ipaddress import ip_address

from .util import webserver, check_server_config, write_file, read_file, DB, service_reload
from mod.base import json_response


class _BaseRestrict:
    def __init__(self, config_file: str, site_name: str):
        self._conf_file = config_file
        self._conf = self._read_conf()
        self.site_name = site_name

    def _read_conf(self):
        default_conf = {
                "restrict_type": "closed", 
                "black_list": [],
                "white_list": []
            }
        
        if not os.path.exists(self._conf_file):
            return default_conf
        try:
            conf = json.loads(read_file(self._conf_file))
        except:
            conf = default_conf
        return conf

    def to_view(self):
        return self._conf


class _IpRestrict(_BaseRestrict):
    def __init__(self, site_name: str, config_prefix: str):
        setup_path = "/www/server/panel"
        ip_restrict_conf_dir = "{}/data/ip_restrict_data".format(setup_path)
        if not os.path.exists(ip_restrict_conf_dir):
            os.makedirs(ip_restrict_conf_dir)
        super().__init__("{}/{}{}".format(ip_restrict_conf_dir, config_prefix, site_name), site_name)
        self.config_prefix = config_prefix
        self.nginx_sub_file = "{}/vhost/ip-restrict/{}{}.conf".format(setup_path, self.config_prefix, self.site_name)

    @property
    def restrict_type(self):
        return self._conf.get("restrict_type", "black")

    @restrict_type.setter
    def restrict_type(self, data: str):
        if data in ("black", "white", "closed"):
            self._conf["restrict_type"] = data

    @property
    def black_list(self):
        return self._conf.get("black_list", [])

    @black_list.setter
    def black_list(self, list_data: list):
        self._conf["black_list"] = list_data

    @property
    def white_list(self):
        return self._conf.get("white_list", [])

    @white_list.setter
    def white_list(self, list_data: list):
        self._conf["white_list"] = list_data

    def save(self) -> Tuple[bool, str]:
        if not self._conf:  # 没有的时候不操作
            return True, "操作成功"
        write_file(self._conf_file, json.dumps(self._conf))

        if self.restrict_type == "closed":
            write_file(self.nginx_sub_file, "")
            service_reload()
            return True, "操作成功"

        tmp_conf = []
        if self.restrict_type == "white":
            for i in self.white_list:
                tmp_conf.append("allow {};".format(i))

            tmp_conf.append("deny all; # 除开上述IP外,其他IP全部禁止访问")
        elif self.restrict_type == "black":
            for i in self.black_list:
                tmp_conf.append("deny {};".format(i))
        else:
            raise ValueError("错误的类型,无法操作")

        write_file(self.nginx_sub_file, "\n".join(tmp_conf))
        error_msg = check_server_config()
        if error_msg is not None:
            write_file(self.nginx_sub_file, "")
            return False, "操作失败"
        service_reload()
        return True, "操作成功"

    # 删除网站时调用,删除配置文件
    def remove_config_for_remove_site(self):
        if os.path.isfile(self.nginx_sub_file):
            os.remove(self.nginx_sub_file)

        if os.path.isfile(self._conf_file):
            os.remove(self._conf_file)


class RealIpRestrict:

    def __init__(self, config_prefix: str = ""):
        self.config_prefix = config_prefix
        self.web_server = webserver()

    # 获取某个站点的IP黑白名单详情
    def restrict_conf(self, site_name: str) -> Tuple[bool, Union[str, dict]]:
        if self.web_server != "nginx":
            return False, "不支持除nginx之外的服务器"
        ip_conf = _IpRestrict(site_name, self.config_prefix)
        if not self._get_status_in_nginx_conf(ip_conf):
            ip_conf.restrict_type = "closed"
        return True, ip_conf.to_view()

    # 从配置文件中获取状态
    def _get_status_in_nginx_conf(self, ip_conf: _IpRestrict) -> bool:
        setup_path = "/www/server/panel"
        ng_file = "{}/vhost/nginx/{}{}.conf".format(setup_path, self.config_prefix, ip_conf.site_name)
        rep_include = re.compile(r"\sinclude +.*/ip-restrict/.*\.conf;", re.M)
        ng_conf = read_file(ng_file)
        if not isinstance(ng_conf, str):
            return False
        if rep_include.search(ng_conf):
            return True
        return False

    def _set_nginx_include(self, ip_conf: _IpRestrict) -> Tuple[bool, str]:
        setup_path = "/www/server/panel"
        ng_file = "{}/vhost/nginx/{}{}.conf".format(setup_path, self.config_prefix, ip_conf.site_name)
        if not os.path.exists(os.path.dirname(ip_conf.nginx_sub_file)):
            os.makedirs(os.path.dirname(ip_conf.nginx_sub_file), 0o600)
        if not os.path.isfile(ip_conf.nginx_sub_file):
            write_file(ip_conf.nginx_sub_file, "")

        ng_conf = read_file(ng_file)
        if not isinstance(ng_conf, str):
            return False, "nginx配置文件读取失败"

        rep_include = re.compile(r"\s*include\s+.*/ip-restrict/.*\.conf;", re.M)
        if rep_include.search(ng_conf):
            return True, ""

        _include_str = (
            "\n    #引用IP黑白名单规则,注释后配置的IP黑白名单将无效\n"
            "    include {};"
        ).format(ip_conf.nginx_sub_file)

        rep_redirect_include = re.compile(r"\s*include\s+.*/redirect/.*\.conf;", re.M)  # 如果有重定向,添加到重定向之后
        redirect_include_res = rep_redirect_include.search(ng_conf)
        if redirect_include_res:
            new_conf = ng_conf[:redirect_include_res.end()] + _include_str + ng_conf[redirect_include_res.end():]
        else:
            if "#SSL-END" not in ng_conf:
                return False, "添加配置失败,无法定位SSL相关配置的位置"

            new_conf = ng_conf.replace("#SSL-END", "#SSL-END" + _include_str)
        write_file(ng_file, new_conf)
        if self.web_server == "nginx" and check_server_config() is not None:
            write_file(ng_file, ng_conf)
            return False, "添加配置失败"

        return True, ""

    def set_ip_restrict(self, site_name: str, set_type: str) -> Tuple[bool, str]:
        ip_restrict = _IpRestrict(site_name, self.config_prefix)
        if set_type not in ("black", "white", "closed"):
            return False, "不支持的类型【{}】".format(set_type)
        ip_restrict.restrict_type = set_type
        f, msg = self._set_nginx_include(ip_restrict)
        if not f:
            return False, msg

        return ip_restrict.save()

    def add_black_ip_restrict(self, site_name: str, *ips: str) -> Tuple[bool, str]:
        try:
            for ip in ips:
                _ = ip_address(ip)  # 引发valueError
        except ValueError:
            return False, "ip参数解析错误"
        ip_restrict = _IpRestrict(site_name, self.config_prefix)
        black_list = ip_restrict.black_list
        for i in ips:
            if i not in black_list:
                black_list.append(i)

        ip_restrict.black_list = black_list
        f, msg = self._set_nginx_include(ip_restrict)
        if not f:
            return False, msg

        return ip_restrict.save()

    def remove_black_ip_restrict(self, site_name: str, *ips: str):
        ip_restrict = _IpRestrict(site_name, self.config_prefix)
        black_list = ip_restrict.black_list
        for i in ips:
            if i in black_list:
                black_list.remove(i)

        ip_restrict.black_list = black_list
        f, msg = self._set_nginx_include(ip_restrict)
        if not f:
            return False, msg

        return ip_restrict.save()

    def add_white_ip_restrict(self, site_name: str, *ips: str) -> Tuple[bool, str]:
        try:
            for ip in ips:
                _ = ip_address(ip)  # 引发valueError
        except ValueError:
            return False, "ip参数解析错误"
        ip_restrict = _IpRestrict(site_name, self.config_prefix)
        white_list = ip_restrict.white_list
        for i in ips:
            if i not in white_list:
                white_list.append(i)

        ip_restrict.white_list = white_list
        f, msg = self._set_nginx_include(ip_restrict)
        if not f:
            return False, msg

        return ip_restrict.save()

    def remove_white_ip_restrict(self, site_name: str, *ips: str) -> Tuple[bool, str]:
        ip_restrict = _IpRestrict(site_name, self.config_prefix)
        white_list = ip_restrict.white_list
        for i in ips:
            if i in white_list:
                white_list.remove(i)

        ip_restrict.white_list = white_list

        return ip_restrict.save()

    def remove_site_ip_restrict_info(self, site_name: str):
        ip_restrict = _IpRestrict(site_name, self.config_prefix)
        ip_restrict.remove_config_for_remove_site()


class IpRestrict:

    def __init__(self, config_prefix: str = ""):
        self.config_prefix = config_prefix
        self._ri = RealIpRestrict(self.config_prefix)

    # 获取ip控制信息
    def restrict_conf(self, get):
        try:
            site_name = get.site_name.strip()
        except (AttributeError, json.JSONDecodeError):
            return json_response(status=False, msg="参数错误")

        f, d = self._ri.restrict_conf(site_name)
        if not f:
            return json_response(status=f, msg=d)
        return json_response(status=f, data=d)

    # 设置ip黑白名单状态
    def set_ip_restrict(self, get):
        try:
            site_name = get.site_name.strip()
            set_ip_restrict = get.set_type.strip()
        except (AttributeError, json.JSONDecodeError):
            return json_response(status=False, msg="参数错误")

        f, m = self._ri.set_ip_restrict(site_name, set_ip_restrict)
        return json_response(status=f, msg=m)

    # 添加黑名单
    def add_black_ip_restrict(self, get):
        try:
            site_name = get.site_name.strip()
            value = get.value.strip()
        except AttributeError:
            return json_response(status=False, msg="参数错误")

        f, m = self._ri.add_black_ip_restrict(site_name, value)
        return json_response(status=f, msg=m)

    # 移除黑名单
    def remove_black_ip_restrict(self, get):
        try:
            site_name = get.site_name.strip()
            value = get.value.strip()
        except (AttributeError, json.JSONDecodeError):
            return json_response(status=False, msg="参数错误")

        f, m = self._ri.remove_black_ip_restrict(site_name, value)
        return json_response(status=f, msg=m)

    # 添加白名单
    def add_white_ip_restrict(self, get):
        try:
            site_name = get.site_name.strip()
            value = get.value.strip()
        except (AttributeError, json.JSONDecodeError):
            return json_response(status=False, msg="参数错误")

        f, m = self._ri.add_white_ip_restrict(site_name, value)
        return json_response(status=f, msg=m)

    # 移除白名单
    def remove_white_ip_restrict(self, get):
        try:
            site_name = get.site_name.strip()
            value = get.value.strip()
        except (AttributeError, json.JSONDecodeError):
            return json_response(status=False, msg="参数错误")

        f, m = self._ri.remove_white_ip_restrict(site_name, value)
        return json_response(status=f, msg=m)