do not push wal unnecessary when nothing was changed locally

This commit is contained in:
Nikita Sivukhin
2025-08-13 20:22:10 +04:00
parent aaf7b39d96
commit 887b25dd00
4 changed files with 49 additions and 38 deletions

View File

@@ -221,6 +221,8 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, false).await?;
self.push_synced_to_remote(coro).await?;
self.reset_synced_if_dirty(coro).await?;
Ok(())
}
@@ -772,7 +774,7 @@ pub mod tests {
.await
.unwrap();
runner.pull().await.unwrap();
runner.sync().await.unwrap();
// create connection in outer scope in order to prevent Database from being dropped in between of pull operations
let conn = runner.connect().await.unwrap();
@@ -791,7 +793,7 @@ pub mod tests {
}
tracing::info!("pull attempt={}", attempt);
runner.pull().await.unwrap();
runner.sync().await.unwrap();
let expected = expected
.iter()

View File

@@ -361,7 +361,6 @@ pub async fn transfer_logical_changes(
};
let mut rows_changed = 0;
let mut changes = source.iterate_changes(iterate_opts)?;
let mut updated = false;
while let Some(operation) = changes.next(coro).await? {
match &operation {
DatabaseTapeOperation::RowChange(change) => {
@@ -381,35 +380,36 @@ pub async fn transfer_logical_changes(
);
}
last_change_id = Some(change.change_id);
updated = true;
}
DatabaseTapeOperation::Commit if updated || bump_pull_gen => {
tracing::debug!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id);
// update turso_sync_last_change_id table with new value before commit
let mut set_last_change_id_stmt =
session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?;
let (next_pull_gen, next_change_id) = if bump_pull_gen {
(source_pull_gen + 1, 0)
} else {
(source_pull_gen, last_change_id.unwrap_or(0))
};
tracing::debug!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}");
set_last_change_id_stmt
.bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen));
set_last_change_id_stmt
.bind_at(2.try_into().unwrap(), Value::Integer(next_change_id));
set_last_change_id_stmt
.bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id)));
exec_stmt(coro, &mut set_last_change_id_stmt).await?;
let session_schema_cookie = session.conn().read_schema_version()?;
if session_schema_cookie <= source_schema_cookie {
session
.conn()
.write_schema_version(source_schema_cookie + 1)?;
DatabaseTapeOperation::Commit => {
if rows_changed > 0 || (bump_pull_gen && last_change_id.unwrap_or(0) > 0) {
tracing::debug!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id);
// update turso_sync_last_change_id table with new value before commit
let mut set_last_change_id_stmt =
session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?;
let (next_pull_gen, next_change_id) = if bump_pull_gen {
(source_pull_gen + 1, 0)
} else {
(source_pull_gen, last_change_id.unwrap_or(0))
};
tracing::debug!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}");
set_last_change_id_stmt
.bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen));
set_last_change_id_stmt
.bind_at(2.try_into().unwrap(), Value::Integer(next_change_id));
set_last_change_id_stmt
.bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id)));
exec_stmt(coro, &mut set_last_change_id_stmt).await?;
}
if bump_pull_gen {
let session_schema_cookie = session.conn().read_schema_version()?;
if session_schema_cookie <= source_schema_cookie {
session
.conn()
.write_schema_version(source_schema_cookie + 1)?;
}
}
}
_ => {}
}
session.replay(coro, operation).await?;
}
@@ -653,12 +653,16 @@ pub mod tests {
let mut gen = genawaiter::sync::Gen::new(|coro| async move {
let conn1 = db1.connect(&coro).await?;
conn1.execute("CREATE TABLE t(x, y)")?;
conn1.execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)")?;
conn1.execute("CREATE TABLE t(x, y)").unwrap();
conn1
.execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)")
.unwrap();
let conn2 = db2.connect(&coro).await?;
let conn2 = db2.connect(&coro).await.unwrap();
transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?;
transfer_logical_changes(&coro, &db1, &db2, "id-1", false)
.await
.unwrap();
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
@@ -674,8 +678,10 @@ pub mod tests {
]
);
conn1.execute("INSERT INTO t VALUES (7, 8)")?;
transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?;
conn1.execute("INSERT INTO t VALUES (7, 8)").unwrap();
transfer_logical_changes(&coro, &db1, &db2, "id-1", false)
.await
.unwrap();
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();

View File

@@ -167,12 +167,14 @@ impl DatabaseTape {
opts: DatabaseReplaySessionOpts,
) -> Result<DatabaseReplaySession> {
tracing::debug!("opening replay session");
let conn = self.connect(coro).await?;
conn.execute("BEGIN IMMEDIATE")?;
Ok(DatabaseReplaySession {
conn: self.connect(coro).await?,
conn,
cached_delete_stmt: HashMap::new(),
cached_insert_stmt: HashMap::new(),
cached_update_stmt: HashMap::new(),
in_txn: false,
in_txn: true,
opts,
})
}
@@ -407,7 +409,7 @@ impl DatabaseChangesIterator {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DatabaseReplaySessionOpts {
pub use_implicit_rowid: bool,
}

View File

@@ -313,6 +313,7 @@ impl TestSyncServer {
pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<()> {
let conn = self.db.connect()?;
conn.execute(sql, params).await?;
tracing::debug!("sync_frames_from_conn after execute");
self.sync_frames_from_conn(&conn).await?;
Ok(())
}