File size: 9,799 Bytes
8ede856
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
"""Manage Bay container lifecycle for zero-config Shipyard Neo integration.

When no Bay endpoint is configured, AstrBot can automatically start a Bay
container using the Docker socket (like BoxliteBooter does for Ship
containers).
"""

from __future__ import annotations

import asyncio
import io
import json
import tarfile
from typing import Any

import aiodocker
import aiohttp

from astrbot.api import logger

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Default image reference used when a new Bay container has to be created.
BAY_IMAGE = "ghcr.io/astrbotdevs/shipyard-neo-bay:latest"
# Name given to the container created by BayContainerManager.
BAY_CONTAINER_NAME = "astrbot-bay"
# Label key set to "true" on created containers and used to find them again.
BAY_LABEL = "astrbot.bay.managed"
# Port Bay listens on inside the container; also the default published host port.
BAY_PORT = 8114
# Default number of seconds wait_healthy() polls before raising TimeoutError.
HEALTH_TIMEOUT_S = 60
# Seconds slept between two consecutive health probes.
HEALTH_POLL_INTERVAL_S = 2


class BayContainerManager:
    """Start / reuse / stop a Bay container via Docker Engine API.

    The manager keeps at most one Docker client (``self._docker``) and one
    container handle (``self._container``).  Both are ``None`` until
    :meth:`ensure_running` has been called, and are reset by
    :meth:`close_client` / :meth:`stop` respectively.
    """

    def __init__(
        self,
        image: str = BAY_IMAGE,
        host_port: int = BAY_PORT,
    ) -> None:
        """Store the configuration; no Docker connection is opened here.

        Args:
            image: Image reference used when a new container must be created.
            host_port: Host port published for the container's ``BAY_PORT``
                and used to build the endpoint / health URLs.
        """
        self._image = image
        self._host_port = host_port
        self._docker: aiodocker.Docker | None = None
        self._container: Any = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def ensure_running(self) -> str:
        """Make sure a Bay container is running. Returns the endpoint URL.

        If a container labelled ``astrbot.bay.managed`` already exists
        and is running, it will be reused; if it exists but is stopped it is
        started again.  Otherwise a new container is created from
        *self._image*.

        Returns:
            ``http://127.0.0.1:<host_port>``.

        Raises:
            RuntimeError: If the Docker client cannot be constructed.
            Errors raised by the Docker API calls themselves propagate
            unchanged.
        """
        # Reuse an already-open client so repeated calls do not leak the
        # previous client's HTTP session.
        if self._docker is None:
            try:
                self._docker = aiodocker.Docker()
            except Exception as exc:
                raise RuntimeError(
                    "Failed to connect to Docker daemon. "
                    "Ensure Docker is installed and running, or configure "
                    "an explicit Bay endpoint instead of auto-start mode."
                ) from exc

        # 1. Look for an existing managed container
        existing = await self._find_managed_container()
        if existing is not None:
            state = existing["State"]
            if state.get("Running"):
                cid = existing["Id"][:12]
                logger.info("[BayManager] Reusing existing Bay container: %s", cid)
                self._container = await self._docker.containers.get(existing["Id"])
            else:
                # Container exists but stopped — restart it
                logger.info("[BayManager] Restarting stopped Bay container")
                container = await self._docker.containers.get(existing["Id"])
                await container.start()
                # Only remember the handle once the start succeeded.
                self._container = container
            return self._endpoint_url()

        # 2. Pull image if needed
        await self._pull_image_if_needed()

        # 3. Create and start container
        logger.info(
            "[BayManager] Starting Bay container: image=%s, port=%d",
            self._image,
            self._host_port,
        )
        config = {
            "Image": self._image,
            "Labels": {BAY_LABEL: "true"},
            "Env": [
                "BAY_SERVER__HOST=0.0.0.0",
                f"BAY_SERVER__PORT={BAY_PORT}",
                "BAY_DATA_DIR=/app/data",
                # allow_anonymous=false → auto-provisions API key
                "BAY_SECURITY__ALLOW_ANONYMOUS=false",
            ],
            "HostConfig": {
                "PortBindings": {
                    # Container port BAY_PORT is published on the chosen host port.
                    f"{BAY_PORT}/tcp": [{"HostPort": str(self._host_port)}],
                },
                "Binds": [
                    # Bay needs Docker socket to create sandbox containers
                    "/var/run/docker.sock:/var/run/docker.sock",
                ],
                "RestartPolicy": {"Name": "unless-stopped"},
            },
        }
        self._container = await self._docker.containers.create_or_replace(
            BAY_CONTAINER_NAME, config
        )
        await self._container.start()
        logger.info("[BayManager] Bay container started: %s", BAY_CONTAINER_NAME)

        return self._endpoint_url()

    async def wait_healthy(self, timeout: int = HEALTH_TIMEOUT_S) -> None:
        """Block until Bay's ``/health`` endpoint returns 200.

        The endpoint is probed with a 3 second per-request timeout, sleeping
        ``HEALTH_POLL_INTERVAL_S`` seconds between attempts.

        Args:
            timeout: Overall polling budget in seconds.

        Raises:
            TimeoutError: If no probe returned HTTP 200 before the deadline;
                the message carries the last observed error.
        """
        url = f"{self._endpoint_url()}/health"
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        last_error: str = ""

        async with aiohttp.ClientSession() as session:
            while loop.time() < deadline:
                try:
                    async with session.get(
                        url, timeout=aiohttp.ClientTimeout(total=3)
                    ) as resp:
                        if resp.status == 200:
                            logger.info("[BayManager] Bay is healthy")
                            return
                        last_error = f"HTTP {resp.status}"
                except Exception as exc:
                    # Any probe failure is recorded and retried until the deadline.
                    last_error = str(exc)

                await asyncio.sleep(HEALTH_POLL_INTERVAL_S)

        raise TimeoutError(
            f"Bay did not become healthy within {timeout}s (last error: {last_error})"
        )

    async def read_credentials(self) -> str:
        """Read auto-provisioned API key from Bay container.

        Bay writes ``credentials.json`` to its data directory when
        ``allow_anonymous=false`` and no explicit API key is set.

        Returns:
            The ``api_key`` value from ``/app/data/credentials.json``, or an
            empty string when no container is tracked, the archive is empty,
            or anything goes wrong while reading it (failures are logged at
            debug level, never raised).
        """
        if self._container is None:
            return ""

        try:
            # Read credentials.json from container filesystem
            archive = await self._container.get_archive("/app/data/credentials.json")

            if isinstance(archive, tarfile.TarFile):
                # aiodocker parses tar responses into an already-open TarFile;
                # read it directly instead of treating it as "no data".
                with archive as tar:
                    return self._api_key_from_tar(tar)

            # Fallback for client versions that hand back raw payloads.
            raw = await self._archive_payload(archive)
            if not raw:
                logger.debug("[BayManager] Empty tar response from container")
                return ""

            with tarfile.open(fileobj=io.BytesIO(raw)) as tar:
                return self._api_key_from_tar(tar)
        except Exception as exc:
            # Best-effort: a missing or unreadable file just means "no key".
            logger.debug(
                "[BayManager] Failed to read credentials from container: %s", exc
            )

        return ""

    async def close_client(self) -> None:
        """Close the Docker client without stopping the container.

        The Bay container stays running for reuse by future sessions.
        """
        if self._docker is not None:
            await self._docker.close()
            self._docker = None

    async def stop(self) -> None:
        """Stop and remove the managed Bay container.

        Errors from the Docker API are logged at debug level and swallowed.
        The container handle is always forgotten and the Docker client is
        closed afterwards.
        """
        container, self._container = self._container, None
        if container is not None:
            try:
                await container.stop()
            except Exception as exc:
                # A failed graceful stop must not prevent the forced removal.
                logger.debug("[BayManager] Error stopping Bay container: %s", exc)
            try:
                await container.delete(force=True)
                logger.info("[BayManager] Bay container stopped and removed")
            except Exception as exc:
                logger.debug("[BayManager] Error removing Bay container: %s", exc)

        await self.close_client()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _endpoint_url(self) -> str:
        """Return the loopback base URL for the published host port."""
        return f"http://127.0.0.1:{self._host_port}"

    @staticmethod
    async def _archive_payload(archive: Any) -> bytes:
        """Normalise a non-TarFile ``get_archive`` result to raw tar bytes.

        Accepts a dict with a ``"data"`` entry, a ``(stream, stat)`` tuple
        whose stream is bytes, has an awaitable ``read()``, or is an async
        iterable of chunks, or plain bytes.  Anything else yields ``b""``.
        """
        if isinstance(archive, dict):
            return archive.get("data", b"")
        if isinstance(archive, tuple):
            # (stream, stat_info)
            stream = archive[0]
            if hasattr(stream, "read"):
                return await stream.read()
            if isinstance(stream, bytes):
                return stream
            # It might be a chunked response
            chunks = [chunk async for chunk in stream]
            return b"".join(chunks)
        return archive if isinstance(archive, bytes) else b""

    @staticmethod
    def _api_key_from_tar(tar: tarfile.TarFile) -> str:
        """Return ``api_key`` from the first extractable member of *tar*.

        The member is decoded as UTF-8 JSON.  When a non-empty key is found
        only a masked form (first 8 characters, or ``redacted`` for keys
        shorter than 10 characters) is logged.  Returns ``""`` when the
        archive holds no regular file.
        """
        for member in tar.getmembers():
            fileobj = tar.extractfile(member)
            if not fileobj:
                # Directories and other non-file members have no content.
                continue
            creds = json.loads(fileobj.read().decode("utf-8"))
            api_key = creds.get("api_key", "")
            if api_key:
                masked = f"{api_key[:8]}..." if len(api_key) >= 10 else "redacted"
                logger.info(
                    "[BayManager] Auto-discovered Bay API key: %s",
                    masked,
                )
            return api_key
        return ""

    async def _find_managed_container(self) -> dict | None:
        """Find an existing container with our management label.

        Lists all containers (running or not) carrying ``BAY_LABEL=true`` and
        returns the inspect data (``show()``) of the first match, or ``None``
        when there is no match.
        """
        assert self._docker is not None
        containers = await self._docker.containers.list(
            all=True,
            filters=json.dumps({"label": [f"{BAY_LABEL}=true"]}),
        )
        if containers:
            # Inspect first match to get full state
            return await containers[0].show()
        return None

    async def _pull_image_if_needed(self) -> None:
        """Pull the Bay image if it doesn't exist locally.

        Any ``DockerError`` raised while inspecting the image is treated as
        "not present" and triggers a pull.
        """
        assert self._docker is not None
        try:
            await self._docker.images.inspect(self._image)
            logger.debug("[BayManager] Image %s already exists", self._image)
        except aiodocker.exceptions.DockerError:
            logger.info("[BayManager] Pulling image %s ...", self._image)
            await self._docker.images.pull(self._image)
            logger.info("[BayManager] Image %s pulled successfully", self._image)