# Source: api-debug-env/server/environment.py (uploaded via huggingface_hub)
"""
Core environment for the API Debug Environment.
Implements the OpenEnv Environment interface with:
- 5 task difficulty levels (easy, classify, medium, headers, hard)
- Multi-turn episodes with structured feedback
- Deterministic grading for easy/classify/medium/headers, LLM-as-judge for hard
- Step reward decay to encourage efficient debugging
- Auto-curriculum (task="auto") that promotes based on rolling reward
"""
import copy
import json
import os
import random
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import APIDebugAction, APIDebugObservation
except ImportError:
from models import APIDebugAction, APIDebugObservation
from .api_specs import get_random_spec
from .error_injectors import (
ERROR_TYPES,
HEADER_ERROR_TYPES,
inject_chained_errors,
inject_error,
inject_multiple_errors,
)
from .response_specs import get_random_response_template, inject_response_issues
from .validators import (
validate_field_type,
validate_headers_against_spec,
validate_request_against_spec,
)
# Task configuration: max steps and error count per difficulty.
# Multi-error tasks ("classify", "hard") draw a count uniformly from
# [min_errors, max_errors] at reset time; the "response" task does the same
# with [min_issues, max_issues].
# NOTE(review): "error_count" is currently not read by reset() -- single-error
# tasks always inject exactly one error via inject_error.
TASK_CONFIG = {
    "easy": {"max_steps": 3, "error_count": 1},
    "classify": {"max_steps": 4, "min_errors": 2, "max_errors": 3},
    "medium": {"max_steps": 5, "error_count": 1},
    "headers": {"max_steps": 4, "error_count": 1},
    "hard": {"max_steps": 7, "min_errors": 2, "max_errors": 3},
    "response": {"max_steps": 4, "min_issues": 1, "max_issues": 2},
}
class APIDebugEnvironment(Environment):
    """API Contract Validation environment.

    An LLM agent receives a broken API request and must:
    - Easy / Classify: Identify the error type(s) and affected fields
    - Medium: Fix the request to match the API spec
    - Headers: Fix the request headers
    - Response: Identify issues in a broken API response
    - Hard: Fix the request and explain the fix for developers

    Each episode allows multiple attempts. Perfect answers on early
    steps get full reward. Later steps get decayed reward.
    """

    # NOTE(review): presumably read by the OpenEnv server to allow multiple
    # concurrent sessions -- confirm against the framework documentation.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
    # Curriculum thresholds for task="auto" mode.
    # When rolling avg reward exceeds threshold, promote to next task.
    # "hard" is terminal: next/threshold are None, so no further promotion.
    AUTO_CURRICULUM = {
        "easy": {"next": "classify", "threshold": 0.7},
        "classify": {"next": "medium", "threshold": 0.6},
        "medium": {"next": "headers", "threshold": 0.6},
        "headers": {"next": "response", "threshold": 0.5},
        "response": {"next": "hard", "threshold": 0.5},
        "hard": {"next": None, "threshold": None},
    }
    # Number of most-recent episode rewards averaged for promotion decisions.
    AUTO_WINDOW = 10
    def __init__(self):
        """Initialize a fresh environment with default (easy) settings."""
        super().__init__()
        # Server-side episode bookkeeping (episode id + step counter).
        self._state = State(episode_id=str(uuid4()), step_count=0)
        # Active difficulty; reset() may override this per episode.
        self.task = "easy"
        # Current API spec and the broken artifacts shown to the agent.
        self.spec: Dict[str, Any] = {}
        self.broken_request: Dict[str, Any] = {}
        self.broken_headers: Dict[str, str] = {}
        # Injected-error records consumed by the deterministic graders.
        self.ground_truths: List[Dict[str, Any]] = []
        self.current_step = 0
        self.max_steps = 3
        self.episode_done = False
        # Highest (decayed, clamped) reward observed so far this episode.
        self.best_reward = 0.0
        self.rng = random.Random()
        # For wrong_http_method error: the method shown to the agent
        self.shown_http_method = ""
        # Response task state
        self.response_body: Dict[str, Any] = {}
        self.response_status_code: int = 0
        self.response_template: Dict[str, Any] = {}
        # Curriculum state for task="auto"
        self._auto_task = "easy"
        self._auto_rewards: List[float] = []
def reset(
self,
seed: Optional[int] = None,
episode_id: Optional[str] = None,
task: str = "easy",
**kwargs,
) -> APIDebugObservation:
"""Start a new debugging episode.
Args:
seed: Random seed for reproducible episodes.
episode_id: Custom episode identifier.
task: Difficulty level (easy, classify, medium, headers, hard, auto).
"""
# Initialize RNG
if seed is not None:
self.rng = random.Random(seed)
else:
self.rng = random.Random()
# Validate task -- "auto" uses curriculum to pick difficulty
if task == "auto":
self.task = self._auto_task
else:
self.task = task if task in TASK_CONFIG else "easy"
config = TASK_CONFIG[self.task]
self.max_steps = config["max_steps"]
self.current_step = 0
self.episode_done = False
self.best_reward = 0.0
# Fresh state
self._state = State(
episode_id=episode_id or str(uuid4()),
step_count=0,
)
# Pick random spec and build valid request
self.spec = copy.deepcopy(get_random_spec(self.rng))
valid_request = copy.deepcopy(self.spec["valid_example"])
valid_headers = copy.deepcopy(self.spec["required_headers"])
# Response task has a completely different setup: broken response, not request
if self.task == "response":
issue_count = self.rng.randint(config["min_issues"], config["max_issues"])
self.response_template = get_random_response_template(self.rng)
self.response_body, self.response_status_code, self.ground_truths = (
inject_response_issues(self.response_template, self.rng, issue_count)
)
# For response task, the request is correct -- agent examines the response
self.broken_request = valid_request
self.broken_headers = valid_headers
self.shown_http_method = self.spec["http_method"]
error_count = len(self.ground_truths)
return APIDebugObservation(
task=self.task,
api_name=self.spec["api_name"],
http_method=self.shown_http_method,
endpoint=self.spec["endpoint"],
broken_request=json.dumps(self.broken_request, indent=2),
broken_headers=self.broken_headers,
api_spec=self._build_spec_string(),
response_body=json.dumps(self.response_body, indent=2),
response_status_code=self.response_status_code,
error_count=error_count,
step_number=0,
max_steps=self.max_steps,
feedback="",
message=(
f"Validate the response from {self.shown_http_method} {self.spec['endpoint']}. "
f"The response has {error_count} issue(s). "
f"You have {self.max_steps} steps."
),
done=False,
reward=0.0,
)
# Inject errors based on difficulty
if self.task == "hard":
error_count = self.rng.randint(config["min_errors"], config["max_errors"])
# 50% chance of chained errors (header gate + body errors)
if self.rng.random() < 0.5:
self.broken_request, self.broken_headers, self.ground_truths = (
inject_chained_errors(
valid_request, valid_headers, self.spec, self.rng, error_count
)
)
else:
self.broken_request, self.broken_headers, self.ground_truths = (
inject_multiple_errors(
valid_request, valid_headers, self.spec, self.rng, error_count
)
)
elif self.task == "classify":
error_count = self.rng.randint(config["min_errors"], config["max_errors"])
self.broken_request, self.broken_headers, self.ground_truths = (
inject_multiple_errors(
valid_request, valid_headers, self.spec, self.rng, error_count
)
)
elif self.task == "headers":
error_type = self.rng.choice(HEADER_ERROR_TYPES)
self.broken_request, self.broken_headers, gt = inject_error(
error_type, valid_request, valid_headers, self.spec, self.rng
)
self.ground_truths = [gt]
else:
error_type = self.rng.choice(ERROR_TYPES)
self.broken_request, self.broken_headers, gt = inject_error(
error_type, valid_request, valid_headers, self.spec, self.rng
)
self.ground_truths = [gt]
# Handle wrong_http_method: show the wrong method to the agent
self.shown_http_method = self.spec["http_method"]
for gt in self.ground_truths:
if gt["error_type"] == "wrong_http_method":
self.shown_http_method = gt.get("wrong_method", self.spec["http_method"])
break
error_count = len(self.ground_truths)
return APIDebugObservation(
task=self.task,
api_name=self.spec["api_name"],
http_method=self.shown_http_method,
endpoint=self.spec["endpoint"],
broken_request=json.dumps(self.broken_request, indent=2),
broken_headers=self.broken_headers,
api_spec=self._build_spec_string(),
error_count=error_count,
step_number=0,
max_steps=self.max_steps,
feedback="",
message=(
f"Debug this {self.shown_http_method} {self.spec['endpoint']} request. "
f"It contains {error_count} error(s). "
f"You have {self.max_steps} steps."
),
done=False,
reward=0.0,
)
def step(
self,
action: APIDebugAction,
timeout_s: Optional[float] = None,
**kwargs,
) -> APIDebugObservation:
"""Process the agent's debugging attempt.
The agent can submit a partial or complete response.
The grader evaluates whatever fields are present.
"""
self.current_step += 1
self._state.step_count = self.current_step
if self.episode_done:
return self._make_observation(
feedback="Episode already ended.",
reward=self.best_reward if self.best_reward > 0 else 0.001,
done=True,
)
# Grade based on task type
if self.task == "easy":
raw_score, feedback = self._grade_easy(action)
elif self.task == "classify":
raw_score, feedback = self._grade_classify(action)
elif self.task == "medium":
raw_score, feedback = self._grade_medium(action)
elif self.task == "headers":
raw_score, feedback = self._grade_headers(action)
elif self.task == "response":
raw_score, feedback = self._grade_response(action)
else:
raw_score, feedback = self._grade_hard(action)
# Apply step decay: step 1 = 1.0x, step 2 = 0.9x, etc. Floor at 0.3x
step_multiplier = max(1.0 - 0.1 * (self.current_step - 1), 0.3)
reward = round(raw_score * step_multiplier, 4)
# Clamp to open interval (0, 1) - evaluator rejects exactly 0.0 and 1.0
reward = max(0.001, min(0.999, reward))
# Track best reward across all steps
self.best_reward = max(self.best_reward, reward)
# Episode ends if score is near-perfect or out of steps
near_perfect = raw_score >= 0.95
out_of_steps = self.current_step >= self.max_steps
done = near_perfect or out_of_steps
if done:
self.episode_done = True
# Return best reward achieved during the episode
reward = self.best_reward
# Track for auto-curriculum promotion
self._auto_rewards.append(reward)
self._maybe_auto_promote()
return self._make_observation(
feedback=feedback,
reward=reward,
done=done,
)
    @property
    def state(self) -> State:
        """Current server-side episode state (episode id + step count)."""
        return self._state
def _maybe_auto_promote(self):
"""Check if auto-curriculum should promote to next difficulty."""
config = self.AUTO_CURRICULUM.get(self._auto_task)
if not config or config["next"] is None or config["threshold"] is None:
return
if len(self._auto_rewards) < self.AUTO_WINDOW:
return
avg = sum(self._auto_rewards[-self.AUTO_WINDOW:]) / self.AUTO_WINDOW
if avg >= config["threshold"]:
self._auto_task = config["next"]
self._auto_rewards.clear()
# =====================================================================
# Grading methods
# =====================================================================
def _grade_easy(self, action: APIDebugAction) -> Tuple[float, str]:
"""Grade error identification. Fully deterministic.
Scoring: 0.6 for correct error_type + 0.4 for correct affected_fields.
Fields use Jaccard similarity for partial credit.
"""
score = 0.0
parts = []
# Collect all ground truth error types and affected fields
gt_types = {gt["error_type"] for gt in self.ground_truths}
gt_fields: set = set()
for gt in self.ground_truths:
gt_fields.update(gt.get("affected_fields", []))
# Check error type (0.6 weight)
if action.error_type and action.error_type in gt_types:
score += 0.6
parts.append("error_type: CORRECT")
else:
given = action.error_type or "(none)"
parts.append(f"error_type: INCORRECT (you said '{given}')")
# Check affected fields using Jaccard similarity (0.4 weight)
agent_fields = set(action.affected_fields or [])
if gt_fields and agent_fields:
intersection = gt_fields & agent_fields
union = gt_fields | agent_fields
jaccard = len(intersection) / len(union) if union else 0.0
score += 0.4 * jaccard
parts.append(
f"affected_fields: {len(intersection)}/{len(gt_fields)} correct, "
f"{len(agent_fields - gt_fields)} extra"
)
elif not agent_fields:
parts.append("affected_fields: MISSING (none provided)")
else:
parts.append("affected_fields: INCORRECT (0 matches)")
return round(score, 4), "; ".join(parts)
def _grade_classify(self, action: APIDebugAction) -> Tuple[float, str]:
"""Grade multi-error classification. Fully deterministic.
Like easy but the agent must identify ALL error types and ALL
affected fields across multiple injected errors.
Scoring: 0.6 for error types (Jaccard) + 0.4 for affected fields (Jaccard).
Accepts either error_types (list) or error_type (single) from the agent.
"""
score = 0.0
parts = []
gt_types = {gt["error_type"] for gt in self.ground_truths}
gt_fields: set = set()
for gt in self.ground_truths:
gt_fields.update(gt.get("affected_fields", []))
# Accept error_types (list) or fall back to error_type (single)
agent_types = set(action.error_types or [])
if not agent_types and action.error_type:
agent_types = {action.error_type}
# Error types Jaccard (0.6 weight)
if gt_types and agent_types:
intersection = gt_types & agent_types
union = gt_types | agent_types
jaccard = len(intersection) / len(union) if union else 0.0
score += 0.6 * jaccard
parts.append(
f"error_types: {len(intersection)}/{len(gt_types)} correct, "
f"{len(agent_types - gt_types)} extra"
)
elif not agent_types:
parts.append("error_types: MISSING (none provided)")
else:
parts.append("error_types: INCORRECT (0 matches)")
# Affected fields Jaccard (0.4 weight)
agent_fields = set(action.affected_fields or [])
if gt_fields and agent_fields:
intersection = gt_fields & agent_fields
union = gt_fields | agent_fields
jaccard = len(intersection) / len(union) if union else 0.0
score += 0.4 * jaccard
parts.append(
f"affected_fields: {len(intersection)}/{len(gt_fields)} correct, "
f"{len(agent_fields - gt_fields)} extra"
)
elif not agent_fields:
parts.append("affected_fields: MISSING (none provided)")
else:
parts.append("affected_fields: INCORRECT (0 matches)")
return round(score, 4), "; ".join(parts)
def _grade_medium(self, action: APIDebugAction) -> Tuple[float, str]:
"""Grade request fix. Fully deterministic per-field validation.
Validates the fixed request against the spec: required fields present,
field types correct, headers present. Each check is equally weighted.
"""
if not action.fixed_request:
return 0.0, "No fixed_request provided."
try:
fixed = json.loads(action.fixed_request)
except (json.JSONDecodeError, TypeError):
return 0.0, "fixed_request is not valid JSON."
if not isinstance(fixed, dict):
return 0.0, "fixed_request must be a JSON object."
# Validate request body against spec
body_score, body_feedback = validate_request_against_spec(fixed, self.spec)
# Validate headers if provided
header_score = 0.0
header_feedback = ""
has_header_errors = any(
gt["error_type"] == "missing_auth_header" for gt in self.ground_truths
)
if has_header_errors and action.fixed_headers:
header_score, header_feedback = validate_headers_against_spec(
action.fixed_headers, self.spec
)
# Blend: 80% body + 20% headers when header errors exist
total_score = 0.8 * body_score + 0.2 * header_score
feedback = body_feedback + "\n" + header_feedback
elif has_header_errors and not action.fixed_headers:
feedback = body_feedback + "\nHeaders: NOT PROVIDED (header fix needed)"
total_score = 0.8 * body_score
else:
total_score = body_score
feedback = body_feedback
return round(total_score, 4), feedback
def _grade_headers(self, action: APIDebugAction) -> Tuple[float, str]:
"""Grade header fix. Fully deterministic.
The agent must provide corrected headers that match the spec.
Also awards partial credit for identifying the error type.
Scoring: 0.7 for correct headers + 0.3 for error type identification.
"""
score = 0.0
parts = []
# Error type identification (0.3 weight)
gt_types = {gt["error_type"] for gt in self.ground_truths}
if action.error_type and action.error_type in gt_types:
score += 0.3
parts.append("error_type: CORRECT")
else:
given = action.error_type or "(none)"
parts.append(f"error_type: INCORRECT (you said '{given}')")
# Header fix validation (0.7 weight)
if action.fixed_headers:
header_score, header_feedback = validate_headers_against_spec(
action.fixed_headers, self.spec
)
score += 0.7 * header_score
parts.append(header_feedback)
else:
parts.append("Headers: NOT PROVIDED (header fix needed)")
return round(score, 4), "; ".join(parts)
def _grade_response(self, action: APIDebugAction) -> Tuple[float, str]:
"""Grade response validation. Fully deterministic.
Agent must identify issue types and, for wrong_status_code, provide
the correct status code.
Scoring: 0.5 for issue type identification (Jaccard) +
0.3 for affected field identification (Jaccard) +
0.2 for correct status code (if applicable).
"""
score = 0.0
parts = []
gt_issue_types = {gt["issue_type"] for gt in self.ground_truths}
gt_fields = {gt.get("affected_field", "") for gt in self.ground_truths} - {""}
# Issue type identification (0.5 weight)
predicted_issues = set(action.response_issues or [])
if predicted_issues and gt_issue_types:
intersection = predicted_issues & gt_issue_types
union = predicted_issues | gt_issue_types
jaccard = len(intersection) / len(union) if union else 0.0
score += 0.5 * jaccard
parts.append(f"Issue types: {len(intersection)}/{len(gt_issue_types)} correct (Jaccard={jaccard:.2f})")
else:
parts.append("Issue types: NOT PROVIDED" if not predicted_issues else "Issue types: NONE CORRECT")
# Affected field identification via error_type or affected_fields (0.3 weight)
predicted_fields = set(action.affected_fields or [])
if predicted_fields and gt_fields:
intersection = predicted_fields & gt_fields
union = predicted_fields | gt_fields
jaccard = len(intersection) / len(union) if union else 0.0
score += 0.3 * jaccard
parts.append(f"Affected fields: {len(intersection)}/{len(gt_fields)} correct")
else:
parts.append("Affected fields: NOT PROVIDED" if not predicted_fields else "Affected fields: NONE CORRECT")
# Status code check (0.2 weight) -- only if wrong_status_code is a ground truth
has_status_issue = any(gt["issue_type"] == "wrong_status_code" for gt in self.ground_truths)
if has_status_issue:
correct_status = None
for gt in self.ground_truths:
if gt["issue_type"] == "wrong_status_code":
correct_status = int(gt.get("correct_value", 0))
break
if action.expected_status_code and action.expected_status_code == correct_status:
score += 0.2
parts.append(f"Status code: CORRECT ({correct_status})")
else:
given = action.expected_status_code or "(none)"
parts.append(f"Status code: INCORRECT (you said {given}, expected {correct_status})")
else:
# No status code issue -- redistribute 0.2 to issue types
score += 0.2 * (len(predicted_issues & gt_issue_types) / len(gt_issue_types) if gt_issue_types else 0.0)
parts.append("Status code: N/A (no status code issue)")
return round(score, 4), "; ".join(parts)
def _grade_hard(self, action: APIDebugAction) -> Tuple[float, str]:
"""Grade fix + explanation. 70% deterministic fix, 30% explanation.
The explanation is scored by LLM-as-judge if available,
with a heuristic fallback if the LLM is not reachable.
"""
# Deterministic fix scoring (same as medium)
fix_score, fix_feedback = self._grade_medium(action)
# Explanation scoring
explain_score = 0.0
explain_feedback = "No explanation provided."
if action.explanation and len(action.explanation.strip()) > 10:
explain_score = self._score_explanation(action.explanation)
explain_feedback = f"Explanation quality: {explain_score:.2f}/1.0"
total = 0.7 * fix_score + 0.3 * explain_score
feedback = (
f"Fix score: {fix_score:.2f} (70% weight)\n"
f"{fix_feedback}\n"
f"{explain_feedback}"
)
return round(total, 4), feedback
def _score_explanation(self, explanation: str) -> float:
"""Score an explanation using LLM-as-judge with heuristic fallback.
Tries to call the LLM via the HF router. If that fails for any
reason, falls back to a keyword + length heuristic.
"""
# Try LLM-as-judge first
try:
llm_score = self._llm_judge_explanation(explanation)
if llm_score is not None:
return llm_score
except Exception:
pass
# Heuristic fallback
return self._heuristic_score_explanation(explanation)
    def _llm_judge_explanation(self, explanation: str) -> Optional[float]:
        """Call LLM to score the explanation. Returns None if unavailable.

        Uses dedicated judge model (JUDGE_MODEL / JUDGE_API_BASE / JUDGE_API_KEY)
        to avoid the agent grading itself. Falls back to the agent's model env
        vars if judge-specific vars are not set.

        Args:
            explanation: The agent's free-text explanation of the fix.

        Returns:
            A score clamped to [0.0, 1.0], or None when the judge is not
            configured or its reply cannot be parsed.
        """
        # Judge-specific env vars take precedence; agent-model vars are the fallback
        api_base = os.getenv("JUDGE_API_BASE") or os.getenv("API_BASE_URL")
        api_key = os.getenv("JUDGE_API_KEY") or os.getenv("OPENAI_API_KEY") or os.getenv("HF_TOKEN")
        model = os.getenv("JUDGE_MODEL") or os.getenv("MODEL_NAME")
        if not all([api_base, api_key, model]):
            return None
        # Lazy import: the openai package is only required when a judge is configured
        from openai import OpenAI
        client = OpenAI(base_url=api_base, api_key=api_key)
        # Include error type + affected fields so the judge evaluates against actual errors
        gt_summary = [
            {"error_type": gt["error_type"], "affected_fields": gt.get("affected_fields", [])}
            for gt in self.ground_truths
        ]
        prompt = (
            "You are grading an AI agent's explanation for debugging a broken API request.\n\n"
            f"API: {self.spec['api_name']} {self.spec['http_method']} {self.spec['endpoint']}\n"
            f"Actual errors present:\n{json.dumps(gt_summary, indent=2)}\n\n"
            f"Agent's explanation:\n{explanation}\n\n"
            "Score 0.0 to 1.0:\n"
            "- Root cause: correctly names the error type and affected fields (0 to 0.4)\n"
            "- Fix guidance: explains the correct remediation (0 to 0.3)\n"
            "- Developer clarity: actionable and clear for a developer (0 to 0.3)\n\n"
            'Return ONLY a JSON object: {"score": 0.0}'
        )
        # timeout=10 prevents blocking step() if the judge LLM is slow
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=50,
            temperature=0.0,
            timeout=10,
        )
        text = response.choices[0].message.content or ""
        # Parse score from response - protected so any bad response falls back to heuristic
        try:
            result = json.loads(text)
            raw_score = float(result["score"])
        except (json.JSONDecodeError, KeyError, TypeError, ValueError):
            return None
        # Clamp whatever the judge returned into the valid [0, 1] range
        return max(0.0, min(1.0, raw_score))
def _heuristic_score_explanation(self, explanation: str) -> float:
"""Simple heuristic scoring based on length and keyword presence.
This is the fallback when LLM-as-judge is not available.
Not perfect, but ensures non-zero scores for reasonable explanations.
"""
keywords = [
"because", "should", "instead", "required", "missing",
"type", "format", "expected", "invalid", "correct",
"field", "header", "value", "fix", "error",
"authorization", "authentication", "schema", "endpoint",
"method", "body", "payload", "constraint",
]
keyword_hits = sum(1 for k in keywords if k in explanation.lower())
keyword_score = min(keyword_hits / 6.0, 1.0)
# Length score: reward explanations between 50 and 500 chars
length = len(explanation.strip())
if length < 20:
length_score = 0.1
elif length < 50:
length_score = 0.3
elif length <= 500:
length_score = 0.6
else:
length_score = 0.5 # Slightly penalize very long explanations
return round(0.5 * keyword_score + 0.5 * length_score, 2)
# =====================================================================
# Helpers
# =====================================================================
def _build_spec_string(self) -> str:
"""Build a JSON string of the spec info the agent needs to see."""
visible_spec = {
"required_fields": self.spec["required_fields"],
"optional_fields": self.spec.get("optional_fields", []),
"field_types": self.spec["field_types"],
"required_headers": list(self.spec.get("required_headers", {}).keys()),
}
return json.dumps(visible_spec, indent=2)
def _make_observation(
self,
feedback: str,
reward: float,
done: bool,
) -> APIDebugObservation:
"""Build an observation with the current episode state."""
if done and not feedback:
msg = "Episode complete."
elif done:
msg = f"Episode complete. Final reward: {reward:.2f}"
else:
remaining = self.max_steps - self.current_step
msg = f"{remaining} step(s) remaining. Use the feedback to improve."
obs = APIDebugObservation(
task=self.task,
api_name=self.spec.get("api_name", ""),
http_method=self.shown_http_method,
endpoint=self.spec.get("endpoint", ""),
broken_request=json.dumps(self.broken_request, indent=2),
broken_headers=self.broken_headers,
api_spec=self._build_spec_string(),
error_count=len(self.ground_truths),
step_number=self.current_step,
max_steps=self.max_steps,
feedback=feedback,
message=msg,
done=done,
reward=reward,
)
# Include response data for response task
if self.task == "response":
obs.response_body = json.dumps(self.response_body, indent=2)
obs.response_status_code = self.response_status_code
return obs