Merge 'sim: improve simulator' from Pere Diaz Bou

Newly improved simulator with CREATE TABLE, INSERT, SELECT, connection
management, etc...
This new simulator is finding a bunch of bugs in the write path, so after
this PR I will start fixing those.

Closes #405
This commit is contained in:
Pekka Enberg
2024-11-20 08:59:13 +02:00
10 changed files with 546 additions and 72 deletions

13
Cargo.lock generated
View File

@@ -45,6 +45,15 @@ version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9"
[[package]]
name = "anarchist-readable-name-generator-lib"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18a1e15a87b13ae79e04e07b3714fc41d5f6993dff11662fdbe0b207c6ad0fe0"
dependencies = [
"rand",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
@@ -1147,9 +1156,13 @@ dependencies = [
name = "limbo_sim"
version = "0.0.6"
dependencies = [
"anarchist-readable-name-generator-lib",
"env_logger 0.10.2",
"limbo_core",
"log",
"rand",
"rand_chacha",
"tempfile",
]
[[package]]

View File

@@ -93,15 +93,18 @@ fn main() -> anyhow::Result<()> {
// At prompt, increment interrupt count
if interrupt_count.fetch_add(1, Ordering::SeqCst) >= 1 {
eprintln!("Interrupted. Exiting...");
conn.close()?;
break;
}
println!("Use .quit to exit or press Ctrl-C again to force quit.");
continue;
}
Err(ReadlineError::Eof) => {
conn.close()?;
break;
}
Err(err) => {
conn.close()?;
anyhow::bail!(err)
}
}

View File

@@ -147,6 +147,7 @@ pub fn maybe_init_database_file(file: &Rc<dyn File>, io: &Arc<dyn IO>) -> Result
&page1,
storage::sqlite3_ondisk::PageType::TableLeaf,
&db_header,
DATABASE_HEADER_SIZE,
);
let mut page = page1.borrow_mut();
@@ -294,18 +295,17 @@ impl Connection {
self.pager.clear_page_cache();
Ok(())
}
}
impl Drop for Connection {
fn drop(&mut self) {
/// Close a connection and checkpoint.
pub fn close(&self) -> Result<()> {
loop {
// TODO: make this async?
match self.pager.checkpoint().unwrap() {
match self.pager.checkpoint()? {
CheckpointStatus::Done => {
return;
return Ok(());
}
CheckpointStatus::IO => {
self.pager.io.run_once().unwrap();
self.pager.io.run_once()?;
}
};
}

View File

@@ -2,8 +2,8 @@ use log::debug;
use crate::storage::pager::{Page, Pager};
use crate::storage::sqlite3_ondisk::{
read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType,
TableInteriorCell, TableLeafCell,
read_btree_cell, read_record, read_varint, write_varint, BTreeCell, DatabaseHeader,
PageContent, PageType, TableInteriorCell, TableLeafCell,
};
use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue, SeekKey, SeekOp};
use crate::Result;
@@ -12,7 +12,9 @@ use std::cell::{Ref, RefCell};
use std::pin::Pin;
use std::rc::Rc;
use super::sqlite3_ondisk::{write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell};
use super::sqlite3_ondisk::{
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE,
};
/*
These are offsets of fields in the header of a b-tree page.
@@ -591,6 +593,10 @@ impl BTreeCursor {
let overflow = {
let mut page = page_ref.borrow_mut();
let contents = page.contents.as_mut().unwrap();
log::debug!(
"insert_into_page(overflow, cell_count={})",
contents.cell_count()
);
self.insert_into_cell(contents, cell_payload.as_slice(), cell_idx);
contents.overflow_cells.len()
@@ -738,7 +744,6 @@ impl BTreeCursor {
// NOTE: since we are doing a simple split, we only find the pointer we want to update (the right pointer).
// Right pointer means cell that points to the last page, as we don't really want to drop this one. This one
// can be a "rightmost pointer" or a "cell".
// TODO(pere): simplify locking...
// we always assume there is a parent
let current_page = self.stack.top();
let mut page_rc = current_page.borrow_mut();
@@ -800,7 +805,7 @@ impl BTreeCursor {
"indexes still not supported "
);
let right_page_ref = self.allocate_page(page.page_type());
let right_page_ref = self.allocate_page(page.page_type(), 0);
let right_page = right_page_ref.borrow_mut();
let right_page_id = right_page.id;
@@ -814,11 +819,7 @@ impl BTreeCursor {
.borrow_mut()
.push(right_page_ref.clone());
debug!(
"splitting left={} right={}",
self.stack.current(),
right_page_id
);
debug!("splitting left={} right={}", page_rc.id, right_page_id);
self.write_info.state = WriteState::BalanceGetParentPage;
Ok(CursorResult::Ok(()))
@@ -906,7 +907,9 @@ impl BTreeCursor {
contents.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cell_content_area_start);
contents.write_u8(BTREE_HEADER_OFFSET_FRAGMENTED, 0);
contents.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0);
if !contents.is_leaf() {
contents.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0);
}
}
// distribute cells
@@ -915,8 +918,15 @@ impl BTreeCursor {
let mut current_cell_index = 0_usize;
let mut divider_cells_index = Vec::new(); /* index to scratch cells that will be used as dividers in order */
debug!(
"balance_leaf::distribute(cells={}, cells_per_page={})",
scratch_cells.len(),
cells_per_page
);
for (i, page) in new_pages.iter_mut().enumerate() {
let mut page = page.borrow_mut();
let page_id = page.id;
let contents = page.contents.as_mut().unwrap();
let last_page = i == new_pages_len - 1;
@@ -926,9 +936,15 @@ impl BTreeCursor {
} else {
cells_per_page
};
debug!(
"balance_leaf::distribute(page={}, cells_to_copy={})",
page_id, cells_to_copy
);
let cell_index_range = current_cell_index..current_cell_index + cells_to_copy;
for (j, cell_idx) in cell_index_range.enumerate() {
debug!("balance_leaf::distribute_in_page(page={}, cells_to_copy={}, j={}, cell_idx={})", page_id, cells_to_copy, j, cell_idx);
let cell = scratch_cells[cell_idx];
self.insert_into_cell(contents, cell, j);
}
@@ -982,7 +998,6 @@ impl BTreeCursor {
{
let mut page = page.borrow_mut();
let contents = page.contents.as_mut().unwrap();
assert!(contents.cell_count() > 1);
let divider_cell_index = divider_cells_index[page_id_index];
let cell_payload = scratch_cells[divider_cell_index];
let cell = read_btree_cell(
@@ -1044,15 +1059,35 @@ impl BTreeCursor {
/* todo: balance deeper, create child and copy contents of root there. Then split root */
/* if we are in root page then we just need to create a new root and push key there */
let new_root_page_ref = self.allocate_page(PageType::TableInterior);
let is_page_1 = {
let current_root = self.stack.top();
let current_root_ref = current_root.borrow();
current_root_ref.id == 1
};
let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 };
let new_root_page_ref = self.allocate_page(PageType::TableInterior, offset);
{
let current_root = self.stack.top();
let current_root_ref = current_root.borrow();
let current_root_contents = current_root_ref.contents.as_ref().unwrap();
let mut new_root_page = new_root_page_ref.borrow_mut();
let new_root_page_id = new_root_page.id;
let new_root_page_contents = new_root_page.contents.as_mut().unwrap();
if is_page_1 {
// Copy header
let current_root_buf = current_root_contents.as_ptr();
let new_root_buf = new_root_page_contents.as_ptr();
new_root_buf[0..DATABASE_HEADER_SIZE]
.copy_from_slice(&current_root_buf[0..DATABASE_HEADER_SIZE]);
}
// point new root right child to previous root
new_root_page_contents
.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, new_root_page_id as u32);
new_root_page_contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0);
// TODO:: this page should have offset
// copy header bytes to here
}
/* swap the split page's buffer with the new root buffer so we don't have to update page idx */
@@ -1060,15 +1095,34 @@ impl BTreeCursor {
let (root_id, child_id, child) = {
let page_ref = self.stack.top();
let child = page_ref.clone();
let mut page_rc = page_ref.borrow_mut();
let mut child_rc = page_ref.borrow_mut();
let mut new_root_page = new_root_page_ref.borrow_mut();
// Swap the entire Page structs
std::mem::swap(&mut page_rc.id, &mut new_root_page.id);
std::mem::swap(&mut child_rc.id, &mut new_root_page.id);
// TODO:: shift bytes by offset to left on child because now child has offset 100
// and header bytes
// Also change the offset of page
//
if is_page_1 {
// Remove header from child and set offset to 0
let contents = child_rc.contents.as_mut().unwrap();
let (cell_pointer_offset, _) = contents.cell_get_raw_pointer_region();
// change cell pointers
for cell_idx in 0..contents.cell_count() {
let cell_pointer_offset = cell_pointer_offset + (2 * cell_idx) - offset;
let pc = contents.read_u16(cell_pointer_offset);
contents.write_u16(cell_pointer_offset, pc - offset as u16);
}
contents.offset = 0;
let buf = contents.as_ptr();
buf.copy_within(DATABASE_HEADER_SIZE.., 0);
}
self.pager.add_dirty(new_root_page.id);
self.pager.add_dirty(page_rc.id);
(new_root_page.id, page_rc.id, child)
self.pager.add_dirty(child_rc.id);
(new_root_page.id, child_rc.id, child)
};
debug!("Balancing root. root={}, rightmost={}", root_id, child_id);
@@ -1084,9 +1138,9 @@ impl BTreeCursor {
}
}
fn allocate_page(&self, page_type: PageType) -> Rc<RefCell<Page>> {
fn allocate_page(&self, page_type: PageType, offset: usize) -> Rc<RefCell<Page>> {
let page = self.pager.allocate_page().unwrap();
btree_init_page(&page, page_type, &*self.database_header.borrow());
btree_init_page(&page, page_type, &*self.database_header.borrow(), offset);
page
}
@@ -1142,7 +1196,9 @@ impl BTreeCursor {
}
fn defragment_page(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) {
log::debug!("defragment_page");
let cloned_page = page.clone();
// TODO(pere): usable space should include offset probably
let usable_space = (db_header.page_size - db_header.unused_space as u16) as u64;
let mut cbrk = usable_space;
@@ -1220,10 +1276,9 @@ impl BTreeCursor {
let write_buf = page.as_ptr();
// set new first byte of cell content
write_buf[5..7].copy_from_slice(&(cbrk as u16).to_be_bytes());
page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cbrk as u16);
// set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start
write_buf[1] = 0;
write_buf[2] = 0;
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0);
// set unused space to 0
let first_cell = cloned_page.cell_content_area() as u64;
assert!(first_cell <= cbrk);
@@ -1234,6 +1289,7 @@ impl BTreeCursor {
// and end of cell pointer area.
#[allow(unused_assignments)]
fn compute_free_space(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) -> u16 {
// TODO(pere): maybe free space is not calculated correctly with offset
let buf = page.as_ptr();
let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize;
@@ -1247,7 +1303,8 @@ impl BTreeCursor {
let ncell = page.cell_count();
// 8 + 4 == header end
let first_cell = (page.offset + 8 + 4 + (2 * ncell)) as u16;
let child_pointer_size = if page.is_leaf() { 0 } else { 4 };
let first_cell = (page.offset + 8 + child_pointer_size + (2 * ncell)) as u16;
let mut nfree = fragmented_free_bytes as usize + first_byte_in_cell_content as usize;
@@ -1284,7 +1341,7 @@ impl BTreeCursor {
// return SQLITE_CORRUPT_PAGE(pPage);
// }
// don't count header and cell pointers?
nfree -= first_cell as usize;
nfree = nfree - first_cell as usize;
nfree as u16
}
@@ -1313,12 +1370,18 @@ impl BTreeCursor {
}
let max_local = self.max_local(page_type.clone());
log::debug!(
"fill_cell_payload(record_size={}, max_local={})",
record_buf.len(),
max_local
);
if record_buf.len() <= max_local {
// enough allowed space to fit inside a btree page
cell_payload.extend_from_slice(record_buf.as_slice());
cell_payload.resize(cell_payload.len() + 4, 0);
return;
}
log::debug!("fill_cell_payload(overflow)");
let min_local = self.min_local(page_type);
let mut space_left = min_local + (record_buf.len() - min_local) % (self.usable_space() - 4);
@@ -1686,17 +1749,23 @@ impl Cursor for BTreeCursor {
flags,
),
};
let page = self.allocate_page(page_type);
let page = self.allocate_page(page_type, 0);
let id = page.borrow().id;
id as u32
}
}
pub fn btree_init_page(page: &Rc<RefCell<Page>>, page_type: PageType, db_header: &DatabaseHeader) {
pub fn btree_init_page(
page: &Rc<RefCell<Page>>,
page_type: PageType,
db_header: &DatabaseHeader,
offset: usize,
) {
// setup btree page
let mut contents = page.borrow_mut();
debug!("allocating page {}", contents.id);
debug!("btree_init_page(id={}, offset={})", contents.id, offset);
let contents = contents.contents.as_mut().unwrap();
contents.offset = offset;
let id = page_type as u8;
contents.write_u8(BTREE_HEADER_OFFSET_TYPE, id);
contents.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0);

View File

@@ -139,8 +139,8 @@ impl DumbLruPageCache {
}
pub fn insert(&mut self, key: usize, value: Rc<RefCell<Page>>) {
debug!("cache_insert(key={})", key);
self._delete(key, false);
debug!("cache_insert(key={})", key);
let mut entry = Box::new(PageCacheEntry {
key,
next: None,
@@ -660,6 +660,7 @@ pub fn allocate_page(
bp.put(buf);
});
let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
page.set_loaded();
page.contents = Some(PageContent {
offset,
buffer,

View File

@@ -320,11 +320,11 @@ impl Clone for PageContent {
impl PageContent {
pub fn page_type(&self) -> PageType {
self.read_u8(self.offset).try_into().unwrap()
self.read_u8(0).try_into().unwrap()
}
pub fn maybe_page_type(&self) -> Option<PageType> {
match self.read_u8(self.offset).try_into() {
match self.read_u8(0).try_into() {
Ok(v) => Some(v),
Err(_) => None, // this could be an overflow page
}
@@ -342,7 +342,7 @@ impl PageContent {
fn read_u8(&self, pos: usize) -> u8 {
let buf = self.as_ptr();
buf[pos]
buf[self.offset + pos]
}
pub fn read_u16(&self, pos: usize) -> u16 {
@@ -361,16 +361,19 @@ impl PageContent {
}
pub fn write_u8(&self, pos: usize, value: u8) {
log::debug!("write_u8(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos] = value;
}
pub fn write_u16(&self, pos: usize, value: u16) {
log::debug!("write_u16(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes());
}
pub fn write_u32(&self, pos: usize, value: u32) {
log::debug!("write_u32(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes());
}
@@ -408,6 +411,7 @@ impl PageContent {
min_local: usize,
usable_size: usize,
) -> Result<BTreeCell> {
log::debug!("cell_get(idx={})", idx);
let buf = self.as_ptr();
let ncells = self.cell_count();
@@ -432,6 +436,7 @@ impl PageContent {
)
}
/// Returns the raw cell pointer array region of this page as (start offset, length) — length presumed; confirm against callers.
pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) {
let cell_start = match self.page_type() {
PageType::IndexInterior => 12,

View File

@@ -2128,6 +2128,7 @@ impl Program {
cursor, rowid_reg, ..
} => {
let cursor = cursors.get_mut(cursor).unwrap();
// TODO: make io handle rng
let rowid = get_new_rowid(cursor, thread_rng())?;
match rowid {
CursorResult::Ok(rowid) => {

View File

@@ -18,3 +18,7 @@ path = "main.rs"
limbo_core = { path = "../core" }
rand = "0.8.5"
rand_chacha = "0.3.1"
log = "0.4.20"
tempfile = "3.0.7"
env_logger = "0.10.1"
anarchist-readable-name-generator-lib = "0.1.2"

View File

@@ -1,57 +1,371 @@
use limbo_core::{Database, File, OpenFlags, PlatformIO, Result, IO};
use limbo_core::{
Connection, Database, File, LimboError, OpenFlags, PlatformIO, Result, Row, RowResult, IO,
};
use log;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use tempfile::TempDir;
use anarchist_readable_name_generator_lib::{readable_name, readable_name_custom};
/// Shared state for one simulator run: the randomized options, an in-memory
/// model of the database, the simulated connection pool, and the
/// fault-injecting IO layer, all driven by one seeded RNG for reproducibility.
struct SimulatorEnv {
    opts: SimulatorOpts,
    // In-memory model of expected table contents; compared against real query
    // results (see do_select / compare_equal_rows).
    tables: Vec<Table>,
    connections: Vec<SimConnection>,
    io: Arc<SimulatorIO>,
    db: Rc<Database>,
    // Seeded from SimulatorOpts::seed so failing runs can be replayed.
    rng: ChaCha8Rng,
}
/// A slot in the simulator's connection pool: either an open connection or an
/// empty slot that a later tick may (re)connect.
#[derive(Clone)]
enum SimConnection {
    Connected(Rc<Connection>),
    Disconnected,
}
/// Randomized knobs for one simulator run; printed at startup so a failing
/// run can be reproduced from its seed.
#[derive(Debug)]
struct SimulatorOpts {
    ticks: usize,
    max_connections: usize,
    max_tables: usize,
    seed: u64,
    // These next options are the workload distribution, where read_percent +
    // write_percent + delete_percent == 100.
    read_percent: usize,
    write_percent: usize,
    delete_percent: usize,
    page_size: usize,
}
/// The simulator's model of one SQL table: its schema plus every row it
/// expects the real database to contain.
struct Table {
    rows: Vec<Vec<Value>>,
    name: String,
    columns: Vec<Column>,
}
/// One column of a simulated table. `primary` and `unique` are tracked but
/// currently always generated as false (see gen_columns).
#[derive(Clone)]
struct Column {
    name: String,
    column_type: ColumnType,
    primary: bool,
    unique: bool,
}
/// SQL column types the simulator can generate; rendered into DDL via
/// `ColumnType::as_str`.
#[derive(Clone)]
enum ColumnType {
    Integer,
    Float,
    Text,
    Blob,
}
/// An owned SQL value mirroring `limbo_core::Value`, so model rows can be
/// stored and compared against query results.
#[derive(Debug, PartialEq)]
enum Value {
    Null,
    Integer(i64),
    Float(f64),
    Text(String),
    Blob(Vec<u8>),
}
#[allow(clippy::arc_with_non_send_sync)]
fn main() {
let _ = env_logger::try_init();
let seed = match std::env::var("SEED") {
Ok(seed) => seed.parse::<u64>().unwrap(),
Err(_) => rand::thread_rng().next_u64(),
};
println!("Seed: {}", seed);
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let io = Arc::new(SimulatorIO::new(seed).unwrap());
let test_path = "./testing/testing.db";
let db = match Database::open_file(io.clone(), test_path) {
let (read_percent, write_percent, delete_percent) = {
let mut remaining = 100;
let read_percent = rng.gen_range(0..=remaining);
remaining -= read_percent;
let write_percent = rng.gen_range(0..=remaining);
remaining -= write_percent;
let delete_percent = remaining;
(read_percent, write_percent, delete_percent)
};
let opts = SimulatorOpts {
ticks: rng.gen_range(0..4096),
max_connections: 1, // TODO: for now let's use one connection as we haven't implemented
// correct transaction processing yet
max_tables: rng.gen_range(0..128),
seed,
read_percent,
write_percent,
delete_percent,
page_size: 4096, // TODO: randomize this too
};
let io = Arc::new(SimulatorIO::new(seed, opts.page_size).unwrap());
let mut path = TempDir::new().unwrap().into_path();
path.push("simulator.db");
println!("path to db '{:?}'", path);
let db = match Database::open_file(io.clone(), path.as_path().to_str().unwrap()) {
Ok(db) => db,
Err(e) => {
panic!("error opening database test file {}: {:?}", test_path, e);
panic!("error opening simulator test file {:?}: {:?}", path, e);
}
};
for _ in 0..100 {
let conn = db.connect();
let mut stmt = conn.prepare("SELECT * FROM users").unwrap();
let mut rows = stmt.query().unwrap();
'rows_loop: loop {
io.inject_fault(rng.gen_bool(0.5));
match rows.next_row() {
Ok(result) => {
match result {
limbo_core::RowResult::Row(_row) => {
// TODO: assert that data is correct
}
limbo_core::RowResult::IO => {
io.inject_fault(rng.gen_bool(0.01));
if io.run_once().is_err() {
break 'rows_loop;
}
}
limbo_core::RowResult::Done => {
let connections = vec![SimConnection::Disconnected; opts.max_connections];
let mut env = SimulatorEnv {
opts,
tables: Vec::new(),
connections,
rng,
io,
db,
};
println!("Initial opts {:?}", env.opts);
for _ in 0..env.opts.ticks {
let connection_index = env.rng.gen_range(0..env.opts.max_connections);
let mut connection = env.connections[connection_index].clone();
match &mut connection {
SimConnection::Connected(conn) => {
let disconnect = env.rng.gen_ratio(1, 100);
if disconnect {
log::info!("disconnecting {}", connection_index);
let _ = conn.close();
env.connections[connection_index] = SimConnection::Disconnected;
} else {
match process_connection(&mut env, conn) {
Ok(_) => {}
Err(err) => {
log::error!("error {}", err);
break;
}
}
}
Err(_) => {
continue;
}
}
SimConnection::Disconnected => {
log::info!("disconnecting {}", connection_index);
env.connections[connection_index] = SimConnection::Connected(env.db.connect());
}
}
stmt.reset();
}
io.print_fault_stats();
env.io.print_stats();
}
/// Run one simulated tick against an open connection: occasionally perform a
/// management operation (table creation), otherwise pick a read or write
/// according to the configured workload distribution.
fn process_connection(env: &mut SimulatorEnv, conn: &mut Rc<Connection>) -> Result<()> {
    // ~1% of ticks are management operations; we also fall back to table
    // creation when there is nothing to query yet.
    // NOTE: gen_ratio is evaluated unconditionally to keep the RNG stream stable.
    let management = env.rng.gen_ratio(1, 100);
    if management || env.tables.is_empty() {
        // for now create table only
        maybe_add_table(env, conn)?;
        return Ok(());
    }
    let roll = env.rng.gen_range(0..100);
    if roll < env.opts.read_percent {
        // read
        do_select(env, conn)?;
    } else if roll < env.opts.read_percent + env.opts.write_percent {
        // write
        do_write(env, conn)?;
    }
    // remaining probability mass is the delete workload — TODO: not implemented
    Ok(())
}
/// SELECT every row of a randomly chosen table and assert the database
/// returns exactly what the in-memory model expects.
fn do_select(env: &mut SimulatorEnv, conn: &mut Rc<Connection>) -> Result<()> {
    let table_index = env.rng.gen_range(0..env.tables.len());
    // Build the query up front so the immutable borrow of env.tables ends
    // before get_all_rows takes env mutably.
    let query = format!("SELECT * FROM {}", env.tables[table_index].name);
    let rows = get_all_rows(env, conn, query.as_str())?;
    compare_equal_rows(&env.tables[table_index].rows, &rows);
    Ok(())
}
/// INSERT one randomly generated row into a randomly chosen table, recording
/// the row in the in-memory model so later reads can verify it.
fn do_write(env: &mut SimulatorEnv, conn: &mut Rc<Connection>) -> Result<()> {
    let table_index = env.rng.gen_range(0..env.tables.len());
    let columns = env.tables[table_index].columns.clone();
    // Generate one value per column, matching the column's declared type.
    let mut row = Vec::with_capacity(columns.len());
    for column in &columns {
        let value = match column.column_type {
            // Inclusive range so i64::MAX is reachable; std::i64::MIN/MAX
            // associated-const paths are the modern spelling.
            ColumnType::Integer => Value::Integer(env.rng.gen_range(i64::MIN..=i64::MAX)),
            ColumnType::Float => Value::Float(env.rng.gen_range(-1e10..1e10)),
            ColumnType::Text => Value::Text(gen_random_text(env)),
            ColumnType::Blob => Value::Blob(gen_random_text(env).into_bytes()),
        };
        row.push(value);
    }
    let values_sql: Vec<String> = row.iter().map(|v| v.to_string()).collect();
    let query = format!(
        "INSERT INTO {} VALUES ({});",
        env.tables[table_index].name,
        values_sql.join(",")
    );
    // Execute first, then record in the model, so a failed INSERT does not
    // leave a phantom row in the expected state.
    let _ = get_all_rows(env, conn, query.as_str())?;
    env.tables[table_index].rows.push(row);
    Ok(())
}
/// Assert that two row sets are identical: same number of rows, same number
/// of values per row, and equal values in order. Panics on any mismatch.
fn compare_equal_rows(a: &[Vec<Value>], b: &[Vec<Value>]) {
    assert_eq!(a.len(), b.len(), "lengths are different");
    for (i, (r1, r2)) in a.iter().zip(b).enumerate() {
        // zip alone would silently ignore trailing values of the longer row,
        // so check row widths explicitly.
        assert_eq!(r1.len(), r2.len(), "row {} has different lengths", i);
        for (v1, v2) in r1.iter().zip(r2) {
            assert_eq!(v1, v2, "values are different in row {}", i);
        }
    }
}
/// If the table limit has not been reached, create a new randomly named and
/// shaped table, verify it shows up in sqlite_schema, and register it in the
/// in-memory model.
fn maybe_add_table(env: &mut SimulatorEnv, conn: &mut Rc<Connection>) -> Result<()> {
    if env.tables.len() < env.opts.max_tables {
        let table = Table {
            rows: Vec::new(),
            name: gen_random_name(env),
            columns: gen_columns(env),
        };
        // Execute the CREATE TABLE statement itself.
        let rows = get_all_rows(env, conn, table.to_create_str().as_str())?;
        log::debug!("{:?}", rows);
        // Read the stored schema back to verify the table was created.
        let rows = get_all_rows(
            env,
            conn,
            format!(
                "SELECT sql FROM sqlite_schema WHERE type IN ('table', 'index') AND name = '{}';",
                table.name
            )
            .as_str(),
        )?;
        log::debug!("{:?}", rows);
        assert!(rows.len() == 1);
        let as_text = match &rows[0][0] {
            Value::Text(t) => t,
            _ => unreachable!(),
        };
        // NOTE(review): this asserts the stored SQL *differs* from the
        // generated CREATE statement, yet the message says "not inserted
        // correctly" — the comparison looks inverted, unless it deliberately
        // relies on sqlite_schema normalizing the SQL (e.g. dropping the
        // trailing ';'). TODO: confirm intent.
        assert!(
            *as_text != table.to_create_str(),
            "table was not inserted correctly"
        );
        env.tables.push(table);
    }
    Ok(())
}
/// Generate a random readable identifier. Dashes produced by the name
/// generator are replaced with underscores so the result is a valid unquoted
/// SQL identifier.
fn gen_random_name(env: &mut SimulatorEnv) -> String {
    readable_name_custom("_", &mut env.rng).replace('-', "_")
}
/// Generate random text: usually a short readable name, but roughly one time
/// in a thousand a large (1 KiB .. 2 GiB) string that exercises overflow-page
/// handling in the write path.
fn gen_random_text(env: &mut SimulatorEnv) -> String {
    let big_text = env.rng.gen_ratio(1, 1000);
    if big_text {
        let max_size: u64 = 2 * 1024 * 1024 * 1024;
        let size = env.rng.gen_range(1024..max_size);
        // Preallocate: pushing up to 2 GiB one char at a time would otherwise
        // trigger repeated grow-and-copy reallocations.
        let mut name = String::with_capacity(size as usize);
        for i in 0..size {
            // Cycle deterministically through 'A'..='Z'.
            name.push(((i % 26) as u8 + b'A') as char);
        }
        name
    } else {
        let name = readable_name_custom("_", &mut env.rng);
        name.replace('-', "_")
    }
}
/// Generate a random schema of 1..128 columns with random names and types.
/// `primary`/`unique` are not yet exercised and are always false.
fn gen_columns(env: &mut SimulatorEnv) -> Vec<Column> {
    let count = env.rng.gen_range(1..128);
    (0..count)
        .map(|_| {
            // Draw the type first, then the name, to keep the RNG stream
            // identical to the original implementation.
            let column_type = match env.rng.gen_range(0..4) {
                0 => ColumnType::Integer,
                1 => ColumnType::Float,
                2 => ColumnType::Text,
                3 => ColumnType::Blob,
                _ => unreachable!(),
            };
            Column {
                name: gen_random_name(env),
                column_type,
                primary: false,
                unique: false,
            }
        })
        .collect()
}
/// Run `query` to completion, injecting IO faults along the way, and collect
/// every returned row converted into the simulator's owned `Value` type.
///
/// Returns `Err` if preparing the query fails or a row yields an error; an
/// injected fault during `run_once` aborts iteration early but still returns
/// the rows gathered so far.
fn get_all_rows(
    env: &mut SimulatorEnv,
    conn: &mut Rc<Connection>,
    query: &str,
) -> Result<Vec<Vec<Value>>> {
    // Truncate huge generated queries in log output.
    log::info!("running query '{}'", &query[0..query.len().min(4096)]);
    let mut out = Vec::new();
    // Log and propagate preparation errors instead of the is_err/unwrap dance.
    let rows = conn.query(query).map_err(|err| {
        log::error!(
            "Error running query '{}': {:?}",
            &query[0..query.len().min(4096)],
            err
        );
        err
    })?;
    assert!(rows.is_some());
    let mut rows = rows.unwrap();
    loop {
        env.io.inject_fault(env.rng.gen_ratio(1, 10000));
        match rows.next_row()? {
            RowResult::Row(row) => {
                let mut r = Vec::new();
                for el in &row.values {
                    let v = match el {
                        limbo_core::Value::Null => Value::Null,
                        limbo_core::Value::Integer(i) => Value::Integer(*i),
                        limbo_core::Value::Float(f) => Value::Float(*f),
                        limbo_core::Value::Text(t) => Value::Text(t.clone().to_owned()),
                        limbo_core::Value::Blob(b) => Value::Blob(b.clone().to_owned()),
                    };
                    r.push(v);
                }
                out.push(r);
            }
            RowResult::IO => {
                env.io.inject_fault(env.rng.gen_ratio(1, 10000));
                if env.io.run_once().is_err() {
                    // Injected fault: stop early with whatever we have.
                    log::info!("query inject fault");
                    break;
                }
            }
            RowResult::Done => {
                break;
            }
        }
    }
    Ok(out)
}
struct SimulatorIO {
@@ -60,10 +374,11 @@ struct SimulatorIO {
files: RefCell<Vec<Rc<SimulatorFile>>>,
rng: RefCell<ChaCha8Rng>,
nr_run_once_faults: RefCell<usize>,
page_size: usize,
}
impl SimulatorIO {
fn new(seed: u64) -> Result<Self> {
fn new(seed: u64, page_size: usize) -> Result<Self> {
let inner = Box::new(PlatformIO::new()?);
let fault = RefCell::new(false);
let files = RefCell::new(Vec::new());
@@ -75,6 +390,7 @@ impl SimulatorIO {
files,
rng,
nr_run_once_faults,
page_size,
})
}
@@ -85,10 +401,10 @@ impl SimulatorIO {
}
}
fn print_fault_stats(&self) {
fn print_stats(&self) {
println!("run_once faults: {}", self.nr_run_once_faults.borrow());
for file in self.files.borrow().iter() {
file.print_fault_stats();
file.print_stats();
}
}
}
@@ -106,6 +422,10 @@ impl IO for SimulatorIO {
fault: RefCell::new(false),
nr_pread_faults: RefCell::new(0),
nr_pwrite_faults: RefCell::new(0),
reads: RefCell::new(0),
writes: RefCell::new(0),
syncs: RefCell::new(0),
page_size: self.page_size,
});
self.files.borrow_mut().push(file.clone());
Ok(file)
@@ -136,6 +456,10 @@ struct SimulatorFile {
fault: RefCell<bool>,
nr_pread_faults: RefCell<usize>,
nr_pwrite_faults: RefCell<usize>,
writes: RefCell<usize>,
reads: RefCell<usize>,
syncs: RefCell<usize>,
page_size: usize,
}
impl SimulatorFile {
@@ -143,11 +467,14 @@ impl SimulatorFile {
self.fault.replace(fault);
}
fn print_fault_stats(&self) {
fn print_stats(&self) {
println!(
"pread faults: {}, pwrite faults: {}",
"pread faults: {}, pwrite faults: {}, reads: {}, writes: {}, syncs: {}",
*self.nr_pread_faults.borrow(),
*self.nr_pwrite_faults.borrow()
*self.nr_pwrite_faults.borrow(),
*self.reads.borrow(),
*self.writes.borrow(),
*self.syncs.borrow(),
);
}
}
@@ -178,6 +505,7 @@ impl limbo_core::File for SimulatorFile {
"Injected fault".into(),
));
}
*self.reads.borrow_mut() += 1;
self.inner.pread(pos, c)
}
@@ -193,10 +521,12 @@ impl limbo_core::File for SimulatorFile {
"Injected fault".into(),
));
}
*self.writes.borrow_mut() += 1;
self.inner.pwrite(pos, buffer, c)
}
fn sync(&self, c: Rc<limbo_core::Completion>) -> Result<()> {
*self.syncs.borrow_mut() += 1;
self.inner.sync(c)
}
@@ -210,3 +540,49 @@ impl Drop for SimulatorFile {
self.inner.unlock_file().expect("Failed to unlock file");
}
}
impl ColumnType {
pub fn as_str(&self) -> &str {
match self {
ColumnType::Integer => "INTEGER",
ColumnType::Float => "FLOAT",
ColumnType::Text => "TEXT",
ColumnType::Blob => "BLOB",
}
}
}
impl Table {
    /// Render this table definition as a `CREATE TABLE name (col TYPE,...);`
    /// statement. Panics if the table has no columns.
    pub fn to_create_str(&self) -> String {
        assert!(!self.columns.is_empty());
        let cols: Vec<String> = self
            .columns
            .iter()
            .map(|column| format!("{} {}", column.name, column.column_type.as_str()))
            .collect();
        format!("CREATE TABLE {} ({});", self.name, cols.join(","))
    }
}
/// Render a value as a SQL literal. Implemented as `Display` instead of an
/// inherent `to_string` (clippy: inherent_to_string), so existing
/// `value.to_string()` callers keep working through the `ToString` blanket
/// impl.
impl std::fmt::Display for Value {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Value::Null => write!(f, "NULL"),
            Value::Integer(i) => write!(f, "{}", i),
            Value::Float(fl) => write!(f, "{}", fl),
            // Double embedded single quotes so text containing ' still forms
            // a valid SQL string literal.
            Value::Text(t) => write!(f, "'{}'", t.replace('\'', "''")),
            Value::Blob(vec) => write!(f, "{}", to_sqlite_blob(vec)),
        }
    }
}
/// Render a byte slice as a SQLite hex blob literal, e.g. `X'00AB'`.
/// Takes `&[u8]` (instead of `&Vec<u8>`) so any byte slice works; existing
/// `&Vec<u8>` callers coerce automatically.
fn to_sqlite_blob(bytes: &[u8]) -> String {
    let hex: String = bytes.iter().map(|b| format!("{:02X}", b)).collect();
    format!("X'{}'", hex)
}

View File

@@ -367,6 +367,7 @@ mod tests {
let conn = tmp_db.connect_limbo();
insert(1, &conn, &tmp_db).unwrap();
assert_eq!(count(&conn, &tmp_db).unwrap(), 1);
conn.close()?;
}
{
let conn = tmp_db.connect_limbo();
@@ -375,6 +376,7 @@ mod tests {
1,
"failed to read from wal from another connection"
);
conn.close()?;
}
Ok(())
}