ycwhencpp's picture
Sync repo: updated train_grpo notebook for training run
5e9fb2f verified
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains commands to interact with repositories on the Hugging Face Hub.
Usage:
# create a new dataset repo on the Hub
hf repos create my-cool-dataset --repo-type=dataset
# create a private model repo on the Hub
hf repos create my-cool-model --private
# delete files from a repo on the Hub
hf repos delete-files my-model file.txt
"""
import enum
from typing import Annotated
import typer
from huggingface_hub import SpaceHardware, SpaceStorage
from huggingface_hub.errors import CLIError, HfHubHTTPError, RepositoryNotFoundError, RevisionNotFoundError
from ._cli_utils import (
EnvFileOpt,
EnvOpt,
FormatWithAutoOpt,
PrivateOpt,
RepoIdArg,
RepoType,
RepoTypeOpt,
RevisionOpt,
SecretsFileOpt,
SecretsOpt,
TokenOpt,
VolumesOpt,
env_map_to_key_value_list,
get_hf_api,
parse_env_map,
parse_volumes,
typer_factory,
)
from ._output import OutputFormatWithAuto, out
repos_cli = typer_factory(help="Manage repos on the Hub.")
@repos_cli.callback(invoke_without_command=True)
def _repos_callback(ctx: typer.Context) -> None:
if ctx.info_name == "repo":
out.warning("`hf repo` is deprecated in favor of `hf repos`.")
tag_cli = typer_factory(help="Manage tags for a repo on the Hub.")
branch_cli = typer_factory(help="Manage branches for a repo on the Hub.")
repos_cli.add_typer(tag_cli, name="tag")
repos_cli.add_typer(branch_cli, name="branch")
class GatedChoices(str, enum.Enum):
auto = "auto"
manual = "manual"
false = "false"
PublicOpt = Annotated[
bool | None,
typer.Option(
"--public",
help="Whether to make the repo public. Ignored if the repo already exists.",
),
]
ProtectedOpt = Annotated[
bool | None,
typer.Option(
"--protected",
help="Whether to make the Space protected (Spaces only). Ignored if the repo already exists.",
),
]
SpaceHardwareOpt = Annotated[
SpaceHardware | None,
typer.Option(
"--flavor",
help="Space hardware flavor (e.g. 'cpu-basic', 't4-medium', 'l4x4'). Only for Spaces.",
),
]
SpaceStorageOpt = Annotated[
SpaceStorage | None,
typer.Option(
"--storage",
help="(Deprecated, use volumes instead) Space persistent storage tier ('small', 'medium', or 'large'). Only for Spaces.",
),
]
SpaceSleepTimeOpt = Annotated[
int | None,
typer.Option(
"--sleep-time",
help="Seconds of inactivity before the Space is put to sleep. Use -1 to disable. Only for Spaces.",
),
]
@repos_cli.command(
"create",
examples=[
"hf repos create my-model",
"hf repos create my-dataset --repo-type dataset --private",
"hf repos create my-space --type space --space-sdk gradio --flavor t4-medium --secrets HF_TOKEN -e THEME=dark --protected",
"hf repos create my-space --type space --space-sdk gradio -v hf://gpt2:/models -v hf://buckets/org/b:/data",
],
)
def repo_create(
repo_id: RepoIdArg,
repo_type: RepoTypeOpt = RepoType.model,
space_sdk: Annotated[
str | None,
typer.Option(
help="Hugging Face Spaces SDK type. Required when --type is set to 'space'.",
),
] = None,
private: PrivateOpt = None,
public: PublicOpt = None,
protected: ProtectedOpt = None,
token: TokenOpt = None,
exist_ok: Annotated[
bool,
typer.Option(
help="Do not raise an error if repo already exists.",
),
] = False,
resource_group_id: Annotated[
str | None,
typer.Option(
help="Resource group in which to create the repo. Resource groups is only available for Enterprise Hub organizations.",
),
] = None,
hardware: SpaceHardwareOpt = None,
storage: SpaceStorageOpt = None,
sleep_time: SpaceSleepTimeOpt = None,
secrets: SecretsOpt = None,
secrets_file: SecretsFileOpt = None,
env: EnvOpt = None,
env_file: EnvFileOpt = None,
volume: VolumesOpt = None,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Create a new repo on the Hub."""
api = get_hf_api(token=token)
repo_url = api.create_repo(
repo_id=repo_id,
repo_type=repo_type.value,
visibility="private" if private else "public" if public else "protected" if protected else None, # type: ignore [arg-type]
token=token,
exist_ok=exist_ok,
resource_group_id=resource_group_id,
space_sdk=space_sdk,
space_hardware=hardware,
space_storage=storage,
space_sleep_time=sleep_time,
space_secrets=env_map_to_key_value_list(parse_env_map(secrets, secrets_file)),
space_variables=env_map_to_key_value_list(parse_env_map(env, env_file)),
space_volumes=parse_volumes(volume),
)
out.result("Repo created", repo_id=repo_url.repo_id, url=str(repo_url))
@repos_cli.command(
"duplicate",
examples=[
"hf repos duplicate openai/gdpval --type dataset",
"hf repos duplicate multimodalart/dreambooth-training my-dreambooth --type space --flavor l4x4 --secrets HF_TOKEN --private",
"hf repos duplicate org/my-space my-space --type space -v hf://gpt2:/models -v hf://buckets/org/b:/data",
],
)
def repo_duplicate(
from_id: RepoIdArg,
to_id: Annotated[
str | None,
typer.Argument(
help="Destination repo ID (e.g. `myorg/my-copy`). Defaults to your namespace with the same repo name.",
),
] = None,
repo_type: RepoTypeOpt = RepoType.model,
private: PrivateOpt = None,
public: PublicOpt = None,
protected: ProtectedOpt = None,
token: TokenOpt = None,
exist_ok: Annotated[
bool,
typer.Option(
help="Do not raise an error if repo already exists.",
),
] = False,
hardware: SpaceHardwareOpt = None,
storage: SpaceStorageOpt = None,
sleep_time: SpaceSleepTimeOpt = None,
secrets: SecretsOpt = None,
secrets_file: SecretsFileOpt = None,
env: EnvOpt = None,
env_file: EnvFileOpt = None,
volume: VolumesOpt = None,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Duplicate a repo on the Hub (model, dataset, or Space)."""
api = get_hf_api(token=token)
repo_url = api.duplicate_repo(
from_id=from_id,
to_id=to_id,
repo_type=repo_type.value,
visibility="private" if private else "public" if public else "protected" if protected else None, # type: ignore [arg-type]
token=token,
exist_ok=exist_ok,
space_hardware=hardware,
space_storage=storage,
space_sleep_time=sleep_time,
space_secrets=env_map_to_key_value_list(parse_env_map(secrets, secrets_file)),
space_variables=env_map_to_key_value_list(parse_env_map(env, env_file)),
space_volumes=parse_volumes(volume),
)
out.result("Repo duplicated", from_id=from_id, to_id=repo_url.repo_id, url=str(repo_url))
@repos_cli.command("delete", examples=["hf repos delete my-model"])
def repo_delete(
repo_id: RepoIdArg,
repo_type: RepoTypeOpt = RepoType.model,
token: TokenOpt = None,
missing_ok: Annotated[
bool,
typer.Option(
help="If set to True, do not raise an error if repo does not exist.",
),
] = False,
yes: Annotated[
bool,
typer.Option(
"-y",
"--yes",
help="Answer Yes to prompt automatically.",
),
] = False,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Delete a repo from the Hub. This is an irreversible operation."""
out.confirm(f"You are about to permanently delete {repo_type.value} '{repo_id}'. Proceed?", yes=yes)
api = get_hf_api(token=token)
api.delete_repo(
repo_id=repo_id,
repo_type=repo_type.value,
missing_ok=missing_ok,
)
out.result("Repo deleted", repo_id=repo_id)
@repos_cli.command("move", examples=["hf repos move old-namespace/my-model new-namespace/my-model"])
def repo_move(
from_id: RepoIdArg,
to_id: RepoIdArg,
token: TokenOpt = None,
repo_type: RepoTypeOpt = RepoType.model,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Move a repository from a namespace to another namespace."""
api = get_hf_api(token=token)
api.move_repo(
from_id=from_id,
to_id=to_id,
repo_type=repo_type.value,
)
out.result("Repo moved", from_id=from_id, to_id=to_id)
@repos_cli.command(
"settings",
examples=[
"hf repos settings my-model --private",
"hf repos settings my-model --gated auto",
"hf repos settings my-space --repo-type space --protected",
],
)
def repo_settings(
repo_id: RepoIdArg,
gated: Annotated[
GatedChoices | None,
typer.Option(
help="The gated status for the repository.",
),
] = None,
private: PrivateOpt = None,
public: PublicOpt = None,
protected: ProtectedOpt = None,
token: TokenOpt = None,
repo_type: RepoTypeOpt = RepoType.model,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Update the settings of a repository."""
api = get_hf_api(token=token)
api.update_repo_settings(
repo_id=repo_id,
gated=(None if gated is None else False if gated is GatedChoices.false else gated.value),
visibility="private" if private else "public" if public else "protected" if protected else None, # type: ignore [arg-type]
repo_type=repo_type.value,
)
out.result("Repo settings updated", repo_id=repo_id)
@repos_cli.command(
"delete-files",
examples=[
"hf repos delete-files my-model file.txt",
'hf repos delete-files my-model "*.json"',
"hf repos delete-files my-model folder/",
],
)
def repo_delete_files(
repo_id: RepoIdArg,
patterns: Annotated[
list[str],
typer.Argument(
help="Glob patterns to match files to delete. Based on fnmatch, '*' matches files recursively.",
),
],
repo_type: RepoTypeOpt = RepoType.model,
revision: RevisionOpt = None,
commit_message: Annotated[
str | None,
typer.Option(
help="The summary / title / first line of the generated commit.",
),
] = None,
commit_description: Annotated[
str | None,
typer.Option(
help="The description of the generated commit.",
),
] = None,
create_pr: Annotated[
bool,
typer.Option(
help="Whether to create a new Pull Request for these changes.",
),
] = False,
token: TokenOpt = None,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Delete files from a repo on the Hub."""
api = get_hf_api(token=token)
url = api.delete_files(
delete_patterns=patterns,
repo_id=repo_id,
repo_type=repo_type.value,
revision=revision,
commit_message=commit_message,
commit_description=commit_description,
create_pr=create_pr,
)
out.result("Files deleted", repo_id=repo_id, commit_url=url)
@branch_cli.command(
"create",
examples=[
"hf repos branch create my-model dev",
"hf repos branch create my-model dev --revision abc123",
],
)
def branch_create(
repo_id: RepoIdArg,
branch: Annotated[
str,
typer.Argument(
help="The name of the branch to create.",
),
],
revision: RevisionOpt = None,
token: TokenOpt = None,
repo_type: RepoTypeOpt = RepoType.model,
exist_ok: Annotated[
bool,
typer.Option(
help="If set to True, do not raise an error if branch already exists.",
),
] = False,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Create a new branch for a repo on the Hub."""
api = get_hf_api(token=token)
api.create_branch(
repo_id=repo_id,
branch=branch,
revision=revision,
repo_type=repo_type.value,
exist_ok=exist_ok,
)
out.result("Branch created", branch=branch, repo_type=repo_type.value, repo_id=repo_id)
@branch_cli.command("delete", examples=["hf repos branch delete my-model dev"])
def branch_delete(
repo_id: RepoIdArg,
branch: Annotated[
str,
typer.Argument(
help="The name of the branch to delete.",
),
],
token: TokenOpt = None,
repo_type: RepoTypeOpt = RepoType.model,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Delete a branch from a repo on the Hub."""
api = get_hf_api(token=token)
api.delete_branch(
repo_id=repo_id,
branch=branch,
repo_type=repo_type.value,
)
out.result("Branch deleted", branch=branch, repo_type=repo_type.value, repo_id=repo_id)
@tag_cli.command(
"create",
examples=[
"hf repos tag create my-model v1.0",
'hf repos tag create my-model v1.0 -m "First release"',
],
)
def tag_create(
repo_id: RepoIdArg,
tag: Annotated[
str,
typer.Argument(
help="The name of the tag to create.",
),
],
message: Annotated[
str | None,
typer.Option(
"-m",
"--message",
help="The description of the tag to create.",
),
] = None,
revision: RevisionOpt = None,
token: TokenOpt = None,
repo_type: RepoTypeOpt = RepoType.model,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Create a tag for a repo."""
repo_type_str = repo_type.value
api = get_hf_api(token=token)
try:
api.create_tag(repo_id=repo_id, tag=tag, tag_message=message, revision=revision, repo_type=repo_type_str)
except RepositoryNotFoundError as e:
raise CLIError(f"{repo_type_str.capitalize()} '{repo_id}' not found.") from e
except RevisionNotFoundError as e:
raise CLIError(f"Revision '{revision}' not found.") from e
except HfHubHTTPError as e:
if e.response.status_code == 409:
raise CLIError(f"Tag '{tag}' already exists on '{repo_id}'.") from e
raise
out.result("Tag created", tag=tag, repo_type=repo_type_str, repo_id=repo_id)
@tag_cli.command("list | ls", examples=["hf repos tag list my-model"])
def tag_list(
repo_id: RepoIdArg,
token: TokenOpt = None,
repo_type: RepoTypeOpt = RepoType.model,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""List tags for a repo."""
repo_type_str = repo_type.value
api = get_hf_api(token=token)
try:
refs = api.list_repo_refs(repo_id=repo_id, repo_type=repo_type_str)
except RepositoryNotFoundError as e:
raise CLIError(f"{repo_type_str.capitalize()} '{repo_id}' not found.") from e
items = [{"name": t.name, "target_commit": t.target_commit, "ref": t.ref} for t in refs.tags]
out.table(items)
@tag_cli.command("delete", examples=["hf repos tag delete my-model v1.0"])
def tag_delete(
repo_id: RepoIdArg,
tag: Annotated[
str,
typer.Argument(
help="The name of the tag to delete.",
),
],
yes: Annotated[
bool,
typer.Option(
"-y",
"--yes",
help="Answer Yes to prompt automatically",
),
] = False,
token: TokenOpt = None,
repo_type: RepoTypeOpt = RepoType.model,
format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
"""Delete a tag for a repo."""
repo_type_str = repo_type.value
out.text(f"You are about to delete tag {tag} on {repo_type_str} {repo_id}")
out.confirm("Proceed?", yes=yes)
api = get_hf_api(token=token)
try:
api.delete_tag(repo_id=repo_id, tag=tag, repo_type=repo_type_str)
except RepositoryNotFoundError as e:
raise CLIError(f"{repo_type_str.capitalize()} '{repo_id}' not found.") from e
except RevisionNotFoundError as e:
raise CLIError(f"Tag '{tag}' not found on '{repo_id}'.") from e
out.result("Tag deleted", tag=tag, repo_type=repo_type_str, repo_id=repo_id)