# Rohan03 — Sprint 4B: A2A — AgentCard, A2AClient for agent-to-agent delegation
# (commit 7d8b2b6, verified)
"""
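a2a.py — Agent-to-Agent protocol adapters.

Implements the A2A standard for Purpose Agent:
  - AgentCard: declares agent capabilities, schemas, and endpoints
  - A2AClient: delegates tasks to remote agents
  - publish_card(): builds an AgentCard describing a local agent so it can
    be exposed as an A2A-compatible service

Security:
  - Endpoint allowlist (only approved remote agents may be delegated to)
  - Schema validation on delegation and response (schemas are declared on
    the card; enforcement is not implemented in this module yet)
  - Timeout + circuit breaker for unreachable peers (the timeout is handed
    to the transport; the circuit breaker counts failures per agent)
  - Trust tier system (local > verified > unknown)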
"""
from __future__ import annotations
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
# Module-level logger, named after this module so applications can configure
# A2A logging separately from the rest of the package.
logger = logging.getLogger(__name__)
class TrustTier(str, Enum):
    """Trust level for remote agents.

    Members mix in ``str``, so they compare equal to their string value
    (``TrustTier.LOCAL == "local"``) and serialize directly to JSON.

    Caution: because members are strings, ``<`` / ``>`` between two tiers
    compare the *text* lexicographically (e.g. ``"local" < "verified"``),
    which does not reflect the trust ordering. Compare :attr:`rank` instead.
    """

    LOCAL = "local"        # Same system, full trust
    VERIFIED = "verified"  # Authenticated, schema-validated
    UNKNOWN = "unknown"    # No verification, sandboxed
    BLOCKED = "blocked"    # Explicitly denied

    @property
    def rank(self) -> int:
        """Return this tier's position in the trust ordering.

        Higher means more trusted:
        ``BLOCKED (0) < UNKNOWN (1) < VERIFIED (2) < LOCAL (3)``.
        """
        # Looked up by value rather than stored on the class: any plain
        # attribute added to an Enum body would itself become a member.
        return ("blocked", "unknown", "verified", "local").index(self.value)
@dataclass
class AgentCapability:
    """One named capability that an agent advertises.

    ``name`` and ``description`` are required. The two schema fields are
    free-form dictionaries that default to empty; every instance receives
    its own fresh dict.
    """

    name: str
    description: str
    # Schema for the capability's input; empty when none is declared.
    input_schema: dict[str, Any] = field(default_factory=dict)
    # Schema for the capability's output; empty when none is declared.
    output_schema: dict[str, Any] = field(default_factory=dict)
@dataclass
class AgentCard:
    """
    A2A Agent Card — declares an agent's identity, capabilities, and endpoint.

    This is what gets published for other agents to discover and delegate to.

    ``to_dict()`` / ``from_dict()`` round-trip every field, including
    ``agent_id`` and ``created_at``.

    NOTE(review): ``from_dict`` takes ``trust_tier`` verbatim from the input.
    When the dict comes from a remote peer, the tier is self-declared; callers
    should presumably assign the tier themselves rather than trust it.
    """

    # Short random identifier (first 12 hex chars of a UUID4) unless supplied.
    agent_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    name: str = ""
    description: str = ""
    version: str = "1.0"
    endpoint: str = ""  # URL or local reference
    capabilities: list[AgentCapability] = field(default_factory=list)
    trust_tier: TrustTier = TrustTier.UNKNOWN
    metadata: dict[str, Any] = field(default_factory=dict)
    # Creation time as a Unix timestamp (time.time()).
    created_at: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict representation of the card.

        ``trust_tier`` is emitted as its string value; capabilities are
        emitted as dicts with ``name``, ``description``, ``input_schema``
        and ``output_schema`` keys.
        """
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "description": self.description,
            "version": self.version,
            "endpoint": self.endpoint,
            "capabilities": [
                {
                    "name": c.name,
                    "description": c.description,
                    "input_schema": c.input_schema,
                    "output_schema": c.output_schema,
                }
                for c in self.capabilities
            ],
            "trust_tier": self.trust_tier.value,
            "metadata": self.metadata,
            # Included so a serialized card keeps its original timestamp.
            "created_at": self.created_at,
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "AgentCard":
        """Build a card from a dict shaped like the output of ``to_dict()``.

        Missing keys fall back to the dataclass defaults. In particular a
        missing ``agent_id`` yields a freshly generated id (not an empty
        string, which would make all id-less cards collide), and a missing
        ``created_at`` yields the current time.

        Raises:
            ValueError: if ``trust_tier`` is not a valid ``TrustTier`` value.
            TypeError: if a capability dict has missing or unexpected keys.
        """
        kwargs: dict[str, Any] = {
            "name": d.get("name", ""),
            "description": d.get("description", ""),
            "version": d.get("version", "1.0"),
            "endpoint": d.get("endpoint", ""),
            "capabilities": [AgentCapability(**c) for c in d.get("capabilities", [])],
            "trust_tier": TrustTier(d.get("trust_tier", "unknown")),
            # Copy so later edits to the card do not mutate the caller's dict;
            # `or {}` also tolerates an explicit null.
            "metadata": dict(d.get("metadata") or {}),
        }
        # Only pass these when present so the default factories apply otherwise.
        for key in ("agent_id", "created_at"):
            if key in d:
                kwargs[key] = d[key]
        return cls(**kwargs)

    def to_json(self) -> str:
        """Return the card as a JSON string indented by two spaces."""
        return json.dumps(self.to_dict(), indent=2)

    def has_capability(self, name: str) -> bool:
        """Return True if any declared capability has exactly this name."""
        return any(c.name == name for c in self.capabilities)
@dataclass
class A2ATaskResult:
    """Outcome of a task that was delegated over A2A.

    Only ``success`` is required; every other field has a neutral default.
    """

    success: bool
    # Payload produced by the delegate; each instance gets its own empty dict.
    output: dict[str, Any] = field(default_factory=dict)
    # Human-readable failure description, or None when there is none.
    error: str | None = None
    # Elapsed time of the delegation, in seconds.
    duration_s: float = 0.0
    # Identifier of the agent the task was delegated to.
    agent_id: str = ""
class A2AClient:
    """
    Client for delegating tasks to remote agents via A2A protocol.

    Usage:
        client = A2AClient()
        client.register_peer(card)
        result = client.delegate(
            agent_id="remote_coder",
            task="Write a fibonacci function",
            timeout_s=30.0,
        )

    A delegation is gated, in order, by: peer registration, the optional
    allowlist, and a per-agent circuit breaker that opens after
    ``_max_failures`` consecutive transport failures.

    NOTE: only a successful delegation resets the failure count, and
    ``delegate()`` short-circuits while the breaker is open, so an open
    breaker is not closed automatically by this class.
    """

    def __init__(self, allowlist: list[str] | None = None):
        """Create a client.

        Args:
            allowlist: agent ids that may be delegated to. ``None`` (the
                default) allows every registered peer. An explicit list
                restricts delegation to exactly those ids, so an empty
                list allows nobody.
        """
        self._peers: dict[str, AgentCard] = {}
        # Compare against None, not truthiness: an empty allowlist must stay
        # a restriction instead of silently becoming "allow all".
        self._allowlist: set[str] | None = set(allowlist) if allowlist is not None else None
        self._circuit_breaker: dict[str, int] = {}  # agent_id → consecutive failures
        self._max_failures = 3

    def register_peer(self, card: AgentCard) -> None:
        """Register a remote agent as a potential delegate.

        A card whose trust tier is BLOCKED is rejected, and any earlier
        registration under the same ``agent_id`` is removed so that a peer
        can be revoked by re-registering it as blocked.
        """
        if card.trust_tier == TrustTier.BLOCKED:
            self._peers.pop(card.agent_id, None)
            logger.warning("A2A: rejected blocked agent '%s'", card.name)
            return
        self._peers[card.agent_id] = card
        logger.info(
            "A2A: registered peer '%s' (%s) trust=%s",
            card.name, card.agent_id, card.trust_tier.value,
        )

    def _is_allowed(self, agent_id: str) -> bool:
        """Return True if the allowlist (when configured) permits ``agent_id``."""
        return self._allowlist is None or agent_id in self._allowlist

    def delegate(
        self,
        agent_id: str,
        task: str,
        input_data: dict[str, Any] | None = None,
        timeout_s: float = 30.0,
    ) -> A2ATaskResult:
        """
        Delegate a task to a remote agent.

        Args:
            agent_id: id of a peer previously passed to ``register_peer``.
            task: task description forwarded to the delegate.
            input_data: optional payload; ``None`` is forwarded as ``{}``.
            timeout_s: timeout in seconds, forwarded to the transport.

        Returns:
            A2ATaskResult with success/failure and output. Exceptions raised
            by the transport are not propagated; they are reported as a
            failed result and counted by the circuit breaker.
        """
        card = self._peers.get(agent_id)
        if card is None:
            return A2ATaskResult(
                success=False, error=f"Unknown agent: {agent_id}", agent_id=agent_id
            )

        # Allowlist check
        if not self._is_allowed(agent_id):
            return A2ATaskResult(
                success=False,
                error=f"Agent '{agent_id}' not in allowlist",
                agent_id=agent_id,
            )

        # Circuit breaker
        if self._circuit_breaker.get(agent_id, 0) >= self._max_failures:
            return A2ATaskResult(
                success=False,
                error=(
                    f"Circuit breaker open for '{card.name}' "
                    f"(>={self._max_failures} consecutive failures)"
                ),
                agent_id=agent_id,
            )

        # Monotonic clock: a duration must not be affected by wall-clock jumps.
        t0 = time.monotonic()
        try:
            result = self._execute_delegation(card, task, input_data or {}, timeout_s)
        except Exception as e:
            # Transport-level failure: count it towards opening the breaker.
            self._circuit_breaker[agent_id] = self._circuit_breaker.get(agent_id, 0) + 1
            return A2ATaskResult(
                success=False,
                error=f"Delegation failed: {e}",
                duration_s=time.monotonic() - t0,
                agent_id=agent_id,
            )

        # Reset the breaker only on a genuinely successful result. A result
        # that came back with success=False leaves the count untouched: it is
        # neither a transport failure nor evidence that the peer recovered.
        if result.success:
            self._circuit_breaker[agent_id] = 0
        result.duration_s = time.monotonic() - t0
        result.agent_id = agent_id
        return result

    def _execute_delegation(
        self, card: AgentCard, task: str, input_data: dict, timeout_s: float
    ) -> A2ATaskResult:
        """
        Execute the actual delegation. Protocol skeleton.

        Real implementation requires HTTP client for remote agents,
        or direct function call for local agents. As written, this always
        returns a failed result that names the endpoint it would have used;
        ``task``, ``input_data`` and ``timeout_s`` are not used yet.
        """
        # For local trust tier, try direct invocation
        if card.trust_tier == TrustTier.LOCAL and card.endpoint.startswith("local:"):
            # Future: invoke local agent directly
            pass

        # Protocol skeleton — real transport needs purpose-agent[a2a] extra
        return A2ATaskResult(
            success=False,
            error=f"A2A transport not available. Install: pip install purpose-agent[a2a]. "
                  f"Would delegate to: {card.endpoint}",
        )

    def list_peers(self) -> list[AgentCard]:
        """List all registered peer agents."""
        return list(self._peers.values())

    def find_capable(self, capability: str) -> list[AgentCard]:
        """Find peers that declare a specific capability."""
        return [c for c in self._peers.values() if c.has_capability(capability)]

    @property
    def peer_count(self) -> int:
        """Number of currently registered peers."""
        return len(self._peers)
def publish_card(
    name: str,
    description: str,
    capabilities: list[dict[str, Any]],
    endpoint: str = "",
) -> AgentCard:
    """
    Create and return an AgentCard for a local Purpose Agent.

    The card is created with trust tier LOCAL and a freshly generated
    agent id.

    Args:
        name: agent name placed on the card.
        description: agent description placed on the card.
        capabilities: one dict per capability. ``name`` is required;
            ``description`` defaults to ``""``; optional ``input_schema``
            and ``output_schema`` dicts are carried onto the card (they
            default to empty).
        endpoint: URL or local reference for the agent.

    Raises:
        KeyError: if a capability dict has no ``name``.

    Usage:
        card = publish_card(
            name="code_reviewer",
            description="Reviews Python code for bugs and style",
            capabilities=[
                {"name": "code_review", "description": "Review Python code"},
                {"name": "security_audit", "description": "Check for security issues"},
            ],
            endpoint="local:code_reviewer",
        )
    """
    caps = [
        AgentCapability(
            name=c["name"],
            description=c.get("description", ""),
            # Copy the schemas so the card does not alias the caller's dicts;
            # `or {}` keeps a missing or null schema as "none declared".
            input_schema=dict(c.get("input_schema") or {}),
            output_schema=dict(c.get("output_schema") or {}),
        )
        for c in capabilities
    ]
    return AgentCard(
        name=name,
        description=description,
        capabilities=caps,
        endpoint=endpoint,
        trust_tier=TrustTier.LOCAL,
    )