""" memory_homeostasis.py — Bounded memory with consolidation, hibernation, and archive. Solves: active memory must be bounded; archived evidence must remain recoverable. Components: - MemoryBudget: hard limits on active cards, injected tokens, per-kind caps - MemoryArchive: append-only cold storage (JSONL or SQLite) - ConsolidationEngine: cluster → merge → compress → hibernate - QFunctionRetriever: budget-aware ranking with recency decay and diversity Triggers: - On N new memories - On active_cards > max_active_cards - On injected_tokens > max_injected_tokens - Manual: team.consolidate_memory() Invariant: active injected memory NEVER exceeds token budget. """ from __future__ import annotations import json import logging import math import time from collections import defaultdict from dataclasses import dataclass, field from pathlib import Path from typing import Any from purpose_agent.memory import MemoryCard, MemoryKind, MemoryStatus, MemoryStore from purpose_agent.v2_types import MemoryScope logger = logging.getLogger(__name__) # ═══════════════════════════════════════════════════════════════ # Memory Budget # ═══════════════════════════════════════════════════════════════ @dataclass class MemoryBudget: """ Hard limits on active memory. Enforced by the homeostasis engine. When any limit is exceeded, consolidation/archival is triggered automatically. """ max_active_cards: int = 512 max_injected_tokens: int = 500 # Max tokens from memory in any single prompt max_cards_per_kind: dict[str, int] = field(default_factory=lambda: { "skill_card": 100, "episodic_case": 200, "failure_pattern": 50, "user_preference": 50, "critic_calibration": 30, "tool_policy": 30, "purpose_contract": 10, }) archive_after_days: int | None = 90 # Auto-archive unused cards after N days consolidation_threshold: int = 50 # Trigger consolidation every N new memories chars_per_token: int = 4 # For token estimation def estimate_tokens(self, text: str) -> int: return len(text) // self.chars_per_token # ═══════════════════════════════════════════════════════════════ # Memory Archive — cold storage # ═══════════════════════════════════════════════════════════════ class MemoryArchive: """ Append-only cold storage for archived memories. Archived memories are never injected into prompts but remain recoverable by source_trace_id for audit, replay, or re-promotion. """ def __init__(self, path: str | None = None): self._path = Path(path) if path else None self._archived: list[dict[str, Any]] = [] if self._path and self._path.exists(): self._load() def archive(self, card: MemoryCard, reason: str = "") -> None: """Move a card to cold storage.""" entry = { "id": card.id, "kind": card.kind.value, "pattern": card.pattern, "strategy": card.strategy, "content": card.content, "source_trace_id": card.source_trace_id, "trust_score": card.trust_score, "utility_score": card.utility_score, "times_retrieved": card.times_retrieved, "archived_at": time.time(), "reason": reason, } self._archived.append(entry) if self._path: self._append(entry) def recover(self, card_id: str) -> dict[str, Any] | None: """Recover an archived card by ID.""" for entry in self._archived: if entry["id"] == card_id: return entry return None def recover_by_trace(self, trace_id: str) -> list[dict[str, Any]]: """Recover all archived cards from a specific trace.""" return [e for e in self._archived if e.get("source_trace_id") == trace_id] @property def size(self) -> int: return len(self._archived) def _append(self, entry: dict) -> None: if not self._path: return self._path.parent.mkdir(parents=True, exist_ok=True) with open(self._path, "a") as f: f.write(json.dumps(entry, default=str) + "\n") def _load(self) -> None: if not self._path or not self._path.exists(): return with open(self._path) as f: for line in f: line = line.strip() if line: try: self._archived.append(json.loads(line)) except json.JSONDecodeError: pass # ═══════════════════════════════════════════════════════════════ # Consolidation Engine # ═══════════════════════════════════════════════════════════════ class ConsolidationEngine: """ Clusters, merges, compresses, and hibernates memories. Operations: - cluster: group similar episodic_case cards by pattern similarity - merge: promote repeated patterns into a single skill_card - compress: shorten singleton low-utility cases to signatures - hibernate: deactivate unused skill_cards (recoverable) All operations preserve source_trace_id for audit trail. """ def __init__(self, store: MemoryStore, archive: MemoryArchive, budget: MemoryBudget): self.store = store self.archive = archive self.budget = budget self._consolidation_count = 0 def run(self) -> dict[str, int]: """ Run full consolidation cycle. Returns counts of actions taken. """ results = {"clustered": 0, "merged": 0, "compressed": 0, "hibernated": 0, "archived": 0} # 1. Cluster similar episodic cases results["merged"] = self._merge_similar_episodics() # 2. Hibernate low-utility skills results["hibernated"] = self._hibernate_unused() # 3. Archive old cards if over budget results["archived"] = self._archive_over_budget() # 4. Enforce per-kind limits results["archived"] += self._enforce_kind_limits() self._consolidation_count += 1 logger.info(f"Consolidation #{self._consolidation_count}: {results}") return results def _merge_similar_episodics(self) -> int: """Merge similar episodic cases into skill cards.""" episodics = [c for c in self.store.get_all() if c.kind == MemoryKind.EPISODIC_CASE and c.status == MemoryStatus.PROMOTED] if len(episodics) < 3: return 0 # Group by pattern similarity (simple: exact pattern match) groups: dict[str, list[MemoryCard]] = defaultdict(list) for card in episodics: key = card.pattern.lower().strip()[:50] # Rough grouping key groups[key].append(card) merged = 0 for key, cards in groups.items(): if len(cards) >= 3: # Merge into a skill card avg_utility = sum(c.utility_score for c in cards) / len(cards) merged_card = MemoryCard( kind=MemoryKind.SKILL_CARD, status=MemoryStatus.PROMOTED, pattern=cards[0].pattern, strategy=f"[CONSOLIDATED from {len(cards)} cases] " + cards[0].strategy, trust_score=min(c.trust_score for c in cards), utility_score=avg_utility, source_trace_id=cards[0].source_trace_id, created_by="consolidation", ) self.store.add(merged_card) # Archive the original episodics for card in cards: self.store.update_status(card.id, MemoryStatus.ARCHIVED, "consolidated") self.archive.archive(card, f"merged into {merged_card.id}") merged += 1 return merged def _hibernate_unused(self) -> int: """Hibernate skill cards that haven't been useful.""" hibernated = 0 for card in self.store.get_all(): if card.status != MemoryStatus.PROMOTED: continue if card.kind != MemoryKind.SKILL_CARD: continue # Hibernate if: retrieved many times but rarely helped if card.times_retrieved >= 10 and card.utility_score < 0.2: self.store.update_status(card.id, MemoryStatus.ARCHIVED, "hibernated: low utility") self.archive.archive(card, "hibernated") hibernated += 1 return hibernated def _archive_over_budget(self) -> int: """Archive lowest-utility cards when over max_active_cards.""" active = self.store.get_by_status(MemoryStatus.PROMOTED) if len(active) <= self.budget.max_active_cards: return 0 # Sort by utility (lowest first) and archive excess active.sort(key=lambda c: c.utility_score) excess = len(active) - self.budget.max_active_cards archived = 0 for card in active[:excess]: self.store.update_status(card.id, MemoryStatus.ARCHIVED, "budget: over max_active") self.archive.archive(card, "budget overflow") archived += 1 return archived def _enforce_kind_limits(self) -> int: """Enforce per-kind card limits.""" archived = 0 for kind_str, limit in self.budget.max_cards_per_kind.items(): try: kind = MemoryKind(kind_str) except ValueError: continue cards = [c for c in self.store.get_all() if c.kind == kind and c.status == MemoryStatus.PROMOTED] if len(cards) <= limit: continue # Remove lowest utility cards.sort(key=lambda c: c.utility_score) for card in cards[:len(cards) - limit]: self.store.update_status(card.id, MemoryStatus.ARCHIVED, f"kind_limit: {kind_str}") self.archive.archive(card, f"kind limit ({kind_str})") archived += 1 return archived # ═══════════════════════════════════════════════════════════════ # Q-Function Retriever — budget-aware ranking # ═══════════════════════════════════════════════════════════════ class QFunctionRetriever: """ Budget-aware memory retriever with multi-signal ranking. score = relevance * trust * utility * recency_decay * scope_match * diversity_penalty Guarantees: injected tokens NEVER exceed budget.max_injected_tokens. """ def __init__(self, store: MemoryStore, budget: MemoryBudget): self.store = store self.budget = budget def retrieve( self, query: str, scope: MemoryScope | None = None, max_cards: int = 15, ) -> list[MemoryCard]: """ Retrieve memories ranked by composite score, bounded by token budget. Returns only PROMOTED memories that fit within max_injected_tokens. """ candidates = self.store.retrieve( query_text=query, scope=scope, statuses=[MemoryStatus.PROMOTED], top_k=max_cards * 3, # Over-fetch for diversity filtering ) # Re-rank with full Q-function now = time.time() scored = [] for card in candidates: score = self._compute_score(card, query, now) scored.append((score, card)) scored.sort(key=lambda x: -x[0]) # Select under token budget selected = [] token_used = 0 seen_patterns: set[str] = set() for score, card in scored: # Diversity: skip near-duplicates pattern_key = (card.pattern or card.content or "")[:30].lower() if pattern_key in seen_patterns: continue seen_patterns.add(pattern_key) # Token budget check card_text = f"{card.pattern} {card.strategy} {' '.join(card.steps)}" card_tokens = self.budget.estimate_tokens(card_text) if token_used + card_tokens > self.budget.max_injected_tokens: break selected.append(card) token_used += card_tokens if len(selected) >= max_cards: break return selected def _compute_score(self, card: MemoryCard, query: str, now: float) -> float: """ Composite Q-function score: score = relevance * trust * utility * recency_decay """ # Base scores from card trust = card.trust_score utility = card.utility_score # Relevance (already computed by store.retrieve, use utility as proxy) relevance = 0.5 + utility * 0.5 # Recency decay: newer memories get slight boost age_days = (now - card.created_at) / 86400 recency = max(0.3, 1.0 - (age_days / 365)) # Decay over a year # Combine score = relevance * trust * utility * recency # Boost frequently successful cards if card.times_retrieved > 0 and card.times_helped > 0: help_rate = card.times_helped / card.times_retrieved score *= (1.0 + help_rate * 0.5) return score # ═══════════════════════════════════════════════════════════════ # Homeostasis Controller — ties everything together # ═══════════════════════════════════════════════════════════════ class MemoryHomeostasis: """ Main controller that keeps memory bounded and healthy. Usage: homeostasis = MemoryHomeostasis(store, budget=MemoryBudget(max_active_cards=256)) # After each task: homeostasis.check_and_consolidate() # Manual trigger: homeostasis.force_consolidation() # Budget-aware retrieval: memories = homeostasis.retrieve("query", scope=scope) """ def __init__( self, store: MemoryStore, budget: MemoryBudget | None = None, archive_path: str | None = None, ): self.store = store self.budget = budget or MemoryBudget() self.archive = MemoryArchive(archive_path) self.consolidation = ConsolidationEngine(store, self.archive, self.budget) self.retriever = QFunctionRetriever(store, self.budget) self._new_since_consolidation = 0 def on_memory_added(self) -> None: """Called after a new memory is added. Triggers consolidation if threshold met.""" self._new_since_consolidation += 1 if self._new_since_consolidation >= self.budget.consolidation_threshold: self.check_and_consolidate() def check_and_consolidate(self) -> dict[str, int] | None: """Check if consolidation is needed and run it if so.""" active_count = len(self.store.get_by_status(MemoryStatus.PROMOTED)) if (active_count > self.budget.max_active_cards or self._new_since_consolidation >= self.budget.consolidation_threshold): self._new_since_consolidation = 0 return self.consolidation.run() return None def force_consolidation(self) -> dict[str, int]: """Force a consolidation cycle regardless of thresholds.""" self._new_since_consolidation = 0 return self.consolidation.run() def retrieve(self, query: str, scope: MemoryScope | None = None, max_cards: int = 10) -> list[MemoryCard]: """Budget-aware retrieval. Guarantees token budget is respected.""" return self.retriever.retrieve(query, scope, max_cards) @property def stats(self) -> dict[str, Any]: active = len(self.store.get_by_status(MemoryStatus.PROMOTED)) return { "active_cards": active, "max_active": self.budget.max_active_cards, "utilization": f"{active/self.budget.max_active_cards:.0%}" if self.budget.max_active_cards else "0%", "archived": self.archive.size, "consolidations_run": self.consolidation._consolidation_count, "new_since_last": self._new_since_consolidation, }