| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| import argparse |
| from typing import Optional |
|
|
| import nemo_run as run |
|
|
| from nemo.collections.llm import deploy, evaluate |
| from nemo.collections.llm.evaluation.api import ApiEndpoint, ConfigParams, EvaluationConfig, EvaluationTarget |
|
|
|
|
# Maps the user-facing endpoint name to the URL path suffix of the
# OpenAI-compatible FastAPI server started by `deploy`.
ENDPOINT_TYPES = {"chat": "chat/completions/", "completions": "completions/"}

# Benchmarks that are run against the plain completions endpoint.
COMPLETIONS_TASKS = (
    "gsm8k",
    "mgsm",
    "mmlu",
    "mmlu_pro",
    "mmlu_redux",
)

# Benchmarks that require the chat (instruct-style) endpoint.
CHAT_TASKS = (
    "gpqa_diamond_cot",
    "gsm8k_cot_instruct",
    "ifeval",
    "mgsm_cot",
    "mmlu_instruct",
    "mmlu_pro_instruct",
    "mmlu_redux_instruct",
    "wikilingua",
)

# All selectable benchmarks; used as the `choices` for --eval_task.
EVAL_TASKS = COMPLETIONS_TASKS + CHAT_TASKS


def get_parser():
    """Build the CLI argument parser for the deploy-and-evaluate workflow.

    Returns:
        argparse.ArgumentParser: parser covering checkpoint location, server
        settings, parallelism, the evaluation task, and executor selection
        (local vs. slurm).
    """
    parser = argparse.ArgumentParser(description="NeMo2.0 Evaluation")
    parser.add_argument(
        "--nemo_checkpoint",
        type=str,
        required=True,
        help="NeMo 2.0 checkpoint to be evaluated",
    )
    parser.add_argument(
        "--triton_http_address", type=str, default="0.0.0.0", help="IP address at which PyTriton server is created"
    )
    parser.add_argument("--fastapi_port", type=int, default=8080, help="Port at which FastAPI server is created")
    parser.add_argument(
        "--endpoint_type",
        type=str,
        default="completions",
        help="Whether to use completions or chat endpoint",
        choices=list(ENDPOINT_TYPES),
    )
    parser.add_argument(
        "--max_input_len",
        type=int,
        default=4096,
        help="Max input length of the model",
    )
    parser.add_argument(
        "--tensor_parallelism_size",
        type=int,
        default=1,
        help="Tensor parallelism size to deploy the model",
    )
    parser.add_argument(
        "--pipeline_parallelism_size",
        type=int,
        default=1,
        help="Pipeline parallelism size to deploy the model",
    )
    parser.add_argument(
        "--batch_size",
        type=int,
        default=2,
        help="Batch size for deployment and evaluation",
    )
    parser.add_argument(
        "--eval_task",
        type=str,
        default="mmlu",
        help="Evaluation benchmark to run.",
        choices=EVAL_TASKS,
    )
    parser.add_argument(
        "--limit", type=float, default=None, help="Limit evaluation to `limit` samples. Default: use all samples."
    )
    parser.add_argument(
        "--parallel_requests",
        type=int,
        default=1,
        # Fixed: the previous help text claimed a per-task default, but the
        # default is hard-coded to 1.
        help="Number of parallel requests to send to server. Default: 1.",
    )
    parser.add_argument(
        "--request_timeout",
        type=int,
        default=1000,
        help="Time in seconds for the eval client. Default: 1000s",
    )
    parser.add_argument(
        "--tag",
        type=str,
        help="Optional tag for your experiment title which will be appended after the model/exp name.",
        required=False,
        default="",
    )
    parser.add_argument(
        "--dryrun",
        action="store_true",
        help="Do a dryrun and exit",
        default=False,
    )
    parser.add_argument(
        "--slurm",
        action="store_true",
        help="Run on slurm using run.SlurmExecutor",
        default=False,
    )
    parser.add_argument('--nodes', type=int, default=1, help="Num nodes for the executor")
    parser.add_argument('--devices', type=int, default=8, help="Num devices per node for the executor")
    parser.add_argument(
        '--container_image',
        type=str,
        default="nvcr.io/nvidia/nemo:dev",
        # Fixed: implicit concatenation previously produced "slurm runs.Can be"
        # (missing space between the two string fragments).
        help="Container image for the run, only used in case of slurm runs. "
        "Can be a path as well in case of .sqsh file.",
    )
    return parser
|
|
|
|
def slurm_executor(
    user: str,
    host: str,
    remote_job_dir: str,
    account: str,
    partition: str,
    nodes: int,
    devices: int,
    container_image: str,
    time: str = "04:00:00",
    custom_mounts: Optional[list[str]] = None,
    custom_env_vars: Optional[dict[str, str]] = None,
    retries: int = 0,
) -> "run.SlurmExecutor":
    """Create a `run.SlurmExecutor` that submits jobs over an SSH tunnel.

    Args:
        user: SSH user on the slurm login node.
        host: Hostname of the slurm login node.
        remote_job_dir: Remote directory used by nemo_run for job artifacts.
        account: Slurm account to charge.
        partition: Slurm partition to submit to.
        nodes: Number of nodes to request.
        devices: GPUs (tasks) per node.
        container_image: Container image (or .sqsh path) to run in.
        time: Slurm walltime limit.
        custom_mounts: Extra container mounts to add.
        custom_env_vars: Extra environment variables (override defaults).
        retries: Number of automatic retries on job failure.

    Raises:
        RuntimeError: if any required connection/scheduling argument is unset.
    """
    if not (user and host and remote_job_dir and account and partition and nodes and devices):
        # Fixed: previously RuntimeError was given two arguments (a stray
        # trailing comma split the message into a tuple), mangling the output.
        raise RuntimeError(
            "Please set user, host, remote_job_dir, account, partition, nodes and devices args for using this "
            "function."
        )

    mounts = []
    if custom_mounts:
        mounts.extend(custom_mounts)

    env_vars = {
        "HF_DATASETS_TRUST_REMOTE_CODE": "1",
        # NOTE(review): placeholder token — replace with a real HF token (or
        # pass one via custom_env_vars) before running gated benchmarks.
        "HF_TOKEN": "xxxxxx",
    }
    if custom_env_vars:
        env_vars |= custom_env_vars

    executor = run.SlurmExecutor(
        account=account,
        partition=partition,
        tunnel=run.SSHTunnel(
            user=user,
            host=host,
            job_dir=remote_job_dir,
        ),
        nodes=nodes,
        ntasks_per_node=devices,
        exclusive=True,
        packager=run.GitArchivePackager(),
    )

    executor.container_image = container_image
    executor.container_mounts = mounts
    executor.env_vars = env_vars
    executor.retries = retries
    executor.time = time

    return executor
|
|
|
|
def local_executor_torchrun() -> run.LocalExecutor:
    """Build a `run.LocalExecutor` preconfigured with the HF environment variables."""
    return run.LocalExecutor(
        env_vars={
            "HF_DATASETS_TRUST_REMOTE_CODE": "1",
            "HF_TOKEN": "xxxxxx",
        }
    )
|
|
|
|
def main():
    """Deploy a NeMo 2.0 checkpoint and evaluate it on the selected benchmark.

    Builds two nemo_run tasks — `deploy` (starts the PyTriton/FastAPI
    inference server) and `evaluate` (runs the benchmark client against the
    server's OpenAI-compatible endpoint) — and schedules them either on slurm
    or locally, depending on --slurm.
    """
    args = get_parser().parse_args()
    # Normalize the tag so the experiment title reads "<name>-<tag>".
    if args.tag and not args.tag.startswith("-"):
        args.tag = "-" + args.tag

    exp_name = "NeMoEvaluation"
    deploy_fn = run.Partial(
        deploy,
        nemo_checkpoint=args.nemo_checkpoint,
        fastapi_port=args.fastapi_port,
        triton_http_address=args.triton_http_address,
        max_input_len=args.max_input_len,
        tensor_parallelism_size=args.tensor_parallelism_size,
        pipeline_parallelism_size=args.pipeline_parallelism_size,
        max_batch_size=args.batch_size,
        num_gpus=args.devices,
        num_nodes=args.nodes,
    )

    # Point the evaluation client at the endpoint the deployed server exposes.
    api_endpoint = run.Config(
        ApiEndpoint,
        url=f"http://{args.triton_http_address}:{args.fastapi_port}/v1/{ENDPOINT_TYPES[args.endpoint_type]}",
        type=args.endpoint_type,
    )
    eval_target = run.Config(EvaluationTarget, api_endpoint=api_endpoint)
    eval_params = run.Config(
        ConfigParams,
        limit_samples=args.limit,
        parallelism=args.parallel_requests,
        request_timeout=args.request_timeout,
    )
    eval_config = run.Config(EvaluationConfig, type=args.eval_task, params=eval_params)

    eval_fn = run.Partial(evaluate, target_cfg=eval_target, eval_cfg=eval_config)

    executor: run.Executor
    # Only set on the slurm path; the local path reuses `executor` for both tasks.
    executor_eval: Optional[run.Executor] = None
    if args.slurm:
        # NOTE(review): user/host/remote_job_dir/account/partition are left
        # empty here and must be filled in before a slurm run;
        # slurm_executor raises RuntimeError while they are blank.
        executor = slurm_executor(
            user="",
            host="",
            remote_job_dir="",
            account="",
            partition="",
            nodes=args.nodes,
            devices=args.devices,
            container_image=args.container_image,
            custom_mounts=[],
        )
        executor.srun_args = ["--mpi=pmix", "--overlap"]
        # The eval client runs as a single task on one node, overlapping the
        # deployment job that holds the rest of the allocation.
        executor_eval = executor.clone()
        executor_eval.srun_args = ["--ntasks-per-node=1", "--nodes=1"]
    else:
        executor = local_executor_torchrun()

    with run.Experiment(f"{exp_name}{args.tag}") as exp:
        if args.slurm:
            # One slurm job with per-task executors: deploy keeps the
            # allocation, evaluate overlaps it.
            exp.add(
                [deploy_fn, eval_fn],
                executor=[executor, executor_eval],
                # Idiom fix: direct boolean instead of `True if ... else False`.
                tail_logs=isinstance(executor, run.LocalExecutor),
                name=exp_name,
            )
        else:
            # Locally, deployment and evaluation are added as separate tasks.
            exp.add(deploy_fn, executor=executor, name=f"{exp_name}_deploy")
            exp.add(eval_fn, executor=executor, name=f"{exp_name}_evaluate")

        if args.dryrun:
            exp.dryrun()
        else:
            exp.run()
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|