| from fastapi import FastAPI |
| from huggingface_hub import list_repo_files |
| from collections import Counter |
| import asyncio |
| import os |
| import logging |
| from contextlib import asynccontextmanager |
|
|
| |
# Hugging Face access token taken from the environment; None when HF_TOKEN is unset.
HF_TOKEN = os.getenv("HF_TOKEN")
# Dataset repository whose file listing is polled.
REPO_ID = "Fred808/BG1"
# Seconds to wait between two consecutive polls.
SLEEP_INTERVAL = 60

# Root logger: INFO and above, prefixed with a timestamp and the level name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
|
|
def _summarize_repo_files():
    """Fetch the dataset file listing and tally it by extension.

    This performs a synchronous call to ``list_repo_files`` and must therefore
    be run off the event loop thread.

    Returns:
        A ``(total, ext_count)`` tuple: the number of listed paths that do not
        end in ``/`` and a ``Counter`` keyed by file extension (``"no_ext"``
        for paths without one).
    """
    files = list_repo_files(repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN)
    data_files = [f for f in files if not f.endswith('/')]
    ext_count = Counter(os.path.splitext(f)[1] or "no_ext" for f in data_files)
    return len(data_files), ext_count


async def dataset_counter():
    """Log the dataset's file count and per-extension breakdown forever.

    Every ``SLEEP_INTERVAL`` seconds the file listing of ``REPO_ID`` is
    fetched and summarized in an INFO log line. Any ``Exception`` raised while
    fetching or summarizing is logged at ERROR level and the loop continues.
    The coroutine only ends when it is cancelled.
    """
    loop = asyncio.get_running_loop()
    while True:
        try:
            # The listing is a blocking call; running it in the default
            # executor keeps the event loop free to serve requests meanwhile.
            total, ext_count = await loop.run_in_executor(None, _summarize_repo_files)
            logging.info(f"๐งฎ Total files: {total} | File types: {dict(ext_count)}")
        except Exception as e:
            logging.error(f"โ Error: {e}")
        await asyncio.sleep(SLEEP_INTERVAL)
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the dataset counter in the background for the application's lifetime.

    On entry, ``dataset_counter()`` is scheduled as an asyncio task. On exit,
    whether normal or through an exception raised at the ``yield``, the task is
    cancelled and awaited so it has actually finished before shutdown proceeds.

    Args:
        app: The application instance this lifespan is attached to (unused).
    """
    logging.info(f"๐ Starting counter for: {REPO_ID}")
    task = asyncio.create_task(dataset_counter())
    try:
        yield
    finally:
        task.cancel()
        try:
            # cancel() only requests cancellation; awaiting lets the task
            # unwind so it is not left pending when the loop closes.
            await task
        except asyncio.CancelledError:
            pass
        logging.info("๐ Shutting down counter.")
|
|
# Application instance; `lifespan` starts and stops the background counter task.
app = FastAPI(
    lifespan=lifespan,
)
|
|
@app.get("/health")
def healthcheck():
    """Handle GET /health by returning a constant ``{"healthy": True}`` body."""
    payload = {"healthy": True}
    return payload
|
|
|
|
@app.get("/")
def keep_alive():
    """Handle GET / by returning a constant status message."""
    # NOTE(review): presumably pinged by an external keep-alive check - confirm.
    body = {"status": "Dataset counter running"}
    return body
|
|
|
|