antithesis-tests: add all tests

This commit is contained in:
eric-dinh-antithesis
2025-04-24 12:20:41 -04:00
parent 80d39929ad
commit f993a22023
11 changed files with 386 additions and 0 deletions

View File

@@ -0,0 +1,26 @@
#!/usr/bin/env -S python3 -u
import limbo
from antithesis.random import get_random
from antithesis.assertions import always
con = limbo.connect("bank_test.db")
cur = con.cursor()
initial_state = cur.execute(f'''
SELECT * FROM initial_state
''').fetchone()
curr_total = cur.execute(f'''
SELECT SUM(balance) AS total FROM accounts;
''').fetchone()
always(
initial_state[1] == curr_total[0],
'[Anytime] Initial balance always equals current balance',
{
'init_bal': initial_state[1],
'curr_bal': curr_total[0]
}
)

View File

@@ -0,0 +1,26 @@
#!/usr/bin/env -S python3 -u
import limbo
from antithesis.random import get_random
from antithesis.assertions import always
con = limbo.connect("bank_test.db")
cur = con.cursor()
initial_state = cur.execute(f'''
SELECT * FROM initial_state
''').fetchone()
curr_total = cur.execute(f'''
SELECT SUM(balance) AS total FROM accounts;
''').fetchone()
always(
initial_state[1] == curr_total[0],
'[Eventually] Initial balance always equals current balance',
{
'init_bal': initial_state[1],
'curr_bal': curr_total[0]
}
)

View File

@@ -0,0 +1,26 @@
#!/usr/bin/env -S python3 -u
import limbo
from antithesis.random import get_random
from antithesis.assertions import always
con = limbo.connect("bank_test.db")
cur = con.cursor()
initial_state = cur.execute(f'''
SELECT * FROM initial_state
''').fetchone()
curr_total = cur.execute(f'''
SELECT SUM(balance) AS total FROM accounts;
''').fetchone()
always(
initial_state[1] == curr_total[0],
'[Finally] Initial balance always equals current balance',
{
'init_bal': initial_state[1],
'curr_bal': curr_total[0]
}
)

View File

@@ -0,0 +1,47 @@
#!/usr/bin/env -S python3 -u
import limbo
from antithesis.random import get_random
con = limbo.connect("bank_test.db")
cur = con.cursor()
# drop accounts table if it exists and create a new table
cur.execute(f'''
DROP TABLE IF EXISTS accounts;
''')
cur.execute(f'''
CREATE TABLE accounts (
account_id INTEGER PRIMARY KEY AUTOINCREMENT,
balance REAL NOT NULL DEFAULT 0.0
);
''')
# randomly create up to 100 accounts with a balance up to 1e9
total = 0
num_accts = get_random() % 100
for i in range(num_accts):
bal = get_random() % 1e9
total += bal
cur.execute(f'''
INSERT INTO accounts (balance)
VALUES ({bal})
''')
# drop initial_state table if it exists and create a new table
cur.execute(f'''
DROP TABLE IF EXISTS initial_state;
''')
cur.execute(f'''
CREATE TABLE initial_state (
num_accts INTEGER,
total REAL
);
''')
# store initial state in the table
cur.execute(f'''
INSERT INTO initial_state (num_accts, total)
VALUES ({num_accts}, {total})
''')

View File

@@ -0,0 +1,54 @@
#!/usr/bin/env -S python3 -u
import limbo
import logging
from logging.handlers import RotatingFileHandler
from antithesis.random import get_random
handler = RotatingFileHandler(filename='bank_test.log', mode='a', maxBytes=1*1024*1024, backupCount=5, encoding=None, delay=0)
handler.setLevel(logging.INFO)
logger = logging.getLogger('root')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
con = limbo.connect("bank_test.db")
cur = con.cursor()
length = cur.execute("SELECT num_accts FROM initial_state").fetchone()[0]
def transaction():
# check that sender and recipient are different
sender = get_random() % length + 1
recipient = get_random() % length + 1
if sender != recipient:
# get a random value to transfer between accounts
value = get_random() % 1e9
logger.info(f"Sender ID: {sender} | Recipient ID: {recipient} | Txn Val: {value}")
cur.execute("BEGIN TRANSACTION;")
# subtract value from balance of the sender account
cur.execute(f'''
UPDATE accounts
SET balance = balance - {value}
WHERE account_id = {sender};
''')
# add value to balance of the recipient account
cur.execute(f'''
UPDATE accounts
SET balance = balance + {value}
WHERE account_id = {recipient};
''')
cur.execute("COMMIT;")
# run up to 100 transactions
iterations = get_random() % 100
# logger.info(f"Starting {iterations} iterations")
for i in range(iterations):
transaction()
# logger.info(f"Finished {iterations} iterations")

View File

@@ -0,0 +1,75 @@
#!/usr/bin/env -S python3 -u
import json
import glob
import os
import limbo
from antithesis.random import get_random, random_choice
constraints = ['NOT NULL', 'UNIQUE', '']
data_type = ['INTEGER', 'REAL', 'TEXT', 'BLOB', 'NUMERIC']
# remove any existing db files
for f in glob.glob('*.db'):
try:
os.remove(f)
except OSError:
pass
for f in glob.glob('*.db-wal'):
try:
os.remove(f)
except OSError:
pass
# store initial states in a separate db
con_init = limbo.connect('init_state.db')
cur_init = con_init.cursor()
cur_init.execute('CREATE TABLE schemas (schema TEXT, tbl INT PRIMARY KEY)')
cur_init.execute('CREATE TABLE tables (count INT)')
con = limbo.connect('stress_composer.db')
cur = con.cursor()
tbl_count = max(1, get_random() % 10)
cur_init.execute(f'INSERT INTO tables (count) VALUES ({tbl_count})')
schemas = []
for i in range(tbl_count):
col_count = max(1, get_random() % 10)
pk = get_random() % col_count
schema = {
'table': i,
'colCount': col_count,
'pk': pk
}
cols = []
cols_str = ''
for j in range(col_count):
col_data_type = random_choice(data_type)
col_constraint_1 = random_choice(constraints)
col_constraint_2 = random_choice(constraints)
col = f'col_{j} {col_data_type} {col_constraint_1} {col_constraint_2 if col_constraint_2 != col_constraint_1 else ""}' if j != pk else f'col_{j} {col_data_type} PRIMARY KEY NOT NULL'
cols.append(col)
schema[f'col_{j}'] = {
'data_type': col_data_type,
'constraint1': col_constraint_1 if j != pk else 'PRIMARY KEY',
'constraint2': col_constraint_2 if col_constraint_1 != col_constraint_2 else "" if j != pk else 'NOT NULL',
}
cols_str = ', '.join(cols)
schemas.append(schema)
cur_init.execute(f"INSERT INTO schemas (schema, tbl) VALUES ('{json.dumps(schema)}', {i})")
cur.execute(f'''
CREATE TABLE tbl_{i} ({cols_str})
''')
print(f'DB Schemas\n------------\n{json.dumps(schemas, indent=2)}')

View File

@@ -0,0 +1,33 @@
#!/usr/bin/env -S python3 -u
import json
import limbo
from utils import generate_random_value
from antithesis.random import get_random
# Get initial state
con_init = limbo.connect('init_state.db')
cur_init = con_init.cursor()
tbl_len = cur_init.execute('SELECT count FROM tables').fetchone()[0]
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f'SELECT schema FROM schemas WHERE tbl = {selected_tbl}').fetchone()[0])
# get primary key column
pk = tbl_schema['pk']
# get non-pk columns
cols = [f'col_{col}' for col in range(tbl_schema['colCount']) if col != pk]
con = limbo.connect('stress_composer.db')
cur = con.cursor()
deletions = get_random() % 100
print(f'Attempt to delete {deletions} rows in tbl_{selected_tbl}...')
for i in range(deletions):
where_clause = f"col_{pk} = {generate_random_value(tbl_schema[f'col_{pk}']['data_type'])}"
cur.execute(f'''
DELETE FROM tbl_{selected_tbl} WHERE {where_clause}
''')

View File

@@ -0,0 +1,31 @@
#!/usr/bin/env -S python3 -u
import json
import limbo
from utils import generate_random_value
from antithesis.random import get_random
# Get initial state
con_init = limbo.connect('init_state.db')
cur_init = con_init.cursor()
tbl_len = cur_init.execute('SELECT count FROM tables').fetchone()[0]
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f'SELECT schema FROM schemas WHERE tbl = {selected_tbl}').fetchone()[0])
cols = ', '.join([f'col_{col}' for col in range(tbl_schema['colCount'])])
con = limbo.connect('stress_composer.db')
cur = con.cursor()
# insert up to 100 rows in the selected table
insertions = get_random() % 100
print(f'Inserting {insertions} rows...')
for i in range(insertions):
values = [generate_random_value(tbl_schema[f'col_{col}']['data_type']) for col in range(tbl_schema['colCount'])]
cur.execute(f'''
INSERT INTO tbl_{selected_tbl} ({cols})
VALUES ({', '.join(values)})
''')

View File

@@ -0,0 +1,45 @@
#!/usr/bin/env -S python3 -u
import json
import limbo
from utils import generate_random_value
from antithesis.random import get_random
# Get initial state
con_init = limbo.connect('init_state.db')
cur_init = con_init.cursor()
tbl_len = cur_init.execute('SELECT count FROM tables').fetchone()[0]
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f'SELECT schema FROM schemas WHERE tbl = {selected_tbl}').fetchone()[0])
# get primary key column
pk = tbl_schema['pk']
# get non-pk columns
cols = [f'col_{col}' for col in range(tbl_schema['colCount']) if col != pk]
# print(cols)
con = limbo.connect('stress_composer.db')
cur = con.cursor()
# insert up to 100 rows in the selected table
updates = get_random() % 100
print(f'Attempt to update {updates} rows in tbl_{selected_tbl}...')
for i in range(updates):
set_clause = ''
if tbl_schema['colCount'] == 1:
set_clause = f"col_{pk} = {generate_random_value(tbl_schema[f'col_{pk}']['data_type'])}"
else:
values = []
for col in cols:
# print(col)
values.append(f"{col} = {generate_random_value(tbl_schema[col]['data_type'])}")
set_clause = ', '.join(values)
where_clause = f"col_{pk} = {generate_random_value(tbl_schema[f'col_{pk}']['data_type'])}"
# print(where_clause)
cur.execute(f'''
UPDATE tbl_{selected_tbl} SET {set_clause} WHERE {where_clause}
''')

View File

@@ -0,0 +1,20 @@
import string
from antithesis.random import get_random, random_choice
def generate_random_identifier(type: str, num: int):
return ''.join(type, '_', get_random() % num)
def generate_random_value(type: str):
match type:
case 'INTEGER':
return str(get_random() % 100)
case 'REAL':
return '{:.2f}'.format(get_random() % 100 / 100.0)
case 'TEXT':
return f"'{''.join(random_choice(string.ascii_lowercase) for _ in range(5))}'"
case 'BLOB':
return f"x'{''.join(random_choice(string.ascii_lowercase) for _ in range(5)).encode().hex()}'"
case 'NUMERIC':
return str(get_random() % 100)
case _:
return NULL

View File

@@ -0,0 +1,3 @@
#!/usr/bin/env bash
/bin/limbo_stress