introduce turso-sync package with simple implementation of DatabaseTape

This commit is contained in:
Nikita Sivukhin
2025-07-28 17:34:27 +04:00
parent 8adc807cd7
commit b27bc05c7d
6 changed files with 627 additions and 0 deletions

View File

@@ -26,6 +26,7 @@ members = [
"testing/sqlite_test_ext",
"tests",
"vendored/sqlite3-parser/sqlparser_bench",
"packages/turso-sync",
]
exclude = ["perf/latency/limbo"]
@@ -37,6 +38,7 @@ license = "MIT"
repository = "https://github.com/tursodatabase/turso"
[workspace.dependencies]
turso = { path = "bindings/rust", version = "0.1.3" }
limbo_completion = { path = "extensions/completion", version = "0.1.3" }
turso_core = { path = "core", version = "0.1.3" }
limbo_crypto = { path = "extensions/crypto", version = "0.1.3" }

View File

@@ -0,0 +1,29 @@
[package]
name = "turso_sync"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
[dependencies]
turso_core = { workspace = true }
turso = { workspace = true }
http = "1.3.1"
http-body-util = "0.1.3"
hyper = { version = "1.6.0", features = ["client", "http1"] }
hyper-rustls = "0.27.7"
hyper-util = { version = "0.1.16", features = ["http1", "client"] }
rustls = "0.23.29"
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror = "2.0.12"
tokio = { version = "1.46.1", features = ["fs", "io-util", "rt", "test-util"] }
tracing = "0.1.41"
uuid = "1.17.0"
[dev-dependencies]
ctor = "0.4.2"
tempfile = "3.20.0"
tokio = { version = "1.46.1", features = ["macros", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

View File

@@ -0,0 +1,77 @@
use std::io::{self, Write};
use turso::Builder;
use turso_sync::{
database_tape::{DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseTape},
types::DatabaseTapeOperation,
};
#[tokio::main]
async fn main() {
let db = Builder::new_local("local.db").build().await.unwrap();
let db = DatabaseTape::new(db);
let conn = db.connect().await.unwrap();
loop {
print!("> ");
io::stdout().flush().unwrap();
let mut input = String::new();
let bytes_read = io::stdin().read_line(&mut input).unwrap();
if bytes_read == 0 {
break;
}
let trimmed = input.trim();
if trimmed == ".exit" || trimmed == ".quit" {
break;
}
if trimmed.starts_with(".undo ") || trimmed.starts_with(".redo ") {
let first_change_id = Some(trimmed[".undo ".len()..].parse().unwrap());
let mode = match &trimmed[0..(".undo".len())] {
".undo" => DatabaseChangesIteratorMode::Revert,
".redo" => DatabaseChangesIteratorMode::Apply,
_ => unreachable!(),
};
let mut iterator = db
.iterate_changes(DatabaseChangesIteratorOpts {
first_change_id,
mode,
..Default::default()
})
.await
.unwrap();
let mut session = db.start_tape_session().await.unwrap();
if let Some(change) = iterator.next().await.unwrap() {
session.replay(change).await.unwrap();
session.replay(DatabaseTapeOperation::Commit).await.unwrap();
}
continue;
}
let mut stmt = conn.prepare(&input).await.unwrap();
let mut rows = stmt.query(()).await.unwrap();
while let Some(row) = rows.next().await.unwrap() {
let mut values = vec![];
for i in 0..row.column_count() {
let value = row.get_value(i).unwrap();
match value {
turso::Value::Null => values.push(format!("NULL")),
turso::Value::Integer(x) => values.push(format!("{}", x)),
turso::Value::Real(x) => values.push(format!("{}", x)),
turso::Value::Text(x) => values.push(format!("'{}'", x)),
turso::Value::Blob(x) => values.push(format!(
"x'{}'",
x.iter()
.map(|x| format!("{:02x}", x))
.collect::<Vec<_>>()
.join(""),
)),
}
}
println!("{}", &values.join(" "));
io::stdout().flush().unwrap();
}
}
}

View File

@@ -0,0 +1,484 @@
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use crate::{
errors::Error,
types::{
DatabaseChange, DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType,
},
Result,
};
/// Simple wrapper over [turso::Database] which extends its interface with a few methods
/// to collect changes made to the database and apply/revert arbitrary changes to the database
pub struct DatabaseTape {
/// Wrapped database; every connection handed out by this type is opened from it
inner: turso::Database,
/// Name of the CDC table; interpolated into the query built for the changes iterator
cdc_table: Arc<String>,
/// `PRAGMA` statement text executed on every connection opened through `connect`
pragma_query: String,
}
/// CDC table name used when `DatabaseTapeOpts::cdc_table` is `None`
const DEFAULT_CDC_TABLE_NAME: &str = "turso_cdc";
/// CDC mode used when `DatabaseTapeOpts::cdc_mode` is `None`
const DEFAULT_CDC_MODE: &str = "full";
/// Batch size used by the `Default` implementation of `DatabaseChangesIteratorOpts`
const DEFAULT_CHANGES_BATCH_SIZE: usize = 100;
/// Name of the pragma executed on every new connection to configure change capture
const CDC_PRAGMA_NAME: &str = "unstable_capture_data_changes_conn";
/// Options accepted by [DatabaseTape::new_with_opts]
#[derive(Debug)]
pub struct DatabaseTapeOpts {
/// Name of the CDC table; `None` falls back to `DEFAULT_CDC_TABLE_NAME`
pub cdc_table: Option<String>,
/// CDC mode passed to the capture pragma; `None` falls back to `DEFAULT_CDC_MODE`
pub cdc_mode: Option<String>,
}
impl DatabaseTape {
/// Wraps `database` using the default CDC table name and CDC mode
/// (equivalent to [Self::new_with_opts] with both options left unset)
pub fn new(database: turso::Database) -> Self {
    Self::new_with_opts(
        database,
        DatabaseTapeOpts {
            cdc_mode: None,
            cdc_table: None,
        },
    )
}
/// Wraps `database`, taking the CDC table name and CDC mode from `opts`
/// (unset options fall back to `DEFAULT_CDC_TABLE_NAME` / `DEFAULT_CDC_MODE`).
///
/// The pragma statement executed on every new connection is built once here.
pub fn new_with_opts(database: turso::Database, opts: DatabaseTapeOpts) -> Self {
    tracing::debug!("create local sync database with options {:?}", opts);
    // defaults are materialized lazily: no allocation happens when the option is provided
    let cdc_table = opts
        .cdc_table
        .unwrap_or_else(|| DEFAULT_CDC_TABLE_NAME.to_string());
    // the mode is only needed to format the pragma, so a borrowed string is enough
    let cdc_mode = opts.cdc_mode.as_deref().unwrap_or(DEFAULT_CDC_MODE);
    let pragma_query = format!("PRAGMA {}('{},{}')", CDC_PRAGMA_NAME, cdc_mode, cdc_table);
    Self {
        inner: database,
        // `cdc_table` is already an owned String - move it instead of cloning
        cdc_table: Arc::new(cdc_table),
        pragma_query,
    }
}
/// Opens a new connection to the wrapped database and executes the CDC pragma on it
/// before handing the connection to the caller
pub async fn connect(&self) -> Result<turso::Connection> {
    let conn = self.inner.connect()?;
    tracing::debug!("set '{}' for new connection", CDC_PRAGMA_NAME);
    let pragma = &self.pragma_query;
    conn.execute(pragma, ()).await?;
    Ok(conn)
}
/// Builds an iterator which emits [DatabaseTapeOperation] by extracting data from CDC table
pub async fn iterate_changes(
&self,
opts: DatabaseChangesIteratorOpts,
) -> Result<DatabaseChangesIterator> {
tracing::debug!("opening changes iterator with options {:?}", opts);
let conn = self.connect().await?;
let query = opts.mode.query(&self.cdc_table, opts.batch_size);
let query_stmt = conn.prepare(&query).await?;
Ok(DatabaseChangesIterator {
first_change_id: opts.first_change_id,
batch: VecDeque::with_capacity(opts.batch_size),
query_stmt,
txn_boundary_returned: false,
mode: opts.mode,
})
}
/// Opens a [DatabaseReplaySession] on a fresh connection; the session starts outside of
/// a transaction and with empty statement caches, and accepts [DatabaseTapeOperation]
/// values such as the ones produced by [Self::iterate_changes]
pub async fn start_tape_session(&self) -> Result<DatabaseReplaySession> {
    tracing::debug!("opening replay session");
    let conn = self.connect().await?;
    let session = DatabaseReplaySession {
        conn,
        cached_delete_stmt: HashMap::new(),
        cached_insert_stmt: HashMap::new(),
        in_txn: false,
    };
    Ok(session)
}
}
/// Direction in which the CDC table is scanned
#[derive(Debug)]
pub enum DatabaseChangesIteratorMode {
    /// Scan with `change_id >= ?` in ascending `change_id` order
    Apply,
    /// Scan with `change_id <= ?` in descending `change_id` order
    Revert,
}

impl DatabaseChangesIteratorMode {
    /// Builds the SQL text which selects at most `limit` rows from `table_name`,
    /// filtered by a single `change_id` parameter and ordered according to the mode
    pub fn query(&self, table_name: &str, limit: usize) -> String {
        let operation = match self {
            Self::Apply => ">=",
            Self::Revert => "<=",
        };
        let order = match self {
            Self::Apply => "ASC",
            Self::Revert => "DESC",
        };
        format!(
            "SELECT * FROM {} WHERE change_id {} ? ORDER BY change_id {} LIMIT {}",
            table_name, operation, order, limit
        )
    }

    /// Filter value used when the caller did not provide a starting change id:
    /// `-1` for [Self::Apply] and `i64::MAX` for [Self::Revert]
    pub fn first_id(&self) -> i64 {
        if matches!(self, Self::Apply) {
            -1
        } else {
            i64::MAX
        }
    }

    /// Filter value for the batch following the one which ended at change `id`:
    /// one step forward for [Self::Apply], one step backward for [Self::Revert]
    pub fn next_id(&self, id: i64) -> i64 {
        match self {
            Self::Revert => id - 1,
            Self::Apply => id + 1,
        }
    }
}
/// Options accepted by [DatabaseTape::iterate_changes]
#[derive(Debug)]
pub struct DatabaseChangesIteratorOpts {
/// Change id used as the filter bound for the first batch;
/// `None` means the mode-specific default from `DatabaseChangesIteratorMode::first_id`
pub first_change_id: Option<i64>,
/// `LIMIT` of the batch query and initial capacity of the iterator buffer
pub batch_size: usize,
/// Scan direction; also selects whether rows are converted with `into_apply` or `into_revert`
pub mode: DatabaseChangesIteratorMode,
}
impl Default for DatabaseChangesIteratorOpts {
fn default() -> Self {
Self {
first_change_id: None,
batch_size: DEFAULT_CHANGES_BATCH_SIZE,
mode: DatabaseChangesIteratorMode::Apply,
}
}
}
/// Iterator over the CDC table created by [DatabaseTape::iterate_changes];
/// yields [DatabaseTapeOperation] values through its async `next` method
pub struct DatabaseChangesIterator {
/// Prepared batch query; reset and re-executed on every refill
query_stmt: turso::Statement,
/// Filter bound for the next refill; `None` means "use the mode default"
first_change_id: Option<i64>,
/// Changes fetched by the last refill and not yet returned to the caller
batch: VecDeque<DatabaseTapeRowChange>,
/// Set once `Commit` has been emitted after the buffer drained, cleared when a row change is returned
txn_boundary_returned: bool,
/// Scan direction and conversion mode for fetched rows
mode: DatabaseChangesIteratorMode,
}
impl DatabaseChangesIterator {
/// Returns the next tape operation.
///
/// Buffered row changes are returned first (the buffer is refilled from the CDC table when
/// it is empty). When no more rows are available a single [DatabaseTapeOperation::Commit]
/// is emitted, and `None` is returned on subsequent calls until new rows show up.
pub async fn next(&mut self) -> Result<Option<DatabaseTapeOperation>> {
    if self.batch.is_empty() {
        self.refill().await?;
    }
    // todo(sivukhin): iterator must be more clever about transaction boundaries - but for that we need to extend CDC table
    // for now, if iterator reach the end of CDC table - we are sure that this is a transaction boundary
    let operation = match self.batch.pop_front() {
        Some(change) => {
            self.txn_boundary_returned = false;
            Some(DatabaseTapeOperation::RowChange(change))
        }
        None if self.txn_boundary_returned => None,
        None => {
            self.txn_boundary_returned = true;
            Some(DatabaseTapeOperation::Commit)
        }
    };
    Ok(operation)
}
/// Re-executes the batch query starting from the current filter bound, appends converted
/// rows to the buffer and moves the bound one step past the last buffered change
async fn refill(&mut self) -> Result<()> {
    let change_id_filter = match self.first_change_id {
        Some(change_id) => change_id,
        None => self.mode.first_id(),
    };
    self.query_stmt.reset();
    let mut rows = self.query_stmt.query((change_id_filter,)).await?;
    while let Some(row) = rows.next().await? {
        let database_change: DatabaseChange = row.try_into()?;
        let tape_change = match self.mode {
            DatabaseChangesIteratorMode::Revert => database_change.into_revert()?,
            DatabaseChangesIteratorMode::Apply => database_change.into_apply()?,
        };
        self.batch.push_back(tape_change);
    }
    // the bound stays untouched when nothing is buffered
    if let Some(last) = self.batch.back() {
        self.first_change_id = Some(self.mode.next_id(last.change_id));
    }
    Ok(())
}
}
/// Session created by [DatabaseTape::start_tape_session] which applies
/// [DatabaseTapeOperation] values to the database through its own connection
pub struct DatabaseReplaySession {
/// Connection used for all replayed statements
conn: turso::Connection,
/// Prepared `DELETE ... WHERE rowid = ?` statements keyed by table name
cached_delete_stmt: HashMap<String, turso::Statement>,
/// Prepared `INSERT` statements keyed by (table name, number of record columns)
cached_insert_stmt: HashMap<(String, usize), turso::Statement>,
/// Whether a `BEGIN` was issued by this session and not yet followed by `COMMIT`
in_txn: bool,
}
impl DatabaseReplaySession {
/// Applies a single tape operation to the database.
///
/// - `Commit` issues `COMMIT` if the session has an open transaction (no-op otherwise).
/// - `RowChange` opens a transaction with `BEGIN` if none is open, then replays the change:
///   delete by rowid, insert of the decoded record, or delete followed by insert for updates.
pub async fn replay(&mut self, operation: DatabaseTapeOperation) -> Result<()> {
    let change = match operation {
        DatabaseTapeOperation::RowChange(change) => change,
        DatabaseTapeOperation::Commit => {
            tracing::trace!("replay: commit replayed changes after transaction boundary");
            if self.in_txn {
                self.conn.execute("COMMIT", ()).await?;
                self.in_txn = false;
            }
            return Ok(());
        }
    };

    if !self.in_txn {
        tracing::trace!("replay: start txn for replaying changes");
        self.conn.execute("BEGIN", ()).await?;
        self.in_txn = true;
    }
    tracing::trace!("replay: change={:?}", change);

    let row_id = change.id;
    let table_name = &change.table_name;
    match change.change {
        DatabaseTapeRowChangeType::Insert { bin_record } => {
            let values = parse_bin_record(bin_record)?;
            self.replay_insert(table_name, row_id, values).await?;
        }
        DatabaseTapeRowChangeType::Update { bin_record } => {
            // update is replayed as delete of the old row + insert of the new record
            self.replay_delete(table_name, row_id).await?;
            let values = parse_bin_record(bin_record)?;
            self.replay_insert(table_name, row_id, values).await?;
        }
        DatabaseTapeRowChangeType::Delete => {
            self.replay_delete(table_name, row_id).await?;
        }
    }
    Ok(())
}
/// Executes the cached delete statement of `table_name` with `id` bound as the rowid
async fn replay_delete(&mut self, table_name: &str, id: i64) -> Result<()> {
    let delete_stmt = self.cached_delete_stmt(table_name).await?;
    let params = (id,);
    delete_stmt.execute(params).await?;
    Ok(())
}
/// Executes the cached insert statement of `table_name` binding all record `values`
/// followed by `id` as the last positional parameter (the rowid)
async fn replay_insert(
    &mut self,
    table_name: &str,
    id: i64,
    mut values: Vec<turso::Value>,
) -> Result<()> {
    // statement is keyed by the record width, i.e. before the rowid is appended
    let columns = values.len();
    values.push(turso::Value::Integer(id));
    let params = turso::params::Params::Positional(values);
    let insert_stmt = self.cached_insert_stmt(table_name, columns).await?;
    insert_stmt.execute(params).await?;
    Ok(())
}
/// Returns the prepared `DELETE FROM <table> WHERE rowid = ?` statement for `table_name`,
/// preparing and caching it on first use
async fn cached_delete_stmt(&mut self, table_name: &str) -> Result<&mut turso::Statement> {
    let already_prepared = self.cached_delete_stmt.contains_key(table_name);
    if !already_prepared {
        tracing::trace!("prepare delete statement for replay: table={}", table_name);
        let sql = format!("DELETE FROM {} WHERE rowid = ?", table_name);
        let prepared = self.conn.prepare(&sql).await?;
        self.cached_delete_stmt
            .insert(table_name.to_string(), prepared);
    }
    tracing::trace!(
        "ready to use prepared delete statement for replay: table={}",
        table_name
    );
    // the entry is guaranteed to exist: it was either present or inserted right above
    let stmt = self.cached_delete_stmt.get_mut(table_name).unwrap();
    Ok(stmt)
}
async fn cached_insert_stmt(
&mut self,
table_name: &str,
columns: usize,
) -> Result<&mut turso::Statement> {
let key = (table_name.to_string(), columns);
if !self.cached_insert_stmt.contains_key(&key) {
tracing::trace!(
"prepare insert statement for replay: table={}, columns={}",
table_name,
columns
);
let mut table_info = self
.conn
.query(
&format!("SELECT name FROM pragma_table_info('{}')", table_name),
(),
)
.await?;
let mut column_names = Vec::with_capacity(columns + 1);
while let Some(table_info_row) = table_info.next().await? {
let value = table_info_row.get_value(0)?;
column_names.push(value.as_text().expect("must be text").to_string());
}
column_names.push("rowid".to_string());
let placeholders = ["?"].repeat(columns + 1).join(",");
let query = format!(
"INSERT INTO {}({}) VALUES ({})",
table_name,
column_names.join(", "),
placeholders
);
let stmt = self.conn.prepare(&query).await?;
self.cached_insert_stmt.insert(key.clone(), stmt);
}
tracing::trace!(
"ready to use prepared insert statement for replay: table={}, columns={}",
table_name,
columns
);
Ok(self.cached_insert_stmt.get_mut(&key).unwrap())
}
}
/// Decodes a binary record into owned [turso::Value]s, one per record column,
/// in the order reported by the record cursor
fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso::Value>> {
    let record = turso_core::types::ImmutableRecord::from_bin_record(bin_record);
    let mut cursor = turso_core::types::RecordCursor::new();
    let column_count = cursor.count(&record);
    let mut decoded = Vec::with_capacity(column_count);
    for index in 0..column_count {
        let owned = cursor.get_value(&record, index)?.to_owned();
        decoded.push(owned.into());
    }
    Ok(decoded)
}
#[cfg(test)]
mod tests {
use tempfile::NamedTempFile;
use crate::database_tape::DatabaseTape;
/// Test helper: runs `query` on `conn` and collects every row as a vector of its values
async fn fetch_rows(conn: &turso::Connection, query: &str) -> Vec<Vec<turso::Value>> {
    let mut collected = Vec::new();
    let mut cursor = conn.query(query, ()).await.unwrap();
    while let Some(row) = cursor.next().await.unwrap() {
        let values: Vec<_> = (0..row.column_count())
            .map(|index| row.get_value(index).unwrap())
            .collect();
        collected.push(values);
    }
    collected
}
// End-to-end check of the tape: changes made through `conn1` (database 1) are read with a
// single long-lived changes iterator and replayed into database 2, whose content is then
// verified through `conn2`. Inserts, a delete and an update are covered in three rounds.
#[tokio::test]
async fn test_database_cdc() {
// two independent databases backed by temporary files
let temp_file1 = NamedTempFile::new().unwrap();
let db_path1 = temp_file1.path().to_str().unwrap();
let temp_file2 = NamedTempFile::new().unwrap();
let db_path2 = temp_file2.path().to_str().unwrap();
let db1 = turso::Builder::new_local(db_path1).build().await.unwrap();
let db1 = DatabaseTape::new(db1);
let conn1 = db1.connect().await.unwrap();
let db2 = turso::Builder::new_local(db_path2).build().await.unwrap();
let db2 = DatabaseTape::new(db2);
let conn2 = db2.connect().await.unwrap();
// the same schema is created manually on both sides
conn1
.execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ())
.await
.unwrap();
conn1
.execute("CREATE TABLE b(x INTEGER PRIMARY KEY, y, z);", ())
.await
.unwrap();
conn2
.execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ())
.await
.unwrap();
conn2
.execute("CREATE TABLE b(x INTEGER PRIMARY KEY, y, z);", ())
.await
.unwrap();
// round 1: inserts into both tables of database 1
conn1
.execute("INSERT INTO a VALUES (1, 'hello'), (2, 'turso')", ())
.await
.unwrap();
conn1
.execute(
"INSERT INTO b VALUES (3, 'bye', 0.1), (4, 'limbo', 0.2)",
(),
)
.await
.unwrap();
// the iterator is created once and reused by all rounds below
let mut iterator = db1.iterate_changes(Default::default()).await.unwrap();
{
let mut replay = db2.start_tape_session().await.unwrap();
while let Some(change) = iterator.next().await.unwrap() {
replay.replay(change).await.unwrap();
}
}
assert_eq!(
fetch_rows(&conn2, "SELECT * FROM a").await,
vec![
vec![
turso::Value::Integer(1),
turso::Value::Text("hello".to_string())
],
vec![
turso::Value::Integer(2),
turso::Value::Text("turso".to_string())
],
]
);
assert_eq!(
fetch_rows(&conn2, "SELECT * FROM b").await,
vec![
vec![
turso::Value::Integer(3),
turso::Value::Text("bye".to_string()),
turso::Value::Real(0.1)
],
vec![
turso::Value::Integer(4),
turso::Value::Text("limbo".to_string()),
turso::Value::Real(0.2)
],
]
);
// round 2: a delete in database 1 must remove the row from database 2
conn1
.execute("DELETE FROM b WHERE y = 'limbo'", ())
.await
.unwrap();
{
let mut replay = db2.start_tape_session().await.unwrap();
while let Some(change) = iterator.next().await.unwrap() {
replay.replay(change).await.unwrap();
}
}
assert_eq!(
fetch_rows(&conn2, "SELECT * FROM a").await,
vec![
vec![
turso::Value::Integer(1),
turso::Value::Text("hello".to_string())
],
vec![
turso::Value::Integer(2),
turso::Value::Text("turso".to_string())
],
]
);
assert_eq!(
fetch_rows(&conn2, "SELECT * FROM b").await,
vec![vec![
turso::Value::Integer(3),
turso::Value::Text("bye".to_string()),
turso::Value::Real(0.1)
],]
);
// round 3: an update in database 1 (text replaced by a blob) must be visible in database 2
conn1
.execute("UPDATE b SET y = x'deadbeef' WHERE x = 3", ())
.await
.unwrap();
{
let mut replay = db2.start_tape_session().await.unwrap();
while let Some(change) = iterator.next().await.unwrap() {
replay.replay(change).await.unwrap();
}
}
assert_eq!(
fetch_rows(&conn2, "SELECT * FROM a").await,
vec![
vec![
turso::Value::Integer(1),
turso::Value::Text("hello".to_string())
],
vec![
turso::Value::Integer(2),
turso::Value::Text("turso".to_string())
],
]
);
assert_eq!(
fetch_rows(&conn2, "SELECT * FROM b").await,
vec![vec![
turso::Value::Integer(3),
turso::Value::Blob(vec![0xde, 0xad, 0xbe, 0xef]),
turso::Value::Real(0.1)
]]
);
}
}

View File

@@ -0,0 +1,19 @@
/// Error type of the crate (the error half of the crate-level `Result` alias)
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Error reported by the `turso` database layer; `turso_core::LimboError` values are
/// converted into `turso::Error` and end up in this variant as well
#[error("database error: {0}")]
TursoError(turso::Error),
/// Error specific to the database tape, described by a free-form message
#[error("database tape error: {0}")]
DatabaseTapeError(String),
}
impl From<turso::Error> for Error {
fn from(value: turso::Error) -> Self {
Self::TursoError(value)
}
}
impl From<turso_core::LimboError> for Error {
fn from(value: turso_core::LimboError) -> Self {
Self::TursoError(value.into())
}
}

View File

@@ -0,0 +1,16 @@
// Public modules of the crate.
// NOTE(review): `database_tape` imports items from `crate::types` and the example uses
// `turso_sync::types`, but no `types` module is declared here - confirm that the module
// declaration (and its source file) is not missing.
pub mod database_tape;
pub mod errors;
/// Crate-wide result alias with [errors::Error] as the error type
pub type Result<T> = std::result::Result<T, errors::Error>;
#[cfg(test)]
mod tests {
    use tracing_subscriber::EnvFilter;

    /// Installs a formatting tracing subscriber whose filter is read from the environment;
    /// registered through `ctor`, so presumably it runs once before the tests start
    #[ctor::ctor]
    fn init() {
        let env_filter = EnvFilter::from_default_env();
        let subscriber = tracing_subscriber::fmt().with_env_filter(env_filter);
        subscriber.init();
    }
}