From 36106a2d6c009f18f9246dba73e21dc7baaa7765 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 14 Aug 2025 17:16:44 +0400 Subject: [PATCH] parse bin record early --- packages/turso-sync-engine/src/types.rs | 70 +++++++++++++++++-------- 1 file changed, 48 insertions(+), 22 deletions(-) diff --git a/packages/turso-sync-engine/src/types.rs b/packages/turso-sync-engine/src/types.rs index c4fd297d1..5b32462c0 100644 --- a/packages/turso-sync-engine/src/types.rs +++ b/packages/turso-sync-engine/src/types.rs @@ -75,27 +75,31 @@ impl DatabaseChange { pub fn into_apply(self) -> Result { let tape_change = match self.change_type { DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Delete { - before: self.before.ok_or_else(|| { + before: parse_bin_record(self.before.ok_or_else(|| { Error::DatabaseTapeError( "cdc_mode must be set to either 'full' or 'before'".to_string(), ) - })?, + })?)?, }, DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update { - before: self.before.ok_or_else(|| { + before: parse_bin_record(self.before.ok_or_else(|| { Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string()) - })?, - after: self.after.ok_or_else(|| { + })?)?, + after: parse_bin_record(self.after.ok_or_else(|| { Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string()) - })?, - updates: self.updates, + })?)?, + updates: if let Some(updates) = self.updates { + Some(parse_bin_record(updates)?) + } else { + None + }, }, DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Insert { - after: self.after.ok_or_else(|| { + after: parse_bin_record(self.after.ok_or_else(|| { Error::DatabaseTapeError( "cdc_mode must be set to either 'full' or 'after'".to_string(), ) - })?, + })?)?, }, }; Ok(DatabaseTapeRowChange { @@ -110,29 +114,29 @@ impl DatabaseChange { pub fn into_revert(self) -> Result { let tape_change = match self.change_type { DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Insert { - after: self.before.ok_or_else(|| { + after: parse_bin_record(self.before.ok_or_else(|| { Error::DatabaseTapeError( "cdc_mode must be set to either 'full' or 'before'".to_string(), ) - })?, + })?)?, }, DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update { - before: self.after.ok_or_else(|| { + before: parse_bin_record(self.after.ok_or_else(|| { Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string()) - })?, - after: self.before.ok_or_else(|| { + })?)?, + after: parse_bin_record(self.before.ok_or_else(|| { Error::DatabaseTapeError( "cdc_mode must be set to either 'full' or 'before'".to_string(), ) - })?, + })?)?, updates: None, }, DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Delete { - before: self.after.ok_or_else(|| { + before: parse_bin_record(self.after.ok_or_else(|| { Error::DatabaseTapeError( "cdc_mode must be set to either 'full' or 'after'".to_string(), ) - })?, + })?)?, }, }; Ok(DatabaseTapeRowChange { @@ -197,18 +201,28 @@ impl TryFrom<&turso_core::Row> for DatabaseChange { pub enum DatabaseTapeRowChangeType { Delete { - before: Vec, + before: Vec, }, Update { - before: Vec, - after: Vec, - updates: Option>, + before: Vec, + after: Vec, + updates: Option>, }, Insert { - after: Vec, + after: Vec, }, } +impl From<&DatabaseTapeRowChangeType> for DatabaseChangeType { + fn from(value: &DatabaseTapeRowChangeType) -> Self { + match value { + DatabaseTapeRowChangeType::Delete { .. } => DatabaseChangeType::Delete, + DatabaseTapeRowChangeType::Update { .. } => DatabaseChangeType::Update, + DatabaseTapeRowChangeType::Insert { .. } => DatabaseChangeType::Insert, + } + } +} + /// [DatabaseTapeOperation] extends [DatabaseTapeRowChange] by adding information about transaction boundary /// /// This helps [crate::database_tape::DatabaseTapeSession] to properly maintain transaction state and COMMIT or ROLLBACK changes in appropriate time @@ -286,3 +300,15 @@ pub enum ProtocolCommand { // Protocol waits for some IO - caller must spin turso-db IO event loop and also drive ProtocolIO IO, } + +pub fn parse_bin_record(bin_record: Vec) -> Result> { + let record = turso_core::types::ImmutableRecord::from_bin_record(bin_record); + let mut cursor = turso_core::types::RecordCursor::new(); + let columns = cursor.count(&record); + let mut values = Vec::with_capacity(columns); + for i in 0..columns { + let value = cursor.get_value(&record, i)?; + values.push(value.to_owned()); + } + Ok(values) +}