| """ |
| a2a.py — Agent-to-Agent protocol adapters. |
| |
| Implements the A2A standard for Purpose Agent: |
| - AgentCard: declares agent capabilities, schemas, and endpoints |
| - A2AClient: delegate tasks to remote agents |
| - publish_card(): expose local agents as A2A-compatible services |
| |
| Security: |
| - Endpoint allowlist (only approved remote agents) |
| - Schema validation on delegation and response |
| - Timeout + circuit breaker for unreachable peers |
| - Trust tier system (local > verified > unknown) |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import time |
| import uuid |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import Any |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class TrustTier(str, Enum): |
| """Trust level for remote agents.""" |
| LOCAL = "local" |
| VERIFIED = "verified" |
| UNKNOWN = "unknown" |
| BLOCKED = "blocked" |
|
|
|
|
| @dataclass |
| class AgentCapability: |
| """A single capability declared by an agent.""" |
| name: str |
| description: str |
| input_schema: dict[str, Any] = field(default_factory=dict) |
| output_schema: dict[str, Any] = field(default_factory=dict) |
|
|
|
|
| @dataclass |
| class AgentCard: |
| """ |
| A2A Agent Card — declares an agent's identity, capabilities, and endpoint. |
| |
| This is what gets published for other agents to discover and delegate to. |
| """ |
| agent_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) |
| name: str = "" |
| description: str = "" |
| version: str = "1.0" |
| endpoint: str = "" |
| capabilities: list[AgentCapability] = field(default_factory=list) |
| trust_tier: TrustTier = TrustTier.UNKNOWN |
| metadata: dict[str, Any] = field(default_factory=dict) |
| created_at: float = field(default_factory=time.time) |
|
|
| def to_dict(self) -> dict[str, Any]: |
| return { |
| "agent_id": self.agent_id, |
| "name": self.name, |
| "description": self.description, |
| "version": self.version, |
| "endpoint": self.endpoint, |
| "capabilities": [ |
| {"name": c.name, "description": c.description, |
| "input_schema": c.input_schema, "output_schema": c.output_schema} |
| for c in self.capabilities |
| ], |
| "trust_tier": self.trust_tier.value, |
| "metadata": self.metadata, |
| } |
|
|
| @classmethod |
| def from_dict(cls, d: dict[str, Any]) -> "AgentCard": |
| caps = [AgentCapability(**c) for c in d.get("capabilities", [])] |
| return cls( |
| agent_id=d.get("agent_id", ""), |
| name=d.get("name", ""), |
| description=d.get("description", ""), |
| version=d.get("version", "1.0"), |
| endpoint=d.get("endpoint", ""), |
| capabilities=caps, |
| trust_tier=TrustTier(d.get("trust_tier", "unknown")), |
| metadata=d.get("metadata", {}), |
| ) |
|
|
| def to_json(self) -> str: |
| return json.dumps(self.to_dict(), indent=2) |
|
|
| def has_capability(self, name: str) -> bool: |
| return any(c.name == name for c in self.capabilities) |
|
|
|
|
| @dataclass |
| class A2ATaskResult: |
| """Result from a delegated A2A task.""" |
| success: bool |
| output: dict[str, Any] = field(default_factory=dict) |
| error: str | None = None |
| duration_s: float = 0.0 |
| agent_id: str = "" |
|
|
|
|
| class A2AClient: |
| """ |
| Client for delegating tasks to remote agents via A2A protocol. |
| |
| Usage: |
| client = A2AClient() |
| client.register_peer(card) |
| |
| result = client.delegate( |
| agent_id="remote_coder", |
| task="Write a fibonacci function", |
| timeout_s=30.0, |
| ) |
| """ |
|
|
| def __init__(self, allowlist: list[str] | None = None): |
| self._peers: dict[str, AgentCard] = {} |
| self._allowlist = set(allowlist) if allowlist else None |
| self._circuit_breaker: dict[str, int] = {} |
| self._max_failures = 3 |
|
|
| def register_peer(self, card: AgentCard) -> None: |
| """Register a remote agent as a potential delegate.""" |
| if card.trust_tier == TrustTier.BLOCKED: |
| logger.warning(f"A2A: rejected blocked agent '{card.name}'") |
| return |
| self._peers[card.agent_id] = card |
| logger.info(f"A2A: registered peer '{card.name}' ({card.agent_id}) trust={card.trust_tier.value}") |
|
|
| def delegate( |
| self, |
| agent_id: str, |
| task: str, |
| input_data: dict[str, Any] | None = None, |
| timeout_s: float = 30.0, |
| ) -> A2ATaskResult: |
| """ |
| Delegate a task to a remote agent. |
| |
| Returns A2ATaskResult with success/failure and output. |
| """ |
| card = self._peers.get(agent_id) |
| if not card: |
| return A2ATaskResult(success=False, error=f"Unknown agent: {agent_id}") |
|
|
| |
| if self._allowlist and agent_id not in self._allowlist: |
| return A2ATaskResult(success=False, error=f"Agent '{agent_id}' not in allowlist") |
|
|
| |
| if self._circuit_breaker.get(agent_id, 0) >= self._max_failures: |
| return A2ATaskResult( |
| success=False, |
| error=f"Circuit breaker open for '{card.name}' (>{self._max_failures} failures)", |
| agent_id=agent_id, |
| ) |
|
|
| t0 = time.time() |
| try: |
| result = self._execute_delegation(card, task, input_data or {}, timeout_s) |
| |
| self._circuit_breaker[agent_id] = 0 |
| result.duration_s = time.time() - t0 |
| result.agent_id = agent_id |
| return result |
| except Exception as e: |
| |
| self._circuit_breaker[agent_id] = self._circuit_breaker.get(agent_id, 0) + 1 |
| return A2ATaskResult( |
| success=False, |
| error=f"Delegation failed: {e}", |
| duration_s=time.time() - t0, |
| agent_id=agent_id, |
| ) |
|
|
| def _execute_delegation( |
| self, card: AgentCard, task: str, input_data: dict, timeout_s: float |
| ) -> A2ATaskResult: |
| """ |
| Execute the actual delegation. Protocol skeleton. |
| |
| Real implementation requires HTTP client for remote agents, |
| or direct function call for local agents. |
| """ |
| |
| if card.trust_tier == TrustTier.LOCAL and card.endpoint.startswith("local:"): |
| |
| pass |
|
|
| |
| return A2ATaskResult( |
| success=False, |
| error=f"A2A transport not available. Install: pip install purpose-agent[a2a]. " |
| f"Would delegate to: {card.endpoint}", |
| ) |
|
|
| def list_peers(self) -> list[AgentCard]: |
| """List all registered peer agents.""" |
| return list(self._peers.values()) |
|
|
| def find_capable(self, capability: str) -> list[AgentCard]: |
| """Find peers that declare a specific capability.""" |
| return [c for c in self._peers.values() if c.has_capability(capability)] |
|
|
| @property |
| def peer_count(self) -> int: |
| return len(self._peers) |
|
|
|
|
| def publish_card( |
| name: str, |
| description: str, |
| capabilities: list[dict[str, str]], |
| endpoint: str = "", |
| ) -> AgentCard: |
| """ |
| Create and return an AgentCard for a local Purpose Agent. |
| |
| Usage: |
| card = publish_card( |
| name="code_reviewer", |
| description="Reviews Python code for bugs and style", |
| capabilities=[ |
| {"name": "code_review", "description": "Review Python code"}, |
| {"name": "security_audit", "description": "Check for security issues"}, |
| ], |
| endpoint="local:code_reviewer", |
| ) |
| """ |
| caps = [AgentCapability(name=c["name"], description=c.get("description", "")) for c in capabilities] |
| return AgentCard( |
| name=name, |
| description=description, |
| capabilities=caps, |
| endpoint=endpoint, |
| trust_tier=TrustTier.LOCAL, |
| ) |
|
|