mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-05 17:24:21 +01:00
Merge 'improve sync engine' from Nikita Sivukhin
This PR makes plenty of changes to the sync engine among which the main
ones are:
1. Now, we have only single database file which we use to directly
execute `pull` and `push` requests (so we don't have `draft` / `synced`
databases)
2. Last-write-win strategy were fixed a little bit - because before this
PR insert-insert conflict wasn't resolved automatically
3. Now sync engine can apply arbitrary `transform` function to the
logical changes
4. Sync-engine-aware checkpoint was implemented. Now, database created
by sync-engine has explicit `checkpoint()` method which under the hood
will use additional file to save frames needed for revert operation
during pull
5. Pull operation were separated into 2 phases internally: wait for
changes & apply changes
* The problem is that pull operation itself (e.g. apply) right now
require exclusive lock to the sync engine and if user wants to pull &
push independently this will be problematic (as pull will lock the db
and push will never succeed)
6. Added support for V1 pull protocol
7. Exposed simple `stats()` method which return amount of pending cdc
operations and current wal size
Closes #2810
This commit is contained in:
35
Cargo.lock
generated
35
Cargo.lock
generated
@@ -2838,6 +2838,29 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.14.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.14.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools 0.14.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "py-turso"
|
||||
version = "0.1.4"
|
||||
@@ -3159,6 +3182,16 @@ dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "roaring"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f08d6a905edb32d74a5d5737a0c9d7e950c312f3c46cb0ca0a2ca09ea11878a0"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rstest"
|
||||
version = "0.18.2"
|
||||
@@ -4199,8 +4232,10 @@ dependencies = [
|
||||
"futures",
|
||||
"genawaiter",
|
||||
"http",
|
||||
"prost",
|
||||
"rand 0.9.2",
|
||||
"rand_chacha 0.9.0",
|
||||
"roaring",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
|
||||
@@ -17,6 +17,8 @@ genawaiter = { version = "0.99.1", default-features = false }
|
||||
http = "1.3.1"
|
||||
uuid = "1.17.0"
|
||||
base64 = "0.22.1"
|
||||
prost = "0.14.1"
|
||||
roaring = "0.11.2"
|
||||
|
||||
[dev-dependencies]
|
||||
ctor = "0.4.2"
|
||||
|
||||
@@ -1,29 +1,110 @@
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use crate::{
|
||||
database_tape::{run_stmt_once, DatabaseReplaySessionOpts},
|
||||
errors::Error,
|
||||
types::{Coro, DatabaseChangeType, DatabaseTapeRowChange, DatabaseTapeRowChangeType},
|
||||
types::{
|
||||
Coro, DatabaseChangeType, DatabaseRowMutation, DatabaseTapeRowChange,
|
||||
DatabaseTapeRowChangeType,
|
||||
},
|
||||
Result,
|
||||
};
|
||||
|
||||
pub struct DatabaseReplayGenerator {
|
||||
pub struct DatabaseReplayGenerator<Ctx = ()> {
|
||||
pub conn: Arc<turso_core::Connection>,
|
||||
pub opts: DatabaseReplaySessionOpts,
|
||||
pub opts: DatabaseReplaySessionOpts<Ctx>,
|
||||
}
|
||||
|
||||
pub struct ReplayInfo {
|
||||
pub change_type: DatabaseChangeType,
|
||||
pub query: String,
|
||||
pub pk_column_indices: Option<Vec<usize>>,
|
||||
pub column_names: Vec<String>,
|
||||
pub is_ddl_replay: bool,
|
||||
}
|
||||
|
||||
const SQLITE_SCHEMA_TABLE: &str = "sqlite_schema";
|
||||
impl DatabaseReplayGenerator {
|
||||
pub fn new(conn: Arc<turso_core::Connection>, opts: DatabaseReplaySessionOpts) -> Self {
|
||||
impl<Ctx> DatabaseReplayGenerator<Ctx> {
|
||||
pub fn new(conn: Arc<turso_core::Connection>, opts: DatabaseReplaySessionOpts<Ctx>) -> Self {
|
||||
Self { conn, opts }
|
||||
}
|
||||
pub fn create_mutation(
|
||||
&self,
|
||||
info: &ReplayInfo,
|
||||
change: &DatabaseTapeRowChange,
|
||||
) -> Result<DatabaseRowMutation> {
|
||||
match &change.change {
|
||||
DatabaseTapeRowChangeType::Delete { before } => Ok(DatabaseRowMutation {
|
||||
change_time: change.change_time,
|
||||
table_name: change.table_name.to_string(),
|
||||
id: change.id,
|
||||
change_type: info.change_type,
|
||||
before: Some(self.create_row_full(info, before)),
|
||||
after: None,
|
||||
updates: None,
|
||||
}),
|
||||
DatabaseTapeRowChangeType::Insert { after } => Ok(DatabaseRowMutation {
|
||||
change_time: change.change_time,
|
||||
table_name: change.table_name.to_string(),
|
||||
id: change.id,
|
||||
change_type: info.change_type,
|
||||
before: None,
|
||||
after: Some(self.create_row_full(info, after)),
|
||||
updates: None,
|
||||
}),
|
||||
DatabaseTapeRowChangeType::Update {
|
||||
before,
|
||||
after,
|
||||
updates,
|
||||
} => Ok(DatabaseRowMutation {
|
||||
change_time: change.change_time,
|
||||
table_name: change.table_name.to_string(),
|
||||
id: change.id,
|
||||
change_type: info.change_type,
|
||||
before: Some(self.create_row_full(info, before)),
|
||||
after: Some(self.create_row_full(info, after)),
|
||||
updates: updates
|
||||
.as_ref()
|
||||
.map(|updates| self.create_row_update(info, updates)),
|
||||
}),
|
||||
}
|
||||
}
|
||||
fn create_row_full(
|
||||
&self,
|
||||
info: &ReplayInfo,
|
||||
values: &[turso_core::Value],
|
||||
) -> HashMap<String, turso_core::Value> {
|
||||
let mut row = HashMap::with_capacity(info.column_names.len());
|
||||
for (i, value) in values.iter().enumerate() {
|
||||
row.insert(info.column_names[i].clone(), value.clone());
|
||||
}
|
||||
row
|
||||
}
|
||||
fn create_row_update(
|
||||
&self,
|
||||
info: &ReplayInfo,
|
||||
updates: &[turso_core::Value],
|
||||
) -> HashMap<String, turso_core::Value> {
|
||||
let mut row = HashMap::with_capacity(info.column_names.len());
|
||||
assert!(updates.len() % 2 == 0);
|
||||
let columns_cnt = updates.len() / 2;
|
||||
for (i, value) in updates.iter().take(columns_cnt).enumerate() {
|
||||
let updated = match value {
|
||||
turso_core::Value::Integer(x @ (1 | 0)) => *x > 0,
|
||||
_ => {
|
||||
panic!("unexpected 'changes' binary record first-half component: {value:?}")
|
||||
}
|
||||
};
|
||||
if !updated {
|
||||
continue;
|
||||
}
|
||||
row.insert(
|
||||
info.column_names[i].clone(),
|
||||
updates[columns_cnt + i].clone(),
|
||||
);
|
||||
}
|
||||
row
|
||||
}
|
||||
pub fn replay_values(
|
||||
&self,
|
||||
info: &ReplayInfo,
|
||||
@@ -89,9 +170,9 @@ impl DatabaseReplayGenerator {
|
||||
}
|
||||
pub async fn replay_info(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
coro: &Coro<Ctx>,
|
||||
change: &DatabaseTapeRowChange,
|
||||
) -> Result<Vec<ReplayInfo>> {
|
||||
) -> Result<ReplayInfo> {
|
||||
tracing::trace!("replay: change={:?}", change);
|
||||
let table_name = &change.table_name;
|
||||
|
||||
@@ -117,9 +198,10 @@ impl DatabaseReplayGenerator {
|
||||
change_type: DatabaseChangeType::Delete,
|
||||
query,
|
||||
pk_column_indices: None,
|
||||
column_names: Vec::new(),
|
||||
is_ddl_replay: true,
|
||||
};
|
||||
Ok(vec![delete])
|
||||
Ok(delete)
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { after } => {
|
||||
assert!(after.len() == 5);
|
||||
@@ -133,9 +215,10 @@ impl DatabaseReplayGenerator {
|
||||
change_type: DatabaseChangeType::Insert,
|
||||
query: sql.as_str().to_string(),
|
||||
pk_column_indices: None,
|
||||
column_names: Vec::new(),
|
||||
is_ddl_replay: true,
|
||||
};
|
||||
Ok(vec![insert])
|
||||
Ok(insert)
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update { updates, .. } => {
|
||||
let Some(updates) = updates else {
|
||||
@@ -155,16 +238,17 @@ impl DatabaseReplayGenerator {
|
||||
change_type: DatabaseChangeType::Update,
|
||||
query: ddl_stmt.as_str().to_string(),
|
||||
pk_column_indices: None,
|
||||
column_names: Vec::new(),
|
||||
is_ddl_replay: true,
|
||||
};
|
||||
Ok(vec![update])
|
||||
Ok(update)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match &change.change {
|
||||
DatabaseTapeRowChangeType::Delete { .. } => {
|
||||
let delete = self.delete_query(coro, table_name).await?;
|
||||
Ok(vec![delete])
|
||||
Ok(delete)
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update { updates, after, .. } => {
|
||||
if let Some(updates) = updates {
|
||||
@@ -178,32 +262,160 @@ impl DatabaseReplayGenerator {
|
||||
});
|
||||
}
|
||||
let update = self.update_query(coro, table_name, &columns).await?;
|
||||
Ok(vec![update])
|
||||
Ok(update)
|
||||
} else {
|
||||
let delete = self.delete_query(coro, table_name).await?;
|
||||
let insert = self.insert_query(coro, table_name, after.len()).await?;
|
||||
Ok(vec![delete, insert])
|
||||
let columns = [true].repeat(after.len());
|
||||
let update = self.update_query(coro, table_name, &columns).await?;
|
||||
Ok(update)
|
||||
}
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { after } => {
|
||||
let insert = self.insert_query(coro, table_name, after.len()).await?;
|
||||
Ok(vec![insert])
|
||||
Ok(insert)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pub(crate) async fn update_query(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
coro: &Coro<Ctx>,
|
||||
table_name: &str,
|
||||
columns: &[bool],
|
||||
) -> Result<ReplayInfo> {
|
||||
let (column_names, pk_column_indices) = self.table_columns_info(coro, table_name).await?;
|
||||
let mut pk_predicates = Vec::with_capacity(1);
|
||||
let mut column_updates = Vec::with_capacity(1);
|
||||
for &idx in &pk_column_indices {
|
||||
pk_predicates.push(format!("{} = ?", column_names[idx]));
|
||||
}
|
||||
for (idx, name) in column_names.iter().enumerate() {
|
||||
if columns[idx] {
|
||||
column_updates.push(format!("{name} = ?"));
|
||||
}
|
||||
}
|
||||
let (query, pk_column_indices) =
|
||||
if self.opts.use_implicit_rowid || pk_column_indices.is_empty() {
|
||||
(
|
||||
format!(
|
||||
"UPDATE {table_name} SET {} WHERE rowid = ?",
|
||||
column_updates.join(", ")
|
||||
),
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
(
|
||||
format!(
|
||||
"UPDATE {table_name} SET {} WHERE {}",
|
||||
column_updates.join(", "),
|
||||
pk_predicates.join(" AND ")
|
||||
),
|
||||
Some(pk_column_indices),
|
||||
)
|
||||
};
|
||||
Ok(ReplayInfo {
|
||||
change_type: DatabaseChangeType::Update,
|
||||
query,
|
||||
column_names,
|
||||
pk_column_indices,
|
||||
is_ddl_replay: false,
|
||||
})
|
||||
}
|
||||
pub(crate) async fn insert_query(
|
||||
&self,
|
||||
coro: &Coro<Ctx>,
|
||||
table_name: &str,
|
||||
columns: usize,
|
||||
) -> Result<ReplayInfo> {
|
||||
let (mut column_names, pk_column_indices) =
|
||||
self.table_columns_info(coro, table_name).await?;
|
||||
let conflict_clause = if !pk_column_indices.is_empty() {
|
||||
let mut pk_column_names = Vec::new();
|
||||
for &idx in &pk_column_indices {
|
||||
pk_column_names.push(column_names[idx].clone());
|
||||
}
|
||||
let mut update_clauses = Vec::new();
|
||||
for name in &column_names {
|
||||
update_clauses.push(format!("{name} = excluded.{name}"));
|
||||
}
|
||||
format!(
|
||||
"ON CONFLICT({}) DO UPDATE SET {}",
|
||||
pk_column_names.join(","),
|
||||
update_clauses.join(",")
|
||||
)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
if !self.opts.use_implicit_rowid {
|
||||
let placeholders = ["?"].repeat(columns).join(",");
|
||||
let query =
|
||||
format!("INSERT INTO {table_name} VALUES ({placeholders}){conflict_clause}");
|
||||
return Ok(ReplayInfo {
|
||||
change_type: DatabaseChangeType::Insert,
|
||||
query,
|
||||
pk_column_indices: None,
|
||||
column_names,
|
||||
is_ddl_replay: false,
|
||||
});
|
||||
};
|
||||
let original_column_names = column_names.clone();
|
||||
column_names.push("rowid".to_string());
|
||||
|
||||
let placeholders = ["?"].repeat(columns + 1).join(",");
|
||||
let column_names = column_names.join(", ");
|
||||
let query = format!("INSERT INTO {table_name}({column_names}) VALUES ({placeholders})");
|
||||
Ok(ReplayInfo {
|
||||
change_type: DatabaseChangeType::Insert,
|
||||
query,
|
||||
column_names: original_column_names,
|
||||
pk_column_indices: None,
|
||||
is_ddl_replay: false,
|
||||
})
|
||||
}
|
||||
pub(crate) async fn delete_query(
|
||||
&self,
|
||||
coro: &Coro<Ctx>,
|
||||
table_name: &str,
|
||||
) -> Result<ReplayInfo> {
|
||||
let (column_names, pk_column_indices) = self.table_columns_info(coro, table_name).await?;
|
||||
let mut pk_predicates = Vec::with_capacity(1);
|
||||
for &idx in &pk_column_indices {
|
||||
pk_predicates.push(format!("{} = ?", column_names[idx]));
|
||||
}
|
||||
let use_implicit_rowid = self.opts.use_implicit_rowid;
|
||||
if pk_column_indices.is_empty() || use_implicit_rowid {
|
||||
let query = format!("DELETE FROM {table_name} WHERE rowid = ?");
|
||||
tracing::trace!("delete_query: table_name={table_name}, query={query}, use_implicit_rowid={use_implicit_rowid}");
|
||||
return Ok(ReplayInfo {
|
||||
change_type: DatabaseChangeType::Delete,
|
||||
query,
|
||||
column_names,
|
||||
pk_column_indices: None,
|
||||
is_ddl_replay: false,
|
||||
});
|
||||
}
|
||||
let pk_predicates = pk_predicates.join(" AND ");
|
||||
let query = format!("DELETE FROM {table_name} WHERE {pk_predicates}");
|
||||
|
||||
tracing::trace!("delete_query: table_name={table_name}, query={query}, use_implicit_rowid={use_implicit_rowid}");
|
||||
Ok(ReplayInfo {
|
||||
change_type: DatabaseChangeType::Delete,
|
||||
query,
|
||||
column_names,
|
||||
pk_column_indices: Some(pk_column_indices),
|
||||
is_ddl_replay: false,
|
||||
})
|
||||
}
|
||||
|
||||
async fn table_columns_info(
|
||||
&self,
|
||||
coro: &Coro<Ctx>,
|
||||
table_name: &str,
|
||||
) -> Result<(Vec<String>, Vec<usize>)> {
|
||||
let mut table_info_stmt = self.conn.prepare(format!(
|
||||
"SELECT cid, name, pk FROM pragma_table_info('{table_name}')"
|
||||
))?;
|
||||
let mut pk_predicates = Vec::with_capacity(1);
|
||||
let mut pk_column_indices = Vec::with_capacity(1);
|
||||
let mut column_updates = Vec::with_capacity(1);
|
||||
let mut column_names = Vec::new();
|
||||
while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? {
|
||||
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
@@ -221,118 +433,10 @@ impl DatabaseReplayGenerator {
|
||||
));
|
||||
};
|
||||
if *pk == 1 {
|
||||
pk_predicates.push(format!("{name} = ?"));
|
||||
pk_column_indices.push(*column_id as usize);
|
||||
}
|
||||
if columns[*column_id as usize] {
|
||||
column_updates.push(format!("{name} = ?"));
|
||||
}
|
||||
column_names.push(name.as_str().to_string());
|
||||
}
|
||||
|
||||
let (query, pk_column_indices) = if self.opts.use_implicit_rowid {
|
||||
(
|
||||
format!(
|
||||
"UPDATE {table_name} SET {} WHERE rowid = ?",
|
||||
column_updates.join(", ")
|
||||
),
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
(
|
||||
format!(
|
||||
"UPDATE {table_name} SET {} WHERE {}",
|
||||
column_updates.join(", "),
|
||||
pk_predicates.join(" AND ")
|
||||
),
|
||||
Some(pk_column_indices),
|
||||
)
|
||||
};
|
||||
Ok(ReplayInfo {
|
||||
change_type: DatabaseChangeType::Update,
|
||||
query,
|
||||
pk_column_indices,
|
||||
is_ddl_replay: false,
|
||||
})
|
||||
}
|
||||
pub(crate) async fn insert_query(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
columns: usize,
|
||||
) -> Result<ReplayInfo> {
|
||||
if !self.opts.use_implicit_rowid {
|
||||
let placeholders = ["?"].repeat(columns).join(",");
|
||||
let query = format!("INSERT INTO {table_name} VALUES ({placeholders})");
|
||||
return Ok(ReplayInfo {
|
||||
change_type: DatabaseChangeType::Insert,
|
||||
query,
|
||||
pk_column_indices: None,
|
||||
is_ddl_replay: false,
|
||||
});
|
||||
};
|
||||
let mut table_info_stmt = self.conn.prepare(format!(
|
||||
"SELECT name FROM pragma_table_info('{table_name}')"
|
||||
))?;
|
||||
let mut column_names = Vec::with_capacity(columns + 1);
|
||||
while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? {
|
||||
let turso_core::Value::Text(text) = column.get_value(0) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
column_names.push(text.to_string());
|
||||
}
|
||||
column_names.push("rowid".to_string());
|
||||
|
||||
let placeholders = ["?"].repeat(columns + 1).join(",");
|
||||
let column_names = column_names.join(", ");
|
||||
let query = format!("INSERT INTO {table_name}({column_names}) VALUES ({placeholders})");
|
||||
Ok(ReplayInfo {
|
||||
change_type: DatabaseChangeType::Insert,
|
||||
query,
|
||||
pk_column_indices: None,
|
||||
is_ddl_replay: false,
|
||||
})
|
||||
}
|
||||
pub(crate) async fn delete_query(&self, coro: &Coro, table_name: &str) -> Result<ReplayInfo> {
|
||||
let (query, pk_column_indices) = if self.opts.use_implicit_rowid {
|
||||
(format!("DELETE FROM {table_name} WHERE rowid = ?"), None)
|
||||
} else {
|
||||
let mut pk_info_stmt = self.conn.prepare(format!(
|
||||
"SELECT cid, name FROM pragma_table_info('{table_name}') WHERE pk = 1"
|
||||
))?;
|
||||
let mut pk_predicates = Vec::with_capacity(1);
|
||||
let mut pk_column_indices = Vec::with_capacity(1);
|
||||
while let Some(column) = run_stmt_once(coro, &mut pk_info_stmt).await? {
|
||||
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
let turso_core::Value::Text(name) = column.get_value(1) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
pk_predicates.push(format!("{name} = ?"));
|
||||
pk_column_indices.push(*column_id as usize);
|
||||
}
|
||||
|
||||
if pk_column_indices.is_empty() {
|
||||
(format!("DELETE FROM {table_name} WHERE rowid = ?"), None)
|
||||
} else {
|
||||
let pk_predicates = pk_predicates.join(" AND ");
|
||||
let query = format!("DELETE FROM {table_name} WHERE {pk_predicates}");
|
||||
(query, Some(pk_column_indices))
|
||||
}
|
||||
};
|
||||
let use_implicit_rowid = self.opts.use_implicit_rowid;
|
||||
tracing::trace!("delete_query: table_name={table_name}, query={query}, use_implicit_rowid={use_implicit_rowid}");
|
||||
Ok(ReplayInfo {
|
||||
change_type: DatabaseChangeType::Delete,
|
||||
query,
|
||||
pk_column_indices,
|
||||
is_ddl_replay: false,
|
||||
})
|
||||
Ok((column_names, pk_column_indices))
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -3,7 +3,7 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use turso_core::{types::WalFrameInfo, StepResult};
|
||||
use turso_core::{types::WalFrameInfo, LimboError, StepResult};
|
||||
|
||||
use crate::{
|
||||
database_replay_generator::{DatabaseReplayGenerator, ReplayInfo},
|
||||
@@ -11,7 +11,7 @@ use crate::{
|
||||
errors::Error,
|
||||
types::{
|
||||
Coro, DatabaseChange, DatabaseTapeOperation, DatabaseTapeRowChange,
|
||||
DatabaseTapeRowChangeType, ProtocolCommand,
|
||||
DatabaseTapeRowChangeType, ProtocolCommand, Transform,
|
||||
},
|
||||
wal_session::WalSession,
|
||||
Result,
|
||||
@@ -28,7 +28,7 @@ pub struct DatabaseTape {
|
||||
const DEFAULT_CDC_TABLE_NAME: &str = "turso_cdc";
|
||||
const DEFAULT_CDC_MODE: &str = "full";
|
||||
const DEFAULT_CHANGES_BATCH_SIZE: usize = 100;
|
||||
const CDC_PRAGMA_NAME: &str = "unstable_capture_data_changes_conn";
|
||||
pub const CDC_PRAGMA_NAME: &str = "unstable_capture_data_changes_conn";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseTapeOpts {
|
||||
@@ -36,8 +36,8 @@ pub struct DatabaseTapeOpts {
|
||||
pub cdc_mode: Option<String>,
|
||||
}
|
||||
|
||||
pub(crate) async fn run_stmt_once<'a>(
|
||||
coro: &'_ Coro,
|
||||
pub(crate) async fn run_stmt_once<'a, Ctx>(
|
||||
coro: &'_ Coro<Ctx>,
|
||||
stmt: &'a mut turso_core::Statement,
|
||||
) -> Result<Option<&'a turso_core::Row>> {
|
||||
loop {
|
||||
@@ -61,8 +61,8 @@ pub(crate) async fn run_stmt_once<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run_stmt_expect_one_row(
|
||||
coro: &Coro,
|
||||
pub(crate) async fn run_stmt_expect_one_row<Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
stmt: &mut turso_core::Statement,
|
||||
) -> Result<Option<Vec<turso_core::Value>>> {
|
||||
let Some(row) = run_stmt_once(coro, stmt).await? else {
|
||||
@@ -75,15 +75,18 @@ pub(crate) async fn run_stmt_expect_one_row(
|
||||
Ok(Some(values))
|
||||
}
|
||||
|
||||
pub(crate) async fn run_stmt_ignore_rows(
|
||||
coro: &Coro,
|
||||
pub(crate) async fn run_stmt_ignore_rows<Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
stmt: &mut turso_core::Statement,
|
||||
) -> Result<()> {
|
||||
while run_stmt_once(coro, stmt).await?.is_some() {}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn exec_stmt(coro: &Coro, stmt: &mut turso_core::Statement) -> Result<()> {
|
||||
pub(crate) async fn exec_stmt<Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
stmt: &mut turso_core::Statement,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
match stmt.step()? {
|
||||
StepResult::IO => {
|
||||
@@ -128,7 +131,7 @@ impl DatabaseTape {
|
||||
let connection = self.inner.connect()?;
|
||||
Ok(connection)
|
||||
}
|
||||
pub async fn connect(&self, coro: &Coro) -> Result<Arc<turso_core::Connection>> {
|
||||
pub async fn connect<Ctx>(&self, coro: &Coro<Ctx>) -> Result<Arc<turso_core::Connection>> {
|
||||
let connection = self.inner.connect()?;
|
||||
tracing::debug!("set '{CDC_PRAGMA_NAME}' for new connection");
|
||||
let mut stmt = connection.prepare(&self.pragma_query)?;
|
||||
@@ -142,19 +145,20 @@ impl DatabaseTape {
|
||||
) -> Result<DatabaseChangesIterator> {
|
||||
tracing::debug!("opening changes iterator with options {:?}", opts);
|
||||
let conn = self.inner.connect()?;
|
||||
let query = opts.mode.query(&self.cdc_table, opts.batch_size);
|
||||
let query_stmt = conn.prepare(&query)?;
|
||||
Ok(DatabaseChangesIterator {
|
||||
conn,
|
||||
cdc_table: self.cdc_table.clone(),
|
||||
first_change_id: opts.first_change_id,
|
||||
batch: VecDeque::with_capacity(opts.batch_size),
|
||||
query_stmt,
|
||||
query_stmt: None,
|
||||
txn_boundary_returned: false,
|
||||
mode: opts.mode,
|
||||
batch_size: opts.batch_size,
|
||||
ignore_schema_changes: opts.ignore_schema_changes,
|
||||
})
|
||||
}
|
||||
/// Start raw WAL edit session which can append or rollback pages directly in the current WAL
|
||||
pub async fn start_wal_session(&self, coro: &Coro) -> Result<DatabaseWalSession> {
|
||||
pub async fn start_wal_session<Ctx>(&self, coro: &Coro<Ctx>) -> Result<DatabaseWalSession> {
|
||||
let conn = self.connect(coro).await?;
|
||||
let mut wal_session = WalSession::new(conn);
|
||||
wal_session.begin()?;
|
||||
@@ -162,11 +166,11 @@ impl DatabaseTape {
|
||||
}
|
||||
|
||||
/// Start replay session which can apply [DatabaseTapeOperation] from [Self::iterate_changes]
|
||||
pub async fn start_replay_session(
|
||||
pub async fn start_replay_session<Ctx>(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
opts: DatabaseReplaySessionOpts,
|
||||
) -> Result<DatabaseReplaySession> {
|
||||
coro: &Coro<Ctx>,
|
||||
opts: DatabaseReplaySessionOpts<Ctx>,
|
||||
) -> Result<DatabaseReplaySession<Ctx>> {
|
||||
tracing::debug!("opening replay session");
|
||||
let conn = self.connect(coro).await?;
|
||||
conn.execute("BEGIN IMMEDIATE")?;
|
||||
@@ -184,12 +188,12 @@ impl DatabaseTape {
|
||||
pub struct DatabaseWalSession {
|
||||
page_size: usize,
|
||||
next_wal_frame_no: u64,
|
||||
wal_session: WalSession,
|
||||
pub wal_session: WalSession,
|
||||
prepared_frame: Option<(u32, Vec<u8>)>,
|
||||
}
|
||||
|
||||
impl DatabaseWalSession {
|
||||
pub async fn new(coro: &Coro, wal_session: WalSession) -> Result<Self> {
|
||||
pub async fn new<Ctx>(coro: &Coro<Ctx>, wal_session: WalSession) -> Result<Self> {
|
||||
let conn = wal_session.conn();
|
||||
let frames_count = conn.wal_state()?.max_frame;
|
||||
let mut page_size_stmt = conn.prepare("PRAGMA page_size")?;
|
||||
@@ -259,13 +263,15 @@ impl DatabaseWalSession {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn rollback_changes_after(&mut self, frame_watermark: u64) -> Result<()> {
|
||||
pub fn rollback_changes_after(&mut self, frame_watermark: u64) -> Result<usize> {
|
||||
let conn = self.wal_session.conn();
|
||||
let pages = conn.wal_changed_pages_after(frame_watermark)?;
|
||||
tracing::info!("rolling back {} pages", pages.len());
|
||||
let pages_cnt = pages.len();
|
||||
for page_no in pages {
|
||||
self.rollback_page(page_no, frame_watermark)?;
|
||||
}
|
||||
Ok(())
|
||||
Ok(pages_cnt)
|
||||
}
|
||||
|
||||
pub fn db_size(&self) -> Result<u32> {
|
||||
@@ -290,7 +296,7 @@ impl DatabaseWalSession {
|
||||
frame_info.put_to_frame_header(&mut frame);
|
||||
|
||||
let frame_no = self.next_wal_frame_no;
|
||||
tracing::trace!(
|
||||
tracing::debug!(
|
||||
"flush prepared frame {:?} as frame_no {}",
|
||||
frame_info,
|
||||
frame_no
|
||||
@@ -352,17 +358,20 @@ impl Default for DatabaseChangesIteratorOpts {
|
||||
}
|
||||
|
||||
pub struct DatabaseChangesIterator {
|
||||
query_stmt: turso_core::Statement,
|
||||
conn: Arc<turso_core::Connection>,
|
||||
cdc_table: Arc<String>,
|
||||
query_stmt: Option<turso_core::Statement>,
|
||||
first_change_id: Option<i64>,
|
||||
batch: VecDeque<DatabaseTapeRowChange>,
|
||||
txn_boundary_returned: bool,
|
||||
mode: DatabaseChangesIteratorMode,
|
||||
batch_size: usize,
|
||||
ignore_schema_changes: bool,
|
||||
}
|
||||
|
||||
const SQLITE_SCHEMA_TABLE: &str = "sqlite_schema";
|
||||
impl DatabaseChangesIterator {
|
||||
pub async fn next(&mut self, coro: &Coro) -> Result<Option<DatabaseTapeOperation>> {
|
||||
pub async fn next<Ctx>(&mut self, coro: &Coro<Ctx>) -> Result<Option<DatabaseTapeOperation>> {
|
||||
if self.batch.is_empty() {
|
||||
self.refill(coro).await?;
|
||||
}
|
||||
@@ -386,15 +395,26 @@ impl DatabaseChangesIterator {
|
||||
return Ok(next);
|
||||
}
|
||||
}
|
||||
async fn refill(&mut self, coro: &Coro) -> Result<()> {
|
||||
async fn refill<Ctx>(&mut self, coro: &Coro<Ctx>) -> Result<()> {
|
||||
if self.query_stmt.is_none() {
|
||||
let query = self.mode.query(&self.cdc_table, self.batch_size);
|
||||
let stmt = match self.conn.prepare(&query) {
|
||||
Ok(stmt) => stmt,
|
||||
Err(LimboError::ParseError(err)) if err.contains("no such table") => return Ok(()),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
self.query_stmt = Some(stmt);
|
||||
}
|
||||
let query_stmt = self.query_stmt.as_mut().unwrap();
|
||||
|
||||
let change_id_filter = self.first_change_id.unwrap_or(self.mode.first_id());
|
||||
self.query_stmt.reset();
|
||||
self.query_stmt.bind_at(
|
||||
query_stmt.reset();
|
||||
query_stmt.bind_at(
|
||||
1.try_into().unwrap(),
|
||||
turso_core::Value::Integer(change_id_filter),
|
||||
);
|
||||
|
||||
while let Some(row) = run_stmt_once(coro, &mut self.query_stmt).await? {
|
||||
while let Some(row) = run_stmt_once(coro, query_stmt).await? {
|
||||
let database_change: DatabaseChange = row.try_into()?;
|
||||
let tape_change = match self.mode {
|
||||
DatabaseChangesIteratorMode::Apply => database_change.into_apply()?,
|
||||
@@ -410,43 +430,57 @@ impl DatabaseChangesIterator {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseReplaySessionOpts {
|
||||
#[derive(Clone)]
|
||||
pub struct DatabaseReplaySessionOpts<Ctx = ()> {
|
||||
pub use_implicit_rowid: bool,
|
||||
pub transform: Option<Transform<Ctx>>,
|
||||
}
|
||||
|
||||
struct CachedStmt {
|
||||
impl<Ctx> std::fmt::Debug for DatabaseReplaySessionOpts<Ctx> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DatabaseReplaySessionOpts")
|
||||
.field("use_implicit_rowid", &self.use_implicit_rowid)
|
||||
.field("transform_mutation.is_some()", &self.transform.is_some())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct CachedStmt {
|
||||
stmt: turso_core::Statement,
|
||||
info: ReplayInfo,
|
||||
}
|
||||
|
||||
pub struct DatabaseReplaySession {
|
||||
conn: Arc<turso_core::Connection>,
|
||||
cached_delete_stmt: HashMap<String, CachedStmt>,
|
||||
cached_insert_stmt: HashMap<(String, usize), CachedStmt>,
|
||||
cached_update_stmt: HashMap<(String, Vec<bool>), CachedStmt>,
|
||||
in_txn: bool,
|
||||
generator: DatabaseReplayGenerator,
|
||||
pub struct DatabaseReplaySession<Ctx = ()> {
|
||||
pub(crate) conn: Arc<turso_core::Connection>,
|
||||
pub(crate) cached_delete_stmt: HashMap<String, CachedStmt>,
|
||||
pub(crate) cached_insert_stmt: HashMap<(String, usize), CachedStmt>,
|
||||
pub(crate) cached_update_stmt: HashMap<(String, Vec<bool>), CachedStmt>,
|
||||
pub(crate) in_txn: bool,
|
||||
pub(crate) generator: DatabaseReplayGenerator<Ctx>,
|
||||
}
|
||||
|
||||
async fn replay_stmt(
|
||||
coro: &Coro,
|
||||
cached: &mut CachedStmt,
|
||||
async fn replay_stmt<Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
stmt: &mut turso_core::Statement,
|
||||
values: Vec<turso_core::Value>,
|
||||
) -> Result<()> {
|
||||
cached.stmt.reset();
|
||||
stmt.reset();
|
||||
for (i, value) in values.into_iter().enumerate() {
|
||||
cached.stmt.bind_at((i + 1).try_into().unwrap(), value);
|
||||
stmt.bind_at((i + 1).try_into().unwrap(), value);
|
||||
}
|
||||
exec_stmt(coro, &mut cached.stmt).await?;
|
||||
exec_stmt(coro, stmt).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl DatabaseReplaySession {
|
||||
impl<Ctx> DatabaseReplaySession<Ctx> {
|
||||
pub fn conn(&self) -> Arc<turso_core::Connection> {
|
||||
self.conn.clone()
|
||||
}
|
||||
pub async fn replay(&mut self, coro: &Coro, operation: DatabaseTapeOperation) -> Result<()> {
|
||||
pub async fn replay(
|
||||
&mut self,
|
||||
coro: &Coro<Ctx>,
|
||||
operation: DatabaseTapeOperation,
|
||||
) -> Result<()> {
|
||||
match operation {
|
||||
DatabaseTapeOperation::Commit => {
|
||||
tracing::debug!("replay: commit replayed changes after transaction boundary");
|
||||
@@ -466,10 +500,23 @@ impl DatabaseReplaySession {
|
||||
|
||||
if table == SQLITE_SCHEMA_TABLE {
|
||||
let replay_info = self.generator.replay_info(coro, &change).await?;
|
||||
for replay in &replay_info {
|
||||
self.conn.execute(replay.query.as_str())?;
|
||||
}
|
||||
self.conn.execute(replay_info.query.as_str())?;
|
||||
} else {
|
||||
if let Some(transform) = &self.generator.opts.transform {
|
||||
let replay_info = self.generator.replay_info(coro, &change).await?;
|
||||
let mutation = self.generator.create_mutation(&replay_info, &change)?;
|
||||
let statement = transform(&coro.ctx.borrow(), mutation)?;
|
||||
if let Some(statement) = statement {
|
||||
tracing::info!(
|
||||
"replay: use mutation from custom transformer: sql={}, values={:?}",
|
||||
statement.sql,
|
||||
statement.values
|
||||
);
|
||||
let mut stmt = self.conn.prepare(&statement.sql)?;
|
||||
replay_stmt(coro, &mut stmt, statement.values).await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
match change.change {
|
||||
DatabaseTapeRowChangeType::Delete { before } => {
|
||||
let key = self.populate_delete_stmt(coro, table).await?;
|
||||
@@ -486,7 +533,7 @@ impl DatabaseReplaySession {
|
||||
before,
|
||||
None,
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
replay_stmt(coro, &mut cached.stmt, values).await?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { after } => {
|
||||
let key = self.populate_insert_stmt(coro, table, after.len()).await?;
|
||||
@@ -503,7 +550,7 @@ impl DatabaseReplaySession {
|
||||
after,
|
||||
None,
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
replay_stmt(coro, &mut cached.stmt, values).await?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update {
|
||||
after,
|
||||
@@ -533,7 +580,7 @@ impl DatabaseReplaySession {
|
||||
after,
|
||||
Some(updates),
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
replay_stmt(coro, &mut cached.stmt, values).await?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update {
|
||||
before,
|
||||
@@ -554,7 +601,7 @@ impl DatabaseReplaySession {
|
||||
before,
|
||||
None,
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
replay_stmt(coro, &mut cached.stmt, values).await?;
|
||||
|
||||
let key = self.populate_insert_stmt(coro, table, after.len()).await?;
|
||||
tracing::trace!(
|
||||
@@ -570,7 +617,7 @@ impl DatabaseReplaySession {
|
||||
after,
|
||||
None,
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
replay_stmt(coro, &mut cached.stmt, values).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -578,7 +625,11 @@ impl DatabaseReplaySession {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn populate_delete_stmt<'a>(&mut self, coro: &Coro, table: &'a str) -> Result<&'a str> {
|
||||
async fn populate_delete_stmt<'a>(
|
||||
&mut self,
|
||||
coro: &Coro<Ctx>,
|
||||
table: &'a str,
|
||||
) -> Result<&'a str> {
|
||||
if self.cached_delete_stmt.contains_key(table) {
|
||||
return Ok(table);
|
||||
}
|
||||
@@ -591,7 +642,7 @@ impl DatabaseReplaySession {
|
||||
}
|
||||
async fn populate_insert_stmt(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
coro: &Coro<Ctx>,
|
||||
table: &str,
|
||||
columns: usize,
|
||||
) -> Result<(String, usize)> {
|
||||
@@ -612,7 +663,7 @@ impl DatabaseReplaySession {
|
||||
}
|
||||
async fn populate_update_stmt(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
coro: &Coro<Ctx>,
|
||||
table: &str,
|
||||
columns: &[bool],
|
||||
) -> Result<(String, Vec<bool>)> {
|
||||
@@ -639,7 +690,7 @@ mod tests {
|
||||
database_tape::{
|
||||
run_stmt_once, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape,
|
||||
},
|
||||
types::{DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType},
|
||||
types::{Coro, DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType},
|
||||
};
|
||||
|
||||
#[test]
|
||||
@@ -653,6 +704,7 @@ mod tests {
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
let db1 = db1.clone();
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn = db1.connect(&coro).await.unwrap();
|
||||
let mut stmt = conn.prepare("SELECT * FROM turso_cdc").unwrap();
|
||||
let mut rows = Vec::new();
|
||||
@@ -683,6 +735,7 @@ mod tests {
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
let db1 = db1.clone();
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn = db1.connect(&coro).await.unwrap();
|
||||
conn.execute("CREATE TABLE t(x)").unwrap();
|
||||
conn.execute("INSERT INTO t VALUES (1), (2), (3)").unwrap();
|
||||
@@ -754,6 +807,7 @@ mod tests {
|
||||
let db1 = db1.clone();
|
||||
let db2 = db2.clone();
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1.execute("CREATE TABLE t(x)").unwrap();
|
||||
conn1
|
||||
@@ -768,6 +822,7 @@ mod tests {
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: true,
|
||||
transform: None,
|
||||
};
|
||||
let mut session = db2.start_replay_session(&coro, opts).await.unwrap();
|
||||
let opts = Default::default();
|
||||
@@ -832,6 +887,7 @@ mod tests {
|
||||
let db1 = db1.clone();
|
||||
let db2 = db2.clone();
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1.execute("CREATE TABLE t(x)").unwrap();
|
||||
conn1
|
||||
@@ -846,6 +902,7 @@ mod tests {
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
transform: None,
|
||||
};
|
||||
let mut session = db2.start_replay_session(&coro, opts).await.unwrap();
|
||||
let opts = Default::default();
|
||||
@@ -904,6 +961,7 @@ mod tests {
|
||||
let db1 = db1.clone();
|
||||
let db2 = db2.clone();
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1.execute("CREATE TABLE t(x TEXT PRIMARY KEY)").unwrap();
|
||||
conn1.execute("INSERT INTO t(x) VALUES ('a')").unwrap();
|
||||
@@ -915,6 +973,7 @@ mod tests {
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
transform: None,
|
||||
};
|
||||
let mut session = db2.start_replay_session(&coro, opts).await.unwrap();
|
||||
let opts = Default::default();
|
||||
@@ -969,6 +1028,7 @@ mod tests {
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
|
||||
@@ -988,6 +1048,7 @@ mod tests {
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
transform: None,
|
||||
};
|
||||
let mut session = db3.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
@@ -1094,6 +1155,7 @@ mod tests {
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
|
||||
@@ -1104,6 +1166,7 @@ mod tests {
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
transform: None,
|
||||
};
|
||||
let mut session = db2.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
@@ -1177,6 +1240,7 @@ mod tests {
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
|
||||
@@ -1188,6 +1252,7 @@ mod tests {
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
transform: None,
|
||||
};
|
||||
let mut session = db2.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
@@ -1255,6 +1320,7 @@ mod tests {
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)")
|
||||
@@ -1283,6 +1349,7 @@ mod tests {
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
transform: None,
|
||||
};
|
||||
let mut session = db3.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@ pub enum Error {
|
||||
DatabaseSyncEngineError(String),
|
||||
#[error("database sync engine conflict: {0}")]
|
||||
DatabaseSyncEngineConflict(String),
|
||||
#[error("database sync engine IO error: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -12,9 +12,9 @@ pub trait IoOperations {
|
||||
fn open_tape(&self, path: &str, capture: bool) -> Result<DatabaseTape>;
|
||||
fn try_open(&self, path: &str) -> Result<Option<Arc<dyn turso_core::File>>>;
|
||||
fn create(&self, path: &str) -> Result<Arc<dyn turso_core::File>>;
|
||||
fn truncate(
|
||||
fn truncate<Ctx>(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
coro: &Coro<Ctx>,
|
||||
file: Arc<dyn turso_core::File>,
|
||||
len: usize,
|
||||
) -> impl std::future::Future<Output = Result<()>>;
|
||||
@@ -47,9 +47,9 @@ impl IoOperations for Arc<dyn turso_core::IO> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn truncate(
|
||||
async fn truncate<Ctx>(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
coro: &Coro<Ctx>,
|
||||
file: Arc<dyn turso_core::File>,
|
||||
len: usize,
|
||||
) -> Result<()> {
|
||||
|
||||
@@ -15,6 +15,11 @@ pub trait ProtocolIO {
|
||||
type DataCompletion: DataCompletion;
|
||||
fn full_read(&self, path: &str) -> Result<Self::DataCompletion>;
|
||||
fn full_write(&self, path: &str, content: Vec<u8>) -> Result<Self::DataCompletion>;
|
||||
fn http(&self, method: &str, path: &str, body: Option<Vec<u8>>)
|
||||
-> Result<Self::DataCompletion>;
|
||||
fn http(
|
||||
&self,
|
||||
method: &str,
|
||||
path: &str,
|
||||
body: Option<Vec<u8>>,
|
||||
headers: &[(&str, &str)],
|
||||
) -> Result<Self::DataCompletion>;
|
||||
}
|
||||
|
||||
@@ -3,6 +3,64 @@ use std::collections::VecDeque;
|
||||
use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[derive(prost::Enumeration)]
|
||||
#[repr(i32)]
|
||||
pub enum PageUpdatesEncodingReq {
|
||||
Raw = 0,
|
||||
Zstd = 1,
|
||||
}
|
||||
|
||||
#[derive(prost::Message)]
|
||||
pub struct PullUpdatesReqProtoBody {
|
||||
#[prost(enumeration = "PageUpdatesEncodingReq", tag = "1")]
|
||||
pub encoding: i32,
|
||||
#[prost(string, tag = "2")]
|
||||
pub server_revision: String,
|
||||
#[prost(string, tag = "3")]
|
||||
pub client_revision: String,
|
||||
#[prost(uint32, tag = "4")]
|
||||
pub long_poll_timeout_ms: u32,
|
||||
#[prost(bytes, tag = "5")]
|
||||
pub server_pages: Bytes,
|
||||
#[prost(bytes, tag = "6")]
|
||||
pub client_pages: Bytes,
|
||||
}
|
||||
|
||||
#[derive(prost::Message, Serialize, Deserialize, Clone, Eq, PartialEq)]
|
||||
pub struct PageData {
|
||||
#[prost(uint64, tag = "1")]
|
||||
pub page_id: u64,
|
||||
|
||||
#[serde(with = "bytes_as_base64_pad")]
|
||||
#[prost(bytes, tag = "2")]
|
||||
pub encoded_page: Bytes,
|
||||
}
|
||||
|
||||
#[derive(prost::Message)]
|
||||
pub struct PageSetRawEncodingProto {}
|
||||
|
||||
#[derive(prost::Message)]
|
||||
pub struct PageSetZstdEncodingProto {
|
||||
#[prost(int32, tag = "1")]
|
||||
pub level: i32,
|
||||
#[prost(uint32, repeated, tag = "2")]
|
||||
pub pages_dict: Vec<u32>,
|
||||
}
|
||||
|
||||
#[derive(prost::Message)]
|
||||
pub struct PullUpdatesRespProtoBody {
|
||||
#[prost(string, tag = "1")]
|
||||
pub server_revision: String,
|
||||
#[prost(uint64, tag = "2")]
|
||||
pub db_size: u64,
|
||||
#[prost(optional, message, tag = "3")]
|
||||
pub raw_encoding: Option<PageSetRawEncodingProto>,
|
||||
#[prost(optional, message, tag = "4")]
|
||||
pub zstd_encoding: Option<PageSetZstdEncodingProto>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct PipelineReqBody {
|
||||
pub baton: Option<String>,
|
||||
@@ -22,8 +80,6 @@ pub enum StreamRequest {
|
||||
#[serde(skip_deserializing)]
|
||||
#[default]
|
||||
None,
|
||||
/// See [`CloseStreamReq`]
|
||||
Close(CloseStreamReq),
|
||||
/// See [`ExecuteStreamReq`]
|
||||
Execute(ExecuteStreamReq),
|
||||
}
|
||||
@@ -33,15 +89,53 @@ pub enum StreamRequest {
|
||||
pub enum StreamResult {
|
||||
#[default]
|
||||
None,
|
||||
Ok,
|
||||
Ok {
|
||||
response: StreamResponse,
|
||||
},
|
||||
Error {
|
||||
error: Error,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
/// A request to close the current stream.
|
||||
pub struct CloseStreamReq {}
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum StreamResponse {
|
||||
Execute(ExecuteStreamResp),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq)]
|
||||
/// A response to a [`ExecuteStreamReq`].
|
||||
pub struct ExecuteStreamResp {
|
||||
pub result: StmtResult,
|
||||
}
|
||||
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Default)]
|
||||
pub struct StmtResult {
|
||||
pub cols: Vec<Col>,
|
||||
pub rows: Vec<Row>,
|
||||
pub affected_row_count: u64,
|
||||
#[serde(with = "option_i64_as_str")]
|
||||
pub last_insert_rowid: Option<i64>,
|
||||
#[serde(default, with = "option_u64_as_str")]
|
||||
pub replication_index: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub rows_read: u64,
|
||||
#[serde(default)]
|
||||
pub rows_written: u64,
|
||||
#[serde(default)]
|
||||
pub query_duration_ms: f64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
|
||||
pub struct Col {
|
||||
pub name: Option<String>,
|
||||
pub decltype: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
|
||||
#[serde(transparent)]
|
||||
pub struct Row {
|
||||
pub values: Vec<Value>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
/// A request to execute a single SQL statement.
|
||||
@@ -229,3 +323,80 @@ pub(crate) mod bytes_as_base64 {
|
||||
Ok(Bytes::from(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
mod option_i64_as_str {
|
||||
use serde::de::{Error, Visitor};
|
||||
use serde::{ser, Deserializer, Serialize as _};
|
||||
|
||||
pub fn serialize<S: ser::Serializer>(value: &Option<i64>, ser: S) -> Result<S::Ok, S::Error> {
|
||||
value.map(|v| v.to_string()).serialize(ser)
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Option<i64>, D::Error> {
|
||||
struct V;
|
||||
|
||||
impl<'de> Visitor<'de> for V {
|
||||
type Value = Option<i64>;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(formatter, "a string representing a signed integer, or null")
|
||||
}
|
||||
|
||||
fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
deserializer.deserialize_any(V)
|
||||
}
|
||||
|
||||
fn visit_none<E>(self) -> Result<Self::Value, E>
|
||||
where
|
||||
E: Error,
|
||||
{
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn visit_unit<E>(self) -> Result<Self::Value, E>
|
||||
where
|
||||
E: Error,
|
||||
{
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
|
||||
where
|
||||
E: Error,
|
||||
{
|
||||
Ok(Some(v))
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: Error,
|
||||
{
|
||||
v.parse().map_err(E::custom).map(Some)
|
||||
}
|
||||
}
|
||||
|
||||
d.deserialize_option(V)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod bytes_as_base64_pad {
|
||||
use base64::{engine::general_purpose::STANDARD, Engine as _};
|
||||
use bytes::Bytes;
|
||||
use serde::{de, ser};
|
||||
use serde::{de::Error as _, Serialize as _};
|
||||
|
||||
pub fn serialize<S: ser::Serializer>(value: &Bytes, ser: S) -> Result<S::Ok, S::Error> {
|
||||
STANDARD.encode(value).serialize(ser)
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D: de::Deserializer<'de>>(de: D) -> Result<Bytes, D::Error> {
|
||||
let text = <&'de str as de::Deserialize>::deserialize(de)?;
|
||||
let bytes = STANDARD.decode(text).map_err(|_| {
|
||||
D::Error::invalid_value(de::Unexpected::Str(text), &"binary data encoded as base64")
|
||||
})?;
|
||||
Ok(Bytes::from(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,39 @@
|
||||
use std::{cell::RefCell, collections::HashMap, sync::Arc};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{errors::Error, Result};
|
||||
|
||||
pub type Coro = genawaiter::sync::Co<ProtocolCommand, Result<()>>;
|
||||
pub type Transform<Ctx> =
|
||||
Arc<dyn Fn(&Ctx, DatabaseRowMutation) -> Result<Option<DatabaseRowStatement>> + 'static>;
|
||||
|
||||
pub struct Coro<Ctx> {
|
||||
pub ctx: RefCell<Ctx>,
|
||||
gen: genawaiter::sync::Co<ProtocolCommand, Result<Ctx>>,
|
||||
}
|
||||
|
||||
impl<Ctx> Coro<Ctx> {
|
||||
pub fn new(ctx: Ctx, gen: genawaiter::sync::Co<ProtocolCommand, Result<Ctx>>) -> Self {
|
||||
Self {
|
||||
ctx: RefCell::new(ctx),
|
||||
gen,
|
||||
}
|
||||
}
|
||||
pub async fn yield_(&self, value: ProtocolCommand) -> Result<()> {
|
||||
let ctx = self.gen.yield_(value).await?;
|
||||
self.ctx.replace(ctx);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<genawaiter::sync::Co<ProtocolCommand, Result<()>>> for Coro<()> {
|
||||
fn from(value: genawaiter::sync::Co<ProtocolCommand, Result<()>>) -> Self {
|
||||
Self {
|
||||
gen: value,
|
||||
ctx: RefCell::new(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct DbSyncInfo {
|
||||
@@ -17,6 +48,17 @@ pub struct DbSyncStatus {
|
||||
pub max_frame_no: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DbChangesStatus {
|
||||
pub revision: DatabasePullRevision,
|
||||
pub file_path: String,
|
||||
}
|
||||
|
||||
pub struct SyncEngineStats {
|
||||
pub cdc_operations: i64,
|
||||
pub wal_size: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum DatabaseChangeType {
|
||||
Delete,
|
||||
@@ -29,12 +71,30 @@ pub struct DatabaseMetadata {
|
||||
/// Unique identifier of the client - generated on sync startup
|
||||
pub client_unique_id: String,
|
||||
/// Latest generation from remote which was pulled locally to the Synced DB
|
||||
pub synced_generation: u64,
|
||||
/// Latest frame number from remote which was pulled locally to the Synced DB
|
||||
pub synced_frame_no: Option<u64>,
|
||||
pub synced_revision: Option<DatabasePullRevision>,
|
||||
/// pair of frame_no for Draft and Synced DB such that content of the database file up to these frames is identical
|
||||
pub draft_wal_match_watermark: u64,
|
||||
pub synced_wal_match_watermark: u64,
|
||||
pub revert_since_wal_salt: Option<Vec<u32>>,
|
||||
pub revert_since_wal_watermark: u64,
|
||||
pub last_pushed_pull_gen_hint: i64,
|
||||
pub last_pushed_change_id_hint: i64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum DatabasePullRevision {
|
||||
Legacy {
|
||||
generation: u64,
|
||||
synced_frame_no: Option<u64>,
|
||||
},
|
||||
V1 {
|
||||
revision: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub enum DatabaseSyncEngineProtocolVersion {
|
||||
Legacy,
|
||||
V1,
|
||||
}
|
||||
|
||||
impl DatabaseMetadata {
|
||||
@@ -199,6 +259,21 @@ impl TryFrom<&turso_core::Row> for DatabaseChange {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DatabaseRowMutation {
|
||||
pub change_time: u64,
|
||||
pub table_name: String,
|
||||
pub id: i64,
|
||||
pub change_type: DatabaseChangeType,
|
||||
pub before: Option<HashMap<String, turso_core::Value>>,
|
||||
pub after: Option<HashMap<String, turso_core::Value>>,
|
||||
pub updates: Option<HashMap<String, turso_core::Value>>,
|
||||
}
|
||||
|
||||
pub struct DatabaseRowStatement {
|
||||
pub sql: String,
|
||||
pub values: Vec<turso_core::Value>,
|
||||
}
|
||||
|
||||
pub enum DatabaseTapeRowChangeType {
|
||||
Delete {
|
||||
before: Vec<turso_core::Value>,
|
||||
|
||||
@@ -38,9 +38,9 @@ impl WalSession {
|
||||
let info = self.conn.wal_get_frame(frame_no, frame)?;
|
||||
Ok(info)
|
||||
}
|
||||
pub fn end(&mut self) -> Result<()> {
|
||||
pub fn end(&mut self, force_commit: bool) -> Result<()> {
|
||||
assert!(self.in_txn);
|
||||
self.conn.wal_insert_end(false)?;
|
||||
self.conn.wal_insert_end(force_commit)?;
|
||||
self.in_txn = false;
|
||||
Ok(())
|
||||
}
|
||||
@@ -53,7 +53,7 @@ impl Drop for WalSession {
|
||||
fn drop(&mut self) {
|
||||
if self.in_txn {
|
||||
let _ = self
|
||||
.end()
|
||||
.end(false)
|
||||
.inspect_err(|e| tracing::error!("failed to close WAL session: {}", e));
|
||||
}
|
||||
}
|
||||
|
||||
57
sync/javascript/index.d.ts
vendored
57
sync/javascript/index.d.ts
vendored
@@ -67,6 +67,14 @@ export declare class Database {
|
||||
* `Ok(())` if the database is closed successfully.
|
||||
*/
|
||||
close(): void
|
||||
/**
|
||||
* Sets the default safe integers mode for all statements from this database.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * `toggle` - Whether to use safe integers by default.
|
||||
*/
|
||||
defaultSafeIntegers(toggle?: boolean | undefined | null): void
|
||||
/** Runs the I/O loop synchronously. */
|
||||
ioLoopSync(): void
|
||||
/** Runs the I/O loop asynchronously, returning a Promise. */
|
||||
@@ -107,11 +115,22 @@ export declare class Statement {
|
||||
raw(raw?: boolean | undefined | null): void
|
||||
/** Sets the presentation mode to pluck. */
|
||||
pluck(pluck?: boolean | undefined | null): void
|
||||
/**
|
||||
* Sets safe integers mode for this statement.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * `toggle` - Whether to use safe integers.
|
||||
*/
|
||||
safeIntegers(toggle?: boolean | undefined | null): void
|
||||
/** Get column information for the statement */
|
||||
columns(): unknown[]
|
||||
/** Finalizes the statement. */
|
||||
finalize(): void
|
||||
}
|
||||
export declare class GeneratorHolder {
|
||||
resume(error?: string | undefined | null): number
|
||||
take(): GeneratorResponse | null
|
||||
}
|
||||
|
||||
export declare class JsDataCompletion {
|
||||
@@ -143,16 +162,42 @@ export declare class SyncEngine {
|
||||
protocolIo(): JsProtocolRequestData | null
|
||||
sync(): GeneratorHolder
|
||||
push(): GeneratorHolder
|
||||
stats(): GeneratorHolder
|
||||
pull(): GeneratorHolder
|
||||
checkpoint(): GeneratorHolder
|
||||
open(): Database
|
||||
}
|
||||
|
||||
export declare const enum DatabaseChangeTypeJs {
|
||||
Insert = 0,
|
||||
Update = 1,
|
||||
Delete = 2
|
||||
}
|
||||
|
||||
export interface DatabaseOpts {
|
||||
path: string
|
||||
}
|
||||
|
||||
export interface DatabaseRowMutationJs {
|
||||
changeTime: number
|
||||
tableName: string
|
||||
id: number
|
||||
changeType: DatabaseChangeTypeJs
|
||||
before?: Record<string, any>
|
||||
after?: Record<string, any>
|
||||
updates?: Record<string, any>
|
||||
}
|
||||
|
||||
export interface DatabaseRowStatementJs {
|
||||
sql: string
|
||||
values: Array<any>
|
||||
}
|
||||
|
||||
export type GeneratorResponse =
|
||||
| { type: 'SyncEngineStats', operations: number, wal: number }
|
||||
|
||||
export type JsProtocolRequest =
|
||||
| { type: 'Http', method: string, path: string, body?: Buffer }
|
||||
| { type: 'Http', method: string, path: string, body?: Array<number>, headers: Array<[string, string]> }
|
||||
| { type: 'FullRead', path: string }
|
||||
| { type: 'FullWrite', path: string, content: Array<number> }
|
||||
|
||||
@@ -160,5 +205,13 @@ export interface SyncEngineOpts {
|
||||
path: string
|
||||
clientName?: string
|
||||
walPullBatchSize?: number
|
||||
enableTracing?: boolean
|
||||
enableTracing?: string
|
||||
tablesIgnore?: Array<string>
|
||||
transform?: (arg: DatabaseRowMutationJs) => DatabaseRowStatementJs | null
|
||||
protocolVersion?: SyncEngineProtocolVersion
|
||||
}
|
||||
|
||||
export declare const enum SyncEngineProtocolVersion {
|
||||
Legacy = 0,
|
||||
V1 = 1
|
||||
}
|
||||
|
||||
@@ -79,12 +79,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-android-arm64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-android-arm64/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-android-arm64')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -95,12 +90,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-android-arm-eabi')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-android-arm-eabi/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-android-arm-eabi')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -115,12 +105,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-win32-x64-msvc')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-win32-x64-msvc/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-win32-x64-msvc')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -131,12 +116,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-win32-ia32-msvc')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-win32-ia32-msvc/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-win32-ia32-msvc')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -147,12 +127,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-win32-arm64-msvc')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-win32-arm64-msvc/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-win32-arm64-msvc')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -166,12 +141,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-darwin-universal')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-darwin-universal/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-darwin-universal')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -182,12 +152,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-darwin-x64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-darwin-x64/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-darwin-x64')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -198,12 +163,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-darwin-arm64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-darwin-arm64/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-darwin-arm64')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -218,12 +178,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-freebsd-x64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-freebsd-x64/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-freebsd-x64')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -234,12 +189,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-freebsd-arm64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-freebsd-arm64/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-freebsd-arm64')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -255,12 +205,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-x64-musl')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-musl/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-x64-musl')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -271,12 +216,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-x64-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-x64-gnu')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -289,12 +229,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-arm64-musl')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-musl/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-arm64-musl')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -305,12 +240,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-arm64-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-arm64-gnu')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -323,12 +253,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-arm-musleabihf')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-musleabihf/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-arm-musleabihf')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -339,12 +264,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-arm-gnueabihf')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-gnueabihf/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-arm-gnueabihf')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -357,12 +277,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-riscv64-musl')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-musl/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-riscv64-musl')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -373,12 +288,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-riscv64-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-riscv64-gnu')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -390,12 +300,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-ppc64-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-ppc64-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-ppc64-gnu')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -406,12 +311,7 @@ function requireNative() {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-s390x-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-s390x-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-s390x-gnu')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -421,49 +321,34 @@ function requireNative() {
|
||||
} else if (process.platform === 'openharmony') {
|
||||
if (process.arch === 'arm64') {
|
||||
try {
|
||||
return require('./turso-sync-js.openharmony-arm64.node')
|
||||
return require('./turso-sync-js.linux-arm64-ohos.node')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-openharmony-arm64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm64/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-arm64-ohos')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
} else if (process.arch === 'x64') {
|
||||
try {
|
||||
return require('./turso-sync-js.openharmony-x64.node')
|
||||
return require('./turso-sync-js.linux-x64-ohos.node')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-openharmony-x64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-openharmony-x64/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-x64-ohos')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
} else if (process.arch === 'arm') {
|
||||
try {
|
||||
return require('./turso-sync-js.openharmony-arm.node')
|
||||
return require('./turso-sync-js.linux-arm-ohos.node')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-openharmony-arm')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm/package.json').version
|
||||
if (bindingPackageVersion !== '0.1.4-pre.5') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.1.4-pre.5 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
return require('@tursodatabase/sync-linux-arm-ohos')
|
||||
} catch (e) {
|
||||
loadErrors.push(e)
|
||||
}
|
||||
@@ -508,7 +393,7 @@ if (!nativeBinding) {
|
||||
throw new Error(`Failed to load native binding`)
|
||||
}
|
||||
|
||||
const { Database, Statement, GeneratorHolder, JsDataCompletion, JsDataPollResult, JsProtocolIo, JsProtocolRequestData, SyncEngine } = nativeBinding
|
||||
const { Database, Statement, GeneratorHolder, JsDataCompletion, JsDataPollResult, JsProtocolIo, JsProtocolRequestData, SyncEngine, DatabaseChangeTypeJs, SyncEngineProtocolVersion } = nativeBinding
|
||||
export { Database }
|
||||
export { Statement }
|
||||
export { GeneratorHolder }
|
||||
@@ -517,3 +402,5 @@ export { JsDataPollResult }
|
||||
export { JsProtocolIo }
|
||||
export { JsProtocolRequestData }
|
||||
export { SyncEngine }
|
||||
export { DatabaseChangeTypeJs }
|
||||
export { SyncEngineProtocolVersion }
|
||||
|
||||
@@ -59,4 +59,4 @@
|
||||
"dependencies": {
|
||||
"@tursodatabase/database": "~0.1.4-pre.5"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
use napi::Env;
|
||||
use napi_derive::napi;
|
||||
use std::{future::Future, sync::Mutex};
|
||||
use std::{
|
||||
future::Future,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use turso_sync_engine::types::ProtocolCommand;
|
||||
|
||||
@@ -7,18 +11,18 @@ pub const GENERATOR_RESUME_IO: u32 = 0;
|
||||
pub const GENERATOR_RESUME_DONE: u32 = 1;
|
||||
|
||||
pub trait Generator {
|
||||
fn resume(&mut self, result: Option<String>) -> napi::Result<u32>;
|
||||
fn resume(&mut self, env: Env, result: Option<String>) -> napi::Result<u32>;
|
||||
}
|
||||
|
||||
impl<F: Future<Output = turso_sync_engine::Result<()>>> Generator
|
||||
for genawaiter::sync::Gen<ProtocolCommand, turso_sync_engine::Result<()>, F>
|
||||
for genawaiter::sync::Gen<ProtocolCommand, turso_sync_engine::Result<Env>, F>
|
||||
{
|
||||
fn resume(&mut self, error: Option<String>) -> napi::Result<u32> {
|
||||
fn resume(&mut self, env: Env, error: Option<String>) -> napi::Result<u32> {
|
||||
let result = match error {
|
||||
Some(err) => Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
|
||||
format!("JsProtocolIo error: {err}"),
|
||||
)),
|
||||
None => Ok(()),
|
||||
None => Ok(env),
|
||||
};
|
||||
match self.resume_with(result) {
|
||||
genawaiter::GeneratorState::Yielded(ProtocolCommand::IO) => Ok(GENERATOR_RESUME_IO),
|
||||
@@ -31,15 +35,25 @@ impl<F: Future<Output = turso_sync_engine::Result<()>>> Generator
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(discriminant = "type")]
|
||||
pub enum GeneratorResponse {
|
||||
SyncEngineStats { operations: i64, wal: i64 },
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub struct GeneratorHolder {
|
||||
pub(crate) inner: Box<Mutex<dyn Generator>>,
|
||||
pub(crate) response: Arc<Mutex<Option<GeneratorResponse>>>,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
impl GeneratorHolder {
|
||||
#[napi]
|
||||
pub fn resume(&self, error: Option<String>) -> napi::Result<u32> {
|
||||
self.inner.lock().unwrap().resume(error)
|
||||
pub fn resume(&self, env: Env, error: Option<String>) -> napi::Result<u32> {
|
||||
self.inner.lock().unwrap().resume(env, error)
|
||||
}
|
||||
#[napi]
|
||||
pub fn take(&self) -> Option<GeneratorResponse> {
|
||||
self.response.lock().unwrap().take()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ pub enum JsProtocolRequest {
|
||||
method: String,
|
||||
path: String,
|
||||
body: Option<Vec<u8>>,
|
||||
headers: Vec<(String, String)>,
|
||||
},
|
||||
FullRead {
|
||||
path: String,
|
||||
@@ -130,11 +131,16 @@ impl ProtocolIO for JsProtocolIo {
|
||||
method: &str,
|
||||
path: &str,
|
||||
body: Option<Vec<u8>>,
|
||||
headers: &[(&str, &str)],
|
||||
) -> turso_sync_engine::Result<JsDataCompletion> {
|
||||
Ok(self.add_request(JsProtocolRequest::Http {
|
||||
method: method.to_string(),
|
||||
path: path.to_string(),
|
||||
body,
|
||||
headers: headers
|
||||
.iter()
|
||||
.map(|x| (x.0.to_string(), x.1.to_string()))
|
||||
.collect(),
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -1,21 +1,31 @@
|
||||
#![deny(clippy::all)]
|
||||
#![allow(clippy::await_holding_lock)]
|
||||
#![allow(clippy::type_complexity)]
|
||||
|
||||
pub mod generator;
|
||||
pub mod js_protocol_io;
|
||||
|
||||
use std::sync::{Arc, Mutex, OnceLock};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex, OnceLock, RwLock, RwLockReadGuard, RwLockWriteGuard},
|
||||
};
|
||||
|
||||
use napi::bindgen_prelude::AsyncTask;
|
||||
use napi::{
|
||||
bindgen_prelude::{AsyncTask, Either5, Function, FunctionRef, Null},
|
||||
Env,
|
||||
};
|
||||
use napi_derive::napi;
|
||||
use tracing_subscriber::{filter::LevelFilter, fmt::format::FmtSpan};
|
||||
use turso_node::IoLoopTask;
|
||||
use turso_sync_engine::{
|
||||
database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts},
|
||||
types::Coro,
|
||||
types::{
|
||||
Coro, DatabaseChangeType, DatabaseRowMutation, DatabaseRowStatement,
|
||||
DatabaseSyncEngineProtocolVersion,
|
||||
},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
generator::GeneratorHolder,
|
||||
generator::{GeneratorHolder, GeneratorResponse},
|
||||
js_protocol_io::{JsProtocolIo, JsProtocolRequestData},
|
||||
};
|
||||
|
||||
@@ -29,18 +39,92 @@ pub struct SyncEngine {
|
||||
path: String,
|
||||
client_name: String,
|
||||
wal_pull_batch_size: u32,
|
||||
protocol_version: DatabaseSyncEngineProtocolVersion,
|
||||
tables_ignore: Vec<String>,
|
||||
transform: Option<FunctionRef<DatabaseRowMutationJs, Option<DatabaseRowStatementJs>>>,
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
protocol: Arc<JsProtocolIo>,
|
||||
sync_engine: Arc<Mutex<Option<DatabaseSyncEngine<JsProtocolIo>>>>,
|
||||
sync_engine: Arc<RwLock<Option<DatabaseSyncEngine<JsProtocolIo, Env>>>>,
|
||||
opened: Arc<Mutex<Option<turso_node::Database>>>,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub enum DatabaseChangeTypeJs {
|
||||
Insert,
|
||||
Update,
|
||||
Delete,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub enum SyncEngineProtocolVersion {
|
||||
Legacy,
|
||||
V1,
|
||||
}
|
||||
|
||||
fn core_change_type_to_js(value: DatabaseChangeType) -> DatabaseChangeTypeJs {
|
||||
match value {
|
||||
DatabaseChangeType::Delete => DatabaseChangeTypeJs::Delete,
|
||||
DatabaseChangeType::Update => DatabaseChangeTypeJs::Update,
|
||||
DatabaseChangeType::Insert => DatabaseChangeTypeJs::Insert,
|
||||
}
|
||||
}
|
||||
fn js_value_to_core(value: Either5<Null, i64, f64, String, Vec<u8>>) -> turso_core::Value {
|
||||
match value {
|
||||
Either5::A(_) => turso_core::Value::Null,
|
||||
Either5::B(value) => turso_core::Value::Integer(value),
|
||||
Either5::C(value) => turso_core::Value::Float(value),
|
||||
Either5::D(value) => turso_core::Value::Text(turso_core::types::Text::new(&value)),
|
||||
Either5::E(value) => turso_core::Value::Blob(value),
|
||||
}
|
||||
}
|
||||
fn core_value_to_js(value: turso_core::Value) -> Either5<Null, i64, f64, String, Vec<u8>> {
|
||||
match value {
|
||||
turso_core::Value::Null => Either5::<Null, i64, f64, String, Vec<u8>>::A(Null),
|
||||
turso_core::Value::Integer(value) => Either5::<Null, i64, f64, String, Vec<u8>>::B(value),
|
||||
turso_core::Value::Float(value) => Either5::<Null, i64, f64, String, Vec<u8>>::C(value),
|
||||
turso_core::Value::Text(value) => {
|
||||
Either5::<Null, i64, f64, String, Vec<u8>>::D(value.as_str().to_string())
|
||||
}
|
||||
turso_core::Value::Blob(value) => Either5::<Null, i64, f64, String, Vec<u8>>::E(value),
|
||||
}
|
||||
}
|
||||
fn core_values_map_to_js(
|
||||
value: HashMap<String, turso_core::Value>,
|
||||
) -> HashMap<String, Either5<Null, i64, f64, String, Vec<u8>>> {
|
||||
let mut result = HashMap::new();
|
||||
for (key, value) in value {
|
||||
result.insert(key, core_value_to_js(value));
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct DatabaseRowMutationJs {
|
||||
pub change_time: i64,
|
||||
pub table_name: String,
|
||||
pub id: i64,
|
||||
pub change_type: DatabaseChangeTypeJs,
|
||||
pub before: Option<HashMap<String, Either5<Null, i64, f64, String, Vec<u8>>>>,
|
||||
pub after: Option<HashMap<String, Either5<Null, i64, f64, String, Vec<u8>>>>,
|
||||
pub updates: Option<HashMap<String, Either5<Null, i64, f64, String, Vec<u8>>>>,
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseRowStatementJs {
|
||||
pub sql: String,
|
||||
pub values: Vec<Either5<Null, i64, f64, String, Vec<u8>>>,
|
||||
}
|
||||
|
||||
#[napi(object, object_to_js = false)]
|
||||
pub struct SyncEngineOpts {
|
||||
pub path: String,
|
||||
pub client_name: Option<String>,
|
||||
pub wal_pull_batch_size: Option<u32>,
|
||||
pub enable_tracing: Option<String>,
|
||||
pub tables_ignore: Option<Vec<String>>,
|
||||
pub transform: Option<Function<'static, DatabaseRowMutationJs, Option<DatabaseRowStatementJs>>>,
|
||||
pub protocol_version: Option<SyncEngineProtocolVersion>,
|
||||
}
|
||||
|
||||
static TRACING_INIT: OnceLock<()> = OnceLock::new();
|
||||
@@ -81,19 +165,67 @@ impl SyncEngine {
|
||||
path: opts.path,
|
||||
client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
|
||||
wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100),
|
||||
sync_engine: Arc::new(Mutex::new(None)),
|
||||
tables_ignore: opts.tables_ignore.unwrap_or_default(),
|
||||
transform: opts.transform.map(|x| x.create_ref().unwrap()),
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
sync_engine: Arc::new(RwLock::new(None)),
|
||||
io,
|
||||
protocol: Arc::new(JsProtocolIo::default()),
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
opened: Arc::new(Mutex::new(None)),
|
||||
protocol_version: match opts.protocol_version {
|
||||
Some(SyncEngineProtocolVersion::Legacy) | None => {
|
||||
DatabaseSyncEngineProtocolVersion::Legacy
|
||||
}
|
||||
_ => DatabaseSyncEngineProtocolVersion::V1,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn init(&self) -> GeneratorHolder {
|
||||
pub fn init(&mut self, env: Env) -> GeneratorHolder {
|
||||
#[allow(clippy::type_complexity)]
|
||||
let transform: Option<
|
||||
Arc<
|
||||
dyn Fn(
|
||||
&Env,
|
||||
DatabaseRowMutation,
|
||||
)
|
||||
-> turso_sync_engine::Result<Option<DatabaseRowStatement>>
|
||||
+ 'static,
|
||||
>,
|
||||
> = match self.transform.take() {
|
||||
Some(f) => Some(Arc::new(move |env, mutation| {
|
||||
let result = f
|
||||
.borrow_back(env)
|
||||
.unwrap()
|
||||
.call(DatabaseRowMutationJs {
|
||||
change_time: mutation.change_time as i64,
|
||||
table_name: mutation.table_name,
|
||||
id: mutation.id,
|
||||
change_type: core_change_type_to_js(mutation.change_type),
|
||||
before: mutation.before.map(core_values_map_to_js),
|
||||
after: mutation.after.map(core_values_map_to_js),
|
||||
updates: mutation.updates.map(core_values_map_to_js),
|
||||
})
|
||||
.map_err(|e| {
|
||||
turso_sync_engine::errors::Error::DatabaseSyncEngineError(format!(
|
||||
"transform callback failed: {e}"
|
||||
))
|
||||
})?;
|
||||
Ok(result.map(|statement| DatabaseRowStatement {
|
||||
sql: statement.sql,
|
||||
values: statement.values.into_iter().map(js_value_to_core).collect(),
|
||||
}))
|
||||
})),
|
||||
None => None,
|
||||
};
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: self.client_name.clone(),
|
||||
wal_pull_batch_size: self.wal_pull_batch_size as u64,
|
||||
tables_ignore: self.tables_ignore.clone(),
|
||||
transform,
|
||||
protocol_version_hint: self.protocol_version,
|
||||
};
|
||||
|
||||
let protocol = self.protocol.clone();
|
||||
@@ -102,17 +234,19 @@ impl SyncEngine {
|
||||
let opened = self.opened.clone();
|
||||
let path = self.path.clone();
|
||||
let generator = genawaiter::sync::Gen::new(|coro| async move {
|
||||
let coro = Coro::new(env, coro);
|
||||
let initialized =
|
||||
DatabaseSyncEngine::new(&coro, io.clone(), protocol, &path, opts).await?;
|
||||
let connection = initialized.connect(&coro).await?;
|
||||
let connection = initialized.connect_rw(&coro).await?;
|
||||
let db = turso_node::Database::create(None, io.clone(), connection, false);
|
||||
|
||||
*sync_engine.lock().unwrap() = Some(initialized);
|
||||
*sync_engine.write().unwrap() = Some(initialized);
|
||||
*opened.lock().unwrap() = Some(db);
|
||||
Ok(())
|
||||
});
|
||||
GeneratorHolder {
|
||||
inner: Box::new(Mutex::new(generator)),
|
||||
response: Arc::new(Mutex::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,18 +271,63 @@ impl SyncEngine {
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn sync(&self) -> GeneratorHolder {
|
||||
self.run(async move |coro, sync_engine| sync_engine.sync(coro).await)
|
||||
pub fn sync(&self, env: Env) -> GeneratorHolder {
|
||||
self.run(env, async move |coro, sync_engine| {
|
||||
let mut sync_engine = try_write(sync_engine)?;
|
||||
let sync_engine = try_unwrap_mut(&mut sync_engine)?;
|
||||
sync_engine.sync(coro).await?;
|
||||
Ok(None)
|
||||
})
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn push(&self) -> GeneratorHolder {
|
||||
self.run(async move |coro, sync_engine| sync_engine.push(coro).await)
|
||||
pub fn push(&self, env: Env) -> GeneratorHolder {
|
||||
self.run(env, async move |coro, sync_engine| {
|
||||
let sync_engine = try_read(sync_engine)?;
|
||||
let sync_engine = try_unwrap(&sync_engine)?;
|
||||
sync_engine.push_changes_to_remote(coro).await?;
|
||||
Ok(None)
|
||||
})
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn pull(&self) -> GeneratorHolder {
|
||||
self.run(async move |coro, sync_engine| sync_engine.pull(coro).await)
|
||||
pub fn stats(&self, env: Env) -> GeneratorHolder {
|
||||
self.run(env, async move |coro, sync_engine| {
|
||||
let sync_engine = try_read(sync_engine)?;
|
||||
let sync_engine = try_unwrap(&sync_engine)?;
|
||||
let changes = sync_engine.stats(coro).await?;
|
||||
Ok(Some(GeneratorResponse::SyncEngineStats {
|
||||
operations: changes.cdc_operations,
|
||||
wal: changes.wal_size,
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn pull(&self, env: Env) -> GeneratorHolder {
|
||||
self.run(env, async move |coro, sync_engine| {
|
||||
let changes = {
|
||||
let sync_engine = try_read(sync_engine)?;
|
||||
let sync_engine = try_unwrap(&sync_engine)?;
|
||||
sync_engine.wait_changes_from_remote(coro).await?
|
||||
};
|
||||
if let Some(changes) = changes {
|
||||
let mut sync_engine = try_write(sync_engine)?;
|
||||
let sync_engine = try_unwrap_mut(&mut sync_engine)?;
|
||||
sync_engine.apply_changes_from_remote(coro, changes).await?;
|
||||
}
|
||||
Ok(None)
|
||||
})
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn checkpoint(&self, env: Env) -> GeneratorHolder {
|
||||
self.run(env, async move |coro, sync_engine| {
|
||||
let mut sync_engine = try_write(sync_engine)?;
|
||||
let sync_engine = try_unwrap_mut(&mut sync_engine)?;
|
||||
sync_engine.checkpoint(coro).await?;
|
||||
Ok(None)
|
||||
})
|
||||
}
|
||||
|
||||
#[napi]
|
||||
@@ -165,32 +344,76 @@ impl SyncEngine {
|
||||
|
||||
fn run(
|
||||
&self,
|
||||
env: Env,
|
||||
f: impl AsyncFnOnce(
|
||||
&Coro,
|
||||
&mut DatabaseSyncEngine<JsProtocolIo>,
|
||||
) -> turso_sync_engine::Result<()>
|
||||
&Coro<Env>,
|
||||
&Arc<RwLock<Option<DatabaseSyncEngine<JsProtocolIo, Env>>>>,
|
||||
) -> turso_sync_engine::Result<Option<GeneratorResponse>>
|
||||
+ 'static,
|
||||
) -> GeneratorHolder {
|
||||
let response = Arc::new(Mutex::new(None));
|
||||
let sync_engine = self.sync_engine.clone();
|
||||
#[allow(clippy::await_holding_lock)]
|
||||
let generator = genawaiter::sync::Gen::new(|coro| async move {
|
||||
let Ok(mut sync_engine) = sync_engine.try_lock() else {
|
||||
let nasty_error = "sync_engine is busy".to_string();
|
||||
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
|
||||
nasty_error,
|
||||
));
|
||||
};
|
||||
let Some(sync_engine) = sync_engine.as_mut() else {
|
||||
let error = "sync_engine must be initialized".to_string();
|
||||
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
|
||||
error,
|
||||
));
|
||||
};
|
||||
f(&coro, sync_engine).await?;
|
||||
Ok(())
|
||||
let generator = genawaiter::sync::Gen::new({
|
||||
let response = response.clone();
|
||||
|coro| async move {
|
||||
let coro = Coro::new(env, coro);
|
||||
*response.lock().unwrap() = f(&coro, &sync_engine).await?;
|
||||
Ok(())
|
||||
}
|
||||
});
|
||||
GeneratorHolder {
|
||||
inner: Box::new(Mutex::new(generator)),
|
||||
response,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_read(
|
||||
sync_engine: &RwLock<Option<DatabaseSyncEngine<JsProtocolIo, Env>>>,
|
||||
) -> turso_sync_engine::Result<RwLockReadGuard<'_, Option<DatabaseSyncEngine<JsProtocolIo, Env>>>> {
|
||||
let Ok(sync_engine) = sync_engine.try_read() else {
|
||||
let nasty_error = "sync_engine is busy".to_string();
|
||||
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
|
||||
nasty_error,
|
||||
));
|
||||
};
|
||||
Ok(sync_engine)
|
||||
}
|
||||
|
||||
fn try_write(
|
||||
sync_engine: &RwLock<Option<DatabaseSyncEngine<JsProtocolIo, Env>>>,
|
||||
) -> turso_sync_engine::Result<RwLockWriteGuard<'_, Option<DatabaseSyncEngine<JsProtocolIo, Env>>>>
|
||||
{
|
||||
let Ok(sync_engine) = sync_engine.try_write() else {
|
||||
let nasty_error = "sync_engine is busy".to_string();
|
||||
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
|
||||
nasty_error,
|
||||
));
|
||||
};
|
||||
Ok(sync_engine)
|
||||
}
|
||||
|
||||
fn try_unwrap<'a>(
|
||||
sync_engine: &'a RwLockReadGuard<'_, Option<DatabaseSyncEngine<JsProtocolIo, Env>>>,
|
||||
) -> turso_sync_engine::Result<&'a DatabaseSyncEngine<JsProtocolIo, Env>> {
|
||||
let Some(sync_engine) = sync_engine.as_ref() else {
|
||||
let error = "sync_engine must be initialized".to_string();
|
||||
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
|
||||
error,
|
||||
));
|
||||
};
|
||||
Ok(sync_engine)
|
||||
}
|
||||
|
||||
fn try_unwrap_mut<'a>(
|
||||
sync_engine: &'a mut RwLockWriteGuard<'_, Option<DatabaseSyncEngine<JsProtocolIo, Env>>>,
|
||||
) -> turso_sync_engine::Result<&'a mut DatabaseSyncEngine<JsProtocolIo, Env>> {
|
||||
let Some(sync_engine) = sync_engine.as_mut() else {
|
||||
let error = "sync_engine must be initialized".to_string();
|
||||
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
|
||||
error,
|
||||
));
|
||||
};
|
||||
Ok(sync_engine)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"use strict";
|
||||
|
||||
import { SyncEngine } from '#entry-point';
|
||||
import { SyncEngine, DatabaseRowMutationJs, DatabaseRowStatementJs } from '#entry-point';
|
||||
import { Database } from '@tursodatabase/database';
|
||||
|
||||
const GENERATOR_RESUME_IO = 0;
|
||||
@@ -63,9 +63,16 @@ async function process(opts, request) {
|
||||
const completion = request.completion();
|
||||
if (requestType.type == 'Http') {
|
||||
try {
|
||||
let headers = opts.headers;
|
||||
if (requestType.headers != null && requestType.headers.length > 0) {
|
||||
headers = { ...opts.headers };
|
||||
for (let header of requestType.headers) {
|
||||
headers[header[0]] = header[1];
|
||||
}
|
||||
}
|
||||
const response = await fetch(`${opts.url}${requestType.path}`, {
|
||||
method: requestType.method,
|
||||
headers: opts.headers,
|
||||
headers: headers,
|
||||
body: requestType.body != null ? new Uint8Array(requestType.body) : null,
|
||||
});
|
||||
completion.status(response.status);
|
||||
@@ -101,7 +108,7 @@ async function process(opts, request) {
|
||||
}
|
||||
}
|
||||
|
||||
async function run(opts, engine, generator) {
|
||||
async function run(opts, engine, generator): Promise<any> {
|
||||
let tasks = [];
|
||||
while (generator.resume(null) !== GENERATOR_RESUME_DONE) {
|
||||
for (let request = engine.protocolIo(); request != null; request = engine.protocolIo()) {
|
||||
@@ -113,6 +120,7 @@ async function run(opts, engine, generator) {
|
||||
|
||||
tasks = tasks.filter(t => !t.finished);
|
||||
}
|
||||
return generator.take();
|
||||
}
|
||||
|
||||
interface ConnectOpts {
|
||||
@@ -121,16 +129,27 @@ interface ConnectOpts {
|
||||
url: string;
|
||||
authToken?: string;
|
||||
encryptionKey?: string;
|
||||
tablesIgnore?: string[],
|
||||
transform?: (arg: DatabaseRowMutationJs) => DatabaseRowStatementJs | null,
|
||||
enableTracing?: string,
|
||||
}
|
||||
|
||||
interface Sync {
|
||||
sync(): Promise<void>;
|
||||
push(): Promise<void>;
|
||||
pull(): Promise<void>;
|
||||
checkpoint(): Promise<void>;
|
||||
stats(): Promise<{ operations: number, wal: number }>;
|
||||
}
|
||||
|
||||
export async function connect(opts: ConnectOpts): Database & Sync {
|
||||
const engine = new SyncEngine({ path: opts.path, clientName: opts.clientName });
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
tablesIgnore: opts.tablesIgnore,
|
||||
transform: opts.transform,
|
||||
enableTracing: opts.enableTracing
|
||||
});
|
||||
const httpOpts = {
|
||||
url: opts.url,
|
||||
headers: {
|
||||
@@ -147,5 +166,9 @@ export async function connect(opts: ConnectOpts): Database & Sync {
|
||||
db.sync = async function () { await run(httpOpts, engine, engine.sync()); }
|
||||
db.pull = async function () { await run(httpOpts, engine, engine.pull()); }
|
||||
db.push = async function () { await run(httpOpts, engine, engine.push()); }
|
||||
db.checkpoint = async function () { await run(httpOpts, engine, engine.checkpoint()); }
|
||||
db.stats = async function () { return (await run(httpOpts, engine, engine.stats())); }
|
||||
return db;
|
||||
}
|
||||
|
||||
export { Database, Sync };
|
||||
|
||||
@@ -64,3 +64,5 @@ export const JsDataPollResult = __napiModule.exports.JsDataPollResult
|
||||
export const JsProtocolIo = __napiModule.exports.JsProtocolIo
|
||||
export const JsProtocolRequestData = __napiModule.exports.JsProtocolRequestData
|
||||
export const SyncEngine = __napiModule.exports.SyncEngine
|
||||
export const DatabaseChangeTypeJs = __napiModule.exports.DatabaseChangeTypeJs
|
||||
export const SyncEngineProtocolVersion = __napiModule.exports.SyncEngineProtocolVersion
|
||||
|
||||
@@ -116,3 +116,5 @@ module.exports.JsDataPollResult = __napiModule.exports.JsDataPollResult
|
||||
module.exports.JsProtocolIo = __napiModule.exports.JsProtocolIo
|
||||
module.exports.JsProtocolRequestData = __napiModule.exports.JsProtocolRequestData
|
||||
module.exports.SyncEngine = __napiModule.exports.SyncEngine
|
||||
module.exports.DatabaseChangeTypeJs = __napiModule.exports.DatabaseChangeTypeJs
|
||||
module.exports.SyncEngineProtocolVersion = __napiModule.exports.SyncEngineProtocolVersion
|
||||
|
||||
Reference in New Issue
Block a user