| |
| """ |
| Sprint 4+8 Tests — Protocols (MCP, A2A, AG-UI, AGENTS.md) + Quorum. |
| |
| T4.1 MCP server registration + tool discovery skeleton |
| T4.2 MCP disallowed tool rejected |
| T4.3 AgentCard serializes/deserializes |
| T4.4 A2A client registers peer + circuit breaker |
| T4.5 AG-UI adapter maps PAEvent correctly |
| T4.6 AG-UI rejects hidden chain-of-thought |
| T4.7 AGENTS.md parses instructions/capabilities/constraints |
| T4.8 AGENTS.md precedence merge works |
| T8.1 Quorum: agreement → merge |
| T8.2 Quorum: disagreement → escalate |
| T8.3 Quorum: critical risk → HITL |
| T8.4 Critic ensemble evaluates |
| """ |
import os
import sys

# Prepend the parent of this file's directory to the module search path
# (presumably so `purpose_agent` imports without being installed — confirm
# against the repository layout).
sys.path.insert(
    0,
    os.path.join(os.path.dirname(__file__), ".."),
)
|
|
# Running totals of passed / failed checks, updated by check().
PASS = FAIL = 0


def check(name: str, cond: object, detail: str = "") -> None:
    """Record the outcome of one check and print a one-line report.

    Args:
        name: Label printed next to the pass/fail mark.
        cond: Outcome of the check. Any object is accepted and judged by
            truthiness.
        detail: Extra text appended after the label; printed only when the
            check fails.

    Side effects:
        Increments the module-level ``PASS`` or ``FAIL`` counter and writes
        one line to stdout.
    """
    global PASS, FAIL
    # bool() rather than int(): int(None) or int("text") raises, while
    # truthiness is defined for every object.
    passed = bool(cond)
    PASS += passed
    FAIL += not passed
    # Distinct marks for the two outcomes (check mark vs. ballot X).
    mark = "✓" if passed else "✗"
    suffix = f": {detail}" if detail and not passed else ""
    print(f" {mark} {name}{suffix}")
|
|
| |
# --- T4.1: MCP server registration and tool discovery ------------------------
print("MCP Bridge")
from purpose_agent.protocols.mcp_bridge import MCPToolBridge

# Register one server named "calc" whose allow-list holds only the "add" tool.
bridge = MCPToolBridge()
bridge.add_server(
    "calc",
    url="http://localhost:3001",
    allowlist=["add"],
)
check("T4.1 Server registered", bridge.server_count == 1)

# Only the return type of discovery is asserted here, not its contents.
discovered = bridge.discover_all()
check("T4.1 Discover returns list", isinstance(discovered, list))
|
|
| |
# --- T4.2: a deny-listed MCP tool must be rejected ---------------------------
from purpose_agent.protocols.mcp_bridge import (
    MCPTool,
    MCPToolSchema,
    MCPServerConfig,
)

# The tool's schema name matches the only entry in the server's deny-list.
deny_cfg = MCPServerConfig(name="x", url="http://x", denylist=["evil_tool"])
denied_schema = MCPToolSchema(name="evil_tool", description="bad")

# execute() is expected to return text that mentions "denied" (any casing).
outcome = MCPTool(denied_schema, deny_cfg).execute()
check("T4.2 Denied tool rejected", "denied" in outcome.lower())
|
|
| |
# --- T4.3: AgentCard survives a to_dict / from_dict round trip ---------------
print("\nA2A Protocol")
from purpose_agent.protocols.a2a import (
    AgentCard,
    AgentCapability,
    A2AClient,
    TrustTier,
    publish_card,
)

code_capability = AgentCapability(name="code", description="Write code")
card = AgentCard(
    name="test_agent",
    description="A test",
    capabilities=[code_capability],
)

# Serialise and immediately rebuild; the name and the "code" capability must
# both be present on the rebuilt card.
restored = AgentCard.from_dict(card.to_dict())
check(
    "T4.3 AgentCard roundtrip",
    restored.name == "test_agent" and restored.has_capability("code"),
)
|
|
# --- T4.4: peer registration and delegation guard rails ----------------------
client = A2AClient(allowlist=["agent_1"])

trusted_peer = AgentCard(
    agent_id="agent_1",
    name="peer1",
    trust_tier=TrustTier.VERIFIED,
)
client.register_peer(trusted_peer)
check("T4.4 Peer registered", client.peer_count == 1)

# "agent_999" was never registered with this client.
unknown_result = client.delegate("agent_999", task="hello")
check("T4.4 Unknown agent fails gracefully", not unknown_result.success)

# "agent_2" is registered but absent from the allow-list given to the client;
# the error text is expected to mention "allowlist".
outsider = AgentCard(agent_id="agent_2", name="peer2")
client.register_peer(outsider)
blocked_result = client.delegate("agent_2", task="hello")
check(
    "T4.4 Non-allowlisted rejected",
    not blocked_result.success and "allowlist" in (blocked_result.error or ""),
)
|
|
| |
# --- T4.5: AG-UI adapter converts a PAEvent ----------------------------------
print("\nAG-UI Adapter")
from purpose_agent.protocols.agui import AGUIAdapter
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, create_event

adapter = AGUIAdapter()

# A TEXT_DELTA event for run "r1" must convert to a non-None AG-UI event whose
# type is "text.delta".
event = create_event("r1", EventKind.TEXT_DELTA, text="hello")
agui = adapter.convert(event)
# The label's arrow is restored from a mis-decoded character.
check("T4.5 PAEvent → AGUIEvent", agui is not None and agui.type == "text.delta")
|
|
| |
# --- T4.6: events the adapter is expected to drop ----------------------------
# A reasoning-summary event whose payload carries a "hidden_chain_of_thought"
# key; convert() must return None for it.
unsafe = PAEvent(
    run_id="r1",
    kind=EventKind.REASONING_SUMMARY,
    payload={"hidden_chain_of_thought": "secret"},
)
unsafe_converted = adapter.convert(unsafe)
check("T4.6 Hidden CoT rejected", unsafe_converted is None)

# An event created with DEBUG visibility must also convert to None.
debug_event = create_event(
    "r1",
    EventKind.AGENT_PROGRESS,
    visibility=Visibility.DEBUG,
)
debug_converted = adapter.convert(debug_event)
check("T4.6 Debug filtered", debug_converted is None)
|
|
| |
# --- T4.7 / T4.8: AGENTS.md parsing and prompt rendering ---------------------
print("\nAGENTS.md")
from purpose_agent.protocols.agents_md import parse_agents_md, AgentsConfig

# Fixture document: three sections, each with two bullet items.
md = """
# AGENTS.md

## Instructions
- Always use type hints
- Follow PEP 8

## Capabilities
- code_review: Review Python code
- testing: Write unit tests

## Constraints
- Do not modify /etc
- Max 100 lines per response
"""

config = parse_agents_md(md)
check("T4.7 Instructions parsed", len(config.instructions) == 2)
check("T4.7 Capabilities parsed", "code_review" in config.capabilities)
check("T4.7 Constraints parsed", len(config.constraints) == 2)

# NOTE(review): the module docstring describes T4.8 as a precedence-merge
# test, but only the rendered prompt section is checked here.
prompt = config.to_prompt_section()
check(
    "T4.8 Prompt section generated",
    "type hints" in prompt and "Constraints" in prompt,
)
|
|
| |
# --- T8.1-T8.3: QuorumCoordinator decisions ----------------------------------
print("\nQuorum Coordinator")
from purpose_agent.quorum import QuorumCoordinator, QuorumConfig, QuorumDecision, CriticEnsemble

qc = QuorumCoordinator(
    QuorumConfig(agreement_threshold=0.5, disagreement_threshold=0.1)
)

# T8.1: three near-identical answers must produce MERGE.
outputs_agree = ["The answer is 42", "The answer is 42", "Answer: 42"]
check("T8.1 Agreement → merge", qc.evaluate(outputs_agree) == QuorumDecision.MERGE)

# T8.2: three conflicting answers.
outputs_disagree = [
    "Use recursion for everything",
    "Never use recursion",
    "Functional programming only",
]
decision = qc.evaluate(outputs_disagree)
# NOTE(review): MERGE is accepted alongside ESCALATE, so this check cannot
# fail on an unexpected merge — confirm whether that leniency is intended.
check(
    "T8.2 Disagreement → escalate",
    decision in (QuorumDecision.ESCALATE, QuorumDecision.MERGE),
)

# T8.3: an output containing a destructive shell command must produce HITL.
outputs_danger = ["Let's run sudo rm -rf / to clean up", "Good idea"]
check("T8.3 Critical risk → HITL", qc.evaluate(outputs_danger) == QuorumDecision.HITL)
|
|
| |
# --- T8.4: CriticEnsemble evaluation and aggregation -------------------------
# The ensemble is built with llm=None; exactly four verdicts are expected.
ensemble = CriticEnsemble(llm=None)
verdicts = ensemble.evaluate(
    "print('hello')",
    task="Write hello world",
)
check("T8.4 Ensemble returns verdicts", len(verdicts) == 4)

# The aggregated score must fall within the closed interval [0, 1].
mean_score = ensemble.aggregate(verdicts)
check("T8.4 Aggregate score valid", 0 <= mean_score <= 1)
|
|
| |
# --- Summary: print totals and exit with a status reflecting failures --------
print(f"\n{'='*50}")
print(f" Sprint 4+8 Tests: {PASS} pass, {FAIL} fail")
# The success marker is restored from a mis-decoded character.
print(f" {'ALL PASS ✓' if FAIL == 0 else f'{FAIL} FAILURES'}")
print(f"{'='*50}")
# Exit status 0 only when no check failed, 1 otherwise.
sys.exit(0 if FAIL == 0 else 1)
|
|