fish_farm_env / src /agentic_rl /server /environment.py
rahul24raj's picture
Upload folder using huggingface_hub
9ae446c verified
"""Fish Farm Environment — core game logic.
A real-world OpenEnv environment where an AI agent manages a Nile Tilapia
Recirculating Aquaculture System (RAS) fish farm.
Extends openenv.core.env_server.Environment with the official interface:
reset(seed, episode_id, **kwargs) -> Observation
step(action, timeout_s, **kwargs) -> Observation
state -> State
"""
import uuid
from typing import Any, Dict, List, Optional
from openenv.core.env_server import Environment
from openenv.core.env_server.types import EnvironmentMetadata
from ..models import FarmAction, FarmObservation, FarmState
from ..tasks import get_task
from ..engine.simulator import FishFarmSimulator
from ..engine.events import Event
from ..rewards import calculate_reward
def _build_feedback(sim_state: Dict[str, Any], task_desc: str, hour: int) -> str:
"""Generate narrative feedback from the current simulation state."""
fish = sim_state["fish"]
water = sim_state["water"]
econ = sim_state["economics"]
parts = []
# Time context
time = sim_state["time"]
parts.append(f"Day {time['day']}, Hour {time['hour']:02d}:00.")
# Fish status
resp = fish["feeding_response"]
if resp == "refusing":
parts.append("WARNING: Fish are refusing to eat!")
elif resp == "sluggish":
parts.append("Fish are feeding sluggishly — possible stress.")
elif resp == "eager":
parts.append("Fish are feeding eagerly — good appetite.")
# Water alerts
if water["DO"] < 3.0:
parts.append(f"DANGER: Dissolved oxygen critically low at {water['DO']:.1f} mg/L!")
elif water["DO"] < 5.0:
parts.append(f"Warning: DO below optimal at {water['DO']:.1f} mg/L.")
if water["UIA"] > 0.3:
parts.append(f"DANGER: Toxic ammonia very high at {water['UIA']:.4f} mg/L!")
elif water["UIA"] > 0.05:
parts.append(f"Warning: Toxic ammonia elevated at {water['UIA']:.4f} mg/L.")
# Nighttime DO crash warning (KB-02 Sec 4 — #1 killer in aquaculture)
nighttime_risk = water.get("nighttime_do_risk", 0.0)
if nighttime_risk > 0.7:
parts.append("DANGER: High nighttime DO crash risk! Increase aeration immediately.")
elif nighttime_risk > 0.4:
parts.append("Warning: Elevated nighttime DO crash risk. Monitor aeration.")
if water["temperature"] > 35 or water["temperature"] < 22:
parts.append(f"DANGER: Water temperature at {water['temperature']:.1f}C — outside safe range!")
# Mortality
if fish["mortality_today"] > 50:
parts.append(f"ALERT: {fish['mortality_today']} fish died in the last period!")
elif fish["mortality_today"] > 10:
parts.append(f"Note: {fish['mortality_today']} fish deaths recorded.")
# Disease
disease = sim_state["disease"]
if disease["active"]:
parts.append("Disease outbreak is active! Consider treatment.")
# Events
events = sim_state.get("events", {})
for alert in events.get("active_events", []):
parts.append(f"EVENT: {alert}")
# Economics milestones
if fish["weight_g"] >= 400 and hour > 0:
parts.append(f"Fish have reached market weight ({fish['weight_g']:.0f}g). Consider harvesting.")
# Cost efficiency feedback
breakdown = econ.get("cost_breakdown", {})
feed_pct = breakdown.get("feed", {}).get("pct", 0)
if feed_pct > 75:
parts.append(f"Feed is {feed_pct:.0f}% of costs — high. Consider reducing feeding rate.")
# Vaccination readiness (actionable advice for long episodes)
if (not disease["active"] and
not disease.get("treatment_active", False) and
disease.get("recovered", 0) == 0 and
hour < 48):
parts.append("Tip: Vaccination available ($100, prevents 80% of future infections).")
if not parts[1:]: # no alerts after the time line
parts.append("All systems nominal.")
return " ".join(parts)
def _calculate_reward(
sim_state: Dict[str, Any],
prev_state: Optional[Dict[str, Any]],
reward_weights: Dict[str, float],
) -> float:
"""Calculate hourly reward signal. Delegates to rewards module."""
return calculate_reward(sim_state, prev_state, reward_weights)
class FishFarmEnvironment(Environment[FarmAction, FarmObservation, FarmState]):
"""OpenEnv Fish Farm environment.
AI agents manage a Nile Tilapia RAS fish farm, making hourly decisions
about feeding, aeration, temperature control, water exchange, disease
treatment, and harvest timing across 12 tasks of escalating difficulty.
"""
SUPPORTS_CONCURRENT_SESSIONS = True
def get_metadata(self) -> EnvironmentMetadata:
"""Rich metadata for hackathon discovery and /metadata endpoint."""
import os
readme = None
readme_path = os.path.join(os.path.dirname(__file__), "..", "..", "..", "README.md")
try:
with open(readme_path, "r") as f:
readme = f.read()
except FileNotFoundError:
pass
return EnvironmentMetadata(
name="Fish Farm OpenEnv",
description=(
"AI agent manages a Nile Tilapia Recirculating Aquaculture System (RAS) "
"with 6 continuous controls (feeding, aeration, heating, water exchange, "
"harvest, disease treatment), 13 coupled state variables, SEIR disease "
"dynamics, bioenergetic growth model, stochastic economics, nighttime DO "
"crash risk, and 12 tasks from easy to extreme difficulty."
),
version="1.0.0",
author="Rahul Rajpurohit",
readme_content=readme,
)
def __init__(self):
super().__init__()
self._sim = FishFarmSimulator()
self._state = FarmState()
self._task_config: Dict[str, Any] = {}
self._episode_history: List[Dict[str, Any]] = []
self._prev_sim_state: Optional[Dict[str, Any]] = None
def reset(
self,
seed: Optional[int] = None,
episode_id: Optional[str] = None,
task_id: str = "feeding_basics",
**kwargs: Any,
) -> FarmObservation:
"""Start a new episode with the given task."""
task = get_task(task_id)
eid = episode_id or str(uuid.uuid4())
actual_seed = seed or 42
ic = task["initial_conditions"]
sim_state = self._sim.reset(
initial_weight=ic["weight_g"],
initial_population=ic["population"],
initial_temp=ic["temp"],
initial_DO=ic["DO"],
initial_TAN=ic["TAN"],
initial_pH=ic["pH"],
day_of_year=ic["day_of_year"],
base_air_temp=ic.get("base_air_temp", 30.0),
seed=actual_seed,
scheduled_events=[
Event(
type=e.type, trigger_hour=e.trigger_hour,
severity=e.severity, duration_hours=e.duration_hours,
description=e.description, equipment=e.equipment,
price_multiplier=e.price_multiplier,
) for e in task["events"]
],
)
self._task_config = task
self._episode_history = []
self._prev_sim_state = None
self._state = FarmState(
episode_id=eid,
step_count=0,
task_id=task_id,
is_complete=False,
final_score=0.0,
max_hours=task["episode_hours"],
sim_state=sim_state,
)
return self._make_observation(sim_state, done=False, reward=None,
feedback=f"TASK: {task['description']}")
def step(
self,
action: FarmAction,
timeout_s: Optional[float] = None,
**kwargs: Any,
) -> FarmObservation:
"""Process the agent's action and advance simulation by 1 hour."""
# Auto-reset if step is called without prior reset (stateless REST API)
if not self._task_config:
self.reset(task_id="feeding_basics")
if self._state.is_complete:
return self._terminal_observation()
self._state.step_count += 1
sim_state = self._sim.step(
feeding_rate=action.feeding_rate,
aeration_rate=action.aeration_rate,
heater_setting=action.heater_setting,
water_exchange_rate=action.water_exchange_rate,
harvest=action.harvest_decision,
treatment=action.treatment,
)
self._episode_history.append(sim_state)
# Calculate reward
reward = _calculate_reward(
sim_state, self._prev_sim_state,
self._task_config["reward_weights"],
)
self._prev_sim_state = sim_state
# Check done
done = sim_state["done"] or self._state.step_count >= self._state.max_hours
if done:
self._state.is_complete = True
# Run grader for final score
from graders.farm_graders import FarmGrader
grader = FarmGrader()
grade_result = grader.grade(
self._state.task_id, sim_state,
self._episode_history, self._task_config,
)
self._state.final_score = grade_result.score
reward = grade_result.score # final reward is the grader score
self._state.sim_state = sim_state
feedback = _build_feedback(sim_state, self._task_config["description"],
self._state.step_count)
return self._make_observation(sim_state, done=done, reward=reward,
feedback=feedback)
@property
def state(self) -> FarmState:
"""Return current internal state (for grading)."""
return self._state
def _make_observation(
self, sim_state: Dict[str, Any],
done: bool, reward: Optional[float], feedback: str,
) -> FarmObservation:
"""Build observation from simulator state."""
fish = sim_state["fish"]
water = sim_state["water"]
econ = sim_state["economics"]
weather = sim_state["weather"]
events = sim_state.get("events", {})
equip = events.get("equipment", {})
# Disease suspected: agent can't see SEIR counts but can infer from
# behavioral signals (mortality spikes + feeding refusal + stress)
disease = sim_state.get("disease", {})
disease_suspected = (
disease.get("active", False)
and (fish["mortality_today"] > 5
or fish["feeding_response"] in ("sluggish", "refusing")
or fish["stress_level"] > 0.4)
)
return FarmObservation(
done=done,
reward=reward,
metadata={"episode_id": self._state.episode_id,
"step": self._state.step_count},
# Fish
avg_fish_weight=fish["weight_g"],
population=fish["population"],
mortality_today=fish["mortality_today"],
cumulative_mortality=fish["cumulative_mortality"],
survival_rate=fish["survival_rate"],
stress_level=fish["stress_level"],
feeding_response=fish["feeding_response"],
biomass_kg=fish["biomass_kg"],
growth_rate_g_day=fish["growth_rate_g_day"],
fcr=fish.get("fcr", 0.0),
sgr=fish.get("sgr", 0.0),
stocking_density=fish.get("stocking_density", 0.0),
# Water
temperature=water["temperature"],
dissolved_oxygen=water["DO"],
ph=water["pH"],
ammonia=water["TAN"],
ammonia_toxic=water["UIA"],
nitrite=water["NO2"],
nitrate=water.get("NO3", 0.0),
water_quality_score=water["water_quality_score"],
algae_bloom=water.get("algae_bloom", False),
nighttime_do_risk=water.get("nighttime_do_risk", 0.0),
# Equipment
aerator_working=equip.get("aerator", True),
biofilter_working=equip.get("biofilter", True),
heater_working=equip.get("heater", True),
feed_remaining_kg=econ["feed_inventory_kg"],
# Economics
current_fish_value=econ["fish_value"],
total_cost_so_far=econ["total_cost"],
current_profit=econ["current_profit"],
feed_price_per_kg=econ.get("feed_price_per_kg", 0.50),
market_price_multiplier=econ.get("market_price_multiplier", 1.0),
marginal_cost_per_hour=econ.get("marginal_cost_per_hour", 0.0),
roi_pct=econ.get("roi_pct", 0.0),
# Weather
weather_forecast=weather["forecast"],
is_daytime=weather.get("is_daytime", True),
storm_active=weather.get("storm_active", False),
humidity=weather.get("humidity", 75.0),
# Context
day_in_cycle=sim_state["time"]["day"],
time_of_day=sim_state["time"]["hour"],
day_of_year=sim_state["time"].get("day_of_year", 1),
alerts=events.get("active_events", []),
# Disease behavioral signal
disease_suspected=disease_suspected,
# Feedback
feedback=feedback,
)
def _terminal_observation(self) -> FarmObservation:
"""Return observation for an already-completed episode."""
sim_state = self._state.sim_state or self._sim.get_state()
return self._make_observation(
sim_state, done=True, reward=self._state.final_score,
feedback=f"Episode ended. Final score: {self._state.final_score:.3f}",
)