import os, sys, json, psutil import time os.chdir('/www/server/panel') if not 'class/' in sys.path: sys.path.insert(0,'class/') import public from sslModel.base import sslBase class main(sslBase): __APIURL2 = public.GetConfigValue('home') + '/api/v2/synchron' # __APIURL2 = "https://dev.bt.cn" + '/api/v2/synchron' __PDATA = None _check_url = None __request_url = None __UPATH = 'data/userInfo.json' __userInfo = None def __init__(self): super().__init__() if not os.path.exists(public.get_panel_path() + '/data/auto_deploy'): os.makedirs(public.get_panel_path() + '/data/auto_deploy') # 本地任务表 self.M("").execute("""CREATE TABLE IF NOT EXISTS `autodeploy_task` ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `task_name` varchar(320) NOT NULL, -- 任务名称 `cloud_id` INTEGER NOT NULL, -- 云端任务ID `access_id` INTEGER NOT NULL, -- 接入ID `private_key` varchar(320) NOT NULL, -- 密钥 `sites` varchar(320) NOT NULL, -- 网站 `cycle` INTEGER NOT NULL DEFAULT 12, -- 检测周期 `deploy_time` INTEGER NOT NULL DEFAULT 0, -- 最近部署时间 `deploy_status` INTEGER NOT NULL DEFAULT 0, -- 最近部署状态 0等待部署 1=部署成功 -1部署失败 `ssl_hash` varchar(320) NOT NULL DEFAULT '', -- 证书hash `ssl_cloud_id` varchar(320) NOT NULL DEFAULT '' -- 云端证书id );""", ()) self.get_user_info() # 云端任务表 def request(self, dname): request_url = self.__APIURL2 + '/' + dname self.__request_url = public.get_home_node(request_url) msg = '接口请求失败({})'.format(self.__request_url) try: res = public.httpPost(request_url, self.__PDATA) if res == False: raise public.error_conn_cloud(msg) except Exception as ex: raise public.error_conn_cloud(str(ex)) result = public.returnMsg(False, msg) try: result = json.loads(res) except: pass return result def get_user_info(self): upath = public.get_panel_path() + "/" + self.__UPATH try: self.__userInfo = json.loads(public.readFile(upath)) except: self.__userInfo = {} self.__PDATA = self.__userInfo def M(self, table_name): import db sql = db.Sql() sql._Sql__DB_FILE = public.get_panel_path() + '/data/db/ssl_data.db' sql._Sql__encrypt_keys = [] return sql.table(table_name) def plane_synchron_list(self, get): res = self.request('plane_synchron_list') if not res["success"]: public.returnMsg(False, res["res"]) return public.returnMsg(True, res["res"]) def plane_synchron_access_list(self): # path = public.get_panel_path() + '/data/auto_deploy/plane_synchron_list.json' # try: # data = json.loads(public.readFile(path)) # return public.returnMsg(True, data) # except: # pass res = self.request('plane_synchron_access_list') if not res["success"]: public.returnMsg(False, res["res"]) # public.writeFile(path, json.dumps(res["res"])) return public.returnMsg(True, res["res"]) def plane_synchron_access_set(self, get): self.__PDATA["key"] = get.private_key self.__PDATA["synid"] = int(get.cloud_id) # self.__PDATA["url"] = "https://www.bt.cn" res = self.request('plane_synchron_access_set') if not res["success"]: return public.returnMsg(False, res["res"]) return public.returnMsg(True, res["res"]) def get_task_list(self, get=None): p = 1 if 'p' in get: p = int(get.p) collback = '' if 'collback' in get: collback = get.collback limit = 999999999 if 'limit' in get: limit = int(get.limit) f_sql = '1=1' if 'search' in get and get.search: f_sql = "task_name like '%{}%' or sites like '%{}%' ".format(get.search, get.search) count = self.M("autodeploy_task").where(f_sql, ()).count() page_data = public.get_page(count, p, limit, collback) data = self.M("autodeploy_task").where(f_sql, ()).limit(page_data["shift"]+","+page_data["row"]).order("id desc").select() # public.print_log(data) try: cloud_data = self.plane_synchron_access_list() except: cloud_data = None # public.print_log(cloud_data) deploy = [] for i in data: i["access_state"] = 0 i["synchron_info"] = {} if not cloud_data: continue for j in cloud_data["msg"]: if i["access_id"] == j["id"]: i["access_state"] = j["access_state"] if j["access_state"] == 2 and str(i["ssl_cloud_id"]) != str(j["synchron_info"]["ca_id"]): self.M("autodeploy_task").where("cloud_id=?", (i["cloud_id"],)).update({"ssl_cloud_id": j["synchron_info"]["ca_id"]}) # 写需要更新证书的任务到队列 deploy.append(i["id"]) i["synchron_info"] = j["synchron_info"] if deploy: public.ExecShell("nohup btpython {}/class/sslModel/autodeployModel.py --task_ids={} > /dev/null 2>&1 &".format(public.get_panel_path(), ",".join(str(i) for i in deploy))) page_data["data"] = data return {"status": True, "msg": page_data, "deploy": deploy} def add_task(self, get): task_name = get.task_name cloud_id = get.cloud_id private_key = get.private_key sites = get.sites # cycle = get.cycle cycle = 1 data = self.M("autodeploy_task").where("cloud_id=?", (cloud_id,)).find() if data: return public.returnMsg(False, "选择的证书部署任务在本服务器已存在对应的任务【{}】请勿重复添加!".format(data["task_name"])) res = self.plane_synchron_access_set(get) if not res["status"]: return public.returnMsg(False, res["msg"]) self.M("autodeploy_task").add('task_name,cloud_id,access_id,private_key,sites,cycle', (task_name, cloud_id, res["msg"]["id"], private_key, sites, cycle)) if not res["status"]: return public.returnMsg(False, res["msg"]) return public.returnMsg(True, "添加成功") def plane_synchron_access_delte(self, get): self.__PDATA["id"] = int(get.access_id) res = self.request('plane_synchron_access_delte') if not res["success"]: return public.returnMsg(False, res["res"]) return public.returnMsg(True, res["res"]) def del_task(self, get): data = self.M("autodeploy_task").where("id=?", (get.task_id,)).find() if not data: return public.returnMsg(False, "未找到任务") self.M("autodeploy_task").where("id=?", (get.task_id,)).delete() self.plane_synchron_access_delte(public.to_dict_obj({"access_id": data["access_id"]})) return public.returnMsg(True, "删除成功") def batch_del_task(self, get): task_id_list = get.task_ids.split(",") data = self.M("autodeploy_task").where("id in ({})".format(get.task_ids), ()).select() self.M("autodeploy_task").where("id in ({})".format(get.task_ids), ()).delete() total = len(data) faild = 0 success = 0 faildList = [] successList = [] for i in data: if str(i["id"]) not in task_id_list: faild += 1 faildList.append({"task_id": i["id"], "task_name":i["task_name"],"error_msg": "任务不存在", "status": False}) continue self.plane_synchron_access_delte(public.to_dict_obj({"access_id": i["access_id"]})) success += 1 successList.append({"task_id": i["id"], "task_name":i["task_name"],"error_msg": "删除成功", "status": True}) return public.returnMsg(True, {"total": total, "success": success, "faild": faild, "faildList": faildList, "successList": successList}) def edit_task(self, get): task_id = get.task_id task_name = get.task_name cloud_id = int(get.cloud_id) private_key = get.private_key sites = get.sites # cycle = get.cycle cycle = 1 data = self.M("autodeploy_task").where("id=?", (task_id,)).find() if not data: return public.returnMsg(False, "未找到任务") update_data = {"task_name": task_name, "cloud_id": cloud_id, "private_key": private_key, "sites": sites, "cycle": cycle} if data["cloud_id"] != cloud_id: cloud_data = self.M("autodeploy_task").where("cloud_id=?", (cloud_id)).find() if cloud_data: return public.returnMsg(False, "选择的证书部署任务在本服务器已存在对应的任务【{}】请勿重复添加!".format(cloud_data["task_name"])) res = self.plane_synchron_access_set(get) if not res["status"]: return public.returnMsg(False, res["msg"]) update_data["access_id"] = res["msg"]["id"] elif data["private_key"] != private_key: res = self.plane_synchron_access_set(get) if not res["status"]: return public.returnMsg(False, res["msg"]) if data["sites"] != sites: update_data["ssl_cloud_id"] = "" self.M("autodeploy_task").where("id=?", (get.task_id,)).update(update_data) return public.returnMsg(True, "修改成功") def plane_synchron_get_cert(self, access_id): self.__PDATA["id"] = int(access_id) res = self.request('plane_synchron_get_cert') if not res["success"]: return public.returnMsg(False, res["res"]) return public.returnMsg(True, res["res"]) def set_cert_to_database(self, access_id): cert_data = self.plane_synchron_get_cert(access_id) if not cert_data["status"]: return public.returnMsg(False, cert_data["msg"]) cert_data = cert_data["msg"]["cert"] from sslModel import certModel get = public.dict_obj() get.key = cert_data["privateCert"] get.csr = cert_data["cert"] try: cert = certModel.main().save_cert(get) if not cert["status"]: return False except: return False self.M("autodeploy_task").where("access_id=?", (access_id,)).update({"ssl_hash": cert["ssl_hash"]}) return True def deploy_cert(self, get): data = self.M("autodeploy_task").where("id=?", (int(get.task_id),)).find() if not data: return public.returnMsg(False, "未找到任务") sites = data["sites"].split(",") ssl_hash = data["ssl_hash"] get.BatchInfo = json.dumps([ {"ssl_hash": ssl_hash, "siteName": i} for i in sites ]) import panelSSL res = panelSSL.panelSSL().SetBatchCertToSite(get) deploy_time = int(time.time()) if res["total"] == res["success"]: self.write_deploy_detail(get.task_id, json.dumps(res), data["access_id"], 1, 1) deploy_status = 1 else: self.write_deploy_detail(get.task_id, json.dumps(res), data["access_id"], -1, 1) deploy_status = -1 self.M("autodeploy_task").where("id=?", (int(get.task_id),)).update({"deploy_time": deploy_time, "deploy_status": deploy_status}) def upload_deploy_msg(self, get): self.__PDATA["msg"] = get.error_msg self.__PDATA["id"] = get.access_id self.__PDATA["state"] = get.state self.__PDATA["type"] = get.msg_type # print(self.__PDATA) res = self.request('plane_synchron_deploy_state') if not res["success"]: return public.returnMsg(False, res["res"]) return public.returnMsg(True, res["res"]) def write_deploy_detail(self, task_id, error_msg, access_id, state, msg_type): path = public.get_panel_path() + '/data/auto_deploy/deploy_detail' if not os.path.exists(path): os.makedirs(path) public.writeFile(path + "/{}.json".format(task_id), error_msg) # print(access_id, state, msg_type) res = self.upload_deploy_msg(public.to_dict_obj({"access_id": access_id, "error_msg": error_msg, "state": state, "msg_type": msg_type})) # print(res) def get_task_deploy_detail(self, get): try: res = json.loads(public.readFile(public.get_panel_path() + '/data/auto_deploy/deploy_detail/{}.json'.format(get.task_id))) except: res = None return public.returnMsg(True, res) if __name__ == '__main__': try: if os.path.exists("/www/server/panel/data/auto_deploy.pl"): exit() public.writeFile("/www/server/panel/data/auto_deploy.pl", "") m = main() import argparse p = argparse.ArgumentParser() p.add_argument('--task_ids', default=None, help="任务id列表,用逗号分隔", dest="task_ids") args = p.parse_args() if not args.task_ids: exit() task_ids = args.task_ids.split(",") for i in task_ids: task_data = m.M("autodeploy_task").where("id=?", (int(i),)).find() if not task_data: continue set_cert_status = m.set_cert_to_database(task_data["access_id"]) if not set_cert_status: sites = task_data["sites"].split(",") error_msg = json.dumps({ "total": len(sites), "success": 0, "faild": len(sites), "faildList": [{"siteName": site, "error_msg": "证书保存失败", "status": False} for site in sites], "successList": [] }) m.write_deploy_detail(i, error_msg, task_data["access_id"], -1, 1) m.M("autodeploy_task").where("id=?", (int(i),)).update( {"deploy_time": int(time.time()), "deploy_status": -1}) continue m.deploy_cert(public.to_dict_obj({"task_id": i})) print("部署成功:", task_data["task_name"]) exit() finally: os.remove("/www/server/panel/data/auto_deploy.pl")