commit 9012ff4db2
Author: Reinier van der Leer <pwuts@agpt.co>
Date:   Thu Jan 18 15:19:06 2024 +0100

refactor(benchmark): Interface & type consolidation, and arch change, to allow adding challenge providers

Squashed commit of the following:

commit 7d6476d3297860f74c276d571da995d958a8cc1a
Author: Reinier van der Leer <pwuts@agpt.co>
Date:   Tue Jan 9 18:10:45 2024 +0100

    refactor(benchmark/challenge): Set up structure to support more challenge providers

    - Move `Challenge`, `ChallengeData`, `load_challenges` to `challenges/builtin.py` and rename to `BuiltinChallenge`, `BuiltinChallengeSpec`, `load_builtin_challenges`
    - Create `BaseChallenge` to serve as an interface and base class for different challenge implementations (see the sketch after this list)
    - Create `ChallengeInfo` model to serve as universal challenge info object
    - Create `get_challenge_from_source_uri` function in `challenges/__init__.py`
    - Replace `ChallengeData` by `ChallengeInfo` everywhere except in `BuiltinChallenge`
    - Add strong typing to `task_informations` store in app.py
    - Use `call.duration` in `finalize_test_report` and remove `timer` fixture
    - Update docstring on `challenges/__init__.py:get_unique_categories`
    - Add docstring to `generate_test.py`
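
    A minimal sketch of the resulting interface, with the method signature and return shape inferred from how the new names are used in app.py below; everything not named in the bullets above (imports, argument names, the exact dispatch logic) is an assumption:

        # challenges/base.py (sketch only)
        from abc import ABC, abstractmethod
        from typing import ClassVar

        from agent_protocol_client import AgentApi


        class BaseChallenge(ABC):
            """Interface implemented by challenge providers such as BuiltinChallenge."""

            info: ClassVar["ChallengeInfo"]  # the universal challenge info model

            @classmethod
            @abstractmethod
            async def evaluate_task_state(cls, agent: AgentApi, task_id: str) -> list:
                """Evaluate the agent's work on the given task; each result is
                expected to expose `passed` and `score` (see create_evaluation)."""
                ...


        # challenges/__init__.py (sketch only)
        def get_challenge_from_source_uri(source_uri: str) -> type[BaseChallenge]:
            # Dispatch on the source URI to the provider that owns it,
            # e.g. a builtin data.json spec -> BuiltinChallenge
            ...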

commit 5df2aa7939b45d85a2c2b5de9ac0522330d1502a
Author: Reinier van der Leer <pwuts@agpt.co>
Date:   Tue Jan 9 16:58:01 2024 +0100

    refactor(benchmark): Refactor & rename functions in agent_interface.py and agent_api_interface.py

    - `copy_artifacts_into_temp_folder` -> `copy_challenge_artifacts_into_workspace`
    - `copy_agent_artifacts_into_folder` -> `download_agent_artifacts_into_folder`
    - Reorder parameters of `run_api_agent`, `copy_challenge_artifacts_into_workspace`; use `Path` instead of `str`

commit 6a256fef4c7950b7ee82fb801e70c83afe6b6f8b
Author: Reinier van der Leer <pwuts@agpt.co>
Date:   Tue Jan 9 16:02:25 2024 +0100

    refactor(benchmark): Refactor & typefix report generation and handling logic

    - Rename functions in reports.py and ReportManager.py to better reflect what they do
       - `get_previous_test_results` -> `get_and_update_success_history`
       - `generate_single_call_report` -> `initialize_test_report`
       - `finalize_reports` -> `finalize_test_report`
       - `ReportManager.end_info_report` -> `SessionReportManager.finalize_session_report`
    - Modify `pytest_runtest_makereport` hook in conftest.py to finalize the report immediately after the challenge finishes running instead of after teardown (see the sketch after this list)
       - Move result processing logic from `initialize_test_report` to `finalize_test_report` in reports.py
    - Use `Test` and `Report` types from report_types.py where possible instead of untyped dicts: reports.py, utils.py, ReportManager.py
    - Differentiate `ReportManager` into `SessionReportManager`, `RegressionTestsTracker`, `SuccessRateTracker`
    - Move filtering of optional challenge categories from challenge.py (`Challenge.skip_optional_categories`) to conftest.py (`pytest_collection_modifyitems`)
    - Remove unused `scores` fixture in conftest.py
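
    Roughly, the adjusted hook looks like the sketch below; the hookwrapper pattern and the `call.when` check are standard pytest, but the exact arguments passed to `finalize_test_report` (and the source of `agbenchmark_config`) are assumptions:

        # conftest.py (sketch only)
        import pytest


        @pytest.hookimpl(hookwrapper=True)
        def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo):
            outcome = yield
            if call.when == "call":
                # Finalize as soon as the challenge itself has run, instead of
                # waiting for teardown; call.duration supplies the run time.
                finalize_test_report(item, outcome.get_result(), call, agbenchmark_config)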

commit 370d6dbf5df75d78e3878877968e8cd309d6d7fb
Author: Reinier van der Leer <pwuts@agpt.co>
Date:   Tue Jan 9 15:16:43 2024 +0100

    refactor(benchmark): Simplify models in report_types.py

    - Removed ForbidOptionalMeta and BaseModelBenchmark classes.
    - Changed model attributes to optional: `Metrics.difficulty`, `Metrics.success`, `Metrics.success_percentage`, `Metrics.run_time`, and `Test.reached_cutoff`.
    - Added validator to `Metrics` model to require `success` and `run_time` fields if `attempted=True` (see the sketch after this list).
    - Added default values to all optional model fields.
    - Removed duplicate imports.
    - Added condition in process_report.py to prevent null lookups if `metrics.difficulty` is not set.
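
    In pydantic v1 style (which the rest of the benchmark uses), such a validator might look like this; the field names come from the bullets above, the rest is assumed:

        # report_types.py (sketch only; `attempted` must be defined before the
        # validated fields so that it is available in `values`)
        from typing import Optional

        from pydantic import BaseModel, validator


        class Metrics(BaseModel):
            attempted: bool
            difficulty: Optional[str] = None
            success: Optional[bool] = None
            success_percentage: Optional[float] = None
            run_time: Optional[str] = None

            @validator("success", "run_time", always=True)
            def require_if_attempted(cls, value, values, field):
                if values.get("attempted") and value is None:
                    raise ValueError(f"'{field.name}' must be set if attempted=True")
                return value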

import datetime
import glob
import json
import logging
import sys
import time
import uuid
from collections import deque
from multiprocessing import Process
from pathlib import Path
from typing import Optional

import httpx
import psutil
from agent_protocol_client import AgentApi, ApiClient, ApiException, Configuration
from agent_protocol_client.models import Task, TaskRequestBody
from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Extra, ValidationError

from agbenchmark.challenges import ChallengeInfo
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.reports.processing.report_types_v2 import (
    BenchmarkRun,
    Metrics,
    RepositoryInfo,
    RunDetails,
    TaskInfo,
)
from agbenchmark.schema import TaskEvalRequestBody
from agbenchmark.utils.utils import write_pretty_json

sys.path.append(str(Path(__file__).parent.parent))

logger = logging.getLogger(__name__)

CHALLENGES: dict[str, ChallengeInfo] = {}
challenges_path = Path(__file__).parent / "challenges"
challenge_spec_files = deque(
    glob.glob(
        f"{challenges_path}/**/data.json",
        recursive=True,
    )
)

logger.debug("Loading challenges...")
while challenge_spec_files:
    challenge_spec_file = Path(challenge_spec_files.popleft())
    challenge_relpath = challenge_spec_file.relative_to(challenges_path.parent)
    if challenge_relpath.is_relative_to("challenges/deprecated"):
        continue

    logger.debug(f"Loading {challenge_relpath}...")
    try:
        challenge_info = ChallengeInfo.parse_file(challenge_spec_file)
    except ValidationError as e:
        if logging.getLogger().level == logging.DEBUG:
            logger.warning(f"Spec file {challenge_relpath} failed to load:\n{e}")
            logger.debug(f"Invalid challenge spec: {challenge_spec_file.read_text()}")
        continue
    challenge_info.spec_file = challenge_spec_file

    if not challenge_info.eval_id:
        challenge_info.eval_id = str(uuid.uuid4())
        # this will sort all the keys of the JSON systematically
        # so that the order is always the same
        write_pretty_json(challenge_info.dict(), challenge_spec_file)

    CHALLENGES[challenge_info.eval_id] = challenge_info


class BenchmarkTaskInfo(BaseModel):
    task_id: str
    start_time: datetime.datetime
    challenge_info: ChallengeInfo


task_informations: dict[str, BenchmarkTaskInfo] = {}


def find_agbenchmark_without_uvicorn():
    pids = []
    for process in psutil.process_iter(
        attrs=[
            "pid",
            "cmdline",
            "name",
            "username",
            "status",
            "cpu_percent",
            "memory_info",
            "create_time",
            "cwd",
            "connections",
        ]
    ):
        try:
            # Convert the process.info dictionary values to strings and concatenate them
            full_info = " ".join([str(v) for k, v in process.as_dict().items()])

            if "agbenchmark" in full_info and "uvicorn" not in full_info:
                pids.append(process.pid)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    return pids


class CreateReportRequest(BaseModel):
    test: str = None
    test_run_id: str = None
    # category: Optional[str] = []
    mock: Optional[bool] = False

    class Config:
        extra = Extra.forbid  # this will forbid any extra fields


updates_list = []

origins = [
    "http://localhost:8000",
    "http://localhost:8080",
    "http://127.0.0.1:5000",
    "http://localhost:5000",
]


def stream_output(pipe):
    for line in pipe:
        print(line, end="")


def setup_fastapi_app(agbenchmark_config: AgentBenchmarkConfig) -> FastAPI:
    from agbenchmark.agent_api_interface import upload_artifacts
    from agbenchmark.challenges import get_challenge_from_source_uri
    from agbenchmark.main import run_benchmark

    configuration = Configuration(
        host=agbenchmark_config.host or "http://localhost:8000"
    )
    app = FastAPI()
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    router = APIRouter()

    @router.post("/reports")
    def run_single_test(body: CreateReportRequest) -> dict:
        pids = find_agbenchmark_without_uvicorn()
        logger.info(f"pids already running with agbenchmark: {pids}")

        logger.debug(f"Request to /reports: {body.dict()}")

        # Start the benchmark in a separate process
        benchmark_process = Process(
            target=lambda: run_benchmark(
                config=agbenchmark_config,
                tests=(body.test,),
                mock=body.mock or False,
            )
        )
        benchmark_process.start()

        # Wait for the benchmark to finish, with a timeout of 200 seconds
        timeout = 200
        start_time = time.time()
        while benchmark_process.is_alive():
            if time.time() - start_time > timeout:
                logger.warning(f"Benchmark run timed out after {timeout} seconds")
                benchmark_process.terminate()
                break
            time.sleep(1)
        else:
            logger.debug(f"Benchmark finished running in {time.time() - start_time} s")

        # List all folders in the reports directory
        path_reports = agbenchmark_config.reports_folder
        folders = [folder for folder in path_reports.iterdir() if folder.is_dir()]

        # Sort the folders based on their names
        sorted_folders = sorted(folders, key=lambda x: x.name)

        # Get the last folder
        latest_folder = sorted_folders[-1] if sorted_folders else None

        # Read report.json from this folder
        data = {}  # fallback if no report was produced
        if latest_folder:
            report_path = latest_folder / "report.json"
            logger.debug(f"Getting latest report from {report_path}")
            if report_path.exists():
                with report_path.open() as file:
                    data = json.load(file)
                logger.debug(f"Report data: {data}")
            else:
                logger.error(
                    "Could not get result after running benchmark: "
                    f"'report.json' does not exist in '{latest_folder}'"
                )
        else:
            logger.error(
                "Could not get result after running benchmark: no reports found"
            )

        return data
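
    # Example request to the endpoint above (the router is mounted under the
    # /ap/v1 prefix at the bottom of this function); the test name here is
    # illustrative, not taken from this file:
    #   POST /ap/v1/reports
    #   {"test": "WriteFile", "mock": true}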

    @router.post("/agent/tasks", tags=["agent"])
    async def create_agent_task(task_eval_request: TaskEvalRequestBody) -> Task:
        """
        Creates a new task using the provided TaskEvalRequestBody and returns a Task.

        Args:
            task_eval_request: `TaskRequestBody` including an eval_id.

        Returns:
            Task: A new task with task_id, input, additional_input,
            and empty lists for artifacts and steps.

        Example:
            Request (TaskEvalRequestBody defined in schema.py):
                {
                    ...,
                    "eval_id": "50da533e-3904-4401-8a07-c49adf88b5eb"
                }

            Response (Task defined in `agent_protocol_client.models`):
                {
                    "task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
                    "input": "Write the word 'Washington' to a .txt file",
                    "artifacts": []
                }
        """
        try:
            challenge_info = CHALLENGES[task_eval_request.eval_id]
            async with ApiClient(configuration) as api_client:
                api_instance = AgentApi(api_client)
                task_input = challenge_info.task

                task_request_body = TaskRequestBody(input=task_input)
                task_response = await api_instance.create_agent_task(
                    task_request_body=task_request_body
                )
                task_info = BenchmarkTaskInfo(
                    task_id=task_response.task_id,
                    start_time=datetime.datetime.now(datetime.timezone.utc),
                    challenge_info=challenge_info,
                )
                task_informations[task_info.task_id] = task_info

                if input_artifacts_dir := challenge_info.task_artifacts_dir:
                    await upload_artifacts(
                        api_instance,
                        input_artifacts_dir,
                        task_response.task_id,
                        "artifacts_in",
                    )
                return task_response
        except ApiException as e:
            logger.error(f"Error whilst trying to create a task:\n{e}")
            logger.error(
                "The above error was caused while processing request: "
                f"{task_eval_request}"
            )
            raise HTTPException(500)

    @router.post("/agent/tasks/{task_id}/steps")
    async def proxy(request: Request, task_id: str):
        timeout = httpx.Timeout(300.0, read=300.0)  # 5 minutes
        async with httpx.AsyncClient(timeout=timeout) as client:
            # Construct the new URL
            new_url = f"{configuration.host}/ap/v1/agent/tasks/{task_id}/steps"

            # Forward the request
            response = await client.post(
                new_url,
                data=await request.body(),
                headers=dict(request.headers),
            )

            # Return the response from the forwarded request
            return Response(content=response.content, status_code=response.status_code)

    @router.post("/agent/tasks/{task_id}/evaluations")
    async def create_evaluation(task_id: str) -> BenchmarkRun:
        task_info = task_informations[task_id]
        challenge = get_challenge_from_source_uri(task_info.challenge_info.source_uri)
        try:
            async with ApiClient(configuration) as api_client:
                api_instance = AgentApi(api_client)
                eval_results = await challenge.evaluate_task_state(
                    api_instance, task_id
                )

            eval_info = BenchmarkRun(
                repository_info=RepositoryInfo(),
                run_details=RunDetails(
                    command=f"agbenchmark --test={challenge.info.name}",
                    benchmark_start_time=(
                        task_info.start_time.strftime("%Y-%m-%dT%H:%M:%S+00:00")
                    ),
                    test_name=challenge.info.name,
                ),
                task_info=TaskInfo(
                    data_path=challenge.info.source_uri,
                    is_regression=None,
                    category=[c.value for c in challenge.info.category],
                    task=challenge.info.task,
                    answer=challenge.info.reference_answer or "",
                    description=challenge.info.description or "",
                ),
                metrics=Metrics(
                    success=all(e.passed for e in eval_results),
                    success_percentage=(
                        100 * sum(e.score for e in eval_results) / len(eval_results)
                        if eval_results  # avoid division by 0
                        else 0
                    ),
                    attempted=True,
                ),
                config={},
            )

            logger.debug(f"Returning evaluation data:\n{eval_info.json(indent=4)}")
            return eval_info
        except ApiException as e:
            logger.error(f"Error {e} whilst trying to evaluate task: {task_id}")
            raise HTTPException(500)

    app.include_router(router, prefix="/ap/v1")

    return app
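
For reference, the app returned by `setup_fastapi_app` can be served like any other FastAPI application. A minimal sketch, assuming the module lives at `agbenchmark.app` and that an `AgentBenchmarkConfig` instance can be obtained (the `load()` constructor shown here is an assumption, not the confirmed API):

    import uvicorn

    from agbenchmark.app import setup_fastapi_app  # module path assumed
    from agbenchmark.config import AgentBenchmarkConfig

    config = AgentBenchmarkConfig.load()  # replace with however the config is actually constructed
    app = setup_fastapi_app(config)
    uvicorn.run(app, host="localhost", port=8080)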