antithesis: Add CREATE TABLE parallel driver

This commit is contained in:
Pekka Enberg
2025-07-21 13:08:14 +03:00
parent 9788a8739c
commit bd30cb313c
10 changed files with 236 additions and 41 deletions

View File

@@ -32,7 +32,6 @@ except Exception as e:
cur_init = con_init.cursor()
cur_init.execute("CREATE TABLE schemas (schema TEXT, tbl INT)")
cur_init.execute("CREATE TABLE tables (count INT)")
cur_init.execute("CREATE TABLE indexes (idx_name TEXT, tbl_name TEXT, idx_type TEXT, cols TEXT)")
try:
@@ -45,8 +44,6 @@ cur = con.cursor()
tbl_count = max(1, get_random() % 10)
cur_init.execute(f"INSERT INTO tables (count) VALUES ({tbl_count})")
schemas = []
for i in range(tbl_count):
col_count = max(1, get_random() % 10)

View File

@@ -5,7 +5,6 @@ import string
import turso
from antithesis.random import get_random, random_choice
from utils import generate_random_value
# Get initial state
try:
@@ -16,15 +15,17 @@ except Exception as e:
cur_init = con_init.cursor()
# Get table count
tbl_len = cur_init.execute("SELECT count FROM tables").fetchone()[0]
if tbl_len == 0:
# Get all existing tables from schemas
existing_schemas = cur_init.execute(
"SELECT tbl, schema FROM schemas").fetchall()
if not existing_schemas:
print("No tables available for index creation")
exit(0)
# Select a random table
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f"SELECT schema FROM schemas WHERE tbl = {selected_tbl}").fetchone()[0])
selected_idx = get_random() % len(existing_schemas)
selected_tbl, schema_json = existing_schemas[selected_idx]
tbl_schema = json.loads(schema_json)
# Connect to the main database
try:
@@ -37,21 +38,24 @@ cur = con.cursor()
# Check existing indexes on this table
existing_indexes = cur.execute(f"""
SELECT name FROM sqlite_master
SELECT name FROM sqlite_master
WHERE type = 'index' AND tbl_name = 'tbl_{selected_tbl}'
AND sql IS NOT NULL
""").fetchall()
existing_index_names = {idx[0] for idx in existing_indexes}
print(f"Selected table: tbl_{selected_tbl} with {tbl_schema['colCount']} columns")
print(
f"Selected table: tbl_{selected_tbl} with {tbl_schema['colCount']} columns")
print(f"Existing indexes: {len(existing_indexes)}")
# Decide whether to create a single-column or composite index
create_composite = tbl_schema["colCount"] > 2 and get_random() % 3 == 0 # 33% chance for composite
create_composite = tbl_schema["colCount"] > 2 and get_random(
) % 3 == 0 # 33% chance for composite
if create_composite:
# Create composite index
num_cols = 2 + (get_random() % min(2, tbl_schema["colCount"] - 1)) # 2-3 columns
num_cols = 2 + (get_random() %
min(2, tbl_schema["colCount"] - 1)) # 2-3 columns
selected_cols = []
available_cols = list(range(tbl_schema["colCount"]))
@@ -74,7 +78,7 @@ if create_composite:
# Store index information in init_state.db
cur_init.execute(f"""
INSERT INTO indexes (idx_name, tbl_name, idx_type, cols)
INSERT INTO indexes (idx_name, tbl_name, idx_type, cols)
VALUES ('{index_name}', 'tbl_{selected_tbl}', 'composite', '{", ".join(col_names)}')
""")
con_init.commit()
@@ -96,7 +100,8 @@ else:
# Determine index type based on column data type
col_type = tbl_schema[col_name]["data_type"]
index_suffix = "".join(random_choice(string.ascii_lowercase) for _ in range(4))
index_suffix = "".join(random_choice(string.ascii_lowercase)
for _ in range(4))
if col_type == "TEXT" and tbl_schema[col_name].get("unique", False):
# Create unique index for unique text columns
@@ -115,7 +120,7 @@ else:
# Store index information in init_state.db
idx_type = "unique" if "UNIQUE" in create_stmt else "single"
cur_init.execute(f"""
INSERT INTO indexes (idx_name, tbl_name, idx_type, cols)
INSERT INTO indexes (idx_name, tbl_name, idx_type, cols)
VALUES ('{index_name}', 'tbl_{selected_tbl}', '{idx_type}', '{col_name}')
""")
con_init.commit()

View File

@@ -0,0 +1,152 @@
#!/usr/bin/env -S python3 -u
"""Antithesis parallel driver: create one brand-new table in stress_composer.db.

Picks the lowest unused tbl_<n> name, generates a random column layout,
mirrors the schema JSON into init_state.db for the other drivers, and
(with some probability) adds a few single-column indexes on the new table.
"""
import json
import string
import turso
from antithesis.random import get_random, random_choice

# Open the bookkeeping database shared by all drivers.
try:
    con_init = turso.connect("init_state.db")
except Exception as e:
    print(f"Error connecting to database: {e}")
    exit(0)

cur_init = con_init.cursor()

# Open the database under stress.
try:
    con = turso.connect("stress_composer.db", experimental_indexes=True)
except Exception as e:
    print(f"Failed to open stress_composer.db. Exiting... {e}")
    exit(0)

cur = con.cursor()

# Collect the tbl_<n> names that already exist so a fresh number can be chosen.
existing_tables = cur.execute("""
    SELECT name FROM sqlite_master
    WHERE type = 'table' AND name LIKE 'tbl_%'
""").fetchall()

# Numeric suffixes already in use.
used_numbers = set()
for (name,) in existing_tables:
    if name.startswith("tbl_"):
        try:
            used_numbers.add(int(name[len("tbl_"):]))
        except ValueError:
            # LIKE 'tbl_%' may match names whose suffix is not an integer.
            pass

# Smallest non-negative integer not already taken.
next_table_num = 0
while next_table_num in used_numbers:
    next_table_num += 1

print(f"Creating new table: tbl_{next_table_num}")

# Building blocks for the randomly generated column definitions.
data_types = ["INTEGER", "REAL", "TEXT", "BLOB", "NUMERIC"]
constraints = ["", "NOT NULL", "DEFAULT 0", "DEFAULT ''", "UNIQUE", "CHECK (col_0 > 0)"]

# 2-10 columns, one of which is designated the primary key.
col_count = 2 + (get_random() % 9)
pk = get_random() % col_count

# Schema description mirrored into init_state.db alongside the real table.
schema = {"table": next_table_num, "colCount": col_count, "pk": pk}
column_defs = []

for j in range(col_count):
    col_data_type = random_choice(data_types)
    col_constraint_1 = random_choice(constraints)
    col_constraint_2 = random_choice(constraints)

    if j == pk:
        # The primary-key column gets no extra random constraints.
        column_defs.append(f"col_{j} {col_data_type} PRIMARY KEY")
        schema[f"col_{j}"] = {
            "data_type": col_data_type,
            "constraint1": "",
            "constraint2": "NOT NULL",
        }
    else:
        # Never emit the same constraint twice on one column.
        if col_constraint_2 == col_constraint_1:
            col_constraint_2 = ""
        pieces = [f"col_{j} {col_data_type}", col_constraint_1, col_constraint_2]
        column_defs.append(" ".join(p for p in pieces if p))
        schema[f"col_{j}"] = {
            "data_type": col_data_type,
            "constraint1": col_constraint_1,
            "constraint2": col_constraint_2,
        }

cols_str = ", ".join(column_defs)

try:
    # Create the table itself.
    create_stmt = f"CREATE TABLE tbl_{next_table_num} ({cols_str})"
    print(f"Creating table with {col_count} columns")
    print(f"Schema: {create_stmt}")
    cur.execute(create_stmt)

    # Record the schema so sibling drivers can discover this table.
    cur_init.execute(f"""
        INSERT INTO schemas (schema, tbl)
        VALUES ('{json.dumps(schema)}', {next_table_num})
    """)
    con_init.commit()
    con.commit()

    print(f"Successfully created table tbl_{next_table_num}")

    # Optionally create some initial indexes (20% chance per non-PK column).
    indexes_created = 0
    for j in range(col_count):
        if j != pk and get_random() % 5 == 0:  # 20% chance
            col_name = f"col_{j}"
            col_info = schema[col_name]
            index_suffix = "".join(random_choice(string.ascii_lowercase) for _ in range(4))

            # Name the index after the column's broad type family.
            if col_info["data_type"] in ["INTEGER", "REAL", "NUMERIC"]:
                idx_name = f"idx_tbl{next_table_num}_col{j}_{index_suffix}"
            else:
                idx_name = f"idx_tbl{next_table_num}_col{j}_text_{index_suffix}"

            try:
                cur.execute(f"CREATE INDEX {idx_name} ON tbl_{next_table_num} ({col_name})")
                cur_init.execute(f"""
                    INSERT INTO indexes (idx_name, tbl_name, idx_type, cols)
                    VALUES ('{idx_name}', 'tbl_{next_table_num}', 'single', '{col_name}')
                """)
                indexes_created += 1
                print(f"Created index: {idx_name}")
            except Exception as e:
                # Best effort: a failed index must not abort the whole driver.
                print(f"Failed to create index {idx_name}: {e}")

    if indexes_created > 0:
        con_init.commit()
        con.commit()
        print(f"Created {indexes_created} initial indexes")
except turso.OperationalError as e:
    print(f"Failed to create table: {e}")
    con.rollback()
    con_init.rollback()

con.close()
con_init.close()

View File

@@ -15,9 +15,16 @@ except Exception as e:
cur_init = con_init.cursor()
tbl_len = cur_init.execute("SELECT count FROM tables").fetchone()[0]
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f"SELECT schema FROM schemas WHERE tbl = {selected_tbl}").fetchone()[0])
# Get all existing tables from schemas
existing_schemas = cur_init.execute("SELECT tbl, schema FROM schemas").fetchall()
if not existing_schemas:
print("No tables found in schemas")
exit(0)
# Select a random table
selected_idx = get_random() % len(existing_schemas)
selected_tbl, schema_json = existing_schemas[selected_idx]
tbl_schema = json.loads(schema_json)
# get primary key column
pk = tbl_schema["pk"]

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env -S python3 -u
import json
import turso
from antithesis.random import get_random
@@ -25,8 +24,8 @@ cur = con.cursor()
# Get all user-created indexes (excluding automatic indexes)
existing_indexes = cur.execute("""
SELECT name, tbl_name FROM sqlite_master
WHERE type = 'index'
SELECT name, tbl_name FROM sqlite_master
WHERE type = 'index'
AND sql IS NOT NULL
AND name NOT LIKE 'sqlite_%'
""").fetchall()
@@ -47,14 +46,14 @@ try:
drop_stmt = f"DROP INDEX {index_name}"
print(f"Dropping index: {drop_stmt}")
cur.execute(drop_stmt)
# Remove index information from init_state.db
cur_init.execute(f"""
DELETE FROM indexes
DELETE FROM indexes
WHERE idx_name = '{index_name}'
""")
con_init.commit()
print(f"Successfully dropped index: {index_name}")
except turso.OperationalError as e:
print(f"Failed to drop index: {e}")
@@ -65,4 +64,4 @@ except Exception as e:
con.commit()
con.close()
con_init.close()
con_init.close()

View File

@@ -15,9 +15,16 @@ except Exception as e:
cur_init = con_init.cursor()
tbl_len = cur_init.execute("SELECT count FROM tables").fetchone()[0]
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f"SELECT schema FROM schemas WHERE tbl = {selected_tbl}").fetchone()[0])
# Get all existing tables from schemas
existing_schemas = cur_init.execute("SELECT tbl, schema FROM schemas").fetchall()
if not existing_schemas:
print("No tables found in schemas")
exit(0)
# Select a random table
selected_idx = get_random() % len(existing_schemas)
selected_tbl, schema_json = existing_schemas[selected_idx]
tbl_schema = json.loads(schema_json)
cols = ", ".join([f"col_{col}" for col in range(tbl_schema["colCount"])])
try:

View File

@@ -15,9 +15,16 @@ except Exception as e:
cur_init = con_init.cursor()
tbl_len = cur_init.execute("SELECT count FROM tables").fetchone()[0]
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f"SELECT schema FROM schemas WHERE tbl = {selected_tbl}").fetchone()[0])
# Get all existing tables from schemas
existing_schemas = cur_init.execute("SELECT tbl, schema FROM schemas").fetchall()
if not existing_schemas:
print("No tables found in schemas")
exit(0)
# Select a random table
selected_idx = get_random() % len(existing_schemas)
selected_tbl, schema_json = existing_schemas[selected_idx]
tbl_schema = json.loads(schema_json)
cols = ", ".join([f"col_{col}" for col in range(tbl_schema["colCount"])])
try:

View File

@@ -15,9 +15,16 @@ except Exception as e:
cur_init = con_init.cursor()
tbl_len = cur_init.execute("SELECT count FROM tables").fetchone()[0]
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f"SELECT schema FROM schemas WHERE tbl = {selected_tbl}").fetchone()[0])
# Get all existing tables from schemas
existing_schemas = cur_init.execute("SELECT tbl, schema FROM schemas").fetchall()
if not existing_schemas:
print("No tables found in schemas")
exit(0)
# Select a random table
selected_idx = get_random() % len(existing_schemas)
selected_tbl, schema_json = existing_schemas[selected_idx]
tbl_schema = json.loads(schema_json)
cols = ", ".join([f"col_{col}" for col in range(tbl_schema["colCount"])])
try:

View File

@@ -15,9 +15,16 @@ except Exception as e:
cur_init = con_init.cursor()
tbl_len = cur_init.execute("SELECT count FROM tables").fetchone()[0]
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f"SELECT schema FROM schemas WHERE tbl = {selected_tbl}").fetchone()[0])
# Get all existing tables from schemas
existing_schemas = cur_init.execute("SELECT tbl, schema FROM schemas").fetchall()
if not existing_schemas:
print("No tables found in schemas")
exit(0)
# Select a random table
selected_idx = get_random() % len(existing_schemas)
selected_tbl, schema_json = existing_schemas[selected_idx]
tbl_schema = json.loads(schema_json)
tbl_name = f"tbl_{selected_tbl}"

View File

@@ -15,9 +15,16 @@ except Exception as e:
cur_init = con_init.cursor()
tbl_len = cur_init.execute("SELECT count FROM tables").fetchone()[0]
selected_tbl = get_random() % tbl_len
tbl_schema = json.loads(cur_init.execute(f"SELECT schema FROM schemas WHERE tbl = {selected_tbl}").fetchone()[0])
# Get all existing tables from schemas
existing_schemas = cur_init.execute("SELECT tbl, schema FROM schemas").fetchall()
if not existing_schemas:
print("No tables found in schemas")
exit(0)
# Select a random table
selected_idx = get_random() % len(existing_schemas)
selected_tbl, schema_json = existing_schemas[selected_idx]
tbl_schema = json.loads(schema_json)
# get primary key column
pk = tbl_schema["pk"]