# siddeshwar-kagatikar
# Initial clean commit without secrets
# d51679d
import ast
import os
import subprocess
import sys
import tempfile
from typing import Dict

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from models import Action, Observation, Reward, State
from tasks import TASKS
# FastAPI application object; the route decorators below register on it.
app = FastAPI(title="Git Merge Conflict Specialist OpenEnv")
# Global State for the hackathon (Single-tenant execution)
# Active episode state; remains None until the /reset endpoint assigns it.
current_state: State = None
# Score of the most recent SUBMIT action; set back to 0.0 by /reset.
last_grader_score: float = 0.0
class ResetRequest(BaseModel):
    """Request body for the /reset endpoint."""

    # Key into TASKS selecting which task the new episode starts from.
    task_id: str
def get_markers(files: Dict[str, str]) -> list:
    """Return the names of files that still contain Git conflict markers.

    A file is reported when any of its lines (ignoring surrounding
    whitespace) begins with ``<<<<<<<``, ``|||||||`` or ``>>>>>>>``, or
    consists solely of ``=======``.  Matching whole lines rather than
    substrings means a leftover closing marker is caught, any branch label
    is accepted after the opening marker, and longer runs of ``=`` (for
    example a ``# ==========`` divider) are not mistaken for a conflict.

    Args:
        files: Mapping of file name to file contents.

    Returns:
        File names containing at least one marker, in mapping order.
    """
    boundary_prefixes = ("<<<<<<<", "|||||||", ">>>>>>>")

    def has_marker(code: str) -> bool:
        for raw_line in code.splitlines():
            line = raw_line.strip()
            if line == "=======" or line.startswith(boundary_prefixes):
                return True
        return False

    return [fname for fname, code in files.items() if has_marker(code)]
def evaluate_submission(task_id: str, files: Dict[str, str]) -> Reward:
    """Deterministic grader evaluating partial progress (0.0 -> 1.0).

    Scoring ladder:
        0.0 - conflict markers remain, or a file name cannot be written
              inside the scratch directory
        0.2 - a file fails to parse as Python
        0.4 - the hidden test did not finish within 5 seconds
        0.5 - the hidden test exited with a non-zero status
        1.0 - the hidden test exited with status 0

    Args:
        task_id: Key into TASKS; the task's "hidden_test" source is executed.
        files: Mapping of file name to file contents under evaluation.

    Returns:
        A Reward carrying the score and a human-readable feedback string.

    Raises:
        KeyError: If ``task_id`` is not present in TASKS.
    """
    # 1. Check for leftover Git markers
    if get_markers(files):
        return Reward(score=0.0, feedback="FAIL: Git conflict markers still present.")

    # 2. Check for Syntax Errors (AST Parser)
    for fname, code in files.items():
        try:
            ast.parse(code)
        except SyntaxError as e:
            return Reward(score=0.2, feedback=f"FAIL: Syntax Error in {fname} at line {e.lineno}: {e.msg}")
        except ValueError as e:
            # Some interpreter versions reject NUL bytes with ValueError
            # instead of SyntaxError; grade it the same way.
            return Reward(score=0.2, feedback=f"FAIL: Syntax Error in {fname}: {e}")

    # 3. Run Hidden Tests in a scratch temp directory
    # NOTE(review): the temp directory only isolates the files on disk; the
    # test process itself is not sandboxed.
    task = TASKS[task_id]
    with tempfile.TemporaryDirectory() as tmpdir:
        root = os.path.realpath(tmpdir)
        for fname, content in files.items():
            try:
                target = os.path.realpath(os.path.join(root, fname))
                # File names come from client WRITE_FILE actions: refuse
                # anything that resolves outside the scratch directory
                # (absolute paths, "..", symlink tricks).
                if not target.startswith(root + os.sep):
                    return Reward(score=0.0, feedback=f"FAIL: Invalid file path: {fname}")
                # Allow nested names such as "pkg/module.py".
                os.makedirs(os.path.dirname(target), exist_ok=True)
                # Python reads source files as UTF-8 by default, so write
                # them that way regardless of the server's locale.
                with open(target, "w", encoding="utf-8") as f:
                    f.write(content)
            except (OSError, ValueError) as e:
                return Reward(score=0.0, feedback=f"FAIL: Could not write {fname}: {e}")

        # Written last so a submitted file of the same name cannot replace it.
        test_path = os.path.join(root, "test_hidden.py")
        with open(test_path, "w", encoding="utf-8") as f:
            f.write(task["hidden_test"])

        try:
            # sys.executable guarantees the same interpreter (and installed
            # packages) as the server, and works where no "python" binary
            # is on PATH.
            result = subprocess.run(
                [sys.executable, "test_hidden.py"],
                cwd=root,
                capture_output=True,
                text=True,
                timeout=5,
            )
        except subprocess.TimeoutExpired:
            return Reward(score=0.4, feedback="FAIL: Test execution timed out (Infinite Loop).")

        if result.returncode == 0:
            return Reward(score=1.0, feedback="SUCCESS: All logic tests passed!")
        return Reward(score=0.5, feedback=f"FAIL: Logic test failed:\n{result.stderr or result.stdout}")
@app.post("/reset", response_model=Observation)
def reset_env(req: ResetRequest):
    """Begin a fresh episode for the requested task.

    Replaces the global episode state with a copy of the task's starting
    files, clears the last grader score, and returns the first observation.

    Raises:
        HTTPException: 404 when ``req.task_id`` is not a key of TASKS.
    """
    global current_state, last_grader_score

    task_id = req.task_id
    if task_id not in TASKS:
        raise HTTPException(status_code=404, detail="Task not found")

    # Copy the mapping so later writes do not touch the task definition.
    initial_files = TASKS[task_id]["starting_files"].copy()
    current_state = State(
        task_id=task_id,
        files=initial_files,
        step_count=0,
        is_done=False,
    )
    last_grader_score = 0.0

    episode_files = current_state.files
    return Observation(
        current_status=f"Started {task_id}. Resolve the conflicts.",
        tool_output="Environment initialized.",
        files_with_markers=get_markers(episode_files),
        available_files=list(episode_files.keys()),
    )
@app.post("/step")
def step_env(action: Action):
    """Apply one agent action to the current episode.

    Supported tools:
        READ_FILE  - return the contents of ``action.filepath``.
        WRITE_FILE - store ``action.content`` under ``action.filepath``.
        RUN_LINTER - report files that fail to parse as Python.
        RUN_TESTS  - run the grader and return its feedback without ending
                     the episode or recording a score.
        SUBMIT     - run the grader, record its score, end the episode.

    Any other tool value produces an error message in ``tool_output``
    instead of an empty response.

    Returns:
        A dict with keys "observation", "reward", "done" and "info".

    Raises:
        HTTPException: 400 when no episode is active or it has finished.
    """
    global current_state, last_grader_score

    if current_state is None or current_state.is_done:
        raise HTTPException(status_code=400, detail="Environment not initialized or already done. Call /reset.")

    current_state.step_count += 1
    tool_out = ""
    reward_obj = Reward(score=0.0, feedback="In progress")
    done = False

    if action.tool == "READ_FILE":
        if action.filepath in current_state.files:
            tool_out = current_state.files[action.filepath]
        else:
            tool_out = f"Error: File {action.filepath} not found."

    elif action.tool == "WRITE_FILE":
        # Empty content is allowed (it blanks the file); a missing one is not.
        if action.filepath and action.content is not None:
            current_state.files[action.filepath] = action.content
            tool_out = f"Successfully wrote to {action.filepath}."
        else:
            tool_out = "Error: Must provide filepath and content."

    elif action.tool == "RUN_LINTER":
        lint_errs = []
        for fname, code in current_state.files.items():
            try:
                ast.parse(code)
            except SyntaxError as e:
                lint_errs.append(f"{fname}: SyntaxError line {e.lineno}")
        tool_out = "\n".join(lint_errs) if lint_errs else "Linter Passed: No syntax errors."

    elif action.tool == "RUN_TESTS":
        eval_res = evaluate_submission(current_state.task_id, current_state.files)
        tool_out = eval_res.feedback

    elif action.tool == "SUBMIT":
        eval_res = evaluate_submission(current_state.task_id, current_state.files)
        reward_obj = eval_res
        last_grader_score = eval_res.score
        tool_out = f"Submission final. Score: {eval_res.score}. Feedback: {eval_res.feedback}"
        done = True
        current_state.is_done = True

    else:
        # Surface the mistake rather than returning an empty tool output.
        tool_out = f"Error: Unknown tool {action.tool}."

    obs = Observation(
        current_status=f"Step {current_state.step_count} completed.",
        tool_output=tool_out,
        files_with_markers=get_markers(current_state.files),
        available_files=list(current_state.files.keys()),
    )
    return {
        "observation": obs.dict(),
        "reward": reward_obj.score,
        "done": done,
        "info": {"feedback": reward_obj.feedback},
    }
@app.get("/state", response_model=State)
def get_state():
    """Return the current episode state.

    Raises:
        HTTPException: 400 when no episode state is available.
    """
    if current_state:
        return current_state
    raise HTTPException(status_code=400, detail="Not initialized")
@app.get("/tasks")
def get_tasks():
    """List the available tasks together with the Action schema."""
    task_summaries = []
    for task_key, task_spec in TASKS.items():
        # NOTE(review): "description" is filled from the task's "difficulty"
        # entry - confirm this is intended.
        task_summaries.append({"id": task_key, "description": task_spec["difficulty"]})

    return {
        "tasks": task_summaries,
        "action_schema": Action.schema(),
    }
@app.get("/grader")
def get_grader():
    """Report the score recorded by the most recent SUBMIT action."""
    latest_score = last_grader_score
    return {"score": latest_score}
@app.post("/baseline")
def run_baseline():
    """Run the local baseline.py script and return its captured output.

    The script is resolved relative to the server's working directory and
    is executed with the interpreter running this server.

    Returns:
        On completion, a dict with "output" (stdout), "errors" (stderr) and
        "returncode" (the script's exit status).  If the process could not
        be run or its output could not be decoded, a dict with a single
        "error" message.
    """
    try:
        # sys.executable keeps the baseline on the same interpreter and
        # installed packages as the server, and works where no "python"
        # binary is on PATH.
        result = subprocess.run(
            [sys.executable, "baseline.py"],
            capture_output=True,
            text=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # ValueError covers undecodable output (UnicodeDecodeError).
        return {"error": str(e)}

    return {
        "output": result.stdout,
        "errors": result.stderr,
        "returncode": result.returncode,
    }
# Start the server only when this file is executed directly, not on import.
if __name__ == "__main__":
    # Listen on every network interface, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)