| import os
|
| import json
|
| import re
|
| import uuid
|
| import ast
|
| import time
|
| from dataclasses import dataclass, field
|
| from typing import Any, Dict, List, Optional, Literal, Tuple
|
|
|
| import nodes
|
| from server import PromptServer
|
|
|
|
|
|
|
# Fixed location of the workflow JSON (ComfyUI prompt format) that the node
# loads each time it runs.
_WORKFLOW_JSON_PATH = "/ComfyUI//custom_nodes/comfyui-salia_online/assets/workflow_1.json"

# Delimiters that wrap a variable marker, e.g.
#   "{{{VAR==INT_1, MIN==0, MAX==10==/VAR}}}seed"
MARKER_PREFIX = "{{{VAR=="
MARKER_SUFFIX = "==/VAR}}}"

# A variable id is <TYPE>_<digits>; group 1 captures the type (INT/FLOAT/STR)
# and group 2 the numeric index. Matching is case-insensitive.
VAR_ID_RE = re.compile(r"^(INT|FLOAT|STR)_(\d+)$", re.IGNORECASE)
|
|
|
|
|
@dataclass
class VarOccurrence:
    """One location in the prompt where a variable's value gets written."""

    # Key of the node inside the prompt dict (stored as a string).
    node_id: str
    # Name of the entry inside that node's "inputs" dict.
    input_key: str
|
|
|
|
|
@dataclass
class VarSpec:
    """Constraints, default and target locations of one workflow variable."""

    # Normalized (upper-case) identifier such as "INT_1" or "STR_3".
    var_id: str
    # Type prefix of the identifier; selects how values are converted.
    var_type: Literal["INT", "FLOAT", "STR"]
    # Inclusive numeric bounds; populated for INT/FLOAT markers, None for STR.
    min_val: Optional[float] = None
    max_val: Optional[float] = None
    # Maximum string length; populated for STR markers, None otherwise.
    max_len: Optional[int] = None
    # Every (node, input) pair the variable's value is written to.
    occurrences: List[VarOccurrence] = field(default_factory=list)
    # Value found in the workflow JSON where the variable was first seen;
    # used when no override is supplied.
    default_value: Any = None
|
|
|
|
|
def _load_prompt_json_fixed() -> Dict[str, Any]:
    """
    Read the workflow JSON stored at ``_WORKFLOW_JSON_PATH``.

    Two layouts are understood:
      * the prompt dict itself:      { ...prompt dict... }
      * the prompt inside a wrapper: {"prompt": { ...prompt dict... }}

    Returns:
        The prompt dict (unwrapped when the wrapper layout was used).

    Raises:
        ValueError: if the decoded JSON document is not a dict.
    """
    with open(_WORKFLOW_JSON_PATH, "r", encoding="utf-8") as handle:
        loaded = json.load(handle)

    if not isinstance(loaded, dict):
        raise ValueError("Workflow JSON must be a dict mapping node_id -> node_info (ComfyUI prompt format).")

    # Only unwrap when the "prompt" entry is itself a dict; anything else is
    # treated as an ordinary key of the prompt.
    wrapped = loaded.get("prompt")
    if isinstance(wrapped, dict):
        return wrapped
    return loaded
|
|
|
|
|
def _extract_marker_text(
    text: str,
    *,
    allow_empty_actual_key: bool = False
) -> Optional[Tuple[str, str, Optional[float], Optional[float], Optional[int], str]]:
    """
    Parse a leading ``{{{VAR==...==/VAR}}}`` marker out of ``text``.

    For input-key markers the real key must follow the marker, e.g.:
        "{{{VAR==INT_1, MIN==0, MAX==10==/VAR}}}seed"

    For meta-title markers the trailing part may be empty, e.g.:
        "{{{VAR==STR_3, MAXLEN==80==/VAR}}}"

    Args:
        text: Candidate string (an input key or a node title).
        allow_empty_actual_key: When False, a marker with nothing after it
            is an error; when True, an empty trailing part is accepted.

    Returns:
        ``None`` when ``text`` is not a string, does not start with the
        marker prefix, or has no marker suffix. Otherwise the tuple
        ``(var_id, var_type, min_val, max_val, max_len, actual_key)`` where
        ``min_val``/``max_val`` are set for INT/FLOAT variables and
        ``max_len`` is set for STR variables.

    Raises:
        ValueError: if the marker is present but malformed (missing key,
            bad id, bad constraint syntax, missing/non-numeric/NaN bounds,
            MIN > MAX, missing or negative max length).
    """
    import math  # local import: only needed for the NaN guard below

    if not isinstance(text, str) or not text.startswith(MARKER_PREFIX):
        return None

    end = text.find(MARKER_SUFFIX)
    if end == -1:
        return None

    inner = text[len(MARKER_PREFIX):end]
    actual_key = text[end + len(MARKER_SUFFIX):]

    if not actual_key and not allow_empty_actual_key:
        raise ValueError("Marker key missing actual input name after marker (e.g. ...}}}seed).")

    # Comma-separated fields: the variable id first, then KEY==VALUE pairs.
    parts = [p.strip() for p in inner.split(",") if p.strip()]
    if not parts:
        raise ValueError("Empty VAR marker.")

    var_id = parts[0].upper()
    m = VAR_ID_RE.match(var_id)
    if not m:
        raise ValueError(f"Invalid VAR id '{var_id}'. Use INT_1 / STR_2 / FLOAT_3 ...")

    var_type = m.group(1).upper()

    constraints: Dict[str, str] = {}
    for p in parts[1:]:
        if "==" not in p:
            raise ValueError(f"Invalid constraint '{p}' in marker for {var_id}. Use KEY==VALUE.")
        k, v = p.split("==", 1)
        constraints[k.strip().upper()] = v.strip()

    min_val: Optional[float] = None
    max_val: Optional[float] = None
    max_len: Optional[int] = None

    if var_type in ("INT", "FLOAT"):
        if "MIN" not in constraints or "MAX" not in constraints:
            raise ValueError(f"{var_id} missing MIN==... and/or MAX==... in marker.")
        try:
            min_val = float(constraints["MIN"])
            max_val = float(constraints["MAX"])
        except ValueError as exc:
            raise ValueError(f"{var_id} has non-numeric MIN/MAX in marker.") from exc
        # float("nan") parses successfully, but every comparison against NaN
        # is False, so a NaN bound would silently disable range validation.
        if math.isnan(min_val) or math.isnan(max_val):
            raise ValueError(f"{var_id} has non-numeric MIN/MAX in marker.")
        if min_val > max_val:
            raise ValueError(f"{var_id} has MIN > MAX in marker.")
    else:
        # STR: the first alias present (in this order) supplies the limit.
        for alias in ("MAXLEN", "MAX_CHARS", "MAXCHARS", "MAX"):
            if alias not in constraints:
                continue
            try:
                max_len = int(constraints[alias])
            except ValueError as exc:
                raise ValueError(f"{var_id} has non-integer {alias} in marker.") from exc
            break
        if max_len is None:
            raise ValueError(f"{var_id} missing string max length (MAXLEN==... or MAX==...) in marker.")
        if max_len < 0:
            raise ValueError(f"{var_id} has negative max length in marker.")

    return (var_id, var_type, min_val, max_val, max_len, actual_key)
|
|
|
|
|
def _extract_marker(key: str) -> Optional[Tuple[str, str, Optional[float], Optional[float], Optional[int], str]]:
    """
    Parse a marker embedded in an INPUT KEY.

    Input keys must carry the real input name after the marker, so the
    empty-key form accepted for titles is rejected here.
    """
    parsed = _extract_marker_text(key, allow_empty_actual_key=False)
    return parsed
|
|
|
|
|
| def _default_target_input_key(inputs: Dict[str, Any]) -> str:
|
| """
|
| For meta-title markers (marker has no trailing 'actual_key'), choose which input key to override.
|
|
|
| Rule:
|
| 1) If "value" exists in inputs -> use it (matches your example and common ComfyUI string/int/float nodes).
|
| 2) Else if inputs has exactly one key -> use it.
|
| 3) Else error (ambiguous).
|
| """
|
| if "value" in inputs:
|
| return "value"
|
| if len(inputs) == 1:
|
| return next(iter(inputs.keys()))
|
| raise ValueError(
|
| "Meta marker found in _meta.title but cannot infer which input to override. "
|
| "Add the marker to the input key instead ({{{...}}}value), or ensure the node has a 'value' input "
|
| "or only a single input."
|
| )
|
|
|
|
|
def _register_var_spec(
    specs: Dict[str, VarSpec],
    var_id: str,
    var_type: str,
    min_val: Optional[float],
    max_val: Optional[float],
    max_len: Optional[int],
    default_value: Any,
) -> VarSpec:
    """Return the spec for ``var_id``, creating it on first sight and rejecting conflicting redefinitions."""
    spec = specs.get(var_id)
    if spec is None:
        spec = VarSpec(
            var_id=var_id,
            var_type=var_type,
            min_val=min_val,
            max_val=max_val,
            max_len=max_len,
            default_value=default_value,
        )
        specs[var_id] = spec
        return spec

    if (
        spec.var_type != var_type
        or spec.min_val != min_val
        or spec.max_val != max_val
        or spec.max_len != max_len
    ):
        raise ValueError(f"Inconsistent constraints for {var_id} across markers.")
    return spec


def _collect_and_strip_markers(prompt: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, VarSpec]]:
    """
    Find every variable marker in ``prompt`` and record where it applies.

    Markers are read from two places:
      A) input keys:
         "{{{VAR==INT_1,...==/VAR}}}seed" is renamed to "seed" in-memory
      B) _meta.title:
         "_meta": {"title": "{{{VAR==STR_3,...==/VAR}}}"} (title is NOT modified)
         binds STR_3 to the key named after the marker, or to an inferred
         input key (usually "value") when nothing follows the marker.

    The first value seen for a variable becomes its default.

    Args:
        prompt: Prompt dict mapping node id -> node info. Each node's
            "inputs" dict is replaced in place by a marker-free copy.

    Returns:
        ``(prompt, specs)`` - the same prompt object and a dict mapping
        variable id -> VarSpec.

    Raises:
        ValueError: on malformed markers, on two keys of one node resolving
            to the same input name, on a meta marker pointing at a missing
            or ambiguous input, or on inconsistent constraints for one
            variable id.
    """
    specs: Dict[str, VarSpec] = {}

    for node_id, node in prompt.items():
        if not isinstance(node, dict):
            continue

        inputs = node.get("inputs")
        if not isinstance(inputs, dict):
            continue

        # A) markers embedded in input keys.
        new_inputs: Dict[str, Any] = {}
        for raw_key, value in inputs.items():
            marker = _extract_marker(raw_key)
            actual_key = raw_key if marker is None else marker[5]

            # Check every key, not only marker keys: otherwise a plain
            # "seed" that follows "{{{...}}}seed" would silently overwrite
            # it and the conflict would depend on key order.
            if actual_key in new_inputs:
                raise ValueError(f"Conflict in node {node_id}: input '{actual_key}' already exists.")
            new_inputs[actual_key] = value

            if marker is None:
                continue

            var_id, var_type, min_val, max_val, max_len, _ = marker
            spec = _register_var_spec(specs, var_id, var_type, min_val, max_val, max_len, value)
            spec.occurrences.append(VarOccurrence(node_id=str(node_id), input_key=actual_key))

        node["inputs"] = new_inputs

        # B) marker stored in the node title.
        meta = node.get("_meta")
        if not isinstance(meta, dict):
            continue

        title = meta.get("title")
        marker2 = _extract_marker_text(title, allow_empty_actual_key=True) if isinstance(title, str) else None
        if marker2 is None:
            continue

        var_id, var_type, min_val, max_val, max_len, actual_key = marker2

        # Text after the marker names the target input; without it the
        # target is inferred from the node's inputs.
        target_key = actual_key.strip() if isinstance(actual_key, str) else ""
        if not target_key:
            target_key = _default_target_input_key(node["inputs"])

        if target_key not in node["inputs"]:
            raise ValueError(
                f"Meta marker in node {node_id} points to input '{target_key}', but that input does not exist."
            )

        default_val = node["inputs"].get(target_key)
        spec = _register_var_spec(specs, var_id, var_type, min_val, max_val, max_len, default_val)

        # The same (node, input) pair may already be bound by a key marker.
        already = any(o.node_id == str(node_id) and o.input_key == target_key for o in spec.occurrences)
        if not already:
            spec.occurrences.append(VarOccurrence(node_id=str(node_id), input_key=target_key))

    return prompt, specs
|
|
|
|
|
|
|
|
|
| def _split_command_entries(command: str) -> List[str]:
|
| s = (command or "").strip()
|
| if not s:
|
| return []
|
|
|
| entries: List[str] = []
|
| buf: List[str] = []
|
| quote: Optional[str] = None
|
| escape = False
|
|
|
| for ch in s:
|
| if escape:
|
| buf.append(ch)
|
| escape = False
|
| continue
|
|
|
| if ch == "\\":
|
| buf.append(ch)
|
| escape = True
|
| continue
|
|
|
| if quote is not None:
|
| buf.append(ch)
|
| if ch == quote:
|
| quote = None
|
| continue
|
|
|
| if ch in ("'", '"'):
|
| buf.append(ch)
|
| quote = ch
|
| continue
|
|
|
| if ch in (" ", "\t", "\n", "\r", ",", ";"):
|
| token = "".join(buf).strip()
|
| if token:
|
| entries.append(token)
|
| buf = []
|
| continue
|
|
|
| buf.append(ch)
|
|
|
| token = "".join(buf).strip()
|
| if token:
|
| entries.append(token)
|
|
|
| return entries
|
|
|
|
|
def _parse_command(command: str) -> Dict[str, Any]:
    """
    Turn a ``VAR==VALUE`` command string into a dict of assignments.

    Variable names are upper-cased and must look like INT_1 / STR_2 /
    FLOAT_3. A value that starts with a quote is decoded as a Python
    literal; any other value is kept as the raw (stripped) text.

    Raises:
        ValueError: for an entry without "==", a bad variable name, a
            missing value, a badly quoted string, or a variable assigned
            more than once.
    """
    parsed: Dict[str, Any] = {}

    for entry in _split_command_entries(command):
        var, separator, raw = entry.partition("==")
        if not separator:
            raise ValueError(f"Bad command entry '{entry}'. Expected VAR==VALUE.")

        var = var.strip().upper()
        raw = raw.strip()

        if VAR_ID_RE.match(var) is None:
            raise ValueError(f"Bad variable name '{var}'. Use INT_1 / STR_2 / FLOAT_3 ...")

        if not raw:
            raise ValueError(f"Missing value for {var}.")

        if raw.startswith(("'", '"')):
            # A quoted value must close with the quote it opened with.
            properly_closed = len(raw) >= 2 and raw[-1] == raw[0]
            if not properly_closed:
                raise ValueError(f"Unterminated quoted string for {var}.")
            try:
                value = ast.literal_eval(raw)
            except Exception as e:
                raise ValueError(f"Invalid quoted string for {var}: {e}")
        else:
            value = raw

        if var in parsed:
            raise ValueError(f"Duplicate assignment for {var}.")
        parsed[var] = value

    return parsed
|
|
|
|
|
def _convert_and_validate(var_id: str, spec: VarSpec, raw_val: Any) -> Any:
    """
    Convert ``raw_val`` to the type declared by ``spec`` and enforce its limits.

    Args:
        var_id: Variable identifier, used only in error messages.
        spec: Constraints for the variable (``var_type``, ``min_val``,
            ``max_val``, ``max_len``).
        raw_val: Candidate value; it is stringified before conversion.

    Returns:
        An ``int`` for INT, a ``float`` for FLOAT, a ``str`` for STR.

    Raises:
        ValueError: if the value cannot be converted, is NaN (FLOAT), lies
            outside ``[min_val, max_val]``, exceeds ``max_len``, or the
            spec has an unsupported type.
    """
    import math  # local import: only needed for the NaN guard below

    if spec.var_type == "INT":
        try:
            val = int(str(raw_val).strip())
        except Exception as exc:
            raise ValueError(f"{var_id} must be an integer.") from exc
        if spec.min_val is not None and val < spec.min_val:
            raise ValueError(f"{var_id} out of range: {val} < MIN {int(spec.min_val)}")
        if spec.max_val is not None and val > spec.max_val:
            raise ValueError(f"{var_id} out of range: {val} > MAX {int(spec.max_val)}")
        return val

    if spec.var_type == "FLOAT":
        try:
            val = float(str(raw_val).strip())
        except Exception as exc:
            raise ValueError(f"{var_id} must be a float.") from exc
        # NaN compares False against every bound, so without this guard it
        # would pass both range checks below and be accepted.
        if math.isnan(val):
            raise ValueError(f"{var_id} must be a number, not NaN.")
        if spec.min_val is not None and val < spec.min_val:
            raise ValueError(f"{var_id} out of range: {val} < MIN {spec.min_val}")
        if spec.max_val is not None and val > spec.max_val:
            raise ValueError(f"{var_id} out of range: {val} > MAX {spec.max_val}")
        return val

    if spec.var_type == "STR":
        val = str(raw_val)
        if spec.max_len is not None and len(val) > spec.max_len:
            raise ValueError(f"{var_id} too long: length {len(val)} > MAXLEN {spec.max_len}")
        return val

    raise ValueError(f"Unsupported var type for {var_id}: {spec.var_type}")
|
|
|
|
|
def _apply_assignments(prompt: Dict[str, Any], specs: Dict[str, VarSpec], assigns: Dict[str, Any]) -> None:
    """
    Write the final value of every known variable into ``prompt``.

    Each variable takes its value from ``assigns`` when present, otherwise
    from the default captured in its spec. The value is converted and
    validated, then stored at every recorded occurrence. ``prompt`` is
    modified in place.

    Raises:
        ValueError: if ``assigns`` names a variable with no marker in the
            workflow, or a value fails conversion/validation.
    """
    # Reject overrides for variables the workflow never declared.
    for name in assigns:
        if name in specs:
            continue
        raise ValueError(f"Command references {name} but no matching marker exists in the workflow JSON.")

    for var_id, spec in specs.items():
        if var_id in assigns:
            candidate = assigns[var_id]
        else:
            candidate = spec.default_value
        final_value = _convert_and_validate(var_id, spec, candidate)

        for target in spec.occurrences:
            node_inputs = prompt[target.node_id]["inputs"]
            node_inputs[target.input_key] = final_value
|
|
|
|
|
def _infer_outputs_to_execute(prompt: Dict[str, Any]) -> List[str]:
    """
    List the ids of all nodes in ``prompt`` whose class is an output node.

    A node counts when its class (looked up by ``class_type`` in
    ``nodes.NODE_CLASS_MAPPINGS``) has ``OUTPUT_NODE`` set to exactly
    ``True``. Entries that are not dicts or have no ``class_type`` are
    skipped.

    Raises:
        ValueError: if a ``class_type`` is not registered, or if no output
            node is found at all.
    """
    output_ids: List[str] = []

    for node_id, node in prompt.items():
        if isinstance(node, dict) and "class_type" in node:
            class_type = node["class_type"]
            node_cls = nodes.NODE_CLASS_MAPPINGS.get(class_type)
            if node_cls is None:
                raise ValueError(f"Unknown node class_type '{class_type}' (node {node_id}).")
            if getattr(node_cls, "OUTPUT_NODE", False) is True:
                output_ids.append(str(node_id))

    if output_ids:
        return output_ids
    raise ValueError("Loaded workflow has no OUTPUT_NODE nodes (e.g. SaveImage/Preview/etc).")
|
|
|
|
|
def _queue_prompt(prompt: Dict[str, Any], outputs_to_execute: List[str]) -> Tuple[str, float]:
    """
    Put ``prompt`` on the running PromptServer's queue.

    The queue number comes from the server's ``number`` counter (which is
    then incremented) when the server has one; otherwise the current time
    in milliseconds is used.

    Returns:
        ``(prompt_id, number)`` - a freshly generated UUID string and the
        queue number used for the entry.
    """
    server = PromptServer.instance
    prompt_id = str(uuid.uuid4())

    if not hasattr(server, "number"):
        number = float(time.time() * 1000.0)
    else:
        number = float(server.number)
        server.number += 1

    extra_data: Dict[str, Any] = {"create_time": int(time.time() * 1000)}
    sensitive: Dict[str, Any] = {}

    queue_item = (number, prompt_id, prompt, extra_data, outputs_to_execute, sensitive)
    server.prompt_queue.put(queue_item)
    return prompt_id, number
|
|
|
|
|
| def _float_is_no_input(v: Any) -> bool:
|
| """
|
| Special float sentinel rule:
|
| If v is in [-2.1, -1.9], treat it as "not provided".
|
| """
|
| try:
|
| f = float(v)
|
| except Exception:
|
| return False
|
| return -2.1 <= f <= -1.9
|
|
|
|
|
class JSONRUNNER_X:
    """
    ComfyUI node that queues a workflow stored at a fixed JSON path.

    On every run the workflow is loaded, {{{VAR==...==/VAR}}} markers are
    collected from input keys and from _meta.title, the node's typed inputs
    are applied as overrides, and the result is queued like a normal
    /prompt request.

    An input left at its sentinel value is treated as "not provided":
      STR_*   == ""             -> ignored
      INT_*   == -1             -> ignored
      FLOAT_* in [-2.1, -1.9]   -> ignored
    """

    OUTPUT_NODE = True
    CATEGORY = "utils/workflow"

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("status",)
    FUNCTION = "run"

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node's inputs: STR_1..7, INT_1..5 and FLOAT_1..5, in that order."""
        JS_SAFE_INT_MAX = 9007199254740991

        required = {}
        for idx in range(1, 8):
            required[f"STR_{idx}"] = ("STRING", {"multiline": False, "default": ""})
        for idx in range(1, 6):
            required[f"INT_{idx}"] = ("INT", {"default": -1, "min": -1, "max": JS_SAFE_INT_MAX})
        for idx in range(1, 6):
            required[f"FLOAT_{idx}"] = (
                "FLOAT",
                {"default": -2.0, "min": -1.0e9, "max": 1.0e9, "step": 0.01},
            )

        return {"required": required}

    @classmethod
    def IS_CHANGED(cls, *args, **kwargs):
        """Return a fresh random token on every call, so no two calls report the same value."""
        return uuid.uuid4().hex

    def run(
        self,
        STR_1: str = "",
        STR_2: str = "",
        STR_3: str = "",
        STR_4: str = "",
        STR_5: str = "",
        STR_6: str = "",
        STR_7: str = "",
        INT_1: int = -1,
        INT_2: int = -1,
        INT_3: int = -1,
        INT_4: int = -1,
        INT_5: int = -1,
        FLOAT_1: float = -2.0,
        FLOAT_2: float = -2.0,
        FLOAT_3: float = -2.0,
        FLOAT_4: float = -2.0,
        FLOAT_5: float = -2.0,
    ):
        """
        Load the workflow, apply the non-sentinel inputs and queue it.

        Returns a one-element tuple holding a status string: a "Queued ..."
        message on success, or "ERROR: <message>" when any step raised.
        """
        try:
            prompt = _load_prompt_json_fixed()
            prompt, specs = _collect_and_strip_markers(prompt)

            # One row per variable family: id prefix, supplied values, and
            # the test that recognises the "not provided" sentinel.
            families = (
                (
                    "STR",
                    (STR_1, STR_2, STR_3, STR_4, STR_5, STR_6, STR_7),
                    lambda v: isinstance(v, str) and v == "",
                ),
                (
                    "INT",
                    (INT_1, INT_2, INT_3, INT_4, INT_5),
                    lambda v: v == -1,
                ),
                (
                    "FLOAT",
                    (FLOAT_1, FLOAT_2, FLOAT_3, FLOAT_4, FLOAT_5),
                    _float_is_no_input,
                ),
            )

            assigns: Dict[str, Any] = {}
            for prefix, values, is_unset in families:
                for idx, value in enumerate(values, start=1):
                    if not is_unset(value):
                        assigns[f"{prefix}_{idx}"] = value

            _apply_assignments(prompt, specs, assigns)

            outputs_to_execute = _infer_outputs_to_execute(prompt)
            prompt_id, number = _queue_prompt(prompt, outputs_to_execute)

            status = f"Queued workflow as prompt_id={prompt_id} (number={number})"
        except Exception as e:
            # Failures are reported through the status output, not raised.
            status = f"ERROR: {e}"

        return (status,)
|
|
|
|
|
# Node registration table: node type name -> implementing class.
NODE_CLASS_MAPPINGS = {
    "JSONRUNNER_X": JSONRUNNER_X,
}

# Node type name -> display label (presumably what the ComfyUI editor shows;
# confirm against the ComfyUI custom-node docs).
NODE_DISPLAY_NAME_MAPPINGS = {
    "JSONRUNNER_X": "JSONRUNNER_X",
}
|
|
|