| import os |
| import uuid |
| from dataclasses import dataclass, field |
|
|
| from astrbot.api import FunctionTool, logger |
| from astrbot.api.event import MessageChain |
| from astrbot.core.agent.run_context import ContextWrapper |
| from astrbot.core.agent.tool import ToolExecResult |
| from astrbot.core.astr_agent_context import AstrAgentContext |
| from astrbot.core.message.components import File |
| from astrbot.core.utils.astrbot_path import get_astrbot_temp_path |
|
|
| from ..computer_client import get_booter |
| from .permissions import check_admin_permission |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| @dataclass |
| class FileUploadTool(FunctionTool): |
| name: str = "astrbot_upload_file" |
| description: str = "Upload a local file to the sandbox. The file must exist on the local filesystem." |
| parameters: dict = field( |
| default_factory=lambda: { |
| "type": "object", |
| "properties": { |
| "local_path": { |
| "type": "string", |
| "description": "The local file path to upload. This must be an absolute path to an existing file on the local filesystem.", |
| }, |
| |
| |
| |
| |
| }, |
| "required": ["local_path"], |
| } |
| ) |
|
|
| async def call( |
| self, |
| context: ContextWrapper[AstrAgentContext], |
| local_path: str, |
| ) -> str | None: |
| if permission_error := check_admin_permission(context, "File upload/download"): |
| return permission_error |
| sb = await get_booter( |
| context.context.context, |
| context.context.event.unified_msg_origin, |
| ) |
| try: |
| |
| if not os.path.exists(local_path): |
| return f"Error: File does not exist: {local_path}" |
|
|
| if not os.path.isfile(local_path): |
| return f"Error: Path is not a file: {local_path}" |
|
|
| |
| remote_path = os.path.basename(local_path) |
|
|
| |
| result = await sb.upload_file(local_path, remote_path) |
| logger.debug(f"Upload result: {result}") |
| success = result.get("success", False) |
|
|
| if not success: |
| return f"Error uploading file: {result.get('message', 'Unknown error')}" |
|
|
| file_path = result.get("file_path", "") |
| logger.info(f"File {local_path} uploaded to sandbox at {file_path}") |
|
|
| return f"File uploaded successfully to {file_path}" |
| except Exception as e: |
| logger.error(f"Error uploading file {local_path}: {e}") |
| return f"Error uploading file: {str(e)}" |
|
|
|
|
| @dataclass |
| class FileDownloadTool(FunctionTool): |
| name: str = "astrbot_download_file" |
| description: str = "Download a file from the sandbox. Only call this when user explicitly need you to download a file." |
| parameters: dict = field( |
| default_factory=lambda: { |
| "type": "object", |
| "properties": { |
| "remote_path": { |
| "type": "string", |
| "description": "The path of the file in the sandbox to download.", |
| }, |
| "also_send_to_user": { |
| "type": "boolean", |
| "description": "Whether to also send the downloaded file to the user via message. Defaults to true.", |
| }, |
| }, |
| "required": ["remote_path"], |
| } |
| ) |
|
|
| async def call( |
| self, |
| context: ContextWrapper[AstrAgentContext], |
| remote_path: str, |
| also_send_to_user: bool = True, |
| ) -> ToolExecResult: |
| if permission_error := check_admin_permission(context, "File upload/download"): |
| return permission_error |
| sb = await get_booter( |
| context.context.context, |
| context.context.event.unified_msg_origin, |
| ) |
| try: |
| name = os.path.basename(remote_path) |
|
|
| local_path = os.path.join( |
| get_astrbot_temp_path(), f"sandbox_{uuid.uuid4().hex[:4]}_{name}" |
| ) |
|
|
| |
| await sb.download_file(remote_path, local_path) |
| logger.info(f"File {remote_path} downloaded from sandbox to {local_path}") |
|
|
| if also_send_to_user: |
| try: |
| name = os.path.basename(local_path) |
| await context.context.event.send( |
| MessageChain(chain=[File(name=name, file=local_path)]) |
| ) |
| except Exception as e: |
| logger.error(f"Error sending file message: {e}") |
|
|
| |
| |
| |
| |
| |
|
|
| return f"File downloaded successfully to {local_path} and sent to user." |
|
|
| return f"File downloaded successfully to {local_path}" |
| except Exception as e: |
| logger.error(f"Error downloading file {remote_path}: {e}") |
| return f"Error downloading file: {str(e)}" |
|
|