| """ |
| retroformer.py β Structured reflection with gradient-free policy improvement. |
| |
| From Retroformer (arxiv:2308.02151): |
| A retrospective model Ξ that takes the full trajectory (states, actions, |
| rewards) and generates an improved prompt for the next attempt. The key |
| insight: the LLM agent is frozen, but the retrospective model learns |
| to write better prompts for it. |
| |
| Adaptation for Purpose Agent (no weight updates): |
| Instead of training the retrospective model with policy gradients, |
| we use the same LLM to reflect on trajectories and extract structured |
| lessons. These lessons become skill_card and failure_pattern memories |
| that improve future prompts via the PromptCompiler. |
| |
| The reflection is structured (not free-form): |
| 1. What went well? (β skill_card memories) |
| 2. What went wrong? (β failure_pattern memories) |
| 3. What should change next time? (β user_preference or tool_policy memories) |
| 4. What specific state patterns should I watch for? (β episodic_case memories) |
| |
| This replaces V1's raw heuristic distillation with a more rigorous, |
| typed memory extraction process. |
| """ |
from __future__ import annotations

import json
import logging

from purpose_agent.llm_backend import LLMBackend, ChatMessage
from purpose_agent.trace import Trace
from purpose_agent.memory import MemoryCard, MemoryKind
from purpose_agent.v2_types import MemoryScope
from purpose_agent.memory_ci import MemoryCI

logger = logging.getLogger(__name__)


REFLECTION_PROMPT = """\
You are a RETROSPECTIVE ANALYST. Given a complete task trajectory, extract
structured lessons that will help an agent perform better next time.

Analyze the trajectory and produce EXACTLY these categories of lessons:

1. SKILLS (what worked well → reusable procedures):
   - Pattern: when does this apply?
   - Strategy: what to do?
   - Steps: concrete action sequence?

2. FAILURES (what went wrong → patterns to avoid):
   - What happened?
   - Why was it wrong?
   - What to do instead?

3. POLICIES (new rules or constraints discovered):
   - What tool/action needs a new constraint?
   - What's the constraint?

4. OBSERVATIONS (specific state patterns worth remembering):
   - What state pattern was significant?
   - What did it mean?

Be concrete. Use {variable} placeholders for generalizable parts.
Respond with JSON:
{
  "skills": [{"pattern": "...", "strategy": "...", "steps": ["..."]}],
  "failures": [{"pattern": "...", "what_happened": "...", "instead": "..."}],
  "policies": [{"tool": "...", "constraint": "..."}],
  "observations": [{"state_pattern": "...", "meaning": "..."}]
}
"""

REFLECTION_SCHEMA = {
    "type": "object",
    "properties": {
        "skills": {"type": "array", "items": {"type": "object"}},
        "failures": {"type": "array", "items": {"type": "object"}},
        "policies": {"type": "array", "items": {"type": "object"}},
        "observations": {"type": "array", "items": {"type": "object"}},
    },
    "required": ["skills", "failures"],
}
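# The schema is deliberately loose (array items are unconstrained objects):
# per-field validation happens in reflect(), which reads every field
# defensively via .get() with a default.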


class Retroformer:
    """
    Structured retrospective analysis of trajectories.

    Replaces V1's raw heuristic distillation with typed memory extraction.
    Every extracted lesson goes through the Memory CI pipeline (immune scan,
    quarantine, replay test, promote/reject).

    Usage:
        retro = Retroformer(llm=model, memory_ci=ci)

        # After each task:
        memories = retro.reflect(trace)
        # → Extracts skills, failures, policies, observations
        # → Submits each to Memory CI for quarantine + scanning
        # → Only safe, useful memories get promoted

        # Over time: the agent accumulates vetted knowledge
    """

    def __init__(
        self,
        llm: LLMBackend,
        memory_ci: MemoryCI,
        agent_role: str = "",
    ):
        self.llm = llm
        self.memory_ci = memory_ci
        self.agent_role = agent_role
        self._reflections: list[dict] = []

    def reflect(self, trace: Trace) -> list[MemoryCard]:
        """
        Analyze a trace and extract structured lessons as typed memories.

        Returns the list of submitted MemoryCards (their status will be
        CANDIDATE or QUARANTINED until Memory CI promotes or rejects them).
        """
        summary = self._build_trajectory_summary(trace)

        messages = [
            ChatMessage(role="system", content=REFLECTION_PROMPT),
            ChatMessage(role="user", content=summary),
        ]

        try:
            result = self.llm.generate_structured(messages, schema=REFLECTION_SCHEMA)
        except Exception as e:
            logger.warning(f"Retroformer: reflection failed ({e}), attempting text parse")
            raw = self.llm.generate(messages, temperature=0.5)
            try:
                result = json.loads(raw)
            except Exception:
                result = {"skills": [], "failures": []}
        if not isinstance(result, dict):
            # json.loads may return any JSON value; only an object is usable here.
            result = {"skills": [], "failures": []}

        cards: list[MemoryCard] = []

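        # 1. Skills → reusable procedures (skill_card)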
        for skill in result.get("skills", []):
            card = MemoryCard(
                kind=MemoryKind.SKILL_CARD,
                pattern=skill.get("pattern", ""),
                strategy=skill.get("strategy", ""),
                steps=skill.get("steps", []),
                source_trace_id=trace.trace_id,
                created_by="retroformer",
                scope=MemoryScope(
                    agent_roles=[self.agent_role] if self.agent_role else [],
                ),
            )
            self.memory_ci.submit(card)
            cards.append(card)

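        # 2. Failures → patterns to avoid (failure_pattern)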
        for failure in result.get("failures", []):
            card = MemoryCard(
                kind=MemoryKind.FAILURE_PATTERN,
                pattern=failure.get("pattern", failure.get("what_happened", "")),
                strategy=(
                    f"AVOID: {failure.get('what_happened', '')}. "
                    f"INSTEAD: {failure.get('instead', '')}"
                ),
                source_trace_id=trace.trace_id,
                created_by="retroformer",
            )
            self.memory_ci.submit(card)
            cards.append(card)

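        # 3. Policies → per-tool constraints (tool_policy)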
        for policy in result.get("policies", []):
            tool_name = policy.get("tool", "")
            card = MemoryCard(
                kind=MemoryKind.TOOL_POLICY,
                content=policy.get("constraint", ""),
                strategy=policy.get("constraint", ""),
                source_trace_id=trace.trace_id,
                created_by="retroformer",
                scope=MemoryScope(tool_names=[tool_name] if tool_name else []),
            )
            self.memory_ci.submit(card)
            cards.append(card)

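        # 4. Observations → significant state patterns (episodic_case)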
        for obs in result.get("observations", []):
            card = MemoryCard(
                kind=MemoryKind.EPISODIC_CASE,
                pattern=obs.get("state_pattern", ""),
                content=obs.get("meaning", ""),
                source_trace_id=trace.trace_id,
                created_by="retroformer",
            )
            self.memory_ci.submit(card)
            cards.append(card)

        self._reflections.append({
            "trace_id": trace.trace_id,
            "skills": len(result.get("skills", [])),
            "failures": len(result.get("failures", [])),
            "policies": len(result.get("policies", [])),
            "observations": len(result.get("observations", [])),
            "total_cards": len(cards),
        })

        logger.info(
            f"Retroformer: reflected on trace {trace.trace_id} → "
            f"{len(cards)} memory candidates "
            f"(skills={len(result.get('skills', []))}, "
            f"failures={len(result.get('failures', []))})"
        )
        return cards

    def _build_trajectory_summary(self, trace: Trace) -> str:
        """Build a concise trajectory summary for the reflection prompt."""
        lines = [f"## Task: {trace.purpose}", f"Run mode: {trace.run_mode}", ""]

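        # Sort by (step, kind): "action" sorts before "score" lexicographically,
        # so each step's action line is emitted before its score line.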
        step_events = sorted(
            [e for e in trace.events if e.kind in ("action", "score")],
            key=lambda e: (e.step, e.kind),
        )

        current_step = 0
        for event in step_events:
            if event.step != current_step:
                current_step = event.step
                lines.append(f"\n### Step {current_step}")

            if event.kind == "action":
                params = json.dumps(event.data.get("params", {}), default=str)
                lines.append(f"  Action: {event.data.get('name', '?')}({params})")
                if event.data.get("thought"):
                    lines.append(f"  Thought: {event.data['thought'][:150]}")

            elif event.kind == "score":
                phi_b = event.data.get("phi_before", 0)
                phi_a = event.data.get("phi_after", 0)
                lines.append(f"  Score: Φ {phi_b:.1f} → {phi_a:.1f} (Δ={phi_a - phi_b:+.2f})")
                if event.data.get("evidence"):
                    lines.append(f"  Evidence: {event.data['evidence'][:150]}")

        lines.append("\n## Summary")
        lines.append(f"Total steps: {trace.step_count}")
        lines.append(f"Duration: {trace.duration_s:.1f}s")

        return "\n".join(lines)

    @property
    def reflections(self) -> list[dict]:
        """Per-trace reflection stats (lesson counts) recorded by reflect()."""
        return self._reflections
|