| import os |
| import time |
| import base64 |
| import subprocess |
| import json |
| import requests |
| from typing import Optional, Dict, Any, List |
| from pathlib import Path |
| from PIL import Image |
| import io |
| from loguru import logger |
| from .x11_computer import X11Computer |
|
|
| |
# Register a file sink for the agent's log output. The rotation and retention
# values are handed to loguru unchanged.
logger.add(
    "/app/logs/agent.log",
    rotation="100 MB",
    retention="7 days",
)
|
|
class GeminiClient:
    """Client for interacting with the Gemini API.

    The client sends a task description (and optionally a screenshot) to the
    ``gemini-2.0-flash`` ``generateContent`` endpoint and turns the reply into
    a list of action dictionaries. ``generate_actions`` reports every failure
    as a single ``{"action": "fail", ...}`` entry instead of raising.
    """

    # Upper bound, in seconds, for one HTTP request. Without it a stalled
    # connection would block the caller forever.
    REQUEST_TIMEOUT_SECONDS = 60

    def __init__(self, api_key: str):
        """Store the API key and build the endpoint URL that embeds it."""
        self.api_key = api_key
        self.url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={self.api_key}"

    def _redact(self, message: str) -> str:
        """Return *message* with the API key replaced by ``***``.

        The key is part of ``self.url``, and HTTP client errors may quote the
        URL, so error text is scrubbed before it is logged or returned.
        """
        if not self.api_key:
            # str.replace("") would insert the mask between every character.
            return message
        return message.replace(self.api_key, "***")

    @staticmethod
    def _parse_actions(text: str) -> List[Dict[str, Any]]:
        """Decode the model's text reply into a list of action dicts.

        Markdown code fences are stripped before decoding. A single JSON
        object is wrapped into a one-element list.

        Raises:
            ValueError: if the text is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or is not an object / array of objects.
        """
        cleaned = text.replace("```json", "").replace("```", "").strip()
        parsed = json.loads(cleaned)
        if isinstance(parsed, dict):
            parsed = [parsed]
        if not isinstance(parsed, list) or not all(
            isinstance(item, dict) for item in parsed
        ):
            raise ValueError("expected a JSON array of action objects")
        return parsed

    def generate_actions(self, task: str, screenshot_base64: Optional[str] = None) -> List[Dict[str, Any]]:
        """Generate actions based on task and screenshot.

        Args:
            task: Natural-language description of what should be done.
            screenshot_base64: Base64-encoded PNG of the current screen, or
                ``None`` to send the task text only.

        Returns:
            The list of action dicts decoded from the model's reply, or a
            one-element list holding a ``fail`` action when the request, the
            HTTP status, or the reply parsing fails.
        """
        system_prompt = """
        You are a Computer-Using Agent capable of controlling a Linux desktop.
        You will receive a task description and a screenshot of the current screen.

        Your goal is to generate a list of actions to accomplish the task.

        Supported actions:
        - {"action": "mousemove", "x": int, "y": int} -> Moves mouse to coordinates (click_at/hover_at)
        - {"action": "click", "button": int} -> Clicks mouse button (1=left)
        - {"action": "type", "text": str} -> Types text
        - {"action": "key", "key": str} -> Presses key combination (e.g., "Return", "ctrl+c")
        - {"action": "launch", "app": str} -> Launches application
        - {"action": "wait", "seconds": float} -> Waits
        - {"action": "done", "message": str} -> Task completed
        - {"action": "fail", "message": str} -> Task failed

        Return ONLY a JSON array of actions.
        """

        parts = [{"text": system_prompt}, {"text": f"Task: {task}"}]

        if screenshot_base64:
            parts.append({
                "inline_data": {
                    "mime_type": "image/png",
                    "data": screenshot_base64
                }
            })

        data = {
            "contents": [{"parts": parts}],
            "generationConfig": {
                "temperature": 0.1,
                "maxOutputTokens": 1024,
                "responseMimeType": "application/json"
            }
        }

        # Deliberately broad: this method is a best-effort boundary that must
        # hand back a "fail" action rather than raise into the action loop.
        try:
            response = requests.post(
                self.url,
                json=data,
                headers={"Content-Type": "application/json"},
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
        except Exception as e:
            detail = self._redact(str(e))
            logger.error(f"Request failed: {detail}")
            return [{"action": "fail", "message": f"Connection failed: {detail}"}]

        if response.status_code != 200:
            logger.error(f"Gemini API error: {response.text}")
            return [{"action": "fail", "message": f"API Error: {response.status_code}"}]

        try:
            result = response.json()
            text = result['candidates'][0]['content']['parts'][0]['text']
            return self._parse_actions(text)
        except (KeyError, IndexError, TypeError, ValueError) as e:
            # IndexError/TypeError cover empty or null candidates/parts;
            # ValueError covers a non-JSON body and malformed action JSON.
            logger.error(f"Failed to parse Gemini response: {e}")
            return [{"action": "fail", "message": "Failed to parse AI response"}]
|
|
|
|
class ComputerUsingAgent:
    """
    Computer-Using Agent that can interact with desktop environment
    using the standard Computer interface.

    One call to ``execute_task`` takes a screenshot, asks the LLM client for a
    list of actions, performs them in order and returns a result dictionary.
    """

    # Longest pause, in seconds, that a model-supplied "wait" action may
    # request; larger values are clamped so a bad reply cannot stall the agent.
    MAX_WAIT_SECONDS = 60.0

    def __init__(self):
        """Read DISPLAY / GEMINI_API_KEY from the environment and wire up helpers."""
        self.display = os.getenv("DISPLAY", ":1")
        self.computer = X11Computer(self.display)
        self.current_task = None
        self.task_status = "idle"

        # The key must come from the environment; credentials do not belong in
        # source code. With no key the agent still starts, but API calls are
        # expected to come back as "fail" actions.
        api_key = os.getenv("GEMINI_API_KEY", "")
        if not api_key:
            logger.warning("GEMINI_API_KEY is not set; Gemini requests are expected to fail")
        self.llm = GeminiClient(api_key)

        logger.info("Computer-Using Agent initialized with X11Computer")

    @classmethod
    def _clamp_wait(cls, seconds: Any) -> float:
        """Convert *seconds* to float and clamp it to ``[0, MAX_WAIT_SECONDS]``."""
        return min(max(float(seconds), 0.0), cls.MAX_WAIT_SECONDS)

    def _run_xdotool(self, *args: str) -> None:
        """Run ``xdotool`` with *args* on the agent's display.

        ``check=True`` turns a non-zero exit status into
        ``subprocess.CalledProcessError`` so the step is recorded as failed.
        """
        subprocess.run(
            ["xdotool", *args],
            env={**os.environ, "DISPLAY": self.display},
            check=True,
        )

    def _perform_action(self, action: Dict[str, Any]) -> None:
        """Carry out one non-terminal action dict.

        Raises:
            ValueError: for an action type that is not supported.
            KeyError: when a required field of the action is missing.
            Exception: whatever the computer interface or subprocess raises.
        """
        act_type = action.get("action")

        if act_type == "mousemove":
            self.computer.hover_at(action["x"], action["y"])
        elif act_type == "click":
            # Clicks at the current pointer position; button 1 is the default.
            self._run_xdotool("click", str(action.get("button", 1)))
        elif act_type == "type":
            # "--" stops option parsing so text starting with "-" is typed.
            self._run_xdotool("type", "--", action["text"])
        elif act_type == "key":
            self.computer.key_combination([action["key"]])
        elif act_type == "launch":
            if action["app"] == "firefox":
                self.computer.open_web_browser()
            else:
                subprocess.Popen(
                    [action["app"]],
                    env={**os.environ, "DISPLAY": self.display},
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                # Short pause before the next action runs.
                time.sleep(2)
        elif act_type == "wait":
            if "seconds" in action:
                # Honor the duration the model asked for (bounded).
                time.sleep(self._clamp_wait(action["seconds"]))
            else:
                self.computer.wait_5_seconds()
        else:
            raise ValueError(f"Unsupported action type: {act_type!r}")

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task using Gemini for reasoning and Computer interface for action.

        Args:
            task_description: Natural-language description of the task.

        Returns:
            A dict with the keys ``success``, ``message``, ``steps_executed``,
            ``screenshot`` (base64 text or ``None``) and ``task``. ``success``
            is true when the model reported ``done`` or when every action ran
            without raising.
        """
        self.current_task = task_description
        self.task_status = "running"
        logger.info(f"Executing task: {task_description}")

        steps_executed: List[str] = []
        final_message = ""
        success = False
        finished = False  # set once a done/fail action or a stop request ends the loop
        stopped = False
        failures = 0

        try:
            state = self.computer.current_state()
            screenshot_b64 = base64.b64encode(state.screenshot).decode() if state.screenshot else None

            actions = self.llm.generate_actions(task_description, screenshot_b64)
            if isinstance(actions, dict):
                # Tolerate a single action object in place of an array.
                actions = [actions]

            for action in actions:
                if self.task_status == "stopped":
                    # stop() was called while the task was running.
                    stopped = True
                    finished = True
                    final_message = "Task stopped"
                    break

                if not isinstance(action, dict):
                    failures += 1
                    logger.error(f"Action execution failed: malformed action {action!r}")
                    steps_executed.append(f"Failed: malformed action {action!r}")
                    continue

                act_type = action.get("action")

                if act_type == "done":
                    success = True
                    finished = True
                    final_message = action.get("message", "Task completed")
                    break

                if act_type == "fail":
                    success = False
                    finished = True
                    final_message = action.get("message", "Task failed")
                    break

                # One failing action must not abort the remaining ones, so
                # each action is isolated and recorded as a failed step.
                try:
                    self._perform_action(action)
                except Exception as e:
                    failures += 1
                    logger.error(f"Action execution failed: {e}")
                    steps_executed.append(f"Failed: {act_type} - {e}")
                else:
                    steps_executed.append(f"Executed: {act_type} {action}")

            if not finished:
                # The model gave no explicit verdict: succeed only if nothing failed.
                if failures:
                    final_message = f"Actions executed with {failures} failure(s)."
                else:
                    final_message = "Actions executed."
                    success = True
            elif not final_message:
                # A done/fail action carried an empty message.
                final_message = "Task completed" if success else "Task failed"

            final_state = self.computer.current_state()
            final_screenshot = base64.b64encode(final_state.screenshot).decode() if final_state.screenshot else None

            if not stopped:
                # Keep the "stopped" status that stop() set.
                self.task_status = "completed" if success else "failed"

            return {
                "success": success,
                "message": final_message,
                "steps_executed": steps_executed,
                "screenshot": final_screenshot,
                "task": task_description
            }

        except Exception as e:
            logger.error(f"Task execution error: {e}")
            self.task_status = "error"
            return {
                "success": False,
                "message": f"Error: {str(e)}",
                "steps_executed": steps_executed,
                "screenshot": None,
                "task": task_description
            }
        finally:
            self.current_task = None

    def stop(self):
        """Stop current task.

        Sets the status to ``"stopped"``; a running ``execute_task`` loop
        checks this before each action and ends early.
        """
        logger.info("Stopping current task")
        self.task_status = "stopped"
        self.current_task = None

    def get_status(self) -> Dict[str, Any]:
        """Get current agent status as a dict with ``status``, ``current_task`` and ``display``."""
        return {
            "status": self.task_status,
            "current_task": self.current_task,
            "display": self.display
        }
|
|
|
|