| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """Contains `WebhooksServer` and `webhook_endpoint` to create a webhook server easily.""" |
|
|
| import atexit |
| import inspect |
| import os |
| from collections.abc import Callable |
| from functools import wraps |
| from typing import TYPE_CHECKING, Any, Optional |
|
|
| from .utils import experimental, is_fastapi_available, is_gradio_available |
|
|
|
|
| if TYPE_CHECKING: |
| import gradio as gr |
| from fastapi import Request |
|
|
| if is_fastapi_available(): |
| from fastapi import FastAPI, Request |
| from fastapi.responses import JSONResponse |
| else: |
| |
| FastAPI = Request = JSONResponse = None |
|
|
|
|
| _global_app: Optional["WebhooksServer"] = None |
| _is_local = os.environ.get("SPACE_ID") is None |
|
|
|
|
| @experimental |
| class WebhooksServer: |
| """ |
| The [`WebhooksServer`] class lets you create an instance of a Gradio app that can receive Huggingface webhooks. |
| These webhooks can be registered using the [`~WebhooksServer.add_webhook`] decorator. Webhook endpoints are added to |
| the app as a POST endpoint to the FastAPI router. Once all the webhooks are registered, the `launch` method has to be |
| called to start the app. |
| |
| It is recommended to accept [`WebhookPayload`] as the first argument of the webhook function. It is a Pydantic |
| model that contains all the information about the webhook event. The data will be parsed automatically for you. |
| |
| Check out the [webhooks guide](../guides/webhooks_server) for a step-by-step tutorial on how to set up your |
| WebhooksServer and deploy it on a Space. |
| |
| > [!WARNING] |
| > `WebhooksServer` is experimental. Its API is subject to change in the future. |
| |
| > [!WARNING] |
| > You must have `gradio` installed to use `WebhooksServer` (`pip install --upgrade gradio`). |
| |
| Args: |
| ui (`gradio.Blocks`, optional): |
| A Gradio UI instance to be used as the Space landing page. If `None`, a UI displaying instructions |
| about the configured webhooks is created. |
| webhook_secret (`str`, optional): |
| A secret key to verify incoming webhook requests. You can set this value to any secret you want as long as |
| you also configure it in your [webhooks settings panel](https://huggingface.co/settings/webhooks). You |
| can also set this value as the `WEBHOOK_SECRET` environment variable. If no secret is provided, the |
| webhook endpoints are opened without any security. |
| |
| Example: |
| |
| ```python |
| import gradio as gr |
| from huggingface_hub import WebhooksServer, WebhookPayload |
| |
| with gr.Blocks() as ui: |
| ... |
| |
| app = WebhooksServer(ui=ui, webhook_secret="my_secret_key") |
| |
| @app.add_webhook("/say_hello") |
| async def hello(payload: WebhookPayload): |
| return {"message": "hello"} |
| |
| app.launch() |
| ``` |
| """ |
|
|
| def __new__(cls, *args, **kwargs) -> "WebhooksServer": |
| if not is_gradio_available(): |
| raise ImportError( |
| "You must have `gradio` installed to use `WebhooksServer`. Please run `pip install --upgrade gradio`" |
| " first." |
| ) |
| if not is_fastapi_available(): |
| raise ImportError( |
| "You must have `fastapi` installed to use `WebhooksServer`. Please run `pip install --upgrade fastapi`" |
| " first." |
| ) |
| return super().__new__(cls) |
|
|
| def __init__( |
| self, |
| ui: Optional["gr.Blocks"] = None, |
| webhook_secret: str | None = None, |
| ) -> None: |
| self._ui = ui |
|
|
| self.webhook_secret = webhook_secret or os.getenv("WEBHOOK_SECRET") |
| self.registered_webhooks: dict[str, Callable] = {} |
| _warn_on_empty_secret(self.webhook_secret) |
|
|
| def add_webhook(self, path: str | None = None) -> Callable: |
| """ |
| Decorator to add a webhook to the [`WebhooksServer`] server. |
| |
| Args: |
| path (`str`, optional): |
| The URL path to register the webhook function. If not provided, the function name will be used as the |
| path. In any case, all webhooks are registered under `/webhooks`. |
| |
| Raises: |
| ValueError: If the provided path is already registered as a webhook. |
| |
| Example: |
| ```python |
| from huggingface_hub import WebhooksServer, WebhookPayload |
| |
| app = WebhooksServer() |
| |
| @app.add_webhook |
| async def trigger_training(payload: WebhookPayload): |
| if payload.repo.type == "dataset" and payload.event.action == "update": |
| # Trigger a training job if a dataset is updated |
| ... |
| |
| app.launch() |
| ``` |
| """ |
| |
| if callable(path): |
| |
| return self.add_webhook()(path) |
|
|
| |
| @wraps(FastAPI.post) |
| def _inner_post(*args, **kwargs): |
| func = args[0] |
| abs_path = f"/webhooks/{(path or func.__name__).strip('/')}" |
| if abs_path in self.registered_webhooks: |
| raise ValueError(f"Webhook {abs_path} already exists.") |
| self.registered_webhooks[abs_path] = func |
|
|
| return _inner_post |
|
|
| def launch(self, prevent_thread_lock: bool = False, **launch_kwargs: Any) -> None: |
| """Launch the Gradio app and register webhooks to the underlying FastAPI server. |
| |
| Input parameters are forwarded to Gradio when launching the app. |
| """ |
| ui = self._ui or self._get_default_ui() |
|
|
| |
| |
| |
| launch_kwargs.setdefault("share", _is_local) |
| self.fastapi_app, _, _ = ui.launch(prevent_thread_lock=True, **launch_kwargs) |
|
|
| |
| for path, func in self.registered_webhooks.items(): |
| |
| if self.webhook_secret is not None: |
| func = _wrap_webhook_to_check_secret(func, webhook_secret=self.webhook_secret) |
|
|
| |
| self.fastapi_app.post(path)(func) |
|
|
| |
| space_host = os.environ.get("SPACE_HOST") |
| url = "https://" + space_host if space_host is not None else (ui.share_url or ui.local_url) |
| if url is None: |
| raise ValueError("Cannot find the URL of the app. Please provide a valid `ui` or update `gradio` version.") |
| url = url.strip("/") |
| message = "\nWebhooks are correctly setup and ready to use:" |
| message += "\n" + "\n".join(f" - POST {url}{webhook}" for webhook in self.registered_webhooks) |
| message += "\nGo to https://huggingface.co/settings/webhooks to setup your webhooks." |
| print(message) |
|
|
| if not prevent_thread_lock: |
| ui.block_thread() |
|
|
| def _get_default_ui(self) -> "gr.Blocks": |
| """Default UI if not provided (lists webhooks and provides basic instructions).""" |
| import gradio as gr |
|
|
| with gr.Blocks() as ui: |
| gr.Markdown("# This is an app to process 🤗 Webhooks") |
| gr.Markdown( |
| "Webhooks are a foundation for MLOps-related features. They allow you to listen for new changes on" |
| " specific repos or to all repos belonging to particular set of users/organizations (not just your" |
| " repos, but any repo). Check out this [guide](https://huggingface.co/docs/hub/webhooks) to get to" |
| " know more about webhooks on the Huggingface Hub." |
| ) |
| gr.Markdown( |
| f"{len(self.registered_webhooks)} webhook(s) are registered:" |
| + "\n\n" |
| + "\n ".join( |
| f"- [{webhook_path}]({_get_webhook_doc_url(webhook.__name__, webhook_path)})" |
| for webhook_path, webhook in self.registered_webhooks.items() |
| ) |
| ) |
| gr.Markdown( |
| "Go to https://huggingface.co/settings/webhooks to setup your webhooks." |
| + "\nYou app is running locally. Please look at the logs to check the full URL you need to set." |
| if _is_local |
| else ( |
| "\nThis app is running on a Space. You can find the corresponding URL in the options menu" |
| " (top-right) > 'Embed the Space'. The URL looks like 'https://{username}-{repo_name}.hf.space'." |
| ) |
| ) |
| return ui |
|
|
|
|
| @experimental |
| def webhook_endpoint(path: str | None = None) -> Callable: |
| """Decorator to start a [`WebhooksServer`] and register the decorated function as a webhook endpoint. |
| |
| This is a helper to get started quickly. If you need more flexibility (custom landing page or webhook secret), |
| you can use [`WebhooksServer`] directly. You can register multiple webhook endpoints (to the same server) by using |
| this decorator multiple times. |
| |
| Check out the [webhooks guide](../guides/webhooks_server) for a step-by-step tutorial on how to set up your |
| server and deploy it on a Space. |
| |
| > [!WARNING] |
| > `webhook_endpoint` is experimental. Its API is subject to change in the future. |
| |
| > [!WARNING] |
| > You must have `gradio` installed to use `webhook_endpoint` (`pip install --upgrade gradio`). |
| |
| Args: |
| path (`str`, optional): |
| The URL path to register the webhook function. If not provided, the function name will be used as the path. |
| In any case, all webhooks are registered under `/webhooks`. |
| |
| Examples: |
| The default usage is to register a function as a webhook endpoint. The function name will be used as the path. |
| The server will be started automatically at exit (i.e. at the end of the script). |
| |
| ```python |
| from huggingface_hub import webhook_endpoint, WebhookPayload |
| |
| @webhook_endpoint |
| async def trigger_training(payload: WebhookPayload): |
| if payload.repo.type == "dataset" and payload.event.action == "update": |
| # Trigger a training job if a dataset is updated |
| ... |
| |
| # Server is automatically started at the end of the script. |
| ``` |
| |
| Advanced usage: register a function as a webhook endpoint and start the server manually. This is useful if you |
| are running it in a notebook. |
| |
| ```python |
| from huggingface_hub import webhook_endpoint, WebhookPayload |
| |
| @webhook_endpoint |
| async def trigger_training(payload: WebhookPayload): |
| if payload.repo.type == "dataset" and payload.event.action == "update": |
| # Trigger a training job if a dataset is updated |
| ... |
| |
| # Start the server manually |
| trigger_training.launch() |
| ``` |
| """ |
| if callable(path): |
| |
| return webhook_endpoint()(path) |
|
|
| @wraps(WebhooksServer.add_webhook) |
| def _inner(func: Callable) -> Callable: |
| app = _get_global_app() |
| app.add_webhook(path)(func) |
| if len(app.registered_webhooks) == 1: |
| |
| atexit.register(app.launch) |
|
|
| @wraps(app.launch) |
| def _launch_now(): |
| |
| atexit.unregister(app.launch) |
| app.launch() |
|
|
| func.launch = _launch_now |
| return func |
|
|
| return _inner |
|
|
|
|
| def _get_global_app() -> WebhooksServer: |
| global _global_app |
| if _global_app is None: |
| _global_app = WebhooksServer() |
| return _global_app |
|
|
|
|
| def _warn_on_empty_secret(webhook_secret: str | None) -> None: |
| if webhook_secret is None: |
| print("Webhook secret is not defined. This means your webhook endpoints will be open to everyone.") |
| print( |
| "To add a secret, set `WEBHOOK_SECRET` as environment variable or pass it at initialization: " |
| "\n\t`app = WebhooksServer(webhook_secret='my_secret', ...)`" |
| ) |
| print( |
| "For more details about webhook secrets, please refer to" |
| " https://huggingface.co/docs/hub/webhooks#webhook-secret." |
| ) |
| else: |
| print("Webhook secret is correctly defined.") |
|
|
|
|
| def _get_webhook_doc_url(webhook_name: str, webhook_path: str) -> str: |
| """Returns the anchor to a given webhook in the docs (experimental)""" |
| return "/docs#/default/" + webhook_name + webhook_path.replace("/", "_") + "_post" |
|
|
|
|
| def _wrap_webhook_to_check_secret(func: Callable, webhook_secret: str) -> Callable: |
| """Wraps a webhook function to check the webhook secret before calling the function. |
| |
| This is a hacky way to add the `request` parameter to the function signature. Since FastAPI based itself on route |
| parameters to inject the values to the function, we need to hack the function signature to retrieve the `Request` |
| object (and hence the headers). A far cleaner solution would be to use a middleware. However, since |
| `fastapi==0.90.1`, a middleware cannot be added once the app has started. And since the FastAPI app is started by |
| Gradio internals (and not by us), we cannot add a middleware. |
| |
| This method is called only when a secret has been defined by the user. If a request is sent without the |
| "x-webhook-secret", the function will return a 401 error (unauthorized). If the header is sent but is incorrect, |
| the function will return a 403 error (forbidden). |
| |
| Inspired by https://stackoverflow.com/a/33112180. |
| """ |
| initial_sig = inspect.signature(func) |
|
|
| @wraps(func) |
| async def _protected_func(request: Request, **kwargs): |
| request_secret = request.headers.get("x-webhook-secret") |
| if request_secret is None: |
| return JSONResponse({"error": "x-webhook-secret header not set."}, status_code=401) |
| if request_secret != webhook_secret: |
| return JSONResponse({"error": "Invalid webhook secret."}, status_code=403) |
|
|
| |
| if "request" in initial_sig.parameters: |
| kwargs["request"] = request |
|
|
| |
| if inspect.iscoroutinefunction(func): |
| return await func(**kwargs) |
| else: |
| return func(**kwargs) |
|
|
| |
| if "request" not in initial_sig.parameters: |
| _protected_func.__signature__ = initial_sig.replace( |
| parameters=( |
| inspect.Parameter(name="request", kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=Request), |
| ) |
| + tuple(initial_sig.parameters.values()) |
| ) |
|
|
| |
| return _protected_func |
|
|