diff --git a/Cargo.lock b/Cargo.lock index 14a647fd5..f56378f96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,15 @@ version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" +[[package]] +name = "anarchist-readable-name-generator-lib" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18a1e15a87b13ae79e04e07b3714fc41d5f6993dff11662fdbe0b207c6ad0fe0" +dependencies = [ + "rand", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -1147,9 +1156,13 @@ dependencies = [ name = "limbo_sim" version = "0.0.6" dependencies = [ + "anarchist-readable-name-generator-lib", + "env_logger 0.10.2", "limbo_core", + "log", "rand", "rand_chacha", + "tempfile", ] [[package]] diff --git a/cli/main.rs b/cli/main.rs index fcb339530..605e3cd2a 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -93,15 +93,18 @@ fn main() -> anyhow::Result<()> { // At prompt, increment interrupt count if interrupt_count.fetch_add(1, Ordering::SeqCst) >= 1 { eprintln!("Interrupted. Exiting..."); + conn.close()?; break; } println!("Use .quit to exit or press Ctrl-C again to force quit."); continue; } Err(ReadlineError::Eof) => { + conn.close()?; break; } Err(err) => { + conn.close()?; anyhow::bail!(err) } } diff --git a/core/lib.rs b/core/lib.rs index 21650f46d..c1dccaa14 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -147,6 +147,7 @@ pub fn maybe_init_database_file(file: &Rc, io: &Arc) -> Result &page1, storage::sqlite3_ondisk::PageType::TableLeaf, &db_header, + DATABASE_HEADER_SIZE, ); let mut page = page1.borrow_mut(); @@ -294,18 +295,17 @@ impl Connection { self.pager.clear_page_cache(); Ok(()) } -} -impl Drop for Connection { - fn drop(&mut self) { + /// Close a connection and checkpoint. + pub fn close(&self) -> Result<()> { loop { // TODO: make this async? - match self.pager.checkpoint().unwrap() { + match self.pager.checkpoint()? 
{ CheckpointStatus::Done => { - return; + return Ok(()); } CheckpointStatus::IO => { - self.pager.io.run_once().unwrap(); + self.pager.io.run_once()?; } }; } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 3a60937ea..8cc392f60 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -2,8 +2,8 @@ use log::debug; use crate::storage::pager::{Page, Pager}; use crate::storage::sqlite3_ondisk::{ - read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, - TableInteriorCell, TableLeafCell, + read_btree_cell, read_record, read_varint, write_varint, BTreeCell, DatabaseHeader, + PageContent, PageType, TableInteriorCell, TableLeafCell, }; use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue, SeekKey, SeekOp}; use crate::Result; @@ -12,7 +12,9 @@ use std::cell::{Ref, RefCell}; use std::pin::Pin; use std::rc::Rc; -use super::sqlite3_ondisk::{write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell}; +use super::sqlite3_ondisk::{ + write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE, +}; /* These are offsets of fields in the header of a b-tree page. @@ -591,6 +593,10 @@ impl BTreeCursor { let overflow = { let mut page = page_ref.borrow_mut(); let contents = page.contents.as_mut().unwrap(); + log::debug!( + "insert_into_page(overflow, cell_count={})", + contents.cell_count() + ); self.insert_into_cell(contents, cell_payload.as_slice(), cell_idx); contents.overflow_cells.len() @@ -738,7 +744,6 @@ impl BTreeCursor { // NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer). // Right pointer means cell that points to the last page, as we don't really want to drop this one. This one // can be a "rightmost pointer" or a "cell". - // TODO(pere): simplify locking... 
// we always assume there is a parent let current_page = self.stack.top(); let mut page_rc = current_page.borrow_mut(); @@ -800,7 +805,7 @@ impl BTreeCursor { "indexes still not supported " ); - let right_page_ref = self.allocate_page(page.page_type()); + let right_page_ref = self.allocate_page(page.page_type(), 0); let right_page = right_page_ref.borrow_mut(); let right_page_id = right_page.id; @@ -814,11 +819,7 @@ impl BTreeCursor { .borrow_mut() .push(right_page_ref.clone()); - debug!( - "splitting left={} right={}", - self.stack.current(), - right_page_id - ); + debug!("splitting left={} right={}", page_rc.id, right_page_id); self.write_info.state = WriteState::BalanceGetParentPage; Ok(CursorResult::Ok(())) @@ -906,7 +907,9 @@ impl BTreeCursor { contents.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cell_content_area_start); contents.write_u8(BTREE_HEADER_OFFSET_FRAGMENTED, 0); - contents.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0); + if !contents.is_leaf() { + contents.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0); + } } // distribute cells @@ -915,8 +918,15 @@ impl BTreeCursor { let mut current_cell_index = 0_usize; let mut divider_cells_index = Vec::new(); /* index to scratch cells that will be used as dividers in order */ + debug!( + "balance_leaf::distribute(cells={}, cells_per_page={})", + scratch_cells.len(), + cells_per_page + ); + for (i, page) in new_pages.iter_mut().enumerate() { let mut page = page.borrow_mut(); + let page_id = page.id; let contents = page.contents.as_mut().unwrap(); let last_page = i == new_pages_len - 1; @@ -926,9 +936,15 @@ impl BTreeCursor { } else { cells_per_page }; + debug!( + "balance_leaf::distribute(page={}, cells_to_copy={})", + page_id, cells_to_copy + ); let cell_index_range = current_cell_index..current_cell_index + cells_to_copy; for (j, cell_idx) in cell_index_range.enumerate() { + debug!("balance_leaf::distribute_in_page(page={}, cells_to_copy={}, j={}, cell_idx={})", page_id, cells_to_copy, j, cell_idx); + let cell = 
scratch_cells[cell_idx]; self.insert_into_cell(contents, cell, j); } @@ -982,7 +998,6 @@ impl BTreeCursor { { let mut page = page.borrow_mut(); let contents = page.contents.as_mut().unwrap(); - assert!(contents.cell_count() > 1); let divider_cell_index = divider_cells_index[page_id_index]; let cell_payload = scratch_cells[divider_cell_index]; let cell = read_btree_cell( @@ -1044,15 +1059,35 @@ impl BTreeCursor { /* todo: balance deeper, create child and copy contents of root there. Then split root */ /* if we are in root page then we just need to create a new root and push key there */ - let new_root_page_ref = self.allocate_page(PageType::TableInterior); + let is_page_1 = { + let current_root = self.stack.top(); + let current_root_ref = current_root.borrow(); + current_root_ref.id == 1 + }; + + let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 }; + let new_root_page_ref = self.allocate_page(PageType::TableInterior, offset); { + let current_root = self.stack.top(); + let current_root_ref = current_root.borrow(); + let current_root_contents = current_root_ref.contents.as_ref().unwrap(); + let mut new_root_page = new_root_page_ref.borrow_mut(); let new_root_page_id = new_root_page.id; let new_root_page_contents = new_root_page.contents.as_mut().unwrap(); + if is_page_1 { + // Copy header + let current_root_buf = current_root_contents.as_ptr(); + let new_root_buf = new_root_page_contents.as_ptr(); + new_root_buf[0..DATABASE_HEADER_SIZE] + .copy_from_slice(¤t_root_buf[0..DATABASE_HEADER_SIZE]); + } // point new root right child to previous root new_root_page_contents .write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, new_root_page_id as u32); new_root_page_contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0); + // TODO:: this page should have offset + // copy header bytes to here } /* swap splitted page buffer with new root buffer so we don't have to update page idx */ @@ -1060,15 +1095,34 @@ impl BTreeCursor { let (root_id, child_id, child) = { let page_ref = 
self.stack.top(); let child = page_ref.clone(); - let mut page_rc = page_ref.borrow_mut(); + let mut child_rc = page_ref.borrow_mut(); let mut new_root_page = new_root_page_ref.borrow_mut(); // Swap the entire Page structs - std::mem::swap(&mut page_rc.id, &mut new_root_page.id); + std::mem::swap(&mut child_rc.id, &mut new_root_page.id); + // TODO:: shift bytes by offset to left on child because now child has offset 100 + // and header bytes + // Also change the offset of page + // + if is_page_1 { + // Remove header from child and set offset to 0 + let contents = child_rc.contents.as_mut().unwrap(); + let (cell_pointer_offset, _) = contents.cell_get_raw_pointer_region(); + // change cell pointers + for cell_idx in 0..contents.cell_count() { + let cell_pointer_offset = cell_pointer_offset + (2 * cell_idx) - offset; + let pc = contents.read_u16(cell_pointer_offset); + contents.write_u16(cell_pointer_offset, pc - offset as u16); + } + + contents.offset = 0; + let buf = contents.as_ptr(); + buf.copy_within(DATABASE_HEADER_SIZE.., 0); + } self.pager.add_dirty(new_root_page.id); - self.pager.add_dirty(page_rc.id); - (new_root_page.id, page_rc.id, child) + self.pager.add_dirty(child_rc.id); + (new_root_page.id, child_rc.id, child) }; debug!("Balancing root. 
root={}, rightmost={}", root_id, child_id); @@ -1084,9 +1138,9 @@ impl BTreeCursor { } } - fn allocate_page(&self, page_type: PageType) -> Rc> { + fn allocate_page(&self, page_type: PageType, offset: usize) -> Rc> { let page = self.pager.allocate_page().unwrap(); - btree_init_page(&page, page_type, &*self.database_header.borrow()); + btree_init_page(&page, page_type, &*self.database_header.borrow(), offset); page } @@ -1142,7 +1196,9 @@ impl BTreeCursor { } fn defragment_page(&self, page: &PageContent, db_header: Ref) { + log::debug!("defragment_page"); let cloned_page = page.clone(); + // TODO(pere): usable space should include offset probably let usable_space = (db_header.page_size - db_header.unused_space as u16) as u64; let mut cbrk = usable_space; @@ -1220,10 +1276,9 @@ impl BTreeCursor { let write_buf = page.as_ptr(); // set new first byte of cell content - write_buf[5..7].copy_from_slice(&(cbrk as u16).to_be_bytes()); + page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cbrk as u16); // set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start - write_buf[1] = 0; - write_buf[2] = 0; + page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0); // set unused space to 0 let first_cell = cloned_page.cell_content_area() as u64; assert!(first_cell <= cbrk); @@ -1234,6 +1289,7 @@ impl BTreeCursor { // and end of cell pointer area. 
#[allow(unused_assignments)] fn compute_free_space(&self, page: &PageContent, db_header: Ref) -> u16 { + // TODO(pere): maybe free space is not calculated correctly with offset let buf = page.as_ptr(); let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize; @@ -1247,7 +1303,8 @@ impl BTreeCursor { let ncell = page.cell_count(); // 8 + 4 == header end - let first_cell = (page.offset + 8 + 4 + (2 * ncell)) as u16; + let child_pointer_size = if page.is_leaf() { 0 } else { 4 }; + let first_cell = (page.offset + 8 + child_pointer_size + (2 * ncell)) as u16; let mut nfree = fragmented_free_bytes as usize + first_byte_in_cell_content as usize; @@ -1284,7 +1341,7 @@ impl BTreeCursor { // return SQLITE_CORRUPT_PAGE(pPage); // } // don't count header and cell pointers? - nfree -= first_cell as usize; + nfree = nfree - first_cell as usize; nfree as u16 } @@ -1313,12 +1370,18 @@ impl BTreeCursor { } let max_local = self.max_local(page_type.clone()); + log::debug!( + "fill_cell_payload(record_size={}, max_local={})", + record_buf.len(), + max_local + ); if record_buf.len() <= max_local { // enough allowed space to fit inside a btree page cell_payload.extend_from_slice(record_buf.as_slice()); cell_payload.resize(cell_payload.len() + 4, 0); return; } + log::debug!("fill_cell_payload(overflow)"); let min_local = self.min_local(page_type); let mut space_left = min_local + (record_buf.len() - min_local) % (self.usable_space() - 4); @@ -1686,17 +1749,23 @@ impl Cursor for BTreeCursor { flags, ), }; - let page = self.allocate_page(page_type); + let page = self.allocate_page(page_type, 0); let id = page.borrow().id; id as u32 } } -pub fn btree_init_page(page: &Rc>, page_type: PageType, db_header: &DatabaseHeader) { +pub fn btree_init_page( + page: &Rc>, + page_type: PageType, + db_header: &DatabaseHeader, + offset: usize, +) { // setup btree page let mut contents = page.borrow_mut(); - debug!("allocating page {}", contents.id); + 
debug!("btree_init_page(id={}, offset={})", contents.id, offset); let contents = contents.contents.as_mut().unwrap(); + contents.offset = offset; let id = page_type as u8; contents.write_u8(BTREE_HEADER_OFFSET_TYPE, id); contents.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index c51ea31ab..1bfbe8392 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -139,8 +139,8 @@ impl DumbLruPageCache { } pub fn insert(&mut self, key: usize, value: Rc>) { - debug!("cache_insert(key={})", key); self._delete(key, false); + debug!("cache_insert(key={})", key); let mut entry = Box::new(PageCacheEntry { key, next: None, @@ -660,6 +660,7 @@ pub fn allocate_page( bp.put(buf); }); let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn))); + page.set_loaded(); page.contents = Some(PageContent { offset, buffer, diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index da91b815b..be15ce011 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -320,11 +320,11 @@ impl Clone for PageContent { impl PageContent { pub fn page_type(&self) -> PageType { - self.read_u8(self.offset).try_into().unwrap() + self.read_u8(0).try_into().unwrap() } pub fn maybe_page_type(&self) -> Option { - match self.read_u8(self.offset).try_into() { + match self.read_u8(0).try_into() { Ok(v) => Some(v), Err(_) => None, // this could be an overflow page } @@ -342,7 +342,7 @@ impl PageContent { fn read_u8(&self, pos: usize) -> u8 { let buf = self.as_ptr(); - buf[pos] + buf[self.offset + pos] } pub fn read_u16(&self, pos: usize) -> u16 { @@ -361,16 +361,19 @@ impl PageContent { } pub fn write_u8(&self, pos: usize, value: u8) { + log::debug!("write_u8(pos={}, value={})", pos, value); let buf = self.as_ptr(); buf[self.offset + pos] = value; } pub fn write_u16(&self, pos: usize, value: u16) { + log::debug!("write_u16(pos={}, value={})", pos, value); let buf = self.as_ptr(); buf[self.offset + 
pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes()); } pub fn write_u32(&self, pos: usize, value: u32) { + log::debug!("write_u32(pos={}, value={})", pos, value); let buf = self.as_ptr(); buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes()); } @@ -408,6 +411,7 @@ impl PageContent { min_local: usize, usable_size: usize, ) -> Result { + log::debug!("cell_get(idx={})", idx); let buf = self.as_ptr(); let ncells = self.cell_count(); @@ -432,6 +436,7 @@ impl PageContent { ) } + /// When using this fu pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) { let cell_start = match self.page_type() { PageType::IndexInterior => 12, diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 4eedaf879..477a5c797 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -2128,6 +2128,7 @@ impl Program { cursor, rowid_reg, .. } => { let cursor = cursors.get_mut(cursor).unwrap(); + // TODO: make io handle rng let rowid = get_new_rowid(cursor, thread_rng())?; match rowid { CursorResult::Ok(rowid) => { diff --git a/simulator/Cargo.toml b/simulator/Cargo.toml index afa718e41..51351aee2 100644 --- a/simulator/Cargo.toml +++ b/simulator/Cargo.toml @@ -18,3 +18,7 @@ path = "main.rs" limbo_core = { path = "../core" } rand = "0.8.5" rand_chacha = "0.3.1" +log = "0.4.20" +tempfile = "3.0.7" +env_logger = "0.10.1" +anarchist-readable-name-generator-lib = "0.1.2" diff --git a/simulator/main.rs b/simulator/main.rs index ceea26ee3..bbb9b8bc1 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -1,57 +1,371 @@ -use limbo_core::{Database, File, OpenFlags, PlatformIO, Result, IO}; +use limbo_core::{ + Connection, Database, File, LimboError, OpenFlags, PlatformIO, Result, Row, RowResult, IO, +}; +use log; use rand::prelude::*; use rand_chacha::ChaCha8Rng; use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; +use tempfile::TempDir; + +use anarchist_readable_name_generator_lib::{readable_name, readable_name_custom}; + +struct SimulatorEnv { + 
opts: SimulatorOpts, + tables: Vec, + connections: Vec, + io: Arc, + db: Rc, + rng: ChaCha8Rng, +} + +#[derive(Clone)] +enum SimConnection { + Connected(Rc), + Disconnected, +} + +#[derive(Debug)] +struct SimulatorOpts { + ticks: usize, + max_connections: usize, + max_tables: usize, + seed: u64, + // these next options are the distribution of workload where read_percent + write_percent + + // delete_percent == 100% + read_percent: usize, + write_percent: usize, + delete_percent: usize, + page_size: usize, +} + +struct Table { + rows: Vec>, + name: String, + columns: Vec, +} + +#[derive(Clone)] +struct Column { + name: String, + column_type: ColumnType, + primary: bool, + unique: bool, +} + +#[derive(Clone)] +enum ColumnType { + Integer, + Float, + Text, + Blob, +} + +#[derive(Debug, PartialEq)] +enum Value { + Null, + Integer(i64), + Float(f64), + Text(String), + Blob(Vec), +} #[allow(clippy::arc_with_non_send_sync)] fn main() { + let _ = env_logger::try_init(); let seed = match std::env::var("SEED") { Ok(seed) => seed.parse::().unwrap(), Err(_) => rand::thread_rng().next_u64(), }; println!("Seed: {}", seed); let mut rng = ChaCha8Rng::seed_from_u64(seed); - let io = Arc::new(SimulatorIO::new(seed).unwrap()); - let test_path = "./testing/testing.db"; - let db = match Database::open_file(io.clone(), test_path) { + + let (read_percent, write_percent, delete_percent) = { + let mut remaining = 100; + let read_percent = rng.gen_range(0..=remaining); + remaining -= read_percent; + let write_percent = rng.gen_range(0..=remaining); + remaining -= write_percent; + let delete_percent = remaining; + (read_percent, write_percent, delete_percent) + }; + + let opts = SimulatorOpts { + ticks: rng.gen_range(0..4096), + max_connections: 1, // TODO: for now let's use one connection as we didn't implement + // correct transactions processing + max_tables: rng.gen_range(0..128), + seed, + read_percent, + write_percent, + delete_percent, + page_size: 4096, // TODO: randomize this too + };
+ let io = Arc::new(SimulatorIO::new(seed, opts.page_size).unwrap()); + + let mut path = TempDir::new().unwrap().into_path(); + path.push("simulator.db"); + println!("path to db '{:?}'", path); + let db = match Database::open_file(io.clone(), path.as_path().to_str().unwrap()) { Ok(db) => db, Err(e) => { - panic!("error opening database test file {}: {:?}", test_path, e); + panic!("error opening simulator test file {:?}: {:?}", path, e); } }; - for _ in 0..100 { - let conn = db.connect(); - let mut stmt = conn.prepare("SELECT * FROM users").unwrap(); - let mut rows = stmt.query().unwrap(); - 'rows_loop: loop { - io.inject_fault(rng.gen_bool(0.5)); - match rows.next_row() { - Ok(result) => { - match result { - limbo_core::RowResult::Row(_row) => { - // TODO: assert that data is correct - } - limbo_core::RowResult::IO => { - io.inject_fault(rng.gen_bool(0.01)); - if io.run_once().is_err() { - break 'rows_loop; - } - } - limbo_core::RowResult::Done => { + + let connections = vec![SimConnection::Disconnected; opts.max_connections]; + let mut env = SimulatorEnv { + opts, + tables: Vec::new(), + connections, + rng, + io, + db, + }; + + println!("Initial opts {:?}", env.opts); + + for _ in 0..env.opts.ticks { + let connection_index = env.rng.gen_range(0..env.opts.max_connections); + let mut connection = env.connections[connection_index].clone(); + + match &mut connection { + SimConnection::Connected(conn) => { + let disconnect = env.rng.gen_ratio(1, 100); + if disconnect { + log::info!("disconnecting {}", connection_index); + let _ = conn.close(); + env.connections[connection_index] = SimConnection::Disconnected; + } else { + match process_connection(&mut env, conn) { + Ok(_) => {} + Err(err) => { + log::error!("error {}", err); break; } } } - Err(_) => { - continue; - } + } + SimConnection::Disconnected => { + log::info!("disconnecting {}", connection_index); + env.connections[connection_index] = SimConnection::Connected(env.db.connect()); } } - stmt.reset(); } - 
io.print_fault_stats(); + + env.io.print_stats(); +} + +fn process_connection(env: &mut SimulatorEnv, conn: &mut Rc) -> Result<()> { + let management = env.rng.gen_ratio(1, 100); + if management { + // for now create table only + maybe_add_table(env, conn)?; + } else if env.tables.is_empty() { + maybe_add_table(env, conn)?; + } else { + let roll = env.rng.gen_range(0..100); + if roll < env.opts.read_percent { + // read + do_select(env, conn)?; + } else if roll < env.opts.read_percent + env.opts.write_percent { + // write + do_write(env, conn)?; + } else { + // delete + // TODO + } + } + Ok(()) +} + +fn do_select(env: &mut SimulatorEnv, conn: &mut Rc) -> Result<()> { + let table = env.rng.gen_range(0..env.tables.len()); + let table_name = { + let table = &env.tables[table]; + table.name.clone() + }; + let rows = get_all_rows(env, conn, format!("SELECT * FROM {}", table_name).as_str())?; + + let table = &env.tables[table]; + compare_equal_rows(&table.rows, &rows); + Ok(()) +} + +fn do_write(env: &mut SimulatorEnv, conn: &mut Rc) -> Result<()> { + let mut query = String::new(); + let table = env.rng.gen_range(0..env.tables.len()); + { + let table = &env.tables[table]; + query.push_str(format!("INSERT INTO {} VALUES (", table.name).as_str()); + } + + let columns = env.tables[table].columns.clone(); + let mut row = Vec::new(); + + // gen insert query + for column in &columns { + let value = match column.column_type { + ColumnType::Integer => Value::Integer(env.rng.gen_range(std::i64::MIN..std::i64::MAX)), + ColumnType::Float => Value::Float(env.rng.gen_range(-1e10..1e10)), + ColumnType::Text => Value::Text(gen_random_text(env)), + ColumnType::Blob => Value::Blob(gen_random_text(env).as_bytes().to_vec()), + }; + + query.push_str(value.to_string().as_str()); + query.push(','); + row.push(value); + } + + let table = &mut env.tables[table]; + table.rows.push(row); + + query.pop(); + query.push_str(");"); + + let _ = get_all_rows(env, conn, query.as_str())?; + + Ok(()) +} + 
+fn compare_equal_rows(a: &Vec>, b: &Vec>) { + assert_eq!(a.len(), b.len(), "lengths are different"); + for (r1, r2) in a.iter().zip(b) { + for (v1, v2) in r1.iter().zip(r2) { + assert_eq!(v1, v2, "values are different"); + } + } +} + +fn maybe_add_table(env: &mut SimulatorEnv, conn: &mut Rc) -> Result<()> { + if env.tables.len() < env.opts.max_tables { + let table = Table { + rows: Vec::new(), + name: gen_random_name(env), + columns: gen_columns(env), + }; + let rows = get_all_rows(env, conn, table.to_create_str().as_str())?; + log::debug!("{:?}", rows); + let rows = get_all_rows( + env, + conn, + format!( + "SELECT sql FROM sqlite_schema WHERE type IN ('table', 'index') AND name = '{}';", + table.name + ) + .as_str(), + )?; + log::debug!("{:?}", rows); + assert!(rows.len() == 1); + let as_text = match &rows[0][0] { + Value::Text(t) => t, + _ => unreachable!(), + }; + assert!( + *as_text != table.to_create_str(), + "table was not inserted correctly" + ); + env.tables.push(table); + } + Ok(()) +} + +fn gen_random_name(env: &mut SimulatorEnv) -> String { + let name = readable_name_custom("_", &mut env.rng); + name.replace("-", "_") +} + +fn gen_random_text(env: &mut SimulatorEnv) -> String { + let big_text = env.rng.gen_ratio(1, 1000); + if big_text { + let max_size: u64 = 2 * 1024 * 1024 * 1024; + let size = env.rng.gen_range(1024..max_size); + let mut name = String::new(); + for i in 0..size { + name.push(((i % 26) as u8 + 'A' as u8) as char); + } + name + } else { + let name = readable_name_custom("_", &mut env.rng); + name.replace("-", "_") + } +} + +fn gen_columns(env: &mut SimulatorEnv) -> Vec { + let mut column_range = env.rng.gen_range(1..128); + let mut columns = Vec::new(); + while column_range > 0 { + let column_type = match env.rng.gen_range(0..4) { + 0 => ColumnType::Integer, + 1 => ColumnType::Float, + 2 => ColumnType::Text, + 3 => ColumnType::Blob, + _ => unreachable!(), + }; + let column = Column { + name: gen_random_name(env), + column_type, + 
primary: false, + unique: false, + }; + columns.push(column); + column_range -= 1; + } + columns +} + +fn get_all_rows( + env: &mut SimulatorEnv, + conn: &mut Rc, + query: &str, +) -> Result>> { + log::info!("running query '{}'", &query[0..query.len().min(4096)]); + let mut out = Vec::new(); + let rows = conn.query(query); + if rows.is_err() { + let err = rows.err(); + log::error!( + "Error running query '{}': {:?}", + &query[0..query.len().min(4096)], + err + ); + return Err(err.unwrap()); + } + let rows = rows.unwrap(); + assert!(rows.is_some()); + let mut rows = rows.unwrap(); + 'rows_loop: loop { + env.io.inject_fault(env.rng.gen_ratio(1, 10000)); + match rows.next_row()? { + RowResult::Row(row) => { + let mut r = Vec::new(); + for el in &row.values { + let v = match el { + limbo_core::Value::Null => Value::Null, + limbo_core::Value::Integer(i) => Value::Integer(*i), + limbo_core::Value::Float(f) => Value::Float(*f), + limbo_core::Value::Text(t) => Value::Text(t.clone().to_owned()), + limbo_core::Value::Blob(b) => Value::Blob(b.clone().to_owned()), + }; + r.push(v); + } + + out.push(r); + } + RowResult::IO => { + env.io.inject_fault(env.rng.gen_ratio(1, 10000)); + if env.io.run_once().is_err() { + log::info!("query inject fault"); + break 'rows_loop; + } + } + RowResult::Done => { + break; + } + } + } + Ok(out) } struct SimulatorIO { @@ -60,10 +374,11 @@ struct SimulatorIO { files: RefCell>>, rng: RefCell, nr_run_once_faults: RefCell, + page_size: usize, } impl SimulatorIO { - fn new(seed: u64) -> Result { + fn new(seed: u64, page_size: usize) -> Result { let inner = Box::new(PlatformIO::new()?); let fault = RefCell::new(false); let files = RefCell::new(Vec::new()); @@ -75,6 +390,7 @@ impl SimulatorIO { files, rng, nr_run_once_faults, + page_size, }) } @@ -85,10 +401,10 @@ impl SimulatorIO { } } - fn print_fault_stats(&self) { + fn print_stats(&self) { println!("run_once faults: {}", self.nr_run_once_faults.borrow()); for file in self.files.borrow().iter() { - 
file.print_fault_stats(); + file.print_stats(); } } } @@ -106,6 +422,10 @@ impl IO for SimulatorIO { fault: RefCell::new(false), nr_pread_faults: RefCell::new(0), nr_pwrite_faults: RefCell::new(0), + reads: RefCell::new(0), + writes: RefCell::new(0), + syncs: RefCell::new(0), + page_size: self.page_size, }); self.files.borrow_mut().push(file.clone()); Ok(file) @@ -136,6 +456,10 @@ struct SimulatorFile { fault: RefCell, nr_pread_faults: RefCell, nr_pwrite_faults: RefCell, + writes: RefCell, + reads: RefCell, + syncs: RefCell, + page_size: usize, } impl SimulatorFile { @@ -143,11 +467,14 @@ impl SimulatorFile { self.fault.replace(fault); } - fn print_fault_stats(&self) { + fn print_stats(&self) { println!( - "pread faults: {}, pwrite faults: {}", + "pread faults: {}, pwrite faults: {}, reads: {}, writes: {}, syncs: {}", *self.nr_pread_faults.borrow(), - *self.nr_pwrite_faults.borrow() + *self.nr_pwrite_faults.borrow(), + *self.reads.borrow(), + *self.writes.borrow(), + *self.syncs.borrow(), ); } } @@ -178,6 +505,7 @@ impl limbo_core::File for SimulatorFile { "Injected fault".into(), )); } + *self.reads.borrow_mut() += 1; self.inner.pread(pos, c) } @@ -193,10 +521,12 @@ impl limbo_core::File for SimulatorFile { "Injected fault".into(), )); } + *self.writes.borrow_mut() += 1; self.inner.pwrite(pos, buffer, c) } fn sync(&self, c: Rc) -> Result<()> { + *self.syncs.borrow_mut() += 1; self.inner.sync(c) } @@ -210,3 +540,49 @@ impl Drop for SimulatorFile { self.inner.unlock_file().expect("Failed to unlock file"); } } + +impl ColumnType { + pub fn as_str(&self) -> &str { + match self { + ColumnType::Integer => "INTEGER", + ColumnType::Float => "FLOAT", + ColumnType::Text => "TEXT", + ColumnType::Blob => "BLOB", + } + } +} + +impl Table { + pub fn to_create_str(&self) -> String { + let mut out = String::new(); + + out.push_str(format!("CREATE TABLE {} (", self.name).as_str()); + + assert!(!self.columns.is_empty()); + for column in &self.columns { + out.push_str(format!("{} 
{},", column.name, column.column_type.as_str()).as_str()); + } + // remove last comma + out.pop(); + + out.push_str(");"); + out + } +} + +impl Value { + pub fn to_string(&self) -> String { + match self { + Value::Null => "NULL".to_string(), + Value::Integer(i) => i.to_string(), + Value::Float(f) => f.to_string(), + Value::Text(t) => format!("'{}'", t.clone()), + Value::Blob(vec) => to_sqlite_blob(&vec), + } + } +} + +fn to_sqlite_blob(bytes: &Vec) -> String { + let hex: String = bytes.iter().map(|b| format!("{:02X}", b)).collect(); + format!("X'{}'", hex) +} diff --git a/test/src/lib.rs b/test/src/lib.rs index 5e0367206..63ee44f1f 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -367,6 +367,7 @@ mod tests { let conn = tmp_db.connect_limbo(); insert(1, &conn, &tmp_db).unwrap(); assert_eq!(count(&conn, &tmp_db).unwrap(), 1); + conn.close()?; } { let conn = tmp_db.connect_limbo(); @@ -375,6 +376,7 @@ mod tests { 1, "failed to read from wal from another connection" ); + conn.close()?; } Ok(()) }