# purpose-agent / purpose_agent / memory_homeostasis.py
# Sprint 3: memory_homeostasis.py — budget, archive, consolidation, Q-retriever
# (commit 4dc4204, verified; author: Rohan03)
"""
memory_homeostasis.py β€” Bounded memory with consolidation, hibernation, and archive.
Solves: active memory must be bounded; archived evidence must remain recoverable.
Components:
- MemoryBudget: hard limits on active cards, injected tokens, per-kind caps
- MemoryArchive: append-only cold storage (JSONL or SQLite)
- ConsolidationEngine: cluster β†’ merge β†’ compress β†’ hibernate
- QFunctionRetriever: budget-aware ranking with recency decay and diversity
Triggers:
- On N new memories
- On active_cards > max_active_cards
- On injected_tokens > max_injected_tokens
- Manual: team.consolidate_memory()
Invariant: active injected memory NEVER exceeds token budget.
"""
from __future__ import annotations
import json
import logging
import math
import time
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from purpose_agent.memory import MemoryCard, MemoryKind, MemoryStatus, MemoryStore
from purpose_agent.v2_types import MemoryScope
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════
# Memory Budget
# ═══════════════════════════════════════════════════════════════
@dataclass
class MemoryBudget:
    """
    Hard limits on active memory. Enforced by the homeostasis engine.
    When any limit is exceeded, consolidation/archival is triggered automatically.
    """
    max_active_cards: int = 512
    max_injected_tokens: int = 500  # Max tokens from memory in any single prompt
    # Per-kind caps; keys must match MemoryKind values to take effect
    # (unknown keys are skipped by ConsolidationEngine._enforce_kind_limits).
    max_cards_per_kind: dict[str, int] = field(default_factory=lambda: {
        "skill_card": 100,
        "episodic_case": 200,
        "failure_pattern": 50,
        "user_preference": 50,
        "critic_calibration": 30,
        "tool_policy": 30,
        "purpose_contract": 10,
    })
    archive_after_days: int | None = 90  # Auto-archive unused cards after N days
    consolidation_threshold: int = 50  # Trigger consolidation every N new memories
    chars_per_token: int = 4  # For token estimation

    def estimate_tokens(self, text: str) -> int:
        """Return a conservative (rounded-up) token estimate for *text*.

        BUGFIX: floor division under-counted short strings (a 7-char string
        counted as 1 token), which could let injected memory creep past
        max_injected_tokens and violate the module's budget invariant.
        Ceiling division keeps the estimate conservative.
        """
        return -(-len(text) // self.chars_per_token)
# ═══════════════════════════════════════════════════════════════
# Memory Archive β€” cold storage
# ═══════════════════════════════════════════════════════════════
class MemoryArchive:
    """
    Append-only cold storage for archived memories.

    Archived memories are never injected into prompts but remain
    recoverable by card id or source_trace_id for audit, replay, or
    re-promotion. Entries are mirrored in memory; when a path is given
    they are also persisted as JSONL (one entry per line).
    """

    def __init__(self, path: str | None = None):
        """
        Args:
            path: optional JSONL file backing the archive. When given and
                the file already exists, prior entries are reloaded.
        """
        self._path = Path(path) if path else None
        self._archived: list[dict[str, Any]] = []
        if self._path and self._path.exists():
            self._load()

    def archive(self, card: MemoryCard, reason: str = "") -> None:
        """Move a card to cold storage, recording why and when."""
        entry = {
            "id": card.id,
            "kind": card.kind.value,
            "pattern": card.pattern,
            "strategy": card.strategy,
            "content": card.content,
            "source_trace_id": card.source_trace_id,
            "trust_score": card.trust_score,
            "utility_score": card.utility_score,
            "times_retrieved": card.times_retrieved,
            "archived_at": time.time(),
            "reason": reason,
        }
        self._archived.append(entry)
        if self._path:
            self._append(entry)

    def recover(self, card_id: str) -> dict[str, Any] | None:
        """Recover an archived card by ID; None if never archived.

        Linear scan returning the FIRST (oldest) matching entry.
        """
        for entry in self._archived:
            if entry["id"] == card_id:
                return entry
        return None

    def recover_by_trace(self, trace_id: str) -> list[dict[str, Any]]:
        """Recover all archived cards originating from a specific trace."""
        return [e for e in self._archived if e.get("source_trace_id") == trace_id]

    @property
    def size(self) -> int:
        """Number of archived entries in the in-memory mirror."""
        return len(self._archived)

    def _append(self, entry: dict) -> None:
        """Append one entry to the JSONL file, creating parent dirs on demand."""
        if not self._path:
            return
        self._path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so archives round-trip across platforms.
        with open(self._path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, default=str) + "\n")

    def _load(self) -> None:
        """Reload entries from the JSONL file, skipping corrupt lines."""
        if not self._path or not self._path.exists():
            return
        with open(self._path, encoding="utf-8") as f:
            for lineno, raw in enumerate(f, start=1):
                line = raw.strip()
                if not line:
                    continue
                try:
                    self._archived.append(json.loads(line))
                except json.JSONDecodeError:
                    # BUGFIX: was a silent `pass` — corrupt lines vanished
                    # without a trace. Still best-effort, but now observable.
                    logger.warning(
                        "Archive %s: skipping corrupt line %d", self._path, lineno
                    )
# ═══════════════════════════════════════════════════════════════
# Consolidation Engine
# ═══════════════════════════════════════════════════════════════
class ConsolidationEngine:
    """
    Clusters, merges, compresses, and hibernates memories.

    Passes (in order):
    - merge: fold groups of >=3 same-pattern episodic cases into one skill card
    - hibernate: archive skill cards that are retrieved often but rarely help
    - archive: shed lowest-utility cards until under the global active cap
    - kind limits: apply per-kind caps on top of the global one

    ("clustered" and "compressed" appear in the result dict for forward
    compatibility but are not yet implemented as separate passes.)
    All passes preserve source_trace_id for the audit trail.
    """

    def __init__(self, store: MemoryStore, archive: MemoryArchive, budget: MemoryBudget):
        self.store = store
        self.archive = archive
        self.budget = budget
        self._consolidation_count = 0

    def run(self) -> dict[str, int]:
        """Run one full consolidation cycle; return per-action counts."""
        tally = {"clustered": 0, "merged": 0, "compressed": 0, "hibernated": 0, "archived": 0}
        tally["merged"] = self._merge_similar_episodics()
        tally["hibernated"] = self._hibernate_unused()
        tally["archived"] = self._archive_over_budget()
        tally["archived"] += self._enforce_kind_limits()
        self._consolidation_count += 1
        logger.info(f"Consolidation #{self._consolidation_count}: {tally}")
        return tally

    def _merge_similar_episodics(self) -> int:
        """Promote groups of >=3 similar episodic cases into skill cards."""
        cases = [
            c for c in self.store.get_all()
            if c.kind == MemoryKind.EPISODIC_CASE and c.status == MemoryStatus.PROMOTED
        ]
        if len(cases) < 3:
            return 0
        # Group by a rough pattern key (exact match on a normalized prefix).
        buckets: dict[str, list[MemoryCard]] = defaultdict(list)
        for case in cases:
            buckets[case.pattern.lower().strip()[:50]].append(case)
        merge_count = 0
        for members in buckets.values():
            if len(members) < 3:
                continue
            # Consolidate the group into a single skill card; trust is the
            # group's minimum (pessimistic), utility its mean.
            skill = MemoryCard(
                kind=MemoryKind.SKILL_CARD,
                status=MemoryStatus.PROMOTED,
                pattern=members[0].pattern,
                strategy=f"[CONSOLIDATED from {len(members)} cases] " + members[0].strategy,
                trust_score=min(m.trust_score for m in members),
                utility_score=sum(m.utility_score for m in members) / len(members),
                source_trace_id=members[0].source_trace_id,
                created_by="consolidation",
            )
            self.store.add(skill)
            # Retire the originals; each stays recoverable from the archive.
            for member in members:
                self.store.update_status(member.id, MemoryStatus.ARCHIVED, "consolidated")
                self.archive.archive(member, f"merged into {skill.id}")
            merge_count += 1
        return merge_count

    def _hibernate_unused(self) -> int:
        """Archive promoted skill cards that are retrieved often but rarely help."""
        count = 0
        for card in self.store.get_all():
            if card.status != MemoryStatus.PROMOTED or card.kind != MemoryKind.SKILL_CARD:
                continue
            # Keep unless: retrieved many times AND low utility.
            if card.times_retrieved < 10 or card.utility_score >= 0.2:
                continue
            self.store.update_status(card.id, MemoryStatus.ARCHIVED, "hibernated: low utility")
            self.archive.archive(card, "hibernated")
            count += 1
        return count

    def _archive_over_budget(self) -> int:
        """Archive the lowest-utility cards when over max_active_cards."""
        promoted = self.store.get_by_status(MemoryStatus.PROMOTED)
        overflow = len(promoted) - self.budget.max_active_cards
        if overflow <= 0:
            return 0
        victims = sorted(promoted, key=lambda c: c.utility_score)[:overflow]
        for card in victims:
            self.store.update_status(card.id, MemoryStatus.ARCHIVED, "budget: over max_active")
            self.archive.archive(card, "budget overflow")
        return len(victims)

    def _enforce_kind_limits(self) -> int:
        """Apply budget.max_cards_per_kind, archiving the lowest-utility excess."""
        total = 0
        for kind_str, cap in self.budget.max_cards_per_kind.items():
            try:
                kind = MemoryKind(kind_str)
            except ValueError:
                continue  # budget names a kind this MemoryKind enum doesn't define
            promoted = [
                c for c in self.store.get_all()
                if c.kind == kind and c.status == MemoryStatus.PROMOTED
            ]
            surplus = len(promoted) - cap
            if surplus <= 0:
                continue
            promoted.sort(key=lambda c: c.utility_score)
            for card in promoted[:surplus]:
                self.store.update_status(card.id, MemoryStatus.ARCHIVED, f"kind_limit: {kind_str}")
                self.archive.archive(card, f"kind limit ({kind_str})")
                total += 1
        return total
# ═══════════════════════════════════════════════════════════════
# Q-Function Retriever β€” budget-aware ranking
# ═══════════════════════════════════════════════════════════════
class QFunctionRetriever:
    """
    Budget-aware memory retriever with multi-signal ranking.

    score = relevance * trust * utility * recency_decay (+ help-rate boost)

    Guarantees: injected tokens NEVER exceed budget.max_injected_tokens.
    """

    def __init__(self, store: MemoryStore, budget: MemoryBudget):
        self.store = store
        self.budget = budget

    def retrieve(
        self,
        query: str,
        scope: MemoryScope | None = None,
        max_cards: int = 15,
    ) -> list[MemoryCard]:
        """
        Retrieve memories ranked by composite score, bounded by token budget.

        Returns only PROMOTED memories whose combined estimated tokens fit
        within budget.max_injected_tokens (at most max_cards of them).
        """
        candidates = self.store.retrieve(
            query_text=query,
            scope=scope,
            statuses=[MemoryStatus.PROMOTED],
            top_k=max_cards * 3,  # Over-fetch for diversity filtering
        )
        # Re-rank with the full Q-function.
        now = time.time()
        scored = [(self._compute_score(card, query, now), card) for card in candidates]
        scored.sort(key=lambda x: -x[0])
        # Greedy selection under the token budget.
        selected: list[MemoryCard] = []
        token_used = 0
        seen_patterns: set[str] = set()
        for _score, card in scored:
            # Diversity: skip near-duplicates of already-selected cards.
            pattern_key = (card.pattern or card.content or "")[:30].lower()
            if pattern_key in seen_patterns:
                continue
            # Token budget check.
            card_text = f"{card.pattern} {card.strategy} {' '.join(card.steps)}"
            card_tokens = self.budget.estimate_tokens(card_text)
            if token_used + card_tokens > self.budget.max_injected_tokens:
                # BUGFIX: was `break` — one oversized card abandoned the rest
                # of the candidate list even though shorter, lower-ranked
                # cards could still fit under the remaining budget.
                continue
            # Mark the pattern as seen only on selection, so a budget-skipped
            # card doesn't block a similar (smaller) one further down.
            seen_patterns.add(pattern_key)
            selected.append(card)
            token_used += card_tokens
            if len(selected) >= max_cards:
                break
        return selected

    def _compute_score(self, card: MemoryCard, query: str, now: float) -> float:
        """
        Composite Q-function score:
            score = relevance * trust * utility * recency_decay
        boosted by the card's historical help rate when available.
        """
        trust = card.trust_score
        utility = card.utility_score
        # Relevance proxy: store.retrieve already did text matching, so we
        # fold utility in again as a stand-in for true relevance.
        relevance = 0.5 + utility * 0.5
        # Recency decay: linear over a year, floored at 0.3 so old but
        # valuable memories are never fully suppressed.
        age_days = (now - card.created_at) / 86400
        recency = max(0.3, 1.0 - (age_days / 365))
        score = relevance * trust * utility * recency
        # Boost cards that actually helped when retrieved (up to +50%).
        if card.times_retrieved > 0 and card.times_helped > 0:
            help_rate = card.times_helped / card.times_retrieved
            score *= (1.0 + help_rate * 0.5)
        return score
# ═══════════════════════════════════════════════════════════════
# Homeostasis Controller β€” ties everything together
# ═══════════════════════════════════════════════════════════════
class MemoryHomeostasis:
    """
    Main controller that keeps memory bounded and healthy.

    Usage:
        homeostasis = MemoryHomeostasis(store, budget=MemoryBudget(max_active_cards=256))
        # After each task:
        homeostasis.check_and_consolidate()
        # Manual trigger:
        homeostasis.force_consolidation()
        # Budget-aware retrieval:
        memories = homeostasis.retrieve("query", scope=scope)
    """

    def __init__(
        self,
        store: MemoryStore,
        budget: MemoryBudget | None = None,
        archive_path: str | None = None,
    ):
        self.store = store
        self.budget = budget if budget is not None else MemoryBudget()
        self.archive = MemoryArchive(archive_path)
        self.consolidation = ConsolidationEngine(store, self.archive, self.budget)
        self.retriever = QFunctionRetriever(store, self.budget)
        # Memories added since the last consolidation cycle.
        self._new_since_consolidation = 0

    def on_memory_added(self) -> None:
        """Record a new memory; consolidate once the threshold is reached."""
        self._new_since_consolidation += 1
        if self._new_since_consolidation < self.budget.consolidation_threshold:
            return
        self.check_and_consolidate()

    def check_and_consolidate(self) -> dict[str, int] | None:
        """Run a consolidation cycle if any trigger fires; else return None."""
        over_cap = (
            len(self.store.get_by_status(MemoryStatus.PROMOTED))
            > self.budget.max_active_cards
        )
        backlog = self._new_since_consolidation >= self.budget.consolidation_threshold
        if not (over_cap or backlog):
            return None
        self._new_since_consolidation = 0
        return self.consolidation.run()

    def force_consolidation(self) -> dict[str, int]:
        """Run a consolidation cycle unconditionally."""
        self._new_since_consolidation = 0
        return self.consolidation.run()

    def retrieve(self, query: str, scope: MemoryScope | None = None, max_cards: int = 10) -> list[MemoryCard]:
        """Budget-aware retrieval. Guarantees token budget is respected."""
        return self.retriever.retrieve(query, scope, max_cards)

    @property
    def stats(self) -> dict[str, Any]:
        """Snapshot of memory health for dashboards/logging."""
        active = len(self.store.get_by_status(MemoryStatus.PROMOTED))
        cap = self.budget.max_active_cards
        return {
            "active_cards": active,
            "max_active": cap,
            "utilization": f"{active/cap:.0%}" if cap else "0%",
            "archived": self.archive.size,
            "consolidations_run": self.consolidation._consolidation_count,
            "new_since_last": self._new_since_consolidation,
        }