| import copy |
| import os |
| import shutil |
|
|
| import cv2 |
| import gradio as gr |
| import modules.scripts as scripts |
|
|
| from modules import images |
| from modules.processing import process_images |
| from modules.shared import opts |
| from PIL import Image |
|
|
| import numpy as np |
|
|
# Sub-directory suffix appended directly to the samples output path to form
# the directory that receives the generated GIF files (leading slash included).
| _BASEDIR = "/controlnet-m2m" |
# Trailing component of the generated GIF file name ("<seq>-<seed>-animation").
| _BASEFILE = "animation" |
|
|
def get_all_frames(video_path):
    """Decode every frame of a video file into a list of RGB arrays.

    Args:
        video_path: Path of the video to read, or ``None``.

    Returns:
        A list holding one image per decoded frame, converted from OpenCV's
        BGR channel order to RGB. ``None`` is returned when ``video_path`` is
        ``None`` or when the video cannot be opened.
    """
    if video_path is None:
        return None

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None

        frame_list = []
        while True:
            ret, frame = cap.read()
            if not ret:
                # A failed read marks the end of the readable frames.
                return frame_list
            frame_list.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        # Release the capture on every exit path (normal return, open failure
        # or an exception while decoding) so the handle is not leaked.
        cap.release()
|
|
def get_min_frame_num(video_list):
    """Return the smallest frame count found among the given videos.

    Entries that are ``None`` are ignored. The length of every other entry is
    printed as it is visited. When no entry contributes a length the result
    is ``-1``.
    """
    lengths = []
    for frames in video_list:
        if frames is not None:
            lengths.append(len(frames))
            print(lengths[-1])
    return min(lengths, default=-1)
|
|
def pil2cv(image):
    """Convert an image to a ``uint8`` array with the colour channels swapped.

    The input is materialised with ``np.array(..., dtype=np.uint8)``.
    Two-dimensional (single channel) data is returned as is. With three
    channels the channel axis is reversed (RGB <-> BGR); with four channels
    the first three are reversed and the fourth is kept in place
    (RGBA <-> BGRA). Any other channel count is returned unchanged.
    """
    pixels = np.array(image, dtype=np.uint8)
    if pixels.ndim == 2:
        return pixels

    channels = pixels.shape[2]
    if channels == 3:
        return pixels[:, :, ::-1]
    if channels == 4:
        return pixels[:, :, [2, 1, 0, 3]]
    return pixels
|
|
|
|
def save_gif(path, image_list, name, duration):
    """Write the frames to a scratch directory and combine them into a GIF.

    Each frame is first saved individually as ``output_<index>`` into
    ``<path>/tmp/`` (that directory is wiped and recreated on every call).
    The animated GIF itself is written to ``<path><_BASEDIR>/<name>.gif``.

    Args:
        path: Base output directory.
        image_list: Frames to save; each must offer a PIL-style ``save``.
        name: File name of the GIF, without extension.
        duration: Forwarded unchanged as the ``duration`` argument of the
            GIF ``save`` call.

    Returns:
        ``None``. When ``image_list`` is empty nothing is written at all.
    """
    if not image_list:
        # Without a first frame there is nothing to encode; indexing
        # image_list[0] below would otherwise raise IndexError.
        return

    tmp_dir = path + "/tmp/"
    if os.path.isdir(tmp_dir):
        shutil.rmtree(tmp_dir)
    # makedirs instead of mkdir: a not-yet-existing ``path`` must not abort
    # the save (the GIF directory below is created the same way).
    os.makedirs(tmp_dir)
    for i, image in enumerate(image_list):
        images.save_image(image, tmp_dir, f"output_{i}")

    os.makedirs(f"{path}{_BASEDIR}", exist_ok=True)

    image_list[0].save(
        f"{path}{_BASEDIR}/{name}.gif",
        save_all=True,
        append_images=image_list[1:],
        optimize=False,
        duration=duration,
        loop=0,
    )
| |
|
|
class Script(scripts.Script):
    """Script that feeds video frames through processing and saves GIFs."""

    def title(self):
        """Return the title string of this script."""
        return "controlnet m2m"

    def show(self, is_img2img):
        """Return ``True`` unconditionally, whatever ``is_img2img`` is."""
        return True

    def ui(self, is_img2img):
        """Build the Gradio controls and return them as a flat tuple.

        For each of the ``control_net_max_models_num`` tabs the tuple holds a
        video input, an image input and a "Save preprocessed" checkbox, in
        that order. The single duration slider comes last. ``run`` slices its
        ``*args`` using this same layout.
        """
        ctrls_group = ()
        max_models = opts.data.get("control_net_max_models_num", 1)

        with gr.Group():
            with gr.Accordion("ControlNet-M2M", open=False):
                duration = gr.Slider(label="Duration", value=50.0, minimum=10.0, maximum=200.0, step=10, interactive=True, elem_id='controlnet_movie2movie_duration_slider')
                with gr.Tabs():
                    for i in range(max_models):
                        with gr.Tab(f"ControlNet-{i}"):
                            with gr.TabItem("Movie Input"):
                                ctrls_group += (gr.Video(format='mp4', source='upload', elem_id=f"video_{i}"), )
                            with gr.TabItem("Image Input"):
                                ctrls_group += (gr.Image(source='upload', brush_radius=20, mirror_webcam=False, type='numpy', tool='sketch', elem_id=f'image_{i}'), )
                            ctrls_group += (gr.Checkbox(label="Save preprocessed", value=False, elem_id=f"save_pre_{i}"),)

                ctrls_group += (duration,)

        return ctrls_group

    def run(self, p, *args):
        """Process every video frame with ``p`` and assemble GIFs from the results.

        Args:
            p: Processing object; a shallow copy is made and processed for
                every frame.
            *args: ``(video, image, save_preprocessed)`` triples for
                ``control_net_max_models_num`` units, followed by exactly one
                duration value.

        Returns:
            The result object of the last ``process_images`` call. When at
            least one video frame was processed, its ``images`` attribute is
            replaced by the list of written GIF paths (main animation first,
            then one entry per saved preprocessed sequence). When there are
            no video frames, ``p`` is processed once and that result is
            returned unchanged.
        """
        contents_num = opts.data.get("control_net_max_models_num", 1)
        arg_num = 3
        unit_args = args[:contents_num * arg_num]

        item_list = []
        video_list = []
        for start in range(0, len(unit_args), arg_num):
            input_set = tuple(unit_args[start:start + arg_num])
            if input_set[0] is not None:
                # Decode the video once and share the frame list; decoding it
                # separately for each list doubles the work and the memory.
                frames = get_all_frames(input_set[0])
                item_list.append([frames, "video"])
                video_list.append(frames)
            if input_set[1] is not None:
                item_list.append([cv2.cvtColor(pil2cv(input_set[1]["image"]), cv2.COLOR_BGRA2RGB), "image"])

        save_pre = list(args[2:contents_num * arg_num:arg_num])
        duration, = args[contents_num * arg_num:]

        frame_num = get_min_frame_num(video_list)
        if frame_num <= 0:
            # No usable video frames: fall back to a single ordinary run.
            return process_images(p)

        output_image_list = []
        # One bucket per "save preprocessed" checkbox, so every index taken
        # from ``save_pre`` below is valid regardless of how many inputs
        # were actually supplied.
        pre_output_image_list = [[] for _ in save_pre]

        for frame in range(frame_num):
            copy_p = copy.copy(p)
            copy_p.control_net_input_image = []
            for content, kind in item_list:
                if kind == "video":
                    copy_p.control_net_input_image.append(content[frame])
                elif kind == "image":
                    # A still image is reused unchanged for every frame.
                    copy_p.control_net_input_image.append(content)

            proc = process_images(copy_p)
            output_image_list.append(proc.images[0])

            for i, wanted in enumerate(save_pre):
                if not wanted:
                    continue
                try:
                    # images[0] is the frame collected above; the entry
                    # requested for checkbox i is read at offset i + 1.
                    pre_image = proc.images[i + 1]
                except IndexError:
                    print(f"proc.images[{i + 1}] failed")
                else:
                    pre_output_image_list[i].append(pre_image)

            copy_p.close()

        # File name format: <seq zero-padded to 5 digits>-<seed>-animation
        seq = images.get_next_sequence_number(f"{p.outpath_samples}{_BASEDIR}", "")
        filename = f"{seq:05}-{proc.seed}-{_BASEFILE}"
        save_gif(p.outpath_samples, output_image_list, filename, duration)
        proc.images = [f"{p.outpath_samples}{_BASEDIR}/{filename}.gif"]

        for i, wanted in enumerate(save_pre):
            # Skip sequences for which no image was collected: there is
            # nothing to encode and no file path to report.
            if wanted and pre_output_image_list[i]:
                # Control files are named <filename>-control<i>.gif
                save_gif(p.outpath_samples, pre_output_image_list[i], f"{filename}-control{i}", duration)
                proc.images.append(f"{p.outpath_samples}{_BASEDIR}/{filename}-control{i}.gif")

        return proc
|
|