mirror of
https://github.com/aljazceru/Auto-GPT.git
synced 2025-12-26 02:14:27 +01:00
Co-authored-by: merwanehamadi <merwanehamadi@gmail.com> Co-authored-by: Reinier van der Leer <github@pwuts.nl>
137 lines
4.3 KiB
Python
137 lines
4.3 KiB
Python
""" Command and Control """
|
|
import json
|
|
from typing import Dict
|
|
|
|
from autogpt.agent.agent import Agent
|
|
from autogpt.config import Config
|
|
from autogpt.llm import ChatModelResponse
|
|
|
|
|
|
def is_valid_int(value: str) -> bool:
|
|
"""Check if the value is a valid integer
|
|
|
|
Args:
|
|
value (str): The value to check
|
|
|
|
Returns:
|
|
bool: True if the value is a valid integer, False otherwise
|
|
"""
|
|
try:
|
|
int(value)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def get_command(
|
|
assistant_reply_json: Dict, assistant_reply: ChatModelResponse, config: Config
|
|
):
|
|
"""Parse the response and return the command name and arguments
|
|
|
|
Args:
|
|
assistant_reply_json (dict): The response object from the AI
|
|
assistant_reply (ChatModelResponse): The model response from the AI
|
|
config (Config): The config object
|
|
|
|
Returns:
|
|
tuple: The command name and arguments
|
|
|
|
Raises:
|
|
json.decoder.JSONDecodeError: If the response is not valid JSON
|
|
|
|
Exception: If any other error occurs
|
|
"""
|
|
if config.openai_functions:
|
|
if assistant_reply.function_call is None:
|
|
return "Error:", "No 'function_call' in assistant reply"
|
|
assistant_reply_json["command"] = {
|
|
"name": assistant_reply.function_call.name,
|
|
"args": json.loads(assistant_reply.function_call.arguments),
|
|
}
|
|
try:
|
|
if "command" not in assistant_reply_json:
|
|
return "Error:", "Missing 'command' object in JSON"
|
|
|
|
if not isinstance(assistant_reply_json, dict):
|
|
return (
|
|
"Error:",
|
|
f"The previous message sent was not a dictionary {assistant_reply_json}",
|
|
)
|
|
|
|
command = assistant_reply_json["command"]
|
|
if not isinstance(command, dict):
|
|
return "Error:", "'command' object is not a dictionary"
|
|
|
|
if "name" not in command:
|
|
return "Error:", "Missing 'name' field in 'command' object"
|
|
|
|
command_name = command["name"]
|
|
|
|
# Use an empty dictionary if 'args' field is not present in 'command' object
|
|
arguments = command.get("args", {})
|
|
|
|
return command_name, arguments
|
|
except json.decoder.JSONDecodeError:
|
|
return "Error:", "Invalid JSON"
|
|
# All other errors, return "Error: + error message"
|
|
except Exception as e:
|
|
return "Error:", str(e)
|
|
|
|
|
|
def map_command_synonyms(command_name: str):
|
|
"""Takes the original command name given by the AI, and checks if the
|
|
string matches a list of common/known hallucinations
|
|
"""
|
|
synonyms = [
|
|
("write_file", "write_to_file"),
|
|
("create_file", "write_to_file"),
|
|
("search", "google"),
|
|
]
|
|
for seen_command, actual_command_name in synonyms:
|
|
if command_name == seen_command:
|
|
return actual_command_name
|
|
return command_name
|
|
|
|
|
|
def execute_command(
|
|
command_name: str,
|
|
arguments: dict[str, str],
|
|
agent: Agent,
|
|
):
|
|
"""Execute the command and return the result
|
|
|
|
Args:
|
|
command_name (str): The name of the command to execute
|
|
arguments (dict): The arguments for the command
|
|
agent (Agent): The agent that is executing the command
|
|
|
|
Returns:
|
|
str: The result of the command
|
|
"""
|
|
try:
|
|
cmd = agent.command_registry.commands.get(command_name)
|
|
|
|
# If the command is found, call it with the provided arguments
|
|
if cmd:
|
|
return cmd(**arguments, agent=agent)
|
|
|
|
# TODO: Remove commands below after they are moved to the command registry.
|
|
command_name = map_command_synonyms(command_name.lower())
|
|
|
|
# TODO: Change these to take in a file rather than pasted code, if
|
|
# non-file is given, return instructions "Input should be a python
|
|
# filepath, write your code to file and try again
|
|
for command in agent.ai_config.prompt_generator.commands:
|
|
if (
|
|
command_name == command["label"].lower()
|
|
or command_name == command["name"].lower()
|
|
):
|
|
return command["function"](**arguments)
|
|
return (
|
|
f"Unknown command '{command_name}'. Please refer to the 'COMMANDS'"
|
|
" list for available commands and only respond in the specified JSON"
|
|
" format."
|
|
)
|
|
except Exception as e:
|
|
return f"Error: {str(e)}"
|