move tests from tursodb repo to turso-server repo

This commit is contained in:
Nikita Sivukhin
2025-08-15 16:57:59 +04:00
parent 979e7da633
commit fef4e7e0e6
7 changed files with 8 additions and 1815 deletions

View File

@@ -332,6 +332,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
}
}
#[allow(dead_code)]
async fn push_synced_to_remote(&mut self, coro: &Coro) -> Result<()> {
tracing::info!(
"push_synced_to_remote: draft={}, synced={}, id={}",
@@ -451,969 +452,3 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
self.meta.as_ref().expect("metadata must be set")
}
}
#[cfg(test)]
pub mod tests {
use std::{collections::BTreeMap, sync::Arc};
use rand::RngCore;
use tokio::join;
use crate::{
database_sync_engine::DatabaseSyncEngineOpts,
errors::Error,
test_context::{FaultInjectionPlan, FaultInjectionStrategy, TestContext},
test_protocol_io::TestProtocolIo,
test_sync_server::convert_rows,
tests::{deterministic_runtime, seed_u64, TestRunner},
Result,
};
/// Executes `sql` on `conn` and collects every result row as owned values.
async fn query_rows(conn: &turso::Connection, sql: &str) -> Result<Vec<Vec<turso::Value>>> {
    let mut result_set = conn.query(sql, ()).await?;
    let rows = convert_rows(&mut result_set).await?;
    Ok(rows)
}
// End-to-end smoke test for one client against one server database:
// pull() must bring remote rows in, push() must send local rows out,
// and neither direction may leak changes it is not responsible for.
#[test]
pub fn test_sync_single_db_simple() {
deterministic_runtime(async || {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
let opts = DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
// populate the server AFTER init so the client starts from an empty schema
protocol
.server
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ())
.await
.unwrap();
protocol
.server
.execute("INSERT INTO t VALUES (1)", ())
.await
.unwrap();
let conn = runner.connect().await.unwrap();
// no table in schema before sync from remote (as DB was initialized when remote was empty)
assert!(matches!(
query_rows(&conn, "SELECT * FROM t").await,
Err(x) if x.to_string().contains("no such table: t")
));
// 1 rows synced
runner.pull().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![vec![turso::Value::Integer(1)]]
);
protocol
.server
.execute("INSERT INTO t VALUES (2)", ())
.await
.unwrap();
conn.execute("INSERT INTO t VALUES (3)", ()).await.unwrap();
// changes are synced from the remote - but remote changes are not propagated locally
runner.push().await.unwrap();
// local still lacks row 2: push is one-directional
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(3)]
]
);
let server_db = protocol.server.db();
let server_conn = server_db.connect().unwrap();
// server has its own row 2 plus the pushed row 3
assert_eq!(
convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap())
.await
.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
]
);
conn.execute("INSERT INTO t VALUES (4)", ()).await.unwrap();
runner.push().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(3)],
vec![turso::Value::Integer(4)]
]
);
assert_eq!(
convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap())
.await
.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
vec![turso::Value::Integer(4)],
]
);
// after pull both sides converge on the same four rows
runner.pull().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
vec![turso::Value::Integer(4)]
]
);
assert_eq!(
convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap())
.await
.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
vec![turso::Value::Integer(4)],
]
);
});
}
// Verifies bidirectional sync() merges both sides, and that a sync with no
// local changes does not hit the push path at all: a fault injected on
// "wal_push_start" must not fire when there is nothing to push.
#[test]
pub fn test_sync_single_db_no_changes_no_push() {
deterministic_runtime(async || {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
let opts = DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
protocol
.server
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ())
.await
.unwrap();
protocol
.server
.execute("INSERT INTO t VALUES (1)", ())
.await
.unwrap();
let conn = runner.connect().await.unwrap();
runner.sync().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![vec![turso::Value::Integer(1)]]
);
// one local and one remote change: sync must merge both
conn.execute("INSERT INTO t VALUES (100)", ())
.await
.unwrap();
protocol
.server
.execute("INSERT INTO t VALUES (2)", ())
.await
.unwrap();
runner.sync().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(100)],
]
);
protocol
.server
.execute("INSERT INTO t VALUES (3)", ())
.await
.unwrap();
runner.sync().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
vec![turso::Value::Integer(100)],
]
);
// from here on, any attempt to start a WAL push fails; the sync below
// must still succeed because the client has no local changes to push
ctx.switch_mode(FaultInjectionStrategy::Enabled {
plan: FaultInjectionPlan {
is_fault: Box::new(|name, _| Box::pin(async move { name == "wal_push_start" })),
},
})
.await;
protocol
.server
.execute("INSERT INTO t VALUES (4)", ())
.await
.unwrap();
runner.sync().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
vec![turso::Value::Integer(4)],
vec![turso::Value::Integer(100)],
]
);
});
}
// Stress test: one client runs sync() in a loop while a second task inserts
// rows concurrently on the same connection. "database is locked" errors from
// the writer are tolerated; anything else fails the test.
#[test]
pub fn test_sync_single_db_update_sync_concurrent() {
deterministic_runtime(async || {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::MemoryIO::new());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
let opts = DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
wal_pull_batch_size: 1,
};
protocol
.server
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ())
.await
.unwrap();
protocol
.server
.execute("INSERT INTO t VALUES ('hello', 'world')", ())
.await
.unwrap();
runner.init(":memory:", opts).await.unwrap();
let conn = runner.connect().await.unwrap();
let syncs = async move {
for i in 0..10 {
tracing::info!("sync attempt #{i}");
runner.sync().await.unwrap();
}
};
let updates = async move {
for i in 0..10 {
tracing::info!("update attempt #{i}");
let sql = format!("INSERT INTO t VALUES ('key-{i}', 'value-{i}')");
match conn.execute(&sql, ()).await {
Ok(_) => {}
// a concurrent sync may hold the write lock; that's expected
Err(err) if err.to_string().contains("database is locked") => {}
Err(err) => panic!("update failed: {err}"),
}
ctx.random_sleep_n(50).await;
}
};
// run both loops concurrently on the deterministic runtime
join!(updates, syncs);
});
}
// Two independent clients sync against the same server while each inserts
// rows concurrently. Sync conflicts (DatabaseSyncEngineConflict) and
// "database is locked" write errors are expected and tolerated.
#[test]
pub fn test_sync_many_dbs_update_sync_concurrent() {
deterministic_runtime(async || {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::MemoryIO::new());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
protocol
.server
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ())
.await
.unwrap();
protocol
.server
.execute(
"INSERT INTO t VALUES ('id-1', 'client1'), ('id-2', 'client2')",
(),
)
.await
.unwrap();
let mut runner1 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
runner1
.init(
":memory:-1",
DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
wal_pull_batch_size: 2,
},
)
.await
.unwrap();
let mut runner2 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
runner2
.init(
":memory:-2",
DatabaseSyncEngineOpts {
client_name: "id-2".to_string(),
wal_pull_batch_size: 2,
},
)
.await
.unwrap();
let conn1 = runner1.connect().await.unwrap();
let conn2 = runner2.connect().await.unwrap();
let syncs1 = async move {
for i in 0..10 {
tracing::info!("sync attempt #{i}");
// conflicts with the other client's pushes are legitimate here
match runner1.sync().await {
Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue,
Err(err) => panic!("unexpected error: {err}"),
}
}
};
let syncs2 = async move {
for i in 0..10 {
tracing::info!("sync attempt #{i}");
match runner2.sync().await {
Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue,
Err(err) => panic!("unexpected error: {err}"),
}
}
};
let ctx1 = ctx.clone();
// each client writes a disjoint key range ('key-1-*' vs 'key-2-*')
let updates1 = async move {
for i in 0..100 {
tracing::info!("update attempt #{i}");
let sql = format!("INSERT INTO t VALUES ('key-1-{i}', 'value')");
match conn1.execute(&sql, ()).await {
Ok(_) => {}
Err(err) if err.to_string().contains("database is locked") => {}
Err(err) => panic!("update failed: {err}"),
}
ctx1.random_sleep_n(10).await;
}
};
let ctx2 = ctx.clone();
let updates2 = async move {
for i in 0..100 {
tracing::info!("update attempt #{i}");
let sql = format!("INSERT INTO t VALUES ('key-2-{i}', 'value')");
match conn2.execute(&sql, ()).await {
Ok(_) => {}
Err(err) if err.to_string().contains("database is locked") => {}
Err(err) => panic!("update failed: {err}"),
}
ctx2.random_sleep_n(10).await;
}
};
join!(updates1, updates2, syncs1, syncs2);
});
}
// Repeatedly inserts random, multi-page blobs on the server and verifies that
// each subsequent sync delivers exactly the expected (key, blob-length) set.
// The BTreeMap keeps `expected` ordered by key, matching SELECT's primary-key
// order.
#[test]
pub fn test_sync_single_db_many_pulls_big_payloads() {
deterministic_runtime(async || {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
let opts = DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
protocol
.server
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)", ())
.await
.unwrap();
runner.sync().await.unwrap();
// create connection in outer scope in order to prevent Database from being dropped in between of pull operations
let conn = runner.connect().await.unwrap();
let mut expected = BTreeMap::new();
for attempt in 0..10 {
for _ in 0..5 {
let key = ctx.rng().await.next_u32();
// blobs up to ~10 DB pages force multi-frame WAL batches
let length = ctx.rng().await.next_u32() % (10 * 4096);
protocol
.server
.execute("INSERT INTO t VALUES (?, randomblob(?))", (key, length))
.await
.unwrap();
expected.insert(key as i64, length as i64);
}
tracing::info!("pull attempt={}", attempt);
runner.sync().await.unwrap();
let expected = expected
.iter()
.map(|(x, y)| vec![turso::Value::Integer(*x), turso::Value::Integer(*y)])
.collect::<Vec<_>>();
// compare lengths rather than blob bytes to keep the assertion cheap
assert_eq!(
query_rows(&conn, "SELECT x, length(y) FROM t")
.await
.unwrap(),
expected
);
}
});
}
// The server checkpoints its WAL between inserts; a later client pull must
// still reconstruct the full dataset even though some changes were moved out
// of the WAL by the checkpoints.
#[test]
pub fn test_sync_single_db_checkpoint() {
deterministic_runtime(async || {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
let opts = DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
protocol
.server
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)", ())
.await
.unwrap();
// multi-page blobs interleaved with checkpoints
protocol
.server
.execute("INSERT INTO t VALUES (1, randomblob(4 * 4096))", ())
.await
.unwrap();
protocol.server.checkpoint().await.unwrap();
protocol
.server
.execute("INSERT INTO t VALUES (2, randomblob(5 * 4096))", ())
.await
.unwrap();
protocol.server.checkpoint().await.unwrap();
// these two inserts stay in the WAL (no checkpoint afterwards)
protocol
.server
.execute("INSERT INTO t VALUES (3, randomblob(6 * 4096))", ())
.await
.unwrap();
protocol
.server
.execute("INSERT INTO t VALUES (4, randomblob(7 * 4096))", ())
.await
.unwrap();
let conn = runner.connect().await.unwrap();
runner.pull().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT x, length(y) FROM t")
.await
.unwrap(),
vec![
vec![turso::Value::Integer(1), turso::Value::Integer(4 * 4096)],
vec![turso::Value::Integer(2), turso::Value::Integer(5 * 4096)],
vec![turso::Value::Integer(3), turso::Value::Integer(6 * 4096)],
vec![turso::Value::Integer(4), turso::Value::Integer(7 * 4096)],
]
);
});
}
// Repeated full sync() calls interleaved with local writes: after every sync
// the local view must contain all rows accumulated so far.
#[test]
pub fn test_sync_single_db_full_syncs() {
deterministic_runtime(async || {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let server = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone());
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
let opts = DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
// schema is created remotely only after the client initialized
server
.server
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ())
.await
.unwrap();
server
.server
.execute("INSERT INTO t VALUES (1)", ())
.await
.unwrap();
let conn = runner.connect().await.unwrap();
// no table in schema before sync from remote (as DB was initialized when remote was empty)
assert!(matches!(
query_rows(&conn, "SELECT * FROM t").await,
Err(x) if x.to_string().contains("no such table: t")
));
runner.sync().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![vec![turso::Value::Integer(1)]]
);
conn.execute("INSERT INTO t VALUES (2)", ()).await.unwrap();
runner.sync().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)]
]
);
conn.execute("INSERT INTO t VALUES (3)", ()).await.unwrap();
runner.sync().await.unwrap();
assert_eq!(
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)]
]
);
});
}
// Eight clients each insert one row and then push concurrently, repeatedly.
// Pushes may conflict, but progress must be monotone: in round `attempt` at
// least `attempt + 1` clients must push successfully.
#[test]
pub fn test_sync_multiple_dbs_conflict() {
deterministic_runtime(async || {
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
let mut dbs = Vec::new();
const CLIENTS: usize = 8;
for i in 0..CLIENTS {
let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
let local_path = dir
.path()
.join(format!("local-{i}.db"))
.to_str()
.unwrap()
.to_string();
let opts = DatabaseSyncEngineOpts {
client_name: format!("id-{i}"),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
dbs.push(runner);
}
protocol
.server
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ())
.await
.unwrap();
// every client pulls the schema, then writes its own distinct row
for db in &mut dbs {
db.pull().await.unwrap();
}
for (i, db) in dbs.iter().enumerate() {
let conn = db.connect().await.unwrap();
conn.execute("INSERT INTO t VALUES (?)", (i as i32,))
.await
.unwrap();
}
// push from all clients concurrently and collect per-client results
let try_sync = || async {
let mut tasks = Vec::new();
for db in &dbs {
tasks.push(async move { db.push().await });
}
futures::future::join_all(tasks).await
};
for attempt in 0..CLIENTS {
let results = try_sync().await;
tracing::info!("attempt #{}: {:?}", attempt, results);
// strictly more successes than the round index: guaranteed progress
assert!(results.iter().filter(|x| x.is_ok()).count() > attempt);
}
});
}
// Ten clients write disjoint key ranges and serialize their pushes behind a
// shared mutex, so no push may conflict; afterwards the server must contain
// exactly the union of all rows in key order.
#[test]
pub fn test_sync_multiple_clients_no_conflicts_synchronized() {
deterministic_runtime(async || {
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
protocol
.server
.execute("CREATE TABLE t(k INTEGER PRIMARY KEY, v)", ())
.await
.unwrap();
// serializes push() across clients, ruling out push conflicts
let sync_lock = Arc::new(tokio::sync::Mutex::new(()));
let mut clients = Vec::new();
const CLIENTS: usize = 10;
let mut expected_rows = Vec::new();
for i in 0..CLIENTS {
let mut queries = Vec::new();
let cnt = ctx.rng().await.next_u32() % CLIENTS as u32 + 1;
for q in 0..cnt {
// key ranges are disjoint per client: client i owns [i*CLIENTS, ...)
let key = i * CLIENTS + q as usize;
let length = ctx.rng().await.next_u32() % 4096;
queries.push(format!(
"INSERT INTO t VALUES ({key}, randomblob({length}))",
));
expected_rows.push(vec![
turso::Value::Integer(key as i64),
turso::Value::Integer(length as i64),
]);
}
clients.push({
let io = io.clone();
let dir = dir.path().to_path_buf().clone();
let ctx = ctx.clone();
let server = protocol.clone();
let sync_lock = sync_lock.clone();
async move {
let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone());
let local_path = dir
.join(format!("local-{i}.db"))
.to_str()
.unwrap()
.to_string();
let opts = DatabaseSyncEngineOpts {
client_name: format!("id-{i}"),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
runner.pull().await.unwrap();
let conn = runner.connect().await.unwrap();
for query in queries {
conn.execute(&query, ()).await.unwrap();
}
// only one client pushes at a time
let guard = sync_lock.lock().await;
runner.push().await.unwrap();
drop(guard);
}
});
}
// clients run sequentially here; the lock still guards the push itself
for client in clients {
client.await;
}
let db = protocol.server.db();
let conn = db.connect().unwrap();
let mut result = conn.query("SELECT k, length(v) FROM t", ()).await.unwrap();
let rows = convert_rows(&mut result).await.unwrap();
assert_eq!(rows, expected_rows);
});
}
// Fault-injection sweep over pull(): the server data exists before each
// client initializes, so even a faulted pull must leave the locally
// bootstrapped rows intact, and a retry without faults must succeed.
// The first session iteration records fault points; each following iteration
// replays one single-shot fault.
#[test]
pub fn test_sync_single_db_sync_from_remote_nothing_single_failure() {
deterministic_runtime(async || {
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
protocol
.server
.execute("CREATE TABLE t(x)", ())
.await
.unwrap();
protocol
.server
.execute("INSERT INTO t VALUES (1), (2), (3)", ())
.await
.unwrap();
let mut session = ctx.fault_session();
let mut it = 0;
while let Some(strategy) = session.next().await {
it += 1;
// fresh client per iteration so faults never leak state across runs
let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
let local_path = dir
.path()
.join(format!("local-{it}.db"))
.to_str()
.unwrap()
.to_string();
let opts = DatabaseSyncEngineOpts {
client_name: format!("id-{it}"),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. });
ctx.switch_mode(strategy).await;
let result = runner.pull().await;
ctx.switch_mode(FaultInjectionStrategy::Disabled).await;
if !has_fault {
result.unwrap();
} else {
// an enabled plan must surface as an error from pull()
let err = result.err().unwrap();
tracing::info!("error after fault injection: {}", err);
}
let conn = runner.connect().await.unwrap();
let rows = query_rows(&conn, "SELECT * FROM t").await.unwrap();
assert_eq!(
rows,
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
]
);
// retry with faults disabled must be a clean no-op
runner.pull().await.unwrap();
let rows = query_rows(&conn, "SELECT * FROM t").await.unwrap();
assert_eq!(
rows,
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
]
);
}
});
}
// Fault-injection sweep over pull() where rows are inserted AFTER the client
// initialized: a faulted pull may leave a partial view (<= 3 rows), but a
// retry with faults disabled must converge to the complete dataset.
#[test]
pub fn test_sync_single_db_sync_from_remote_single_failure() {
deterministic_runtime(async || {
let dir = tempfile::TempDir::new().unwrap();
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let ctx = Arc::new(TestContext::new(seed_u64()));
let mut session = ctx.fault_session();
let mut it = 0;
while let Some(strategy) = session.next().await {
it += 1;
// fresh server AND client per iteration
let server_path = dir.path().join(format!("server-{it}.db"));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
protocol
.server
.execute("CREATE TABLE t(x)", ())
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
let local_path = dir
.path()
.join(format!("local-{it}.db"))
.to_str()
.unwrap()
.to_string();
let opts = DatabaseSyncEngineOpts {
client_name: format!("id-{it}"),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
// rows appear remotely only after init, so pull must transfer them
protocol
.server
.execute("INSERT INTO t VALUES (1), (2), (3)", ())
.await
.unwrap();
let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. });
ctx.switch_mode(strategy).await;
let result = runner.pull().await;
ctx.switch_mode(FaultInjectionStrategy::Disabled).await;
if !has_fault {
result.unwrap();
} else {
let err = result.err().unwrap();
tracing::info!("error after fault injection: {}", err);
}
let conn = runner.connect().await.unwrap();
let rows = query_rows(&conn, "SELECT * FROM t").await.unwrap();
// partial progress is allowed after a fault, but never extra rows
assert!(rows.len() <= 3);
runner.pull().await.unwrap();
let rows = query_rows(&conn, "SELECT * FROM t").await.unwrap();
assert_eq!(
rows,
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
]
);
}
});
}
// Fault-injection sweep over push(): a faulted push may leave the server with
// a partial set of the client's rows (and must never duplicate them), and a
// retry with faults disabled must converge the server to the full dataset.
#[test]
pub fn test_sync_single_db_sync_to_remote_single_failure() {
deterministic_runtime(async || {
let dir = tempfile::TempDir::new().unwrap();
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let ctx = Arc::new(TestContext::new(seed_u64()));
let mut session = ctx.fault_session();
let mut it = 0;
while let Some(strategy) = session.next().await {
it += 1;
let server_path = dir.path().join(format!("server-{it}.db"));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
protocol
.server
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ())
.await
.unwrap();
protocol
.server
.execute("INSERT INTO t VALUES (1)", ())
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
let local_path = dir
.path()
.join(format!("local-{it}.db"))
.to_str()
.unwrap()
.to_string();
let opts = DatabaseSyncEngineOpts {
client_name: format!("id-{it}"),
wal_pull_batch_size: 1,
};
runner.init(&local_path, opts).await.unwrap();
let conn = runner.connect().await.unwrap();
// local-only rows that the faulted push must eventually deliver
conn.execute("INSERT INTO t VALUES (2), (3)", ())
.await
.unwrap();
let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. });
ctx.switch_mode(strategy).await;
let result = runner.push().await;
ctx.switch_mode(FaultInjectionStrategy::Disabled).await;
if !has_fault {
result.unwrap();
} else {
let err = result.err().unwrap();
tracing::info!("error after fault injection: {}", err);
}
let server_db = protocol.server.db();
let server_conn = server_db.connect().unwrap();
let rows =
convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap())
.await
.unwrap();
// server may have received a prefix of the push, never duplicates
assert!(rows.len() <= 3);
runner.push().await.unwrap();
let rows =
convert_rows(&mut server_conn.query("SELECT * FROM t", ()).await.unwrap())
.await
.unwrap();
assert_eq!(
rows,
vec![
vec![turso::Value::Integer(1)],
vec![turso::Value::Integer(2)],
vec![turso::Value::Integer(3)],
]
);
}
});
}
}

View File

@@ -12,8 +12,8 @@ use crate::{
protocol_io::{DataCompletion, DataPollResult, ProtocolIO},
server_proto::{self, ExecuteStreamReq, Stmt, StreamRequest},
types::{
Coro, DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType, DbSyncInfo,
DbSyncStatus, ProtocolCommand,
Coro, DatabaseTapeOperation, DatabaseTapeRowChangeType, DbSyncInfo, DbSyncStatus,
ProtocolCommand,
},
wal_session::WalSession,
Result,

View File

@@ -5,35 +5,15 @@ pub mod database_tape;
pub mod errors;
pub mod io_operations;
pub mod protocol_io;
pub mod server_proto;
pub mod types;
pub mod wal_session;
pub mod server_proto;
#[cfg(test)]
pub mod test_context;
#[cfg(test)]
pub mod test_protocol_io;
#[cfg(test)]
pub mod test_sync_server;
pub type Result<T> = std::result::Result<T, errors::Error>;
#[cfg(test)]
mod tests {
use std::sync::Arc;
use tokio::{select, sync::Mutex};
use tracing_subscriber::EnvFilter;
use turso_core::IO;
use crate::{
database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts},
errors::Error,
test_context::TestContext,
test_protocol_io::TestProtocolIo,
types::{Coro, ProtocolCommand},
Result,
};
#[ctor::ctor]
fn init() {
@@ -43,14 +23,17 @@ mod tests {
.init();
}
#[allow(dead_code)]
/// Numeric form of the SEED env var; non-numeric or missing values become 0.
pub fn seed_u64() -> u64 {
    match seed().parse() {
        Ok(value) => value,
        Err(_) => 0,
    }
}
#[allow(dead_code)]
/// Reads the SEED environment variable controlling test determinism;
/// defaults to "0" when unset.
pub fn seed() -> String {
    match std::env::var("SEED") {
        Ok(value) => value,
        Err(_) => String::from("0"),
    }
}
#[allow(dead_code)]
pub fn deterministic_runtime_from_seed<F: std::future::Future<Output = ()>>(
seed: &[u8],
f: impl Fn() -> F,
@@ -65,107 +48,9 @@ mod tests {
runtime.block_on(f());
}
#[allow(dead_code)]
/// Runs `f` on a deterministic runtime seeded from the SEED env var.
pub fn deterministic_runtime<F: std::future::Future<Output = ()>>(f: impl Fn() -> F) {
    deterministic_runtime_from_seed(seed().as_bytes(), f);
}
/// Test harness owning one client's sync engine; drives its coroutines
/// against a shared in-process sync server.
pub struct TestRunner {
pub ctx: Arc<TestContext>,
pub io: Arc<dyn IO>,
pub sync_server: TestProtocolIo,
// set by `init`; behind a mutex so multiple coroutines can share the engine
db: Option<Arc<Mutex<DatabaseSyncEngine<TestProtocolIo>>>>,
}
impl TestRunner {
/// Creates a runner with no engine attached; call `init` before use.
pub fn new(ctx: Arc<TestContext>, io: Arc<dyn IO>, sync_server: TestProtocolIo) -> Self {
Self {
ctx,
io,
sync_server,
db: None,
}
}
/// Bootstraps the sync engine for `local_path` and stores it in `self.db`.
/// Panics (unwrap) if engine construction fails.
pub async fn init(&mut self, local_path: &str, opts: DatabaseSyncEngineOpts) -> Result<()> {
let io = self.io.clone();
let server = self.sync_server.clone();
let db = self
.run(genawaiter::sync::Gen::new(|coro| async move {
DatabaseSyncEngine::new(&coro, io, Arc::new(server), local_path, opts).await
}))
.await
.unwrap();
self.db = Some(Arc::new(Mutex::new(db)));
Ok(())
}
/// Opens a connection to the locally synced database.
pub async fn connect(&self) -> Result<turso::Connection> {
self.run_db_fn(self.db.as_ref().unwrap(), async move |coro, db| {
Ok(turso::Connection::create(db.connect(coro).await?))
})
.await
}
/// Pulls remote changes into the local database.
pub async fn pull(&self) -> Result<()> {
self.run_db_fn(self.db.as_ref().unwrap(), async move |coro, db| {
db.pull(coro).await
})
.await
}
/// Pushes local changes to the remote server.
pub async fn push(&self) -> Result<()> {
self.run_db_fn(self.db.as_ref().unwrap(), async move |coro, db| {
db.push(coro).await
})
.await
}
/// Runs a full bidirectional sync.
pub async fn sync(&self) -> Result<()> {
self.run_db_fn(self.db.as_ref().unwrap(), async move |coro, db| {
db.sync(coro).await
})
.await
}
/// Wraps `f` in a coroutine holding the engine lock and drives it via `run`.
pub async fn run_db_fn<T>(
&self,
db: &Arc<Mutex<DatabaseSyncEngine<TestProtocolIo>>>,
f: impl AsyncFn(&Coro, &mut DatabaseSyncEngine<TestProtocolIo>) -> Result<T>,
) -> Result<T> {
let g = genawaiter::sync::Gen::new({
let db = db.clone();
|coro| async move {
let mut db = db.lock().await;
f(&coro, &mut db).await
}
});
self.run(g).await
}
/// Drives a sync-engine coroutine to completion, servicing its IO yields
/// and injecting random sleeps so concurrent runners interleave.
pub async fn run<T, F: std::future::Future<Output = Result<T>>>(
&self,
mut g: genawaiter::sync::Gen<ProtocolCommand, Result<()>, F>,
) -> Result<T> {
let mut response = Ok(());
loop {
// we must drive internal tokio clocks on every iteration - otherwise one TestRunner without work can block everything
// if other TestRunner sleeping - as time will "freeze" in this case
self.ctx.random_sleep().await;
match g.resume_with(response) {
genawaiter::GeneratorState::Complete(result) => return result,
genawaiter::GeneratorState::Yielded(ProtocolCommand::IO) => {
// take ownership of all pending protocol request tasks
let drained = {
let mut requests = self.sync_server.requests.lock().unwrap();
requests.drain(..).collect::<Vec<_>>()
};
for mut request in drained {
// either the request finishes now, or it is requeued after
// a random delay to simulate slow network completion
select! {
value = &mut request => { value.unwrap(); },
_ = self.ctx.random_sleep() => { self.sync_server.requests.lock().unwrap().push(request); }
};
}
response =
self.io.run_once().map(|_| ()).map_err(|e| {
Error::DatabaseSyncEngineError(format!("io error: {e}"))
});
}
}
}
}
}
}

View File

@@ -1,154 +0,0 @@
use std::{
collections::{HashMap, HashSet},
future::Future,
pin::Pin,
sync::Arc,
};
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use tokio::sync::Mutex;
use crate::{errors::Error, Result};
/// Boxed future type returned by fault-injection predicates.
type PinnedFuture = Pin<Box<dyn Future<Output = bool> + Send>>;
/// A fault-injection plan: `is_fault(name, backtrace)` asynchronously decides
/// whether a given faulty call site should fail.
pub struct FaultInjectionPlan {
pub is_fault: Box<dyn Fn(String, String) -> PinnedFuture + Send + Sync>,
}
/// Controls how `TestContext::faulty_call` behaves.
pub enum FaultInjectionStrategy {
// every faulty call succeeds
Disabled,
// faulty calls succeed but their (name, backtrace) sites are recorded
Record,
// faulty calls fail whenever the plan's predicate returns true
Enabled { plan: FaultInjectionPlan },
}
impl std::fmt::Debug for FaultInjectionStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Plans are opaque closures, so `Enabled` prints without its payload.
        let label = match self {
            Self::Disabled => "Disabled",
            Self::Record => "Record",
            Self::Enabled { .. } => "Enabled",
        };
        f.write_str(label)
    }
}
/// Shared deterministic-test state: fault-injection mode, the set of faulty
/// call sites recorded so far, and a seeded RNG driving all randomness.
pub struct TestContext {
fault_injection: Mutex<FaultInjectionStrategy>,
// (name, backtrace) pairs captured while in Record mode
faulty_call: Mutex<HashSet<(String, String)>>,
rng: Mutex<ChaCha8Rng>,
}
/// Iterator-like sweep over fault scenarios: yields one `Record` pass first,
/// then one `Enabled` pass per recorded fault plan.
pub struct FaultSession {
ctx: Arc<TestContext>,
// true once the initial Record pass has been handed out
recording: bool,
// lazily filled from ctx.enumerate_simple_plans()
plans: Option<Vec<FaultInjectionPlan>>,
}
impl FaultSession {
    /// Yields the next strategy: a single `Record` pass first, then one
    /// `Enabled` pass per recorded plan, then `None` when plans run out.
    pub async fn next(&mut self) -> Option<FaultInjectionStrategy> {
        // First call: hand out the recording pass.
        if !self.recording {
            self.recording = true;
            return Some(FaultInjectionStrategy::Record);
        }
        // Lazily materialize the plans enumerated from the recorded sites.
        if self.plans.is_none() {
            self.plans = Some(self.ctx.enumerate_simple_plans().await);
        }
        let plan = self.plans.as_mut().and_then(Vec::pop)?;
        Some(FaultInjectionStrategy::Enabled { plan })
    }
}
impl TestContext {
/// Creates a context with a ChaCha8 RNG seeded from `seed` and fault
/// injection disabled.
pub fn new(seed: u64) -> Self {
Self {
rng: Mutex::new(ChaCha8Rng::seed_from_u64(seed)),
fault_injection: Mutex::new(FaultInjectionStrategy::Disabled),
faulty_call: Mutex::new(HashSet::new()),
}
}
/// Sleeps a deterministic pseudo-random 0..1000 ms.
pub async fn random_sleep(&self) {
let delay = self.rng.lock().await.next_u64() % 1000;
tokio::time::sleep(std::time::Duration::from_millis(delay)).await
}
/// Like `random_sleep`, but scaled by a random factor in 1..=n.
pub async fn random_sleep_n(&self, n: u64) {
let delay = {
let mut rng = self.rng.lock().await;
rng.next_u64() % 1000 * (rng.next_u64() % n + 1)
};
tokio::time::sleep(std::time::Duration::from_millis(delay)).await
}
/// Direct access to the seeded RNG.
pub async fn rng(&self) -> tokio::sync::MutexGuard<ChaCha8Rng> {
self.rng.lock().await
}
/// Starts a fault-scenario sweep over this context.
pub fn fault_session(self: &Arc<Self>) -> FaultSession {
FaultSession {
ctx: self.clone(),
recording: false,
plans: None,
}
}
/// Replaces the current fault-injection mode.
pub async fn switch_mode(&self, updated: FaultInjectionStrategy) {
let mut mode = self.fault_injection.lock().await;
tracing::info!("switch fault injection mode: {:?}", updated);
*mode = updated;
}
/// Builds one single-shot plan per recorded call site: each plan fails
/// exactly the first call matching its (name, backtrace) and nothing else.
pub async fn enumerate_simple_plans(&self) -> Vec<FaultInjectionPlan> {
let mut plans = vec![];
for call in self.faulty_call.lock().await.iter() {
let mut fault_counts = HashMap::new();
fault_counts.insert(call.clone(), 1);
// remaining fault budget for this call site (starts at 1)
let count = Arc::new(Mutex::new(1));
let call = call.clone();
plans.push(FaultInjectionPlan {
is_fault: Box::new(move |name, bt| {
let call = call.clone();
let count = count.clone();
Box::pin(async move {
if (name, bt) != call {
return false;
}
let mut count = count.lock().await;
*count -= 1;
// fault only while the budget has not gone negative
*count >= 0
})
}),
})
}
plans
}
/// A potential fault point. Depending on the current mode this succeeds,
/// records the call site, or fails with an injected error.
pub async fn faulty_call(&self, name: &str) -> Result<()> {
tracing::trace!("faulty_call: {}", name);
// sleep here in order for scheduler to interleave different executions
self.random_sleep().await;
// fast path avoids the expensive backtrace capture below; the lock is
// released before the backtrace is taken
if let FaultInjectionStrategy::Disabled = &*self.fault_injection.lock().await {
return Ok(());
}
let bt = std::backtrace::Backtrace::force_capture().to_string();
match &mut *self.fault_injection.lock().await {
FaultInjectionStrategy::Record => {
let mut call_sites = self.faulty_call.lock().await;
call_sites.insert((name.to_string(), bt));
Ok(())
}
FaultInjectionStrategy::Enabled { plan } => {
if plan.is_fault.as_ref()(name.to_string(), bt.clone()).await {
Err(Error::DatabaseSyncEngineError("injected fault".to_string()))
} else {
Ok(())
}
}
// NOTE(review): assumes the mode did not switch to Disabled between
// the two lock acquisitions above
_ => unreachable!("Disabled case handled above"),
}
}
}

View File

@@ -1,222 +0,0 @@
use std::{
collections::{HashMap, VecDeque},
path::Path,
pin::Pin,
sync::Arc,
};
use tokio::{sync::Mutex, task::JoinHandle};
use crate::{
errors::Error,
protocol_io::{DataCompletion, DataPollResult, ProtocolIO},
test_context::TestContext,
test_sync_server::TestSyncServer,
Result,
};
/// In-memory `ProtocolIO` implementation for tests: HTTP calls are dispatched
/// to an embedded [`TestSyncServer`] and "file" reads/writes go to an
/// in-memory map instead of disk.
#[derive(Clone)]
pub struct TestProtocolIo {
    // Join handles of all spawned request tasks, kept so tests can await them.
    #[allow(clippy::type_complexity)]
    pub requests: Arc<std::sync::Mutex<Vec<Pin<Box<JoinHandle<()>>>>>>,
    pub server: TestSyncServer,
    // Shared test context (deterministic RNG + fault injection).
    ctx: Arc<TestContext>,
    // Fake filesystem: path -> file contents.
    files: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}
/// One chunk of response body handed back from a `poll_data` call.
pub struct TestDataPollResult(Vec<u8>);
impl DataPollResult for TestDataPollResult {
    fn data(&self) -> &[u8] {
        &self.0
    }
}
/// Shared, mutable state of one in-flight test request; cloned between the
/// background request task (producer) and the sync engine polling it
/// (consumer).
#[derive(Clone)]
pub struct TestDataCompletion {
    // Response status code, set once known.
    status: Arc<std::sync::Mutex<Option<u16>>>,
    // FIFO of body chunks not yet consumed via `poll_data`.
    chunks: Arc<std::sync::Mutex<VecDeque<Vec<u8>>>>,
    // True once the producer has pushed all data.
    done: Arc<std::sync::Mutex<bool>>,
    // Set when the request task failed; makes every accessor return an error.
    poisoned: Arc<std::sync::Mutex<Option<String>>>,
}
impl Default for TestDataCompletion {
    /// Equivalent to [`TestDataCompletion::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl TestDataCompletion {
pub fn new() -> Self {
Self {
status: Arc::new(std::sync::Mutex::new(None)),
chunks: Arc::new(std::sync::Mutex::new(VecDeque::new())),
done: Arc::new(std::sync::Mutex::new(false)),
poisoned: Arc::new(std::sync::Mutex::new(None)),
}
}
pub fn set_status(&self, status: u16) {
*self.status.lock().unwrap() = Some(status);
}
pub fn push_data(&self, data: Vec<u8>) {
let mut chunks = self.chunks.lock().unwrap();
chunks.push_back(data);
}
pub fn set_done(&self) {
*self.done.lock().unwrap() = true;
}
pub fn poison(&self, err: &str) {
*self.poisoned.lock().unwrap() = Some(err.to_string());
}
}
impl DataCompletion for TestDataCompletion {
type DataPollResult = TestDataPollResult;
fn status(&self) -> Result<Option<u16>> {
let poison = self.poisoned.lock().unwrap();
if poison.is_some() {
return Err(Error::DatabaseSyncEngineError(format!(
"status error: {poison:?}"
)));
}
Ok(*self.status.lock().unwrap())
}
fn poll_data(&self) -> Result<Option<Self::DataPollResult>> {
let poison = self.poisoned.lock().unwrap();
if poison.is_some() {
return Err(Error::DatabaseSyncEngineError(format!(
"poll_data error: {poison:?}"
)));
}
let mut chunks = self.chunks.lock().unwrap();
Ok(chunks.pop_front().map(TestDataPollResult))
}
fn is_done(&self) -> Result<bool> {
let poison = self.poisoned.lock().unwrap();
if poison.is_some() {
return Err(Error::DatabaseSyncEngineError(format!(
"is_done error: {poison:?}"
)));
}
Ok(*self.done.lock().unwrap())
}
}
impl TestProtocolIo {
    /// Build a protocol stub backed by a fresh [`TestSyncServer`] at `path`
    /// and an empty in-memory file map.
    pub async fn new(ctx: Arc<TestContext>, path: &Path) -> Result<Self> {
        let server = TestSyncServer::new(ctx.clone(), path).await?;
        Ok(Self {
            ctx,
            requests: Arc::new(std::sync::Mutex::new(Vec::new())),
            server,
            files: Arc::new(Mutex::new(HashMap::new())),
        })
    }
    /// Run `f` against the embedded server on a background task; any error it
    /// returns poisons `completion` so pollers observe the failure.
    fn schedule<
        Fut: std::future::Future<Output = Result<()>> + Send + 'static,
        F: FnOnce(TestSyncServer, TestDataCompletion) -> Fut + Send + 'static,
    >(
        &self,
        completion: TestDataCompletion,
        f: F,
    ) {
        let server = self.server.clone();
        let handle = tokio::spawn(async move {
            let result = f(server, completion.clone()).await;
            if let Err(err) = result {
                tracing::info!("poison completion: {}", err);
                completion.poison(&err.to_string());
            }
        });
        self.requests.lock().unwrap().push(Box::pin(handle));
    }
}
/// Routes sync-protocol HTTP calls to the embedded [`TestSyncServer`] and
/// serves "file" reads/writes from the in-memory map.
impl ProtocolIO for TestProtocolIo {
    type DataCompletion = TestDataCompletion;
    /// Parse `method` + `path` and dispatch to the matching server endpoint on
    /// a background task; the returned completion is filled asynchronously.
    /// Panics on an unrecognized route (test-only code).
    fn http(&self, method: &str, path: &str, data: Option<Vec<u8>>) -> Result<TestDataCompletion> {
        let completion = TestDataCompletion::new();
        {
            let completion = completion.clone();
            // Drop the leading '/' and split the path into segments.
            let path = &path[1..].split("/").collect::<Vec<_>>();
            match (method, path.as_slice()) {
                ("GET", ["info"]) => {
                    self.schedule(completion, |s, c| async move { s.db_info(c).await });
                }
                ("GET", ["export", generation]) => {
                    let generation = generation.parse().unwrap();
                    self.schedule(completion, async move |s, c| {
                        s.db_export(c, generation).await
                    });
                }
                ("GET", ["sync", generation, start, end]) => {
                    let generation = generation.parse().unwrap();
                    let start = start.parse().unwrap();
                    let end = end.parse().unwrap();
                    self.schedule(completion, async move |s, c| {
                        s.wal_pull(c, generation, start, end).await
                    });
                }
                // Push without a baton: server starts a fresh session.
                ("POST", ["sync", generation, start, end]) => {
                    let generation = generation.parse().unwrap();
                    let start = start.parse().unwrap();
                    let end = end.parse().unwrap();
                    let data = data.unwrap();
                    self.schedule(completion, async move |s, c| {
                        s.wal_push(c, None, generation, start, end, data).await
                    });
                }
                // Push continuing an existing session identified by `baton`.
                ("POST", ["sync", generation, start, end, baton]) => {
                    let baton = baton.to_string();
                    let generation = generation.parse().unwrap();
                    let start = start.parse().unwrap();
                    let end = end.parse().unwrap();
                    let data = data.unwrap();
                    self.schedule(completion, async move |s, c| {
                        s.wal_push(c, Some(baton), generation, start, end, data)
                            .await
                    });
                }
                _ => panic!("unexpected sync server request: {method} {path:?}"),
            };
        }
        Ok(completion)
    }
    /// Read the whole "file" at `path` from the in-memory map; missing files
    /// yield an empty byte vector rather than an error.
    fn full_read(&self, path: &str) -> Result<Self::DataCompletion> {
        let completion = TestDataCompletion::new();
        let ctx = self.ctx.clone();
        let files = self.files.clone();
        let path = path.to_string();
        self.schedule(completion.clone(), async move |_, c| {
            ctx.faulty_call("full_read_start").await?;
            let files = files.lock().await;
            let result = files.get(&path);
            c.push_data(result.cloned().unwrap_or(Vec::new()));
            ctx.faulty_call("full_read_end").await?;
            c.set_done();
            Ok(())
        });
        Ok(completion)
    }
    /// Replace the whole "file" at `path` in the in-memory map with `content`.
    fn full_write(&self, path: &str, content: Vec<u8>) -> Result<Self::DataCompletion> {
        let completion = TestDataCompletion::new();
        let ctx = self.ctx.clone();
        let files = self.files.clone();
        let path = path.to_string();
        self.schedule(completion.clone(), async move |_, c| {
            ctx.faulty_call("full_write_start").await?;
            let mut files = files.lock().await;
            files.insert(path, content);
            ctx.faulty_call("full_write_end").await?;
            c.set_done();
            Ok(())
        });
        Ok(completion)
    }
}

View File

@@ -1,351 +0,0 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::Mutex;
use crate::{
errors::Error,
test_context::TestContext,
test_protocol_io::TestDataCompletion,
types::{DbSyncInfo, DbSyncStatus},
Result,
};
// Fixed database page size used by the test server.
const PAGE_SIZE: usize = 4096;
// A WAL frame is a 24-byte frame header followed by one page of data.
const FRAME_SIZE: usize = 24 + PAGE_SIZE;
/// One server-side "generation": a full database snapshot plus the WAL frames
/// accumulated on top of it since the last checkpoint.
struct Generation {
    snapshot: Vec<u8>,
    frames: Vec<Vec<u8>>,
}
/// State of one WAL push session, identified by its `baton`.
#[derive(Clone)]
struct SyncSession {
    baton: String,
    conn: turso::Connection,
    // True while a wal_insert transaction is open on `conn`.
    in_txn: bool,
}
/// Mutable server state, guarded by a single async mutex.
struct TestSyncServerState {
    // Id of the current (latest) generation.
    generation: u64,
    generations: HashMap<u64, Generation>,
    // Active push sessions keyed by baton.
    sessions: HashMap<String, SyncSession>,
}
/// Minimal in-process stand-in for the remote sync server, backed by a real
/// local turso database file at `path`.
#[derive(Clone)]
pub struct TestSyncServer {
    path: PathBuf,
    ctx: Arc<TestContext>,
    db: turso::Database,
    state: Arc<Mutex<TestSyncServerState>>,
}
impl TestSyncServer {
    /// Create a server rooted at `path`, with generation 1 initialized from
    /// the bundled empty WAL-mode database snapshot.
    pub async fn new(ctx: Arc<TestContext>, path: &Path) -> Result<Self> {
        let mut generations = HashMap::new();
        generations.insert(
            1,
            Generation {
                snapshot: EMPTY_WAL_MODE_DB.to_vec(),
                frames: Vec::new(),
            },
        );
        Ok(Self {
            path: path.to_path_buf(),
            ctx,
            db: turso::Builder::new_local(path.to_str().unwrap())
                .build()
                .await?,
            state: Arc::new(Mutex::new(TestSyncServerState {
                generation: 1,
                generations,
                sessions: HashMap::new(),
            })),
        })
    }
    /// `GET /info`: report the current generation as JSON `DbSyncInfo`.
    pub async fn db_info(&self, completion: TestDataCompletion) -> Result<()> {
        tracing::debug!("db_info");
        self.ctx.faulty_call("db_info_start").await?;
        let state = self.state.lock().await;
        let result = DbSyncInfo {
            current_generation: state.generation,
        };
        completion.set_status(200);
        self.ctx.faulty_call("db_info_status").await?;
        completion.push_data(serde_json::to_vec(&result)?);
        self.ctx.faulty_call("db_info_data").await?;
        completion.set_done();
        Ok(())
    }
    /// `GET /export/<generation>`: stream the snapshot of the requested
    /// generation, or fail if that generation does not exist.
    pub async fn db_export(
        &self,
        completion: TestDataCompletion,
        generation_id: u64,
    ) -> Result<()> {
        tracing::debug!("db_export: {}", generation_id);
        self.ctx.faulty_call("db_export_start").await?;
        let state = self.state.lock().await;
        let Some(generation) = state.generations.get(&generation_id) else {
            return Err(Error::DatabaseSyncEngineError(
                "generation not found".to_string(),
            ));
        };
        completion.set_status(200);
        self.ctx.faulty_call("db_export_status").await?;
        completion.push_data(generation.snapshot.clone());
        self.ctx.faulty_call("db_export_push").await?;
        completion.set_done();
        Ok(())
    }
    /// `GET /sync/<generation>/<start>/<end>`: return the concatenated WAL
    /// frames in `start_frame..end_frame` (frame numbers are 1-based).
    ///
    /// If no frames are available, answers 400 with a `checkpoint_needed`
    /// status pointing the client at the latest generation.
    pub async fn wal_pull(
        &self,
        completion: TestDataCompletion,
        generation_id: u64,
        start_frame: u64,
        end_frame: u64,
    ) -> Result<()> {
        tracing::debug!("wal_pull: {}/{}/{}", generation_id, start_frame, end_frame);
        self.ctx.faulty_call("wal_pull_start").await?;
        let state = self.state.lock().await;
        let Some(generation) = state.generations.get(&generation_id) else {
            return Err(Error::DatabaseSyncEngineError(
                "generation not found".to_string(),
            ));
        };
        let mut data = Vec::new();
        for frame_no in start_frame..end_frame {
            // Frame numbers are 1-based; the frames vec is 0-based.
            let frame_idx = frame_no - 1;
            let Some(frame) = generation.frames.get(frame_idx as usize) else {
                break;
            };
            data.extend_from_slice(frame);
        }
        if data.is_empty() {
            let last_generation = state.generations.get(&state.generation).unwrap();
            let status = DbSyncStatus {
                baton: None,
                status: "checkpoint_needed".to_string(),
                generation: state.generation,
                max_frame_no: last_generation.frames.len() as u64,
            };
            completion.set_status(400);
            self.ctx.faulty_call("wal_pull_400_status").await?;
            completion.push_data(serde_json::to_vec(&status)?);
            self.ctx.faulty_call("wal_pull_400_push").await?;
            completion.set_done();
        } else {
            completion.set_status(200);
            self.ctx.faulty_call("wal_pull_200_status").await?;
            completion.push_data(data);
            self.ctx.faulty_call("wal_pull_200_push").await?;
            completion.set_done();
        };
        Ok(())
    }
    /// `POST /sync/<generation>/<start>/<end>[/<baton>]`: apply pushed WAL
    /// frames through a per-baton session connection.
    ///
    /// Replies (as JSON `DbSyncStatus`, always HTTP 200):
    /// - `checkpoint_needed` if the client's generation is stale,
    /// - `conflict` if the WAL insert hit a conflict,
    /// - `ok` otherwise.
    pub async fn wal_push(
        &self,
        completion: TestDataCompletion,
        mut baton: Option<String>,
        generation_id: u64,
        start_frame: u64,
        end_frame: u64,
        frames: Vec<u8>,
    ) -> Result<()> {
        tracing::debug!(
            "wal_push: {}/{}/{}/{:?}",
            generation_id,
            start_frame,
            end_frame,
            baton
        );
        self.ctx.faulty_call("wal_push_start").await?;
        // Look up (or create) the session while holding the state lock; the
        // lock is released before applying frames below.
        let mut session = {
            let mut state = self.state.lock().await;
            if state.generation != generation_id {
                // Client pushes against a stale generation: tell it to
                // re-sync first.
                let generation = state.generations.get(&state.generation).unwrap();
                let max_frame_no = generation.frames.len();
                let status = DbSyncStatus {
                    baton: None,
                    status: "checkpoint_needed".to_string(),
                    generation: state.generation,
                    max_frame_no: max_frame_no as u64,
                };
                let status = serde_json::to_vec(&status)?;
                completion.set_status(200);
                self.ctx.faulty_call("wal_push_status").await?;
                completion.push_data(status);
                self.ctx.faulty_call("wal_push_push").await?;
                completion.set_done();
                return Ok(());
            }
            let baton_str = baton.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
            let session = match state.sessions.get(&baton_str) {
                Some(session) => session.clone(),
                None => {
                    let session = SyncSession {
                        baton: baton_str.clone(),
                        conn: self.db.connect()?,
                        in_txn: false,
                    };
                    state.sessions.insert(baton_str.clone(), session.clone());
                    session
                }
            };
            baton = Some(baton_str.clone());
            session
        };
        // Apply each pushed frame; the labeled block yields `true` iff the
        // push ended in a WAL conflict.
        let conflict = 'conflict: {
            let mut offset = 0;
            for frame_no in start_frame..end_frame {
                if offset + FRAME_SIZE > frames.len() {
                    return Err(Error::DatabaseSyncEngineError(
                        "unexpected length of frames data".to_string(),
                    ));
                }
                if !session.in_txn {
                    session.conn.wal_insert_begin()?;
                    session.in_txn = true;
                }
                let frame = &frames[offset..offset + FRAME_SIZE];
                match session.conn.wal_insert_frame(frame_no, frame) {
                    Ok(info) => {
                        if info.is_commit_frame() {
                            // Commit frame closes the insert txn and publishes
                            // the new frames into the current generation.
                            if session.in_txn {
                                session.conn.wal_insert_end()?;
                                session.in_txn = false;
                            }
                            self.sync_frames_from_conn(&session.conn).await?;
                        }
                    }
                    Err(turso::Error::WalOperationError(err)) if err.contains("Conflict") => {
                        session.conn.wal_insert_end()?;
                        break 'conflict true;
                    }
                    Err(err) => {
                        session.conn.wal_insert_end()?;
                        return Err(err.into());
                    }
                }
                offset += FRAME_SIZE;
            }
            false
        };
        // Persist the (possibly mutated) session state back into the map.
        let mut state = self.state.lock().await;
        state
            .sessions
            .insert(baton.clone().unwrap(), session.clone());
        let status = DbSyncStatus {
            baton: Some(session.baton.clone()),
            status: if conflict { "conflict" } else { "ok" }.into(),
            generation: state.generation,
            max_frame_no: session.conn.wal_frame_count()?,
        };
        let status = serde_json::to_vec(&status)?;
        completion.set_status(200);
        self.ctx.faulty_call("wal_push_status").await?;
        completion.push_data(status);
        self.ctx.faulty_call("wal_push_push").await?;
        completion.set_done();
        Ok(())
    }
    /// Handle to the server's underlying database.
    pub fn db(&self) -> turso::Database {
        self.db.clone()
    }
    /// Truncate-checkpoint the server database and start a new generation
    /// whose snapshot is the current on-disk database file.
    pub async fn checkpoint(&self) -> Result<()> {
        tracing::debug!("checkpoint sync-server db");
        let conn = self.db.connect()?;
        let mut rows = conn.query("PRAGMA wal_checkpoint(TRUNCATE)", ()).await?;
        let Some(_) = rows.next().await? else {
            return Err(Error::DatabaseSyncEngineError(
                "checkpoint must return single row".to_string(),
            ));
        };
        let mut state = self.state.lock().await;
        let generation = state.generation + 1;
        state.generation = generation;
        state.generations.insert(
            generation,
            Generation {
                snapshot: std::fs::read(&self.path).map_err(|e| {
                    Error::DatabaseSyncEngineError(format!(
                        "failed to create generation snapshot: {e}"
                    ))
                })?,
                frames: Vec::new(),
            },
        );
        Ok(())
    }
    /// Execute `sql` directly on the server database and record any WAL
    /// frames it produced into the current generation.
    pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<()> {
        let conn = self.db.connect()?;
        conn.execute(sql, params).await?;
        tracing::debug!("sync_frames_from_conn after execute");
        self.sync_frames_from_conn(&conn).await?;
        Ok(())
    }
    /// Copy any WAL frames present on `conn` but not yet recorded into the
    /// current generation's frame list.
    async fn sync_frames_from_conn(&self, conn: &turso::Connection) -> Result<()> {
        let mut state = self.state.lock().await;
        let generation = state.generation;
        let generation = state.generations.get_mut(&generation).unwrap();
        // First frame number not captured yet (frame numbers are 1-based).
        let last_frame = generation.frames.len() + 1;
        let mut frame = [0u8; FRAME_SIZE];
        let wal_frame_count = conn.wal_frame_count()?;
        tracing::debug!("conn frames count: {}", wal_frame_count);
        for frame_no in last_frame..=wal_frame_count as usize {
            let frame_info = conn.wal_get_frame(frame_no as u64, &mut frame)?;
            tracing::debug!("push local frame {}, info={:?}", frame_no, frame_info);
            generation.frames.push(frame.to_vec());
        }
        Ok(())
    }
}
// Asset: an empty database with a single 4096-byte page, already switched to
// WAL journal mode (PRAGMA journal_mode=WAL); used as generation 1's snapshot.
// See test test_empty_wal_mode_db_content which validates asset content.
pub const EMPTY_WAL_MODE_DB: &[u8] = include_bytes!("test_empty.db");
/// Drain `rows` into a plain vector of value vectors, one entry per row.
pub async fn convert_rows(rows: &mut turso::Rows) -> Result<Vec<Vec<turso::Value>>> {
    let mut collected = Vec::new();
    while let Some(row) = rows.next().await? {
        let values = (0..row.column_count())
            .map(|col| row.get_value(col))
            .collect::<std::result::Result<Vec<_>, _>>()?;
        collected.push(values);
    }
    Ok(collected)
}