File size: 3,354 Bytes
a757bd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import ipaddress
import re

from .util import get_config_value


class WxAccountMsgBase:

    @classmethod
    def new_msg(cls):
        return cls()

    def set_ip_address(self, server_ip, local_ip):
        pass

    def to_send_data(self):
        return "", {}


class WxAccountMsg(WxAccountMsgBase):
    def __init__(self):
        self.ip_address: str = ""
        self.thing_type: str = ""
        self.msg: str = ""
        self.next_msg: str = ""

    def set_ip_address(self, server_ip, local_ip):
        self.ip_address = "{}({})".format(server_ip, local_ip)
        if len(self.ip_address) > 32:
            self.ip_address = self.ip_address[:29] + "..."

    def to_send_data(self):
        res = {
            "first": {},
            "keyword1": {
                "value": self.ip_address,
            },
            "keyword2": {
                "value": self.thing_type,
            },
            "keyword3": {
                "value": self.msg,
            }
        }

        if self.next_msg != "":
            res["keyword4"] = {"value": self.next_msg}

        return "", res


class WxAccountLoginMsg(WxAccountMsgBase):
    tid = "RJNG8dBZ5Tb9EK6j6gOlcAgGs2Fjn5Fb07vZIsYg1P4"

    def __init__(self):
        self.login_name: str = ""
        self.login_ip: str = ""
        self.thing_type: str = ""
        self.login_type: str = ""
        self.address: str = ""
        self._server_name: str = ""

    def set_ip_address(self, server_ip, local_ip):
        if self._server_name == "":
            self._server_name = "服务器IP{}".format(server_ip)

    def _get_server_name(self):
        data = get_config_value("title")  # 若获得别名,则使用别名.

        if data != "":
            self._server_name = data

    def to_send_data(self):
        self._get_server_name()
        if self.address.startswith(">归属地:"):
            self.address = self.address[5:]
        if self.address == "":
            self.address = "未知的归属地"

        if not _is_ipv4(self.login_ip):
            self.login_ip = "ipv6-can not show"

        res = {
            "thing10": {
                "value": self._server_name,
            },
            "character_string9": {
                "value": self.login_ip,
            },
            "thing7": {
                "value": self.login_type,
            },
            "thing11": {
                "value": self.address,
            },
            "thing2": {
                "value": self.login_name,
            }
        }
        return self.tid, res


# 处理短信告警信息的不规范问题
def sms_msg_normalize(sm_args: dict) -> dict:
    for key, val in sm_args.items():
        sm_args[key] = _norm_sms_push_argv(str(val))
    return sm_args


def _norm_sms_push_argv(data):
    """
    @处理短信参数,否则会被拦截
    """
    if _is_ipv4(data):
        tmp1 = data.split('.')
        return '{}_***_***_{}'.format(tmp1[0], tmp1[3])

    data = data.replace(".", "_").replace("+", "+")
    return data


def _is_ipv4(data: str) -> bool:
    try:
        ipaddress.IPv4Address(data)
    except:
        return False
    return True


def _is_domain(domain):
    rep_domain = re.compile(r"^([\w\-*]{1,100}\.){1,10}([\w\-]{1,24}|[\w\-]{1,24}\.[\w\-]{1,24})$")
    if rep_domain.match(domain):
        return True
    return False