Merge pull request #1436 from CalCameron/master

File Logger that tracks changes to file operations to prevent looping
This commit is contained in:
BillSchumacher
2023-04-16 01:33:25 -05:00
committed by GitHub
8 changed files with 80 additions and 22 deletions

View File

@@ -186,7 +186,7 @@ def execute_command(command_name: str, arguments):
elif command_name == "generate_image":
return generate_image(arguments["prompt"])
elif command_name == "send_tweet":
return send_tweet(arguments['text'])
return send_tweet(arguments["text"])
elif command_name == "do_nothing":
return "No action performed."
elif command_name == "task_complete":

View File

@@ -23,7 +23,9 @@ def read_audio(audio):
headers = {"Authorization": f"Bearer {api_token}"}
if api_token is None:
raise ValueError("You need to set your Hugging Face API token in the config file.")
raise ValueError(
"You need to set your Hugging Face API token in the config file."
)
response = requests.post(
api_url,
@@ -31,5 +33,5 @@ def read_audio(audio):
data=audio,
)
text = json.loads(response.content.decode("utf-8"))['text']
text = json.loads(response.content.decode("utf-8"))["text"]
return "The audio says: " + text

View File

@@ -5,15 +5,49 @@ from pathlib import Path
from typing import Generator, List
# Set a dedicated folder for file I/O
WORKING_DIRECTORY = Path(__file__).parent.parent / "auto_gpt_workspace"
WORKING_DIRECTORY = Path(os.getcwd()) / "auto_gpt_workspace"
# Create the directory if it doesn't exist
if not os.path.exists(WORKING_DIRECTORY):
os.makedirs(WORKING_DIRECTORY)
LOG_FILE = "file_logger.txt"
LOG_FILE_PATH = WORKING_DIRECTORY / LOG_FILE
WORKING_DIRECTORY = str(WORKING_DIRECTORY)
def check_duplicate_operation(operation: str, filename: str) -> bool:
"""Check if the operation has already been performed on the given file
Args:
operation (str): The operation to check for
filename (str): The name of the file to check for
Returns:
bool: True if the operation has already been performed on the file
"""
log_content = read_file(LOG_FILE)
log_entry = f"{operation}: {filename}\n"
return log_entry in log_content
def log_operation(operation: str, filename: str) -> None:
"""Log the file operation to the file_logger.txt
Args:
operation (str): The operation to log
filename (str): The name of the file the operation was performed on
"""
log_entry = f"{operation}: {filename}\n"
# Create the log file if it doesn't exist
if not os.path.exists(LOG_FILE_PATH):
with open(LOG_FILE_PATH, "w", encoding="utf-8") as f:
f.write("File Operation Logger ")
append_to_file(LOG_FILE, log_entry)
def safe_join(base: str, *paths) -> str:
"""Join one or more path components intelligently.
@@ -122,6 +156,8 @@ def write_to_file(filename: str, text: str) -> str:
Returns:
str: A message indicating success or failure
"""
if check_duplicate_operation("write", filename):
return "Error: File has already been updated."
try:
filepath = safe_join(WORKING_DIRECTORY, filename)
directory = os.path.dirname(filepath)
@@ -129,6 +165,7 @@ def write_to_file(filename: str, text: str) -> str:
os.makedirs(directory)
with open(filepath, "w", encoding="utf-8") as f:
f.write(text)
log_operation("write", filename)
return "File written to successfully."
except Exception as e:
return f"Error: {str(e)}"
@@ -148,6 +185,7 @@ def append_to_file(filename: str, text: str) -> str:
filepath = safe_join(WORKING_DIRECTORY, filename)
with open(filepath, "a") as f:
f.write(text)
log_operation("append", filename)
return "Text appended successfully."
except Exception as e:
return f"Error: {str(e)}"
@@ -162,9 +200,12 @@ def delete_file(filename: str) -> str:
Returns:
str: A message indicating success or failure
"""
if check_duplicate_operation("delete", filename):
return "Error: File has already been deleted."
try:
filepath = safe_join(WORKING_DIRECTORY, filename)
os.remove(filepath)
log_operation("delete", filename)
return "File deleted successfully."
except Exception as e:
return f"Error: {str(e)}"

View File

@@ -7,9 +7,9 @@ load_dotenv()
def send_tweet(tweet_text):
consumer_key = os.environ.get("TW_CONSUMER_KEY")
consumer_secret= os.environ.get("TW_CONSUMER_SECRET")
access_token= os.environ.get("TW_ACCESS_TOKEN")
access_token_secret= os.environ.get("TW_ACCESS_TOKEN_SECRET")
consumer_secret = os.environ.get("TW_CONSUMER_SECRET")
access_token = os.environ.get("TW_ACCESS_TOKEN")
access_token_secret = os.environ.get("TW_ACCESS_TOKEN_SECRET")
# Authenticate to Twitter
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

View File

@@ -273,9 +273,7 @@ def print_assistant_thoughts(ai_name, assistant_reply):
if CFG.speak_mode and assistant_thoughts_speak:
say_text(assistant_thoughts_speak)
else:
logger.typewriter_log(
"SPEAK:", Fore.YELLOW, f"{assistant_thoughts_speak}"
)
logger.typewriter_log("SPEAK:", Fore.YELLOW, f"{assistant_thoughts_speak}")
return assistant_reply_json
except json.decoder.JSONDecodeError:

View File

@@ -84,7 +84,6 @@ def get_prompt() -> str:
("Generate Image", "generate_image", {"prompt": "<prompt>"}),
("Convert Audio to text", "read_audio_from_file", {"file": "<file>"}),
("Send Tweet", "send_tweet", {"text": "<text>"}),
]
# Only add shell command to the prompt if the AI is allowed to execute it

View File

@@ -50,7 +50,9 @@ class TestScrapeText:
# Tests that the function returns an error message when an invalid or unreachable url is provided.
def test_invalid_url(self, mocker):
# Mock the requests.get() method to raise an exception
mocker.patch("requests.Session.get", side_effect=requests.exceptions.RequestException)
mocker.patch(
"requests.Session.get", side_effect=requests.exceptions.RequestException
)
# Call the function with an invalid URL and assert that it returns an error message
url = "http://www.invalidurl.com"

View File

@@ -1,4 +1,3 @@
# Generated by CodiumAI
import unittest
import time
@@ -8,7 +7,6 @@ from autogpt.chat import create_chat_message, generate_context
class TestChat(unittest.TestCase):
# Tests that the function returns a dictionary with the correct keys and values when valid strings are provided for role and content.
def test_happy_path_role_content(self):
result = create_chat_message("system", "Hello, world!")
@@ -34,11 +32,22 @@ class TestChat(unittest.TestCase):
result = generate_context(prompt, relevant_memory, full_message_history, model)
# Assert
expected_result = (-1, 47, 3, [
{"role": "system", "content": ""},
{"role": "system", "content": f"The current time and date is {time.strftime('%c')}"},
{"role": "system", "content": f"This reminds you of these events from your past:\n\n\n"},
])
expected_result = (
-1,
47,
3,
[
{"role": "system", "content": ""},
{
"role": "system",
"content": f"The current time and date is {time.strftime('%c')}",
},
{
"role": "system",
"content": f"This reminds you of these events from your past:\n\n\n",
},
],
)
self.assertEqual(result, expected_result)
# Tests that the function successfully generates a current_context given valid inputs.
@@ -50,7 +59,10 @@ class TestChat(unittest.TestCase):
create_chat_message("user", "Hi there!"),
create_chat_message("assistant", "Hello! How can I assist you today?"),
create_chat_message("user", "Can you tell me a joke?"),
create_chat_message("assistant", "Why did the tomato turn red? Because it saw the salad dressing!"),
create_chat_message(
"assistant",
"Why did the tomato turn red? Because it saw the salad dressing!",
),
create_chat_message("user", "Haha, that's funny."),
]
model = "gpt-3.5-turbo-0301"
@@ -66,5 +78,9 @@ class TestChat(unittest.TestCase):
self.assertGreaterEqual(result[0], 0)
self.assertGreaterEqual(result[1], 0)
self.assertGreaterEqual(result[2], 0)
self.assertGreaterEqual(len(result[3]), 3) # current_context should have at least 3 messages
self.assertLessEqual(result[1], 2048) # token limit for GPT-3.5-turbo-0301 is 2048 tokens
self.assertGreaterEqual(
len(result[3]), 3
) # current_context should have at least 3 messages
self.assertLessEqual(
result[1], 2048
) # token limit for GPT-3.5-turbo-0301 is 2048 tokens