"""
memory_homeostasis.py — Bounded memory with consolidation, hibernation, and archive.

Solves: active memory must be bounded; archived evidence must remain recoverable.

Components:
- MemoryBudget: hard limits on active cards, injected tokens, per-kind caps
- MemoryArchive: append-only cold storage (JSONL or SQLite)
- ConsolidationEngine: cluster -> merge -> compress -> hibernate
- QFunctionRetriever: budget-aware ranking with recency decay and diversity

Triggers:
- On N new memories
- On active_cards > max_active_cards
- On injected_tokens > max_injected_tokens
- Manual: team.consolidate_memory()

Invariant: active injected memory NEVER exceeds token budget.
"""
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import math |
| import time |
| from collections import defaultdict |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any |
|
|
| from purpose_agent.memory import MemoryCard, MemoryKind, MemoryStatus, MemoryStore |
| from purpose_agent.v2_types import MemoryScope |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class MemoryBudget:
    """
    Hard limits on active memory. Enforced by the homeostasis engine.

    When any limit is exceeded, consolidation/archival is triggered automatically.

    Attributes:
        max_active_cards: hard cap on PROMOTED cards kept active at once.
        max_injected_tokens: hard cap on estimated tokens injected into a prompt.
        max_cards_per_kind: per-kind active-card caps (keys are MemoryKind values).
        archive_after_days: age after which cards may be archived (None = never).
        consolidation_threshold: count of new memories that triggers a cycle.
        chars_per_token: crude chars-per-token ratio used by estimate_tokens().
    """
    max_active_cards: int = 512
    max_injected_tokens: int = 500
    max_cards_per_kind: dict[str, int] = field(default_factory=lambda: {
        "skill_card": 100,
        "episodic_case": 200,
        "failure_pattern": 50,
        "user_preference": 50,
        "critic_calibration": 30,
        "tool_policy": 30,
        "purpose_contract": 10,
    })
    archive_after_days: int | None = 90
    consolidation_threshold: int = 50
    chars_per_token: int = 4

    def estimate_tokens(self, text: str) -> int:
        """Estimate the token count of *text*.

        Uses ceiling division so the estimate is conservative. The previous
        floor division under-counted (e.g. 3 chars -> 0 tokens), which could
        let injected memory exceed the real token budget the module promises
        to never break.
        """
        return -(-len(text) // self.chars_per_token)
|
|
|
|
| |
| |
| |
|
|
class MemoryArchive:
    """
    Append-only cold storage for archived memories.

    Archived memories are never injected into prompts but remain
    recoverable by source_trace_id for audit, replay, or re-promotion.

    Storage is an in-memory list, optionally mirrored to a JSONL file at
    *path* (one JSON object per line, appended on every archive()).
    """

    def __init__(self, path: str | None = None):
        """Open the archive; load any existing JSONL file at *path*."""
        self._path = Path(path) if path else None
        self._archived: list[dict[str, Any]] = []
        if self._path and self._path.exists():
            self._load()

    def archive(self, card: MemoryCard, reason: str = "") -> None:
        """Move a card to cold storage, recording why and when."""
        entry = {
            "id": card.id,
            "kind": card.kind.value,
            "pattern": card.pattern,
            "strategy": card.strategy,
            "content": card.content,
            "source_trace_id": card.source_trace_id,
            "trust_score": card.trust_score,
            "utility_score": card.utility_score,
            "times_retrieved": card.times_retrieved,
            "archived_at": time.time(),
            "reason": reason,
        }
        self._archived.append(entry)
        if self._path:
            self._append(entry)

    def recover(self, card_id: str) -> dict[str, Any] | None:
        """Recover an archived card by ID (first match wins)."""
        for entry in self._archived:
            if entry["id"] == card_id:
                return entry
        return None

    def recover_by_trace(self, trace_id: str) -> list[dict[str, Any]]:
        """Recover all archived cards from a specific trace."""
        return [e for e in self._archived if e.get("source_trace_id") == trace_id]

    @property
    def size(self) -> int:
        """Number of archived entries currently loaded in memory."""
        return len(self._archived)

    def _append(self, entry: dict) -> None:
        """Append one entry as a JSONL line, creating parent dirs as needed."""
        if not self._path:
            return
        self._path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the archive file is portable across platforms
        # (the default encoding is locale-dependent).
        with open(self._path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, default=str) + "\n")

    def _load(self) -> None:
        """Load entries from the JSONL file, skipping (but logging) corrupt lines."""
        if not self._path or not self._path.exists():
            return
        with open(self._path, encoding="utf-8") as f:
            for lineno, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    self._archived.append(json.loads(line))
                except json.JSONDecodeError:
                    # Skip the corrupt line but leave an audit trail instead
                    # of silently dropping archived evidence.
                    logging.getLogger(__name__).warning(
                        "Skipping corrupt archive line %d in %s", lineno, self._path
                    )
|
|
|
|
| |
| |
| |
|
|
class ConsolidationEngine:
    """
    Clusters, merges, compresses, and hibernates memories.

    Operations:
    - cluster: group similar episodic_case cards by pattern similarity
    - merge: promote repeated patterns into a single skill_card
    - compress: shorten singleton low-utility cases to signatures
    - hibernate: deactivate unused skill_cards (recoverable)

    All operations preserve source_trace_id for audit trail.
    """

    def __init__(self, store: MemoryStore, archive: MemoryArchive, budget: MemoryBudget):
        self.store = store
        self.archive = archive
        self.budget = budget
        self._consolidation_count = 0  # total cycles run; surfaced via homeostasis stats

    def run(self) -> dict[str, int]:
        """
        Run a full consolidation cycle. Returns counts of actions taken.

        NOTE(review): "clustered" and "compressed" are reserved keys that are
        currently always 0 (no standalone cluster/compress pass exists yet);
        they are kept so the result schema stays stable for callers.
        """
        results = {"clustered": 0, "merged": 0, "compressed": 0, "hibernated": 0, "archived": 0}

        results["merged"] = self._merge_similar_episodics()
        results["hibernated"] = self._hibernate_unused()
        results["archived"] = self._archive_over_budget()
        results["archived"] += self._enforce_kind_limits()

        self._consolidation_count += 1
        # Lazy %-style args: skip string formatting entirely when INFO is off.
        logger.info("Consolidation #%d: %s", self._consolidation_count, results)
        return results

    def _merge_similar_episodics(self) -> int:
        """Merge every group of >=3 similar episodic cases into one skill card."""
        episodics = [c for c in self.store.get_all()
                     if c.kind == MemoryKind.EPISODIC_CASE and c.status == MemoryStatus.PROMOTED]

        if len(episodics) < 3:
            return 0

        # Cheap similarity proxy: normalized 50-char pattern prefix as cluster key.
        groups: dict[str, list[MemoryCard]] = defaultdict(list)
        for card in episodics:
            key = card.pattern.lower().strip()[:50]
            groups[key].append(card)

        merged = 0
        for cards in groups.values():
            if len(cards) < 3:
                continue

            # Conservative scoring: trust = worst member, utility = average.
            avg_utility = sum(c.utility_score for c in cards) / len(cards)
            merged_card = MemoryCard(
                kind=MemoryKind.SKILL_CARD,
                status=MemoryStatus.PROMOTED,
                pattern=cards[0].pattern,
                strategy=f"[CONSOLIDATED from {len(cards)} cases] " + cards[0].strategy,
                trust_score=min(c.trust_score for c in cards),
                utility_score=avg_utility,
                source_trace_id=cards[0].source_trace_id,
                created_by="consolidation",
            )
            self.store.add(merged_card)

            # Archive the originals; they stay recoverable via source_trace_id.
            for card in cards:
                self.store.update_status(card.id, MemoryStatus.ARCHIVED, "consolidated")
                self.archive.archive(card, f"merged into {merged_card.id}")

            merged += 1

        return merged

    def _hibernate_unused(self) -> int:
        """Hibernate skill cards that are retrieved often but rarely useful."""
        hibernated = 0
        for card in self.store.get_all():
            if card.status != MemoryStatus.PROMOTED:
                continue
            if card.kind != MemoryKind.SKILL_CARD:
                continue
            # Enough retrievals to judge (>=10), yet consistently low utility.
            if card.times_retrieved >= 10 and card.utility_score < 0.2:
                self.store.update_status(card.id, MemoryStatus.ARCHIVED, "hibernated: low utility")
                self.archive.archive(card, "hibernated")
                hibernated += 1

        return hibernated

    def _archive_over_budget(self) -> int:
        """Archive lowest-utility cards when over max_active_cards."""
        active = self.store.get_by_status(MemoryStatus.PROMOTED)
        excess = len(active) - self.budget.max_active_cards
        if excess <= 0:
            return 0

        # Evict the least useful first.
        active.sort(key=lambda c: c.utility_score)
        archived = 0
        for card in active[:excess]:
            self.store.update_status(card.id, MemoryStatus.ARCHIVED, "budget: over max_active")
            self.archive.archive(card, "budget overflow")
            archived += 1

        return archived

    def _enforce_kind_limits(self) -> int:
        """Enforce per-kind card limits, archiving lowest-utility overflow."""
        # One pass over the store, grouped by kind, instead of a full
        # get_all() scan per configured kind.
        by_kind: dict[MemoryKind, list[MemoryCard]] = defaultdict(list)
        for card in self.store.get_all():
            if card.status == MemoryStatus.PROMOTED:
                by_kind[card.kind].append(card)

        archived = 0
        for kind_str, limit in self.budget.max_cards_per_kind.items():
            try:
                kind = MemoryKind(kind_str)
            except ValueError:
                continue  # unknown kind string in config; ignore rather than crash

            cards = by_kind.get(kind, [])
            if len(cards) <= limit:
                continue

            # Keep the `limit` highest-utility cards; archive the rest.
            cards.sort(key=lambda c: c.utility_score)
            for card in cards[:len(cards) - limit]:
                self.store.update_status(card.id, MemoryStatus.ARCHIVED, f"kind_limit: {kind_str}")
                self.archive.archive(card, f"kind limit ({kind_str})")
                archived += 1

        return archived
|
|
|
|
| |
| |
| |
|
|
class QFunctionRetriever:
    """
    Budget-aware memory retriever with multi-signal ranking.

    score = relevance * trust * utility * recency_decay
            (plus a help-rate boost; diversity is enforced at selection time)

    Guarantees: injected tokens NEVER exceed budget.max_injected_tokens.
    """

    def __init__(self, store: MemoryStore, budget: MemoryBudget):
        self.store = store
        self.budget = budget

    def retrieve(
        self,
        query: str,
        scope: MemoryScope | None = None,
        max_cards: int = 15,
    ) -> list[MemoryCard]:
        """
        Retrieve memories ranked by composite score, bounded by token budget.

        Returns only PROMOTED memories that fit within max_injected_tokens.
        A card too large for the remaining budget is skipped rather than
        ending selection, so smaller lower-ranked cards can still use the
        leftover budget (the old `break` wasted it).
        """
        candidates = self.store.retrieve(
            query_text=query,
            scope=scope,
            statuses=[MemoryStatus.PROMOTED],
            top_k=max_cards * 3,  # over-fetch so dedup/budget filtering has slack
        )

        now = time.time()
        # Sort by score only (key is a float), so cards never get compared.
        scored = sorted(
            ((self._compute_score(card, query, now), card) for card in candidates),
            key=lambda pair: -pair[0],
        )

        selected: list[MemoryCard] = []
        token_used = 0
        seen_patterns: set[str] = set()
        for _score, card in scored:
            # Diversity: at most one card per 30-char normalized pattern prefix.
            pattern_key = (card.pattern or card.content or "")[:30].lower()
            if pattern_key in seen_patterns:
                continue
            seen_patterns.add(pattern_key)

            card_text = f"{card.pattern} {card.strategy} {' '.join(card.steps)}"
            card_tokens = self.budget.estimate_tokens(card_text)

            # Hard invariant: never exceed the injected-token budget.
            if token_used + card_tokens > self.budget.max_injected_tokens:
                continue

            selected.append(card)
            token_used += card_tokens

            if len(selected) >= max_cards:
                break

        return selected

    def _compute_score(self, card: MemoryCard, query: str, now: float) -> float:
        """
        Composite Q-function score:
            score = relevance * trust * utility * recency_decay

        Fix: the previous implementation ignored `query` entirely, making
        "relevance" a pure function of utility. Relevance now blends a 0.5
        prior with utility and the lexical overlap between the query and
        the card's pattern/strategy text.
        """
        trust = card.trust_score
        utility = card.utility_score

        # Lexical relevance: fraction of query words present in the card text.
        query_words = set(query.lower().split())
        card_words = set(f"{card.pattern} {card.strategy}".lower().split())
        overlap = len(query_words & card_words) / len(query_words) if query_words else 0.0

        relevance = 0.5 + 0.25 * utility + 0.25 * overlap

        # Linear recency decay over one year, floored at 0.3 so old but
        # high-utility cards are never zeroed out.
        age_days = (now - card.created_at) / 86400
        recency = max(0.3, 1.0 - (age_days / 365))

        score = relevance * trust * utility * recency

        # Boost cards with a proven track record of actually helping.
        # (help_rate == 0 yields a 1.0 multiplier, matching the old guard.)
        if card.times_retrieved > 0:
            help_rate = card.times_helped / card.times_retrieved
            score *= (1.0 + help_rate * 0.5)

        return score
|
|
|
|
| |
| |
| |
|
|
class MemoryHomeostasis:
    """
    Main controller that keeps memory bounded and healthy.

    Usage:
        homeostasis = MemoryHomeostasis(store, budget=MemoryBudget(max_active_cards=256))

        # After each task:
        homeostasis.check_and_consolidate()

        # Manual trigger:
        homeostasis.force_consolidation()

        # Budget-aware retrieval:
        memories = homeostasis.retrieve("query", scope=scope)
    """

    def __init__(
        self,
        store: MemoryStore,
        budget: MemoryBudget | None = None,
        archive_path: str | None = None,
    ):
        self.store = store
        self.budget = budget or MemoryBudget()
        self.archive = MemoryArchive(archive_path)
        self.consolidation = ConsolidationEngine(store, self.archive, self.budget)
        self.retriever = QFunctionRetriever(store, self.budget)
        # Memories added since the last consolidation cycle ran.
        self._new_since_consolidation = 0

    def on_memory_added(self) -> None:
        """Called after a new memory is added. Triggers consolidation if threshold met."""
        self._new_since_consolidation += 1
        if self._new_since_consolidation < self.budget.consolidation_threshold:
            return
        self.check_and_consolidate()

    def check_and_consolidate(self) -> dict[str, int] | None:
        """Check if consolidation is needed and run it if so."""
        active_count = len(self.store.get_by_status(MemoryStatus.PROMOTED))
        over_card_budget = active_count > self.budget.max_active_cards
        threshold_reached = self._new_since_consolidation >= self.budget.consolidation_threshold

        if not (over_card_budget or threshold_reached):
            return None

        self._new_since_consolidation = 0
        return self.consolidation.run()

    def force_consolidation(self) -> dict[str, int]:
        """Force a consolidation cycle regardless of thresholds."""
        self._new_since_consolidation = 0
        return self.consolidation.run()

    def retrieve(self, query: str, scope: MemoryScope | None = None, max_cards: int = 10) -> list[MemoryCard]:
        """Budget-aware retrieval. Guarantees token budget is respected."""
        return self.retriever.retrieve(query, scope, max_cards)

    @property
    def stats(self) -> dict[str, Any]:
        """Snapshot of memory health for logging and dashboards."""
        active = len(self.store.get_by_status(MemoryStatus.PROMOTED))
        limit = self.budget.max_active_cards
        utilization = f"{active / limit:.0%}" if limit else "0%"
        return {
            "active_cards": active,
            "max_active": limit,
            "utilization": utilization,
            "archived": self.archive.size,
            "consolidations_run": self.consolidation._consolidation_count,
            "new_since_last": self._new_since_consolidation,
        }
|
|