| """ |
| Human-in-the-Loop — Checkpoint, interrupt, resume, and Φ score overrides. |
| |
| Allows humans to: |
| 1. Pause the agent at any step and inspect state |
| 2. Override the Purpose Function's Φ score (teach the agent what "good" means) |
| 3. Edit the agent's planned action before execution |
| 4. Resume from any checkpoint after review |
| 5. Inject heuristics directly ("when you see X, always do Y") |
| |
| Φ score overrides are the killer feature: humans can correct the critic, |
| and the corrected scores flow into experience replay → the agent permanently |
| learns the human's preference. No fine-tuning needed. |
| """ |
|
|
from __future__ import annotations

import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any

from purpose_agent.types import (
    Action, Heuristic, MemoryTier, PurposeScore, State,
    Trajectory, TrajectoryStep,
)
from purpose_agent.orchestrator import Orchestrator, TaskResult

logger = logging.getLogger(__name__)
|
|
@dataclass
class Checkpoint:
    """
    A serializable snapshot of the entire agent state at a point in time.

    Can be saved to disk and restored later to resume execution.
    """
    step_index: int
    current_state: dict[str, Any]
    state_summary: str
    trajectory_steps: list[dict[str, Any]]
    purpose: str
    task_description: str
    history: list[dict[str, Any]]
    heuristics: list[dict[str, Any]]
    timestamp: float = field(default_factory=time.time)
    checkpoint_id: str = ""

    def save(self, path: str) -> None:
        """Save this checkpoint to disk as JSON."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w") as f:
            json.dump(self.__dict__, f, indent=2, default=str)
        logger.info(f"Checkpoint saved: {path}")

    @classmethod
    def load(cls, path: str) -> "Checkpoint":
        """Load a checkpoint from disk."""
        with open(path) as f:
            data = json.load(f)
        return cls(**data)
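

# A minimal save/load round trip (illustrative sketch; the path and field
# values below are hypothetical):
#
#     cp = Checkpoint(
#         step_index=3, current_state={"x": 1}, state_summary="x=1",
#         trajectory_steps=[], purpose="demo", task_description="demo",
#         history=[], heuristics=[],
#     )
#     cp.save("./checkpoints/demo.json")
#     assert Checkpoint.load("./checkpoints/demo.json").step_index == 3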
|
|
|
|
|
|
class InterruptType(Enum):
    """Types of human-in-the-loop interrupts."""
    APPROVE_ACTION = "approve_action"
    OVERRIDE_SCORE = "override_score"
    EDIT_ACTION = "edit_action"
    INJECT_HEURISTIC = "inject_heuristic"
    PAUSE = "pause"
    ABORT = "abort"
|
|
|
|
@dataclass
class HumanInput:
    """Input received from a human during an interrupt."""
    interrupt_type: InterruptType
    approved: bool = True
    edited_action: Action | None = None
    override_score: PurposeScore | None = None
    injected_heuristic: Heuristic | None = None
    message: str = ""
    timestamp: float = field(default_factory=time.time)
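

# A handler that overrides Φ_after would return something like this (values
# illustrative; the PurposeScore fields mirror those used by CLIInputHandler
# below):
#
#     HumanInput(
#         interrupt_type=InterruptType.OVERRIDE_SCORE,
#         approved=True,
#         override_score=PurposeScore(
#             phi_before=6.0, phi_after=9.5, delta=3.5,
#             reasoning="Human override", evidence="Human judgment",
#             confidence=1.0,
#         ),
#     )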
|
|
|
|
|
|
class HumanInputHandler:
    """
    Handler that collects human input during interrupts.

    Override this for your UI:
    - CLIInputHandler: Terminal prompts
    - WebInputHandler: Web-based approval UI
    - SlackInputHandler: Slack bot approvals
    - AutoApproveHandler: Approve everything (for testing; the
      HITLOrchestrator default when no handler is given)
    """

    def request_input(
        self,
        interrupt_type: InterruptType,
        context: dict[str, Any],
    ) -> HumanInput:
        """
        Request input from a human.

        Args:
            interrupt_type: What kind of input is needed
            context: Current state, proposed action, scores, etc.

        Returns:
            HumanInput with the human's decision
        """
        raise NotImplementedError("Override in subclass")
|
|
|
|
class AutoApproveHandler(HumanInputHandler):
    """Automatically approve everything. For testing / autonomous mode."""

    def request_input(
        self,
        interrupt_type: InterruptType,
        context: dict[str, Any],
    ) -> HumanInput:
        return HumanInput(interrupt_type=interrupt_type, approved=True)
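

# Illustrative sketch of a custom handler: a scripted, queue-backed variant
# for tests (hypothetical; define it wherever your test helpers live):
#
#     class ScriptedInputHandler(HumanInputHandler):
#         """Pop pre-seeded HumanInput objects; auto-approve once empty."""
#
#         def __init__(self, script: list[HumanInput]):
#             self.script = list(script)
#
#         def request_input(self, interrupt_type, context):
#             if self.script:
#                 return self.script.pop(0)
#             return HumanInput(interrupt_type=interrupt_type, approved=True)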
|
|
|
|
class CLIInputHandler(HumanInputHandler):
    """Command-line human-in-the-loop handler."""

    def request_input(
        self,
        interrupt_type: InterruptType,
        context: dict[str, Any],
    ) -> HumanInput:
        print("\n" + "=" * 60)
        print(f" 🧑 HUMAN INPUT REQUESTED: {interrupt_type.value}")
        print("=" * 60)

        if interrupt_type == InterruptType.APPROVE_ACTION:
            print(f"\n Proposed action: {context.get('action_name', '?')}")
            print(f" Thought: {context.get('thought', '')[:200]}")
            print(f" Expected delta: {context.get('expected_delta', '')[:200]}")
            response = input("\n Approve? (y/n/edit): ").strip().lower()

            if response == "n":
                return HumanInput(interrupt_type=interrupt_type, approved=False)
            elif response.startswith("edit"):
                new_action = input(" New action name: ").strip()
                return HumanInput(
                    interrupt_type=InterruptType.EDIT_ACTION,
                    approved=True,
                    edited_action=Action(name=new_action, params={}, thought="Human override"),
                )
            return HumanInput(interrupt_type=interrupt_type, approved=True)

        elif interrupt_type == InterruptType.OVERRIDE_SCORE:
            print(f"\n Current Φ_before: {context.get('phi_before', '?')}")
            print(f" Current Φ_after: {context.get('phi_after', '?')}")
            print(f" Current Δ: {context.get('delta', '?')}")
            print(f" Evidence: {context.get('evidence', '')[:200]}")

            override = input("\n Override Φ_after? (number or 'keep'): ").strip()
            if override.lower() == "keep" or not override:
                return HumanInput(interrupt_type=interrupt_type, approved=True)

            try:
                new_phi = float(override)
            except ValueError:
                print(" Not a number; keeping the critic's score.")
                return HumanInput(interrupt_type=interrupt_type, approved=True)

            phi_before = float(context.get("phi_before", 0.0))
            return HumanInput(
                interrupt_type=interrupt_type,
                approved=True,
                override_score=PurposeScore(
                    phi_before=phi_before,
                    phi_after=new_phi,
                    delta=new_phi - phi_before,
                    reasoning="Human override",
                    evidence="Human judgment",
                    confidence=1.0,
                ),
            )

        elif interrupt_type == InterruptType.INJECT_HEURISTIC:
            print("\n Teach the agent a new heuristic:")
            pattern = input(" When (pattern): ").strip()
            strategy = input(" Do (strategy): ").strip()

            if pattern and strategy:
                return HumanInput(
                    interrupt_type=interrupt_type,
                    approved=True,
                    injected_heuristic=Heuristic(
                        pattern=pattern,
                        strategy=strategy,
                        steps=[],
                        tier=MemoryTier.STRATEGIC,
                        q_value=1.0,
                    ),
                )
            return HumanInput(interrupt_type=interrupt_type, approved=True)

        else:
            response = input("\n Continue? (y/n): ").strip().lower()
            return HumanInput(
                interrupt_type=interrupt_type,
                approved=(response != "n"),
            )
|
|
class HITLOrchestrator:
    """
    Human-in-the-Loop Orchestrator.

    Wraps the base Orchestrator with interrupt points where humans can:
    - Approve/reject actions before execution
    - Override Φ scores (teaches the Purpose Function)
    - Inject heuristics directly into memory
    - Checkpoint and resume later

    The Φ override feature is unique to Purpose Agent: when a human corrects
    a score, that correction flows through experience replay into the heuristic
    library. The agent permanently learns the human's preference.

    Usage:
        hitl = HITLOrchestrator(
            orchestrator=orch,
            input_handler=CLIInputHandler(),
            approve_actions=True,   # Require approval for each action
            review_scores=True,     # Let human override Φ scores
            checkpoint_dir="./checkpoints",
        )
        result = hitl.run_task(purpose="Do something important")
    """
|
|
    def __init__(
        self,
        orchestrator: Orchestrator,
        input_handler: HumanInputHandler | None = None,
        approve_actions: bool = False,
        review_scores: bool = False,
        checkpoint_dir: str | None = None,
        interrupt_every_n_steps: int = 0,
    ):
        self.orch = orchestrator
        self.handler = input_handler or AutoApproveHandler()
        self.approve_actions = approve_actions
        self.review_scores = review_scores
        self.checkpoint_dir = checkpoint_dir
        self.interrupt_every_n = interrupt_every_n_steps  # 0 disables periodic pauses
        self._checkpoints: list[Checkpoint] = []
|
|
    def run_task(
        self,
        purpose: str,
        initial_state: State | None = None,
        max_steps: int = 20,
        early_stop_phi: float = 9.0,
        resume_from: str | Checkpoint | None = None,
    ) -> TaskResult:
        """
        Run a task with human-in-the-loop interrupts.

        If resume_from is provided, resumes from that checkpoint. Checkpoints
        store only step summaries, so the resumed trajectory starts empty.
        """
        start_step = 0
        history = []
        trajectory = Trajectory(task_description=purpose, purpose=purpose)

        if resume_from:
            cp = resume_from if isinstance(resume_from, Checkpoint) else Checkpoint.load(resume_from)
            current_state = State(data=cp.current_state, summary=cp.state_summary)
            history = cp.history
            start_step = cp.step_index
            logger.info(f"Resuming from checkpoint at step {start_step}")
        else:
            current_state = initial_state or self.orch.environment.reset()

        self.orch.purpose_fn.reset_trajectory_stats()

        for step_idx in range(start_step, max_steps):
            # Periodic pause: let the human inspect state every N steps.
            if self.interrupt_every_n and step_idx > 0 and step_idx % self.interrupt_every_n == 0:
                human = self.handler.request_input(
                    InterruptType.PAUSE,
                    {"step": step_idx, "state": current_state.describe()[:500]},
                )
                if not human.approved:
                    logger.info("Human aborted task")
                    break
                if human.injected_heuristic:
                    # Custom handlers may teach a heuristic during a pause.
                    h = human.injected_heuristic
                    self.inject_heuristic(h.pattern, h.strategy, h.tier.value)

            # The actor proposes the next action.
            action = self.orch.actor.decide(purpose, current_state, history)

            # Optional gate: the human approves, rejects, or edits the action.
            if self.approve_actions:
                human = self.handler.request_input(
                    InterruptType.APPROVE_ACTION,
                    {
                        "action_name": action.name,
                        "action_params": action.params,
                        "thought": action.thought,
                        "expected_delta": action.expected_delta,
                        "step": step_idx + 1,
                    },
                )

                if not human.approved:
                    logger.info(f"Human rejected action '{action.name}', skipping step")
                    # Record the rejection so the actor doesn't re-propose it.
                    history.append({
                        "action": action.name,
                        "result": "REJECTED by human reviewer",
                        "score": "",
                    })
                    continue

                if human.edited_action:
                    action = human.edited_action
                    logger.info(f"Human edited action to '{action.name}'")

            if action.name.upper() == "DONE":
                break

            # Execute in the environment; surface failures in the state.
            try:
                new_state = self.orch.environment.execute(action, current_state)
            except Exception as e:
                logger.warning(f"Action '{action.name}' failed: {e}")
                new_state = State(data={**current_state.data, "_error": str(e)})

            # The Purpose Function scores the transition.
            score = self.orch.purpose_fn.evaluate(current_state, action, new_state, purpose)

            # Optional gate: the human reviews and may override the Φ score.
            if self.review_scores:
                human = self.handler.request_input(
                    InterruptType.OVERRIDE_SCORE,
                    {
                        "phi_before": score.phi_before,
                        "phi_after": score.phi_after,
                        "delta": score.delta,
                        "evidence": score.evidence,
                        "confidence": score.confidence,
                        "state_before": current_state.describe()[:200],
                        "state_after": new_state.describe()[:200],
                    },
                )

                if human.override_score:
                    logger.info(
                        f"Human overrode score: Φ {score.phi_after:.1f} → {human.override_score.phi_after:.1f}"
                    )
                    score = human.override_score

            # Record the step in the trajectory and the actor's history.
            step = TrajectoryStep(
                state_before=current_state, action=action, state_after=new_state,
                score=score, step_index=step_idx + 1,
            )
            trajectory.steps.append(step)
            history.append({
                "action": f"{action.name}({json.dumps(action.params, default=str)})",
                "result": new_state.describe()[:200],
                "score": f"Δ={score.delta:+.2f}",
            })

            if self.checkpoint_dir:
                self._save_checkpoint(step_idx + 1, new_state, trajectory, history, purpose)

            # Advance the state before the stop checks so final_state is
            # correct when we break on early stop or a terminal state.
            current_state = new_state

            if score.phi_after >= early_stop_phi:
                break
            if self.orch.environment.is_terminal(new_state):
                break

        result = TaskResult(trajectory=trajectory, final_state=current_state)
        self.orch.post_task(trajectory, [])
        return result
|
|
    def inject_heuristic(self, pattern: str, strategy: str, tier: str = "strategic") -> None:
        """Directly inject a human-taught heuristic into the agent's memory."""
        h = Heuristic(
            pattern=pattern,
            strategy=strategy,
            steps=[],
            tier=MemoryTier(tier),
            q_value=1.0,
        )
        self.orch.optimizer.heuristic_library.append(h)
        self.orch.sync_memory()
        logger.info(f"Injected heuristic: '{pattern}' → '{strategy}'")
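
    # For example (hypothetical pattern/strategy strings):
    #     hitl.inject_heuristic(
    #         "build fails with missing dependencies",
    #         "run the package installer before retrying the build",
    #     )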
|
|
    def _save_checkpoint(
        self, step: int, state: State, trajectory: Trajectory,
        history: list, purpose: str,
    ) -> Checkpoint:
        cp = Checkpoint(
            step_index=step,
            current_state=state.data,
            state_summary=state.describe()[:500],
            trajectory_steps=[
                {
                    "action": s.action.name,
                    "delta": s.score.delta if s.score else 0,
                    "phi_after": s.score.phi_after if s.score else 0,
                }
                for s in trajectory.steps
            ],
            purpose=purpose,
            task_description=trajectory.task_description,
            history=history,
            heuristics=[
                {"pattern": h.pattern, "strategy": h.strategy, "tier": h.tier.value}
                for h in self.orch.optimizer.heuristic_library
            ],
            checkpoint_id=f"cp_{step}_{int(time.time())}",
        )

        if self.checkpoint_dir:
            path = f"{self.checkpoint_dir}/{cp.checkpoint_id}.json"
            cp.save(path)

        self._checkpoints.append(cp)
        return cp
|
|
    def list_checkpoints(self) -> list[dict]:
        """List all saved checkpoints."""
        if not self.checkpoint_dir:
            return [{"step": cp.step_index, "id": cp.checkpoint_id} for cp in self._checkpoints]

        checkpoints = []
        cp_dir = Path(self.checkpoint_dir)
        if cp_dir.exists():
            for f in sorted(cp_dir.glob("cp_*.json")):
                try:
                    with open(f) as fh:
                        data = json.load(fh)
                    checkpoints.append({
                        "file": str(f),
                        "step": data.get("step_index"),
                        "id": data.get("checkpoint_id"),
                        "timestamp": data.get("timestamp"),
                    })
                except (OSError, json.JSONDecodeError) as e:
                    logger.debug(f"Skipping unreadable checkpoint {f}: {e}")
        return checkpoints
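

# Typical recovery flow after a crash or a deliberate pause (sketch; assumes a
# HITLOrchestrator `hitl` configured with a checkpoint_dir, as in the class
# docstring above):
#
#     cps = hitl.list_checkpoints()
#     for cp in cps:
#         print(cp["id"], "saved at step", cp["step"])
#     if cps:
#         result = hitl.run_task(
#             purpose="Do something important",
#             resume_from=cps[-1]["file"],
#         )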
|
|