@@ -31,31 +31,31 @@ def validate_string_uuid(res):
def test_uuid ( ) :
limb o = TestTursoShell ( )
turs o = TestTursoShell ( )
specific_time = " 01945ca0-3189-76c0-9a8f-caf310fc8b8e "
# these are built into the binary, so we just test they work
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT hex(uuid4()); " ,
lambda res : int ( res , 16 ) is not None ,
" uuid functions are registered properly with ext loaded " ,
)
limb o. run_test_fn ( " SELECT uuid4_str(); " , lambda res : len ( res ) == 36 )
limb o. run_test_fn ( " SELECT hex(uuid7()); " , lambda res : int ( res , 16 ) is not None )
limb o. run_test_fn ( " SELECT uuid7_timestamp_ms(uuid7()) / 1000; " , lambda res : res . isdigit ( ) )
limb o. run_test_fn ( " SELECT uuid7_str(); " , validate_string_uuid )
limb o. run_test_fn ( " SELECT uuid_str(uuid7()); " , validate_string_uuid )
limb o. run_test_fn ( " SELECT hex(uuid_blob(uuid7_str())); " , lambda res : int ( res , 16 ) is not None )
limb o. run_test_fn ( " SELECT uuid_str(uuid_blob(uuid7_str())); " , validate_string_uuid )
limb o. run_test_fn (
turs o. run_test_fn ( " SELECT uuid4_str(); " , lambda res : len ( res ) == 36 )
turs o. run_test_fn ( " SELECT hex(uuid7()); " , lambda res : int ( res , 16 ) is not None )
turs o. run_test_fn ( " SELECT uuid7_timestamp_ms(uuid7()) / 1000; " , lambda res : res . isdigit ( ) )
turs o. run_test_fn ( " SELECT uuid7_str(); " , validate_string_uuid )
turs o. run_test_fn ( " SELECT uuid_str(uuid7()); " , validate_string_uuid )
turs o. run_test_fn ( " SELECT hex(uuid_blob(uuid7_str())); " , lambda res : int ( res , 16 ) is not None )
turs o. run_test_fn ( " SELECT uuid_str(uuid_blob(uuid7_str())); " , validate_string_uuid )
turs o. run_test_fn (
f " SELECT uuid7_timestamp_ms( ' { specific_time } ' ) / 1000; " ,
lambda res : res == " 1736720789 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT gen_random_uuid(); " ,
validate_string_uuid ,
" scalar alias ' s are registered properly " ,
)
limb o. quit ( )
turs o. quit ( )
def true ( res ) :
@@ -71,51 +71,51 @@ def null(res):
def test_regexp ( ) :
limb o = TestTursoShell ( test_data )
turs o = TestTursoShell ( test_data )
extension_path = " ./target/debug/liblimbo_regexp "
# before extension loads, assert no function
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT regexp( ' a.c ' , ' abc ' ); " ,
lambda res : " Parse error: no such function " in res ,
)
limb o. run_test_fn ( f " .load { extension_path } " , null )
turs o. run_test_fn ( f " .load { extension_path } " , null )
console . info ( f " Extension { extension_path } loaded successfully. " )
limb o. run_test_fn ( " SELECT regexp( ' a.c ' , ' abc ' ); " , true )
limb o. run_test_fn ( " SELECT regexp( ' a.c ' , ' ac ' ); " , false )
limb o. run_test_fn ( " SELECT regexp( ' [0-9]+ ' , ' the year is 2021 ' ); " , true )
limb o. run_test_fn ( " SELECT regexp( ' [0-9]+ ' , ' the year is unknow ' ); " , false )
limb o. run_test_fn ( " SELECT regexp_like( ' the year is 2021 ' , ' [0-9]+ ' ); " , true )
limb o. run_test_fn ( " SELECT regexp_like( ' the year is unknow ' , ' [0-9]+ ' ); " , false )
limb o. run_test_fn (
turs o. run_test_fn ( " SELECT regexp( ' a.c ' , ' abc ' ); " , true )
turs o. run_test_fn ( " SELECT regexp( ' a.c ' , ' ac ' ); " , false )
turs o. run_test_fn ( " SELECT regexp( ' [0-9]+ ' , ' the year is 2021 ' ); " , true )
turs o. run_test_fn ( " SELECT regexp( ' [0-9]+ ' , ' the year is unknow ' ); " , false )
turs o. run_test_fn ( " SELECT regexp_like( ' the year is 2021 ' , ' [0-9]+ ' ); " , true )
turs o. run_test_fn ( " SELECT regexp_like( ' the year is unknow ' , ' [0-9]+ ' ); " , false )
turs o. run_test_fn (
" SELECT regexp_substr( ' the year is 2021 ' , ' [0-9]+ ' ) = ' 2021 ' ; " ,
true ,
)
limb o. run_test_fn ( " SELECT regexp_substr( ' the year is unknow ' , ' [0-9]+ ' ); " , null )
limb o. run_test_fn (
turs o. run_test_fn ( " SELECT regexp_substr( ' the year is unknow ' , ' [0-9]+ ' ); " , null )
turs o. run_test_fn (
" select regexp_replace( ' the year is 2021 ' , ' [0-9]+ ' , ' 2050 ' ) = ' the year is 2050 ' ; " ,
true ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select regexp_replace( ' the year is 2021 ' , ' 2k21 ' , ' 2050 ' ) = ' the year is 2021 ' ; " ,
true ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select regexp_replace( ' the year is 2021 ' , ' ([0-9]+) ' , ' $1 or 2050 ' ) = ' the year is 2021 or 2050 ' ; " ,
true ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select regexp_capture( ' the year is 2021 ' , ' ([0-9]+) ' ) = ' 2021 ' ; " ,
true ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select regexp_capture( ' abc 123 def ' , ' ([a-z]+) ([0-9]+) ([a-z]+) ' , 2) = ' 123 ' ; " ,
true ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select regexp_capture( ' no digits here ' , ' ([0-9]+) ' ); " ,
null ,
)
limb o. quit ( )
turs o. quit ( )
def validate_median ( res ) :
@@ -139,82 +139,82 @@ def validate_percentile_disc(res):
def test_aggregates ( ) :
limb o = TestTursoShell ( init_commands = test_data )
turs o = TestTursoShell ( init_commands = test_data )
extension_path = " ./target/debug/liblimbo_percentile "
# assert no function before extension loads
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT median(1); " ,
lambda res : " error: no such function: " in res ,
" median agg function returns null when ext not loaded " ,
)
limb o. execute_dot ( f " .load { extension_path } " )
limb o. run_test_fn (
turs o. execute_dot ( f " .load { extension_path } " )
turs o. run_test_fn (
" select median(value) from numbers; " ,
validate_median ,
" median agg function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select CASE WHEN median(value) > 0 THEN median(value) ELSE 0 END from numbers; " ,
validate_median ,
" median agg function wrapped in expression works " ,
)
limb o. execute_dot ( " INSERT INTO numbers (value) VALUES (8.0); \n " )
limb o. run_test_fn (
turs o. execute_dot ( " INSERT INTO numbers (value) VALUES (8.0); \n " )
turs o. run_test_fn (
" select median(value) from numbers; " ,
validate_median_odd ,
" median agg function works with odd number of elements " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT percentile(value, percent) from test; " ,
validate_percentile1 ,
" test aggregate percentile function with 2 arguments works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT percentile(value, 55) from test; " ,
validate_percentile2 ,
" test aggregate percentile function with 1 argument works " ,
)
limb o. run_test_fn ( " SELECT percentile_cont(value, 0.25) from test; " , validate_percentile1 )
limb o. run_test_fn ( " SELECT percentile_disc(value, 0.55) from test; " , validate_percentile_disc )
limb o. quit ( )
turs o. run_test_fn ( " SELECT percentile_cont(value, 0.25) from test; " , validate_percentile1 )
turs o. run_test_fn ( " SELECT percentile_disc(value, 0.55) from test; " , validate_percentile_disc )
turs o. quit ( )
def test_grouped_aggregates ( ) :
limb o = TestTursoShell ( init_commands = test_data )
turs o = TestTursoShell ( init_commands = test_data )
extension_path = " ./target/debug/liblimbo_percentile "
limb o. execute_dot ( f " .load { extension_path } " )
turs o. execute_dot ( f " .load { extension_path } " )
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT median(value) FROM numbers GROUP BY category; " ,
lambda res : " 2.0 \n 5.5 " == res ,
" median aggregate function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select CASE WHEN median(value) > 0 THEN median(value) ELSE 0 END from numbers GROUP BY category; " ,
lambda res : " 2.0 \n 5.5 " == res ,
" median aggregate function wrapped in expression works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT percentile(value, percent) FROM test GROUP BY category; " ,
lambda res : " 12.5 \n 30.0 \n 45.0 \n 70.0 " == res ,
" grouped aggregate percentile function with 2 arguments works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT percentile(value, 55) FROM test GROUP BY category; " ,
lambda res : " 15.5 \n 30.0 \n 51.0 \n 70.0 " == res ,
" grouped aggregate percentile function with 1 argument works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT percentile_cont(value, 0.25) FROM test GROUP BY category; " ,
lambda res : " 12.5 \n 30.0 \n 45.0 \n 70.0 " == res ,
" grouped aggregate percentile_cont function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT percentile_disc(value, 0.55) FROM test GROUP BY category; " ,
lambda res : " 10.0 \n 30.0 \n 50.0 \n 70.0 " == res ,
" grouped aggregate percentile_disc function works " ,
)
limb o. quit ( )
turs o. quit ( )
# Encoders and decoders
@@ -259,43 +259,43 @@ def validate_base64_decode(a):
def test_crypto ( ) :
limb o = TestTursoShell ( )
turs o = TestTursoShell ( )
extension_path = " ./target/debug/liblimbo_crypto "
# assert no function before extension loads
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_blake( ' a ' ); " ,
lambda res : " Parse error " in res ,
" crypto_blake3 returns null when ext not loaded " ,
)
limb o. execute_dot ( f " .load { extension_path } " )
turs o. execute_dot ( f " .load { extension_path } " )
# Hashing and Decode
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode(crypto_blake3( ' abc ' ), ' hex ' ); " ,
lambda res : res == " 6437b3ac38465133ffb63b75273a8db548c558465d79db03fd359c6cd5bd9d85 " ,
" blake3 should encrypt correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode(crypto_md5( ' abc ' ), ' hex ' ); " ,
lambda res : res == " 900150983cd24fb0d6963f7d28e17f72 " ,
" md5 should encrypt correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode(crypto_sha1( ' abc ' ), ' hex ' ); " ,
lambda res : res == " a9993e364706816aba3e25717850c26c9cd0d89d " ,
" sha1 should encrypt correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode(crypto_sha256( ' abc ' ), ' hex ' ); " ,
lambda a : a == " ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad " ,
" sha256 should encrypt correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode(crypto_sha384( ' abc ' ), ' hex ' ); " ,
lambda a : a
== " cb00753f45a35e8bb5a03d699ac65007272c32ab0eded1631a8b605a43ff5bed8086072ba1e7cc2358baeca134c825a7 " ,
" sha384 should encrypt correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode(crypto_sha512( ' abc ' ), ' hex ' ); " ,
lambda a : a
== " ddaf35a193617abacc417349ae20413112e6fa4e89a97ea20a9eeee64b55d39a2192992a274fc1a836ba3c23a3feebbd454d4423643ce80e2a9ac94fa54ca49f " , # noqa: E501
@@ -303,67 +303,67 @@ def test_crypto():
)
# Encoding and Decoding
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode( ' hello ' , ' base32 ' ); " ,
validate_base32_encode ,
" base32 should encode correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_decode( ' NBSWY3DP ' , ' base32 ' ); " ,
validate_base32_decode ,
" base32 should decode correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode( ' hello ' , ' base64 ' ); " ,
validate_base64_encode ,
" base64 should encode correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_decode( ' aGVsbG8= ' , ' base64 ' ); " ,
validate_base64_decode ,
" base64 should decode correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode( ' hello ' , ' base85 ' ); " ,
validate_base85_encode ,
" base85 should encode correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_decode( ' BOu!rDZ ' , ' base85 ' ); " ,
validate_base85_decode ,
" base85 should decode correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode( ' hello ' , ' hex ' ); " ,
validate_hex_encode ,
" hex should encode correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_decode( ' 68656c6c6f ' , ' hex ' ); " ,
validate_hex_decode ,
" hex should decode correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_encode( ' /hello?text=(ಠ_ಠ) ' , ' url ' ); " ,
validate_url_encode ,
" url should encode correctly " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT crypto_decode( ' %2F hello %3F text % 3D %28% E0 % B2 % A0_ %E 0 % B2 % A0 % 29 ' , ' url ' ); " ,
validate_url_decode ,
" url should decode correctly " ,
)
limb o. quit ( )
turs o. quit ( )
def test_series ( ) :
console . info ( " Running test_series for Limbo " )
limb o = TestTursoShell ( )
_test_series ( limb o)
turs o = TestTursoShell ( )
_test_series ( turs o)
console . info ( " Running test_series for SQLite " )
limb o = TestTursoShell ( exec_name = " sqlite3 " )
_test_series ( limb o)
turs o = TestTursoShell ( exec_name = " sqlite3 " )
_test_series ( turs o)
def _test_series ( limbo : TestTursoShell ) :
@@ -399,78 +399,78 @@ def test_kv():
def _test_kv ( exec_name , ext_path ) :
console . info ( f " Running test_kv for { ext_path } in { exec_name } " )
limb o = TestTursoShell (
turs o = TestTursoShell (
exec_name = exec_name ,
)
# first, create a normal table to ensure no issues
limb o. execute_dot ( " CREATE TABLE other (a,b,c); " )
limb o. execute_dot ( " INSERT INTO other values (23,32,23); " )
limb o. run_test_fn (
turs o. execute_dot ( " CREATE TABLE other (a,b,c); " )
turs o. execute_dot ( " INSERT INTO other values (23,32,23); " )
turs o. run_test_fn (
" create virtual table t using kv_store; " ,
lambda res : " no such module: kv_store " in res ,
)
limb o. execute_dot ( f " .load { ext_path } " )
limb o. execute_dot (
turs o. execute_dot ( f " .load { ext_path } " )
turs o. execute_dot (
" create virtual table t using kv_store; " ,
)
limb o. run_test_fn ( " .schema " , lambda res : " CREATE VIRTUAL TABLE t " in res )
limb o. run_test_fn (
turs o. run_test_fn ( " .schema " , lambda res : " CREATE VIRTUAL TABLE t " in res )
turs o. run_test_fn (
" insert into t values ( ' hello ' , ' world ' ); " ,
null ,
" can insert into kv_store vtable " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select value from t where key = ' hello ' ; " ,
lambda res : " world " == res ,
" can select from kv_store " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" delete from t where key = ' hello ' ; " ,
null ,
" can delete from kv_store " ,
)
limb o. run_test_fn ( " insert into t values ( ' other ' , ' value ' ); " , null )
limb o. run_test_fn (
turs o. run_test_fn ( " insert into t values ( ' other ' , ' value ' ); " , null )
turs o. run_test_fn (
" select value from t where key = ' hello ' ; " ,
lambda res : " " == res ,
" proper data is deleted " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select * from t; " ,
lambda res : " other|value " == res ,
" can select after deletion " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" delete from t where key = ' other ' ; " ,
null ,
" can delete from kv_store " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select * from t; " ,
lambda res : " " == res ,
" can select empty table without error " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" delete from t; " ,
null ,
" can delete from empty table without error " ,
)
for i in range ( 100 ) :
limb o. execute_dot ( f " insert into t values ( ' key { i } ' , ' val { i } ' ); " )
limb o. run_test_fn ( " select count(*) from t; " , lambda res : " 100 " == res , " can insert 100 rows " )
limb o. run_test_fn ( " update t set value = ' updated ' where key = ' key33 ' ; " , null )
limb o. run_test_fn (
turs o. execute_dot ( f " insert into t values ( ' key { i } ' , ' val { i } ' ); " )
turs o. run_test_fn ( " select count(*) from t; " , lambda res : " 100 " == res , " can insert 100 rows " )
turs o. run_test_fn ( " update t set value = ' updated ' where key = ' key33 ' ; " , null )
turs o. run_test_fn (
" select * from t where key = ' key33 ' ; " ,
lambda res : res == " key33|updated " ,
" can update single row " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select COUNT(*) from t where value = ' updated ' ; " ,
lambda res : res == " 1 " ,
" only updated a single row " ,
)
limb o. run_test_fn ( " update t set value = ' updated2 ' ; " , null )
limb o. run_test_fn (
turs o. run_test_fn ( " update t set value = ' updated2 ' ; " , null )
turs o. run_test_fn (
" select COUNT(*) from t where value = ' updated2 ' ; " ,
lambda res : res == " 100 " ,
" can update all rows " ,
@@ -478,229 +478,245 @@ def _test_kv(exec_name, ext_path):
if exec_name is None :
# Test only on Limbo, since SQLite supports the DELETE ... LIMIT syntax only when compiled
# with the SQLITE_ENABLE_UPDATE_DELETE_LIMIT option: https://www.sqlite.org/lang_delete.html
limb o. run_test_fn ( " delete from t limit 96; " , null , " can delete 96 rows " )
limb o. run_test_fn ( " select count(*) from t; " , lambda res : " 4 " == res , " four rows remain " )
limb o. run_test_fn ( " update t set key = ' 100 ' where 1; " , null , " where clause evaluates properly " )
limb o. run_test_fn (
turs o. run_test_fn ( " delete from t limit 96; " , null , " can delete 96 rows " )
turs o. run_test_fn ( " select count(*) from t; " , lambda res : " 4 " == res , " four rows remain " )
turs o. run_test_fn ( " update t set key = ' 100 ' where 1; " , null , " where clause evaluates properly " )
turs o. run_test_fn (
" select * from t where key = ' 100 ' ; " ,
lambda res : res == " 100|updated2 " ,
" there is only 1 key remaining after setting all keys to same value " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" select * from t a, other b where b.c = 23 and a.key= ' 100 ' ; " ,
lambda res : " 100|updated2|23|32|23 " == res ,
)
limb o. quit ( )
turs o. quit ( )
def test_ipaddr ( ) :
limb o = TestTursoShell ( )
turs o = TestTursoShell ( )
ext_path = " ./target/debug/liblimbo_ipaddr "
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT ipfamily( ' 192.168.1.1 ' ); " ,
lambda res : " error: no such function: " in res ,
" ipfamily function returns null when ext not loaded " ,
)
limb o. execute_dot ( f " .load { ext_path } " )
turs o. execute_dot ( f " .load { ext_path } " )
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT ipfamily( ' 192.168.1.1 ' ); " ,
lambda res : " 4 " == res ,
" ipfamily function returns 4 for IPv4 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT ipfamily( ' 2001:db8::1 ' ); " ,
lambda res : " 6 " == res ,
" ipfamily function returns 6 for IPv6 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT ipcontains( ' 192.168.16.0/24 ' , ' 192.168.16.3 ' ); " ,
lambda res : " 1 " == res ,
" ipcontains function returns 1 for IPv4 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT ipcontains( ' 192.168.1.0/24 ' , ' 192.168.2.1 ' ); " ,
lambda res : " 0 " == res ,
" ipcontains function returns 0 for IPv4 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT iphost( ' 192.168.1.0/24 ' ); " ,
lambda res : " 192.168.1.0 " == res ,
" iphost function returns the host for IPv4 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT iphost( ' 2001:db8::1/128 ' ); " ,
lambda res : " 2001:db8::1 " == res ,
" iphost function returns the host for IPv6 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT ipmasklen( ' 192.168.1.0/24 ' ); " ,
lambda res : " 24 " == res ,
" ipmasklen function returns the mask length for IPv4 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT ipmasklen( ' 2001:db8::1 ' ); " ,
lambda res : " 128 " == res ,
" ipmasklen function returns the mask length for IPv6 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT ipnetwork( ' 192.168.16.12/24 ' ); " ,
lambda res : " 192.168.16.0/24 " == res ,
" ipnetwork function returns the flattened CIDR for IPv4 " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT ipnetwork( ' 2001:db8::1 ' ); " ,
lambda res : " 2001:db8::1/128 " == res ,
" ipnetwork function returns the network for IPv6 " ,
)
limb o. quit ( )
turs o. quit ( )
def validate_fuzzy_leven ( a ) :
return a == " 3 "
def validate_fuzzy_damlev1 ( a ) :
return a == " 2 "
def validate_fuzzy_damlev2 ( a ) :
return a == " 1 "
def validate_fuzzy_editdist1 ( a ) :
return a == " 225 "
def validate_fuzzy_editdist2 ( a ) :
return a == " 110 "
def validate_fuzzy_jarowin ( a ) :
return a == " 0.907142857142857 "
def validate_fuzzy_osadist ( a ) :
return a == " 3 "
def validate_fuzzy_soundex ( a ) :
return a == " A250 "
def validate_fuzzy_phonetic ( a ) :
return a == " ABACAMA "
def validate_fuzzy_caver ( a ) :
return a == " AWSM111111 "
def validate_fuzzy_rsoundex ( a ) :
return a == " A03080 "
def validate_fuzzy_translit1 ( a ) :
return a == " oh my ? "
def validate_fuzzy_translit2 ( a ) :
return a == " privet "
def validate_fuzzy_script ( a ) :
return a == " 160 "
def test_fuzzy ( ) :
limb o = TestTursoShell ( )
turs o = TestTursoShell ( )
ext_path = " ./target/debug/liblimbo_fuzzy "
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_leven( ' awesome ' , ' aewsme ' ); " ,
lambda res : " error: no such function: " in res ,
" fuzzy levenshtein function returns null when ext not loaded " ,
)
limb o. execute_dot ( f " .load { ext_path } " )
limb o. run_test_fn (
turs o. execute_dot ( f " .load { ext_path } " )
turs o. run_test_fn (
" SELECT fuzzy_leven( ' awesome ' , ' aewsme ' ); " ,
validate_fuzzy_leven ,
" fuzzy levenshtein function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_damlev( ' awesome ' , ' aewsme ' ); " ,
validate_fuzzy_damlev1 ,
" fuzzy damerau levenshtein1 function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_damlev( ' Something ' , ' Smoething ' ); " ,
validate_fuzzy_damlev2 ,
" fuzzy damerau levenshtein2 function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_editdist( ' abc ' , ' ca ' ); " ,
validate_fuzzy_editdist1 ,
" fuzzy editdist1 function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_editdist( ' abc ' , ' acb ' ); " ,
validate_fuzzy_editdist2 ,
" fuzzy editdist2 function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_jarowin( ' awesome ' , ' aewsme ' ); " ,
validate_fuzzy_jarowin ,
" fuzzy jarowin function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_osadist( ' awesome ' , ' aewsme ' ); " ,
validate_fuzzy_osadist ,
" fuzzy osadist function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_phonetic( ' awesome ' ); " ,
validate_fuzzy_phonetic ,
" fuzzy phonetic function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_caver( ' awesome ' ); " ,
validate_fuzzy_caver ,
" fuzzy caver function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_rsoundex( ' awesome ' ); " ,
validate_fuzzy_rsoundex ,
" fuzzy rsoundex function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_translit( ' oh my 😅 ' ); " ,
validate_fuzzy_translit1 ,
" fuzzy translit1 function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_translit( ' привет ' ); " ,
validate_fuzzy_translit2 ,
" fuzzy translit2 function works " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT fuzzy_script( ' داناوانب ' ); " ,
validate_fuzzy_script ,
" fuzzy script function works " ,
)
def test_vfs ( ) :
limb o = TestTursoShell ( )
turs o = TestTursoShell ( )
ext_path = " target/debug/libturso_ext_tests "
limb o. run_test_fn ( " .vfslist " , lambda x : " testvfs " not in x , " testvfs not loaded " )
limb o. execute_dot ( f " .load { ext_path } " )
limb o. run_test_fn ( " .vfslist " , lambda res : " testvfs " in res , " testvfs extension loaded " )
limb o. execute_dot ( " .open testing/vfs.db testvfs " )
limb o. execute_dot ( " create table test (id integer primary key, value float); " )
limb o. execute_dot ( " create table vfs (id integer primary key, value blob); " )
turs o. run_test_fn ( " .vfslist " , lambda x : " testvfs " not in x , " testvfs not loaded " )
turs o. execute_dot ( f " .load { ext_path } " )
turs o. run_test_fn ( " .vfslist " , lambda res : " testvfs " in res , " testvfs extension loaded " )
turs o. execute_dot ( " .open testing/vfs.db testvfs " )
turs o. execute_dot ( " create table test (id integer primary key, value float); " )
turs o. execute_dot ( " create table vfs (id integer primary key, value blob); " )
for i in range ( 50 ) :
limb o. execute_dot ( " insert into test (value) values (randomblob(32*1024)); " )
limb o. execute_dot ( f " insert into vfs (value) values ( { i } ); " )
limb o. run_test_fn (
turs o. execute_dot ( " insert into test (value) values (randomblob(32*1024)); " )
turs o. execute_dot ( f " insert into vfs (value) values ( { i } ); " )
turs o. run_test_fn (
" SELECT count(*) FROM test; " ,
lambda res : res == " 50 " ,
" Tested large write to testfs " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT count(*) FROM vfs; " ,
lambda res : res == " 50 " ,
" Tested large write to testfs " ,
)
console . info ( " Tested large write to testfs " )
limb o. quit ( )
turs o. quit ( )
def test_sqlite_vfs_compat ( ) :
@@ -735,81 +751,81 @@ def test_sqlite_vfs_compat():
def test_csv ( ) :
# open new empty connection explicitly to test whether we can load an extension
# with brand new connection/uninitialized database.
limb o = TestTursoShell ( init_commands = " " )
test_module_list ( limb o, " target/debug/liblimbo_csv " , " csv " )
turs o = TestTursoShell ( init_commands = " " )
test_module_list ( turs o, " target/debug/liblimbo_csv " , " csv " )
limb o. run_test_fn (
turs o. run_test_fn (
" CREATE VIRTUAL TABLE temp.csv USING csv(filename=./testing/test_files/test.csv); " ,
null ,
" Create virtual table from CSV file " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT * FROM temp.csv; " ,
lambda res : res == " 1|2.0|String ' 1 \n 3|4.0|String2 " ,
" Read all rows from CSV table " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT * FROM temp.csv WHERE c2 = ' String2 ' ; " ,
lambda res : res == " 3|4.0|String2 " ,
" Filter rows with WHERE clause " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" INSERT INTO temp.csv VALUES (5, 6.0, ' String3 ' ); " ,
lambda res : " Table is read-only " in res ,
" INSERT into CSV table should fail " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" UPDATE temp.csv SET c0 = 10 WHERE c1 = ' 2.0 ' ; " ,
lambda res : " is read-only " in res ,
" UPDATE on CSV table should fail " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" DELETE FROM temp.csv WHERE c1 = ' 2.0 ' ; " ,
lambda res : " is read-only " in res ,
" DELETE on CSV table should fail " ,
)
limb o. run_test_fn ( " DROP TABLE temp.csv; " , null , " Drop CSV table " )
limb o. run_test_fn (
turs o. run_test_fn ( " DROP TABLE temp.csv; " , null , " Drop CSV table " )
turs o. run_test_fn (
" SELECT * FROM temp.csv; " ,
lambda res : " Parse error: no such table: csv " in res ,
" Query dropped CSV table should fail " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" create virtual table t1 using csv(data= ' 1 ' \\ ' 2 ' ); " ,
lambda res : " unrecognized token at " in res ,
" Create CSV table with malformed escape sequence " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" create virtual table t1 using csv(data= \" 12 ' ); " ,
lambda res : " non-terminated literal at " in res ,
" Create CSV table with unterminated quoted string " ,
)
limb o. run_debug ( " create virtual table t1 using csv(data= ' ' ); " )
limb o. run_test_fn (
turs o. run_debug ( " create virtual table t1 using csv(data= ' ' ); " )
turs o. run_test_fn (
" SELECT c0 FROM t1; " ,
lambda res : res == " " ,
" Empty CSV table without a header should have one column: ' c0 ' " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT c1 FROM t1; " ,
lambda res : " Parse error: no such column: c1 " in res ,
" Empty CSV table without header should not have columns other than ' c0 ' " ,
)
limb o. run_debug ( " create virtual table t2 using csv(data= ' ' , header=true); " )
limb o. run_test_fn (
turs o. run_debug ( " create virtual table t2 using csv(data= ' ' , header=true); " )
turs o. run_test_fn (
' SELECT " (NULL) " FROM t2; ' ,
lambda res : res == " " ,
" Empty CSV table with header should have one column named ' (NULL) ' " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT c0 FROM t2; " ,
lambda res : " Parse error: no such column: c0 " in res ,
" Empty CSV table with header should not have columns other than ' (NULL) ' " ,
)
limb o. quit ( )
turs o. quit ( )
def cleanup ( ) :
@@ -821,45 +837,45 @@ def cleanup():
def test_tablestats ( ) :
ext_path = " target/debug/libturso_ext_tests "
limb o = TestTursoShell ( use_testing_db = True )
test_module_list ( limb o, ext_path = ext_path , module_name = " tablestats " )
limb o. execute_dot ( " CREATE TABLE people(id INTEGER PRIMARY KEY, name TEXT); " )
limb o. execute_dot ( " INSERT INTO people(name) VALUES ( ' Ada ' ), ( ' Grace ' ), ( ' Linus ' ); " )
turs o = TestTursoShell ( use_testing_db = True )
test_module_list ( turs o, ext_path = ext_path , module_name = " tablestats " )
turs o. execute_dot ( " CREATE TABLE people(id INTEGER PRIMARY KEY, name TEXT); " )
turs o. execute_dot ( " INSERT INTO people(name) VALUES ( ' Ada ' ), ( ' Grace ' ), ( ' Linus ' ); " )
limb o. execute_dot ( " CREATE TABLE logs(ts INT, msg TEXT); " )
limb o. execute_dot ( " INSERT INTO logs VALUES (1, ' boot ok ' ); " )
turs o. execute_dot ( " CREATE TABLE logs(ts INT, msg TEXT); " )
turs o. execute_dot ( " INSERT INTO logs VALUES (1, ' boot ok ' ); " )
# verify counts
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT COUNT(*) FROM people; " ,
lambda res : res == " 3 " ,
" three people rowsverify user count " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT COUNT(*) FROM logs; " ,
lambda res : res == " 1 " ,
" one logs rowverify logs count " ,
)
limb o. execute_dot ( " CREATE VIRTUAL TABLE stats USING tablestats; " )
turs o. execute_dot ( " CREATE VIRTUAL TABLE stats USING tablestats; " )
def _split ( res ) :
return [ ln . strip ( ) for ln in res . splitlines ( ) if ln . strip ( ) ]
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT * FROM stats ORDER BY name; " ,
lambda res : sorted ( _split ( res ) ) == sorted ( [ " logs|1 " , " people|3 " , " products|11 " , " users|10000 " ] ) ,
" stats shows correct initial counts (and skips itself) " ,
)
limb o. execute_dot ( " INSERT INTO logs VALUES (2, ' panic ' ), (3, ' recovery ' ); " )
limb o. execute_dot ( " DELETE FROM people WHERE name= ' Linus ' ; " )
turs o. execute_dot ( " INSERT INTO logs VALUES (2, ' panic ' ), (3, ' recovery ' ); " )
turs o. execute_dot ( " DELETE FROM people WHERE name= ' Linus ' ; " )
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT * FROM stats WHERE name= ' logs ' ; " ,
lambda res : res == " logs|3 " ,
" row‑ count grows after INSERT " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT * FROM stats WHERE name= ' people ' ; " ,
lambda res : res == " people|2 " ,
" row‑ count shrinks after DELETE " ,
@@ -869,7 +885,7 @@ def test_tablestats():
# the test extension is also doing a testing insert on every query
# as part of its own testing, so we cannot assert 'products|11'
# we need to add 3 for the 3 queries we did above.
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT * FROM stats WHERE name= ' products ' ; " ,
lambda x : x == " products|14 " ,
" products table reflects changes " ,
@@ -877,32 +893,32 @@ def test_tablestats():
# an insert to products with (name,price) ('xConnect', 42)
# has happened on each query (4 so far) in the testing extension.
# so at this point the sum should be 168
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT sum(price) FROM products WHERE name = ' xConnect ' ; " ,
lambda x : x == " 168.0 " ,
" price sum for ' xConnect ' inserts happenning in the testing extension " ,
)
limb o. run_test_fn (
turs o. run_test_fn (
" SELECT * FROM stats WHERE name= ' users ' ; " ,
lambda x : x == " users|10000 " ,
" users table unchanged " ,
)
limb o. execute_dot ( " CREATE TABLE misc(x); " )
limb o. run_test_fn (
turs o. execute_dot ( " CREATE TABLE misc(x); " )
turs o. run_test_fn (
" SELECT * FROM stats WHERE name= ' misc ' ; " ,
lambda res : res == " misc|0 " ,
" newly‑ created table shows up with zero rows " ,
)
limb o. execute_dot ( " DROP TABLE logs; " )
limb o. run_test_fn (
turs o. execute_dot ( " DROP TABLE logs; " )
turs o. run_test_fn (
" SELECT name FROM stats WHERE name= ' logs ' ; " ,
lambda res : res == " " ,
" dropped table disappears from stats " ,
)
limb o. quit ( )
turs o. quit ( )
def test_module_list ( turso_shell , ext_path , module_name ) :