Spaces:
Sleeping
Sleeping
| import logging | |
| from uuid import uuid4 | |
| from collections import deque | |
| from typing import Dict, Any, List | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| try: | |
| from .models import AutomathreasonerAction, AutomathreasonerObservation | |
| from .generator import TaskGenerationEngine | |
| from .verifier import VerifierSystem | |
| from .rewards import RewardSystem | |
| except ImportError: | |
| from env.models import AutomathreasonerAction, AutomathreasonerObservation | |
| from env.generator import TaskGenerationEngine | |
| from env.verifier import VerifierSystem | |
| from env.rewards import RewardSystem | |
| # Module-level logger, named after this module via __name__. | |
| logger = logging.getLogger(__name__) | |
class AutomathreasonerEnvironment(Environment):
    """Multi-attempt math-reasoning environment with an accuracy-driven curriculum.

    One episode serves one generated problem. The agent may submit up to
    ``max_steps`` attempts; the episode ends on the first attempt whose
    correctness score equals ``1.0`` or once the attempts are used up.
    Every finished episode records a 1 (solved) or 0 (not solved) in a
    bounded rolling window, and ``reset`` uses the accuracy over that window
    to move the difficulty requested from the task generator up or down.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Curriculum tuning. Subclasses may override these without touching the
    # update logic. The gap between the two thresholds is a dead zone in
    # which the difficulty is left unchanged.
    MIN_RESULTS_FOR_UPDATE: int = 5   # finished episodes needed before adapting
    PROMOTE_ABOVE: float = 0.7        # accuracy strictly above this raises difficulty
    DEMOTE_BELOW: float = 0.6         # accuracy strictly below this lowers difficulty
    DIFFICULTY_STEP: float = 0.5      # amount added/subtracted per adjustment
    MIN_DIFFICULTY: float = 1.0       # difficulty is never lowered below this

    def __init__(
        self,
        *,
        initial_difficulty: float = 2.0,
        max_steps: int = 3,
        results_window: int = 20,
    ) -> None:
        """Create the environment and its generator, verifier and reward helpers.

        All parameters are keyword-only and default to the values that were
        previously hard-coded, so ``AutomathreasonerEnvironment()`` behaves
        as before.

        Args:
            initial_difficulty: Difficulty band requested for the first task.
            max_steps: Maximum number of attempts allowed per problem.
            results_window: How many finished episodes the rolling accuracy
                window remembers.
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)

        self.generator = TaskGenerationEngine()
        self.verifier = VerifierSystem()
        self.reward_system = RewardSystem(max_len=2000)

        # Curriculum tracking: one entry per finished episode,
        # 1 for solved and 0 for not solved.
        self.difficulty_level = initial_difficulty
        self.rolling_results = deque(maxlen=results_window)

        # Current problem state (populated by reset()).
        self.current_problem = ""
        self.current_solution = ""
        self.current_sympy_f = None  # integration ground truth, when the task provides one
        self.times_seen_problem = 0
        self.history: List[Dict[str, Any]] = []
        self.max_steps = max_steps

    def _update_curriculum(self) -> None:
        """Adjust ``difficulty_level`` from the rolling accuracy window.

        Does nothing until at least ``MIN_RESULTS_FOR_UPDATE`` episodes have
        been recorded. Otherwise the difficulty is raised by
        ``DIFFICULTY_STEP`` when accuracy exceeds ``PROMOTE_ABOVE``, lowered
        by the same step (but not below ``MIN_DIFFICULTY``) when accuracy is
        under ``DEMOTE_BELOW``, and left alone in between. The outcome is
        logged whenever the window is large enough, changed or not.

        NOTE(review): this runs on every ``reset()``, while results are only
        appended when an episode finishes in ``step()``. Two resets without a
        finished episode in between therefore apply the same evidence twice.
        Confirm whether that is intended.
        """
        results = self.rolling_results
        if len(results) < self.MIN_RESULTS_FOR_UPDATE:
            return

        accuracy = sum(results) / len(results)
        if accuracy > self.PROMOTE_ABOVE:
            self.difficulty_level += self.DIFFICULTY_STEP
        elif accuracy < self.DEMOTE_BELOW:
            self.difficulty_level = max(
                self.MIN_DIFFICULTY,
                self.difficulty_level - self.DIFFICULTY_STEP,
            )

        # Lazy %-style arguments: the message is only rendered when INFO is
        # enabled. The rendered text matches the previous f-string.
        logger.info(
            "Curriculum Updated: Accuracy=%.2f, New Difficulty=%s",
            accuracy,
            self.difficulty_level,
        )

    def reset(self) -> AutomathreasonerObservation:
        """Start a new episode with a freshly generated problem.

        The curriculum is updated first, so the new task is requested at the
        adjusted difficulty. The episode id, step counter, attempt history
        and per-problem attempt counter are all reset.

        Returns:
            The initial observation: problem text, the difficulty band that
            was requested, an empty history, zero reward and ``done=False``.
        """
        self._update_curriculum()
        self._state = State(episode_id=str(uuid4()), step_count=0)

        task = self.generator.generate_task(
            target_difficulty_band=self.difficulty_level
        )
        self.current_problem = task['problem']
        self.current_solution = task['solution']
        # 'sympy_f' is optional in the task dict; None when absent.
        self.current_sympy_f = task.get('sympy_f')

        self.times_seen_problem = 0
        self.history = []

        # The observation exposes the requested difficulty band, not any
        # difficulty score the generator may compute for the task itself.
        return AutomathreasonerObservation(
            problem_text=self.current_problem,
            difficulty_level=self.difficulty_level,
            history=[],
            reward=0.0,
            done=False,
        )

    def step(self, action: AutomathreasonerAction) -> AutomathreasonerObservation:  # type: ignore[override]
        """Score one attempt at the current problem.

        The attempt is verified against the stored solution, a reward is
        computed from the verifier's four scores, and the attempt is added
        to the episode history. The episode is done when the correctness
        score equals ``1.0`` or the step count reaches ``max_steps``; on
        completion the outcome is recorded in the rolling results window.

        Args:
            action: The agent's attempt; its ``reasoning`` and
                ``final_answer`` fields are read.

        Returns:
            An observation with the problem text, difficulty band, the last
            three attempts, the total reward, the done flag, and metadata
            containing the reward components, the correctness flag, and the
            ground truth (the literal ``"HIDDEN"`` until the episode is done).
        """
        self._state.step_count += 1

        # Verification
        c, q, p_sup, r_ref = self.verifier.verify(
            action.reasoning,
            action.final_answer,
            self.current_solution,
            sympy_f=self.current_sympy_f,
        )

        # Reward. The history and attempt counter passed here describe the
        # attempts made *before* this one; both are updated afterwards.
        action_str = f"{action.reasoning} \n {action.final_answer}"
        total_r, components = self.reward_system.compute_reward(
            correctness=c,
            reasoning_quality=q,
            process_supervision=p_sup,
            reflection_score=r_ref,
            action_str=action_str,
            final_answer=action.final_answer,
            history=self.history,
            times_seen_problem=self.times_seen_problem,
        )
        self.times_seen_problem += 1

        # Update history
        self.history.append(
            {
                "prediction": action.final_answer,
                "correctness": c,
            }
        )
        # Keep only last 3 attempts for observation
        obs_history = self.history[-3:]

        is_correct = (c == 1.0)
        done = is_correct or self._state.step_count >= self.max_steps
        if done:
            self.rolling_results.append(1 if is_correct else 0)

        return AutomathreasonerObservation(
            problem_text=self.current_problem,
            difficulty_level=self.difficulty_level,
            history=obs_history,
            reward=total_r,
            done=done,
            metadata={
                "reward_components": components,
                # Only reveal the solution once the episode is over.
                "ground_truth": self.current_solution if done else "HIDDEN",
                "is_correct": is_correct,
            },
        )

    def state(self) -> State:
        """Return the current episode state (episode id and step count).

        NOTE(review): the OpenEnv ``Environment`` base class is believed to
        declare ``state`` as a property, whereas this override is a plain
        method that must be called as ``env.state()``. Kept as a method to
        preserve the existing call shape; confirm against the installed
        openenv version and decorate with ``@property`` if the framework
        reads ``env.state`` as an attribute.
        """
        return self._state