Merge 'Sync engine fixes' from Nikita Sivukhin

This PR fixes several small bugs around the sync-engine:
1. WAL pull handled end of the "frame" iterator incorrectly - this is
fixed in the https://github.com/tursodatabase/turso/commit/eff8d8540d1e8
3214459822ac6eeb0d3409ecc24 and tests with concurrent DBs were added
2. Using `:memory:` in the sync engine led to weird behavior because the
engine will create different `MemoryIO` instances but turso-core under
the hood will use a global registry of databases. I **changed** the criteria
for determining in-memory databases by checking the prefix of the path
to be equal to `:memory:` (https://github.com/tursodatabase/turso/commit
/80476b3069f2cd460f368e10b4b2ef51f7608077)
3. Switched from `Buffer` to `Vec<u8>` for now, as browsers do not seem to
support `Buffer` natively: https://github.com/tursodatabase/turso/commit
/2ca8a15dcc8ec79eddfaf7068fab4e1aa3241506
4. Added tracing to the sync engine: https://github.com/tursodatabase/tu
rso/commit/33ef1aa0da7dfe9416025e26e98f9cf9d48c9119

Closes #2582
This commit is contained in:
Pekka Enberg
2025-08-13 18:49:36 +03:00
committed by GitHub
9 changed files with 157 additions and 38 deletions

1
Cargo.lock generated
View File

@@ -4028,6 +4028,7 @@ dependencies = [
"napi",
"napi-build",
"napi-derive",
"tracing-subscriber",
"turso_core",
"turso_node",
"turso_sync_engine",

View File

@@ -255,8 +255,10 @@ impl Database {
enable_indexes: bool,
enable_views: bool,
) -> Result<Arc<Database>> {
if path == ":memory:" {
return Self::do_open_with_flags(
// turso-sync-engine create 2 databases with different names in the same IO if MemoryIO is used
// in this case we need to bypass registry (as this is MemoryIO DB) but also preserve original distinction in names (e.g. :memory:-draft and :memory:-synced)
if path.starts_with(":memory:") {
return Self::open_with_flags_bypass_registry(
io,
path,
db_file,
@@ -277,7 +279,7 @@ impl Database {
if let Some(db) = registry.get(&canonical_path).and_then(Weak::upgrade) {
return Ok(db);
}
let db = Self::do_open_with_flags(
let db = Self::open_with_flags_bypass_registry(
io,
path,
db_file,
@@ -291,7 +293,7 @@ impl Database {
}
#[allow(clippy::arc_with_non_send_sync)]
fn do_open_with_flags(
fn open_with_flags_bypass_registry(
io: Arc<dyn IO>,
path: &str,
db_file: Arc<dyn DatabaseStorage>,
@@ -983,21 +985,8 @@ impl Connection {
input,
)?;
let mut stmt =
Statement::new(program, self._db.mv_store.clone(), pager.clone());
loop {
match stmt.step()? {
vdbe::StepResult::Done => {
break;
}
vdbe::StepResult::IO => stmt.run_once()?,
vdbe::StepResult::Row => {}
vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
return Err(LimboError::Busy)
}
}
}
Statement::new(program, self._db.mv_store.clone(), pager.clone())
.run_ignore_rows()?;
}
_ => unreachable!(),
}
@@ -1118,21 +1107,8 @@ impl Connection {
input,
)?;
let mut stmt =
Statement::new(program, self._db.mv_store.clone(), pager.clone());
loop {
match stmt.step()? {
vdbe::StepResult::Done => {
break;
}
vdbe::StepResult::IO => stmt.run_once()?,
vdbe::StepResult::Row => {}
vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
return Err(LimboError::Busy)
}
}
}
Statement::new(program, self._db.mv_store.clone(), pager.clone())
.run_ignore_rows()?;
}
}
}
@@ -1970,6 +1946,19 @@ impl Statement {
res
}
/// Drive the statement to completion, discarding any rows it produces.
///
/// Used for internal statements (e.g. sync-engine bookkeeping SQL) where
/// only side effects matter. Returns `Ok(())` once the VDBE reports `Done`;
/// `Interrupt` and `Busy` are both surfaced to the caller as
/// `LimboError::Busy`.
pub(crate) fn run_ignore_rows(&mut self) -> Result<()> {
    loop {
        let step = self.step()?;
        match step {
            // Finished normally — nothing left to execute.
            vdbe::StepResult::Done => break Ok(()),
            // Pending I/O: pump it once, then keep stepping.
            vdbe::StepResult::IO => self.run_once()?,
            // Rows are intentionally dropped on the floor.
            vdbe::StepResult::Row => {}
            vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
                break Err(LimboError::Busy)
            }
        }
    }
}
#[instrument(skip_all, level = Level::DEBUG)]
fn reprepare(&mut self) -> Result<()> {
tracing::trace!("repreparing statement");

View File

@@ -456,6 +456,7 @@ pub mod tests {
use crate::{
database_sync_engine::DatabaseSyncEngineOpts,
errors::Error,
test_context::{FaultInjectionStrategy, TestContext},
test_protocol_io::TestProtocolIo,
test_sync_server::convert_rows,
@@ -645,6 +646,107 @@ pub mod tests {
});
}
#[test]
// Regression test for concurrent use of several sync engines on one MemoryIO:
// two runners share a single MemoryIO instance but use distinct
// ":memory:-N" paths, exercising the prefix-based in-memory detection that
// bypasses the global database registry while preserving distinct names.
// NOTE(review): code below is flattened by the diff rendering; indentation
// is not meaningful here.
pub fn test_sync_many_dbs_update_sync_concurrent() {
deterministic_runtime(async || {
// One shared in-memory IO for both clients — the condition under test.
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::MemoryIO::new());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
// Seeded context keeps the run deterministic and reproducible.
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
// Seed the server with a table and one row per client.
protocol
.server
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ())
.await
.unwrap();
protocol
.server
.execute(
"INSERT INTO t VALUES ('id-1', 'client1'), ('id-2', 'client2')",
(),
)
.await
.unwrap();
// Client 1: in-memory DB with the ":memory:-1" suffix.
let mut runner1 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
runner1
.init(
":memory:-1",
DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
// Small batch size forces multiple wal_pull round-trips.
wal_pull_batch_size: 2,
},
)
.await
.unwrap();
// Client 2: same IO, different ":memory:-2" path.
let mut runner2 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone());
runner2
.init(
":memory:-2",
DatabaseSyncEngineOpts {
client_name: "id-2".to_string(),
wal_pull_batch_size: 2,
},
)
.await
.unwrap();
let conn1 = runner1.connect().await.unwrap();
let conn2 = runner2.connect().await.unwrap();
// Each client attempts 10 syncs; conflicts are expected under concurrency
// and tolerated, any other error fails the test.
let syncs1 = async move {
for i in 0..10 {
tracing::info!("sync attempt #{i}");
match runner1.sync().await {
Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue,
Err(err) => panic!("unexpected error: {err}"),
}
}
};
let syncs2 = async move {
for i in 0..10 {
tracing::info!("sync attempt #{i}");
match runner2.sync().await {
Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue,
Err(err) => panic!("unexpected error: {err}"),
}
}
};
// Each client issues 100 local inserts; transient "database is locked"
// errors (racing with the sync loop) are tolerated.
let ctx1 = ctx.clone();
let updates1 = async move {
for i in 0..100 {
tracing::info!("update attempt #{i}");
let sql = format!("INSERT INTO t VALUES ('key-1-{i}', 'value')");
match conn1.execute(&sql, ()).await {
Ok(_) => {}
Err(err) if err.to_string().contains("database is locked") => {}
Err(err) => panic!("update failed: {err}"),
}
// Random sleep shuffles interleavings across seeds.
ctx1.random_sleep_n(10).await;
}
};
let ctx2 = ctx.clone();
let updates2 = async move {
for i in 0..100 {
tracing::info!("update attempt #{i}");
let sql = format!("INSERT INTO t VALUES ('key-2-{i}', 'value')");
match conn2.execute(&sql, ()).await {
Ok(_) => {}
Err(err) if err.to_string().contains("database is locked") => {}
Err(err) => panic!("update failed: {err}"),
}
ctx2.random_sleep_n(10).await;
}
};
// Run both update streams and both sync loops concurrently to completion.
join!(updates1, updates2, syncs1, syncs2);
});
}
#[test]
pub fn test_sync_single_db_many_pulls_big_payloads() {
deterministic_runtime(async || {

View File

@@ -179,6 +179,10 @@ pub async fn wal_pull<'a, C: ProtocolIO, U: AsyncFnMut(&'a Coro, u64) -> Result<
}
coro.yield_(ProtocolCommand::IO).await?;
}
if start_frame < end_frame {
// chunk which was sent from the server has ended early - so there is nothing left on server-side for pull
return Ok(WalPullResult::Done);
}
if !buffer.is_empty() {
return Err(Error::DatabaseSyncEngineError(format!(
"wal_pull: response has unexpected trailing data: buffer.len()={}",

View File

@@ -17,6 +17,7 @@ turso_sync_engine = { workspace = true }
turso_core = { workspace = true }
turso_node = { workspace = true }
genawaiter = { version = "0.99.1", default-features = false }
tracing-subscriber = "0.3.19"
[build-dependencies]
napi-build = "2.2.3"

View File

@@ -160,4 +160,5 @@ export interface SyncEngineOpts {
path: string
clientName?: string
walPullBatchSize?: number
enableTracing?: boolean
}

View File

@@ -14,7 +14,7 @@ pub enum JsProtocolRequest {
Http {
method: String,
path: String,
body: Option<Buffer>,
body: Option<Vec<u8>>,
},
FullRead {
path: String,
@@ -134,7 +134,7 @@ impl ProtocolIO for JsProtocolIo {
Ok(self.add_request(JsProtocolRequest::Http {
method: method.to_string(),
path: path.to_string(),
body: body.map(Buffer::from),
body,
}))
}

View File

@@ -3,10 +3,11 @@
pub mod generator;
pub mod js_protocol_io;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, OnceLock};
use napi::bindgen_prelude::AsyncTask;
use napi_derive::napi;
use tracing_subscriber::{filter::LevelFilter, fmt::format::FmtSpan};
use turso_node::IoLoopTask;
use turso_sync_engine::{
database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts},
@@ -39,12 +40,32 @@ pub struct SyncEngineOpts {
pub path: String,
pub client_name: Option<String>,
pub wal_pull_batch_size: Option<u32>,
pub enable_tracing: Option<String>,
}
/// One-shot guard so the global tracing subscriber is installed at most once.
static TRACING_INIT: OnceLock<()> = OnceLock::new();

/// Install a global `tracing` subscriber at the requested `level_filter`.
///
/// Calling this more than once (e.g. constructing several `SyncEngine`s) is
/// harmless: `get_or_init` runs the installer only on the first call, so the
/// level chosen first wins and later calls are no-ops.
fn init_tracing(level_filter: LevelFilter) {
    TRACING_INIT.get_or_init(|| {
        // Plain, thread-annotated output with span enter/exit events;
        // ANSI colors disabled so logs stay readable when captured.
        let builder = tracing_subscriber::fmt()
            .with_max_level(level_filter)
            .with_span_events(FmtSpan::ACTIVE)
            .with_thread_ids(true)
            .with_ansi(false);
        builder.init();
    });
}
#[napi]
impl SyncEngine {
#[napi(constructor)]
pub fn new(opts: SyncEngineOpts) -> napi::Result<Self> {
// helpful for local debugging
match opts.enable_tracing.as_deref() {
Some("info") => init_tracing(LevelFilter::INFO),
Some("debug") => init_tracing(LevelFilter::DEBUG),
Some("trace") => init_tracing(LevelFilter::TRACE),
_ => {}
}
let is_memory = opts.path == ":memory:";
let io: Arc<dyn turso_core::IO> = if is_memory {
Arc::new(turso_core::MemoryIO::new())

View File

@@ -66,7 +66,7 @@ async function process(opts, request) {
const response = await fetch(`${opts.url}${requestType.path}`, {
method: requestType.method,
headers: opts.headers,
body: requestType.body
body: requestType.body != null ? new Uint8Array(requestType.body) : null,
});
completion.status(response.status);
const reader = response.body.getReader();