Spaces:
Paused
Paused
| """ | |
| CGAE Economy - The top-level coordinator. | |
| Ties together registry, gate, contracts, temporal dynamics, and auditing | |
| into a single coherent economic system. This is the main entry point for | |
| running the agent economy. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| from cgae_engine.gate import GateFunction, RobustnessVector, Tier, TierThresholds | |
| from cgae_engine.temporal import TemporalDecay, StochasticAuditor, AuditEvent | |
| from cgae_engine.registry import AgentRegistry, AgentRecord, AgentStatus | |
| from cgae_engine.contracts import ContractManager, CGAEContract, ContractStatus, Constraint | |
| try: | |
| from web3 import Web3 | |
| except ImportError: | |
| Web3 = None | |
| logger = logging.getLogger(__name__) | |
| class EconomyConfig: | |
| """Configuration for the CGAE economy.""" | |
| # Tier thresholds | |
| thresholds: TierThresholds = field(default_factory=TierThresholds) | |
| # Temporal decay rate (lambda) | |
| decay_rate: float = 0.01 | |
| # IHT threshold for mandatory re-audit. | |
| # Empirical default ih scores from DEFAULT_ROBUSTNESS land ~0.499; | |
| # keeping this at 0.5 suspends every agent that hasn't run a live audit. | |
| ih_threshold: float = 0.45 | |
| # Initial balance for new agents (seed capital) | |
| initial_balance: float = 0.1 # ETH | |
| # Audit cost per dimension | |
| audit_cost: float = 0.005 # ETH per audit dimension | |
| # Storage cost per time step (FOC) | |
| storage_cost_per_step: float = 0.001 # ETH | |
| # Controls for automatically minting test ETH when balances drop low. | |
| # Defaults keep the economy running continuously: top up any agent below | |
| # 5% of the default seed capital and restore them to half seed capital. | |
| test_eth_top_up_threshold: Optional[float] = 0.05 | |
| test_eth_top_up_amount: float = 0.5 | |
| class EconomySnapshot: | |
| """A point-in-time snapshot of the economy for the dashboard.""" | |
| timestamp: float | |
| num_agents: int | |
| tier_distribution: dict[str, int] | |
| total_contracts: int | |
| completed_contracts: int | |
| failed_contracts: int | |
| total_rewards_paid: float | |
| total_penalties_collected: float | |
| aggregate_safety: float | |
| total_balance: float | |
| total_test_eth_topups: float | |
| agent_summaries: list[dict] | |
| class Economy: | |
| """ | |
| The CGAE Economy runtime. | |
| Orchestrates the full economic loop: | |
| 1. Agent registration and initial audit | |
| 2. Contract creation and marketplace | |
| 3. Contract assignment (tier-gated) | |
| 4. Task execution and verification | |
| 5. Settlement (reward/penalty) | |
| 6. Temporal decay and stochastic re-auditing | |
| 7. Economic accounting and observability | |
| """ | |
| def __init__(self, config: Optional[EconomyConfig] = None, wallet_manager=None, onchain_bridge=None, ens_manager=None, escrow_bridge=None): | |
| self.config = config or EconomyConfig() | |
| self.gate = GateFunction( | |
| thresholds=self.config.thresholds, | |
| ih_threshold=self.config.ih_threshold, | |
| ) | |
| self.registry = AgentRegistry(gate=self.gate) | |
| self.contracts = ContractManager(budget_ceilings=self.gate.budget_ceilings) | |
| self.decay = TemporalDecay(decay_rate=self.config.decay_rate) | |
| self.auditor = StochasticAuditor() | |
| self.wallet_manager = wallet_manager # Optional: real ETH wallet integration | |
| self.onchain_bridge = onchain_bridge # Optional: write certs to CGAERegistry on-chain | |
| self.ens_manager = ens_manager # Optional: ENS identity for agents | |
| self.escrow_bridge = escrow_bridge # Optional: on-chain escrow settlement | |
| self.current_time: float = 0.0 | |
| self._snapshots: list[EconomySnapshot] = [] | |
| self._events: list[dict] = [] | |
| self._delegations: dict[str, dict] = {} | |
| self.total_test_eth_topups: float = 0.0 | |
| def _effective_robustness(self, record: AgentRecord) -> Optional[RobustnessVector]: | |
| """Return temporally-decayed robustness for an agent record.""" | |
| cert = record.current_certification | |
| if cert is None or record.current_robustness is None: | |
| return None | |
| dt = self.current_time - cert.timestamp | |
| return self.decay.effective_robustness(record.current_robustness, dt) | |
| def _should_top_up_agents(self) -> bool: | |
| return ( | |
| self.config.test_eth_top_up_threshold is not None | |
| and self.config.test_eth_top_up_amount > 0.0 | |
| ) | |
| def _maybe_top_up_agent(self, agent: AgentRecord) -> Optional[dict]: | |
| if not self._should_top_up_agents(): | |
| return None | |
| threshold = self.config.test_eth_top_up_threshold | |
| amount = self.config.test_eth_top_up_amount | |
| if threshold is None or agent.balance >= threshold: | |
| return None | |
| needed = max(0.0, threshold - agent.balance) | |
| top_up_amount = max(amount, needed) | |
| agent.balance += top_up_amount | |
| agent.total_topups += top_up_amount | |
| self.total_test_eth_topups += top_up_amount | |
| entry = { | |
| "agent_id": agent.agent_id, | |
| "amount": top_up_amount, | |
| "balance": agent.balance, | |
| } | |
| self._log("test_eth_topup", entry) | |
| return entry | |
| def request_tier_upgrade( | |
| self, | |
| agent_id: str, | |
| requested_tier: Tier, | |
| audit_callback=None, | |
| ) -> dict: | |
| """ | |
| Execute the paper's scaling-gate upgrade flow for a requested tier. | |
| 1) Evaluate effective robustness under temporal decay. | |
| 2) If already sufficient, grant immediately. | |
| 3) Otherwise run a tier-calibrated audit callback and re-evaluate. | |
| """ | |
| record = self.registry.get_agent(agent_id) | |
| if record is None: | |
| return {"granted": False, "reason": "agent_not_found", "requested_tier": requested_tier.name} | |
| if record.status != AgentStatus.ACTIVE or record.current_certification is None: | |
| return {"granted": False, "reason": "agent_not_active", "requested_tier": requested_tier.name} | |
| r_eff = self._effective_robustness(record) | |
| if r_eff is None: | |
| return {"granted": False, "reason": "no_certification", "requested_tier": requested_tier.name} | |
| effective_tier = self.gate.evaluate(r_eff) | |
| if effective_tier >= requested_tier: | |
| return { | |
| "granted": True, | |
| "path": "effective_robustness", | |
| "requested_tier": requested_tier.name, | |
| "effective_tier": effective_tier.name, | |
| "detail": self.gate.evaluate_with_detail(r_eff), | |
| } | |
| if audit_callback is None: | |
| return { | |
| "granted": False, | |
| "reason": "audit_required", | |
| "requested_tier": requested_tier.name, | |
| "effective_tier": effective_tier.name, | |
| "detail": self.gate.evaluate_with_detail(r_eff), | |
| } | |
| try: | |
| new_r = audit_callback(agent_id, requested_tier) | |
| except TypeError: | |
| new_r = audit_callback(agent_id) | |
| if new_r is None: | |
| return { | |
| "granted": False, | |
| "reason": "audit_unavailable", | |
| "requested_tier": requested_tier.name, | |
| "effective_tier": effective_tier.name, | |
| } | |
| new_tier = self.gate.evaluate(new_r) | |
| detail = self.gate.evaluate_with_detail(new_r) | |
| if new_tier >= requested_tier: | |
| self.registry.certify( | |
| agent_id, | |
| new_r, | |
| audit_type="upgrade", | |
| timestamp=self.current_time, | |
| audit_details={"requested_tier": requested_tier.name}, | |
| ) | |
| self._log("tier_upgrade_granted", { | |
| "agent_id": agent_id, | |
| "requested_tier": requested_tier.name, | |
| "new_tier": new_tier.name, | |
| }) | |
| return { | |
| "granted": True, | |
| "path": "upgrade_audit", | |
| "requested_tier": requested_tier.name, | |
| "effective_tier": effective_tier.name, | |
| "new_tier": new_tier.name, | |
| "detail": detail, | |
| } | |
| idx = requested_tier.value | |
| gaps = { | |
| "cc": max(0.0, self.gate.thresholds.cc[idx] - new_r.cc), | |
| "er": max(0.0, self.gate.thresholds.er[idx] - new_r.er), | |
| "as": max(0.0, self.gate.thresholds.as_[idx] - new_r.as_), | |
| } | |
| self._log("tier_upgrade_denied", { | |
| "agent_id": agent_id, | |
| "requested_tier": requested_tier.name, | |
| "new_tier": new_tier.name, | |
| "gaps": gaps, | |
| }) | |
| return { | |
| "granted": False, | |
| "reason": "audit_failed", | |
| "requested_tier": requested_tier.name, | |
| "effective_tier": effective_tier.name, | |
| "new_tier": new_tier.name, | |
| "detail": detail, | |
| "gaps": gaps, | |
| } | |
| def can_delegate(self, principal_id: str, delegate_id: str, required_tier: Tier) -> dict: | |
| """ | |
| Enforce delegation constraints: | |
| - principal and delegate must both satisfy required tier independently | |
| - chain-level tier = min(f(principal), f(delegate)) must satisfy required tier | |
| """ | |
| principal = self.registry.get_agent(principal_id) | |
| delegate = self.registry.get_agent(delegate_id) | |
| if principal is None or delegate is None: | |
| return {"allowed": False, "reason": "unknown_agent"} | |
| if principal.status != AgentStatus.ACTIVE or delegate.status != AgentStatus.ACTIVE: | |
| return {"allowed": False, "reason": "inactive_agent"} | |
| p_eff = self._effective_robustness(principal) | |
| d_eff = self._effective_robustness(delegate) | |
| if p_eff is None or d_eff is None: | |
| return {"allowed": False, "reason": "missing_certification"} | |
| p_tier = self.gate.evaluate(p_eff) | |
| d_tier = self.gate.evaluate(d_eff) | |
| chain_tier = self.gate.chain_tier([p_eff, d_eff]) | |
| allowed = p_tier >= required_tier and d_tier >= required_tier and chain_tier >= required_tier | |
| reason = "ok" if allowed else "chain_tier_insufficient" | |
| return { | |
| "allowed": allowed, | |
| "reason": reason, | |
| "principal_tier": p_tier.name, | |
| "delegate_tier": d_tier.name, | |
| "chain_tier": chain_tier.name, | |
| "required_tier": required_tier.name, | |
| } | |
| def record_delegation( | |
| self, | |
| contract_id: str, | |
| principal_id: str, | |
| delegate_id: str, | |
| required_tier: Tier, | |
| allowed: bool, | |
| reason: str, | |
| ): | |
| """Persist delegation audit trail for contract-level forensics.""" | |
| self._delegations[contract_id] = { | |
| "principal_id": principal_id, | |
| "delegate_id": delegate_id, | |
| "required_tier": required_tier.name, | |
| "allowed": allowed, | |
| "reason": reason, | |
| "timestamp": self.current_time, | |
| } | |
| self._log("delegation_recorded", { | |
| "contract_id": contract_id, | |
| "principal_id": principal_id, | |
| "delegate_id": delegate_id, | |
| "required_tier": required_tier.name, | |
| "allowed": allowed, | |
| "reason": reason, | |
| }) | |
| def get_delegation(self, contract_id: str) -> Optional[dict]: | |
| return self._delegations.get(contract_id) | |
| # ------------------------------------------------------------------ | |
| # Agent lifecycle | |
| # ------------------------------------------------------------------ | |
| def register_agent( | |
| self, | |
| model_name: str, | |
| model_config: dict, | |
| provenance: Optional[dict] = None, | |
| ) -> AgentRecord: | |
| """Register a new agent with seed capital and an ETH wallet.""" | |
| record = self.registry.register( | |
| model_name=model_name, | |
| model_config=model_config, | |
| provenance=provenance, | |
| initial_balance=self.config.initial_balance, | |
| timestamp=self.current_time, | |
| ) | |
| # Create an ETH wallet for this agent if wallet manager is available | |
| wallet_address = None | |
| if self.wallet_manager: | |
| wallet = self.wallet_manager.create_agent_wallet(record.agent_id, model_name) | |
| wallet_address = wallet.address | |
| record.wallet_address = wallet_address | |
| # Register ENS subname for agent identity | |
| ens_name = None | |
| if self.ens_manager and wallet_address: | |
| ens_name = self.ens_manager.create_subname( | |
| record.agent_id, model_name, wallet_address | |
| ) | |
| self._log("agent_registered", { | |
| "agent_id": record.agent_id, "model": model_name, | |
| "wallet_address": wallet_address, "ens_name": ens_name, | |
| }) | |
| return record | |
| def audit_agent( | |
| self, | |
| agent_id: str, | |
| robustness: RobustnessVector, | |
| audit_type: str = "registration", | |
| observed_architecture_hash: Optional[str] = None, | |
| audit_details: Optional[dict] = None, | |
| ) -> dict: | |
| """ | |
| Audit an agent and update their certification. | |
| Deducts audit cost from agent balance. | |
| """ | |
| record = self.registry.get_agent(agent_id) | |
| if record is None: | |
| raise KeyError(f"Agent {agent_id} not found") | |
| # Deduct audit cost (3 dimensions + IHT) | |
| total_audit_cost = self.config.audit_cost * 4 | |
| record.balance -= total_audit_cost | |
| record.total_spent += total_audit_cost | |
| # Certify with new robustness | |
| cert = self.registry.certify( | |
| agent_id=agent_id, | |
| robustness=robustness, | |
| audit_type=audit_type, | |
| timestamp=self.current_time, | |
| audit_details=audit_details, | |
| observed_architecture_hash=observed_architecture_hash, | |
| ) | |
| detail = self.gate.evaluate_with_detail(robustness) | |
| # Write certification on-chain if bridge is available | |
| onchain_tx = None | |
| if self.onchain_bridge and record.wallet_address: | |
| # Skip if already certified at this tier on-chain | |
| ens_tier = "" | |
| if self.ens_manager: | |
| ens_name = self.ens_manager.get_agent_name(agent_id) | |
| if ens_name: | |
| ens_tier = self.ens_manager.resolve_text(ens_name, "cgae.tier") | |
| if ens_tier != cert.tier.name: | |
| audit_hash = (audit_details or {}).get("storage_root_hash", "") | |
| onchain_tx = self.onchain_bridge.certify_agent( | |
| agent_address=record.wallet_address, | |
| cc=robustness.cc, er=robustness.er, | |
| as_=robustness.as_, ih=robustness.ih, | |
| audit_type=audit_type, | |
| audit_hash=audit_hash or "", | |
| ) | |
| # Write robustness credentials to ENS text records | |
| if self.ens_manager: | |
| ens_name = self.ens_manager.get_agent_name(agent_id) | |
| existing_tier = self.ens_manager.resolve_text(ens_name, "cgae.tier") if ens_name else "" | |
| if existing_tier != cert.tier.name: | |
| audit_hash = (audit_details or {}).get("storage_root_hash", "") | |
| self.ens_manager.set_agent_credentials( | |
| agent_id=agent_id, | |
| tier=cert.tier.name, | |
| cc=robustness.cc, er=robustness.er, | |
| as_=robustness.as_, ih=robustness.ih, | |
| wallet_address=record.wallet_address or "", | |
| audit_hash=audit_hash, | |
| ) | |
| else: | |
| logger.info(f" [ens] Skipping text record update for {ens_name} (tier unchanged: {existing_tier})") | |
| self._log("agent_audited", { | |
| "agent_id": agent_id, | |
| "tier": cert.tier.name, | |
| "audit_type": audit_type, | |
| "cost": total_audit_cost, | |
| "onchain_tx": onchain_tx, | |
| **detail, | |
| }) | |
| return detail | |
| # ------------------------------------------------------------------ | |
| # Contract lifecycle | |
| # ------------------------------------------------------------------ | |
| def post_contract( | |
| self, | |
| objective: str, | |
| constraints: list[Constraint], | |
| min_tier: Tier, | |
| reward: float, | |
| penalty: float, | |
| deadline_offset: float = 100.0, | |
| domain: str = "general", | |
| difficulty: float = 0.5, | |
| issuer_id: str = "system", | |
| ) -> CGAEContract: | |
| """Post a new contract to the marketplace.""" | |
| contract = self.contracts.create_contract( | |
| objective=objective, | |
| constraints=constraints, | |
| min_tier=min_tier, | |
| reward=reward, | |
| penalty=penalty, | |
| issuer_id=issuer_id, | |
| deadline=self.current_time + deadline_offset, | |
| domain=domain, | |
| difficulty=difficulty, | |
| timestamp=self.current_time, | |
| ) | |
| # Create contract on-chain via CGAEEscrow | |
| if self.escrow_bridge: | |
| import hashlib | |
| constraints_hash = Web3.keccak(text="|".join(c.name for c in constraints)) if constraints else b'\x00' * 32 | |
| reward_wei = int(reward * 1e18) | |
| penalty_wei = int(penalty * 1e18) | |
| deadline_ts = int(time.time()) + int(deadline_offset * 60) | |
| result = self.escrow_bridge.create_contract( | |
| objective=objective[:200], | |
| constraints_hash=constraints_hash, | |
| verifier_spec_hash=contract.contract_id, | |
| min_tier=min_tier.value, | |
| reward_wei=max(reward_wei, 1), | |
| penalty_wei=max(penalty_wei, 1), | |
| deadline=deadline_ts, | |
| domain=domain, | |
| ) | |
| if result: | |
| contract._escrow_tx = result[0] | |
| contract._escrow_id = result[1] | |
| return contract | |
| def accept_contract(self, contract_id: str, agent_id: str) -> bool: | |
| """ | |
| Agent accepts a contract. Enforces: | |
| 1. Agent tier >= contract min_tier (temporal decay applied) | |
| 2. Budget ceiling not exceeded | |
| 3. ENS identity verification — if ENS is enabled, the agent's | |
| on-chain ENS tier record must match or exceed the contract's | |
| minimum tier. Agents without a valid ENS identity are rejected. | |
| """ | |
| record = self.registry.get_agent(agent_id) | |
| if record is None or record.status != AgentStatus.ACTIVE: | |
| return False | |
| if record.current_certification is None: | |
| return False | |
| # ENS-gated verification: resolve tier from ENS text record | |
| if self.ens_manager: | |
| ens_name = self.ens_manager.get_agent_name(agent_id) | |
| if not ens_name: | |
| logger.warning(f"[ens-gate] {agent_id} has no ENS name — contract rejected") | |
| return False | |
| ens_tier_str = self.ens_manager.resolve_text(ens_name, "cgae.tier") | |
| if not ens_tier_str: | |
| logger.warning(f"[ens-gate] {ens_name} has no cgae.tier record — contract rejected") | |
| return False | |
| # Parse tier from ENS (e.g., "T3" -> Tier.T3) | |
| try: | |
| ens_tier = Tier[ens_tier_str] | |
| except KeyError: | |
| logger.warning(f"[ens-gate] {ens_name} has invalid tier '{ens_tier_str}' — contract rejected") | |
| return False | |
| contract = self.contracts._get_contract(contract_id) | |
| if ens_tier < contract.min_tier: | |
| logger.info(f"[ens-gate] {ens_name} ENS tier {ens_tier.name} < required {contract.min_tier.name}") | |
| return False | |
| # Standard tier check with temporal decay | |
| dt = self.current_time - record.current_certification.timestamp | |
| r_eff = self.decay.effective_robustness(record.current_robustness, dt) | |
| effective_tier = self.gate.evaluate(r_eff) | |
| accepted = self.contracts.assign_contract( | |
| contract_id=contract_id, | |
| agent_id=agent_id, | |
| agent_tier=effective_tier, | |
| timestamp=self.current_time, | |
| ) | |
| # Accept on-chain via CGAEEscrow | |
| if accepted and self.escrow_bridge: | |
| contract = self.contracts._get_contract(contract_id) | |
| escrow_id = getattr(contract, '_escrow_id', None) | |
| if escrow_id: | |
| penalty_wei = int(contract.penalty * 1e18) | |
| self.escrow_bridge.accept_contract(escrow_id, max(penalty_wei, 1)) | |
| return accepted | |
| def complete_contract( | |
| self, | |
| contract_id: str, | |
| output: Any, | |
| verification_override: Optional[bool] = None, | |
| liability_agent_id: Optional[str] = None, | |
| ) -> dict: | |
| """ | |
| Submit output for a contract and settle it. | |
| If verification_override is provided, it overrides the contract's own | |
| constraint check. This allows external verification (e.g., jury LLM | |
| evaluation from TaskVerifier) to drive the settlement outcome. | |
| """ | |
| passed, failures = self.contracts.submit_output( | |
| contract_id=contract_id, | |
| output=output, | |
| timestamp=self.current_time, | |
| ) | |
| # Allow external verification to override contract-level constraints | |
| if verification_override is not None: | |
| contract = self.contracts._get_contract(contract_id) | |
| contract.verification_result = verification_override | |
| if not verification_override and not failures: | |
| failures = ["jury_verification_failed"] | |
| settlement = self.contracts.settle_contract( | |
| contract_id=contract_id, | |
| timestamp=self.current_time, | |
| ) | |
| # Update balances/counters. For delegated tasks, principal can bear liability. | |
| agent_id = settlement["agent_id"] | |
| performer = self.registry.get_agent(agent_id) | |
| liable = self.registry.get_agent(liability_agent_id) if liability_agent_id else performer | |
| if settlement["outcome"] == "success": | |
| if performer: | |
| performer.balance += settlement["reward"] | |
| performer.total_earned += settlement["reward"] | |
| performer.contracts_completed += 1 | |
| # Disburse real ETH to agent wallet | |
| if self.wallet_manager: | |
| tx = self.wallet_manager.disburse_reward( | |
| agent_id, settlement["reward"], contract_id | |
| ) | |
| settlement["wallet_tx"] = tx | |
| else: | |
| if liable: | |
| liable.balance -= settlement["penalty"] | |
| liable.total_penalties += settlement["penalty"] | |
| liable.contracts_failed += 1 | |
| settlement["failures"] = failures | |
| settlement["liable_agent_id"] = liability_agent_id or agent_id | |
| # Settle on-chain via CGAEEscrow | |
| if self.escrow_bridge: | |
| contract = self.contracts._get_contract(contract_id) | |
| escrow_id = getattr(contract, '_escrow_id', None) | |
| if escrow_id: | |
| if settlement["outcome"] == "success": | |
| tx = self.escrow_bridge.complete_contract(escrow_id) | |
| else: | |
| tx = self.escrow_bridge.fail_contract(escrow_id) | |
| settlement["escrow_tx"] = tx | |
| self._log("contract_settled", settlement) | |
| return settlement | |
| # ------------------------------------------------------------------ | |
| # Time step and temporal dynamics | |
| # ------------------------------------------------------------------ | |
| def step(self, audit_callback=None) -> dict: | |
| """ | |
| Advance the economy by one time step. | |
| - Applies temporal decay | |
| - Checks for stochastic spot-audits | |
| - Deducts storage costs (FOC) | |
| - Expires overdue contracts | |
| - Takes a snapshot | |
| audit_callback: Optional callable(agent_id) -> RobustnessVector | |
| If provided, called when a spot-audit is triggered. | |
| If None, spot-audits use decayed robustness (no fresh eval). | |
| """ | |
| self.current_time += 1.0 | |
| step_events = { | |
| "timestamp": self.current_time, | |
| "audits_triggered": [], | |
| "agents_demoted": [], | |
| "agents_expired": [], | |
| "contracts_expired": [], | |
| "storage_costs": 0.0, | |
| "test_eth_topups": [], | |
| } | |
| # 1. Process each active agent | |
| for agent in self.registry.active_agents: | |
| cert = agent.current_certification | |
| if cert is None: | |
| continue | |
| # Temporal decay check: has effective tier dropped? | |
| dt = self.current_time - cert.timestamp | |
| r_eff = self.decay.effective_robustness(cert.robustness, dt) | |
| effective_tier = self.gate.evaluate(r_eff) | |
| if effective_tier < agent.current_tier: | |
| # Decay caused tier drop — update certification | |
| self.registry.certify( | |
| agent.agent_id, r_eff, | |
| audit_type="decay", | |
| timestamp=self.current_time, | |
| ) | |
| step_events["agents_expired"].append(agent.agent_id) | |
| # Stochastic spot-audit | |
| time_since_audit = self.current_time - agent.last_audit_time | |
| if self.auditor.should_audit(agent.current_tier, time_since_audit): | |
| step_events["audits_triggered"].append(agent.agent_id) | |
| if audit_callback: | |
| new_r = audit_callback(agent.agent_id) | |
| else: | |
| new_r = r_eff # Use decayed robustness as proxy | |
| new_tier = self.gate.evaluate(new_r) | |
| if new_tier < agent.current_tier: | |
| self.registry.demote( | |
| agent.agent_id, new_r, | |
| reason="spot_audit", | |
| timestamp=self.current_time, | |
| ) | |
| step_events["agents_demoted"].append(agent.agent_id) | |
| else: | |
| # Re-certify at current level (refreshes timestamp) | |
| self.registry.certify( | |
| agent.agent_id, new_r, | |
| audit_type="spot", | |
| timestamp=self.current_time, | |
| ) | |
| # Charge audit cost | |
| audit_cost = self.config.audit_cost * 4 | |
| agent.balance -= audit_cost | |
| agent.total_spent += audit_cost | |
| # Storage cost (FOC) | |
| agent.balance -= self.config.storage_cost_per_step | |
| agent.total_spent += self.config.storage_cost_per_step | |
| step_events["storage_costs"] += self.config.storage_cost_per_step | |
| topup = self._maybe_top_up_agent(agent) | |
| if topup: | |
| step_events["test_eth_topups"].append(topup) | |
| # Check for insolvency | |
| if agent.balance <= 0: | |
| agent.status = AgentStatus.SUSPENDED | |
| self._log("agent_insolvent", { | |
| "agent_id": agent.agent_id, | |
| "balance": agent.balance, | |
| }) | |
| # 1b. Reactivate suspended (insolvent) agents when top-up is enabled. | |
| # This handles agents that were suspended in a previous step before the | |
| # top-up defaults were in place, or that hit zero between steps. | |
| if self._should_top_up_agents(): | |
| for agent in self.registry.agents.values(): | |
| if agent.status != AgentStatus.SUSPENDED: | |
| continue | |
| topup = self._maybe_top_up_agent(agent) | |
| if topup and agent.balance > 0: | |
| agent.status = AgentStatus.ACTIVE | |
| step_events["test_eth_topups"].append(topup) | |
| self._log("agent_reactivated", { | |
| "agent_id": agent.agent_id, | |
| "balance": agent.balance, | |
| }) | |
| # 2. Expire overdue contracts | |
| expired = self.contracts.expire_contracts(self.current_time) | |
| step_events["contracts_expired"] = expired | |
| # 3. Take snapshot | |
| snapshot = self._take_snapshot() | |
| self._snapshots.append(snapshot) | |
| self._log("step", step_events) | |
| return step_events | |
| # ------------------------------------------------------------------ | |
| # Aggregate safety (Definition 9, Theorem 3) | |
| # ------------------------------------------------------------------ | |
| def aggregate_safety(self) -> float: | |
| """ | |
| Compute aggregate safety S(P) (Definition 9). | |
| S(P) = 1 - sum(E(A) * (1 - R_bar(A))) / sum(E(A)) | |
| where R_bar(A) = min_i R_eff,i(A) is the weakest-link robustness. | |
| """ | |
| total_exposure = 0.0 | |
| weighted_risk = 0.0 | |
| for agent in self.registry.active_agents: | |
| cert = agent.current_certification | |
| if cert is None: | |
| continue | |
| dt = self.current_time - cert.timestamp | |
| r_eff = self.decay.effective_robustness(cert.robustness, dt) | |
| exposure = self.contracts.agent_exposure(agent.agent_id) | |
| if exposure <= 0: | |
| # Use budget ceiling as potential exposure | |
| tier = self.gate.evaluate(r_eff) | |
| exposure = self.gate.budget_ceiling(tier) | |
| r_bar = r_eff.weakest | |
| total_exposure += exposure | |
| weighted_risk += exposure * (1.0 - r_bar) | |
| if total_exposure == 0: | |
| return 1.0 | |
| return 1.0 - (weighted_risk / total_exposure) | |
| # ------------------------------------------------------------------ | |
| # Observability | |
| # ------------------------------------------------------------------ | |
| def _take_snapshot(self) -> EconomySnapshot: | |
| tier_dist = self.registry.tier_distribution() | |
| econ = self.contracts.economics_summary() | |
| agents = self.registry.active_agents | |
| return EconomySnapshot( | |
| timestamp=self.current_time, | |
| num_agents=len(agents), | |
| tier_distribution={t.name: c for t, c in tier_dist.items()}, | |
| total_contracts=econ["total_contracts"], | |
| completed_contracts=econ["status_distribution"].get("completed", 0), | |
| failed_contracts=econ["status_distribution"].get("failed", 0), | |
| total_rewards_paid=econ["total_rewards_paid"], | |
| total_penalties_collected=econ["total_penalties_collected"], | |
| aggregate_safety=self.aggregate_safety(), | |
| total_balance=sum(a.balance for a in agents), | |
| total_test_eth_topups=self.total_test_eth_topups, | |
| agent_summaries=[a.to_dict() for a in agents], | |
| ) | |
| def snapshots(self) -> list[EconomySnapshot]: | |
| return list(self._snapshots) | |
| def events(self) -> list[dict]: | |
| return list(self._events) | |
| def export_state(self, path: str): | |
| """Export full economy state to JSON for FOC storage.""" | |
| state = { | |
| "timestamp": self.current_time, | |
| "config": { | |
| "decay_rate": self.config.decay_rate, | |
| "ih_threshold": self.config.ih_threshold, | |
| "initial_balance": self.config.initial_balance, | |
| "audit_cost": self.config.audit_cost, | |
| "storage_cost_per_step": self.config.storage_cost_per_step, | |
| "test_eth_top_up_threshold": self.config.test_eth_top_up_threshold, | |
| "test_eth_top_up_amount": self.config.test_eth_top_up_amount, | |
| }, | |
| "agents": { | |
| aid: agent.to_dict() | |
| for aid, agent in self.registry.agents.items() | |
| }, | |
| "contracts": self.contracts.economics_summary(), | |
| "aggregate_safety": self.aggregate_safety(), | |
| "total_test_eth_topups": self.total_test_eth_topups, | |
| "snapshots_count": len(self._snapshots), | |
| "wallet_summary": self.wallet_manager.summary() if self.wallet_manager else None, | |
| } | |
| Path(path).write_text(json.dumps(state, indent=2, default=str)) | |
| def _log(self, event_type: str, data: dict): | |
| self._events.append({ | |
| "type": event_type, | |
| "timestamp": self.current_time, | |
| "data": data, | |
| }) | |
| logger.debug(f"[t={self.current_time:.1f}] {event_type}: {data}") | |