"""
Baseline Inference Script for OpenEnv
Uses the OpenAI API to run a language model against the environment for reproducible baseline evaluation.
Usage:
export OPENAI_API_KEY=your_key
python examples/baseline_inference.py --task_level medium --n_episodes 5
python examples/baseline_inference.py --all_tasks
"""
import argparse
import json
import os
import sys
from typing import Dict, Any
import numpy as np
import openai
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from openenv import OpenEnv, EnvConfig
from openenv.core.grader import create_grader
def get_openai_action(observation, task_description: str) -> np.ndarray:
"""
Get action from OpenAI API based on current observation.
Args:
observation: Current observation (Observation object)
task_description: Description of the task
Returns:
Action array [thrust, yaw, pitch, roll]
"""
# Check API key
api_key = os.getenv('OPENAI_API_KEY')
if not api_key:
raise ValueError("OPENAI_API_KEY environment variable not set")
client = openai.OpenAI(api_key=api_key)
    # Format observation for the prompt. The easy task has obstacle_count=0,
    # so guard against an empty obstacles array before indexing into it.
    if getattr(observation, 'obstacles', None) is not None and len(observation.obstacles) >= 2:
        obstacle_text = f"distance={observation.obstacles[0]:.2f}, angle={observation.obstacles[1]:.2f}"
    else:
        obstacle_text = "none detected"
    obs_text = f"""
Current State:
- Position: {observation.position}
- Velocity: {observation.velocity}
- Target: {observation.target}
- Nearest Obstacle: {obstacle_text}
- Time Remaining: {observation.time_remaining:.2f}
Task: {task_description}
You are controlling a drone. Output 4 values between -1 and 1 for [thrust, yaw, pitch, roll].
Thrust: vertical movement (-1=down, 1=up)
Yaw: rotation (-1=left, 1=right)
Pitch: forward/back (-1=back, 1=forward)
Roll: lateral movement (-1=left, 1=right)
"""
try:
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are an expert drone pilot. Output only 4 comma-separated numbers between -1 and 1."},
{"role": "user", "content": obs_text}
],
max_tokens=50,
temperature=0.1
)
        # Parse response, tolerating surrounding brackets the model may add
        # despite the system prompt (e.g. "[0.5, 0.0, 0.2, -0.1]")
        content = response.choices[0].message.content.strip()
        values = [float(x.strip()) for x in content.strip('[]()').split(',') if x.strip()]
if len(values) != 4:
print(f"Warning: Expected 4 values, got {len(values)}. Using random action.")
return np.random.uniform(-1, 1, 4)
# Clip to valid range
action = np.clip(values, -1, 1)
return action
except Exception as e:
print(f"OpenAI API error: {e}. Using random action.")
return np.random.uniform(-1, 1, 4)
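# Illustrative usage (assumes OPENAI_API_KEY is set and `env` is a constructed
# OpenEnv instance):
#
#   obs, _ = env.reset(seed=0)
#   action = get_openai_action(obs, "Fly the drone to the target.")
#   obs, reward, terminated, truncated, info = env.step(action)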
def load_config_from_yaml(yaml_path: str) -> Dict[str, Any]:
"""Load configuration from YAML file."""
try:
import yaml
with open(yaml_path, 'r') as f:
return yaml.safe_load(f)
except ImportError:
print("Warning: PyYAML not installed. Using default configuration.")
return get_default_config()
except FileNotFoundError:
print(f"Warning: {yaml_path} not found. Using default configuration.")
return get_default_config()
def get_default_config() -> Dict[str, Any]:
"""Get default configuration."""
return {
'tasks': {
            'easy': {
                'description': 'Fly the drone to the target in an open arena with no obstacles.',
                'config': {
'episode_length': 300,
'boundary_limit': 80.0,
'max_velocity': 60.0,
'gravity': 5.0,
'friction': 0.02,
'obstacle_count': 0,
'wind_disturbance': False,
'sensor_noise': 0.0,
},
'grader': {
'success_threshold': 0.7,
'criteria': [
{'name': 'reached_target', 'weight': 0.6},
{'name': 'time_efficiency', 'weight': 0.2},
{'name': 'energy_efficiency', 'weight': 0.2},
]
}
},
            'medium': {
                'description': 'Fly the drone to the target while avoiding obstacles.',
                'config': {
'episode_length': 500,
'boundary_limit': 60.0,
'max_velocity': 50.0,
'gravity': 7.0,
'friction': 0.03,
'obstacle_count': 5,
'wind_disturbance': False,
'sensor_noise': 0.05,
},
'grader': {
'success_threshold': 0.75,
'criteria': [
{'name': 'reached_target', 'weight': 0.5},
{'name': 'collision_avoidance', 'weight': 0.25},
{'name': 'time_efficiency', 'weight': 0.15},
{'name': 'energy_efficiency', 'weight': 0.1},
]
}
},
            'hard': {
                'description': 'Fly the drone to the target through obstacles and wind disturbance.',
                'config': {
'episode_length': 700,
'boundary_limit': 50.0,
'max_velocity': 40.0,
'gravity': 9.0,
'friction': 0.05,
'obstacle_count': 10,
'wind_disturbance': True,
'sensor_noise': 0.1,
},
'grader': {
'success_threshold': 0.8,
'criteria': [
{'name': 'reached_target', 'weight': 0.45},
{'name': 'collision_avoidance', 'weight': 0.25},
{'name': 'wind_compensation', 'weight': 0.15},
{'name': 'time_efficiency', 'weight': 0.1},
{'name': 'energy_efficiency', 'weight': 0.05},
]
}
},
}
}
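# A minimal sketch of the expected openenv.yaml layout, mirroring the defaults
# above (key names assumed from how the config is consumed in this script):
#
#   tasks:
#     medium:
#       description: Fly the drone to the target while avoiding obstacles.
#       config:
#         episode_length: 500
#         obstacle_count: 5
#       grader:
#         success_threshold: 0.75
#         criteria:
#           - {name: reached_target, weight: 0.5}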
def run_episode(
env: OpenEnv,
grader,
seed: int,
task_description: str,
render: bool = False,
) -> Dict[str, Any]:
"""
Run single episode and collect metrics.
Args:
env: Environment instance
grader: Task grader instance
        seed: Random seed
        task_description: Natural-language task description passed to the model
        render: Whether to render
Returns:
Episode results dictionary
"""
# Reset environment and grader
    obs, info = env.reset(seed=seed)
    # Seed numpy as well so the random fallback actions taken on API or
    # parse failures are reproducible across runs.
    np.random.seed(seed)
    grader.reset()
done = False
total_reward = 0.0
steps = 0
prev_position = env.position.copy()
optimal_distance = np.linalg.norm(env.target_position - env.position)
grader.episode_data['optimal_distance'] = optimal_distance
while not done:
# Get action from OpenAI
action = get_openai_action(obs, task_description)
# Take step
obs, reward, terminated, truncated, info = env.step(action)
# Update grader with metrics
current_position = env.position.copy()
distance_delta = np.linalg.norm(current_position - prev_position)
grader.update(
steps=1,
distance_traveled=distance_delta,
energy_consumed=np.sum(np.abs(action)) * 0.5,
)
# Check for collisions (if obstacles exist)
if hasattr(env, 'check_collision') and env.check_collision():
grader.update(collisions=1)
# Track wind deviation
        if env.config.wind_disturbance and hasattr(env, 'wind_deviation'):
            grader.update(max_wind_deviation=max(
                grader.episode_data.get('max_wind_deviation', 0.0),
                env.wind_deviation
            ))
# Update position
prev_position = current_position.copy()
# Accumulate reward
total_reward += reward
steps += 1
# Render if requested
if render:
env.render()
# Check termination
done = terminated or truncated
# Final updates to grader
final_distance = np.linalg.norm(env.position - env.target_position)
    grader.update(
        target_reached=bool(final_distance < getattr(env, 'target_radius', 5.0)),
final_distance_to_target=final_distance,
time_to_complete=steps,
)
# Get grade report
grade_report = grader.get_grade_report()
# Compile results
results = {
'seed': seed,
'steps': steps,
        'total_reward': float(total_reward),
'final_score': grade_report['final_score'],
'passed': grade_report['passed'],
'criteria_scores': grade_report['criteria_scores'],
'episode_data': grade_report['episode_data'],
'feedback': grade_report['feedback'],
}
return results
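# Illustrative smoke test: run one episode on the built-in medium defaults.
#
#   cfg = get_default_config()['tasks']['medium']
#   env = OpenEnv(config=EnvConfig(**cfg['config'], task_level='medium', verbose=False))
#   grader = create_grader('medium', cfg['grader'])
#   print(run_episode(env, grader, seed=0, task_description=cfg['description']))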
def evaluate_task(
task_level: str,
config: Dict[str, Any],
n_episodes: int = 10,
seed: int = 42,
render: bool = False,
verbose: bool = True,
) -> Dict[str, Any]:
"""
Evaluate agent on specific task level.
Args:
task_level: Difficulty level
config: Task configuration
n_episodes: Number of episodes
seed: Base random seed
render: Render episodes
verbose: Print progress
Returns:
Aggregated evaluation results
"""
if verbose:
print(f"\n{'='*60}")
print(f"Evaluating {task_level.upper()} task")
print(f"{'='*60}")
print(f"Configuration:")
for key, value in config['config'].items():
print(f" {key}: {value}")
print(f"Grading criteria:")
for criterion in config['grader']['criteria']:
print(f" - {criterion['name']}: {criterion['weight']*100:.0f}%")
print(f"{'='*60}\n")
# Create environment
env_config = EnvConfig(
**config['config'],
task_level=task_level,
verbose=False,
)
env = OpenEnv(config=env_config)
# Create grader
grader = create_grader(task_level, config['grader'])
    # Run episodes. Fall back to a generic description if the config omits one
    # (the built-in defaults include it, but a user-supplied YAML may not).
    task_description = config.get(
        'description', f"Fly the drone to the target ({task_level} difficulty).")
    episode_results = []
    for ep in range(n_episodes):
        episode_seed = seed + ep
        result = run_episode(env, grader, episode_seed, task_description, render=render)
episode_results.append(result)
if verbose:
status = "✓ PASSED" if result['passed'] else "✗ FAILED"
print(f"Episode {ep+1}/{n_episodes} (seed={episode_seed}): "
f"Score={result['final_score']:.3f} {status}")
env.close()
# Aggregate results
scores = [r['final_score'] for r in episode_results]
rewards = [r['total_reward'] for r in episode_results]
steps = [r['steps'] for r in episode_results]
passed_count = sum(1 for r in episode_results if r['passed'])
aggregated = {
'task_level': task_level,
'n_episodes': n_episodes,
'base_seed': seed,
'mean_score': float(np.mean(scores)),
'std_score': float(np.std(scores)),
'min_score': float(np.min(scores)),
'max_score': float(np.max(scores)),
'pass_rate': passed_count / n_episodes,
'mean_reward': float(np.mean(rewards)),
'std_reward': float(np.std(rewards)),
'mean_steps': float(np.mean(steps)),
'episode_results': episode_results,
}
if verbose:
print(f"\n{'='*60}")
print(f"Results Summary - {task_level.upper()}")
print(f"{'='*60}")
print(f"Mean Score: {aggregated['mean_score']:.3f} ± {aggregated['std_score']:.3f}")
print(f"Score Range: [{aggregated['min_score']:.3f}, {aggregated['max_score']:.3f}]")
print(f"Pass Rate: {aggregated['pass_rate']*100:.1f}% ({passed_count}/{n_episodes})")
print(f"Mean Reward: {aggregated['mean_reward']:.2f} ± {aggregated['std_reward']:.2f}")
print(f"Mean Steps: {aggregated['mean_steps']:.1f}")
print(f"{'='*60}\n")
return aggregated
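# Helper (added for robustness): episode results can contain numpy scalars
# (e.g. rewards and distances), which the stdlib json encoder rejects.
def _json_default(obj):
    """Convert numpy types to plain Python values for json.dump."""
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, (np.integer, np.floating)):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")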
def main():
"""Main inference pipeline."""
parser = argparse.ArgumentParser(description='Baseline Inference for OpenEnv')
parser.add_argument('--task_level', type=str, default='medium',
choices=['easy', 'medium', 'hard'],
help='Task difficulty level')
parser.add_argument('--all_tasks', action='store_true',
help='Evaluate on all difficulty levels')
parser.add_argument('--n_episodes', type=int, default=10,
help='Number of evaluation episodes')
parser.add_argument('--seed', type=int, default=42,
help='Base random seed')
parser.add_argument('--config', type=str, default='openenv.yaml',
help='Path to configuration file')
parser.add_argument('--render', action='store_true',
help='Render episodes')
parser.add_argument('--output', type=str, default='results.json',
help='Output file for results')
parser.add_argument('--quiet', action='store_true',
help='Suppress verbose output')
args = parser.parse_args()
print("="*60)
print("OpenEnv Baseline Inference")
print("="*60)
# Load configuration
yaml_config = load_config_from_yaml(args.config)
# Determine which tasks to evaluate
if args.all_tasks:
task_levels = ['easy', 'medium', 'hard']
else:
task_levels = [args.task_level]
all_results = {}
# Evaluate each task level
for task_level in task_levels:
task_config = yaml_config['tasks'][task_level]
results = evaluate_task(
task_level=task_level,
config=task_config,
n_episodes=args.n_episodes,
seed=args.seed,
render=args.render,
verbose=not args.quiet,
)
all_results[task_level] = results
# Save results
    output_dir = os.path.dirname(args.output)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(args.output, 'w') as f:
        json.dump(all_results, f, indent=2, default=_json_default)
print(f"\nResults saved to {args.output}")
# Print overall summary
if len(task_levels) > 1:
print("\n" + "="*60)
print("Overall Performance Summary")
print("="*60)
for task_level in task_levels:
results = all_results[task_level]
print(f"{task_level.upper():10s}: Score={results['mean_score']:.3f} ± "
f"{results['std_score']:.3f}, Pass Rate={results['pass_rate']*100:.1f}%")
print("="*60)
return all_results
if __name__ == "__main__":
main()