fix replay generator

This commit is contained in:
Nikita Sivukhin
2025-09-17 10:37:26 +04:00
parent e68b642f4f
commit 1185298670
3 changed files with 14 additions and 16 deletions

View File

@@ -264,13 +264,12 @@ impl DatabaseReplayGenerator {
let update = self.update_query(coro, table_name, &columns).await?;
Ok(update)
} else {
let columns = [true].repeat(after.len());
let update = self.update_query(coro, table_name, &columns).await?;
Ok(update)
let upsert = self.upsert_query(coro, table_name, after.len()).await?;
Ok(upsert)
}
}
DatabaseTapeRowChangeType::Insert { after } => {
let insert = self.insert_query(coro, table_name, after.len()).await?;
let insert = self.upsert_query(coro, table_name, after.len()).await?;
Ok(insert)
}
}
@@ -320,7 +319,7 @@ impl DatabaseReplayGenerator {
is_ddl_replay: false,
})
}
pub(crate) async fn insert_query<Ctx>(
pub(crate) async fn upsert_query<Ctx>(
&self,
coro: &Coro<Ctx>,
table_name: &str,

View File

@@ -216,7 +216,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
encoding: PageUpdatesEncodingReq::Raw as i32,
server_revision: String::new(),
client_revision: revision.to_string(),
long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_secs() as u32).unwrap_or(0),
long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_millis() as u32).unwrap_or(0),
server_pages: BytesMut::new().into(),
client_pages: BytesMut::new().into(),
};
@@ -807,12 +807,11 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
}),
DatabaseTapeOperation::RowChange(change) => {
let replay_info = generator.replay_info(coro, &change).await?;
let change_type = (&change.change).into();
match change.change {
DatabaseTapeRowChangeType::Delete { before } => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
before,
None,
@@ -829,7 +828,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
DatabaseTapeRowChangeType::Insert { after } => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
after,
None,
@@ -850,7 +849,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
} => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
after,
Some(updates),
@@ -871,7 +870,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
} => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
after,
None,
@@ -1361,7 +1360,7 @@ pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
Error::DatabaseSyncEngineError(format!("unable to deserialize protobuf message: {e}"))
})?;
let _ = bytes.split_to(message_length + prefix_length);
tracing::debug!(
tracing::trace!(
"wait_proto_message: elapsed={:?}",
std::time::Instant::now().duration_since(start_time)
);

View File

@@ -10,7 +10,7 @@ use crate::{
database_sync_operations::WAL_FRAME_HEADER,
errors::Error,
types::{
Coro, DatabaseChange, DatabaseTapeOperation, DatabaseTapeRowChange,
Coro, DatabaseChange, DatabaseChangeType, DatabaseTapeOperation, DatabaseTapeRowChange,
DatabaseTapeRowChangeType, ProtocolCommand,
},
wal_session::WalSession,
@@ -584,7 +584,7 @@ impl DatabaseReplaySession {
cached.stmt.reset();
let values = self.generator.replay_values(
&cached.info,
change_type,
DatabaseChangeType::Delete,
change.id,
before,
None,
@@ -600,7 +600,7 @@ impl DatabaseReplaySession {
cached.stmt.reset();
let values = self.generator.replay_values(
&cached.info,
change_type,
DatabaseChangeType::Insert,
change.id,
after,
None,
@@ -643,7 +643,7 @@ impl DatabaseReplaySession {
table,
columns
);
let info = self.generator.insert_query(coro, table, columns).await?;
let info = self.generator.upsert_query(coro, table, columns).await?;
let stmt = self.conn.prepare(&info.query)?;
self.cached_insert_stmt
.insert(key.clone(), CachedStmt { stmt, info });