Spaces:
Running
Running
| """ | |
| 15 error injection functions for the API Debug Environment. | |
| Each injector takes a valid request + headers + spec + RNG and returns: | |
| (broken_request, broken_headers, ground_truth) | |
| ground_truth contains the error_type, affected_fields, and the original | |
| valid request/headers so the grader knows the correct answer. | |
| """ | |
| import copy | |
| import random as random_module | |
| from typing import Any, Dict, List, Tuple | |
# A ground-truth record is a plain dict; injectors may attach extra keys to it.
GroundTruth = Dict[str, Any]
# Every injector returns (request body, headers, ground truth).
InjectorResult = Tuple[Dict[str, Any], Dict[str, str], GroundTruth]


def _ground_truth(
    error_type: str,
    affected_fields: List[str],
    valid_request: Dict[str, Any],
    valid_headers: Dict[str, str],
) -> GroundTruth:
    """Assemble the ground-truth record shared by all injectors.

    Args:
        error_type: Identifier of the injected error.
        affected_fields: Names of the fields/headers the error touches.
        valid_request: The request the injector was given.
        valid_headers: The headers the injector was given.

    Returns:
        A new dict with the keys ``error_type``, ``affected_fields``,
        ``valid_request`` and ``valid_headers``. The arguments are stored
        by reference, not copied.
    """
    return dict(
        error_type=error_type,
        affected_fields=affected_fields,
        valid_request=valid_request,
        valid_headers=valid_headers,
    )
| # ========================================================================= | |
| # 1. missing_required_field | |
| # ========================================================================= | |
def inject_missing_required_field(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Drop one randomly chosen required field from a copy of the request.

    Only fields listed in ``spec["required_fields"]`` that are actually
    present in the request are eligible. When none are eligible the copy is
    returned unchanged and the ground truth lists no affected fields.

    Returns:
        ``(broken_request, headers, ground_truth)``; ``headers`` is passed
        through untouched.
    """
    mutated = copy.deepcopy(request)
    present = [name for name in spec["required_fields"] if name in mutated]

    affected: List[str] = []
    if present:
        victim = rng.choice(present)
        mutated.pop(victim)
        affected.append(victim)

    truth = _ground_truth("missing_required_field", affected, request, headers)
    return mutated, headers, truth
| # ========================================================================= | |
| # 2. wrong_field_type | |
| # ========================================================================= | |
def inject_wrong_field_type(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Replace one required field's value with a value of the wrong type.

    A field is picked at random from the entries of
    ``spec["required_fields"]`` that are present in the request. The
    replacement depends on the original value's type:

    * bool        -> the string ``"true"``
    * int / float -> ``str(value)``
    * str         -> the integer ``12345``
    * list        -> ``"should_be_array"``
    * dict        -> ``"should_be_object"``
    * other       -> ``"wrong_type"``

    When no required field is present the request copy is returned
    unchanged and the ground truth lists no affected fields.

    Returns:
        ``(broken_request, headers, ground_truth)``; ``headers`` is passed
        through untouched.
    """
    broken = copy.deepcopy(request)
    candidates = [f for f in spec["required_fields"] if f in broken]
    if not candidates:
        return broken, headers, _ground_truth(
            "wrong_field_type", [], request, headers
        )

    field = rng.choice(candidates)
    original = broken[field]

    # bool must be checked before int: bool is a subclass of int, so testing
    # int first would swallow booleans and make the bool branch unreachable.
    if isinstance(original, bool):
        broken[field] = "true"
    elif isinstance(original, (int, float)):
        broken[field] = str(original)
    elif isinstance(original, str):
        broken[field] = 12345
    elif isinstance(original, list):
        broken[field] = "should_be_array"
    elif isinstance(original, dict):
        broken[field] = "should_be_object"
    else:
        broken[field] = "wrong_type"

    return broken, headers, _ground_truth(
        "wrong_field_type", [field], request, headers
    )
| # ========================================================================= | |
| # 3. invalid_email_format | |
| # ========================================================================= | |
def inject_invalid_email_format(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Overwrite one email-typed field with a syntactically invalid address.

    Eligible fields are those whose entry in ``spec["field_types"]`` equals
    ``"email"`` and that are present in the request. If there are none, the
    call is delegated to ``inject_missing_required_field`` and that
    injector's result (including its error type) is returned instead.

    Returns:
        ``(broken_request, headers, ground_truth)``; ``headers`` is passed
        through untouched.
    """
    mutated = copy.deepcopy(request)
    targets = [
        name
        for name, kind in spec["field_types"].items()
        if kind == "email" and name in mutated
    ]
    if not targets:
        # No email field to corrupt: fall back to removing a required field.
        return inject_missing_required_field(request, headers, spec, rng)

    chosen = rng.choice(targets)
    invalid_addresses = [
        "not-an-email",
        "user@",
        "@domain.com",
        "user@.com",
        "user space@example.com",
    ]
    mutated[chosen] = rng.choice(invalid_addresses)

    truth = _ground_truth("invalid_email_format", [chosen], request, headers)
    return mutated, headers, truth
| # ========================================================================= | |
| # 4. missing_auth_header | |
| # ========================================================================= | |
def inject_missing_auth_header(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Remove the ``Authorization`` header from a copy of the headers.

    If there is no ``Authorization`` header, ``Content-Type`` is removed
    instead; if neither exists nothing is removed and the ground truth
    lists no affected fields. The error type is ``"missing_auth_header"``
    in all three cases. ``spec`` and ``rng`` are not used.

    Returns:
        ``(request, broken_headers, ground_truth)``; ``request`` is passed
        through untouched.
    """
    stripped = copy.deepcopy(headers)

    affected: List[str] = []
    # Preference order: the auth header first, Content-Type as a stand-in.
    for header_name in ("Authorization", "Content-Type"):
        if header_name in stripped:
            del stripped[header_name]
            affected.append(header_name)
            break

    truth = _ground_truth("missing_auth_header", affected, request, headers)
    return request, stripped, truth
| # ========================================================================= | |
| # 5. extra_unknown_field | |
| # ========================================================================= | |
def inject_extra_unknown_field(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Add one field from a fixed list of bogus fields to a request copy.

    The (name, value) pair is drawn at random from a hard-coded list; an
    existing key with the same name would be overwritten. ``spec`` is not
    consulted.

    Returns:
        ``(broken_request, headers, ground_truth)``; ``headers`` is passed
        through untouched.
    """
    bogus_pairs = [
        ("unknown_field", "unexpected_value"),
        ("debug_mode", True),
        ("internal_id", 99999),
        ("_private", "should_not_exist"),
        ("extra_data", {"nested": "bad"}),
    ]
    mutated = copy.deepcopy(request)

    picked = rng.choice(bogus_pairs)
    extra_name = picked[0]
    mutated[extra_name] = picked[1]

    truth = _ground_truth("extra_unknown_field", [extra_name], request, headers)
    return mutated, headers, truth
| # ========================================================================= | |
| # 6. null_value_in_required | |
| # ========================================================================= | |
def inject_null_value_in_required(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Set one randomly chosen required field to ``None`` in a request copy.

    Only fields from ``spec["required_fields"]`` that are present in the
    request are eligible. With no eligible field the copy is returned
    unchanged and the ground truth lists no affected fields.

    Returns:
        ``(broken_request, headers, ground_truth)``; ``headers`` is passed
        through untouched.
    """
    mutated = copy.deepcopy(request)
    present = [name for name in spec["required_fields"] if name in mutated]

    affected: List[str] = []
    if present:
        target = rng.choice(present)
        mutated[target] = None
        affected.append(target)

    truth = _ground_truth("null_value_in_required", affected, request, headers)
    return mutated, headers, truth
| # ========================================================================= | |
| # 7. wrong_http_method | |
| # ========================================================================= | |
def inject_wrong_http_method(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Record that the request was sent with the wrong HTTP method.

    Neither the request nor the headers are modified. The error lives only
    in the ground truth, which gains ``wrong_method`` (a random method other
    than ``spec["http_method"]``) and ``correct_method``.

    Returns:
        ``(request, headers, ground_truth)`` with both inputs passed through
        untouched.
    """
    known_methods = ("GET", "POST", "PUT", "PATCH", "DELETE")
    expected = spec["http_method"]
    alternatives = [method for method in known_methods if method != expected]
    picked = rng.choice(alternatives)

    truth = _ground_truth("wrong_http_method", ["http_method"], request, headers)
    truth.update(wrong_method=picked, correct_method=expected)
    return request, headers, truth
| # ========================================================================= | |
| # 8. malformed_json_value | |
| # ========================================================================= | |
def inject_malformed_json_value(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Overwrite a required field with a string resembling broken JSON.

    The request is an already-parsed dict, so malformed JSON is simulated
    by storing a fragment-like string (for example ``"{broken"``) as the
    field's value. With no required field present, the request copy is
    returned unchanged and the ground truth lists no affected fields.

    Returns:
        ``(broken_request, headers, ground_truth)``; ``headers`` is passed
        through untouched.
    """
    mutated = copy.deepcopy(request)
    present = [name for name in spec["required_fields"] if name in mutated]

    affected: List[str] = []
    if present:
        target = rng.choice(present)
        fragments = [
            "{broken",
            "[unclosed",
            "value with 'mixed\" quotes",
            "undefined",
            "NaN",
        ]
        mutated[target] = rng.choice(fragments)
        affected.append(target)

    truth = _ground_truth("malformed_json_value", affected, request, headers)
    return mutated, headers, truth
| # ========================================================================= | |
| # 9. invalid_enum_value | |
| # ========================================================================= | |
def inject_invalid_enum_value(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Set one enum-typed field to the string ``"INVALID_ENUM_VALUE"``.

    Eligible fields are those whose ``spec["field_types"]`` entry starts
    with ``"enum:"`` and that are present in the request. If there are none,
    the call is delegated to ``inject_wrong_field_type`` and that injector's
    result (including its error type) is returned instead.

    Returns:
        ``(broken_request, headers, ground_truth)``; ``headers`` is passed
        through untouched.
    """
    mutated = copy.deepcopy(request)
    targets = [
        name
        for name, kind in spec["field_types"].items()
        if kind.startswith("enum:") and name in mutated
    ]
    if not targets:
        # No enum field available: fall back to a type mismatch.
        return inject_wrong_field_type(request, headers, spec, rng)

    chosen = rng.choice(targets)
    mutated[chosen] = "INVALID_ENUM_VALUE"

    truth = _ground_truth("invalid_enum_value", [chosen], request, headers)
    return mutated, headers, truth
| # ========================================================================= | |
| # 10. datetime_format_error | |
| # ========================================================================= | |
def inject_datetime_format_error(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Overwrite one datetime-typed field with a badly formatted date string.

    Eligible fields are those whose ``spec["field_types"]`` entry equals
    ``"datetime"`` and that are present in the request. If there are none,
    the call is delegated to ``inject_wrong_field_type`` and that injector's
    result (including its error type) is returned instead.

    Returns:
        ``(broken_request, headers, ground_truth)``; ``headers`` is passed
        through untouched.
    """
    mutated = copy.deepcopy(request)
    targets = [
        name
        for name, kind in spec["field_types"].items()
        if kind == "datetime" and name in mutated
    ]
    if not targets:
        # No datetime field available: fall back to a type mismatch.
        return inject_wrong_field_type(request, headers, spec, rng)

    chosen = rng.choice(targets)
    wrong_renderings = [
        "04/01/2026",
        "2026.04.01",
        "April 1, 2026",
        "1711929600",
        "2026-04-01 09:00",
    ]
    mutated[chosen] = rng.choice(wrong_renderings)

    truth = _ground_truth("datetime_format_error", [chosen], request, headers)
    return mutated, headers, truth
| # ========================================================================= | |
| # 11. wrong_content_type | |
| # ========================================================================= | |
def inject_wrong_content_type(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Set ``Content-Type`` to an incorrect value in a copy of the headers.

    The header is overwritten if present and added if absent. The value is
    drawn at random from a fixed list of media types, skipping any entry
    equal to the current ``Content-Type`` so the injection always changes
    the headers. ``spec`` is not used.

    Returns:
        ``(request, broken_headers, ground_truth)``; ``request`` is passed
        through untouched.
    """
    broken_headers = copy.deepcopy(headers)
    wrong_types = [
        "text/plain",
        "application/xml",
        "multipart/form-data",
        "text/html",
        "application/x-www-form-urlencoded",
    ]
    # Exclude the value already in use; otherwise the "error" could be a
    # no-op. At least four candidates always remain.
    current = broken_headers.get("Content-Type")
    candidates = [t for t in wrong_types if t != current]
    broken_headers["Content-Type"] = rng.choice(candidates)

    return request, broken_headers, _ground_truth(
        "wrong_content_type", ["Content-Type"], request, headers
    )
| # ========================================================================= | |
| # 12. expired_auth_token | |
| # ========================================================================= | |
def inject_expired_token(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Swap the ``Authorization`` value for an expired or malformed one.

    The replacement is drawn at random from a fixed list of bad credentials.
    When the headers carry no ``Authorization`` entry, the call is delegated
    to ``inject_wrong_content_type`` and that injector's result (including
    its error type) is returned instead.

    Returns:
        ``(request, broken_headers, ground_truth)``; ``request`` is passed
        through untouched.
    """
    tampered = copy.deepcopy(headers)
    if "Authorization" not in tampered:
        # Nothing to corrupt: fall back to a Content-Type error.
        return inject_wrong_content_type(request, headers, spec, rng)

    stale_credentials = [
        "Bearer expired_token_abc123",
        "Bearer ",
        "Basic dXNlcjpwYXNz",
        "Token invalid",
        "Bearer eyJhbGciOiJub25lIn0.e30.",
    ]
    tampered["Authorization"] = rng.choice(stale_credentials)

    truth = _ground_truth("expired_auth_token", ["Authorization"], request, headers)
    return request, tampered, truth
| # ========================================================================= | |
| # 13. wrong_status_code (for response validation / chained scenarios) | |
| # ========================================================================= | |
def inject_wrong_status_code(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Record that the server answered with an unexpected status code.

    Neither the request nor the headers are modified. The ground truth
    gains ``wrong_status``, ``correct_status`` and ``description``. The
    expected status is 200 when ``spec["http_method"]`` is ``"GET"`` and
    201 for every other method.

    NOTE(review): 201 is also reported as the expected status for PUT,
    PATCH and DELETE - confirm this matches what the grader expects.

    Returns:
        ``(request, headers, ground_truth)`` with both inputs passed through
        untouched.
    """
    expected = 200 if spec["http_method"] == "GET" else 201
    unexpected = [
        (301, "Moved Permanently - resource redirected"),
        (302, "Found - temporary redirect to different endpoint"),
        (400, "Bad Request - but request is actually valid"),
        (403, "Forbidden - incorrect permissions applied"),
        (404, "Not Found - wrong endpoint routing"),
        (429, "Too Many Requests - rate limit misconfigured"),
        (500, "Internal Server Error - server-side issue"),
        (502, "Bad Gateway - upstream service down"),
        (503, "Service Unavailable - maintenance mode"),
    ]
    code, note = rng.choice(unexpected)

    truth = _ground_truth("wrong_status_code", ["status_code"], request, headers)
    truth.update(wrong_status=code, correct_status=expected, description=note)
    return request, headers, truth
| # ========================================================================= | |
| # 14. redirect_loop | |
| # ========================================================================= | |
def inject_redirect_loop(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Record that the endpoint redirects to a different target.

    Neither the request nor the headers are modified. The ground truth
    gains ``redirect_from`` (``spec["endpoint"]``), ``redirect_to`` and
    ``reason``, taken from one of three scenarios chosen at random:

    * version upgrade: ``/v2`` appended to the endpoint,
    * base path migration: the first ``/api/`` becomes ``/api/v2/``,
    * content negotiation: a ``format=json`` query parameter is appended.

    A scenario whose target equals the source is discarded, so the
    recorded redirect always points somewhere else. This happens to the
    base path migration when the endpoint contains no ``/api/`` segment.

    Returns:
        ``(request, headers, ground_truth)`` with both inputs passed through
        untouched.
    """
    source = spec["endpoint"]
    # Append to an existing query string instead of starting a second one.
    query_sep = "&" if "?" in source else "?"
    redirect_scenarios = [
        {
            "from": source,
            "to": source.rstrip("/") + "/v2",
            "reason": "API version upgrade - v1 redirects to v2",
        },
        {
            "from": source,
            # Rewrite only the first "/api/" so a later path segment that
            # happens to contain "/api/" is left alone.
            "to": source.replace("/api/", "/api/v2/", 1),
            "reason": "Base path migration",
        },
        {
            "from": source,
            "to": source + query_sep + "format=json",
            "reason": "Content negotiation redirect",
        },
    ]
    # The first and third scenarios always change the endpoint, so this
    # list is never empty.
    usable = [s for s in redirect_scenarios if s["to"] != s["from"]]
    scenario = rng.choice(usable)

    gt = _ground_truth("redirect_loop", ["endpoint"], request, headers)
    gt["redirect_from"] = scenario["from"]
    gt["redirect_to"] = scenario["to"]
    gt["reason"] = scenario["reason"]
    return request, headers, gt
| # ========================================================================= | |
| # 15. rate_limit_headers | |
| # ========================================================================= | |
def inject_rate_limit_headers(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Add headers signalling that the client is being rate-limited.

    A copy of the headers receives ``X-RateLimit-Remaining: 0``, a fixed
    ``X-RateLimit-Reset`` timestamp and ``Retry-After: 60``. The ground
    truth lists ``X-RateLimit-Remaining`` and ``Retry-After`` as affected
    fields and gains an ``issue`` description. ``spec`` and ``rng`` are
    not used.

    Returns:
        ``(request, broken_headers, ground_truth)``; ``request`` is passed
        through untouched.
    """
    throttled = copy.deepcopy(headers)
    throttled.update(
        {
            "X-RateLimit-Remaining": "0",
            "X-RateLimit-Reset": "1712000000",
            "Retry-After": "60",
        }
    )

    affected = ["X-RateLimit-Remaining", "Retry-After"]
    truth = _ground_truth("rate_limit_headers", affected, request, headers)
    truth["issue"] = "Client is rate-limited, must wait or reduce request frequency"
    return request, throttled, truth
| # ========================================================================= | |
| # Registry and helpers | |
| # ========================================================================= | |
# Error types that only touch request headers (used by the headers task).
# inject_chained_errors also excludes these from its default body pool.
HEADER_ERROR_TYPES = list(
    (
        "missing_auth_header",
        "wrong_content_type",
        "expired_auth_token",
    )
)
# Every supported error type. The order is significant: seeded sampling in
# inject_multiple_errors draws from this list by position.
ERROR_TYPES = list(
    (
        "missing_required_field",
        "wrong_field_type",
        "invalid_email_format",
        "missing_auth_header",
        "extra_unknown_field",
        "null_value_in_required",
        "wrong_http_method",
        "malformed_json_value",
        "invalid_enum_value",
        "datetime_format_error",
        "wrong_content_type",
        "expired_auth_token",
        "wrong_status_code",
        "redirect_loop",
        "rate_limit_headers",
    )
)
# Dispatch table: error type name -> injector function.
INJECTOR_MAP = dict(
    missing_required_field=inject_missing_required_field,
    wrong_field_type=inject_wrong_field_type,
    invalid_email_format=inject_invalid_email_format,
    missing_auth_header=inject_missing_auth_header,
    extra_unknown_field=inject_extra_unknown_field,
    null_value_in_required=inject_null_value_in_required,
    wrong_http_method=inject_wrong_http_method,
    malformed_json_value=inject_malformed_json_value,
    invalid_enum_value=inject_invalid_enum_value,
    datetime_format_error=inject_datetime_format_error,
    wrong_content_type=inject_wrong_content_type,
    expired_auth_token=inject_expired_token,
    wrong_status_code=inject_wrong_status_code,
    redirect_loop=inject_redirect_loop,
    rate_limit_headers=inject_rate_limit_headers,
)
def inject_error(
    error_type: str,
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
) -> InjectorResult:
    """Apply the injector registered for ``error_type``.

    Raises:
        KeyError: if ``error_type`` is not a key of ``INJECTOR_MAP``.

    Returns:
        Whatever the selected injector returns:
        ``(broken_request, broken_headers, ground_truth)``.
    """
    return INJECTOR_MAP[error_type](request, headers, spec, rng)
# Chain patterns for realistic multi-step debugging scenarios.
# Each pattern names the possible "gate" errors (one is injected first) and
# the pool of follow-up errors; a body_pool of None means "use the default
# pool computed by inject_chained_errors".
CHAIN_PATTERNS = [
    # Auth gate -> body errors.
    # Real-world: API returns 401 first; body validation runs after auth passes.
    dict(
        name="auth_gate",
        gate_types=["missing_auth_header", "expired_auth_token"],
        body_pool=None,
    ),
    # Content-type gate -> type mismatches.
    # Real-world: wrong Content-Type makes the parser misinterpret the body.
    dict(
        name="content_type_gate",
        gate_types=["wrong_content_type"],
        body_pool=["wrong_field_type", "malformed_json_value", "invalid_enum_value"],
    ),
    # Method + endpoint chain.
    # Real-world: wrong method returns 405, then wrong fields for the right method.
    dict(
        name="method_chain",
        gate_types=["wrong_http_method"],
        body_pool=["missing_required_field", "extra_unknown_field", "null_value_in_required"],
    ),
    # Rate limit + auth.
    # Real-world: rate limited, and by the retry the token has expired.
    dict(
        name="rate_limit_chain",
        gate_types=["rate_limit_headers"],
        body_pool=["expired_auth_token", "missing_required_field"],
    ),
    # Redirect + body errors.
    # Real-world: endpoint moved; client follows the redirect with a bad body.
    dict(
        name="redirect_chain",
        gate_types=["redirect_loop"],
        body_pool=["wrong_field_type", "datetime_format_error", "invalid_email_format"],
    ),
]
def inject_chained_errors(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
    count: int = 2,
) -> Tuple[Dict[str, Any], Dict[str, str], List[GroundTruth]]:
    """Inject errors following one randomly chosen chain pattern.

    One gate error from the pattern is injected first, then
    ``max(1, count - 1)`` distinct follow-up errors sampled from the
    pattern's ``body_pool`` (capped at the pool size). A ``body_pool`` of
    ``None`` means every entry of ``ERROR_TYPES`` except the header-only
    types and ``wrong_status_code`` / ``redirect_loop`` /
    ``rate_limit_headers``.

    Each injector is applied to the output of the previous one, so the
    errors accumulate. Every ground truth's ``valid_request`` and
    ``valid_headers`` are pinned to a snapshot of the original inputs;
    otherwise later entries would record an already-broken request as
    the valid one.

    Args:
        request: The valid request body; not mutated.
        headers: The valid headers; not mutated.
        spec: Endpoint spec forwarded to each injector.
        rng: Random source for every choice made here and by the injectors.
        count: Desired total number of errors. At least one gate error and
            one follow-up error are always injected.

    Returns:
        ``(broken_request, broken_headers, ground_truths)`` with the gate
        error's ground truth first, followed by the follow-up errors in
        injection order.
    """
    # Snapshots of the untouched inputs, shared by every ground truth.
    pristine_req = copy.deepcopy(request)
    pristine_hdrs = copy.deepcopy(headers)

    broken_req = copy.deepcopy(request)
    broken_hdrs = copy.deepcopy(headers)
    chain: List[GroundTruth] = []

    pattern = rng.choice(CHAIN_PATTERNS)

    # Gate error first. The order of rng calls (gate choice, gate injection,
    # body sampling, body injections) is kept stable so seeds reproduce.
    gate_type = rng.choice(pattern["gate_types"])
    broken_req, broken_hdrs, gt = INJECTOR_MAP[gate_type](
        broken_req, broken_hdrs, spec, rng
    )
    gt["valid_request"] = pristine_req
    gt["valid_headers"] = pristine_hdrs
    chain.append(gt)

    body_pool = pattern["body_pool"]
    if body_pool is None:
        excluded = set(HEADER_ERROR_TYPES) | {
            "wrong_status_code",
            "redirect_loop",
            "rate_limit_headers",
        }
        body_pool = [t for t in ERROR_TYPES if t not in excluded]

    body_count = max(1, count - 1)
    available = [t for t in body_pool if t in INJECTOR_MAP]
    chosen = rng.sample(available, min(body_count, len(available)))

    for err_type in chosen:
        broken_req, broken_hdrs, gt = INJECTOR_MAP[err_type](
            broken_req, broken_hdrs, spec, rng
        )
        # The injector saw the already-broken state; point the ground truth
        # back at the genuinely valid originals.
        gt["valid_request"] = pristine_req
        gt["valid_headers"] = pristine_hdrs
        chain.append(gt)

    return broken_req, broken_hdrs, chain
def inject_multiple_errors(
    request: Dict[str, Any],
    headers: Dict[str, str],
    spec: Dict[str, Any],
    rng: random_module.Random,
    count: int = 2,
) -> Tuple[Dict[str, Any], Dict[str, str], List[GroundTruth]]:
    """Inject several distinct, randomly chosen errors one after another.

    ``min(count, len(ERROR_TYPES))`` error types are sampled without
    replacement from ``ERROR_TYPES``. Each injector is applied to the
    output of the previous one, so the errors accumulate. Every ground
    truth's ``valid_request`` and ``valid_headers`` are pinned to a
    snapshot of the original inputs; otherwise later entries would record
    an already-broken request as the valid one.

    Args:
        request: The valid request body; not mutated.
        headers: The valid headers; not mutated.
        spec: Endpoint spec forwarded to each injector.
        rng: Random source for sampling and for the injectors.
        count: Number of errors to inject, capped at the number of known
            error types.

    Returns:
        ``(broken_request, broken_headers, ground_truths)`` with the ground
        truths in injection order.
    """
    # Snapshots of the untouched inputs, shared by every ground truth.
    pristine_req = copy.deepcopy(request)
    pristine_hdrs = copy.deepcopy(headers)

    broken_req = copy.deepcopy(request)
    broken_hdrs = copy.deepcopy(headers)
    all_truths: List[GroundTruth] = []

    chosen_types = rng.sample(ERROR_TYPES, min(count, len(ERROR_TYPES)))
    for err_type in chosen_types:
        broken_req, broken_hdrs, gt = INJECTOR_MAP[err_type](
            broken_req, broken_hdrs, spec, rng
        )
        # The injector saw the already-broken state; point the ground truth
        # back at the genuinely valid originals.
        gt["valid_request"] = pristine_req
        gt["valid_headers"] = pristine_hdrs
        all_truths.append(gt)

    return broken_req, broken_hdrs, all_truths