# ycwhencpp's picture
# Sync repo: updated train_grpo notebook for training run
# 5e9fb2f verified
"""CLI commands for Hugging Face Inference Endpoints."""
from typing import Annotated
import typer
from huggingface_hub._inference_endpoints import InferenceEndpointScalingMetric
from huggingface_hub.errors import HfHubHTTPError
from ._cli_utils import FormatWithAutoOpt, TokenOpt, get_hf_api, typer_factory
from ._output import OutputFormatWithAuto, out
# Command group that the endpoint commands in this module register on via `ie_cli.command(...)`.
ie_cli = typer_factory(help="Manage Hugging Face Inference Endpoints.")
# Separate group for the catalog commands; it is attached to `ie_cli` under the name "catalog"
# by the `ie_cli.add_typer(...)` call later in this module.
catalog_app = typer_factory(help="Interact with the Inference Endpoints catalog.")
# Positional CLI argument carrying the endpoint name (the commands below declare it without a default).
NameArg = Annotated[
str,
typer.Argument(help="Endpoint name."),
]
# Option flavour of the endpoint name; `None` is allowed so commands can default it to `None`.
NameOpt = Annotated[
str | None,
typer.Option(help="Endpoint name."),
]
# Namespace option reused by the commands below; per its help text, leaving it unset falls back
# to the current user's namespace.
NamespaceOpt = Annotated[
str | None,
typer.Option(
help="The namespace associated with the Inference Endpoint. Defaults to the current user's namespace.",
),
]
@ie_cli.command("list | ls", examples=["hf endpoints ls", "hf endpoints ls --namespace my-org"])
def ls(
    namespace: NamespaceOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
) -> None:
    """Lists all Inference Endpoints for the given namespace."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.

    def pick(section, key):
        # A section that is not a dict contributes an empty string.
        return section.get(key, "") if isinstance(section, dict) else ""

    def summarize(raw):
        # Flatten one raw endpoint payload into a single table row.
        status = raw.get("status", {})
        model = raw.get("model", {})
        compute = raw.get("compute", {})
        provider = raw.get("provider", {})
        return {
            "name": raw.get("name", ""),
            "model": pick(model, "repository"),
            "status": pick(status, "state"),
            "task": pick(model, "task"),
            "framework": pick(model, "framework"),
            "instance": pick(compute, "instanceType"),
            "vendor": pick(provider, "vendor"),
            "region": pick(provider, "region"),
        }

    client = get_hf_api(token=token)
    try:
        endpoints = client.list_inference_endpoints(namespace=namespace, token=token)
    except HfHubHTTPError as exc:
        # Report the failure and exit with the HTTP status code of the failed response.
        out.error(f"Listing failed: {exc}")
        raise typer.Exit(code=exc.response.status_code) from exc

    rows = [summarize(endpoint.raw) for endpoint in endpoints]
    out.table(rows, id_key="name")
@ie_cli.command(name="deploy", examples=["hf endpoints deploy my-endpoint --repo gpt2 --framework pytorch ..."])
def deploy(
    name: NameArg,
    repo: Annotated[
        str,
        typer.Option(
            help="The name of the model repository associated with the Inference Endpoint (e.g. 'openai/gpt-oss-120b').",
        ),
    ],
    framework: Annotated[
        str,
        typer.Option(
            help="The machine learning framework used for the model (e.g. 'vllm').",
        ),
    ],
    accelerator: Annotated[
        str,
        typer.Option(
            help="The hardware accelerator to be used for inference (e.g. 'cpu').",
        ),
    ],
    instance_size: Annotated[
        str,
        typer.Option(
            help="The size or type of the instance to be used for hosting the model (e.g. 'x4').",
        ),
    ],
    instance_type: Annotated[
        str,
        typer.Option(
            help="The cloud instance type where the Inference Endpoint will be deployed (e.g. 'intel-icl').",
        ),
    ],
    region: Annotated[
        str,
        typer.Option(
            help="The cloud region in which the Inference Endpoint will be created (e.g. 'us-east-1').",
        ),
    ],
    vendor: Annotated[
        str,
        typer.Option(
            help="The cloud provider or vendor where the Inference Endpoint will be hosted (e.g. 'aws').",
        ),
    ],
    *,
    namespace: NamespaceOpt = None,
    task: Annotated[
        str | None,
        typer.Option(
            help="The task on which to deploy the model (e.g. 'text-classification').",
        ),
    ] = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
    min_replica: Annotated[
        int,
        typer.Option(
            help="The minimum number of replicas (instances) to keep running for the Inference Endpoint.",
        ),
    ] = 1,
    max_replica: Annotated[
        int,
        typer.Option(
            help="The maximum number of replicas (instances) to scale to for the Inference Endpoint.",
        ),
    ] = 1,
    scale_to_zero_timeout: Annotated[
        int | None,
        typer.Option(
            help="The duration in minutes before an inactive endpoint is scaled to zero.",
        ),
    ] = None,
    scaling_metric: Annotated[
        InferenceEndpointScalingMetric | None,
        typer.Option(
            help="The metric reference for scaling.",
        ),
    ] = None,
    scaling_threshold: Annotated[
        float | None,
        typer.Option(
            help="The scaling metric threshold used to trigger a scale up. Ignored when scaling metric is not provided.",
        ),
    ] = None,
) -> None:
    """Deploy an Inference Endpoint from a Hub repository."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.
    api = get_hf_api(token=token)
    try:
        endpoint = api.create_inference_endpoint(
            name=name,
            repository=repo,
            framework=framework,
            accelerator=accelerator,
            instance_size=instance_size,
            instance_type=instance_type,
            region=region,
            vendor=vendor,
            namespace=namespace,
            task=task,
            token=token,
            min_replica=min_replica,
            max_replica=max_replica,
            scaling_metric=scaling_metric,
            scaling_threshold=scaling_threshold,
            scale_to_zero_timeout=scale_to_zero_timeout,
        )
    except HfHubHTTPError as error:
        # Handle HTTP failures the same way as the catalog deploy command and the other
        # commands in this module: print a message, then exit with the HTTP status code.
        out.error(f"Deployment failed: {error}")
        raise typer.Exit(code=error.response.status_code) from error
    # On success, emit the raw endpoint payload returned by the API.
    out.dict(endpoint.raw)
@catalog_app.command(name="deploy", examples=["hf endpoints catalog deploy --repo meta-llama/Llama-3.2-1B-Instruct"])
def deploy_from_catalog(
    repo: Annotated[
        str,
        typer.Option(
            help="The name of the model repository associated with the Inference Endpoint (e.g. 'openai/gpt-oss-120b').",
        ),
    ],
    name: NameOpt = None,
    accelerator: Annotated[
        str | None,
        typer.Option(
            help="The hardware accelerator to be used for inference (e.g. 'cpu', 'gpu', 'neuron').",
        ),
    ] = None,
    namespace: NamespaceOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
) -> None:
    """Deploy an Inference Endpoint from the Model Catalog."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.
    client = get_hf_api(token=token)
    # Gather the request parameters first so the API call itself stays on one line.
    request = {
        "repo_id": repo,
        "name": name,
        "accelerator": accelerator,
        "namespace": namespace,
        "token": token,
    }
    try:
        endpoint = client.create_inference_endpoint_from_catalog(**request)
    except HfHubHTTPError as exc:
        # Print the failure, then exit with the HTTP status code of the response.
        out.error(f"Deployment failed: {exc}")
        raise typer.Exit(code=exc.response.status_code) from exc
    else:
        out.dict(endpoint.raw)
def list_catalog(
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
) -> None:
    """List available Catalog models."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.
    client = get_hf_api(token=token)
    try:
        catalog_models = client.list_inference_catalog(token=token)
    except HfHubHTTPError as exc:
        # Print the failure, then exit with the HTTP status code of the response.
        out.error(f"Catalog fetch failed: {exc}")
        raise typer.Exit(code=exc.response.status_code) from exc
    # The listing is wrapped under a single "models" key for output.
    payload = {"models": catalog_models}
    out.dict(payload)
# Register `list_catalog` on the catalog group under the name "list | ls"
# (presumably the project's alias syntax for `list` and `ls` -- confirm against `typer_factory`).
catalog_app.command(name="list | ls", examples=["hf endpoints catalog ls"])(list_catalog)
# Register the same function directly on `ie_cli` as a hidden `list-catalog` command.
ie_cli.command(name="list-catalog", hidden=True)(list_catalog)
# Attach the catalog group to the endpoints group under the name "catalog".
ie_cli.add_typer(catalog_app, name="catalog")
@ie_cli.command(examples=["hf endpoints describe my-endpoint"])
def describe(
    name: NameArg,
    namespace: NamespaceOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
) -> None:
    """Get information about an existing endpoint."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.
    client = get_hf_api(token=token)
    try:
        endpoint = client.get_inference_endpoint(
            name=name,
            namespace=namespace,
            token=token,
        )
    except HfHubHTTPError as exc:
        # Print the failure, then exit with the HTTP status code of the response.
        out.error(f"Fetch failed: {exc}")
        raise typer.Exit(code=exc.response.status_code) from exc
    else:
        out.dict(endpoint.raw)
@ie_cli.command(examples=["hf endpoints update my-endpoint --min-replica 2"])
def update(
    name: NameArg,
    namespace: NamespaceOpt = None,
    repo: Annotated[
        str | None,
        typer.Option(
            help="The name of the model repository associated with the Inference Endpoint (e.g. 'openai/gpt-oss-120b').",
        ),
    ] = None,
    accelerator: Annotated[
        str | None,
        typer.Option(
            help="The hardware accelerator to be used for inference (e.g. 'cpu').",
        ),
    ] = None,
    instance_size: Annotated[
        str | None,
        typer.Option(
            help="The size or type of the instance to be used for hosting the model (e.g. 'x4').",
        ),
    ] = None,
    instance_type: Annotated[
        str | None,
        typer.Option(
            help="The cloud instance type where the Inference Endpoint will be deployed (e.g. 'intel-icl').",
        ),
    ] = None,
    framework: Annotated[
        str | None,
        typer.Option(
            help="The machine learning framework used for the model (e.g. 'custom').",
        ),
    ] = None,
    revision: Annotated[
        str | None,
        typer.Option(
            help="The specific model revision to deploy on the Inference Endpoint (e.g. '6c0e6080953db56375760c0471a8c5f2929baf11').",
        ),
    ] = None,
    task: Annotated[
        str | None,
        typer.Option(
            help="The task on which to deploy the model (e.g. 'text-classification').",
        ),
    ] = None,
    min_replica: Annotated[
        int | None,
        typer.Option(
            help="The minimum number of replicas (instances) to keep running for the Inference Endpoint.",
        ),
    ] = None,
    max_replica: Annotated[
        int | None,
        typer.Option(
            help="The maximum number of replicas (instances) to scale to for the Inference Endpoint.",
        ),
    ] = None,
    scale_to_zero_timeout: Annotated[
        int | None,
        typer.Option(
            help="The duration in minutes before an inactive endpoint is scaled to zero.",
        ),
    ] = None,
    scaling_metric: Annotated[
        InferenceEndpointScalingMetric | None,
        typer.Option(
            help="The metric reference for scaling.",
        ),
    ] = None,
    scaling_threshold: Annotated[
        float | None,
        typer.Option(
            help="The scaling metric threshold used to trigger a scale up. Ignored when scaling metric is not provided.",
        ),
    ] = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
) -> None:
    """Update an existing endpoint."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.
    # Every option is forwarded as-is (including unset `None` values); note that the CLI's
    # `repo` option maps to the API's `repository` keyword.
    changes = {
        "repository": repo,
        "framework": framework,
        "revision": revision,
        "task": task,
        "accelerator": accelerator,
        "instance_size": instance_size,
        "instance_type": instance_type,
        "min_replica": min_replica,
        "max_replica": max_replica,
        "scale_to_zero_timeout": scale_to_zero_timeout,
        "scaling_metric": scaling_metric,
        "scaling_threshold": scaling_threshold,
    }
    client = get_hf_api(token=token)
    try:
        endpoint = client.update_inference_endpoint(
            name=name,
            namespace=namespace,
            token=token,
            **changes,
        )
    except HfHubHTTPError as exc:
        # Print the failure, then exit with the HTTP status code of the response.
        out.error(f"Update failed: {exc}")
        raise typer.Exit(code=exc.response.status_code) from exc
    else:
        out.dict(endpoint.raw)
@ie_cli.command(examples=["hf endpoints delete my-endpoint"])
def delete(
    name: NameArg,
    namespace: NamespaceOpt = None,
    yes: Annotated[
        bool,
        typer.Option("--yes", help="Skip confirmation prompts."),
    ] = False,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
) -> None:
    """Delete an Inference Endpoint permanently."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.
    # The confirmation step runs before the API client is even created.
    prompt = f"Delete endpoint '{name}'?"
    out.confirm(prompt, yes=yes)

    client = get_hf_api(token=token)
    try:
        client.delete_inference_endpoint(name=name, namespace=namespace, token=token)
    except HfHubHTTPError as exc:
        # Print the failure, then exit with the HTTP status code of the response.
        out.error(f"Delete failed: {exc}")
        raise typer.Exit(code=exc.response.status_code) from exc
    else:
        out.result(f"Deleted '{name}'.", name=name)
@ie_cli.command(examples=["hf endpoints pause my-endpoint"])
def pause(
    name: NameArg,
    namespace: NamespaceOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
) -> None:
    """Pause an Inference Endpoint."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.
    client = get_hf_api(token=token)
    try:
        endpoint = client.pause_inference_endpoint(
            name=name,
            namespace=namespace,
            token=token,
        )
    except HfHubHTTPError as exc:
        # Print the failure, then exit with the HTTP status code of the response.
        out.error(f"Pause failed: {exc}")
        raise typer.Exit(code=exc.response.status_code) from exc
    else:
        out.dict(endpoint.raw)
@ie_cli.command(examples=["hf endpoints resume my-endpoint"])
def resume(
    name: NameArg,
    namespace: NamespaceOpt = None,
    fail_if_already_running: Annotated[
        bool,
        typer.Option(
            "--fail-if-already-running",
            help="If `True`, the method will raise an error if the Inference Endpoint is already running.",
        ),
    ] = False,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
) -> None:
    """Resume an Inference Endpoint."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.
    # The CLI flag is the inverse of the API's `running_ok` keyword.
    running_ok = not fail_if_already_running
    client = get_hf_api(token=token)
    try:
        endpoint = client.resume_inference_endpoint(
            name=name, namespace=namespace, token=token, running_ok=running_ok
        )
    except HfHubHTTPError as exc:
        # Print the failure, then exit with the HTTP status code of the response.
        out.error(f"Resume failed: {exc}")
        raise typer.Exit(code=exc.response.status_code) from exc
    else:
        out.dict(endpoint.raw)
@ie_cli.command(examples=["hf endpoints scale-to-zero my-endpoint"])
def scale_to_zero(
    name: NameArg,
    namespace: NamespaceOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
    token: TokenOpt = None,
) -> None:
    """Scale an Inference Endpoint to zero."""
    # NOTE(review): `format` is not read in this body; presumably its option type applies it -- confirm.
    client = get_hf_api(token=token)
    try:
        endpoint = client.scale_to_zero_inference_endpoint(
            name=name,
            namespace=namespace,
            token=token,
        )
    except HfHubHTTPError as exc:
        # Print the failure, then exit with the HTTP status code of the response.
        out.error(f"Scale To Zero failed: {exc}")
        raise typer.Exit(code=exc.response.status_code) from exc
    else:
        out.dict(endpoint.raw)