"""
Experience Replay — Trajectory storage and retrieval.
Stores completed trajectories with their scores and supports retrieval
ranked by a combination of:
1. Semantic similarity (embedding-based) — from MemRL (arxiv:2601.03192)
2. Learned Q-value utility scores — from REMEMBERER (arxiv:2306.07929)
The two-phase retrieval (recall by similarity → re-rank by Q-value) separates
"semantically similar" from "functionally useful" — a key insight from MemRL.
This module is the "database" — it stores but doesn't analyze. The Optimizer
module reads from here and writes heuristics back.
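
Illustrative usage (a sketch; assumes a completed Trajectory produced by the
Actor loop elsewhere in the package):

    replay = ExperienceReplay(capacity=500, persistence_path="replay.json")
    record = replay.add(trajectory)               # store a finished run
    similar = replay.retrieve("refactor parser")  # recall + re-rank
    replay.update_q_value(record.id, reward=1.0)  # reinforce after success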
"""
from __future__ import annotations
import json
import logging
import math
import time
import zlib
from pathlib import Path
from typing import Any
from purpose_agent.types import (
Heuristic,
MemoryRecord,
MemoryTier,
Trajectory,
TrajectoryStep,
)
logger = logging.getLogger(__name__)
class ExperienceReplay:
"""
Experience Replay buffer with two-phase retrieval.
Phase 1 (Recall): Retrieve top-k records by semantic similarity to query
Phase 2 (Re-rank): Re-order by learned Q-value utility scores
The buffer supports:
- Adding trajectories with automatic scoring
- Retrieving similar past experiences for the Actor's context
- Q-value updates after heuristics are applied (Bellman-style)
- Persistence to disk (JSON)
- Capacity management (evict lowest Q-value records when full)
Args:
capacity: Maximum number of records to store
similarity_weight: λ in retrieval score = λ·similarity + (1-λ)·q_value
persistence_path: If set, auto-save/load buffer to this file
"""
def __init__(
self,
capacity: int = 500,
similarity_weight: float = 0.6,
persistence_path: str | Path | None = None,
):
self.capacity = capacity
self.similarity_weight = similarity_weight
self.persistence_path = Path(persistence_path) if persistence_path else None
self.records: list[MemoryRecord] = []
# Load from disk if available
if self.persistence_path and self.persistence_path.exists():
self._load()
# ------------------------------------------------------------------
# Core Operations
# ------------------------------------------------------------------
def add(self, trajectory: Trajectory) -> MemoryRecord:
"""
Add a completed trajectory to the buffer.
        Automatically computes a task embedding (lightweight character
        trigram hashing; see _compute_embedding) and an initial Q-value
        based on trajectory performance.
"""
# Compute initial Q-value from trajectory performance
initial_q = self._compute_initial_q(trajectory)
record = MemoryRecord(
trajectory=trajectory,
heuristics=[],
task_embedding=self._compute_embedding(trajectory.task_description),
retrieval_q_value=initial_q,
)
# Capacity management: evict lowest Q-value if full
if len(self.records) >= self.capacity:
self._evict()
self.records.append(record)
logger.info(
f"Experience Replay: Added trajectory '{trajectory.id}' "
f"(q={initial_q:.3f}, steps={len(trajectory.steps)}, "
f"Σreward={trajectory.cumulative_reward:.2f})"
)
if self.persistence_path:
self._save()
return record
def retrieve(
self,
query: str,
top_k: int = 5,
min_q_value: float = 0.0,
) -> list[MemoryRecord]:
"""
        Two-phase retrieval (per MemRL arxiv:2601.03192):
        Phase 1: Recall candidates by semantic similarity
        Phase 2: Re-rank by Q-value utility
        Here the two phases are collapsed into a single weighted pass:
        score = λ·similarity + (1-λ)·q_value, with λ = similarity_weight.
        Records with retrieval_q_value below min_q_value are skipped.
        Returns top-k records sorted by combined score (descending).
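
        Worked example (hypothetical numbers): with similarity_weight=0.6, a
        record with sim=0.9 and q=0.2 scores 0.6*0.9 + 0.4*0.2 = 0.62, while
        one with sim=0.5 and q=0.9 scores 0.66, so the functionally useful
        record outranks the more superficially similar one.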
"""
if not self.records:
return []
query_embedding = self._compute_embedding(query)
# Phase 1: Compute similarity scores for all records
scored: list[tuple[float, MemoryRecord]] = []
for record in self.records:
if record.retrieval_q_value < min_q_value:
continue
sim = self._cosine_similarity(
query_embedding, record.task_embedding or []
)
# Phase 2: Combined score
combined = (
self.similarity_weight * sim
+ (1 - self.similarity_weight) * record.retrieval_q_value
)
scored.append((combined, record))
# Sort descending by combined score
scored.sort(key=lambda x: -x[0])
results = [record for _, record in scored[:top_k]]
        if scored:
            logger.debug(
                f"Experience Replay: Retrieved {len(results)} records for query "
                f"(top score={scored[0][0]:.3f})"
            )
        else:
            logger.debug("Experience Replay: no records passed the Q-value filter")
return results
def update_q_value(
self,
record_id: str,
reward: float,
alpha: float = 0.1,
) -> None:
"""
Update a record's retrieval Q-value using Monte Carlo update.
Q_new = Q_old + α * (reward - Q_old)
From REMEMBERER (arxiv:2306.07929): α = 1/N where N = number of
updates. We use a fixed α for simplicity; override for REMEMBERER-exact.
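
        Worked example (hypothetical numbers): with alpha=0.1, Q_old=0.50 and
        reward=1.0, the update gives 0.50 + 0.1 * (1.0 - 0.50) = 0.55.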
"""
for record in self.records:
if record.id == record_id:
old_q = record.retrieval_q_value
record.retrieval_q_value += alpha * (reward - old_q)
record.retrieval_q_value = max(0.0, min(1.0, record.retrieval_q_value))
logger.debug(
f"Experience Replay: Q-value update for {record_id}: "
f"{old_q:.3f}{record.retrieval_q_value:.3f}"
)
if self.persistence_path:
self._save()
return
logger.warning(f"Experience Replay: Record {record_id} not found for Q-update")
def attach_heuristics(
self, record_id: str, heuristics: list[Heuristic]
) -> None:
"""Attach extracted heuristics to a memory record."""
for record in self.records:
if record.id == record_id:
record.heuristics = heuristics
if self.persistence_path:
self._save()
return
# ------------------------------------------------------------------
# Statistics & Queries
# ------------------------------------------------------------------
def get_top_trajectories(
self,
n: int = 10,
min_success_rate: float = 0.5,
) -> list[Trajectory]:
"""Get the n best trajectories by cumulative reward."""
candidates = [
r.trajectory for r in self.records
if r.trajectory.success_rate >= min_success_rate
]
candidates.sort(key=lambda t: -t.cumulative_reward)
return candidates[:n]
def get_all_heuristics(self, tier: MemoryTier | None = None) -> list[Heuristic]:
"""Get all extracted heuristics, optionally filtered by tier."""
heuristics = []
for record in self.records:
for h in record.heuristics:
if tier is None or h.tier == tier:
heuristics.append(h)
return heuristics
@property
def size(self) -> int:
return len(self.records)
def clear(self) -> None:
"""Reset the replay buffer. Removes all records and persists the empty state."""
self.records.clear()
if self.persistence_path:
self._save()
logger.info("Experience Replay: cleared all records")
@property
def stats(self) -> dict[str, Any]:
if not self.records:
return {"size": 0}
q_values = [r.retrieval_q_value for r in self.records]
rewards = [r.trajectory.cumulative_reward for r in self.records]
return {
"size": len(self.records),
"avg_q_value": sum(q_values) / len(q_values),
"max_q_value": max(q_values),
"avg_cumulative_reward": sum(rewards) / len(rewards),
"total_heuristics": sum(len(r.heuristics) for r in self.records),
}
# ------------------------------------------------------------------
# Embedding & Similarity (lightweight — no external deps)
# ------------------------------------------------------------------
@staticmethod
def _compute_embedding(text: str) -> list[float]:
"""
Lightweight text embedding using character n-gram hashing.
This is intentionally simple — for production, swap in a real
embedding model (sentence-transformers, OpenAI embeddings, etc.).
To use real embeddings, subclass ExperienceReplay and override
_compute_embedding() and _cosine_similarity().
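
        Example override (an illustrative sketch, assuming the optional
        sentence-transformers package; the subclass name is hypothetical):

            from sentence_transformers import SentenceTransformer

            class SemanticReplay(ExperienceReplay):
                _model = SentenceTransformer("all-MiniLM-L6-v2")

                @staticmethod
                def _compute_embedding(text: str) -> list[float]:
                    return SemanticReplay._model.encode(text).tolist()

        (_cosine_similarity can usually stay as-is, since it accepts any
        pair of equal-length vectors.)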
"""
# Character trigram hashing into a fixed-size vector
dim = 128
vec = [0.0] * dim
text_lower = text.lower()
for i in range(len(text_lower) - 2):
trigram = text_lower[i:i + 3]
            h = zlib.crc32(trigram.encode("utf-8")) % dim  # deterministic across processes (unlike hash())
vec[h] += 1.0
# L2 normalize
magnitude = math.sqrt(sum(x * x for x in vec))
if magnitude > 0:
vec = [x / magnitude for x in vec]
return vec
@staticmethod
def _cosine_similarity(a: list[float], b: list[float]) -> float:
"""Cosine similarity between two vectors."""
if not a or not b or len(a) != len(b):
return 0.0
dot = sum(x * y for x, y in zip(a, b))
mag_a = math.sqrt(sum(x * x for x in a))
mag_b = math.sqrt(sum(x * x for x in b))
if mag_a == 0 or mag_b == 0:
return 0.0
return dot / (mag_a * mag_b)
# ------------------------------------------------------------------
# Initial Q-Value Estimation
# ------------------------------------------------------------------
@staticmethod
def _compute_initial_q(trajectory: Trajectory) -> float:
"""
Compute initial Q-value from trajectory performance.
Uses a combination of:
- Success rate (fraction of steps that improved state)
- Total delta (net improvement)
- Trajectory length efficiency (shorter = better for same delta)
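
        Worked example (hypothetical numbers): success_rate=0.75 and
        total_delta=4.0 over 5 steps give delta_normalized=0.4 and
        efficiency=0.08, so q = 0.4*0.75 + 0.4*0.4 + 0.2*min(0.08*5, 1.0) = 0.54.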
"""
if not trajectory.steps:
return 0.3 # Uninformative prior
success_rate = trajectory.success_rate
total_delta = trajectory.total_delta
length = len(trajectory.steps)
# Normalize total_delta to 0-1 (assuming max meaningful delta is ~10)
delta_normalized = max(0.0, min(1.0, total_delta / 10.0))
# Efficiency bonus: more progress per step = higher Q
efficiency = delta_normalized / max(length, 1)
q = 0.4 * success_rate + 0.4 * delta_normalized + 0.2 * min(efficiency * 5, 1.0)
return max(0.0, min(1.0, q))
# ------------------------------------------------------------------
# Capacity Management
# ------------------------------------------------------------------
def _evict(self) -> None:
"""Evict the lowest Q-value record."""
if not self.records:
return
worst = min(self.records, key=lambda r: r.retrieval_q_value)
self.records.remove(worst)
logger.debug(
f"Experience Replay: Evicted record {worst.id} "
f"(q={worst.retrieval_q_value:.3f})"
)
# ------------------------------------------------------------------
# Persistence
# ------------------------------------------------------------------
def _save(self) -> None:
"""Save buffer to disk as JSON."""
if not self.persistence_path:
return
self.persistence_path.parent.mkdir(parents=True, exist_ok=True)
data = []
for record in self.records:
data.append({
"id": record.id,
"retrieval_q_value": record.retrieval_q_value,
"task_embedding": record.task_embedding,
"trajectory": {
"id": record.trajectory.id,
"task_description": record.trajectory.task_description,
"purpose": record.trajectory.purpose,
"created_at": record.trajectory.created_at,
"cumulative_reward": record.trajectory.cumulative_reward,
"total_delta": record.trajectory.total_delta,
"success_rate": record.trajectory.success_rate,
"num_steps": len(record.trajectory.steps),
},
"heuristics": [
{
"id": h.id,
"pattern": h.pattern,
"strategy": h.strategy,
"steps": h.steps,
"tier": h.tier.value,
"q_value": h.q_value,
"times_used": h.times_used,
"times_succeeded": h.times_succeeded,
}
for h in record.heuristics
],
})
with open(self.persistence_path, "w") as f:
json.dump(data, f, indent=2, default=str)
def _load(self) -> None:
"""Load buffer from disk."""
if not self.persistence_path or not self.persistence_path.exists():
return
try:
with open(self.persistence_path) as f:
data = json.load(f)
for entry in data:
traj_data = entry["trajectory"]
trajectory = Trajectory(
task_description=traj_data["task_description"],
purpose=traj_data["purpose"],
id=traj_data["id"],
created_at=traj_data.get("created_at", time.time()),
)
heuristics = [
Heuristic(
id=h["id"],
pattern=h["pattern"],
strategy=h["strategy"],
steps=h["steps"],
tier=MemoryTier(h["tier"]),
q_value=h["q_value"],
times_used=h.get("times_used", 0),
times_succeeded=h.get("times_succeeded", 0),
)
for h in entry.get("heuristics", [])
]
record = MemoryRecord(
id=entry["id"],
trajectory=trajectory,
heuristics=heuristics,
task_embedding=entry.get("task_embedding"),
retrieval_q_value=entry.get("retrieval_q_value", 0.5),
)
self.records.append(record)
logger.info(f"Experience Replay: Loaded {len(self.records)} records from disk")
except Exception as e:
logger.error(f"Experience Replay: Failed to load from disk: {e}")