Merge 'Clean up extension python tests to use convenience CLI-test class' from Preston Thorpe

A nice python class was added in #941 for easily testing limbo's CLI
output. As the tests around extensions continue to grow, we should use
this as much as possible for non-Tcl/compatibility tests so they can
scale more easily without getting messy. I am going to add a whole bunch
of tests that will explicitly test each kind of extension both when
loaded staticly/dynamically and combined with other export types and I
want to get the existing ones migrated over first.

Closes #1048
This commit is contained in:
Pekka Enberg
2025-02-26 13:50:31 +02:00
3 changed files with 142 additions and 288 deletions

View File

@@ -67,7 +67,7 @@ test: limbo test-compat test-vector test-sqlite3 test-shell test-extensions
test-extensions: limbo
cargo build --package limbo_regexp
./testing/extensions.py
./testing/cli_tests/extensions.py
.PHONY: test-extensions
test-shell: limbo

View File

@@ -1,8 +1,6 @@
#!/usr/bin/env python3
import os
import subprocess
import select
import time
from test_limbo_cli import TestLimboShell
sqlite_exec = "./target/debug/limbo"
sqlite_flags = os.getenv("SQLITE_FLAGS", "-q").split(" ")
@@ -26,184 +24,85 @@ INSERT INTO test values (70, 25);
"""
def init_limbo():
pipe = subprocess.Popen(
[sqlite_exec, *sqlite_flags],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
)
write_to_pipe(pipe, test_data)
return pipe
def validate_string_uuid(res):
return len(res) == 36
def execute_sql(pipe, sql):
end_suffix = "END_OF_RESULT"
write_to_pipe(pipe, sql)
write_to_pipe(pipe, f"SELECT '{end_suffix}';\n")
stdout = pipe.stdout
stderr = pipe.stderr
output = ""
while True:
ready_to_read, _, error_in_pipe = select.select(
[stdout, stderr], [], [stdout, stderr]
)
ready_to_read_or_err = set(ready_to_read + error_in_pipe)
if stderr in ready_to_read_or_err:
exit_on_error(stderr)
if stdout in ready_to_read_or_err:
fragment = stdout.read(select.PIPE_BUF)
output += fragment.decode()
if output.rstrip().endswith(end_suffix):
output = output.rstrip().removesuffix(end_suffix)
break
output = strip_each_line(output)
return output
def strip_each_line(lines: str) -> str:
split = lines.split("\n")
res = [line.strip() for line in split if line != ""]
return "\n".join(res)
def write_to_pipe(pipe, command):
if pipe.stdin is None:
raise RuntimeError("Failed to write to shell")
pipe.stdin.write((command + "\n").encode())
pipe.stdin.flush()
def exit_on_error(stderr):
while True:
ready_to_read, _, _ = select.select([stderr], [], [])
if not ready_to_read:
break
print(stderr.read().decode(), end="")
exit(1)
def run_test(pipe, sql, validator=None, name=None):
print(f"Running test {name}: {sql}")
result = execute_sql(pipe, sql)
if validator is not None:
if not validator(result):
print(f"Test FAILED: {sql}")
print(f"Returned: {result}")
raise Exception("Validation failed")
print("Test PASSED")
def validate_true(result):
return result == "1"
def validate_false(result):
return result == "0"
def validate_blob(result):
# HACK: blobs are difficult to test because the shell
# tries to return them as utf8 strings, so we call hex
# and assert they are valid hex digits
return int(result, 16) is not None
def validate_string_uuid(result):
return len(result) == 36 and result.count("-") == 4
def returns_error_no_func(result):
return "error: no such function: " in result
def returns_vtable_parse_err(result):
return "Parse error: Virtual table" in result
def returns_null(result):
return result == "" or result == "\n"
def assert_now_unixtime(result):
return result == str(int(time.time()))
def assert_specific_time(result):
return result == "1736720789"
def test_uuid(pipe):
def test_uuid():
limbo = TestLimboShell()
specific_time = "01945ca0-3189-76c0-9a8f-caf310fc8b8e"
# these are built into the binary, so we just test they work
run_test(
pipe,
limbo.run_test_fn(
"SELECT hex(uuid4());",
validate_blob,
lambda res: int(res, 16) is not None,
"uuid functions are registered properly with ext loaded",
)
run_test(pipe, "SELECT uuid4_str();", validate_string_uuid)
run_test(pipe, "SELECT hex(uuid7());", validate_blob)
run_test(
pipe,
"SELECT uuid7_timestamp_ms(uuid7()) / 1000;",
limbo.run_test_fn("SELECT uuid4_str();", lambda res: len(res) == 36)
limbo.run_test_fn("SELECT hex(uuid7());", lambda res: int(res, 16) is not None)
limbo.run_test_fn(
"SELECT uuid7_timestamp_ms(uuid7()) / 1000;", lambda res: res.isdigit()
)
run_test(pipe, "SELECT uuid7_str();", validate_string_uuid)
run_test(pipe, "SELECT uuid_str(uuid7());", validate_string_uuid)
run_test(pipe, "SELECT hex(uuid_blob(uuid7_str()));", validate_blob)
run_test(pipe, "SELECT uuid_str(uuid_blob(uuid7_str()));", validate_string_uuid)
run_test(
pipe,
limbo.run_test_fn("SELECT uuid7_str();", validate_string_uuid)
limbo.run_test_fn("SELECT uuid_str(uuid7());", validate_string_uuid)
limbo.run_test_fn(
"SELECT hex(uuid_blob(uuid7_str()));", lambda res: int(res, 16) is not None
)
limbo.run_test_fn("SELECT uuid_str(uuid_blob(uuid7_str()));", validate_string_uuid)
limbo.run_test_fn(
f"SELECT uuid7_timestamp_ms('{specific_time}') / 1000;",
assert_specific_time,
lambda res: res == "1736720789",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT gen_random_uuid();",
validate_string_uuid,
"scalar alias's are registered properly",
)
def test_regexp(pipe):
extension_path = "./target/debug/liblimbo_regexp"
def true(res):
return res == "1"
def false(res):
return res == "0"
def null(res):
return res == ""
def test_regexp():
limbo = TestLimboShell(test_data)
extension_path = "./target/debug/liblimbo_regexp"
# before extension loads, assert no function
run_test(pipe, "SELECT regexp('a.c', 'abc');", returns_error_no_func)
run_test(pipe, f".load {extension_path}", returns_null)
limbo.run_test_fn(
"SELECT regexp('a.c', 'abc');",
lambda res: "Parse error: no such function" in res,
)
limbo.run_test_fn(f".load {extension_path}", null)
print(f"Extension {extension_path} loaded successfully.")
run_test(pipe, "SELECT regexp('a.c', 'abc');", validate_true)
run_test(pipe, "SELECT regexp('a.c', 'ac');", validate_false)
run_test(pipe, "SELECT regexp('[0-9]+', 'the year is 2021');", validate_true)
run_test(pipe, "SELECT regexp('[0-9]+', 'the year is unknow');", validate_false)
run_test(pipe, "SELECT regexp_like('the year is 2021', '[0-9]+');", validate_true)
run_test(
pipe, "SELECT regexp_like('the year is unknow', '[0-9]+');", validate_false
)
run_test(
pipe,
limbo.run_test_fn("SELECT regexp('a.c', 'abc');", true)
limbo.run_test_fn("SELECT regexp('a.c', 'ac');", false)
limbo.run_test_fn("SELECT regexp('[0-9]+', 'the year is 2021');", true)
limbo.run_test_fn("SELECT regexp('[0-9]+', 'the year is unknow');", false)
limbo.run_test_fn("SELECT regexp_like('the year is 2021', '[0-9]+');", true)
limbo.run_test_fn("SELECT regexp_like('the year is unknow', '[0-9]+');", false)
limbo.run_test_fn(
"SELECT regexp_substr('the year is 2021', '[0-9]+') = '2021';",
validate_true,
true,
)
run_test(
pipe, "SELECT regexp_substr('the year is unknow', '[0-9]+');", returns_null
)
run_test(
pipe,
limbo.run_test_fn("SELECT regexp_substr('the year is unknow', '[0-9]+');", null)
limbo.run_test_fn(
"select regexp_replace('the year is 2021', '[0-9]+', '2050') = 'the year is 2050';",
validate_true,
true,
)
run_test(
pipe,
limbo.run_test_fn(
"select regexp_replace('the year is 2021', '2k21', '2050') = 'the year is 2021';",
validate_true,
true,
)
run_test(
pipe,
limbo.run_test_fn(
"select regexp_replace('the year is 2021', '([0-9]+)', '$1 or 2050') = 'the year is 2021 or 2050';",
validate_true,
true,
)
@@ -227,51 +126,42 @@ def validate_percentile_disc(res):
return res == "40.0"
def test_aggregates(pipe):
def test_aggregates():
limbo = TestLimboShell(init_commands=test_data)
extension_path = "./target/debug/liblimbo_percentile"
# assert no function before extension loads
run_test(
pipe,
limbo.run_test_fn(
"SELECT median(1);",
returns_error_no_func,
lambda res: "error: no such function: " in res,
"median agg function returns null when ext not loaded",
)
run_test(
pipe,
f".load {extension_path}",
returns_null,
"load extension command works properly",
)
run_test(
pipe,
limbo.execute_dot(f".load {extension_path}")
limbo.run_test_fn(
"select median(value) from numbers;",
validate_median,
"median agg function works",
)
write_to_pipe(pipe, "INSERT INTO numbers (value) VALUES (8.0);\n")
run_test(
pipe,
limbo.execute_dot("INSERT INTO numbers (value) VALUES (8.0);\n")
limbo.run_test_fn(
"select median(value) from numbers;",
validate_median_odd,
"median agg function works with odd number of elements",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT percentile(value, percent) from test;",
validate_percentile1,
"test aggregate percentile function with 2 arguments works",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT percentile(value, 55) from test;",
validate_percentile2,
"test aggregate percentile function with 1 argument works",
)
run_test(
pipe, "SELECT percentile_cont(value, 0.25) from test;", validate_percentile1
limbo.run_test_fn(
"SELECT percentile_cont(value, 0.25) from test;", validate_percentile1
)
run_test(
pipe, "SELECT percentile_disc(value, 0.55) from test;", validate_percentile_disc
limbo.run_test_fn(
"SELECT percentile_disc(value, 0.55) from test;", validate_percentile_disc
)
@@ -316,57 +206,46 @@ def validate_base64_decode(a):
return a == "hello"
def test_crypto(pipe):
def test_crypto():
limbo = TestLimboShell()
extension_path = "./target/debug/liblimbo_crypto"
# assert no function before extension loads
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_blake('a');",
lambda res: "Parse error" in res,
"crypto_blake3 returns null when ext not loaded",
)
run_test(
pipe,
f".load {extension_path}",
returns_null,
"load extension command works properly",
)
limbo.execute_dot(f".load {extension_path}")
# Hashing and Decode
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode(crypto_blake3('abc'), 'hex');",
lambda res: res
== "6437b3ac38465133ffb63b75273a8db548c558465d79db03fd359c6cd5bd9d85",
"blake3 should encrypt correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode(crypto_md5('abc'), 'hex');",
lambda res: res == "900150983cd24fb0d6963f7d28e17f72",
"md5 should encrypt correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode(crypto_sha1('abc'), 'hex');",
lambda res: res == "a9993e364706816aba3e25717850c26c9cd0d89d",
"sha1 should encrypt correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode(crypto_sha256('abc'), 'hex');",
lambda a: a
== "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad",
"sha256 should encrypt correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode(crypto_sha384('abc'), 'hex');",
lambda a: a
== "cb00753f45a35e8bb5a03d699ac65007272c32ab0eded1631a8b605a43ff5bed8086072ba1e7cc2358baeca134c825a7",
"sha384 should encrypt correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode(crypto_sha512('abc'), 'hex');",
lambda a: a
== "ddaf35a193617abacc417349ae20413112e6fa4e89a97ea20a9eeee64b55d39a2192992a274fc1a836ba3c23a3feebbd454d4423643ce80e2a9ac94fa54ca49f",
@@ -374,191 +253,158 @@ def test_crypto(pipe):
)
# Encoding and Decoding
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode('hello', 'base32');",
validate_base32_encode,
"base32 should encode correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_decode('NBSWY3DP', 'base32');",
validate_base32_decode,
"base32 should decode correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode('hello', 'base64');",
validate_base64_encode,
"base64 should encode correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_decode('aGVsbG8=', 'base64');",
validate_base64_decode,
"base64 should decode correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode('hello', 'base85');",
validate_base85_encode,
"base85 should encode correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_decode('BOu!rDZ', 'base85');",
validate_base85_decode,
"base85 should decode correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode('hello', 'hex');",
validate_hex_encode,
"hex should encode correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_decode('68656c6c6f', 'hex');",
validate_hex_decode,
"hex should decode correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_encode('/hello?text=(ಠ_ಠ)', 'url');",
validate_url_encode,
"url should encode correctly",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT crypto_decode('%2Fhello%3Ftext%3D%28%E0%B2%A0_%E0%B2%A0%29', 'url');",
validate_url_decode,
"url should decode correctly",
)
def test_series(pipe):
def test_series():
limbo = TestLimboShell()
ext_path = "./target/debug/liblimbo_series"
run_test(
pipe,
limbo.run_test_fn(
"SELECT * FROM generate_series(1, 10);",
lambda res: "Virtual table module not found: generate_series" in res,
)
run_test(pipe, f".load {ext_path}", returns_null)
run_test(
pipe,
limbo.execute_dot(f".load {ext_path}")
limbo.run_test_fn(
"SELECT * FROM generate_series(1, 10);",
lambda res: res == "1\n2\n3\n4\n5\n6\n7\n8\n9\n10",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT * FROM generate_series(1, 10, 2);",
lambda res: res == "1\n3\n5\n7\n9",
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT * FROM generate_series(1, 10, 2, 3);",
lambda res: "Invalid Argument" in res,
)
run_test(
pipe,
limbo.run_test_fn(
"SELECT * FROM generate_series(10, 1, -2);",
lambda res: res == "10\n8\n6\n4\n2",
)
def test_kv(pipe):
def test_kv():
ext_path = "./target/debug/liblimbo_kv"
run_test(
pipe,
limbo = TestLimboShell()
limbo.run_test_fn(
"create virtual table t using kv_store;",
lambda res: "Virtual table module not found: kv_store" in res,
)
run_test(pipe, f".load {ext_path}", returns_null)
run_test(
pipe,
limbo.execute_dot(f".load {ext_path}")
limbo.run_test_fn(
"create virtual table t using kv_store;",
returns_null,
null,
"can create kv_store vtable",
)
run_test(
pipe,
limbo.run_test_fn(
"insert into t values ('hello', 'world');",
returns_null,
null,
"can insert into kv_store vtable",
)
run_test(
pipe,
limbo.run_test_fn(
"select value from t where key = 'hello';",
lambda res: "world" == res,
"can select from kv_store",
)
run_test(
pipe,
limbo.run_test_fn(
"delete from t where key = 'hello';",
returns_null,
null,
"can delete from kv_store",
)
run_test(pipe, "insert into t values ('other', 'value');", returns_null)
run_test(
pipe,
limbo.run_test_fn("insert into t values ('other', 'value');", null)
limbo.run_test_fn(
"select value from t where key = 'hello';",
lambda res: "" == res,
"proper data is deleted",
)
run_test(
pipe,
limbo.run_test_fn(
"select * from t;",
lambda res: "other|value" == res,
"can select after deletion",
)
run_test(
pipe,
limbo.run_test_fn(
"delete from t where key = 'other';",
returns_null,
null,
"can delete from kv_store",
)
run_test(
pipe,
limbo.run_test_fn(
"select * from t;",
lambda res: "" == res,
"can select empty table without error",
)
run_test(
pipe,
limbo.run_test_fn(
"delete from t;",
returns_null,
null,
"can delete from empty table without error",
)
for i in range(100):
write_to_pipe(pipe, f"insert into t values ('key{i}', 'val{i}');")
run_test(
pipe, "select count(*) from t;", lambda res: "100" == res, "can insert 100 rows"
limbo.execute_dot(f"insert into t values ('key{i}', 'val{i}');")
limbo.run_test_fn(
"select count(*) from t;", lambda res: "100" == res, "can insert 100 rows"
)
run_test(pipe, "delete from t limit 96;", returns_null, "can delete 96 rows")
run_test(
pipe, "select count(*) from t;", lambda res: "4" == res, "four rows remain"
limbo.run_test_fn("delete from t limit 96;", null, "can delete 96 rows")
limbo.run_test_fn(
"select count(*) from t;", lambda res: "4" == res, "four rows remain"
)
def main():
pipe = init_limbo()
try:
test_regexp(pipe)
test_uuid(pipe)
test_aggregates(pipe)
test_crypto(pipe)
test_series(pipe)
test_kv(pipe)
except Exception as e:
print(f"Test FAILED: {e}")
pipe.terminate()
exit(1)
pipe.terminate()
print("All tests passed successfully.")
if __name__ == "__main__":
main()
try:
test_regexp()
test_uuid()
test_aggregates()
test_crypto()
test_series()
test_kv()
except Exception as e:
print(f"Test FAILED: {e}")
exit(1)
print("All tests passed successfully.")

View File

@@ -4,7 +4,7 @@ import select
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional
from typing import Callable, List, Optional
PIPE_BUF = 4096
@@ -132,5 +132,13 @@ INSERT INTO t VALUES (zeroblob(1024 - 1), zeroblob(1024 - 2), zeroblob(1024 - 3)
f"Actual:\n{repr(actual)}"
)
def run_test_fn(
self, sql: str, validate: Callable[[str], bool], desc: str = ""
) -> None:
actual = self.shell.execute(sql)
if desc:
print(f"Testing: {desc}")
assert validate(actual), f"Test failed\nSQL: {sql}\nActual:\n{repr(actual)}"
def execute_dot(self, dot_command: str) -> None:
self.shell._write_to_pipe(dot_command)