mirror of
https://github.com/aljazceru/Auto-GPT.git
synced 2026-02-09 08:14:27 +01:00
Fix and consolidate command workspace resolution
This commit is contained in:
@@ -2,15 +2,13 @@ import requests
|
||||
import json
|
||||
|
||||
from autogpt.config import Config
|
||||
from autogpt.commands.file_operations import safe_join
|
||||
from autogpt.workspace import path_in_workspace
|
||||
|
||||
cfg = Config()
|
||||
|
||||
working_directory = "auto_gpt_workspace"
|
||||
|
||||
|
||||
def read_audio_from_file(audio_path):
|
||||
audio_path = safe_join(working_directory, audio_path)
|
||||
audio_path = path_in_workspace(audio_path)
|
||||
with open(audio_path, "rb") as audio_file:
|
||||
audio = audio_file.read()
|
||||
return read_audio(audio)
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
"""Execute code in a Docker container"""
|
||||
import os
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
|
||||
import docker
|
||||
from docker.errors import ImageNotFound
|
||||
|
||||
WORKING_DIRECTORY = Path(__file__).parent.parent / "auto_gpt_workspace"
|
||||
from autogpt.workspace import path_in_workspace, WORKSPACE_PATH
|
||||
|
||||
|
||||
def execute_python_file(file: str):
|
||||
@@ -19,12 +18,12 @@ def execute_python_file(file: str):
|
||||
str: The output of the file
|
||||
"""
|
||||
|
||||
print(f"Executing file '{file}' in workspace '{WORKING_DIRECTORY}'")
|
||||
print(f"Executing file '{file}' in workspace '{WORKSPACE_PATH}'")
|
||||
|
||||
if not file.endswith(".py"):
|
||||
return "Error: Invalid file type. Only .py files are allowed."
|
||||
|
||||
file_path = os.path.join(WORKING_DIRECTORY, file)
|
||||
file_path = path_in_workspace(file)
|
||||
|
||||
if not os.path.isfile(file_path):
|
||||
return f"Error: File '{file}' does not exist."
|
||||
@@ -65,7 +64,7 @@ def execute_python_file(file: str):
|
||||
image_name,
|
||||
f"python {file}",
|
||||
volumes={
|
||||
os.path.abspath(WORKING_DIRECTORY): {
|
||||
os.path.abspath(WORKSPACE_PATH): {
|
||||
"bind": "/workspace",
|
||||
"mode": "ro",
|
||||
}
|
||||
@@ -100,9 +99,8 @@ def execute_shell(command_line: str) -> str:
|
||||
"""
|
||||
current_dir = os.getcwd()
|
||||
# Change dir into workspace if necessary
|
||||
if str(WORKING_DIRECTORY) not in current_dir:
|
||||
work_dir = os.path.join(os.getcwd(), WORKING_DIRECTORY)
|
||||
os.chdir(work_dir)
|
||||
if str(WORKSPACE_PATH) not in current_dir:
|
||||
os.chdir(WORKSPACE_PATH)
|
||||
|
||||
print(f"Executing command '{command_line}' in working directory '{os.getcwd()}'")
|
||||
|
||||
|
||||
@@ -1,19 +1,11 @@
|
||||
"""File operations for AutoGPT"""
|
||||
import os
|
||||
import os.path
|
||||
from pathlib import Path
|
||||
from autogpt.workspace import path_in_workspace, WORKSPACE_PATH
|
||||
from typing import Generator, List
|
||||
|
||||
# Set a dedicated folder for file I/O
|
||||
WORKING_DIRECTORY = Path(os.getcwd()) / "auto_gpt_workspace"
|
||||
|
||||
# Create the directory if it doesn't exist
|
||||
if not os.path.exists(WORKING_DIRECTORY):
|
||||
os.makedirs(WORKING_DIRECTORY)
|
||||
|
||||
LOG_FILE = "file_logger.txt"
|
||||
LOG_FILE_PATH = WORKING_DIRECTORY / LOG_FILE
|
||||
WORKING_DIRECTORY = str(WORKING_DIRECTORY)
|
||||
LOG_FILE_PATH = WORKSPACE_PATH / LOG_FILE
|
||||
|
||||
|
||||
def check_duplicate_operation(operation: str, filename: str) -> bool:
|
||||
@@ -48,25 +40,6 @@ def log_operation(operation: str, filename: str) -> None:
|
||||
append_to_file(LOG_FILE, log_entry, shouldLog = False)
|
||||
|
||||
|
||||
def safe_join(base: str, *paths) -> str:
|
||||
"""Join one or more path components intelligently.
|
||||
|
||||
Args:
|
||||
base (str): The base path
|
||||
*paths (str): The paths to join to the base path
|
||||
|
||||
Returns:
|
||||
str: The joined path
|
||||
"""
|
||||
new_path = os.path.join(base, *paths)
|
||||
norm_new_path = os.path.normpath(new_path)
|
||||
|
||||
if os.path.commonprefix([base, norm_new_path]) != base:
|
||||
raise ValueError("Attempted to access outside of working directory.")
|
||||
|
||||
return norm_new_path
|
||||
|
||||
|
||||
def split_file(
|
||||
content: str, max_length: int = 4000, overlap: int = 0
|
||||
) -> Generator[str, None, None]:
|
||||
@@ -104,7 +77,7 @@ def read_file(filename: str) -> str:
|
||||
str: The contents of the file
|
||||
"""
|
||||
try:
|
||||
filepath = safe_join(WORKING_DIRECTORY, filename)
|
||||
filepath = path_in_workspace(filename)
|
||||
with open(filepath, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
return content
|
||||
@@ -159,7 +132,7 @@ def write_to_file(filename: str, text: str) -> str:
|
||||
if check_duplicate_operation("write", filename):
|
||||
return "Error: File has already been updated."
|
||||
try:
|
||||
filepath = safe_join(WORKING_DIRECTORY, filename)
|
||||
filepath = path_in_workspace(filename)
|
||||
directory = os.path.dirname(filepath)
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
@@ -182,7 +155,7 @@ def append_to_file(filename: str, text: str, shouldLog: bool = True) -> str:
|
||||
str: A message indicating success or failure
|
||||
"""
|
||||
try:
|
||||
filepath = safe_join(WORKING_DIRECTORY, filename)
|
||||
filepath = path_in_workspace(filename)
|
||||
with open(filepath, "a") as f:
|
||||
f.write(text)
|
||||
|
||||
@@ -206,7 +179,7 @@ def delete_file(filename: str) -> str:
|
||||
if check_duplicate_operation("delete", filename):
|
||||
return "Error: File has already been deleted."
|
||||
try:
|
||||
filepath = safe_join(WORKING_DIRECTORY, filename)
|
||||
filepath = path_in_workspace(filename)
|
||||
os.remove(filepath)
|
||||
log_operation("delete", filename)
|
||||
return "File deleted successfully."
|
||||
@@ -226,15 +199,15 @@ def search_files(directory: str) -> List[str]:
|
||||
found_files = []
|
||||
|
||||
if directory in {"", "/"}:
|
||||
search_directory = WORKING_DIRECTORY
|
||||
search_directory = WORKSPACE_PATH
|
||||
else:
|
||||
search_directory = safe_join(WORKING_DIRECTORY, directory)
|
||||
search_directory = path_in_workspace(directory)
|
||||
|
||||
for root, _, files in os.walk(search_directory):
|
||||
for file in files:
|
||||
if file.startswith("."):
|
||||
continue
|
||||
relative_path = os.path.relpath(os.path.join(root, file), WORKING_DIRECTORY)
|
||||
relative_path = os.path.relpath(os.path.join(root, file), WORKSPACE_PATH)
|
||||
found_files.append(relative_path)
|
||||
|
||||
return found_files
|
||||
|
||||
@@ -7,13 +7,11 @@ from base64 import b64decode
|
||||
import openai
|
||||
import requests
|
||||
from PIL import Image
|
||||
from pathlib import Path
|
||||
from autogpt.config import Config
|
||||
from autogpt.workspace import path_in_workspace
|
||||
|
||||
CFG = Config()
|
||||
|
||||
WORKING_DIRECTORY = Path(__file__).parent.parent / "auto_gpt_workspace"
|
||||
|
||||
|
||||
def generate_image(prompt: str) -> str:
|
||||
"""Generate an image from a prompt.
|
||||
@@ -65,7 +63,7 @@ def generate_image_with_hf(prompt: str, filename: str) -> str:
|
||||
image = Image.open(io.BytesIO(response.content))
|
||||
print(f"Image Generated for prompt:{prompt}")
|
||||
|
||||
image.save(os.path.join(WORKING_DIRECTORY, filename))
|
||||
image.save(path_in_workspace(filename))
|
||||
|
||||
return f"Saved to disk:{filename}"
|
||||
|
||||
@@ -93,7 +91,7 @@ def generate_image_with_dalle(prompt: str, filename: str) -> str:
|
||||
|
||||
image_data = b64decode(response["data"][0]["b64_json"])
|
||||
|
||||
with open(f"{WORKING_DIRECTORY}/{filename}", mode="wb") as png:
|
||||
with open(path_in_workspace(filename), mode="wb") as png:
|
||||
png.write(image_data)
|
||||
|
||||
return f"Saved to disk:{filename}"
|
||||
|
||||
39
autogpt/workspace.py
Normal file
39
autogpt/workspace.py
Normal file
@@ -0,0 +1,39 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Set a dedicated folder for file I/O
|
||||
WORKSPACE_PATH = Path(os.getcwd()) / "auto_gpt_workspace"
|
||||
|
||||
# Create the directory if it doesn't exist
|
||||
if not os.path.exists(WORKSPACE_PATH):
|
||||
os.makedirs(WORKSPACE_PATH)
|
||||
|
||||
|
||||
def path_in_workspace(relative_path: str | Path) -> Path:
|
||||
"""Get full path for item in workspace
|
||||
|
||||
Parameters:
|
||||
relative_path (str | Path): Path to translate into the workspace
|
||||
|
||||
Returns:
|
||||
Path: Absolute path for the given path in the workspace
|
||||
"""
|
||||
return safe_path_join(WORKSPACE_PATH, relative_path)
|
||||
|
||||
|
||||
def safe_path_join(base: Path, *paths: str | Path) -> Path:
|
||||
"""Join one or more path components, asserting the resulting path is within the workspace.
|
||||
|
||||
Args:
|
||||
base (Path): The base path
|
||||
*paths (str): The paths to join to the base path
|
||||
|
||||
Returns:
|
||||
Path: The joined path
|
||||
"""
|
||||
joined_path = base.joinpath(*paths).resolve()
|
||||
|
||||
if not joined_path.is_relative_to(base):
|
||||
raise ValueError(f"Attempted to access path '{joined_path}' outside of working directory '{base}'.")
|
||||
|
||||
return joined_path
|
||||
Reference in New Issue
Block a user