""" a2a.py — Agent-to-Agent protocol adapters. Implements the A2A standard for Purpose Agent: - AgentCard: declares agent capabilities, schemas, and endpoints - A2AClient: delegate tasks to remote agents - publish_card(): expose local agents as A2A-compatible services Security: - Endpoint allowlist (only approved remote agents) - Schema validation on delegation and response - Timeout + circuit breaker for unreachable peers - Trust tier system (local > verified > unknown) """ from __future__ import annotations import json import logging import time import uuid from dataclasses import dataclass, field from enum import Enum from typing import Any logger = logging.getLogger(__name__) class TrustTier(str, Enum): """Trust level for remote agents.""" LOCAL = "local" # Same system, full trust VERIFIED = "verified" # Authenticated, schema-validated UNKNOWN = "unknown" # No verification, sandboxed BLOCKED = "blocked" # Explicitly denied @dataclass class AgentCapability: """A single capability declared by an agent.""" name: str description: str input_schema: dict[str, Any] = field(default_factory=dict) output_schema: dict[str, Any] = field(default_factory=dict) @dataclass class AgentCard: """ A2A Agent Card — declares an agent's identity, capabilities, and endpoint. This is what gets published for other agents to discover and delegate to. """ agent_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) name: str = "" description: str = "" version: str = "1.0" endpoint: str = "" # URL or local reference capabilities: list[AgentCapability] = field(default_factory=list) trust_tier: TrustTier = TrustTier.UNKNOWN metadata: dict[str, Any] = field(default_factory=dict) created_at: float = field(default_factory=time.time) def to_dict(self) -> dict[str, Any]: return { "agent_id": self.agent_id, "name": self.name, "description": self.description, "version": self.version, "endpoint": self.endpoint, "capabilities": [ {"name": c.name, "description": c.description, "input_schema": c.input_schema, "output_schema": c.output_schema} for c in self.capabilities ], "trust_tier": self.trust_tier.value, "metadata": self.metadata, } @classmethod def from_dict(cls, d: dict[str, Any]) -> "AgentCard": caps = [AgentCapability(**c) for c in d.get("capabilities", [])] return cls( agent_id=d.get("agent_id", ""), name=d.get("name", ""), description=d.get("description", ""), version=d.get("version", "1.0"), endpoint=d.get("endpoint", ""), capabilities=caps, trust_tier=TrustTier(d.get("trust_tier", "unknown")), metadata=d.get("metadata", {}), ) def to_json(self) -> str: return json.dumps(self.to_dict(), indent=2) def has_capability(self, name: str) -> bool: return any(c.name == name for c in self.capabilities) @dataclass class A2ATaskResult: """Result from a delegated A2A task.""" success: bool output: dict[str, Any] = field(default_factory=dict) error: str | None = None duration_s: float = 0.0 agent_id: str = "" class A2AClient: """ Client for delegating tasks to remote agents via A2A protocol. Usage: client = A2AClient() client.register_peer(card) result = client.delegate( agent_id="remote_coder", task="Write a fibonacci function", timeout_s=30.0, ) """ def __init__(self, allowlist: list[str] | None = None): self._peers: dict[str, AgentCard] = {} self._allowlist = set(allowlist) if allowlist else None # None = allow all registered self._circuit_breaker: dict[str, int] = {} # agent_id → consecutive failures self._max_failures = 3 def register_peer(self, card: AgentCard) -> None: """Register a remote agent as a potential delegate.""" if card.trust_tier == TrustTier.BLOCKED: logger.warning(f"A2A: rejected blocked agent '{card.name}'") return self._peers[card.agent_id] = card logger.info(f"A2A: registered peer '{card.name}' ({card.agent_id}) trust={card.trust_tier.value}") def delegate( self, agent_id: str, task: str, input_data: dict[str, Any] | None = None, timeout_s: float = 30.0, ) -> A2ATaskResult: """ Delegate a task to a remote agent. Returns A2ATaskResult with success/failure and output. """ card = self._peers.get(agent_id) if not card: return A2ATaskResult(success=False, error=f"Unknown agent: {agent_id}") # Allowlist check if self._allowlist and agent_id not in self._allowlist: return A2ATaskResult(success=False, error=f"Agent '{agent_id}' not in allowlist") # Circuit breaker if self._circuit_breaker.get(agent_id, 0) >= self._max_failures: return A2ATaskResult( success=False, error=f"Circuit breaker open for '{card.name}' (>{self._max_failures} failures)", agent_id=agent_id, ) t0 = time.time() try: result = self._execute_delegation(card, task, input_data or {}, timeout_s) # Reset circuit breaker on success self._circuit_breaker[agent_id] = 0 result.duration_s = time.time() - t0 result.agent_id = agent_id return result except Exception as e: # Increment circuit breaker self._circuit_breaker[agent_id] = self._circuit_breaker.get(agent_id, 0) + 1 return A2ATaskResult( success=False, error=f"Delegation failed: {e}", duration_s=time.time() - t0, agent_id=agent_id, ) def _execute_delegation( self, card: AgentCard, task: str, input_data: dict, timeout_s: float ) -> A2ATaskResult: """ Execute the actual delegation. Protocol skeleton. Real implementation requires HTTP client for remote agents, or direct function call for local agents. """ # For local trust tier, try direct invocation if card.trust_tier == TrustTier.LOCAL and card.endpoint.startswith("local:"): # Future: invoke local agent directly pass # Protocol skeleton — real transport needs purpose-agent[a2a] extra return A2ATaskResult( success=False, error=f"A2A transport not available. Install: pip install purpose-agent[a2a]. " f"Would delegate to: {card.endpoint}", ) def list_peers(self) -> list[AgentCard]: """List all registered peer agents.""" return list(self._peers.values()) def find_capable(self, capability: str) -> list[AgentCard]: """Find peers that declare a specific capability.""" return [c for c in self._peers.values() if c.has_capability(capability)] @property def peer_count(self) -> int: return len(self._peers) def publish_card( name: str, description: str, capabilities: list[dict[str, str]], endpoint: str = "", ) -> AgentCard: """ Create and return an AgentCard for a local Purpose Agent. Usage: card = publish_card( name="code_reviewer", description="Reviews Python code for bugs and style", capabilities=[ {"name": "code_review", "description": "Review Python code"}, {"name": "security_audit", "description": "Check for security issues"}, ], endpoint="local:code_reviewer", ) """ caps = [AgentCapability(name=c["name"], description=c.get("description", "")) for c in capabilities] return AgentCard( name=name, description=description, capabilities=caps, endpoint=endpoint, trust_tier=TrustTier.LOCAL, )