| |
| from __future__ import annotations |
|
|
| from contextvars import ContextVar |
| from typing import Any, Dict, List, Optional, Union |
|
|
| |
| |
| |
| _VERSIONS_CV: ContextVar[Optional[List[Dict[str, Any]]]] = ContextVar("VERSIONS", default=None) |
| _CUR_INDEX_CV: ContextVar[int] = ContextVar("CUR_INDEX", default=-1) |
| _VERS_META_CV: ContextVar[Optional[List[Dict[str, Any]]]] = ContextVar("VERS_META", default=None) |
|
|
| |
| _STORE: Dict[str, Any] = { |
| "versions": [], |
| "version_meta": [], |
| "cur_index": -1, |
| |
| "df_payload": None, |
| "base_df_payload": None, |
| "sota_bundled": None, |
| "df_summary": None, |
| } |
|
|
| _SOTA_BUNDLED_CV: ContextVar[Optional[list]] = ContextVar("SOTA_BUNDLED", default=None) |
| _DF_SUMMARY_CV: ContextVar[Optional[Dict[str, Any]]] = ContextVar("DF_SUMMARY", default=None) |
|
|
|
|
| |
| def _get_versions() -> List[Dict[str, Any]]: |
| return _VERSIONS_CV.get() or _STORE["versions"] |
|
|
| def _get_meta() -> List[Dict[str, Any]]: |
| return _VERS_META_CV.get() or _STORE["version_meta"] |
|
|
| def _set_versions(vers: List[Dict[str, Any]], meta: List[Dict[str, Any]], cur: int) -> None: |
| _VERSIONS_CV.set(vers) |
| _VERS_META_CV.set(meta) |
| _CUR_INDEX_CV.set(cur) |
| _STORE["versions"] = vers |
| _STORE["version_meta"] = meta |
| _STORE["cur_index"] = cur |
| |
| _STORE["df_payload"] = vers[cur] if (0 <= cur < len(vers)) else None |
| _STORE["base_df_payload"] = vers[0] if vers else None |
|
|
|
|
| |
| |
| |
| def init_dataset(p: Optional[Dict[str, Any]]) -> None: |
| """Initialize version stack with a single BASE version.""" |
| vers = [] if p is None else [p] |
| meta = [] if p is None else [dict(tag="base")] |
| cur = -1 if p is None else 0 |
| _set_versions(vers, meta, cur) |
|
|
| def set_df_payload(p: Optional[Dict[str, Any]], *, new_version: bool = True) -> None: |
| """ |
| Set CURRENT dataset. |
| - new_version=True: truncate any forward history and append p (like a new commit). |
| - new_version=False: replace the current version in place (no new snapshot). |
| """ |
| vers = list(_get_versions()) |
| meta = list(_get_meta()) |
| cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"] |
|
|
| if cur < 0 or not vers: |
| |
| init_dataset(p) |
| return |
|
|
| if new_version: |
| |
| vers = vers[:cur + 1] |
| meta = meta[:cur + 1] |
| vers.append(p) |
| meta.append({}) |
| cur = len(vers) - 1 |
| else: |
| vers[cur] = p |
|
|
| _set_versions(vers, meta, cur) |
|
|
| def annotate_current(**kv) -> None: |
| """Attach metadata to the current version (e.g., step/params/stats).""" |
| vers = list(_get_versions()) |
| meta = list(_get_meta()) |
| cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"] |
| if 0 <= cur < len(meta): |
| meta[cur] = {**meta[cur], **kv} |
| _set_versions(vers, meta, cur) |
|
|
|
|
| |
| |
| |
| def _resolve_index(spec: Union[str, int, None]) -> int: |
| vers = _get_versions() |
| cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"] |
| if spec is None or spec == "current": |
| return cur |
| if spec == "base": |
| return 0 if vers else -1 |
| if spec == "prev": |
| return max(-1, cur - 1) |
| if isinstance(spec, int): |
| idx = spec if spec >= 0 else len(vers) + spec |
| return idx |
| |
| if isinstance(spec, str) and spec.startswith("@"): |
| try: |
| n = int(spec[1:]) |
| except Exception: |
| return cur |
| idx = n if n >= 0 else len(vers) + n |
| return idx |
| return cur |
|
|
| def get_df_payload(version: Union[str, int, None] = None) -> Optional[Dict[str, Any]]: |
| """Return dataset payload for the requested version (default: current).""" |
| vers = _get_versions() |
| idx = _resolve_index(version) |
| if 0 <= idx < len(vers): |
| return vers[idx] |
| |
| return _STORE["df_payload"] |
|
|
| def get_base_df_payload() -> Optional[Dict[str, Any]]: |
| return get_df_payload("base") |
|
|
| def get_prev_df_payload() -> Optional[Dict[str, Any]]: |
| return get_df_payload("prev") |
|
|
| def list_versions() -> Dict[str, Any]: |
| """Lightweight overview for debugging/UI.""" |
| vers = _get_versions() |
| meta = _get_meta() |
| cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"] |
| return { |
| "count": len(vers), |
| "current_index": cur, |
| "has_base": bool(vers), |
| "meta": meta, |
| } |
|
|
| def reset_current_to(version: Union[str, int]) -> None: |
| """Move the current pointer to a prior version (no deletion).""" |
| vers = _get_versions() |
| meta = _get_meta() |
| idx = _resolve_index(version) |
| if 0 <= idx < len(vers): |
| _set_versions(vers, meta, idx) |
|
|
| def reset_current_to_base() -> None: |
| reset_current_to("base") |
|
|
|
|
| |
| |
| |
| def set_sota_bundled(b: Optional[list]) -> None: |
| _SOTA_BUNDLED_CV.set(b) |
| _STORE["sota_bundled"] = b |
|
|
| def get_sota_bundled() -> Optional[list]: |
| return _SOTA_BUNDLED_CV.get() or _STORE["sota_bundled"] |
|
|
| def set_df_summary(s: Optional[Dict[str, Any]]) -> None: |
| _DF_SUMMARY_CV.set(s) |
| _STORE["df_summary"] = s |
|
|
| def get_df_summary() -> Optional[Dict[str, Any]]: |
| return _DF_SUMMARY_CV.get() or _STORE["df_summary"] |
|
|