# negotiation-openenv / env_wrapper.py
"""
Negotiation Environment Wrapper β€” OpenEnv Compliant
Implements: reset(), step(), state()
Typed models via Pydantic for Observation, Action, Reward
"""
import random
from typing import List, Dict, Any
from pydantic import BaseModel, Field
# ─────────────────────────────────────────────
# OpenEnv Typed Models
# ─────────────────────────────────────────────
class Observation(BaseModel):
"""Observable state visible to the agent."""
agent_value: int = Field(description="The agent's private valuation/target value for the deal")
current_offer: int = Field(description="Current price on the table")
    round: int = Field(description="Current round number (0 at reset, incremented each step)")
max_rounds: int = Field(description="Maximum allowed rounds")
role: str = Field(description="Agent role: 'buyer' or 'seller'")
    last_opponent_action: str = Field(description="Opponent's last action: one of 'START', 'OFFER', or 'ACCEPT'")
last_opponent_offer: int = Field(description="Opponent's last offered price")
history: List[Dict[str, Any]] = Field(default_factory=list, description="History of all actions this episode")
class ActionModel(BaseModel):
"""Action the agent can take."""
action_type: str = Field(description="One of: 'OFFER', 'ACCEPT', 'REJECT'")
price: int = Field(default=0, description="Price for OFFER actions, ignored for ACCEPT/REJECT")
class RewardInfo(BaseModel):
"""Reward information returned by step()."""
reward: float = Field(description="Numeric reward for this step")
breakdown: Dict[str, float] = Field(default_factory=dict, description="Reward component breakdown")
# ─────────────────────────────────────────────
# Opponent Strategy
# ─────────────────────────────────────────────
class Opponent:
"""
Simulates opponent negotiation behavior.
Three personalities: greedy, fair, impatient.
Each has different concession rates, anchor effects, patience, and noise.
"""
PROFILES = {
"greedy": {"r": 0.05, "alpha": 0.7, "patience": 10, "epsilon": 5},
"fair": {"r": 0.15, "alpha": 0.4, "patience": 7, "epsilon": 10},
"impatient": {"r": 0.25, "alpha": 0.2, "patience": 3, "epsilon": 15},
}
def __init__(self, type_str: str, value: int, role: str):
self.type = type_str
self.opponent_value = value
self.opponent_role = role
self.history: List[Dict[str, Any]] = []
profile = self.PROFILES.get(type_str, self.PROFILES["fair"])
self.r = profile["r"]
self.alpha = profile["alpha"]
self.patience = profile["patience"]
self.epsilon = profile["epsilon"]
self.concession_rate = self.r
def reset_state(self):
"""Reset concession rate and history for new episode."""
self.concession_rate = self.r
self.history = []
def get_response(self, round_num: int, current_offer: int, agent_offer: int, agent_action_type: str):
"""
Generate opponent response to agent's action.
Returns: (action_type: str, price: int)
"""
if agent_action_type != "OFFER":
return "REJECT", 0
# ── Acceptance Check ──
# Opponent negotiates for a minimum number of rounds before accepting.
# Greedy opponents hold out longer; impatient ones settle sooner.
min_round_to_accept = max(2, self.patience // 3)
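        # e.g. greedy (patience 10) accepts no earlier than round 3;
        # fair (7) and impatient (3) no earlier than round 2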
offer_acceptable = (
(self.opponent_role == "seller" and agent_offer >= self.opponent_value) or
(self.opponent_role == "buyer" and agent_offer <= self.opponent_value)
)
if offer_acceptable and round_num >= min_round_to_accept:
self.history.append({"round": round_num, "action": "ACCEPT", "price": agent_offer})
return "ACCEPT", agent_offer
# ── Patience-based concession acceleration ──
if round_num > self.patience:
self.concession_rate = min(0.4, self.concession_rate + 0.05)
# ── Counter-offer calculation ──
target = self.opponent_value
delta = target - current_offer
next_offer = current_offer + self.concession_rate * delta
        # Anchor effect: blend toward the current offer
next_offer = (1.0 - self.alpha) * next_offer + self.alpha * current_offer
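        # Net effect: next_offer = current + (1 - alpha) * concession_rate * (value - current),
        # so a higher anchor weight alpha means smaller concessions per round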
# Add noise
next_offer += random.randint(-self.epsilon, self.epsilon)
# ── VALUE-BASED CLAMPING (Tolerance Bug Fix) ──
# Seller must not offer below their own value
# Buyer must not offer above their own value
next_offer_int = int(next_offer)
if self.opponent_role == "seller":
next_offer_int = max(next_offer_int, self.opponent_value)
elif self.opponent_role == "buyer":
next_offer_int = min(next_offer_int, self.opponent_value)
# Absolute bounds
next_offer_int = max(100, min(1000, next_offer_int))
self.history.append({"round": round_num, "action": "OFFER", "price": next_offer_int})
return "OFFER", next_offer_int
# ─────────────────────────────────────────────
# Main Environment Wrapper
# ─────────────────────────────────────────────
class EnvWrapper:
"""
OpenEnv-compliant negotiation environment.
Exposes: reset(), step(), state()
"""
def __init__(self, opp_type: str = "fair", a_val: int = 800, o_val: int = 500,
agent_role: str = "buyer", max_rounds: int = 20):
self.agent_value = a_val
self.opponent_value = o_val
self.role = agent_role
self.opp_type = opp_type
self.opp_role = "seller" if agent_role == "buyer" else "buyer"
self.max_rounds = max_rounds
self.opp = Opponent(opp_type, o_val, self.opp_role)
# Episode tracking
self.round = 0
self.current_offer = 0
self.last_opp_action = "START"
self.last_opp_offer = 0
self.history: List[Dict[str, Any]] = []
self.cumulative_aggression_penalty = 0.0
self.done = False
def reset(self) -> Observation:
"""Reset environment and return initial observation."""
self.round = 0
self.done = False
self.history = []
self.cumulative_aggression_penalty = 0.0
self.opp.reset_state()
# Initial offer is shifted away from agent's value to force negotiation
if self.role == "buyer":
            # Start high: the agent (buyer) must negotiate DOWN
self.current_offer = min(1000, self.agent_value + 200)
else:
            # Start low: the agent (seller) must negotiate UP
self.current_offer = max(100, self.agent_value - 200)
self.last_opp_action = "START"
self.last_opp_offer = self.current_offer
return self.state()
def state(self) -> Observation:
"""Return current observable state."""
return Observation(
agent_value=self.agent_value,
current_offer=self.current_offer,
round=self.round,
max_rounds=self.max_rounds,
role=self.role,
last_opponent_action=self.last_opp_action,
last_opponent_offer=self.last_opp_offer,
history=list(self.history),
)
def _compute_reward(self, deal_price: int) -> tuple:
"""
Compute reward for a completed deal.
Returns: (total_reward, breakdown_dict)
"""
if self.role == "seller":
profit = deal_price - self.agent_value
else:
profit = self.agent_value - deal_price
# Gentle time decay: linear, max 50% loss even if all rounds used.
# This rewards fast deals but doesn't destroy multi-round negotiation.
time_factor = 1.0 - 0.5 * (self.round / self.max_rounds)
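        # e.g. closing on round 10 of 20: time_factor = 1 - 0.5 * (10 / 20) = 0.75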
base_reward = profit * time_factor
# Penalty for bad deals (agent accepts a losing deal)
bad_deal_penalty = -20.0 if profit < 0 else 0.0
# Cumulative aggression penalty
aggression = -self.cumulative_aggression_penalty
total = base_reward + bad_deal_penalty + aggression
breakdown = {
"profit": float(profit),
"time_factor": round(time_factor, 4),
"base_reward": round(base_reward, 4),
"bad_deal_penalty": bad_deal_penalty,
"aggression_penalty": aggression,
"total": round(total, 4),
}
return total, breakdown
def _partial_progress_reward(self, action_str: str, action_price: int) -> tuple:
"""
Provide a small shaping reward for intermediate steps.
Rewards the agent for moving toward a deal (improving offers).
"""
reward = 0.0
breakdown = {}
if action_str.startswith("OFFER") and len(self.history) >= 2:
# Check if agent is making progress toward opponent
prev_agent_offers = [h["agent_price"] for h in self.history[:-1]
if h.get("agent_action", "").startswith("OFFER")]
if prev_agent_offers:
last_agent_offer = prev_agent_offers[-1]
# Positive signal if agent moves toward a reasonable range
if self.role == "buyer":
# Buyer should increase offers (toward seller's value)
improvement = action_price - last_agent_offer
reward = min(2.0, max(-1.0, improvement / 50.0))
else:
# Seller should decrease offers (toward buyer's value)
improvement = last_agent_offer - action_price
reward = min(2.0, max(-1.0, improvement / 50.0))
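                # e.g. moving 100 toward the opponent scores min(2.0, 100 / 50) = +2.0;
                # moving 100 away from them is clipped at the -1.0 floor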
breakdown = {"progress_signal": round(reward, 4)}
return reward, breakdown
def step(self, action_str: str, action_price: int = 0):
"""
Take one step in the environment.
Args:
action_str: "OFFER", "ACCEPT", or "REJECT"
action_price: price for OFFER actions
Returns:
(observation: Observation, reward: float, done: bool, info: dict)
"""
if self.done:
return self.state(), 0.0, True, {"error": "Episode already ended"}
self.round += 1
reward = 0.0
done = False
info: Dict[str, Any] = {"error": None}
breakdown: Dict[str, float] = {}
# ── AGENT OFFER CLAMPING ──
if action_str.startswith("OFFER"):
action_price = max(100, min(1000, action_price))
action_str = f"OFFER {action_price}"
        # ── CUMULATIVE AGGRESSION PENALTY (offers only) ──
        # Scale the threshold to the ZOPA width so narrow-ZOPA tasks aren't unfairly punished.
        # Guard on OFFER so that ACCEPT/REJECT (default price 0) never trips the penalty.
        if action_str.startswith("OFFER"):
            zopa = abs(self.agent_value - self.opponent_value)
            aggression_threshold = max(100, int(zopa * 1.25))
            if abs(action_price - self.opponent_value) > aggression_threshold:
                self.cumulative_aggression_penalty += 2.0
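        # e.g. with the defaults (agent 800, opponent 500): zopa = 300, threshold = 375,
        # so offers outside [125, 875] add 2.0 to the cumulative penalty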
# Record this step in history
step_record = {
"round": self.round,
"agent_action": action_str,
"agent_price": action_price,
}
if action_str == "ACCEPT":
deal_price = self.last_opp_offer
reward, breakdown = self._compute_reward(deal_price)
done = True
info["deal_price"] = deal_price
info["deal_type"] = "agent_accepted"
elif action_str == "REJECT":
reward = -50.0
breakdown = {"rejection_penalty": -50.0}
done = True
info["deal_type"] = "agent_rejected"
elif action_str.startswith("OFFER"):
opp_action, opp_price = self.opp.get_response(
self.round, self.current_offer, action_price, "OFFER"
)
if opp_action == "ACCEPT":
deal_price = action_price
reward, breakdown = self._compute_reward(deal_price)
done = True
self.last_opp_action = "ACCEPT"
self.last_opp_offer = deal_price
info["deal_price"] = deal_price
info["deal_type"] = "opponent_accepted"
else:
# Opponent counters
self.current_offer = opp_price
self.last_opp_action = "OFFER"
self.last_opp_offer = opp_price
# Check max rounds
if self.round >= self.max_rounds:
reward = -50.0
breakdown = {"timeout_penalty": -50.0}
done = True
info["deal_type"] = "timeout"
else:
# Partial progress reward for intermediate steps
step_record["agent_price"] = action_price
self.history.append(step_record)
reward, breakdown = self._partial_progress_reward(action_str, action_price)
info["opponent_counter"] = opp_price
step_record["opp_action"] = opp_action
step_record["opp_price"] = opp_price
        else:
            # Unknown action: report it and let the round pass with zero reward
            info["error"] = f"Unknown action: {action_str}"
        # Record terminal steps too (non-terminal OFFER steps were appended above)
        if done and step_record not in self.history:
            self.history.append(step_record)
self.done = done
info["reward_breakdown"] = breakdown
return self.state(), reward, done, info
# ─────────────────────────────────────────────
# Convenience: max possible reward for scoring
# ─────────────────────────────────────────────
def get_max_possible_reward(agent_value: int, opponent_value: int) -> float:
    """
    Upper bound on episode reward: the full ZOPA width (best-case profit),
    ignoring time decay and penalties.
    """
    return float(abs(agent_value - opponent_value))
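# ─────────────────────────────────────────────
# Usage sketch (illustrative only)
# ─────────────────────────────────────────────
# A minimal smoke test, not part of the module API: a naive scripted buyer
# opens below its valuation and concedes 50 per round until a deal closes.
# The opening bid and step size are arbitrary assumptions for illustration.
if __name__ == "__main__":
    env = EnvWrapper(opp_type="fair", a_val=800, o_val=500, agent_role="buyer")
    obs = env.reset()
    done = False
    offer = 400  # hypothetical opening bid, well below the buyer's value of 800
    while not done:
        if obs.last_opponent_action == "OFFER" and obs.current_offer <= obs.agent_value:
            # The seller's counter is already profitable for the buyer: take it
            obs, reward, done, info = env.step("ACCEPT")
        else:
            obs, reward, done, info = env.step("OFFER", offer)
            offer = min(obs.agent_value, offer + 50)  # concede toward the seller
    print(f"deal_type={info.get('deal_type')} reward={reward:.2f}")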