fish_farm_env / tests /test_environment.py
rahul24raj's picture
Upload folder using huggingface_hub
a0b8672 verified
"""Tests for the Fish Farm environment using official openenv-core types.
Tests the FishFarmEnvironment class against the OpenEnv spec:
- reset(seed, episode_id, **kwargs) -> Observation
- step(action, timeout_s, **kwargs) -> Observation
- state -> State
- SUPPORTS_CONCURRENT_SESSIONS flag
Also validates simulation coupling: the cascade from
overfeed β†’ ammonia β†’ DO crash β†’ stress β†’ disease β†’ mortality
must emerge from the environment's internal dynamics.
"""
import pytest
try:
from openenv.core.env_server import Action, Observation, State, Environment
from agentic_rl.models import FarmAction, FarmObservation, FarmState
from agentic_rl.server.environment import FishFarmEnvironment
HAS_OPENENV = True
except ImportError:
HAS_OPENENV = False
@pytest.mark.skipif(not HAS_OPENENV, reason="openenv-core not installed locally")
class TestSpecCompliance:
"""Verify the environment follows the official OpenEnv spec."""
def test_models_inherit_from_openenv(self):
assert issubclass(FarmAction, Action)
assert issubclass(FarmObservation, Observation)
assert issubclass(FarmState, State)
def test_environment_inherits_from_openenv(self):
assert issubclass(FishFarmEnvironment, Environment)
def test_observation_has_done_and_reward(self):
obs = FarmObservation(done=False, reward=0.5)
assert hasattr(obs, "done")
assert hasattr(obs, "reward")
assert hasattr(obs, "metadata")
def test_state_has_episode_id_and_step_count(self):
state = FarmState()
assert hasattr(state, "episode_id")
assert hasattr(state, "step_count")
def test_action_has_metadata(self):
action = FarmAction()
assert hasattr(action, "metadata")
def test_supports_concurrent_sessions(self):
assert FishFarmEnvironment.SUPPORTS_CONCURRENT_SESSIONS is True
@pytest.mark.skipif(not HAS_OPENENV, reason="openenv-core not installed locally")
class TestReset:
def setup_method(self):
self.env = FishFarmEnvironment()
def test_reset_returns_observation(self):
obs = self.env.reset(task_id="feeding_basics")
assert isinstance(obs, FarmObservation)
assert isinstance(obs, Observation)
assert obs.done is False
assert obs.reward is None # Initial obs has no reward per spec
def test_reset_observation_has_fish_data(self):
obs = self.env.reset(task_id="feeding_basics")
assert obs.avg_fish_weight > 0
assert obs.population > 0
assert obs.biomass_kg > 0
def test_reset_observation_has_water_data(self):
obs = self.env.reset(task_id="feeding_basics")
assert obs.temperature > 0
assert obs.dissolved_oxygen > 0
assert 0 < obs.ph < 14
def test_reset_sets_task_initial_conditions(self):
obs = self.env.reset(task_id="feeding_basics")
assert obs.avg_fish_weight == pytest.approx(50.0, abs=1.0)
assert obs.population == 5000
assert obs.temperature == pytest.approx(30.0, abs=2.0)
def test_reset_produces_clean_state(self):
self.env.reset(task_id="feeding_basics")
# Step once
self.env.step(FarmAction(feeding_rate=0.5, aeration_rate=0.5))
# Reset again
self.env.reset(task_id="oxygen_management")
assert self.env.state.step_count == 0
assert self.env.state.is_complete is False
def test_reset_accepts_seed_and_episode_id(self):
self.env.reset(seed=42, episode_id="test-ep-123", task_id="feeding_basics")
assert self.env.state.episode_id == "test-ep-123"
def test_reset_invalid_task_raises(self):
with pytest.raises(ValueError):
self.env.reset(task_id="nonexistent_task_that_doesnt_exist")
def test_reset_all_12_tasks(self):
"""Every task should reset without error."""
from agentic_rl.tasks import TASKS
for tid in TASKS:
obs = self.env.reset(task_id=tid)
assert obs.done is False
assert obs.population > 0
def test_reset_includes_task_description_in_feedback(self):
obs = self.env.reset(task_id="feeding_basics")
assert "TASK:" in obs.feedback
assert "feed" in obs.feedback.lower() or "fish" in obs.feedback.lower()
@pytest.mark.skipif(not HAS_OPENENV, reason="openenv-core not installed locally")
class TestStep:
def setup_method(self):
self.env = FishFarmEnvironment()
def test_step_returns_observation(self):
self.env.reset(task_id="feeding_basics")
action = FarmAction(feeding_rate=0.5, aeration_rate=0.5)
obs = self.env.step(action)
assert isinstance(obs, FarmObservation)
assert obs.reward is not None
def test_step_advances_time(self):
self.env.reset(task_id="feeding_basics")
action = FarmAction()
obs = self.env.step(action)
assert obs.time_of_day == 1 or self.env.state.step_count == 1
def test_step_counter_increments(self):
self.env.reset(task_id="feeding_basics")
action = FarmAction()
self.env.step(action)
assert self.env.state.step_count == 1
self.env.step(action)
assert self.env.state.step_count == 2
def test_max_steps_ends_episode(self):
"""Episode should end when step_count reaches max_hours."""
self.env.reset(task_id="oxygen_management") # 3 * 24 = 72 hours
action = FarmAction(feeding_rate=0.3, aeration_rate=0.8)
obs = None
for _ in range(72):
obs = self.env.step(action)
assert obs.done is True
def test_harvest_ends_episode(self):
self.env.reset(task_id="feeding_basics")
action = FarmAction(harvest_decision=True)
obs = self.env.step(action)
assert obs.done is True
def test_step_accepts_timeout_s(self):
self.env.reset(task_id="feeding_basics")
action = FarmAction()
obs = self.env.step(action, timeout_s=5.0)
assert isinstance(obs, FarmObservation)
def test_completed_episode_returns_terminal(self):
self.env.reset(task_id="feeding_basics")
action = FarmAction(harvest_decision=True)
self.env.step(action)
# After episode done, stepping again should return terminal obs
obs = self.env.step(FarmAction())
assert obs.done is True
def test_reward_in_valid_range(self):
self.env.reset(task_id="feeding_basics")
action = FarmAction(feeding_rate=0.5, aeration_rate=0.5)
obs = self.env.step(action)
assert -2.0 <= obs.reward <= 2.0 # rewards can be negative for penalties
def test_feeding_rate_affects_ammonia(self):
"""Higher feeding should increase ammonia over time."""
# Run with high feeding
self.env.reset(seed=42, task_id="feeding_basics")
for _ in range(24):
obs_high = self.env.step(FarmAction(feeding_rate=1.0, aeration_rate=0.5,
water_exchange_rate=0.0))
ammonia_high = obs_high.ammonia
# Run with low feeding
self.env.reset(seed=42, task_id="feeding_basics")
for _ in range(24):
obs_low = self.env.step(FarmAction(feeding_rate=0.0, aeration_rate=0.5,
water_exchange_rate=0.0))
ammonia_low = obs_low.ammonia
assert ammonia_high > ammonia_low
def test_aeration_affects_do(self):
"""Higher aeration should maintain higher DO."""
self.env.reset(seed=42, task_id="oxygen_management")
for _ in range(12):
obs_high = self.env.step(FarmAction(aeration_rate=1.0))
do_high = obs_high.dissolved_oxygen
self.env.reset(seed=42, task_id="oxygen_management")
for _ in range(12):
obs_low = self.env.step(FarmAction(aeration_rate=0.0))
do_low = obs_low.dissolved_oxygen
assert do_high > do_low
@pytest.mark.skipif(not HAS_OPENENV, reason="openenv-core not installed locally")
class TestObservationFields:
"""Verify all observation fields are populated correctly."""
def setup_method(self):
self.env = FishFarmEnvironment()
self.env.reset(task_id="feeding_basics")
self.obs = self.env.step(FarmAction(feeding_rate=0.5, aeration_rate=0.5))
def test_fish_fields(self):
assert self.obs.avg_fish_weight > 0
assert self.obs.population > 0
assert self.obs.mortality_today >= 0
assert 0.0 <= self.obs.stress_level <= 1.0
assert self.obs.feeding_response in ("eager", "normal", "reduced", "sluggish", "refusing")
assert self.obs.biomass_kg > 0
def test_water_fields(self):
assert self.obs.temperature > 0
assert self.obs.dissolved_oxygen >= 0
assert 0 < self.obs.ph < 14
assert self.obs.ammonia >= 0
assert self.obs.ammonia_toxic >= 0
assert self.obs.nitrite >= 0
assert 0 <= self.obs.water_quality_score <= 1.0
def test_system_fields(self):
assert isinstance(self.obs.aerator_working, bool)
assert isinstance(self.obs.biofilter_working, bool)
assert isinstance(self.obs.heater_working, bool)
assert self.obs.feed_remaining_kg >= 0
def test_economics_fields(self):
assert self.obs.total_cost_so_far >= 0
assert isinstance(self.obs.current_fish_value, float)
assert isinstance(self.obs.current_profit, float)
def test_context_fields(self):
assert isinstance(self.obs.weather_forecast, str)
assert 0 <= self.obs.day_in_cycle
assert 0 <= self.obs.time_of_day < 24
assert isinstance(self.obs.alerts, list)
assert isinstance(self.obs.feedback, str)
@pytest.mark.skipif(not HAS_OPENENV, reason="openenv-core not installed locally")
class TestCascadeDynamics:
"""Test the core RL challenge: biological cascade emergence."""
def test_overfeed_ammonia_cascade(self):
"""Overfeeding with no water management β†’ ammonia rises β†’ DO drops."""
env = FishFarmEnvironment()
env.reset(seed=42, task_id="feeding_basics")
initial_ammonia = env.step(FarmAction(feeding_rate=0.3, aeration_rate=0.5)).ammonia
# Overfeed for 48 hours with no aeration and no water exchange
for _ in range(47):
obs = env.step(FarmAction(feeding_rate=1.0, aeration_rate=0.0,
water_exchange_rate=0.0))
assert obs.ammonia > initial_ammonia * 2, "Ammonia should rise significantly with overfeeding"
def test_good_management_maintains_health(self):
"""Moderate feeding + adequate aeration = stable conditions for 7 days."""
env = FishFarmEnvironment()
env.reset(seed=42, task_id="feeding_basics")
for _ in range(7 * 24):
obs = env.step(FarmAction(
feeding_rate=0.4, aeration_rate=0.6,
water_exchange_rate=0.02,
))
assert obs.population > 4500, "Population should be mostly intact"
assert obs.avg_fish_weight > 50.0, "Fish should have grown"
assert obs.dissolved_oxygen > 3.0, "DO should be manageable"
@pytest.mark.skipif(not HAS_OPENENV, reason="openenv-core not installed locally")
class TestSerializability:
"""Verify all models serialize to/from JSON (required for WebSocket)."""
def test_action_round_trip(self):
action = FarmAction(
feeding_rate=0.6, aeration_rate=0.8,
heater_setting=-0.3, water_exchange_rate=0.05,
harvest_decision=False, treatment="antibiotics",
)
data = action.model_dump()
restored = FarmAction(**data)
assert restored.feeding_rate == action.feeding_rate
assert restored.treatment == action.treatment
def test_observation_round_trip(self):
env = FishFarmEnvironment()
obs = env.reset(task_id="feeding_basics")
data = obs.model_dump()
restored = FarmObservation(**data)
assert restored.avg_fish_weight == obs.avg_fish_weight
assert restored.done == obs.done
def test_state_round_trip(self):
env = FishFarmEnvironment()
env.reset(task_id="feeding_basics")
data = env.state.model_dump()
restored = FarmState(**data)
assert restored.task_id == env.state.task_id
def test_action_schema_complete(self):
schema = FarmAction.model_json_schema()
expected_fields = {"feeding_rate", "aeration_rate", "heater_setting",
"water_exchange_rate", "harvest_decision", "treatment"}
assert expected_fields.issubset(set(schema["properties"].keys()))
# Every field should have a description
for field_name in expected_fields:
assert "description" in schema["properties"][field_name], \
f"Field {field_name} missing description"