"""ComfyUI nodes that upload images, text and zipped text bundles to a Hugging Face Hub repo.

All uploads go to the repository named by ``REPO_ID`` on branch ``BRANCH``.
"""

import io
import os
import re
import zipfile
from urllib.parse import quote

import numpy as np

REPO_ID = "saliacoel/tmp"
REPO_TYPE = "model"
BRANCH = "main"
CATEGORY = "Salia/HuggingFace"

# Characters replaced by "_" in file names: <>:"\|?* plus ASCII control characters.
_INVALID_FILENAME_CHARS = re.compile(r'[<>:"\\|?*\x00-\x1F]')


def _require_non_empty_string(value, name):
    """Return ``str(value)`` stripped of surrounding whitespace.

    Raises:
        ValueError: if ``value`` is ``None`` or blank after stripping.
    """
    if value is None:
        raise ValueError(f"{name} is required.")
    value = str(value).strip()
    if not value:
        raise ValueError(f"{name} must not be empty.")
    return value


def _sanitize_filename(filename: str) -> str:
    """Reduce ``filename`` to a single safe path component.

    Directory parts (``/`` or ``\\`` separated) are dropped, characters matched
    by ``_INVALID_FILENAME_CHARS`` become ``_``, and surrounding whitespace and
    dots are removed.

    Raises:
        ValueError: if the input is missing/blank or nothing is left afterwards.
    """
    filename = _require_non_empty_string(filename, "filename")
    filename = filename.replace("\\", "/").split("/")[-1]
    filename = _INVALID_FILENAME_CHARS.sub("_", filename)

    # Strip until stable: a single pass leaves interleaved runs such as
    # "a. ." -> "a. " or ". ." -> " " (a blank name that would pass the check).
    previous = None
    while previous != filename:
        previous = filename
        filename = filename.strip().strip(".")

    if not filename:
        raise ValueError("filename became empty after sanitization.")
    return filename


def _strip_known_suffixes(filename: str, suffixes) -> str:
    """Sanitize ``filename`` and remove the first matching suffix (case-insensitive).

    At most one suffix is removed; the name is returned unchanged when none match.
    """
    base = _sanitize_filename(filename)
    lower = base.lower()
    for suffix in suffixes:
        # Skip empty suffixes: base[:-0] would otherwise yield "".
        if suffix and lower.endswith(suffix.lower()):
            return base[: -len(suffix)]
    return base


def _ensure_extension(filename: str, extension: str) -> str:
    """Sanitize ``filename`` and append ``extension`` unless already present (case-insensitive)."""
    filename = _sanitize_filename(filename)
    if filename.lower().endswith(extension.lower()):
        return filename
    return f"{filename}{extension}"


def _quote_repo_path(path_in_repo: str) -> str:
    """Percent-encode every ``/``-separated segment of ``path_in_repo``."""
    return "/".join(quote(part, safe="") for part in path_in_repo.split("/"))


def _repo_file_url(path_in_repo: str) -> str:
    """Return the ``resolve`` URL of ``path_in_repo`` for ``REPO_ID`` on ``BRANCH``."""
    quoted_path = _quote_repo_path(path_in_repo)
    return f"https://huggingface.co/{REPO_ID}/resolve/{BRANCH}/{quoted_path}"


def _import_hf_api():
    """Import and return ``huggingface_hub.HfApi`` lazily.

    Raises:
        RuntimeError: if the import fails for any reason.
    """
    try:
        from huggingface_hub import HfApi
    except Exception as exc:  # kept broad: any import-time failure is reported the same way
        raise RuntimeError(
            "Missing dependency 'huggingface_hub'. Install it in the ComfyUI Python environment with: pip install huggingface_hub"
        ) from exc
    return HfApi


def _upload_bytes(file_bytes: bytes, path_in_repo: str, hf_token: str, commit_message: str):
    """Upload ``file_bytes`` to the repo and describe where it landed.

    Args:
        file_bytes: raw content of the file to upload.
        path_in_repo: target file name; it is sanitized to a single path component.
        hf_token: Hugging Face access token (must be non-empty).
        commit_message: message for the commit.

    Returns:
        ``(path_in_repo, file_url, commit_url)``. ``commit_url`` falls back to
        ``str(commit_info)`` when the result has no ``commit_url`` attribute.

    Raises:
        ValueError: if ``hf_token`` or ``path_in_repo`` is missing/blank.
        RuntimeError: if ``huggingface_hub`` cannot be imported.
    """
    hf_token = _require_non_empty_string(hf_token, "hf_token")
    path_in_repo = _sanitize_filename(path_in_repo)

    HfApi = _import_hf_api()
    api = HfApi(token=hf_token)
    commit_info = api.upload_file(
        path_or_fileobj=file_bytes,
        path_in_repo=path_in_repo,
        repo_id=REPO_ID,
        repo_type=REPO_TYPE,
        # Upload to the same branch that _repo_file_url() points at.
        revision=BRANCH,
        token=hf_token,
        commit_message=commit_message,
    )

    commit_url = getattr(commit_info, "commit_url", None)
    if not commit_url:
        commit_url = str(commit_info)

    return path_in_repo, _repo_file_url(path_in_repo), commit_url


def _image_to_uint8_array(img):
    """Convert a single image (tensor-like or array-like) to a uint8 ``(H, W, C)`` array.

    Objects exposing ``detach``/``cpu``/``numpy`` are converted through those
    methods; anything else goes through ``np.asarray``. A leading batch axis of
    size 1 is removed. Float data is clipped to [0, 1] and scaled to 0-255
    (NaN becomes 0); other dtypes are clipped to [0, 255].

    Raises:
        ValueError: if ``img`` is ``None``, the batch is empty or larger than
            one, the result is not 3-dimensional, or the channel count is not 3 or 4.
    """
    if img is None:
        raise ValueError("img is required.")

    if hasattr(img, "detach"):
        img = img.detach()
    if hasattr(img, "cpu"):
        img = img.cpu()
    arr = img.numpy() if hasattr(img, "numpy") else np.asarray(img)

    if arr.ndim == 4:
        if arr.shape[0] == 0:
            raise ValueError("Received an empty image batch.")
        if arr.shape[0] != 1:
            raise ValueError(
                f"Expected a single image, but received a batch of {arr.shape[0]}. Use an image selector/split node first."
            )
        arr = arr[0]

    if arr.ndim != 3:
        raise ValueError(f"Expected image tensor with 3 dimensions after batch removal, got shape {arr.shape}.")

    channels = int(arr.shape[2])
    if channels not in (3, 4):
        raise ValueError(
            f"Expected 3 channels (RGB) or 4 channels (RGBA), but got {channels} channels."
        )

    if np.issubdtype(arr.dtype, np.floating):
        # NaN survives np.clip and makes the uint8 cast undefined; map it to black first.
        arr = np.nan_to_num(arr, nan=0.0)
        arr = np.clip(arr, 0.0, 1.0)
        arr = np.rint(arr * 255.0).astype(np.uint8)
    else:
        arr = np.clip(arr, 0, 255).astype(np.uint8)
    return arr


def _tensor_to_png_bytes(img):
    """Encode a single RGB/RGBA image as PNG.

    Returns:
        ``(png_bytes, mode)`` where ``mode`` is ``"RGB"`` or ``"RGBA"``.

    Raises:
        RuntimeError: if Pillow cannot be imported.
        ValueError: if the image is missing or has an unsupported shape
            (see ``_image_to_uint8_array``).
    """
    try:
        from PIL import Image
    except Exception as exc:  # kept broad: any import-time failure is reported the same way
        raise RuntimeError(
            "Missing dependency 'Pillow'. Install it in the ComfyUI Python environment with: pip install pillow"
        ) from exc

    arr = _image_to_uint8_array(img)
    mode = "RGBA" if arr.shape[2] == 4 else "RGB"

    # Pillow infers RGB/RGBA from a uint8 (H, W, 3|4) array, so the
    # `mode` argument (deprecated in Pillow 11.3) is not passed.
    pil_image = Image.fromarray(arr)
    buffer = io.BytesIO()
    pil_image.save(buffer, format="PNG")
    return buffer.getvalue(), mode


def _text_to_bytes(text: str) -> bytes:
    """Encode ``text`` as UTF-8; ``None`` is treated as the empty string."""
    if text is None:
        text = ""
    return str(text).encode("utf-8")


def _build_split_text_files(text: str, filename: str, linebreaks_per_file: int):
    """Split ``text`` into named chunks of at most ``linebreaks_per_file`` lines.

    Args:
        text: text to split; ``None`` is treated as empty.
        filename: base name; a trailing ``.zip`` or ``.txt`` is removed.
        linebreaks_per_file: lines per chunk. ``None`` or a value <= 0 disables
            splitting and yields a single ``<base>.txt`` with the text untouched.

    Returns:
        A list of ``(name, text)`` tuples. Chunks are named
        ``<base>_<first>_to_<last>.txt`` with 1-based line numbers and have their
        lines re-joined with ``"\\n"``.

    Raises:
        ValueError: if ``linebreaks_per_file`` cannot be converted to ``int``
            or ``filename`` is invalid.
    """
    base_name = _strip_known_suffixes(filename, [".zip", ".txt"])
    text = "" if text is None else str(text)

    if linebreaks_per_file is None:
        linebreaks_per_file = -1
    try:
        linebreaks_per_file = int(linebreaks_per_file)
    except (TypeError, ValueError, OverflowError) as exc:
        raise ValueError("linebreaks_per_file must be an integer.") from exc

    if linebreaks_per_file <= 0:
        return [(f"{base_name}.txt", text)]

    lines = text.splitlines()
    if not lines:
        return [(f"{base_name}_1_to_1.txt", "")]

    files = []
    for start_idx in range(0, len(lines), linebreaks_per_file):
        end_idx = min(start_idx + linebreaks_per_file, len(lines))
        chunk_name = f"{base_name}_{start_idx + 1}_to_{end_idx}.txt"
        files.append((chunk_name, "\n".join(lines[start_idx:end_idx])))
    return files


def _zip_named_text_files(named_files):
    """Return the bytes of a deflate-compressed ZIP holding each ``(arcname, text)`` as UTF-8."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
        for arcname, text in named_files:
            zf.writestr(arcname, _text_to_bytes(text))
    return buffer.getvalue()


class _SaliaUploadBase:
    """Attributes shared by the upload nodes: category and the three string outputs."""

    CATEGORY = CATEGORY
    OUTPUT_NODE = True
    RETURN_TYPES = ("STRING", "STRING", "STRING")
    RETURN_NAMES = ("path_in_repo", "file_url", "commit_url")


class Salia_Upload_TMP_img(_SaliaUploadBase):
    """Node that uploads one RGB/RGBA image as a PNG file."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the node inputs: image, token and target file name."""
        return {
            "required": {
                "img": ("IMAGE", {}),
                "hf_token": ("STRING", {"default": "", "multiline": False, "placeholder": "hf_..."}),
                "filename": ("STRING", {"default": "image", "multiline": False}),
            }
        }

    FUNCTION = "upload"
    DESCRIPTION = "Upload one RGB or RGBA image as a PNG to saliacoel/tmp on Hugging Face Hub."
    SEARCH_ALIASES = ["upload png to hf", "salia tmp image upload"]

    def upload(self, img, hf_token, filename):
        """Encode ``img`` as PNG and upload it; returns ``(path_in_repo, file_url, commit_url)``."""
        png_name = _ensure_extension(filename, ".png")
        png_bytes, mode = _tensor_to_png_bytes(img)
        return _upload_bytes(
            png_bytes,
            png_name,
            hf_token,
            f"ComfyUI upload {png_name} ({mode})",
        )


class Salia_Upload_TMP_txt(_SaliaUploadBase):
    """Node that uploads text as a UTF-8 ``.txt`` file."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the node inputs: text, token and target file name."""
        return {
            "required": {
                "text": ("STRING", {"default": "", "multiline": True}),
                "hf_token": ("STRING", {"default": "", "multiline": False, "placeholder": "hf_..."}),
                "filename": ("STRING", {"default": "text", "multiline": False}),
            }
        }

    FUNCTION = "upload"
    DESCRIPTION = "Upload text as a UTF-8 .txt file to saliacoel/tmp on Hugging Face Hub."
    SEARCH_ALIASES = ["upload txt to hf", "salia tmp text upload"]

    def upload(self, text, hf_token, filename):
        """Encode ``text`` as UTF-8 and upload it; returns ``(path_in_repo, file_url, commit_url)``."""
        txt_name = _ensure_extension(filename, ".txt")
        txt_bytes = _text_to_bytes(text)
        return _upload_bytes(
            txt_bytes,
            txt_name,
            hf_token,
            f"ComfyUI upload {txt_name}",
        )


class Salia_Upload_TMP_split_txt_to_zip(_SaliaUploadBase):
    """Node that splits text into ``.txt`` chunks, zips them and uploads the ``.zip``."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the node inputs: text, token, file name and lines per chunk."""
        return {
            "required": {
                "text": ("STRING", {"default": "", "multiline": True}),
                "hf_token": ("STRING", {"default": "", "multiline": False, "placeholder": "hf_..."}),
                "filename": ("STRING", {"default": "text_bundle", "multiline": False}),
                "linebreaks_per_file": ("INT", {"default": -1, "min": -1, "max": 100000000, "step": 1}),
            }
        }

    FUNCTION = "upload"
    DESCRIPTION = "Split text into numbered .txt files, zip them, and upload the .zip to saliacoel/tmp on Hugging Face Hub."
    SEARCH_ALIASES = [
        "Salia_Upload_TMP_split_txt",
        "split txt to zip",
        "upload zip to hf",
    ]

    def upload(self, text, hf_token, filename, linebreaks_per_file):
        """Split, zip and upload ``text``; returns ``(path_in_repo, file_url, commit_url)``."""
        base_name = _strip_known_suffixes(filename, [".zip", ".txt"])
        zip_name = f"{base_name}.zip"
        named_files = _build_split_text_files(text, base_name, linebreaks_per_file)
        zip_bytes = _zip_named_text_files(named_files)
        return _upload_bytes(
            zip_bytes,
            zip_name,
            hf_token,
            f"ComfyUI upload {zip_name} ({len(named_files)} files)",
        )


NODE_CLASS_MAPPINGS = {
    "Salia_Upload_TMP_img": Salia_Upload_TMP_img,
    "Salia_Upload_TMP_txt": Salia_Upload_TMP_txt,
    "Salia_Upload_TMP_split_txt_to_zip": Salia_Upload_TMP_split_txt_to_zip,
}

NODE_DISPLAY_NAME_MAPPINGS = {
    "Salia_Upload_TMP_img": "Salia_Upload_TMP_img",
    "Salia_Upload_TMP_txt": "Salia_Upload_TMP_txt",
    "Salia_Upload_TMP_split_txt_to_zip": "Salia_Upload_TMP_split_txt_to_zip",
}

__all__ = ["NODE_CLASS_MAPPINGS", "NODE_DISPLAY_NAME_MAPPINGS"]