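"""Fine-tune a Hugging Face causal LM on SQuAD with NeMo 2.0 and NeMo-Run.

Builds llm.hf_auto_model_for_causal_lm.finetune_recipe for the model given via
--model (default: nvidia/Llama-3_3-Nemotron-Super-49B-v1) and launches it either
locally through torchrun or on a Slurm cluster through run.SlurmExecutor.
"""
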
import argparse
import os
from typing import Optional

import nemo_run as run

import nemo.lightning as nl
from nemo.collections import llm
from nemo.collections.common.tokenizers.huggingface.auto_tokenizer import AutoTokenizer
from nemo.collections.llm.gpt.data.hf_dataset import SquadHFDataModule
from nemo.utils import logging

# Dataset location handed to SquadHFDataModule below. Intentionally left empty;
# point it at your SQuAD-formatted dataset (path or dataset identifier) before running.
DATA_PATH = ''


def get_parser():
    parser = argparse.ArgumentParser(description="NeMo 2.0 fine-tuning with HFAutoModelForCausalLM")
    parser.add_argument('--model', default='nvidia/Llama-3_3-Nemotron-Super-49B-v1')
    parser.add_argument('--nodes', type=int, default=4)
    parser.add_argument('--devices', type=int, default=8)
    parser.add_argument('--max-steps', type=int, default=200)
    parser.add_argument(
        "--tag",
        type=str,
        help="Optional tag for your experiment title, appended after the model/experiment name.",
        required=False,
        default="",
    )
    parser.add_argument(
        "--dryrun",
        action="store_true",
        help="Do a dry run and exit.",
        default=False,
    )
    parser.add_argument(
        "--slurm",
        action="store_true",
        help="Run on Slurm using run.SlurmExecutor.",
        default=False,
    )
    parser.add_argument(
        "--hf-token",
        type=str,
        help="Hugging Face token for downloading models.",
        required=False,
        default=None,
    )
    return parser


def slurm_executor(
    user: str,
    host: str,
    remote_job_dir: str,
    account: str,
    partition: str,
    nodes: int,
    devices: int,
    time: str = "04:00:00",
    custom_mounts: Optional[list[str]] = None,
    custom_env_vars: Optional[dict[str, str]] = None,
    container_image: str = "nvcr.io/nvidia/nemo:25.02",
    retries: int = 0,
) -> run.SlurmExecutor:
    if not (user and host and remote_job_dir and account and partition and nodes and devices):
        raise RuntimeError(
            "Please set user, host, remote_job_dir, account, partition, nodes, and devices "
            "to use this function."
        )

    mounts = []
    if custom_mounts:
        mounts.extend(custom_mounts)

    # Defaults commonly used for multi-node NCCL/Transformer Engine runs;
    # caller-supplied variables take precedence.
    env_vars = {
        "TRANSFORMERS_OFFLINE": "0",
        "TORCH_NCCL_AVOID_RECORD_STREAMS": "1",
        "NCCL_NVLS_ENABLE": "0",
        "NVTE_DP_AMAX_REDUCE_INTERVAL": "0",
        "NVTE_ASYNC_AMAX_REDUCTION": "1",
    }
    if custom_env_vars:
        env_vars |= custom_env_vars

    executor = run.SlurmExecutor(
        account=account,
        partition=partition,
        tunnel=run.SSHTunnel(
            user=user,
            host=host,
            job_dir=remote_job_dir,
        ),
        nodes=nodes,
        ntasks_per_node=devices,
        gpus_per_node=devices,
        mem="0",
        exclusive=True,
        gres=f"gpu:{devices}",  # keep the GRES request in sync with --devices
        packager=run.GitArchivePackager(),
    )

    executor.container_image = container_image
    executor.container_mounts = mounts
    executor.env_vars = env_vars
    executor.retries = retries
    executor.time = time

    return executor


def local_executor_torchrun(nodes: int = 1, devices: int = 2) -> run.LocalExecutor:
    env_vars = {
        "TRANSFORMERS_OFFLINE": "0",
        "TORCH_NCCL_AVOID_RECORD_STREAMS": "1",
        "NCCL_NVLS_ENABLE": "0",
        "NVTE_DP_AMAX_REDUCE_INTERVAL": "0",
        "NVTE_ASYNC_AMAX_REDUCTION": "1",
        "NVTE_FUSED_ATTN": "0",  # disable Transformer Engine fused attention for local runs
    }

    executor = run.LocalExecutor(ntasks_per_node=devices, launcher="torchrun", env_vars=env_vars)

    return executor


def main():
    args = get_parser().parse_args()
    if args.tag and not args.tag.startswith("-"):
        args.tag = "-" + args.tag

    exp_name = "HFAutoModelForCausalLM"

    # Full-parameter fine-tuning (peft_scheme='none'); checkpoints land under /nemo_run/checkpoints.
    recipe = llm.hf_auto_model_for_causal_lm.finetune_recipe(
        model_name=args.model,
        name=exp_name,
        num_nodes=args.nodes,
        num_gpus_per_node=args.devices,
        peft_scheme='none',
        dir="/nemo_run/checkpoints",
        max_steps=args.max_steps,
        trust_remote_code=True,
        attn_implementation='eager',
    )

    recipe.trainer.val_check_interval = 50

    # Reuse the model's own tokenizer and train on a small SQuAD slice.
    tokenizer = llm.HFAutoModelForCausalLM.configure_tokenizer(args.model)
    recipe.data = run.Config(
        SquadHFDataModule,
        path_or_dataset=DATA_PATH,
        split="train[:100]",
        pad_token_id=tokenizer.tokenizer.eos_token_id,
        tokenizer=run.Config(AutoTokenizer, pretrained_model_name=args.model),
    )

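    # The parallel sizes below are assumed to multiply out to the total rank count:
    # 1 (data) x 1 (tensor) x 32 (context) = 32 = 4 nodes x 8 GPUs (the defaults above).
    # Adjust context_parallel_size together with --nodes/--devices.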
    recipe.trainer.strategy = run.Config(
        nl.FSDP2Strategy,
        data_parallel_size=1,
        tensor_parallel_size=1,
        context_parallel_size=32,
    )
    recipe.trainer.plugins = None


    if args.hf_token is not None:
        os.environ["HF_TOKEN"] = args.hf_token

    executor: run.Executor

    if args.slurm:
        # Forward a Hugging Face token to the cluster job if one is available.
        if args.hf_token:
            custom_env_vars = {"HF_TOKEN": args.hf_token}
        elif os.environ.get("HF_TOKEN"):
            custom_env_vars = {"HF_TOKEN": os.environ["HF_TOKEN"]}
        else:
            custom_env_vars = {}
            logging.info("No HF_TOKEN provided; gated repos may be inaccessible.")

        # Fill in your cluster details; slurm_executor() raises if any are left empty.
        executor = slurm_executor(
            user="",
            host="",
            remote_job_dir="",
            account="",
            partition="",
            nodes=recipe.trainer.num_nodes,
            devices=recipe.trainer.devices,
            custom_mounts=[],
            custom_env_vars=custom_env_vars,
        )
    else:
        executor = local_executor_torchrun(nodes=recipe.trainer.num_nodes, devices=recipe.trainer.devices)

    with run.Experiment(f"{exp_name}{args.tag}") as exp:
        exp.add(
            recipe,
            executor=executor,
            name=exp_name,
            tail_logs=isinstance(executor, run.LocalExecutor),
        )

        if args.dryrun:
            exp.dryrun()
        else:
            exp.run(sequential=True, detach=True)


if __name__ == "__main__":
    main()
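
# Example invocations (hypothetical script name; set DATA_PATH and the
# slurm_executor() cluster fields first):
#
#   python finetune_nemotron_squad.py --slurm                 # 4 nodes x 8 GPUs via Slurm
#   python finetune_nemotron_squad.py --slurm --dryrun        # inspect the job without submitting
#
# For a local torchrun run, also shrink context_parallel_size in main() so the
# parallel sizes match the available GPUs.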