From e3fdb6bab92595d5c9f83a99a4b4cf71b2b27229 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 21 Jul 2025 11:00:34 +0200 Subject: [PATCH 1/4] core/lib: init_pager lock shared wal until filled maybe_shared_wal's lock is held for a limited time increasing the chance of initializing the shared wal twice. --- core/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 2c38bfe41..6945e6a63 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -324,7 +324,8 @@ impl Database { fn init_pager(&self, page_size: Option) -> Result { // Open existing WAL file if present - if let Some(shared_wal) = self.maybe_shared_wal.read().clone() { + let mut maybe_shared_wal = self.maybe_shared_wal.write(); + if let Some(shared_wal) = maybe_shared_wal.clone() { let size = match page_size { None => unsafe { (*shared_wal.get()).page_size() as usize }, Some(size) => size, @@ -379,7 +380,7 @@ impl Database { let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?; // Modify Database::maybe_shared_wal to point to the new WAL file so that other connections // can open the existing WAL. - *self.maybe_shared_wal.write() = Some(real_shared_wal.clone()); + *maybe_shared_wal = Some(real_shared_wal.clone()); let wal = Rc::new(RefCell::new(WalFile::new( self.io.clone(), real_shared_wal, From c55cb74dc8f4c2159f9d93e55c0dd3a3b05b0450 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 21 Jul 2025 11:06:31 +0200 Subject: [PATCH 2/4] simple write multi threaded test --- .../query_processing/test_multi_thread.rs | 38 +++++++++++++++++++ .../query_processing/test_write_path.rs | 6 +-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/tests/integration/query_processing/test_multi_thread.rs b/tests/integration/query_processing/test_multi_thread.rs index 1d96e0422..13d4a8608 100644 --- a/tests/integration/query_processing/test_multi_thread.rs +++ b/tests/integration/query_processing/test_multi_thread.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::common::TempDatabase; #[test] @@ -25,3 +27,39 @@ fn test_schema_change() { }; println!("{:?} {:?}", row.get_value(0), row.get_value(1)); } + +#[test] +fn test_create_multiple_connections() -> anyhow::Result<()> { + let tries = 10; + for _ in 0..tries { + let tmp_db = Arc::new(TempDatabase::new_empty(false)); + { + let conn = tmp_db.connect_limbo(); + conn.execute("CREATE TABLE t(x)").unwrap(); + } + + let mut threads = Vec::new(); + for i in 0..10 { + let tmp_db_ = tmp_db.clone(); + threads.push(std::thread::spawn(move || { + let conn = tmp_db_.connect_limbo(); + conn.execute(format!("INSERT INTO t VALUES ({i})").as_str()) + .unwrap(); + })); + } + for thread in threads { + thread.join().unwrap(); + } + + let conn = tmp_db.connect_limbo(); + let mut stmt = conn.prepare("SELECT * FROM t").unwrap(); + let mut rows = Vec::new(); + while matches!(stmt.step().unwrap(), turso_core::StepResult::Row) { + let row = stmt.row().unwrap(); + rows.push(row.get::(0).unwrap()); + } + rows.sort(); + assert_eq!(rows, (0..10).collect::>()); + } + Ok(()) +} diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index f7605323e..b5c4e0b51 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -765,11 +765,11 @@ fn test_read_wal_dumb_no_frames() -> anyhow::Result<()> { Ok(()) } -fn run_query(tmp_db: &TempDatabase, conn: &Arc, query: &str) -> anyhow::Result<()> { +pub fn run_query(tmp_db: &TempDatabase, conn: &Arc, query: &str) -> anyhow::Result<()> { run_query_core(tmp_db, conn, query, None::) } -fn run_query_on_row( +pub fn run_query_on_row( tmp_db: &TempDatabase, conn: &Arc, query: &str, @@ -778,7 +778,7 @@ fn run_query_on_row( run_query_core(tmp_db, conn, query, Some(on_row)) } -fn run_query_core( +pub fn run_query_core( _tmp_db: &TempDatabase, conn: &Arc, query: &str, From 4c596953517b07745f7ea454ec96ff52b2fa5f9e Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 21 Jul 2025 12:43:31 +0200 Subject: [PATCH 3/4] test_multi_thread: ignore tests for now --- .../query_processing/test_multi_thread.rs | 139 +++++++++++++++++- 1 file changed, 131 insertions(+), 8 deletions(-) diff --git a/tests/integration/query_processing/test_multi_thread.rs b/tests/integration/query_processing/test_multi_thread.rs index 13d4a8608..a05aa795e 100644 --- a/tests/integration/query_processing/test_multi_thread.rs +++ b/tests/integration/query_processing/test_multi_thread.rs @@ -1,6 +1,8 @@ -use std::sync::Arc; +use std::sync::{atomic::AtomicUsize, Arc}; -use crate::common::TempDatabase; +use turso_core::StepResult; + +use crate::common::{maybe_setup_tracing, TempDatabase}; #[test] fn test_schema_change() { @@ -29,8 +31,10 @@ fn test_schema_change() { } #[test] +#[ignore] fn test_create_multiple_connections() -> anyhow::Result<()> { - let tries = 10; + maybe_setup_tracing(); + let tries = 1; for _ in 0..tries { let tmp_db = Arc::new(TempDatabase::new_empty(false)); { @@ -43,8 +47,34 @@ fn test_create_multiple_connections() -> anyhow::Result<()> { let tmp_db_ = tmp_db.clone(); threads.push(std::thread::spawn(move || { let conn = tmp_db_.connect_limbo(); - conn.execute(format!("INSERT INTO t VALUES ({i})").as_str()) - .unwrap(); + 'outer: loop { + let mut stmt = conn + .prepare(format!("INSERT INTO t VALUES ({i})").as_str()) + .unwrap(); + tracing::info!("inserting row {}", i); + loop { + match stmt.step().unwrap() { + StepResult::Row => { + panic!("unexpected row result"); + } + StepResult::IO => { + stmt.run_once().unwrap(); + } + StepResult::Done => { + tracing::info!("inserted row {}", i); + break 'outer; + } + StepResult::Interrupt => { + panic!("unexpected step result"); + } + StepResult::Busy => { + // repeat until we can insert it + tracing::info!("busy {}, repeating", i); + break; + } + } + } + } })); } for thread in threads { @@ -54,12 +84,105 @@ fn test_create_multiple_connections() -> anyhow::Result<()> { let conn = tmp_db.connect_limbo(); let mut stmt = conn.prepare("SELECT * FROM t").unwrap(); let mut rows = Vec::new(); - while matches!(stmt.step().unwrap(), turso_core::StepResult::Row) { - let row = stmt.row().unwrap(); - rows.push(row.get::(0).unwrap()); + loop { + match stmt.step().unwrap() { + StepResult::Row => { + let row = stmt.row().unwrap(); + rows.push(row.get::(0).unwrap()); + } + StepResult::IO => { + stmt.run_once().unwrap(); + } + StepResult::Done => { + break; + } + StepResult::Interrupt => { + panic!("unexpected step result"); + } + StepResult::Busy => { + panic!("unexpected busy result on select"); + } + } } rows.sort(); assert_eq!(rows, (0..10).collect::>()); } Ok(()) } + +#[test] +#[ignore] +fn test_reader_writer() -> anyhow::Result<()> { + let tries = 10; + for _ in 0..tries { + let tmp_db = Arc::new(TempDatabase::new_empty(false)); + { + let conn = tmp_db.connect_limbo(); + conn.execute("CREATE TABLE t(x)").unwrap(); + } + + let mut threads = Vec::new(); + let number_of_writers = 100; + let current_written_rows = Arc::new(AtomicUsize::new(0)); + { + let tmp_db = tmp_db.clone(); + let mut current_written_rows = current_written_rows.clone(); + threads.push(std::thread::spawn(move || { + let conn = tmp_db.connect_limbo(); + for i in 0..number_of_writers { + conn.execute(format!("INSERT INTO t VALUES ({i})").as_str()) + .unwrap(); + current_written_rows.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + })); + } + { + let current_written_rows = current_written_rows.clone(); + threads.push(std::thread::spawn(move || { + let conn = tmp_db.connect_limbo(); + loop { + let current_written_rows = + current_written_rows.load(std::sync::atomic::Ordering::Relaxed); + if current_written_rows == number_of_writers { + break; + } + let mut stmt = conn.prepare("SELECT * FROM t").unwrap(); + let mut rows = Vec::new(); + loop { + match stmt.step().unwrap() { + StepResult::Row => { + let row = stmt.row().unwrap(); + let x = row.get::(0).unwrap(); + rows.push(x); + } + StepResult::IO => { + stmt.run_once().unwrap(); + } + StepResult::Done => { + rows.sort(); + for i in 0..current_written_rows { + let i = i as i64; + assert!( + rows.contains(&i), + "row {} not found in {:?}. current_written_rows: {}", + i, + rows, + current_written_rows + ); + } + break; + } + StepResult::Interrupt | StepResult::Busy => { + panic!("unexpected step result"); + } + } + } + } + })); + } + for thread in threads { + thread.join().unwrap(); + } + } + Ok(()) +} From 6dd8f6561d98e7f446a125a527c74bc7c7cdb728 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 21 Jul 2025 12:53:56 +0200 Subject: [PATCH 4/4] clippy --- tests/integration/query_processing/test_multi_thread.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/integration/query_processing/test_multi_thread.rs b/tests/integration/query_processing/test_multi_thread.rs index a05aa795e..4682659c2 100644 --- a/tests/integration/query_processing/test_multi_thread.rs +++ b/tests/integration/query_processing/test_multi_thread.rs @@ -126,7 +126,7 @@ fn test_reader_writer() -> anyhow::Result<()> { let current_written_rows = Arc::new(AtomicUsize::new(0)); { let tmp_db = tmp_db.clone(); - let mut current_written_rows = current_written_rows.clone(); + let current_written_rows = current_written_rows.clone(); threads.push(std::thread::spawn(move || { let conn = tmp_db.connect_limbo(); for i in 0..number_of_writers { @@ -164,10 +164,7 @@ fn test_reader_writer() -> anyhow::Result<()> { let i = i as i64; assert!( rows.contains(&i), - "row {} not found in {:?}. current_written_rows: {}", - i, - rows, - current_written_rows + "row {i} not found in {rows:?}. current_written_rows: {current_written_rows}", ); } break;