From 6e124d927efabe1da8e2393f5d6113520289ab62 Mon Sep 17 00:00:00 2001
From: Nikita Sivukhin
Date: Wed, 27 Aug 2025 15:51:29 +0400
Subject: [PATCH] fix clippy

---
 sync/engine/src/database_replay_generator.rs | 11 +++--
 sync/engine/src/database_sync_engine.rs      | 32 +++++-------
 sync/engine/src/database_sync_operations.rs  | 51 +++++++++++---------
 sync/engine/src/database_tape.rs             |  6 +--
 sync/engine/src/types.rs                     |  5 +-
 sync/javascript/src/lib.rs                   | 11 +++--
 6 files changed, 60 insertions(+), 56 deletions(-)

diff --git a/sync/engine/src/database_replay_generator.rs b/sync/engine/src/database_replay_generator.rs
index 553a0243a..4aada3af0 100644
--- a/sync/engine/src/database_replay_generator.rs
+++ b/sync/engine/src/database_replay_generator.rs
@@ -65,14 +65,14 @@ impl DatabaseReplayGenerator {
                 after: Some(self.create_row_full(info, after)),
                 updates: updates
                     .as_ref()
-                    .map(|updates| self.create_row_update(info, &updates)),
+                    .map(|updates| self.create_row_update(info, updates)),
             }),
         }
     }
     fn create_row_full(
         &self,
         info: &ReplayInfo,
-        values: &Vec<turso_core::Value>,
+        values: &[turso_core::Value],
     ) -> HashMap<String, turso_core::Value> {
         let mut row = HashMap::with_capacity(info.column_names.len());
         for (i, value) in values.iter().enumerate() {
@@ -83,7 +83,7 @@ impl DatabaseReplayGenerator {
     fn create_row_update(
         &self,
         info: &ReplayInfo,
-        updates: &Vec<turso_core::Value>,
+        updates: &[turso_core::Value],
     ) -> HashMap<String, turso_core::Value> {
         let mut row = HashMap::with_capacity(info.column_names.len());
         assert!(updates.len() % 2 == 0);
@@ -289,7 +289,7 @@ impl DatabaseReplayGenerator {
             pk_predicates.push(format!("{} = ?", column_names[idx]));
         }
         for (idx, name) in column_names.iter().enumerate() {
-            if columns[idx as usize] {
+            if columns[idx] {
                 column_updates.push(format!("{name} = ?"));
             }
         }
@@ -326,7 +326,8 @@ impl DatabaseReplayGenerator {
         table_name: &str,
         columns: usize,
     ) -> Result {
-        let (mut column_names, pk_column_indices) = self.table_columns_info(coro, table_name).await?;
+        let (mut column_names, pk_column_indices) =
+            self.table_columns_info(coro, table_name).await?;
         let conflict_clause = if !pk_column_indices.is_empty() {
             let mut pk_column_names = Vec::new();
             for &idx in &pk_column_indices {
diff --git a/sync/engine/src/database_sync_engine.rs b/sync/engine/src/database_sync_engine.rs
index b15fa876b..9b49d3614 100644
--- a/sync/engine/src/database_sync_engine.rs
+++ b/sync/engine/src/database_sync_engine.rs
@@ -23,8 +23,8 @@ use crate::{
     io_operations::IoOperations,
     protocol_io::ProtocolIO,
     types::{
-        Coro, DatabaseMetadata, DatabasePullRevision, DatabaseRowMutation, DatabaseRowStatement,
-        DatabaseSyncEngineProtocolVersion, DatabaseTapeOperation, DbChangesStatus, SyncEngineStats,
+        Coro, DatabaseMetadata, DatabasePullRevision, DatabaseSyncEngineProtocolVersion,
+        DatabaseTapeOperation, DbChangesStatus, SyncEngineStats, Transform,
     },
     wal_session::WalSession,
     Result,
@@ -34,9 +34,7 @@ pub struct DatabaseSyncEngineOpts {
     pub client_name: String,
     pub tables_ignore: Vec<String>,
-    pub transform: Option<
-        Arc<dyn Fn(DatabaseRowMutation) -> Result<Option<DatabaseRowStatement>> + 'static>,
-    >,
+    pub transform: Option<Transform>,
     pub wal_pull_batch_size: u64,
     pub protocol_version_hint: DatabaseSyncEngineProtocolVersion,
 }

@@ -63,6 +61,7 @@ pub struct DatabaseSyncEngine {
     meta_path: String,
     opts: DatabaseSyncEngineOpts,
     meta: RefCell<DatabaseMetadata>,
+    client_unique_id: String,
 }

 fn db_size_from_page(page: &[u8]) -> u32 {
@@ -103,7 +102,7 @@ impl DatabaseSyncEngine {
             coro,
             protocol.as_ref(),
             &io,
-            &main_db_path,
+            main_db_path,
             opts.protocol_version_hint,
         )
         .await?;
@@ -123,7 +122,7 @@ impl DatabaseSyncEngine {
             }
         };
-        let main_exists = io.try_open(&main_db_path)?.is_some();
+        let main_exists = io.try_open(main_db_path)?.is_some();
         if !main_exists {
             let error = "main DB file doesn't exists, but metadata is".to_string();
             return Err(Error::DatabaseSyncEngineError(error));
         }
@@ -156,6 +155,7 @@ impl DatabaseSyncEngine {
             meta_path: format!("{main_db_path}-info"),
             opts,
             meta: RefCell::new(meta.clone()),
+            client_unique_id: meta.client_unique_id.clone(),
         };

         let synced_revision = meta.synced_revision.as_ref().unwrap();
@@ -189,7 +189,7 @@ impl DatabaseSyncEngine {
     }

     async fn checkpoint_passive(&mut self, coro: &Coro) -> Result<(Option<Vec<u32>>, u64)> {
-        let watermark = self.meta().revert_since_wal_watermark as u64;
+        let watermark = self.meta().revert_since_wal_watermark;
         tracing::info!(
             "checkpoint(path={:?}): revert_since_wal_watermark={}",
             self.main_db_path,
@@ -464,7 +464,7 @@ impl DatabaseSyncEngine {
             coro,
             self.protocol.as_ref(),
             &main_conn,
-            &self.meta().client_unique_id,
+            &self.client_unique_id,
         )
         .await?;

@@ -539,15 +539,9 @@ impl DatabaseSyncEngine {
             final_schema_version
         );

-        update_last_change_id(
-            coro,
-            &main_conn,
-            &self.meta().client_unique_id,
-            pull_gen + 1,
-            0,
-        )
-        .await
-        .inspect_err(|e| tracing::error!("update_last_change_id failed: {e}"))?;
+        update_last_change_id(coro, &main_conn, &self.client_unique_id, pull_gen + 1, 0)
+            .await
+            .inspect_err(|e| tracing::error!("update_last_change_id failed: {e}"))?;

         if had_cdc_table {
             tracing::info!(
@@ -592,7 +586,7 @@ impl DatabaseSyncEngine {
             coro,
             self.protocol.as_ref(),
             &self.main_tape,
-            &self.meta().client_unique_id,
+            &self.client_unique_id,
             &self.opts,
         )
         .await?;
diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs
index 9101d9829..78691c405 100644
--- a/sync/engine/src/database_sync_operations.rs
+++ b/sync/engine/src/database_sync_operations.rs
@@ -111,6 +111,7 @@ pub async fn wal_apply_from_file(
 ) -> Result {
     let size = frames_file.size()?;
     assert!(size % WAL_FRAME_SIZE as u64 == 0);
+    #[allow(clippy::arc_with_non_send_sync)]
     let buffer = Arc::new(Buffer::new_temporary(WAL_FRAME_SIZE));
     tracing::debug!("wal_apply_from_file: size={}", size);
     let mut db_size = 0;
@@ -195,13 +196,14 @@ pub async fn wal_pull_to_file_v1(
     let Some(header) =
         wait_proto_message::(coro, &completion, &mut bytes).await?
     else {
-        return Err(Error::DatabaseSyncEngineError(format!(
-            "no header returned in the pull-updates protobuf call"
-        )));
+        return Err(Error::DatabaseSyncEngineError(
+            "no header returned in the pull-updates protobuf call".to_string(),
+        ));
     };
     tracing::info!("wal_pull_to_file: got header={:?}", header);

     let mut offset = 0;
+    #[allow(clippy::arc_with_non_send_sync)]
     let buffer = Arc::new(Buffer::new_temporary(WAL_FRAME_SIZE));

     let mut page_data_opt =
@@ -271,6 +273,7 @@ pub async fn wal_pull_to_file_legacy(
     );

     // todo(sivukhin): optimize allocation by using buffer pool in the DatabaseSyncOperations
+    #[allow(clippy::arc_with_non_send_sync)]
     let buffer = Arc::new(Buffer::new_temporary(WAL_FRAME_SIZE));
     let mut buffer_len = 0;
     let mut last_offset = 0;
@@ -338,14 +341,13 @@ pub async fn wal_pull_to_file_legacy(
         if start_frame < end_frame {
             // chunk which was sent from the server has ended early - so there is nothing left on server-side for pull
             break DatabasePullRevision::Legacy {
-                generation: generation,
+                generation,
                 synced_frame_no: Some(start_frame - 1),
             };
         }
         if buffer_len != 0 {
             return Err(Error::DatabaseSyncEngineError(format!(
-                "wal_pull: response has unexpected trailing data: buffer_len={}",
-                buffer_len
+                "wal_pull: response has unexpected trailing data: buffer_len={buffer_len}"
             )));
         }
     };
@@ -521,7 +523,7 @@ pub async fn update_last_change_id(
     let row = run_stmt_expect_one_row(coro, &mut select_stmt).await?;
     tracing::info!("update_last_change_id(client_id={client_id}): selected client row if any");

-    if let Some(_) = row {
+    if row.is_some() {
         let mut update_stmt = conn.prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?;
         update_stmt.bind_at(1.try_into().unwrap(), turso_core::Value::Integer(pull_gen));
         update_stmt.bind_at(2.try_into().unwrap(), turso_core::Value::Integer(change_id));
@@ -878,6 +880,7 @@ pub async fn read_wal_salt(
     coro: &Coro,
     wal: &Arc,
 ) -> Result<Option<Vec<u32>>> {
+    #[allow(clippy::arc_with_non_send_sync)]
     let buffer = Arc::new(Buffer::new_temporary(WAL_HEADER));
     let c = Completion::new_read(buffer.clone(), |result| {
         let Ok((buffer, len)) = result else {
@@ -891,7 +894,7 @@ pub async fn read_wal_salt(
     while !c.is_completed() {
         coro.yield_(ProtocolCommand::IO).await?;
     }
-    if buffer.as_mut_slice() == &[0u8; WAL_HEADER] {
+    if buffer.as_mut_slice() == [0u8; WAL_HEADER] {
         return Ok(None);
     }
     let salt1 = u32::from_be_bytes(buffer.as_slice()[16..20].try_into().unwrap());
@@ -955,9 +958,9 @@ pub async fn bootstrap_db_file_v1(
     let Some(header) =
         wait_proto_message::(coro, &completion, &mut bytes).await?
     else {
-        return Err(Error::DatabaseSyncEngineError(format!(
-            "no header returned in the pull-updates protobuf call"
-        )));
+        return Err(Error::DatabaseSyncEngineError(
+            "no header returned in the pull-updates protobuf call".to_string(),
+        ));
     };
     tracing::info!(
         "bootstrap_db_file(path={}): got header={:?}",
@@ -976,6 +979,7 @@ pub async fn bootstrap_db_file_v1(
         coro.yield_(ProtocolCommand::IO).await?;
     }

+    #[allow(clippy::arc_with_non_send_sync)]
     let buffer = Arc::new(Buffer::new_temporary(PAGE_SIZE));
     while let Some(page_data) =
         wait_proto_message::(coro, &completion, &mut bytes).await?
@@ -1019,7 +1023,7 @@ fn decode_page(header: &PullUpdatesRespProtoBody, page_data: PageData) -> Result
         ));
     }

-    if let Some(_) = header.raw_encoding {
+    if header.raw_encoding.is_some() {
         return Ok(page_data.encoded_page.to_vec());
     }
     Err(Error::DatabaseSyncEngineError(
@@ -1038,14 +1042,14 @@ pub async fn bootstrap_db_file_legacy(
     let start_time = std::time::Instant::now();
     // cleanup all files left from previous attempt to bootstrap
     // we shouldn't write any WAL files - but let's truncate them too for safety
-    if let Some(file) = io.try_open(&main_db_path)? {
+    if let Some(file) = io.try_open(main_db_path)? {
         io.truncate(coro, file, 0).await?;
     }
-    if let Some(file) = io.try_open(&format!("{}-wal", main_db_path))? {
+    if let Some(file) = io.try_open(&format!("{main_db_path}-wal"))? {
         io.truncate(coro, file, 0).await?;
     }

-    let file = io.create(&main_db_path)?;
+    let file = io.create(main_db_path)?;
     let db_info = db_bootstrap(coro, client, file).await?;

     let elapsed = std::time::Instant::now().duration_since(start_time);
@@ -1224,7 +1228,7 @@ pub fn read_varint(buf: &[u8]) -> Result<Option<(usize, usize)>> {
     for i in 0..9 {
         match buf.get(i) {
             Some(c) => {
-                v = (((c & 0x7f) as u64) << (i * 7)) | v;
+                v |= ((c & 0x7f) as u64) << (i * 7);
                 if (c & 0x80) == 0 {
                     return Ok(Some((v as usize, i + 1)));
                 }
@@ -1232,10 +1236,10 @@ pub fn read_varint(buf: &[u8]) -> Result<Option<(usize, usize)>> {
             None => return Ok(None),
         }
     }
-    return Err(Error::DatabaseSyncEngineError(format!(
+    Err(Error::DatabaseSyncEngineError(format!(
         "invalid variant byte: {:?}",
         &buf[0..=8]
-    )));
+    )))
 }

 pub async fn wait_proto_message(
@@ -1245,7 +1249,7 @@ pub async fn wait_proto_message(
 ) -> Result> {
     let start_time = std::time::Instant::now();
     loop {
-        let length = read_varint(&bytes)?;
+        let length = read_varint(bytes)?;
         let not_enough_bytes = match length {
             None => true,
             Some((message_length, prefix_length)) => message_length + prefix_length > bytes.len(),
@@ -1255,7 +1259,7 @@ pub async fn wait_proto_message(
                 bytes.extend_from_slice(poll.data());
             } else if !completion.is_done()? {
                 coro.yield_(ProtocolCommand::IO).await?;
-            } else if bytes.len() == 0 {
+            } else if bytes.is_empty() {
                 return Ok(None);
             } else {
                 return Err(Error::DatabaseSyncEngineError(
@@ -1341,7 +1345,7 @@ mod tests {
         }

         fn is_done(&self) -> crate::Result<bool> {
-            Ok(self.data.borrow().len() == 0)
+            Ok(self.data.borrow().is_empty())
         }
     }

@@ -1364,8 +1368,9 @@ mod tests {
             let coro: Coro<()> = coro.into();
             let mut bytes = BytesMut::new();
             let mut count = 0;
-            while let Some(_) =
-                wait_proto_message::<(), PageData>(&coro, &completion, &mut bytes).await?
+            while wait_proto_message::<(), PageData>(&coro, &completion, &mut bytes)
+                .await?
+                .is_some()
             {
                 assert!(bytes.capacity() <= 16 * 1024 + 1024);
                 count += 1;
diff --git a/sync/engine/src/database_tape.rs b/sync/engine/src/database_tape.rs
index f66d97651..372076a17 100644
--- a/sync/engine/src/database_tape.rs
+++ b/sync/engine/src/database_tape.rs
@@ -11,7 +11,7 @@ use crate::{
     errors::Error,
     types::{
         Coro, DatabaseChange, DatabaseRowMutation, DatabaseRowStatement, DatabaseTapeOperation,
-        DatabaseTapeRowChange, DatabaseTapeRowChangeType, ProtocolCommand,
+        DatabaseTapeRowChange, DatabaseTapeRowChangeType, ProtocolCommand, Transform,
     },
     wal_session::WalSession,
     Result,
@@ -433,9 +433,7 @@ impl DatabaseChangesIterator {
 #[derive(Clone)]
 pub struct DatabaseReplaySessionOpts {
     pub use_implicit_rowid: bool,
-    pub transform: Option<
-        Arc<dyn Fn(DatabaseRowMutation) -> Result<Option<DatabaseRowStatement>> + 'static>,
-    >,
+    pub transform: Option<Transform>,
 }

 impl std::fmt::Debug for DatabaseReplaySessionOpts {
diff --git a/sync/engine/src/types.rs b/sync/engine/src/types.rs
index 20964b761..895b065a9 100644
--- a/sync/engine/src/types.rs
+++ b/sync/engine/src/types.rs
@@ -1,9 +1,12 @@
-use std::{cell::RefCell, collections::HashMap};
+use std::{cell::RefCell, collections::HashMap, sync::Arc};

 use serde::{Deserialize, Serialize};

 use crate::{errors::Error, Result};

+pub type Transform =
+    Arc<dyn Fn(DatabaseRowMutation) -> Result<Option<DatabaseRowStatement>> + 'static>;
+
 pub struct Coro<Ctx> {
     pub ctx: RefCell<Ctx>,
     gen: genawaiter::sync::Co<ProtocolCommand, Result<()>>,
diff --git a/sync/javascript/src/lib.rs b/sync/javascript/src/lib.rs
index 3c6a2a7c9..290d9235f 100644
--- a/sync/javascript/src/lib.rs
+++ b/sync/javascript/src/lib.rs
@@ -1,4 +1,5 @@
-#![deny(clippy::all)]
+#![allow(clippy::await_holding_lock)]
+#![allow(clippy::type_complexity)]

 pub mod generator;
 pub mod js_protocol_io;
@@ -70,7 +71,7 @@ fn core_change_type_to_js(value: DatabaseChangeType) -> DatabaseChangeTypeJs {
 fn js_value_to_core(value: Either5<Null, i64, f64, String, Vec<u8>>) -> turso_core::Value {
     match value {
         Either5::A(_) => turso_core::Value::Null,
-        Either5::B(value) => turso_core::Value::Integer(value as i64),
+        Either5::B(value) => turso_core::Value::Integer(value),
         Either5::C(value) => turso_core::Value::Float(value),
         Either5::D(value) => turso_core::Value::Text(turso_core::types::Text::new(&value)),
         Either5::E(value) => turso_core::Value::Blob(value),
@@ -164,8 +165,9 @@ impl SyncEngine {
             path: opts.path,
             client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
             wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100),
-            tables_ignore: opts.tables_ignore.unwrap_or(Vec::new()),
+            tables_ignore: opts.tables_ignore.unwrap_or_default(),
             transform: opts.transform.map(|x| x.create_ref().unwrap()),
+            #[allow(clippy::arc_with_non_send_sync)]
             sync_engine: Arc::new(RwLock::new(None)),
             io,
             protocol: Arc::new(JsProtocolIo::default()),
@@ -182,6 +184,7 @@ impl SyncEngine {

     #[napi]
     pub fn init(&mut self, env: Env) -> GeneratorHolder {
+        #[allow(clippy::type_complexity)]
         let transform: Option<
             Arc<
                 dyn Fn(
@@ -194,7 +197,7 @@ impl SyncEngine {
         > = match self.transform.take() {
             Some(f) => Some(Arc::new(move |env, mutation| {
                 let result = f
-                    .borrow_back(&env)
+                    .borrow_back(env)
                     .unwrap()
                     .call(DatabaseRowMutationJs {
                         change_time: mutation.change_time as i64,
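
A gloss on the lints involved, inferred from the hunks above (the commit message itself only says "fix clippy"): the mechanical rewrites match clippy::ptr_arg (&Vec<T> parameters become &[T]), clippy::redundant_pattern_matching (if let Some(_) = x becomes x.is_some()), clippy::useless_format and clippy::uninlined_format_args (format! with nothing to interpolate becomes .to_string(), and trailing arguments move into the braces), clippy::needless_borrow (dropped &), clippy::redundant_field_names (generation: generation), clippy::unwrap_or_default, clippy::assign_op_pattern (the v |= rewrite in read_varint), and clippy::needless_return. The only lint silenced rather than fixed is clippy::arc_with_non_send_sync. The sketch below is a minimal, self-contained illustration of the recurring patterns; the RefCell-based buffer stand-in and all function names are invented for the example and are not turso_core APIs:

    use std::{cell::RefCell, sync::Arc};

    // clippy::ptr_arg: take &[T] instead of &Vec<T> so callers can pass any
    // slice, mirroring the create_row_full/create_row_update change.
    fn first_value(values: &[i64]) -> Option<i64> {
        values.first().copied()
    }

    // clippy::redundant_pattern_matching and clippy::useless_format:
    // `if let Some(_) = row` becomes `row.is_some()`, and a format! with
    // nothing to interpolate becomes a plain .to_string().
    fn describe(row: Option<i64>) -> String {
        if row.is_some() {
            "row exists".to_string()
        } else {
            "no row".to_string()
        }
    }

    // clippy::arc_with_non_send_sync fires because RefCell<_> is !Sync, so
    // Arc's cross-thread guarantees go unused. The patch keeps Arc here,
    // presumably for shared ownership inside a single-threaded IO loop, and
    // allows the lint at each construction site instead of switching to Rc.
    #[allow(clippy::arc_with_non_send_sync)]
    fn shared_buffer(len: usize) -> Arc<RefCell<Vec<u8>>> {
        Arc::new(RefCell::new(vec![0u8; len]))
    }

    fn main() {
        assert_eq!(first_value(&[1, 2]), Some(1));
        assert_eq!(describe(Some(1)), "row exists");
        let buf = shared_buffer(4);
        buf.borrow_mut()[0] = 42;
        assert_eq!(buf.borrow()[0], 42);
    }

The crate-level change in sync/javascript/src/lib.rs follows the same spirit: replacing the blanket #![deny(clippy::all)] with two named #![allow(...)] exceptions turns a hard build error into documented opt-outs, leaving every other lint at clippy's default level.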