| import requests |
| import json |
| import time |
| import argparse |
| import os |
| import re |
|
|
| def check_comfyui_started(url): |
| """ |
| Checks if ComfyUI is running at the given URL. |
| """ |
| try: |
| response = requests.get(url) |
| return response.status_code == 200 |
| except requests.ConnectionError: |
| return False |
|
|
| def load_and_update_prompt(file_path, width, height, length, batch_size, shift_amount, guidance_amount, steps_amount, denoise_amount, frame_rate, prompt): |
| """ |
| Loads the prompt payload from a JSON file and updates the specified fields. |
| |
| Args: |
| file_path (str): Path to the prompt-payload.json file. |
| width (int): Width to set in the prompt. |
| height (int): Height to set in the prompt. |
| length (int): Length (frames) to set in the prompt. |
| batch_size (int): Batch size to set in the prompt. |
| shift_amount (float): Shift amount to set in the prompt. |
| guidance_amount (float): Guidance amount to set in the prompt. |
| steps_amount (int): Steps amount to set in the prompt. |
| denoise_amount (float): Denoise amount to set in the prompt. |
| frame_rate (int): Frame rate to set in the prompt. |
| prompt (str): Prompt text to set in the prompt. |
| |
| Returns: |
| dict: Updated prompt payload as a dictionary. |
| """ |
| try: |
| with open(file_path, 'r', encoding='utf-8') as f: |
| payload = json.load(f) |
| except Exception as e: |
| print(f"Error loading JSON file: {e}") |
| return None |
|
|
| |
| payload['prompt']['232']['inputs']['width'] = width |
| payload['prompt']['232']['inputs']['height'] = height |
|
|
| |
| payload['prompt']['295']['inputs']['int'] = length |
|
|
| |
| payload['prompt']['232']['inputs']['batch_size'] = batch_size |
|
|
| |
| payload['prompt']['67']['inputs']['shift'] = shift_amount |
|
|
| |
| payload['prompt']['26']['inputs']['guidance'] = guidance_amount |
|
|
| |
| payload['prompt']['17']['inputs']['steps'] = steps_amount |
|
|
| |
| payload['prompt']['17']['inputs']['denoise'] = denoise_amount |
|
|
| |
| payload['prompt']['82']['inputs']['frame_rate'] = frame_rate |
|
|
| |
| payload['prompt']['44']['inputs']['text'] = prompt |
|
|
| return payload |
|
|
| def post_prompt(url, payload): |
| """ |
| Posts the prompt payload to the ComfyUI API. |
| |
| Args: |
| url (str): ComfyUI API endpoint URL. |
| payload (dict): Prompt payload as a dictionary. |
| |
| Returns: |
| str: Prompt ID if successful, None otherwise. |
| """ |
| try: |
| response = requests.post(url, json=payload) |
| response.raise_for_status() |
| return response.json().get('prompt_id') |
| except requests.exceptions.RequestException as e: |
| print(f"Error posting prompt: {e}") |
| return None |
|
|
| def get_queue_progress(url): |
| """ |
| Retrieves the queue progress from the ComfyUI API. |
| |
| Args: |
| url (str): ComfyUI API queue endpoint URL. |
| |
| Returns: |
| tuple: (queue_pending, queue_running) where queue_pending and queue_running are lists, |
| or (None, None) if an error occurred. |
| """ |
| try: |
| response = requests.get(url) |
| response.raise_for_status() |
| queue_data = response.json() |
| queue_pending = queue_data.get('queue_pending', []) |
| queue_running = queue_data.get('queue_running', []) |
| return queue_pending, queue_running |
| except requests.exceptions.RequestException as e: |
| print(f"Error getting queue progress: {e}") |
| return None, None |
|
|
| def get_history(url, prompt_id): |
| """ |
| Retrieves the history from the ComfyUI API. |
| |
| Args: |
| url (str): ComfyUI API history endpoint URL. |
| prompt_id (str): Prompt ID to look for in the history. |
| |
| Returns: |
| tuple: (status, video_path) where status is a string ("success" or "failure") and video_path is the path to the video, |
| or (None, None) if an error occurred. |
| """ |
| try: |
| response = requests.get(url) |
| response.raise_for_status() |
| history_data = response.json() |
|
|
| if history_data: |
| |
| for history_item_id, history_item in history_data.items(): |
| if history_item_id == prompt_id: |
| status = history_item['status']['status_str'] |
| completed = history_item['status']['completed'] |
|
|
| if status == "success" and completed: |
| try: |
| for output in history_item['outputs'].values(): |
| for gif in output.get('gifs', []): |
| video_path = gif['fullpath'] |
| return "success", video_path |
| except (KeyError, TypeError): |
| print("Video path not found in history data.") |
| return "failure", None |
| else: |
| print("Video generation failed according to history data.") |
| return "failure", None |
| print("Prompt ID not found in history data.") |
| return "failure", None |
| else: |
| print("No history data found.") |
| return "failure", None |
|
|
| except requests.exceptions.RequestException as e: |
| print(f"Error getting history: {e}") |
| return None, None |
|
|
| def extract_video_number(filename): |
| """ |
| Extracts the video number from the filename using a regular expression. |
| """ |
| match = re.search(r'_(\d+)\.mp4', filename) |
| if match: |
| return match.group(1) |
| return None |
|
|
| def main(): |
| parser = argparse.ArgumentParser(description="Run ComfyUI with specified parameters.") |
| parser.add_argument("--width", type=int, default=320, help="Width of the generated image.") |
| parser.add_argument("--height", type=int, default=480, help="Height of the generated image.") |
| parser.add_argument("--length", type=int, default=45, help="Length (number of frames) of the video.") |
| parser.add_argument("--batchSize", type=int, default=1, help="Batch size for video generation.") |
| parser.add_argument("--shiftAmount", type=float, default=9.0, help="Shift amount for ModelSamplingSD3.") |
| parser.add_argument("--guidanceAmount", type=float, default=7.5, help="Guidance amount for FluxGuidance.") |
| parser.add_argument("--stepsAmount", type=int, default=12, help="Steps amount for BasicScheduler.") |
| parser.add_argument("--denoiseAmount", type=float, default=1.0, help="Denoise amount for BasicScheduler.") |
| parser.add_argument("--frameRate", type=int, default=24, help="Frame rate for video combine.") |
| parser.add_argument("--prompt", type=str, required=True, help="Text prompt for video generation.") |
| parser.add_argument("--upscale", type=bool, default=False, help="Whether to use upscale or not.") |
| parser.add_argument("--comfyui_url", type=str, default="http://localhost:8188", help="ComfyUI base URL.") |
|
|
| args = parser.parse_args() |
|
|
| API_URL = f"{args.comfyui_url}/api/prompt" |
| QUEUE_URL = f"{args.comfyui_url}/api/queue" |
| HISTORY_URL = f"{args.comfyui_url}/api/history?max_items=1" |
|
|
| if args.upscale: |
| prompt_file = "prompt-payload.json" |
| filename_prefix = "Hunyuan_upscaled" |
| else: |
| prompt_file = "prompt-payload-no-upscale.json" |
| filename_prefix = "Hunyuan_raw" |
|
|
| |
| print("Checking if ComfyUI is running...") |
| while not check_comfyui_started(args.comfyui_url): |
| print("ComfyUI not yet running. Waiting...") |
| time.sleep(5) |
| print("ComfyUI is running.") |
|
|
| |
| print("Loading and updating prompt payload...") |
| prompt_file_path = os.path.join(os.path.dirname(__file__), prompt_file) |
| try: |
| prompt_payload = load_and_update_prompt(prompt_file_path, args.width, args.height, args.length, args.batchSize, args.shiftAmount, args.guidanceAmount, args.stepsAmount, args.denoiseAmount, args.frameRate, args.prompt) |
| except Exception as e: |
| print(f"Error loading and updating prompt: {e}") |
| return |
|
|
| |
| print("Posting prompt to ComfyUI...") |
| prompt_id = post_prompt(API_URL, prompt_payload) |
|
|
| if prompt_id: |
| print(f"Prompt submitted successfully with ID: {prompt_id}") |
|
|
| |
| print("Monitoring queue...") |
| while True: |
| queue_pending, queue_running = get_queue_progress(QUEUE_URL) |
| if queue_pending is None or queue_running is None: |
| print("Failed to get queue progress. Aborting.") |
| break |
|
|
| if not queue_pending and not queue_running: |
| print("Queue is empty.") |
| break |
| else: |
| print(f"Queue pending: {len(queue_pending)}, running: {len(queue_running)}. Waiting...") |
| time.sleep(5) |
|
|
| |
| print("Getting history...") |
| status, video_path = get_history(HISTORY_URL, prompt_id) |
|
|
| if status == "success": |
| print(f"Video generation complete! Video path: {video_path}") |
| video_number = extract_video_number(video_path) |
| if video_number: |
| output_filename = f"{filename_prefix}_{video_number}.mp4" |
| else: |
| output_filename = f"{filename_prefix}_latest.mp4" |
| print(output_filename) |
| else: |
| print("Video generation failed.") |
| else: |
| print("Failed to submit prompt.") |
|
|
| if __name__ == "__main__": |
| main() |