File size: 4,554 Bytes
7ff7119
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""AsyncRuntime — long-lived background event loop for the Streamlit thread.

PROBLEM:
  * Streamlit runs a synchronous event loop (uvloop) that CANNOT be patched
    with ``nest_asyncio``.
  * LangGraph (and every async resource: ChromaDB connections, the LLM HTTP
    session, AsyncSqliteSaver checkpointers) assumes a LONG-LIVED async context.
  * Opening a new loop per invoke means async-bound resources never amortize:
    every chat message rebuilds the SQLite pool, the Chroma client, and the
    HTTP session.

SOLUTION:
  * A DEDICATED background thread that runs a single ``asyncio.new_event_loop()``
    with ``run_forever`` for the entire app lifetime.
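  * The Streamlit thread (sync) hands coroutines to the background loop via
    ``asyncio.run_coroutine_threadsafe(coro, loop)`` and then blocks on the
    returned ``concurrent.futures.Future`` (``future.result()``) until the
    result is ready.
  * Singleton — ``AsyncRuntime.get()`` returns one shared instance; its loop
    thread is started lazily on the first submit and reused afterwards.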

This is the classic "embedded async runtime" pattern (see LangChain,
JupyterLab, ipykernel implementations). Robust and scales well.
"""

from __future__ import annotations

import asyncio
import atexit
import threading
from collections.abc import AsyncIterator, Coroutine, Iterator
from typing import Any, TypeVar

# Type variable for the element/result types in AsyncRuntime's annotations
# (e.g. the items of the async iterator passed to submit_iter).
T = TypeVar("T")


class AsyncRuntime:
    """Singleton background event loop. Thread-safe submit + stream API.

    One daemon thread runs one event loop for the rest of the process.
    Synchronous callers hand it work through :meth:`submit` (run a coroutine,
    block for its result) or :meth:`submit_iter` (consume an async iterator
    from an ordinary ``for`` loop). Neither may be called from the loop's own
    thread: blocking there on work the same loop must do would deadlock, so
    both raise ``RuntimeError`` instead.
    """

    # Shared instance handed out by get().
    _instance: AsyncRuntime | None = None
    # Class-level lock: guards singleton creation in get() and the lazy loop
    # start in _ensure_started(); neither holds it while calling the other.
    _lock = threading.Lock()
    # Upper bound (seconds) that _shutdown() waits for the loop thread.
    _SHUTDOWN_TIMEOUT = 5.0

    def __init__(self) -> None:
        # Lazy start: the loop and its thread are created on the first submit.
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._started = threading.Event()

    @classmethod
    def get(cls) -> AsyncRuntime:
        """Singleton accessor — created on first call, same instance after."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = AsyncRuntime()
        return cls._instance

    def _ensure_started(self) -> None:
        """Start the background loop thread if it is not running yet."""
        if self._started.is_set():
            return
        with self._lock:
            if self._started.is_set():
                return

            # Create the loop here, in the calling thread, so self._loop is
            # guaranteed to be assigned before _started is published (no
            # start-up handshake whose timeout could leave it None). Work
            # scheduled with call_soon_threadsafe before run_forever() begins
            # is queued and runs as soon as the loop starts.
            loop = asyncio.new_event_loop()

            def _run() -> None:
                asyncio.set_event_loop(loop)
                try:
                    loop.run_forever()
                finally:
                    try:
                        # Let suspended async generators (e.g. abandoned
                        # submit_iter streams) run their cleanup first.
                        loop.run_until_complete(loop.shutdown_asyncgens())
                    finally:
                        loop.close()

            thread = threading.Thread(
                target=_run,
                name="async-runtime",
                daemon=True,  # does not keep the interpreter alive at exit
            )
            try:
                thread.start()
            except BaseException:
                # Thread could not start: don't leak the unused loop.
                loop.close()
                raise
            self._loop = loop
            self._thread = thread
            self._started.set()

            # Cleanup at app shutdown
            atexit.register(self._shutdown)

    def _loop_for_caller(self) -> asyncio.AbstractEventLoop:
        """Start the runtime if needed and return its loop.

        Raises:
            RuntimeError: the loop is unavailable, or the caller is the
                runtime's own thread (blocking there would deadlock).
        """
        self._ensure_started()
        loop = self._loop
        if loop is None:
            raise RuntimeError("AsyncRuntime event loop is not available")
        if threading.current_thread() is self._thread:
            raise RuntimeError(
                "AsyncRuntime.submit/submit_iter called from the runtime's own "
                "event-loop thread; await the coroutine directly instead"
            )
        return loop

    def submit(self, coro: Coroutine[Any, Any, T]) -> T:
        """Submit a coroutine to the background loop, block on the result.

        This is the Streamlit thread's main API: synchronous-looking, but the
        coroutine runs on a long-lived loop so async resources (Chroma,
        SqliteSaver, embeddings) stay PERSISTENT across calls.

        Returns the coroutine's result; an exception raised by the coroutine
        is re-raised here. Raises RuntimeError when called from the runtime's
        own loop thread.
        """
        loop = self._loop_for_caller()
        future = asyncio.run_coroutine_threadsafe(coro, loop)
        return future.result()

    def submit_iter(self, async_gen: AsyncIterator[T]) -> Iterator[T]:
        """Async iterator → sync iterator wrapper for Streamlit st.write_stream.

        Each ``next()`` runs one ``__anext__()`` step on the background loop
        and blocks for it. If the consumer stops early (``close()``, ``break``,
        an exception, or the sync generator being discarded), the async
        generator's ``aclose()`` is run on the loop so its ``finally`` /
        ``async with`` cleanup executes there.
        """
        loop = self._loop_for_caller()

        async def _step() -> T:
            # Awaiting inside a real coroutine lets run_coroutine_threadsafe
            # accept whatever awaitable __anext__() returns.
            return await async_gen.__anext__()

        exhausted = False
        try:
            while True:
                future = asyncio.run_coroutine_threadsafe(_step(), loop)
                try:
                    item = future.result()
                except StopAsyncIteration:
                    exhausted = True
                    return
                yield item
        finally:
            if not exhausted:
                self._aclose(async_gen, loop)

    @staticmethod
    def _aclose(
        async_gen: AsyncIterator[Any], loop: asyncio.AbstractEventLoop
    ) -> None:
        """Run ``async_gen.aclose()`` on ``loop`` and wait for it, if possible."""
        aclose = getattr(async_gen, "aclose", None)
        if aclose is None or loop.is_closed():
            # Plain async iterators have no aclose(); a closed loop (after
            # _shutdown) can no longer run anything.
            return

        async def _close() -> None:
            await aclose()

        asyncio.run_coroutine_threadsafe(_close(), loop).result()

    def _shutdown(self) -> None:
        """atexit handler — stop the background loop and wait for it to close.

        Safe to call more than once: later calls find the loop closed and
        return immediately.
        """
        loop, thread = self._loop, self._thread
        if loop is None or not self._started.is_set():
            return
        try:
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # call_soon_threadsafe raises on a closed loop: already shut down.
            return
        if thread is not None and thread is not threading.current_thread():
            # Bounded wait so shutdown_asyncgens() and close() actually run
            # before interpreter finalization abandons the daemon thread.
            thread.join(timeout=self._SHUTDOWN_TIMEOUT)