mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-03 00:14:21 +01:00
Merge 'sync-engine: avoid unnecessary WAL push' from Nikita Sivukhin
This PR ignores changes from `TURSO_SYNC_TABLE_NAME` meta table in order to not generate unnecessary push commands when nothing actually changed on the client side. Closes #2597
This commit is contained in:
@@ -45,7 +45,7 @@ async fn update_meta<IO: ProtocolIO>(
|
||||
) -> Result<()> {
|
||||
let mut meta = orig.as_ref().unwrap().clone();
|
||||
update(&mut meta);
|
||||
tracing::debug!("update_meta: {meta:?}");
|
||||
tracing::info!("update_meta: {meta:?}");
|
||||
let completion = io.full_write(meta_path, meta.dump()?)?;
|
||||
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
|
||||
wait_full_body(coro, &completion).await?;
|
||||
@@ -60,7 +60,7 @@ async fn set_meta<IO: ProtocolIO>(
|
||||
orig: &mut Option<DatabaseMetadata>,
|
||||
meta: DatabaseMetadata,
|
||||
) -> Result<()> {
|
||||
tracing::debug!("set_meta: {meta:?}");
|
||||
tracing::info!("set_meta: {meta:?}");
|
||||
let completion = io.full_write(meta_path, meta.dump()?)?;
|
||||
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
|
||||
wait_full_body(coro, &completion).await?;
|
||||
@@ -103,7 +103,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
/// This method will **not** send local changed to the remote
|
||||
/// This method will block writes for the period of pull
|
||||
pub async fn pull(&mut self, coro: &Coro) -> Result<()> {
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"pull: draft={}, synced={}",
|
||||
self.draft_path,
|
||||
self.synced_path
|
||||
@@ -120,7 +120,6 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
// we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote
|
||||
// so, we pass 'capture: true' as we need to preserve all changes for future push of WAL
|
||||
let synced = self.io.open_tape(&self.synced_path, true)?;
|
||||
tracing::info!("opened synced");
|
||||
|
||||
// we will start wal write session for Draft DB in order to hold write lock during transfer of changes
|
||||
let mut draft_session = WalSession::new(connect(coro, &self.draft_tape).await?);
|
||||
@@ -168,7 +167,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
let WalPullResult::NeedCheckpoint = pull_result else {
|
||||
break;
|
||||
};
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"ready to checkpoint synced db file at {:?}, generation={}",
|
||||
self.synced_path,
|
||||
self.meta().synced_generation
|
||||
@@ -198,7 +197,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
/// This method will **not** pull remote changes to the local DB
|
||||
/// This method will **not** block writes for the period of sync
|
||||
pub async fn push(&mut self, coro: &Coro) -> Result<()> {
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"push: draft={}, synced={}",
|
||||
self.draft_path,
|
||||
self.synced_path
|
||||
@@ -221,6 +220,8 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, false).await?;
|
||||
|
||||
self.push_synced_to_remote(coro).await?;
|
||||
|
||||
self.reset_synced_if_dirty(coro).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -235,7 +236,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
}
|
||||
|
||||
async fn init(&mut self, coro: &Coro) -> Result<()> {
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"initialize sync engine: draft={}, synced={}, opts={:?}",
|
||||
self.draft_path,
|
||||
self.synced_path,
|
||||
@@ -256,7 +257,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
}
|
||||
None => {
|
||||
let meta = self.bootstrap_db_files(coro).await?;
|
||||
tracing::debug!("write meta after successful bootstrap: meta={meta:?}");
|
||||
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
|
||||
set_meta(
|
||||
coro,
|
||||
self.protocol.as_ref(),
|
||||
@@ -283,7 +284,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
}
|
||||
|
||||
async fn pull_synced_from_remote(&mut self, coro: &Coro) -> Result<WalPullResult> {
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"pull_synced_from_remote: draft={:?}, synced={:?}",
|
||||
self.draft_path,
|
||||
self.synced_path,
|
||||
@@ -328,7 +329,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
}
|
||||
|
||||
async fn push_synced_to_remote(&mut self, coro: &Coro) -> Result<()> {
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"push_synced_to_remote: draft={}, synced={}, id={}",
|
||||
self.draft_path,
|
||||
self.synced_path,
|
||||
@@ -379,7 +380,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
self.meta.is_none(),
|
||||
"bootstrap_db_files must be called only when meta is not set"
|
||||
);
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"bootstrap_db_files: draft={}, synced={}",
|
||||
self.draft_path,
|
||||
self.synced_path,
|
||||
@@ -408,7 +409,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
let db_info = db_bootstrap(coro, self.protocol.as_ref(), files).await?;
|
||||
|
||||
let elapsed = std::time::Instant::now().duration_since(start_time);
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"bootstrap_db_files: finished draft={:?}, synced={:?}: elapsed={:?}",
|
||||
self.draft_path,
|
||||
self.synced_path,
|
||||
@@ -426,7 +427,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
|
||||
|
||||
/// Reset WAL of Synced database which potentially can have some local changes
|
||||
async fn reset_synced_if_dirty(&mut self, coro: &Coro) -> Result<()> {
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"reset_synced: synced_path={:?}, synced_is_dirty={}",
|
||||
self.synced_path,
|
||||
self.synced_is_dirty
|
||||
@@ -457,7 +458,7 @@ pub mod tests {
|
||||
use crate::{
|
||||
database_sync_engine::DatabaseSyncEngineOpts,
|
||||
errors::Error,
|
||||
test_context::{FaultInjectionStrategy, TestContext},
|
||||
test_context::{FaultInjectionPlan, FaultInjectionStrategy, TestContext},
|
||||
test_protocol_io::TestProtocolIo,
|
||||
test_sync_server::convert_rows,
|
||||
tests::{deterministic_runtime, seed_u64, TestRunner},
|
||||
@@ -592,6 +593,105 @@ pub mod tests {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_sync_single_db_no_changes_no_push() {
|
||||
deterministic_runtime(async || {
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
|
||||
let dir = tempfile::TempDir::new().unwrap();
|
||||
let server_path = dir.path().join("server.db");
|
||||
let ctx = Arc::new(TestContext::new(seed_u64()));
|
||||
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
|
||||
let local_path = dir.path().join("local.db").to_str().unwrap().to_string();
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: "id-1".to_string(),
|
||||
wal_pull_batch_size: 1,
|
||||
};
|
||||
runner.init(&local_path, opts).await.unwrap();
|
||||
|
||||
protocol
|
||||
.server
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ())
|
||||
.await
|
||||
.unwrap();
|
||||
protocol
|
||||
.server
|
||||
.execute("INSERT INTO t VALUES (1)", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let conn = runner.connect().await.unwrap();
|
||||
|
||||
runner.sync().await.unwrap();
|
||||
assert_eq!(
|
||||
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
|
||||
vec![vec![turso::Value::Integer(1)]]
|
||||
);
|
||||
|
||||
conn.execute("INSERT INTO t VALUES (100)", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
protocol
|
||||
.server
|
||||
.execute("INSERT INTO t VALUES (2)", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
runner.sync().await.unwrap();
|
||||
assert_eq!(
|
||||
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
|
||||
vec![
|
||||
vec![turso::Value::Integer(1)],
|
||||
vec![turso::Value::Integer(2)],
|
||||
vec![turso::Value::Integer(100)],
|
||||
]
|
||||
);
|
||||
|
||||
protocol
|
||||
.server
|
||||
.execute("INSERT INTO t VALUES (3)", ())
|
||||
.await
|
||||
.unwrap();
|
||||
runner.sync().await.unwrap();
|
||||
assert_eq!(
|
||||
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
|
||||
vec![
|
||||
vec![turso::Value::Integer(1)],
|
||||
vec![turso::Value::Integer(2)],
|
||||
vec![turso::Value::Integer(3)],
|
||||
vec![turso::Value::Integer(100)],
|
||||
]
|
||||
);
|
||||
|
||||
ctx.switch_mode(FaultInjectionStrategy::Enabled {
|
||||
plan: FaultInjectionPlan {
|
||||
is_fault: Box::new(|name, _| Box::pin(async move { name == "wal_push_start" })),
|
||||
},
|
||||
})
|
||||
.await;
|
||||
|
||||
protocol
|
||||
.server
|
||||
.execute("INSERT INTO t VALUES (4)", ())
|
||||
.await
|
||||
.unwrap();
|
||||
runner.sync().await.unwrap();
|
||||
assert_eq!(
|
||||
query_rows(&conn, "SELECT * FROM t").await.unwrap(),
|
||||
vec![
|
||||
vec![turso::Value::Integer(1)],
|
||||
vec![turso::Value::Integer(2)],
|
||||
vec![turso::Value::Integer(3)],
|
||||
vec![turso::Value::Integer(4)],
|
||||
vec![turso::Value::Integer(100)],
|
||||
]
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_sync_single_db_update_sync_concurrent() {
|
||||
deterministic_runtime(async || {
|
||||
@@ -772,7 +872,7 @@ pub mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
runner.pull().await.unwrap();
|
||||
runner.sync().await.unwrap();
|
||||
|
||||
// create connection in outer scope in order to prevent Database from being dropped in between of pull operations
|
||||
let conn = runner.connect().await.unwrap();
|
||||
@@ -791,7 +891,7 @@ pub mod tests {
|
||||
}
|
||||
|
||||
tracing::info!("pull attempt={}", attempt);
|
||||
runner.pull().await.unwrap();
|
||||
runner.sync().await.unwrap();
|
||||
|
||||
let expected = expected
|
||||
.iter()
|
||||
|
||||
@@ -122,7 +122,7 @@ pub async fn wal_pull<'a, C: ProtocolIO, U: AsyncFnMut(&'a Coro, u64) -> Result<
|
||||
end_frame: u64,
|
||||
mut update: U,
|
||||
) -> Result<WalPullResult> {
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"wal_pull: generation={}, start_frame={}, end_frame={}",
|
||||
generation,
|
||||
start_frame,
|
||||
@@ -209,7 +209,7 @@ pub async fn wal_push<C: ProtocolIO>(
|
||||
end_frame: u64,
|
||||
) -> Result<WalPushResult> {
|
||||
assert!(wal_session.in_txn());
|
||||
tracing::debug!("wal_push: baton={baton:?}, generation={generation}, start_frame={start_frame}, end_frame={end_frame}");
|
||||
tracing::info!("wal_push: baton={baton:?}, generation={generation}, start_frame={start_frame}, end_frame={end_frame}");
|
||||
|
||||
if start_frame == end_frame {
|
||||
return Ok(WalPushResult::Ok { baton: None });
|
||||
@@ -254,7 +254,8 @@ pub async fn wal_push<C: ProtocolIO>(
|
||||
}
|
||||
}
|
||||
|
||||
const TURSO_SYNC_META_TABLE: &str =
|
||||
const TURSO_SYNC_TABLE_NAME: &str = "turso_sync_last_change_id";
|
||||
const TURSO_SYNC_CREATE_TABLE: &str =
|
||||
"CREATE TABLE IF NOT EXISTS turso_sync_last_change_id (client_id TEXT PRIMARY KEY, pull_gen INTEGER, change_id INTEGER)";
|
||||
const TURSO_SYNC_SELECT_LAST_CHANGE_ID: &str =
|
||||
"SELECT pull_gen, change_id FROM turso_sync_last_change_id WHERE client_id = ?";
|
||||
@@ -272,7 +273,7 @@ pub async fn transfer_logical_changes(
|
||||
client_id: &str,
|
||||
bump_pull_gen: bool,
|
||||
) -> Result<()> {
|
||||
tracing::debug!("transfer_logical_changes: client_id={client_id}");
|
||||
tracing::info!("transfer_logical_changes: client_id={client_id}");
|
||||
let source_conn = connect_untracked(source)?;
|
||||
let target_conn = connect_untracked(target)?;
|
||||
|
||||
@@ -293,17 +294,17 @@ pub async fn transfer_logical_changes(
|
||||
Error::DatabaseSyncEngineError("unexpected source pull_gen type".to_string())
|
||||
})?,
|
||||
None => {
|
||||
tracing::debug!("transfer_logical_changes: client_id={client_id}, turso_sync_last_change_id table is not found");
|
||||
tracing::info!("transfer_logical_changes: client_id={client_id}, turso_sync_last_change_id table is not found");
|
||||
0
|
||||
}
|
||||
}
|
||||
};
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"transfer_logical_changes: client_id={client_id}, source_pull_gen={source_pull_gen}"
|
||||
);
|
||||
|
||||
// fetch last_change_id from the target DB in order to guarantee atomic replay of changes and avoid conflicts in case of failure
|
||||
let mut schema_stmt = target_conn.prepare(TURSO_SYNC_META_TABLE)?;
|
||||
let mut schema_stmt = target_conn.prepare(TURSO_SYNC_CREATE_TABLE)?;
|
||||
exec_stmt(coro, &mut schema_stmt).await?;
|
||||
|
||||
let mut select_last_change_id_stmt = target_conn.prepare(TURSO_SYNC_SELECT_LAST_CHANGE_ID)?;
|
||||
@@ -361,17 +362,19 @@ pub async fn transfer_logical_changes(
|
||||
};
|
||||
let mut rows_changed = 0;
|
||||
let mut changes = source.iterate_changes(iterate_opts)?;
|
||||
let mut updated = false;
|
||||
while let Some(operation) = changes.next(coro).await? {
|
||||
match &operation {
|
||||
DatabaseTapeOperation::RowChange(change) => {
|
||||
rows_changed += 1;
|
||||
assert!(
|
||||
last_change_id.is_none() || last_change_id.unwrap() < change.change_id,
|
||||
"change id must be strictly increasing: last_change_id={:?}, change.change_id={}",
|
||||
last_change_id,
|
||||
change.change_id
|
||||
);
|
||||
if change.table_name == TURSO_SYNC_TABLE_NAME {
|
||||
continue;
|
||||
}
|
||||
rows_changed += 1;
|
||||
// we give user full control over CDC table - so let's not emit assert here for now
|
||||
if last_change_id.is_some() && last_change_id.unwrap() + 1 != change.change_id {
|
||||
tracing::warn!(
|
||||
@@ -381,10 +384,9 @@ pub async fn transfer_logical_changes(
|
||||
);
|
||||
}
|
||||
last_change_id = Some(change.change_id);
|
||||
updated = true;
|
||||
}
|
||||
DatabaseTapeOperation::Commit if updated || bump_pull_gen => {
|
||||
tracing::debug!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id);
|
||||
DatabaseTapeOperation::Commit if rows_changed > 0 || bump_pull_gen => {
|
||||
tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id);
|
||||
// update turso_sync_last_change_id table with new value before commit
|
||||
let mut set_last_change_id_stmt =
|
||||
session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?;
|
||||
@@ -393,7 +395,7 @@ pub async fn transfer_logical_changes(
|
||||
} else {
|
||||
(source_pull_gen, last_change_id.unwrap_or(0))
|
||||
};
|
||||
tracing::debug!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}");
|
||||
tracing::info!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}");
|
||||
set_last_change_id_stmt
|
||||
.bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen));
|
||||
set_last_change_id_stmt
|
||||
@@ -401,7 +403,6 @@ pub async fn transfer_logical_changes(
|
||||
set_last_change_id_stmt
|
||||
.bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id)));
|
||||
exec_stmt(coro, &mut set_last_change_id_stmt).await?;
|
||||
|
||||
let session_schema_cookie = session.conn().read_schema_version()?;
|
||||
if session_schema_cookie <= source_schema_cookie {
|
||||
session
|
||||
@@ -414,7 +415,7 @@ pub async fn transfer_logical_changes(
|
||||
session.replay(coro, operation).await?;
|
||||
}
|
||||
|
||||
tracing::debug!("transfer_logical_changes: rows_changed={:?}", rows_changed);
|
||||
tracing::info!("transfer_logical_changes: rows_changed={:?}", rows_changed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -428,7 +429,7 @@ pub async fn transfer_physical_changes(
|
||||
source_sync_watermark: u64,
|
||||
target_wal_match_watermark: u64,
|
||||
) -> Result<u64> {
|
||||
tracing::debug!("transfer_physical_changes: source_wal_match_watermark={source_wal_match_watermark}, source_sync_watermark={source_sync_watermark}, target_wal_match_watermark={target_wal_match_watermark}");
|
||||
tracing::info!("transfer_physical_changes: source_wal_match_watermark={source_wal_match_watermark}, source_sync_watermark={source_sync_watermark}, target_wal_match_watermark={target_wal_match_watermark}");
|
||||
|
||||
let source_conn = connect(coro, source).await?;
|
||||
let mut source_session = WalSession::new(source_conn.clone());
|
||||
@@ -454,7 +455,7 @@ pub async fn transfer_physical_changes(
|
||||
let mut last_frame_info = None;
|
||||
let mut frame = vec![0u8; WAL_FRAME_SIZE];
|
||||
let mut target_sync_watermark = target_session.frames_count()?;
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"transfer_physical_changes: start={}, end={}",
|
||||
source_wal_match_watermark + 1,
|
||||
source_frames_count
|
||||
@@ -465,7 +466,7 @@ pub async fn transfer_physical_changes(
|
||||
target_session.append_page(frame_info.page_no, &frame[WAL_FRAME_HEADER..])?;
|
||||
if source_frame_no == source_sync_watermark {
|
||||
target_sync_watermark = target_session.frames_count()? + 1; // +1 because page will be actually commited on next iteration
|
||||
tracing::debug!("set target_sync_watermark to {}", target_sync_watermark);
|
||||
tracing::info!("set target_sync_watermark to {}", target_sync_watermark);
|
||||
}
|
||||
last_frame_info = Some(frame_info);
|
||||
}
|
||||
@@ -653,12 +654,16 @@ pub mod tests {
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new(|coro| async move {
|
||||
let conn1 = db1.connect(&coro).await?;
|
||||
conn1.execute("CREATE TABLE t(x, y)")?;
|
||||
conn1.execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)")?;
|
||||
conn1.execute("CREATE TABLE t(x, y)").unwrap();
|
||||
conn1
|
||||
.execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)")
|
||||
.unwrap();
|
||||
|
||||
let conn2 = db2.connect(&coro).await?;
|
||||
let conn2 = db2.connect(&coro).await.unwrap();
|
||||
|
||||
transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?;
|
||||
transfer_logical_changes(&coro, &db1, &db2, "id-1", false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
|
||||
@@ -674,8 +679,10 @@ pub mod tests {
|
||||
]
|
||||
);
|
||||
|
||||
conn1.execute("INSERT INTO t VALUES (7, 8)")?;
|
||||
transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?;
|
||||
conn1.execute("INSERT INTO t VALUES (7, 8)").unwrap();
|
||||
transfer_logical_changes(&coro, &db1, &db2, "id-1", false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
|
||||
|
||||
@@ -167,12 +167,14 @@ impl DatabaseTape {
|
||||
opts: DatabaseReplaySessionOpts,
|
||||
) -> Result<DatabaseReplaySession> {
|
||||
tracing::debug!("opening replay session");
|
||||
let conn = self.connect(coro).await?;
|
||||
conn.execute("BEGIN IMMEDIATE")?;
|
||||
Ok(DatabaseReplaySession {
|
||||
conn: self.connect(coro).await?,
|
||||
conn,
|
||||
cached_delete_stmt: HashMap::new(),
|
||||
cached_insert_stmt: HashMap::new(),
|
||||
cached_update_stmt: HashMap::new(),
|
||||
in_txn: false,
|
||||
in_txn: true,
|
||||
opts,
|
||||
})
|
||||
}
|
||||
@@ -407,7 +409,7 @@ impl DatabaseChangesIterator {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseReplaySessionOpts {
|
||||
pub use_implicit_rowid: bool,
|
||||
}
|
||||
|
||||
@@ -313,6 +313,7 @@ impl TestSyncServer {
|
||||
pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<()> {
|
||||
let conn = self.db.connect()?;
|
||||
conn.execute(sql, params).await?;
|
||||
tracing::debug!("sync_frames_from_conn after execute");
|
||||
self.sync_frames_from_conn(&conn).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user