| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import os |
| from pathlib import Path |
| from typing import List, Optional |
|
|
| import nemo_run as run |
| import pandas as pd |
| from numpy import nan |
|
|
| from nemo.collections.llm.gpt.data.mock import MockDataModule |
| from nemo.collections.llm.recipes.precision.mixed_precision import ( |
| bf16_with_fp8_current_scaling_mixed, |
| bf16_with_fp8_mixed, |
| bf16_with_fp8_subchannel_scaling_mixed, |
| bf16_with_mxfp8_mixed, |
| ) |
| from nemo.lightning.pytorch.callbacks.flops_callback import FLOPsMeasurementCallback |
| from nemo.lightning.pytorch.callbacks.model_checkpoint import ModelCheckpoint |
| from nemo.utils import logging |
|
|
| from .utils import get_comm_overlap_callback_idx |
|
|
|
|
def get_csv_configs(gpu: str, task: str, model_name: str, model_size: str, args) -> dict:
    """
    Get recommended configs tuned for performance from a csv file.
    User (command line) provided args override the recommended configs.

    Args:
        gpu (str): lowercase GPU name used to select the CSV file, e.g. 'h100'.
        task (str): experiment task, e.g. 'pre_train', 'sft', 'lora'.
        model_name (str): target model, e.g. 'llama3'.
        model_size (str): model size, e.g. '8b'.
        args: parsed command-line args; `compute_dtype` and (optionally)
            `num_gpus` are used to narrow down the matching CSV row.

    Returns:
        dict: the first matching config row (NaN cells normalized to None),
            or an empty dict when the CSV is missing or no row matches.
    """
    script_dir = str(Path(__file__).parent.absolute())
    recommended_configs_csv = os.path.join(script_dir, "recommended_model_configs", f"model_configs_{gpu}.csv")
    logging.info(f"Using {recommended_configs_csv} for loading default recommended model configs")

    config_df = pd.DataFrame()
    if os.path.isfile(recommended_configs_csv):
        df = pd.read_csv(recommended_configs_csv)
        # A `None` num_gpus on the command line means "any row"; the scalar
        # `True` broadcasts over the other boolean masks.
        config_df = df[
            (df["task"] == task)
            & (df["model"] == model_name)
            & (df["size"] == model_size)
            & (df["dtype"] == args.compute_dtype)
            & (args.num_gpus is None or df['num_gpus'] == args.num_gpus)
        ]
        # NaN cells mean "not specified"; normalize to None so callers can use
        # uniform `config.get(...) is None` checks.
        config_df = config_df.replace({nan: None})
    if len(config_df) == 0:
        logging.warning(f"Missing performance configs for {task}-{model_name}-{model_size}-{args.compute_dtype}")
        logging.warning("Make sure you provide all necessary arguments in the command line")

    # Fixed: the function returns a plain dict (one CSV row), not a DataFrame,
    # so the return annotation is now `dict`.
    config = config_df.to_dict(orient='records')[0] if len(config_df) > 0 else {}

    return config
|
|
|
|
def get_user_configs(gpu: str, task: str, model_name: str, model_size: str, args) -> List:
    """
    Choose recommended configs tuned for performance from a csv file if available.
    User (command line) provided args override the recommended configs.

    NOTE: pre-train and PEFT recommended configs available for H100 and B200.

    Args:
        gpu (str): target GPU machine for experiment. Options- ['h100', 'b200']
        task (str): experiment task. Options- ['pre_train', 'sft', 'lora']
        model_name (str): target model for experiment. E.g.: 'llama3', 'mixtral'
        model_size (str): size of target model. E.g.: '8b' (for llama3)
        args: parsed command-line args; non-None values override CSV values.

    Returns:
        List: resolved values in fixed positional order-
            [num_nodes, mbs, gbs, tp_size, pp_size, cp_size, vp_size, ep_size,
            etp_size, enable_cuda_graphs, use_mcore_fsdp, recompute_layers,
            activation_offload_layers, recompute_modules,
            keep_fsdp_fp8_transpose_cache, use_user_buffer_registration,
            use_sharp]
    """
    config = get_csv_configs(gpu.lower(), task, model_name, model_size, args)

    def _resolve(csv_key: str, user_value):
        # Command-line value wins; otherwise fall back to the CSV recommendation.
        return config.get(csv_key) if user_value is None else user_value

    def _resolve_bool(csv_key: str, user_value) -> bool:
        # Unset everywhere -> False. CSV stores flags as 0/1, so cast via int.
        value = _resolve(csv_key, user_value)
        return False if value is None else bool(int(value))

    if gpu.lower() == "gb200" and args.gpus_per_node > 4:
        args.gpus_per_node = 4
        logging.warning("GB200 has 4 GPUs per node. Setting gpus_per_node to 4.")

    num_gpus = _resolve("num_gpus", args.num_gpus)
    # NOTE(review): if num_gpus is missing from both the CLI and the CSV, the
    # ceil-division below raises TypeError; callers must supply one of them.
    num_nodes = -(num_gpus // -args.gpus_per_node)  # ceil division

    mbs = _resolve("mbs", args.micro_batch_size)
    gbs = _resolve("gbs", args.global_batch_size)
    tp_size = _resolve("tp_size", args.tensor_parallel_size)
    pp_size = _resolve("pp_size", args.pipeline_parallel_size)
    cp_size = _resolve("cp_size", args.context_parallel_size)
    ep_size = _resolve("ep_size", args.expert_parallel_size)
    vp_size = _resolve("vp_size", args.virtual_pipeline_parallel_size)
    etp_size = _resolve("etp_size", args.expert_tensor_parallel_size)

    enable_cuda_graphs = _resolve_bool("cuda_graphs", args.cuda_graphs)
    use_mcore_fsdp = _resolve_bool("use_mcore_fsdp", args.use_mcore_fsdp)

    recompute_layers = _resolve("recompute_layers", args.recompute_layers)
    recompute_layers = 0 if recompute_layers is None else int(recompute_layers)
    activation_offload_layers = _resolve("activation_offload_layers", args.activation_offload_layers)
    activation_offload_layers = 0 if activation_offload_layers is None else int(activation_offload_layers)

    if args.recompute_modules is not None:
        recompute_modules = args.recompute_modules
        assert isinstance(recompute_modules, list), "recompute_modules must be a list"
    elif config.get("recompute_modules") is not None:
        # A CSV cell cannot hold a list; module names are '/'-separated.
        recompute_modules = config.get("recompute_modules").split('/')
    else:
        recompute_modules = None

    keep_fsdp_fp8_transpose_cache = _resolve_bool(
        "keep_fsdp_fp8_transpose_cache", args.keep_fsdp_fp8_transpose_cache
    )
    use_user_buffer_registration = _resolve_bool(
        "use_user_buffer_registration", args.use_user_buffer_registration
    )
    use_sharp = _resolve_bool("use_sharp", args.use_sharp)

    # Fixed: the old `List[int]` annotation was wrong — the result mixes ints,
    # bools, None and an optional list of strings.
    user_configs = [num_nodes, mbs, gbs, tp_size, pp_size, cp_size, vp_size, ep_size, etp_size]
    user_configs = [int(value) if value is not None else value for value in user_configs]
    user_configs += [
        enable_cuda_graphs,
        use_mcore_fsdp,
        recompute_layers,
        activation_offload_layers,
        recompute_modules,
        keep_fsdp_fp8_transpose_cache,
        use_user_buffer_registration,
        use_sharp,
    ]

    logging.info("Received model parallel configs: ")
    logging.info(f"{num_nodes=}")
    logging.info(f"num_gpus_per_node={args.gpus_per_node}")
    logging.info(f"{mbs=}")
    logging.info(f"{gbs=}")
    logging.info(f"{tp_size=}")
    logging.info(f"{pp_size=}")
    logging.info(f"{cp_size=}")
    logging.info(f"{vp_size=}")
    logging.info(f"{ep_size=}")
    logging.info(f"{etp_size=}")
    logging.info(f"{enable_cuda_graphs=}")
    logging.info(f"{use_mcore_fsdp=}")
    logging.info(f"{recompute_layers=}")
    logging.info(f"{activation_offload_layers=}")
    logging.info(f"{recompute_modules=}")
    logging.info(f"{keep_fsdp_fp8_transpose_cache=}")
    logging.info(f"{use_user_buffer_registration=}")
    logging.info(f"{use_sharp=}")

    return user_configs
|
|
|
|
def set_mcore_fsdp_configs(recipe, comm_overlap_callback_idx: int | None, tp_size: int | None):
    """
    Set Mcore FSDP related configs.

    Args:
        recipe: recipe config object; mutated in place and returned.
        comm_overlap_callback_idx (int | None): index of the comm-overlap
            callback in `recipe.trainer.callbacks`, or None when absent.
        tp_size (int | None): tensor parallel size. NOTE(review): currently
            unused in this function body — confirm whether it can be dropped.

    Returns:
        The same recipe object, with FSDP settings applied.
    """
    recipe.model.config.init_model_with_meta_device = True
    recipe.trainer.strategy.fsdp = "megatron"
    recipe.trainer.strategy.ddp.data_parallel_sharding_strategy = "optim_grads_params"
    if recipe.trainer.plugins.grad_reduce_in_fp32:
        # presumably average-in-collective conflicts with fp32 grad reduce —
        # TODO confirm against megatron DDP docs.
        recipe.trainer.strategy.ddp.average_in_collective = False
    recipe.trainer.strategy.ddp.keep_fp8_transpose_cache_when_using_custom_fsdp = False
    recipe.model.config.gradient_accumulation_fusion = False
    if (
        comm_overlap_callback_idx is not None
        and recipe.trainer.callbacks[comm_overlap_callback_idx].defer_embedding_wgrad_compute
    ):
        logging.warning("Disabling deferring embedding wgrad compute because it cannot work with FSDP together.")
        recipe.trainer.callbacks[comm_overlap_callback_idx].defer_embedding_wgrad_compute = False

    return recipe
|
|
|
|
| def set_precision_configs(recipe, compute_dtype: str, fp8_recipe: str | None = None): |
| """ |
| Set precision related configs. |
| """ |
| if compute_dtype is None: |
| return recipe |
|
|
| if compute_dtype.lower() == "bf16": |
| recipe.optim.config.use_precision_aware_optimizer = True |
|
|
| if compute_dtype is not None and compute_dtype.lower() == "fp8": |
| if fp8_recipe is None: |
| fp8_recipe = "ds" |
| if fp8_recipe.lower() == "ds": |
| recipe.trainer.plugins = bf16_with_fp8_mixed() |
| elif fp8_recipe.lower() == "cs": |
| recipe.trainer.plugins = bf16_with_fp8_current_scaling_mixed() |
| |
| recipe.trainer.plugins.first_last_layers_bf16 = False |
| elif fp8_recipe.lower() == "mxfp8": |
| recipe.trainer.plugins = bf16_with_mxfp8_mixed() |
| elif fp8_recipe.lower() == "ss": |
| recipe.trainer.plugins = bf16_with_fp8_subchannel_scaling_mixed() |
|
|
| recipe.trainer.plugins.grad_reduce_in_fp32 = False |
|
|
| |
| |
| if compute_dtype.lower() == "fp8" and fp8_recipe.lower() == "mxfp8": |
| recipe.trainer.strategy.ddp.reuse_grad_buf_for_mxfp8_param_ag = True |
| recipe.optim.config.reuse_grad_buf_for_mxfp8_param_ag = True |
| comm_overlap_callback_idx = get_comm_overlap_callback_idx(recipe.trainer.callbacks) |
| if comm_overlap_callback_idx is not None: |
| recipe.trainer.callbacks[comm_overlap_callback_idx].overlap_param_gather = False |
| logging.warning( |
| "When using MXFP8, to reduce memory usage, we use reuse_grad_buf_for_mxfp8_param_ag. " |
| "Disabling AG overlap because it is not supported with reuse_grad_buf_for_mxfp8_param_ag." |
| ) |
|
|
| return recipe |
|
|
|
|
def set_recompute_configs(
    recipe,
    recompute_layers: int,
    activation_offload_layers: int,
    recompute_modules: Optional[List[str]],
):
    """
    Set activation recomputing and offloading related configs.

    Args:
        recipe: recipe config object; mutated in place and returned.
        recompute_layers (int): number of layers to fully recompute; 0 disables.
        activation_offload_layers (int): number of layers whose activations are
            offloaded to CPU; 0 disables.
        recompute_modules (Optional[List[str]]): modules for selective
            recompute; requires `recompute_granularity == "selective"`.

    Returns:
        The same recipe object, with recompute/offload settings applied.
    """
    model_config = recipe.model.config

    if recompute_layers > 0:
        # Block-wise full recompute of the first `recompute_layers` layers.
        model_config.recompute_granularity = "full"
        model_config.recompute_method = "block"
        model_config.recompute_num_layers = recompute_layers

    if activation_offload_layers > 0:
        # Offload activations (but not weights) of these layers to host memory.
        model_config.cpu_offloading = True
        model_config.cpu_offloading_weights = False
        model_config.cpu_offloading_num_layers = activation_offload_layers

    if recompute_modules is not None:
        model_config.recompute_modules = recompute_modules
        # Selective recompute is mutually exclusive with block-wise recompute.
        assert (
            model_config.recompute_granularity == "selective"
        ), "recompute_granularity must be selective when recompute_modules is provided"
        assert (
            model_config.recompute_num_layers is None
        ), "recompute_num_layers must be None when recompute_modules is provided"

    return recipe
|
|
|
|
def set_cuda_graph_configs(recipe, enable_cuda_graphs: bool, task: str):
    """
    Set CUDA graph related configs.

    Args:
        recipe: recipe config object; mutated in place and returned.
        enable_cuda_graphs (bool): whether to enable CUDA graph capture.
        task (str): experiment task; packed-sequence padding is only touched
            for 'none' and 'lora'.

    Returns:
        The same recipe object, with CUDA graph settings applied.
    """
    recipe.model.config.enable_cuda_graph = enable_cuda_graphs
    recipe.trainer.strategy.use_te_rng_tracker = enable_cuda_graphs

    # Packed-sequence cu_seqlens padding must track the CUDA-graph setting for
    # fine-tuning style tasks, when the data module defines packed specs.
    packed_specs = getattr(recipe.data, "packed_sequence_specs", None)
    if task in ("none", "lora") and packed_specs is not None:
        packed_specs.pad_cu_seqlens = enable_cuda_graphs

    return recipe
|
|
|
|
def set_perf_optimization_configs(
    recipe,
    use_mcore_fsdp: bool,
    enable_cuda_graphs: bool,
    task: str,
    tp_size: int | None,
    compute_dtype: str,
    fp8_recipe: str | None,
    recompute_layers: int,
    activation_offload_layers: int,
    recompute_modules: Optional[List[str]],
    use_fsdp_double_buffer: Optional[bool] = None,
    use_user_buffer_registration: Optional[bool] = None,
    use_sharp: Optional[bool] = None,
    keep_fsdp_fp8_transpose_cache: Optional[bool] = None,
):
    """
    Set performance optimization related configs.

    Orchestrates the CUDA-graph, FSDP, precision and recompute helpers in that
    order, then applies DDP-level tuning flags.

    Args:
        recipe: recipe config object; mutated in place and returned.
        use_mcore_fsdp (bool): enable Megatron-core FSDP.
        enable_cuda_graphs (bool): enable CUDA graphs (force-disabled when
            combined with FSDP, see below).
        task (str): experiment task, forwarded to the CUDA-graph helper.
        tp_size (int | None): tensor parallel size, forwarded to the FSDP helper.
        compute_dtype (str): 'bf16'/'fp8', forwarded to the precision helper.
        fp8_recipe (str | None): FP8 recipe, forwarded to the precision helper.
        recompute_layers (int): layers to fully recompute.
        activation_offload_layers (int): layers to CPU-offload.
        recompute_modules (Optional[List[str]]): selective-recompute modules.
        use_fsdp_double_buffer (Optional[bool]): FSDP double buffering; requires
            use_mcore_fsdp.
        use_user_buffer_registration (Optional[bool]): register NCCL user
            buffers (requires a DDP config object on the strategy).
        use_sharp (Optional[bool]): enable SHARP on the strategy.
        keep_fsdp_fp8_transpose_cache (Optional[bool]): keep the FP8 transpose
            cache under custom FSDP.

    Returns:
        The same recipe object, with performance settings applied.
    """
    recipe.model.config.cross_entropy_fusion_impl = "te"

    if use_fsdp_double_buffer:
        assert use_mcore_fsdp == True, "use_fsdp_double_buffer requires use_mcore_fsdp to be True"

    if use_mcore_fsdp and enable_cuda_graphs:
        logging.warning("Currently, cuda graphs are not supported with FSDP. Disabling cuda graphs.")
        enable_cuda_graphs = False
    # Applied unconditionally: also propagates a *disabled* CUDA-graph setting.
    recipe = set_cuda_graph_configs(recipe, enable_cuda_graphs, task)

    if use_mcore_fsdp:
        comm_overlap_callback_idx = get_comm_overlap_callback_idx(recipe.trainer.callbacks)
        recipe = set_mcore_fsdp_configs(recipe, comm_overlap_callback_idx, tp_size)

    recipe = set_precision_configs(recipe, compute_dtype, fp8_recipe)

    recipe = set_recompute_configs(recipe, recompute_layers, activation_offload_layers, recompute_modules)

    recipe.trainer.strategy.use_sharp = bool(use_sharp)

    # DDP flags can only be set when strategy.ddp is a config object (it may
    # also be a plain string preset, in which case it cannot be mutated).
    is_ddp_obj = hasattr(recipe.trainer.strategy, "ddp") and not isinstance(recipe.trainer.strategy.ddp, str)
    if use_user_buffer_registration and not is_ddp_obj:
        logging.warning("DDP is not configured. Cannot use user buffer registration.")
    if is_ddp_obj:
        # Disable per-step grad sanity checks for performance runs.
        recipe.trainer.strategy.ddp.check_for_nan_in_grad = False
        recipe.trainer.strategy.ddp.check_for_large_grads = False
        recipe.trainer.strategy.ddp.nccl_ub = bool(use_user_buffer_registration)
        recipe.trainer.strategy.ddp.fsdp_double_buffer = bool(use_fsdp_double_buffer)
        recipe.trainer.strategy.ddp.keep_fp8_transpose_cache_when_using_custom_fsdp = bool(
            keep_fsdp_fp8_transpose_cache
        )

    return recipe
|
|
|
|
def set_primary_perf_configs(
    recipe,
    task: str,
    num_nodes: int,
    num_gpus_per_node: int,
    mbs: int,
    gbs: int,
    max_steps: int,
    tp_size: int,
    pp_size: int,
    cp_size: int,
    vp_size: int,
    ep_size: int,
    etp_size: Optional[int] = None,
    enable_cuda_graphs: bool = False,
    use_mcore_fsdp: bool = False,
    use_fsdp_double_buffer: Optional[bool] = None,
    use_user_buffer_registration: Optional[bool] = None,
    use_sharp: Optional[bool] = None,
    recompute_layers: int = 0,
    activation_offload_layers: int = 0,
    compute_dtype: Optional[str] = None,
    fp8_recipe: Optional[str] = None,
    recompute_modules: Optional[List[str]] = None,
    nccl_communicator_config_path: Optional[str] = None,
    keep_fsdp_fp8_transpose_cache: Optional[bool] = None,
):
    """Set experiment configs we usually tune for performance of all models.

    Applies trainer topology, batch sizes, model-parallel sizes and comm
    overlap settings, then delegates the remaining tuning knobs to
    `set_perf_optimization_configs`.

    Args:
        recipe: recipe config object; mutated in place and returned.
        task (str): experiment task, e.g. 'pre_train'.
        num_nodes (int): number of nodes.
        num_gpus_per_node (int): GPUs per node.
        mbs (int): micro batch size.
        gbs (int): global batch size.
        max_steps (int): number of training steps.
        tp_size / pp_size / cp_size / vp_size / ep_size (int): tensor /
            pipeline / context / virtual-pipeline / expert parallel sizes.
        etp_size (Optional[int]): expert tensor parallel size.
        Remaining keyword args are forwarded to
        `set_perf_optimization_configs`; see its docstring.

    Returns:
        The same recipe object, fully configured.
    """
    recipe.trainer.num_nodes = num_nodes
    recipe.trainer.devices = num_gpus_per_node
    recipe.trainer.max_steps = max_steps

    # Validate only once at the very end of the run, and skip val batches.
    recipe.trainer.val_check_interval = max_steps
    recipe.trainer.limit_val_batches = 0

    recipe.data.micro_batch_size = mbs
    recipe.data.global_batch_size = gbs
    if recipe.data.__fn_or_cls__ == MockDataModule:
        # Mock data: provide exactly enough samples for `max_steps` steps.
        recipe.data.num_train_samples = max_steps * gbs

    recipe.trainer.strategy.tensor_model_parallel_size = tp_size
    recipe.trainer.strategy.pipeline_model_parallel_size = pp_size
    recipe.trainer.strategy.context_parallel_size = cp_size
    # vp_size == 1 means "no virtual pipelining" and must be passed as None.
    recipe.trainer.strategy.virtual_pipeline_model_parallel_size = None if vp_size == 1 else vp_size
    recipe.trainer.strategy.expert_model_parallel_size = ep_size
    recipe.trainer.strategy.expert_tensor_parallel_size = etp_size
    # Sequence parallelism only makes sense alongside tensor parallelism.
    recipe.trainer.strategy.sequence_parallel = bool(tp_size > 1)
    if nccl_communicator_config_path is not None:
        recipe.trainer.strategy.nccl_communicator_config_path = nccl_communicator_config_path

    comm_overlap_callback_idx = get_comm_overlap_callback_idx(recipe.trainer.callbacks)
    # dp_size may be fractional here; it is only used for a `> 1` comparison.
    dp_size = (num_nodes * num_gpus_per_node) / (tp_size * pp_size * cp_size)
    if comm_overlap_callback_idx is not None:
        # Overlapping param AG with the optimizer step needs DP, PP and
        # virtual PP all greater than 1 (vp_size may be None — checked first).
        recipe.trainer.callbacks[comm_overlap_callback_idx].overlap_param_gather_with_optimizer_step = bool(
            dp_size > 1 and pp_size > 1 and vp_size and vp_size > 1
        )

    recipe = set_perf_optimization_configs(
        recipe=recipe,
        use_mcore_fsdp=use_mcore_fsdp,
        enable_cuda_graphs=enable_cuda_graphs,
        task=task,
        tp_size=tp_size,
        compute_dtype=compute_dtype,
        fp8_recipe=fp8_recipe,
        recompute_layers=recompute_layers,
        activation_offload_layers=activation_offload_layers,
        recompute_modules=recompute_modules,
        use_fsdp_double_buffer=use_fsdp_double_buffer,
        use_user_buffer_registration=use_user_buffer_registration,
        use_sharp=use_sharp,
        keep_fsdp_fp8_transpose_cache=keep_fsdp_fp8_transpose_cache,
    )

    return recipe
|
|
|
|
def set_exp_logging_configs(
    recipe,
    task: str,
    domain: str,
    model_name: str,
    enable_tb: bool,
    enable_wd: bool,
    wandb_prj_name: str,
    wandb_job_name: str,
):
    """Set experiment logging configs.

    Args:
        recipe: recipe config object; mutated in place and returned.
        task (str): experiment task; the FLOPs callback is only attached for
            'pre_train'.
        domain (str): model domain; the FLOPs callback is only attached for
            'llm'.
        model_name (str): model name forwarded to the FLOPs callback.
        enable_tb (bool): enable TensorBoard logging.
        enable_wd (bool): enable Weights & Biases logging.
        wandb_prj_name (str): wandb project name (used when enable_wd is True).
        wandb_job_name (str): wandb run name (used when enable_wd is True).

    Returns:
        The same recipe object, with logging settings applied.
    """
    # FLOPs measurement is only wired up for LLM pre-training runs.
    if task == "pre_train" and domain == "llm":
        recipe.trainer.callbacks.append(
            run.Config(
                FLOPsMeasurementCallback,
                model_config=recipe.model.config,
                data_config=recipe.data,
                model_name=model_name,
            )
        )

    if not enable_tb:
        recipe.log.tensorboard = None
        recipe.trainer.logger = False
    else:
        recipe.log.log_dir = "/nemo_run/lightning_logs"
    if enable_wd:
        # Imported lazily so wandb is only touched when actually enabled.
        from nemo.collections.llm.recipes.log.default import wandb_logger

        recipe.log.wandb = wandb_logger(project=wandb_prj_name, name=wandb_job_name)

    # Performance runs do not save checkpoints through the exp logger.
    recipe.log.ckpt = None

    # Enable trainer checkpointing only if a ModelCheckpoint callback exists.
    callbacks = recipe.trainer.callbacks
    checkpoint_callback_idx = None
    if callbacks:
        for idx, callback in enumerate(callbacks):
            if callback.__fn_or_cls__ == ModelCheckpoint:
                checkpoint_callback_idx = idx
                break
    recipe.trainer.enable_checkpointing = checkpoint_callback_idx is not None
    recipe.trainer.log_every_n_steps = 1

    return recipe
|
|
|
|
def args_sanity_check(args) -> None:
    """
    Check the sanity of argument settings.

    Fixed: the old `args: dict` annotation was wrong — the body uses attribute
    access (`args.wandb`), i.e. an argparse-style namespace, not a dict.

    Args:
        args: parsed command-line arguments (accessed via attributes).

    Raises:
        AssertionError: if wandb logging is enabled but any of `wandb_key`,
            `wandb_prj_name` or `wandb_job_name` is missing.
    """
    if args.wandb:
        assert args.wandb_key is not None, "wandb logger needs \"wandb_key\""
        assert args.wandb_prj_name is not None, "wandb logger needs \"wandb_prj_name\""
        assert args.wandb_job_name is not None, "wandb logger needs \"wandb_job_name\""
|
|