| import json |
| import os |
| from difflib import SequenceMatcher |
| from typing import Any, Dict, Optional, Tuple |
|
|
| from fastapi import FastAPI, Request, Response |
| from huggingface_hub import (DatasetCard, HfApi, ModelCard, comment_discussion, |
| create_discussion, get_discussion_details, |
| get_repo_discussions, login) |
| from huggingface_hub.utils import EntryNotFoundError |
| from tabulate import tabulate |
|
|
| KEY = os.environ.get("WEBHOOK_SECRET") |
| HF_TOKEN = os.environ.get("HF_TOKEN") |
|
|
| api = HfApi(token=HF_TOKEN) |
| login(HF_TOKEN) |
|
|
| app = FastAPI() |
|
|
|
|
| @app.get("/") |
| def read_root(): |
| data = """ |
| <h2 style="text-align:center">Metadata Review Bot</h2> |
| <p style="text-align:center">This is a demo app showing how to use webhooks to automate metadata review for models and datasets shared on the Hugging Face Hub.</p> |
| """ |
| return Response(content=data, media_type="text/html") |
|
|
|
|
| def similar(a, b): |
| """Check similarity of two sequences""" |
| return SequenceMatcher(None, a, b).ratio() |
|
|
|
|
| def create_metadata_key_dict(card_data, repo_type: str): |
| shared_keys = ["tags", "license"] |
| if repo_type == "model": |
| model_keys = ["library_name", "datasets", "metrics", "co2", "pipeline_tag"] |
| shared_keys.extend(model_keys) |
| keys = shared_keys |
| return {key: card_data.get(key) for key in keys} |
| if repo_type == "dataset": |
| data_keys = [ |
| "pretty_name", |
| "size_categories", |
| "task_categories", |
| "task_ids", |
| "source_datasets", |
| ] |
| shared_keys.extend(data_keys) |
| keys = shared_keys |
| return {key: card_data.get(key) for key in keys} |
|
|
|
|
| def create_metadata_breakdown_table(desired_metadata_dictionary): |
| data = {k:v or "Field Missing" for k,v in desired_metadata_dictionary.items()} |
| metadata_fields_column = list(data.keys()) |
| metadata_values_column = list(data.values()) |
| table_data = list(zip(metadata_fields_column, metadata_values_column)) |
| return tabulate( |
| table_data, tablefmt="github", headers=("Metadata Field", "Provided Value") |
| ) |
|
|
|
|
| def calculate_grade(desired_metadata_dictionary): |
| metadata_values = list(desired_metadata_dictionary.values()) |
| score = sum(1 if field else 0 for field in metadata_values) / len(metadata_values) |
| return round(score, 2) |
|
|
|
|
| def create_markdown_report( |
| desired_metadata_dictionary, repo_name, repo_type, score, update: bool = False |
| ): |
| report = f"""# {repo_type.title()} metadata report card {"(updated)" if update else ""} |
| \n |
| This is an automatically produced metadata quality report card for {repo_name}. This report is meant as a POC! |
| \n |
| ## Breakdown of metadata fields for your{repo_type} |
| \n |
| {create_metadata_breakdown_table(desired_metadata_dictionary)} |
| \n |
| You scored a metadata coverage grade of: **{score}**% \n {f"We're not angry we're just disappointed! {repo_type.title()} metadata is super important. Please try harder..." |
| if score <= 0.5 else f"Not too shabby! Make sure you also fill in a {repo_type} card too!"} |
| """ |
| return report |
|
|
|
|
| def parse_webhook_post(data: Dict[str, Any]) -> Optional[Tuple[str, str]]: |
| event = data["event"] |
| if event["scope"] != "repo": |
| return None |
| repo = data["repo"] |
| repo_name = repo["name"] |
| repo_type = repo["type"] |
| if repo_type not in {"model", "dataset"}: |
| raise ValueError("Unknown hub type") |
| return repo_type, repo_name |
|
|
|
|
| def load_repo_card_metadata(repo_type, repo_name): |
| if repo_type == "dataset": |
| try: |
| return DatasetCard.load(repo_name).data.to_dict() |
| except EntryNotFoundError: |
| return {} |
| if repo_type == "model": |
| try: |
| return ModelCard.load(repo_name).data.to_dict() |
| except EntryNotFoundError: |
| return {} |
|
|
|
|
| def create_or_update_report(data): |
| if parsed_post := parse_webhook_post(data): |
| repo_type, repo_name = parsed_post |
| else: |
| return Response("Unable to parse webhook data", status_code=400) |
| card_data = load_repo_card_metadata(repo_type, repo_name) |
| desired_metadata_dictionary = create_metadata_key_dict(card_data, repo_type) |
| score = calculate_grade(desired_metadata_dictionary) |
| report = create_markdown_report( |
| desired_metadata_dictionary, repo_name, repo_type, score, update=False |
| ) |
| repo_discussions = get_repo_discussions( |
| repo_name, |
| repo_type=repo_type, |
| ) |
| for discussion in repo_discussions: |
| if ( |
| discussion.title == "Metadata Report Card" and discussion.status == "open" |
| ): |
| discussion_details = get_discussion_details( |
| repo_name, discussion.num, repo_type=repo_type |
| ) |
| last_comment = discussion_details.events[-1].content |
| if similar(report, last_comment) <= 0.999: |
| report = create_markdown_report( |
| desired_metadata_dictionary, |
| repo_name, |
| repo_type, |
| score, |
| update=True, |
| ) |
| comment_discussion( |
| repo_name, |
| discussion.num, |
| comment=report, |
| repo_type=repo_type, |
| ) |
| return True |
| create_discussion( |
| repo_name, |
| "Metadata Report Card", |
| description=report, |
| repo_type=repo_type, |
| ) |
| return True |
|
|
|
|
| @app.post("/webhook") |
| async def webhook(request: Request): |
| if request.method == "POST": |
| if request.headers.get("X-Webhook-Secret") != KEY: |
| return Response("Invalid secret", status_code=401) |
| data = await request.json() |
| result = create_or_update_report(data) |
| return "Webhook received!" if result else result |
|
|