"""Environment exposing small pandas data-cleaning tasks through reset/step."""

from uuid import uuid4
from typing import Any, Optional

import pandas as pd
import numpy as np

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

try:
    from ..models import DataCleanAction, DataCleanObservation
except ImportError:
    # Fallback used when the module is not imported as part of its package.
    from models import DataCleanAction, DataCleanObservation


class DataCleanState(State):
    """Episode state extended with JSON snapshots of the working data."""

    # JSON dump of the frame currently being cleaned (refreshed every step).
    current_df_json: str
    # Name of the task selected in ``reset``.
    task_name: str
    # JSON dump of the frame the grader compares against.
    target_df_json: str


class DataCleanEnvironment(Environment):
    """Environment in which an agent cleans a small DataFrame step by step.

    Each episode starts from a task-specific frame and a matching target
    frame.  The agent applies cleaning actions (``fill_na``, ``drop_na``,
    ``drop_column``, ``rename_column``, ``change_type``) and finishes with
    ``submit``, which grades the working frame against the target.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        self._state = DataCleanState(
            episode_id=str(uuid4()),
            step_count=0,
            current_df_json="",
            task_name="",
            target_df_json="",
        )
        self._df: pd.DataFrame = pd.DataFrame()
        self._target_df: pd.DataFrame = pd.DataFrame()

    def _columns_match_target(self) -> bool:
        """Return True when column names and their order equal the target's."""
        return list(self._df.columns) == list(self._target_df.columns)

    def _series_matches_target(self, column_name: str) -> bool:
        """Return True when ``column_name`` exists in both frames and is equal.

        Indexes are reset before comparing so that only values (and dtype,
        as checked by ``Series.equals``) matter, not row labels.
        """
        if (
            column_name not in self._df.columns
            or column_name not in self._target_df.columns
        ):
            return False
        left = self._df[column_name].reset_index(drop=True)
        right = self._target_df[column_name].reset_index(drop=True)
        return left.equals(right)

    def _numeric_series_matches_target(self, column_name: str) -> bool:
        """Like ``_series_matches_target`` but compares after ``pd.to_numeric``.

        Returns False when the column is missing from either frame or when
        either side cannot be converted to numbers.
        """
        if (
            column_name not in self._df.columns
            or column_name not in self._target_df.columns
        ):
            return False
        try:
            left = pd.to_numeric(self._df[column_name]).reset_index(drop=True)
            right = pd.to_numeric(self._target_df[column_name]).reset_index(drop=True)
        except Exception:
            # Any conversion failure simply means "does not match".
            return False
        return left.equals(right)

    def _get_obs(
        self,
        feedback: Optional[str] = None,
        error: Optional[str] = None,
        done: bool = False,
        reward: float = 0.0,
    ) -> DataCleanObservation:
        """Build an observation describing the current working frame.

        The schema, per-column missing-value counts and the head of the frame
        are all rendered as strings.
        """
        schema = str(self._df.dtypes.to_dict())
        missing = str(self._df.isna().sum().to_dict())
        head = self._df.head().to_string()
        return DataCleanObservation(
            df_schema=schema,
            missing_values=missing,
            head=head,
            last_error=error,
            feedback=feedback,
            done=done,
            reward=reward,
        )

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task: str = "easy_clean",
        **kwargs: Any,
    ) -> DataCleanObservation:
        """Start a new episode for ``task`` and return the first observation.

        Args:
            seed: Accepted for interface compatibility; the task data is
                fixed, so it is not used.
            episode_id: Identifier to use for the episode.  A fresh UUID is
                generated when it is not supplied.
            task: One of ``"easy_clean"``, ``"medium_clean"`` or
                ``"hard_clean"``.  Any other value loads a trivial frame
                that already equals its target.
            **kwargs: Ignored.
        """
        self._state = DataCleanState(
            # Honour a caller-supplied id instead of always overwriting it.
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            current_df_json="",
            task_name=task,
            target_df_json="",
        )

        if task == "easy_clean":
            self._df = pd.DataFrame({"id": [1, 2, 3], "age": [25.0, np.nan, 30.0]})
            self._target_df = pd.DataFrame(
                {"id": [1, 2, 3], "age": [25.0, 0.0, 30.0]}
            )
        elif task == "medium_clean":
            self._df = pd.DataFrame({
                "name": ["Alice", "Bob", "Charlie", None],
                "age": [25.0, np.nan, 30.0, 22.0],
                "ignore_me": [1, 2, 3, 4],
            })
            # The target is the frame with every incomplete row removed.
            self._target_df = pd.DataFrame({
                "name": ["Alice", "Bob", "Charlie"],
                "age": [25.0, np.nan, 30.0],
            }).dropna(subset=["name", "age"])
            self._target_df = self._target_df.reset_index(drop=True)
        elif task == "hard_clean":
            self._df = pd.DataFrame({
                "EmployeeID": ["E1", "E2", "E3"],
                "Dept": ["IT", "HR", "IT"],
                "Salary": ["5000", np.nan, "6000"],
                "JoinDate": [np.nan, "2020-01-01", "2021-01-01"],
            })
            self._target_df = pd.DataFrame({
                "emp_id": ["E1", "E2", "E3"],
                "Salary": [5000.0, 0.0, 6000.0],
                "JoinDate": ["2000-01-01", "2020-01-01", "2021-01-01"],
            })
        else:
            self._df = pd.DataFrame({"col": [1, 2]})
            self._target_df = pd.DataFrame({"col": [1, 2]})

        self._state.current_df_json = self._df.to_json()
        self._state.target_df_json = self._target_df.to_json()

        return self._get_obs(feedback=f"Started task {task}.")

    def step(self, action: DataCleanAction) -> DataCleanObservation:  # type: ignore[override]
        """Apply one cleaning action and return the resulting observation.

        ``submit`` ends the episode and uses the grader's score as reward.
        Every other successful action earns 0.1; any failure (bad arguments,
        unknown column, unknown action type, pandas error) is reported in
        the observation's ``last_error`` with a reward of -0.05 and leaves
        the episode running.
        """
        self._state.step_count += 1
        reward = 0.0
        error = None
        feedback = None
        done = False

        if action.action_type == "submit":
            done = True
            score = self._grade()
            reward = score  # Final reward based on grader
            feedback = f"Submitted. Final score: {score}"
            return self._get_obs(feedback=feedback, done=done, reward=reward)

        col = action.column_name
        val = action.value

        try:
            if col and col not in self._df.columns:
                raise ValueError(f"Column '{col}' not found.")

            if action.action_type == "fill_na":
                if not col or val is None:
                    raise ValueError("fill_na requires column_name and value.")
                # Basic inference of type.  Only strings are parsed; a value
                # that already arrives as a number is used as-is (the
                # substring test below would raise TypeError on it).
                if isinstance(val, str):
                    try:
                        typed_val = float(val) if "." in val else int(val)
                    except ValueError:
                        typed_val = val
                else:
                    typed_val = val
                self._df[col] = self._df[col].fillna(typed_val)
                feedback = f"Filled NaNs in {col} with {val}."
                reward = 0.1

            elif action.action_type == "drop_na":
                if not col:
                    raise ValueError("drop_na requires column_name.")
                self._df = self._df.dropna(subset=[col])
                self._df = self._df.reset_index(drop=True)
                feedback = f"Dropped rows with NaNs in {col}."
                reward = 0.1

            elif action.action_type == "drop_column":
                if not col:
                    raise ValueError("drop_column requires column_name.")
                self._df = self._df.drop(columns=[col])
                feedback = f"Dropped column {col}."
                reward = 0.1

            elif action.action_type == "rename_column":
                if not col or not val:
                    raise ValueError("rename_column requires column_name and value.")
                # Duplicate column labels would make the to_json() snapshot
                # below raise outside this try block, so refuse them here.
                if val != col and val in self._df.columns:
                    raise ValueError(f"Column '{val}' already exists.")
                self._df = self._df.rename(columns={col: val})
                feedback = f"Renamed column {col} to {val}."
                reward = 0.1

            elif action.action_type == "change_type":
                if not col or not val:
                    raise ValueError("change_type requires column_name and value.")
                if val == "int":
                    self._df[col] = self._df[col].astype(int)
                elif val == "float":
                    self._df[col] = self._df[col].astype(float)
                elif val == "str":
                    self._df[col] = self._df[col].astype(str)
                else:
                    raise ValueError("Type must be 'int', 'float', or 'str'.")
                feedback = f"Changed type of {col} to {val}."
                reward = 0.1

            else:
                # Report unsupported actions instead of silently ignoring them.
                raise ValueError(f"Unknown action_type '{action.action_type}'.")

        except Exception as e:
            # Failures are surfaced to the agent through the observation
            # rather than propagated to the caller.
            error = str(e)
            reward = -0.05

        self._state.current_df_json = self._df.to_json()
        return self._get_obs(feedback=feedback, error=error, done=done, reward=reward)

    def _grade(self) -> float:
        """Score the working frame against the target for the current task.

        Each task defines a list of equally weighted pass/fail checks; the
        score is the fraction passed, clamped to the range [0.01, 0.99].
        Tasks without checks score 0.0 before clamping.
        """
        task = self._state.task_name

        if task == "easy_clean":
            checks = [
                self._columns_match_target(),
                "age" in self._df.columns
                and bool(self._df["age"].isna().sum() == 0),
                self._series_matches_target("age"),
            ]
        elif task == "medium_clean":
            checks = [
                self._columns_match_target(),
                len(self._df) == len(self._target_df),
                self._series_matches_target("name"),
                self._numeric_series_matches_target("age"),
            ]
        elif task == "hard_clean":
            checks = [
                self._columns_match_target(),
                self._series_matches_target("emp_id"),
                self._numeric_series_matches_target("Salary"),
                self._series_matches_target("JoinDate"),
            ]
        else:
            checks = []

        score = sum(1.0 for passed in checks if passed) / len(checks) if checks else 0.0
        return max(0.01, min(0.99, float(score)))

    @property
    def state(self) -> DataCleanState:
        """Current episode state, including the latest frame snapshots."""
        return self._state