"""Back up and restore the contents of BASE_DIR via a Hugging Face dataset repo.

Usage: python sync.py [backup|restore]   (defaults to ``restore``)

The target repository and access token are read from the ``HF_DATASET`` and
``HF_TOKEN`` environment variables; when either is missing, both actions are
skipped and report failure.
"""
import hashlib
import logging
import os
import sys
import tarfile
import time
from datetime import datetime, timezone

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError

# ── Logging configuration
# The date format ends in a literal "Z", so records must be rendered in UTC;
# logging uses local time unless the converter is overridden.
logging.Formatter.converter = time.gmtime
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)
log = logging.getLogger("sync")

# ── Configuration
api = HfApi()
repo_id = os.getenv("HF_DATASET")
token = os.getenv("HF_TOKEN")

FILENAME = "latest_backup.tar.gz"
BACKUP_PATH = f"/tmp/{FILENAME}"
BASE_DIR = "/home/node/.openclaw"

PATHS_TO_BACKUP = [
    f"{BASE_DIR}/sessions",
    f"{BASE_DIR}/agents/main/sessions",
    f"{BASE_DIR}/credentials",
    f"{BASE_DIR}/workspace",
    f"{BASE_DIR}/extensions",
    f"{BASE_DIR}/openclaw.json",
]


# ── Helpers
def _check_env() -> bool:
    """Return True when both HF_DATASET and HF_TOKEN are set, else log and return False."""
    if not repo_id or not token:
        log.warning("HF_DATASET 或 HF_TOKEN 未设置,跳过同步。")
        return False
    return True


def _sha256(path: str) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 64 KiB chunks."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def _verify_tar(path: str) -> bool:
    """Return True if *path* is a readable, non-empty gzip tar archive.

    An empty archive logs a warning; an unreadable one logs an error. Both
    return False.
    """
    try:
        with tarfile.open(path, "r:gz") as tar:
            members = tar.getmembers()
            if not members:
                log.warning("压缩包为空,跳过。")
                return False
            log.info(f"压缩包验证通过,共 {len(members)} 个条目。")
            return True
    except tarfile.TarError as e:
        log.error(f"压缩包损坏: {e}")
        return False


def _extraction_filter(member, dest):
    """Apply tarfile's ``data`` filter, skipping (not aborting on) rejected entries.

    Returning None tells ``extractall`` to skip the member, so one unsafe
    entry does not prevent the rest of the backup from being restored.
    """
    try:
        return tarfile.data_filter(member, dest)
    except tarfile.FilterError as e:
        log.warning(f"跳过不安全的条目 {member.name}: {e}")
        return None


def _is_within(root: str, candidate: str) -> bool:
    """Return True if *candidate* resolves to *root* or a path beneath it.

    *root* must already be an absolute, resolved path.
    """
    return os.path.commonpath([root, os.path.realpath(candidate)]) == root


def _safe_members(tar: tarfile.TarFile, dest: str) -> list:
    """Return the members of *tar* that stay inside *dest* when extracted.

    Best-effort fallback for interpreters without tarfile extraction filters:
    entries whose path or link target escapes *dest*, and device nodes, are
    dropped with a warning.
    """
    root = os.path.realpath(dest)
    safe = []
    for member in tar.getmembers():
        target = os.path.join(root, member.name)
        escapes = not _is_within(root, target)
        if not escapes and member.issym():
            # Symlink targets are relative to the directory holding the link.
            link_target = os.path.join(os.path.dirname(target), member.linkname)
            escapes = not _is_within(root, link_target)
        elif not escapes and member.islnk():
            # Hard-link targets are archive member names, relative to the root.
            escapes = not _is_within(root, os.path.join(root, member.linkname))
        if escapes or member.isdev():
            log.warning(f"跳过不安全的条目 {member.name}")
            continue
        safe.append(member)
    return safe


def _safe_extract(tar: tarfile.TarFile, dest: str) -> None:
    """Extract *tar* into *dest* without letting entries escape *dest*.

    The archive is downloaded from a remote repository, so its member names
    and link targets are not trusted.
    """
    if hasattr(tarfile, "data_filter"):
        tar.extractall(path=dest, filter=_extraction_filter)
    else:
        tar.extractall(path=dest, members=_safe_members(tar, dest))


# ── restore
def restore() -> bool:
    """Download the latest backup from the dataset repo and unpack it into BASE_DIR.

    Returns:
        True if the archive was downloaded, verified and extracted; False if
        the environment is not configured, no backup exists yet, or the
        download, verification or extraction failed.
    """
    if not _check_env():
        return False

    log.info(f"开始恢复:从 {repo_id} 下载 {FILENAME} ...")
    try:
        path = hf_hub_download(
            repo_id=repo_id,
            filename=FILENAME,
            repo_type="dataset",
            token=token,
        )
    except (EntryNotFoundError, RepositoryNotFoundError):
        # A missing file or repository is treated as a first run, not an error.
        log.info("仓库中尚无备份文件,首次运行,跳过恢复。")
        return False
    except Exception as e:
        log.error(f"下载失败: {e}")
        return False

    if not _verify_tar(path):
        log.error("备份文件验证失败,放弃解压。")
        return False

    log.info(f"文件 SHA-256: {_sha256(path)}")

    try:
        os.makedirs(BASE_DIR, exist_ok=True)
        with tarfile.open(path, "r:gz") as tar:
            # NOTE(review): the original comment says older backups may carry
            # a /root path; every entry is extracted relative to BASE_DIR.
            _safe_extract(tar, BASE_DIR)
        log.info(f"恢复成功 → {BASE_DIR}")
        return True
    except Exception as e:
        log.error(f"解压失败: {e}")
        return False


# ── backup
def backup() -> bool:
    """Pack the existing PATHS_TO_BACKUP entries and upload them to the dataset repo.

    The archive is written to BACKUP_PATH and removed again on every exit
    path, whether packing, verification or upload succeeded or failed.

    Returns:
        True if the archive was created, verified and uploaded; False otherwise.
    """
    if not _check_env():
        return False

    existing = [p for p in PATHS_TO_BACKUP if os.path.exists(p)]
    if not existing:
        log.warning("所有备份路径均不存在,跳过备份。")
        return False

    log.info(f"开始备份,共 {len(existing)} 个路径...")
    try:
        try:
            with tarfile.open(BACKUP_PATH, "w:gz") as tar:
                for p in existing:
                    # Store paths relative to BASE_DIR so extraction never
                    # depends on the absolute location.
                    arcname = os.path.relpath(p, BASE_DIR)
                    tar.add(p, arcname=arcname, recursive=True)
                    log.info(f"  已打包: {p} → {arcname}")
        except Exception as e:
            log.error(f"打包失败: {e}")
            return False

        if not _verify_tar(BACKUP_PATH):
            log.error("生成的压缩包验证失败,取消上传。")
            return False

        log.info(
            f"压缩包大小: {os.path.getsize(BACKUP_PATH)/1024:.1f} KB,"
            f"SHA-256: {_sha256(BACKUP_PATH)}"
        )

        try:
            api.upload_file(
                path_or_fileobj=BACKUP_PATH,
                path_in_repo=FILENAME,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
                commit_message=(
                    f"backup {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC"
                ),
            )
            log.info(f"备份上传成功 → {repo_id}/{FILENAME}")
            return True
        except Exception as e:
            log.error(f"上传失败: {e}")
            return False
    finally:
        # The archive includes the credentials directory; never leave it in /tmp.
        if os.path.exists(BACKUP_PATH):
            os.remove(BACKUP_PATH)
            log.info("本地临时文件已清理。")


# ── Entry point
def main() -> int:
    """Run the action named by the first CLI argument and return the exit code.

    With no argument the action defaults to ``restore``. Returns 0 on success
    and 1 on failure or an unknown action.
    """
    action = sys.argv[1] if len(sys.argv) > 1 else "restore"
    if action == "backup":
        success = backup()
    elif action == "restore":
        success = restore()
    else:
        log.error(f"未知命令: {action},用法: python sync.py [backup|restore]")
        return 1
    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())