| import json |
| import os |
| import shutil |
| import uuid |
| from pathlib import Path |
|
|
| from astrbot.api import logger |
| from astrbot.core.skills.skill_manager import SANDBOX_SKILLS_ROOT, SkillManager |
| from astrbot.core.star.context import Context |
| from astrbot.core.utils.astrbot_path import ( |
| get_astrbot_skills_path, |
| get_astrbot_temp_path, |
| ) |
|
|
| from .booters.base import ComputerBooter |
| from .booters.local import LocalBooter |
|
|
| session_booter: dict[str, ComputerBooter] = {} |
| local_booter: ComputerBooter | None = None |
| _MANAGED_SKILLS_FILE = ".astrbot_managed_skills.json" |
|
|
|
|
| def _list_local_skill_dirs(skills_root: Path) -> list[Path]: |
| skills: list[Path] = [] |
| for entry in sorted(skills_root.iterdir()): |
| if not entry.is_dir(): |
| continue |
| skill_md = entry / "SKILL.md" |
| if skill_md.exists(): |
| skills.append(entry) |
| return skills |
|
|
|
|
| def _discover_bay_credentials(endpoint: str) -> str: |
| """Try to auto-discover Bay API key from credentials.json. |
| |
| Search order: |
| 1. BAY_DATA_DIR env var |
| 2. Mono-repo relative path: ../pkgs/bay/ (dev layout) |
| 3. Current working directory |
| |
| Returns: |
| API key string, or empty string if not found. |
| """ |
| candidates: list[Path] = [] |
|
|
| |
| bay_data_dir = os.environ.get("BAY_DATA_DIR") |
| if bay_data_dir: |
| candidates.append(Path(bay_data_dir) / "credentials.json") |
|
|
| |
| astrbot_root = Path(__file__).resolve().parents[3] |
| candidates.append(astrbot_root.parent / "pkgs" / "bay" / "credentials.json") |
|
|
| |
| candidates.append(Path.cwd() / "credentials.json") |
|
|
| for cred_path in candidates: |
| if not cred_path.is_file(): |
| continue |
| try: |
| data = json.loads(cred_path.read_text()) |
| api_key = data.get("api_key", "") |
| if api_key: |
| |
| cred_endpoint = data.get("endpoint", "") |
| if ( |
| cred_endpoint |
| and endpoint |
| and cred_endpoint.rstrip("/") != endpoint.rstrip("/") |
| ): |
| logger.warning( |
| "[Computer] credentials.json endpoint mismatch: " |
| "file=%s, configured=%s — using key anyway", |
| cred_endpoint, |
| endpoint, |
| ) |
| masked_key = f"{api_key[:4]}..." if len(api_key) >= 6 else "redacted" |
| logger.info( |
| "[Computer] Auto-discovered Bay API key from %s (prefix=%s)", |
| cred_path, |
| masked_key, |
| ) |
| return api_key |
| except (json.JSONDecodeError, OSError) as exc: |
| logger.debug("[Computer] Failed to read %s: %s", cred_path, exc) |
|
|
| logger.debug("[Computer] No Bay credentials.json found in search paths") |
| return "" |
|
|
|
|
| def _build_python_exec_command(script: str) -> str: |
| return ( |
| "if command -v python3 >/dev/null 2>&1; then PYBIN=python3; " |
| "elif command -v python >/dev/null 2>&1; then PYBIN=python; " |
| "else echo 'python not found in sandbox' >&2; exit 127; fi; " |
| "$PYBIN - <<'PY'\n" |
| f"{script}\n" |
| "PY" |
| ) |
|
|
|
|
| def _build_apply_sync_command() -> str: |
| """Build shell command for sync stage only. |
| |
| This stage mutates sandbox files (managed skill replacement) but does not scan |
| metadata. Keeping it separate allows callers to preserve old behavior while |
| reusing the apply step independently. |
| """ |
| script = f""" |
| import json |
| import shutil |
| import zipfile |
| from pathlib import Path |
| |
| root = Path({SANDBOX_SKILLS_ROOT!r}) |
| zip_path = root / "skills.zip" |
| tmp_extract = Path(f"{{root}}_tmp_extract") |
| managed_file = root / {_MANAGED_SKILLS_FILE!r} |
| |
| |
| def remove_tree(path: Path) -> None: |
| if not path.exists(): |
| return |
| if path.is_dir(): |
| shutil.rmtree(path, ignore_errors=True) |
| else: |
| path.unlink(missing_ok=True) |
| |
| |
| def load_managed_skills() -> list[str]: |
| if not managed_file.exists(): |
| return [] |
| try: |
| payload = json.loads(managed_file.read_text(encoding="utf-8")) |
| except Exception: |
| return [] |
| if not isinstance(payload, dict): |
| return [] |
| items = payload.get("managed_skills", []) |
| if not isinstance(items, list): |
| return [] |
| result: list[str] = [] |
| for item in items: |
| if isinstance(item, str) and item.strip(): |
| result.append(item.strip()) |
| return result |
| |
| |
| root.mkdir(parents=True, exist_ok=True) |
| for managed_name in load_managed_skills(): |
| remove_tree(root / managed_name) |
| |
| current_managed: list[str] = [] |
| if zip_path.exists(): |
| remove_tree(tmp_extract) |
| tmp_extract.mkdir(parents=True, exist_ok=True) |
| with zipfile.ZipFile(zip_path) as zf: |
| zf.extractall(tmp_extract) |
| for entry in sorted(tmp_extract.iterdir()): |
| if not entry.is_dir(): |
| continue |
| target = root / entry.name |
| remove_tree(target) |
| shutil.copytree(entry, target) |
| current_managed.append(entry.name) |
| |
| remove_tree(tmp_extract) |
| remove_tree(zip_path) |
| managed_file.write_text( |
| json.dumps({{"managed_skills": current_managed}}, ensure_ascii=False, indent=2), |
| encoding="utf-8", |
| ) |
| print(json.dumps({{"managed_skills": current_managed}}, ensure_ascii=False)) |
| """.strip() |
| return _build_python_exec_command(script) |
|
|
|
|
| def _build_scan_command() -> str: |
| """Build shell command for scan stage only. |
| |
| This stage is read-oriented: it scans SKILL.md metadata and returns the |
| historical payload shape consumed by cache update logic. |
| |
| The scan resolves the absolute path of the skills root at runtime so |
| that the LLM can reliably ``cat`` skill files regardless of cwd. |
| Only the ``description`` field is extracted from frontmatter. |
| """ |
| script = f""" |
| import json |
| from pathlib import Path |
| |
| root = Path({SANDBOX_SKILLS_ROOT!r}) |
| managed_file = root / {_MANAGED_SKILLS_FILE!r} |
| |
| # Resolve absolute path at runtime so prompts always have a reliable path |
| root_abs = str(root.resolve()) |
| |
| |
| # NOTE: This parser mirrors skill_manager._parse_frontmatter_description. |
| # Keep the two implementations in sync when changing parsing logic. |
| def parse_description(text: str) -> str: |
| if not text.startswith("---"): |
| return "" |
| lines = text.splitlines() |
| if not lines or lines[0].strip() != "---": |
| return "" |
| end_idx = None |
| for i in range(1, len(lines)): |
| if lines[i].strip() == "---": |
| end_idx = i |
| break |
| if end_idx is None: |
| return "" |
| for line in lines[1:end_idx]: |
| if ":" not in line: |
| continue |
| key, value = line.split(":", 1) |
| if key.strip().lower() == "description": |
| return value.strip().strip('"').strip("'") |
| return "" |
| |
| |
| def load_managed_skills() -> list[str]: |
| if not managed_file.exists(): |
| return [] |
| try: |
| payload = json.loads(managed_file.read_text(encoding="utf-8")) |
| except Exception: |
| return [] |
| if not isinstance(payload, dict): |
| return [] |
| items = payload.get("managed_skills", []) |
| if not isinstance(items, list): |
| return [] |
| result: list[str] = [] |
| for item in items: |
| if isinstance(item, str) and item.strip(): |
| result.append(item.strip()) |
| return result |
| |
| |
| def collect_skills() -> list[dict[str, str]]: |
| skills: list[dict[str, str]] = [] |
| if not root.exists(): |
| return skills |
| for skill_dir in sorted(root.iterdir()): |
| if not skill_dir.is_dir(): |
| continue |
| skill_md = skill_dir / "SKILL.md" |
| if not skill_md.is_file(): |
| continue |
| description = "" |
| try: |
| text = skill_md.read_text(encoding="utf-8") |
| description = parse_description(text) |
| except Exception: |
| description = "" |
| skills.append( |
| {{ |
| "name": skill_dir.name, |
| "description": description, |
| "path": f"{{root_abs}}/{{skill_dir.name}}/SKILL.md", |
| }} |
| ) |
| return skills |
| |
| |
| print( |
| json.dumps( |
| {{ |
| "managed_skills": load_managed_skills(), |
| "skills": collect_skills(), |
| }}, |
| ensure_ascii=False, |
| ) |
| ) |
| """.strip() |
| return _build_python_exec_command(script) |
|
|
|
|
| def _build_sync_and_scan_command() -> str: |
| """Legacy combined command kept for backward compatibility. |
| |
| New code paths should prefer apply + scan split helpers. |
| """ |
| return f"{_build_apply_sync_command()}\n{_build_scan_command()}" |
|
|
|
|
| def _shell_exec_succeeded(result: dict) -> bool: |
| if "success" in result: |
| return bool(result.get("success")) |
| exit_code = result.get("exit_code") |
| return exit_code in (0, None) |
|
|
|
|
| def _format_exec_error_detail(result: dict) -> str: |
| """Format shell execution details for better observability. |
| |
| Keep the message compact while still surfacing exit code and stderr/stdout. |
| """ |
| exit_code = result.get("exit_code") |
| stderr = str(result.get("stderr", "") or "").strip() |
| stdout = str(result.get("stdout", "") or "").strip() |
| stderr_text = stderr[:500] |
| stdout_text = stdout[:300] |
| return f"exit_code={exit_code}, stderr={stderr_text!r}, stdout_tail={stdout_text!r}" |
|
|
|
|
| def _decode_sync_payload(stdout: str) -> dict | None: |
| text = stdout.strip() |
| if not text: |
| return None |
| candidates = [text] |
| candidates.extend([line.strip() for line in text.splitlines() if line.strip()]) |
| for candidate in reversed(candidates): |
| try: |
| payload = json.loads(candidate) |
| except Exception: |
| continue |
| if isinstance(payload, dict): |
| return payload |
| return None |
|
|
|
|
| def _update_sandbox_skills_cache(payload: dict | None) -> None: |
| if not isinstance(payload, dict): |
| return |
| skills = payload.get("skills", []) |
| if not isinstance(skills, list): |
| return |
| SkillManager().set_sandbox_skills_cache(skills) |
|
|
|
|
| async def _apply_skills_to_sandbox(booter: ComputerBooter) -> None: |
| """Apply local skill bundle to sandbox filesystem only. |
| |
| This function is intentionally limited to file mutation. Metadata scanning is |
| executed in a separate phase to keep failure domains clear. |
| """ |
| logger.info("[Computer] Skill sync phase=apply start") |
| apply_result = await booter.shell.exec(_build_apply_sync_command()) |
| if not _shell_exec_succeeded(apply_result): |
| detail = _format_exec_error_detail(apply_result) |
| logger.error("[Computer] Skill sync phase=apply failed: %s", detail) |
| raise RuntimeError(f"Failed to apply sandbox skill sync strategy: {detail}") |
| logger.info("[Computer] Skill sync phase=apply done") |
|
|
|
|
| async def _scan_sandbox_skills(booter: ComputerBooter) -> dict | None: |
| """Scan sandbox skills and return normalized payload for cache update.""" |
| logger.info("[Computer] Skill sync phase=scan start") |
| scan_result = await booter.shell.exec(_build_scan_command()) |
| if not _shell_exec_succeeded(scan_result): |
| detail = _format_exec_error_detail(scan_result) |
| logger.error("[Computer] Skill sync phase=scan failed: %s", detail) |
| raise RuntimeError(f"Failed to scan sandbox skills after sync: {detail}") |
|
|
| payload = _decode_sync_payload(str(scan_result.get("stdout", "") or "")) |
| if payload is None: |
| logger.warning("[Computer] Skill sync phase=scan returned empty payload") |
| else: |
| logger.info("[Computer] Skill sync phase=scan done") |
| return payload |
|
|
|
|
| async def _sync_skills_to_sandbox(booter: ComputerBooter) -> None: |
| """Sync local skills to sandbox and refresh cache. |
| |
| Backward-compatible orchestrator: keep historical behavior while internally |
| splitting into `apply` and `scan` phases. |
| """ |
| skills_root = Path(get_astrbot_skills_path()) |
| if not skills_root.is_dir(): |
| return |
| local_skill_dirs = _list_local_skill_dirs(skills_root) |
|
|
| temp_dir = Path(get_astrbot_temp_path()) |
| temp_dir.mkdir(parents=True, exist_ok=True) |
| zip_base = temp_dir / "skills_bundle" |
| zip_path = zip_base.with_suffix(".zip") |
|
|
| try: |
| if local_skill_dirs: |
| if zip_path.exists(): |
| zip_path.unlink() |
| shutil.make_archive(str(zip_base), "zip", str(skills_root)) |
| remote_zip = Path(SANDBOX_SKILLS_ROOT) / "skills.zip" |
| logger.info("Uploading skills bundle to sandbox...") |
| await booter.shell.exec(f"mkdir -p {SANDBOX_SKILLS_ROOT}") |
| upload_result = await booter.upload_file(str(zip_path), str(remote_zip)) |
| if not upload_result.get("success", False): |
| raise RuntimeError("Failed to upload skills bundle to sandbox.") |
| else: |
| logger.info( |
| "No local skills found. Keeping sandbox built-ins and refreshing metadata." |
| ) |
| await booter.shell.exec(f"rm -f {SANDBOX_SKILLS_ROOT}/skills.zip") |
|
|
| |
| |
| await _apply_skills_to_sandbox(booter) |
| payload = await _scan_sandbox_skills(booter) |
| _update_sandbox_skills_cache(payload) |
| managed = payload.get("managed_skills", []) if isinstance(payload, dict) else [] |
| logger.info( |
| "[Computer] Sandbox skill sync complete: managed=%d", |
| len(managed), |
| ) |
| finally: |
| if zip_path.exists(): |
| try: |
| zip_path.unlink() |
| except Exception: |
| logger.warning(f"Failed to remove temp skills zip: {zip_path}") |
|
|
|
|
| async def get_booter( |
| context: Context, |
| session_id: str, |
| ) -> ComputerBooter: |
| config = context.get_config(umo=session_id) |
|
|
| runtime = config.get("provider_settings", {}).get("computer_use_runtime", "local") |
| if runtime == "local": |
| return get_local_booter() |
| elif runtime == "none": |
| raise RuntimeError("Sandbox runtime is disabled by configuration.") |
|
|
| sandbox_cfg = config.get("provider_settings", {}).get("sandbox", {}) |
| booter_type = sandbox_cfg.get("booter", "shipyard_neo") |
|
|
| if session_id in session_booter: |
| booter = session_booter[session_id] |
| if not await booter.available(): |
| |
| session_booter.pop(session_id, None) |
| if session_id not in session_booter: |
| uuid_str = uuid.uuid5(uuid.NAMESPACE_DNS, session_id).hex |
| logger.info( |
| f"[Computer] Initializing booter: type={booter_type}, session={session_id}" |
| ) |
| if booter_type == "shipyard": |
| from .booters.shipyard import ShipyardBooter |
|
|
| ep = sandbox_cfg.get("shipyard_endpoint", "") |
| token = sandbox_cfg.get("shipyard_access_token", "") |
| ttl = sandbox_cfg.get("shipyard_ttl", 3600) |
| max_sessions = sandbox_cfg.get("shipyard_max_sessions", 10) |
|
|
| client = ShipyardBooter( |
| endpoint_url=ep, access_token=token, ttl=ttl, session_num=max_sessions |
| ) |
| elif booter_type == "shipyard_neo": |
| from .booters.shipyard_neo import ShipyardNeoBooter |
|
|
| ep = sandbox_cfg.get("shipyard_neo_endpoint", "") |
| token = sandbox_cfg.get("shipyard_neo_access_token", "") |
| ttl = sandbox_cfg.get("shipyard_neo_ttl", 3600) |
| profile = sandbox_cfg.get("shipyard_neo_profile", "python-default") |
|
|
| |
| if not token: |
| token = _discover_bay_credentials(ep) |
|
|
| logger.info( |
| f"[Computer] Shipyard Neo config: endpoint={ep}, profile={profile}, ttl={ttl}" |
| ) |
| client = ShipyardNeoBooter( |
| endpoint_url=ep, |
| access_token=token, |
| profile=profile, |
| ttl=ttl, |
| ) |
| elif booter_type == "boxlite": |
| from .booters.boxlite import BoxliteBooter |
|
|
| client = BoxliteBooter() |
| else: |
| raise ValueError(f"Unknown booter type: {booter_type}") |
|
|
| try: |
| await client.boot(uuid_str) |
| logger.info( |
| f"[Computer] Sandbox booted successfully: type={booter_type}, session={session_id}" |
| ) |
| await _sync_skills_to_sandbox(client) |
| except Exception as e: |
| logger.error(f"Error booting sandbox for session {session_id}: {e}") |
| raise e |
|
|
| session_booter[session_id] = client |
| return session_booter[session_id] |
|
|
|
|
| async def sync_skills_to_active_sandboxes() -> None: |
| """Best-effort skills synchronization for all active sandbox sessions.""" |
| logger.info( |
| "[Computer] Syncing skills to %d active sandbox(es)", len(session_booter) |
| ) |
| for session_id, booter in list(session_booter.items()): |
| try: |
| if not await booter.available(): |
| continue |
| await _sync_skills_to_sandbox(booter) |
| except Exception as e: |
| logger.warning( |
| "Failed to sync skills to sandbox for session %s: %s", |
| session_id, |
| e, |
| ) |
|
|
|
|
| def get_local_booter() -> ComputerBooter: |
| global local_booter |
| if local_booter is None: |
| local_booter = LocalBooter() |
| return local_booter |
|
|