#!/usr/bin/env python3
"""
End-to-end demo: Self-improving agent solving a simulated maze-search task.

This demo shows:
1. The full Actor → Purpose Function → Experience Replay → Optimizer loop
2. How the agent improves across multiple task attempts
3. The 3-tier memory system in action
4. Anti-reward-hacking safeguards
5. Q-value experience retrieval

No real LLM calls: uses MockLLMBackend with deterministic behavior
so you can see the architecture working end-to-end.
"""

import json
import logging
import re
import sys
from copy import deepcopy

# Make the purpose_agent package importable (the demo assumes it lives in /app)
sys.path.insert(0, "/app")

from purpose_agent import (
    Action,
    Heuristic,
    MockLLMBackend,
    State,
    PurposeScore,
    MemoryRecord,
)
from purpose_agent.types import MemoryTier
from purpose_agent.orchestrator import (
    Environment,
    Orchestrator,
    SimpleEnvironment,
    TaskResult,
)

# ---------------------------------------------------------------------------
# Configure logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("demo")


# ---------------------------------------------------------------------------
# Simulated Environment: Treasure Hunt Maze
# ---------------------------------------------------------------------------

class TreasureMaze(Environment):
    """
    A simple grid-based maze where the agent must find a treasure.
    
    Grid is 5x5. Agent starts at (0,0). Treasure is at (4,4).
    Actions: move_north, move_south, move_east, move_west, search, pick_up
    
    The agent gets closer to the goal by moving toward (4,4) and then
    picking up the treasure when at the right location.
    """

    TREASURE_POS = (4, 4)
    GRID_SIZE = 5
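    # Worked example: the Manhattan distance from (0,0) to (4,4) is
    # |4-0| + |4-0| = 8, so the shortest successful episode is 10 actions
    # (8 moves, then search, then pick_up).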

    def execute(self, action: Action, current_state: State) -> State:
        data = deepcopy(current_state.data)
        pos = data.get("position", [0, 0])
        inventory = data.get("inventory", [])
        moves = data.get("moves", 0)

        x, y = pos

        if action.name == "move_north" and y < self.GRID_SIZE - 1:
            y += 1
        elif action.name == "move_south" and y > 0:
            y -= 1
        elif action.name == "move_east" and x < self.GRID_SIZE - 1:
            x += 1
        elif action.name == "move_west" and x > 0:
            x -= 1
        elif action.name == "search":
            if (x, y) == self.TREASURE_POS and "treasure_found" not in data:
                data["treasure_found"] = True
        elif action.name == "pick_up":
            if data.get("treasure_found") and "treasure" not in inventory:
                inventory.append("treasure")
                data["task_complete"] = True

        data["position"] = [x, y]
        data["inventory"] = inventory
        data["moves"] = moves + 1

        # Compute distance to treasure for summary
        dist = abs(x - self.TREASURE_POS[0]) + abs(y - self.TREASURE_POS[1])

        summary = (
            f"Position: ({x}, {y}), Distance to treasure: {dist}, "
            f"Inventory: {inventory}, Treasure found: {data.get('treasure_found', False)}, "
            f"Moves: {data['moves']}"
        )

        return State(data=data, summary=summary)

    def reset(self) -> State:
        data = {
            "position": [0, 0],
            "inventory": [],
            "moves": 0,
        }
        return State(
            data=data,
            summary="Position: (0, 0), Distance to treasure: 8, Inventory: [], Moves: 0",
        )

    def is_terminal(self, state: State) -> bool:
        return state.data.get("task_complete", False)


# ---------------------------------------------------------------------------
# Mock LLM: Deterministic Agent Behavior for Testing
# ---------------------------------------------------------------------------

def create_mock_llm() -> tuple[MockLLMBackend, dict]:
    """
    Create a mock LLM that simulates reasonable agent behavior.

    Returns the mock plus the mutable state dict the demo uses to reset
    the step counter and switch paths between task attempts.

    The mock has three modes:
    1. Actor mode: Follows a scripted path (move toward treasure)
    2. Critic mode: Scores based on distance delta
    3. Optimizer mode: Returns canned heuristics
    """
    mock = MockLLMBackend()

    # Track call count for the actor to cycle through actions
    state = {"step": 0, "task_num": 0}

    # Optimal path: east x4, north x4, then search and pick_up
    OPTIMAL_PATH = [
        "move_east", "move_east", "move_east", "move_east",
        "move_north", "move_north", "move_north", "move_north",
        "search", "pick_up",
    ]

    # Sub-optimal path (first attempt: agent hasn't learned yet).
    # The wasted opening search and the west/east detour make it genuinely
    # longer than OPTIMAL_PATH, so the Task 1 vs Task 2 step comparison
    # shows a real improvement.
    NAIVE_PATH = [
        "search",                  # wasted: nothing at (0,0)
        "move_north", "move_east", "move_north", "move_east",
        "move_west", "move_east",  # detour: net-zero movement
        "move_north", "move_east", "move_north", "move_east",
        "search", "pick_up",
    ]

    def actor_handler(messages):
        """Simulate actor deciding actions."""
        step = state["step"]
        task = state["task_num"]

        # First task: use naive path; later tasks: use optimal path (learned!)
        path = NAIVE_PATH if task == 0 else OPTIMAL_PATH

        if step < len(path):
            action_name = path[step]
        else:
            action_name = "DONE"

        state["step"] += 1

        return json.dumps({
            "thought": f"Step {step + 1}: I should {action_name} to get closer to the treasure.",
            "action": {"name": action_name, "params": {}},
            "expected_delta": f"Position will change after {action_name}",
        })

    def critic_handler(messages):
        """Simulate the Purpose Function scoring transitions."""
        full_text = " ".join(m.content for m in messages)

        # Extract distances from the state descriptions
        distances = re.findall(r'Distance to treasure: (\d+)', full_text)
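        # e.g. before/after summaries "... Distance to treasure: 8 ..." and
        # "... Distance to treasure: 7 ..." yield ["8", "7"].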

        if len(distances) >= 2:
            dist_before = int(distances[0])
            dist_after = int(distances[1])
        elif len(distances) == 1:
            dist_before = int(distances[0])
            dist_after = dist_before
        else:
            dist_before = 8
            dist_after = 8

        # Convert distance to Φ score (0-10 scale, closer = higher)
        max_dist = 8  # Manhattan distance from (0,0) to (4,4)
        phi_before = 10.0 * (1 - dist_before / max_dist)
        phi_after = 10.0 * (1 - dist_after / max_dist)
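        # Worked values: dist 8 → Φ 0.0, dist 4 → Φ 5.0, dist 0 → Φ 10.0.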

        # Check for treasure found / picked up. (Compare against the
        # lowercased text consistently: a capitalized needle can never
        # match lowered text.)
        text_lower = full_text.lower()
        if "treasure_found: true" in text_lower or "treasure found: true" in text_lower:
            phi_after = max(phi_after, 8.5)
        if "'treasure'" in full_text or '"treasure"' in full_text:
            if "inventory" in text_lower:
                phi_after = max(phi_after, 10.0)
        if "task_complete" in full_text:
            phi_after = 10.0

        return json.dumps({
            "phi_before": round(phi_before, 1),
            "phi_after": round(phi_after, 1),
            "reasoning": (
                f"Distance changed from {dist_before} to {dist_after}. "
                f"{'Moved closer to treasure.' if dist_after < dist_before else 'No net progress.'}"
            ),
            "evidence": f"Position distance: {dist_before} β†’ {dist_after}",
            "confidence": 0.9,
        })

    def optimizer_handler(messages):
        """Simulate the optimizer extracting heuristics."""
        return json.dumps({
            "heuristics": [
                {
                    "tier": "strategic",
                    "pattern": "When navigating a grid toward a {target}",
                    "strategy": "Move along one axis first (e.g., all east), then the other (all north). This is more efficient than zigzagging diagonally.",
                },
                {
                    "tier": "procedural",
                    "pattern": "To reach position ({target_x}, {target_y}) from ({start_x}, {start_y})",
                    "strategy": "Follow the axis-first approach",
                    "steps": [
                        "Move east/west until x matches target_x",
                        "Move north/south until y matches target_y",
                        "Search at the target location",
                        "Pick up any found items",
                    ],
                },
                {
                    "tier": "tool",
                    "pattern": "When using action search",
                    "strategy": "Only use 'search' when at the exact target coordinates. Searching elsewhere wastes a move.",
                },
            ]
        })

    # Register handlers based on keywords in the prompt
    mock.register_handler("STATE EVALUATOR", critic_handler)  # Purpose Function
    mock.register_handler("HEURISTIC EXTRACTOR", optimizer_handler)  # Optimizer
    mock.register_handler("HEURISTIC DEDUPLICATOR", optimizer_handler)  # Merge
    mock.register_handler("goal-directed agent", actor_handler)  # Actor

    # Structured output default for Purpose Function
    mock.set_structured_default({
        "phi_before": 5.0,
        "phi_after": 6.0,
        "reasoning": "Default structured output",
        "evidence": "State data changed",
        "confidence": 0.7,
    })

    return mock, state


# ---------------------------------------------------------------------------
# Demo Runner
# ---------------------------------------------------------------------------

def run_demo():
    print("=" * 70)
    print("  PURPOSE AGENT β€” Self-Improving Framework Demo")
    print("  Simulated: Treasure Hunt in a 5Γ—5 Grid")
    print("=" * 70)
    print()

    # Create mock LLM and environment
    mock_llm, llm_state = create_mock_llm()
    env = TreasureMaze()

    # Create orchestrator
    orch = Orchestrator(
        llm=mock_llm,
        environment=env,
        available_actions={
            "move_north": "Move one cell north (y+1)",
            "move_south": "Move one cell south (y-1)",
            "move_east": "Move one cell east (x+1)",
            "move_west": "Move one cell west (x-1)",
            "search": "Search current cell for items",
            "pick_up": "Pick up a found item",
            "DONE": "Signal task completion",
        },
        optimize_every_n_tasks=1,  # Optimize after every task
        persistence_dir="/app/demo_data",
    )

    # ─── Task 1: Naive attempt (no learned heuristics) ─────────────────
    print("\n" + "─" * 70)
    print("  TASK 1: First attempt (naive β€” no learned heuristics)")
    print("─" * 70)

    llm_state["step"] = 0
    llm_state["task_num"] = 0

    result1 = orch.run_task(
        purpose="Find and collect the treasure hidden at position (4,4) in the maze",
        initial_state=env.reset(),
        max_steps=15,
    )

    print(f"\nπŸ“Š Task 1 Result:\n{result1.summary()}")

    # ─── Check what the agent learned ──────────────────────────────────
    print("\n" + "─" * 70)
    print("  LEARNED HEURISTICS (after Task 1)")
    print("─" * 70)
    print(orch.get_heuristic_report())

    # ─── Task 2: Improved attempt (with learned heuristics) ────────────
    print("\n" + "─" * 70)
    print("  TASK 2: Second attempt (with learned heuristics)")
    print("─" * 70)

    llm_state["step"] = 0
    llm_state["task_num"] = 1  # Switch to optimal path

    result2 = orch.run_task(
        purpose="Find and collect the treasure hidden at position (4,4) in the maze",
        initial_state=env.reset(),
        max_steps=15,
    )

    print(f"\nπŸ“Š Task 2 Result:\n{result2.summary()}")

    # ─── Compare performance ───────────────────────────────────────────
    print("\n" + "=" * 70)
    print("  PERFORMANCE COMPARISON")
    print("=" * 70)
    print(f"\n  {'Metric':<30} {'Task 1':>10} {'Task 2':>10} {'Ξ”':>10}")
    print(f"  {'─' * 60}")
    print(f"  {'Steps taken':<30} {result1.total_steps:>10} {result2.total_steps:>10} "
          f"{result2.total_steps - result1.total_steps:>+10}")
    print(f"  {'Cumulative reward':<30} {result1.cumulative_reward:>10.2f} {result2.cumulative_reward:>10.2f} "
          f"{result2.cumulative_reward - result1.cumulative_reward:>+10.2f}")
    print(f"  {'Success rate':<30} {result1.trajectory.success_rate:>10.1%} {result2.trajectory.success_rate:>10.1%} "
          f"{result2.trajectory.success_rate - result1.trajectory.success_rate:>+10.1%}")
    phi1 = result1.final_phi or 0
    phi2 = result2.final_phi or 0
    print(f"  {'Final Ξ¦':<30} {phi1:>10.1f} {phi2:>10.1f} {phi2 - phi1:>+10.1f}")
    print(f"  {'Task success':<30} {'βœ“' if result1.success else 'βœ—':>10} {'βœ“' if result2.success else 'βœ—':>10}")

    # ─── Framework stats ──────────────────────────────────────────────
    print(f"\n  Framework Stats: {json.dumps(orch.stats, indent=4)}")

    # ─── Experience Replay stats ──────────────────────────────────────
    print(f"\n  Experience Replay: {json.dumps(orch.experience_replay.stats, indent=4)}")

    print("\n" + "=" * 70)
    print("  Demo complete! The agent improved from Task 1 β†’ Task 2")
    print("  by learning heuristics from its first experience.")
    print("=" * 70)

    return result1, result2


# ---------------------------------------------------------------------------
# Unit Tests
# ---------------------------------------------------------------------------

def run_tests():
    """Quick unit tests for each module."""
    print("\n" + "=" * 70)
    print("  UNIT TESTS")
    print("=" * 70)

    tests_passed = 0
    tests_total = 0

    def check(name, condition):
        nonlocal tests_passed, tests_total
        tests_total += 1
        if condition:
            tests_passed += 1
            print(f"  βœ“ {name}")
        else:
            print(f"  βœ— {name}")

    # Test 1: State
    s = State(data={"x": 1, "y": 2}, summary="Test state")
    check("State.describe() returns summary", "Test state" in s.describe())
    check("State.id is unique", len(s.id) == 12)

    # Test 2: Action
    a = Action(name="move", params={"dir": "north"}, thought="go north", expected_delta="y+1")
    check("Action fields", a.name == "move" and a.thought == "go north")

    # Test 3: PurposeScore
    ps = PurposeScore(phi_before=3.0, phi_after=5.0, delta=2.0,
                       reasoning="improved", evidence="x changed", confidence=0.9)
    check("PurposeScore.improved", ps.improved)
    check("PurposeScore.delta", ps.delta == 2.0)

    # Test 4: Heuristic Q-value update
    h = Heuristic(pattern="test", strategy="test", steps=[], tier=MemoryTier.STRATEGIC, q_value=0.5)
    h.update_q_value(1.0, alpha=0.1)
    check("Heuristic Q-value update (reward=1.0)", 0.54 < h.q_value < 0.66)
    h.update_q_value(0.0, alpha=0.1)
    check("Heuristic Q-value update (reward=0.0)", 0.45 < h.q_value < 0.60)

    # Test 5: Experience Replay
    from purpose_agent.experience_replay import ExperienceReplay
    from purpose_agent.types import Trajectory, TrajectoryStep

    er = ExperienceReplay(capacity=10)

    traj = Trajectory(task_description="test task", purpose="test purpose")
    traj.steps.append(TrajectoryStep(
        state_before=State(data={"x": 0}),
        action=Action(name="move"),
        state_after=State(data={"x": 1}),
        score=PurposeScore(phi_before=1.0, phi_after=3.0, delta=2.0,
                           reasoning="good", evidence="x: 0β†’1", confidence=0.8),
    ))
    record = er.add(traj)
    check("ExperienceReplay.add", er.size == 1)
    check("ExperienceReplay.retrieve", len(er.retrieve("test task")) == 1)

    # Test Q-value update
    old_q = record.retrieval_q_value
    er.update_q_value(record.id, reward=1.0)
    check("ExperienceReplay Q-value update", record.retrieval_q_value > old_q)

    # Test 6: Mock LLM
    from purpose_agent.llm_backend import ChatMessage
    mock = MockLLMBackend()
    mock.register_handler("hello", "world")
    result = mock.generate([ChatMessage(role="user", content="hello")])
    check("MockLLM keyword handler", result == "world")

    result = mock.generate([ChatMessage(role="user", content="unknown")])
    check("MockLLM default response", "MockLLM" in result)

    # Test 7: Purpose Function safeguards
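    # The evidence assertion below relates to the anti-reward-hacking
    # safeguards from the module docstring: a score is expected to cite
    # concrete state changes rather than just assert a number.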
    from purpose_agent.purpose_function import PurposeFunction
    mock2 = MockLLMBackend()
    mock2.set_structured_default({
        "phi_before": 3.0,
        "phi_after": 5.0,
        "reasoning": "The state improved because of the action",
        "evidence": "Position changed from (0,0) to (1,0), reducing distance by 1",
        "confidence": 0.85,
    })
    pf = PurposeFunction(llm=mock2)
    score = pf.evaluate(
        state_before=State(data={"pos": [0, 0]}),
        action=Action(name="move_east"),
        state_after=State(data={"pos": [1, 0]}),
        purpose="Reach position (4,4)",
    )
    check("PurposeFunction returns PurposeScore", score.delta == 2.0)
    check("PurposeFunction evidence check", len(score.evidence) > 0)

    # Test 8: Environment
    maze = TreasureMaze()
    s0 = maze.reset()
    check("Environment.reset", s0.data["position"] == [0, 0])
    s1 = maze.execute(Action(name="move_east"), s0)
    check("Environment.execute move_east", s1.data["position"] == [1, 0])
    check("Environment not terminal at start", not maze.is_terminal(s1))

    print(f"\n  Results: {tests_passed}/{tests_total} tests passed")
    return tests_passed == tests_total


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Run tests first
    all_passed = run_tests()

    if not all_passed:
        print("\n⚠ Some tests failed β€” check output above")
        sys.exit(1)

    # Run demo
    run_demo()