| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| import os |
| import public |
| import docker.errors |
| import projectModel.bt_docker.dk_public as dp |
|
|
| class main: |
| __log_path = '/tmp/dockertmp.log' |
| def docker_client(self,url): |
| import projectModel.bt_docker.dk_public as dp |
| return dp.docker_client(url) |
|
|
| |
| def save(self,args): |
| """ |
| :param path 要镜像tar要存放的路径 |
| :param name 包名 |
| :param id 镜像 |
| :param |
| :param args: |
| :return: |
| """ |
| try: |
| if "tar" in args.name: |
| filename = '{}/{}'.format(args.path,args.name) |
| else: |
| filename = '{}/{}.tar'.format(args.path, args.name) |
| if not os.path.exists(args.path): |
| os.makedirs(args.path) |
| public.writeFile(filename,"") |
| f = open(filename,'wb') |
| image = self.docker_client(args.url).images.get(args.id) |
| for chunk in image.save(named=True): |
| f.write(chunk) |
| f.close() |
| dp.write_log("镜像 [{}] 导出到 [{}] 成功".format(args.id,filename)) |
| return public.returnMsg(True, "成功保存到:{}".format(filename)) |
| except docker.errors.APIError as e: |
| if "empty export - not implemented" in str(e): |
| return public.returnMsg(False,"不能导出镜像!") |
| return public.get_error_info() |
|
|
| |
| def load(self,args): |
| """ |
| :param path: 需要导入的镜像路径具体到文件名 |
| :param args: |
| :return: |
| """ |
| images = self.docker_client(args.url).images |
| with open(args.path,'rb') as f: |
| images.load( |
| f |
| ) |
| dp.write_log("镜像 [{}] 导入成功!".format(args.path)) |
| return public.returnMsg(True,"镜像导入成功!{}".format(args.path)) |
|
|
| |
| def image_list(self,args): |
| """ |
| :param url |
| :param args: |
| :return: |
| """ |
| import projectModel.bt_docker.dk_registry as dr |
| import projectModel.bt_docker.dk_setup as ds |
| data = list() |
| client = self.docker_client(args.url) |
| docker_setup = ds.main() |
| installed = docker_setup.check_docker_program() |
| service_status = docker_setup.get_service_status() |
| if not client: |
| data = { |
| "images_list": [], |
| "registry_list": [], |
| "installed": installed, |
| "service_status": service_status |
| } |
| return public.returnMsg(True,data) |
| images = client.images |
| image_attr = self.get_image_attr(images) |
| registry_list = dr.main().registry_list(args) |
| if registry_list['status']: |
| registry_list = registry_list['msg']['registry'] |
| else: |
| registry_list = [] |
| for attr in image_attr: |
| if len(attr['RepoTags']) == 1: |
| tmp = { |
| "id": attr["Id"], |
| "tags": attr["RepoTags"], |
| "time": attr["Created"], |
| "name": attr['RepoTags'][0], |
| "size": attr["Size"], |
| "detail": attr |
| } |
| data.append(tmp) |
| elif len(attr['RepoTags']) > 1: |
| for i in range(len(attr['RepoTags'])): |
| tmp = { |
| "id": attr["Id"], |
| "tags": attr["RepoTags"], |
| "time": attr["Created"], |
| "name": attr['RepoTags'][i], |
| "size": attr["Size"], |
| "detail": attr |
| } |
| data.append(tmp) |
| elif not attr['RepoTags']: |
| tmp = { |
| "id": attr["Id"], |
| "tags": attr["RepoTags"], |
| "time": attr["Created"], |
| "name": attr["Id"], |
| "size": attr["Size"], |
| "detail": attr |
| } |
| data.append(tmp) |
| data = { |
| "images_list":data, |
| "registry_list": registry_list, |
| "installed": installed, |
| "service_status": service_status |
| } |
| return public.returnMsg(True,data) |
|
|
| def get_image_attr(self,images): |
| image = images.list() |
| return [i.attrs for i in image] |
|
|
| def get_logs(self, args): |
| import files |
| logs_file = args.logs_file |
| return public.returnMsg(True,files.files().GetLastLine(logs_file, 20)) |
|
|
| |
| def build(self,args): |
| """ |
| :param path dockerfile dir |
| :param pull 如果引用的镜像有更新自动拉取 |
| :param tag 标签 jose:v1 |
| :param data 在线编辑配置 |
| :param args: |
| :return: |
| """ |
| public.writeFile(self.__log_path,"开始构建镜像!") |
| public.writeFile('/tmp/dockertmp.log', "开始构建镜像!") |
| if not hasattr(args,"pull"): |
| args.pull = False |
| if hasattr(args,"data") and args.data: |
| args.path = "/tmp/dockerfile" |
| public.writeFile(args.path,args.data) |
| with open(args.path, 'rb') as f: |
| image_obj,generator = self.docker_client(args.url).images.build( |
| pull=True if args.pull == "1" else False, |
| fileobj=f, |
| tag=args.tag, |
| forcerm=True |
| ) |
| os.remove(args.path) |
| else: |
| if not os.path.exists(args.path): |
| return public.returnMsg(True, "请输入正确的DockerFile路径!") |
| if not os.path.isdir(args.path): |
| args.path = '/'.join(args.path.split('/')[:-1]) |
| image_obj,generator = self.docker_client(args.url).images.build( |
| pull=True if args.pull == "1" else False, |
| path=args.path, |
| tag=args.tag, |
| forcerm=True |
| ) |
| |
| dp.log_docker(generator,"Docker 构建任务!") |
| dp.write_log("构建镜像 [{}] 成功!".format(args.tag)) |
| return public.returnMsg(True,"构建镜像成功!") |
|
|
| |
| def remove(self,args): |
| """ |
| :param url |
| :param id 镜像id |
| :param name 镜像tag |
| :force 0/1 强制删除镜像 |
| :param args: |
| :return: |
| """ |
| try: |
| self.docker_client(args.url).images.remove(args.name) |
| dp.write_log("删除镜像【{}】成功!".format(args.name)) |
| return public.returnMsg(True,"删除镜像成功!") |
| except docker.errors.ImageNotFound as e: |
| return public.returnMsg(False,"删除进行失败,镜像可能不存在!") |
| except docker.errors.APIError as e: |
| if "image is referenced in multiple repositories" in str(e): |
| return public.returnMsg(False, "镜像 ID 用在多个镜像中,请强制删除镜像!") |
| if "using its referenced image" in str(e): |
| return public.returnMsg(False, "镜像正在使用中,请删除容器后再删除镜像!") |
| return public.returnMsg(False,"删除镜像失败!<br> {}".format(e)) |
|
|
| |
| def pull_from_some_registry(self,args): |
| """ |
| :param name 仓库名 |
| :param url |
| :param image |
| :param args: |
| :return: |
| """ |
| import projectModel.bt_docker.dk_registry as br |
| r_info = br.main().registry_info(args.name) |
| login = br.main().login(args.url,r_info['url'],r_info['username'],r_info['password'])['status'] |
| if not login: |
| return login |
| args.username = r_info['username'] |
| args.password = r_info['password'] |
| args.registry = r_info['url'] |
| args.namespace = r_info['namespace'] |
| return self.pull(args) |
|
|
| |
| def push(self,args): |
| """ |
| :param id 镜像ID |
| :param url 连接docker的url |
| :param tag 标签 镜像名+版本号v1 |
| :param name 仓库名 |
| :param args: |
| :return: |
| """ |
| if "/" in args.tag: |
| return public.returnMsg(False,"推送的镜像不能包含 [/] , 请使用以下格式: image:v1 (镜像名:版本)") |
| if ":" not in args.tag: |
| return public.returnMsg(False,"推送的镜像不能包含 [ : ] , 请使用以下格式: image:v1 (image_name:version_number)") |
| public.writeFile(self.__log_path, "开始推镜像!\n") |
| import projectModel.bt_docker.dk_registry as br |
| r_info = br.main().registry_info(args.name) |
| if args.name == "docker official" and r_info['url'] == "docker.io": |
| public.writeFile(self.__log_path, "镜像无法推送到 Docker 公共仓库!\n") |
| return public.returnMsg(False,"无法推送到 Docker 公共仓库!") |
| login = br.main().login(args.url,r_info['url'],r_info['username'],r_info['password'])['status'] |
| tag = args.tag |
| if not login: |
| return login |
| auth_conf = {"username": r_info['username'], |
| "password": r_info['password'], |
| "registry": r_info['url'] |
| } |
| |
| if ":" not in tag: |
| tag = "{}:latest".format(tag) |
| repository = r_info['url'] |
| image = "{}/{}/{}".format(repository,r_info['namespace'],args.tag) |
| self.tag(args.url,args.id,image) |
| ret = self.docker_client(args.url).images.push( |
| repository=image.split(":")[0], |
| tag=tag.split(":")[-1], |
| auth_config=auth_conf, |
| stream=True |
| ) |
| dp.log_docker(ret,"Image push task") |
| |
| args.name = image |
| self.remove(args) |
| dp.write_log("镜像 [{}] 推送成功!".format(image)) |
| return public.returnMsg(True,"推送成功!{}".format(str(ret))) |
|
|
| def tag(self,url,image_id,tag): |
| """ |
| 为镜像打标签 |
| :param repository 仓库namespace/images |
| :param image_id: 镜像ID |
| :param tag: 镜像标签jose:v1 |
| :return: |
| """ |
| image = tag.split(":")[0] |
| tag_ver = tag.split(":")[1] |
| self.docker_client(url).images.get(image_id).tag( |
| repository=image, |
| tag=tag_ver |
| ) |
| return public.returnMsg(True,"设置成功!") |
|
|
| def pull(self,args): |
| """ |
| :param image |
| :param url |
| :param registry |
| :param username 拉取私有镜像时填写 |
| :param password 拉取私有镜像时填写 |
| :param args: |
| :return: |
| """ |
| public.writeFile(self.__log_path, "开始推送镜像!") |
| import docker.errors |
| try: |
| if ':' not in args.image: |
| args.image = '{}:latest'.format(args.image) |
| auth_conf = {"username": args.username, |
| "password": args.password, |
| "registry":args.registry if args.registry else None |
| } if args.username else None |
| if not hasattr(args,"tag"): |
| args.tag = args.image.split(":")[-1] |
| if args.registry != "docker.io": |
| args.image = "{}/{}/{}".format(args.registry,args.namespace,args.image) |
| ret = dp.docker_client_low(args.url).pull( |
| repository=args.image, |
| auth_config=auth_conf, |
| tag=args.tag, |
| stream=True |
| ) |
| dp.log_docker(ret,"镜像拉取任务") |
| if ret: |
| dp.write_log("镜像拉取 [{}:{}] 成功".format(args.image,args.tag)) |
| return public.returnMsg(True, '镜像拉取成功.') |
| else: |
| return public.returnMsg(False, '可能没有这个镜像.') |
| except docker.errors.ImageNotFound as e: |
| if "pull access denied for" in str(e): |
| return public.returnMsg(False,"拉取失败,镜像为私有镜像,需要输入dockerhub的账号密码!") |
|
|
| return public.returnMsg(False,"拉取失败<br><br>原因: {}".format(e)) |
|
|
| except docker.errors.NotFound as e: |
| if "not found: manifest unknown" in str(e): |
| return public.returnMsg(False,"镜像拉取失败,仓库中没有这个镜像!") |
| return public.returnMsg(False, "Pull failed<br><br>reason:{}".format(e)) |
| except docker.errors.APIError as e: |
| if "invalid tag format" in str(e): |
| return public.returnMsg(False,"拉取失败, 镜像格式错误, 如: nginx:v 1!") |
| return public.returnMsg(False,"拉取失败!{}".format(e)) |
|
|
|
|
| |
| def pull_high_api(self,args): |
| """ |
| :param image |
| :param url |
| :param registry |
| :param username 拉取私有镜像时填写 |
| :param password 拉取私有镜像时填写 |
| :param args: |
| :return: |
| """ |
| import docker.errors |
| try: |
| if ':' not in args.image: |
| args.image = '{}:latest'.format(args.image) |
| auth_conf = {"username": args.username, |
| "password": args.password, |
| "registry":args.registry if args.registry else None |
| } if args.username else None |
|
|
| if args.registry != "docker.io": |
| args.image = "{}/{}/{}".format(args.registry,args.namespace,args.image) |
| ret = self.docker_client(args.url).images.pull( |
| repository=args.image, |
| auth_config=auth_conf, |
| ) |
| if ret: |
| return public.returnMsg(True, '拉取镜像成功.') |
| else: |
| return public.returnMsg(False, '可能没有这个镜像.') |
| except docker.errors.ImageNotFound as e: |
| if "pull access denied for" in str(e): |
| return public.returnMsg(False,"拉取镜像失败, 这个是私有镜像,请输入账号密码!") |
| return public.returnMsg(False,"拉取镜像失败<br><br>原因: {}".format(e)) |
|
|
| def image_for_host(self,args): |
| """ |
| 获取镜像大小和获取镜像数量 |
| :param args: |
| :return: |
| """ |
| res = self.image_list(args) |
| if not res['status']: |
| return res |
| num = len(res['msg']['images_list']) |
| size = 0 |
| for i in res['msg']['images_list']: |
| size += i['size'] |
| return public.returnMsg(True,{'num':num,'size':size}) |