File size: 15,535 Bytes
8bb75c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
#!/usr/bin/env python3
"""
REAL-WORLD REGRESSION TEST — Tests Purpose Agent with live LLMs via OpenRouter.

Tests across 5 domains with 3 different models:
  - Coding: fibonacci, fizzbuzz, palindrome, factorial
  - Reasoning: logic puzzles
  - Math: arithmetic word problems
  - Self-improvement: learning curve across runs
  - Security: immune system with real model outputs

Models:
  - meta-llama/llama-3.3-70b-instruct (large, reliable)
  - google/gemma-4-26b-a4b-it (medium, fast)
  - qwen/qwen3.5-9b (small, budget)

Usage:
    export OPENROUTER_API_KEY="sk-or-..."
    python3 tests/real_world_regression.py
"""
import sys, os, time, json

# Abort immediately when the OpenRouter credential is not present in the
# environment; the key is read once here and kept at module level.
OR_KEY = os.getenv("OPENROUTER_API_KEY", "")
if not OR_KEY:
    print("Set OPENROUTER_API_KEY")
    sys.exit(1)

# Use installed package
import purpose_agent as pa
from purpose_agent.llm_backend import resolve_backend
from purpose_agent.orchestrator import Environment
from purpose_agent.types import State, Action
from purpose_agent.immune import scan_memory
from purpose_agent.memory import MemoryCard, MemoryKind
from purpose_agent.breakthroughs import AdversarialHardener

# (display name, backend spec) pairs, in the order they are exercised.
MODELS = [
    (
        "llama-3.3-70b",
        "openrouter:meta-llama/llama-3.3-70b-instruct",
    ),
    (
        "gemma-4-26b",
        "openrouter:google/gemma-4-26b-a4b-it",
    ),
    (
        "qwen3.5-9b",
        "openrouter:qwen/qwen3.5-9b",
    ),
]

# One dict per recorded test outcome.
RESULTS = []
# Running totals of passed / failed outcomes.
PASS = FAIL = 0


def record(model, domain, test_name, passed, detail="", duration=0):
    """Register one test outcome and echo it to stdout.

    Increments the module-level PASS or FAIL counter, appends a result row
    to RESULTS and prints a one-line summary.

    Args:
        model: Label of the model (or subsystem) the test ran against.
        domain: Category stored with the result row.
        test_name: Name of the individual test.
        passed: Truthy when the test succeeded.
        detail: Free-text detail; stored in full, printed truncated to 80
            characters.
        duration: Elapsed seconds; stored rounded to one decimal place and
            printed only when non-zero.
    """
    global PASS, FAIL
    if passed:
        PASS += 1
    else:
        FAIL += 1
    RESULTS.append({
        "model": model,
        "domain": domain,
        "test": test_name,
        "status": "PASS" if passed else "FAIL",
        "detail": detail,
        "duration_s": round(duration, 1),
    })
    # Check mark / ballot X; the literals had been stored mis-encoded.
    icon = "✓" if passed else "✗"
    timing = f" ({duration:.1f}s)" if duration else ""
    print(f"    {icon} {test_name}: {detail[:80]}{timing}")


# ═══════════════════════════════════════════════════════════════
# CODING ENVIRONMENT
# ═══════════════════════════════════════════════════════════════

class CodeEnv(Environment):
    """Environment that scores submitted Python code against expression tests.

    Each test case is a dict with an "input" expression, evaluated in the
    namespace produced by executing the submitted code, and an "expected"
    string that the stringified result must equal after stripping
    surrounding whitespace.
    """

    def __init__(self, tests):
        """Keep the list of ``{"input": ..., "expected": ...}`` test cases."""
        self.tests = tests

    def execute(self, action, state):
        """Run the submitted code against every test case.

        The code is taken from ``action.params["code"]``; when that is empty
        or contains no ``def``, it is extracted from ``action.thought`` and
        then from ``action.expected_delta`` instead.

        Returns:
            A new State whose data copies ``state.data`` and adds/updates
            ``attempts``, ``pass_rate``, ``all_passed``, ``failures`` (at
            most three) and ``last_code`` (first 300 characters).
        """
        code = action.params.get("code", "")
        if not code or "def " not in code:
            from purpose_agent.robust_parser import extract_code
            code = extract_code(action.thought or "") or extract_code(action.expected_delta or "")
        # NOTE(review): what extract_code returns for "nothing found" is not
        # visible here. Normalise any falsy result to "" so the slice for
        # "last_code" below cannot fail outside the per-test try block.
        code = code or ""

        data = {**state.data, "attempts": state.data.get("attempts", 0) + 1}
        passed, fails = 0, []
        for tc in self.tests:
            try:
                # NOTE(security): exec/eval run model-generated code in this
                # process with no sandbox; run the harness only where that is
                # acceptable.
                # A fresh namespace per test keeps cases independent.
                ns = {}
                exec(code, ns)
                r = str(eval(tc["input"], ns))
                if r.strip() == tc["expected"].strip():
                    passed += 1
                else:
                    fails.append(f'{tc["input"]}: got {r}')
            except Exception as e:
                # Any error in the submitted code counts as a failed case.
                fails.append(f'{tc["input"]}: {type(e).__name__}')

        total = len(self.tests)
        all_passed = passed == total
        data.update({
            "pass_rate": passed / total if total else 0,
            "all_passed": all_passed,
            "failures": fails[:3],
            "last_code": code[:300],
        })
        return State(data=data,
                     summary=f"Tests: {passed}/{total}" + (" ALL PASSED" if all_passed else ""))

    def reset(self):
        """Return the initial State, with the attempt counter at zero."""
        return State(data={"attempts": 0})

    def is_terminal(self, state):
        """Return True once a state reports that every test case passed."""
        return state.data.get("all_passed", False)


# ═══════════════════════════════════════════════════════════════
# REASONING/MATH ENVIRONMENT
# ═══════════════════════════════════════════════════════════════

class AnswerEnv(Environment):
    """Environment that checks a free-text answer against an expected token.

    The expected answer is compared case-insensitively and must appear in
    the submitted answer as a whole token, not merely as a substring.
    """

    def __init__(self, expected_answer):
        """Store the normalised expected answer and compile its matcher."""
        import re  # local import: this module's header does not import re

        self.expected = expected_answer.lower().strip()
        # Plain substring matching accepted "no" inside "not"/"know" and "6"
        # inside "16" or "6.5". Require that the expected text is not glued
        # to a neighbouring word character or to further digits across a
        # decimal point / thousands separator.
        self._expected_re = re.compile(
            r"(?<!\w)(?<!\d[.,])" + re.escape(self.expected) + r"(?!\w)(?![.,]\d)"
        )

    def execute(self, action, state):
        """Score the answer carried by ``action`` and return the new State.

        The answer is read from ``action.params["answer"]``, falling back to
        ``action.thought`` when that is empty.

        Returns:
            A State whose data holds the normalised ``answer``, the
            ``correct`` flag and the ``expected`` answer.
        """
        raw = action.params.get("answer", "") or action.thought or ""
        # str() guards against non-string answers such as a JSON number.
        answer = str(raw).lower().strip()
        correct = self._expected_re.search(answer) is not None
        return State(
            data={"answer": answer, "correct": correct, "expected": self.expected},
            summary=f"{'CORRECT' if correct else 'WRONG'}: got '{answer[:50]}', expected '{self.expected}'",
        )

    def reset(self):
        """Return an empty initial State."""
        return State(data={})

    def is_terminal(self, state):
        """Return True once a state reports a correct answer."""
        return state.data.get("correct", False)


# ═══════════════════════════════════════════════════════════════
# TEST CASES
# ═══════════════════════════════════════════════════════════════

# Coding tasks: each has a name, the purpose prompt given to the agent, and
# the test cases as {"input": expression, "expected": stringified result}.
CODING_TASKS = [
    {
        "name": "fibonacci",
        "purpose": "Write a Python function fib(n) returning the nth Fibonacci number. fib(0)=0, fib(1)=1, fib(5)=5, fib(10)=55. Use submit_code action.",
        "tests": [
            {"input": call, "expected": want}
            for call, want in (
                ("fib(0)", "0"),
                ("fib(1)", "1"),
                ("fib(5)", "5"),
                ("fib(10)", "55"),
            )
        ],
    },
    {
        "name": "fizzbuzz",
        "purpose": "Write fizzbuzz(n): 'Fizz' if n%3==0, 'Buzz' if n%5==0, 'FizzBuzz' if both, else str(n). Use submit_code action.",
        "tests": [
            {"input": call, "expected": want}
            for call, want in (
                ("fizzbuzz(3)", "Fizz"),
                ("fizzbuzz(5)", "Buzz"),
                ("fizzbuzz(15)", "FizzBuzz"),
                ("fizzbuzz(7)", "7"),
            )
        ],
    },
    {
        "name": "is_palindrome",
        "purpose": "Write is_palindrome(s) returning True if string s is a palindrome, False otherwise. Use submit_code action.",
        "tests": [
            {"input": call, "expected": want}
            for call, want in (
                ("is_palindrome('racecar')", "True"),
                ("is_palindrome('hello')", "False"),
                ("is_palindrome('')", "True"),
            )
        ],
    },
    {
        "name": "factorial",
        "purpose": "Write factorial(n) returning n!. factorial(0)=1, factorial(5)=120. Use submit_code action.",
        "tests": [
            {"input": call, "expected": want}
            for call, want in (
                ("factorial(0)", "1"),
                ("factorial(5)", "120"),
                ("factorial(10)", "3628800"),
            )
        ],
    },
]

# Reasoning tasks: a purpose prompt plus the answer text expected back.
REASONING_TASKS = [
    dict(
        name="logic_deduction",
        purpose="If all roses are flowers, and some flowers fade quickly, can we conclude all roses fade quickly? Answer with 'no'. Use answer action.",
        expected="no",
    ),
    dict(
        name="sequence",
        purpose="What comes next: 2, 6, 18, 54, ? Answer with just the number using answer action.",
        expected="162",
    ),
]

# Math tasks: same shape as the reasoning tasks (prompt + expected answer).
MATH_TASKS = [
    dict(
        name="word_problem",
        purpose="A store sells apples for $2 each. If you buy 7 apples and pay with $20, how much change do you get? Answer with just the number using answer action.",
        expected="6",
    ),
    dict(
        name="percentage",
        purpose="What is 15% of 200? Answer with just the number using answer action.",
        expected="30",
    ),
]


def run_coding_test(model_name, backend, task):
    """Drive one coding task through the orchestrator and record the outcome.

    The task passes only when the final state reports ``all_passed``; any
    exception raised along the way is recorded as a failure whose detail is
    the first 80 characters of the error text.
    """
    code_env = CodeEnv(task["tests"])
    actions = {"submit_code": "Submit Python code in params.code", "DONE": "Done"}
    orchestrator = pa.Orchestrator(
        llm=backend,
        environment=code_env,
        available_actions=actions,
        optimize_every_n_tasks=99,
    )
    started = time.time()
    try:
        outcome = orchestrator.run_task(
            purpose=task["purpose"],
            initial_state=code_env.reset(),
            max_steps=2,
        )
        pass_rate = outcome.final_state.data.get("pass_rate", 0)
        solved = outcome.final_state.data.get("all_passed", False)
        elapsed = time.time() - started
        detail = f"pass_rate={pass_rate:.0%}"
        if not solved:
            # Show only the first failure to keep the line short.
            detail += f" failures={outcome.final_state.data.get('failures', [])[:1]}"
        record(model_name, "coding", task["name"], solved, detail, elapsed)
    except Exception as exc:
        record(model_name, "coding", task["name"], False, str(exc)[:80], time.time() - started)


def run_reasoning_test(model_name, backend, task, domain=None):
    """Run one answer-style (reasoning or math) task and record the outcome.

    Args:
        model_name: Label used for the model in the recorded result.
        backend: LLM backend handed to the orchestrator.
        task: Dict with "name", "purpose" and "expected" keys.
        domain: Domain label for the recorded result. When omitted, tasks
            listed in MATH_TASKS are recorded as "math" and everything else
            as "reasoning"; previously every task was recorded as
            "reasoning", so math results never appeared under their own
            domain in the report.
    """
    if domain is None:
        domain = "math" if task in MATH_TASKS else "reasoning"

    env = AnswerEnv(task["expected"])
    orch = pa.Orchestrator(
        llm=backend, environment=env,
        available_actions={"answer": "Submit your answer in params.answer", "DONE": "Done"},
        optimize_every_n_tasks=99,
    )
    t0 = time.time()
    try:
        r = orch.run_task(purpose=task["purpose"], initial_state=env.reset(), max_steps=2)
        final = r.final_state.data
        correct = final.get("correct", False)
        answer = final.get("answer", "")[:50]
        elapsed = time.time() - t0
        record(model_name, domain, task["name"], correct,
               f"answer='{answer}' expected='{task['expected']}'", elapsed)
    except Exception as e:
        # Any failure while running the task is recorded as a failed test.
        record(model_name, domain, task["name"], False, str(e)[:80], time.time() - t0)


def run_learning_test(model_name, backend):
    """Check that the heuristic library grows across repeated runs.

    Runs the same small coding task three times with optimisation after
    every task and records a pass when the heuristic library is larger
    after the last run than after the first.

    A failing run does not abort the test (the library size is still
    sampled), but the error is no longer silently discarded: the number of
    failed runs and the last error are appended to the recorded detail.
    """
    tests = [{"input": "fib(5)", "expected": "5"}]
    env = CodeEnv(tests)
    orch = pa.Orchestrator(
        llm=backend, environment=env,
        available_actions={"submit_code": "Submit code in params.code", "DONE": "Done"},
        optimize_every_n_tasks=1,
    )
    orch.optimizer.min_reward_threshold = 0.01

    heuristic_counts = []
    errors = []
    t0 = time.time()
    for _ in range(3):
        try:
            orch.run_task(purpose="Write fib(n): fib(5)=5. Use submit_code.", initial_state=env.reset(), max_steps=2)
        except Exception as e:
            # Best effort: keep sampling, but keep the failure visible.
            # Catching Exception (not a bare except) lets Ctrl-C through.
            errors.append(f"{type(e).__name__}: {e}")
        heuristic_counts.append(len(orch.optimizer.heuristic_library))
        time.sleep(0.3)

    grew = heuristic_counts[-1] > heuristic_counts[0]
    detail = f"heuristics={heuristic_counts}"
    if errors:
        detail += f" errors={len(errors)} last={errors[-1]}"
    record(model_name, "learning", "heuristic_growth",
           grew, detail, time.time() - t0)


def run_security_test():
    """Exercise the immune system; no LLM backend is involved.

    Records two results: the adversarial catch rate must be at least 75%
    and the false-positive rate must be at most 15%.
    """
    report = AdversarialHardener().run(n_adversarial=30, n_benign=10)

    catch_rate = report["catch_rate"]
    catch_detail = (
        f"catch={catch_rate:.0%} "
        f"({report['adversarial_caught']}/{report['adversarial_total']})"
    )
    record("immune_system", "security", "adversarial_catch",
           catch_rate >= 0.75, catch_detail)

    fp_rate = report["false_positive_rate"]
    benign_total = report["benign_total"]
    benign_not_passed = benign_total - report["benign_passed"]
    fp_detail = f"fp={fp_rate:.0%} ({benign_not_passed}/{benign_total})"
    record("immune_system", "security", "false_positive",
           fp_rate <= 0.15, fp_detail)


# ═══════════════════════════════════════════════════════════════
# MAIN
# ═══════════════════════════════════════════════════════════════

def _print_box(*lines, width=60):
    """Print ``lines`` inside a double-line box with ``width`` inner columns."""
    print("╔" + "═" * width + "╗")
    for line in lines:
        print("║" + line.ljust(width) + "║")
    print("╚" + "═" * width + "╝")


def _tally(results, key):
    """Group result rows by ``row[key]``, counting passes, failures and time."""
    stats = {}
    for row in results:
        bucket = stats.setdefault(row[key], {"pass": 0, "fail": 0, "total_time": 0})
        bucket["pass" if row["status"] == "PASS" else "fail"] += 1
        bucket["total_time"] += row["duration_s"]
    return stats


def main():
    """Run the security checks and every model's suites, then report.

    Prints a banner, runs the security test, then for each entry in MODELS
    runs the coding, reasoning, math and self-improvement tests. Afterwards
    prints overall, per-model and per-domain summaries plus the list of
    failures, and writes all results to
    ``tests/results/real_world_regression.json`` (relative to the current
    working directory).
    """
    _print_box(
        "  REAL-WORLD REGRESSION TEST — Purpose Agent v2.0.0",
        "  Provider: OpenRouter | 3 models × 10 test cases",
    )
    print()

    # Security (no model needed)
    print("═══ SECURITY ═══")
    run_security_test()
    print()

    suites = (
        ("Coding", run_coding_test, CODING_TASKS),
        ("Reasoning", run_reasoning_test, REASONING_TASKS),
        ("Math", run_reasoning_test, MATH_TASKS),
    )
    for model_name, model_spec in MODELS:
        print(f"═══ MODEL: {model_name} ═══")
        try:
            backend = resolve_backend(model_spec, api_key=OR_KEY)
        except Exception as e:
            # Count an unusable model as a failure rather than dropping it
            # from the report, which would overstate the pass rate.
            record(model_name, "setup", "create_backend", False,
                   f"Failed to create backend: {e}")
            continue

        for heading, runner, tasks in suites:
            print(f"  [{heading}]")
            for task in tasks:
                runner(model_name, backend, task)
                time.sleep(0.3)

        # Learning
        print("  [Self-Improvement]")
        run_learning_test(model_name, backend)

        print()

    # ═══ REPORT ═══
    _print_box("  RESULTS")
    print()
    total_run = PASS + FAIL
    print(f"  PASS: {PASS}  |  FAIL: {FAIL}  |  Total: {total_run}")
    # Guard the division so an empty run still produces a report.
    pass_pct = PASS / total_run * 100 if total_run else 0.0
    print(f"  Pass rate: {pass_pct:.1f}%")
    print()

    # Per-model summary
    print(f"  {'Model':<20} {'Pass':>6} {'Fail':>6} {'Rate':>8} {'Time':>8}")
    print(f"  {'─'*50}")
    for model, stats in _tally(RESULTS, "model").items():
        total = stats["pass"] + stats["fail"]
        rate = stats["pass"] / total if total else 0
        print(f"  {model:<20} {stats['pass']:>6} {stats['fail']:>6} {rate:>7.0%} {stats['total_time']:>7.1f}s")

    # Per-domain summary
    print()
    print(f"  {'Domain':<20} {'Pass':>6} {'Fail':>6} {'Rate':>8}")
    print(f"  {'─'*42}")
    for domain, stats in _tally(RESULTS, "domain").items():
        total = stats["pass"] + stats["fail"]
        rate = stats["pass"] / total if total else 0
        print(f"  {domain:<20} {stats['pass']:>6} {stats['fail']:>6} {rate:>7.0%}")

    # Failures detail
    failures = [r for r in RESULTS if r["status"] == "FAIL"]
    if failures:
        print(f"\n  FAILURES ({len(failures)}):")
        for failed in failures:
            print(f"    ✗ [{failed['model']}] {failed['domain']}/{failed['test']}: {failed['detail'][:60]}")

    # Save
    out_dir = "tests/results"
    out_path = f"{out_dir}/real_world_regression.json"
    os.makedirs(out_dir, exist_ok=True)
    # Explicit encoding so the file does not depend on the platform default.
    with open(out_path, "w", encoding="utf-8") as fh:
        json.dump({"pass": PASS, "fail": FAIL, "results": RESULTS}, fh, indent=2)
    print(f"\n  Saved to {out_path}")


# Run the regression suite only when this file is executed as a script.
if __name__ == "__main__":
    main()