Rohan03's picture
Sprint 4A: MCP bridge — discover, validate, call remote tools via MCP
820edf0 verified
"""
mcp_bridge.py — Model Context Protocol bridge for Purpose Agent.
Connects to MCP servers (local or remote), discovers their tools,
wraps them as Purpose Agent Tool instances, and enforces security policy.
Security:
- Allowlist-based: only explicitly approved servers/tools are callable
- Argument validation against MCP schema before execution
- Timeout enforcement on all calls
- All MCP calls logged as tool events
Usage:
bridge = MCPToolBridge()
bridge.add_server("calc", url="http://localhost:3001", allowlist=["add", "multiply"])
# Discover tools
tools = bridge.discover_all()
# Use in agent
spark = pa.Spark("assistant", tools=tools)
# Or add to team
team.add_mcp_server("http://localhost:3001")
"""
from __future__ import annotations
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any
from purpose_agent.tools import Tool, ToolResult
logger = logging.getLogger(__name__)
@dataclass
class MCPServerConfig:
    """Connection settings and security policy for one MCP server."""

    # Short identifier for the server; used in tool names and log/error text.
    name: str
    # Address of the MCP server.
    url: str
    # Tool names that may be called. An empty list allows every discovered tool.
    allowlist: list[str] = field(default_factory=list)
    # Tool names that are always blocked.
    denylist: list[str] = field(default_factory=list)
    # Per-call timeout, in seconds.
    timeout_s: float = 30.0
    # Retry budget handed to tools created for this server.
    max_retries: int = 2
    # Free-form extra data attached to the server entry.
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class MCPToolSchema:
    """Description of one tool as reported by an MCP server."""

    # Tool name as known to the server (without any bridge prefix).
    name: str
    # Human-readable summary of what the tool does.
    description: str
    # Parameter specification for the tool; empty when none is given.
    parameters: dict[str, Any] = field(default_factory=dict)
    # Name of the server the tool was discovered on.
    server_name: str = ""
    # URL of the server the tool was discovered on.
    server_url: str = ""
class MCPTool(Tool):
    """
    A Purpose Agent Tool backed by a remote MCP server.

    Wraps a single MCP tool with the owning server's allowlist/denylist
    policy and converts every failure into an ``"Error: ..."`` string so
    callers never have to handle exceptions from ``execute``.

    NOTE: this skeleton does not yet validate call arguments against
    ``schema.parameters``; only the allow/deny policy is enforced.
    """

    def __init__(self, schema: MCPToolSchema, config: MCPServerConfig):
        """
        Bind a discovered tool schema to the server it came from.

        Args:
            schema: The tool's name, description and parameter spec.
            config: Connection settings and security policy of the server.
        """
        # Namespaced so tools from different servers cannot collide.
        self.name = f"mcp:{config.name}:{schema.name}"
        self.description = f"[MCP:{config.name}] {schema.description}"
        self.parameters = schema.parameters
        self._schema = schema
        self._config = config
        self.timeout_seconds = config.timeout_s
        self.max_retries = config.max_retries

    def execute(self, **kwargs) -> str:
        """
        Execute the MCP tool call.

        The denylist is checked first, then the allowlist (an empty
        allowlist permits every tool). The call is logged at INFO level
        with its arguments truncated to 200 characters.

        Current implementation is a protocol skeleton: actual transport
        requires the `purpose-agent[mcp]` extra with an MCP client library.

        Args:
            **kwargs: Arguments forwarded to the remote tool.

        Returns:
            The tool's result text, or a string starting with ``"Error:"``
            when the call is blocked by policy or fails.
        """
        tool_name = self._schema.name
        policy = self._config

        # Membership in an empty denylist is simply False, so no extra guard.
        if tool_name in policy.denylist:
            return f"Error: tool '{tool_name}' is denied by security policy"
        if policy.allowlist and tool_name not in policy.allowlist:
            return f"Error: tool '{tool_name}' not in allowlist for server '{policy.name}'"

        try:
            # Serialize the arguments only when the record will be emitted,
            # and inside the try so that unserializable arguments (e.g. a
            # circular reference) yield an error string instead of raising.
            if logger.isEnabledFor(logging.INFO):
                logger.info(
                    "MCP call: %s/%s args=%s",
                    policy.name,
                    tool_name,
                    json.dumps(kwargs, default=str)[:200],
                )
            return self._call_mcp_server(tool_name, kwargs)
        except MCPConnectionError as e:
            return f"Error: MCP server '{policy.name}' unavailable: {e}"
        except MCPTimeoutError:
            return f"Error: MCP call timed out after {policy.timeout_s}s"
        except Exception as e:
            # Tool boundary: report any other failure as text to the agent.
            return f"Error: MCP call failed: {e}"

    def _call_mcp_server(self, tool_name: str, args: dict[str, Any]) -> str:
        """
        Make the actual MCP call. Protocol-level implementation.

        This is a skeleton: real transport (HTTP/stdio/SSE) requires the
        optional mcp client library, which is not imported yet. Until then
        the method always returns an explanatory message describing the
        call it would have made (arguments truncated to 100 characters).

        Args:
            tool_name: Server-side name of the tool to invoke.
            args: Keyword arguments for the tool.

        Returns:
            The message text described above.
        """
        try:
            # Future: from mcp import Client
            raise ImportError("MCP client not installed")
        except ImportError:
            # Graceful degradation: explain how to enable the transport.
            return (
                f"MCP transport not available. Install: pip install purpose-agent[mcp]\n"
                f"Would call: {self._config.url} / {tool_name}({json.dumps(args, default=str)[:100]})"
            )
class MCPConnectionError(Exception):
    """Signals that an MCP server could not be reached."""
class MCPTimeoutError(Exception):
    """Signals that an MCP call exceeded its time limit."""
class MCPToolBridge:
    """
    Bridge that connects Purpose Agent to MCP tool servers.

    Keeps a registry of MCP servers keyed by name, discovers their tools,
    and exposes them as standard Purpose Agent Tools.

    Usage:
        bridge = MCPToolBridge()
        # Add a server
        bridge.add_server("math", url="http://localhost:3001",
                          allowlist=["add", "multiply", "divide"])
        # Discover all tools
        tools = bridge.discover_all()
        # Use with any agent
        spark = pa.Spark("helper", tools=tools)
    """

    def __init__(self):
        """Create a bridge with no registered servers and no tools."""
        self._servers: dict[str, MCPServerConfig] = {}
        self._discovered_tools: list[MCPTool] = []

    def add_server(
        self,
        name: str,
        url: str,
        allowlist: list[str] | None = None,
        denylist: list[str] | None = None,
        timeout_s: float = 30.0,
        *,
        max_retries: int = 2,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """
        Register an MCP server.

        Registering a name that already exists replaces the previous
        configuration (a warning is logged). Tools discovered earlier keep
        the configuration they were created with until the next
        ``discover_all()``.

        Args:
            name: Unique identifier for the server.
            url: Address of the MCP server.
            allowlist: Tool names that may be called; ``None`` or empty
                allows every discovered tool.
            denylist: Tool names that are always blocked.
            timeout_s: Per-call timeout in seconds.
            max_retries: Retry budget stored on the server configuration.
            metadata: Optional free-form data stored on the configuration.
        """
        if name in self._servers:
            logger.warning(
                "MCP: server '%s' is already registered; replacing its configuration",
                name,
            )
        self._servers[name] = MCPServerConfig(
            name=name,
            url=url,
            allowlist=allowlist or [],
            denylist=denylist or [],
            timeout_s=timeout_s,
            max_retries=max_retries,
            metadata=metadata or {},
        )
        logger.info("MCP: registered server '%s' at %s", name, url)

    def discover_all(self) -> list[MCPTool]:
        """
        Discover tools from all registered servers.

        The result replaces any previously discovered tools.

        Returns:
            Tools from every server, in server registration order.
        """
        tools: list[MCPTool] = []
        for config in self._servers.values():
            tools.extend(self._discover_server(config))
        self._discovered_tools = tools
        return tools

    def _discover_server(self, config: MCPServerConfig) -> list[MCPTool]:
        """
        Discover tools from a single MCP server.

        In production, this calls the MCP server's tool/list endpoint.
        The skeleton returns an empty list until a transport is available.

        Args:
            config: The server to query.

        Returns:
            The tools found on that server (currently always empty).
        """
        logger.info("MCP: discovering tools from '%s' at %s", config.name, config.url)
        # Protocol skeleton: real discovery needs an MCP client.
        # Future: response = mcp_client.list_tools(config.url)
        return []

    def get_tools(self) -> list[MCPTool]:
        """Return the tools found by the most recent ``discover_all()``."""
        return self._discovered_tools

    def remove_server(self, name: str) -> None:
        """
        Remove a registered server and the tools discovered from it.

        Unknown names are ignored.
        """
        self._servers.pop(name, None)
        self._discovered_tools = [
            tool for tool in self._discovered_tools if tool._config.name != name
        ]

    @property
    def server_count(self) -> int:
        """Number of currently registered servers."""
        return len(self._servers)

    @property
    def tool_count(self) -> int:
        """Number of tools held from the most recent discovery."""
        return len(self._discovered_tools)