From 1185298670d6bfd4b2e05019b792798daa9be44c Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 10:37:26 +0400 Subject: [PATCH] fix replay generator --- sync/engine/src/database_replay_generator.rs | 9 ++++----- sync/engine/src/database_sync_operations.rs | 13 ++++++------- sync/engine/src/database_tape.rs | 8 ++++---- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/sync/engine/src/database_replay_generator.rs b/sync/engine/src/database_replay_generator.rs index daa3b9f0f..02532d825 100644 --- a/sync/engine/src/database_replay_generator.rs +++ b/sync/engine/src/database_replay_generator.rs @@ -264,13 +264,12 @@ impl DatabaseReplayGenerator { let update = self.update_query(coro, table_name, &columns).await?; Ok(update) } else { - let columns = [true].repeat(after.len()); - let update = self.update_query(coro, table_name, &columns).await?; - Ok(update) + let upsert = self.upsert_query(coro, table_name, after.len()).await?; + Ok(upsert) } } DatabaseTapeRowChangeType::Insert { after } => { - let insert = self.insert_query(coro, table_name, after.len()).await?; + let insert = self.upsert_query(coro, table_name, after.len()).await?; Ok(insert) } } @@ -320,7 +319,7 @@ impl DatabaseReplayGenerator { is_ddl_replay: false, }) } - pub(crate) async fn insert_query( + pub(crate) async fn upsert_query( &self, coro: &Coro, table_name: &str, diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index d82ace6a4..5197e04e0 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -216,7 +216,7 @@ pub async fn wal_pull_to_file_v1( encoding: PageUpdatesEncodingReq::Raw as i32, server_revision: String::new(), client_revision: revision.to_string(), - long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_secs() as u32).unwrap_or(0), + long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_millis() as u32).unwrap_or(0), server_pages: BytesMut::new().into(), client_pages: BytesMut::new().into(), }; @@ -807,12 +807,11 @@ pub async fn push_logical_changes( }), DatabaseTapeOperation::RowChange(change) => { let replay_info = generator.replay_info(coro, &change).await?; - let change_type = (&change.change).into(); match change.change { DatabaseTapeRowChangeType::Delete { before } => { let values = generator.replay_values( &replay_info, - change_type, + replay_info.change_type, change.id, before, None, @@ -829,7 +828,7 @@ pub async fn push_logical_changes( DatabaseTapeRowChangeType::Insert { after } => { let values = generator.replay_values( &replay_info, - change_type, + replay_info.change_type, change.id, after, None, @@ -850,7 +849,7 @@ pub async fn push_logical_changes( } => { let values = generator.replay_values( &replay_info, - change_type, + replay_info.change_type, change.id, after, Some(updates), @@ -871,7 +870,7 @@ pub async fn push_logical_changes( } => { let values = generator.replay_values( &replay_info, - change_type, + replay_info.change_type, change.id, after, None, @@ -1361,7 +1360,7 @@ pub async fn wait_proto_message( Error::DatabaseSyncEngineError(format!("unable to deserialize protobuf message: {e}")) })?; let _ = bytes.split_to(message_length + prefix_length); - tracing::debug!( + tracing::trace!( "wait_proto_message: elapsed={:?}", std::time::Instant::now().duration_since(start_time) ); diff --git a/sync/engine/src/database_tape.rs b/sync/engine/src/database_tape.rs index b98cd0847..b8dfdb820 100644 --- a/sync/engine/src/database_tape.rs +++ b/sync/engine/src/database_tape.rs @@ -10,7 +10,7 @@ use crate::{ database_sync_operations::WAL_FRAME_HEADER, errors::Error, types::{ - Coro, DatabaseChange, DatabaseTapeOperation, DatabaseTapeRowChange, + Coro, DatabaseChange, DatabaseChangeType, DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType, ProtocolCommand, }, wal_session::WalSession, @@ -584,7 +584,7 @@ impl DatabaseReplaySession { cached.stmt.reset(); let values = self.generator.replay_values( &cached.info, - change_type, + DatabaseChangeType::Delete, change.id, before, None, @@ -600,7 +600,7 @@ impl DatabaseReplaySession { cached.stmt.reset(); let values = self.generator.replay_values( &cached.info, - change_type, + DatabaseChangeType::Insert, change.id, after, None, @@ -643,7 +643,7 @@ impl DatabaseReplaySession { table, columns ); - let info = self.generator.insert_query(coro, table, columns).await?; + let info = self.generator.upsert_query(coro, table, columns).await?; let stmt = self.conn.prepare(&info.query)?; self.cached_insert_stmt .insert(key.clone(), CachedStmt { stmt, info });