dynamically load commands from registry

This commit is contained in:
Peter
2023-04-06 14:13:23 -07:00
parent 3f106963a8
commit a24ab0e879
8 changed files with 160 additions and 67 deletions

View File

@@ -1,3 +1,4 @@
from auto_gpt.commands import command
from llm_utils import create_chat_completion
next_key = 0
@@ -31,6 +32,7 @@ def create_agent(task, prompt, model):
return key, agent_reply
@command("message_agent", "Message GPT Agent", '"key": "<key>", "message": "<message>"')
def message_agent(key, message):
global agents
@@ -51,6 +53,7 @@ def message_agent(key, message):
return agent_reply
@command("list_agents", "List GPT Agents", "")
def list_agents():
global agents
@@ -58,6 +61,7 @@ def list_agents():
return [(key, task) for key, (task, _, _) in agents.items()]
@command("delete_agent", "Delete GPT Agent", '"key": "<key>"')
def delete_agent(key):
global agents

View File

@@ -3,10 +3,12 @@ import json
from config import Config
from call_ai_function import call_ai_function
from json_parser import fix_and_parse_json
from auto_gpt.commands import command
cfg = Config()
# Evaluating code
@command("evaluate_code", "Evaluate Code", '"code": "<full _code_string>"')
def evaluate_code(code: str) -> List[str]:
function_string = "def analyze_code(code: str) -> List[str]:"
args = [code]
@@ -18,7 +20,7 @@ def evaluate_code(code: str) -> List[str]:
# Improving code
@command("improve_code", "Get Improved Code", '"suggestions": "<list_of_suggestions>", "code": "<full_code_string>"')
def improve_code(suggestions: List[str], code: str) -> str:
function_string = (
"def generate_improved_code(suggestions: List[str], code: str) -> str:"
@@ -32,7 +34,7 @@ def improve_code(suggestions: List[str], code: str) -> str:
# Writing tests
@command("write_tests", "Write Tests", '"code": "<full_code_string>", "focus": "<list_of_focus_areas>"')
def write_tests(code: str, focus: List[str]) -> str:
function_string = (
"def create_test_cases(code: str, focus: Optional[str] = None) -> str:"

View File

View File

@@ -0,0 +1,114 @@
import os
import sys
import importlib
import inspect
from typing import Callable, Any, List
# Unique identifier for auto-gpt commands
AUTO_GPT_COMMAND_IDENTIFIER = "auto_gpt_command"
class Command:
"""A class representing a command.
Attributes:
name (str): The name of the command.
description (str): A brief description of what the command does.
method (Callable[..., Any]): The function that the command executes.
signature (str): The signature of the function that the command executes. Defaults to None.
"""
def __init__(self, name: str, description: str, method: Callable[..., Any], signature: str = None):
self.name = name
self.description = description
self.method = method
self.signature = signature if signature else str(inspect.signature(self.method))
def __call__(self, *args, **kwargs) -> Any:
return self.method(*args, **kwargs)
def __str__(self) -> str:
return f"{self.name}: {self.description}, args: {self.signature}"
class CommandRegistry:
"""
The CommandRegistry class is a manager for a collection of Command objects.
It allows the registration, modification, and retrieval of Command objects,
as well as the scanning and loading of command plugins from a specified
directory.
"""
def __init__(self):
self.commands = {}
def _import_module(self, module_name: str) -> Any:
return importlib.import_module(module_name)
def _reload_module(self, module: Any) -> Any:
return importlib.reload(module)
def register_command(self, cmd: Command) -> None:
self.commands[cmd.name] = cmd
def reload_commands(self) -> None:
"""Reloads all loaded command plugins."""
for cmd_name in self.commands:
cmd = self.commands[cmd_name]
module = self._import_module(cmd.__module__)
reloaded_module = self._reload_module(module)
if hasattr(reloaded_module, "register"):
reloaded_module.register(self)
def get_command(self, name: str) -> Callable[..., Any]:
return self.commands.get(name)
def list_commands(self) -> List[str]:
return [str(cmd) for cmd in self.commands.values()]
def command_prompt(self) -> str:
"""
Returns a string representation of all registered `Command` objects for use in a prompt
"""
commands_list = [f"{idx + 1}. {str(cmd)}" for idx, cmd in enumerate(self.commands.values())]
return "\n".join(commands_list)
def scan_directory_for_plugins(self, directory: str) -> None:
"""
Scans the specified directory for Python files containing command plugins.
For each file in the directory that ends with ".py", this method imports the associated module and registers any
functions or classes that are decorated with the `AUTO_GPT_COMMAND_IDENTIFIER` attribute as `Command` objects.
The registered `Command` objects are then added to the `commands` dictionary of the `CommandRegistry` object.
Args:
directory (str): The directory to scan for command plugins.
"""
for file in os.listdir(directory):
if file.endswith(".py"):
module_name = file[:-3]
module = importlib.import_module(module_name)
for attr_name in dir(module):
attr = getattr(module, attr_name)
# Register decorated functions
if hasattr(attr, AUTO_GPT_COMMAND_IDENTIFIER) and getattr(attr, AUTO_GPT_COMMAND_IDENTIFIER):
self.register_command(attr.register_command)
# Register command classes
elif inspect.isclass(attr) and issubclass(attr, Command) and attr != Command:
cmd_instance = attr()
self.register_command(cmd_instance)
def command(name: str, description: str, signature: str = None) -> Callable[..., Any]:
"""The command decorator is used to create Command objects from ordinary functions."""
def decorator(func: Callable[..., Any]) -> Command:
cmd = Command(name=name, description=description, method=func, signature=signature)
def wrapper(*args, **kwargs) -> Any:
return func(*args, **kwargs)
wrapper.register_command = cmd
setattr(wrapper, AUTO_GPT_COMMAND_IDENTIFIER, True)
return wrapper
return decorator

View File

@@ -5,13 +5,10 @@ import datetime
import agent_manager as agents
import speak
from config import Config
import ai_functions as ai
from file_operations import read_file, write_to_file, append_to_file, delete_file, search_files
from execute_code import execute_python_file
from json_parser import fix_and_parse_json
from duckduckgo_search import ddg
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from auto_gpt.commands import CommandRegistry, command
cfg = Config()
@@ -51,62 +48,27 @@ def get_command(response):
return "Error:", str(e)
def execute_command(command_name, arguments):
memory = PineconeMemory()
def execute_command(command_registry: CommandRegistry, command_name: str, arguments: dict) -> str:
try:
# Look up the command in the registry
cmd = command_registry.commands.get(command_name)
# If the command is found, call it with the provided arguments
if cmd:
return cmd(**arguments)
# special case google until this can be moved down into the function.
if command_name == "google":
# Check if the Google API key is set and use the official search method
# If the API key is not set or has only whitespaces, use the unofficial search method
if cfg.google_api_key and (cfg.google_api_key.strip() if cfg.google_api_key else None):
return google_official_search(arguments["input"])
else:
return google_search(arguments["input"])
elif command_name == "memory_add":
return memory.add(arguments["string"])
elif command_name == "start_agent":
return start_agent(
arguments["name"],
arguments["task"],
arguments["prompt"])
elif command_name == "message_agent":
return message_agent(arguments["key"], arguments["message"])
elif command_name == "list_agents":
return list_agents()
elif command_name == "delete_agent":
return delete_agent(arguments["key"])
elif command_name == "get_text_summary":
return get_text_summary(arguments["url"], arguments["question"])
elif command_name == "get_hyperlinks":
return get_hyperlinks(arguments["url"])
elif command_name == "read_file":
return read_file(arguments["file"])
elif command_name == "write_to_file":
return write_to_file(arguments["file"], arguments["text"])
elif command_name == "append_to_file":
return append_to_file(arguments["file"], arguments["text"])
elif command_name == "delete_file":
return delete_file(arguments["file"])
elif command_name == "search_files":
return search_files(arguments["directory"])
elif command_name == "browse_website":
return browse_website(arguments["url"], arguments["question"])
# TODO: Change these to take in a file rather than pasted code, if
# non-file is given, return instructions "Input should be a python
# filepath, write your code to file and try again"
elif command_name == "evaluate_code":
return ai.evaluate_code(arguments["code"])
elif command_name == "improve_code":
return ai.improve_code(arguments["suggestions"], arguments["code"])
elif command_name == "write_tests":
return ai.write_tests(arguments["code"], arguments.get("focus"))
elif command_name == "execute_python_file": # Add this command
return execute_python_file(arguments["file"])
elif command_name == "task_complete":
shutdown()
else:
return f"Unknown command {command_name}"
# All errors, return "Error: + error message"
except Exception as e:
return "Error: " + str(e)
@@ -158,6 +120,7 @@ def google_official_search(query, num_results=8):
# Return the list of search result URLs
return search_results_links
@command("browse_website", "Browse Website", '"url": "<url>", "question": "<what_you_want_to_find_on_website>"')
def browse_website(url, question):
summary = get_text_summary(url, question)
links = get_hyperlinks(url)
@@ -230,6 +193,7 @@ def shutdown():
quit()
@command("start_agent", "Start GPT Agent", '"name": "<name>", "task": "<short_task_desc>", "prompt": "<prompt>"')
def start_agent(name, task, prompt, model=cfg.fast_llm_model):
global cfg

View File

@@ -1,7 +1,9 @@
import docker
import os
from auto_gpt.commands import command
@command("execute_python_file", "Execute Python File", '"file": "<file>"')
def execute_python_file(file):
workspace_folder = "auto_gpt_workspace"

View File

@@ -1,5 +1,7 @@
import os
import os.path
from auto_gpt.commands import command
# Set a dedicated folder for file I/O
working_directory = "auto_gpt_workspace"
@@ -17,20 +19,20 @@ def safe_join(base, *paths):
return norm_new_path
def read_file(filename):
@command("read_file", "Read file", '"file": "<file>"')
def read_file(file):
try:
filepath = safe_join(working_directory, filename)
filepath = safe_join(working_directory, file)
with open(filepath, "r") as f:
content = f.read()
return content
except Exception as e:
return "Error: " + str(e)
def write_to_file(filename, text):
@command("write_to_file", "Write to file", '"file": "<file>", "text": "<text>"')
def write_to_file(file, text):
try:
filepath = safe_join(working_directory, filename)
filepath = safe_join(working_directory, file)
directory = os.path.dirname(filepath)
if not os.path.exists(directory):
os.makedirs(directory)
@@ -40,25 +42,26 @@ def write_to_file(filename, text):
except Exception as e:
return "Error: " + str(e)
def append_to_file(filename, text):
@command("append_to_file", "Append to file", '"file": "<file>", "text": "<text>"')
def append_to_file(file, text):
try:
filepath = safe_join(working_directory, filename)
filepath = safe_join(working_directory, file)
with open(filepath, "a") as f:
f.write(text)
return "Text appended successfully."
except Exception as e:
return "Error: " + str(e)
def delete_file(filename):
@command("delete_file", "Delete file", '"file": "<file>"')
def delete_file(file):
try:
filepath = safe_join(working_directory, filename)
filepath = safe_join(working_directory, file)
os.remove(filepath)
return "File deleted successfully."
except Exception as e:
return "Error: " + str(e)
@command("search_files", "Search Files", '"directory": "<directory>"')
def search_files(directory):
found_files = []

View File

@@ -8,14 +8,13 @@ from colorama import Fore, Style
from spinner import Spinner
import time
import speak
from enum import Enum, auto
import sys
from config import Config
from json_parser import fix_and_parse_json
from ai_config import AIConfig
import traceback
import yaml
import argparse
from auto_gpt.commands import CommandRegistry
def print_to_console(
@@ -281,6 +280,7 @@ next_action_count = 0
# Make a constant:
user_input = "Determine which next command to use, and respond using the format specified above:"
# Initialize memory and make sure it is empty.
# this is particularly important for indexing and referencing pinecone memory
memory = PineconeMemory()
@@ -288,6 +288,10 @@ memory.clear()
print('Using memory of type: ' + memory.__class__.__name__)
# Create a CommandRegistry instance and scan default folder
command_registry = CommandRegistry()
command_registry.scan_directory_for_plugins('./scripts')
# Interaction Loop
while True:
# Send message to AI, get response
@@ -362,7 +366,7 @@ while True:
elif command_name == "human_feedback":
result = f"Human feedback: {user_input}"
else:
result = f"Command {command_name} returned: {cmd.execute_command(command_name, arguments)}"
result = f"Command {command_name} returned: {cmd.execute_command(command_registry, command_name, arguments)}"
if next_action_count > 0:
next_action_count -= 1