"""
a2a.py — Agent-to-Agent protocol adapters.
Implements the A2A standard for Purpose Agent:
- AgentCard: declares agent capabilities, schemas, and endpoints
- A2AClient: delegate tasks to remote agents
- publish_card(): expose local agents as A2A-compatible services
Security:
- Endpoint allowlist (only approved remote agents)
- Schema validation on delegation and response
- Timeout + circuit breaker for unreachable peers
- Trust tier system (local > verified > unknown)
"""
from __future__ import annotations
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
logger = logging.getLogger(__name__)
class TrustTier(str, Enum):
    """Closed set of trust levels that can be attached to an agent.

    Members mix in ``str``, so each member compares equal to its lowercase
    string value.
    """

    # Same system as the caller: full trust.
    LOCAL = "local"
    # Authenticated and schema-validated.
    VERIFIED = "verified"
    # No verification performed: sandboxed.
    UNKNOWN = "unknown"
    # Explicitly denied.
    BLOCKED = "blocked"
@dataclass
class AgentCapability:
    """One named skill advertised by an agent.

    ``name`` and ``description`` are required.  The two schema fields are
    optional and each instance gets its own fresh empty dict by default.
    """

    # Identifier of the capability.
    name: str
    # Human-readable summary of what the capability does.
    description: str
    # Schema describing the accepted input (empty when not declared).
    input_schema: dict[str, Any] = field(default_factory=dict)
    # Schema describing the produced output (empty when not declared).
    output_schema: dict[str, Any] = field(default_factory=dict)
@dataclass
class AgentCard:
    """
    A2A Agent Card — declares an agent's identity, capabilities, and endpoint.

    This is what gets published for other agents to discover and delegate to.
    """

    # Short unique identifier; generated when the caller does not supply one.
    agent_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    name: str = ""
    description: str = ""
    version: str = "1.0"
    endpoint: str = ""  # URL or local reference
    capabilities: list[AgentCapability] = field(default_factory=list)
    trust_tier: TrustTier = TrustTier.UNKNOWN
    metadata: dict[str, Any] = field(default_factory=dict)
    # Creation timestamp in seconds since the epoch (``time.time``).
    created_at: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, Any]:
        """Return the card as a plain dict suitable for ``json.dumps``.

        ``created_at`` is not included in the serialised form.
        """
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "description": self.description,
            "version": self.version,
            "endpoint": self.endpoint,
            "capabilities": [
                {
                    "name": c.name,
                    "description": c.description,
                    "input_schema": c.input_schema,
                    "output_schema": c.output_schema,
                }
                for c in self.capabilities
            ],
            "trust_tier": self.trust_tier.value,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "AgentCard":
        """Build a card from a dict shaped like the output of ``to_dict``.

        Missing keys fall back to the same defaults as the constructor.  In
        particular a missing or empty ``agent_id`` yields a freshly generated
        id rather than ``""``, so cards parsed without an id do not all share
        one identifier.

        Raises:
            ValueError: if ``trust_tier`` is not a valid ``TrustTier`` value.
            TypeError: if a capability entry has unexpected or missing keys.
        """
        # ``or []`` / ``or {}`` also cover an explicit null in the payload.
        caps = [AgentCapability(**c) for c in d.get("capabilities") or []]
        kwargs: dict[str, Any] = {
            "name": d.get("name", ""),
            "description": d.get("description", ""),
            "version": d.get("version", "1.0"),
            "endpoint": d.get("endpoint", ""),
            "capabilities": caps,
            "trust_tier": TrustTier(d.get("trust_tier", "unknown")),
            # Copy so that later edits to the card's metadata do not write
            # through to the caller's dict (and vice versa).
            "metadata": dict(d.get("metadata") or {}),
        }
        # Only pass an id when one was supplied; otherwise let the dataclass
        # default factory generate a unique one.
        supplied_id = d.get("agent_id")
        if supplied_id:
            kwargs["agent_id"] = supplied_id
        return cls(**kwargs)

    def to_json(self) -> str:
        """Return ``to_dict()`` rendered as indented JSON text."""
        return json.dumps(self.to_dict(), indent=2)

    def has_capability(self, name: str) -> bool:
        """Return True when a declared capability has exactly this name."""
        return any(c.name == name for c in self.capabilities)
@dataclass
class A2ATaskResult:
    """Outcome of a task handed off to another agent over A2A.

    Only ``success`` is required; every other field has a neutral default.
    """

    # Whether the delegated task completed successfully.
    success: bool
    # Payload returned by the delegate (fresh empty dict by default).
    output: dict[str, Any] = field(default_factory=dict)
    # Failure description, or None when there is nothing to report.
    error: str | None = None
    # Elapsed time of the delegation, in seconds.
    duration_s: float = 0.0
    # Identifier of the agent the task was delegated to.
    agent_id: str = ""
class A2AClient:
    """
    Client for delegating tasks to remote agents via A2A protocol.

    Peers are registered by their ``AgentCard`` and addressed by
    ``agent_id``.  Delegation is guarded by an optional allowlist and by a
    per-peer counter of consecutive failures (a simple circuit breaker).

    Usage:
        client = A2AClient()
        client.register_peer(card)
        result = client.delegate(
            agent_id="remote_coder",
            task="Write a fibonacci function",
            timeout_s=30.0,
        )
    """

    def __init__(self, allowlist: list[str] | None = None):
        """Create a client.

        Args:
            allowlist: agent ids that may be delegated to.  ``None`` means
                every registered peer is eligible.  An explicit list, even an
                empty one, restricts delegation to exactly its members.
        """
        self._peers: dict[str, AgentCard] = {}
        # Compare against None rather than truthiness so that ``[]`` means
        # "deny everything" instead of silently becoming "allow everything".
        self._allowlist: set[str] | None = (
            set(allowlist) if allowlist is not None else None
        )
        self._circuit_breaker: dict[str, int] = {}  # agent_id → consecutive failures
        self._max_failures = 3

    def register_peer(self, card: AgentCard) -> None:
        """Register a remote agent as a potential delegate.

        Cards whose trust tier is ``BLOCKED`` are logged and ignored.  A card
        with an already-registered ``agent_id`` replaces the previous one.
        """
        if card.trust_tier == TrustTier.BLOCKED:
            logger.warning("A2A: rejected blocked agent '%s'", card.name)
            return
        self._peers[card.agent_id] = card
        logger.info(
            "A2A: registered peer '%s' (%s) trust=%s",
            card.name,
            card.agent_id,
            card.trust_tier.value,
        )

    def delegate(
        self,
        agent_id: str,
        task: str,
        input_data: dict[str, Any] | None = None,
        timeout_s: float = 30.0,
    ) -> A2ATaskResult:
        """
        Delegate a task to a remote agent.

        Never raises for delegation problems: unknown peers, allowlist
        rejections, an open circuit breaker, and exceptions from the
        transport are all reported as an ``A2ATaskResult`` with
        ``success=False`` and an ``error`` message.

        Args:
            agent_id: id of a previously registered peer.
            task: task description passed to the peer.
            input_data: optional structured input; ``None`` is sent as ``{}``.
            timeout_s: timeout handed to the transport.

        Returns:
            A2ATaskResult with success/failure and output.
        """
        card = self._peers.get(agent_id)
        if card is None:
            return A2ATaskResult(success=False, error=f"Unknown agent: {agent_id}")

        # Allowlist check (None = no allowlist configured).
        if self._allowlist is not None and agent_id not in self._allowlist:
            return A2ATaskResult(
                success=False, error=f"Agent '{agent_id}' not in allowlist"
            )

        # Circuit breaker: stop calling a peer that keeps failing.
        if self._circuit_breaker.get(agent_id, 0) >= self._max_failures:
            return A2ATaskResult(
                success=False,
                error=f"Circuit breaker open for '{card.name}' (>={self._max_failures} failures)",
                agent_id=agent_id,
            )

        # Monotonic clock: the measured duration is immune to wall-clock jumps.
        t0 = time.monotonic()
        try:
            result = self._execute_delegation(card, task, input_data or {}, timeout_s)
        except Exception as e:
            self._record_failure(agent_id, card)
            return A2ATaskResult(
                success=False,
                error=f"Delegation failed: {e}",
                duration_s=time.monotonic() - t0,
                agent_id=agent_id,
            )

        # Only a genuinely successful result closes the breaker; a returned
        # failure counts the same as a raised one.
        if result.success:
            self._circuit_breaker[agent_id] = 0
        else:
            self._record_failure(agent_id, card)
        result.duration_s = time.monotonic() - t0
        result.agent_id = agent_id
        return result

    def _record_failure(self, agent_id: str, card: AgentCard) -> None:
        """Count one more consecutive failure for ``agent_id``."""
        failures = self._circuit_breaker.get(agent_id, 0) + 1
        self._circuit_breaker[agent_id] = failures
        if failures == self._max_failures:
            logger.warning(
                "A2A: circuit breaker opened for '%s' after %d consecutive failures",
                card.name,
                failures,
            )

    def reset_circuit_breaker(self, agent_id: str | None = None) -> None:
        """Clear the failure count for one peer, or for all peers.

        Args:
            agent_id: peer whose breaker should be closed; ``None`` clears
                the counters of every peer.
        """
        if agent_id is None:
            self._circuit_breaker.clear()
        else:
            self._circuit_breaker.pop(agent_id, None)

    def _execute_delegation(
        self, card: AgentCard, task: str, input_data: dict, timeout_s: float
    ) -> A2ATaskResult:
        """
        Execute the actual delegation. Protocol skeleton.

        Real implementation requires HTTP client for remote agents,
        or direct function call for local agents.  Until then this always
        returns an unsuccessful result naming the endpoint that would have
        been contacted; ``task``, ``input_data`` and ``timeout_s`` are unused.
        """
        # TODO: for TrustTier.LOCAL cards whose endpoint starts with "local:",
        # invoke the local agent directly instead of going through a transport.

        # Protocol skeleton — real transport needs purpose-agent[a2a] extra
        return A2ATaskResult(
            success=False,
            error=f"A2A transport not available. Install: pip install purpose-agent[a2a]. "
            f"Would delegate to: {card.endpoint}",
        )

    def list_peers(self) -> list[AgentCard]:
        """List all registered peer agents."""
        return list(self._peers.values())

    def find_capable(self, capability: str) -> list[AgentCard]:
        """Find peers that declare a specific capability."""
        return [c for c in self._peers.values() if c.has_capability(capability)]

    @property
    def peer_count(self) -> int:
        """Number of registered peers."""
        return len(self._peers)
def publish_card(
    name: str,
    description: str,
    capabilities: list[dict[str, str]],
    endpoint: str = "",
) -> AgentCard:
    """
    Build the AgentCard that describes a local Purpose Agent.

    Each entry of ``capabilities`` must carry a ``"name"`` key; its
    ``"description"`` is optional and defaults to an empty string.  The
    returned card always has trust tier ``TrustTier.LOCAL``.

    Usage:
        card = publish_card(
            name="code_reviewer",
            description="Reviews Python code for bugs and style",
            capabilities=[
                {"name": "code_review", "description": "Review Python code"},
                {"name": "security_audit", "description": "Check for security issues"},
            ],
            endpoint="local:code_reviewer",
        )
    """
    declared: list[AgentCapability] = []
    for spec in capabilities:
        declared.append(
            AgentCapability(
                name=spec["name"],
                description=spec.get("description", ""),
            )
        )

    card = AgentCard(
        name=name,
        description=description,
        capabilities=declared,
        endpoint=endpoint,
        trust_tier=TrustTier.LOCAL,
    )
    return card
|