#!/usr/bin/env python3
"""
SRE Regression Tests -- verify that all 5 critical vulnerability patches work.

These test the EXACT failure scenarios from the SRE audit:
  S1: Dict iteration during modification (MemoryStore)
  S2: UNKNOWN action propagation (Actor)
  S3: Context overflow from heuristic bloat (Actor prompt)
  S4: Race condition in parallel swarm (ExperienceReplay)
  S5: None score crash in trajectory math (Trajectory)
"""
import sys, os, json, threading
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

PASS = FAIL = 0
def check(name, cond, detail=""):
    global PASS, FAIL
    PASS += int(cond); FAIL += int(not cond)
    print(f"  {'βœ“' if cond else 'βœ—'} {name}" + (f": {detail}" if detail and not cond else ""))

# Import triggers sre_patches.apply_all()
import purpose_agent as pa
from purpose_agent.types import State, Action, Trajectory, TrajectoryStep, PurposeScore, Heuristic, MemoryTier
from purpose_agent.memory import MemoryStore, MemoryCard, MemoryKind, MemoryStatus
from purpose_agent.v2_types import MemoryScope

print("═══ SRE Scenario 1: Dict Snapshot During Iteration ═══")
store = MemoryStore()
for i in range(100):
    store.add(MemoryCard(kind=MemoryKind.SKILL_CARD, status=MemoryStatus.PROMOTED,
        pattern=f"p{i}", strategy=f"s{i}"))

# Simulate concurrent modification during retrieval
def modify_during_retrieve():
    """Add cards while retrieve is iterating β€” should NOT crash."""
    for i in range(100, 200):
        store.add(MemoryCard(kind=MemoryKind.SKILL_CARD, status=MemoryStatus.PROMOTED,
            pattern=f"p{i}", strategy=f"s{i}"))

t = threading.Thread(target=modify_during_retrieve)
t.start()
try:
    results = store.retrieve("test query", top_k=10)
    check("S1 No RuntimeError during concurrent modify", True)
except RuntimeError as e:
    check("S1 No RuntimeError during concurrent modify", False, str(e))
t.join()

print("\n═══ SRE Scenario 2: UNKNOWN Action Rejection ═══")
mock = pa.MockLLMBackend()
# Return garbage that parser can't handle
mock.register_handler("goal-directed agent", "totally unparseable garbage !@#$%")
mock.set_structured_default({"phi_before":0,"phi_after":0,"reasoning":"x","evidence":"x","confidence":0.5})

from purpose_agent.orchestrator import SimpleEnvironment
env = SimpleEnvironment(execute_fn=lambda a,s: State(data={"x":1}))
orch = pa.Orchestrator(llm=mock, environment=env, available_actions={"test":"test","DONE":"done"}, critic_mode="standard")
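# Assumed patched behavior, inferred from the audit item rather than read from the Actor code:
# unparseable LLM output should be mapped to a sentinel UNKNOWN action, which the orchestrator
# then rejects and downgrades to DONE instead of executing it against the environment.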

try:
    r = orch.run_task(purpose="test", max_steps=2)
    # Should get DONE (from UNKNOWN rejection) not crash
    check("S2 No crash on garbage LLM output", True)
    # The action should have been converted to DONE
    if r.trajectory.steps:
        last_action = r.trajectory.steps[-1].action.name
        check("S2 UNKNOWN β†’ DONE fallback", last_action == "DONE", f"got {last_action}")
    else:
        check("S2 Has steps", False, "no steps recorded")
except Exception as e:
    check("S2 No crash on garbage LLM output", False, f"{type(e).__name__}: {e}")

print("\n═══ SRE Scenario 3: Heuristic Cap (Context Overflow Prevention) ═══")
mock2 = pa.MockLLMBackend()
mock2.register_handler("goal-directed agent", json.dumps({"thought":"t","action":{"name":"DONE","params":{}},"expected_delta":"d"}))
mock2.set_structured_default({"phi_before":0,"phi_after":5,"reasoning":"r","evidence":"e","confidence":0.7})

env2 = SimpleEnvironment(execute_fn=lambda a,s: State(data={}))
orch2 = pa.Orchestrator(llm=mock2, environment=env2, available_actions={"DONE":"done"}, critic_mode="standard")

# Inject 200 heuristics (would overflow SLM context without cap)
for i in range(200):
    orch2.optimizer.heuristic_library.append(Heuristic(
        pattern=f"Pattern {i} " * 10, strategy=f"Strategy {i} " * 10,
        steps=[], tier=MemoryTier.STRATEGIC, q_value=0.5 + i*0.001,
    ))
orch2.sync_memory()
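# Assumed cap mechanism (illustrative, not the verified implementation): the actor presumably
# injects only a bounded, highest-q_value slice of the heuristic library into the system prompt,
# e.g. something like `sorted(heuristics, key=lambda h: h.q_value, reverse=True)[:10]`,
# rather than all 200 entries.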

# Build the prompt -- it should contain a capped subset, not all 200 entries
prompt = orch2.actor._build_system_prompt()
# Count heuristic entries in prompt
heuristic_lines = [l for l in prompt.split("\n") if l.strip().startswith("- When:") or l.strip().startswith("- Pattern")]
check("S3 Heuristics capped", len(heuristic_lines) <= 10, f"got {len(heuristic_lines)} (should be ≀10)")
check("S3 Prompt not massive", len(prompt) < 5000, f"prompt is {len(prompt)} chars")

print("\n═══ SRE Scenario 4: Thread-Safe ExperienceReplay ═══")
er = pa.ExperienceReplay(capacity=100)
errors = []

def add_many(start):
    for i in range(50):
        try:
            t = Trajectory(task_description=f"task_{start}_{i}", purpose=f"p_{start}_{i}")
            t.steps.append(TrajectoryStep(
                state_before=State(data={}), action=Action(name="x"),
                state_after=State(data={"i": i}),
                score=PurposeScore(phi_before=0, phi_after=5, delta=5, reasoning="r", evidence="e", confidence=0.8),
            ))
            er.add(t)
        except Exception as e:
            errors.append(str(e))

# 4 threads adding concurrently
threads = [threading.Thread(target=add_many, args=(j*100,)) for j in range(4)]
for th in threads: th.start()
for th in threads: th.join()

check("S4 No errors in concurrent add", len(errors) == 0, f"{len(errors)} errors")
check("S4 All items added", er.size > 0, f"size={er.size}")

print("\n═══ SRE Scenario 5: None Score Guard ═══")
t = Trajectory(task_description="test", purpose="test")
# Add steps with None scores (simulates HITL interrupt mid-eval)
t.steps.append(TrajectoryStep(
    state_before=State(data={}), action=Action(name="x"),
    state_after=State(data={}), score=None,  # None score!
))
t.steps.append(TrajectoryStep(
    state_before=State(data={}), action=Action(name="y"),
    state_after=State(data={}),
    score=PurposeScore(phi_before=0, phi_after=7, delta=7, reasoning="r", evidence="e", confidence=0.9),
))
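# Expected guard pattern (an assumption about the patched Trajectory, shown for illustration):
# aggregate properties should skip unscored steps, e.g.
#     sum(s.score.delta for s in self.steps if s.score is not None)
# rather than touching `s.score.delta` on a step whose score is None.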

try:
    cr = t.cumulative_reward
    check("S5 cumulative_reward with None score", isinstance(cr, float), f"got {cr}")
except TypeError as e:
    check("S5 cumulative_reward with None score", False, str(e))

try:
    td = t.total_delta
    check("S5 total_delta with None score", isinstance(td, float), f"got {td}")
except TypeError as e:
    check("S5 total_delta with None score", False, str(e))

try:
    sr = t.success_rate
    check("S5 success_rate with None score", isinstance(sr, float))
except TypeError as e:
    check("S5 success_rate with None score", False, str(e))

try:
    fp = t.final_phi
    check("S5 final_phi with None score", fp == 7.0 or fp is not None, f"got {fp}")
except (TypeError, AttributeError) as e:
    check("S5 final_phi with None score", False, str(e))

# ═══ REPORT ═══
print(f"\n{'='*50}")
print(f"  SRE Regression: {PASS} pass, {FAIL} fail")
print(f"  {'ALL PASS βœ“' if FAIL == 0 else f'{FAIL} FAILURES β€” CRITICAL'}")
print(f"{'='*50}")
sys.exit(0 if FAIL == 0 else 1)