diff --git a/Cargo.lock b/Cargo.lock index c6cd32ac3..227be5df6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,15 @@ version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" +[[package]] +name = "anarchist-readable-name-generator-lib" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18a1e15a87b13ae79e04e07b3714fc41d5f6993dff11662fdbe0b207c6ad0fe0" +dependencies = [ + "rand", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -1147,9 +1156,13 @@ dependencies = [ name = "limbo_sim" version = "0.0.4" dependencies = [ + "anarchist-readable-name-generator-lib", + "env_logger 0.10.2", "limbo_core", + "log", "rand", "rand_chacha", + "tempfile", ] [[package]] diff --git a/cli/main.rs b/cli/main.rs index fcb339530..947b171b4 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -93,6 +93,7 @@ fn main() -> anyhow::Result<()> { // At prompt, increment interrupt count if interrupt_count.fetch_add(1, Ordering::SeqCst) >= 1 { eprintln!("Interrupted. Exiting..."); + conn.close(); break; } println!("Use .quit to exit or press Ctrl-C again to force quit."); diff --git a/core/lib.rs b/core/lib.rs index 21650f46d..fff59f418 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -294,18 +294,17 @@ impl Connection { self.pager.clear_page_cache(); Ok(()) } -} -impl Drop for Connection { - fn drop(&mut self) { + /// Close a connection and checkpoint. + pub fn close(&self) -> Result<()> { loop { // TODO: make this async? - match self.pager.checkpoint().unwrap() { + match self.pager.checkpoint()? { CheckpointStatus::Done => { - return; + return Ok(()); } CheckpointStatus::IO => { - self.pager.io.run_once().unwrap(); + self.pager.io.run_once()?; } }; } diff --git a/simulator/Cargo.toml b/simulator/Cargo.toml index afa718e41..51351aee2 100644 --- a/simulator/Cargo.toml +++ b/simulator/Cargo.toml @@ -18,3 +18,7 @@ path = "main.rs" limbo_core = { path = "../core" } rand = "0.8.5" rand_chacha = "0.3.1" +log = "0.4.20" +tempfile = "3.0.7" +env_logger = "0.10.1" +anarchist-readable-name-generator-lib = "0.1.2" diff --git a/simulator/main.rs b/simulator/main.rs index ceea26ee3..09ed27f9e 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -1,57 +1,340 @@ -use limbo_core::{Database, File, OpenFlags, PlatformIO, Result, IO}; +use limbo_core::{Connection, Database, File, OpenFlags, PlatformIO, Result, RowResult, IO}; +use log; use rand::prelude::*; use rand_chacha::ChaCha8Rng; use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; +use tempfile::TempDir; + +use anarchist_readable_name_generator_lib::{readable_name, readable_name_custom}; + +struct SimulatorEnv { + opts: SimulatorOpts, + tables: Vec, + connections: Vec, + io: Arc, + db: Rc, + rng: ChaCha8Rng, +} + +#[derive(Clone)] +enum SimConnection { + Connected(Rc), + Disconnected, +} + +#[derive(Debug)] +struct SimulatorOpts { + ticks: usize, + max_connections: usize, + max_tables: usize, + seed: u64, + // this next options are the distribution of workload where read_percent + write_percent + + // delete_percent == 100% + read_percent: usize, + write_percent: usize, + delete_percent: usize, +} + +struct Table { + rows: Vec>, + name: String, + columns: Vec, +} + +#[derive(Clone)] +struct Column { + name: String, + column_type: ColumnType, + primary: bool, + unique: bool, +} + +#[derive(Clone)] +enum ColumnType { + Integer, + Float, + Text, + Blob, +} + +#[derive(Debug, PartialEq)] +enum Value { + Null, + Integer(i64), + Float(f64), + Text(String), + Blob(Vec), +} #[allow(clippy::arc_with_non_send_sync)] fn main() { + let _ = env_logger::try_init(); let seed = match std::env::var("SEED") { Ok(seed) => seed.parse::().unwrap(), Err(_) => rand::thread_rng().next_u64(), }; println!("Seed: {}", seed); let mut rng = ChaCha8Rng::seed_from_u64(seed); + + let (read_percent, write_percent, delete_percent) = { + let mut remaining = 100; + let read_percent = rng.gen_range(0..=remaining); + remaining -= read_percent; + let write_percent = rng.gen_range(0..=remaining); + remaining -= write_percent; + let delete_percent = remaining; + (read_percent, write_percent, delete_percent) + }; + + let opts = SimulatorOpts { + ticks: rng.gen_range(0..4096), + max_connections: 1, // TODO: for now let's use one connection as we didn't implement + // correct transactions procesing + max_tables: rng.gen_range(0..128), + seed, + read_percent, + write_percent, + delete_percent, + }; let io = Arc::new(SimulatorIO::new(seed).unwrap()); - let test_path = "./testing/testing.db"; - let db = match Database::open_file(io.clone(), test_path) { + + let mut path = TempDir::new().unwrap().into_path(); + path.push("simulator.db"); + println!("path to db '{:?}'", path); + let db = match Database::open_file(io.clone(), path.as_path().to_str().unwrap()) { Ok(db) => db, Err(e) => { - panic!("error opening database test file {}: {:?}", test_path, e); + panic!("error opening simulator test file {:?}: {:?}", path, e); } }; - for _ in 0..100 { - let conn = db.connect(); - let mut stmt = conn.prepare("SELECT * FROM users").unwrap(); - let mut rows = stmt.query().unwrap(); - 'rows_loop: loop { - io.inject_fault(rng.gen_bool(0.5)); - match rows.next_row() { - Ok(result) => { - match result { - limbo_core::RowResult::Row(_row) => { - // TODO: assert that data is correct - } - limbo_core::RowResult::IO => { - io.inject_fault(rng.gen_bool(0.01)); - if io.run_once().is_err() { - break 'rows_loop; - } - } - limbo_core::RowResult::Done => { - break; - } - } - } - Err(_) => { - continue; + + let connections = vec![SimConnection::Disconnected; opts.max_connections]; + let mut env = SimulatorEnv { + opts, + tables: Vec::new(), + connections, + rng, + io, + db, + }; + + println!("Initial opts {:?}", env.opts); + + for _ in 0..env.opts.ticks { + let connection_index = env.rng.gen_range(0..env.opts.max_connections); + let mut connection = env.connections[connection_index].clone(); + + match &mut connection { + SimConnection::Connected(conn) => { + let disconnect = env.rng.gen_ratio(1, 100); + if disconnect { + log::info!("disconnecting {}", connection_index); + let _ = conn.close(); + env.connections[connection_index] = SimConnection::Disconnected; + } else { + let _ = process_connection(&mut env, conn); } } + SimConnection::Disconnected => { + log::info!("disconnecting {}", connection_index); + env.connections[connection_index] = SimConnection::Connected(env.db.connect()); + } } - stmt.reset(); } - io.print_fault_stats(); + + env.io.print_fault_stats(); +} + +fn process_connection(env: &mut SimulatorEnv, conn: &mut Rc) -> Result<()> { + let management = env.rng.gen_ratio(1, 100); + if management { + // for now create table only + maybe_add_table(env, conn)?; + } else if env.tables.is_empty() { + maybe_add_table(env, conn)?; + } else { + let roll = env.rng.gen_range(0..100); + if roll < env.opts.read_percent { + // read + do_select(env, conn)?; + } else if roll < env.opts.read_percent + env.opts.write_percent { + // write + do_write(env, conn)?; + } else { + // delete + // TODO + } + } + Ok(()) +} + +fn do_select(env: &mut SimulatorEnv, conn: &mut Rc) -> Result<()> { + let table = env.rng.gen_range(0..env.tables.len()); + let table_name = { + let table = &env.tables[table]; + table.name.clone() + }; + let rows = get_all_rows(env, conn, format!("SELECT * FROM {}", table_name).as_str())?; + + let table = &env.tables[table]; + compare_equal_rows(&table.rows, &rows); + Ok(()) +} + +fn do_write(env: &mut SimulatorEnv, conn: &mut Rc) -> Result<()> { + let mut query = String::new(); + let table = env.rng.gen_range(0..env.tables.len()); + { + let table = &env.tables[table]; + query.push_str(format!("INSERT INTO {} VALUES (", table.name).as_str()); + } + + let columns = env.tables[table].columns.clone(); + + // gen insert query + for column in &columns { + let value_str = match column.column_type { + ColumnType::Integer => env.rng.gen_range(std::i64::MIN..std::i64::MAX).to_string(), + ColumnType::Float => env.rng.gen_range(std::f64::MIN..std::f64::MAX).to_string(), + ColumnType::Text => format!("'{}'", gen_random_text(env)), + ColumnType::Blob => format!("X'{}'", gen_random_text(env)), + }; + query.push_str(value_str.as_str()); + query.push(','); + } + + query.pop(); + query.push_str(");"); + + let _ = get_all_rows(env, conn, query.as_str()); + + Ok(()) +} + +fn compare_equal_rows(a: &Vec>, b: &Vec>) { + assert_eq!(a.len(), b.len()); + for (r1, r2) in a.iter().zip(b) { + for (v1, v2) in r1.iter().zip(r2) { + assert_eq!(v1, v2); + } + } +} + +fn maybe_add_table(env: &mut SimulatorEnv, conn: &mut Rc) -> Result<()> { + if env.tables.len() < env.opts.max_tables { + let table = Table { + rows: Vec::new(), + name: gen_random_name(env), + columns: gen_columns(env), + }; + let rows = get_all_rows(env, conn, table.to_create_str().as_str())?; + log::debug!("{:?}", rows); + let rows = get_all_rows(env, conn, ".schema;")?; + let mut found = false; + for row in &rows { + let as_text = match &row[0] { + Value::Text(t) => t, + _ => unreachable!(), + }; + if *as_text == table.to_create_str() { + found = true; + break; + } + } + assert!(found, "table was not inserted correctly"); + env.tables.push(table); + } + Ok(()) +} + +fn gen_random_name(env: &mut SimulatorEnv) -> String { + let name = readable_name_custom("_", &mut env.rng); + name.replace("-", "_") +} + +fn gen_random_text(env: &mut SimulatorEnv) -> String { + let big_text = env.rng.gen_ratio(1, 100); + if big_text { + let max_size: u64 = 2 * 1024 * 1024 * 1024; + let size = env.rng.gen_range(1024..max_size); + let mut name = String::new(); + for i in 0..size { + name.push(((i % 26) as u8 + 'A' as u8) as char); + } + name + } else { + let name = readable_name_custom("_", &mut env.rng); + name.replace("-", "_") + } +} + +fn gen_columns(env: &mut SimulatorEnv) -> Vec { + let mut column_range = env.rng.gen_range(1..128); + let mut columns = Vec::new(); + while column_range > 0 { + let column_type = match env.rng.gen_range(0..4) { + 0 => ColumnType::Integer, + 1 => ColumnType::Float, + 2 => ColumnType::Text, + 3 => ColumnType::Blob, + _ => unreachable!(), + }; + let column = Column { + name: gen_random_name(env), + column_type, + primary: false, + unique: false, + }; + columns.push(column); + column_range -= 1; + } + columns +} + +fn get_all_rows( + env: &mut SimulatorEnv, + conn: &mut Rc, + query: &str, +) -> Result>> { + log::info!("running query '{}'", query); + let mut out = Vec::new(); + let rows = conn.query(query)?; + if rows.is_none() { + return Ok(out); + } + let mut rows = rows.unwrap(); + 'rows_loop: loop { + env.io.inject_fault(env.rng.gen_bool(0.5)); + match rows.next_row()? { + RowResult::Row(row) => { + let mut r = Vec::new(); + for el in &row.values { + let v = match el { + limbo_core::Value::Null => Value::Null, + limbo_core::Value::Integer(i) => Value::Integer(*i), + limbo_core::Value::Float(f) => Value::Float(*f), + limbo_core::Value::Text(t) => Value::Text(t.clone().to_owned()), + limbo_core::Value::Blob(b) => Value::Blob(b.clone().to_owned()), + }; + r.push(v); + } + + out.push(r); + } + RowResult::IO => { + env.io.inject_fault(env.rng.gen_bool(0.01)); + if env.io.run_once().is_err() { + break 'rows_loop; + } + } + RowResult::Done => { + break; + } + } + } + Ok(out) } struct SimulatorIO { @@ -210,3 +493,31 @@ impl Drop for SimulatorFile { self.inner.unlock_file().expect("Failed to unlock file"); } } + +impl ColumnType { + pub fn as_str(&self) -> &str { + match self { + ColumnType::Integer => "INTEGER", + ColumnType::Float => "FLOAT", + ColumnType::Text => "TEXT", + ColumnType::Blob => "BLOB", + } + } +} +impl Table { + pub fn to_create_str(&self) -> String { + let mut out = String::new(); + + out.push_str(format!("CREATE TABLE {} (", self.name).as_str()); + + assert!(!self.columns.is_empty()); + for column in &self.columns { + out.push_str(format!("{} {},", column.name, column.column_type.as_str()).as_str()); + } + // remove last comma + out.pop(); + + out.push_str(");"); + out + } +}