File size: 10,255 Bytes
020c337
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import json
import os.path

import public
from sslModel import base, dataModel
class main:

    # 获取dns-api列表和托管的域名
    def get_dns_api_and_domains(self, get=None):
        """Return every DNS provider entry together with the domains grouped under it.

        Rows of the ``ssl_domains`` table are attached to the provider entry
        whose key equals the row's ``dns_id``.  Rows without a matching
        provider are listed under the synthetic ``bt`` entry when their
        ``dns_id`` contains ``cloud_id=`` and are skipped otherwise.
        Credential fields are removed from each entry before it is returned
        (the entries are modified in place).

        :param get: request object; not read by this method.
        :return: list of provider dicts, each with a ``domains`` list.  The
            synthetic ``manual`` entry comes first, followed by the provider
            entries in reverse of their iteration order.
        """
        hosted_rows = public.M("ssl_domains").select()
        providers = base.sslBase().get_dns_data(None)

        # Synthetic provider collecting the domains identified by a cloud id.
        providers["bt"] = {"dns_name": "宝塔DNS", "id": "bt", "dns_type": "宝塔DNS", "domains": []}

        for row in hosted_rows:
            provider_key = row["dns_id"]
            entry = providers.get(provider_key, None)
            if not entry:
                if "cloud_id=" in provider_key:
                    providers["bt"]["domains"].append(row["domain"])
                continue
            if not entry.get("domains", None):
                entry["domains"] = []
            entry["domains"].append(row["domain"])

        # Fields that carry provider credentials and must not reach the caller.
        sensitive_fields = (
            "user_name",
            "api_password",
            "secret_id",
            "secret_key",
            "ID",
            "Token",
            "SecretKey",
            "AccessKey",
            "project_id",
            "API Key",
            "E-Mail",
        )

        ordered = []
        for entry in providers.values():
            if not entry.get("domains", None):
                entry["domains"] = []
            for field in sensitive_fields:
                entry.pop(field, None)
            ordered.append(entry)

        # Appending and reversing once yields the same order as inserting
        # every element at the front of the list.
        ordered.append({"dns_name": "手动解析", "id": "manual", "dns_type": "手动解析", "domains": []})
        ordered.reverse()
        return ordered

    # 设置解析记录
    def set_dns_record(self, domain_name, dns_value, dns_id=None, record_type="TXT", sub_domain=""):
        """Create a DNS record through the channel selected by ``dns_id``.

        :param domain_name: domain the record belongs to.
        :param dns_value: value of the record.
        :param dns_id: ``"bt"`` creates the record through ``domainMod``,
            ``"manual"`` only returns an instruction message, and any other
            value is handed to ``dataModel`` as a provider id.
        :param record_type: DNS record type, ``"TXT"`` by default.
        :param sub_domain: optional host part prepended to ``domain_name``.
        :return: whatever the selected backend returns; a
            ``public.returnMsg`` dict for the manual and error cases.
        """
        full_name = "{}.{}".format(sub_domain, domain_name) if sub_domain else domain_name

        # Manual mode: nothing is created, the caller is told what to add.
        if dns_id == "manual":
            return public.returnMsg(True, "请手动添加DNS解析记录,解析类型:{},解析值:{}".format(record_type, dns_value))

        # Any other non-"bt" id is treated as a configured provider id.
        if dns_id != "bt":
            provider_runner = dataModel.main()
            payload = {
                "fun_name": "create_dns_record",
                "dns_id": dns_id,
                "domain_dns_value": dns_value,
                "record_type": record_type,
                "domain_name": full_name,
                "mx": 10,
            }
            return provider_runner.run_fun(public.to_dict_obj(payload))

        # "bt" mode: look up the cloud id stored for the root domain.
        root, sub = public.split_domain_sld(full_name)
        not_hosted_tpl = "域名【{}】非宝塔域名,请确定域名是在宝塔域名注册或已转入宝塔域名后,在域名管理中刷新本地缓存后重试!"
        domain_row = public.M("ssl_domains").where("domain=?", (root,)).find()
        if not domain_row:
            return public.returnMsg(False, not_hosted_tpl.format(root))

        cloud_ref = domain_row["dns_id"]
        if cloud_ref.find("cloud_id=") < 0:
            return public.returnMsg(False, not_hosted_tpl.format(root))
        domain_id = cloud_ref.split("cloud_id=")[1]

        from mod.project.domain import domainMod
        client = domainMod.main()
        request_body = {
            "url": "/api/v1/dns/record/create",
            "domain_type": 1,
            "domain_id": domain_id,
            "record": sub,
            "value": dns_value,
            "type": record_type,
            "mx": 10,
            "ttl": 600,
            "remark": "",
            "view_id": 0,
        }
        return client.request(request_body)

    # 写验证文件
    def write_verify_file(self, domain_name, file_name, file_value):
        """Write a validation file beneath the root directory of the site bound to a domain.

        The domain is looked up in the ``domain`` table and its site in the
        ``sites`` table; the file is then written with ``public.writeFile``
        to ``<site path>/<file_name>``, creating missing parent directories.

        :param domain_name: domain whose bound site receives the file.
        :param file_name: path of the file relative to the site root.
            Leading ``/`` characters are ignored.  A name that is empty or
            that resolves outside the site root is rejected.
        :param file_value: content passed to ``public.writeFile``.
        :return: ``public.returnMsg`` result; success carries ``"success"``,
            failure carries the reason.
        """
        # Locate the site the domain is bound to.
        domain_data = public.M("domain").where("name=?", (domain_name,)).find()
        if not domain_data:
            return public.returnMsg(False, "域名【{}】未绑定到任何网站,请先将域名解析到本服务器并绑定网站后重试!".format(domain_name))
        site_data = public.M("sites").where("id=?", (domain_data["pid"],)).find()
        if not site_data:
            return public.returnMsg(False, "域名【{}】绑定的网站不存在,请先将域名解析到本服务器并绑定网站后重试!".format(domain_name))
        site_path = site_data["path"]
        if not site_path or not os.path.exists(site_path):
            return public.returnMsg(False, "域名【{}】绑定的网站根目录不存在,请先将域名解析到本服务器并绑定网站后重试!".format(domain_name))

        # file_name is taken from the request payload by auto_set_ssl_verify,
        # so it is untrusted.  os.path.join() drops site_path entirely when
        # the second component is absolute, and ".." segments climb out of
        # it; both would allow writing anywhere on disk.
        relative_name = file_name.lstrip("/") if isinstance(file_name, str) else ""
        if not relative_name.strip():
            return public.returnMsg(False, "验证文件名不能为空!")

        # Resolve symlinks and ".." on both sides, then require the target to
        # lie strictly inside the site root (the root itself is not a file).
        site_root = os.path.realpath(site_path)
        verify_path = os.path.realpath(os.path.join(site_root, relative_name))
        if not verify_path.startswith(site_root.rstrip(os.sep) + os.sep):
            return public.returnMsg(False, "验证文件路径【{}】不在网站根目录内,已拒绝写入!".format(file_name))

        try:
            # Directory creation is inside the guarded section so that a
            # permission or name-clash error is reported, not raised.
            os.makedirs(os.path.dirname(verify_path), exist_ok=True)
            public.writeFile(verify_path, file_value)
            return public.returnMsg(True, "success")
        except Exception as e:
            return public.returnMsg(False, "写入验证文件【{}】失败,错误信息:{}".format(verify_path, str(e)))

    # 自动设置验证
    def auto_set_ssl_verify(self, get=None):
        """Check, and optionally publish, domain-validation data.

        Request fields read from ``get``:

        * ``verify_type`` -- ``"dns"`` or ``"file"``.
        * ``verify_data`` -- JSON array of objects.  Every object needs a
          non-empty ``domain``.  DNS items may carry ``sub_domain``,
          ``record_type`` (default ``"TXT"``), ``dns_value`` and ``dns_id``;
          file items may carry ``file_name`` and ``file_value``.
        * ``set_verify_value`` -- optional; when it is ``True``, ``"True"``
          or ``"true"`` a missing DNS record is created through
          ``set_dns_record``.  File validation always writes a missing file.

        All items are validated before any of them is processed, so an
        invalid item never leaves earlier items half applied.

        :return: ``{"status": True, "msg": "success", "data": [...]}`` with
            one ``{"domain": ..., "rep": ...}`` element per item, or a
            ``public.returnMsg`` failure when the request is malformed.
        """
        if get is None or "verify_type" not in get or not get.verify_type:
            return public.returnMsg(False, "验证类型不能为空!")
        verify_type = get.verify_type
        if "verify_data" not in get or not get.verify_data:
            return public.returnMsg(False, "验证数据不能为空!")
        verify_data = get.verify_data.strip()

        # Whether a missing DNS record should be created.
        set_verify_value = False
        if "set_verify_value" in get:
            set_verify_value = get.set_verify_value
        should_set = set_verify_value in (True, "True", "true")

        try:
            verify_info = json.loads(verify_data)
        except (TypeError, ValueError, RecursionError):
            return public.returnMsg(False, "验证数据格式错误,必须为JSON格式!")

        if verify_type not in ("dns", "file"):
            return public.returnMsg(False, "不支持的验证类型:{}!".format(verify_type))

        # Valid JSON is not necessarily a list of objects; reject other
        # shapes here instead of failing on item.get() further down.
        if not isinstance(verify_info, list):
            return public.returnMsg(False, "验证数据格式错误,必须为JSON数组!")
        for item in verify_info:
            domain = item.get("domain") if isinstance(item, dict) else None
            if not isinstance(domain, str) or not domain.strip():
                return public.returnMsg(False, "域名不能为空!")

        result = []
        for item in verify_info:
            if verify_type == "dns":
                result.append(self._verify_dns_item(item, should_set))
            else:
                result.append(self._verify_file_item(item))
        return {"status": True, "msg": "success", "data": result}

    # Check one DNS validation item and create the record when requested.
    def _verify_dns_item(self, item, should_set):
        """Return ``{"domain": <root domain>, "rep": {...}}`` for one DNS item.

        ``rep["local_verify"]`` tells whether a DNS lookup returned the
        expected value; ``status``/``msg`` are filled from the result of
        ``set_dns_record`` when a record had to be created.
        """
        domain_name = item["domain"].strip()
        # "or" also covers JSON null, which .get() defaults do not.
        sub_domain = item.get("sub_domain") or ""
        record_type = item.get("record_type") or "TXT"
        dns_value = item.get("dns_value") or ""
        dns_id = item.get("dns_id", None)

        # A wildcard is validated on its base domain.
        if domain_name.startswith("*."):
            domain_name = domain_name[2:]
        root, sub = public.split_domain_sld(domain_name)

        # Special case: a dotted "_certum" host on a domain that itself has
        # a sub-domain part is collapsed to the bare "_certum" host.
        if "_certum" in sub_domain and "." in sub_domain and sub != "":
            sub_domain = "_certum"

        # An empty host must not produce a name with a leading dot; such a
        # name is invalid and the lookup could never succeed.
        fqdn = "{}.{}".format(sub_domain, root) if sub_domain else root

        local_verify = self._dns_value_published(fqdn, record_type, dns_value)
        rep = {"local_verify": local_verify, "status": None, "msg": None}
        if not local_verify and should_set:
            rep.update(self.set_dns_record(root, dns_value, dns_id=dns_id, record_type=record_type, sub_domain=sub_domain))
            # Look again right away; the new record may already resolve.
            if self._dns_value_published(fqdn, record_type, dns_value):
                rep["local_verify"] = True
        return {"domain": root, "rep": rep}

    # Best-effort DNS lookup used before and after creating a record.
    def _dns_value_published(self, fqdn, record_type, expected):
        """Return True when a lookup of ``fqdn`` yields ``expected``.

        Quotes are removed from every answer before comparing.  For CNAME
        records one trailing dot is ignored on both sides.  Any failure
        (resolver module missing, no such name, timeout, ...) counts as
        "not published"; the lookup is deliberately best-effort.
        """
        is_cname = str(record_type).lower() == "cname"
        if is_cname and isinstance(expected, str) and expected.endswith("."):
            expected = expected[:-1]
        try:
            import dns.resolver
            # dnspython 2.x deprecates query() in favour of resolve(); fall
            # back to query() where resolve() does not exist.
            lookup = getattr(dns.resolver, "resolve", None) or dns.resolver.query
            answer = lookup(fqdn, record_type)
            for rrset in answer.response.answer:
                for rdata in rrset:
                    value = rdata.to_text().replace('"', '').strip()
                    # endswith() is safe on an empty value, unlike value[-1].
                    if is_cname and value.endswith("."):
                        value = value[:-1]
                    public.print_log(value)
                    if value == expected:
                        return True
        except Exception:
            return False
        return False

    # Check one file validation item and write the file when it is missing.
    def _verify_file_item(self, item):
        """Return ``{"domain": <domain>, "rep": {...}}`` for one file item.

        The file is first fetched over plain HTTP from the domain; when the
        response is not a 200 with the expected content, the file is written
        through ``write_verify_file``.
        """
        domain_name = item["domain"].strip()
        file_name = item.get("file_name") or ""
        file_value = item.get("file_value") or ""

        local_verify = False
        import requests
        url = "http://{}/{}".format(domain_name, file_name)
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200 and response.text.strip() == file_value.strip():
                local_verify = True
        except Exception:
            # An unreachable site simply means the file is not served yet.
            pass

        rep = {"local_verify": local_verify, "status": None, "msg": None}
        if local_verify:
            rep.update(public.returnMsg(True, "验证文件已存在且内容正确,无需重复写入!"))
        else:
            rep.update(self.write_verify_file(domain_name, file_name, file_value))
        return {"domain": domain_name, "rep": rep}