"""
mcp_bridge.py — Model Context Protocol bridge for Purpose Agent.
Connects to MCP servers (local or remote), discovers their tools,
wraps them as Purpose Agent Tool instances, and enforces security policy.
Security:
- Allowlist-based: only explicitly approved servers/tools are callable
- Argument validation against MCP schema before execution (planned; the current skeleton does not validate arguments yet)
- Timeout enforcement on all calls
- All MCP calls logged as tool events
Usage:
bridge = MCPToolBridge()
bridge.add_server("calc", url="http://localhost:3001", allowlist=["add", "multiply"])
# Discover tools
tools = bridge.discover_all()
# Use in agent
spark = pa.Spark("assistant", tools=tools)
# Or add to team
team.add_mcp_server("http://localhost:3001")
"""
from __future__ import annotations
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any
from purpose_agent.tools import Tool, ToolResult
logger = logging.getLogger(__name__)
@dataclass
class MCPServerConfig:
    """
    Connection settings and tool policy for one MCP server.

    Fields:
        name: Identifier of the server.
        url: Address of the server.
        allowlist: Tool names permitted on this server. An empty list
            places no restriction on the tools that are discovered.
        denylist: Tool names that are always blocked.
        timeout_s: Timeout for calls, in seconds (default 30.0).
        max_retries: Retry count carried with the config (default 2).
        metadata: Free-form extra data attached to the server.
    """

    name: str
    url: str
    # Empty = allow all discovered
    allowlist: list[str] = field(default_factory=list)
    # Always block these
    denylist: list[str] = field(default_factory=list)
    timeout_s: float = 30.0
    max_retries: int = 2
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class MCPToolSchema:
    """
    Description of one tool as discovered from an MCP server.

    Fields:
        name: Tool name.
        description: Human-readable summary of the tool.
        parameters: Parameter schema for the tool (empty dict by default).
        server_name: Name of the server the tool belongs to (default "").
        server_url: URL of the server the tool belongs to (default "").
    """

    name: str
    description: str
    parameters: dict[str, Any] = field(default_factory=dict)
    server_name: str = ""
    server_url: str = ""
class MCPTool(Tool):
    """
    Purpose Agent Tool that fronts a single tool on an MCP server.

    Every call is checked against the owning server's denylist and
    allowlist and logged before it is handed to the transport layer.
    """

    def __init__(self, schema: MCPToolSchema, config: MCPServerConfig):
        """
        Bind the tool to its discovered schema and its server's config.

        Args:
            schema: Discovered description of the remote tool.
            config: Settings and policy of the server that owns the tool.
        """
        # NOTE(review): the Tool base initializer is not invoked here —
        # confirm against purpose_agent.tools.Tool that this is intended.
        server = config.name
        self.name = f"mcp:{server}:{schema.name}"
        self.description = f"[MCP:{server}] {schema.description}"
        self.parameters = schema.parameters
        self._schema = schema
        self._config = config
        self.timeout_seconds = config.timeout_s
        self.max_retries = config.max_retries

    def execute(self, **kwargs) -> str:
        """
        Run the MCP tool with the given keyword arguments.

        The call is refused when the tool is on the server's denylist, or
        when the server has a non-empty allowlist that does not contain it.
        Failures are reported as "Error: ..." strings, never raised.

        Protocol skeleton: the real HTTP/stdio transport requires the
        `purpose-agent[mcp]` extra with an MCP client library.

        Returns:
            The transport's result string, or an error description.
        """
        policy = self._config
        target = self._schema.name

        # Security gate: the denylist always wins; the allowlist only
        # restricts when it is non-empty.
        denied = policy.denylist and target in policy.denylist
        if denied:
            return f"Error: tool '{target}' is denied by security policy"
        unlisted = policy.allowlist and target not in policy.allowlist
        if unlisted:
            return f"Error: tool '{target}' not in allowlist for server '{policy.name}'"

        # Record the call; the argument dump is capped at 200 characters.
        logger.info(f"MCP call: {policy.name}/{target} args={json.dumps(kwargs, default=str)[:200]}")

        # Hand over to the transport layer (protocol skeleton).
        try:
            return self._call_mcp_server(target, kwargs)
        except MCPConnectionError as exc:
            return f"Error: MCP server '{policy.name}' unavailable: {exc}"
        except MCPTimeoutError:
            return f"Error: MCP call timed out after {policy.timeout_s}s"
        except Exception as exc:
            return f"Error: MCP call failed: {exc}"

    def _call_mcp_server(self, tool_name: str, args: dict) -> str:
        """
        Perform the protocol-level MCP call.

        Skeleton only: real transport (HTTP/stdio/SSE) needs the optional
        MCP client library. Until then, the missing client is simulated and
        a message describing the call that would be made is returned.
        """
        try:
            # Future: from mcp import Client
            raise ImportError("MCP client not installed")
        except ImportError:
            # Graceful degradation: explain how to enable the transport and
            # show a preview of the call (arguments capped at 100 chars).
            args_preview = json.dumps(args, default=str)[:100]
            install_hint = "MCP transport not available. Install: pip install purpose-agent[mcp]\n"
            would_call = f"Would call: {self._config.url} / {tool_name}({args_preview})"
            return install_hint + would_call
class MCPConnectionError(Exception):
    """Raised when an MCP server cannot be reached."""
class MCPTimeoutError(Exception):
    """Raised when an MCP call exceeds its timeout."""
class MCPToolBridge:
    """
    Bridge that connects Purpose Agent to MCP tool servers.

    Manages multiple MCP servers, discovers their tools,
    and exposes them as standard Purpose Agent Tools.

    Usage:
        bridge = MCPToolBridge()
        # Add a server
        bridge.add_server("math", url="http://localhost:3001",
                          allowlist=["add", "multiply", "divide"])
        # Discover all tools
        tools = bridge.discover_all()
        # Use with any agent
        spark = pa.Spark("helper", tools=tools)
    """

    def __init__(self):
        """Start with no registered servers and no discovered tools."""
        self._servers: dict[str, MCPServerConfig] = {}
        self._discovered_tools: list[MCPTool] = []

    def add_server(
        self,
        name: str,
        url: str,
        allowlist: list[str] | None = None,
        denylist: list[str] | None = None,
        timeout_s: float = 30.0,
        max_retries: int = 2,
    ) -> None:
        """
        Register an MCP server, replacing any server already using `name`.

        Args:
            name: Key the server is stored under.
            url: Address of the server.
            allowlist: Tool names to permit; None or empty means no
                restriction on discovered tools.
            denylist: Tool names to always block; None or empty blocks none.
            timeout_s: Timeout stored on the server config, in seconds.
            max_retries: Retry count stored on the server config.
        """
        config = MCPServerConfig(
            name=name,
            url=url,
            # Copy the policy lists so later mutation of the caller's
            # lists cannot silently change the security policy.
            allowlist=list(allowlist) if allowlist else [],
            denylist=list(denylist) if denylist else [],
            timeout_s=timeout_s,
            max_retries=max_retries,
        )
        if name in self._servers:
            # Tools discovered under the previous registration still point
            # at the superseded config (and its policy); drop them so only
            # a fresh discovery can expose tools for this server.
            self._drop_tools_for(name)
        self._servers[name] = config
        logger.info("MCP: registered server '%s' at %s", name, url)

    def discover_all(self) -> list[MCPTool]:
        """
        Discover tools from all registered servers.

        The result replaces the previously cached tool list.

        Returns:
            The tools found across every registered server.
        """
        tools: list[MCPTool] = []
        for config in self._servers.values():
            tools.extend(self._discover_server(config))
        self._discovered_tools = tools
        return tools

    def _discover_server(self, config: MCPServerConfig) -> list[MCPTool]:
        """
        Discover tools from a single MCP server.

        In production, this calls the MCP server's tool/list endpoint.
        Skeleton returns empty until transport is available.
        """
        logger.info("MCP: discovering tools from '%s' at %s", config.name, config.url)
        # Protocol skeleton — real discovery needs MCP client
        # Future: response = mcp_client.list_tools(config.url)
        # For now, return empty (no tools discovered without transport)
        return []

    def get_tools(self) -> list[MCPTool]:
        """Return the tools cached by the most recent discovery."""
        return self._discovered_tools

    def remove_server(self, name: str) -> None:
        """Unregister `name` (a no-op if unknown) and drop its cached tools."""
        self._servers.pop(name, None)
        self._drop_tools_for(name)

    def _drop_tools_for(self, server_name: str) -> None:
        """Remove cached tools whose config belongs to `server_name`."""
        self._discovered_tools = [
            tool for tool in self._discovered_tools
            if tool._config.name != server_name
        ]

    @property
    def server_count(self) -> int:
        """Number of registered servers."""
        return len(self._servers)

    @property
    def tool_count(self) -> int:
        """Number of tools currently cached from discovery."""
        return len(self._discovered_tools)
|