| """ |
| memory.py β Typed, versioned, scoped, reversible memory system. |
| |
| Replaces the flat heuristic library with a structured memory store where |
| every memory has a kind, status, scope, trust score, and full provenance. |
| |
| Memory lifecycle: |
| candidate β quarantined (immune scan) β promoted (replay-tested) β archived |
| β rejected (if scan/test fails) |
| |
| Memory kinds: |
| purpose_contract β the user's stated goal and constraints |
| user_preference β learned user-specific preferences ("always cite sources") |
| skill_card β reusable procedure extracted from successful trajectories |
| episodic_case β a specific (state, action, outcome) triple worth remembering |
| failure_pattern β a pattern that led to failure (negative heuristic) |
| critic_calibration β learned adjustments to the Purpose Function's scoring |
| tool_policy β usage constraints and tips for specific tools |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import math |
| import time |
| import uuid |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from pathlib import Path |
| from typing import Any |
|
|
| from purpose_agent.v2_types import MemoryScope |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class MemoryKind(Enum): |
| PURPOSE_CONTRACT = "purpose_contract" |
| USER_PREFERENCE = "user_preference" |
| SKILL_CARD = "skill_card" |
| EPISODIC_CASE = "episodic_case" |
| FAILURE_PATTERN = "failure_pattern" |
| CRITIC_CALIBRATION = "critic_calibration" |
| TOOL_POLICY = "tool_policy" |
|
|
|
|
| class MemoryStatus(Enum): |
| CANDIDATE = "candidate" |
| QUARANTINED = "quarantined" |
| PROMOTED = "promoted" |
| REJECTED = "rejected" |
| ARCHIVED = "archived" |
|
|
|
|
| @dataclass |
| class MemoryCard: |
| """ |
| A single unit of agent memory. Every field is tracked for provenance. |
| """ |
| id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) |
| kind: MemoryKind = MemoryKind.SKILL_CARD |
| status: MemoryStatus = MemoryStatus.CANDIDATE |
| scope: MemoryScope = field(default_factory=MemoryScope) |
|
|
| |
| content: str = "" |
| pattern: str = "" |
| strategy: str = "" |
| steps: list[str] = field(default_factory=list) |
|
|
| |
| trust_score: float = 0.5 |
| utility_score: float = 0.5 |
| times_retrieved: int = 0 |
| times_helped: int = 0 |
| times_hurt: int = 0 |
|
|
| |
| source_trace_id: str = "" |
| source_step: int = 0 |
| created_at: float = field(default_factory=time.time) |
| created_by: str = "" |
| version: int = 1 |
| parent_id: str = "" |
| rejection_reason: str = "" |
| immune_scan_result: dict[str, Any] = field(default_factory=dict) |
|
|
| |
| embedding: list[float] | None = None |
|
|
| def update_utility(self, helped: bool, alpha: float = 0.1) -> None: |
| """Monte Carlo utility update: U_new = U + Ξ±(reward - U).""" |
| self.times_retrieved += 1 |
| if helped: |
| self.times_helped += 1 |
| self.utility_score += alpha * (1.0 - self.utility_score) |
| else: |
| self.times_hurt += 1 |
| self.utility_score += alpha * (0.0 - self.utility_score) |
| self.utility_score = max(0.0, min(1.0, self.utility_score)) |
|
|
| @property |
| def empirical_help_rate(self) -> float: |
| if self.times_retrieved == 0: |
| return 0.5 |
| return self.times_helped / self.times_retrieved |
|
|
| def to_dict(self) -> dict[str, Any]: |
| return { |
| "id": self.id, |
| "kind": self.kind.value, |
| "status": self.status.value, |
| "scope": { |
| "agent_roles": self.scope.agent_roles, |
| "tool_names": self.scope.tool_names, |
| "task_categories": self.scope.task_categories, |
| "user_id": self.scope.user_id, |
| }, |
| "content": self.content, |
| "pattern": self.pattern, |
| "strategy": self.strategy, |
| "steps": self.steps, |
| "trust_score": self.trust_score, |
| "utility_score": self.utility_score, |
| "times_retrieved": self.times_retrieved, |
| "times_helped": self.times_helped, |
| "times_hurt": self.times_hurt, |
| "source_trace_id": self.source_trace_id, |
| "created_at": self.created_at, |
| "created_by": self.created_by, |
| "version": self.version, |
| "parent_id": self.parent_id, |
| "status_detail": self.rejection_reason, |
| "immune_scan": self.immune_scan_result, |
| } |
|
|
| @classmethod |
| def from_dict(cls, d: dict) -> "MemoryCard": |
| scope_d = d.get("scope", {}) |
| return cls( |
| id=d.get("id", uuid.uuid4().hex[:12]), |
| kind=MemoryKind(d.get("kind", "skill_card")), |
| status=MemoryStatus(d.get("status", "candidate")), |
| scope=MemoryScope( |
| agent_roles=scope_d.get("agent_roles", []), |
| tool_names=scope_d.get("tool_names", []), |
| task_categories=scope_d.get("task_categories", []), |
| user_id=scope_d.get("user_id", ""), |
| ), |
| content=d.get("content", ""), |
| pattern=d.get("pattern", ""), |
| strategy=d.get("strategy", ""), |
| steps=d.get("steps", []), |
| trust_score=d.get("trust_score", 0.5), |
| utility_score=d.get("utility_score", 0.5), |
| times_retrieved=d.get("times_retrieved", 0), |
| times_helped=d.get("times_helped", 0), |
| times_hurt=d.get("times_hurt", 0), |
| source_trace_id=d.get("source_trace_id", ""), |
| created_at=d.get("created_at", time.time()), |
| created_by=d.get("created_by", ""), |
| version=d.get("version", 1), |
| parent_id=d.get("parent_id", ""), |
| rejection_reason=d.get("status_detail", ""), |
| immune_scan_result=d.get("immune_scan", {}), |
| ) |
|
|
|
|
| class MemoryStore: |
| """ |
| Persistent, queryable store of MemoryCards. |
| |
| Supports: |
| - Add/retrieve/update/reject memories |
| - Filter by kind, status, scope |
| - Ranked retrieval (relevance Γ trust Γ utility) |
| - Persistence to JSON |
| """ |
|
|
| def __init__(self, persistence_path: str | None = None): |
| self._cards: dict[str, MemoryCard] = {} |
| self._path = Path(persistence_path) if persistence_path else None |
| if self._path and self._path.exists(): |
| self._load() |
|
|
| def add(self, card: MemoryCard) -> MemoryCard: |
| """Add a memory card. Returns the card (with id assigned).""" |
| self._cards[card.id] = card |
| self._save() |
| return card |
|
|
| def get(self, card_id: str) -> MemoryCard | None: |
| return self._cards.get(card_id) |
|
|
| def update_status(self, card_id: str, status: MemoryStatus, reason: str = "") -> None: |
| card = self._cards.get(card_id) |
| if card: |
| card.status = status |
| if reason: |
| card.rejection_reason = reason |
| self._save() |
|
|
| def retrieve( |
| self, |
| query_text: str = "", |
| scope: MemoryScope | None = None, |
| kinds: list[MemoryKind] | None = None, |
| statuses: list[MemoryStatus] | None = None, |
| top_k: int = 10, |
| ) -> list[MemoryCard]: |
| """ |
| Retrieve memories ranked by composite score. |
| Default: only promoted memories. |
| """ |
| statuses = statuses or [MemoryStatus.PROMOTED] |
| candidates = [] |
| query_emb = self._embed(query_text) if query_text else None |
|
|
| for card in self._cards.values(): |
| if card.status not in statuses: |
| continue |
| if kinds and card.kind not in kinds: |
| continue |
| if scope and not card.scope.matches(scope): |
| continue |
|
|
| |
| relevance = 0.5 |
| if query_emb and card.embedding: |
| relevance = self._cosine(query_emb, card.embedding) |
| elif query_emb: |
| card.embedding = self._embed(card.content or card.pattern) |
| relevance = self._cosine(query_emb, card.embedding) |
|
|
| score = 0.4 * relevance + 0.3 * card.trust_score + 0.3 * card.utility_score |
| candidates.append((score, card)) |
|
|
| candidates.sort(key=lambda x: -x[0]) |
| return [c for _, c in candidates[:top_k]] |
|
|
| def get_by_status(self, status: MemoryStatus) -> list[MemoryCard]: |
| return [c for c in self._cards.values() if c.status == status] |
|
|
| def get_all(self) -> list[MemoryCard]: |
| return list(self._cards.values()) |
|
|
| @property |
| def size(self) -> int: |
| return len(self._cards) |
|
|
| def stats(self) -> dict[str, Any]: |
| by_status = {} |
| by_kind = {} |
| for c in self._cards.values(): |
| by_status[c.status.value] = by_status.get(c.status.value, 0) + 1 |
| by_kind[c.kind.value] = by_kind.get(c.kind.value, 0) + 1 |
| return {"total": self.size, "by_status": by_status, "by_kind": by_kind} |
|
|
| |
|
|
| def _save(self) -> None: |
| if not self._path: |
| return |
| self._path.parent.mkdir(parents=True, exist_ok=True) |
| data = [c.to_dict() for c in self._cards.values()] |
| with open(self._path, "w") as f: |
| json.dump(data, f, indent=2, default=str) |
|
|
| def _load(self) -> None: |
| if not self._path or not self._path.exists(): |
| return |
| try: |
| with open(self._path) as f: |
| data = json.load(f) |
| for d in data: |
| card = MemoryCard.from_dict(d) |
| self._cards[card.id] = card |
| logger.info(f"MemoryStore: loaded {len(self._cards)} cards") |
| except Exception as e: |
| logger.error(f"MemoryStore: load failed: {e}") |
|
|
| |
|
|
| @staticmethod |
| def _embed(text: str) -> list[float]: |
| dim = 128 |
| vec = [0.0] * dim |
| for i in range(len(text) - 2): |
| h = hash(text[i:i+3].lower()) % dim |
| vec[h] += 1.0 |
| mag = math.sqrt(sum(x*x for x in vec)) |
| return [x/mag for x in vec] if mag > 0 else vec |
|
|
| @staticmethod |
| def _cosine(a: list[float], b: list[float]) -> float: |
| if not a or not b or len(a) != len(b): |
| return 0.0 |
| dot = sum(x*y for x, y in zip(a, b)) |
| ma = math.sqrt(sum(x*x for x in a)) |
| mb = math.sqrt(sum(x*x for x in b)) |
| return dot / (ma * mb) if ma > 0 and mb > 0 else 0.0 |
|
|