| """ |
| Parlay MCP tools — universal protocol (works with any MCP client). |
| 8 tools covering negotiation lifecycle, game state, and leaderboard. |
| |
| Tool names intentionally avoid exactly: reset, step, state, close — those |
| identifiers are reserved by OpenEnv / client connection helpers. |
| """ |
| import logging |
| import uuid |
| from typing import Optional |
|
|
| import numpy as np |
| from fastmcp import FastMCP |
|
|
| from parlay_env.models import ( |
| PersonaType, TacticalMove, BeliefState, HiddenState, ParlayState, |
| ) |
| from parlay_env.game_theory import compute_zopa, compute_nash_bargaining_solution |
| from parlay_env.grader import grade_episode |
| from game.scenarios import SCENARIOS, get_scenario |
| from game.tactical_cards import TACTICAL_CARDS, draw_hand, get_card |
| from game.leaderboard import Leaderboard |
| from agent.personas import PERSONAS, build_system_prompt |
| from agent.gemini_client import MODEL_ID_DEMO, call_gemini, validate_ai_offer_direction |
| from agent.tom_tracker import ToMTracker |
|
|
| logger = logging.getLogger(__name__) |
|
|
| mcp = FastMCP( |
| "Parlay Negotiation Environment", |
| description=( |
| "An RL negotiation environment. Train agents, play scenarios, " |
| "access game state and leaderboards via MCP tools. " |
| "Any MCP-compatible client can use these tools." |
| ), |
| ) |
|
|
| |
| _sessions: dict[str, dict] = {} |
| _leaderboard = Leaderboard() |
|
|
| CP_START = 100 |
| CP_REGEN = 5 |
| MAX_TURNS = 20 |
|
|
| |
| _CP_COSTS: dict[TacticalMove, int] = { |
| TacticalMove.ANCHOR_HIGH: 0, |
| TacticalMove.BATNA_REVEAL: 20, |
| TacticalMove.SILENCE: 5, |
| } |
|
|
|
|
| def _get_hidden_state(scenario_id: str, seed: int = 42) -> HiddenState: |
| """Build a HiddenState for the given scenario.""" |
| rng = np.random.default_rng(seed) |
| scenario = get_scenario(scenario_id) |
| noise = float(rng.uniform(0.95, 1.05)) |
| return HiddenState( |
| budget_ceiling=round(scenario.batna_buyer * noise, 2), |
| walk_away_price=round(scenario.batna_seller * noise, 2), |
| urgency_score=float(np.clip(0.5 + rng.uniform(-0.15, 0.15), 0.0, 1.0)), |
| has_alternative=scenario.id in ("saas_enterprise", "acquisition_term_sheet"), |
| persona_drifted=False, |
| ) |
|
|
|
|
| @mcp.tool() |
| async def start_negotiation( |
| scenario_id: str, |
| persona: str, |
| player_name: str = "Agent", |
| ) -> dict: |
| """ |
| Start a new negotiation episode. |
| |
| Args: |
| scenario_id: One of: saas_enterprise, hiring_package, acquisition_term_sheet |
| persona: One of: shark, diplomat, veteran |
| player_name: Display name for the leaderboard (default: "Agent") |
| |
| Returns: |
| session_id: Unique session identifier for subsequent calls. |
| observation: Initial game state including ZOPA, Nash point, and opening message. |
| scenario: Scenario context (title, description, currency, unit). |
| available_cards: List of tactical cards available to the player. |
| opening_message: The AI opponent's opening statement. |
| """ |
| valid_scenarios = list(SCENARIOS.keys()) |
| if scenario_id not in valid_scenarios: |
| return {"error": f"Invalid scenario_id. Valid options: {valid_scenarios}"} |
|
|
| try: |
| persona_type = PersonaType(persona) |
| except ValueError: |
| return {"error": f"Invalid persona. Valid options: {[p.value for p in PersonaType]}"} |
|
|
| scenario = get_scenario(scenario_id) |
| session_id = str(uuid.uuid4()) |
| hidden = _get_hidden_state(scenario_id, seed=hash(session_id) % 10000) |
|
|
| initial_belief = BeliefState( |
| est_budget=hidden.budget_ceiling * 0.80, |
| est_walk_away=hidden.walk_away_price * 1.15, |
| est_urgency=0.50, |
| est_has_alternative=False, |
| confidence=0.30, |
| ) |
|
|
| tom = ToMTracker(initial_belief, persona_type) |
| hand = draw_hand(3, rng_seed=hash(session_id) % 9999) |
|
|
| system_prompt = build_system_prompt( |
| persona=persona_type, |
| scenario_id=scenario_id, |
| scenario_title=scenario.title, |
| scenario_description=scenario.description, |
| batna=hidden.walk_away_price, |
| budget=hidden.budget_ceiling, |
| urgency=hidden.urgency_score, |
| ) |
|
|
| persona_cfg = PERSONAS[persona_type] |
| opening_message = persona_cfg.opening_line |
|
|
| zopa = compute_zopa(hidden.budget_ceiling, hidden.walk_away_price) |
| nash = compute_nash_bargaining_solution(hidden.budget_ceiling, hidden.walk_away_price) |
|
|
| _sessions[session_id] = { |
| "session_id": session_id, |
| "player_name": player_name, |
| "scenario_id": scenario_id, |
| "persona": persona, |
| "persona_type": persona_type, |
| "hidden": hidden, |
| "tom": tom, |
| "system_prompt": system_prompt, |
| "conversation": [{"role": "opponent", "content": opening_message, "turn": 0}], |
| "offer_history": [], |
| "step_count": 0, |
| "cumulative_reward": 0.0, |
| "credibility_points": CP_START, |
| "done": False, |
| "hand": [m.value for m in hand], |
| "drift_adapted": False, |
| "drift_turn": None, |
| } |
|
|
| logger.info(f"MCP start_negotiation: session={session_id}, scenario={scenario_id}, persona={persona}") |
|
|
| return { |
| "session_id": session_id, |
| "observation": { |
| "step_count": 0, |
| "zopa_lower": zopa[0] if zopa else 0, |
| "zopa_upper": zopa[1] if zopa else 0, |
| "nash_point": nash, |
| "credibility_points": CP_START, |
| "tension_score": 10.0, |
| "belief_state": initial_belief.model_dump(), |
| }, |
| "scenario": { |
| "title": scenario.title, |
| "description": scenario.description, |
| "currency": scenario.currency, |
| "unit": scenario.unit, |
| "anchor_seller": scenario.anchor_seller, |
| "anchor_buyer": scenario.anchor_buyer, |
| }, |
| "available_cards": [ |
| { |
| "move": m, |
| "name": get_card(m).name, |
| "cp_cost": get_card(m).cp_cost, |
| "description": get_card(m).description, |
| } |
| for m in _sessions[session_id]["hand"] |
| ], |
| "opening_message": opening_message, |
| } |
|
|
|
|
| @mcp.tool() |
| async def make_offer( |
| session_id: str, |
| amount: float, |
| message: str, |
| tactical_move: Optional[str] = None, |
| ) -> dict: |
| """ |
| Make a structured offer in the negotiation. |
| |
| Args: |
| session_id: Session ID from start_negotiation. |
| amount: Offer amount in the scenario's currency. |
| message: Natural language message accompanying the offer. |
| tactical_move: Optional tactical card to play. One of: |
| anchor_high, batna_reveal, silence |
| |
| Returns: |
| opponent_response: AI opponent's counter-message and offer. |
| updated_observation: Updated ZOPA, beliefs, tension, CP. |
| reward: Step reward earned this turn. |
| done: Whether the episode has ended. |
| drift_event: Description of any drift event triggered (or null). |
| """ |
| if session_id not in _sessions: |
| return {"error": f"Session {session_id} not found. Call start_negotiation first."} |
|
|
| sess = _sessions[session_id] |
| if sess["done"]: |
| return {"error": "Episode is already complete. Start a new session."} |
|
|
| move: Optional[TacticalMove] = None |
| if tactical_move: |
| try: |
| move = TacticalMove(tactical_move) |
| except ValueError: |
| return {"error": f"Invalid tactical_move. Valid: {[m.value for m in TacticalMove]}"} |
|
|
| cost = _CP_COSTS.get(move, 0) |
| current_cp = sess["credibility_points"] |
| if current_cp < cost: |
| return {"error": f"Insufficient credibility points. Need {cost}, have {current_cp}."} |
|
|
| new_cp = min(CP_START, current_cp + CP_REGEN - cost) |
|
|
| |
| turn = sess["step_count"] |
| scenario = get_scenario(sess["scenario_id"]) |
| drift_event_desc: Optional[str] = None |
| for event in scenario.drift_events: |
| if event.trigger_turn == turn: |
| drift_event_desc = event.event |
| sess["drift_turn"] = turn |
| sess["tom"].drift_event(event.effect_on_urgency, event.effect_on_has_alternative) |
| logger.info(f"Drift event triggered: {event.event}") |
| break |
|
|
| |
| gemini_messages = [] |
| for msg in sess["conversation"][-10:]: |
| role = "user" if msg["role"] == "player" else "model" |
| gemini_messages.append({"role": role, "parts": [msg["content"]]}) |
|
|
| player_text = f"Player offer: {amount:,.0f}. Message: {message}" |
| if tactical_move: |
| player_text += f" [Tactical move: {tactical_move}]" |
| gemini_messages.append({"role": "user", "parts": [player_text]}) |
|
|
| opponent_resp = await call_gemini( |
| sess["system_prompt"], |
| gemini_messages, |
| model=MODEL_ID_DEMO, |
| scenario_id=sess["scenario_id"], |
| ) |
| opponent_utterance: str = opponent_resp.get("utterance", "I'll need to consider that.") |
| raw_opp = opponent_resp.get("offer_amount") |
| opponent_offer: Optional[float] = None |
| if raw_opp is not None: |
| try: |
| opponent_offer = float(raw_opp) |
| except (TypeError, ValueError): |
| opponent_offer = None |
| if opponent_offer is not None: |
| opponent_offer = validate_ai_offer_direction( |
| opponent_offer, float(amount), sess["scenario_id"] |
| ) |
| opponent_move: Optional[str] = opponent_resp.get("tactical_move") |
|
|
| sess["conversation"].append({ |
| "role": "player", "content": message, |
| "offer": amount, "move": tactical_move, "turn": turn + 1, |
| }) |
| sess["conversation"].append({ |
| "role": "opponent", "content": opponent_utterance, |
| "offer": opponent_offer, "turn": turn + 1, |
| }) |
|
|
| parsed_opp_move: Optional[TacticalMove] = None |
| if opponent_move: |
| try: |
| parsed_opp_move = TacticalMove(opponent_move) |
| except ValueError: |
| pass |
|
|
| updated_belief = sess["tom"].update( |
| observed_offer=opponent_offer, |
| observed_move=parsed_opp_move, |
| utterance=opponent_utterance, |
| turn=turn, |
| ) |
|
|
| |
| if sess["drift_turn"] is not None and not sess["drift_adapted"]: |
| if turn <= sess["drift_turn"] + 2: |
| adaptation_signals = ["understand", "noted", "given that", "considering", "account"] |
| if any(s in message.lower() for s in adaptation_signals): |
| sess["drift_adapted"] = True |
|
|
| sess["offer_history"].append(amount) |
| sess["step_count"] += 1 |
| sess["credibility_points"] = new_cp |
|
|
| step_reward = 5.0 * (1.0 - sess["tom"].accuracy_against(sess["hidden"])) |
| sess["cumulative_reward"] += step_reward |
|
|
| done = sess["step_count"] >= MAX_TURNS |
| sess["done"] = done |
|
|
| hidden = sess["hidden"] |
| zopa = compute_zopa(hidden.budget_ceiling, hidden.walk_away_price) |
| nash = compute_nash_bargaining_solution(hidden.budget_ceiling, hidden.walk_away_price) |
| tension = min(100.0, 20.0 + (sess["step_count"] / MAX_TURNS) * 80.0) |
|
|
| _sessions[session_id] = sess |
| logger.debug(f"MCP make_offer: session={session_id}, turn={sess['step_count']}, amount={amount}") |
|
|
| return { |
| "opponent_response": { |
| "utterance": opponent_utterance, |
| "offer": opponent_offer, |
| "tactical_move": opponent_move, |
| }, |
| "updated_observation": { |
| "step_count": sess["step_count"], |
| "zopa_lower": zopa[0] if zopa else 0, |
| "zopa_upper": zopa[1] if zopa else 0, |
| "nash_point": nash, |
| "tension_score": tension, |
| "belief_state": updated_belief.model_dump(), |
| "credibility_points": new_cp, |
| }, |
| "reward": step_reward, |
| "cumulative_reward": sess["cumulative_reward"], |
| "done": done, |
| "drift_event": drift_event_desc, |
| } |
|
|
|
|
| @mcp.tool() |
| async def get_game_state(session_id: str) -> dict: |
| """ |
| Get the full current game state for a session. |
| |
| Args: |
| session_id: Session ID from start_negotiation. |
| |
| Returns: |
| Full game state including beliefs, offer history, ZOPA data, |
| CP balance, conversation history, and current leaderboard rank. |
| """ |
| if session_id not in _sessions: |
| return {"error": f"Session {session_id} not found."} |
|
|
| sess = _sessions[session_id] |
| hidden = sess["hidden"] |
| zopa = compute_zopa(hidden.budget_ceiling, hidden.walk_away_price) |
| nash = compute_nash_bargaining_solution(hidden.budget_ceiling, hidden.walk_away_price) |
| current_belief = sess["tom"].current_belief |
|
|
| rank = await _leaderboard.get_rank(sess["player_name"], sess["scenario_id"]) |
|
|
| return { |
| "session_id": session_id, |
| "player_name": sess["player_name"], |
| "scenario_id": sess["scenario_id"], |
| "persona": sess["persona"], |
| "step_count": sess["step_count"], |
| "done": sess["done"], |
| "offer_history": sess["offer_history"], |
| "zopa": {"lower": zopa[0] if zopa else 0, "upper": zopa[1] if zopa else 0}, |
| "nash_point": nash, |
| "belief_state": current_belief.model_dump(), |
| "belief_history_count": len(sess["tom"].history), |
| "credibility_points": sess["credibility_points"], |
| "cumulative_reward": sess["cumulative_reward"], |
| "drift_adapted": sess["drift_adapted"], |
| "bluffs_detected": sess["tom"].bluffs_detected, |
| "leaderboard_rank": rank, |
| "hand": sess["hand"], |
| } |
|
|
|
|
| @mcp.tool() |
| async def accept_deal(session_id: str) -> dict: |
| """ |
| Accept the current offer and close the negotiation. |
| |
| Args: |
| session_id: Session ID from start_negotiation. |
| |
| Returns: |
| final_reward: Complete reward breakdown (step + terminal). |
| deal_efficiency: Fraction of ZOPA captured [0, 1]. |
| nash_comparison: How the deal compares to Nash Bargaining Solution. |
| episode_summary: Full grade breakdown. |
| """ |
| if session_id not in _sessions: |
| return {"error": f"Session {session_id} not found."} |
|
|
| sess = _sessions[session_id] |
| if sess["done"]: |
| return {"error": "Episode already concluded."} |
| if not sess["offer_history"]: |
| return {"error": "No offer has been made yet. Make an offer before accepting."} |
|
|
| final_price = sess["offer_history"][-1] |
| hidden = sess["hidden"] |
|
|
| state = ParlayState( |
| session_id=session_id, |
| scenario_id=sess["scenario_id"], |
| persona=PersonaType(sess["persona"]), |
| step_count=sess["step_count"], |
| cumulative_reward=sess["cumulative_reward"], |
| hidden_state=hidden, |
| belief_history=sess["tom"].history, |
| offer_history=sess["offer_history"], |
| drift_events_fired=1 if sess["drift_turn"] is not None else 0, |
| episode_done=True, |
| termination_reason="deal_accepted", |
| credibility_points=sess["credibility_points"], |
| ) |
|
|
| grade = grade_episode( |
| state, |
| final_price=final_price, |
| t_close=sess["step_count"], |
| t_max=MAX_TURNS, |
| drift_adapted=sess["drift_adapted"], |
| bluffs_caught=sess["tom"].bluffs_detected, |
| ) |
|
|
| await _leaderboard.record_result( |
| player_name=sess["player_name"], |
| scenario_id=sess["scenario_id"], |
| persona=sess["persona"], |
| total_reward=grade.total_reward, |
| deal_efficiency=grade.deal_efficiency, |
| acts_completed=1, |
| deal_closed=True, |
| ) |
|
|
| sess["done"] = True |
| _sessions[session_id] = sess |
|
|
| nash = compute_nash_bargaining_solution(hidden.budget_ceiling, hidden.walk_away_price) |
| zopa = compute_zopa(hidden.budget_ceiling, hidden.walk_away_price) |
| zopa_width = (zopa[1] - zopa[0]) if zopa else 1.0 |
|
|
| logger.info( |
| f"MCP accept_deal: session={session_id}, " |
| f"price={final_price:,.0f}, efficiency={grade.deal_efficiency:.3f}" |
| ) |
|
|
| return { |
| "final_price": final_price, |
| "final_reward": grade.total_reward, |
| "deal_efficiency": grade.deal_efficiency, |
| "nash_comparison": { |
| "nash_point": nash, |
| "your_deal": final_price, |
| "vs_nash_pct": round((final_price - nash) / max(zopa_width, 1) * 100, 1), |
| }, |
| "episode_summary": { |
| "total_reward": grade.total_reward, |
| "deal_efficiency": grade.deal_efficiency, |
| "tom_accuracy_avg": grade.tom_accuracy_avg, |
| "bluffs_caught": grade.bluffs_caught, |
| "drift_adapted": grade.drift_adapted, |
| }, |
| } |
|
|
|
|
| @mcp.tool() |
| async def walk_away(session_id: str) -> dict: |
| """ |
| Walk away from the negotiation without a deal. |
| |
| Args: |
| session_id: Session ID from start_negotiation. |
| |
| Returns: |
| episode_summary: Final metrics (no deal recorded on leaderboard). |
| counterfactual: What the optimal deal would have been. |
| reward: Partial reward earned (penalty applied for no deal). |
| """ |
| if session_id not in _sessions: |
| return {"error": f"Session {session_id} not found."} |
|
|
| sess = _sessions[session_id] |
| if sess["done"]: |
| return {"error": "Episode already concluded."} |
|
|
| hidden = sess["hidden"] |
| state = ParlayState( |
| session_id=session_id, |
| scenario_id=sess["scenario_id"], |
| persona=PersonaType(sess["persona"]), |
| step_count=sess["step_count"], |
| cumulative_reward=sess["cumulative_reward"], |
| hidden_state=hidden, |
| belief_history=sess["tom"].history, |
| offer_history=sess["offer_history"], |
| drift_events_fired=1 if sess["drift_turn"] is not None else 0, |
| episode_done=True, |
| termination_reason="walk_away", |
| credibility_points=sess["credibility_points"], |
| ) |
|
|
| grade = grade_episode(state, final_price=None, t_max=MAX_TURNS) |
| nash = compute_nash_bargaining_solution(hidden.budget_ceiling, hidden.walk_away_price) |
| zopa = compute_zopa(hidden.budget_ceiling, hidden.walk_away_price) |
|
|
| sess["done"] = True |
| _sessions[session_id] = sess |
|
|
| logger.info(f"MCP walk_away: session={session_id}, partial_reward={grade.total_reward:.2f}") |
|
|
| return { |
| "result": "walk_away", |
| "reward": grade.total_reward, |
| "episode_summary": { |
| "total_reward": grade.total_reward, |
| "deal_efficiency": 0.0, |
| }, |
| "counterfactual": { |
| "optimal_deal": nash, |
| "zopa": {"lower": zopa[0] if zopa else 0, "upper": zopa[1] if zopa else 0}, |
| "message": ( |
| f"The Nash Bargaining Solution was {nash:,.0f}. " |
| f"Walking away left value on the table." |
| ), |
| }, |
| } |
|
|
|
|
| @mcp.tool() |
| async def get_leaderboard( |
| scenario_id: Optional[str] = None, |
| limit: int = 10, |
| ) -> dict: |
| """ |
| Get the global or per-scenario leaderboard. |
| |
| Args: |
| scenario_id: Optional. Filter to a specific scenario. Leave null for global. |
| limit: Number of entries to return (default: 10, max: 50). |
| |
| Returns: |
| entries: Top leaderboard entries with player name, score, efficiency, and persona. |
| total_entries: Total number of entries in this leaderboard. |
| """ |
| limit = min(max(1, limit), 50) |
| valid_scenarios = list(SCENARIOS.keys()) |
| if scenario_id and scenario_id not in valid_scenarios: |
| return {"error": f"Invalid scenario_id. Valid: {valid_scenarios}"} |
|
|
| entries = await _leaderboard.get_top(scenario_id=scenario_id, limit=limit) |
| return { |
| "scenario_id": scenario_id or "global", |
| "entries": entries, |
| "total_entries": len(entries), |
| } |
|
|
|
|
| @mcp.tool() |
| async def list_scenarios() -> dict: |
| """ |
| List all available B2B negotiation scenarios. |
| |
| Returns: |
| scenarios: List of all scenarios with id, title, description, |
| ZOPA range, difficulty rating, and available drift events. |
| """ |
| return { |
| "scenarios": [ |
| { |
| "id": s.id, |
| "title": s.title, |
| "description": s.description, |
| "currency": s.currency, |
| "unit": s.unit, |
| "zopa": {"lower": s.zopa[0], "upper": s.zopa[1]}, |
| "anchor_seller": s.anchor_seller, |
| "anchor_buyer": s.anchor_buyer, |
| "difficulty": s.difficulty, |
| "drift_events": [ |
| {"turn": e.trigger_turn, "event": e.event} |
| for e in s.drift_events |
| ], |
| } |
| for s in SCENARIOS.values() |
| ] |
| } |
|
|
|
|
| @mcp.tool() |
| async def list_personas() -> dict: |
| """ |
| List all available AI negotiator personas. |
| |
| Returns: |
| personas: List of all personas with name, symbol, Big Five scores, |
| aggression, patience, bluff_rate, and tactical style summary. |
| """ |
| return { |
| "personas": [ |
| { |
| "id": persona_type.value, |
| "name": cfg.name, |
| "symbol": cfg.symbol, |
| "emoji": cfg.emoji, |
| "big_five": cfg.big_five, |
| "aggression": cfg.aggression, |
| "patience": cfg.patience, |
| "bluff_rate": cfg.bluff_rate, |
| "tom_depth": cfg.tom_depth, |
| "style_summary": cfg.style[:120] + "...", |
| "drift_trigger": cfg.drift_trigger, |
| "color_var": cfg.color_var, |
| } |
| for persona_type, cfg in PERSONAS.items() |
| ] |
| } |
|
|