| from random import random |
|
|
| import torch.nn as nn |
| import torch |
| from PIL import Image |
| from diffusers import DDIMScheduler |
| from accelerate.utils import set_seed |
| from torch.fx.experimental.unification.multipledispatch.dispatcher import source |
| from torchvision.transforms.functional import to_pil_image, to_tensor, resize |
| import torch.backends.cudnn as cudnn |
|
|
| from cyclegan_model.model import create_model |
| from inpaint_model.model.networks import Generator |
| from inpaint_model.utils.tools import random_bbox, default_loader, normalize, get_model_list,get_image_from_dict |
|
|
| from cyclegan_model.data.base_dataset import get_transform |
| from cyclegan_model.options.test_options import TestOptions |
| import torchvision.utils as vutils |
| import torchvision.transforms as transforms |
| from pipeline_sd import ADPipeline |
| from pipeline_sdxl import ADPipeline as ADXLPipeline |
| from utils import Controller |
| import os |
| import yaml |
|
|
| import json |
| import matplotlib.pyplot as plt |
| from sddfrcnn_model.network_files import RetinaNet |
| from sddfrcnn_model.backbone import SSD300,Backbone |
| from sddfrcnn_model.backbone import resnet50_fpn_backbone,LastLevelP6P7 |
| from sddfrcnn_model.draw_box_utils import draw_objs |
| from yolov8_model.ultralytics import YOLO |
|
|
|
|
class AttentionRunner:
    """Attention-guided Stable Diffusion runner for style transfer, stylised
    text-to-image generation and texture synthesis.

    Pipelines are created lazily and cached on the instance: ``self.sd15``
    for SD 1.5 checkpoints and ``self.sdxl`` for SDXL checkpoints (any model
    name containing ``'xl'`` is treated as SDXL).
    """

    def __init__(self):
        self.sd15 = None  # cached SD 1.5 pipeline (ADPipeline)
        self.sdxl = None  # cached SDXL pipeline (ADXLPipeline)
        self.loss_fn = torch.nn.L1Loss(reduction="mean")

    def load_pipeline(self, model_path_or_name):
        """Lazily load and cache the pipeline for *model_path_or_name*.

        Bug fix: the original ``if 'xl' in ... and self.sdxl is None`` /
        ``elif self.sd15 is None`` chain would, once an SDXL pipeline was
        already cached, fall through and load the SDXL checkpoint into the
        SD 1.5 slot. The two model families are now handled independently.
        """
        if 'xl' in model_path_or_name:
            if self.sdxl is None:
                scheduler = DDIMScheduler.from_pretrained(model_path_or_name, subfolder="scheduler")
                self.sdxl = ADXLPipeline.from_pretrained(model_path_or_name, scheduler=scheduler, safety_checker=None)
                self.sdxl.classifier = self.sdxl.unet
        elif self.sd15 is None:
            scheduler = DDIMScheduler.from_pretrained(model_path_or_name, subfolder="scheduler")
            self.sd15 = ADPipeline.from_pretrained(model_path_or_name, scheduler=scheduler, safety_checker=None)
            self.sd15.classifier = self.sd15.unet

    def _select_pipeline(self, model_name):
        """Return ``(pipeline, self_attention_layer_range)`` for *model_name*."""
        if 'xl' in model_name:
            return self.sdxl, (64, 70)
        return self.sd15, (10, 16)

    def preprocecss(self, image: "Image.Image", height=None, width=None):
        """Resize *image* (shorter side to 512 first) so both dimensions are
        multiples of 64, and return it as a (1, C, H, W) float tensor.

        The misspelled method name is kept for backward compatibility.
        """
        image = resize(image, size=512)
        if width is None or height is None:
            width, height = image.size
        new_width = (width // 64) * 64
        new_height = (height // 64) * 64
        image = image.resize((new_width, new_height), Image.BICUBIC)
        return to_tensor(image).unsqueeze(0)

    def run_style_transfer(self, content_image, style_image, seed, num_steps, lr, content_weight, mixed_precision, model_path, model, **kwargs):
        """Optimise *content_image* toward the style of *style_image*.

        Bug fix: the original always ran on ``self.sd15`` even when
        *model_path* named an SDXL checkpoint — in that case ``self.sd15``
        was never loaded and the call crashed on ``None``. The pipeline and
        matching controller layers are now chosen from *model_path*,
        mirroring :meth:`run_style_t2i_generation`.
        """
        self.load_pipeline(model_path)

        content_image = self.preprocecss(content_image)
        style_image = self.preprocecss(style_image, height=512, width=512)
        print(content_image.shape, style_image.shape)

        height, width = content_image.shape[-2:]
        set_seed(seed)
        pipeline, self_layers = self._select_pipeline(model_path)
        controller = Controller(self_layers=self_layers)
        result = pipeline.optimize(
            lr=lr,
            batch_size=1,
            iters=1,
            width=width,
            height=height,
            weight=content_weight,
            controller=controller,
            style_image=style_image,
            content_image=content_image,
            mixed_precision=mixed_precision,
            num_inference_steps=num_steps,
            enable_gradient_checkpoint=False,
        )
        output_image = to_pil_image(result[0].float())
        del result
        # Release VRAM held by intermediate tensors before returning.
        torch.cuda.empty_cache()
        return [output_image]

    def run_style_t2i_generation(self, style_image, prompt, negative_prompt, guidance_scale, height, width, seed, num_steps, iterations, lr, num_images_per_prompt, mixed_precision, is_adain, model):
        """Generate images from *prompt* in the style of *style_image*."""
        self.load_pipeline(model)

        # Each model family has a fixed native resolution; the caller-supplied
        # height/width are deliberately overridden (original behaviour).
        use_xl = 'xl' in model
        height, width = (1024, 1024) if use_xl else (512, 512)
        style_image = self.preprocecss(style_image, height=height, width=width)

        set_seed(seed)
        pipeline, self_layers = self._select_pipeline(model)
        controller = Controller(self_layers=self_layers)

        images = pipeline.sample(
            controller=controller,
            iters=iterations,
            lr=lr,
            adain=is_adain,
            height=height,
            width=width,
            mixed_precision=mixed_precision,
            style_image=style_image,
            prompt=prompt,
            negative_prompt=negative_prompt,
            guidance_scale=guidance_scale,
            num_inference_steps=num_steps,
            num_images_per_prompt=num_images_per_prompt,
            enable_gradient_checkpoint=False
        )
        output_images = [to_pil_image(image.float()) for image in images]

        del images
        torch.cuda.empty_cache()
        return output_images

    def run_texture_synthesis(self, texture_image, height, width, seed, num_steps, iterations, lr, mixed_precision, num_images_per_prompt, synthesis_way, model):
        """Synthesise textures from *texture_image*.

        *synthesis_way* selects the strategy: ``'Sampling'`` or
        ``'MultiDiffusion'``.

        Bug fix: as in :meth:`run_style_transfer`, the pipeline is now
        selected from *model* instead of unconditionally using ``self.sd15``.
        """
        self.load_pipeline(model)
        texture_image = self.preprocecss(texture_image, height=512, width=512)

        set_seed(seed)
        pipeline, self_layers = self._select_pipeline(model)
        controller = Controller(self_layers=self_layers)

        if synthesis_way == 'Sampling':
            results = pipeline.sample(
                lr=lr,
                adain=False,
                iters=iterations,
                width=width,
                height=height,
                weight=0.,
                controller=controller,
                style_image=texture_image,
                content_image=None,
                prompt="",
                negative_prompt="",
                mixed_precision=mixed_precision,
                num_inference_steps=num_steps,
                guidance_scale=1.,
                num_images_per_prompt=num_images_per_prompt,
                enable_gradient_checkpoint=False,
            )
        elif synthesis_way == 'MultiDiffusion':
            results = pipeline.panorama(
                lr=lr,
                iters=iterations,
                width=width,
                height=height,
                weight=0.,
                controller=controller,
                style_image=texture_image,
                content_image=None,
                prompt="",
                negative_prompt="",
                stride=8,
                view_batch_size=8,
                mixed_precision=mixed_precision,
                num_inference_steps=num_steps,
                guidance_scale=1.,
                num_images_per_prompt=num_images_per_prompt,
                enable_gradient_checkpoint=False,
            )
        else:
            # Original raised a bare ValueError; keep the type, add a message.
            raise ValueError("synthesis_way must be 'Sampling' or 'MultiDiffusion', got {!r}".format(synthesis_way))

        output_images = [to_pil_image(image.float()) for image in results]
        del results
        torch.cuda.empty_cache()
        return output_images
|
|
class InpaintingRunner:
    """Runs a pretrained generative-inpainting Generator, configured from a
    YAML file (dataset name, mask type, image shape, GPU ids, ...)."""

    def __init__(self, config='configs/config.yaml', seed=50, iter=0, flow=''):
        """Load the YAML config, configure CUDA devices and seed the RNGs.

        :param config: path to the YAML configuration file
        :param seed: RNG seed; a random one is drawn when ``None``
        :param iter: checkpoint iteration to resume (0 = latest)
        :param flow: optional output path for the offset-flow visualisation
        """
        # Bug fix: the original `yaml.load(open(...))` leaked the file handle.
        with open(config, 'r') as f:
            self.config = yaml.load(f, Loader=yaml.FullLoader)
        self.cuda = self.config['cuda']
        self.device_ids = self.config['gpu_ids']
        self.flow = flow
        self.iter = iter
        if self.cuda:
            os.environ['CUDA_VISIBLE_DEVICES'] = ','.join(str(i) for i in self.device_ids)
            device_ids = list(range(len(self.device_ids)))
            self.config['gpu_ids'] = device_ids
            cudnn.benchmark = True
        if seed is None:
            # Bug fix: the file-level `from random import random` binds the
            # *function* to the name `random`, so `random.randint` raised
            # AttributeError here; import the module under a local alias.
            import random as _random
            seed = _random.randint(1, 10000)
        self.seed = seed
        torch.manual_seed(self.seed)
        if self.cuda:
            torch.cuda.manual_seed(self.seed)

    def preprocess(self, input_image, mask_image):
        """Build the masked input tensor and its mask.

        Returns ``(x, mask)`` where ``x`` is the normalised image with the
        masked region zeroed; both tensors carry a leading batch dimension.
        """
        shape = self.config['image_shape'][:-1]  # (H, W) without channels
        if mask_image is not None:
            img = get_image_from_dict(input_image)
            mask_path = mask_image[0][0]
            mask = default_loader(mask_path)
            img = img.convert('RGB')
            x = transforms.Resize(shape)(img)
            x = transforms.CenterCrop(shape)(x)
            mask = transforms.Resize(shape)(mask)
            mask = transforms.CenterCrop(shape)(mask)
            x = transforms.ToTensor()(x)
            # Keep a single mask channel.
            mask = transforms.ToTensor()(mask)[0].unsqueeze(dim=0)
            x = normalize(x)
            x = x * (1. - mask)  # zero out the region to be inpainted
            x = x.unsqueeze(dim=0)
            mask = mask.unsqueeze(dim=0)
        else:
            ground_truth = default_loader(input_image)
            ground_truth = transforms.Resize(shape)(ground_truth)
            ground_truth = transforms.CenterCrop(shape)(ground_truth)
            ground_truth = transforms.ToTensor()(ground_truth)
            ground_truth = normalize(ground_truth)
            ground_truth = ground_truth.unsqueeze(dim=0)
            bboxes = random_bbox(self.config, batch_size=ground_truth.size(0))
            # NOTE(review): `mask_image` is None on this branch (that is the
            # condition that brought us here), so this call cannot succeed as
            # written; the parameter appears to shadow a mask-building helper
            # that should be imported from the tools module — confirm and fix.
            x, mask = mask_image(ground_truth, bboxes, self.config)
        return x, mask

    def run_generative_inpainting(self, input_image, mask_image):
        """Inpaint *input_image* under *mask_image*; returns ``[PIL.Image]``.

        Loads the latest (or ``self.iter``-th) generator checkpoint from
        ``checkpoints/<dataset>/<mask_type>_<expname>`` before inference.
        """
        try:
            with torch.no_grad():
                x, mask = self.preprocess(input_image, mask_image)
                checkpoint_path = os.path.join('checkpoints',
                                               self.config['dataset_name'],
                                               self.config['mask_type'] + '_' + self.config['expname'])
                netG = Generator(self.config['netG'], self.cuda, self.device_ids)
                last_model_name = get_model_list(checkpoint_path, "gen", iteration=self.iter)
                netG.load_state_dict(torch.load(last_model_name))
                # Checkpoint files end in an 8-digit iteration + extension.
                model_iteration = int(last_model_name[-11:-3])
                print("Resume from {} at iteration {}".format(checkpoint_path, model_iteration))

                if self.cuda:
                    netG = nn.parallel.DataParallel(netG, device_ids=self.device_ids)
                    x = x.cuda()
                    mask = mask.cuda()

                x1, x2, offset_flow = netG(x, mask)
                # Composite: generated pixels inside the mask, originals outside.
                inpainted_result = x2 * mask + x * (1. - mask)
                inpainted_result = inpainted_result.squeeze(0)
                inpainted_result = vutils.make_grid(inpainted_result, padding=0, normalize=True)
                inpainted_result = transforms.ToPILImage()(inpainted_result)

                if self.flow:
                    vutils.save_image(offset_flow, self.flow, padding=0, normalize=True)
                    print("Saved offset flow to {}".format(self.flow))
        except Exception as e:
            print("Error: {}".format(e))
            raise e
        return [inpainted_result]
|
|
class CycleGANRunner:
    """Runs a pretrained CycleGAN in test mode on an input/style image pair."""

    def __init__(self, checkpoints_dir='./checkpoints', name='ostracoda_cyclegan'):
        self.opt = TestOptions().parse()
        self.opt.name = name
        self.opt.checkpoints_dir = checkpoints_dir

    def preprocess(self, input_image, style_image):
        """Convert both PIL images to RGB, apply the dataset transform and
        return a batched ``{'A': tensor, 'B': tensor}`` pair."""
        # Named `transform` (not `transforms`) to avoid shadowing the
        # torchvision.transforms module used elsewhere in this file.
        transform = get_transform(self.opt)
        batch = {}
        for key, image in (('A', input_image), ('B', style_image)):
            batch[key] = transform(image.convert('RGB')).unsqueeze(0)
        return batch

    def load_model(self, input_nc, output_nc, ngf, netG, norm, no_dropout, init_type, init_gain):
        """Copy the generator hyper-parameters onto ``self.opt`` and build
        the model via the project factory."""
        settings = dict(input_nc=input_nc, output_nc=output_nc, ngf=ngf,
                        netG=netG, norm=norm, no_dropout=no_dropout,
                        init_type=init_type, init_gain=init_gain)
        for attr, value in settings.items():
            setattr(self.opt, attr, value)

        self.model = create_model(self.opt)
        self.model.setup(self.opt)

    def run_cyclegan(self, input_image, style_image, input_nc, output_nc, ngf, netG, norm, use_dropout, init_type, init_gain):
        """Full inference pass; returns a one-element list with the fake-A
        PIL image.

        NOTE(review): *use_dropout* is forwarded into ``load_model``'s
        *no_dropout* slot — the polarity looks inverted; confirm against the
        caller before relying on it.
        """
        data = self.preprocess(input_image, style_image)
        self.load_model(input_nc, output_nc, ngf, netG, norm, use_dropout, init_type, init_gain)
        self.model.set_input(data)
        self.model.test()

        visuals = self.model.get_current_visuals()
        grid = vutils.make_grid(visuals['fake_A'].squeeze(0), padding=0, normalize=True)
        return [transforms.ToPILImage()(grid)]
|
|
class SDDFRCNNRunner:
    """Object-detection runner supporting two checkpoints: a RetinaNet with a
    ResNet50-FPN backbone (selected with ``model_type == 'FRCNN'``) and an
    SSD300 (any other value)."""

    def __init__(self):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model = None  # set by load_model()

    def _create_frcnn_model(self):
        """Build the RetinaNet with a ResNet50-FPN backbone (9 classes)."""
        backbone = resnet50_fpn_backbone(norm_layer=torch.nn.BatchNorm2d,
                                         returned_layers=[2, 3, 4],
                                         extra_blocks=LastLevelP6P7(256, 256))
        return RetinaNet(backbone, num_classes=9)

    def _create_ssd_model(self):
        """Build the SSD300 detector (10 classes)."""
        return SSD300(Backbone(), num_classes=10)

    def load_model(self, model_type):
        """Create the requested detector, load its checkpoint and move it to
        ``self.device``."""
        if model_type == 'FRCNN':
            self.model = self._create_frcnn_model()
            weights_path = "sddfrcnn_model/save_weights/resNetFpn-model-20.pth"
        else:
            self.model = self._create_ssd_model()
            weights_path = "sddfrcnn_model/save_weights/ssd300-25.pth"
        # Typo fix in the message: "dose" -> "does".
        assert os.path.exists(weights_path), "{} file does not exist.".format(weights_path)
        weights_dict = torch.load(weights_path, map_location='cpu')
        # Some checkpoints wrap the state dict under a "model" key.
        weights_dict = weights_dict["model"] if "model" in weights_dict else weights_dict
        self.model.load_state_dict(weights_dict, strict=False)
        self.model.to(self.device)

    def load_json(self, json_path='sddfrcnn_model/pascal_voc_classes.json'):
        """Load the class-index JSON ({name: index}) and return the inverted
        {index-str: name-str} mapping used for drawing labels."""
        self.label_json_path = json_path
        assert os.path.exists(self.label_json_path), "json file {} does not exist.".format(self.label_json_path)
        with open(self.label_json_path, 'r', encoding='utf-8') as f:
            class_dict = json.load(f)
        return {str(v): str(k) for k, v in class_dict.items()}

    def run_sddfrcnn(self, input_image, model_type):
        """Detect objects in *input_image* with the chosen model.

        Returns ``([PIL.Image], status_message)`` — the annotated image (or
        the original when nothing was detected) plus a status string.
        """
        self.load_model(model_type)
        category_index = self.load_json()
        original_image = input_image.convert('RGB')
        if model_type == 'FRCNN':
            data_transforms = transforms.Compose([transforms.ToTensor()])
            img = data_transforms(original_image)
            img = torch.unsqueeze(img, 0)
            self.model.eval()
            with torch.no_grad():
                # Warm-up forward pass on a zero image of the same size.
                img_height, img_width = img.shape[-2:]
                init_img = torch.zeros((1, 3, img_height, img_width), device=self.device)
                self.model(init_img)
                predictions = self.model(img.to(self.device))[0]
                predict_boxes = predictions["boxes"].to("cpu").numpy()
                predict_classes = predictions["labels"].to("cpu").numpy()
                predict_scores = predictions["scores"].to("cpu").numpy()

                if len(predict_boxes) == 0:
                    return [original_image], "未检测到生物"

                plot_img = draw_objs(original_image,
                                     predict_boxes,
                                     predict_classes,
                                     predict_scores,
                                     category_index=category_index,
                                     box_thresh=0,
                                     line_thickness=3,
                                     font='simhei.ttf',
                                     font_size=20)
        else:
            # SSD expects a fixed 300x300 input.
            data_transform = transforms.Compose([transforms.Resize((300, 300)), transforms.ToTensor()])
            img = data_transform(original_image)
            img = torch.unsqueeze(img, dim=0)

            self.model.eval()
            with torch.no_grad():
                # Warm-up forward pass on a zero image.
                init_img = torch.zeros((1, 3, 300, 300), device=self.device)
                self.model(init_img)

                predictions = self.model(img.to(self.device))[0]

                predict_boxes = predictions[0].to("cpu").numpy()
                # SSD boxes are normalised — rescale to the original image size.
                predict_boxes[:, [0, 2]] = predict_boxes[:, [0, 2]] * original_image.size[0]
                predict_boxes[:, [1, 3]] = predict_boxes[:, [1, 3]] * original_image.size[1]
                predict_classes = predictions[1].to("cpu").numpy()
                predict_scores = predictions[2].to("cpu").numpy()

                if len(predict_boxes) == 0:
                    # Consistency fix: the original only printed here and went
                    # on to draw an empty result; mirror the FRCNN branch.
                    print("没有检测到任何目标!")
                    return [original_image], "未检测到生物"

                plot_img = draw_objs(original_image,
                                     predict_boxes,
                                     predict_classes,
                                     predict_scores,
                                     category_index=category_index,
                                     box_thresh=0,
                                     line_thickness=3,
                                     font='simhei.ttf',
                                     font_size=20)
        return [plot_img], "完成"
|
|
class YOLORunner:
    """Thin wrapper around an Ultralytics YOLOv8 model for single-image
    inference."""

    def __init__(self, model_path='yolov8_model/weights/best.pt'):
        self.model = YOLO(model_path)

    def run_yolov8(self, input_image):
        """Run detection on *input_image*.

        Returns a one-element list: the annotated image as an RGB array, or
        the untouched *input_image* when prediction produced no results.
        """
        results = self.model.predict(source=input_image, imgsz=320, save=False, visualize=False)
        if not results:
            return [input_image]
        import cv2  # local import kept from the original
        # .plot() renders the detections in BGR order; convert for display.
        annotated_bgr = results[0].plot()
        return [cv2.cvtColor(annotated_bgr, cv2.COLOR_BGR2RGB)]
|
|
|
|