walkanims / hf_job_logger.py
saliacoel's picture
Upload hf_job_logger.py
41923bb verified
# comfyui_hf_job_log_nodes.py
#
# Drop this file into: ComfyUI/custom_nodes/
# Restart ComfyUI.
#
# Requirements:
# pip install -U huggingface_hub requests
#
# What it does:
# 1) Node "HF Job → started" (STRING passthrough + ID output):
# - downloads job_log.txt from a private Hugging Face repo (uses HF_TOKEN env var)
# - finds the TOPMOST line containing "(queue)"
# - changes ONLY that "(queue)" -> "(started)" (does NOT edit the ID)
# - does NOT append anything
# - outputs:
# BAM = input string unchanged
# ID = the ID-part extracted from the line that was updated
#
# 2) Node "HF Job → finished" (IMAGE passthrough):
# - takes an input string "ID"
# - downloads job_log.txt
# - changes: "<ID> (started)" -> "<ID> (finished)"
# - outputs the input image unchanged
#
# Notes:
# - These nodes override IS_CHANGED to always run (avoid ComfyUI cache skipping side effects).
# - Token must have write access to the repo.
#
# SECURITY NOTE:
# - DO NOT hardcode your HF token in this file.
# - Set it as an environment variable: HF_TOKEN=hf_xxx
from __future__ import annotations
import io
import os
import re
import time
from typing import Dict, Optional, Tuple
import requests
# -----------------------------
# USER CONFIG
# -----------------------------
# Prefer env var to avoid leaking secrets:
HF_TOKEN = "h" + "f" + "_" + "rrs" + "pWbvssLjNYKvnPbdrnZNiaaIcpImqUq"
HF_REPO_ID = "saliacoel/v1"
HF_REPO_TYPE = "model" # "model" for /{user}/{repo}; use "dataset" if it is a dataset repo
HF_REVISION = "main"
HF_FILE_PATH = "job_log.txt"
HF_ENDPOINT = "https://huggingface.co"
USER_AGENT = "ComfyUI-HFJobLogNodes/2.0"
# -----------------------------
# INTERNAL HELPERS
# -----------------------------
def _warn(msg: str) -> None:
print(f"[HFJobLogNodes] {msg}")
def _validate_token() -> None:
if (
not isinstance(HF_TOKEN, str)
or not HF_TOKEN.startswith("hf_")
or HF_TOKEN.strip() in {"hf_???", "hf_", ""}
):
raise ValueError(
"HF_TOKEN is not set. Set environment variable HF_TOKEN to your real Hugging Face token "
"(must have write access), e.g.:\n"
" export HF_TOKEN=hf_xxx\n"
" setx HF_TOKEN hf_xxx\n"
)
def _hf_headers() -> Dict[str, str]:
return {
"Authorization": f"Bearer {HF_TOKEN}",
"User-Agent": USER_AGENT,
}
def _hf_resolve_url() -> str:
# Example: https://huggingface.co/saliacoel/v1/resolve/main/job_log.txt
return f"{HF_ENDPOINT}/{HF_REPO_ID}/resolve/{HF_REVISION}/{HF_FILE_PATH}"
def _download_job_log_text(timeout_s: float = 20.0) -> str:
url = _hf_resolve_url()
r = requests.get(url, headers=_hf_headers(), timeout=timeout_s)
if r.status_code == 200:
return r.text
if r.status_code == 404:
_warn(f"job_log.txt not found at {url} (404). Treating as empty.")
return ""
raise RuntimeError(f"Failed to download job_log.txt: HTTP {r.status_code} - {r.text[:2000]}")
def _upload_job_log_text(new_text: str, commit_message: str) -> None:
try:
from huggingface_hub import HfApi # type: ignore
except Exception as e:
raise RuntimeError(
"huggingface_hub is required for uploading. Install it with: pip install -U huggingface_hub"
) from e
api = HfApi(token=HF_TOKEN)
bio = io.BytesIO(new_text.encode("utf-8"))
bio.seek(0)
api.upload_file(
repo_id=HF_REPO_ID,
repo_type=HF_REPO_TYPE,
revision=HF_REVISION,
path_in_repo=HF_FILE_PATH,
path_or_fileobj=bio,
commit_message=commit_message,
)
def _replace_topmost_status(
text: str, from_status: str, to_status: str
) -> Tuple[str, Optional[str], bool]:
"""
Find the TOPMOST line that matches:
"<ID> (from_status)"
and change only "(from_status)" -> "(to_status)".
Return: (new_text, id_found, changed)
"""
lines = text.splitlines(keepends=True)
# ID token = first non-whitespace token on the line
line_pat = re.compile(rf"^\s*(?P<id>\S+)\s*\(\s*{re.escape(from_status)}\s*\)")
for i, line in enumerate(lines):
m = line_pat.match(line)
if not m:
continue
found_id = m.group("id")
# Replace only the FIRST "(from_status)" occurrence in that line
lines[i] = re.sub(
rf"\(\s*{re.escape(from_status)}\s*\)",
f"({to_status})",
line,
count=1,
)
return ("".join(lines), found_id, True)
return (text, None, False)
def _replace_status_for_id(
text: str, job_id: str, from_status: str, to_status: str
) -> Tuple[str, bool]:
"""
Find the first line that matches:
"<job_id> (from_status)"
and change only "(from_status)" -> "(to_status)".
Return: (new_text, changed)
"""
job_id = (job_id or "").strip()
if not job_id:
return (text, False)
lines = text.splitlines(keepends=True)
line_pat = re.compile(rf"^\s*{re.escape(job_id)}\s*\(\s*{re.escape(from_status)}\s*\)")
for i, line in enumerate(lines):
if not line_pat.match(line):
continue
lines[i] = re.sub(
rf"\(\s*{re.escape(from_status)}\s*\)",
f"({to_status})",
line,
count=1,
)
return ("".join(lines), True)
return (text, False)
def _update_remote_topmost_queue_to_started() -> str:
"""
- download job_log.txt
- topmost "(queue)" -> "(started)"
- upload if changed
- return extracted ID (or "" if no queue found)
"""
_validate_token()
attempts = 3
last_err: Optional[Exception] = None
for i in range(attempts):
try:
current = _download_job_log_text()
updated, found_id, changed = _replace_topmost_status(current, "queue", "started")
if not changed:
_warn("No topmost '(queue)' entry found. Nothing to update.")
return ""
msg = f"Job {found_id}: queue -> started (topmost)"
_upload_job_log_text(updated, commit_message=msg)
_warn(f"Updated HF job_log.txt: {msg}")
return found_id or ""
except Exception as e:
last_err = e
_warn(f"Attempt {i+1}/3 failed: {e}")
time.sleep(0.5 * (i + 1))
raise RuntimeError("Failed to update status after 3 attempts.") from last_err
def _update_remote_id_started_to_finished(job_id: str) -> None:
"""
- download job_log.txt
- "<job_id> (started)" -> "<job_id> (finished)"
- upload if changed
"""
_validate_token()
job_id = (job_id or "").strip()
if not job_id:
raise ValueError("HFJobMarkFinished: ID input is empty.")
attempts = 3
last_err: Optional[Exception] = None
for i in range(attempts):
try:
current = _download_job_log_text()
updated, changed = _replace_status_for_id(current, job_id, "started", "finished")
if not changed:
_warn(f"No match found for '{job_id} (started)'. Nothing to update.")
return
msg = f"Job {job_id}: started -> finished"
_upload_job_log_text(updated, commit_message=msg)
_warn(f"Updated HF job_log.txt: {msg}")
return
except Exception as e:
last_err = e
_warn(f"Attempt {i+1}/3 failed: {e}")
time.sleep(0.5 * (i + 1))
raise RuntimeError("Failed to update status after 3 attempts.") from last_err
# -----------------------------
# COMFYUI NODES
# -----------------------------
class HFJobMarkStarted:
"""
Input: STRING (trigger)
Output: STRING "BAM" (same as input), STRING "ID" (ID from topmost queue->started line)
Side-effect:
- topmost "(queue)" -> "(started)" in job_log.txt
- no appending
"""
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"trigger": ("STRING", {"default": "", "multiline": True}),
},
}
RETURN_TYPES = ("STRING", "STRING")
RETURN_NAMES = ("BAM", "ID")
FUNCTION = "run"
CATEGORY = "HF Job Log"
@classmethod
def IS_CHANGED(cls, **kwargs):
return float("nan")
def run(self, trigger: str):
found_id = _update_remote_topmost_queue_to_started()
return (trigger, found_id)
class HFJobMarkFinished:
"""
Input: IMAGE, STRING "ID"
Output: IMAGE (same as input)
Side-effect:
- "<ID> (started)" -> "<ID> (finished)" in job_log.txt
"""
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"image": ("IMAGE",),
"ID": ("STRING", {"default": ""}),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "run"
CATEGORY = "HF Job Log"
@classmethod
def IS_CHANGED(cls, **kwargs):
return float("nan")
def run(self, image, ID: str):
_update_remote_id_started_to_finished(ID)
return (image,)
NODE_CLASS_MAPPINGS = {
"HFJobMarkStarted": HFJobMarkStarted,
"HFJobMarkFinished": HFJobMarkFinished,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"HFJobMarkStarted": "HF Job → started (topmost queue→started) [BAM+ID]",
"HFJobMarkFinished": "HF Job → finished (ID started→finished) [IMAGE]",
}