| """Providers for launching Hugging Face Spaces via ``uv run``.""" |
|
|
| from __future__ import annotations |
|
|
| import os |
| import socket |
| import subprocess |
| import time |
| from dataclasses import dataclass, field |
| from typing import Dict, Optional |
|
|
| import requests |
|
|
| from .providers import ContainerProvider |
|
|
|
|
| def _poll_health(health_url: str, timeout_s: float) -> None: |
| """Poll a health endpoint until it returns HTTP 200 or times out.""" |
|
|
| deadline = time.time() + timeout_s |
| while time.time() < deadline: |
| try: |
| response = requests.get(health_url, timeout=2.0) |
| if response.status_code == 200: |
| return |
| except requests.RequestException: |
| pass |
|
|
| time.sleep(0.5) |
|
|
| raise TimeoutError( |
| f"Server did not become ready within {timeout_s:.1f} seconds" |
| ) |
|
|
|
|
| def _create_uv_command( |
| space_id: str, |
| host: str, |
| port: int, |
| reload: bool, |
| project_url: Optional[str] = None, |
| ) -> list[str]: |
| command = [ |
| "uv", |
| "run", |
| "--project", |
| project_url or f"git+https://huggingface.co/spaces/{space_id}", |
| "--", |
| "server", |
| "--host", |
| host, |
| "--port", |
| str(port), |
| ] |
| if reload: |
| command.append("--reload") |
| return command |
|
|
|
|
| @dataclass |
| class UVProvider(ContainerProvider): |
| """ContainerProvider implementation backed by ``uv run``.""" |
|
|
| space_id: str |
| host: str = "0.0.0.0" |
| port: Optional[int] = None |
| reload: bool = False |
| project_url: Optional[str] = None |
| connect_host: Optional[str] = None |
| extra_env: Optional[Dict[str, str]] = None |
| context_timeout_s: float = 60.0 |
|
|
| _process: subprocess.Popen | None = field(init=False, default=None) |
| _base_url: str | None = field(init=False, default=None) |
|
|
| def start_container( |
| self, |
| image: str, |
| port: Optional[int] = None, |
| env_vars: Optional[Dict[str, str]] = None, |
| **_: Dict[str, str], |
| ) -> str: |
| if self._process is not None and self._process.poll() is None: |
| raise RuntimeError("UVProvider is already running") |
|
|
| self.space_id = image or self.space_id |
|
|
| bind_port = port or self.port or self._find_free_port() |
|
|
| command = _create_uv_command( |
| self.space_id, |
| self.host, |
| bind_port, |
| self.reload, |
| project_url=self.project_url, |
| ) |
|
|
| env = os.environ.copy() |
| if self.extra_env: |
| env.update(self.extra_env) |
| if env_vars: |
| env.update(env_vars) |
|
|
| try: |
| self._process = subprocess.Popen(command, env=env) |
| except FileNotFoundError as exc: |
| raise RuntimeError( |
| "`uv` executable not found. Install uv from " |
| "https://github.com/astral-sh/uv and ensure it is on PATH." |
| ) from exc |
| except OSError as exc: |
| raise RuntimeError(f"Failed to launch `uv run`: {exc}") from exc |
|
|
| client_host = self.connect_host or ( |
| "127.0.0.1" if self.host in {"0.0.0.0", "::"} else self.host |
| ) |
| self._base_url = f"http://{client_host}:{bind_port}" |
| self.port = bind_port |
| return self._base_url |
|
|
| def wait_for_ready(self, base_url: str, timeout_s: float = 60.0) -> None: |
| if self._process and self._process.poll() is not None: |
| code = self._process.returncode |
| raise RuntimeError( |
| f"uv process exited prematurely with code {code}" |
| ) |
|
|
| _poll_health(f"{base_url}/health", timeout_s) |
|
|
| def stop_container(self) -> None: |
| if self._process is None: |
| return |
|
|
| if self._process.poll() is None: |
| self._process.terminate() |
| try: |
| self._process.wait(timeout=10.0) |
| except subprocess.TimeoutExpired: |
| self._process.kill() |
| self._process.wait(timeout=5.0) |
|
|
| self._process = None |
| self._base_url = None |
|
|
| def start(self) -> str: |
| return self.start_container(self.space_id, port=self.port) |
|
|
| def stop(self) -> None: |
| self.stop_container() |
|
|
| def wait_for_ready_default(self, timeout_s: float | None = None) -> None: |
| if self._base_url is None: |
| raise RuntimeError("UVProvider has not been started") |
| self.wait_for_ready( |
| self._base_url, |
| timeout_s or self.context_timeout_s, |
| ) |
|
|
| def close(self) -> None: |
| self.stop_container() |
|
|
| def __enter__(self) -> "UVProvider": |
| if self._base_url is None: |
| base_url = self.start_container(self.space_id, port=self.port) |
| self.wait_for_ready(base_url, timeout_s=self.context_timeout_s) |
| return self |
|
|
| def __exit__(self, exc_type, exc, tb) -> None: |
| self.stop_container() |
|
|
| def _find_free_port(self) -> int: |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: |
| sock.bind(("", 0)) |
| sock.listen(1) |
| return sock.getsockname()[1] |
|
|
| @property |
| def base_url(self) -> str: |
| if self._base_url is None: |
| raise RuntimeError("UVProvider has not been started") |
| return self._base_url |
|
|
|
|
|
|