from __future__ import annotations import json from pathlib import Path from uuid import uuid4 from openenv.core.env_server.interfaces import Environment from openenv.core.env_server.types import EnvironmentMetadata from cadquery_env import APP_ROOT, evaluate_code, preprocess_reference, read_task_spec from .openenv_models import CadForgeAction, CadForgeObservation, CadForgeState DEFAULT_TASK_ID = "four_leg_chair_700n" CADQUERY_CONSTRAINTS = [ "Return a complete Python file, not markdown.", "Use import cadquery as cq; the runner also provides cq, exporters, math, Assembly, and Color.", "Assign the final exportable object to fixture, result, model, solid, body, part, or call show_object(obj).", "Do not use file IO, networking, subprocesses, eval, exec, unsupported imports, or CQ-editor-only helpers.", "Prefer named dimensions, helper functions, connected parts, and simple robust CadQuery operations.", ] def seed_code_for(task_id: str) -> str: if task_id == "four_leg_chair_700n": return "\n".join( [ "import cadquery as cq", "", "seat_width = 520", "seat_depth = 470", "seat_thickness = 55", "leg_height = 420", "", "seat = cq.Workplane('XY').box(seat_width, seat_depth, seat_thickness).translate((0, 0, leg_height))", "# Weak seed: only one central support and a floating backrest; repair into a real chair.", "support = cq.Workplane('XY').cylinder(leg_height, 35).translate((0, 0, leg_height / 2))", "backrest = cq.Workplane('XY').box(440, 45, 520).translate((0, -330, 700))", "fixture = seat.union(support).union(backrest).clean()", ] ) return "\n".join( [ "import cadquery as cq", "", "# Generic weak seed: a buildable blocky placeholder that needs task-specific repair.", "base = cq.Workplane('XY').box(90, 50, 14).translate((0, 0, 7))", "boss = cq.Workplane('XY').cylinder(40, 18).translate((35, 0, 34))", "fixture = base.union(boss).clean()", ] ) class CadForgeCadQueryEnvironment(Environment[CadForgeAction, CadForgeObservation, CadForgeState]): """OpenEnv wrapper around the CADForge CadQuery compile/render/reward loop.""" SUPPORTS_CONCURRENT_SESSIONS = True def __init__(self) -> None: self._state = CadForgeState(episode_id=str(uuid4()), step_count=0) self._done = False def reset( self, seed: int | None = None, episode_id: str | None = None, task_id: str | None = None, task_brief: str | None = None, **_: object, ) -> CadForgeObservation: selected_task_id = task_id or DEFAULT_TASK_ID task_spec = self._task_spec(selected_task_id) prompt = task_brief or task_spec.get("prompt", "") self._state = CadForgeState( episode_id=episode_id or str(uuid4()), step_count=0, task_id=selected_task_id, task_brief=prompt, current_code=seed_code_for(selected_task_id), ) self._done = False return self._observation( reward=0.0, notes=["Ready. Submit complete CadQuery code as the action.code field."], ) def step(self, action: CadForgeAction, timeout_s: float | None = None, **_: object) -> CadForgeObservation: # type: ignore[override] task_id = action.task_id or self._state.task_id or DEFAULT_TASK_ID task_spec = self._task_spec(task_id) task_prompt = action.task_prompt or task_spec.get("prompt", self._state.task_brief) self._state.task_id = task_id self._state.task_brief = task_prompt self._state.step_count += 1 reference_root = self._reference_root(task_id) result = evaluate_code( action.code, self._state.episode_id or str(uuid4()), f"step-{self._state.step_count}", task_prompt, reference_root=reference_root, reward_mode=action.reward_mode, task_spec=task_spec, ) reward_json = result.get("reward", {}) reward = float(reward_json.get("total", -1.0)) self._state.current_code = action.code self._state.last_reward = reward_json self._state.trace.append( { "step": self._state.step_count, "task_id": task_id, "reward_mode": action.reward_mode, "reward": reward_json, "ok": result.get("ok", False), "notes": result.get("notes", []), "artifacts_dir": result.get("artifacts_dir"), } ) self._done = reward >= 0.92 or self._state.step_count >= 12 return self._observation( reward=reward, notes=result.get("notes", []), reward_json=reward_json, artifacts_dir=result.get("artifacts_dir"), renders=result.get("renders", {}), done=self._done, ) @property def state(self) -> CadForgeState: return self._state def get_metadata(self) -> EnvironmentMetadata: return EnvironmentMetadata( name="cadforge_cadquery", description=( "Agentic CadQuery repair environment. Agents submit complete Python CadQuery " "files, the environment executes them in a constrained runner, exports STL, " "scores build/topology/contact/task semantics/reference similarity/editability, " "and returns dense reward for SFT/GRPO loops." ), version="0.1.0", author="Sanjayprasad H S", ) def _task_spec(self, task_id: str) -> dict: try: return read_task_spec(task_id) or {"id": task_id, "prompt": ""} except FileNotFoundError: return {"id": task_id, "prompt": self._state.task_brief} def _reference_root(self, task_id: str) -> Path: packaged_ref_root = APP_ROOT / "data" / "reference-metrics" / task_id if (packaged_ref_root / "reference_summary.json").exists(): return packaged_ref_root ref_root = APP_ROOT / "runs" / "cadquery-task-references" / task_id if (ref_root / "reference_summary.json").exists(): return ref_root glb_path = APP_ROOT / "data" / "generated-assets" / task_id / "reference.glb" if glb_path.exists(): preprocess_reference(glb_path, ideal_code_path=None, out_root=ref_root) return ref_root def _observation( self, reward: float, notes: list[str], reward_json: dict | None = None, artifacts_dir: str | None = None, renders: dict | None = None, done: bool | None = None, ) -> CadForgeObservation: return CadForgeObservation( task_brief=self._state.task_brief, task_id=self._state.task_id, constraints=CADQUERY_CONSTRAINTS, current_code=self._state.current_code, reward_json=reward_json or self._state.last_reward, verifier_notes=notes, artifacts_dir=artifacts_dir, renders=renders or {}, trace=self._state.trace, done=self._done if done is None else done, reward=reward, )