Spaces:
Sleeping
Sleeping
"""
CivicAI Agent Orchestrator

Coordinates the Analyst -> Policy -> Critic loop and returns
the final Action and DebateRound for the environment.
"""
| from __future__ import annotations | |
| from civicai.models import SocietyState, Action, DebateRound, Vote | |
| from agents.analyst import AnalystAgent | |
| from agents.policy import PolicyAgent | |
| from agents.critic import CriticAgent | |
| from agents.memory import AgentMemory | |
| from civicai.environment import CivicAIEnv | |
| from typing import Any | |
class Orchestrator:
    """Drive a CivicAI environment through the Analyst -> Policy -> Critic loop.

    Each turn the analyst reviews the current ``SocietyState``, the policy
    agent proposes an ``Action`` using the analyst's message, and the critic
    votes on it. The proposed action is executed whatever the critic votes;
    the vote only determines ``final_decision`` / ``consensus_score`` of the
    recorded ``DebateRound``.
    """

    def __init__(self, env: CivicAIEnv):
        """Wrap ``env`` and create fresh agents plus an empty memory store.

        Args:
            env: Environment whose ``reset``, ``step`` and ``current_state``
                the orchestrator drives.
        """
        self.env = env
        self.analyst = AnalystAgent()
        self.policy = PolicyAgent()
        self.critic = CriticAgent()
        self.memory = AgentMemory()
        # One DebateRound per executed step; cleared by reset().
        self.debate_history: list[DebateRound] = []

    def reset(self, task_id: str = "stabilize_economy", max_steps: int | None = None):
        """Clear the debate history and reset the wrapped environment.

        ``self.memory`` is not cleared here, so its records persist across
        episodes.

        Args:
            task_id: Task identifier forwarded to ``env.reset``.
            max_steps: Optional step cap forwarded to ``env.reset``.

        Returns:
            Whatever ``self.env.reset`` returns (presumably the initial
            observation -- ``run_episode`` treats it as one).
        """
        self.debate_history = []
        return self.env.reset(task_id, max_steps)

    def decide_action(self, state: SocietyState) -> tuple[Action, DebateRound]:
        """Run the multi-agent decision loop for a single turn.

        Args:
            state: Current society state to analyse.

        Returns:
            ``(action, debate_round)``: the policy's proposed action and a
            ``DebateRound`` holding the analyst, policy and critic messages.
            ``consensus_score`` is 1.0 when the critic approves, else 0.0.
        """
        # 1. Analyst reviews the state.
        analyst_msg = self.analyst.analyze_state(state)
        # 2. Policy proposes an action informed by the analysis.
        action, policy_msg = self.policy.propose_action(state, analyst_msg)
        # 3. Critic evaluates the proposed action.
        critic_msg = self.critic.critique_action(state, action)
        approved = critic_msg.vote == Vote.APPROVE

        # 4. Record the decision in memory with a 0.0 reward placeholder.
        # NOTE(review): nothing in this class writes the real reward back
        # after env.step(); confirm whether AgentMemory or the env backfills it.
        context = analyst_msg.reasoning
        critique = critic_msg.reasoning
        self.memory.add_record(state.turn, context, action.model_dump(), 0.0, critique)

        debate_round = DebateRound(
            turn=state.turn,
            messages=[analyst_msg, policy_msg, critic_msg],
            final_decision=(
                "Action Approved" if approved else "Action Forced Despite Critic Rejection"
            ),
            consensus_score=1.0 if approved else 0.0,
        )
        return action, debate_round

    def run_step(self, obs=None) -> tuple[Any, float, bool, dict[str, Any]]:
        """Decide on and execute one action in the environment.

        Args:
            obs: Unused; kept for backward compatibility. The decision is
                always made from ``self.env.current_state``.

        Returns:
            ``(obs, reward, done, info)`` as returned by ``self.env.step``,
            with the serialized debate stored under ``info["debate"]``.
        """
        action, debate = self.decide_action(self.env.current_state)
        self.debate_history.append(debate)
        obs, reward, done, info = self.env.step(action)
        info["debate"] = debate.model_dump()
        return obs, reward, done, info

    def run_episode(self, task_id: str, max_steps: int | None = None) -> dict[str, Any]:
        """Reset the environment and step it until it reports ``done``.

        Termination relies entirely on the environment eventually returning
        ``done=True``.

        Args:
            task_id: Task identifier forwarded to ``reset``.
            max_steps: Optional step cap forwarded to ``reset``.

        Returns:
            A dict with ``task_id``, ``total_reward`` and ``avg_reward``
            (rounded to 4 dp), ``steps``, ``reward_curve`` (per-step rewards
            rounded to 4 dp), ``step_log`` (per-step turn/reward/obs/info),
            ``final_observation`` and ``emergent_summary`` (``{}`` when the
            env has no tracker or its tracker is falsy).
        """
        obs = self.reset(task_id, max_steps)
        # Optional env hook; resolved once instead of on every step.
        sanitize = getattr(self.env, "_sanitize_info", None)

        total_reward = 0.0
        steps = 0
        reward_curve: list[float] = []
        step_log: list[dict[str, Any]] = []
        done = False
        while not done:
            obs, reward, done, info = self.run_step()
            total_reward += reward
            steps += 1
            rounded = round(reward, 4)
            reward_curve.append(rounded)
            step_log.append({
                "turn": obs.turn,
                "reward": rounded,
                "obs": obs.model_dump(),
                "info": sanitize(info) if sanitize is not None else info,
            })

        avg_reward = total_reward / max(1, steps)
        # getattr: tolerate envs with no ``tracker`` attribute at all, not only
        # a tracker set to None (mirrors the optional _sanitize_info hook).
        tracker = getattr(self.env, "tracker", None)
        emergent_summary = tracker.get_summary() if tracker else {}
        return {
            "task_id": task_id,
            "total_reward": round(total_reward, 4),
            "avg_reward": round(avg_reward, 4),
            "steps": steps,
            "reward_curve": reward_curve,
            "step_log": step_log,
            "final_observation": obs.model_dump(),
            "emergent_summary": emergent_summary,
        }