extend raw WAL API with few more methods

- try_wal_watermark_read_page - try to read page from the DB with given WAL watermark value
- wal_changed_pages_after - return set of unique pages changed after watermark WAL position
This commit is contained in:
Nikita Sivukhin
2025-08-04 16:55:50 +04:00
parent 83b1e99a61
commit 2e23230e79
10 changed files with 305 additions and 54 deletions

View File

@@ -192,6 +192,33 @@ impl Connection {
.map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}")))
}
#[cfg(feature = "conn_raw_api")]
pub fn try_wal_watermark_read_page(
&self,
page_idx: u32,
page: &mut [u8],
frame_watermark: Option<u64>,
) -> Result<bool> {
let conn = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
conn.try_wal_watermark_read_page(page_idx, page, frame_watermark)
.map_err(|e| {
Error::WalOperationError(format!("try_wal_watermark_read_page failed: {e}"))
})
}
#[cfg(feature = "conn_raw_api")]
pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
let conn = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
conn.wal_changed_pages_after(frame_watermark)
.map_err(|e| Error::WalOperationError(format!("wal_changed_pages_after failed: {e}")))
}
#[cfg(feature = "conn_raw_api")]
pub fn wal_insert_begin(&self) -> Result<()> {
let conn = self
@@ -213,7 +240,7 @@ impl Connection {
}
#[cfg(feature = "conn_raw_api")]
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalFrameInfo> {
pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result<WalFrameInfo> {
let conn = self
.inner
.lock()
@@ -223,7 +250,7 @@ impl Connection {
}
#[cfg(feature = "conn_raw_api")]
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> {
pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result<WalFrameInfo> {
let conn = self
.inner
.lock()

View File

@@ -1141,22 +1141,54 @@ impl Connection {
Ok(())
}
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn try_wal_watermark_read_page(
&self,
page_idx: u32,
page: &mut [u8],
frame_watermark: Option<u64>,
) -> Result<bool> {
let pager = self.pager.borrow();
let (page_ref, c) = pager.read_page_no_cache(page_idx as usize, frame_watermark, true)?;
pager.io.wait_for_completion(c)?;
let content = page_ref.get_contents();
// empty read - attempt to read absent page
if content.buffer.borrow().is_empty() {
return Ok(false);
}
page.copy_from_slice(content.as_ptr());
Ok(true)
}
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
self.pager.borrow().wal_changed_pages_after(frame_watermark)
}
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn wal_frame_count(&self) -> Result<u64> {
self.pager.borrow().wal_frame_count()
}
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> {
pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result<WalFrameInfo> {
use crate::storage::sqlite3_ondisk::parse_wal_frame_header;
let c = self.pager.borrow().wal_get_frame(frame_no, frame)?;
self._db.io.wait_for_completion(c)
self._db.io.wait_for_completion(c)?;
let (header, _) = parse_wal_frame_header(frame);
Ok(WalFrameInfo {
page_no: header.page_number,
db_size: header.db_size,
})
}
/// Insert `frame` (header included) at the position `frame_no` in the WAL
/// If WAL already has frame at that position - turso-db will compare content of the page and either report conflict or return OK
/// If attempt to write frame at the position `frame_no` will create gap in the WAL - method will return error
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalFrameInfo> {
pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result<WalFrameInfo> {
self.pager.borrow().wal_insert_frame(frame_no, frame)
}

View File

@@ -929,6 +929,43 @@ impl Pager {
Ok(())
}
/// Reads a page from disk bypassing page-cache
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn read_page_no_cache(
&self,
page_idx: usize,
frame_watermark: Option<u64>,
allow_empty_read: bool,
) -> Result<(PageRef, Completion)> {
tracing::trace!("read_page_no_cache(page_idx = {})", page_idx);
let page = Arc::new(Page::new(page_idx));
page.set_locked();
let Some(wal) = self.wal.as_ref() else {
turso_assert!(
matches!(frame_watermark, Some(0) | None),
"frame_watermark must be either None or Some(0) because DB has no WAL and read with other watermark is invalid"
);
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read)?;
return Ok((page, c));
};
if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64, frame_watermark)? {
let c = wal
.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
{
page.set_uptodate();
}
// TODO(pere) should probably first insert to page cache, and if successful,
// read frame or page
return Ok((page, c));
}
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read)?;
Ok((page, c))
}
/// Reads a page from the database.
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Completion)> {
@@ -940,39 +977,23 @@ impl Pager {
// Dummy completion being passed, as we do not need to read from database or wal
return Ok((page.clone(), Completion::new_write(|_| {})));
}
let page = Arc::new(Page::new(page_idx));
page.set_locked();
let Some(wal) = self.wal.as_ref() else {
let c = self.begin_read_disk_page(page_idx, page.clone())?;
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
return Ok((page, c));
};
if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? {
let c = wal
.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
{
page.set_uptodate();
}
// TODO(pere) should probably first insert to page cache, and if successful,
// read frame or page
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
return Ok((page, c));
}
let c = self.begin_read_disk_page(page_idx, page.clone())?;
let (page, c) = self.read_page_no_cache(page_idx, None, false)?;
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
Ok((page, c))
}
fn begin_read_disk_page(&self, page_idx: usize, page: PageRef) -> Result<Completion> {
fn begin_read_disk_page(
&self,
page_idx: usize,
page: PageRef,
allow_empty_read: bool,
) -> Result<Completion> {
sqlite3_ondisk::begin_read_page(
self.db_file.clone(),
self.buffer_pool.clone(),
page,
page_idx,
allow_empty_read,
)
}
@@ -1279,18 +1300,24 @@ impl Pager {
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<Completion> {
pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
let wal = self.wal.as_ref().unwrap().borrow();
wal.changed_pages_after(frame_watermark)
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result<Completion> {
let Some(wal) = self.wal.as_ref() else {
return Err(LimboError::InternalError(
"wal_get_frame() called on database without WAL".to_string(),
));
};
let wal = wal.borrow();
wal.read_frame_raw(frame_no.into(), frame)
wal.read_frame_raw(frame_no, frame)
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalFrameInfo> {
pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result<WalFrameInfo> {
let Some(wal) = self.wal.as_ref() else {
return Err(LimboError::InternalError(
"wal_insert_frame() called on database without WAL".to_string(),
@@ -1300,7 +1327,7 @@ impl Pager {
let (header, raw_page) = parse_wal_frame_header(frame);
wal.write_frame_raw(
self.buffer_pool.clone(),
frame_no as u64,
frame_no,
header.page_number as u64,
header.db_size as u64,
raw_page,

View File

@@ -777,12 +777,15 @@ impl PageContent {
}
}
/// Send read request for DB page read to the IO
/// if allow_empty_read is set, than empty read will be raise error for the page, but will not panic
#[instrument(skip_all, level = Level::DEBUG)]
pub fn begin_read_page(
db_file: Arc<dyn DatabaseStorage>,
buffer_pool: Arc<BufferPool>,
page: PageRef,
page_idx: usize,
allow_empty_read: bool,
) -> Result<Completion> {
tracing::trace!("begin_read_btree_page(page_idx = {})", page_idx);
let buf = buffer_pool.get();
@@ -792,13 +795,16 @@ pub fn begin_read_page(
});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn)));
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let complete = Box::new(move |mut buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf_len = buf.borrow().len();
turso_assert!(
bytes_read == buf_len as i32,
(allow_empty_read && bytes_read == 0) || bytes_read == buf_len as i32,
"read({bytes_read}) != expected({buf_len})"
);
let page = page.clone();
if bytes_read == 0 {
buf = Arc::new(RefCell::new(Buffer::allocate(0, Rc::new(|_| {}))));
}
if finish_read_page(page_idx, buf, page.clone()).is_err() {
page.set_error();
}

View File

@@ -3,7 +3,7 @@
use std::array;
use std::cell::UnsafeCell;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use strum::EnumString;
use tracing::{instrument, Level};
@@ -223,7 +223,7 @@ pub trait Wal {
fn end_write_tx(&self);
/// Find the latest frame containing a page.
fn find_frame(&self, page_id: u64) -> Result<Option<u64>>;
fn find_frame(&self, page_id: u64, frame_watermark: Option<u64>) -> Result<Option<u64>>;
/// Read a frame from the WAL.
fn read_frame(
@@ -276,6 +276,9 @@ pub trait Wal {
fn get_min_frame(&self) -> u64;
fn rollback(&mut self) -> Result<()>;
/// Return unique set of pages changed **after** frame_watermark position and until current WAL session max_frame_no
fn changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>>;
#[cfg(debug_assertions)]
fn as_any(&self) -> &dyn std::any::Any;
}
@@ -850,14 +853,22 @@ impl Wal for WalFile {
/// Find the latest frame containing a page.
#[instrument(skip_all, level = Level::DEBUG)]
fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
fn find_frame(&self, page_id: u64, frame_watermark: Option<u64>) -> Result<Option<u64>> {
#[cfg(not(feature = "conn_raw_api"))]
turso_assert!(
frame_watermark.is_none(),
"unexpected use of frame_watermark optional argument"
);
// if we are holding read_lock 0, skip and read right from db file.
if self.max_frame_read_lock_index.get() == 0 {
return Ok(None);
}
let shared = self.get_shared();
let frames = shared.frame_cache.lock();
let range = self.min_frame..=self.max_frame;
let range = frame_watermark
.map(|x| 0..=x)
.unwrap_or(self.min_frame..=self.max_frame);
if let Some(list) = frames.get(&page_id) {
if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) {
return Ok(Some(*f));
@@ -1164,6 +1175,24 @@ impl Wal for WalFile {
shared.last_checksum = self.last_checksum;
Ok(())
}
fn changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
let frame_count = self.get_max_frame();
let page_size = self.page_size();
let mut frame = vec![0u8; page_size as usize + WAL_FRAME_HEADER_SIZE];
let mut seen = HashSet::new();
let mut pages = Vec::with_capacity((frame_count - frame_watermark) as usize);
for frame_no in frame_watermark + 1..=frame_count {
let c = self.read_frame_raw(frame_no, &mut frame)?;
self.io.wait_for_completion(c)?;
let (header, _) = sqlite3_ondisk::parse_wal_frame_header(&frame);
if seen.insert(header.page_number) {
pages.push(header.page_number);
}
}
Ok(pages)
}
#[cfg(debug_assertions)]
fn as_any(&self) -> &dyn std::any::Any {
self
@@ -2759,7 +2788,7 @@ pub mod test {
{
let pager = conn1.pager.borrow();
let wal = pager.wal.as_ref().unwrap().borrow();
let frame = wal.find_frame(5);
let frame = wal.find_frame(5, None);
// since we hold readlock0, we should ignore the db file and find_frame should return none
assert!(frame.is_ok_and(|f| f.is_none()));
}

View File

@@ -2599,6 +2599,10 @@ impl WalFrameInfo {
pub fn is_commit_frame(&self) -> bool {
self.db_size > 0
}
pub fn put_to_frame_header(&self, frame: &mut [u8]) {
frame[0..4].copy_from_slice(&self.page_no.to_be_bytes());
frame[4..8].copy_from_slice(&self.db_size.to_be_bytes());
}
}
#[cfg(test)]

View File

@@ -369,7 +369,7 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
if !wal_session.in_txn() {
wal_session.begin()?;
}
let wal_frame_info = clean_conn.wal_insert_frame(frame_no as u32, &buffer)?;
let wal_frame_info = clean_conn.wal_insert_frame(frame_no as u64, &buffer)?;
if wal_frame_info.is_commit_frame() {
wal_session.end()?;
// transaction boundary reached - it's safe to commit progress
@@ -437,7 +437,7 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
let mut buffer = [0u8; FRAME_SIZE];
for frame_no in (frame_no + 1)..=clean_frames {
clean_conn.wal_get_frame(frame_no as u32, &mut buffer)?;
clean_conn.wal_get_frame(frame_no as u64, &mut buffer)?;
frames.extend_from_slice(&buffer);
frames_cnt += 1;
}

View File

@@ -185,7 +185,7 @@ impl SyncServer for TestSyncServer {
session.in_txn = true;
}
let frame = &frames[offset..offset + FRAME_SIZE];
match session.conn.wal_insert_frame(frame_no as u32, frame) {
match session.conn.wal_insert_frame(frame_no as u64, frame) {
Ok(info) => {
if info.is_commit_frame() {
if session.in_txn {
@@ -276,7 +276,7 @@ impl TestSyncServer {
let wal_frame_count = conn.wal_frame_count()?;
tracing::debug!("conn frames count: {}", wal_frame_count);
for frame_no in last_frame..=wal_frame_count as usize {
conn.wal_get_frame(frame_no as u32, &mut frame)?;
conn.wal_get_frame(frame_no as u64, &mut frame)?;
tracing::debug!("push local frame {}", frame_no);
generation.frames.push(frame.to_vec());
}

View File

@@ -1213,8 +1213,8 @@ pub unsafe extern "C" fn libsql_wal_get_frame(
let db: &mut sqlite3 = &mut *db;
let db = db.inner.lock().unwrap();
let frame = std::slice::from_raw_parts_mut(p_frame, frame_len as usize);
match db.conn.wal_get_frame(frame_no, frame) {
Ok(()) => SQLITE_OK,
match db.conn.wal_get_frame(frame_no as u64, frame) {
Ok(..) => SQLITE_OK,
Err(_) => SQLITE_ERROR,
}
}
@@ -1250,7 +1250,7 @@ pub unsafe extern "C" fn libsql_wal_insert_frame(
let db: &mut sqlite3 = &mut *db;
let db = db.inner.lock().unwrap();
let frame = std::slice::from_raw_parts(p_frame, frame_len as usize);
match db.conn.wal_insert_frame(frame_no, frame) {
match db.conn.wal_insert_frame(frame_no as u64, frame) {
Ok(_) => SQLITE_OK,
Err(LimboError::Conflict(..)) => {
if !p_conflict.is_null() {

View File

@@ -1,6 +1,9 @@
use std::{collections::HashSet, sync::Arc};
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use rusqlite::types::Value;
use turso_core::types::WalFrameInfo;
use crate::common::{limbo_exec_rows, rng_from_time, TempDatabase};
@@ -41,7 +44,7 @@ fn test_wal_frame_transfer_no_schema_changes() {
assert_eq!(conn1.wal_frame_count().unwrap(), 15);
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
let frames_count = conn1.wal_frame_count().unwrap() as u32;
let frames_count = conn1.wal_frame_count().unwrap();
for frame_id in 1..=frames_count {
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
conn2.wal_insert_frame(frame_id, &frame).unwrap();
@@ -72,7 +75,7 @@ fn test_wal_frame_transfer_various_schema_changes() {
let mut frame = [0u8; 24 + 4096];
let mut synced_frame = 0;
let mut sync = || {
let last_frame = conn1.wal_frame_count().unwrap() as u32;
let last_frame = conn1.wal_frame_count().unwrap();
conn2.wal_insert_begin().unwrap();
for frame_id in (synced_frame + 1)..=last_frame {
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
@@ -137,7 +140,7 @@ fn test_wal_frame_transfer_schema_changes() {
let mut frame = [0u8; 24 + 4096];
let mut commits = 0;
conn2.wal_insert_begin().unwrap();
for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 {
for frame_id in 1..=conn1.wal_frame_count().unwrap() {
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
let info = conn2.wal_insert_frame(frame_id, &frame).unwrap();
if info.is_commit_frame() {
@@ -176,7 +179,7 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() {
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
// Intentionally leave out the final commit frame, so the big randomblob is not committed and should not be visible to transactions.
for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) {
for frame_id in 1..=(conn1.wal_frame_count().unwrap() - 1) {
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
conn2.wal_insert_frame(frame_id, &frame).unwrap();
}
@@ -211,7 +214,7 @@ fn test_wal_frame_transfer_schema_changes_rollback() {
assert_eq!(conn1.wal_frame_count().unwrap(), 14);
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) {
for frame_id in 1..=(conn1.wal_frame_count().unwrap() - 1) {
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
conn2.wal_insert_frame(frame_id, &frame).unwrap();
}
@@ -311,8 +314,8 @@ fn test_wal_frame_api_no_schema_changes_fuzz() {
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
for frame_no in (synced_frame + 1)..=next_frame {
conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap();
conn2.wal_insert_frame(frame_no as u32, &frame[..]).unwrap();
conn1.wal_get_frame(frame_no, &mut frame).unwrap();
conn2.wal_insert_frame(frame_no, &frame[..]).unwrap();
}
conn2.wal_insert_end().unwrap();
for (i, committed) in commit_frames.iter().enumerate() {
@@ -332,3 +335,126 @@ fn test_wal_frame_api_no_schema_changes_fuzz() {
}
}
}
#[test]
fn test_wal_api_changed_pages() {
let db1 = TempDatabase::new_empty(false);
let conn1 = db1.connect_limbo();
conn1
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
conn1
.execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y)")
.unwrap();
assert_eq!(
conn1
.wal_changed_pages_after(0)
.unwrap()
.into_iter()
.collect::<HashSet<_>>(),
HashSet::from([1, 2, 3])
);
let frames = conn1.wal_frame_count().unwrap();
conn1.execute("INSERT INTO t VALUES (1, 2)").unwrap();
conn1.execute("INSERT INTO t VALUES (3, 4)").unwrap();
assert_eq!(
conn1
.wal_changed_pages_after(frames)
.unwrap()
.into_iter()
.collect::<HashSet<_>>(),
HashSet::from([2])
);
let frames = conn1.wal_frame_count().unwrap();
conn1
.execute("INSERT INTO t VALUES (1024, randomblob(4096 * 2))")
.unwrap();
assert_eq!(
conn1
.wal_changed_pages_after(frames)
.unwrap()
.into_iter()
.collect::<HashSet<_>>(),
HashSet::from([1, 2, 4, 5])
);
}
fn revert_to(conn: &Arc<turso_core::Connection>, frame_watermark: u64) -> turso_core::Result<()> {
let mut frame = [0u8; 4096 + 24];
let frame_watermark_info = conn.wal_get_frame(frame_watermark, &mut frame)?;
let changed_pages = conn.wal_changed_pages_after(frame_watermark)?;
conn.wal_insert_begin()?;
let mut frames = Vec::new();
for page_id in changed_pages {
let has_page =
conn.try_wal_watermark_read_page(page_id, &mut frame[24..], Some(frame_watermark))?;
if !has_page {
continue;
}
frames.push((page_id, frame.clone()));
}
let mut frame_no = conn.wal_frame_count().unwrap();
for (i, (page_id, mut frame)) in frames.iter().enumerate() {
let info = WalFrameInfo {
db_size: if i == frames.len() - 1 {
frame_watermark_info.db_size
} else {
0
},
page_no: *page_id,
};
info.put_to_frame_header(&mut frame);
frame_no += 1;
conn.wal_insert_frame(frame_no, &frame)?;
}
conn.wal_insert_end()?;
Ok(())
}
#[test]
fn test_wal_api_revert_pages() {
let db1 = TempDatabase::new_empty(false);
let conn1 = db1.connect_limbo();
conn1
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
let watermark1 = conn1.wal_frame_count().unwrap();
conn1
.execute("INSERT INTO t VALUES (1, randomblob(10))")
.unwrap();
let watermark2 = conn1.wal_frame_count().unwrap();
conn1
.execute("INSERT INTO t VALUES (3, randomblob(20))")
.unwrap();
conn1
.execute("INSERT INTO t VALUES (1024, randomblob(4096 * 2))")
.unwrap();
assert_eq!(
limbo_exec_rows(&db1, &conn1, "SELECT x, length(y) FROM t"),
vec![
vec![Value::Integer(1), Value::Integer(10)],
vec![Value::Integer(3), Value::Integer(20)],
vec![Value::Integer(1024), Value::Integer(4096 * 2)],
]
);
revert_to(&conn1, watermark2).unwrap();
assert_eq!(
limbo_exec_rows(&db1, &conn1, "SELECT x, length(y) FROM t"),
vec![vec![Value::Integer(1), Value::Integer(10)],]
);
revert_to(&conn1, watermark1).unwrap();
assert_eq!(
limbo_exec_rows(&db1, &conn1, "SELECT x, length(y) FROM t"),
vec![] as Vec<Vec<Value>>,
);
}