"""FastAPI server for a Git merge-conflict resolution environment.

An agent calls /reset to load a task, edits the in-memory files via /step
tool calls, and submits its resolution for a deterministic grade.
"""

import ast
import os
import subprocess
import sys
import tempfile
from typing import Dict, List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from models import Action, Observation, Reward, State
from tasks import TASKS
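
# Assumed shape of a TASKS entry, inferred from the fields accessed below
# (tasks.py is the source of truth):
#
#   TASKS["<task_id>"] = {
#       "starting_files": {"main.py": "<code with conflict markers>"},
#       "hidden_test": "<test script executed by the grader>",
#       "difficulty": "<free-form label>",
#   }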

app = FastAPI(title="Git Merge Conflict Specialist OpenEnv")

# Single-episode, in-process state: one server instance serves one rollout
# at a time, and /reset starts a fresh episode.
current_state: Optional[State] = None
last_grader_score: float = 0.0


class ResetRequest(BaseModel):
    task_id: str


def get_markers(files: Dict[str, str]) -> List[str]:
    """Return the names of files that still contain Git conflict markers."""
    markers = ("<<<<<<< HEAD", "=======", ">>>>>>>")
    return [f for f, code in files.items() if any(m in code for m in markers)]
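
# Example (hypothetical file contents):
#   get_markers({"a.py": "<<<<<<< HEAD\nx = 1\n", "b.py": "y = 2\n"})
#   -> ["a.py"]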


def evaluate_submission(task_id: str, files: Dict[str, str]) -> Reward:
    """Deterministic grader awarding partial credit on a 0.0 -> 1.0 scale.

    Rubric: 0.0 = conflict markers remain, 0.2 = syntax error,
    0.4 = tests timed out, 0.5 = tests failed, 1.0 = all tests passed.
    """
    # Stage 1: any remaining conflict marker is an automatic zero.
    if get_markers(files):
        return Reward(score=0.0, feedback="FAIL: Git conflict markers still present.")

    # Stage 2: every file must at least parse as valid Python.
    for fname, code in files.items():
        try:
            ast.parse(code)
        except SyntaxError as e:
            return Reward(score=0.2, feedback=f"FAIL: Syntax error in {fname} at line {e.lineno}: {e.msg}")

    # Stage 3: run the task's hidden test script against the submitted files.
    task = TASKS[task_id]
    with tempfile.TemporaryDirectory() as tmpdir:
        for fname, content in files.items():
            with open(os.path.join(tmpdir, fname), "w") as f:
                f.write(content)

        test_path = os.path.join(tmpdir, "test_hidden.py")
        with open(test_path, "w") as f:
            f.write(task["hidden_test"])

        try:
            # Use the current interpreter rather than whatever "python"
            # happens to resolve to on PATH.
            result = subprocess.run(
                [sys.executable, "test_hidden.py"],
                cwd=tmpdir, capture_output=True, text=True, timeout=5,
            )
            if result.returncode == 0:
                return Reward(score=1.0, feedback="SUCCESS: All logic tests passed!")
            return Reward(score=0.5, feedback=f"FAIL: Logic test failed:\n{result.stderr or result.stdout}")
        except subprocess.TimeoutExpired:
            return Reward(score=0.4, feedback="FAIL: Test execution timed out (possible infinite loop).")
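
# The hidden test runs as a plain script, so a task's "hidden_test" is
# assumed to be self-contained and to exit nonzero on failure, e.g.
# (hypothetical module and function names):
#
#   from calculator import add
#   assert add(2, 3) == 5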


@app.post("/reset", response_model=Observation)
def reset_env(req: ResetRequest):
    global current_state, last_grader_score
    if req.task_id not in TASKS:
        raise HTTPException(status_code=404, detail="Task not found")

    task = TASKS[req.task_id]
    current_state = State(
        task_id=req.task_id,
        files=task["starting_files"].copy(),
        step_count=0,
        is_done=False,
    )
    last_grader_score = 0.0

    return Observation(
        current_status=f"Started {req.task_id}. Resolve the conflicts.",
        tool_output="Environment initialized.",
        files_with_markers=get_markers(current_state.files),
        available_files=list(current_state.files.keys()),
    )
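
# Example reset call (the task id is hypothetical; see /tasks for real ids):
#   curl -X POST http://localhost:7860/reset \
#        -H "Content-Type: application/json" \
#        -d '{"task_id": "conflict_easy_1"}'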


@app.post("/step")
def step_env(action: Action):
    global current_state, last_grader_score
    if not current_state or current_state.is_done:
        raise HTTPException(status_code=400, detail="Environment not initialized or already done. Call /reset.")

    current_state.step_count += 1
    tool_out = ""
    reward_obj = Reward(score=0.0, feedback="In progress")
    done = False

    if action.tool == "READ_FILE":
        if action.filepath in current_state.files:
            tool_out = current_state.files[action.filepath]
        else:
            tool_out = f"Error: File {action.filepath} not found."

    elif action.tool == "WRITE_FILE":
        if action.filepath and action.content is not None:
            current_state.files[action.filepath] = action.content
            tool_out = f"Successfully wrote to {action.filepath}."
        else:
            tool_out = "Error: Must provide filepath and content."

    elif action.tool == "RUN_LINTER":
        # Free feedback loop: syntax-check every file without a grader run.
        lint_errs = []
        for fname, code in current_state.files.items():
            try:
                ast.parse(code)
            except SyntaxError as e:
                lint_errs.append(f"{fname}: SyntaxError line {e.lineno}")
        tool_out = "\n".join(lint_errs) if lint_errs else "Linter Passed: No syntax errors."

    elif action.tool == "RUN_TESTS":
        # Dry run: same grader, but the score is not recorded and the
        # episode continues.
        eval_res = evaluate_submission(current_state.task_id, current_state.files)
        tool_out = eval_res.feedback

    elif action.tool == "SUBMIT":
        eval_res = evaluate_submission(current_state.task_id, current_state.files)
        reward_obj = eval_res
        last_grader_score = eval_res.score
        tool_out = f"Submission final. Score: {eval_res.score}. Feedback: {eval_res.feedback}"
        done = True
        current_state.is_done = True

    else:
        tool_out = f"Error: Unknown tool '{action.tool}'."

    obs = Observation(
        current_status=f"Step {current_state.step_count} completed.",
        tool_output=tool_out,
        files_with_markers=get_markers(current_state.files),
        available_files=list(current_state.files.keys()),
    )

    return {
        "observation": obs.dict(),
        "reward": reward_obj.score,
        "done": done,
        "info": {"feedback": reward_obj.feedback},
    }
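
# Example step call (field names follow the Action model; the filepath is
# hypothetical):
#   curl -X POST http://localhost:7860/step \
#        -H "Content-Type: application/json" \
#        -d '{"tool": "READ_FILE", "filepath": "main.py"}'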


@app.get("/state", response_model=State)
def get_state():
    if not current_state:
        raise HTTPException(status_code=400, detail="Not initialized")
    return current_state


@app.get("/tasks")
def get_tasks():
    return {
        "tasks": [{"id": k, "difficulty": v["difficulty"]} for k, v in TASKS.items()],
        "action_schema": Action.schema(),
    }


@app.get("/grader")
def get_grader():
    return {"score": last_grader_score}


@app.post("/baseline")
def run_baseline():
    """Run the local baseline.py script and return its stdout/stderr."""
    try:
        # Bounded so a hung baseline cannot block the server indefinitely.
        result = subprocess.run(
            [sys.executable, "baseline.py"],
            capture_output=True, text=True, timeout=600,
        )
        return {"output": result.stdout, "errors": result.stderr}
    except Exception as e:
        return {"error": str(e)}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
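
# Assuming this file is saved as server.py, the server can also be started
# with: uvicorn server:app --host 0.0.0.0 --port 7860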