Merge branch 'main' into cdc_fail_autoincrement

This commit is contained in:
Pavan Nambi
2025-09-22 21:11:26 +05:30
committed by GitHub
121 changed files with 10978 additions and 2216 deletions

View File

@@ -264,13 +264,12 @@ impl DatabaseReplayGenerator {
let update = self.update_query(coro, table_name, &columns).await?;
Ok(update)
} else {
let columns = [true].repeat(after.len());
let update = self.update_query(coro, table_name, &columns).await?;
Ok(update)
let upsert = self.upsert_query(coro, table_name, after.len()).await?;
Ok(upsert)
}
}
DatabaseTapeRowChangeType::Insert { after } => {
let insert = self.insert_query(coro, table_name, after.len()).await?;
let insert = self.upsert_query(coro, table_name, after.len()).await?;
Ok(insert)
}
}
@@ -320,7 +319,7 @@ impl DatabaseReplayGenerator {
is_ddl_replay: false,
})
}
pub(crate) async fn insert_query<Ctx>(
pub(crate) async fn upsert_query<Ctx>(
&self,
coro: &Coro<Ctx>,
table_name: &str,

View File

@@ -1,5 +1,4 @@
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
};
@@ -37,6 +36,7 @@ pub struct DatabaseSyncEngineOpts {
pub tables_ignore: Vec<String>,
pub use_transform: bool,
pub wal_pull_batch_size: u64,
pub long_poll_timeout: Option<std::time::Duration>,
pub protocol_version_hint: DatabaseSyncEngineProtocolVersion,
}
@@ -51,7 +51,7 @@ pub struct DatabaseSyncEngine<P: ProtocolIO> {
meta_path: String,
changes_file: Arc<Mutex<Option<Arc<dyn turso_core::File>>>>,
opts: DatabaseSyncEngineOpts,
meta: RefCell<DatabaseMetadata>,
meta: Mutex<DatabaseMetadata>,
client_unique_id: String,
}
@@ -147,7 +147,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
tracing::info!("initialize database tape connection: path={}", main_db_path);
let main_tape = DatabaseTape::new_with_opts(main_db, tape_opts);
let changes_file = io.open_file(&changes_path, OpenFlags::Create, false)?;
let mut db = Self {
let db = Self {
io,
protocol,
db_file,
@@ -158,7 +158,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
meta_path: format!("{main_db_path}-info"),
changes_file: Arc::new(Mutex::new(Some(changes_file))),
opts,
meta: RefCell::new(meta.clone()),
meta: Mutex::new(meta.clone()),
client_unique_id: meta.client_unique_id.clone(),
};
@@ -176,7 +176,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
Ok(db)
}
fn open_revert_db_conn(&mut self) -> Result<Arc<turso_core::Connection>> {
fn open_revert_db_conn(&self) -> Result<Arc<turso_core::Connection>> {
let db = turso_core::Database::open_with_flags_bypass_registry(
self.io.clone(),
&self.main_db_path,
@@ -191,10 +191,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
Ok(conn)
}
async fn checkpoint_passive<Ctx>(
&mut self,
coro: &Coro<Ctx>,
) -> Result<(Option<Vec<u32>>, u64)> {
async fn checkpoint_passive<Ctx>(&self, coro: &Coro<Ctx>) -> Result<(Option<Vec<u32>>, u64)> {
let watermark = self.meta().revert_since_wal_watermark;
tracing::info!(
"checkpoint(path={:?}): revert_since_wal_watermark={}",
@@ -273,9 +270,13 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
})
}
pub async fn checkpoint<Ctx>(&mut self, coro: &Coro<Ctx>) -> Result<()> {
pub async fn checkpoint<Ctx>(&self, coro: &Coro<Ctx>) -> Result<()> {
let (main_wal_salt, watermark) = self.checkpoint_passive(coro).await?;
tracing::info!(
"checkpoint(path={:?}): passive checkpoint is done",
self.main_db_path
);
let main_conn = connect_untracked(&self.main_tape)?;
let revert_conn = self.open_revert_db_conn()?;
@@ -386,6 +387,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
&file.value,
&revision,
self.opts.wal_pull_batch_size,
self.opts.long_poll_timeout,
)
.await?;
@@ -419,10 +421,17 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
/// This method will **not** send local changes to the remote
/// This method will block writes for the period of pull
pub async fn apply_changes_from_remote<Ctx>(
&mut self,
&self,
coro: &Coro<Ctx>,
remote_changes: DbChangesStatus,
) -> Result<()> {
if remote_changes.file_slot.is_none() {
self.update_meta(coro, |m| {
m.last_pull_unix_time = remote_changes.time.secs;
})
.await?;
return Ok(());
}
assert!(remote_changes.file_slot.is_some(), "file_slot must be set");
let changes_file = remote_changes.file_slot.as_ref().unwrap().value.clone();
let pull_result = self.apply_changes_internal(coro, &changes_file).await;
@@ -447,7 +456,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
Ok(())
}
async fn apply_changes_internal<Ctx>(
&mut self,
&self,
coro: &Coro<Ctx>,
changes_file: &Arc<dyn turso_core::File>,
) -> Result<u64> {
@@ -652,7 +661,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
/// Sync local changes to remote DB and bring new changes from remote to local
/// This method will block writes for the period of sync
pub async fn sync<Ctx>(&mut self, coro: &Coro<Ctx>) -> Result<()> {
pub async fn sync<Ctx>(&self, coro: &Coro<Ctx>) -> Result<()> {
// todo(sivukhin): this is a bit suboptimal as both 'push' and 'pull' will call pull_synced_from_remote
// but for now - keep it simple
self.push_changes_to_remote(coro).await?;
@@ -660,21 +669,14 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
Ok(())
}
pub async fn pull_changes_from_remote<Ctx>(&mut self, coro: &Coro<Ctx>) -> Result<()> {
pub async fn pull_changes_from_remote<Ctx>(&self, coro: &Coro<Ctx>) -> Result<()> {
let changes = self.wait_changes_from_remote(coro).await?;
if changes.file_slot.is_some() {
self.apply_changes_from_remote(coro, changes).await?;
} else {
self.update_meta(coro, |m| {
m.last_pull_unix_time = changes.time.secs;
})
.await?;
}
self.apply_changes_from_remote(coro, changes).await?;
Ok(())
}
fn meta(&self) -> std::cell::Ref<'_, DatabaseMetadata> {
self.meta.borrow()
fn meta(&self) -> std::sync::MutexGuard<'_, DatabaseMetadata> {
self.meta.lock().unwrap()
}
async fn update_meta<Ctx>(
@@ -688,7 +690,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
let completion = self.protocol.full_write(&self.meta_path, meta.dump()?)?;
// todo: what happens if we actually update the metadata on disk but then fail, leaving the in-memory state not updated?
wait_all_results(coro, &completion).await?;
self.meta.replace(meta);
*self.meta.lock().unwrap() = meta;
Ok(())
}
}

View File

@@ -166,6 +166,7 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
frames_file: &Arc<dyn turso_core::File>,
revision: &DatabasePullRevision,
wal_pull_batch_size: u64,
long_poll_timeout: Option<std::time::Duration>,
) -> Result<DatabasePullRevision> {
// truncate file before pulling new data
let c = Completion::new_trunc(move |result| {
@@ -195,7 +196,7 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
.await
}
DatabasePullRevision::V1 { revision } => {
wal_pull_to_file_v1(coro, client, frames_file, revision).await
wal_pull_to_file_v1(coro, client, frames_file, revision, long_poll_timeout).await
}
}
}
@@ -206,6 +207,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
client: &C,
frames_file: &Arc<dyn turso_core::File>,
revision: &str,
long_poll_timeout: Option<std::time::Duration>,
) -> Result<DatabasePullRevision> {
tracing::info!("wal_pull: revision={revision}");
let mut bytes = BytesMut::new();
@@ -214,7 +216,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
encoding: PageUpdatesEncodingReq::Raw as i32,
server_revision: String::new(),
client_revision: revision.to_string(),
long_poll_timeout_ms: 0,
long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_millis() as u32).unwrap_or(0),
server_pages: BytesMut::new().into(),
client_pages: BytesMut::new().into(),
};
@@ -805,12 +807,11 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
}),
DatabaseTapeOperation::RowChange(change) => {
let replay_info = generator.replay_info(coro, &change).await?;
let change_type = (&change.change).into();
match change.change {
DatabaseTapeRowChangeType::Delete { before } => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
before,
None,
@@ -827,7 +828,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
DatabaseTapeRowChangeType::Insert { after } => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
after,
None,
@@ -848,7 +849,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
} => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
after,
Some(updates),
@@ -869,7 +870,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
} => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
after,
None,
@@ -1359,7 +1360,7 @@ pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
Error::DatabaseSyncEngineError(format!("unable to deserialize protobuf message: {e}"))
})?;
let _ = bytes.split_to(message_length + prefix_length);
tracing::debug!(
tracing::trace!(
"wait_proto_message: elapsed={:?}",
std::time::Instant::now().duration_since(start_time)
);

View File

@@ -10,7 +10,7 @@ use crate::{
database_sync_operations::WAL_FRAME_HEADER,
errors::Error,
types::{
Coro, DatabaseChange, DatabaseTapeOperation, DatabaseTapeRowChange,
Coro, DatabaseChange, DatabaseChangeType, DatabaseTapeOperation, DatabaseTapeRowChange,
DatabaseTapeRowChangeType, ProtocolCommand,
},
wal_session::WalSession,
@@ -584,7 +584,7 @@ impl DatabaseReplaySession {
cached.stmt.reset();
let values = self.generator.replay_values(
&cached.info,
change_type,
DatabaseChangeType::Delete,
change.id,
before,
None,
@@ -600,7 +600,7 @@ impl DatabaseReplaySession {
cached.stmt.reset();
let values = self.generator.replay_values(
&cached.info,
change_type,
DatabaseChangeType::Insert,
change.id,
after,
None,
@@ -643,7 +643,7 @@ impl DatabaseReplaySession {
table,
columns
);
let info = self.generator.insert_query(coro, table, columns).await?;
let info = self.generator.upsert_query(coro, table, columns).await?;
let stmt = self.conn.prepare(&info.query)?;
self.cached_insert_stmt
.insert(key.clone(), CachedStmt { stmt, info });