From a855a657aa0f3b9082324a337e9cae1df8a17f5a Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 10 Nov 2025 17:25:39 +0400 Subject: [PATCH] report network stats --- sync/engine/src/database_sync_engine.rs | 63 +++++--- sync/engine/src/database_sync_operations.rs | 158 +++++++++++++++----- sync/engine/src/types.rs | 2 + 3 files changed, 167 insertions(+), 56 deletions(-) diff --git a/sync/engine/src/database_sync_engine.rs b/sync/engine/src/database_sync_engine.rs index 7f795ba29..415be3f4a 100644 --- a/sync/engine/src/database_sync_engine.rs +++ b/sync/engine/src/database_sync_engine.rs @@ -1,6 +1,9 @@ use std::{ collections::{HashMap, HashSet}, - sync::{Arc, Mutex}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, }; use turso_core::{DatabaseStorage, OpenFlags}; @@ -11,7 +14,7 @@ use crate::{ acquire_slot, apply_transformation, bootstrap_db_file, connect_untracked, count_local_changes, has_table, push_logical_changes, read_last_change_id, read_wal_salt, reset_wal_file, update_last_change_id, wait_all_results, wal_apply_from_file, - wal_pull_to_file, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE, + wal_pull_to_file, ProtocolIoStats, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE, }, database_sync_partial_storage::PartialDatabaseStorage, database_tape::{ @@ -44,9 +47,29 @@ pub struct DatabaseSyncEngineOpts { pub partial: bool, } +pub struct DataStats { + pub written_bytes: AtomicUsize, + pub read_bytes: AtomicUsize, +} + +impl DataStats { + pub fn new() -> Self { + Self { + written_bytes: AtomicUsize::new(0), + read_bytes: AtomicUsize::new(0), + } + } + pub fn write(&self, size: usize) { + self.written_bytes.fetch_add(size, Ordering::SeqCst); + } + pub fn read(&self, size: usize) { + self.read_bytes.fetch_add(size, Ordering::SeqCst); + } +} + pub struct DatabaseSyncEngine { io: Arc, - protocol: Arc

, + protocol: ProtocolIoStats

, db_file: Arc, main_tape: DatabaseTape, main_db_wal_path: String, @@ -80,12 +103,13 @@ impl DatabaseSyncEngine

{ tracing::info!("init(path={}): opts={:?}", main_db_path, opts); let completion = protocol.full_read(&meta_path)?; - let data = wait_all_results(coro, &completion).await?; + let data = wait_all_results(coro, &completion, None).await?; let meta = if data.is_empty() { None } else { Some(DatabaseMetadata::load(&data)?) }; + let protocol = ProtocolIoStats::new(protocol); let meta = match meta { Some(meta) => meta, @@ -98,7 +122,7 @@ impl DatabaseSyncEngine

{ }; let revision = bootstrap_db_file( coro, - protocol.as_ref(), + &protocol, &io, main_db_path, opts.protocol_version_hint, @@ -124,7 +148,7 @@ impl DatabaseSyncEngine

{ tracing::info!("write meta after successful bootstrap: meta={meta:?}"); let completion = protocol.full_write(&meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated - wait_all_results(coro, &completion).await?; + wait_all_results(coro, &completion, None).await?; meta } None => { @@ -154,7 +178,7 @@ impl DatabaseSyncEngine

{ tracing::info!("write meta after successful bootstrap: meta={meta:?}"); let completion = protocol.full_write(&meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated - wait_all_results(coro, &completion).await?; + wait_all_results(coro, &completion, None).await?; meta } }; @@ -342,6 +366,16 @@ impl DatabaseSyncEngine

{ last_pull_unix_time, last_push_unix_time, revision, + network_sent_bytes: self + .protocol + .network_stats + .written_bytes + .load(Ordering::SeqCst), + network_received_bytes: self + .protocol + .network_stats + .read_bytes + .load(Ordering::SeqCst), }) } @@ -458,7 +492,7 @@ impl DatabaseSyncEngine

{ let revision = self.meta().synced_revision.clone(); let next_revision = wal_pull_to_file( coro, - self.protocol.as_ref(), + &self.protocol, &file.value, &revision, self.opts.wal_pull_batch_size, @@ -707,13 +741,8 @@ impl DatabaseSyncEngine

{ let mut transformed = if self.opts.use_transform { Some( - apply_transformation( - coro, - self.protocol.as_ref(), - &local_changes, - &replay.generator, - ) - .await?, + apply_transformation(coro, &self.protocol, &local_changes, &replay.generator) + .await?, ) } else { None @@ -755,7 +784,7 @@ impl DatabaseSyncEngine

{ let (_, change_id) = push_logical_changes( coro, - self.protocol.as_ref(), + &self.protocol, &self.main_tape, &self.client_unique_id, &self.opts, @@ -808,7 +837,7 @@ impl DatabaseSyncEngine

{ tracing::info!("update_meta: {meta:?}"); let completion = self.protocol.full_write(&self.meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated - wait_all_results(coro, &completion).await?; + wait_all_results(coro, &completion, None).await?; *self.meta.lock().unwrap() = meta; Ok(()) } diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index 0c79bde36..1bd9372ff 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -1,4 +1,7 @@ -use std::sync::{Arc, Mutex}; +use std::{ + ops::Deref, + sync::{Arc, Mutex}, +}; use bytes::BytesMut; use prost::Message; @@ -10,7 +13,7 @@ use turso_core::{ use crate::{ database_replay_generator::DatabaseReplayGenerator, - database_sync_engine::DatabaseSyncEngineOpts, + database_sync_engine::{DataStats, DatabaseSyncEngineOpts}, database_tape::{ run_stmt_expect_one_row, run_stmt_ignore_rows, DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession, @@ -60,6 +63,37 @@ pub(crate) fn acquire_slot(slot: &Arc>>) -> Result { + pub protocol: Arc

, + pub network_stats: Arc, +} + +impl ProtocolIoStats

{ + pub fn new(protocol: Arc

) -> Self { + Self { + protocol, + network_stats: Arc::new(DataStats::new()), + } + } +} + +impl Clone for ProtocolIoStats

{ + fn clone(&self) -> Self { + Self { + protocol: self.protocol.clone(), + network_stats: self.network_stats.clone(), + } + } +} + +impl Deref for ProtocolIoStats

{ + type Target = P; + + fn deref(&self) -> &Self::Target { + &self.protocol + } +} + enum WalHttpPullResult> { Frames(C), NeedCheckpoint(DbSyncStatus), @@ -79,7 +113,7 @@ pub fn connect_untracked(tape: &DatabaseTape) -> Result( coro: &Coro, - client: &C, + client: &ProtocolIoStats, db: Arc, ) -> Result { tracing::info!("db_bootstrap"); @@ -90,8 +124,10 @@ pub async fn db_bootstrap( let mut pos = 0; loop { while let Some(chunk) = content.poll_data()? { + client.network_stats.read(chunk.data().len()); let chunk = chunk.data(); let content_len = chunk.len(); + // todo(sivukhin): optimize allocations here #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(Buffer::new_temporary(chunk.len())); @@ -164,7 +200,7 @@ pub async fn wal_apply_from_file( pub async fn wal_pull_to_file( coro: &Coro, - client: &C, + client: &ProtocolIoStats, frames_file: &Arc, revision: &Option, wal_pull_batch_size: u64, @@ -207,7 +243,7 @@ pub async fn wal_pull_to_file( /// Pull updates from remote to the separate file pub async fn wal_pull_to_file_v1( coro: &Coro, - client: &C, + client: &ProtocolIoStats, frames_file: &Arc, revision: &str, long_poll_timeout: Option, @@ -224,6 +260,7 @@ pub async fn wal_pull_to_file_v1( client_pages: BytesMut::new().into(), }; let request = request.encode_to_vec(); + client.network_stats.write(request.len()); let completion = client.http( "POST", "/pull-updates", @@ -233,8 +270,13 @@ pub async fn wal_pull_to_file_v1( ("accept-encoding", "application/protobuf"), ], )?; - let Some(header) = - wait_proto_message::(coro, &completion, &mut bytes).await? + let Some(header) = wait_proto_message::( + coro, + &completion, + &client.network_stats, + &mut bytes, + ) + .await? else { return Err(Error::DatabaseSyncEngineError( "no header returned in the pull-updates protobuf call".to_string(), @@ -247,7 +289,8 @@ pub async fn wal_pull_to_file_v1( let buffer = Arc::new(Buffer::new_temporary(WAL_FRAME_SIZE)); let mut page_data_opt = - wait_proto_message::(coro, &completion, &mut bytes).await?; + wait_proto_message::(coro, &completion, &client.network_stats, &mut bytes) + .await?; while let Some(page_data) = page_data_opt.take() { let page_id = page_data.page_id; tracing::info!("received page {}", page_id); @@ -260,7 +303,8 @@ pub async fn wal_pull_to_file_v1( ))); } buffer.as_mut_slice()[WAL_FRAME_HEADER..].copy_from_slice(&page); - page_data_opt = wait_proto_message(coro, &completion, &mut bytes).await?; + page_data_opt = + wait_proto_message(coro, &completion, &client.network_stats, &mut bytes).await?; let mut frame_info = WalFrameInfo { db_size: 0, page_no: page_id as u32 + 1, @@ -302,7 +346,7 @@ pub async fn wal_pull_to_file_v1( /// Pull pages from remote pub async fn pull_pages_v1( coro: &Coro, - client: &C, + client: &ProtocolIoStats, server_revision: &str, pages: &[u32], ) -> Result> { @@ -326,6 +370,7 @@ pub async fn pull_pages_v1( client_pages: BytesMut::new().into(), }; let request = request.encode_to_vec(); + client.network_stats.write(request.len()); let completion = client.http( "POST", "/pull-updates", @@ -335,8 +380,13 @@ pub async fn pull_pages_v1( ("accept-encoding", "application/protobuf"), ], )?; - let Some(header) = - wait_proto_message::(coro, &completion, &mut bytes).await? + let Some(header) = wait_proto_message::( + coro, + &completion, + &client.network_stats, + &mut bytes, + ) + .await? else { return Err(Error::DatabaseSyncEngineError( "no header returned in the pull-updates protobuf call".to_string(), @@ -347,7 +397,8 @@ pub async fn pull_pages_v1( let mut pages = Vec::with_capacity(PAGE_SIZE * pages.len()); let mut page_data_opt = - wait_proto_message::(coro, &completion, &mut bytes).await?; + wait_proto_message::(coro, &completion, &client.network_stats, &mut bytes) + .await?; while let Some(page_data) = page_data_opt.take() { let page_id = page_data.page_id; tracing::info!("received page {}", page_id); @@ -360,7 +411,8 @@ pub async fn pull_pages_v1( ))); } pages.extend_from_slice(&page); - page_data_opt = wait_proto_message(coro, &completion, &mut bytes).await?; + page_data_opt = + wait_proto_message(coro, &completion, &client.network_stats, &mut bytes).await?; tracing::info!("page_data_opt: {}", page_data_opt.is_some()); } @@ -370,7 +422,7 @@ pub async fn pull_pages_v1( /// Pull updates from remote to the separate file pub async fn wal_pull_to_file_legacy( coro: &Coro, - client: &C, + client: &ProtocolIoStats, frames_file: &Arc, mut generation: u64, mut start_frame: u64, @@ -408,7 +460,9 @@ pub async fn wal_pull_to_file_legacy( }; loop { while let Some(chunk) = data.poll_data()? { + client.network_stats.read(chunk.data().len()); let mut chunk = chunk.data(); + while !chunk.is_empty() { let to_fill = (WAL_FRAME_SIZE - buffer_len).min(chunk.len()); buffer.as_mut_slice()[buffer_len..buffer_len + to_fill] @@ -494,7 +548,7 @@ pub async fn wal_pull_to_file_legacy( /// and can be called multiple times with same frame range pub async fn wal_push( coro: &Coro, - client: &C, + client: &ProtocolIoStats, wal_session: &mut WalSession, baton: Option, generation: u64, @@ -693,7 +747,7 @@ pub async fn read_last_change_id( pub async fn fetch_last_change_id( coro: &Coro, - client: &C, + client: &ProtocolIoStats, source_conn: &Arc, client_id: &str, ) -> Result<(i64, Option)> { @@ -773,7 +827,7 @@ pub async fn fetch_last_change_id( pub async fn push_logical_changes( coro: &Coro, - client: &C, + client: &ProtocolIoStats, source: &DatabaseTape, client_id: &str, opts: &DatabaseSyncEngineOpts, @@ -992,9 +1046,9 @@ pub async fn push_logical_changes( Ok((source_pull_gen, last_change_id.unwrap_or(0))) } -pub async fn apply_transformation( +pub async fn apply_transformation( coro: &Coro, - client: &P, + client: &ProtocolIoStats, changes: &Vec, generator: &DatabaseReplayGenerator, ) -> Result> { @@ -1004,7 +1058,7 @@ pub async fn apply_transformation( mutations.push(generator.create_mutation(&replay_info, change)?); } let completion = client.transform(mutations)?; - let transformed = wait_all_results(coro, &completion).await?; + let transformed = wait_all_results(coro, &completion, None).await?; if transformed.len() != changes.len() { return Err(Error::DatabaseSyncEngineError(format!( "unexpected result from custom transformation: mismatch in shapes: {} != {}", @@ -1064,7 +1118,7 @@ pub async fn checkpoint_wal_file( pub async fn bootstrap_db_file( coro: &Coro, - client: &C, + client: &ProtocolIoStats, io: &Arc, main_db_path: &str, protocol: DatabaseSyncEngineProtocolVersion, @@ -1087,7 +1141,7 @@ pub async fn bootstrap_db_file( pub async fn bootstrap_db_file_v1( coro: &Coro, - client: &C, + client: &ProtocolIoStats, io: &Arc, main_db_path: &str, prefix: Option, @@ -1117,6 +1171,7 @@ pub async fn bootstrap_db_file_v1( client_pages: BytesMut::new().into(), }; let request = request.encode_to_vec(); + client.network_stats.write(request.len()); let completion = client.http( "POST", "/pull-updates", @@ -1127,8 +1182,13 @@ pub async fn bootstrap_db_file_v1( ], )?; let mut bytes = BytesMut::new(); - let Some(header) = - wait_proto_message::(coro, &completion, &mut bytes).await? + let Some(header) = wait_proto_message::( + coro, + &completion, + &client.network_stats, + &mut bytes, + ) + .await? else { return Err(Error::DatabaseSyncEngineError( "no header returned in the pull-updates protobuf call".to_string(), @@ -1154,7 +1214,8 @@ pub async fn bootstrap_db_file_v1( #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(Buffer::new_temporary(PAGE_SIZE)); while let Some(page_data) = - wait_proto_message::(coro, &completion, &mut bytes).await? + wait_proto_message::(coro, &completion, &client.network_stats, &mut bytes) + .await? { let offset = page_data.page_id * PAGE_SIZE as u64; let page = decode_page(&header, page_data)?; @@ -1205,7 +1266,7 @@ fn decode_page(header: &PullUpdatesRespProtoBody, page_data: PageData) -> Result pub async fn bootstrap_db_file_legacy( coro: &Coro, - client: &C, + client: &ProtocolIoStats, io: &Arc, main_db_path: &str, ) -> Result { @@ -1264,17 +1325,20 @@ pub async fn reset_wal_file( async fn sql_execute_http( coro: &Coro, - client: &C, + client: &ProtocolIoStats, request: server_proto::PipelineReqBody, ) -> Result> { let body = serde_json::to_vec(&request)?; + + client.network_stats.write(body.len()); let completion = client.http("POST", "/v2/pipeline", Some(body), &[])?; + let status = wait_status(coro, &completion).await?; if status != http::StatusCode::OK { let error = format!("sql_execute_http: unexpected status code: {status}"); return Err(Error::DatabaseSyncEngineError(error)); } - let response = wait_all_results(coro, &completion).await?; + let response = wait_all_results(coro, &completion, Some(&client.network_stats)).await?; let response: server_proto::PipelineRespBody = serde_json::from_slice(&response)?; tracing::debug!("hrana response: {:?}", response); let mut results = Vec::new(); @@ -1312,7 +1376,7 @@ async fn sql_execute_http( async fn wal_pull_http( coro: &Coro, - client: &C, + client: &ProtocolIoStats, generation: u64, start_frame: u64, end_frame: u64, @@ -1325,7 +1389,7 @@ async fn wal_pull_http( )?; let status = wait_status(coro, &completion).await?; if status == http::StatusCode::BAD_REQUEST { - let status_body = wait_all_results(coro, &completion).await?; + let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?; let status: DbSyncStatus = serde_json::from_slice(&status_body)?; if status.status == "checkpoint_needed" { return Ok(WalHttpPullResult::NeedCheckpoint(status)); @@ -1343,7 +1407,7 @@ async fn wal_pull_http( async fn wal_push_http( coro: &Coro, - client: &C, + client: &ProtocolIoStats, baton: Option, generation: u64, start_frame: u64, @@ -1353,6 +1417,8 @@ async fn wal_push_http( let baton = baton .map(|baton| format!("/{baton}")) .unwrap_or("".to_string()); + + client.network_stats.write(frames.len()); let completion = client.http( "POST", &format!("/sync/{generation}/{start_frame}/{end_frame}{baton}"), @@ -1360,7 +1426,7 @@ async fn wal_push_http( &[], )?; let status = wait_status(coro, &completion).await?; - let status_body = wait_all_results(coro, &completion).await?; + let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?; if status != http::StatusCode::OK { let error = std::str::from_utf8(&status_body).ok().unwrap_or(""); return Err(Error::DatabaseSyncEngineError(format!( @@ -1370,10 +1436,13 @@ async fn wal_push_http( Ok(serde_json::from_slice(&status_body)?) } -async fn db_info_http(coro: &Coro, client: &C) -> Result { +async fn db_info_http( + coro: &Coro, + client: &ProtocolIoStats, +) -> Result { let completion = client.http("GET", "/info", None, &[])?; let status = wait_status(coro, &completion).await?; - let status_body = wait_all_results(coro, &completion).await?; + let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?; if status != http::StatusCode::OK { return Err(Error::DatabaseSyncEngineError(format!( "db_info go unexpected status: {status}" @@ -1384,7 +1453,7 @@ async fn db_info_http(coro: &Coro, client: &C) -> Resul async fn db_bootstrap_http( coro: &Coro, - client: &C, + client: &ProtocolIoStats, generation: u64, ) -> Result { let completion = client.http("GET", &format!("/export/{generation}"), None, &[])?; @@ -1430,6 +1499,7 @@ pub fn read_varint(buf: &[u8]) -> Result> { pub async fn wait_proto_message( coro: &Coro, completion: &impl DataCompletion, + network_stats: &DataStats, bytes: &mut BytesMut, ) -> Result> { let start_time = std::time::Instant::now(); @@ -1441,6 +1511,7 @@ pub async fn wait_proto_message( }; if not_enough_bytes { if let Some(poll) = completion.poll_data()? { + network_stats.read(poll.data().len()); bytes.extend_from_slice(poll.data()); } else if !completion.is_done()? { coro.yield_(ProtocolCommand::IO).await?; @@ -1469,10 +1540,12 @@ pub async fn wait_proto_message( pub async fn wait_all_results( coro: &Coro, completion: &impl DataCompletion, + stats: Option<&DataStats>, ) -> Result> { let mut results = Vec::new(); loop { while let Some(poll) = completion.poll_data()? { + stats.inspect(|s| s.read(poll.data().len())); results.extend_from_slice(poll.data()); } if completion.is_done()? { @@ -1491,6 +1564,7 @@ mod tests { use prost::Message; use crate::{ + database_sync_engine::DataStats, database_sync_operations::wait_proto_message, protocol_io::{DataCompletion, DataPollResult}, server_proto::PageData, @@ -1552,9 +1626,15 @@ mod tests { let coro: Coro<()> = coro.into(); let mut bytes = BytesMut::new(); let mut count = 0; - while wait_proto_message::<(), PageData>(&coro, &completion, &mut bytes) - .await? - .is_some() + let mut network_stats = DataStats::new(); + while wait_proto_message::<(), PageData>( + &coro, + &completion, + &network_stats, + &mut bytes, + ) + .await? + .is_some() { assert!(bytes.capacity() <= 16 * 1024 + 1024); count += 1; diff --git a/sync/engine/src/types.rs b/sync/engine/src/types.rs index c74431f2d..81951baab 100644 --- a/sync/engine/src/types.rs +++ b/sync/engine/src/types.rs @@ -71,6 +71,8 @@ pub struct SyncEngineStats { pub last_pull_unix_time: Option, pub last_push_unix_time: Option, pub revision: Option, + pub network_sent_bytes: usize, + pub network_received_bytes: usize, } #[derive(Debug, Clone, Copy, PartialEq)]