#!/usr/bin/env python3
"""
Hermes Agent Data Sync Service
Handles data persistence to/from Hugging Face Dataset
"""
import argparse
import json
import os
import shutil
import stat
import sys
import tarfile
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List

from huggingface_hub import HfApi, hf_hub_download, upload_folder
from loguru import logger
# File watching is optional: without watchdog the daemon still performs
# scheduled syncs, it just cannot react to individual file changes.
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    WATCHDOG_AVAILABLE = True
except ImportError:
    WATCHDOG_AVAILABLE = False
    # Bind placeholders so the module still imports: ConfigFileHandler names
    # FileSystemEventHandler as its base class at definition time, which would
    # otherwise raise NameError and defeat the scheduled-sync fallback.
    Observer = None
    FileSystemEventHandler = object
    logger.warning("watchdog not installed, file change detection disabled")
class DatasetManager:
    """Back up and restore Hermes Agent state through a Hugging Face dataset repo.

    A backup stages copies of the entries in ``path_mapping`` under
    ``temp_dir`` and uploads that folder as one commit. A restore downloads a
    snapshot of the dataset and copies its entries back to the locations in
    ``path_mapping``.
    """

    # Serializes every use of the staging directory. The daemon triggers
    # backups from both its scheduler loop and the watchdog thread, and each
    # backup deletes and rebuilds the same directory; without the lock one
    # thread can remove files another is uploading. Re-entrant because
    # upload_to_dataset() holds it while calling prepare_backup_data().
    _sync_lock = threading.RLock()

    # Sub-directories created inside the staging directory for every backup.
    _STAGING_DIRS = (
        'config',
        'personality',
        'memories',
        'skills',
        'sessions',
        'state',
        'logs',
        'cron',
        'webui',
        'image_cache',
        'baoyu_skills',
    )

    # Single files: (path_mapping key, destination relative to staging dir).
    _STAGED_FILES = (
        ('config', 'config/config.yaml'),
        ('env', 'config/.env'),            # environment variables (sensitive)
        ('auth', 'config/auth.json'),      # OAuth credentials
        ('soul', 'personality/SOUL.md'),   # personality definition
        ('state_db', 'state/state.db'),
        ('webui_token', 'webui/.token'),   # WebUI auth token
    )

    # Directory trees: the path_mapping key doubles as the staging sub-directory.
    _STAGED_DIRS = (
        'memories',
        'skills',
        'sessions',
        'logs',
        'cron',
        'image_cache',
        'baoyu_skills',  # baoyu-skills user configuration (EXTEND.md etc.)
    )

    def __init__(self, dataset_repo: Optional[str] = None, token: Optional[str] = None):
        """Resolve repo, token and data locations.

        Args:
            dataset_repo: Dataset repo id; falls back to ``HF_DATASET_REPO``.
            token: Access token; falls back to ``HF_TOKEN`` and then
                ``HUGGING_FACE_HUB_TOKEN``.
        """
        self.dataset_repo = dataset_repo or os.environ.get('HF_DATASET_REPO')
        self.token = token or os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN')
        self.api = HfApi(token=self.token)
        self.hermes_home = Path(os.environ.get('HERMES_HOME', '/data/.hermes'))
        self.temp_dir = Path('/tmp/hermes_sync')
        # Logical name -> live location of each backed-up file or directory.
        self.path_mapping = {
            'config': self.hermes_home / 'config.yaml',
            'env': self.hermes_home / '.env',
            'auth': self.hermes_home / 'auth.json',
            'soul': self.hermes_home / 'SOUL.md',
            'memories': self.hermes_home / 'memories',
            'skills': self.hermes_home / 'skills',
            'sessions': self.hermes_home / 'sessions',
            'state_db': self.hermes_home / 'state.db',
            'logs': self.hermes_home / 'logs',
            'cron': self.hermes_home / 'cron',
            'webui_token': Path('/data/.hermes-web-ui') / '.token',
            'image_cache': self.hermes_home / 'image_cache',
            'baoyu_skills': Path('/home/appuser/.baoyu-skills'),
        }

    def validate(self) -> bool:
        """Check that the configuration is usable.

        Returns:
            False when no dataset repo is configured, True otherwise. A
            missing token only produces a warning.
        """
        if not self.dataset_repo:
            logger.error("HF_DATASET_REPO not set")
            return False
        if not self.token:
            logger.warning("HF_TOKEN not set, will try public dataset")
        return True

    def _normalize_staging_permissions(self) -> None:
        """Make every staged entry readable (dirs 0o755, files 0o644).

        Copied files keep the mode of their source, which may be read-only
        (e.g. 555 on the baoyu-imagine scripts); upload_folder needs to be
        able to read all of them. Best effort: a failed chmod is logged at
        debug level and otherwise ignored.
        """
        dir_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH
        file_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
        for root, dirs, files in os.walk(self.temp_dir):
            targets = [(name, dir_mode) for name in dirs]
            targets += [(name, file_mode) for name in files]
            for name, mode in targets:
                entry = os.path.join(root, name)
                try:
                    os.chmod(entry, mode)
                except OSError as e:
                    logger.debug(f"Could not chmod {entry}: {e}")

    def prepare_backup_data(self) -> Path:
        """Stage a fresh copy of all backed-up data in ``temp_dir``.

        Any previous staging directory is removed first. Entries missing from
        the live system are skipped. A ``metadata.json`` with timestamp,
        version and the Hermes home path is written next to the data.

        Returns:
            The staging directory.

        Raises:
            Exception: Whatever the copy or metadata step raised, after
                logging it.
        """
        logger.info("Preparing backup data...")
        with self._sync_lock:
            # Start from an empty staging tree.
            if self.temp_dir.exists():
                shutil.rmtree(self.temp_dir)
            self.temp_dir.mkdir(parents=True)
            for subdir in self._STAGING_DIRS:
                (self.temp_dir / subdir).mkdir()

            try:
                for key, staged_rel in self._STAGED_FILES:
                    source = self.path_mapping[key]
                    if source.exists():
                        shutil.copy2(source, self.temp_dir / staged_rel)

                for key in self._STAGED_DIRS:
                    source = self.path_mapping[key]
                    if source.exists():
                        shutil.copytree(source, self.temp_dir / key, dirs_exist_ok=True)

                logger.info("Fixing permissions in temp backup dir...")
                self._normalize_staging_permissions()

                metadata = {
                    'timestamp': datetime.now().isoformat(),
                    'version': '0.10.0',
                    'hermes_home': str(self.hermes_home)
                }
                with open(self.temp_dir / 'metadata.json', 'w', encoding='utf-8') as f:
                    json.dump(metadata, f, indent=2)

                logger.success(f"Backup prepared at {self.temp_dir}")
                return self.temp_dir
            except Exception as e:
                logger.error(f"Failed to prepare backup: {e}")
                raise

    def upload_to_dataset(self, force: bool = False) -> bool:
        """Stage the current data and upload it to the dataset repo.

        Args:
            force: Accepted for CLI compatibility; every call currently
                uploads a full snapshot regardless of this flag.

        Returns:
            True on success, False if staging or uploading failed (the error
            is logged, not raised).
        """
        # Hold the lock for staging *and* upload so a concurrent backup cannot
        # wipe the staging directory while it is being read.
        with self._sync_lock:
            try:
                backup_dir = self.prepare_backup_data()
                logger.info(f"Uploading to dataset: {self.dataset_repo}")
                self.api.upload_folder(
                    folder_path=str(backup_dir),
                    repo_id=self.dataset_repo,
                    repo_type="dataset",
                    commit_message=f"Hermes Agent backup - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                )
                logger.success("Backup uploaded successfully")
                return True
            except Exception as e:
                logger.error(f"Failed to upload to dataset: {e}")
                return False

    def download_from_dataset(self) -> bool:
        """Download the dataset snapshot and restore it into place.

        Returns:
            True when the download and the restore pass finished, False if an
            exception interrupted either (the error is logged, not raised).
            Per-entry restore failures are logged and do not change the result.
        """
        try:
            logger.info(f"Downloading from dataset: {self.dataset_repo}")
            # Always download into an empty scratch directory.
            download_dir = Path('/tmp/hermes_download')
            if download_dir.exists():
                shutil.rmtree(download_dir)
            download_dir.mkdir(parents=True)

            self.api.snapshot_download(
                repo_id=self.dataset_repo,
                repo_type="dataset",
                local_dir=str(download_dir)
            )
            logger.success("Download completed")

            self.restore_from_download(download_dir)
            return True
        except Exception as e:
            logger.error(f"Failed to download from dataset: {e}")
            return False

    def restore_from_download(self, download_dir: Path):
        """Copy a downloaded snapshot back to the live locations.

        Controlled by the ``SKIP_CONFIG_RESTORE`` environment variable:

        * true (default): skip what entrypoint.sh regenerates, so an old
          backup cannot overwrite fresh configuration (for example replacing
          the model minimaxai/minimax-m2.7 with an older one). ``config.yaml``
          is saved as ``config.yaml.restored`` instead of being put in place,
          and ``baoyu_skills`` is not restored.
        * false: restore everything, including ``config.yaml``.

        All other entries (memories, sessions, state.db, .env, auth.json,
        logs, cron, webui/.token, SOUL.md, skills, image_cache) are restored
        in both modes. Restored directories replace the existing directory.

        Args:
            download_dir: Root of the downloaded snapshot.
        """
        logger.info("Restoring data to Hermes home...")
        self.hermes_home.mkdir(parents=True, exist_ok=True)

        skip_restore = os.environ.get('SKIP_CONFIG_RESTORE', 'true').lower() in ('true', '1', 'yes')

        # NOTE: the skills directory is deliberately NOT skipped. It holds
        # skills installed from the Skills Hub (cover-image,
        # article-illustrator, ...); entrypoint.sh only reinstalls
        # baoyu-imagine, overwriting it after the restore to keep it current.
        if skip_restore:
            skip_paths = {
                'config/config.yaml',  # model/provider config is generated by entrypoint.sh from env vars
                'baoyu_skills',        # baoyu-skills EXTEND.md is regenerated by entrypoint.sh
            }
            # Sorted so the log line is stable between runs.
            logger.info(f"SKIP_CONFIG_RESTORE=true, skipping: {', '.join(sorted(skip_paths))}")
        else:
            skip_paths = set()
            logger.info("SKIP_CONFIG_RESTORE=false, restoring all backed-up configurations")

        # Snapshot-relative path -> live destination.
        restore_mapping = {
            'config/.env': self.path_mapping['env'],
            'config/auth.json': self.path_mapping['auth'],
            'personality/SOUL.md': self.path_mapping['soul'],
            'memories': self.path_mapping['memories'],
            'skills': self.path_mapping['skills'],
            'sessions': self.path_mapping['sessions'],
            'state/state.db': self.path_mapping['state_db'],
            'logs': self.path_mapping['logs'],
            'cron': self.path_mapping['cron'],
            'webui/.token': self.path_mapping['webui_token'],
            'image_cache': self.path_mapping['image_cache'],
            'baoyu_skills': self.path_mapping['baoyu_skills'],
        }

        if not skip_restore:
            restore_mapping['config/config.yaml'] = self.path_mapping['config']
        else:
            # Keep the backed-up config next to the live one so entrypoint.sh
            # can merge user-edited sections (channels, display, ...).
            saved_config = download_dir / 'config' / 'config.yaml'
            if saved_config.exists():
                shutil.copy2(saved_config, self.hermes_home / 'config.yaml.restored')
                logger.info("Restored config.yaml to config.yaml.restored for merge")

        failures = 0
        for src_rel, dst in restore_mapping.items():
            if src_rel in skip_paths:
                logger.info(f"Skipping restore of {src_rel} (will be regenerated by entrypoint.sh)")
                continue

            src = download_dir / src_rel
            if not src.exists():
                logger.warning(f"Not found in backup: {src_rel}")
                continue

            # One failing entry must not stop the remaining ones.
            try:
                if src.is_file():
                    dst.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(src, dst)
                    logger.info(f"Restored: {src_rel}")
                elif src.is_dir():
                    if dst.exists():
                        shutil.rmtree(dst)
                    shutil.copytree(src, dst)
                    logger.info(f"Restored directory: {src_rel}")
            except Exception as e:
                failures += 1
                logger.error(f"Failed to restore {src_rel}: {e}")

        # Do not claim unqualified success when entries failed.
        if failures:
            logger.warning(f"Data restoration completed with {failures} error(s)")
        else:
            logger.success("Data restoration completed")
class ConfigFileHandler(FileSystemEventHandler):
    """Watchdog handler that backs up immediately when a key config file changes.

    Only modifications of files named exactly ``config.yaml``, ``.env`` or
    ``auth.json`` trigger a backup. Events during the startup grace period
    are ignored, and backups are rate-limited by ``backup_cooldown``.
    """

    # Startup grace period in seconds: changes made while the service is
    # still starting are not backed up, avoiding redundant uploads.
    STARTUP_GRACE_PERIOD = 30

    # Exact file names that trigger a backup.
    WATCHED_FILES = frozenset({'config.yaml', '.env', 'auth.json'})

    def __init__(self, manager: DatasetManager):
        """Remember the manager used for uploads and start the grace period.

        Args:
            manager: Object whose ``upload_to_dataset()`` performs the backup.
        """
        super().__init__()
        self.manager = manager
        self.last_backup_time = 0
        self.backup_cooldown = 5  # seconds between two triggered backups
        self.start_time = time.time()  # reference point for the grace period
        self._startup_logged = False

    def on_modified(self, event):
        """Handle a file-modified event by backing up if it is relevant.

        Args:
            event: Watchdog event; ``is_directory`` and ``src_path`` are read.
        """
        if event.is_directory:
            return

        # Startup grace period: skip, logging only the first skipped event.
        elapsed = time.time() - self.start_time
        if elapsed < self.STARTUP_GRACE_PERIOD:
            if not self._startup_logged:
                logger.info(f"In startup grace period ({int(self.STARTUP_GRACE_PERIOD - elapsed)}s remaining), skipping backup for: {event.src_path}")
                self._startup_logged = True
            return

        # Compare the base name, not a suffix, so "prod.env" or
        # "old_config.yaml" are not mistaken for watched files.
        if os.path.basename(os.fsdecode(event.src_path)) not in self.WATCHED_FILES:
            return

        if time.time() - self.last_backup_time <= self.backup_cooldown:
            return

        logger.info(f"Config file changed: {event.src_path}")
        logger.info("Triggering immediate backup...")
        try:
            succeeded = self.manager.upload_to_dataset()
        except Exception as e:
            logger.error(f"Immediate backup failed: {e}")
            return
        finally:
            # Start the cooldown when the attempt ends, so events that queued
            # up during a slow upload do not each trigger another one.
            self.last_backup_time = time.time()

        # upload_to_dataset() reports failure through its return value.
        if not succeeded:
            logger.warning("Immediate backup did not complete, see previous error")
            return

        logger.success("Immediate backup completed")
        self._trigger_reload()

    def _trigger_reload(self):
        """Tell the operator how to apply the saved configuration.

        Hermes currently has no config-reload command, so this only logs that
        a Space restart is needed.
        """
        logger.info("Configuration saved. Please restart Space to apply changes immediately.")
def _read_sync_interval(default: int = 60) -> int:
    """Return SYNC_INTERVAL from the environment as a positive number of seconds.

    Falls back to ``default`` with a warning when the variable is not an
    integer or is below 1.
    """
    raw = os.environ.get('SYNC_INTERVAL', str(default))
    try:
        interval = int(raw)
    except ValueError:
        logger.warning(f"Invalid SYNC_INTERVAL={raw!r}, falling back to {default} seconds")
        return default
    if interval < 1:
        # time.sleep() rejects negative values and 0 would upload non-stop.
        logger.warning(f"SYNC_INTERVAL must be at least 1, got {interval}; falling back to {default} seconds")
        return default
    return interval


def run_daemon():
    """Run the background sync daemon: scheduled backups plus file watching.

    Backs up every ``SYNC_INTERVAL`` seconds (default 60) until interrupted
    with Ctrl-C. When watchdog is available, the Hermes home directory is also
    watched (non-recursively) so config changes trigger an immediate backup.
    Exits with status 1 if the configuration is invalid.
    """
    logger.info("Starting data sync daemon...")
    sync_interval = _read_sync_interval()

    manager = DatasetManager()
    if not manager.validate():
        logger.error("Configuration invalid, exiting")
        sys.exit(1)
    logger.info(f"Sync interval: {sync_interval} seconds")

    # Start the file watcher when possible; any failure degrades to
    # scheduled sync only.
    observer = None
    if WATCHDOG_AVAILABLE:
        try:
            logger.info("Starting file watcher for real-time sync...")
            event_handler = ConfigFileHandler(manager)
            observer = Observer()
            observer.schedule(event_handler, str(manager.hermes_home), recursive=False)
            observer.start()
            logger.success("File watcher started - config changes will trigger immediate backup")
        except Exception as e:
            logger.error(f"Failed to start file watcher: {e}")
            logger.warning("Falling back to scheduled sync only")
            observer = None
    else:
        logger.warning("Watchdog not available, using scheduled sync only")

    try:
        while True:
            try:
                time.sleep(sync_interval)
                logger.info("Performing scheduled backup...")
                manager.upload_to_dataset()
            except KeyboardInterrupt:
                logger.info("Daemon stopped")
                break
            except Exception as e:
                # Keep the daemon alive; the next cycle retries.
                logger.error(f"Sync error: {e}")
    finally:
        # Shut the watcher thread down cleanly.
        if observer:
            logger.info("Stopping file watcher...")
            observer.stop()
            observer.join()
            logger.info("File watcher stopped")
def main():
    """Command-line entry point: ``backup``, ``restore`` or ``daemon``.

    ``backup`` and ``restore`` exit with status 0 on success and 1 on
    failure. An invalid configuration exits with status 1 before any action
    runs.
    """
    cli = argparse.ArgumentParser(description='Hermes Agent Data Sync')
    cli.add_argument(
        'action',
        choices=['backup', 'restore', 'daemon'],
        help='Action to perform',
    )
    cli.add_argument(
        '--force', '-f',
        action='store_true',
        help='Force backup even if no changes',
    )
    options = cli.parse_args()

    manager = DatasetManager()
    if not manager.validate():
        logger.error("Configuration invalid")
        sys.exit(1)

    action = options.action
    if action == 'daemon':
        # The daemon builds and validates its own manager.
        run_daemon()
    elif action == 'backup':
        ok = manager.upload_to_dataset(force=options.force)
        sys.exit(0 if ok else 1)
    elif action == 'restore':
        ok = manager.download_from_dataset()
        sys.exit(0 if ok else 1)


# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|