Files
turso/testing/cli_tests/test_turso_cli.py
Pekka Enberg b895381ae6 Revert "Merge 'Reachable assertions in Antithesis Python Test for better logging' from Pedro Muniz"
This reverts commit dbbc3f5190, reversing
changes made to 1cd5a49705. We're missing
some mandatory parameters, causing these to fail under Antithesis.
2025-07-08 17:51:12 +03:00

184 lines
6.8 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import select
import subprocess
import time
from pathlib import Path
from time import sleep
from typing import Callable, List, Optional
from cli_tests import console
PIPE_BUF = 4096
class ShellConfig:
def __init__(self, exe_name, flags: str = "-q"):
self.sqlite_exec: str = exe_name
self.sqlite_flags: List[str] = flags.split()
self.cwd = os.getcwd()
self.test_dir: Path = Path("testing")
self.py_folder: Path = Path("cli_tests")
self.test_files: Path = Path("test_files")
class TursoShell:
def __init__(self, config: ShellConfig, init_commands: Optional[str] = None):
self.config = config
self.pipe = self._start_repl(init_commands)
def _start_repl(self, init_commands: Optional[str]) -> subprocess.Popen:
env = os.environ.copy()
env["RUST_BACKTRACE"] = "1"
pipe = subprocess.Popen(
[self.config.sqlite_exec, *self.config.sqlite_flags],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
env=env,
)
if init_commands and pipe.stdin is not None:
pipe.stdin.write((init_commands + "\n").encode())
pipe.stdin.flush()
return pipe
def get_test_filepath(self) -> Path:
return self.config.test_dir / "limbo_output.txt"
def execute(self, sql: str) -> str:
end_marker = "END_OF_RESULT"
self._write_to_pipe(sql)
# If we're redirecting output, return so test's don't hang
if sql.strip().startswith(".output"):
return ""
self._write_to_pipe(f"SELECT '{end_marker}';")
output = ""
while True:
ready, _, errors = select.select(
[self.pipe.stdout, self.pipe.stderr],
[],
[self.pipe.stdout, self.pipe.stderr],
)
ready_or_errors = set(ready + errors)
if self.pipe.stderr in ready_or_errors:
fragment = self.pipe.stderr.read(PIPE_BUF).decode()
if not fragment:
console.error(output, end="", _stack_offset=2)
raise RuntimeError("Error encountered in Turso shell.")
output += fragment
if self.pipe.stdout in ready_or_errors:
fragment = self.pipe.stdout.read(PIPE_BUF).decode()
output += fragment
if output.rstrip().endswith(end_marker):
break
return self._clean_output(output, end_marker)
def _write_to_pipe(self, command: str) -> None:
if not self.pipe.stdin:
raise RuntimeError("Failed to start Turso REPL")
self.pipe.stdin.write((command + "\n").encode())
self.pipe.stdin.flush()
@staticmethod
def _clean_output(output: str, marker: str) -> str:
output = output.rstrip().removesuffix(marker)
lines = [line.strip() for line in output.split("\n") if line]
return "\n".join(lines)
def quit(self) -> None:
self._write_to_pipe(".quit")
sleep(0.3)
self.pipe.terminate()
self.pipe.kill()
class TestTursoShell:
def __init__(
self,
init_commands: Optional[str] = None,
init_blobs_table: bool = False,
exec_name: Optional[str] = None,
use_testing_db: bool = False,
flags="",
):
if exec_name is None:
exec_name = os.environ.get("SQLITE_EXEC", "./scripts/limbo-sqlite3")
self.config = ShellConfig(exe_name=exec_name, flags=flags)
if use_testing_db:
self.init_test_db()
init_commands = ".open testing/testing_clone.db"
if init_commands is None:
# Default initialization
init_commands = """
CREATE TABLE users (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, age INTEGER);
CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price INTEGER);
INSERT INTO users VALUES (1, 'Alice', 'Smith', 30), (2, 'Bob', 'Johnson', 25),
(3, 'Charlie', 'Brown', 66), (4, 'David', 'Nichols', 70);
INSERT INTO products VALUES (1, 'Hat', 19.99), (2, 'Shirt', 29.99),
(3, 'Shorts', 39.99), (4, 'Dress', 49.99);
"""
if init_blobs_table:
init_commands += """
CREATE TABLE t (x1, x2, x3, x4);
INSERT INTO t VALUES (zeroblob(1024 - 1), zeroblob(1024 - 2), zeroblob(1024 - 3), zeroblob(1024 - 4));"""
init_commands += "\n.nullvalue TURSO"
self.shell = TursoShell(self.config, init_commands)
def quit(self, cleanup=True):
if cleanup:
self.cleanup_test_db()
self.shell.quit()
def run_test(self, name: str, sql: str, expected: str) -> None:
console.test(f"Running test: {name}", _stack_offset=2)
actual = self.shell.execute(sql)
assert actual == expected, (
f"Test failed: {name}\nSQL: {sql}\nExpected:\n{repr(expected)}\nActual:\n{repr(actual)}"
)
def run_debug(self, sql: str):
console.debug(f"debugging: {sql}", _stack_offset=2)
actual = self.shell.execute(sql)
console.debug(f"OUTPUT:\n{repr(actual)}", _stack_offset=2)
def run_test_fn(self, sql: str, validate: Callable[[str], bool], desc: str = "") -> None:
# Print the test that is executing before executing the sql command
# Printing later confuses the user of the code what test has actually failed
if desc:
console.test(f"Testing: {desc}", _stack_offset=2)
actual = self.shell.execute(sql)
assert validate(actual), f"Test failed\nSQL: {sql}\nActual:\n{repr(actual)}"
def execute_dot(self, dot_command: str) -> None:
self.shell._write_to_pipe(dot_command)
def init_test_db(self) -> None:
self.cleanup_test_db()
path = os.path.join("testing", "testing_clone.db")
if os.path.exists(path):
os.remove(path)
time.sleep(0.1) # Ensure the file is removed before cloning
cmd = "sqlite3 testing/testing.db '.clone testing/testing_clone.db'"
subprocess.run(cmd, shell=True, capture_output=True, text=True)
if not os.path.exists("testing/testing_clone.db"):
raise RuntimeError("Failed to clone testing.db to testing/testing_clone.db")
def cleanup_test_db(self) -> None:
path = os.path.join("testing", "testing_clone.db")
if os.path.exists(path):
os.remove(path)
walpath = os.path.join("testing", "testing.db-wal")
if os.path.exists(walpath):
os.remove(walpath)
# Enables the use of `with` syntax
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, exception_traceback):
self.cleanup_test_db()
self.quit()