diff --git a/Cargo.lock b/Cargo.lock index 019b8dfbd..014ed79e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4360,7 +4360,7 @@ dependencies = [ [[package]] name = "turso_sync_engine" -version = "0.1.4-pre.1" +version = "0.1.4-pre.2" dependencies = [ "bytes", "ctor", diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index 077c8e7ca..fdb879ab9 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -43,7 +43,7 @@ async fn update_meta( let mut meta = orig.as_ref().unwrap().clone(); update(&mut meta); tracing::debug!("update_meta: {meta:?}"); - let completion = io.full_write(&meta_path, meta.dump()?)?; + let completion = io.full_write(meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated wait_full_body(coro, &completion).await?; *orig = Some(meta); @@ -58,7 +58,7 @@ async fn set_meta( meta: DatabaseMetadata, ) -> Result<()> { tracing::debug!("set_meta: {meta:?}"); - let completion = io.full_write(&meta_path, meta.dump()?)?; + let completion = io.full_write(meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated wait_full_body(coro, &completion).await?; *orig = Some(meta); diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 1e81f0dc7..eff3d8826 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -47,15 +47,16 @@ pub async fn db_bootstrap( let chunk = chunk.data(); let content_len = chunk.len(); // todo(sivukhin): optimize allocations here + #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(Buffer::allocate(chunk.len(), Rc::new(|_| {}))); - buffer.as_mut_slice().copy_from_slice(&chunk); + buffer.as_mut_slice().copy_from_slice(chunk); let mut completions = Vec::with_capacity(dbs.len()); - for i in 0..dbs.len() { + for db in dbs { let c = Completion::new_write(move |size| { // todo(sivukhin): we need to error out in case of partial read assert!(size as usize == content_len); }); - completions.push(dbs[i].pwrite(pos, buffer.clone(), c)?); + completions.push(db.pwrite(pos, buffer.clone(), c)?); } while !completions.iter().all(|x| x.is_completed()) { coro.yield_(ProtocolCommand::IO).await?; @@ -184,7 +185,7 @@ pub async fn wal_push( let mut frames_data = Vec::with_capacity((end_frame - start_frame) as usize * WAL_FRAME_SIZE); let mut buffer = [0u8; WAL_FRAME_SIZE]; for frame_no in start_frame..end_frame { - let frame_info = wal_session.read_at(frame_no as u64, &mut buffer)?; + let frame_info = wal_session.read_at(frame_no, &mut buffer)?; tracing::trace!( "wal_push: collect frame {} ({:?}) for push", frame_no, @@ -216,13 +217,13 @@ pub async fn wal_push( Ok(status.baton) } -const TURSO_SYNC_META_TABLE: &'static str = +const TURSO_SYNC_META_TABLE: &str = "CREATE TABLE IF NOT EXISTS turso_sync_last_change_id (client_id TEXT PRIMARY KEY, pull_gen INTEGER, change_id INTEGER)"; -const TURSO_SYNC_SELECT_LAST_CHANGE_ID: &'static str = +const TURSO_SYNC_SELECT_LAST_CHANGE_ID: &str = "SELECT pull_gen, change_id FROM turso_sync_last_change_id WHERE client_id = ?"; -const TURSO_SYNC_INSERT_LAST_CHANGE_ID: &'static str = +const TURSO_SYNC_INSERT_LAST_CHANGE_ID: &str = "INSERT INTO turso_sync_last_change_id(client_id, pull_gen, change_id) VALUES (?, 0, 0)"; -const TURSO_SYNC_UPDATE_LAST_CHANGE_ID: &'static str = +const TURSO_SYNC_UPDATE_LAST_CHANGE_ID: &str = "UPDATE turso_sync_last_change_id SET pull_gen = ?, change_id = ? WHERE client_id = ?"; /// Transfers row changes from source DB to target DB @@ -252,7 +253,7 @@ pub async fn transfer_logical_changes( match run_stmt(coro, &mut select_last_change_id_stmt).await? { Some(row) => row.get_value(0).as_int().ok_or_else(|| { - Error::DatabaseSyncEngineError(format!("unexpected source pull_gen type")) + Error::DatabaseSyncEngineError("unexpected source pull_gen type".to_string()) })?, None => { tracing::debug!("transfer_logical_changes: client_id={client_id}, turso_sync_last_change_id table is not found"); @@ -274,10 +275,10 @@ pub async fn transfer_logical_changes( let mut last_change_id = match run_stmt(coro, &mut select_last_change_id_stmt).await? { Some(row) => { let target_pull_gen = row.get_value(0).as_int().ok_or_else(|| { - Error::DatabaseSyncEngineError(format!("unexpected target pull_gen type")) + Error::DatabaseSyncEngineError("unexpected target pull_gen type".to_string()) })?; let target_change_id = row.get_value(1).as_int().ok_or_else(|| { - Error::DatabaseSyncEngineError(format!("unexpected target change_id type")) + Error::DatabaseSyncEngineError("unexpected target change_id type".to_string()) })?; tracing::debug!( "transfer_logical_changes: client_id={client_id}, target_pull_gen={target_pull_gen}, target_change_id={target_change_id}" @@ -396,7 +397,7 @@ pub async fn transfer_physical_changes( let target_sync_watermark = { let mut target_session = DatabaseWalSession::new(coro, target_session).await?; - let _ = target_session.rollback_changes_after(target_wal_match_watermark)?; + target_session.rollback_changes_after(target_wal_match_watermark)?; let mut last_frame_info = None; let mut frame = vec![0u8; WAL_FRAME_SIZE]; let mut target_sync_watermark = target_session.frames_count()?; @@ -505,7 +506,7 @@ async fn wal_push_http( } async fn db_info_http(coro: &Coro, client: &C) -> Result { - let completion = client.http(http::Method::GET, format!("/info"), None)?; + let completion = client.http(http::Method::GET, "/info".to_string(), None)?; let status = wait_status(coro, &completion).await?; let status_body = wait_full_body(coro, &completion).await?; if status != http::StatusCode::OK { diff --git a/packages/turso-sync-engine/src/database_tape.rs b/packages/turso-sync-engine/src/database_tape.rs index 9c83ea6a8..a2bbddaca 100644 --- a/packages/turso-sync-engine/src/database_tape.rs +++ b/packages/turso-sync-engine/src/database_tape.rs @@ -60,10 +60,7 @@ pub(crate) async fn run_stmt<'a>( } } -pub(crate) async fn exec_stmt<'a>( - coro: &'_ Coro, - stmt: &'a mut turso_core::Statement, -) -> Result<()> { +pub(crate) async fn exec_stmt(coro: &Coro, stmt: &mut turso_core::Statement) -> Result<()> { loop { match stmt.step()? { StepResult::IO => { @@ -112,7 +109,7 @@ impl DatabaseTape { let connection = self.inner.connect()?; tracing::debug!("set '{CDC_PRAGMA_NAME}' for new connection"); let mut stmt = connection.prepare(&self.pragma_query)?; - run_stmt(&coro, &mut stmt).await?; + run_stmt(coro, &mut stmt).await?; Ok(connection) } /// Builds an iterator which emits [DatabaseTapeOperation] by extracting data from CDC table @@ -138,7 +135,7 @@ impl DatabaseTape { let conn = self.connect(coro).await?; let mut wal_session = WalSession::new(conn); wal_session.begin()?; - Ok(DatabaseWalSession::new(coro, wal_session).await?) + DatabaseWalSession::new(coro, wal_session).await } /// Start replay session which can apply [DatabaseTapeOperation] from [Self::iterate_changes] @@ -149,7 +146,7 @@ impl DatabaseTape { ) -> Result { tracing::debug!("opening replay session"); Ok(DatabaseReplaySession { - conn: self.connect(&coro).await?, + conn: self.connect(coro).await?, cached_delete_stmt: HashMap::new(), cached_insert_stmt: HashMap::new(), in_txn: false, @@ -554,11 +551,11 @@ impl DatabaseReplaySession { let placeholders = ["?"].repeat(columns).join(","); format!("INSERT INTO {table_name} VALUES ({placeholders})") } else { - let mut table_info_stmt = self.conn.prepare(&format!( + let mut table_info_stmt = self.conn.prepare(format!( "SELECT name FROM pragma_table_info('{table_name}')" ))?; let mut column_names = Vec::with_capacity(columns + 1); - while let Some(column) = run_stmt(&coro, &mut table_info_stmt).await? { + while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? { let turso_core::Value::Text(text) = column.get_value(0) else { return Err(Error::DatabaseTapeError( "unexpected column type for pragma_table_info query".to_string(), @@ -579,12 +576,12 @@ impl DatabaseReplaySession { let (query, pk_column_indices) = if self.opts.use_implicit_rowid { (format!("DELETE FROM {table_name} WHERE rowid = ?"), None) } else { - let mut pk_info_stmt = self.conn.prepare(&format!( + let mut pk_info_stmt = self.conn.prepare(format!( "SELECT cid, name FROM pragma_table_info('{table_name}') WHERE pk = 1" ))?; let mut pk_predicates = Vec::with_capacity(1); let mut pk_column_indices = Vec::with_capacity(1); - while let Some(column) = run_stmt(&coro, &mut pk_info_stmt).await? { + while let Some(column) = run_stmt(coro, &mut pk_info_stmt).await? { let turso_core::Value::Integer(column_id) = column.get_value(0) else { return Err(Error::DatabaseTapeError( "unexpected column type for pragma_table_info query".to_string(), @@ -624,7 +621,7 @@ fn parse_bin_record(bin_record: Vec) -> Result> { let mut values = Vec::with_capacity(columns); for i in 0..columns { let value = cursor.get_value(&record, i)?; - values.push(value.to_owned().into()); + values.push(value.to_owned()); } Ok(values) } diff --git a/packages/turso-sync-engine/src/io_operations.rs b/packages/turso-sync-engine/src/io_operations.rs index b5187c960..a5eee38f7 100644 --- a/packages/turso-sync-engine/src/io_operations.rs +++ b/packages/turso-sync-engine/src/io_operations.rs @@ -34,9 +34,7 @@ impl IoOperations for Arc { fn try_open(&self, path: &str) -> Result>> { match self.open_file(path, OpenFlags::None, false) { Ok(file) => Ok(Some(file)), - Err(LimboError::IOError(err)) if err.kind() == std::io::ErrorKind::NotFound => { - return Ok(None); - } + Err(LimboError::IOError(err)) if err.kind() == std::io::ErrorKind::NotFound => Ok(None), Err(err) => Err(err.into()), } } diff --git a/packages/turso-sync-engine/src/types.rs b/packages/turso-sync-engine/src/types.rs index 0d00d1c0d..af7cad9ce 100644 --- a/packages/turso-sync-engine/src/types.rs +++ b/packages/turso-sync-engine/src/types.rs @@ -39,7 +39,7 @@ pub struct DatabaseMetadata { impl DatabaseMetadata { pub fn load(data: &[u8]) -> Result { - let meta = serde_json::from_slice::(&data[..])?; + let meta = serde_json::from_slice::(data)?; Ok(meta) } pub fn dump(&self) -> Result> {