mirror of
https://github.com/aljazceru/Auto-GPT.git
synced 2026-01-07 16:24:19 +01:00
* Add links to github issues in the README and clarify run instructions * Rename agent subpackage to agents * Revert all unwanted changes * Use relative import in `agents/__init__.py` --------- Co-authored-by: Reinier van der Leer <github@pwuts.nl>
399 lines
16 KiB
Python
399 lines
16 KiB
Python
import json
|
|
import signal
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from colorama import Fore, Style
|
|
|
|
from autogpt.config import Config
|
|
from autogpt.config.ai_config import AIConfig
|
|
from autogpt.json_utils.utilities import extract_json_from_response, validate_json
|
|
from autogpt.llm import ChatModelResponse
|
|
from autogpt.llm.chat import chat_with_ai
|
|
from autogpt.llm.providers.openai import OPEN_AI_CHAT_MODELS
|
|
from autogpt.llm.utils import count_string_tokens
|
|
from autogpt.logs import (
|
|
FULL_MESSAGE_HISTORY_FILE_NAME,
|
|
NEXT_ACTION_FILE_NAME,
|
|
USER_INPUT_FILE_NAME,
|
|
LogCycleHandler,
|
|
logger,
|
|
print_assistant_thoughts,
|
|
remove_ansi_escape,
|
|
)
|
|
from autogpt.memory.message_history import MessageHistory
|
|
from autogpt.memory.vector import VectorMemory
|
|
from autogpt.models.command_registry import CommandRegistry
|
|
from autogpt.speech import say_text
|
|
from autogpt.spinner import Spinner
|
|
from autogpt.utils import clean_input
|
|
from autogpt.workspace import Workspace
|
|
|
|
|
|
class Agent:
|
|
"""Agent class for interacting with Auto-GPT.
|
|
|
|
Attributes:
|
|
ai_name: The name of the agent.
|
|
memory: The memory object to use.
|
|
next_action_count: The number of actions to execute.
|
|
system_prompt: The system prompt is the initial prompt that defines everything
|
|
the AI needs to know to achieve its task successfully.
|
|
Currently, the dynamic and customizable information in the system prompt are
|
|
ai_name, description and goals.
|
|
|
|
triggering_prompt: The last sentence the AI will see before answering.
|
|
For Auto-GPT, this prompt is:
|
|
Determine exactly one command to use, and respond using the format specified
|
|
above:
|
|
The triggering prompt is not part of the system prompt because between the
|
|
system prompt and the triggering
|
|
prompt we have contextual information that can distract the AI and make it
|
|
forget that its goal is to find the next task to achieve.
|
|
SYSTEM PROMPT
|
|
CONTEXTUAL INFORMATION (memory, previous conversations, anything relevant)
|
|
TRIGGERING PROMPT
|
|
|
|
The triggering prompt reminds the AI about its short term meta task
|
|
(defining the next task)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
ai_name: str,
|
|
memory: VectorMemory,
|
|
next_action_count: int,
|
|
command_registry: CommandRegistry,
|
|
ai_config: AIConfig,
|
|
system_prompt: str,
|
|
triggering_prompt: str,
|
|
workspace_directory: str | Path,
|
|
config: Config,
|
|
):
|
|
self.ai_name = ai_name
|
|
self.memory = memory
|
|
self.history = MessageHistory.for_model(config.smart_llm, agent=self)
|
|
self.next_action_count = next_action_count
|
|
self.command_registry = command_registry
|
|
self.config = config
|
|
self.ai_config = ai_config
|
|
self.system_prompt = system_prompt
|
|
self.triggering_prompt = triggering_prompt
|
|
self.workspace = Workspace(workspace_directory, config.restrict_to_workspace)
|
|
self.created_at = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
self.cycle_count = 0
|
|
self.log_cycle_handler = LogCycleHandler()
|
|
self.smart_token_limit = OPEN_AI_CHAT_MODELS.get(config.smart_llm).max_tokens
|
|
|
|
def start_interaction_loop(self):
|
|
# Interaction Loop
|
|
self.cycle_count = 0
|
|
command_name = None
|
|
arguments = None
|
|
user_input = ""
|
|
|
|
# Signal handler for interrupting y -N
|
|
def signal_handler(signum, frame):
|
|
if self.next_action_count == 0:
|
|
sys.exit()
|
|
else:
|
|
print(
|
|
Fore.RED
|
|
+ "Interrupt signal received. Stopping continuous command execution."
|
|
+ Style.RESET_ALL
|
|
)
|
|
self.next_action_count = 0
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
while True:
|
|
# Discontinue if continuous limit is reached
|
|
self.cycle_count += 1
|
|
self.log_cycle_handler.log_count_within_cycle = 0
|
|
self.log_cycle_handler.log_cycle(
|
|
self.ai_config.ai_name,
|
|
self.created_at,
|
|
self.cycle_count,
|
|
[m.raw() for m in self.history],
|
|
FULL_MESSAGE_HISTORY_FILE_NAME,
|
|
)
|
|
if (
|
|
self.config.continuous_mode
|
|
and self.config.continuous_limit > 0
|
|
and self.cycle_count > self.config.continuous_limit
|
|
):
|
|
logger.typewriter_log(
|
|
"Continuous Limit Reached: ",
|
|
Fore.YELLOW,
|
|
f"{self.config.continuous_limit}",
|
|
)
|
|
break
|
|
# Send message to AI, get response
|
|
with Spinner("Thinking... ", plain_output=self.config.plain_output):
|
|
assistant_reply = chat_with_ai(
|
|
self.config,
|
|
self,
|
|
self.system_prompt,
|
|
self.triggering_prompt,
|
|
self.smart_token_limit,
|
|
self.config.smart_llm,
|
|
)
|
|
|
|
try:
|
|
assistant_reply_json = extract_json_from_response(
|
|
assistant_reply.content
|
|
)
|
|
validate_json(assistant_reply_json, self.config)
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Exception while validating assistant reply JSON: {e}")
|
|
assistant_reply_json = {}
|
|
|
|
for plugin in self.config.plugins:
|
|
if not plugin.can_handle_post_planning():
|
|
continue
|
|
assistant_reply_json = plugin.post_planning(assistant_reply_json)
|
|
|
|
# Print Assistant thoughts
|
|
if assistant_reply_json != {}:
|
|
# Get command name and arguments
|
|
try:
|
|
print_assistant_thoughts(
|
|
self.ai_name, assistant_reply_json, self.config
|
|
)
|
|
command_name, arguments = extract_command(
|
|
assistant_reply_json, assistant_reply, self.config
|
|
)
|
|
if self.config.speak_mode:
|
|
say_text(f"I want to execute {command_name}", self.config)
|
|
|
|
except Exception as e:
|
|
logger.error("Error: \n", str(e))
|
|
self.log_cycle_handler.log_cycle(
|
|
self.ai_config.ai_name,
|
|
self.created_at,
|
|
self.cycle_count,
|
|
assistant_reply_json,
|
|
NEXT_ACTION_FILE_NAME,
|
|
)
|
|
|
|
# First log new-line so user can differentiate sections better in console
|
|
logger.typewriter_log("\n")
|
|
logger.typewriter_log(
|
|
"NEXT ACTION: ",
|
|
Fore.CYAN,
|
|
f"COMMAND = {Fore.CYAN}{remove_ansi_escape(command_name)}{Style.RESET_ALL} "
|
|
f"ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}",
|
|
)
|
|
|
|
if not self.config.continuous_mode and self.next_action_count == 0:
|
|
# ### GET USER AUTHORIZATION TO EXECUTE COMMAND ###
|
|
# Get key press: Prompt the user to press enter to continue or escape
|
|
# to exit
|
|
self.user_input = ""
|
|
logger.info(
|
|
f"Enter '{self.config.authorise_key}' to authorise command, "
|
|
f"'{self.config.authorise_key} -N' to run N continuous commands, "
|
|
f"'{self.config.exit_key}' to exit program, or enter feedback for "
|
|
f"{self.ai_name}..."
|
|
)
|
|
while True:
|
|
if self.config.chat_messages_enabled:
|
|
console_input = clean_input(
|
|
self.config, "Waiting for your response..."
|
|
)
|
|
else:
|
|
console_input = clean_input(
|
|
self.config, Fore.MAGENTA + "Input:" + Style.RESET_ALL
|
|
)
|
|
if console_input.lower().strip() == self.config.authorise_key:
|
|
user_input = "GENERATE NEXT COMMAND JSON"
|
|
break
|
|
elif console_input.lower().strip() == "":
|
|
logger.warn("Invalid input format.")
|
|
continue
|
|
elif console_input.lower().startswith(
|
|
f"{self.config.authorise_key} -"
|
|
):
|
|
try:
|
|
self.next_action_count = abs(
|
|
int(console_input.split(" ")[1])
|
|
)
|
|
user_input = "GENERATE NEXT COMMAND JSON"
|
|
except ValueError:
|
|
logger.warn(
|
|
f"Invalid input format. Please enter '{self.config.authorise_key} -n' "
|
|
"where n is the number of continuous tasks."
|
|
)
|
|
continue
|
|
break
|
|
elif console_input.lower() == self.config.exit_key:
|
|
user_input = "EXIT"
|
|
break
|
|
else:
|
|
user_input = console_input
|
|
command_name = "human_feedback"
|
|
self.log_cycle_handler.log_cycle(
|
|
self.ai_config.ai_name,
|
|
self.created_at,
|
|
self.cycle_count,
|
|
user_input,
|
|
USER_INPUT_FILE_NAME,
|
|
)
|
|
break
|
|
|
|
if user_input == "GENERATE NEXT COMMAND JSON":
|
|
logger.typewriter_log(
|
|
"-=-=-=-=-=-=-= COMMAND AUTHORISED BY USER -=-=-=-=-=-=-=",
|
|
Fore.MAGENTA,
|
|
"",
|
|
)
|
|
elif user_input == "EXIT":
|
|
logger.info("Exiting...")
|
|
break
|
|
else:
|
|
# First log new-line so user can differentiate sections better in console
|
|
logger.typewriter_log("\n")
|
|
# Print authorized commands left value
|
|
logger.typewriter_log(
|
|
f"{Fore.CYAN}AUTHORISED COMMANDS LEFT: {Style.RESET_ALL}{self.next_action_count}"
|
|
)
|
|
|
|
# Execute command
|
|
if command_name is not None and command_name.lower().startswith("error"):
|
|
result = f"Could not execute command: {arguments}"
|
|
elif command_name == "human_feedback":
|
|
result = f"Human feedback: {user_input}"
|
|
else:
|
|
for plugin in self.config.plugins:
|
|
if not plugin.can_handle_pre_command():
|
|
continue
|
|
command_name, arguments = plugin.pre_command(
|
|
command_name, arguments
|
|
)
|
|
command_result = execute_command(
|
|
command_name=command_name,
|
|
arguments=arguments,
|
|
agent=self,
|
|
)
|
|
result = f"Command {command_name} returned: " f"{command_result}"
|
|
|
|
result_tlength = count_string_tokens(
|
|
str(command_result), self.config.smart_llm
|
|
)
|
|
memory_tlength = count_string_tokens(
|
|
str(self.history.summary_message()), self.config.smart_llm
|
|
)
|
|
if result_tlength + memory_tlength + 600 > self.smart_token_limit:
|
|
result = f"Failure: command {command_name} returned too much output. \
|
|
Do not execute this command again with the same arguments."
|
|
|
|
for plugin in self.config.plugins:
|
|
if not plugin.can_handle_post_command():
|
|
continue
|
|
result = plugin.post_command(command_name, result)
|
|
if self.next_action_count > 0:
|
|
self.next_action_count -= 1
|
|
|
|
# Check if there's a result from the command append it to the message
|
|
# history
|
|
if result is not None:
|
|
self.history.add("system", result, "action_result")
|
|
logger.typewriter_log("SYSTEM: ", Fore.YELLOW, result)
|
|
else:
|
|
self.history.add("system", "Unable to execute command", "action_result")
|
|
logger.typewriter_log(
|
|
"SYSTEM: ", Fore.YELLOW, "Unable to execute command"
|
|
)
|
|
|
|
|
|
def extract_command(
|
|
assistant_reply_json: dict, assistant_reply: ChatModelResponse, config: Config
|
|
):
|
|
"""Parse the response and return the command name and arguments
|
|
|
|
Args:
|
|
assistant_reply_json (dict): The response object from the AI
|
|
assistant_reply (ChatModelResponse): The model response from the AI
|
|
config (Config): The config object
|
|
|
|
Returns:
|
|
tuple: The command name and arguments
|
|
|
|
Raises:
|
|
json.decoder.JSONDecodeError: If the response is not valid JSON
|
|
|
|
Exception: If any other error occurs
|
|
"""
|
|
if config.openai_functions:
|
|
if assistant_reply.function_call is None:
|
|
return "Error:", "No 'function_call' in assistant reply"
|
|
assistant_reply_json["command"] = {
|
|
"name": assistant_reply.function_call.name,
|
|
"args": json.loads(assistant_reply.function_call.arguments),
|
|
}
|
|
try:
|
|
if "command" not in assistant_reply_json:
|
|
return "Error:", "Missing 'command' object in JSON"
|
|
|
|
if not isinstance(assistant_reply_json, dict):
|
|
return (
|
|
"Error:",
|
|
f"The previous message sent was not a dictionary {assistant_reply_json}",
|
|
)
|
|
|
|
command = assistant_reply_json["command"]
|
|
if not isinstance(command, dict):
|
|
return "Error:", "'command' object is not a dictionary"
|
|
|
|
if "name" not in command:
|
|
return "Error:", "Missing 'name' field in 'command' object"
|
|
|
|
command_name = command["name"]
|
|
|
|
# Use an empty dictionary if 'args' field is not present in 'command' object
|
|
arguments = command.get("args", {})
|
|
|
|
return command_name, arguments
|
|
except json.decoder.JSONDecodeError:
|
|
return "Error:", "Invalid JSON"
|
|
# All other errors, return "Error: + error message"
|
|
except Exception as e:
|
|
return "Error:", str(e)
|
|
|
|
|
|
def execute_command(
|
|
command_name: str,
|
|
arguments: dict[str, str],
|
|
agent: Agent,
|
|
):
|
|
"""Execute the command and return the result
|
|
|
|
Args:
|
|
command_name (str): The name of the command to execute
|
|
arguments (dict): The arguments for the command
|
|
agent (Agent): The agent that is executing the command
|
|
|
|
Returns:
|
|
str: The result of the command
|
|
"""
|
|
try:
|
|
# Execute a native command with the same name or alias, if it exists
|
|
if command := agent.command_registry.get_command(command_name):
|
|
return command(**arguments, agent=agent)
|
|
|
|
# Handle non-native commands (e.g. from plugins)
|
|
for command in agent.ai_config.prompt_generator.commands:
|
|
if (
|
|
command_name == command["label"].lower()
|
|
or command_name == command["name"].lower()
|
|
):
|
|
return command["function"](**arguments)
|
|
|
|
raise RuntimeError(
|
|
f"Cannot execute '{command_name}': unknown command."
|
|
" Do not try to use this command again."
|
|
)
|
|
except Exception as e:
|
|
return f"Error: {str(e)}"
|