mirror of
https://github.com/aljazceru/Auto-GPT.git
synced 2026-01-23 07:54:27 +01:00
WIP: add file context functionality
This commit is contained in:
@@ -11,12 +11,6 @@ if TYPE_CHECKING:
|
||||
from autogpt.memory.vector import VectorMemory
|
||||
from autogpt.models.command_registry import CommandRegistry
|
||||
|
||||
from autogpt.agents.utils.exceptions import (
|
||||
AgentException,
|
||||
CommandExecutionError,
|
||||
InvalidAgentResponseError,
|
||||
UnknownCommandError,
|
||||
)
|
||||
from autogpt.json_utils.utilities import extract_dict_from_response, validate_dict
|
||||
from autogpt.llm.api_manager import ApiManager
|
||||
from autogpt.llm.base import Message
|
||||
@@ -40,9 +34,16 @@ from autogpt.models.context_item import ContextItem
|
||||
from autogpt.workspace import Workspace
|
||||
|
||||
from .base import BaseAgent
|
||||
from .utils.context import ContextMixin
|
||||
from .utils.exceptions import (
|
||||
AgentException,
|
||||
CommandExecutionError,
|
||||
InvalidAgentResponseError,
|
||||
UnknownCommandError,
|
||||
)
|
||||
|
||||
|
||||
class Agent(BaseAgent):
|
||||
class Agent(BaseAgent, ContextMixin):
|
||||
"""Agent class for interacting with Auto-GPT."""
|
||||
|
||||
def __init__(
|
||||
|
||||
@@ -94,6 +94,9 @@ class BaseAgent(metaclass=ABCMeta):
|
||||
max_summary_tlength=summary_max_tlength or self.send_token_limit // 6,
|
||||
)
|
||||
|
||||
# Support multi-inheritance
|
||||
super(BaseAgent, self).__init__()
|
||||
|
||||
def think(
|
||||
self,
|
||||
instruction: Optional[str] = None,
|
||||
|
||||
@@ -1,3 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..base import BaseAgent
|
||||
|
||||
from autogpt.models.context_item import ContextItem
|
||||
|
||||
|
||||
@@ -10,6 +17,9 @@ class AgentContext:
|
||||
def __bool__(self) -> bool:
|
||||
return len(self.items) > 0
|
||||
|
||||
def __contains__(self, item: ContextItem):
|
||||
return any([i.source == item.source for i in self.items])
|
||||
|
||||
def add(self, item: ContextItem) -> None:
|
||||
self.items.append(item)
|
||||
|
||||
@@ -21,3 +31,20 @@ class AgentContext:
|
||||
|
||||
def format_numbered(self) -> str:
|
||||
return "\n\n".join([f"{i}. {c}" for i, c in enumerate(self.items, 1)])
|
||||
|
||||
|
||||
class ContextMixin:
|
||||
"""Mixin that adds context support to a class"""
|
||||
|
||||
context: AgentContext
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super(ContextMixin, self).__init__(**kwargs)
|
||||
self.context = AgentContext()
|
||||
|
||||
|
||||
def get_agent_context(agent: BaseAgent) -> AgentContext | None:
|
||||
if isinstance(agent, ContextMixin):
|
||||
return agent.context
|
||||
|
||||
return None
|
||||
|
||||
@@ -6,7 +6,9 @@ from autogpt.agents.agent import Agent
|
||||
from autogpt.logs import logger
|
||||
|
||||
|
||||
def sanitize_path_arg(arg_name: str):
|
||||
def sanitize_path_arg(arg_name: str, make_relative: bool = False):
|
||||
"""Sanitizes the specified path (str | Path) argument, resolving it to a Path"""
|
||||
|
||||
def decorator(func: Callable):
|
||||
# Get position of path parameter, in case it is passed as a positional argument
|
||||
try:
|
||||
@@ -45,9 +47,15 @@ def sanitize_path_arg(arg_name: str):
|
||||
)
|
||||
if given_path:
|
||||
if given_path in {"", "/"}:
|
||||
sanitized_path = str(agent.workspace.root)
|
||||
sanitized_path = agent.workspace.root
|
||||
else:
|
||||
sanitized_path = str(agent.workspace.get_path(given_path))
|
||||
sanitized_path = agent.workspace.get_path(given_path)
|
||||
|
||||
# Make path relative if possible
|
||||
if make_relative and sanitized_path.is_relative_to(
|
||||
agent.workspace.root
|
||||
):
|
||||
sanitized_path = sanitized_path.relative_to(agent.workspace.root)
|
||||
|
||||
if arg_name in kwargs:
|
||||
kwargs[arg_name] = sanitized_path
|
||||
|
||||
@@ -75,7 +75,7 @@ def execute_python_code(code: str, name: str, agent: Agent) -> str:
|
||||
with open(file_path, "w+", encoding="utf-8") as f:
|
||||
f.write(code)
|
||||
|
||||
return execute_python_file(str(file_path), agent)
|
||||
return execute_python_file(file_path, agent)
|
||||
except Exception as e:
|
||||
raise CommandExecutionError(*e.args)
|
||||
|
||||
@@ -92,11 +92,11 @@ def execute_python_code(code: str, name: str, agent: Agent) -> str:
|
||||
},
|
||||
)
|
||||
@sanitize_path_arg("filename")
|
||||
def execute_python_file(filename: str, agent: Agent) -> str:
|
||||
def execute_python_file(filename: Path, agent: Agent) -> str:
|
||||
"""Execute a Python file in a Docker container and return the output
|
||||
|
||||
Args:
|
||||
filename (str): The name of the file to execute
|
||||
filename (Path): The name of the file to execute
|
||||
|
||||
Returns:
|
||||
str: The output of the file
|
||||
@@ -105,10 +105,10 @@ def execute_python_file(filename: str, agent: Agent) -> str:
|
||||
f"Executing python file '{filename}' in working directory '{agent.config.workspace_path}'"
|
||||
)
|
||||
|
||||
if not filename.endswith(".py"):
|
||||
if not str(filename).endswith(".py"):
|
||||
raise InvalidArgumentError("Invalid file type. Only .py files are allowed.")
|
||||
|
||||
file_path = Path(filename)
|
||||
file_path = filename
|
||||
if not file_path.is_file():
|
||||
# Mimic the response that you get from the command line so that it's easier to identify
|
||||
raise FileNotFoundError(
|
||||
|
||||
111
autogpt/commands/file_context.py
Normal file
111
autogpt/commands/file_context.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""Commands to perform operations on files"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
COMMAND_CATEGORY = "file_operations"
|
||||
COMMAND_CATEGORY_TITLE = "File Operations"
|
||||
|
||||
import contextlib
|
||||
from pathlib import Path
|
||||
|
||||
from autogpt.agents.agent import Agent
|
||||
from autogpt.agents.utils.context import get_agent_context
|
||||
from autogpt.agents.utils.exceptions import (
|
||||
CommandExecutionError,
|
||||
DuplicateOperationError,
|
||||
)
|
||||
from autogpt.command_decorator import command
|
||||
from autogpt.models.context_item import FileContextItem, FolderContextItem
|
||||
|
||||
from .decorators import sanitize_path_arg
|
||||
|
||||
|
||||
@command(
|
||||
"open_file",
|
||||
"Open a file for editing, creating it if it does not exist yet",
|
||||
{
|
||||
"file_path": {
|
||||
"type": "string",
|
||||
"description": "The path of the file to open",
|
||||
"required": True,
|
||||
}
|
||||
},
|
||||
)
|
||||
@sanitize_path_arg("file_path")
|
||||
def open_file(file_path: Path, agent: Agent) -> tuple[str, FileContextItem]:
|
||||
"""Open a file and return a context item
|
||||
|
||||
Args:
|
||||
file_path (Path): The path of the file to open
|
||||
|
||||
Returns:
|
||||
str: A status message indicating what happened
|
||||
FileContextItem: A ContextItem representing the opened file
|
||||
"""
|
||||
# Try to make the file path relative
|
||||
with contextlib.suppress(ValueError):
|
||||
file_path = file_path.relative_to(agent.workspace.root)
|
||||
|
||||
if (agent_context := get_agent_context(agent)) is None:
|
||||
raise NotImplementedError(
|
||||
f"{agent.__class__.__name__} does not implement context"
|
||||
)
|
||||
|
||||
created = False
|
||||
if not file_path.exists():
|
||||
file_path.touch()
|
||||
created = True
|
||||
elif not file_path.is_file():
|
||||
raise CommandExecutionError(f"{file_path} exists but is not a file")
|
||||
|
||||
file = FileContextItem(file_path)
|
||||
if file in agent_context:
|
||||
raise DuplicateOperationError(f"The file {file_path} is already open")
|
||||
|
||||
return (
|
||||
f"File {file}{' created,' if created else ''} opened and added to context ✅",
|
||||
file,
|
||||
)
|
||||
|
||||
|
||||
@command(
|
||||
"open_folder",
|
||||
"Open a folder to keep track of its content",
|
||||
{
|
||||
"path": {
|
||||
"type": "string",
|
||||
"description": "The path of the folder to open",
|
||||
"required": True,
|
||||
}
|
||||
},
|
||||
)
|
||||
@sanitize_path_arg("path")
|
||||
def open_folder(path: Path, agent: Agent) -> tuple[str, FolderContextItem]:
|
||||
"""Open a folder and return a context item
|
||||
|
||||
Args:
|
||||
path (Path): The path of the folder to open
|
||||
|
||||
Returns:
|
||||
str: A status message indicating what happened
|
||||
FolderContextItem: A ContextItem representing the opened folder
|
||||
"""
|
||||
# Try to make the path relative
|
||||
with contextlib.suppress(ValueError):
|
||||
path = path.relative_to(agent.workspace.root)
|
||||
|
||||
if (agent_context := get_agent_context(agent)) is None:
|
||||
raise NotImplementedError(
|
||||
f"{agent.__class__.__name__} does not implement context"
|
||||
)
|
||||
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"open_folder {path} failed: no such file or directory")
|
||||
elif not path.is_dir():
|
||||
raise CommandExecutionError(f"{path} exists but is not a folder")
|
||||
|
||||
folder = FolderContextItem(path)
|
||||
if folder in agent_context:
|
||||
raise DuplicateOperationError(f"The folder {path} is already open")
|
||||
|
||||
return f"Folder {folder} opened and added to context ✅", folder
|
||||
@@ -80,49 +80,49 @@ def file_operations_state(log_path: str | Path) -> dict[str, str]:
|
||||
return state
|
||||
|
||||
|
||||
@sanitize_path_arg("filename")
|
||||
@sanitize_path_arg("file_path")
|
||||
def is_duplicate_operation(
|
||||
operation: Operation, filename: str, agent: Agent, checksum: str | None = None
|
||||
operation: Operation, file_path: Path, agent: Agent, checksum: str | None = None
|
||||
) -> bool:
|
||||
"""Check if the operation has already been performed
|
||||
|
||||
Args:
|
||||
operation: The operation to check for
|
||||
filename: The name of the file to check for
|
||||
file_path: The name of the file to check for
|
||||
agent: The agent
|
||||
checksum: The checksum of the contents to be written
|
||||
|
||||
Returns:
|
||||
True if the operation has already been performed on the file
|
||||
"""
|
||||
# Make the filename into a relative path if possible
|
||||
# Make the file path into a relative path if possible
|
||||
with contextlib.suppress(ValueError):
|
||||
filename = str(Path(filename).relative_to(agent.workspace.root))
|
||||
file_path = file_path.relative_to(agent.workspace.root)
|
||||
|
||||
state = file_operations_state(agent.config.file_logger_path)
|
||||
if operation == "delete" and filename not in state:
|
||||
if operation == "delete" and str(file_path) not in state:
|
||||
return True
|
||||
if operation == "write" and state.get(filename) == checksum:
|
||||
if operation == "write" and state.get(str(file_path)) == checksum:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@sanitize_path_arg("filename")
|
||||
@sanitize_path_arg("file_path")
|
||||
def log_operation(
|
||||
operation: Operation, filename: str, agent: Agent, checksum: str | None = None
|
||||
operation: Operation, file_path: Path, agent: Agent, checksum: str | None = None
|
||||
) -> None:
|
||||
"""Log the file operation to the file_logger.txt
|
||||
|
||||
Args:
|
||||
operation: The operation to log
|
||||
filename: The name of the file the operation was performed on
|
||||
file_path: The name of the file the operation was performed on
|
||||
checksum: The checksum of the contents to be written
|
||||
"""
|
||||
# Make the filename into a relative path if possible
|
||||
# Make the file path into a relative path if possible
|
||||
with contextlib.suppress(ValueError):
|
||||
filename = str(Path(filename).relative_to(agent.workspace.root))
|
||||
file_path = file_path.relative_to(agent.workspace.root)
|
||||
|
||||
log_entry = f"{operation}: {filename}"
|
||||
log_entry = f"{operation}: {file_path}"
|
||||
if checksum is not None:
|
||||
log_entry += f" #{checksum}"
|
||||
logger.debug(f"Logging file operation: {log_entry}")
|
||||
@@ -143,11 +143,11 @@ def log_operation(
|
||||
},
|
||||
)
|
||||
@sanitize_path_arg("filename")
|
||||
def read_file(filename: str, agent: Agent) -> str:
|
||||
def read_file(filename: Path, agent: Agent) -> str:
|
||||
"""Read a file and return the contents
|
||||
|
||||
Args:
|
||||
filename (str): The name of the file to read
|
||||
filename (Path): The name of the file to read
|
||||
|
||||
Returns:
|
||||
str: The contents of the file
|
||||
@@ -155,7 +155,7 @@ def read_file(filename: str, agent: Agent) -> str:
|
||||
content = read_textual_file(filename, logger)
|
||||
|
||||
# TODO: invalidate/update memory when file is edited
|
||||
file_memory = MemoryItem.from_text_file(content, filename, agent.config)
|
||||
file_memory = MemoryItem.from_text_file(content, str(filename), agent.config)
|
||||
if len(file_memory.chunks) > 1:
|
||||
return file_memory.summary
|
||||
|
||||
@@ -206,11 +206,11 @@ def ingest_file(
|
||||
aliases=["write_file", "create_file"],
|
||||
)
|
||||
@sanitize_path_arg("filename")
|
||||
def write_to_file(filename: str, text: str, agent: Agent) -> str:
|
||||
def write_to_file(filename: Path, text: str, agent: Agent) -> str:
|
||||
"""Write text to a file
|
||||
|
||||
Args:
|
||||
filename (str): The name of the file to write to
|
||||
filename (Path): The name of the file to write to
|
||||
text (str): The text to write to the file
|
||||
|
||||
Returns:
|
||||
@@ -230,12 +230,12 @@ def write_to_file(filename: str, text: str, agent: Agent) -> str:
|
||||
|
||||
@sanitize_path_arg("filename")
|
||||
def append_to_file(
|
||||
filename: str, text: str, agent: Agent, should_log: bool = True
|
||||
filename: Path, text: str, agent: Agent, should_log: bool = True
|
||||
) -> str:
|
||||
"""Append text to a file
|
||||
|
||||
Args:
|
||||
filename (str): The name of the file to append to
|
||||
filename (Path): The name of the file to append to
|
||||
text (str): The text to append to the file
|
||||
should_log (bool): Should log output
|
||||
|
||||
@@ -267,11 +267,11 @@ def append_to_file(
|
||||
},
|
||||
)
|
||||
@sanitize_path_arg("directory")
|
||||
def list_files(directory: str, agent: Agent) -> list[str]:
|
||||
def list_files(directory: Path, agent: Agent) -> list[str]:
|
||||
"""lists files in a directory recursively
|
||||
|
||||
Args:
|
||||
directory (str): The directory to search in
|
||||
directory (Path): The directory to search in
|
||||
|
||||
Returns:
|
||||
list[str]: A list of files found in the directory
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import charset_normalizer
|
||||
import docx
|
||||
@@ -14,13 +15,13 @@ from autogpt.logs import logger
|
||||
|
||||
|
||||
class ParserStrategy:
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
# Basic text file reading
|
||||
class TXTParser(ParserStrategy):
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
charset_match = charset_normalizer.from_path(file_path).best()
|
||||
logger.debug(f"Reading '{file_path}' with encoding '{charset_match.encoding}'")
|
||||
return str(charset_match)
|
||||
@@ -28,7 +29,7 @@ class TXTParser(ParserStrategy):
|
||||
|
||||
# Reading text from binary file using pdf parser
|
||||
class PDFParser(ParserStrategy):
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
parser = PyPDF2.PdfReader(file_path)
|
||||
text = ""
|
||||
for page_idx in range(len(parser.pages)):
|
||||
@@ -38,7 +39,7 @@ class PDFParser(ParserStrategy):
|
||||
|
||||
# Reading text from binary file using docs parser
|
||||
class DOCXParser(ParserStrategy):
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
doc_file = docx.Document(file_path)
|
||||
text = ""
|
||||
for para in doc_file.paragraphs:
|
||||
@@ -48,7 +49,7 @@ class DOCXParser(ParserStrategy):
|
||||
|
||||
# Reading as dictionary and returning string format
|
||||
class JSONParser(ParserStrategy):
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
with open(file_path, "r") as f:
|
||||
data = json.load(f)
|
||||
text = str(data)
|
||||
@@ -56,7 +57,7 @@ class JSONParser(ParserStrategy):
|
||||
|
||||
|
||||
class XMLParser(ParserStrategy):
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
with open(file_path, "r") as f:
|
||||
soup = BeautifulSoup(f, "xml")
|
||||
text = soup.get_text()
|
||||
@@ -65,7 +66,7 @@ class XMLParser(ParserStrategy):
|
||||
|
||||
# Reading as dictionary and returning string format
|
||||
class YAMLParser(ParserStrategy):
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
with open(file_path, "r") as f:
|
||||
data = yaml.load(f, Loader=yaml.FullLoader)
|
||||
text = str(data)
|
||||
@@ -73,7 +74,7 @@ class YAMLParser(ParserStrategy):
|
||||
|
||||
|
||||
class HTMLParser(ParserStrategy):
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
with open(file_path, "r") as f:
|
||||
soup = BeautifulSoup(f, "html.parser")
|
||||
text = soup.get_text()
|
||||
@@ -81,7 +82,7 @@ class HTMLParser(ParserStrategy):
|
||||
|
||||
|
||||
class MarkdownParser(ParserStrategy):
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
with open(file_path, "r") as f:
|
||||
html = markdown.markdown(f.read())
|
||||
text = "".join(BeautifulSoup(html, "html.parser").findAll(string=True))
|
||||
@@ -89,7 +90,7 @@ class MarkdownParser(ParserStrategy):
|
||||
|
||||
|
||||
class LaTeXParser(ParserStrategy):
|
||||
def read(self, file_path: str) -> str:
|
||||
def read(self, file_path: Path) -> str:
|
||||
with open(file_path, "r") as f:
|
||||
latex = f.read()
|
||||
text = LatexNodes2Text().latex_to_text(latex)
|
||||
@@ -128,7 +129,7 @@ extension_to_parser = {
|
||||
}
|
||||
|
||||
|
||||
def is_file_binary_fn(file_path: str):
|
||||
def is_file_binary_fn(file_path: Path):
|
||||
"""Given a file path load all its content and checks if the null bytes is present
|
||||
|
||||
Args:
|
||||
@@ -144,11 +145,18 @@ def is_file_binary_fn(file_path: str):
|
||||
return False
|
||||
|
||||
|
||||
def read_textual_file(file_path: str, logger: logs.Logger) -> str:
|
||||
if not os.path.isfile(file_path):
|
||||
raise FileNotFoundError(
|
||||
f"read_file {file_path} failed: no such file or directory"
|
||||
)
|
||||
def read_textual_file(file_path: Path, logger: logs.Logger) -> str:
|
||||
if not file_path.is_absolute():
|
||||
raise ValueError("File path must be absolute")
|
||||
|
||||
if not file_path.is_file():
|
||||
if not file_path.exists():
|
||||
raise FileNotFoundError(
|
||||
f"read_file {file_path} failed: no such file or directory"
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"read_file failed: {file_path} is not a file")
|
||||
|
||||
is_binary = is_file_binary_fn(file_path)
|
||||
file_extension = os.path.splitext(file_path)[1].lower()
|
||||
parser = extension_to_parser.get(file_extension)
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
COMMAND_CATEGORY = "git_operations"
|
||||
COMMAND_CATEGORY_TITLE = "Git Operations"
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from git.repo import Repo
|
||||
|
||||
from autogpt.agents.agent import Agent
|
||||
@@ -33,12 +35,12 @@ from .decorators import sanitize_path_arg
|
||||
)
|
||||
@sanitize_path_arg("clone_path")
|
||||
@validate_url
|
||||
def clone_repository(url: str, clone_path: str, agent: Agent) -> str:
|
||||
def clone_repository(url: str, clone_path: Path, agent: Agent) -> str:
|
||||
"""Clone a GitHub repository locally.
|
||||
|
||||
Args:
|
||||
url (str): The URL of the repository to clone.
|
||||
clone_path (str): The path to clone the repository to.
|
||||
clone_path (Path): The path to clone the repository to.
|
||||
|
||||
Returns:
|
||||
str: The result of the clone operation.
|
||||
|
||||
@@ -3,6 +3,8 @@ from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from autogpt.commands.file_operations_utils import read_textual_file
|
||||
|
||||
|
||||
class ContextItem(ABC):
|
||||
@property
|
||||
@@ -35,15 +37,18 @@ class ContextItem(ABC):
|
||||
@dataclass
|
||||
class FileContextItem(ContextItem):
|
||||
file_path: Path
|
||||
description: str
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return f"The current content of the file '{self.file_path}'"
|
||||
|
||||
@property
|
||||
def source(self) -> str:
|
||||
return f"local file '{self.file_path}'"
|
||||
return str(self.file_path)
|
||||
|
||||
@property
|
||||
def content(self) -> str:
|
||||
return self.file_path.read_text()
|
||||
return read_textual_file(self.file_path)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -60,7 +65,7 @@ class FolderContextItem(ContextItem):
|
||||
|
||||
@property
|
||||
def source(self) -> str:
|
||||
return f"local folder '{self.path}'"
|
||||
return str(self.path)
|
||||
|
||||
@property
|
||||
def content(self) -> str:
|
||||
|
||||
Reference in New Issue
Block a user