diff --git a/bindings/javascript/sync/src/lib.rs b/bindings/javascript/sync/src/lib.rs index 0322f70a6..378bf3bc9 100644 --- a/bindings/javascript/sync/src/lib.rs +++ b/bindings/javascript/sync/src/lib.rs @@ -180,7 +180,7 @@ impl SyncEngine { let io: Arc = if is_memory { Arc::new(turso_core::MemoryIO::new()) } else { - #[cfg(not(feature = "browser"))] + #[cfg(all(target_os = "linux", not(feature = "browser")))] { if opts.partial_boostrap_strategy.is_none() { Arc::new(turso_core::PlatformIO::new().map_err(|e| { @@ -200,6 +200,15 @@ impl SyncEngine { })?) } } + #[cfg(all(not(target_os = "linux"), not(feature = "browser")))] + { + Arc::new(turso_core::PlatformIO::new().map_err(|e| { + napi::Error::new( + napi::Status::GenericFailure, + format!("Failed to create platform IO: {e}"), + ) + })?) + } #[cfg(feature = "browser")] { turso_node::browser::opfs() diff --git a/sync/engine/src/database_sync_lazy_storage.rs b/sync/engine/src/database_sync_lazy_storage.rs new file mode 100644 index 000000000..ecd3fc0ad --- /dev/null +++ b/sync/engine/src/database_sync_lazy_storage.rs @@ -0,0 +1,249 @@ +use std::sync::Arc; + +use turso_core::{ + turso_assert, Buffer, Completion, CompletionError, DatabaseStorage, File, LimboError, +}; + +use crate::{ + database_sync_operations::{pull_pages_v1, ProtocolIoStats, PAGE_SIZE}, + errors, + protocol_io::ProtocolIO, + types::Coro, +}; + +pub struct LazyDatabaseStorage { + clean_file: Arc, + dirty_file: Option>, + protocol: ProtocolIoStats

, + server_revision: String, +} + +impl LazyDatabaseStorage

{ + pub fn new( + clean_file: Arc, + dirty_file: Option>, + protocol: ProtocolIoStats

, + server_revision: String, + ) -> Self { + Self { + clean_file, + dirty_file, + protocol, + server_revision, + } + } +} + +impl DatabaseStorage for LazyDatabaseStorage

{ + fn read_header(&self, c: turso_core::Completion) -> turso_core::Result { + assert!( + !self.clean_file.has_hole(0, PAGE_SIZE)?, + "first page must be filled" + ); + self.clean_file.pread(0, c) + } + + fn read_page( + &self, + page_idx: usize, + io_ctx: &turso_core::IOContext, + c: turso_core::Completion, + ) -> turso_core::Result { + assert!( + io_ctx.encryption_context().is_none(), + "encryption or checksum are not supported with partial sync" + ); + assert!(page_idx as i64 >= 0, "page should be positive"); + let r = c.as_read(); + let size = r.buf().len(); + assert!(page_idx > 0); + if !(512..=65536).contains(&size) || size & (size - 1) != 0 { + return Err(LimboError::NotADB); + } + let Some(pos) = (page_idx as u64 - 1).checked_mul(size as u64) else { + return Err(LimboError::IntegerOverflow); + }; + + if !self.clean_file.has_hole(pos as usize, size)? { + let Some(dirty_file) = &self.dirty_file else { + // no dirty file was set - this means that FS is atomic (e.g. MemoryIO) + return self.clean_file.pread(pos, c); + }; + if dirty_file.has_hole(pos as usize, size)? { + // dirty file has no hole - this means that we cleanly removed the hole when we wrote to the clean file + return self.clean_file.pread(pos, c); + } + let check_buffer = Arc::new(Buffer::new_temporary(size)); + let check_c = + dirty_file.pread(pos, Completion::new_read(check_buffer.clone(), |_| {}))?; + turso_assert!( + check_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + + let clean_buffer = r.buf_arc(); + let clean_c = self + .clean_file + .pread(pos, Completion::new_read(clean_buffer.clone(), |_| {}))?; + turso_assert!( + clean_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + + if check_buffer.as_slice().eq(clean_buffer.as_slice()) { + // dirty buffer matches clean buffer - this means that clean data is valid + return self.clean_file.pread(pos, c); + } + } + tracing::info!( + "PartialDatabaseStorage::read_page(page_idx={}): read page from the remote server", + page_idx + ); + let mut generator = genawaiter::sync::Gen::new({ + let protocol = self.protocol.clone(); + let server_revision = self.server_revision.clone(); + let clean_file = self.clean_file.clone(); + let dirty_file = self.dirty_file.clone(); + let c = c.clone(); + |coro| async move { + let coro = Coro::new((), coro); + let pages = [(page_idx - 1) as u32]; + let result = pull_pages_v1(&coro, &protocol, &server_revision, &pages).await; + match result { + Ok(page) => { + let read = c.as_read(); + let buf = read.buf_arc(); + buf.as_mut_slice().copy_from_slice(&page); + + if let Some(dirty_file) = &dirty_file { + let dirty_c = dirty_file.pwrite( + pos, + buf.clone(), + Completion::new_write(|_| {}), + )?; + turso_assert!( + dirty_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + } + + let clean_c = + clean_file.pwrite(pos, buf.clone(), Completion::new_write(|_| {}))?; + turso_assert!( + clean_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + + if let Some(dirty_file) = &dirty_file { + dirty_file.punch_hole(pos as usize, buf.len())?; + } + + c.complete(buf.len() as i32); + Ok::<(), errors::Error>(()) + } + Err(err) => { + tracing::error!("failed to fetch path from remote server: {err}"); + c.error(CompletionError::IOError(std::io::ErrorKind::Other)); + Err(err) + } + } + } + }); + self.protocol + .add_work(Box::new(move || match generator.resume_with(Ok(())) { + genawaiter::GeneratorState::Yielded(_) => false, + genawaiter::GeneratorState::Complete(_) => true, + })); + Ok(c) + } + + fn write_page( + &self, + page_idx: usize, + buffer: std::sync::Arc, + io_ctx: &turso_core::IOContext, + c: turso_core::Completion, + ) -> turso_core::Result { + assert!( + io_ctx.encryption_context().is_none(), + "encryption or checksum are not supported with partial sync" + ); + + let buffer_size = buffer.len(); + assert!(page_idx > 0); + assert!(buffer_size >= 512); + assert!(buffer_size <= 65536); + assert_eq!(buffer_size & (buffer_size - 1), 0); + let Some(pos) = (page_idx as u64 - 1).checked_mul(buffer_size as u64) else { + return Err(LimboError::IntegerOverflow); + }; + + // we write to the database only during checkpoint - so we need to punch hole in the dirty file in order to mark this region as valid + if let Some(dirty_file) = &self.dirty_file { + dirty_file.punch_hole(pos as usize, buffer_size)?; + } + self.clean_file.pwrite(pos, buffer, c) + } + + fn write_pages( + &self, + first_page_idx: usize, + page_size: usize, + buffers: Vec>, + io_ctx: &turso_core::IOContext, + c: turso_core::Completion, + ) -> turso_core::Result { + assert!( + io_ctx.encryption_context().is_none(), + "encryption or checksum are not supported with partial sync" + ); + + assert!(first_page_idx > 0); + assert!(page_size >= 512); + assert!(page_size <= 65536); + assert_eq!(page_size & (page_size - 1), 0); + + let Some(pos) = (first_page_idx as u64 - 1).checked_mul(page_size as u64) else { + return Err(LimboError::IntegerOverflow); + }; + // we write to the database only during checkpoint - so we need to punch hole in the dirty file in order to mark this region as valid + if let Some(dirty_file) = &self.dirty_file { + let buffers_size = buffers.iter().map(|b| b.len()).sum(); + dirty_file.punch_hole(pos as usize, buffers_size)?; + } + let c = self.clean_file.pwritev(pos, buffers, c)?; + Ok(c) + } + + fn sync(&self, c: turso_core::Completion) -> turso_core::Result { + if let Some(dirty_file) = &self.dirty_file { + let dirty_c = dirty_file.sync(Completion::new_sync(|_| {}))?; + turso_assert!( + dirty_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + } + + self.clean_file.sync(c) + } + + fn size(&self) -> turso_core::Result { + self.clean_file.size() + } + + fn truncate( + &self, + len: usize, + c: turso_core::Completion, + ) -> turso_core::Result { + if let Some(dirty_file) = &self.dirty_file { + let dirty_c = dirty_file.truncate(len as u64, Completion::new_trunc(|_| {}))?; + turso_assert!( + dirty_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + } + + self.clean_file.truncate(len as u64, c) + } +}