parse bin record early

This commit is contained in:
Nikita Sivukhin
2025-08-14 17:16:44 +04:00
parent 71bbc36f61
commit 36106a2d6c

View File

@@ -75,27 +75,31 @@ impl DatabaseChange {
pub fn into_apply(self) -> Result<DatabaseTapeRowChange> {
let tape_change = match self.change_type {
DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Delete {
before: self.before.ok_or_else(|| {
before: parse_bin_record(self.before.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'before'".to_string(),
)
})?,
})?)?,
},
DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update {
before: self.before.ok_or_else(|| {
before: parse_bin_record(self.before.ok_or_else(|| {
Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string())
})?,
after: self.after.ok_or_else(|| {
})?)?,
after: parse_bin_record(self.after.ok_or_else(|| {
Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string())
})?,
updates: self.updates,
})?)?,
updates: if let Some(updates) = self.updates {
Some(parse_bin_record(updates)?)
} else {
None
},
},
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Insert {
after: self.after.ok_or_else(|| {
after: parse_bin_record(self.after.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'after'".to_string(),
)
})?,
})?)?,
},
};
Ok(DatabaseTapeRowChange {
@@ -110,29 +114,29 @@ impl DatabaseChange {
pub fn into_revert(self) -> Result<DatabaseTapeRowChange> {
let tape_change = match self.change_type {
DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Insert {
after: self.before.ok_or_else(|| {
after: parse_bin_record(self.before.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'before'".to_string(),
)
})?,
})?)?,
},
DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update {
before: self.after.ok_or_else(|| {
before: parse_bin_record(self.after.ok_or_else(|| {
Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string())
})?,
after: self.before.ok_or_else(|| {
})?)?,
after: parse_bin_record(self.before.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'before'".to_string(),
)
})?,
})?)?,
updates: None,
},
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Delete {
before: self.after.ok_or_else(|| {
before: parse_bin_record(self.after.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'after'".to_string(),
)
})?,
})?)?,
},
};
Ok(DatabaseTapeRowChange {
@@ -197,18 +201,28 @@ impl TryFrom<&turso_core::Row> for DatabaseChange {
pub enum DatabaseTapeRowChangeType {
Delete {
before: Vec<u8>,
before: Vec<turso_core::Value>,
},
Update {
before: Vec<u8>,
after: Vec<u8>,
updates: Option<Vec<u8>>,
before: Vec<turso_core::Value>,
after: Vec<turso_core::Value>,
updates: Option<Vec<turso_core::Value>>,
},
Insert {
after: Vec<u8>,
after: Vec<turso_core::Value>,
},
}
impl From<&DatabaseTapeRowChangeType> for DatabaseChangeType {
fn from(value: &DatabaseTapeRowChangeType) -> Self {
match value {
DatabaseTapeRowChangeType::Delete { .. } => DatabaseChangeType::Delete,
DatabaseTapeRowChangeType::Update { .. } => DatabaseChangeType::Update,
DatabaseTapeRowChangeType::Insert { .. } => DatabaseChangeType::Insert,
}
}
}
/// [DatabaseTapeOperation] extends [DatabaseTapeRowChange] by adding information about transaction boundary
///
/// This helps [crate::database_tape::DatabaseTapeSession] to properly maintain transaction state and COMMIT or ROLLBACK changes in appropriate time
@@ -286,3 +300,15 @@ pub enum ProtocolCommand {
// Protocol waits for some IO - caller must spin turso-db IO event loop and also drive ProtocolIO
IO,
}
pub fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso_core::Value>> {
let record = turso_core::types::ImmutableRecord::from_bin_record(bin_record);
let mut cursor = turso_core::types::RecordCursor::new();
let columns = cursor.count(&record);
let mut values = Vec::with_capacity(columns);
for i in 0..columns {
let value = cursor.get_value(&record, i)?;
values.push(value.to_owned());
}
Ok(values)
}