#!/usr/bin/env python3
"""
Sprint 4+8 Tests — Protocols (MCP, A2A, AG-UI, AGENTS.md) + Quorum.
T4.1 MCP server registration + tool discovery skeleton
T4.2 MCP disallowed tool rejected
T4.3 AgentCard serializes/deserializes
T4.4 A2A client registers peer + circuit breaker
T4.5 AG-UI adapter maps PAEvent correctly
T4.6 AG-UI rejects hidden chain-of-thought
T4.7 AGENTS.md parses instructions/capabilities/constraints
T4.8 AGENTS.md precedence merge works
T8.1 Quorum: agreement → merge
T8.2 Quorum: disagreement → escalate
T8.3 Quorum: critical risk → HITL
T8.4 Critic ensemble evaluates
"""
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# Running totals of passed / failed checks, updated by check().
PASS = FAIL = 0


def check(name: str, cond: object, detail: str = "") -> None:
    """Record one test outcome and print a one-line report for it.

    Args:
        name: Label printed for the check.
        cond: Outcome of the check; interpreted by truthiness.
        detail: Optional explanation, printed only when the check fails.

    Side effects:
        Adds exactly one to the module-level PASS or FAIL counter and
        writes one line to stdout.
    """
    global PASS, FAIL
    # Normalise first: int(cond) raises on None/containers and would add more
    # than one to PASS for ints other than 0/1.
    ok = bool(cond)
    PASS += int(ok)
    FAIL += int(not ok)
    # U+2713 CHECK MARK for a pass, U+2717 BALLOT X for a failure. Written as
    # escapes so the two marks stay distinct even if the file is re-encoded.
    mark = "\u2713" if ok else "\u2717"
    suffix = f": {detail}" if detail and not ok else ""
    print(f" {mark} {name}{suffix}")
# ─── T4.1-T4.2: MCP ───
print("MCP Bridge")
from purpose_agent.protocols.mcp_bridge import MCPToolBridge

# T4.1: register a single server, then run discovery across the bridge.
mcp_bridge = MCPToolBridge()
mcp_bridge.add_server("calc", url="http://localhost:3001", allowlist=["add"])
check("T4.1 Server registered", mcp_bridge.server_count == 1)
discovered_tools = mcp_bridge.discover_all()
check("T4.1 Discover returns list", isinstance(discovered_tools, list))

# T4.2: the tool's name is on its server config's denylist, so executing it
# is expected to yield a message containing "denied".
from purpose_agent.protocols.mcp_bridge import MCPTool, MCPToolSchema, MCPServerConfig

deny_config = MCPServerConfig(name="x", url="http://x", denylist=["evil_tool"])
evil_schema = MCPToolSchema(name="evil_tool", description="bad")
denied_tool = MCPTool(evil_schema, deny_config)
outcome = denied_tool.execute()
check("T4.2 Denied tool rejected", "denied" in outcome.lower())
# ─── T4.3-T4.4: A2A ───
print("\nA2A Protocol")
from purpose_agent.protocols.a2a import AgentCard, AgentCapability, A2AClient, TrustTier, publish_card

# T4.3: a card converted with to_dict() and rebuilt with from_dict() should
# keep its name and its "code" capability.
code_capability = AgentCapability(name="code", description="Write code")
source_card = AgentCard(name="test_agent", description="A test",
                        capabilities=[code_capability])
restored_card = AgentCard.from_dict(source_card.to_dict())
roundtrip_ok = restored_card.name == "test_agent" and restored_card.has_capability("code")
check("T4.3 AgentCard roundtrip", roundtrip_ok)

# T4.4: the client's allowlist contains only "agent_1".
a2a_client = A2AClient(allowlist=["agent_1"])
verified_peer = AgentCard(agent_id="agent_1", name="peer1", trust_tier=TrustTier.VERIFIED)
a2a_client.register_peer(verified_peer)
check("T4.4 Peer registered", a2a_client.peer_count == 1)

# Delegating to an id that was never registered should report failure.
unknown_result = a2a_client.delegate("agent_999", task="hello")
check("T4.4 Unknown agent fails gracefully", not unknown_result.success)

# "agent_2" is registered but absent from the allowlist; the error text is
# expected to mention "allowlist".
a2a_client.register_peer(AgentCard(agent_id="agent_2", name="peer2"))
blocked_result = a2a_client.delegate("agent_2", task="hello")
rejected = not blocked_result.success and "allowlist" in (blocked_result.error or "")
check("T4.4 Non-allowlisted rejected", rejected)
# ─── T4.5-T4.6: AG-UI ───
print("\nAG-UI Adapter")
from purpose_agent.protocols.agui import AGUIAdapter
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, create_event

adapter = AGUIAdapter()

# T4.5: a TEXT_DELTA event should convert to an AG-UI event of type "text.delta".
event = create_event("r1", EventKind.TEXT_DELTA, text="hello")
agui = adapter.convert(event)
check("T4.5 PAEvent → AGUIEvent", agui is not None and agui.type == "text.delta")

# T4.6: an event whose payload carries a "hidden_chain_of_thought" key should
# be dropped, i.e. convert() returns None.
unsafe = PAEvent(run_id="r1", kind=EventKind.REASONING_SUMMARY,
                 payload={"hidden_chain_of_thought": "secret"})
check("T4.6 Hidden CoT rejected", adapter.convert(unsafe) is None)

# T4.6: events created with DEBUG visibility should be filtered out as well.
debug_event = create_event("r1", EventKind.AGENT_PROGRESS, visibility=Visibility.DEBUG)
check("T4.6 Debug filtered", adapter.convert(debug_event) is None)
# ─── T4.7-T4.8: AGENTS.md ───
print("\nAGENTS.md")
from purpose_agent.protocols.agents_md import parse_agents_md, AgentsConfig

# Fixture: two instructions, two capabilities and two constraints.
agents_md_text = """
# AGENTS.md
## Instructions
- Always use type hints
- Follow PEP 8
## Capabilities
- code_review: Review Python code
- testing: Write unit tests
## Constraints
- Do not modify /etc
- Max 100 lines per response
"""

# T4.7: each section of the fixture should be picked up by the parser.
parsed = parse_agents_md(agents_md_text)
instruction_count = len(parsed.instructions)
constraint_count = len(parsed.constraints)
check("T4.7 Instructions parsed", instruction_count == 2)
check("T4.7 Capabilities parsed", "code_review" in parsed.capabilities)
check("T4.7 Constraints parsed", constraint_count == 2)

# T4.8: the rendered prompt section should carry instruction text and a
# "Constraints" heading.
# NOTE(review): the module docstring describes T4.8 as a precedence merge;
# this check only exercises to_prompt_section() — confirm intended coverage.
prompt_section = parsed.to_prompt_section()
has_expected_text = "type hints" in prompt_section and "Constraints" in prompt_section
check("T4.8 Prompt section generated", has_expected_text)
# ─── T8.1-T8.4: Quorum ───
print("\nQuorum Coordinator")
from purpose_agent.quorum import QuorumCoordinator, QuorumConfig, QuorumDecision, CriticEnsemble

qc = QuorumCoordinator(QuorumConfig(agreement_threshold=0.5, disagreement_threshold=0.1))

# T8.1: near-identical outputs are expected to be merged.
outputs_agree = ["The answer is 42", "The answer is 42", "Answer: 42"]
check("T8.1 Agreement → merge", qc.evaluate(outputs_agree) == QuorumDecision.MERGE)

# T8.2: conflicting outputs.
# NOTE(review): MERGE is accepted here as well as ESCALATE, so this check
# cannot fail on a coordinator that merges disagreeing outputs — confirm
# whether the tolerance is intentional.
outputs_disagree = ["Use recursion for everything", "Never use recursion", "Functional programming only"]
decision = qc.evaluate(outputs_disagree)
check("T8.2 Disagreement → escalate", decision in (QuorumDecision.ESCALATE, QuorumDecision.MERGE))

# T8.3: an output containing a destructive shell command is expected to be
# routed to human-in-the-loop review.
outputs_danger = ["Let's run sudo rm -rf / to clean up", "Good idea"]
check("T8.3 Critical risk → HITL", qc.evaluate(outputs_danger) == QuorumDecision.HITL)

# T8.4: the ensemble is built without an LLM; four verdicts are expected and
# their aggregate must lie in [0, 1].
ensemble = CriticEnsemble(llm=None)  # No LLM = defaults
verdicts = ensemble.evaluate("print('hello')", task="Write hello world")
check("T8.4 Ensemble returns verdicts", len(verdicts) == 4)
avg = ensemble.aggregate(verdicts)
check("T8.4 Aggregate score valid", 0 <= avg <= 1)
# ─── REPORT ───
# Print the totals accumulated by check() between two 50-character rules.
banner = "=" * 50
print(f"\n{banner}")
print(f" Sprint 4+8 Tests: {PASS} pass, {FAIL} fail")
summary = "ALL PASS ✓" if FAIL == 0 else f"{FAIL} FAILURES"
print(f" {summary}")
print(banner)
# Exit status 0 only when no check failed, 1 otherwise.
sys.exit(0 if FAIL == 0 else 1)