openclaw / sync.py
aip9105's picture
Create sync.py
b7226d4 verified
import os
import sys
import tarfile
from huggingface_hub import HfApi, hf_hub_download
api = HfApi()
repo_id = os.getenv("HF_DATASET")
token = os.getenv("HF_TOKEN")
FILENAME = "latest_backup.tar.gz"
def restore():
try:
if not repo_id or not token:
print("Skip Restore: HF_DATASET or HF_TOKEN not set")
return
# ็›ดๆŽฅไธ‹่ฝฝๆœ€ๆ–ฐๆ–‡ไปถ
print(f"Downloading {FILENAME} from {repo_id}...")
path = hf_hub_download(repo_id=repo_id, filename=FILENAME, repo_type="dataset", token=token)
with tarfile.open(path, "r:gz") as tar:
tar.extractall(path="/root/.openclaw/")
print(f"Success: Restored from {FILENAME}")
return True
except Exception as e:
# ๅฆ‚ๆžœๆ˜ฏ็ฌฌไธ€ๆฌก่ฟ่กŒ๏ผŒไป“ๅบ“้‡Œๆฒกๆ–‡ไปถ๏ผŒๆŠฅ้”™ๆ˜ฏๆญฃๅธธ็š„
print(f"Restore Note: No existing backup found or error: {e}")
def backup():
try:
if not repo_id or not token:
print("Skip Backup: HF_DATASET or HF_TOKEN not set")
return
with tarfile.open(FILENAME, "w:gz") as tar:
# ๅค‡ไปฝๅ…ณ้”ฎๆ•ฐๆฎ
paths_to_backup = [
"/root/.openclaw/sessions",
"/root/.openclaw/agents/main/sessions",
"/root/.openclaw/openclaw.json"
]
for p in paths_to_backup:
if os.path.exists(p):
arcname = p.replace("/root/.openclaw/", "")
tar.add(p, arcname=arcname)
# ไธŠไผ ๅนถ่ฆ†็›–
api.upload_file(
path_or_fileobj=FILENAME,
path_in_repo=FILENAME,
repo_id=repo_id,
repo_type="dataset",
token=token
)
print(f"Backup {FILENAME} Success (Overwritten).")
except Exception as e:
print(f"Backup Error: {e}")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "backup":
backup()
else:
restore()