CivicAI / agents /orchestrator.py
mahammadaftab's picture
Initial Uodated
7415e01
"""
CivicAI Agent Orchestrator
Coordinates the Analyst -> Policy -> Critic loop and returns
the final Action and DebateRound for the environment.
"""
from __future__ import annotations
from civicai.models import SocietyState, Action, DebateRound, Vote
from agents.analyst import AnalystAgent
from agents.policy import PolicyAgent
from agents.critic import CriticAgent
from agents.memory import AgentMemory
from civicai.environment import CivicAIEnv
from typing import Any
class Orchestrator:
    """Coordinate one Analyst -> Policy -> Critic debate per environment step.

    The orchestrator owns one instance of each agent plus an ``AgentMemory``,
    and keeps every ``DebateRound`` produced by ``run_step`` since the last
    ``reset`` in ``debate_history``.
    """

    def __init__(self, env: CivicAIEnv):
        """Bind the orchestrator to *env* and create fresh agents and memory."""
        self.env = env
        self.analyst = AnalystAgent()
        self.policy = PolicyAgent()
        self.critic = CriticAgent()
        self.memory = AgentMemory()
        # Appended to by run_step; cleared by reset.
        self.debate_history: list[DebateRound] = []

    def reset(self, task_id: str = "stabilize_economy", max_steps: int | None = None):
        """Clear the debate history and reset the environment.

        Args:
            task_id: Forwarded to ``env.reset``.
            max_steps: Forwarded to ``env.reset``.

        Returns:
            Whatever ``env.reset(task_id, max_steps)`` returns.
        """
        # NOTE(review): self.memory is not cleared here, so records from
        # earlier episodes remain - confirm that this is intended.
        self.debate_history = []
        return self.env.reset(task_id, max_steps)

    def decide_action(self, state: SocietyState) -> tuple[Action, DebateRound]:
        """Run the multi-agent decision loop for *state*.

        The analyst reviews the state, the policy agent proposes an action
        from that analysis, and the critic votes on the proposal. The proposed
        action is returned whether or not the critic approves; the vote only
        affects the ``final_decision`` text and the ``consensus_score``
        (1.0 on approval, 0.0 otherwise).

        Returns:
            The proposed action and the ``DebateRound`` holding the three
            agent messages.
        """
        # 1. Analyst reviews state
        analyst_msg = self.analyst.analyze_state(state)
        # 2. Policy proposes action
        action, policy_msg = self.policy.propose_action(state, analyst_msg)
        # 3. Critic evaluates action
        critic_msg = self.critic.critique_action(state, action)

        # Evaluate the vote once; it drives both the score and the label.
        approved = critic_msg.vote == Vote.APPROVE

        # 4. Save to memory. The reward is unknown at decision time, so 0.0
        # is stored as a placeholder.
        # NOTE(review): nothing in this class updates the record once
        # env.step returns the real reward - confirm it happens elsewhere.
        self.memory.add_record(
            state.turn,
            analyst_msg.reasoning,
            action.model_dump(),
            0.0,
            critic_msg.reasoning,
        )

        debate_round = DebateRound(
            turn=state.turn,
            messages=[analyst_msg, policy_msg, critic_msg],
            final_decision=(
                "Action Approved"
                if approved
                else "Action Forced Despite Critic Rejection"
            ),
            consensus_score=1.0 if approved else 0.0,
        )
        return action, debate_round

    def run_step(self, obs=None) -> tuple[Any, float, bool, dict[str, Any]]:
        """Decide on one action and apply it to the environment.

        Args:
            obs: Accepted but ignored; the state is always read from
                ``env.current_state``.

        Returns:
            The ``(obs, reward, done, info)`` tuple from ``env.step``, with
            the serialized debate stored under ``info["debate"]``.
        """
        state = self.env.current_state
        action, debate = self.decide_action(state)
        self.debate_history.append(debate)
        obs, reward, done, info = self.env.step(action)
        info["debate"] = debate.model_dump()
        return obs, reward, done, info

    def run_episode(self, task_id: str, max_steps: int | None = None) -> dict[str, Any]:
        """Reset the environment and step until it reports ``done``.

        Args:
            task_id: Forwarded to ``reset``.
            max_steps: Forwarded to ``reset``.

        Returns:
            A dict with the task id, total and average reward (rounded to
            four decimals), step count, per-step reward curve, per-step log,
            the final observation dump, and the tracker summary. The summary
            is ``{}`` when the environment has no (truthy) ``tracker``.
        """
        obs = self.reset(task_id, max_steps)
        total_reward = 0.0
        done = False
        steps = 0
        reward_curve: list[float] = []
        step_log: list[dict[str, Any]] = []

        # Both env hooks are optional, so probe for each of them the same way.
        sanitize = getattr(self.env, "_sanitize_info", None)

        while not done:
            obs, reward, done, info = self.run_step()
            total_reward += reward
            steps += 1
            rounded = round(reward, 4)
            reward_curve.append(rounded)
            step_log.append({
                "turn": obs.turn,
                "reward": rounded,
                "obs": obs.model_dump(),
                "info": sanitize(info) if sanitize is not None else info,
            })

        # max(1, steps) guards the division; the loop always runs at least once.
        avg_reward = total_reward / max(1, steps)

        # A missing tracker attribute is treated like a falsy one instead of
        # raising AttributeError after the episode has already completed.
        tracker = getattr(self.env, "tracker", None)
        emergent_summary = tracker.get_summary() if tracker else {}

        return {
            "task_id": task_id,
            "total_reward": round(total_reward, 4),
            "avg_reward": round(avg_reward, 4),
            "steps": steps,
            "reward_curve": reward_curve,
            "step_log": step_log,
            "final_observation": obs.model_dump(),
            "emergent_summary": emergent_summary,
        }