paperhawk / app /async_runtime.py
Nándorfi Vince
Initial paperhawk push to HF Space (LFS for binaries)
7ff7119
"""AsyncRuntime — long-lived background event loop for the Streamlit thread.
PROBLEM:
* Streamlit runs a synchronous event loop (uvloop) that CANNOT be patched
with ``nest_asyncio``.
* LangGraph (and every async resource: ChromaDB connections, the LLM HTTP
session, AsyncSqliteSaver checkpointers) assumes a LONG-LIVED async context.
* Opening a new loop per invoke means async-bound resources never amortize:
every chat message rebuilds the SQLite pool, the Chroma client, and the
HTTP session.
SOLUTION:
* A DEDICATED background thread that runs a single ``asyncio.new_event_loop()``
with ``run_forever`` for the entire app lifetime.
* The Streamlit thread (sync) hands coroutines to the background loop via
``asyncio.run_coroutine_threadsafe(coro, loop)``; calling ``.result()`` on the
returned ``concurrent.futures.Future`` blocks the Streamlit thread until the
result is ready.
* Singleton — started once, same instance reused.
This is the classic "embedded async runtime" pattern (see LangChain,
JupyterLab, ipykernel implementations). Robust and scales well.
"""
from __future__ import annotations
import asyncio
import atexit
import threading
from collections.abc import AsyncIterator
from typing import Any, TypeVar
# Item type produced by the async iterator handed to AsyncRuntime.submit_iter().
T = TypeVar("T")
class AsyncRuntime:
    """Singleton background event loop with a thread-safe submit + stream API.

    One daemon thread runs a single event loop for the lifetime of the
    process. Synchronous callers hand work to it with :meth:`submit` (run one
    coroutine, block on its result) or :meth:`submit_iter` (drive an async
    iterator from a plain ``for`` loop). Neither may be used from the loop
    thread itself: blocking there would deadlock, so both raise
    ``RuntimeError`` instead.
    """

    _instance: AsyncRuntime | None = None
    # Guards both singleton creation in get() and the one-time loop start in
    # _ensure_started(); the two never nest, so a plain Lock suffices.
    _lock = threading.Lock()

    # Seconds to wait for the loop thread to come up, and for cleanup work
    # (aclose of abandoned streams, thread join at exit) to finish.
    _STARTUP_TIMEOUT = 5.0
    _SHUTDOWN_TIMEOUT = 5.0

    def __init__(self) -> None:
        # Lazy start: the loop and thread are created on the first submit().
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._started = threading.Event()

    @classmethod
    def get(cls) -> AsyncRuntime:
        """Return the process-wide instance, creating it on first call."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = AsyncRuntime()
        return cls._instance

    def _ensure_started(self) -> asyncio.AbstractEventLoop:
        """Start the background loop if it is not running yet, and return it.

        Raises:
            RuntimeError: if the loop thread does not come up in time.
        """
        if not self._started.is_set():
            with self._lock:
                if not self._started.is_set():
                    self._start_loop_thread()
        loop = self._loop
        if loop is None:  # not reachable once _started is set; explicit, not assert
            raise RuntimeError("AsyncRuntime: event loop is not available")
        return loop

    def _start_loop_thread(self) -> None:
        """Create the loop, run it on a daemon thread, wait until it is live.

        The caller must hold ``self._lock``.

        Raises:
            RuntimeError: if the loop does not start within _STARTUP_TIMEOUT.
        """
        # Create the loop here (not inside the thread) so self._loop is never
        # written concurrently with a reader on another thread.
        loop = asyncio.new_event_loop()
        ready = threading.Event()

        def _run() -> None:
            asyncio.set_event_loop(loop)
            # Queued as the first callback, so `ready` fires only once
            # run_forever() is actually processing the loop.
            loop.call_soon(ready.set)
            try:
                loop.run_forever()
            finally:
                try:
                    # Finalize async generators still suspended on this loop.
                    loop.run_until_complete(loop.shutdown_asyncgens())
                finally:
                    loop.close()

        thread = threading.Thread(
            target=_run,
            name="async-runtime",
            daemon=True,  # does not keep the interpreter alive at exit
        )
        thread.start()
        if not ready.wait(timeout=self._STARTUP_TIMEOUT):
            # Do not mark the runtime as started with no usable loop.
            raise RuntimeError(
                "AsyncRuntime: background event loop did not start within "
                f"{self._STARTUP_TIMEOUT}s"
            )
        self._loop = loop
        self._thread = thread
        self._started.set()
        # Cleanup at app shutdown
        atexit.register(self._shutdown)

    def _on_loop_thread(self) -> bool:
        """Return True when the caller runs on the background loop's thread."""
        return self._thread is not None and threading.current_thread() is self._thread

    def submit(self, coro, timeout: float | None = None) -> Any:
        """Run *coro* on the background loop and block until it finishes.

        This is the Streamlit thread's main API: synchronous-looking, but the
        coroutine runs on a long-lived loop, so loop-bound async resources are
        reused across calls.

        Args:
            coro: the coroutine object to execute.
            timeout: seconds to wait for the result; ``None`` (the default)
                waits indefinitely. On timeout the task is cancelled.

        Returns:
            Whatever the coroutine returns.

        Raises:
            RuntimeError: if called from the loop thread (it would deadlock).
            concurrent.futures.TimeoutError: if *timeout* elapses.
            Any exception raised by the coroutine itself.
        """
        loop = self._ensure_started()
        if self._on_loop_thread():
            close = getattr(coro, "close", None)
            if close is not None:
                close()  # avoid a "coroutine was never awaited" warning
            raise RuntimeError(
                "AsyncRuntime.submit() called from the event-loop thread; "
                "await the coroutine directly instead"
            )
        future = asyncio.run_coroutine_threadsafe(coro, loop)
        try:
            return future.result(timeout)
        except BaseException:
            # Timeout or interrupt while waiting: don't leave the task running
            # unattended. No-op when the coroutine already finished/raised.
            future.cancel()
            raise

    def submit_iter(self, async_gen: AsyncIterator[T]):
        """Adapt an async iterator into a blocking sync iterator.

        Intended for Streamlit's ``st.write_stream``: the Streamlit thread
        iterates over the (token) stream produced by an ``astream`` call.
        Each ``__anext__()`` is awaited on the background loop, one item at a
        time. If the consumer stops early (``break``, ``close()``, or the
        suspended generator being garbage-collected), the iterator's
        ``aclose()`` (when it has one) is awaited on the loop, so its
        ``finally`` blocks run and its resources are released.

        Raises:
            RuntimeError: if iterated from the loop thread (it would deadlock).
            Any exception raised by the async iterator.
        """
        loop = self._ensure_started()
        if self._on_loop_thread():
            raise RuntimeError(
                "AsyncRuntime.submit_iter() iterated from the event-loop thread; "
                "use 'async for' instead"
            )

        async def _next() -> T:
            # Wrapping in a real coroutine accepts any awaitable returned by
            # __anext__() and runs the call itself on the loop thread.
            return await async_gen.__anext__()

        exhausted = False
        try:
            while True:
                future = asyncio.run_coroutine_threadsafe(_next(), loop)
                try:
                    item = future.result()
                except StopAsyncIteration:
                    exhausted = True
                    return
                except BaseException:
                    future.cancel()  # no-op if the step already completed
                    raise
                yield item
        finally:
            if not exhausted:
                self._close_async_gen(async_gen, loop)

    def _close_async_gen(self, async_gen, loop: asyncio.AbstractEventLoop) -> None:
        """Best-effort ``aclose()`` of an abandoned async iterator on *loop*."""
        aclose = getattr(async_gen, "aclose", None)
        # Skip when there is nothing to close, when the loop no longer runs
        # (the coroutine would never be scheduled), or when we are on the loop
        # thread (blocking on the result there would deadlock).
        if aclose is None or not loop.is_running() or self._on_loop_thread():
            return

        async def _aclose() -> None:
            await aclose()

        future = asyncio.run_coroutine_threadsafe(_aclose(), loop)
        try:
            future.result(timeout=self._SHUTDOWN_TIMEOUT)
        except Exception:
            # Runs from a generator's finally block: raising here would mask
            # the exception (or GeneratorExit) that ended the iteration.
            future.cancel()

    def _shutdown(self) -> None:
        """atexit handler: stop the loop and wait briefly for its thread to exit."""
        loop, thread = self._loop, self._thread
        if loop is None or not self._started.is_set() or loop.is_closed():
            return
        try:
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # The loop was closed between the check above and this call.
            return
        # Join so the thread's finally block (shutdown_asyncgens + close) can
        # run before the daemon thread is torn down with the interpreter.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=self._SHUTDOWN_TIMEOUT)