Cleaner and less error prone Write Tests

This commit is contained in:
pedrocarlo
2025-04-03 01:48:33 -03:00
parent 58e091cb23
commit 0c137d6dff
3 changed files with 110 additions and 154 deletions

View File

@@ -1,146 +1,147 @@
#!/usr/bin/env python3
import os
from cli_tests.test_limbo_cli import TestLimboShell
from pydantic import BaseModel
sqlite_flags = os.getenv("SQLITE_FLAGS", "-q").split(" ")
class InsertTest(BaseModel):
name: str
db_schema: str = "CREATE TABLE test (t1 BLOB, t2 INTEGER);"
blob_size: int = 1024**2
vals: int = 100
has_blob: bool = True
db_path: str = "testing/writes.db"
def run(self, limbo: TestLimboShell):
zero_blob = "0" * self.blob_size * 2
big_stmt = [self.db_schema]
big_stmt = big_stmt + [
f"INSERT INTO test (t1) VALUES (zeroblob({self.blob_size}));"
if i % 2 == 0 and self.has_blob
else f"INSERT INTO test (t2) VALUES ({i});"
for i in range(self.vals * 2)
]
expected = []
for i in range(self.vals * 2):
if i % 2 == 0 and self.has_blob:
big_stmt.append(f"SELECT hex(t1) FROM test LIMIT 1 OFFSET {i};")
expected.append(zero_blob)
else:
big_stmt.append(f"SELECT t2 FROM test LIMIT 1 OFFSET {i};")
expected.append(f"{i}")
big_stmt.append("SELECT count(*) FROM test;")
expected.append(str(self.vals * 2))
big_stmt = "".join(big_stmt)
expected = "\n".join(expected)
limbo.run_test_fn(
big_stmt, lambda res: validate_with_expected(res, expected), self.name
)
def test_compat(self):
print("Testing in SQLite\n")
with TestLimboShell(
init_commands="",
exec_name="sqlite3",
flags=f"{self.db_path}",
) as sqlite:
sqlite.run_test_fn(
".show",
lambda res: f"filename: {self.db_path}" in res,
"Opened db file created with Limbo in sqlite3",
)
sqlite.run_test_fn(
".schema",
lambda res: self.db_schema in res,
"Tables created by previous Limbo test exist in db file",
)
# TODO Have some pydantic object be passed to this function with common fields
# To extract the information necessary to query the db in sqlite
# The object should contain Schema information and queries that should be run to
# test in sqlite for compatibility sakes
print()
pass
def validate_with_expected(result: str, expected: str):
return (expected in result, expected)
def stub_write_blob_test(
limbo: TestLimboShell,
name: str,
blob_size: int = 1024**2,
vals: int = 100,
blobs: bool = True,
schema: str = "CREATE TABLE test (t1 BLOB, t2 INTEGER);",
):
zero_blob = "0" * blob_size * 2
big_stmt = [schema]
big_stmt = big_stmt + [
f"INSERT INTO test (t1) VALUES (zeroblob({blob_size}));"
if i % 2 == 0 and blobs
else f"INSERT INTO test (t2) VALUES ({i});"
for i in range(vals * 2)
]
expected = []
for i in range(vals * 2):
if i % 2 == 0 and blobs:
big_stmt.append(f"SELECT hex(t1) FROM test LIMIT 1 OFFSET {i};")
expected.append(zero_blob)
else:
big_stmt.append(f"SELECT t2 FROM test LIMIT 1 OFFSET {i};")
expected.append(f"{i}")
big_stmt.append("SELECT count(*) FROM test;")
expected.append(str(vals * 2))
big_stmt = "".join(big_stmt)
expected = "\n".join(expected)
limbo.run_test_fn(big_stmt, lambda res: validate_with_expected(res, expected), name)
# TODO no delete tests for now
def blob_tests() -> list[dict]:
def blob_tests() -> list[InsertTest]:
tests: list[dict] = []
for vals in range(0, 1000, 100):
tests.append(
{
"name": f"small-insert-integer-vals-{vals}",
"vals": vals,
"blobs": False,
}
InsertTest(
name=f"small-insert-integer-vals-{vals}",
vals=vals,
has_blob=False,
)
)
tests.append(
{
"name": f"small-insert-blob-interleaved-blob-size-{1024}",
"vals": 10,
"blob_size": 1024,
}
InsertTest(
name=f"small-insert-blob-interleaved-blob-size-{1024}",
vals=10,
blob_size=1024,
)
)
tests.append(
{
"name": f"big-insert-blob-interleaved-blob-size-{1024}",
"vals": 100,
"blob_size": 1024,
}
InsertTest(
name=f"big-insert-blob-interleaved-blob-size-{1024}",
vals=100,
blob_size=1024,
)
)
for blob_size in range(0, (1024 * 1024) + 1, 1024 * 4**4):
if blob_size == 0:
continue
tests.append(
{
"name": f"small-insert-blob-interleaved-blob-size-{blob_size}",
"vals": 10,
"blob_size": blob_size,
}
InsertTest(
name=f"small-insert-blob-interleaved-blob-size-{blob_size}",
vals=10,
blob_size=blob_size,
)
)
tests.append(
{
"name": f"big-insert-blob-interleaved-blob-size-{blob_size}",
"vals": 100,
"blob_size": blob_size,
}
InsertTest(
name=f"big-insert-blob-interleaved-blob-size-{blob_size}",
vals=100,
blob_size=blob_size,
)
)
return tests
def test_sqlite_compat(db_fullpath: str, schema: str):
sqlite = TestLimboShell(
with TestLimboShell(
init_commands="",
exec_name="sqlite3",
flags=f"{db_fullpath}",
)
sqlite.run_test_fn(
".show",
lambda res: f"filename: {db_fullpath}" in res,
"Opened db file created with Limbo in sqlite3",
)
sqlite.run_test_fn(
".schema",
lambda res: schema in res,
"Tables created by previous Limbo test exist in db file",
)
# TODO when we can import external dependencies
# Have some pydantic object be passed to this function with common fields
) as sqlite:
sqlite.run_test_fn(
".show",
lambda res: f"filename: {db_fullpath}" in res,
"Opened db file created with Limbo in sqlite3",
)
sqlite.run_test_fn(
".schema",
lambda res: schema in res,
"Tables created by previous Limbo test exist in db file",
)
# TODO Have some pydantic object be passed to this function with common fields
# To extract the information necessary to query the db in sqlite
# The object should contain Schema information and queries that should be run to
# test in sqlite for compatibility sakes
# sqlite.run_test_fn(
# "SELECT count(*) FROM test;",
# lambda res: res == "50",
# "Tested large write to testfs",
# )
# sqlite.run_test_fn(
# "SELECT count(*) FROM vfs;",
# lambda res: res == "50",
# "Tested large write to testfs",
# )
sqlite.quit()
def touch_db_file(db_fullpath: str):
os.O_RDWR
descriptor = os.open(
path=db_fullpath,
flags=(
os.O_RDWR # access mode: read and write
| os.O_CREAT # create if not exists
| os.O_TRUNC # truncate the file to zero
),
mode=0o777,
)
f = open(descriptor)
f.close()
def cleanup(db_fullpath: str):
wal_path = f"{db_fullpath}-wal"
@@ -153,18 +154,15 @@ def cleanup(db_fullpath: str):
def main():
tests = blob_tests()
db_path = "testing/writes.db"
schema = "CREATE TABLE test (t1 BLOB, t2 INTEGER);"
# TODO see how to parallelize this loop with different subprocesses
for test in tests:
db_path = test.db_path
try:
# Use with syntax to automatically close shell on error
with TestLimboShell() as limbo:
limbo.execute_dot(f".open {db_path}")
stub_write_blob_test(limbo, **test)
print("Testing in SQLite\n")
test_sqlite_compat(db_path, schema)
print()
test.run(limbo)
test.test_compat()
except Exception as e:
print(f"Test FAILED: {e}")

View File

@@ -4,7 +4,10 @@ name = "limbo_test"
readme = "README.md"
requires-python = ">=3.13"
version = "0.1.0"
dependencies = ["faker>=37.1.0", "pydantic>=2.11.1", "rich>=14.0.0"]
dependencies = [
"faker>=37.1.0",
"pydantic>=2.11.1",
]
[project.scripts]
test-writes = "cli_tests.writes:main"