mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-17 07:04:20 +01:00
wip
This commit is contained in:
@@ -10,13 +10,13 @@ use turso_core::{DatabaseStorage, OpenFlags};
|
||||
|
||||
use crate::{
|
||||
database_replay_generator::DatabaseReplayGenerator,
|
||||
database_sync_lazy_storage::LazyDatabaseStorage,
|
||||
database_sync_operations::{
|
||||
acquire_slot, apply_transformation, bootstrap_db_file, connect_untracked,
|
||||
count_local_changes, has_table, push_logical_changes, read_last_change_id, read_wal_salt,
|
||||
reset_wal_file, update_last_change_id, wait_all_results, wal_apply_from_file,
|
||||
wal_pull_to_file, ProtocolIoStats, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE,
|
||||
},
|
||||
database_sync_partial_storage::PartialDatabaseStorage,
|
||||
database_tape::{
|
||||
DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseReplaySession,
|
||||
DatabaseReplaySessionOpts, DatabaseTape, DatabaseTapeOpts, DatabaseWalSession,
|
||||
@@ -34,6 +34,12 @@ use crate::{
|
||||
Result,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum PartialBootstrapStrategy {
|
||||
Prefix { length: usize },
|
||||
Query { query: String },
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DatabaseSyncEngineOpts {
|
||||
pub client_name: String,
|
||||
@@ -44,7 +50,7 @@ pub struct DatabaseSyncEngineOpts {
|
||||
pub protocol_version_hint: DatabaseSyncEngineProtocolVersion,
|
||||
pub bootstrap_if_empty: bool,
|
||||
pub reserved_bytes: usize,
|
||||
pub partial: bool,
|
||||
pub partial_bootstrap_strategy: Option<PartialBootstrapStrategy>,
|
||||
}
|
||||
|
||||
pub struct DataStats {
|
||||
@@ -99,7 +105,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
io: Arc<dyn turso_core::IO>,
|
||||
protocol: Arc<P>,
|
||||
main_db_path: &str,
|
||||
opts: DatabaseSyncEngineOpts,
|
||||
mut opts: DatabaseSyncEngineOpts,
|
||||
) -> Result<Self> {
|
||||
let main_db_wal_path = format!("{main_db_path}-wal");
|
||||
let revert_db_wal_path = format!("{main_db_path}-wal-revert");
|
||||
@@ -116,23 +122,20 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
Some(DatabaseMetadata::load(&data)?)
|
||||
};
|
||||
let protocol = ProtocolIoStats::new(protocol);
|
||||
let partial_bootstrap_strategy = opts.partial_bootstrap_strategy.take();
|
||||
let partial = partial_bootstrap_strategy.is_some();
|
||||
|
||||
let meta = match meta {
|
||||
Some(meta) => meta,
|
||||
None if opts.bootstrap_if_empty => {
|
||||
let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4());
|
||||
let prefix = if opts.partial {
|
||||
Some(128 * PAGE_SIZE)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let revision = bootstrap_db_file(
|
||||
coro,
|
||||
&protocol,
|
||||
&io,
|
||||
main_db_path,
|
||||
opts.protocol_version_hint,
|
||||
prefix,
|
||||
partial_bootstrap_strategy,
|
||||
)
|
||||
.await?;
|
||||
let meta = DatabaseMetadata {
|
||||
@@ -145,7 +148,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
last_pushed_pull_gen_hint: 0,
|
||||
last_pull_unix_time: Some(io.now().secs),
|
||||
last_push_unix_time: None,
|
||||
partial_bootstrap_server_revision: if opts.partial {
|
||||
partial_bootstrap_server_revision: if partial {
|
||||
Some(revision.clone())
|
||||
} else {
|
||||
None
|
||||
@@ -163,7 +166,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
"deferred bootstrap is not supported for legacy protocol".to_string(),
|
||||
));
|
||||
}
|
||||
if opts.partial {
|
||||
if partial {
|
||||
return Err(Error::DatabaseSyncEngineError(
|
||||
"deferred bootstrap is not supported for partial sync".to_string(),
|
||||
));
|
||||
@@ -205,9 +208,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
}
|
||||
|
||||
let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?;
|
||||
let mut db_file: Arc<dyn DatabaseStorage> =
|
||||
Arc::new(turso_core::storage::database::DatabaseFile::new(db_file));
|
||||
if opts.partial {
|
||||
let db_file: Arc<dyn DatabaseStorage> = if partial {
|
||||
let Some(partial_bootstrap_server_revision) = &meta.partial_bootstrap_server_revision
|
||||
else {
|
||||
return Err(Error::DatabaseSyncEngineError(
|
||||
@@ -219,12 +220,15 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
"partial sync is supported only for V1 protocol".to_string(),
|
||||
));
|
||||
};
|
||||
db_file = Arc::new(PartialDatabaseStorage::new(
|
||||
Arc::new(LazyDatabaseStorage::new(
|
||||
db_file,
|
||||
None, // todo(sivukhin): allocate dirty file for FS IO
|
||||
protocol.clone(),
|
||||
revision.to_string(),
|
||||
));
|
||||
}
|
||||
))
|
||||
} else {
|
||||
Arc::new(turso_core::storage::database::DatabaseFile::new(db_file))
|
||||
};
|
||||
|
||||
let main_db = turso_core::Database::open_with_flags(
|
||||
io.clone(),
|
||||
|
||||
@@ -13,7 +13,7 @@ use turso_core::{
|
||||
|
||||
use crate::{
|
||||
database_replay_generator::DatabaseReplayGenerator,
|
||||
database_sync_engine::{DataStats, DatabaseSyncEngineOpts},
|
||||
database_sync_engine::{DataStats, DatabaseSyncEngineOpts, PartialBootstrapStrategy},
|
||||
database_tape::{
|
||||
run_stmt_expect_one_row, run_stmt_ignore_rows, DatabaseChangesIteratorMode,
|
||||
DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession,
|
||||
@@ -256,7 +256,8 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
|
||||
server_revision: String::new(),
|
||||
client_revision: revision.to_string(),
|
||||
long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_millis() as u32).unwrap_or(0),
|
||||
server_pages: BytesMut::new().into(),
|
||||
server_pages_selector: BytesMut::new().into(),
|
||||
server_query_selector: String::new(),
|
||||
client_pages: BytesMut::new().into(),
|
||||
};
|
||||
let request = request.encode_to_vec();
|
||||
@@ -366,7 +367,8 @@ pub async fn pull_pages_v1<C: ProtocolIO, Ctx>(
|
||||
server_revision: server_revision.to_string(),
|
||||
client_revision: String::new(),
|
||||
long_poll_timeout_ms: 0,
|
||||
server_pages: bitmap_bytes.into(),
|
||||
server_pages_selector: bitmap_bytes.into(),
|
||||
server_query_selector: String::new(),
|
||||
client_pages: BytesMut::new().into(),
|
||||
};
|
||||
let request = request.encode_to_vec();
|
||||
@@ -1122,11 +1124,11 @@ pub async fn bootstrap_db_file<C: ProtocolIO, Ctx>(
|
||||
io: &Arc<dyn turso_core::IO>,
|
||||
main_db_path: &str,
|
||||
protocol: DatabaseSyncEngineProtocolVersion,
|
||||
prefix: Option<usize>,
|
||||
partial_bootstrap_strategy: Option<PartialBootstrapStrategy>,
|
||||
) -> Result<DatabasePullRevision> {
|
||||
match protocol {
|
||||
DatabaseSyncEngineProtocolVersion::Legacy => {
|
||||
if prefix.is_some() {
|
||||
if partial_bootstrap_strategy.is_some() {
|
||||
return Err(Error::DatabaseSyncEngineError(
|
||||
"can't bootstrap prefix of database with legacy protocol".to_string(),
|
||||
));
|
||||
@@ -1134,7 +1136,7 @@ pub async fn bootstrap_db_file<C: ProtocolIO, Ctx>(
|
||||
bootstrap_db_file_legacy(coro, client, io, main_db_path).await
|
||||
}
|
||||
DatabaseSyncEngineProtocolVersion::V1 => {
|
||||
bootstrap_db_file_v1(coro, client, io, main_db_path, prefix).await
|
||||
bootstrap_db_file_v1(coro, client, io, main_db_path, partial_bootstrap_strategy).await
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1144,22 +1146,25 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
||||
client: &ProtocolIoStats<C>,
|
||||
io: &Arc<dyn turso_core::IO>,
|
||||
main_db_path: &str,
|
||||
prefix: Option<usize>,
|
||||
bootstrap: Option<PartialBootstrapStrategy>,
|
||||
) -> Result<DatabasePullRevision> {
|
||||
let bitmap_bytes = {
|
||||
if let Some(prefix) = prefix {
|
||||
let mut bitmap = RoaringBitmap::new();
|
||||
bitmap.insert_range(0..(prefix / PAGE_SIZE) as u32);
|
||||
let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size());
|
||||
bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| {
|
||||
Error::DatabaseSyncEngineError(format!(
|
||||
"unable to serialize bootstrap request: {e}"
|
||||
))
|
||||
})?;
|
||||
bitmap_bytes
|
||||
} else {
|
||||
Vec::new()
|
||||
}
|
||||
let server_pages_selector = if let Some(PartialBootstrapStrategy::Prefix { length }) =
|
||||
&bootstrap
|
||||
{
|
||||
let mut bitmap = RoaringBitmap::new();
|
||||
bitmap.insert_range(0..(*length / PAGE_SIZE) as u32);
|
||||
let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size());
|
||||
bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| {
|
||||
Error::DatabaseSyncEngineError(format!("unable to serialize bootstrap request: {e}"))
|
||||
})?;
|
||||
bitmap_bytes
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let server_query_selector = if let Some(PartialBootstrapStrategy::Query { query }) = bootstrap {
|
||||
query
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
|
||||
let request = PullUpdatesReqProtoBody {
|
||||
@@ -1167,7 +1172,8 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
||||
server_revision: String::new(),
|
||||
client_revision: String::new(),
|
||||
long_poll_timeout_ms: 0,
|
||||
server_pages: bitmap_bytes.into(),
|
||||
server_pages_selector: server_pages_selector.into(),
|
||||
server_query_selector: server_query_selector,
|
||||
client_pages: BytesMut::new().into(),
|
||||
};
|
||||
let request = request.encode_to_vec();
|
||||
|
||||
@@ -1,140 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use turso_core::{Completion, CompletionError, DatabaseStorage};
|
||||
|
||||
use crate::{
|
||||
database_sync_operations::{pull_pages_v1, ProtocolIoStats, PAGE_SIZE},
|
||||
errors,
|
||||
protocol_io::ProtocolIO,
|
||||
types::Coro,
|
||||
};
|
||||
|
||||
pub struct PartialDatabaseStorage<P: ProtocolIO> {
|
||||
base: Arc<dyn DatabaseStorage>,
|
||||
protocol: ProtocolIoStats<P>,
|
||||
server_revision: String,
|
||||
}
|
||||
|
||||
impl<P: ProtocolIO> PartialDatabaseStorage<P> {
|
||||
pub fn new(
|
||||
base: Arc<dyn DatabaseStorage>,
|
||||
protocol: ProtocolIoStats<P>,
|
||||
server_revision: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
base,
|
||||
protocol,
|
||||
server_revision,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: ProtocolIO> DatabaseStorage for PartialDatabaseStorage<P> {
|
||||
fn read_header(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
|
||||
assert!(
|
||||
!self.base.has_hole(0, PAGE_SIZE)?,
|
||||
"first page must be filled"
|
||||
);
|
||||
self.base.read_header(c)
|
||||
}
|
||||
|
||||
fn read_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
io_ctx: &turso_core::IOContext,
|
||||
c: turso_core::Completion,
|
||||
) -> turso_core::Result<turso_core::Completion> {
|
||||
assert!(
|
||||
io_ctx.encryption_context().is_none(),
|
||||
"encryption or checksum are not supported with partial sync"
|
||||
);
|
||||
if !self.base.has_hole((page_idx - 1) * PAGE_SIZE, PAGE_SIZE)? {
|
||||
return self.base.read_page(page_idx, io_ctx, c);
|
||||
}
|
||||
tracing::info!(
|
||||
"PartialDatabaseStorage::read_page(page_idx={}): read page from the remote server",
|
||||
page_idx
|
||||
);
|
||||
let mut generator = genawaiter::sync::Gen::new({
|
||||
let protocol = self.protocol.clone();
|
||||
let server_revision = self.server_revision.clone();
|
||||
let base = self.base.clone();
|
||||
let io_ctx = io_ctx.clone();
|
||||
let c = c.clone();
|
||||
|coro| async move {
|
||||
let coro = Coro::new((), coro);
|
||||
let result =
|
||||
pull_pages_v1(&coro, &protocol, &server_revision, &[(page_idx - 1) as u32])
|
||||
.await;
|
||||
match result {
|
||||
Ok(page) => {
|
||||
let read = c.as_read();
|
||||
let buf = read.buf_arc();
|
||||
buf.as_mut_slice().copy_from_slice(&page);
|
||||
let write = Completion::new_write(move |result| {
|
||||
let Ok(_) = result else {
|
||||
panic!("unexpected write error: {result:?}");
|
||||
};
|
||||
c.complete(page.len() as i32);
|
||||
});
|
||||
let _ = base.write_page(page_idx, buf.clone(), &io_ctx, write)?;
|
||||
Ok::<(), errors::Error>(())
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("faile to fetch path from remote server: {err}");
|
||||
c.error(CompletionError::IOError(std::io::ErrorKind::Other));
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
self.protocol
|
||||
.add_work(Box::new(move || match generator.resume_with(Ok(())) {
|
||||
genawaiter::GeneratorState::Yielded(_) => false,
|
||||
genawaiter::GeneratorState::Complete(_) => true,
|
||||
}));
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn write_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
buffer: std::sync::Arc<turso_core::Buffer>,
|
||||
io_ctx: &turso_core::IOContext,
|
||||
c: turso_core::Completion,
|
||||
) -> turso_core::Result<turso_core::Completion> {
|
||||
self.base.write_page(page_idx, buffer, io_ctx, c)
|
||||
}
|
||||
|
||||
fn write_pages(
|
||||
&self,
|
||||
first_page_idx: usize,
|
||||
page_size: usize,
|
||||
buffers: Vec<std::sync::Arc<turso_core::Buffer>>,
|
||||
io_ctx: &turso_core::IOContext,
|
||||
c: turso_core::Completion,
|
||||
) -> turso_core::Result<turso_core::Completion> {
|
||||
self.base
|
||||
.write_pages(first_page_idx, page_size, buffers, io_ctx, c)
|
||||
}
|
||||
|
||||
fn sync(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
|
||||
self.base.sync(c)
|
||||
}
|
||||
|
||||
fn size(&self) -> turso_core::Result<u64> {
|
||||
self.base.size()
|
||||
}
|
||||
|
||||
fn truncate(
|
||||
&self,
|
||||
len: usize,
|
||||
c: turso_core::Completion,
|
||||
) -> turso_core::Result<turso_core::Completion> {
|
||||
self.base.truncate(len, c)
|
||||
}
|
||||
|
||||
fn has_hole(&self, _pos: usize, _len: usize) -> turso_core::Result<bool> {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
pub mod database_replay_generator;
|
||||
pub mod database_sync_engine;
|
||||
pub mod database_sync_lazy_storage;
|
||||
pub mod database_sync_operations;
|
||||
pub mod database_sync_partial_storage;
|
||||
pub mod database_tape;
|
||||
pub mod errors;
|
||||
pub mod io_operations;
|
||||
@@ -10,6 +10,9 @@ pub mod server_proto;
|
||||
pub mod types;
|
||||
pub mod wal_session;
|
||||
|
||||
// #[cfg(unix)]
|
||||
pub mod sparse_io;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, errors::Error>;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -23,7 +23,9 @@ pub struct PullUpdatesReqProtoBody {
|
||||
#[prost(uint32, tag = "4")]
|
||||
pub long_poll_timeout_ms: u32,
|
||||
#[prost(bytes, tag = "5")]
|
||||
pub server_pages: Bytes,
|
||||
pub server_pages_selector: Bytes,
|
||||
#[prost(string, tag = "7")]
|
||||
pub server_query_selector: String,
|
||||
#[prost(bytes, tag = "6")]
|
||||
pub client_pages: Bytes,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user