| |
| import os |
| import sys |
| import tarfile |
| import tempfile |
| import shutil |
| import subprocess |
| from pathlib import Path |
| from huggingface_hub import HfApi, hf_hub_download |
| from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError, HfHubHTTPError |
|
|
# Hub client built once at import time; backup() uses it to upload the archive.
api: HfApi = HfApi()
|
|
# Hub coordinates are read from the environment; each is None when unset.
REPO_ID = os.environ.get("HF_DATASET")
TOKEN = os.environ.get("HF_TOKEN")

# Name of the archive object stored in (and fetched from) the dataset repo.
FILENAME = "latest_backup.tar.gz"

# Directory that backups are taken from and restored into.
BASE_DIR = Path("/root") / ".openclaw"

# Files and directories, all located under BASE_DIR, included in a backup.
PATHS_TO_BACKUP = [
    BASE_DIR.joinpath(*segments)
    for segments in (
        ("sessions",),
        ("agents", "main", "sessions"),
        ("openclaw.json",),
        ("workspace",),
    )
]
|
|
|
|
def log(msg: str) -> None:
    """Print *msg* on standard output and flush the stream right away."""
    print(msg, file=sys.stdout, flush=True)
|
|
|
|
def log_err(msg: str) -> None:
    """Print *msg* on standard error and flush the stream right away."""
    stream = sys.stderr
    print(msg, file=stream, flush=True)
|
|
|
|
def is_subpath(child: Path, parent: Path) -> bool:
    """Tell whether *child* is *parent* itself or is located beneath it.

    Both paths are resolved first, so ``..`` components and any symlinks that
    already exist on disk are taken into account before comparing.

    Args:
        child: Path to test.
        parent: Directory that *child* is expected to stay within.

    Returns:
        True if the resolved *child* equals or descends from the resolved
        *parent*, otherwise False.
    """
    resolved_child = child.resolve()
    resolved_parent = parent.resolve()
    return resolved_child.is_relative_to(resolved_parent)
|
|
|
|
def safe_extract(tar: tarfile.TarFile, target_dir: Path) -> None:
    """Extract all members of *tar* into *target_dir*, refusing unsafe archives.

    The complete member list is validated before anything is written. The
    archive is rejected when any member:

    * has a name that resolves outside *target_dir* (absolute name or ``..``
      escape), or that contains a ``..`` component at all;
    * would be written through a symlink/hard-link entry of the same archive,
      or reuses the name of such an entry (extraction would then follow the
      link instead of creating a fresh file inside *target_dir*);
    * is a hard link whose target lies outside *target_dir* or is reached
      through a link entry of the archive.

    A symlink that merely *points* outside *target_dir* is still accepted as
    long as no other member is extracted through it, so archives containing
    e.g. a virtualenv interpreter link keep restoring.

    Args:
        tar: Archive opened for reading.
        target_dir: Directory to extract into.

    Raises:
        RuntimeError: If a member fails one of the checks above. Nothing has
            been extracted at that point.
    """
    target_dir = target_dir.resolve()
    members = tar.getmembers()

    def split(name: str) -> tuple[str, ...]:
        # Tar names are "/"-separated; empty and "." components carry no meaning.
        return tuple(part for part in name.split("/") if part not in ("", "."))

    # Archive-relative locations that will become links once extracted.
    link_paths = {split(m.name) for m in members if m.issym() or m.islnk()}

    def crosses_link(parts: tuple[str, ...], include_leaf: bool) -> bool:
        # True when a prefix of *parts* (optionally the full path) is a link entry.
        end = len(parts) + 1 if include_leaf else len(parts)
        return any(parts[:depth] in link_paths for depth in range(1, end))

    seen: set[tuple[str, ...]] = set()
    for member in members:
        parts = split(member.name)

        # ".." is refused outright: the prefix comparison below is only sound
        # for names without it ("a/../s/x" would otherwise slip past a link "s").
        if ".." in parts or not is_subpath(target_dir / member.name, target_dir):
            raise RuntimeError(f"Unsafe path detected in archive: {member.name}")

        # The name check above runs before extraction and therefore cannot see
        # links this archive is about to create; check those structurally.
        reuses_link_name = parts in link_paths and parts in seen
        if crosses_link(parts, include_leaf=False) or reuses_link_name:
            raise RuntimeError(f"Unsafe path detected in archive: {member.name}")
        seen.add(parts)

        if member.islnk():
            # Hard-link targets are interpreted relative to the archive root.
            link_target = split(member.linkname)
            if (
                ".." in link_target
                or not is_subpath(target_dir / member.linkname, target_dir)
                or crosses_link(link_target, include_leaf=True)
            ):
                raise RuntimeError(
                    "Unsafe link target detected in archive: "
                    f"{member.name} -> {member.linkname}"
                )

    tar.extractall(path=target_dir, members=members)
|
|
|
|
def restore() -> bool:
    """Fetch the backup archive from the dataset repo and unpack it into BASE_DIR.

    Returns:
        True when the archive was downloaded and extracted. False when
        HF_DATASET or HF_TOKEN is unset, when the archive is not present in
        the repo, or when any error occurred; errors are caught and logged.
    """
    if not (REPO_ID and TOKEN):
        log("[RESTORE] Skip: HF_DATASET or HF_TOKEN not set")
        return False

    restored = False
    try:
        log(f"[RESTORE] Downloading {FILENAME} from dataset repo: {REPO_ID}")
        archive_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=FILENAME,
            repo_type="dataset",
            token=TOKEN,
        )

        # The destination may not exist yet on a fresh machine.
        BASE_DIR.mkdir(parents=True, exist_ok=True)

        with tarfile.open(archive_path, "r:gz") as archive:
            safe_extract(archive, BASE_DIR)

        log(f"[RESTORE] Success: restored from {FILENAME}")
        restored = True
    except EntryNotFoundError:
        # A missing archive is reported on stdout, not as an error.
        log(f"[RESTORE] Note: {FILENAME} not found in repo, probably first run")
    except RepositoryNotFoundError:
        log_err(f"[RESTORE] Error: dataset repo not found: {REPO_ID}")
    except HfHubHTTPError as exc:
        log_err(f"[RESTORE] Hub HTTP error: {exc}")
    except tarfile.TarError as exc:
        log_err(f"[RESTORE] Invalid tar archive: {exc}")
    except Exception as exc:
        log_err(f"[RESTORE] Unexpected error: {exc}")

    return restored
|
|
|
|
def backup() -> bool:
    """Archive the existing PATHS_TO_BACKUP entries and upload them to the Hub.

    The archive is written to a temporary ``.tar.gz`` file, uploaded to the
    dataset repo as FILENAME, and the temporary file is removed afterwards
    whether or not the upload succeeded.

    Returns:
        True when the upload completed. False when HF_DATASET or HF_TOKEN is
        unset, when none of the configured paths exist, or when any error
        occurred; errors are caught and logged.
    """
    if not (REPO_ID and TOKEN):
        log("[BACKUP] Skip: HF_DATASET or HF_TOKEN not set")
        return False

    present = []
    for candidate in PATHS_TO_BACKUP:
        if not candidate.exists():
            log(f"[BACKUP] Warning: {candidate} does not exist, skipping")
            continue
        present.append(candidate)
        log(f"[BACKUP] Found: {candidate}")

    if len(present) == 0:
        log("[BACKUP] Skip: no paths to backup")
        return False

    archive_file = None
    uploaded = False

    try:
        # Only the name is needed here; the handle is closed before tarfile
        # reopens the path for writing.
        with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as handle:
            archive_file = Path(handle.name)

        log("[BACKUP] Creating archive...")
        with tarfile.open(archive_file, "w:gz") as archive:
            for item in present:
                # Store entries relative to BASE_DIR where possible.
                arcname = (
                    item.relative_to(BASE_DIR)
                    if item.is_relative_to(BASE_DIR)
                    else item.name
                )
                archive.add(str(item), arcname=str(arcname))
                log(f"[BACKUP] Added: {item} -> {arcname}")

        log(f"[BACKUP] Uploading to Hugging Face: {REPO_ID}/{FILENAME}")
        api.upload_file(
            path_or_fileobj=str(archive_file),
            path_in_repo=FILENAME,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=TOKEN,
            commit_message=f"Update {FILENAME} - {len(present)} items",
        )

        log(f"[BACKUP] Success: uploaded {FILENAME}")
        uploaded = True
    except RepositoryNotFoundError:
        log_err(f"[BACKUP] Error: dataset repo not found: {REPO_ID}")
    except HfHubHTTPError as exc:
        log_err(f"[BACKUP] Hub HTTP error: {exc}")
    except tarfile.TarError as exc:
        log_err(f"[BACKUP] Tar error: {exc}")
    except Exception as exc:
        log_err(f"[BACKUP] Unexpected error: {exc}")
    finally:
        # Best-effort cleanup: a leftover temp file is reported, not fatal.
        if archive_file is not None and archive_file.exists():
            try:
                archive_file.unlink()
                log("[BACKUP] Cleaned up temp file")
            except Exception as exc:
                log_err(f"[BACKUP] Warning: failed to delete temp file {archive_file}: {exc}")

    return uploaded
|
|
|
|
if __name__ == "__main__":
    # Usage: <script> [backup|restore]; anything other than "backup"
    # (including no argument) runs a restore.
    cli_args = sys.argv[1:]
    action = cli_args[0].strip().lower() if cli_args else "restore"
    separator = "=" * 50

    if action != "backup":
        log(separator)
        log("Starting restore process...")
        log(separator)
        restored = restore()
        log(separator)

        # A skipped or failed restore still exits 0.
        if restored:
            log("Restore successful")
        else:
            log("Restore skipped or not needed (continuing anyway)")
        sys.exit(0)

    log(separator)
    log("Starting backup process...")
    log(separator)
    succeeded = backup()
    log(separator)
    log(f"Backup {'successful' if succeeded else 'failed'}")
    log(separator)
    sys.exit(int(not succeeded))