From ffb214addf87dfa8a15526c7e0bbbf1b8868e76d Mon Sep 17 00:00:00 2001 From: Max Novich Date: Sun, 3 Nov 2024 21:39:12 -0800 Subject: [PATCH] feat: add browser toolkit (#179) Co-authored-by: Michael Neale --- .github/workflows/scripts/check_licenses.py | 1 + pyproject.toml | 4 + src/goose/cli/session_notifier.py | 3 + src/goose/toolkit/prompts/browser.jinja | 41 ++ src/goose/toolkit/web_browser.py | 439 ++++++++++++++++++++ tests/toolkit/test_web_browser.py | 32 ++ 6 files changed, 520 insertions(+) create mode 100644 src/goose/toolkit/prompts/browser.jinja create mode 100644 src/goose/toolkit/web_browser.py create mode 100644 tests/toolkit/test_web_browser.py diff --git a/.github/workflows/scripts/check_licenses.py b/.github/workflows/scripts/check_licenses.py index 8b118db0..395121d6 100755 --- a/.github/workflows/scripts/check_licenses.py +++ b/.github/workflows/scripts/check_licenses.py @@ -47,6 +47,7 @@ class LicenseConfig: "MIT", "BSD-3-Clause", "Apache-2.0", + "Apache License 2", "Apache Software License", "Python Software Foundation License", "BSD License", diff --git a/pyproject.toml b/pyproject.toml index 11f0a452..fd1924f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,9 @@ dependencies = [ "prompt-toolkit>=3.0.47", "keyring>=25.4.1", "langfuse>=2.38.2", + "selenium>=4.0.0", + "beautifulsoup4>=4.9.3", + "pyshadow<=0.0.5" ] author = [{ name = "Block", email = "ai-oss-tools@block.xyz" }] packages = [{ include = "goose", from = "src" }] @@ -31,6 +34,7 @@ screen = "goose.toolkit.screen:Screen" reasoner = "goose.toolkit.reasoner:Reasoner" repo_context = "goose.toolkit.repo_context.repo_context:RepoContext" synopsis = "goose.synopsis.toolkit:SynopsisDeveloper" +browser = "goose.toolkit.web_browser:BrowserToolkit" [project.entry-points."goose.profile"] default = "goose.profile:default_profile" diff --git a/src/goose/cli/session_notifier.py b/src/goose/cli/session_notifier.py index d29ce944..fb76ff8a 100644 --- a/src/goose/cli/session_notifier.py 
+++ b/src/goose/cli/session_notifier.py @@ -11,6 +11,9 @@ class SessionNotifier(Notifier): self.status_indicator = status_indicator self.live = Live(self.status_indicator, refresh_per_second=8, transient=True) + def notify(self, message: str) -> None: + print(f"Notification: {message}") + def log(self, content: RenderableType) -> None: print(content) diff --git a/src/goose/toolkit/prompts/browser.jinja b/src/goose/toolkit/prompts/browser.jinja new file mode 100644 index 00000000..4f322999 --- /dev/null +++ b/src/goose/toolkit/prompts/browser.jinja @@ -0,0 +1,41 @@ +BrowserToolkit is a Selenium-based toolset for automated web interactions. +This is useful when the best way to load content, run a search, perform an action as a user on a page, test a page, fill out a form, etc. requires a real browser to render the page, run JavaScript, etc. + +You should keep the browser open if needed, as the user may be able to log in and interact to help out if you ask. + +Requests could include: +* searching for an item using a website's search feature +* filling out a form +* reading content +* testing a page or viewing a page +* accessing social media (in which case you check that the user can log in) +* performing a web search + + +You will use combinations of these tools to take the relevant actions to satisfy the user's requests: + +- **navigate_to(url: str)**: Load and navigate to the specified URL in the web driver. The tool ensures the page has fully loaded before proceeding. + +- **get_html_content()**: Extract the HTML content of the current page and store it in a cached file. Use this to retrieve the latest cache file for offline HTML analysis. + +- **type_into_input(selector: str, text: str, click_enter: False, click_tab: False)**: Type specified text into an input element located by a CSS selector. Simulates human typing for natural input. + +- **click_element(selector: str)**: Click an element (button/link) identified by a CSS selector.
Use this to interact with webpage elements directly. + +- **find_element_by_text_soup(text: str, filename: str)**: Search for an element containing specific text using BeautifulSoup, sourcing from the cached HTML file. Useful for text-based element queries. + +- **take_browser_screenshot(filename: str)**: Capture a screenshot of the current browser window and save it to a file. Use this for visual verification. + +- **find_elements_of_type(tag_type: str, filename: str)**: Find all elements of a specific HTML tag type using BeautifulSoup, sourcing from the cached HTML file. Useful for retrieving multiple elements of the same type. + +### Important Note on Element Selection: + +When using tools that require CSS selectors or text identification, keep the following in mind: + +1. **Precision**: Selectors must be accurate and precise. The specificity of CSS selectors should match the target element precisely to avoid selection errors. + +2. **DOM Considerations**: Some elements may reside within shadow DOMs, requiring special handling using tools like PyShadow, or may not be visible in the default DOM structure. + +3. **Element Types**: Elements may not always be of the expected type or have the attributes you're searching for. Consider the tree structure and hierarchy when querying elements. + +This toolkit facilitates browser automation by scripting user interactions and processing web content efficiently.
# ---- src/goose/toolkit/web_browser.py ----
import importlib.util
import os
import plistlib
import random
import re
import shutil
import subprocess
import sys
import time
from typing import Callable, Optional

# selenium is a declared dependency; this is only a safety net for environments installed without it.
# Use the running interpreter: a bare "python" may resolve to a different installation or not exist.
if importlib.util.find_spec("selenium") is None:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "selenium"])
from bs4 import BeautifulSoup
from exchange import Message
from pyshadow.main import Shadow
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.ui import WebDriverWait

from goose.toolkit.base import Toolkit, tool


def _xpath_literal(value: str) -> str:
    """Return ``value`` as an XPath string literal, whatever quotes it contains."""
    if "'" not in value:
        return f"'{value}'"
    if '"' not in value:
        return f'"{value}"'
    # XPath 1.0 has no escape character: stitch the pieces together around literal apostrophes.
    return "concat(" + ", \"'\", ".join(f"'{part}'" for part in value.split("'")) + ")"


class BrowserToolkit(Toolkit):
    """A toolkit for interacting with web browsers using Selenium.

    The browser is started lazily by ``navigate_to``. Cached HTML and screenshots are written under
    ``session_dir``, which is removed again when the toolkit is finalized.
    """

    def __init__(self, *args: object, **kwargs: dict[str, object]) -> None:
        super().__init__(*args, **kwargs)
        self.driver = None
        self.history = []
        self.session_dir = ".goose/browsing_session"
        os.makedirs(self.session_dir, exist_ok=True)
        self.cached_url = ""
        # Set up front so get_html_content never reads an attribute that does not exist yet.
        self.cached_html_path = ""

    def _initialize_driver(self, force_restart: bool = False, mock_driver: object = None) -> None:
        """Start the web driver if there is none yet, or replace it when a restart is forced.

        Args:
            force_restart (bool): Shut down the current driver (if any) and start a fresh one.
            mock_driver (object): Driver to install instead of launching a real browser.
        """
        if self.driver is not None and not force_restart:
            return
        if mock_driver:
            self.driver = mock_driver
            return
        if self.driver is not None:
            self._shutdown_driver()

        # Detection returns None on unsupported platforms; treat that as "unknown" and use Firefox.
        browser_name = (self._get_default_browser() or "").lower()
        if "chrome" in browser_name:
            try:
                self.driver = webdriver.Chrome(options=webdriver.ChromeOptions())
            except Exception as e:
                self.notifier.notify(f"Failed to initialize browser driver: {str(e)}")
                self.notifier.notify("Falling back to Firefox.")
                self.driver = webdriver.Firefox()
        else:
            self.driver = webdriver.Firefox()

        try:
            self.driver.set_window_size(835, 1024)
        except Exception as e:
            # Sizing is cosmetic; report it but keep the session usable.
            self.notifier.notify(f"Could not resize browser window: {str(e)}")

    def _shutdown_driver(self) -> None:
        """Quit the current driver and make a best-effort attempt to clean up leftover processes."""
        try:
            self.driver.quit()
            self.notifier.notify("Previous browser session closed.")
        except Exception as e:
            self.notifier.notify(f"Error closing previous session: {str(e)}")
        self.driver = None
        # pkill is not available everywhere; skip the sweep there instead of crashing the restart.
        if shutil.which("pkill"):
            subprocess.run(["pkill", "-f", "webdriver"], check=False)
            self.notifier.notify("All previous browser instances terminated.")

    def _require_driver(self) -> None:
        """Raise a clear error when a tool that needs an open browser is called without one."""
        if not self.driver:
            raise RuntimeError("Browser is not open; call navigate_to first.")

    def _get_default_browser(self) -> Optional[str]:
        return get_default_browser()

    def system(self) -> str:
        return Message.load("prompts/browser.jinja").text

    def safe_execute(self, func: Callable, *args: object, **kwargs: dict[str, object]) -> object:
        """Run a browser action; on any failure restart the driver and try exactly once more.

        ``func`` should look up ``self.driver`` when it is called (for example via a lambda). A method
        bound to the driver before the call would still point at the old, already-quit driver on retry.

        Args:
            func (Callable): The action to run.
            *args: Positional arguments passed to ``func``.
            **kwargs: Keyword arguments passed to ``func``.

        Returns:
            object: Whatever ``func`` returns.
        """
        try:
            return func(*args, **kwargs)
        except Exception as e:
            self.notifier.notify(f"Error during browser action: {str(e)}")
            self._initialize_driver(force_restart=True)
            return func(*args, **kwargs)

    @tool
    def navigate_to(self, url: str) -> None:
        """Navigate or browse to a specified URL in the browser.

        Args:
            url (str): The URL to navigate to.
        """
        self._initialize_driver()
        self.notifier.notify(f"Navigating to {url}")
        # Resolve self.driver lazily so that a retry after a restart uses the new driver.
        self.safe_execute(lambda: self.driver.get(url))
        self.wait_for_page_load()
        self.history.append(url)

    @tool
    def take_browser_screenshot(self, filename: str) -> str:
        """Take a screenshot of the current browser window to assist with navigation.

        Args:
            filename (str): The file path where the screenshot will be saved.
        """
        if not self.driver:
            message = "Error taking screenshot: browser is not open."
            self.notifier.notify(message)
            return message

        path = os.path.join(self.session_dir, filename)
        try:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            # save_screenshot signals a failed write by returning False rather than raising.
            if not self.driver.save_screenshot(path):
                raise OSError(f"could not write {path}")
        except Exception as e:
            message = f"Error taking screenshot: {str(e)}"
            self.notifier.notify(message)
            return message

        self.notifier.notify(f"Screenshot saved in browsing session: {path}")
        return f"image:{path}"

    @tool
    def open_new_tab(self, url: str) -> None:
        """Open a new tab and navigate to the specified URL.

        Args:
            url (str): The URL to navigate to in the new tab.
        """
        if not self.driver:
            self.notifier.notify("Driver not initialized, using navigate_to instead.")
            self.navigate_to(url)
            return

        self.notifier.notify(f"Opening a new tab and navigating to {url}.")
        # Pass the URL as a script argument: splicing it into the JS source breaks on quotes and
        # lets a crafted URL run arbitrary script.
        self.driver.execute_script("window.open(arguments[0], '_blank');", url)
        self.driver.switch_to.window(self.driver.window_handles[-1])
        self.wait_for_page_load()

    @tool
    def check_current_page_url(self) -> str:
        """Get the URL of the current page."""
        if not self.driver:
            self.notifier.notify("Driver is not initialized.")
            return ""

        current_url = self.driver.current_url
        self.notifier.notify(f"Current page URL: {current_url}")
        return current_url

    @tool
    def switch_to_tab(self, index: int) -> None:
        """Switch to the browser tab at the specified index.

        Args:
            index (int): The index of the tab to switch to.
        """
        if not self.driver:
            self.notifier.notify("Driver is not initialized.")
            return

        try:
            self.notifier.notify(f"Switching to tab at index {index}.")
            self.driver.switch_to.window(self.driver.window_handles[index])
            self.wait_for_page_load()
        except IndexError:
            self.notifier.notify(f"Invalid tab index: {index}.")

    @tool
    def close_current_tab(self) -> None:
        """Close the current browser tab."""
        if not self.driver:
            self.notifier.notify("Cannot close the tab as the driver is not initialized.")
            return

        self.notifier.notify("Closing the current tab.")
        self.driver.close()
        # Closing leaves the driver without a focused window; move to the last remaining tab.
        if self.driver.window_handles:
            self.driver.switch_to.window(self.driver.window_handles[-1])

    def refresh_page(self) -> None:
        """Refresh the current browser page."""
        if not self.driver:
            self.notifier.notify("Driver is not initialized.")
            return

        self.notifier.notify("Refreshing the current page.")
        self.driver.refresh()
        self.wait_for_page_load()

    @tool
    def get_html_content(self) -> str:
        """Extract the full HTML content of the current page and cache it to a file."""
        if not self.driver:
            self.notifier.notify("Driver is not initialized.")
            return ""

        self.notifier.notify("Extracting full HTML content of the page.")
        # Build a file-system-safe, bounded name: URLs may hold '?', ':', '&' and can be very long.
        without_scheme = re.sub(r"^https?://", "", self.driver.current_url)
        page_key = re.sub(r"[^A-Za-z0-9._-]", "_", without_scheme)[:150]
        filename = os.path.join(self.session_dir, f"{page_key}_page.html")

        # Always rewrite: typing and clicking change the DOM without changing the URL, so a cache
        # keyed on the URL alone would hand back stale HTML.
        with open(filename, "w", encoding="utf-8") as f:
            f.write(self.driver.page_source)
        self.cached_html_path = filename
        self.cached_url = page_key
        self.notifier.notify(f"HTML cached as {filename}.")

        return self.cached_html_path

    def _wait_for_clickable(self, selector: str, retries: int = 3, timeout: int = 20) -> object:
        """Wait until the element matching ``selector`` is clickable, retrying on timeouts.

        Args:
            selector (str): CSS selector string to locate the element.
            retries (int): Number of attempts before the timeout is re-raised.
            timeout (int): Seconds to wait per attempt.

        Returns:
            object: The located element.

        Raises:
            TimeoutException: If the element is still not clickable on the last attempt.
        """
        for attempt in range(1, retries + 1):
            try:
                return WebDriverWait(self.driver, timeout).until(
                    ec.element_to_be_clickable((By.CSS_SELECTOR, selector))
                )
            except TimeoutException as e:
                if attempt == retries:
                    raise
                self.notifier.notify(f"Retry {attempt}/{retries} due to timeout: {str(e)}")
                time.sleep(2)
        return None

    @tool
    def type_into_input(
        self, selector: str, text: str, click_enter: bool = False, click_tab: bool = False
    ) -> None:
        """Type text into an input element specified by a CSS selector for the currently open page.

        Args:
            selector (str): CSS selector string to locate the input element.
            text (str): The text to type into the input element.
            click_enter (bool): Press Enter after typing. Defaults to False.
            click_tab (bool): Press Tab after typing. Defaults to False.
        """
        self._require_driver()
        self.notifier.notify(f"Typing '{text}' into input with selector: {selector}")
        element = self._wait_for_clickable(selector)
        element.clear()
        for char in text:
            element.send_keys(char)
            # Small random pause between keystrokes so the input resembles human typing.
            time.sleep(random.uniform(0.1, 0.3))
        if click_tab:
            element.send_keys(Keys.TAB)
        if click_enter:
            element.send_keys(Keys.ENTER)

    def wait_for_page_load(self, timeout: int = 45) -> None:
        """Wait for the page to fully load by checking the document readiness state.

        Args:
            timeout (int): Maximum time to wait for page load, in seconds.
        """
        WebDriverWait(self.driver, timeout).until(
            lambda driver: driver.execute_script("return document.readyState") == "complete"
        )
        self.notifier.notify("Page fully loaded.")

    @tool
    def click_element(self, selector: str) -> None:
        """Click a button or link specified by a CSS selector.

        Args:
            selector (str): CSS selector string to locate the element.
        """
        self._require_driver()
        self.notifier.notify(f"Clicking element with selector: {selector}")
        # Only locating the element is retried. Click and page-load wait run once, so a slow page
        # load can no longer cause the same element to be clicked a second time.
        element = self._wait_for_clickable(selector)
        element.click()
        self.wait_for_page_load()

    @tool
    def find_element_by_text_soup(self, text: str, filename: str) -> Optional[str]:
        """Find an element containing the specified text using BeautifulSoup on HTML content stored in a file.
        If not found, fallback to Shadow DOM search using PyShadow.

        Args:
            text (str): The text content to find within an element.
            filename (str): The name of the file containing the HTML content.

        """
        if not text:
            self.notifier.notify("Cannot search for an element with empty text.")
            return None

        def matches(tag: object) -> bool:
            if text in tag.get_text():
                return True
            return any(text in str(tag.get(attr) or "") for attr in ("title", "alt", "aria-label"))

        try:
            with open(filename, "r", encoding="utf-8") as file:
                soup = BeautifulSoup(file, "html.parser")
        except FileNotFoundError:
            self.notifier.notify(f"File not found: {filename}")
            return None

        # The first match in document order is the outermost ancestor (usually <html>), because its
        # text includes every descendant's text. Walk down to the innermost element that still matches.
        element = soup.find(matches)
        while element is not None:
            inner = element.find(matches)
            if inner is None:
                break
            element = inner

        if element is not None:
            self.notifier.notify(f"Element found with text: {text}")
            return str(element)

        # Fallback: search using PyShadow (requires a live browser).
        if self.driver:
            try:
                shadow = Shadow(self.driver)
                shadow_element = shadow.find_element_by_xpath(f"//*[contains(text(), {_xpath_literal(text)})]")
                if shadow_element:
                    self.notifier.notify(f"Element found in shadow DOM with text: {text}")
                    return shadow_element.get_attribute("outerHTML")
            except Exception as e:
                self.notifier.notify(f"Error searching in shadow DOM: {str(e)}")

        self.notifier.notify(f"Element not found with text: {text} in either DOMs")
        return None

    @tool
    def find_elements_of_type(self, tag_type: str, filename: str) -> list[str]:
        """Find all elements of a specific tag type using BeautifulSoup on HTML content stored in a file.

        Args:
            tag_type (str): The HTML tag type to search for.
            filename (str): The name of the file containing the HTML content.
        """
        elements_as_strings = []
        try:
            with open(filename, "r", encoding="utf-8") as file:
                soup = BeautifulSoup(file, "html.parser")
            elements_as_strings = [str(element) for element in soup.find_all(tag_type)]
            self.notifier.notify(f"Found {len(elements_as_strings)} elements of type: {tag_type}")
        except FileNotFoundError:
            self.notifier.notify(f"File not found: {filename}")
        return elements_as_strings

    def __del__(self) -> None:
        # __del__ also runs for instances whose __init__ failed part-way and during interpreter
        # shutdown, so read attributes defensively and never let an exception escape.
        driver = getattr(self, "driver", None)
        if driver is not None:
            # Quit the browser first so a failure in the directory clean-up cannot leak it.
            try:
                driver.quit()
            except Exception:
                pass  # nothing useful can be done about a failed quit during finalization

        session_dir = getattr(self, "session_dir", None)
        if session_dir and os.path.exists(session_dir):
            try:
                try:
                    shutil.rmtree(session_dir)
                    self.notifier.notify(f"Removed browsing session directory: {session_dir}")
                except OSError as e:
                    self.notifier.notify(f"Error removing session directory: {str(e)}")
            except Exception:
                pass  # the notifier itself may already be gone


def get_default_browser_macos() -> Optional[str]:
    """Return the handler registered for ``http`` URLs in the macOS LaunchServices plist.

    Returns:
        Optional[str]: The ``LSHandlerRoleAll`` value of the ``http`` handler, or None when the plist
        is missing, has no such handler, or cannot be read.
    """
    plist_path = os.path.expanduser(
        "~/Library/Preferences/com.apple.LaunchServices/com.apple.launchservices.secure.plist"
    )
    try:
        if not os.path.exists(plist_path):
            print(f"Launch services plist not found at: {plist_path}")
            return None

        with open(plist_path, "rb") as fp:
            plist = plistlib.load(fp)

        for handler in plist.get("LSHandlers", []):
            scheme = handler.get("LSHandlerURLScheme")
            if scheme and scheme.lower() == "http":
                return handler.get("LSHandlerRoleAll")

        return None
    except Exception as e:
        print(f"Error retrieving default browser on macOS: {e}")
        return None


def get_default_browser() -> Optional[str]:
    """Return an identifier for the system default browser, or None when it cannot be determined.

    Only macOS is supported for now; Windows and Linux detection are not enabled yet.
    """
    if sys.platform.startswith("darwin"):
        return get_default_browser_macos()
    print(f"Unsupported platform {sys.platform}")
    return None


# ---- tests/toolkit/test_web_browser.py ----
import pytest
from unittest.mock import MagicMock
from goose.toolkit.web_browser import BrowserToolkit


# Mock the webdriver
@pytest.fixture
def mock_driver(mocker):
    mocker.patch("selenium.webdriver.Chrome")
    mocker.patch("selenium.webdriver.Firefox")

    driver_mock = MagicMock()

    mocker.patch.object(BrowserToolkit, "_initialize_driver", return_value=None)

    return driver_mock


def test_html_content_extraction(mock_driver):
    mock_notifier = MagicMock()
    toolkit = BrowserToolkit(notifier=mock_notifier)
    toolkit.driver = mock_driver
    mock_driver.current_url = "http://example.com"
    mock_driver.page_source = "TestPage"

    cached_html_path = toolkit.get_html_content()

    # Read from the cached HTML file and assert its content
    with open(cached_html_path, "r", encoding="utf-8") as file:
        html_content = file.read()

    assert html_content == "TestPage"