Spaces:
Build error
Build error
| # Copyright 2025 Google LLC | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import logging | |
| import termcolor | |
| import time | |
| import os | |
| import sys | |
| from ..computer import ( | |
| Computer, | |
| EnvState, | |
| ) | |
| import playwright.sync_api | |
| from playwright.sync_api import sync_playwright | |
| from typing import Literal | |
| import glob | |
# Translation table from the lower-cased key names callers supply to the
# canonical key names handed to Playwright's keyboard API.
# See: https://playwright.dev/docs/api/class-keyboard#keyboard-press
# Names that are absent from this table (plain characters such as 'a', 'b',
# '1' or '$') are meant to be passed to Playwright unchanged.
PLAYWRIGHT_KEY_MAP = {
    # Editing and whitespace keys.
    "backspace": "Backspace",
    "tab": "Tab",
    "return": "Enter",  # Playwright spells this key 'Enter'.
    "enter": "Enter",
    # Modifiers.
    "shift": "Shift",
    "control": "ControlOrMeta",
    "alt": "Alt",
    "escape": "Escape",
    "space": "Space",  # A literal " " would also work.
    # Navigation keys.
    "pageup": "PageUp",
    "pagedown": "PageDown",
    "end": "End",
    "home": "Home",
    "left": "ArrowLeft",
    "up": "ArrowUp",
    "right": "ArrowRight",
    "down": "ArrowDown",
    "insert": "Insert",
    "delete": "Delete",
    # Punctuation that callers refer to by name; mapped to the character.
    "semicolon": ";",
    "equals": "=",
    # Numeric keypad operators.
    # NOTE(review): confirm Playwright accepts these exact names -- its key
    # list may expect a "Numpad" prefix or the plain character instead.
    "multiply": "Multiply",
    "add": "Add",
    "separator": "Separator",
    "subtract": "Subtract",
    "decimal": "Decimal",
    "divide": "Divide",
    # Function keys: "f1" -> "F1" through "f12" -> "F12".
    **{f"f{number}": f"F{number}" for number in range(1, 13)},
    # 'Meta' is Command on macOS and the Windows key on Windows.
    "command": "Meta",
}
class PlaywrightComputer(Computer):
    """Connects to a local Playwright instance.

    The object is a context manager: ``__enter__`` launches Chromium, opens a
    single page on ``initial_url`` and returns ``self``; ``__exit__`` saves
    the browser storage state and shuts everything down. Every action method
    returns the state produced by ``current_state()`` (a PNG screenshot of the
    viewport plus the current URL).
    """

    # Directory that receives one PNG per call to ``current_state``.
    _LOGS_DIR = "logs"
    # Directory and file used to persist cookies/local storage between runs,
    # so a login survives a restart.
    _USER_DATA_DIR = "cache/user_data"
    _STATE_PATH = "cache/user_data/state.json"

    def __init__(
        self,
        screen_size: tuple[int, int],
        initial_url: str = "https://www.google.com",
        search_engine_url: str = "https://www.google.com",
        highlight_mouse: bool = False,
    ):
        """Store the configuration; no browser is started until ``__enter__``.

        Args:
            screen_size: ``(width, height)`` used for the browser viewport.
            initial_url: Page opened when the session starts.
            search_engine_url: Page opened by ``search()``.
            highlight_mouse: When true, draw a short-lived red circle at the
                coordinates of each pointer action.
        """
        self._initial_url = initial_url
        self._screen_size = screen_size
        self._search_engine_url = search_engine_url
        self._highlight_mouse = highlight_mouse
        # Handles created by __enter__. Defined here so that teardown can
        # test them safely even when start-up stopped half-way.
        self._playwright = None
        self._browser = None
        self._context = None
        self._page = None

    def _handle_new_page(self, new_page: playwright.sync_api.Page):
        """The Computer Use model only supports a single tab at the moment.

        Some websites, however, try to open links in a new tab.
        For those situations, we intercept the page-opening behavior, and
        instead overwrite the current page.
        """
        # NOTE(review): the URL is read as soon as the "page" event fires;
        # confirm it already holds the final target at that point.
        new_url = new_page.url
        new_page.close()
        self._page.goto(new_url)

    def cleanup_old_screenshots(self, logs_dir="logs", max_count=30):
        """Delete old screenshots, keeping only the newest ``max_count``.

        Args:
            logs_dir: Directory searched for ``screenshot_*.png`` files. A
                missing directory is not an error; nothing is done.
            max_count: Number of most recently modified files to keep.

        Raises:
            ValueError: A file selected for deletion could not be removed.
                The underlying ``OSError`` is attached as the cause.
        """
        if not os.path.exists(logs_dir):
            return

        png_files = glob.glob(os.path.join(logs_dir, "screenshot_*.png"))
        if len(png_files) <= max_count:
            return

        # Newest first, so everything past index max_count is stale.
        png_files.sort(key=os.path.getmtime, reverse=True)
        for file_path in png_files[max_count:]:
            try:
                os.remove(file_path)
            except OSError as e:
                raise ValueError(
                    f"Failed to delete screenshots {file_path}: {e}"
                ) from e

    @staticmethod
    def _headless_requested() -> bool:
        """Return whether PLAYWRIGHT_HEADLESS asks for a headless browser.

        Unset, empty, "0", "false", "no" and "off" (case-insensitive) mean
        headed; any other value means headless. A plain ``bool()`` on the raw
        string would treat "0" and "false" as true.
        """
        raw_value = os.environ.get("PLAYWRIGHT_HEADLESS", "")
        return raw_value.strip().lower() not in ("", "0", "false", "no", "off")

    def _collapse_chat_history_sidebar(self):
        """Collapse the left-hand chat-history sidebar if it is expanded.

        This aligns the page with the description of the genflow UI given in
        the prompt. It is only meaningful when the initial page is genflow.

        Raises:
            ValueError: Any failure while probing or clicking the sidebar.
        """
        try:
            chat_nav = self._page.locator('div[data-v-74198486].chat-nav')
            is_expand = None
            if chat_nav.count() > 0:
                # get_attribute returns None when the attribute is absent.
                class_name = chat_nav.get_attribute("class") or ""
                is_expand = "show" in class_name
                print(class_name)
            else:
                print("未找到左侧历史消息扩展栏元素")
            if is_expand:
                expand_icon = self._page.locator('.expand-icon')
                if expand_icon.count() > 0:
                    expand_icon.click()
                else:
                    print("未找到左侧历史消息扩展栏点击元素")
        except Exception as e:
            raise ValueError("未找到左侧历史消息扩展栏元素") from e

    def _shutdown(self, save_state: bool) -> None:
        """Release the context, browser and Playwright driver.

        Each step is attempted even when an earlier one fails, and the driver
        is always stopped, so no browser process is left behind.

        Args:
            save_state: Persist the storage state to ``_STATE_PATH`` before
                closing the context.
        """
        try:
            if self._context:
                if save_state:
                    try:
                        self._context.storage_state(path=self._STATE_PATH)
                    except Exception as e:
                        # Saving is not possible once the browser has gone
                        # away (e.g. after SIGINT); keep tearing down.
                        logging.warning(
                            "Could not save browser storage state: %s", e
                        )
                try:
                    self._context.close()
                except Exception as e:
                    logging.warning("Could not close browser context: %s", e)
            if self._browser:
                try:
                    self._browser.close()
                except Exception as e:
                    # Browser was already shut down because of SIGINT or such.
                    if (
                        "Browser.close: Connection closed while reading from the driver"
                        not in str(e)
                    ):
                        raise
        finally:
            driver = self._playwright
            self._page = None
            self._context = None
            self._browser = None
            self._playwright = None
            if driver:
                driver.stop()

    def __enter__(self):
        """Start Chromium, open the initial page and return ``self``.

        If any start-up step fails, everything created so far is shut down
        before the exception propagates, because ``__exit__`` is not invoked
        when ``__enter__`` raises.
        """
        print("Creating session...")
        self._playwright = sync_playwright().start()
        try:
            self._browser = self._playwright.chromium.launch(
                args=[
                    "--disable-extensions",
                    "--disable-file-system",
                    "--disable-plugins",
                    "--disable-dev-shm-usage",
                    "--disable-background-networking",
                    "--disable-default-apps",
                    "--disable-sync",
                    # No '--no-sandbox' arg means the sandbox is on.
                ],
                headless=self._headless_requested(),
            )
            self.cleanup_old_screenshots()

            # Reuse the storage state saved by a previous session, if any,
            # to avoid logging in again.
            os.makedirs(self._USER_DATA_DIR, exist_ok=True)
            storage_state = (
                self._STATE_PATH if os.path.exists(self._STATE_PATH) else None
            )
            self._context = self._browser.new_context(
                viewport={
                    "width": self._screen_size[0],
                    "height": self._screen_size[1],
                },
                storage_state=storage_state,
            )
            self._page = self._context.new_page()
            self._page.goto(self._initial_url, wait_until="domcontentloaded")

            # Fires whenever the context opens a new tab. The new tab is
            # closed and its URL is loaded in the current page instead,
            # because only one tab is supported at a time.
            self._context.on("page", self._handle_new_page)

            self._collapse_chat_history_sidebar()
        except BaseException:
            try:
                # Do not overwrite a previously saved state with that of a
                # session that never finished starting.
                self._shutdown(save_state=False)
            except Exception:
                logging.exception("Cleanup after failed start-up also failed")
            raise

        termcolor.cprint(
            "Started local playwright.",
            color="green",
            attrs=["bold"],
        )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Save the storage state and shut the browser down."""
        self._shutdown(save_state=True)

    def open_web_browser(self) -> EnvState:
        """Return the current state; the browser is already open."""
        return self.current_state()

    def click_at(self, x: int, y: int):
        """Click at viewport coordinates ``(x, y)`` and return the new state."""
        self.highlight_mouse(x, y)
        self._page.mouse.click(x, y)
        self._page.wait_for_load_state()
        return self.current_state()

    def hover_at(self, x: int, y: int):
        """Move the pointer to ``(x, y)`` and return the new state."""
        self.highlight_mouse(x, y)
        self._page.mouse.move(x, y)
        self._page.wait_for_load_state()
        return self.current_state()

    def type_text_at(
        self,
        x: int,
        y: int,
        text: str,
        press_enter: bool = False,
        clear_before_typing: bool = True,
    ) -> EnvState:
        """Click at ``(x, y)`` and type ``text`` there.

        Args:
            x: Horizontal viewport coordinate of the click.
            y: Vertical viewport coordinate of the click.
            text: Text typed through the keyboard.
            press_enter: Press Enter after typing.
            clear_before_typing: Select all and delete before typing.

        Returns:
            The state after all key presses.
        """
        self.highlight_mouse(x, y)
        self._page.mouse.click(x, y)
        self._page.wait_for_load_state()

        if clear_before_typing:
            select_all_modifier = "Command" if sys.platform == "darwin" else "Control"
            self.key_combination([select_all_modifier, "A"])
            self.key_combination(["Delete"])

        self._page.keyboard.type(text)
        self._page.wait_for_load_state()

        if press_enter:
            self.key_combination(["Enter"])
        self._page.wait_for_load_state()
        return self.current_state()

    def _horizontal_document_scroll(
        self, direction: Literal["left", "right"]
    ) -> EnvState:
        """Scroll the window sideways by half the viewport width."""
        # Scroll by 50% of the viewport size; negative values scroll left.
        horizontal_scroll_amount = self.screen_size()[0] // 2
        sign = "-" if direction == "left" else ""
        # Scroll using JS.
        self._page.evaluate(
            f"window.scrollBy({sign}{horizontal_scroll_amount}, 0); "
        )
        self._page.wait_for_load_state()
        return self.current_state()

    def scroll_document(
        self, direction: Literal["up", "down", "left", "right"]
    ) -> EnvState:
        """Scroll the whole document one step in ``direction``.

        Vertical scrolling presses PageUp/PageDown; horizontal scrolling uses
        ``window.scrollBy``.

        Raises:
            ValueError: ``direction`` is not one of the four supported values.
        """
        if direction == "down":
            return self.key_combination(["PageDown"])
        if direction == "up":
            return self.key_combination(["PageUp"])
        if direction in ("left", "right"):
            return self._horizontal_document_scroll(direction)
        raise ValueError(f"Unsupported direction: {direction}")

    def scroll_at(
        self,
        x: int,
        y: int,
        direction: Literal["up", "down", "left", "right"],
        magnitude: int = 800,
    ) -> EnvState:
        """Turn the mouse wheel by ``magnitude`` with the pointer at ``(x, y)``.

        Raises:
            ValueError: ``direction`` is not one of the four supported values.
        """
        self.highlight_mouse(x, y)
        self._page.mouse.move(x, y)
        self._page.wait_for_load_state()

        wheel_deltas = {
            "up": (0, -magnitude),
            "down": (0, magnitude),
            "left": (-magnitude, 0),
            "right": (magnitude, 0),
        }
        if direction not in wheel_deltas:
            raise ValueError(f"Unsupported direction: {direction}")
        dx, dy = wheel_deltas[direction]

        self._page.mouse.wheel(dx, dy)
        self._page.wait_for_load_state()
        return self.current_state()

    def wait_5_seconds(self) -> EnvState:
        """Pause, then return the current state.

        NOTE(review): despite the name, this blocks for 20 seconds. The
        duration is kept as found; confirm the longer pause is intentional.
        """
        time.sleep(20)
        return self.current_state()

    def go_back(self) -> EnvState:
        """Navigate one step back in the page history."""
        self._page.go_back()
        self._page.wait_for_load_state()
        return self.current_state()

    def go_forward(self) -> EnvState:
        """Navigate one step forward in the page history."""
        self._page.go_forward()
        self._page.wait_for_load_state()
        return self.current_state()

    def search(self) -> EnvState:
        """Open the configured search engine URL."""
        return self.navigate(self._search_engine_url)

    def navigate(self, url: str) -> EnvState:
        """Load ``url``, prefixing ``https://`` when no http(s) scheme is given."""
        normalized_url = url
        if not normalized_url.startswith(("http://", "https://")):
            normalized_url = "https://" + normalized_url
        self._page.goto(normalized_url)
        self._page.wait_for_load_state()
        return self.current_state()

    def key_combination(self, keys: list[str]) -> EnvState:
        """Press a key chord: hold all but the last key, then press the last.

        Args:
            keys: Key names, looked up case-insensitively in
                ``PLAYWRIGHT_KEY_MAP``; unknown names are passed through
                unchanged.

        Raises:
            ValueError: ``keys`` is empty.
        """
        if not keys:
            raise ValueError("key_combination requires at least one key")

        # Normalize all keys to the Playwright compatible version.
        *modifiers, final_key = [
            PLAYWRIGHT_KEY_MAP.get(k.lower(), k) for k in keys
        ]

        held = []
        try:
            for key in modifiers:
                self._page.keyboard.down(key)
                held.append(key)
            self._page.keyboard.press(final_key)
        finally:
            # Release in reverse order, even when a press failed, so no
            # modifier stays stuck down for later actions.
            for key in reversed(held):
                self._page.keyboard.up(key)

        return self.current_state()

    def drag_and_drop(
        self, x: int, y: int, destination_x: int, destination_y: int
    ) -> EnvState:
        """Press the mouse at ``(x, y)`` and release it at the destination."""
        self.highlight_mouse(x, y)
        self._page.mouse.move(x, y)
        self._page.wait_for_load_state()
        self._page.mouse.down()
        self._page.wait_for_load_state()

        self.highlight_mouse(destination_x, destination_y)
        self._page.mouse.move(destination_x, destination_y)
        self._page.wait_for_load_state()
        self._page.mouse.up()
        return self.current_state()

    def current_state(self) -> EnvState:
        """Capture the viewport and return it with the current URL.

        A copy of every screenshot is also written to the ``logs`` directory.
        """
        from datetime import datetime

        self._page.wait_for_load_state()
        # Even if Playwright reports the page as loaded, it may not be so.
        # Add a manual sleep to make sure the page has finished rendering.
        time.sleep(0.5)
        screenshot_bytes = self._page.screenshot(type="png", full_page=False)

        # The directory may not exist yet on a fresh checkout.
        os.makedirs(self._LOGS_DIR, exist_ok=True)
        # Microseconds keep two captures taken within the same second from
        # overwriting each other.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = os.path.join(self._LOGS_DIR, f"screenshot_{timestamp}.png")
        with open(filename, "wb") as f:
            f.write(screenshot_bytes)

        return EnvState(screenshot=screenshot_bytes, url=self._page.url)

    def screen_size(self) -> tuple[int, int]:
        """Return ``(width, height)`` of the viewport.

        Falls back to the size given to the constructor when the page reports
        no viewport size.
        """
        viewport_size = self._page.viewport_size
        # If available, try to take the local playwright viewport size.
        if viewport_size:
            return viewport_size["width"], viewport_size["height"]
        # If unavailable, fall back to the original provided size.
        return self._screen_size

    def highlight_mouse(self, x: int, y: int):
        """Flash a red circle centred on ``(x, y)`` when highlighting is on.

        Does nothing unless the instance was created with
        ``highlight_mouse=True``. Blocks for one second so the marker can be
        seen before the action happens.
        """
        if not self._highlight_mouse:
            return
        # The coordinates are passed as an argument rather than spliced into
        # the script, and the marker is removed (not just hidden) after two
        # seconds so repeated calls do not pile up elements sharing one id.
        self._page.evaluate(
            """
            ([x, y]) => {
                const div = document.createElement('div');
                div.id = "playwright-feedback-circle";
                div.style.pointerEvents = 'none';
                div.style.border = '4px solid red';
                div.style.borderRadius = '50%';
                div.style.width = '20px';
                div.style.height = '20px';
                div.style.position = 'fixed';
                div.style.zIndex = '9999';
                div.style.left = (x - 10) + 'px';
                div.style.top = (y - 10) + 'px';
                document.body.appendChild(div);

                setTimeout(() => {
                    div.remove();
                }, 2000);
            }
            """,
            [x, y],
        )
        # Wait a bit for the user to see the cursor.
        time.sleep(1)