cccode / train_avwm_stage1.py
WayneW's picture
Upload folder using huggingface_hub
705a8fd verified
raw
history blame
19.8 kB
from inference_avwm import model_forward_wrapper_v
import torch
# the first flag below was False when we tested this script but True makes A100 training a lot faster:
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
import matplotlib
matplotlib.use('Agg')
from collections import OrderedDict
from copy import deepcopy
from time import time
import argparse
import logging
import os
import matplotlib.pyplot as plt
import yaml
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, ConcatDataset
from torch.utils.data.distributed import DistributedSampler
from diffusers.models import AutoencoderKL
from distributed import init_distributed
from models import AVCDiT_models
from diffusion import create_diffusion
from datasets import TrainingDataset
from misc import transform
#################################################################################
# Training Helper Functions #
#################################################################################
def load_checkpoint_if_available(model, ema, opt, scaler, config, device, logger, args):
start_epoch = 0
train_steps = 0
latest_path = os.path.join(config['results_dir'], config['run_name'], "checkpoints", "latest.pth.tar")
if os.path.isfile(latest_path) or config.get('from_checkpoint', 0):
latest_path = latest_path if os.path.isfile(latest_path) else config.get('from_checkpoint', 0)
print("Loading model from ", latest_path)
checkpoint = torch.load(latest_path, map_location=f"cuda:{device}", weights_only=False)
ema_ckp = {k.replace('_orig_mod.', ''): v for k, v in checkpoint["ema"].items()}
remapped = {}
for k, v in ema_ckp.items():
new_k = k
# 1) pos_embed -> pos_embed_v
if k.startswith("pos_embed"):
new_k = k.replace("pos_embed", "pos_embed_v", 1)
# 2) x_embedder. -> x_embedder_v.
if new_k.startswith("x_embedder."):
new_k = new_k.replace("x_embedder.", "x_embedder_v.", 1)
# 3) blocks.*.mlp.*: .mlp. -> .mlp_v.
if new_k.startswith("blocks.") and ".mlp." in new_k:
new_k = new_k.replace(".mlp.", ".mlp_v.", 1)
remapped[new_k] = v
ema_ckp = remapped
model.load_state_dict(ema_ckp, strict=True)
print("Model weights loaded.")
ema.load_state_dict(ema_ckp, strict=True)
print("EMA weights loaded.")
if args.restart_from_checkpoint:
logger.info("Restarting training: epoch and step counters set to 0.")
else:
if "opt" in checkpoint:
opt_ckp = {k.replace('_orig_mod.', ''): v for k, v in checkpoint["opt"].items()}
opt.load_state_dict(opt_ckp)
print("Optimizer state loaded.")
if "scaler" in checkpoint and scaler is not None:
scaler.load_state_dict(checkpoint["scaler"])
print("GradScaler state loaded.")
if "epoch" in checkpoint:
start_epoch = checkpoint["epoch"] + 1
if "train_steps" in checkpoint:
train_steps = checkpoint["train_steps"]
logger.info(f"Resuming from epoch {start_epoch}, step {train_steps}")
return start_epoch, train_steps
@torch.no_grad()
def update_ema(ema_model, model, decay=0.9999):
"""
Step the EMA model towards the current model.
"""
ema_params = OrderedDict(ema_model.named_parameters())
model_params = OrderedDict(model.named_parameters())
for name, param in model_params.items():
name = name.replace('_orig_mod.', '')
ema_params[name].mul_(decay).add_(param.data, alpha=1 - decay)
def requires_grad(model, flag=True):
"""
Set requires_grad flag for all parameters in a model.
"""
for p in model.parameters():
p.requires_grad = flag
def cleanup():
"""
End DDP training.
"""
dist.destroy_process_group()
def create_logger(logging_dir):
"""
Create a logger that writes to a log file and stdout.
"""
if dist.get_rank() == 0: # real logger
logging.basicConfig(
level=logging.INFO,
format='[\033[34m%(asctime)s\033[0m] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[logging.StreamHandler(), logging.FileHandler(f"{logging_dir}/log.txt")]
)
logger = logging.getLogger(__name__)
else: # dummy logger (does nothing)
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
return logger
#################################################################################
# Training Loop #
#################################################################################
def main(args):
"""
Trains a new AVCDiT model.
"""
assert torch.cuda.is_available(), "Training currently requires at least one GPU."
# Setup DDP:
_, rank, device, _ = init_distributed()
# rank = dist.get_rank()
seed = args.global_seed * dist.get_world_size() + rank
torch.manual_seed(seed)
print(f"Starting rank={rank}, seed={seed}, world_size={dist.get_world_size()}.")
with open("config/eval_config.yaml", "r") as f:
default_config = yaml.safe_load(f)
config = default_config
with open(args.config, "r") as f:
user_config = yaml.safe_load(f)
config.update(user_config)
# Setup an experiment folder:
os.makedirs(config['results_dir'], exist_ok=True) # Make results folder (holds all experiment subfolders)
experiment_dir = f"{config['results_dir']}/{config['run_name']}" # Create an experiment folder
checkpoint_dir = f"{experiment_dir}/checkpoints" # Stores saved model checkpoints
if rank == 0:
os.makedirs(checkpoint_dir, exist_ok=True)
logger = create_logger(experiment_dir)
logger.info(f"Experiment directory created at {experiment_dir}")
else:
logger = create_logger(None)
# Create model:
tokenizer = AutoencoderKL.from_pretrained(f"stabilityai/sd-vae-ft-ema").to(device)
latent_size = config['image_size'] // 8
assert config['image_size'] % 8 == 0, "Image size must be divisible by 8 (for the VAE encoder)."
num_cond = config['context_size']
model = AVCDiT_models[config['model']](context_size=num_cond, input_size=latent_size, in_channels=4, mode="v").to(device)
ema = deepcopy(model).to(device) # Create an EMA of the model for use after training
requires_grad(ema, False)
# Setup optimizer (we used default Adam betas=(0.9, 0.999) and a constant learning rate of 1e-4 in our paper):
lr = float(config.get('lr', 1e-4))
opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0)
bfloat_enable = bool(hasattr(args, 'bfloat16') and args.bfloat16)
if bfloat_enable:
scaler = torch.amp.GradScaler()
# load existing checkpoint
# latest_path = os.path.join(checkpoint_dir, "latest.pth.tar")
# === Load checkpoint or start from a pretrained one ===
start_epoch, train_steps = load_checkpoint_if_available(
model, ema, opt, scaler if bfloat_enable else None, config, device, logger, args
)
# ~40% speedup but might leads to worse performance depending on pytorch version
if args.torch_compile:
model = torch.compile(model)
model = DDP(model, device_ids=[device])
diffusion = create_diffusion(timestep_respacing="") # default: 1000 steps, linear noise schedule
# ,predict_xstart=True
logger.info(f"AVCDiT Parameters: {sum(p.numel() for p in model.parameters()):,}")
train_dataset = []
test_dataset = []
for dataset_name in config["datasets"]:
data_config = config["datasets"][dataset_name]
for data_split_type in ["train", "test"]:
if data_split_type in data_config:
goals_per_obs = int(data_config["goals_per_obs"])
if data_split_type == 'test':
goals_per_obs = 4 # standardize testing
if "distance" in data_config:
min_dist_cat=data_config["distance"]["min_dist_cat"]
max_dist_cat=data_config["distance"]["max_dist_cat"]
else:
min_dist_cat=config["distance"]["min_dist_cat"]
max_dist_cat=config["distance"]["max_dist_cat"]
if "len_traj_pred" in data_config:
len_traj_pred=data_config["len_traj_pred"]
else:
len_traj_pred=config["len_traj_pred"]
dataset = TrainingDataset(
data_folder=data_config["data_folder"],
data_split_folder=data_config[data_split_type],
dataset_name=dataset_name,
image_size=config["image_size"],
min_dist_cat=min_dist_cat,
max_dist_cat=max_dist_cat,
len_traj_pred=len_traj_pred,
context_size=config["context_size"],
normalize=config["normalize"],
goals_per_obs=goals_per_obs,
transform=transform,
predefined_index=None,
traj_stride=1,
evaluate=(data_split_type=="test")
)
if data_split_type == "train":
train_dataset.append(dataset)
else:
test_dataset.append(dataset)
print(f"Dataset: {dataset_name} ({data_split_type}), size: {len(dataset)}")
# combine all the datasets from different robots
print(f"Combining {len(train_dataset)} datasets.")
train_dataset = ConcatDataset(train_dataset)
test_dataset = ConcatDataset(test_dataset)
sampler = DistributedSampler(
train_dataset,
num_replicas=dist.get_world_size(),
rank=rank,
shuffle=True,
seed=args.global_seed
)
loader = DataLoader(
train_dataset,
batch_size=config['batch_size'],
shuffle=False,
sampler=sampler,
num_workers=config['num_workers'],
pin_memory=True,
drop_last=True,
persistent_workers=True
)
logger.info(f"Dataset contains {len(train_dataset):,} images")
# Prepare models for training:
model.train() # important! This enables embedding dropout for classifier-free guidance
ema.eval() # EMA model should always be in eval mode
# Variables for monitoring/logging purposes:
log_steps = 0
running_loss = 0
start_time = time()
logger.info(f"Training for {args.epochs} epochs...")
for epoch in range(start_epoch, args.epochs):
sampler.set_epoch(epoch)
steps_per_epoch = len(loader)
if rank == 0:
logger.info(f"Epoch {epoch} contains {steps_per_epoch} steps.")
logger.info(f"Beginning epoch {epoch}...")
for x, _, y, _, rel_t in loader:
x = x.to(device, non_blocking=True)
y = y.to(device, non_blocking=True)
rel_t = rel_t.to(device, non_blocking=True)
with torch.amp.autocast('cuda', enabled=bfloat_enable, dtype=torch.bfloat16):
with torch.no_grad():
# Map input images to latent space + normalize latents:
B, T = x.shape[:2]
x = x.flatten(0,1)
x = tokenizer.encode(x).latent_dist.sample().mul_(0.18215)
x = x.unflatten(0, (B, T))
num_goals = T - num_cond
x_start = x[:, num_cond:].flatten(0, 1)
x_cond = x[:, :num_cond].unsqueeze(1).expand(B, num_goals, num_cond, x.shape[2], x.shape[3], x.shape[4]).flatten(0, 1)
y = y.flatten(0, 1)
rel_t = rel_t.flatten(0, 1)
t = torch.randint(0, diffusion.num_timesteps, (x_start.shape[0],), device=device)
model_kwargs = dict(y=y, x_cond=x_cond, rel_t=rel_t)
loss_dict = diffusion.training_losses(model, x_start, t, model_kwargs)
loss = loss_dict["loss"].mean()
if not bfloat_enable:
opt.zero_grad()
loss.backward()
opt.step()
else:
scaler.scale(loss).backward()
if config.get('grad_clip_val', 0) > 0:
scaler.unscale_(opt)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=config['grad_clip_val'])
scaler.step(opt)
scaler.update()
update_ema(ema, model.module)
# Log loss values:
running_loss += loss.detach().item()
log_steps += 1
train_steps += 1
if train_steps % args.log_every == 0:
# Measure training speed:
torch.cuda.synchronize()
end_time = time()
steps_per_sec = log_steps / (end_time - start_time)
samples_per_sec = dist.get_world_size()*x_cond.shape[0]*steps_per_sec
# Reduce loss history over all processes:
avg_loss = torch.tensor(running_loss / log_steps, device=device)
dist.all_reduce(avg_loss, op=dist.ReduceOp.SUM)
avg_loss = avg_loss.item() / dist.get_world_size()
total_steps = len(loader) * args.epochs
progress_pct = train_steps / total_steps * 100
remaining_steps = total_steps - train_steps
eta_seconds = remaining_steps / steps_per_sec if steps_per_sec > 0 else 0
eta_hours = eta_seconds / 3600
logger.info(f"(step={train_steps:07d}) Train Loss: {avg_loss:.4f}, Train Steps/Sec: {steps_per_sec:.2f}, Samples/Sec: {samples_per_sec:.2f}")
logger.info(f"Progress: {progress_pct:.2f}% | ETA: {eta_hours:.1f}h")
# Reset monitoring variables:
running_loss = 0
log_steps = 0
start_time = time()
# Save DiT checkpoint:
if train_steps % args.ckpt_every == 0 and train_steps > 0:
if rank == 0:
checkpoint = {
"model": model.module.state_dict(),
"ema": ema.state_dict(),
"opt": opt.state_dict(),
"args": args,
"epoch": epoch,
"train_steps": train_steps
}
if bfloat_enable:
checkpoint.update({"scaler": scaler.state_dict()})
checkpoint_path = f"{checkpoint_dir}/latest.pth.tar"
torch.save(checkpoint, checkpoint_path)
if train_steps % (10*args.ckpt_every) == 0 and train_steps > 0:
checkpoint_path = f"{checkpoint_dir}/{train_steps:07d}.pth.tar"
torch.save(checkpoint, checkpoint_path)
logger.info(f"Saved checkpoint to {checkpoint_path}")
if train_steps % args.eval_every == 0 and train_steps > 0:
eval_start_time = time()
# validation / test set evaluation
save_dir = os.path.join(experiment_dir, str(train_steps))
sim_score_val = evaluate(ema, tokenizer, diffusion, test_dataset, rank, config["batch_size"], config["num_workers"], latent_size, device, save_dir, args.global_seed, bfloat_enable, num_cond)
dist.barrier()
eval_end_time = time()
eval_time = eval_end_time - eval_start_time
# logger.info(f"(step={train_steps:07d}) Val Perceptual Loss: {sim_score_val:.4f}, Train Perceptual Loss: {sim_score_train:.4f}, Eval Time: {eval_time:.2f}")
logger.info(f"(step={train_steps:07d}) Val Perceptual Loss: {sim_score_val:.4f}, Eval Time: {eval_time:.2f}")
model.eval()
logger.info("Done!")
cleanup()
@torch.no_grad
def evaluate(model, vae, diffusion, test_dataloaders, rank, batch_size, num_workers, latent_size, device, save_dir, seed, bfloat_enable, num_cond):
sampler = DistributedSampler(
test_dataloaders,
num_replicas=dist.get_world_size(),
rank=rank,
shuffle=True,
seed=seed
)
loader = DataLoader(
test_dataloaders,
batch_size=batch_size,
shuffle=False,
sampler=sampler,
num_workers=num_workers,
pin_memory=True,
drop_last=True
)
from dreamsim import dreamsim
eval_model, _ = dreamsim(pretrained=True)
score = torch.tensor(0.).to(device)
n_samples = torch.tensor(0).to(device)
# Run for 1 step
for x, _, y, _, rel_t, _ in loader:
x = x.to(device)
y = y.to(device)
rel_t = rel_t.to(device).flatten(0, 1)
with torch.amp.autocast('cuda', enabled=True, dtype=torch.bfloat16):
B, T = x.shape[:2]
num_goals = T - num_cond
samples = model_forward_wrapper_v((model, diffusion, vae), x, y, num_timesteps=None, latent_size=latent_size, device=device, num_cond=num_cond, num_goals=num_goals, rel_t=rel_t)
x_start_pixels = x[:, num_cond:].flatten(0, 1)
x_cond_pixels = x[:, :num_cond].unsqueeze(1).expand(B, num_goals, num_cond, x.shape[2], x.shape[3], x.shape[4]).flatten(0, 1)
samples = samples * 0.5 + 0.5
x_start_pixels = x_start_pixels * 0.5 + 0.5
x_cond_pixels = x_cond_pixels * 0.5 + 0.5
res = eval_model(x_start_pixels, samples)
score += res.sum()
n_samples += len(res)
break
if rank == 0:
os.makedirs(save_dir, exist_ok=True)
for i in range(min(samples.shape[0], 10)):
_, ax = plt.subplots(1,3,dpi=256)
ax[0].imshow((x_cond_pixels[i, -1].permute(1,2,0).cpu().numpy()*255).astype('uint8'))
ax[1].imshow((x_start_pixels[i].permute(1,2,0).cpu().numpy()*255).astype('uint8'))
ax[2].imshow((samples[i].permute(1,2,0).cpu().float().numpy()*255).astype('uint8'))
plt.savefig(f'{save_dir}/{i}.png')
plt.close()
dist.all_reduce(score)
dist.all_reduce(n_samples)
sim_score = score/n_samples
return sim_score
def get_args_parser():
parser = argparse.ArgumentParser()
parser.add_argument("--config", type=str, required=True)
parser.add_argument("--epochs", type=int, default=300)
# parser.add_argument("--global-batch-size", type=int, default=256)
parser.add_argument("--global-seed", type=int, default=0)
parser.add_argument("--log-every", type=int, default=100)
parser.add_argument("--ckpt-every", type=int, default=2000)
parser.add_argument("--eval-every", type=int, default=5000)
parser.add_argument("--bfloat16", type=int, default=1)
parser.add_argument("--torch-compile", type=int, default=1)
parser.add_argument("--restart-from-checkpoint", type=int, default=0,
help="If 1, only load model weights and reset epoch/step to zero (cold start)")
return parser
if __name__ == "__main__":
args = get_args_parser().parse_args()
main(args)