File size: 5,557 Bytes
f6a5e41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
"""
Sprint 4+8 Tests — Protocols (MCP, A2A, AG-UI, AGENTS.md) + Quorum.

T4.1  MCP server registration + tool discovery skeleton
T4.2  MCP disallowed tool rejected
T4.3  AgentCard serializes/deserializes
T4.4  A2A client registers peer + circuit breaker
T4.5  AG-UI adapter maps PAEvent correctly
T4.6  AG-UI rejects hidden chain-of-thought
T4.7  AGENTS.md parses instructions/capabilities/constraints
T4.8  AGENTS.md precedence merge works
T8.1  Quorum: agreement → merge
T8.2  Quorum: disagreement → escalate
T8.3  Quorum: critical risk → HITL
T8.4  Critic ensemble evaluates
"""
import sys, os
# Put this file's parent directory at the front of sys.path
# (presumably so the purpose_agent package imported below resolves from the
# source checkout rather than an installed copy — TODO confirm).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

PASS = FAIL = 0  # running tallies of passed / failed checks, updated by check()


def check(name, cond, detail=""):
    """Record one test outcome and print a one-line result.

    Args:
        name: Label printed next to the pass/fail mark.
        cond: Outcome of the check; interpreted by truthiness.
        detail: Optional diagnostic text, printed only when the check fails.
    """
    global PASS, FAIL
    # Count by truthiness: int(cond) on a truthy non-bool (5, "yes", an object)
    # would either add more than one to the tally or raise.
    ok = bool(cond)
    if ok:
        PASS += 1
    else:
        FAIL += 1
    mark = "✓" if ok else "✗"
    suffix = f": {detail}" if detail and not ok else ""
    print(f"  {mark} {name}{suffix}")

# ═══ T4.1-T4.2: MCP ═══
print("MCP Bridge")
from purpose_agent.protocols.mcp_bridge import MCPToolBridge

# T4.1: one registered server must show up in server_count, and discovery
# must return a list.
bridge = MCPToolBridge()
bridge.add_server("calc", url="http://localhost:3001", allowlist=["add"])
check("T4.1 Server registered", bridge.server_count == 1,
      detail=f"server_count={bridge.server_count!r}")
tools = bridge.discover_all()
check("T4.1 Discover returns list", isinstance(tools, list),
      detail=f"got {type(tools).__name__}")

# T4.2: Denied tool
from purpose_agent.protocols.mcp_bridge import MCPTool, MCPToolSchema, MCPServerConfig
cfg = MCPServerConfig(name="x", url="http://x", denylist=["evil_tool"])
schema = MCPToolSchema(name="evil_tool", description="bad")
t = MCPTool(schema, cfg)
result = t.execute()
# str() keeps the check meaningful even if execute() returns a non-string object.
check("T4.2 Denied tool rejected", "denied" in str(result).lower(),
      detail=f"execute() returned {result!r}")

# ═══ T4.3-T4.4: A2A ═══
print("\nA2A Protocol")
from purpose_agent.protocols.a2a import AgentCard, AgentCapability, A2AClient, TrustTier, publish_card

# T4.3: a card must survive a to_dict() / from_dict() round trip.
card = AgentCard(name="test_agent", description="A test",
    capabilities=[AgentCapability(name="code", description="Write code")])
d = card.to_dict()
restored = AgentCard.from_dict(d)
# bool() so a truthy non-bool return from has_capability() still yields a plain True/False.
roundtrip_ok = restored.name == "test_agent" and bool(restored.has_capability("code"))
check("T4.3 AgentCard roundtrip", roundtrip_ok, detail=f"restored={restored!r}")

# T4.4: only "agent_1" is on the client's allowlist.
# NOTE(review): the module docstring lists a circuit breaker under T4.4, but
# nothing below exercises one.
client = A2AClient(allowlist=["agent_1"])
client.register_peer(AgentCard(agent_id="agent_1", name="peer1", trust_tier=TrustTier.VERIFIED))
check("T4.4 Peer registered", client.peer_count == 1,
      detail=f"peer_count={client.peer_count!r}")
# Try delegate to unregistered agent
r = client.delegate("agent_999", task="hello")
check("T4.4 Unknown agent fails gracefully", not r.success, detail=f"result={r!r}")
# Try delegate to non-allowlisted
client.register_peer(AgentCard(agent_id="agent_2", name="peer2"))
r2 = client.delegate("agent_2", task="hello")
check("T4.4 Non-allowlisted rejected", not r2.success and "allowlist" in (r2.error or ""),
      detail=f"success={r2.success!r}, error={r2.error!r}")

# ═══ T4.5-T4.6: AG-UI ═══
print("\nAG-UI Adapter")
from purpose_agent.protocols.agui import AGUIAdapter
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, create_event

# T4.5: a text-delta event must convert to an AG-UI event of type "text.delta".
adapter = AGUIAdapter()
event = create_event("r1", EventKind.TEXT_DELTA, text="hello")
agui = adapter.convert(event)
check("T4.5 PAEvent → AGUIEvent", agui is not None and agui.type == "text.delta",
      detail=f"convert() returned {agui!r}")

# Hidden CoT rejected
unsafe = PAEvent(run_id="r1", kind=EventKind.REASONING_SUMMARY, payload={"hidden_chain_of_thought": "secret"})
unsafe_out = adapter.convert(unsafe)
check("T4.6 Hidden CoT rejected", unsafe_out is None,
      detail=f"convert() returned {unsafe_out!r}")

# Debug visibility filtered
debug_event = create_event("r1", EventKind.AGENT_PROGRESS, visibility=Visibility.DEBUG)
debug_out = adapter.convert(debug_event)
check("T4.6 Debug filtered", debug_out is None,
      detail=f"convert() returned {debug_out!r}")

# ═══ T4.7-T4.8: AGENTS.md ═══
print("\nAGENTS.md")
from purpose_agent.protocols.agents_md import parse_agents_md, AgentsConfig

# Fixture: two instructions, two capabilities, two constraints.
md = """
# AGENTS.md

## Instructions
- Always use type hints
- Follow PEP 8

## Capabilities
- code_review: Review Python code
- testing: Write unit tests

## Constraints
- Do not modify /etc
- Max 100 lines per response
"""
config = parse_agents_md(md)
check("T4.7 Instructions parsed", len(config.instructions) == 2,
      detail=f"instructions={config.instructions!r}")
check("T4.7 Capabilities parsed", "code_review" in config.capabilities,
      detail=f"capabilities={config.capabilities!r}")
check("T4.7 Constraints parsed", len(config.constraints) == 2,
      detail=f"constraints={config.constraints!r}")
prompt = config.to_prompt_section()
# NOTE(review): the module docstring describes T4.8 as a precedence-merge test,
# but only prompt rendering is checked here; a merge check still needs writing.
check("T4.8 Prompt section generated", "type hints" in prompt and "Constraints" in prompt,
      detail=f"prompt={prompt!r}")

# ═══ T8.1-T8.4: Quorum ═══
print("\nQuorum Coordinator")
from purpose_agent.quorum import QuorumCoordinator, QuorumConfig, QuorumDecision, CriticEnsemble

qc = QuorumCoordinator(QuorumConfig(agreement_threshold=0.5, disagreement_threshold=0.1))

# Agreement
outputs_agree = ["The answer is 42", "The answer is 42", "Answer: 42"]
agree_decision = qc.evaluate(outputs_agree)
check("T8.1 Agreement → merge", agree_decision == QuorumDecision.MERGE,
      detail=f"got {agree_decision!r}")

# Disagreement
outputs_disagree = ["Use recursion for everything", "Never use recursion", "Functional programming only"]
decision = qc.evaluate(outputs_disagree)
# NOTE(review): MERGE is accepted as well, so this check does not actually pin
# escalation — confirm whether that leniency is intended.
check("T8.2 Disagreement → escalate", decision in (QuorumDecision.ESCALATE, QuorumDecision.MERGE),
      detail=f"got {decision!r}")

# Critical risk
outputs_danger = ["Let's run sudo rm -rf / to clean up", "Good idea"]
danger_decision = qc.evaluate(outputs_danger)
check("T8.3 Critical risk → HITL", danger_decision == QuorumDecision.HITL,
      detail=f"got {danger_decision!r}")

# Critic ensemble
ensemble = CriticEnsemble(llm=None)  # No LLM = defaults
verdicts = ensemble.evaluate("print('hello')", task="Write hello world")
check("T8.4 Ensemble returns verdicts", len(verdicts) == 4,
      detail=f"got {len(verdicts)} verdicts")
avg = ensemble.aggregate(verdicts)
check("T8.4 Aggregate score valid", 0 <= avg <= 1, detail=f"aggregate={avg!r}")

# ═══ REPORT ═══
# Print the tallies kept by check() and exit with status 1 if any check failed.
banner = "=" * 50
verdict = "ALL PASS ✓" if FAIL == 0 else f"{FAIL} FAILURES"
print(f"\n{banner}")
print(f"  Sprint 4+8 Tests: {PASS} pass, {FAIL} fail")
print(f"  {verdict}")
print(banner)
sys.exit(0 if FAIL == 0 else 1)