Agent loop v2: Prompting improvements & WIP planning (#5077)

* Add categories to command registry

* Fix tests

* Clean up prompt generation

* Rename Performance Evaluations to Best Practices
* Move specification of response format from system prompt to Agent.construct_base_prompt
* Clean up PromptGenerator class

* Add debug logging to AIConfig autogeneration

* Clarify prompting and add support for multiple thought processes to Agent

* WIP: PlanningAgent

* Disable message history by default on BaseAgent

* Add CommandOutput and ThoughtProcessOutput type aliases

* Fix interrupts in main.py

* Use custom exceptions and clean up exception/error handling

* Remove duplicate agent_history.py

* Update PlanningAgent from upstream

* WIP: Support for dynamic in-prompt context

* WIP: response formats for PlanningAgent three-stage cycle

* Remove browsing overlay & separate browsing from extraction code

* Fix human feedback

* Fix tests

* Include history in Agent prompt generation

* Code improvements in agent.py

* Add ask_user command and revise system prompt
This commit is contained in:
Reinier van der Leer
2023-08-19 17:44:50 +02:00
committed by GitHub
parent 4dd75ca083
commit 3fe2246468
29 changed files with 1048 additions and 375 deletions

View File

@@ -1,4 +1,3 @@
import os
import sys
from pathlib import Path
from typing import Tuple

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
import json
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from autogpt.config import AIConfig, Config
@@ -11,6 +11,12 @@ if TYPE_CHECKING:
from autogpt.memory.vector import VectorMemory
from autogpt.models.command_registry import CommandRegistry
from autogpt.agents.utils.exceptions import (
AgentException,
CommandExecutionError,
InvalidAgentResponseError,
UnknownCommandError,
)
from autogpt.json_utils.utilities import extract_dict_from_response, validate_dict
from autogpt.llm.api_manager import ApiManager
from autogpt.llm.base import Message
@@ -23,9 +29,17 @@ from autogpt.logs.log_cycle import (
USER_INPUT_FILE_NAME,
LogCycleHandler,
)
from autogpt.models.agent_actions import (
ActionErrorResult,
ActionInterruptedByHuman,
ActionResult,
ActionSuccessResult,
)
from autogpt.models.command import CommandOutput
from autogpt.models.context_item import ContextItem
from autogpt.workspace import Workspace
from .base import AgentThoughts, BaseAgent, CommandArgs, CommandName
from .base import BaseAgent
class Agent(BaseAgent):
@@ -97,6 +111,9 @@ class Agent(BaseAgent):
kwargs["append_messages"] = []
kwargs["append_messages"].append(budget_msg)
# Include message history in base prompt
kwargs["with_message_history"] = True
return super().construct_base_prompt(*args, **kwargs)
def on_before_think(self, *args, **kwargs) -> ChatSequence:
@@ -121,15 +138,19 @@ class Agent(BaseAgent):
def execute(
self,
command_name: str | None,
command_args: dict[str, str] | None,
user_input: str | None,
) -> str:
# Execute command
if command_name is not None and command_name.lower().startswith("error"):
result = f"Could not execute command: {command_name}{command_args}"
elif command_name == "human_feedback":
result = f"Human feedback: {user_input}"
command_name: str,
command_args: dict[str, str] = {},
user_input: str = "",
) -> ActionResult:
result: ActionResult
if command_name == "human_feedback":
result = ActionInterruptedByHuman(user_input)
self.history.add(
"user",
"I interrupted the execution of the command you proposed "
f"to give you some feedback: {user_input}",
)
self.log_cycle_handler.log_cycle(
self.ai_config.ai_name,
self.created_at,
@@ -143,65 +164,101 @@ class Agent(BaseAgent):
if not plugin.can_handle_pre_command():
continue
command_name, arguments = plugin.pre_command(command_name, command_args)
command_result = execute_command(
command_name=command_name,
arguments=command_args,
agent=self,
)
result = f"Command {command_name} returned: " f"{command_result}"
result_tlength = count_string_tokens(str(command_result), self.llm.name)
try:
return_value = execute_command(
command_name=command_name,
arguments=command_args,
agent=self,
)
# Intercept ContextItem if one is returned by the command
if type(return_value) == tuple and isinstance(
return_value[1], ContextItem
):
context_item = return_value[1]
# return_value = return_value[0]
logger.debug(
f"Command {command_name} returned a ContextItem: {context_item}"
)
# self.context.add(context_item)
# HACK: use content of ContextItem as return value, for legacy support
return_value = context_item.content
result = ActionSuccessResult(return_value)
except AgentException as e:
result = ActionErrorResult(e.message, e)
logger.debug(f"Command result: {result}")
result_tlength = count_string_tokens(str(result), self.llm.name)
memory_tlength = count_string_tokens(
str(self.history.summary_message()), self.llm.name
)
if result_tlength + memory_tlength > self.send_token_limit:
result = f"Failure: command {command_name} returned too much output. \
Do not execute this command again with the same arguments."
result = ActionErrorResult(
reason=f"Command {command_name} returned too much output. "
"Do not execute this command again with the same arguments."
)
for plugin in self.config.plugins:
if not plugin.can_handle_post_command():
continue
result = plugin.post_command(command_name, result)
if result.status == "success":
result.results = plugin.post_command(command_name, result.results)
elif result.status == "error":
result.reason = plugin.post_command(command_name, result.reason)
# Check if there's a result from the command append it to the message
if result is None:
self.history.add("system", "Unable to execute command", "action_result")
else:
self.history.add("system", result, "action_result")
if result.status == "success":
self.history.add(
"system",
f"Command {command_name} returned: {result.results}",
"action_result",
)
elif result.status == "error":
message = f"Command {command_name} failed: {result.reason}"
# Append hint to the error message if the exception has a hint
if (
result.error
and isinstance(result.error, AgentException)
and result.error.hint
):
message = message.rstrip(".") + f". {result.error.hint}"
self.history.add("system", message, "action_result")
return result
def parse_and_process_response(
self, llm_response: ChatModelResponse, *args, **kwargs
) -> tuple[CommandName | None, CommandArgs | None, AgentThoughts]:
) -> Agent.ThoughtProcessOutput:
if not llm_response.content:
raise SyntaxError("Assistant response has no text content")
raise InvalidAgentResponseError("Assistant response has no text content")
assistant_reply_dict = extract_dict_from_response(llm_response.content)
valid, errors = validate_dict(assistant_reply_dict, self.config)
if not valid:
raise SyntaxError(
"Validation of response failed:\n "
+ ";\n ".join([str(e) for e in errors])
)
response_content = llm_response.content
for plugin in self.config.plugins:
if not plugin.can_handle_post_planning():
continue
assistant_reply_dict = plugin.post_planning(assistant_reply_dict)
response_content = plugin.post_planning(response_content)
response = None, None, assistant_reply_dict
assistant_reply_dict = extract_dict_from_response(response_content)
# Print Assistant thoughts
if assistant_reply_dict != {}:
# Get command name and arguments
try:
command_name, arguments = extract_command(
assistant_reply_dict, llm_response, self.config
)
response = command_name, arguments, assistant_reply_dict
except Exception as e:
logger.error("Error: \n", str(e))
_, errors = validate_dict(assistant_reply_dict, self.config)
if errors:
raise InvalidAgentResponseError(
"Validation of response failed:\n "
+ ";\n ".join([str(e) for e in errors])
)
# Get command name and arguments
command_name, arguments = extract_command(
assistant_reply_dict, llm_response, self.config
)
response = command_name, arguments, assistant_reply_dict
self.log_cycle_handler.log_cycle(
self.ai_config.ai_name,
@@ -233,29 +290,26 @@ def extract_command(
"""
if config.openai_functions:
if assistant_reply.function_call is None:
return "Error:", {"message": "No 'function_call' in assistant reply"}
raise InvalidAgentResponseError("No 'function_call' in assistant reply")
assistant_reply_json["command"] = {
"name": assistant_reply.function_call.name,
"args": json.loads(assistant_reply.function_call.arguments),
}
try:
if "command" not in assistant_reply_json:
return "Error:", {"message": "Missing 'command' object in JSON"}
if not isinstance(assistant_reply_json, dict):
return (
"Error:",
{
"message": f"The previous message sent was not a dictionary {assistant_reply_json}"
},
raise InvalidAgentResponseError(
f"The previous message sent was not a dictionary {assistant_reply_json}"
)
if "command" not in assistant_reply_json:
raise InvalidAgentResponseError("Missing 'command' object in JSON")
command = assistant_reply_json["command"]
if not isinstance(command, dict):
return "Error:", {"message": "'command' object is not a dictionary"}
raise InvalidAgentResponseError("'command' object is not a dictionary")
if "name" not in command:
return "Error:", {"message": "Missing 'name' field in 'command' object"}
raise InvalidAgentResponseError("Missing 'name' field in 'command' object")
command_name = command["name"]
@@ -263,18 +317,19 @@ def extract_command(
arguments = command.get("args", {})
return command_name, arguments
except json.decoder.JSONDecodeError:
return "Error:", {"message": "Invalid JSON"}
# All other errors, return "Error: + error message"
raise InvalidAgentResponseError("Invalid JSON")
except Exception as e:
return "Error:", {"message": str(e)}
raise InvalidAgentResponseError(str(e))
def execute_command(
command_name: str,
arguments: dict[str, str],
agent: Agent,
) -> Any:
) -> CommandOutput:
"""Execute the command and return the result
Args:
@@ -285,22 +340,28 @@ def execute_command(
Returns:
str: The result of the command
"""
try:
# Execute a native command with the same name or alias, if it exists
if command := agent.command_registry.get_command(command_name):
# Execute a native command with the same name or alias, if it exists
if command := agent.command_registry.get_command(command_name):
try:
return command(**arguments, agent=agent)
except AgentException:
raise
except Exception as e:
raise CommandExecutionError(str(e))
# Handle non-native commands (e.g. from plugins)
for command in agent.ai_config.prompt_generator.commands:
if (
command_name == command.label.lower()
or command_name == command.name.lower()
):
# Handle non-native commands (e.g. from plugins)
for command in agent.ai_config.prompt_generator.commands:
if (
command_name == command.label.lower()
or command_name == command.name.lower()
):
try:
return command.function(**arguments)
except AgentException:
raise
except Exception as e:
raise CommandExecutionError(str(e))
raise RuntimeError(
f"Cannot execute '{command_name}': unknown command."
" Do not try to use this command again."
)
except Exception as e:
return f"Error: {str(e)}"
raise UnknownCommandError(
f"Cannot execute command '{command_name}': unknown command."
)

View File

@@ -9,11 +9,13 @@ if TYPE_CHECKING:
from autogpt.models.command_registry import CommandRegistry
from autogpt.agents.utils.exceptions import InvalidAgentResponseError
from autogpt.llm.base import ChatModelResponse, ChatSequence, Message
from autogpt.llm.providers.openai import OPEN_AI_CHAT_MODELS, get_openai_command_specs
from autogpt.llm.utils import count_message_tokens, create_chat_completion
from autogpt.logs import logger
from autogpt.memory.message_history import MessageHistory
from autogpt.models.agent_actions import ActionResult
from autogpt.prompts.prompt import DEFAULT_TRIGGERING_PROMPT
CommandName = str
@@ -25,6 +27,7 @@ class BaseAgent(metaclass=ABCMeta):
"""Base class for all Auto-GPT agents."""
ThoughtProcessID = Literal["one-shot"]
ThoughtProcessOutput = tuple[CommandName, CommandArgs, AgentThoughts]
def __init__(
self,
@@ -95,7 +98,7 @@ class BaseAgent(metaclass=ABCMeta):
self,
instruction: Optional[str] = None,
thought_process_id: ThoughtProcessID = "one-shot",
) -> tuple[CommandName | None, CommandArgs | None, AgentThoughts]:
) -> ThoughtProcessOutput:
"""Runs the agent for one cycle.
Params:
@@ -123,10 +126,10 @@ class BaseAgent(metaclass=ABCMeta):
@abstractmethod
def execute(
self,
command_name: str | None,
command_args: dict[str, str] | None,
user_input: str | None,
) -> str:
command_name: str,
command_args: dict[str, str] = {},
user_input: str = "",
) -> ActionResult:
"""Executes the given command, if any, and returns the agent's response.
Params:
@@ -145,6 +148,7 @@ class BaseAgent(metaclass=ABCMeta):
prepend_messages: list[Message] = [],
append_messages: list[Message] = [],
reserve_tokens: int = 0,
with_message_history: bool = False,
) -> ChatSequence:
"""Constructs and returns a prompt with the following structure:
1. System prompt
@@ -163,20 +167,23 @@ class BaseAgent(metaclass=ABCMeta):
[Message("system", self.system_prompt)] + prepend_messages,
)
# Reserve tokens for messages to be appended later, if any
reserve_tokens += self.history.max_summary_tlength
if append_messages:
reserve_tokens += count_message_tokens(append_messages, self.llm.name)
if with_message_history:
# Reserve tokens for messages to be appended later, if any
reserve_tokens += self.history.max_summary_tlength
if append_messages:
reserve_tokens += count_message_tokens(append_messages, self.llm.name)
# Fill message history, up to a margin of reserved_tokens.
# Trim remaining historical messages and add them to the running summary.
history_start_index = len(prompt)
trimmed_history = add_history_upto_token_limit(
prompt, self.history, self.send_token_limit - reserve_tokens
)
if trimmed_history:
new_summary_msg, _ = self.history.trim_messages(list(prompt), self.config)
prompt.insert(history_start_index, new_summary_msg)
# Fill message history, up to a margin of reserved_tokens.
# Trim remaining historical messages and add them to the running summary.
history_start_index = len(prompt)
trimmed_history = add_history_upto_token_limit(
prompt, self.history, self.send_token_limit - reserve_tokens
)
if trimmed_history:
new_summary_msg, _ = self.history.trim_messages(
list(prompt), self.config
)
prompt.insert(history_start_index, new_summary_msg)
if append_messages:
prompt.extend(append_messages)
@@ -323,7 +330,7 @@ class BaseAgent(metaclass=ABCMeta):
thought_process_id: ThoughtProcessID,
prompt: ChatSequence,
instruction: str,
) -> tuple[CommandName | None, CommandArgs | None, AgentThoughts]:
) -> ThoughtProcessOutput:
"""Called upon receiving a response from the chat model.
Adds the last/newest message in the prompt and the response to `history`,
@@ -348,15 +355,14 @@ class BaseAgent(metaclass=ABCMeta):
return self.parse_and_process_response(
llm_response, thought_process_id, prompt, instruction
)
except SyntaxError as e:
logger.error(f"Response could not be parsed: {e}")
except InvalidAgentResponseError as e:
# TODO: tune this message
self.history.add(
"system",
f"Your response could not be parsed: {e}"
"\n\nRemember to only respond using the specified format above!",
)
return None, None, {}
raise
# TODO: update memory/context
@@ -367,7 +373,7 @@ class BaseAgent(metaclass=ABCMeta):
thought_process_id: ThoughtProcessID,
prompt: ChatSequence,
instruction: str,
) -> tuple[CommandName | None, CommandArgs | None, AgentThoughts]:
) -> ThoughtProcessOutput:
"""Validate, parse & process the LLM's response.
Must be implemented by derivative classes: no base implementation is provided,

View File

@@ -0,0 +1,366 @@
from __future__ import annotations
import re
from datetime import datetime
from typing import TYPE_CHECKING, Literal, Optional
if TYPE_CHECKING:
from autogpt.config import AIConfig, Config
from autogpt.llm.base import ChatModelResponse, ChatSequence
from autogpt.memory.vector import VectorMemory
from autogpt.models.command_registry import CommandRegistry
from autogpt.agents.utils.exceptions import AgentException, InvalidAgentResponseError
from autogpt.json_utils.utilities import extract_dict_from_response, validate_dict
from autogpt.llm.base import Message
from autogpt.llm.utils import count_string_tokens
from autogpt.logs import logger
from autogpt.logs.log_cycle import (
CURRENT_CONTEXT_FILE_NAME,
NEXT_ACTION_FILE_NAME,
USER_INPUT_FILE_NAME,
LogCycleHandler,
)
from autogpt.models.agent_actions import (
ActionErrorResult,
ActionHistory,
ActionInterruptedByHuman,
ActionResult,
ActionSuccessResult,
)
from autogpt.models.context_item import ContextItem
from autogpt.workspace import Workspace
from .agent import execute_command, extract_command
from .base import BaseAgent
from .utils.context import AgentContext
class PlanningAgent(BaseAgent):
"""Agent class for interacting with Auto-GPT."""
ThoughtProcessID = Literal["plan", "action", "evaluate"]
def __init__(
self,
ai_config: AIConfig,
command_registry: CommandRegistry,
memory: VectorMemory,
triggering_prompt: str,
config: Config,
cycle_budget: Optional[int] = None,
):
super().__init__(
ai_config=ai_config,
command_registry=command_registry,
config=config,
default_cycle_instruction=triggering_prompt,
cycle_budget=cycle_budget,
)
self.memory = memory
"""VectorMemoryProvider used to manage the agent's context (TODO)"""
self.workspace = Workspace(config.workspace_path, config.restrict_to_workspace)
"""Workspace that the agent has access to, e.g. for reading/writing files."""
self.created_at = datetime.now().strftime("%Y%m%d_%H%M%S")
"""Timestamp the agent was created; only used for structured debug logging."""
self.log_cycle_handler = LogCycleHandler()
"""LogCycleHandler for structured debug logging."""
self.action_history = ActionHistory()
self.context = AgentContext()
"""Dynamic segment of the prompt, to provide the LLM with relevant context"""
self.plan: list[str] = []
"""List of steps that the Agent plans to take"""
def construct_base_prompt(
self, thought_process_id: ThoughtProcessID, **kwargs
) -> ChatSequence:
prepend_messages = kwargs["prepend_messages"] = kwargs.get(
"prepend_messages", []
)
# Add the current plan to the prompt, if any
if self.plan:
plan_section = [
"## Plan",
"To complete your task, you have composed the following plan:",
]
plan_section += [f"{i}. {s}" for i, s in enumerate(self.plan, 1)]
# Add the actions so far to the prompt
if self.action_history:
plan_section += [
"\n### Progress",
"So far, you have executed the following actions based on the plan:",
]
for i, cycle in enumerate(self.action_history, 1):
if not (cycle.action and cycle.result):
logger.warn(f"Incomplete action in history: {cycle}")
continue
plan_section.append(
f"{i}. You executed the command `{cycle.action.format_call()}`, "
f"which gave the result `{cycle.result}`."
)
prepend_messages.append(Message("system", "\n".join(plan_section)))
if self.context:
context_section = [
"## Context",
"Below is information that may be relevant to your task. These take up "
"part of your working memory, which is limited, so when a context item is "
"no longer relevant for your plan, use the `close_context_item` command to "
"free up some memory."
"\n",
self.context.format_numbered(),
]
prepend_messages.append(Message("system", "\n".join(context_section)))
match thought_process_id:
case "plan":
# TODO: add planning instructions; details about what to pay attention to when planning
pass
case "action":
# TODO: need to insert the functions here again?
pass
case "evaluate":
# TODO: insert latest action (with reasoning) + result + evaluation instructions
pass
case _:
raise NotImplementedError(
f"Unknown thought process '{thought_process_id}'"
)
return super().construct_base_prompt(
thought_process_id=thought_process_id, **kwargs
)
def response_format_instruction(self, thought_process_id: ThoughtProcessID) -> str:
match thought_process_id:
case "plan":
# TODO: add planning instructions; details about what to pay attention to when planning
response_format = f"""```ts
interface Response {{
thoughts: {{
// Thoughts
text: string;
// A short logical explanation about how the action is part of the earlier composed plan
reasoning: string;
// Constructive self-criticism
criticism: string;
}};
// A plan to achieve the goals with the available resources and/or commands.
plan: Array<{{
// An actionable subtask
subtask: string;
// Criterium to determine whether the subtask has been completed
completed_if: string;
}}>;
}}
```"""
pass
case "action":
# TODO: need to insert the functions here again?
response_format = """```ts
interface Response {
thoughts: {
// Thoughts
text: string;
// A short logical explanation about how the action is part of the earlier composed plan
reasoning: string;
// Constructive self-criticism
criticism: string;
};
// The action to take, from the earlier specified list of commands
command: {
name: string;
args: Record<string, any>;
};
}
```"""
pass
case "evaluate":
# TODO: insert latest action (with reasoning) + result + evaluation instructions
response_format = f"""```ts
interface Response {{
thoughts: {{
// Thoughts
text: string;
reasoning: string;
// Constructive self-criticism
criticism: string;
}};
result_evaluation: {{
// A short logical explanation of why the given partial result does or does not complete the corresponding subtask
reasoning: string;
// Whether the current subtask has been completed
completed: boolean;
// An estimate of the progress (0.0 - 1.0) that has been made on the subtask with the actions that have been taken so far
progress: float;
}};
}}
```"""
pass
case _:
raise NotImplementedError(
f"Unknown thought process '{thought_process_id}'"
)
response_format = re.sub(
r"\n\s+",
"\n",
response_format,
)
return (
f"Respond strictly with JSON. The JSON should be compatible with "
"the TypeScript type `Response` from the following:\n"
f"{response_format}\n"
)
def on_before_think(self, *args, **kwargs) -> ChatSequence:
prompt = super().on_before_think(*args, **kwargs)
self.log_cycle_handler.log_count_within_cycle = 0
self.log_cycle_handler.log_cycle(
self.ai_config.ai_name,
self.created_at,
self.cycle_count,
self.action_history.cycles,
"action_history.json",
)
self.log_cycle_handler.log_cycle(
self.ai_config.ai_name,
self.created_at,
self.cycle_count,
prompt.raw(),
CURRENT_CONTEXT_FILE_NAME,
)
return prompt
def execute(
self,
command_name: str,
command_args: dict[str, str] = {},
user_input: str = "",
) -> ActionResult:
result: ActionResult
if command_name == "human_feedback":
result = ActionInterruptedByHuman(user_input)
self.log_cycle_handler.log_cycle(
self.ai_config.ai_name,
self.created_at,
self.cycle_count,
user_input,
USER_INPUT_FILE_NAME,
)
else:
for plugin in self.config.plugins:
if not plugin.can_handle_pre_command():
continue
command_name, arguments = plugin.pre_command(command_name, command_args)
try:
return_value = execute_command(
command_name=command_name,
arguments=command_args,
agent=self,
)
# Intercept ContextItem if one is returned by the command
if type(return_value) == tuple and isinstance(
return_value[1], ContextItem
):
self.context.add(return_value[1])
return_value = return_value[0]
result = ActionSuccessResult(return_value)
except AgentException as e:
result = ActionErrorResult(e.message, e)
result_tlength = count_string_tokens(str(result), self.llm.name)
memory_tlength = count_string_tokens(
str(self.history.summary_message()), self.llm.name
)
if result_tlength + memory_tlength > self.send_token_limit:
result = ActionErrorResult(
reason=f"Command {command_name} returned too much output. "
"Do not execute this command again with the same arguments."
)
for plugin in self.config.plugins:
if not plugin.can_handle_post_command():
continue
if result.status == "success":
result.results = plugin.post_command(command_name, result.results)
elif result.status == "error":
result.reason = plugin.post_command(command_name, result.reason)
# Check if there's a result from the command append it to the message
if result.status == "success":
self.history.add(
"system",
f"Command {command_name} returned: {result.results}",
"action_result",
)
elif result.status == "error":
message = f"Command {command_name} failed: {result.reason}"
if (
result.error
and isinstance(result.error, AgentException)
and result.error.hint
):
message = message.rstrip(".") + f". {result.error.hint}"
self.history.add("system", message, "action_result")
return result
def parse_and_process_response(
self,
llm_response: ChatModelResponse,
thought_process_id: ThoughtProcessID,
*args,
**kwargs,
) -> PlanningAgent.ThoughtProcessOutput:
if not llm_response.content:
raise InvalidAgentResponseError("Assistant response has no text content")
response_content = llm_response.content
for plugin in self.config.plugins:
if not plugin.can_handle_post_planning():
continue
response_content = plugin.post_planning(response_content)
assistant_reply_dict = extract_dict_from_response(response_content)
_, errors = validate_dict(assistant_reply_dict, self.config)
if errors:
raise InvalidAgentResponseError(
"Validation of response failed:\n "
+ ";\n ".join([str(e) for e in errors])
)
# Get command name and arguments
command_name, arguments = extract_command(
assistant_reply_dict, llm_response, self.config
)
response = command_name, arguments, assistant_reply_dict
self.log_cycle_handler.log_cycle(
self.ai_config.ai_name,
self.created_at,
self.cycle_count,
assistant_reply_dict,
NEXT_ACTION_FILE_NAME,
)
return response

View File

@@ -0,0 +1,23 @@
from autogpt.models.context_item import ContextItem
class AgentContext:
items: list[ContextItem]
def __init__(self, items: list[ContextItem] = []):
self.items = items
def __bool__(self) -> bool:
return len(self.items) > 0
def add(self, item: ContextItem) -> None:
self.items.append(item)
def close(self, index: int) -> None:
self.items.pop(index - 1)
def clear(self) -> None:
self.items.clear()
def format_numbered(self) -> str:
return "\n\n".join([f"{i}. {c}" for i, c in enumerate(self.items, 1)])

View File

@@ -0,0 +1,56 @@
from typing import Optional
class AgentException(Exception):
"""Base class for specific exceptions relevant in the execution of Agents"""
message: str
hint: Optional[str] = None
"""A hint which can be passed to the LLM to reduce reoccurrence of this error"""
def __init__(self, message: str, *args):
self.message = message
super().__init__(message, *args)
class ConfigurationError(AgentException):
"""Error caused by invalid, incompatible or otherwise incorrect configuration"""
class InvalidAgentResponseError(AgentException):
"""The LLM deviated from the prescribed response format"""
class UnknownCommandError(AgentException):
"""The AI tried to use an unknown command"""
hint = "Do not try to use this command again."
class DuplicateOperationError(AgentException):
"""The proposed operation has already been executed"""
class CommandExecutionError(AgentException):
"""An error occured when trying to execute the command"""
class InvalidArgumentError(CommandExecutionError):
"""The command received an invalid argument"""
class OperationNotAllowedError(CommandExecutionError):
"""The agent is not allowed to execute the proposed operation"""
class AccessDeniedError(CommandExecutionError):
"""The operation failed because access to a required resource was denied"""
class CodeExecutionError(CommandExecutionError):
"""The operation (an attempt to run arbitrary code) returned an error"""
class TooMuchOutputError(CommandExecutionError):
"""The operation generated more output than what the Agent can process"""

View File

@@ -11,6 +11,7 @@ from typing import Optional
from colorama import Fore, Style
from autogpt.agents import Agent, AgentThoughts, CommandArgs, CommandName
from autogpt.agents.utils.exceptions import InvalidAgentResponseError
from autogpt.app.configurator import create_config
from autogpt.app.setup import prompt_user
from autogpt.app.spinner import Spinner
@@ -174,7 +175,7 @@ def run_auto_gpt(
run_interaction_loop(agent)
def _get_cycle_budget(continuous_mode: bool, continuous_limit: int) -> int | None:
def _get_cycle_budget(continuous_mode: bool, continuous_limit: int) -> int | float:
# Translate from the continuous_mode/continuous_limit config
# to a cycle_budget (maximum number of cycles to run without checking in with the
# user) and a count of cycles_remaining before we check in..
@@ -217,10 +218,9 @@ def run_interaction_loop(
def graceful_agent_interrupt(signum: int, frame: Optional[FrameType]) -> None:
nonlocal cycle_budget, cycles_remaining, spinner
if cycles_remaining in [0, 1, math.inf]:
if cycles_remaining in [0, 1]:
logger.typewriter_log(
"Interrupt signal received. Stopping continuous command execution "
"immediately.",
"Interrupt signal received. Stopping Auto-GPT immediately.",
Fore.RED,
)
sys.exit()
@@ -244,6 +244,9 @@ def run_interaction_loop(
# Application Main Loop #
#########################
# Keep track of consecutive failures of the agent
consecutive_failures = 0
while cycles_remaining > 0:
logger.debug(f"Cycle budget: {cycle_budget}; remaining: {cycles_remaining}")
@@ -252,7 +255,20 @@ def run_interaction_loop(
########
# Have the agent determine the next action to take.
with spinner:
command_name, command_args, assistant_reply_dict = agent.think()
try:
command_name, command_args, assistant_reply_dict = agent.think()
except InvalidAgentResponseError as e:
logger.warn(f"The agent's thoughts could not be parsed: {e}")
consecutive_failures += 1
if consecutive_failures >= 3:
logger.error(
f"The agent failed to output valid thoughts {consecutive_failures} "
"times in a row. Terminating..."
)
sys.exit()
continue
consecutive_failures = 0
###############
# Update User #
@@ -298,7 +314,7 @@ def run_interaction_loop(
else: # user_feedback == UserFeedback.TEXT
command_name = "human_feedback"
else:
user_input = None
user_input = ""
# First log new-line so user can differentiate sections better in console
logger.typewriter_log("\n")
if cycles_remaining != math.inf:
@@ -315,19 +331,25 @@ def run_interaction_loop(
# and then having the decrement set it to 0, exiting the application.
if command_name != "human_feedback":
cycles_remaining -= 1
if not command_name:
continue
result = agent.execute(command_name, command_args, user_input)
if result is not None:
logger.typewriter_log("SYSTEM: ", Fore.YELLOW, result)
else:
logger.typewriter_log("SYSTEM: ", Fore.YELLOW, "Unable to execute command")
if result.status == "success":
logger.typewriter_log("SYSTEM: ", Fore.YELLOW, result.results)
elif result.status == "error":
logger.warn(
f"Command {command_name} returned an error: {result.error or result.reason}"
)
def update_user(
config: Config,
ai_config: AIConfig,
command_name: CommandName | None,
command_args: CommandArgs | None,
command_name: CommandName,
command_args: CommandArgs,
assistant_reply_dict: AgentThoughts,
) -> None:
"""Prints the assistant's thoughts and the next command to the user.
@@ -342,32 +364,17 @@ def update_user(
print_assistant_thoughts(ai_config.ai_name, assistant_reply_dict, config)
if command_name is not None:
if command_name.lower().startswith("error"):
logger.typewriter_log(
"ERROR: ",
Fore.RED,
f"The Agent failed to select an action. "
f"Error message: {command_name}",
)
else:
if config.speak_mode:
say_text(f"I want to execute {command_name}", config)
if config.speak_mode:
say_text(f"I want to execute {command_name}", config)
# First log new-line so user can differentiate sections better in console
logger.typewriter_log("\n")
logger.typewriter_log(
"NEXT ACTION: ",
Fore.CYAN,
f"COMMAND = {Fore.CYAN}{remove_ansi_escape(command_name)}{Style.RESET_ALL} "
f"ARGUMENTS = {Fore.CYAN}{command_args}{Style.RESET_ALL}",
)
else:
logger.typewriter_log(
"NO ACTION SELECTED: ",
Fore.RED,
f"The Agent failed to select an action.",
)
# First log new-line so user can differentiate sections better in console
logger.typewriter_log("\n")
logger.typewriter_log(
"NEXT ACTION: ",
Fore.CYAN,
f"COMMAND = {Fore.CYAN}{remove_ansi_escape(command_name)}{Style.RESET_ALL} "
f"ARGUMENTS = {Fore.CYAN}{command_args}{Style.RESET_ALL}",
)
def get_user_feedback(

View File

@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any, Callable, Optional, TypedDict
if TYPE_CHECKING:
from autogpt.config import Config
from autogpt.models.command import Command, CommandParameter
from autogpt.models.command import Command, CommandOutput, CommandParameter
# Unique identifier for auto-gpt commands
AUTO_GPT_COMMAND_IDENTIFIER = "auto_gpt_command"
@@ -25,10 +25,10 @@ def command(
enabled: bool | Callable[[Config], bool] = True,
disabled_reason: Optional[str] = None,
aliases: list[str] = [],
) -> Callable[..., Any]:
) -> Callable[..., CommandOutput]:
"""The command decorator is used to create Command objects from ordinary functions."""
def decorator(func: Callable[..., Any]) -> Command:
def decorator(func: Callable[..., CommandOutput]) -> Command:
typed_parameters = [
CommandParameter(
name=param_name,

View File

@@ -1,6 +1,7 @@
COMMAND_CATEGORIES = [
"autogpt.commands.execute_code",
"autogpt.commands.file_operations",
"autogpt.commands.user_interaction",
"autogpt.commands.web_search",
"autogpt.commands.web_selenium",
"autogpt.commands.system",

View File

@@ -12,6 +12,13 @@ from docker.errors import DockerException, ImageNotFound
from docker.models.containers import Container as DockerContainer
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import (
AccessDeniedError,
CodeExecutionError,
CommandExecutionError,
InvalidArgumentError,
OperationNotAllowedError,
)
from autogpt.command_decorator import command
from autogpt.config import Config
from autogpt.logs import logger
@@ -60,7 +67,9 @@ def execute_python_code(code: str, name: str, agent: Agent) -> str:
# so sanitization must be done here to prevent path traversal.
file_path = agent.workspace.get_path(code_dir / name)
if not file_path.is_relative_to(code_dir):
return "Error: 'name' argument resulted in path traversal, operation aborted"
raise AccessDeniedError(
"'name' argument resulted in path traversal, operation aborted"
)
try:
with open(file_path, "w+", encoding="utf-8") as f:
@@ -68,7 +77,7 @@ def execute_python_code(code: str, name: str, agent: Agent) -> str:
return execute_python_file(str(file_path), agent)
except Exception as e:
return f"Error: {str(e)}"
raise CommandExecutionError(*e.args)
@command(
@@ -97,12 +106,12 @@ def execute_python_file(filename: str, agent: Agent) -> str:
)
if not filename.endswith(".py"):
return "Error: Invalid file type. Only .py files are allowed."
raise InvalidArgumentError("Invalid file type. Only .py files are allowed.")
file_path = Path(filename)
if not file_path.is_file():
# Mimic the response that you get from the command line so that it's easier to identify
return (
raise FileNotFoundError(
f"python: can't open file '{filename}': [Errno 2] No such file or directory"
)
@@ -119,7 +128,7 @@ def execute_python_file(filename: str, agent: Agent) -> str:
if result.returncode == 0:
return result.stdout
else:
return f"Error: {result.stderr}"
raise CodeExecutionError(result.stderr)
logger.debug("Auto-GPT is not running in a Docker container")
try:
@@ -178,10 +187,7 @@ def execute_python_file(filename: str, agent: Agent) -> str:
logger.warn(
"Could not run the script in a container. If you haven't already, please install Docker https://docs.docker.com/get-docker/"
)
return f"Error: {str(e)}"
except Exception as e:
return f"Error: {str(e)}"
raise CommandExecutionError(f"Could not run the script in a container: {e}")
def validate_command(command: str, config: Config) -> bool:
@@ -231,7 +237,7 @@ def execute_shell(command_line: str, agent: Agent) -> str:
"""
if not validate_command(command_line, agent.config):
logger.info(f"Command '{command_line}' not allowed")
return "Error: This Shell Command is not allowed."
raise OperationNotAllowedError("This shell command is not allowed.")
current_dir = Path.cwd()
# Change dir into workspace if necessary
@@ -278,7 +284,7 @@ def execute_shell_popen(command_line, agent: Agent) -> str:
"""
if not validate_command(command_line, agent.config):
logger.info(f"Command '{command_line}' not allowed")
return "Error: This Shell Command is not allowed."
raise OperationNotAllowedError("This shell command is not allowed.")
current_dir = os.getcwd()
# Change dir into workspace if necessary

View File

@@ -13,6 +13,7 @@ from pathlib import Path
from typing import Generator, Literal
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import DuplicateOperationError
from autogpt.command_decorator import command
from autogpt.logs import logger
from autogpt.memory.vector import MemoryItem, VectorMemory
@@ -151,17 +152,14 @@ def read_file(filename: str, agent: Agent) -> str:
Returns:
str: The contents of the file
"""
try:
content = read_textual_file(filename, logger)
content = read_textual_file(filename, logger)
# TODO: invalidate/update memory when file is edited
file_memory = MemoryItem.from_text_file(content, filename, agent.config)
if len(file_memory.chunks) > 1:
return file_memory.summary
# TODO: invalidate/update memory when file is edited
file_memory = MemoryItem.from_text_file(content, filename, agent.config)
if len(file_memory.chunks) > 1:
return file_memory.summary
return content
except Exception as e:
return f"Error: {str(e)}"
return content
def ingest_file(
@@ -220,16 +218,14 @@ def write_to_file(filename: str, text: str, agent: Agent) -> str:
"""
checksum = text_checksum(text)
if is_duplicate_operation("write", filename, agent, checksum):
return "Error: File has already been updated."
try:
directory = os.path.dirname(filename)
os.makedirs(directory, exist_ok=True)
with open(filename, "w", encoding="utf-8") as f:
f.write(text)
log_operation("write", filename, agent, checksum)
return "File written to successfully."
except Exception as err:
return f"Error: {err}"
raise DuplicateOperationError("File has already been updated.")
directory = os.path.dirname(filename)
os.makedirs(directory, exist_ok=True)
with open(filename, "w", encoding="utf-8") as f:
f.write(text)
log_operation("write", filename, agent, checksum)
return "File written to successfully."
@sanitize_path_arg("filename")
@@ -246,20 +242,17 @@ def append_to_file(
Returns:
str: A message indicating success or failure
"""
try:
directory = os.path.dirname(filename)
os.makedirs(directory, exist_ok=True)
with open(filename, "a", encoding="utf-8") as f:
f.write(text)
directory = os.path.dirname(filename)
os.makedirs(directory, exist_ok=True)
with open(filename, "a", encoding="utf-8") as f:
f.write(text)
if should_log:
with open(filename, "r", encoding="utf-8") as f:
checksum = text_checksum(f.read())
log_operation("append", filename, agent, checksum=checksum)
if should_log:
with open(filename, "r", encoding="utf-8") as f:
checksum = text_checksum(f.read())
log_operation("append", filename, agent, checksum=checksum)
return "Text appended successfully."
except Exception as err:
return f"Error: {err}"
return "Text appended successfully."
@command(

View File

@@ -6,6 +6,7 @@ COMMAND_CATEGORY_TITLE = "Git Operations"
from git.repo import Repo
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import CommandExecutionError
from autogpt.command_decorator import command
from autogpt.url_utils.validators import validate_url
@@ -50,6 +51,7 @@ def clone_repository(url: str, clone_path: str, agent: Agent) -> str:
)
try:
Repo.clone_from(url=auth_repo_url, to_path=clone_path)
return f"""Cloned {url} to {clone_path}"""
except Exception as e:
return f"Error: {str(e)}"
raise CommandExecutionError(f"Could not clone repo: {e}")
return f"""Cloned {url} to {clone_path}"""

View File

@@ -5,8 +5,6 @@ from __future__ import annotations
COMMAND_CATEGORY = "system"
COMMAND_CATEGORY_TITLE = "System"
from typing import NoReturn
from autogpt.agents.agent import Agent
from autogpt.command_decorator import command
from autogpt.logs import logger
@@ -23,7 +21,7 @@ from autogpt.logs import logger
}
},
)
def task_complete(reason: str, agent: Agent) -> NoReturn:
def task_complete(reason: str, agent: Agent) -> None:
"""
A function that takes in a string and exits the program

View File

@@ -0,0 +1,28 @@
"""Commands to interact with the user"""
from __future__ import annotations
COMMAND_CATEGORY = "user_interaction"
COMMAND_CATEGORY_TITLE = "User Interaction"
from autogpt.agents.agent import Agent
from autogpt.app.utils import clean_input
from autogpt.command_decorator import command
@command(
"ask_user",
(
"If you need more details or information regarding the given goals,"
" you can ask the user for input"
),
{
"question": {
"type": "string",
"description": "The question or prompt to the user",
"required": True,
}
},
)
def ask_user(question: str, agent: Agent) -> str:
return clean_input(agent.config, question)

View File

@@ -12,6 +12,7 @@ from itertools import islice
from duckduckgo_search import DDGS
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import ConfigurationError
from autogpt.command_decorator import command
DUCKDUCKGO_MAX_ATTEMPTS = 3
@@ -119,9 +120,10 @@ def google(query: str, agent: Agent, num_results: int = 8) -> str | list[str]:
) == 403 and "invalid API key" in error_details.get("error", {}).get(
"message", ""
):
return "Error: The provided Google API key is invalid or missing."
else:
return f"Error: {e}"
raise ConfigurationError(
"The provided Google API key is invalid or missing."
)
raise
# google_result can be a list or a string depending on the search results
# Return the list of search result URLs

View File

@@ -2,15 +2,14 @@
from __future__ import annotations
from autogpt.llm.utils.token_counter import count_string_tokens
COMMAND_CATEGORY = "web_browse"
COMMAND_CATEGORY_TITLE = "Web Browsing"
import logging
import re
from pathlib import Path
from sys import platform
from typing import Optional
from typing import TYPE_CHECKING, Optional, Type
from bs4 import BeautifulSoup
from selenium.common.exceptions import WebDriverException
@@ -34,8 +33,13 @@ from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager as EdgeDriverManager
from autogpt.agents.agent import Agent
if TYPE_CHECKING:
from autogpt.config import Config
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import CommandExecutionError
from autogpt.command_decorator import command
from autogpt.llm.utils import count_string_tokens
from autogpt.logs import logger
from autogpt.memory.vector import MemoryItem, get_memory
from autogpt.processing.html import extract_hyperlinks, format_hyperlinks
@@ -46,6 +50,10 @@ TOKENS_TO_TRIGGER_SUMMARY = 50
LINKS_TO_RETURN = 20
class BrowsingError(CommandExecutionError):
"""An error occurred while trying to browse the page"""
@command(
"browse_website",
"Browses a Website",
@@ -71,13 +79,16 @@ def browse_website(url: str, question: str, agent: Agent) -> str:
"""
driver = None
try:
driver, text = scrape_text_with_selenium(url, agent)
add_header(driver)
if TOKENS_TO_TRIGGER_SUMMARY < count_string_tokens(text, agent.llm.name):
text = summarize_memorize_webpage(url, text, question, agent, driver)
driver = open_page_in_browser(url, agent.config)
text = scrape_text_with_selenium(driver)
links = scrape_links_with_selenium(driver, url)
if not text:
return f"Website did not contain any text.\n\nLinks: {links}"
elif count_string_tokens(text, agent.llm.name) > TOKENS_TO_TRIGGER_SUMMARY:
text = summarize_memorize_webpage(url, text, question, agent, driver)
# Limit links to LINKS_TO_RETURN
if len(links) > LINKS_TO_RETURN:
links = links[:LINKS_TO_RETURN]
@@ -87,47 +98,98 @@ def browse_website(url: str, question: str, agent: Agent) -> str:
# These errors are often quite long and include lots of context.
# Just grab the first line.
msg = e.msg.split("\n")[0]
return f"Error: {msg}"
if "net::" in msg:
raise BrowsingError(
f"A networking error occurred while trying to load the page: "
+ re.sub(r"^unknown error: ", "", msg)
)
raise CommandExecutionError(msg)
finally:
if driver:
close_browser(driver)
def scrape_text_with_selenium(url: str, agent: Agent) -> tuple[WebDriver, str]:
"""Scrape text from a website using selenium
def scrape_text_with_selenium(driver: WebDriver) -> str:
"""Scrape text from a browser window using selenium
Args:
url (str): The url of the website to scrape
driver (WebDriver): A driver object representing the browser window to scrape
Returns:
Tuple[WebDriver, str]: The webdriver and the text scraped from the website
str: the text scraped from the website
"""
# Get the HTML content directly from the browser's DOM
page_source = driver.execute_script("return document.body.outerHTML;")
soup = BeautifulSoup(page_source, "html.parser")
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = "\n".join(chunk for chunk in chunks if chunk)
return text
def scrape_links_with_selenium(driver: WebDriver, base_url: str) -> list[str]:
"""Scrape links from a website using selenium
Args:
driver (WebDriver): A driver object representing the browser window to scrape
base_url (str): The base URL to use for resolving relative links
Returns:
List[str]: The links scraped from the website
"""
page_source = driver.page_source
soup = BeautifulSoup(page_source, "html.parser")
for script in soup(["script", "style"]):
script.extract()
hyperlinks = extract_hyperlinks(soup, base_url)
return format_hyperlinks(hyperlinks)
def open_page_in_browser(url: str, config: Config) -> WebDriver:
"""Open a browser window and load a web page using Selenium
Params:
url (str): The URL of the page to load
config (Config): The applicable application configuration
Returns:
driver (WebDriver): A driver object representing the browser window to scrape
"""
logging.getLogger("selenium").setLevel(logging.CRITICAL)
options_available: dict[str, BrowserOptions] = {
options_available: dict[str, Type[BrowserOptions]] = {
"chrome": ChromeOptions,
"edge": EdgeOptions,
"firefox": FirefoxOptions,
"safari": SafariOptions,
}
options: BrowserOptions = options_available[agent.config.selenium_web_browser]()
options: BrowserOptions = options_available[config.selenium_web_browser]()
options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36"
)
if agent.config.selenium_web_browser == "firefox":
if agent.config.selenium_headless:
if config.selenium_web_browser == "firefox":
if config.selenium_headless:
options.headless = True
options.add_argument("--disable-gpu")
driver = FirefoxDriver(
service=GeckoDriverService(GeckoDriverManager().install()), options=options
)
elif agent.config.selenium_web_browser == "edge":
elif config.selenium_web_browser == "edge":
driver = EdgeDriver(
service=EdgeDriverService(EdgeDriverManager().install()), options=options
)
elif agent.config.selenium_web_browser == "safari":
elif config.selenium_web_browser == "safari":
# Requires a bit more setup on the users end
# See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari
driver = SafariDriver(options=options)
@@ -137,7 +199,7 @@ def scrape_text_with_selenium(url: str, agent: Agent) -> tuple[WebDriver, str]:
options.add_argument("--remote-debugging-port=9222")
options.add_argument("--no-sandbox")
if agent.config.selenium_headless:
if config.selenium_headless:
options.add_argument("--headless=new")
options.add_argument("--disable-gpu")
@@ -155,38 +217,7 @@ def scrape_text_with_selenium(url: str, agent: Agent) -> tuple[WebDriver, str]:
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Get the HTML content directly from the browser's DOM
page_source = driver.execute_script("return document.body.outerHTML;")
soup = BeautifulSoup(page_source, "html.parser")
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = "\n".join(chunk for chunk in chunks if chunk)
return driver, text
def scrape_links_with_selenium(driver: WebDriver, url: str) -> list[str]:
"""Scrape links from a website using selenium
Args:
driver (WebDriver): The webdriver to use to scrape the links
Returns:
List[str]: The links scraped from the website
"""
page_source = driver.page_source
soup = BeautifulSoup(page_source, "html.parser")
for script in soup(["script", "style"]):
script.extract()
hyperlinks = extract_hyperlinks(soup, url)
return format_hyperlinks(hyperlinks)
return driver
def close_browser(driver: WebDriver) -> None:
@@ -201,23 +232,6 @@ def close_browser(driver: WebDriver) -> None:
driver.quit()
def add_header(driver: WebDriver) -> None:
"""Add a header to the website
Args:
driver (WebDriver): The webdriver to use to add the header
Returns:
None
"""
try:
with open(f"{FILE_DIR}/js/overlay.js", "r") as overlay_file:
overlay_script = overlay_file.read()
driver.execute_script(overlay_script)
except Exception as e:
print(f"Error executing overlay.js: {e}")
def summarize_memorize_webpage(
url: str,
text: str,
@@ -237,7 +251,7 @@ def summarize_memorize_webpage(
str: The summary of the text
"""
if not text:
return "Error: No text to summarize"
raise ValueError("No text to summarize")
text_length = len(text)
logger.info(f"Text length: {text_length} characters")

View File

@@ -1,29 +0,0 @@
const overlay = document.createElement('div');
Object.assign(overlay.style, {
position: 'fixed',
zIndex: 999999,
top: 0,
left: 0,
width: '100%',
height: '100%',
background: 'rgba(0, 0, 0, 0.7)',
color: '#fff',
fontSize: '24px',
fontWeight: 'bold',
display: 'flex',
justifyContent: 'center',
alignItems: 'center',
});
const textContent = document.createElement('div');
Object.assign(textContent.style, {
textAlign: 'center',
});
textContent.textContent = 'AutoGPT Analyzing Page';
overlay.appendChild(textContent);
document.body.append(overlay);
document.body.style.overflow = 'hidden';
let dotCount = 0;
setInterval(() => {
textContent.textContent = 'AutoGPT Analyzing Page' + '.'.repeat(dotCount);
dotCount = (dotCount + 1) % 4;
}, 1000);

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Iterator, Literal, Optional
@dataclass
class Action:
name: str
args: dict[str, Any]
reasoning: str
def format_call(self) -> str:
return f"{self.name}({', '.join([f'{a}={repr(v)}' for a, v in self.args.items()])})"
@dataclass
class ActionSuccessResult:
results: Any
status: Literal["success"] = "success"
def __str__(self) -> str:
return f"Action succeeded and returned: `{self.results}`"
@dataclass
class ActionErrorResult:
reason: str
error: Optional[Exception] = None
status: Literal["error"] = "error"
def __str__(self) -> str:
return f"Action failed: `{self.reason}`"
@dataclass
class ActionInterruptedByHuman:
feedback: str
status: Literal["interrupted_by_human"] = "interrupted_by_human"
def __str__(self) -> str:
return f'The user interrupted the action with the following feedback: "{self.feedback}"'
ActionResult = ActionSuccessResult | ActionErrorResult | ActionInterruptedByHuman
class ActionHistory:
"""Utility container for an action history"""
@dataclass
class CycleRecord:
action: Action | None
result: ActionResult | None
cursor: int
cycles: list[CycleRecord]
def __init__(self, cycles: list[CycleRecord] = []):
self.cycles = cycles
self.cursor = len(self.cycles)
@property
def current_record(self) -> CycleRecord | None:
if self.cursor == len(self):
return None
return self[self.cursor]
def __getitem__(self, key: int) -> CycleRecord:
return self.cycles[key]
def __iter__(self) -> Iterator[CycleRecord]:
return iter(self.cycles)
def __len__(self) -> int:
return len(self.cycles)
def __bool__(self) -> bool:
return len(self.cycles) > 0
def register_action(self, action: Action) -> None:
if not self.current_record:
self.cycles.append(self.CycleRecord(None, None))
assert self.current_record
elif self.current_record.action:
raise ValueError("Action for current cycle already set")
self.current_record.action = action
def register_result(self, result: ActionResult) -> None:
if not self.current_record:
raise RuntimeError("Cannot register result for cycle without action")
elif self.current_record.result:
raise ValueError("Result for current cycle already set")
self.current_record.result = result

View File

@@ -6,6 +6,10 @@ if TYPE_CHECKING:
from autogpt.config import Config
from .command_parameter import CommandParameter
from .context_item import ContextItem
CommandReturnValue = Any
CommandOutput = CommandReturnValue | tuple[CommandReturnValue, ContextItem]
class Command:
@@ -21,7 +25,7 @@ class Command:
self,
name: str,
description: str,
method: Callable[..., Any],
method: Callable[..., CommandOutput],
parameters: list[CommandParameter],
enabled: bool | Callable[[Config], bool] = True,
disabled_reason: Optional[str] = None,

View File

@@ -0,0 +1,76 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
class ContextItem(ABC):
@property
@abstractmethod
def description(self) -> str:
"""Description of the context item"""
...
@property
@abstractmethod
def source(self) -> Optional[str]:
"""A string indicating the source location of the context item"""
...
@property
@abstractmethod
def content(self) -> str:
"""The content represented by the context item"""
...
def __str__(self) -> str:
return (
f"{self.description} (source: {self.source})\n"
"```\n"
f"{self.content}\n"
"```"
)
@dataclass
class FileContextItem(ContextItem):
file_path: Path
description: str
@property
def source(self) -> str:
return f"local file '{self.file_path}'"
@property
def content(self) -> str:
return self.file_path.read_text()
@dataclass
class FolderContextItem(ContextItem):
path: Path
def __post_init__(self) -> None:
assert self.path.exists(), "Selected path does not exist"
assert self.path.is_dir(), "Selected path is not a directory"
@property
def description(self) -> str:
return f"The contents of the folder '{self.path}' in the workspace"
@property
def source(self) -> str:
return f"local folder '{self.path}'"
@property
def content(self) -> str:
items = [f"{p.name}{'/' if p.is_dir() else ''}" for p in self.path.iterdir()]
items.sort()
return "\n".join(items)
@dataclass
class StaticContextItem(ContextItem):
description: str
source: Optional[str]
content: str

View File

@@ -131,12 +131,12 @@ class PromptGenerator:
"## Constraints\n"
"You operate within the following constraints:\n"
f"{self._generate_numbered_list(self.constraints + additional_constraints)}\n\n"
"## Commands\n"
"You have access to the following commands:\n"
f"{self._generate_commands()}\n\n"
"## Resources\n"
"You can leverage access to the following resources:\n"
f"{self._generate_numbered_list(self.resources + additional_resources)}\n\n"
"## Commands\n"
"You have access to the following commands:\n"
f"{self._generate_commands()}\n\n"
"## Best practices\n"
f"{self._generate_numbered_list(self.best_practices + additional_best_practices)}"
)

View File

@@ -1,14 +1,11 @@
constraints: [
'~4000 word limit for short term memory. Your short term memory is short, so immediately save important information to files.',
'If you are unsure how you previously did something or want to recall past events, thinking about similar events will help you remember.',
'No user assistance',
'Exclusively use the commands listed below e.g. command_name'
'Exclusively use the commands listed below.',
'You can only act proactively, and are unable to start background jobs or set up webhooks for yourself. Take this into account when planning your actions.',
'You are unable to interact with physical objects. If this is absolutely necessary to fulfill a task or objective or to complete a step, you must ask the user to do it for you. If the user refuses this, and there is no other way to achieve your goals, you must terminate to avoid wasting time and energy.'
]
resources: [
'Internet access for searches and information gathering.',
'Long Term memory management.',
'File output.',
'Command execution'
'The ability to read and write files.',
]
best_practices: [
'Continuously review and analyze your actions to ensure you are performing to the best of your abilities.',

View File

@@ -1,6 +1,5 @@
import os
import random
import re
import string
import tempfile
@@ -8,6 +7,11 @@ import pytest
import autogpt.commands.execute_code as sut # system under testing
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import (
AccessDeniedError,
InvalidArgumentError,
OperationNotAllowedError,
)
from autogpt.config import Config
@@ -53,11 +57,8 @@ def test_execute_python_code(random_code: str, random_string: str, agent: Agent)
def test_execute_python_code_disallows_name_arg_path_traversal(
random_code: str, agent: Agent
):
result: str = sut.execute_python_code(
random_code, name="../../test_code", agent=agent
)
assert "Error:" in result, "Path traversal in 'name' argument does not return error"
assert "path traversal" in result.lower()
with pytest.raises(AccessDeniedError, match="path traversal"):
sut.execute_python_code(random_code, name="../../test_code", agent=agent)
# Check that the code is not stored in parent directory
dst_with_traversal = agent.workspace.get_path("test_code.py")
@@ -82,16 +83,16 @@ def test_execute_python_code_overwrites_file(random_code: str, agent: Agent):
def test_execute_python_file_invalid(agent: Agent):
assert all(
s in sut.execute_python_file("not_python", agent).lower()
for s in ["error:", "invalid", ".py"]
)
with pytest.raises(InvalidArgumentError):
sut.execute_python_file("not_python", agent)
def test_execute_python_file_not_found(agent: Agent):
result = sut.execute_python_file("notexist.py", agent).lower()
assert re.match(r"python: can't open file '([A-Z]:)?[/\\\-\w]*notexist.py'", result)
assert "[errno 2] no such file or directory" in result
with pytest.raises(
FileNotFoundError,
match=r"python: can't open file '([a-zA-Z]:)?[/\\\-\w]*notexist.py': \[Errno 2\] No such file or directory",
):
sut.execute_python_file("notexist.py", agent)
def test_execute_shell(random_string: str, agent: Agent):
@@ -107,8 +108,8 @@ def test_execute_shell_local_commands_not_allowed(random_string: str, agent: Age
def test_execute_shell_denylist_should_deny(agent: Agent, random_string: str):
agent.config.shell_denylist = ["echo"]
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert "Error:" in result and "not allowed" in result
with pytest.raises(OperationNotAllowedError, match="not allowed"):
sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
def test_execute_shell_denylist_should_allow(agent: Agent, random_string: str):
@@ -116,15 +117,14 @@ def test_execute_shell_denylist_should_allow(agent: Agent, random_string: str):
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert "Hello" in result and random_string in result
assert "Error" not in result
def test_execute_shell_allowlist_should_deny(agent: Agent, random_string: str):
agent.config.shell_command_control = sut.ALLOWLIST_CONTROL
agent.config.shell_allowlist = ["cat"]
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert "Error:" in result and "not allowed" in result
with pytest.raises(OperationNotAllowedError, match="not allowed"):
sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
def test_execute_shell_allowlist_should_allow(agent: Agent, random_string: str):
@@ -133,4 +133,3 @@ def test_execute_shell_allowlist_should_allow(agent: Agent, random_string: str):
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert "Hello" in result and random_string in result
assert "Error" not in result

View File

@@ -1,33 +0,0 @@
from unittest.mock import MagicMock, patch
from colorama import Fore
from autogpt.app.main import update_user
def test_update_user_command_name_is_none() -> None:
# Mock necessary objects
config = MagicMock()
ai_config = MagicMock()
assistant_reply_dict = MagicMock()
# Mock print_assistant_thoughts and logger.typewriter_log
with patch(
"autogpt.app.main.print_assistant_thoughts"
) as mock_print_assistant_thoughts, patch(
"autogpt.app.main.logger.typewriter_log"
) as mock_logger_typewriter_log:
# Test the update_user function with None command_name
update_user(config, ai_config, None, None, assistant_reply_dict)
# Check that print_assistant_thoughts was called once
mock_print_assistant_thoughts.assert_called_once_with(
ai_config.ai_name, assistant_reply_dict, config
)
# Check that logger.typewriter_log was called once with expected arguments
mock_logger_typewriter_log.assert_called_once_with(
"NO ACTION SELECTED: ",
Fore.RED,
f"The Agent failed to select an action.",
)

View File

@@ -1,17 +1,17 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agents.agent import Agent
from autogpt.commands.web_selenium import browse_website
from autogpt.commands.web_selenium import BrowsingError, browse_website
@pytest.mark.vcr
@pytest.mark.requires_openai_api_key
def test_browse_website(agent: Agent, patched_api_requestor: MockerFixture):
def test_browse_website_nonexistent_url(agent: Agent, patched_api_requestor: None):
url = "https://barrel-roll.com"
question = "How to execute a barrel roll"
response = browse_website(url, question, agent)
assert "error" in response.lower()
# Sanity check that the response is not too long
assert len(response) < 200
with pytest.raises(BrowsingError, match=r"CONNECTION_CLOSED") as raised:
browse_website(url, question, agent)
# Sanity check that the response is not too long
assert len(raised.exconly()) < 200

View File

@@ -13,6 +13,7 @@ from pytest_mock import MockerFixture
import autogpt.commands.file_operations as file_ops
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import DuplicateOperationError
from autogpt.config import Config
from autogpt.memory.vector.memory_item import MemoryItem
from autogpt.memory.vector.utils import Embedding
@@ -199,8 +200,8 @@ def test_read_file(
def test_read_file_not_found(agent: Agent):
filename = "does_not_exist.txt"
content = file_ops.read_file(filename, agent=agent)
assert "Error:" in content and filename in content and "no such file" in content
with pytest.raises(FileNotFoundError):
file_ops.read_file(filename, agent=agent)
def test_write_to_file_relative_path(test_file_name: Path, agent: Agent):
@@ -236,8 +237,8 @@ def test_write_file_fails_if_content_exists(test_file_name: Path, agent: Agent):
agent=agent,
checksum=file_ops.text_checksum(new_content),
)
result = file_ops.write_to_file(str(test_file_name), new_content, agent=agent)
assert result == "Error: File has already been updated."
with pytest.raises(DuplicateOperationError):
file_ops.write_to_file(str(test_file_name), new_content, agent=agent)
def test_write_file_succeeds_if_content_different(

View File

@@ -3,6 +3,7 @@ from git.exc import GitCommandError
from git.repo.base import Repo
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import CommandExecutionError
from autogpt.commands.git_operations import clone_repository
@@ -38,6 +39,5 @@ def test_clone_repository_error(workspace, mock_clone_from, agent: Agent):
"clone", "fatal: repository not found", ""
)
result = clone_repository(url=url, clone_path=clone_path, agent=agent)
assert "Error: " in result
with pytest.raises(CommandExecutionError):
clone_repository(url=url, clone_path=clone_path, agent=agent)

View File

@@ -4,6 +4,7 @@ import pytest
from googleapiclient.errors import HttpError
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import ConfigurationError
from autogpt.commands.web_search import google, safe_google_results, web_search
@@ -89,20 +90,19 @@ def test_google_official_search(
@pytest.mark.parametrize(
"query, num_results, expected_output, http_code, error_msg",
"query, num_results, expected_error_type, http_code, error_msg",
[
(
"invalid query",
3,
"Error: <HttpError 400 when requesting https://www.googleapis.com/customsearch/v1?q=invalid+query&cx "
'returned "Invalid Value". Details: "Invalid Value">',
HttpError,
400,
"Invalid Value",
),
(
"invalid API key",
3,
"Error: The provided Google API key is invalid or missing.",
ConfigurationError,
403,
"invalid API key",
),
@@ -111,7 +111,7 @@ def test_google_official_search(
def test_google_official_search_errors(
query,
num_results,
expected_output,
expected_error_type,
mock_googleapiclient,
http_code,
error_msg,
@@ -132,5 +132,5 @@ def test_google_official_search_errors(
)
mock_googleapiclient.side_effect = error
actual_output = google(query, agent=agent, num_results=num_results)
assert actual_output == safe_google_results(expected_output)
with pytest.raises(expected_error_type):
google(query, agent=agent, num_results=num_results)