Rearrange tests & fix CI (#4596)

* Rearrange tests into unit/integration/challenge categories

* Fix linting + `tests.challenges` imports

* Fix obscured duplicate test in test_url_validation.py

* Move VCR conftest to tests.vcr

* Specify tests to run & their order (unit -> integration -> challenges) in CI

* Fail Docker CI when tests fail

* Fix import & linting errors in tests

* Fix `get_text_summary`

* Fix linting errors

* Clean up pytest args in CI

* Remove bogus tests from GoCodeo
Author: Reinier van der Leer
Date: 2023-06-06 19:48:49 +02:00 (committed by GitHub)
Parent: 8a881f70a3
Commit: dafbd11686
59 changed files with 150 additions and 377 deletions


@@ -0,0 +1,10 @@
If the goal-oriented task pipeline fails, it means one of two things:
- you changed the way the system prompt is generated, or
- you broke autogpt.
To find out which, run the following command:
```bash
pytest -s -k tests/integration/goal_oriented
```
If the test succeeds, it records new VCR cassettes. Push those to your branch and the pipeline will pass.


@@ -0,0 +1,25 @@
import pytest
from autogpt.agent import Agent
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
CYCLE_COUNT = 2
@requires_api_key("OPENAI_API_KEY")
@pytest.mark.vcr
@challenge
def test_browse_website(
browser_agent: Agent,
patched_api_requestor: None,
monkeypatch: pytest.MonkeyPatch,
level_to_run: int,
) -> None:
file_path = browser_agent.workspace.get_path("browse_website.txt")
run_interaction_loop(monkeypatch, browser_agent, CYCLE_COUNT)
    # Read the file the agent was asked to write; a context manager ensures the handle is closed.
    with open(file_path, encoding="utf-8") as file:
        content = file.read()
assert "£25.89" in content, f"Expected £25.89, got {content}"


@@ -0,0 +1,28 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file
from autogpt.config import Config
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
CYCLE_COUNT = 3
@requires_api_key("OPENAI_API_KEY")
@pytest.mark.vcr
@challenge
def test_write_file(
writer_agent: Agent,
patched_api_requestor: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
config: Config,
level_to_run: int,
) -> None:
file_path = str(writer_agent.workspace.get_path("hello_world.txt"))
run_interaction_loop(monkeypatch, writer_agent, CYCLE_COUNT)
content = read_file(file_path, config)
assert content == "Hello World", f"Expected 'Hello World', got {content}"


@@ -0,0 +1,23 @@
from typing import Optional
class Challenge:
BEAT_CHALLENGES = False
def __init__(
self,
name: str,
category: str,
max_level: int,
is_new_challenge: bool,
max_level_beaten: Optional[int],
level_to_run: Optional[int] = None,
) -> None:
self.name = name
self.category = category
self.max_level_beaten = max_level_beaten
self.max_level = max_level
self.succeeded = False
self.skipped = False
self.level_to_run = level_to_run
self.is_new_challenge = is_new_challenge


@@ -0,0 +1,71 @@
import os
from functools import wraps
from typing import Any, Callable, Optional
import pytest
from tests.challenges.challenge_decorator.challenge import Challenge
from tests.challenges.challenge_decorator.challenge_utils import create_challenge
from tests.challenges.challenge_decorator.score_utils import (
get_scores,
update_new_score,
)
MAX_LEVEL_TO_IMPROVE_ON = (
1 # we will attempt to beat 1 level above the current level for now.
)
def challenge(func: Callable[..., Any]) -> Callable[..., None]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> None:
run_remaining = MAX_LEVEL_TO_IMPROVE_ON if Challenge.BEAT_CHALLENGES else 1
original_error = None
while run_remaining > 0:
current_score, new_score, new_score_location = get_scores()
level_to_run = kwargs["level_to_run"] if "level_to_run" in kwargs else None
challenge = create_challenge(
func, current_score, Challenge.BEAT_CHALLENGES, level_to_run
)
if challenge.level_to_run is not None:
kwargs["level_to_run"] = challenge.level_to_run
try:
func(*args, **kwargs)
challenge.succeeded = True
except AssertionError as err:
original_error = err
challenge.succeeded = False
else:
challenge.skipped = True
if os.environ.get("CI") == "true":
new_max_level_beaten = get_new_max_level_beaten(
challenge, Challenge.BEAT_CHALLENGES
)
update_new_score(
new_score_location, new_score, challenge, new_max_level_beaten
)
if challenge.level_to_run is None:
pytest.skip("This test has not been unlocked yet.")
if not challenge.succeeded:
if Challenge.BEAT_CHALLENGES or challenge.is_new_challenge:
# xfail
pytest.xfail("Challenge failed")
if original_error:
raise original_error
raise AssertionError("Challenge failed")
run_remaining -= 1
return wrapper
def get_new_max_level_beaten(
challenge: Challenge, beat_challenges: bool
) -> Optional[int]:
if challenge.succeeded:
return challenge.level_to_run
if challenge.skipped:
return challenge.max_level_beaten
# Challenge failed
return challenge.max_level_beaten if beat_challenges else None
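
A minimal sketch (not part of the changed files) of how the scoring rules above resolve, assuming the package layout matches the imports shown in this file; the challenge values are illustrative.

```python
from tests.challenges.challenge_decorator.challenge import Challenge
from tests.challenges.challenge_decorator.challenge_decorator import (
    get_new_max_level_beaten,
)

example = Challenge(
    name="write_file",
    category="basic_abilities",
    max_level=3,
    is_new_challenge=False,
    max_level_beaten=1,
    level_to_run=2,
)

# Succeeded: the level that was just run becomes the new high-water mark.
example.succeeded = True
assert get_new_max_level_beaten(example, beat_challenges=True) == 2

# Failed while trying to beat a higher level: the previous score is kept.
example.succeeded = False
assert get_new_max_level_beaten(example, beat_challenges=True) == 1

# Failed outside beat-challenges mode: the score is wiped (a regression).
assert get_new_max_level_beaten(example, beat_challenges=False) is None
```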


@@ -0,0 +1,85 @@
import os
from typing import Any, Callable, Dict, Optional, Tuple
from tests.challenges.challenge_decorator.challenge import Challenge
CHALLENGE_PREFIX = "test_"
def create_challenge(
func: Callable[..., Any],
current_score: Dict[str, Any],
is_beat_challenges: bool,
level_to_run: Optional[int] = None,
) -> Challenge:
challenge_category, challenge_name = get_challenge_identifiers(func)
is_new_challenge = challenge_name not in current_score.get(challenge_category, {})
max_level = get_max_level(current_score, challenge_category, challenge_name)
max_level_beaten = get_max_level_beaten(
current_score, challenge_category, challenge_name
)
level_to_run = get_level_to_run(
is_beat_challenges, level_to_run, max_level, max_level_beaten, is_new_challenge
)
return Challenge(
name=challenge_name,
category=challenge_category,
max_level=max_level,
max_level_beaten=max_level_beaten,
level_to_run=level_to_run,
is_new_challenge=is_new_challenge,
)
def get_level_to_run(
is_beat_challenges: bool,
level_to_run: Optional[int],
max_level: int,
max_level_beaten: Optional[int],
is_new_challenge: bool,
) -> Optional[int]:
if is_new_challenge:
return 1
if level_to_run is not None:
if level_to_run > max_level:
raise ValueError(
f"Level to run ({level_to_run}) is greater than max level ({max_level})"
)
return level_to_run
if is_beat_challenges:
if max_level_beaten == max_level:
return None
return 1 if max_level_beaten is None else max_level_beaten + 1
return max_level_beaten
def get_challenge_identifiers(func: Callable[..., Any]) -> Tuple[str, str]:
full_path = os.path.dirname(os.path.abspath(func.__code__.co_filename))
challenge_category = os.path.basename(full_path)
challenge_name = func.__name__.replace(CHALLENGE_PREFIX, "")
return challenge_category, challenge_name
def get_max_level(
current_score: Dict[str, Any],
challenge_category: str,
challenge_name: str,
) -> int:
return (
current_score.get(challenge_category, {})
.get(challenge_name, {})
.get("max_level", 1)
)
def get_max_level_beaten(
current_score: Dict[str, Any],
challenge_category: str,
challenge_name: str,
) -> Optional[int]:
return (
current_score.get(challenge_category, {})
.get(challenge_name, {})
.get("max_level_beaten", None)
)
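
For illustration, a short sketch (not part of the changed files) of how `get_level_to_run` picks the level, using the module path shown in the decorator's imports; the numbers are hypothetical.

```python
from tests.challenges.challenge_decorator.challenge_utils import get_level_to_run

# New challenges always start at level 1.
assert get_level_to_run(False, None, max_level=3, max_level_beaten=None, is_new_challenge=True) == 1

# In beat-challenges mode the next untried level is attempted...
assert get_level_to_run(True, None, max_level=3, max_level_beaten=1, is_new_challenge=False) == 2

# ...unless the top level is already beaten, in which case the run is skipped (None).
assert get_level_to_run(True, None, max_level=3, max_level_beaten=3, is_new_challenge=False) is None

# Outside beat-challenges mode the last beaten level is re-run as a regression check.
assert get_level_to_run(False, None, max_level=3, max_level_beaten=1, is_new_challenge=False) == 1
```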


@@ -0,0 +1,59 @@
import json
import os
from typing import Any, Dict, Optional, Tuple
from tests.challenges.challenge_decorator.challenge import Challenge
CURRENT_SCORE_LOCATION = "../current_score"
NEW_SCORE_LOCATION = "../new_score"
def update_new_score(
filename_new_score: str,
new_score: Dict[str, Any],
challenge: Challenge,
new_max_level_beaten: Optional[int],
) -> None:
write_new_score(new_score, challenge, new_max_level_beaten)
write_new_score_to_file(new_score, filename_new_score)
def write_new_score(
new_score: Dict[str, Any], challenge: Challenge, new_max_level_beaten: Optional[int]
) -> Dict[str, Any]:
new_score.setdefault(challenge.category, {})
new_score[challenge.category][challenge.name] = {
"max_level_beaten": new_max_level_beaten,
"max_level": challenge.max_level,
}
return new_score
def write_new_score_to_file(new_score: Dict[str, Any], filename: str) -> None:
with open(filename, "w") as file:
json.dump(new_score, file, indent=4)
def get_scores() -> Tuple[Dict[str, Any], Dict[str, Any], str]:
filename_current_score, filename_new_score = get_score_locations()
current_score = load_json(filename_current_score)
new_score = load_json(filename_new_score)
return current_score, new_score, filename_new_score
def load_json(filename: str) -> Dict[str, Any]:
if os.path.isfile(filename):
with open(filename, "r") as file:
return json.load(file)
else:
return {}
def get_score_locations() -> Tuple[str, str]:
pid = os.getpid()
project_root = os.path.dirname(os.path.abspath(__file__))
filename_current_score = os.path.join(
project_root, f"{CURRENT_SCORE_LOCATION}.json"
)
filename_new_score = os.path.join(project_root, f"{NEW_SCORE_LOCATION}_{pid}.json")
return filename_current_score, filename_new_score
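
As a quick illustration (not part of the changed files) of the structure written into the per-process score file, here is a sketch assuming the module paths used elsewhere in this diff; the resulting dict mirrors the per-challenge entries in current_score.json.

```python
from tests.challenges.challenge_decorator.challenge import Challenge
from tests.challenges.challenge_decorator.score_utils import write_new_score

example = Challenge(
    name="write_file",
    category="basic_abilities",
    max_level=1,
    is_new_challenge=False,
    max_level_beaten=None,
)

new_score: dict = {}
write_new_score(new_score, example, new_max_level_beaten=1)
assert new_score == {
    "basic_abilities": {"write_file": {"max_level_beaten": 1, "max_level": 1}}
}
```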


@@ -0,0 +1,56 @@
from typing import Any, Dict, Optional
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from tests.challenges.challenge_decorator.challenge import Challenge
from tests.vcr import BASE_VCR_CONFIG, before_record_response
def before_record_response_filter_errors(
response: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""In challenges we don't want to record errors (See issue #4461)"""
if response["status"]["code"] >= 400:
return None
return before_record_response(response)
@pytest.fixture(scope="module")
def vcr_config() -> Dict[str, Any]:
# this fixture is called by the pytest-recording vcr decorator.
return BASE_VCR_CONFIG | {
"before_record_response": before_record_response_filter_errors,
}
def pytest_addoption(parser: Parser) -> None:
parser.addoption(
"--level", action="store", default=None, type=int, help="Specify test level"
)
parser.addoption(
"--beat-challenges",
action="store_true",
        help="Specifies whether the test suite should attempt to beat challenges",
)
def pytest_configure(config: Config) -> None:
level = config.getoption("--level", default=None)
config.option.level = level
beat_challenges = config.getoption("--beat-challenges", default=False)
config.option.beat_challenges = beat_challenges
@pytest.fixture
def level_to_run(request: FixtureRequest) -> int:
    # Used by the challenge tests; set via the --level command-line option.
return request.config.option.level
@pytest.fixture(autouse=True)
def check_beat_challenges(request: FixtureRequest) -> None:
Challenge.BEAT_CHALLENGES = request.config.getoption("--beat-challenges")


@@ -0,0 +1,48 @@
{
"basic_abilities": {
"browse_website": {
"max_level": 1,
"max_level_beaten": 1
},
"write_file": {
"max_level": 1,
"max_level_beaten": 1
}
},
"debug_code": {
"debug_code_challenge_a": {
"max_level": 1,
"max_level_beaten": null
}
},
"information_retrieval": {
"information_retrieval_challenge_a": {
"max_level": 3,
"max_level_beaten": 1
},
"information_retrieval_challenge_b": {
"max_level": 1,
"max_level_beaten": null
}
},
"kubernetes": {
"kubernetes_template_challenge_a": {
"max_level": 1,
"max_level_beaten": null
}
},
"memory": {
"memory_challenge_a": {
"max_level": 3,
"max_level_beaten": 3
},
"memory_challenge_b": {
"max_level": 5,
"max_level_beaten": null
},
"memory_challenge_c": {
"max_level": 5,
"max_level_beaten": 1
}
}
}


@@ -0,0 +1,19 @@
# mypy: ignore-errors
from typing import List, Optional
def two_sum(nums: List[int], target: int) -> Optional[List[int]]:
seen = {}
for i, num in enumerate(nums):
complement = target - num
if complement in seen:
return [seen[complement], i]
seen[num] = i
return None
# Example usage:
nums = [2, 7, 11, 15]
target = 9
result = two_sum(nums, target)
print(result) # Output: [0, 1]


@@ -0,0 +1,30 @@
# mypy: ignore-errors
# we need a new line at the top of the file to avoid a syntax error
def test_two_sum(nums, target, expected_result):
    # These tests are appended to the two_sum file, so the undefined `two_sum` name can be ignored here
result = two_sum(nums, target)
print(result)
assert (
result == expected_result
), f"AssertionError: Expected the output to be {expected_result}"
# test the trivial case with the first two numbers
nums = [2, 7, 11, 15]
target = 9
expected_result = [0, 1]
test_two_sum(nums, target, expected_result)
# test for ability to use zero and the same number twice
nums = [2, 7, 0, 15, 12, 0]
target = 0
expected_result = [2, 5]
test_two_sum(nums, target, expected_result)
# test for first and last index usage and negative numbers
nums = [-6, 7, 11, 4]
target = -2
expected_result = [0, 3]
test_two_sum(nums, target, expected_result)


@@ -0,0 +1,49 @@
from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.execute_code import execute_python_file
from autogpt.commands.file_operations import append_to_file, write_to_file
from autogpt.config import Config
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
CYCLE_COUNT = 5
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
def test_debug_code_challenge_a(
debug_code_agent: Agent,
monkeypatch: pytest.MonkeyPatch,
patched_api_requestor: MockerFixture,
config: Config,
level_to_run: int,
) -> None:
"""
Test whether the agent can debug a simple code snippet.
:param debug_code_agent: The agent to test.
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
    :param patched_api_requestor: Sends API requests to our API CI pipeline.
    :param config: The config object for the agent.
    :param level_to_run: The level to run.
"""
file_path = str(debug_code_agent.workspace.get_path("code.py"))
code_file_path = Path(__file__).parent / "data" / "two_sum.py"
test_file_path = Path(__file__).parent / "data" / "two_sum_tests.py"
write_to_file(file_path, code_file_path.read_text(), config)
run_interaction_loop(monkeypatch, debug_code_agent, CYCLE_COUNT)
append_to_file(file_path, test_file_path.read_text(), config)
output = execute_python_file(file_path, config)
assert "error" not in output.lower(), f"Errors found in output: {output}!"


@@ -0,0 +1,40 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file
from autogpt.config import Config
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
CYCLE_COUNT = 3
EXPECTED_REVENUES = [["81"], ["81"], ["81", "53", "24", "21", "11", "7", "4", "3", "2"]]
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
def test_information_retrieval_challenge_a(
information_retrieval_agents: Agent,
monkeypatch: pytest.MonkeyPatch,
patched_api_requestor: MockerFixture,
config: Config,
level_to_run: int,
) -> None:
"""
Test the challenge_a function in a given agent by mocking user inputs and checking the output file content.
    :param information_retrieval_agents: The agents to test, one per challenge level.
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
"""
information_retrieval_agent = information_retrieval_agents[level_to_run - 1]
run_interaction_loop(monkeypatch, information_retrieval_agent, CYCLE_COUNT)
file_path = str(information_retrieval_agent.workspace.get_path("output.txt"))
content = read_file(file_path, config)
expected_revenues = EXPECTED_REVENUES[level_to_run - 1]
for revenue in expected_revenues:
assert (
f"{revenue}." in content or f"{revenue}," in content
), f"Expected the file to contain {revenue}"


@@ -0,0 +1,50 @@
import contextlib
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file
from autogpt.config import Config
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
CYCLE_COUNT = 3
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
def test_information_retrieval_challenge_b(
get_nobel_prize_agent: Agent,
monkeypatch: pytest.MonkeyPatch,
patched_api_requestor: MockerFixture,
level_to_run: int,
config: Config,
) -> None:
"""
Test the challenge_b function in a given agent by mocking user inputs and checking the output file content.
:param get_nobel_prize_agent: The agent to test.
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
:param patched_api_requestor: APIRequestor Patch to override the openai.api_requestor module for testing.
:param level_to_run: The level to run.
:param config: The config object.
"""
with contextlib.suppress(SystemExit):
run_interaction_loop(monkeypatch, get_nobel_prize_agent, CYCLE_COUNT)
file_path = str(
get_nobel_prize_agent.workspace.get_path("2010_nobel_prize_winners.txt")
)
content = read_file(file_path, config)
assert "Andre Geim" in content, "Expected the file to contain Andre Geim"
assert (
"Konstantin Novoselov" in content
), "Expected the file to contain Konstantin Novoselov"
assert (
"University of Manchester" in content
), "Expected the file to contain University of Manchester"
assert "graphene" in content, "Expected the file to contain graphene"


@@ -0,0 +1,45 @@
import pytest
import yaml
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file
from autogpt.config import Config
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
CYCLE_COUNT = 3
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
def test_kubernetes_template_challenge_a(
kubernetes_agent: Agent,
monkeypatch: pytest.MonkeyPatch,
patched_api_requestor: MockerFixture,
config: Config,
level_to_run: int,
) -> None:
"""
Test the challenge_a function in a given agent by mocking user inputs
and checking the output file content.
Args:
kubernetes_agent (Agent)
        monkeypatch (pytest.MonkeyPatch)
        patched_api_requestor (MockerFixture)
config (Config)
level_to_run (int)
"""
run_interaction_loop(monkeypatch, kubernetes_agent, CYCLE_COUNT)
file_path = str(kubernetes_agent.workspace.get_path("kube.yaml"))
content = read_file(file_path, config)
for word in ["apiVersion", "kind", "metadata", "spec"]:
assert word in content, f"Expected the file to contain {word}"
content = yaml.safe_load(content)
for word in ["Service", "Deployment", "Pod"]:
assert word in content["kind"], f"Expected the file to contain {word}"


@@ -0,0 +1,83 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file, write_to_file
from autogpt.config import Config
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
def test_memory_challenge_a(
memory_management_agent: Agent,
patched_api_requestor: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
config: Config,
level_to_run: int,
) -> None:
"""
The agent reads a file containing a task_id. Then, it reads a series of other files.
After reading 'n' files, the agent must write the task_id into a new file.
Args:
memory_management_agent (Agent)
patched_api_requestor (MockerFixture)
monkeypatch (pytest.MonkeyPatch)
config (Config)
level_to_run (int)
"""
task_id = "2314"
create_instructions_files(memory_management_agent, level_to_run, task_id, config)
run_interaction_loop(monkeypatch, memory_management_agent, level_to_run + 2)
file_path = str(memory_management_agent.workspace.get_path("output.txt"))
content = read_file(file_path, config)
assert task_id in content, f"Expected the file to contain {task_id}"
def create_instructions_files(
memory_management_agent: Agent,
num_files: int,
task_id: str,
config: Config,
base_filename: str = "instructions_",
) -> None:
"""
Creates a series of instructions files for the memory challenge.
Args:
memory_management_agent (Agent)
num_files (int)
task_id (str)
        config (Config)
        base_filename (str, optional)
"""
for i in range(1, num_files + 1):
content = generate_content(i, task_id, base_filename, num_files)
file_name = f"{base_filename}{i}.txt"
file_path = str(memory_management_agent.workspace.get_path(file_name))
write_to_file(file_path, content, config)
def generate_content(
index: int, task_id: str, base_filename: str, num_files: int
) -> str:
"""
Args:
index: int
task_id: str
base_filename: str
num_files: int
Returns: str
"""
if index == 1:
return (
f"This task_id is {task_id}\nRead the file {base_filename}{index + 1}.txt"
)
if index != num_files:
return f"Read the file {base_filename}{index + 1}.txt"
return "Write the task_id into the file output.txt\nShutdown"
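
To make the instruction chain concrete, a sketch (not part of the changed files) of what `generate_content` produces for a three-file run; the import path is hypothetical, since the diff view above does not show this file's name.

```python
# Hypothetical module path; the actual file name is not shown in this diff view.
from tests.challenges.memory.test_memory_challenge_a import generate_content

# File 1 carries the task_id, file 2 points at file 3, file 3 asks for the write + shutdown.
assert generate_content(1, "2314", "instructions_", 3) == (
    "This task_id is 2314\nRead the file instructions_2.txt"
)
assert generate_content(2, "2314", "instructions_", 3) == "Read the file instructions_3.txt"
assert generate_content(3, "2314", "instructions_", 3) == (
    "Write the task_id into the file output.txt\nShutdown"
)
```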


@@ -0,0 +1,84 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file, write_to_file
from autogpt.config import Config
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import generate_noise, run_interaction_loop
from tests.utils import requires_api_key
NOISE = 1000
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
def test_memory_challenge_b(
memory_management_agent: Agent,
patched_api_requestor: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
config: Config,
level_to_run: int,
) -> None:
"""
The agent reads a series of files, each containing a task_id and noise. After reading 'n' files,
the agent must write all the task_ids into a new file, filtering out the noise.
Args:
memory_management_agent (Agent)
patched_api_requestor (MockerFixture)
monkeypatch (pytest.MonkeyPatch)
        config (Config)
        level_to_run (int)
"""
task_ids = [str(i * 1111) for i in range(1, level_to_run + 1)]
create_instructions_files(memory_management_agent, level_to_run, task_ids, config)
run_interaction_loop(monkeypatch, memory_management_agent, level_to_run + 2)
file_path = str(memory_management_agent.workspace.get_path("output.txt"))
content = read_file(file_path, config)
for task_id in task_ids:
assert task_id in content, f"Expected the file to contain {task_id}"
def create_instructions_files(
memory_management_agent: Agent,
level: int,
task_ids: list,
config: Config,
base_filename: str = "instructions_",
) -> None:
"""
Creates a series of instructions files for the memory challenge.
Args:
        memory_management_agent (Agent)
        level (int)
        task_ids (list)
        config (Config)
        base_filename (str, optional)
"""
for i in range(1, level + 1):
content = generate_content(i, task_ids, base_filename, level)
file_name = f"{base_filename}{i}.txt"
file_path = str(memory_management_agent.workspace.get_path(file_name))
write_to_file(file_path, content, config)
def generate_content(index: int, task_ids: list, base_filename: str, level: int) -> str:
"""
Args:
index: int
task_ids: list
base_filename: str
        level: int
Returns: str
"""
task_id = task_ids[index - 1]
noise = generate_noise(NOISE)
if index != level:
if level == 1:
return f"{noise}\nThe current task_id is {task_id}.\n{noise}\nWrite all the task_ids into the file output.txt. The file has not been created yet. After that, use the task_complete command."
return f"{noise}\nThe current task_id is {task_id}.\n{noise}\nRead the file {base_filename}{index + 1}.txt using the read_file command."
return f"{noise}\nThis task_id is {task_id}\n{noise}\nWrite all the task_ids into the file output.txt. The file has not been created yet. After that, use the task_complete command.\n"


@@ -0,0 +1,104 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file, write_to_file
from autogpt.config import Config
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import generate_noise, run_interaction_loop
from tests.utils import requires_api_key
NOISE = 1000
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
def test_memory_challenge_c(
memory_management_agent: Agent,
patched_api_requestor: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
config: Config,
level_to_run: int,
) -> None:
"""
Instead of reading task Ids from files as with the previous challenges, the agent now must remember
phrases which may have semantically similar meaning and the agent must write the phrases to a file
after seeing several of them.
Args:
memory_management_agent (Agent)
patched_api_requestor (MockerFixture)
monkeypatch (pytest.MonkeyPatch)
config (Config)
level_to_run (int)
"""
silly_phrases = [
"The purple elephant danced on a rainbow while eating a taco.",
"The sneaky toaster stole my socks and ran away to Hawaii.",
"My pet rock sings better than Beyoncé on Tuesdays.",
"The giant hamster rode a unicycle through the crowded mall.",
"The talking tree gave me a high-five and then flew away.",
"I have a collection of invisible hats that I wear on special occasions.",
"The flying spaghetti monster stole my sandwich and left a note saying 'thanks for the snack!'",
"My imaginary friend is a dragon who loves to play video games.",
"I once saw a cloud shaped like a giant chicken eating a pizza.",
"The ninja unicorn disguised itself as a potted plant and infiltrated the office.",
]
level_silly_phrases = silly_phrases[:level_to_run]
create_instructions_files(
memory_management_agent, level_to_run, level_silly_phrases, config=config
)
run_interaction_loop(monkeypatch, memory_management_agent, level_to_run + 2)
file_path = str(memory_management_agent.workspace.get_path("output.txt"))
content = read_file(file_path, config)
for phrase in level_silly_phrases:
assert phrase in content, f"Expected the file to contain {phrase}"
def create_instructions_files(
memory_management_agent: Agent,
level: int,
task_ids: list,
config: Config,
base_filename: str = "instructions_",
) -> None:
"""
Creates a series of instructions files for the memory challenge.
Args:
        memory_management_agent (Agent)
        level (int)
        task_ids (list)
        config (Config)
        base_filename (str, optional)
"""
for i in range(1, level + 1):
content = generate_content(i, task_ids, base_filename, level)
file_name = f"{base_filename}{i}.txt"
file_path = str(memory_management_agent.workspace.get_path(file_name))
write_to_file(file_path, content, config)
def generate_content(
index: int, silly_phrases: list, base_filename: str, level: int
) -> str:
"""
Args:
index: int
        silly_phrases: list
        base_filename: str
        level: int
Returns: str
"""
phrase = silly_phrases[index - 1]
noise = generate_noise(NOISE)
if index != level:
if level == 1:
return f"{noise}\nThe current phrase to remember is '{phrase}'.\n{noise}\nWrite all the phrases into the file output.txt. The file has not been created yet. After that, use the task_complete command."
return f"{noise}\nThe current phrase is '{phrase}'.\n{noise}\nRead the file {base_filename}{index + 1}.txt using the read_file command."
return f"{noise}\nThis phrase is '{phrase}'\n{noise}\nWrite all the phrases into the file output.txt. The file has not been created yet. After that, use the task_complete command.\n"


@@ -0,0 +1,59 @@
import importlib.util
import inspect
import os
from types import ModuleType
from typing import List
# Path to the challenges folder
CHALLENGES_DIR = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "../challenges"
)
def get_python_files(directory: str, exclude_file: str) -> List[str]:
    """Recursively collect all `test_*.py` files under `directory`, excluding `exclude_file`."""
python_files: List[str] = []
for root, dirs, files in os.walk(directory):
for file in files:
if (
file.endswith(".py")
and file.startswith("test_")
and file != exclude_file
):
python_files.append(os.path.join(root, file))
return python_files
def load_module_from_file(test_file: str) -> ModuleType:
spec = importlib.util.spec_from_file_location("module.name", test_file)
assert spec is not None, f"Unable to get spec for module in file {test_file}"
module = importlib.util.module_from_spec(spec)
assert (
spec.loader is not None
), f"Unable to get loader for module in file {test_file}"
spec.loader.exec_module(module)
return module
def get_test_functions(module: ModuleType) -> List:
return [
o
for o in inspect.getmembers(module)
if inspect.isfunction(o[1]) and o[0].startswith("test_")
]
def assert_single_test_function(functions_list: List, test_file: str) -> None:
assert len(functions_list) == 1, f"{test_file} should contain only one function"
assert (
functions_list[0][0][5:] == os.path.basename(test_file)[5:-3]
), f"The function in {test_file} should have the same name as the file without 'test_' prefix"
def test_method_name_and_count() -> None:
current_file: str = os.path.basename(__file__)
test_files: List[str] = get_python_files(CHALLENGES_DIR, current_file)
for test_file in test_files:
module = load_module_from_file(test_file)
functions_list = get_test_functions(module)
assert_single_test_function(functions_list, test_file)

tests/challenges/utils.py

@@ -0,0 +1,44 @@
import contextlib
import random
from typing import Generator
import pytest
from autogpt.agent import Agent
def generate_noise(noise_size: int) -> str:
random.seed(42)
return "".join(
random.choices(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789",
k=noise_size,
)
)
def setup_mock_input(monkeypatch: pytest.MonkeyPatch, cycle_count: int) -> None:
"""
Sets up the mock input for testing.
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
:param cycle_count: The number of cycles to mock.
"""
input_sequence = ["y"] * (cycle_count) + ["EXIT"]
def input_generator() -> Generator[str, None, None]:
"""
Creates a generator that yields input strings from the given sequence.
"""
yield from input_sequence
gen = input_generator()
monkeypatch.setattr("builtins.input", lambda _: next(gen))
def run_interaction_loop(
monkeypatch: pytest.MonkeyPatch, agent: Agent, cycle_count: int
) -> None:
setup_mock_input(monkeypatch, cycle_count)
with contextlib.suppress(SystemExit):
agent.start_interaction_loop()
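
A short usage sketch (not part of the changed files) for these helpers; it relies only on the functions defined in tests/challenges/utils.py above and on pytest's public MonkeyPatch API.

```python
import pytest

from tests.challenges.utils import generate_noise, setup_mock_input

# generate_noise reseeds the RNG with 42 on every call, so the noise is deterministic.
assert generate_noise(16) == generate_noise(16)

# setup_mock_input answers "y" for `cycle_count` prompts and then "EXIT".
monkeypatch = pytest.MonkeyPatch()
setup_mock_input(monkeypatch, cycle_count=2)
assert [input("> ") for _ in range(3)] == ["y", "y", "EXIT"]
monkeypatch.undo()
```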


@@ -0,0 +1,43 @@
import collections
import glob
import json
import os
from typing import Any, Dict
def deep_merge(source: Dict[Any, Any], dest: Dict[Any, Any]) -> Dict[Any, Any]:
for key, value in source.items():
        if isinstance(value, dict):
dest[key] = deep_merge(value, dest.get(key, {}))
else:
dest[key] = value
return dest
def recursive_sort_dict(data: dict) -> dict:
for key, value in data.items():
if isinstance(value, dict):
data[key] = recursive_sort_dict(value)
return collections.OrderedDict(sorted(data.items()))
# setup
cwd = os.getcwd() # get current working directory
new_score_filename_pattern = os.path.join(cwd, "tests/challenges/new_score_*.json")
current_score_filename = os.path.join(cwd, "tests/challenges/current_score.json")
merged_data: Dict[str, Any] = {}
for filename in glob.glob(new_score_filename_pattern):
with open(filename, "r") as f_new:
data = json.load(f_new)
merged_data = deep_merge(
data, merged_data
) # deep merge the new data with the merged data
os.remove(filename) # remove the individual file
sorted_data = recursive_sort_dict(merged_data)
with open(current_score_filename, "w") as f_current:
json.dump(sorted_data, f_current, indent=4)
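
To make the merge direction concrete, a self-contained sketch (not part of the changed files) using a local copy of `deep_merge`; importing the script itself would execute its module-level merge-and-write step, so the helper is duplicated here and the run data is hypothetical.

```python
from typing import Any, Dict


def deep_merge(source: Dict[Any, Any], dest: Dict[Any, Any]) -> Dict[Any, Any]:
    # Local copy of the helper above, duplicated so this sketch runs without
    # importing the script (which merges and rewrites score files on import).
    for key, value in source.items():
        if isinstance(value, dict):
            dest[key] = deep_merge(value, dest.get(key, {}))
        else:
            dest[key] = value
    return dest


run_a = {"memory": {"memory_challenge_a": {"max_level_beaten": 3, "max_level": 3}}}
run_b = {"memory": {"memory_challenge_b": {"max_level_beaten": None, "max_level": 5}}}

merged: Dict[str, Any] = {}
merged = deep_merge(run_a, merged)  # values from `source` win over `dest`
merged = deep_merge(run_b, merged)
assert sorted(merged["memory"]) == ["memory_challenge_a", "memory_challenge_b"]
```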