""" Baseline Inference Script for OpenEnv Uses OpenAI API to run a language model against the environment for reproducible baseline evaluation. Usage: export OPENAI_API_KEY=your_key python examples/baseline_inference.py --task_level medium --n_episodes 5 python examples/baseline_inference.py --all_tasks """ import argparse import json import os import sys from typing import Dict, Any, List from pathlib import Path import numpy as np import openai # Add project root to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from openenv import OpenEnv, EnvConfig from openenv.core.grader import create_grader def get_openai_action(observation, task_description: str) -> np.ndarray: """ Get action from OpenAI API based on current observation. Args: observation: Current observation (Observation object) task_description: Description of the task Returns: Action array [thrust, yaw, pitch, roll] """ # Check API key api_key = os.getenv('OPENAI_API_KEY') if not api_key: raise ValueError("OPENAI_API_KEY environment variable not set") client = openai.OpenAI(api_key=api_key) # Format observation for prompt obs_text = f""" Current State: - Position: {observation.position} - Velocity: {observation.velocity} - Target: {observation.target} - Nearest Obstacle: distance={observation.obstacles[0]:.2f}, angle={observation.obstacles[1]:.2f} - Time Remaining: {observation.time_remaining:.2f} Task: {task_description} You are controlling a drone. Output 4 values between -1 and 1 for [thrust, yaw, pitch, roll]. Thrust: vertical movement (-1=down, 1=up) Yaw: rotation (-1=left, 1=right) Pitch: forward/back (-1=back, 1=forward) Roll: lateral movement (-1=left, 1=right) """ try: response = client.chat.completions.create( model="gpt-4", messages=[ {"role": "system", "content": "You are an expert drone pilot. Output only 4 comma-separated numbers between -1 and 1."}, {"role": "user", "content": obs_text} ], max_tokens=50, temperature=0.1 ) # Parse response content = response.choices[0].message.content.strip() values = [float(x.strip()) for x in content.split(',') if x.strip()] if len(values) != 4: print(f"Warning: Expected 4 values, got {len(values)}. Using random action.") return np.random.uniform(-1, 1, 4) # Clip to valid range action = np.clip(values, -1, 1) return action except Exception as e: print(f"OpenAI API error: {e}. Using random action.") return np.random.uniform(-1, 1, 4) def load_config_from_yaml(yaml_path: str) -> Dict[str, Any]: """Load configuration from YAML file.""" try: import yaml with open(yaml_path, 'r') as f: return yaml.safe_load(f) except ImportError: print("Warning: PyYAML not installed. Using default configuration.") return get_default_config() except FileNotFoundError: print(f"Warning: {yaml_path} not found. 
Using default configuration.") return get_default_config() def get_default_config() -> Dict[str, Any]: """Get default configuration.""" return { 'tasks': { 'easy': { 'config': { 'episode_length': 300, 'boundary_limit': 80.0, 'max_velocity': 60.0, 'gravity': 5.0, 'friction': 0.02, 'obstacle_count': 0, 'wind_disturbance': False, 'sensor_noise': 0.0, }, 'grader': { 'success_threshold': 0.7, 'criteria': [ {'name': 'reached_target', 'weight': 0.6}, {'name': 'time_efficiency', 'weight': 0.2}, {'name': 'energy_efficiency', 'weight': 0.2}, ] } }, 'medium': { 'config': { 'episode_length': 500, 'boundary_limit': 60.0, 'max_velocity': 50.0, 'gravity': 7.0, 'friction': 0.03, 'obstacle_count': 5, 'wind_disturbance': False, 'sensor_noise': 0.05, }, 'grader': { 'success_threshold': 0.75, 'criteria': [ {'name': 'reached_target', 'weight': 0.5}, {'name': 'collision_avoidance', 'weight': 0.25}, {'name': 'time_efficiency', 'weight': 0.15}, {'name': 'energy_efficiency', 'weight': 0.1}, ] } }, 'hard': { 'config': { 'episode_length': 700, 'boundary_limit': 50.0, 'max_velocity': 40.0, 'gravity': 9.0, 'friction': 0.05, 'obstacle_count': 10, 'wind_disturbance': True, 'sensor_noise': 0.1, }, 'grader': { 'success_threshold': 0.8, 'criteria': [ {'name': 'reached_target', 'weight': 0.45}, {'name': 'collision_avoidance', 'weight': 0.25}, {'name': 'wind_compensation', 'weight': 0.15}, {'name': 'time_efficiency', 'weight': 0.1}, {'name': 'energy_efficiency', 'weight': 0.05}, ] } }, } } def run_episode( env: OpenEnv, grader, seed: int, task_description: str, render: bool = False, ) -> Dict[str, Any]: """ Run single episode and collect metrics. Args: env: Environment instance grader: Task grader instance seed: Random seed render: Whether to render Returns: Episode results dictionary """ # Reset environment and grader obs, info = env.reset(seed=seed) grader.reset() done = False total_reward = 0.0 steps = 0 prev_position = env.position.copy() optimal_distance = np.linalg.norm(env.target_position - env.position) grader.episode_data['optimal_distance'] = optimal_distance while not done: # Get action from OpenAI action = get_openai_action(obs, task_description) # Take step obs, reward, terminated, truncated, info = env.step(action) # Update grader with metrics current_position = env.position.copy() distance_delta = np.linalg.norm(current_position - prev_position) grader.update( steps=1, distance_traveled=distance_delta, energy_consumed=np.sum(np.abs(action)) * 0.5, ) # Check for collisions (if obstacles exist) if hasattr(env, 'check_collision') and env.check_collision(): grader.update(collisions=1) # Track wind deviation if env.config.wind_disturbance and hasattr(env, 'wind_deviation'): grader.update(max_wind_deviation=max( grader.episode_data['max_wind_deviation'], env.wind_deviation )) # Update position prev_position = current_position.copy() # Accumulate reward total_reward += reward steps += 1 # Render if requested if render: env.render() # Check termination done = terminated or truncated # Final updates to grader final_distance = np.linalg.norm(env.position - env.target_position) grader.update( target_reached=final_distance < getattr(env, 'target_radius', 5.0), final_distance_to_target=final_distance, time_to_complete=steps, ) # Get grade report grade_report = grader.get_grade_report() # Compile results results = { 'seed': seed, 'steps': steps, 'total_reward': total_reward, 'final_score': grade_report['final_score'], 'passed': grade_report['passed'], 'criteria_scores': grade_report['criteria_scores'], 'episode_data': 

def run_episode(
    env: OpenEnv,
    grader,
    seed: int,
    task_description: str,
    render: bool = False,
) -> Dict[str, Any]:
    """
    Run a single episode and collect metrics.

    Args:
        env: Environment instance
        grader: Task grader instance
        seed: Random seed
        task_description: Task description passed to the model prompt
        render: Whether to render

    Returns:
        Episode results dictionary
    """
    # Reset environment and grader
    obs, info = env.reset(seed=seed)
    grader.reset()

    done = False
    total_reward = 0.0
    steps = 0
    prev_position = env.position.copy()
    optimal_distance = np.linalg.norm(env.target_position - env.position)
    grader.episode_data['optimal_distance'] = optimal_distance

    while not done:
        # Get action from OpenAI
        action = get_openai_action(obs, task_description)

        # Take step
        obs, reward, terminated, truncated, info = env.step(action)

        # Update grader with metrics
        current_position = env.position.copy()
        distance_delta = np.linalg.norm(current_position - prev_position)
        grader.update(
            steps=1,
            distance_traveled=distance_delta,
            energy_consumed=np.sum(np.abs(action)) * 0.5,
        )

        # Check for collisions (if obstacles exist)
        if hasattr(env, 'check_collision') and env.check_collision():
            grader.update(collisions=1)

        # Track wind deviation
        if env.config.wind_disturbance and hasattr(env, 'wind_deviation'):
            grader.update(max_wind_deviation=max(
                grader.episode_data['max_wind_deviation'],
                env.wind_deviation,
            ))

        # Update position
        prev_position = current_position.copy()

        # Accumulate reward
        total_reward += reward
        steps += 1

        # Render if requested
        if render:
            env.render()

        # Check termination
        done = terminated or truncated

    # Final updates to grader
    final_distance = np.linalg.norm(env.position - env.target_position)
    grader.update(
        target_reached=final_distance < getattr(env, 'target_radius', 5.0),
        final_distance_to_target=final_distance,
        time_to_complete=steps,
    )

    # Get grade report
    grade_report = grader.get_grade_report()

    # Compile results
    results = {
        'seed': seed,
        'steps': steps,
        'total_reward': total_reward,
        'final_score': grade_report['final_score'],
        'passed': grade_report['passed'],
        'criteria_scores': grade_report['criteria_scores'],
        'episode_data': grade_report['episode_data'],
        'feedback': grade_report['feedback'],
    }

    return results


def evaluate_task(
    task_level: str,
    config: Dict[str, Any],
    n_episodes: int = 10,
    seed: int = 42,
    render: bool = False,
    verbose: bool = True,
) -> Dict[str, Any]:
    """
    Evaluate the agent on a specific task level.

    Args:
        task_level: Difficulty level
        config: Task configuration
        n_episodes: Number of episodes
        seed: Base random seed
        render: Render episodes
        verbose: Print progress

    Returns:
        Aggregated evaluation results
    """
    if verbose:
        print(f"\n{'='*60}")
        print(f"Evaluating {task_level.upper()} task")
        print(f"{'='*60}")
        print("Configuration:")
        for key, value in config['config'].items():
            print(f"  {key}: {value}")
        print("Grading criteria:")
        for criterion in config['grader']['criteria']:
            print(f"  - {criterion['name']}: {criterion['weight']*100:.0f}%")
        print(f"{'='*60}\n")

    # Create environment
    env_config = EnvConfig(
        **config['config'],
        task_level=task_level,
        verbose=False,
    )
    env = OpenEnv(config=env_config)

    # Create grader
    grader = create_grader(task_level, config['grader'])

    # Fall back to a generic description when the config omits one
    # (the default config has no 'description' field)
    task_description = config.get(
        'description', 'Navigate the drone to the target position.'
    )

    # Run episodes
    episode_results = []
    for ep in range(n_episodes):
        episode_seed = seed + ep
        result = run_episode(env, grader, episode_seed, task_description, render=render)
        episode_results.append(result)

        if verbose:
            status = "✓ PASSED" if result['passed'] else "✗ FAILED"
            print(f"Episode {ep+1}/{n_episodes} (seed={episode_seed}): "
                  f"Score={result['final_score']:.3f} {status}")

    env.close()

    # Aggregate results
    scores = [r['final_score'] for r in episode_results]
    rewards = [r['total_reward'] for r in episode_results]
    steps = [r['steps'] for r in episode_results]
    passed_count = sum(1 for r in episode_results if r['passed'])

    aggregated = {
        'task_level': task_level,
        'n_episodes': n_episodes,
        'base_seed': seed,
        'mean_score': float(np.mean(scores)),
        'std_score': float(np.std(scores)),
        'min_score': float(np.min(scores)),
        'max_score': float(np.max(scores)),
        'pass_rate': passed_count / n_episodes,
        'mean_reward': float(np.mean(rewards)),
        'std_reward': float(np.std(rewards)),
        'mean_steps': float(np.mean(steps)),
        'episode_results': episode_results,
    }

    if verbose:
        print(f"\n{'='*60}")
        print(f"Results Summary - {task_level.upper()}")
        print(f"{'='*60}")
        print(f"Mean Score:  {aggregated['mean_score']:.3f} ± {aggregated['std_score']:.3f}")
        print(f"Score Range: [{aggregated['min_score']:.3f}, {aggregated['max_score']:.3f}]")
        print(f"Pass Rate:   {aggregated['pass_rate']*100:.1f}% ({passed_count}/{n_episodes})")
        print(f"Mean Reward: {aggregated['mean_reward']:.2f} ± {aggregated['std_reward']:.2f}")
        print(f"Mean Steps:  {aggregated['mean_steps']:.1f}")
        print(f"{'='*60}\n")

    return aggregated
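
# Minimal sketch of a direct call, bypassing the CLI (assumes the bundled
# default config; OpenEnv and create_grader are the imports above):
#
#   cfg = get_default_config()['tasks']['medium']
#   summary = evaluate_task('medium', cfg, n_episodes=2, seed=0, verbose=True)
#   print(summary['mean_score'], summary['pass_rate'])
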

def main():
    """Main inference pipeline."""
    parser = argparse.ArgumentParser(description='Baseline Inference for OpenEnv')
    parser.add_argument('--task_level', type=str, default='medium',
                        choices=['easy', 'medium', 'hard'],
                        help='Task difficulty level')
    parser.add_argument('--all_tasks', action='store_true',
                        help='Evaluate on all difficulty levels')
    parser.add_argument('--n_episodes', type=int, default=10,
                        help='Number of evaluation episodes')
    parser.add_argument('--seed', type=int, default=42,
                        help='Base random seed')
    parser.add_argument('--config', type=str, default='openenv.yaml',
                        help='Path to configuration file')
    parser.add_argument('--render', action='store_true',
                        help='Render episodes')
    parser.add_argument('--output', type=str, default='results.json',
                        help='Output file for results')
    parser.add_argument('--quiet', action='store_true',
                        help='Suppress verbose output')
    args = parser.parse_args()

    print("=" * 60)
    print("OpenEnv Baseline Inference")
    print("=" * 60)

    # Load configuration
    yaml_config = load_config_from_yaml(args.config)

    # Determine which tasks to evaluate
    if args.all_tasks:
        task_levels = ['easy', 'medium', 'hard']
    else:
        task_levels = [args.task_level]

    all_results = {}

    # Evaluate each task level
    for task_level in task_levels:
        task_config = yaml_config['tasks'][task_level]
        results = evaluate_task(
            task_level=task_level,
            config=task_config,
            n_episodes=args.n_episodes,
            seed=args.seed,
            render=args.render,
            verbose=not args.quiet,
        )
        all_results[task_level] = results

    # Save results
    output_dir = os.path.dirname(args.output)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)

    with open(args.output, 'w') as f:
        json.dump(all_results, f, indent=2)

    print(f"\nResults saved to {args.output}")

    # Print overall summary
    if len(task_levels) > 1:
        print("\n" + "=" * 60)
        print("Overall Performance Summary")
        print("=" * 60)
        for task_level in task_levels:
            results = all_results[task_level]
            print(f"{task_level.upper():10s}: Score={results['mean_score']:.3f} ± "
                  f"{results['std_score']:.3f}, Pass Rate={results['pass_rate']*100:.1f}%")
        print("=" * 60)

    return all_results


if __name__ == "__main__":
    main()
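
# Shape of the saved results file (top-level keys are task levels; inner keys
# come from the 'aggregated' dict in evaluate_task(); values here are
# illustrative placeholders, not real measurements):
#
#   {
#     "medium": {
#       "task_level": "medium",
#       "n_episodes": 10,
#       "base_seed": 42,
#       "mean_score": 0.0,
#       "pass_rate": 0.0,
#       ...
#       "episode_results": [ ... per-episode dicts from run_episode() ... ]
#     }
#   }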