# HZSDU's picture
# Add files using upload-large-folder tool
# dfb6163 verified
from random import random
import torch.nn as nn
import torch
from PIL import Image
from diffusers import DDIMScheduler
from accelerate.utils import set_seed
from torch.fx.experimental.unification.multipledispatch.dispatcher import source
from torchvision.transforms.functional import to_pil_image, to_tensor, resize
import torch.backends.cudnn as cudnn
from cyclegan_model.model import create_model
from inpaint_model.model.networks import Generator
from inpaint_model.utils.tools import random_bbox, default_loader, normalize, get_model_list,get_image_from_dict
from cyclegan_model.data.base_dataset import get_transform
from cyclegan_model.options.test_options import TestOptions
import torchvision.utils as vutils
import torchvision.transforms as transforms
from pipeline_sd import ADPipeline
from pipeline_sdxl import ADPipeline as ADXLPipeline
from utils import Controller
import os
import yaml
import json
import matplotlib.pyplot as plt
from sddfrcnn_model.network_files import RetinaNet
from sddfrcnn_model.backbone import SSD300,Backbone
from sddfrcnn_model.backbone import resnet50_fpn_backbone,LastLevelP6P7
from sddfrcnn_model.draw_box_utils import draw_objs
from yolov8_model.ultralytics import YOLO
class AttentionRunner:
    """Runs attention-guided style transfer, style-conditioned text-to-image
    generation, and texture synthesis on lazily-loaded SD 1.5 / SDXL pipelines.

    Pipelines are cached on the instance: ``self.sd15`` and ``self.sdxl`` start
    as ``None`` and are created on first use by :meth:`load_pipeline`.
    """

    def __init__(self):
        # Lazily-populated pipeline caches.
        self.sd15 = None
        self.sdxl = None
        self.loss_fn = torch.nn.L1Loss(reduction="mean")

    def load_pipeline(self, model_path_or_name):
        """Load (once) the pipeline matching ``model_path_or_name``.

        Paths containing ``'xl'`` load the SDXL pipeline; all others load the
        SD 1.5 pipeline. Fix over the original: previously an XL path with an
        already-cached SDXL pipeline fell through to the ``elif`` and loaded
        the XL checkpoint into the SD 1.5 slot; XL paths now never fall through.
        """
        if 'xl' in model_path_or_name:
            if self.sdxl is None:
                scheduler = DDIMScheduler.from_pretrained(model_path_or_name, subfolder="scheduler")
                self.sdxl = ADXLPipeline.from_pretrained(model_path_or_name, scheduler=scheduler, safety_checker=None)
                # The attention-distillation pipeline reuses the UNet as its classifier.
                self.sdxl.classifier = self.sdxl.unet
        elif self.sd15 is None:
            scheduler = DDIMScheduler.from_pretrained(model_path_or_name, subfolder="scheduler")
            self.sd15 = ADPipeline.from_pretrained(model_path_or_name, scheduler=scheduler, safety_checker=None)
            self.sd15.classifier = self.sd15.unet

    def preprocecss(self, image: Image.Image, height=None, width=None):
        """Resize ``image`` so both sides are multiples of 64 and return a
        ``(1, C, H, W)`` float tensor in [0, 1].

        If ``height``/``width`` are not given, the image's own size (after the
        shorter side is scaled to 512) is used. Name kept misspelled for
        backward compatibility; see the ``preprocess`` alias below.
        """
        image = resize(image, size=512)  # shorter side -> 512, aspect ratio preserved
        if width is None or height is None:
            width, height = image.size
        # UNet downsampling stages require dimensions divisible by 64.
        new_width = (width // 64) * 64
        new_height = (height // 64) * 64
        image = image.resize((new_width, new_height), Image.BICUBIC)
        return to_tensor(image).unsqueeze(0)

    # Correctly-spelled, backward-compatible alias.
    preprocess = preprocecss

    # @spaces.GPU
    def run_style_transfer(self, content_image, style_image, seed, num_steps, lr, content_weight, mixed_precision, model_path, model, **kwargs):
        """Transfer the style of ``style_image`` onto ``content_image``.

        Returns a single-element list containing the resulting PIL image.
        Uses the SD 1.5 pipeline (style transfer is not wired up for SDXL).
        """
        self.load_pipeline(model_path)
        content_image = self.preprocecss(content_image)
        style_image = self.preprocecss(style_image, height=512, width=512)
        print(content_image.shape, style_image.shape)
        height, width = content_image.shape[-2:]
        set_seed(seed)
        controller = Controller(self_layers=(10, 16))
        result = self.sd15.optimize(
            lr=lr,
            batch_size=1,
            iters=1,
            width=width,
            height=height,
            weight=content_weight,
            controller=controller,
            style_image=style_image,
            content_image=content_image,
            mixed_precision=mixed_precision,
            num_inference_steps=num_steps,
            enable_gradient_checkpoint=False,
        )
        output_image = to_pil_image(result[0].float())
        # Free GPU memory held by the result tensor before returning.
        del result
        torch.cuda.empty_cache()
        return [output_image]

    # @spaces.GPU
    def run_style_t2i_generation(self, style_image, prompt, negative_prompt, guidance_scale, height, width, seed, num_steps, iterations, lr, num_images_per_prompt, mixed_precision, is_adain, model):
        """Generate text-to-image samples in the style of ``style_image``.

        ``height``/``width`` arguments are intentionally overridden by the
        model's native resolution (1024 for SDXL, 512 for SD 1.5).
        Returns a list of PIL images.
        """
        self.load_pipeline(model)
        use_xl = 'xl' in model
        height, width = (1024, 1024) if use_xl else (512, 512)
        style_image = self.preprocecss(style_image, height=height, width=width)
        set_seed(seed)
        # Self-attention layer ranges differ between the two UNet architectures.
        self_layers = (64, 70) if use_xl else (10, 16)
        controller = Controller(self_layers=self_layers)
        pipeline = self.sdxl if use_xl else self.sd15
        images = pipeline.sample(
            controller=controller,
            iters=iterations,
            lr=lr,
            adain=is_adain,
            height=height,
            width=width,
            mixed_precision=mixed_precision,
            style_image=style_image,
            prompt=prompt,
            negative_prompt=negative_prompt,
            guidance_scale=guidance_scale,
            num_inference_steps=num_steps,
            num_images_per_prompt=num_images_per_prompt,
            enable_gradient_checkpoint=False
        )
        output_images = [to_pil_image(image.float()) for image in images]
        del images
        torch.cuda.empty_cache()
        return output_images

    # @spaces.GPU
    def run_texture_synthesis(self, texture_image, height, width, seed, num_steps, iterations, lr, mixed_precision, num_images_per_prompt, synthesis_way, model):
        """Synthesize textures from ``texture_image`` with the SD 1.5 pipeline.

        ``synthesis_way`` selects the strategy: ``'Sampling'`` (plain sampling)
        or ``'MultiDiffusion'`` (panorama/striding). Returns a list of PIL images.

        Raises:
            ValueError: if ``synthesis_way`` is not one of the two strategies.
        """
        self.load_pipeline(model)
        texture_image = self.preprocecss(texture_image, height=512, width=512)
        set_seed(seed)
        controller = Controller(self_layers=(10, 16))
        if synthesis_way == 'Sampling':
            results = self.sd15.sample(
                lr=lr,
                adain=False,
                iters=iterations,
                width=width,
                height=height,
                weight=0.,
                controller=controller,
                style_image=texture_image,
                content_image=None,
                prompt="",
                negative_prompt="",
                mixed_precision=mixed_precision,
                num_inference_steps=num_steps,
                guidance_scale=1.,
                num_images_per_prompt=num_images_per_prompt,
                enable_gradient_checkpoint=False,
            )
        elif synthesis_way == 'MultiDiffusion':
            results = self.sd15.panorama(
                lr=lr,
                iters=iterations,
                width=width,
                height=height,
                weight=0.,
                controller=controller,
                style_image=texture_image,
                content_image=None,
                prompt="",
                negative_prompt="",
                stride=8,
                view_batch_size=8,
                mixed_precision=mixed_precision,
                num_inference_steps=num_steps,
                guidance_scale=1.,
                num_images_per_prompt=num_images_per_prompt,
                enable_gradient_checkpoint=False,
            )
        else:
            raise ValueError(f"Unknown synthesis_way: {synthesis_way!r}")
        output_images = [to_pil_image(image.float()) for image in results]
        del results
        torch.cuda.empty_cache()
        return output_images
class InpaintingRunner:
    """Generative-inpainting runner configured from a YAML file.

    Loads the config, pins CUDA devices/seeds, and runs a pretrained
    ``Generator`` over a masked input image.
    """

    def __init__(self, config='configs/config.yaml', seed=50, iter=0, flow=''):
        """Load the YAML config and seed all RNGs.

        Args:
            config: path to the YAML configuration file.
            seed: RNG seed; if ``None``, a random seed in [1, 10000] is drawn.
            iter: checkpoint iteration to resume from (0 = latest).
            flow: optional path to save the offset-flow visualization.
        """
        # `with` ensures the config file handle is closed (original leaked it).
        with open(config, 'r') as f:
            self.config = yaml.load(f, Loader=yaml.FullLoader)
        self.cuda = self.config['cuda']
        self.device_ids = self.config['gpu_ids']
        self.flow = flow
        self.iter = iter
        if self.cuda:
            os.environ['CUDA_VISIBLE_DEVICES'] = ','.join(str(i) for i in self.device_ids)
            device_ids = list(range(len(self.device_ids)))
            self.config['gpu_ids'] = device_ids
            cudnn.benchmark = True
        if seed is None:
            # Fix: module scope has `from random import random` (a function),
            # so the original `random.randint(1, 10000)` raised AttributeError.
            from random import randint
            seed = randint(1, 10000)
        self.seed = seed
        torch.manual_seed(self.seed)
        if self.cuda:
            torch.cuda.manual_seed(self.seed)

    def preprocess(self, input_image, mask_image):
        """Build the masked input tensor ``x`` and binary ``mask``.

        With an explicit mask: resize/crop both to ``config['image_shape']``,
        normalize, and zero out the masked region of ``x``. Without a mask,
        a random bbox mask is generated from the config.
        Returns ``(x, mask)`` as 4-D tensors.
        """
        if mask_image is not None:
            x = get_image_from_dict(input_image).convert('RGB')
            mask = default_loader(mask_image[0][0])
            target_shape = self.config['image_shape'][:-1]
            x = transforms.Resize(target_shape)(x)
            x = transforms.CenterCrop(target_shape)(x)
            mask = transforms.Resize(target_shape)(mask)
            mask = transforms.CenterCrop(target_shape)(mask)
            x = transforms.ToTensor()(x)
            # Keep a single mask channel.
            mask = transforms.ToTensor()(mask)[0].unsqueeze(dim=0)
            x = normalize(x)
            x = x * (1. - mask)  # zero out the region to be inpainted
            x = x.unsqueeze(dim=0)
            mask = mask.unsqueeze(dim=0)
        else:
            target_shape = self.config['image_shape'][:-1]
            ground_truth = default_loader(input_image)
            ground_truth = transforms.Resize(target_shape)(ground_truth)
            ground_truth = transforms.CenterCrop(target_shape)(ground_truth)
            ground_truth = transforms.ToTensor()(ground_truth)
            ground_truth = normalize(ground_truth)
            ground_truth = ground_truth.unsqueeze(dim=0)
            bboxes = random_bbox(self.config, batch_size=ground_truth.size(0))
            # NOTE(review): `mask_image` is None in this branch (it is the
            # parameter), so this call will fail. A `mask_image` helper from
            # inpaint_model.utils.tools was presumably intended — confirm and
            # import it explicitly.
            x, mask = mask_image(ground_truth, bboxes, self.config)
        return x, mask

    def run_generative_inpainting(self, input_image, mask_image):
        """Inpaint ``input_image`` under ``mask_image`` with the latest generator
        checkpoint; returns a single-element list with the PIL result."""
        try:
            with torch.no_grad():
                x, mask = self.preprocess(input_image, mask_image)
                checkpoint_path = os.path.join('checkpoints',
                                               self.config['dataset_name'],
                                               self.config['mask_type'] + '_' + self.config['expname'])
                netG = Generator(self.config['netG'], self.cuda, self.device_ids)
                last_model_name = get_model_list(checkpoint_path, "gen", iteration=self.iter)
                netG.load_state_dict(torch.load(last_model_name))
                # Checkpoint names end in an 8-digit iteration + ".pt"-style suffix.
                model_iteration = int(last_model_name[-11:-3])
                print("Resume from {} at iteration {}".format(checkpoint_path, model_iteration))
                if self.cuda:
                    netG = nn.parallel.DataParallel(netG, device_ids=self.device_ids)
                    x = x.cuda()
                    mask = mask.cuda()
                # Inference: keep known pixels from x, fill masked region from x2.
                x1, x2, offset_flow = netG(x, mask)
                inpainted_result = x2 * mask + x * (1. - mask)
                inpainted_result = inpainted_result.squeeze(0)
                inpainted_result = vutils.make_grid(inpainted_result, padding=0, normalize=True)
                inpainted_result = transforms.ToPILImage()(inpainted_result)
                if self.flow:
                    vutils.save_image(offset_flow, self.flow, padding=0, normalize=True)
                    print("Saved offset flow to {}".format(self.flow))
        except Exception as e:
            print("Error: {}".format(e))
            raise e
        return [inpainted_result]
class CycleGANRunner:
    """Runs a pretrained CycleGAN model (B->A direction) on an image pair."""

    def __init__(self, checkpoints_dir='./checkpoints', name='ostracoda_cyclegan'):
        self.opt = TestOptions().parse()
        self.opt.name = name
        self.opt.checkpoints_dir = checkpoints_dir

    def preprocess(self, input_image, style_image):
        """Convert both PIL images to RGB, apply the configured transform, and
        return the {'A': tensor, 'B': tensor} dict expected by the model."""
        content = input_image.convert('RGB')  # renamed from `input` (shadowed builtin)
        style = style_image.convert('RGB')
        # Renamed from `transforms`, which shadowed the module-level
        # torchvision.transforms import.
        transform = get_transform(self.opt)
        tensor_A = transform(content).unsqueeze(0)
        tensor_B = transform(style).unsqueeze(0)
        return {
            'A': tensor_A,
            'B': tensor_B
        }

    def load_model(self, input_nc, output_nc, ngf, netG, norm, no_dropout, init_type, init_gain):
        """Populate generator options on ``self.opt`` and build the model."""
        self.opt.input_nc = input_nc
        self.opt.output_nc = output_nc
        self.opt.ngf = ngf
        self.opt.netG = netG
        self.opt.norm = norm
        self.opt.no_dropout = no_dropout
        self.opt.init_type = init_type
        self.opt.init_gain = init_gain
        self.model = create_model(self.opt)
        self.model.setup(self.opt)

    def run_cyclegan(self, input_image, style_image, input_nc, output_nc, ngf, netG, norm, use_dropout, init_type, init_gain):
        """Run inference and return the fake_A visual as a one-element PIL list."""
        data = self.preprocess(input_image, style_image)
        self.load_model(input_nc, output_nc, ngf, netG, norm, use_dropout, init_type, init_gain)
        self.model.set_input(data)
        self.model.test()
        visuals = self.model.get_current_visuals()
        fake_A = visuals['fake_A'].squeeze(0)
        fake_A = vutils.make_grid(fake_A, padding=0, normalize=True)
        fake_A_image = transforms.ToPILImage()(fake_A)
        return [fake_A_image]
class SDDFRCNNRunner:
    """Object-detection runner supporting a RetinaNet ('FRCNN') and an SSD300 model."""

    def __init__(self):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model = None

    def _create_frcnn_model(self):
        """Build the RetinaNet with a ResNet50-FPN backbone (9 classes)."""
        backbone = resnet50_fpn_backbone(norm_layer=torch.nn.BatchNorm2d,
                                         returned_layers=[2, 3, 4],
                                         extra_blocks=LastLevelP6P7(256, 256))
        return RetinaNet(backbone, num_classes=9)

    def _create_ssd_model(self):
        """Build the SSD300 model (10 classes incl. background)."""
        return SSD300(Backbone(), num_classes=10)

    def load_model(self, model_type):
        """Instantiate the requested model, load its weights, move to device."""
        if model_type == 'FRCNN':
            self.model = self._create_frcnn_model()
            weights_path = "sddfrcnn_model/save_weights/resNetFpn-model-20.pth"
        else:
            self.model = self._create_ssd_model()
            weights_path = "sddfrcnn_model/save_weights/ssd300-25.pth"
        # Typo fix: "dose" -> "does".
        assert os.path.exists(weights_path), "{} file does not exist.".format(weights_path)
        weights_dict = torch.load(weights_path, map_location='cpu')
        # Checkpoints may wrap the state dict in a "model" key.
        weights_dict = weights_dict["model"] if "model" in weights_dict else weights_dict
        self.model.load_state_dict(weights_dict, strict=False)
        self.model.to(self.device)

    def load_json(self, json_path='sddfrcnn_model/pascal_voc_classes.json'):
        """Load the class-name JSON and return an id->name category index."""
        self.label_json_path = json_path
        assert os.path.exists(self.label_json_path), "json file {} does not exist.".format(self.label_json_path)
        with open(self.label_json_path, 'r', encoding='utf-8') as f:
            class_dict = json.load(f)
        # Invert {name: id} into {str(id): name} for draw_objs.
        category_index = {str(v): str(k) for k, v in class_dict.items()}
        return category_index

    def run_sddfrcnn(self, input_image, model_type):
        """Detect objects in ``input_image`` with the chosen model.

        Returns ([annotated PIL image], status_message). The FRCNN branch
        returns early when nothing is detected.
        """
        self.load_model(model_type)
        category_index = self.load_json()
        original_image = input_image.convert('RGB')
        if model_type == 'FRCNN':
            data_transforms = transforms.Compose([transforms.ToTensor()])
            img = torch.unsqueeze(data_transforms(original_image), 0)
            self.model.eval()
            with torch.no_grad():
                # Warm-up pass with a zero image at the real input size.
                img_height, img_width = img.shape[-2:]
                init_img = torch.zeros((1, 3, img_height, img_width), device=self.device)
                self.model(init_img)
                predictions = self.model(img.to(self.device))[0]
                predict_boxes = predictions["boxes"].to("cpu").numpy()
                predict_classes = predictions["labels"].to("cpu").numpy()
                predict_scores = predictions["scores"].to("cpu").numpy()
                if len(predict_boxes) == 0:
                    return [original_image], "未检测到生物"
                plot_img = draw_objs(original_image,
                                     predict_boxes,
                                     predict_classes,
                                     predict_scores,
                                     category_index=category_index,
                                     box_thresh=0,
                                     line_thickness=3,
                                     font='simhei.ttf',
                                     font_size=20)
        else:
            data_transform = transforms.Compose([transforms.Resize((300, 300)), transforms.ToTensor()])
            img = data_transform(original_image)
            # expand batch dimension
            img = torch.unsqueeze(img, dim=0)
            self.model.eval()
            with torch.no_grad():
                # Warm-up pass at the fixed SSD input size.
                init_img = torch.zeros((1, 3, 300, 300), device=self.device)
                self.model(init_img)
                predictions = self.model(img.to(self.device))[0]  # bboxes_out, labels_out, scores_out
                predict_boxes = predictions[0].to("cpu").numpy()
                # SSD boxes are normalized; rescale to the original image size.
                predict_boxes[:, [0, 2]] = predict_boxes[:, [0, 2]] * original_image.size[0]
                predict_boxes[:, [1, 3]] = predict_boxes[:, [1, 3]] * original_image.size[1]
                predict_classes = predictions[1].to("cpu").numpy()
                predict_scores = predictions[2].to("cpu").numpy()
                if len(predict_boxes) == 0:
                    print("没有检测到任何目标!")
                plot_img = draw_objs(original_image,
                                     predict_boxes,
                                     predict_classes,
                                     predict_scores,
                                     category_index=category_index,
                                     box_thresh=0,
                                     line_thickness=3,
                                     font='simhei.ttf',
                                     font_size=20)
        return [plot_img], "完成"
class YOLORunner:
    """Wraps an Ultralytics YOLOv8 model for single-image prediction."""

    def __init__(self, model_path='yolov8_model/weights/best.pt'):
        self.model = YOLO(model_path)

    def run_yolov8(self, input_image):
        """Run detection on ``input_image``.

        Returns a one-element list: the annotated image converted to RGB, or
        the unmodified input when prediction yields no results.
        """
        results = self.model.predict(source=input_image, imgsz=320, save=False, visualize=False)
        # Guard clause; the original `results and len(results) > 0` was redundant.
        if not results:
            return [input_image]
        plot_img = results[0].plot()  # annotated image as a BGR ndarray
        import cv2  # local import: cv2 only needed for the color conversion
        rgb_image = cv2.cvtColor(plot_img, cv2.COLOR_BGR2RGB)
        return [rgb_image]