diff --git a/packages/turso-sync/src/database.rs b/packages/turso-sync/src/database.rs index 73a67834f..597a71023 100644 --- a/packages/turso-sync/src/database.rs +++ b/packages/turso-sync/src/database.rs @@ -30,7 +30,7 @@ impl Builder { Self { path: path.to_string(), sync_url: sync_url.to_string(), - auth_token: auth_token, + auth_token, encryption_key: None, connector: None, } @@ -48,12 +48,8 @@ impl Builder { } } pub async fn build(self) -> Result { - let path = PathBuf::try_from(self.path) - .map_err(|e| Error::DatabaseSyncError(format!("invalid synced database path: {e}")))?; - let connector = self - .connector - .map(Ok) - .unwrap_or_else(|| default_connector())?; + let path = PathBuf::from(self.path); + let connector = self.connector.map(Ok).unwrap_or_else(default_connector)?; let executor = TokioExecutor::new(); let client = hyper_util::client::legacy::Builder::new(executor).build(connector); let sync_server = TursoSyncServer::new( @@ -92,7 +88,7 @@ impl Database { pub fn default_connector() -> Result> { Ok(HttpsConnectorBuilder::new() .with_native_roots() - .map_err(|e| Error::DatabaseSyncError(format!("unable to configure CA roots: {}", e)))? + .map_err(|e| Error::DatabaseSyncError(format!("unable to configure CA roots: {e}")))? .https_or_http() .enable_http1() .build()) diff --git a/packages/turso-sync/src/database_inner.rs b/packages/turso-sync/src/database_inner.rs index 9f2f13013..dfd5d8fbb 100644 --- a/packages/turso-sync/src/database_inner.rs +++ b/packages/turso-sync/src/database_inner.rs @@ -73,9 +73,9 @@ impl DatabaseInner { let path_str = path .to_str() .ok_or_else(|| Error::DatabaseSyncError(format!("invalid path: {path:?}")))?; - let draft_path = PathBuf::from(format!("{}-draft", path_str)); - let synced_path = PathBuf::from(format!("{}-synced", path_str)); - let meta_path = PathBuf::from(format!("{}-info", path_str)); + let draft_path = PathBuf::from(format!("{path_str}-draft")); + let synced_path = PathBuf::from(format!("{path_str}-synced")); + let meta_path = PathBuf::from(format!("{path_str}-info")); let database_container = Arc::new(RwLock::new(ActiveDatabaseContainer { db: None, active_type: ActiveDatabase::Draft, @@ -202,9 +202,9 @@ impl DatabaseInner { let draft_exists = self.filesystem.exists_file(&self.draft_path).await?; let synced_exists = self.filesystem.exists_file(&self.synced_path).await?; if !draft_exists || !synced_exists { - return Err(Error::DatabaseSyncError(format!( - "Draft or Synced files doesn't exists, but metadata is" - ))); + return Err(Error::DatabaseSyncError( + "Draft or Synced files doesn't exists, but metadata is".to_string(), + )); } // Synced db is active - we need to finish transfer from Synced to Draft then @@ -302,10 +302,7 @@ impl DatabaseInner { }) } - async fn write_meta( - &self, - update: impl Fn(&mut DatabaseMetadata) -> (), - ) -> Result { + async fn write_meta(&self, update: impl Fn(&mut DatabaseMetadata)) -> Result { let mut meta = self.meta().clone(); update(&mut meta); // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated @@ -358,7 +355,7 @@ impl DatabaseInner { }; while let Some(mut chunk) = data.read_chunk().await? { // chunk is arbitrary - aggregate groups of FRAME_SIZE bytes out from the chunks stream - while chunk.len() > 0 { + while !chunk.is_empty() { let to_fill = FRAME_SIZE - buffer.len(); let prefix = chunk.split_to(to_fill.min(chunk.len())); buffer.extend_from_slice(&prefix); @@ -542,9 +539,9 @@ impl DatabaseInner { let draft_path_str = self.draft_path.to_str().unwrap_or(""); let clean_path_str = self.synced_path.to_str().unwrap_or(""); - let draft_wal = PathBuf::from(format!("{}-wal", draft_path_str)); - let clean_wal = PathBuf::from(format!("{}-wal", clean_path_str)); - let draft_shm = PathBuf::from(format!("{}-shm", draft_path_str)); + let draft_wal = PathBuf::from(format!("{draft_path_str}-wal")); + let clean_wal = PathBuf::from(format!("{clean_path_str}-wal")); + let draft_shm = PathBuf::from(format!("{draft_path_str}-shm")); self.filesystem .copy_file(&self.synced_path, &self.draft_path) .await?; @@ -568,7 +565,7 @@ impl DatabaseInner { } let clean_path_str = self.synced_path.to_str().unwrap_or(""); - let clean_wal_path = PathBuf::from(format!("{}-wal", clean_path_str)); + let clean_wal_path = PathBuf::from(format!("{clean_path_str}-wal")); let wal_size = WAL_HEADER + FRAME_SIZE * self.meta().synced_frame_no; tracing::debug!( "reset Synced DB WAL to the size of {} frames", @@ -617,7 +614,7 @@ mod tests { sql: &str, ) -> Result>> { let mut rows = db.query(sql, ()).await?; - Ok(convert_rows(&mut rows).await?) + convert_rows(&mut rows).await } #[test] @@ -807,7 +804,7 @@ mod tests { let db = DatabaseInner::new( TestFilesystem::new(ctx.clone()), server.clone(), - &dir.path().join(&format!("local-{}.db", i)), + &dir.path().join(format!("local-{i}.db")), ) .await .unwrap(); @@ -889,7 +886,7 @@ mod tests { let server = server.clone(); let sync_lock = sync_lock.clone(); async move { - let local_path = dir.join(format!("local-{}.db", i)); + let local_path = dir.join(format!("local-{i}.db")); let fs = TestFilesystem::new(ctx.clone()); let mut db = DatabaseInner::new(fs, server.clone(), &local_path) .await @@ -936,7 +933,7 @@ mod tests { let mut session = ctx.fault_session(); let mut it = 0; while let Some(strategy) = session.next().await { - let local_path = dir.path().join(format!("local-{}.db", it)); + let local_path = dir.path().join(format!("local-{it}.db")); it += 1; let fs = TestFilesystem::new(ctx.clone()); @@ -992,14 +989,14 @@ mod tests { let mut session = ctx.fault_session(); let mut it = 0; while let Some(strategy) = session.next().await { - let server_path = dir.path().join(format!("server-{}.db", it)); + let server_path = dir.path().join(format!("server-{it}.db")); let server = TestSyncServer::new(ctx.clone(), &server_path, opts.clone()) .await .unwrap(); server.execute("CREATE TABLE t(x)", ()).await.unwrap(); - let local_path = dir.path().join(format!("local-{}.db", it)); + let local_path = dir.path().join(format!("local-{it}.db")); it += 1; let fs = TestFilesystem::new(ctx.clone()); @@ -1053,7 +1050,7 @@ mod tests { let mut session = ctx.fault_session(); let mut it = 0; while let Some(strategy) = session.next().await { - let server_path = dir.path().join(format!("server-{}.db", it)); + let server_path = dir.path().join(format!("server-{it}.db")); let server = TestSyncServer::new(ctx.clone(), &server_path, opts.clone()) .await .unwrap(); @@ -1068,7 +1065,7 @@ mod tests { .await .unwrap(); - let local_path = dir.path().join(format!("local-{}.db", it)); + let local_path = dir.path().join(format!("local-{it}.db")); it += 1; let fs = TestFilesystem::new(ctx.clone()); diff --git a/packages/turso-sync/src/filesystem/tokio.rs b/packages/turso-sync/src/filesystem/tokio.rs index f9d88b75a..b729d21c1 100644 --- a/packages/turso-sync/src/filesystem/tokio.rs +++ b/packages/turso-sync/src/filesystem/tokio.rs @@ -61,7 +61,7 @@ impl Filesystem for TokioFilesystem { async fn write_file(&self, file: &mut Self::File, buf: &[u8]) -> Result<()> { tracing::debug!("write buffer of size {} to file", buf.len()); - file.write_all(&buf).await?; + file.write_all(buf).await?; Ok(()) } diff --git a/packages/turso-sync/src/metadata.rs b/packages/turso-sync/src/metadata.rs index 203ae8eb8..78d74fa33 100644 --- a/packages/turso-sync/src/metadata.rs +++ b/packages/turso-sync/src/metadata.rs @@ -47,18 +47,17 @@ impl DatabaseMetadata { tracing::debug!("write metadata to {:?}: {:?}", path, self); let directory = path.parent().ok_or_else(|| { Error::MetadataError(format!( - "unable to get parent of the provided path: {:?}", - path + "unable to get parent of the provided path: {path:?}", )) })?; let filename = path .file_name() .and_then(|x| x.to_str()) - .ok_or_else(|| Error::MetadataError(format!("unable to get filename: {:?}", path)))?; + .ok_or_else(|| Error::MetadataError(format!("unable to get filename: {path:?}")))?; let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH); let timestamp = timestamp.map_err(|e| { - Error::MetadataError(format!("failed to get current time for temp file: {}", e)) + Error::MetadataError(format!("failed to get current time for temp file: {e}")) })?; let temp_name = format!("{}.tmp.{}", filename, timestamp.as_nanos()); let temp_path = directory.join(temp_name); @@ -72,7 +71,7 @@ impl DatabaseMetadata { } drop(temp_file); if result.is_ok() { - result = fs.rename_file(&temp_path, &path).await; + result = fs.rename_file(&temp_path, path).await; } if result.is_err() { let _ = fs.remove_file(&temp_path).await.inspect_err(|e| { diff --git a/packages/turso-sync/src/sync_server/test.rs b/packages/turso-sync/src/sync_server/test.rs index c0c13a9ba..32b9667b4 100644 --- a/packages/turso-sync/src/sync_server/test.rs +++ b/packages/turso-sync/src/sync_server/test.rs @@ -96,7 +96,7 @@ impl SyncServer for TestSyncServer { let state = self.state.lock().await; let Some(generation) = state.generations.get(&generation_id) else { - return Err(Error::DatabaseSyncError(format!("generation not found"))); + return Err(Error::DatabaseSyncError("generation not found".to_string())); }; Ok(TestStream::new( self.ctx.clone(), @@ -110,7 +110,7 @@ impl SyncServer for TestSyncServer { let state = self.state.lock().await; let Some(generation) = state.generations.get(&generation_id) else { - return Err(Error::DatabaseSyncError(format!("generation not found"))); + return Err(Error::DatabaseSyncError("generation not found".to_string())); }; let mut data = Vec::new(); for frame_no in start_frame..start_frame + self.opts.pull_batch_size { @@ -120,7 +120,7 @@ impl SyncServer for TestSyncServer { }; data.extend_from_slice(frame); } - if data.len() == 0 { + if data.is_empty() { let last_generation = state.generations.get(&state.generation).unwrap(); return Err(Error::PullNeedCheckpoint(DbSyncStatus { baton: None, @@ -152,7 +152,9 @@ impl SyncServer for TestSyncServer { let mut session = { let mut state = self.state.lock().await; if state.generation != generation_id { - return Err(Error::DatabaseSyncError(format!("generation id mismatch"))); + return Err(Error::DatabaseSyncError( + "generation id mismatch".to_string(), + )); } let baton_str = baton.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); let session = match state.sessions.get(&baton_str) { @@ -174,9 +176,9 @@ impl SyncServer for TestSyncServer { let mut offset = 0; for frame_no in start_frame..end_frame { if offset + FRAME_SIZE > frames.len() { - return Err(Error::DatabaseSyncError(format!( - "unexpected length of frames data" - ))); + return Err(Error::DatabaseSyncError( + "unexpected length of frames data".to_string(), + )); } if !session.in_txn { session.conn.wal_insert_begin()?; @@ -251,7 +253,7 @@ impl TestSyncServer { opts: Arc::new(opts), state: Arc::new(Mutex::new(TestSyncServerState { generation: 1, - generations: generations, + generations, sessions: HashMap::new(), })), }) diff --git a/packages/turso-sync/src/sync_server/turso.rs b/packages/turso-sync/src/sync_server/turso.rs index b12dca605..963103f45 100644 --- a/packages/turso-sync/src/sync_server/turso.rs +++ b/packages/turso-sync/src/sync_server/turso.rs @@ -33,7 +33,7 @@ pub struct TursoSyncServer { fn sync_server_error(status: http::StatusCode, body: impl Buf) -> Error { let mut body_str = String::new(); if let Err(e) = body.reader().read_to_string(&mut body_str) { - Error::SyncServerError(status, format!("unable to read response body: {}", e)) + Error::SyncServerError(status, format!("unable to read response body: {e}")) } else { Error::SyncServerError(status, body_str) } @@ -57,7 +57,7 @@ impl Stream for HyperStream { let frame = frame.map_err(Error::HyperResponse)?; let frame = frame .into_data() - .map_err(|_| Error::DatabaseSyncError(format!("failed to read export chunk")))?; + .map_err(|_| Error::DatabaseSyncError("failed to read export chunk".to_string()))?; Ok(Some(frame)) } } @@ -67,7 +67,7 @@ impl TursoSyncServer { let auth_token_header = opts .auth_token .as_ref() - .map(|token| hyper::header::HeaderValue::from_str(&format!("Bearer {}", token))) + .map(|token| hyper::header::HeaderValue::from_str(&format!("Bearer {token}"))) .transpose() .map_err(|e| Error::Http(e.into()))?; Ok(Self { @@ -150,8 +150,8 @@ impl SyncServer for TursoSyncServer { return Err(sync_server_error(status_code, body)); } - let status: DbSyncStatus; - status = serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?; + let status: DbSyncStatus = + serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?; match status.status.as_str() { "ok" => Ok(status), @@ -185,8 +185,8 @@ impl SyncServer for TursoSyncServer { let (status, body) = self.send(http::Method::GET, &url, empty).await?; if status == http::StatusCode::BAD_REQUEST { let body = aggregate_body(body).await?; - let status: DbSyncStatus; - status = serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?; + let status: DbSyncStatus = + serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?; if status.status == "checkpoint_needed" { return Err(Error::PullNeedCheckpoint(status)); } else { diff --git a/packages/turso-sync/src/test_context.rs b/packages/turso-sync/src/test_context.rs index 123b6308d..9f18294ea 100644 --- a/packages/turso-sync/src/test_context.rs +++ b/packages/turso-sync/src/test_context.rs @@ -11,9 +11,10 @@ use tokio::sync::Mutex; use crate::{errors::Error, Result}; +type PinnedFuture = Pin + Send>>; + pub struct FaultInjectionPlan { - pub is_fault: - Box Pin + Send>> + Send + Sync>, + pub is_fault: Box PinnedFuture + Send + Sync>, } pub enum FaultInjectionStrategy { @@ -55,7 +56,7 @@ impl FaultSession { } let plans = self.plans.as_mut().unwrap(); - if plans.len() == 0 { + if plans.is_empty() { return None; } @@ -100,7 +101,7 @@ impl TestContext { let call = call.clone(); let count = count.clone(); Box::pin(async move { - if &(name, bt) != &call { + if (name, bt) != call { return false; } let mut count = count.lock().await; @@ -115,10 +116,9 @@ impl TestContext { pub async fn faulty_call(&self, name: &str) -> Result<()> { tracing::trace!("faulty_call: {}", name); tokio::task::yield_now().await; - match &*self.fault_injection.lock().await { - FaultInjectionStrategy::Disabled => return Ok(()), - _ => {} - }; + if let FaultInjectionStrategy::Disabled = &*self.fault_injection.lock().await { + return Ok(()); + } let bt = std::backtrace::Backtrace::force_capture().to_string(); match &mut *self.fault_injection.lock().await { FaultInjectionStrategy::Record => { @@ -128,7 +128,7 @@ impl TestContext { } FaultInjectionStrategy::Enabled { plan } => { if plan.is_fault.as_ref()(name.to_string(), bt.clone()).await { - Err(Error::DatabaseSyncError(format!("injected fault"))) + Err(Error::DatabaseSyncError("injected fault".to_string())) } else { Ok(()) }