mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-01 07:24:19 +01:00
Merge 'turso-sync package: initial commit' from Nikita Sivukhin
This PR introduces the `turso-sync` package, which will provide additional sync features built on top of the `turso` and `turso-core` packages. In this PR, `turso-sync` introduces a simple `DatabaseTape` wrapper which allows extracting operations from the CDC table and applying them (potentially to a completely different database). Closes #2306
This commit is contained in:
8
.github/labeler.yml
vendored
8
.github/labeler.yml
vendored
@@ -96,6 +96,14 @@ vector:
|
||||
- changed-files:
|
||||
- any-glob-to-any-file: "core/vector/*"
|
||||
|
||||
turso-serverless:
|
||||
- changed-files:
|
||||
- any-glob-to-any-file: 'packages/turso-serverless'
|
||||
|
||||
turso-sync:
|
||||
- changed-files:
|
||||
- any-glob-to-any-file: 'packages/turso-sync'
|
||||
|
||||
antithesis:
|
||||
- changed-files:
|
||||
- any-glob-to-any-file:
|
||||
|
||||
1036
Cargo.lock
generated
1036
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -26,6 +26,7 @@ members = [
|
||||
"testing/sqlite_test_ext",
|
||||
"tests",
|
||||
"vendored/sqlite3-parser/sqlparser_bench",
|
||||
"packages/turso-sync",
|
||||
]
|
||||
exclude = ["perf/latency/limbo"]
|
||||
|
||||
@@ -37,6 +38,7 @@ license = "MIT"
|
||||
repository = "https://github.com/tursodatabase/turso"
|
||||
|
||||
[workspace.dependencies]
|
||||
turso = { path = "bindings/rust", version = "0.1.3" }
|
||||
limbo_completion = { path = "extensions/completion", version = "0.1.3" }
|
||||
turso_core = { path = "core", version = "0.1.3" }
|
||||
limbo_crypto = { path = "extensions/crypto", version = "0.1.3" }
|
||||
|
||||
@@ -110,6 +110,18 @@ impl Value {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<turso_core::Value> for Value {
|
||||
fn from(val: turso_core::Value) -> Self {
|
||||
match val {
|
||||
turso_core::Value::Null => Value::Null,
|
||||
turso_core::Value::Integer(n) => Value::Integer(n),
|
||||
turso_core::Value::Float(n) => Value::Real(n),
|
||||
turso_core::Value::Text(t) => Value::Text(t.into()),
|
||||
turso_core::Value::Blob(items) => Value::Blob(items),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Value> for turso_core::Value {
|
||||
fn from(val: Value) -> Self {
|
||||
match val {
|
||||
|
||||
@@ -176,6 +176,12 @@ impl From<String> for Text {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Text> for String {
    /// Converts the text payload into an owned `String`.
    ///
    /// Valid UTF-8 bytes are handed to the `String` directly. Invalid byte
    /// sequences are replaced with U+FFFD instead of panicking, because a `From`
    /// conversion is expected to be infallible.
    fn from(value: Text) -> Self {
        String::from_utf8(value.value)
            .unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned())
    }
}
|
||||
|
||||
impl Display for TextRef {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.as_str())
|
||||
@@ -956,6 +962,12 @@ impl ImmutableRecord {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_bin_record(payload: Vec<u8>) -> Self {
|
||||
Self {
|
||||
payload: Value::Blob(payload),
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: inline the complete record parsing code here.
|
||||
// Its probably more efficient.
|
||||
pub fn get_values(&self) -> Vec<RefValue> {
|
||||
@@ -2436,6 +2448,22 @@ impl RawSlice {
|
||||
}
|
||||
}
|
||||
|
||||
/// Kind of a row-level change, together with the binary record it carries (if any).
#[derive(Debug)]
pub enum DatabaseChangeType {
    /// The row was removed; no record is attached.
    Delete,
    /// The row was updated; `bin_record` holds the attached binary record.
    Update { bin_record: Vec<u8> },
    /// The row was inserted; `bin_record` holds the attached binary record.
    Insert { bin_record: Vec<u8> },
}

/// Single row-level change of a table.
#[derive(Debug)]
pub struct DatabaseChange {
    /// Identifier of the change.
    pub change_id: i64,
    /// Time of the change (NOTE(review): presumably a unix timestamp - confirm against the CDC writer).
    pub change_time: u64,
    /// What happened to the row.
    pub change: DatabaseChangeType,
    /// Table the changed row belongs to.
    pub table_name: String,
    /// Identifier of the changed row (NOTE(review): presumably its rowid - confirm).
    pub id: i64,
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WalInsertInfo {
|
||||
pub page_no: usize,
|
||||
|
||||
19
packages/turso-sync/Cargo.toml
Normal file
19
packages/turso-sync/Cargo.toml
Normal file
@@ -0,0 +1,19 @@
|
||||
[package]
|
||||
name = "turso-sync"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
repository.workspace = true
|
||||
|
||||
[dependencies]
|
||||
turso_core = { workspace = true }
|
||||
turso = { workspace = true }
|
||||
thiserror = "2.0.12"
|
||||
tracing = "0.1.41"
|
||||
|
||||
[dev-dependencies]
|
||||
ctor = "0.4.2"
|
||||
tempfile = "3.20.0"
|
||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
|
||||
tokio = { version = "1.46.1", features = ["macros", "rt-multi-thread"] }
|
||||
20
packages/turso-sync/README.md
Normal file
20
packages/turso-sync/README.md
Normal file
@@ -0,0 +1,20 @@
|
||||
# Turso sync package
|
||||
|
||||
The turso-sync package extends the turso-db embedded database with sync capabilities.
|
||||
|
||||
> [!NOTE]
|
||||
> This package is experimental and, therefore, subject to change at any time.
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
cargo add turso-sync
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
Check out the `examples/` directory for complete usage examples.
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
77
packages/turso-sync/examples/example_tape.rs
Normal file
77
packages/turso-sync/examples/example_tape.rs
Normal file
@@ -0,0 +1,77 @@
|
||||
use std::io::{self, Write};
|
||||
|
||||
use turso::Builder;
|
||||
use turso_sync::{
|
||||
database_tape::{DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseTape},
|
||||
types::DatabaseTapeOperation,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let db = Builder::new_local("local.db").build().await.unwrap();
|
||||
let db = DatabaseTape::new(db);
|
||||
|
||||
let conn = db.connect().await.unwrap();
|
||||
|
||||
loop {
|
||||
print!("> ");
|
||||
io::stdout().flush().unwrap();
|
||||
|
||||
let mut input = String::new();
|
||||
let bytes_read = io::stdin().read_line(&mut input).unwrap();
|
||||
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
let trimmed = input.trim();
|
||||
if trimmed == ".exit" || trimmed == ".quit" {
|
||||
break;
|
||||
}
|
||||
if trimmed.starts_with(".undo ") || trimmed.starts_with(".redo ") {
|
||||
let first_change_id = Some(trimmed[".undo ".len()..].parse().unwrap());
|
||||
let mode = match &trimmed[0..(".undo".len())] {
|
||||
".undo" => DatabaseChangesIteratorMode::Revert,
|
||||
".redo" => DatabaseChangesIteratorMode::Apply,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let mut iterator = db
|
||||
.iterate_changes(DatabaseChangesIteratorOpts {
|
||||
first_change_id,
|
||||
mode,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let mut session = db.start_tape_session().await.unwrap();
|
||||
if let Some(change) = iterator.next().await.unwrap() {
|
||||
session.replay(change).await.unwrap();
|
||||
session.replay(DatabaseTapeOperation::Commit).await.unwrap();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
let mut stmt = conn.prepare(&input).await.unwrap();
|
||||
let mut rows = stmt.query(()).await.unwrap();
|
||||
while let Some(row) = rows.next().await.unwrap() {
|
||||
let mut values = vec![];
|
||||
for i in 0..row.column_count() {
|
||||
let value = row.get_value(i).unwrap();
|
||||
match value {
|
||||
turso::Value::Null => values.push("NULL".to_string()),
|
||||
turso::Value::Integer(x) => values.push(format!("{x}",)),
|
||||
turso::Value::Real(x) => values.push(format!("{x}")),
|
||||
turso::Value::Text(x) => values.push(format!("'{x}'")),
|
||||
turso::Value::Blob(x) => values.push(format!(
|
||||
"x'{}'",
|
||||
x.iter()
|
||||
.map(|x| format!("{x:02x}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(""),
|
||||
)),
|
||||
}
|
||||
}
|
||||
println!("{}", &values.join(" "));
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
478
packages/turso-sync/src/database_tape.rs
Normal file
478
packages/turso-sync/src/database_tape.rs
Normal file
@@ -0,0 +1,478 @@
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
types::{
|
||||
DatabaseChange, DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType,
|
||||
},
|
||||
Result,
|
||||
};
|
||||
|
||||
/// Simple wrapper over [turso::Database] which extends its interface with a few methods
|
||||
/// to collect changes made to the database and apply/revert arbitrary changes to the database
|
||||
pub struct DatabaseTape {
|
||||
inner: turso::Database,
|
||||
cdc_table: Arc<String>,
|
||||
pragma_query: String,
|
||||
}
|
||||
|
||||
const DEFAULT_CDC_TABLE_NAME: &str = "turso_cdc";
|
||||
const DEFAULT_CDC_MODE: &str = "full";
|
||||
const DEFAULT_CHANGES_BATCH_SIZE: usize = 100;
|
||||
const CDC_PRAGMA_NAME: &str = "unstable_capture_data_changes_conn";
|
||||
|
||||
/// Options for [`DatabaseTape::new_with_opts`]; a `None` field selects the built-in default.
///
/// Implements `Default` (everything unset) so callers can use struct-update syntax,
/// e.g. `DatabaseTapeOpts { cdc_table: Some(name), ..Default::default() }`.
#[derive(Debug, Clone, Default)]
pub struct DatabaseTapeOpts {
    /// Name of the CDC table (defaults to `turso_cdc`).
    pub cdc_table: Option<String>,
    /// CDC capture mode (defaults to `full`).
    pub cdc_mode: Option<String>,
}
|
||||
|
||||
impl DatabaseTape {
|
||||
pub fn new(database: turso::Database) -> Self {
|
||||
let opts = DatabaseTapeOpts {
|
||||
cdc_table: None,
|
||||
cdc_mode: None,
|
||||
};
|
||||
Self::new_with_opts(database, opts)
|
||||
}
|
||||
pub fn new_with_opts(database: turso::Database, opts: DatabaseTapeOpts) -> Self {
|
||||
tracing::debug!("create local sync database with options {:?}", opts);
|
||||
let cdc_table = opts.cdc_table.unwrap_or(DEFAULT_CDC_TABLE_NAME.to_string());
|
||||
let cdc_mode = opts.cdc_mode.unwrap_or(DEFAULT_CDC_MODE.to_string());
|
||||
let pragma_query = format!("PRAGMA {CDC_PRAGMA_NAME}('{cdc_mode},{cdc_table}')");
|
||||
Self {
|
||||
inner: database,
|
||||
cdc_table: Arc::new(cdc_table.to_string()),
|
||||
pragma_query,
|
||||
}
|
||||
}
|
||||
pub async fn connect(&self) -> Result<turso::Connection> {
|
||||
let connection = self.inner.connect()?;
|
||||
tracing::debug!("set '{CDC_PRAGMA_NAME}' for new connection");
|
||||
connection.execute(&self.pragma_query, ()).await?;
|
||||
Ok(connection)
|
||||
}
|
||||
/// Builds an iterator which emits [DatabaseTapeOperation] by extracting data from CDC table
|
||||
pub async fn iterate_changes(
|
||||
&self,
|
||||
opts: DatabaseChangesIteratorOpts,
|
||||
) -> Result<DatabaseChangesIterator> {
|
||||
tracing::debug!("opening changes iterator with options {:?}", opts);
|
||||
let conn = self.connect().await?;
|
||||
let query = opts.mode.query(&self.cdc_table, opts.batch_size);
|
||||
let query_stmt = conn.prepare(&query).await?;
|
||||
Ok(DatabaseChangesIterator {
|
||||
first_change_id: opts.first_change_id,
|
||||
batch: VecDeque::with_capacity(opts.batch_size),
|
||||
query_stmt,
|
||||
txn_boundary_returned: false,
|
||||
mode: opts.mode,
|
||||
})
|
||||
}
|
||||
/// Start replay session which can apply [DatabaseTapeOperation] from [Self::iterate_changes]
|
||||
pub async fn start_tape_session(&self) -> Result<DatabaseReplaySession> {
|
||||
tracing::debug!("opening replay session");
|
||||
Ok(DatabaseReplaySession {
|
||||
conn: self.connect().await?,
|
||||
cached_delete_stmt: HashMap::new(),
|
||||
cached_insert_stmt: HashMap::new(),
|
||||
in_txn: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Direction in which the CDC table is traversed.
#[derive(Debug)]
pub enum DatabaseChangesIteratorMode {
    /// Walk changes in ascending `change_id` order.
    Apply,
    /// Walk changes in descending `change_id` order.
    Revert,
}

impl DatabaseChangesIteratorMode {
    /// Builds the batch query over `table_name`.
    ///
    /// The query has a single positional parameter (the `change_id` bound) and
    /// returns at most `limit` rows, ordered according to the mode.
    pub fn query(&self, table_name: &str, limit: usize) -> String {
        let (operation, order) = match *self {
            Self::Apply => (">=", "ASC"),
            Self::Revert => ("<=", "DESC"),
        };
        format!(
            "SELECT * FROM {table_name} WHERE change_id {operation} ? ORDER BY change_id {order} LIMIT {limit}",
        )
    }

    /// Bound used for the very first query when the caller gave no starting id.
    pub fn first_id(&self) -> i64 {
        match *self {
            Self::Apply => -1,
            Self::Revert => i64::MAX,
        }
    }

    /// Bound for the query following a batch whose last row had change id `id`.
    pub fn next_id(&self, id: i64) -> i64 {
        let step = match *self {
            Self::Apply => 1,
            Self::Revert => -1,
        };
        id + step
    }
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseChangesIteratorOpts {
|
||||
pub first_change_id: Option<i64>,
|
||||
pub batch_size: usize,
|
||||
pub mode: DatabaseChangesIteratorMode,
|
||||
}
|
||||
|
||||
impl Default for DatabaseChangesIteratorOpts {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
first_change_id: None,
|
||||
batch_size: DEFAULT_CHANGES_BATCH_SIZE,
|
||||
mode: DatabaseChangesIteratorMode::Apply,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DatabaseChangesIterator {
|
||||
query_stmt: turso::Statement,
|
||||
first_change_id: Option<i64>,
|
||||
batch: VecDeque<DatabaseTapeRowChange>,
|
||||
txn_boundary_returned: bool,
|
||||
mode: DatabaseChangesIteratorMode,
|
||||
}
|
||||
|
||||
impl DatabaseChangesIterator {
|
||||
pub async fn next(&mut self) -> Result<Option<DatabaseTapeOperation>> {
|
||||
if self.batch.is_empty() {
|
||||
self.refill().await?;
|
||||
}
|
||||
// todo(sivukhin): iterator must be more clever about transaction boundaries - but for that we need to extend CDC table
|
||||
// for now, if iterator reach the end of CDC table - we are sure that this is a transaction boundary
|
||||
if let Some(change) = self.batch.pop_front() {
|
||||
self.txn_boundary_returned = false;
|
||||
Ok(Some(DatabaseTapeOperation::RowChange(change)))
|
||||
} else if !self.txn_boundary_returned {
|
||||
self.txn_boundary_returned = true;
|
||||
Ok(Some(DatabaseTapeOperation::Commit))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
async fn refill(&mut self) -> Result<()> {
|
||||
let change_id_filter = self.first_change_id.unwrap_or(self.mode.first_id());
|
||||
self.query_stmt.reset();
|
||||
|
||||
let mut rows = self.query_stmt.query((change_id_filter,)).await?;
|
||||
while let Some(row) = rows.next().await? {
|
||||
let database_change: DatabaseChange = row.try_into()?;
|
||||
let tape_change = match self.mode {
|
||||
DatabaseChangesIteratorMode::Apply => database_change.into_apply()?,
|
||||
DatabaseChangesIteratorMode::Revert => database_change.into_revert()?,
|
||||
};
|
||||
self.batch.push_back(tape_change);
|
||||
}
|
||||
let batch_len = self.batch.len();
|
||||
if batch_len > 0 {
|
||||
self.first_change_id = Some(self.mode.next_id(self.batch[batch_len - 1].change_id));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DatabaseReplaySession {
|
||||
conn: turso::Connection,
|
||||
cached_delete_stmt: HashMap<String, turso::Statement>,
|
||||
cached_insert_stmt: HashMap<(String, usize), turso::Statement>,
|
||||
in_txn: bool,
|
||||
}
|
||||
|
||||
impl DatabaseReplaySession {
|
||||
pub async fn replay(&mut self, operation: DatabaseTapeOperation) -> Result<()> {
|
||||
match operation {
|
||||
DatabaseTapeOperation::Commit => {
|
||||
tracing::trace!("replay: commit replayed changes after transaction boundary");
|
||||
if self.in_txn {
|
||||
self.conn.execute("COMMIT", ()).await?;
|
||||
self.in_txn = false;
|
||||
}
|
||||
}
|
||||
DatabaseTapeOperation::RowChange(change) => {
|
||||
if !self.in_txn {
|
||||
tracing::trace!("replay: start txn for replaying changes");
|
||||
self.conn.execute("BEGIN", ()).await?;
|
||||
self.in_txn = true;
|
||||
}
|
||||
tracing::trace!("replay: change={:?}", change);
|
||||
let table_name = &change.table_name;
|
||||
match change.change {
|
||||
DatabaseTapeRowChangeType::Delete => {
|
||||
self.replay_delete(table_name, change.id).await?
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update { bin_record } => {
|
||||
self.replay_delete(table_name, change.id).await?;
|
||||
let values = parse_bin_record(bin_record)?;
|
||||
self.replay_insert(table_name, change.id, values).await?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { bin_record } => {
|
||||
let values = parse_bin_record(bin_record)?;
|
||||
self.replay_insert(table_name, change.id, values).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
/// Deletes the row with rowid `id` from `table_name` using the cached prepared statement.
async fn replay_delete(&mut self, table_name: &str, id: i64) -> Result<()> {
    let delete_stmt = self.cached_delete_stmt(table_name).await?;
    delete_stmt.execute((id,)).await?;
    Ok(())
}
|
||||
async fn replay_insert(
|
||||
&mut self,
|
||||
table_name: &str,
|
||||
id: i64,
|
||||
mut values: Vec<turso::Value>,
|
||||
) -> Result<()> {
|
||||
let columns = values.len();
|
||||
let stmt = self.cached_insert_stmt(table_name, columns).await?;
|
||||
|
||||
values.push(turso::Value::Integer(id));
|
||||
let params = turso::params::Params::Positional(values);
|
||||
|
||||
stmt.execute(params).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn cached_delete_stmt(&mut self, table_name: &str) -> Result<&mut turso::Statement> {
|
||||
if !self.cached_delete_stmt.contains_key(table_name) {
|
||||
tracing::trace!("prepare delete statement for replay: table={}", table_name);
|
||||
let query = format!("DELETE FROM {table_name} WHERE rowid = ?");
|
||||
let stmt = self.conn.prepare(&query).await?;
|
||||
self.cached_delete_stmt.insert(table_name.to_string(), stmt);
|
||||
}
|
||||
tracing::trace!(
|
||||
"ready to use prepared delete statement for replay: table={}",
|
||||
table_name
|
||||
);
|
||||
Ok(self.cached_delete_stmt.get_mut(table_name).unwrap())
|
||||
}
|
||||
async fn cached_insert_stmt(
|
||||
&mut self,
|
||||
table_name: &str,
|
||||
columns: usize,
|
||||
) -> Result<&mut turso::Statement> {
|
||||
let key = (table_name.to_string(), columns);
|
||||
if !self.cached_insert_stmt.contains_key(&key) {
|
||||
tracing::trace!(
|
||||
"prepare insert statement for replay: table={}, columns={}",
|
||||
table_name,
|
||||
columns
|
||||
);
|
||||
let mut table_info = self
|
||||
.conn
|
||||
.query(
|
||||
&format!("SELECT name FROM pragma_table_info('{table_name}')"),
|
||||
(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut column_names = Vec::with_capacity(columns + 1);
|
||||
while let Some(table_info_row) = table_info.next().await? {
|
||||
let value = table_info_row.get_value(0)?;
|
||||
column_names.push(value.as_text().expect("must be text").to_string());
|
||||
}
|
||||
column_names.push("rowid".to_string());
|
||||
|
||||
let placeholders = ["?"].repeat(columns + 1).join(",");
|
||||
let column_names = column_names.join(", ");
|
||||
let query = format!("INSERT INTO {table_name}({column_names}) VALUES ({placeholders})");
|
||||
let stmt = self.conn.prepare(&query).await?;
|
||||
self.cached_insert_stmt.insert(key.clone(), stmt);
|
||||
}
|
||||
tracing::trace!(
|
||||
"ready to use prepared insert statement for replay: table={}, columns={}",
|
||||
table_name,
|
||||
columns
|
||||
);
|
||||
Ok(self.cached_insert_stmt.get_mut(&key).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso::Value>> {
|
||||
let record = turso_core::types::ImmutableRecord::from_bin_record(bin_record);
|
||||
let mut cursor = turso_core::types::RecordCursor::new();
|
||||
let columns = cursor.count(&record);
|
||||
let mut values = Vec::with_capacity(columns);
|
||||
for i in 0..columns {
|
||||
let value = cursor.get_value(&record, i)?;
|
||||
values.push(value.to_owned().into());
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use tempfile::NamedTempFile;

    use crate::database_tape::{DatabaseChangesIterator, DatabaseTape};

    /// Runs `query` and materializes every row as a vector of column values.
    async fn fetch_rows(conn: &turso::Connection, query: &str) -> Vec<Vec<turso::Value>> {
        let mut iterator = conn.query(query, ()).await.unwrap();
        let mut rows = vec![];
        while let Some(row) = iterator.next().await.unwrap() {
            let row_values = (0..row.column_count())
                .map(|i| row.get_value(i).unwrap())
                .collect();
            rows.push(row_values);
        }
        rows
    }

    /// Drains everything `changes` currently yields into a fresh replay session of `target`.
    async fn replay_pending(changes: &mut DatabaseChangesIterator, target: &DatabaseTape) {
        let mut replay = target.start_tape_session().await.unwrap();
        while let Some(change) = changes.next().await.unwrap() {
            replay.replay(change).await.unwrap();
        }
    }

    fn int(value: i64) -> turso::Value {
        turso::Value::Integer(value)
    }

    fn text(value: &str) -> turso::Value {
        turso::Value::Text(value.to_string())
    }

    /// Changes made to one database are replayed on another one, batch by batch,
    /// through a single long-lived changes iterator.
    #[tokio::test]
    async fn test_database_cdc() {
        let temp_file1 = NamedTempFile::new().unwrap();
        let db_path1 = temp_file1.path().to_str().unwrap();

        let temp_file2 = NamedTempFile::new().unwrap();
        let db_path2 = temp_file2.path().to_str().unwrap();

        let db1 = DatabaseTape::new(turso::Builder::new_local(db_path1).build().await.unwrap());
        let conn1 = db1.connect().await.unwrap();

        let db2 = DatabaseTape::new(turso::Builder::new_local(db_path2).build().await.unwrap());
        let conn2 = db2.connect().await.unwrap();

        // both databases start with the same schema
        for conn in [&conn1, &conn2] {
            for ddl in [
                "CREATE TABLE a(x INTEGER PRIMARY KEY, y);",
                "CREATE TABLE b(x INTEGER PRIMARY KEY, y, z);",
            ] {
                conn.execute(ddl, ()).await.unwrap();
            }
        }

        conn1
            .execute("INSERT INTO a VALUES (1, 'hello'), (2, 'turso')", ())
            .await
            .unwrap();
        conn1
            .execute(
                "INSERT INTO b VALUES (3, 'bye', 0.1), (4, 'limbo', 0.2)",
                (),
            )
            .await
            .unwrap();

        // table `a` is never touched again, so its expected content stays the same
        let expected_a = vec![
            vec![int(1), text("hello")],
            vec![int(2), text("turso")],
        ];

        let mut iterator = db1.iterate_changes(Default::default()).await.unwrap();

        // step 1: inserts
        replay_pending(&mut iterator, &db2).await;
        assert_eq!(fetch_rows(&conn2, "SELECT * FROM a").await, expected_a);
        assert_eq!(
            fetch_rows(&conn2, "SELECT * FROM b").await,
            vec![
                vec![int(3), text("bye"), turso::Value::Real(0.1)],
                vec![int(4), text("limbo"), turso::Value::Real(0.2)],
            ]
        );

        // step 2: delete
        conn1
            .execute("DELETE FROM b WHERE y = 'limbo'", ())
            .await
            .unwrap();
        replay_pending(&mut iterator, &db2).await;
        assert_eq!(fetch_rows(&conn2, "SELECT * FROM a").await, expected_a);
        assert_eq!(
            fetch_rows(&conn2, "SELECT * FROM b").await,
            vec![vec![int(3), text("bye"), turso::Value::Real(0.1)]]
        );

        // step 3: update
        conn1
            .execute("UPDATE b SET y = x'deadbeef' WHERE x = 3", ())
            .await
            .unwrap();
        replay_pending(&mut iterator, &db2).await;
        assert_eq!(fetch_rows(&conn2, "SELECT * FROM a").await, expected_a);
        assert_eq!(
            fetch_rows(&conn2, "SELECT * FROM b").await,
            vec![vec![
                int(3),
                turso::Value::Blob(vec![0xde, 0xad, 0xbe, 0xef]),
                turso::Value::Real(0.1)
            ]]
        );
    }
}
|
||||
19
packages/turso-sync/src/errors.rs
Normal file
19
packages/turso-sync/src/errors.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("database error: {0}")]
|
||||
TursoError(turso::Error),
|
||||
#[error("database tape error: {0}")]
|
||||
DatabaseTapeError(String),
|
||||
}
|
||||
|
||||
impl From<turso::Error> for Error {
|
||||
fn from(value: turso::Error) -> Self {
|
||||
Self::TursoError(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<turso_core::LimboError> for Error {
|
||||
fn from(value: turso_core::LimboError) -> Self {
|
||||
Self::TursoError(value.into())
|
||||
}
|
||||
}
|
||||
17
packages/turso-sync/src/lib.rs
Normal file
17
packages/turso-sync/src/lib.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
pub mod database_tape;
|
||||
pub mod errors;
|
||||
pub mod types;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, errors::Error>;
|
||||
|
||||
#[cfg(test)]
mod tests {
    use tracing_subscriber::EnvFilter;

    /// Installs the tracing subscriber for the test binary; the log filter is
    /// read from the environment (see `EnvFilter::from_default_env`).
    #[ctor::ctor]
    fn init() {
        let filter = EnvFilter::from_default_env();
        tracing_subscriber::fmt().with_env_filter(filter).init();
    }
}
|
||||
210
packages/turso-sync/src/types.rs
Normal file
210
packages/turso-sync/src/types.rs
Normal file
@@ -0,0 +1,210 @@
|
||||
use crate::{errors::Error, Result};
|
||||
|
||||
/// Kind of a change recorded in the CDC table.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DatabaseChangeType {
    /// A row was deleted (stored as `-1` in the CDC table).
    Delete,
    /// A row was updated (stored as `0` in the CDC table).
    Update,
    /// A row was inserted (stored as `1` in the CDC table).
    Insert,
}
|
||||
|
||||
/// [DatabaseChange] struct represents data from CDC table as-is
|
||||
/// (see `turso_cdc_table_columns` definition in turso-core)
|
||||
#[derive(Clone)]
|
||||
pub struct DatabaseChange {
|
||||
/// Monotonically incrementing change number
|
||||
pub change_id: i64,
|
||||
/// Unix timestamp of the change (not guaranteed to be strictly monotonic as host clocks can drift)
|
||||
pub change_time: u64,
|
||||
/// Type of the change
|
||||
pub change_type: DatabaseChangeType,
|
||||
/// Table of the change
|
||||
pub table_name: String,
|
||||
/// Rowid of changed row
|
||||
pub id: i64,
|
||||
/// Binary record of the row before the change, if CDC pragma set to either 'before' or 'full'
|
||||
pub before: Option<Vec<u8>>,
|
||||
/// Binary record of the row after the change, if CDC pragma set to either 'after' or 'full'
|
||||
pub after: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl DatabaseChange {
|
||||
/// Converts [DatabaseChange] into the operation which effect will be the application of the change
|
||||
pub fn into_apply(self) -> Result<DatabaseTapeRowChange> {
|
||||
let tape_change = match self.change_type {
|
||||
DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Delete,
|
||||
DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update {
|
||||
bin_record: self.after.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(
|
||||
"cdc_mode must be set to either 'full' or 'after'".to_string(),
|
||||
)
|
||||
})?,
|
||||
},
|
||||
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Insert {
|
||||
bin_record: self.after.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(
|
||||
"cdc_mode must be set to either 'full' or 'after'".to_string(),
|
||||
)
|
||||
})?,
|
||||
},
|
||||
};
|
||||
Ok(DatabaseTapeRowChange {
|
||||
change_id: self.change_id,
|
||||
change_time: self.change_time,
|
||||
change: tape_change,
|
||||
table_name: self.table_name,
|
||||
id: self.id,
|
||||
})
|
||||
}
|
||||
/// Converts [DatabaseChange] into the operation which effect will be the revert of the change
|
||||
pub fn into_revert(self) -> Result<DatabaseTapeRowChange> {
|
||||
let tape_change = match self.change_type {
|
||||
DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Insert {
|
||||
bin_record: self.before.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(
|
||||
"cdc_mode must be set to either 'full' or 'before'".to_string(),
|
||||
)
|
||||
})?,
|
||||
},
|
||||
DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update {
|
||||
bin_record: self.before.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(
|
||||
"cdc_mode must be set to either 'full' or 'before'".to_string(),
|
||||
)
|
||||
})?,
|
||||
},
|
||||
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Delete,
|
||||
};
|
||||
Ok(DatabaseTapeRowChange {
|
||||
change_id: self.change_id,
|
||||
change_time: self.change_time,
|
||||
change: tape_change,
|
||||
table_name: self.table_name,
|
||||
id: self.id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DatabaseChange {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DatabaseChange")
|
||||
.field("change_id", &self.change_id)
|
||||
.field("change_time", &self.change_time)
|
||||
.field("change_type", &self.change_type)
|
||||
.field("table_name", &self.table_name)
|
||||
.field("id", &self.id)
|
||||
.field("before.len()", &self.before.as_ref().map(|x| x.len()))
|
||||
.field("after.len()", &self.after.as_ref().map(|x| x.len()))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<turso::Row> for DatabaseChange {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(row: turso::Row) -> Result<Self> {
|
||||
let change_id = get_value_i64(&row, 0)?;
|
||||
let change_time = get_value_i64(&row, 1)? as u64;
|
||||
let change_type = get_value_i64(&row, 2)?;
|
||||
let table_name = get_value_text(&row, 3)?;
|
||||
let id = get_value_i64(&row, 4)?;
|
||||
let before = get_value_blob_or_null(&row, 5)?;
|
||||
let after = get_value_blob_or_null(&row, 6)?;
|
||||
|
||||
let change_type = match change_type {
|
||||
-1 => DatabaseChangeType::Delete,
|
||||
0 => DatabaseChangeType::Update,
|
||||
1 => DatabaseChangeType::Insert,
|
||||
v => {
|
||||
return Err(Error::DatabaseTapeError(format!(
|
||||
"unexpected change type: expected -1|0|1, got '{v:?}'"
|
||||
)))
|
||||
}
|
||||
};
|
||||
Ok(Self {
|
||||
change_id,
|
||||
change_time,
|
||||
change_type,
|
||||
table_name,
|
||||
id,
|
||||
before,
|
||||
after,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Kind of a row operation carried by [DatabaseTapeRowChange].
///
/// `Update` and `Insert` carry the binary record of the row to write.
pub enum DatabaseTapeRowChangeType {
    Delete,
    Update { bin_record: Vec<u8> },
    Insert { bin_record: Vec<u8> },
}

/// [DatabaseTapeOperation] extends [DatabaseTapeRowChange] by adding information about transaction boundary
///
/// This helps [crate::database_tape::DatabaseReplaySession] to properly maintain transaction state and COMMIT changes at the appropriate time
/// by consuming events from [crate::database_tape::DatabaseChangesIterator]
#[derive(Debug)]
pub enum DatabaseTapeOperation {
    /// A change of a single row.
    RowChange(DatabaseTapeRowChange),
    /// Transaction boundary: changes replayed so far must be committed.
    Commit,
}

/// [DatabaseTapeRowChange] is the specific operation over single row which can be performed on database
#[derive(Debug)]
pub struct DatabaseTapeRowChange {
    /// Id of the CDC change this operation was derived from.
    pub change_id: i64,
    /// Time of the CDC change this operation was derived from.
    pub change_time: u64,
    /// Operation to perform on the row.
    pub change: DatabaseTapeRowChangeType,
    /// Table the row belongs to.
    pub table_name: String,
    /// Rowid of the row.
    pub id: i64,
}

impl std::fmt::Debug for DatabaseTapeRowChangeType {
    /// Prints the variant name; binary records are summarized by their length
    /// instead of dumping the raw bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (variant, bin_record) = match self {
            Self::Delete => return write!(f, "Delete"),
            Self::Update { bin_record } => ("Update", bin_record),
            Self::Insert { bin_record } => ("Insert", bin_record),
        };
        f.debug_struct(variant)
            .field("bin_record.len()", &bin_record.len())
            .finish()
    }
}
|
||||
|
||||
fn get_value(row: &turso::Row, index: usize) -> Result<turso::Value> {
|
||||
row.get_value(index).map_err(Error::TursoError)
|
||||
}
|
||||
|
||||
fn get_value_i64(row: &turso::Row, index: usize) -> Result<i64> {
|
||||
let v = get_value(row, index)?;
|
||||
match v {
|
||||
turso::Value::Integer(v) => Ok(v),
|
||||
v => Err(Error::DatabaseTapeError(format!(
|
||||
"column {index} type mismatch: expected integer, got '{v:?}'"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_value_text(row: &turso::Row, index: usize) -> Result<String> {
|
||||
let v = get_value(row, index)?;
|
||||
match v {
|
||||
turso::Value::Text(x) => Ok(x),
|
||||
v => Err(Error::DatabaseTapeError(format!(
|
||||
"column {index} type mismatch: expected string, got '{v:?}'"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_value_blob_or_null(row: &turso::Row, index: usize) -> Result<Option<Vec<u8>>> {
|
||||
let v = get_value(row, index)?;
|
||||
match v {
|
||||
turso::Value::Null => Ok(None),
|
||||
turso::Value::Blob(x) => Ok(Some(x)),
|
||||
v => Err(Error::DatabaseTapeError(format!(
|
||||
"column {index} type mismatch: expected blob, got '{v:?}'"
|
||||
))),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user