import json import random import commands as cmd import utils from memory import get_memory, get_supported_memory_backends import chat from colorama import Fore, Style from spinner import Spinner import time import speak from config import Config from json_parser import fix_and_parse_json from ai_config import AIConfig import traceback import yaml import argparse from logger import logger import logging from prompt import get_prompt cfg = Config() def check_openai_api_key(): """Check if the OpenAI API key is set in config.py or as an environment variable.""" if not cfg.openai_api_key: print( Fore.RED + "Please set your OpenAI API key in .env or as an environment variable." ) print("You can get your key from https://beta.openai.com/account/api-keys") exit(1) def attempt_to_fix_json_by_finding_outermost_brackets(json_string): if cfg.speak_mode and cfg.debug_mode: speak.say_text("I have received an invalid JSON response from the OpenAI API. Trying to fix it now.") logger.typewriter_log("Attempting to fix JSON by finding outermost brackets\n") try: # Use regex to search for JSON objects import regex json_pattern = regex.compile(r"\{(?:[^{}]|(?R))*\}") json_match = json_pattern.search(json_string) if json_match: # Extract the valid JSON object from the string json_string = json_match.group(0) logger.typewriter_log(title="Apparently json was fixed.", title_color=Fore.GREEN) if cfg.speak_mode and cfg.debug_mode: speak.say_text("Apparently json was fixed.") else: raise ValueError("No valid JSON object found") except (json.JSONDecodeError, ValueError) as e: if cfg.speak_mode: speak.say_text("Didn't work. I will have to ignore this response then.") logger.error("Error: Invalid JSON, setting it to empty JSON now.\n") json_string = {} return json_string def print_assistant_thoughts(assistant_reply): """Prints the assistant's thoughts to the console""" global ai_name global cfg try: try: # Parse and print Assistant response assistant_reply_json = fix_and_parse_json(assistant_reply) except json.JSONDecodeError as e: logger.error("Error: Invalid JSON in assistant thoughts\n", assistant_reply) assistant_reply_json = attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply) assistant_reply_json = fix_and_parse_json(assistant_reply_json) # Check if assistant_reply_json is a string and attempt to parse it into a JSON object if isinstance(assistant_reply_json, str): try: assistant_reply_json = json.loads(assistant_reply_json) except json.JSONDecodeError as e: logger.error("Error: Invalid JSON\n", assistant_reply) assistant_reply_json = attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply_json) assistant_thoughts_reasoning = None assistant_thoughts_plan = None assistant_thoughts_speak = None assistant_thoughts_criticism = None assistant_thoughts = assistant_reply_json.get("thoughts", {}) assistant_thoughts_text = assistant_thoughts.get("text") if assistant_thoughts: assistant_thoughts_reasoning = assistant_thoughts.get("reasoning") assistant_thoughts_plan = assistant_thoughts.get("plan") assistant_thoughts_criticism = assistant_thoughts.get("criticism") assistant_thoughts_speak = assistant_thoughts.get("speak") logger.typewriter_log(f"{ai_name.upper()} THOUGHTS:", Fore.YELLOW, assistant_thoughts_text) logger.typewriter_log("REASONING:", Fore.YELLOW, assistant_thoughts_reasoning) if assistant_thoughts_plan: logger.typewriter_log("PLAN:", Fore.YELLOW, "") # If it's a list, join it into a string if isinstance(assistant_thoughts_plan, list): assistant_thoughts_plan = "\n".join(assistant_thoughts_plan) elif isinstance(assistant_thoughts_plan, dict): assistant_thoughts_plan = str(assistant_thoughts_plan) # Split the input_string using the newline character and dashes lines = assistant_thoughts_plan.split('\n') for line in lines: line = line.lstrip("- ") logger.typewriter_log("- ", Fore.GREEN, line.strip()) logger.typewriter_log("CRITICISM:", Fore.YELLOW, assistant_thoughts_criticism) # Speak the assistant's thoughts if cfg.speak_mode and assistant_thoughts_speak: speak.say_text(assistant_thoughts_speak) return assistant_reply_json except json.decoder.JSONDecodeError as e: logger.error("Error: Invalid JSON\n", assistant_reply) if cfg.speak_mode: speak.say_text("I have received an invalid JSON response from the OpenAI API. I cannot ignore this response.") # All other errors, return "Error: + error message" except Exception as e: call_stack = traceback.format_exc() logger.error("Error: \n", call_stack) def construct_prompt(): """Construct the prompt for the AI to respond to""" config = AIConfig.load() if config.ai_name: logger.typewriter_log( f"Welcome back! ", Fore.GREEN, f"Would you like me to return to being {config.ai_name}?", speak_text=True) should_continue = utils.clean_input(f"""Continue with the last settings? Name: {config.ai_name} Role: {config.ai_role} Goals: {config.ai_goals} Continue (y/n): """) if should_continue.lower() == "n": config = AIConfig() if not config.ai_name: config = prompt_user() config.save() # Get rid of this global: global ai_name ai_name = config.ai_name full_prompt = config.construct_full_prompt() return full_prompt def prompt_user(): """Prompt the user for input""" ai_name = "" # Construct the prompt logger.typewriter_log( "Welcome to Auto-GPT! ", Fore.GREEN, "Enter the name of your AI and its role below. Entering nothing will load defaults.", speak_text=True) # Get AI Name from User logger.typewriter_log( "Name your AI: ", Fore.GREEN, "For example, 'Entrepreneur-GPT'") ai_name = utils.clean_input("AI Name: ") if ai_name == "": ai_name = "Entrepreneur-GPT" logger.typewriter_log( f"{ai_name} here!", Fore.LIGHTBLUE_EX, "I am at your service.", speak_text=True) # Get AI Role from User logger.typewriter_log( "Describe your AI's role: ", Fore.GREEN, "For example, 'an AI designed to autonomously develop and run businesses with the sole goal of increasing your net worth.'") ai_role = utils.clean_input(f"{ai_name} is: ") if ai_role == "": ai_role = "an AI designed to autonomously develop and run businesses with the sole goal of increasing your net worth." # Enter up to 5 goals for the AI logger.typewriter_log( "Enter up to 5 goals for your AI: ", Fore.GREEN, "For example: \nIncrease net worth, Grow Twitter Account, Develop and manage multiple businesses autonomously'") print("Enter nothing to load defaults, enter nothing when finished.", flush=True) ai_goals = [] for i in range(5): ai_goal = utils.clean_input(f"{Fore.LIGHTBLUE_EX}Goal{Style.RESET_ALL} {i+1}: ") if ai_goal == "": break ai_goals.append(ai_goal) if len(ai_goals) == 0: ai_goals = ["Increase net worth", "Grow Twitter Account", "Develop and manage multiple businesses autonomously"] config = AIConfig(ai_name, ai_role, ai_goals) return config def parse_arguments(): """Parses the arguments passed to the script""" global cfg cfg.set_debug_mode(False) cfg.set_continuous_mode(False) cfg.set_speak_mode(False) parser = argparse.ArgumentParser(description='Process arguments.') parser.add_argument('--continuous', action='store_true', help='Enable Continuous Mode') parser.add_argument('--continuous-limit', '-l', type=int, dest="continuous_limit", help='Defines the number of times to run in continuous mode') parser.add_argument('--speak', action='store_true', help='Enable Speak Mode') parser.add_argument('--debug', action='store_true', help='Enable Debug Mode') parser.add_argument('--gpt3only', action='store_true', help='Enable GPT3.5 Only Mode') parser.add_argument('--gpt4only', action='store_true', help='Enable GPT4 Only Mode') parser.add_argument('--use-memory', '-m', dest="memory_type", help='Defines which Memory backend to use') args = parser.parse_args() if args.debug: logger.typewriter_log("Debug Mode: ", Fore.GREEN, "ENABLED") cfg.set_debug_mode(True) if args.continuous: logger.typewriter_log("Continuous Mode: ", Fore.RED, "ENABLED") logger.typewriter_log( "WARNING: ", Fore.RED, "Continuous mode is not recommended. It is potentially dangerous and may cause your AI to run forever or carry out actions you would not usually authorise. Use at your own risk.") cfg.set_continuous_mode(True) if args.continuous_limit: logger.typewriter_log( "Continuous Limit: ", Fore.GREEN, f"{args.continuous_limit}") cfg.set_continuous_limit(args.continuous_limit) # Check if continuous limit is used without continuous mode if args.continuous_limit and not args.continuous: parser.error("--continuous-limit can only be used with --continuous") if args.speak: logger.typewriter_log("Speak Mode: ", Fore.GREEN, "ENABLED") cfg.set_speak_mode(True) if args.gpt3only: logger.typewriter_log("GPT3.5 Only Mode: ", Fore.GREEN, "ENABLED") cfg.set_smart_llm_model(cfg.fast_llm_model) if args.gpt4only: logger.typewriter_log("GPT4 Only Mode: ", Fore.GREEN, "ENABLED") cfg.set_fast_llm_model(cfg.smart_llm_model) if args.memory_type: supported_memory = get_supported_memory_backends() chosen = args.memory_type if not chosen in supported_memory: logger.typewriter_log("ONLY THE FOLLOWING MEMORY BACKENDS ARE SUPPORTED: ", Fore.RED, f'{supported_memory}') logger.typewriter_log(f"Defaulting to: ", Fore.YELLOW, cfg.memory_backend) else: cfg.memory_backend = chosen def main(): global ai_name, memory # TODO: fill in llm values here check_openai_api_key() parse_arguments() logger.set_level(logging.DEBUG if cfg.debug_mode else logging.INFO) ai_name = "" prompt = construct_prompt() # print(prompt) # Initialize variables full_message_history = [] result = None next_action_count = 0 # Make a constant: user_input = "Determine which next command to use, and respond using the format specified above:" # Initialize memory and make sure it is empty. # this is particularly important for indexing and referencing pinecone memory memory = get_memory(cfg, init=True) print('Using memory of type: ' + memory.__class__.__name__) agent = Agent( ai_name=ai_name, memory=memory, full_message_history=full_message_history, next_action_count=next_action_count, prompt=prompt, user_input=user_input ) agent.start_interaction_loop() class Agent: """Agent class for interacting with Auto-GPT. Attributes: ai_name: The name of the agent. memory: The memory object to use. full_message_history: The full message history. next_action_count: The number of actions to execute. prompt: The prompt to use. user_input: The user input. """ def __init__(self, ai_name, memory, full_message_history, next_action_count, prompt, user_input): self.ai_name = ai_name self.memory = memory self.full_message_history = full_message_history self.next_action_count = next_action_count self.prompt = prompt self.user_input = user_input def start_interaction_loop(self): # Interaction Loop loop_count = 0 while True: # Discontinue if continuous limit is reached loop_count += 1 if cfg.continuous_mode and cfg.continuous_limit > 0 and loop_count > cfg.continuous_limit: logger.typewriter_log("Continuous Limit Reached: ", Fore.YELLOW, f"{cfg.continuous_limit}") break # Send message to AI, get response with Spinner("Thinking... "): assistant_reply = chat.chat_with_ai( self.prompt, self.user_input, self.full_message_history, self.memory, cfg.fast_token_limit) # TODO: This hardcodes the model to use GPT3.5. Make this an argument # Print Assistant thoughts print_assistant_thoughts(assistant_reply) # Get command name and arguments try: command_name, arguments = cmd.get_command( attempt_to_fix_json_by_finding_outermost_brackets(assistant_reply)) if cfg.speak_mode: speak.say_text(f"I want to execute {command_name}") except Exception as e: logger.error("Error: \n", str(e)) if not cfg.continuous_mode and self.next_action_count == 0: ### GET USER AUTHORIZATION TO EXECUTE COMMAND ### # Get key press: Prompt the user to press enter to continue or escape # to exit self.user_input = "" logger.typewriter_log( "NEXT ACTION: ", Fore.CYAN, f"COMMAND = {Fore.CYAN}{command_name}{Style.RESET_ALL} ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}") print( f"Enter 'y' to authorise command, 'y -N' to run N continuous commands, 'n' to exit program, or enter feedback for {self.ai_name}...", flush=True) while True: console_input = utils.clean_input(Fore.MAGENTA + "Input:" + Style.RESET_ALL) if console_input.lower().rstrip() == "y": self.user_input = "GENERATE NEXT COMMAND JSON" break elif console_input.lower().startswith("y -"): try: self.next_action_count = abs(int(console_input.split(" ")[1])) self.user_input = "GENERATE NEXT COMMAND JSON" except ValueError: print("Invalid input format. Please enter 'y -n' where n is the number of continuous tasks.") continue break elif console_input.lower() == "n": self.user_input = "EXIT" break else: self.user_input = console_input command_name = "human_feedback" break if self.user_input == "GENERATE NEXT COMMAND JSON": logger.typewriter_log( "-=-=-=-=-=-=-= COMMAND AUTHORISED BY USER -=-=-=-=-=-=-=", Fore.MAGENTA, "") elif self.user_input == "EXIT": print("Exiting...", flush=True) break else: # Print command logger.typewriter_log( "NEXT ACTION: ", Fore.CYAN, f"COMMAND = {Fore.CYAN}{command_name}{Style.RESET_ALL} ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}") # Execute command if command_name is not None and command_name.lower().startswith("error"): result = f"Command {command_name} threw the following error: " + arguments elif command_name == "human_feedback": result = f"Human feedback: {self.user_input}" else: result = f"Command {command_name} returned: {cmd.execute_command(command_name, arguments)}" if self.next_action_count > 0: self.next_action_count -= 1 memory_to_add = f"Assistant Reply: {assistant_reply} " \ f"\nResult: {result} " \ f"\nHuman Feedback: {self.user_input} " self.memory.add(memory_to_add) # Check if there's a result from the command append it to the message # history if result is not None: self.full_message_history.append(chat.create_chat_message("system", result)) logger.typewriter_log("SYSTEM: ", Fore.YELLOW, result) else: self.full_message_history.append( chat.create_chat_message( "system", "Unable to execute command")) logger.typewriter_log("SYSTEM: ", Fore.YELLOW, "Unable to execute command") if __name__ == "__main__": main()