File size: 7,084 Bytes
820edf0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
"""
mcp_bridge.py — Model Context Protocol bridge for Purpose Agent.

Connects to MCP servers (local or remote), discovers their tools,
wraps them as Purpose Agent Tool instances, and enforces security policy.

Security:
  - Allowlist-based: only explicitly approved servers/tools are callable
  - Argument validation against MCP schema before execution
  - Timeout enforcement on all calls
  - All MCP calls logged as tool events

Usage:
    bridge = MCPToolBridge()
    bridge.add_server("calc", url="http://localhost:3001", allowlist=["add", "multiply"])
    
    # Discover tools
    tools = bridge.discover()
    
    # Use in agent
    spark = pa.Spark("assistant", tools=tools)
    
    # Or add to team
    team.add_mcp_server("http://localhost:3001")
"""
from __future__ import annotations

import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any

from purpose_agent.tools import Tool, ToolResult

logger = logging.getLogger(__name__)


@dataclass
class MCPServerConfig:
    """Configuration for a single MCP server connection."""
    name: str
    url: str
    allowlist: list[str] = field(default_factory=list)   # Empty = allow all discovered
    denylist: list[str] = field(default_factory=list)    # Always block these
    timeout_s: float = 30.0
    max_retries: int = 2
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class MCPToolSchema:
    """Schema for a tool discovered from an MCP server."""
    name: str
    description: str
    parameters: dict[str, Any] = field(default_factory=dict)
    server_name: str = ""
    server_url: str = ""


class MCPTool(Tool):
    """
    A Purpose Agent Tool backed by a remote MCP server.
    
    Wraps an MCP tool call with validation, timeout, and security checks.
    """

    def __init__(self, schema: MCPToolSchema, config: MCPServerConfig):
        self.name = f"mcp:{config.name}:{schema.name}"
        self.description = f"[MCP:{config.name}] {schema.description}"
        self.parameters = schema.parameters
        self._schema = schema
        self._config = config
        self.timeout_seconds = config.timeout_s
        self.max_retries = config.max_retries

    def execute(self, **kwargs) -> str:
        """
        Execute the MCP tool call.
        
        In production, this would make an HTTP/stdio call to the MCP server.
        Current implementation is a protocol skeleton — actual transport
        requires `purpose-agent[mcp]` extra with an MCP client library.
        """
        # Validate against allowlist
        tool_name = self._schema.name
        if self._config.denylist and tool_name in self._config.denylist:
            return f"Error: tool '{tool_name}' is denied by security policy"

        if self._config.allowlist and tool_name not in self._config.allowlist:
            return f"Error: tool '{tool_name}' not in allowlist for server '{self._config.name}'"

        # Log the call
        logger.info(f"MCP call: {self._config.name}/{tool_name} args={json.dumps(kwargs, default=str)[:200]}")

        # Attempt the call (protocol skeleton)
        try:
            result = self._call_mcp_server(tool_name, kwargs)
            return result
        except MCPConnectionError as e:
            return f"Error: MCP server '{self._config.name}' unavailable: {e}"
        except MCPTimeoutError as e:
            return f"Error: MCP call timed out after {self._config.timeout_s}s"
        except Exception as e:
            return f"Error: MCP call failed: {e}"

    def _call_mcp_server(self, tool_name: str, args: dict) -> str:
        """
        Make the actual MCP call. Protocol-level implementation.
        
        This is a skeleton — real transport (HTTP/stdio/SSE) requires
        the optional mcp client library.
        """
        # Check if mcp client is available
        try:
            # Future: from mcp import Client
            raise ImportError("MCP client not installed")
        except ImportError:
            # Graceful degradation: return helpful error
            return (
                f"MCP transport not available. Install: pip install purpose-agent[mcp]\n"
                f"Would call: {self._config.url} / {tool_name}({json.dumps(args, default=str)[:100]})"
            )


class MCPConnectionError(Exception):
    pass

class MCPTimeoutError(Exception):
    pass


class MCPToolBridge:
    """
    Bridge that connects Purpose Agent to MCP tool servers.
    
    Manages multiple MCP servers, discovers their tools,
    and exposes them as standard Purpose Agent Tools.
    
    Usage:
        bridge = MCPToolBridge()
        
        # Add a server
        bridge.add_server("math", url="http://localhost:3001", 
                         allowlist=["add", "multiply", "divide"])
        
        # Discover all tools
        tools = bridge.discover_all()
        
        # Use with any agent
        spark = pa.Spark("helper", tools=tools)
    """

    def __init__(self):
        self._servers: dict[str, MCPServerConfig] = {}
        self._discovered_tools: list[MCPTool] = []

    def add_server(
        self,
        name: str,
        url: str,
        allowlist: list[str] | None = None,
        denylist: list[str] | None = None,
        timeout_s: float = 30.0,
    ) -> None:
        """Register an MCP server."""
        config = MCPServerConfig(
            name=name, url=url,
            allowlist=allowlist or [],
            denylist=denylist or [],
            timeout_s=timeout_s,
        )
        self._servers[name] = config
        logger.info(f"MCP: registered server '{name}' at {url}")

    def discover_all(self) -> list[MCPTool]:
        """Discover tools from all registered servers."""
        tools = []
        for name, config in self._servers.items():
            server_tools = self._discover_server(config)
            tools.extend(server_tools)
        self._discovered_tools = tools
        return tools

    def _discover_server(self, config: MCPServerConfig) -> list[MCPTool]:
        """
        Discover tools from a single MCP server.
        
        In production, this calls the MCP server's tool/list endpoint.
        Skeleton returns empty until transport is available.
        """
        logger.info(f"MCP: discovering tools from '{config.name}' at {config.url}")

        # Protocol skeleton — real discovery needs MCP client
        # Future: response = mcp_client.list_tools(config.url)
        # For now, return empty (no tools discovered without transport)
        return []

    def get_tools(self) -> list[MCPTool]:
        """Get previously discovered tools."""
        return self._discovered_tools

    def remove_server(self, name: str) -> None:
        """Remove a registered server."""
        self._servers.pop(name, None)
        self._discovered_tools = [t for t in self._discovered_tools if t._config.name != name]

    @property
    def server_count(self) -> int:
        return len(self._servers)

    @property
    def tool_count(self) -> int:
        return len(self._discovered_tools)