"""Fish Farm Environment — core game logic.

A real-world OpenEnv environment where an AI agent manages a Nile Tilapia
Recirculating Aquaculture System (RAS) fish farm.

Extends openenv.core.env_server.Environment with the official interface:
    reset(seed, episode_id, **kwargs) -> Observation
    step(action, timeout_s, **kwargs) -> Observation
    state -> State
"""

import os
import uuid
from typing import Any, Dict, List, Optional

from openenv.core.env_server import Environment
from openenv.core.env_server.types import EnvironmentMetadata

from ..models import FarmAction, FarmObservation, FarmState
from ..tasks import get_task
from ..engine.simulator import FishFarmSimulator
from ..engine.events import Event
from ..rewards import calculate_reward

# Seed used when the caller does not supply one, so episodes stay reproducible.
_DEFAULT_SEED = 42


def _build_feedback(sim_state: Dict[str, Any], task_desc: str, hour: int) -> str:
    """Generate narrative feedback from the current simulation state.

    Args:
        sim_state: Simulator state dict. The keys "fish", "water",
            "economics", "time" and "disease" are required; "events" is
            optional.
        task_desc: Task description. Currently unused; kept so existing
            callers keep working.
        hour: Hours elapsed in the episode (``step`` passes its step count).
            Gates the market-weight hint (``hour > 0``) and the vaccination
            tip (``hour < 48``).

    Returns:
        A single string of space-joined sentences. The first sentence is
        always the day/hour line; "All systems nominal." is appended only
        when no other message was produced.
    """
    fish = sim_state["fish"]
    water = sim_state["water"]
    econ = sim_state["economics"]
    parts: List[str] = []

    # Time context
    clock = sim_state["time"]
    parts.append(f"Day {clock['day']}, Hour {clock['hour']:02d}:00.")

    # Fish status
    resp = fish["feeding_response"]
    if resp == "refusing":
        parts.append("WARNING: Fish are refusing to eat!")
    elif resp == "sluggish":
        parts.append("Fish are feeding sluggishly — possible stress.")
    elif resp == "eager":
        parts.append("Fish are feeding eagerly — good appetite.")

    # Water alerts
    if water["DO"] < 3.0:
        parts.append(f"DANGER: Dissolved oxygen critically low at {water['DO']:.1f} mg/L!")
    elif water["DO"] < 5.0:
        parts.append(f"Warning: DO below optimal at {water['DO']:.1f} mg/L.")

    if water["UIA"] > 0.3:
        parts.append(f"DANGER: Toxic ammonia very high at {water['UIA']:.4f} mg/L!")
    elif water["UIA"] > 0.05:
        parts.append(f"Warning: Toxic ammonia elevated at {water['UIA']:.4f} mg/L.")

    # Nighttime DO crash warning (KB-02 Sec 4 — #1 killer in aquaculture)
    nighttime_risk = water.get("nighttime_do_risk", 0.0)
    if nighttime_risk > 0.7:
        parts.append("DANGER: High nighttime DO crash risk! Increase aeration immediately.")
    elif nighttime_risk > 0.4:
        parts.append("Warning: Elevated nighttime DO crash risk. Monitor aeration.")

    if water["temperature"] > 35 or water["temperature"] < 22:
        parts.append(f"DANGER: Water temperature at {water['temperature']:.1f}C — outside safe range!")

    # Mortality
    if fish["mortality_today"] > 50:
        parts.append(f"ALERT: {fish['mortality_today']} fish died in the last period!")
    elif fish["mortality_today"] > 10:
        parts.append(f"Note: {fish['mortality_today']} fish deaths recorded.")

    # Disease
    disease = sim_state["disease"]
    if disease["active"]:
        parts.append("Disease outbreak is active! Consider treatment.")

    # Events
    events = sim_state.get("events", {})
    for alert in events.get("active_events", []):
        parts.append(f"EVENT: {alert}")

    # Economics milestones
    if fish["weight_g"] >= 400 and hour > 0:
        parts.append(f"Fish have reached market weight ({fish['weight_g']:.0f}g). Consider harvesting.")

    # Cost efficiency feedback
    breakdown = econ.get("cost_breakdown", {})
    feed_pct = breakdown.get("feed", {}).get("pct", 0)
    if feed_pct > 75:
        parts.append(f"Feed is {feed_pct:.0f}% of costs — high. Consider reducing feeding rate.")

    # Vaccination readiness (actionable advice for long episodes)
    if (
        not disease["active"]
        and not disease.get("treatment_active", False)
        and disease.get("recovered", 0) == 0
        and hour < 48
    ):
        parts.append("Tip: Vaccination available ($100, prevents 80% of future infections).")

    if not parts[1:]:  # no alerts after the time line
        parts.append("All systems nominal.")

    return " ".join(parts)


def _calculate_reward(
    sim_state: Dict[str, Any],
    prev_state: Optional[Dict[str, Any]],
    reward_weights: Dict[str, float],
) -> float:
    """Calculate hourly reward signal. Delegates to rewards module.

    Args:
        sim_state: Simulator state after the current step.
        prev_state: Simulator state after the previous step, or None on the
            first step of an episode.
        reward_weights: The task's "reward_weights" mapping.

    Returns:
        Whatever ``calculate_reward`` returns for these arguments.
    """
    return calculate_reward(sim_state, prev_state, reward_weights)


class FishFarmEnvironment(Environment[FarmAction, FarmObservation, FarmState]):
    """OpenEnv Fish Farm environment.

    AI agents manage a Nile Tilapia RAS fish farm, making hourly decisions
    about feeding, aeration, temperature control, water exchange, disease
    treatment, and harvest timing across 12 tasks of escalating difficulty.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def get_metadata(self) -> EnvironmentMetadata:
        """Rich metadata for hackathon discovery and /metadata endpoint.

        Returns:
            EnvironmentMetadata with static name/description/version/author.
            ``readme_content`` holds the text of the README.md located three
            directories above this file, or None when it cannot be read.
        """
        readme = None
        readme_path = os.path.join(os.path.dirname(__file__), "..", "..", "..", "README.md")
        try:
            # Explicit encoding: the platform default may not be UTF-8.
            with open(readme_path, "r", encoding="utf-8") as f:
                readme = f.read()
        except OSError:
            # README is optional; a missing or unreadable file must not
            # break the metadata endpoint.
            pass

        return EnvironmentMetadata(
            name="Fish Farm OpenEnv",
            description=(
                "AI agent manages a Nile Tilapia Recirculating Aquaculture System (RAS) "
                "with 6 continuous controls (feeding, aeration, heating, water exchange, "
                "harvest, disease treatment), 13 coupled state variables, SEIR disease "
                "dynamics, bioenergetic growth model, stochastic economics, nighttime DO "
                "crash risk, and 12 tasks from easy to extreme difficulty."
            ),
            version="1.0.0",
            author="Rahul Rajpurohit",
            readme_content=readme,
        )

    def __init__(self):
        """Create the simulator and empty per-episode bookkeeping."""
        super().__init__()
        self._sim = FishFarmSimulator()
        self._state = FarmState()
        # Task dict from get_task(); empty until reset() has been called.
        self._task_config: Dict[str, Any] = {}
        # One simulator state per step, handed to the grader at episode end.
        self._episode_history: List[Dict[str, Any]] = []
        # State from the previous step, used as the reward baseline.
        self._prev_sim_state: Optional[Dict[str, Any]] = None

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: str = "feeding_basics",
        **kwargs: Any,
    ) -> FarmObservation:
        """Start a new episode with the given task.

        Args:
            seed: Seed forwarded to the simulator. Any integer, including 0,
                is used as given; None falls back to the default seed (42).
            episode_id: Episode identifier; a random UUID4 string is
                generated when omitted or empty.
            task_id: Task name looked up via ``get_task``.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The initial observation (``done=False``, ``reward=None``) whose
            feedback is the task description prefixed with "TASK: ".
        """
        task = get_task(task_id)
        eid = episode_id or str(uuid.uuid4())
        # Compare against None explicitly: `seed or 42` would silently
        # replace a legitimate seed of 0.
        actual_seed = _DEFAULT_SEED if seed is None else seed

        ic = task["initial_conditions"]
        sim_state = self._sim.reset(
            initial_weight=ic["weight_g"],
            initial_population=ic["population"],
            initial_temp=ic["temp"],
            initial_DO=ic["DO"],
            initial_TAN=ic["TAN"],
            initial_pH=ic["pH"],
            day_of_year=ic["day_of_year"],
            base_air_temp=ic.get("base_air_temp", 30.0),
            seed=actual_seed,
            # Build fresh Event objects rather than passing the task's own
            # instances to the simulator.
            scheduled_events=[
                Event(
                    type=e.type,
                    trigger_hour=e.trigger_hour,
                    severity=e.severity,
                    duration_hours=e.duration_hours,
                    description=e.description,
                    equipment=e.equipment,
                    price_multiplier=e.price_multiplier,
                )
                for e in task["events"]
            ],
        )

        self._task_config = task
        self._episode_history = []
        self._prev_sim_state = None

        self._state = FarmState(
            episode_id=eid,
            step_count=0,
            task_id=task_id,
            is_complete=False,
            final_score=0.0,
            max_hours=task["episode_hours"],
            sim_state=sim_state,
        )

        return self._make_observation(
            sim_state,
            done=False,
            reward=None,
            feedback=f"TASK: {task['description']}",
        )

    def step(
        self,
        action: FarmAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> FarmObservation:
        """Process the agent's action and advance simulation by 1 hour.

        Args:
            action: Control settings forwarded to the simulator.
            timeout_s: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The observation after the step. ``reward`` is the hourly reward,
            except on the step that ends the episode, where it is the
            grader's final score. If the episode is already complete, the
            terminal observation is returned and the simulator is not
            advanced.
        """
        # Auto-reset if step is called without prior reset (stateless REST API)
        if not self._task_config:
            self.reset(task_id="feeding_basics")

        if self._state.is_complete:
            return self._terminal_observation()

        self._state.step_count += 1

        sim_state = self._sim.step(
            feeding_rate=action.feeding_rate,
            aeration_rate=action.aeration_rate,
            heater_setting=action.heater_setting,
            water_exchange_rate=action.water_exchange_rate,
            harvest=action.harvest_decision,
            treatment=action.treatment,
        )
        self._episode_history.append(sim_state)

        # Calculate reward
        reward = _calculate_reward(
            sim_state,
            self._prev_sim_state,
            self._task_config["reward_weights"],
        )
        self._prev_sim_state = sim_state

        # Check done: the simulator may end the episode early, otherwise it
        # ends once the task's hour budget is used up.
        done = sim_state["done"] or self._state.step_count >= self._state.max_hours

        if done:
            self._state.is_complete = True
            # Run grader for final score (imported here, only when needed).
            from graders.farm_graders import FarmGrader

            grader = FarmGrader()
            grade_result = grader.grade(
                self._state.task_id,
                sim_state,
                self._episode_history,
                self._task_config,
            )
            self._state.final_score = grade_result.score
            reward = grade_result.score  # final reward is the grader score

        self._state.sim_state = sim_state

        feedback = _build_feedback(
            sim_state,
            self._task_config["description"],
            self._state.step_count,
        )
        return self._make_observation(sim_state, done=done, reward=reward, feedback=feedback)

    @property
    def state(self) -> FarmState:
        """Return current internal state (for grading)."""
        return self._state

    def _make_observation(
        self,
        sim_state: Dict[str, Any],
        done: bool,
        reward: Optional[float],
        feedback: str,
    ) -> FarmObservation:
        """Build observation from simulator state.

        Args:
            sim_state: Simulator state dict. The keys "fish", "water",
                "economics", "weather" and "time" are required; "events" and
                "disease" are optional.
            done: Whether the episode has ended.
            reward: Reward to report, or None (as on reset).
            feedback: Narrative text to attach to the observation.

        Returns:
            A FarmObservation populated from ``sim_state``. Fields read with
            ``.get`` fall back to the defaults written below when the
            simulator does not provide them.
        """
        fish = sim_state["fish"]
        water = sim_state["water"]
        econ = sim_state["economics"]
        weather = sim_state["weather"]
        events = sim_state.get("events", {})
        equip = events.get("equipment", {})

        # Disease suspected: agent can't see SEIR counts but can infer from
        # behavioral signals (mortality spikes + feeding refusal + stress)
        disease = sim_state.get("disease", {})
        disease_suspected = (
            disease.get("active", False)
            and (
                fish["mortality_today"] > 5
                or fish["feeding_response"] in ("sluggish", "refusing")
                or fish["stress_level"] > 0.4
            )
        )

        return FarmObservation(
            done=done,
            reward=reward,
            metadata={"episode_id": self._state.episode_id, "step": self._state.step_count},
            # Fish
            avg_fish_weight=fish["weight_g"],
            population=fish["population"],
            mortality_today=fish["mortality_today"],
            cumulative_mortality=fish["cumulative_mortality"],
            survival_rate=fish["survival_rate"],
            stress_level=fish["stress_level"],
            feeding_response=fish["feeding_response"],
            biomass_kg=fish["biomass_kg"],
            growth_rate_g_day=fish["growth_rate_g_day"],
            fcr=fish.get("fcr", 0.0),
            sgr=fish.get("sgr", 0.0),
            stocking_density=fish.get("stocking_density", 0.0),
            # Water
            temperature=water["temperature"],
            dissolved_oxygen=water["DO"],
            ph=water["pH"],
            ammonia=water["TAN"],
            ammonia_toxic=water["UIA"],
            nitrite=water["NO2"],
            nitrate=water.get("NO3", 0.0),
            water_quality_score=water["water_quality_score"],
            algae_bloom=water.get("algae_bloom", False),
            nighttime_do_risk=water.get("nighttime_do_risk", 0.0),
            # Equipment
            aerator_working=equip.get("aerator", True),
            biofilter_working=equip.get("biofilter", True),
            heater_working=equip.get("heater", True),
            feed_remaining_kg=econ["feed_inventory_kg"],
            # Economics
            current_fish_value=econ["fish_value"],
            total_cost_so_far=econ["total_cost"],
            current_profit=econ["current_profit"],
            feed_price_per_kg=econ.get("feed_price_per_kg", 0.50),
            market_price_multiplier=econ.get("market_price_multiplier", 1.0),
            marginal_cost_per_hour=econ.get("marginal_cost_per_hour", 0.0),
            roi_pct=econ.get("roi_pct", 0.0),
            # Weather
            weather_forecast=weather["forecast"],
            is_daytime=weather.get("is_daytime", True),
            storm_active=weather.get("storm_active", False),
            humidity=weather.get("humidity", 75.0),
            # Context
            day_in_cycle=sim_state["time"]["day"],
            time_of_day=sim_state["time"]["hour"],
            day_of_year=sim_state["time"].get("day_of_year", 1),
            alerts=events.get("active_events", []),
            # Disease behavioral signal
            disease_suspected=disease_suspected,
            # Feedback
            feedback=feedback,
        )

    def _terminal_observation(self) -> FarmObservation:
        """Return observation for an already-completed episode.

        Uses the last stored simulator state, falling back to the
        simulator's current state when none is stored. The reward is the
        stored final score.
        """
        sim_state = self._state.sim_state or self._sim.get_state()
        return self._make_observation(
            sim_state,
            done=True,
            reward=self._state.final_score,
            feedback=f"Episode ended. Final score: {self._state.final_score:.3f}",
        )