"""
mcp_bridge.py — Model Context Protocol bridge for Purpose Agent.

Connects to MCP servers (local or remote), discovers their tools,
wraps them as Purpose Agent Tool instances, and enforces security policy.

Security:
  - Allowlist/denylist policy is checked on every call. An empty allowlist
    permits every tool that is not on the denylist.
  - All MCP calls are logged (arguments truncated) before dispatch.
  - Each tool records its server's timeout and retry budget.

Status:
  The transport layer is a protocol skeleton. Until an MCP client library
  is wired in, discovery returns no tools and calls return an explanatory
  message. Argument validation against the MCP schema and timeout
  enforcement are not performed by this module yet.

Usage:
    bridge = MCPToolBridge()
    bridge.add_server("calc", url="http://localhost:3001", allowlist=["add", "multiply"])

    # Discover tools
    tools = bridge.discover_all()

    # Use in agent
    spark = pa.Spark("assistant", tools=tools)

    # Or add to team
    team.add_mcp_server("http://localhost:3001")
"""

from __future__ import annotations

import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any

from purpose_agent.tools import Tool, ToolResult

logger = logging.getLogger(__name__)


def _preview_json(value: Any, limit: int) -> str:
    """Return a JSON rendering of *value* truncated to *limit* characters.

    ``json.dumps(..., default=str)`` still raises on circular references
    (``ValueError``) and on nested dicts with non-scalar keys (``TypeError``).
    A preview is only used for log/diagnostic text, so fall back to ``repr``
    instead of letting a formatting problem abort the tool call.
    """
    try:
        text = json.dumps(value, default=str)
    except (TypeError, ValueError):
        text = repr(value)
    return text[:limit]


class MCPConnectionError(Exception):
    """Raised when an MCP server cannot be reached."""


class MCPTimeoutError(Exception):
    """Raised when an MCP call exceeds its configured timeout."""


@dataclass
class MCPServerConfig:
    """Configuration for a single MCP server connection."""

    name: str
    url: str
    allowlist: list[str] = field(default_factory=list)  # Empty = allow all discovered
    denylist: list[str] = field(default_factory=list)  # Always block these
    timeout_s: float = 30.0
    max_retries: int = 2
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class MCPToolSchema:
    """Schema for a tool discovered from an MCP server."""

    name: str
    description: str
    parameters: dict[str, Any] = field(default_factory=dict)
    server_name: str = ""
    server_url: str = ""


class MCPTool(Tool):
    """
    A Purpose Agent Tool backed by a remote MCP server.

    Wraps an MCP tool call with allowlist/denylist checks, call logging,
    and conversion of transport failures into error strings.
    """

    def __init__(self, schema: MCPToolSchema, config: MCPServerConfig):
        """Bind a discovered tool schema to the server it came from.

        Args:
            schema: Name, description and parameters of the remote tool.
            config: Connection and policy settings of the owning server.
        """
        # NOTE(review): Tool.__init__ is not invoked here — confirm the base
        # class needs no initialization of its own.
        # The public name is namespaced by server so that identically named
        # tools from different servers do not collide.
        self.name = f"mcp:{config.name}:{schema.name}"
        self.description = f"[MCP:{config.name}] {schema.description}"
        self.parameters = schema.parameters
        self._schema = schema
        self._config = config
        self.timeout_seconds = config.timeout_s
        self.max_retries = config.max_retries

    def execute(self, **kwargs) -> str:
        """
        Execute the MCP tool call.

        In production, this would make an HTTP/stdio call to the MCP server.
        Current implementation is a protocol skeleton — actual transport
        requires `purpose-agent[mcp]` extra with an MCP client library.

        Args:
            **kwargs: Arguments forwarded to the remote tool.

        Returns:
            The tool result as a string. Policy violations and transport
            failures are reported as strings starting with ``"Error:"``
            rather than raised.
        """
        tool_name = self._schema.name
        policy = self._config

        # The denylist wins over the allowlist; an empty allowlist allows all.
        if policy.denylist and tool_name in policy.denylist:
            return f"Error: tool '{tool_name}' is denied by security policy"
        if policy.allowlist and tool_name not in policy.allowlist:
            return f"Error: tool '{tool_name}' not in allowlist for server '{policy.name}'"

        # Log the call. The argument preview is truncated and cannot raise.
        logger.info(
            "MCP call: %s/%s args=%s",
            policy.name,
            tool_name,
            _preview_json(kwargs, 200),
        )

        # Attempt the call (protocol skeleton)
        try:
            return self._call_mcp_server(tool_name, kwargs)
        except MCPConnectionError as e:
            return f"Error: MCP server '{policy.name}' unavailable: {e}"
        except MCPTimeoutError:
            return f"Error: MCP call timed out after {policy.timeout_s}s"
        except Exception as e:
            # Boundary handler: a tool must report failure, not crash the agent.
            return f"Error: MCP call failed: {e}"

    def _call_mcp_server(self, tool_name: str, args: dict) -> str:
        """
        Make the actual MCP call. Protocol-level implementation.

        This is a skeleton — real transport (HTTP/stdio/SSE) requires
        the optional mcp client library.

        Args:
            tool_name: Name of the tool on the remote server.
            args: Arguments for the remote tool.

        Returns:
            Currently always a message saying that the transport is not
            installed, together with a preview of the call that would be made.
        """
        # Future: try `from mcp import Client` here and only fall back to the
        # message below when that import fails.
        return (
            f"MCP transport not available. Install: pip install purpose-agent[mcp]\n"
            f"Would call: {self._config.url} / {tool_name}({_preview_json(args, 100)})"
        )


class MCPToolBridge:
    """
    Bridge that connects Purpose Agent to MCP tool servers.

    Manages multiple MCP servers, discovers their tools, and exposes
    them as standard Purpose Agent Tools.

    Usage:
        bridge = MCPToolBridge()

        # Add a server
        bridge.add_server("math", url="http://localhost:3001",
                          allowlist=["add", "multiply", "divide"])

        # Discover all tools
        tools = bridge.discover_all()

        # Use with any agent
        spark = pa.Spark("helper", tools=tools)
    """

    def __init__(self):
        """Create a bridge with no registered servers and no discovered tools."""
        self._servers: dict[str, MCPServerConfig] = {}
        self._discovered_tools: list[MCPTool] = []

    def add_server(
        self,
        name: str,
        url: str,
        allowlist: list[str] | None = None,
        denylist: list[str] | None = None,
        timeout_s: float = 30.0,
        max_retries: int = 2,
    ) -> None:
        """Register an MCP server.

        Registering a name that already exists replaces the earlier entry.

        Args:
            name: Key under which the server is stored; also used in tool names.
            url: Address of the MCP server.
            allowlist: Tool names that may be called. ``None`` or empty allows all.
            denylist: Tool names that are always blocked.
            timeout_s: Per-call timeout stored in the server configuration.
            max_retries: Retry budget stored in the server configuration.
        """
        self._servers[name] = MCPServerConfig(
            name=name,
            url=url,
            allowlist=allowlist or [],
            denylist=denylist or [],
            timeout_s=timeout_s,
            max_retries=max_retries,
        )
        logger.info("MCP: registered server '%s' at %s", name, url)

    def discover_all(self) -> list[MCPTool]:
        """Discover tools from all registered servers.

        The result replaces any previously cached tools.

        Returns:
            A new list of the discovered tools, in server registration order.
        """
        tools: list[MCPTool] = []
        for config in self._servers.values():
            tools.extend(self._discover_server(config))
        self._discovered_tools = tools
        # Hand out a copy so callers cannot mutate the bridge's cache.
        return list(tools)

    def _discover_server(self, config: MCPServerConfig) -> list[MCPTool]:
        """
        Discover tools from a single MCP server.

        In production, this calls the MCP server's tool/list endpoint.
        Skeleton returns empty until transport is available.
        """
        logger.info("MCP: discovering tools from '%s' at %s", config.name, config.url)
        # Protocol skeleton — real discovery needs MCP client
        # Future: response = mcp_client.list_tools(config.url)
        # For now, return empty (no tools discovered without transport)
        return []

    def get_tools(self) -> list[MCPTool]:
        """Get previously discovered tools (as a copy of the cached list)."""
        return list(self._discovered_tools)

    def remove_server(self, name: str) -> None:
        """Remove a registered server and drop its cached tools.

        Unknown names are ignored.
        """
        self._servers.pop(name, None)
        self._discovered_tools = [
            tool for tool in self._discovered_tools if tool._config.name != name
        ]

    @property
    def server_count(self) -> int:
        """Number of registered servers."""
        return len(self._servers)

    @property
    def tool_count(self) -> int:
        """Number of tools cached by the last discovery."""
        return len(self._discovered_tools)