"""
retroformer.py β€” Structured reflection with gradient-free policy improvement.
From Retroformer (arxiv:2308.02151):
A retrospective model Ξ“ that takes the full trajectory (states, actions,
rewards) and generates an improved prompt for the next attempt. The key
insight: the LLM agent is frozen, but the retrospective model learns
to write better prompts for it.
Adaptation for Purpose Agent (no weight updates):
Instead of training the retrospective model with policy gradients,
we use the same LLM to reflect on trajectories and extract structured
lessons. These lessons become skill_card and failure_pattern memories
that improve future prompts via the PromptCompiler.
The reflection is structured (not free-form):
1. What went well? (β†’ skill_card memories)
2. What went wrong? (β†’ failure_pattern memories)
3. What should change next time? (β†’ user_preference or tool_policy memories)
4. What specific state patterns should I watch for? (β†’ episodic_case memories)
This replaces V1's raw heuristic distillation with a more rigorous,
typed memory extraction process.
"""
from __future__ import annotations
import json
import logging
from typing import Any
from purpose_agent.llm_backend import LLMBackend, ChatMessage
from purpose_agent.trace import Trace
from purpose_agent.memory import MemoryCard, MemoryKind
from purpose_agent.v2_types import MemoryScope
from purpose_agent.memory_ci import MemoryCI
logger = logging.getLogger(__name__)
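
# reflect() maps the four reflection categories onto typed memories
# (summarized here for reference; see the extraction loops in reflect()):
#   skills       → MemoryKind.SKILL_CARD      (reusable procedures)
#   failures     → MemoryKind.FAILURE_PATTERN (anti-patterns to avoid)
#   policies     → MemoryKind.TOOL_POLICY     (per-tool constraints)
#   observations → MemoryKind.EPISODIC_CASE   (significant state patterns)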
REFLECTION_PROMPT = """\
You are a RETROSPECTIVE ANALYST. Given a complete task trajectory, extract
structured lessons that will help an agent perform better next time.
Analyze the trajectory and produce EXACTLY these categories of lessons:
1. SKILLS (what worked well - reusable procedures):
- Pattern: when does this apply?
- Strategy: what to do?
- Steps: concrete action sequence?
2. FAILURES (what went wrong - patterns to avoid):
- What happened?
- Why was it wrong?
- What to do instead?
3. POLICIES (new rules or constraints discovered):
- What tool/action needs a new constraint?
- What's the constraint?
4. OBSERVATIONS (specific state patterns worth remembering):
- What state pattern was significant?
- What did it mean?
Be concrete. Use {variable} placeholders for generalizable parts.
Respond with JSON:
{
"skills": [{"pattern": "...", "strategy": "...", "steps": ["..."]}],
"failures": [{"pattern": "...", "what_happened": "...", "instead": "..."}],
"policies": [{"tool": "...", "constraint": "..."}],
"observations": [{"state_pattern": "...", "meaning": "..."}]
}
"""
REFLECTION_SCHEMA = {
"type": "object",
"properties": {
"skills": {"type": "array", "items": {"type": "object"}},
"failures": {"type": "array", "items": {"type": "object"}},
"policies": {"type": "array", "items": {"type": "object"}},
"observations": {"type": "array", "items": {"type": "object"}},
},
"required": ["skills", "failures"],
}
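
# Illustrative only: a payload of the shape reflect() expects back from the LLM,
# satisfying REFLECTION_SCHEMA. All values here are hypothetical examples, not
# output from any real trace.
_EXAMPLE_REFLECTION: dict[str, Any] = {
    "skills": [{
        "pattern": "task requires editing a file under {dir}",
        "strategy": "read the file before writing to avoid clobbering it",
        "steps": ["read {path}", "apply the minimal diff", "re-read to verify"],
    }],
    "failures": [{
        "pattern": "same call retried unchanged",
        "what_happened": "the identical API error occurred three times in a row",
        "instead": "change at least one parameter before retrying",
    }],
    "policies": [
        {"tool": "shell", "constraint": "require a dry run before destructive commands"},
    ],
    "observations": [
        {"state_pattern": "search returned zero results", "meaning": "query was likely over-specific"},
    ],
}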
class Retroformer:
"""
Structured retrospective analysis of trajectories.
Replaces V1's raw heuristic distillation with typed memory extraction.
Every extracted lesson goes through the Memory CI pipeline (immune scan,
quarantine, replay test, promote/reject).
Usage:
retro = Retroformer(llm=model, memory_ci=ci)
# After each task:
memories = retro.reflect(trace)
        # → Extracts skills, failures, policies, observations
        # → Submits each to Memory CI for quarantine + scanning
        # → Only safe, useful memories get promoted
# Over time: the agent accumulates vetted knowledge
"""
def __init__(
self,
llm: LLMBackend,
memory_ci: MemoryCI,
agent_role: str = "",
):
self.llm = llm
self.memory_ci = memory_ci
self.agent_role = agent_role
self._reflections: list[dict] = []
def reflect(self, trace: Trace) -> list[MemoryCard]:
"""
Analyze a trace and extract structured lessons as typed memories.
Returns list of submitted MemoryCards (status will be CANDIDATE or QUARANTINED).
"""
# Build trajectory summary for the LLM
summary = self._build_trajectory_summary(trace)
messages = [
ChatMessage(role="system", content=REFLECTION_PROMPT),
ChatMessage(role="user", content=summary),
]
try:
result = self.llm.generate_structured(messages, schema=REFLECTION_SCHEMA)
except Exception as e:
logger.warning(f"Retroformer: Reflection failed ({e}), attempting text parse")
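            # Fallback: retry as free-form text and hope the model emitted raw JSON.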
raw = self.llm.generate(messages, temperature=0.5)
try:
result = json.loads(raw)
except Exception:
result = {"skills": [], "failures": []}
        cards: list[MemoryCard] = []
        # Extract skills → skill_card memories
for skill in result.get("skills", []):
card = MemoryCard(
kind=MemoryKind.SKILL_CARD,
pattern=skill.get("pattern", ""),
strategy=skill.get("strategy", ""),
steps=skill.get("steps", []),
source_trace_id=trace.trace_id,
created_by="retroformer",
scope=MemoryScope(
agent_roles=[self.agent_role] if self.agent_role else [],
),
)
self.memory_ci.submit(card)
cards.append(card)
        # Extract failures → failure_pattern memories
for failure in result.get("failures", []):
card = MemoryCard(
kind=MemoryKind.FAILURE_PATTERN,
pattern=failure.get("pattern", failure.get("what_happened", "")),
strategy=f"AVOID: {failure.get('what_happened', '')}. INSTEAD: {failure.get('instead', '')}",
source_trace_id=trace.trace_id,
created_by="retroformer",
)
self.memory_ci.submit(card)
cards.append(card)
        # Extract policies → tool_policy memories
for policy in result.get("policies", []):
tool_name = policy.get("tool", "")
card = MemoryCard(
kind=MemoryKind.TOOL_POLICY,
content=policy.get("constraint", ""),
strategy=policy.get("constraint", ""),
source_trace_id=trace.trace_id,
created_by="retroformer",
scope=MemoryScope(tool_names=[tool_name] if tool_name else []),
)
self.memory_ci.submit(card)
cards.append(card)
        # Extract observations → episodic_case memories
for obs in result.get("observations", []):
card = MemoryCard(
kind=MemoryKind.EPISODIC_CASE,
pattern=obs.get("state_pattern", ""),
content=obs.get("meaning", ""),
source_trace_id=trace.trace_id,
created_by="retroformer",
)
self.memory_ci.submit(card)
cards.append(card)
self._reflections.append({
"trace_id": trace.trace_id,
"skills": len(result.get("skills", [])),
"failures": len(result.get("failures", [])),
"policies": len(result.get("policies", [])),
"observations": len(result.get("observations", [])),
"total_cards": len(cards),
})
logger.info(
f"Retroformer: Reflected on trace {trace.trace_id} β†’ "
f"{len(cards)} memory candidates "
f"(skills={len(result.get('skills', []))}, "
f"failures={len(result.get('failures', []))})"
)
return cards
def _build_trajectory_summary(self, trace: Trace) -> str:
"""Build a concise trajectory summary for the reflection prompt."""
lines = [f"## Task: {trace.purpose}", f"Run mode: {trace.run_mode}", ""]
step_events = sorted(
[e for e in trace.events if e.kind in ("action", "score")],
key=lambda e: (e.step, e.kind),
)
        # Start from None so a step numbered 0 still gets its own header.
        current_step: int | None = None
        for event in step_events:
            if event.step != current_step:
                current_step = event.step
                lines.append(f"\n### Step {current_step}")
if event.kind == "action":
lines.append(f" Action: {event.data.get('name', '?')}({json.dumps(event.data.get('params', {}), default=str)})")
if event.data.get("thought"):
lines.append(f" Thought: {event.data['thought'][:150]}")
elif event.kind == "score":
phi_b = event.data.get("phi_before", 0)
phi_a = event.data.get("phi_after", 0)
lines.append(f" Score: Ξ¦ {phi_b:.1f} β†’ {phi_a:.1f} (Ξ”={phi_a - phi_b:+.2f})")
if event.data.get("evidence"):
lines.append(f" Evidence: {event.data['evidence'][:150]}")
lines.append(f"\n## Summary")
lines.append(f"Total steps: {trace.step_count}")
lines.append(f"Duration: {trace.duration_s:.1f}s")
return "\n".join(lines)
@property
def reflections(self) -> list[dict]:
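        """Per-trace reflection statistics, one dict appended per reflect() call."""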
return self._reflections