# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
API Integration Debugging Environment implementation.

A real-world environment where an AI agent diagnoses and fixes broken
API integrations by reading error logs, inspecting configurations,
and submitting corrected configurations.

Key design features:
- Dynamic state: fixing issues changes service health and produces new logs
- Cascading failures: upstream fixes reveal downstream issues
- Multi-dimensional rubric grading (diagnosis, fix, efficiency, strategy)
- Rich reward signal with partial credit and diminishing returns
"""

import copy
from typing import Any, Dict, List, Optional, Set, Tuple
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

try:
    from ..models import ApiDebugAction, ApiDebugObservation
    from ..scenarios import Issue, Scenario, get_all_task_ids, get_scenario
except ImportError:
    from models import ApiDebugAction, ApiDebugObservation
    from scenarios import Issue, Scenario, get_all_task_ids, get_scenario


class ApiDebugEnvironment(Environment):
    """
    API Integration Debugging Environment.

    An agent must diagnose and fix broken API integrations by:
    1. Inspecting error logs to identify issues
    2. Inspecting service configurations
    3. Testing endpoints to observe failures
    4. Submitting configuration fixes

    Supports 3 difficulty levels (easy, medium, hard) with cascading
    failure dynamics and multi-dimensional grading.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, task_id: str = "easy"):
        """
        Initialize the environment.

        Args:
            task_id: One of 'easy', 'medium', 'hard'.
        """
        self._task_id = task_id
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._scenario: Optional[Scenario] = None
        self._current_configs: Dict[str, Dict[str, Any]] = {}
        self._issues_found: Set[str] = set()
        self._issues_fixed: Set[str] = set()
        self._inspected_targets: Set[str] = set()
        self._done = False
        self._last_action_result = ""
        self._cumulative_reward = 0.0
        # Dynamic state tracking
        self._service_health: Dict[str, str] = {}
        self._dynamic_log_buffer: Dict[str, List[str]] = {}
        # Strategy tracking for grading
        self._action_history: List[Dict[str, Any]] = []
        # Services inspected before a fix was submitted (for strategy scoring)
        self._diagnosed_before_fix: Set[str] = set()

    def reset(self, task_id: Optional[str] = None, seed: Optional[int] = None) -> ApiDebugObservation:
        """
        Reset the environment, optionally with a new task.

        Args:
            task_id: Override the task difficulty. One of 'easy', 'medium', 'hard'.
            seed: Optional seed for reproducible randomized scenarios.

        Returns:
            Initial observation with task description and available targets.
        """
        if task_id is not None:
            self._task_id = task_id
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._scenario = get_scenario(self._task_id, seed=seed)
        self._current_configs = copy.deepcopy(self._scenario.configs)
        self._issues_found = set()
        self._issues_fixed = set()
        self._inspected_targets = set()
        self._done = False
        self._last_action_result = ""
        self._cumulative_reward = 0.0
        self._action_history = []
        self._diagnosed_before_fix = set()

        # Initialize service health from the scenario graph
        self._service_health = {}
        for svc_name, node in self._scenario.service_graph.items():
            self._service_health[svc_name] = node.health_status
        # Fill in any services not in the graph
        for svc in self._scenario.services:
            if svc not in self._service_health:
                self._service_health[svc] = "unknown"

        # Initialize the dynamic log buffer
        self._dynamic_log_buffer = {svc: [] for svc in self._scenario.services}

        # Build the dependency graph for the observation
        dep_graph = {}
        for svc_name, node in self._scenario.service_graph.items():
            dep_graph[svc_name] = node.depends_on

        return ApiDebugObservation(
            task_id=self._task_id,
            task_description=self._scenario.description,
            logs=[],
            config_snapshot={},
            api_response=None,
            hints=self._get_hints(),
            remaining_steps=self._scenario.max_steps,
            issues_found=0,
            issues_fixed=0,
            issues_total=len(self._scenario.issues),
            action_result="Environment reset. Use 'inspect_logs' or 'inspect_config' to start debugging.",
            available_targets=self._scenario.services,
            done=False,
            reward=0.0,
            service_status=dict(self._service_health),
            dependency_graph=dep_graph,
            error_trace=self._build_error_trace(),
        )

    def step(self, action: ApiDebugAction) -> ApiDebugObservation:  # type: ignore[override]
        """
        Execute one debugging step.

        Args:
            action: ApiDebugAction with action_type, target, and optional fix_payload.

        Returns:
            ApiDebugObservation with the results of the action.
        """
        if self._scenario is None:
            # Auto-reset if not initialized
            self.reset()
        assert self._scenario is not None  # for the type checker

        self._state.step_count += 1
        reward = -0.01  # Small step cost to encourage efficiency
        logs: List[str] = []
        config_snapshot: Dict[str, Any] = {}
        api_response: Optional[Dict[str, Any]] = None

        # Record the action for strategy scoring
        self._action_history.append({
            "step": self._state.step_count,
            "action_type": action.action_type,
            "target": action.target,
        })

        # Validate the target
        if action.target not in self._scenario.services:
            self._last_action_result = (
                f"Invalid target '{action.target}'. "
                f"Valid targets: {self._scenario.services}"
            )
            reward = -0.05
        elif action.action_type == "inspect_logs":
            logs, reward = self._handle_inspect_logs(action.target)
        elif action.action_type == "inspect_config":
            config_snapshot, reward = self._handle_inspect_config(action.target)
        elif action.action_type == "inspect_endpoint":
            api_response, reward = self._handle_inspect_endpoint(action.target)
        elif action.action_type == "submit_fix":
            reward = self._handle_submit_fix(action.target, action.fix_payload or {})
        else:
            self._last_action_result = (
                f"Invalid action_type '{action.action_type}'. "
                "Valid types: inspect_logs, inspect_config, inspect_endpoint, submit_fix"
            )
            reward = -0.05

        self._cumulative_reward += reward

        # Check episode termination
        remaining = self._scenario.max_steps - self._state.step_count
        all_fixed = len(self._issues_fixed) == len(self._scenario.issues)
        if all_fixed and not self._done:  # guard: award the bonus only once
            self._done = True
            reward += 0.2  # completion bonus
            self._cumulative_reward += 0.2
            self._last_action_result += " 🎉 All issues fixed! Episode complete."
        if remaining <= 0 and not self._done:
            self._done = True
            self._last_action_result += " ⏰ Out of steps. Episode ended."

        # Build the dependency graph
        dep_graph = {}
        for svc_name, node in self._scenario.service_graph.items():
            dep_graph[svc_name] = node.depends_on

        return ApiDebugObservation(
            task_id=self._task_id,
            task_description=self._scenario.description,
            logs=logs,
            config_snapshot=config_snapshot,
            api_response=api_response,
            hints=self._get_hints(),
            remaining_steps=max(0, remaining),
            issues_found=len(self._issues_found),
            issues_fixed=len(self._issues_fixed),
            issues_total=len(self._scenario.issues),
            action_result=self._last_action_result,
            available_targets=self._scenario.services,
            done=self._done,
            reward=reward,
            service_status=dict(self._service_health),
            dependency_graph=dep_graph,
            error_trace=self._build_error_trace(),
            metadata={
                "cumulative_reward": self._cumulative_reward,
                "step": self._state.step_count,
                "issues_found_ids": list(self._issues_found),
                "issues_fixed_ids": list(self._issues_fixed),
            },
        )

    def state(self) -> State:
        """Get the current environment state."""
        return self._state

    # ─── Action Handlers ──────────────────────────────────────────────────
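
    # Reward shape used by the handlers below and by step():
    #   -0.01 per-step cost; -0.05 invalid target/action or fixing a clean
    #   service; -0.10 empty or rejected fix; +0.01..0.15 for inspections
    #   (more when logs reveal a new issue); +0.25 per correct fix (+0.05 if
    #   the service was inspected first); +0.03 per partially correct fix;
    #   +0.20 completion bonus.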

    def _handle_inspect_logs(self, target: str) -> Tuple[List[str], float]:
        """Return logs for a service and a reward for relevant inspection."""
        assert self._scenario is not None
        # Combine static logs with dynamic logs produced by fixes
        static_logs = self._scenario.logs.get(target, [])
        dynamic_logs = self._dynamic_log_buffer.get(target, [])
        logs = static_logs + dynamic_logs

        inspect_key = f"logs:{target}"
        is_repeat = inspect_key in self._inspected_targets
        self._inspected_targets.add(inspect_key)
        # Track that this service was inspected (for strategy scoring)
        self._diagnosed_before_fix.add(target)

        # Check whether any unfound issues have log hints in these logs
        found_new = False
        for issue in self._scenario.issues:
            if issue.issue_id not in self._issues_found:
                for log_line in logs:
                    if issue.log_hint in log_line:
                        self._issues_found.add(issue.issue_id)
                        found_new = True

        if found_new:
            reward = 0.15
            self._last_action_result = f"Inspected logs for '{target}'. Found relevant error patterns!"
        elif is_repeat and not dynamic_logs:
            reward = 0.0  # No reward for re-inspecting unchanged logs
            self._last_action_result = f"Re-inspected logs for '{target}'. No new information."
        elif is_repeat and dynamic_logs:
            reward = 0.05  # Some reward for checking updated logs
            self._last_action_result = f"Re-inspected logs for '{target}'. New entries found after recent fixes."
        elif logs:
            reward = 0.05
            self._last_action_result = f"Inspected logs for '{target}'. {len(logs)} log entries found."
        else:
            reward = 0.0
            self._last_action_result = f"No logs available for '{target}'."
        return logs, reward

    def _handle_inspect_config(self, target: str) -> Tuple[Dict[str, Any], float]:
        """Return the current config for a service."""
        assert self._scenario is not None
        config = self._current_configs.get(target, {})

        inspect_key = f"config:{target}"
        is_repeat = inspect_key in self._inspected_targets
        self._inspected_targets.add(inspect_key)
        # Track that this service was inspected (for strategy scoring)
        self._diagnosed_before_fix.add(target)

        # Reward based on relevance and novelty
        has_issues = any(
            i.service == target
            for i in self._scenario.issues
            if i.issue_id not in self._issues_fixed
        )
        if is_repeat:
            reward = 0.0  # No reward for re-inspecting the same config
            self._last_action_result = f"Re-inspected config for '{target}'. No changes since last check."
        elif has_issues:
            reward = 0.05
            self._last_action_result = f"Inspected config for '{target}'. Configuration retrieved."
        else:
            reward = 0.01
            self._last_action_result = f"Inspected config for '{target}'. No issues detected in this service."
        return config, reward

    def _handle_inspect_endpoint(self, target: str) -> Tuple[Dict[str, Any], float]:
        """Simulate testing an endpoint. The response changes with the current fix state."""
        assert self._scenario is not None
        # Track that this service was inspected
        self._diagnosed_before_fix.add(target)
        # Record the inspection so strategy scoring counts endpoint tests too
        self._inspected_targets.add(f"endpoint:{target}")

        # Find unfixed issues for this service
        unfixed = [
            i for i in self._scenario.issues
            if i.service == target and i.issue_id not in self._issues_fixed
        ]
        # Also check whether any DEPENDENCY issues are unfixed (cascade simulation)
        upstream_broken = False
        if target in self._scenario.service_graph:
            node = self._scenario.service_graph[target]
            for dep_svc in node.depends_on:
                dep_unfixed = [
                    i for i in self._scenario.issues
                    if i.service == dep_svc and i.issue_id not in self._issues_fixed
                ]
                if dep_unfixed:
                    upstream_broken = True

        if unfixed:
            issue = unfixed[0]
            # Map the issue category to an HTTP status code
            status_codes = {
                "authentication": 401,
                "protocol": 415,
                "networking": 504,
                "configuration": 500,
            }
            status_code = status_codes.get(issue.category, 500)
            api_response = {
                "status": "error",
                "status_code": status_code,
                "error": issue.description,
                "hint": f"Check the {issue.fix_key} configuration for '{target}'",
                "service_health": self._service_health.get(target, "unknown"),
            }
            reward = 0.05
            self._last_action_result = f"Tested endpoint on '{target}'. Got {status_code} error response."
        elif upstream_broken:
            api_response = {
                "status": "degraded",
                "status_code": 503,
                "error": f"{target} configuration is correct but upstream dependencies are failing.",
                "hint": "Fix upstream services first; check the dependency graph.",
                "service_health": "degraded",
            }
            reward = 0.03
            self._last_action_result = f"Tested '{target}'. Service config OK but upstream is broken."
        else:
            api_response = {
                "status": "success",
                "status_code": 200,
                "message": f"{target} is working correctly.",
                "service_health": "healthy",
            }
            reward = 0.02
            self._last_action_result = f"Tested endpoint on '{target}'. Service responding OK."
        return api_response, reward

    def _handle_submit_fix(self, target: str, fix_payload: Dict[str, Any]) -> float:
        """Process a fix submission with strict validation and cascade effects."""
        assert self._scenario is not None
        if not fix_payload:
            self._last_action_result = "Fix rejected: fix_payload cannot be empty."
            return -0.1

        # Find unfixed issues for this target service
        target_issues = [
            i for i in self._scenario.issues
            if i.service == target and i.issue_id not in self._issues_fixed
        ]
        if not target_issues:
            self._last_action_result = f"No unfixed issues found for '{target}'."
            return -0.05

        reward = 0.0
        fixed_any = False
        partial_credit = False
        # Check whether the agent inspected this service before submitting
        inspected_first = target in self._diagnosed_before_fix

        for issue in target_issues:
            match_result = self._check_fix(issue, fix_payload)
            if match_result == "exact":
                self._issues_fixed.add(issue.issue_id)
                self._issues_found.add(issue.issue_id)
                # Record which issue this action fixed (used by strategy scoring
                # to reconstruct the actual fix order)
                if self._action_history:
                    self._action_history[-1].setdefault("fixed_issues", []).append(issue.issue_id)
                self._apply_fix(target, fix_payload)
                self._update_service_health(issue)
                self._inject_dynamic_logs(issue)
                reward += 0.25
                fixed_any = True
                # Bonus for inspecting before fixing (strategy reward)
                if inspected_first:
                    reward += 0.05
            elif match_result == "partial":
                # Right key, close value: give partial credit
                partial_credit = True
                reward += 0.03

        if fixed_any:
            fixed_count = sum(1 for i in target_issues if i.issue_id in self._issues_fixed)
            self._last_action_result = (
                f"Fix accepted for '{target}'! "
                f"Fixed {fixed_count} issue(s). "
                f"Total fixed: {len(self._issues_fixed)}/{len(self._scenario.issues)}"
            )
        elif partial_credit:
            self._last_action_result = (
                f"Fix partially correct for '{target}'. "
                "The key is right but the value isn't quite right. Check the logs for exact values."
            )
        else:
            self._last_action_result = (
                f"Fix rejected for '{target}'. The payload doesn't address any known issues. "
                "Try inspecting logs and config to identify the correct fix."
            )
            reward = -0.1
        return reward
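
    # A fix payload can address a dotted fix key in several accepted shapes.
    # For a hypothetical issue with fix_key "headers.Authorization":
    #   {"headers.Authorization": "Bearer abc123"}   # flat dotted key
    #   {"Authorization": "Bearer abc123"}           # leaf key only
    # and, when the fix_key is the parent ("headers") with dotted expected_fix
    # entries, a nested dict: {"headers": {"Authorization": "Bearer abc123"}}.
    # See _check_fix for the exact matching rules.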

    # ─── Dynamic State Methods ────────────────────────────────────────────

    def _update_service_health(self, fixed_issue: Issue) -> None:
        """Update service health status after an issue is fixed."""
        assert self._scenario is not None
        # Check whether the fixed service has any remaining issues
        remaining = [
            i for i in self._scenario.issues
            if i.service == fixed_issue.service and i.issue_id not in self._issues_fixed
        ]
        if not remaining:
            self._service_health[fixed_issue.service] = "healthy"
        else:
            self._service_health[fixed_issue.service] = "degraded"

        # Update downstream services affected by the cascade
        for affected_svc, _effect in fixed_issue.cascade_effects.items():
            if affected_svc in self._service_health:
                # Check whether the affected service still has its own issues
                svc_issues = [
                    i for i in self._scenario.issues
                    if i.service == affected_svc and i.issue_id not in self._issues_fixed
                ]
                if not svc_issues:
                    # Check whether all upstream deps are healthy
                    if affected_svc in self._scenario.service_graph:
                        upstream_healthy = all(
                            self._service_health.get(dep, "error") == "healthy"
                            for dep in self._scenario.service_graph[affected_svc].depends_on
                        )
                        if upstream_healthy:
                            self._service_health[affected_svc] = "healthy"
                        else:
                            self._service_health[affected_svc] = "degraded"
                    else:
                        self._service_health[affected_svc] = "healthy"

    def _inject_dynamic_logs(self, fixed_issue: Issue) -> None:
        """Inject new log entries after an issue is fixed."""
        assert self._scenario is not None
        if fixed_issue.issue_id in self._scenario.dynamic_logs:
            for svc, new_logs in self._scenario.dynamic_logs[fixed_issue.issue_id].items():
                if svc in self._dynamic_log_buffer:
                    self._dynamic_log_buffer[svc].extend(new_logs)

    def _build_error_trace(self) -> List[str]:
        """Build an error propagation trace showing the cascade chain."""
        if self._scenario is None:
            return []
        trace = []
        for issue in self._scenario.issues:
            if issue.issue_id not in self._issues_fixed:
                trace.append(
                    f"[{issue.severity.upper()}] {issue.service}: {issue.description}"
                )
                for affected_svc, effect in issue.cascade_effects.items():
                    trace.append(f"  └─> {affected_svc}: {effect}")
        if not trace:
            trace.append("All issues resolved. No error cascades active.")
        return trace

    # ─── Helper Methods ───────────────────────────────────────────────────

    @staticmethod
    def _normalize_value(value: Any) -> Any:
        """Normalize a value for comparison (lowercase strings, sort lists, etc.)."""
        if isinstance(value, str):
            return value.strip().lower()
        if isinstance(value, list):
            return sorted([ApiDebugEnvironment._normalize_value(v) for v in value], key=str)
        if isinstance(value, dict):
            return {k: ApiDebugEnvironment._normalize_value(v) for k, v in value.items()}
        return value
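
    # Illustrative normalization results:
    #   _normalize_value("  GET ")     -> "get"
    #   _normalize_value(["b", "A"])   -> ["a", "b"]   (elements lowercased, sorted)
    #   _normalize_value({"K": " V "}) -> {"K": "v"}   (values normalized, keys kept)
    #   _normalize_value(42)           -> 42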

    def _values_match(self, expected: Any, submitted: Any) -> bool:
        """
        Check if a submitted value matches the expected value.

        Supports:
        - Exact match
        - Case-insensitive string match
        - Numeric tolerance (10%)
        - Boolean coercion (e.g., "true" -> True)
        - List containment (submitted must contain all expected elements)
        - Pattern match for token-like values (Bearer <anything> matches Bearer <token>)
        """
        # Normalize both values
        norm_expected = self._normalize_value(expected)
        norm_submitted = self._normalize_value(submitted)

        # Exact match after normalization
        if norm_expected == norm_submitted:
            return True

        # Numeric comparison with 10% tolerance
        if isinstance(expected, (int, float)) and isinstance(submitted, (int, float)):
            if expected == 0:
                return submitted == 0
            return abs(expected - submitted) / max(abs(expected), 1) < 0.10

        # Boolean coercion
        if isinstance(expected, bool):
            if isinstance(submitted, str):
                truthy = submitted.lower() in ("true", "1", "yes")
                falsy = submitted.lower() in ("false", "0", "no")
                return truthy if expected else falsy
            return bool(submitted) == expected

        # String pattern match for tokens: "Bearer <token>" matches "Bearer <anything>"
        if isinstance(expected, str) and isinstance(submitted, str):
            exp_lower = expected.strip().lower()
            sub_lower = submitted.strip().lower()
            # If expected has a placeholder like <token>, accept any non-empty value
            if "<" in exp_lower and ">" in exp_lower:
                prefix = exp_lower.split("<")[0].strip()
                if prefix and sub_lower.startswith(prefix) and len(sub_lower) > len(prefix):
                    return True
            # If both share the same "Bearer " prefix structure
            if exp_lower.startswith("bearer ") and sub_lower.startswith("bearer "):
                return len(sub_lower) > len("bearer ")

        # Lists: submitted must contain a match for every expected element
        if isinstance(expected, list) and isinstance(submitted, list):
            return all(any(self._values_match(e, s) for s in submitted) for e in expected)

        return False
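
    # Illustrative matches under the rules above:
    #   _values_match("POST", " post ")               -> True   (normalization)
    #   _values_match(100, 95)                        -> True   (within 10%)
    #   _values_match(100, 85)                        -> False  (15% off)
    #   _values_match(True, "yes")                    -> True   (boolean coercion)
    #   _values_match("Bearer <token>", "Bearer xyz") -> True   (placeholder)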

    def _values_close(self, expected: Any, submitted: Any) -> bool:
        """Check if values are 'close' for partial credit (same type, right ballpark)."""
        # Check bools first: isinstance(True, int) is True in Python, so the
        # numeric branch below would otherwise swallow bool/bool pairs.
        if isinstance(expected, bool) and isinstance(submitted, bool):
            return True  # At least the right type
        if isinstance(expected, (int, float)) and isinstance(submitted, (int, float)):
            if expected == 0:
                return abs(submitted) < 5
            return abs(expected - submitted) / max(abs(expected), 1) < 0.50
        if isinstance(expected, str) and isinstance(submitted, str):
            # Same prefix / similar structure
            return expected.split("/")[0].lower() == submitted.split("/")[0].lower()
        return False

    def _check_fix(self, issue: Issue, fix_payload: Dict[str, Any]) -> str:
        """
        Check if a fix payload correctly addresses an issue.

        Returns:
            'exact' if the fix is correct
            'partial' if the fix has the right key but a wrong (close) value
            'none' if the fix doesn't match at all
        """
        found_key = False

        # Direct key match with value validation
        if issue.fix_key in fix_payload:
            found_key = True
            expected_val = issue.expected_fix.get(issue.fix_key)
            if expected_val is not None:
                if self._values_match(expected_val, fix_payload[issue.fix_key]):
                    return "exact"
                elif self._values_close(expected_val, fix_payload[issue.fix_key]):
                    return "partial"
                return "none"  # Right key, wrong value
            # If the submitted value is a dict and expected_fix has nested keys
            submitted_val = fix_payload[issue.fix_key]
            if isinstance(submitted_val, dict):
                nested_prefix = issue.fix_key + "."
                nested_expected = {
                    k[len(nested_prefix):]: v
                    for k, v in issue.expected_fix.items()
                    if k.startswith(nested_prefix)
                }
                if nested_expected:
                    all_match = all(
                        k in submitted_val and self._values_match(v, submitted_val[k])
                        for k, v in nested_expected.items()
                    )
                    if all_match:
                        return "exact"
                    # Check for a partial match
                    any_match = any(
                        k in submitted_val and self._values_match(v, submitted_val[k])
                        for k, v in nested_expected.items()
                    )
                    if any_match:
                        return "partial"
                    return "none"
            # No expected value found; this shouldn't happen with well-defined
            # issues. Do NOT accept blindly: require value validation.
            return "none"

        # Nested key (e.g., "headers.Authorization"): check the payload for the leaf key
        if "." in issue.fix_key:
            parts = issue.fix_key.split(".")
            leaf_key = parts[-1]
            if leaf_key in fix_payload:
                found_key = True
                expected_val = issue.expected_fix.get(issue.fix_key)
                if expected_val is not None:
                    if self._values_match(expected_val, fix_payload[leaf_key]):
                        return "exact"
                    elif self._values_close(expected_val, fix_payload[leaf_key]):
                        return "partial"
                    return "none"
                return "none"

        # Check the expected fix keys with value validation
        for key, expected_val in issue.expected_fix.items():
            # Direct key in the payload
            if key in fix_payload:
                found_key = True
                if self._values_match(expected_val, fix_payload[key]):
                    return "exact"
            # Nested key leaf match
            if "." in key:
                leaf = key.split(".")[-1]
                if leaf in fix_payload:
                    found_key = True
                    if self._values_match(expected_val, fix_payload[leaf]):
                        return "exact"

        if found_key:
            return "partial"  # Found the key but the value didn't match
        return "none"

    def _apply_fix(self, target: str, fix_payload: Dict[str, Any]) -> None:
        """Apply a fix to the current configuration."""
        if target not in self._current_configs:
            return
        config = self._current_configs[target]
        for key, value in fix_payload.items():
            if "." in key:
                # Nested key: e.g., "headers.Authorization"
                parts = key.split(".")
                obj = config
                for part in parts[:-1]:
                    if part not in obj:
                        obj[part] = {}
                    obj = obj[part]
                obj[parts[-1]] = value
            else:
                config[key] = value
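
    # Example: with config == {}, applying {"headers.Authorization": "Bearer x"}
    # yields {"headers": {"Authorization": "Bearer x"}}; intermediate dicts are
    # created as needed for dotted keys.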

    def _get_hints(self) -> List[str]:
        """Return progressive hints based on step count and progress."""
        if self._scenario is None:
            return []
        hints = []
        step = self._state.step_count
        total_issues = len(self._scenario.issues)
        unfixed = total_issues - len(self._issues_fixed)

        if step == 0:
            hints.append("Start by inspecting error logs for each service to find clues.")
            hints.append(f"There are {total_issues} issues to find and fix.")
            if self._scenario.context:
                hints.append(f"Context: {self._scenario.context}")
        elif step > 0 and len(self._issues_found) == 0:
            hints.append("Try 'inspect_logs' on different services to find error patterns.")
        elif len(self._issues_found) > 0 and len(self._issues_fixed) == 0:
            hints.append("You've found issues! Use 'inspect_config' to see current settings, then 'submit_fix'.")
        elif unfixed > 0:
            hints.append(f"{unfixed} issue(s) remaining. Check services you haven't inspected yet.")

        # Dependency hints
        for issue in self._scenario.issues:
            if issue.issue_id not in self._issues_fixed and issue.depends_on:
                deps_met = all(d in self._issues_fixed for d in issue.depends_on)
                if not deps_met:
                    dep_names = [
                        next((i.service for i in self._scenario.issues if i.issue_id == d), d)
                        for d in issue.depends_on
                    ]
                    if len(self._issues_fixed) > 0:
                        hints.append(
                            f"Some issues may be masked by upstream failures. "
                            f"Check services: {', '.join(set(dep_names))}"
                        )
                    break

        # Late-game hints
        if self._scenario.max_steps - step <= 5 and unfixed > 0:
            for issue in self._scenario.issues:
                if issue.issue_id not in self._issues_fixed:
                    hints.append(
                        f"Hint: check '{issue.service}' and look for '{issue.fix_key}' in the config."
                    )
        return hints

    # ─── Multi-Dimensional Grading ────────────────────────────────────────

    def grade(self) -> float:
        """
        Grade the agent's performance using a multi-dimensional rubric.

        Score = fix_score × 0.40        (issues fixed / total)
              + diagnosis_score × 0.20  (did the agent inspect before fixing?)
              + efficiency_score × 0.15 (steps used vs. available)
              + strategy_score × 0.25   (logical debugging approach)

        Returns:
            Score clamped to [0.001, 0.999], so it is never exactly 0.0 or 1.0.
        """
        if self._scenario is None:
            return 0.001
        total = len(self._scenario.issues)
        if total == 0:
            return 0.999

        # 1. Fix score (40% weight): the most important dimension
        fix_score = len(self._issues_fixed) / total

        # 2. Diagnosis score (20% weight): did the agent inspect before fixing?
        if self._issues_fixed:
            diagnosed_count = sum(
                1 for issue_id in self._issues_fixed
                if any(
                    i.service in self._diagnosed_before_fix
                    for i in self._scenario.issues
                    if i.issue_id == issue_id
                )
            )
            diagnosis_score = diagnosed_count / len(self._issues_fixed)
        else:
            # Give partial credit for exploration even without fixes
            diagnosis_score = min(1.0, len(self._inspected_targets) / max(1, len(self._scenario.services)))

        # 3. Efficiency score (15% weight): faster is better
        remaining = max(0, self._scenario.max_steps - self._state.step_count)
        efficiency_score = remaining / self._scenario.max_steps

        # 4. Strategy score (25% weight): logical debugging approach
        strategy_score = self._compute_strategy_score()

        # Weighted combination
        score = (
            fix_score * 0.40 +
            diagnosis_score * 0.20 +
            efficiency_score * 0.15 +
            strategy_score * 0.25
        )
        # Clamp strictly inside (0, 1): never exactly 0.0 or 1.0
        return max(0.001, min(0.999, round(score, 4)))
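
    # Worked example: a fully fixed 4-issue scenario, every fix preceded by an
    # inspection, half the step budget left, and a strategy score of 0.75:
    #   score = 1.0*0.40 + 1.0*0.20 + 0.5*0.15 + 0.75*0.25
    #         = 0.40 + 0.20 + 0.075 + 0.1875 = 0.8625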

    def _compute_strategy_score(self) -> float:
        """
        Score the agent's debugging strategy.

        Good strategy:
        - Inspect logs before configs (logs have more diagnostic info)
        - Don't repeat the same inspection
        - Fix issues in dependency order
        - Don't submit fixes without inspecting first
        """
        if not self._action_history:
            return 0.0
        score = 0.0
        total_checks = 0

        # Check 1: did the agent inspect before submitting any fix?
        first_fix_step = None
        first_inspect_step = None
        for action in self._action_history:
            if action["action_type"] == "submit_fix" and first_fix_step is None:
                first_fix_step = action["step"]
            if action["action_type"] in ("inspect_logs", "inspect_config") and first_inspect_step is None:
                first_inspect_step = action["step"]
        total_checks += 1
        if first_inspect_step is not None and (first_fix_step is None or first_inspect_step < first_fix_step):
            score += 1.0  # Inspected before fixing

        # Check 2: ratio of unique inspections to total inspections
        total_inspections = sum(
            1 for a in self._action_history
            if a["action_type"] in ("inspect_logs", "inspect_config", "inspect_endpoint")
        )
        unique_inspections = len(self._inspected_targets)
        total_checks += 1
        if total_inspections > 0:
            score += min(1.0, unique_inspections / total_inspections)

        # Check 3: did fixes follow the dependency order?
        if self._scenario and self._scenario.optimal_fix_order and len(self._issues_fixed) > 1:
            total_checks += 1
            # Reconstruct the actual fix order from the issue ids recorded on
            # each submit_fix action (see _handle_submit_fix)
            fix_order = []
            for action in self._action_history:
                for issue_id in action.get("fixed_issues", []):
                    if issue_id not in fix_order:
                        fix_order.append(issue_id)
            # Compare the fix order with the optimal order
            optimal = [o for o in self._scenario.optimal_fix_order if o in fix_order]
            if len(optimal) > 1:
                in_order = sum(
                    1 for i in range(len(fix_order) - 1)
                    if fix_order[i] in optimal and fix_order[i + 1] in optimal
                    and optimal.index(fix_order[i]) < optimal.index(fix_order[i + 1])
                )
                score += in_order / max(1, len(fix_order) - 1)

        # Check 4: did the agent use a variety of action types?
        total_checks += 1
        action_types_used = set(a["action_type"] for a in self._action_history)
        score += len(action_types_used) / 4.0  # 4 possible action types

        return score / total_checks if total_checks > 0 else 0.0

    def get_task_info(self) -> Dict[str, Any]:
        """Return information about the current task."""
        if self._scenario is None:
            return {"error": "Environment not initialized. Call reset() first."}
        return {
            "task_id": self._task_id,
            "difficulty": self._scenario.difficulty,
            "description": self._scenario.description,
            "max_steps": self._scenario.max_steps,
            "issues_total": len(self._scenario.issues),
            "services": self._scenario.services,
            "service_dependencies": {
                svc: node.depends_on
                for svc, node in self._scenario.service_graph.items()
            },
            "context": self._scenario.context,
            "action_schema": {
                "action_type": {
                    "type": "string",
                    "enum": ["inspect_logs", "inspect_config", "inspect_endpoint", "submit_fix"],
                    "description": "The type of debugging action to take",
                },
                "target": {
                    "type": "string",
                    "enum": self._scenario.services,
                    "description": "The service to act on",
                },
                "fix_payload": {
                    "type": "object",
                    "description": "Configuration fix (required for the submit_fix action)",
                    "required": False,
                },
            },
        }
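

# Minimal smoke-test sketch. It assumes ApiDebugAction accepts these keyword
# arguments, matching the fields read in step(); adjust if the real model in
# models.py differs.
if __name__ == "__main__":
    env = ApiDebugEnvironment(task_id="easy")
    obs = env.reset()
    print(obs.task_description)
    # Inspect the logs of every service, then report the rubric grade.
    for target in obs.available_targets:
        obs = env.step(ApiDebugAction(action_type="inspect_logs", target=target))
        print(obs.action_result)
    print(f"Issues found: {obs.issues_found}/{obs.issues_total}")
    print(f"Grade: {env.grade():.4f}")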