mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-21 17:14:19 +01:00
fix bug and add test with concurrent dbs in sync
This commit is contained in:
@@ -456,6 +456,7 @@ pub mod tests {
|
||||
|
||||
use crate::{
|
||||
database_sync_engine::DatabaseSyncEngineOpts,
|
||||
errors::Error,
|
||||
test_context::{FaultInjectionStrategy, TestContext},
|
||||
test_protocol_io::TestProtocolIo,
|
||||
test_sync_server::convert_rows,
|
||||
@@ -645,6 +646,107 @@ pub mod tests {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_sync_many_dbs_update_sync_concurrent() {
|
||||
deterministic_runtime(async || {
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::MemoryIO::new());
|
||||
let dir = tempfile::TempDir::new().unwrap();
|
||||
let server_path = dir.path().join("server.db");
|
||||
let ctx = Arc::new(TestContext::new(seed_u64()));
|
||||
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
|
||||
.await
|
||||
.unwrap();
|
||||
protocol
|
||||
.server
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ())
|
||||
.await
|
||||
.unwrap();
|
||||
protocol
|
||||
.server
|
||||
.execute(
|
||||
"INSERT INTO t VALUES ('id-1', 'client1'), ('id-2', 'client2')",
|
||||
(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut runner1 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
|
||||
runner1
|
||||
.init(
|
||||
":memory:-1",
|
||||
DatabaseSyncEngineOpts {
|
||||
client_name: "id-1".to_string(),
|
||||
wal_pull_batch_size: 2,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut runner2 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
|
||||
runner2
|
||||
.init(
|
||||
":memory:-2",
|
||||
DatabaseSyncEngineOpts {
|
||||
client_name: "id-2".to_string(),
|
||||
wal_pull_batch_size: 2,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let conn1 = runner1.connect().await.unwrap();
|
||||
let conn2 = runner2.connect().await.unwrap();
|
||||
|
||||
let syncs1 = async move {
|
||||
for i in 0..10 {
|
||||
tracing::info!("sync attempt #{i}");
|
||||
match runner1.sync().await {
|
||||
Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue,
|
||||
Err(err) => panic!("unexpected error: {err}"),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let syncs2 = async move {
|
||||
for i in 0..10 {
|
||||
tracing::info!("sync attempt #{i}");
|
||||
match runner2.sync().await {
|
||||
Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue,
|
||||
Err(err) => panic!("unexpected error: {err}"),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let ctx1 = ctx.clone();
|
||||
let updates1 = async move {
|
||||
for i in 0..100 {
|
||||
tracing::info!("update attempt #{i}");
|
||||
let sql = format!("INSERT INTO t VALUES ('key-1-{i}', 'value')");
|
||||
match conn1.execute(&sql, ()).await {
|
||||
Ok(_) => {}
|
||||
Err(err) if err.to_string().contains("database is locked") => {}
|
||||
Err(err) => panic!("update failed: {err}"),
|
||||
}
|
||||
ctx1.random_sleep_n(10).await;
|
||||
}
|
||||
};
|
||||
|
||||
let ctx2 = ctx.clone();
|
||||
let updates2 = async move {
|
||||
for i in 0..100 {
|
||||
tracing::info!("update attempt #{i}");
|
||||
let sql = format!("INSERT INTO t VALUES ('key-2-{i}', 'value')");
|
||||
match conn2.execute(&sql, ()).await {
|
||||
Ok(_) => {}
|
||||
Err(err) if err.to_string().contains("database is locked") => {}
|
||||
Err(err) => panic!("update failed: {err}"),
|
||||
}
|
||||
ctx2.random_sleep_n(10).await;
|
||||
}
|
||||
};
|
||||
|
||||
join!(updates1, updates2, syncs1, syncs2);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_sync_single_db_many_pulls_big_payloads() {
|
||||
deterministic_runtime(async || {
|
||||
|
||||
@@ -179,6 +179,10 @@ pub async fn wal_pull<'a, C: ProtocolIO, U: AsyncFnMut(&'a Coro, u64) -> Result<
|
||||
}
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
}
|
||||
if start_frame < end_frame {
|
||||
// chunk which was sent from the server has ended early - so there is nothing left on server-side for pull
|
||||
return Ok(WalPullResult::Done);
|
||||
}
|
||||
if !buffer.is_empty() {
|
||||
return Err(Error::DatabaseSyncEngineError(format!(
|
||||
"wal_pull: response has unexpected trailing data: buffer.len()={}",
|
||||
|
||||
Reference in New Issue
Block a user