import json import os.path import public from sslModel import base, dataModel class main: # 获取dns-api列表和托管的域名 def get_dns_api_and_domains(self, get=None): # 获取域名列表 domain_list = public.M("ssl_domains").select() # 获取dns-api列表 ssl_base_obj = base.sslBase() dns_api_data = ssl_base_obj.get_dns_data(None) # 宝塔域名 dns_api_data.update({"bt": {"dns_name": "宝塔DNS", "id": "bt", "dns_type": "宝塔DNS", "domains": []}}) for domain in domain_list: if not dns_api_data.get(domain["dns_id"], None): if "cloud_id=" in domain["dns_id"]: dns_api_data["bt"]["domains"].append(domain["domain"]) continue if not dns_api_data[domain["dns_id"]].get("domains", None): dns_api_data[domain["dns_id"]]["domains"] = [] dns_api_data[domain["dns_id"]]["domains"].append(domain["domain"]) dns_api_list = [] for k in dns_api_data: if not dns_api_data[k].get("domains", None): dns_api_data[k]["domains"] = [] # 移除敏感数据 dns_api_data[k].pop("user_name", None) dns_api_data[k].pop("api_password", None) dns_api_data[k].pop("secret_id", None) dns_api_data[k].pop("secret_key", None) dns_api_data[k].pop("ID", None) dns_api_data[k].pop("Token", None) dns_api_data[k].pop("SecretKey", None) dns_api_data[k].pop("AccessKey", None) dns_api_data[k].pop("project_id", None) dns_api_data[k].pop("API Key", None) dns_api_data[k].pop("E-Mail", None) dns_api_list.insert(0, dns_api_data[k]) dns_api_list.insert(0, {"dns_name": "手动解析", "id": "manual", "dns_type": "手动解析", "domains": []}) return dns_api_list # 设置解析记录 def set_dns_record(self, domain_name, dns_value, dns_id=None, record_type="TXT",sub_domain=""): if sub_domain: domain_name="{}.{}".format(sub_domain,domain_name) # 宝塔域名 if dns_id == "bt": # 查询域名云端ID root, sub = public.split_domain_sld(domain_name) domain_data = public.M("ssl_domains").where("domain=?", (root,)).find() if not domain_data: return public.returnMsg(False, "域名【{}】非宝塔域名,请确定域名是在宝塔域名注册或已转入宝塔域名后,在域名管理中刷新本地缓存后重试!".format(root)) dns_id = domain_data["dns_id"] if dns_id.find("cloud_id=") == -1: return public.returnMsg(False, "域名【{}】非宝塔域名,请确定域名是在宝塔域名注册或已转入宝塔域名后,在域名管理中刷新本地缓存后重试!".format(root)) domain_id = dns_id.split("cloud_id=")[1] from mod.project.domain import domainMod domain_obj = domainMod.main() rep = domain_obj.request({"url": "/api/v1/dns/record/create", "domain_type": 1, "domain_id": domain_id, "record": sub, "value": dns_value, "type": record_type, "mx": 10, "ttl": 600, "remark": "", "view_id": 0}) return rep elif dns_id == "manual": return public.returnMsg(True, "请手动添加DNS解析记录,解析类型:{},解析值:{}".format(record_type, dns_value)) else: ssl_data_obj = dataModel.main() args = { "fun_name": "create_dns_record", "dns_id": dns_id, "domain_dns_value": dns_value, "record_type": record_type, "domain_name": domain_name, "mx": 10, } return ssl_data_obj.run_fun(public.to_dict_obj(args)) # 写验证文件 def write_verify_file(self, domain_name, file_name, file_value): # 获取域名所在网站 domain_data = public.M("domain").where("name=?", (domain_name,)).find() if not domain_data: return public.returnMsg(False, "域名【{}】未绑定到任何网站,请先将域名解析到本服务器并绑定网站后重试!".format(domain_name)) site_data = public.M("sites").where("id=?", (domain_data["pid"],)).find() if not site_data: return public.returnMsg(False, "域名【{}】绑定的网站不存在,请先将域名解析到本服务器并绑定网站后重试!".format(domain_name)) site_path = site_data["path"] if not site_path or not os.path.exists(site_path): return public.returnMsg(False, "域名【{}】绑定的网站根目录不存在,请先将域名解析到本服务器并绑定网站后重试!".format(domain_name)) verify_path = os.path.join(site_path, file_name) if not os.path.exists(os.path.dirname(verify_path)): os.makedirs(os.path.dirname(verify_path), exist_ok=True) try: public.writeFile(verify_path, file_value) return public.returnMsg(True, "success") except Exception as e: return public.returnMsg(False, "写入验证文件【{}】失败,错误信息:{}".format(verify_path, str(e))) # 自动设置验证 def auto_set_ssl_verify(self, get=None): if not "verify_type" in get or not get.verify_type: return public.returnMsg(False, "验证类型不能为空!") verify_type = get.verify_type if not "verify_data" in get or not get.verify_data: return public.returnMsg(False, "验证数据不能为空!") verify_data = get.verify_data.strip() # 是否设置验证值 set_verify_value = False if "set_verify_value" in get: set_verify_value = get.set_verify_value try: verify_info = json.loads(verify_data) except Exception as e: return public.returnMsg(False, "验证数据格式错误,必须为JSON格式!") result = [] if verify_type == "dns": for item in verify_info: if not item.get("domain", None): return public.returnMsg(False, "域名不能为空!") domain_name = item["domain"].strip() # 特殊处理垦派多级域名的情况 sub_domain = item.get("sub_domain", "") if domain_name.startswith("*."): domain_name = domain_name[2:] root, sub = public.split_domain_sld(domain_name) if "_certum" in sub_domain and len(sub_domain.split(".")) > 1 and sub != "": sub_domain = "_certum" record_type = item.get("record_type", "TXT") dns_value = item.get("dns_value", "") dns_id = item.get("dns_id", None) domain_name,_ = public.split_domain_sld(domain_name) # 本地验证 local_verify = False try: import dns.resolver ns = dns.resolver.query(sub_domain+"."+domain_name, record_type) for j in ns.response.answer: for i in j.items: txt_value = i.to_text().replace('"', '').strip() # 特殊处理CNAME if record_type.lower() == "cname" and txt_value[-1] == ".": txt_value = txt_value[:-1] public.print_log(txt_value) if txt_value == dns_value: local_verify = True except Exception as e: pass rep = {"local_verify": local_verify, "status": None, "msg": None} if not local_verify and set_verify_value in (True, "True", "true"): rep.update(self.set_dns_record(domain_name, dns_value, dns_id=dns_id, record_type=record_type, sub_domain=sub_domain)) try: import dns.resolver ns = dns.resolver.query(sub_domain + "." + domain_name, record_type) for j in ns.response.answer: for i in j.items: txt_value = i.to_text().replace('"', '').strip() if txt_value == dns_value: rep["local_verify"] = True except Exception as e: pass result.append({"domain": domain_name, "rep": rep}) elif verify_type == "file": for item in verify_info: if not item.get("domain", None): return public.returnMsg(False, "域名不能为空!") domain_name = item["domain"].strip() file_name = item.get("file_name", "") file_value = item.get("file_value", "") # 本地验证 local_verify = False import requests url = "http://{}/{}".format(domain_name, file_name) try: response = requests.get(url, timeout=5) if response.status_code == 200: if response.text.strip() == file_value.strip(): local_verify = True except Exception as e: pass rep = {"local_verify": local_verify, "status": None, "msg": None} if not local_verify: rep.update(self.write_verify_file(domain_name, file_name, file_value)) else: rep.update(public.returnMsg(True, "验证文件已存在且内容正确,无需重复写入!")) result.append({"domain": domain_name, "rep": rep}) else: return public.returnMsg(False, "不支持的验证类型:{}!".format(verify_type)) return {"status": True, "msg": "success", "data": result}