"""
Negotiation environment wrapper (OpenEnv compliant).

Implements: reset(), step(), state()
Typed models via Pydantic for Observation, Action, and Reward.
"""
import random
from typing import Any, Dict, List, Tuple

from pydantic import BaseModel, Field
# ─────────────────────────────────────────────
# OpenEnv Typed Models
# ─────────────────────────────────────────────
class Observation(BaseModel):
    """Observable state visible to the agent."""

    agent_value: int = Field(description="The agent's private valuation/target value for the deal")
    current_offer: int = Field(description="Current price on the table")
    round: int = Field(description="Current round number (0-indexed before the first step)")
    max_rounds: int = Field(description="Maximum allowed rounds")
    role: str = Field(description="Agent role: 'buyer' or 'seller'")
    last_opponent_action: str = Field(description="Opponent's last action: 'START', 'OFFER', or 'ACCEPT'")
    last_opponent_offer: int = Field(description="Opponent's last offered price")
    history: List[Dict[str, Any]] = Field(default_factory=list, description="History of all actions this episode")
class ActionModel(BaseModel):
    """Action the agent can take."""

    action_type: str = Field(description="One of: 'OFFER', 'ACCEPT', 'REJECT'")
    price: int = Field(default=0, description="Price for OFFER actions; ignored for ACCEPT/REJECT")
class RewardInfo(BaseModel):
    """Reward information returned by step()."""

    reward: float = Field(description="Numeric reward for this step")
    breakdown: Dict[str, float] = Field(default_factory=dict, description="Reward component breakdown")
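
# Note: ActionModel and RewardInfo document the typed OpenEnv interface; the
# EnvWrapper below currently takes (action_str, action_price) directly in step().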
# ─────────────────────────────────────────────
# Opponent Strategy
# ─────────────────────────────────────────────
class Opponent:
    """
    Simulates opponent negotiation behavior.

    Three personalities: greedy, fair, impatient. Each profile sets:
        r        - base concession rate (fraction of the price gap conceded per round)
        alpha    - anchor weight blending the counter-offer back toward the current offer
        patience - round after which the concession rate starts accelerating
        epsilon  - magnitude of the uniform integer noise added to counter-offers
    """

    PROFILES = {
        "greedy": {"r": 0.05, "alpha": 0.7, "patience": 10, "epsilon": 5},
        "fair": {"r": 0.15, "alpha": 0.4, "patience": 7, "epsilon": 10},
        "impatient": {"r": 0.25, "alpha": 0.2, "patience": 3, "epsilon": 15},
    }
    def __init__(self, type_str: str, value: int, role: str):
        self.type = type_str
        self.opponent_value = value
        self.opponent_role = role
        self.history: List[Dict[str, Any]] = []

        profile = self.PROFILES.get(type_str, self.PROFILES["fair"])
        self.r = profile["r"]
        self.alpha = profile["alpha"]
        self.patience = profile["patience"]
        self.epsilon = profile["epsilon"]
        self.concession_rate = self.r
    def reset_state(self):
        """Reset concession rate and history for a new episode."""
        self.concession_rate = self.r
        self.history = []
    def get_response(self, round_num: int, current_offer: int, agent_offer: int,
                     agent_action_type: str) -> Tuple[str, int]:
        """
        Generate the opponent's response to the agent's action.

        Returns: (action_type, price)
        """
        if agent_action_type != "OFFER":
            return "REJECT", 0
        # ── Acceptance check ──
        # The opponent negotiates for a minimum number of rounds before accepting.
        # Greedy opponents hold out longer; impatient ones settle sooner.
        min_round_to_accept = max(2, self.patience // 3)
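        # With the profiles above: greedy max(2, 10 // 3) = 3, fair max(2, 7 // 3) = 2,
        # impatient max(2, 3 // 3) = 2, so every opponent haggles for at least two rounds.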
        offer_acceptable = (
            (self.opponent_role == "seller" and agent_offer >= self.opponent_value) or
            (self.opponent_role == "buyer" and agent_offer <= self.opponent_value)
        )
        if offer_acceptable and round_num >= min_round_to_accept:
            self.history.append({"round": round_num, "action": "ACCEPT", "price": agent_offer})
            return "ACCEPT", agent_offer
        # ── Patience-based concession acceleration ──
        if round_num > self.patience:
            self.concession_rate = min(0.4, self.concession_rate + 0.05)
        # ── Counter-offer calculation ──
        target = self.opponent_value
        delta = target - current_offer
        next_offer = current_offer + self.concession_rate * delta

        # Anchor effect: blend back toward the current offer
        next_offer = (1.0 - self.alpha) * next_offer + self.alpha * current_offer

        # Add noise
        next_offer += random.randint(-self.epsilon, self.epsilon)
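        # Worked example (illustrative numbers): a "fair" seller with value 500
        # facing current_offer 1000 computes delta = -500, concedes to
        # 1000 + 0.15 * (-500) = 925, anchors to 0.6 * 925 + 0.4 * 1000 = 955,
        # then adds noise in [-10, 10], drifting down toward its own value.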
        # ── Value-based clamping (tolerance bug fix) ──
        # A seller must not offer below their own value;
        # a buyer must not offer above their own value.
        next_offer_int = int(next_offer)
        if self.opponent_role == "seller":
            next_offer_int = max(next_offer_int, self.opponent_value)
        elif self.opponent_role == "buyer":
            next_offer_int = min(next_offer_int, self.opponent_value)

        # Absolute bounds
        next_offer_int = max(100, min(1000, next_offer_int))

        self.history.append({"round": round_num, "action": "OFFER", "price": next_offer_int})
        return "OFFER", next_offer_int
# ─────────────────────────────────────────────
# Main Environment Wrapper
# ─────────────────────────────────────────────
class EnvWrapper:
    """
    OpenEnv-compliant negotiation environment.

    Exposes: reset(), step(), state()
    """

    def __init__(self, opp_type: str = "fair", a_val: int = 800, o_val: int = 500,
                 agent_role: str = "buyer", max_rounds: int = 20):
        self.agent_value = a_val
        self.opponent_value = o_val
        self.role = agent_role
        self.opp_type = opp_type
        self.opp_role = "seller" if agent_role == "buyer" else "buyer"
        self.max_rounds = max_rounds
        self.opp = Opponent(opp_type, o_val, self.opp_role)

        # Episode tracking
        self.round = 0
        self.current_offer = 0
        self.last_opp_action = "START"
        self.last_opp_offer = 0
        self.history: List[Dict[str, Any]] = []
        self.cumulative_aggression_penalty = 0.0
        self.done = False
    def reset(self) -> Observation:
        """Reset the environment and return the initial observation."""
        self.round = 0
        self.done = False
        self.history = []
        self.cumulative_aggression_penalty = 0.0
        self.opp.reset_state()

        # The initial offer is shifted away from the agent's value to force negotiation.
        if self.role == "buyer":
            # Start high: the agent (buyer) must negotiate DOWN.
            self.current_offer = min(1000, self.agent_value + 200)
        else:
            # Start low: the agent (seller) must negotiate UP.
            self.current_offer = max(100, self.agent_value - 200)
        self.last_opp_action = "START"
        self.last_opp_offer = self.current_offer
        return self.state()
    def state(self) -> Observation:
        """Return the current observable state."""
        return Observation(
            agent_value=self.agent_value,
            current_offer=self.current_offer,
            round=self.round,
            max_rounds=self.max_rounds,
            role=self.role,
            last_opponent_action=self.last_opp_action,
            last_opponent_offer=self.last_opp_offer,
            history=list(self.history),
        )
    def _compute_reward(self, deal_price: int) -> Tuple[float, Dict[str, float]]:
        """
        Compute the reward for a completed deal.

        Returns: (total_reward, breakdown_dict)
        """
        if self.role == "seller":
            profit = deal_price - self.agent_value
        else:
            profit = self.agent_value - deal_price

        # Gentle time decay: linear, at most a 50% loss even if all rounds are used.
        # This rewards fast deals without destroying multi-round negotiation.
        time_factor = 1.0 - 0.5 * (self.round / self.max_rounds)
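        # e.g. a deal closed on round 5 of 20 keeps 1 - 0.5 * (5 / 20) = 87.5%
        # of the profit; even a last-round deal keeps 50%.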
        base_reward = profit * time_factor

        # Penalty for bad deals (the agent accepted a losing deal)
        bad_deal_penalty = -20.0 if profit < 0 else 0.0

        # Cumulative aggression penalty
        aggression = -self.cumulative_aggression_penalty

        total = base_reward + bad_deal_penalty + aggression
        breakdown = {
            "profit": float(profit),
            "time_factor": round(time_factor, 4),
            "base_reward": round(base_reward, 4),
            "bad_deal_penalty": bad_deal_penalty,
            "aggression_penalty": aggression,
            "total": round(total, 4),
        }
        return total, breakdown
    def _partial_progress_reward(self, action_str: str, action_price: int) -> Tuple[float, Dict[str, float]]:
        """
        Provide a small shaping reward for intermediate steps.

        Rewards the agent for moving toward a deal (improving offers).
        """
        reward = 0.0
        breakdown: Dict[str, float] = {}
        if action_str.startswith("OFFER") and len(self.history) >= 2:
            # Check whether the agent is making progress toward the opponent.
            prev_agent_offers = [h["agent_price"] for h in self.history[:-1]
                                 if h.get("agent_action", "").startswith("OFFER")]
            if prev_agent_offers:
                last_agent_offer = prev_agent_offers[-1]
                # Positive signal if the agent moves toward a reasonable range.
                if self.role == "buyer":
                    # A buyer should increase offers (toward the seller's value).
                    improvement = action_price - last_agent_offer
                else:
                    # A seller should decrease offers (toward the buyer's value).
                    improvement = last_agent_offer - action_price
                reward = min(2.0, max(-1.0, improvement / 50.0))
                breakdown = {"progress_signal": round(reward, 4)}
        return reward, breakdown
    def step(self, action_str: str, action_price: int = 0) -> Tuple[Observation, float, bool, Dict[str, Any]]:
        """
        Take one step in the environment.

        Args:
            action_str: "OFFER", "ACCEPT", or "REJECT"
            action_price: price for OFFER actions

        Returns:
            (observation, reward, done, info)
        """
        if self.done:
            return self.state(), 0.0, True, {"error": "Episode already ended"}

        self.round += 1
        reward = 0.0
        done = False
        info: Dict[str, Any] = {"error": None}
        breakdown: Dict[str, float] = {}
        # ── Agent offer clamping ──
        if action_str.startswith("OFFER"):
            action_price = max(100, min(1000, action_price))
            action_str = f"OFFER {action_price}"

            # ── Cumulative aggression penalty ──
            # Scale the threshold to the ZOPA width so narrow-ZOPA tasks aren't
            # unfairly punished. Only OFFER actions are scored here: ACCEPT and
            # REJECT carry price 0 and must not trip the penalty.
            zopa = abs(self.agent_value - self.opponent_value)
            aggression_threshold = max(100, int(zopa * 1.25))
            if abs(action_price - self.opponent_value) > aggression_threshold:
                self.cumulative_aggression_penalty += 2.0
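            # e.g. with agent_value=800 and opponent_value=500, zopa=300 and the
            # threshold is max(100, int(300 * 1.25)) = 375: only offers further
            # than 375 from the opponent's value accrue the 2.0 penalty.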
        # Record this step in the history
        step_record = {
            "round": self.round,
            "agent_action": action_str,
            "agent_price": action_price,
        }
        if action_str == "ACCEPT":
            deal_price = self.last_opp_offer
            reward, breakdown = self._compute_reward(deal_price)
            done = True
            info["deal_price"] = deal_price
            info["deal_type"] = "agent_accepted"
        elif action_str == "REJECT":
            reward = -50.0
            breakdown = {"rejection_penalty": -50.0}
            done = True
            info["deal_type"] = "agent_rejected"
        elif action_str.startswith("OFFER"):
            opp_action, opp_price = self.opp.get_response(
                self.round, self.current_offer, action_price, "OFFER"
            )
            step_record["opp_action"] = opp_action
            step_record["opp_price"] = opp_price

            if opp_action == "ACCEPT":
                deal_price = action_price
                reward, breakdown = self._compute_reward(deal_price)
                done = True
                self.last_opp_action = "ACCEPT"
                self.last_opp_offer = deal_price
                info["deal_price"] = deal_price
                info["deal_type"] = "opponent_accepted"
            else:
                # Opponent counters
                self.current_offer = opp_price
                self.last_opp_action = "OFFER"
                self.last_opp_offer = opp_price

                # Check max rounds
                if self.round >= self.max_rounds:
                    reward = -50.0
                    breakdown = {"timeout_penalty": -50.0}
                    done = True
                    info["deal_type"] = "timeout"
                else:
                    # Partial progress reward for intermediate steps
                    self.history.append(step_record)
                    reward, breakdown = self._partial_progress_reward(action_str, action_price)
                info["opponent_counter"] = opp_price
        # Record history for terminal steps, too (ACCEPT/REJECT always set done,
        # so checking `done` suffices; the membership test avoids a double append
        # for offers already recorded above).
        if done and step_record not in self.history:
            self.history.append(step_record)

        self.done = done
        info["reward_breakdown"] = breakdown
        return self.state(), reward, done, info
# ─────────────────────────────────────────────
# Convenience: max possible reward for scoring
# ─────────────────────────────────────────────
def get_max_possible_reward(agent_value: int, opponent_value: int) -> float:
    """
    Upper bound on episode reward: the full ZOPA width, i.e. the profit if the
    agent captured the entire surplus with no time decay. Useful as a
    normalization constant when scoring episodes.
    """
    return float(abs(agent_value - opponent_value))
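
# ─────────────────────────────────────────────
# Usage sketch (illustrative addition, not part of the original interface):
# drives one episode with a naive scripted buyer that opens low and concedes
# a fixed 50 per round, accepting any counter-offer that leaves it at least
# 100 in profit. The policy and its constants are assumptions for the demo.
# ─────────────────────────────────────────────
if __name__ == "__main__":
    env = EnvWrapper(opp_type="fair", a_val=800, o_val=500, agent_role="buyer")
    obs = env.reset()
    offer = 400  # naive opening bid below the opponent's (hidden) value
    reward, done, info = 0.0, False, {}
    while not done:
        obs, reward, done, info = env.step("OFFER", offer)
        # Accept once the counter-offer is comfortably profitable for the buyer.
        if not done and obs.last_opponent_offer <= obs.agent_value - 100:
            obs, reward, done, info = env.step("ACCEPT")
        offer = min(obs.agent_value, offer + 50)  # concede upward each round
    print(f"Outcome: {info.get('deal_type')}, reward = {reward:.2f}")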