From 253b4933f72bfb186d15fdfd1f6d5053fc0cdaef Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 6 Aug 2025 19:30:16 +0400 Subject: [PATCH] more small fixes --- .../src/database_sync_engine.rs | 23 ++--- .../src/database_sync_operations.rs | 6 +- .../turso-sync-engine/src/test_protocol_io.rs | 6 +- .../turso-sync-engine/src/test_sync_server.rs | 83 +++++++++---------- 4 files changed, 51 insertions(+), 67 deletions(-) diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index fc5bfc0d8..077c8e7ca 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -434,7 +434,7 @@ pub mod tests { database_sync_engine::DatabaseSyncEngineOpts, test_context::{FaultInjectionStrategy, TestContext}, test_protocol_io::TestProtocolIo, - test_sync_server::{convert_rows, TestSyncServerOpts}, + test_sync_server::convert_rows, tests::{deterministic_runtime, seed_u64, TestRunner}, Result, }; @@ -450,9 +450,8 @@ pub mod tests { let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); let dir = tempfile::TempDir::new().unwrap(); let server_path = dir.path().join("server.db"); - let opts = TestSyncServerOpts { pull_batch_size: 1 }; let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path, opts) + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); @@ -574,9 +573,8 @@ pub mod tests { let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); let dir = tempfile::TempDir::new().unwrap(); let server_path = dir.path().join("server.db"); - let opts = TestSyncServerOpts { pull_batch_size: 1 }; let ctx = Arc::new(TestContext::new(seed_u64())); - let server = TestProtocolIo::new(ctx.clone(), &server_path, opts) + let server = TestProtocolIo::new(ctx.clone(), &server_path) .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone()); @@ -642,10 +640,9 @@ pub mod tests { deterministic_runtime(async || { let dir = tempfile::TempDir::new().unwrap(); let server_path = dir.path().join("server.db"); - let opts = TestSyncServerOpts { pull_batch_size: 1 }; let ctx = Arc::new(TestContext::new(seed_u64())); let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path, opts) + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) .await .unwrap(); let mut dbs = Vec::new(); @@ -697,10 +694,9 @@ pub mod tests { deterministic_runtime(async || { let dir = tempfile::TempDir::new().unwrap(); let server_path = dir.path().join("server.db"); - let opts = TestSyncServerOpts { pull_batch_size: 1 }; let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path, opts) + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) .await .unwrap(); @@ -770,9 +766,8 @@ pub mod tests { let dir = tempfile::TempDir::new().unwrap(); let server_path = dir.path().join("server.db"); let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let opts = TestSyncServerOpts { pull_batch_size: 1 }; let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path, opts) + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) .await .unwrap(); @@ -844,7 +839,6 @@ pub mod tests { deterministic_runtime(async || { let dir = tempfile::TempDir::new().unwrap(); let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let opts = TestSyncServerOpts { pull_batch_size: 1 }; let ctx = Arc::new(TestContext::new(seed_u64())); let mut session = ctx.fault_session(); @@ -853,7 +847,7 @@ pub mod tests { it += 1; let server_path = dir.path().join(format!("server-{it}.db")); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path, opts.clone()) + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) .await .unwrap(); @@ -914,7 +908,6 @@ pub mod tests { deterministic_runtime(async || { let dir = tempfile::TempDir::new().unwrap(); let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let opts = TestSyncServerOpts { pull_batch_size: 1 }; let ctx = Arc::new(TestContext::new(seed_u64())); let mut session = ctx.fault_session(); @@ -923,7 +916,7 @@ pub mod tests { it += 1; let server_path = dir.path().join(format!("server-{it}.db")); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path, opts.clone()) + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) .await .unwrap(); diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 510f31bbf..1e81f0dc7 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -1,4 +1,4 @@ -use std::{cell::RefCell, rc::Rc, sync::Arc}; +use std::{rc::Rc, sync::Arc}; use turso_core::{types::Text, Buffer, Completion, LimboError, Value}; @@ -47,8 +47,8 @@ pub async fn db_bootstrap( let chunk = chunk.data(); let content_len = chunk.len(); // todo(sivukhin): optimize allocations here - let buffer = Arc::new(RefCell::new(Buffer::allocate(chunk.len(), Rc::new(|_| {})))); - buffer.borrow_mut().as_mut_slice().copy_from_slice(&chunk); + let buffer = Arc::new(Buffer::allocate(chunk.len(), Rc::new(|_| {}))); + buffer.as_mut_slice().copy_from_slice(&chunk); let mut completions = Vec::with_capacity(dbs.len()); for i in 0..dbs.len() { let c = Completion::new_write(move |size| { diff --git a/packages/turso-sync-engine/src/test_protocol_io.rs b/packages/turso-sync-engine/src/test_protocol_io.rs index c90d36c65..18b4e9361 100644 --- a/packages/turso-sync-engine/src/test_protocol_io.rs +++ b/packages/turso-sync-engine/src/test_protocol_io.rs @@ -11,7 +11,7 @@ use crate::{ errors::Error, protocol_io::{DataCompletion, DataPollResult, ProtocolIO}, test_context::TestContext, - test_sync_server::{TestSyncServer, TestSyncServerOpts}, + test_sync_server::TestSyncServer, Result, }; @@ -102,11 +102,11 @@ impl DataCompletion for TestDataCompletion { } impl TestProtocolIo { - pub async fn new(ctx: Arc, path: &Path, opts: TestSyncServerOpts) -> Result { + pub async fn new(ctx: Arc, path: &Path) -> Result { Ok(Self { ctx: ctx.clone(), requests: Arc::new(std::sync::Mutex::new(Vec::new())), - server: TestSyncServer::new(ctx, path, opts).await?, + server: TestSyncServer::new(ctx, path).await?, files: Arc::new(Mutex::new(HashMap::new())), }) } diff --git a/packages/turso-sync-engine/src/test_sync_server.rs b/packages/turso-sync-engine/src/test_sync_server.rs index 7ffca94aa..e7c5dac69 100644 --- a/packages/turso-sync-engine/src/test_sync_server.rs +++ b/packages/turso-sync-engine/src/test_sync_server.rs @@ -31,20 +31,35 @@ struct TestSyncServerState { sessions: HashMap, } -#[derive(Debug, Clone)] -pub struct TestSyncServerOpts { - pub pull_batch_size: usize, -} - #[derive(Clone)] pub struct TestSyncServer { ctx: Arc, db: turso::Database, - opts: Arc, state: Arc>, } impl TestSyncServer { + pub async fn new(ctx: Arc, path: &Path) -> Result { + let mut generations = HashMap::new(); + generations.insert( + 1, + Generation { + snapshot: EMPTY_WAL_MODE_DB.to_vec(), + frames: Vec::new(), + }, + ); + Ok(Self { + ctx, + db: turso::Builder::new_local(path.to_str().unwrap()) + .build() + .await?, + state: Arc::new(Mutex::new(TestSyncServerState { + generation: 1, + generations, + sessions: HashMap::new(), + })), + }) + } pub async fn db_info(&self, completion: TestDataCompletion) -> Result<()> { tracing::debug!("db_info"); self.ctx.faulty_call("db_info_start").await?; @@ -244,47 +259,7 @@ impl TestSyncServer { Ok(()) } -} -// empty DB with single 4096-byte page and WAL mode (PRAGMA journal_mode=WAL) -// see test test_empty_wal_mode_db_content which validates asset content -pub const EMPTY_WAL_MODE_DB: &[u8] = include_bytes!("test_empty.db"); - -pub async fn convert_rows(rows: &mut turso::Rows) -> Result>> { - let mut rows_values = vec![]; - while let Some(row) = rows.next().await? { - let mut row_values = vec![]; - for i in 0..row.column_count() { - row_values.push(row.get_value(i)?); - } - rows_values.push(row_values); - } - Ok(rows_values) -} - -impl TestSyncServer { - pub async fn new(ctx: Arc, path: &Path, opts: TestSyncServerOpts) -> Result { - let mut generations = HashMap::new(); - generations.insert( - 1, - Generation { - snapshot: EMPTY_WAL_MODE_DB.to_vec(), - frames: Vec::new(), - }, - ); - Ok(Self { - ctx, - db: turso::Builder::new_local(path.to_str().unwrap()) - .build() - .await?, - opts: Arc::new(opts), - state: Arc::new(Mutex::new(TestSyncServerState { - generation: 1, - generations, - sessions: HashMap::new(), - })), - }) - } pub fn db(&self) -> turso::Database { self.db.clone() } @@ -310,3 +285,19 @@ impl TestSyncServer { Ok(()) } } + +// empty DB with single 4096-byte page and WAL mode (PRAGMA journal_mode=WAL) +// see test test_empty_wal_mode_db_content which validates asset content +pub const EMPTY_WAL_MODE_DB: &[u8] = include_bytes!("test_empty.db"); + +pub async fn convert_rows(rows: &mut turso::Rows) -> Result>> { + let mut rows_values = vec![]; + while let Some(row) = rows.next().await? { + let mut row_values = vec![]; + for i in 0..row.column_count() { + row_values.push(row.get_value(i)?); + } + rows_values.push(row_values); + } + Ok(rows_values) +}