page / bt-source /panel /class /sslModel /btModel.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
a757bd3 verified
from sslModel.base import sslBase
import public
import hmac
import hashlib
import json
import time
import requests
from typing import Dict, Any, Optional, Tuple
class main(sslBase):
def __init__(self):
super().__init__()
def __init_data(self, data):
self.account_id = data["AccountID"]
self.access_key = data["AccessKey"]
self.secret_key = data["SecretKey"]
self.base_url = "https://dmp.bt.cn"
def _generate_signature(self, method: str, path: str, body: str) -> Tuple[str, str]:
"""生成 API 签名"""
timestamp = str(int(time.time()))
signing_string = f"{self.account_id}\n{timestamp}\n{method.upper()}\n{path}\n{body}"
signature = hmac.new(
self.secret_key.encode('utf-8'),
signing_string.encode('utf-8'),
hashlib.sha256
).hexdigest()
return timestamp, signature
def _request(self, method: str, path: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""发起 API 请求"""
url = self.base_url + path
body_str = ""
body_bytes = None
if data is not None:
body_str = json.dumps(data, separators=(',', ':'))
body_bytes = body_str.encode('utf-8')
timestamp, signature = self._generate_signature(method, path, body_str)
headers = {
"Content-Type": "application/json",
"X-Account-ID": self.account_id,
"X-Access-Key": self.access_key,
"X-Timestamp": timestamp,
"X-Signature": signature
}
response = requests.request(
method=method.upper(),
url=url,
data=body_bytes,
headers=headers
)
result = response.json()
if not result.get("status"):
raise Exception(f"API 请求失败: {result.get('msg')}")
return result
def create_dns_record(self, get):
domain_name = get.domain_name
domain_dns_value = get.domain_dns_value
record_type = 'TXT'
if 'record_type' in get:
record_type = get.record_type
mx = 0
if record_type == 'MX':
if not get.get('mx'):
return public.returnMsg(False, 'MX记录类型必须填写MX值')
mx = int(get.mx)
domain_name, sub_domain, _ = self.extract_zone(domain_name)
if not sub_domain:
sub_domain = '@'
domain_data = self.get_domain_list(get)
if not domain_data['status']:
return domain_data
domain_list = domain_data['data']
domain_id = None
for d in domain_list:
if d['name'] == domain_name:
domain_id = d['id']
break
if not domain_id:
return public.returnMsg(False, '域名不存在,请先添加域名')
try:
body = {
"domain_type": 1,
"domain_id": domain_id,
"record": sub_domain,
"type": record_type,
"value": domain_dns_value,
"TTL": int(get.ttl) if 'ttl' in get else 600,
"MX": mx,
"line": "默认",
"remark": get.remark if 'remark' in get else "",
"view_id": 0,
}
res = self._request("POST", "/api/v1/dns/record/create", body)
except Exception as e:
public.print_log(e)
return public.returnMsg(False, "添加解析记录失败: {}".format(e))
if not res.get("status"):
return public.returnMsg(False, "添加解析记录失败: {}".format(res.get("msg")))
return public.returnMsg(True, "添加解析记录成功")
def delete_dns_record(self, get):
domain_name = get.domain_name
domain_name, sub_domain, _ = self.extract_zone(domain_name)
domain_data = self.get_domain_list(get)
if not domain_data['status']:
return domain_data
domain_list = domain_data['data']
domain_id = None
for d in domain_list:
if d['name'] == domain_name:
domain_id = d['id']
break
if not domain_id:
return public.returnMsg(False, '域名不存在,请先添加域名')
record_id = get.RecordId
try:
body = {
"record_id": record_id,
"domain_type":1,
"domain_id":domain_id
}
res = self._request("POST", "/api/v1/dns/record/delete", body)
except Exception as e:
public.print_log(e)
return public.returnMsg(False, "删除解析记录失败: {}".format(e))
if not res.get("status"):
return public.returnMsg(False, "删除解析记录失败: {}".format(res.get("msg")))
return public.returnMsg(True, "删除解析记录成功")
def update_dns_record(self, get):
domain_name = get.domain_name
domain_dns_value = get.domain_dns_value
record_type = 'TXT'
if 'record_type' in get:
record_type = get.record_type
mx = 0
if record_type == 'MX':
if not get.get('mx'):
return public.returnMsg(False, 'MX记录类型必须填写MX值')
mx = int(get.mx)
domain_name, sub_domain, _ = self.extract_zone(domain_name)
if not sub_domain:
sub_domain = '@'
domain_data = self.get_domain_list(get)
if not domain_data['status']:
return domain_data
domain_list = domain_data['data']
domain_id = None
for d in domain_list:
if d['name'] == domain_name:
domain_id = d['id']
break
if not domain_id:
return public.returnMsg(False, '域名不存在,请先添加域名')
record_id = get.RecordId
try:
body = {
"record_id": record_id,
"domain_type": 1,
"domain_id": domain_id,
"record": sub_domain,
"type": record_type,
"value": domain_dns_value,
"TTL": int(get.ttl) if 'ttl' in get else 600,
"MX": mx,
"line": "默认",
"remark": get.remark if 'remark' in get else "",
"view_id": 0,
}
res = self._request("POST", "/api/v1/dns/record/update", body)
except Exception as e:
public.print_log(e)
return public.returnMsg(False, "修改解析记录失败: {}".format(e))
if not res.get("status"):
return public.returnMsg(False, "修改解析记录失败: {}".format(res.get("msg")))
return public.returnMsg(True, "修改解析记录成功")
def set_dns_record_status(self, get):
domain_name = get.domain_name
domain_name, sub_domain, _ = self.extract_zone(domain_name)
domain_data = self.get_domain_list(get)
if not domain_data['status']:
return domain_data
domain_list = domain_data['data']
domain_id = None
for d in domain_list:
if d['name'] == domain_name:
domain_id = d['id']
break
if not domain_id:
return public.returnMsg(False, '域名不存在,请先添加域名')
record_id = get.RecordId
try:
body = {
"record_id": record_id,
"domain_type":1,
"domain_id":domain_id,
}
res = self._request("POST", "/api/v1/dns/record/{}".format("start" if get.status == '0' else "pause"), body)
except Exception as e:
public.print_log(e)
return public.returnMsg(False, "设置解析记录状态失败: {}".format(e))
if not res.get("status"):
return public.returnMsg(False, "设置解析记录状态失败: {}".format(res.get("msg")))
return public.returnMsg(True, "设置解析记录状态成功")
def get_dns_record(self, get):
domain_name = get.domain_name
domain_data = self.get_domain_list(get)
if not domain_data['status']:
return domain_data
domain_list = domain_data['data']
domain_id = None
for d in domain_list:
if d['name'] == domain_name:
domain_id = d['id']
break
if not domain_id:
return public.returnMsg(False, '域名不存在,请先添加域名')
try:
body = {
"domain_id": domain_id,
"searchKey": "record",
"domain_type": 1,
"p": get.p,
"row": get.limit,
}
if "search" in get and get.search:
body["searchValue"] = get.search
res = self._request("POST", "/api/v1/dns/record/list", body)
except Exception as e:
public.print_log(e)
return public.returnMsg(False, "获取解析记录失败: {}".format(e))
data = {"info": {"record_total": res.get("data", {}).get("count", 0)}, "list": []}
for i in res.get("data", {}).get("data", []):
data["list"].append({
"RecordId": i["record_id"],
"name": i["record"]+"."+domain_name if i["record"] != "@" else domain_name,
"type": i["type"],
"value": i["value"],
"ttl": i["TTL"],
"mx": i.get("MX", 0),
"status": "暂停" if i["state"] == 2 else "启用",
"line": i.get("line", "默认"),
"remark": i.get("remark", ""),
})
return data
def get_domain_list(self, get):
self.__init_data(self.get_dns_data(None)[get.dns_id])
try:
res = self._request("POST", "/api/v1/domain/manage/list", {"p":1,"rows":10, "status":1})
except Exception as e:
public.print_log(e)
return public.returnMsg(False, "获取域名列表失败: {}".format(e))
data = []
local_domain_list = [d['domain'] for d in public.M('ssl_domains').field('domain').select()]
for i in res.get("data", {}).get("data", []):
data.append({
"id": i["id"],
"name": i["full_domain"],
"sync": 0 if i["full_domain"] in local_domain_list else 1,
})
return {"status": True, "msg": "获取成功", "data": data}