mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-17 14:04:34 +01:00
Merge 'extend raw WAL API with few more methods' from Nikita Sivukhin
This PR extends the raw WAL API with a few methods that will be helpful for
offline-sync:
1. `try_wal_watermark_read_page` - try to read a page from the DB with
the given WAL watermark value
* Usually, WAL max_frame is set automatically to the latest value
(`shared.max_frame`) when transaction is started and then this
"watermark" is preserved throughout whole transaction
* The new method allows simulating a "read from the past" by controlling
the frame watermark explicitly
* An alternative would be to implement an API like
`start_read_session(frame_watermark: u64)` - but I decided to expose
just a single method to simplify the logic and reduce the "surface" of actions
which can be executed in this "controllable" manner
* Also, for simplicity, `try_wal_watermark_read_page` currently always
reads data from disk and bypasses any cached values (and also does not
populate the cache)
2. `wal_changed_pages_after` - return set of unique pages changed after
watermark WAL position in the current WAL session
With these 2 methods we can implement `REVERT frame_watermark` logic
which will just fetch all changed pages first, and then revert them to
the previous value by using `try_wal_watermark_read_page` and
`wal_insert_frame` methods (see `test_wal_api_revert_pages` test).
Note that if there were schema changes, then the `REVERT` logic described
above can bring the connection into an inconsistent state, as it will
preserve schema information in memory and will still think that a table
exists (while it may have been reverted). This should be considered by any
consumer of these new methods.
Closes #2433
This commit is contained in:
@@ -192,6 +192,33 @@ impl Connection {
|
||||
.map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}")))
|
||||
}
|
||||
|
||||
/// Reads page `page_idx` into `page` using the given WAL `frame_watermark`.
///
/// Thin wrapper: locks the inner connection and forwards the call to its
/// `try_wal_watermark_read_page`, returning the boolean it reports.
///
/// # Errors
/// - `Error::MutexError` if the inner connection lock cannot be acquired.
/// - `Error::WalOperationError` if the underlying call fails.
#[cfg(feature = "conn_raw_api")]
pub fn try_wal_watermark_read_page(
    &self,
    page_idx: u32,
    page: &mut [u8],
    frame_watermark: Option<u64>,
) -> Result<bool> {
    let guard = match self.inner.lock() {
        Ok(guard) => guard,
        Err(poisoned) => return Err(Error::MutexError(poisoned.to_string())),
    };
    match guard.try_wal_watermark_read_page(page_idx, page, frame_watermark) {
        Ok(found) => Ok(found),
        Err(e) => Err(Error::WalOperationError(format!(
            "try_wal_watermark_read_page failed: {e}"
        ))),
    }
}
|
||||
|
||||
/// Returns the page numbers reported by the inner connection's
/// `wal_changed_pages_after` for the given `frame_watermark`.
///
/// # Errors
/// - `Error::MutexError` if the inner connection lock cannot be acquired.
/// - `Error::WalOperationError` if the underlying call fails.
#[cfg(feature = "conn_raw_api")]
pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
    let guard = match self.inner.lock() {
        Ok(guard) => guard,
        Err(poisoned) => return Err(Error::MutexError(poisoned.to_string())),
    };
    match guard.wal_changed_pages_after(frame_watermark) {
        Ok(pages) => Ok(pages),
        Err(e) => Err(Error::WalOperationError(format!(
            "wal_changed_pages_after failed: {e}"
        ))),
    }
}
|
||||
|
||||
#[cfg(feature = "conn_raw_api")]
|
||||
pub fn wal_insert_begin(&self) -> Result<()> {
|
||||
let conn = self
|
||||
@@ -213,7 +240,7 @@ impl Connection {
|
||||
}
|
||||
|
||||
#[cfg(feature = "conn_raw_api")]
|
||||
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalFrameInfo> {
|
||||
pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result<WalFrameInfo> {
|
||||
let conn = self
|
||||
.inner
|
||||
.lock()
|
||||
@@ -223,7 +250,7 @@ impl Connection {
|
||||
}
|
||||
|
||||
#[cfg(feature = "conn_raw_api")]
|
||||
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> {
|
||||
pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result<WalFrameInfo> {
|
||||
let conn = self
|
||||
.inner
|
||||
.lock()
|
||||
|
||||
52
core/lib.rs
52
core/lib.rs
@@ -1141,22 +1141,68 @@ impl Connection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Try to read the page with the given index using a fixed WAL watermark position.
///
/// Returns `Ok(false)` if the page is not found (so this is probably a new page created after
/// the watermark position which wasn't checkpointed to the DB file yet) and `Ok(true)` once the
/// page content has been copied into `page`.
///
/// # Errors
/// Propagates errors from the pager and from waiting on the IO completion. Returns
/// `LimboError::InternalError` if `page` does not have exactly the length of the page content,
/// instead of panicking inside `copy_from_slice`.
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn try_wal_watermark_read_page(
    &self,
    page_idx: u32,
    page: &mut [u8],
    frame_watermark: Option<u64>,
) -> Result<bool> {
    let pager = self.pager.borrow();
    let (page_ref, c) = match pager.read_page_no_cache(page_idx as usize, frame_watermark, true)
    {
        Ok(result) => result,
        // on windows, zero read will trigger UnexpectedEof
        #[cfg(target_os = "windows")]
        Err(LimboError::IOError(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
            return Ok(false)
        }
        Err(err) => return Err(err),
    };

    pager.io.wait_for_completion(c)?;

    let content = page_ref.get_contents();
    // empty read - attempt to read absent page
    if content.buffer.borrow().is_empty() {
        return Ok(false);
    }

    let data: &[u8] = content.as_ptr();
    // `copy_from_slice` panics on a length mismatch; report a wrongly sized caller buffer
    // as an error instead of aborting the caller.
    if page.len() != data.len() {
        return Err(LimboError::InternalError(format!(
            "try_wal_watermark_read_page(): page buffer length {} does not match page size {}",
            page.len(),
            data.len()
        )));
    }
    page.copy_from_slice(data);
    Ok(true)
}
|
||||
|
||||
/// Returns the unique set of page numbers changed after the WAL watermark position within
/// the current WAL session.
/// (If a concurrent connection wrote something to the WAL, this method will not see that change.)
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
    let pager = self.pager.borrow();
    pager.wal_changed_pages_after(frame_watermark)
}
|
||||
|
||||
/// Returns the value reported by the pager's `wal_frame_count`.
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn wal_frame_count(&self) -> Result<u64> {
    let pager = self.pager.borrow();
    pager.wal_frame_count()
}
|
||||
|
||||
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
|
||||
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> {
|
||||
pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result<WalFrameInfo> {
|
||||
use crate::storage::sqlite3_ondisk::parse_wal_frame_header;
|
||||
|
||||
let c = self.pager.borrow().wal_get_frame(frame_no, frame)?;
|
||||
self._db.io.wait_for_completion(c)
|
||||
self._db.io.wait_for_completion(c)?;
|
||||
let (header, _) = parse_wal_frame_header(frame);
|
||||
Ok(WalFrameInfo {
|
||||
page_no: header.page_number,
|
||||
db_size: header.db_size,
|
||||
})
|
||||
}
|
||||
|
||||
/// Insert `frame` (header included) at the position `frame_no` in the WAL
|
||||
/// If WAL already has frame at that position - turso-db will compare content of the page and either report conflict or return OK
|
||||
/// If attempt to write frame at the position `frame_no` will create gap in the WAL - method will return error
|
||||
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
|
||||
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalFrameInfo> {
|
||||
pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result<WalFrameInfo> {
|
||||
self.pager.borrow().wal_insert_frame(frame_no, frame)
|
||||
}
|
||||
|
||||
|
||||
@@ -929,6 +929,43 @@ impl Pager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reads a page from disk (either WAL or DB file) bypassing page-cache
|
||||
#[tracing::instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn read_page_no_cache(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
frame_watermark: Option<u64>,
|
||||
allow_empty_read: bool,
|
||||
) -> Result<(PageRef, Completion)> {
|
||||
tracing::trace!("read_page_no_cache(page_idx = {})", page_idx);
|
||||
let page = Arc::new(Page::new(page_idx));
|
||||
page.set_locked();
|
||||
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
turso_assert!(
|
||||
matches!(frame_watermark, Some(0) | None),
|
||||
"frame_watermark must be either None or Some(0) because DB has no WAL and read with other watermark is invalid"
|
||||
);
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read)?;
|
||||
return Ok((page, c));
|
||||
};
|
||||
|
||||
if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64, frame_watermark)? {
|
||||
let c = wal
|
||||
.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
}
|
||||
// TODO(pere) should probably first insert to page cache, and if successful,
|
||||
// read frame or page
|
||||
return Ok((page, c));
|
||||
}
|
||||
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read)?;
|
||||
Ok((page, c))
|
||||
}
|
||||
|
||||
/// Reads a page from the database.
|
||||
#[tracing::instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Completion)> {
|
||||
@@ -940,39 +977,23 @@ impl Pager {
|
||||
// Dummy completion being passed, as we do not need to read from database or wal
|
||||
return Ok((page.clone(), Completion::new_write(|_| {})));
|
||||
}
|
||||
let page = Arc::new(Page::new(page_idx));
|
||||
page.set_locked();
|
||||
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone())?;
|
||||
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
|
||||
return Ok((page, c));
|
||||
};
|
||||
|
||||
if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? {
|
||||
let c = wal
|
||||
.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
}
|
||||
// TODO(pere) should probably first insert to page cache, and if successful,
|
||||
// read frame or page
|
||||
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
|
||||
return Ok((page, c));
|
||||
}
|
||||
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone())?;
|
||||
let (page, c) = self.read_page_no_cache(page_idx, None, false)?;
|
||||
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
|
||||
Ok((page, c))
|
||||
}
|
||||
|
||||
fn begin_read_disk_page(&self, page_idx: usize, page: PageRef) -> Result<Completion> {
|
||||
fn begin_read_disk_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
page: PageRef,
|
||||
allow_empty_read: bool,
|
||||
) -> Result<Completion> {
|
||||
sqlite3_ondisk::begin_read_page(
|
||||
self.db_file.clone(),
|
||||
self.buffer_pool.clone(),
|
||||
page,
|
||||
page_idx,
|
||||
allow_empty_read,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1279,18 +1300,24 @@ impl Pager {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<Completion> {
|
||||
pub fn wal_changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
|
||||
let wal = self.wal.as_ref().unwrap().borrow();
|
||||
wal.changed_pages_after(frame_watermark)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn wal_get_frame(&self, frame_no: u64, frame: &mut [u8]) -> Result<Completion> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"wal_get_frame() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
let wal = wal.borrow();
|
||||
wal.read_frame_raw(frame_no.into(), frame)
|
||||
wal.read_frame_raw(frame_no, frame)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalFrameInfo> {
|
||||
pub fn wal_insert_frame(&self, frame_no: u64, frame: &[u8]) -> Result<WalFrameInfo> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"wal_insert_frame() called on database without WAL".to_string(),
|
||||
@@ -1300,7 +1327,7 @@ impl Pager {
|
||||
let (header, raw_page) = parse_wal_frame_header(frame);
|
||||
wal.write_frame_raw(
|
||||
self.buffer_pool.clone(),
|
||||
frame_no as u64,
|
||||
frame_no,
|
||||
header.page_number as u64,
|
||||
header.db_size as u64,
|
||||
raw_page,
|
||||
|
||||
@@ -777,12 +777,15 @@ impl PageContent {
|
||||
}
|
||||
}
|
||||
|
||||
/// Send read request for DB page read to the IO
|
||||
/// if allow_empty_read is set, then an empty read will raise an error for the page, but will not panic
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn begin_read_page(
|
||||
db_file: Arc<dyn DatabaseStorage>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
page: PageRef,
|
||||
page_idx: usize,
|
||||
allow_empty_read: bool,
|
||||
) -> Result<Completion> {
|
||||
tracing::trace!("begin_read_btree_page(page_idx = {})", page_idx);
|
||||
let buf = buffer_pool.get();
|
||||
@@ -792,13 +795,16 @@ pub fn begin_read_page(
|
||||
});
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn)));
|
||||
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
|
||||
let complete = Box::new(move |mut buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
|
||||
let buf_len = buf.borrow().len();
|
||||
turso_assert!(
|
||||
bytes_read == buf_len as i32,
|
||||
(allow_empty_read && bytes_read == 0) || bytes_read == buf_len as i32,
|
||||
"read({bytes_read}) != expected({buf_len})"
|
||||
);
|
||||
let page = page.clone();
|
||||
if bytes_read == 0 {
|
||||
buf = Arc::new(RefCell::new(Buffer::allocate(0, Rc::new(|_| {}))));
|
||||
}
|
||||
if finish_read_page(page_idx, buf, page.clone()).is_err() {
|
||||
page.set_error();
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use std::array;
|
||||
use std::cell::UnsafeCell;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use strum::EnumString;
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
@@ -223,7 +223,10 @@ pub trait Wal {
|
||||
fn end_write_tx(&self);
|
||||
|
||||
/// Find the latest frame containing a page.
|
||||
fn find_frame(&self, page_id: u64) -> Result<Option<u64>>;
|
||||
///
|
||||
/// optional frame_watermark parameter can be passed to force WAL to find frame not larger than watermark value
|
||||
/// caller must guarantee that frame_watermark is greater than the last checkpointed frame, otherwise the method will panic
|
||||
fn find_frame(&self, page_id: u64, frame_watermark: Option<u64>) -> Result<Option<u64>>;
|
||||
|
||||
/// Read a frame from the WAL.
|
||||
fn read_frame(
|
||||
@@ -276,6 +279,9 @@ pub trait Wal {
|
||||
fn get_min_frame(&self) -> u64;
|
||||
fn rollback(&mut self) -> Result<()>;
|
||||
|
||||
/// Return unique set of pages changed **after** frame_watermark position and until current WAL session max_frame_no
|
||||
fn changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>>;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
fn as_any(&self) -> &dyn std::any::Any;
|
||||
}
|
||||
@@ -845,14 +851,36 @@ impl Wal for WalFile {
|
||||
|
||||
/// Find the latest frame containing a page.
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
|
||||
fn find_frame(&self, page_id: u64, frame_watermark: Option<u64>) -> Result<Option<u64>> {
|
||||
#[cfg(not(feature = "conn_raw_api"))]
|
||||
turso_assert!(
|
||||
frame_watermark.is_none(),
|
||||
"unexpected use of frame_watermark optional argument"
|
||||
);
|
||||
|
||||
turso_assert!(
|
||||
frame_watermark.unwrap_or(0) <= self.max_frame,
|
||||
"frame_watermark must be <= than current WAL max_frame value"
|
||||
);
|
||||
|
||||
// we can guarantee correctness of the method, only if frame_watermark is strictly after the current checkpointed prefix
|
||||
//
|
||||
// if it's not, than pages from WAL range [frame_watermark..nBackfill] are already in the DB file,
|
||||
// and in case if page first occurrence in WAL was after frame_watermark - we will be unable to read proper previous version of the page
|
||||
turso_assert!(
|
||||
frame_watermark.is_none() || frame_watermark.unwrap() >= self.min_frame,
|
||||
"frame_watermark must be >= than current WAL min_value value"
|
||||
);
|
||||
|
||||
// if we are holding read_lock 0, skip and read right from db file.
|
||||
if self.max_frame_read_lock_index.get() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
let shared = self.get_shared();
|
||||
let frames = shared.frame_cache.lock();
|
||||
let range = self.min_frame..=self.max_frame;
|
||||
let range = frame_watermark
|
||||
.map(|x| 0..=x)
|
||||
.unwrap_or(self.min_frame..=self.max_frame);
|
||||
if let Some(list) = frames.get(&page_id) {
|
||||
if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) {
|
||||
return Ok(Some(*f));
|
||||
@@ -1161,6 +1189,24 @@ impl Wal for WalFile {
|
||||
shared.last_checksum = self.last_checksum;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
|
||||
let frame_count = self.get_max_frame();
|
||||
let page_size = self.page_size();
|
||||
let mut frame = vec![0u8; page_size as usize + WAL_FRAME_HEADER_SIZE];
|
||||
let mut seen = HashSet::new();
|
||||
let mut pages = Vec::with_capacity((frame_count - frame_watermark) as usize);
|
||||
for frame_no in frame_watermark + 1..=frame_count {
|
||||
let c = self.read_frame_raw(frame_no, &mut frame)?;
|
||||
self.io.wait_for_completion(c)?;
|
||||
let (header, _) = sqlite3_ondisk::parse_wal_frame_header(&frame);
|
||||
if seen.insert(header.page_number) {
|
||||
pages.push(header.page_number);
|
||||
}
|
||||
}
|
||||
Ok(pages)
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
@@ -2758,7 +2804,7 @@ pub mod test {
|
||||
{
|
||||
let pager = conn1.pager.borrow();
|
||||
let wal = pager.wal.as_ref().unwrap().borrow();
|
||||
let frame = wal.find_frame(5);
|
||||
let frame = wal.find_frame(5, None);
|
||||
// since we hold readlock0, we should ignore the db file and find_frame should return none
|
||||
assert!(frame.is_ok_and(|f| f.is_none()));
|
||||
}
|
||||
|
||||
@@ -2599,6 +2599,10 @@ impl WalFrameInfo {
|
||||
pub fn is_commit_frame(&self) -> bool {
|
||||
self.db_size > 0
|
||||
}
|
||||
pub fn put_to_frame_header(&self, frame: &mut [u8]) {
|
||||
frame[0..4].copy_from_slice(&self.page_no.to_be_bytes());
|
||||
frame[4..8].copy_from_slice(&self.db_size.to_be_bytes());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -369,7 +369,7 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
|
||||
if !wal_session.in_txn() {
|
||||
wal_session.begin()?;
|
||||
}
|
||||
let wal_frame_info = clean_conn.wal_insert_frame(frame_no as u32, &buffer)?;
|
||||
let wal_frame_info = clean_conn.wal_insert_frame(frame_no as u64, &buffer)?;
|
||||
if wal_frame_info.is_commit_frame() {
|
||||
wal_session.end()?;
|
||||
// transaction boundary reached - it's safe to commit progress
|
||||
@@ -437,7 +437,7 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
|
||||
|
||||
let mut buffer = [0u8; FRAME_SIZE];
|
||||
for frame_no in (frame_no + 1)..=clean_frames {
|
||||
clean_conn.wal_get_frame(frame_no as u32, &mut buffer)?;
|
||||
clean_conn.wal_get_frame(frame_no as u64, &mut buffer)?;
|
||||
frames.extend_from_slice(&buffer);
|
||||
frames_cnt += 1;
|
||||
}
|
||||
|
||||
@@ -185,7 +185,7 @@ impl SyncServer for TestSyncServer {
|
||||
session.in_txn = true;
|
||||
}
|
||||
let frame = &frames[offset..offset + FRAME_SIZE];
|
||||
match session.conn.wal_insert_frame(frame_no as u32, frame) {
|
||||
match session.conn.wal_insert_frame(frame_no as u64, frame) {
|
||||
Ok(info) => {
|
||||
if info.is_commit_frame() {
|
||||
if session.in_txn {
|
||||
@@ -276,7 +276,7 @@ impl TestSyncServer {
|
||||
let wal_frame_count = conn.wal_frame_count()?;
|
||||
tracing::debug!("conn frames count: {}", wal_frame_count);
|
||||
for frame_no in last_frame..=wal_frame_count as usize {
|
||||
conn.wal_get_frame(frame_no as u32, &mut frame)?;
|
||||
conn.wal_get_frame(frame_no as u64, &mut frame)?;
|
||||
tracing::debug!("push local frame {}", frame_no);
|
||||
generation.frames.push(frame.to_vec());
|
||||
}
|
||||
|
||||
@@ -1213,8 +1213,8 @@ pub unsafe extern "C" fn libsql_wal_get_frame(
|
||||
let db: &mut sqlite3 = &mut *db;
|
||||
let db = db.inner.lock().unwrap();
|
||||
let frame = std::slice::from_raw_parts_mut(p_frame, frame_len as usize);
|
||||
match db.conn.wal_get_frame(frame_no, frame) {
|
||||
Ok(()) => SQLITE_OK,
|
||||
match db.conn.wal_get_frame(frame_no as u64, frame) {
|
||||
Ok(..) => SQLITE_OK,
|
||||
Err(_) => SQLITE_ERROR,
|
||||
}
|
||||
}
|
||||
@@ -1250,7 +1250,7 @@ pub unsafe extern "C" fn libsql_wal_insert_frame(
|
||||
let db: &mut sqlite3 = &mut *db;
|
||||
let db = db.inner.lock().unwrap();
|
||||
let frame = std::slice::from_raw_parts(p_frame, frame_len as usize);
|
||||
match db.conn.wal_insert_frame(frame_no, frame) {
|
||||
match db.conn.wal_insert_frame(frame_no as u64, frame) {
|
||||
Ok(_) => SQLITE_OK,
|
||||
Err(LimboError::Conflict(..)) => {
|
||||
if !p_conflict.is_null() {
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
use rand::{RngCore, SeedableRng};
|
||||
use rand_chacha::ChaCha8Rng;
|
||||
use rusqlite::types::Value;
|
||||
use turso_core::types::WalFrameInfo;
|
||||
|
||||
use crate::common::{limbo_exec_rows, rng_from_time, TempDatabase};
|
||||
|
||||
@@ -41,7 +44,7 @@ fn test_wal_frame_transfer_no_schema_changes() {
|
||||
assert_eq!(conn1.wal_frame_count().unwrap(), 15);
|
||||
let mut frame = [0u8; 24 + 4096];
|
||||
conn2.wal_insert_begin().unwrap();
|
||||
let frames_count = conn1.wal_frame_count().unwrap() as u32;
|
||||
let frames_count = conn1.wal_frame_count().unwrap();
|
||||
for frame_id in 1..=frames_count {
|
||||
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
|
||||
conn2.wal_insert_frame(frame_id, &frame).unwrap();
|
||||
@@ -72,7 +75,7 @@ fn test_wal_frame_transfer_various_schema_changes() {
|
||||
let mut frame = [0u8; 24 + 4096];
|
||||
let mut synced_frame = 0;
|
||||
let mut sync = || {
|
||||
let last_frame = conn1.wal_frame_count().unwrap() as u32;
|
||||
let last_frame = conn1.wal_frame_count().unwrap();
|
||||
conn2.wal_insert_begin().unwrap();
|
||||
for frame_id in (synced_frame + 1)..=last_frame {
|
||||
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
|
||||
@@ -137,7 +140,7 @@ fn test_wal_frame_transfer_schema_changes() {
|
||||
let mut frame = [0u8; 24 + 4096];
|
||||
let mut commits = 0;
|
||||
conn2.wal_insert_begin().unwrap();
|
||||
for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 {
|
||||
for frame_id in 1..=conn1.wal_frame_count().unwrap() {
|
||||
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
|
||||
let info = conn2.wal_insert_frame(frame_id, &frame).unwrap();
|
||||
if info.is_commit_frame() {
|
||||
@@ -176,7 +179,7 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() {
|
||||
let mut frame = [0u8; 24 + 4096];
|
||||
conn2.wal_insert_begin().unwrap();
|
||||
// Intentionally leave out the final commit frame, so the big randomblob is not committed and should not be visible to transactions.
|
||||
for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) {
|
||||
for frame_id in 1..=(conn1.wal_frame_count().unwrap() - 1) {
|
||||
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
|
||||
conn2.wal_insert_frame(frame_id, &frame).unwrap();
|
||||
}
|
||||
@@ -211,7 +214,7 @@ fn test_wal_frame_transfer_schema_changes_rollback() {
|
||||
assert_eq!(conn1.wal_frame_count().unwrap(), 14);
|
||||
let mut frame = [0u8; 24 + 4096];
|
||||
conn2.wal_insert_begin().unwrap();
|
||||
for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) {
|
||||
for frame_id in 1..=(conn1.wal_frame_count().unwrap() - 1) {
|
||||
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
|
||||
conn2.wal_insert_frame(frame_id, &frame).unwrap();
|
||||
}
|
||||
@@ -311,8 +314,8 @@ fn test_wal_frame_api_no_schema_changes_fuzz() {
|
||||
let mut frame = [0u8; 24 + 4096];
|
||||
conn2.wal_insert_begin().unwrap();
|
||||
for frame_no in (synced_frame + 1)..=next_frame {
|
||||
conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap();
|
||||
conn2.wal_insert_frame(frame_no as u32, &frame[..]).unwrap();
|
||||
conn1.wal_get_frame(frame_no, &mut frame).unwrap();
|
||||
conn2.wal_insert_frame(frame_no, &frame[..]).unwrap();
|
||||
}
|
||||
conn2.wal_insert_end().unwrap();
|
||||
for (i, committed) in commit_frames.iter().enumerate() {
|
||||
@@ -332,3 +335,126 @@ fn test_wal_frame_api_no_schema_changes_fuzz() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks that `wal_changed_pages_after` reports the expected set of pages after
/// schema creation, small inserts and a large insert.
#[test]
fn test_wal_api_changed_pages() {
    let db1 = TempDatabase::new_empty(false);
    let conn1 = db1.connect_limbo();
    // Collect the changed pages into a set so the assertions ignore ordering.
    let changed_after = |watermark: u64| -> HashSet<u32> {
        conn1
            .wal_changed_pages_after(watermark)
            .unwrap()
            .into_iter()
            .collect()
    };

    conn1
        .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
        .unwrap();
    conn1
        .execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y)")
        .unwrap();
    assert_eq!(changed_after(0), HashSet::from([1, 2, 3]));

    let frames = conn1.wal_frame_count().unwrap();
    conn1.execute("INSERT INTO t VALUES (1, 2)").unwrap();
    conn1.execute("INSERT INTO t VALUES (3, 4)").unwrap();
    assert_eq!(changed_after(frames), HashSet::from([2]));

    let frames = conn1.wal_frame_count().unwrap();
    conn1
        .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 2))")
        .unwrap();
    assert_eq!(changed_after(frames), HashSet::from([1, 2, 4, 5]));
}
|
||||
|
||||
fn revert_to(conn: &Arc<turso_core::Connection>, frame_watermark: u64) -> turso_core::Result<()> {
|
||||
let mut frame = [0u8; 4096 + 24];
|
||||
let frame_watermark_info = conn.wal_get_frame(frame_watermark, &mut frame)?;
|
||||
|
||||
let changed_pages = conn.wal_changed_pages_after(frame_watermark)?;
|
||||
|
||||
conn.wal_insert_begin()?;
|
||||
let mut frames = Vec::new();
|
||||
for page_id in changed_pages {
|
||||
let has_page =
|
||||
conn.try_wal_watermark_read_page(page_id, &mut frame[24..], Some(frame_watermark))?;
|
||||
if !has_page {
|
||||
continue;
|
||||
}
|
||||
frames.push((page_id, frame));
|
||||
}
|
||||
|
||||
let mut frame_no = conn.wal_frame_count().unwrap();
|
||||
for (i, (page_id, mut frame)) in frames.iter().enumerate() {
|
||||
let info = WalFrameInfo {
|
||||
db_size: if i == frames.len() - 1 {
|
||||
frame_watermark_info.db_size
|
||||
} else {
|
||||
0
|
||||
},
|
||||
page_no: *page_id,
|
||||
};
|
||||
info.put_to_frame_header(&mut frame);
|
||||
frame_no += 1;
|
||||
conn.wal_insert_frame(frame_no, &frame)?;
|
||||
}
|
||||
conn.wal_insert_end()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Inserts rows in three steps, then reverts to two earlier WAL watermarks via `revert_to`
/// and checks the visible rows after each revert.
#[test]
fn test_wal_api_revert_pages() {
    let db1 = TempDatabase::new_empty(false);
    let conn1 = db1.connect_limbo();
    let table_rows = || limbo_exec_rows(&db1, &conn1, "SELECT x, length(y) FROM t");
    let int_row = |x: i64, len: i64| vec![Value::Integer(x), Value::Integer(len)];

    conn1
        .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
        .unwrap();
    let watermark1 = conn1.wal_frame_count().unwrap();

    conn1
        .execute("INSERT INTO t VALUES (1, randomblob(10))")
        .unwrap();
    let watermark2 = conn1.wal_frame_count().unwrap();

    conn1
        .execute("INSERT INTO t VALUES (3, randomblob(20))")
        .unwrap();
    conn1
        .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 2))")
        .unwrap();

    assert_eq!(
        table_rows(),
        vec![int_row(1, 10), int_row(3, 20), int_row(1024, 4096 * 2)]
    );

    revert_to(&conn1, watermark2).unwrap();
    assert_eq!(table_rows(), vec![int_row(1, 10)]);

    revert_to(&conn1, watermark1).unwrap();
    assert_eq!(table_rows(), Vec::<Vec<Value>>::new());
}
|
||||
|
||||
Reference in New Issue
Block a user