repl/src/core/containers/runtime/uv_provider.py
burtenshaw's picture
burtenshaw HF Staff
Upload folder using huggingface_hub
81b02bf verified
"""Providers for launching ASGI applications via ``uv run``."""
from __future__ import annotations
import os
import socket
import subprocess
import time
from typing import Dict, Optional
import requests
from .providers import RuntimeProvider
def _check_uv_installed() -> None:
    """Verify that the ``uv`` executable can be launched.

    Runs ``uv --version`` and discards its output. Only the "executable not
    found" case is translated; any other failure from the subprocess call
    propagates unchanged.

    Raises:
        RuntimeError: If no ``uv`` executable can be found.
    """
    version_probe = ["uv", "--version"]
    try:
        subprocess.check_output(version_probe)
    except FileNotFoundError as missing:
        hint = "`uv` executable not found. Install uv from https://docs.astral.sh and ensure it is on PATH."
        raise RuntimeError(hint) from missing
def _find_free_port() -> int:
    """Ask the operating system for an unused TCP port.

    A throwaway IPv4 socket is bound to port ``0`` so the OS picks the port
    number. The socket is closed before returning, so the port is only known
    to have been free at the time of the call.

    Returns:
        The port number the OS assigned to the temporary socket.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        probe.bind(("", 0))
        probe.listen(1)
        _, assigned_port = probe.getsockname()
    return assigned_port
def _create_uv_command(
    *,
    host: str,
    port: int,
    reload: bool,
    workers: int,
    app: str,
    project_path: str,
) -> list[str]:
    """Build the argument vector that launches uvicorn through ``uv run``.

    Args:
        host: Value passed to uvicorn's ``--host`` option
        port: Value passed to uvicorn's ``--port`` option
        reload: When true, ``--reload`` is appended to the command
        workers: Value passed to uvicorn's ``--workers`` option
        app: ASGI application path handed to uvicorn
        project_path: Value passed to ``uv run --project``

    Returns:
        The command as a list of strings, ready for ``subprocess``
    """
    # Everything before "--" is consumed by uv; everything after by uvicorn.
    uv_prefix = ["uv", "run", "--isolated", "--project", project_path, "--"]
    server_args = [
        "uvicorn",
        app,
        "--host",
        host,
        "--port",
        str(port),
        "--workers",
        str(workers),
    ]
    reload_flag = ["--reload"] if reload else []
    return [*uv_prefix, *server_args, *reload_flag]
def _poll_health(health_url: str, timeout_s: float) -> None:
    """Poll a health endpoint until it returns HTTP 200 or times out.

    Args:
        health_url: URL that is requested with HTTP GET on every attempt
        timeout_s: Total time budget in seconds; a value of zero or less
            fails immediately without issuing a request

    Raises:
        TimeoutError: If no HTTP 200 response is received within ``timeout_s``
    """
    # A monotonic clock keeps the deadline stable if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_s
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            # Cap each request at 2 seconds and never exceed the overall budget.
            response = requests.get(health_url, timeout=max(0.0001, min(remaining, 2.0)))
        except requests.RequestException:
            # Connection errors are expected while the server boots. Fall
            # through to the pause below instead of retrying immediately,
            # otherwise a refused connection turns this into a busy loop.
            pass
        else:
            if response.status_code == 200:
                return
        # Wait between attempts, but do not sleep past the deadline.
        pause = min(0.5, deadline - time.monotonic())
        if pause > 0:
            time.sleep(pause)
    raise TimeoutError(f"Server did not become ready within {timeout_s:.1f} seconds")
class UVProvider(RuntimeProvider):
    """
    RuntimeProvider implementation backed by ``uv run``.

    The provider launches uvicorn through ``uv run --isolated --project ...``
    as a child process and exposes the URL under which the server is reachable.

    Args:
        project_path: Local path to a uv project (passed to ``uv run --project``)
        app: ASGI application path for uvicorn (defaults to ``server.app:app``)
        host: Host interface to bind to (defaults to ``0.0.0.0``)
        reload: Whether to enable uvicorn's reload mode
        env_vars: Environment variables to pass through to the spawned process
        context_timeout_s: How long to wait for the environment to become ready
            (stored on the instance; the methods of this class do not read it)

    Example:
        >>> provider = UVProvider(project_path="/path/to/env")
        >>> base_url = provider.start()
        >>> print(base_url)  # e.g. http://127.0.0.1:54321
        >>> provider.wait_for_ready()
        >>> # Use the environment via base_url
        >>> provider.stop()
    """

    def __init__(
        self,
        *,
        project_path: str,
        app: str = "server.app:app",
        host: str = "0.0.0.0",
        reload: bool = False,
        env_vars: Optional[Dict[str, str]] = None,
        context_timeout_s: float = 60.0,
    ):
        """
        Initialize the UVProvider.

        Raises:
            RuntimeError: If the ``uv`` executable cannot be found
        """
        self.project_path = os.path.abspath(project_path)
        self.app = app
        self.host = host
        self.reload = reload
        self.env_vars = env_vars
        self.context_timeout_s = context_timeout_s
        _check_uv_installed()
        # Handle of the spawned ``uv run`` process; None while not started.
        self._process: Optional[subprocess.Popen] = None
        # URL clients should use to reach the server; None while not started.
        self._base_url: Optional[str] = None

    def start(
        self,
        port: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        workers: int = 1,
        **_: Dict[str, str],
    ) -> str:
        """
        Start the environment via `uv run`.

        Args:
            port: The port to bind the environment to; a free port is picked
                when omitted
            env_vars: Environment variables to pass to the environment; these
                override the ones given to the constructor
            workers: The number of workers to use

        Returns:
            The base URL of the environment

        Raises:
            RuntimeError: If the environment is already running or the
                process cannot be launched
        """
        if self._process is not None and self._process.poll() is None:
            raise RuntimeError("UVProvider is already running")

        bind_port = port or _find_free_port()
        command = _create_uv_command(
            host=self.host,
            port=bind_port,
            reload=self.reload,
            workers=workers,
            app=self.app,
            project_path=self.project_path,
        )

        # Layering: parent environment < constructor env_vars < per-call env_vars.
        env = os.environ.copy()
        if self.env_vars:
            env.update(self.env_vars)
        if env_vars:
            env.update(env_vars)

        try:
            self._process = subprocess.Popen(command, env=env)
        except OSError as exc:
            raise RuntimeError(f"Failed to launch `uv run`: {exc}") from exc

        if self.host in {"0.0.0.0", "::"}:
            # Wildcard bind addresses are not connectable; use loopback instead.
            client_host = "127.0.0.1"
        elif ":" in self.host and not self.host.startswith("["):
            # IPv6 literals must be bracketed inside a URL authority.
            client_host = f"[{self.host}]"
        else:
            client_host = self.host
        self._base_url = f"http://{client_host}:{bind_port}"
        return self._base_url

    def _raise_if_exited(self) -> None:
        """Raise RuntimeError if the spawned process has already terminated."""
        process = self._process
        if process is not None and process.poll() is not None:
            code = process.returncode
            raise RuntimeError(f"uv process exited prematurely with code {code}")

    def wait_for_ready(self, timeout_s: float = 60.0) -> None:
        """
        Wait for the environment to become ready.

        The ``/health`` endpoint under the base URL is polled until it answers
        with HTTP 200. The child process is re-checked between polling rounds
        so that a crashed server is reported without waiting out the timeout.

        Args:
            timeout_s: The timeout to wait for the environment to become ready

        Raises:
            RuntimeError: If the environment has not been started or its
                process has exited
            TimeoutError: If the environment does not become ready within the timeout
        """
        if self._process is None or self._base_url is None:
            # Without this guard the literal URL "None/health" would be polled.
            raise RuntimeError("UVProvider has not been started")

        health_url = f"{self._base_url}/health"
        deadline = time.monotonic() + timeout_s
        while True:
            self._raise_if_exited()
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    f"Server did not become ready within {timeout_s:.1f} seconds"
                )
            try:
                # Poll in short slices so process liveness is re-checked regularly.
                _poll_health(health_url, timeout_s=min(remaining, 2.0))
            except TimeoutError:
                continue
            return

    def stop(self) -> None:
        """
        Stop the environment.

        Does nothing when the provider was never started. A running process is
        asked to terminate and is killed if it has not exited after 10 seconds.
        """
        if self._process is None:
            return

        if self._process.poll() is None:
            self._process.terminate()
            try:
                self._process.wait(timeout=10.0)
            except subprocess.TimeoutExpired:
                self._process.kill()
                self._process.wait(timeout=5.0)

        self._process = None
        self._base_url = None

    @property
    def base_url(self) -> str:
        """
        The base URL of the environment.

        Returns:
            The base URL of the environment

        Raises:
            RuntimeError: If the environment is not running
        """
        if self._base_url is None:
            raise RuntimeError("UVProvider has not been started")
        return self._base_url