Merge 'Partial sync basic' from Nikita Sivukhin

This PR implements basic support for partial sync. Right now the scope
is limited to only `:memory:` IO and later will be properly expanded to
the file based IO later.
The main addition is `PartialDatabaseStorage` which make request to the
remote server for missing local pages on demand.
The main change is that now tursodatabase JS bindings accept optional
"external" IO event loop which in case of sync will drive `ProtocolIo`
internal work associated with remote page fetching tasks.

Closes #3931
This commit is contained in:
Pekka Enberg
2025-11-13 16:38:04 +02:00
committed by GitHub
29 changed files with 1226 additions and 200 deletions

View File

@@ -1,17 +1,21 @@
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
};
use turso_core::OpenFlags;
use turso_core::{DatabaseStorage, OpenFlags};
use crate::{
database_replay_generator::DatabaseReplayGenerator,
database_sync_lazy_storage::LazyDatabaseStorage,
database_sync_operations::{
acquire_slot, apply_transformation, bootstrap_db_file, connect_untracked,
count_local_changes, has_table, push_logical_changes, read_last_change_id, read_wal_salt,
reset_wal_file, update_last_change_id, wait_all_results, wal_apply_from_file,
wal_pull_to_file, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE,
wal_pull_to_file, ProtocolIoStats, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE,
},
database_tape::{
DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseReplaySession,
@@ -30,6 +34,12 @@ use crate::{
Result,
};
#[derive(Clone, Debug)]
pub enum PartialBootstrapStrategy {
Prefix { length: usize },
Query { query: String },
}
#[derive(Clone, Debug)]
pub struct DatabaseSyncEngineOpts {
pub client_name: String,
@@ -40,11 +50,38 @@ pub struct DatabaseSyncEngineOpts {
pub protocol_version_hint: DatabaseSyncEngineProtocolVersion,
pub bootstrap_if_empty: bool,
pub reserved_bytes: usize,
pub partial_bootstrap_strategy: Option<PartialBootstrapStrategy>,
}
pub struct DataStats {
pub written_bytes: AtomicUsize,
pub read_bytes: AtomicUsize,
}
impl Default for DataStats {
fn default() -> Self {
Self::new()
}
}
impl DataStats {
pub fn new() -> Self {
Self {
written_bytes: AtomicUsize::new(0),
read_bytes: AtomicUsize::new(0),
}
}
pub fn write(&self, size: usize) {
self.written_bytes.fetch_add(size, Ordering::SeqCst);
}
pub fn read(&self, size: usize) {
self.read_bytes.fetch_add(size, Ordering::SeqCst);
}
}
pub struct DatabaseSyncEngine<P: ProtocolIO> {
io: Arc<dyn turso_core::IO>,
protocol: Arc<P>,
protocol: ProtocolIoStats<P>,
db_file: Arc<dyn turso_core::storage::database::DatabaseStorage>,
main_tape: DatabaseTape,
main_db_wal_path: String,
@@ -68,7 +105,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
io: Arc<dyn turso_core::IO>,
protocol: Arc<P>,
main_db_path: &str,
opts: DatabaseSyncEngineOpts,
mut opts: DatabaseSyncEngineOpts,
) -> Result<Self> {
let main_db_wal_path = format!("{main_db_path}-wal");
let revert_db_wal_path = format!("{main_db_path}-wal-revert");
@@ -78,12 +115,15 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
tracing::info!("init(path={}): opts={:?}", main_db_path, opts);
let completion = protocol.full_read(&meta_path)?;
let data = wait_all_results(coro, &completion).await?;
let data = wait_all_results(coro, &completion, None).await?;
let meta = if data.is_empty() {
None
} else {
Some(DatabaseMetadata::load(&data)?)
};
let protocol = ProtocolIoStats::new(protocol);
let partial_bootstrap_strategy = opts.partial_bootstrap_strategy.take();
let partial = partial_bootstrap_strategy.is_some();
let meta = match meta {
Some(meta) => meta,
@@ -91,27 +131,33 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4());
let revision = bootstrap_db_file(
coro,
protocol.as_ref(),
&protocol,
&io,
main_db_path,
opts.protocol_version_hint,
partial_bootstrap_strategy,
)
.await?;
let meta = DatabaseMetadata {
version: DATABASE_METADATA_VERSION.to_string(),
client_unique_id,
synced_revision: Some(revision),
synced_revision: Some(revision.clone()),
revert_since_wal_salt: None,
revert_since_wal_watermark: 0,
last_pushed_change_id_hint: 0,
last_pushed_pull_gen_hint: 0,
last_pull_unix_time: Some(io.now().secs),
last_push_unix_time: None,
partial_bootstrap_server_revision: if partial {
Some(revision.clone())
} else {
None
},
};
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
let completion = protocol.full_write(&meta_path, meta.dump()?)?;
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
wait_all_results(coro, &completion).await?;
wait_all_results(coro, &completion, None).await?;
meta
}
None => {
@@ -120,6 +166,11 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
"deferred bootstrap is not supported for legacy protocol".to_string(),
));
}
if partial {
return Err(Error::DatabaseSyncEngineError(
"deferred bootstrap is not supported for partial sync".to_string(),
));
}
let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4());
let meta = DatabaseMetadata {
version: DATABASE_METADATA_VERSION.to_string(),
@@ -131,11 +182,12 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
last_pushed_pull_gen_hint: 0,
last_pull_unix_time: None,
last_push_unix_time: None,
partial_bootstrap_server_revision: None,
};
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
let completion = protocol.full_write(&meta_path, meta.dump()?)?;
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
wait_all_results(coro, &completion).await?;
wait_all_results(coro, &completion, None).await?;
meta
}
};
@@ -156,7 +208,27 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
}
let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?;
let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file));
let db_file: Arc<dyn DatabaseStorage> = if partial {
let Some(partial_bootstrap_server_revision) = &meta.partial_bootstrap_server_revision
else {
return Err(Error::DatabaseSyncEngineError(
"partial_bootstrap_server_revision must be set in the metadata".to_string(),
));
};
let DatabasePullRevision::V1 { revision } = &partial_bootstrap_server_revision else {
return Err(Error::DatabaseSyncEngineError(
"partial sync is supported only for V1 protocol".to_string(),
));
};
Arc::new(LazyDatabaseStorage::new(
db_file,
None, // todo(sivukhin): allocate dirty file for FS IO
protocol.clone(),
revision.to_string(),
))
} else {
Arc::new(turso_core::storage::database::DatabaseFile::new(db_file))
};
let main_db = turso_core::Database::open_with_flags(
io.clone(),
@@ -304,6 +376,16 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
last_pull_unix_time,
last_push_unix_time,
revision,
network_sent_bytes: self
.protocol
.network_stats
.written_bytes
.load(Ordering::SeqCst),
network_received_bytes: self
.protocol
.network_stats
.read_bytes
.load(Ordering::SeqCst),
})
}
@@ -420,7 +502,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
let revision = self.meta().synced_revision.clone();
let next_revision = wal_pull_to_file(
coro,
self.protocol.as_ref(),
&self.protocol,
&file.value,
&revision,
self.opts.wal_pull_batch_size,
@@ -669,13 +751,8 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
let mut transformed = if self.opts.use_transform {
Some(
apply_transformation(
coro,
self.protocol.as_ref(),
&local_changes,
&replay.generator,
)
.await?,
apply_transformation(coro, &self.protocol, &local_changes, &replay.generator)
.await?,
)
} else {
None
@@ -717,7 +794,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
let (_, change_id) = push_logical_changes(
coro,
self.protocol.as_ref(),
&self.protocol,
&self.main_tape,
&self.client_unique_id,
&self.opts,
@@ -770,7 +847,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
tracing::info!("update_meta: {meta:?}");
let completion = self.protocol.full_write(&self.meta_path, meta.dump()?)?;
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
wait_all_results(coro, &completion).await?;
wait_all_results(coro, &completion, None).await?;
*self.meta.lock().unwrap() = meta;
Ok(())
}

View File

@@ -0,0 +1,247 @@
use std::sync::Arc;
use turso_core::{Buffer, Completion, CompletionError, DatabaseStorage, File, LimboError};
use crate::{
database_sync_operations::{pull_pages_v1, ProtocolIoStats, PAGE_SIZE},
errors,
protocol_io::ProtocolIO,
types::Coro,
};
pub struct LazyDatabaseStorage<P: ProtocolIO> {
clean_file: Arc<dyn File>,
dirty_file: Option<Arc<dyn File>>,
protocol: ProtocolIoStats<P>,
server_revision: String,
}
impl<P: ProtocolIO> LazyDatabaseStorage<P> {
pub fn new(
clean_file: Arc<dyn File>,
dirty_file: Option<Arc<dyn File>>,
protocol: ProtocolIoStats<P>,
server_revision: String,
) -> Self {
Self {
clean_file,
dirty_file,
protocol,
server_revision,
}
}
}
impl<P: ProtocolIO> DatabaseStorage for LazyDatabaseStorage<P> {
fn read_header(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
assert!(
!self.clean_file.has_hole(0, PAGE_SIZE)?,
"first page must be filled"
);
self.clean_file.pread(0, c)
}
fn read_page(
&self,
page_idx: usize,
io_ctx: &turso_core::IOContext,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
assert!(
io_ctx.encryption_context().is_none(),
"encryption or checksum are not supported with partial sync"
);
assert!(page_idx as i64 >= 0, "page should be positive");
let r = c.as_read();
let size = r.buf().len();
assert!(page_idx > 0);
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
return Err(LimboError::NotADB);
}
let Some(pos) = (page_idx as u64 - 1).checked_mul(size as u64) else {
return Err(LimboError::IntegerOverflow);
};
if !self.clean_file.has_hole(pos as usize, size)? {
let Some(dirty_file) = &self.dirty_file else {
// no dirty file was set - this means that FS is atomic (e.g. MemoryIO)
return self.clean_file.pread(pos, c);
};
if dirty_file.has_hole(pos as usize, size)? {
// dirty file has no hole - this means that we cleanly removed the hole when we wrote to the clean file
return self.clean_file.pread(pos, c);
}
let check_buffer = Arc::new(Buffer::new_temporary(size));
let check_c =
dirty_file.pread(pos, Completion::new_read(check_buffer.clone(), |_| {}))?;
assert!(
check_c.finished(),
"LazyDatabaseStorage works only with sync IO"
);
let clean_buffer = r.buf_arc();
let clean_c = self
.clean_file
.pread(pos, Completion::new_read(clean_buffer.clone(), |_| {}))?;
assert!(
clean_c.finished(),
"LazyDatabaseStorage works only with sync IO"
);
if check_buffer.as_slice().eq(clean_buffer.as_slice()) {
// dirty buffer matches clean buffer - this means that clean data is valid
return self.clean_file.pread(pos, c);
}
}
tracing::info!(
"PartialDatabaseStorage::read_page(page_idx={}): read page from the remote server",
page_idx
);
let mut generator = genawaiter::sync::Gen::new({
let protocol = self.protocol.clone();
let server_revision = self.server_revision.clone();
let clean_file = self.clean_file.clone();
let dirty_file = self.dirty_file.clone();
let c = c.clone();
|coro| async move {
let coro = Coro::new((), coro);
let pages = [(page_idx - 1) as u32];
let result = pull_pages_v1(&coro, &protocol, &server_revision, &pages).await;
match result {
Ok(page) => {
let read = c.as_read();
let buf = read.buf_arc();
buf.as_mut_slice().copy_from_slice(&page);
if let Some(dirty_file) = &dirty_file {
let dirty_c = dirty_file.pwrite(
pos,
buf.clone(),
Completion::new_write(|_| {}),
)?;
assert!(
dirty_c.finished(),
"LazyDatabaseStorage works only with sync IO"
);
}
let clean_c =
clean_file.pwrite(pos, buf.clone(), Completion::new_write(|_| {}))?;
assert!(
clean_c.finished(),
"LazyDatabaseStorage works only with sync IO"
);
if let Some(dirty_file) = &dirty_file {
dirty_file.punch_hole(pos as usize, buf.len())?;
}
c.complete(buf.len() as i32);
Ok::<(), errors::Error>(())
}
Err(err) => {
tracing::error!("failed to fetch path from remote server: {err}");
c.error(CompletionError::IOError(std::io::ErrorKind::Other));
Err(err)
}
}
}
});
self.protocol
.add_work(Box::new(move || match generator.resume_with(Ok(())) {
genawaiter::GeneratorState::Yielded(_) => false,
genawaiter::GeneratorState::Complete(_) => true,
}));
Ok(c)
}
fn write_page(
&self,
page_idx: usize,
buffer: std::sync::Arc<turso_core::Buffer>,
io_ctx: &turso_core::IOContext,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
assert!(
io_ctx.encryption_context().is_none(),
"encryption or checksum are not supported with partial sync"
);
let buffer_size = buffer.len();
assert!(page_idx > 0);
assert!(buffer_size >= 512);
assert!(buffer_size <= 65536);
assert_eq!(buffer_size & (buffer_size - 1), 0);
let Some(pos) = (page_idx as u64 - 1).checked_mul(buffer_size as u64) else {
return Err(LimboError::IntegerOverflow);
};
// we write to the database only during checkpoint - so we need to punch hole in the dirty file in order to mark this region as valid
if let Some(dirty_file) = &self.dirty_file {
dirty_file.punch_hole(pos as usize, buffer_size)?;
}
self.clean_file.pwrite(pos, buffer, c)
}
fn write_pages(
&self,
first_page_idx: usize,
page_size: usize,
buffers: Vec<std::sync::Arc<turso_core::Buffer>>,
io_ctx: &turso_core::IOContext,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
assert!(
io_ctx.encryption_context().is_none(),
"encryption or checksum are not supported with partial sync"
);
assert!(first_page_idx > 0);
assert!(page_size >= 512);
assert!(page_size <= 65536);
assert_eq!(page_size & (page_size - 1), 0);
let Some(pos) = (first_page_idx as u64 - 1).checked_mul(page_size as u64) else {
return Err(LimboError::IntegerOverflow);
};
// we write to the database only during checkpoint - so we need to punch hole in the dirty file in order to mark this region as valid
if let Some(dirty_file) = &self.dirty_file {
let buffers_size = buffers.iter().map(|b| b.len()).sum();
dirty_file.punch_hole(pos as usize, buffers_size)?;
}
let c = self.clean_file.pwritev(pos, buffers, c)?;
Ok(c)
}
fn sync(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
if let Some(dirty_file) = &self.dirty_file {
let dirty_c = dirty_file.sync(Completion::new_sync(|_| {}))?;
assert!(
dirty_c.finished(),
"LazyDatabaseStorage works only with sync IO"
);
}
self.clean_file.sync(c)
}
fn size(&self) -> turso_core::Result<u64> {
self.clean_file.size()
}
fn truncate(
&self,
len: usize,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
if let Some(dirty_file) = &self.dirty_file {
let dirty_c = dirty_file.truncate(len as u64, Completion::new_trunc(|_| {}))?;
assert!(
dirty_c.finished(),
"LazyDatabaseStorage works only with sync IO"
);
}
self.clean_file.truncate(len as u64, c)
}
}

View File

@@ -1,7 +1,11 @@
use std::sync::{Arc, Mutex};
use std::{
ops::Deref,
sync::{Arc, Mutex},
};
use bytes::BytesMut;
use prost::Message;
use roaring::RoaringBitmap;
use turso_core::{
types::{Text, WalFrameInfo},
Buffer, Completion, LimboError, OpenFlags, Value,
@@ -9,7 +13,7 @@ use turso_core::{
use crate::{
database_replay_generator::DatabaseReplayGenerator,
database_sync_engine::DatabaseSyncEngineOpts,
database_sync_engine::{DataStats, DatabaseSyncEngineOpts, PartialBootstrapStrategy},
database_tape::{
run_stmt_expect_one_row, run_stmt_ignore_rows, DatabaseChangesIteratorMode,
DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession,
@@ -59,6 +63,37 @@ pub(crate) fn acquire_slot<T: Clone>(slot: &Arc<Mutex<Option<T>>>) -> Result<Mut
})
}
pub struct ProtocolIoStats<P: ProtocolIO> {
pub protocol: Arc<P>,
pub network_stats: Arc<DataStats>,
}
impl<P: ProtocolIO> ProtocolIoStats<P> {
pub fn new(protocol: Arc<P>) -> Self {
Self {
protocol,
network_stats: Arc::new(DataStats::new()),
}
}
}
impl<P: ProtocolIO> Clone for ProtocolIoStats<P> {
fn clone(&self) -> Self {
Self {
protocol: self.protocol.clone(),
network_stats: self.network_stats.clone(),
}
}
}
impl<P: ProtocolIO> Deref for ProtocolIoStats<P> {
type Target = P;
fn deref(&self) -> &Self::Target {
&self.protocol
}
}
enum WalHttpPullResult<C: DataCompletion<u8>> {
Frames(C),
NeedCheckpoint(DbSyncStatus),
@@ -78,7 +113,7 @@ pub fn connect_untracked(tape: &DatabaseTape) -> Result<Arc<turso_core::Connecti
/// Bootstrap multiple DB files from latest generation from remote
pub async fn db_bootstrap<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
db: Arc<dyn turso_core::File>,
) -> Result<DbSyncInfo> {
tracing::info!("db_bootstrap");
@@ -89,8 +124,10 @@ pub async fn db_bootstrap<C: ProtocolIO, Ctx>(
let mut pos = 0;
loop {
while let Some(chunk) = content.poll_data()? {
client.network_stats.read(chunk.data().len());
let chunk = chunk.data();
let content_len = chunk.len();
// todo(sivukhin): optimize allocations here
#[allow(clippy::arc_with_non_send_sync)]
let buffer = Arc::new(Buffer::new_temporary(chunk.len()));
@@ -163,7 +200,7 @@ pub async fn wal_apply_from_file<Ctx>(
pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
frames_file: &Arc<dyn turso_core::File>,
revision: &Option<DatabasePullRevision>,
wal_pull_batch_size: u64,
@@ -206,7 +243,7 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
/// Pull updates from remote to the separate file
pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
frames_file: &Arc<dyn turso_core::File>,
revision: &str,
long_poll_timeout: Option<std::time::Duration>,
@@ -219,10 +256,12 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
server_revision: String::new(),
client_revision: revision.to_string(),
long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_millis() as u32).unwrap_or(0),
server_pages: BytesMut::new().into(),
server_pages_selector: BytesMut::new().into(),
server_query_selector: String::new(),
client_pages: BytesMut::new().into(),
};
let request = request.encode_to_vec();
client.network_stats.write(request.len());
let completion = client.http(
"POST",
"/pull-updates",
@@ -232,8 +271,13 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
("accept-encoding", "application/protobuf"),
],
)?;
let Some(header) =
wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(coro, &completion, &mut bytes).await?
let Some(header) = wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(
coro,
&completion,
&client.network_stats,
&mut bytes,
)
.await?
else {
return Err(Error::DatabaseSyncEngineError(
"no header returned in the pull-updates protobuf call".to_string(),
@@ -246,7 +290,8 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
let buffer = Arc::new(Buffer::new_temporary(WAL_FRAME_SIZE));
let mut page_data_opt =
wait_proto_message::<Ctx, PageData>(coro, &completion, &mut bytes).await?;
wait_proto_message::<Ctx, PageData>(coro, &completion, &client.network_stats, &mut bytes)
.await?;
while let Some(page_data) = page_data_opt.take() {
let page_id = page_data.page_id;
tracing::info!("received page {}", page_id);
@@ -259,7 +304,8 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
)));
}
buffer.as_mut_slice()[WAL_FRAME_HEADER..].copy_from_slice(&page);
page_data_opt = wait_proto_message(coro, &completion, &mut bytes).await?;
page_data_opt =
wait_proto_message(coro, &completion, &client.network_stats, &mut bytes).await?;
let mut frame_info = WalFrameInfo {
db_size: 0,
page_no: page_id as u32 + 1,
@@ -298,10 +344,87 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
})
}
/// Pull pages from remote
pub async fn pull_pages_v1<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &ProtocolIoStats<C>,
server_revision: &str,
pages: &[u32],
) -> Result<Vec<u8>> {
tracing::info!("pull_pages_v1: revision={server_revision}, pages={pages:?}");
let mut bytes = BytesMut::new();
let mut bitmap = RoaringBitmap::new();
bitmap.extend(pages);
let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size());
bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| {
Error::DatabaseSyncEngineError(format!("unable to serialize pull page request: {e}"))
})?;
let request = PullUpdatesReqProtoBody {
encoding: PageUpdatesEncodingReq::Raw as i32,
server_revision: server_revision.to_string(),
client_revision: String::new(),
long_poll_timeout_ms: 0,
server_pages_selector: bitmap_bytes.into(),
server_query_selector: String::new(),
client_pages: BytesMut::new().into(),
};
let request = request.encode_to_vec();
client.network_stats.write(request.len());
let completion = client.http(
"POST",
"/pull-updates",
Some(request),
&[
("content-type", "application/protobuf"),
("accept-encoding", "application/protobuf"),
],
)?;
let Some(header) = wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(
coro,
&completion,
&client.network_stats,
&mut bytes,
)
.await?
else {
return Err(Error::DatabaseSyncEngineError(
"no header returned in the pull-updates protobuf call".to_string(),
));
};
tracing::info!("pull_pages_v1: got header={:?}", header);
let mut pages = Vec::with_capacity(PAGE_SIZE * pages.len());
let mut page_data_opt =
wait_proto_message::<Ctx, PageData>(coro, &completion, &client.network_stats, &mut bytes)
.await?;
while let Some(page_data) = page_data_opt.take() {
let page_id = page_data.page_id;
tracing::info!("received page {}", page_id);
let page = decode_page(&header, page_data)?;
if page.len() != PAGE_SIZE {
return Err(Error::DatabaseSyncEngineError(format!(
"page has unexpected size: {} != {}",
page.len(),
PAGE_SIZE
)));
}
pages.extend_from_slice(&page);
page_data_opt =
wait_proto_message(coro, &completion, &client.network_stats, &mut bytes).await?;
tracing::info!("page_data_opt: {}", page_data_opt.is_some());
}
Ok(pages)
}
/// Pull updates from remote to the separate file
pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
frames_file: &Arc<dyn turso_core::File>,
mut generation: u64,
mut start_frame: u64,
@@ -339,7 +462,9 @@ pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
};
loop {
while let Some(chunk) = data.poll_data()? {
client.network_stats.read(chunk.data().len());
let mut chunk = chunk.data();
while !chunk.is_empty() {
let to_fill = (WAL_FRAME_SIZE - buffer_len).min(chunk.len());
buffer.as_mut_slice()[buffer_len..buffer_len + to_fill]
@@ -425,7 +550,7 @@ pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
/// and can be called multiple times with same frame range
pub async fn wal_push<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
wal_session: &mut WalSession,
baton: Option<String>,
generation: u64,
@@ -630,7 +755,7 @@ pub async fn read_last_change_id<Ctx>(
pub async fn fetch_last_change_id<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
source_conn: &Arc<turso_core::Connection>,
client_id: &str,
) -> Result<(i64, Option<i64>)> {
@@ -710,7 +835,7 @@ pub async fn fetch_last_change_id<C: ProtocolIO, Ctx>(
pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
source: &DatabaseTape,
client_id: &str,
opts: &DatabaseSyncEngineOpts,
@@ -929,9 +1054,9 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
Ok((source_pull_gen, last_change_id.unwrap_or(0)))
}
pub async fn apply_transformation<Ctx, P: ProtocolIO>(
pub async fn apply_transformation<Ctx, C: ProtocolIO>(
coro: &Coro<Ctx>,
client: &P,
client: &ProtocolIoStats<C>,
changes: &Vec<DatabaseTapeRowChange>,
generator: &DatabaseReplayGenerator,
) -> Result<Vec<DatabaseRowTransformResult>> {
@@ -941,7 +1066,7 @@ pub async fn apply_transformation<Ctx, P: ProtocolIO>(
mutations.push(generator.create_mutation(&replay_info, change)?);
}
let completion = client.transform(mutations)?;
let transformed = wait_all_results(coro, &completion).await?;
let transformed = wait_all_results(coro, &completion, None).await?;
if transformed.len() != changes.len() {
return Err(Error::DatabaseSyncEngineError(format!(
"unexpected result from custom transformation: mismatch in shapes: {} != {}",
@@ -1001,39 +1126,81 @@ pub async fn checkpoint_wal_file<Ctx>(
pub async fn bootstrap_db_file<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
io: &Arc<dyn turso_core::IO>,
main_db_path: &str,
protocol: DatabaseSyncEngineProtocolVersion,
partial_bootstrap_strategy: Option<PartialBootstrapStrategy>,
) -> Result<DatabasePullRevision> {
match protocol {
DatabaseSyncEngineProtocolVersion::Legacy => {
if partial_bootstrap_strategy.is_some() {
return Err(Error::DatabaseSyncEngineError(
"can't bootstrap prefix of database with legacy protocol".to_string(),
));
}
bootstrap_db_file_legacy(coro, client, io, main_db_path).await
}
DatabaseSyncEngineProtocolVersion::V1 => {
bootstrap_db_file_v1(coro, client, io, main_db_path).await
bootstrap_db_file_v1(coro, client, io, main_db_path, partial_bootstrap_strategy).await
}
}
}
pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
io: &Arc<dyn turso_core::IO>,
main_db_path: &str,
bootstrap: Option<PartialBootstrapStrategy>,
) -> Result<DatabasePullRevision> {
let mut bytes = BytesMut::new();
let server_pages_selector = if let Some(PartialBootstrapStrategy::Prefix { length }) =
&bootstrap
{
let mut bitmap = RoaringBitmap::new();
bitmap.insert_range(0..(*length / PAGE_SIZE) as u32);
let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size());
bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| {
Error::DatabaseSyncEngineError(format!("unable to serialize bootstrap request: {e}"))
})?;
bitmap_bytes
} else {
Vec::new()
};
let server_query_selector = if let Some(PartialBootstrapStrategy::Query { query }) = bootstrap {
query
} else {
String::new()
};
let request = PullUpdatesReqProtoBody {
encoding: PageUpdatesEncodingReq::Raw as i32,
server_revision: String::new(),
client_revision: String::new(),
long_poll_timeout_ms: 0,
server_pages_selector: server_pages_selector.into(),
server_query_selector,
client_pages: BytesMut::new().into(),
};
let request = request.encode_to_vec();
client.network_stats.write(request.len());
let completion = client.http(
"GET",
"POST",
"/pull-updates",
None,
Some(request),
&[
("content-type", "application/protobuf"),
("accept-encoding", "application/protobuf"),
],
)?;
let Some(header) =
wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(coro, &completion, &mut bytes).await?
let mut bytes = BytesMut::new();
let Some(header) = wait_proto_message::<Ctx, PullUpdatesRespProtoBody>(
coro,
&completion,
&client.network_stats,
&mut bytes,
)
.await?
else {
return Err(Error::DatabaseSyncEngineError(
"no header returned in the pull-updates protobuf call".to_string(),
@@ -1059,8 +1226,13 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
#[allow(clippy::arc_with_non_send_sync)]
let buffer = Arc::new(Buffer::new_temporary(PAGE_SIZE));
while let Some(page_data) =
wait_proto_message::<Ctx, PageData>(coro, &completion, &mut bytes).await?
wait_proto_message::<Ctx, PageData>(coro, &completion, &client.network_stats, &mut bytes)
.await?
{
tracing::info!(
"bootstrap_db_file: received page page_id={}",
page_data.page_id
);
let offset = page_data.page_id * PAGE_SIZE as u64;
let page = decode_page(&header, page_data)?;
if page.len() != PAGE_SIZE {
@@ -1110,7 +1282,7 @@ fn decode_page(header: &PullUpdatesRespProtoBody, page_data: PageData) -> Result
pub async fn bootstrap_db_file_legacy<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
io: &Arc<dyn turso_core::IO>,
main_db_path: &str,
) -> Result<DatabasePullRevision> {
@@ -1169,17 +1341,20 @@ pub async fn reset_wal_file<Ctx>(
async fn sql_execute_http<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
request: server_proto::PipelineReqBody,
) -> Result<Vec<StmtResult>> {
let body = serde_json::to_vec(&request)?;
client.network_stats.write(body.len());
let completion = client.http("POST", "/v2/pipeline", Some(body), &[])?;
let status = wait_status(coro, &completion).await?;
if status != http::StatusCode::OK {
let error = format!("sql_execute_http: unexpected status code: {status}");
return Err(Error::DatabaseSyncEngineError(error));
}
let response = wait_all_results(coro, &completion).await?;
let response = wait_all_results(coro, &completion, Some(&client.network_stats)).await?;
let response: server_proto::PipelineRespBody = serde_json::from_slice(&response)?;
tracing::debug!("hrana response: {:?}", response);
let mut results = Vec::new();
@@ -1217,7 +1392,7 @@ async fn sql_execute_http<C: ProtocolIO, Ctx>(
async fn wal_pull_http<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
generation: u64,
start_frame: u64,
end_frame: u64,
@@ -1230,7 +1405,7 @@ async fn wal_pull_http<C: ProtocolIO, Ctx>(
)?;
let status = wait_status(coro, &completion).await?;
if status == http::StatusCode::BAD_REQUEST {
let status_body = wait_all_results(coro, &completion).await?;
let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?;
let status: DbSyncStatus = serde_json::from_slice(&status_body)?;
if status.status == "checkpoint_needed" {
return Ok(WalHttpPullResult::NeedCheckpoint(status));
@@ -1248,7 +1423,7 @@ async fn wal_pull_http<C: ProtocolIO, Ctx>(
async fn wal_push_http<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
baton: Option<String>,
generation: u64,
start_frame: u64,
@@ -1258,6 +1433,8 @@ async fn wal_push_http<C: ProtocolIO, Ctx>(
let baton = baton
.map(|baton| format!("/{baton}"))
.unwrap_or("".to_string());
client.network_stats.write(frames.len());
let completion = client.http(
"POST",
&format!("/sync/{generation}/{start_frame}/{end_frame}{baton}"),
@@ -1265,7 +1442,7 @@ async fn wal_push_http<C: ProtocolIO, Ctx>(
&[],
)?;
let status = wait_status(coro, &completion).await?;
let status_body = wait_all_results(coro, &completion).await?;
let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?;
if status != http::StatusCode::OK {
let error = std::str::from_utf8(&status_body).ok().unwrap_or("");
return Err(Error::DatabaseSyncEngineError(format!(
@@ -1275,10 +1452,13 @@ async fn wal_push_http<C: ProtocolIO, Ctx>(
Ok(serde_json::from_slice(&status_body)?)
}
async fn db_info_http<C: ProtocolIO, Ctx>(coro: &Coro<Ctx>, client: &C) -> Result<DbSyncInfo> {
async fn db_info_http<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &ProtocolIoStats<C>,
) -> Result<DbSyncInfo> {
let completion = client.http("GET", "/info", None, &[])?;
let status = wait_status(coro, &completion).await?;
let status_body = wait_all_results(coro, &completion).await?;
let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?;
if status != http::StatusCode::OK {
return Err(Error::DatabaseSyncEngineError(format!(
"db_info go unexpected status: {status}"
@@ -1289,7 +1469,7 @@ async fn db_info_http<C: ProtocolIO, Ctx>(coro: &Coro<Ctx>, client: &C) -> Resul
async fn db_bootstrap_http<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
client: &ProtocolIoStats<C>,
generation: u64,
) -> Result<C::DataCompletionBytes> {
let completion = client.http("GET", &format!("/export/{generation}"), None, &[])?;
@@ -1335,6 +1515,7 @@ pub fn read_varint(buf: &[u8]) -> Result<Option<(usize, usize)>> {
pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
coro: &Coro<Ctx>,
completion: &impl DataCompletion<u8>,
network_stats: &DataStats,
bytes: &mut BytesMut,
) -> Result<Option<T>> {
let start_time = std::time::Instant::now();
@@ -1346,6 +1527,7 @@ pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
};
if not_enough_bytes {
if let Some(poll) = completion.poll_data()? {
network_stats.read(poll.data().len());
bytes.extend_from_slice(poll.data());
} else if !completion.is_done()? {
coro.yield_(ProtocolCommand::IO).await?;
@@ -1374,10 +1556,12 @@ pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
pub async fn wait_all_results<Ctx, T: Clone>(
coro: &Coro<Ctx>,
completion: &impl DataCompletion<T>,
stats: Option<&DataStats>,
) -> Result<Vec<T>> {
let mut results = Vec::new();
loop {
while let Some(poll) = completion.poll_data()? {
stats.inspect(|s| s.read(poll.data().len()));
results.extend_from_slice(poll.data());
}
if completion.is_done()? {
@@ -1396,6 +1580,7 @@ mod tests {
use prost::Message;
use crate::{
database_sync_engine::DataStats,
database_sync_operations::wait_proto_message,
protocol_io::{DataCompletion, DataPollResult},
server_proto::PageData,
@@ -1416,6 +1601,8 @@ mod tests {
chunk: usize,
}
unsafe impl Sync for TestCompletion {}
impl DataCompletion<u8> for TestCompletion {
type DataPollResult = TestPollResult;
fn status(&self) -> crate::Result<Option<u16>> {
@@ -1457,9 +1644,15 @@ mod tests {
let coro: Coro<()> = coro.into();
let mut bytes = BytesMut::new();
let mut count = 0;
while wait_proto_message::<(), PageData>(&coro, &completion, &mut bytes)
.await?
.is_some()
let network_stats = DataStats::new();
while wait_proto_message::<(), PageData>(
&coro,
&completion,
&network_stats,
&mut bytes,
)
.await?
.is_some()
{
assert!(bytes.capacity() <= 16 * 1024 + 1024);
count += 1;

View File

@@ -1,5 +1,6 @@
pub mod database_replay_generator;
pub mod database_sync_engine;
pub mod database_sync_lazy_storage;
pub mod database_sync_operations;
pub mod database_tape;
pub mod errors;
@@ -9,6 +10,9 @@ pub mod server_proto;
pub mod types;
pub mod wal_session;
#[cfg(target_os = "linux")]
pub mod sparse_io;
pub type Result<T> = std::result::Result<T, errors::Error>;
#[cfg(test)]

View File

@@ -3,18 +3,18 @@ use crate::{
Result,
};
pub trait DataPollResult<T> {
pub trait DataPollResult<T>: Send + Sync + 'static {
fn data(&self) -> &[T];
}
pub trait DataCompletion<T> {
pub trait DataCompletion<T>: Send + Sync + 'static {
type DataPollResult: DataPollResult<T>;
fn status(&self) -> Result<Option<u16>>;
fn poll_data(&self) -> Result<Option<Self::DataPollResult>>;
fn is_done(&self) -> Result<bool>;
}
pub trait ProtocolIO {
pub trait ProtocolIO: Send + Sync + 'static {
type DataCompletionBytes: DataCompletion<u8>;
type DataCompletionTransform: DataCompletion<DatabaseRowTransformResult>;
fn full_read(&self, path: &str) -> Result<Self::DataCompletionBytes>;
@@ -30,4 +30,6 @@ pub trait ProtocolIO {
body: Option<Vec<u8>>,
headers: &[(&str, &str)],
) -> Result<Self::DataCompletionBytes>;
fn add_work(&self, callback: Box<dyn FnMut() -> bool + Send>);
fn step_work(&self);
}

View File

@@ -23,7 +23,9 @@ pub struct PullUpdatesReqProtoBody {
#[prost(uint32, tag = "4")]
pub long_poll_timeout_ms: u32,
#[prost(bytes, tag = "5")]
pub server_pages: Bytes,
pub server_pages_selector: Bytes,
#[prost(string, tag = "7")]
pub server_query_selector: String,
#[prost(bytes, tag = "6")]
pub client_pages: Bytes,
}

View File

@@ -0,0 +1,205 @@
use std::{
os::{fd::AsRawFd, unix::fs::FileExt},
sync::{Arc, RwLock},
};
use tracing::{instrument, Level};
use turso_core::{
io::clock::DefaultClock, Buffer, Clock, Completion, File, Instant, OpenFlags, Result, IO,
};
pub struct SparseLinuxIo {}
impl SparseLinuxIo {
pub fn new() -> Result<Self> {
Ok(Self {})
}
}
impl IO for SparseLinuxIo {
#[instrument(err, skip_all, level = Level::TRACE)]
fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result<Arc<dyn File>> {
let mut file = std::fs::File::options();
file.read(true);
if !flags.contains(OpenFlags::ReadOnly) {
file.write(true);
file.create(flags.contains(OpenFlags::Create));
}
let file = file.open(path)?;
Ok(Arc::new(SparseLinuxFile {
file: RwLock::new(file),
}))
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn remove_file(&self, path: &str) -> Result<()> {
Ok(std::fs::remove_file(path)?)
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn step(&self) -> Result<()> {
Ok(())
}
}
impl Clock for SparseLinuxIo {
fn now(&self) -> Instant {
DefaultClock.now()
}
}
pub struct SparseLinuxFile {
file: RwLock<std::fs::File>,
}
#[allow(clippy::readonly_write_lock)]
impl File for SparseLinuxFile {
#[instrument(err, skip_all, level = Level::TRACE)]
fn lock_file(&self, _exclusive: bool) -> Result<()> {
Ok(())
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn unlock_file(&self) -> Result<()> {
Ok(())
}
#[instrument(skip(self, c), level = Level::TRACE)]
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
let file = self.file.read().unwrap();
let nr = {
let r = c.as_read();
let buf = r.buf();
let buf = buf.as_mut_slice();
file.read_exact_at(buf, pos)?;
buf.len() as i32
};
c.complete(nr);
Ok(c)
}
#[instrument(skip(self, c, buffer), level = Level::TRACE)]
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
let file = self.file.write().unwrap();
let buf = buffer.as_slice();
file.write_all_at(buf, pos)?;
c.complete(buffer.len() as i32);
Ok(c)
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn sync(&self, c: Completion) -> Result<Completion> {
let file = self.file.write().unwrap();
file.sync_all()?;
c.complete(0);
Ok(c)
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
let file = self.file.write().unwrap();
file.set_len(len)?;
c.complete(0);
Ok(c)
}
fn size(&self) -> Result<u64> {
let file = self.file.read().unwrap();
Ok(file.metadata()?.len())
}
fn has_hole(&self, pos: usize, len: usize) -> turso_core::Result<bool> {
let file = self.file.read().unwrap();
// SEEK_DATA: Adjust the file offset to the next location in the file
// greater than or equal to offset containing data. If offset
// points to data, then the file offset is set to offset
// (see https://man7.org/linux/man-pages/man2/lseek.2.html#DESCRIPTION)
let res = unsafe { libc::lseek(file.as_raw_fd(), pos as i64, libc::SEEK_DATA) };
if res == -1 {
let errno = unsafe { *libc::__errno_location() };
if errno == libc::ENXIO {
// ENXIO: whence is SEEK_DATA or SEEK_HOLE, and offset is beyond the
// end of the file, or whence is SEEK_DATA and offset is
// within a hole at the end of the file.
// (see https://man7.org/linux/man-pages/man2/lseek.2.html#ERRORS)
return Ok(true);
} else {
return Err(turso_core::LimboError::CompletionError(
turso_core::CompletionError::IOError(
std::io::Error::from_raw_os_error(errno).kind(),
),
));
}
}
// lseek succeeded - the hole is here if next data is strictly before pos + len - 1 (the last byte of the checked region
Ok(res as usize >= pos + len)
}
fn punch_hole(&self, pos: usize, len: usize) -> turso_core::Result<()> {
let file = self.file.write().unwrap();
let res = unsafe {
libc::fallocate(
file.as_raw_fd(),
libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
pos as i64,
len as i64,
)
};
if res == -1 {
let errno = unsafe { *libc::__errno_location() };
Err(turso_core::LimboError::CompletionError(
turso_core::CompletionError::IOError(
std::io::Error::from_raw_os_error(errno).kind(),
),
))
} else {
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use turso_core::{Buffer, Completion, OpenFlags, IO};
use crate::sparse_io::SparseLinuxIo;
#[test]
pub fn sparse_io_test() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let tmp_path = tmp.into_temp_path();
let tmp_path = tmp_path.as_os_str().to_str().unwrap();
let io = SparseLinuxIo::new().unwrap();
let file = io.open_file(tmp_path, OpenFlags::default(), false).unwrap();
let _ = file
.truncate(1024 * 1024, Completion::new_trunc(|_| {}))
.unwrap();
assert!(file.has_hole(0, 4096).unwrap());
let buffer = Arc::new(Buffer::new_temporary(4096));
buffer.as_mut_slice().fill(1);
let _ = file
.pwrite(0, buffer.clone(), Completion::new_write(|_| {}))
.unwrap();
assert!(!file.has_hole(0, 4096).unwrap());
assert!(file.has_hole(4096, 4096).unwrap());
assert!(file.has_hole(4096 * 2, 4096).unwrap());
let _ = file
.pwrite(4096 * 2, buffer.clone(), Completion::new_write(|_| {}))
.unwrap();
assert!(file.has_hole(4096, 4096).unwrap());
assert!(!file.has_hole(4096 * 2, 4096).unwrap());
assert!(!file.has_hole(4096, 4097).unwrap());
file.punch_hole(2 * 4096, 4096).unwrap();
assert!(file.has_hole(4096 * 2, 4096).unwrap());
assert!(file.has_hole(4096, 4097).unwrap());
}
}

View File

@@ -1,24 +1,27 @@
use std::{cell::RefCell, collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use serde::{Deserialize, Serialize};
use crate::{database_sync_operations::MutexSlot, errors::Error, Result};
pub struct Coro<Ctx> {
pub ctx: RefCell<Ctx>,
pub ctx: Mutex<Ctx>,
gen: genawaiter::sync::Co<ProtocolCommand, Result<Ctx>>,
}
impl<Ctx> Coro<Ctx> {
pub fn new(ctx: Ctx, gen: genawaiter::sync::Co<ProtocolCommand, Result<Ctx>>) -> Self {
Self {
ctx: RefCell::new(ctx),
ctx: Mutex::new(ctx),
gen,
}
}
pub async fn yield_(&self, value: ProtocolCommand) -> Result<()> {
let ctx = self.gen.yield_(value).await?;
self.ctx.replace(ctx);
*self.ctx.lock().unwrap() = ctx;
Ok(())
}
}
@@ -27,7 +30,7 @@ impl From<genawaiter::sync::Co<ProtocolCommand, Result<()>>> for Coro<()> {
fn from(value: genawaiter::sync::Co<ProtocolCommand, Result<()>>) -> Self {
Self {
gen: value,
ctx: RefCell::new(()),
ctx: Mutex::new(()),
}
}
}
@@ -68,6 +71,8 @@ pub struct SyncEngineStats {
pub last_pull_unix_time: Option<i64>,
pub last_push_unix_time: Option<i64>,
pub revision: Option<String>,
pub network_sent_bytes: usize,
pub network_received_bytes: usize,
}
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -95,6 +100,7 @@ pub struct DatabaseMetadata {
pub last_push_unix_time: Option<i64>,
pub last_pushed_pull_gen_hint: i64,
pub last_pushed_change_id_hint: i64,
pub partial_bootstrap_server_revision: Option<DatabasePullRevision>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]