| |
| """ |
SRE Regression Tests — Verify all 5 critical vulnerability patches work.
| |
| These test the EXACT failure scenarios from the SRE audit: |
| S1: Dict iteration during modification (MemoryStore) |
| S2: UNKNOWN action propagation (Actor) |
| S3: Context overflow from heuristic bloat (Actor prompt) |
| S4: Race condition in parallel swarm (ExperienceReplay) |
| S5: None score crash in trajectory math (Trajectory) |
| """ |
| import sys, os, json, threading |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) |
|
|
PASS = FAIL = 0


def check(name, cond, detail=""):
    """Record one test result: bump PASS or FAIL and print a status line.

    Args:
        name: Human-readable label for the check.
        cond: Truthy means the check passed.
        detail: Extra context, printed only when the check FAILS
            (passing checks keep the output terse).
    """
    global PASS, FAIL
    PASS += int(cond)
    FAIL += int(not cond)
    # Distinct glyphs so a scan of the output separates passes from
    # failures (the original printed the same mojibake char for both).
    mark = "✓" if cond else "✗"
    suffix = f": {detail}" if detail and not cond else ""
    print(f" {mark} {name}{suffix}")
|
|
| |
| import purpose_agent as pa |
| from purpose_agent.types import State, Action, Trajectory, TrajectoryStep, PurposeScore, Heuristic, MemoryTier |
| from purpose_agent.memory import MemoryStore, MemoryCard, MemoryKind, MemoryStatus |
| from purpose_agent.v2_types import MemoryScope |
|
|
print("βββ SRE Scenario 1: Dict Snapshot During Iteration βββ")

store = MemoryStore()


def _make_card(idx):
    # One promoted skill card whose fields are derived from the index.
    return MemoryCard(kind=MemoryKind.SKILL_CARD, status=MemoryStatus.PROMOTED,
                      pattern=f"p{idx}", strategy=f"s{idx}")


# Seed the store with 100 cards before the concurrent mutation starts.
for i in range(100):
    store.add(_make_card(i))


def modify_during_retrieve():
    """Add 100 more cards while retrieve() iterates — must NOT crash."""
    for i in range(100, 200):
        store.add(_make_card(i))


t = threading.Thread(target=modify_during_retrieve)
t.start()
try:
    results = store.retrieve("test query", top_k=10)
    check("S1 No RuntimeError during concurrent modify", True)
except RuntimeError as e:
    check("S1 No RuntimeError during concurrent modify", False, str(e))
t.join()
|
|
print("\nβββ SRE Scenario 2: UNKNOWN Action Rejection βββ")

# LLM stub whose actor reply is deliberately unparseable.
mock = pa.MockLLMBackend()
mock.register_handler("goal-directed agent", "totally unparseable garbage !@#$%")
mock.set_structured_default(
    {"phi_before": 0, "phi_after": 0, "reasoning": "x", "evidence": "x", "confidence": 0.5}
)

from purpose_agent.orchestrator import SimpleEnvironment

env = SimpleEnvironment(execute_fn=lambda a, s: State(data={"x": 1}))
orch = pa.Orchestrator(
    llm=mock,
    environment=env,
    available_actions={"test": "test", "DONE": "done"},
    critic_mode="standard",
)

try:
    r = orch.run_task(purpose="test", max_steps=2)
    check("S2 No crash on garbage LLM output", True)
    steps = r.trajectory.steps
    if steps:
        last_action = steps[-1].action.name
        check("S2 UNKNOWN β DONE fallback", last_action == "DONE", f"got {last_action}")
    else:
        check("S2 Has steps", False, "no steps recorded")
except Exception as e:
    check("S2 No crash on garbage LLM output", False, f"{type(e).__name__}: {e}")
|
|
print("\nβββ SRE Scenario 3: Heuristic Cap (Context Overflow Prevention) βββ")

mock2 = pa.MockLLMBackend()
mock2.register_handler(
    "goal-directed agent",
    json.dumps({"thought": "t", "action": {"name": "DONE", "params": {}}, "expected_delta": "d"}),
)
mock2.set_structured_default(
    {"phi_before": 0, "phi_after": 5, "reasoning": "r", "evidence": "e", "confidence": 0.7}
)

env2 = SimpleEnvironment(execute_fn=lambda a, s: State(data={}))
orch2 = pa.Orchestrator(llm=mock2, environment=env2, available_actions={"DONE": "done"},
                        critic_mode="standard")

# Flood the optimizer with 200 deliberately verbose heuristics.
for i in range(200):
    orch2.optimizer.heuristic_library.append(Heuristic(
        pattern=f"Pattern {i} " * 10,
        strategy=f"Strategy {i} " * 10,
        steps=[],
        tier=MemoryTier.STRATEGIC,
        q_value=0.5 + i * 0.001,
    ))
orch2.sync_memory()

# The actor prompt must cap how many heuristic bullets it embeds.
prompt = orch2.actor._build_system_prompt()
heuristic_lines = [ln for ln in prompt.split("\n")
                   if ln.strip().startswith(("- When:", "- Pattern"))]
check("S3 Heuristics capped", len(heuristic_lines) <= 10,
      f"got {len(heuristic_lines)} (should be β€10)")
check("S3 Prompt not massive", len(prompt) < 5000, f"prompt is {len(prompt)} chars")
|
|
print("\nβββ SRE Scenario 4: Thread-Safe ExperienceReplay βββ")

er = pa.ExperienceReplay(capacity=100)
errors = []


def add_many(start):
    """Push 50 single-step trajectories into the shared replay buffer."""
    for i in range(50):
        try:
            traj = Trajectory(task_description=f"task_{start}_{i}", purpose=f"p_{start}_{i}")
            step = TrajectoryStep(
                state_before=State(data={}),
                action=Action(name="x"),
                state_after=State(data={"i": i}),
                score=PurposeScore(phi_before=0, phi_after=5, delta=5,
                                   reasoning="r", evidence="e", confidence=0.8),
            )
            traj.steps.append(step)
            er.add(traj)
        except Exception as e:
            errors.append(str(e))


# Four writers hammer the buffer concurrently.
threads = [threading.Thread(target=add_many, args=(j * 100,)) for j in range(4)]
for worker in threads:
    worker.start()
for worker in threads:
    worker.join()

check("S4 No errors in concurrent add", len(errors) == 0, f"{len(errors)} errors")
check("S4 All items added", er.size > 0, f"size={er.size}")
|
|
print("\nβββ SRE Scenario 5: None Score Guard βββ")

traj = Trajectory(task_description="test", purpose="test")
# First step deliberately carries score=None; second has a real score,
# so aggregate properties must skip the None rather than crash.
traj.steps.append(TrajectoryStep(
    state_before=State(data={}),
    action=Action(name="x"),
    state_after=State(data={}),
    score=None,
))
traj.steps.append(TrajectoryStep(
    state_before=State(data={}),
    action=Action(name="y"),
    state_after=State(data={}),
    score=PurposeScore(phi_before=0, phi_after=7, delta=7,
                       reasoning="r", evidence="e", confidence=0.9),
))

try:
    cr = traj.cumulative_reward
    check("S5 cumulative_reward with None score", isinstance(cr, float), f"got {cr}")
except TypeError as e:
    check("S5 cumulative_reward with None score", False, str(e))

try:
    td = traj.total_delta
    check("S5 total_delta with None score", isinstance(td, float), f"got {td}")
except TypeError as e:
    check("S5 total_delta with None score", False, str(e))

try:
    sr = traj.success_rate
    check("S5 success_rate with None score", isinstance(sr, float))
except TypeError as e:
    check("S5 success_rate with None score", False, str(e))

try:
    fp = traj.final_phi
    check("S5 final_phi with None score", fp == 7.0 or fp is not None, f"got {fp}")
except (TypeError, AttributeError) as e:
    check("S5 final_phi with None score", False, str(e))
|
|
| |
# Final tally: exit nonzero when any regression check failed so CI gates on it.
bar = "=" * 50
print(f"\n{bar}")
print(f" SRE Regression: {PASS} pass, {FAIL} fail")
verdict = "ALL PASS β" if FAIL == 0 else f"{FAIL} FAILURES β CRITICAL"
print(f" {verdict}")
print(bar)
sys.exit(0 if FAIL == 0 else 1)
|
|