| """ |
| mcp_bridge.py — Model Context Protocol bridge for Purpose Agent. |
| |
| Connects to MCP servers (local or remote), discovers their tools, |
| wraps them as Purpose Agent Tool instances, and enforces security policy. |
| |
| Security: |
| - Allowlist-based: only explicitly approved servers/tools are callable |
| - Argument validation against MCP schema before execution |
| - Timeout enforcement on all calls |
| - All MCP calls logged as tool events |
| |
| Usage: |
| bridge = MCPToolBridge() |
| bridge.add_server("calc", url="http://localhost:3001", allowlist=["add", "multiply"]) |
| |
| # Discover tools |
| tools = bridge.discover_all() |
| |
| # Use in agent |
| spark = pa.Spark("assistant", tools=tools) |
| |
| # Or add to team |
| team.add_mcp_server("http://localhost:3001") |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import time |
| from dataclasses import dataclass, field |
| from typing import Any |
|
|
| from purpose_agent.tools import Tool, ToolResult |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class MCPServerConfig:
    """
    Connection and security settings for one MCP server.

    Attributes:
        name: Label of the server; MCPTool embeds it in tool names and
            log/error messages.
        url: Address of the server.
        allowlist: Tool names that may be called. MCPTool skips the
            allowlist check when this list is empty.
        denylist: Tool names that MCPTool refuses to call.
        timeout_s: Timeout in seconds; copied onto each MCPTool and quoted
            in its timeout error message.
        max_retries: Retry budget; copied onto each MCPTool.
        metadata: Free-form extra data attached to the server entry.
    """

    # Required identification of the server.
    name: str
    url: str

    # Security policy (each instance gets its own fresh list).
    allowlist: list[str] = field(default_factory=list)
    denylist: list[str] = field(default_factory=list)

    # Call behaviour.
    timeout_s: float = 30.0
    max_retries: int = 2

    # Anything else the caller wants to keep alongside the server.
    metadata: dict[str, Any] = field(default_factory=dict)
|
|
|
|
@dataclass
class MCPToolSchema:
    """
    Description of one tool as reported by an MCP server.

    Attributes:
        name: Tool name on the server (checked against allow/deny lists).
        description: Human-readable summary of the tool.
        parameters: Parameter schema of the tool; empty dict by default.
        server_name: Name of the server the tool came from, "" if unset.
        server_url: URL of the server the tool came from, "" if unset.
    """

    # What the tool is.
    name: str
    description: str
    parameters: dict[str, Any] = field(default_factory=dict)

    # Where the tool was discovered.
    server_name: str = ""
    server_url: str = ""
|
|
|
|
class MCPTool(Tool):
    """
    A Purpose Agent Tool backed by a remote MCP server.

    Wraps a single MCP tool call with deny/allow-list checks, call logging,
    and translation of transport failures into "Error: ..." strings.

    NOTE(review): an empty allowlist disables the allowlist check, so every
    tool that is not on the denylist is callable. Confirm this fail-open
    default is intended.
    """

    def __init__(self, schema: MCPToolSchema, config: MCPServerConfig):
        """
        Bind a discovered tool schema to the server that exposes it.

        Args:
            schema: Name, description and parameter schema of the remote tool.
            config: Connection and security settings of the owning server.
        """
        # NOTE(review): Tool.__init__ is not called; confirm the base class
        # needs no initialisation of its own.
        self.name = f"mcp:{config.name}:{schema.name}"
        self.description = f"[MCP:{config.name}] {schema.description}"
        self.parameters = schema.parameters
        self._schema = schema
        self._config = config
        # Not read anywhere in this class; presumably consumed by the Tool
        # base class or its runner — TODO confirm.
        self.timeout_seconds = config.timeout_s
        self.max_retries = config.max_retries

    def execute(self, **kwargs) -> str:
        """
        Execute the MCP tool call.

        In production, this would make an HTTP/stdio call to the MCP server.
        Current implementation is a protocol skeleton — actual transport
        requires `purpose-agent[mcp]` extra with an MCP client library.

        Args:
            **kwargs: Arguments forwarded to the remote tool.

        Returns:
            The tool result as a string, or a string starting with "Error:"
            when the call is blocked by policy or fails. Failures are never
            raised to the caller.
        """
        tool_name = self._schema.name
        cfg = self._config

        # Security policy comes first; the denylist is checked before the
        # allowlist, so an explicit deny always wins.
        if cfg.denylist and tool_name in cfg.denylist:
            return f"Error: tool '{tool_name}' is denied by security policy"
        if cfg.allowlist and tool_name not in cfg.allowlist:
            return f"Error: tool '{tool_name}' not in allowlist for server '{cfg.name}'"

        try:
            # Serialising the arguments happens inside the try so that an
            # unserialisable payload is reported like any other failure.
            logger.info(
                "MCP call: %s/%s args=%s",
                cfg.name,
                tool_name,
                json.dumps(kwargs, default=str)[:200],  # truncated for the log
            )
            return self._call_mcp_server(tool_name, kwargs)
        except MCPConnectionError as exc:
            logger.warning("MCP server '%s' unavailable: %s", cfg.name, exc)
            return f"Error: MCP server '{cfg.name}' unavailable: {exc}"
        except MCPTimeoutError:
            logger.warning(
                "MCP call %s/%s timed out after %ss", cfg.name, tool_name, cfg.timeout_s
            )
            return f"Error: MCP call timed out after {cfg.timeout_s}s"
        except Exception as exc:
            # Tool boundary: report the failure to the agent instead of
            # crashing it, but keep the traceback in the log.
            logger.exception("MCP call %s/%s failed", cfg.name, tool_name)
            return f"Error: MCP call failed: {exc}"

    def _call_mcp_server(self, tool_name: str, args: dict) -> str:
        """
        Make the actual MCP call. Protocol-level implementation.

        This is a skeleton — real transport (HTTP/stdio/SSE) requires
        the optional mcp client library.

        Args:
            tool_name: Name of the tool on the remote server.
            args: Arguments for the tool.

        Returns:
            A placeholder message describing the call that would be made.
        """
        # TODO: replace with the real transport. It should raise
        # MCPConnectionError / MCPTimeoutError, which execute() turns into
        # error strings.
        # NOTE(review): this placeholder does not start with "Error:", so
        # callers cannot distinguish it from a successful result — confirm.
        return (
            f"MCP transport not available. Install: pip install purpose-agent[mcp]\n"
            f"Would call: {self._config.url} / {tool_name}({json.dumps(args, default=str)[:100]})"
        )
|
|
|
|
class MCPConnectionError(Exception):
    """Signals that an MCP server is unavailable."""
|
|
class MCPTimeoutError(Exception):
    """Signals that an MCP call exceeded its timeout."""
|
|
|
|
class MCPToolBridge:
    """
    Bridge that connects Purpose Agent to MCP tool servers.

    Manages multiple MCP servers, discovers their tools,
    and exposes them as standard Purpose Agent Tools.

    Usage:
        bridge = MCPToolBridge()

        # Add a server
        bridge.add_server("math", url="http://localhost:3001",
                          allowlist=["add", "multiply", "divide"])

        # Discover all tools
        tools = bridge.discover_all()

        # Use with any agent
        spark = pa.Spark("helper", tools=tools)
    """

    def __init__(self):
        """Create a bridge with no servers and no discovered tools."""
        self._servers: dict[str, MCPServerConfig] = {}
        self._discovered_tools: list[MCPTool] = []

    def add_server(
        self,
        name: str,
        url: str,
        allowlist: list[str] | None = None,
        denylist: list[str] | None = None,
        timeout_s: float = 30.0,
        max_retries: int = 2,
    ) -> None:
        """
        Register an MCP server.

        Registering a name that already exists replaces its configuration.

        Args:
            name: Key under which the server is stored.
            url: Address of the server.
            allowlist: Tool names that may be called; None means no list.
            denylist: Tool names that must be refused; None means no list.
            timeout_s: Timeout in seconds stored in the server config.
            max_retries: Retry budget stored in the server config.
        """
        config = MCPServerConfig(
            name=name,
            url=url,
            # Copy the lists so later mutation of the caller's objects
            # cannot silently change the security policy.
            allowlist=list(allowlist or []),
            denylist=list(denylist or []),
            timeout_s=timeout_s,
            max_retries=max_retries,
        )
        self._servers[name] = config
        logger.info("MCP: registered server '%s' at %s", name, url)

    def discover_all(self) -> list[MCPTool]:
        """
        Discover tools from all registered servers.

        Replaces the cached tool list with the fresh result.

        Returns:
            A new list with the tools of every registered server.
        """
        tools: list[MCPTool] = []
        for config in self._servers.values():
            tools.extend(self._discover_server(config))
        self._discovered_tools = tools
        # Hand out a copy so callers cannot mutate the cached list.
        return list(tools)

    def _discover_server(self, config: MCPServerConfig) -> list[MCPTool]:
        """
        Discover tools from a single MCP server.

        In production, this calls the MCP server's tool/list endpoint.
        Skeleton returns empty until transport is available.
        """
        logger.info("MCP: discovering tools from '%s' at %s", config.name, config.url)
        return []

    def get_tools(self) -> list[MCPTool]:
        """Return a copy of the tools found by the last discovery run."""
        return list(self._discovered_tools)

    def remove_server(self, name: str) -> None:
        """
        Remove a registered server and drop its cached tools.

        Unknown names are ignored.
        """
        self._servers.pop(name, None)
        self._discovered_tools = [
            tool for tool in self._discovered_tools if tool._config.name != name
        ]

    @property
    def server_count(self) -> int:
        """Number of registered servers."""
        return len(self._servers)

    @property
    def tool_count(self) -> int:
        """Number of tools in the discovery cache."""
        return len(self._discovered_tools)
|
|