mirror of
https://github.com/aljazceru/goose.git
synced 2026-01-18 22:04:25 +01:00
fix: just adding stuff from developer.py to synopsis developer (#182)
This commit is contained in:
@@ -1,10 +1,13 @@
|
||||
# janky global state for now, think about it
|
||||
import re
|
||||
import subprocess
|
||||
import os
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
from typing import Dict
|
||||
|
||||
from exchange import Message
|
||||
import httpx
|
||||
from goose.synopsis.system import system
|
||||
from goose.toolkit.base import Toolkit, tool
|
||||
from goose.toolkit.utils import RULEPREFIX, RULESTYLE, get_language
|
||||
@@ -242,3 +245,36 @@ class SynopsisDeveloper(Toolkit):
|
||||
self.logshell(f"cd {path}")
|
||||
system.cwd = str(patho)
|
||||
return path
|
||||
|
||||
@tool
|
||||
def fetch_web_content(self, url: str) -> str:
|
||||
"""
|
||||
Fetch content from a URL using httpx.
|
||||
|
||||
Args:
|
||||
url (str): url of the site to visit.
|
||||
Returns:
|
||||
(dict): A dictionary with two keys:
|
||||
- 'html_file_path' (str): Path to a html file which has the content of the page. It will be very large so use rg to search it or head in chunks. Will contain meta data and links and markup.
|
||||
- 'text_file_path' (str): Path to a plain text file which has the some of the content of the page. It will be large so use rg to search it or head in chunks. If content isn't there, try the html variant.
|
||||
""" # noqa
|
||||
friendly_name = re.sub(r"[^a-zA-Z0-9]", "_", url)[:50] # Limit length to prevent filenames from being too long
|
||||
|
||||
try:
|
||||
result = httpx.get(url, follow_redirects=True).text
|
||||
with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=f"_{friendly_name}.html") as tmp_file:
|
||||
tmp_file.write(result)
|
||||
tmp_text_file_path = tmp_file.name.replace(".html", ".txt")
|
||||
plain_text = re.sub(
|
||||
r"<head.*?>.*?</head>|<script.*?>.*?</script>|<style.*?>.*?</style>|<[^>]+>",
|
||||
"",
|
||||
result,
|
||||
flags=re.DOTALL,
|
||||
) # Remove head, script, and style tags/content, then any other tags
|
||||
with open(tmp_text_file_path, "w") as text_file:
|
||||
text_file.write(plain_text)
|
||||
return {"html_file_path": tmp_file.name, "text_file_path": tmp_text_file_path}
|
||||
except httpx.HTTPStatusError as exc:
|
||||
self.notifier.log(f"Failed fetching with HTTP error: {exc.response.status_code}")
|
||||
except Exception as exc:
|
||||
self.notifier.log(f"Failed fetching with error: {str(exc)}")
|
||||
|
||||
@@ -93,3 +93,23 @@ def test_cancel_process(toolkit, tmpdir):
|
||||
# Verify that the process is no longer in the list
|
||||
processes = toolkit.list_processes()
|
||||
assert process_id not in processes
|
||||
|
||||
|
||||
def test_fetch_web_content(toolkit):
|
||||
url = "http://example.com"
|
||||
|
||||
result = toolkit.fetch_web_content(url)
|
||||
assert "html_file_path" in result
|
||||
assert "text_file_path" in result
|
||||
|
||||
html_file_path = result["html_file_path"]
|
||||
text_file_path = result["text_file_path"]
|
||||
|
||||
with open(html_file_path, "r") as html_file:
|
||||
fetched_content = html_file.read()
|
||||
|
||||
assert "Example Domain" in fetched_content
|
||||
|
||||
with open(text_file_path, "r") as html_file:
|
||||
fetched_content = html_file.read()
|
||||
assert "Example Domain" in fetched_content
|
||||
|
||||
Reference in New Issue
Block a user