refactor(agent): Fix all trivial linting errors

* Fix all but one flake8 linting errors
  * Remove unused imports
  * Wrap strings that are too long
  * Add basic autogpts/autogpt/.flake8
* Delete planning_agent.py
* Delete default_prompts.py
* Delete _test_json_parser.py
* Refactor the example function call in AgentProfileGeneratorConfiguration from a string to an object
* Rewrite/update docstrings here and there while I'm at it
* Minor change to the description of the `open_file` command
* Use `user-agent` from config in web_selenium.py
* Delete hardcoded ABILITIES from core/planning/templates.py
* Delete duplicate and superseded test from test_image_gen.py
* Fix parameter definitions in mock_commands.py
* Delete code analysis blocks from test_spinner.py, test_url_validation.py
This commit is contained in:
Reinier van der Leer
2023-12-02 05:22:33 +01:00
parent 604217a957
commit d938c2595e
97 changed files with 873 additions and 1053 deletions

11
autogpts/autogpt/.flake8 Normal file
View File

@@ -0,0 +1,11 @@
[flake8]
max-line-length = 88
extend-exclude =
.*_cache/,
.venv,
data/,
logs/,
tests/unit/data/,
extend-ignore =
# No whitespace before ':' conflicts with Black style for slices
E203,

View File

@@ -98,7 +98,7 @@ for report_file in sorted(report_files):
test_metrics = test_data["metrics"]
result_indicator = ""
if not "attempted" in test_metrics:
if "attempted" not in test_metrics:
return
elif test_metrics["attempted"]:
if test_name not in test_names:
@@ -116,7 +116,7 @@ for report_file in sorted(report_files):
for test_name, suite in test_tree.items():
try:
process_test(test_name, suite)
except KeyError as e:
except KeyError:
print(f"{test_name}.metrics: {suite['metrics']}")
raise

View File

@@ -1,3 +1,4 @@
import json
import logging
from autogpt.config import AIDirectives, AIProfile, Config
@@ -23,14 +24,61 @@ class AgentProfileGeneratorConfiguration(SystemConfiguration):
model_classification: LanguageModelClassification = UserConfigurable(
default=LanguageModelClassification.SMART_MODEL
)
_example_call: object = [
{
"type": "function",
"function": {
"name": "create_agent",
"arguments": {
"name": "CMOGPT",
"description": (
"a professional digital marketer AI that assists Solopreneurs "
"in growing their businesses by providing "
"world-class expertise in solving marketing problems "
"for SaaS, content products, agencies, and more."
),
"directives": {
"best_practices": [
(
"Engage in effective problem-solving, prioritization, "
"planning, and supporting execution to address your "
"marketing needs as your virtual "
"Chief Marketing Officer."
),
(
"Provide specific, actionable, and concise advice to "
"help you make informed decisions without the use of "
"platitudes or overly wordy explanations."
),
(
"Identify and prioritize quick wins and cost-effective "
"campaigns that maximize results with minimal time and "
"budget investment."
),
(
"Proactively take the lead in guiding you and offering "
"suggestions when faced with unclear information or "
"uncertainty to ensure your marketing strategy remains "
"on track."
),
],
"constraints": [
"Do not suggest illegal or unethical plans or strategies.",
"Take reasonable budgetary limits into account.",
],
},
},
},
}
]
system_prompt: str = UserConfigurable(
default=(
"Your job is to respond to a user-defined task, given in triple quotes, by "
"invoking the `create_agent` function to generate an autonomous agent to "
"complete the task. "
"You should supply a role-based name for the agent (_GPT), "
"an informative description for what the agent does, and "
"1 to 5 directives in each of the categories Best Practices and Constraints, "
"an informative description for what the agent does, and 1 to 5 directives "
"in each of the categories Best Practices and Constraints, "
"that are optimally aligned with the successful completion "
"of its assigned task.\n"
"\n"
@@ -38,38 +86,8 @@ class AgentProfileGeneratorConfiguration(SystemConfiguration):
'"""Help me with marketing my business"""\n\n'
"Example Call:\n"
"```\n"
"[" # tool_calls
'{"type": "function", "function": {'
'"name": "create_agent",'
' "arguments": {'
'"name": "CMOGPT",'
' "description": "a professional digital marketer AI that assists Solopreneurs in'
" growing their businesses by providing world-class expertise in solving"
' marketing problems for SaaS, content products, agencies, and more.",'
' "directives": {'
' "best_practices": ['
'"Engage in effective problem-solving, prioritization, planning, and'
" supporting execution to address your marketing needs as your virtual Chief"
' Marketing Officer.",'
' "Provide specific, actionable, and concise advice to help you make'
" informed decisions without the use of platitudes or overly wordy"
' explanations.",'
' "Identify and prioritize quick wins and cost-effective campaigns that'
' maximize results with minimal time and budget investment.",'
' "Proactively take the lead in guiding you and offering suggestions when'
" faced with unclear information or uncertainty to ensure your marketing"
' strategy remains on track."'
"]," # best_practices
' "constraints": ['
'"Do not suggest illegal or unethical plans or strategies.",'
' "Take reasonable budgetary limits into account."'
"]" # constraints
"}" # directives
"}" # arguments
"}" # function
"}" # tool call
"]\n" # tool_calls
"```"
f"{json.dumps(_example_call, indent=4)}"
"\n```"
)
)
user_prompt_template: str = UserConfigurable(default='"""{user_objective}"""')
@@ -85,7 +103,10 @@ class AgentProfileGeneratorConfiguration(SystemConfiguration):
),
"description": JSONSchema(
type=JSONSchema.Type.STRING,
description="An informative one sentence description of what the AI agent does",
description=(
"An informative one sentence description "
"of what the AI agent does"
),
required=True,
),
"directives": JSONSchema(
@@ -99,8 +120,9 @@ class AgentProfileGeneratorConfiguration(SystemConfiguration):
type=JSONSchema.Type.STRING,
),
description=(
"One to five highly effective best practices that are"
" optimally aligned with the completion of the given task."
"One to five highly effective best practices "
"that are optimally aligned with the completion "
"of the given task"
),
required=True,
),
@@ -112,8 +134,9 @@ class AgentProfileGeneratorConfiguration(SystemConfiguration):
type=JSONSchema.Type.STRING,
),
description=(
"One to five highly effective constraints that are"
" optimally aligned with the completion of the given task."
"One to five reasonable and efficacious constraints "
"that are optimally aligned with the completion "
"of the given task"
),
required=True,
),

View File

@@ -232,7 +232,7 @@ class Agent(
)
# Intercept ContextItem if one is returned by the command
if type(return_value) == tuple and isinstance(
if type(return_value) is tuple and isinstance(
return_value[1], ContextItem
):
context_item = return_value[1]

View File

@@ -230,10 +230,7 @@ class BaseAgent(Configurable[BaseAgentSettings], ABC):
return self.config.send_token_limit or self.llm.max_tokens * 3 // 4
async def propose_action(self) -> ThoughtProcessOutput:
"""Runs the agent for one cycle.
Params:
instruction: The instruction to put at the end of the prompt.
"""Proposes the next action to execute, based on the task and current state.
Returns:
The command name and arguments, if any, and the agent's thoughts.
@@ -283,7 +280,7 @@ class BaseAgent(Configurable[BaseAgentSettings], ABC):
user_input: The user's input, if any.
Returns:
The results of the command.
ActionResult: An object representing the result(s) of the command.
"""
...
@@ -294,13 +291,13 @@ class BaseAgent(Configurable[BaseAgentSettings], ABC):
extra_messages: Optional[list[ChatMessage]] = None,
**extras,
) -> ChatPrompt:
"""Constructs and returns a prompt with the following structure:
1. System prompt
2. Message history of the agent, truncated & prepended with running summary as needed
3. `cycle_instruction`
"""Constructs a prompt using `self.prompt_strategy`.
Params:
cycle_instruction: The final instruction for a thinking cycle
scratchpad: An object for plugins to write additional prompt elements to.
(E.g. commands, constraints, best practices)
extra_commands: Additional commands that the agent has access to.
extra_messages: Additional messages to include in the prompt.
"""
if not extra_commands:
extra_commands = []
@@ -349,7 +346,9 @@ class BaseAgent(Configurable[BaseAgentSettings], ABC):
output to the prompt.
Params:
instruction: The instruction for the current cycle, also used in constructing the prompt
prompt: The prompt that is about to be executed.
scratchpad: An object for plugins to write additional prompt elements to.
(E.g. commands, constraints, best practices)
Returns:
The prompt to execute
@@ -386,13 +385,13 @@ class BaseAgent(Configurable[BaseAgentSettings], ABC):
) -> ThoughtProcessOutput:
"""Called upon receiving a response from the chat model.
Adds the last/newest message in the prompt and the response to `history`,
and calls `self.parse_and_process_response()` to do the rest.
Calls `self.parse_and_process_response()`.
Params:
llm_response: The raw response from the chat model
prompt: The prompt that was executed
instruction: The instruction for the current cycle, also used in constructing the prompt
llm_response: The raw response from the chat model.
prompt: The prompt that was executed.
scratchpad: An object containing additional prompt elements from plugins.
(E.g. commands, constraints, best practices)
Returns:
The parsed command name and command args, if any, and the agent thoughts.
@@ -419,9 +418,10 @@ class BaseAgent(Configurable[BaseAgentSettings], ABC):
since the implementation depends on the role of the derivative Agent.
Params:
llm_response: The raw response from the chat model
prompt: The prompt that was executed
instruction: The instruction for the current cycle, also used in constructing the prompt
llm_response: The raw response from the chat model.
prompt: The prompt that was executed.
scratchpad: An object containing additional prompt elements from plugins.
(E.g. commands, constraints, best practices)
Returns:
The parsed command name and command args, if any, and the agent thoughts.

View File

@@ -61,10 +61,10 @@ class ContextMixin:
0,
ChatMessage.system(
"## Context\n"
+ self.context.format_numbered()
+ "\n\nWhen a context item is no longer needed and you are not done yet,"
" you can hide the item by specifying its number in the list above"
" to `hide_context_item`.",
f"{self.context.format_numbered()}\n\n"
"When a context item is no longer needed and you are not done yet, "
"you can hide the item by specifying its number in the list above "
"to `hide_context_item`.",
),
)

View File

@@ -1,340 +0,0 @@
from __future__ import annotations
import logging
import re
from datetime import datetime
from typing import TYPE_CHECKING, Literal, Optional
if TYPE_CHECKING:
from autogpt.config import Config
from autogpt.llm.base import ChatModelResponse, ChatSequence
from autogpt.memory.vector import VectorMemory
from autogpt.models.command_registry import CommandRegistry
from autogpt.agents.utils.exceptions import AgentException, InvalidAgentResponseError
from autogpt.json_utils.utilities import extract_dict_from_response, validate_dict
from autogpt.llm.base import Message
from autogpt.llm.utils import count_string_tokens
from autogpt.logs.log_cycle import (
CURRENT_CONTEXT_FILE_NAME,
NEXT_ACTION_FILE_NAME,
USER_INPUT_FILE_NAME,
LogCycleHandler,
)
from autogpt.models.action_history import (
ActionErrorResult,
ActionInterruptedByHuman,
ActionResult,
ActionSuccessResult,
)
from autogpt.models.context_item import ContextItem
from .agent import execute_command, extract_command
from .base import BaseAgent
from .features.context import ContextMixin
from .features.file_workspace import FileWorkspaceMixin
logger = logging.getLogger(__name__)
class PlanningAgent(ContextMixin, FileWorkspaceMixin, BaseAgent):
"""Agent class for interacting with AutoGPT."""
ThoughtProcessID = Literal["plan", "action", "evaluate"]
def __init__(
self,
command_registry: CommandRegistry,
memory: VectorMemory,
triggering_prompt: str,
config: Config,
cycle_budget: Optional[int] = None,
):
super().__init__(
command_registry=command_registry,
config=config,
default_cycle_instruction=triggering_prompt,
cycle_budget=cycle_budget,
)
self.memory = memory
"""VectorMemoryProvider used to manage the agent's context (TODO)"""
self.created_at = datetime.now().strftime("%Y%m%d_%H%M%S")
"""Timestamp the agent was created; only used for structured debug logging."""
self.log_cycle_handler = LogCycleHandler()
"""LogCycleHandler for structured debug logging."""
self.plan: list[str] = []
"""List of steps that the Agent plans to take"""
def construct_base_prompt(
self, thought_process_id: ThoughtProcessID, **kwargs
) -> ChatSequence:
prepend_messages = kwargs["prepend_messages"] = kwargs.get(
"prepend_messages", []
)
# Add the current plan to the prompt, if any
if self.plan:
plan_section = [
"## Plan",
"To complete your task, you have composed the following plan:",
]
plan_section += [f"{i}. {s}" for i, s in enumerate(self.plan, 1)]
# Add the actions so far to the prompt
if self.event_history:
plan_section += [
"\n### Progress",
"So far, you have executed the following actions based on the plan:",
]
for i, cycle in enumerate(self.event_history, 1):
if not (cycle.action and cycle.result):
logger.warn(f"Incomplete action in history: {cycle}")
continue
plan_section.append(
f"{i}. You executed the command `{cycle.action.format_call()}`, "
f"which gave the result `{cycle.result}`."
)
prepend_messages.append(Message("system", "\n".join(plan_section)))
if self.context:
context_section = [
"## Context",
"Below is information that may be relevant to your task. These take up "
"part of your working memory, which is limited, so when a context item is "
"no longer relevant for your plan, use the `close_context_item` command to "
"free up some memory."
"\n",
self.context.format_numbered(),
]
prepend_messages.append(Message("system", "\n".join(context_section)))
match thought_process_id:
case "plan":
# TODO: add planning instructions; details about what to pay attention to when planning
pass
case "action":
# TODO: need to insert the functions here again?
pass
case "evaluate":
# TODO: insert latest action (with reasoning) + result + evaluation instructions
pass
case _:
raise NotImplementedError(
f"Unknown thought process '{thought_process_id}'"
)
return super().construct_base_prompt(
thought_process_id=thought_process_id, **kwargs
)
def response_format_instruction(self, thought_process_id: ThoughtProcessID) -> str:
match thought_process_id:
case "plan":
# TODO: add planning instructions; details about what to pay attention to when planning
response_format = f"""```ts
interface Response {{
thoughts: {{
// Thoughts
text: string;
// A short logical explanation about how the action is part of the earlier composed plan
reasoning: string;
// Constructive self-criticism
criticism: string;
}};
// A plan to achieve the goals with the available resources and/or commands.
plan: Array<{{
// An actionable subtask
subtask: string;
// Criterium to determine whether the subtask has been completed
completed_if: string;
}}>;
}}
```"""
pass
case "action":
# TODO: need to insert the functions here again?
response_format = """```ts
interface Response {
thoughts: {
// Thoughts
text: string;
// A short logical explanation about how the action is part of the earlier composed plan
reasoning: string;
// Constructive self-criticism
criticism: string;
};
// The action to take, from the earlier specified list of commands
command: {
name: string;
args: Record<string, any>;
};
}
```"""
pass
case "evaluate":
# TODO: insert latest action (with reasoning) + result + evaluation instructions
response_format = f"""```ts
interface Response {{
thoughts: {{
// Thoughts
text: string;
reasoning: string;
// Constructive self-criticism
criticism: string;
}};
result_evaluation: {{
// A short logical explanation of why the given partial result does or does not complete the corresponding subtask
reasoning: string;
// Whether the current subtask has been completed
completed: boolean;
// An estimate of the progress (0.0 - 1.0) that has been made on the subtask with the actions that have been taken so far
progress: float;
}};
}}
```"""
pass
case _:
raise NotImplementedError(
f"Unknown thought process '{thought_process_id}'"
)
response_format = re.sub(
r"\n\s+",
"\n",
response_format,
)
return (
f"Respond strictly with JSON. The JSON should be compatible with "
"the TypeScript type `Response` from the following:\n"
f"{response_format}\n"
)
def on_before_think(self, *args, **kwargs) -> ChatSequence:
prompt = super().on_before_think(*args, **kwargs)
self.log_cycle_handler.log_count_within_cycle = 0
self.log_cycle_handler.log_cycle(
self.ai_profile.ai_name,
self.created_at,
self.cycle_count,
self.event_history.episodes,
"event_history.json",
)
self.log_cycle_handler.log_cycle(
self.ai_profile.ai_name,
self.created_at,
self.cycle_count,
prompt.raw(),
CURRENT_CONTEXT_FILE_NAME,
)
return prompt
def execute(
self,
command_name: str,
command_args: dict[str, str] = {},
user_input: str = "",
) -> ActionResult:
result: ActionResult
if command_name == "human_feedback":
result = ActionInterruptedByHuman(feedback=user_input)
self.log_cycle_handler.log_cycle(
self.ai_profile.ai_name,
self.created_at,
self.cycle_count,
user_input,
USER_INPUT_FILE_NAME,
)
else:
for plugin in self.config.plugins:
if not plugin.can_handle_pre_command():
continue
command_name, arguments = plugin.pre_command(command_name, command_args)
try:
return_value = execute_command(
command_name=command_name,
arguments=command_args,
agent=self,
)
# Intercept ContextItem if one is returned by the command
if type(return_value) == tuple and isinstance(
return_value[1], ContextItem
):
self.context.add(return_value[1])
return_value = return_value[0]
result = ActionSuccessResult(outputs=return_value)
except AgentException as e:
result = ActionErrorResult.from_exception(e)
result_tlength = count_string_tokens(str(result), self.llm.name)
memory_tlength = count_string_tokens(
str(self.event_history.fmt_paragraph()), self.llm.name
)
if result_tlength + memory_tlength > self.send_token_limit:
result = ActionErrorResult(
reason=f"Command {command_name} returned too much output. "
"Do not execute this command again with the same arguments."
)
for plugin in self.config.plugins:
if not plugin.can_handle_post_command():
continue
if result.status == "success":
result.outputs = plugin.post_command(command_name, result.outputs)
elif result.status == "error":
result.reason = plugin.post_command(command_name, result.reason)
return result
def parse_and_process_response(
self,
llm_response: ChatModelResponse,
thought_process_id: ThoughtProcessID,
*args,
**kwargs,
) -> PlanningAgent.ThoughtProcessOutput:
if not llm_response.content:
raise InvalidAgentResponseError("Assistant response has no text content")
response_content = llm_response.content
for plugin in self.config.plugins:
if not plugin.can_handle_post_planning():
continue
response_content = plugin.post_planning(response_content)
assistant_reply_dict = extract_dict_from_response(response_content)
_, errors = validate_dict(assistant_reply_dict, self.config)
if errors:
raise InvalidAgentResponseError(
"Validation of response failed:\n "
+ ";\n ".join([str(e) for e in errors])
)
# Get command name and arguments
command_name, arguments = extract_command(
assistant_reply_dict, llm_response, self.config
)
response = command_name, arguments, assistant_reply_dict
self.log_cycle_handler.log_cycle(
self.ai_profile.ai_name,
self.created_at,
self.cycle_count,
assistant_reply_dict,
NEXT_ACTION_FILE_NAME,
)
return response

View File

@@ -7,7 +7,6 @@ from logging import Logger
from typing import TYPE_CHECKING, Callable, Optional
import distro
from pydantic import Field
if TYPE_CHECKING:
from autogpt.agents.agent import Agent
@@ -85,7 +84,10 @@ class OneShotAgentPromptConfiguration(SystemConfiguration):
required=True,
),
"plan": JSONSchema(
description="Short markdown-style bullet list that conveys the long-term plan",
description=(
"Short markdown-style bullet list that conveys the "
"long-term plan"
),
type=JSONSchema.Type.STRING,
required=True,
),
@@ -165,11 +167,9 @@ class OneShotAgentPromptStrategy(PromptStrategy):
) -> ChatPrompt:
"""Constructs and returns a prompt with the following structure:
1. System prompt
2. Message history of the agent, truncated & prepended with running summary as needed
2. Message history of the agent, truncated & prepended with running summary
as needed
3. `cycle_instruction`
Params:
cycle_instruction: The final instruction for a thinking cycle
"""
if not extra_messages:
extra_messages = []
@@ -264,7 +264,7 @@ class OneShotAgentPromptStrategy(PromptStrategy):
steps: list[str] = []
tokens: int = 0
start: int = len(episode_history)
# start: int = len(episode_history)
for i, c in reversed(list(enumerate(episode_history))):
step = f"### Step {i+1}: Executed `{c.action.format_call()}`\n"
@@ -291,11 +291,10 @@ class OneShotAgentPromptStrategy(PromptStrategy):
tokens += step_tokens
steps.insert(0, step)
start = i
# start = i
# TODO: summarize remaining
part = slice(0, start)
# # TODO: summarize remaining
# part = slice(0, start)
return "\n\n".join(steps)
@@ -315,10 +314,19 @@ class OneShotAgentPromptStrategy(PromptStrategy):
response_schema.to_typescript_object_interface("Response"),
)
instruction = (
(
"Respond strictly with a JSON object containing your thoughts, "
"and a tool_call specifying the next command to use."
)
if use_functions_api
else "Respond strictly with a JSON object."
)
return (
f"Respond strictly with a JSON object{' containing your thoughts, and a tool_call specifying the next command to use' if use_functions_api else ''}. "
"The JSON object should be compatible with the TypeScript type `Response` from the following:\n"
f"{response_format}"
f"{instruction} "
"The JSON object should be compatible with the TypeScript type `Response` "
f"from the following:\n{response_format}"
)
def _generate_intro_prompt(self, ai_profile: AIProfile) -> list[str]:

View File

@@ -52,7 +52,7 @@ class PromptScratchpad(BaseModel):
"""
for p, s in params.items():
invalid = False
if type(s) == str and s not in JSONSchema.Type._value2member_map_:
if type(s) is str and s not in JSONSchema.Type._value2member_map_:
invalid = True
logger.warning(
f"Cannot add command '{name}':"
@@ -72,7 +72,7 @@ class PromptScratchpad(BaseModel):
description=description,
parameters={
name: JSONSchema(type=JSONSchema.Type._value2member_map_[spec])
if type(spec) == str
if type(spec) is str
else JSONSchema.from_dict(spec)
for name, spec in params.items()
},

View File

@@ -58,7 +58,8 @@ class AgentProtocolServer:
config.bind = [f"localhost:{port}"]
app = FastAPI(
title="AutoGPT Server",
description="Forked from AutoGPT Forge; Modified version of The Agent Protocol.",
description="Forked from AutoGPT Forge; "
"Modified version of The Agent Protocol.",
version="v0.4",
)
@@ -93,7 +94,8 @@ class AgentProtocolServer:
else:
logger.warning(
f"Frontend not found. {frontend_path} does not exist. The frontend will not be available."
f"Frontend not found. {frontend_path} does not exist. "
"The frontend will not be available."
)
# Used to access the methods on this class from API route handlers
@@ -258,7 +260,7 @@ class AgentProtocolServer:
# Format step output
output = (
(
f"Command `{execute_command}({fmt_kwargs(execute_command_args)})` returned:"
f"`{execute_command}({fmt_kwargs(execute_command_args)})` returned:"
+ ("\n\n" if "\n" in str(execute_result) else " ")
+ f"{execute_result}\n\n"
)
@@ -361,9 +363,9 @@ class AgentProtocolServer:
file_path = artifact.relative_path
workspace = get_task_agent_file_workspace(task_id, self.agent_manager)
retrieved_artifact = workspace.read_file(file_path, binary=True)
except NotFoundError as e:
except NotFoundError:
raise
except FileNotFoundError as e:
except FileNotFoundError:
raise
return StreamingResponse(

View File

@@ -39,7 +39,7 @@ def cli(ctx: click.Context):
)
@click.option(
# TODO: this is a hidden option for now, necessary for integration testing.
# We should make this public once we're ready to roll out agent specific workspaces.
# We should make this public once we're ready to roll out agent specific workspaces.
"--workspace-directory",
"-w",
type=click.Path(file_okay=False),

View File

@@ -41,19 +41,23 @@ def apply_overrides_to_config(
"""Updates the config object with the given arguments.
Args:
continuous (bool): Whether to run in continuous mode
continuous_limit (int): The number of times to run in continuous mode
ai_settings_file (Path): The path to the ai_settings.yaml file
prompt_settings_file (Path): The path to the prompt_settings.yaml file
skip_reprompt (bool): Whether to skip the re-prompting messages at the beginning of the script
speak (bool): Whether to enable speak mode
debug (bool): Whether to enable debug mode
gpt3only (bool): Whether to enable GPT3.5 only mode
gpt4only (bool): Whether to enable GPT4 only mode
memory_type (str): The type of memory backend to use
browser_name (str): The name of the browser to use when using selenium to scrape the web
allow_downloads (bool): Whether to allow AutoGPT to download files natively
skips_news (bool): Whether to suppress the output of latest news on startup
config (Config): The config object to update.
continuous (bool): Whether to run in continuous mode.
continuous_limit (int): The number of times to run in continuous mode.
ai_settings_file (Path): The path to the ai_settings.yaml file.
prompt_settings_file (Path): The path to the prompt_settings.yaml file.
skip_reprompt (bool): Whether to skip the re-prompting messages on start.
speak (bool): Whether to enable speak mode.
debug (bool): Whether to enable debug mode.
log_level (int): The global log level for the application.
log_format (str): The format for the log(s).
log_file_format (str): Override the format for the log file.
gpt3only (bool): Whether to enable GPT3.5 only mode.
gpt4only (bool): Whether to enable GPT4 only mode.
memory_type (str): The type of memory backend to use.
browser_name (str): The name of the browser to use for scraping the web.
allow_downloads (bool): Whether to allow AutoGPT to download files natively.
skips_news (bool): Whether to suppress the output of latest news on startup.
"""
config.continuous_mode = False
config.tts_config.speak_mode = False
@@ -164,11 +168,15 @@ def apply_overrides_to_config(
if allow_downloads:
print_attribute("Native Downloading", "ENABLED")
logger.warn(
msg=f"{Back.LIGHTYELLOW_EX}AutoGPT will now be able to download and save files to your machine.{Back.RESET}"
msg=f"{Back.LIGHTYELLOW_EX}"
"AutoGPT will now be able to download and save files to your machine."
f"{Back.RESET}"
" It is recommended that you monitor any files it downloads carefully.",
)
logger.warn(
msg=f"{Back.RED + Style.BRIGHT}ALWAYS REMEMBER TO NEVER OPEN FILES YOU AREN'T SURE OF!{Style.RESET_ALL}",
msg=f"{Back.RED + Style.BRIGHT}"
"NEVER OPEN FILES YOU AREN'T SURE OF!"
f"{Style.RESET_ALL}",
)
config.allow_downloads = True
@@ -190,6 +198,6 @@ def check_model(
return model_name
logger.warn(
f"You do not have access to {model_name}. Setting {model_type} to gpt-3.5-turbo."
f"You don't have access to {model_name}. Setting {model_type} to gpt-3.5-turbo."
)
return "gpt-3.5-turbo"

View File

@@ -1,4 +1,6 @@
"""The application entry point. Can be invoked by a CLI or any other front end application."""
"""
The application entry point. Can be invoked by a CLI or any other front end application.
"""
import enum
import logging
import math
@@ -143,7 +145,8 @@ async def run_auto_gpt(
)
load_existing_agent = await clean_input(
config,
"Enter the number or name of the agent to run, or hit enter to create a new one:",
"Enter the number or name of the agent to run,"
" or hit enter to create a new one:",
)
if re.match(r"^\d+$", load_existing_agent):
load_existing_agent = existing_agents[int(load_existing_agent) - 1]
@@ -263,8 +266,9 @@ async def run_auto_gpt(
if not agent.config.allow_fs_access:
logger.info(
f"{Fore.YELLOW}NOTE: All files/directories created by this agent"
f" can be found inside its workspace at:{Fore.RESET} {agent.workspace.root}",
f"{Fore.YELLOW}"
"NOTE: All files/directories created by this agent can be found "
f"inside its workspace at:{Fore.RESET} {agent.workspace.root}",
extra={"preserve_color": True},
)
@@ -281,7 +285,8 @@ async def run_auto_gpt(
save_as_id = (
await clean_input(
config,
f"Press enter to save as '{agent_id}', or enter a different ID to save to:",
f"Press enter to save as '{agent_id}',"
" or enter a different ID to save to:",
)
or agent_id
)
@@ -574,7 +579,8 @@ async def run_interaction_loop(
)
elif result.status == "error":
logger.warn(
f"Command {command_name} returned an error: {result.error or result.reason}"
f"Command {command_name} returned an error: "
f"{result.error or result.reason}"
)

View File

@@ -94,7 +94,8 @@ async def interactively_revise_ai_settings(
new_constraint = (
await clean_input(
app_config,
f"Enter new constraint {i+1} (press enter to keep current, or '-' to remove):",
f"Enter new constraint {i+1}"
" (press enter to keep current, or '-' to remove):",
)
or constraint
)
@@ -119,7 +120,8 @@ async def interactively_revise_ai_settings(
new_resource = (
await clean_input(
app_config,
f"Enter new resource {i+1} (press enter to keep current, or '-' to remove):",
f"Enter new resource {i+1}"
" (press enter to keep current, or '-' to remove):",
)
or resource
)
@@ -144,7 +146,8 @@ async def interactively_revise_ai_settings(
new_best_practice = (
await clean_input(
app_config,
f"Enter new best practice {i+1} (press enter to keep current, or '-' to remove):",
f"Enter new best practice {i+1}"
" (press enter to keep current, or '-' to remove):",
)
or best_practice
)

View File

@@ -5,7 +5,7 @@ import sys
import requests
from colorama import Fore, Style
from git.repo import Repo
from git import InvalidGitRepositoryError, Repo
from prompt_toolkit import ANSI, PromptSession
from prompt_toolkit.history import InMemoryHistory
@@ -62,7 +62,7 @@ async def clean_input(config: Config, prompt: str = ""):
def get_bulletin_from_web():
try:
response = requests.get(
"https://raw.githubusercontent.com/Significant-Gravitas/AutoGPT/master/autogpts/autogpt/BULLETIN.md"
"https://raw.githubusercontent.com/Significant-Gravitas/AutoGPT/master/autogpts/autogpt/BULLETIN.md" # noqa: E501
)
if response.status_code == 200:
return response.text
@@ -77,7 +77,7 @@ def get_current_git_branch() -> str:
repo = Repo(search_parent_directories=True)
branch = repo.active_branch
return branch.name
except:
except InvalidGitRepositoryError:
return ""
@@ -94,7 +94,7 @@ def get_latest_bulletin() -> tuple[str, bool]:
news_header = Fore.YELLOW + "Welcome to AutoGPT!\n"
if new_bulletin or current_bulletin:
news_header += (
"Below you'll find the latest AutoGPT News and updates regarding features!\n"
"Below you'll find the latest AutoGPT News and feature updates!\n"
"If you don't wish to see this message, you "
"can run AutoGPT with the *--skip-news* flag.\n"
)
@@ -145,7 +145,7 @@ behalf. You acknowledge that using the System could expose you to potential liab
## Indemnification
By using the System, you agree to indemnify, defend, and hold harmless the Project Parties from and against any and all claims, liabilities, damages, losses, or expenses (including reasonable attorneys' fees and costs) arising out of or in connection with your use of the System, including, without limitation, any actions taken by the System on your behalf, any failure to properly supervise or monitor the System, and any resulting harm or unintended consequences.
"""
""" # noqa: E501
return legal_text

View File

@@ -27,7 +27,9 @@ def command(
aliases: list[str] = [],
available: Literal[True] | Callable[[BaseAgent], bool] = True,
) -> Callable[[Callable[P, CO]], Callable[P, CO]]:
"""The command decorator is used to create Command objects from ordinary functions."""
"""
The command decorator is used to create Command objects from ordinary functions.
"""
def decorator(func: Callable[P, CO]) -> Callable[P, CO]:
typed_parameters = [

View File

@@ -23,7 +23,8 @@ def sanitize_path_arg(
arg_index = list(func.__annotations__.keys()).index(arg_name)
except ValueError:
raise TypeError(
f"Sanitized parameter '{arg_name}' absent or not annotated on function '{func.__name__}'"
f"Sanitized parameter '{arg_name}' absent or not annotated"
f" on function '{func.__name__}'"
)
# Get position of agent parameter, in case it is passed as a positional argument
@@ -31,7 +32,8 @@ def sanitize_path_arg(
agent_arg_index = list(func.__annotations__.keys()).index("agent")
except ValueError:
raise TypeError(
f"Parameter 'agent' absent or not annotated on function '{func.__name__}'"
f"Parameter 'agent' absent or not annotated"
f" on function '{func.__name__}'"
)
@functools.wraps(func)

View File

@@ -1,8 +1,5 @@
"""Commands to execute code"""
COMMAND_CATEGORY = "execute_code"
COMMAND_CATEGORY_TITLE = "Execute Code"
import logging
import os
import subprocess
@@ -26,6 +23,10 @@ from autogpt.core.utils.json_schema import JSONSchema
from .decorators import sanitize_path_arg
COMMAND_CATEGORY = "execute_code"
COMMAND_CATEGORY_TITLE = "Execute Code"
logger = logging.getLogger(__name__)
ALLOWLIST_CONTROL = "allowlist"
@@ -45,15 +46,18 @@ DENYLIST_CONTROL = "denylist"
},
)
def execute_python_code(code: str, agent: Agent) -> str:
"""Create and execute a Python file in a Docker container and return the STDOUT of the
executed code. If there is any data that needs to be captured use a print statement
"""
Create and execute a Python file in a Docker container and return the STDOUT of the
executed code.
If the code generates any data that needs to be captured, use a print statement.
Args:
code (str): The Python code to run
name (str): A name to be given to the Python file
code (str): The Python code to run.
agent (Agent): The Agent executing the command.
Returns:
str: The STDOUT captured from the code when it ran
str: The STDOUT captured from the code when it ran.
"""
tmp_code_file = NamedTemporaryFile(
@@ -63,7 +67,7 @@ def execute_python_code(code: str, agent: Agent) -> str:
tmp_code_file.flush()
try:
return execute_python_file(tmp_code_file.name, agent)
return execute_python_file(tmp_code_file.name, agent) # type: ignore
except Exception as e:
raise CommandExecutionError(*e.args)
finally:
@@ -102,7 +106,8 @@ def execute_python_file(
str: The output of the file
"""
logger.info(
f"Executing python file '{filename}' in working directory '{agent.workspace.root}'"
f"Executing python file '{filename}' "
f"in working directory '{agent.workspace.root}'"
)
if isinstance(args, str):
@@ -113,14 +118,16 @@ def execute_python_file(
file_path = filename
if not file_path.is_file():
# Mimic the response that you get from the command line so that it's easier to identify
# Mimic the response that you get from the command line to make it
# intuitively understandable for the LLM
raise FileNotFoundError(
f"python: can't open file '{filename}': [Errno 2] No such file or directory"
)
if we_are_running_in_a_docker_container():
logger.debug(
f"AutoGPT is running in a Docker container; executing {file_path} directly..."
"AutoGPT is running in a Docker container; "
f"executing {file_path} directly..."
)
result = subprocess.run(
["python", "-B", str(file_path)] + args,
@@ -145,14 +152,17 @@ def execute_python_file(
container_is_fresh = False
container_name = f"{agent.state.agent_id}_sandbox"
try:
container: DockerContainer = client.containers.get(container_name) # type: ignore
container: DockerContainer = client.containers.get(
container_name
) # type: ignore
except NotFound:
try:
client.images.get(image_name)
logger.debug(f"Image '{image_name}' found locally")
except ImageNotFound:
logger.info(
f"Image '{image_name}' not found locally, pulling from Docker Hub..."
f"Image '{image_name}' not found locally,"
" pulling from Docker Hub..."
)
# Use the low-level API to stream the pull response
low_level_client = docker.APIClient()
@@ -207,7 +217,9 @@ def execute_python_file(
except DockerException as e:
logger.warn(
"Could not run the script in a container. If you haven't already, please install Docker https://docs.docker.com/get-docker/"
"Could not run the script in a container. "
"If you haven't already, please install Docker: "
"https://docs.docker.com/get-docker/"
)
raise CommandExecutionError(f"Could not run the script in a container: {e}")

View File

@@ -2,16 +2,10 @@
from __future__ import annotations
COMMAND_CATEGORY = "file_operations"
COMMAND_CATEGORY_TITLE = "File Operations"
import contextlib
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from autogpt.agents import Agent, BaseAgent
from autogpt.agents.features.context import ContextMixin, get_agent_context
from autogpt.agents.utils.exceptions import (
CommandExecutionError,
@@ -23,6 +17,13 @@ from autogpt.models.context_item import FileContextItem, FolderContextItem
from .decorators import sanitize_path_arg
COMMAND_CATEGORY = "file_operations"
COMMAND_CATEGORY_TITLE = "File Operations"
if TYPE_CHECKING:
from autogpt.agents import Agent, BaseAgent
def agent_implements_context(agent: BaseAgent) -> bool:
return isinstance(agent, ContextMixin)
@@ -30,8 +31,9 @@ def agent_implements_context(agent: BaseAgent) -> bool:
@command(
"open_file",
"Open a file for editing or continued viewing; create it if it does not exist yet."
" Note: if you only need to read or write a file once, use `write_to_file` instead.",
"Opens a file for editing or continued viewing;"
" creates it if it does not exist yet. "
"Note: If you only need to read or write a file once, use `write_to_file` instead.",
{
"file_path": JSONSchema(
type=JSONSchema.Type.STRING,
@@ -76,7 +78,8 @@ def open_file(file_path: Path, agent: Agent) -> tuple[str, FileContextItem]:
raise DuplicateOperationError(f"The file {file_path} is already open")
return (
f"File {file_path}{' created,' if created else ''} has been opened and added to the context ✅",
f"File {file_path}{' created,' if created else ''} has been opened"
" and added to the context ✅",
file,
)

View File

@@ -2,9 +2,6 @@
from __future__ import annotations
COMMAND_CATEGORY = "file_operations"
COMMAND_CATEGORY_TITLE = "File Operations"
import contextlib
import hashlib
import logging
@@ -20,9 +17,14 @@ from autogpt.core.utils.json_schema import JSONSchema
from autogpt.memory.vector import MemoryItem, VectorMemory
from .decorators import sanitize_path_arg
from .file_context import open_file, open_folder # NOQA
from .file_operations_utils import read_textual_file
COMMAND_CATEGORY = "file_operations"
COMMAND_CATEGORY_TITLE = "File Operations"
from .file_context import open_file, open_folder # NOQA
logger = logging.getLogger(__name__)
Operation = Literal["write", "append", "delete"]

View File

@@ -1,8 +1,5 @@
"""Commands to perform Git operations"""
COMMAND_CATEGORY = "git_operations"
COMMAND_CATEGORY_TITLE = "Git Operations"
from pathlib import Path
from git.repo import Repo
@@ -15,6 +12,9 @@ from autogpt.url_utils.validators import validate_url
from .decorators import sanitize_path_arg
COMMAND_CATEGORY = "git_operations"
COMMAND_CATEGORY_TITLE = "Git Operations"
@command(
"clone_repository",
@@ -47,7 +47,7 @@ def clone_repository(url: str, clone_path: Path, agent: Agent) -> str:
str: The result of the clone operation.
"""
split_url = url.split("//")
auth_repo_url = f"//{agent.legacy_config.github_username}:{agent.legacy_config.github_api_key}@".join(
auth_repo_url = f"//{agent.legacy_config.github_username}:{agent.legacy_config.github_api_key}@".join( # noqa: E501
split_url
)
try:

View File

@@ -1,14 +1,12 @@
"""Commands to generate images based on text input"""
COMMAND_CATEGORY = "text_to_image"
COMMAND_CATEGORY_TITLE = "Text to Image"
import io
import json
import logging
import time
import uuid
from base64 import b64decode
from pathlib import Path
import openai
import requests
@@ -18,6 +16,10 @@ from autogpt.agents.agent import Agent
from autogpt.command_decorator import command
from autogpt.core.utils.json_schema import JSONSchema
COMMAND_CATEGORY = "text_to_image"
COMMAND_CATEGORY_TITLE = "Text to Image"
logger = logging.getLogger(__name__)
@@ -39,7 +41,8 @@ def generate_image(prompt: str, agent: Agent, size: int = 256) -> str:
Args:
prompt (str): The prompt to use
size (int, optional): The size of the image. Defaults to 256. (Not supported by HuggingFace)
size (int, optional): The size of the image. Defaults to 256.
Not supported by HuggingFace.
Returns:
str: The filename of the image
@@ -58,17 +61,17 @@ def generate_image(prompt: str, agent: Agent, size: int = 256) -> str:
return "No Image Provider Set"
def generate_image_with_hf(prompt: str, filename: str, agent: Agent) -> str:
def generate_image_with_hf(prompt: str, output_file: Path, agent: Agent) -> str:
"""Generate an image with HuggingFace's API.
Args:
prompt (str): The prompt to use
filename (str): The filename to save the image to
filename (Path): The filename to save the image to
Returns:
str: The filename of the image
"""
API_URL = f"https://api-inference.huggingface.co/models/{agent.legacy_config.huggingface_image_model}"
API_URL = f"https://api-inference.huggingface.co/models/{agent.legacy_config.huggingface_image_model}" # noqa: E501
if agent.legacy_config.huggingface_api_token is None:
raise ValueError(
"You need to set your Hugging Face API token in the config file."
@@ -92,8 +95,8 @@ def generate_image_with_hf(prompt: str, filename: str, agent: Agent) -> str:
try:
image = Image.open(io.BytesIO(response.content))
logger.info(f"Image Generated for prompt:{prompt}")
image.save(filename)
return f"Saved to disk:{filename}"
image.save(output_file)
return f"Saved to disk: {output_file}"
except Exception as e:
logger.error(e)
break
@@ -113,17 +116,17 @@ def generate_image_with_hf(prompt: str, filename: str, agent: Agent) -> str:
retry_count += 1
return f"Error creating image."
return "Error creating image."
def generate_image_with_dalle(
prompt: str, filename: str, size: int, agent: Agent
prompt: str, output_file: Path, size: int, agent: Agent
) -> str:
"""Generate an image with DALL-E.
Args:
prompt (str): The prompt to use
filename (str): The filename to save the image to
filename (Path): The filename to save the image to
size (int): The size of the image
Returns:
@@ -134,7 +137,8 @@ def generate_image_with_dalle(
if size not in [256, 512, 1024]:
closest = min([256, 512, 1024], key=lambda x: abs(x - size))
logger.info(
f"DALL-E only supports image sizes of 256x256, 512x512, or 1024x1024. Setting to {closest}, was {size}."
"DALL-E only supports image sizes of 256x256, 512x512, or 1024x1024. "
f"Setting to {closest}, was {size}."
)
size = closest
@@ -150,15 +154,15 @@ def generate_image_with_dalle(
image_data = b64decode(response["data"][0]["b64_json"])
with open(filename, mode="wb") as png:
with open(output_file, mode="wb") as png:
png.write(image_data)
return f"Saved to disk:{filename}"
return f"Saved to disk: {output_file}"
def generate_image_with_sd_webui(
prompt: str,
filename: str,
output_file: Path,
agent: Agent,
size: int = 512,
negative_prompt: str = "",
@@ -196,12 +200,12 @@ def generate_image_with_sd_webui(
},
)
logger.info(f"Image Generated for prompt:{prompt}")
logger.info(f"Image Generated for prompt: '{prompt}'")
# Save the image to disk
response = response.json()
b64 = b64decode(response["images"][0].split(",", 1)[0])
image = Image.open(io.BytesIO(b64))
image.save(filename)
image.save(output_file)
return f"Saved to disk:{filename}"
return f"Saved to disk: {output_file}"

View File

@@ -2,20 +2,22 @@
from __future__ import annotations
COMMAND_CATEGORY = "system"
COMMAND_CATEGORY_TITLE = "System"
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from autogpt.agents.agent import Agent
from autogpt.agents.features.context import get_agent_context
from autogpt.agents.utils.exceptions import AgentTerminated, InvalidArgumentError
from autogpt.command_decorator import command
from autogpt.core.utils.json_schema import JSONSchema
COMMAND_CATEGORY = "system"
COMMAND_CATEGORY_TITLE = "System"
if TYPE_CHECKING:
from autogpt.agents.agent import Agent
logger = logging.getLogger(__name__)

View File

@@ -2,14 +2,14 @@
from __future__ import annotations
COMMAND_CATEGORY = "user_interaction"
COMMAND_CATEGORY_TITLE = "User Interaction"
from autogpt.agents.agent import Agent
from autogpt.app.utils import clean_input
from autogpt.command_decorator import command
from autogpt.core.utils.json_schema import JSONSchema
COMMAND_CATEGORY = "user_interaction"
COMMAND_CATEGORY_TITLE = "User Interaction"
@command(
"ask_user",

View File

@@ -2,9 +2,6 @@
from __future__ import annotations
COMMAND_CATEGORY = "web_search"
COMMAND_CATEGORY_TITLE = "Web Search"
import json
import time
from itertools import islice
@@ -16,6 +13,10 @@ from autogpt.agents.utils.exceptions import ConfigurationError
from autogpt.command_decorator import command
from autogpt.core.utils.json_schema import JSONSchema
COMMAND_CATEGORY = "web_search"
COMMAND_CATEGORY_TITLE = "Web Search"
DUCKDUCKGO_MAX_ATTEMPTS = 3

View File

@@ -2,9 +2,6 @@
from __future__ import annotations
COMMAND_CATEGORY = "web_browse"
COMMAND_CATEGORY_TITLE = "Web Browsing"
import logging
import re
from pathlib import Path
@@ -33,10 +30,6 @@ from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager as EdgeDriverManager
if TYPE_CHECKING:
from autogpt.config import Config
from autogpt.agents.agent import Agent
from autogpt.agents.utils.exceptions import CommandExecutionError
from autogpt.command_decorator import command
from autogpt.core.utils.json_schema import JSONSchema
@@ -44,6 +37,15 @@ from autogpt.processing.html import extract_hyperlinks, format_hyperlinks
from autogpt.processing.text import summarize_text
from autogpt.url_utils.validators import validate_url
COMMAND_CATEGORY = "web_browse"
COMMAND_CATEGORY_TITLE = "Web Browsing"
if TYPE_CHECKING:
from autogpt.agents.agent import Agent
from autogpt.config import Config
logger = logging.getLogger(__name__)
FILE_DIR = Path(__file__).parent.parent
@@ -57,9 +59,12 @@ class BrowsingError(CommandExecutionError):
@command(
"read_webpage",
"Read a webpage, and extract specific information from it if a question is specified."
" If you are looking to extract specific information from the webpage, you should"
" specify a question.",
(
"Read a webpage, and extract specific information from it"
" if a question is specified."
" If you are looking to extract specific information from the webpage,"
" you should specify a question."
),
{
"url": JSONSchema(
type=JSONSchema.Type.STRING,
@@ -68,7 +73,9 @@ class BrowsingError(CommandExecutionError):
),
"question": JSONSchema(
type=JSONSchema.Type.STRING,
description="A question that you want to answer using the content of the webpage.",
description=(
"A question that you want to answer using the content of the webpage."
),
required=False,
),
},
@@ -124,8 +131,8 @@ async def read_webpage(url: str, agent: Agent, question: str = "") -> str:
msg = e.msg.split("\n")[0]
if "net::" in msg:
raise BrowsingError(
f"A networking error occurred while trying to load the page: "
+ re.sub(r"^unknown error: ", "", msg)
"A networking error occurred while trying to load the page: %s"
% re.sub(r"^unknown error: ", "", msg)
)
raise CommandExecutionError(msg)
finally:
@@ -198,9 +205,7 @@ def open_page_in_browser(url: str, config: Config) -> WebDriver:
}
options: BrowserOptions = options_available[config.selenium_web_browser]()
options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36"
)
options.add_argument(f"user-agent={config.user_agent}")
if config.selenium_web_browser == "firefox":
if config.selenium_headless:
@@ -214,8 +219,8 @@ def open_page_in_browser(url: str, config: Config) -> WebDriver:
service=EdgeDriverService(EdgeDriverManager().install()), options=options
)
elif config.selenium_web_browser == "safari":
# Requires a bit more setup on the users end
# See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari
# Requires a bit more setup on the users end.
# See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari # noqa: E501
driver = SafariDriver(options=options)
else:
if platform == "linux" or platform == "linux2":
@@ -282,7 +287,12 @@ async def summarize_memorize_webpage(
# memory = get_memory(agent.legacy_config)
# new_memory = MemoryItem.from_webpage(text, url, agent.legacy_config, question=question)
# new_memory = MemoryItem.from_webpage(
# content=text,
# url=url,
# config=agent.legacy_config,
# question=question,
# )
# memory.add(new_memory)
summary, _ = await summarize_text(

View File

@@ -24,7 +24,7 @@ class AIProfile(BaseModel):
def load(ai_settings_file: str | Path) -> "AIProfile":
"""
Returns class object with parameters (ai_name, ai_role, ai_goals, api_budget)
loaded from yaml file if yaml file exists, else returns class with no parameters.
loaded from yaml file if it exists, else returns class with no parameters.
Parameters:
ai_settings_file (Path): The path to the config yaml file.

View File

@@ -99,7 +99,7 @@ class Config(SystemSettings, arbitrary_types_allowed=True):
# Web browsing
selenium_web_browser: str = "chrome"
selenium_headless: bool = True
user_agent: str = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36"
user_agent: str = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36" # noqa: E501
###################
# Plugin Settings #

View File

@@ -6,3 +6,13 @@ from autogpt.core.ability.simple import (
AbilityRegistrySettings,
SimpleAbilityRegistry,
)
__all__ = [
"Ability",
"AbilityConfiguration",
"AbilityRegistry",
"AbilityResult",
"AbilityRegistryConfiguration",
"AbilityRegistrySettings",
"SimpleAbilityRegistry",
]

View File

@@ -4,3 +4,9 @@ from autogpt.core.ability.builtins.query_language_model import QueryLanguageMode
BUILTIN_ABILITIES = {
QueryLanguageModel.name(): QueryLanguageModel,
}
__all__ = [
"BUILTIN_ABILITIES",
"CreateNewAbility",
"QueryLanguageModel",
]

View File

@@ -32,7 +32,10 @@ class CreateNewAbility(Ability):
required=True,
),
"description": JSONSchema(
description="A detailed description of the ability and its uses, including any limitations.",
description=(
"A detailed description of the ability and its uses, "
"including any limitations."
),
type=JSONSchema.Type.STRING,
required=True,
),
@@ -47,11 +50,16 @@ class CreateNewAbility(Ability):
type=JSONSchema.Type.STRING,
),
"type": JSONSchema(
description="The type of the argument. Must be a standard json schema type.",
description=(
"The type of the argument. "
"Must be a standard json schema type."
),
type=JSONSchema.Type.STRING,
),
"description": JSONSchema(
description="A detailed description of the argument and its uses.",
description=(
"A detailed description of the argument and its uses."
),
type=JSONSchema.Type.STRING,
),
},
@@ -66,15 +74,22 @@ class CreateNewAbility(Ability):
),
),
"package_requirements": JSONSchema(
description="A list of the names of the Python packages that are required to execute the ability.",
description=(
"A list of the names of the Python packages that are required to "
"execute the ability."
),
type=JSONSchema.Type.ARRAY,
items=JSONSchema(
description="The of the Python package that is required to execute the ability.",
description=(
"The of the Python package that is required to execute the ability."
),
type=JSONSchema.Type.STRING,
),
),
"code": JSONSchema(
description="The Python code that will be executed when the ability is called.",
description=(
"The Python code that will be executed when the ability is called."
),
type=JSONSchema.Type.STRING,
required=True,
),

View File

@@ -45,7 +45,10 @@ class QueryLanguageModel(Ability):
parameters: ClassVar[dict[str, JSONSchema]] = {
"query": JSONSchema(
type=JSONSchema.Type.STRING,
description="A query for a language model. A query should contain a question and any relevant context.",
description=(
"A query for a language model. "
"A query should contain a question and any relevant context."
),
)
}

View File

@@ -1,3 +1,9 @@
"""The Agent is an autonomouos entity guided by a LLM provider."""
from autogpt.core.agent.base import Agent
from autogpt.core.agent.simple import AgentSettings, SimpleAgent
__all__ = [
"Agent",
"AgentSettings",
"SimpleAgent",
]

View File

@@ -92,7 +92,9 @@ class SimpleAgent(Agent, Configurable):
),
openai_provider=PluginLocation(
storage_format=PluginStorageFormat.INSTALLED_PACKAGE,
storage_route="autogpt.core.resource.model_providers.OpenAIProvider",
storage_route=(
"autogpt.core.resource.model_providers.OpenAIProvider"
),
),
planning=PluginLocation(
storage_format=PluginStorageFormat.INSTALLED_PACKAGE,
@@ -184,7 +186,7 @@ class SimpleAgent(Agent, Configurable):
)
tasks = [Task.parse_obj(task) for task in plan.parsed_result["task_list"]]
# TODO: Should probably do a step to evaluate the quality of the generated tasks,
# TODO: Should probably do a step to evaluate the quality of the generated tasks
# and ensure that they have actionable ready and acceptance criteria
self._task_queue.extend(tasks)
@@ -235,7 +237,7 @@ class SimpleAgent(Agent, Configurable):
else:
self._logger.debug(f"Evaluating task {task} and adding relevant context.")
# TODO: Look up relevant memories (need working memory system)
# TODO: Evaluate whether there is enough information to start the task (language model call).
# TODO: Eval whether there is enough information to start the task (w/ LLM).
task.context.enough_info = True
task.context.status = TaskStatus.IN_PROGRESS
return task
@@ -248,10 +250,12 @@ class SimpleAgent(Agent, Configurable):
"""Choose the next ability to use for the task."""
self._logger.debug(f"Choosing next ability for task {task}.")
if task.context.cycle_count > self._configuration.max_task_cycle_count:
# Don't hit the LLM, just set the next action as "breakdown_task" with an appropriate reason
# Don't hit the LLM, just set the next action as "breakdown_task"
# with an appropriate reason
raise NotImplementedError
elif not task.context.enough_info:
# Don't ask the LLM, just set the next action as "breakdown_task" with an appropriate reason
# Don't ask the LLM, just set the next action as "breakdown_task"
# with an appropriate reason
raise NotImplementedError
else:
next_ability = await self._planning.determine_next_ability(
@@ -378,7 +382,8 @@ class SimpleAgent(Agent, Configurable):
def _prune_empty_dicts(d: dict) -> dict:
"""
Prune branches from a nested dictionary if the branch only contains empty dictionaries at the leaves.
Prune branches from a nested dictionary if the branch only contains empty
dictionaries at the leaves.
Args:
d: The dictionary to prune.

View File

@@ -5,3 +5,10 @@ from autogpt.core.configuration.schema import (
SystemSettings,
UserConfigurable,
)
__all__ = [
"Configurable",
"SystemConfiguration",
"SystemSettings",
"UserConfigurable",
]

View File

@@ -1,3 +1,9 @@
"""The memory subsystem manages the Agent's long-term memory."""
from autogpt.core.memory.base import Memory
from autogpt.core.memory.simple import MemorySettings, SimpleMemory
__all__ = [
"Memory",
"MemorySettings",
"SimpleMemory",
]

View File

@@ -1,3 +1,11 @@
"""The planning system organizes the Agent's activities."""
from autogpt.core.planning.schema import Task, TaskStatus, TaskType
from autogpt.core.planning.simple import PlannerSettings, SimplePlanner
__all__ = [
"PlannerSettings",
"SimplePlanner",
"Task",
"TaskStatus",
"TaskType",
]

View File

@@ -1,5 +1,8 @@
# class Planner(abc.ABC):
# """Manages the agent's planning and goal-setting by constructing language model prompts."""
# """
# Manages the agent's planning and goal-setting
# by constructing language model prompts.
# """
#
# @staticmethod
# @abc.abstractmethod

View File

@@ -1,3 +1,12 @@
from .initial_plan import InitialPlan, InitialPlanConfiguration
from .name_and_goals import NameAndGoals, NameAndGoalsConfiguration
from .next_ability import NextAbility, NextAbilityConfiguration
__all__ = [
"InitialPlan",
"InitialPlanConfiguration",
"NameAndGoals",
"NameAndGoalsConfiguration",
"NextAbility",
"NextAbilityConfiguration",
]

View File

@@ -25,13 +25,18 @@ class InitialPlanConfiguration(SystemConfiguration):
class InitialPlan(PromptStrategy):
DEFAULT_SYSTEM_PROMPT_TEMPLATE = (
"You are an expert project planner. You're responsibility is to create work plans for autonomous agents. "
"You will be given a name, a role, set of goals for the agent to accomplish. Your job is to "
"break down those goals into a set of tasks that the agent can accomplish to achieve those goals. "
"Agents are resourceful, but require clear instructions. Each task you create should have clearly defined "
"`ready_criteria` that the agent can check to see if the task is ready to be started. Each task should "
"also have clearly defined `acceptance_criteria` that the agent can check to evaluate if the task is complete. "
"You should create as many tasks as you think is necessary to accomplish the goals.\n\n"
"You are an expert project planner. "
"Your responsibility is to create work plans for autonomous agents. "
"You will be given a name, a role, set of goals for the agent to accomplish. "
"Your job is to break down those goals into a set of tasks that the agent can"
" accomplish to achieve those goals. "
"Agents are resourceful, but require clear instructions."
" Each task you create should have clearly defined `ready_criteria` that the"
" agent can check to see if the task is ready to be started."
" Each task should also have clearly defined `acceptance_criteria` that the"
" agent can check to evaluate if the task is complete. "
"You should create as many tasks as you think is necessary to accomplish"
" the goals.\n\n"
"System Info:\n{system_info}"
)
@@ -47,7 +52,9 @@ class InitialPlan(PromptStrategy):
DEFAULT_CREATE_PLAN_FUNCTION = CompletionModelFunction(
name="create_initial_agent_plan",
description="Creates a set of tasks that forms the initial plan for an autonomous agent.",
description=(
"Creates a set of tasks that forms the initial plan of an autonomous agent."
),
parameters={
"task_list": JSONSchema(
type=JSONSchema.Type.ARRAY,
@@ -56,7 +63,10 @@ class InitialPlan(PromptStrategy):
properties={
"objective": JSONSchema(
type=JSONSchema.Type.STRING,
description="An imperative verb phrase that succinctly describes the task.",
description=(
"An imperative verb phrase that succinctly describes "
"the task."
),
),
"type": JSONSchema(
type=JSONSchema.Type.STRING,
@@ -67,12 +77,19 @@ class InitialPlan(PromptStrategy):
type=JSONSchema.Type.ARRAY,
items=JSONSchema(
type=JSONSchema.Type.STRING,
description="A list of measurable and testable criteria that must be met for the task to be considered complete.",
description=(
"A list of measurable and testable criteria that "
"must be met for the task to be considered "
"complete."
),
),
),
"priority": JSONSchema(
type=JSONSchema.Type.INTEGER,
description="A number between 1 and 10 indicating the priority of the task relative to other generated tasks.",
description=(
"A number between 1 and 10 indicating the priority of "
"the task relative to other generated tasks."
),
minimum=1,
maximum=10,
),
@@ -80,7 +97,10 @@ class InitialPlan(PromptStrategy):
type=JSONSchema.Type.ARRAY,
items=JSONSchema(
type=JSONSchema.Type.STRING,
description="A list of measurable and testable criteria that must be met before the task can be started.",
description=(
"A list of measurable and testable criteria that "
"must be met before the task can be started."
),
),
),
},

View File

@@ -61,7 +61,9 @@ class NameAndGoals(PromptStrategy):
),
"agent_role": JSONSchema(
type=JSONSchema.Type.STRING,
description="An informative one sentence description of what the AI agent does",
description=(
"An informative one sentence description of what the AI agent does"
),
),
"agent_goals": JSONSchema(
type=JSONSchema.Type.ARRAY,
@@ -71,8 +73,9 @@ class NameAndGoals(PromptStrategy):
type=JSONSchema.Type.STRING,
),
description=(
"One to five highly effective goals that are optimally aligned with the completion of a "
"specific task. The number and complexity of the goals should correspond to the "
"One to five highly effective goals that are optimally aligned "
"with the completion of a specific task. "
"The number and complexity of the goals should correspond to the "
"complexity of the agent's primary objective."
),
),

View File

@@ -41,25 +41,36 @@ class NextAbility(PromptStrategy):
"{additional_info}\n\n"
"Additionally, you should consider the following:\n"
"{user_input}\n\n"
"Your task of {task_objective} is complete when the following acceptance criteria have been met:\n"
"Your task of {task_objective} is complete when the following acceptance"
" criteria have been met:\n"
"{acceptance_criteria}\n\n"
"Please choose one of the provided functions to accomplish this task. "
"Some tasks may require multiple functions to accomplish. If that is the case, choose the function that "
"you think is most appropriate for the current situation given your progress so far."
"Some tasks may require multiple functions to accomplish. If that is the case,"
" choose the function that you think is most appropriate for the current"
" situation given your progress so far."
)
DEFAULT_ADDITIONAL_ABILITY_ARGUMENTS = {
"motivation": JSONSchema(
type=JSONSchema.Type.STRING,
description="Your justification for choosing choosing this function instead of a different one.",
description=(
"Your justification for choosing choosing this function instead of a "
"different one."
),
),
"self_criticism": JSONSchema(
type=JSONSchema.Type.STRING,
description="Thoughtful self-criticism that explains why this function may not be the best choice.",
description=(
"Thoughtful self-criticism that explains why this function may not be "
"the best choice."
),
),
"reasoning": JSONSchema(
type=JSONSchema.Type.STRING,
description="Your reasoning for choosing this function taking into account the `motivation` and weighing the `self_criticism`.",
description=(
"Your reasoning for choosing this function taking into account the "
"`motivation` and weighing the `self_criticism`."
),
),
}
@@ -124,7 +135,9 @@ class NextAbility(PromptStrategy):
template_kwargs["additional_info"] = to_numbered_list(
[memory.summary() for memory in task.context.memories]
+ [info for info in task.context.supplementary_info],
no_items_response="There is no additional information available at this time.",
no_items_response=(
"There is no additional information available at this time."
),
**template_kwargs,
)
template_kwargs["user_input"] = to_numbered_list(

View File

@@ -43,5 +43,6 @@ class Task(BaseModel):
context: TaskContext = Field(default_factory=TaskContext)
# Need to resolve the circular dependency between Task and TaskContext once both models are defined.
# Need to resolve the circular dependency between Task and TaskContext
# once both models are defined.
TaskContext.update_forward_refs()

View File

@@ -53,11 +53,17 @@ class PlannerSettings(SystemSettings):
class SimplePlanner(Configurable):
"""Manages the agent's planning and goal-setting by constructing language model prompts."""
"""
Manages the agent's planning and goal-setting
by constructing language model prompts.
"""
default_settings = PlannerSettings(
name="planner",
description="Manages the agent's planning and goal-setting by constructing language model prompts.",
description=(
"Manages the agent's planning and goal-setting "
"by constructing language model prompts."
),
configuration=PlannerConfiguration(
models={
LanguageModelClassification.FAST_MODEL: LanguageModelConfiguration(

View File

@@ -13,23 +13,6 @@ USER_OBJECTIVE = (
)
ABILITIES = (
'analyze_code: Analyze Code, args: "code": "<full_code_string>"',
'execute_python_file: Execute Python File, args: "filename": "<filename>"',
'append_to_file: Append to file, args: "filename": "<filename>", "text": "<text>"',
'list_files: List Files in Directory, args: "directory": "<directory>"',
'read_file: Read a file, args: "filename": "<filename>"',
'write_to_file: Write to file, args: "filename": "<filename>", "text": "<text>"',
'google: Google Search, args: "query": "<query>"',
'improve_code: Get Improved Code, args: "suggestions": "<list_of_suggestions>", "code": "<full_code_string>"',
'browse_website: Browse Website, args: "url": "<url>", "question": "<what_you_want_to_find_on_website>"',
'write_tests: Write Tests, args: "code": "<full_code_string>", "focus": "<list_of_focus_areas>"',
'get_hyperlinks: Get hyperlinks, args: "url": "<url>"',
'get_text_summary: Get text summary, args: "url": "<url>", "question": "<question>"',
'task_complete: Task Complete (Shutdown), args: "reason": "<reason>"',
)
# Plan Prompt
# -----------

View File

@@ -1,2 +1,6 @@
"""The plugin system allows the Agent to be extended with new functionality."""
from autogpt.core.plugin.base import PluginService
__all__ = [
"PluginService",
]

View File

@@ -34,12 +34,19 @@ class PluginStorageFormat(str, enum.Enum):
INSTALLED_PACKAGE = "installed_package" # Required now, loads system defaults
WORKSPACE = "workspace" # Required now
# OPENAPI_URL = "open_api_url" # Soon (requires some tooling we don't have yet).
# Soon (requires some tooling we don't have yet).
# OPENAPI_URL = "open_api_url"
# OTHER_FILE_PATH = "other_file_path" # Maybe later (maybe now)
# GIT = "git" # Maybe later (or soon)
# PYPI = "pypi" # Maybe later
# AUTOGPT_PLUGIN_SERVICE = "autogpt_plugin_service" # Long term solution, requires design
# AUTO = "auto" # Feature for later maybe, automatically find plugin.
# Long term solution, requires design
# AUTOGPT_PLUGIN_SERVICE = "autogpt_plugin_service"
# Feature for later maybe, automatically find plugin.
# AUTO = "auto"
# Installed package example

View File

@@ -28,7 +28,8 @@ class SimplePluginService(PluginService):
)
else:
raise NotImplementedError(
f"Plugin storage format {plugin_location.storage_format} is not implemented."
"Plugin storage format %s is not implemented."
% plugin_location.storage_format
)
####################################
@@ -39,7 +40,7 @@ class SimplePluginService(PluginService):
"""Load a plugin from a file path."""
# TODO: Define an on disk storage format and implement this.
# Can pull from existing zip file loading implementation
raise NotImplemented("Loading from file path is not implemented.")
raise NotImplementedError("Loading from file path is not implemented.")
@staticmethod
def load_from_import_path(plugin_route: PluginStorageRoute) -> "PluginType":
@@ -56,7 +57,7 @@ class SimplePluginService(PluginService):
# storage locations. E.g. if we know that path_type is a file path, we can
# search the workspace for it. If it's an import path, we can check the core
# system and the auto_gpt_plugins package.
raise NotImplemented("Resolving plugin name to path is not implemented.")
raise NotImplementedError("Resolving plugin name to path is not implemented.")
#####################################
# High-level storage format loaders #

View File

@@ -14,9 +14,10 @@ def to_numbered_list(
def json_loads(json_str: str):
# TODO: this is a hack function for now. Trying to see what errors show up in testing.
# Can hopefully just replace with a call to ast.literal_eval (the function api still
# sometimes returns json strings with minor issues like trailing commas).
# TODO: this is a hack function for now. We'll see what errors show up in testing.
# Can hopefully just replace with a call to ast.literal_eval.
# Can't use json.loads because the function API still sometimes returns json strings
# with minor issues like trailing commas.
try:
json_str = json_str[json_str.index("{") : json_str.rindex("}") + 1]
return ast.literal_eval(json_str)

View File

@@ -5,3 +5,11 @@ from autogpt.core.resource.schema import (
ProviderUsage,
ResourceType,
)
__all__ = [
"ProviderBudget",
"ProviderCredentials",
"ProviderSettings",
"ProviderUsage",
"ResourceType",
]

View File

@@ -572,10 +572,12 @@ def count_openai_functions_tokens(
) -> int:
"""Returns the number of tokens taken up by a set of function definitions
Reference: https://community.openai.com/t/how-to-calculate-the-tokens-when-using-function-call/266573/18
Reference: https://community.openai.com/t/how-to-calculate-the-tokens-when-using-function-call/266573/18 # noqa: E501
"""
return count_tokens(
f"# Tools\n\n## functions\n\n{format_function_specs_as_typescript_ns(functions)}"
"# Tools\n\n"
"## functions\n\n"
f"{format_function_specs_as_typescript_ns(functions)}"
)

View File

@@ -49,7 +49,9 @@ async def client(settings_file) -> None:
if settings_file.exists():
settings = yaml.safe_load(settings_file.read_text())
# TODO: Call the API server with the settings and task, using the Python API client for agent protocol.
settings
# TODO: Call the API server with the settings and task,
# using the Python API client for agent protocol.
if __name__ == "__main__":

View File

@@ -1,5 +1,4 @@
import logging
from pathlib import Path
from agent_protocol import StepHandler, StepResult

View File

@@ -51,7 +51,7 @@ class FancyConsoleFormatter(logging.Formatter):
# Make sure `msg` is a string
if not hasattr(record, "msg"):
record.msg = ""
elif not type(record.msg) == str:
elif not type(record.msg) is str:
record.msg = str(record.msg)
# Determine default color based on error level

View File

@@ -17,9 +17,10 @@ def handle_exceptions(
This is intended to be used as a wrapper for the main function of a CLI application.
It will catch all errors and drop a user into a debugger if the error is not a
KeyboardInterrupt. If the error is a KeyboardInterrupt, it will raise the error.
If the error is not a KeyboardInterrupt, it will log the error and drop a user into a
debugger if with_debugger is True. If with_debugger is False, it will raise the error.
`KeyboardInterrupt`. If the error is a `KeyboardInterrupt`, it will raise the error.
If the error is not a `KeyboardInterrupt`, it will log the error and drop a user
into a debugger if `with_debugger` is `True`.
If `with_debugger` is `False`, it will raise the error.
Parameters
----------

View File

@@ -96,8 +96,9 @@ class JSONSchema(BaseModel):
schema (JSONSchema): The JSONSchema to validate against.
Returns:
tuple: A tuple where the first element is a boolean indicating whether the object is valid or not,
and the second element is a list of errors found in the object, or None if the object is valid.
tuple: A tuple where the first element is a boolean indicating whether the
object is valid or not, and the second element is a list of errors found
in the object, or None if the object is valid.
"""
validator = Draft7Validator(self.to_dict())

View File

@@ -1,3 +1,9 @@
"""The workspace is the central hub for the Agent's on disk resources."""
from autogpt.core.workspace.base import Workspace
from autogpt.core.workspace.simple import SimpleWorkspace, WorkspaceSettings
__all__ = [
"SimpleWorkspace",
"Workspace",
"WorkspaceSettings",
]

View File

@@ -136,7 +136,8 @@ class SimpleWorkspace(Configurable, Workspace):
if relative_path.is_absolute():
raise ValueError(
f"Attempted to access absolute path '{relative_path}' in workspace '{root}'."
f"Attempted to access absolute path '{relative_path}' "
f"in workspace '{root}'."
)
full_path = root.joinpath(relative_path).resolve()

View File

@@ -133,7 +133,8 @@ class FileWorkspace:
and not relative_path.is_relative_to(root)
):
raise ValueError(
f"Attempted to access absolute path '{relative_path}' in workspace '{root}'."
f"Attempted to access absolute path '{relative_path}' "
f"in workspace '{root}'."
)
full_path = root.joinpath(relative_path).resolve()

View File

@@ -24,7 +24,8 @@ def extract_dict_from_response(response_content: str) -> dict[str, Any]:
if match:
response_content = match.group()
# response content comes from OpenAI as a Python `str(content_dict)`, literal_eval reverses this
# Response content comes from OpenAI as a Python `str(content_dict)`.
# `literal_eval` does the reverse of `str(dict)`.
try:
return ast.literal_eval(response_content)
except BaseException as e:

View File

@@ -1,3 +1,4 @@
from .config import configure_chat_plugins, configure_logging
from .helpers import user_friendly_output
from .log_cycle import (
CURRENT_CONTEXT_FILE_NAME,
@@ -9,3 +10,17 @@ from .log_cycle import (
USER_INPUT_FILE_NAME,
LogCycleHandler,
)
__all__ = [
"configure_logging",
"configure_chat_plugins",
"user_friendly_output",
"CURRENT_CONTEXT_FILE_NAME",
"NEXT_ACTION_FILE_NAME",
"PROMPT_SUMMARY_FILE_NAME",
"PROMPT_SUPERVISOR_FEEDBACK_FILE_NAME",
"SUMMARY_FILE_NAME",
"SUPERVISOR_FEEDBACK_FILE_NAME",
"USER_INPUT_FILE_NAME",
"LogCycleHandler",
]

View File

@@ -8,7 +8,6 @@ from pathlib import Path
from typing import TYPE_CHECKING, Optional
from auto_gpt_plugin_template import AutoGPTPluginTemplate
from google.cloud.logging_v2.handlers import CloudLoggingFilter
from openai.util import logger as openai_logger
if TYPE_CHECKING:

View File

@@ -17,7 +17,7 @@ class AutoGptFormatter(FancyConsoleFormatter):
# Make sure `msg` is a string
if not hasattr(record, "msg"):
record.msg = ""
elif not type(record.msg) == str:
elif not type(record.msg) is str:
record.msg = str(record.msg)
# Strip color from the message to prevent color spoofing

View File

@@ -53,10 +53,9 @@ def print_attribute(
def request_user_double_check(additionalText: Optional[str] = None) -> None:
if not additionalText:
additionalText = (
"Please ensure you've setup and configured everything"
" correctly. Read https://github.com/Significant-Gravitas/AutoGPT/autogpts/autogpt#readme to "
"double check. You can also create a github issue or join the discord"
" and ask there!"
"Please ensure you've setup and configured everything correctly. "
"Read https://docs.agpt.co/autogpt/setup/ to double check. "
"You can also create a github issue or join the discord and ask there!"
)
user_friendly_output(

View File

@@ -1,5 +1,3 @@
import logging
from autogpt.config import Config
from .memory_item import MemoryItem, MemoryItemRelevance
@@ -41,7 +39,8 @@ supported_memory = ["json_file", "no_memory"]
def get_memory(config: Config) -> VectorMemory:
"""Returns a memory object corresponding to the memory backend specified in the config.
"""
Returns a memory object corresponding to the memory backend specified in the config.
The type of memory object returned depends on the value of the `memory_backend`
attribute in the configuration. E.g. if `memory_backend` is set to "pinecone", a
@@ -50,11 +49,11 @@ def get_memory(config: Config) -> VectorMemory:
By default, a `JSONFileMemory` object is returned.
Params:
config: A configuration object that contains information about the memory backend
to be used and other relevant parameters.
config: A configuration object that contains information about the memory
backend to be used and other relevant parameters.
Returns:
VectorMemory: an instance of a memory object based on the configuration provided.
VectorMemory: an instance of a memory object based on the configuration provided
"""
memory = None
@@ -65,8 +64,8 @@ def get_memory(config: Config) -> VectorMemory:
case "pinecone":
raise NotImplementedError(
"The Pinecone memory backend has been rendered incompatible by work on "
"the memory system, and was removed. Whether support will be added back "
"in the future is subject to discussion, feel free to pitch in: "
"the memory system, and was removed. Whether support will be added "
"back in the future is subject to discussion, feel free to pitch in: "
"https://github.com/Significant-Gravitas/AutoGPT/discussions/4280"
)
# if not PineconeMemory:
@@ -95,14 +94,14 @@ def get_memory(config: Config) -> VectorMemory:
case "weaviate":
raise NotImplementedError(
"The Weaviate memory backend has been rendered incompatible by work on "
"the memory system, and was removed. Whether support will be added back "
"in the future is subject to discussion, feel free to pitch in: "
"the memory system, and was removed. Whether support will be added "
"back in the future is subject to discussion, feel free to pitch in: "
"https://github.com/Significant-Gravitas/AutoGPT/discussions/4280"
)
# if not WeaviateMemory:
# logger.warn(
# "Error: Weaviate is not installed. Please install weaviate-client to"
# " use Weaviate as a memory backend."
# "Error: Weaviate is not installed. Please install weaviate-client"
# " to use Weaviate as a memory backend."
# )
# else:
# memory = WeaviateMemory(config)
@@ -110,14 +109,15 @@ def get_memory(config: Config) -> VectorMemory:
case "milvus":
raise NotImplementedError(
"The Milvus memory backend has been rendered incompatible by work on "
"the memory system, and was removed. Whether support will be added back "
"in the future is subject to discussion, feel free to pitch in: "
"the memory system, and was removed. Whether support will be added "
"back in the future is subject to discussion, feel free to pitch in: "
"https://github.com/Significant-Gravitas/AutoGPT/discussions/4280"
)
# if not MilvusMemory:
# logger.warn(
# "Error: pymilvus sdk is not installed."
# "Please install pymilvus to use Milvus or Zilliz Cloud as memory backend."
# "Error: pymilvus sdk is not installed, but required "
# "to use Milvus or Zilliz as memory backend. "
# "Please install pymilvus."
# )
# else:
# memory = MilvusMemory(config)
@@ -127,7 +127,8 @@ def get_memory(config: Config) -> VectorMemory:
case _:
raise ValueError(
f"Unknown memory backend '{config.memory_backend}'. Please check your config."
f"Unknown memory backend '{config.memory_backend}'."
" Please check your config."
)
if memory is None:

View File

@@ -141,7 +141,11 @@ class MemoryItem(BaseModel, arbitrary_types_allowed=True):
return MemoryItem.from_text(
text=memory_content,
source_type="agent_history",
how_to_summarize="if possible, also make clear the link between the command in the assistant's response and the command result. Do not mention the human feedback if there is none",
how_to_summarize=(
"if possible, also make clear the link between the command in the"
" assistant's response and the command result. "
"Do not mention the human feedback if there is none.",
),
)
@staticmethod
@@ -161,9 +165,10 @@ class MemoryItem(BaseModel, arbitrary_types_allowed=True):
token_length = self.llm_provider.count_tokens(
self.raw_content, Config().embedding_model
)
n_chunks = len(self.e_chunks)
return f"""
=============== MemoryItem ===============
Size: {f'{token_length} tokens in ' if calculate_length else ''}{len(self.e_chunks)} chunks
Size: {f'{token_length} tokens in ' if calculate_length else ''}{n_chunks} chunks
Metadata: {json.dumps(self.metadata, indent=2)}
---------------- SUMMARY -----------------
{self.summary}

View File

@@ -63,7 +63,7 @@ class JSONFileMemory(VectorMemoryProvider):
def discard(self, item: MemoryItem):
try:
self.remove(item)
except:
except ValueError: # item not in memory
pass
def clear(self):

View File

@@ -13,7 +13,10 @@ class Action(BaseModel):
reasoning: str
def format_call(self) -> str:
return f"{self.name}({', '.join([f'{a}={repr(v)}' for a, v in self.args.items()])})"
return (
f"{self.name}"
f"({', '.join([f'{a}={repr(v)}' for a, v in self.args.items()])})"
)
class ActionSuccessResult(BaseModel):
@@ -69,7 +72,10 @@ class ActionInterruptedByHuman(BaseModel):
status: Literal["interrupted_by_human"] = "interrupted_by_human"
def __str__(self) -> str:
return f'The user interrupted the action with the following feedback: "{self.feedback}"'
return (
'The user interrupted the action with the following feedback: "%s"'
% self.feedback
)
ActionResult = ActionSuccessResult | ActionErrorResult | ActionInterruptedByHuman

View File

@@ -62,7 +62,11 @@ class Command:
def __str__(self) -> str:
params = [
f"{param.name}: {param.spec.type.value if param.spec.required else f'Optional[{param.spec.type.value}]'}"
f"{param.name}: "
+ ("%s" if param.spec.required else "Optional[%s]") % param.spec.type.value
for param in self.parameters
]
return f"{self.name}: {self.description.rstrip('.')}. Params: ({', '.join(params)})"
return (
f"{self.name}: {self.description.rstrip('.')}. "
f"Params: ({', '.join(params)})"
)

View File

@@ -9,4 +9,9 @@ class CommandParameter:
spec: JSONSchema
def __repr__(self):
return f"CommandParameter('{self.name}', '{self.spec.type}', '{self.spec.description}', {self.spec.required})"
return "CommandParameter('%s', '%s', '%s', %s)" % (
self.name,
self.spec.type,
self.spec.description,
self.spec.required,
)

View File

@@ -116,11 +116,14 @@ class CommandRegistry:
yield cmd
# def command_specs(self) -> str:
# """Returns a technical declaration of all commands in the registry for use in a prompt"""
# """
# Returns a technical declaration of all commands in the registry,
# for use in a prompt.
# """
#
# Declaring functions or commands should be done in a model-specific way to achieve
# optimal results. For this reason, it should NOT be implemented here, but in an
# LLM provider module.
# Declaring functions or commands should be done in a model-specific way to
# achieve optimal results. For this reason, it should NOT be implemented here,
# but in an LLM provider module.
# MUST take command AVAILABILITY into account.
@staticmethod
@@ -128,7 +131,8 @@ class CommandRegistry:
new_registry = CommandRegistry()
logger.debug(
f"The following command categories are disabled: {config.disabled_command_categories}"
"The following command categories are disabled: "
f"{config.disabled_command_categories}"
)
enabled_command_modules = [
x for x in modules if x not in config.disabled_command_categories

View File

@@ -11,7 +11,7 @@ import zipfile
from pathlib import Path
from typing import TYPE_CHECKING, List
from urllib.parse import urlparse
from zipimport import zipimporter
from zipimport import ZipImportError, zipimporter
import openapi_python_client
import requests
@@ -79,7 +79,8 @@ def fetch_openai_plugins_manifest_and_spec(config: Config) -> dict:
manifest = response.json()
if manifest["schema_version"] != "v1":
logger.warn(
f"Unsupported manifest version: {manifest['schem_version']} for {url}"
"Unsupported manifest version: "
f"{manifest['schem_version']} for {url}"
)
continue
if manifest["api"]["type"] != "openapi":
@@ -230,14 +231,16 @@ def scan_plugins(config: Config) -> List[AutoGPTPluginTemplate]:
try:
__import__(qualified_module_name)
except:
except ImportError:
logger.error(f"Failed to load {qualified_module_name}")
continue
plugin = sys.modules[qualified_module_name]
if not plugins_config.is_enabled(plugin_module_name):
logger.warn(
f"Plugin folder {plugin_module_name} found but not configured. If this is a legitimate plugin, please add it to plugins_config.yaml (key: {plugin_module_name})."
f"Plugin folder {plugin_module_name} found but not configured. "
"If this is a legitimate plugin, please add it to plugins_config.yaml "
f"(key: {plugin_module_name})."
)
continue
@@ -258,8 +261,9 @@ def scan_plugins(config: Config) -> List[AutoGPTPluginTemplate]:
zipped_package = zipimporter(str(plugin))
try:
zipped_module = zipped_package.load_module(str(module.parent))
except:
except ZipImportError:
logger.error(f"Failed to load {str(module.parent)}")
continue
for key in dir(zipped_module):
if key.startswith("__"):
@@ -279,24 +283,29 @@ def scan_plugins(config: Config) -> List[AutoGPTPluginTemplate]:
if plugin_configured and plugin_enabled:
logger.debug(
f"Loading plugin {plugin_name}. Enabled in plugins_config.yaml."
f"Loading plugin {plugin_name}. "
"Enabled in plugins_config.yaml."
)
loaded_plugins.append(a_module())
elif plugin_configured and not plugin_enabled:
logger.debug(
f"Not loading plugin {plugin_name}. Disabled in plugins_config.yaml."
f"Not loading plugin {plugin_name}. "
"Disabled in plugins_config.yaml."
)
elif not plugin_configured:
logger.warn(
f"Not loading plugin {plugin_name}. Key '{plugin_name}' was not found in plugins_config.yaml. "
f"Zipped plugins should use the class name ({plugin_name}) as the key."
f"Not loading plugin {plugin_name}. "
f"No entry for '{plugin_name}' in plugins_config.yaml. "
"Note: Zipped plugins should use the class name "
f"({plugin_name}) as the key."
)
else:
if (
module_name := getattr(a_module, "__name__", str(a_module))
) != "AutoGPTPluginTemplate":
logger.debug(
f"Skipping '{module_name}' because it doesn't subclass AutoGPTPluginTemplate."
f"Skipping '{module_name}' because it doesn't subclass "
"AutoGPTPluginTemplate."
)
# OpenAI plugins
@@ -306,9 +315,8 @@ def scan_plugins(config: Config) -> List[AutoGPTPluginTemplate]:
manifests_specs_clients = initialize_openai_plugins(manifests_specs, config)
for url, openai_plugin_meta in manifests_specs_clients.items():
if not plugins_config.is_enabled(url):
logger.warn(
f"OpenAI Plugin {plugin_module_name} found but not configured"
)
plugin_name = openai_plugin_meta["manifest"]["name_for_model"]
logger.warn(f"OpenAI Plugin {plugin_name} found but not configured")
continue
plugin = BaseOpenAIPlugin(openai_plugin_meta)

View File

@@ -42,16 +42,17 @@ class PluginsConfig(BaseModel):
plugins_denylist,
plugins_allowlist,
)
if type(config_data) != dict:
if type(config_data) is not dict:
logger.error(
f"Expected plugins config to be a dict, got {type(config_data)}, continuing without plugins"
f"Expected plugins config to be a dict, got {type(config_data)}."
" Continuing without plugins."
)
return empty_config
return cls(plugins=config_data)
except BaseException as e:
logger.error(
f"Plugin config is invalid, continuing without plugins. Error: {e}"
f"Plugin config is invalid. Continuing without plugins. Error: {e}"
)
return empty_config
@@ -75,13 +76,13 @@ class PluginsConfig(BaseModel):
plugins = {}
for name, plugin in plugins_config.items():
if type(plugin) == dict:
if type(plugin) is dict:
plugins[name] = PluginConfig(
name=name,
enabled=plugin.get("enabled", False),
config=plugin.get("config", {}),
)
elif type(plugin) == PluginConfig:
elif isinstance(plugin, PluginConfig):
plugins[name] = plugin
else:
raise ValueError(f"Invalid plugin config data type: {type(plugin)}")
@@ -93,7 +94,10 @@ class PluginsConfig(BaseModel):
plugins_denylist: list[str],
plugins_allowlist: list[str],
):
"""Create an empty plugins_config.yaml file. Fill it with values from old env variables."""
"""
Create an empty plugins_config.yaml file.
Fill it with values from old env variables.
"""
base_config = {}
logger.debug(f"Legacy plugin denylist: {plugins_denylist}")

View File

@@ -21,8 +21,11 @@ T = TypeVar("T")
def batch(
sequence: list[T], max_batch_length: int, overlap: int = 0
) -> Iterator[list[T]]:
"""Batch data from iterable into slices of length N. The last batch may be shorter."""
# batched('ABCDEFG', 3) --> ABC DEF G
"""
Batch data from iterable into slices of length N. The last batch may be shorter.
Example: `batched('ABCDEFGHIJ', 3)` --> `ABC DEF GHI J`
"""
if max_batch_length < 1:
raise ValueError("n must be at least one")
for i in range(0, len(sequence), max_batch_length - overlap):
@@ -60,10 +63,13 @@ async def summarize_text(
"""Summarize text using the OpenAI API
Args:
text (str): The text to summarize
config (Config): The config object
instruction (str): Additional instruction for summarization, e.g. "focus on information related to polar bears", "omit personal information contained in the text"
question (str): Question to answer in the summary
text (str): The text to summarize.
llm_provider: LLM provider to use for summarization.
config (Config): The global application config, containing the FAST_LLM setting.
instruction (str): Additional instruction for summarization, e.g.
"focus on information related to polar bears", or
"omit personal information contained in the text".
question (str): Question to be answered by the summary.
Returns:
str: The summary of the text
@@ -80,9 +86,9 @@ async def summarize_text(
if question:
instruction = (
f'include any information that can be used to answer the question "{question}". '
"Do not directly answer the question itself"
)
'Include any information that can be used to answer the question: "%s". '
"Do not directly answer the question itself."
) % question
summarization_prompt = ChatPrompt(messages=[])
@@ -97,13 +103,12 @@ async def summarize_text(
# summarization_prompt.add("user", text)
summarization_prompt.messages.append(
ChatMessage.user(
"Write a concise summary of the following text"
f"{f'; {instruction}' if instruction is not None else ''}:"
"Write a concise summary of the following text."
f"{f' {instruction}' if instruction is not None else ''}:"
"\n\n\n"
f'LITERAL TEXT: """{text}"""'
"\n\n\n"
"CONCISE SUMMARY: The text is best summarized as"
# "Only respond with a concise summary or description of the user message."
)
)
@@ -160,14 +165,15 @@ def split_text(
tokenizer: ModelTokenizer,
with_overlap: bool = True,
) -> Iterator[tuple[str, int]]:
"""Split text into chunks of sentences, with each chunk not exceeding the maximum length
"""
Split text into chunks of sentences, with each chunk not exceeding the max length.
Args:
text (str): The text to split
for_model (str): The model to chunk for; determines tokenizer and constraints
config (Config): The config object
with_overlap (bool, optional): Whether to allow overlap between chunks
max_chunk_length (int, optional): The maximum length of a chunk
text (str): The text to split.
config (Config): Config object containing the Spacy model setting.
max_chunk_length (int, optional): The maximum length of a chunk.
tokenizer (ModelTokenizer): Tokenizer to use for determining chunk length.
with_overlap (bool, optional): Whether to allow overlap between chunks.
Yields:
str: The next chunk of text

View File

@@ -1,29 +0,0 @@
#########################Setup.py#################################
DEFAULT_SYSTEM_PROMPT_AICONFIG_AUTOMATIC = """
Your task is to devise up to 5 highly effective goals and an appropriate role-based name (_GPT) for an autonomous agent, ensuring that the goals are optimally aligned with the successful completion of its assigned task.
The user will provide the task, you will provide only the output in the exact format specified below with no explanation or conversation.
Example input:
Help me with marketing my business
Example output:
Name: CMOGPT
Description: a professional digital marketer AI that assists Solopreneurs in growing their businesses by providing world-class expertise in solving marketing problems for SaaS, content products, agencies, and more.
Goals:
- Engage in effective problem-solving, prioritization, planning, and supporting execution to address your marketing needs as your virtual Chief Marketing Officer.
- Provide specific, actionable, and concise advice to help you make informed decisions without the use of platitudes or overly wordy explanations.
- Identify and prioritize quick wins and cost-effective campaigns that maximize results with minimal time and budget investment.
- Proactively take the lead in guiding you and offering suggestions when faced with unclear information or uncertainty to ensure your marketing strategy remains on track.
"""
DEFAULT_TASK_PROMPT_AICONFIG_AUTOMATIC = (
"Task: '{{user_prompt}}'\n"
"Respond only with the output in the exact format specified in the system prompt, with no explanation or conversation.\n"
)
DEFAULT_USER_DESIRE_PROMPT = "Write a wikipedia style article about the project: https://github.com/significant-gravitas/AutoGPT" # Default prompt

View File

@@ -6,6 +6,6 @@ def format_numbered_list(items: list[Any], start_at: int = 1) -> str:
def indent(content: str, indentation: int | str = 4) -> str:
if type(indentation) == int:
if type(indentation) is int:
indentation = " " * indentation
return indentation + content.replace("\n", f"\n{indentation}") # type: ignore

View File

@@ -31,7 +31,7 @@ class VoiceBase:
voice_index (int): The index of the voice to use.
"""
text = re.sub(
r"\b(?:https?://[-\w_.]+/?\w[-\w_.]*\.(?:[-\w_.]+/?\w[-\w_.]*\.)?[a-z]+(?:/[-\w_.%]+)*\b(?!\.))",
r"\b(?:https?://[-\w_.]+/?\w[-\w_.]*\.(?:[-\w_.]+/?\w[-\w_.]*\.)?[a-z]+(?:/[-\w_.%]+)*\b(?!\.))", # noqa: E501
"",
text,
)

View File

@@ -45,7 +45,10 @@ class TextToSpeechProvider:
thread.start()
def __repr__(self):
return f"{self.__class__.__name__}(provider={self._voice_engine.__class__.__name__})"
return "{class_name}(provider={voice_engine_name})".format(
class_name=self.__class__.__name__,
voice_engine_name=self._voice_engine.__class__.__name__,
)
@staticmethod
def _get_voice_engine(config: TTSConfig) -> tuple[VoiceBase, VoiceBase]:

View File

@@ -8,12 +8,14 @@ T = TypeVar("T")
def validate_url(func: Callable[P, T]) -> Callable[P, T]:
"""The method decorator validate_url is used to validate urls for any command that requires
a url as an argument"""
"""
The method decorator validate_url is used to validate urls for any command that
requires a url as an argument.
"""
@functools.wraps(func)
def wrapper(url: str, *args, **kwargs) -> Any:
"""Check if the URL is valid using a basic check, urllib check, and local file check
"""Check if the URL is valid and not a local file accessor.
Args:
url (str): The URL to check

View File

@@ -21,7 +21,7 @@ def install_plugin_dependencies():
"""
plugins_dir = Path(os.getenv("PLUGINS_DIR", "plugins"))
logger.debug(f"Checking for dependencies in zipped plugins...")
logger.debug("Checking for dependencies in zipped plugins...")
# Install zip-based plugins
for plugin_archive in plugins_dir.glob("*.zip"):
@@ -49,7 +49,7 @@ def install_plugin_dependencies():
os.remove(extracted)
os.rmdir(os.path.join(plugins_dir, basedir))
logger.debug(f"Checking for dependencies in other plugin folders...")
logger.debug("Checking for dependencies in other plugin folders...")
# Install directory-based plugins
for requirements_file in glob(f"{plugins_dir}/*/requirements.txt"):

View File

@@ -51,7 +51,10 @@ def workspace(workspace_root: Path) -> FileWorkspace:
@pytest.fixture
def temp_plugins_config_file():
"""Create a plugins_config.yaml file in a temp directory so that it doesn't mess with existing ones"""
"""
Create a plugins_config.yaml file in a temp directory
so that it doesn't mess with existing ones.
"""
config_directory = TemporaryDirectory()
config_file = Path(config_directory.name) / "plugins_config.yaml"
with open(config_file, "w+") as f:

View File

@@ -68,7 +68,7 @@ def test_json_memory_clear(config: Config, memory_item: MemoryItem):
def test_json_memory_get(config: Config, memory_item: MemoryItem, mock_get_embedding):
index = JSONFileMemory(config)
assert (
index.get("test", config) == None
index.get("test", config) is None
), "Cannot test get() because initial index is not empty"
index.add(memory_item)

View File

@@ -66,15 +66,16 @@ def test_execute_python_code(random_code: str, random_string: str, agent: Agent)
def test_execute_python_file_invalid(agent: Agent):
with pytest.raises(InvalidArgumentError):
sut.execute_python_file("not_python", agent)
sut.execute_python_file(Path("not_python.txt"), agent)
def test_execute_python_file_not_found(agent: Agent):
with pytest.raises(
FileNotFoundError,
match=r"python: can't open file '([a-zA-Z]:)?[/\\\-\w]*notexist.py': \[Errno 2\] No such file or directory",
match=r"python: can't open file '([a-zA-Z]:)?[/\\\-\w]*notexist.py': "
r"\[Errno 2\] No such file or directory",
):
sut.execute_python_file("notexist.py", agent)
sut.execute_python_file(Path("notexist.py"), agent)
def test_execute_shell(random_string: str, agent: Agent):

View File

@@ -29,7 +29,8 @@ def test_dalle(agent: Agent, workspace, image_size, patched_api_requestor):
@pytest.mark.xfail(
reason="The image is too big to be put in a cassette for a CI pipeline. We're looking into a solution."
reason="The image is too big to be put in a cassette for a CI pipeline. "
"We're looking into a solution."
)
@pytest.mark.requires_huggingface_api_key
@pytest.mark.parametrize(
@@ -69,12 +70,14 @@ def test_sd_webui_negative_prompt(agent: Agent, workspace, image_size):
)
# Generate an image with a negative prompt
image_path = lst(gen_image(negative_prompt="horse", filename="negative.jpg"))
image_path = lst(
gen_image(negative_prompt="horse", output_file=Path("negative.jpg"))
)
with Image.open(image_path) as img:
neg_image_hash = hashlib.md5(img.tobytes()).hexdigest()
# Generate an image without a negative prompt
image_path = lst(gen_image(filename="positive.jpg"))
image_path = lst(gen_image(output_file=Path("positive.jpg")))
with Image.open(image_path) as img:
image_hash = hashlib.md5(img.tobytes()).hexdigest()
@@ -83,7 +86,7 @@ def test_sd_webui_negative_prompt(agent: Agent, workspace, image_size):
def lst(txt):
"""Extract the file path from the output of `generate_image()`"""
return Path(txt.split(":", maxsplit=1)[1].strip())
return Path(txt.split(": ", maxsplit=1)[1].strip())
def generate_and_validate(
@@ -108,7 +111,8 @@ def generate_and_validate(
@pytest.mark.parametrize(
"return_text",
[
'{"error":"Model [model] is currently loading","estimated_time": [delay]}', # Delay
# Delay
'{"error":"Model [model] is currently loading","estimated_time": [delay]}',
'{"error":"Model [model] is currently loading"}', # No delay
'{"error:}', # Bad JSON
"", # Bad Image
@@ -154,29 +158,6 @@ def test_huggingface_fail_request_with_delay(
mock_sleep.assert_not_called()
def test_huggingface_fail_request_with_delay(mocker, agent: Agent):
agent.legacy_config.huggingface_api_token = "1"
# Mock requests.post
mock_post = mocker.patch("requests.post")
mock_post.return_value.status_code = 500
mock_post.return_value.ok = False
mock_post.return_value.text = '{"error":"Model CompVis/stable-diffusion-v1-4 is currently loading","estimated_time":0}'
# Mock time.sleep
mock_sleep = mocker.patch("time.sleep")
agent.legacy_config.image_provider = "huggingface"
agent.legacy_config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
result = generate_image("astronaut riding a horse", agent, 512)
assert result == "Error creating image."
# Verify retry was called with delay.
mock_sleep.assert_called_with(0)
def test_huggingface_fail_request_no_delay(mocker, agent: Agent):
agent.legacy_config.huggingface_api_token = "1"
@@ -245,7 +226,7 @@ def test_huggingface_fail_missing_api_token(mocker, agent: Agent):
agent.legacy_config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
# Mock requests.post to raise ValueError
mock_post = mocker.patch("requests.post", side_effect=ValueError)
mocker.patch("requests.post", side_effect=ValueError)
# Verify request raises an error.
with pytest.raises(ValueError):

View File

@@ -1,16 +1,29 @@
from autogpt.command_decorator import command
from autogpt.core.utils.json_schema import JSONSchema
COMMAND_CATEGORY = "mock"
@command(
"function_based",
"function_based_cmd",
"Function-based test command",
{
"arg1": {"type": "int", "description": "arg 1", "required": True},
"arg2": {"type": "str", "description": "arg 2", "required": True},
"arg1": JSONSchema(
type=JSONSchema.Type.INTEGER,
description="arg 1",
required=True,
),
"arg2": JSONSchema(
type=JSONSchema.Type.STRING,
description="arg 2",
required=True,
),
},
)
def function_based(arg1: int, arg2: str) -> str:
"""A function-based test command that returns a string with the two arguments separated by a dash."""
def function_based_cmd(arg1: int, arg2: str) -> str:
"""A function-based test command.
Returns:
str: the two arguments separated by a dash.
"""
return f"{arg1} - {arg2}"

View File

@@ -1,113 +0,0 @@
import pytest
from autogpt.json_utils.json_fix_llm import fix_and_parse_json
def test_valid_json():
"""Test that a valid JSON string is parsed correctly."""
json_str = '{"name": "John", "age": 30, "city": "New York"}'
obj = fix_and_parse_json(json_str)
assert obj == {"name": "John", "age": 30, "city": "New York"}
def test_invalid_json_minor():
"""Test that an invalid JSON string can be fixed with gpt."""
json_str = '{"name": "John", "age": 30, "city": "New York",}'
assert fix_and_parse_json(json_str, try_to_fix_with_gpt=False) == {
"name": "John",
"age": 30,
"city": "New York",
}
def test_invalid_json_major_with_gpt():
"""Test that an invalid JSON string raises an error when try_to_fix_with_gpt is False."""
json_str = 'BEGIN: "name": "John" - "age": 30 - "city": "New York" :END'
assert fix_and_parse_json(json_str, try_to_fix_with_gpt=True) == {
"name": "John",
"age": 30,
"city": "New York",
}
def test_invalid_json_major_without_gpt():
"""Test that a REALLY invalid JSON string raises an error when try_to_fix_with_gpt is False."""
json_str = 'BEGIN: "name": "John" - "age": 30 - "city": "New York" :END'
# Assert that this raises an exception:
with pytest.raises(Exception):
fix_and_parse_json(json_str, try_to_fix_with_gpt=False)
def test_invalid_json_leading_sentence_with_gpt():
"""Test that a REALLY invalid JSON string raises an error when try_to_fix_with_gpt is False."""
json_str = """I suggest we start by browsing the repository to find any issues that we can fix.
{
"command": {
"name": "browse_website",
"args":{
"url": "https://github.com/Significant-Gravitas/AutoGPT"
}
},
"thoughts":
{
"text": "I suggest we start browsing the repository to find any issues that we can fix.",
"reasoning": "Browsing the repository will give us an idea of the current state of the codebase and identify any issues that we can address to improve the repo.",
"plan": "- Look through the repository to find any issues.\n- Investigate any issues to determine what needs to be fixed\n- Identify possible solutions to fix the issues\n- Open Pull Requests with fixes",
"criticism": "I should be careful while browsing so as not to accidentally introduce any new bugs or issues.",
"speak": "I will start browsing the repository to find any issues we can fix."
}
}"""
good_obj = {
"command": {
"name": "browse_website",
"args": {"url": "https://github.com/Significant-Gravitas/AutoGPT"},
},
"thoughts": {
"text": "I suggest we start browsing the repository to find any issues that we can fix.",
"reasoning": "Browsing the repository will give us an idea of the current state of the codebase and identify any issues that we can address to improve the repo.",
"plan": "- Look through the repository to find any issues.\n- Investigate any issues to determine what needs to be fixed\n- Identify possible solutions to fix the issues\n- Open Pull Requests with fixes",
"criticism": "I should be careful while browsing so as not to accidentally introduce any new bugs or issues.",
"speak": "I will start browsing the repository to find any issues we can fix.",
},
}
# Assert that this raises an exception:
assert fix_and_parse_json(json_str, try_to_fix_with_gpt=False) == good_obj
def test_invalid_json_leading_sentence_with_gpt(self):
"""Test that a REALLY invalid JSON string raises an error when try_to_fix_with_gpt is False."""
json_str = """I will first need to browse the repository (https://github.com/Significant-Gravitas/AutoGPT) and identify any potential bugs that need fixing. I will use the "browse_website" command for this.
{
"command": {
"name": "browse_website",
"args":{
"url": "https://github.com/Significant-Gravitas/AutoGPT"
}
},
"thoughts":
{
"text": "Browsing the repository to identify potential bugs",
"reasoning": "Before fixing bugs, I need to identify what needs fixing. I will use the 'browse_website' command to analyze the repository.",
"plan": "- Analyze the repository for potential bugs and areas of improvement",
"criticism": "I need to ensure I am thorough and pay attention to detail while browsing the repository.",
"speak": "I am browsing the repository to identify potential bugs."
}
}"""
good_obj = {
"command": {
"name": "browse_website",
"args": {"url": "https://github.com/Significant-Gravitas/AutoGPT"},
},
"thoughts": {
"text": "Browsing the repository to identify potential bugs",
"reasoning": "Before fixing bugs, I need to identify what needs fixing. I will use the 'browse_website' command to analyze the repository.",
"plan": "- Analyze the repository for potential bugs and areas of improvement",
"criticism": "I need to ensure I am thorough and pay attention to detail while browsing the repository.",
"speak": "I am browsing the repository to identify potential bugs.",
},
}
assert fix_and_parse_json(json_str, try_to_fix_with_gpt=False) == good_obj

View File

@@ -73,7 +73,7 @@ def test_dummy_plugin_default_methods(dummy_plugin):
assert isinstance(pre_command, tuple)
assert len(pre_command) == 2
assert pre_command[0] == "evolve"
assert pre_command[1]["continuously"] == True
assert pre_command[1]["continuously"] is True
post_command = dummy_plugin.post_command("evolve", "upgraded successfully!")
assert isinstance(post_command, str)
assert post_command == "upgraded successfully!"

View File

@@ -201,10 +201,11 @@ def test_import_mock_commands_module():
registry.import_command_module(mock_commands_module)
assert "function_based" in registry
assert registry.commands["function_based"].name == "function_based"
assert "function_based_cmd" in registry
assert registry.commands["function_based_cmd"].name == "function_based_cmd"
assert (
registry.commands["function_based"].description == "Function-based test command"
registry.commands["function_based_cmd"].description
== "Function-based test command"
)
@@ -230,8 +231,9 @@ def test_import_temp_command_file_module(tmp_path: Path):
# Remove the temp directory from sys.path
sys.path.remove(str(tmp_path))
assert "function_based" in registry
assert registry.commands["function_based"].name == "function_based"
assert "function_based_cmd" in registry
assert registry.commands["function_based_cmd"].name == "function_based_cmd"
assert (
registry.commands["function_based"].description == "Function-based test command"
registry.commands["function_based_cmd"].description
== "Function-based test command"
)

View File

@@ -1,7 +1,3 @@
"""
This set of unit tests is designed to test the file operations that autoGPT has access to.
"""
import hashlib
import os
import re
@@ -67,7 +63,7 @@ def test_file_with_content_path(test_file: TextIOWrapper, file_content, agent: A
test_file.write(file_content)
test_file.close()
file_ops.log_operation(
"write", test_file.name, agent, file_ops.text_checksum(file_content)
"write", Path(test_file.name), agent, file_ops.text_checksum(file_content)
)
return Path(test_file.name)
@@ -136,42 +132,46 @@ def test_is_duplicate_operation(agent: Agent, mocker: MockerFixture):
# Test cases with write operations
assert (
file_ops.is_duplicate_operation(
"write", "path/to/file1.txt", agent, "checksum1"
"write", Path("path/to/file1.txt"), agent, "checksum1"
)
is True
)
assert (
file_ops.is_duplicate_operation(
"write", "path/to/file1.txt", agent, "checksum2"
"write", Path("path/to/file1.txt"), agent, "checksum2"
)
is False
)
assert (
file_ops.is_duplicate_operation(
"write", "path/to/file3.txt", agent, "checksum3"
"write", Path("path/to/file3.txt"), agent, "checksum3"
)
is False
)
# Test cases with append operations
assert (
file_ops.is_duplicate_operation(
"append", "path/to/file1.txt", agent, "checksum1"
"append", Path("path/to/file1.txt"), agent, "checksum1"
)
is False
)
# Test cases with delete operations
assert (
file_ops.is_duplicate_operation("delete", "path/to/file1.txt", agent) is False
file_ops.is_duplicate_operation("delete", Path("path/to/file1.txt"), agent)
is False
)
assert (
file_ops.is_duplicate_operation("delete", Path("path/to/file3.txt"), agent)
is True
)
assert file_ops.is_duplicate_operation("delete", "path/to/file3.txt", agent) is True
# Test logging a file operation
def test_log_operation(agent: Agent):
file_ops.log_operation("log_test", "path/to/test", agent=agent)
file_ops.log_operation("log_test", Path("path/to/test"), agent=agent)
with open(agent.file_manager.file_ops_log_path, "r", encoding="utf-8") as f:
content = f.read()
assert f"log_test: path/to/test\n" in content
assert "log_test: path/to/test\n" in content
def test_text_checksum(file_content: str):
@@ -182,10 +182,12 @@ def test_text_checksum(file_content: str):
def test_log_operation_with_checksum(agent: Agent):
file_ops.log_operation("log_test", "path/to/test", agent=agent, checksum="ABCDEF")
file_ops.log_operation(
"log_test", Path("path/to/test"), agent=agent, checksum="ABCDEF"
)
with open(agent.file_manager.file_ops_log_path, "r", encoding="utf-8") as f:
content = f.read()
assert f"log_test: path/to/test #ABCDEF\n" in content
assert "log_test: path/to/test #ABCDEF\n" in content
def test_read_file(

View File

@@ -26,7 +26,7 @@ def test_clone_auto_gpt_repository(workspace, mock_clone_from, agent: Agent):
assert clone_result == expected_output
mock_clone_from.assert_called_once_with(
url=f"{scheme}{agent.legacy_config.github_username}:{agent.legacy_config.github_api_key}@{repo}",
url=f"{scheme}{agent.legacy_config.github_username}:{agent.legacy_config.github_api_key}@{repo}", # noqa: E501
to_path=clone_path,
)

View File

@@ -7,12 +7,20 @@ from autogpt.logs.utils import remove_color_codes
"raw_text, clean_text",
[
(
"COMMAND = \x1b[36mbrowse_website\x1b[0m ARGUMENTS = \x1b[36m{'url': 'https://www.google.com', 'question': 'What is the capital of France?'}\x1b[0m",
"COMMAND = browse_website ARGUMENTS = {'url': 'https://www.google.com', 'question': 'What is the capital of France?'}",
"COMMAND = \x1b[36mbrowse_website\x1b[0m "
"ARGUMENTS = \x1b[36m{'url': 'https://www.google.com',"
" 'question': 'What is the capital of France?'}\x1b[0m",
"COMMAND = browse_website "
"ARGUMENTS = {'url': 'https://www.google.com',"
" 'question': 'What is the capital of France?'}",
),
(
"{'Schaue dir meine Projekte auf github () an, als auch meine Webseiten': 'https://github.com/Significant-Gravitas/AutoGPT, https://discord.gg/autogpt und https://twitter.com/Auto_GPT'}",
"{'Schaue dir meine Projekte auf github () an, als auch meine Webseiten': 'https://github.com/Significant-Gravitas/AutoGPT, https://discord.gg/autogpt und https://twitter.com/Auto_GPT'}",
"{'Schaue dir meine Projekte auf github () an, als auch meine Webseiten': "
"'https://github.com/Significant-Gravitas/AutoGPT,"
" https://discord.gg/autogpt und https://twitter.com/Auto_GPT'}",
"{'Schaue dir meine Projekte auf github () an, als auch meine Webseiten': "
"'https://github.com/Significant-Gravitas/AutoGPT,"
" https://discord.gg/autogpt und https://twitter.com/Auto_GPT'}",
),
("", ""),
("hello", "hello"),

View File

@@ -65,7 +65,10 @@ def test_inspect_zip_for_modules():
def test_create_base_config(config: Config):
"""Test the backwards-compatibility shim to convert old plugin allow/deny list to a config file"""
"""
Test the backwards-compatibility shim to convert old plugin allow/deny list
to a config file.
"""
config.plugins_allowlist = ["a", "b"]
config.plugins_denylist = ["c", "d"]
@@ -96,7 +99,9 @@ def test_create_base_config(config: Config):
def test_load_config(config: Config):
"""Test that the plugin config is loaded correctly from the plugins_config.yaml file"""
"""
Test that the plugin config is loaded correctly from the plugins_config.yaml file.
"""
# Create a test config and write it to disk
test_config = {
"a": {"enabled": True, "config": {"api_key": "1234"}},

View File

@@ -1,29 +1,7 @@
# Generated by CodiumAI
import time
from autogpt.app.spinner import Spinner
"""
Code Analysis
Main functionalities:
The Spinner class provides a simple way to display a spinning animation while a process is running. It can be used to indicate that a process is ongoing and to provide visual feedback to the user. The class can be used as a context manager, which means that it can be used with the 'with' statement to automatically start and stop the spinner animation.
Methods:
- __init__(self, message: str = "Loading...", delay: float = 0.1) -> None: Initializes the Spinner class with a message to display and a delay between each spinner update.
- spin(self) -> None: Spins the spinner animation while the process is running.
- __enter__(self): Starts the spinner animation when used as a context manager.
- __exit__(self, exc_type, exc_value, exc_traceback) -> None: Stops the spinner animation when used as a context manager.
- update_message(self, new_message, delay=0.1): Updates the message displayed by the spinner animation.
Fields:
- spinner: An itertools.cycle object that contains the characters used for the spinner animation.
- delay: The delay between each spinner update.
- message: The message to display.
- running: A boolean value that indicates whether the spinner animation is running.
- spinner_thread: A threading.Thread object that runs the spin method in a separate thread.
"""
ALMOST_DONE_MESSAGE = "Almost done..."
PLEASE_WAIT = "Please wait..."

View File

@@ -39,7 +39,8 @@ def mock_pdf_file():
# Write the page object
f.write(b"2 0 obj\n")
f.write(
b"<< /Type /Page /Parent 1 0 R /Resources << /Font << /F1 3 0 R >> >> /MediaBox [0 0 612 792] /Contents 4 0 R >>\n"
b"<< /Type /Page /Parent 1 0 R /Resources << /Font << /F1 3 0 R >> >> "
b"/MediaBox [0 0 612 792] /Contents 4 0 R >>\n"
)
f.write(b"endobj\n")
# Write the font object
@@ -103,7 +104,10 @@ def mock_yaml_file():
def mock_html_file():
html = BeautifulSoup(
f"<html><head><title>This is a test</title></head><body><p>{plain_text_str}</p></body></html>",
"<html>"
"<head><title>This is a test</title></head>"
f"<body><p>{plain_text_str}</p></body>"
"</html>",
"html.parser",
)
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".html") as f:
@@ -119,7 +123,12 @@ def mock_md_file():
def mock_latex_file():
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".tex") as f:
latex_str = rf"\documentclass{{article}}\begin{{document}}{plain_text_str}\end{{document}}"
latex_str = (
r"\documentclass{article}"
r"\begin{document}"
f"{plain_text_str}"
r"\end{document}"
)
f.write(latex_str)
return f.name

View File

@@ -3,33 +3,6 @@ from pytest import raises
from autogpt.url_utils.validators import validate_url
"""
Code Analysis
Objective:
The objective of the 'validate_url' function is to validate URLs for any command that requires a URL as an argument. It checks if the URL is valid using a basic check, urllib check, and local file check. If the URL fails any of the validation tests, it raises a ValueError.
Inputs:
- func: A callable function that takes in any number of arguments and returns any type of output.
Flow:
- The 'validate_url' function takes in a callable function as an argument.
- It defines a wrapper function that takes in a URL and any number of arguments and keyword arguments.
- The wrapper function first checks if the URL starts with "http://" or "https://". If not, it raises a ValueError with the message "Invalid URL format".
- It then checks if the URL is valid using the 'is_valid_url' function. If not, it raises a ValueError with the message "Missing Scheme or Network location".
- It then checks if the URL is a local file using the 'check_local_file_access' function. If it is, it raises a ValueError with the message "Access to local files is restricted".
- If the URL passes all the validation tests, it sanitizes the URL using the 'sanitize_url' function and calls the original function with the sanitized URL and any other arguments and keyword arguments.
- The wrapper function returns the result of the original function.
Outputs:
- The 'validate_url' function returns the wrapper function that takes in a URL and any number of arguments and keyword arguments and returns the result of the original function.
Additional aspects:
- The 'validate_url' function uses the 'functools.wraps' decorator to preserve the original function's metadata, such as its name, docstring, and annotations.
- The 'validate_url' function uses the 'urlparse' function from the 'urllib.parse' module to parse the URL and extract its components.
- The 'validate_url' function uses the 'urljoin' function from the 'requests.compat' module to join the sanitized URL components back into a URL string.
"""
@validate_url
def dummy_method(url):
@@ -77,89 +50,108 @@ def test_url_validation_fails_local_path(url):
dummy_method(url)
class TestValidateUrl:
# Tests that the function successfully validates a valid URL with http:// or https:// prefix.
def test_happy_path_valid_url(self):
"""Test that the function successfully validates a valid URL with http:// or https:// prefix"""
def test_happy_path_valid_url():
"""
Test that the function successfully validates a valid URL with `http://` or
`https://` prefix.
"""
@validate_url
def test_func(url):
return url
@validate_url
def test_func(url):
return url
assert test_func("https://www.google.com") == "https://www.google.com"
assert test_func("http://www.google.com") == "http://www.google.com"
assert test_func("https://www.google.com") == "https://www.google.com"
assert test_func("http://www.google.com") == "http://www.google.com"
# Tests that the function successfully validates a valid URL with additional path, parameters, and query string.
def test_general_behavior_additional_path_parameters_query_string(self):
"""Test that the function successfully validates a valid URL with additional path, parameters, and query string"""
@validate_url
def test_func(url):
return url
def test_general_behavior_additional_path_parameters_query_string():
"""
Test that the function successfully validates a valid URL with additional path,
parameters, and query string.
"""
assert (
test_func("https://www.google.com/search?q=python")
== "https://www.google.com/search?q=python"
)
@validate_url
def test_func(url):
return url
# Tests that the function raises a ValueError if the URL is missing scheme or network location.
def test_edge_case_missing_scheme_or_network_location(self):
"""Test that the function raises a ValueError if the URL is missing scheme or network location"""
assert (
test_func("https://www.google.com/search?q=python")
== "https://www.google.com/search?q=python"
)
@validate_url
def test_func(url):
return url
with pytest.raises(ValueError):
test_func("www.google.com")
def test_edge_case_missing_scheme_or_network_location():
"""
Test that the function raises a ValueError if the URL is missing scheme or
network location.
"""
# Tests that the function raises a ValueError if the URL has local file access.
def test_edge_case_local_file_access(self):
"""Test that the function raises a ValueError if the URL has local file access"""
@validate_url
def test_func(url):
return url
@validate_url
def test_func(url):
return url
with pytest.raises(ValueError):
test_func("www.google.com")
with pytest.raises(ValueError):
test_func("file:///etc/passwd")
# Tests that the function sanitizes the URL by removing any unnecessary components.
def test_general_behavior_sanitizes_url(self):
"""Test that the function sanitizes the URL by removing any unnecessary components"""
def test_edge_case_local_file_access():
"""Test that the function raises a ValueError if the URL has local file access"""
@validate_url
def test_func(url):
return url
@validate_url
def test_func(url):
return url
assert (
test_func("https://www.google.com/search?q=python#top")
== "https://www.google.com/search?q=python"
)
with pytest.raises(ValueError):
test_func("file:///etc/passwd")
# Tests that the function raises a ValueError if the URL has an invalid format (e.g. missing slashes).
def test_general_behavior_invalid_url_format(self):
"""Test that the function raises a ValueError if the URL has an invalid format (e.g. missing slashes)"""
@validate_url
def test_func(url):
return url
def test_general_behavior_sanitizes_url():
"""Test that the function sanitizes the URL by removing unnecessary components"""
with pytest.raises(ValueError):
test_func("https:www.google.com")
@validate_url
def test_func(url):
return url
# Tests that the function can handle URLs that contain unusual but valid characters.
def test_url_with_special_chars(self):
url = "https://example.com/path%20with%20spaces"
assert dummy_method(url) == url
assert (
test_func("https://www.google.com/search?q=python#top")
== "https://www.google.com/search?q=python"
)
# Tests that the function raises a ValueError if the URL is over 2000 characters.
def test_extremely_long_url(self):
url = "http://example.com/" + "a" * 2000
with raises(ValueError, match="URL is too long"):
dummy_method(url)
# Tests that the function can handle internationalized URLs, which contain non-ASCII characters.
def test_internationalized_url(self):
url = "http://例子.测试"
assert dummy_method(url) == url
def test_general_behavior_invalid_url_format():
"""
Test that the function raises a ValueError if the URL has an invalid format
(e.g. missing slashes)
"""
@validate_url
def test_func(url):
return url
with pytest.raises(ValueError):
test_func("https:www.google.com")
def test_url_with_special_chars():
"""
Tests that the function can handle URLs that contain unusual but valid characters.
"""
url = "https://example.com/path%20with%20spaces"
assert dummy_method(url) == url
def test_extremely_long_url():
"""
Tests that the function raises a ValueError if the URL is over 2000 characters.
"""
url = "http://example.com/" + "a" * 2000
with raises(ValueError, match="URL is too long"):
dummy_method(url)
def test_internationalized_url():
"""
Tests that the function can handle internationalized URLs with non-ASCII characters.
"""
url = "http://例子.测试"
assert dummy_method(url) == url

View File

@@ -18,11 +18,15 @@ from tests.utils import skip_in_ci
def valid_json_response() -> dict:
return {
"thoughts": {
"text": "My task is complete. I will use the 'task_complete' command to shut down.",
"reasoning": "I will use the 'task_complete' command because it allows me to shut down and signal that my task is complete.",
"plan": "I will use the 'task_complete' command with the reason 'Task complete: retrieved Tesla's revenue in 2022.' to shut down.",
"criticism": "I need to ensure that I have completed all necessary tasks before shutting down.",
"speak": "",
"text": "My task is complete. I will use the 'task_complete' command "
"to shut down.",
"reasoning": "I will use the 'task_complete' command because it allows me "
"to shut down and signal that my task is complete.",
"plan": "I will use the 'task_complete' command with the reason "
"'Task complete: retrieved Tesla's revenue in 2022.' to shut down.",
"criticism": "I need to ensure that I have completed all necessary tasks "
"before shutting down.",
"speak": "All done!",
},
"command": {
"name": "task_complete",
@@ -35,10 +39,14 @@ def valid_json_response() -> dict:
def invalid_json_response() -> dict:
return {
"thoughts": {
"text": "My task is complete. I will use the 'task_complete' command to shut down.",
"reasoning": "I will use the 'task_complete' command because it allows me to shut down and signal that my task is complete.",
"plan": "I will use the 'task_complete' command with the reason 'Task complete: retrieved Tesla's revenue in 2022.' to shut down.",
"criticism": "I need to ensure that I have completed all necessary tasks before shutting down.",
"text": "My task is complete. I will use the 'task_complete' command "
"to shut down.",
"reasoning": "I will use the 'task_complete' command because it allows me "
"to shut down and signal that my task is complete.",
"plan": "I will use the 'task_complete' command with the reason "
"'Task complete: retrieved Tesla's revenue in 2022.' to shut down.",
"criticism": "I need to ensure that I have completed all necessary tasks "
"before shutting down.",
"speak": "",
},
"command": {"name": "", "args": {}},
@@ -51,27 +59,32 @@ def test_validate_yaml_file_valid():
result, message = validate_yaml_file("valid_test_file.yaml")
os.remove("valid_test_file.yaml")
assert result == True
assert result is True
assert "Successfully validated" in message
def test_validate_yaml_file_not_found():
result, message = validate_yaml_file("non_existent_file.yaml")
assert result == False
assert result is False
assert "wasn't found" in message
def test_validate_yaml_file_invalid():
with open("invalid_test_file.yaml", "w") as f:
f.write(
"settings:\n first_setting: value\n second_setting: value\n nested_setting: value\n third_setting: value\nunindented_setting: value"
"settings:\n"
" first_setting: value\n"
" second_setting: value\n"
" nested_setting: value\n"
" third_setting: value\n"
"unindented_setting: value"
)
result, message = validate_yaml_file("invalid_test_file.yaml")
os.remove("invalid_test_file.yaml")
print(result)
print(message)
assert result == False
assert result is False
assert "There was an issue while trying to read" in message
@@ -85,7 +98,7 @@ def test_get_bulletin_from_web_success(mock_get):
assert expected_content in bulletin
mock_get.assert_called_with(
"https://raw.githubusercontent.com/Significant-Gravitas/AutoGPT/master/autogpts/autogpt/BULLETIN.md"
"https://raw.githubusercontent.com/Significant-Gravitas/AutoGPT/master/autogpts/autogpt/BULLETIN.md" # noqa: E501
)
@@ -121,7 +134,7 @@ def test_get_latest_bulletin_with_file():
with patch("autogpt.app.utils.get_bulletin_from_web", return_value=""):
bulletin, is_new = get_latest_bulletin()
assert expected_content in bulletin
assert is_new == False
assert is_new is False
os.remove("data/CURRENT_BULLETIN.md")
@@ -152,7 +165,7 @@ def test_get_latest_bulletin_new_bulletin_same_as_old_bulletin():
):
bulletin, is_new = get_latest_bulletin()
assert expected_content in bulletin
assert is_new == False
assert is_new is False
os.remove("data/CURRENT_BULLETIN.md")
@@ -160,8 +173,6 @@ def test_get_latest_bulletin_new_bulletin_same_as_old_bulletin():
@skip_in_ci
def test_get_current_git_branch():
branch_name = get_current_git_branch()
# Assuming that the branch name will be non-empty if the function is working correctly.
assert branch_name != ""

View File

@@ -4,6 +4,7 @@ import os
import re
from io import BytesIO
from typing import Any, Dict, List
from urllib.parse import urlparse, urlunparse
from vcr.request import Request
@@ -49,7 +50,7 @@ def freeze_request_body(json_body: str | bytes) -> bytes:
try:
body = json.loads(json_body)
except ValueError:
return json_body if type(json_body) == bytes else json_body.encode()
return json_body if type(json_body) is bytes else json_body.encode()
if "messages" not in body:
return json.dumps(body, sort_keys=True).encode()
@@ -98,9 +99,6 @@ def before_record_request(request: Request) -> Request | None:
return filtered_request_without_dynamic_data
from urllib.parse import urlparse, urlunparse
def replace_request_hostname(
request: Request, original_url: str, new_hostname: str
) -> Request: