From fef4e7e0e61a6dbfcb29c3069e88f7c75f9e2d69 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 15 Aug 2025 16:57:59 +0400 Subject: [PATCH] move tests from tursodb repo to turso-server repo --- .../src/database_sync_engine.rs | 967 +----------------- .../src/database_sync_operations.rs | 4 +- packages/turso-sync-engine/src/lib.rs | 125 +-- .../turso-sync-engine/src/test_context.rs | 154 --- packages/turso-sync-engine/src/test_empty.db | Bin 4096 -> 0 bytes .../turso-sync-engine/src/test_protocol_io.rs | 222 ---- .../turso-sync-engine/src/test_sync_server.rs | 351 ------- 7 files changed, 8 insertions(+), 1815 deletions(-) delete mode 100644 packages/turso-sync-engine/src/test_context.rs delete mode 100644 packages/turso-sync-engine/src/test_empty.db delete mode 100644 packages/turso-sync-engine/src/test_protocol_io.rs delete mode 100644 packages/turso-sync-engine/src/test_sync_server.rs diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index 1acec6f1c..4c16970c7 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -332,6 +332,7 @@ impl DatabaseSyncEngine { } } + #[allow(dead_code)] async fn push_synced_to_remote(&mut self, coro: &Coro) -> Result<()> { tracing::info!( "push_synced_to_remote: draft={}, synced={}, id={}", @@ -451,969 +452,3 @@ impl DatabaseSyncEngine { self.meta.as_ref().expect("metadata must be set") } } - -#[cfg(test)] -pub mod tests { - use std::{collections::BTreeMap, sync::Arc}; - - use rand::RngCore; - use tokio::join; - - use crate::{ - database_sync_engine::DatabaseSyncEngineOpts, - errors::Error, - test_context::{FaultInjectionPlan, FaultInjectionStrategy, TestContext}, - test_protocol_io::TestProtocolIo, - test_sync_server::convert_rows, - tests::{deterministic_runtime, seed_u64, TestRunner}, - Result, - }; - - async fn query_rows(conn: &turso::Connection, sql: &str) -> Result>> { - let mut rows = conn.query(sql, ()).await?; - convert_rows(&mut rows).await - } - - #[test] - pub fn test_sync_single_db_simple() { - deterministic_runtime(async || { - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: "id-1".to_string(), - wal_pull_batch_size: 1, - }; - runner.init(&local_path, opts).await.unwrap(); - - protocol - .server - .execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ()) - .await - .unwrap(); - protocol - .server - .execute("INSERT INTO t VALUES (1)", ()) - .await - .unwrap(); - - let conn = runner.connect().await.unwrap(); - - // no table in schema before sync from remote (as DB was initialized when remote was empty) - assert!(matches!( - query_rows(&conn, "SELECT * FROM t").await, - Err(x) if x.to_string().contains("no such table: t") - )); - - // 1 rows synced - runner.pull().await.unwrap(); - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![vec![turso::Value::Integer(1)]] - ); - - protocol - .server - .execute("INSERT INTO t VALUES (2)", ()) - .await - .unwrap(); - - conn.execute("INSERT INTO t VALUES (3)", ()).await.unwrap(); - - // changes are synced from the remote - but remote changes are not propagated locally - runner.push().await.unwrap(); - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(3)] - ] - ); - - let server_db = protocol.server.db(); - let server_conn = server_db.connect().unwrap(); - assert_eq!( - convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap()) - .await - .unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - ] - ); - - conn.execute("INSERT INTO t VALUES (4)", ()).await.unwrap(); - runner.push().await.unwrap(); - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(3)], - vec![turso::Value::Integer(4)] - ] - ); - - assert_eq!( - convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap()) - .await - .unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - vec![turso::Value::Integer(4)], - ] - ); - - runner.pull().await.unwrap(); - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - vec![turso::Value::Integer(4)] - ] - ); - - assert_eq!( - convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap()) - .await - .unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - vec![turso::Value::Integer(4)], - ] - ); - }); - } - - #[test] - pub fn test_sync_single_db_no_changes_no_push() { - deterministic_runtime(async || { - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: "id-1".to_string(), - wal_pull_batch_size: 1, - }; - runner.init(&local_path, opts).await.unwrap(); - - protocol - .server - .execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ()) - .await - .unwrap(); - protocol - .server - .execute("INSERT INTO t VALUES (1)", ()) - .await - .unwrap(); - - let conn = runner.connect().await.unwrap(); - - runner.sync().await.unwrap(); - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![vec![turso::Value::Integer(1)]] - ); - - conn.execute("INSERT INTO t VALUES (100)", ()) - .await - .unwrap(); - - protocol - .server - .execute("INSERT INTO t VALUES (2)", ()) - .await - .unwrap(); - - runner.sync().await.unwrap(); - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(100)], - ] - ); - - protocol - .server - .execute("INSERT INTO t VALUES (3)", ()) - .await - .unwrap(); - runner.sync().await.unwrap(); - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - vec![turso::Value::Integer(100)], - ] - ); - - ctx.switch_mode(FaultInjectionStrategy::Enabled { - plan: FaultInjectionPlan { - is_fault: Box::new(|name, _| Box::pin(async move { name == "wal_push_start" })), - }, - }) - .await; - - protocol - .server - .execute("INSERT INTO t VALUES (4)", ()) - .await - .unwrap(); - runner.sync().await.unwrap(); - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - vec![turso::Value::Integer(4)], - vec![turso::Value::Integer(100)], - ] - ); - }); - } - - #[test] - pub fn test_sync_single_db_update_sync_concurrent() { - deterministic_runtime(async || { - let io: Arc = Arc::new(turso_core::MemoryIO::new()); - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let opts = DatabaseSyncEngineOpts { - client_name: "id-1".to_string(), - wal_pull_batch_size: 1, - }; - - protocol - .server - .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ()) - .await - .unwrap(); - protocol - .server - .execute("INSERT INTO t VALUES ('hello', 'world')", ()) - .await - .unwrap(); - - runner.init(":memory:", opts).await.unwrap(); - let conn = runner.connect().await.unwrap(); - - let syncs = async move { - for i in 0..10 { - tracing::info!("sync attempt #{i}"); - runner.sync().await.unwrap(); - } - }; - - let updates = async move { - for i in 0..10 { - tracing::info!("update attempt #{i}"); - let sql = format!("INSERT INTO t VALUES ('key-{i}', 'value-{i}')"); - match conn.execute(&sql, ()).await { - Ok(_) => {} - Err(err) if err.to_string().contains("database is locked") => {} - Err(err) => panic!("update failed: {err}"), - } - ctx.random_sleep_n(50).await; - } - }; - - join!(updates, syncs); - }); - } - - #[test] - pub fn test_sync_many_dbs_update_sync_concurrent() { - deterministic_runtime(async || { - let io: Arc = Arc::new(turso_core::MemoryIO::new()); - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - protocol - .server - .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ()) - .await - .unwrap(); - protocol - .server - .execute( - "INSERT INTO t VALUES ('id-1', 'client1'), ('id-2', 'client2')", - (), - ) - .await - .unwrap(); - let mut runner1 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - runner1 - .init( - ":memory:-1", - DatabaseSyncEngineOpts { - client_name: "id-1".to_string(), - wal_pull_batch_size: 2, - }, - ) - .await - .unwrap(); - let mut runner2 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - runner2 - .init( - ":memory:-2", - DatabaseSyncEngineOpts { - client_name: "id-2".to_string(), - wal_pull_batch_size: 2, - }, - ) - .await - .unwrap(); - - let conn1 = runner1.connect().await.unwrap(); - let conn2 = runner2.connect().await.unwrap(); - - let syncs1 = async move { - for i in 0..10 { - tracing::info!("sync attempt #{i}"); - match runner1.sync().await { - Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue, - Err(err) => panic!("unexpected error: {err}"), - } - } - }; - - let syncs2 = async move { - for i in 0..10 { - tracing::info!("sync attempt #{i}"); - match runner2.sync().await { - Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue, - Err(err) => panic!("unexpected error: {err}"), - } - } - }; - - let ctx1 = ctx.clone(); - let updates1 = async move { - for i in 0..100 { - tracing::info!("update attempt #{i}"); - let sql = format!("INSERT INTO t VALUES ('key-1-{i}', 'value')"); - match conn1.execute(&sql, ()).await { - Ok(_) => {} - Err(err) if err.to_string().contains("database is locked") => {} - Err(err) => panic!("update failed: {err}"), - } - ctx1.random_sleep_n(10).await; - } - }; - - let ctx2 = ctx.clone(); - let updates2 = async move { - for i in 0..100 { - tracing::info!("update attempt #{i}"); - let sql = format!("INSERT INTO t VALUES ('key-2-{i}', 'value')"); - match conn2.execute(&sql, ()).await { - Ok(_) => {} - Err(err) if err.to_string().contains("database is locked") => {} - Err(err) => panic!("update failed: {err}"), - } - ctx2.random_sleep_n(10).await; - } - }; - - join!(updates1, updates2, syncs1, syncs2); - }); - } - - #[test] - pub fn test_sync_single_db_many_pulls_big_payloads() { - deterministic_runtime(async || { - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: "id-1".to_string(), - wal_pull_batch_size: 1, - }; - - runner.init(&local_path, opts).await.unwrap(); - - protocol - .server - .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)", ()) - .await - .unwrap(); - - runner.sync().await.unwrap(); - - // create connection in outer scope in order to prevent Database from being dropped in between of pull operations - let conn = runner.connect().await.unwrap(); - - let mut expected = BTreeMap::new(); - for attempt in 0..10 { - for _ in 0..5 { - let key = ctx.rng().await.next_u32(); - let length = ctx.rng().await.next_u32() % (10 * 4096); - protocol - .server - .execute("INSERT INTO t VALUES (?, randomblob(?))", (key, length)) - .await - .unwrap(); - expected.insert(key as i64, length as i64); - } - - tracing::info!("pull attempt={}", attempt); - runner.sync().await.unwrap(); - - let expected = expected - .iter() - .map(|(x, y)| vec![turso::Value::Integer(*x), turso::Value::Integer(*y)]) - .collect::>(); - assert_eq!( - query_rows(&conn, "SELECT x, length(y) FROM t") - .await - .unwrap(), - expected - ); - } - }); - } - - #[test] - pub fn test_sync_single_db_checkpoint() { - deterministic_runtime(async || { - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: "id-1".to_string(), - wal_pull_batch_size: 1, - }; - runner.init(&local_path, opts).await.unwrap(); - - protocol - .server - .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)", ()) - .await - .unwrap(); - protocol - .server - .execute("INSERT INTO t VALUES (1, randomblob(4 * 4096))", ()) - .await - .unwrap(); - protocol.server.checkpoint().await.unwrap(); - protocol - .server - .execute("INSERT INTO t VALUES (2, randomblob(5 * 4096))", ()) - .await - .unwrap(); - protocol.server.checkpoint().await.unwrap(); - protocol - .server - .execute("INSERT INTO t VALUES (3, randomblob(6 * 4096))", ()) - .await - .unwrap(); - protocol - .server - .execute("INSERT INTO t VALUES (4, randomblob(7 * 4096))", ()) - .await - .unwrap(); - - let conn = runner.connect().await.unwrap(); - - runner.pull().await.unwrap(); - - assert_eq!( - query_rows(&conn, "SELECT x, length(y) FROM t") - .await - .unwrap(), - vec![ - vec![turso::Value::Integer(1), turso::Value::Integer(4 * 4096)], - vec![turso::Value::Integer(2), turso::Value::Integer(5 * 4096)], - vec![turso::Value::Integer(3), turso::Value::Integer(6 * 4096)], - vec![turso::Value::Integer(4), turso::Value::Integer(7 * 4096)], - ] - ); - }); - } - - #[test] - pub fn test_sync_single_db_full_syncs() { - deterministic_runtime(async || { - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let ctx = Arc::new(TestContext::new(seed_u64())); - let server = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone()); - let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: "id-1".to_string(), - wal_pull_batch_size: 1, - }; - runner.init(&local_path, opts).await.unwrap(); - - server - .server - .execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ()) - .await - .unwrap(); - server - .server - .execute("INSERT INTO t VALUES (1)", ()) - .await - .unwrap(); - - let conn = runner.connect().await.unwrap(); - - // no table in schema before sync from remote (as DB was initialized when remote was empty) - assert!(matches!( - query_rows(&conn, "SELECT * FROM t").await, - Err(x) if x.to_string().contains("no such table: t") - )); - - runner.sync().await.unwrap(); - - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![vec![turso::Value::Integer(1)]] - ); - - conn.execute("INSERT INTO t VALUES (2)", ()).await.unwrap(); - runner.sync().await.unwrap(); - - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)] - ] - ); - - conn.execute("INSERT INTO t VALUES (3)", ()).await.unwrap(); - runner.sync().await.unwrap(); - assert_eq!( - query_rows(&conn, "SELECT * FROM t").await.unwrap(), - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)] - ] - ); - }); - } - - #[test] - pub fn test_sync_multiple_dbs_conflict() { - deterministic_runtime(async || { - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let ctx = Arc::new(TestContext::new(seed_u64())); - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - let mut dbs = Vec::new(); - const CLIENTS: usize = 8; - for i in 0..CLIENTS { - let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir - .path() - .join(format!("local-{i}.db")) - .to_str() - .unwrap() - .to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: format!("id-{i}"), - wal_pull_batch_size: 1, - }; - runner.init(&local_path, opts).await.unwrap(); - dbs.push(runner); - } - - protocol - .server - .execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ()) - .await - .unwrap(); - - for db in &mut dbs { - db.pull().await.unwrap(); - } - for (i, db) in dbs.iter().enumerate() { - let conn = db.connect().await.unwrap(); - conn.execute("INSERT INTO t VALUES (?)", (i as i32,)) - .await - .unwrap(); - } - - let try_sync = || async { - let mut tasks = Vec::new(); - for db in &dbs { - tasks.push(async move { db.push().await }); - } - futures::future::join_all(tasks).await - }; - for attempt in 0..CLIENTS { - let results = try_sync().await; - tracing::info!("attempt #{}: {:?}", attempt, results); - assert!(results.iter().filter(|x| x.is_ok()).count() > attempt); - } - }); - } - - #[test] - pub fn test_sync_multiple_clients_no_conflicts_synchronized() { - deterministic_runtime(async || { - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - - protocol - .server - .execute("CREATE TABLE t(k INTEGER PRIMARY KEY, v)", ()) - .await - .unwrap(); - - let sync_lock = Arc::new(tokio::sync::Mutex::new(())); - let mut clients = Vec::new(); - const CLIENTS: usize = 10; - let mut expected_rows = Vec::new(); - for i in 0..CLIENTS { - let mut queries = Vec::new(); - let cnt = ctx.rng().await.next_u32() % CLIENTS as u32 + 1; - for q in 0..cnt { - let key = i * CLIENTS + q as usize; - let length = ctx.rng().await.next_u32() % 4096; - queries.push(format!( - "INSERT INTO t VALUES ({key}, randomblob({length}))", - )); - expected_rows.push(vec![ - turso::Value::Integer(key as i64), - turso::Value::Integer(length as i64), - ]); - } - clients.push({ - let io = io.clone(); - let dir = dir.path().to_path_buf().clone(); - let ctx = ctx.clone(); - let server = protocol.clone(); - let sync_lock = sync_lock.clone(); - async move { - let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone()); - let local_path = dir - .join(format!("local-{i}.db")) - .to_str() - .unwrap() - .to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: format!("id-{i}"), - wal_pull_batch_size: 1, - }; - runner.init(&local_path, opts).await.unwrap(); - runner.pull().await.unwrap(); - let conn = runner.connect().await.unwrap(); - for query in queries { - conn.execute(&query, ()).await.unwrap(); - } - let guard = sync_lock.lock().await; - runner.push().await.unwrap(); - drop(guard); - } - }); - } - for client in clients { - client.await; - } - let db = protocol.server.db(); - let conn = db.connect().unwrap(); - let mut result = conn.query("SELECT k, length(v) FROM t", ()).await.unwrap(); - let rows = convert_rows(&mut result).await.unwrap(); - assert_eq!(rows, expected_rows); - }); - } - - #[test] - pub fn test_sync_single_db_sync_from_remote_nothing_single_failure() { - deterministic_runtime(async || { - let dir = tempfile::TempDir::new().unwrap(); - let server_path = dir.path().join("server.db"); - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let ctx = Arc::new(TestContext::new(seed_u64())); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - - protocol - .server - .execute("CREATE TABLE t(x)", ()) - .await - .unwrap(); - protocol - .server - .execute("INSERT INTO t VALUES (1), (2), (3)", ()) - .await - .unwrap(); - - let mut session = ctx.fault_session(); - let mut it = 0; - while let Some(strategy) = session.next().await { - it += 1; - - let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir - .path() - .join(format!("local-{it}.db")) - .to_str() - .unwrap() - .to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: format!("id-{it}"), - wal_pull_batch_size: 1, - }; - runner.init(&local_path, opts).await.unwrap(); - - let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. }); - - ctx.switch_mode(strategy).await; - let result = runner.pull().await; - ctx.switch_mode(FaultInjectionStrategy::Disabled).await; - - if !has_fault { - result.unwrap(); - } else { - let err = result.err().unwrap(); - tracing::info!("error after fault injection: {}", err); - } - - let conn = runner.connect().await.unwrap(); - let rows = query_rows(&conn, "SELECT * FROM t").await.unwrap(); - assert_eq!( - rows, - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - ] - ); - - runner.pull().await.unwrap(); - - let rows = query_rows(&conn, "SELECT * FROM t").await.unwrap(); - assert_eq!( - rows, - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - ] - ); - } - }); - } - - #[test] - pub fn test_sync_single_db_sync_from_remote_single_failure() { - deterministic_runtime(async || { - let dir = tempfile::TempDir::new().unwrap(); - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let ctx = Arc::new(TestContext::new(seed_u64())); - - let mut session = ctx.fault_session(); - let mut it = 0; - while let Some(strategy) = session.next().await { - it += 1; - - let server_path = dir.path().join(format!("server-{it}.db")); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - - protocol - .server - .execute("CREATE TABLE t(x)", ()) - .await - .unwrap(); - - let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir - .path() - .join(format!("local-{it}.db")) - .to_str() - .unwrap() - .to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: format!("id-{it}"), - wal_pull_batch_size: 1, - }; - runner.init(&local_path, opts).await.unwrap(); - - protocol - .server - .execute("INSERT INTO t VALUES (1), (2), (3)", ()) - .await - .unwrap(); - - let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. }); - - ctx.switch_mode(strategy).await; - let result = runner.pull().await; - ctx.switch_mode(FaultInjectionStrategy::Disabled).await; - - if !has_fault { - result.unwrap(); - } else { - let err = result.err().unwrap(); - tracing::info!("error after fault injection: {}", err); - } - - let conn = runner.connect().await.unwrap(); - let rows = query_rows(&conn, "SELECT * FROM t").await.unwrap(); - assert!(rows.len() <= 3); - - runner.pull().await.unwrap(); - - let rows = query_rows(&conn, "SELECT * FROM t").await.unwrap(); - assert_eq!( - rows, - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - ] - ); - } - }); - } - - #[test] - pub fn test_sync_single_db_sync_to_remote_single_failure() { - deterministic_runtime(async || { - let dir = tempfile::TempDir::new().unwrap(); - let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); - let ctx = Arc::new(TestContext::new(seed_u64())); - - let mut session = ctx.fault_session(); - let mut it = 0; - while let Some(strategy) = session.next().await { - it += 1; - - let server_path = dir.path().join(format!("server-{it}.db")); - let protocol = TestProtocolIo::new(ctx.clone(), &server_path) - .await - .unwrap(); - - protocol - .server - .execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ()) - .await - .unwrap(); - - protocol - .server - .execute("INSERT INTO t VALUES (1)", ()) - .await - .unwrap(); - - let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir - .path() - .join(format!("local-{it}.db")) - .to_str() - .unwrap() - .to_string(); - let opts = DatabaseSyncEngineOpts { - client_name: format!("id-{it}"), - wal_pull_batch_size: 1, - }; - runner.init(&local_path, opts).await.unwrap(); - - let conn = runner.connect().await.unwrap(); - - conn.execute("INSERT INTO t VALUES (2), (3)", ()) - .await - .unwrap(); - - let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. }); - - ctx.switch_mode(strategy).await; - let result = runner.push().await; - ctx.switch_mode(FaultInjectionStrategy::Disabled).await; - - if !has_fault { - result.unwrap(); - } else { - let err = result.err().unwrap(); - tracing::info!("error after fault injection: {}", err); - } - - let server_db = protocol.server.db(); - let server_conn = server_db.connect().unwrap(); - let rows = - convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap()) - .await - .unwrap(); - assert!(rows.len() <= 3); - - runner.push().await.unwrap(); - - let rows = - convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap()) - .await - .unwrap(); - assert_eq!( - rows, - vec![ - vec![turso::Value::Integer(1)], - vec![turso::Value::Integer(2)], - vec![turso::Value::Integer(3)], - ] - ); - } - }); - } -} diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 3db7c6553..3851db953 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -12,8 +12,8 @@ use crate::{ protocol_io::{DataCompletion, DataPollResult, ProtocolIO}, server_proto::{self, ExecuteStreamReq, Stmt, StreamRequest}, types::{ - Coro, DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType, DbSyncInfo, - DbSyncStatus, ProtocolCommand, + Coro, DatabaseTapeOperation, DatabaseTapeRowChangeType, DbSyncInfo, DbSyncStatus, + ProtocolCommand, }, wal_session::WalSession, Result, diff --git a/packages/turso-sync-engine/src/lib.rs b/packages/turso-sync-engine/src/lib.rs index cc084b2e0..0546a15e7 100644 --- a/packages/turso-sync-engine/src/lib.rs +++ b/packages/turso-sync-engine/src/lib.rs @@ -5,35 +5,15 @@ pub mod database_tape; pub mod errors; pub mod io_operations; pub mod protocol_io; +pub mod server_proto; pub mod types; pub mod wal_session; -pub mod server_proto; - -#[cfg(test)] -pub mod test_context; -#[cfg(test)] -pub mod test_protocol_io; -#[cfg(test)] -pub mod test_sync_server; pub type Result = std::result::Result; #[cfg(test)] mod tests { - use std::sync::Arc; - - use tokio::{select, sync::Mutex}; use tracing_subscriber::EnvFilter; - use turso_core::IO; - - use crate::{ - database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts}, - errors::Error, - test_context::TestContext, - test_protocol_io::TestProtocolIo, - types::{Coro, ProtocolCommand}, - Result, - }; #[ctor::ctor] fn init() { @@ -43,14 +23,17 @@ mod tests { .init(); } + #[allow(dead_code)] pub fn seed_u64() -> u64 { seed().parse().unwrap_or(0) } + #[allow(dead_code)] pub fn seed() -> String { std::env::var("SEED").unwrap_or("0".to_string()) } + #[allow(dead_code)] pub fn deterministic_runtime_from_seed>( seed: &[u8], f: impl Fn() -> F, @@ -65,107 +48,9 @@ mod tests { runtime.block_on(f()); } + #[allow(dead_code)] pub fn deterministic_runtime>(f: impl Fn() -> F) { let seed = seed(); deterministic_runtime_from_seed(seed.as_bytes(), f); } - - pub struct TestRunner { - pub ctx: Arc, - pub io: Arc, - pub sync_server: TestProtocolIo, - db: Option>>>, - } - - impl TestRunner { - pub fn new(ctx: Arc, io: Arc, sync_server: TestProtocolIo) -> Self { - Self { - ctx, - io, - sync_server, - db: None, - } - } - pub async fn init(&mut self, local_path: &str, opts: DatabaseSyncEngineOpts) -> Result<()> { - let io = self.io.clone(); - let server = self.sync_server.clone(); - let db = self - .run(genawaiter::sync::Gen::new(|coro| async move { - DatabaseSyncEngine::new(&coro, io, Arc::new(server), local_path, opts).await - })) - .await - .unwrap(); - self.db = Some(Arc::new(Mutex::new(db))); - Ok(()) - } - pub async fn connect(&self) -> Result { - self.run_db_fn(self.db.as_ref().unwrap(), async move |coro, db| { - Ok(turso::Connection::create(db.connect(coro).await?)) - }) - .await - } - pub async fn pull(&self) -> Result<()> { - self.run_db_fn(self.db.as_ref().unwrap(), async move |coro, db| { - db.pull(coro).await - }) - .await - } - pub async fn push(&self) -> Result<()> { - self.run_db_fn(self.db.as_ref().unwrap(), async move |coro, db| { - db.push(coro).await - }) - .await - } - pub async fn sync(&self) -> Result<()> { - self.run_db_fn(self.db.as_ref().unwrap(), async move |coro, db| { - db.sync(coro).await - }) - .await - } - pub async fn run_db_fn( - &self, - db: &Arc>>, - f: impl AsyncFn(&Coro, &mut DatabaseSyncEngine) -> Result, - ) -> Result { - let g = genawaiter::sync::Gen::new({ - let db = db.clone(); - |coro| async move { - let mut db = db.lock().await; - f(&coro, &mut db).await - } - }); - self.run(g).await - } - pub async fn run>>( - &self, - mut g: genawaiter::sync::Gen, F>, - ) -> Result { - let mut response = Ok(()); - loop { - // we must drive internal tokio clocks on every iteration - otherwise one TestRunner without work can block everything - // if other TestRunner sleeping - as time will "freeze" in this case - self.ctx.random_sleep().await; - - match g.resume_with(response) { - genawaiter::GeneratorState::Complete(result) => return result, - genawaiter::GeneratorState::Yielded(ProtocolCommand::IO) => { - let drained = { - let mut requests = self.sync_server.requests.lock().unwrap(); - requests.drain(..).collect::>() - }; - for mut request in drained { - select! { - value = &mut request => { value.unwrap(); }, - _ = self.ctx.random_sleep() => { self.sync_server.requests.lock().unwrap().push(request); } - }; - } - response = - self.io.run_once().map(|_| ()).map_err(|e| { - Error::DatabaseSyncEngineError(format!("io error: {e}")) - }); - } - } - } - } - } } diff --git a/packages/turso-sync-engine/src/test_context.rs b/packages/turso-sync-engine/src/test_context.rs deleted file mode 100644 index 3d67d135d..000000000 --- a/packages/turso-sync-engine/src/test_context.rs +++ /dev/null @@ -1,154 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - future::Future, - pin::Pin, - sync::Arc, -}; - -use rand::{RngCore, SeedableRng}; -use rand_chacha::ChaCha8Rng; -use tokio::sync::Mutex; - -use crate::{errors::Error, Result}; - -type PinnedFuture = Pin + Send>>; - -pub struct FaultInjectionPlan { - pub is_fault: Box PinnedFuture + Send + Sync>, -} - -pub enum FaultInjectionStrategy { - Disabled, - Record, - Enabled { plan: FaultInjectionPlan }, -} - -impl std::fmt::Debug for FaultInjectionStrategy { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Disabled => write!(f, "Disabled"), - Self::Record => write!(f, "Record"), - Self::Enabled { .. } => write!(f, "Enabled"), - } - } -} - -pub struct TestContext { - fault_injection: Mutex, - faulty_call: Mutex>, - rng: Mutex, -} - -pub struct FaultSession { - ctx: Arc, - recording: bool, - plans: Option>, -} - -impl FaultSession { - pub async fn next(&mut self) -> Option { - if !self.recording { - self.recording = true; - return Some(FaultInjectionStrategy::Record); - } - if self.plans.is_none() { - self.plans = Some(self.ctx.enumerate_simple_plans().await); - } - - let plans = self.plans.as_mut().unwrap(); - if plans.is_empty() { - return None; - } - - let plan = plans.pop().unwrap(); - Some(FaultInjectionStrategy::Enabled { plan }) - } -} - -impl TestContext { - pub fn new(seed: u64) -> Self { - Self { - rng: Mutex::new(ChaCha8Rng::seed_from_u64(seed)), - fault_injection: Mutex::new(FaultInjectionStrategy::Disabled), - faulty_call: Mutex::new(HashSet::new()), - } - } - pub async fn random_sleep(&self) { - let delay = self.rng.lock().await.next_u64() % 1000; - tokio::time::sleep(std::time::Duration::from_millis(delay)).await - } - pub async fn random_sleep_n(&self, n: u64) { - let delay = { - let mut rng = self.rng.lock().await; - rng.next_u64() % 1000 * (rng.next_u64() % n + 1) - }; - tokio::time::sleep(std::time::Duration::from_millis(delay)).await - } - - pub async fn rng(&self) -> tokio::sync::MutexGuard { - self.rng.lock().await - } - pub fn fault_session(self: &Arc) -> FaultSession { - FaultSession { - ctx: self.clone(), - recording: false, - plans: None, - } - } - pub async fn switch_mode(&self, updated: FaultInjectionStrategy) { - let mut mode = self.fault_injection.lock().await; - tracing::info!("switch fault injection mode: {:?}", updated); - *mode = updated; - } - pub async fn enumerate_simple_plans(&self) -> Vec { - let mut plans = vec![]; - for call in self.faulty_call.lock().await.iter() { - let mut fault_counts = HashMap::new(); - fault_counts.insert(call.clone(), 1); - - let count = Arc::new(Mutex::new(1)); - let call = call.clone(); - plans.push(FaultInjectionPlan { - is_fault: Box::new(move |name, bt| { - let call = call.clone(); - let count = count.clone(); - Box::pin(async move { - if (name, bt) != call { - return false; - } - let mut count = count.lock().await; - *count -= 1; - *count >= 0 - }) - }), - }) - } - plans - } - pub async fn faulty_call(&self, name: &str) -> Result<()> { - tracing::trace!("faulty_call: {}", name); - - // sleep here in order for scheduler to interleave different executions - self.random_sleep().await; - - if let FaultInjectionStrategy::Disabled = &*self.fault_injection.lock().await { - return Ok(()); - } - let bt = std::backtrace::Backtrace::force_capture().to_string(); - match &mut *self.fault_injection.lock().await { - FaultInjectionStrategy::Record => { - let mut call_sites = self.faulty_call.lock().await; - call_sites.insert((name.to_string(), bt)); - Ok(()) - } - FaultInjectionStrategy::Enabled { plan } => { - if plan.is_fault.as_ref()(name.to_string(), bt.clone()).await { - Err(Error::DatabaseSyncEngineError("injected fault".to_string())) - } else { - Ok(()) - } - } - _ => unreachable!("Disabled case handled above"), - } - } -} diff --git a/packages/turso-sync-engine/src/test_empty.db b/packages/turso-sync-engine/src/test_empty.db deleted file mode 100644 index 0a06b00940a2e489182e153184a104fe6003c831..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4096 zcmWFz^vNtqRY=P(%1ta$FlG>7U}9o$P*7lCU|@t|AVoG{WY8>>>>>, - pub server: TestSyncServer, - ctx: Arc, - files: Arc>>>, -} - -pub struct TestDataPollResult(Vec); - -impl DataPollResult for TestDataPollResult { - fn data(&self) -> &[u8] { - &self.0 - } -} - -#[derive(Clone)] -pub struct TestDataCompletion { - status: Arc>>, - chunks: Arc>>>, - done: Arc>, - poisoned: Arc>>, -} - -impl Default for TestDataCompletion { - fn default() -> Self { - Self::new() - } -} - -impl TestDataCompletion { - pub fn new() -> Self { - Self { - status: Arc::new(std::sync::Mutex::new(None)), - chunks: Arc::new(std::sync::Mutex::new(VecDeque::new())), - done: Arc::new(std::sync::Mutex::new(false)), - poisoned: Arc::new(std::sync::Mutex::new(None)), - } - } - pub fn set_status(&self, status: u16) { - *self.status.lock().unwrap() = Some(status); - } - - pub fn push_data(&self, data: Vec) { - let mut chunks = self.chunks.lock().unwrap(); - chunks.push_back(data); - } - - pub fn set_done(&self) { - *self.done.lock().unwrap() = true; - } - - pub fn poison(&self, err: &str) { - *self.poisoned.lock().unwrap() = Some(err.to_string()); - } -} - -impl DataCompletion for TestDataCompletion { - type DataPollResult = TestDataPollResult; - - fn status(&self) -> Result> { - let poison = self.poisoned.lock().unwrap(); - if poison.is_some() { - return Err(Error::DatabaseSyncEngineError(format!( - "status error: {poison:?}" - ))); - } - Ok(*self.status.lock().unwrap()) - } - - fn poll_data(&self) -> Result> { - let poison = self.poisoned.lock().unwrap(); - if poison.is_some() { - return Err(Error::DatabaseSyncEngineError(format!( - "poll_data error: {poison:?}" - ))); - } - let mut chunks = self.chunks.lock().unwrap(); - Ok(chunks.pop_front().map(TestDataPollResult)) - } - - fn is_done(&self) -> Result { - let poison = self.poisoned.lock().unwrap(); - if poison.is_some() { - return Err(Error::DatabaseSyncEngineError(format!( - "is_done error: {poison:?}" - ))); - } - Ok(*self.done.lock().unwrap()) - } -} - -impl TestProtocolIo { - pub async fn new(ctx: Arc, path: &Path) -> Result { - Ok(Self { - ctx: ctx.clone(), - requests: Arc::new(std::sync::Mutex::new(Vec::new())), - server: TestSyncServer::new(ctx, path).await?, - files: Arc::new(Mutex::new(HashMap::new())), - }) - } - fn schedule< - Fut: std::future::Future> + Send + 'static, - F: FnOnce(TestSyncServer, TestDataCompletion) -> Fut + Send + 'static, - >( - &self, - completion: TestDataCompletion, - f: F, - ) { - let server = self.server.clone(); - let mut requests = self.requests.lock().unwrap(); - requests.push(Box::pin(tokio::spawn(async move { - if let Err(err) = f(server, completion.clone()).await { - tracing::info!("poison completion: {}", err); - completion.poison(&err.to_string()); - } - }))); - } -} - -impl ProtocolIO for TestProtocolIo { - type DataCompletion = TestDataCompletion; - fn http(&self, method: &str, path: &str, data: Option>) -> Result { - let completion = TestDataCompletion::new(); - { - let completion = completion.clone(); - let path = &path[1..].split("/").collect::>(); - match (method, path.as_slice()) { - ("GET", ["info"]) => { - self.schedule(completion, |s, c| async move { s.db_info(c).await }); - } - ("GET", ["export", generation]) => { - let generation = generation.parse().unwrap(); - self.schedule(completion, async move |s, c| { - s.db_export(c, generation).await - }); - } - ("GET", ["sync", generation, start, end]) => { - let generation = generation.parse().unwrap(); - let start = start.parse().unwrap(); - let end = end.parse().unwrap(); - self.schedule(completion, async move |s, c| { - s.wal_pull(c, generation, start, end).await - }); - } - ("POST", ["sync", generation, start, end]) => { - let generation = generation.parse().unwrap(); - let start = start.parse().unwrap(); - let end = end.parse().unwrap(); - let data = data.unwrap(); - self.schedule(completion, async move |s, c| { - s.wal_push(c, None, generation, start, end, data).await - }); - } - ("POST", ["sync", generation, start, end, baton]) => { - let baton = baton.to_string(); - let generation = generation.parse().unwrap(); - let start = start.parse().unwrap(); - let end = end.parse().unwrap(); - let data = data.unwrap(); - self.schedule(completion, async move |s, c| { - s.wal_push(c, Some(baton), generation, start, end, data) - .await - }); - } - _ => panic!("unexpected sync server request: {method} {path:?}"), - }; - } - Ok(completion) - } - - fn full_read(&self, path: &str) -> Result { - let completion = TestDataCompletion::new(); - let ctx = self.ctx.clone(); - let files = self.files.clone(); - let path = path.to_string(); - self.schedule(completion.clone(), async move |_, c| { - ctx.faulty_call("full_read_start").await?; - let files = files.lock().await; - let result = files.get(&path); - c.push_data(result.cloned().unwrap_or(Vec::new())); - ctx.faulty_call("full_read_end").await?; - c.set_done(); - Ok(()) - }); - Ok(completion) - } - - fn full_write(&self, path: &str, content: Vec) -> Result { - let completion = TestDataCompletion::new(); - let ctx = self.ctx.clone(); - let files = self.files.clone(); - let path = path.to_string(); - self.schedule(completion.clone(), async move |_, c| { - ctx.faulty_call("full_write_start").await?; - let mut files = files.lock().await; - files.insert(path, content); - ctx.faulty_call("full_write_end").await?; - c.set_done(); - Ok(()) - }); - Ok(completion) - } -} diff --git a/packages/turso-sync-engine/src/test_sync_server.rs b/packages/turso-sync-engine/src/test_sync_server.rs deleted file mode 100644 index ae5be4a22..000000000 --- a/packages/turso-sync-engine/src/test_sync_server.rs +++ /dev/null @@ -1,351 +0,0 @@ -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - sync::Arc, -}; - -use tokio::sync::Mutex; - -use crate::{ - errors::Error, - test_context::TestContext, - test_protocol_io::TestDataCompletion, - types::{DbSyncInfo, DbSyncStatus}, - Result, -}; - -const PAGE_SIZE: usize = 4096; -const FRAME_SIZE: usize = 24 + PAGE_SIZE; - -struct Generation { - snapshot: Vec, - frames: Vec>, -} - -#[derive(Clone)] -struct SyncSession { - baton: String, - conn: turso::Connection, - in_txn: bool, -} - -struct TestSyncServerState { - generation: u64, - generations: HashMap, - sessions: HashMap, -} - -#[derive(Clone)] -pub struct TestSyncServer { - path: PathBuf, - ctx: Arc, - db: turso::Database, - state: Arc>, -} - -impl TestSyncServer { - pub async fn new(ctx: Arc, path: &Path) -> Result { - let mut generations = HashMap::new(); - generations.insert( - 1, - Generation { - snapshot: EMPTY_WAL_MODE_DB.to_vec(), - frames: Vec::new(), - }, - ); - Ok(Self { - path: path.to_path_buf(), - ctx, - db: turso::Builder::new_local(path.to_str().unwrap()) - .build() - .await?, - state: Arc::new(Mutex::new(TestSyncServerState { - generation: 1, - generations, - sessions: HashMap::new(), - })), - }) - } - pub async fn db_info(&self, completion: TestDataCompletion) -> Result<()> { - tracing::debug!("db_info"); - self.ctx.faulty_call("db_info_start").await?; - - let state = self.state.lock().await; - let result = DbSyncInfo { - current_generation: state.generation, - }; - - completion.set_status(200); - self.ctx.faulty_call("db_info_status").await?; - - completion.push_data(serde_json::to_vec(&result)?); - self.ctx.faulty_call("db_info_data").await?; - - completion.set_done(); - - Ok(()) - } - - pub async fn db_export( - &self, - completion: TestDataCompletion, - generation_id: u64, - ) -> Result<()> { - tracing::debug!("db_export: {}", generation_id); - self.ctx.faulty_call("db_export_start").await?; - - let state = self.state.lock().await; - let Some(generation) = state.generations.get(&generation_id) else { - return Err(Error::DatabaseSyncEngineError( - "generation not found".to_string(), - )); - }; - completion.set_status(200); - self.ctx.faulty_call("db_export_status").await?; - - completion.push_data(generation.snapshot.clone()); - self.ctx.faulty_call("db_export_push").await?; - - completion.set_done(); - - Ok(()) - } - - pub async fn wal_pull( - &self, - completion: TestDataCompletion, - generation_id: u64, - start_frame: u64, - end_frame: u64, - ) -> Result<()> { - tracing::debug!("wal_pull: {}/{}/{}", generation_id, start_frame, end_frame); - self.ctx.faulty_call("wal_pull_start").await?; - - let state = self.state.lock().await; - let Some(generation) = state.generations.get(&generation_id) else { - return Err(Error::DatabaseSyncEngineError( - "generation not found".to_string(), - )); - }; - let mut data = Vec::new(); - for frame_no in start_frame..end_frame { - let frame_idx = frame_no - 1; - let Some(frame) = generation.frames.get(frame_idx as usize) else { - break; - }; - data.extend_from_slice(frame); - } - if data.is_empty() { - let last_generation = state.generations.get(&state.generation).unwrap(); - - let status = DbSyncStatus { - baton: None, - status: "checkpoint_needed".to_string(), - generation: state.generation, - max_frame_no: last_generation.frames.len() as u64, - }; - completion.set_status(400); - self.ctx.faulty_call("wal_pull_400_status").await?; - - completion.push_data(serde_json::to_vec(&status)?); - self.ctx.faulty_call("wal_pull_400_push").await?; - - completion.set_done(); - } else { - completion.set_status(200); - self.ctx.faulty_call("wal_pull_200_status").await?; - - completion.push_data(data); - self.ctx.faulty_call("wal_pull_200_push").await?; - - completion.set_done(); - }; - - Ok(()) - } - - pub async fn wal_push( - &self, - completion: TestDataCompletion, - mut baton: Option, - generation_id: u64, - start_frame: u64, - end_frame: u64, - frames: Vec, - ) -> Result<()> { - tracing::debug!( - "wal_push: {}/{}/{}/{:?}", - generation_id, - start_frame, - end_frame, - baton - ); - self.ctx.faulty_call("wal_push_start").await?; - - let mut session = { - let mut state = self.state.lock().await; - if state.generation != generation_id { - let generation = state.generations.get(&state.generation).unwrap(); - let max_frame_no = generation.frames.len(); - let status = DbSyncStatus { - baton: None, - status: "checkpoint_needed".to_string(), - generation: state.generation, - max_frame_no: max_frame_no as u64, - }; - - let status = serde_json::to_vec(&status)?; - - completion.set_status(200); - self.ctx.faulty_call("wal_push_status").await?; - - completion.push_data(status); - self.ctx.faulty_call("wal_push_push").await?; - - completion.set_done(); - return Ok(()); - } - let baton_str = baton.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); - let session = match state.sessions.get(&baton_str) { - Some(session) => session.clone(), - None => { - let session = SyncSession { - baton: baton_str.clone(), - conn: self.db.connect()?, - in_txn: false, - }; - state.sessions.insert(baton_str.clone(), session.clone()); - session - } - }; - baton = Some(baton_str.clone()); - session - }; - - let conflict = 'conflict: { - let mut offset = 0; - for frame_no in start_frame..end_frame { - if offset + FRAME_SIZE > frames.len() { - return Err(Error::DatabaseSyncEngineError( - "unexpected length of frames data".to_string(), - )); - } - if !session.in_txn { - session.conn.wal_insert_begin()?; - session.in_txn = true; - } - let frame = &frames[offset..offset + FRAME_SIZE]; - match session.conn.wal_insert_frame(frame_no, frame) { - Ok(info) => { - if info.is_commit_frame() { - if session.in_txn { - session.conn.wal_insert_end()?; - session.in_txn = false; - } - self.sync_frames_from_conn(&session.conn).await?; - } - } - Err(turso::Error::WalOperationError(err)) if err.contains("Conflict") => { - session.conn.wal_insert_end()?; - break 'conflict true; - } - Err(err) => { - session.conn.wal_insert_end()?; - return Err(err.into()); - } - } - offset += FRAME_SIZE; - } - false - }; - let mut state = self.state.lock().await; - state - .sessions - .insert(baton.clone().unwrap(), session.clone()); - let status = DbSyncStatus { - baton: Some(session.baton.clone()), - status: if conflict { "conflict" } else { "ok" }.into(), - generation: state.generation, - max_frame_no: session.conn.wal_frame_count()?, - }; - - let status = serde_json::to_vec(&status)?; - - completion.set_status(200); - self.ctx.faulty_call("wal_push_status").await?; - - completion.push_data(status); - self.ctx.faulty_call("wal_push_push").await?; - - completion.set_done(); - - Ok(()) - } - - pub fn db(&self) -> turso::Database { - self.db.clone() - } - pub async fn checkpoint(&self) -> Result<()> { - tracing::debug!("checkpoint sync-server db"); - let conn = self.db.connect()?; - let mut rows = conn.query("PRAGMA wal_checkpoint(TRUNCATE)", ()).await?; - let Some(_) = rows.next().await? else { - return Err(Error::DatabaseSyncEngineError( - "checkpoint must return single row".to_string(), - )); - }; - let mut state = self.state.lock().await; - let generation = state.generation + 1; - state.generation = generation; - state.generations.insert( - generation, - Generation { - snapshot: std::fs::read(&self.path).map_err(|e| { - Error::DatabaseSyncEngineError(format!( - "failed to create generation snapshot: {e}" - )) - })?, - frames: Vec::new(), - }, - ); - Ok(()) - } - pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<()> { - let conn = self.db.connect()?; - conn.execute(sql, params).await?; - tracing::debug!("sync_frames_from_conn after execute"); - self.sync_frames_from_conn(&conn).await?; - Ok(()) - } - async fn sync_frames_from_conn(&self, conn: &turso::Connection) -> Result<()> { - let mut state = self.state.lock().await; - let generation = state.generation; - let generation = state.generations.get_mut(&generation).unwrap(); - let last_frame = generation.frames.len() + 1; - let mut frame = [0u8; FRAME_SIZE]; - let wal_frame_count = conn.wal_frame_count()?; - tracing::debug!("conn frames count: {}", wal_frame_count); - for frame_no in last_frame..=wal_frame_count as usize { - let frame_info = conn.wal_get_frame(frame_no as u64, &mut frame)?; - tracing::debug!("push local frame {}, info={:?}", frame_no, frame_info); - generation.frames.push(frame.to_vec()); - } - Ok(()) - } -} - -// empty DB with single 4096-byte page and WAL mode (PRAGMA journal_mode=WAL) -// see test test_empty_wal_mode_db_content which validates asset content -pub const EMPTY_WAL_MODE_DB: &[u8] = include_bytes!("test_empty.db"); - -pub async fn convert_rows(rows: &mut turso::Rows) -> Result>> { - let mut rows_values = vec![]; - while let Some(row) = rows.next().await? { - let mut row_values = vec![]; - for i in 0..row.column_count() { - row_values.push(row.get_value(i)?); - } - rows_values.push(row_values); - } - Ok(rows_values) -}