# data_clean_env/server/data_clean_env_environment.py
# Uploaded by anugrah55 ("Upload folder using huggingface_hub"), commit 7b49766 (verified).
from uuid import uuid4
from typing import Any, Optional
import pandas as pd
import numpy as np
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import DataCleanAction, DataCleanObservation
except ImportError:
from models import DataCleanAction, DataCleanObservation
class DataCleanState(State):
    """Episode state for the data-cleaning environment.

    Adds three string fields to the base ``State``; the environment fills
    them in on reset and refreshes the working-frame snapshot after each
    non-submit step.
    """

    # JSON dump (``DataFrame.to_json``) of the frame currently being cleaned.
    current_df_json: str
    # Name of the task selected at reset time (e.g. "easy_clean").
    task_name: str
    # JSON dump (``DataFrame.to_json``) of the frame the grader compares against.
    target_df_json: str
class DataCleanEnvironment(Environment):
    """Environment in which an agent cleans a small pandas DataFrame.

    ``reset`` loads one of the built-in tasks ("easy_clean", "medium_clean",
    "hard_clean"; any other name loads a trivial two-row frame). ``step``
    applies one cleaning action to the working frame, and the "submit"
    action ends the episode with a score computed by ``_grade``.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        """Create an environment with empty frames and a fresh episode id."""
        self._state = DataCleanState(
            episode_id=str(uuid4()),
            step_count=0,
            current_df_json="",
            task_name="",
            target_df_json="",
        )
        self._df: pd.DataFrame = pd.DataFrame()
        self._target_df: pd.DataFrame = pd.DataFrame()

    # ------------------------------------------------------------------
    # Comparison helpers used by the grader
    # ------------------------------------------------------------------
    def _columns_match_target(self) -> bool:
        """Return True when the working frame has the target's columns, in order."""
        return list(self._df.columns) == list(self._target_df.columns)

    def _series_matches_target(self, column_name: str) -> bool:
        """Return True when ``column_name`` exists in both frames and is equal.

        Indexes are reset first so that only the values are compared
        (via ``Series.equals``).
        """
        if column_name not in self._df.columns or column_name not in self._target_df.columns:
            return False
        left = self._df[column_name].reset_index(drop=True)
        right = self._target_df[column_name].reset_index(drop=True)
        return left.equals(right)

    def _numeric_series_matches_target(self, column_name: str) -> bool:
        """Like ``_series_matches_target`` but compares after ``pd.to_numeric``.

        Returns False when the column is missing from either frame or when
        either side cannot be converted to numbers.
        """
        if column_name not in self._df.columns or column_name not in self._target_df.columns:
            return False
        try:
            left = pd.to_numeric(self._df[column_name]).reset_index(drop=True)
            right = pd.to_numeric(self._target_df[column_name]).reset_index(drop=True)
        except Exception:
            # Any conversion failure simply means "does not match".
            return False
        return left.equals(right)

    @staticmethod
    def _coerce_fill_value(raw: Any) -> Any:
        """Turn a fill value supplied by the agent into int/float when possible.

        Non-string values are returned unchanged. Strings are parsed as int
        first, then as float; text that is not a finite number (including
        "nan" and "inf") is returned as-is.
        """
        if not isinstance(raw, str):
            return raw
        try:
            return int(raw)
        except ValueError:
            pass
        try:
            number = float(raw)
        except ValueError:
            return raw
        # "nan"/"inf" parse as floats but are not useful fill values; keep the text.
        return number if np.isfinite(number) else raw

    # ------------------------------------------------------------------
    # Observation construction
    # ------------------------------------------------------------------
    def _get_obs(
        self,
        feedback: Optional[str] = None,
        error: Optional[str] = None,
        done: bool = False,
        reward: float = 0.0,
    ) -> DataCleanObservation:
        """Build an observation describing the current working frame.

        The schema (dtypes), per-column missing-value counts and the first
        rows are all rendered as strings.
        """
        schema = str(self._df.dtypes.to_dict())
        missing = str(self._df.isna().sum().to_dict())
        head = self._df.head().to_string()
        return DataCleanObservation(
            df_schema=schema,
            missing_values=missing,
            head=head,
            last_error=error,
            feedback=feedback,
            done=done,
            reward=reward,
        )

    # ------------------------------------------------------------------
    # Environment API
    # ------------------------------------------------------------------
    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task: str = "easy_clean",
        **kwargs: Any,
    ) -> DataCleanObservation:
        """Start a new episode for ``task`` and return the first observation.

        Args:
            seed: Accepted for interface compatibility; the tasks are fixed
                fixtures, so it is not used.
            episode_id: Identifier to use for the episode. A random uuid4 is
                generated when it is not provided.
            task: Task name. Unknown names load a trivial frame whose target
                equals the starting frame.
            **kwargs: Ignored.
        """
        # Honour a caller-supplied episode id instead of always overwriting it.
        self._state = DataCleanState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            current_df_json="",
            task_name=task,
            target_df_json="",
        )
        if task == "easy_clean":
            self._df = pd.DataFrame({"id": [1, 2, 3], "age": [25.0, np.nan, 30.0]})
            self._target_df = pd.DataFrame({"id": [1, 2, 3], "age": [25.0, 0.0, 30.0]})
        elif task == "medium_clean":
            self._df = pd.DataFrame({
                "name": ["Alice", "Bob", "Charlie", None],
                "age": [25.0, np.nan, 30.0, 22.0],
                "ignore_me": [1, 2, 3, 4],
            })
            self._target_df = pd.DataFrame({
                "name": ["Alice", "Bob", "Charlie"],
                "age": [25.0, np.nan, 30.0],
            }).dropna(subset=["name", "age"])
            self._target_df = self._target_df.reset_index(drop=True)
        elif task == "hard_clean":
            self._df = pd.DataFrame({
                "EmployeeID": ["E1", "E2", "E3"],
                "Dept": ["IT", "HR", "IT"],
                "Salary": ["5000", np.nan, "6000"],
                "JoinDate": [np.nan, "2020-01-01", "2021-01-01"],
            })
            self._target_df = pd.DataFrame({
                "emp_id": ["E1", "E2", "E3"],
                "Salary": [5000.0, 0.0, 6000.0],
                "JoinDate": ["2000-01-01", "2020-01-01", "2021-01-01"],
            })
        else:
            self._df = pd.DataFrame({"col": [1, 2]})
            self._target_df = pd.DataFrame({"col": [1, 2]})
        self._state.current_df_json = self._df.to_json()
        self._state.target_df_json = self._target_df.to_json()
        return self._get_obs(feedback=f"Started task {task}.")

    def step(self, action: DataCleanAction) -> DataCleanObservation:  # type: ignore[override]
        """Apply one action to the working frame and return the observation.

        "submit" grades the frame and ends the episode with the score as the
        reward. Every other successful action earns 0.1; an invalid action
        (missing arguments, unknown column, unknown action type, failed
        conversion, ...) leaves its message in ``last_error`` and earns -0.05.
        """
        self._state.step_count += 1

        if action.action_type == "submit":
            score = self._grade()
            return self._get_obs(
                feedback=f"Submitted. Final score: {score}",
                done=True,
                reward=score,  # Final reward based on grader
            )

        col = action.column_name
        val = action.value
        reward = 0.0
        error = None
        feedback = None
        try:
            if col and col not in self._df.columns:
                raise ValueError(f"Column '{col}' not found.")

            if action.action_type == "fill_na":
                if not col or val is None:
                    raise ValueError("fill_na requires column_name and value.")
                self._df[col] = self._df[col].fillna(self._coerce_fill_value(val))
                feedback = f"Filled NaNs in {col} with {val}."
            elif action.action_type == "drop_na":
                if not col:
                    raise ValueError("drop_na requires column_name.")
                self._df = self._df.dropna(subset=[col]).reset_index(drop=True)
                feedback = f"Dropped rows with NaNs in {col}."
            elif action.action_type == "drop_column":
                if not col:
                    raise ValueError("drop_column requires column_name.")
                self._df = self._df.drop(columns=[col])
                feedback = f"Dropped column {col}."
            elif action.action_type == "rename_column":
                if not col or not val:
                    raise ValueError("rename_column requires column_name and value.")
                # Duplicate labels would make the to_json() snapshot below raise
                # and break later column lookups, so refuse the rename up front.
                if val != col and val in self._df.columns:
                    raise ValueError(f"Column '{val}' already exists.")
                self._df = self._df.rename(columns={col: val})
                feedback = f"Renamed column {col} to {val}."
            elif action.action_type == "change_type":
                if not col or not val:
                    raise ValueError("change_type requires column_name and value.")
                if val == "int":
                    self._df[col] = self._df[col].astype(int)
                elif val == "float":
                    self._df[col] = self._df[col].astype(float)
                elif val == "str":
                    self._df[col] = self._df[col].astype(str)
                else:
                    raise ValueError("Type must be 'int', 'float', or 'str'.")
                feedback = f"Changed type of {col} to {val}."
            else:
                # Previously an unknown action was accepted silently with no feedback.
                raise ValueError(f"Unknown action_type '{action.action_type}'.")
            reward = 0.1
        except Exception as e:
            # Deliberately broad: any failure of an agent-chosen action is
            # reported back to the agent rather than crashing the episode.
            error = str(e)
            reward = -0.05

        self._state.current_df_json = self._df.to_json()
        return self._get_obs(feedback=feedback, error=error, done=False, reward=reward)

    def _grade(self) -> float:
        """Score the working frame against the target for the current task.

        Each task has a list of equally weighted checks; the score is the
        fraction that pass, clamped to the range [0.01, 0.99]. A task name
        without a checklist scores the minimum.
        """
        task = self._state.task_name
        if task == "easy_clean":
            checks = [
                self._columns_match_target(),
                "age" in self._df.columns and self._df["age"].isna().sum() == 0,
                self._series_matches_target("age"),
            ]
        elif task == "medium_clean":
            checks = [
                self._columns_match_target(),
                len(self._df) == len(self._target_df),
                self._series_matches_target("name"),
                self._numeric_series_matches_target("age"),
            ]
        elif task == "hard_clean":
            checks = [
                self._columns_match_target(),
                self._series_matches_target("emp_id"),
                self._numeric_series_matches_target("Salary"),
                self._series_matches_target("JoinDate"),
            ]
        else:
            checks = []

        score = sum(bool(passed) for passed in checks) / len(checks) if checks else 0.0
        return max(0.01, min(0.99, float(score)))

    @property
    def state(self) -> DataCleanState:
        """Current episode state."""
        return self._state