| import io |
| import os |
| import re |
| import zipfile |
| from urllib.parse import quote |
|
|
| import numpy as np |
|
|
# Hugging Face Hub repository that every upload in this file targets.
REPO_ID = "saliacoel/tmp"
# Repository type passed to the upload call.
REPO_TYPE = "model"
# Branch name used when building the "resolve" download URL.
BRANCH = "main"
# Category string assigned to the node classes defined in this file.
CATEGORY = "Salia/HuggingFace"


# Characters replaced with "_" during filename sanitization:
# < > : " \ | ? * and the ASCII control characters 0x00-0x1F.
_INVALID_FILENAME_CHARS = re.compile(r'[<>:"\\|?*\x00-\x1F]')
|
|
|
|
| def _require_non_empty_string(value, name): |
| if value is None: |
| raise ValueError(f"{name} is required.") |
| value = str(value).strip() |
| if not value: |
| raise ValueError(f"{name} must not be empty.") |
| return value |
|
|
|
|
def _sanitize_filename(filename: str) -> str:
    """Reduce a user-supplied name to a single safe file name.

    Steps, in order: require a non-blank value, keep only the last path
    component (both ``\\`` and ``/`` count as separators), replace every
    character matched by ``_INVALID_FILENAME_CHARS`` with ``_``, then trim
    surrounding whitespace and dots.

    Args:
        filename: Raw name; may contain directories or invalid characters.

    Returns:
        The sanitized file name (never empty, never containing ``/``).

    Raises:
        ValueError: If the input is missing/blank or nothing is left after
            sanitization.
    """
    filename = _require_non_empty_string(filename, "filename")
    filename = filename.replace("\\", "/").rsplit("/", 1)[-1]
    filename = _INVALID_FILENAME_CHARS.sub("_", filename)
    # Trim until stable: a single strip()/strip(".") pass leaves residue when
    # whitespace and dots interleave, e.g. "report ." -> "report " or
    # ". name" -> " name". Each pass only shortens the string, so this ends.
    while True:
        trimmed = filename.strip().strip(".")
        if trimmed == filename:
            break
        filename = trimmed
    if not filename:
        raise ValueError("filename became empty after sanitization.")
    return filename
|
|
|
|
def _strip_known_suffixes(filename: str, suffixes) -> str:
    """Sanitize ``filename`` and remove at most one trailing known suffix.

    Suffixes are tried in the given order and compared case-insensitively;
    only the first match is removed.

    Args:
        filename: Raw file name; sanitized with ``_sanitize_filename`` first.
        suffixes: Iterable of suffix strings such as ``".zip"``.

    Returns:
        The sanitized name without the matched suffix, or the sanitized name
        unchanged when no suffix applies.

    Raises:
        ValueError: Propagated from ``_sanitize_filename`` for unusable names.
    """
    base = _sanitize_filename(filename)
    for suffix in suffixes:
        # An empty suffix would match everything and base[:-0] would wipe the
        # name; a suffix as long as the whole name would also leave nothing.
        # Skip both so the result is never empty.
        if not suffix or len(suffix) >= len(base):
            continue
        if base[-len(suffix):].lower() == suffix.lower():
            return base[: len(base) - len(suffix)]
    return base
|
|
|
|
def _ensure_extension(filename: str, extension: str) -> str:
    """Sanitize ``filename`` and make sure it ends with ``extension``.

    The check is case-insensitive; when the sanitized name already ends with
    the extension it is returned as is, otherwise the extension is appended
    verbatim.

    Args:
        filename: Raw file name; sanitized with ``_sanitize_filename`` first.
        extension: Extension text to enforce, e.g. ``".png"``.

    Returns:
        The sanitized name ending with ``extension``.
    """
    cleaned = _sanitize_filename(filename)
    already_present = cleaned.lower().endswith(extension.lower())
    return cleaned if already_present else cleaned + extension
|
|
|
|
| def _quote_repo_path(path_in_repo: str) -> str: |
| return "/".join(quote(part, safe="") for part in path_in_repo.split("/")) |
|
|
|
|
def _repo_file_url(path_in_repo: str) -> str:
    """Build the huggingface.co ``resolve`` URL for a file in the repository.

    The URL is assembled from the module-level ``REPO_ID`` and ``BRANCH`` and
    the percent-encoded ``path_in_repo``.
    """
    quoted_path = _quote_repo_path(path_in_repo)
    url = f"https://huggingface.co/{REPO_ID}/resolve/{BRANCH}/{quoted_path}"
    return url
|
|
|
|
| def _import_hf_api(): |
| try: |
| from huggingface_hub import HfApi |
| except Exception as exc: |
| raise RuntimeError( |
| "Missing dependency 'huggingface_hub'. Install it in the ComfyUI Python environment with: pip install huggingface_hub" |
| ) from exc |
| return HfApi |
|
|
|
|
def _upload_bytes(file_bytes: bytes, path_in_repo: str, hf_token: str, commit_message: str):
    """Upload ``file_bytes`` to the configured Hugging Face repository.

    Args:
        file_bytes: Raw content of the file to upload.
        path_in_repo: Target name; it is passed through
            ``_sanitize_filename``, so only the final path component is kept.
        hf_token: Access token; must be non-blank.
        commit_message: Message recorded with the upload commit.

    Returns:
        A tuple ``(path_in_repo, file_url, commit_url)`` where
        ``path_in_repo`` is the sanitized name, ``file_url`` is the
        ``resolve`` URL for it, and ``commit_url`` is the ``commit_url``
        attribute of the upload result, or ``str(result)`` when that
        attribute is missing or empty.

    Raises:
        ValueError: If the token or the file name is unusable.
        RuntimeError: If ``huggingface_hub`` cannot be imported.
    """
    token = _require_non_empty_string(hf_token, "hf_token")
    repo_path = _sanitize_filename(path_in_repo)

    client = _import_hf_api()(token=token)
    upload_kwargs = {
        "path_or_fileobj": file_bytes,
        "path_in_repo": repo_path,
        "repo_id": REPO_ID,
        "repo_type": REPO_TYPE,
        "token": token,
        "commit_message": commit_message,
    }
    commit_info = client.upload_file(**upload_kwargs)

    # Fall back to the string form when no commit URL is exposed.
    commit_url = getattr(commit_info, "commit_url", None) or str(commit_info)
    return repo_path, _repo_file_url(repo_path), commit_url
|
|
|
|
| def _tensor_to_png_bytes(img): |
| try: |
| from PIL import Image |
| except Exception as exc: |
| raise RuntimeError( |
| "Missing dependency 'Pillow'. Install it in the ComfyUI Python environment with: pip install pillow" |
| ) from exc |
|
|
| if img is None: |
| raise ValueError("img is required.") |
|
|
| if hasattr(img, "detach"): |
| img = img.detach() |
| if hasattr(img, "cpu"): |
| img = img.cpu() |
| if hasattr(img, "numpy"): |
| arr = img.numpy() |
| else: |
| arr = np.asarray(img) |
|
|
| if arr.ndim == 4: |
| if arr.shape[0] == 0: |
| raise ValueError("Received an empty image batch.") |
| if arr.shape[0] != 1: |
| raise ValueError( |
| f"Expected a single image, but received a batch of {arr.shape[0]}. Use an image selector/split node first." |
| ) |
| arr = arr[0] |
|
|
| if arr.ndim != 3: |
| raise ValueError(f"Expected image tensor with 3 dimensions after batch removal, got shape {arr.shape}.") |
|
|
| channels = int(arr.shape[2]) |
| if channels not in (3, 4): |
| raise ValueError( |
| f"Expected 3 channels (RGB) or 4 channels (RGBA), but got {channels} channels." |
| ) |
|
|
| if np.issubdtype(arr.dtype, np.floating): |
| arr = np.clip(arr, 0.0, 1.0) |
| arr = np.rint(arr * 255.0).astype(np.uint8) |
| else: |
| arr = np.clip(arr, 0, 255).astype(np.uint8) |
|
|
| mode = "RGBA" if channels == 4 else "RGB" |
| pil_image = Image.fromarray(arr, mode=mode) |
|
|
| buffer = io.BytesIO() |
| pil_image.save(buffer, format="PNG") |
| return buffer.getvalue(), mode |
|
|
|
|
| def _text_to_bytes(text: str) -> bytes: |
| if text is None: |
| text = "" |
| return str(text).encode("utf-8") |
|
|
|
|
def _build_split_text_files(text: str, filename: str, linebreaks_per_file: int):
    """Split ``text`` into named chunks of at most N lines each.

    Args:
        text: Text to split; ``None`` is treated as an empty string.
        filename: Name used as the base for the chunk names; a trailing
            ``.zip`` or ``.txt`` is removed first.
        linebreaks_per_file: Maximum number of lines per chunk. ``None``,
            zero, or a negative value disables splitting.

    Returns:
        A list of ``(name, content)`` tuples. Without splitting there is one
        entry named ``<base>.txt`` holding the full text. With splitting,
        entries are named ``<base>_<first>_to_<last>.txt`` (1-based line
        numbers) and their lines are joined with ``"\\n"``; text with no
        lines yields a single empty ``<base>_1_to_1.txt`` entry.

    Raises:
        ValueError: If ``linebreaks_per_file`` cannot be converted to ``int``.
    """
    base_name = _strip_known_suffixes(filename, [".zip", ".txt"])
    if text is None:
        text = ""
    else:
        text = str(text)

    try:
        lines_per_file = -1 if linebreaks_per_file is None else int(linebreaks_per_file)
    except Exception as exc:
        raise ValueError("linebreaks_per_file must be an integer.") from exc

    # Non-positive limit: keep everything in one file.
    if lines_per_file <= 0:
        return [(f"{base_name}.txt", text)]

    lines = text.splitlines()
    if not lines:
        return [(f"{base_name}_1_to_1.txt", "")]

    files = []
    for start_idx in range(0, len(lines), lines_per_file):
        chunk = lines[start_idx:start_idx + lines_per_file]
        start_line = start_idx + 1
        # The last chunk may be shorter, so derive the end from its length.
        end_line = start_idx + len(chunk)
        files.append((f"{base_name}_{start_line}_to_{end_line}.txt", "\n".join(chunk)))
    return files
|
|
|
|
| def _zip_named_text_files(named_files): |
| buffer = io.BytesIO() |
| with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: |
| for arcname, text in named_files: |
| zf.writestr(arcname, _text_to_bytes(text)) |
| return buffer.getvalue() |
|
|
|
|
class _SaliaUploadBase:
    """Attributes shared by the upload node classes in this file.

    Every subclass returns three strings: the path in the repository, the
    file URL, and the commit URL.
    """

    # NOTE(review): these attribute names look like ComfyUI node conventions;
    # confirm against the ComfyUI custom-node documentation.
    RETURN_NAMES = ("path_in_repo", "file_url", "commit_url")
    RETURN_TYPES = ("STRING",) * 3
    OUTPUT_NODE = True
    CATEGORY = CATEGORY
|
|
|
|
class Salia_Upload_TMP_img(_SaliaUploadBase):
    """Node that encodes one image as PNG and uploads it to the repository."""

    FUNCTION = "upload"
    DESCRIPTION = "Upload one RGB or RGBA image as a PNG to saliacoel/tmp on Hugging Face Hub."
    SEARCH_ALIASES = ["upload png to hf", "salia tmp image upload"]

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node inputs: image, access token, and file name."""
        required = {
            "img": ("IMAGE", {}),
            "hf_token": ("STRING", {"default": "", "multiline": False, "placeholder": "hf_..."}),
            "filename": ("STRING", {"default": "image", "multiline": False}),
        }
        return {"required": required}

    def upload(self, img, hf_token, filename):
        """Encode ``img`` as PNG and upload it under ``filename`` (``.png`` enforced).

        Returns:
            The ``(path_in_repo, file_url, commit_url)`` tuple produced by
            ``_upload_bytes``.
        """
        png_name = _ensure_extension(filename, ".png")
        png_bytes, mode = _tensor_to_png_bytes(img)
        commit_message = f"ComfyUI upload {png_name} ({mode})"
        return _upload_bytes(png_bytes, png_name, hf_token, commit_message)
|
|
|
|
class Salia_Upload_TMP_txt(_SaliaUploadBase):
    """Node that uploads a text value as a UTF-8 ``.txt`` file."""

    FUNCTION = "upload"
    DESCRIPTION = "Upload text as a UTF-8 .txt file to saliacoel/tmp on Hugging Face Hub."
    SEARCH_ALIASES = ["upload txt to hf", "salia tmp text upload"]

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node inputs: text, access token, and file name."""
        required = {
            "text": ("STRING", {"default": "", "multiline": True}),
            "hf_token": ("STRING", {"default": "", "multiline": False, "placeholder": "hf_..."}),
            "filename": ("STRING", {"default": "text", "multiline": False}),
        }
        return {"required": required}

    def upload(self, text, hf_token, filename):
        """Encode ``text`` as UTF-8 and upload it under ``filename`` (``.txt`` enforced).

        Returns:
            The ``(path_in_repo, file_url, commit_url)`` tuple produced by
            ``_upload_bytes``.
        """
        txt_name = _ensure_extension(filename, ".txt")
        commit_message = f"ComfyUI upload {txt_name}"
        return _upload_bytes(_text_to_bytes(text), txt_name, hf_token, commit_message)
|
|
|
|
class Salia_Upload_TMP_split_txt_to_zip(_SaliaUploadBase):
    """Node that splits text into ``.txt`` chunks, zips them, and uploads the zip."""

    FUNCTION = "upload"
    DESCRIPTION = "Split text into numbered .txt files, zip them, and upload the .zip to saliacoel/tmp on Hugging Face Hub."
    SEARCH_ALIASES = [
        "Salia_Upload_TMP_split_txt",
        "split txt to zip",
        "upload zip to hf",
    ]

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node inputs: text, access token, file name, and lines per file."""
        required = {
            "text": ("STRING", {"default": "", "multiline": True}),
            "hf_token": ("STRING", {"default": "", "multiline": False, "placeholder": "hf_..."}),
            "filename": ("STRING", {"default": "text_bundle", "multiline": False}),
            "linebreaks_per_file": ("INT", {"default": -1, "min": -1, "max": 100000000, "step": 1}),
        }
        return {"required": required}

    def upload(self, text, hf_token, filename, linebreaks_per_file):
        """Split ``text``, zip the pieces, and upload the archive as ``<base>.zip``.

        ``<base>`` is ``filename`` with a trailing ``.zip`` or ``.txt``
        removed. A non-positive ``linebreaks_per_file`` puts the whole text
        into a single file inside the archive.

        Returns:
            The ``(path_in_repo, file_url, commit_url)`` tuple produced by
            ``_upload_bytes``.
        """
        base_name = _strip_known_suffixes(filename, [".zip", ".txt"])
        named_files = _build_split_text_files(text, base_name, linebreaks_per_file)
        zip_name = f"{base_name}.zip"
        commit_message = f"ComfyUI upload {zip_name} ({len(named_files)} files)"
        return _upload_bytes(
            _zip_named_text_files(named_files),
            zip_name,
            hf_token,
            commit_message,
        )
|
|
|
|
# Node identifier -> implementing class.
NODE_CLASS_MAPPINGS = {
    "Salia_Upload_TMP_img": Salia_Upload_TMP_img,
    "Salia_Upload_TMP_txt": Salia_Upload_TMP_txt,
    "Salia_Upload_TMP_split_txt_to_zip": Salia_Upload_TMP_split_txt_to_zip,
}

# Each node is displayed under its own identifier.
NODE_DISPLAY_NAME_MAPPINGS = {node_name: node_name for node_name in NODE_CLASS_MAPPINGS}

# Only the two registration mappings are exported.
__all__ = ["NODE_CLASS_MAPPINGS", "NODE_DISPLAY_NAME_MAPPINGS"]
|
|