diff --git a/autogpt/commands/file_operations.py b/autogpt/commands/file_operations.py index 05f06088..e5181691 100644 --- a/autogpt/commands/file_operations.py +++ b/autogpt/commands/file_operations.py @@ -264,7 +264,7 @@ def download_file(url, filename): progress = f"{readable_file_size(downloaded_size)} / {readable_file_size(total_size)}" spinner.update_message(f"{message} {progress}") - return f'Successfully downloaded and locally stored file: "{filename}"! (Size: {readable_file_size(total_size)})' + return f'Successfully downloaded and locally stored file: "{filename}"! (Size: {readable_file_size(downloaded_size)})' except requests.HTTPError as e: return f"Got an HTTP Error whilst trying to download file: {e}" except Exception as e: diff --git a/tests/unit/test_file_operations.py b/tests/unit/test_file_operations.py index b2760671..f324193a 100644 --- a/tests/unit/test_file_operations.py +++ b/tests/unit/test_file_operations.py @@ -1,12 +1,18 @@ +""" +This set of unit tests is designed to test the file operations that autoGPT has access to. +""" + import os -import shutil -import unittest from pathlib import Path +from tempfile import gettempdir + +import pytest from autogpt.commands.file_operations import ( append_to_file, check_duplicate_operation, delete_file, + download_file, log_operation, read_file, search_files, @@ -14,118 +20,150 @@ from autogpt.commands.file_operations import ( write_to_file, ) from autogpt.config import Config -from autogpt.workspace import Workspace +from autogpt.utils import readable_file_size -class TestFileOperations(unittest.TestCase): - """ - This set of unit tests is designed to test the file operations that autoGPT has access to. - """ - - def setUp(self): - self.config = Config() - workspace_path = os.path.join(os.path.dirname(__file__), "workspace") - self.workspace_path = Workspace.make_workspace(workspace_path) - self.config.workspace_path = workspace_path - self.config.file_logger_path = os.path.join(workspace_path, "file_logger.txt") - self.workspace = Workspace(workspace_path, restrict_to_workspace=True) - - self.test_file = str(self.workspace.get_path("test_file.txt")) - self.test_file2 = "test_file2.txt" - self.test_directory = str(self.workspace.get_path("test_directory")) - self.test_nested_file = str(self.workspace.get_path("nested/test_file.txt")) - self.file_content = "This is a test file.\n" - self.file_logger_logs = "file_logger.txt" - - with open(self.test_file, "w") as f: - f.write(self.file_content) - - def tearDown(self) -> None: - shutil.rmtree(self.workspace_path) - - def test_check_duplicate_operation(self): - log_operation("write", self.test_file) - self.assertTrue(check_duplicate_operation("write", self.test_file)) - - # Test logging a file operation - def test_log_operation(self): - if os.path.exists(self.file_logger_logs): - os.remove(self.file_logger_logs) - - log_operation("log_test", self.test_file) - with open(self.config.file_logger_path, "r") as f: - content = f.read() - self.assertIn(f"log_test: {self.test_file}", content) - - # Test splitting a file into chunks - def test_split_file(self): - content = "abcdefghij" - chunks = list(split_file(content, max_length=4, overlap=1)) - expected = ["abcd", "defg", "ghij"] - self.assertEqual(chunks, expected) - - def test_read_file(self): - content = read_file(self.test_file) - self.assertEqual(content, self.file_content) - - def test_write_to_file(self): - new_content = "This is new content.\n" - write_to_file(self.test_nested_file, new_content) - with open(self.test_nested_file, "r") as f: - content = f.read() - self.assertEqual(content, new_content) - - def test_append_to_file(self): - append_text = "This is appended text.\n" - append_to_file(self.test_nested_file, append_text) - with open(self.test_nested_file, "r") as f: - content = f.read() - - append_to_file(self.test_nested_file, append_text) - - with open(self.test_nested_file, "r") as f: - content_after = f.read() - - self.assertEqual(content_after, append_text + append_text) - - def test_delete_file(self): - delete_file(self.test_file) - self.assertFalse(os.path.exists(self.test_file)) - - def test_search_files(self): - # Case 1: Create files A and B, search for A, and ensure we don't return A and B - file_a = self.workspace.get_path("file_a.txt") - file_b = self.workspace.get_path("file_b.txt") - - with open(file_a, "w") as f: - f.write("This is file A.") - - with open(file_b, "w") as f: - f.write("This is file B.") - - # Create a subdirectory and place a copy of file_a in it - if not os.path.exists(self.test_directory): - os.makedirs(self.test_directory) - - with open(os.path.join(self.test_directory, file_a.name), "w") as f: - f.write("This is file A in the subdirectory.") - - files = search_files(str(self.workspace.root)) - self.assertIn(file_a.name, files) - self.assertIn(file_b.name, files) - self.assertIn(os.path.join(Path(self.test_directory).name, file_a.name), files) - - # Clean up - os.remove(file_a) - os.remove(file_b) - os.remove(os.path.join(self.test_directory, file_a.name)) - os.rmdir(self.test_directory) - - # Case 2: Search for a file that does not exist and make sure we don't throw - non_existent_file = "non_existent_file.txt" - files = search_files("") - self.assertNotIn(non_existent_file, files) +@pytest.fixture() +def file_content(): + return "This is a test file.\n" -if __name__ == "__main__": - unittest.main() +@pytest.fixture() +def test_file(workspace, file_content): + test_file = str(workspace.get_path("test_file.txt")) + with open(test_file, "w") as f: + f.write(file_content) + return test_file + + +@pytest.fixture() +def test_directory(workspace): + return str(workspace.get_path("test_directory")) + + +@pytest.fixture() +def test_nested_file(workspace): + return str(workspace.get_path("nested/test_file.txt")) + + +def test_check_duplicate_operation(config, test_file): + log_operation("write", test_file) + assert check_duplicate_operation("write", test_file) is True + + +# Test logging a file operation +def test_log_operation(test_file, config): + file_logger_name = config.file_logger_path + if os.path.exists(file_logger_name): + os.remove(file_logger_name) + + log_operation("log_test", test_file) + with open(config.file_logger_path, "r") as f: + content = f.read() + assert f"log_test: {test_file}" in content + + +# Test splitting a file into chunks +def test_split_file(): + content = "abcdefghij" + chunks = list(split_file(content, max_length=4, overlap=1)) + expected = ["abcd", "defg", "ghij"] + assert chunks == expected + + +def test_read_file(test_file, file_content): + content = read_file(test_file) + assert content == file_content + + +def test_write_to_file(config, test_nested_file): + new_content = "This is new content.\n" + write_to_file(test_nested_file, new_content) + with open(test_nested_file, "r") as f: + content = f.read() + assert content == new_content + + +def test_append_to_file(test_nested_file): + append_text = "This is appended text.\n" + write_to_file(test_nested_file, append_text) + + append_to_file(test_nested_file, append_text) + + with open(test_nested_file, "r") as f: + content_after = f.read() + + assert content_after == append_text + append_text + + +def test_delete_file(config, test_file): + delete_file(test_file) + assert os.path.exists(test_file) is False + assert delete_file(test_file) == "Error: File has already been deleted." + + +def test_delete_missing_file(test_file): + os.remove(test_file) + try: + os.remove(test_file) + except FileNotFoundError as e: + error_string = str(e) + assert error_string in delete_file(test_file) + return + assert True, "Failed to test delete_file" + + +def test_search_files(config, workspace, test_directory): + # Case 1: Create files A and B, search for A, and ensure we don't return A and B + file_a = workspace.get_path("file_a.txt") + file_b = workspace.get_path("file_b.txt") + + with open(file_a, "w") as f: + f.write("This is file A.") + + with open(file_b, "w") as f: + f.write("This is file B.") + + # Create a subdirectory and place a copy of file_a in it + if not os.path.exists(test_directory): + os.makedirs(test_directory) + + with open(os.path.join(test_directory, file_a.name), "w") as f: + f.write("This is file A in the subdirectory.") + + files = search_files(str(workspace.root)) + assert file_a.name in files + assert file_b.name in files + assert os.path.join(Path(test_directory).name, file_a.name) in files + + # Clean up + os.remove(file_a) + os.remove(file_b) + os.remove(os.path.join(test_directory, file_a.name)) + os.rmdir(test_directory) + + # Case 2: Search for a file that does not exist and make sure we don't throw + non_existent_file = "non_existent_file.txt" + files = search_files("") + assert non_existent_file not in files + + +def test_download_file(): + url = "https://github.com/Significant-Gravitas/Auto-GPT/archive/refs/tags/v0.2.2.tar.gz" + local_name = os.path.join(gettempdir(), "auto-gpt.tar.gz") + size = 365023 + readable_size = readable_file_size(size) + assert ( + download_file(url, local_name) + == f'Successfully downloaded and locally stored file: "{local_name}"! (Size: {readable_size})' + ) + assert os.path.isfile(local_name) is True + assert os.path.getsize(local_name) == size + + url = "https://github.com/Significant-Gravitas/Auto-GPT/archive/refs/tags/v0.0.0.tar.gz" + assert "Got an HTTP Error whilst trying to download file" in download_file( + url, local_name + ) + + url = "https://thiswebsiteiswrong.hmm/v0.0.0.tar.gz" + assert "Failed to establish a new connection:" in download_file(url, local_name)