diff --git a/core/io/memory.rs b/core/io/memory.rs index f3cadd4f1..29ac4f2be 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -57,6 +57,7 @@ impl IO for MemoryIO { files.insert( path.to_string(), Arc::new(MemoryFile { + path: path.to_string(), pages: BTreeMap::new().into(), size: 0.into(), }), @@ -89,6 +90,7 @@ impl IO for MemoryIO { } pub struct MemoryFile { + path: String, pages: UnsafeCell>, size: Cell, } @@ -104,6 +106,7 @@ impl File for MemoryFile { } fn pread(&self, pos: usize, c: Completion) -> Result { + tracing::debug!("pread(path={}): pos={}", self.path, pos); let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { @@ -145,6 +148,12 @@ impl File for MemoryFile { } fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { + tracing::debug!( + "pwrite(path={}): pos={}, size={}", + self.path, + pos, + buffer.len() + ); let buf_len = buffer.len(); if buf_len == 0 { c.complete(0); @@ -180,12 +189,14 @@ impl File for MemoryFile { } fn sync(&self, c: Completion) -> Result { + tracing::debug!("sync(path={})", self.path); // no-op c.complete(0); Ok(c) } fn truncate(&self, len: usize, c: Completion) -> Result { + tracing::debug!("truncate(path={}): len={}", self.path, len); if len < self.size.get() { // Truncate pages unsafe { @@ -199,6 +210,12 @@ impl File for MemoryFile { } fn pwritev(&self, pos: usize, buffers: Vec>, c: Completion) -> Result { + tracing::debug!( + "pwritev(path={}): pos={}, buffers={:?}", + self.path, + pos, + buffers.iter().map(|x| x.len()).collect::>() + ); let mut offset = pos; let mut total_written = 0; @@ -236,6 +253,7 @@ impl File for MemoryFile { } fn size(&self) -> Result { + tracing::debug!("size(path={}): {}", self.path, self.size.get()); Ok(self.size.get() as u64) } } diff --git a/core/lib.rs b/core/lib.rs index fc0e357be..85bf799c1 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -411,7 +411,7 @@ impl Database { _shared_cache: false, cache_size: Cell::new(default_cache_size), page_size: Cell::new(page_size), - wal_checkpoint_disabled: Cell::new(false), + wal_auto_checkpoint_disabled: Cell::new(false), capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), closed: Cell::new(false), attached_databases: RefCell::new(DatabaseCatalog::new()), @@ -780,7 +780,9 @@ pub struct Connection { /// page size used for an uninitialized database or the next vacuum command. /// it's not always equal to the current page size of the database page_size: Cell, - wal_checkpoint_disabled: Cell, + /// Disable automatic checkpoint behaviour when DB is shutted down or WAL reach certain size + /// Client still can manually execute PRAGMA wal_checkpoint(...) commands + wal_auto_checkpoint_disabled: Cell, capture_data_changes: RefCell, closed: Cell, /// Attached databases @@ -1385,9 +1387,7 @@ impl Connection { if self.closed.get() { return Err(LimboError::InternalError("Connection closed".to_string())); } - self.pager - .borrow() - .wal_checkpoint(self.wal_checkpoint_disabled.get(), mode) + self.pager.borrow().wal_checkpoint(mode) } /// Close a connection and checkpoint. @@ -1407,7 +1407,7 @@ impl Connection { pager.end_tx( true, // rollback = true for close self, - self.wal_checkpoint_disabled.get(), + self.wal_auto_checkpoint_disabled.get(), ) })?; self.transaction_state.set(TransactionState::None); @@ -1416,11 +1416,11 @@ impl Connection { self.pager .borrow() - .checkpoint_shutdown(self.wal_checkpoint_disabled.get()) + .checkpoint_shutdown(self.wal_auto_checkpoint_disabled.get()) } - pub fn wal_disable_checkpoint(&self) { - self.wal_checkpoint_disabled.set(true); + pub fn wal_auto_checkpoint_disable(&self) { + self.wal_auto_checkpoint_disabled.set(true); } pub fn last_insert_rowid(&self) -> i64 { @@ -1895,11 +1895,10 @@ impl Connection { pub fn copy_db(&self, file: &str) -> Result<()> { // use a new PlatformIO instance here to allow for copying in-memory databases let io: Arc = Arc::new(PlatformIO::new()?); - let disabled = false; // checkpoint so everything is in the DB file before copying self.pager .borrow_mut() - .wal_checkpoint(disabled, CheckpointMode::Truncate)?; + .wal_checkpoint(CheckpointMode::Truncate)?; self.pager.borrow_mut().db_file.copy_to(&*io, file) } diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index e78b9dcd0..24e55256e 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -502,7 +502,7 @@ impl StateTransition for CommitStateMachine { .end_tx( false, // rollback = false since we're committing &self.connection, - self.connection.wal_checkpoint_disabled.get(), + self.connection.wal_auto_checkpoint_disabled.get(), ) .map_err(|e| LimboError::InternalError(e.to_string())) .unwrap(); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 8398bee58..5a6212336 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -961,7 +961,7 @@ impl Pager { &self, rollback: bool, connection: &Connection, - wal_checkpoint_disabled: bool, + wal_auto_checkpoint_disabled: bool, ) -> Result> { tracing::trace!("end_tx(rollback={})", rollback); let Some(wal) = self.wal.as_ref() else { @@ -980,7 +980,7 @@ impl Pager { self.rollback(schema_did_change, connection, is_write)?; return Ok(IOResult::Done(PagerCommitResult::Rollback)); } - let commit_status = return_if_io!(self.commit_dirty_pages(wal_checkpoint_disabled)); + let commit_status = return_if_io!(self.commit_dirty_pages(wal_auto_checkpoint_disabled)); wal.borrow().end_write_tx(); wal.borrow().end_read_tx(); @@ -1163,7 +1163,7 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn commit_dirty_pages( &self, - wal_checkpoint_disabled: bool, + wal_auto_checkpoint_disabled: bool, ) -> Result> { let Some(wal) = self.wal.as_ref() else { return Err(LimboError::InternalError( @@ -1226,7 +1226,7 @@ impl Pager { return Ok(IOResult::IO(IOCompletions::Single(c))); } CommitState::AfterSyncWal => { - if wal_checkpoint_disabled || !wal.borrow().should_checkpoint() { + if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() { self.commit_info.borrow_mut().state = CommitState::Start; break PagerCommitResult::WalWritten; } @@ -1354,7 +1354,7 @@ impl Pager { .expect("Failed to clear page cache"); } - pub fn checkpoint_shutdown(&self, wal_checkpoint_disabled: bool) -> Result<()> { + pub fn checkpoint_shutdown(&self, wal_auto_checkpoint_disabled: bool) -> Result<()> { let mut _attempts = 0; { let Some(wal) = self.wal.as_ref() else { @@ -1369,24 +1369,19 @@ impl Pager { let c = wal.sync()?; self.io.wait_for_completion(c)?; } - self.wal_checkpoint(wal_checkpoint_disabled, CheckpointMode::Passive)?; + if !wal_auto_checkpoint_disabled { + self.wal_checkpoint(CheckpointMode::Passive)?; + } Ok(()) } #[instrument(skip_all, level = Level::DEBUG)] - pub fn wal_checkpoint( - &self, - wal_checkpoint_disabled: bool, - mode: CheckpointMode, - ) -> Result { + pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result { let Some(wal) = self.wal.as_ref() else { return Err(LimboError::InternalError( "wal_checkpoint() called on database without WAL".to_string(), )); }; - if wal_checkpoint_disabled { - return Ok(CheckpointResult::default()); - } let mut checkpoint_result = self.io.block(|| wal.borrow_mut().checkpoint(self, mode))?; diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 3061e3cbb..b02744101 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1168,6 +1168,12 @@ impl Wal for WalFile { let page_size = self.page_size(); let mut frame = vec![0u8; page_size as usize + WAL_FRAME_HEADER_SIZE]; let mut seen = HashSet::new(); + turso_assert!( + frame_count >= frame_watermark, + "frame_count must be not less than frame_watermark: {} vs {}", + frame_count, + frame_watermark + ); let mut pages = Vec::with_capacity((frame_count - frame_watermark) as usize); for frame_no in frame_watermark + 1..=frame_count { let c = self.read_frame_raw(frame_no, &mut frame)?; diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 9232887d3..38d24d0dd 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -581,7 +581,7 @@ impl Program { let cacheflush_status = pager.end_tx( rollback, connection, - connection.wal_checkpoint_disabled.get(), + connection.wal_auto_checkpoint_disabled.get(), )?; match cacheflush_status { IOResult::Done(_) => { diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index dafdfbc9b..4621b61d5 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -2,9 +2,11 @@ use std::sync::Arc; use crate::{ database_sync_operations::{ - checkpoint_wal_file, db_bootstrap, reset_wal_file, transfer_logical_changes, - transfer_physical_changes, wait_full_body, wal_pull, wal_push, WalPullResult, + checkpoint_wal_file, connect, connect_untracked, db_bootstrap, reset_wal_file, + transfer_logical_changes, transfer_physical_changes, wait_full_body, wal_pull, wal_push, + WalPullResult, }, + database_tape::DatabaseTape, errors::Error, io_operations::IoOperations, protocol_io::ProtocolIO, @@ -22,6 +24,7 @@ pub struct DatabaseSyncEngineOpts { pub struct DatabaseSyncEngine { io: Arc, protocol: Arc

, + draft_tape: DatabaseTape, draft_path: String, synced_path: String, meta_path: String, @@ -74,10 +77,13 @@ impl DatabaseSyncEngine { path: &str, opts: DatabaseSyncEngineOpts, ) -> Result { + let draft_path = format!("{path}-draft"); + let draft_tape = io.open_tape(&draft_path, true)?; let mut db = Self { io, protocol, - draft_path: format!("{path}-draft"), + draft_tape, + draft_path, synced_path: format!("{path}-synced"), meta_path: format!("{path}-info"), opts, @@ -90,8 +96,7 @@ impl DatabaseSyncEngine { /// Create database connection and appropriately configure it before use pub async fn connect(&self, coro: &Coro) -> Result> { - let db = self.io.open_tape(&self.draft_path, true)?; - db.connect(coro).await + connect(coro, &self.draft_tape).await } /// Sync all new changes from remote DB and apply them locally @@ -114,13 +119,11 @@ impl DatabaseSyncEngine { { // we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote // so, we pass 'capture: true' as we need to preserve all changes for future push of WAL - let draft = self.io.open_tape(&self.draft_path, true)?; - tracing::info!("opened draft"); let synced = self.io.open_tape(&self.synced_path, true)?; tracing::info!("opened synced"); // we will start wal write session for Draft DB in order to hold write lock during transfer of changes - let mut draft_session = WalSession::new(draft.connect(coro).await?); + let mut draft_session = WalSession::new(connect(coro, &self.draft_tape).await?); draft_session.begin()?; // mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db @@ -128,7 +131,7 @@ impl DatabaseSyncEngine { // transfer logical changes to the Synced DB in order to later execute physical "rebase" operation let client_id = &self.meta().client_unique_id; - transfer_logical_changes(coro, &draft, &synced, client_id, true).await?; + transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, true).await?; // now we are ready to do the rebase: let's transfer physical changes from Synced to Draft let synced_wal_watermark = self.meta().synced_wal_match_watermark; @@ -172,7 +175,7 @@ impl DatabaseSyncEngine { ); { let synced = self.io.open_tape(&self.synced_path, false)?; - checkpoint_wal_file(coro, &synced.connect_untracked()?).await?; + checkpoint_wal_file(coro, &connect_untracked(&synced)?).await?; update_meta( coro, self.protocol.as_ref(), @@ -209,14 +212,13 @@ impl DatabaseSyncEngine { // we will push Synced WAL to the remote // so, we pass 'capture: false' as we don't need to preserve changes made to Synced WAL in turso_cdc - let draft = self.io.open_tape(&self.draft_path, true)?; let synced = self.io.open_tape(&self.synced_path, false)?; // mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db self.synced_is_dirty = true; let client_id = &self.meta().client_unique_id; - transfer_logical_changes(coro, &draft, &synced, client_id, false).await?; + transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, false).await?; self.push_synced_to_remote(coro).await?; Ok(()) @@ -287,7 +289,7 @@ impl DatabaseSyncEngine { self.synced_path, ); let synced = self.io.open_tape(&self.synced_path, false)?; - let synced_conn = synced.connect(coro).await?; + let synced_conn = connect(coro, &synced).await?; let mut wal = WalSession::new(synced_conn); let generation = self.meta().synced_generation; @@ -333,7 +335,7 @@ impl DatabaseSyncEngine { self.meta().client_unique_id ); let synced = self.io.open_tape(&self.synced_path, false)?; - let synced_conn = synced.connect(coro).await?; + let synced_conn = connect(coro, &synced).await?; let mut wal = WalSession::new(synced_conn); wal.begin()?; @@ -450,6 +452,7 @@ pub mod tests { use std::{collections::BTreeMap, sync::Arc}; use rand::RngCore; + use tokio::join; use crate::{ database_sync_engine::DatabaseSyncEngineOpts, @@ -476,12 +479,12 @@ pub mod tests { .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db"); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); let opts = DatabaseSyncEngineOpts { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); protocol .server @@ -588,6 +591,60 @@ pub mod tests { }); } + #[test] + pub fn test_sync_single_db_update_sync_concurrent() { + deterministic_runtime(async || { + let io: Arc = Arc::new(turso_core::MemoryIO::new()); + let dir = tempfile::TempDir::new().unwrap(); + let server_path = dir.path().join("server.db"); + let ctx = Arc::new(TestContext::new(seed_u64())); + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) + .await + .unwrap(); + let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); + let opts = DatabaseSyncEngineOpts { + client_name: "id-1".to_string(), + wal_pull_batch_size: 1, + }; + + protocol + .server + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ()) + .await + .unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES ('hello', 'world')", ()) + .await + .unwrap(); + + runner.init(":memory:", opts).await.unwrap(); + let conn = runner.connect().await.unwrap(); + + let syncs = async move { + for i in 0..10 { + tracing::info!("sync attempt #{i}"); + runner.sync().await.unwrap(); + } + }; + + let updates = async move { + for i in 0..10 { + tracing::info!("update attempt #{i}"); + let sql = format!("INSERT INTO t VALUES ('key-{i}', 'value-{i}')"); + match conn.execute(&sql, ()).await { + Ok(_) => {} + Err(err) if err.to_string().contains("database is locked") => {} + Err(err) => panic!("update failed: {err}"), + } + ctx.random_sleep_n(50).await; + } + }; + + join!(updates, syncs); + }); + } + #[test] pub fn test_sync_single_db_many_pulls_big_payloads() { deterministic_runtime(async || { @@ -599,13 +656,13 @@ pub mod tests { .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db"); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); let opts = DatabaseSyncEngineOpts { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); protocol .server @@ -659,12 +716,12 @@ pub mod tests { .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db"); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); let opts = DatabaseSyncEngineOpts { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); protocol .server @@ -723,12 +780,12 @@ pub mod tests { .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone()); - let local_path = dir.path().join("local.db"); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); let opts = DatabaseSyncEngineOpts { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); server .server @@ -794,12 +851,17 @@ pub mod tests { const CLIENTS: usize = 8; for i in 0..CLIENTS { let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir.path().join(format!("local-{i}.db")); + let local_path = dir + .path() + .join(format!("local-{i}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{i}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); dbs.push(runner); } @@ -877,12 +939,16 @@ pub mod tests { let sync_lock = sync_lock.clone(); async move { let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone()); - let local_path = dir.join(format!("local-{i}.db")); + let local_path = dir + .join(format!("local-{i}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{i}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); runner.pull().await.unwrap(); let conn = runner.connect().await.unwrap(); for query in queries { @@ -933,12 +999,17 @@ pub mod tests { it += 1; let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir.path().join(format!("local-{it}.db")); + let local_path = dir + .path() + .join(format!("local-{it}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{it}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. }); @@ -1003,12 +1074,17 @@ pub mod tests { .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir.path().join(format!("local-{it}.db")); + let local_path = dir + .path() + .join(format!("local-{it}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{it}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); protocol .server @@ -1078,12 +1154,17 @@ pub mod tests { .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir.path().join(format!("local-{it}.db")); + let local_path = dir + .path() + .join(format!("local-{it}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{it}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); let conn = runner.connect().await.unwrap(); diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 3de1a4ace..8f21868af 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -35,6 +35,18 @@ pub enum WalPushResult { NeedCheckpoint, } +pub async fn connect(coro: &Coro, tape: &DatabaseTape) -> Result> { + let conn = tape.connect(coro).await?; + conn.wal_auto_checkpoint_disable(); + Ok(conn) +} + +pub fn connect_untracked(tape: &DatabaseTape) -> Result> { + let conn = tape.connect_untracked()?; + conn.wal_auto_checkpoint_disable(); + Ok(conn) +} + /// Bootstrap multiple DB files from latest generation from remote pub async fn db_bootstrap( coro: &Coro, @@ -257,8 +269,8 @@ pub async fn transfer_logical_changes( bump_pull_gen: bool, ) -> Result<()> { tracing::debug!("transfer_logical_changes: client_id={client_id}"); - let source_conn = source.connect_untracked()?; - let target_conn = target.connect_untracked()?; + let source_conn = connect_untracked(source)?; + let target_conn = connect_untracked(target)?; // fetch last_change_id from the target DB in order to guarantee atomic replay of changes and avoid conflicts in case of failure let source_pull_gen = 'source_pull_gen: { @@ -333,7 +345,7 @@ pub async fn transfer_logical_changes( use_implicit_rowid: false, }; - let source_schema_cookie = source.connect_untracked()?.read_schema_version()?; + let source_schema_cookie = connect_untracked(source)?.read_schema_version()?; let mut session = target.start_replay_session(coro, replay_opts).await?; @@ -414,7 +426,7 @@ pub async fn transfer_physical_changes( ) -> Result { tracing::debug!("transfer_physical_changes: source_wal_match_watermark={source_wal_match_watermark}, source_sync_watermark={source_sync_watermark}, target_wal_match_watermark={target_wal_match_watermark}"); - let source_conn = source.connect(coro).await?; + let source_conn = connect(coro, source).await?; let mut source_session = WalSession::new(source_conn.clone()); source_session.begin()?; diff --git a/packages/turso-sync-engine/src/lib.rs b/packages/turso-sync-engine/src/lib.rs index d0f61dc21..c67b2e363 100644 --- a/packages/turso-sync-engine/src/lib.rs +++ b/packages/turso-sync-engine/src/lib.rs @@ -18,7 +18,7 @@ pub type Result = std::result::Result; #[cfg(test)] mod tests { - use std::{path::PathBuf, sync::Arc}; + use std::sync::Arc; use tokio::{select, sync::Mutex}; use tracing_subscriber::EnvFilter; @@ -84,23 +84,12 @@ mod tests { db: None, } } - pub async fn init( - &mut self, - local_path: PathBuf, - opts: DatabaseSyncEngineOpts, - ) -> Result<()> { + pub async fn init(&mut self, local_path: &str, opts: DatabaseSyncEngineOpts) -> Result<()> { let io = self.io.clone(); let server = self.sync_server.clone(); let db = self .run(genawaiter::sync::Gen::new(|coro| async move { - DatabaseSyncEngine::new( - &coro, - io, - Arc::new(server), - local_path.to_str().unwrap(), - opts, - ) - .await + DatabaseSyncEngine::new(&coro, io, Arc::new(server), local_path, opts).await })) .await .unwrap(); diff --git a/packages/turso-sync-engine/src/test_context.rs b/packages/turso-sync-engine/src/test_context.rs index 3691550da..3d67d135d 100644 --- a/packages/turso-sync-engine/src/test_context.rs +++ b/packages/turso-sync-engine/src/test_context.rs @@ -77,6 +77,13 @@ impl TestContext { let delay = self.rng.lock().await.next_u64() % 1000; tokio::time::sleep(std::time::Duration::from_millis(delay)).await } + pub async fn random_sleep_n(&self, n: u64) { + let delay = { + let mut rng = self.rng.lock().await; + rng.next_u64() % 1000 * (rng.next_u64() % n + 1) + }; + tokio::time::sleep(std::time::Duration::from_millis(delay)).await + } pub async fn rng(&self) -> tokio::sync::MutexGuard { self.rng.lock().await diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 723a3f178..30e3af817 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -1476,7 +1476,7 @@ pub unsafe extern "C" fn libsql_wal_disable_checkpoint(db: *mut sqlite3) -> ffi: } let db: &mut sqlite3 = &mut *db; let db = db.inner.lock().unwrap(); - db.conn.wal_disable_checkpoint(); + db.conn.wal_auto_checkpoint_disable(); SQLITE_OK }