# openclaw03 / sync.py
# wenyin's picture
# Upload 3 files
# eaa272b verified
import hashlib
import logging
import os
import sys
import tarfile
import time
import zlib
from datetime import datetime, timezone

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError
# ── Logging configuration
# The date format below ends in a literal "Z" (the UTC designator), so the
# timestamps must really be rendered in UTC. `logging` formats `asctime` with
# local time unless the formatter's converter is switched to `time.gmtime`.
logging.Formatter.converter = time.gmtime
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)
# Module-wide logger used by every function in this script.
log = logging.getLogger("sync")
# ── Configuration
# Name of the archive inside the dataset repo, and where it is staged locally.
FILENAME = "latest_backup.tar.gz"
BACKUP_PATH = "/tmp/" + FILENAME

# Root of the application state; every backed-up path lives underneath it.
BASE_DIR = "/home/node/.openclaw"
PATHS_TO_BACKUP = [
    f"{BASE_DIR}/{relative}"
    for relative in (
        "sessions",
        "agents/main/sessions",
        "credentials",
        "workspace",
        "extensions",
        "openclaw.json",
    )
]

# Target dataset repository and access token, both read from the environment.
repo_id = os.getenv("HF_DATASET")
token = os.getenv("HF_TOKEN")

# Shared Hugging Face Hub client.
api = HfApi()
# ── 工具函数
def _check_env() -> bool:
    """Report whether both the dataset repo id and the token are configured.

    Returns:
        True when the module-level ``repo_id`` and ``token`` are both truthy.
        Otherwise a warning is logged and False is returned.
    """
    if repo_id and token:
        return True
    log.warning("HF_DATASET 或 HF_TOKEN 未设置,跳过同步。")
    return False
def _sha256(path: str) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 64 KiB pieces so large archives are never loaded
    into memory in one go.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while piece := stream.read(65536):
            digest.update(piece)
    return digest.hexdigest()
def _verify_tar(path: str) -> bool:
    """Check that *path* is a readable, non-empty gzip-compressed tar archive.

    The archive is opened and its member list is read in full.

    Args:
        path: Filesystem path of the ``.tar.gz`` file to check.

    Returns:
        True if the archive opens and lists at least one member. False if it
        is empty, unreadable, missing, or corrupt; the reason is logged.
    """
    try:
        with tarfile.open(path, "r:gz") as tar:
            members = tar.getmembers()
    except (tarfile.TarError, OSError, EOFError, zlib.error) as e:
        # A truncated or corrupt gzip stream surfaces as EOFError, OSError
        # (BadGzipFile) or zlib.error rather than tarfile.TarError, and a
        # missing file as OSError. All of them mean "not a usable archive"
        # and must not escape to callers that invoke this outside a try.
        log.error(f"压缩包损坏: {e}")
        return False

    if not members:
        log.warning("压缩包为空,跳过。")
        return False

    log.info(f"压缩包验证通过,共 {len(members)} 个条目。")
    return True
# ── restore
def restore() -> bool:
    """Download the latest backup archive and unpack it into BASE_DIR.

    The archive named FILENAME is fetched from the dataset repository
    ``repo_id``, checked with ``_verify_tar``, and then extracted.

    Returns:
        True if the archive was extracted. False if the environment is not
        configured, no backup exists yet, the download fails, the archive
        fails verification, or extraction raises; each case is logged.
    """
    if not _check_env():
        return False

    log.info(f"开始恢复:从 {repo_id} 下载 {FILENAME} ...")
    try:
        path = hf_hub_download(
            repo_id=repo_id,
            filename=FILENAME,
            repo_type="dataset",
            token=token,
        )
    except (EntryNotFoundError, RepositoryNotFoundError):
        # A missing file or repo is expected on the very first run.
        log.info("仓库中尚无备份文件,首次运行,跳过恢复。")
        return False
    except Exception as e:
        log.error(f"下载失败: {e}")
        return False

    if not _verify_tar(path):
        log.error("备份文件验证失败,放弃解压。")
        return False

    log.info(f"文件 SHA-256: {_sha256(path)}")
    try:
        os.makedirs(BASE_DIR, exist_ok=True)
        with tarfile.open(path, "r:gz") as tar:
            # The archive comes from the network, so extract it through the
            # "data" filter where the interpreter provides it. That filter
            # strips leading slashes and refuses members or links that would
            # land outside BASE_DIR. Interpreters without extraction filters
            # fall back to the legacy unfiltered behaviour.
            if hasattr(tarfile, "data_filter"):
                tar.extractall(path=BASE_DIR, filter="data")
            else:
                tar.extractall(path=BASE_DIR)
        log.info(f"恢复成功 → {BASE_DIR}")
        return True
    except Exception as e:
        log.error(f"解压失败: {e}")
        return False
# ── backup
def backup() -> bool:
    """Pack the configured paths into a tar.gz and upload it to the dataset repo.

    The entries of PATHS_TO_BACKUP that exist are archived under names
    relative to BASE_DIR. The archive is checked with ``_verify_tar`` and
    uploaded as FILENAME. Once packing has been attempted, the local archive
    at BACKUP_PATH is removed on every exit path, so a failed run never
    leaves a copy of the backed-up data behind.

    Returns:
        True if the upload succeeded. False if the environment is not
        configured, nothing exists to back up, or packing, verification or
        upload fails; each case is logged.
    """
    if not _check_env():
        return False

    existing = [p for p in PATHS_TO_BACKUP if os.path.exists(p)]
    if not existing:
        log.warning("所有备份路径均不存在,跳过备份。")
        return False

    log.info(f"开始备份,共 {len(existing)} 个路径...")
    prefix = f"{BASE_DIR}/"
    try:
        try:
            with tarfile.open(BACKUP_PATH, "w:gz") as tar:
                for p in existing:
                    # Strip only the leading BASE_DIR so members are stored
                    # with relative names; later occurrences of the same
                    # text inside the path are left alone.
                    arcname = p[len(prefix):] if p.startswith(prefix) else p
                    tar.add(p, arcname=arcname, recursive=True)
                    log.info(f" 已打包: {p} → {arcname}")
        except Exception as e:
            log.error(f"打包失败: {e}")
            return False

        if not _verify_tar(BACKUP_PATH):
            log.error("生成的压缩包验证失败,取消上传。")
            return False

        log.info(f"压缩包大小: {os.path.getsize(BACKUP_PATH)/1024:.1f} KB,SHA-256: {_sha256(BACKUP_PATH)}")

        # Timezone-aware replacement for the deprecated datetime.utcnow().
        stamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
        try:
            api.upload_file(
                path_or_fileobj=BACKUP_PATH,
                path_in_repo=FILENAME,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
                commit_message=f"backup {stamp} UTC",
            )
        except Exception as e:
            log.error(f"上传失败: {e}")
            return False
        else:
            log.info(f"备份上传成功 → {repo_id}/{FILENAME}")
            return True
    finally:
        # Covers packing and verification failures as well as the upload.
        if os.path.exists(BACKUP_PATH):
            os.remove(BACKUP_PATH)
            log.info("本地临时文件已清理。")
# ── 入口
if __name__ == "__main__":
    # Usage: python sync.py [backup|restore]; defaults to "restore".
    commands = {"backup": backup, "restore": restore}
    action = sys.argv[1] if len(sys.argv) > 1 else "restore"
    handler = commands.get(action)
    if handler is None:
        log.error(f"未知命令: {action},用法: python sync.py [backup|restore]")
        sys.exit(1)
    # Exit status mirrors the boolean result of the chosen command.
    exit_code = 0 if handler() else 1
    sys.exit(exit_code)