mirror of https://github.com/aljazceru/turso.git (synced 2026-01-07 10:14:21 +01:00)
open draft database only once
- otherwise there will be problems with the memory IO backend
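The diff below boils down to one pattern: open the draft tape once in DatabaseSyncEngine::new, keep the handle in the new draft_tape field, and let connect(), the pull path and the push path reuse that handle instead of calling io.open_tape(&self.draft_path, ...) again. What follows is a minimal, hypothetical sketch of that "open once, share the handle" shape; MemoryIo, open_tape and SyncEngine are simplified stand-ins, not the real turso_core or sync-engine types.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for an in-memory IO backend: every "file" lives only
// inside this map, keyed by path.
#[derive(Default)]
struct MemoryIo {
    files: Mutex<HashMap<String, Arc<Mutex<Vec<u8>>>>>,
}

impl MemoryIo {
    // Open (or create) the buffer behind `path` and hand back a shared handle to it.
    fn open_tape(&self, path: &str) -> Arc<Mutex<Vec<u8>>> {
        self.files
            .lock()
            .unwrap()
            .entry(path.to_string())
            .or_default()
            .clone()
    }
}

// Hypothetical stand-in for DatabaseSyncEngine: the draft handle is opened once
// in `new` and stored, so later operations never reopen by path.
struct SyncEngine {
    draft_path: String,
    draft_tape: Arc<Mutex<Vec<u8>>>,
}

impl SyncEngine {
    fn new(io: &MemoryIo, path: &str) -> Self {
        let draft_path = format!("{path}-draft");
        let draft_tape = io.open_tape(&draft_path); // the single open
        Self {
            draft_path,
            draft_tape,
        }
    }

    // Before the change this would have been another `io.open_tape(&self.draft_path)`;
    // now it simply clones the shared handle.
    fn connect(&self) -> Arc<Mutex<Vec<u8>>> {
        Arc::clone(&self.draft_tape)
    }
}

fn main() {
    let io = MemoryIo::default();
    let engine = SyncEngine::new(&io, "local.db");

    // Two "connections" observe the same underlying draft state because they
    // share the one handle opened in `new`.
    engine.connect().lock().unwrap().extend_from_slice(b"frame-1");
    assert_eq!(engine.connect().lock().unwrap().len(), 7);
    println!("draft lives at {}", engine.draft_path);
}

The sketch only mirrors the ownership shape; in the real engine the shared handle is a DatabaseTape and the operations are async, but the point is the same: every consumer borrows the one handle created in new instead of performing its own open.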
@@ -5,6 +5,7 @@ use crate::{
         checkpoint_wal_file, db_bootstrap, reset_wal_file, transfer_logical_changes,
         transfer_physical_changes, wait_full_body, wal_pull, wal_push, WalPullResult,
     },
+    database_tape::DatabaseTape,
     errors::Error,
     io_operations::IoOperations,
     protocol_io::ProtocolIO,
@@ -22,6 +23,7 @@ pub struct DatabaseSyncEngineOpts {
 pub struct DatabaseSyncEngine<P: ProtocolIO> {
     io: Arc<dyn turso_core::IO>,
     protocol: Arc<P>,
+    draft_tape: DatabaseTape,
     draft_path: String,
     synced_path: String,
     meta_path: String,
@@ -74,10 +76,13 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
         path: &str,
         opts: DatabaseSyncEngineOpts,
     ) -> Result<Self> {
+        let draft_path = format!("{path}-draft");
+        let draft_tape = io.open_tape(&draft_path, true)?;
         let mut db = Self {
             io,
             protocol,
-            draft_path: format!("{path}-draft"),
+            draft_tape,
+            draft_path,
             synced_path: format!("{path}-synced"),
             meta_path: format!("{path}-info"),
             opts,
@@ -90,8 +95,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {

     /// Create database connection and appropriately configure it before use
     pub async fn connect(&self, coro: &Coro) -> Result<Arc<turso_core::Connection>> {
-        let db = self.io.open_tape(&self.draft_path, true)?;
-        db.connect(coro).await
+        self.draft_tape.connect(coro).await
     }

     /// Sync all new changes from remote DB and apply them locally
@@ -114,13 +118,11 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
     {
         // we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote
         // so, we pass 'capture: true' as we need to preserve all changes for future push of WAL
-        let draft = self.io.open_tape(&self.draft_path, true)?;
-        tracing::info!("opened draft");
         let synced = self.io.open_tape(&self.synced_path, true)?;
         tracing::info!("opened synced");

         // we will start wal write session for Draft DB in order to hold write lock during transfer of changes
-        let mut draft_session = WalSession::new(draft.connect(coro).await?);
+        let mut draft_session = WalSession::new(self.draft_tape.connect(coro).await?);
         draft_session.begin()?;

         // mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db
@@ -128,7 +130,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {

         // transfer logical changes to the Synced DB in order to later execute physical "rebase" operation
         let client_id = &self.meta().client_unique_id;
-        transfer_logical_changes(coro, &draft, &synced, client_id, true).await?;
+        transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, true).await?;

         // now we are ready to do the rebase: let's transfer physical changes from Synced to Draft
         let synced_wal_watermark = self.meta().synced_wal_match_watermark;
@@ -209,14 +211,13 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {

         // we will push Synced WAL to the remote
         // so, we pass 'capture: false' as we don't need to preserve changes made to Synced WAL in turso_cdc
-        let draft = self.io.open_tape(&self.draft_path, true)?;
         let synced = self.io.open_tape(&self.synced_path, false)?;

         // mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db
         self.synced_is_dirty = true;

         let client_id = &self.meta().client_unique_id;
-        transfer_logical_changes(coro, &draft, &synced, client_id, false).await?;
+        transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, false).await?;

         self.push_synced_to_remote(coro).await?;
         Ok(())
@@ -450,6 +451,7 @@ pub mod tests {
     use std::{collections::BTreeMap, sync::Arc};

     use rand::RngCore;
+    use tokio::join;

     use crate::{
         database_sync_engine::DatabaseSyncEngineOpts,
@@ -476,12 +478,12 @@ pub mod tests {
                 .await
                 .unwrap();
             let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
-            let local_path = dir.path().join("local.db");
+            let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
             let opts = DatabaseSyncEngineOpts {
                 client_name: "id-1".to_string(),
                 wal_pull_batch_size: 1,
             };
-            runner.init(local_path, opts).await.unwrap();
+            runner.init(&local_path, opts).await.unwrap();

             protocol
                 .server
@@ -588,6 +590,60 @@ pub mod tests {
         });
     }

+    #[test]
+    pub fn test_sync_single_db_update_sync_concurrent() {
+        deterministic_runtime(async || {
+            let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::MemoryIO::new());
+            let dir = tempfile::TempDir::new().unwrap();
+            let server_path = dir.path().join("server.db");
+            let ctx = Arc::new(TestContext::new(seed_u64()));
+            let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
+                .await
+                .unwrap();
+            let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
+            let opts = DatabaseSyncEngineOpts {
+                client_name: "id-1".to_string(),
+                wal_pull_batch_size: 1,
+            };
+
+            protocol
+                .server
+                .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ())
+                .await
+                .unwrap();
+            protocol
+                .server
+                .execute("INSERT INTO t VALUES ('hello', 'world')", ())
+                .await
+                .unwrap();
+
+            runner.init(":memory:", opts).await.unwrap();
+            let conn = runner.connect().await.unwrap();
+
+            let syncs = async move {
+                for i in 0..10 {
+                    tracing::info!("sync attempt #{i}");
+                    runner.sync().await.unwrap();
+                }
+            };
+
+            let updates = async move {
+                for i in 0..10 {
+                    tracing::info!("update attempt #{i}");
+                    let sql = format!("INSERT INTO t VALUES ('key-{i}', 'value-{i}')");
+                    match conn.execute(&sql, ()).await {
+                        Ok(_) => {}
+                        Err(err) if err.to_string().contains("database is locked") => {}
+                        Err(err) => panic!("update failed: {err}"),
+                    }
+                    ctx.random_sleep_n(50).await;
+                }
+            };
+
+            join!(updates, syncs);
+        });
+    }
+
     #[test]
     pub fn test_sync_single_db_many_pulls_big_payloads() {
         deterministic_runtime(async || {
@@ -599,13 +655,13 @@ pub mod tests {
                 .await
                 .unwrap();
             let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
-            let local_path = dir.path().join("local.db");
+            let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
             let opts = DatabaseSyncEngineOpts {
                 client_name: "id-1".to_string(),
                 wal_pull_batch_size: 1,
             };

-            runner.init(local_path, opts).await.unwrap();
+            runner.init(&local_path, opts).await.unwrap();

             protocol
                 .server
@@ -659,12 +715,12 @@ pub mod tests {
                 .await
                 .unwrap();
             let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
-            let local_path = dir.path().join("local.db");
+            let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
             let opts = DatabaseSyncEngineOpts {
                 client_name: "id-1".to_string(),
                 wal_pull_batch_size: 1,
             };
-            runner.init(local_path, opts).await.unwrap();
+            runner.init(&local_path, opts).await.unwrap();

             protocol
                 .server
@@ -723,12 +779,12 @@ pub mod tests {
                 .await
                 .unwrap();
             let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone());
-            let local_path = dir.path().join("local.db");
+            let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
             let opts = DatabaseSyncEngineOpts {
                 client_name: "id-1".to_string(),
                 wal_pull_batch_size: 1,
             };
-            runner.init(local_path, opts).await.unwrap();
+            runner.init(&local_path, opts).await.unwrap();

             server
                 .server
@@ -794,12 +850,17 @@ pub mod tests {
             const CLIENTS: usize = 8;
             for i in 0..CLIENTS {
                 let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
-                let local_path = dir.path().join(format!("local-{i}.db"));
+                let local_path = dir
+                    .path()
+                    .join(format!("local-{i}.db"))
+                    .to_str()
+                    .unwrap()
+                    .to_string();
                 let opts = DatabaseSyncEngineOpts {
                     client_name: format!("id-{i}"),
                     wal_pull_batch_size: 1,
                 };
-                runner.init(local_path, opts).await.unwrap();
+                runner.init(&local_path, opts).await.unwrap();
                 dbs.push(runner);
             }

@@ -877,12 +938,16 @@ pub mod tests {
                 let sync_lock = sync_lock.clone();
                 async move {
                     let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone());
-                    let local_path = dir.join(format!("local-{i}.db"));
+                    let local_path = dir
+                        .join(format!("local-{i}.db"))
+                        .to_str()
+                        .unwrap()
+                        .to_string();
                     let opts = DatabaseSyncEngineOpts {
                         client_name: format!("id-{i}"),
                         wal_pull_batch_size: 1,
                     };
-                    runner.init(local_path, opts).await.unwrap();
+                    runner.init(&local_path, opts).await.unwrap();
                     runner.pull().await.unwrap();
                     let conn = runner.connect().await.unwrap();
                     for query in queries {
@@ -933,12 +998,17 @@ pub mod tests {
                 it += 1;

                 let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
-                let local_path = dir.path().join(format!("local-{it}.db"));
+                let local_path = dir
+                    .path()
+                    .join(format!("local-{it}.db"))
+                    .to_str()
+                    .unwrap()
+                    .to_string();
                 let opts = DatabaseSyncEngineOpts {
                     client_name: format!("id-{it}"),
                     wal_pull_batch_size: 1,
                 };
-                runner.init(local_path, opts).await.unwrap();
+                runner.init(&local_path, opts).await.unwrap();

                 let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. });

@@ -1003,12 +1073,17 @@ pub mod tests {
                     .unwrap();

                 let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
-                let local_path = dir.path().join(format!("local-{it}.db"));
+                let local_path = dir
+                    .path()
+                    .join(format!("local-{it}.db"))
+                    .to_str()
+                    .unwrap()
+                    .to_string();
                 let opts = DatabaseSyncEngineOpts {
                     client_name: format!("id-{it}"),
                     wal_pull_batch_size: 1,
                 };
-                runner.init(local_path, opts).await.unwrap();
+                runner.init(&local_path, opts).await.unwrap();

                 protocol
                     .server
@@ -1078,12 +1153,17 @@ pub mod tests {
                     .unwrap();

                 let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
-                let local_path = dir.path().join(format!("local-{it}.db"));
+                let local_path = dir
+                    .path()
+                    .join(format!("local-{it}.db"))
+                    .to_str()
+                    .unwrap()
+                    .to_string();
                 let opts = DatabaseSyncEngineOpts {
                     client_name: format!("id-{it}"),
                     wal_pull_batch_size: 1,
                 };
-                runner.init(local_path, opts).await.unwrap();
+                runner.init(&local_path, opts).await.unwrap();

                 let conn = runner.connect().await.unwrap();

@@ -84,23 +84,12 @@ mod tests {
                 db: None,
             }
         }
-        pub async fn init(
-            &mut self,
-            local_path: PathBuf,
-            opts: DatabaseSyncEngineOpts,
-        ) -> Result<()> {
+        pub async fn init(&mut self, local_path: &str, opts: DatabaseSyncEngineOpts) -> Result<()> {
             let io = self.io.clone();
             let server = self.sync_server.clone();
             let db = self
                 .run(genawaiter::sync::Gen::new(|coro| async move {
-                    DatabaseSyncEngine::new(
-                        &coro,
-                        io,
-                        Arc::new(server),
-                        local_path.to_str().unwrap(),
-                        opts,
-                    )
-                    .await
+                    DatabaseSyncEngine::new(&coro, io, Arc::new(server), local_path, opts).await
                 }))
                 .await
                 .unwrap();
@@ -77,6 +77,13 @@ impl TestContext {
         let delay = self.rng.lock().await.next_u64() % 1000;
         tokio::time::sleep(std::time::Duration::from_millis(delay)).await
     }
+    pub async fn random_sleep_n(&self, n: u64) {
+        let delay = {
+            let mut rng = self.rng.lock().await;
+            rng.next_u64() % 1000 * (rng.next_u64() % n + 1)
+        };
+        tokio::time::sleep(std::time::Duration::from_millis(delay)).await
+    }

     pub async fn rng(&self) -> tokio::sync::MutexGuard<ChaCha8Rng> {
         self.rng.lock().await