| import requests
|
| import random
|
| from PIL import Image, ImageOps, ImageDraw, ImageFont
|
| from datetime import datetime
|
| from pathlib import Path
|
| import io, re, base64, os
|
| import zipfile
|
| import math
|
|
|
| BASE_URL="https://image.novelai.net"
|
|
|
| def process_xargs(xargs):
|
| processed_xargs = []
|
| skip_until = None
|
|
|
| for i, elem in enumerate(xargs):
|
| if skip_until is not None and i <= skip_until:
|
| continue
|
|
|
| if "add_negative:<" in elem or "rem_negative:<" in elem:
|
|
|
| if elem.endswith(">"):
|
| processed_xargs.append(elem)
|
| else:
|
| combined_elem = elem
|
| for j in range(i + 1, len(xargs)):
|
| combined_elem += ',' + xargs[j]
|
| if xargs[j].endswith(">"):
|
| skip_until = j
|
| break
|
| processed_xargs.append(combined_elem)
|
| else:
|
| processed_xargs.append(elem)
|
|
|
| return processed_xargs
|
|
|
| def event_cond_negative(_prompt, negative, user_input, rating):
|
| def insert_text_after_keyword(negative, user_keyword, user_additional_keyword):
|
| if user_keyword in negative:
|
| index = negative.index(user_keyword) + 1
|
| negative.insert(index, user_additional_keyword)
|
| return negative
|
| def parse_conditional_command(command):
|
| match = re.match(r"\((.*?)\)\:(.*)", command)
|
| if match:
|
| return match.groups()
|
| return '', command
|
| def check_condition(prompt, condition, rating):
|
| if not condition:
|
| return True
|
| sub_conditions = re.split(r'\)\s*&\s*\(', condition)
|
| sub_conditions = [re.sub(r'^\(|\)$', '', cond) for cond in sub_conditions]
|
|
|
| results = []
|
| for sub_cond in sub_conditions:
|
| if '&' in sub_cond:
|
| results.append(all(check_condition(prompt, cond, rating) for cond in sub_cond.split('&')))
|
| elif '|' in sub_cond:
|
| results.append(any(check_condition(prompt, cond, rating) for cond in sub_cond.split('|')))
|
| else:
|
| if sub_cond in ['e', 'q', 's', 'g']:
|
| results.append(sub_cond == rating)
|
| elif sub_cond in ['~e', '~q', '~s', '~g']:
|
| results.append(sub_cond != rating)
|
|
|
| elif sub_cond.startswith('*'):
|
| results.append(sub_cond[1:] in prompt)
|
|
|
| elif sub_cond.startswith('~!'):
|
| results.append(sub_cond[2:] not in prompt)
|
| elif sub_cond.startswith('~'):
|
| results.append(any(sub_cond[1:] not in element for element in prompt))
|
|
|
| else:
|
| results.append(any(sub_cond in element for element in prompt))
|
| return all(results)
|
| def execute_command(negative, command):
|
| if '+=' in command:
|
| keyword, addition = command.split('+=', 1)
|
| addition = addition.replace('^', ', ')
|
| return insert_text_after_keyword(negative, keyword, addition)
|
| elif command.startswith('add '):
|
| keyword = command[4:]
|
| keyword = keyword.replace('^', ', ')
|
| keys = keyword.split(',')
|
| keys = [key.strip() for key in keys]
|
| for key in keys:
|
| if key not in negative:
|
| negative.append(key)
|
| return negative
|
| elif command.startswith('rem '):
|
| keyword = command[4:]
|
| keyword = keyword.replace('^', ', ')
|
| keys = keyword.split(',')
|
| keys = [key.strip() for key in keys]
|
| for key in keys:
|
| if key in negative:
|
| negative.remove(key)
|
| return negative
|
| elif '=' in command:
|
| keyword, replacement = command.split('=', 1)
|
| if keyword in negative:
|
| replacement = replacement.replace('^', ', ')
|
| index = negative.index(keyword)
|
| negative[index] = replacement
|
| return negative
|
| negative = negative.split(',')
|
| negative = [neg.strip() for neg in negative]
|
| prompt = _prompt.split(',')
|
| prompt = [key.strip() for key in prompt]
|
| commands = [cmd.strip() for cmd in user_input.split(',') if not cmd.strip().startswith('#')]
|
| for command in commands:
|
| condition, cmd = parse_conditional_command(command)
|
| if check_condition(prompt, condition, rating):
|
| negative = execute_command(negative, cmd)
|
| return ', '.join(negative)
|
|
|
| def image_to_base64(image):
|
| image_bytesIO = io.BytesIO()
|
| image.save(image_bytesIO, format="PNG")
|
| return base64.b64encode(image_bytesIO.getvalue()).decode()
|
|
|
| def make_turbo_prompt(gen_request):
|
| lines = gen_request['prompt']
|
| result = {
|
| "boys": False,
|
| "girls": False,
|
| "1girl": False,
|
| "1boy": False,
|
| "1other": False,
|
| "others": False
|
| }
|
| state = {
|
| "nude,": False,
|
| "pov,": False,
|
| "cum,": False,
|
| "after ": False,
|
| "pussy juice": False,
|
| "barefoot": False,
|
| "breasts": False,
|
| "ejaculation": False,
|
| }
|
|
|
| def insert_spaces(source_list, reference_list):
|
| modified_list = source_list.copy()
|
| for index, keyword in enumerate(reference_list):
|
| if keyword not in source_list:
|
| space_count = len(keyword)
|
| modified_list.insert(index, ' ' * space_count)
|
| return modified_list
|
|
|
| keywords = gen_request['prompt'].split(', ')
|
| filtered_keywords = []
|
| removed_indices = []
|
| positive0, positive1, positive2, positive3 = gen_request.copy(),gen_request.copy(),gen_request.copy(),gen_request.copy()
|
|
|
| for word in result.keys():
|
| if word in lines:
|
| result[word] = True
|
| for word in state.keys():
|
| if word in gen_request['prompt']:
|
| state[word] = True
|
|
|
| key_index = int((len(keywords)/2)-1)
|
|
|
| if(result["1boy"]) or (result["boys"]):
|
| if(result["1girl"]):
|
| if(', sex' in gen_request['prompt'] or 'group sex' in gen_request['prompt']):
|
| sex_pos_keywords = ['stomach bulge','insertion', 'fucked silly', 'x-ray', 'orgasm', 'cross-section', 'uterus', 'overflow', 'rape', 'vaginal', 'anal']
|
| facial_keywords = ['tongue','ahegao']
|
| temp_sex_pos = []
|
| temp_facial = []
|
| cum_events = []
|
| explicit_check = []
|
| if 'open mouth' in keywords: keywords.remove('open mouth')
|
| if 'closed mouth' in keywords: keywords.remove('closed mouth')
|
| if 'after rape' in keywords:
|
| keywords.remove('after rape')
|
| explicit_check.append('after rape')
|
| if 'used condom' in keywords:
|
| keywords.remove('used condom')
|
| explicit_check.append('used condom')
|
| for keyword in keywords:
|
| if ('sex' not in keyword and 'cum' not in keyword and 'ejaculation' not in keyword and 'vaginal' not in keyword and 'penetration' not in keyword) and all(sex_pos not in keyword for sex_pos in sex_pos_keywords) and all(facial not in keyword for facial in facial_keywords):
|
| filtered_keywords.append(keyword)
|
| elif 'sex' in keyword:
|
| removed_indices.append(keyword)
|
| elif 'penetration' in keyword:
|
| removed_indices.append(keyword)
|
| elif 'cum' in keyword and keyword != 'cum':
|
| cum_events.append(keyword)
|
| elif any(sex_pos in keyword for sex_pos in sex_pos_keywords):
|
| for sex_pos in sex_pos_keywords:
|
| if sex_pos in keyword:
|
| temp_sex_pos.append(sex_pos)
|
| elif any(facial not in keyword for facial in facial_keywords):
|
| for facial in facial_keywords:
|
| if facial in keyword:
|
| temp_facial.append(facial)
|
| filtered_keywords.insert(int((len(filtered_keywords)/2)-1), ' no penetration, imminent penetration')
|
| filtered_keywords_positive0 = filtered_keywords.copy()
|
| filtered_keywords.remove(' no penetration, imminent penetration')
|
|
|
| if 'condom' in filtered_keywords and 'condom on penis' not in filtered_keywords:
|
| t_index = filtered_keywords.index('condom')
|
| rand_num = random.randint(0,2)
|
| if rand_num == 1: filtered_keywords.insert(t_index, 'condom on penis')
|
| for i, keyword in enumerate(filtered_keywords):
|
| if 'pantyhose' in keyword:
|
| filtered_keywords[i] = 'torn ' + filtered_keywords[i]
|
|
|
| key_index = int((len(filtered_keywords)/2)-1)
|
| if 'pussy' in filtered_keywords: key_index = filtered_keywords.index('pussy')
|
| if 'penis' in filtered_keywords: key_index = filtered_keywords.index('penis')
|
| filtered_keywords[key_index:key_index] = ['motion lines', 'surprised']
|
| for keyword in removed_indices:
|
| if 'cum' not in keyword and 'ejaculation' not in keyword:
|
| filtered_keywords.insert(key_index,keyword)
|
| if(temp_sex_pos): filtered_keywords[key_index:key_index] = temp_sex_pos
|
| if 'group sex' in filtered_keywords and 'sex' not in filtered_keywords:
|
| t_index = filtered_keywords.index('group sex')
|
| filtered_keywords.insert(t_index, 'sex')
|
| if('clothed sex' in filtered_keywords and not 'bottomless' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('clothed sex')+1, 'bottomless')
|
| pos1_copied_keywords = filtered_keywords.copy()
|
| for i, keyword in enumerate(pos1_copied_keywords):
|
| if 'closed eyes' in keyword:
|
| rand_num = random.randint(0,2)
|
| if(rand_num == 0): pos1_copied_keywords[i] = 'half-' + pos1_copied_keywords[i]
|
| elif(rand_num == 1 and 'closed eyes' in pos1_copied_keywords):
|
| pos1_copied_keywords.remove('closed eyes')
|
| filtered_keywords[i] = 'half-closed eyes'
|
| filtered_keywords_positive1 = pos1_copied_keywords.copy()
|
|
|
| key_index = filtered_keywords.index('surprised')
|
| filtered_keywords.remove('surprised')
|
| filtered_keywords[key_index:key_index] = ["ejaculation","cum"] if "condom on penis" not in filtered_keywords else ["twitching penis", "[[[[orgasm]]]]"]
|
| for keyword in removed_indices:
|
| if 'cum' in keyword:
|
| filtered_keywords.insert(key_index,keyword)
|
| if(temp_facial): filtered_keywords[key_index:key_index] =temp_facial
|
| filtered_keywords_positive2 = filtered_keywords.copy()
|
|
|
| for i, keyword in enumerate(filtered_keywords):
|
| if 'closed eyes' in keyword:
|
| rand_num = random.randint(0,2)
|
| if(rand_num == 0 and filtered_keywords[i] != 'half-closed eyes'): filtered_keywords[i] = 'half-' + filtered_keywords[i]
|
| elif(rand_num == 1): filtered_keywords[i] = 'empty eyes'
|
| else: filtered_keywords[i] = 'empty eyes, half-closed eyes'
|
| if 'sex' in filtered_keywords:
|
| key_index = filtered_keywords.index('sex')
|
| elif 'group sex' in filtered_keywords:
|
| key_index = filtered_keywords.index('group sex')
|
| if "condom on penis" not in filtered_keywords: filtered_keywords.remove('ejaculation')
|
| else:
|
| filtered_keywords.remove('twitching penis')
|
| filtered_keywords.remove('[[[[orgasm]]]]')
|
| filtered_keywords[key_index:key_index] = ['cum drip', 'erection'] + cum_events if "condom on penis" not in filtered_keywords else ["used condom", "{{used condom on penis}}"]
|
| if(explicit_check): filtered_keywords[key_index:key_index] = explicit_check
|
| if 'sex' in filtered_keywords and 'group sex' not in filtered_keywords:
|
| if('pussy' in filtered_keywords and not 'anal' in filtered_keywords):
|
| if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after vaginal, spread pussy')
|
| else: filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after vaginal, spread pussy, pussy juice puddle')
|
| elif('anal' in filtered_keywords):
|
| if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after anal, cum in ass')
|
| else: filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after anal, pussy juice')
|
| filtered_keywords.insert(filtered_keywords.index('sex'), 'after sex')
|
| filtered_keywords.remove('sex')
|
| elif 'group sex' in filtered_keywords:
|
| if('vaginal' in filtered_keywords and not 'anal' in filtered_keywords):
|
| filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after vaginal, spread pussy')
|
| if 'multiple penises' in filtered_keywords:
|
| if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'cum on body, bukkake')
|
| else: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'pussy juice, pussy juice puddle')
|
| elif('anal' in filtered_keywords):
|
| if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after anus, cum in ass')
|
| else: filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after anus')
|
| if 'multiple penises' in filtered_keywords:
|
| if "condom on penis" not in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'cum on body, bukkake')
|
| else: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'pussy juice')
|
| else: filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'cum on body, {bukkake}')
|
| temp_post_keyword = []
|
| for keyword in sex_pos_keywords:
|
| if not (keyword == 'orgasm' or keyword == 'overflow'):
|
| if keyword in filtered_keywords:
|
| temp_post_keyword.append(keyword)
|
| for keyword in temp_post_keyword:
|
| filtered_keywords.remove(keyword)
|
|
|
| positive0['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive0, filtered_keywords)).strip()
|
| positive1['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive1, filtered_keywords)).strip()
|
| positive2['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive2, filtered_keywords)).strip()
|
| positive3['prompt'] = ', '.join(filtered_keywords).strip()
|
| positive0["type"] = "turbo"
|
| positive1["type"] = "turbo"
|
| positive2["type"] = "turbo"
|
| positive3["type"] = "turbo"
|
| return positive0, positive1, positive2, positive3
|
|
|
| def generate_image(access_token, prompt, model, action, parameters):
|
| if re.match(r'^http[s]?://', access_token):
|
| return generate_image_webui(access_token, prompt, model, action, parameters)
|
| return generate_image_NAI(access_token, prompt, model, action, parameters)
|
|
|
| def generate_image_NAI(access_token, prompt, model, action, parameters):
|
| data = {
|
| "input": prompt,
|
| "model": model,
|
| "action": action,
|
| "parameters": parameters,
|
| }
|
|
|
| if data['parameters']['qualityToggle'] == True:
|
| if "nai-diffusion-furry-3" not in data['model']:
|
| data['input'] += ', best quality, amazing quality, very aesthetic, absurdres'
|
| else:
|
| data['input'] += ', {best quality}, {amazing quality}'
|
|
|
|
|
| if data['parameters']['sampler'] not in ["k_euler", "k_euler_ancestral", "k_dpmpp_sde", "k_dpmpp_2s_ancestral", "k_dpmpp_2m", "k_dpmpp_2m_sde"]:
|
| data['parameters']['sampler'] = "k_euler_ancestral"
|
|
|
| if data['parameters']['cfg_rescale'] > 1:
|
| data['parameters']['cfg_rescale'] = 0
|
|
|
| response = requests.post(f"{BASE_URL}/ai/generate-image", json=data, headers={ "Authorization": f"Bearer {access_token}" })
|
|
|
| return response.content
|
|
|
| def augment_image_NAI(gen_request):
|
| def resize_and_fill(image, max_size=None):
|
| if max_size is None:
|
| max_size = gen_request["user_screen_size"]
|
| original_width, original_height = image.size
|
| if original_width > max_size or original_height > max_size:
|
|
|
| image.thumbnail((max_size, max_size))
|
|
|
|
|
| width, height = image.size
|
| new_image = Image.new("RGB", (max_size, max_size), "black")
|
| new_image.paste(image, ((max_size - width) // 2, (max_size - height) // 2))
|
| return new_image
|
| else:
|
| return image
|
| def log_error(e, output_file_path="output_file_path"):
|
|
|
| current_time = datetime.now().strftime("%m/%d %H:%M:%S")
|
|
|
|
|
| error_message = f"#### Error occured at {current_time} ####\nError: {e}\n############################################\n"
|
|
|
|
|
| with open(f"error_log.txt", "a") as file:
|
| file.write(error_message)
|
| access_token = gen_request["access_token"]
|
| mode = gen_request["mode"]
|
| defry = gen_request["defry"]
|
| prompt = gen_request["prompt"]
|
| iw = gen_request["width"]
|
| ih = gen_request["height"]
|
| image = gen_request["image"]
|
| if mode in ["declutter", "lineart", "sketch", "colorize"]: _mode = mode
|
| else:
|
| _mode = "emotion"
|
| mode = mode.lower()
|
| data = {
|
| "req_type": _mode,
|
| "width": iw,
|
| "height": ih,
|
| "image" : image_to_base64(image)
|
| }
|
| dfval = {
|
| "Normal" : 0,
|
| "Slightly Weak" : 1,
|
| "Weak" : 2,
|
| "Even Weaker" : 3,
|
| "Very Weak" : 4,
|
| "Weakest" : 5
|
| }
|
| if _mode == "emotion":
|
| try: data["prompt"] = mode+";;"+prompt+";"
|
| except: data["prompt"] = mode+";"
|
| try: data["defry"] = dfval[defry]
|
| except: data["defry"] = 0
|
| prompt = data["prompt"]
|
| elif _mode == "colorize":
|
| data["prompt"] = prompt
|
| try: data["defry"] = dfval[defry]
|
| except: data["defry"] = 0
|
| else:
|
| prompt = ""
|
| aug_response = requests.post(f"{BASE_URL}/ai/augment-image", json=data, headers={ "Authorization": f"Bearer {access_token}" })
|
| save_folder = gen_request["save_folder"]
|
| additional_folder = ''
|
| if gen_request["png_rule"] == "count":
|
| additional_folder = "/" + gen_request["start_time"]
|
| if "additional_save_folder" in gen_request:
|
| if gen_request["additional_save_folder"]["command1"] != "":
|
| additional_folder += "/" + gen_request["additional_save_folder"]["command1"].replace('/',"_")
|
| if gen_request["additional_save_folder"]["command2"] != "":
|
| additional_folder += "/" + gen_request["additional_save_folder"]["command2"].replace('/',"_")
|
| forbidden_chars = r'[\:*?"<>|]'
|
| additional_folder = re.sub(forbidden_chars, '_', additional_folder)
|
| additional_folder += "/director"
|
| d = Path(save_folder + additional_folder)
|
| d.mkdir(parents=True, exist_ok=True)
|
| try:
|
| zipped = zipfile.ZipFile(io.BytesIO(aug_response.content))
|
| result_list = []
|
| for idx, file_info in enumerate(zipped.infolist()):
|
| image_bytes = zipped.read(file_info)
|
| if gen_request["png_rule"] == "count":
|
| _count = gen_request["count"]
|
| if "batch_size" in gen_request: filename = (d / f"{_count:05}_{idx}.png" )
|
| else: filename = (d / f"{_count:05}.png" )
|
| else:
|
| if "batch_size" in gen_request: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{idx}.png" )
|
| else: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" )
|
| filename.write_bytes(image_bytes)
|
| i = Image.open(io.BytesIO(image_bytes))
|
| i = ImageOps.exif_transpose(i).convert("RGB")
|
| if "artist" not in gen_request:
|
| i_resized = resize_and_fill(i)
|
| next = i_resized, prompt, 0, i.info, str(filename)
|
| result_list.append(next)
|
| return i_resized, prompt, 0, i.info, str(filename) if len(result_list) ==1 else result_list
|
| except Exception as e:
|
| try:
|
| if aug_response.content is None:
|
| raise ValueError("Connection broken (Protocol Error)")
|
| error_message = aug_response.decode('utf-8')[2:-2]
|
| except Exception as inner_exception:
|
| error_message = str(inner_exception)
|
| log_error(error_message, "path_to_output_folder")
|
| return None, error_message, 0, None, None
|
|
|
| def upscale_NAI(access_token, image, parameters):
|
| img_str = image_to_base64(image)
|
| _width, _height = image.size
|
| data = {
|
| "image": img_str,
|
| "width": _width,
|
| "height": _height,
|
| "scale": 4,
|
| }
|
| response = requests.post(f"https://api.novelai.net/ai/upscale", json=data, headers={ "Authorization": f"Bearer {access_token}" })
|
| return response.content
|
|
|
|
|
| def convert_prompt(prompt):
|
| keywords = [keyword.strip() for keyword in prompt.split(',')]
|
| converted_keywords = []
|
| for keyword in keywords:
|
|
|
|
|
|
|
|
|
| if '{' in keyword:
|
| keyword = keyword.replace('{', '(').replace('}', ')')
|
| converted_keywords.append(keyword)
|
| return ', '.join(converted_keywords)
|
|
|
| def convert_prompt_ad(prompt, data):
|
| keywords = [keyword.strip() for keyword in prompt.split(',')]
|
| converted_keywords = []
|
| closed_eyes_check = False if "closed eyes" not in keywords else True
|
| for idx, keyword in enumerate(keywords):
|
| if idx > 4 and (keyword in data.bag_of_tags):
|
| if ("eyes" in keyword or "pupils" in keyword) and closed_eyes_check:
|
| continue
|
| keywords[idx] = "((" + keyword + "))"
|
| if 'artist:' in keyword:
|
| keyword = keyword.replace('artist:', '').replace('[','').replace(']','')
|
| keyword = keyword
|
| if '(' in keyword:
|
| keyword = keyword.replace('(', '\(').replace(')', '\)')
|
| if '{' in keyword:
|
| keyword = keyword.replace('{', '(').replace('}', ')')
|
| if '}' in keyword:
|
| keyword = keyword.replace('}', ')')
|
| converted_keywords.append(keyword)
|
| return ', '.join(converted_keywords)
|
|
|
| def interrogate_webui(img, access_token):
|
| img_str = image_to_base64(img)
|
|
|
| req_img = {
|
| "image": img_str,
|
| "threshold": 0.35,
|
| "model": "wd-v1-4-moat-tagger.v2",
|
| "queue": ""
|
| }
|
|
|
| try:
|
| res = requests.post(f"{access_token}/tagger/v1/interrogate", json=req_img)
|
| if res.status_code != 200:
|
| raise Exception(f"Error code: {res.status_code}")
|
|
|
| text = res.json().get('caption', {}).get('tag', {})
|
| text_list = [txt.replace("_", " ") for txt in text.keys()]
|
| _rating = res.json().get('caption', {}).get('rating', {})
|
| rating = max(_rating, key=_rating.get)
|
| if rating == "sensitive": rv = "s"
|
| elif rating == "questionable": rv = "q"
|
| elif rating == "explicit": rv = "e"
|
| else: rv = "g"
|
| return ", ".join(text_list), rv
|
| except Exception as e:
|
| return f"{e} : WEBUI์ Extension์ stable-diffusion-webui-wd14-tagger ๊ฐ ์ ์์ ์ผ๋ก ์ค์น๋์ด์๋์ง ํ์ธํ์ธ์. (WD14 ๊ธฐ์ค๋ฒ์ : e72d984b, 2023-11-04)", None
|
|
|
| def generate_image_webui(access_token, prompt, model, action, parameters):
|
| samplers = {
|
| "k_euler": "Euler",
|
| "k_euler_ancestral": "Euler a",
|
| "k_dpmpp_2s_ancestral": "DPM++ 2S a",
|
| "k_dpmpp_sde": "DPM++ SDE",
|
| "k_dpm_3m_sde": "DPM++ 3M SDE"
|
| }
|
|
|
|
|
| data = {
|
| "input": prompt,
|
| "model": model,
|
| "action": action,
|
| "parameters": parameters,
|
| }
|
|
|
| if data['parameters']['sampler'] in samplers:
|
| data['parameters']['sampler'] = samplers[data['parameters']['sampler']]
|
|
|
| params = {
|
| "prompt": convert_prompt(data['input']) if "nai_enable_AD" not in parameters else convert_prompt_ad(data['input'], parameters["ad_data"]),
|
| "negative_prompt": convert_prompt(data['parameters']['negative_prompt']) if "nai_enable_AD" not in parameters else convert_prompt_ad(data['parameters']['negative_prompt'], parameters["ad_data"]),
|
| "steps": math.floor(data['parameters']['cfg_rescale'] / 0.5) * 0.5,
|
| "width": data['parameters']['width'],
|
| "height": data['parameters']['height'],
|
| "cfg_scale": data['parameters']['scale'],
|
| "sampler_index": data['parameters']['sampler'],
|
| "seed": data['parameters']['seed'],
|
| "seed_resize_from_h": -1,
|
| "seed_resize_from_w": -1,
|
| "denoising_strength": None,
|
| "n_iter": "1",
|
| "batch_size": data['parameters']['n_samples']
|
| }
|
|
|
| if 'scheduler' in parameters:
|
| params["scheduler"] = data['parameters']['scheduler']
|
|
|
| if data['parameters']['enable_hr'] == True:
|
| params['enable_hr'] = True
|
| params["hr_upscaler"] = data['parameters']["hr_upscaler"]
|
| params["hr_scale"] = data['parameters']["hr_scale"]
|
| params["hr_second_pass_steps"] = data['parameters']["hr_second_pass_steps"]
|
| params["denoising_strength"] = data['parameters']["denoising_strength"]
|
|
|
| if data['parameters']['enable_AD'] == True:
|
| params["alwayson_scripts"] = {"ADetailer":
|
| {
|
| "args": [
|
| True,
|
| False if "nai_enable_AD" not in parameters else True,
|
| {
|
| "ad_model": "face_yolov8n.pt",
|
| "ad_prompt": params["prompt"],
|
| "ad_negative_prompt": params["negative_prompt"],
|
| "ad_denoising_strength": 0.4 if "nai_enable_AD" not in parameters else parameters["ad_data_str"],
|
| "ad_inpaint_only_masked": True,
|
| "ad_inpaint_only_masked_padding": 32,
|
| "ad_sampler" : data['parameters']['sampler']
|
|
|
| }
|
| ]
|
| }
|
| }
|
| if "nai_enable_AD" in parameters:
|
| params["steps"] = 28
|
| else:
|
| params["alwayson_scripts"] = {}
|
| if 'enable_TV' in data['parameters'] and data['parameters']['enable_TV'] == True:
|
| params["alwayson_scripts"]["Tiled VAE"] = {}
|
| params["alwayson_scripts"]["Tiled VAE"]["args"] = data['parameters']["tiled_vae_args"]
|
|
|
| if 'pag' in parameters:
|
| params["alwayson_scripts"]["Incantations"] = {}
|
| pag_pre = math.floor(float(data['parameters']['pag']) / 0.5) * 0.5
|
| params["alwayson_scripts"]["Incantations"]["args"] = [True,pag_pre, 0, 150, False, "Constant", 0, 100, False,False,2,0.1,0.5,0,"",0,25, 1, False,False,False,"BREAK","-",0.2,10]
|
|
|
| if "image" in data['parameters']:
|
| params['init_images'] = [data['parameters']['image']]
|
| params['include_init_images'] = True
|
|
|
| if "strength" in data['parameters']:
|
| params['denoising_strength'] = data['parameters']['strength']
|
|
|
| if "mask" in data['parameters']:
|
| params['mask'] = data['parameters']['mask']
|
| params['inpainting_fill'] = 1
|
| params['inpaint_full_res'] = data['parameters']['add_original_image']
|
| params['inpaint_full_res_padding'] = 32
|
| if 'denoising_strength' not in params: params['denoising_strength'] = 0.7
|
|
|
| res = requests.post(f"{access_token}/sdapi/v1/img2img", json=params)
|
| else: res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=params)
|
| imageb64s = res.json()['images']
|
| content = None
|
| for b64 in imageb64s:
|
| img = b64.encode()
|
| content = base64.b64decode(img)
|
|
|
| s = io.BytesIO()
|
| zf = zipfile.ZipFile(s, "w")
|
| for idx, b64 in enumerate(imageb64s):
|
| content = base64.b64decode(b64)
|
| zf.writestr(f"generated_{idx}.png", content)
|
| zf.close()
|
| return s.getvalue()
|
|
|
| def generate_guide_image_webui(access_token, parameters):
|
| samplers = {
|
| "k_euler": "Euler",
|
| "k_euler_ancestral": "Euler a",
|
| "k_dpmpp_2s_ancestral": "DPM++ 2S a",
|
| "k_dpmpp_sde": "DPM++ SDE",
|
| "k_dpm_3m_sde": "DPM++ 3M SDE"
|
| }
|
|
|
| params = {
|
| "prompt": convert_prompt(parameters['prompt']),
|
| "negative_prompt": convert_prompt(parameters['negative prompt']),
|
| "steps": parameters['steps'],
|
| "width": parameters["width"],
|
| "height": parameters["height"],
|
| "cfg_scale": parameters['cfg_scale'],
|
| "sampler_index": samplers[parameters['sampler']] if parameters['sampler'] in samplers else parameters['sampler'],
|
| "seed": parameters['seed'],
|
| "seed_resize_from_h": -1,
|
| "seed_resize_from_w": -1,
|
| "denoising_strength": None,
|
| "n_iter": "1",
|
| "batch_size": 1
|
| }
|
| additional_folder = "/guide"
|
| if "start_time" in parameters and parameters["png_rule"] == "count":
|
| additional_folder = "/" + parameters["start_time"] + "/guide"
|
| if "save_folder" in parameters:
|
| save_folder = parameters["save_folder"]
|
| try:
|
| res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=params)
|
| imageb64s = res.json()['images']
|
| image_data = base64.b64decode(imageb64s[0])
|
| image_file = io.BytesIO(image_data)
|
| i = Image.open(image_file)
|
| if "save_folder" in parameters:
|
| s = io.BytesIO()
|
| zf = zipfile.ZipFile(s, "w")
|
| for idx, b64 in enumerate(imageb64s):
|
| img_data = base64.b64decode(b64)
|
| img = Image.open(io.BytesIO(img_data))
|
| jpeg_buffer = io.BytesIO()
|
| try: img.save(jpeg_buffer, format="JPEG", exif=img.info.get('parameters').encode('utf-8'))
|
| except: img.save(jpeg_buffer, format="JPEG")
|
| jpeg_content = jpeg_buffer.getvalue()
|
| zf.writestr(f"generated_{idx}.jpg", jpeg_content)
|
| zf.close()
|
| d = Path(save_folder + additional_folder)
|
| d.mkdir(parents=True, exist_ok=True)
|
| zipped = zipfile.ZipFile(io.BytesIO(s.getvalue()))
|
| result_list = []
|
| for idx, file_info in enumerate(zipped.infolist()):
|
| image_bytes = zipped.read(file_info)
|
| filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" )
|
| filename.write_bytes(image_bytes)
|
| except:
|
| i = Image.new('RGB', (768, 768), 'white')
|
| return i
|
|
|
| def generate_typer_image_webui(access_token, parameters):
|
| params = {
|
| "prompt": convert_prompt(parameters['prompt']),
|
| "negative_prompt": convert_prompt(parameters['negative prompt']),
|
| "steps": parameters['steps'],
|
| "width": parameters["width"],
|
| "height": parameters["height"],
|
| "cfg_scale": parameters['cfg_scale'],
|
| "sampler_index": parameters['sampler'],
|
| "seed": parameters['seed'],
|
| "seed_resize_from_h": -1,
|
| "seed_resize_from_w": -1,
|
| "denoising_strength": None,
|
| "n_iter": "1",
|
| "batch_size": 1
|
| }
|
| if "scheduler" in parameters:
|
| params["scheduler"] = parameters["scheduler"]
|
| try:
|
| res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=params)
|
| imageb64s = res.json()['images']
|
| image_data = base64.b64decode(imageb64s[0])
|
| image_file = io.BytesIO(image_data)
|
| i = Image.open(image_file)
|
| except:
|
| i = Image.new('RGB', (768, 768), 'white')
|
| return i
|
|
|
| def generate_dtg(access_token, parameters):
|
| params = {
|
| "prompt": parameters['prompt'],
|
| "negative_prompt": parameters['negative prompt'],
|
| "steps": 1,
|
| "width": 64,
|
| "height": 64,
|
| "cfg_scale": 1,
|
| "sampler_index": "Euler a",
|
| "seed": 1,
|
| "seed_resize_from_h": -1,
|
| "seed_resize_from_w": -1,
|
| "denoising_strength": None,
|
| "n_iter": "1",
|
| "batch_size": 1,
|
| }
|
| if parameters["format"] == "Animagine":
|
| format = "<|special|>, <|characters|>, <|copyrights|>, <|artist|>, <|general|>, <|quality|>, <|meta|>, <|rating|>"
|
| elif parameters["format"] == "Pony":
|
| format = "<|quality|>, <|special|>, <|characters|>, <|copyrights|>, <|artist|>, <|general|>, <|meta|>, <|rating|>"
|
|
|
| params["alwayson_scripts"] = {}
|
| params["alwayson_scripts"]["DanTagGen"] = {
|
| "args" : [
|
| True,
|
| "After applying other prompt processings",
|
| -1,
|
| parameters["length"],
|
| params["negative_prompt"],
|
| format,
|
| parameters["temp"],
|
| 0.95,
|
| 100,
|
| "KBlueLeaf/DanTagGen-delta-rev2",
|
| False,
|
| False
|
| ]
|
| }
|
| try:
|
| res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=params)
|
| imageb64s = res.json()['images']
|
| image_data = base64.b64decode(imageb64s[0])
|
| image_file = io.BytesIO(image_data)
|
| i = Image.open(image_file)
|
| except:
|
| return ""
|
| return i.info
|
|
|
| def generate(gen_request, _naia):
|
| def parse_and_execute_commands(_prompt, negative, user_input, rating):
|
| negative = negative.split(',')
|
| negative = [neg.strip() for neg in negative]
|
| prompt = _prompt.split(',')
|
| prompt = [key.strip() for key in prompt]
|
| commands = [cmd.strip() for cmd in user_input.split(',') if not cmd.strip().startswith('#')]
|
| for command in commands:
|
| condition, cmd = parse_conditional_command(command)
|
| if check_condition(prompt, condition, rating):
|
| negative = execute_command(negative, cmd)
|
| return ', '.join(negative)
|
|
|
| def parse_conditional_command(command):
|
| match = re.match(r"\((.*?)\)\:(.*)", command)
|
| if match:
|
| return match.groups()
|
| return '', command
|
|
|
| def check_condition(prompt, condition, rating):
|
| if not condition:
|
| return True
|
| sub_conditions = re.split(r'\)\s*&\s*\(', condition)
|
| sub_conditions = [re.sub(r'^\(|\)$', '', cond) for cond in sub_conditions]
|
|
|
| results = []
|
| for sub_cond in sub_conditions:
|
| if '&' in sub_cond:
|
| results.append(all(check_condition(prompt, cond, rating) for cond in sub_cond.split('&')))
|
| elif '|' in sub_cond:
|
| results.append(any(check_condition(prompt, cond, rating) for cond in sub_cond.split('|')))
|
| else:
|
| if sub_cond in ['e', 'q', 's', 'g']:
|
| results.append(sub_cond == rating)
|
| elif sub_cond in ['~e', '~q', '~s', '~g']:
|
| results.append(sub_cond != rating)
|
|
|
| elif sub_cond.startswith('*'):
|
| results.append(sub_cond[1:] in prompt)
|
|
|
| elif sub_cond.startswith('~!'):
|
| results.append(sub_cond[2:] not in prompt)
|
| elif sub_cond.startswith('~'):
|
| results.append(any(sub_cond[1:] not in element for element in prompt))
|
|
|
| else:
|
| results.append(any(sub_cond in element for element in prompt))
|
| return all(results)
|
|
|
| def execute_command(negative, command):
|
| if '+=' in command:
|
| keyword, addition = command.split('+=', 1)
|
| addition = addition.replace('^', ', ')
|
| return insert_text_after_keyword(negative, keyword, addition)
|
| elif command.startswith('add '):
|
| keyword = command[4:]
|
| keyword = keyword.replace('^', ', ')
|
| keys = keyword.split(',')
|
| keys = [key.strip() for key in keys]
|
| for key in keys:
|
| if key not in negative:
|
| negative.append(key)
|
| return negative
|
| elif command.startswith('rem '):
|
| keyword = command[4:]
|
| keyword = keyword.replace('^', ', ')
|
| keys = keyword.split(',')
|
| keys = [key.strip() for key in keys]
|
| for key in keys:
|
| if key in negative:
|
| negative.remove(key)
|
| return negative
|
| elif '=' in command:
|
| keyword, replacement = command.split('=', 1)
|
| if keyword in negative:
|
| replacement = replacement.replace('^', ', ')
|
| index = negative.index(keyword)
|
| negative[index] = replacement
|
| return negative
|
|
|
| def insert_text_after_keyword(negative, user_keyword, user_additional_keyword):
|
| if user_keyword in negative:
|
| index = negative.index(user_keyword) + 1
|
| negative.insert(index, user_additional_keyword)
|
| return negative
|
|
|
| def open_random_png(folderpath):
|
| files = os.listdir(folderpath)
|
| png_files = [f for f in files if f.endswith('.png')]
|
|
|
| if not png_files:
|
| return None
|
|
|
| random_png = random.choice(png_files)
|
| img = Image.open(os.path.join(folderpath, random_png))
|
|
|
| return img
|
|
|
| try: seed = int(gen_request["seed"])
|
| except: seed = random.randint(0,9999999999)
|
|
|
| try:
|
| width = int(gen_request["width"])
|
| height = int(gen_request["height"])
|
| except:
|
| width, height= 1024, 1024
|
|
|
| params = {
|
| "legacy": False,
|
| "qualityToggle": True if gen_request["quality_toggle"] == 1 else False,
|
| "width": width,
|
| "height": height,
|
| "n_samples": 1 if "batch_size" not in gen_request else gen_request["batch_size"],
|
| "seed": seed,
|
| "extra_noise_seed": random.randint(0,9999999999),
|
| "sampler": gen_request["sampler"],
|
| "steps": 28 if "steps" not in gen_request and gen_request["type"]!="upper" else gen_request["steps"],
|
| "scale": gen_request["scale"],
|
| "negative_prompt": ', '.join([keyword.strip() for keyword in gen_request["negative"].split(',') if not keyword.strip().startswith('#')]),
|
| "sm" : True if gen_request["sema"] == 1 else False,
|
| "sm_dyn" : True if gen_request["sema_dyn"] == 1 else False,
|
| "dynamic_thresholding": True if ("dynamic_thresholding" in gen_request and gen_request["dynamic_thresholding"] == True) else False,
|
| "controlnet_strength": 1.0,
|
| "add_original_image": False,
|
| "cfg_rescale": gen_request["cfg_rescale"],
|
| "noise_schedule": gen_request["noise_schedule"],
|
| "enable_hr" : gen_request["enable_hr"],
|
| "enable_AD" : gen_request["enable_AD"]
|
| }
|
|
|
| if "skip_cfg_above_sigma" in gen_request and gen_request["skip_cfg_above_sigma"] == True:
|
| if float(gen_request["uncond_scale"]) == 19.19:
|
| params["skip_cfg_above_sigma"] = 19.191344202730882
|
| else:
|
| try:
|
| _scale = float(gen_request["uncond_scale"])
|
| params["skip_cfg_above_sigma"] = _scale
|
| except:
|
| params["skip_cfg_above_sigma"] = 19.191344202730882
|
|
|
| if params["enable_hr"] == True:
|
| params["hr_upscaler"] = gen_request["hr_upscaler"]
|
| params["hr_scale"] = gen_request["hr_scale"]
|
| params["hr_second_pass_steps"] = gen_request["hr_second_pass_steps"]
|
| params["denoising_strength"] = gen_request["denoising_strength"]
|
| if "enable_TV" in gen_request and gen_request["enable_TV"] == True:
|
| params["enable_TV"] = True
|
| params["tiled_vae_args"] = gen_request["tiled_vae_args"]
|
|
|
| if "reference_image" in gen_request:
|
| params["reference_image_multiple"] = gen_request["reference_image"] if isinstance(gen_request["reference_image"], list) else [gen_request["reference_image"]]
|
| params["reference_information_extracted_multiple"] = gen_request["reference_information_extracted"] if isinstance(gen_request["reference_information_extracted"], list) else [gen_request["reference_information_extracted"]]
|
| params["reference_strength_multiple"] = gen_request["reference_strength"] if isinstance(gen_request["reference_strength"], list) else [gen_request["reference_strength"]]
|
|
|
| if gen_request["type"]=="inpaint":
|
| if "mask" in gen_request:
|
| params["mask"] = gen_request["mask"]
|
| params['add_original_image'] = gen_request['add_original_image']
|
|
|
| positive = gen_request["prompt"]
|
| positive = re.sub(r'#.*?(\n|,|$)', '', positive)
|
| keywords = [key.strip() for key in positive.split(',')]
|
|
|
| if "cond_negative" in gen_request and gen_request["cond_negative"]:
|
| user_input = gen_request["cond_negative"]
|
| rating = gen_request["rating"]
|
| params["negative_prompt"] = parse_and_execute_commands(positive, params["negative_prompt"], user_input, rating)
|
|
|
| if "repeat" in gen_request:
|
| max = gen_request["repeat_max"]
|
|
|
| for i, key in enumerate(keywords):
|
| if "->" in key:
|
| instant_keyword = [k for k in key.split('->')]
|
| if len(instant_keyword) > gen_request["repeat"]:
|
| current_key = instant_keyword[gen_request["repeat"]]
|
| else:
|
| current_key = instant_keyword[gen_request["repeat"] % len(instant_keyword)]
|
| keywords[i] = ', '.join(current_key.split('^'))
|
|
|
| filename_rule = gen_request["png_rule"]
|
| save_folder = gen_request["save_folder"]
|
|
|
| access_token = gen_request["access_token"]
|
| additional_folder = ""
|
|
|
| request_type = "generate"
|
|
|
| if "*i2i:(" in gen_request["prompt"] and "image" not in gen_request:
|
| try:
|
| start_index = gen_request["prompt"].index('*i2i:(') + len('*i2i:(')
|
| end_index = gen_request["prompt"].index(')', start_index)
|
| i2i_content = gen_request["prompt"][start_index:end_index]
|
| _img, _str = [item.strip() for item in i2i_content.split(':')]
|
| _str = float(_str)
|
| _img = open_random_png(_img)
|
| if _img:
|
| gen_request["image"] = image_to_base64(_img)
|
| gen_request["strength"] = _str
|
| gen_request["noise"] = 0
|
| except:
|
| pass
|
|
|
| if "nai_enable_AD" in gen_request:
|
| params["nai_enable_AD"] = True
|
| params["ad_data"] = gen_request["ad_data"]
|
| params["ad_data_str"] = gen_request["ad_data_str"]
|
|
|
| if "scheduler" in gen_request:
|
| params["scheduler"] = gen_request["scheduler"]
|
| if "pag" in gen_request:
|
| params["pag"] = gen_request["pag"]
|
|
|
| if "image" in gen_request:
|
| params["image"] = gen_request["image"]
|
| if "strength" in gen_request:
|
| params["strength"] = gen_request["strength"]
|
| params["noise"] = gen_request["noise"]
|
| params["sm"] = False
|
| params["sm_dyn"] = False
|
| request_type = "img2img" if "mask" not in gen_request else "infill"
|
|
|
| temp_del = []
|
| for key in keywords:
|
| if key.startswith('*'):
|
| temp_del.append(key)
|
| for key in temp_del:
|
| if key in keywords:
|
| keywords.remove(key)
|
|
|
| positive = ', '.join(keywords)
|
|
|
| def resize_and_fill(image, max_size=None):
|
| if max_size is None:
|
| max_size = gen_request["user_screen_size"]
|
| original_width, original_height = image.size
|
| if original_width > max_size or original_height > max_size:
|
|
|
| image.thumbnail((max_size, max_size))
|
|
|
|
|
| width, height = image.size
|
| new_image = Image.new("RGB", (max_size, max_size), "black")
|
| new_image.paste(image, ((max_size - width) // 2, (max_size - height) // 2))
|
| return new_image
|
| else:
|
| return image
|
|
|
| def resize_and_fill_artist(image, max_size=None, _text=""):
|
| if _text != "":
|
| if "_" in _text:
|
| _text0 = _text.split("_")[0]
|
| _text1 = _text.split("_")[1]
|
| _text = _text1+ " : "+_text0
|
| else:
|
| _text1 = _text
|
| if max_size is None:
|
| max_size = gen_request["user_screen_size"]
|
| original_width, original_height = image.size
|
| max_size = (max_size, max_size)
|
| if original_width > max_size[0] or original_height > max_size[1]:
|
|
|
| image.thumbnail(max_size)
|
|
|
|
|
| width, height = image.size
|
| new_image = Image.new("RGB", (max_size[0], max_size[1]), "black")
|
| new_image.paste(image, ((max_size[0] - width) // 2, (max_size[1] - height) // 2))
|
|
|
|
|
| draw = ImageDraw.Draw(new_image)
|
| font_size = 24
|
| font = ImageFont.truetype("arial.ttf", font_size)
|
|
|
| text_x = (max_size[0]) // 2
|
| text_y = (max_size[1] - 40) + (font_size) // 2
|
| bbox = draw.textbbox((text_x, text_y), _text, font=font, anchor="mm")
|
| text_width = bbox[2] - bbox[0]
|
| text_height = bbox[3] - bbox[1]
|
|
|
|
|
| draw.rectangle([text_x - text_width // 2 - 10, text_y - text_height // 2 - 10, text_x + text_width // 2 + 10, text_y + text_height // 2 + 10], fill="white")
|
|
|
|
|
| draw.text((text_x, text_y), _text, fill="black", font=font, anchor="mm")
|
|
|
| return new_image, _text1
|
|
|
|
|
| def log_error(e, output_file_path="output_file_path"):
|
|
|
| current_time = datetime.now().strftime("%m/%d %H:%M:%S")
|
|
|
|
|
| error_message = f"#### Error occured at {current_time} ####\nError: {e}\n############################################\n"
|
|
|
|
|
| with open(f"error_log.txt", "a") as file:
|
| file.write(error_message)
|
|
|
| if _naia == "NAID3-Furry":
|
| _m1 = "nai-diffusion-furry-3"
|
| _m2 = "nai-diffusion-furry-3-inpainting"
|
| else:
|
| _m1 = "nai-diffusion-3"
|
| _m2 = "nai-diffusion-3-inpainting"
|
|
|
| try:
|
| zipped_bytes = generate_image(access_token, positive, _m1 if "mask" not in params else _m2, request_type, params)
|
| if gen_request["png_rule"] == "count":
|
| additional_folder = "/" + gen_request["start_time"]
|
| if "additional_save_folder" in gen_request:
|
| if gen_request["additional_save_folder"]["command1"] != "":
|
| additional_folder += "/" + gen_request["additional_save_folder"]["command1"].replace('/',"_")
|
| if gen_request["additional_save_folder"]["command2"] != "":
|
| additional_folder += "/" + gen_request["additional_save_folder"]["command2"].replace('/',"_")
|
| forbidden_chars = r'[\:*?"<>|]'
|
| additional_folder = re.sub(forbidden_chars, '_', additional_folder)
|
| if gen_request["type"] == "turbo":
|
| additional_folder += "/turbo"
|
| if ("hires" in gen_request and gen_request["hires"]) or ("enable_hr" in gen_request and gen_request["enable_hr"] and "http" in access_token) or (int(gen_request["width"]) * int(gen_request["height"]) > 1048576):
|
| additional_folder += "/hires"
|
| if "auto_i2i" in gen_request and gen_request["auto_i2i"]:
|
| if gen_request["png_rule"] == "count": additional_folder = "/" + gen_request["start_time"] + "/autoi2i"
|
| else: additional_folder = "/autoi2i"
|
| if "nai_enable_AD" in gen_request and gen_request["nai_enable_AD"]:
|
| additional_folder += "/Adetailer"
|
| if "webui_nai_i2i" in gen_request and gen_request["webui_nai_i2i"] and "http" not in access_token:
|
| additional_folder += "/webui_nai_i2i"
|
| d = Path(save_folder + additional_folder)
|
| d.mkdir(parents=True, exist_ok=True)
|
| zipped = zipfile.ZipFile(io.BytesIO(zipped_bytes))
|
| result_list = []
|
| for idx, file_info in enumerate(zipped.infolist()):
|
| image_bytes = zipped.read(file_info)
|
| if gen_request["png_rule"] == "count":
|
| _count = gen_request["count"]
|
| if "batch_size" in gen_request: filename = (d / f"{_count:05}_{idx}.png" )
|
| else: filename = (d / f"{_count:05}.png" )
|
| else:
|
| if "batch_size" in gen_request: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{idx}.png" )
|
| else: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" )
|
| filename.write_bytes(image_bytes)
|
| i = Image.open(io.BytesIO(image_bytes))
|
| i = ImageOps.exif_transpose(i).convert("RGB")
|
| if "artist" not in gen_request:
|
| i_resized = resize_and_fill(i)
|
| else:
|
| i_resized, _i_temp = resize_and_fill_artist(i, None, gen_request["artist"])
|
| if not os.path.exists(f"artist_thumb"):
|
| os.makedirs(f"artist_thumb")
|
| model_name = gen_request["artist_model"]
|
| if not os.path.exists(f"artist_thumb/{model_name}"):
|
| os.makedirs(f"artist_thumb/{model_name}")
|
| _counts = gen_request["artist"].split("_")[0]
|
| i_resized.save(f"artist_thumb/{model_name}/{_i_temp}.jpg")
|
| if not os.path.exists(f"artist_thumb/{model_name}/add_count"):
|
| os.makedirs(f"artist_thumb/{model_name}/add_count")
|
| i_resized.save(f"artist_thumb/{model_name}/add_count/{_counts}_{_i_temp}.jpg")
|
| next = i_resized, positive, params['seed'], i.info, str(filename)
|
| result_list.append(next)
|
| return i_resized, positive, params['seed'], i.info, str(filename) if len(result_list) ==1 else result_list
|
| except Exception as e:
|
| try:
|
| if zipped_bytes is None:
|
| raise ValueError("Connection broken (Protocol Error)")
|
| error_message = zipped_bytes.decode('utf-8')[2:-2]
|
| except Exception as inner_exception:
|
| error_message = str(inner_exception)
|
| log_error(error_message, "path_to_output_folder")
|
| return None, error_message, params['seed'], None, None |