from sslModel.base import sslBase
import public
import hmac
import hashlib
import json
import time
import requests
from typing import Dict, Any, Optional, Tuple


class main(sslBase):
    """DNS provider adapter that manages records through the API at https://dmp.bt.cn.

    Credentials are loaded per call by ``get_domain_list`` (from the entry of
    ``get_dns_data`` selected by ``get.dns_id``), so every public method that
    talks to the API goes through ``get_domain_list`` first.
    """

    # Upper bound, in seconds, on how long a single API call may block.
    # Without it a stalled connection would hang the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        super().__init__()

    def __init_data(self, data):
        """Store the API credentials from *data* and set the API base URL.

        *data* must provide the keys ``AccountID``, ``AccessKey`` and
        ``SecretKey``.
        """
        self.account_id = data["AccountID"]
        self.access_key = data["AccessKey"]
        self.secret_key = data["SecretKey"]
        self.base_url = "https://dmp.bt.cn"

    def _generate_signature(self, method: str, path: str, body: str) -> Tuple[str, str]:
        """Generate the API signature for one request.

        The signed string is the account id, the current Unix timestamp, the
        upper-cased HTTP method, the path and the body, joined by newlines.
        It is signed with HMAC-SHA256 using the secret key.

        Returns:
            ``(timestamp, signature)`` where *signature* is the hex digest.
        """
        timestamp = str(int(time.time()))
        signing_string = f"{self.account_id}\n{timestamp}\n{method.upper()}\n{path}\n{body}"
        signature = hmac.new(
            self.secret_key.encode('utf-8'),
            signing_string.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        return timestamp, signature

    def _request(self, method: str, path: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send a signed API request and return the decoded JSON response.

        Args:
            method: HTTP method name (upper-cased before use).
            path: Request path appended to ``self.base_url``.
            data: Optional JSON-serialisable body.

        Raises:
            Exception: if the response is not valid JSON, is not a JSON
                object, or has a falsy ``status`` field. Errors raised by
                ``requests`` (connection failure, timeout) propagate as-is.
        """
        url = self.base_url + path
        body_str = ""
        body_bytes = None
        if data is not None:
            # Compact separators: the exact string that is signed is also the
            # exact byte sequence that is sent.
            body_str = json.dumps(data, separators=(',', ':'))
            body_bytes = body_str.encode('utf-8')

        timestamp, signature = self._generate_signature(method, path, body_str)
        headers = {
            "Content-Type": "application/json",
            "X-Account-ID": self.account_id,
            "X-Access-Key": self.access_key,
            "X-Timestamp": timestamp,
            "X-Signature": signature
        }
        response = requests.request(
            method=method.upper(),
            url=url,
            data=body_bytes,
            headers=headers,
            timeout=self.REQUEST_TIMEOUT
        )
        try:
            result = response.json()
        except ValueError as e:
            # A non-JSON body (e.g. an HTML error page) would otherwise surface
            # as an opaque decode error without the HTTP status.
            raise Exception(
                f"API 请求失败: HTTP {response.status_code}, 响应不是有效的 JSON"
            ) from e

        if not isinstance(result, dict):
            raise Exception(f"API 请求失败: {result}")
        if not result.get("status"):
            raise Exception(f"API 请求失败: {result.get('msg')}")
        return result

    def _resolve_domain_id(self, get, domain_name):
        """Look up the remote id of *domain_name* in the account's domain list.

        Returns:
            ``(domain_id, None)`` on success, or ``(None, error)`` where
            *error* is the response dict the calling method should return
            unchanged (either the failed ``get_domain_list`` result or a
            "domain does not exist" message).
        """
        domain_data = self.get_domain_list(get)
        if not domain_data['status']:
            return None, domain_data
        domain_id = next(
            (d['id'] for d in domain_data['data'] if d['name'] == domain_name),
            None,
        )
        if not domain_id:
            return None, public.returnMsg(False, '域名不存在,请先添加域名')
        return domain_id, None

    @staticmethod
    def _record_payload(get, domain_id, sub_domain, record_type, value, mx):
        """Build the request body shared by record creation and record update.

        ``TTL`` defaults to 600 and ``remark`` to an empty string when *get*
        does not carry them. ``int(get.ttl)`` may raise ``ValueError``;
        callers invoke this inside their ``try`` block so that is reported as
        a normal failure message.
        """
        return {
            "domain_type": 1,
            "domain_id": domain_id,
            "record": sub_domain,
            "type": record_type,
            "value": value,
            "TTL": int(get.ttl) if 'ttl' in get else 600,
            "MX": mx,
            "line": "默认",
            "remark": get.remark if 'remark' in get else "",
            "view_id": 0,
        }

    def create_dns_record(self, get):
        """Create a DNS record for ``get.domain_name``.

        Reads ``domain_name`` and ``domain_dns_value`` from *get*, plus the
        optional ``record_type`` (default ``TXT``), ``mx`` (required when the
        type is ``MX``), ``ttl`` and ``remark``.

        Returns:
            A ``public.returnMsg`` result describing success or failure.
        """
        domain_name = get.domain_name
        domain_dns_value = get.domain_dns_value
        record_type = get.record_type if 'record_type' in get else 'TXT'

        mx = 0
        if record_type == 'MX':
            if not get.get('mx'):
                return public.returnMsg(False, 'MX记录类型必须填写MX值')
            mx = int(get.mx)

        domain_name, sub_domain, _ = self.extract_zone(domain_name)
        if not sub_domain:
            # '@' is sent as the record name when there is no sub-domain part.
            sub_domain = '@'

        domain_id, error = self._resolve_domain_id(get, domain_name)
        if error is not None:
            return error

        try:
            body = self._record_payload(
                get, domain_id, sub_domain, record_type, domain_dns_value, mx
            )
            res = self._request("POST", "/api/v1/dns/record/create", body)
        except Exception as e:
            public.print_log(e)
            return public.returnMsg(False, "添加解析记录失败: {}".format(e))

        # Defensive: _request already raises on a falsy status.
        if not res.get("status"):
            return public.returnMsg(False, "添加解析记录失败: {}".format(res.get("msg")))
        return public.returnMsg(True, "添加解析记录成功")

    def delete_dns_record(self, get):
        """Delete the record ``get.RecordId`` from the zone of ``get.domain_name``.

        Returns:
            A ``public.returnMsg`` result describing success or failure.
        """
        domain_name, _, _ = self.extract_zone(get.domain_name)

        domain_id, error = self._resolve_domain_id(get, domain_name)
        if error is not None:
            return error

        record_id = get.RecordId
        try:
            body = {
                "record_id": record_id,
                "domain_type": 1,
                "domain_id": domain_id
            }
            res = self._request("POST", "/api/v1/dns/record/delete", body)
        except Exception as e:
            public.print_log(e)
            return public.returnMsg(False, "删除解析记录失败: {}".format(e))

        # Defensive: _request already raises on a falsy status.
        if not res.get("status"):
            return public.returnMsg(False, "删除解析记录失败: {}".format(res.get("msg")))
        return public.returnMsg(True, "删除解析记录成功")

    def update_dns_record(self, get):
        """Update the record ``get.RecordId`` with the values carried by *get*.

        Accepts the same fields as ``create_dns_record`` plus ``RecordId``.

        Returns:
            A ``public.returnMsg`` result describing success or failure.
        """
        domain_name = get.domain_name
        domain_dns_value = get.domain_dns_value
        record_type = get.record_type if 'record_type' in get else 'TXT'

        mx = 0
        if record_type == 'MX':
            if not get.get('mx'):
                return public.returnMsg(False, 'MX记录类型必须填写MX值')
            mx = int(get.mx)

        domain_name, sub_domain, _ = self.extract_zone(domain_name)
        if not sub_domain:
            # '@' is sent as the record name when there is no sub-domain part.
            sub_domain = '@'

        domain_id, error = self._resolve_domain_id(get, domain_name)
        if error is not None:
            return error

        record_id = get.RecordId
        try:
            # record_id goes first to keep the original key order of the body.
            body = {
                "record_id": record_id,
                **self._record_payload(
                    get, domain_id, sub_domain, record_type, domain_dns_value, mx
                ),
            }
            res = self._request("POST", "/api/v1/dns/record/update", body)
        except Exception as e:
            public.print_log(e)
            return public.returnMsg(False, "修改解析记录失败: {}".format(e))

        # Defensive: _request already raises on a falsy status.
        if not res.get("status"):
            return public.returnMsg(False, "修改解析记录失败: {}".format(res.get("msg")))
        return public.returnMsg(True, "修改解析记录成功")

    def set_dns_record_status(self, get):
        """Start or pause the record ``get.RecordId``.

        The ``start`` endpoint is called when ``get.status == '0'`` (string
        comparison); any other value calls the ``pause`` endpoint.

        Returns:
            A ``public.returnMsg`` result describing success or failure.
        """
        domain_name, _, _ = self.extract_zone(get.domain_name)

        domain_id, error = self._resolve_domain_id(get, domain_name)
        if error is not None:
            return error

        record_id = get.RecordId
        try:
            body = {
                "record_id": record_id,
                "domain_type": 1,
                "domain_id": domain_id,
            }
            action = "start" if get.status == '0' else "pause"
            res = self._request("POST", "/api/v1/dns/record/{}".format(action), body)
        except Exception as e:
            public.print_log(e)
            return public.returnMsg(False, "设置解析记录状态失败: {}".format(e))

        # Defensive: _request already raises on a falsy status.
        if not res.get("status"):
            return public.returnMsg(False, "设置解析记录状态失败: {}".format(res.get("msg")))
        return public.returnMsg(True, "设置解析记录状态成功")

    def get_dns_record(self, get):
        """List the DNS records of ``get.domain_name``.

        Reads the page number from ``get.p`` and the page size from
        ``get.limit``; a non-empty ``get.search`` is sent as a filter on the
        ``record`` field.

        Returns:
            On success ``{"info": {"record_total": ...}, "list": [...]}``;
            on failure a ``public.returnMsg`` result (or the failed
            ``get_domain_list`` result).
        """
        domain_name = get.domain_name

        domain_id, error = self._resolve_domain_id(get, domain_name)
        if error is not None:
            return error

        try:
            body = {
                "domain_id": domain_id,
                "searchKey": "record",
                "domain_type": 1,
                "p": get.p,
                # NOTE(review): the page-size key is "row" here but "rows" in
                # get_domain_list — confirm which one the API expects.
                "row": get.limit,
            }
            if "search" in get and get.search:
                body["searchValue"] = get.search
            res = self._request("POST", "/api/v1/dns/record/list", body)
        except Exception as e:
            public.print_log(e)
            return public.returnMsg(False, "获取解析记录失败: {}".format(e))

        # `or {}` / `or []` also cover an explicit JSON null, which a plain
        # .get(key, default) would pass through and then crash on.
        payload = res.get("data") or {}
        data = {"info": {"record_total": payload.get("count", 0)}, "list": []}
        for i in payload.get("data") or []:
            data["list"].append({
                "RecordId": i["record_id"],
                "name": i["record"] + "." + domain_name if i["record"] != "@" else domain_name,
                "type": i["type"],
                "value": i["value"],
                "ttl": i["TTL"],
                "mx": i.get("MX", 0),
                "status": "暂停" if i["state"] == 2 else "启用",
                "line": i.get("line", "默认"),
                "remark": i.get("remark", ""),
            })
        return data

    def get_domain_list(self, get):
        """Load credentials for ``get.dns_id`` and list the account's domains.

        Returns:
            ``{"status": True, "msg": ..., "data": [...]}`` where each entry
            has ``id``, ``name`` and ``sync`` (0 when the domain is already in
            the local ``ssl_domains`` table, 1 otherwise); on an API failure a
            ``public.returnMsg`` result.
        """
        self.__init_data(self.get_dns_data(None)[get.dns_id])
        try:
            # NOTE(review): only page 1 with 10 rows is requested, so domains
            # beyond that are presumably not seen by the lookups that depend
            # on this list — confirm against the API's paging behaviour.
            res = self._request("POST", "/api/v1/domain/manage/list", {"p": 1, "rows": 10, "status": 1})
        except Exception as e:
            public.print_log(e)
            return public.returnMsg(False, "获取域名列表失败: {}".format(e))

        # A set gives O(1) membership tests in the loop below.
        local_domains = {d['domain'] for d in public.M('ssl_domains').field('domain').select()}
        payload = res.get("data") or {}
        data = [
            {
                "id": i["id"],
                "name": i["full_domain"],
                "sync": 0 if i["full_domain"] in local_domains else 1,
            }
            for i in payload.get("data") or []
        ]
        return {"status": True, "msg": "获取成功", "data": data}