| |
| """Backup and restore OpenClaw state to/from Hugging Face Dataset.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import datetime as dt |
| import fcntl |
| import hashlib |
| import json |
| import os |
| import re |
| import shutil |
| import subprocess |
| import sys |
| import tarfile |
| import tempfile |
| import time |
| import urllib.request |
| import uuid |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any, Optional |
|
|
|
|
| def load_env_files(): |
| """Load environment variables from OpenClaw env files. |
| |
| Note: HF Space variables (already in os.environ) take priority over env files. |
| This function only fills in missing variables from env files. |
| """ |
| env_files = [ |
| "/etc/profile.d/openclaw-env.sh", |
| "/root/.env.d/openclaw-backup.env", |
| ] |
|
|
| for env_file in env_files: |
| if os.path.isfile(env_file): |
| try: |
| result = subprocess.run( |
| ["bash", "-c", f"source '{env_file}' 2>/dev/null && env"], |
| capture_output=True, |
| text=True, |
| timeout=10, |
| ) |
| if result.returncode == 0: |
| for line in result.stdout.strip().split("\n"): |
| if "=" in line: |
| key, _, value = line.partition("=") |
| if key and key not in os.environ: |
| os.environ[key] = value |
| except Exception: |
| pass |
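| # Usage sketch (hypothetical values): a variable already exported by the |
| # HF Space wins over the env files, while a variable that only appears in |
| # an env file is filled into os.environ: |
| # |
| #   os.environ["HF_TOKEN"] = "hf_xxx"  # set by the Space, kept as-is |
| #   load_env_files()                   # fills e.g. OPENCLAW_BACKUP_DATASET_REPO |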
|
|
|
|
| def env_get_with_hf_priority(name: str, default: str = "") -> str: |
| """Get environment variable with HF Space variable taking priority. |
| |
| Priority: |
| 1. Existing os.environ value (HF Space variable) |
| 2. /etc/profile.d/openclaw-env.sh |
| 3. /root/.env.d/openclaw-backup.env |
| 4. default value |
| |
| Args: |
| name: Environment variable name |
| default: Default value if not found anywhere |
| |
| Returns: |
| The environment variable value |
| """ |
| if name in os.environ and os.environ[name]: |
| return os.environ[name] |
|
|
| env_files = [ |
| "/etc/profile.d/openclaw-env.sh", |
| "/root/.env.d/openclaw-backup.env", |
| ] |
|
|
| for env_file in env_files: |
| if os.path.isfile(env_file): |
| try: |
| result = subprocess.run( |
| ["bash", "-c", f"source '{env_file}' 2>/dev/null && echo ${name}"], |
| capture_output=True, |
| text=True, |
| timeout=10, |
| ) |
| if result.returncode == 0: |
| value = result.stdout.strip() |
| if value: |
| return value |
| except Exception: |
| pass |
|
|
| return default |
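| # Example (hypothetical repo name): |
| #   repo = env_get_with_hf_priority("OPENCLAW_BACKUP_DATASET_REPO", "user/backups") |
| # returns the os.environ value if non-empty, otherwise the first non-empty |
| # value sourced from the env files, otherwise "user/backups". |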
|
|
| |
| |
| os.environ.setdefault("HF_HUB_DOWNLOAD_TIMEOUT", "300") |
| os.environ.setdefault("HF_HUB_ETAG_TIMEOUT", "60") |
|
|
| try: |
| from huggingface_hub import HfApi, hf_hub_download |
| from huggingface_hub.utils import HfHubHTTPError |
| except ModuleNotFoundError: |
| HfApi = None |
| hf_hub_download = None |
|
|
| class HfHubHTTPError(Exception): |
| """Fallback error when huggingface_hub is not installed.""" |
|
|
|
|
| def env_bool(name: str, default: bool) -> bool: |
| raw = os.getenv(name) |
| if raw is None: |
| return default |
| return raw.strip().lower() in {"1", "true", "yes", "on"} |
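| # Example: env_bool("OPENCLAW_BACKUP_PRIVATE", True) is True for "1", "true", |
| # "YES" or "on" (case-insensitive), False for any other set value, and the |
| # default (True) when the variable is unset. |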
|
|
|
|
| def utc_now() -> dt.datetime: |
| return dt.datetime.now(dt.UTC) |
|
|
|
|
| |
| METADATA_VERSION = "2.1" |
| MIN_SUPPORTED_METADATA_VERSION = "1.0" |
|
|
|
|
| def parse_metadata_version(version_str: str) -> tuple[int, int]: |
| """解析元数据版本字符串为版本号元组。""" |
| try: |
| parts = version_str.split(".") |
| major = int(parts[0]) |
| minor = int(parts[1]) if len(parts) > 1 else 0 |
| return (major, minor) |
| except (ValueError, IndexError): |
| return (1, 0) |
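| # Examples: parse_metadata_version("2.1") -> (2, 1); |
| # parse_metadata_version("3") -> (3, 0); malformed input such as |
| # parse_metadata_version("abc") falls back to (1, 0). |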
|
|
|
|
| def is_metadata_compatible(version_str: str) -> bool: |
| """检查元数据版本是否兼容当前版本。 |
| |
| 兼容性规则: |
| - 目标主版本 <= 当前主版本:向后兼容(旧备份可以用新版本恢复) |
| - 目标主版本 > 当前主版本:不兼容(新备份不能用旧版本恢复,需升级) |
| - 次版本在支持范围内 |
| """ |
| current = parse_metadata_version(METADATA_VERSION) |
| target = parse_metadata_version(version_str) |
| min_supported = parse_metadata_version(MIN_SUPPORTED_METADATA_VERSION) |
|
|
| |
| if target[0] > current[0]: |
| return False |
|
|
| |
| |
| if target[0] < current[0]: |
| return min_supported[1] <= target[1] |
|
|
| |
| return min_supported[1] <= target[1] <= current[1] |
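| # Examples with METADATA_VERSION = "2.1" and MIN_SUPPORTED_METADATA_VERSION = "1.0": |
| #   is_metadata_compatible("1.0") -> True   (older backup, restorable by this version) |
| #   is_metadata_compatible("2.1") -> True   (same version) |
| #   is_metadata_compatible("3.0") -> False  (newer major version, upgrade required) |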
|
|
|
|
| @dataclass |
| class BackupConfig: |
| dataset_repo: str |
| state_dir: Path |
| backup_source_dir: Path |
| work_dir: Path |
| metadata_dir: Path |
| repo_type: str |
| path_prefix: str |
| private: bool |
| |
| restore_dataset_repo: Optional[str] = None |
| backup_npm_enabled: bool = True |
| restore_npm_enabled: bool = True |
| |
| incremental: bool = True |
| incremental_interval_minutes: int = 10 |
| compression_level: int = 6 |
| split_size: str = "" |
| size_warning_mb: int = 1500 |
| |
| dynamic_backup: bool = True |
| dynamic_small_threshold_mb: int = 500 |
| dynamic_medium_threshold_mb: int = 2000 |
| dynamic_high_change_rate: int = 10 |
| dynamic_low_change_rate: int = 2 |
| dynamic_min_changed_files: int = 5 |
| dynamic_min_changed_size_kb: int = 100 |
| |
| full_backup_interval_hours: int = 2 |
| max_incremental_backups: int = 10 |
| root_config_dir: Path = Path("/root/.config").resolve() |
| root_codex_dir: Path = Path("/root/.codex").resolve() |
| root_claude_dir: Path = Path("/root/.claude").resolve() |
| root_agents_dir: Path = Path("/root/.agents").resolve() |
| root_ssh_dir: Path = Path("/root/.ssh").resolve() |
| root_env_dir: Path = Path("/root/.env.d").resolve() |
| root_npm_dir: Path = Path("/root/.npm").resolve() |
| root_lark_cli_dir: Path = Path("/root/.lark-cli").resolve() |
| extra_dirs: dict[str, Path] = field(default_factory=dict) |
| extra_files: dict[str, Path] = field(default_factory=dict) |
| keep_count: int = 48 |
| |
| keep_local_backup: bool = False |
| local_backup_dir: Optional[Path] = None |
| keep_local_count: int = 5 |
| |
| encryption_enabled: bool = False |
| encryption_password: Optional[str] = None |
|
|
| @classmethod |
| def from_env(cls) -> "BackupConfig": |
| |
| load_env_files() |
|
|
| |
| dataset_repo = env_get_with_hf_priority("OPENCLAW_BACKUP_DATASET_REPO") |
| if not dataset_repo: |
| raise ValueError("OPENCLAW_BACKUP_DATASET_REPO is not set") |
|
|
| |
| |
| restore_dataset_repo = env_get_with_hf_priority("OPENCLAW_RESTORE_DATASET_REPO", dataset_repo) |
| backup_npm_enabled = env_get_with_hf_priority("OPENCLAW_BACKUP_NPM_ENABLED", "true").lower() in ("true", "1", "yes") |
| restore_npm_enabled = env_get_with_hf_priority("OPENCLAW_RESTORE_NPM_ENABLED", "true").lower() in ("true", "1", "yes") |
| |
|
|
| |
| encryption_enabled = env_bool("OPENCLAW_BACKUP_ENCRYPTION_ENABLED", False) |
| encryption_password = os.getenv("OPENCLAW_BACKUP_ENCRYPTION_PASSWORD") or None |
|
|
| state_dir = Path(os.getenv("OPENCLAW_STATE_DIR", "/root/.openclaw")).resolve() |
| backup_source_raw = (os.getenv("OPENCLAW_BACKUP_SOURCE_DIR") or "").strip() |
| if backup_source_raw: |
| backup_source_dir = Path(backup_source_raw).resolve() |
| else: |
| backup_source_dir = state_dir |
| root_config_dir = Path( |
| (os.getenv("OPENCLAW_BACKUP_ROOT_CONFIG_DIR") or "/root/.config").strip() or "/root/.config" |
| ).resolve() |
| root_codex_dir = Path( |
| (os.getenv("OPENCLAW_BACKUP_ROOT_CODEX_DIR") or "/root/.codex").strip() or "/root/.codex" |
| ).resolve() |
| root_claude_dir = Path( |
| (os.getenv("OPENCLAW_BACKUP_ROOT_CLAUDE_DIR") or "/root/.claude").strip() or "/root/.claude" |
| ).resolve() |
| root_agents_dir = Path( |
| (os.getenv("OPENCLAW_BACKUP_ROOT_AGENTS_DIR") or "/root/.agents").strip() or "/root/.agents" |
| ).resolve() |
| root_ssh_dir = Path((os.getenv("OPENCLAW_BACKUP_ROOT_SSH_DIR") or "/root/.ssh").strip() or "/root/.ssh").resolve() |
| root_env_dir = Path((os.getenv("OPENCLAW_BACKUP_ROOT_ENV_DIR") or "/root/.env.d").strip() or "/root/.env.d").resolve() |
| root_npm_dir = Path((os.getenv("OPENCLAW_BACKUP_ROOT_NPM_DIR") or "/root/.npm").strip() or "/root/.npm").resolve() |
|
|
| root_lark_cli_dir = Path( |
| (os.getenv("OPENCLAW_BACKUP_ROOT_LARK_CLI_DIR") or "/root/.lark-cli").strip() or "/root/.lark-cli" |
| ).resolve() |
| work_dir = Path(os.getenv("OPENCLAW_BACKUP_WORK_DIR", "/tmp/openclaw-backup")).resolve() |
| metadata_dir = Path(os.getenv("OPENCLAW_BACKUP_METADATA_DIR", "/root/.backup-info")).resolve() |
| repo_type = (os.getenv("OPENCLAW_BACKUP_REPO_TYPE") or "dataset").strip() or "dataset" |
| path_prefix = (os.getenv("OPENCLAW_BACKUP_PATH_PREFIX") or "backups").strip("/") |
| private = env_bool("OPENCLAW_BACKUP_PRIVATE", True) |
| keep_count = 48 |
| keep_count_raw = (os.getenv("OPENCLAW_BACKUP_KEEP_COUNT") or "").strip() |
| if keep_count_raw: |
| try: |
| parsed_keep_count = int(keep_count_raw) |
| if parsed_keep_count >= 1: |
| keep_count = parsed_keep_count |
| else: |
| print( |
| f"invalid OPENCLAW_BACKUP_KEEP_COUNT={keep_count_raw!r} (must be >= 1), using default 48", |
| file=sys.stderr, |
| ) |
| except ValueError: |
| print( |
| f"invalid OPENCLAW_BACKUP_KEEP_COUNT={keep_count_raw!r} (not a number), using default 48", |
| file=sys.stderr, |
| ) |
|
|
| |
| |
| extra_dirs: dict[str, Path] = {} |
| extra_dirs_raw = (os.getenv("OPENCLAW_BACKUP_EXTRA_DIRS") or "").strip() |
| if extra_dirs_raw: |
| for item in extra_dirs_raw.split(","): |
| item = item.strip() |
| if not item: |
| continue |
| if ":" not in item: |
| print( |
| f"invalid extra dir entry (missing ':'): {item!r}, skipping", |
| file=sys.stderr, |
| ) |
| continue |
| archive_name, dir_path = item.split(":", 1) |
| archive_name = archive_name.strip() |
| dir_path = dir_path.strip() |
| if not archive_name or not dir_path: |
| print( |
| f"invalid extra dir entry (empty name or path): {item!r}, skipping", |
| file=sys.stderr, |
| ) |
| continue |
| extra_dirs[archive_name] = Path(dir_path).resolve() |
|
|
| |
| |
| extra_files: dict[str, Path] = {} |
| extra_files_raw = (os.getenv("OPENCLAW_BACKUP_EXTRA_FILES") or "").strip() |
| if extra_files_raw: |
| for item in extra_files_raw.split(","): |
| item = item.strip() |
| if not item: |
| continue |
| if ":" not in item: |
| print( |
| f"invalid extra file entry (missing ':'): {item!r}, skipping", |
| file=sys.stderr, |
| ) |
| continue |
| archive_name, file_path = item.split(":", 1) |
| archive_name = archive_name.strip() |
| file_path = file_path.strip() |
| if not archive_name or not file_path: |
| print( |
| f"invalid extra file entry (empty name or path): {item!r}, skipping", |
| file=sys.stderr, |
| ) |
| continue |
| extra_files[archive_name] = Path(file_path).resolve() |
|
|
| return cls( |
| dataset_repo=dataset_repo, |
| state_dir=state_dir, |
| backup_source_dir=backup_source_dir, |
| work_dir=work_dir, |
| metadata_dir=metadata_dir, |
| repo_type=repo_type, |
| path_prefix=path_prefix, |
| private=private, |
| incremental=env_bool("OPENCLAW_INCREMENTAL_BACKUP", True), |
| incremental_interval_minutes=int(os.getenv("OPENCLAW_INCREMENTAL_INTERVAL_MINUTES", "5")), |
| compression_level=int(os.getenv("OPENCLAW_BACKUP_COMPRESSION_LEVEL", "6")), |
| split_size=(os.getenv("OPENCLAW_BACKUP_SPLIT_SIZE") or "").strip(), |
| size_warning_mb=int(os.getenv("OPENCLAW_BACKUP_SIZE_WARNING_MB", "1500")), |
| |
| dynamic_backup=env_bool("OPENCLAW_DYNAMIC_BACKUP", True), |
| dynamic_small_threshold_mb=int(os.getenv("OPENCLAW_DYNAMIC_SMALL_THRESHOLD_MB", "500")), |
| dynamic_medium_threshold_mb=int(os.getenv("OPENCLAW_DYNAMIC_MEDIUM_THRESHOLD_MB", "2000")), |
| dynamic_high_change_rate=int(os.getenv("OPENCLAW_DYNAMIC_HIGH_CHANGE_RATE", "10")), |
| dynamic_low_change_rate=int(os.getenv("OPENCLAW_DYNAMIC_LOW_CHANGE_RATE", "2")), |
| dynamic_min_changed_files=int(os.getenv("OPENCLAW_DYNAMIC_MIN_CHANGED_FILES", "5")), |
| dynamic_min_changed_size_kb=int(os.getenv("OPENCLAW_DYNAMIC_MIN_CHANGED_SIZE_KB", "100")), |
| full_backup_interval_hours=int(os.getenv("OPENCLAW_FULL_BACKUP_INTERVAL_HOURS", "2")), |
| max_incremental_backups=int(os.getenv("OPENCLAW_MAX_INCREMENTAL_BACKUPS", "10")), |
| root_config_dir=root_config_dir, |
| root_codex_dir=root_codex_dir, |
| root_claude_dir=root_claude_dir, |
| root_agents_dir=root_agents_dir, |
| root_ssh_dir=root_ssh_dir, |
| root_env_dir=root_env_dir, |
| root_npm_dir=root_npm_dir, |
| root_lark_cli_dir=root_lark_cli_dir, |
| extra_dirs=extra_dirs, |
| extra_files=extra_files, |
| keep_count=keep_count, |
| |
| restore_dataset_repo=restore_dataset_repo, |
| backup_npm_enabled=backup_npm_enabled, |
| restore_npm_enabled=restore_npm_enabled, |
| |
| encryption_enabled=encryption_enabled, |
| encryption_password=encryption_password, |
| ) |
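| # Usage sketch (assumes the backup variables are already exported): |
| #   config = BackupConfig.from_env()  # raises ValueError if |
| #                                     # OPENCLAW_BACKUP_DATASET_REPO is unset |
| #   backup = OpenClawBackup(config) |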
|
|
|
|
| class OpenClawBackup: |
| _ARCHIVE_NAME_RE = re.compile(r"^openclaw-backup-(\d{8}-\d{6})\.tar\.gz(?:\.enc)?(?:\.part-[a-z]{2})?$") |
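| # The pattern accepts plain, encrypted, and split archive names, e.g.: |
| #   openclaw-backup-20240101-120000.tar.gz |
| #   openclaw-backup-20240101-120000.tar.gz.enc |
| #   openclaw-backup-20240101-120000.tar.gz.enc.part-aa |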
|
|
| def __init__(self, config: BackupConfig, api: Optional[Any] = None, token: Optional[str] = None) -> None: |
| self.config = config |
| self.token = self._resolve_token(token) |
| config.metadata_dir.mkdir(parents=True, exist_ok=True) |
| self._meta_cache: dict[str, dict] = {} |
| self._meta_file_cache: dict[str, Path] = {} |
| if api is not None: |
| self.api = api |
| return |
| if HfApi is None: |
| raise ModuleNotFoundError( |
| "huggingface_hub is required. Install with: pip install 'huggingface_hub[cli]'" |
| ) |
| self.api = HfApi(token=self.token) |
|
|
| def _resolve_token(self, explicit_token: Optional[str] = None) -> Optional[str]: |
| """Resolve HF token with proper fallback hierarchy. |
| |
| Priority: |
| 1. Explicitly passed token (from constructor) |
| 2. HF_TOKEN from os.environ (HF Space variables, highest priority) |
| 3. HUGGINGFACE_HUB_TOKEN from os.environ |
| 4. HF_TOKEN from env files (/etc/profile.d/openclaw-env.sh, /root/.env.d/openclaw-backup.env) |
| 5. HUGGINGFACE_HUB_TOKEN from env files |
| 6. None (no token available) |
| """ |
| if explicit_token: |
| return explicit_token |
|
|
| for env_var in ("HF_TOKEN", "HUGGINGFACE_HUB_TOKEN"): |
| token = os.getenv(env_var) |
| if token: |
| return token |
|
|
| env_files = [ |
| "/etc/profile.d/openclaw-env.sh", |
| "/root/.env.d/openclaw-backup.env", |
| ] |
| for env_file in env_files: |
| if os.path.isfile(env_file): |
| try: |
| result = subprocess.run( |
| ["bash", "-c", f"source '{env_file}' 2>/dev/null && env"], |
| capture_output=True, |
| text=True, |
| timeout=10, |
| ) |
| if result.returncode == 0: |
| for line in result.stdout.strip().split("\n"): |
| if "=" in line: |
| key, _, value = line.partition("=") |
| if key in ("HF_TOKEN", "HUGGINGFACE_HUB_TOKEN") and value: |
| return value |
| except Exception: |
| pass |
| return None |
|
|
| @staticmethod |
| def _calculate_checksum(file_path: Path, algorithm: str = "sha256") -> str: |
| """Calculate checksum for a file. |
| |
| Enterprise-grade data integrity verification. |
| """ |
| hash_obj = hashlib.new(algorithm) |
| with open(file_path, "rb") as f: |
| for chunk in iter(lambda: f.read(8192), b""): |
| hash_obj.update(chunk) |
| return hash_obj.hexdigest() |
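| # Example (illustrative path): _calculate_checksum(Path("backup.tar.gz")) |
| # yields the same hex digest as `sha256sum backup.tar.gz`. |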
|
|
| def _verify_backup_integrity(self, archive_path: Path, is_split: bool = False) -> dict: |
| """Verify backup archive integrity. |
| |
| For split volumes, only checks existence and size (cannot validate tar structure of partial file). |
| |
| Returns a dictionary with verification results. |
| """ |
| result = { |
| "valid": False, |
| "checksum": None, |
| "size": 0, |
| "error": None, |
| } |
| |
| try: |
| |
| if not archive_path.exists(): |
| result["error"] = "Archive does not exist" |
| return result |
| |
| |
| result["size"] = archive_path.stat().st_size |
| if result["size"] == 0: |
| result["error"] = "Archive is empty" |
| return result |
| |
| |
| result["checksum"] = self._calculate_checksum(archive_path) |
| |
| |
| if not is_split: |
| |
| with tarfile.open(archive_path, "r:gz") as tar: |
| |
| members = tar.getmembers() |
| if not members: |
| result["error"] = "Archive is empty (no members)" |
| return result |
|
|
| |
| print(f" Archive contains {len(members)} entries") |
| for m in members[:10]: |
| print(f" - {m.name}") |
| if len(members) > 10: |
| print(f" ... and {len(members) - 10} more entries") |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| result["valid"] = True |
| |
| except tarfile.TarError as e: |
| result["error"] = f"Invalid tar archive: {e}" |
| except Exception as e: |
| result["error"] = f"Verification failed: {e}" |
| |
| return result |
|
|
| def ensure_dataset_repo(self) -> None: |
| """Ensure the backup target repository exists. |
| |
| """ |
| target_repo = self.config.dataset_repo |
| self.api.create_repo( |
| repo_id=target_repo, |
| repo_type=self.config.repo_type, |
| private=self.config.private, |
| exist_ok=True, |
| ) |
|
|
| def _check_repo_exists(self, repo_id: str, repo_type: str = "dataset") -> bool: |
| """Check if a repository exists. |
| |
| Args: |
| repo_id: Repository ID to check |
| repo_type: Repository type (dataset, model, space) |
| |
| Returns: |
| True if repository exists, False otherwise |
| """ |
| try: |
| self.api.list_repo_files( |
| repo_id=repo_id, |
| repo_type=repo_type, |
| ) |
| return True |
| except Exception as e: |
| error_str = str(e).lower() |
| if "not found" in error_str or "404" in error_str or "does not exist" in error_str: |
| return False |
| raise |
|
|
| def _get_restore_repo(self) -> str: |
| """Get the repository to restore from, with fallback logic. |
| |
| If OPENCLAW_RESTORE_DATASET_REPO is set but doesn't exist, |
| falls back to OPENCLAW_BACKUP_DATASET_REPO. |
| |
| Returns: |
| Repository ID to restore from |
| """ |
| restore_repo = self.config.restore_dataset_repo |
| backup_repo = self.config.dataset_repo |
|
|
| if restore_repo and restore_repo != backup_repo: |
| if self._check_repo_exists(restore_repo, self.config.repo_type): |
| print(f"Using restore dataset: {restore_repo}") |
| return restore_repo |
| else: |
| print(f"Warning: Restore dataset '{restore_repo}' does not exist or is not accessible") |
| print(f"Falling back to backup dataset: {backup_repo}") |
| return backup_repo |
|
|
| print(f"Using backup dataset for restore: {backup_repo}") |
| return backup_repo |
|
|
| def _calculate_change_rate(self) -> float: |
| """Calculate file change rate (files per minute) since last backup.""" |
| rate, _ = self._calculate_change_rate_and_count() |
| return rate |
|
|
| def _calculate_change_rate_and_count(self) -> tuple[float, int]: |
| """Calculate file change rate (files per minute) and count since last backup. |
| |
| Returns: |
| Tuple of (change_rate, changed_files_count) |
| """ |
| last_backup_time = self._get_last_backup_time() |
| if last_backup_time is None: |
| return (float('inf'), 0) |
| |
| minutes_since_last = (utc_now() - last_backup_time).total_seconds() / 60 |
| if minutes_since_last < 1: |
| minutes_since_last = 1 |
| |
| |
| changed_files = 0 |
| for directory in [self.config.backup_source_dir] + list(self._extra_root_dirs().values()): |
| if directory.exists(): |
| changed_files += len(self._get_changed_files(directory, last_backup_time)) |
| |
| return (changed_files / minutes_since_last, changed_files) |
|
|
| def _estimate_backup_size_mb(self) -> float: |
| """Estimate the size of the next backup in MB.""" |
| total_size = 0 |
| |
| if not self.config.incremental or self._should_do_full_backup(use_cached=True): |
| |
| for directory in [self.config.backup_source_dir] + list(self._extra_root_dirs().values()): |
| if directory.exists(): |
| for item in directory.rglob("*"): |
| if item.is_file(): |
| try: |
| total_size += item.stat().st_size |
| except OSError: |
| continue |
| else: |
| |
| last_backup_time = self._get_last_backup_time() |
| if last_backup_time: |
| for directory in [self.config.backup_source_dir] + list(self._extra_root_dirs().values()): |
| if directory.exists(): |
| for file_path in self._get_changed_files(directory, last_backup_time): |
| try: |
| total_size += file_path.stat().st_size |
| except OSError: |
| continue |
| |
| return total_size / (1024 * 1024) |
|
|
| def _apply_dynamic_strategy(self, force_full_backup: bool = False) -> dict[str, Any]: |
| """Apply dynamic backup strategy based on file size and change rate. |
| |
| Args: |
| force_full_backup: If True, skip checking for incremental backup eligibility |
| (backup type decision already made by caller). |
| """ |
| if not self.config.dynamic_backup: |
| return { |
| "compression_level": self.config.compression_level, |
| "split_size": self.config.split_size, |
| "skip_backup": False, |
| "reason": "Dynamic backup disabled", |
| } |
| |
| |
| estimated_size_mb = self._estimate_backup_size_mb() |
| estimated_size_kb = estimated_size_mb * 1024 |
| change_rate, changed_files = self._calculate_change_rate_and_count() |
| |
| print(f"[Dynamic Strategy] Estimated size: {estimated_size_mb:.1f}MB, Change rate: {change_rate:.1f} files/min, Changed files: {changed_files}") |
| |
| |
| if estimated_size_mb < self.config.dynamic_small_threshold_mb: |
| size_category = "small" |
| compression = 3 |
| split = "" |
| elif estimated_size_mb < self.config.dynamic_medium_threshold_mb: |
| size_category = "medium" |
| compression = 6 |
| split = "500M" if estimated_size_mb > 1000 else "" |
| else: |
| size_category = "large" |
| compression = 9 |
| split = "500M" |
| |
| |
| skip_reason = None |
|
|
| if change_rate > self.config.dynamic_high_change_rate: |
| |
| compression = max(1, compression - 2) |
| elif change_rate < self.config.dynamic_low_change_rate: |
| |
| compression = min(9, compression + 1) |
| |
| |
| if changed_files < self.config.dynamic_min_changed_files and estimated_size_kb < self.config.dynamic_min_changed_size_kb and not force_full_backup: |
| current_skipped = self._load_skipped_count() + 1 |
| self._save_skipped_count(current_skipped) |
|
|
| max_skipped_incrementals = max(3, self.config.max_incremental_backups // 3) |
| if current_skipped >= max_skipped_incrementals: |
| print(f"[Dynamic Strategy] Too many skipped incrementals ({current_skipped}), forcing backup to maintain chain continuity") |
| self._clear_skipped_count() |
| else: |
| skip_reason = f"Very low change rate ({change_rate:.1f} files/min, {changed_files} files, {estimated_size_kb:.0f}KB), skipping incremental backup ({current_skipped}/{max_skipped_incrementals})" |
| print(f"[Dynamic Strategy] {skip_reason}") |
| return { |
| "compression_level": compression, |
| "split_size": split, |
| "skip_backup": True, |
| "reason": skip_reason, |
| } |
| else: |
| self._clear_skipped_count() |
| |
| |
| if size_category == "large" and estimated_size_mb > 3000: |
| |
| compression = 9 |
| |
| strategy = { |
| "compression_level": compression, |
| "split_size": split, |
| "skip_backup": False, |
| "reason": f"Size: {size_category} ({estimated_size_mb:.1f}MB), Change rate: {change_rate:.1f} files/min", |
| } |
| |
| print(f"[Dynamic Strategy] Applied: compression={compression}, split={split or 'none'}") |
| |
| return strategy |
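| # Illustrative outcome (hypothetical numbers): for an estimated 1200MB backup |
| # with a moderate change rate of 4 files/min, the method returns roughly: |
| #   {"compression_level": 6, "split_size": "500M", "skip_backup": False, |
| #    "reason": "Size: medium (1200.0MB), Change rate: 4.0 files/min"} |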
|
|
| def _extra_root_dirs(self) -> dict[str, Path]: |
| dirs = { |
| "root-config": self.config.root_config_dir, |
| "root-codex": self.config.root_codex_dir, |
| "root-claude": self.config.root_claude_dir, |
| "root-agents": self.config.root_agents_dir, |
| "root-ssh": self.config.root_ssh_dir, |
| "root-env": self.config.root_env_dir, |
| "root-lark-cli": self.config.root_lark_cli_dir, |
| "backup-info": self.config.metadata_dir, |
| } |
| if self.config.backup_npm_enabled: |
| dirs["root-npm"] = self.config.root_npm_dir |
| if self.config.extra_dirs: |
| dirs.update(self.config.extra_dirs) |
| return dirs |
|
|
| def _restore_root_dirs(self) -> dict[str, Path]: |
| dirs = { |
| "root-config": self.config.root_config_dir, |
| "root-codex": self.config.root_codex_dir, |
| "root-claude": self.config.root_claude_dir, |
| "root-agents": self.config.root_agents_dir, |
| "root-ssh": self.config.root_ssh_dir, |
| "root-env": self.config.root_env_dir, |
| "root-lark-cli": self.config.root_lark_cli_dir, |
| "backup-info": self.config.metadata_dir, |
| } |
| if self.config.restore_npm_enabled: |
| dirs["root-npm"] = self.config.root_npm_dir |
| if self.config.extra_dirs: |
| dirs.update(self.config.extra_dirs) |
| return dirs |
|
|
| def _get_last_backup_time(self) -> Optional[dt.datetime]: |
| """Get the timestamp of the last successful backup from metadata. |
| |
| Checks metadata_dir for last-backup-metadata.json, then falls back to |
| downloading latest-backup.json and the corresponding .meta.json from |
| remote storage. |
| """ |
| metadata_path = self.config.metadata_dir / "last-backup-metadata.json" |
| try: |
| if metadata_path.exists(): |
| with open(metadata_path, "r") as f: |
| metadata = json.load(f) |
| last_backup_str = metadata.get("last_backup_time") |
| if last_backup_str: |
| return dt.datetime.fromisoformat(last_backup_str) |
| except Exception: |
| pass |
|
|
| return self._get_last_backup_time_from_remote() |
|
|
| def _get_remote_backup_chain_info(self) -> Optional[dict]: |
| """Get the complete backup chain info from remote storage. |
| |
| Downloads latest-backup.json and the latest .meta.json to get: |
| - last_backup_time: timestamp of last backup |
| - parent_meta_path: path to parent backup's .meta.json |
| - chain_id: the chain this backup belongs to |
| - volumes: list of volume filenames (for split backups) |
| - backup_type: "full" or "incremental" |
| |
| This is the source of truth for backup chain continuity. |
| """ |
| try: |
| latest_json_path = "latest-backup.json" |
| latest_json_local = self._download_backup(latest_json_path, self.config.work_dir) |
| with open(latest_json_local, "r") as f: |
| latest_json = json.load(f) |
| meta_path = latest_json.get("latest", "") |
| if not meta_path: |
| return None |
|
|
| meta_local = self._download_backup(meta_path, self.config.work_dir) |
| with open(meta_local, "r") as f: |
| meta = json.load(f) |
|
|
| meta_filename = Path(meta_path).name |
| base_archive_name = meta_filename.replace(".meta.json", "") |
|
|
| return { |
| "last_backup_time": meta.get("last_backup_time") or meta.get("created_at_utc"), |
| "parent_meta_path": meta.get("parent"), |
| "chain_id": meta.get("chain_id"), |
| "volumes": meta.get("volumes", [base_archive_name]), |
| "backup_type": meta.get("backup_type", "incremental"), |
| "is_split": meta.get("is_split", False), |
| } |
| except Exception: |
| return None |
|
|
| def _get_last_backup_time_from_remote(self) -> Optional[dt.datetime]: |
| """Download latest-backup.json and corresponding .meta.json from remote to get last backup time. |
| |
| This is used when local metadata is not available (e.g., first run on a new machine). |
| """ |
| try: |
| latest_json_path = "latest-backup.json" |
| print(f"[_get_last_backup_time_from_remote] Downloading {latest_json_path}...") |
| latest_json_local = self._download_backup(latest_json_path, self.config.work_dir) |
| print(f"[_get_last_backup_time_from_remote] Downloaded to {latest_json_local}") |
| with open(latest_json_local, "r") as f: |
| latest_json = json.load(f) |
| meta_path = latest_json.get("latest", "") |
| print(f"[_get_last_backup_time_from_remote] latest path: {meta_path}") |
| if not meta_path: |
| return None |
|
|
| print(f"[_get_last_backup_time_from_remote] Downloading {meta_path}...") |
| cached_meta = self._get_cached_meta(meta_path) |
| if cached_meta is not None: |
| meta = cached_meta |
| print(f"[_get_last_backup_time_from_remote] Using cached metadata for {meta_path}") |
| else: |
| meta_local = self._download_backup(meta_path, self.config.work_dir) |
| print(f"[_get_last_backup_time_from_remote] Downloaded to {meta_local}") |
| with open(meta_local, "r") as f: |
| meta = json.load(f) |
| self._set_cached_meta(meta_path, meta) |
| last_backup_str = meta.get("last_backup_time") |
| print(f"[_get_last_backup_time_from_remote] last_backup_time: {last_backup_str}") |
| if last_backup_str: |
| return dt.datetime.fromisoformat(last_backup_str) |
| except Exception as e: |
| print(f"[_get_last_backup_time_from_remote] Error: {type(e).__name__}: {e}") |
| pass |
| return None |
|
|
| def _save_backup_metadata( |
| self, |
| archive_path: Path, |
| file_count: int, |
| is_full_backup: bool = True, |
| parent_meta_path: Optional[str] = None, |
| chain_id: Optional[str] = None |
| ) -> None: |
| """Save metadata about this backup for future incremental backups. |
| |
| This creates the per-backup .meta.json file content. |
| The metadata is saved locally and also uploaded to remote storage. |
| |
| Args: |
| archive_path: Path to the archive file (may be split into multiple volumes) |
| file_count: Number of files in the backup |
| is_full_backup: Whether this is a full backup |
| parent_meta_path: Path to parent backup's .meta.json file (for incremental) |
| chain_id: Chain ID for this backup (generated for full, inherited for incremental) |
| """ |
| prev_metadata = self._get_backup_metadata() |
|
|
| if is_full_backup: |
| backup_type = "full" |
| chain_id = str(uuid.uuid4())[:8] |
| else: |
| backup_type = "incremental" |
| |
| if not chain_id: |
| chain_id = (prev_metadata or {}).get("chain_id") or str(uuid.uuid4())[:8] |
|
|
| checksum = self._calculate_checksum(archive_path) |
| archive_size = archive_path.stat().st_size |
|
|
| if is_full_backup: |
| incremental_count = 0 |
| last_full_backup_time = utc_now().isoformat(timespec="seconds") |
| else: |
| incremental_count = (prev_metadata.get("incremental_count", 0) + 1) if prev_metadata else 1 |
| last_full_backup_time = prev_metadata.get("last_full_backup_time") if prev_metadata else None |
|
|
| metadata = { |
| "version": METADATA_VERSION, |
| "backup_type": backup_type, |
| "chain_id": chain_id, |
| "parent": parent_meta_path, |
| "volumes": [archive_path.name], |
| "checksum": f"sha256:{checksum}", |
| "created_at_utc": utc_now().isoformat(timespec="seconds"), |
| "last_backup_time": utc_now().isoformat(timespec="seconds"), |
| "file_count": file_count, |
| "archive_size": archive_size, |
| "is_latest": True, |
| "created_by": "openclaw-backup", |
| "incremental_count": incremental_count, |
| "last_full_backup_time": last_full_backup_time, |
| } |
|
|
|
|
| metadata_path = self.config.metadata_dir / "last-backup-metadata.json" |
| with open(metadata_path, "w") as f: |
| json.dump(metadata, f, indent=2) |
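| # Shape of the resulting last-backup-metadata.json (hypothetical values): |
| #   {"version": "2.1", "backup_type": "incremental", "chain_id": "a1b2c3d4", |
| #    "parent": "backups/openclaw-backup-20240101-100000.tar.gz.meta.json", |
| #    "volumes": ["openclaw-backup-20240101-120000.tar.gz"], |
| #    "checksum": "sha256:...", "incremental_count": 3, ...} |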
|
|
| def _get_backup_metadata(self) -> Optional[dict]: |
| """Get the current backup metadata from local files. |
| |
| Checks metadata_dir for last-backup-metadata.json. |
| """ |
| metadata_path = self.config.metadata_dir / "last-backup-metadata.json" |
| try: |
| if metadata_path.exists(): |
| with open(metadata_path, "r") as f: |
| return json.load(f) |
| except Exception: |
| pass |
| return None |
|
|
| def _get_skipped_count_file(self) -> Path: |
| """Get path to the skipped incremental count file.""" |
| return self.config.metadata_dir / ".skipped-incremental-count" |
|
|
| def _load_skipped_count(self) -> int: |
| """Load the count of consecutively skipped incremental backups.""" |
| skipped_file = self._get_skipped_count_file() |
| try: |
| if skipped_file.exists(): |
| return int(skipped_file.read_text().strip()) |
| except (ValueError, OSError): |
| pass |
| return 0 |
|
|
| def _save_skipped_count(self, count: int) -> None: |
| """Save the count of consecutively skipped incremental backups.""" |
| skipped_file = self._get_skipped_count_file() |
| try: |
| self.config.metadata_dir.mkdir(parents=True, exist_ok=True) |
| skipped_file.write_text(str(count)) |
| except OSError: |
| pass |
|
|
| def _clear_skipped_count(self) -> None: |
| """Clear the skipped incremental count (when a backup is performed).""" |
| skipped_file = self._get_skipped_count_file() |
| try: |
| skipped_file.unlink(missing_ok=True) |
| except OSError: |
| pass |
|
|
| def _update_volumes_in_metadata(self, volumes: list[str], checksum: Optional[str] = None, archive_size: Optional[int] = None) -> None: |
| """Update volumes field in metadata after split/encryption. |
| |
| This is necessary because _save_backup_metadata is called BEFORE split/encryption, |
| so it records the original archive name. After splitting and/or encrypting, we need |
| to update with the actual volume names and checksums. |
| |
| Args: |
| volumes: List of split volume filenames |
| checksum: New checksum for the encrypted archive (optional) |
| archive_size: New archive size in bytes (optional) |
| """ |
| try: |
| metadata_path = self.config.metadata_dir / "last-backup-metadata.json" |
| if metadata_path.exists(): |
| with open(metadata_path, "r") as f: |
| metadata = json.load(f) |
| metadata["volumes"] = volumes |
| if checksum: |
| metadata["checksum"] = checksum |
| if archive_size is not None: |
| metadata["archive_size"] = archive_size |
| if any(v.endswith(".enc") for v in volumes): |
| metadata["encrypted"] = True |
| metadata["encryption_algorithm"] = "AES-256-CBC" |
| if any(".part-" in v for v in volumes): |
| metadata["is_split"] = True |
| with open(metadata_path, "w") as f: |
| json.dump(metadata, f, indent=2) |
| except Exception as e: |
| print(f"Warning: Could not update metadata: {e}") |
|
|
| def _should_do_full_backup(self, use_cached: bool = False) -> bool: |
| """Determine if a full backup should be performed instead of incremental. |
| |
| Args: |
| use_cached: If True, return cached result from previous call in same backup cycle. |
| If False, always compute fresh (but will cache if this is first call). |
| """ |
| cached_attr = '_cached_should_full_backup' |
|
|
| if use_cached and hasattr(self, cached_attr): |
| return getattr(self, cached_attr) |
|
|
| if not self.config.incremental: |
| return True |
|
|
| last_backup_time = self._get_last_backup_time() |
| if last_backup_time is None: |
| print("[DEBUG] No previous backup found, performing full backup") |
| result = True |
| else: |
| time_since_last = utc_now() - last_backup_time |
| if time_since_last.total_seconds() < self.config.incremental_interval_minutes * 60: |
| print(f"[DEBUG] Incremental interval not reached ({time_since_last.total_seconds():.0f}s < {self.config.incremental_interval_minutes * 60}s), skipping backup") |
| result = False |
| else: |
| result = self._should_force_full_backup() |
|
|
| setattr(self, cached_attr, result) |
| return result |
| |
| def _should_force_full_backup(self) -> bool: |
| """Check if full backup should be forced based on interval/count rules.""" |
| metadata = self._get_backup_metadata() |
|
|
| last_full_backup_time_str = None |
| incremental_count = 0 |
| if metadata: |
| last_full_backup_time_str = metadata.get("last_full_backup_time") |
| incremental_count = metadata.get("incremental_count", 0) |
|
|
| if last_full_backup_time_str: |
| last_full_backup_time = dt.datetime.fromisoformat(last_full_backup_time_str) |
| hours_since_full = (utc_now() - last_full_backup_time).total_seconds() / 3600 |
| if hours_since_full >= self.config.full_backup_interval_hours: |
| print(f"[DEBUG] Full backup interval reached ({hours_since_full:.1f}h >= {self.config.full_backup_interval_hours}h), forcing full backup") |
| return True |
|
|
| if incremental_count >= self.config.max_incremental_backups: |
| print(f"[DEBUG] Max incremental backups reached ({incremental_count}/{self.config.max_incremental_backups}), forcing full backup") |
| return True |
|
|
| return False |
|
|
| def _get_changed_files(self, directory: Path, since: dt.datetime) -> list[Path]: |
| """Get list of files that have been modified since the given timestamp.""" |
| changed_files = [] |
| if not directory.exists(): |
| return changed_files |
|
|
| for item in directory.rglob("*"): |
| if item.is_file(): |
| try: |
| mtime = dt.datetime.fromtimestamp(item.stat().st_mtime, tz=dt.UTC) |
| if mtime > since: |
| changed_files.append(item) |
| except (OSError, ValueError): |
| |
| continue |
| return changed_files |
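| # Example (illustrative): collect files modified since the last backup; |
| # files whose mtime cannot be read are silently skipped: |
| #   changed = self._get_changed_files(self.config.backup_source_dir, last_backup_time) |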
|
|
| def create_archive(self, timestamp: Optional[str] = None) -> Path | list[Path] | None: |
| if not self.config.backup_source_dir.exists(): |
| raise FileNotFoundError(f"backup source directory does not exist: {self.config.backup_source_dir}") |
|
|
| stamp = timestamp or utc_now().strftime("%Y%m%d-%H%M%S") |
| self.config.work_dir.mkdir(parents=True, exist_ok=True) |
|
|
| |
| self._cleanup_stale_checkpoints() |
|
|
| |
| |
| remote_chain_info = self._get_remote_backup_chain_info() |
|
|
| |
| |
| do_full_backup = self._should_do_full_backup(use_cached=False) |
| print(f"[BACKUP] Full backup decision: {do_full_backup} (cached=False call)") |
|
|
| |
| strategy = self._apply_dynamic_strategy(force_full_backup=do_full_backup) |
|
|
| |
| if strategy.get("skip_backup", False): |
| print(f"[Dynamic Strategy] Backup skipped: {strategy.get('reason', 'Unknown')}") |
| return None |
|
|
| |
| dynamic_compression = strategy.get("compression_level", self.config.compression_level) |
| dynamic_split = strategy.get("split_size", self.config.split_size) |
|
|
| |
| original_compression = self.config.compression_level |
| original_split = self.config.split_size |
| self.config.compression_level = dynamic_compression |
| self.config.split_size = dynamic_split |
|
|
| try: |
| |
| last_backup_time = self._get_last_backup_time() |
|
|
| |
| parent_meta_path = None |
| chain_id = None |
| if remote_chain_info: |
| chain_id = remote_chain_info.get("chain_id") |
| if do_full_backup: |
| parent_meta_path = None |
| print(f"[BACKUP] Creating new chain (full backup), chain_id={chain_id}") |
| else: |
| |
| |
| chain = self._find_all_backups_by_chain_id(chain_id, repo_id=self.config.dataset_repo) |
| if chain and len(chain) > 0: |
| parent_meta_path = chain[-1] |
| print(f"[BACKUP] Remote chain: parent={parent_meta_path}, chain_id={chain_id}") |
| else: |
| |
| |
| parent_meta_path = remote_chain_info.get("parent_meta_path") |
| if parent_meta_path: |
| print(f"[BACKUP] Remote chain (fallback): parent={parent_meta_path}, chain_id={chain_id}") |
| else: |
| print(f"[BACKUP] Warning: No parent found, starting new chain, chain_id={chain_id}") |
|
|
| archive = self.config.work_dir / f"openclaw-backup-{stamp}.tar.gz" |
| file_count = 0 |
|
|
| |
| |
| compression_level = max(1, min(9, self.config.compression_level)) |
| |
| with tarfile.open(archive, "w:gz", compresslevel=compression_level) as tar: |
| if do_full_backup or last_backup_time is None: |
| print(f"[BACKUP] Creating FULL backup: {archive.name}") |
| print(f"[BACKUP] ============================================") |
| print(f"[BACKUP] Backup source: {self.config.backup_source_dir}") |
|
|
| |
| if self.config.backup_source_dir.exists(): |
| state_files = [f for f in self.config.backup_source_dir.rglob("*") if f.is_file()] |
| tar.add(str(self.config.backup_source_dir), arcname="openclaw-state") |
| file_count += len(state_files) |
| print(f"[BACKUP] ✓ openclaw-state: {len(state_files)} files") |
| else: |
| print(f"[BACKUP] ✗ openclaw-state: directory not found!") |
|
|
| |
| for archive_root, local_dir in self._extra_root_dirs().items(): |
| if local_dir.exists(): |
| extra_files = [f for f in local_dir.rglob("*") if f.is_file()] |
| tar.add(str(local_dir), arcname=archive_root) |
| file_count += len(extra_files) |
| print(f"[BACKUP] ✓ {archive_root}: {len(extra_files)} files") |
| else: |
| print(f"[BACKUP] - {archive_root}: directory not found (skipped)") |
| else: |
| print(f"[BACKUP] Creating INCREMENTAL backup: {archive.name}") |
| print(f"[BACKUP] ============================================") |
| print(f"[BACKUP] Last backup: {last_backup_time.isoformat()}") |
|
|
| |
| changed_files = self._get_changed_files(self.config.backup_source_dir, last_backup_time) |
| for file_path in changed_files: |
| arcname = f"openclaw-state/{file_path.relative_to(self.config.backup_source_dir)}" |
| tar.add(str(file_path), arcname=arcname) |
| file_count += len(changed_files) |
| print(f"[BACKUP] ✓ openclaw-state: {len(changed_files)} changed files") |
| if changed_files: |
| for fp in changed_files[:5]: |
| print(f"[BACKUP] - {fp.name}") |
| if len(changed_files) > 5: |
| print(f"[BACKUP] ... and {len(changed_files) - 5} more") |
|
|
| |
| for archive_root, local_dir in self._extra_root_dirs().items(): |
| if local_dir.exists(): |
| changed_files = self._get_changed_files(local_dir, last_backup_time) |
| for file_path in changed_files: |
| arcname = f"{archive_root}/{file_path.relative_to(local_dir)}" |
| tar.add(str(file_path), arcname=arcname) |
| file_count += len(changed_files) |
| print(f"[BACKUP] ✓ {archive_root}: {len(changed_files)} changed files") |
| if changed_files: |
| for fp in changed_files[:5]: |
| print(f"[BACKUP] - {fp.name}") |
| if len(changed_files) > 5: |
| print(f"[BACKUP] ... and {len(changed_files) - 5} more") |
| else: |
| print(f"[BACKUP] - {archive_root}: directory not found (skipped)") |
|
|
| |
| for archive_name, local_file in self.config.extra_files.items(): |
| if local_file.exists() and local_file.is_file(): |
| tar.add(str(local_file), arcname=archive_name) |
| file_count += 1 |
|
|
| |
| is_full_backup = do_full_backup or last_backup_time is None |
| |
| |
| |
| if is_full_backup: |
| parent_meta_path = None |
| chain_id = None |
| self._save_backup_metadata(archive, file_count, is_full_backup, parent_meta_path, chain_id) |
|
|
| |
| archive_size_mb = archive.stat().st_size / (1024 * 1024) |
| if archive_size_mb > self.config.size_warning_mb: |
| print(f"⚠️ WARNING: Backup size ({archive_size_mb:.1f}MB) exceeds warning threshold ({self.config.size_warning_mb}MB)") |
| print(f" Consider: 1) Excluding large files 2) Using split archives 3) Reducing backup frequency") |
|
|
| if is_full_backup: |
| print(f"Full backup complete: {file_count} files, {archive_size_mb:.1f}MB") |
| else: |
| print(f"Incremental backup complete: {file_count} files, {archive_size_mb:.1f}MB") |
|
|
| |
| if self.config.encryption_enabled and self.config.encryption_password: |
| result = self._encrypt_archive(archive) |
| if isinstance(result, Path): |
| enc_checksum = self._calculate_checksum(result) |
| enc_size = result.stat().st_size |
| enc_volumes = [result.name] |
| self._update_volumes_in_metadata(enc_volumes, f"sha256:{enc_checksum}", enc_size) |
| else: |
| result = archive |
|
|
| |
| |
| if self.config.split_size and archive_size_mb > 500: |
| |
| if not (self.config.encryption_enabled and self.config.encryption_password): |
| pre_split_checksum = self._calculate_checksum(result) |
| pre_split_size = result.stat().st_size |
| result = self._split_archive(result) |
| |
| |
| if isinstance(result, list): |
| is_encrypted = self.config.encryption_enabled and self.config.encryption_password |
| if is_encrypted: |
| |
| |
| |
| self._update_volumes_in_metadata([v.name for v in result]) |
| else: |
| |
| total_size = sum(v.stat().st_size for v in result) |
| self._update_volumes_in_metadata( |
| [v.name for v in result], |
| checksum=f"sha256:{pre_split_checksum}", |
| archive_size=total_size |
| ) |
| |
| return result |
| |
| finally: |
| |
| self.config.compression_level = original_compression |
| self.config.split_size = original_split |
|
|
| def _encrypt_archive(self, archive: Path) -> Path: |
| """Encrypt archive using AES-256-CBC with password. |
| |
| Args: |
| archive: Path to the archive to encrypt |
| |
| Returns: |
| Path to the encrypted archive (.tar.gz.enc) |
| """ |
| if not self.config.encryption_password: |
| raise ValueError("Encryption password not set") |
|
|
| encrypted_path = archive.with_suffix(archive.suffix + ".enc") |
|
|
| print(f"Encrypting archive: {archive.name} -> {encrypted_path.name}") |
|
|
| try: |
| import subprocess |
| import tempfile |
| import os as _os |
|
|
| with tempfile.NamedTemporaryFile(mode="w", delete=False) as pw_file: |
| _os.fchmod(pw_file.fileno(), 0o600) |
| pw_file.write(self.config.encryption_password) |
| pw_file.flush() |
| pw_file_path = pw_file.name |
|
|
| try: |
| result = subprocess.run( |
| [ |
| "openssl", "enc", "-aes-256-cbc", "-salt", "-pbkdf2", |
| "-in", str(archive), |
| "-out", str(encrypted_path), |
| "-pass", f"file:{pw_file_path}", |
| ], |
| capture_output=True, |
| text=True, |
| check=True, |
| ) |
| finally: |
| _os.unlink(pw_file_path) |
|
|
| archive.unlink() |
| print(f"Encrypted: {encrypted_path.name} ({encrypted_path.stat().st_size / (1024*1024):.1f}MB)") |
| return encrypted_path |
| except subprocess.CalledProcessError as e: |
| raise RuntimeError(f"Encryption failed: {e.stderr}") |
|
|
| def _decrypt_archive(self, encrypted_path: Path, target_path: Path) -> Path: |
| """Decrypt archive using AES-256-CBC with password. |
| |
| Args: |
| encrypted_path: Path to the encrypted archive (.tar.gz.enc) |
| target_path: Path where decrypted archive should be written |
| |
| Returns: |
| Path to the decrypted archive |
| """ |
| if not self.config.encryption_password: |
| raise ValueError("Decryption password not set") |
|
|
| print(f"Decrypting archive: {encrypted_path.name} -> {target_path.name}") |
|
|
| try: |
| import subprocess |
| import tempfile |
| import os as _os |
|
|
| with tempfile.NamedTemporaryFile(mode="w", delete=False) as pw_file: |
| _os.fchmod(pw_file.fileno(), 0o600) |
| pw_file.write(self.config.encryption_password) |
| pw_file.flush() |
| pw_file_path = pw_file.name |
|
|
| try: |
| result = subprocess.run( |
| [ |
| "openssl", "enc", "-d", "-aes-256-cbc", "-pbkdf2", |
| "-in", str(encrypted_path), |
| "-out", str(target_path), |
| "-pass", f"file:{pw_file_path}", |
| ], |
| capture_output=True, |
| text=True, |
| check=True, |
| ) |
| finally: |
| _os.unlink(pw_file_path) |
|
|
| print(f"Decrypted: {target_path.name} ({target_path.stat().st_size / (1024*1024):.1f}MB)") |
| return target_path |
| except subprocess.CalledProcessError as e: |
| raise RuntimeError(f"Decryption failed: {e.stderr}") |
|
|
| def _split_archive(self, archive: Path) -> list[Path]: |
| """Split large archive into smaller volumes.""" |
| import subprocess |
| |
| print(f"Splitting archive into {self.config.split_size} volumes...") |
| |
| |
| split_mb = 500 |
| if self.config.split_size: |
| size_str = self.config.split_size.upper() |
| if size_str.endswith('M'): |
| split_mb = int(size_str[:-1]) |
| elif size_str.endswith('G'): |
| split_mb = int(size_str[:-1]) * 1024 |
| |
| split_bytes = split_mb * 1024 * 1024 |
| |
| |
| base_name = archive.name |
| volumes_pattern = self.config.work_dir / f"{base_name}.part-" |
| |
| try: |
| result = subprocess.run( |
| ["split", "-b", str(split_bytes), str(archive), str(volumes_pattern)], |
| capture_output=True, |
| text=True, |
| check=True |
| ) |
| |
| |
| archive.unlink() |
| |
| |
| volumes = sorted(self.config.work_dir.glob(f"{base_name}.part-*")) |
| print(f"Created {len(volumes)} volume(s)") |
| for vol in volumes: |
| vol_size_mb = vol.stat().st_size / (1024 * 1024) |
| print(f" - {vol.name}: {vol_size_mb:.1f}MB") |
| |
| return volumes |
| |
| except subprocess.CalledProcessError as e: |
| print(f"Warning: Failed to split archive: {e}") |
| print(f"Continuing with unsplit archive") |
| return [archive] |
| except FileNotFoundError: |
| print(f"Warning: 'split' command not found. Install coreutils.") |
| print(f"Continuing with unsplit archive") |
| return [archive] |
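| # Volumes can be reassembled manually with POSIX cat (illustrative name): |
| #   cat openclaw-backup-<stamp>.tar.gz.part-* > openclaw-backup-<stamp>.tar.gz |
| # `split -b` produces byte-exact parts whose lexicographic order matches |
| # the original byte order. |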
|
|
| def _upload_file_with_retry(self, path_or_fileobj, path_in_repo: str, commit_message: str, max_retries: int = 3, repo_id: Optional[str] = None) -> None: |
| """Upload file with retry logic for rate limiting. |
| |
| Args: |
| path_or_fileobj: File path or file object to upload |
| path_in_repo: Path in the repository |
| commit_message: Commit message |
| max_retries: Maximum number of retries |
| repo_id: Optional repository ID to upload to. If not specified, uses dataset_repo |
| """ |
| upload_repo = repo_id or self.config.dataset_repo |
|
|
| for attempt in range(max_retries): |
| try: |
| self.api.upload_file( |
| repo_id=upload_repo, |
| repo_type=self.config.repo_type, |
| path_or_fileobj=path_or_fileobj, |
| path_in_repo=path_in_repo, |
| commit_message=commit_message, |
| ) |
| return |
| except Exception as e: |
| error_str = str(e) |
| |
| if "429" in error_str or "Too Many Requests" in error_str: |
| if attempt < max_retries - 1: |
| |
| match = re.search(r'Retry after (\d+) seconds', error_str) |
| if match: |
| wait_time = int(match.group(1)) |
| print(f"Rate limit hit (429), waiting {wait_time} seconds (server requested) before retry {attempt + 1}/{max_retries}...") |
| else: |
| |
| wait_time = 60 * (2 ** attempt) |
| print(f"Rate limit hit (429), waiting {wait_time} seconds (exponential backoff) before retry {attempt + 1}/{max_retries}...") |
| time.sleep(wait_time) |
| continue |
| |
| raise |
|
|
| def upload_backup(self, archive: Path | list[Path]) -> str: |
| if isinstance(archive, list): |
| volumes = archive |
| is_split = True |
| else: |
| volumes = [archive] |
| is_split = False |
|
|
| |
| backup_meta = self._get_backup_metadata() |
| if not backup_meta: |
| |
| remote_chain_info = self._get_remote_backup_chain_info() |
| if remote_chain_info: |
| |
| backup_meta = { |
| "chain_id": remote_chain_info.get("chain_id"), |
| "parent": remote_chain_info.get("parent_meta_path"), |
| "backup_type": remote_chain_info.get("backup_type"), |
| } |
| print(f"[UPLOAD] Using remote chain info: chain_id={backup_meta.get('chain_id')}") |
| else: |
| raise RuntimeError("Backup metadata not found. Cannot upload.") |
|
|
| |
| first_volume_name = volumes[0].name |
| base_name = first_volume_name |
| if ".part-" in base_name: |
| base_name = base_name.rsplit(".part-", 1)[0] |
| meta_json_name = f"{base_name}.meta.json" |
| if self.config.path_prefix: |
| meta_json_remote = f"{self.config.path_prefix}/{meta_json_name}" |
| else: |
| meta_json_remote = meta_json_name |
|
|
| |
| checkpoint_json = { |
| "status": "uploading", |
| "backup_meta_path": meta_json_remote, |
| "volumes": [vol.name for vol in volumes], |
| "volumes_uploaded": [], |
| "created_at_utc": utc_now().isoformat(timespec="seconds"), |
| } |
| checkpoint_name = f"{base_name}.checkpoint.json" |
| if self.config.path_prefix: |
| checkpoint_remote = f"{self.config.path_prefix}/{checkpoint_name}" |
| else: |
| checkpoint_remote = checkpoint_name |
|
|
| print(f"Creating checkpoint file: {checkpoint_name}") |
| self._upload_file_with_retry( |
| path_or_fileobj=(json.dumps(checkpoint_json, ensure_ascii=False, indent=2) + "\n").encode("utf-8"), |
| path_in_repo=checkpoint_remote, |
| commit_message=f"backup: {checkpoint_name} (checkpoint)", |
| ) |
|
|
| |
| remote_volumes = [] |
| for vol in volumes: |
| if self.config.path_prefix: |
| remote_path = f"{self.config.path_prefix}/{vol.name}" |
| else: |
| remote_path = vol.name |
|
|
| print(f"Uploading {vol.name}...") |
| self._upload_file_with_retry( |
| path_or_fileobj=str(vol), |
| path_in_repo=remote_path, |
| commit_message=f"backup: {vol.name}", |
| ) |
| remote_volumes.append(remote_path) |
|
|
| |
| checkpoint_json["volumes_uploaded"].append(vol.name) |
| print(f"Updating checkpoint: {vol.name} uploaded") |
| self._upload_file_with_retry( |
| path_or_fileobj=(json.dumps(checkpoint_json, ensure_ascii=False, indent=2) + "\n").encode("utf-8"), |
| path_in_repo=checkpoint_remote, |
| commit_message=f"backup: {checkpoint_name} (checkpoint update)", |
| ) |
|
|
| |
| |
| |
| backup_meta["volumes"] = [vol.name for vol in volumes] |
| backup_meta["is_split"] = is_split |
|
|
| |
| print(f"Uploading {meta_json_name}...") |
| self._upload_file_with_retry( |
| path_or_fileobj=(json.dumps(backup_meta, ensure_ascii=False, indent=2) + "\n").encode("utf-8"), |
| path_in_repo=meta_json_remote, |
| commit_message=f"backup: {meta_json_name}", |
| ) |
|
|
| |
| latest_backup_json = { |
| "dataset": self.config.dataset_repo, |
| "latest": meta_json_remote, |
| "created_at_utc": utc_now().isoformat(timespec="seconds"), |
| } |
|
|
| |
| latest_json_path = "latest-backup.json" |
|
|
| print("Updating latest-backup.json...") |
| self._upload_file_with_retry( |
| path_or_fileobj=(json.dumps(latest_backup_json, ensure_ascii=False, indent=2) + "\n").encode("utf-8"), |
| path_in_repo=latest_json_path, |
| commit_message="backup: update latest-backup.json", |
| ) |
|
|
| |
| print(f"Deleting checkpoint file: {checkpoint_name}") |
| self._delete_checkpoint_with_retry(checkpoint_remote) |
| |
| return meta_json_remote |
|
|
| def _delete_checkpoint_with_retry(self, checkpoint_remote: str, max_retries: int = 3) -> None: |
| """Delete checkpoint file with retry logic. |
| |
| Args: |
| checkpoint_remote: Path to checkpoint file in repo |
| max_retries: Maximum deletion attempts |
| """ |
| for attempt in range(max_retries): |
| try: |
| self.api.delete_file( |
| path_in_repo=checkpoint_remote, |
| repo_id=self.config.dataset_repo, |
| repo_type=self.config.repo_type, |
| ) |
| print(f"Checkpoint file deleted successfully") |
| return |
| except Exception as e: |
| error_str = str(e) |
| if "404" in error_str or "Entry Not Found" in error_str: |
| print(f"Checkpoint file already deleted or never existed") |
| return |
| if attempt < max_retries - 1: |
| wait_time = 2 ** attempt |
| print(f"Warning: Could not delete checkpoint file (attempt {attempt + 1}/{max_retries}): {e}") |
| print(f"Retrying in {wait_time}s...") |
| time.sleep(wait_time) |
| else: |
| print(f"Warning: Could not delete checkpoint file after {max_retries} attempts: {e}") |
| print(f"The checkpoint will be cleaned up during the next prune operation") |
| raise |
|
|
| def _cleanup_stale_checkpoints(self) -> None: |
| """Clean up stale checkpoint files from previous interrupted uploads. |
| |
| A checkpoint is considered stale if it is older than 24 hours. |
| """ |
| try: |
| files = self.api.list_repo_files( |
| repo_id=self.config.dataset_repo, |
| repo_type=self.config.repo_type, |
| ) |
|
|
| cutoff_time = utc_now() - dt.timedelta(hours=24) |
|
|
| cleaned = 0 |
| for path in files: |
| if not isinstance(path, str): |
| continue |
| if not path.endswith(".checkpoint.json"): |
| continue |
|
|
| try: |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| checkpoint_local = self._download_backup(path, Path(tmp_dir)) |
| with open(checkpoint_local, "r") as f: |
| checkpoint_data = json.load(f) |
| created_at = checkpoint_data.get("created_at_utc", "") |
| if created_at: |
| checkpoint_time = dt.datetime.fromisoformat(created_at) |
| if checkpoint_time < cutoff_time: |
| self.api.delete_file( |
| path_in_repo=path, |
| repo_id=self.config.dataset_repo, |
| repo_type=self.config.repo_type, |
| ) |
| print(f"[CLEANUP] Deleted stale checkpoint: {path}") |
| cleaned += 1 |
| except Exception: |
| match = re.search(r"(\d{8}-\d{6})", path) |
| if match: |
| try: |
| file_time = dt.datetime.strptime(match.group(1), "%Y%m%d-%H%M%S").replace(tzinfo=dt.UTC) |
| if file_time < cutoff_time: |
| self.api.delete_file( |
| path_in_repo=path, |
| repo_id=self.config.dataset_repo, |
| repo_type=self.config.repo_type, |
| ) |
| print(f"[CLEANUP] Deleted stale checkpoint (by filename): {path}") |
| cleaned += 1 |
| except Exception: |
| pass |
|
|
| if cleaned > 0: |
| print(f"[CLEANUP] Cleaned up {cleaned} stale checkpoint(s)") |
| except Exception as e: |
| print(f"[CLEANUP] Warning: Could not clean up stale checkpoints: {e}") |
|
|
| def _verify_upload(self, files_to_verify: list[str], repo_id: Optional[str] = None) -> None: |
| """Verify that the uploaded files exist in the repository. |
| |
| Uses lightweight API checks for large files and selective download |
| verification for small metadata files only. |
| |
| Args: |
| files_to_verify: List of file paths to verify |
| repo_id: Optional repository ID to verify in. If not specified, uses dataset_repo |
| """ |
| files_to_verify = [f for f in files_to_verify if f] |
| if not files_to_verify: |
| raise RuntimeError("Upload verification failed: no files to verify") |
|
|
| verify_repo = repo_id or self.config.dataset_repo |
|
|
| time.sleep(2) |
|
|
| try: |
| repo_files = self.api.list_repo_files( |
| repo_id=verify_repo, |
| repo_type=self.config.repo_type, |
| ) |
| repo_files_set = set(repo_files) |
|
|
| for path in files_to_verify: |
| if path not in repo_files_set: |
| raise RuntimeError(f"Upload verification failed: {path} not found in repository") |
|
|
| |
| |
| if path.endswith(".meta.json") or path.endswith("latest-backup.json"): |
| try: |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| self.api.hf_hub_download( |
| repo_id=verify_repo, |
| filename=path, |
| local_dir=tmp_dir, |
| repo_type=self.config.repo_type, |
| ) |
| except Exception: |
| raise RuntimeError(f"Upload verification failed: {path} exists but cannot be downloaded") |
|
|
| print(f" Verified: {path}") |
|
|
| except Exception as e: |
| raise RuntimeError(f"Upload verification failed: {e}") |
|
|
| def _list_backup_archive_paths(self) -> list[str]: |
| if not hasattr(self.api, "list_repo_files"): |
| return [] |
|
|
| files = self.api.list_repo_files( |
| repo_id=self.config.dataset_repo, |
| repo_type=self.config.repo_type, |
| ) |
| path_prefix = self.config.path_prefix.strip("/") |
| path_prefix_slash = f"{path_prefix}/" if path_prefix else "" |
| |
| |
| backups_by_time: dict[str, list[str]] = {} |
|
|
| for path in files: |
| if not isinstance(path, str): |
| continue |
|
|
| relative_path = path |
| if path_prefix_slash: |
| if not path.startswith(path_prefix_slash): |
| continue |
| relative_path = path[len(path_prefix_slash) :] |
|
|
| if "/" in relative_path: |
| continue |
|
|
| match = self._ARCHIVE_NAME_RE.fullmatch(relative_path) |
| if match is None: |
| continue |
|
|
| timestamp = match.group(1) |
| if timestamp not in backups_by_time: |
| backups_by_time[timestamp] = [] |
| backups_by_time[timestamp].append(path) |
|
|
| |
| sorted_timestamps = sorted(backups_by_time.keys(), reverse=True) |
| |
| |
| result = [] |
| for timestamp in sorted_timestamps: |
| result.extend(backups_by_time[timestamp]) |
|
|
| return result |
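| # Ordering sketch: split volumes of one backup share a timestamp and stay |
| # together, newest backup first (names below are illustrative): |
| # |
| #     backups/openclaw-backup-20240602-000000.tar.gz.part-001 |
| #     backups/openclaw-backup-20240602-000000.tar.gz.part-002 |
| #     backups/openclaw-backup-20240601-000000.tar.gz |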
|
|
| def _find_meta_file_for_timestamp(self, timestamp: str, files: list, path_prefix_slash: str) -> str | None: |
| """Find the meta.json file path for a backup timestamp. |
| |
| Args: |
| timestamp: The backup timestamp (e.g., "20260425-013501") |
| files: List of files in the repository |
| path_prefix_slash: Path prefix with trailing slash |
| |
| Returns: |
| The meta file path if found, None otherwise |
| """ |
| for path in files: |
| if not isinstance(path, str): |
| continue |
| if timestamp in path and path.endswith(".meta.json"): |
| return path |
| return None |
|
|
| def prune_old_backups(self) -> list[str]: |
| print("[PRUNE] Starting old backup cleanup...") |
| print(f"[PRUNE] Retention policy: keep_count={self.config.keep_count}, min_full_backups=12") |
|
|
| if not hasattr(self.api, "delete_file"): |
| print("[PRUNE] API does not support delete_file, skipping cleanup") |
| return [] |
|
|
| keep_count = self.config.keep_count if self.config.keep_count >= 1 else 48 |
| min_full_backups = 12 |
|
|
| if not hasattr(self.api, "list_repo_files"): |
| print("[PRUNE] API does not support list_repo_files, skipping cleanup") |
| return [] |
|
|
| print("[PRUNE] Fetching file list from remote repository...") |
| files = self.api.list_repo_files( |
| repo_id=self.config.dataset_repo, |
| repo_type=self.config.repo_type, |
| ) |
| print(f"[PRUNE] Found {len(files)} total files in repository") |
|
|
| path_prefix = self.config.path_prefix.strip("/") |
| path_prefix_slash = f"{path_prefix}/" if path_prefix else "" |
|
|
| backups_by_time: dict[str, list[str]] = {} |
| skipped_non_backup = 0 |
|
|
| for path in files: |
| if not isinstance(path, str): |
| continue |
|
|
| relative_path = path |
| if path_prefix_slash: |
| if not path.startswith(path_prefix_slash): |
| continue |
| relative_path = path[len(path_prefix_slash) :] |
|
|
| if "/" in relative_path: |
| continue |
|
|
| match = self._ARCHIVE_NAME_RE.fullmatch(relative_path) |
| if match is None: |
| skipped_non_backup += 1 |
| continue |
|
|
| timestamp = match.group(1) |
| if timestamp not in backups_by_time: |
| backups_by_time[timestamp] = [] |
| backups_by_time[timestamp].append(path) |
|
|
| print(f"[PRUNE] Scanned files: {len(files)} total, {skipped_non_backup} non-backup files skipped") |
| print(f"[PRUNE] Identified {len(backups_by_time)} backup timestamp(s)") |
|
|
| if not backups_by_time: |
| print("[PRUNE] No backups found, nothing to prune") |
| return [] |
|
|
| for ts, paths in sorted(backups_by_time.items(), reverse=True): |
| print(f"[PRUNE] Backup {ts}: {len(paths)} volume(s)") |
|
|
| sorted_timestamps = sorted(backups_by_time.keys(), reverse=True) |
|
|
| full_backup_timestamps = set() |
| for ts in sorted_timestamps: |
| meta_file = self._find_meta_file_for_timestamp(ts, files, path_prefix_slash) |
| if meta_file: |
| try: |
| cached_meta = self._get_cached_meta(meta_file) |
| if cached_meta is not None: |
| meta = cached_meta |
| else: |
| local_meta = self._download_backup(meta_file, self.config.work_dir) |
| with open(local_meta, "r") as f: |
| meta = json.load(f) |
| self._set_cached_meta(meta_file, meta) |
| if meta.get("backup_type") == "full": |
| full_backup_timestamps.add(ts) |
| except Exception: |
| pass |
|
|
| print(f"[PRUNE] Identified {len(full_backup_timestamps)} full backup(s), ensuring minimum 12 are kept") |
|
|
| if len(full_backup_timestamps) < min_full_backups: |
| print(f"[PRUNE] Not enough full backups ({len(full_backup_timestamps)}) to meet minimum {min_full_backups}, skipping cleanup") |
| return [] |
|
|
| kept_full_backups_list = sorted(full_backup_timestamps, reverse=True)[:min_full_backups] |
| kept_full_backups = set(kept_full_backups_list) |
| oldest_kept_full = kept_full_backups_list[-1] if kept_full_backups_list else None |
| print(f"[PRUNE] Will keep {len(kept_full_backups)} newest full backups: {kept_full_backups_list[:3]}...") |
| if oldest_kept_full: |
| print(f"[PRUNE] Oldest kept full backup: {oldest_kept_full}") |
|
|
| old_full_timestamps = [ts for ts in sorted_timestamps if ts in full_backup_timestamps and ts not in kept_full_backups] |
|
|
| incremental_timestamps = [ts for ts in sorted_timestamps if ts not in full_backup_timestamps] |
| timestamps_to_delete_candidates = list(reversed(old_full_timestamps)) |
|
|
| for ts in reversed(incremental_timestamps): |
| if oldest_kept_full and ts >= oldest_kept_full: |
| continue |
| timestamps_to_delete_candidates.append(ts) |
|
|
| timestamps_to_delete = timestamps_to_delete_candidates[:keep_count] |
|
|
| if not timestamps_to_delete: |
| print("[PRUNE] No backups to delete after applying retention policy") |
| return [] |
|
|
| print(f"[PRUNE] Will delete {len(timestamps_to_delete)} backup(s): {len([t for t in timestamps_to_delete if t in full_backup_timestamps])} full, {len([t for t in timestamps_to_delete if t not in full_backup_timestamps])} incremental") |
| print(f"[PRUNE] Oldest backup to delete: {timestamps_to_delete[-1]}") |
|
|
| |
| max_files_per_cleanup = 50 |
| total_files_to_delete = 0 |
| for ts in timestamps_to_delete: |
| total_files_to_delete += len(backups_by_time[ts]) |
| |
| total_files_to_delete += 1  # account for the backup's .meta.json |
|
|
| if total_files_to_delete > max_files_per_cleanup: |
| |
| limited_timestamps = [] |
| limited_count = 0 |
| for ts in reversed(timestamps_to_delete): |
| ts_files = len(backups_by_time[ts]) + 1 |
| if limited_count + ts_files <= max_files_per_cleanup or not limited_timestamps: |
| limited_timestamps.append(ts) |
| limited_count += ts_files |
| else: |
| break |
| timestamps_to_delete = list(reversed(limited_timestamps)) |
| print(f"[PRUNE] Limiting cleanup to {len(timestamps_to_delete)} backup(s) ({limited_count} files) to avoid rate limiting") |
|
|
| to_delete = [] |
| seen_paths = set() |
| for timestamp in timestamps_to_delete: |
| archive_paths = backups_by_time[timestamp] |
| for archive_path in archive_paths: |
| if archive_path not in seen_paths: |
| to_delete.append(archive_path) |
| seen_paths.add(archive_path) |
| print(f"[PRUNE] Marking for deletion: backup {timestamp} ({len(archive_paths)} volume(s))") |
|
|
| meta_added = False |
| for archive_path in archive_paths: |
| archive_name = Path(archive_path).name |
| if ".part-" in archive_name: |
| base_name = archive_name.rsplit(".part-", 1)[0] |
| else: |
| base_name = archive_name |
| if base_name.endswith(".tar.gz"): |
| meta_name = base_name[:-7] + ".meta.json" |
| else: |
| meta_name = base_name + ".meta.json" |
|
|
| if path_prefix_slash: |
| meta_path = f"{path_prefix_slash}{meta_name}" |
| else: |
| meta_path = meta_name |
|
|
| if meta_path in files and meta_path not in seen_paths: |
| to_delete.append(meta_path) |
| seen_paths.add(meta_path) |
| if not meta_added: |
| print(f"[PRUNE] + meta.json: {meta_name}") |
| meta_added = True |
|
|
| |
| checkpoint_name = base_name + ".checkpoint.json" |
| if path_prefix_slash: |
| checkpoint_path = f"{path_prefix_slash}{checkpoint_name}" |
| else: |
| checkpoint_path = checkpoint_name |
| if checkpoint_path in files and checkpoint_path not in seen_paths: |
| to_delete.append(checkpoint_path) |
| seen_paths.add(checkpoint_path) |
| print(f"[PRUNE] + checkpoint.json: {checkpoint_name}") |
|
|
| |
| orphaned_checkpoints = [] |
| for path in files: |
| if not isinstance(path, str): |
| continue |
| if not path.endswith(".checkpoint.json"): |
| continue |
| |
| checkpoint_name = Path(path).name |
| base_name = checkpoint_name[:-len(".checkpoint.json")] |
| |
| has_backup = False |
| for ts, archive_paths in backups_by_time.items(): |
| for ap in archive_paths: |
| ap_name = Path(ap).name |
| if ".part-" in ap_name: |
| ap_base = ap_name.rsplit(".part-", 1)[0] |
| else: |
| ap_base = ap_name |
| if ap_base == base_name: |
| has_backup = True |
| break |
| if has_backup: |
| break |
| if not has_backup and path not in seen_paths: |
| orphaned_checkpoints.append(path) |
| seen_paths.add(path) |
|
|
| if orphaned_checkpoints: |
| print(f"[PRUNE] Found {len(orphaned_checkpoints)} orphaned checkpoint(s)") |
| for cp in orphaned_checkpoints: |
| to_delete.append(cp) |
| print(f"[PRUNE] + orphaned checkpoint: {Path(cp).name}") |
|
|
| print(f"[PRUNE] Total files to delete: {len(to_delete)}") |
|
|
| prune_repo = self.config.dataset_repo |
| deleted_count = 0 |
| skipped_count = 0 |
| rate_limit_hits = 0 |
| for path in to_delete: |
| print(f"[PRUNE] Deleting ({deleted_count + skipped_count + 1}/{len(to_delete)}): {path}") |
| retry_count = 0 |
| max_retries = 3 |
| while retry_count < max_retries: |
| try: |
| self.api.delete_file( |
| repo_id=prune_repo, |
| repo_type=self.config.repo_type, |
| path_in_repo=path, |
| commit_message=f"backup: prune {Path(path).name}", |
| ) |
| deleted_count += 1 |
| break |
| except Exception as e: |
| error_str = str(e) |
| if "404" in error_str or "Entry Not Found" in error_str: |
| print(f"[PRUNE] ⚠ File already gone or never existed, skipping: {path}") |
| skipped_count += 1 |
| break |
| elif "429" in error_str or "Too Many Requests" in error_str: |
| retry_count += 1 |
| rate_limit_hits += 1 |
| if retry_count >= max_retries: |
| print(f"[PRUNE] ✗ Rate limit exceeded after {max_retries} retries, stopping cleanup: {path}") |
| print(f"[PRUNE] Cleanup interrupted due to rate limiting. Deleted {deleted_count} file(s), skipped {skipped_count} (not found), rate limit hits: {rate_limit_hits}") |
| return to_delete[:deleted_count + skipped_count] |
| wait_time = min(2 ** retry_count * 5, 60) |
| print(f"[PRUNE] ⚠ Rate limit hit ({retry_count}/{max_retries}), waiting {wait_time}s before retry...") |
| time.sleep(wait_time) |
| continue |
| else: |
| print(f"[PRUNE] ✗ Delete failed: {e}") |
| raise |
|
|
| print(f"[PRUNE] Cleanup complete, deleted {deleted_count} file(s), skipped {skipped_count} (not found), rate limit hits: {rate_limit_hits}") |
| return to_delete |
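| # Retention walk-through under stated assumptions (counts hypothetical): |
| # with 15 full backups and min_full_backups=12, |
| # |
| #     full_ts = sorted(full_backup_timestamps, reverse=True) |
| #     kept = set(full_ts[:12])       # 12 newest fulls survive |
| #     oldest_kept = full_ts[11]      # boundary timestamp |
| # |
| # The 3 oldest fulls, then incrementals older than oldest_kept (each group |
| # oldest first), become candidates, and at most 50 files are deleted per |
| # run to stay under Hub rate limits. |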
|
|
| def _manage_local_backups(self, archive: Path | list[Path]) -> None: |
| """Manage local backup files based on retention policy.""" |
| if not self.config.keep_local_backup: |
| |
| if isinstance(archive, list): |
| for vol in archive: |
| if vol.exists(): |
| vol.unlink() |
| print(f"Cleaned up temporary volume: {vol.name}") |
| else: |
| if archive.exists(): |
| archive.unlink() |
| print(f"Cleaned up temporary archive: {archive.name}") |
| return |
| |
| |
| target_dir = self.config.local_backup_dir or self.config.work_dir |
| target_dir.mkdir(parents=True, exist_ok=True) |
| |
| |
| archives_to_keep = [] |
| if isinstance(archive, list): |
| for vol in archive: |
| target_path = target_dir / vol.name |
| if target_dir != self.config.work_dir: |
| shutil.copy2(vol, target_path) |
| vol.unlink() |
| archives_to_keep.append(target_path) |
| else: |
| target_path = target_dir / archive.name |
| if target_dir != self.config.work_dir: |
| shutil.copy2(archive, target_path) |
| archive.unlink() |
| archives_to_keep.append(target_path) |
| |
| |
| self._prune_local_backups(target_dir) |
| |
| print(f"Local backup kept: {target_dir}") |
| |
| def _prune_local_backups(self, directory: Path) -> None: |
| """Remove old local backups keeping only keep_local_count most recent.""" |
| |
| archives = sorted( |
| directory.glob("openclaw-backup-*.tar.gz*"), |
| key=lambda p: p.stat().st_mtime, |
| reverse=True |
| ) |
| |
| |
| base_names = {} |
| for arch in archives: |
| |
| base_name = arch.name |
| if ".part-" in base_name: |
| base_name = base_name.rsplit(".part-", 1)[0] |
| |
| if base_name not in base_names: |
| base_names[base_name] = [] |
| base_names[base_name].append(arch) |
| |
| |
| keep_count = self.config.keep_local_count |
| sorted_bases = sorted(base_names.keys(), reverse=True) |
| |
| for base_name in sorted_bases[keep_count:]: |
| for arch in base_names[base_name]: |
| arch.unlink() |
| print(f"Pruned old local backup: {arch.name}") |
|
|
| def _cleanup_stale_lock(self, lock_path: Path) -> bool: |
| """Clean up stale lock file from previous crashed process. |
| |
| Returns True if lock was cleaned up or no lock existed. |
| Returns False if lock exists and is held by a live process. |
| """ |
| if not lock_path.exists(): |
| return True |
| try: |
| with open(lock_path, "r") as f: |
| lines = f.readlines() |
| if not lines: |
| lock_path.unlink(missing_ok=True) |
| return True |
| try: |
| pid = int(lines[0].strip()) |
| except (ValueError, IndexError): |
| lock_path.unlink(missing_ok=True) |
| return True |
| try: |
| os.kill(pid, 0) |
| return False |
| except ProcessLookupError: |
| print(f"[LOCK] Stale lock from dead process {pid}, cleaning up") |
| lock_path.unlink(missing_ok=True) |
| return True |
| except OSError: |
| return False |
| except Exception: |
| try: |
| lock_path.unlink(missing_ok=True) |
| except OSError: |
| pass |
| return True |
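| # Liveness probe used above: os.kill(pid, 0) delivers no signal but raises |
| # ProcessLookupError when the PID no longer exists, so a lock left by a |
| # crashed process can be reclaimed. Sketch: |
| # |
| #     try: |
| #         os.kill(pid, 0)            # alive (or OSError on EPERM) |
| #     except ProcessLookupError: |
| #         lock_path.unlink(missing_ok=True) |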
|
|
| def _acquire_lock(self, lock_path: Path) -> Optional[int]: |
| """Acquire exclusive backup lock with stale lock cleanup and timeout.""" |
| cleanup_result = self._cleanup_stale_lock(lock_path) |
| if not cleanup_result: |
| print("[LOCK] Lock file exists and is held by a live process, waiting...") |
| lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR) |
| start_time = time.time() |
| timeout = 300 |
| while True: |
| try: |
| fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) |
| lock_uuid = str(uuid.uuid4()) |
| # Truncate stale content from a previous holder before writing our own. |
| os.ftruncate(lock_fd, 0) |
| os.write(lock_fd, f"{os.getpid()}\n{utc_now().isoformat()}\n{lock_uuid}\n".encode()) |
| os.fsync(lock_fd) |
| self._lock_uuid = lock_uuid |
| return lock_fd |
| except Exception as e: |
| if time.time() - start_time > timeout: |
| os.close(lock_fd) |
| raise TimeoutError(f"Could not acquire backup lock after {timeout}s") from e |
| time.sleep(1) |
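| # Lock file layout written on acquisition, one field per line: |
| # |
| #     <pid> |
| #     <ISO-8601 UTC timestamp> |
| #     <uuid4 identifying this holder> |
| # |
| # _cleanup_stale_lock() only needs the first line; restore() later writes |
| # its own marker through the same descriptor. |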
|
|
| def backup(self) -> Path | list[Path] | None: |
| lock_path = self.config.work_dir / "openclaw-backup.lock" |
| lock_path.parent.mkdir(parents=True, exist_ok=True) |
| lock_fd = None |
| try: |
| lock_fd = self._acquire_lock(lock_path) |
| return self._backup_inner() |
| except TimeoutError as e: |
| print(f"[LOCK] {e}, backup skipped") |
| return None |
| finally: |
| if lock_fd is not None: |
| try: |
| fcntl.flock(lock_fd, fcntl.LOCK_UN) |
| except (OSError, IOError): |
| pass |
| os.close(lock_fd) |
| try: |
| lock_path.unlink(missing_ok=True) |
| except OSError: |
| pass |
|
|
| def _backup_inner(self) -> Path | list[Path] | None: |
| self.ensure_dataset_repo() |
|
|
| if not self._verify_existing_backup(): |
| print("backup aborted: cannot verify existing backup is restorable", file=sys.stderr) |
| print(" Please fix the restore issue before creating new backups.", file=sys.stderr) |
| return None |
|
|
| archive = self.create_archive() |
|
|
| if archive is None: |
| print("backup skipped by dynamic strategy") |
| return None |
|
|
| remote_path = self.upload_backup(archive) |
| print(f"backup uploaded: {self.config.dataset_repo}/{remote_path}") |
|
|
| self._manage_local_backups(archive) |
|
|
| return archive |
|
|
| def _verify_existing_backup(self) -> bool: |
| """Verify that the existing latest backup metadata is valid. |
| |
| Note: This method only validates metadata, not the actual backup files. |
| Full integrity verification is deferred to the restore process. |
| |
| Returns True if: |
| - No existing backup (first backup) |
| - Existing backup metadata is valid |
| |
| Returns False if: |
| - Existing backup metadata is invalid or corrupted |
| """ |
| print("Checking if existing backup metadata is valid...") |
| print(f" Checking backup target: {self.config.dataset_repo}") |
|
|
| verify_repo = self.config.dataset_repo |
|
|
| |
| try: |
| |
| meta_filename = "latest-backup.json" |
| print(f" Downloading {meta_filename}...") |
| meta_path = self._download_backup(meta_filename, self.config.work_dir) |
| print(f" Downloaded to {meta_path}") |
| with open(meta_path, "r") as f: |
| latest_meta = json.load(f) |
| print(f" Found backup metadata: {latest_meta.get('latest', 'unknown')}") |
|
|
| |
| actual_meta_path = latest_meta.get("latest", "") |
| if not actual_meta_path: |
| print(" No 'latest' entry found in latest-backup.json") |
| return False |
|
|
| print(f" Downloading actual meta file: {actual_meta_path}...") |
| cached_meta = self._get_cached_meta(actual_meta_path) |
| if cached_meta is not None: |
| backup_meta = cached_meta |
| print(f" Using cached metadata for {actual_meta_path}") |
| else: |
| actual_meta_local = self._download_backup(actual_meta_path, self.config.work_dir, repo_id=verify_repo) |
| with open(actual_meta_local, "r") as f: |
| backup_meta = json.load(f) |
| self._set_cached_meta(actual_meta_path, backup_meta) |
|
|
| is_split = backup_meta.get("is_split", False) |
| volumes = backup_meta.get("volumes", []) |
| print(f" backup_meta keys: {list(backup_meta.keys())}") |
| print(f" is_split: {is_split}") |
| print(f" volumes: {volumes}") |
|
|
| |
| if not volumes: |
| print(" Warning: No volumes found in backup metadata") |
| return False |
|
|
| |
| print(" Verifying backup files exist in repository...") |
| try: |
| repo_files = self.api.list_repo_files( |
| repo_id=verify_repo, |
| repo_type=self.config.repo_type, |
| ) |
| repo_files_set = set(repo_files) |
|
|
| for vol_path in volumes: |
| |
| check_path = vol_path |
| if self.config.path_prefix and not vol_path.startswith(self.config.path_prefix): |
| check_path = f"{self.config.path_prefix}/{vol_path}" |
|
|
| if check_path not in repo_files_set: |
| print(f" ✗ Backup file not found: {check_path}") |
| return False |
| print(f" ✓ Backup file exists: {check_path}") |
|
|
| print(f" ✓ All {len(volumes)} backup file(s) verified to exist") |
| except Exception as e: |
| print(f" Warning: Could not verify backup files exist: {e}") |
| |
|
|
| print(" Note: Full integrity verification will be performed during restore") |
| return True |
|
|
| except Exception as e: |
| |
| print(f" No existing backup found (first backup): {type(e).__name__}: {e}") |
| return True |
|
|
| @staticmethod |
| def replace_dir_contents(source_dir: Path, target_dir: Path) -> None: |
| target_dir.mkdir(parents=True, exist_ok=True) |
|
|
| for existing in target_dir.iterdir(): |
| try: |
| if existing.is_dir() and not existing.is_symlink(): |
| shutil.rmtree(existing) |
| else: |
| existing.unlink() |
| except (OSError, PermissionError): |
| pass |
|
|
| for item in source_dir.iterdir(): |
| destination = target_dir / item.name |
| if item.is_symlink(): |
| try: |
| if destination.exists() or destination.is_symlink(): |
| destination.unlink() |
| except OSError: |
| pass |
| try: |
| destination.symlink_to(os.readlink(item)) |
| except OSError: |
| pass |
| elif item.is_dir(): |
| if destination.exists(): |
| shutil.rmtree(destination) |
| shutil.copytree(item, destination, symlinks=True) |
| else: |
| try: |
| shutil.copy2(item, destination) |
| except FileExistsError: |
| try: |
| destination.unlink() |
| shutil.copy2(item, destination) |
| except Exception: |
| pass |
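| # Usage sketch (hypothetical paths): after the call, the target mirrors the |
| # source; symlinks are recreated as links rather than followed, and entries |
| # that cannot be removed are skipped instead of aborting the restore: |
| # |
| #     OpenClawBackup.replace_dir_contents(Path("/tmp/new-state"), Path("/data/state")) |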
|
|
| def _download_backup(self, remote_path: str, local_dir: Path, repo_id: Optional[str] = None) -> Path: |
| """Download a specific backup from the dataset. |
| |
| First attempts to use hf_hub_download (with cache bypass), then falls back |
| to direct HTTP download via urllib. Always ensures latest version is obtained. |
| |
| Uses in-memory caching for .meta.json files to avoid repeated downloads during |
| the same session. |
| |
| Args: |
| remote_path: Path to the file in the repository |
| local_dir: Local directory to download to |
| repo_id: Optional repository ID to download from. If not specified, |
| uses dataset_repo as default |
| """ |
| |
| |
| download_repo = repo_id or self.config.dataset_repo |
|
|
| |
| if remote_path in self._meta_file_cache: |
| cached_path = self._meta_file_cache[remote_path] |
| if cached_path.exists(): |
| return cached_path |
|
|
| local_dir = Path(local_dir) |
| local_dir.mkdir(parents=True, exist_ok=True) |
| local_file_path = local_dir / remote_path |
| # remote_path may contain subdirectories (e.g. a path prefix); make sure |
| # the parent exists before the direct-HTTP fallback writes to it. |
| local_file_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
| if local_file_path.exists(): |
| local_file_path.unlink() |
|
|
| if hf_hub_download is not None: |
| try: |
| print(f"[_download_backup] Trying hf_hub_download for {remote_path} from {download_repo}...") |
| downloaded_path = hf_hub_download( |
| repo_id=download_repo, |
| repo_type=self.config.repo_type, |
| filename=remote_path, |
| local_dir=str(local_dir), |
| force_download=True, |
| token=self.token, |
| ) |
| print(f"[_download_backup] hf_hub_download succeeded: {downloaded_path}") |
| result_path = Path(downloaded_path) |
| if remote_path.endswith(".meta.json"): |
| self._meta_file_cache[remote_path] = result_path |
| return result_path |
| except Exception as e: |
| print(f"[_download_backup] hf_hub_download failed: {type(e).__name__}: {e}") |
| print(f"[_download_backup] Falling back to direct HTTP download...") |
|
|
| print(f"[_download_backup] Using direct HTTP for {remote_path} from {download_repo}...") |
| endpoint = f"https://huggingface.co/datasets/{download_repo}/resolve/main/{remote_path}" |
| headers = {} |
| if self.token: |
| headers["Authorization"] = f"Bearer {self.token}" |
|
|
| req = urllib.request.Request(endpoint, headers=headers) |
| with urllib.request.urlopen(req, timeout=60) as response: |
| local_file_path.write_bytes(response.read()) |
|
|
| print(f"[_download_backup] Direct HTTP download succeeded: {local_file_path}") |
| if remote_path.endswith(".meta.json"): |
| self._meta_file_cache[remote_path] = local_file_path |
| return local_file_path |
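| # Fallback URL shape for dataset repos (repo and file are hypothetical): |
| # |
| #     https://huggingface.co/datasets/<user>/<repo>/resolve/main/<remote_path> |
| # |
| # sent with "Authorization: Bearer <token>" when a token is configured. |
| # Note the URL assumes repo_type == "dataset". |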
|
|
| def _clear_hf_cache(self, remote_path: str) -> None: |
| """Clear Hugging Face Hub cache for a specific file to force re-download.""" |
| try: |
| |
| cache_dir = Path.home() / ".cache" / "huggingface" / "hub" |
| if not cache_dir.exists(): |
| return |
|
|
| |
| repo_name = self.config.dataset_repo.replace("/", "--") |
| repo_cache_dir = cache_dir / f"datasets--{repo_name}" |
|
|
| if not repo_cache_dir.exists(): |
| return |
|
|
| |
| |
| |
| filename = Path(remote_path).name |
| removed = False |
|
|
| |
| for blob_file in repo_cache_dir.rglob("*"): |
| if blob_file.is_file() and blob_file.name == filename: |
| blob_file.unlink() |
| print(f" Cleared cache: {blob_file}") |
| removed = True |
|
|
| if not removed: |
| print(f" No cache found for: {filename}") |
|
|
| except Exception as e: |
| print(f" Warning: Could not clear cache: {e}") |
|
|
| def _get_cached_meta(self, meta_path: str) -> Optional[dict]: |
| """Get cached metadata JSON, or None if not cached.""" |
| return self._meta_cache.get(meta_path) |
|
|
| def _set_cached_meta(self, meta_path: str, meta: dict) -> None: |
| """Cache metadata JSON.""" |
| self._meta_cache[meta_path] = meta |
|
|
| def should_restore(self) -> bool: |
| """Determine if restore should be performed based on configuration. |
| |
| Restore logic: |
| - Container restart/rebuild always triggers restore |
| - This is by design to ensure data consistency |
| |
| Users can configure OPENCLAW_RESTORE_DATASET_REPO via HF Space variables |
| to control restore behavior (defaults to OPENCLAW_BACKUP_DATASET_REPO). |
| |
| Returns: |
| True (restore is always performed on container start) |
| """ |
| print("Restore will be performed (container restart/rebuild detected)") |
| return True |
|
|
| def _format_size(self, size_bytes: int) -> str: |
| """Format byte size to human readable string.""" |
| for unit in ["B", "KB", "MB", "GB", "TB"]: |
| if size_bytes < 1024: |
| return f"{size_bytes:.1f} {unit}" |
| size_bytes /= 1024 |
| return f"{size_bytes:.1f} PB" |
|
|
| def _merge_directory(self, source: Path, target: Path) -> None: |
| """Merge source directory into target directory. |
| |
| For incremental backups: newer files overwrite older ones. |
| """ |
| target.mkdir(parents=True, exist_ok=True) |
|
|
| for item in source.rglob("*"): |
| if item.is_file(): |
| rel_path = item.relative_to(source) |
| dest = target / rel_path |
| dest.parent.mkdir(parents=True, exist_ok=True) |
| try: |
| shutil.copy2(item, dest) |
| except FileExistsError: |
| dest.unlink() |
| shutil.copy2(item, dest) |
| elif item.is_dir(): |
| rel_path = item.relative_to(source) |
| dest = target / rel_path |
| dest.mkdir(parents=True, exist_ok=True) |
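| # Merge order matters: applying archives full-first means copy2 overwrites |
| # older copies, so the newest version of each file wins. For example, if |
| # the full backup has config.json v1 and an incremental has v2, the merged |
| # tree ends up with v2 (versions here are illustrative). |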
|
|
| def restore(self) -> bool: |
| """Restore from backup, automatically handling incremental backup chains. |
| |
| Flow according to documentation: |
| 1. Download latest-backup.json to get latest .meta.json path |
| 2. Parse backup chain: download all .meta.json files, trace back to full backup |
| 3. Download archives in order (from full to latest) and merge |
| 4. Restore files to target directory |
| """ |
| lock_path = self.config.work_dir / "openclaw-backup.lock" |
| lock_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
| lock_fd = None |
| result = False |
| restore_error_msg = None |
| skip_flag = False |
| try: |
| lock_fd = self._acquire_lock(lock_path) |
| if lock_fd is None: |
| print("restore skipped: a backup or restore is already in progress") |
| skip_flag = True |
| return False |
|
|
| os.write(lock_fd, f"{os.getpid()}\n{utc_now().isoformat()}\nrestore\n".encode()) |
| os.fsync(lock_fd) |
|
|
| result = self._restore_inner() |
| except TimeoutError as e: |
| print(f"[LOCK] {e}, restore skipped") |
| restore_error_msg = str(e) |
| result = False |
| except Exception as e: |
| print(f"[RESTORE] Unexpected error during restore: {type(e).__name__}: {e}", file=sys.stderr) |
| restore_error_msg = f"{type(e).__name__}: {e}" |
| result = False |
| finally: |
| if lock_fd is not None: |
| try: |
| fcntl.flock(lock_fd, fcntl.LOCK_UN) |
| except (OSError, IOError): |
| pass |
| try: |
| os.close(lock_fd) |
| except OSError: |
| pass |
|
|
| try: |
| lock_path.unlink(missing_ok=True) |
| except Exception: |
| pass |
|
|
| if not skip_flag: |
| restore_completed_flag = Path("/tmp/openclaw-restore-completed") |
| restore_completed_flag.touch() |
| print(f"Created restore completion flag: {restore_completed_flag}", file=sys.stderr) |
|
|
| return result |
|
|
| def _restore_inner(self) -> bool: |
| print("Starting restore process...") |
| restore_repo = self._get_restore_repo() |
| print(f"Restore source: {restore_repo}") |
| print(f"Target: {self.config.backup_source_dir}") |
|
|
| |
| |
| specified_archive = os.getenv("OPENCLAW_RESTORE_ARCHIVE") |
| if specified_archive: |
| |
| meta_path = f"{specified_archive}.meta.json" |
| if self.config.path_prefix: |
| meta_path = f"{self.config.path_prefix}/{meta_path}" |
| print(f"Restoring specified backup: {specified_archive}") |
| else: |
| |
| latest_json_path = "latest-backup.json" |
| try: |
| latest_json_local = self._download_backup(latest_json_path, self.config.work_dir, repo_id=restore_repo) |
| with open(latest_json_local, "r") as f: |
| latest_json = json.load(f) |
| meta_path = latest_json.get("latest", "") |
| print(f"Latest backup: {meta_path}") |
| except Exception as e: |
| print(f"Warning: latest-backup.json not found in dataset '{restore_repo}': {e}", file=sys.stderr) |
| print(f"This appears to be a newly initialized dataset", file=sys.stderr) |
| restore_skipped_flag = Path("/tmp/openclaw-restore-skipped-no-backup") |
| restore_skipped_flag.touch() |
| restore_completed_flag = Path("/tmp/openclaw-restore-completed") |
| restore_completed_flag.touch() |
| print(f"Created restore skipped flag: {restore_skipped_flag}", file=sys.stderr) |
| return True |
|
|
| |
| print("\n=== Step 2: Parsing backup chain ===") |
| chain_meta_paths = self._parse_backup_chain(meta_path, repo_id=restore_repo) |
|
|
| if not chain_meta_paths: |
| print("Error: Could not determine backup chain", file=sys.stderr) |
| return False |
|
|
| if len(chain_meta_paths) == 1: |
| print(f"Single backup (full): {chain_meta_paths[0]}") |
| else: |
| print(f"Backup chain with {len(chain_meta_paths)} backup(s):") |
| for i, mp in enumerate(chain_meta_paths, 1): |
| backup_type = "full" if i == 1 else "incremental" |
| print(f" [{i}] {mp} ({backup_type})") |
|
|
| |
| print("\n=== Step 3: Downloading and merging archives ===") |
| extra_dirs = self._restore_root_dirs() |
| extra_files = self.config.extra_files |
|
|
| if extra_dirs: |
| print("Extra directories to restore:") |
| for archive_root, local_dir in extra_dirs.items(): |
| print(f" - {archive_root} -> {local_dir}") |
|
|
| if extra_files: |
| print("Extra files to restore:") |
| for archive_name, local_file in extra_files.items(): |
| print(f" - {archive_name} -> {local_file}") |
|
|
| restored_items = [] |
|
|
| with tempfile.TemporaryDirectory(prefix="openclaw-restore-") as tmp: |
| tmp_dir = Path(tmp) |
|
|
| |
| merged_state = self._download_and_merge_chain(chain_meta_paths, tmp_dir, repo_id=restore_repo) |
|
|
| if not merged_state.exists() or not merged_state.is_dir(): |
| print("Error: Merged state directory missing", file=sys.stderr) |
| return False |
|
|
| |
| print("\n=== Step 4: Restoring files ===") |
|
|
| |
| print(f"Restoring state to: {self.config.backup_source_dir}") |
| self.replace_dir_contents(merged_state / "openclaw-state", self.config.backup_source_dir) |
| restored_items.append(f"state -> {self.config.backup_source_dir}") |
|
|
| |
| for archive_root, local_dir in extra_dirs.items(): |
| extracted_extra_dir = merged_state / archive_root |
| if extracted_extra_dir.exists(): |
| if not extracted_extra_dir.is_dir(): |
| print(f"Warning: backup archive {archive_root} is not a directory, skipping") |
| continue |
| print(f"Restoring {archive_root} to: {local_dir}") |
| self.replace_dir_contents(extracted_extra_dir, local_dir) |
| restored_items.append(f"{archive_root} -> {local_dir}") |
|
|
| |
| for archive_name, local_file in extra_files.items(): |
| extracted_file = merged_state / archive_name |
| if extracted_file.exists() and extracted_file.is_file(): |
| local_file.parent.mkdir(parents=True, exist_ok=True) |
| shutil.copy2(str(extracted_file), str(local_file)) |
| print(f"Restored extra file: {archive_name} -> {local_file}") |
| restored_items.append(f"{archive_name} -> {local_file}") |
|
|
| print("") |
| restore_repo = self._get_restore_repo() |
| print(f"✓ Restore completed successfully from {restore_repo}") |
| print(f" Chain: {len(chain_meta_paths)} backup(s) merged") |
| print(" Restored items:") |
| for item in restored_items: |
| print(f" - {item}") |
|
|
| restored_metadata = merged_state / "backup-info" / "last-backup-metadata.json" |
| if restored_metadata.exists(): |
| self.config.metadata_dir.mkdir(parents=True, exist_ok=True) |
| target_metadata = self.config.metadata_dir / "last-backup-metadata.json" |
| shutil.copy2(str(restored_metadata), str(target_metadata)) |
| print(f" Copied metadata to: {target_metadata}") |
|
|
| print("") |
| print("Backup restore completed successfully") |
| return True |
|
|
| def _parse_backup_chain(self, latest_meta_path: str, repo_id: Optional[str] = None) -> list[str]: |
| """Parse backup chain by following parent pointers in .meta.json files. |
| |
| Args: |
| latest_meta_path: Path to the latest .meta.json file (with path_prefix) |
| repo_id: Optional repository ID to download from. Uses dataset_repo if not specified. |
| |
| Returns: |
| List of .meta.json paths in order from full backup to latest |
| """ |
| download_repo = repo_id or self.config.dataset_repo |
| chain = [] |
| current_path = latest_meta_path |
| visited = set() |
| max_chain_length = 100 |
|
|
| chain_id = None |
| latest_time = None |
|
|
| local_meta = self._download_backup(latest_meta_path, self.config.work_dir, repo_id=download_repo) |
| with open(local_meta, "r") as f: |
| latest_meta = json.load(f) |
| latest_time_str = latest_meta.get("created_at_utc") or latest_meta.get("last_backup_time") |
| if latest_time_str: |
| try: |
| latest_time = dt.datetime.fromisoformat(latest_time_str.replace("Z", "+00:00")) |
| except (ValueError, AttributeError): |
| latest_time = None |
|
|
| while current_path and len(chain) < max_chain_length: |
| if current_path in visited: |
| print(f"Warning: Circular reference detected in backup chain at {current_path}") |
| break |
| visited.add(current_path) |
|
|
| chain.append(current_path) |
|
|
| |
| try: |
| cached_meta = self._get_cached_meta(current_path) |
| if cached_meta is not None: |
| meta = cached_meta |
| else: |
| local_meta = self._download_backup(current_path, self.config.work_dir, repo_id=download_repo) |
| with open(local_meta, "r") as f: |
| meta = json.load(f) |
| self._set_cached_meta(current_path, meta) |
| except Exception as e: |
| print(f"Error: Could not download/parse {current_path}: {e}", file=sys.stderr) |
| break |
|
|
| |
| if chain_id is None: |
| chain_id = meta.get("chain_id") |
|
|
| |
| parent = meta.get("parent") |
| if not parent: |
| |
| if meta.get("backup_type") == "full": |
| |
| break |
| else: |
| |
| |
| print(f"Warning: Incremental backup has null parent (chain may be broken). chain_id={chain_id}") |
| break |
|
|
| |
| if self.config.path_prefix and not parent.startswith(self.config.path_prefix): |
| parent = f"{self.config.path_prefix}/{parent}" |
|
|
| current_path = parent |
|
|
| if len(chain) >= max_chain_length: |
| print(f"Warning: Chain length exceeded safety limit ({max_chain_length})") |
|
|
| |
| if len(chain) == 1 and chain_id: |
| first_meta_path = chain[0] |
| try: |
| local_meta = self._download_backup(first_meta_path, self.config.work_dir, repo_id=download_repo) |
| with open(local_meta, "r") as f: |
| first_meta = json.load(f) |
| if first_meta.get("backup_type") == "incremental" and not first_meta.get("parent"): |
| print(f"Attempting to rebuild chain using chain_id={chain_id}...") |
| all_backups = self._find_all_backups_by_chain_id(chain_id, repo_id=download_repo, latest_time=latest_time) |
| if all_backups: |
| print(f"Found {len(all_backups)} backup(s) in chain") |
| |
| chain = all_backups |
| |
| else: |
| print(f"Warning: Could not find backups for chain_id={chain_id}") |
| print(f" This incremental backup may not restore correctly without its full backup") |
| except Exception as e: |
| print(f"Warning: Could not check first meta for chain rebuild: {e}") |
| else: |
| |
| chain.reverse() |
|
|
| return chain |
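| # Chain walk sketch with hypothetical meta files: |
| # |
| #     latest.meta.json  {"parent": "inc1.meta.json", "backup_type": "incremental"} |
| #     inc1.meta.json    {"parent": "full.meta.json", "backup_type": "incremental"} |
| #     full.meta.json    {"parent": null, "backup_type": "full"} |
| # |
| # The visited set breaks cycles, and the final reverse() yields |
| # [full, inc1, latest], i.e. apply order. |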
|
|
| def _find_all_backups_by_chain_id(self, chain_id: str, repo_id: Optional[str] = None, latest_time: Optional[dt.datetime] = None) -> Optional[list[str]]: |
| """Find all backups (full + incremental) for a given chain_id and sort by time. |
| |
| Args: |
| chain_id: The chain ID to search for |
| repo_id: Optional repository ID to search in. Uses dataset_repo if not specified. |
| latest_time: Optional datetime to filter backups - only backups at or before this time are included |
| |
| Returns: |
| List of .meta.json paths sorted by created_at_utc (earliest first), or None if no full backup found |
| """ |
| download_repo = repo_id or self.config.dataset_repo |
|
|
| try: |
| from huggingface_hub import HfApi |
|
|
| token = self.token |
| api = HfApi(token=token) |
|
|
| files = api.list_repo_files(repo_id=download_repo, repo_type=self.config.repo_type) |
| meta_files = [f for f in files if f.endswith(".meta.json")] |
|
|
| backups_with_time = [] |
| full_backup_path = None |
|
|
| for meta_file in meta_files: |
| try: |
| cached_meta = self._get_cached_meta(meta_file) |
| if cached_meta is not None: |
| meta = cached_meta |
| else: |
| local_meta = self._download_backup(meta_file, self.config.work_dir, repo_id=download_repo) |
| with open(local_meta, "r") as f: |
| meta = json.load(f) |
| self._set_cached_meta(meta_file, meta) |
|
|
| if meta.get("chain_id") == chain_id: |
| created_at = meta.get("created_at_utc") or meta.get("last_backup_time") |
| if created_at: |
| try: |
| if isinstance(created_at, str): |
| created_at = dt.datetime.fromisoformat(created_at.replace("Z", "+00:00")) |
| except (ValueError, AttributeError): |
| created_at = None |
|
|
| if latest_time and created_at and created_at > latest_time: |
| continue |
|
|
| if meta.get("backup_type") == "full": |
| full_backup_path = (meta_file, created_at, meta) |
| else: |
| backups_with_time.append((meta_file, created_at, meta)) |
| except Exception: |
| continue |
|
|
| if not full_backup_path: |
| print(f"Warning: No full backup found for chain_id={chain_id}") |
| return None |
|
|
| # Fall back to an aware minimum so naive/aware comparison cannot raise. |
| backups_with_time.sort(key=lambda x: x[1] if x[1] else dt.datetime.min.replace(tzinfo=dt.UTC)) |
|
|
| chain = [full_backup_path[0]] |
| chain.extend([b[0] for b in backups_with_time]) |
|
|
| return chain |
|
|
| except Exception as e: |
| print(f"Warning: Could not search for backups: {e}") |
| return None |
|
|
| def _download_and_merge_chain(self, chain_meta_paths: list[str], tmp_dir: Path, repo_id: Optional[str] = None) -> Path: |
| """Download archives in chain order and merge them. |
| |
| Args: |
| chain_meta_paths: List of .meta.json paths (from full to latest) |
| tmp_dir: Temporary directory for downloads and extraction |
| repo_id: Optional repository ID to download from. Uses dataset_repo if not specified. |
| |
| Returns: |
| Path to merged state directory |
| """ |
| |
| download_repo = repo_id or self.config.dataset_repo |
| merged_dir = tmp_dir / "merged-state" |
| merged_dir.mkdir(parents=True, exist_ok=True) |
|
|
| print(f"Merging {len(chain_meta_paths)} backup(s) in chain...") |
|
|
| total_size = 0 |
| successful_backups = 0 |
| for i, meta_path in enumerate(chain_meta_paths): |
| backup_type = "full" if i == 0 else "incremental" |
| print(f" [{i+1}/{len(chain_meta_paths)}] Processing: {meta_path} ({backup_type})") |
|
|
| |
| try: |
| cached_meta = self._get_cached_meta(meta_path) |
| if cached_meta is not None: |
| meta = cached_meta |
| else: |
| local_meta = self._download_backup(meta_path, tmp_dir, repo_id=download_repo) |
| with open(local_meta, "r") as f: |
| meta = json.load(f) |
| self._set_cached_meta(meta_path, meta) |
| except Exception as e: |
| print(f"Error: Could not download/parse {meta_path}: {e}", file=sys.stderr) |
| if i == 0: |
| raise RuntimeError(f"Failed to download metadata for full backup: {meta_path}") |
| print(f" Warning: Stopping backup chain at {meta_path}, using {successful_backups} successful backup(s)") |
| break |
|
|
| is_split = meta.get("is_split", False) |
| is_encrypted = meta.get("encrypted", False) |
| volumes = meta.get("volumes", []) |
| expected_checksum = meta.get("checksum", "") |
|
|
| |
| archive_path = None |
| try: |
| if is_split and volumes: |
| |
| print(f" Downloading {len(volumes)} split volumes...") |
| local_volumes = [] |
| for vol_path in volumes: |
| |
| vol_remote_path = vol_path |
| if self.config.path_prefix and not vol_path.startswith(self.config.path_prefix): |
| vol_remote_path = f"{self.config.path_prefix}/{vol_path}" |
| local_vol = self._download_backup(vol_remote_path, tmp_dir, repo_id=download_repo) |
| local_volumes.append(local_vol) |
|
|
| |
| |
| if is_encrypted: |
| merged_archive = tmp_dir / f"merged-{i}.tar.gz.enc" |
| else: |
| merged_archive = tmp_dir / f"merged-{i}.tar.gz" |
| with open(merged_archive, "wb") as outfile: |
| for vol in sorted(local_volumes): |
| with open(vol, "rb") as infile: |
| outfile.write(infile.read()) |
| archive_path = merged_archive |
| print(f" Merged {len(local_volumes)} volumes into {merged_archive.name}") |
| else: |
| |
| if volumes: |
| |
| vol_remote_path = volumes[0] |
| if self.config.path_prefix and not volumes[0].startswith(self.config.path_prefix): |
| vol_remote_path = f"{self.config.path_prefix}/{volumes[0]}" |
| archive_path = self._download_backup(vol_remote_path, tmp_dir, repo_id=download_repo) |
| else: |
| raise RuntimeError(f"No volumes found in metadata for {meta_path}") |
|
|
| |
| |
| if expected_checksum: |
| try: |
| actual_checksum = self._calculate_checksum(archive_path) |
| if actual_checksum != expected_checksum.replace("sha256:", ""): |
| raise RuntimeError( |
| f"Checksum mismatch for {archive_path.name}: " |
| f"expected {expected_checksum}, got sha256:{actual_checksum}" |
| ) |
| print(f" ✓ Checksum verified: {archive_path.name}") |
| except Exception as e: |
| print(f" Error: Integrity verification failed for {meta_path}: {e}", file=sys.stderr) |
| if i == 0: |
| raise RuntimeError(f"Full backup integrity check failed: {meta_path}") |
| print(f" Warning: Stopping backup chain at {meta_path}, using {successful_backups} successful backup(s)") |
| break |
| elif is_split: |
| print(f" ⚠ Checksum not available for split backup, skipping verification (create new backups to enable)") |
|
|
| if is_encrypted: |
| if not self.config.encryption_password: |
| raise RuntimeError( |
| f"Backup {meta_path} is encrypted but OPENCLAW_BACKUP_ENCRYPTION_PASSWORD is not set. " |
| f"Cannot restore encrypted backup without password." |
| ) |
| decrypted_archive = tmp_dir / f"decrypted-{i}.tar.gz" |
| archive_path = self._decrypt_archive(archive_path, decrypted_archive) |
| except Exception as e: |
| print(f" Error: Failed to download archive for {meta_path}: {e}", file=sys.stderr) |
| if i == 0: |
| raise RuntimeError(f"Failed to download full backup: {meta_path}") |
| print(f" Warning: Stopping backup chain at {meta_path}, using {successful_backups} successful backup(s)") |
| break |
|
|
| |
| |
| |
|
|
| archive_size = archive_path.stat().st_size |
| total_size += archive_size |
| print(f" Archive: {archive_path.name}, size: {self._format_size(archive_size)}") |
|
|
| |
| extract_dir = tmp_dir / f"extract-{i}" |
| extract_dir.mkdir(parents=True, exist_ok=True) |
|
|
| try: |
| with tarfile.open(archive_path, "r:gz") as tar: |
| tar.extractall(extract_dir) |
| except tarfile.ReadError as e: |
| print(f" Error: Failed to extract archive {archive_path}: {e}", file=sys.stderr) |
| if i == 0: |
| raise RuntimeError(f"Failed to extract full backup: {meta_path}") |
| print(f" Warning: Stopping backup chain at {meta_path}, using {successful_backups} successful backup(s)") |
| break |
|
|
| |
| file_count = sum(1 for p in extract_dir.rglob("*") if p.is_file()) |
| print(f" Files: {file_count}") |
|
|
| |
| state_dir = extract_dir / "openclaw-state" |
| if state_dir.exists(): |
| self._merge_directory(state_dir, merged_dir / "openclaw-state") |
|
|
| |
| for archive_root, _ in self._restore_root_dirs().items(): |
| extra_dir = extract_dir / archive_root |
| if extra_dir.exists(): |
| self._merge_directory(extra_dir, merged_dir / archive_root) |
|
|
| successful_backups += 1 |
|
|
| if successful_backups == 0: |
| raise RuntimeError("No backups could be restored successfully") |
|
|
| print(f" Total downloaded: {self._format_size(total_size)}") |
| print(f" Successfully processed {successful_backups}/{len(chain_meta_paths)} backup(s)") |
| return merged_dir |
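| # Integrity sketch: metadata stores "sha256:<hex>"; verification strips the |
| # prefix and compares a fresh digest of the (re-assembled) archive: |
| # |
| #     expected = meta["checksum"].replace("sha256:", "") |
| #     ok = self._calculate_checksum(archive_path) == expected |
| # |
| # A mismatch on the full backup aborts; on an incremental it truncates the |
| # chain and restores from the backups verified so far. |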
|
|
|
|
| def build_parser() -> argparse.ArgumentParser: |
| parser = argparse.ArgumentParser(description="Backup/restore OpenClaw state with Hugging Face Dataset") |
| parser.add_argument("action", choices=["backup", "restore"], help="action to perform") |
| return parser |
|
|
|
|
| def main(argv: Optional[list[str]] = None) -> int: |
| args = build_parser().parse_args(argv) |
|
|
| try: |
| config = BackupConfig.from_env() |
| except ValueError as exc: |
| print(f"skip {args.action}: {exc}", file=sys.stderr) |
| return 0 |
|
|
| try: |
| runner = OpenClawBackup(config=config) |
| if args.action == "backup": |
| runner.backup() |
| return 0 |
|
|
| |
| if args.action == "restore": |
| |
| success = runner.restore() |
| return 0 if success else 1 |
|
|
| return 0 |
| except Exception as exc: |
| print(f"{args.action} failed: {exc}", file=sys.stderr) |
| return 1 |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|