| |
| """ |
| Sprint 3 Tests — Memory Homeostasis. |
| |
| T3.1 Add 10,000 memories; retrieval injects <= 500 estimated tokens |
| T3.2 Add 50 similar episodic cases; consolidation creates <= 5 skill cards |
| T3.3 All source memories remain recoverable by source_trace_id |
| T3.4 Active promoted memory never exceeds max_active_cards |
| T3.5 SLM 4k-context compile never exceeds reserved prompt budget |
| T3.6 Low-utility skill card hibernates after threshold |
| T3.7 Homeostasis auto-triggers on threshold |
| """ |
| import sys |
| import os |
| import time |
|
|
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) |
|
|
# Running totals of passed / failed checks; read by the final summary.
PASS = 0
FAIL = 0


def check(name: str, condition: bool, detail: str = "") -> None:
    """Record the outcome of one check and print a one-line report.

    Args:
        name: Human-readable label of the check.
        condition: Truthy when the check passed.
        detail: Optional diagnostic text; it is appended (after ": ") only
            when the check fails and the text is non-empty.

    Side effects:
        Increments the module-level ``PASS`` or ``FAIL`` counter and prints
        exactly one line to stdout.
    """
    global PASS, FAIL
    if condition:
        PASS += 1
        print(f" ✓ {name}")
        return
    FAIL += 1
    # Pass and fail lines must carry different markers: a failure reported
    # without `detail` would otherwise look identical to a pass.
    suffix = f": {detail}" if detail else ""
    print(f" ✗ {name}{suffix}")
|
|
|
|
| from purpose_agent.memory import MemoryStore, MemoryCard, MemoryKind, MemoryStatus |
| from purpose_agent.v2_types import MemoryScope |
| from purpose_agent.memory_homeostasis import ( |
| MemoryBudget, MemoryArchive, ConsolidationEngine, QFunctionRetriever, MemoryHomeostasis, |
| ) |
|
|
|
|
| |
# --- T3.1: retrieval must stay within the injected-token budget ---
# NOTE(review): the banner says 10,000 memories, but only 200 cards are
# inserted below -- confirm the scaled-down count is intentional.
print("T3.1: 10,000 memories β retrieval <= 500 tokens")

store1 = MemoryStore()
budget1 = MemoryBudget(max_active_cards=512, max_injected_tokens=500)

# Fill the store with promoted skill cards; pattern, trust and utility
# cycle with the loop index so the cards are not all identical.
for i in range(200):
    skill = MemoryCard(
        kind=MemoryKind.SKILL_CARD,
        status=MemoryStatus.PROMOTED,
        pattern=f"Pattern {i}: when facing problem type {i % 20}",
        strategy=f"Strategy {i}: do approach {i % 10} which involves multiple steps and reasoning " * 3,
        trust_score=0.5 + (i % 10) * 0.05,
        utility_score=0.3 + (i % 7) * 0.1,
    )
    store1.add(skill)

retriever = QFunctionRetriever(store1, budget1)
results = retriever.retrieve("problem type 5")

# Estimate what would be injected: pattern + strategy + joined steps per card.
total_tokens = sum(
    budget1.estimate_tokens(f"{card.pattern} {card.strategy} {' '.join(card.steps)}")
    for card in results
)

check(
    "Retrieval respects token budget",
    total_tokens <= 500,
    f"injected {total_tokens} tokens (budget={budget1.max_injected_tokens})",
)
n_results = len(results)
check("Returns some results", n_results > 0, f"got {n_results} cards")
|
|
|
|
| |
# --- T3.2: many similar episodic cases consolidate into few skill cards ---
print("\nT3.2: 50 similar episodics β consolidated skill cards")

store2 = MemoryStore()
budget2 = MemoryBudget(max_active_cards=512)
archive2 = MemoryArchive()

# Five debugging approaches, selected by i % 5; all cards share one pattern.
_approaches = ("add prints", "check types", "read error", "use debugger", "simplify")
for i in range(50):
    _variant = i % 5
    episodic = MemoryCard(
        kind=MemoryKind.EPISODIC_CASE,
        status=MemoryStatus.PROMOTED,
        pattern="when debugging python code",
        strategy=f"Tried approach {_variant}: {_approaches[_variant]}",
        source_trace_id=f"trace_{i}",
        trust_score=0.6,
        utility_score=0.4,
    )
    store2.add(episodic)

engine2 = ConsolidationEngine(store2, archive2, budget2)
results2 = engine2.run()

# Only promoted skill cards count as consolidation output.
skill_cards = [
    entry
    for entry in store2.get_all()
    if entry.kind == MemoryKind.SKILL_CARD and entry.status == MemoryStatus.PROMOTED
]
n_skill_cards = len(skill_cards)
merged = results2["merged"]
check("Creates skill cards from clusters", n_skill_cards > 0, f"got {n_skill_cards}")
check("Creates <= 5 skill cards", n_skill_cards <= 5, f"got {n_skill_cards}")
check("Merged count > 0", merged > 0, f"merged={merged}")
|
|
|
|
| |
# --- T3.3: archived sources stay recoverable (reuses archive2 from T3.2) ---
print("\nT3.3: Archived sources recoverable by trace_id")

# Look up one of the trace ids assigned to the T3.2 episodic cards.
recovered = archive2.recover_by_trace("trace_0")
n_recovered = len(recovered)
check("Source recoverable by trace_id", n_recovered > 0, f"found {n_recovered}")

# The archive as a whole must be non-empty as well.
archive_size = archive2.size
check("Archive has entries", archive_size > 0, f"archive size={archive_size}")
|
|
|
|
| |
# --- T3.4: promoted cards never exceed max_active_cards after a run ---
print("\nT3.4: Active promoted memory bounded")

store4 = MemoryStore()
budget4 = MemoryBudget(max_active_cards=20)
archive4 = MemoryArchive()

# Insert 50 promoted cards against a cap of 20; utility rises with i.
for i in range(50):
    overflow_card = MemoryCard(
        kind=MemoryKind.SKILL_CARD,
        status=MemoryStatus.PROMOTED,
        pattern=f"skill {i}",
        strategy=f"do thing {i}",
        utility_score=i * 0.02,
    )
    store4.add(overflow_card)

engine4 = ConsolidationEngine(store4, archive4, budget4)
engine4.run()

still_promoted = store4.get_by_status(MemoryStatus.PROMOTED)
active_count = len(still_promoted)
cap = budget4.max_active_cards
check("Active <= max_active_cards", active_count <= cap,
      f"active={active_count}, max={cap}")
|
|
|
|
| |
# --- T3.5: a tight 200-token budget is still respected ---
print("\nT3.5: SLM 4K context β never exceeds budget")

store5 = MemoryStore()
budget5 = MemoryBudget(max_injected_tokens=200)

# 100 promoted skill cards with deliberately long pattern/strategy text.
for i in range(100):
    long_card = MemoryCard(
        kind=MemoryKind.SKILL_CARD,
        status=MemoryStatus.PROMOTED,
        pattern=f"Long pattern about topic {i} with extra words for length testing purposes",
        strategy=f"Detailed strategy {i}: step one do A, step two do B, step three verify C, step four submit" * 2,
        utility_score=0.5 + (i % 5) * 0.1,
    )
    store5.add(long_card)

retriever5 = QFunctionRetriever(store5, budget5)
results5 = retriever5.retrieve("topic 7")

# Token estimate here covers pattern + strategy only.
injected_texts = [f"{c.pattern} {c.strategy}" for c in results5]
total5 = sum(map(budget5.estimate_tokens, injected_texts))
check("SLM budget respected (200 tokens)", total5 <= 200,
      f"used {total5} tokens")
check("Still returns useful results", len(results5) > 0)
|
|
|
|
| |
# --- T3.6: a frequently retrieved but rarely helpful skill gets archived ---
print("\nT3.6: Low-utility skill hibernation")

store6 = MemoryStore()
budget6 = MemoryBudget()
archive6 = MemoryArchive()

# Retrieved 15 times, helped once, utility 0.1.
bad_skill = MemoryCard(
    kind=MemoryKind.SKILL_CARD,
    status=MemoryStatus.PROMOTED,
    pattern="useless pattern",
    strategy="unhelpful strategy",
    times_retrieved=15,
    times_helped=1,
    utility_score=0.1,
)
store6.add(bad_skill)

engine6 = ConsolidationEngine(store6, archive6, budget6)
engine6.run()

# Re-read the card from the store to observe its status after the run.
card_after_run = store6.get(bad_skill.id)
check("Low-utility skill hibernated",
      card_after_run.status == MemoryStatus.ARCHIVED)
|
|
|
|
| |
# --- T3.7: homeostasis consolidates on its own once the threshold is hit ---
print("\nT3.7: Homeostasis auto-triggers")

store7 = MemoryStore()
budget7 = MemoryBudget(consolidation_threshold=5, max_active_cards=100)
homeostasis = MemoryHomeostasis(store7, budget7)

# Add 6 cards (threshold is 5), notifying homeostasis after every insert.
for i in range(6):
    new_case = MemoryCard(
        kind=MemoryKind.EPISODIC_CASE,
        status=MemoryStatus.PROMOTED,
        pattern=f"case {i}",
        strategy=f"approach {i}",
    )
    store7.add(new_case)
    homeostasis.on_memory_added()

# Reads the consolidation engine's private run counter.
consolidations = homeostasis.consolidation._consolidation_count
check("Auto-trigger fired", consolidations >= 1,
      f"consolidations={consolidations}")
|
|
|
|
| |
# --- T3.x: the stats mapping exposes the expected keys ---
# Reuses the `homeostasis` instance built in the T3.7 section.
print("\nT3.x: Homeostasis stats")
stats = homeostasis.stats
for _key in ("active_cards", "archived", "utilization"):
    check(f"Stats has {_key}", _key in stats)
|
|
|
|
| |
# --- Summary: print totals and exit non-zero when any check failed ---
_rule = "=" * 50
_verdict = "ALL PASS β" if FAIL == 0 else f"{FAIL} FAILURES"
print(f"\n{_rule}")
print(f" Sprint 3 Tests: {PASS} pass, {FAIL} fail")
print(f" {_verdict}")
print(_rule)
sys.exit(0 if FAIL == 0 else 1)
|
|