feat: reduce tool entrypoints in synopsis for text editor, bash, process manager (#191)

This commit is contained in:
Salman Mohammed
2024-10-29 18:21:02 -04:00
committed by GitHub
parent 1c128e7316
commit 40da95b3e9
7 changed files with 464 additions and 248 deletions

View File

@@ -0,0 +1,49 @@
import os
from pathlib import Path
from goose.notifier import Notifier
from goose.view import ExchangeView
from goose.synopsis.system import system
from goose.utils.shell import shell
from goose.synopsis.util import log_command
class Bash:
def __init__(self, notifier: Notifier, exchange_view: ExchangeView) -> None:
self.notifier = notifier
self.exchange_view = exchange_view
def _logshell(self, command: str, title: str = "shell") -> None:
log_command(self.notifier, command, path=os.path.abspath(system.cwd), title=title)
def _source(self, path: str) -> str:
"""Source the file at path."""
source_command = f"source {path} && env"
self._logshell(f"source {path}")
result = shell(source_command, self.notifier, self.exchange_view, cwd=system.cwd, env=system.env)
env_vars = dict(line.split("=", 1) for line in result.splitlines() if "=" in line)
system.env.update(env_vars)
return f"Sourced {path}"
def _shell(self, command: str) -> str:
"""Execute any shell command."""
if command.startswith("cat"):
raise ValueError("You must read files through the text_editor tool with 'view' comamnd.")
if command.startswith("cd"):
raise ValueError("You must change dirs through the bash tool with 'working_dir' param.")
if command.startswith("source"):
raise ValueError("You must source files through the bash tool with 'source' command.")
self._logshell(command)
return shell(command, self.notifier, self.exchange_view, cwd=system.cwd, env=system.env)
def _change_dir(self, path: str) -> str:
"""Change the directory to the specified path."""
patho = system.to_patho(path)
if not patho.is_dir():
raise ValueError(f"The directory {path} does not exist")
if patho.resolve() < Path(os.getcwd()).resolve():
raise ValueError("You can cd into subdirs but not above the directory where we started.")
self._logshell(f"cd {path}")
system.cwd = str(patho)
return f"Changed directory to: {path}"

View File

@@ -0,0 +1,95 @@
import subprocess
import os
from typing import Literal, Dict
from rich.markdown import Markdown
from rich.rule import Rule
from goose.notifier import Notifier
from goose.synopsis.system import system
from goose.synopsis.util import log_command
from goose.toolkit.utils import RULEPREFIX, RULESTYLE
from goose.utils.shell import is_dangerous_command, keep_unsafe_command_prompt
ProcessManagerCommand = Literal["start", "list", "view_output", "cancel"]
class ProcessManager:
def __init__(self, notifier: Notifier) -> None:
self.notifier = notifier
# Command dispatch dictionary
self.command_dispatch = {
"start": self._start_process,
"list": self._list_processes,
"view_output": self._view_process_output,
"cancel": self._cancel_process,
}
def _logshell(self, command: str, title: str = "background") -> None:
log_command(self.notifier, command, path=os.path.abspath(system.cwd), title=title)
def _start_process(self, shell_command: str, **kwargs: dict) -> int:
"""Start a background process running the specified command."""
self._logshell(shell_command, title="background")
if is_dangerous_command(shell_command):
self.notifier.stop()
if not keep_unsafe_command_prompt(shell_command):
raise RuntimeError(f"The command {shell_command} was rejected as dangerous.")
self.notifier.start()
process = subprocess.Popen(
shell_command,
shell=True,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
cwd=system.cwd,
env=system.env,
)
process_id = system.add_process(process)
return process_id
def _list_processes(self, **kwargs: dict) -> Dict[int, str]:
"""List all running background processes."""
processes = system.get_processes()
process_list = "```\n" + "\n".join(f"id: {pid}, command: {cmd}" for pid, cmd in processes.items()) + "\n```"
self.notifier.log("")
self.notifier.log(Rule(RULEPREFIX + "processes", style=RULESTYLE, align="left"))
self.notifier.log(Markdown(process_list))
self.notifier.log("")
return processes
def _view_process_output(self, process_id: int, **kwargs: dict) -> str:
"""View the output of a running background process."""
self.notifier.log("")
self.notifier.log(Rule(RULEPREFIX + "processes", style=RULESTYLE, align="left"))
self.notifier.log(Markdown(f"```\nreading {process_id}\n```"))
self.notifier.log("")
output = system.view_process_output(process_id)
return output
def _cancel_process(self, process_id: int, **kwargs: dict) -> str:
"""Cancel the background process with the specified ID."""
result = system.cancel_process(process_id)
self._logshell(f"kill {process_id}")
if result:
return f"Process {process_id} cancelled"
else:
return f"No known process with ID {process_id}"
def run_command(self, command: ProcessManagerCommand, **kwargs: dict) -> str:
"""
Dispatch process management commands.
Args:
command (ProcessManagerCommand): The process management command to execute.
**kwargs: Additional arguments for the commands, such as shell_command or process_id.
Returns:
str: The result of the process management operation.
"""
if command not in self.command_dispatch:
raise ValueError(f"Unknown command '{command}'.")
return self.command_dispatch[command](**kwargs)

View File

@@ -0,0 +1,157 @@
from typing import Optional, Literal
from pathlib import Path
from rich.markdown import Markdown
from rich.rule import Rule
from goose.notifier import Notifier
from goose.synopsis.system import system
from goose.toolkit.utils import RULEPREFIX, RULESTYLE, get_language
TextEditorCommand = Literal["view", "create", "str_replace", "insert", "undo_edit"]
class TextEditor:
def __init__(self, notifier: Notifier) -> None:
self.notifier = notifier
self._file_history = {}
# Command dispatch dictionary
self.command_dispatch = {
"view": self._view_file_or_directory,
"create": self._create_file,
"str_replace": self._replace_string,
"insert": self._insert_string,
"undo_edit": self._undo_edit,
}
def _write_file(self, path: str, content: str) -> str:
"""Write content to the file at path."""
patho = system.to_patho(path)
if patho.exists() and not system.is_active(path):
raise ValueError(f"You must view {path} using read_file before you overwrite it")
self._save_file_history(patho)
patho.parent.mkdir(parents=True, exist_ok=True)
patho.write_text(content)
system.remember_file(path)
language = get_language(path)
self._log_file_operation(path, content, language)
return f"Successfully wrote to {path}"
def _patch_file(self, path: str, before: str, after: str) -> str:
"""Patch the file by replacing 'before' with 'after'."""
patho = system.to_patho(path)
if not patho.exists():
raise ValueError(f"You can't patch {path} - it does not exist yet")
if not system.is_active(path):
raise ValueError(f"You must view {path} using read_file before you patch it")
content = patho.read_text()
if content.count(before) != 1:
raise ValueError("The 'before' content must appear exactly once in the file.")
self._save_file_history(patho)
content = content.replace(before, after)
system.remember_file(path)
patho.write_text(content)
self._log_file_operation(path, f"{before} -> {after}", get_language(path))
return "Successfully replaced before with after."
def _save_file_history(self, patho: Path) -> None:
"""Save the current content of the file to history for undo functionality."""
content = patho.read_text() if patho.exists() else ""
self._file_history[str(patho)] = content
def _undo_edit(self, path: str, **kwargs: dict) -> str:
"""Undo the last edit made to a file."""
patho = system.to_patho(path)
if not patho.exists() or str(patho) not in self._file_history:
raise ValueError(f"No edit history available to undo changes on {path}.")
previous_content = self._file_history.pop(str(patho))
patho.write_text(previous_content)
system.remember_file(path)
self._log_file_operation(path, "Undo edit", get_language(path))
return f"Successfully undid the last edit on {path}"
def _view_file_or_directory(self, path: str, view_range: Optional[list[int]] = None, **kwargs: dict) -> str:
"""View the content of a file or directory."""
patho = system.to_patho(path)
if patho.is_file():
return self._view_file(patho, view_range)
elif patho.is_dir():
return self._view_directory(patho)
else:
raise ValueError(f"The path {path} does not exist.")
def _view_file(self, patho: Path, view_range: Optional[list[int]]) -> str:
if not patho.exists():
raise ValueError(f"The file {patho} does not exist.")
with open(patho, "r") as f:
content = f.readlines()
if view_range:
start_line, end_line = view_range
if start_line < 1 or end_line < start_line:
raise ValueError("Invalid view range.")
content = content[start_line - 1 : (end_line if end_line != -1 else len(content))]
system.remember_file(str(patho))
return f"Displayed content of {str(patho)}"
def _view_directory(self, patho: Path) -> str:
files = [str(p) for p in patho.iterdir()]
dir_content = "\n".join(files)
return f"The contents of directory {str(patho)}:\n{dir_content}"
def _insert_string(self, path: str, insert_line: int, new_str: str, **kwargs: dict) -> str:
"""Insert a string into the file after a specific line number."""
patho = system.to_patho(path)
if not patho.exists() or not system.is_active(path):
raise ValueError(f"You must view {path} before editing.")
self._save_file_history(patho)
with open(patho, "r") as f:
lines = f.readlines()
if insert_line < 0 or insert_line > len(lines):
raise ValueError("Insert line is out of range.")
lines.insert(insert_line, new_str + "\n")
with open(patho, "w") as f:
f.writelines(lines)
system.remember_file(path)
self._log_file_operation(path, new_str, get_language(path))
return f"Successfully inserted new_str into {path} after line {insert_line}"
def _create_file(self, path: str, file_text: str, **kwargs: dict) -> str:
"""Create a new file with the given content."""
return self._write_file(path, file_text)
def _replace_string(self, path: str, old_str: str, new_str: str, **kwargs: dict) -> str:
"""Replace a string in a file."""
return self._patch_file(path, old_str, new_str)
def _log_file_operation(self, path: str, content: str, language: Optional[str]) -> None:
"""Log the file operation in markdown format."""
md_content = f"```{language}\n{content}\n```" if language else f"```\n{content}\n```"
self.notifier.log("")
self.notifier.log(Rule(RULEPREFIX + path, style=RULESTYLE, align="left"))
self.notifier.log(Markdown(md_content))
self.notifier.log("")
def run_command(self, command: TextEditorCommand, path: str, **kwargs: dict) -> str:
"""Dispatch text editing operations to the appropriate handler."""
if command not in self.command_dispatch:
raise ValueError(f"Unknown command '{command}'.")
return self.command_dispatch[command](path, **kwargs)

View File

@@ -1,19 +1,15 @@
# janky global state for now, think about it
from collections import defaultdict
import re
import subprocess
import os
from pathlib import Path
import tempfile
from typing import Dict
from typing import Dict, Optional
from exchange import Message
import httpx
from goose.synopsis.system import system
from goose.synopsis.bash import Bash
from goose.synopsis.text_editor import TextEditor, TextEditorCommand
from goose.synopsis.process_manager import ProcessManager, ProcessManagerCommand
from goose.toolkit.base import Toolkit, tool
from goose.toolkit.utils import RULEPREFIX, RULESTYLE, get_language
from goose.utils.shell import is_dangerous_command, shell, keep_unsafe_command_prompt
from rich.markdown import Markdown
from rich.rule import Rule
class SynopsisDeveloper(Toolkit):
@@ -21,230 +17,138 @@ class SynopsisDeveloper(Toolkit):
def __init__(self, *args: object, **kwargs: Dict[str, object]) -> None:
super().__init__(*args, **kwargs)
self._file_history = defaultdict(list)
def system(self) -> str:
"""Retrieve system configuration details for developer"""
system_prompt = Message.load("developer.md").text
return system_prompt
def logshell(self, command: str, title: str = "shell") -> None:
self.notifier.log("")
self.notifier.log(
Rule(RULEPREFIX + f"{title} | [dim magenta]{os.path.abspath(system.cwd)}[/]", style=RULESTYLE, align="left")
@tool
def bash(
self,
command: Optional[str] = None,
working_dir: Optional[str] = None,
source_path: Optional[str] = None,
) -> str:
"""
Run commands in a bash shell.
Perform bash-related operations in a specific order:
1. Change the working directory (if provided)
2. Source a file (if provided)
3. Run a shell command (if provided)
At least one of the parameters must be provided.
Args:
command (str, optional):The bash shell command to run.
working_dir (str, optional): The directory to change to.
source_path (str, optional): The file to source before running the command.
"""
assert any(
[command, working_dir, source_path]
), "At least one of the parameters for bash shell must be provided."
bash_tool = Bash(notifier=self.notifier, exchange_view=self.exchange_view)
outputs = []
if working_dir:
_out = bash_tool._change_dir(working_dir)
outputs.append(_out)
if source_path:
_out = bash_tool._source(source_path)
outputs.append(_out)
if command:
_out = bash_tool._shell(command)
outputs.append(_out)
return "\n".join(outputs)
@tool
def text_editor(
self,
command: TextEditorCommand,
path: str,
file_text: Optional[str] = None,
insert_line: Optional[int] = None,
new_str: Optional[str] = None,
old_str: Optional[str] = None,
view_range: Optional[list[int]] = None,
) -> str:
"""
Perform text editing operations on files.
The `command` parameter specifies the operation to perform. Allowed options are:
- `view`: View the content of a file or directory.
- `create`: Create a new file with the given content.
- `str_replace`: Replace a string in a file with a new string.
- `insert`: Insert a string into a file after a specific line number.
- `undo_edit`: Undo the last edit made to a file.
Args:
command (str): The commands to run.
Allowed options are: `view`, `create`, `str_replace`, `insert`, `undo_edit`.
path (str): Absolute path (or relative path against cwd) to file or directory,
e.g. `/repo/file.py` or `/repo` or `curr_dir_file.py`.
file_text (str, optional): Required parameter of `create` command, with the content
of the file to be created.
insert_line (int, optional): Required parameter of `insert` command.
The `new_str` will be inserted AFTER the line `insert_line` of `path`.
new_str (str, optional): Optional parameter of `str_replace` command
containing the new string (if not given, no string will be added).
Required parameter of `insert` command containing the string to insert.
old_str (str, optional): Required parameter of `str_replace` command containing the
string in `path` to replace.
view_range (list, optional): Optional parameter of `view` command when `path` points to a file.
If none is given, the full file is shown. If provided, the file will be shown in the indicated line
number range, e.g. [11, 12] will show lines 11 and 12. Indexing at 1 to start.
Setting `[start_line, -1]` shows all lines from `start_line` to the end of the file.
"""
text_editor_instance = TextEditor(notifier=self.notifier)
return text_editor_instance.run_command(
command=command,
path=path,
file_text=file_text,
insert_line=insert_line,
new_str=new_str,
old_str=old_str,
view_range=view_range,
)
self.notifier.log(Markdown(f"```bash\n{command}\n```"))
self.notifier.log("")
@tool
def source(self, path: str) -> str:
"""Source the file at path, keeping the updates reflected in future shell commands
def process_manager(
self,
command: ProcessManagerCommand,
shell_command: Optional[str] = None,
process_id: Optional[int] = None,
) -> str:
"""
Manage background processes.
The `command` parameter specifies the operation to perform. Allowed options are:
- `start`: Start a background process by running a shell command.
- `list`: List all currently running background processes with their IDs and commands.
- `view_output`: View the output of a running background process by providing its ID.
- `cancel`: Cancel a running background process by providing its ID.
Args:
path (str): The path to the file to source.
command (str): The command to run.
Allowed options are: `start`, `list`, `view_output`, `cancel`.
shell_command (str, optional): Required parameter for the `start` command, representing
the shell command to be executed in the background.
Example: `"python -m http.server &"` to start a web server in the background.
process_id (int, optional): Required parameter for `view_output` and `cancel` commands,
representing the process ID of the background process to manage.
"""
source_command = f"source {path} && env"
self.logshell(f"source {path}")
result = shell(source_command, self.notifier, self.exchange_view, cwd=system.cwd, env=system.env)
env_vars = dict(line.split("=", 1) for line in result.splitlines() if "=" in line)
system.env.update(env_vars)
return f"Sourced {path}"
@tool
def shell(self, command: str) -> str:
"""Execute any command on the shell
Args:
command (str): The shell command to run. It can support multiline statements
if you need to run more than one at a time
"""
if command.startswith("cat"):
raise ValueError("You must read files through the read_file tool.")
if command.startswith("cd"):
raise ValueError("You must change dirs through the change_dir tool.")
if command.startswith("source"):
raise ValueError("You must source files through the source tool.")
self.logshell(command)
return shell(command, self.notifier, self.exchange_view, cwd=system.cwd, env=system.env)
@tool
def read_file(self, path: str) -> str:
"""Read the content of the file at path
Args:
path (str): The destination file path, in the format "path/to/file.txt"
"""
system.remember_file(path)
self.logshell(f"cat {path}")
return f"The file content at {path} has been updated above."
@tool
def write_file(self, path: str, content: str) -> str:
"""
Write a file at the specified path with the provided content. This will create any directories if they do not exist.
The content will fully overwrite the existing file.
Args:
path (str): The destination file path, in the format "path/to/file.txt"
content (str): The raw file content.
""" # noqa: E501
patho = system.to_patho(path)
if patho.exists() and not system.is_active(path):
print(f"We are warning the LLM to view before write in write_file, with path={path} and patho={str(patho)}")
raise ValueError(f"You must view {path} using read_file before you overwrite it")
patho.parent.mkdir(parents=True, exist_ok=True)
patho.write_text(content)
system.remember_file(path)
language = get_language(path)
md = f"```{language}\n{content}\n```"
self.notifier.log("")
self.notifier.log(Rule(RULEPREFIX + path, style=RULESTYLE, align="left"))
self.notifier.log(Markdown(md))
self.notifier.log("")
return f"Successfully wrote to {path}"
@tool
def patch_file(self, path: str, before: str, after: str) -> str:
"""Patch the file at the specified by replacing before with after
Before **must** be present exactly once in the file, so that it can safely
be replaced with after.
Args:
path (str): The path to the file, in the format "path/to/file.txt"
before (str): The content that will be replaced
after (str): The content it will be replaced with
"""
self.notifier.status(f"editing {path}")
patho = system.to_patho(path)
if not patho.exists():
raise ValueError(f"You can't patch {path} - it does not exist yet")
if not system.is_active(path):
raise ValueError(f"You must view {path} using read_file before you patch it")
language = get_language(path)
content = patho.read_text()
if content.count(before) > 1:
raise ValueError("The before content is present multiple times in the file, be more specific.")
if content.count(before) < 1:
raise ValueError("The before content was not found in file, be careful that you recreate it exactly.")
content = content.replace(before, after)
system.remember_file(path)
patho.write_text(content)
output = f"""
```{language}
{before}
```
->
```{language}
{after}
```
"""
self.notifier.log("")
self.notifier.log(Rule(RULEPREFIX + path, style=RULESTYLE, align="left"))
self.notifier.log(Markdown(output))
self.notifier.log("")
return "Succesfully replaced before with after."
@tool
def start_process(self, command: str) -> int:
"""Start a background process running the specified command
Use this exclusively for processes that you need to run in the background
because they do not terminate, such as running a webserver.
Args:
command (str): The shell command to run
"""
self.logshell(command, title="background")
if is_dangerous_command(command):
self.notifier.stop()
if not keep_unsafe_command_prompt(command):
raise RuntimeError(
f"The command {command} was rejected as dangerous by the user."
" Do not proceed further, instead ask for instructions."
)
self.notifier.start()
process = subprocess.Popen(
command,
shell=True,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
cwd=system.cwd,
env=system.env,
process_manager_instance = ProcessManager(notifier=self.notifier)
return process_manager_instance.run_command(
command=command,
shell_command=shell_command,
process_id=process_id,
)
process_id = system.add_process(process)
return process_id
@tool
def list_processes(self) -> Dict[int, str]:
"""List all running background processes with their IDs and commands."""
processes = system.get_processes()
process_list = "```\n" + "\n".join(f"id: {pid}, command: {cmd}" for pid, cmd in processes.items()) + "\n```"
self.notifier.log("")
self.notifier.log(Rule(RULEPREFIX + "processes", style=RULESTYLE, align="left"))
self.notifier.log(Markdown(process_list))
self.notifier.log("")
return processes
@tool
def view_process_output(self, process_id: int) -> str:
"""View the output of a running background process
Args:
process_id (int): The ID of the process to view output.
"""
self.notifier.log("")
self.notifier.log(Rule(RULEPREFIX + "processes", style=RULESTYLE, align="left"))
self.notifier.log(Markdown(f"```\nreading {process_id}\n```"))
self.notifier.log("")
output = system.view_process_output(process_id)
return output
@tool
def cancel_process(self, process_id: int) -> str:
"""Cancel the background process with the specified ID.
Args:
process_id (int): The ID of the process to be cancelled.
"""
result = system.cancel_process(process_id)
self.logshell(f"kill {process_id}")
if result:
return f"process {process_id} cancelled"
else:
return f"no known process {process_id}"
@tool
def change_dir(self, path: str) -> str:
"""Change the directory to the specified path
Args:
path (str): The new dir path, in the format "path/to/dir"
"""
patho = system.to_patho(path)
if not patho.is_dir():
raise ValueError(f"The directory {path} does not exist")
if patho.resolve() < Path(os.getcwd()).resolve():
raise ValueError("You can cd into subdirs but not above the directory where we started.")
self.logshell(f"cd {path}")
system.cwd = str(patho)
return path
@tool
def fetch_web_content(self, url: str) -> str:

View File

@@ -0,0 +1,11 @@
from goose.notifier import Notifier
from goose.toolkit.utils import RULEPREFIX, RULESTYLE
from rich.markdown import Markdown
from rich.rule import Rule
def log_command(notifier: Notifier, command: str, path: str, title: str = "shell") -> None:
notifier.log("")
notifier.log(Rule(RULEPREFIX + f"{title} | [dim magenta]{path}[/]", style=RULESTYLE, align="left"))
notifier.log(Markdown(f"```bash\n{command}\n```"))
notifier.log("")

View File

@@ -29,7 +29,7 @@ def toolkit(tmpdir):
def test_start_process(toolkit):
process_id = toolkit.start_process("python -m http.server 8000")
process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8000")
assert process_id > 0
time.sleep(2) # Give the server time to start
@@ -39,24 +39,24 @@ def test_start_process(toolkit):
assert response.status_code == 200
except requests.ConnectionError:
pytest.fail("HTTP server did not start successfully")
output = toolkit.view_process_output(process_id)
output = toolkit.process_manager(command="view_output", process_id=process_id)
assert "200" in output
def test_list_processes(toolkit):
process_id = toolkit.start_process("python -m http.server 8001")
processes = toolkit.list_processes()
process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8001")
processes = toolkit.process_manager(command="list")
assert process_id in processes
assert "python -m http.server 8001" in processes[process_id]
def test_cancel_process(toolkit):
process_id = toolkit.start_process("python -m http.server 8003")
process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8003")
time.sleep(2) # Give the server time to start
result = toolkit.cancel_process(process_id)
assert result == f"process {process_id} cancelled"
result = toolkit.process_manager(command="cancel", process_id=process_id)
assert result == f"Process {process_id} cancelled"
# Verify that the process is no longer running
with pytest.raises(ValueError):
toolkit.view_process_output(process_id)
toolkit.process_manager(command="view_output", process_id=process_id)

View File

@@ -30,54 +30,54 @@ def toolkit(tmpdir):
def test_shell(toolkit, tmpdir):
result = toolkit.shell("echo 'Hello, World!'")
result = toolkit.bash(command="echo 'Hello, World!'")
assert "Hello, World!" in result
def test_read_write_file(toolkit, tmpdir):
def test_text_editor_read_write_file(toolkit, tmpdir):
test_file = tmpdir.join("test_file.txt")
content = "Test content"
toolkit.write_file(str(test_file), content)
toolkit.text_editor(command="create", path=str(test_file), file_text=content)
assert test_file.read() == content
result = toolkit.read_file(str(test_file))
assert "The file content at" in result
result = toolkit.text_editor(command="view", path=str(test_file))
assert "Displayed content of" in result
assert system.is_active(str(test_file))
def test_patch_file(toolkit, tmpdir):
def test_text_editor_patch_file(toolkit, tmpdir):
test_file = tmpdir.join("test_file.txt")
test_file.write("Hello, World!")
toolkit.read_file(str(test_file)) # Remember the file
result = toolkit.patch_file(str(test_file), "World", "Universe")
assert "Succesfully replaced before with after" in result
toolkit.text_editor(command="view", path=str(test_file)) # Remember the file
result = toolkit.text_editor(command="str_replace", path=str(test_file), old_str="World", new_str="Universe")
assert "Successfully replaced before with after" in result
assert test_file.read() == "Hello, Universe!"
def test_change_dir(toolkit, tmpdir):
subdir = tmpdir.mkdir("subdir")
result = toolkit.change_dir(str(subdir))
assert result == str(subdir)
result = toolkit.bash(working_dir=str(subdir))
assert str(subdir) in result
assert system.cwd == str(subdir)
def test_start_process(toolkit, tmpdir):
process_id = toolkit.start_process("python -m http.server 8000")
process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8000")
assert process_id > 0
# Check if the process is in the list of running processes
processes = toolkit.list_processes()
processes = toolkit.process_manager(command="list")
assert process_id in processes
assert "python -m http.server 8000" in processes[process_id]
def test_list_processes(toolkit, tmpdir):
process_id1 = toolkit.start_process("python -m http.server 8001")
process_id2 = toolkit.start_process("python -m http.server 8002")
process_id1 = toolkit.process_manager(command="start", shell_command="python -m http.server 8001")
process_id2 = toolkit.process_manager(command="start", shell_command="python -m http.server 8002")
processes = toolkit.list_processes()
processes = toolkit.process_manager(command="list")
assert process_id1 in processes
assert process_id2 in processes
assert "python -m http.server 8001" in processes[process_id1]
@@ -85,13 +85,13 @@ def test_list_processes(toolkit, tmpdir):
def test_cancel_process(toolkit, tmpdir):
process_id = toolkit.start_process("python -m http.server 8003")
process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8003")
result = toolkit.cancel_process(process_id)
assert result == f"process {process_id} cancelled"
result = toolkit.process_manager(command="cancel", process_id=process_id)
assert result == f"Process {process_id} cancelled"
# Verify that the process is no longer in the list
processes = toolkit.list_processes()
processes = toolkit.process_manager(command="list")
assert process_id not in processes