fix clippy

This commit is contained in:
Nikita Sivukhin
2025-08-27 15:51:29 +04:00
parent 009aa479bf
commit 6e124d927e
6 changed files with 60 additions and 56 deletions

View File

@@ -23,8 +23,8 @@ use crate::{
io_operations::IoOperations,
protocol_io::ProtocolIO,
types::{
Coro, DatabaseMetadata, DatabasePullRevision, DatabaseRowMutation, DatabaseRowStatement,
DatabaseSyncEngineProtocolVersion, DatabaseTapeOperation, DbChangesStatus, SyncEngineStats,
Coro, DatabaseMetadata, DatabasePullRevision, DatabaseSyncEngineProtocolVersion,
DatabaseTapeOperation, DbChangesStatus, SyncEngineStats, Transform,
},
wal_session::WalSession,
Result,
@@ -34,9 +34,7 @@ use crate::{
pub struct DatabaseSyncEngineOpts<Ctx> {
pub client_name: String,
pub tables_ignore: Vec<String>,
pub transform: Option<
Arc<dyn Fn(&Ctx, DatabaseRowMutation) -> Result<Option<DatabaseRowStatement>> + 'static>,
>,
pub transform: Option<Transform<Ctx>>,
pub wal_pull_batch_size: u64,
pub protocol_version_hint: DatabaseSyncEngineProtocolVersion,
}
@@ -63,6 +61,7 @@ pub struct DatabaseSyncEngine<P: ProtocolIO, Ctx> {
meta_path: String,
opts: DatabaseSyncEngineOpts<Ctx>,
meta: RefCell<DatabaseMetadata>,
client_unique_id: String,
}
fn db_size_from_page(page: &[u8]) -> u32 {
@@ -103,7 +102,7 @@ impl<P: ProtocolIO, Ctx> DatabaseSyncEngine<P, Ctx> {
coro,
protocol.as_ref(),
&io,
&main_db_path,
main_db_path,
opts.protocol_version_hint,
)
.await?;
@@ -123,7 +122,7 @@ impl<P: ProtocolIO, Ctx> DatabaseSyncEngine<P, Ctx> {
}
};
let main_exists = io.try_open(&main_db_path)?.is_some();
let main_exists = io.try_open(main_db_path)?.is_some();
if !main_exists {
let error = "main DB file doesn't exists, but metadata is".to_string();
return Err(Error::DatabaseSyncEngineError(error));
@@ -156,6 +155,7 @@ impl<P: ProtocolIO, Ctx> DatabaseSyncEngine<P, Ctx> {
meta_path: format!("{main_db_path}-info"),
opts,
meta: RefCell::new(meta.clone()),
client_unique_id: meta.client_unique_id.clone(),
};
let synced_revision = meta.synced_revision.as_ref().unwrap();
@@ -189,7 +189,7 @@ impl<P: ProtocolIO, Ctx> DatabaseSyncEngine<P, Ctx> {
}
async fn checkpoint_passive(&mut self, coro: &Coro<Ctx>) -> Result<(Option<Vec<u32>>, u64)> {
let watermark = self.meta().revert_since_wal_watermark as u64;
let watermark = self.meta().revert_since_wal_watermark;
tracing::info!(
"checkpoint(path={:?}): revert_since_wal_watermark={}",
self.main_db_path,
@@ -464,7 +464,7 @@ impl<P: ProtocolIO, Ctx> DatabaseSyncEngine<P, Ctx> {
coro,
self.protocol.as_ref(),
&main_conn,
&self.meta().client_unique_id,
&self.client_unique_id,
)
.await?;
@@ -539,15 +539,9 @@ impl<P: ProtocolIO, Ctx> DatabaseSyncEngine<P, Ctx> {
final_schema_version
);
update_last_change_id(
coro,
&main_conn,
&self.meta().client_unique_id,
pull_gen + 1,
0,
)
.await
.inspect_err(|e| tracing::error!("update_last_change_id failed: {e}"))?;
update_last_change_id(coro, &main_conn, &self.client_unique_id, pull_gen + 1, 0)
.await
.inspect_err(|e| tracing::error!("update_last_change_id failed: {e}"))?;
if had_cdc_table {
tracing::info!(
@@ -592,7 +586,7 @@ impl<P: ProtocolIO, Ctx> DatabaseSyncEngine<P, Ctx> {
coro,
self.protocol.as_ref(),
&self.main_tape,
&self.meta().client_unique_id,
&self.client_unique_id,
&self.opts,
)
.await?;