| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| import json |
|
|
| import public |
| from btdockerModel import dk_public as dp |
| from btdockerModel.dockerBase import dockerBase |
|
|
|
|
| class main(dockerBase): |
|
|
| def docker_client(self, url): |
| return dp.docker_client(url) |
|
|
| def add(self, args): |
| """ |
| 添加仓库 |
| :param registry 仓库URL docker.io |
| :param name |
| :parma username |
| :parma password |
| :param namespace 仓库命名空间 |
| :param remark 备注 |
| :param args: |
| :return: |
| """ |
| |
| if not args.registry: |
| args.registry = "docker.io" |
| res = self.login(self._url, args.registry, args.username, args.password) |
| if not res['status']: |
| return res |
|
|
| r_list = self.registry_list(args) |
| |
| username_enc = public.aes_encrypt(args.username, self.aes_key) |
| password_enc = public.aes_encrypt(args.password, self.aes_key) |
| |
| pdata = { |
| "reg_name": args.name, |
| "url": args.registry, |
| "namespace": args.namespace, |
| "username": '' if not args.username else username_enc, |
| "password": '' if not args.password else password_enc, |
| "remark": public.xsssec(args.remark) |
| } |
| |
| if len(r_list) > 0: |
| for r in r_list: |
| if "reg_name" in r: |
| if r['reg_name'] == args.name and r["url"] == args.registry and r['username'] == pdata['username']: |
| return public.returnMsg(False, "仓库信息已存在!") |
|
|
| if r['name'] == args.name and r["url"] == args.registry and r['username'] == pdata['username']: |
| return public.returnMsg(False, "仓库信息已存在!") |
|
|
| dp.sql("registry").insert(pdata) |
| dp.write_log("添加仓库 [{}] [{}] 成功!".format(args.name, args.registry)) |
| return public.returnMsg(True, "添加成功!") |
|
|
| def edit(self, args): |
| """ |
| 添加仓库 |
| :param registry 仓库URL docker.io |
| :param id 仓库id |
| :parma username |
| :parma password |
| :param namespace |
| :param remark |
| :param args: |
| :return: |
| """ |
| |
| if str(args.id) == "1": |
| return public.returnMsg(False, "【Docker官方仓库】不可编辑!") |
| if not args.registry: |
| args.registry = "docker.io" |
|
|
| |
| try: |
| is_encrypt = False |
| res = self.login(self._url, args.registry, args.username, args.password) |
| if not res['status']: |
| res = self.login( |
| self._url, |
| args.registry, |
| public.aes_decrypt(args.username, self.aes_key), |
| public.aes_decrypt(args.password, self.aes_key) |
| ) |
| if not res['status']: |
| return res |
| is_encrypt = True |
| except Exception as e: |
| if "binascii.Error: Incorrect padding" in str(e): |
| return public.returnMsg(False, "编辑失败!原因:账号密码解密失败!请删除仓库后重新添加试试") |
| return public.returnMsg(False, "编辑失败!原因:{}".format(e)) |
|
|
| res = dp.sql("registry").where("id=?", (args.id,)).find() |
| if not res: |
| return public.returnMsg(False, "找不到此仓库") |
| pdata = { |
| "reg_name": args.name, |
| "url": args.registry, |
| "username": public.aes_encrypt(args.username, self.aes_key) if is_encrypt is False else args.username, |
| "password": public.aes_encrypt(args.password, self.aes_key) if is_encrypt is False else args.password, |
| "namespace": args.namespace, |
| "remark": args.remark |
| } |
| dp.sql("registry").where("id=?", (args.id,)).update(pdata) |
| dp.write_log("编辑仓库 [{}][{}] 成功!".format(args.name, args.registry)) |
| return public.returnMsg(True, "编辑成功!") |
|
|
| def remove(self, args): |
| """ |
| 删除某个仓库 |
| :param id |
| :param rags: |
| :return: |
| """ |
| if str(args.id) == "1": |
| return public.returnMsg(False, "【Docker官方仓库】无法删除!") |
|
|
| data = dp.sql("registry").where("id=?", (args.id)).find() |
|
|
| if len(data) < 1: |
| return public.returnMsg(True, "删除失败,仓库id可能不存在!") |
|
|
| dp.sql("registry").where("id=?", (args.id,)).delete() |
|
|
| dp.write_log("删除存储库 [{}][{}] 成功!".format(data['name'], data['url'])) |
| return public.returnMsg(True, "成功删除!") |
|
|
| def registry_list(self, get): |
| """ |
| 获取仓库列表 |
| :return: |
| """ |
| db_obj = dp.sql("registry") |
| |
| search_result = db_obj.where('id=? or name=?', (1, "docker官方库")).select() |
| if db_obj.ERR_INFO: |
| return [] |
|
|
| if len(search_result) == 0: |
| dp.sql("registry").insert({ |
| "name": "docker官方库", |
| "reg_name": "docker官方库", |
| "url": "docker.io", |
| "username": "", |
| "password": "", |
| "namespace": "", |
| "remark": "docker官方库" |
| }) |
|
|
| res = dp.sql("registry").select() |
| if not isinstance(res, list): |
| res = [] |
|
|
| for r in res: |
| if "reg_name" not in r: continue |
| if r["name"] == "" or not r["name"] or r["name"] is None: |
| r["name"] = r["reg_name"] |
|
|
| return res |
|
|
| |
| def set_remark(self, get): |
| ''' |
| @name 设置备注 |
| @param "data":{"id":"仓库ID","remark":"备注"} |
| @return dict{"status":True/False,"msg":"提示信息"} |
| ''' |
| try: |
| get.remark = get.get("remark/s", "") |
| if get.remark != "": |
| get.remark = public.xssencode2(get.remark) |
|
|
| dp.sql("registry").where("id=?", (get.id,)).setField("remark", get.remark) |
| return public.returnMsg(True, "设置成功!") |
| except Exception as e: |
| return public.returnMsg(False, "设置失败!{}".format(str(e))) |
|
|
| def get_com_registry(self, get): |
| """ |
| 获取常用仓库列表 |
| @param get: |
| @return: |
| """ |
| com_registry_file = "{}/class/btdockerModel/config/com_registry.json".format(public.get_panel_path()) |
| try: |
| com_registry = json.loads(public.readFile(com_registry_file)) |
| except: |
| public.ExecShell("rm -f {}".format(com_registry_file)) |
| public.downloadFile("{}/src/com_registry.json".format(public.get_url()), com_registry_file) |
| try: |
| com_registry = json.loads(public.readFile(com_registry_file)) |
| except: |
| com_registry = { |
| "https://mirror.ccs.tencentyun.com": "腾讯云镜像加速站", |
| } |
|
|
| return com_registry |
|
|
| def registry_info(self, get): |
| ''' |
| @name 获取指定仓库的信息 |
| @author wzz <2024/5/24 下午3:16> |
| @param "data":{"参数名":""} <数据类型> 参数描述 |
| @return dict{"status":True/False,"msg":"提示信息"} |
| ''' |
| registry_info = dp.sql("registry").where("reg_name=? and url=?", (get.name, get.url)).find() |
| if not registry_info: |
| registry_info = dp.sql("registry").where("name=? and url=?", (get.name, get.url)).find() |
|
|
| return registry_info |
|
|
| def login(self, url, registry, username, password): |
| """ |
| 仓库登录测试 |
| :param args: |
| :return: |
| """ |
| if not username and not username.strip(): |
| return public.returnMsg(True, "登录测试成功!") |
| import docker.errors |
| try: |
| res = self.docker_client(url).login( |
| registry=registry, |
| username=username, |
| password=password, |
| reauth=False |
| ) |
| return public.returnMsg(True, str(res)) |
| except docker.errors.APIError as e: |
| if "authentication required" in str(e): |
| return public.returnMsg(False, "登录测试失败!原因:可能是账号密码错误,请检查!") |
| if "unauthorized: incorrect username or password" in str(e): |
| return public.returnMsg(False, "登录测试失败!原因:可能是账号密码错误,请检查!") |
| return public.returnMsg(False, "登录测试失败!原因:{}".format(e)) |
|
|