"""
retroformer.py: Structured reflection with gradient-free policy improvement.

From Retroformer (arXiv:2308.02151):
    A retrospective model takes the full trajectory (states, actions,
    rewards) and generates an improved prompt for the next attempt. The key
    insight: the LLM agent is frozen, but the retrospective model learns
    to write better prompts for it.

Adaptation for Purpose Agent (no weight updates):
    Instead of training the retrospective model with policy gradients,
    we use the same LLM to reflect on trajectories and extract structured
    lessons. These lessons become skill_card and failure_pattern memories
    that improve future prompts via the PromptCompiler.

The reflection is structured (not free-form):
    1. What went well? (→ skill_card memories)
    2. What went wrong? (→ failure_pattern memories)
    3. What should change next time? (→ user_preference or tool_policy memories)
    4. What specific state patterns should I watch for? (→ episodic_case memories)

This replaces V1's raw heuristic distillation with a more rigorous,
typed memory extraction process.
"""
from __future__ import annotations

import json
import logging

from purpose_agent.llm_backend import LLMBackend, ChatMessage
from purpose_agent.trace import Trace
from purpose_agent.memory import MemoryCard, MemoryKind
from purpose_agent.v2_types import MemoryScope
from purpose_agent.memory_ci import MemoryCI

logger = logging.getLogger(__name__)
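
# Mapping from reflection categories to memory kinds (see reflect() below):
#   skills       → MemoryKind.SKILL_CARD
#   failures     → MemoryKind.FAILURE_PATTERN
#   policies     → MemoryKind.TOOL_POLICY
#   observations → MemoryKind.EPISODIC_CASE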
REFLECTION_PROMPT = """\
You are a RETROSPECTIVE ANALYST. Given a complete task trajectory, extract
structured lessons that will help an agent perform better next time.

Analyze the trajectory and produce EXACTLY these categories of lessons:

1. SKILLS (what worked well → reusable procedures):
   - Pattern: when does this apply?
   - Strategy: what to do?
   - Steps: concrete action sequence?

2. FAILURES (what went wrong → patterns to avoid):
   - What happened?
   - Why was it wrong?
   - What to do instead?

3. POLICIES (new rules or constraints discovered):
   - What tool/action needs a new constraint?
   - What's the constraint?

4. OBSERVATIONS (specific state patterns worth remembering):
   - What state pattern was significant?
   - What did it mean?

Be concrete. Use {variable} placeholders for generalizable parts.

Respond with JSON:
{
  "skills": [{"pattern": "...", "strategy": "...", "steps": ["..."]}],
  "failures": [{"pattern": "...", "what_happened": "...", "instead": "..."}],
  "policies": [{"tool": "...", "constraint": "..."}],
  "observations": [{"state_pattern": "...", "meaning": "..."}]
}
"""
REFLECTION_SCHEMA = {
    "type": "object",
    "properties": {
        "skills": {"type": "array", "items": {"type": "object"}},
        "failures": {"type": "array", "items": {"type": "object"}},
        "policies": {"type": "array", "items": {"type": "object"}},
        "observations": {"type": "array", "items": {"type": "object"}},
    },
    "required": ["skills", "failures"],
}
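
# Illustrative payload (hypothetical values) that satisfies REFLECTION_SCHEMA.
# Only "skills" and "failures" are required, so a model may omit the other
# two arrays entirely:
#
#   {
#     "skills": [{"pattern": "task mentions {file_type} files",
#                 "strategy": "enumerate matches before editing",
#                 "steps": ["glob for {file_type}", "edit each match"]}],
#     "failures": [{"pattern": "retrying a failing tool call unchanged",
#                   "what_happened": "same error three times in a row",
#                   "instead": "inspect the error payload, then change params"}]
#   }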
class Retroformer:
    """
    Structured retrospective analysis of trajectories.

    Replaces V1's raw heuristic distillation with typed memory extraction.
    Every extracted lesson goes through the Memory CI pipeline (immune scan,
    quarantine, replay test, promote/reject).

    Usage:
        retro = Retroformer(llm=model, memory_ci=ci)

        # After each task:
        memories = retro.reflect(trace)
        # → Extracts skills, failures, policies, observations
        # → Submits each to Memory CI for quarantine + scanning
        # → Only safe, useful memories get promoted

        # Over time: the agent accumulates vetted knowledge
    """

    def __init__(
        self,
        llm: LLMBackend,
        memory_ci: MemoryCI,
        agent_role: str = "",
    ):
        self.llm = llm
        self.memory_ci = memory_ci
        self.agent_role = agent_role
        self._reflections: list[dict] = []
    def reflect(self, trace: Trace) -> list[MemoryCard]:
        """
        Analyze a trace and extract structured lessons as typed memories.

        Returns the list of submitted MemoryCards (status will be CANDIDATE
        or QUARANTINED).
        """
        # Build a trajectory summary for the LLM
        summary = self._build_trajectory_summary(trace)
        messages = [
            ChatMessage(role="system", content=REFLECTION_PROMPT),
            ChatMessage(role="user", content=summary),
        ]
        try:
            result = self.llm.generate_structured(messages, schema=REFLECTION_SCHEMA)
        except Exception as e:
            logger.warning("Retroformer: reflection failed (%s), attempting text parse", e)
            raw = self.llm.generate(messages, temperature=0.5)
            try:
                result = json.loads(raw)
            except Exception:
                result = {"skills": [], "failures": []}
        if not isinstance(result, dict):
            # A raw text parse can yield a non-dict (e.g. a bare JSON list).
            result = {"skills": [], "failures": []}

        cards: list[MemoryCard] = []

        # Extract skills → skill_card memories
        for skill in result.get("skills", []):
            card = MemoryCard(
                kind=MemoryKind.SKILL_CARD,
                pattern=skill.get("pattern", ""),
                strategy=skill.get("strategy", ""),
                steps=skill.get("steps", []),
                source_trace_id=trace.trace_id,
                created_by="retroformer",
                scope=MemoryScope(
                    agent_roles=[self.agent_role] if self.agent_role else [],
                ),
            )
            self.memory_ci.submit(card)
            cards.append(card)

        # Extract failures → failure_pattern memories
        for failure in result.get("failures", []):
            card = MemoryCard(
                kind=MemoryKind.FAILURE_PATTERN,
                pattern=failure.get("pattern", failure.get("what_happened", "")),
                strategy=(
                    f"AVOID: {failure.get('what_happened', '')}. "
                    f"INSTEAD: {failure.get('instead', '')}"
                ),
                source_trace_id=trace.trace_id,
                created_by="retroformer",
            )
            self.memory_ci.submit(card)
            cards.append(card)

        # Extract policies → tool_policy memories
        for policy in result.get("policies", []):
            tool_name = policy.get("tool", "")
            card = MemoryCard(
                kind=MemoryKind.TOOL_POLICY,
                content=policy.get("constraint", ""),
                strategy=policy.get("constraint", ""),
                source_trace_id=trace.trace_id,
                created_by="retroformer",
                scope=MemoryScope(tool_names=[tool_name] if tool_name else []),
            )
            self.memory_ci.submit(card)
            cards.append(card)

        # Extract observations → episodic_case memories
        for obs in result.get("observations", []):
            card = MemoryCard(
                kind=MemoryKind.EPISODIC_CASE,
                pattern=obs.get("state_pattern", ""),
                content=obs.get("meaning", ""),
                source_trace_id=trace.trace_id,
                created_by="retroformer",
            )
            self.memory_ci.submit(card)
            cards.append(card)

        self._reflections.append({
            "trace_id": trace.trace_id,
            "skills": len(result.get("skills", [])),
            "failures": len(result.get("failures", [])),
            "policies": len(result.get("policies", [])),
            "observations": len(result.get("observations", [])),
            "total_cards": len(cards),
        })
        logger.info(
            "Retroformer: reflected on trace %s → %d memory candidates "
            "(skills=%d, failures=%d)",
            trace.trace_id,
            len(cards),
            len(result.get("skills", [])),
            len(result.get("failures", [])),
        )
        return cards
    def _build_trajectory_summary(self, trace: Trace) -> str:
        """Build a concise trajectory summary for the reflection prompt."""
        lines = [f"## Task: {trace.purpose}", f"Run mode: {trace.run_mode}", ""]
        # "action" sorts before "score", so within each step the action line
        # precedes its Φ delta.
        step_events = sorted(
            [e for e in trace.events if e.kind in ("action", "score")],
            key=lambda e: (e.step, e.kind),
        )
        current_step = 0
        for event in step_events:
            if event.step != current_step:
                current_step = event.step
                lines.append(f"\n### Step {current_step}")
            if event.kind == "action":
                lines.append(
                    f"  Action: {event.data.get('name', '?')}"
                    f"({json.dumps(event.data.get('params', {}), default=str)})"
                )
                if event.data.get("thought"):
                    lines.append(f"  Thought: {event.data['thought'][:150]}")
            elif event.kind == "score":
                phi_b = event.data.get("phi_before", 0)
                phi_a = event.data.get("phi_after", 0)
                lines.append(
                    f"  Score: Φ {phi_b:.1f} → {phi_a:.1f} (Δ={phi_a - phi_b:+.2f})"
                )
                if event.data.get("evidence"):
                    lines.append(f"  Evidence: {event.data['evidence'][:150]}")
        lines.append("\n## Summary")
        lines.append(f"Total steps: {trace.step_count}")
        lines.append(f"Duration: {trace.duration_s:.1f}s")
        return "\n".join(lines)
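
    # Illustrative summary output (hypothetical trace values, format per the
    # code above):
    #
    #   ## Task: clean up stale branches
    #   Run mode: live
    #
    #   ### Step 1
    #     Action: list_branches({"remote": "origin"})
    #     Thought: survey before deleting anything
    #     Score: Φ 0.0 → 1.0 (Δ=+1.00)
    #
    #   ## Summary
    #   Total steps: 1
    #   Duration: 2.3s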
    @property
    def reflections(self) -> list[dict]:
        """Per-trace reflection stats accumulated across reflect() calls."""
        return self._reflections
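

# ---------------------------------------------------------------------------
# Minimal smoke-test sketch (illustrative only). _StubLLM and _StubCI are
# hypothetical stand-ins, not the real LLMBackend / MemoryCI: they implement
# just the two calls Retroformer makes (generate_structured and submit), and
# the trace is a bare SimpleNamespace carrying the attributes reflect() reads.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    class _StubLLM:
        def generate_structured(self, messages, schema):
            # Canned reflection matching REFLECTION_SCHEMA.
            return {
                "skills": [{"pattern": "x", "strategy": "y", "steps": ["z"]}],
                "failures": [],
            }

    class _StubCI:
        def submit(self, card):
            print(f"submitted: {card.kind}")

    demo_trace = SimpleNamespace(
        trace_id="t-0",
        purpose="demo",
        run_mode="dry",
        events=[],
        step_count=0,
        duration_s=0.0,
    )
    retro = Retroformer(llm=_StubLLM(), memory_ci=_StubCI())  # type: ignore[arg-type]
    cards = retro.reflect(demo_trace)  # type: ignore[arg-type]
    print(f"{len(cards)} card(s) extracted")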