"""Auxiliary server entrypoint required by OpenEnv local validation checks.""" import os from flask import Flask, Response, jsonify, request from environment import EmailTriageEnv from tasks import get_task_scenario_count, list_task_ids FRONTEND_HTML = """ Inbox Helper Practice

Inbox Helper Practice

Practice deciding priority, category, and who should handle each email.

connecting...

Start a Scenario

Pick a difficulty, then click Start.

Ready. Start a scenario.

Your Decision

Choose priority, who should handle it, and a short reason.

Current Email

Details (Advanced)

Waiting for your first action...
""" app = Flask(__name__) current_env = EmailTriageEnv(task_id="task_easy") SCENARIO_COUNTERS = {task_id: 0 for task_id in list_task_ids()} DEFAULT_EVAL_SPLIT = os.getenv("OPENENV_EVAL_SPLIT", "public") ALLOW_CLIENT_EVAL_OVERRIDE = ( os.getenv("OPENENV_ALLOW_CLIENT_EVAL_OVERRIDE", "false").strip().lower() == "true" ) @app.get("/") def root_page(): """Render a lightweight frontend for interacting with the environment.""" return Response(FRONTEND_HTML, mimetype="text/html") @app.get("/meta") def root_endpoint(): """Return service metadata for health checks and machine clients.""" return jsonify( { "name": "email-triage-env", "status": "ok", "endpoints": { "reset": {"method": "POST", "path": "/reset"}, "step": {"method": "POST", "path": "/step"}, "state": {"method": "POST", "path": "/state"}, }, "scenario_pools": { "public": { task_id: get_task_scenario_count(task_id, "public") for task_id in list_task_ids() }, }, "eval_split": DEFAULT_EVAL_SPLIT, "production_runtime_controls": { "production_profile": ["light", "standard", "heavy"], "business_hours_mode": [True, False], "escalation_mode": ["low", "normal", "high"], }, } ) @app.post("/reset") def reset_endpoint(): """Reset the environment with a selected task and return ResetResult JSON.""" global current_env global SCENARIO_COUNTERS payload = request.get_json(silent=True) if payload is None: payload = {} elif not isinstance(payload, dict): return jsonify({"error": "Malformed JSON payload."}), 400 task_id = payload.get("task_id", "task_easy") if not isinstance(task_id, str): return jsonify({"error": "Field 'task_id' must be a string."}), 400 runtime_options: dict[str, object] = {} if task_id == "task_production": production_profile = payload.get("production_profile", "standard") if not isinstance(production_profile, str) or production_profile not in { "light", "standard", "heavy", }: return ( jsonify( { "error": ( "Field 'production_profile' must be one of " "light/standard/heavy." ) } ), 400, ) escalation_mode = payload.get("escalation_mode", "normal") if not isinstance(escalation_mode, str) or escalation_mode not in { "low", "normal", "high", }: return ( jsonify( { "error": ( "Field 'escalation_mode' must be one of " "low/normal/high." ) } ), 400, ) business_hours_mode = payload.get("business_hours_mode", False) if isinstance(business_hours_mode, str): business_hours_mode = business_hours_mode.strip().lower() in { "1", "true", "yes", "on", } elif not isinstance(business_hours_mode, bool): return jsonify({"error": "Field 'business_hours_mode' must be boolean."}), 400 runtime_options = { "production_profile": production_profile, "business_hours_mode": business_hours_mode, "escalation_mode": escalation_mode, } if not ALLOW_CLIENT_EVAL_OVERRIDE and ( "eval_split" in payload or "scenario_index" in payload ): return jsonify( { "error": ( "Client overrides for eval_split/scenario_index are disabled " "by server policy." ) } ), 400 eval_split = DEFAULT_EVAL_SPLIT if ALLOW_CLIENT_EVAL_OVERRIDE: requested_split = payload.get("eval_split", DEFAULT_EVAL_SPLIT) if not isinstance(requested_split, str): return jsonify({"error": "Field 'eval_split' must be a string."}), 400 eval_split = requested_split requested_index = payload.get("scenario_index") if ALLOW_CLIENT_EVAL_OVERRIDE else None if requested_index is not None and (not isinstance(requested_index, int) or requested_index < 0): return jsonify({"error": "Field 'scenario_index' must be a non-negative integer."}), 400 try: scenario_count = get_task_scenario_count(task_id, eval_split) if requested_index is None: scenario_index = SCENARIO_COUNTERS.get(task_id, 0) if scenario_count > 0: SCENARIO_COUNTERS[task_id] = (scenario_index + 1) % scenario_count else: scenario_index = requested_index current_env = EmailTriageEnv( task_id=task_id, scenario_index=scenario_index, split=eval_split, runtime_options=runtime_options, ) reset_result = current_env.reset() except KeyError as error: return jsonify({"error": str(error)}), 400 return jsonify(reset_result.model_dump()) @app.post("/step") def step_endpoint(): """Advance environment by one action and return StepResult JSON.""" payload = request.get_json(silent=True) if payload is None: return jsonify({"error": "Malformed JSON payload."}), 400 step_result = current_env.step(payload) return jsonify(step_result.model_dump()) @app.post("/state") def state_endpoint(): """Return read-only EnvironmentState JSON snapshot.""" state_result = current_env.state() return jsonify(state_result.model_dump()) def main() -> None: """Run the Flask app for local and script-based launches.""" app.run(host="0.0.0.0", port=7860) if __name__ == "__main__": main()