From 2e23230e7951d4e495abc7630172feb327d43eaf Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 4 Aug 2025 16:55:50 +0400 Subject: [PATCH] extend raw WAL API with few more methods - try_wal_watermark_read_page - try to read page from the DB with given WAL watermark value - wal_changed_pages_after - return set of unique pages changed after watermark WAL position --- bindings/rust/src/lib.rs | 31 ++++- core/lib.rs | 38 +++++- core/storage/pager.rs | 83 ++++++++---- core/storage/sqlite3_ondisk.rs | 10 +- core/storage/wal.rs | 39 +++++- core/types.rs | 4 + packages/turso-sync/src/database_inner.rs | 4 +- packages/turso-sync/src/sync_server/test.rs | 4 +- sqlite3/src/lib.rs | 6 +- tests/integration/functions/test_wal_api.rs | 140 +++++++++++++++++++- 10 files changed, 305 insertions(+), 54 deletions(-) diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 72b4096d1..ebaa0293c 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -192,6 +192,33 @@ impl Connection { .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}"))) } + #[cfg(feature = "conn_raw_api")] + pub fn try_wal_watermark_read_page( + &self, + page_idx: u32, + page: &mut [u8], + frame_watermark: Option, + ) -> Result { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.try_wal_watermark_read_page(page_idx, page, frame_watermark) + .map_err(|e| { + Error::WalOperationError(format!("try_wal_watermark_read_page failed: {e}")) + }) + } + + #[cfg(feature = "conn_raw_api")] + pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result> { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_changed_pages_after(frame_watermark) + .map_err(|e| Error::WalOperationError(format!("wal_changed_pages_after failed: {e}"))) + } + #[cfg(feature = "conn_raw_api")] pub fn wal_insert_begin(&self) -> Result<()> { let conn = self @@ -213,7 +240,7 @@ impl Connection { } 
#[cfg(feature = "conn_raw_api")] - pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { + pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result { let conn = self .inner .lock() @@ -223,7 +250,7 @@ impl Connection { } #[cfg(feature = "conn_raw_api")] - pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> { + pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result { let conn = self .inner .lock() diff --git a/core/lib.rs b/core/lib.rs index da7c92b9a..dc9dea9a7 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -1141,22 +1141,54 @@ impl Connection { Ok(()) } + #[cfg(all(feature = "fs", feature = "conn_raw_api"))] + pub fn try_wal_watermark_read_page( + &self, + page_idx: u32, + page: &mut [u8], + frame_watermark: Option, + ) -> Result { + let pager = self.pager.borrow(); + let (page_ref, c) = pager.read_page_no_cache(page_idx as usize, frame_watermark, true)?; + pager.io.wait_for_completion(c)?; + + let content = page_ref.get_contents(); + // empty read - attempt to read absent page + if content.buffer.borrow().is_empty() { + return Ok(false); + } + page.copy_from_slice(content.as_ptr()); + Ok(true) + } + + #[cfg(all(feature = "fs", feature = "conn_raw_api"))] + pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result> { + self.pager.borrow().wal_changed_pages_after(frame_watermark) + } + #[cfg(all(feature = "fs", feature = "conn_raw_api"))] pub fn wal_frame_count(&self) -> Result { self.pager.borrow().wal_frame_count() } #[cfg(all(feature = "fs", feature = "conn_raw_api"))] - pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> { + pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result { + use crate::storage::sqlite3_ondisk::parse_wal_frame_header; + let c = self.pager.borrow().wal_get_frame(frame_no, frame)?; - self._db.io.wait_for_completion(c) + self._db.io.wait_for_completion(c)?; + let (header, _) = parse_wal_frame_header(frame); + 
Ok(WalFrameInfo { + page_no: header.page_number, + db_size: header.db_size, + }) } /// Insert `frame` (header included) at the position `frame_no` in the WAL /// If WAL already has frame at that position - turso-db will compare content of the page and either report conflict or return OK /// If attempt to write frame at the position `frame_no` will create gap in the WAL - method will return error #[cfg(all(feature = "fs", feature = "conn_raw_api"))] - pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { + pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result { self.pager.borrow().wal_insert_frame(frame_no, frame) } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 759ab2950..7fb5dca0a 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -929,6 +929,43 @@ impl Pager { Ok(()) } + /// Reads a page from disk bypassing page-cache + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub fn read_page_no_cache( + &self, + page_idx: usize, + frame_watermark: Option, + allow_empty_read: bool, + ) -> Result<(PageRef, Completion)> { + tracing::trace!("read_page_no_cache(page_idx = {})", page_idx); + let page = Arc::new(Page::new(page_idx)); + page.set_locked(); + + let Some(wal) = self.wal.as_ref() else { + turso_assert!( + matches!(frame_watermark, Some(0) | None), + "frame_watermark must be either None or Some(0) because DB has no WAL and read with other watermark is invalid" + ); + let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read)?; + return Ok((page, c)); + }; + + if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64, frame_watermark)? 
{ + let c = wal + .borrow() + .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; + { + page.set_uptodate(); + } + // TODO(pere) should probably first insert to page cache, and if successful, + // read frame or page + return Ok((page, c)); + } + + let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read)?; + Ok((page, c)) + } + /// Reads a page from the database. #[tracing::instrument(skip_all, level = Level::DEBUG)] pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Completion)> { @@ -940,39 +977,23 @@ impl Pager { // Dummy completion being passed, as we do not need to read from database or wal return Ok((page.clone(), Completion::new_write(|_| {}))); } - let page = Arc::new(Page::new(page_idx)); - page.set_locked(); - - let Some(wal) = self.wal.as_ref() else { - let c = self.begin_read_disk_page(page_idx, page.clone())?; - self.cache_insert(page_idx, page.clone(), &mut page_cache)?; - return Ok((page, c)); - }; - - if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? 
{ - let c = wal - .borrow() - .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; - { - page.set_uptodate(); - } - // TODO(pere) should probably first insert to page cache, and if successful, - // read frame or page - self.cache_insert(page_idx, page.clone(), &mut page_cache)?; - return Ok((page, c)); - } - - let c = self.begin_read_disk_page(page_idx, page.clone())?; + let (page, c) = self.read_page_no_cache(page_idx, None, false)?; self.cache_insert(page_idx, page.clone(), &mut page_cache)?; Ok((page, c)) } - fn begin_read_disk_page(&self, page_idx: usize, page: PageRef) -> Result { + fn begin_read_disk_page( + &self, + page_idx: usize, + page: PageRef, + allow_empty_read: bool, + ) -> Result { sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), page, page_idx, + allow_empty_read, ) } @@ -1279,18 +1300,31 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result { + /// Returns the unique pages changed by WAL frames after the `frame_watermark` position. + /// Reports an internal error instead of panicking when the database has no WAL. + pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result> { + let Some(wal) = self.wal.as_ref() else { + return Err(LimboError::InternalError( + "wal_changed_pages_after() called on database without WAL".to_string(), + )); + }; + let wal = wal.borrow(); + wal.changed_pages_after(frame_watermark) + } + + #[instrument(skip_all, level = Level::DEBUG)] + pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result { let Some(wal) = self.wal.as_ref() else { return Err(LimboError::InternalError( "wal_get_frame() called on database without WAL".to_string(), )); }; let wal = wal.borrow(); - wal.read_frame_raw(frame_no.into(), frame) + wal.read_frame_raw(frame_no, frame) } #[instrument(skip_all, level = Level::DEBUG)] - pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { + pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result { let Some(wal) = self.wal.as_ref() else { return Err(LimboError::InternalError( "wal_insert_frame() called on database without WAL".to_string(), @@ -1300,7 +1334,7 @@ impl Pager { let (header, raw_page) = parse_wal_frame_header(frame); 
wal.write_frame_raw( self.buffer_pool.clone(), - frame_no as u64, + frame_no, header.page_number as u64, header.db_size as u64, raw_page, diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 2139fa580..b1bd4484c 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -777,12 +777,15 @@ impl PageContent { } } +/// Sends a read request for a DB page to the IO layer. +/// If `allow_empty_read` is set, a zero-byte read does not trip the size assertion: the page is completed with an empty buffer instead, and is marked as errored only if `finish_read_page` fails. #[instrument(skip_all, level = Level::DEBUG)] pub fn begin_read_page( db_file: Arc, buffer_pool: Arc, page: PageRef, page_idx: usize, + allow_empty_read: bool, ) -> Result { tracing::trace!("begin_read_btree_page(page_idx = {})", page_idx); let buf = buffer_pool.get(); @@ -792,13 +795,16 @@ pub fn begin_read_page( }); #[allow(clippy::arc_with_non_send_sync)] let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn))); - let complete = Box::new(move |buf: Arc>, bytes_read: i32| { + let complete = Box::new(move |mut buf: Arc>, bytes_read: i32| { let buf_len = buf.borrow().len(); turso_assert!( - bytes_read == buf_len as i32, + (allow_empty_read && bytes_read == 0) || bytes_read == buf_len as i32, "read({bytes_read}) != expected({buf_len})" ); let page = page.clone(); + if bytes_read == 0 { + buf = Arc::new(RefCell::new(Buffer::allocate(0, Rc::new(|_| {})))); + } if finish_read_page(page_idx, buf, page.clone()).is_err() { page.set_error(); } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index bb7c31c4a..e188e241f 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -3,7 +3,7 @@ use std::array; use std::cell::UnsafeCell; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use strum::EnumString; use tracing::{instrument, Level}; @@ -223,7 +223,7 @@ pub trait Wal { fn end_write_tx(&self); /// Find the latest frame containing a page. 
- fn find_frame(&self, page_id: u64) -> Result>; + fn find_frame(&self, page_id: u64, frame_watermark: Option) -> Result>; /// Read a frame from the WAL. fn read_frame( @@ -276,6 +276,9 @@ pub trait Wal { fn get_min_frame(&self) -> u64; fn rollback(&mut self) -> Result<()>; + /// Return unique set of pages changed **after** frame_watermark position and until current WAL session max_frame_no + fn changed_pages_after(&self, frame_watermark: u64) -> Result>; + #[cfg(debug_assertions)] fn as_any(&self) -> &dyn std::any::Any; } @@ -850,14 +853,22 @@ impl Wal for WalFile { /// Find the latest frame containing a page. #[instrument(skip_all, level = Level::DEBUG)] - fn find_frame(&self, page_id: u64) -> Result> { + fn find_frame(&self, page_id: u64, frame_watermark: Option) -> Result> { + #[cfg(not(feature = "conn_raw_api"))] + turso_assert!( + frame_watermark.is_none(), + "unexpected use of frame_watermark optional argument" + ); + // if we are holding read_lock 0, skip and read right from db file. 
if self.max_frame_read_lock_index.get() == 0 { return Ok(None); } let shared = self.get_shared(); let frames = shared.frame_cache.lock(); - let range = self.min_frame..=self.max_frame; + let range = frame_watermark + .map(|x| 0..=x) + .unwrap_or(self.min_frame..=self.max_frame); if let Some(list) = frames.get(&page_id) { if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) { return Ok(Some(*f)); @@ -1164,6 +1175,25 @@ impl Wal for WalFile { shared.last_checksum = self.last_checksum; Ok(()) } + + fn changed_pages_after(&self, frame_watermark: u64) -> Result> { + let frame_count = self.get_max_frame(); + let page_size = self.page_size(); + let mut frame = vec![0u8; page_size as usize + WAL_FRAME_HEADER_SIZE]; + let mut seen = HashSet::new(); + // a watermark past the last frame means nothing changed: saturate so the subtraction cannot underflow + let mut pages = Vec::with_capacity(frame_count.saturating_sub(frame_watermark) as usize); + for frame_no in frame_watermark + 1..=frame_count { + let c = self.read_frame_raw(frame_no, &mut frame)?; + self.io.wait_for_completion(c)?; + let (header, _) = sqlite3_ondisk::parse_wal_frame_header(&frame); + if seen.insert(header.page_number) { + pages.push(header.page_number); + } + } + Ok(pages) + } + #[cfg(debug_assertions)] fn as_any(&self) -> &dyn std::any::Any { self @@ -2759,7 +2789,7 @@ pub mod test { { let pager = conn1.pager.borrow(); let wal = pager.wal.as_ref().unwrap().borrow(); - let frame = wal.find_frame(5); + let frame = wal.find_frame(5, None); // since we hold readlock0, we should ignore the db file and find_frame should return none assert!(frame.is_ok_and(|f| f.is_none())); } diff --git a/core/types.rs b/core/types.rs index 5c9dbe89e..8a6d65d76 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2599,6 +2599,10 @@ impl WalFrameInfo { pub fn is_commit_frame(&self) -> bool { self.db_size > 0 } + pub fn put_to_frame_header(&self, frame: &mut [u8]) { + frame[0..4].copy_from_slice(&self.page_no.to_be_bytes()); + frame[4..8].copy_from_slice(&self.db_size.to_be_bytes()); + } } #[cfg(test)] diff --git 
a/packages/turso-sync/src/database_inner.rs b/packages/turso-sync/src/database_inner.rs index e51841b39..f7563dc58 100644 --- a/packages/turso-sync/src/database_inner.rs +++ b/packages/turso-sync/src/database_inner.rs @@ -369,7 +369,7 @@ impl DatabaseInner { if !wal_session.in_txn() { wal_session.begin()?; } - let wal_frame_info = clean_conn.wal_insert_frame(frame_no as u32, &buffer)?; + let wal_frame_info = clean_conn.wal_insert_frame(frame_no as u64, &buffer)?; if wal_frame_info.is_commit_frame() { wal_session.end()?; // transaction boundary reached - it's safe to commit progress @@ -437,7 +437,7 @@ impl DatabaseInner { let mut buffer = [0u8; FRAME_SIZE]; for frame_no in (frame_no + 1)..=clean_frames { - clean_conn.wal_get_frame(frame_no as u32, &mut buffer)?; + clean_conn.wal_get_frame(frame_no as u64, &mut buffer)?; frames.extend_from_slice(&buffer); frames_cnt += 1; } diff --git a/packages/turso-sync/src/sync_server/test.rs b/packages/turso-sync/src/sync_server/test.rs index fed11fc57..6d2b7d130 100644 --- a/packages/turso-sync/src/sync_server/test.rs +++ b/packages/turso-sync/src/sync_server/test.rs @@ -185,7 +185,7 @@ impl SyncServer for TestSyncServer { session.in_txn = true; } let frame = &frames[offset..offset + FRAME_SIZE]; - match session.conn.wal_insert_frame(frame_no as u32, frame) { + match session.conn.wal_insert_frame(frame_no as u64, frame) { Ok(info) => { if info.is_commit_frame() { if session.in_txn { @@ -276,7 +276,7 @@ impl TestSyncServer { let wal_frame_count = conn.wal_frame_count()?; tracing::debug!("conn frames count: {}", wal_frame_count); for frame_no in last_frame..=wal_frame_count as usize { - conn.wal_get_frame(frame_no as u32, &mut frame)?; + conn.wal_get_frame(frame_no as u64, &mut frame)?; tracing::debug!("push local frame {}", frame_no); generation.frames.push(frame.to_vec()); } diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 2fa5419b6..1f3ce9110 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -1213,8 
+1213,8 @@ pub unsafe extern "C" fn libsql_wal_get_frame( let db: &mut sqlite3 = &mut *db; let db = db.inner.lock().unwrap(); let frame = std::slice::from_raw_parts_mut(p_frame, frame_len as usize); - match db.conn.wal_get_frame(frame_no, frame) { - Ok(()) => SQLITE_OK, + match db.conn.wal_get_frame(frame_no as u64, frame) { + Ok(..) => SQLITE_OK, Err(_) => SQLITE_ERROR, } } @@ -1250,7 +1250,7 @@ pub unsafe extern "C" fn libsql_wal_insert_frame( let db: &mut sqlite3 = &mut *db; let db = db.inner.lock().unwrap(); let frame = std::slice::from_raw_parts(p_frame, frame_len as usize); - match db.conn.wal_insert_frame(frame_no, frame) { + match db.conn.wal_insert_frame(frame_no as u64, frame) { Ok(_) => SQLITE_OK, Err(LimboError::Conflict(..)) => { if !p_conflict.is_null() { diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index 2c1e2c509..24118dab9 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -1,6 +1,9 @@ +use std::{collections::HashSet, sync::Arc}; + use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use rusqlite::types::Value; +use turso_core::types::WalFrameInfo; use crate::common::{limbo_exec_rows, rng_from_time, TempDatabase}; @@ -41,7 +44,7 @@ fn test_wal_frame_transfer_no_schema_changes() { assert_eq!(conn1.wal_frame_count().unwrap(), 15); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - let frames_count = conn1.wal_frame_count().unwrap() as u32; + let frames_count = conn1.wal_frame_count().unwrap(); for frame_id in 1..=frames_count { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); @@ -72,7 +75,7 @@ fn test_wal_frame_transfer_various_schema_changes() { let mut frame = [0u8; 24 + 4096]; let mut synced_frame = 0; let mut sync = || { - let last_frame = conn1.wal_frame_count().unwrap() as u32; + let last_frame = conn1.wal_frame_count().unwrap(); 
conn2.wal_insert_begin().unwrap(); for frame_id in (synced_frame + 1)..=last_frame { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); @@ -137,7 +140,7 @@ fn test_wal_frame_transfer_schema_changes() { let mut frame = [0u8; 24 + 4096]; let mut commits = 0; conn2.wal_insert_begin().unwrap(); - for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 { + for frame_id in 1..=conn1.wal_frame_count().unwrap() { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); let info = conn2.wal_insert_frame(frame_id, &frame).unwrap(); if info.is_commit_frame() { @@ -176,7 +179,7 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); // Intentionally leave out the final commit frame, so the big randomblob is not committed and should not be visible to transactions. - for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) { + for frame_id in 1..=(conn1.wal_frame_count().unwrap() - 1) { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } @@ -211,7 +214,7 @@ fn test_wal_frame_transfer_schema_changes_rollback() { assert_eq!(conn1.wal_frame_count().unwrap(), 14); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) { + for frame_id in 1..=(conn1.wal_frame_count().unwrap() - 1) { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } @@ -311,8 +314,8 @@ fn test_wal_frame_api_no_schema_changes_fuzz() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); for frame_no in (synced_frame + 1)..=next_frame { - conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap(); - conn2.wal_insert_frame(frame_no as u32, &frame[..]).unwrap(); + conn1.wal_get_frame(frame_no, &mut frame).unwrap(); + conn2.wal_insert_frame(frame_no, &frame[..]).unwrap(); } conn2.wal_insert_end().unwrap(); for (i, committed) in 
commit_frames.iter().enumerate() { @@ -332,3 +335,126 @@ fn test_wal_frame_api_no_schema_changes_fuzz() { } } } + +#[test] +fn test_wal_api_changed_pages() { + let db1 = TempDatabase::new_empty(false); + let conn1 = db1.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn1 + .execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y)") + .unwrap(); + assert_eq!( + conn1 + .wal_changed_pages_after(0) + .unwrap() + .into_iter() + .collect::>(), + HashSet::from([1, 2, 3]) + ); + let frames = conn1.wal_frame_count().unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 2)").unwrap(); + conn1.execute("INSERT INTO t VALUES (3, 4)").unwrap(); + assert_eq!( + conn1 + .wal_changed_pages_after(frames) + .unwrap() + .into_iter() + .collect::>(), + HashSet::from([2]) + ); + let frames = conn1.wal_frame_count().unwrap(); + conn1 + .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 2))") + .unwrap(); + assert_eq!( + conn1 + .wal_changed_pages_after(frames) + .unwrap() + .into_iter() + .collect::>(), + HashSet::from([1, 2, 4, 5]) + ); +} + +fn revert_to(conn: &Arc, frame_watermark: u64) -> turso_core::Result<()> { + let mut frame = [0u8; 4096 + 24]; + let frame_watermark_info = conn.wal_get_frame(frame_watermark, &mut frame)?; + + let changed_pages = conn.wal_changed_pages_after(frame_watermark)?; + + conn.wal_insert_begin()?; + let mut frames = Vec::new(); + for page_id in changed_pages { + let has_page = + conn.try_wal_watermark_read_page(page_id, &mut frame[24..], Some(frame_watermark))?; + if !has_page { + continue; + } + frames.push((page_id, frame.clone())); + } + + let mut frame_no = conn.wal_frame_count().unwrap(); + for (i, (page_id, mut frame)) in frames.iter().enumerate() { + let info = WalFrameInfo { + db_size: if i == frames.len() - 1 { + frame_watermark_info.db_size + } else { + 0 + }, + page_no: *page_id, + }; + info.put_to_frame_header(&mut frame); + frame_no += 1; + conn.wal_insert_frame(frame_no, &frame)?; + } + 
conn.wal_insert_end()?; + + Ok(()) +} + +#[test] +fn test_wal_api_revert_pages() { + let db1 = TempDatabase::new_empty(false); + let conn1 = db1.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + let watermark1 = conn1.wal_frame_count().unwrap(); + conn1 + .execute("INSERT INTO t VALUES (1, randomblob(10))") + .unwrap(); + let watermark2 = conn1.wal_frame_count().unwrap(); + + conn1 + .execute("INSERT INTO t VALUES (3, randomblob(20))") + .unwrap(); + conn1 + .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 2))") + .unwrap(); + + assert_eq!( + limbo_exec_rows(&db1, &conn1, "SELECT x, length(y) FROM t"), + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(3), Value::Integer(20)], + vec![Value::Integer(1024), Value::Integer(4096 * 2)], + ] + ); + + revert_to(&conn1, watermark2).unwrap(); + + assert_eq!( + limbo_exec_rows(&db1, &conn1, "SELECT x, length(y) FROM t"), + vec![vec![Value::Integer(1), Value::Integer(10)],] + ); + + revert_to(&conn1, watermark1).unwrap(); + + assert_eq!( + limbo_exec_rows(&db1, &conn1, "SELECT x, length(y) FROM t"), + vec![] as Vec>, + ); +}