|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| from __future__ import annotations
|
|
|
| import io
|
| import os
|
| import re
|
| import time
|
| from typing import Dict, Optional, Tuple
|
|
|
| import requests
|
|
|
|
|
|
|
|
|
|
|
|
|
# Hugging Face access token (must have write access to HF_REPO_ID).
# It is read from the environment so that no credential lives in this file;
# _validate_token() rejects an empty or placeholder value.
# NOTE: a token was previously hard-coded on this line. It must be treated as
# leaked and revoked in the Hugging Face account settings.
HF_TOKEN = os.environ.get("HF_TOKEN", "").strip()

# Repository, revision and file that hold the shared job log.
HF_REPO_ID = "saliacoel/v1"
HF_REPO_TYPE = "model"
HF_REVISION = "main"
HF_FILE_PATH = "job_log.txt"

# Hub base URL and the User-Agent header sent with direct HTTP downloads.
HF_ENDPOINT = "https://huggingface.co"
USER_AGENT = "ComfyUI-HFJobLogNodes/2.0"
|
|
|
|
|
|
|
|
|
|
|
def _warn(msg: str) -> None:
    """Print *msg* to stdout, prefixed with the ``[HFJobLogNodes]`` tag."""
    tagged = "[HFJobLogNodes] {}".format(msg)
    print(tagged)
|
|
|
|
|
def _validate_token() -> None:
    """Raise ``ValueError`` unless ``HF_TOKEN`` looks like a usable token.

    A usable token is a ``str`` that starts with ``hf_`` and whose stripped
    value is not one of the known placeholder values.

    Raises:
        ValueError: if ``HF_TOKEN`` is missing, malformed or a placeholder.
    """
    placeholders = {"hf_???", "hf_", ""}
    looks_usable = (
        isinstance(HF_TOKEN, str)
        and HF_TOKEN.startswith("hf_")
        and HF_TOKEN.strip() not in placeholders
    )
    if looks_usable:
        return

    help_text = (
        "HF_TOKEN is not set. Set environment variable HF_TOKEN to your real Hugging Face token "
        "(must have write access), e.g.:\n"
        " export HF_TOKEN=hf_xxx\n"
        " setx HF_TOKEN hf_xxx\n"
    )
    raise ValueError(help_text)
|
|
|
|
|
def _hf_headers() -> Dict[str, str]:
    """Build the HTTP headers used for direct requests to the Hub.

    Returns:
        A new dict carrying the bearer ``Authorization`` header built from
        ``HF_TOKEN`` and the module's ``User-Agent`` string.
    """
    headers = {}
    headers["Authorization"] = f"Bearer {HF_TOKEN}"
    headers["User-Agent"] = USER_AGENT
    return headers
|
|
|
|
|
def _hf_resolve_url() -> str:
    """Return the Hub ``resolve`` URL of the job-log file.

    The URL is assembled from ``HF_ENDPOINT``, ``HF_REPO_TYPE``,
    ``HF_REPO_ID``, ``HF_REVISION`` and ``HF_FILE_PATH``.  Model repositories
    live at the endpoint root, while dataset and space repositories are
    addressed under a ``datasets/`` or ``spaces/`` prefix.  Honouring
    ``HF_REPO_TYPE`` here keeps the download location in step with the upload
    call, which passes the same repo type.
    """
    from urllib.parse import quote

    type_prefix = {"dataset": "datasets/", "space": "spaces/"}.get(HF_REPO_TYPE, "")
    # Quote revision and path so branch names or file names with special
    # characters still form a valid URL; "main" / "job_log.txt" are unchanged.
    revision = quote(HF_REVISION, safe="")
    file_path = quote(HF_FILE_PATH)
    return f"{HF_ENDPOINT}/{type_prefix}{HF_REPO_ID}/resolve/{revision}/{file_path}"
|
|
|
|
|
def _download_job_log_text(timeout_s: float = 20.0) -> str:
    """Download the current job log and return it as text.

    Args:
        timeout_s: timeout in seconds passed to ``requests.get``.

    Returns:
        The file content decoded as UTF-8, or ``""`` when the server answers
        404 (a missing log is treated as empty).

    Raises:
        RuntimeError: for any HTTP status other than 200 or 404.
    """
    url = _hf_resolve_url()
    r = requests.get(url, headers=_hf_headers(), timeout=timeout_s)
    if r.status_code == 200:
        # Decode explicitly instead of using r.text: the upload side always
        # writes UTF-8, whereas r.text follows the response's declared or
        # guessed charset and could garble non-ASCII text on a round-trip.
        # errors="replace" keeps the never-raises behaviour of r.text.
        return r.content.decode("utf-8", errors="replace")
    if r.status_code == 404:
        _warn(f"job_log.txt not found at {url} (404). Treating as empty.")
        return ""
    raise RuntimeError(f"Failed to download job_log.txt: HTTP {r.status_code} - {r.text[:2000]}")
|
|
|
|
|
def _upload_job_log_text(new_text: str, commit_message: str) -> None:
    """Upload *new_text* as the job-log file in a single Hub commit.

    Args:
        new_text: full new content of the file; it is encoded as UTF-8.
        commit_message: message passed to ``upload_file`` for the commit.

    Raises:
        RuntimeError: if ``huggingface_hub`` cannot be imported.
    """
    # Imported lazily so the module loads even without huggingface_hub.
    try:
        from huggingface_hub import HfApi
    except Exception as import_error:
        raise RuntimeError(
            "huggingface_hub is required for uploading. Install it with: pip install -U huggingface_hub"
        ) from import_error

    client = HfApi(token=HF_TOKEN)

    # Stage the encoded text in an in-memory file, rewound to the start.
    payload = io.BytesIO()
    payload.write(new_text.encode("utf-8"))
    payload.seek(0)

    upload_kwargs = {
        "repo_id": HF_REPO_ID,
        "repo_type": HF_REPO_TYPE,
        "revision": HF_REVISION,
        "path_in_repo": HF_FILE_PATH,
        "path_or_fileobj": payload,
        "commit_message": commit_message,
    }
    client.upload_file(**upload_kwargs)
|
|
|
|
|
def _replace_topmost_status(
    text: str, from_status: str, to_status: str
) -> Tuple[str, Optional[str], bool]:
    """Rewrite the status of the topmost line shaped ``<ID> (from_status)``.

    Only the first matching line is touched, and on that line only the
    ``(from_status)`` token that directly follows the ID is replaced by
    ``(to_status)``.  Line endings and all other text are preserved.

    Args:
        text: full content of the job log.
        from_status: status to look for, matched literally.
        to_status: status to write, inserted literally.

    Returns:
        ``(new_text, id_found, changed)``; ``(text, None, False)`` when no
        line matches.
    """
    # "head" spans everything before the opening parenthesis so the status
    # token can be swapped by position rather than by a second search.
    line_pat = re.compile(
        rf"^(?P<head>\s*(?P<id>\S+)\s*)\(\s*{re.escape(from_status)}\s*\)"
    )

    lines = text.splitlines(keepends=True)
    for i, line in enumerate(lines):
        m = line_pat.match(line)
        if m is None:
            continue

        # Splice by match offsets: to_status is never interpreted as a regex
        # replacement template, and a "(from_status)" that is part of the ID
        # itself is left alone.
        lines[i] = f"{line[:m.end('head')]}({to_status}){line[m.end():]}"
        return ("".join(lines), m.group("id"), True)

    return (text, None, False)
|
|
|
|
|
def _replace_status_for_id(
    text: str, job_id: str, from_status: str, to_status: str
) -> Tuple[str, bool]:
    """Rewrite the status of the first line shaped ``<job_id> (from_status)``.

    Only the first matching line is touched, and on that line only the
    ``(from_status)`` token that directly follows the ID is replaced by
    ``(to_status)``.  Line endings and all other text are preserved.

    Args:
        text: full content of the job log.
        job_id: ID to look for; surrounding whitespace is ignored and an
            empty (or ``None``) ID never matches.
        from_status: status to look for, matched literally.
        to_status: status to write, inserted literally.

    Returns:
        ``(new_text, changed)``; ``(text, False)`` when nothing matched.
    """
    job_id = (job_id or "").strip()
    if not job_id:
        return (text, False)

    # "head" spans everything before the opening parenthesis so the status
    # token can be swapped by position rather than by a second search.
    line_pat = re.compile(
        rf"^(?P<head>\s*{re.escape(job_id)}\s*)\(\s*{re.escape(from_status)}\s*\)"
    )

    lines = text.splitlines(keepends=True)
    for i, line in enumerate(lines):
        m = line_pat.match(line)
        if m is None:
            continue

        # Splice by match offsets: to_status is never interpreted as a regex
        # replacement template, and a "(from_status)" that is part of the ID
        # itself is left alone.
        lines[i] = f"{line[:m.end('head')]}({to_status}){line[m.end():]}"
        return ("".join(lines), True)

    return (text, False)
|
|
|
|
|
def _update_remote_topmost_queue_to_started() -> str:
    """Mark the topmost queued job in the remote log as started.

    Steps:
    - download job_log.txt
    - change the topmost "(queue)" to "(started)"
    - upload the file if something changed

    The whole sequence is tried up to three times with a growing pause
    between attempts.

    Returns:
        The ID of the job that was switched, or ``""`` when the log holds no
        "(queue)" entry.

    Raises:
        ValueError: if the token check fails.
        RuntimeError: if every attempt failed; the last error is chained.
    """
    # NOTE(review): download-modify-upload is not atomic; nothing visible
    # here prevents two workers from claiming the same job concurrently.
    _validate_token()

    attempts = 3
    last_err: Optional[Exception] = None

    for attempt in range(1, attempts + 1):
        try:
            current = _download_job_log_text()
            updated, found_id, changed = _replace_topmost_status(current, "queue", "started")

            if not changed:
                _warn("No topmost '(queue)' entry found. Nothing to update.")
                return ""

            msg = f"Job {found_id}: queue -> started (topmost)"
            _upload_job_log_text(updated, commit_message=msg)
            _warn(f"Updated HF job_log.txt: {msg}")
            return found_id or ""

        except Exception as e:
            last_err = e
            _warn(f"Attempt {attempt}/{attempts} failed: {e}")
            # Back off only when another attempt follows; sleeping after the
            # final failure would just delay the error.
            if attempt < attempts:
                time.sleep(0.5 * attempt)

    raise RuntimeError(f"Failed to update status after {attempts} attempts.") from last_err
|
|
|
|
|
def _update_remote_id_started_to_finished(job_id: str) -> None:
    """Mark the given job in the remote log as finished.

    Steps:
    - download job_log.txt
    - change "<job_id> (started)" to "<job_id> (finished)"
    - upload the file if something changed

    The whole sequence is tried up to three times with a growing pause
    between attempts.  When no "<job_id> (started)" line exists, a warning is
    printed and nothing is uploaded.

    Args:
        job_id: ID of the job to mark; surrounding whitespace is ignored.

    Raises:
        ValueError: if the token check fails or *job_id* is empty.
        RuntimeError: if every attempt failed; the last error is chained.
    """
    _validate_token()
    job_id = (job_id or "").strip()
    if not job_id:
        raise ValueError("HFJobMarkFinished: ID input is empty.")

    attempts = 3
    last_err: Optional[Exception] = None

    for attempt in range(1, attempts + 1):
        try:
            current = _download_job_log_text()
            updated, changed = _replace_status_for_id(current, job_id, "started", "finished")

            if not changed:
                _warn(f"No match found for '{job_id} (started)'. Nothing to update.")
                return

            msg = f"Job {job_id}: started -> finished"
            _upload_job_log_text(updated, commit_message=msg)
            _warn(f"Updated HF job_log.txt: {msg}")
            return

        except Exception as e:
            last_err = e
            _warn(f"Attempt {attempt}/{attempts} failed: {e}")
            # Back off only when another attempt follows; sleeping after the
            # final failure would just delay the error.
            if attempt < attempts:
                time.sleep(0.5 * attempt)

    raise RuntimeError(f"Failed to update status after {attempts} attempts.") from last_err
|
|
|
|
|
|
|
|
|
|
|
class HFJobMarkStarted:
    """
    Node that claims the topmost queued job.

    Input:  STRING "trigger"
    Output: STRING "BAM" (the trigger, passed through unchanged),
            STRING "ID"  (ID of the job switched from queue to started,
                          or "" when no queued job exists)
    Side-effect:
      - the topmost "(queue)" in job_log.txt becomes "(started)"
      - nothing is appended to the file
    """

    @classmethod
    def INPUT_TYPES(cls):
        trigger_spec = ("STRING", {"default": "", "multiline": True})
        return {"required": {"trigger": trigger_spec}}

    RETURN_TYPES = ("STRING", "STRING")
    RETURN_NAMES = ("BAM", "ID")
    FUNCTION = "run"
    CATEGORY = "HF Job Log"

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        # NaN never compares equal to itself, so an equality check against a
        # previous value always reports a change (presumably used to force
        # the node to run every time -- confirm against the host framework).
        not_a_number = float("nan")
        return not_a_number

    def run(self, trigger: str):
        job_id = _update_remote_topmost_queue_to_started()
        outputs = (trigger, job_id)
        return outputs
|
|
|
|
|
class HFJobMarkFinished:
    """
    Node that marks a job as finished.

    Input:  IMAGE "image", STRING "ID"
    Output: IMAGE (the input image, passed through unchanged)
    Side-effect:
      - "<ID> (started)" in job_log.txt becomes "<ID> (finished)"
    """

    @classmethod
    def INPUT_TYPES(cls):
        required_inputs = {
            "image": ("IMAGE",),
            "ID": ("STRING", {"default": ""}),
        }
        return {"required": required_inputs}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "run"
    CATEGORY = "HF Job Log"

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        # NaN never compares equal to itself, so an equality check against a
        # previous value always reports a change (presumably used to force
        # the node to run every time -- confirm against the host framework).
        not_a_number = float("nan")
        return not_a_number

    def run(self, image, ID: str):
        # The log is updated first; the image is then handed on untouched.
        _update_remote_id_started_to_finished(ID)
        outputs = (image,)
        return outputs
|
|
|
|
|
# Registry of node identifier -> implementing class; each identifier is the
# class's own name.
NODE_CLASS_MAPPINGS = {
    node_cls.__name__: node_cls
    for node_cls in (HFJobMarkStarted, HFJobMarkFinished)
}

# Human-readable titles for the same node identifiers.
NODE_DISPLAY_NAME_MAPPINGS = dict(
    HFJobMarkStarted="HF Job → started (topmost queue→started) [BAM+ID]",
    HFJobMarkFinished="HF Job → finished (ID started→finished) [IMAGE]",
)
|
|
|