| import os |
| import pandas as pd |
| import torch |
| import numpy as np |
| import random |
|
|
| from diffusers import StableDiffusionPipeline |
| from diffusers.utils import export_to_video |
|
|
| |
| os.environ["CUDA_VISIBLE_DEVICES"] = "0" |
|
|
| def set_seed(seed: int = 42): |
| """ |
| Set random seed for reproducibility |
| """ |
| random.seed(seed) |
| np.random.seed(seed) |
| torch.manual_seed(seed) |
| torch.cuda.manual_seed(seed) |
| torch.cuda.manual_seed_all(seed) |
| torch.backends.cudnn.deterministic = True |
| torch.backends.cudnn.benchmark = False |
|
|
| |
| set_seed(42) |
|
|
| def generate_image(pipeline, prompt: str, output_path: str): |
| """ |
| Generate an image using the Stable Diffusion model and save it |
| """ |
| with torch.autocast("cuda"): |
| image = pipeline(prompt).images[0] |
| image.save(output_path) |
|
|
| import torch |
| from diffusers.utils import export_to_video |
|
|
| def generate_video(pipeline, pipeline_type: str, prompt: str, output_path: str, **kwargs): |
| """ |
| Generate a video using different video generation pipelines and save as mp4 or gif |
| |
| Parameters: |
| pipeline: Loaded video generation pipeline |
| pipeline_type: Type of video model, options are "cogvideo", "ltx", "hunyuan", "animatediff" |
| prompt: Text description |
| output_path: Output video path (animatediff defaults to gif, others to mp4) |
| kwargs: Hyperparameter settings, e.g., width, height, num_frames, num_inference_steps, fps, guidance_scale, etc. |
| """ |
| if pipeline_type == "cogvideo": |
| |
| video = pipeline( |
| prompt=prompt, |
| num_videos_per_prompt=kwargs.get("num_videos_per_prompt", 1), |
| num_inference_steps=kwargs.get("num_inference_steps", 50), |
| num_frames=kwargs.get("num_frames", 49), |
| guidance_scale=kwargs.get("guidance_scale", 6), |
| generator=kwargs.get("generator", torch.Generator(device="cuda").manual_seed(42)) |
| ).frames[0] |
| export_to_video(video, output_path, fps=kwargs.get("fps", 8)) |
| elif pipeline_type == "ltx": |
| |
| video = pipeline( |
| prompt=prompt, |
| negative_prompt=kwargs.get("negative_prompt", "worst quality, inconsistent motion, blurry, jittery, distorted"), |
| width=kwargs.get("width", 704), |
| height=kwargs.get("height", 480), |
| num_frames=kwargs.get("num_frames", 161), |
| num_inference_steps=kwargs.get("num_inference_steps", 50), |
| ).frames[0] |
| export_to_video(video, output_path, fps=kwargs.get("fps", 15)) |
| elif pipeline_type == "hunyuan": |
| |
| video = pipeline( |
| prompt=prompt, |
| width=kwargs.get("width", 512), |
| height=kwargs.get("height", 320), |
| num_frames=kwargs.get("num_frames", 61), |
| num_inference_steps=kwargs.get("num_inference_steps", 30), |
| ).frames[0] |
| export_to_video(video, output_path, fps=kwargs.get("fps", 15)) |
| elif pipeline_type == "animatediff": |
| |
| video = pipeline( |
| prompt=prompt, |
| guidance_scale=kwargs.get("guidance_scale", 1.0), |
| num_inference_steps=kwargs.get("num_inference_steps", 4) |
| ).frames[0] |
| export_to_video(video, output_path) |
| else: |
| raise ValueError(f"Unknown pipeline type: {pipeline_type}") |
|
|
| def load_video_pipeline(pipeline_type: str): |
| """ |
| Load the corresponding video generation model based on pipeline_type |
| |
| Parameters: |
| pipeline_type: Options are "cogvideo", "ltx", "hunyuan", "animatediff" |
| Returns: |
| Loaded and initialized video generation pipeline |
| """ |
| if pipeline_type == "cogvideo": |
| from diffusers import CogVideoXPipeline |
| print("Loading video generation model (CogVideoX-5b)...") |
| pipe = CogVideoXPipeline.from_pretrained( |
| "THUDM/CogVideoX-5b", |
| torch_dtype=torch.bfloat16 |
| ) |
| pipe.vae.enable_slicing() |
| pipe.vae.enable_tiling() |
| pipe.to("cuda") |
| return pipe |
| elif pipeline_type == "ltx": |
| from diffusers import LTXPipeline |
| print("Loading video generation model (LTX-Video)...") |
| pipe = LTXPipeline.from_pretrained( |
| "Lightricks/LTX-Video", |
| torch_dtype=torch.bfloat16 |
| ) |
| pipe.to("cuda") |
| return pipe |
| elif pipeline_type == "hunyuan": |
| from diffusers import BitsAndBytesConfig, HunyuanVideoTransformer3DModel, HunyuanVideoPipeline |
| from diffusers.hooks import apply_layerwise_casting |
| from transformers import LlamaModel |
| print("Loading video generation model (HunyuanVideo)...") |
| model_id = "hunyuanvideo-community/HunyuanVideo" |
| quantization_config = BitsAndBytesConfig( |
| load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16 |
| ) |
| text_encoder = LlamaModel.from_pretrained(model_id, subfolder="text_encoder", torch_dtype=torch.float16) |
| apply_layerwise_casting(text_encoder, storage_dtype=torch.float8_e4m3fn, compute_dtype=torch.float16) |
| transformer = HunyuanVideoTransformer3DModel.from_pretrained( |
| model_id, |
| subfolder="transformer", |
| quantization_config=quantization_config, |
| torch_dtype=torch.bfloat16, |
| ) |
| pipe = HunyuanVideoPipeline.from_pretrained( |
| model_id, transformer=transformer, text_encoder=text_encoder, torch_dtype=torch.float16 |
| ) |
| pipe.vae.enable_tiling() |
| pipe.enable_model_cpu_offload() |
| return pipe |
| elif pipeline_type == "animatediff": |
| from diffusers import AnimateDiffPipeline, MotionAdapter, EulerDiscreteScheduler |
| from huggingface_hub import hf_hub_download |
| from safetensors.torch import load_file |
| print("Loading video generation model (AnimateDiff-Lightning)...") |
| device = "cuda" |
| dtype = torch.float16 |
| step = 4 |
| repo = "ByteDance/AnimateDiff-Lightning" |
| ckpt = f"animatediff_lightning_{step}step_diffusers.safetensors" |
| base = "emilianJR/epiCRealism" |
| adapter = MotionAdapter().to(device, dtype) |
| |
| adapter.load_state_dict(load_file(hf_hub_download(repo, ckpt), device=device)) |
| pipe = AnimateDiffPipeline.from_pretrained(base, motion_adapter=adapter, torch_dtype=dtype).to(device) |
| pipe.scheduler = EulerDiscreteScheduler.from_config( |
| pipe.scheduler.config, timestep_spacing="trailing", beta_schedule="linear" |
| ) |
| return pipe |
| else: |
| raise ValueError(f"Unknown pipeline type: {pipeline_type}") |
|
|
| def main(): |
| |
| |
| print("Loading image generation model (Stable Diffusion)...") |
| pipe_image = StableDiffusionPipeline.from_pretrained( |
| "runwayml/stable-diffusion-v1-5", |
| torch_dtype=torch.float16 |
| ) |
| pipe_image.to("cuda") |
| |
| |
|
|
| |
| video_pipeline_type = "ltx" |
|
|
| |
| tasks1 = [ |
| { |
| "csv_file": "output_prompt_rag_more/prompt_ai_concrete_rag_10_testset.csv", |
| "image_dir": "output_ai_covers_concrete_rag_10_testset", |
| "video_dir": "output_ai_videos_concrete_rag_10_testset_ltx" |
| }, |
| { |
| "csv_file": "output_prompt_rag_more/prompt_ai_abstract_rag_10_testset.csv", |
| "image_dir": "output_ai_covers_abstract_rag_10_testset", |
| "video_dir": "output_ai_videos_abstract_rag_10_testset_ltx" |
| } |
|
|
| ] |
|
|
| |
| |
| |
| |
| |
| tasks = tasks1 |
| pipe_video = load_video_pipeline(video_pipeline_type) |
|
|
| |
| for task in tasks: |
| csv_file = task["csv_file"] |
| image_dir = task["image_dir"] |
| video_dir = task["video_dir"] |
| os.makedirs(image_dir, exist_ok=True) |
| print(f"Ensuring directory exists: {image_dir}") |
| os.makedirs(video_dir, exist_ok=True) |
| print(f"Ensuring directory exists: {video_dir}") |
|
|
| if not os.path.exists(csv_file): |
| print(f"Error: CSV file {csv_file} not found, please check the path.") |
| continue |
|
|
| df = pd.read_csv(csv_file) |
| for idx, row in df.iterrows(): |
| user_prompt = str(row["user prompt"]) |
| title = str(row["title"]) |
| cover_prompt = str(row["cover prompt"]) |
| video_prompt = str(row["video prompt"]) |
|
|
| |
| image_filename = os.path.join(image_dir, f"{user_prompt}.png") |
| video_filename = os.path.join(video_dir, f"{user_prompt}.mp4") |
|
|
| print("-" * 50) |
| print(f"[CSV: {csv_file}] - [{idx}] Starting generation: {user_prompt}") |
| print(f"Title: {title}") |
| print(f"Cover Prompt: {cover_prompt}") |
| print(f"Video Prompt: {video_prompt}") |
|
|
| if os.path.exists(image_filename) and os.path.exists(video_filename): |
| print(f"File already exists, skipping generation: {video_filename}") |
| continue |
|
|
| |
| try: |
| generate_image(pipe_image, cover_prompt, image_filename) |
| print(f"Image saved to {image_filename}") |
| except Exception as e: |
| print(f"Image generation failed: {e}") |
|
|
| |
| try: |
| generate_video( |
| pipe_video, |
| pipeline_type=video_pipeline_type, |
| prompt=video_prompt, |
| output_path=video_filename |
| |
| |
| ) |
| print(f"Video saved to {video_filename}") |
| except Exception as e: |
| print(f"Video generation failed: {e}") |
|
|
| print("All generation tasks completed!") |
|
|
| if __name__ == "__main__": |
| main() |
|
|