mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-03 08:24:19 +01:00
Merge 'disable checkpoint: adjust semantic' from Nikita Sivukhin
This PR change semantic of `wal_disable_checkpoint` flag and make it disable only "automatic" checkpoint behaviour which at the moment includes: 1. Checkpoint on shutdown 2. Checkpoint when WAL reach certain size sync-engine needs checkpoint to be in full control but it also will issue `TRUNCATE` checkpoint at some certain moments of time - so complete disablement of checkpoints will not suffice. Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com> Reviewed-by: Preston Thorpe <preston@turso.tech> Closes #2580
This commit is contained in:
@@ -57,6 +57,7 @@ impl IO for MemoryIO {
|
||||
files.insert(
|
||||
path.to_string(),
|
||||
Arc::new(MemoryFile {
|
||||
path: path.to_string(),
|
||||
pages: BTreeMap::new().into(),
|
||||
size: 0.into(),
|
||||
}),
|
||||
@@ -89,6 +90,7 @@ impl IO for MemoryIO {
|
||||
}
|
||||
|
||||
pub struct MemoryFile {
|
||||
path: String,
|
||||
pages: UnsafeCell<BTreeMap<usize, MemPage>>,
|
||||
size: Cell<usize>,
|
||||
}
|
||||
@@ -104,6 +106,7 @@ impl File for MemoryFile {
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
tracing::debug!("pread(path={}): pos={}", self.path, pos);
|
||||
let r = c.as_read();
|
||||
let buf_len = r.buf().len();
|
||||
if buf_len == 0 {
|
||||
@@ -145,6 +148,12 @@ impl File for MemoryFile {
|
||||
}
|
||||
|
||||
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
|
||||
tracing::debug!(
|
||||
"pwrite(path={}): pos={}, size={}",
|
||||
self.path,
|
||||
pos,
|
||||
buffer.len()
|
||||
);
|
||||
let buf_len = buffer.len();
|
||||
if buf_len == 0 {
|
||||
c.complete(0);
|
||||
@@ -180,12 +189,14 @@ impl File for MemoryFile {
|
||||
}
|
||||
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
tracing::debug!("sync(path={})", self.path);
|
||||
// no-op
|
||||
c.complete(0);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
||||
tracing::debug!("truncate(path={}): len={}", self.path, len);
|
||||
if len < self.size.get() {
|
||||
// Truncate pages
|
||||
unsafe {
|
||||
@@ -199,6 +210,12 @@ impl File for MemoryFile {
|
||||
}
|
||||
|
||||
fn pwritev(&self, pos: usize, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
|
||||
tracing::debug!(
|
||||
"pwritev(path={}): pos={}, buffers={:?}",
|
||||
self.path,
|
||||
pos,
|
||||
buffers.iter().map(|x| x.len()).collect::<Vec<_>>()
|
||||
);
|
||||
let mut offset = pos;
|
||||
let mut total_written = 0;
|
||||
|
||||
@@ -236,6 +253,7 @@ impl File for MemoryFile {
|
||||
}
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
tracing::debug!("size(path={}): {}", self.path, self.size.get());
|
||||
Ok(self.size.get() as u64)
|
||||
}
|
||||
}
|
||||
|
||||
21
core/lib.rs
21
core/lib.rs
@@ -411,7 +411,7 @@ impl Database {
|
||||
_shared_cache: false,
|
||||
cache_size: Cell::new(default_cache_size),
|
||||
page_size: Cell::new(page_size),
|
||||
wal_checkpoint_disabled: Cell::new(false),
|
||||
wal_auto_checkpoint_disabled: Cell::new(false),
|
||||
capture_data_changes: RefCell::new(CaptureDataChangesMode::Off),
|
||||
closed: Cell::new(false),
|
||||
attached_databases: RefCell::new(DatabaseCatalog::new()),
|
||||
@@ -780,7 +780,9 @@ pub struct Connection {
|
||||
/// page size used for an uninitialized database or the next vacuum command.
|
||||
/// it's not always equal to the current page size of the database
|
||||
page_size: Cell<u32>,
|
||||
wal_checkpoint_disabled: Cell<bool>,
|
||||
/// Disable automatic checkpoint behaviour when DB is shutted down or WAL reach certain size
|
||||
/// Client still can manually execute PRAGMA wal_checkpoint(...) commands
|
||||
wal_auto_checkpoint_disabled: Cell<bool>,
|
||||
capture_data_changes: RefCell<CaptureDataChangesMode>,
|
||||
closed: Cell<bool>,
|
||||
/// Attached databases
|
||||
@@ -1385,9 +1387,7 @@ impl Connection {
|
||||
if self.closed.get() {
|
||||
return Err(LimboError::InternalError("Connection closed".to_string()));
|
||||
}
|
||||
self.pager
|
||||
.borrow()
|
||||
.wal_checkpoint(self.wal_checkpoint_disabled.get(), mode)
|
||||
self.pager.borrow().wal_checkpoint(mode)
|
||||
}
|
||||
|
||||
/// Close a connection and checkpoint.
|
||||
@@ -1407,7 +1407,7 @@ impl Connection {
|
||||
pager.end_tx(
|
||||
true, // rollback = true for close
|
||||
self,
|
||||
self.wal_checkpoint_disabled.get(),
|
||||
self.wal_auto_checkpoint_disabled.get(),
|
||||
)
|
||||
})?;
|
||||
self.transaction_state.set(TransactionState::None);
|
||||
@@ -1416,11 +1416,11 @@ impl Connection {
|
||||
|
||||
self.pager
|
||||
.borrow()
|
||||
.checkpoint_shutdown(self.wal_checkpoint_disabled.get())
|
||||
.checkpoint_shutdown(self.wal_auto_checkpoint_disabled.get())
|
||||
}
|
||||
|
||||
pub fn wal_disable_checkpoint(&self) {
|
||||
self.wal_checkpoint_disabled.set(true);
|
||||
pub fn wal_auto_checkpoint_disable(&self) {
|
||||
self.wal_auto_checkpoint_disabled.set(true);
|
||||
}
|
||||
|
||||
pub fn last_insert_rowid(&self) -> i64 {
|
||||
@@ -1895,11 +1895,10 @@ impl Connection {
|
||||
pub fn copy_db(&self, file: &str) -> Result<()> {
|
||||
// use a new PlatformIO instance here to allow for copying in-memory databases
|
||||
let io: Arc<dyn IO> = Arc::new(PlatformIO::new()?);
|
||||
let disabled = false;
|
||||
// checkpoint so everything is in the DB file before copying
|
||||
self.pager
|
||||
.borrow_mut()
|
||||
.wal_checkpoint(disabled, CheckpointMode::Truncate)?;
|
||||
.wal_checkpoint(CheckpointMode::Truncate)?;
|
||||
self.pager.borrow_mut().db_file.copy_to(&*io, file)
|
||||
}
|
||||
|
||||
|
||||
@@ -502,7 +502,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
.end_tx(
|
||||
false, // rollback = false since we're committing
|
||||
&self.connection,
|
||||
self.connection.wal_checkpoint_disabled.get(),
|
||||
self.connection.wal_auto_checkpoint_disabled.get(),
|
||||
)
|
||||
.map_err(|e| LimboError::InternalError(e.to_string()))
|
||||
.unwrap();
|
||||
|
||||
@@ -961,7 +961,7 @@ impl Pager {
|
||||
&self,
|
||||
rollback: bool,
|
||||
connection: &Connection,
|
||||
wal_checkpoint_disabled: bool,
|
||||
wal_auto_checkpoint_disabled: bool,
|
||||
) -> Result<IOResult<PagerCommitResult>> {
|
||||
tracing::trace!("end_tx(rollback={})", rollback);
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
@@ -980,7 +980,7 @@ impl Pager {
|
||||
self.rollback(schema_did_change, connection, is_write)?;
|
||||
return Ok(IOResult::Done(PagerCommitResult::Rollback));
|
||||
}
|
||||
let commit_status = return_if_io!(self.commit_dirty_pages(wal_checkpoint_disabled));
|
||||
let commit_status = return_if_io!(self.commit_dirty_pages(wal_auto_checkpoint_disabled));
|
||||
wal.borrow().end_write_tx();
|
||||
wal.borrow().end_read_tx();
|
||||
|
||||
@@ -1163,7 +1163,7 @@ impl Pager {
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn commit_dirty_pages(
|
||||
&self,
|
||||
wal_checkpoint_disabled: bool,
|
||||
wal_auto_checkpoint_disabled: bool,
|
||||
) -> Result<IOResult<PagerCommitResult>> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
@@ -1226,7 +1226,7 @@ impl Pager {
|
||||
return Ok(IOResult::IO(IOCompletions::Single(c)));
|
||||
}
|
||||
CommitState::AfterSyncWal => {
|
||||
if wal_checkpoint_disabled || !wal.borrow().should_checkpoint() {
|
||||
if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() {
|
||||
self.commit_info.borrow_mut().state = CommitState::Start;
|
||||
break PagerCommitResult::WalWritten;
|
||||
}
|
||||
@@ -1354,7 +1354,7 @@ impl Pager {
|
||||
.expect("Failed to clear page cache");
|
||||
}
|
||||
|
||||
pub fn checkpoint_shutdown(&self, wal_checkpoint_disabled: bool) -> Result<()> {
|
||||
pub fn checkpoint_shutdown(&self, wal_auto_checkpoint_disabled: bool) -> Result<()> {
|
||||
let mut _attempts = 0;
|
||||
{
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
@@ -1369,24 +1369,19 @@ impl Pager {
|
||||
let c = wal.sync()?;
|
||||
self.io.wait_for_completion(c)?;
|
||||
}
|
||||
self.wal_checkpoint(wal_checkpoint_disabled, CheckpointMode::Passive)?;
|
||||
if !wal_auto_checkpoint_disabled {
|
||||
self.wal_checkpoint(CheckpointMode::Passive)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn wal_checkpoint(
|
||||
&self,
|
||||
wal_checkpoint_disabled: bool,
|
||||
mode: CheckpointMode,
|
||||
) -> Result<CheckpointResult> {
|
||||
pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result<CheckpointResult> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"wal_checkpoint() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
if wal_checkpoint_disabled {
|
||||
return Ok(CheckpointResult::default());
|
||||
}
|
||||
|
||||
let mut checkpoint_result = self.io.block(|| wal.borrow_mut().checkpoint(self, mode))?;
|
||||
|
||||
|
||||
@@ -1168,6 +1168,12 @@ impl Wal for WalFile {
|
||||
let page_size = self.page_size();
|
||||
let mut frame = vec![0u8; page_size as usize + WAL_FRAME_HEADER_SIZE];
|
||||
let mut seen = HashSet::new();
|
||||
turso_assert!(
|
||||
frame_count >= frame_watermark,
|
||||
"frame_count must be not less than frame_watermark: {} vs {}",
|
||||
frame_count,
|
||||
frame_watermark
|
||||
);
|
||||
let mut pages = Vec::with_capacity((frame_count - frame_watermark) as usize);
|
||||
for frame_no in frame_watermark + 1..=frame_count {
|
||||
let c = self.read_frame_raw(frame_no, &mut frame)?;
|
||||
|
||||
@@ -581,7 +581,7 @@ impl Program {
|
||||
let cacheflush_status = pager.end_tx(
|
||||
rollback,
|
||||
connection,
|
||||
connection.wal_checkpoint_disabled.get(),
|
||||
connection.wal_auto_checkpoint_disabled.get(),
|
||||
)?;
|
||||
match cacheflush_status {
|
||||
IOResult::Done(_) => {
|
||||
|
||||
@@ -2,9 +2,11 @@ use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
database_sync_operations::{
|
||||
checkpoint_wal_file, db_bootstrap, reset_wal_file, transfer_logical_changes,
|
||||
transfer_physical_changes, wait_full_body, wal_pull, wal_push, WalPullResult,
|
||||
checkpoint_wal_file, connect, connect_untracked, db_bootstrap, reset_wal_file,
|
||||
transfer_logical_changes, transfer_physical_changes, wait_full_body, wal_pull, wal_push,
|
||||
WalPullResult,
|
||||
},
|
||||
database_tape::DatabaseTape,
|
||||
errors::Error,
|
||||
io_operations::IoOperations,
|
||||
protocol_io::ProtocolIO,
|
||||
@@ -22,6 +24,7 @@ pub struct DatabaseSyncEngineOpts {
|
||||
pub struct DatabaseSyncEngine<P: ProtocolIO> {
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
protocol: Arc<P>,
|
||||
draft_tape: DatabaseTape,
|
||||
draft_path: String,
|
||||
synced_path: String,
|
||||
meta_path: String,
|
||||
@@ -74,10 +77,13 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
path: &str,
|
||||
opts: DatabaseSyncEngineOpts,
|
||||
) -> Result<Self> {
|
||||
let draft_path = format!("{path}-draft");
|
||||
let draft_tape = io.open_tape(&draft_path, true)?;
|
||||
let mut db = Self {
|
||||
io,
|
||||
protocol,
|
||||
draft_path: format!("{path}-draft"),
|
||||
draft_tape,
|
||||
draft_path,
|
||||
synced_path: format!("{path}-synced"),
|
||||
meta_path: format!("{path}-info"),
|
||||
opts,
|
||||
@@ -90,8 +96,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
|
||||
/// Create database connection and appropriately configure it before use
|
||||
pub async fn connect(&self, coro: &Coro) -> Result<Arc<turso_core::Connection>> {
|
||||
let db = self.io.open_tape(&self.draft_path, true)?;
|
||||
db.connect(coro).await
|
||||
connect(coro, &self.draft_tape).await
|
||||
}
|
||||
|
||||
/// Sync all new changes from remote DB and apply them locally
|
||||
@@ -114,13 +119,11 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
{
|
||||
// we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote
|
||||
// so, we pass 'capture: true' as we need to preserve all changes for future push of WAL
|
||||
let draft = self.io.open_tape(&self.draft_path, true)?;
|
||||
tracing::info!("opened draft");
|
||||
let synced = self.io.open_tape(&self.synced_path, true)?;
|
||||
tracing::info!("opened synced");
|
||||
|
||||
// we will start wal write session for Draft DB in order to hold write lock during transfer of changes
|
||||
let mut draft_session = WalSession::new(draft.connect(coro).await?);
|
||||
let mut draft_session = WalSession::new(connect(coro, &self.draft_tape).await?);
|
||||
draft_session.begin()?;
|
||||
|
||||
// mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db
|
||||
@@ -128,7 +131,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
|
||||
// transfer logical changes to the Synced DB in order to later execute physical "rebase" operation
|
||||
let client_id = &self.meta().client_unique_id;
|
||||
transfer_logical_changes(coro, &draft, &synced, client_id, true).await?;
|
||||
transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, true).await?;
|
||||
|
||||
// now we are ready to do the rebase: let's transfer physical changes from Synced to Draft
|
||||
let synced_wal_watermark = self.meta().synced_wal_match_watermark;
|
||||
@@ -172,7 +175,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
);
|
||||
{
|
||||
let synced = self.io.open_tape(&self.synced_path, false)?;
|
||||
checkpoint_wal_file(coro, &synced.connect_untracked()?).await?;
|
||||
checkpoint_wal_file(coro, &connect_untracked(&synced)?).await?;
|
||||
update_meta(
|
||||
coro,
|
||||
self.protocol.as_ref(),
|
||||
@@ -209,14 +212,13 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
|
||||
// we will push Synced WAL to the remote
|
||||
// so, we pass 'capture: false' as we don't need to preserve changes made to Synced WAL in turso_cdc
|
||||
let draft = self.io.open_tape(&self.draft_path, true)?;
|
||||
let synced = self.io.open_tape(&self.synced_path, false)?;
|
||||
|
||||
// mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db
|
||||
self.synced_is_dirty = true;
|
||||
|
||||
let client_id = &self.meta().client_unique_id;
|
||||
transfer_logical_changes(coro, &draft, &synced, client_id, false).await?;
|
||||
transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, false).await?;
|
||||
|
||||
self.push_synced_to_remote(coro).await?;
|
||||
Ok(())
|
||||
@@ -287,7 +289,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
self.synced_path,
|
||||
);
|
||||
let synced = self.io.open_tape(&self.synced_path, false)?;
|
||||
let synced_conn = synced.connect(coro).await?;
|
||||
let synced_conn = connect(coro, &synced).await?;
|
||||
let mut wal = WalSession::new(synced_conn);
|
||||
|
||||
let generation = self.meta().synced_generation;
|
||||
@@ -333,7 +335,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
self.meta().client_unique_id
|
||||
);
|
||||
let synced = self.io.open_tape(&self.synced_path, false)?;
|
||||
let synced_conn = synced.connect(coro).await?;
|
||||
let synced_conn = connect(coro, &synced).await?;
|
||||
|
||||
let mut wal = WalSession::new(synced_conn);
|
||||
wal.begin()?;
|
||||
@@ -450,6 +452,7 @@ pub mod tests {
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use rand::RngCore;
|
||||
use tokio::join;
|
||||
|
||||
use crate::{
|
||||
database_sync_engine::DatabaseSyncEngineOpts,
|
||||
@@ -476,12 +479,12 @@ pub mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
|
||||
let local_path = dir.path().join("local.db");
|
||||
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: "id-1".to_string(),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
runner.init(local_path, opts).await.unwrap();
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
|
||||
protocol
|
||||
.server
|
||||
@@ -588,6 +591,60 @@ pub mod tests {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_sync_single_db_update_sync_concurrent() {
|
||||
deterministic_runtime(async || {
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::MemoryIO::new());
|
||||
let dir = tempfile::TempDir::new().unwrap();
|
||||
let server_path = dir.path().join("server.db");
|
||||
let ctx = Arc::new(TestContext::new(seed_u64()));
|
||||
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: "id-1".to_string(),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
|
||||
protocol
|
||||
.server
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ())
|
||||
.await
|
||||
.unwrap();
|
||||
protocol
|
||||
.server
|
||||
.execute("INSERT INTO t VALUES ('hello', 'world')", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
runner.init(":memory:", opts).await.unwrap();
|
||||
let conn = runner.connect().await.unwrap();
|
||||
|
||||
let syncs = async move {
|
||||
for i in 0..10 {
|
||||
tracing::info!("sync attempt #{i}");
|
||||
runner.sync().await.unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
let updates = async move {
|
||||
for i in 0..10 {
|
||||
tracing::info!("update attempt #{i}");
|
||||
let sql = format!("INSERT INTO t VALUES ('key-{i}', 'value-{i}')");
|
||||
match conn.execute(&sql, ()).await {
|
||||
Ok(_) => {}
|
||||
Err(err) if err.to_string().contains("database is locked") => {}
|
||||
Err(err) => panic!("update failed: {err}"),
|
||||
}
|
||||
ctx.random_sleep_n(50).await;
|
||||
}
|
||||
};
|
||||
|
||||
join!(updates, syncs);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_sync_single_db_many_pulls_big_payloads() {
|
||||
deterministic_runtime(async || {
|
||||
@@ -599,13 +656,13 @@ pub mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
|
||||
let local_path = dir.path().join("local.db");
|
||||
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: "id-1".to_string(),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
|
||||
runner.init(local_path, opts).await.unwrap();
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
|
||||
protocol
|
||||
.server
|
||||
@@ -659,12 +716,12 @@ pub mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
|
||||
let local_path = dir.path().join("local.db");
|
||||
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: "id-1".to_string(),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
runner.init(local_path, opts).await.unwrap();
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
|
||||
protocol
|
||||
.server
|
||||
@@ -723,12 +780,12 @@ pub mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone());
|
||||
let local_path = dir.path().join("local.db");
|
||||
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: "id-1".to_string(),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
runner.init(local_path, opts).await.unwrap();
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
|
||||
server
|
||||
.server
|
||||
@@ -794,12 +851,17 @@ pub mod tests {
|
||||
const CLIENTS: usize = 8;
|
||||
for i in 0..CLIENTS {
|
||||
let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
|
||||
let local_path = dir.path().join(format!("local-{i}.db"));
|
||||
let local_path = dir
|
||||
.path()
|
||||
.join(format!("local-{i}.db"))
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: format!("id-{i}"),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
runner.init(local_path, opts).await.unwrap();
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
dbs.push(runner);
|
||||
}
|
||||
|
||||
@@ -877,12 +939,16 @@ pub mod tests {
|
||||
let sync_lock = sync_lock.clone();
|
||||
async move {
|
||||
let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone());
|
||||
let local_path = dir.join(format!("local-{i}.db"));
|
||||
let local_path = dir
|
||||
.join(format!("local-{i}.db"))
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: format!("id-{i}"),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
runner.init(local_path, opts).await.unwrap();
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
runner.pull().await.unwrap();
|
||||
let conn = runner.connect().await.unwrap();
|
||||
for query in queries {
|
||||
@@ -933,12 +999,17 @@ pub mod tests {
|
||||
it += 1;
|
||||
|
||||
let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
|
||||
let local_path = dir.path().join(format!("local-{it}.db"));
|
||||
let local_path = dir
|
||||
.path()
|
||||
.join(format!("local-{it}.db"))
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: format!("id-{it}"),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
runner.init(local_path, opts).await.unwrap();
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
|
||||
let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. });
|
||||
|
||||
@@ -1003,12 +1074,17 @@ pub mod tests {
|
||||
.unwrap();
|
||||
|
||||
let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
|
||||
let local_path = dir.path().join(format!("local-{it}.db"));
|
||||
let local_path = dir
|
||||
.path()
|
||||
.join(format!("local-{it}.db"))
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: format!("id-{it}"),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
runner.init(local_path, opts).await.unwrap();
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
|
||||
protocol
|
||||
.server
|
||||
@@ -1078,12 +1154,17 @@ pub mod tests {
|
||||
.unwrap();
|
||||
|
||||
let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
|
||||
let local_path = dir.path().join(format!("local-{it}.db"));
|
||||
let local_path = dir
|
||||
.path()
|
||||
.join(format!("local-{it}.db"))
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: format!("id-{it}"),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
runner.init(local_path, opts).await.unwrap();
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
|
||||
let conn = runner.connect().await.unwrap();
|
||||
|
||||
|
||||
@@ -35,6 +35,18 @@ pub enum WalPushResult {
|
||||
NeedCheckpoint,
|
||||
}
|
||||
|
||||
pub async fn connect(coro: &Coro, tape: &DatabaseTape) -> Result<Arc<turso_core::Connection>> {
|
||||
let conn = tape.connect(coro).await?;
|
||||
conn.wal_auto_checkpoint_disable();
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
pub fn connect_untracked(tape: &DatabaseTape) -> Result<Arc<turso_core::Connection>> {
|
||||
let conn = tape.connect_untracked()?;
|
||||
conn.wal_auto_checkpoint_disable();
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
/// Bootstrap multiple DB files from latest generation from remote
|
||||
pub async fn db_bootstrap<C: ProtocolIO>(
|
||||
coro: &Coro,
|
||||
@@ -257,8 +269,8 @@ pub async fn transfer_logical_changes(
|
||||
bump_pull_gen: bool,
|
||||
) -> Result<()> {
|
||||
tracing::debug!("transfer_logical_changes: client_id={client_id}");
|
||||
let source_conn = source.connect_untracked()?;
|
||||
let target_conn = target.connect_untracked()?;
|
||||
let source_conn = connect_untracked(source)?;
|
||||
let target_conn = connect_untracked(target)?;
|
||||
|
||||
// fetch last_change_id from the target DB in order to guarantee atomic replay of changes and avoid conflicts in case of failure
|
||||
let source_pull_gen = 'source_pull_gen: {
|
||||
@@ -333,7 +345,7 @@ pub async fn transfer_logical_changes(
|
||||
use_implicit_rowid: false,
|
||||
};
|
||||
|
||||
let source_schema_cookie = source.connect_untracked()?.read_schema_version()?;
|
||||
let source_schema_cookie = connect_untracked(source)?.read_schema_version()?;
|
||||
|
||||
let mut session = target.start_replay_session(coro, replay_opts).await?;
|
||||
|
||||
@@ -414,7 +426,7 @@ pub async fn transfer_physical_changes(
|
||||
) -> Result<u64> {
|
||||
tracing::debug!("transfer_physical_changes: source_wal_match_watermark={source_wal_match_watermark}, source_sync_watermark={source_sync_watermark}, target_wal_match_watermark={target_wal_match_watermark}");
|
||||
|
||||
let source_conn = source.connect(coro).await?;
|
||||
let source_conn = connect(coro, source).await?;
|
||||
let mut source_session = WalSession::new(source_conn.clone());
|
||||
source_session.begin()?;
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ pub type Result<T> = std::result::Result<T, errors::Error>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::{select, sync::Mutex};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
@@ -84,23 +84,12 @@ mod tests {
|
||||
db: None,
|
||||
}
|
||||
}
|
||||
pub async fn init(
|
||||
&mut self,
|
||||
local_path: PathBuf,
|
||||
opts: DatabaseSyncEngineOpts,
|
||||
) -> Result<()> {
|
||||
pub async fn init(&mut self, local_path: &str, opts: DatabaseSyncEngineOpts) -> Result<()> {
|
||||
let io = self.io.clone();
|
||||
let server = self.sync_server.clone();
|
||||
let db = self
|
||||
.run(genawaiter::sync::Gen::new(|coro| async move {
|
||||
DatabaseSyncEngine::new(
|
||||
&coro,
|
||||
io,
|
||||
Arc::new(server),
|
||||
local_path.to_str().unwrap(),
|
||||
opts,
|
||||
)
|
||||
.await
|
||||
DatabaseSyncEngine::new(&coro, io, Arc::new(server), local_path, opts).await
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -77,6 +77,13 @@ impl TestContext {
|
||||
let delay = self.rng.lock().await.next_u64() % 1000;
|
||||
tokio::time::sleep(std::time::Duration::from_millis(delay)).await
|
||||
}
|
||||
pub async fn random_sleep_n(&self, n: u64) {
|
||||
let delay = {
|
||||
let mut rng = self.rng.lock().await;
|
||||
rng.next_u64() % 1000 * (rng.next_u64() % n + 1)
|
||||
};
|
||||
tokio::time::sleep(std::time::Duration::from_millis(delay)).await
|
||||
}
|
||||
|
||||
pub async fn rng(&self) -> tokio::sync::MutexGuard<ChaCha8Rng> {
|
||||
self.rng.lock().await
|
||||
|
||||
@@ -1476,7 +1476,7 @@ pub unsafe extern "C" fn libsql_wal_disable_checkpoint(db: *mut sqlite3) -> ffi:
|
||||
}
|
||||
let db: &mut sqlite3 = &mut *db;
|
||||
let db = db.inner.lock().unwrap();
|
||||
db.conn.wal_disable_checkpoint();
|
||||
db.conn.wal_auto_checkpoint_disable();
|
||||
SQLITE_OK
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user