Spaces:
Paused
Paused
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import Action, Observation, State | |
| import random | |
| from typing import Any, Dict, Optional | |
| from dataclasses import dataclass, field | |
| from uuid import uuid4 | |
| from overview_env.models import OverviewObservation, OverviewAction, AnalysisResult, TaskType | |
| from overview_env.tasks.definitions import ( | |
| get_all_tasks, | |
| get_task_by_id, | |
| OverviewTask, | |
| OverviewTaskEvaluator, | |
| GradingResult, | |
| ) | |
| class _EpisodeState: | |
| task: OverviewTask | |
| episode_id: str | |
| current_step: int = 0 | |
| cumulative_reward: float = 0.0 | |
| submitted_analysis: Dict[str, Any] = field(default_factory=dict) | |
| episode_complete: bool = False | |
# Multiplier applied to the grader score to obtain the reward of a
# non-terminal step, keyed by task difficulty. Difficulties missing from
# this table fall back to 0.1 at the lookup site in ``step()``.
DIFFICULTY_WEIGHTS = {
    "easy": 0.15,
    "medium": 0.12,
    "hard": 0.08,
}
class OverviewEnvironment(Environment):
    """Environment that serves one overview-analysis task per episode.

    ``reset()`` selects a task (a fixed one, or a random one from
    ``get_all_tasks()``) and returns its description. Each ``step()``
    validates the submitted action, grades its analysis with
    ``OverviewTaskEvaluator`` and returns a reward. The episode ends when the
    action sets ``submit`` or when the task's own ``max_steps`` is reached.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self, task_id: Optional[str] = None, seed: Optional[int] = None, max_steps: int = 10):
        """Create the environment.

        Args:
            task_id: Default task to serve on ``reset()``; ``None`` means a
                random task is chosen on each reset.
            seed: Optional seed applied to the ``random`` module.
            max_steps: Stored on the instance. NOTE(review): this value is
                not consulted anywhere in this class; ``step()`` enforces the
                task's own ``max_steps`` instead. Confirm which is intended.
        """
        # Let the framework base class initialise whatever state it owns.
        super().__init__()
        self._task_id = task_id
        self._seed = seed
        self._max_steps = max_steps
        self._ep: Optional[_EpisodeState] = None
        self._state = State(episode_id=str(uuid4()), step_count=0)
        if seed is not None:
            # NOTE(review): this seeds the module-level RNG, which is shared
            # by every environment instance in the process. Confirm this is
            # acceptable together with SUPPORTS_CONCURRENT_SESSIONS = True.
            random.seed(seed)

    def reset(self, seed: Optional[int] = None, episode_id: Optional[str] = None, task_id: Optional[str] = None, **kwargs: Any) -> Observation:
        """Start a new episode and return the initial observation.

        Args:
            seed: Optional seed applied to the ``random`` module before the
                task is chosen.
            episode_id: Identifier for the episode; a UUID string is
                generated when omitted.
            task_id: Task to serve; falls back to the ``task_id`` given to
                the constructor, then to a random task.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            An ``Observation`` with ``done=False``, ``reward=0.0`` and the
            dumped ``OverviewObservation`` as ``metadata``.

        Raises:
            ValueError: If the requested task id does not resolve to a task.
        """
        if seed is not None:
            random.seed(seed)

        target_task_id = task_id or self._task_id
        if target_task_id:
            task = get_task_by_id(target_task_id)
            # Presumably get_task_by_id may return None for unknown ids
            # (verify against its definition). Failing here keeps the
            # previous episode intact instead of crashing later with an
            # AttributeError while building the observation.
            if task is None:
                raise ValueError(f"Unknown task_id: {target_task_id!r}")
        else:
            task = random.choice(get_all_tasks())

        eid = episode_id or str(uuid4())
        self._ep = _EpisodeState(
            task=task,
            episode_id=eid,
            current_step=0,
            cumulative_reward=0.0,
            submitted_analysis={},
            episode_complete=False,
        )
        self._state = State(episode_id=eid, step_count=0)

        obs = self._build_observation()
        return Observation(done=False, reward=0.0, metadata=obs.model_dump())

    def step(self, action: Action, timeout_s: Optional[float] = None, **kwargs: Any) -> Observation:
        """Grade one submitted analysis and advance the episode.

        Reward scheme:
            * Non-terminal step: grader score times the difficulty weight
              from ``DIFFICULTY_WEIGHTS`` (0.1 for unknown difficulties).
            * Terminal step (``submit`` set, or the task's ``max_steps``
              reached): the raw grader score, plus a 0.1 bonus when the
              episode ends within half of the task's step budget, capped
              at 1.0.

        Args:
            action: The agent's action. It may carry its payload in a
                ``data`` dict, be a dict itself, or be any object whose
                attributes form the payload.
            timeout_s: Accepted for interface compatibility and ignored.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            An ``Observation`` whose ``metadata`` holds the task observation,
            reward details, the ``done`` flag and step info. If the action
            failed validation, an empty analysis is graded and the failure
            is reported under ``info["action_error"]``. When called before
            ``reset()`` or after the episode has finished, a ``done=True``
            observation carrying an ``error`` message is returned instead.
        """
        ep = self._ep
        if ep is None:
            return Observation(done=True, reward=0.0, metadata={"error": "Environment not reset. Call reset() first."})
        if ep.episode_complete:
            return Observation(done=True, reward=0.0, metadata={"error": "Episode already finished."})

        env_action, action_error = self._parse_action(action)

        ep.current_step += 1
        self._state.step_count = ep.current_step

        analysis_dict = env_action.analysis.model_dump()
        ep.submitted_analysis = analysis_dict

        evaluator = OverviewTaskEvaluator(ep.task)
        result = evaluator.grade(analysis_dict)
        reward_value = result.score * DIFFICULTY_WEIGHTS.get(ep.task.difficulty, 0.1)

        terminal = env_action.submit or ep.current_step >= ep.task.max_steps
        if terminal:
            final_score = result.score
            # Bonus for finishing within half of the task's step budget.
            if ep.current_step <= ep.task.max_steps * 0.5:
                final_score += 0.1
            reward_value = min(1.0, final_score)
            ep.episode_complete = True

        ep.cumulative_reward += reward_value

        info: Dict[str, Any] = {"step": ep.current_step}
        if action_error is not None:
            # Tell the caller that the graded analysis was the empty
            # fallback rather than what they actually sent.
            info["action_error"] = action_error

        obs = self._build_observation()
        step_result = {
            "observation": obs.model_dump(),
            "reward": {"value": reward_value, "total": result.score, "feedback": result.feedback},
            "done": ep.episode_complete,
            "info": info,
        }
        return Observation(done=ep.episode_complete, reward=reward_value, metadata=step_result)

    @property
    def state(self) -> State:
        """Return the current ``State`` (episode id and step count).

        Exposed as a property. NOTE(review): the OpenEnv ``Environment``
        interface is understood to declare ``state`` as a property; confirm
        against the installed framework version.
        """
        return self._state

    def _parse_action(self, action: Any) -> "tuple[OverviewAction, Optional[str]]":
        """Convert an incoming action into a validated ``OverviewAction``.

        Returns:
            ``(env_action, error)``. ``error`` is ``None`` on success;
            otherwise it is the validation error text and ``env_action`` is
            a fallback action wrapping an empty ``AnalysisResult``.
        """
        action_data: Any = {}
        if hasattr(action, "data") and isinstance(action.data, dict):
            action_data = action.data
        elif isinstance(action, dict):
            action_data = action
        elif hasattr(action, "__dict__"):
            action_data = vars(action)

        try:
            return OverviewAction.model_validate(action_data), None
        except Exception as exc:
            # Best-effort: a malformed action must not crash the episode, so
            # an empty analysis is graded instead and the error is returned
            # for reporting.
            return OverviewAction(analysis=AnalysisResult()), str(exc)

    def _build_observation(self) -> OverviewObservation:
        """Build the task observation for the current episode.

        Must only be called while an episode exists (``self._ep`` is set).
        """
        task = self._ep.task
        # NOTE(review): ``expected_output`` is part of the agent-visible
        # observation. Confirm that it does not reveal the graded answer.
        return OverviewObservation(
            task_id=task.task_id,
            task_type=TaskType(task.task_type),
            task_name=task.name,
            task_description=task.description,
            difficulty=task.difficulty,
            input_text=task.input_text,
            question=task.question,
            expected_output=task.expected_output,
            max_steps=task.max_steps,
        )