from __future__ import annotations import os import subprocess import threading import time from pathlib import Path from huggingface_hub import HfApi, snapshot_download BASE_DIR = Path(__file__).resolve().parent PROJECT_DIR = BASE_DIR / "grok2api" DATA_DIR = Path(os.getenv("DATA_DIR", str(PROJECT_DIR / "data"))).expanduser() LOG_DIR = Path(os.getenv("LOG_DIR", str(PROJECT_DIR / "logs"))).expanduser() HF_TOKEN = os.getenv("HF_TOKEN", "") DATASET_ID = os.getenv("DATASET_ID", "") SYNC_INTERVAL = max(int(os.getenv("HF_SYNC_INTERVAL", "1800")), 60) SERVER_HOST = os.getenv("SERVER_HOST", "0.0.0.0") SERVER_PORT = os.getenv("SERVER_PORT") or os.getenv("PORT") or "8000" SERVER_WORKERS = os.getenv("SERVER_WORKERS", "1") SYNC_ALLOW_PATTERNS = ["data/**"] SYNC_IGNORE_PATTERNS = [ "data/.locks/**", "data/tmp/**", "logs/**", "**/__pycache__/**", ] def log(message: str) -> None: print(f"[HF-Space] {message}", flush=True) def ensure_local_dirs() -> None: DATA_DIR.mkdir(parents=True, exist_ok=True) LOG_DIR.mkdir(parents=True, exist_ok=True) def download_data() -> None: if not DATASET_ID: log("未配置 DATASET_ID,跳过启动数据同步。") return try: log(f"开始从 Dataset 拉取数据: {DATASET_ID}") snapshot_download( repo_id=DATASET_ID, repo_type="dataset", local_dir=str(PROJECT_DIR), token=HF_TOKEN or None, allow_patterns=SYNC_ALLOW_PATTERNS, ignore_patterns=SYNC_IGNORE_PATTERNS, ) log("数据拉取完成。") except Exception as exc: log(f"数据拉取失败,继续本地启动: {exc}") def upload_data(run_as_future: bool) -> None: if not DATASET_ID: return if not HF_TOKEN: log("已配置 DATASET_ID,但未配置 HF_TOKEN,跳过数据上传。") return try: api = HfApi(token=HF_TOKEN) api.upload_folder( folder_path=str(PROJECT_DIR), repo_id=DATASET_ID, repo_type="dataset", commit_message="chore: sync Grok2API data from Space", allow_patterns=SYNC_ALLOW_PATTERNS, ignore_patterns=SYNC_IGNORE_PATTERNS, run_as_future=run_as_future, ) if run_as_future: log("已提交后台数据同步任务。") else: log("退出前数据同步完成。") except Exception as exc: if "No files have been modified" not in str(exc): log(f"数据上传失败: {exc}") def upload_loop() -> None: while True: time.sleep(SYNC_INTERVAL) upload_data(run_as_future=True) def init_storage() -> None: subprocess.run( ["sh", "scripts/init_storage.sh"], cwd=PROJECT_DIR, check=True, env=os.environ.copy(), ) def run_server() -> None: env = os.environ.copy() env.setdefault("DATA_DIR", str(DATA_DIR)) env.setdefault("LOG_DIR", str(LOG_DIR)) env.setdefault("LOG_FILE_ENABLED", "false") command = [ "granian", "--interface", "asgi", "--host", SERVER_HOST, "--port", SERVER_PORT, "--workers", SERVER_WORKERS, "main:app", ] log( f"启动 Grok2API: host={SERVER_HOST} port={SERVER_PORT} workers={SERVER_WORKERS}" ) subprocess.run(command, cwd=PROJECT_DIR, check=True, env=env) if __name__ == "__main__": ensure_local_dirs() download_data() init_storage() backup_thread = threading.Thread(target=upload_loop, daemon=True) backup_thread.start() try: run_server() finally: upload_data(run_as_future=False)