| import os |
| import time |
| import subprocess |
| import io |
| from typing import Literal, Tuple, List |
| from PIL import Image |
| from loguru import logger |
| from .computer import Computer, EnvState |
|
|
class X11Computer(Computer):
    """X11 desktop implementation of the Computer interface.

    Every action is carried out by shelling out to command-line tools
    (``xdotool`` for input and window queries, ``scrot`` for screenshots,
    ``firefox`` as the browser) with ``DISPLAY`` set to the display this
    instance was created for.
    """

    # X11 exposes the scroll wheel as mouse buttons: 4/5 are vertical and
    # 6/7 are horizontal.
    _SCROLL_BUTTONS = {"up": "4", "down": "5", "left": "6", "right": "7"}

    def __init__(self, display: str = ":1"):
        """Bind to an X display and cache its geometry.

        Args:
            display: Value used for the ``DISPLAY`` environment variable of
                every spawned command.
        """
        self.display = display
        self._screen_size = self._get_screen_size()

    def _run_cmd(self, cmd: List[str], check: bool = True) -> subprocess.CompletedProcess:
        """Run a command with the correct DISPLAY environment variable.

        Args:
            cmd: Argument vector to execute (no shell is involved).
            check: When true, a non-zero exit status raises
                ``subprocess.CalledProcessError``.

        Returns:
            The completed process, with stdout/stderr captured as text.
        """
        env = {**os.environ, "DISPLAY": self.display}
        return subprocess.run(cmd, env=env, check=check, capture_output=True, text=True)

    def _launch_firefox(self, *args: str) -> None:
        """Start Firefox on this display without waiting for it to exit."""
        subprocess.Popen(
            ["firefox", *args],
            env={**os.environ, "DISPLAY": self.display},
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _get_screen_size(self) -> Tuple[int, int]:
        """Query the display geometry via ``xdotool getdisplaygeometry``.

        Returns:
            ``(width, height)`` in pixels, or ``(1920, 1080)`` when the query
            fails for any reason (the failure is logged).
        """
        try:
            res = self._run_cmd(["xdotool", "getdisplaygeometry"])
            w, h = map(int, res.stdout.strip().split())
            return w, h
        except Exception as e:
            # Best effort: fall back to a default rather than fail construction.
            logger.error(f"Failed to get screen size: {e}")
            return 1920, 1080

    def screen_size(self) -> Tuple[int, int]:
        """Return the ``(width, height)`` cached when the instance was created."""
        return self._screen_size

    def current_state(self) -> EnvState:
        """Capture a screenshot and the active window title.

        Returns:
            An ``EnvState`` whose ``screenshot`` holds the PNG bytes written
            by ``scrot`` and whose ``url`` holds the active window title
            (``"Desktop"`` when xdotool cannot report one). If the capture
            itself fails, the error is logged and
            ``EnvState(screenshot=b"", url="Error")`` is returned.
        """
        # Function-scope import keeps this block self-contained.
        import tempfile

        try:
            # A private temporary directory per call avoids the fixed,
            # predictable /tmp path: instances on different displays no
            # longer overwrite each other's screenshots.
            with tempfile.TemporaryDirectory(prefix="x11computer_") as tmp_dir:
                screenshot_path = os.path.join(tmp_dir, "screenshot_state.png")
                self._run_cmd(["scrot", "-o", screenshot_path])

                with open(screenshot_path, "rb") as f:
                    screenshot_bytes = f.read()

            try:
                res = self._run_cmd(["xdotool", "getactivewindow", "getwindowname"])
                window_title = res.stdout.strip()
            except subprocess.CalledProcessError:
                # xdotool exited non-zero; report a generic title instead.
                window_title = "Desktop"

            return EnvState(screenshot=screenshot_bytes, url=window_title)

        except Exception as e:
            logger.error(f"Failed to capture state: {e}")
            return EnvState(screenshot=b"", url="Error")

    def open_web_browser(self) -> EnvState:
        """Launch Firefox, wait 3 seconds, and return the resulting state."""
        self._launch_firefox()
        time.sleep(3)
        return self.current_state()

    def click_at(self, x: int, y: int) -> EnvState:
        """Move the pointer to ``(x, y)``, left-click, and return the state."""
        self._run_cmd(["xdotool", "mousemove", str(x), str(y), "click", "1"])
        time.sleep(0.5)
        return self.current_state()

    def hover_at(self, x: int, y: int) -> EnvState:
        """Move the pointer to ``(x, y)`` without clicking and return the state."""
        self._run_cmd(["xdotool", "mousemove", str(x), str(y)])
        time.sleep(0.5)
        return self.current_state()

    def type_text_at(
        self,
        x: int,
        y: int,
        text: str,
        press_enter: bool,
        clear_before_typing: bool,
    ) -> EnvState:
        """Click at ``(x, y)`` and type ``text`` there.

        Args:
            x: Horizontal pointer coordinate of the target.
            y: Vertical pointer coordinate of the target.
            text: Text to type.
            press_enter: Press Return after typing.
            clear_before_typing: Send select-all + Delete before typing.

        Returns:
            The state captured after typing.
        """
        # Click to focus the target directly instead of going through
        # click_at(), which would take a screenshot that is thrown away.
        self._run_cmd(["xdotool", "mousemove", str(x), str(y), "click", "1"])
        time.sleep(0.5)

        if clear_before_typing:
            self._run_cmd(["xdotool", "key", "ctrl+a", "Delete"])
            time.sleep(0.2)

        # "--" stops option parsing so text starting with "-" is typed literally.
        self._run_cmd(["xdotool", "type", "--", text])

        if press_enter:
            self._run_cmd(["xdotool", "key", "Return"])

        time.sleep(0.5)
        return self.current_state()

    def scroll_document(
        self, direction: Literal["up", "down", "left", "right"]
    ) -> EnvState:
        """Scroll one wheel step in ``direction`` at the current pointer position.

        An unrecognised direction sends no input; the current state is
        returned either way.
        """
        button = self._SCROLL_BUTTONS.get(direction)
        if button is not None:
            self._run_cmd(["xdotool", "click", button])

        return self.current_state()

    def scroll_at(
        self,
        x: int,
        y: int,
        direction: Literal["up", "down", "left", "right"],
        magnitude: int,
    ) -> EnvState:
        """Move the pointer to ``(x, y)`` and scroll in ``direction``.

        Args:
            x: Horizontal pointer coordinate to scroll at.
            y: Vertical pointer coordinate to scroll at.
            direction: Scroll direction.
            magnitude: Scroll amount; converted to wheel steps at one step
                per 100 units, with a minimum of one step.

        Returns:
            The state captured after scrolling.
        """
        self._run_cmd(["xdotool", "mousemove", str(x), str(y)])

        clicks = max(1, magnitude // 100)

        # Unknown directions fall back to scrolling down, as before.
        button = self._SCROLL_BUTTONS.get(direction, "5")
        for _ in range(clicks):
            self._run_cmd(["xdotool", "click", button])
            time.sleep(0.1)

        return self.current_state()

    def wait_5_seconds(self) -> EnvState:
        """Sleep for five seconds and return the state afterwards."""
        time.sleep(5)
        return self.current_state()

    def go_back(self) -> EnvState:
        """Send Alt+Left to the focused window and return the state."""
        self._run_cmd(["xdotool", "key", "alt+Left"])
        return self.current_state()

    def go_forward(self) -> EnvState:
        """Send Alt+Right to the focused window and return the state."""
        self._run_cmd(["xdotool", "key", "alt+Right"])
        return self.current_state()

    def search(self) -> EnvState:
        """Launch the browser, then send Ctrl+L and return the state."""
        self.open_web_browser()
        time.sleep(1)
        self._run_cmd(["xdotool", "key", "ctrl+l"])
        return self.current_state()

    def navigate(self, url: str) -> EnvState:
        """Launch Firefox with ``url``, wait 3 seconds, and return the state."""
        self._launch_firefox(url)
        time.sleep(3)
        return self.current_state()

    def key_combination(self, keys: List[str]) -> EnvState:
        """Press ``keys`` together as a single chord.

        Args:
            keys: Key names understood by ``xdotool key``; they are joined
                with ``+`` to form one chord (e.g. ``["ctrl", "c"]``).

        Returns:
            The state captured after the key press.
        """
        key_str = "+".join(keys)
        self._run_cmd(["xdotool", "key", key_str])
        return self.current_state()

    def drag_and_drop(
        self, x: int, y: int, destination_x: int, destination_y: int
    ) -> EnvState:
        """Drag with the left button from ``(x, y)`` to the destination.

        Returns:
            The state captured after the button is released.
        """
        self._run_cmd(["xdotool", "mousemove", str(x), str(y)])
        self._run_cmd(["xdotool", "mousedown", "1"])
        try:
            time.sleep(0.2)
            self._run_cmd(["xdotool", "mousemove", str(destination_x), str(destination_y)])
            time.sleep(0.2)
        finally:
            # Always release the button, otherwise a failed move would leave
            # it held down for every later action.
            self._run_cmd(["xdotool", "mouseup", "1"])
        return self.current_state()
|
|