#!/usr/bin/env python3
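"""Back up the sub2api Postgres database to a Hugging Face dataset repo.

Runs pg_dump, gzips the dump, uploads it under ``postgres/`` together with a
``latest.json`` metadata pointer, then prunes backups beyond BACKUP_KEEP_LAST.
"""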
import argparse
import gzip
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

from huggingface_hub import CommitOperationDelete, HfApi, HfFileSystem


def env(name: str, default: str | None = None, required: bool = False) -> str:
    """Read an environment variable, optionally failing when it is unset."""
    value = os.getenv(name, default)
    if required and not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value or ""


def run_pg_dump(tmp_sql: Path) -> None:
    """Dump the configured Postgres database to tmp_sql via pg_dump."""
    host = env("DATABASE_HOST", "127.0.0.1")
    port = env("DATABASE_PORT", "5432")
    user = env("DATABASE_USER", env("POSTGRES_USER", "sub2api"))
    password = env("DATABASE_PASSWORD", env("POSTGRES_PASSWORD", ""))
    dbname = env("DATABASE_DBNAME", env("POSTGRES_DB", "sub2api"))
    cmd = [
        "pg_dump",
        "-h", host,
        "-p", port,
        "-U", user,
        "-d", dbname,
        "--no-owner",
        "--no-privileges",
        "--clean",
        "--if-exists",
        "-f", str(tmp_sql),
    ]
    # Pass the password via PGPASSWORD so it never appears on the command line.
    env_map = os.environ.copy()
    env_map["PGPASSWORD"] = password
    print(f"[backup] running: {' '.join(cmd[:-1])} <output>")
    subprocess.run(cmd, check=True, env=env_map)


def gzip_file(src: Path, dst: Path) -> None:
    """Compress src into dst as a gzip stream."""
    with src.open("rb") as fin, gzip.open(dst, "wb", compresslevel=6) as fout:
        shutil.copyfileobj(fin, fout)


def upload_backup(gz_path: Path, metadata: dict) -> None:
    """Upload the gzipped dump plus a latest.json pointer to the dataset repo."""
    hf_token = env("HF_TOKEN", required=True)
    dataset_repo_id = env("DATASET_REPO_ID", required=True)
    api = HfApi(token=hf_token)
    remote_sql_path = metadata["remote_sql_path"]
    remote_latest_path = "postgres/latest.json"
    print(f"[backup] uploading {gz_path.name} -> {dataset_repo_id}:{remote_sql_path}")
    api.upload_file(
        path_or_fileobj=str(gz_path),
        path_in_repo=remote_sql_path,
        repo_id=dataset_repo_id,
        repo_type="dataset",
        commit_message=f"backup: {metadata['timestamp_utc']}",
    )
    # Overwrite latest.json so restore tooling can locate the newest backup.
    latest_tmp = gz_path.parent / "latest.json"
    latest_tmp.write_text(json.dumps(metadata, ensure_ascii=False, indent=2), encoding="utf-8")
    api.upload_file(
        path_or_fileobj=str(latest_tmp),
        path_in_repo=remote_latest_path,
        repo_id=dataset_repo_id,
        repo_type="dataset",
        commit_message=f"update latest backup metadata: {metadata['timestamp_utc']}",
    )


def prune_old_backups() -> None:
    """Delete all but the newest BACKUP_KEEP_LAST dumps from the dataset repo."""
    hf_token = env("HF_TOKEN", required=True)
    dataset_repo_id = env("DATASET_REPO_ID", required=True)
    keep_last = int(env("BACKUP_KEEP_LAST", "10"))
    fs = HfFileSystem(token=hf_token)
    api = HfApi(token=hf_token)
    pattern = f"datasets/{dataset_repo_id}/postgres/*.sql.gz"
    all_files = fs.glob(pattern)
    # Strip the repo prefix; the timestamped names then sort chronologically.
    prefix = f"datasets/{dataset_repo_id}/"
    remote_paths = sorted([p[len(prefix):] for p in all_files if p.startswith(prefix)])
    if len(remote_paths) <= keep_last:
        print(f"[backup] retention ok: {len(remote_paths)} <= {keep_last}")
        return
    to_delete = remote_paths[:-keep_last]
    print(f"[backup] pruning {len(to_delete)} old backup(s)")
    # Delete every stale file in a single commit to keep the repo history compact.
    operations = [CommitOperationDelete(path_in_repo=p) for p in to_delete]
    api.create_commit(
        repo_id=dataset_repo_id,
        repo_type="dataset",
        operations=operations,
        commit_message=f"prune old backups, keep last {keep_last}",
    )


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--once", action="store_true", help="Run one backup and exit")
    # Parsing validates the CLI; the script performs a single backup run either way.
    parser.parse_args()
    workdir = Path("/tmp/sub2api_backup")
    workdir.mkdir(parents=True, exist_ok=True)
    now = datetime.now(timezone.utc)
    ts = now.strftime("%Y%m%d-%H%M%S")
    sql_path = workdir / f"{ts}.sql"
    gz_path = workdir / f"{ts}.sql.gz"
    try:
        run_pg_dump(sql_path)
        gzip_file(sql_path, gz_path)
        metadata = {
            "timestamp_utc": ts,
            "generated_at_iso": now.isoformat(),
            "database_host": env("DATABASE_HOST", "127.0.0.1"),
            "database_port": env("DATABASE_PORT", "5432"),
            "database_name": env("DATABASE_DBNAME", env("POSTGRES_DB", "sub2api")),
            "dataset_repo_id": env("DATASET_REPO_ID", ""),
            "file_name": gz_path.name,
            "remote_sql_path": f"postgres/{ts}.sql.gz",
            "file_size_bytes": gz_path.stat().st_size,
        }
        upload_backup(gz_path, metadata)
        prune_old_backups()
        print("[backup] done")
        return 0
    except Exception as exc:
        print(f"[backup] failed: {exc}", file=sys.stderr)
        return 1
    finally:
        # Always clean up local scratch files, even when the backup failed.
        for p in workdir.glob("*"):
            try:
                p.unlink()
            except Exception:
                pass


if __name__ == "__main__":
    raise SystemExit(main())
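
# Example invocation (a sketch: the token and repo values below are
# placeholders; the variable names come from the env() lookups above):
#
#   HF_TOKEN=hf_xxx DATASET_REPO_ID=user/sub2api-backups \
#   DATABASE_PASSWORD=secret python backup_to_dataset.py --once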