Merge branch 'main' into vtab_schema

This commit is contained in:
Preston Thorpe
2025-04-28 22:09:10 -04:00
committed by GitHub
218 changed files with 36922 additions and 13792 deletions

View File

@@ -1,8 +1,9 @@
#!/usr/bin/env python3
from test_limbo_cli import TestLimboShell
from cli_tests.test_limbo_cli import TestLimboShell
from pathlib import Path
import time
import os
from cli_tests import console
def test_basic_queries():
@@ -242,8 +243,66 @@ def test_table_patterns():
shell.quit()
if __name__ == "__main__":
print("Running all Limbo CLI tests...")
def test_update_with_limit():
limbo = TestLimboShell(
"CREATE TABLE t (a,b,c); insert into t values (1,2,3), (4,5,6), (7,8,9), (1,2,3),(4,5,6), (7,8,9);"
)
limbo.run_test("update-limit", "UPDATE t SET a = 10 LIMIT 1;", "")
limbo.run_test("update-limit-result", "SELECT COUNT(*) from t WHERE a = 10;", "1")
limbo.run_test("update-limit-zero", "UPDATE t SET a = 100 LIMIT 0;", "")
limbo.run_test(
"update-limit-zero-result", "SELECT COUNT(*) from t WHERE a = 100;", "0"
)
limbo.run_test("update-limit-all", "UPDATE t SET a = 100 LIMIT -1;", "")
# negative limit is treated as no limit in sqlite due to check for --val = 0
limbo.run_test("update-limit-result", "SELECT COUNT(*) from t WHERE a = 100;", "6")
limbo.run_test(
"udpate-limit-where", "UPDATE t SET a = 333 WHERE b = 5 LIMIT 1;", ""
)
limbo.run_test(
"update-limit-where-result", "SELECT COUNT(*) from t WHERE a = 333;", "1"
)
limbo.quit()
def test_update_with_limit_and_offset():
limbo = TestLimboShell(
"CREATE TABLE t (a,b,c); insert into t values (1,2,3), (4,5,6), (7,8,9), (1,2,3),(4,5,6), (7,8,9);"
)
limbo.run_test("update-limit-offset", "UPDATE t SET a = 10 LIMIT 1 OFFSET 3;", "")
limbo.run_test(
"update-limit-offset-result", "SELECT COUNT(*) from t WHERE a = 10;", "1"
)
limbo.run_test("update-limit-result", "SELECT a from t LIMIT 4;", "1\n4\n7\n10")
limbo.run_test(
"update-limit-offset-zero", "UPDATE t SET a = 100 LIMIT 0 OFFSET 0;", ""
)
limbo.run_test(
"update-limit-zero-result", "SELECT COUNT(*) from t WHERE a = 100;", "0"
)
limbo.run_test("update-limit-all", "UPDATE t SET a = 100 LIMIT -1 OFFSET 1;", "")
limbo.run_test("update-limit-result", "SELECT COUNT(*) from t WHERE a = 100;", "5")
limbo.run_test(
"udpate-limit-where", "UPDATE t SET a = 333 WHERE b = 5 LIMIT 1 OFFSET 2;", ""
)
limbo.run_test(
"update-limit-where-result", "SELECT COUNT(*) from t WHERE a = 333;", "0"
)
limbo.quit()
def test_insert_default_values():
limbo = TestLimboShell(
"CREATE TABLE t (a integer default(42),b integer default (43),c integer default(44));"
)
for _ in range(1, 10):
limbo.execute_dot("INSERT INTO t DEFAULT VALUES;")
limbo.run_test("insert-default-values", "SELECT * FROM t;", "42|43|44\n" * 9)
limbo.quit()
def main():
console.info("Running all Limbo CLI tests...")
test_basic_queries()
test_schema_operations()
test_file_operations()
@@ -259,4 +318,10 @@ if __name__ == "__main__":
test_import_csv_verbose()
test_import_csv_skip()
test_table_patterns()
print("All tests have passed")
test_update_with_limit()
test_update_with_limit_and_offset()
console.info("All tests have passed")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,122 @@
from typing import Any, Optional, Union
from rich.console import Console, JustifyMethod
from rich.theme import Theme
from rich.style import Style
custom_theme = Theme(
{
"info": "bold blue",
"error": "bold red",
"debug": "bold blue",
"test": "bold green",
}
)
console = Console(theme=custom_theme, force_terminal=True)
def info(
*objects: Any,
sep: str = " ",
end: str = "\n",
style: Optional[Union[str, Style]] = None,
justify: Optional[JustifyMethod] = None,
emoji: Optional[bool] = None,
markup: Optional[bool] = None,
highlight: Optional[bool] = None,
log_locals: bool = False,
_stack_offset: int = 1,
):
console.log(
"[info]INFO[/info]",
*objects,
sep=sep,
end=end,
style=style,
justify=justify,
emoji=emoji,
markup=markup,
highlight=highlight,
log_locals=log_locals,
_stack_offset=_stack_offset + 1,
)
def error(
*objects: Any,
sep: str = " ",
end: str = "\n",
style: Optional[Union[str, Style]] = None,
justify: Optional[JustifyMethod] = None,
emoji: Optional[bool] = None,
markup: Optional[bool] = None,
highlight: Optional[bool] = None,
log_locals: bool = False,
_stack_offset: int = 1,
):
console.log(
"[error]ERROR[/error]",
*objects,
sep=sep,
end=end,
style=style,
justify=justify,
emoji=emoji,
markup=markup,
highlight=highlight,
log_locals=log_locals,
_stack_offset=_stack_offset + 1,
)
def debug(
*objects: Any,
sep: str = " ",
end: str = "\n",
style: Optional[Union[str, Style]] = None,
justify: Optional[JustifyMethod] = None,
emoji: Optional[bool] = None,
markup: Optional[bool] = None,
highlight: Optional[bool] = None,
log_locals: bool = False,
_stack_offset: int = 1,
):
console.log(
"[debug]DEBUG[/debug]",
*objects,
sep=sep,
end=end,
style=style,
justify=justify,
emoji=emoji,
markup=markup,
highlight=highlight,
log_locals=log_locals,
_stack_offset=_stack_offset + 1,
)
def test(
*objects: Any,
sep: str = " ",
end: str = "\n",
style: Optional[Union[str, Style]] = None,
justify: Optional[JustifyMethod] = None,
emoji: Optional[bool] = None,
markup: Optional[bool] = None,
highlight: Optional[bool] = None,
log_locals: bool = False,
_stack_offset: int = 1,
):
console.log(
"[test]TEST[/test]",
*objects,
sep=sep,
end=end,
style=style,
justify=justify,
emoji=emoji,
markup=markup,
highlight=highlight,
log_locals=log_locals,
_stack_offset=_stack_offset + 1,
)

View File

@@ -0,0 +1,371 @@
#!/usr/bin/env python3
# Eventually extract these tests to be in the fuzzing integration tests
import os
from faker import Faker
from faker.providers.lorem.en_US import Provider as P
from cli_tests.test_limbo_cli import TestLimboShell
from pydantic import BaseModel
from cli_tests import console
from enum import Enum
import random
import sqlite3
sqlite_flags = os.getenv("SQLITE_FLAGS", "-q").split(" ")
keywords = [
"ABORT",
"ACTION",
"ADD",
"AFTER",
"ALL",
"ALTER",
"ALWAYS",
"ANALYZE",
"AND",
"AS",
"ASC",
"ATTACH",
"AUTOINCREMENT",
"BEFORE",
"BEGIN",
"BETWEEN",
"BY",
"CASCADE",
"CASE",
"CAST",
"CHECK",
"COLLATE",
"COLUMN",
"COMMIT",
"CONFLICT",
"CONSTRAINT",
"CREATE",
"CROSS",
"CURRENT",
"CURRENT_DATE",
"CURRENT_TIME",
"CURRENT_TIMESTAMP",
"DATABASE",
"DEFAULT",
"DEFERRABLE",
"DEFERRED",
"DELETE",
"DESC",
"DETACH",
"DISTINCT",
"DO",
"DROP",
"EACH",
"ELSE",
"END",
"ESCAPE",
"EXCEPT",
"EXCLUDE",
"EXCLUSIVE",
"EXISTS",
"EXPLAIN",
"FAIL",
"FILTER",
"FIRST",
"FOLLOWING",
"FOR",
"FOREIGN",
"FROM",
"FULL",
"GENERATED",
"GLOB",
"GROUP",
"GROUPS",
"HAVING",
"IF",
"IGNORE",
"IMMEDIATE",
"IN",
"INDEX",
"INDEXED",
"INITIALLY",
"INNER",
"INSERT",
"INSTEAD",
"INTERSECT",
"INTO",
"IS",
"ISNULL",
"JOIN",
"KEY",
"LAST",
"LEFT",
"LIKE",
"LIMIT",
"MATCH",
"MATERIALIZED",
"NATURAL",
"NO",
"NOT",
"NOTHING",
"NOTNULL",
"NULL",
"NULLS",
"OF",
"OFFSET",
"ON",
"OR",
"ORDER",
"OTHERS",
"OUTER",
"OVER",
"PARTITION",
"PLAN",
"PRAGMA",
"PRECEDING",
"PRIMARY",
"QUERY",
"RAISE",
"RANGE",
"RECURSIVE",
"REFERENCES",
"REGEXP",
"REINDEX",
"RELEASE",
"RENAME",
"REPLACE",
"RESTRICT",
"RETURNING",
"RIGHT",
"ROLLBACK",
"ROW",
"ROWS",
"SAVEPOINT",
"SELECT",
"SET",
"TABLE",
"TEMP",
"TEMPORARY",
"THEN",
"TIES",
"TO",
"TRANSACTION",
"TRIGGER",
"UNBOUNDED",
"UNION",
"UNIQUE",
"UPDATE",
"USING",
"VACUUM",
"VALUES",
"VIEW",
"VIRTUAL",
"WHEN",
"WHERE",
"WINDOW",
"WITH",
"WITHOUT",
]
P.word_list = tuple(word for word in P.word_list if word.upper() not in keywords)
del P
fake: Faker = Faker(locale="en_US").unique
Faker.seed(0)
class ColumnType(Enum):
blob = "blob"
integer = "integer"
real = "real"
text = "text"
def generate(self, faker: Faker) -> str:
match self.value:
case "blob":
blob = sqlite3.Binary(faker.binary(length=4)).hex()
return f"x'{blob}'"
case "integer":
return str(faker.pyint())
case "real":
return str(faker.pyfloat())
case "text":
return f"'{faker.text(max_nb_chars=20)}'"
def __str__(self) -> str:
return self.value.upper()
class Column(BaseModel):
name: str
col_type: ColumnType
primary_key: bool
def generate(faker: Faker) -> "Column":
name = faker.word().replace(" ", "_")
return Column(
name=name,
col_type=Faker().enum(ColumnType),
primary_key=False,
)
def __str__(self) -> str:
return f"{self.name} {str(self.col_type)}"
class Table(BaseModel):
columns: list[Column]
name: str
def create_table(self) -> str:
accum = f"CREATE TABLE {self.name} "
col_strings = [str(col) for col in self.columns]
pk_columns = [col.name for col in self.columns if col.primary_key]
primary_key_stmt = "PRIMARY KEY (" + ", ".join(pk_columns) + ")"
col_strings.append(primary_key_stmt)
accum = accum + "(" + ", ".join(col_strings) + ");"
return accum
def generate_insert(self) -> str:
vals = [col.col_type.generate(fake) for col in self.columns]
vals = ", ".join(vals)
return f"INSERT INTO {self.name} VALUES ({vals});"
class ConstraintTest(BaseModel):
table: Table
db_path: str = "testing/constraint.db"
insert_stmts: list[str]
insert_errors: list[str]
def run(
self,
limbo: TestLimboShell,
):
big_stmt = [self.table.create_table()]
for insert_stmt in self.insert_stmts:
big_stmt.append(insert_stmt)
limbo.run_test("Inserting values into table", "\n".join(big_stmt), "")
for insert_stmt in self.insert_errors:
limbo.run_test_fn(
insert_stmt,
lambda val: "Runtime error: UNIQUE constraint failed" in val,
)
limbo.run_test(
"Nothing was inserted after error",
f"SELECT count(*) from {self.table.name};",
str(len(self.insert_stmts)),
)
def validate_with_expected(result: str, expected: str):
return (expected in result, expected)
def generate_test(col_amount: int, primary_keys: int) -> ConstraintTest:
assert col_amount >= primary_keys, "Cannot have more primary keys than columns"
cols: list[Column] = []
for _ in range(col_amount):
cols.append(Column.generate(fake))
pk_cols = random.sample(
population=cols,
k=primary_keys,
)
for col in pk_cols:
for c in cols:
if col.name == c.name:
c.primary_key = True
table = Table(columns=cols, name=fake.word())
insert_stmts = [table.generate_insert() for _ in range(col_amount)]
return ConstraintTest(
table=table, insert_stmts=insert_stmts, insert_errors=insert_stmts
)
def custom_test_1() -> ConstraintTest:
cols = [
Column(name="id", col_type="integer", primary_key=True),
Column(name="username", col_type="text", primary_key=True),
]
table = Table(columns=cols, name="users")
insert_stmts = [
"INSERT INTO users VALUES (1, 'alice');",
"INSERT INTO users VALUES (2, 'bob');",
]
return ConstraintTest(
table=table, insert_stmts=insert_stmts, insert_errors=insert_stmts
)
def custom_test_2(limbo: TestLimboShell):
create = "CREATE TABLE users (id INT PRIMARY KEY, username TEXT);"
first_insert = "INSERT INTO users VALUES (1, 'alice');"
limbo.run_test("Create unique INT index", create + first_insert, "")
fail_insert = "INSERT INTO users VALUES (1, 'bob');"
limbo.run_test_fn(
fail_insert,
lambda val: "Runtime error: UNIQUE constraint failed" in val,
)
def all_tests() -> list[ConstraintTest]:
tests: list[ConstraintTest] = []
max_cols = 10
curr_fake = Faker()
for _ in range(25):
num_cols = curr_fake.pyint(1, max_cols)
test = generate_test(num_cols, curr_fake.pyint(1, num_cols))
tests.append(test)
tests.append(custom_test_1())
return tests
def cleanup(db_fullpath: str):
wal_path = f"{db_fullpath}-wal"
shm_path = f"{db_fullpath}-shm"
paths = [db_fullpath, wal_path, shm_path]
for path in paths:
if os.path.exists(path):
os.remove(path)
def main():
tests = all_tests()
for test in tests:
console.info(test.table)
db_path = test.db_path
try:
# Use with syntax to automatically close shell on error
with TestLimboShell("") as limbo:
limbo.execute_dot(f".open {db_path}")
test.run(limbo)
except Exception as e:
console.error(f"Test FAILED: {e}")
console.debug(test.table.create_table(), test.insert_stmts)
cleanup(db_path)
exit(1)
# delete db after every compat test so we we have fresh db for next test
cleanup(db_path)
db_path = "testing/constraint.db"
try:
with TestLimboShell("") as limbo:
limbo.execute_dot(f".open {db_path}")
custom_test_2(limbo)
except Exception as e:
console.error(f"Test FAILED: {e}")
cleanup(db_path)
exit(1)
cleanup(db_path)
console.info("All tests passed successfully.")
if __name__ == "__main__":
main()

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
import os
from test_limbo_cli import TestLimboShell
from cli_tests.test_limbo_cli import TestLimboShell
from cli_tests import console
sqlite_exec = "./scripts/limbo-sqlite3"
sqlite_flags = os.getenv("SQLITE_FLAGS", "-q").split(" ")
@@ -81,7 +82,7 @@ def test_regexp():
lambda res: "Parse error: no such function" in res,
)
limbo.run_test_fn(f".load {extension_path}", null)
print(f"Extension {extension_path} loaded successfully.")
console.info(f"Extension {extension_path} loaded successfully.")
limbo.run_test_fn("SELECT regexp('a.c', 'abc');", true)
limbo.run_test_fn("SELECT regexp('a.c', 'ac');", false)
limbo.run_test_fn("SELECT regexp('[0-9]+', 'the year is 2021');", true)
@@ -339,16 +340,18 @@ def test_series():
def test_kv():
ext_path = "target/debug/liblimbo_ext_tests"
limbo = TestLimboShell()
# first, create a normal table to ensure no issues
limbo.execute_dot("CREATE TABLE other (a,b,c);")
limbo.execute_dot("INSERT INTO other values (23,32,23);")
limbo.run_test_fn(
"create virtual table t using kv_store;",
lambda res: "Parse error: no such module: kv_store" in res,
)
limbo.execute_dot(f".load {ext_path}")
limbo.run_test_fn(
limbo.execute_dot(
"create virtual table t using kv_store;",
null,
"can create kv_store vtable",
)
limbo.run_test_fn(".schema", lambda res: "CREATE VIRTUAL TABLE t" in res)
limbo.run_test_fn(
"insert into t values ('hello', 'world');",
null,
@@ -395,10 +398,35 @@ def test_kv():
limbo.run_test_fn(
"select count(*) from t;", lambda res: "100" == res, "can insert 100 rows"
)
limbo.run_test_fn("update t set value = 'updated' where key = 'key33';", null)
limbo.run_test_fn(
"select * from t where key = 'key33';",
lambda res: res == "key33|updated",
"can update single row",
)
limbo.run_test_fn(
"select COUNT(*) from t where value = 'updated';",
lambda res: res == "1",
"only updated a single row",
)
limbo.run_test_fn("update t set value = 'updated2';", null)
limbo.run_test_fn(
"select COUNT(*) from t where value = 'updated2';",
lambda res: res == "100",
"can update all rows",
)
limbo.run_test_fn("delete from t limit 96;", null, "can delete 96 rows")
limbo.run_test_fn(
"select count(*) from t;", lambda res: "4" == res, "four rows remain"
)
limbo.run_test_fn(
"update t set key = '100' where 1;", null, "where clause evaluates properly"
)
limbo.run_test_fn(
"select * from t where key = '100';",
lambda res: res == "100|updated2",
"there is only 1 key remaining after setting all keys to same value",
)
limbo.quit()
@@ -494,11 +522,33 @@ def test_vfs():
lambda res: res == "50",
"Tested large write to testfs",
)
print("Tested large write to testfs")
# open regular db file to ensure we don't segfault when vfs file is dropped
limbo.execute_dot(".open testing/vfs.db")
limbo.execute_dot("create table test (id integer primary key, value float);")
limbo.execute_dot("insert into test (value) values (1.0);")
console.info("Tested large write to testfs")
limbo.quit()
def test_drop_virtual_table():
ext_path = "target/debug/liblimbo_ext_tests"
limbo = TestLimboShell()
limbo.execute_dot(f".load {ext_path}")
limbo.debug_print(
"create virtual table t using kv_store;",
)
limbo.run_test_fn(".schema", lambda res: "CREATE VIRTUAL TABLE t" in res)
limbo.run_test_fn(
"insert into t values ('hello', 'world');",
null,
"can insert into kv_store vtable",
)
limbo.run_test_fn(
"DROP TABLE t;",
lambda res: "VDestroy called" in res,
"can drop kv_store vtable",
)
limbo.run_test_fn(
"DROP TABLE t;",
lambda res: "× Parse error: No such table: t" == res,
"should error when drop kv_store vtable",
)
limbo.quit()
@@ -538,20 +588,25 @@ def cleanup():
os.remove("testing/vfs.db-wal")
if __name__ == "__main__":
def main():
try:
test_regexp()
test_uuid()
test_aggregates()
test_crypto()
test_series()
test_kv()
test_ipaddr()
test_vfs()
test_sqlite_vfs_compat()
test_kv()
test_drop_virtual_table()
except Exception as e:
print(f"Test FAILED: {e}")
console.error(f"Test FAILED: {e}")
cleanup()
exit(1)
cleanup()
print("All tests passed successfully.")
console.info("All tests passed successfully.")
if __name__ == "__main__":
main()

110
testing/cli_tests/memory.py Executable file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env python3
import os
from cli_tests.test_limbo_cli import TestLimboShell
from cli_tests import console
sqlite_flags = os.getenv("SQLITE_FLAGS", "-q").split(" ")
def validate_with_expected(result: str, expected: str):
return (expected in result, expected)
def stub_memory_test(
limbo: TestLimboShell,
name: str,
blob_size: int = 1024**2,
vals: int = 100,
blobs: bool = True,
):
# zero_blob_size = 1024 **2
zero_blob = "0" * blob_size * 2
# vals = 100
big_stmt = ["CREATE TABLE temp (t1 BLOB, t2 INTEGER);"]
big_stmt = big_stmt + [
f"INSERT INTO temp (t1) VALUES (zeroblob({blob_size}));"
if i % 2 == 0 and blobs
else f"INSERT INTO temp (t2) VALUES ({i});"
for i in range(vals * 2)
]
expected = []
for i in range(vals * 2):
if i % 2 == 0 and blobs:
big_stmt.append(f"SELECT hex(t1) FROM temp LIMIT 1 OFFSET {i};")
expected.append(zero_blob)
else:
big_stmt.append(f"SELECT t2 FROM temp LIMIT 1 OFFSET {i};")
expected.append(f"{i}")
big_stmt.append("SELECT count(*) FROM temp;")
expected.append(str(vals * 2))
big_stmt = "".join(big_stmt)
expected = "\n".join(expected)
limbo.run_test_fn(big_stmt, lambda res: validate_with_expected(res, expected), name)
# TODO no delete tests for now because of limbo outputs some debug information on delete
def memory_tests() -> list[dict]:
tests = []
for vals in range(0, 1000, 100):
tests.append(
{
"name": f"small-insert-integer-vals-{vals}",
"vals": vals,
"blobs": False,
}
)
tests.append(
{
"name": f"small-insert-blob-interleaved-blob-size-{1024}",
"vals": 10,
"blob_size": 1024,
}
)
tests.append(
{
"name": f"big-insert-blob-interleaved-blob-size-{1024}",
"vals": 100,
"blob_size": 1024,
}
)
for blob_size in range(0, (1024 * 1024) + 1, 1024 * 4**4):
if blob_size == 0:
continue
tests.append(
{
"name": f"small-insert-blob-interleaved-blob-size-{blob_size}",
"vals": 10,
"blob_size": blob_size,
}
)
tests.append(
{
"name": f"big-insert-blob-interleaved-blob-size-{blob_size}",
"vals": 100,
"blob_size": blob_size,
}
)
return tests
def main():
tests = memory_tests()
# TODO see how to parallelize this loop with different subprocesses
for test in tests:
try:
with TestLimboShell("") as limbo:
stub_memory_test(limbo, **test)
except Exception as e:
console.error(f"Test FAILED: {e}")
exit(1)
console.info("All tests passed successfully.")
if __name__ == "__main__":
main()

View File

@@ -5,6 +5,7 @@ from time import sleep
import subprocess
from pathlib import Path
from typing import Callable, List, Optional
from cli_tests import console
PIPE_BUF = 4096
@@ -50,7 +51,8 @@ class LimboShell:
return ""
self._write_to_pipe(f"SELECT '{end_marker}';")
output = ""
while True:
done = False
while not done:
ready, _, errors = select.select(
[self.pipe.stdout, self.pipe.stderr],
[],
@@ -58,7 +60,7 @@ class LimboShell:
)
ready_or_errors = set(ready + errors)
if self.pipe.stderr in ready_or_errors:
self._handle_error()
done = self._handle_error()
if self.pipe.stdout in ready_or_errors:
fragment = self.pipe.stdout.read(PIPE_BUF).decode()
output += fragment
@@ -71,7 +73,7 @@ class LimboShell:
self.pipe.stdin.write((command + "\n").encode())
self.pipe.stdin.flush()
def _handle_error(self) -> None:
def _handle_error(self) -> bool:
while True:
ready, _, errors = select.select(
[self.pipe.stderr], [], [self.pipe.stderr], 0
@@ -79,7 +81,7 @@ class LimboShell:
if not (ready + errors):
break
error_output = self.pipe.stderr.read(PIPE_BUF).decode()
print(error_output, end="")
console.error(error_output, end="", _stack_offset=2)
raise RuntimeError("Error encountered in Limbo shell.")
@staticmethod
@@ -111,7 +113,6 @@ class TestLimboShell:
if init_commands is None:
# Default initialization
init_commands = """
.open :memory:
CREATE TABLE users (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, age INTEGER);
CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price INTEGER);
INSERT INTO users VALUES (1, 'Alice', 'Smith', 30), (2, 'Bob', 'Johnson', 25),
@@ -131,7 +132,7 @@ INSERT INTO t VALUES (zeroblob(1024 - 1), zeroblob(1024 - 2), zeroblob(1024 - 3)
self.shell.quit()
def run_test(self, name: str, sql: str, expected: str) -> None:
print(f"Running test: {name}")
console.test(f"Running test: {name}", _stack_offset=2)
actual = self.shell.execute(sql)
assert actual == expected, (
f"Test failed: {name}\n"
@@ -141,17 +142,26 @@ INSERT INTO t VALUES (zeroblob(1024 - 1), zeroblob(1024 - 2), zeroblob(1024 - 3)
)
def debug_print(self, sql: str):
print(f"debugging: {sql}")
console.debug(f"debugging: {sql}", _stack_offset=2)
actual = self.shell.execute(sql)
print(f"OUTPUT:\n{repr(actual)}")
console.debug(f"OUTPUT:\n{repr(actual)}", _stack_offset=2)
def run_test_fn(
self, sql: str, validate: Callable[[str], bool], desc: str = ""
) -> None:
actual = self.shell.execute(sql)
# Print the test that is executing before executing the sql command
# Printing later confuses the user of the code what test has actually failed
if desc:
print(f"Testing: {desc}")
console.test(f"Testing: {desc}", _stack_offset=2)
actual = self.shell.execute(sql)
assert validate(actual), f"Test failed\nSQL: {sql}\nActual:\n{repr(actual)}"
def execute_dot(self, dot_command: str) -> None:
self.shell._write_to_pipe(dot_command)
# Enables the use of `with` syntax
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, exception_traceback):
self.quit()

138
testing/cli_tests/update.py Normal file
View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
import os
from cli_tests.test_limbo_cli import TestLimboShell
from pydantic import BaseModel
from cli_tests import console
sqlite_flags = os.getenv("SQLITE_FLAGS", "-q").split(" ")
class UpdateTest(BaseModel):
name: str
db_schema: str = "CREATE TABLE test (key INTEGER, t1 BLOB, t2 INTEGER, t3 TEXT);"
blob_size: int = 1024
vals: int = 1000
updates: int = 1
db_path: str = "testing/update.db"
def init_db(self):
with TestLimboShell(
init_commands="",
exec_name="sqlite3",
flags=f"{self.db_path}",
) as sqlite:
sqlite.execute_dot(f".open {self.db_path}")
zero_blob = "0" * self.blob_size * 2
t2_val = "1"
t3_val = "2"
stmt = [self.db_schema]
stmt = stmt + [
f"INSERT INTO test (key, t1, t2, t3) VALUES ({i} ,zeroblob({self.blob_size}), {t2_val}, {t3_val});"
for i in range(self.vals)
]
stmt.append("SELECT count(*) FROM test;")
sqlite.run_test(
"Init Update Db in Sqlite",
"".join(stmt),
f"{self.vals}",
)
stmt = [
f"SELECT hex(t1), t2, t3 FROM test LIMIT 1 OFFSET {i};"
for i in range(self.vals)
]
expected = [f"{zero_blob}|{t2_val}|{t3_val}" for _ in range(self.vals)]
sqlite.run_test(
"Check Values correctly inserted in Sqlite",
"".join(stmt),
"\n".join(expected),
)
def run(self, limbo: TestLimboShell):
limbo.execute_dot(f".open {self.db_path}")
# TODO blobs are hard. Forget about blob updates for now
# one_blob = ("0" * ((self.blob_size * 2) - 1)) + "1"
# TODO For now update just on one row. To expand the tests in the future
# use self.updates and do more than 1 update
t2_update_val = "123"
stmt = f"UPDATE test SET t2 = {t2_update_val} WHERE key = {0};"
limbo.run_test(self.name, stmt, "")
def test_compat(self):
console.info("Testing in SQLite\n")
with TestLimboShell(
init_commands="",
exec_name="sqlite3",
flags=f"{self.db_path}",
) as sqlite:
sqlite.execute_dot(f".open {self.db_path}")
zero_blob = "0" * self.blob_size * 2
t2_val = "1"
t2_update_val = "123"
t3_val = "2"
stmt = []
stmt.append("SELECT count(*) FROM test;")
sqlite.run_test(
"Check all rows present in Sqlite",
"".join(stmt),
f"{self.vals}",
)
stmt = [
f"SELECT hex(t1), t2, t3 FROM test LIMIT 1 OFFSET {i};"
for i in range(self.vals)
]
expected = [
f"{zero_blob}|{t2_val}|{t3_val}"
if i != 0
else f"{zero_blob}|{t2_update_val}|{t3_val}"
for i in range(self.vals)
]
sqlite.run_test(
"Check Values correctly updated in Sqlite",
"".join(stmt),
"\n".join(expected),
)
console.info()
def cleanup(db_fullpath: str):
wal_path = f"{db_fullpath}-wal"
shm_path = f"{db_fullpath}-shm"
paths = [db_fullpath, wal_path, shm_path]
for path in paths:
if os.path.exists(path):
os.remove(path)
def main():
test = UpdateTest(name="Update 1 column", vals=1)
console.info(test)
db_path = test.db_path
try:
test.init_db()
# Use with syntax to automatically close shell on error
with TestLimboShell("") as limbo:
test.run(limbo)
test.test_compat()
except Exception as e:
console.error(f"Test FAILED: {e}")
cleanup(db_path)
exit(1)
# delete db after every compat test so we we have fresh db for next test
cleanup(db_path)
console.info("All tests passed successfully.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
# vfs benchmarking/comparison
import os
from pathlib import Path
import subprocess
import statistics
import argparse
from time import perf_counter, sleep
from typing import Dict
from cli_tests.test_limbo_cli import TestLimboShell
from cli_tests.console import info, error, test
LIMBO_BIN = Path("./target/release/limbo")
DB_FILE = Path("testing/temp.db")
vfs_list = ["syscall", "io_uring"]
def append_time(times, start, perf_counter):
times.append(perf_counter() - start)
return True
def bench_one(vfs: str, sql: str, iterations: int) -> list[float]:
"""
Launch a single Limbo process with the requested VFS, run `sql`
`iterations` times, return a list of elapsed wallclock times.
"""
shell = TestLimboShell(
exec_name=str(LIMBO_BIN),
flags=f"-q -m list --vfs {vfs} {DB_FILE}",
init_commands="",
)
times: list[float] = []
for i in range(1, iterations + 1):
start = perf_counter()
_ = shell.run_test_fn(
sql, lambda x: x is not None and append_time(times, start, perf_counter)
)
test(f" {vfs} | run {i:>3}: {times[-1]:.6f}s")
shell.quit()
return times
def setup_temp_db() -> None:
cmd = ["sqlite3", "testing/testing.db", ".clone testing/temp.db"]
proc = subprocess.run(cmd, check=True)
proc.check_returncode()
sleep(0.3) # make sure it's finished
def cleanup_temp_db() -> None:
if DB_FILE.exists():
DB_FILE.unlink()
os.remove("testing/temp.db-wal")
def main() -> None:
parser = argparse.ArgumentParser(
description="Benchmark a SQL statement against all Limbo VFS backends."
)
parser.add_argument("sql", help="SQL statement to execute (quote it)")
parser.add_argument("iterations", type=int, help="number of repetitions")
args = parser.parse_args()
setup_temp_db()
sql, iterations = args.sql, args.iterations
if iterations <= 0:
error("iterations must be a positive integer")
parser.error("Invalid Arguments")
info(f"SQL : {sql}")
info(f"Iterations : {iterations}")
info(f"Database : {DB_FILE.resolve()}")
info("-" * 60)
averages: Dict[str, float] = {}
for vfs in vfs_list:
test(f"\n### VFS: {vfs} ###")
times = bench_one(vfs, sql, iterations)
info(f"All times ({vfs}):", " ".join(f"{t:.6f}" for t in times))
avg = statistics.mean(times)
averages[vfs] = avg
info("\n" + "-" * 60)
info("Average runtime per VFS")
info("-" * 60)
for vfs in vfs_list:
info(f"vfs: {vfs} : {averages[vfs]:.6f} s")
info("-" * 60)
baseline = "syscall"
baseline_avg = averages[baseline]
name_pad = max(len(v) for v in vfs_list)
for vfs in vfs_list:
avg = averages[vfs]
if vfs == baseline:
info(f"{vfs:<{name_pad}} : {avg:.6f} (baseline)")
else:
pct = (avg - baseline_avg) / baseline_avg * 100.0
faster_slower = "slower" if pct > 0 else "faster"
info(
f"{vfs:<{name_pad}} : {avg:.6f} ({abs(pct):.1f}% {faster_slower} than {baseline})"
)
info("-" * 60)
cleanup_temp_db()
if __name__ == "__main__":
main()

157
testing/cli_tests/write.py Executable file
View File

@@ -0,0 +1,157 @@
#!/usr/bin/env python3
import os
from cli_tests.test_limbo_cli import TestLimboShell
from pydantic import BaseModel
from cli_tests import console
sqlite_flags = os.getenv("SQLITE_FLAGS", "-q").split(" ")
class InsertTest(BaseModel):
name: str
db_schema: str = "CREATE TABLE test (t1 BLOB, t2 INTEGER);"
blob_size: int = 1024**2
vals: int = 100
has_blob: bool = True
db_path: str = "testing/writes.db"
def run(self, limbo: TestLimboShell):
zero_blob = "0" * self.blob_size * 2
big_stmt = [self.db_schema]
big_stmt = big_stmt + [
f"INSERT INTO test (t1) VALUES (zeroblob({self.blob_size}));"
if i % 2 == 0 and self.has_blob
else f"INSERT INTO test (t2) VALUES ({i});"
for i in range(self.vals * 2)
]
expected = []
for i in range(self.vals * 2):
if i % 2 == 0 and self.has_blob:
big_stmt.append(f"SELECT hex(t1) FROM test LIMIT 1 OFFSET {i};")
expected.append(zero_blob)
else:
big_stmt.append(f"SELECT t2 FROM test LIMIT 1 OFFSET {i};")
expected.append(f"{i}")
big_stmt.append("SELECT count(*) FROM test;")
expected.append(str(self.vals * 2))
big_stmt = "".join(big_stmt)
expected = "\n".join(expected)
limbo.run_test_fn(
big_stmt, lambda res: validate_with_expected(res, expected), self.name
)
def test_compat(self):
console.info("Testing in SQLite\n")
with TestLimboShell(
init_commands="",
exec_name="sqlite3",
flags=f"{self.db_path}",
) as sqlite:
sqlite.run_test_fn(
".show",
lambda res: f"filename: {self.db_path}" in res,
"Opened db file created with Limbo in sqlite3",
)
sqlite.run_test_fn(
".schema",
lambda res: self.db_schema in res,
"Tables created by previous Limbo test exist in db file",
)
sqlite.run_test_fn(
"SELECT count(*) FROM test;",
lambda res: res == str(self.vals * 2),
"Counting total rows inserted",
)
console.info()
def validate_with_expected(result: str, expected: str):
return (expected in result, expected)
# TODO no delete tests for now
def blob_tests() -> list[InsertTest]:
tests: list[InsertTest] = []
for vals in range(0, 1000, 100):
tests.append(
InsertTest(
name=f"small-insert-integer-vals-{vals}",
vals=vals,
has_blob=False,
)
)
tests.append(
InsertTest(
name=f"small-insert-blob-interleaved-blob-size-{1024}",
vals=10,
blob_size=1024,
)
)
tests.append(
InsertTest(
name=f"big-insert-blob-interleaved-blob-size-{1024}",
vals=100,
blob_size=1024,
)
)
for blob_size in range(0, (1024 * 1024) + 1, 1024 * 4**4):
if blob_size == 0:
continue
tests.append(
InsertTest(
name=f"small-insert-blob-interleaved-blob-size-{blob_size}",
vals=10,
blob_size=blob_size,
)
)
tests.append(
InsertTest(
name=f"big-insert-blob-interleaved-blob-size-{blob_size}",
vals=100,
blob_size=blob_size,
)
)
return tests
def cleanup(db_fullpath: str):
wal_path = f"{db_fullpath}-wal"
shm_path = f"{db_fullpath}-shm"
paths = [db_fullpath, wal_path, shm_path]
for path in paths:
if os.path.exists(path):
os.remove(path)
def main():
tests = blob_tests()
for test in tests:
console.info(test)
db_path = test.db_path
try:
# Use with syntax to automatically close shell on error
with TestLimboShell("") as limbo:
limbo.execute_dot(f".open {db_path}")
test.run(limbo)
test.test_compat()
except Exception as e:
console.error(f"Test FAILED: {e}")
cleanup(db_path)
exit(1)
# delete db after every compat test so we we have fresh db for next test
cleanup(db_path)
console.info("All tests passed successfully.")
if __name__ == "__main__":
main()