Yihan-Wang's picture
Upload folder using huggingface_hub
4d2fcd2 verified
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import termcolor
import time
import os
import sys
from ..computer import (
Computer,
EnvState,
)
import playwright.sync_api
from playwright.sync_api import sync_playwright
from typing import Literal
import glob
# Translation table from user-friendly (lower-case) key names to the canonical
# key names Playwright expects. Playwright tolerates some case differences, but
# the canonical spelling is used here to be safe.
# See: https://playwright.dev/docs/api/class-keyboard#keyboard-press
# Plain character keys such as 'a', 'b', '1', '$' have no entry and are passed
# through unchanged by callers.
PLAYWRIGHT_KEY_MAP = {
    # Editing, whitespace and modifier keys.
    "backspace": "Backspace",
    "tab": "Tab",
    "return": "Enter",  # Playwright calls the Return key 'Enter'.
    "enter": "Enter",
    "shift": "Shift",
    "control": "ControlOrMeta",
    "alt": "Alt",
    "escape": "Escape",
    "space": "Space",  # A literal " " works as well.
    # Navigation keys.
    "pageup": "PageUp",
    "pagedown": "PageDown",
    "end": "End",
    "home": "Home",
    "left": "ArrowLeft",
    "up": "ArrowUp",
    "right": "ArrowRight",
    "down": "ArrowDown",
    "insert": "Insert",
    "delete": "Delete",
    # Punctuation given by name; these map to the actual characters.
    "semicolon": ";",
    "equals": "=",
    # Numpad keys ('-', '.', '/' also work when only the character is wanted).
    "multiply": "Multiply",
    "add": "Add",
    "separator": "Separator",
    "subtract": "Subtract",
    "decimal": "Decimal",
    "divide": "Divide",
    # Function keys: "f1" -> "F1" ... "f12" -> "F12".
    **{f"f{number}": f"F{number}" for number in range(1, 13)},
    # 'Meta' is Command on macOS and the Windows key on Windows.
    "command": "Meta",
}
class PlaywrightComputer(Computer):
    """Connects to a local Playwright instance.

    Use as a context manager: ``__enter__`` launches Chromium, opens a single
    page on ``initial_url`` and restores any previously saved storage state;
    ``__exit__`` saves the storage state and shuts everything down.

    Every action method returns the ``EnvState`` produced by
    ``current_state()`` (a PNG screenshot of the viewport plus the page URL).
    """

    # All paths are relative to the current working directory.
    _LOGS_DIR = "logs"
    _USER_DATA_DIR = "cache/user_data"
    _STATE_PATH = "cache/user_data/state.json"
    # NOTE(review): wait_5_seconds() sleeps this long, which does not match
    # its name. The original code hard-coded 20; confirm the intended value.
    _WAIT_SECONDS = 20
    # Values of a boolean environment variable that mean "off".
    _FALSE_ENV_VALUES = ("", "0", "false", "no", "off")

    def __init__(
        self,
        screen_size: tuple[int, int],
        initial_url: str = "https://www.google.com",
        search_engine_url: str = "https://www.google.com",
        highlight_mouse: bool = False,
    ):
        """Store the configuration; nothing is launched until ``__enter__``.

        Args:
            screen_size: (width, height) used for the browser viewport.
            initial_url: URL opened when the session starts.
            search_engine_url: URL opened by ``search()``.
            highlight_mouse: When true, draw a short-lived red circle at the
                target of every pointer action.
        """
        self._initial_url = initial_url
        self._screen_size = screen_size
        self._search_engine_url = search_engine_url
        self._highlight_mouse = highlight_mouse
        # Runtime handles, populated by __enter__. Initialising them here
        # keeps teardown safe even if start-up fails half way through.
        self._playwright = None
        self._browser = None
        self._context = None
        self._page = None

    def _handle_new_page(self, new_page: playwright.sync_api.Page):
        """The Computer Use model only supports a single tab at the moment.

        Some websites, however, try to open links in a new tab.
        For those situations, we intercept the page-opening behavior, and
        instead overwrite the current page.
        """
        new_url = new_page.url
        new_page.close()
        self._page.goto(new_url)

    def cleanup_old_screenshots(self, logs_dir="logs", max_count=30):
        """Delete old screenshot files, keeping only the newest ``max_count``.

        Only files matching ``screenshot_*.png`` directly inside ``logs_dir``
        are considered; "newest" is decided by modification time.

        Args:
            logs_dir: Directory to clean. If it does not exist nothing happens.
            max_count: Number of most recent screenshots to keep.

        Raises:
            ValueError: If a file that should be deleted cannot be removed.
        """
        if not os.path.exists(logs_dir):
            return
        png_files = glob.glob(os.path.join(logs_dir, "screenshot_*.png"))
        if len(png_files) <= max_count:
            return
        # Newest first, so everything past index max_count is surplus.
        png_files.sort(key=os.path.getmtime, reverse=True)
        for file_path in png_files[max_count:]:
            try:
                os.remove(file_path)
            except OSError as e:
                raise ValueError(
                    f"Failed to delete screenshots {file_path}: {e}"
                ) from e

    @classmethod
    def _env_flag(cls, name: str) -> bool:
        """Return True when environment variable ``name`` is set to an "on" value.

        Unset, empty, "0", "false", "no" and "off" (case-insensitive) are
        treated as off; a plain ``bool()`` on the string would wrongly treat
        "0" and "false" as on.
        """
        raw_value = os.environ.get(name, "")
        return raw_value.strip().lower() not in cls._FALSE_ENV_VALUES

    def __enter__(self):
        """Launch the browser session and return ``self``.

        If any start-up step fails, whatever was already launched is shut down
        before the exception propagates, because ``__exit__`` is not called
        when ``__enter__`` raises.
        """
        print("Creating session...")
        self._playwright = sync_playwright().start()
        try:
            self._open_browser_session()
        except BaseException:
            try:
                self._release_resources(save_state=False)
            except Exception as cleanup_error:
                # Keep the original start-up failure as the visible error.
                logging.warning(
                    "Cleanup after a failed start also failed: %s",
                    cleanup_error,
                )
            raise
        termcolor.cprint(
            "Started local playwright.",
            color="green",
            attrs=["bold"],
        )
        return self

    def _open_browser_session(self) -> None:
        """Launch Chromium, create the context and page, and open the initial URL."""
        self._browser = self._playwright.chromium.launch(
            args=[
                "--disable-extensions",
                "--disable-file-system",
                "--disable-plugins",
                "--disable-dev-shm-usage",
                "--disable-background-networking",
                "--disable-default-apps",
                "--disable-sync",
                # No '--no-sandbox' arg means the sandbox is on.
            ],
            headless=self._env_flag("PLAYWRIGHT_HEADLESS"),
        )
        self.cleanup_old_screenshots()
        # Directory holding the persisted storage state, so that logins
        # survive between sessions.
        os.makedirs(self._USER_DATA_DIR, exist_ok=True)
        storage_state = (
            self._STATE_PATH if os.path.exists(self._STATE_PATH) else None
        )
        self._context = self._browser.new_context(
            viewport={
                "width": self._screen_size[0],
                "height": self._screen_size[1],
            },
            storage_state=storage_state,
        )
        self._page = self._context.new_page()
        self._page.goto(self._initial_url, wait_until="domcontentloaded")
        # Listener fired whenever the context opens a new tab: the new tab is
        # closed and its URL is loaded in the current page instead, because
        # only one tab is supported at a time.
        self._context.on("page", self._handle_new_page)
        self._collapse_chat_nav()

    def _collapse_chat_nav(self) -> None:
        """Collapse the left-hand chat-history sidebar if it is expanded.

        This aligns the UI with the description of the genflow UI given in the
        prompt; on pages without that sidebar it only prints a notice.

        Raises:
            ValueError: If probing or clicking the sidebar fails; the
                underlying error is attached as ``__cause__``.
        """
        try:
            chat_nav = self._page.locator('div[data-v-74198486].chat-nav')
            if chat_nav.count() == 0:
                print("未找到左侧历史消息扩展栏元素")
                return
            class_name = chat_nav.get_attribute("class")
            print(class_name)
            # A missing class attribute means the sidebar is not expanded.
            if not class_name or "show" not in class_name:
                return
            expand_icon = self._page.locator('.expand-icon')
            if expand_icon.count() > 0:
                expand_icon.click()
            else:
                print("未找到左侧历史消息扩展栏点击元素")
        except Exception as e:
            raise ValueError("未找到左侧历史消息扩展栏元素") from e

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Save the storage state and shut down the context, browser and driver."""
        self._release_resources(save_state=True)

    def _release_resources(self, save_state: bool) -> None:
        """Close context, browser and driver, attempting every step.

        Each step runs in a ``finally`` so that a failure in an earlier step
        (for example saving state after the browser already died) does not
        leave the browser process or the Playwright driver running. The first
        error still propagates after the remaining steps have run.

        Args:
            save_state: When true, write the context's storage state to the
                state file before closing the context.
        """
        context = self._context
        browser = self._browser
        driver = self._playwright
        # Forget the handles first so a repeated teardown is a no-op.
        self._page = None
        self._context = None
        self._browser = None
        self._playwright = None
        try:
            if context:
                try:
                    if save_state:
                        context.storage_state(path=self._STATE_PATH)
                finally:
                    context.close()
        finally:
            try:
                if browser:
                    self._close_browser(browser)
            finally:
                if driver:
                    driver.stop()

    @staticmethod
    def _close_browser(browser) -> None:
        """Close ``browser``, tolerating the 'connection already closed' error."""
        try:
            browser.close()
        except Exception as e:
            # Browser was already shut down because of SIGINT or such.
            already_closed = (
                "Browser.close: Connection closed while reading from the driver"
                in str(e)
            )
            if not already_closed:
                raise

    def open_web_browser(self) -> EnvState:
        """Return the current state; the browser is already open."""
        return self.current_state()

    def click_at(self, x: int, y: int):
        """Click at viewport coordinates (x, y) and return the new state."""
        self.highlight_mouse(x, y)
        self._page.mouse.click(x, y)
        self._page.wait_for_load_state()
        return self.current_state()

    def hover_at(self, x: int, y: int):
        """Move the mouse to viewport coordinates (x, y) and return the new state."""
        self.highlight_mouse(x, y)
        self._page.mouse.move(x, y)
        self._page.wait_for_load_state()
        return self.current_state()

    def type_text_at(
        self,
        x: int,
        y: int,
        text: str,
        press_enter: bool = False,
        clear_before_typing: bool = True,
    ) -> EnvState:
        """Click at (x, y), optionally clear the field, then type ``text``.

        Args:
            x: Viewport x coordinate to click before typing.
            y: Viewport y coordinate to click before typing.
            text: Text to type with the keyboard.
            press_enter: Press Enter after typing.
            clear_before_typing: Select all and delete before typing.
        """
        self.highlight_mouse(x, y)
        self._page.mouse.click(x, y)
        self._page.wait_for_load_state()

        if clear_before_typing:
            select_all_modifier = (
                "Command" if sys.platform == "darwin" else "Control"
            )
            self.key_combination([select_all_modifier, "A"])
            self.key_combination(["Delete"])

        self._page.keyboard.type(text)
        self._page.wait_for_load_state()

        if press_enter:
            self.key_combination(["Enter"])
        self._page.wait_for_load_state()
        return self.current_state()

    def _horizontal_document_scroll(
        self, direction: Literal["left", "right"]
    ) -> EnvState:
        """Scroll the document sideways by half the viewport width."""
        # Scroll by 50% of the viewport size.
        horizontal_scroll_amount = self.screen_size()[0] // 2
        sign = "-" if direction == "left" else ""
        scroll_argument = f"{sign}{horizontal_scroll_amount}"
        # Scroll using JS.
        self._page.evaluate(f"window.scrollBy({scroll_argument}, 0); ")
        self._page.wait_for_load_state()
        return self.current_state()

    def scroll_document(
        self, direction: Literal["up", "down", "left", "right"]
    ) -> EnvState:
        """Scroll the whole document one step in ``direction``.

        Vertical scrolling presses PageUp/PageDown; horizontal scrolling uses
        JavaScript.

        Raises:
            ValueError: If ``direction`` is not one of the four supported values.
        """
        if direction == "down":
            return self.key_combination(["PageDown"])
        if direction == "up":
            return self.key_combination(["PageUp"])
        if direction in ("left", "right"):
            return self._horizontal_document_scroll(direction)
        raise ValueError(f"Unsupported direction: {direction}")

    def scroll_at(
        self,
        x: int,
        y: int,
        direction: Literal["up", "down", "left", "right"],
        magnitude: int = 800,
    ) -> EnvState:
        """Move the mouse to (x, y) and turn the wheel in ``direction``.

        Args:
            x: Viewport x coordinate to place the mouse at.
            y: Viewport y coordinate to place the mouse at.
            direction: Scroll direction.
            magnitude: Wheel delta passed to Playwright.

        Raises:
            ValueError: If ``direction`` is not one of the four supported values.
        """
        wheel_deltas = {
            "up": (0, -magnitude),
            "down": (0, magnitude),
            "left": (-magnitude, 0),
            "right": (magnitude, 0),
        }
        # Validate before touching the page so a bad direction has no side effects.
        if direction not in wheel_deltas:
            raise ValueError(f"Unsupported direction: {direction}")
        dx, dy = wheel_deltas[direction]

        self.highlight_mouse(x, y)
        self._page.mouse.move(x, y)
        self._page.wait_for_load_state()

        self._page.mouse.wheel(dx, dy)
        self._page.wait_for_load_state()
        return self.current_state()

    def wait_5_seconds(self) -> EnvState:
        """Sleep, then return the current state.

        NOTE(review): despite the name this sleeps ``_WAIT_SECONDS`` (20)
        seconds, as the original code did; confirm which duration is intended.
        """
        time.sleep(self._WAIT_SECONDS)
        return self.current_state()

    def go_back(self) -> EnvState:
        """Navigate one entry back in the page history."""
        self._page.go_back()
        self._page.wait_for_load_state()
        return self.current_state()

    def go_forward(self) -> EnvState:
        """Navigate one entry forward in the page history."""
        self._page.go_forward()
        self._page.wait_for_load_state()
        return self.current_state()

    def search(self) -> EnvState:
        """Navigate to the configured search engine URL."""
        return self.navigate(self._search_engine_url)

    def navigate(self, url: str) -> EnvState:
        """Load ``url`` in the current page.

        ``https://`` is prepended when ``url`` does not already start with
        ``http://`` or ``https://``.
        """
        normalized_url = url
        if not normalized_url.startswith(("http://", "https://")):
            normalized_url = "https://" + normalized_url
        self._page.goto(normalized_url)
        self._page.wait_for_load_state()
        return self.current_state()

    def key_combination(self, keys: list[str]) -> EnvState:
        """Press a key chord: hold every key but the last, then press the last.

        Key names are translated through ``PLAYWRIGHT_KEY_MAP``
        (case-insensitively); names without an entry are passed unchanged.

        Args:
            keys: Keys of the chord, modifiers first. Must not be empty.

        Raises:
            ValueError: If ``keys`` is empty.
        """
        if not keys:
            raise ValueError("key_combination requires at least one key")
        # Normalize all keys to the Playwright compatible version.
        mapped_keys = [PLAYWRIGHT_KEY_MAP.get(k.lower(), k) for k in keys]
        *modifiers, final_key = mapped_keys

        held_keys = []
        try:
            for key in modifiers:
                self._page.keyboard.down(key)
                held_keys.append(key)
            self._page.keyboard.press(final_key)
        finally:
            # Always release what was pressed, in reverse order, so a failed
            # press does not leave modifiers stuck down for later actions.
            for key in reversed(held_keys):
                self._page.keyboard.up(key)

        return self.current_state()

    def drag_and_drop(
        self, x: int, y: int, destination_x: int, destination_y: int
    ) -> EnvState:
        """Press the mouse at (x, y), move to the destination and release."""
        self.highlight_mouse(x, y)
        self._page.mouse.move(x, y)
        self._page.wait_for_load_state()
        self._page.mouse.down()
        self._page.wait_for_load_state()

        self.highlight_mouse(destination_x, destination_y)
        self._page.mouse.move(destination_x, destination_y)
        self._page.wait_for_load_state()
        self._page.mouse.up()
        return self.current_state()

    def current_state(self) -> EnvState:
        """Capture the viewport and return it together with the page URL.

        As a side effect the screenshot is also written to the logs directory
        as ``screenshot_<timestamp>.png``.
        """
        self._page.wait_for_load_state()
        # Even if Playwright reports the page as loaded, it may not be so.
        # Add a manual sleep to make sure the page has finished rendering.
        time.sleep(0.5)
        screenshot_bytes = self._page.screenshot(type="png", full_page=False)

        # Save the screenshot to disk for later inspection.
        from datetime import datetime

        # The directory may not exist yet on a fresh checkout.
        os.makedirs(self._LOGS_DIR, exist_ok=True)
        # Microseconds keep two captures within the same second from
        # overwriting each other.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = os.path.join(self._LOGS_DIR, f"screenshot_{timestamp}.png")
        with open(filename, "wb") as f:
            f.write(screenshot_bytes)
        return EnvState(screenshot=screenshot_bytes, url=self._page.url)

    def screen_size(self) -> tuple[int, int]:
        """Return (width, height) of the viewport.

        Uses the live Playwright viewport size when available and falls back
        to the size given to the constructor otherwise.
        """
        viewport_size = self._page.viewport_size
        # If available, try to take the local playwright viewport size.
        if viewport_size:
            return viewport_size["width"], viewport_size["height"]
        # If unavailable, fall back to the original provided size.
        return self._screen_size

    def highlight_mouse(self, x: int, y: int):
        """Briefly draw a red circle centred on (x, y), if highlighting is on.

        Does nothing unless the instance was created with
        ``highlight_mouse=True``. Blocks for one second so a human observer
        can see the marker.
        """
        if not self._highlight_mouse:
            return
        # Coordinates are passed as an argument rather than formatted into the
        # script, and the marker is removed (not just hidden) after two
        # seconds so repeated calls do not pile up same-id elements in the DOM.
        self._page.evaluate(
            """
            ([x, y]) => {
                const element_id = "playwright-feedback-circle";
                const div = document.createElement('div');
                div.id = element_id;
                div.style.pointerEvents = 'none';
                div.style.border = '4px solid red';
                div.style.borderRadius = '50%';
                div.style.width = '20px';
                div.style.height = '20px';
                div.style.position = 'fixed';
                div.style.zIndex = '9999';
                div.style.left = (x - 10) + 'px';
                div.style.top = (y - 10) + 'px';
                document.body.appendChild(div);
                setTimeout(() => {
                    div.remove();
                }, 2000);
            }
            """,
            [x, y],
        )
        # Wait a bit for the user to see the cursor.
        time.sleep(1)