diff --git a/autogpt/app.py b/autogpt/app.py
index e7b16adc..fa5cab62 100644
--- a/autogpt/app.py
+++ b/autogpt/app.py
@@ -186,7 +186,7 @@ def execute_command(command_name: str, arguments):
         elif command_name == "generate_image":
             return generate_image(arguments["prompt"])
         elif command_name == "send_tweet":
-            return send_tweet(arguments['text'])
+            return send_tweet(arguments["text"])
         elif command_name == "do_nothing":
             return "No action performed."
         elif command_name == "task_complete":
diff --git a/autogpt/commands/audio_text.py b/autogpt/commands/audio_text.py
index bf9c3640..b9ca988c 100644
--- a/autogpt/commands/audio_text.py
+++ b/autogpt/commands/audio_text.py
@@ -23,7 +23,9 @@ def read_audio(audio):
     headers = {"Authorization": f"Bearer {api_token}"}
 
     if api_token is None:
-        raise ValueError("You need to set your Hugging Face API token in the config file.")
+        raise ValueError(
+            "You need to set your Hugging Face API token in the config file."
+        )
 
     response = requests.post(
         api_url,
@@ -31,5 +33,5 @@
         data=audio,
     )
 
-    text = json.loads(response.content.decode("utf-8"))['text']
+    text = json.loads(response.content.decode("utf-8"))["text"]
     return "The audio says: " + text
diff --git a/autogpt/commands/file_operations.py b/autogpt/commands/file_operations.py
index 47a04dce..d02b125a 100644
--- a/autogpt/commands/file_operations.py
+++ b/autogpt/commands/file_operations.py
@@ -5,15 +5,49 @@ from pathlib import Path
 from typing import Generator, List
 
 # Set a dedicated folder for file I/O
-WORKING_DIRECTORY = Path(__file__).parent.parent / "auto_gpt_workspace"
+WORKING_DIRECTORY = Path(os.getcwd()) / "auto_gpt_workspace"
 
 # Create the directory if it doesn't exist
 if not os.path.exists(WORKING_DIRECTORY):
     os.makedirs(WORKING_DIRECTORY)
 
+LOG_FILE = "file_logger.txt"
+LOG_FILE_PATH = WORKING_DIRECTORY / LOG_FILE
 WORKING_DIRECTORY = str(WORKING_DIRECTORY)
 
 
+def check_duplicate_operation(operation: str, filename: str) -> bool:
+    """Check if the operation has already been performed on the given file
+
+    Args:
+        operation (str): The operation to check for
+        filename (str): The name of the file to check for
+
+    Returns:
+        bool: True if the operation has already been performed on the file
+    """
+    log_content = read_file(LOG_FILE)
+    log_entry = f"{operation}: {filename}\n"
+    return log_entry in log_content
+
+
+def log_operation(operation: str, filename: str) -> None:
+    """Log the file operation to the file_logger.txt
+
+    Args:
+        operation (str): The operation to log
+        filename (str): The name of the file the operation was performed on
+    """
+    log_entry = f"{operation}: {filename}\n"
+
+    # Create the log file if it doesn't exist
+    if not os.path.exists(LOG_FILE_PATH):
+        with open(LOG_FILE_PATH, "w", encoding="utf-8") as f:
+            f.write("File Operation Logger ")
+
+    append_to_file(LOG_FILE, log_entry)
+
+
 def safe_join(base: str, *paths) -> str:
     """Join one or more path components intelligently.
 
@@ -122,6 +156,8 @@ def write_to_file(filename: str, text: str) -> str:
     Returns:
         str: A message indicating success or failure
     """
+    if check_duplicate_operation("write", filename):
+        return "Error: File has already been updated."
     try:
         filepath = safe_join(WORKING_DIRECTORY, filename)
         directory = os.path.dirname(filepath)
@@ -129,6 +165,7 @@ def write_to_file(filename: str, text: str) -> str:
             os.makedirs(directory)
         with open(filepath, "w", encoding="utf-8") as f:
             f.write(text)
+        log_operation("write", filename)
         return "File written to successfully."
     except Exception as e:
         return f"Error: {str(e)}"
@@ -148,6 +185,7 @@ def append_to_file(filename: str, text: str) -> str:
         filepath = safe_join(WORKING_DIRECTORY, filename)
         with open(filepath, "a") as f:
             f.write(text)
+        log_operation("append", filename)
         return "Text appended successfully."
     except Exception as e:
         return f"Error: {str(e)}"
@@ -162,9 +200,12 @@ def delete_file(filename: str) -> str:
     Returns:
         str: A message indicating success or failure
     """
+    if check_duplicate_operation("delete", filename):
+        return "Error: File has already been deleted."
     try:
         filepath = safe_join(WORKING_DIRECTORY, filename)
         os.remove(filepath)
+        log_operation("delete", filename)
         return "File deleted successfully."
     except Exception as e:
         return f"Error: {str(e)}"
diff --git a/autogpt/commands/twitter.py b/autogpt/commands/twitter.py
index 1774bfb9..dc4d450c 100644
--- a/autogpt/commands/twitter.py
+++ b/autogpt/commands/twitter.py
@@ -7,9 +7,9 @@ load_dotenv()
 
 def send_tweet(tweet_text):
     consumer_key = os.environ.get("TW_CONSUMER_KEY")
-    consumer_secret= os.environ.get("TW_CONSUMER_SECRET")
-    access_token= os.environ.get("TW_ACCESS_TOKEN")
-    access_token_secret= os.environ.get("TW_ACCESS_TOKEN_SECRET")
+    consumer_secret = os.environ.get("TW_CONSUMER_SECRET")
+    access_token = os.environ.get("TW_ACCESS_TOKEN")
+    access_token_secret = os.environ.get("TW_ACCESS_TOKEN_SECRET")
     # Authenticate to Twitter
     auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
     auth.set_access_token(access_token, access_token_secret)
diff --git a/autogpt/logs.py b/autogpt/logs.py
index c644c5ea..22ce23f4 100644
--- a/autogpt/logs.py
+++ b/autogpt/logs.py
@@ -273,9 +273,7 @@ def print_assistant_thoughts(ai_name, assistant_reply):
         if CFG.speak_mode and assistant_thoughts_speak:
             say_text(assistant_thoughts_speak)
         else:
-            logger.typewriter_log(
-                "SPEAK:", Fore.YELLOW, f"{assistant_thoughts_speak}"
-            )
+            logger.typewriter_log("SPEAK:", Fore.YELLOW, f"{assistant_thoughts_speak}")
 
         return assistant_reply_json
     except json.decoder.JSONDecodeError:
diff --git a/autogpt/prompt.py b/autogpt/prompt.py
index 9f79d420..97bacb71 100644
--- a/autogpt/prompt.py
+++ b/autogpt/prompt.py
@@ -84,7 +84,6 @@ def get_prompt() -> str:
         ("Generate Image", "generate_image", {"prompt": ""}),
         ("Convert Audio to text", "read_audio_from_file", {"file": ""}),
         ("Send Tweet", "send_tweet", {"text": ""}),
-
     ]
 
     # Only add shell command to the prompt if the AI is allowed to execute it
diff --git a/tests/unit/test_browse_scrape_text.py b/tests/unit/test_browse_scrape_text.py
index 61c19b05..fea5ebfc 100644
--- a/tests/unit/test_browse_scrape_text.py
+++ b/tests/unit/test_browse_scrape_text.py
@@ -50,7 +50,9 @@ class TestScrapeText:
     # Tests that the function returns an error message when an invalid or unreachable url is provided.
     def test_invalid_url(self, mocker):
         # Mock the requests.get() method to raise an exception
-        mocker.patch("requests.Session.get", side_effect=requests.exceptions.RequestException)
+        mocker.patch(
+            "requests.Session.get", side_effect=requests.exceptions.RequestException
+        )
 
         # Call the function with an invalid URL and assert that it returns an error message
         url = "http://www.invalidurl.com"
diff --git a/tests/unit/test_chat.py b/tests/unit/test_chat.py
index 258ed62b..55a44492 100644
--- a/tests/unit/test_chat.py
+++ b/tests/unit/test_chat.py
@@ -1,4 +1,3 @@
-
 # Generated by CodiumAI
 import unittest
 import time
@@ -8,7 +7,6 @@ from autogpt.chat import create_chat_message, generate_context
 
 
 class TestChat(unittest.TestCase):
-
     # Tests that the function returns a dictionary with the correct keys and values when valid strings are provided for role and content.
     def test_happy_path_role_content(self):
         result = create_chat_message("system", "Hello, world!")
@@ -34,11 +32,22 @@
         result = generate_context(prompt, relevant_memory, full_message_history, model)
 
         # Assert
-        expected_result = (-1, 47, 3, [
-            {"role": "system", "content": ""},
-            {"role": "system", "content": f"The current time and date is {time.strftime('%c')}"},
-            {"role": "system", "content": f"This reminds you of these events from your past:\n\n\n"},
-        ])
+        expected_result = (
+            -1,
+            47,
+            3,
+            [
+                {"role": "system", "content": ""},
+                {
+                    "role": "system",
+                    "content": f"The current time and date is {time.strftime('%c')}",
+                },
+                {
+                    "role": "system",
+                    "content": f"This reminds you of these events from your past:\n\n\n",
+                },
+            ],
+        )
         self.assertEqual(result, expected_result)
 
     # Tests that the function successfully generates a current_context given valid inputs.
@@ -50,7 +59,10 @@
             create_chat_message("user", "Hi there!"),
             create_chat_message("assistant", "Hello! How can I assist you today?"),
             create_chat_message("user", "Can you tell me a joke?"),
-            create_chat_message("assistant", "Why did the tomato turn red? Because it saw the salad dressing!"),
+            create_chat_message(
+                "assistant",
+                "Why did the tomato turn red? Because it saw the salad dressing!",
+            ),
             create_chat_message("user", "Haha, that's funny."),
         ]
         model = "gpt-3.5-turbo-0301"
@@ -66,5 +78,9 @@
         self.assertGreaterEqual(result[0], 0)
         self.assertGreaterEqual(result[1], 0)
         self.assertGreaterEqual(result[2], 0)
-        self.assertGreaterEqual(len(result[3]), 3)  # current_context should have at least 3 messages
-        self.assertLessEqual(result[1], 2048)  # token limit for GPT-3.5-turbo-0301 is 2048 tokens
+        self.assertGreaterEqual(
+            len(result[3]), 3
+        )  # current_context should have at least 3 messages
+        self.assertLessEqual(
+            result[1], 2048
+        )  # token limit for GPT-3.5-turbo-0301 is 2048 tokens