"""Command-line helper for setting up, running and benchmarking agents.

The script exposes a ``click`` command tree:

* ``setup``                         - run ``setup.sh`` and check git/GitHub config
* ``agent create|start|stop|list``  - manage agents under ``./autogpts``
* ``benchmark start``               - run an agent's ``run_benchmark.sh``
* ``benchmark categories list``     - list benchmark categories
* ``benchmark tests list|details``  - list / describe benchmark tests
* ``frontend``                      - launch the frontend
* ``arena enter``                   - open an arena submission pull request
"""
import glob
import json
import os
import re
import shutil
import signal
import subprocess
import sys
from datetime import datetime

try:
    import click
    import github  # noqa: F401  (imported only to detect a missing PyGithub)
except ImportError:
    # Best-effort bootstrap. Installing through the running interpreter makes
    # sure the packages land in the environment that will import them.
    subprocess.call([sys.executable, "-m", "pip", "install", "click"])
    subprocess.call([sys.executable, "-m", "pip", "install", "PyGithub"])
    import click

# Port the agent is expected to listen on (used by `agent stop` and `frontend`).
AGENT_PORT = 8000
# File (relative to the current working directory) holding the GitHub token.
TOKEN_FILE = ".github_access_token"


def _say(message, color):
    """Print *message* to the terminal in the given click colour."""
    click.echo(click.style(message, fg=color))


def _script_dir():
    """Return the directory containing this script, with symlinks resolved."""
    return os.path.dirname(os.path.realpath(__file__))


def _pids_listening_on(port):
    """Return the PIDs that ``lsof -t -i :<port>`` reports (possibly empty).

    Raises:
        FileNotFoundError: if ``lsof`` is not installed.
    """
    try:
        output = subprocess.check_output(["lsof", "-t", "-i", f":{port}"])
    except subprocess.CalledProcessError:
        # lsof exits with a non-zero status when nothing matches the filter.
        return []
    return [int(token) for token in output.decode("utf-8").split()]


def _repo_full_name(remote_url):
    """Extract ``owner/name`` from an scp-like, SSH or HTTPS git remote URL."""
    path = remote_url.strip()
    if path.endswith(".git"):
        path = path[: -len(".git")]
    if "://" in path:
        # e.g. https://github.com/owner/name -> drop scheme and host
        path = path.split("://", 1)[1].split("/", 1)[-1]
    else:
        # e.g. git@github.com:owner/name -> keep what follows the colon
        path = path.split(":")[-1]
    return path.strip("/")


def _iter_challenges():
    """Yield the parsed contents of every non-deprecated challenge data.json.

    Files that cannot be read or parsed are reported on stdout and skipped.
    """
    base = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "benchmark",
        "agbenchmark",
        "challenges",
    )
    pattern = os.path.join(base, "**", "data.json")
    # Sorted so that the listing order is stable between runs.
    for data_file in sorted(glob.glob(pattern, recursive=True)):
        # Skip anything that lives below a directory literally named
        # "deprecated". (A glob like "[!deprecated]*" is a character class and
        # would instead drop every directory starting with one of its letters.)
        parents = os.path.relpath(os.path.dirname(data_file), base).split(os.sep)
        if "deprecated" in parents:
            continue
        try:
            with open(data_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except json.JSONDecodeError:
            print(f"Error: {data_file} is not a valid JSON file.")
            continue
        except IOError:
            print(f"IOError: file could not be read: {data_file}")
            continue
        if isinstance(data, dict):
            yield data


def _joined(values):
    """Comma-join *values* as strings; ``None`` or empty gives ``""``."""
    return ", ".join(str(value) for value in (values or []))


@click.group()
def cli():
    pass


def _check_git_identity():
    """Report whether ``git config user.name`` / ``user.email`` are set."""
    try:
        user_name = (
            subprocess.check_output(["git", "config", "user.name"])
            .decode("utf-8")
            .strip()
        )
        user_email = (
            subprocess.check_output(["git", "config", "user.email"])
            .decode("utf-8")
            .strip()
        )
        if not (user_name and user_email):
            raise subprocess.CalledProcessError(
                returncode=1, cmd="git config user.name or user.email"
            )
    except (subprocess.CalledProcessError, FileNotFoundError):
        # Unset values make `git config` exit non-zero; a missing git binary
        # raises FileNotFoundError. Either way the account is not usable.
        _say("❌ GitHub account is not configured.", "red")
        _say("To configure your GitHub account, use the following commands:", "red")
        _say(' git config --global user.name "Your GitHub Username"', "red")
        _say(' git config --global user.email "Your GitHub Email"', "red")
        return
    _say(
        f"✅ GitHub account is configured with username: {user_name} and email: {user_email}",
        "green",
    )


def _check_github_token():
    """Validate the token in TOKEN_FILE.

    Returns:
        True when the token file is missing or empty, i.e. when the set-up
        instructions should be shown; False otherwise.
    """
    if not os.path.exists(TOKEN_FILE):
        # Create an empty placeholder so the user has a file to paste into.
        with open(TOKEN_FILE, "w") as file:
            file.write("")
        return True

    with open(TOKEN_FILE, "r") as file:
        github_access_token = file.read().strip()
    if not github_access_token:
        _say(
            "❌ GitHub access token file is empty. Please follow the instructions below to set up your GitHub access token.",
            "red",
        )
        return True

    _say("✅ GitHub access token loaded successfully.", "green")

    # Third-party; imported lazily so other commands work without it.
    import requests

    headers = {"Authorization": f"token {github_access_token}"}
    try:
        response = requests.get(
            "https://api.github.com/user", headers=headers, timeout=10
        )
    except requests.RequestException:
        response = None
    if response is None or response.status_code != 200:
        _say(
            "❌ Failed to validate GitHub access token. Please ensure it is correct.",
            "red",
        )
        return False

    # The header is a comma-separated list and may be absent altogether.
    granted = {
        scope.strip()
        for scope in (response.headers.get("X-OAuth-Scopes") or "").split(",")
    }
    if granted & {"public_repo", "repo"}:
        _say("✅ GitHub access token has the required permissions.", "green")
    else:
        _say(
            "❌ GitHub access token does not have the required permissions. Please ensure it has 'public_repo' or 'repo' scope.",
            "red",
        )
    return False


@cli.command()
def setup():
    """Installs dependencies needed for your system. Works with Linux, MacOS and Windows WSL."""
    script_dir = _script_dir()
    setup_script = os.path.join(script_dir, "setup.sh")
    if os.path.exists(setup_script):
        # Fire-and-forget: the script keeps running while the checks below run.
        subprocess.Popen([setup_script], cwd=script_dir)
        _say("🚀 Setup initiated", "green")
    else:
        _say("❌ Error: setup.sh does not exist in the current directory.", "red")

    _check_git_identity()

    if _check_github_token():
        _say("❌ To configure your GitHub access token, follow these steps:", "red")
        _say("\t1. Ensure you are logged into your GitHub account", "red")
        _say("\t2. Navigate to https://github.com/settings/tokens", "red")
        _say("\t6. Click on 'Generate new token'.", "red")
        _say(
            "\t7. Fill out the form to generate a new token. Ensure you select the 'repo' scope.",
            "red",
        )
        _say(
            "\t8. Open the '.github_access_token' file in the same directory as this script and paste the token into this file.",
            "red",
        )
        _say("\t9. Save the file and run the setup command again.", "red")


@cli.group()
def agent():
    """Commands to create, start and stop agents"""
    pass


@agent.command()
@click.argument("agent_name")
def create(agent_name):
    """Creates a new agent with the agent name provided"""
    # fullmatch + "+" rejects the empty string and a trailing newline, both of
    # which "^[...]*$" with re.match would let through.
    if not re.fullmatch(r"[a-zA-Z0-9_-]+", agent_name):
        _say(
            f"😞 Agent name '{agent_name}' is not valid. It should not contain spaces or special characters other than -_",
            "red",
        )
        return
    try:
        new_agent_dir = f"./autogpts/{agent_name}"
        agent_json_file = f"./arena/{agent_name}.json"
        if os.path.exists(new_agent_dir) or os.path.exists(agent_json_file):
            _say(
                f"😞 Agent '{agent_name}' already exists. Enter a different name for your agent",
                "red",
            )
            return
        # A new agent starts as a copy of the "forge" template agent.
        shutil.copytree("./autogpts/forge", new_agent_dir)
        _say(
            f"🎉 New agent '{agent_name}' created. The code for your new agent is in: autogpts/{agent_name}",
            "green",
        )
        _say(
            f"🚀 If you would like to enter the arena, run './run arena enter {agent_name}'",
            "yellow",
        )
    except Exception as e:  # top-level CLI boundary: report instead of a traceback
        _say(f"😢 An error occurred: {e}", "red")


@agent.command(name="start")
@click.argument("agent_name")
def agent_start(agent_name):
    """Start agent command"""
    agent_dir = os.path.join(_script_dir(), f"autogpts/{agent_name}")
    run_command = os.path.join(agent_dir, "run")
    if not os.path.exists(agent_dir):
        _say(
            f"😞 Agent '{agent_name}' does not exist. Please create the agent first.",
            "red",
        )
    elif not os.path.isfile(run_command):
        _say(
            f"😞 Run command does not exist in the agent '{agent_name}' directory.",
            "red",
        )
    else:
        # cwd= makes "./run" resolve inside the agent directory.
        subprocess.Popen(["./run"], cwd=agent_dir)
        click.echo(f"Agent '{agent_name}' started")


@agent.command()
def stop():
    """Stop agent command"""
    try:
        pids = _pids_listening_on(AGENT_PORT)
    except FileNotFoundError:
        click.echo("Error: 'lsof' is required to find the agent but is not installed.")
        return
    if not pids:
        click.echo(f"Error: No agent is running on port {AGENT_PORT}.")
        return

    stopped_any = False
    # lsof may report several PIDs for one port; terminate each of them.
    for pid in pids:
        try:
            os.kill(pid, signal.SIGTERM)
            stopped_any = True
        except ProcessLookupError:
            click.echo("Error: No process with the specified PID was found.")
    if stopped_any:
        click.echo("Agent stopped")


@agent.command(name="list")
def list_agents():
    """List agents command"""
    agents_dir = "./autogpts"
    try:
        agents_list = [
            entry
            for entry in os.listdir(agents_dir)
            if os.path.isdir(os.path.join(agents_dir, entry))
        ]
    except FileNotFoundError:
        _say("The autogpts directory does not exist 😢", "red")
        return
    except Exception as e:  # top-level CLI boundary: report instead of a traceback
        _say(f"An error occurred: {e} 😢", "red")
        return

    if not agents_list:
        _say("No agents found 😞", "red")
        return
    _say("Available agents: 🤖", "green")
    for agent_name in agents_list:
        _say(f"\t🐙 {agent_name}", "blue")


@cli.group()
def benchmark():
    """Commands to start the benchmark and list tests and categories"""
    pass


@benchmark.command(
    name="start",
    context_settings=dict(
        ignore_unknown_options=True,
    ),
)
@click.argument("agent_name")
@click.argument("subprocess_args", nargs=-1, type=click.UNPROCESSED)
def benchmark_start(agent_name, subprocess_args):
    """Starts the benchmark command"""
    agent_dir = os.path.join(_script_dir(), f"autogpts/{agent_name}")
    benchmark_script = os.path.join(agent_dir, "run_benchmark.sh")
    if not os.path.exists(agent_dir):
        _say(
            f"😞 Agent '{agent_name}' does not exist. Please create the agent first.",
            "red",
        )
    elif not os.path.isfile(benchmark_script):
        _say(
            f"😞 run_benchmark.sh does not exist in the agent '{agent_name}' directory.",
            "red",
        )
    else:
        # Extra CLI arguments are forwarded to the benchmark script untouched.
        subprocess.Popen([benchmark_script, *subprocess_args], cwd=agent_dir)
        _say(
            f"🚀 Running benchmark for '{agent_name}' with subprocess arguments: {' '.join(subprocess_args)}",
            "green",
        )


@benchmark.group(name="categories")
def benchmark_categories():
    """Benchmark categories group command"""
    pass


@benchmark_categories.command(name="list")
def benchmark_categories_list():
    """List benchmark categories command"""
    categories = set()
    for data in _iter_challenges():
        categories.update(data.get("category", []))

    if not categories:
        _say("No categories found 😞", "red")
        return
    _say("Available categories: 📚", "green")
    for category in sorted(categories):
        _say(f"\t📖 {category}", "blue")


@benchmark.group(name="tests")
def benchmark_tests():
    """Benchmark tests group command"""
    pass


def _display_test_name(test):
    """Turn a CamelCase test identifier into a spaced, human-readable label."""
    spaced = " ".join(word for word in re.split(r"([A-Z][a-z]*)", test) if word)
    return (
        spaced.replace("_", "")
        .replace("C L I", "CLI")[5:]  # drop the first five characters ("Test " prefix)
        .replace("  ", " ")  # collapse the double space left where "_" was removed
    )


@benchmark_tests.command(name="list")
def benchmark_tests_list():
    """List benchmark tests command"""
    # Maps the first category of each challenge to the names of its tests.
    tests = {}
    for data in _iter_challenges():
        category = data.get("category", [])
        test_name = data.get("name", "")
        if category and test_name:
            tests.setdefault(category[0], []).append(test_name)

    if not tests:
        _say("No tests found 😞", "red")
        return
    _say("Available tests: 📚", "green")
    for category, test_list in tests.items():
        _say(f"\t📖 {category}", "blue")
        for test in sorted(test_list):
            test_name_padded = f"{_display_test_name(test):<40}"
            _say(f"\t\t🔬 {test_name_padded} - {test}", "cyan")


@benchmark_tests.command(name="details")
@click.argument("test_name")
def benchmark_tests_details(test_name):
    """Benchmark test details command"""
    for data in _iter_challenges():
        if data.get("name") != test_name:
            continue

        # Missing sections are treated as empty instead of crashing on None.
        ground = data.get("ground") or {}
        info = data.get("info") or {}
        dependencies = data.get("dependencies")

        _say(f"\n{test_name}\n{'-' * len(test_name)}\n", "blue")
        _say(f"\tCategory: {_joined(data.get('category'))}", "green")
        _say(f"\tTask: {data.get('task')}", "green")
        _say(
            f"\tDependencies: {_joined(dependencies) if dependencies else 'None'}",
            "green",
        )
        _say(f"\tCutoff: {data.get('cutoff')}\n", "green")

        _say("\tTest Conditions\n\t-------", "magenta")
        _say(f"\t\tAnswer: {ground.get('answer')}", "magenta")
        _say(f"\t\tShould Contain: {_joined(ground.get('should_contain'))}", "magenta")
        _say(
            f"\t\tShould Not Contain: {_joined(ground.get('should_not_contain'))}",
            "magenta",
        )
        _say(f"\t\tFiles: {_joined(ground.get('files'))}", "magenta")
        _say(f"\t\tEval: {(ground.get('eval') or {}).get('type')}\n", "magenta")

        _say("\tInfo\n\t-------", "yellow")
        _say(f"\t\tDifficulty: {info.get('difficulty')}", "yellow")
        _say(f"\t\tDescription: {info.get('description')}", "yellow")
        _say(f"\t\tSide Effects: {_joined(info.get('side_effects'))}", "yellow")
        return

    _say(f"😞 Test '{test_name}' was not found.", "red")


@cli.command()
def frontend():
    """Starts the frontend"""
    try:
        agent_pids = _pids_listening_on(AGENT_PORT)
    except FileNotFoundError:
        click.echo("Error: Unexpected error occurred.")
        return
    if not agent_pids:
        # Do not launch the frontend when there is no agent to talk to.
        click.echo("Error: Agent is not running. Please start an agent first.")
        return
    click.echo("Agent is running.")

    frontend_dir = os.path.join(_script_dir(), "frontend")
    run_file = os.path.join(frontend_dir, "run")
    if os.path.exists(frontend_dir) and os.path.isfile(run_file):
        subprocess.Popen(["./run"], cwd=frontend_dir)
        click.echo("Launching frontend")
    else:
        click.echo("Error: Frontend directory or run file does not exist.")


@cli.group()
def arena():
    """Commands to enter the arena"""
    pass


@arena.command()
@click.argument("agent_name")
@click.option("--branch", default="master", help="Branch to use instead of master")
def enter(agent_name, branch):
    """Enter AGENT_NAME into the arena by opening a submission pull request.

    Writes ``arena/<agent_name>.json`` on the ``arena_submission_<agent_name>``
    branch, pushes it to ``origin`` and opens a pull request against the parent
    (upstream) repository.
    """
    # Third-party; imported lazily so other commands work without PyGithub.
    from github import Github

    # The agent must exist before it can be submitted.
    agent_dir = f"./autogpts/{agent_name}"
    if not os.path.exists(agent_dir):
        _say(
            f"❌ The directory for agent '{agent_name}' does not exist in the autogpts directory.",
            "red",
        )
        _say(f"🚀 Run './run agent create {agent_name}' to create the agent.", "yellow")
        return

    # An existing submission file means the agent has already entered.
    if os.path.exists(f"arena/{agent_name}.json"):
        _say(
            f"⚠️ The agent '{agent_name}' has already entered the arena. Use './run arena submit' to update your submission.",
            "yellow",
        )
        return

    # In `git status --porcelain` the first column is the index (staged) state.
    porcelain = subprocess.check_output(["git", "status", "--porcelain"]).decode(
        "utf-8"
    )
    staged_changes = [
        line
        for line in porcelain.split("\n")
        if line and line[0] in ("A", "M", "D", "R", "C")
    ]
    if staged_changes:
        _say(
            "❌ There are staged changes. Please commit or stash them and run the command again.",
            "red",
        )
        return

    # Defined before the try block so the clean-up below can always use them.
    branch_to_use = branch or "master"
    arena_submission_branch = f"arena_submission_{agent_name}"
    switched_branch = False
    try:
        with open(TOKEN_FILE, "r") as file:
            github_access_token = file.read().strip()

        github_repo_url = (
            subprocess.check_output(["git", "config", "--get", "remote.origin.url"])
            .decode("utf-8")
            .strip()
        )

        # Commit that the arena should benchmark: the tip of branch_to_use.
        commit_hash_to_benchmark = (
            subprocess.check_output(["git", "rev-parse", branch_to_use])
            .decode("utf-8")
            .strip()
        )

        # Reuse the submission branch when it exists, otherwise create it.
        branch_exists = (
            subprocess.call(
                [
                    "git",
                    "rev-parse",
                    "--verify",
                    "--quiet",
                    f"refs/heads/{arena_submission_branch}",
                ],
                stdout=subprocess.DEVNULL,
            )
            == 0
        )
        checkout_cmd = ["git", "checkout"]
        if not branch_exists:
            checkout_cmd.append("-b")
        subprocess.check_call(checkout_cmd + [arena_submission_branch])
        switched_branch = True

        data = {
            "github_repo_url": github_repo_url,
            "timestamp": datetime.utcnow().isoformat(),
            "commit_hash_to_benchmark": commit_hash_to_benchmark,
        }
        if branch:
            data["branch_to_benchmark"] = branch

        os.makedirs("arena", exist_ok=True)
        with open(f"arena/{agent_name}.json", "w") as json_file:
            json.dump(data, json_file, indent=4)

        subprocess.check_call(["git", "add", f"arena/{agent_name}.json"])
        subprocess.check_call(
            ["git", "commit", "-m", f"{agent_name} entering the arena"]
        )
        subprocess.check_call(["git", "push", "origin", arena_submission_branch])

        # Open the pull request against the repository this fork was made from.
        g = Github(github_access_token)
        repo = g.get_repo(_repo_full_name(github_repo_url))
        parent_repo = repo.parent
        if parent_repo:
            pr_body = "\n\n".join(
                [
                    "**Introduction:**",
                    "**Team Members:**",
                    "**What we are working on:**",
                    "Please replace this text with your own introduction, the names of your team members, and a brief description of your work.",
                ]
            )
            pr = parent_repo.create_pull(
                title=f"{agent_name} entering the arena",
                body=pr_body,
                # Cross-repository pull requests need the head namespaced
                # with the fork owner ("owner:branch").
                head=f"{repo.owner.login}:{arena_submission_branch}",
                base=branch_to_use,
            )
            _say(
                f"🚀 {agent_name} has entered the arena! Please edit your PR description at the following URL: {pr.html_url}",
                "green",
            )
        else:
            _say(
                "❌ This repository does not have a parent repository to sync with.",
                "red",
            )
    except Exception as e:  # top-level CLI boundary: report instead of a traceback
        _say(f"❌ An error occurred: {e}", "red")
    finally:
        # Leave the submission branch again on every path that entered it.
        if switched_branch:
            try:
                subprocess.check_call(["git", "checkout", branch_to_use])
            except (subprocess.CalledProcessError, OSError) as e:
                _say(f"❌ Could not switch back to '{branch_to_use}': {e}", "red")


if __name__ == "__main__":
    cli()