File size: 32,711 Bytes
9ee3a0c
3f2f227
9ee3a0c
3f2f227
 
 
9ee3a0c
 
 
 
 
 
 
 
 
 
 
 
3f2f227
9ee3a0c
 
 
32faa06
 
 
 
 
9ee3a0c
 
 
 
 
 
3f2f227
9ee3a0c
3f2f227
9ee3a0c
3f2f227
 
 
9ee3a0c
3f2f227
 
 
 
 
 
 
 
 
42b28ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ee3a0c
 
 
 
 
 
3f2f227
9ee3a0c
 
 
 
 
 
3f2f227
9ee3a0c
 
32faa06
9ee3a0c
 
 
 
 
 
 
 
 
 
3f2f227
 
 
32faa06
9ee3a0c
42b28ae
9ee3a0c
3f2f227
42b28ae
 
 
3f2f227
42b28ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f2f227
42b28ae
3f2f227
42b28ae
 
3f2f227
 
 
 
42b28ae
 
 
3f2f227
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ee3a0c
 
 
 
 
 
 
 
 
 
 
3f2f227
9ee3a0c
 
 
 
 
 
 
3f2f227
 
 
a556b6c
3f2f227
 
 
 
 
 
 
 
 
 
 
 
 
 
9ee3a0c
 
 
 
 
 
 
3f2f227
9ee3a0c
 
3f2f227
 
 
 
9ee3a0c
 
 
 
3f2f227
9ee3a0c
 
 
 
3f2f227
9ee3a0c
 
 
 
 
 
3f2f227
9ee3a0c
 
 
3f2f227
 
 
 
a556b6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f2f227
 
 
a556b6c
 
 
 
 
 
 
 
 
 
 
 
 
 
3f2f227
9ee3a0c
 
 
 
 
3f2f227
9ee3a0c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32faa06
9ee3a0c
 
 
 
 
 
 
 
 
 
 
 
32faa06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ee3a0c
3f2f227
 
 
 
 
 
 
 
9ee3a0c
 
 
3f2f227
9ee3a0c
 
 
3f2f227
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ee3a0c
 
 
 
32faa06
9ee3a0c
 
 
 
 
 
32faa06
 
 
 
 
 
 
 
 
 
9ee3a0c
 
 
 
3f2f227
 
9ee3a0c
3f2f227
 
 
 
 
 
 
9ee3a0c
 
 
 
 
 
3f2f227
 
 
 
 
 
 
9ee3a0c
 
 
 
 
3f2f227
9ee3a0c
 
3f2f227
9ee3a0c
 
 
 
 
 
3f2f227
 
 
 
 
 
9ee3a0c
3f2f227
 
 
 
9ee3a0c
 
3f2f227
32faa06
 
 
 
 
 
 
 
 
 
 
 
9ee3a0c
 
 
42b28ae
 
 
 
 
 
 
3f2f227
 
 
 
 
 
 
 
 
 
42b28ae
 
 
 
 
 
 
 
 
 
 
 
3f2f227
42b28ae
 
 
 
 
3f2f227
42b28ae
 
 
 
 
3f2f227
 
 
 
 
 
42b28ae
 
 
 
 
 
3f2f227
 
 
 
 
 
42b28ae
 
3f2f227
 
 
 
 
42b28ae
 
3f2f227
 
 
 
 
 
 
 
 
 
 
 
 
42b28ae
 
 
 
 
 
 
 
3f2f227
42b28ae
 
3f2f227
 
 
 
 
 
 
 
42b28ae
 
 
 
 
 
 
 
3f2f227
 
 
 
 
 
 
 
42b28ae
3f2f227
 
 
42b28ae
 
 
 
3f2f227
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ee3a0c
 
 
 
42b28ae
 
 
 
3f2f227
42b28ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f2f227
42b28ae
 
 
 
 
 
3f2f227
 
 
 
 
 
 
 
42b28ae
 
 
 
3f2f227
 
42b28ae
 
 
9ee3a0c
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
"""
CGAE Economy - The top-level coordinator.

Ties together registry, gate, contracts, temporal dynamics, and auditing
into a single coherent economic system. This is the main entry point for
running the agent economy.
"""

from __future__ import annotations

import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional

from cgae_engine.gate import GateFunction, RobustnessVector, Tier, TierThresholds
from cgae_engine.temporal import TemporalDecay, StochasticAuditor, AuditEvent
from cgae_engine.registry import AgentRegistry, AgentRecord, AgentStatus
from cgae_engine.contracts import ContractManager, CGAEContract, ContractStatus, Constraint

try:
    from web3 import Web3
except ImportError:
    Web3 = None

logger = logging.getLogger(__name__)


@dataclass
class EconomyConfig:
    """Configuration for the CGAE economy."""
    # Tier thresholds
    thresholds: TierThresholds = field(default_factory=TierThresholds)
    # Temporal decay rate (lambda)
    decay_rate: float = 0.01
    # IHT threshold for mandatory re-audit.
    # Empirical default ih scores from DEFAULT_ROBUSTNESS land ~0.499;
    # keeping this at 0.5 suspends every agent that hasn't run a live audit.
    ih_threshold: float = 0.45
    # Initial balance for new agents (seed capital)
    initial_balance: float = 0.1  # ETH
    # Audit cost per dimension
    audit_cost: float = 0.005  # ETH per audit dimension
    # Storage cost per time step (FOC)
    storage_cost_per_step: float = 0.001  # ETH
    # Controls for automatically minting test ETH when balances drop low.
    # Defaults keep the economy running continuously: top up any agent below
    # 5% of the default seed capital and restore them to half seed capital.
    test_eth_top_up_threshold: Optional[float] = 0.05
    test_eth_top_up_amount: float = 0.5


@dataclass
class EconomySnapshot:
    """A point-in-time snapshot of the economy for the dashboard."""
    timestamp: float
    num_agents: int
    tier_distribution: dict[str, int]
    total_contracts: int
    completed_contracts: int
    failed_contracts: int
    total_rewards_paid: float
    total_penalties_collected: float
    aggregate_safety: float
    total_balance: float
    total_test_eth_topups: float
    agent_summaries: list[dict]


class Economy:
    """
    The CGAE Economy runtime.

    Orchestrates the full economic loop:
    1. Agent registration and initial audit
    2. Contract creation and marketplace
    3. Contract assignment (tier-gated)
    4. Task execution and verification
    5. Settlement (reward/penalty)
    6. Temporal decay and stochastic re-auditing
    7. Economic accounting and observability
    """

    def __init__(self, config: Optional[EconomyConfig] = None, wallet_manager=None, onchain_bridge=None, ens_manager=None, escrow_bridge=None):
        self.config = config or EconomyConfig()
        self.gate = GateFunction(
            thresholds=self.config.thresholds,
            ih_threshold=self.config.ih_threshold,
        )
        self.registry = AgentRegistry(gate=self.gate)
        self.contracts = ContractManager(budget_ceilings=self.gate.budget_ceilings)
        self.decay = TemporalDecay(decay_rate=self.config.decay_rate)
        self.auditor = StochasticAuditor()

        self.wallet_manager = wallet_manager  # Optional: real ETH wallet integration
        self.onchain_bridge = onchain_bridge  # Optional: write certs to CGAERegistry on-chain
        self.ens_manager = ens_manager        # Optional: ENS identity for agents
        self.escrow_bridge = escrow_bridge    # Optional: on-chain escrow settlement
        self.current_time: float = 0.0
        self._snapshots: list[EconomySnapshot] = []
        self._events: list[dict] = []
        self._delegations: dict[str, dict] = {}
        self.total_test_eth_topups: float = 0.0

    def _effective_robustness(self, record: AgentRecord) -> Optional[RobustnessVector]:
        """Return temporally-decayed robustness for an agent record."""
        cert = record.current_certification
        if cert is None or record.current_robustness is None:
            return None
        dt = self.current_time - cert.timestamp
        return self.decay.effective_robustness(record.current_robustness, dt)

    def _should_top_up_agents(self) -> bool:
        return (
            self.config.test_eth_top_up_threshold is not None
            and self.config.test_eth_top_up_amount > 0.0
        )

    def _maybe_top_up_agent(self, agent: AgentRecord) -> Optional[dict]:
        if not self._should_top_up_agents():
            return None

        threshold = self.config.test_eth_top_up_threshold
        amount = self.config.test_eth_top_up_amount
        if threshold is None or agent.balance >= threshold:
            return None

        needed = max(0.0, threshold - agent.balance)
        top_up_amount = max(amount, needed)

        agent.balance += top_up_amount
        agent.total_topups += top_up_amount
        self.total_test_eth_topups += top_up_amount

        entry = {
            "agent_id": agent.agent_id,
            "amount": top_up_amount,
            "balance": agent.balance,
        }
        self._log("test_eth_topup", entry)
        return entry

    def request_tier_upgrade(
        self,
        agent_id: str,
        requested_tier: Tier,
        audit_callback=None,
    ) -> dict:
        """
        Execute the paper's scaling-gate upgrade flow for a requested tier.

        1) Evaluate effective robustness under temporal decay.
        2) If already sufficient, grant immediately.
        3) Otherwise run a tier-calibrated audit callback and re-evaluate.
        """
        record = self.registry.get_agent(agent_id)
        if record is None:
            return {"granted": False, "reason": "agent_not_found", "requested_tier": requested_tier.name}
        if record.status != AgentStatus.ACTIVE or record.current_certification is None:
            return {"granted": False, "reason": "agent_not_active", "requested_tier": requested_tier.name}

        r_eff = self._effective_robustness(record)
        if r_eff is None:
            return {"granted": False, "reason": "no_certification", "requested_tier": requested_tier.name}

        effective_tier = self.gate.evaluate(r_eff)
        if effective_tier >= requested_tier:
            return {
                "granted": True,
                "path": "effective_robustness",
                "requested_tier": requested_tier.name,
                "effective_tier": effective_tier.name,
                "detail": self.gate.evaluate_with_detail(r_eff),
            }

        if audit_callback is None:
            return {
                "granted": False,
                "reason": "audit_required",
                "requested_tier": requested_tier.name,
                "effective_tier": effective_tier.name,
                "detail": self.gate.evaluate_with_detail(r_eff),
            }

        try:
            new_r = audit_callback(agent_id, requested_tier)
        except TypeError:
            new_r = audit_callback(agent_id)
        if new_r is None:
            return {
                "granted": False,
                "reason": "audit_unavailable",
                "requested_tier": requested_tier.name,
                "effective_tier": effective_tier.name,
            }

        new_tier = self.gate.evaluate(new_r)
        detail = self.gate.evaluate_with_detail(new_r)
        if new_tier >= requested_tier:
            self.registry.certify(
                agent_id,
                new_r,
                audit_type="upgrade",
                timestamp=self.current_time,
                audit_details={"requested_tier": requested_tier.name},
            )
            self._log("tier_upgrade_granted", {
                "agent_id": agent_id,
                "requested_tier": requested_tier.name,
                "new_tier": new_tier.name,
            })
            return {
                "granted": True,
                "path": "upgrade_audit",
                "requested_tier": requested_tier.name,
                "effective_tier": effective_tier.name,
                "new_tier": new_tier.name,
                "detail": detail,
            }

        idx = requested_tier.value
        gaps = {
            "cc": max(0.0, self.gate.thresholds.cc[idx] - new_r.cc),
            "er": max(0.0, self.gate.thresholds.er[idx] - new_r.er),
            "as": max(0.0, self.gate.thresholds.as_[idx] - new_r.as_),
        }
        self._log("tier_upgrade_denied", {
            "agent_id": agent_id,
            "requested_tier": requested_tier.name,
            "new_tier": new_tier.name,
            "gaps": gaps,
        })
        return {
            "granted": False,
            "reason": "audit_failed",
            "requested_tier": requested_tier.name,
            "effective_tier": effective_tier.name,
            "new_tier": new_tier.name,
            "detail": detail,
            "gaps": gaps,
        }

    def can_delegate(self, principal_id: str, delegate_id: str, required_tier: Tier) -> dict:
        """
        Enforce delegation constraints:
        - principal and delegate must both satisfy required tier independently
        - chain-level tier = min(f(principal), f(delegate)) must satisfy required tier
        """
        principal = self.registry.get_agent(principal_id)
        delegate = self.registry.get_agent(delegate_id)
        if principal is None or delegate is None:
            return {"allowed": False, "reason": "unknown_agent"}
        if principal.status != AgentStatus.ACTIVE or delegate.status != AgentStatus.ACTIVE:
            return {"allowed": False, "reason": "inactive_agent"}

        p_eff = self._effective_robustness(principal)
        d_eff = self._effective_robustness(delegate)
        if p_eff is None or d_eff is None:
            return {"allowed": False, "reason": "missing_certification"}

        p_tier = self.gate.evaluate(p_eff)
        d_tier = self.gate.evaluate(d_eff)
        chain_tier = self.gate.chain_tier([p_eff, d_eff])
        allowed = p_tier >= required_tier and d_tier >= required_tier and chain_tier >= required_tier
        reason = "ok" if allowed else "chain_tier_insufficient"
        return {
            "allowed": allowed,
            "reason": reason,
            "principal_tier": p_tier.name,
            "delegate_tier": d_tier.name,
            "chain_tier": chain_tier.name,
            "required_tier": required_tier.name,
        }

    def record_delegation(
        self,
        contract_id: str,
        principal_id: str,
        delegate_id: str,
        required_tier: Tier,
        allowed: bool,
        reason: str,
    ):
        """Persist delegation audit trail for contract-level forensics."""
        self._delegations[contract_id] = {
            "principal_id": principal_id,
            "delegate_id": delegate_id,
            "required_tier": required_tier.name,
            "allowed": allowed,
            "reason": reason,
            "timestamp": self.current_time,
        }
        self._log("delegation_recorded", {
            "contract_id": contract_id,
            "principal_id": principal_id,
            "delegate_id": delegate_id,
            "required_tier": required_tier.name,
            "allowed": allowed,
            "reason": reason,
        })

    def get_delegation(self, contract_id: str) -> Optional[dict]:
        return self._delegations.get(contract_id)

    # ------------------------------------------------------------------
    # Agent lifecycle
    # ------------------------------------------------------------------

    def register_agent(
        self,
        model_name: str,
        model_config: dict,
        provenance: Optional[dict] = None,
    ) -> AgentRecord:
        """Register a new agent with seed capital and an ETH wallet."""
        record = self.registry.register(
            model_name=model_name,
            model_config=model_config,
            provenance=provenance,
            initial_balance=self.config.initial_balance,
            timestamp=self.current_time,
        )
        # Create an ETH wallet for this agent if wallet manager is available
        wallet_address = None
        if self.wallet_manager:
            wallet = self.wallet_manager.create_agent_wallet(record.agent_id, model_name)
            wallet_address = wallet.address
            record.wallet_address = wallet_address

        # Register ENS subname for agent identity
        ens_name = None
        if self.ens_manager and wallet_address:
            ens_name = self.ens_manager.create_subname(
                record.agent_id, model_name, wallet_address
            )

        self._log("agent_registered", {
            "agent_id": record.agent_id, "model": model_name,
            "wallet_address": wallet_address, "ens_name": ens_name,
        })
        return record

    def audit_agent(
        self,
        agent_id: str,
        robustness: RobustnessVector,
        audit_type: str = "registration",
        observed_architecture_hash: Optional[str] = None,
        audit_details: Optional[dict] = None,
    ) -> dict:
        """
        Audit an agent and update their certification.
        Deducts audit cost from agent balance.
        """
        record = self.registry.get_agent(agent_id)
        if record is None:
            raise KeyError(f"Agent {agent_id} not found")

        # Deduct audit cost (3 dimensions + IHT)
        total_audit_cost = self.config.audit_cost * 4
        record.balance -= total_audit_cost
        record.total_spent += total_audit_cost

        # Certify with new robustness
        cert = self.registry.certify(
            agent_id=agent_id,
            robustness=robustness,
            audit_type=audit_type,
            timestamp=self.current_time,
            audit_details=audit_details,
            observed_architecture_hash=observed_architecture_hash,
        )

        detail = self.gate.evaluate_with_detail(robustness)

        # Write certification on-chain if bridge is available
        onchain_tx = None
        if self.onchain_bridge and record.wallet_address:
            # Skip if already certified at this tier on-chain
            ens_tier = ""
            if self.ens_manager:
                ens_name = self.ens_manager.get_agent_name(agent_id)
                if ens_name:
                    ens_tier = self.ens_manager.resolve_text(ens_name, "cgae.tier")
            if ens_tier != cert.tier.name:
                audit_hash = (audit_details or {}).get("storage_root_hash", "")
                onchain_tx = self.onchain_bridge.certify_agent(
                    agent_address=record.wallet_address,
                    cc=robustness.cc, er=robustness.er,
                    as_=robustness.as_, ih=robustness.ih,
                    audit_type=audit_type,
                    audit_hash=audit_hash or "",
                )

        # Write robustness credentials to ENS text records
        if self.ens_manager:
            ens_name = self.ens_manager.get_agent_name(agent_id)
            existing_tier = self.ens_manager.resolve_text(ens_name, "cgae.tier") if ens_name else ""
            if existing_tier != cert.tier.name:
                audit_hash = (audit_details or {}).get("storage_root_hash", "")
                self.ens_manager.set_agent_credentials(
                    agent_id=agent_id,
                    tier=cert.tier.name,
                    cc=robustness.cc, er=robustness.er,
                    as_=robustness.as_, ih=robustness.ih,
                    wallet_address=record.wallet_address or "",
                    audit_hash=audit_hash,
                )
            else:
                logger.info(f"  [ens] Skipping text record update for {ens_name} (tier unchanged: {existing_tier})")

        self._log("agent_audited", {
            "agent_id": agent_id,
            "tier": cert.tier.name,
            "audit_type": audit_type,
            "cost": total_audit_cost,
            "onchain_tx": onchain_tx,
            **detail,
        })
        return detail

    # ------------------------------------------------------------------
    # Contract lifecycle
    # ------------------------------------------------------------------

    def post_contract(
        self,
        objective: str,
        constraints: list[Constraint],
        min_tier: Tier,
        reward: float,
        penalty: float,
        deadline_offset: float = 100.0,
        domain: str = "general",
        difficulty: float = 0.5,
        issuer_id: str = "system",
    ) -> CGAEContract:
        """Post a new contract to the marketplace."""
        contract = self.contracts.create_contract(
            objective=objective,
            constraints=constraints,
            min_tier=min_tier,
            reward=reward,
            penalty=penalty,
            issuer_id=issuer_id,
            deadline=self.current_time + deadline_offset,
            domain=domain,
            difficulty=difficulty,
            timestamp=self.current_time,
        )

        # Create contract on-chain via CGAEEscrow
        if self.escrow_bridge:
            import hashlib
            constraints_hash = Web3.keccak(text="|".join(c.name for c in constraints)) if constraints else b'\x00' * 32
            reward_wei = int(reward * 1e18)
            penalty_wei = int(penalty * 1e18)
            deadline_ts = int(time.time()) + int(deadline_offset * 60)
            result = self.escrow_bridge.create_contract(
                objective=objective[:200],
                constraints_hash=constraints_hash,
                verifier_spec_hash=contract.contract_id,
                min_tier=min_tier.value,
                reward_wei=max(reward_wei, 1),
                penalty_wei=max(penalty_wei, 1),
                deadline=deadline_ts,
                domain=domain,
            )
            if result:
                contract._escrow_tx = result[0]
                contract._escrow_id = result[1]

        return contract

    def accept_contract(self, contract_id: str, agent_id: str) -> bool:
        """
        Agent accepts a contract. Enforces:
        1. Agent tier >= contract min_tier (temporal decay applied)
        2. Budget ceiling not exceeded
        3. ENS identity verification β€” if ENS is enabled, the agent's
           on-chain ENS tier record must match or exceed the contract's
           minimum tier. Agents without a valid ENS identity are rejected.
        """
        record = self.registry.get_agent(agent_id)
        if record is None or record.status != AgentStatus.ACTIVE:
            return False

        if record.current_certification is None:
            return False

        # ENS-gated verification: resolve tier from ENS text record
        if self.ens_manager:
            ens_name = self.ens_manager.get_agent_name(agent_id)
            if not ens_name:
                logger.warning(f"[ens-gate] {agent_id} has no ENS name β€” contract rejected")
                return False
            ens_tier_str = self.ens_manager.resolve_text(ens_name, "cgae.tier")
            if not ens_tier_str:
                logger.warning(f"[ens-gate] {ens_name} has no cgae.tier record β€” contract rejected")
                return False
            # Parse tier from ENS (e.g., "T3" -> Tier.T3)
            try:
                ens_tier = Tier[ens_tier_str]
            except KeyError:
                logger.warning(f"[ens-gate] {ens_name} has invalid tier '{ens_tier_str}' β€” contract rejected")
                return False
            contract = self.contracts._get_contract(contract_id)
            if ens_tier < contract.min_tier:
                logger.info(f"[ens-gate] {ens_name} ENS tier {ens_tier.name} < required {contract.min_tier.name}")
                return False

        # Standard tier check with temporal decay
        dt = self.current_time - record.current_certification.timestamp
        r_eff = self.decay.effective_robustness(record.current_robustness, dt)
        effective_tier = self.gate.evaluate(r_eff)

        accepted = self.contracts.assign_contract(
            contract_id=contract_id,
            agent_id=agent_id,
            agent_tier=effective_tier,
            timestamp=self.current_time,
        )

        # Accept on-chain via CGAEEscrow
        if accepted and self.escrow_bridge:
            contract = self.contracts._get_contract(contract_id)
            escrow_id = getattr(contract, '_escrow_id', None)
            if escrow_id:
                penalty_wei = int(contract.penalty * 1e18)
                self.escrow_bridge.accept_contract(escrow_id, max(penalty_wei, 1))

        return accepted

    def complete_contract(
        self,
        contract_id: str,
        output: Any,
        verification_override: Optional[bool] = None,
        liability_agent_id: Optional[str] = None,
    ) -> dict:
        """
        Submit output for a contract and settle it.

        If verification_override is provided, it overrides the contract's own
        constraint check. This allows external verification (e.g., jury LLM
        evaluation from TaskVerifier) to drive the settlement outcome.
        """
        passed, failures = self.contracts.submit_output(
            contract_id=contract_id,
            output=output,
            timestamp=self.current_time,
        )

        # Allow external verification to override contract-level constraints
        if verification_override is not None:
            contract = self.contracts._get_contract(contract_id)
            contract.verification_result = verification_override
            if not verification_override and not failures:
                failures = ["jury_verification_failed"]

        settlement = self.contracts.settle_contract(
            contract_id=contract_id,
            timestamp=self.current_time,
        )

        # Update balances/counters. For delegated tasks, principal can bear liability.
        agent_id = settlement["agent_id"]
        performer = self.registry.get_agent(agent_id)
        liable = self.registry.get_agent(liability_agent_id) if liability_agent_id else performer

        if settlement["outcome"] == "success":
            if performer:
                performer.balance += settlement["reward"]
                performer.total_earned += settlement["reward"]
                performer.contracts_completed += 1
                # Disburse real ETH to agent wallet
                if self.wallet_manager:
                    tx = self.wallet_manager.disburse_reward(
                        agent_id, settlement["reward"], contract_id
                    )
                    settlement["wallet_tx"] = tx
        else:
            if liable:
                liable.balance -= settlement["penalty"]
                liable.total_penalties += settlement["penalty"]
                liable.contracts_failed += 1

        settlement["failures"] = failures
        settlement["liable_agent_id"] = liability_agent_id or agent_id

        # Settle on-chain via CGAEEscrow
        if self.escrow_bridge:
            contract = self.contracts._get_contract(contract_id)
            escrow_id = getattr(contract, '_escrow_id', None)
            if escrow_id:
                if settlement["outcome"] == "success":
                    tx = self.escrow_bridge.complete_contract(escrow_id)
                else:
                    tx = self.escrow_bridge.fail_contract(escrow_id)
                settlement["escrow_tx"] = tx

        self._log("contract_settled", settlement)
        return settlement

    # ------------------------------------------------------------------
    # Time step and temporal dynamics
    # ------------------------------------------------------------------

    def step(self, audit_callback=None) -> dict:
        """
        Advance the economy by one time step.

        - Applies temporal decay
        - Checks for stochastic spot-audits
        - Deducts storage costs (FOC)
        - Expires overdue contracts
        - Takes a snapshot

        audit_callback: Optional callable(agent_id) -> RobustnessVector
            If provided, called when a spot-audit is triggered.
            If None, spot-audits use decayed robustness (no fresh eval).
        """
        self.current_time += 1.0
        step_events = {
            "timestamp": self.current_time,
            "audits_triggered": [],
            "agents_demoted": [],
            "agents_expired": [],
            "contracts_expired": [],
            "storage_costs": 0.0,
            "test_eth_topups": [],
        }

        # 1. Process each active agent
        for agent in self.registry.active_agents:
            cert = agent.current_certification
            if cert is None:
                continue

            # Temporal decay check: has effective tier dropped?
            dt = self.current_time - cert.timestamp
            r_eff = self.decay.effective_robustness(cert.robustness, dt)
            effective_tier = self.gate.evaluate(r_eff)

            if effective_tier < agent.current_tier:
                # Decay caused tier drop β€” update certification
                self.registry.certify(
                    agent.agent_id, r_eff,
                    audit_type="decay",
                    timestamp=self.current_time,
                )
                step_events["agents_expired"].append(agent.agent_id)

            # Stochastic spot-audit
            time_since_audit = self.current_time - agent.last_audit_time
            if self.auditor.should_audit(agent.current_tier, time_since_audit):
                step_events["audits_triggered"].append(agent.agent_id)

                if audit_callback:
                    new_r = audit_callback(agent.agent_id)
                else:
                    new_r = r_eff  # Use decayed robustness as proxy

                new_tier = self.gate.evaluate(new_r)
                if new_tier < agent.current_tier:
                    self.registry.demote(
                        agent.agent_id, new_r,
                        reason="spot_audit",
                        timestamp=self.current_time,
                    )
                    step_events["agents_demoted"].append(agent.agent_id)
                else:
                    # Re-certify at current level (refreshes timestamp)
                    self.registry.certify(
                        agent.agent_id, new_r,
                        audit_type="spot",
                        timestamp=self.current_time,
                    )

                # Charge audit cost
                audit_cost = self.config.audit_cost * 4
                agent.balance -= audit_cost
                agent.total_spent += audit_cost

            # Storage cost (FOC)
            agent.balance -= self.config.storage_cost_per_step
            agent.total_spent += self.config.storage_cost_per_step
            step_events["storage_costs"] += self.config.storage_cost_per_step

            topup = self._maybe_top_up_agent(agent)
            if topup:
                step_events["test_eth_topups"].append(topup)

            # Check for insolvency
            if agent.balance <= 0:
                agent.status = AgentStatus.SUSPENDED
                self._log("agent_insolvent", {
                    "agent_id": agent.agent_id,
                    "balance": agent.balance,
                })

        # 1b. Reactivate suspended (insolvent) agents when top-up is enabled.
        # This handles agents that were suspended in a previous step before the
        # top-up defaults were in place, or that hit zero between steps.
        if self._should_top_up_agents():
            for agent in self.registry.agents.values():
                if agent.status != AgentStatus.SUSPENDED:
                    continue
                topup = self._maybe_top_up_agent(agent)
                if topup and agent.balance > 0:
                    agent.status = AgentStatus.ACTIVE
                    step_events["test_eth_topups"].append(topup)
                    self._log("agent_reactivated", {
                        "agent_id": agent.agent_id,
                        "balance": agent.balance,
                    })

        # 2. Expire overdue contracts
        expired = self.contracts.expire_contracts(self.current_time)
        step_events["contracts_expired"] = expired

        # 3. Take snapshot
        snapshot = self._take_snapshot()
        self._snapshots.append(snapshot)

        self._log("step", step_events)
        return step_events

    # ------------------------------------------------------------------
    # Aggregate safety (Definition 9, Theorem 3)
    # ------------------------------------------------------------------

    def aggregate_safety(self) -> float:
        """
        Compute aggregate safety S(P) (Definition 9).
        S(P) = 1 - sum(E(A) * (1 - R_bar(A))) / sum(E(A))
        where R_bar(A) = min_i R_eff,i(A) is the weakest-link robustness.
        """
        total_exposure = 0.0
        weighted_risk = 0.0

        for agent in self.registry.active_agents:
            cert = agent.current_certification
            if cert is None:
                continue
            dt = self.current_time - cert.timestamp
            r_eff = self.decay.effective_robustness(cert.robustness, dt)
            exposure = self.contracts.agent_exposure(agent.agent_id)
            if exposure <= 0:
                # Use budget ceiling as potential exposure
                tier = self.gate.evaluate(r_eff)
                exposure = self.gate.budget_ceiling(tier)

            r_bar = r_eff.weakest
            total_exposure += exposure
            weighted_risk += exposure * (1.0 - r_bar)

        if total_exposure == 0:
            return 1.0
        return 1.0 - (weighted_risk / total_exposure)

    # ------------------------------------------------------------------
    # Observability
    # ------------------------------------------------------------------

    def _take_snapshot(self) -> EconomySnapshot:
        tier_dist = self.registry.tier_distribution()
        econ = self.contracts.economics_summary()
        agents = self.registry.active_agents

        return EconomySnapshot(
            timestamp=self.current_time,
            num_agents=len(agents),
            tier_distribution={t.name: c for t, c in tier_dist.items()},
            total_contracts=econ["total_contracts"],
            completed_contracts=econ["status_distribution"].get("completed", 0),
            failed_contracts=econ["status_distribution"].get("failed", 0),
            total_rewards_paid=econ["total_rewards_paid"],
            total_penalties_collected=econ["total_penalties_collected"],
            aggregate_safety=self.aggregate_safety(),
            total_balance=sum(a.balance for a in agents),
            total_test_eth_topups=self.total_test_eth_topups,
            agent_summaries=[a.to_dict() for a in agents],
        )

    @property
    def snapshots(self) -> list[EconomySnapshot]:
        return list(self._snapshots)

    @property
    def events(self) -> list[dict]:
        return list(self._events)

    def export_state(self, path: str):
        """Export full economy state to JSON for FOC storage."""
        state = {
            "timestamp": self.current_time,
            "config": {
                "decay_rate": self.config.decay_rate,
                "ih_threshold": self.config.ih_threshold,
                "initial_balance": self.config.initial_balance,
                "audit_cost": self.config.audit_cost,
                "storage_cost_per_step": self.config.storage_cost_per_step,
                "test_eth_top_up_threshold": self.config.test_eth_top_up_threshold,
                "test_eth_top_up_amount": self.config.test_eth_top_up_amount,
            },
            "agents": {
                aid: agent.to_dict()
                for aid, agent in self.registry.agents.items()
            },
            "contracts": self.contracts.economics_summary(),
            "aggregate_safety": self.aggregate_safety(),
            "total_test_eth_topups": self.total_test_eth_topups,
            "snapshots_count": len(self._snapshots),
            "wallet_summary": self.wallet_manager.summary() if self.wallet_manager else None,
        }
        Path(path).write_text(json.dumps(state, indent=2, default=str))

    def _log(self, event_type: str, data: dict):
        self._events.append({
            "type": event_type,
            "timestamp": self.current_time,
            "data": data,
        })
        logger.debug(f"[t={self.current_time:.1f}] {event_type}: {data}")