Spaces:
Running
Running
File size: 10,144 Bytes
3eae4cc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 | """
tests/test_phase1_event_engine.py
Phase 1 validation: event_engine.py — determinism, scenario scaling, event effects
Run: pytest tests/test_phase1_event_engine.py -v
"""
import pytest
from app.models import EventType, ScenarioMode
from app.event_engine import EventEngine, DayEventParams, SCENARIO_MULTIPLIER, BASE_PROBS
from app.tasks import get_task
# ─── DayEventParams defaults ─────────────────────────────────────────────────
class TestDayEventParams:
    """Checks the attribute values and ``has_events()`` result of ``DayEventParams``."""

    def test_default_arrival_multiplier_one(self):
        """A freshly built instance reports an arrival multiplier of 1.0."""
        assert DayEventParams().arrival_multiplier == 1.0

    def test_default_officer_reduction_zero(self):
        """A freshly built instance reports an officer reduction of 0."""
        assert DayEventParams().officer_reduction == 0

    def test_default_no_active_events(self):
        """A freshly built instance starts with an empty active-events list."""
        assert DayEventParams().active_events == []

    def test_has_events_false_by_default(self):
        """``has_events()`` is exactly ``False`` on a freshly built instance."""
        fresh = DayEventParams()
        assert fresh.has_events() is False

    def test_has_events_true_when_populated(self):
        """``has_events()`` is exactly ``True`` once an event has been appended."""
        populated = DayEventParams()
        populated.active_events.append(EventType.SURGE_APPLICATIONS)
        assert populated.has_events() is True
# ─── ScenarioMultiplier constants ────────────────────────────────────────────
class TestScenarioMultipliers:
    """Checks the relative ordering and sign of the ``SCENARIO_MULTIPLIER`` entries."""

    def test_normal_multiplier_one(self):
        """The NORMAL entry is exactly 1.0."""
        normal = SCENARIO_MULTIPLIER[ScenarioMode.NORMAL]
        assert normal == 1.0

    def test_crisis_multiplier_greater_than_normal(self):
        """The CRISIS entry is strictly larger than the NORMAL entry."""
        crisis = SCENARIO_MULTIPLIER[ScenarioMode.CRISIS]
        normal = SCENARIO_MULTIPLIER[ScenarioMode.NORMAL]
        assert crisis > normal

    def test_extreme_multiplier_greatest(self):
        """The EXTREME_OVERLOAD entry is strictly larger than the CRISIS entry."""
        extreme = SCENARIO_MULTIPLIER[ScenarioMode.EXTREME_OVERLOAD]
        crisis = SCENARIO_MULTIPLIER[ScenarioMode.CRISIS]
        assert extreme > crisis

    def test_all_multipliers_positive(self):
        """Every entry in the mapping is strictly greater than zero."""
        for mode, factor in SCENARIO_MULTIPLIER.items():
            assert factor > 0, f"Multiplier for {mode} should be positive"
# ─── EventEngine construction ────────────────────────────────────────────────
class TestEventEngineConstruction:
    """Checks the attributes exposed by a newly constructed ``EventEngine``."""

    def test_engine_initialises_with_seed_and_mode(self):
        """The constructor arguments are readable back as ``seed`` and ``scenario_mode``."""
        built = EventEngine(seed=42, scenario_mode=ScenarioMode.NORMAL)
        assert built.seed == 42
        assert built.scenario_mode == ScenarioMode.NORMAL

    def test_engine_stores_correct_multiplier(self):
        """``_multiplier`` equals the ``SCENARIO_MULTIPLIER`` entry for the chosen mode."""
        built = EventEngine(seed=0, scenario_mode=ScenarioMode.CRISIS)
        stored = built._multiplier
        assert stored == SCENARIO_MULTIPLIER[ScenarioMode.CRISIS]
# ─── Determinism guarantee ───────────────────────────────────────────────────
class TestEventEngineDeterminism:
    """Checks that ``get_events_for_day`` results depend only on seed, mode, day and task."""

    def test_same_seed_same_day_same_events(self):
        """Two engines built with identical seed and mode agree on days 1 through 9."""
        hard_task = get_task("cross_department_hard")
        left = EventEngine(seed=999, scenario_mode=ScenarioMode.CRISIS)
        right = EventEngine(seed=999, scenario_mode=ScenarioMode.CRISIS)
        for day in range(1, 10):
            first = left.get_events_for_day(day, hard_task)
            second = right.get_events_for_day(day, hard_task)
            assert first == second, f"Day {day}: non-deterministic events {first} vs {second}"

    def test_different_seeds_can_produce_different_events(self):
        """Seeds 1 and 2 yield different event sequences over days 1 through 29."""
        hard_task = get_task("cross_department_hard")
        seeded_one = EventEngine(seed=1, scenario_mode=ScenarioMode.CRISIS)
        seeded_two = EventEngine(seed=2, scenario_mode=ScenarioMode.CRISIS)
        days = range(1, 30)
        sequence_one = [seeded_one.get_events_for_day(d, hard_task) for d in days]
        sequence_two = [seeded_two.get_events_for_day(d, hard_task) for d in days]
        # Not guaranteed in principle, but a full match across 29 days is very unlikely.
        assert sequence_one != sequence_two

    def test_day_independence(self):
        """Calling day 5 after day 3 gives same result as calling day 5 directly."""
        hard_task = get_task("cross_department_hard")

        # Engine that has already been asked about day 3 before day 5.
        warmed = EventEngine(seed=42, scenario_mode=ScenarioMode.CRISIS)
        warmed.get_events_for_day(3, hard_task)
        after_day3 = warmed.get_events_for_day(5, hard_task)

        # Untouched engine that is asked about day 5 only.
        untouched = EventEngine(seed=42, scenario_mode=ScenarioMode.CRISIS)
        direct = untouched.get_events_for_day(5, hard_task)

        assert after_day3 == direct
# ─── Event output format ─────────────────────────────────────────────────────
class TestEventEngineOutput:
    """Checks the type and contents of what ``get_events_for_day`` returns."""

    def test_returns_list_of_event_types(self):
        """The result is a ``list`` whose members are all ``EventType`` instances."""
        hard_task = get_task("cross_department_hard")
        engine = EventEngine(seed=42, scenario_mode=ScenarioMode.CRISIS)
        day_one = engine.get_events_for_day(1, hard_task)
        assert isinstance(day_one, list)
        for item in day_one:
            assert isinstance(item, EventType)

    def test_no_event_returned_when_none_active(self):
        """Easy task with NO_EVENT allowed — must return [NO_EVENT] not [].

        Only non-emptiness of the day-1 result is asserted here.
        """
        easy_task = get_task("district_backlog_easy")
        engine = EventEngine(seed=42, scenario_mode=ScenarioMode.NORMAL)
        day_one = engine.get_events_for_day(1, easy_task)
        assert len(day_one) >= 1

    def test_events_only_from_allowed_list(self):
        """Over days 1-30 each event is in ``task.allowed_events`` or is ``NO_EVENT``."""
        easy_task = get_task("district_backlog_easy")
        engine = EventEngine(seed=42, scenario_mode=ScenarioMode.NORMAL)
        for day in range(1, 31):
            for item in engine.get_events_for_day(day, easy_task):
                assert item in easy_task.allowed_events or item == EventType.NO_EVENT

    def test_hard_task_can_produce_surge_event(self):
        """Crisis mode over days 1-60 yields at least one event other than NO_EVENT.

        NOTE(review): the test name mentions a surge event, but the assertion
        accepts any non-NO_EVENT event type; confirm which is intended.
        """
        hard_task = get_task("cross_department_hard")
        engine = EventEngine(seed=999, scenario_mode=ScenarioMode.CRISIS)
        observed = [
            item
            for day in range(1, 61)
            for item in engine.get_events_for_day(day, hard_task)
        ]
        real_events = [item for item in observed if item != EventType.NO_EVENT]
        assert real_events, "Crisis mode should produce at least one real event"
# ─── Apply events effects ────────────────────────────────────────────────────
class TestApplyEvents:
    """Checks the parameter object that ``apply_events`` builds for given event lists."""

    def _engine(self):
        """Build the crisis-mode engine (seed 42) shared by every test in this class."""
        return EventEngine(seed=42, scenario_mode=ScenarioMode.CRISIS)

    def test_no_event_gives_no_modification(self):
        """``[NO_EVENT]`` leaves arrival multiplier at 1.0 and officer reduction at 0."""
        result = self._engine().apply_events(
            [EventType.NO_EVENT], get_task("district_backlog_easy")
        )
        assert result.arrival_multiplier == 1.0
        assert result.officer_reduction == 0

    def test_surge_event_increases_arrival_multiplier(self):
        """SURGE_APPLICATIONS pushes the arrival multiplier above 1.0."""
        result = self._engine().apply_events(
            [EventType.SURGE_APPLICATIONS], get_task("cross_department_hard")
        )
        assert result.arrival_multiplier > 1.0

    def test_officer_unavailable_reduces_officers(self):
        """OFFICER_UNAVAILABLE yields an officer reduction of at least 1."""
        result = self._engine().apply_events(
            [EventType.OFFICER_UNAVAILABLE], get_task("cross_department_hard")
        )
        assert result.officer_reduction >= 1

    def test_doc_rejection_spike_boosts_defect_rate(self):
        """DOCUMENT_REJECTION_SPIKE yields a positive ``doc_defect_rate_boost``."""
        result = self._engine().apply_events(
            [EventType.DOCUMENT_REJECTION_SPIKE], get_task("cross_department_hard")
        )
        assert result.doc_defect_rate_boost > 0.0

    def test_revenue_db_delay_boosts_system_dependency(self):
        """REVENUE_DB_DELAY yields a positive ``system_dependency_boost``."""
        result = self._engine().apply_events(
            [EventType.REVENUE_DB_DELAY], get_task("cross_department_hard")
        )
        assert result.system_dependency_boost > 0.0

    def test_sla_escalation_reduces_sla_window(self):
        """SLA_ESCALATION_ORDER yields an ``sla_window_multiplier`` no greater than 1.0."""
        result = self._engine().apply_events(
            [EventType.SLA_ESCALATION_ORDER], get_task("cross_department_hard")
        )
        # NOTE(review): `<=` also passes on a multiplier of exactly 1.0; if that is
        # the unmodified default, this cannot detect a missing effect — confirm.
        assert result.sla_window_multiplier <= 1.0

    def test_multiple_events_compound(self):
        """A surge plus an unavailable officer affects both fields in one result."""
        requested = [EventType.SURGE_APPLICATIONS, EventType.OFFICER_UNAVAILABLE]
        result = self._engine().apply_events(requested, get_task("cross_department_hard"))
        assert result.arrival_multiplier > 1.0
        assert result.officer_reduction >= 1

    def test_active_events_populated_correctly(self):
        """The applied event type appears in the result's ``active_events``."""
        result = self._engine().apply_events(
            [EventType.SURGE_APPLICATIONS], get_task("cross_department_hard")
        )
        assert EventType.SURGE_APPLICATIONS in result.active_events

    def test_no_event_gives_no_event_in_active_list(self):
        """Applying ``[NO_EVENT]`` records exactly ``[NO_EVENT]`` as active events."""
        result = self._engine().apply_events(
            [EventType.NO_EVENT], get_task("district_backlog_easy")
        )
        assert result.active_events == [EventType.NO_EVENT]
# ─── Describe events ─────────────────────────────────────────────────────────
class TestDescribeEvents:
    """Checks the text that ``describe_events`` produces for given event lists."""

    def _engine(self):
        """Build the normal-mode engine (seed 42) shared by every test in this class."""
        return EventEngine(seed=42, scenario_mode=ScenarioMode.NORMAL)

    def test_no_event_description(self):
        """The description of ``[NO_EVENT]`` contains "No active events"."""
        text = self._engine().describe_events([EventType.NO_EVENT])
        assert "No active events" in text

    def test_surge_description(self):
        """The description of a surge event is a non-empty string."""
        text = self._engine().describe_events([EventType.SURGE_APPLICATIONS])
        assert isinstance(text, str)
        assert len(text) > 0

    def test_multiple_events_description(self):
        """The description of two events contains a semicolon."""
        both = [
            EventType.SURGE_APPLICATIONS,
            EventType.OFFICER_UNAVAILABLE,
        ]
        text = self._engine().describe_events(both)
        assert ";" in text  # Two events joined by semicolon
|