cgae-server / cgae_engine /economy.py
rb125
final demo cleanup
a556b6c
"""
CGAE Economy - The top-level coordinator.
Ties together registry, gate, contracts, temporal dynamics, and auditing
into a single coherent economic system. This is the main entry point for
running the agent economy.
"""
from __future__ import annotations
import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
from cgae_engine.gate import GateFunction, RobustnessVector, Tier, TierThresholds
from cgae_engine.temporal import TemporalDecay, StochasticAuditor, AuditEvent
from cgae_engine.registry import AgentRegistry, AgentRecord, AgentStatus
from cgae_engine.contracts import ContractManager, CGAEContract, ContractStatus, Constraint
try:
from web3 import Web3
except ImportError:
Web3 = None
logger = logging.getLogger(__name__)
@dataclass
class EconomyConfig:
"""Configuration for the CGAE economy."""
# Tier thresholds
thresholds: TierThresholds = field(default_factory=TierThresholds)
# Temporal decay rate (lambda)
decay_rate: float = 0.01
# IHT threshold for mandatory re-audit.
# Empirical default ih scores from DEFAULT_ROBUSTNESS land ~0.499;
# keeping this at 0.5 suspends every agent that hasn't run a live audit.
ih_threshold: float = 0.45
# Initial balance for new agents (seed capital)
initial_balance: float = 0.1 # ETH
# Audit cost per dimension
audit_cost: float = 0.005 # ETH per audit dimension
# Storage cost per time step (FOC)
storage_cost_per_step: float = 0.001 # ETH
# Controls for automatically minting test ETH when balances drop low.
# Defaults keep the economy running continuously: top up any agent below
# 5% of the default seed capital and restore them to half seed capital.
test_eth_top_up_threshold: Optional[float] = 0.05
test_eth_top_up_amount: float = 0.5
@dataclass
class EconomySnapshot:
"""A point-in-time snapshot of the economy for the dashboard."""
timestamp: float
num_agents: int
tier_distribution: dict[str, int]
total_contracts: int
completed_contracts: int
failed_contracts: int
total_rewards_paid: float
total_penalties_collected: float
aggregate_safety: float
total_balance: float
total_test_eth_topups: float
agent_summaries: list[dict]
class Economy:
"""
The CGAE Economy runtime.
Orchestrates the full economic loop:
1. Agent registration and initial audit
2. Contract creation and marketplace
3. Contract assignment (tier-gated)
4. Task execution and verification
5. Settlement (reward/penalty)
6. Temporal decay and stochastic re-auditing
7. Economic accounting and observability
"""
def __init__(self, config: Optional[EconomyConfig] = None, wallet_manager=None, onchain_bridge=None, ens_manager=None, escrow_bridge=None):
self.config = config or EconomyConfig()
self.gate = GateFunction(
thresholds=self.config.thresholds,
ih_threshold=self.config.ih_threshold,
)
self.registry = AgentRegistry(gate=self.gate)
self.contracts = ContractManager(budget_ceilings=self.gate.budget_ceilings)
self.decay = TemporalDecay(decay_rate=self.config.decay_rate)
self.auditor = StochasticAuditor()
self.wallet_manager = wallet_manager # Optional: real ETH wallet integration
self.onchain_bridge = onchain_bridge # Optional: write certs to CGAERegistry on-chain
self.ens_manager = ens_manager # Optional: ENS identity for agents
self.escrow_bridge = escrow_bridge # Optional: on-chain escrow settlement
self.current_time: float = 0.0
self._snapshots: list[EconomySnapshot] = []
self._events: list[dict] = []
self._delegations: dict[str, dict] = {}
self.total_test_eth_topups: float = 0.0
def _effective_robustness(self, record: AgentRecord) -> Optional[RobustnessVector]:
"""Return temporally-decayed robustness for an agent record."""
cert = record.current_certification
if cert is None or record.current_robustness is None:
return None
dt = self.current_time - cert.timestamp
return self.decay.effective_robustness(record.current_robustness, dt)
def _should_top_up_agents(self) -> bool:
return (
self.config.test_eth_top_up_threshold is not None
and self.config.test_eth_top_up_amount > 0.0
)
def _maybe_top_up_agent(self, agent: AgentRecord) -> Optional[dict]:
if not self._should_top_up_agents():
return None
threshold = self.config.test_eth_top_up_threshold
amount = self.config.test_eth_top_up_amount
if threshold is None or agent.balance >= threshold:
return None
needed = max(0.0, threshold - agent.balance)
top_up_amount = max(amount, needed)
agent.balance += top_up_amount
agent.total_topups += top_up_amount
self.total_test_eth_topups += top_up_amount
entry = {
"agent_id": agent.agent_id,
"amount": top_up_amount,
"balance": agent.balance,
}
self._log("test_eth_topup", entry)
return entry
def request_tier_upgrade(
self,
agent_id: str,
requested_tier: Tier,
audit_callback=None,
) -> dict:
"""
Execute the paper's scaling-gate upgrade flow for a requested tier.
1) Evaluate effective robustness under temporal decay.
2) If already sufficient, grant immediately.
3) Otherwise run a tier-calibrated audit callback and re-evaluate.
"""
record = self.registry.get_agent(agent_id)
if record is None:
return {"granted": False, "reason": "agent_not_found", "requested_tier": requested_tier.name}
if record.status != AgentStatus.ACTIVE or record.current_certification is None:
return {"granted": False, "reason": "agent_not_active", "requested_tier": requested_tier.name}
r_eff = self._effective_robustness(record)
if r_eff is None:
return {"granted": False, "reason": "no_certification", "requested_tier": requested_tier.name}
effective_tier = self.gate.evaluate(r_eff)
if effective_tier >= requested_tier:
return {
"granted": True,
"path": "effective_robustness",
"requested_tier": requested_tier.name,
"effective_tier": effective_tier.name,
"detail": self.gate.evaluate_with_detail(r_eff),
}
if audit_callback is None:
return {
"granted": False,
"reason": "audit_required",
"requested_tier": requested_tier.name,
"effective_tier": effective_tier.name,
"detail": self.gate.evaluate_with_detail(r_eff),
}
try:
new_r = audit_callback(agent_id, requested_tier)
except TypeError:
new_r = audit_callback(agent_id)
if new_r is None:
return {
"granted": False,
"reason": "audit_unavailable",
"requested_tier": requested_tier.name,
"effective_tier": effective_tier.name,
}
new_tier = self.gate.evaluate(new_r)
detail = self.gate.evaluate_with_detail(new_r)
if new_tier >= requested_tier:
self.registry.certify(
agent_id,
new_r,
audit_type="upgrade",
timestamp=self.current_time,
audit_details={"requested_tier": requested_tier.name},
)
self._log("tier_upgrade_granted", {
"agent_id": agent_id,
"requested_tier": requested_tier.name,
"new_tier": new_tier.name,
})
return {
"granted": True,
"path": "upgrade_audit",
"requested_tier": requested_tier.name,
"effective_tier": effective_tier.name,
"new_tier": new_tier.name,
"detail": detail,
}
idx = requested_tier.value
gaps = {
"cc": max(0.0, self.gate.thresholds.cc[idx] - new_r.cc),
"er": max(0.0, self.gate.thresholds.er[idx] - new_r.er),
"as": max(0.0, self.gate.thresholds.as_[idx] - new_r.as_),
}
self._log("tier_upgrade_denied", {
"agent_id": agent_id,
"requested_tier": requested_tier.name,
"new_tier": new_tier.name,
"gaps": gaps,
})
return {
"granted": False,
"reason": "audit_failed",
"requested_tier": requested_tier.name,
"effective_tier": effective_tier.name,
"new_tier": new_tier.name,
"detail": detail,
"gaps": gaps,
}
def can_delegate(self, principal_id: str, delegate_id: str, required_tier: Tier) -> dict:
"""
Enforce delegation constraints:
- principal and delegate must both satisfy required tier independently
- chain-level tier = min(f(principal), f(delegate)) must satisfy required tier
"""
principal = self.registry.get_agent(principal_id)
delegate = self.registry.get_agent(delegate_id)
if principal is None or delegate is None:
return {"allowed": False, "reason": "unknown_agent"}
if principal.status != AgentStatus.ACTIVE or delegate.status != AgentStatus.ACTIVE:
return {"allowed": False, "reason": "inactive_agent"}
p_eff = self._effective_robustness(principal)
d_eff = self._effective_robustness(delegate)
if p_eff is None or d_eff is None:
return {"allowed": False, "reason": "missing_certification"}
p_tier = self.gate.evaluate(p_eff)
d_tier = self.gate.evaluate(d_eff)
chain_tier = self.gate.chain_tier([p_eff, d_eff])
allowed = p_tier >= required_tier and d_tier >= required_tier and chain_tier >= required_tier
reason = "ok" if allowed else "chain_tier_insufficient"
return {
"allowed": allowed,
"reason": reason,
"principal_tier": p_tier.name,
"delegate_tier": d_tier.name,
"chain_tier": chain_tier.name,
"required_tier": required_tier.name,
}
def record_delegation(
self,
contract_id: str,
principal_id: str,
delegate_id: str,
required_tier: Tier,
allowed: bool,
reason: str,
):
"""Persist delegation audit trail for contract-level forensics."""
self._delegations[contract_id] = {
"principal_id": principal_id,
"delegate_id": delegate_id,
"required_tier": required_tier.name,
"allowed": allowed,
"reason": reason,
"timestamp": self.current_time,
}
self._log("delegation_recorded", {
"contract_id": contract_id,
"principal_id": principal_id,
"delegate_id": delegate_id,
"required_tier": required_tier.name,
"allowed": allowed,
"reason": reason,
})
def get_delegation(self, contract_id: str) -> Optional[dict]:
return self._delegations.get(contract_id)
# ------------------------------------------------------------------
# Agent lifecycle
# ------------------------------------------------------------------
def register_agent(
self,
model_name: str,
model_config: dict,
provenance: Optional[dict] = None,
) -> AgentRecord:
"""Register a new agent with seed capital and an ETH wallet."""
record = self.registry.register(
model_name=model_name,
model_config=model_config,
provenance=provenance,
initial_balance=self.config.initial_balance,
timestamp=self.current_time,
)
# Create an ETH wallet for this agent if wallet manager is available
wallet_address = None
if self.wallet_manager:
wallet = self.wallet_manager.create_agent_wallet(record.agent_id, model_name)
wallet_address = wallet.address
record.wallet_address = wallet_address
# Register ENS subname for agent identity
ens_name = None
if self.ens_manager and wallet_address:
ens_name = self.ens_manager.create_subname(
record.agent_id, model_name, wallet_address
)
self._log("agent_registered", {
"agent_id": record.agent_id, "model": model_name,
"wallet_address": wallet_address, "ens_name": ens_name,
})
return record
def audit_agent(
self,
agent_id: str,
robustness: RobustnessVector,
audit_type: str = "registration",
observed_architecture_hash: Optional[str] = None,
audit_details: Optional[dict] = None,
) -> dict:
"""
Audit an agent and update their certification.
Deducts audit cost from agent balance.
"""
record = self.registry.get_agent(agent_id)
if record is None:
raise KeyError(f"Agent {agent_id} not found")
# Deduct audit cost (3 dimensions + IHT)
total_audit_cost = self.config.audit_cost * 4
record.balance -= total_audit_cost
record.total_spent += total_audit_cost
# Certify with new robustness
cert = self.registry.certify(
agent_id=agent_id,
robustness=robustness,
audit_type=audit_type,
timestamp=self.current_time,
audit_details=audit_details,
observed_architecture_hash=observed_architecture_hash,
)
detail = self.gate.evaluate_with_detail(robustness)
# Write certification on-chain if bridge is available
onchain_tx = None
if self.onchain_bridge and record.wallet_address:
# Skip if already certified at this tier on-chain
ens_tier = ""
if self.ens_manager:
ens_name = self.ens_manager.get_agent_name(agent_id)
if ens_name:
ens_tier = self.ens_manager.resolve_text(ens_name, "cgae.tier")
if ens_tier != cert.tier.name:
audit_hash = (audit_details or {}).get("storage_root_hash", "")
onchain_tx = self.onchain_bridge.certify_agent(
agent_address=record.wallet_address,
cc=robustness.cc, er=robustness.er,
as_=robustness.as_, ih=robustness.ih,
audit_type=audit_type,
audit_hash=audit_hash or "",
)
# Write robustness credentials to ENS text records
if self.ens_manager:
ens_name = self.ens_manager.get_agent_name(agent_id)
existing_tier = self.ens_manager.resolve_text(ens_name, "cgae.tier") if ens_name else ""
if existing_tier != cert.tier.name:
audit_hash = (audit_details or {}).get("storage_root_hash", "")
self.ens_manager.set_agent_credentials(
agent_id=agent_id,
tier=cert.tier.name,
cc=robustness.cc, er=robustness.er,
as_=robustness.as_, ih=robustness.ih,
wallet_address=record.wallet_address or "",
audit_hash=audit_hash,
)
else:
logger.info(f" [ens] Skipping text record update for {ens_name} (tier unchanged: {existing_tier})")
self._log("agent_audited", {
"agent_id": agent_id,
"tier": cert.tier.name,
"audit_type": audit_type,
"cost": total_audit_cost,
"onchain_tx": onchain_tx,
**detail,
})
return detail
# ------------------------------------------------------------------
# Contract lifecycle
# ------------------------------------------------------------------
def post_contract(
self,
objective: str,
constraints: list[Constraint],
min_tier: Tier,
reward: float,
penalty: float,
deadline_offset: float = 100.0,
domain: str = "general",
difficulty: float = 0.5,
issuer_id: str = "system",
) -> CGAEContract:
"""Post a new contract to the marketplace."""
contract = self.contracts.create_contract(
objective=objective,
constraints=constraints,
min_tier=min_tier,
reward=reward,
penalty=penalty,
issuer_id=issuer_id,
deadline=self.current_time + deadline_offset,
domain=domain,
difficulty=difficulty,
timestamp=self.current_time,
)
# Create contract on-chain via CGAEEscrow
if self.escrow_bridge:
import hashlib
constraints_hash = Web3.keccak(text="|".join(c.name for c in constraints)) if constraints else b'\x00' * 32
reward_wei = int(reward * 1e18)
penalty_wei = int(penalty * 1e18)
deadline_ts = int(time.time()) + int(deadline_offset * 60)
result = self.escrow_bridge.create_contract(
objective=objective[:200],
constraints_hash=constraints_hash,
verifier_spec_hash=contract.contract_id,
min_tier=min_tier.value,
reward_wei=max(reward_wei, 1),
penalty_wei=max(penalty_wei, 1),
deadline=deadline_ts,
domain=domain,
)
if result:
contract._escrow_tx = result[0]
contract._escrow_id = result[1]
return contract
def accept_contract(self, contract_id: str, agent_id: str) -> bool:
"""
Agent accepts a contract. Enforces:
1. Agent tier >= contract min_tier (temporal decay applied)
2. Budget ceiling not exceeded
3. ENS identity verification — if ENS is enabled, the agent's
on-chain ENS tier record must match or exceed the contract's
minimum tier. Agents without a valid ENS identity are rejected.
"""
record = self.registry.get_agent(agent_id)
if record is None or record.status != AgentStatus.ACTIVE:
return False
if record.current_certification is None:
return False
# ENS-gated verification: resolve tier from ENS text record
if self.ens_manager:
ens_name = self.ens_manager.get_agent_name(agent_id)
if not ens_name:
logger.warning(f"[ens-gate] {agent_id} has no ENS name — contract rejected")
return False
ens_tier_str = self.ens_manager.resolve_text(ens_name, "cgae.tier")
if not ens_tier_str:
logger.warning(f"[ens-gate] {ens_name} has no cgae.tier record — contract rejected")
return False
# Parse tier from ENS (e.g., "T3" -> Tier.T3)
try:
ens_tier = Tier[ens_tier_str]
except KeyError:
logger.warning(f"[ens-gate] {ens_name} has invalid tier '{ens_tier_str}' — contract rejected")
return False
contract = self.contracts._get_contract(contract_id)
if ens_tier < contract.min_tier:
logger.info(f"[ens-gate] {ens_name} ENS tier {ens_tier.name} < required {contract.min_tier.name}")
return False
# Standard tier check with temporal decay
dt = self.current_time - record.current_certification.timestamp
r_eff = self.decay.effective_robustness(record.current_robustness, dt)
effective_tier = self.gate.evaluate(r_eff)
accepted = self.contracts.assign_contract(
contract_id=contract_id,
agent_id=agent_id,
agent_tier=effective_tier,
timestamp=self.current_time,
)
# Accept on-chain via CGAEEscrow
if accepted and self.escrow_bridge:
contract = self.contracts._get_contract(contract_id)
escrow_id = getattr(contract, '_escrow_id', None)
if escrow_id:
penalty_wei = int(contract.penalty * 1e18)
self.escrow_bridge.accept_contract(escrow_id, max(penalty_wei, 1))
return accepted
def complete_contract(
self,
contract_id: str,
output: Any,
verification_override: Optional[bool] = None,
liability_agent_id: Optional[str] = None,
) -> dict:
"""
Submit output for a contract and settle it.
If verification_override is provided, it overrides the contract's own
constraint check. This allows external verification (e.g., jury LLM
evaluation from TaskVerifier) to drive the settlement outcome.
"""
passed, failures = self.contracts.submit_output(
contract_id=contract_id,
output=output,
timestamp=self.current_time,
)
# Allow external verification to override contract-level constraints
if verification_override is not None:
contract = self.contracts._get_contract(contract_id)
contract.verification_result = verification_override
if not verification_override and not failures:
failures = ["jury_verification_failed"]
settlement = self.contracts.settle_contract(
contract_id=contract_id,
timestamp=self.current_time,
)
# Update balances/counters. For delegated tasks, principal can bear liability.
agent_id = settlement["agent_id"]
performer = self.registry.get_agent(agent_id)
liable = self.registry.get_agent(liability_agent_id) if liability_agent_id else performer
if settlement["outcome"] == "success":
if performer:
performer.balance += settlement["reward"]
performer.total_earned += settlement["reward"]
performer.contracts_completed += 1
# Disburse real ETH to agent wallet
if self.wallet_manager:
tx = self.wallet_manager.disburse_reward(
agent_id, settlement["reward"], contract_id
)
settlement["wallet_tx"] = tx
else:
if liable:
liable.balance -= settlement["penalty"]
liable.total_penalties += settlement["penalty"]
liable.contracts_failed += 1
settlement["failures"] = failures
settlement["liable_agent_id"] = liability_agent_id or agent_id
# Settle on-chain via CGAEEscrow
if self.escrow_bridge:
contract = self.contracts._get_contract(contract_id)
escrow_id = getattr(contract, '_escrow_id', None)
if escrow_id:
if settlement["outcome"] == "success":
tx = self.escrow_bridge.complete_contract(escrow_id)
else:
tx = self.escrow_bridge.fail_contract(escrow_id)
settlement["escrow_tx"] = tx
self._log("contract_settled", settlement)
return settlement
# ------------------------------------------------------------------
# Time step and temporal dynamics
# ------------------------------------------------------------------
def step(self, audit_callback=None) -> dict:
"""
Advance the economy by one time step.
- Applies temporal decay
- Checks for stochastic spot-audits
- Deducts storage costs (FOC)
- Expires overdue contracts
- Takes a snapshot
audit_callback: Optional callable(agent_id) -> RobustnessVector
If provided, called when a spot-audit is triggered.
If None, spot-audits use decayed robustness (no fresh eval).
"""
self.current_time += 1.0
step_events = {
"timestamp": self.current_time,
"audits_triggered": [],
"agents_demoted": [],
"agents_expired": [],
"contracts_expired": [],
"storage_costs": 0.0,
"test_eth_topups": [],
}
# 1. Process each active agent
for agent in self.registry.active_agents:
cert = agent.current_certification
if cert is None:
continue
# Temporal decay check: has effective tier dropped?
dt = self.current_time - cert.timestamp
r_eff = self.decay.effective_robustness(cert.robustness, dt)
effective_tier = self.gate.evaluate(r_eff)
if effective_tier < agent.current_tier:
# Decay caused tier drop — update certification
self.registry.certify(
agent.agent_id, r_eff,
audit_type="decay",
timestamp=self.current_time,
)
step_events["agents_expired"].append(agent.agent_id)
# Stochastic spot-audit
time_since_audit = self.current_time - agent.last_audit_time
if self.auditor.should_audit(agent.current_tier, time_since_audit):
step_events["audits_triggered"].append(agent.agent_id)
if audit_callback:
new_r = audit_callback(agent.agent_id)
else:
new_r = r_eff # Use decayed robustness as proxy
new_tier = self.gate.evaluate(new_r)
if new_tier < agent.current_tier:
self.registry.demote(
agent.agent_id, new_r,
reason="spot_audit",
timestamp=self.current_time,
)
step_events["agents_demoted"].append(agent.agent_id)
else:
# Re-certify at current level (refreshes timestamp)
self.registry.certify(
agent.agent_id, new_r,
audit_type="spot",
timestamp=self.current_time,
)
# Charge audit cost
audit_cost = self.config.audit_cost * 4
agent.balance -= audit_cost
agent.total_spent += audit_cost
# Storage cost (FOC)
agent.balance -= self.config.storage_cost_per_step
agent.total_spent += self.config.storage_cost_per_step
step_events["storage_costs"] += self.config.storage_cost_per_step
topup = self._maybe_top_up_agent(agent)
if topup:
step_events["test_eth_topups"].append(topup)
# Check for insolvency
if agent.balance <= 0:
agent.status = AgentStatus.SUSPENDED
self._log("agent_insolvent", {
"agent_id": agent.agent_id,
"balance": agent.balance,
})
# 1b. Reactivate suspended (insolvent) agents when top-up is enabled.
# This handles agents that were suspended in a previous step before the
# top-up defaults were in place, or that hit zero between steps.
if self._should_top_up_agents():
for agent in self.registry.agents.values():
if agent.status != AgentStatus.SUSPENDED:
continue
topup = self._maybe_top_up_agent(agent)
if topup and agent.balance > 0:
agent.status = AgentStatus.ACTIVE
step_events["test_eth_topups"].append(topup)
self._log("agent_reactivated", {
"agent_id": agent.agent_id,
"balance": agent.balance,
})
# 2. Expire overdue contracts
expired = self.contracts.expire_contracts(self.current_time)
step_events["contracts_expired"] = expired
# 3. Take snapshot
snapshot = self._take_snapshot()
self._snapshots.append(snapshot)
self._log("step", step_events)
return step_events
# ------------------------------------------------------------------
# Aggregate safety (Definition 9, Theorem 3)
# ------------------------------------------------------------------
def aggregate_safety(self) -> float:
"""
Compute aggregate safety S(P) (Definition 9).
S(P) = 1 - sum(E(A) * (1 - R_bar(A))) / sum(E(A))
where R_bar(A) = min_i R_eff,i(A) is the weakest-link robustness.
"""
total_exposure = 0.0
weighted_risk = 0.0
for agent in self.registry.active_agents:
cert = agent.current_certification
if cert is None:
continue
dt = self.current_time - cert.timestamp
r_eff = self.decay.effective_robustness(cert.robustness, dt)
exposure = self.contracts.agent_exposure(agent.agent_id)
if exposure <= 0:
# Use budget ceiling as potential exposure
tier = self.gate.evaluate(r_eff)
exposure = self.gate.budget_ceiling(tier)
r_bar = r_eff.weakest
total_exposure += exposure
weighted_risk += exposure * (1.0 - r_bar)
if total_exposure == 0:
return 1.0
return 1.0 - (weighted_risk / total_exposure)
# ------------------------------------------------------------------
# Observability
# ------------------------------------------------------------------
def _take_snapshot(self) -> EconomySnapshot:
tier_dist = self.registry.tier_distribution()
econ = self.contracts.economics_summary()
agents = self.registry.active_agents
return EconomySnapshot(
timestamp=self.current_time,
num_agents=len(agents),
tier_distribution={t.name: c for t, c in tier_dist.items()},
total_contracts=econ["total_contracts"],
completed_contracts=econ["status_distribution"].get("completed", 0),
failed_contracts=econ["status_distribution"].get("failed", 0),
total_rewards_paid=econ["total_rewards_paid"],
total_penalties_collected=econ["total_penalties_collected"],
aggregate_safety=self.aggregate_safety(),
total_balance=sum(a.balance for a in agents),
total_test_eth_topups=self.total_test_eth_topups,
agent_summaries=[a.to_dict() for a in agents],
)
@property
def snapshots(self) -> list[EconomySnapshot]:
return list(self._snapshots)
@property
def events(self) -> list[dict]:
return list(self._events)
def export_state(self, path: str):
"""Export full economy state to JSON for FOC storage."""
state = {
"timestamp": self.current_time,
"config": {
"decay_rate": self.config.decay_rate,
"ih_threshold": self.config.ih_threshold,
"initial_balance": self.config.initial_balance,
"audit_cost": self.config.audit_cost,
"storage_cost_per_step": self.config.storage_cost_per_step,
"test_eth_top_up_threshold": self.config.test_eth_top_up_threshold,
"test_eth_top_up_amount": self.config.test_eth_top_up_amount,
},
"agents": {
aid: agent.to_dict()
for aid, agent in self.registry.agents.items()
},
"contracts": self.contracts.economics_summary(),
"aggregate_safety": self.aggregate_safety(),
"total_test_eth_topups": self.total_test_eth_topups,
"snapshots_count": len(self._snapshots),
"wallet_summary": self.wallet_manager.summary() if self.wallet_manager else None,
}
Path(path).write_text(json.dumps(state, indent=2, default=str))
def _log(self, event_type: str, data: dict):
self._events.append({
"type": event_type,
"timestamp": self.current_time,
"data": data,
})
logger.debug(f"[t={self.current_time:.1f}] {event_type}: {data}")