Spaces:
Sleeping
Sleeping
| import inspect | |
| import itertools | |
| import os | |
| import shlex | |
| import sys | |
| from collections.abc import Callable, Iterable, Sequence | |
| from contextlib import suppress | |
| from functools import partial | |
| from typing import TYPE_CHECKING, Any, NamedTuple, get_origin | |
| from cyclopts._convert import _bool | |
| from cyclopts.annotations import resolve_optional | |
| from cyclopts.argument import Argument, ArgumentCollection | |
| from cyclopts.exceptions import ( | |
| ArgumentOrderError, | |
| CoercionError, | |
| CombinedShortOptionError, | |
| ConsumeMultipleError, | |
| CycloptsError, | |
| MissingArgumentError, | |
| RequiresEqualsError, | |
| UnknownOptionError, | |
| ValidationError, | |
| ) | |
| from cyclopts.field_info import POSITIONAL_ONLY, POSITIONAL_OR_KEYWORD | |
| from cyclopts.token import Token | |
| from cyclopts.utils import UNSET, is_option_like | |
# NOTE(review): both branches of this version gate are ``pass`` — the
# version-conditional imports were presumably removed at some point.
# TODO confirm this gate is safe to delete entirely.
if sys.version_info < (3, 11):  # pragma: no cover
    pass
else:  # pragma: no cover
    pass

if TYPE_CHECKING:
    # Imported only for type annotations; avoids a runtime import cycle.
    from cyclopts.group import Group

# All tokens created from CLI input share ``source="cli"``.
CliToken = partial(Token, source="cli")
class _KeywordMatch(NamedTuple):
    """Represents a matched CLI token with its corresponding argument."""

    matched_token: str
    """The actual CLI token that was matched (e.g., '-o', '--option')."""

    argument: Argument
    """The matched Argument object."""

    keys: tuple[str, ...]
    """Leftover keys for nested arguments."""

    implicit_value: Any
    """Implicit value if this is a flag, otherwise UNSET."""
| def normalize_tokens(tokens: None | str | Iterable[str]) -> list[str]: | |
| if tokens is None: | |
| tokens = sys.argv[1:] # Remove the executable | |
| elif isinstance(tokens, str): | |
| tokens = shlex.split(tokens) | |
| else: | |
| tokens = list(tokens) | |
| return tokens | |
| def _common_root_keys(argument_collection) -> tuple[str, ...]: | |
| if not argument_collection: | |
| return () | |
| common = argument_collection[0].keys | |
| for argument in argument_collection[1:]: | |
| if not argument.keys: | |
| return () | |
| for i, (common_key, argument_key) in enumerate(zip(common, argument.keys, strict=False)): | |
| if common_key != argument_key: | |
| if i == 0: | |
| return () | |
| common = argument.keys[:i] | |
| break | |
| common = common[: len(argument.keys)] | |
| return common | |
def _parse_kw_and_flags(
    argument_collection: ArgumentCollection,
    tokens: Sequence[str],
    *,
    end_of_options_delimiter: str = "--",
    stop_at_first_unknown: bool = False,
) -> tuple[list[str], int | None]:
    """Extract keyword arguments and flags from the token stream.

    Returns
    -------
    unused_tokens: list[str]
        Tokens not consumed by any keyword or flag.
    contiguous_positional_count: int | None
        Number of leading contiguous non-option tokens before the first gap
        caused by keyword extraction. ``None`` if all non-option tokens are
        contiguous (i.e. no keywords were interleaved among positional tokens).

        For example, given ``a b c --bar 8 --baz 10 d``, the unused tokens are
        ``['a', 'b', 'c', 'd']`` with original indices ``[0, 1, 2, 6]``.
        The gap between indices 2 and 6 yields ``contiguous_positional_count=3``.

        This is used by ``_parse_pos`` to prevent positional-only list parameters
        from consuming tokens that appeared after keyword arguments.
    """
    unused_tokens, positional_only_tokens = [], []
    unused_token_original_indices: list[int] = []
    # Number of upcoming loop iterations to skip because their tokens were
    # already consumed as values of a preceding keyword.
    skip_next_iterations = 0

    if end_of_options_delimiter:
        try:
            delimiter_index = tokens.index(end_of_options_delimiter)
        except ValueError:
            pass  # end_of_options_delimiter not in token stream
        else:
            # Everything from the delimiter onward (delimiter included) bypasses
            # keyword parsing; it is re-appended to ``unused_tokens`` at the end.
            positional_only_tokens = tokens[delimiter_index:]
            tokens = tokens[:delimiter_index]

    for i, token in enumerate(tokens):
        # If the previous argument was a keyword, then this is its value
        if skip_next_iterations > 0:
            skip_next_iterations -= 1
            continue

        if not is_option_like(token, allow_numbers=True):
            if stop_at_first_unknown:
                # Stop parsing and return all remaining tokens as unused
                unused_tokens.extend(tokens[i:])
                unused_token_original_indices.extend(range(i, len(tokens)))
                break
            unused_tokens.append(token)
            unused_token_original_indices.append(i)
            continue

        cli_values: list[str] = []
        # Net number of follow-up tokens to pull from the stream; may go negative
        # when a value was already supplied inline (via "=" or a GNU-attached value).
        consume_count = 0

        # startswith("-") is redundant, but it's cheap safety.
        allow_combined_flags = token.startswith("-") and not token.startswith("--")

        # Try splitting on "=" for long options or short options that match exactly
        if "=" in token:
            cli_option, cli_value = token.split("=", 1)
            # Try to match the part before "="
            try:
                argument_collection.match(cli_option)
                # Matched! Use the split
                cli_values.append(cli_value)
                consume_count -= 1
                allow_combined_flags = False
            except ValueError:
                # No match - might be GNU-style like "-pfile=value"
                # Don't split, treat whole token as the option
                cli_option = token
        else:
            cli_option = token

        matches: list[_KeywordMatch] = []
        attached_value: str | None = None  # Track value attached to a GNU-style combined option
        try:
            matches.append(_KeywordMatch(cli_option, *argument_collection.match(cli_option)))
        except ValueError:
            # Length has to be greater than 2 (hyphen + character) to be exploded.
            # Also exclude numeric values (e.g., -10, -3.14) from combined flag parsing.
            if allow_combined_flags and len(token) > 2 and is_option_like(token, allow_numbers=False):
                # GNU-style combined short options: process left-to-right
                # Once we hit an option that takes a value, the rest is the value
                chars = cli_option.lstrip("-")
                position = 0
                while position < len(chars):
                    char = chars[position]
                    test_flag = f"-{char}"
                    try:
                        arg, keys, implicit = argument_collection.match(test_flag)
                        if implicit is not UNSET or arg.parameter.count:
                            # This is a flag (boolean or counting) - consume just this character
                            matches.append(_KeywordMatch(test_flag, arg, keys, implicit))
                            position += 1
                        else:
                            # This option takes a value - rest of the string is the value
                            remainder = chars[position + 1 :]
                            matches.append(_KeywordMatch(test_flag, arg, keys, implicit))
                            if remainder:
                                # Value is attached: -uroot or -fvuroot
                                # Store it separately, will be added to cli_values when processing this match
                                attached_value = remainder
                                consume_count -= 1
                            # Stop processing further characters
                            break
                    except ValueError:
                        # Unknown flag
                        if stop_at_first_unknown:
                            unused_tokens.extend(tokens[i:])
                            return unused_tokens, None
                        unused_tokens.append(test_flag)
                        unused_token_original_indices.append(i)
                        position += 1
                if not matches:
                    # No valid matches found at all
                    continue
            else:
                if stop_at_first_unknown:
                    # Unknown option, stop parsing and return all remaining tokens
                    unused_tokens.extend(tokens[i:])
                    return unused_tokens, None
                unused_tokens.append(token)
                unused_token_original_indices.append(i)
                continue

        for match_index, match in enumerate(matches):
            # For GNU-style combined options, add the attached value only when processing
            # the last match (the value-taking option), not for preceding flags
            if attached_value is not None and match_index == len(matches) - 1:
                cli_values.append(attached_value)
            if match.argument.parameter.count:
                # Counting parameter (e.g. repeated ``-v``): each occurrence contributes 1.
                match.argument.append(CliToken(keyword=match.matched_token, implicit_value=1))
            elif match.implicit_value is not UNSET:
                # A flag was parsed
                if cli_values:
                    # An explicit value was attached to the flag (e.g. ``--flag=true``).
                    try:
                        coerced_value = _bool(cli_values[-1])
                    except CoercionError as e:
                        # Attach keyword/argument context before propagating for a
                        # more informative error message.
                        if e.token is None:
                            e.token = CliToken(keyword=match.matched_token)
                        if e.argument is None:
                            e.argument = match.argument
                        raise
                    if coerced_value:  # --positive-flag=true or --negative-flag=true or --empty-flag=true
                        match.argument.append(
                            CliToken(keyword=match.matched_token, implicit_value=match.implicit_value)
                        )
                    else:  # --positive-flag=false or --negative-flag=false or --empty-flag=false
                        if isinstance(match.implicit_value, bool):
                            match.argument.append(
                                CliToken(keyword=match.matched_token, implicit_value=not match.implicit_value)
                            )
                        else:
                            # A negative for a non-bool field doesn't really make sense;
                            # e.g. --empty-list=False
                            # So we'll just silently skip it, as it may make bash scripting easier.
                            pass
                else:
                    match.argument.append(CliToken(keyword=match.matched_token, implicit_value=match.implicit_value))
            else:
                # This is a value-taking option (not a flag or counting parameter)
                # Error only if we're trying to combine multiple value-taking options without values
                # (e.g., -fu where both -f and -u take values would be invalid)
                # But -fu where -f is a flag and -u takes a value is valid (GNU-style)
                if len(matches) > 1:
                    # Count how many value-taking options we have
                    value_taking_count = sum(
                        1 for m in matches if m.implicit_value is UNSET and not m.argument.parameter.count
                    )
                    if value_taking_count > 1:
                        raise CombinedShortOptionError(
                            msg=f"Cannot combine multiple value-taking options in token {cli_option}"
                        )
                tokens_per_element, consume_all = match.argument.token_count(match.keys)
                if match.argument.parameter.requires_equals and match.matched_token.startswith("--") and not cli_values:
                    raise RequiresEqualsError(
                        argument=match.argument,
                        keyword=match.matched_token,
                    )
                # Consume the appropriate number of tokens
                # cm_bounds is either None or (min, max) — guaranteed by _consume_multiple_converter
                cm_bounds = match.argument.parameter.consume_multiple
                assert cm_bounds is None or isinstance(cm_bounds, tuple)
                cm_min, cm_max = cm_bounds if cm_bounds is not None else (0, None)
                # IndexError just means we ran off the end of the token stream;
                # "not enough values" situations are diagnosed below instead.
                with suppress(IndexError):
                    if consume_all and cm_bounds is not None:
                        # Greedily consume tokens until an option-like token (or EOF).
                        for j in itertools.count():
                            token = tokens[i + 1 + j]
                            if not match.argument.parameter.allow_leading_hyphen and is_option_like(token):
                                break
                            cli_values.append(token)
                            skip_next_iterations += 1
                    else:
                        consume_count += tokens_per_element
                        for j in range(consume_count):
                            if len(cli_values) == 1 and (
                                match.argument._should_attempt_json_dict(cli_values)
                                or match.argument._should_attempt_json_list(cli_values, match.keys)
                            ):
                                tokens_per_element = 1
                                # Assume that the contents are json and that we shouldn't
                                # consume any additional tokens.
                                break
                            token = tokens[i + 1 + j]
                            if not match.argument.parameter.allow_leading_hyphen and is_option_like(token):
                                raise MissingArgumentError(
                                    argument=match.argument,
                                    tokens_so_far=cli_values,
                                    keyword=match.matched_token,
                                )
                            cli_values.append(token)
                            skip_next_iterations += 1
                if not cli_values:
                    # No values were consumed after the keyword
                    if consume_all and cm_bounds is not None:
                        if cm_min > 0:
                            # Minimum count not met — treat as missing argument
                            raise ConsumeMultipleError(
                                argument=match.argument,
                                tokens_so_far=cli_values,
                                keyword=match.matched_token,
                                min_required=cm_min,
                                max_allowed=cm_max,
                                actual_count=0,
                            )
                        # Allow empty iterables (e.g., --urls with no values behaves like --empty-urls)
                        hint = resolve_optional(match.argument.hint)
                        empty_container = (get_origin(hint) or hint)()
                        match.argument.append(
                            CliToken(keyword=match.matched_token, implicit_value=empty_container, keys=match.keys)
                        )
                    else:
                        # Non-iterables or consume_multiple=False require at least one value
                        raise MissingArgumentError(
                            argument=match.argument, tokens_so_far=cli_values, keyword=match.matched_token
                        )
                elif len(cli_values) % tokens_per_element:
                    # For multi-token elements (e.g., tuples), ensure we have complete sets
                    raise MissingArgumentError(
                        argument=match.argument, tokens_so_far=cli_values, keyword=match.matched_token
                    )
                else:
                    # Check min/max count for consume_multiple
                    if cm_bounds is not None:
                        n_elements = len(cli_values) // max(1, tokens_per_element)
                        if n_elements < cm_min:
                            raise ConsumeMultipleError(
                                argument=match.argument,
                                tokens_so_far=cli_values,
                                keyword=match.matched_token,
                                min_required=cm_min,
                                max_allowed=cm_max,
                                actual_count=n_elements,
                            )
                        if cm_max is not None and n_elements > cm_max:
                            raise ConsumeMultipleError(
                                argument=match.argument,
                                tokens_so_far=cli_values,
                                keyword=match.matched_token,
                                min_required=cm_min,
                                max_allowed=cm_max,
                                actual_count=n_elements,
                            )
                    # Normal case: append the consumed values
                    for index, cli_value in enumerate(cli_values):
                        match.argument.append(
                            CliToken(keyword=match.matched_token, value=cli_value, index=index, keys=match.keys)
                        )

    # Compute the number of contiguous positional (non-option-like) unused tokens
    # before the first gap caused by keyword extraction. This prevents positional-only
    # list parameters from consuming tokens that appeared after keyword arguments.
    # Only set when a gap is detected; None means no gap (all tokens are contiguous).
    contiguous_positional_count: int | None = None
    for j in range(1, len(unused_token_original_indices)):
        if unused_token_original_indices[j] != unused_token_original_indices[j - 1] + 1:
            contiguous_positional_count = j
            break

    unused_tokens.extend(positional_only_tokens)
    return unused_tokens, contiguous_positional_count
def _future_positional_only_token_count(argument_collection: ArgumentCollection, starting_index: int) -> int:
    """Count CLI tokens reserved by upcoming POSITIONAL_ONLY parameters.

    Walks positional parameters starting at ``starting_index`` and sums their
    per-element token counts, stopping at the first parameter that is not
    POSITIONAL_ONLY (or when no parameter matches the index).

    Raises
    ------
    ValueError
        If a second all-consuming positional parameter is encountered.
    """
    reserved = 0
    for offset in itertools.count():
        try:
            argument, _, _ = argument_collection.match(starting_index + offset)
        except ValueError:
            # No parameter at this positional index; we're done scanning.
            return reserved
        if argument.field_info.kind is not POSITIONAL_ONLY:
            return reserved
        per_element, consumes_all = argument.token_count()
        if consumes_all:
            raise ValueError("Cannot have 2 all-consuming positional arguments.")
        reserved += per_element
| def _preprocess_positional_tokens(tokens: Sequence[str], end_of_options_delimiter: str) -> list[tuple[str, bool]]: | |
| try: | |
| delimiter_index = tokens.index(end_of_options_delimiter) | |
| return [(t, False) for t in tokens[:delimiter_index]] + [(t, True) for t in tokens[delimiter_index + 1 :]] | |
| except ValueError: # delimiter not found | |
| return [(t, False) for t in tokens] | |
def _parse_pos(
    argument_collection: ArgumentCollection,
    tokens: list[str],
    *,
    end_of_options_delimiter: str = "--",
    contiguous_positional_count: int | None = None,
) -> list[str]:
    """Assign positional tokens to positional parameters.

    Parameters
    ----------
    argument_collection: ArgumentCollection
        Arguments whose keyword/flag tokens have already been consumed.
    tokens: list[str]
        Unused tokens from ``_parse_kw_and_flags``.
    end_of_options_delimiter: str
        Delimiter after which all tokens are forced positional.
    contiguous_positional_count: int | None
        If not ``None``, the number of leading contiguous positional tokens
        that were adjacent in the original CLI input (before keyword extraction
        created a gap). Used to cap how many tokens a ``POSITIONAL_ONLY``
        list/iterable parameter may consume, preventing it from greedily
        swallowing tokens that originally appeared after keyword arguments.
        See ``_parse_kw_and_flags`` for how this value is computed.
    """
    prior_positional_or_keyword_supplied_as_keyword_arguments = []

    if not tokens:
        return []

    # Pair each token with a flag marking whether it came after the delimiter.
    tokens_and_force_positional = _preprocess_positional_tokens(tokens, end_of_options_delimiter)

    for i in itertools.count():
        try:
            argument, _, _ = argument_collection.match(i)
        except ValueError:
            break  # No parameter accepts positional index ``i``; we're done.

        if argument.field_info.kind is POSITIONAL_OR_KEYWORD:
            if argument.tokens and argument.tokens[0].keyword is not None:
                # Already satisfied by a keyword; remember it so a later
                # positional fill can raise a helpful ordering error.
                prior_positional_or_keyword_supplied_as_keyword_arguments.append(argument)
                # Continue in case we hit a VAR_POSITIONAL argument.
                continue
            if prior_positional_or_keyword_supplied_as_keyword_arguments:
                token = tokens[0]
                if not argument.parameter.allow_leading_hyphen and is_option_like(token):
                    # It's more meaningful to interpret the token as an intended option,
                    # rather than an intended positional value for ``argument``.
                    raise UnknownOptionError(token=CliToken(value=token), argument_collection=argument_collection)
                else:
                    raise ArgumentOrderError(
                        argument=argument,
                        prior_positional_or_keyword_supplied_as_keyword_arguments=prior_positional_or_keyword_supplied_as_keyword_arguments,
                        token=tokens_and_force_positional[0][0],
                    )

        tokens_per_element, consume_all = argument.token_count()
        tokens_per_element = max(1, tokens_per_element)

        if consume_all and argument.field_info.kind is POSITIONAL_ONLY:
            # POSITIONAL_ONLY parameters can come after a POSITIONAL_ONLY list/iterable.
            # This makes it easier to create programs that do something like:
            #    $ python my-program.py input_folder/*.csv output.csv
            # Need to see how many tokens we need to leave for subsequent POSITIONAL_ONLY parameters.
            n_tokens_to_leave = _future_positional_only_token_count(argument_collection, i + 1)
            # Cap at the contiguous positional count to prevent consuming tokens
            # that appeared after keyword arguments (issue #763).
            if contiguous_positional_count is not None:
                n_tokens_to_leave = max(
                    n_tokens_to_leave, len(tokens_and_force_positional) - contiguous_positional_count
                )
        else:
            n_tokens_to_leave = 0

        new_tokens = []
        while (len(tokens_and_force_positional) - n_tokens_to_leave) > 0:
            if (len(tokens_and_force_positional) - n_tokens_to_leave) < tokens_per_element:
                # Not enough tokens remain to form a complete element.
                raise MissingArgumentError(
                    argument=argument,
                    tokens_so_far=[x[0] for x in tokens_and_force_positional],
                )
            for index, (token, force_positional) in enumerate(tokens_and_force_positional[:tokens_per_element]):
                if not force_positional and not argument.parameter.allow_leading_hyphen and is_option_like(token):
                    raise UnknownOptionError(token=CliToken(value=token), argument_collection=argument_collection)
                new_tokens.append(CliToken(value=token, index=index))
            tokens_and_force_positional = tokens_and_force_positional[tokens_per_element:]
            if not consume_all:
                break

        argument.tokens[:0] = new_tokens  # Prepend the new tokens to the argument.

        if not tokens_and_force_positional:
            break

    return [x[0] for x in tokens_and_force_positional]
def _parse_env(argument_collection: ArgumentCollection):
    """Populate token-less arguments from their environment variables.

    Arguments that already received tokens are untouched; otherwise the first
    environment variable (in declaration order) that is set wins.
    """
    for argument in argument_collection:
        if argument.tokens:
            # Don't check environment variables for parameters that already have values from CLI.
            continue
        assert argument.parameter.env_var is not None
        for env_var_name in argument.parameter.env_var:
            if env_var_name in os.environ:
                argument.tokens.append(
                    Token(keyword=env_var_name, value=os.environ[env_var_name], source="env")
                )
                break
def _bind(
    argument_collection: ArgumentCollection,
    func: Callable,
):
    """Bind the mapping to the function signature.

    Produces a partially-bound ``inspect.BoundArguments``; root arguments whose
    value is still ``UNSET`` are omitted from the binding.
    """
    bound = inspect.signature(func).bind_partial()
    for root_argument in argument_collection._root_arguments:
        value = root_argument.value
        if value is UNSET:
            continue
        bound.arguments[root_argument.field_info.name] = value
    return bound
def _parse_configs(argument_collection: ArgumentCollection, configs):
    """Apply each config source to the argument collection, in order."""
    # Each ``config`` is a partial that already has apps and commands provided.
    for apply_config in configs:
        apply_config(argument_collection)
def _sort_group(argument_collection) -> list[tuple["Group", ArgumentCollection]]:
    """Sort groups into "deepest common-root-keys first" order.

    This is imperfect, but probably works sufficiently well for practical use-cases.
    """
    keyed: dict = {}
    # Sort alphabetically by group-name to enforce some determinism.
    for position, group in enumerate(sorted(argument_collection.groups, key=lambda g: g.name)):
        members = argument_collection.filter_by(group=group)
        root_keys = _common_root_keys(members)
        # Include ``position`` in the key so identical root-keys never collide.
        keyed[(root_keys, position)] = (group, members.filter_by(keys_prefix=root_keys))
    return [pair for _, pair in sorted(keyed.items(), reverse=True)]
def create_bound_arguments(
    func: Callable,
    argument_collection: ArgumentCollection,
    tokens: list[str],
    configs: Iterable[Callable],
    *,
    end_of_options_delimiter: str = "--",
) -> tuple[inspect.BoundArguments, list[str]]:
    """Parse and coerce CLI tokens to match a function's signature.

    Parameters
    ----------
    func: Callable
        Function.
    argument_collection: ArgumentCollection
    tokens: list[str]
        CLI tokens to parse and coerce to match ``f``'s signature.
    configs: Iterable[Callable]
    end_of_options_delimiter: str
        Everything after this special token is forced to be supplied as a positional argument.

    Returns
    -------
    bound: inspect.BoundArguments
        The converted and bound positional and keyword arguments for ``f``.
    unused_tokens: list[str]
        Remaining tokens that couldn't be matched to ``f``'s signature.
    """
    unused_tokens = tokens
    try:
        # Token sources are applied in order: CLI keywords/flags, then CLI
        # positionals, then environment variables (only for arguments still
        # without tokens), then config callables.
        unused_tokens, contiguous_positional_count = _parse_kw_and_flags(
            argument_collection, unused_tokens, end_of_options_delimiter=end_of_options_delimiter
        )
        unused_tokens = _parse_pos(
            argument_collection,
            unused_tokens,
            end_of_options_delimiter=end_of_options_delimiter,
            contiguous_positional_count=contiguous_positional_count,
        )
        _parse_env(argument_collection)
        _parse_configs(argument_collection, configs)
        # Convert collected raw tokens into their annotated types.
        argument_collection._convert()
        groups_with_arguments = _sort_group(argument_collection)
        try:
            for group, group_arguments in groups_with_arguments:
                for validator in group.validator:  # pyright: ignore
                    validator(group_arguments)  # pyright: ignore[reportOptionalCall]
        except (AssertionError, ValueError, TypeError) as e:
            # ``group`` is always bound here: validators only run inside the loop.
            raise ValidationError(exception_message=e.args[0] if e.args else "", group=group) from e  # pyright: ignore
        for argument in argument_collection:
            # if a dict-like argument is missing, raise a MissingArgumentError on the first
            # required child (as opposed generically to the root dict-like object).
            if argument.parse and argument.field_info.required and not argument.keys and not argument.has_tokens:
                raise MissingArgumentError(argument=argument)
        bound = _bind(argument_collection, func)
    except CycloptsError as e:
        # Attach full context so upstream error reporting can show what was
        # originally supplied and what went unconsumed.
        e.root_input_tokens = tokens
        e.unused_tokens = unused_tokens
        raise
    return bound, unused_tokens