mirror of
https://github.com/aljazceru/Auto-GPT.git
synced 2025-12-17 05:54:26 +01:00
feat(benchmark): Add -N, --attempts option for multiple attempts per challenge
LLMs are probabilistic systems. Reproducibility of completions is not guaranteed. It only makes sense to account for this, by running challenges multiple times to obtain a success ratio rather than a boolean success/failure result. Changes: - Add `-N`, `--attempts` option to CLI and `attempts_per_challenge` parameter to `main.py:run_benchmark`. - Add dynamic `i_attempt` fixture through `pytest_generate_tests` hook in conftest.py to achieve multiple runs per challenge. - Modify `pytest_runtest_makereport` hook in conftest.py to handle multiple reporting calls per challenge. - Refactor report_types.py, reports.py, process_report.ty to allow multiple results per challenge. - Calculate `success_percentage` from results of the current run, rather than all known results ever. - Add docstrings to a number of models in report_types.py. - Allow `None` as a success value, e.g. for runs that did not render any results before being cut off. - Make SingletonReportManager thread-safe.
This commit is contained in:
@@ -59,6 +59,9 @@ def start():
|
||||
|
||||
|
||||
@cli.command(default=True)
|
||||
@click.option(
|
||||
"-N", "--attempts", default=1, help="Number of times to run each challenge."
|
||||
)
|
||||
@click.option(
|
||||
"-c",
|
||||
"--category",
|
||||
@@ -107,6 +110,7 @@ def run(
|
||||
test: tuple[str],
|
||||
category: tuple[str],
|
||||
skip_category: tuple[str],
|
||||
attempts: int,
|
||||
cutoff: Optional[int] = None,
|
||||
backend: Optional[bool] = False,
|
||||
# agent_path: Optional[Path] = None,
|
||||
@@ -153,6 +157,7 @@ def run(
|
||||
tests=test,
|
||||
categories=category,
|
||||
skip_categories=skip_category,
|
||||
attempts_per_challenge=attempts,
|
||||
cutoff=cutoff,
|
||||
)
|
||||
|
||||
@@ -171,6 +176,7 @@ def run(
|
||||
tests=test,
|
||||
categories=category,
|
||||
skip_categories=skip_category,
|
||||
attempts_per_challenge=attempts,
|
||||
cutoff=cutoff,
|
||||
)
|
||||
|
||||
|
||||
@@ -47,7 +47,10 @@ class BaseChallenge(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def test_method(
|
||||
self, config: AgentBenchmarkConfig, request: pytest.FixtureRequest
|
||||
self,
|
||||
config: AgentBenchmarkConfig,
|
||||
request: pytest.FixtureRequest,
|
||||
i_attempt: int,
|
||||
) -> None:
|
||||
"""
|
||||
Test method for use by Pytest-based benchmark sessions. Should return normally
|
||||
|
||||
@@ -155,7 +155,10 @@ class BuiltinChallenge(BaseChallenge):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_method(
|
||||
self, config: AgentBenchmarkConfig, request: pytest.FixtureRequest
|
||||
self,
|
||||
config: AgentBenchmarkConfig,
|
||||
request: pytest.FixtureRequest,
|
||||
i_attempt: int,
|
||||
) -> None:
|
||||
if os.environ.get("HELICONE_API_KEY"):
|
||||
from helicone.lock import HeliconeLockManager
|
||||
|
||||
@@ -353,7 +353,10 @@ class WebArenaChallenge(BaseChallenge):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_method(
|
||||
self, config: AgentBenchmarkConfig, request: pytest.FixtureRequest
|
||||
self,
|
||||
config: AgentBenchmarkConfig,
|
||||
request: pytest.FixtureRequest,
|
||||
i_attempt: int,
|
||||
) -> None:
|
||||
if os.environ.get("HELICONE_API_KEY"):
|
||||
from helicone.lock import HeliconeLockManager
|
||||
|
||||
@@ -12,10 +12,11 @@ import pytest
|
||||
|
||||
from agbenchmark.challenges import OPTIONAL_CATEGORIES, BaseChallenge
|
||||
from agbenchmark.config import AgentBenchmarkConfig
|
||||
from agbenchmark.reports.processing.report_types import Test
|
||||
from agbenchmark.reports.ReportManager import RegressionTestsTracker
|
||||
from agbenchmark.reports.reports import (
|
||||
finalize_test_report,
|
||||
initialize_test_report,
|
||||
add_test_result_to_report,
|
||||
make_empty_test_report,
|
||||
session_finish,
|
||||
)
|
||||
from agbenchmark.utils.data_types import Category
|
||||
@@ -80,6 +81,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
|
||||
Args:
|
||||
parser: The Pytest CLI parser to which the command-line options are added.
|
||||
"""
|
||||
parser.addoption("-N", "--attempts", action="store")
|
||||
parser.addoption("--no-dep", action="store_true")
|
||||
parser.addoption("--mock", action="store_true")
|
||||
parser.addoption("--host", default=None)
|
||||
@@ -149,6 +151,9 @@ def mock(request: pytest.FixtureRequest) -> bool:
|
||||
return request.config.getoption("--mock")
|
||||
|
||||
|
||||
test_reports: dict[str, Test] = {}
|
||||
|
||||
|
||||
def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
|
||||
"""
|
||||
Pytest hook that is called when a test report is being generated.
|
||||
@@ -159,14 +164,19 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
|
||||
call: The call object from which the test result is retrieved.
|
||||
"""
|
||||
challenge: type[BaseChallenge] = item.cls # type: ignore
|
||||
challenge_id = challenge.info.eval_id
|
||||
|
||||
if challenge_id not in test_reports:
|
||||
test_reports[challenge_id] = make_empty_test_report(challenge.info)
|
||||
|
||||
if call.when == "setup":
|
||||
test_name = item.nodeid.split("::")[1]
|
||||
item.user_properties.append(("test_name", test_name))
|
||||
initialize_test_report(item, challenge.info)
|
||||
|
||||
if call.when == "call":
|
||||
finalize_test_report(item, call, agbenchmark_config)
|
||||
add_test_result_to_report(
|
||||
test_reports[challenge_id], item, call, agbenchmark_config
|
||||
)
|
||||
|
||||
|
||||
def timeout_monitor(start_time: int) -> None:
|
||||
@@ -205,6 +215,11 @@ def pytest_sessionfinish(session: pytest.Session) -> None:
|
||||
session_finish(agbenchmark_config)
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc: pytest.Metafunc):
|
||||
if type(n := metafunc.config.getoption("-N")) is str:
|
||||
metafunc.parametrize("i_attempt", range(int(n)))
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(
|
||||
items: list[pytest.Item], config: pytest.Config
|
||||
) -> None:
|
||||
|
||||
@@ -21,6 +21,7 @@ def run_benchmark(
|
||||
tests: tuple[str] = tuple(),
|
||||
categories: tuple[str] = tuple(),
|
||||
skip_categories: tuple[str] = tuple(),
|
||||
attempts_per_challenge: int = 1,
|
||||
mock: bool = False,
|
||||
no_dep: bool = False,
|
||||
no_cutoff: bool = False,
|
||||
@@ -96,6 +97,9 @@ def run_benchmark(
|
||||
if active:
|
||||
pytest_args.append(flag)
|
||||
|
||||
if attempts_per_challenge > 1:
|
||||
pytest_args.append(f"--attempts={attempts_per_challenge}")
|
||||
|
||||
if cutoff:
|
||||
pytest_args.append(f"--cutoff={cutoff}")
|
||||
logger.debug(f"Setting cuttoff override to {cutoff} seconds.")
|
||||
@@ -104,6 +108,7 @@ def run_benchmark(
|
||||
pytest_args.append(str(current_dir / "generate_test.py"))
|
||||
|
||||
pytest_args.append("--cache-clear")
|
||||
logger.debug(f"Running Pytest with args: {pytest_args}")
|
||||
exit_code = pytest.main(pytest_args)
|
||||
|
||||
SingletonReportManager.clear_instance()
|
||||
|
||||
@@ -10,7 +10,9 @@ from typing import Any
|
||||
|
||||
from agbenchmark.config import AgentBenchmarkConfig
|
||||
from agbenchmark.reports.processing.graphs import save_single_radar_chart
|
||||
from agbenchmark.reports.processing.process_report import get_agent_category
|
||||
from agbenchmark.reports.processing.process_report import (
|
||||
get_highest_achieved_difficulty_per_category,
|
||||
)
|
||||
from agbenchmark.reports.processing.report_types import MetricsOverall, Report, Test
|
||||
from agbenchmark.utils.utils import get_highest_success_difficulty
|
||||
|
||||
@@ -79,7 +81,6 @@ class BaseReportManager:
|
||||
except json.decoder.JSONDecodeError as e:
|
||||
logger.warning(f"Could not parse {self.report_file}: {e}")
|
||||
self.tests = {}
|
||||
self.save()
|
||||
|
||||
def save(self) -> None:
|
||||
with self.report_file.open("w") as f:
|
||||
@@ -113,6 +114,13 @@ class SessionReportManager(BaseReportManager):
|
||||
else:
|
||||
json.dump({k: v.dict() for k, v in self.tests.items()}, f, indent=4)
|
||||
|
||||
def load(self) -> None:
|
||||
super().load()
|
||||
if "tests" in self.tests: # type: ignore
|
||||
self.tests = Report.parse_obj(self.tests)
|
||||
else:
|
||||
self.tests = {n: Test.parse_obj(d) for n, d in self.tests.items()}
|
||||
|
||||
def add_test_report(self, test_name: str, test_report: Test) -> None:
|
||||
if isinstance(self.tests, Report):
|
||||
raise RuntimeError("Session report already finalized")
|
||||
@@ -148,7 +156,7 @@ class SessionReportManager(BaseReportManager):
|
||||
config=config.dict(exclude_none=True),
|
||||
)
|
||||
|
||||
agent_categories = get_agent_category(self.tests)
|
||||
agent_categories = get_highest_achieved_difficulty_per_category(self.tests)
|
||||
if len(agent_categories) > 1:
|
||||
save_single_radar_chart(
|
||||
agent_categories,
|
||||
@@ -166,7 +174,7 @@ class SessionReportManager(BaseReportManager):
|
||||
total_cost = 0
|
||||
all_costs_none = True
|
||||
for test_data in tests.values():
|
||||
cost = test_data.metrics.cost or 0.0
|
||||
cost = sum(r.cost or 0 for r in test_data.results)
|
||||
|
||||
if cost is not None: # check if cost is not None
|
||||
all_costs_none = False
|
||||
@@ -184,8 +192,8 @@ class RegressionTestsTracker(BaseReportManager):
|
||||
def add_test(self, test_name: str, test_details: dict) -> None:
|
||||
if test_name.startswith("Test"):
|
||||
test_name = test_name[4:]
|
||||
self.tests[test_name] = test_details
|
||||
|
||||
self.tests[test_name] = test_details
|
||||
self.save()
|
||||
|
||||
def has_regression_test(self, test_name: str) -> bool:
|
||||
@@ -195,11 +203,11 @@ class RegressionTestsTracker(BaseReportManager):
|
||||
class SuccessRatesTracker(BaseReportManager):
|
||||
"""Abstracts interaction with the regression tests file"""
|
||||
|
||||
tests: dict[str, list[bool]]
|
||||
tests: dict[str, list[bool | None]]
|
||||
|
||||
def update(self, test_name: str, success_history: list[bool]) -> None:
|
||||
def update(self, test_name: str, success_history: list[bool | None]) -> None:
|
||||
if test_name.startswith("Test"):
|
||||
test_name = test_name[4:]
|
||||
self.tests[test_name] = success_history
|
||||
|
||||
self.tests[test_name] = success_history
|
||||
self.save()
|
||||
|
||||
@@ -34,26 +34,23 @@ def get_reports_data(report_path: str) -> dict[str, Any]:
|
||||
return reports_data
|
||||
|
||||
|
||||
def get_agent_category(report: Report) -> dict[str, Any]:
|
||||
def get_highest_achieved_difficulty_per_category(report: Report) -> dict[str, Any]:
|
||||
categories: dict[str, Any] = {}
|
||||
|
||||
def get_highest_category_difficulty(data: Test) -> None:
|
||||
for category in data.category:
|
||||
if (
|
||||
category == "interface"
|
||||
or category == "iterate"
|
||||
or category == "product_advisor"
|
||||
):
|
||||
for _, test_data in report.tests.items():
|
||||
for category in test_data.category:
|
||||
if category in ("interface", "iterate", "product_advisor"):
|
||||
continue
|
||||
categories.setdefault(category, 0)
|
||||
if data.metrics.success and data.metrics.difficulty:
|
||||
num_dif = STRING_DIFFICULTY_MAP[data.metrics.difficulty]
|
||||
if (
|
||||
test_data.results
|
||||
and all(r.success for r in test_data.results)
|
||||
and test_data.difficulty
|
||||
):
|
||||
num_dif = STRING_DIFFICULTY_MAP[test_data.difficulty]
|
||||
if num_dif > categories[category]:
|
||||
categories[category] = num_dif
|
||||
|
||||
for _, test_data in report.tests.items():
|
||||
get_highest_category_difficulty(test_data)
|
||||
|
||||
return categories
|
||||
|
||||
|
||||
@@ -61,7 +58,7 @@ def all_agent_categories(reports_data: dict[str, Any]) -> dict[str, Any]:
|
||||
all_categories: dict[str, Any] = {}
|
||||
|
||||
for name, report in reports_data.items():
|
||||
categories = get_agent_category(report)
|
||||
categories = get_highest_achieved_difficulty_per_category(report)
|
||||
if categories: # only add to all_categories if categories is not empty
|
||||
logger.debug(f"Adding {name}: {categories}")
|
||||
all_categories[name] = categories
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
"""
|
||||
Model definitions used internally and for reports generated during command-line runs.
|
||||
"""
|
||||
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from pydantic import BaseModel, Field, constr, validator
|
||||
@@ -5,42 +9,66 @@ from pydantic import BaseModel, Field, constr, validator
|
||||
datetime_format = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+00:00$"
|
||||
|
||||
|
||||
class Metrics(BaseModel):
|
||||
difficulty: str | None
|
||||
success: bool | None = None
|
||||
run_time: str | None = None
|
||||
fail_reason: str | None = None
|
||||
success_percentage: float | None = Field(default=None, alias="success_%")
|
||||
attempted: bool
|
||||
cost: float | None = None
|
||||
class TestResult(BaseModel):
|
||||
"""Result details for a single run of a test/challenge."""
|
||||
|
||||
@validator("attempted")
|
||||
def require_metrics_if_attempted(cls, v: bool, values: dict[str, Any]):
|
||||
required_fields_if_attempted = ["success", "run_time"]
|
||||
success: bool | None = None
|
||||
"""Whether the run was successful"""
|
||||
run_time: str | None = None
|
||||
"""The (formatted) duration of the run"""
|
||||
fail_reason: str | None = None
|
||||
"""If applicable, the reason why the run was not successful"""
|
||||
reached_cutoff: bool | None = None # None if in progress
|
||||
"""Whether the run had to be stopped due to reaching the timeout"""
|
||||
cost: float | None = None
|
||||
"""The (known) cost incurred by the run, e.g. from using paid LLM APIs"""
|
||||
|
||||
@validator("fail_reason")
|
||||
def success_xor_fail_reason(cls, v: str | None, values: dict[str, Any]):
|
||||
if v:
|
||||
for f in required_fields_if_attempted:
|
||||
assert (
|
||||
values.get(f) is not None
|
||||
), f"'{f}' must be defined if attempted is True"
|
||||
success = values["success"]
|
||||
assert not success, "fail_reason must only be specified if success=False"
|
||||
else:
|
||||
assert values["success"], "fail_reason is required if success=False"
|
||||
return v
|
||||
|
||||
|
||||
class TestMetrics(BaseModel):
|
||||
"""
|
||||
Result metrics for a set of runs for a test/challenge. Should be an aggregate of all
|
||||
results for the same test/challenge within a benchmarking session.
|
||||
"""
|
||||
|
||||
attempted: bool
|
||||
"""Whether the challenge was attempted during this session"""
|
||||
is_regression: bool
|
||||
"""Whether the challenge was considered a regression test at the time of running"""
|
||||
success_percentage: float | None = Field(default=None, alias="success_%")
|
||||
"""Success rate (0-100) for this challenge within the session"""
|
||||
|
||||
|
||||
class MetricsOverall(BaseModel):
|
||||
"""Global metrics concerning a benchmarking session"""
|
||||
|
||||
run_time: str
|
||||
"""Duration from beginning to end of the session"""
|
||||
highest_difficulty: str
|
||||
percentage: float | None = None
|
||||
"""
|
||||
Difficulty of the most difficult challenge that succeeded at least once this session
|
||||
"""
|
||||
total_cost: float | None = None
|
||||
"""Total known cost of the session"""
|
||||
|
||||
|
||||
class Test(BaseModel):
|
||||
data_path: str
|
||||
is_regression: bool
|
||||
answer: str
|
||||
description: str
|
||||
metrics: Metrics
|
||||
category: List[str]
|
||||
difficulty: str | None
|
||||
data_path: str
|
||||
description: str
|
||||
task: str
|
||||
reached_cutoff: bool | None = None # None if in progress
|
||||
answer: str
|
||||
metrics: TestMetrics
|
||||
results: list[TestResult]
|
||||
metadata: dict[str, Any] | None = Field(default_factory=dict)
|
||||
|
||||
|
||||
@@ -57,9 +85,3 @@ class ReportBase(BaseModel):
|
||||
|
||||
class Report(ReportBase):
|
||||
tests: Dict[str, Test]
|
||||
|
||||
|
||||
class ReportV2(Test, ReportBase):
|
||||
test_name: str
|
||||
run_id: str | None
|
||||
team_name: str | None
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
"""Model definitions for use in the API"""
|
||||
|
||||
from pydantic import BaseModel, constr
|
||||
|
||||
datetime_format = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+00:00$"
|
||||
|
||||
|
||||
class BaseModelBenchmark(BaseModel):
|
||||
class Config:
|
||||
extra = "forbid"
|
||||
|
||||
|
||||
class TaskInfo(BaseModelBenchmark):
|
||||
class TaskInfo(BaseModel):
|
||||
data_path: str
|
||||
is_regression: bool | None
|
||||
answer: str
|
||||
@@ -17,14 +14,14 @@ class TaskInfo(BaseModelBenchmark):
|
||||
task: str
|
||||
|
||||
|
||||
class RepositoryInfo(BaseModelBenchmark):
|
||||
class RepositoryInfo(BaseModel):
|
||||
repo_url: str | None = None
|
||||
team_name: str | None = None
|
||||
agent_git_commit_sha: str | None = None
|
||||
benchmark_git_commit_sha: str | None = None
|
||||
|
||||
|
||||
class Metrics(BaseModelBenchmark):
|
||||
class Metrics(BaseModel):
|
||||
cost: float | None = None
|
||||
success: bool
|
||||
attempted: bool
|
||||
@@ -34,7 +31,7 @@ class Metrics(BaseModelBenchmark):
|
||||
success_percentage: float | None = None
|
||||
|
||||
|
||||
class RunDetails(BaseModelBenchmark):
|
||||
class RunDetails(BaseModel):
|
||||
test_name: str
|
||||
run_id: str | None = None
|
||||
command: str
|
||||
@@ -42,7 +39,7 @@ class RunDetails(BaseModelBenchmark):
|
||||
benchmark_start_time: constr(regex=datetime_format)
|
||||
|
||||
|
||||
class BenchmarkRun(BaseModelBenchmark):
|
||||
class BenchmarkRun(BaseModel):
|
||||
repository_info: RepositoryInfo
|
||||
run_details: RunDetails
|
||||
task_info: TaskInfo
|
||||
|
||||
@@ -1,137 +1,129 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from agbenchmark.challenges import ChallengeInfo
|
||||
from agbenchmark.config import AgentBenchmarkConfig
|
||||
from agbenchmark.reports.processing.report_types import Metrics, Test
|
||||
from agbenchmark.reports.processing.report_types import Test, TestMetrics, TestResult
|
||||
from agbenchmark.reports.ReportManager import SingletonReportManager
|
||||
from agbenchmark.utils.data_types import DifficultyLevel
|
||||
from agbenchmark.utils.utils import calculate_success_percentage
|
||||
|
||||
# from agbenchmark.utils.get_data_from_helicone import get_data_from_helicone
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_and_update_success_history(test_name: str, info_details: Test) -> list[bool]:
|
||||
def get_and_update_success_history(
|
||||
test_name: str, success: bool | None
|
||||
) -> list[bool | None]:
|
||||
mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv
|
||||
|
||||
prev_test_results = SingletonReportManager().SUCCESS_RATE_TRACKER.tests.get(
|
||||
test_name, []
|
||||
)
|
||||
|
||||
if not mock and info_details.metrics.success is not None:
|
||||
if not mock:
|
||||
# only add if it's an actual test
|
||||
prev_test_results.append(info_details.metrics.success)
|
||||
prev_test_results.append(success)
|
||||
SingletonReportManager().SUCCESS_RATE_TRACKER.update(
|
||||
test_name, prev_test_results
|
||||
)
|
||||
|
||||
# can calculate success rate regardless of mock
|
||||
info_details.metrics.success_percentage = calculate_success_percentage(
|
||||
prev_test_results
|
||||
)
|
||||
|
||||
return prev_test_results
|
||||
|
||||
|
||||
def update_regression_tests(
|
||||
prev_test_results: list[bool],
|
||||
info_details: Test,
|
||||
prev_test_results: list[bool | None],
|
||||
test_report: Test,
|
||||
test_name: str,
|
||||
) -> None:
|
||||
if len(prev_test_results) >= 3 and prev_test_results[-3:] == [True, True, True]:
|
||||
# if the last 3 tests were successful, add to the regression tests
|
||||
info_details.is_regression = True
|
||||
test_report.metrics.is_regression = True
|
||||
SingletonReportManager().REGRESSION_MANAGER.add_test(
|
||||
test_name, info_details.dict(include={"difficulty", "data_path"})
|
||||
test_name, test_report.dict(include={"difficulty", "data_path"})
|
||||
)
|
||||
|
||||
|
||||
def initialize_test_report(
|
||||
item: pytest.Item,
|
||||
def make_empty_test_report(
|
||||
challenge_info: ChallengeInfo,
|
||||
):
|
||||
) -> Test:
|
||||
difficulty = challenge_info.difficulty
|
||||
if isinstance(difficulty, DifficultyLevel):
|
||||
difficulty = difficulty.value
|
||||
|
||||
# Extract the challenge_location from the class
|
||||
# challenge_location: str = getattr(item.cls, "CHALLENGE_LOCATION", "")
|
||||
# test_name = item.nodeid.split("::")[1]
|
||||
# item.test_name = test_name
|
||||
|
||||
test_info = dict(item.user_properties).get("info_details") or Test(
|
||||
data_path=challenge_info.source_uri,
|
||||
is_regression=False,
|
||||
return Test(
|
||||
category=[c.value for c in challenge_info.category],
|
||||
difficulty=difficulty,
|
||||
data_path=challenge_info.source_uri,
|
||||
description=challenge_info.description or "",
|
||||
task=challenge_info.task,
|
||||
answer=challenge_info.reference_answer or "",
|
||||
description=challenge_info.description or "",
|
||||
metrics=Metrics(
|
||||
difficulty=difficulty,
|
||||
attempted=False,
|
||||
),
|
||||
metrics=TestMetrics(attempted=False, is_regression=False),
|
||||
results=[],
|
||||
)
|
||||
|
||||
# user facing reporting
|
||||
if item:
|
||||
item.user_properties.append(("info_details", test_info))
|
||||
|
||||
return test_info
|
||||
|
||||
|
||||
def finalize_test_report(
|
||||
item: pytest.Item, call: pytest.CallInfo, config: AgentBenchmarkConfig
|
||||
def add_test_result_to_report(
|
||||
test_report: Test,
|
||||
item: pytest.Item,
|
||||
call: pytest.CallInfo,
|
||||
config: AgentBenchmarkConfig,
|
||||
) -> None:
|
||||
user_properties: dict = dict(item.user_properties)
|
||||
|
||||
info_details: Test = user_properties.get("info_details", {})
|
||||
test_name: str = user_properties.get("test_name", "")
|
||||
|
||||
mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv
|
||||
|
||||
if call.excinfo is None:
|
||||
info_details.metrics.success = True
|
||||
else:
|
||||
if not mock: # don't remove if it's a mock test
|
||||
if call.excinfo:
|
||||
if not mock:
|
||||
SingletonReportManager().REGRESSION_MANAGER.remove_test(test_name)
|
||||
info_details.metrics.fail_reason = str(call.excinfo.value)
|
||||
if call.excinfo.typename == "Skipped":
|
||||
info_details.metrics.attempted = False
|
||||
info_details.metrics.attempted = True
|
||||
info_details.metrics.run_time = f"{str(round(call.duration, 3))} seconds"
|
||||
info_details.reached_cutoff = user_properties.get("timed_out", False)
|
||||
|
||||
prev_test_results: list[bool] = get_and_update_success_history(
|
||||
test_name, info_details
|
||||
test_report.metrics.attempted = call.excinfo.typename != "Skipped"
|
||||
else:
|
||||
test_report.metrics.attempted = True
|
||||
|
||||
test_report.results.append(
|
||||
TestResult(
|
||||
success=call.excinfo is None,
|
||||
run_time=f"{str(round(call.duration, 3))} seconds",
|
||||
fail_reason=str(call.excinfo.value) if call.excinfo else None,
|
||||
reached_cutoff=user_properties.get("timed_out", False),
|
||||
)
|
||||
)
|
||||
test_report.metrics.success_percentage = (
|
||||
sum(r.success or False for r in test_report.results)
|
||||
/ len(test_report.results)
|
||||
* 100
|
||||
)
|
||||
|
||||
update_regression_tests(prev_test_results, info_details, test_name)
|
||||
prev_test_results: list[bool | None] = get_and_update_success_history(
|
||||
test_name, test_report.results[-1].success
|
||||
)
|
||||
|
||||
if info_details and test_name:
|
||||
update_regression_tests(prev_test_results, test_report, test_name)
|
||||
|
||||
if test_report and test_name:
|
||||
# if "--mock" not in sys.argv and os.environ.get("HELICONE_API_KEY"):
|
||||
# logger.debug("Getting cost from Helicone")
|
||||
# info_details.metrics.cost = get_data_from_helicone(test_name)
|
||||
# test_report.metrics.cost = get_data_from_helicone(test_name)
|
||||
# logger.debug(f"Cost: {cost}")
|
||||
|
||||
if "--mock" not in sys.argv:
|
||||
if not mock:
|
||||
update_challenges_already_beaten(
|
||||
config.challenges_already_beaten_file, info_details, test_name
|
||||
config.challenges_already_beaten_file, test_report, test_name
|
||||
)
|
||||
|
||||
SingletonReportManager().INFO_MANAGER.add_test_report(test_name, info_details)
|
||||
SingletonReportManager().INFO_MANAGER.add_test_report(test_name, test_report)
|
||||
|
||||
|
||||
def update_challenges_already_beaten(
|
||||
challenges_already_beaten_file: Path, info_details: Test, test_name: str
|
||||
challenges_already_beaten_file: Path, test_report: Test, test_name: str
|
||||
) -> None:
|
||||
current_run_successful = info_details.metrics.success
|
||||
current_run_successful = any(r.success for r in test_report.results)
|
||||
try:
|
||||
with open(challenges_already_beaten_file, "r") as f:
|
||||
challenges_beaten_before = json.load(f)
|
||||
|
||||
@@ -32,17 +32,6 @@ def replace_backslash(value: Any) -> Any:
|
||||
return value
|
||||
|
||||
|
||||
def calculate_success_percentage(results: list[bool]) -> float:
|
||||
# Take the last 10 results or all if less than 10
|
||||
last_results = results[-10:] if len(results) > 10 else results
|
||||
success_count = last_results.count(True)
|
||||
total_count = len(last_results)
|
||||
if total_count == 0:
|
||||
return 0
|
||||
success_percentage = (success_count / total_count) * 100 # as a percentage
|
||||
return round(success_percentage, 2)
|
||||
|
||||
|
||||
def get_test_path(json_file: str | Path) -> str:
|
||||
if isinstance(json_file, str):
|
||||
json_file = Path(json_file)
|
||||
@@ -71,8 +60,8 @@ def get_highest_success_difficulty(
|
||||
|
||||
for test_name, test_data in data.items():
|
||||
try:
|
||||
if test_data.metrics.success:
|
||||
difficulty_str = test_data.metrics.difficulty
|
||||
if any(r.success for r in test_data.results):
|
||||
difficulty_str = test_data.difficulty
|
||||
if not difficulty_str:
|
||||
continue
|
||||
|
||||
|
||||
Reference in New Issue
Block a user