| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """Contains commands to interact with buckets via the CLI.""" |
|
|
| import json |
| import os |
| import sys |
| from datetime import datetime |
| from typing import Annotated |
|
|
| import typer |
|
|
| from huggingface_hub import logging |
| from huggingface_hub._buckets import ( |
| BUCKET_PREFIX, |
| BucketFile, |
| BucketFolder, |
| FilterMatcher, |
| _is_bucket_path, |
| _parse_bucket_path, |
| _split_bucket_id_and_prefix, |
| ) |
| from huggingface_hub.utils import ( |
| SoftTemporaryDirectory, |
| StatusLine, |
| are_progress_bars_disabled, |
| disable_progress_bars, |
| enable_progress_bars, |
| ) |
|
|
| from ._cli_utils import ( |
| FormatWithAutoOpt, |
| SearchOpt, |
| TokenOpt, |
| api_object_to_dict, |
| get_hf_api, |
| typer_factory, |
| ) |
| from ._output import OutputFormatWithAuto, out |
|
|
|
|
# Module-level logger following huggingface_hub's logging conventions.
logger = logging.get_logger(__name__)


# Typer sub-app for the `hf buckets` command group; the commands defined below
# register themselves onto it via `@buckets_cli.command(...)` decorators.
buckets_cli = typer_factory(help="Commands to interact with buckets.")
|
|
|
|
| def _is_hf_handle(path: str) -> bool: |
| return path.startswith("hf://") |
|
|
|
|
def _parse_bucket_argument(argument: str) -> tuple[str, str]:
    """Parse a bucket argument accepting both 'namespace/name(/prefix)' and 'hf://buckets/namespace/name(/prefix)'.

    Returns:
        tuple: (bucket_id, prefix) where bucket_id is "namespace/bucket_name" and prefix may be empty string.

    Raises:
        ValueError: If the argument matches neither accepted form.
    """
    # Wrap BOTH parsing paths so that a malformed hf:// handle and a malformed
    # plain ID produce the same user-facing error (previously a ValueError from
    # `_parse_bucket_path` escaped unwrapped). `from None` drops the internal
    # exception chain, which is noise for CLI users.
    try:
        if argument.startswith(BUCKET_PREFIX):
            return _parse_bucket_path(argument)
        return _split_bucket_id_and_prefix(argument)
    except ValueError:
        raise ValueError(
            f"Invalid bucket argument: {argument}. Must be in format namespace/bucket_name"
            f" or {BUCKET_PREFIX}namespace/bucket_name"
        ) from None
|
|
|
|
| def _format_size(size: int | float, human_readable: bool = False) -> str: |
| """Format a size in bytes.""" |
| if not human_readable: |
| return str(size) |
|
|
| for unit in ["B", "KB", "MB", "GB", "TB"]: |
| if size < 1000: |
| if unit == "B": |
| return f"{size} {unit}" |
| return f"{size:.1f} {unit}" |
| size /= 1000 |
| return f"{size:.1f} PB" |
|
|
|
|
| def _format_mtime(mtime: datetime | None, human_readable: bool = False) -> str: |
| """Format mtime datetime to a readable date string.""" |
| if mtime is None: |
| return "" |
| if human_readable: |
| return mtime.strftime("%b %d %H:%M") |
| return mtime.strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
|
def _build_tree(
    items: list[BucketFile | BucketFolder],
    human_readable: bool = False,
    quiet: bool = False,
) -> list[str]:
    """Build a tree representation of files and directories.

    Produces ASCII tree with size and date columns before the tree connector.
    When quiet=True, only the tree structure is shown (no size/date).

    Args:
        items: List of BucketFile/BucketFolder items
        human_readable: Whether to show human-readable sizes and short dates
        quiet: If True, show only the tree structure without sizes/dates

    Returns:
        List of formatted tree lines
    """
    # Nested-dict trie keyed by path segment. Folder nodes hold their children
    # under "__children__"; file nodes hold the original item under "__item__".
    tree: dict = {}

    for item in items:
        parts = item.path.split("/")
        current = tree
        # Walk/create intermediate directory nodes for all but the last segment.
        for part in parts[:-1]:
            if part not in current:
                current[part] = {"__children__": {}}
            current = current[part]["__children__"]

        final_part = parts[-1]
        if isinstance(item, BucketFolder):
            # Do not overwrite an existing node: the folder may already have
            # been created implicitly while inserting a deeper path.
            if final_part not in current:
                current[final_part] = {"__children__": {}}
        else:
            current[final_part] = {"__item__": item}

    # Pre-compute column widths so sizes and dates line up across all rows.
    prefix_width = 0
    max_size_width = 0
    max_date_width = 0
    if not quiet:
        for item in items:
            if isinstance(item, BucketFile):
                size_str = _format_size(item.size, human_readable)
                max_size_width = max(max_size_width, len(size_str))
                date_str = _format_mtime(item.mtime, human_readable)
                max_date_width = max(max_date_width, len(date_str))
        if max_size_width > 0:
            # Size column + two separator spaces + date column.
            prefix_width = max_size_width + 2 + max_date_width

    lines: list[str] = []
    _render_tree(
        tree,
        lines,
        "",
        prefix_width=prefix_width,
        max_size_width=max_size_width,
        human_readable=human_readable,
    )
    return lines
|
|
|
|
def _render_tree(
    node: dict,
    lines: list[str],
    indent: str,
    prefix_width: int = 0,
    max_size_width: int = 0,
    human_readable: bool = False,
) -> None:
    """Recursively render a tree structure with size+date prefix.

    Args:
        node: Nested-dict node produced by `_build_tree` ("__children__"/"__item__" keys).
        lines: Output accumulator; rendered rows are appended in place.
        indent: Tree-drawing characters inherited from ancestor levels.
        prefix_width: Total width of the size+date column block (0 disables it).
        max_size_width: Width the size column is right-aligned to.
        human_readable: Forwarded to the size/date formatting helpers.
    """
    # Sort siblings by name for deterministic output.
    items = sorted(node.items())
    for i, (name, value) in enumerate(items):
        is_last = i == len(items) - 1
        connector = "└── " if is_last else "├── "

        is_dir = "__children__" in value
        children = value.get("__children__", {})

        if prefix_width > 0:
            if is_dir:
                # Directories carry no size/date: pad with spaces to keep alignment.
                prefix = " " * prefix_width
            else:
                item = value.get("__item__")
                if item is not None:
                    size_str = _format_size(item.size, human_readable)
                    date_str = _format_mtime(item.mtime, human_readable)
                    # NOTE(review): date_str is not padded to the max date width,
                    # so a row whose mtime is None (empty date) shifts left
                    # relative to its siblings — confirm this is intended.
                    prefix = f"{size_str:>{max_size_width}} {date_str}"
                else:
                    prefix = " " * prefix_width
            lines.append(f"{prefix} {indent}{connector}{name}{'/' if is_dir else ''}")
        else:
            lines.append(f"{indent}{connector}{name}{'/' if is_dir else ''}")

        if children:
            # Last sibling gets blank continuation; others keep a vertical rule.
            child_indent = indent + (" " if is_last else "│ ")
            _render_tree(
                children,
                lines,
                child_indent,
                prefix_width=prefix_width,
                max_size_width=max_size_width,
                human_readable=human_readable,
            )
|
|
|
|
@buckets_cli.command(
    name="create",
    examples=[
        "hf buckets create my-bucket",
        "hf buckets create user/my-bucket",
        "hf buckets create hf://buckets/user/my-bucket",
        "hf buckets create user/my-bucket --private",
        "hf buckets create user/my-bucket --exist-ok",
    ],
)
def create(
    bucket_id: Annotated[
        str,
        typer.Argument(
            help="Bucket ID: bucket_name, namespace/bucket_name, or hf://buckets/namespace/bucket_name",
        ),
    ],
    private: Annotated[
        bool,
        typer.Option(
            "--private",
            help="Create a private bucket.",
        ),
    ] = False,
    exist_ok: Annotated[
        bool,
        typer.Option(
            "--exist-ok",
            help="Do not raise an error if the bucket already exists.",
        ),
    ] = False,
    token: TokenOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
    """Create a new bucket."""
    if bucket_id.startswith(BUCKET_PREFIX):
        # A full hf:// handle was given: strip the scheme and reject any
        # trailing path component — creation targets a whole bucket only.
        try:
            parsed_id, prefix = _parse_bucket_argument(bucket_id)
        except ValueError as e:
            raise typer.BadParameter(str(e))
        if prefix:
            raise typer.BadParameter(
                f"Cannot specify a prefix for bucket creation: {bucket_id}."
                f" Use namespace/bucket_name or {BUCKET_PREFIX}namespace/bucket_name."
            )
        bucket_id = parsed_id

    api = get_hf_api(token=token)
    # Passing None (instead of False) lets the server apply its default visibility.
    bucket_url = api.create_bucket(
        bucket_id,
        private=private or None,
        exist_ok=exist_ok,
    )
    out.result("Bucket created", handle=bucket_url.handle, url=bucket_url.url)
|
|
|
|
def _is_bucket_id(argument: str) -> bool:
    """Check if argument is a bucket ID (namespace/name) vs just a namespace."""
    # Drop the hf://buckets/ scheme when present; a remaining "/" means the
    # argument names a bucket, not a bare namespace.
    return "/" in argument.removeprefix(BUCKET_PREFIX)
|
|
|
|
@buckets_cli.command(
    name="list | ls",
    examples=[
        "hf buckets list",
        "hf buckets list huggingface",
        'hf buckets list --search "my-prefix"',
        "hf buckets list user/my-bucket",
        "hf buckets list user/my-bucket -R",
        "hf buckets list user/my-bucket -h",
        "hf buckets list user/my-bucket --tree",
        "hf buckets list user/my-bucket --tree -h",
        "hf buckets list hf://buckets/user/my-bucket",
        "hf buckets list user/my-bucket/sub -R",
    ],
)
def list_cmd(
    argument: Annotated[
        str | None,
        typer.Argument(
            help=(
                "Namespace (user or org) to list buckets, or bucket ID"
                " (namespace/bucket_name(/prefix) or hf://buckets/...) to list files."
            ),
        ),
    ] = None,
    human_readable: Annotated[
        bool,
        typer.Option(
            "--human-readable",
            "-h",
            help="Show sizes in human readable format.",
        ),
    ] = False,
    as_tree: Annotated[
        bool,
        typer.Option(
            "--tree",
            help="List files in tree format (only for listing files).",
        ),
    ] = False,
    recursive: Annotated[
        bool,
        typer.Option(
            "--recursive",
            "-R",
            help="List files recursively (only for listing files).",
        ),
    ] = False,
    search: SearchOpt = None,
    token: TokenOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
    """List buckets or files in a bucket.

    When called with no argument or a namespace, lists buckets.
    When called with a bucket ID (namespace/bucket_name), lists files in the bucket.
    """
    # A "namespace/name"-shaped argument selects file listing inside one bucket;
    # a bare namespace (or no argument at all) selects bucket listing.
    is_file_mode = argument is not None and _is_bucket_id(argument)

    if is_file_mode:
        # --search only applies to bucket listings.
        if search is not None:
            raise typer.BadParameter("Cannot use --search when listing files.")
        _list_files(
            argument=argument,
            human_readable=human_readable,
            as_tree=as_tree,
            recursive=recursive,
            token=token,
        )
    else:
        # --tree/--recursive are validated (and rejected) inside _list_buckets.
        _list_buckets(
            namespace=argument,
            search=search,
            human_readable=human_readable,
            as_tree=as_tree,
            recursive=recursive,
            token=token,
        )
|
|
|
|
def _list_buckets(
    namespace: str | None,
    search: str | None,
    human_readable: bool,
    as_tree: bool,
    recursive: bool,
    token: str | None,
) -> None:
    """Print the buckets of *namespace* (or the current user) as a table."""
    # These flags only make sense when listing files inside a bucket.
    if as_tree:
        raise typer.BadParameter("Cannot use --tree when listing buckets.")
    if recursive:
        raise typer.BadParameter("Cannot use --recursive when listing buckets.")

    # Accept a full hf:// handle and reduce it to the bare namespace.
    if namespace is not None and namespace.startswith(BUCKET_PREFIX):
        namespace = namespace[len(BUCKET_PREFIX) :].rstrip("/")

    api = get_hf_api(token=token)
    rows = []
    for bucket in api.list_buckets(namespace=namespace, search=search):
        rows.append(
            {
                "id": bucket.id,
                "private": bucket.private,
                # Keep the raw integer unless human-readable output was requested.
                "size": _format_size(bucket.size, human_readable) if human_readable else bucket.size,
                "total_files": bucket.total_files,
                "created_at": bucket.created_at,
            }
        )
    out.table(rows, alignments={"size": "right", "total_files": "right"})
|
|
|
|
def _list_files(
    argument: str,
    human_readable: bool,
    as_tree: bool,
    recursive: bool,
    token: str | None,
) -> None:
    """List files in a bucket.

    Args:
        argument: Bucket ID with optional path prefix (plain or hf:// form).
        human_readable: Show sizes/dates in compact human-friendly form.
        as_tree: Render results as an ASCII tree instead of a flat listing.
        recursive: Descend into sub-directories instead of a single level.
        token: Optional auth token forwarded to the API client.
    """
    # A drawn tree cannot be represented in structured JSON output.
    if as_tree and out.mode == OutputFormatWithAuto.json:
        raise typer.BadParameter("Cannot use --tree with --format json.")

    api = get_hf_api(token=token)

    try:
        bucket_id, prefix = _parse_bucket_argument(argument)
    except ValueError as e:
        raise typer.BadParameter(str(e))

    # Materialize the full listing up-front: every output mode below needs the
    # whole result (tree building, JSON dump, emptiness check).
    items = list(
        api.list_bucket_tree(
            bucket_id,
            prefix=prefix or None,
            recursive=recursive,
        )
    )

    if not items:
        out.text("(empty)")
        return

    has_directories = any(isinstance(item, BucketFolder) for item in items)

    if as_tree:
        # In quiet mode the tree omits the size/date columns.
        quiet = out.mode == OutputFormatWithAuto.quiet
        for line in _build_tree(items, human_readable=human_readable, quiet=quiet):
            print(line)
    elif out.mode == OutputFormatWithAuto.json:
        print(json.dumps([api_object_to_dict(item) for item in items], indent=2))
    elif out.mode == OutputFormatWithAuto.quiet:
        # Paths only; directories get a trailing slash to disambiguate.
        for item in items:
            if isinstance(item, BucketFolder):
                print(f"{item.path}/")
            else:
                print(item.path)
    else:
        # Default listing: right-aligned size (blank for directories), mtime, path.
        for item in items:
            if isinstance(item, BucketFolder):
                mtime_str = _format_mtime(item.uploaded_at, human_readable)
                print(f"{'':>12} {mtime_str:>19} {item.path}/")
            else:
                size_str = _format_size(item.size, human_readable)
                mtime_str = _format_mtime(item.mtime, human_readable)
                print(f"{size_str:>12} {mtime_str:>19} {item.path}")

    if not recursive and has_directories:
        out.hint("Use -R to list files recursively.")
|
|
|
|
@buckets_cli.command(
    name="info",
    examples=[
        "hf buckets info user/my-bucket",
        "hf buckets info hf://buckets/user/my-bucket",
    ],
)
def info(
    bucket_id: Annotated[
        str,
        typer.Argument(
            help="Bucket ID: namespace/bucket_name or hf://buckets/namespace/bucket_name",
        ),
    ],
    token: TokenOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
    """Get info about a bucket."""
    # Validate and normalize the argument first; any trailing prefix is ignored.
    try:
        parsed_id, _ = _parse_bucket_argument(bucket_id)
    except ValueError as e:
        raise typer.BadParameter(str(e))

    api = get_hf_api(token=token)
    out.dict(api.bucket_info(parsed_id), id_key="id")
|
|
|
|
@buckets_cli.command(
    name="delete",
    examples=[
        "hf buckets delete user/my-bucket",
        "hf buckets delete hf://buckets/user/my-bucket",
        "hf buckets delete user/my-bucket --yes",
        "hf buckets delete user/my-bucket --missing-ok",
    ],
)
def delete(
    bucket_id: Annotated[
        str,
        typer.Argument(
            help="Bucket ID: namespace/bucket_name or hf://buckets/namespace/bucket_name",
        ),
    ],
    yes: Annotated[
        bool,
        typer.Option(
            "--yes",
            "-y",
            help="Skip confirmation prompt.",
        ),
    ] = False,
    missing_ok: Annotated[
        bool,
        typer.Option(
            "--missing-ok",
            help="Do not raise an error if the bucket does not exist.",
        ),
    ] = False,
    token: TokenOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
    """Delete a bucket.

    This deletes the entire bucket and all its contents. Use `hf buckets rm` to remove individual files.
    """
    if not bucket_id.startswith(BUCKET_PREFIX):
        # Plain form: must at least look like namespace/bucket_name.
        if "/" not in bucket_id:
            raise typer.BadParameter(
                f"Invalid bucket ID: {bucket_id}."
                f" Must be in format namespace/bucket_name or {BUCKET_PREFIX}namespace/bucket_name."
            )
    else:
        # hf:// form: strip the scheme and reject any trailing path — deletion
        # targets a whole bucket only.
        try:
            parsed_id, prefix = _parse_bucket_argument(bucket_id)
        except ValueError as e:
            raise typer.BadParameter(str(e))
        if prefix:
            raise typer.BadParameter(
                f"Cannot specify a prefix for bucket deletion: {bucket_id}."
                f" Use namespace/bucket_name or {BUCKET_PREFIX}namespace/bucket_name."
            )
        bucket_id = parsed_id

    # Destructive operation: always confirm unless --yes was passed.
    out.confirm(f"Are you sure you want to delete bucket '{bucket_id}'?", yes=yes)

    api = get_hf_api(token=token)
    api.delete_bucket(bucket_id, missing_ok=missing_ok)
    out.result("Bucket deleted", bucket_id=bucket_id)
|
|
|
|
@buckets_cli.command(
    name="remove | rm",
    examples=[
        "hf buckets remove user/my-bucket/file.txt",
        "hf buckets rm hf://buckets/user/my-bucket/file.txt",
        "hf buckets rm user/my-bucket/logs/ --recursive",
        'hf buckets rm user/my-bucket --recursive --include "*.tmp"',
        "hf buckets rm user/my-bucket/data/ --recursive --dry-run",
    ],
)
def remove(
    argument: Annotated[
        str,
        typer.Argument(
            help=(
                "Bucket path: namespace/bucket_name/path or hf://buckets/namespace/bucket_name/path."
                " With --recursive, namespace/bucket_name is also accepted to target all files."
            ),
        ),
    ],
    recursive: Annotated[
        bool,
        typer.Option(
            "--recursive",
            "-R",
            help="Remove files recursively under the given prefix.",
        ),
    ] = False,
    yes: Annotated[
        bool,
        typer.Option(
            "--yes",
            "-y",
            help="Skip confirmation prompt.",
        ),
    ] = False,
    dry_run: Annotated[
        bool,
        typer.Option(
            "--dry-run",
            help="Preview what would be deleted without actually deleting.",
        ),
    ] = False,
    include: Annotated[
        list[str] | None,
        typer.Option(
            help="Include only files matching pattern (can specify multiple). Requires --recursive.",
        ),
    ] = None,
    exclude: Annotated[
        list[str] | None,
        typer.Option(
            help="Exclude files matching pattern (can specify multiple). Requires --recursive.",
        ),
    ] = None,
    token: TokenOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
    """Remove files from a bucket.

    To delete an entire bucket, use `hf buckets delete` instead.
    """
    try:
        bucket_id, prefix = _parse_bucket_argument(argument)
    except ValueError as e:
        raise typer.BadParameter(str(e))

    # A bare bucket ID (no file path) is only meaningful with --recursive
    # ("remove everything"); otherwise guide the user to the right usage.
    if prefix == "" and not recursive:
        raise typer.BadParameter(
            f"No file path specified. To remove files, provide a path"
            f" (e.g. '{bucket_id}/FILE') or use --recursive to remove all files."
            f" To delete the entire bucket, use `hf buckets delete {bucket_id}`."
        )

    if (include or exclude) and not recursive:
        raise typer.BadParameter("--include and --exclude require --recursive.")

    api = get_hf_api(token=token)

    if recursive:
        # Live status line is cosmetic: only enabled for human-oriented output.
        status = StatusLine(enabled=out.mode == OutputFormatWithAuto.human)
        status.update("Listing files from remote")

        # Enumerate every file under the prefix first; deletion happens in one
        # batch call afterwards.
        all_files: list[BucketFile] = []
        for item in api.list_bucket_tree(
            bucket_id,
            prefix=prefix.rstrip("/") or None,
            recursive=True,
        ):
            if isinstance(item, BucketFile):
                all_files.append(item)
            status.update(f"Listing files from remote ({len(all_files)} files)")
        status.done(f"Listing files from remote ({len(all_files)} files)")

        # Apply the optional glob filters to the candidate list.
        if include or exclude:
            matcher = FilterMatcher(include_patterns=include, exclude_patterns=exclude)
            matched_files = [f for f in all_files if matcher.matches(f.path)]
        else:
            matched_files = all_files

        file_paths = [f.path for f in matched_files]
        total_size = sum(f.size for f in matched_files)
        size_str = _format_size(total_size, human_readable=True)

        if not file_paths:
            out.text("No files to remove.")
            return

        count_label = f"{len(file_paths)} file(s) totaling {size_str}"

        # Show the full list before asking; skipped entirely for --yes/--dry-run.
        if not yes and not dry_run:
            out.text("\n".join(f" {path}" for path in file_paths))
            out.confirm(f"Remove {count_label} from '{bucket_id}'?", yes=False)

        if dry_run:
            out.text("\n".join(f"delete: {BUCKET_PREFIX}{bucket_id}/{path}" for path in file_paths))
            out.text(f"(dry run) {count_label} would be removed.")
            return

        api.batch_bucket_files(bucket_id, delete=file_paths)
        out.result(
            f"Removed {count_label} from '{bucket_id}'",
            bucket_id=bucket_id,
            files_deleted=len(file_paths),
            size=size_str,
        )

    else:
        # Single-file removal: the parsed prefix is the exact file path.
        file_path = prefix.rstrip("/")
        if not file_path:
            raise typer.BadParameter("File path cannot be empty.")

        if dry_run:
            out.text(f"delete: {BUCKET_PREFIX}{bucket_id}/{file_path}")
            out.text("(dry run) 1 file would be removed.")
            return

        out.confirm(f"Remove '{file_path}' from '{bucket_id}'?", yes=yes)

        api.batch_bucket_files(bucket_id, delete=[file_path])
        out.result("File removed", path=file_path, bucket_id=bucket_id)
|
|
|
|
@buckets_cli.command(
    name="move",
    examples=[
        "hf buckets move user/old-bucket user/new-bucket",
        "hf buckets move user/my-bucket my-org/my-bucket",
        "hf buckets move hf://buckets/user/old-bucket hf://buckets/user/new-bucket",
    ],
)
def move(
    from_id: Annotated[
        str,
        typer.Argument(
            help="Source bucket ID: namespace/bucket_name or hf://buckets/namespace/bucket_name",
        ),
    ],
    to_id: Annotated[
        str,
        typer.Argument(
            help="Destination bucket ID: namespace/bucket_name or hf://buckets/namespace/bucket_name",
        ),
    ],
    token: TokenOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
    """Move (rename) a bucket to a new name or namespace."""
    # Fix: wrap parse errors in typer.BadParameter like every sibling command
    # (create/delete/info) — previously a malformed ID surfaced as a raw
    # ValueError traceback instead of a clean CLI usage error.
    try:
        parsed_from_id, from_prefix = _parse_bucket_argument(from_id)
    except ValueError as e:
        raise typer.BadParameter(str(e))
    if from_prefix:
        raise typer.BadParameter(
            f"Cannot specify a prefix for bucket move: {from_id}."
            f" Use namespace/bucket_name or {BUCKET_PREFIX}namespace/bucket_name."
        )

    try:
        parsed_to_id, to_prefix = _parse_bucket_argument(to_id)
    except ValueError as e:
        raise typer.BadParameter(str(e))
    if to_prefix:
        raise typer.BadParameter(
            f"Cannot specify a prefix for bucket move: {to_id}."
            f" Use namespace/bucket_name or {BUCKET_PREFIX}namespace/bucket_name."
        )

    api = get_hf_api(token=token)
    api.move_bucket(from_id=parsed_from_id, to_id=parsed_to_id)
    out.result("Bucket moved", from_id=parsed_from_id, to_id=parsed_to_id)
|
|
|
|
| |
| |
| |
|
|
|
|
@buckets_cli.command(
    name="sync",
    examples=[
        "hf buckets sync ./data hf://buckets/user/my-bucket",
        "hf buckets sync hf://buckets/user/my-bucket ./data",
        "hf buckets sync ./data hf://buckets/user/my-bucket --delete",
        'hf buckets sync hf://buckets/user/my-bucket ./data --include "*.safetensors" --exclude "*.tmp"',
        "hf buckets sync ./data hf://buckets/user/my-bucket --plan sync-plan.jsonl",
        "hf buckets sync --apply sync-plan.jsonl",
        "hf buckets sync ./data hf://buckets/user/my-bucket --dry-run",
        "hf buckets sync ./data hf://buckets/user/my-bucket --dry-run | jq .",
    ],
)
def sync(
    source: Annotated[
        str | None,
        typer.Argument(
            help="Source path: local directory or hf://buckets/namespace/bucket_name(/prefix)",
        ),
    ] = None,
    dest: Annotated[
        str | None,
        typer.Argument(
            help="Destination path: local directory or hf://buckets/namespace/bucket_name(/prefix)",
        ),
    ] = None,
    delete: Annotated[
        bool,
        typer.Option(
            help="Delete destination files not present in source.",
        ),
    ] = False,
    ignore_times: Annotated[
        bool,
        typer.Option(
            "--ignore-times",
            help="Skip files only based on size, ignoring modification times.",
        ),
    ] = False,
    ignore_sizes: Annotated[
        bool,
        typer.Option(
            "--ignore-sizes",
            help="Skip files only based on modification times, ignoring sizes.",
        ),
    ] = False,
    plan: Annotated[
        str | None,
        typer.Option(
            help="Save sync plan to JSONL file for review instead of executing.",
        ),
    ] = None,
    apply: Annotated[
        str | None,
        typer.Option(
            help="Apply a previously saved plan file.",
        ),
    ] = None,
    dry_run: Annotated[
        bool,
        typer.Option(
            "--dry-run",
            help="Print sync plan to stdout as JSONL without executing.",
        ),
    ] = False,
    include: Annotated[
        list[str] | None,
        typer.Option(
            help="Include files matching pattern (can specify multiple).",
        ),
    ] = None,
    exclude: Annotated[
        list[str] | None,
        typer.Option(
            help="Exclude files matching pattern (can specify multiple).",
        ),
    ] = None,
    filter_from: Annotated[
        str | None,
        typer.Option(
            help="Read include/exclude patterns from file.",
        ),
    ] = None,
    existing: Annotated[
        bool,
        typer.Option(
            "--existing",
            help="Skip creating new files on receiver (only update existing files).",
        ),
    ] = False,
    ignore_existing: Annotated[
        bool,
        typer.Option(
            "--ignore-existing",
            help="Skip updating files that exist on receiver (only create new files).",
        ),
    ] = False,
    verbose: Annotated[
        bool,
        typer.Option(
            "--verbose",
            "-v",
            help="Show detailed logging with reasoning.",
        ),
    ] = False,
    quiet: Annotated[
        bool,
        typer.Option(
            "--quiet",
            "-q",
            help="Minimal output.",
        ),
    ] = False,
    token: TokenOpt = None,
) -> None:
    """Sync files between local directory and a bucket."""
    # All planning, filtering, and transfer logic lives in HfApi.sync_bucket;
    # this command is a thin argument pass-through (argument validation,
    # e.g. for --plan/--apply vs source/dest, is presumably handled there —
    # confirm against the API implementation).
    api = get_hf_api(token=token)
    api.sync_bucket(
        source=source,
        dest=dest,
        delete=delete,
        ignore_times=ignore_times,
        ignore_sizes=ignore_sizes,
        existing=existing,
        ignore_existing=ignore_existing,
        include=include,
        exclude=exclude,
        filter_from=filter_from,
        plan=plan,
        apply=apply,
        dry_run=dry_run,
        verbose=verbose,
        quiet=quiet,
    )
    # After writing a plan file, remind the user how to execute it.
    if plan and not quiet:
        out.hint(f"Run `hf buckets sync --apply {plan}` to execute this plan.")
|
|
|
|
| |
| |
| |
|
|
|
|
@buckets_cli.command(
    name="cp",
    examples=[
        "hf buckets cp hf://buckets/user/my-bucket/config.json",
        "hf buckets cp hf://buckets/user/my-bucket/config.json ./data/",
        "hf buckets cp hf://buckets/user/my-bucket/config.json my-config.json",
        "hf buckets cp hf://buckets/user/my-bucket/config.json -",
        "hf buckets cp my-config.json hf://buckets/user/my-bucket",
        "hf buckets cp my-config.json hf://buckets/user/my-bucket/logs/",
        "hf buckets cp my-config.json hf://buckets/user/my-bucket/remote-config.json",
        "hf buckets cp - hf://buckets/user/my-bucket/config.json",
        "hf buckets cp hf://buckets/user/my-bucket/logs/ hf://buckets/user/archive-bucket/logs/",
        "hf buckets cp hf://datasets/user/my-dataset/processed/ hf://buckets/user/my-bucket/dataset/processed/",
    ],
)
def cp(
    src: Annotated[
        str, typer.Argument(help="Source: local file, any hf:// handle (model, dataset, bucket), or - for stdin")
    ],
    dst: Annotated[
        str | None, typer.Argument(help="Destination: local path, bucket hf://... handle, or - for stdout")
    ] = None,
    token: TokenOpt = None,
    format: FormatWithAutoOpt = OutputFormatWithAuto.auto,
) -> None:
    """Copy files to or from buckets."""
    api = get_hf_api(token=token)

    # Classify both endpoints once; all routing below is driven by these flags.
    src_is_hf = _is_hf_handle(src)
    dst_is_hf = dst is not None and _is_hf_handle(dst)
    src_is_bucket = _is_bucket_path(src)
    dst_is_bucket = dst is not None and _is_bucket_path(dst)
    src_is_stdin = src == "-"
    dst_is_stdout = dst == "-"

    # Remote-to-remote copy (bucket/model/dataset handles on both sides):
    # delegate entirely to the API.
    if src_is_hf and dst_is_hf:
        try:
            api.copy_files(src, dst)
        except ValueError as e:
            raise typer.BadParameter(str(e))

        out.result("Copied", src=src, dst=dst)
        return

    # From here on exactly one side must involve a bucket (or stdin upload).
    if not src_is_bucket and not dst_is_bucket and not src_is_stdin:
        if dst is None:
            raise typer.BadParameter("Missing destination. Provide a bucket path as DST.")
        raise typer.BadParameter("One of SRC or DST must be a bucket path (hf://buckets/...).")

    if src_is_stdin and not dst_is_bucket:
        raise typer.BadParameter("Stdin upload requires a bucket destination.")

    if src_is_stdin and dst_is_bucket:
        # Stdin carries no filename to infer, so the destination must name the file.
        _, prefix = _parse_bucket_path(dst)
        if prefix == "" or prefix.endswith("/"):
            raise typer.BadParameter("Stdin upload requires a full destination path including filename.")

    if dst_is_stdout and not src_is_bucket:
        raise typer.BadParameter("Cannot pipe to stdout for uploads.")

    if not src_is_bucket and not src_is_stdin and os.path.isdir(src):
        raise typer.BadParameter("Source must be a file, not a directory. Use `hf buckets sync` for directories.")

    # Download: bucket -> stdout or local file.
    if src_is_bucket:
        # Single-file download only: the source must name an exact file.
        bucket_id, prefix = _parse_bucket_path(src)
        if prefix == "" or prefix.endswith("/"):
            raise typer.BadParameter("Source path must include a file name, not just a bucket or directory path.")
        filename = prefix.rsplit("/", 1)[-1]

        if dst_is_stdout:
            # Progress bars would corrupt the stdout byte stream: silence them
            # for the duration, then restore the previous state.
            pbar_was_disabled = are_progress_bars_disabled()
            if not pbar_was_disabled:
                disable_progress_bars()
            try:
                # Download to a temp file first, then stream it out in chunks.
                with SoftTemporaryDirectory() as tmp_dir:
                    tmp_path = os.path.join(tmp_dir, filename)
                    api.download_bucket_files(bucket_id, [(prefix, tmp_path)])
                    with open(tmp_path, "rb") as f:
                        while chunk := f.read(32_000_000):
                            sys.stdout.buffer.write(chunk)
            finally:
                if not pbar_was_disabled:
                    enable_progress_bars()
        else:
            # Resolve the local target: default to the remote filename, or
            # append it when DST is (or looks like) a directory.
            if dst is None:
                local_path = filename
            elif os.path.isdir(dst) or dst.endswith(os.sep) or dst.endswith("/"):
                local_path = os.path.join(dst, filename)
            else:
                local_path = dst

            # Create intermediate directories so the download cannot fail on a
            # missing parent.
            parent_dir = os.path.dirname(local_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            api.download_bucket_files(bucket_id, [(prefix, local_path)])
            out.result("Downloaded", src=src, dst=local_path)

    # Upload: stdin -> bucket file.
    elif src_is_stdin:
        bucket_id, remote_path = _parse_bucket_path(dst)
        # NOTE(review): reads all of stdin into memory before uploading — large
        # piped streams may be costly; confirm whether streaming is needed.
        data = sys.stdin.buffer.read()

        api.batch_bucket_files(bucket_id, add=[(data, remote_path)])
        out.result("Uploaded", src="stdin", dst=dst)

    # Upload: local file -> bucket.
    else:
        if not os.path.isfile(src):
            raise typer.BadParameter(f"Source file not found: {src}")

        bucket_id, prefix = _parse_bucket_path(dst)

        # Empty prefix or trailing slash keeps the local basename; otherwise
        # the prefix is taken as the full remote path.
        if prefix == "":
            remote_path = os.path.basename(src)
        elif prefix.endswith("/"):
            remote_path = prefix + os.path.basename(src)
        else:
            remote_path = prefix

        api.batch_bucket_files(bucket_id, add=[(src, remote_path)])
        out.result("Uploaded", src=src, dst=f"{BUCKET_PREFIX}{bucket_id}/{remote_path}")
|
|