x / Salia_Upload_TMP.py
saliacoel's picture
Upload Salia_Upload_TMP.py
ca580d9 verified
import io
import os
import re
import zipfile
from urllib.parse import quote
import numpy as np
REPO_ID = "saliacoel/tmp"
REPO_TYPE = "model"
BRANCH = "main"
CATEGORY = "Salia/HuggingFace"
_INVALID_FILENAME_CHARS = re.compile(r'[<>:"\\|?*\x00-\x1F]')
def _require_non_empty_string(value, name):
if value is None:
raise ValueError(f"{name} is required.")
value = str(value).strip()
if not value:
raise ValueError(f"{name} must not be empty.")
return value
def _sanitize_filename(filename: str) -> str:
filename = _require_non_empty_string(filename, "filename")
filename = filename.replace("\\", "/").split("/")[-1]
filename = _INVALID_FILENAME_CHARS.sub("_", filename)
filename = filename.strip().strip(".")
if not filename:
raise ValueError("filename became empty after sanitization.")
return filename
def _strip_known_suffixes(filename: str, suffixes) -> str:
base = _sanitize_filename(filename)
lower = base.lower()
for suffix in suffixes:
if lower.endswith(suffix.lower()):
return base[: -len(suffix)]
return base
def _ensure_extension(filename: str, extension: str) -> str:
filename = _sanitize_filename(filename)
if filename.lower().endswith(extension.lower()):
return filename
return f"{filename}{extension}"
def _quote_repo_path(path_in_repo: str) -> str:
return "/".join(quote(part, safe="") for part in path_in_repo.split("/"))
def _repo_file_url(path_in_repo: str) -> str:
quoted_path = _quote_repo_path(path_in_repo)
return f"https://huggingface.co/{REPO_ID}/resolve/{BRANCH}/{quoted_path}"
def _import_hf_api():
try:
from huggingface_hub import HfApi
except Exception as exc:
raise RuntimeError(
"Missing dependency 'huggingface_hub'. Install it in the ComfyUI Python environment with: pip install huggingface_hub"
) from exc
return HfApi
def _upload_bytes(file_bytes: bytes, path_in_repo: str, hf_token: str, commit_message: str):
hf_token = _require_non_empty_string(hf_token, "hf_token")
path_in_repo = _sanitize_filename(path_in_repo)
HfApi = _import_hf_api()
api = HfApi(token=hf_token)
commit_info = api.upload_file(
path_or_fileobj=file_bytes,
path_in_repo=path_in_repo,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
token=hf_token,
commit_message=commit_message,
)
commit_url = getattr(commit_info, "commit_url", None)
if not commit_url:
commit_url = str(commit_info)
return path_in_repo, _repo_file_url(path_in_repo), commit_url
def _tensor_to_png_bytes(img):
try:
from PIL import Image
except Exception as exc:
raise RuntimeError(
"Missing dependency 'Pillow'. Install it in the ComfyUI Python environment with: pip install pillow"
) from exc
if img is None:
raise ValueError("img is required.")
if hasattr(img, "detach"):
img = img.detach()
if hasattr(img, "cpu"):
img = img.cpu()
if hasattr(img, "numpy"):
arr = img.numpy()
else:
arr = np.asarray(img)
if arr.ndim == 4:
if arr.shape[0] == 0:
raise ValueError("Received an empty image batch.")
if arr.shape[0] != 1:
raise ValueError(
f"Expected a single image, but received a batch of {arr.shape[0]}. Use an image selector/split node first."
)
arr = arr[0]
if arr.ndim != 3:
raise ValueError(f"Expected image tensor with 3 dimensions after batch removal, got shape {arr.shape}.")
channels = int(arr.shape[2])
if channels not in (3, 4):
raise ValueError(
f"Expected 3 channels (RGB) or 4 channels (RGBA), but got {channels} channels."
)
if np.issubdtype(arr.dtype, np.floating):
arr = np.clip(arr, 0.0, 1.0)
arr = np.rint(arr * 255.0).astype(np.uint8)
else:
arr = np.clip(arr, 0, 255).astype(np.uint8)
mode = "RGBA" if channels == 4 else "RGB"
pil_image = Image.fromarray(arr, mode=mode)
buffer = io.BytesIO()
pil_image.save(buffer, format="PNG")
return buffer.getvalue(), mode
def _text_to_bytes(text: str) -> bytes:
if text is None:
text = ""
return str(text).encode("utf-8")
def _build_split_text_files(text: str, filename: str, linebreaks_per_file: int):
base_name = _strip_known_suffixes(filename, [".zip", ".txt"])
text = "" if text is None else str(text)
if linebreaks_per_file is None:
linebreaks_per_file = -1
try:
linebreaks_per_file = int(linebreaks_per_file)
except Exception as exc:
raise ValueError("linebreaks_per_file must be an integer.") from exc
if linebreaks_per_file <= 0:
return [(f"{base_name}.txt", text)]
lines = text.splitlines()
if not lines:
return [(f"{base_name}_1_to_1.txt", "")]
files = []
for start_idx in range(0, len(lines), linebreaks_per_file):
end_idx = min(start_idx + linebreaks_per_file, len(lines))
start_line = start_idx + 1
end_line = end_idx
chunk_name = f"{base_name}_{start_line}_to_{end_line}.txt"
chunk_text = "\n".join(lines[start_idx:end_idx])
files.append((chunk_name, chunk_text))
return files
def _zip_named_text_files(named_files):
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
for arcname, text in named_files:
zf.writestr(arcname, _text_to_bytes(text))
return buffer.getvalue()
class _SaliaUploadBase:
CATEGORY = CATEGORY
OUTPUT_NODE = True
RETURN_TYPES = ("STRING", "STRING", "STRING")
RETURN_NAMES = ("path_in_repo", "file_url", "commit_url")
class Salia_Upload_TMP_img(_SaliaUploadBase):
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"img": ("IMAGE", {}),
"hf_token": ("STRING", {"default": "", "multiline": False, "placeholder": "hf_..."}),
"filename": ("STRING", {"default": "image", "multiline": False}),
}
}
FUNCTION = "upload"
DESCRIPTION = "Upload one RGB or RGBA image as a PNG to saliacoel/tmp on Hugging Face Hub."
SEARCH_ALIASES = ["upload png to hf", "salia tmp image upload"]
def upload(self, img, hf_token, filename):
png_name = _ensure_extension(filename, ".png")
png_bytes, mode = _tensor_to_png_bytes(img)
return _upload_bytes(
png_bytes,
png_name,
hf_token,
f"ComfyUI upload {png_name} ({mode})",
)
class Salia_Upload_TMP_txt(_SaliaUploadBase):
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"text": ("STRING", {"default": "", "multiline": True}),
"hf_token": ("STRING", {"default": "", "multiline": False, "placeholder": "hf_..."}),
"filename": ("STRING", {"default": "text", "multiline": False}),
}
}
FUNCTION = "upload"
DESCRIPTION = "Upload text as a UTF-8 .txt file to saliacoel/tmp on Hugging Face Hub."
SEARCH_ALIASES = ["upload txt to hf", "salia tmp text upload"]
def upload(self, text, hf_token, filename):
txt_name = _ensure_extension(filename, ".txt")
txt_bytes = _text_to_bytes(text)
return _upload_bytes(
txt_bytes,
txt_name,
hf_token,
f"ComfyUI upload {txt_name}",
)
class Salia_Upload_TMP_split_txt_to_zip(_SaliaUploadBase):
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"text": ("STRING", {"default": "", "multiline": True}),
"hf_token": ("STRING", {"default": "", "multiline": False, "placeholder": "hf_..."}),
"filename": ("STRING", {"default": "text_bundle", "multiline": False}),
"linebreaks_per_file": ("INT", {"default": -1, "min": -1, "max": 100000000, "step": 1}),
}
}
FUNCTION = "upload"
DESCRIPTION = "Split text into numbered .txt files, zip them, and upload the .zip to saliacoel/tmp on Hugging Face Hub."
SEARCH_ALIASES = [
"Salia_Upload_TMP_split_txt",
"split txt to zip",
"upload zip to hf",
]
def upload(self, text, hf_token, filename, linebreaks_per_file):
base_name = _strip_known_suffixes(filename, [".zip", ".txt"])
zip_name = f"{base_name}.zip"
named_files = _build_split_text_files(text, base_name, linebreaks_per_file)
zip_bytes = _zip_named_text_files(named_files)
return _upload_bytes(
zip_bytes,
zip_name,
hf_token,
f"ComfyUI upload {zip_name} ({len(named_files)} files)",
)
NODE_CLASS_MAPPINGS = {
"Salia_Upload_TMP_img": Salia_Upload_TMP_img,
"Salia_Upload_TMP_txt": Salia_Upload_TMP_txt,
"Salia_Upload_TMP_split_txt_to_zip": Salia_Upload_TMP_split_txt_to_zip,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"Salia_Upload_TMP_img": "Salia_Upload_TMP_img",
"Salia_Upload_TMP_txt": "Salia_Upload_TMP_txt",
"Salia_Upload_TMP_split_txt_to_zip": "Salia_Upload_TMP_split_txt_to_zip",
}
__all__ = ["NODE_CLASS_MAPPINGS", "NODE_DISPLAY_NAME_MAPPINGS"]