Spaces:
Sleeping
Sleeping
File size: 6,235 Bytes
53dbcc1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | from collections.abc import Sequence
from contextlib import contextmanager
from itertools import chain
from typing import TYPE_CHECKING, Any, TypeVar, cast, overload
from cyclopts.group_extractors import inverse_groups_from_app
from cyclopts.parameter import Parameter
if TYPE_CHECKING:
from cyclopts.core import App
V = TypeVar("V")
class AppStack:
    """Per-app bookkeeping of the active app chain and of propagated call overrides.

    Two parallel stacks are maintained:

    * ``stack`` - a list of frames, each frame being a list of apps.
    * ``overrides_stack`` - a list of ``{attribute: value}`` dicts.

    Calling the instance (``with app_stack(apps, overrides): ...``) pushes frames for the
    duration of the ``with`` block and pops them again on exit.
    """

    def __init__(self, app):
        # The first frame always holds the owning app itself; ``self.stack[0][0]`` is
        # used below as the app that resolves command strings.
        self.stack: list[list[App]] = [[app]]
        # Stack of overrides passed to parse_args/call that should be propagated.
        self.overrides_stack: list[dict[str, Any]] = [{}]

    @contextmanager
    def __call__(self, apps: Sequence["App"] | Sequence[str], overrides: dict[str, Any] | None = None):
        """Push ``apps`` and ``overrides`` for the duration of a ``with`` block.

        Parameters
        ----------
        apps
            Either app objects or command strings (decided by the type of the first
            element). Strings are resolved with ``parse_commands`` of the app stored in
            the first frame of this stack. May be empty, in which case only the
            overrides frame of this stack is pushed.
        overrides
            Attribute overrides for this invocation. They are merged on top of the
            overrides that are already active on this stack.

        Notes
        -----
        Every push is undone on exit, including when command resolution or the ``with``
        body raises; a failure part-way through setup therefore cannot leave stale
        frames behind.
        """
        # Seed with the currently active overrides so that they properly propagate down
        # the call-stack; explicitly supplied values win.
        merged = self.overrides | (overrides or {})

        # Apps (including meta apps) whose ``app_stack`` received a frame from this call.
        pushed: list[App] = []

        def push(target, frame):
            target.app_stack.stack.append(frame)
            # ``or {}`` mirrors the original behaviour: an empty merge is stored as a
            # fresh dict instead of one empty dict shared between stacks.
            target.app_stack.overrides_stack.append(merged or {})
            pushed.append(target)

        self.overrides_stack.append(merged or {})
        try:
            resolved_apps: list[App] = []
            if apps:
                if isinstance(apps[0], str):
                    # Convert command strings to Apps.
                    _, apps_tuple, _ = self.stack[0][0].parse_commands(
                        cast(Sequence[str], apps), include_parent_meta=True
                    )
                    resolved_apps = list(apps_tuple)
                else:
                    resolved_apps = cast(list["App"], list(apps))

            so_far: list[App] = []
            app_ids = {id(app) for app in resolved_apps}
            for app in resolved_apps:
                if app._meta_parent is None:
                    # Do not include the prior meta-app(s) in the chain.
                    while so_far and so_far[-1]._meta_parent is not None:
                        so_far.pop()
                so_far.append(app)
                push(app, so_far.copy())

                # Also traverse the app's chain of meta apps.
                meta_app = app
                while (meta_app := meta_app._meta) is not None:
                    if id(meta_app) in app_ids:
                        # Explicitly listed in ``apps``; handled by the outer loop.
                        continue
                    push(meta_app, [*so_far, meta_app])

            yield
        finally:
            # Undo exactly what was pushed, newest first.
            for owner in reversed(pushed):
                owner.app_stack.stack.pop()
                owner.app_stack.overrides_stack.pop()
            self.overrides_stack.pop()

    @property
    def overrides(self) -> dict:
        """Effective overrides of this stack.

        Frames are scanned from newest to oldest; the first non-``None`` value found for
        a key wins. Keys whose value is ``None`` in every frame are omitted.
        """
        out = {}
        for overrides_frame in reversed(self.overrides_stack):
            for key, value in overrides_frame.items():
                if value is not None:
                    out.setdefault(key, value)
        return out

    @property
    def default_parameter(self) -> Parameter:
        """default_parameter has special resolution since it needs to include the command groups in the derivation.

        For every app in every frame (oldest first) that has no ``_meta_parent``, the
        ``default_parameter`` of each of its command groups is collected, followed by the
        app's own ``default_parameter``; the result is ``Parameter.combine`` of them all.
        """
        cparams = []
        for child_app in chain.from_iterable(self.stack):
            if child_app._meta_parent:
                continue
            cparams.extend(group.default_parameter for group in child_app.app_stack.command_groups)
            cparams.append(child_app.default_parameter)
        return Parameter.combine(*cparams)

    @property
    def current_frame(self) -> list["App"]:
        """Most recently pushed frame.

        Raises
        ------
        ValueError
            If the stack holds no frames.
        """
        if not self.stack:
            raise ValueError("AppStack has no frames.")
        return self.stack[-1]

    @overload
    def resolve(self, attribute: str) -> Any: ...

    @overload
    def resolve(self, attribute: str, override: V) -> V: ...

    @overload
    def resolve(self, attribute: str, override: V | None, fallback: V) -> V: ...

    @overload
    def resolve(self, attribute: str, override: V | None = None, *, fallback: V) -> V: ...

    def resolve(self, attribute: str, override: V | None = None, fallback: V | None = None) -> V | None:
        """Resolve an attribute from the App hierarchy.

        Lookup order, first non-``None`` value wins:

        1. ``override``, if given.
        2. Stored overrides, newest frame first.
        3. The apps of all frames, newest first; each app is followed by its chain of
           ``_meta_parent`` apps.
        4. ``fallback``.
        """
        if override is not None:
            return override

        # Check if we have a stored override from parent invocations (most recent first).
        for overrides_frame in reversed(self.overrides_stack):
            if attribute in overrides_frame:
                value = overrides_frame[attribute]
                if value is not None:
                    return value

        # `reversed` so that "closer" apps have higher priority.
        for app in reversed(list(chain.from_iterable(self.stack))):
            result = getattr(app, attribute)
            if result is not None:
                return result

            # Check parenting meta app(s).
            meta_app = app
            while (meta_app := meta_app._meta_parent) is not None:
                result = getattr(meta_app, attribute)
                if result is not None:
                    return result

        return fallback

    @property
    def command_groups(self) -> list:
        """Groups under which the last app of the current frame is registered.

        The lookup starts at the second-to-last app of the current frame and, if that app
        has no entry for the command app, continues along its ``_meta_parent`` chain.
        ``inverse_groups_from_app`` is expected to yield items whose first element is an
        app and whose second element is the value returned here.

        Returns an empty list when the frame has a single app or no entry is found.
        """
        command_app = self.current_frame[-1]
        try:
            current_app: App | None = self.current_frame[-2]
        except IndexError:
            current_app = None

        while current_app is not None:
            try:
                return next(x for x in inverse_groups_from_app(current_app) if x[0] is command_app)[1]
            except StopIteration:
                current_app = current_app._meta_parent
        return []
|