""" retroformer.py — Structured reflection with gradient-free policy improvement. From Retroformer (arxiv:2308.02151): A retrospective model Γ that takes the full trajectory (states, actions, rewards) and generates an improved prompt for the next attempt. The key insight: the LLM agent is frozen, but the retrospective model learns to write better prompts for it. Adaptation for Purpose Agent (no weight updates): Instead of training the retrospective model with policy gradients, we use the same LLM to reflect on trajectories and extract structured lessons. These lessons become skill_card and failure_pattern memories that improve future prompts via the PromptCompiler. The reflection is structured (not free-form): 1. What went well? (→ skill_card memories) 2. What went wrong? (→ failure_pattern memories) 3. What should change next time? (→ user_preference or tool_policy memories) 4. What specific state patterns should I watch for? (→ episodic_case memories) This replaces V1's raw heuristic distillation with a more rigorous, typed memory extraction process. """ from __future__ import annotations import json import logging from typing import Any from purpose_agent.llm_backend import LLMBackend, ChatMessage from purpose_agent.trace import Trace from purpose_agent.memory import MemoryCard, MemoryKind, MemoryStatus from purpose_agent.v2_types import MemoryScope from purpose_agent.memory_ci import MemoryCI logger = logging.getLogger(__name__) REFLECTION_PROMPT = """\ You are a RETROSPECTIVE ANALYST. Given a complete task trajectory, extract structured lessons that will help an agent perform better next time. Analyze the trajectory and produce EXACTLY these categories of lessons: 1. SKILLS (what worked well — reusable procedures): - Pattern: when does this apply? - Strategy: what to do? - Steps: concrete action sequence? 2. FAILURES (what went wrong — patterns to avoid): - What happened? - Why was it wrong? - What to do instead? 3. POLICIES (new rules or constraints discovered): - What tool/action needs a new constraint? - What's the constraint? 4. OBSERVATIONS (specific state patterns worth remembering): - What state pattern was significant? - What did it mean? Be concrete. Use {variable} placeholders for generalizable parts. Respond with JSON: { "skills": [{"pattern": "...", "strategy": "...", "steps": ["..."]}], "failures": [{"pattern": "...", "what_happened": "...", "instead": "..."}], "policies": [{"tool": "...", "constraint": "..."}], "observations": [{"state_pattern": "...", "meaning": "..."}] } """ REFLECTION_SCHEMA = { "type": "object", "properties": { "skills": {"type": "array", "items": {"type": "object"}}, "failures": {"type": "array", "items": {"type": "object"}}, "policies": {"type": "array", "items": {"type": "object"}}, "observations": {"type": "array", "items": {"type": "object"}}, }, "required": ["skills", "failures"], } class Retroformer: """ Structured retrospective analysis of trajectories. Replaces V1's raw heuristic distillation with typed memory extraction. Every extracted lesson goes through the Memory CI pipeline (immune scan, quarantine, replay test, promote/reject). Usage: retro = Retroformer(llm=model, memory_ci=ci) # After each task: memories = retro.reflect(trace) # → Extracts skills, failures, policies, observations # → Submits each to Memory CI for quarantine + scanning # → Only safe, useful memories get promoted # Over time: the agent accumulates vetted knowledge """ def __init__( self, llm: LLMBackend, memory_ci: MemoryCI, agent_role: str = "", ): self.llm = llm self.memory_ci = memory_ci self.agent_role = agent_role self._reflections: list[dict] = [] def reflect(self, trace: Trace) -> list[MemoryCard]: """ Analyze a trace and extract structured lessons as typed memories. Returns list of submitted MemoryCards (status will be CANDIDATE or QUARANTINED). """ # Build trajectory summary for the LLM summary = self._build_trajectory_summary(trace) messages = [ ChatMessage(role="system", content=REFLECTION_PROMPT), ChatMessage(role="user", content=summary), ] try: result = self.llm.generate_structured(messages, schema=REFLECTION_SCHEMA) except Exception as e: logger.warning(f"Retroformer: Reflection failed ({e}), attempting text parse") raw = self.llm.generate(messages, temperature=0.5) try: result = json.loads(raw) except Exception: result = {"skills": [], "failures": []} cards = [] # Extract skills → skill_card memories for skill in result.get("skills", []): card = MemoryCard( kind=MemoryKind.SKILL_CARD, pattern=skill.get("pattern", ""), strategy=skill.get("strategy", ""), steps=skill.get("steps", []), source_trace_id=trace.trace_id, created_by="retroformer", scope=MemoryScope( agent_roles=[self.agent_role] if self.agent_role else [], ), ) self.memory_ci.submit(card) cards.append(card) # Extract failures → failure_pattern memories for failure in result.get("failures", []): card = MemoryCard( kind=MemoryKind.FAILURE_PATTERN, pattern=failure.get("pattern", failure.get("what_happened", "")), strategy=f"AVOID: {failure.get('what_happened', '')}. INSTEAD: {failure.get('instead', '')}", source_trace_id=trace.trace_id, created_by="retroformer", ) self.memory_ci.submit(card) cards.append(card) # Extract policies → tool_policy memories for policy in result.get("policies", []): tool_name = policy.get("tool", "") card = MemoryCard( kind=MemoryKind.TOOL_POLICY, content=policy.get("constraint", ""), strategy=policy.get("constraint", ""), source_trace_id=trace.trace_id, created_by="retroformer", scope=MemoryScope(tool_names=[tool_name] if tool_name else []), ) self.memory_ci.submit(card) cards.append(card) # Extract observations → episodic_case memories for obs in result.get("observations", []): card = MemoryCard( kind=MemoryKind.EPISODIC_CASE, pattern=obs.get("state_pattern", ""), content=obs.get("meaning", ""), source_trace_id=trace.trace_id, created_by="retroformer", ) self.memory_ci.submit(card) cards.append(card) self._reflections.append({ "trace_id": trace.trace_id, "skills": len(result.get("skills", [])), "failures": len(result.get("failures", [])), "policies": len(result.get("policies", [])), "observations": len(result.get("observations", [])), "total_cards": len(cards), }) logger.info( f"Retroformer: Reflected on trace {trace.trace_id} → " f"{len(cards)} memory candidates " f"(skills={len(result.get('skills', []))}, " f"failures={len(result.get('failures', []))})" ) return cards def _build_trajectory_summary(self, trace: Trace) -> str: """Build a concise trajectory summary for the reflection prompt.""" lines = [f"## Task: {trace.purpose}", f"Run mode: {trace.run_mode}", ""] step_events = sorted( [e for e in trace.events if e.kind in ("action", "score")], key=lambda e: (e.step, e.kind), ) current_step = 0 for event in step_events: if event.step != current_step: current_step = event.step lines.append(f"\n### Step {current_step}") if event.kind == "action": lines.append(f" Action: {event.data.get('name', '?')}({json.dumps(event.data.get('params', {}), default=str)})") if event.data.get("thought"): lines.append(f" Thought: {event.data['thought'][:150]}") elif event.kind == "score": phi_b = event.data.get("phi_before", 0) phi_a = event.data.get("phi_after", 0) lines.append(f" Score: Φ {phi_b:.1f} → {phi_a:.1f} (Δ={phi_a - phi_b:+.2f})") if event.data.get("evidence"): lines.append(f" Evidence: {event.data['evidence'][:150]}") lines.append(f"\n## Summary") lines.append(f"Total steps: {trace.step_count}") lines.append(f"Duration: {trace.duration_s:.1f}s") return "\n".join(lines) @property def reflections(self) -> list[dict]: return self._reflections