| |
| """ |
| HuggingClaw Workspace Sync β HuggingFace Hub based backup |
| Uses huggingface_hub Python library instead of git for more reliable |
| HF Dataset operations (handles auth, LFS, retries automatically). |
| |
| Falls back to git-based sync if HF_USERNAME or HF_TOKEN are not set. |
| """ |
|
|
| import os |
| import sys |
| import time |
| import signal |
| import shutil |
| import subprocess |
| from pathlib import Path |
|
|
| os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1") |
|
|
| OPENCLAW_HOME = Path("/home/node/.openclaw") |
| WORKSPACE = OPENCLAW_HOME / "workspace" |
| STATE_DIR = WORKSPACE / ".huggingclaw-state" |
| OPENCLAW_STATE_BACKUP_DIR = STATE_DIR / "openclaw" |
| EXCLUDED_STATE_NAMES = { |
| "workspace", |
| "openclaw-app", |
| "gateway.log", |
| "browser", |
| } |
| WHATSAPP_CREDS_DIR = Path("/home/node/.openclaw/credentials/whatsapp/default") |
| WHATSAPP_BACKUP_DIR = STATE_DIR / "credentials" / "whatsapp" / "default" |
| RESET_MARKER = WORKSPACE / ".reset_credentials" |
| INTERVAL = int(os.environ.get("SYNC_INTERVAL", "180")) |
| INITIAL_DELAY = int(os.environ.get("SYNC_START_DELAY", "10")) |
| HF_TOKEN = os.environ.get("HF_TOKEN", "") |
| HF_USERNAME = os.environ.get("HF_USERNAME", "") |
| BACKUP_DATASET = os.environ.get("BACKUP_DATASET_NAME", "huggingclaw-backup") |
| WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "") |
| WHATSAPP_ENABLED = os.environ.get("WHATSAPP_ENABLED", "").strip().lower() == "true" |
|
|
| running = True |
|
|
| def signal_handler(sig, frame): |
| global running |
| running = False |
|
|
| signal.signal(signal.SIGTERM, signal_handler) |
| signal.signal(signal.SIGINT, signal_handler) |
|
|
|
|
| def count_files(path: Path) -> int: |
| """Count regular files recursively under a path.""" |
| if not path.exists(): |
| return 0 |
| return sum(1 for child in path.rglob("*") if child.is_file()) |
|
|
|
|
| def snapshot_state_into_workspace() -> None: |
| """ |
| Mirror persistent state into the workspace-backed dataset repo. |
| |
| This keeps WhatsApp credentials in a hidden folder that is synced together |
| with the workspace, without changing the live credentials location. |
| """ |
| try: |
| STATE_DIR.mkdir(parents=True, exist_ok=True) |
| if OPENCLAW_STATE_BACKUP_DIR.exists(): |
| shutil.rmtree(OPENCLAW_STATE_BACKUP_DIR, ignore_errors=True) |
| OPENCLAW_STATE_BACKUP_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| for source_path in OPENCLAW_HOME.iterdir(): |
| if source_path.name in EXCLUDED_STATE_NAMES: |
| continue |
|
|
| backup_path = OPENCLAW_STATE_BACKUP_DIR / source_path.name |
| if source_path.is_dir(): |
| shutil.copytree(source_path, backup_path) |
| elif source_path.is_file(): |
| shutil.copy2(source_path, backup_path) |
| except Exception as e: |
| print(f" β οΈ Could not snapshot OpenClaw state: {e}") |
|
|
| try: |
| if not WHATSAPP_ENABLED: |
| return |
|
|
| STATE_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| if RESET_MARKER.exists(): |
| if WHATSAPP_BACKUP_DIR.exists(): |
| shutil.rmtree(WHATSAPP_BACKUP_DIR, ignore_errors=True) |
| print("π§Ή Removed backed-up WhatsApp credentials after reset request.") |
| RESET_MARKER.unlink(missing_ok=True) |
| return |
|
|
| if not WHATSAPP_CREDS_DIR.exists(): |
| return |
|
|
| file_count = count_files(WHATSAPP_CREDS_DIR) |
| if file_count < 2: |
| if file_count > 0: |
| print(f"π¦ WhatsApp backup skipped: credentials incomplete ({file_count} files).") |
| return |
|
|
| WHATSAPP_BACKUP_DIR.parent.mkdir(parents=True, exist_ok=True) |
| if WHATSAPP_BACKUP_DIR.exists(): |
| shutil.rmtree(WHATSAPP_BACKUP_DIR, ignore_errors=True) |
| shutil.copytree(WHATSAPP_CREDS_DIR, WHATSAPP_BACKUP_DIR) |
| except Exception as e: |
| print(f" β οΈ Could not snapshot WhatsApp state: {e}") |
|
|
|
|
| def has_changes(): |
| """Check if workspace has uncommitted changes (git-based check).""" |
| try: |
| subprocess.run(["git", "add", "-A"], cwd=WORKSPACE, capture_output=True) |
| result = subprocess.run( |
| ["git", "diff", "--cached", "--quiet"], |
| cwd=WORKSPACE, capture_output=True |
| ) |
| return result.returncode != 0 |
| except Exception: |
| return False |
|
|
| def write_sync_status(status, message=""): |
| """Write sync status to file for the health server dashboard.""" |
| try: |
| import json |
| data = { |
| "status": status, |
| "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), |
| "message": message |
| } |
| with open("/tmp/sync-status.json", "w") as f: |
| json.dump(data, f) |
| except Exception as e: |
| print(f" β οΈ Could not write sync status: {e}") |
|
|
| def trigger_webhook(event, status, message): |
| """Trigger webhook notification.""" |
| if not WEBHOOK_URL: |
| return |
| try: |
| import urllib.request |
| import json |
| data = json.dumps({ |
| "event": event, |
| "status": status, |
| "message": message, |
| "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) |
| }).encode('utf-8') |
| req = urllib.request.Request(WEBHOOK_URL, data=data, headers={'Content-Type': 'application/json'}) |
| urllib.request.urlopen(req, timeout=10) |
| except Exception as e: |
| print(f" β οΈ Webhook delivery failed: {e}") |
|
|
| def sync_with_hf_hub(): |
| """Sync workspace using huggingface_hub library.""" |
| try: |
| from huggingface_hub import HfApi, upload_folder |
|
|
| api = HfApi(token=HF_TOKEN) |
| repo_id = f"{HF_USERNAME}/{BACKUP_DATASET}" |
|
|
| |
| try: |
| api.repo_info(repo_id=repo_id, repo_type="dataset") |
| except Exception: |
| print(f" π Creating dataset {repo_id}...") |
| try: |
| api.create_repo(repo_id=repo_id, repo_type="dataset", private=True) |
| print(f" β
Dataset created: {repo_id}") |
| except Exception as e: |
| print(f" β οΈ Could not create dataset: {e}") |
| return False |
|
|
| |
| upload_folder( |
| folder_path=str(WORKSPACE), |
| repo_id=repo_id, |
| repo_type="dataset", |
| token=HF_TOKEN, |
| commit_message=f"Auto-sync {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}", |
| ignore_patterns=[".git/*", ".git"], |
| ) |
| return True |
|
|
| except ImportError: |
| print(" β οΈ huggingface_hub not installed, falling back to git") |
| return False |
| except Exception as e: |
| print(f" β οΈ HF Hub sync failed: {e}") |
| return False |
|
|
|
|
| def sync_with_git(): |
| """Fallback: sync workspace using git.""" |
| try: |
| ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) |
| subprocess.run(["git", "add", "-A"], cwd=WORKSPACE, capture_output=True) |
| subprocess.run( |
| ["git", "commit", "-m", f"Auto-sync {ts}"], |
| cwd=WORKSPACE, capture_output=True |
| ) |
| result = subprocess.run( |
| ["git", "push", "origin", "main"], |
| cwd=WORKSPACE, capture_output=True |
| ) |
| return result.returncode == 0 |
| except Exception: |
| return False |
|
|
|
|
| def run_sync_pass(use_hf_hub: bool) -> None: |
| """Snapshot state and push it if anything changed.""" |
| snapshot_state_into_workspace() |
|
|
| if not has_changes(): |
| return |
|
|
| ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) |
| write_sync_status("syncing", f"Starting sync at {ts}") |
|
|
| if use_hf_hub: |
| if sync_with_hf_hub(): |
| print(f"π Workspace sync (hf_hub): pushed changes ({ts})") |
| write_sync_status("success", "Successfully pushed to HF Hub") |
| return |
|
|
| if sync_with_git(): |
| print(f"π Workspace sync (git fallback): pushed changes ({ts})") |
| write_sync_status("success", "Successfully pushed via git fallback") |
| return |
|
|
| msg = f"Workspace sync: failed ({ts}), will retry" |
| print(f"π {msg}") |
| write_sync_status("error", msg) |
| trigger_webhook("sync", "error", msg) |
| return |
|
|
| if sync_with_git(): |
| print(f"π Workspace sync (git): pushed changes ({ts})") |
| write_sync_status("success", "Successfully pushed via git") |
| return |
|
|
| msg = f"Workspace sync: push failed ({ts}), will retry" |
| print(f"π {msg}") |
| write_sync_status("error", msg) |
| trigger_webhook("sync", "error", msg) |
|
|
|
|
| def main(): |
| if "--snapshot-once" in sys.argv: |
| snapshot_state_into_workspace() |
| write_sync_status("configured", "State snapshot refreshed during shutdown.") |
| return |
|
|
| if "--sync-once" in sys.argv: |
| if not WORKSPACE.exists(): |
| print("π Workspace sync: workspace not found, exiting.") |
| return |
|
|
| use_hf_hub = bool(HF_TOKEN and HF_USERNAME) |
| git_dir = WORKSPACE / ".git" |
|
|
| if not use_hf_hub and not git_dir.exists(): |
| print("π Workspace sync: no git repo and no HF credentials, skipping.") |
| return |
|
|
| snapshot_state_into_workspace() |
|
|
| if not has_changes(): |
| print("π Workspace sync: no changes to persist.") |
| write_sync_status("configured", "No new state changes to sync.") |
| return |
|
|
| ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) |
| write_sync_status("syncing", f"Shutdown sync started at {ts}") |
|
|
| if use_hf_hub: |
| if sync_with_hf_hub(): |
| print(f"π Workspace sync (hf_hub): pushed changes ({ts})") |
| write_sync_status("success", "Shutdown sync pushed to HF Hub") |
| return |
| if sync_with_git(): |
| print(f"π Workspace sync (git fallback): pushed changes ({ts})") |
| write_sync_status("success", "Shutdown sync pushed via git fallback") |
| return |
| write_sync_status("error", "Shutdown sync failed") |
| print("π Workspace sync: shutdown sync failed.") |
| return |
|
|
| if sync_with_git(): |
| print(f"π Workspace sync (git): pushed changes ({ts})") |
| write_sync_status("success", "Shutdown sync pushed via git") |
| return |
|
|
| write_sync_status("error", "Shutdown sync failed") |
| print("π Workspace sync: shutdown sync failed.") |
| return |
|
|
| if not WORKSPACE.exists(): |
| print("π Workspace sync: workspace not found, exiting.") |
| return |
|
|
| use_hf_hub = bool(HF_TOKEN and HF_USERNAME) |
| git_dir = WORKSPACE / ".git" |
|
|
| if not use_hf_hub and not git_dir.exists(): |
| print("π Workspace sync: no git repo and no HF credentials, skipping.") |
| return |
|
|
| |
| if use_hf_hub: |
| write_sync_status("configured", f"Backup enabled. Waiting for next sync in {INTERVAL}s.") |
| else: |
| write_sync_status("configured", f"Git sync enabled. Waiting for next sync in {INTERVAL}s.") |
|
|
| |
| time.sleep(INITIAL_DELAY) |
|
|
| if use_hf_hub: |
| print(f"π Workspace sync started (huggingface_hub): every {INTERVAL}s β {HF_USERNAME}/{BACKUP_DATASET}") |
| else: |
| print(f"π Workspace sync started (git): every {INTERVAL}s") |
|
|
| run_sync_pass(use_hf_hub) |
|
|
| while running: |
| time.sleep(INTERVAL) |
| if not running: |
| break |
|
|
| run_sync_pass(use_hf_hub) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|