ignore changes in the turso_sync_last_change_id

This commit is contained in:
Nikita Sivukhin
2025-08-14 12:39:44 +04:00
parent 8c9d648852
commit 34a7b2ffd4

View File

@@ -254,7 +254,8 @@ pub async fn wal_push<C: ProtocolIO>(
}
}
const TURSO_SYNC_META_TABLE: &str =
const TURSO_SYNC_TABLE_NAME: &str = "turso_sync_last_change_id";
const TURSO_SYNC_CREATE_TABLE: &str =
"CREATE TABLE IF NOT EXISTS turso_sync_last_change_id (client_id TEXT PRIMARY KEY, pull_gen INTEGER, change_id INTEGER)";
const TURSO_SYNC_SELECT_LAST_CHANGE_ID: &str =
"SELECT pull_gen, change_id FROM turso_sync_last_change_id WHERE client_id = ?";
@@ -303,7 +304,7 @@ pub async fn transfer_logical_changes(
);
// fetch last_change_id from the target DB in order to guarantee atomic replay of changes and avoid conflicts in case of failure
let mut schema_stmt = target_conn.prepare(TURSO_SYNC_META_TABLE)?;
let mut schema_stmt = target_conn.prepare(TURSO_SYNC_CREATE_TABLE)?;
exec_stmt(coro, &mut schema_stmt).await?;
let mut select_last_change_id_stmt = target_conn.prepare(TURSO_SYNC_SELECT_LAST_CHANGE_ID)?;
@@ -364,13 +365,16 @@ pub async fn transfer_logical_changes(
while let Some(operation) = changes.next(coro).await? {
match &operation {
DatabaseTapeOperation::RowChange(change) => {
rows_changed += 1;
assert!(
last_change_id.is_none() || last_change_id.unwrap() < change.change_id,
"change id must be strictly increasing: last_change_id={:?}, change.change_id={}",
last_change_id,
change.change_id
);
if change.table_name == TURSO_SYNC_TABLE_NAME {
continue;
}
rows_changed += 1;
// we give user full control over CDC table - so let's not emit assert here for now
if last_change_id.is_some() && last_change_id.unwrap() + 1 != change.change_id {
tracing::warn!(
@@ -381,35 +385,32 @@ pub async fn transfer_logical_changes(
}
last_change_id = Some(change.change_id);
}
DatabaseTapeOperation::Commit => {
if rows_changed > 0 || (bump_pull_gen && last_change_id.unwrap_or(0) > 0) {
tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id);
// update turso_sync_last_change_id table with new value before commit
let mut set_last_change_id_stmt =
session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?;
let (next_pull_gen, next_change_id) = if bump_pull_gen {
(source_pull_gen + 1, 0)
} else {
(source_pull_gen, last_change_id.unwrap_or(0))
};
tracing::info!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}");
set_last_change_id_stmt
.bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen));
set_last_change_id_stmt
.bind_at(2.try_into().unwrap(), Value::Integer(next_change_id));
set_last_change_id_stmt
.bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id)));
exec_stmt(coro, &mut set_last_change_id_stmt).await?;
}
if bump_pull_gen {
let session_schema_cookie = session.conn().read_schema_version()?;
if session_schema_cookie <= source_schema_cookie {
session
.conn()
.write_schema_version(source_schema_cookie + 1)?;
}
DatabaseTapeOperation::Commit if rows_changed > 0 || bump_pull_gen => {
tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id);
// update turso_sync_last_change_id table with new value before commit
let mut set_last_change_id_stmt =
session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?;
let (next_pull_gen, next_change_id) = if bump_pull_gen {
(source_pull_gen + 1, 0)
} else {
(source_pull_gen, last_change_id.unwrap_or(0))
};
tracing::info!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}");
set_last_change_id_stmt
.bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen));
set_last_change_id_stmt
.bind_at(2.try_into().unwrap(), Value::Integer(next_change_id));
set_last_change_id_stmt
.bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id)));
exec_stmt(coro, &mut set_last_change_id_stmt).await?;
let session_schema_cookie = session.conn().read_schema_version()?;
if session_schema_cookie <= source_schema_cookie {
session
.conn()
.write_schema_version(source_schema_cookie + 1)?;
}
}
_ => {}
}
session.replay(coro, operation).await?;
}