import ast import tempfile import subprocess import os import uvicorn from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import Dict from models import Action, Observation, Reward, State from tasks import TASKS app = FastAPI(title="Git Merge Conflict Specialist OpenEnv") # Global State for the hackathon (Single-tenant execution) current_state: State = None last_grader_score: float = 0.0 class ResetRequest(BaseModel): task_id: str def get_markers(files: Dict[str, str]) -> list: return[f for f, code in files.items() if "<<<<<<< HEAD" in code or "=======" in code] def evaluate_submission(task_id: str, files: Dict[str, str]) -> Reward: """Deterministic Grader evaluating partial progress (0.0 -> 1.0)""" # 1. Check for leftover Git markers if get_markers(files): return Reward(score=0.0, feedback="FAIL: Git conflict markers still present.") # 2. Check for Syntax Errors (AST Parser) for fname, code in files.items(): try: ast.parse(code) except SyntaxError as e: return Reward(score=0.2, feedback=f"FAIL: Syntax Error in {fname} at line {e.lineno}: {e.msg}") # 3. Run Hidden Tests in a safe temp directory task = TASKS[task_id] with tempfile.TemporaryDirectory() as tmpdir: for fname, content in files.items(): with open(os.path.join(tmpdir, fname), "w") as f: f.write(content) test_path = os.path.join(tmpdir, "test_hidden.py") with open(test_path, "w") as f: f.write(task["hidden_test"]) try: result = subprocess.run(["python", "test_hidden.py"], cwd=tmpdir, capture_output=True, text=True, timeout=5 ) if result.returncode == 0: return Reward(score=1.0, feedback="SUCCESS: All logic tests passed!") else: return Reward(score=0.5, feedback=f"FAIL: Logic test failed:\n{result.stderr or result.stdout}") except subprocess.TimeoutExpired: return Reward(score=0.4, feedback="FAIL: Test execution timed out (Infinite Loop).") @app.post("/reset", response_model=Observation) def reset_env(req: ResetRequest): global current_state, last_grader_score if req.task_id not in TASKS: raise HTTPException(status_code=404, detail="Task not found") task = TASKS[req.task_id] current_state = State( task_id=req.task_id, files=task["starting_files"].copy(), step_count=0, is_done=False ) last_grader_score = 0.0 return Observation( current_status=f"Started {req.task_id}. Resolve the conflicts.", tool_output="Environment initialized.", files_with_markers=get_markers(current_state.files), available_files=list(current_state.files.keys()) ) @app.post("/step") def step_env(action: Action): global current_state, last_grader_score if not current_state or current_state.is_done: raise HTTPException(status_code=400, detail="Environment not initialized or already done. Call /reset.") current_state.step_count += 1 tool_out = "" reward_obj = Reward(score=0.0, feedback="In progress") done = False if action.tool == "READ_FILE": if action.filepath in current_state.files: tool_out = current_state.files[action.filepath] else: tool_out = f"Error: File {action.filepath} not found." elif action.tool == "WRITE_FILE": if action.filepath and action.content is not None: current_state.files[action.filepath] = action.content tool_out = f"Successfully wrote to {action.filepath}." else: tool_out = "Error: Must provide filepath and content." elif action.tool == "RUN_LINTER": lint_errs =[] for fname, code in current_state.files.items(): try: ast.parse(code) except SyntaxError as e: lint_errs.append(f"{fname}: SyntaxError line {e.lineno}") tool_out = "\n".join(lint_errs) if lint_errs else "Linter Passed: No syntax errors." elif action.tool == "RUN_TESTS": eval_res = evaluate_submission(current_state.task_id, current_state.files) tool_out = eval_res.feedback elif action.tool == "SUBMIT": eval_res = evaluate_submission(current_state.task_id, current_state.files) reward_obj = eval_res last_grader_score = eval_res.score tool_out = f"Submission final. Score: {eval_res.score}. Feedback: {eval_res.feedback}" done = True current_state.is_done = True obs = Observation( current_status=f"Step {current_state.step_count} completed.", tool_output=tool_out, files_with_markers=get_markers(current_state.files), available_files=list(current_state.files.keys()) ) return { "observation": obs.dict(), "reward": reward_obj.score, "done": done, "info": {"feedback": reward_obj.feedback} } @app.get("/state", response_model=State) def get_state(): if not current_state: raise HTTPException(status_code=400, detail="Not initialized") return current_state @app.get("/tasks") def get_tasks(): return { "tasks": [{"id": k, "description": v["difficulty"]} for k, v in TASKS.items()], "action_schema": Action.schema() } @app.get("/grader") def get_grader(): return {"score": last_grader_score} @app.post("/baseline") def run_baseline(): """Trigger the local baseline.py script securely and return scores.""" try: result = subprocess.run(["python", "baseline.py"], capture_output=True, text=True) return {"output": result.stdout, "errors": result.stderr} except Exception as e: return {"error": str(e)} if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)