| """ |
| Keyless test suite β runs with zero API keys. |
| Tests the full stack in mock mode: game theory, grader, DB, and HTTP endpoints. |
| |
| Usage: |
| pytest tests/test_keyless.py -v |
| make test-keyless |
| """ |
| import asyncio |
| import os |
| import sqlite3 |
|
|
| import pytest |
| import pytest_asyncio |
| from httpx import AsyncClient, ASGITransport |
|
|
| |
| os.environ.pop("GOOGLE_API_KEY", None) |
|
|
| |
| from main import app |
| from parlay_env.game_theory import ( |
| compute_zopa, |
| compute_nash_bargaining_solution, |
| compute_shapley_value, |
| ) |
| from parlay_env.grader import compute_step_reward, compute_terminal_reward |
| from parlay_env.grader import detect_bluff_challenge |
| from parlay_env.reward import MU, OMEGA, PSI |
| from parlay_env.models import ( |
| BeliefState, HiddenState, ParlayAction, ParlayState, PersonaType, |
| ) |
| from dashboard.api import _apply_zopa_erosion |
|
|
|
|
| |
|
|
| @pytest.fixture |
| def hidden() -> HiddenState: |
| return HiddenState( |
| budget_ceiling=165_000.0, |
| walk_away_price=125_000.0, |
| urgency_score=0.5, |
| has_alternative=False, |
| persona_drifted=False, |
| ) |
|
|
|
|
| @pytest.fixture |
| def belief() -> BeliefState: |
| return BeliefState( |
| est_budget=140_000.0, |
| est_walk_away=130_000.0, |
| est_urgency=0.5, |
| est_has_alternative=False, |
| confidence=0.5, |
| ) |
|
|
|
|
| @pytest.fixture |
| def parlay_state(hidden, belief) -> ParlayState: |
| return ParlayState( |
| session_id="test-session", |
| scenario_id="saas_enterprise", |
| persona=PersonaType.SHARK, |
| step_count=0, |
| cumulative_reward=0.0, |
| hidden_state=hidden, |
| belief_history=[belief], |
| offer_history=[], |
| drift_events_fired=0, |
| episode_done=False, |
| credibility_points=100, |
| ) |
|
|
|
|
| @pytest_asyncio.fixture |
| async def client(): |
| transport = ASGITransport(app=app) |
| async with AsyncClient(transport=transport, base_url="http://test") as ac: |
| yield ac |
|
|
|
|
| |
|
|
| class TestHealthEndpoint: |
| def test_health_endpoint(self): |
| """GET /health returns 200 with status ok.""" |
| async def _run(): |
| transport = ASGITransport(app=app) |
| async with AsyncClient(transport=transport, base_url="http://test") as ac: |
| resp = await ac.get("/health") |
| assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" |
| data = resp.json() |
| assert data["status"] == "ok", f"Expected ok, got {data['status']}" |
| assert "db" in data, f"Missing 'db' key: {data}" |
| assert "gemini" in data, f"Missing 'gemini' key: {data}" |
| assert data["gemini"] in ( |
| "mock", |
| "configured", |
| ), f"Expected gemini mock|configured, got {data['gemini']!r}" |
|
|
| asyncio.get_event_loop().run_until_complete(_run()) |
|
|
|
|
| class TestListScenarios: |
| def test_list_scenarios(self): |
| """GET /api/scenarios returns exactly 3 scenarios.""" |
| async def _run(): |
| transport = ASGITransport(app=app) |
| async with AsyncClient(transport=transport, base_url="http://test") as ac: |
| resp = await ac.get("/api/scenarios") |
| assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" |
| data = resp.json() |
| scenarios = data.get("scenarios", []) |
| assert len(scenarios) == 3, f"Expected 3 scenarios, got {len(scenarios)}" |
|
|
| asyncio.get_event_loop().run_until_complete(_run()) |
|
|
|
|
| class TestListPersonas: |
| def test_list_personas(self): |
| """GET /api/personas returns exactly 3 personas.""" |
| async def _run(): |
| transport = ASGITransport(app=app) |
| async with AsyncClient(transport=transport, base_url="http://test") as ac: |
| resp = await ac.get("/api/personas") |
| assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" |
| data = resp.json() |
| personas = data.get("personas", []) |
| assert len(personas) == 3, f"Expected 3 personas, got {len(personas)}" |
|
|
| asyncio.get_event_loop().run_until_complete(_run()) |
|
|
|
|
| |
|
|
| class TestGameTheory: |
| def test_game_theory_zopa(self): |
| """compute_zopa(165000, 125000) == (125000, 165000).""" |
| result = compute_zopa(165_000.0, 125_000.0) |
| expected = (125_000.0, 165_000.0) |
| assert result == expected, f"Expected {expected}, got {result}" |
|
|
| def test_zopa_none_when_inverted(self): |
| """No ZOPA when seller BATNA exceeds buyer BATNA.""" |
| result = compute_zopa(100_000.0, 150_000.0) |
| assert result is None, f"Expected None (no ZOPA), got {result}" |
|
|
| def test_nash(self): |
| """compute_nash_bargaining_solution(165000, 125000) == 145000.0.""" |
| result = compute_nash_bargaining_solution(165_000.0, 125_000.0) |
| expected = 145_000.0 |
| assert result == expected, f"Expected {expected}, got {result}" |
|
|
| def test_shapley(self): |
| """compute_shapley_value works for a 2-player coalition.""" |
| coalition_values = { |
| frozenset(): 0.0, |
| frozenset(["A"]): 40.0, |
| frozenset(["B"]): 30.0, |
| frozenset(["A", "B"]): 100.0, |
| } |
| shapley = compute_shapley_value(coalition_values) |
| assert "A" in shapley and "B" in shapley, f"Missing players: {shapley}" |
| total = shapley["A"] + shapley["B"] |
| assert abs(total - 100.0) < 1e-6, f"Shapley values should sum to grand coalition value: {total}" |
|
|
|
|
| |
|
|
| class TestGrader: |
| def test_grader_normal(self, parlay_state): |
| """compute_step_reward with valid state returns float in expected range.""" |
| action = ParlayAction(utterance="I propose 145000.", offer_amount=145_000.0) |
| next_state = ParlayState( |
| **{**parlay_state.model_dump(), "step_count": 1, "offer_history": [145_000.0]} |
| ) |
| result = compute_step_reward(parlay_state, action, next_state) |
| assert isinstance(result, float), f"Expected float, got {type(result)}" |
| assert -500.0 <= result <= 500.0, f"Expected reward in range [-500, 500], got {result}" |
|
|
| def test_grader_capitulation(self, parlay_state): |
| """Deal below BATNA (125000) returns -OMEGA terminal reward.""" |
| result = compute_terminal_reward(parlay_state, final_price=100_000.0, t_close=10) |
| assert result == -OMEGA, f"Expected -{OMEGA}, got {result}" |
|
|
| def test_bluff_detection_bonus_keyless(self, parlay_state): |
| """Challenging a large BATNA bluff earns the PSI bonus.""" |
| parlay_state.hidden_state.last_stated_batna = 198_000.0 |
| action = ParlayAction( |
| utterance="I don't believe that's your walk-away.", |
| tactical_move=None, |
| ) |
| next_state = ParlayState( |
| **{**parlay_state.model_dump(), "step_count": 1} |
| ) |
| caught = detect_bluff_challenge(action.utterance, 198_000.0, 165_000.0) |
| reward = compute_step_reward(parlay_state, action, next_state) |
| assert caught is True, f"Expected True, got {caught}" |
| assert reward >= PSI, f"Expected at least {PSI}, got {reward}" |
|
|
| def test_mev_bonus_requires_drift_event(self, parlay_state): |
| """MEV bonus only activates when a drift event marker is present.""" |
| action = ParlayAction( |
| utterance="Given that the market shifted, I can adjust.", |
| tactical_move=None, |
| ) |
|
|
| no_drift_state = ParlayState(**{**parlay_state.model_dump(), "step_count": 1}) |
| reward_no_drift = compute_step_reward(parlay_state, action, no_drift_state) |
|
|
| with_drift_state = ParlayState(**{**parlay_state.model_dump(), "step_count": 1}) |
| with_drift_state.__dict__["drift_event"] = "Competitor drops price 15%" |
| reward_with_drift = compute_step_reward(parlay_state, action, with_drift_state) |
|
|
| assert reward_with_drift >= reward_no_drift + MU, ( |
| f"Expected drift reward boost >= {MU}, got " |
| f"{reward_with_drift - reward_no_drift}" |
| ) |
|
|
| def test_zopa_collapse_walkaway_keyless(self): |
| """Repeated high tension collapses the ZOPA and forces walk-away.""" |
| hidden = HiddenState( |
| budget_ceiling=103.0, |
| walk_away_price=100.0, |
| urgency_score=0.5, |
| has_alternative=False, |
| persona_drifted=False, |
| ) |
| belief = BeliefState( |
| est_budget=102.0, |
| est_walk_away=101.0, |
| est_urgency=0.5, |
| est_has_alternative=False, |
| confidence=0.5, |
| ) |
| state = ParlayState( |
| session_id="collapse-test", |
| scenario_id="saas_enterprise", |
| persona=PersonaType.SHARK, |
| step_count=0, |
| cumulative_reward=0.0, |
| hidden_state=hidden, |
| belief_history=[belief], |
| offer_history=[], |
| drift_events_fired=0, |
| episode_done=False, |
| credibility_points=100, |
| original_zopa_width=3.0, |
| ) |
| while not state.walk_away and state.zopa_erosion_ticks < 100: |
| state.tension_score = 80.0 |
| state.high_tension_streak = 2 |
| _apply_zopa_erosion(state) |
| assert state.zopa_erosion_ticks >= 1, f"Expected >=1, got {state.zopa_erosion_ticks}" |
| assert state.termination_reason == "zopa_collapsed", f"Expected zopa_collapsed, got {state.termination_reason}" |
|
|
|
|
| |
|
|
| class TestDbInit: |
| def test_db_init(self): |
| """parlay.db has sessions, episodes, and leaderboard tables.""" |
| |
| async def _run(): |
| from scripts.init_db import init_db |
| await init_db() |
|
|
| asyncio.get_event_loop().run_until_complete(_run()) |
|
|
| conn = sqlite3.connect("parlay.db") |
| cursor = conn.execute( |
| "SELECT name FROM sqlite_master WHERE type='table'" |
| ) |
| tables = {row[0] for row in cursor.fetchall()} |
| conn.close() |
|
|
| required = {"sessions", "episodes", "leaderboard"} |
| missing = required - tables |
| assert not missing, f"Missing tables: {missing}. Found: {tables}" |
|
|
| def test_leaderboard_insert(self): |
| """POST a score via the accept endpoint, GET leaderboard returns it.""" |
| async def _run(): |
| transport = ASGITransport(app=app) |
| async with AsyncClient(transport=transport, base_url="http://test") as ac: |
| |
| start_resp = await ac.post( |
| "/api/game/start", |
| json={"scenario_id": "saas_enterprise", "persona": "shark", "player_name": "TestPlayer"}, |
| ) |
| assert start_resp.status_code == 200, f"Start failed: {start_resp.text}" |
| session_id = start_resp.json()["session_id"] |
|
|
| |
| move_resp = await ac.post( |
| "/api/game/move", |
| json={"session_id": session_id, "amount": 145_000.0, "message": "Test offer"}, |
| ) |
| assert move_resp.status_code == 200, f"Move failed: {move_resp.text}" |
|
|
| |
| accept_resp = await ac.post( |
| "/api/game/accept", |
| json={"session_id": session_id}, |
| ) |
| assert accept_resp.status_code == 200, f"Accept failed: {accept_resp.text}" |
|
|
| |
| lb_resp = await ac.get("/api/leaderboard?limit=5") |
| assert lb_resp.status_code == 200, f"Leaderboard failed: {lb_resp.text}" |
| entries = lb_resp.json().get("entries", []) |
| names = [e.get("player_name") for e in entries] |
| assert "TestPlayer" in names, f"TestPlayer not in leaderboard: {names}" |
|
|
| asyncio.get_event_loop().run_until_complete(_run()) |
|
|
|
|
| |
|
|
| class TestMockSession: |
| def test_mock_session(self): |
| """POST /api/session/start without API key returns valid session_id.""" |
| async def _run(): |
| transport = ASGITransport(app=app) |
| async with AsyncClient(transport=transport, base_url="http://test") as ac: |
| resp = await ac.post( |
| "/api/session/start", |
| json={"scenario_id": "saas_enterprise", "persona": "shark", "player_name": "MockPlayer"}, |
| ) |
| assert resp.status_code == 200, f"Expected 200, got {resp.status_code}: {resp.text}" |
| data = resp.json() |
| assert "session_id" in data, f"Missing session_id: {data}" |
| assert len(data["session_id"]) == 36, f"session_id looks wrong: {data['session_id']}" |
|
|
| asyncio.get_event_loop().run_until_complete(_run()) |
|
|
| def test_mock_step(self): |
| """POST /api/session/{id}/step with mock mode returns valid observation.""" |
| async def _run(): |
| transport = ASGITransport(app=app) |
| async with AsyncClient(transport=transport, base_url="http://test") as ac: |
| |
| start = await ac.post( |
| "/api/session/start", |
| json={"scenario_id": "saas_enterprise", "persona": "diplomat", "player_name": "MockPlayer"}, |
| ) |
| assert start.status_code == 200, f"Start failed: {start.text}" |
| session_id = start.json()["session_id"] |
|
|
| |
| step = await ac.post( |
| f"/api/session/{session_id}/step", |
| json={"amount": 140_000.0, "message": "Test proposal"}, |
| ) |
| assert step.status_code == 200, f"Step failed: {step.status_code}: {step.text}" |
| data = step.json() |
| assert "observation" in data, f"Missing observation: {data}" |
| assert "opponent" in data, f"Missing opponent response: {data}" |
| obs = data["observation"] |
| assert "tension_score" in obs, f"Missing tension_score in obs: {obs}" |
| opponent = data["opponent"] |
| assert "utterance" in opponent, f"Missing utterance in opponent: {opponent}" |
|
|
| asyncio.get_event_loop().run_until_complete(_run()) |
|
|