mirror of
https://github.com/aljazceru/Auto-GPT.git
synced 2026-01-03 22:34:29 +01:00
246 lines
10 KiB
Python
246 lines
10 KiB
Python
import autogpt.commands as cmd
|
|
import json
|
|
import traceback
|
|
from tkinter.ttk import Style
|
|
|
|
from colorama import Fore
|
|
|
|
import autogpt.chat
|
|
from autogpt.config import Config
|
|
from autogpt.logger import logger
|
|
import autogpt.speak
|
|
from autogpt.spinner import Spinner
|
|
|
|
|
|
class Agent:
    """Agent class for interacting with Auto-GPT.

    Attributes:
        ai_name: The name of the agent.
        memory: The memory object to use.
        full_message_history: The full message history.
        next_action_count: The number of actions to execute.
        prompt: The prompt to use.
        user_input: The user input.
    """

    def __init__(self,
                 ai_name,
                 memory,
                 full_message_history,
                 next_action_count,
                 prompt,
                 user_input):
        self.ai_name = ai_name
        self.memory = memory
        self.full_message_history = full_message_history
        self.next_action_count = next_action_count
        self.prompt = prompt
        self.user_input = user_input

    def start_interaction_loop(self):
        """Run the think -> authorise -> execute cycle.

        Each iteration asks the model for a reply, extracts a command from
        it, optionally asks the user to authorise it, executes it, and
        records the outcome in ``memory`` and ``full_message_history``.

        The loop ends when the user answers ``n`` at the prompt or, in
        continuous mode with a positive ``continuous_limit``, once that many
        iterations have run.
        """
        # The module-level ``Style`` comes from tkinter.ttk and has no
        # RESET_ALL attribute; the ANSI reset code lives on colorama's Style.
        from colorama import Style

        # print_assistant_thoughts() looks the agent name up as a
        # module-level ``ai_name`` global, so publish it before the first call.
        global ai_name
        ai_name = self.ai_name

        cfg = Config()
        loop_count = 0
        while True:
            # Discontinue if continuous limit is reached
            loop_count += 1
            if (cfg.continuous_mode
                    and cfg.continuous_limit > 0
                    and loop_count > cfg.continuous_limit):
                logger.typewriter_log(
                    "Continuous Limit Reached: ", Fore.YELLOW, f"{cfg.continuous_limit}")
                break

            # Send message to AI, get response.
            # ``import autogpt.chat`` only binds the name ``autogpt``, so the
            # submodule has to be addressed through the package.
            with Spinner("Thinking... "):
                assistant_reply = autogpt.chat.chat_with_ai(
                    self.prompt,
                    self.user_input,
                    self.full_message_history,
                    self.memory,
                    cfg.fast_token_limit)  # TODO: This hardcodes the model to use GPT3.5. Make this an argument

            # Print Assistant thoughts
            print_assistant_thoughts(assistant_reply)

            # Get command name and arguments
            try:
                command_name, arguments = cmd.get_command(
                    attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply))
            except Exception as e:
                logger.error("Error: \n", str(e))
                # Never fall through with unbound (first iteration) or stale
                # (later iterations) values: report the failure through the
                # "error" command branch below instead.
                command_name, arguments = "Error:", str(e)
            else:
                if cfg.speak_mode:
                    try:
                        autogpt.speak.say_text(f"I want to execute {command_name}")
                    except Exception as e:
                        # A speech failure must not discard a valid command.
                        logger.error("Error: \n", str(e))

            next_action = (
                f"COMMAND = {Fore.CYAN}{command_name}{Style.RESET_ALL} "
                f"ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}")

            if not cfg.continuous_mode and self.next_action_count == 0:
                ### GET USER AUTHORIZATION TO EXECUTE COMMAND ###
                self.user_input = ""
                logger.typewriter_log("NEXT ACTION: ", Fore.CYAN, next_action)
                print(
                    f"Enter 'y' to authorise command, 'y -N' to run N continuous commands, 'n' to exit program, or enter feedback for {self.ai_name}...",
                    flush=True)

                if self._read_user_decision():
                    # Free-form text replaces the proposed command.
                    command_name = "human_feedback"

                if self.user_input == "GENERATE NEXT COMMAND JSON":
                    logger.typewriter_log(
                        "-=-=-=-=-=-=-= COMMAND AUTHORISED BY USER -=-=-=-=-=-=-=",
                        Fore.MAGENTA,
                        "")
                elif self.user_input == "EXIT":
                    print("Exiting...", flush=True)
                    break
            else:
                # Print command
                logger.typewriter_log("NEXT ACTION: ", Fore.CYAN, next_action)

            # Execute command
            if isinstance(command_name, str) and command_name.lower().startswith("error"):
                # Formatted rather than concatenated: ``arguments`` is not
                # guaranteed to be a string.
                result = f"Command {command_name} threw the following error: {arguments}"
            elif command_name == "human_feedback":
                result = f"Human feedback: {self.user_input}"
            else:
                result = f"Command {command_name} returned: {cmd.execute_command(command_name, arguments)}"
                if self.next_action_count > 0:
                    self.next_action_count -= 1

            memory_to_add = (
                f"Assistant Reply: {assistant_reply} "
                f"\nResult: {result} "
                f"\nHuman Feedback: {self.user_input} ")
            self.memory.add(memory_to_add)

            # ``result`` is always a string at this point, so it is appended
            # to the message history unconditionally.
            self.full_message_history.append(
                autogpt.chat.create_chat_message("system", result))
            logger.typewriter_log("SYSTEM: ", Fore.YELLOW, result)

    def _read_user_decision(self):
        """Read console input until the user gives a usable answer.

        Side effects:
            Sets ``self.user_input`` to ``"GENERATE NEXT COMMAND JSON"``
            (``y`` / ``y -N``), ``"EXIT"`` (``n``) or the raw text typed.
            ``y -N`` additionally sets ``self.next_action_count`` to ``N``.

        Returns:
            True when the user typed free-form feedback, False otherwise.
        """
        from colorama import Style
        from autogpt.utils import clean_input

        while True:
            console_input = clean_input(Fore.MAGENTA + "Input:" + Style.RESET_ALL)
            answer = console_input.lower()

            if answer.rstrip() == "y":
                self.user_input = "GENERATE NEXT COMMAND JSON"
                return False

            if answer.startswith("y -"):
                try:
                    # "y -5".split(" ")[1] is "-5"; abs() drops the dash.
                    self.next_action_count = abs(int(console_input.split(" ")[1]))
                except ValueError:
                    print("Invalid input format. Please enter 'y -n' where n is the number of continuous tasks.")
                    continue
                self.user_input = "GENERATE NEXT COMMAND JSON"
                return False

            if answer == "n":
                self.user_input = "EXIT"
                return False

            self.user_input = console_input
            return True
|
|
|
|
|
|
def attempt_to_fix_json_by_finding_outermost_brackets(json_string):
    """Extract the first balanced ``{...}`` span from ``json_string``.

    The span is located with a recursive regular expression; its contents
    are not parsed or validated here. Braces are matched purely by
    character, so a brace inside a JSON string literal counts as well.

    Args:
        json_string: Text that is expected to contain a JSON object.

    Returns:
        The matched substring (a ``str``) when a balanced span is found,
        otherwise an empty ``dict``. Callers must handle both types.
    """
    cfg = Config()
    # ``import autogpt.speak`` only binds ``autogpt``; a bare ``speak`` would
    # raise NameError as soon as speak mode is enabled.
    if cfg.speak_mode and cfg.debug_mode:
        autogpt.speak.say_text("I have received an invalid JSON response from the OpenAI API. Trying to fix it now.")
    logger.typewriter_log("Attempting to fix JSON by finding outermost brackets\n")

    try:
        # Use regex to search for JSON objects. The third-party ``regex``
        # module is required: ``(?R)`` recursion is not supported by ``re``.
        import regex
        json_pattern = regex.compile(r"\{(?:[^{}]|(?R))*\}")
        json_match = json_pattern.search(json_string)

        if json_match:
            # Extract the balanced object from the string
            json_string = json_match.group(0)
            logger.typewriter_log(title="Apparently json was fixed.", title_color=Fore.GREEN)
            if cfg.speak_mode and cfg.debug_mode:
                autogpt.speak.say_text("Apparently json was fixed.")
        else:
            raise ValueError("No valid JSON object found")

    except (json.JSONDecodeError, ValueError):
        if cfg.speak_mode:
            autogpt.speak.say_text("Didn't work. I will have to ignore this response then.")
        logger.error("Error: Invalid JSON, setting it to empty JSON now.\n")
        json_string = {}

    return json_string
|
|
|
|
|
|
def print_assistant_thoughts(assistant_reply, ai_name=None):
    """Prints the assistant's thoughts to the console.

    Parses ``assistant_reply`` as JSON and logs the ``text``, ``reasoning``,
    ``plan`` and ``criticism`` entries of its ``"thoughts"`` object. The
    ``speak`` entry is spoken when speak mode is enabled.

    Args:
        assistant_reply: Raw reply text received from the model.
        ai_name: Name shown in the "<NAME> THOUGHTS:" header. When omitted,
            a module-level ``ai_name`` global is used if one is defined,
            otherwise the header reads "ASSISTANT THOUGHTS:".

    Returns:
        The parsed reply as a dict, or None when parsing or printing failed
        (the failure is logged rather than raised).
    """
    # Rebinding the module-level ``cfg`` is an existing side effect of this
    # function and is kept as-is.
    global cfg
    cfg = Config()

    if ai_name is None:
        # No module-level ``ai_name`` is guaranteed to exist, so look it up
        # defensively instead of raising NameError.
        ai_name = globals().get("ai_name") or "Assistant"

    try:
        # Imported here because the file's import block does not provide it;
        # inside the ``try`` so an import problem is logged like any other
        # failure. NOTE(review): confirm the module path against the package.
        from autogpt.json_parser import fix_and_parse_json

        try:
            # Parse and print Assistant response
            assistant_reply_json = fix_and_parse_json(assistant_reply)
        except json.JSONDecodeError:
            logger.error("Error: Invalid JSON in assistant thoughts\n", assistant_reply)
            assistant_reply_json = attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply)
            # The repair helper returns an empty dict on failure; only a
            # recovered string is worth parsing again.
            if isinstance(assistant_reply_json, str):
                assistant_reply_json = fix_and_parse_json(assistant_reply_json)

        # Check if assistant_reply_json is a string and attempt to parse it into a JSON object
        if isinstance(assistant_reply_json, str):
            try:
                assistant_reply_json = json.loads(assistant_reply_json)
            except json.JSONDecodeError:
                logger.error("Error: Invalid JSON\n", assistant_reply)
                assistant_reply_json = attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply_json)
                if isinstance(assistant_reply_json, str):
                    # Still raw text: parse it. A failure here is reported by
                    # the JSONDecodeError handler at the bottom.
                    assistant_reply_json = json.loads(assistant_reply_json)

        # Anything that is not a JSON object (list, number, null) carries no thoughts.
        if not isinstance(assistant_reply_json, dict):
            logger.error("Error: Invalid JSON\n", assistant_reply)
            assistant_reply_json = {}

        assistant_thoughts = assistant_reply_json.get("thoughts", {})
        if not isinstance(assistant_thoughts, dict):
            assistant_thoughts = {}

        assistant_thoughts_text = assistant_thoughts.get("text")
        assistant_thoughts_reasoning = assistant_thoughts.get("reasoning")
        assistant_thoughts_plan = assistant_thoughts.get("plan")
        assistant_thoughts_criticism = assistant_thoughts.get("criticism")
        assistant_thoughts_speak = assistant_thoughts.get("speak")

        logger.typewriter_log(f"{str(ai_name).upper()} THOUGHTS:", Fore.YELLOW, assistant_thoughts_text)
        logger.typewriter_log("REASONING:", Fore.YELLOW, assistant_thoughts_reasoning)

        if assistant_thoughts_plan:
            logger.typewriter_log("PLAN:", Fore.YELLOW, "")
            # Normalise the plan to one string; list items may not be strings.
            if isinstance(assistant_thoughts_plan, list):
                assistant_thoughts_plan = "\n".join(str(step) for step in assistant_thoughts_plan)
            elif not isinstance(assistant_thoughts_plan, str):
                assistant_thoughts_plan = str(assistant_thoughts_plan)

            # Split on newlines and strip any leading dashes/spaces
            for line in assistant_thoughts_plan.split('\n'):
                line = line.lstrip("- ")
                logger.typewriter_log("- ", Fore.GREEN, line.strip())

        logger.typewriter_log("CRITICISM:", Fore.YELLOW, assistant_thoughts_criticism)

        # Speak the assistant's thoughts
        if cfg.speak_mode and assistant_thoughts_speak:
            autogpt.speak.say_text(assistant_thoughts_speak)

        return assistant_reply_json
    except json.decoder.JSONDecodeError:
        logger.error("Error: Invalid JSON\n", assistant_reply)
        if cfg.speak_mode:
            autogpt.speak.say_text("I have received an invalid JSON response from the OpenAI API. I cannot ignore this response.")

    # All other errors: log the traceback and fall through (returns None)
    except Exception:
        call_stack = traceback.format_exc()
        logger.error("Error: \n", call_stack)
|