From a24ab0e87994eb7e314fb9e2ff17a97f68408aba Mon Sep 17 00:00:00 2001 From: Peter Date: Thu, 6 Apr 2023 14:13:23 -0700 Subject: [PATCH] dynamically load commands from registry --- scripts/agent_manager.py | 4 ++ scripts/ai_functions.py | 8 ++- scripts/auto_gpt/__init__.py | 0 scripts/auto_gpt/commands.py | 114 +++++++++++++++++++++++++++++++++++ scripts/commands.py | 62 ++++--------------- scripts/execute_code.py | 2 + scripts/file_operations.py | 27 +++++---- scripts/main.py | 10 ++- 8 files changed, 160 insertions(+), 67 deletions(-) create mode 100644 scripts/auto_gpt/__init__.py create mode 100644 scripts/auto_gpt/commands.py diff --git a/scripts/agent_manager.py b/scripts/agent_manager.py index ad120c40..9bd87aa9 100644 --- a/scripts/agent_manager.py +++ b/scripts/agent_manager.py @@ -1,3 +1,4 @@ +from auto_gpt.commands import command from llm_utils import create_chat_completion next_key = 0 @@ -31,6 +32,7 @@ def create_agent(task, prompt, model): return key, agent_reply +@command("message_agent", "Message GPT Agent", '"key": "", "message": ""') def message_agent(key, message): global agents @@ -51,6 +53,7 @@ def message_agent(key, message): return agent_reply +@command("list_agents", "List GPT Agents", "") def list_agents(): global agents @@ -58,6 +61,7 @@ def list_agents(): return [(key, task) for key, (task, _, _) in agents.items()] +@command("delete_agent", "Delete GPT Agent", '"key": ""') def delete_agent(key): global agents diff --git a/scripts/ai_functions.py b/scripts/ai_functions.py index 05aa93a2..175dffa2 100644 --- a/scripts/ai_functions.py +++ b/scripts/ai_functions.py @@ -3,10 +3,12 @@ import json from config import Config from call_ai_function import call_ai_function from json_parser import fix_and_parse_json +from auto_gpt.commands import command + cfg = Config() # Evaluating code - +@command("evaluate_code", "Evaluate Code", '"code": ""') def evaluate_code(code: str) -> List[str]: function_string = "def analyze_code(code: str) -> List[str]:" args = [code] @@ -18,7 +20,7 @@ def evaluate_code(code: str) -> List[str]: # Improving code - +@command("improve_code", "Get Improved Code", '"suggestions": "", "code": ""') def improve_code(suggestions: List[str], code: str) -> str: function_string = ( "def generate_improved_code(suggestions: List[str], code: str) -> str:" @@ -32,7 +34,7 @@ def improve_code(suggestions: List[str], code: str) -> str: # Writing tests - +@command("write_tests", "Write Tests", '"code": "", "focus": ""') def write_tests(code: str, focus: List[str]) -> str: function_string = ( "def create_test_cases(code: str, focus: Optional[str] = None) -> str:" diff --git a/scripts/auto_gpt/__init__.py b/scripts/auto_gpt/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scripts/auto_gpt/commands.py b/scripts/auto_gpt/commands.py new file mode 100644 index 00000000..fb05fdb8 --- /dev/null +++ b/scripts/auto_gpt/commands.py @@ -0,0 +1,114 @@ +import os +import sys +import importlib +import inspect +from typing import Callable, Any, List + +# Unique identifier for auto-gpt commands +AUTO_GPT_COMMAND_IDENTIFIER = "auto_gpt_command" + +class Command: + """A class representing a command. + + Attributes: + name (str): The name of the command. + description (str): A brief description of what the command does. + method (Callable[..., Any]): The function that the command executes. + signature (str): The signature of the function that the command executes. Defaults to None. + """ + + def __init__(self, name: str, description: str, method: Callable[..., Any], signature: str = None): + self.name = name + self.description = description + self.method = method + self.signature = signature if signature else str(inspect.signature(self.method)) + + def __call__(self, *args, **kwargs) -> Any: + return self.method(*args, **kwargs) + + def __str__(self) -> str: + return f"{self.name}: {self.description}, args: {self.signature}" + +class CommandRegistry: + """ + The CommandRegistry class is a manager for a collection of Command objects. + It allows the registration, modification, and retrieval of Command objects, + as well as the scanning and loading of command plugins from a specified + directory. + """ + + def __init__(self): + self.commands = {} + + def _import_module(self, module_name: str) -> Any: + return importlib.import_module(module_name) + + def _reload_module(self, module: Any) -> Any: + return importlib.reload(module) + + def register_command(self, cmd: Command) -> None: + self.commands[cmd.name] = cmd + + def reload_commands(self) -> None: + """Reloads all loaded command plugins.""" + for cmd_name in self.commands: + cmd = self.commands[cmd_name] + module = self._import_module(cmd.__module__) + reloaded_module = self._reload_module(module) + if hasattr(reloaded_module, "register"): + reloaded_module.register(self) + + def get_command(self, name: str) -> Callable[..., Any]: + return self.commands.get(name) + + def list_commands(self) -> List[str]: + return [str(cmd) for cmd in self.commands.values()] + + def command_prompt(self) -> str: + """ + Returns a string representation of all registered `Command` objects for use in a prompt + """ + commands_list = [f"{idx + 1}. {str(cmd)}" for idx, cmd in enumerate(self.commands.values())] + return "\n".join(commands_list) + + def scan_directory_for_plugins(self, directory: str) -> None: + """ + Scans the specified directory for Python files containing command plugins. + + For each file in the directory that ends with ".py", this method imports the associated module and registers any + functions or classes that are decorated with the `AUTO_GPT_COMMAND_IDENTIFIER` attribute as `Command` objects. + The registered `Command` objects are then added to the `commands` dictionary of the `CommandRegistry` object. + + Args: + directory (str): The directory to scan for command plugins. + """ + + for file in os.listdir(directory): + if file.endswith(".py"): + module_name = file[:-3] + module = importlib.import_module(module_name) + for attr_name in dir(module): + attr = getattr(module, attr_name) + # Register decorated functions + if hasattr(attr, AUTO_GPT_COMMAND_IDENTIFIER) and getattr(attr, AUTO_GPT_COMMAND_IDENTIFIER): + self.register_command(attr.register_command) + # Register command classes + elif inspect.isclass(attr) and issubclass(attr, Command) and attr != Command: + cmd_instance = attr() + self.register_command(cmd_instance) + + +def command(name: str, description: str, signature: str = None) -> Callable[..., Any]: + """The command decorator is used to create Command objects from ordinary functions.""" + def decorator(func: Callable[..., Any]) -> Command: + cmd = Command(name=name, description=description, method=func, signature=signature) + + def wrapper(*args, **kwargs) -> Any: + return func(*args, **kwargs) + + wrapper.register_command = cmd + setattr(wrapper, AUTO_GPT_COMMAND_IDENTIFIER, True) + return wrapper + + return decorator + diff --git a/scripts/commands.py b/scripts/commands.py index fc10d1d0..78f5dbe3 100644 --- a/scripts/commands.py +++ b/scripts/commands.py @@ -5,13 +5,10 @@ import datetime import agent_manager as agents import speak from config import Config -import ai_functions as ai -from file_operations import read_file, write_to_file, append_to_file, delete_file, search_files -from execute_code import execute_python_file from json_parser import fix_and_parse_json from duckduckgo_search import ddg -from googleapiclient.discovery import build -from googleapiclient.errors import HttpError + +from auto_gpt.commands import CommandRegistry, command cfg = Config() @@ -51,62 +48,27 @@ def get_command(response): return "Error:", str(e) -def execute_command(command_name, arguments): - memory = PineconeMemory() +def execute_command(command_registry: CommandRegistry, command_name: str, arguments: dict) -> str: try: + # Look up the command in the registry + cmd = command_registry.commands.get(command_name) + + # If the command is found, call it with the provided arguments + if cmd: + return cmd(**arguments) + # special case google until this can be moved down into the function. if command_name == "google": - # Check if the Google API key is set and use the official search method # If the API key is not set or has only whitespaces, use the unofficial search method if cfg.google_api_key and (cfg.google_api_key.strip() if cfg.google_api_key else None): return google_official_search(arguments["input"]) else: return google_search(arguments["input"]) - elif command_name == "memory_add": - return memory.add(arguments["string"]) - elif command_name == "start_agent": - return start_agent( - arguments["name"], - arguments["task"], - arguments["prompt"]) - elif command_name == "message_agent": - return message_agent(arguments["key"], arguments["message"]) - elif command_name == "list_agents": - return list_agents() - elif command_name == "delete_agent": - return delete_agent(arguments["key"]) - elif command_name == "get_text_summary": - return get_text_summary(arguments["url"], arguments["question"]) - elif command_name == "get_hyperlinks": - return get_hyperlinks(arguments["url"]) - elif command_name == "read_file": - return read_file(arguments["file"]) - elif command_name == "write_to_file": - return write_to_file(arguments["file"], arguments["text"]) - elif command_name == "append_to_file": - return append_to_file(arguments["file"], arguments["text"]) - elif command_name == "delete_file": - return delete_file(arguments["file"]) - elif command_name == "search_files": - return search_files(arguments["directory"]) - elif command_name == "browse_website": - return browse_website(arguments["url"], arguments["question"]) - # TODO: Change these to take in a file rather than pasted code, if - # non-file is given, return instructions "Input should be a python - # filepath, write your code to file and try again" - elif command_name == "evaluate_code": - return ai.evaluate_code(arguments["code"]) - elif command_name == "improve_code": - return ai.improve_code(arguments["suggestions"], arguments["code"]) - elif command_name == "write_tests": - return ai.write_tests(arguments["code"], arguments.get("focus")) - elif command_name == "execute_python_file": # Add this command - return execute_python_file(arguments["file"]) elif command_name == "task_complete": shutdown() else: return f"Unknown command {command_name}" - # All errors, return "Error: + error message" + except Exception as e: return "Error: " + str(e) @@ -158,6 +120,7 @@ def google_official_search(query, num_results=8): # Return the list of search result URLs return search_results_links +@command("browse_website", "Browse Website", '"url": "", "question": ""') def browse_website(url, question): summary = get_text_summary(url, question) links = get_hyperlinks(url) @@ -230,6 +193,7 @@ def shutdown(): quit() +@command("start_agent", "Start GPT Agent", '"name": "", "task": "", "prompt": ""') def start_agent(name, task, prompt, model=cfg.fast_llm_model): global cfg diff --git a/scripts/execute_code.py b/scripts/execute_code.py index 614ef6fc..a90ab4a0 100644 --- a/scripts/execute_code.py +++ b/scripts/execute_code.py @@ -1,7 +1,9 @@ import docker import os +from auto_gpt.commands import command +@command("execute_python_file", "Execute Python File", '"file": ""') def execute_python_file(file): workspace_folder = "auto_gpt_workspace" diff --git a/scripts/file_operations.py b/scripts/file_operations.py index 90c9a1e4..140123c5 100644 --- a/scripts/file_operations.py +++ b/scripts/file_operations.py @@ -1,5 +1,7 @@ import os import os.path +from auto_gpt.commands import command + # Set a dedicated folder for file I/O working_directory = "auto_gpt_workspace" @@ -17,20 +19,20 @@ def safe_join(base, *paths): return norm_new_path - -def read_file(filename): +@command("read_file", "Read file", '"file": ""') +def read_file(file): try: - filepath = safe_join(working_directory, filename) + filepath = safe_join(working_directory, file) with open(filepath, "r") as f: content = f.read() return content except Exception as e: return "Error: " + str(e) - -def write_to_file(filename, text): +@command("write_to_file", "Write to file", '"file": "", "text": ""') +def write_to_file(file, text): try: - filepath = safe_join(working_directory, filename) + filepath = safe_join(working_directory, file) directory = os.path.dirname(filepath) if not os.path.exists(directory): os.makedirs(directory) @@ -40,25 +42,26 @@ def write_to_file(filename, text): except Exception as e: return "Error: " + str(e) - -def append_to_file(filename, text): +@command("append_to_file", "Append to file", '"file": "", "text": ""') +def append_to_file(file, text): try: - filepath = safe_join(working_directory, filename) + filepath = safe_join(working_directory, file) with open(filepath, "a") as f: f.write(text) return "Text appended successfully." except Exception as e: return "Error: " + str(e) - -def delete_file(filename): +@command("delete_file", "Delete file", '"file": ""') +def delete_file(file): try: - filepath = safe_join(working_directory, filename) + filepath = safe_join(working_directory, file) os.remove(filepath) return "File deleted successfully." except Exception as e: return "Error: " + str(e) +@command("search_files", "Search Files", '"directory": ""') def search_files(directory): found_files = [] diff --git a/scripts/main.py b/scripts/main.py index a79fd553..5337ceeb 100644 --- a/scripts/main.py +++ b/scripts/main.py @@ -8,14 +8,13 @@ from colorama import Fore, Style from spinner import Spinner import time import speak -from enum import Enum, auto -import sys from config import Config from json_parser import fix_and_parse_json from ai_config import AIConfig import traceback import yaml import argparse +from auto_gpt.commands import CommandRegistry def print_to_console( @@ -281,6 +280,7 @@ next_action_count = 0 # Make a constant: user_input = "Determine which next command to use, and respond using the format specified above:" + # Initialize memory and make sure it is empty. # this is particularly important for indexing and referencing pinecone memory memory = PineconeMemory() @@ -288,6 +288,10 @@ memory.clear() print('Using memory of type: ' + memory.__class__.__name__) +# Create a CommandRegistry instance and scan default folder +command_registry = CommandRegistry() +command_registry.scan_directory_for_plugins('./scripts') + # Interaction Loop while True: # Send message to AI, get response @@ -362,7 +366,7 @@ while True: elif command_name == "human_feedback": result = f"Human feedback: {user_input}" else: - result = f"Command {command_name} returned: {cmd.execute_command(command_name, arguments)}" + result = f"Command {command_name} returned: {cmd.execute_command(command_registry, command_name, arguments)}" if next_action_count > 0: next_action_count -= 1