diff --git a/src/goose/synopsis/bash.py b/src/goose/synopsis/bash.py new file mode 100644 index 00000000..f38ed3ad --- /dev/null +++ b/src/goose/synopsis/bash.py @@ -0,0 +1,49 @@ +import os +from pathlib import Path + +from goose.notifier import Notifier +from goose.view import ExchangeView +from goose.synopsis.system import system +from goose.utils.shell import shell +from goose.synopsis.util import log_command + + +class Bash: + def __init__(self, notifier: Notifier, exchange_view: ExchangeView) -> None: + self.notifier = notifier + self.exchange_view = exchange_view + + def _logshell(self, command: str, title: str = "shell") -> None: + log_command(self.notifier, command, path=os.path.abspath(system.cwd), title=title) + + def _source(self, path: str) -> str: + """Source the file at path.""" + source_command = f"source {path} && env" + self._logshell(f"source {path}") + result = shell(source_command, self.notifier, self.exchange_view, cwd=system.cwd, env=system.env) + env_vars = dict(line.split("=", 1) for line in result.splitlines() if "=" in line) + system.env.update(env_vars) + return f"Sourced {path}" + + def _shell(self, command: str) -> str: + """Execute any shell command.""" + if command.startswith("cat"): + raise ValueError("You must read files through the text_editor tool with 'view' comamnd.") + if command.startswith("cd"): + raise ValueError("You must change dirs through the bash tool with 'working_dir' param.") + if command.startswith("source"): + raise ValueError("You must source files through the bash tool with 'source' command.") + + self._logshell(command) + return shell(command, self.notifier, self.exchange_view, cwd=system.cwd, env=system.env) + + def _change_dir(self, path: str) -> str: + """Change the directory to the specified path.""" + patho = system.to_patho(path) + if not patho.is_dir(): + raise ValueError(f"The directory {path} does not exist") + if patho.resolve() < Path(os.getcwd()).resolve(): + raise ValueError("You can cd into subdirs but not above the directory where we started.") + self._logshell(f"cd {path}") + system.cwd = str(patho) + return f"Changed directory to: {path}" diff --git a/src/goose/synopsis/process_manager.py b/src/goose/synopsis/process_manager.py new file mode 100644 index 00000000..cb112d89 --- /dev/null +++ b/src/goose/synopsis/process_manager.py @@ -0,0 +1,95 @@ +import subprocess +import os +from typing import Literal, Dict +from rich.markdown import Markdown +from rich.rule import Rule +from goose.notifier import Notifier +from goose.synopsis.system import system +from goose.synopsis.util import log_command +from goose.toolkit.utils import RULEPREFIX, RULESTYLE +from goose.utils.shell import is_dangerous_command, keep_unsafe_command_prompt + +ProcessManagerCommand = Literal["start", "list", "view_output", "cancel"] + + +class ProcessManager: + def __init__(self, notifier: Notifier) -> None: + self.notifier = notifier + + # Command dispatch dictionary + self.command_dispatch = { + "start": self._start_process, + "list": self._list_processes, + "view_output": self._view_process_output, + "cancel": self._cancel_process, + } + + def _logshell(self, command: str, title: str = "background") -> None: + log_command(self.notifier, command, path=os.path.abspath(system.cwd), title=title) + + def _start_process(self, shell_command: str, **kwargs: dict) -> int: + """Start a background process running the specified command.""" + self._logshell(shell_command, title="background") + + if is_dangerous_command(shell_command): + self.notifier.stop() + if not keep_unsafe_command_prompt(shell_command): + raise RuntimeError(f"The command {shell_command} was rejected as dangerous.") + self.notifier.start() + + process = subprocess.Popen( + shell_command, + shell=True, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + cwd=system.cwd, + env=system.env, + ) + process_id = system.add_process(process) + return process_id + + def _list_processes(self, **kwargs: dict) -> Dict[int, str]: + """List all running background processes.""" + processes = system.get_processes() + process_list = "```\n" + "\n".join(f"id: {pid}, command: {cmd}" for pid, cmd in processes.items()) + "\n```" + self.notifier.log("") + self.notifier.log(Rule(RULEPREFIX + "processes", style=RULESTYLE, align="left")) + self.notifier.log(Markdown(process_list)) + self.notifier.log("") + return processes + + def _view_process_output(self, process_id: int, **kwargs: dict) -> str: + """View the output of a running background process.""" + self.notifier.log("") + self.notifier.log(Rule(RULEPREFIX + "processes", style=RULESTYLE, align="left")) + self.notifier.log(Markdown(f"```\nreading {process_id}\n```")) + self.notifier.log("") + output = system.view_process_output(process_id) + return output + + def _cancel_process(self, process_id: int, **kwargs: dict) -> str: + """Cancel the background process with the specified ID.""" + result = system.cancel_process(process_id) + self._logshell(f"kill {process_id}") + if result: + return f"Process {process_id} cancelled" + else: + return f"No known process with ID {process_id}" + + def run_command(self, command: ProcessManagerCommand, **kwargs: dict) -> str: + """ + Dispatch process management commands. + + Args: + command (ProcessManagerCommand): The process management command to execute. + **kwargs: Additional arguments for the commands, such as shell_command or process_id. + + Returns: + str: The result of the process management operation. + """ + if command not in self.command_dispatch: + raise ValueError(f"Unknown command '{command}'.") + + return self.command_dispatch[command](**kwargs) diff --git a/src/goose/synopsis/text_editor.py b/src/goose/synopsis/text_editor.py new file mode 100644 index 00000000..cb9da5ee --- /dev/null +++ b/src/goose/synopsis/text_editor.py @@ -0,0 +1,157 @@ +from typing import Optional, Literal +from pathlib import Path +from rich.markdown import Markdown +from rich.rule import Rule +from goose.notifier import Notifier +from goose.synopsis.system import system +from goose.toolkit.utils import RULEPREFIX, RULESTYLE, get_language + +TextEditorCommand = Literal["view", "create", "str_replace", "insert", "undo_edit"] + + +class TextEditor: + def __init__(self, notifier: Notifier) -> None: + self.notifier = notifier + self._file_history = {} + + # Command dispatch dictionary + self.command_dispatch = { + "view": self._view_file_or_directory, + "create": self._create_file, + "str_replace": self._replace_string, + "insert": self._insert_string, + "undo_edit": self._undo_edit, + } + + def _write_file(self, path: str, content: str) -> str: + """Write content to the file at path.""" + patho = system.to_patho(path) + + if patho.exists() and not system.is_active(path): + raise ValueError(f"You must view {path} using read_file before you overwrite it") + + self._save_file_history(patho) + patho.parent.mkdir(parents=True, exist_ok=True) + patho.write_text(content) + system.remember_file(path) + + language = get_language(path) + self._log_file_operation(path, content, language) + return f"Successfully wrote to {path}" + + def _patch_file(self, path: str, before: str, after: str) -> str: + """Patch the file by replacing 'before' with 'after'.""" + patho = system.to_patho(path) + + if not patho.exists(): + raise ValueError(f"You can't patch {path} - it does not exist yet") + if not system.is_active(path): + raise ValueError(f"You must view {path} using read_file before you patch it") + + content = patho.read_text() + + if content.count(before) != 1: + raise ValueError("The 'before' content must appear exactly once in the file.") + + self._save_file_history(patho) + content = content.replace(before, after) + system.remember_file(path) + patho.write_text(content) + + self._log_file_operation(path, f"{before} -> {after}", get_language(path)) + return "Successfully replaced before with after." + + def _save_file_history(self, patho: Path) -> None: + """Save the current content of the file to history for undo functionality.""" + content = patho.read_text() if patho.exists() else "" + self._file_history[str(patho)] = content + + def _undo_edit(self, path: str, **kwargs: dict) -> str: + """Undo the last edit made to a file.""" + patho = system.to_patho(path) + + if not patho.exists() or str(patho) not in self._file_history: + raise ValueError(f"No edit history available to undo changes on {path}.") + + previous_content = self._file_history.pop(str(patho)) + patho.write_text(previous_content) + system.remember_file(path) + + self._log_file_operation(path, "Undo edit", get_language(path)) + return f"Successfully undid the last edit on {path}" + + def _view_file_or_directory(self, path: str, view_range: Optional[list[int]] = None, **kwargs: dict) -> str: + """View the content of a file or directory.""" + patho = system.to_patho(path) + + if patho.is_file(): + return self._view_file(patho, view_range) + elif patho.is_dir(): + return self._view_directory(patho) + else: + raise ValueError(f"The path {path} does not exist.") + + def _view_file(self, patho: Path, view_range: Optional[list[int]]) -> str: + if not patho.exists(): + raise ValueError(f"The file {patho} does not exist.") + + with open(patho, "r") as f: + content = f.readlines() + + if view_range: + start_line, end_line = view_range + if start_line < 1 or end_line < start_line: + raise ValueError("Invalid view range.") + content = content[start_line - 1 : (end_line if end_line != -1 else len(content))] + + system.remember_file(str(patho)) + return f"Displayed content of {str(patho)}" + + def _view_directory(self, patho: Path) -> str: + files = [str(p) for p in patho.iterdir()] + dir_content = "\n".join(files) + return f"The contents of directory {str(patho)}:\n{dir_content}" + + def _insert_string(self, path: str, insert_line: int, new_str: str, **kwargs: dict) -> str: + """Insert a string into the file after a specific line number.""" + patho = system.to_patho(path) + if not patho.exists() or not system.is_active(path): + raise ValueError(f"You must view {path} before editing.") + + self._save_file_history(patho) + with open(patho, "r") as f: + lines = f.readlines() + + if insert_line < 0 or insert_line > len(lines): + raise ValueError("Insert line is out of range.") + + lines.insert(insert_line, new_str + "\n") + with open(patho, "w") as f: + f.writelines(lines) + + system.remember_file(path) + self._log_file_operation(path, new_str, get_language(path)) + return f"Successfully inserted new_str into {path} after line {insert_line}" + + def _create_file(self, path: str, file_text: str, **kwargs: dict) -> str: + """Create a new file with the given content.""" + return self._write_file(path, file_text) + + def _replace_string(self, path: str, old_str: str, new_str: str, **kwargs: dict) -> str: + """Replace a string in a file.""" + return self._patch_file(path, old_str, new_str) + + def _log_file_operation(self, path: str, content: str, language: Optional[str]) -> None: + """Log the file operation in markdown format.""" + md_content = f"```{language}\n{content}\n```" if language else f"```\n{content}\n```" + self.notifier.log("") + self.notifier.log(Rule(RULEPREFIX + path, style=RULESTYLE, align="left")) + self.notifier.log(Markdown(md_content)) + self.notifier.log("") + + def run_command(self, command: TextEditorCommand, path: str, **kwargs: dict) -> str: + """Dispatch text editing operations to the appropriate handler.""" + if command not in self.command_dispatch: + raise ValueError(f"Unknown command '{command}'.") + + return self.command_dispatch[command](path, **kwargs) diff --git a/src/goose/synopsis/toolkit.py b/src/goose/synopsis/toolkit.py index 8424987c..ea9b88c2 100644 --- a/src/goose/synopsis/toolkit.py +++ b/src/goose/synopsis/toolkit.py @@ -1,19 +1,15 @@ # janky global state for now, think about it +from collections import defaultdict import re -import subprocess -import os -from pathlib import Path import tempfile -from typing import Dict +from typing import Dict, Optional from exchange import Message import httpx -from goose.synopsis.system import system +from goose.synopsis.bash import Bash +from goose.synopsis.text_editor import TextEditor, TextEditorCommand +from goose.synopsis.process_manager import ProcessManager, ProcessManagerCommand from goose.toolkit.base import Toolkit, tool -from goose.toolkit.utils import RULEPREFIX, RULESTYLE, get_language -from goose.utils.shell import is_dangerous_command, shell, keep_unsafe_command_prompt -from rich.markdown import Markdown -from rich.rule import Rule class SynopsisDeveloper(Toolkit): @@ -21,230 +17,138 @@ class SynopsisDeveloper(Toolkit): def __init__(self, *args: object, **kwargs: Dict[str, object]) -> None: super().__init__(*args, **kwargs) + self._file_history = defaultdict(list) def system(self) -> str: """Retrieve system configuration details for developer""" system_prompt = Message.load("developer.md").text return system_prompt - def logshell(self, command: str, title: str = "shell") -> None: - self.notifier.log("") - self.notifier.log( - Rule(RULEPREFIX + f"{title} | [dim magenta]{os.path.abspath(system.cwd)}[/]", style=RULESTYLE, align="left") + @tool + def bash( + self, + command: Optional[str] = None, + working_dir: Optional[str] = None, + source_path: Optional[str] = None, + ) -> str: + """ + Run commands in a bash shell. + + Perform bash-related operations in a specific order: + 1. Change the working directory (if provided) + 2. Source a file (if provided) + 3. Run a shell command (if provided) + + At least one of the parameters must be provided. + + Args: + command (str, optional):The bash shell command to run. + working_dir (str, optional): The directory to change to. + source_path (str, optional): The file to source before running the command. + """ + assert any( + [command, working_dir, source_path] + ), "At least one of the parameters for bash shell must be provided." + + bash_tool = Bash(notifier=self.notifier, exchange_view=self.exchange_view) + outputs = [] + + if working_dir: + _out = bash_tool._change_dir(working_dir) + outputs.append(_out) + + if source_path: + _out = bash_tool._source(source_path) + outputs.append(_out) + + if command: + _out = bash_tool._shell(command) + outputs.append(_out) + + return "\n".join(outputs) + + @tool + def text_editor( + self, + command: TextEditorCommand, + path: str, + file_text: Optional[str] = None, + insert_line: Optional[int] = None, + new_str: Optional[str] = None, + old_str: Optional[str] = None, + view_range: Optional[list[int]] = None, + ) -> str: + """ + Perform text editing operations on files. + + The `command` parameter specifies the operation to perform. Allowed options are: + - `view`: View the content of a file or directory. + - `create`: Create a new file with the given content. + - `str_replace`: Replace a string in a file with a new string. + - `insert`: Insert a string into a file after a specific line number. + - `undo_edit`: Undo the last edit made to a file. + + Args: + command (str): The commands to run. + Allowed options are: `view`, `create`, `str_replace`, `insert`, `undo_edit`. + path (str): Absolute path (or relative path against cwd) to file or directory, + e.g. `/repo/file.py` or `/repo` or `curr_dir_file.py`. + file_text (str, optional): Required parameter of `create` command, with the content + of the file to be created. + insert_line (int, optional): Required parameter of `insert` command. + The `new_str` will be inserted AFTER the line `insert_line` of `path`. + new_str (str, optional): Optional parameter of `str_replace` command + containing the new string (if not given, no string will be added). + Required parameter of `insert` command containing the string to insert. + old_str (str, optional): Required parameter of `str_replace` command containing the + string in `path` to replace. + view_range (list, optional): Optional parameter of `view` command when `path` points to a file. + If none is given, the full file is shown. If provided, the file will be shown in the indicated line + number range, e.g. [11, 12] will show lines 11 and 12. Indexing at 1 to start. + Setting `[start_line, -1]` shows all lines from `start_line` to the end of the file. + """ + text_editor_instance = TextEditor(notifier=self.notifier) + return text_editor_instance.run_command( + command=command, + path=path, + file_text=file_text, + insert_line=insert_line, + new_str=new_str, + old_str=old_str, + view_range=view_range, ) - self.notifier.log(Markdown(f"```bash\n{command}\n```")) - self.notifier.log("") @tool - def source(self, path: str) -> str: - """Source the file at path, keeping the updates reflected in future shell commands + def process_manager( + self, + command: ProcessManagerCommand, + shell_command: Optional[str] = None, + process_id: Optional[int] = None, + ) -> str: + """ + Manage background processes. + + The `command` parameter specifies the operation to perform. Allowed options are: + - `start`: Start a background process by running a shell command. + - `list`: List all currently running background processes with their IDs and commands. + - `view_output`: View the output of a running background process by providing its ID. + - `cancel`: Cancel a running background process by providing its ID. Args: - path (str): The path to the file to source. + command (str): The command to run. + Allowed options are: `start`, `list`, `view_output`, `cancel`. + shell_command (str, optional): Required parameter for the `start` command, representing + the shell command to be executed in the background. + Example: `"python -m http.server &"` to start a web server in the background. + process_id (int, optional): Required parameter for `view_output` and `cancel` commands, + representing the process ID of the background process to manage. """ - source_command = f"source {path} && env" - self.logshell(f"source {path}") - result = shell(source_command, self.notifier, self.exchange_view, cwd=system.cwd, env=system.env) - env_vars = dict(line.split("=", 1) for line in result.splitlines() if "=" in line) - system.env.update(env_vars) - return f"Sourced {path}" - - @tool - def shell(self, command: str) -> str: - """Execute any command on the shell - - Args: - command (str): The shell command to run. It can support multiline statements - if you need to run more than one at a time - """ - if command.startswith("cat"): - raise ValueError("You must read files through the read_file tool.") - if command.startswith("cd"): - raise ValueError("You must change dirs through the change_dir tool.") - if command.startswith("source"): - raise ValueError("You must source files through the source tool.") - - self.logshell(command) - return shell(command, self.notifier, self.exchange_view, cwd=system.cwd, env=system.env) - - @tool - def read_file(self, path: str) -> str: - """Read the content of the file at path - - Args: - path (str): The destination file path, in the format "path/to/file.txt" - """ - system.remember_file(path) - self.logshell(f"cat {path}") - return f"The file content at {path} has been updated above." - - @tool - def write_file(self, path: str, content: str) -> str: - """ - Write a file at the specified path with the provided content. This will create any directories if they do not exist. - The content will fully overwrite the existing file. - - Args: - path (str): The destination file path, in the format "path/to/file.txt" - content (str): The raw file content. - """ # noqa: E501 - patho = system.to_patho(path) - - if patho.exists() and not system.is_active(path): - print(f"We are warning the LLM to view before write in write_file, with path={path} and patho={str(patho)}") - raise ValueError(f"You must view {path} using read_file before you overwrite it") - - patho.parent.mkdir(parents=True, exist_ok=True) - patho.write_text(content) - system.remember_file(path) - - language = get_language(path) - md = f"```{language}\n{content}\n```" - - self.notifier.log("") - self.notifier.log(Rule(RULEPREFIX + path, style=RULESTYLE, align="left")) - self.notifier.log(Markdown(md)) - self.notifier.log("") - - return f"Successfully wrote to {path}" - - @tool - def patch_file(self, path: str, before: str, after: str) -> str: - """Patch the file at the specified by replacing before with after - - Before **must** be present exactly once in the file, so that it can safely - be replaced with after. - - Args: - path (str): The path to the file, in the format "path/to/file.txt" - before (str): The content that will be replaced - after (str): The content it will be replaced with - """ - self.notifier.status(f"editing {path}") - patho = system.to_patho(path) - - if not patho.exists(): - raise ValueError(f"You can't patch {path} - it does not exist yet") - - if not system.is_active(path): - raise ValueError(f"You must view {path} using read_file before you patch it") - - language = get_language(path) - - content = patho.read_text() - - if content.count(before) > 1: - raise ValueError("The before content is present multiple times in the file, be more specific.") - if content.count(before) < 1: - raise ValueError("The before content was not found in file, be careful that you recreate it exactly.") - - content = content.replace(before, after) - system.remember_file(path) - patho.write_text(content) - - output = f""" -```{language} -{before} -``` --> -```{language} -{after} -``` -""" - self.notifier.log("") - self.notifier.log(Rule(RULEPREFIX + path, style=RULESTYLE, align="left")) - self.notifier.log(Markdown(output)) - self.notifier.log("") - return "Succesfully replaced before with after." - - @tool - def start_process(self, command: str) -> int: - """Start a background process running the specified command - - Use this exclusively for processes that you need to run in the background - because they do not terminate, such as running a webserver. - - Args: - command (str): The shell command to run - """ - self.logshell(command, title="background") - - if is_dangerous_command(command): - self.notifier.stop() - if not keep_unsafe_command_prompt(command): - raise RuntimeError( - f"The command {command} was rejected as dangerous by the user." - " Do not proceed further, instead ask for instructions." - ) - self.notifier.start() - - process = subprocess.Popen( - command, - shell=True, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - cwd=system.cwd, - env=system.env, + process_manager_instance = ProcessManager(notifier=self.notifier) + return process_manager_instance.run_command( + command=command, + shell_command=shell_command, + process_id=process_id, ) - process_id = system.add_process(process) - return process_id - - @tool - def list_processes(self) -> Dict[int, str]: - """List all running background processes with their IDs and commands.""" - processes = system.get_processes() - process_list = "```\n" + "\n".join(f"id: {pid}, command: {cmd}" for pid, cmd in processes.items()) + "\n```" - self.notifier.log("") - self.notifier.log(Rule(RULEPREFIX + "processes", style=RULESTYLE, align="left")) - self.notifier.log(Markdown(process_list)) - self.notifier.log("") - return processes - - @tool - def view_process_output(self, process_id: int) -> str: - """View the output of a running background process - - Args: - process_id (int): The ID of the process to view output. - """ - self.notifier.log("") - self.notifier.log(Rule(RULEPREFIX + "processes", style=RULESTYLE, align="left")) - self.notifier.log(Markdown(f"```\nreading {process_id}\n```")) - self.notifier.log("") - output = system.view_process_output(process_id) - return output - - @tool - def cancel_process(self, process_id: int) -> str: - """Cancel the background process with the specified ID. - - Args: - process_id (int): The ID of the process to be cancelled. - """ - result = system.cancel_process(process_id) - self.logshell(f"kill {process_id}") - if result: - return f"process {process_id} cancelled" - else: - return f"no known process {process_id}" - - @tool - def change_dir(self, path: str) -> str: - """Change the directory to the specified path - - Args: - path (str): The new dir path, in the format "path/to/dir" - """ - patho = system.to_patho(path) - if not patho.is_dir(): - raise ValueError(f"The directory {path} does not exist") - if patho.resolve() < Path(os.getcwd()).resolve(): - raise ValueError("You can cd into subdirs but not above the directory where we started.") - self.logshell(f"cd {path}") - system.cwd = str(patho) - return path @tool def fetch_web_content(self, url: str) -> str: diff --git a/src/goose/synopsis/util.py b/src/goose/synopsis/util.py new file mode 100644 index 00000000..3f73dd4e --- /dev/null +++ b/src/goose/synopsis/util.py @@ -0,0 +1,11 @@ +from goose.notifier import Notifier +from goose.toolkit.utils import RULEPREFIX, RULESTYLE +from rich.markdown import Markdown +from rich.rule import Rule + + +def log_command(notifier: Notifier, command: str, path: str, title: str = "shell") -> None: + notifier.log("") + notifier.log(Rule(RULEPREFIX + f"{title} | [dim magenta]{path}[/]", style=RULESTYLE, align="left")) + notifier.log(Markdown(f"```bash\n{command}\n```")) + notifier.log("") diff --git a/tests/synopsis/test_process_management.py b/tests/synopsis/test_process_management.py index ae9483df..e166aea9 100644 --- a/tests/synopsis/test_process_management.py +++ b/tests/synopsis/test_process_management.py @@ -29,7 +29,7 @@ def toolkit(tmpdir): def test_start_process(toolkit): - process_id = toolkit.start_process("python -m http.server 8000") + process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8000") assert process_id > 0 time.sleep(2) # Give the server time to start @@ -39,24 +39,24 @@ def test_start_process(toolkit): assert response.status_code == 200 except requests.ConnectionError: pytest.fail("HTTP server did not start successfully") - output = toolkit.view_process_output(process_id) + output = toolkit.process_manager(command="view_output", process_id=process_id) assert "200" in output def test_list_processes(toolkit): - process_id = toolkit.start_process("python -m http.server 8001") - processes = toolkit.list_processes() + process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8001") + processes = toolkit.process_manager(command="list") assert process_id in processes assert "python -m http.server 8001" in processes[process_id] def test_cancel_process(toolkit): - process_id = toolkit.start_process("python -m http.server 8003") + process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8003") time.sleep(2) # Give the server time to start - result = toolkit.cancel_process(process_id) - assert result == f"process {process_id} cancelled" + result = toolkit.process_manager(command="cancel", process_id=process_id) + assert result == f"Process {process_id} cancelled" # Verify that the process is no longer running with pytest.raises(ValueError): - toolkit.view_process_output(process_id) + toolkit.process_manager(command="view_output", process_id=process_id) diff --git a/tests/synopsis/test_toolkit.py b/tests/synopsis/test_toolkit.py index f78017cb..29aabf59 100644 --- a/tests/synopsis/test_toolkit.py +++ b/tests/synopsis/test_toolkit.py @@ -30,54 +30,54 @@ def toolkit(tmpdir): def test_shell(toolkit, tmpdir): - result = toolkit.shell("echo 'Hello, World!'") + result = toolkit.bash(command="echo 'Hello, World!'") assert "Hello, World!" in result -def test_read_write_file(toolkit, tmpdir): +def test_text_editor_read_write_file(toolkit, tmpdir): test_file = tmpdir.join("test_file.txt") content = "Test content" - toolkit.write_file(str(test_file), content) + toolkit.text_editor(command="create", path=str(test_file), file_text=content) assert test_file.read() == content - result = toolkit.read_file(str(test_file)) - assert "The file content at" in result + result = toolkit.text_editor(command="view", path=str(test_file)) + assert "Displayed content of" in result assert system.is_active(str(test_file)) -def test_patch_file(toolkit, tmpdir): +def test_text_editor_patch_file(toolkit, tmpdir): test_file = tmpdir.join("test_file.txt") test_file.write("Hello, World!") - toolkit.read_file(str(test_file)) # Remember the file - result = toolkit.patch_file(str(test_file), "World", "Universe") - assert "Succesfully replaced before with after" in result + toolkit.text_editor(command="view", path=str(test_file)) # Remember the file + result = toolkit.text_editor(command="str_replace", path=str(test_file), old_str="World", new_str="Universe") + assert "Successfully replaced before with after" in result assert test_file.read() == "Hello, Universe!" def test_change_dir(toolkit, tmpdir): subdir = tmpdir.mkdir("subdir") - result = toolkit.change_dir(str(subdir)) - assert result == str(subdir) + result = toolkit.bash(working_dir=str(subdir)) + assert str(subdir) in result assert system.cwd == str(subdir) def test_start_process(toolkit, tmpdir): - process_id = toolkit.start_process("python -m http.server 8000") + process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8000") assert process_id > 0 # Check if the process is in the list of running processes - processes = toolkit.list_processes() + processes = toolkit.process_manager(command="list") assert process_id in processes assert "python -m http.server 8000" in processes[process_id] def test_list_processes(toolkit, tmpdir): - process_id1 = toolkit.start_process("python -m http.server 8001") - process_id2 = toolkit.start_process("python -m http.server 8002") + process_id1 = toolkit.process_manager(command="start", shell_command="python -m http.server 8001") + process_id2 = toolkit.process_manager(command="start", shell_command="python -m http.server 8002") - processes = toolkit.list_processes() + processes = toolkit.process_manager(command="list") assert process_id1 in processes assert process_id2 in processes assert "python -m http.server 8001" in processes[process_id1] @@ -85,13 +85,13 @@ def test_list_processes(toolkit, tmpdir): def test_cancel_process(toolkit, tmpdir): - process_id = toolkit.start_process("python -m http.server 8003") + process_id = toolkit.process_manager(command="start", shell_command="python -m http.server 8003") - result = toolkit.cancel_process(process_id) - assert result == f"process {process_id} cancelled" + result = toolkit.process_manager(command="cancel", process_id=process_id) + assert result == f"Process {process_id} cancelled" # Verify that the process is no longer in the list - processes = toolkit.list_processes() + processes = toolkit.process_manager(command="list") assert process_id not in processes