"""Core environment for the API Debug Environment.

Implements the OpenEnv Environment interface with:
- 6 task difficulty levels (easy, classify, medium, headers, response, hard)
- Multi-turn episodes with structured feedback
- Deterministic grading for easy/classify/medium/headers/response,
  LLM-as-judge (with heuristic fallback) for hard
- Step reward decay to encourage efficient debugging
- Auto-curriculum (task="auto") that promotes based on rolling reward
"""

import copy
import json
import os
import random
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

# Support both package-relative and flat (script) import layouts.
try:
    from ..models import APIDebugAction, APIDebugObservation
except ImportError:
    from models import APIDebugAction, APIDebugObservation

from .api_specs import get_random_spec
from .error_injectors import (
    ERROR_TYPES,
    HEADER_ERROR_TYPES,
    inject_chained_errors,
    inject_error,
    inject_multiple_errors,
)
from .response_specs import get_random_response_template, inject_response_issues

# NOTE(review): validate_field_type is imported but not referenced anywhere in
# this module's visible code — confirm whether it is used elsewhere before removing.
from .validators import (
    validate_field_type,
    validate_headers_against_spec,
    validate_request_against_spec,
)

# Task configuration: max steps and error count per difficulty.
# Tasks with fixed "error_count" inject exactly that many errors; tasks with
# "min_errors"/"max_errors" (or "min_issues"/"max_issues" for the response
# task) draw a count uniformly from that inclusive range at reset time.
TASK_CONFIG = {
    "easy": {"max_steps": 3, "error_count": 1},
    "classify": {"max_steps": 4, "min_errors": 2, "max_errors": 3},
    "medium": {"max_steps": 5, "error_count": 1},
    "headers": {"max_steps": 4, "error_count": 1},
    "hard": {"max_steps": 7, "min_errors": 2, "max_errors": 3},
    "response": {"max_steps": 4, "min_issues": 1, "max_issues": 2},
}


class APIDebugEnvironment(Environment):
    """API Contract Validation environment.

    An LLM agent receives a broken API request and must:
    - Easy: Identify the error type and affected fields
    - Classify: Identify ALL error types and affected fields (multiple errors)
    - Medium: Fix the request to match the API spec
    - Headers: Fix the request headers and identify the error type
    - Response: Identify issues in a broken API *response* (request is valid)
    - Hard: Fix the request and explain the fix for developers

    Each episode allows multiple attempts. Perfect answers on early steps get
    full reward. Later steps get decayed reward.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Curriculum thresholds for task="auto" mode.
    # When the rolling average episode reward (over AUTO_WINDOW episodes)
    # exceeds "threshold", the curriculum promotes to the "next" task.
    # "hard" is terminal (next=None), so promotion stops there.
    AUTO_CURRICULUM = {
        "easy": {"next": "classify", "threshold": 0.7},
        "classify": {"next": "medium", "threshold": 0.6},
        "medium": {"next": "headers", "threshold": 0.6},
        "headers": {"next": "response", "threshold": 0.5},
        "response": {"next": "hard", "threshold": 0.5},
        "hard": {"next": None, "threshold": None},
    }
    # Number of most recent episode rewards averaged for the promotion check.
    AUTO_WINDOW = 10

    def __init__(self) -> None:
        """Initialize default (pre-reset) episode state.

        All episode fields are given placeholder values here; a real episode
        is only configured by :meth:`reset`. Curriculum state (_auto_task,
        _auto_rewards) deliberately persists across resets.
        """
        super().__init__()
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self.task = "easy"
        self.spec: Dict[str, Any] = {}
        self.broken_request: Dict[str, Any] = {}
        self.broken_headers: Dict[str, str] = {}
        # One ground-truth record per injected error/issue; schema depends on
        # the injector (e.g. keys "error_type"/"affected_fields" for request
        # tasks, "issue_type"/"affected_field" for the response task).
        self.ground_truths: List[Dict[str, Any]] = []
        self.current_step = 0
        self.max_steps = 3
        self.episode_done = False
        # Best (decayed) reward achieved so far in the episode; returned as
        # the final reward when the episode ends.
        self.best_reward = 0.0
        self.rng = random.Random()
        # For wrong_http_method error: the method shown to the agent
        self.shown_http_method = ""
        # Response task state
        self.response_body: Dict[str, Any] = {}
        self.response_status_code: int = 0
        self.response_template: Dict[str, Any] = {}
        # Curriculum state for task="auto"
        self._auto_task = "easy"
        self._auto_rewards: List[float] = []

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task: str = "easy",
        **kwargs,
    ) -> APIDebugObservation:
        """Start a new debugging episode.

        Args:
            seed: Random seed for reproducible episodes.
            episode_id: Custom episode identifier.
            task: Difficulty level (easy, classify, medium, headers, hard, auto).

        Returns:
            The initial observation describing the broken request (or, for the
            response task, the broken response) and the visible API spec.
        """
        # Initialize RNG
        if seed is not None:
            self.rng = random.Random(seed)
        else:
            self.rng = random.Random()

        # Validate task -- "auto" uses curriculum to pick difficulty
        if task == "auto":
            self.task = self._auto_task
        else:
            # Unknown task names silently fall back to "easy".
            self.task = task if task in TASK_CONFIG else "easy"

        config = TASK_CONFIG[self.task]
        self.max_steps = config["max_steps"]
        self.current_step = 0
        self.episode_done = False
        self.best_reward = 0.0

        # Fresh state
        self._state = State(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
        )

        # Pick random spec and build valid request. Deep copies protect the
        # shared spec data from in-place mutation by the error injectors.
        self.spec = copy.deepcopy(get_random_spec(self.rng))
        valid_request = copy.deepcopy(self.spec["valid_example"])
        valid_headers = copy.deepcopy(self.spec["required_headers"])

        # Response task has a completely different setup: broken response, not request
        if self.task == "response":
            issue_count = self.rng.randint(config["min_issues"], config["max_issues"])
            self.response_template = get_random_response_template(self.rng)
            self.response_body, self.response_status_code, self.ground_truths = (
                inject_response_issues(self.response_template, self.rng, issue_count)
            )
            # For response task, the request is correct -- agent examines the response
            self.broken_request = valid_request
            self.broken_headers = valid_headers
            self.shown_http_method = self.spec["http_method"]
            error_count = len(self.ground_truths)
            return APIDebugObservation(
                task=self.task,
                api_name=self.spec["api_name"],
                http_method=self.shown_http_method,
                endpoint=self.spec["endpoint"],
                broken_request=json.dumps(self.broken_request, indent=2),
                broken_headers=self.broken_headers,
                api_spec=self._build_spec_string(),
                response_body=json.dumps(self.response_body, indent=2),
                response_status_code=self.response_status_code,
                error_count=error_count,
                step_number=0,
                max_steps=self.max_steps,
                feedback="",
                message=(
                    f"Validate the response from {self.shown_http_method} {self.spec['endpoint']}. "
                    f"The response has {error_count} issue(s). "
                    f"You have {self.max_steps} steps."
                ),
                done=False,
                reward=0.0,
            )

        # Inject errors based on difficulty
        if self.task == "hard":
            error_count = self.rng.randint(config["min_errors"], config["max_errors"])
            # 50% chance of chained errors (header gate + body errors)
            if self.rng.random() < 0.5:
                self.broken_request, self.broken_headers, self.ground_truths = (
                    inject_chained_errors(
                        valid_request, valid_headers, self.spec, self.rng, error_count
                    )
                )
            else:
                self.broken_request, self.broken_headers, self.ground_truths = (
                    inject_multiple_errors(
                        valid_request, valid_headers, self.spec, self.rng, error_count
                    )
                )
        elif self.task == "classify":
            error_count = self.rng.randint(config["min_errors"], config["max_errors"])
            self.broken_request, self.broken_headers, self.ground_truths = (
                inject_multiple_errors(
                    valid_request, valid_headers, self.spec, self.rng, error_count
                )
            )
        elif self.task == "headers":
            # Headers task draws from the header-specific error pool.
            error_type = self.rng.choice(HEADER_ERROR_TYPES)
            self.broken_request, self.broken_headers, gt = inject_error(
                error_type, valid_request, valid_headers, self.spec, self.rng
            )
            self.ground_truths = [gt]
        else:
            # easy / medium: a single error from the general pool.
            error_type = self.rng.choice(ERROR_TYPES)
            self.broken_request, self.broken_headers, gt = inject_error(
                error_type, valid_request, valid_headers, self.spec, self.rng
            )
            self.ground_truths = [gt]

        # Handle wrong_http_method: show the wrong method to the agent
        # (the spec's true method stays hidden so the agent must detect it).
        self.shown_http_method = self.spec["http_method"]
        for gt in self.ground_truths:
            if gt["error_type"] == "wrong_http_method":
                self.shown_http_method = gt.get("wrong_method", self.spec["http_method"])
                break

        error_count = len(self.ground_truths)
        return APIDebugObservation(
            task=self.task,
            api_name=self.spec["api_name"],
            http_method=self.shown_http_method,
            endpoint=self.spec["endpoint"],
            broken_request=json.dumps(self.broken_request, indent=2),
            broken_headers=self.broken_headers,
            api_spec=self._build_spec_string(),
            error_count=error_count,
            step_number=0,
            max_steps=self.max_steps,
            feedback="",
            message=(
                f"Debug this {self.shown_http_method} {self.spec['endpoint']} request. "
                f"It contains {error_count} error(s). "
                f"You have {self.max_steps} steps."
            ),
            done=False,
            reward=0.0,
        )

    def step(
        self,
        action: APIDebugAction,
        timeout_s: Optional[float] = None,
        **kwargs,
    ) -> APIDebugObservation:
        """Process the agent's debugging attempt.

        The agent can submit a partial or complete response. The grader
        evaluates whatever fields are present.

        Args:
            action: The agent's attempt (fields used depend on the task).
            timeout_s: Accepted for interface compatibility; unused here.

        Returns:
            An observation carrying graded feedback, the (decayed, clamped)
            reward, and whether the episode has ended.
        """
        # NOTE(review): the step counter is incremented before the
        # episode_done guard, so stepping a finished episode still advances
        # current_step / state.step_count — confirm this is intentional.
        self.current_step += 1
        self._state.step_count = self.current_step

        if self.episode_done:
            # Episode already over: return the best reward again (0.001 floor
            # because the evaluator rejects exactly 0.0).
            return self._make_observation(
                feedback="Episode already ended.",
                reward=self.best_reward if self.best_reward > 0 else 0.001,
                done=True,
            )

        # Grade based on task type
        if self.task == "easy":
            raw_score, feedback = self._grade_easy(action)
        elif self.task == "classify":
            raw_score, feedback = self._grade_classify(action)
        elif self.task == "medium":
            raw_score, feedback = self._grade_medium(action)
        elif self.task == "headers":
            raw_score, feedback = self._grade_headers(action)
        elif self.task == "response":
            raw_score, feedback = self._grade_response(action)
        else:
            raw_score, feedback = self._grade_hard(action)

        # Apply step decay: step 1 = 1.0x, step 2 = 0.9x, etc. Floor at 0.3x
        step_multiplier = max(1.0 - 0.1 * (self.current_step - 1), 0.3)
        reward = round(raw_score * step_multiplier, 4)

        # Clamp to open interval (0, 1) - evaluator rejects exactly 0.0 and 1.0
        reward = max(0.001, min(0.999, reward))

        # Track best reward across all steps
        self.best_reward = max(self.best_reward, reward)

        # Episode ends if score is near-perfect or out of steps
        near_perfect = raw_score >= 0.95
        out_of_steps = self.current_step >= self.max_steps
        done = near_perfect or out_of_steps

        if done:
            self.episode_done = True
            # Return best reward achieved during the episode
            reward = self.best_reward
            # Track for auto-curriculum promotion (final episode reward only)
            self._auto_rewards.append(reward)
            self._maybe_auto_promote()

        return self._make_observation(
            feedback=feedback,
            reward=reward,
            done=done,
        )

    @property
    def state(self) -> State:
        """Current OpenEnv State (episode id and step count)."""
        return self._state

    def _maybe_auto_promote(self) -> None:
        """Check if auto-curriculum should promote to next difficulty.

        Promotes when the average of the last AUTO_WINDOW episode rewards
        meets the current level's threshold; the reward history is cleared on
        promotion so the next level starts with a fresh window.
        """
        config = self.AUTO_CURRICULUM.get(self._auto_task)
        if not config or config["next"] is None or config["threshold"] is None:
            return
        if len(self._auto_rewards) < self.AUTO_WINDOW:
            return
        avg = sum(self._auto_rewards[-self.AUTO_WINDOW:]) / self.AUTO_WINDOW
        if avg >= config["threshold"]:
            self._auto_task = config["next"]
            self._auto_rewards.clear()

    # =====================================================================
    # Grading methods
    # =====================================================================

    def _grade_easy(self, action: APIDebugAction) -> Tuple[float, str]:
        """Grade error identification. Fully deterministic.

        Scoring: 0.6 for correct error_type + 0.4 for correct affected_fields.
        Fields use Jaccard similarity for partial credit.

        Returns:
            (score in [0, 1], "; "-joined feedback string).
        """
        score = 0.0
        parts = []

        # Collect all ground truth error types and affected fields
        gt_types = {gt["error_type"] for gt in self.ground_truths}
        gt_fields: set = set()
        for gt in self.ground_truths:
            gt_fields.update(gt.get("affected_fields", []))

        # Check error type (0.6 weight)
        if action.error_type and action.error_type in gt_types:
            score += 0.6
            parts.append("error_type: CORRECT")
        else:
            given = action.error_type or "(none)"
            parts.append(f"error_type: INCORRECT (you said '{given}')")

        # Check affected fields using Jaccard similarity (0.4 weight)
        agent_fields = set(action.affected_fields or [])
        if gt_fields and agent_fields:
            intersection = gt_fields & agent_fields
            union = gt_fields | agent_fields
            jaccard = len(intersection) / len(union) if union else 0.0
            score += 0.4 * jaccard
            parts.append(
                f"affected_fields: {len(intersection)}/{len(gt_fields)} correct, "
                f"{len(agent_fields - gt_fields)} extra"
            )
        elif not agent_fields:
            parts.append("affected_fields: MISSING (none provided)")
        else:
            parts.append("affected_fields: INCORRECT (0 matches)")

        return round(score, 4), "; ".join(parts)

    def _grade_classify(self, action: APIDebugAction) -> Tuple[float, str]:
        """Grade multi-error classification. Fully deterministic.

        Like easy but the agent must identify ALL error types and ALL affected
        fields across multiple injected errors.

        Scoring: 0.6 for error types (Jaccard) + 0.4 for affected fields
        (Jaccard). Accepts either error_types (list) or error_type (single)
        from the agent.

        Returns:
            (score in [0, 1], "; "-joined feedback string).
        """
        score = 0.0
        parts = []

        gt_types = {gt["error_type"] for gt in self.ground_truths}
        gt_fields: set = set()
        for gt in self.ground_truths:
            gt_fields.update(gt.get("affected_fields", []))

        # Accept error_types (list) or fall back to error_type (single)
        agent_types = set(action.error_types or [])
        if not agent_types and action.error_type:
            agent_types = {action.error_type}

        # Error types Jaccard (0.6 weight)
        if gt_types and agent_types:
            intersection = gt_types & agent_types
            union = gt_types | agent_types
            jaccard = len(intersection) / len(union) if union else 0.0
            score += 0.6 * jaccard
            parts.append(
                f"error_types: {len(intersection)}/{len(gt_types)} correct, "
                f"{len(agent_types - gt_types)} extra"
            )
        elif not agent_types:
            parts.append("error_types: MISSING (none provided)")
        else:
            parts.append("error_types: INCORRECT (0 matches)")

        # Affected fields Jaccard (0.4 weight)
        agent_fields = set(action.affected_fields or [])
        if gt_fields and agent_fields:
            intersection = gt_fields & agent_fields
            union = gt_fields | agent_fields
            jaccard = len(intersection) / len(union) if union else 0.0
            score += 0.4 * jaccard
            parts.append(
                f"affected_fields: {len(intersection)}/{len(gt_fields)} correct, "
                f"{len(agent_fields - gt_fields)} extra"
            )
        elif not agent_fields:
            parts.append("affected_fields: MISSING (none provided)")
        else:
            parts.append("affected_fields: INCORRECT (0 matches)")

        return round(score, 4), "; ".join(parts)

    def _grade_medium(self, action: APIDebugAction) -> Tuple[float, str]:
        """Grade request fix. Fully deterministic per-field validation.

        Validates the fixed request against the spec: required fields present,
        field types correct, headers present. Each check is equally weighted.

        Returns:
            (score in [0, 1], feedback string). Score is 0.0 when the agent's
            fixed_request is absent, not valid JSON, or not a JSON object.
        """
        if not action.fixed_request:
            return 0.0, "No fixed_request provided."
        try:
            fixed = json.loads(action.fixed_request)
        except (json.JSONDecodeError, TypeError):
            return 0.0, "fixed_request is not valid JSON."
        if not isinstance(fixed, dict):
            return 0.0, "fixed_request must be a JSON object."

        # Validate request body against spec
        body_score, body_feedback = validate_request_against_spec(fixed, self.spec)

        # Validate headers if provided
        header_score = 0.0
        header_feedback = ""
        # NOTE(review): only missing_auth_header triggers header grading here;
        # other header error types are graded by _grade_headers — confirm the
        # medium/hard injectors never emit those.
        has_header_errors = any(
            gt["error_type"] == "missing_auth_header" for gt in self.ground_truths
        )
        if has_header_errors and action.fixed_headers:
            header_score, header_feedback = validate_headers_against_spec(
                action.fixed_headers, self.spec
            )
            # Blend: 80% body + 20% headers when header errors exist
            total_score = 0.8 * body_score + 0.2 * header_score
            feedback = body_feedback + "\n" + header_feedback
        elif has_header_errors and not action.fixed_headers:
            feedback = body_feedback + "\nHeaders: NOT PROVIDED (header fix needed)"
            total_score = 0.8 * body_score
        else:
            total_score = body_score
            feedback = body_feedback

        return round(total_score, 4), feedback

    def _grade_headers(self, action: APIDebugAction) -> Tuple[float, str]:
        """Grade header fix. Fully deterministic.

        The agent must provide corrected headers that match the spec. Also
        awards partial credit for identifying the error type.

        Scoring: 0.7 for correct headers + 0.3 for error type identification.

        Returns:
            (score in [0, 1], "; "-joined feedback string).
        """
        score = 0.0
        parts = []

        # Error type identification (0.3 weight)
        gt_types = {gt["error_type"] for gt in self.ground_truths}
        if action.error_type and action.error_type in gt_types:
            score += 0.3
            parts.append("error_type: CORRECT")
        else:
            given = action.error_type or "(none)"
            parts.append(f"error_type: INCORRECT (you said '{given}')")

        # Header fix validation (0.7 weight)
        if action.fixed_headers:
            header_score, header_feedback = validate_headers_against_spec(
                action.fixed_headers, self.spec
            )
            score += 0.7 * header_score
            parts.append(header_feedback)
        else:
            parts.append("Headers: NOT PROVIDED (header fix needed)")

        return round(score, 4), "; ".join(parts)

    def _grade_response(self, action: APIDebugAction) -> Tuple[float, str]:
        """Grade response validation. Fully deterministic.

        Agent must identify issue types and, for wrong_status_code, provide
        the correct status code.

        Scoring: 0.5 for issue type identification (Jaccard) + 0.3 for
        affected field identification (Jaccard) + 0.2 for correct status code
        (if applicable). When no status-code issue exists, the 0.2 weight is
        redistributed proportionally to issue-type recall.

        Returns:
            (score in [0, 1], "; "-joined feedback string).
        """
        score = 0.0
        parts = []

        gt_issue_types = {gt["issue_type"] for gt in self.ground_truths}
        # Drop the empty string so issues without an affected_field don't count.
        gt_fields = {gt.get("affected_field", "") for gt in self.ground_truths} - {""}

        # Issue type identification (0.5 weight)
        predicted_issues = set(action.response_issues or [])
        if predicted_issues and gt_issue_types:
            intersection = predicted_issues & gt_issue_types
            union = predicted_issues | gt_issue_types
            jaccard = len(intersection) / len(union) if union else 0.0
            score += 0.5 * jaccard
            parts.append(f"Issue types: {len(intersection)}/{len(gt_issue_types)} correct (Jaccard={jaccard:.2f})")
        else:
            parts.append("Issue types: NOT PROVIDED" if not predicted_issues else "Issue types: NONE CORRECT")

        # Affected field identification via error_type or affected_fields (0.3 weight)
        predicted_fields = set(action.affected_fields or [])
        if predicted_fields and gt_fields:
            intersection = predicted_fields & gt_fields
            union = predicted_fields | gt_fields
            jaccard = len(intersection) / len(union) if union else 0.0
            score += 0.3 * jaccard
            parts.append(f"Affected fields: {len(intersection)}/{len(gt_fields)} correct")
        else:
            parts.append("Affected fields: NOT PROVIDED" if not predicted_fields else "Affected fields: NONE CORRECT")

        # Status code check (0.2 weight) -- only if wrong_status_code is a ground truth
        has_status_issue = any(gt["issue_type"] == "wrong_status_code" for gt in self.ground_truths)
        if has_status_issue:
            correct_status = None
            for gt in self.ground_truths:
                if gt["issue_type"] == "wrong_status_code":
                    correct_status = int(gt.get("correct_value", 0))
                    break
            if action.expected_status_code and action.expected_status_code == correct_status:
                score += 0.2
                parts.append(f"Status code: CORRECT ({correct_status})")
            else:
                given = action.expected_status_code or "(none)"
                parts.append(f"Status code: INCORRECT (you said {given}, expected {correct_status})")
        else:
            # No status code issue -- redistribute 0.2 to issue types
            score += 0.2 * (len(predicted_issues & gt_issue_types) / len(gt_issue_types) if gt_issue_types else 0.0)
            parts.append("Status code: N/A (no status code issue)")

        return round(score, 4), "; ".join(parts)

    def _grade_hard(self, action: APIDebugAction) -> Tuple[float, str]:
        """Grade fix + explanation. 70% deterministic fix, 30% explanation.

        The explanation is scored by LLM-as-judge if available, with a
        heuristic fallback if the LLM is not reachable.

        Returns:
            (score in [0, 1], multi-line feedback string).
        """
        # Deterministic fix scoring (same as medium)
        fix_score, fix_feedback = self._grade_medium(action)

        # Explanation scoring — explanations of <= 10 chars score 0.
        explain_score = 0.0
        explain_feedback = "No explanation provided."
        if action.explanation and len(action.explanation.strip()) > 10:
            explain_score = self._score_explanation(action.explanation)
            explain_feedback = f"Explanation quality: {explain_score:.2f}/1.0"

        total = 0.7 * fix_score + 0.3 * explain_score
        feedback = (
            f"Fix score: {fix_score:.2f} (70% weight)\n"
            f"{fix_feedback}\n"
            f"{explain_feedback}"
        )
        return round(total, 4), feedback

    def _score_explanation(self, explanation: str) -> float:
        """Score an explanation using LLM-as-judge with heuristic fallback.

        Tries to call the LLM via the HF router. If that fails for any reason,
        falls back to a keyword + length heuristic.
        """
        # Try LLM-as-judge first; any exception (network, auth, parsing)
        # silently drops through to the deterministic heuristic.
        try:
            llm_score = self._llm_judge_explanation(explanation)
            if llm_score is not None:
                return llm_score
        except Exception:
            pass
        # Heuristic fallback
        return self._heuristic_score_explanation(explanation)

    def _llm_judge_explanation(self, explanation: str) -> Optional[float]:
        """Call LLM to score the explanation. Returns None if unavailable.

        Uses dedicated judge model (JUDGE_MODEL / JUDGE_API_BASE /
        JUDGE_API_KEY) to avoid the agent grading itself. Falls back to the
        agent's model env vars if judge-specific vars are not set.

        Returns:
            Score clamped to [0.0, 1.0], or None when env vars are missing or
            the judge's reply cannot be parsed as {"score": <float>}.
        """
        api_base = os.getenv("JUDGE_API_BASE") or os.getenv("API_BASE_URL")
        api_key = os.getenv("JUDGE_API_KEY") or os.getenv("OPENAI_API_KEY") or os.getenv("HF_TOKEN")
        model = os.getenv("JUDGE_MODEL") or os.getenv("MODEL_NAME")
        if not all([api_base, api_key, model]):
            return None

        # Imported lazily so the environment works without the openai package
        # when no judge is configured.
        from openai import OpenAI

        client = OpenAI(base_url=api_base, api_key=api_key)

        # Include error type + affected fields so the judge evaluates against actual errors
        gt_summary = [
            {"error_type": gt["error_type"], "affected_fields": gt.get("affected_fields", [])}
            for gt in self.ground_truths
        ]
        prompt = (
            "You are grading an AI agent's explanation for debugging a broken API request.\n\n"
            f"API: {self.spec['api_name']} {self.spec['http_method']} {self.spec['endpoint']}\n"
            f"Actual errors present:\n{json.dumps(gt_summary, indent=2)}\n\n"
            f"Agent's explanation:\n{explanation}\n\n"
            "Score 0.0 to 1.0:\n"
            "- Root cause: correctly names the error type and affected fields (0 to 0.4)\n"
            "- Fix guidance: explains the correct remediation (0 to 0.3)\n"
            "- Developer clarity: actionable and clear for a developer (0 to 0.3)\n\n"
            'Return ONLY a JSON object: {"score": 0.0}'
        )

        # timeout=10 prevents blocking step() if the judge LLM is slow
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=50,
            temperature=0.0,
            timeout=10,
        )
        text = response.choices[0].message.content or ""

        # Parse score from response - protected so any bad response falls back to heuristic
        try:
            result = json.loads(text)
            raw_score = float(result["score"])
        except (json.JSONDecodeError, KeyError, TypeError, ValueError):
            return None
        return max(0.0, min(1.0, raw_score))

    def _heuristic_score_explanation(self, explanation: str) -> float:
        """Simple heuristic scoring based on length and keyword presence.

        This is the fallback when LLM-as-judge is not available. Not perfect,
        but ensures non-zero scores for reasonable explanations.

        Returns:
            0.5 * keyword score + 0.5 * length score, rounded to 2 decimals.
        """
        # Debugging-vocabulary keywords; 6+ hits saturate the keyword score.
        keywords = [
            "because", "should", "instead", "required", "missing", "type",
            "format", "expected", "invalid", "correct", "field", "header",
            "value", "fix", "error", "authorization", "authentication",
            "schema", "endpoint", "method", "body", "payload", "constraint",
        ]
        keyword_hits = sum(1 for k in keywords if k in explanation.lower())
        keyword_score = min(keyword_hits / 6.0, 1.0)

        # Length score: reward explanations between 50 and 500 chars
        length = len(explanation.strip())
        if length < 20:
            length_score = 0.1
        elif length < 50:
            length_score = 0.3
        elif length <= 500:
            length_score = 0.6
        else:
            length_score = 0.5  # Slightly penalize very long explanations

        return round(0.5 * keyword_score + 0.5 * length_score, 2)

    # =====================================================================
    # Helpers
    # =====================================================================

    def _build_spec_string(self) -> str:
        """Build a JSON string of the spec info the agent needs to see.

        Only header *names* are exposed — header values (e.g. auth tokens)
        stay hidden from the agent.
        """
        visible_spec = {
            "required_fields": self.spec["required_fields"],
            "optional_fields": self.spec.get("optional_fields", []),
            "field_types": self.spec["field_types"],
            "required_headers": list(self.spec.get("required_headers", {}).keys()),
        }
        return json.dumps(visible_spec, indent=2)

    def _make_observation(
        self,
        feedback: str,
        reward: float,
        done: bool,
    ) -> APIDebugObservation:
        """Build an observation with the current episode state.

        Args:
            feedback: Grader feedback for the agent's last attempt.
            reward: Reward to report (already decayed/clamped by step()).
            done: Whether the episode has ended.
        """
        if done and not feedback:
            msg = "Episode complete."
        elif done:
            msg = f"Episode complete. Final reward: {reward:.2f}"
        else:
            remaining = self.max_steps - self.current_step
            msg = f"{remaining} step(s) remaining. Use the feedback to improve."

        obs = APIDebugObservation(
            task=self.task,
            api_name=self.spec.get("api_name", ""),
            http_method=self.shown_http_method,
            endpoint=self.spec.get("endpoint", ""),
            broken_request=json.dumps(self.broken_request, indent=2),
            broken_headers=self.broken_headers,
            api_spec=self._build_spec_string(),
            error_count=len(self.ground_truths),
            step_number=self.current_step,
            max_steps=self.max_steps,
            feedback=feedback,
            message=msg,
            done=done,
            reward=reward,
        )
        # Include response data for response task
        if self.task == "response":
            obs.response_body = json.dumps(self.response_body, indent=2)
            obs.response_status_code = self.response_status_code
        return obs