mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-27 03:44:25 +01:00
report network stats
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::{Arc, Mutex},
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc, Mutex,
|
||||
},
|
||||
};
|
||||
|
||||
use turso_core::{DatabaseStorage, OpenFlags};
|
||||
@@ -11,7 +14,7 @@ use crate::{
|
||||
acquire_slot, apply_transformation, bootstrap_db_file, connect_untracked,
|
||||
count_local_changes, has_table, push_logical_changes, read_last_change_id, read_wal_salt,
|
||||
reset_wal_file, update_last_change_id, wait_all_results, wal_apply_from_file,
|
||||
wal_pull_to_file, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE,
|
||||
wal_pull_to_file, ProtocolIoStats, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE,
|
||||
},
|
||||
database_sync_partial_storage::PartialDatabaseStorage,
|
||||
database_tape::{
|
||||
@@ -44,9 +47,29 @@ pub struct DatabaseSyncEngineOpts {
|
||||
pub partial: bool,
|
||||
}
|
||||
|
||||
pub struct DataStats {
|
||||
pub written_bytes: AtomicUsize,
|
||||
pub read_bytes: AtomicUsize,
|
||||
}
|
||||
|
||||
impl DataStats {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
written_bytes: AtomicUsize::new(0),
|
||||
read_bytes: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
pub fn write(&self, size: usize) {
|
||||
self.written_bytes.fetch_add(size, Ordering::SeqCst);
|
||||
}
|
||||
pub fn read(&self, size: usize) {
|
||||
self.read_bytes.fetch_add(size, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DatabaseSyncEngine<P: ProtocolIO> {
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
protocol: Arc<P>,
|
||||
protocol: ProtocolIoStats<P>,
|
||||
db_file: Arc<dyn turso_core::storage::database::DatabaseStorage>,
|
||||
main_tape: DatabaseTape,
|
||||
main_db_wal_path: String,
|
||||
@@ -80,12 +103,13 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
tracing::info!("init(path={}): opts={:?}", main_db_path, opts);
|
||||
|
||||
let completion = protocol.full_read(&meta_path)?;
|
||||
let data = wait_all_results(coro, &completion).await?;
|
||||
let data = wait_all_results(coro, &completion, None).await?;
|
||||
let meta = if data.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(DatabaseMetadata::load(&data)?)
|
||||
};
|
||||
let protocol = ProtocolIoStats::new(protocol);
|
||||
|
||||
let meta = match meta {
|
||||
Some(meta) => meta,
|
||||
@@ -98,7 +122,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
};
|
||||
let revision = bootstrap_db_file(
|
||||
coro,
|
||||
protocol.as_ref(),
|
||||
&protocol,
|
||||
&io,
|
||||
main_db_path,
|
||||
opts.protocol_version_hint,
|
||||
@@ -124,7 +148,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
|
||||
let completion = protocol.full_write(&meta_path, meta.dump()?)?;
|
||||
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
|
||||
wait_all_results(coro, &completion).await?;
|
||||
wait_all_results(coro, &completion, None).await?;
|
||||
meta
|
||||
}
|
||||
None => {
|
||||
@@ -154,7 +178,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
|
||||
let completion = protocol.full_write(&meta_path, meta.dump()?)?;
|
||||
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
|
||||
wait_all_results(coro, &completion).await?;
|
||||
wait_all_results(coro, &completion, None).await?;
|
||||
meta
|
||||
}
|
||||
};
|
||||
@@ -342,6 +366,16 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
last_pull_unix_time,
|
||||
last_push_unix_time,
|
||||
revision,
|
||||
network_sent_bytes: self
|
||||
.protocol
|
||||
.network_stats
|
||||
.written_bytes
|
||||
.load(Ordering::SeqCst),
|
||||
network_received_bytes: self
|
||||
.protocol
|
||||
.network_stats
|
||||
.read_bytes
|
||||
.load(Ordering::SeqCst),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -458,7 +492,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
let revision = self.meta().synced_revision.clone();
|
||||
let next_revision = wal_pull_to_file(
|
||||
coro,
|
||||
self.protocol.as_ref(),
|
||||
&self.protocol,
|
||||
&file.value,
|
||||
&revision,
|
||||
self.opts.wal_pull_batch_size,
|
||||
@@ -707,13 +741,8 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
|
||||
let mut transformed = if self.opts.use_transform {
|
||||
Some(
|
||||
apply_transformation(
|
||||
coro,
|
||||
self.protocol.as_ref(),
|
||||
&local_changes,
|
||||
&replay.generator,
|
||||
)
|
||||
.await?,
|
||||
apply_transformation(coro, &self.protocol, &local_changes, &replay.generator)
|
||||
.await?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
@@ -755,7 +784,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
|
||||
let (_, change_id) = push_logical_changes(
|
||||
coro,
|
||||
self.protocol.as_ref(),
|
||||
&self.protocol,
|
||||
&self.main_tape,
|
||||
&self.client_unique_id,
|
||||
&self.opts,
|
||||
@@ -808,7 +837,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
tracing::info!("update_meta: {meta:?}");
|
||||
let completion = self.protocol.full_write(&self.meta_path, meta.dump()?)?;
|
||||
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
|
||||
wait_all_results(coro, &completion).await?;
|
||||
wait_all_results(coro, &completion, None).await?;
|
||||
*self.meta.lock().unwrap() = meta;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{
|
||||
ops::Deref,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use prost::Message;
|
||||
@@ -10,7 +13,7 @@ use turso_core::{
|
||||
|
||||
use crate::{
|
||||
database_replay_generator::DatabaseReplayGenerator,
|
||||
database_sync_engine::DatabaseSyncEngineOpts,
|
||||
database_sync_engine::{DataStats, DatabaseSyncEngineOpts},
|
||||
database_tape::{
|
||||
run_stmt_expect_one_row, run_stmt_ignore_rows, DatabaseChangesIteratorMode,
|
||||
DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession,
|
||||
@@ -60,6 +63,37 @@ pub(crate) fn acquire_slot<T: Clone>(slot: &Arc<Mutex<Option<T>>>) -> Result<Mut
|
||||
})
|
||||
}
|
||||
|
||||
pub struct ProtocolIoStats<P: ProtocolIO> {
|
||||
pub protocol: Arc<P>,
|
||||
pub network_stats: Arc<DataStats>,
|
||||
}
|
||||
|
||||
impl<P: ProtocolIO> ProtocolIoStats<P> {
|
||||
pub fn new(protocol: Arc<P>) -> Self {
|
||||
Self {
|
||||
protocol,
|
||||
network_stats: Arc::new(DataStats::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: ProtocolIO> Clone for ProtocolIoStats<P> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
protocol: self.protocol.clone(),
|
||||
network_stats: self.network_stats.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: ProtocolIO> Deref for ProtocolIoStats<P> {
|
||||
type Target = P;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.protocol
|
||||
}
|
||||
}
|
||||
|
||||
enum WalHttpPullResult<C: DataCompletion<u8>> {
|
||||
Frames(C),
|
||||
NeedCheckpoint(DbSyncStatus),
|
||||
@@ -79,7 +113,7 @@ pub fn connect_untracked(tape: &DatabaseTape) -> Result<Arc<turso_core::Connecti
|
||||
/// Bootstrap multiple DB files from latest generation from remote
|
||||
pub async fn db_bootstrap<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
db: Arc<dyn turso_core::File>,
|
||||
) -> Result<DbSyncInfo> {
|
||||
tracing::info!("db_bootstrap");
|
||||
@@ -90,8 +124,10 @@ pub async fn db_bootstrap<C: ProtocolIO, Ctx>(
|
||||
let mut pos = 0;
|
||||
loop {
|
||||
while let Some(chunk) = content.poll_data()? {
|
||||
client.network_stats.read(chunk.data().len());
|
||||
let chunk = chunk.data();
|
||||
let content_len = chunk.len();
|
||||
|
||||
// todo(sivukhin): optimize allocations here
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
let buffer = Arc::new(Buffer::new_temporary(chunk.len()));
|
||||
@@ -164,7 +200,7 @@ pub async fn wal_apply_from_file<Ctx>(
|
||||
|
||||
pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
frames_file: &Arc<dyn turso_core::File>,
|
||||
revision: &Option<DatabasePullRevision>,
|
||||
wal_pull_batch_size: u64,
|
||||
@@ -207,7 +243,7 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
|
||||
/// Pull updates from remote to the separate file
|
||||
pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
frames_file: &Arc<dyn turso_core::File>,
|
||||
revision: &str,
|
||||
long_poll_timeout: Option<std::time::Duration>,
|
||||
@@ -224,6 +260,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
|
||||
client_pages: BytesMut::new().into(),
|
||||
};
|
||||
let request = request.encode_to_vec();
|
||||
client.network_stats.write(request.len());
|
||||
let completion = client.http(
|
||||
"POST",
|
||||
"/pull-updates",
|
||||
@@ -233,8 +270,13 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
|
||||
("accept-encoding", "application/protobuf"),
|
||||
],
|
||||
)?;
|
||||
let Some(header) =
|
||||
wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(coro, &completion, &mut bytes).await?
|
||||
let Some(header) = wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(
|
||||
coro,
|
||||
&completion,
|
||||
&client.network_stats,
|
||||
&mut bytes,
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Err(Error::DatabaseSyncEngineError(
|
||||
"no header returned in the pull-updates protobuf call".to_string(),
|
||||
@@ -247,7 +289,8 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
|
||||
let buffer = Arc::new(Buffer::new_temporary(WAL_FRAME_SIZE));
|
||||
|
||||
let mut page_data_opt =
|
||||
wait_proto_message::<Ctx, PageData>(coro, &completion, &mut bytes).await?;
|
||||
wait_proto_message::<Ctx, PageData>(coro, &completion, &client.network_stats, &mut bytes)
|
||||
.await?;
|
||||
while let Some(page_data) = page_data_opt.take() {
|
||||
let page_id = page_data.page_id;
|
||||
tracing::info!("received page {}", page_id);
|
||||
@@ -260,7 +303,8 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
|
||||
)));
|
||||
}
|
||||
buffer.as_mut_slice()[WAL_FRAME_HEADER..].copy_from_slice(&page);
|
||||
page_data_opt = wait_proto_message(coro, &completion, &mut bytes).await?;
|
||||
page_data_opt =
|
||||
wait_proto_message(coro, &completion, &client.network_stats, &mut bytes).await?;
|
||||
let mut frame_info = WalFrameInfo {
|
||||
db_size: 0,
|
||||
page_no: page_id as u32 + 1,
|
||||
@@ -302,7 +346,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
|
||||
/// Pull pages from remote
|
||||
pub async fn pull_pages_v1<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
server_revision: &str,
|
||||
pages: &[u32],
|
||||
) -> Result<Vec<u8>> {
|
||||
@@ -326,6 +370,7 @@ pub async fn pull_pages_v1<C: ProtocolIO, Ctx>(
|
||||
client_pages: BytesMut::new().into(),
|
||||
};
|
||||
let request = request.encode_to_vec();
|
||||
client.network_stats.write(request.len());
|
||||
let completion = client.http(
|
||||
"POST",
|
||||
"/pull-updates",
|
||||
@@ -335,8 +380,13 @@ pub async fn pull_pages_v1<C: ProtocolIO, Ctx>(
|
||||
("accept-encoding", "application/protobuf"),
|
||||
],
|
||||
)?;
|
||||
let Some(header) =
|
||||
wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(coro, &completion, &mut bytes).await?
|
||||
let Some(header) = wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(
|
||||
coro,
|
||||
&completion,
|
||||
&client.network_stats,
|
||||
&mut bytes,
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Err(Error::DatabaseSyncEngineError(
|
||||
"no header returned in the pull-updates protobuf call".to_string(),
|
||||
@@ -347,7 +397,8 @@ pub async fn pull_pages_v1<C: ProtocolIO, Ctx>(
|
||||
let mut pages = Vec::with_capacity(PAGE_SIZE * pages.len());
|
||||
|
||||
let mut page_data_opt =
|
||||
wait_proto_message::<Ctx, PageData>(coro, &completion, &mut bytes).await?;
|
||||
wait_proto_message::<Ctx, PageData>(coro, &completion, &client.network_stats, &mut bytes)
|
||||
.await?;
|
||||
while let Some(page_data) = page_data_opt.take() {
|
||||
let page_id = page_data.page_id;
|
||||
tracing::info!("received page {}", page_id);
|
||||
@@ -360,7 +411,8 @@ pub async fn pull_pages_v1<C: ProtocolIO, Ctx>(
|
||||
)));
|
||||
}
|
||||
pages.extend_from_slice(&page);
|
||||
page_data_opt = wait_proto_message(coro, &completion, &mut bytes).await?;
|
||||
page_data_opt =
|
||||
wait_proto_message(coro, &completion, &client.network_stats, &mut bytes).await?;
|
||||
tracing::info!("page_data_opt: {}", page_data_opt.is_some());
|
||||
}
|
||||
|
||||
@@ -370,7 +422,7 @@ pub async fn pull_pages_v1<C: ProtocolIO, Ctx>(
|
||||
/// Pull updates from remote to the separate file
|
||||
pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
frames_file: &Arc<dyn turso_core::File>,
|
||||
mut generation: u64,
|
||||
mut start_frame: u64,
|
||||
@@ -408,7 +460,9 @@ pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
|
||||
};
|
||||
loop {
|
||||
while let Some(chunk) = data.poll_data()? {
|
||||
client.network_stats.read(chunk.data().len());
|
||||
let mut chunk = chunk.data();
|
||||
|
||||
while !chunk.is_empty() {
|
||||
let to_fill = (WAL_FRAME_SIZE - buffer_len).min(chunk.len());
|
||||
buffer.as_mut_slice()[buffer_len..buffer_len + to_fill]
|
||||
@@ -494,7 +548,7 @@ pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
|
||||
/// and can be called multiple times with same frame range
|
||||
pub async fn wal_push<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
wal_session: &mut WalSession,
|
||||
baton: Option<String>,
|
||||
generation: u64,
|
||||
@@ -693,7 +747,7 @@ pub async fn read_last_change_id<Ctx>(
|
||||
|
||||
pub async fn fetch_last_change_id<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
source_conn: &Arc<turso_core::Connection>,
|
||||
client_id: &str,
|
||||
) -> Result<(i64, Option<i64>)> {
|
||||
@@ -773,7 +827,7 @@ pub async fn fetch_last_change_id<C: ProtocolIO, Ctx>(
|
||||
|
||||
pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
source: &DatabaseTape,
|
||||
client_id: &str,
|
||||
opts: &DatabaseSyncEngineOpts,
|
||||
@@ -992,9 +1046,9 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
|
||||
Ok((source_pull_gen, last_change_id.unwrap_or(0)))
|
||||
}
|
||||
|
||||
pub async fn apply_transformation<Ctx, P: ProtocolIO>(
|
||||
pub async fn apply_transformation<Ctx, C: ProtocolIO>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &P,
|
||||
client: &ProtocolIoStats<C>,
|
||||
changes: &Vec<DatabaseTapeRowChange>,
|
||||
generator: &DatabaseReplayGenerator,
|
||||
) -> Result<Vec<DatabaseRowTransformResult>> {
|
||||
@@ -1004,7 +1058,7 @@ pub async fn apply_transformation<Ctx, P: ProtocolIO>(
|
||||
mutations.push(generator.create_mutation(&replay_info, change)?);
|
||||
}
|
||||
let completion = client.transform(mutations)?;
|
||||
let transformed = wait_all_results(coro, &completion).await?;
|
||||
let transformed = wait_all_results(coro, &completion, None).await?;
|
||||
if transformed.len() != changes.len() {
|
||||
return Err(Error::DatabaseSyncEngineError(format!(
|
||||
"unexpected result from custom transformation: mismatch in shapes: {} != {}",
|
||||
@@ -1064,7 +1118,7 @@ pub async fn checkpoint_wal_file<Ctx>(
|
||||
|
||||
pub async fn bootstrap_db_file<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
io: &Arc<dyn turso_core::IO>,
|
||||
main_db_path: &str,
|
||||
protocol: DatabaseSyncEngineProtocolVersion,
|
||||
@@ -1087,7 +1141,7 @@ pub async fn bootstrap_db_file<C: ProtocolIO, Ctx>(
|
||||
|
||||
pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
io: &Arc<dyn turso_core::IO>,
|
||||
main_db_path: &str,
|
||||
prefix: Option<usize>,
|
||||
@@ -1117,6 +1171,7 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
||||
client_pages: BytesMut::new().into(),
|
||||
};
|
||||
let request = request.encode_to_vec();
|
||||
client.network_stats.write(request.len());
|
||||
let completion = client.http(
|
||||
"POST",
|
||||
"/pull-updates",
|
||||
@@ -1127,8 +1182,13 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
||||
],
|
||||
)?;
|
||||
let mut bytes = BytesMut::new();
|
||||
let Some(header) =
|
||||
wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(coro, &completion, &mut bytes).await?
|
||||
let Some(header) = wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(
|
||||
coro,
|
||||
&completion,
|
||||
&client.network_stats,
|
||||
&mut bytes,
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Err(Error::DatabaseSyncEngineError(
|
||||
"no header returned in the pull-updates protobuf call".to_string(),
|
||||
@@ -1154,7 +1214,8 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
let buffer = Arc::new(Buffer::new_temporary(PAGE_SIZE));
|
||||
while let Some(page_data) =
|
||||
wait_proto_message::<Ctx, PageData>(coro, &completion, &mut bytes).await?
|
||||
wait_proto_message::<Ctx, PageData>(coro, &completion, &client.network_stats, &mut bytes)
|
||||
.await?
|
||||
{
|
||||
let offset = page_data.page_id * PAGE_SIZE as u64;
|
||||
let page = decode_page(&header, page_data)?;
|
||||
@@ -1205,7 +1266,7 @@ fn decode_page(header: &PullUpdatesRespProtoBody, page_data: PageData) -> Result
|
||||
|
||||
pub async fn bootstrap_db_file_legacy<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
io: &Arc<dyn turso_core::IO>,
|
||||
main_db_path: &str,
|
||||
) -> Result<DatabasePullRevision> {
|
||||
@@ -1264,17 +1325,20 @@ pub async fn reset_wal_file<Ctx>(
|
||||
|
||||
async fn sql_execute_http<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
request: server_proto::PipelineReqBody,
|
||||
) -> Result<Vec<StmtResult>> {
|
||||
let body = serde_json::to_vec(&request)?;
|
||||
|
||||
client.network_stats.write(body.len());
|
||||
let completion = client.http("POST", "/v2/pipeline", Some(body), &[])?;
|
||||
|
||||
let status = wait_status(coro, &completion).await?;
|
||||
if status != http::StatusCode::OK {
|
||||
let error = format!("sql_execute_http: unexpected status code: {status}");
|
||||
return Err(Error::DatabaseSyncEngineError(error));
|
||||
}
|
||||
let response = wait_all_results(coro, &completion).await?;
|
||||
let response = wait_all_results(coro, &completion, Some(&client.network_stats)).await?;
|
||||
let response: server_proto::PipelineRespBody = serde_json::from_slice(&response)?;
|
||||
tracing::debug!("hrana response: {:?}", response);
|
||||
let mut results = Vec::new();
|
||||
@@ -1312,7 +1376,7 @@ async fn sql_execute_http<C: ProtocolIO, Ctx>(
|
||||
|
||||
async fn wal_pull_http<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
generation: u64,
|
||||
start_frame: u64,
|
||||
end_frame: u64,
|
||||
@@ -1325,7 +1389,7 @@ async fn wal_pull_http<C: ProtocolIO, Ctx>(
|
||||
)?;
|
||||
let status = wait_status(coro, &completion).await?;
|
||||
if status == http::StatusCode::BAD_REQUEST {
|
||||
let status_body = wait_all_results(coro, &completion).await?;
|
||||
let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?;
|
||||
let status: DbSyncStatus = serde_json::from_slice(&status_body)?;
|
||||
if status.status == "checkpoint_needed" {
|
||||
return Ok(WalHttpPullResult::NeedCheckpoint(status));
|
||||
@@ -1343,7 +1407,7 @@ async fn wal_pull_http<C: ProtocolIO, Ctx>(
|
||||
|
||||
async fn wal_push_http<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
baton: Option<String>,
|
||||
generation: u64,
|
||||
start_frame: u64,
|
||||
@@ -1353,6 +1417,8 @@ async fn wal_push_http<C: ProtocolIO, Ctx>(
|
||||
let baton = baton
|
||||
.map(|baton| format!("/{baton}"))
|
||||
.unwrap_or("".to_string());
|
||||
|
||||
client.network_stats.write(frames.len());
|
||||
let completion = client.http(
|
||||
"POST",
|
||||
&format!("/sync/{generation}/{start_frame}/{end_frame}{baton}"),
|
||||
@@ -1360,7 +1426,7 @@ async fn wal_push_http<C: ProtocolIO, Ctx>(
|
||||
&[],
|
||||
)?;
|
||||
let status = wait_status(coro, &completion).await?;
|
||||
let status_body = wait_all_results(coro, &completion).await?;
|
||||
let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?;
|
||||
if status != http::StatusCode::OK {
|
||||
let error = std::str::from_utf8(&status_body).ok().unwrap_or("");
|
||||
return Err(Error::DatabaseSyncEngineError(format!(
|
||||
@@ -1370,10 +1436,13 @@ async fn wal_push_http<C: ProtocolIO, Ctx>(
|
||||
Ok(serde_json::from_slice(&status_body)?)
|
||||
}
|
||||
|
||||
async fn db_info_http<C: ProtocolIO, Ctx>(coro: &Coro<Ctx>, client: &C) -> Result<DbSyncInfo> {
|
||||
async fn db_info_http<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &ProtocolIoStats<C>,
|
||||
) -> Result<DbSyncInfo> {
|
||||
let completion = client.http("GET", "/info", None, &[])?;
|
||||
let status = wait_status(coro, &completion).await?;
|
||||
let status_body = wait_all_results(coro, &completion).await?;
|
||||
let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?;
|
||||
if status != http::StatusCode::OK {
|
||||
return Err(Error::DatabaseSyncEngineError(format!(
|
||||
"db_info go unexpected status: {status}"
|
||||
@@ -1384,7 +1453,7 @@ async fn db_info_http<C: ProtocolIO, Ctx>(coro: &Coro<Ctx>, client: &C) -> Resul
|
||||
|
||||
async fn db_bootstrap_http<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
client: &ProtocolIoStats<C>,
|
||||
generation: u64,
|
||||
) -> Result<C::DataCompletionBytes> {
|
||||
let completion = client.http("GET", &format!("/export/{generation}"), None, &[])?;
|
||||
@@ -1430,6 +1499,7 @@ pub fn read_varint(buf: &[u8]) -> Result<Option<(usize, usize)>> {
|
||||
pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
|
||||
coro: &Coro<Ctx>,
|
||||
completion: &impl DataCompletion<u8>,
|
||||
network_stats: &DataStats,
|
||||
bytes: &mut BytesMut,
|
||||
) -> Result<Option<T>> {
|
||||
let start_time = std::time::Instant::now();
|
||||
@@ -1441,6 +1511,7 @@ pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
|
||||
};
|
||||
if not_enough_bytes {
|
||||
if let Some(poll) = completion.poll_data()? {
|
||||
network_stats.read(poll.data().len());
|
||||
bytes.extend_from_slice(poll.data());
|
||||
} else if !completion.is_done()? {
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
@@ -1469,10 +1540,12 @@ pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
|
||||
pub async fn wait_all_results<Ctx, T: Clone>(
|
||||
coro: &Coro<Ctx>,
|
||||
completion: &impl DataCompletion<T>,
|
||||
stats: Option<&DataStats>,
|
||||
) -> Result<Vec<T>> {
|
||||
let mut results = Vec::new();
|
||||
loop {
|
||||
while let Some(poll) = completion.poll_data()? {
|
||||
stats.inspect(|s| s.read(poll.data().len()));
|
||||
results.extend_from_slice(poll.data());
|
||||
}
|
||||
if completion.is_done()? {
|
||||
@@ -1491,6 +1564,7 @@ mod tests {
|
||||
use prost::Message;
|
||||
|
||||
use crate::{
|
||||
database_sync_engine::DataStats,
|
||||
database_sync_operations::wait_proto_message,
|
||||
protocol_io::{DataCompletion, DataPollResult},
|
||||
server_proto::PageData,
|
||||
@@ -1552,9 +1626,15 @@ mod tests {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let mut bytes = BytesMut::new();
|
||||
let mut count = 0;
|
||||
while wait_proto_message::<(), PageData>(&coro, &completion, &mut bytes)
|
||||
.await?
|
||||
.is_some()
|
||||
let mut network_stats = DataStats::new();
|
||||
while wait_proto_message::<(), PageData>(
|
||||
&coro,
|
||||
&completion,
|
||||
&network_stats,
|
||||
&mut bytes,
|
||||
)
|
||||
.await?
|
||||
.is_some()
|
||||
{
|
||||
assert!(bytes.capacity() <= 16 * 1024 + 1024);
|
||||
count += 1;
|
||||
|
||||
@@ -71,6 +71,8 @@ pub struct SyncEngineStats {
|
||||
pub last_pull_unix_time: Option<i64>,
|
||||
pub last_push_unix_time: Option<i64>,
|
||||
pub revision: Option<String>,
|
||||
pub network_sent_bytes: usize,
|
||||
pub network_received_bytes: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
|
||||
Reference in New Issue
Block a user