""" CivicAI Agent Orchestrator Coordinates the Analyst -> Policy -> Critic loop and returns the final Action and DebateRound for the environment. """ from __future__ import annotations from civicai.models import SocietyState, Action, DebateRound, Vote from agents.analyst import AnalystAgent from agents.policy import PolicyAgent from agents.critic import CriticAgent from agents.memory import AgentMemory from civicai.environment import CivicAIEnv from typing import Any class Orchestrator: def __init__(self, env: CivicAIEnv): self.env = env self.analyst = AnalystAgent() self.policy = PolicyAgent() self.critic = CriticAgent() self.memory = AgentMemory() self.debate_history = [] def reset(self, task_id: str = "stabilize_economy", max_steps: int | None = None): self.debate_history = [] return self.env.reset(task_id, max_steps) def decide_action(self, state: SocietyState) -> tuple[Action, DebateRound]: """Run the multi-agent decision loop.""" # 1. Analyst reviews state analyst_msg = self.analyst.analyze_state(state) # 2. Policy proposes action action, policy_msg = self.policy.propose_action(state, analyst_msg) # 3. Critic evaluates action critic_msg = self.critic.critique_action(state, action) # Calculate consensus consensus_score = 1.0 if critic_msg.vote == Vote.APPROVE else 0.0 # 4. Save to memory (placeholder for reward, filled later by env) context = analyst_msg.reasoning critique = critic_msg.reasoning self.memory.add_record(state.turn, context, action.model_dump(), 0.0, critique) debate_round = DebateRound( turn=state.turn, messages=[analyst_msg, policy_msg, critic_msg], final_decision="Action Approved" if critic_msg.vote == Vote.APPROVE else "Action Forced Despite Critic Rejection", consensus_score=consensus_score ) return action, debate_round def run_step(self, obs=None) -> tuple[Any, float, bool, dict[str, Any]]: state = self.env.current_state action, debate = self.decide_action(state) self.debate_history.append(debate) obs, reward, done, info = self.env.step(action) info["debate"] = debate.model_dump() return obs, reward, done, info def run_episode(self, task_id: str, max_steps: int | None = None) -> dict[str, Any]: obs = self.reset(task_id, max_steps) total_reward = 0.0 done = False steps = 0 reward_curve: list[float] = [] step_log: list[dict[str, Any]] = [] while not done: obs, reward, done, info = self.run_step() total_reward += reward steps += 1 reward_curve.append(round(reward, 4)) step_log.append({ "turn": obs.turn, "reward": round(reward, 4), "obs": obs.model_dump(), "info": self.env._sanitize_info(info) if hasattr(self.env, "_sanitize_info") else info, }) avg_reward = total_reward / max(1, steps) emergent_summary = self.env.tracker.get_summary() if self.env.tracker else {} return { "task_id": task_id, "total_reward": round(total_reward, 4), "avg_reward": round(avg_reward, 4), "steps": steps, "reward_curve": reward_curve, "step_log": step_log, "final_observation": obs.model_dump(), "emergent_summary": emergent_summary, }