Merge 'More async' from Nikita Sivukhin

This PR eliminates blocking IO calls (`io.block` /
`io.wait_for_completion`) from common execution paths.
I need this because I am trying to make turso in the browser work
smoothly, and the current paradigm doesn't work well there because it
uses the same connection from different threads. But in order to run the
DB on the main thread only (except IO), we need to eliminate all
blocking calls (they block the main thread, and it can't exit from that
state).
This PR eliminates blocking behaviour in the following places:
1. `append_frames` is now fully async, but `prepare_wal_start` /
`prepare_wal_finish` must be called before it in order to ensure that
the WAL header is initialized (see the sketch after this list)
2. `op_transaction` is non-blocking and reads the db header async
3. `op_sorter_open` is non-blocking and reads the db header async at the
beginning of execution
4. `op_open_ephemeral` is non-blocking and reads the db header async at
the beginning of execution (note that I also removed the weird logic
which read the page size from the empty ephemeral DB file)
5. `op_checkpoint` is non-blocking, and the checkpoint itself now has a
more complex state machine to handle the previously blocking behaviour
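
A condensed sketch of the new WAL-prepare protocol, adapted from the
cacheflush path in the diff below (the blocking `wait_for_completion`
calls are acceptable there because that path already runs the IO loop):

```rust
// The WAL header must be written and synced before any frame append.
// prepare_wal_start returns Ok(None) when the header is already initialized.
let page_sz = self.page_size.get().unwrap_or_default();
if let Some(c) = wal.borrow_mut().prepare_wal_start(page_sz)? {
    self.io.wait_for_completion(c)?; // header write
    let c = wal.borrow_mut().prepare_wal_finish()?;
    self.io.wait_for_completion(c)?; // fsync; marks the WAL initialized
}
// only now is it safe to append frames; append_frames_vectored asserts this
```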

Closes #3179
Committed by Jussi Saurio (via GitHub), 2025-09-18 10:31:25 +03:00
5 changed files with 610 additions and 429 deletions


@@ -316,8 +316,16 @@ impl Page {
#[derive(Clone, Copy, Debug)]
/// The state of the current pager cache commit.
enum CommitState {
/// Prepare WAL header for commit if needed
PrepareWal,
/// Sync WAL header after prepare
PrepareWalSync,
/// Get DB size (mostly from page cache - but in rare cases we can read it from disk)
GetDbSize,
/// Appends all frames to the WAL.
Start,
PrepareFrames {
db_size: u32,
},
/// Fsync the on-disk WAL.
SyncWal,
/// Checkpoint the WAL to the database file (if needed).
@@ -581,7 +589,7 @@ impl Pager {
commit_info: CommitInfo {
result: RefCell::new(None),
completions: RefCell::new(Vec::new()),
state: CommitState::Start.into(),
state: CommitState::PrepareWal.into(),
time: now.into(),
},
syncing: Rc::new(Cell::new(false)),
@@ -1258,6 +1266,14 @@ impl Pager {
let mut completions: Vec<Completion> = Vec::new();
let mut pages = Vec::with_capacity(len);
let page_sz = self.page_size.get().unwrap_or_default();
let prepare = wal.borrow_mut().prepare_wal_start(page_sz)?;
if let Some(c) = prepare {
self.io.wait_for_completion(c)?;
let c = wal.borrow_mut().prepare_wal_finish()?;
self.io.wait_for_completion(c)?;
}
let commit_frame = None; // cacheflush only so we are not setting a commit frame here
for (idx, page_id) in dirty_pages.iter().enumerate() {
let page = {
@@ -1314,6 +1330,26 @@ impl Pager {
wal_auto_checkpoint_disabled: bool,
sync_mode: crate::SyncMode,
data_sync_retry: bool,
) -> Result<IOResult<PagerCommitResult>> {
match self.commit_dirty_pages_inner(
wal_auto_checkpoint_disabled,
sync_mode,
data_sync_retry,
) {
r @ (Ok(IOResult::Done(..)) | Err(..)) => {
self.commit_info.state.set(CommitState::PrepareWal);
r
}
Ok(IOResult::IO(io)) => Ok(IOResult::IO(io)),
}
}
#[instrument(skip_all, level = Level::DEBUG)]
fn commit_dirty_pages_inner(
&self,
wal_auto_checkpoint_disabled: bool,
sync_mode: crate::SyncMode,
data_sync_retry: bool,
) -> Result<IOResult<PagerCommitResult>> {
let Some(wal) = self.wal.as_ref() else {
return Err(LimboError::InternalError(
@@ -1325,14 +1361,34 @@ impl Pager {
let state = self.commit_info.state.get();
trace!(?state);
match state {
CommitState::Start => {
CommitState::PrepareWal => {
let page_sz = self.page_size.get().expect("page size not set");
let c = wal.borrow_mut().prepare_wal_start(page_sz)?;
let Some(c) = c else {
self.commit_info.state.set(CommitState::GetDbSize);
continue;
};
self.commit_info.state.set(CommitState::PrepareWalSync);
if !c.is_completed() {
io_yield_one!(c);
}
}
CommitState::PrepareWalSync => {
let c = wal.borrow_mut().prepare_wal_finish()?;
self.commit_info.state.set(CommitState::GetDbSize);
if !c.is_completed() {
io_yield_one!(c);
}
}
CommitState::GetDbSize => {
let db_size = return_if_io!(self.with_header(|header| header.database_size));
self.commit_info.state.set(CommitState::PrepareFrames {
db_size: db_size.get(),
});
}
CommitState::PrepareFrames { db_size } => {
let now = self.io.now();
self.commit_info.time.set(now);
let db_size_after = {
self.io
.block(|| self.with_header(|header| header.database_size))?
.get()
};
let dirty_ids: Vec<usize> = self.dirty_pages.read().iter().copied().collect();
if dirty_ids.is_empty() {
@@ -1364,7 +1420,7 @@ impl Pager {
if end_of_chunk {
let commit_flag = if i == total - 1 {
// Only the commit (final) frame carries the db_size
Some(db_size_after)
Some(db_size)
} else {
None
};
@@ -1473,7 +1529,7 @@ impl Pager {
let mut completions = self.commit_info.completions.borrow_mut();
if completions.iter().all(|c| c.is_completed()) {
completions.clear();
self.commit_info.state.set(CommitState::Start);
self.commit_info.state.set(CommitState::PrepareWal);
wal.borrow_mut().finish_append_frames_commit()?;
let result = self.commit_info.result.borrow_mut().take();
return Ok(IOResult::Done(result.expect("commit result should be set")));
@@ -1645,15 +1701,20 @@ impl Pager {
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result<CheckpointResult> {
pub fn wal_checkpoint_start(&self, mode: CheckpointMode) -> Result<IOResult<CheckpointResult>> {
let Some(wal) = self.wal.as_ref() else {
return Err(LimboError::InternalError(
"wal_checkpoint() called on database without WAL".to_string(),
));
};
let mut checkpoint_result = self.io.block(|| wal.borrow_mut().checkpoint(self, mode))?;
wal.borrow_mut().checkpoint(self, mode)
}
pub fn wal_checkpoint_finish(
&self,
checkpoint_result: &mut CheckpointResult,
) -> Result<IOResult<()>> {
'ensure_sync: {
if checkpoint_result.num_backfilled != 0 {
if checkpoint_result.everything_backfilled() {
@@ -1664,27 +1725,35 @@ impl Pager {
let page_size = self.page_size.get().unwrap_or_default();
let expected = (db_size * page_size.get()) as u64;
if expected < self.db_file.size()? {
self.io.wait_for_completion(self.db_file.truncate(
expected as usize,
Completion::new_trunc(move |_| {
tracing::trace!(
"Database file truncated to expected size: {} bytes",
expected
);
}),
)?)?;
self.io
.wait_for_completion(self.db_file.sync(Completion::new_sync(
move |_| {
tracing::trace!("Database file syncd after truncation");
},
))?)?;
if !checkpoint_result.db_truncate_sent {
let c = self.db_file.truncate(
expected as usize,
Completion::new_trunc(move |_| {
tracing::trace!(
"Database file truncated to expected size: {} bytes",
expected
);
}),
)?;
checkpoint_result.db_truncate_sent = true;
io_yield_one!(c);
}
if !checkpoint_result.db_sync_sent {
let c = self.db_file.sync(Completion::new_sync(move |_| {
tracing::trace!("Database file syncd after truncation");
}))?;
checkpoint_result.db_sync_sent = true;
io_yield_one!(c);
}
break 'ensure_sync;
}
}
// if we backfilled at all, we have to sync the db-file here
self.io
.wait_for_completion(self.db_file.sync(Completion::new_sync(move |_| {}))?)?;
if !checkpoint_result.db_sync_sent {
// if we backfilled at all, we have to sync the db-file here
let c = self.db_file.sync(Completion::new_sync(move |_| {}))?;
checkpoint_result.db_sync_sent = true;
io_yield_one!(c);
}
}
}
checkpoint_result.release_guard();
@@ -1693,7 +1762,14 @@ impl Pager {
.write()
.clear()
.map_err(|e| LimboError::InternalError(format!("Failed to clear page cache: {e:?}")))?;
Ok(checkpoint_result)
Ok(IOResult::Done(()))
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result<CheckpointResult> {
let mut result = self.io.block(|| self.wal_checkpoint_start(mode))?;
self.io.block(|| self.wal_checkpoint_finish(&mut result))?;
Ok(result)
}
pub fn freepage_list(&self) -> u32 {
@@ -2179,7 +2255,7 @@ impl Pager {
fn reset_internal_states(&self) {
*self.checkpoint_state.write() = CheckpointState::Checkpoint;
self.syncing.replace(false);
self.commit_info.state.set(CommitState::Start);
self.commit_info.state.set(CommitState::PrepareWal);
self.commit_info.time.set(self.io.now());
self.allocate_page_state.replace(AllocatePageState::Start);
self.free_page_state.replace(FreePageState::Start);
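
Callers that still want the old synchronous semantics can recover them
on top of the state machine with `io.block`, which polls a closure and
runs the IO loop until it returns `IOResult::Done`; this is exactly how
the `wal_checkpoint` convenience wrapper above is built. A minimal
sketch, assuming the outer commit method (its name is cut off in the
hunk above) is `commit_dirty_pages` with the same parameters as
`commit_dirty_pages_inner`:

```rust
// io.block drives the state machine to completion: each poll either
// finishes (Done) or yields pending completions that the IO loop services.
let commit_result = pager.io.block(|| {
    pager.commit_dirty_pages(wal_auto_checkpoint_disabled, sync_mode, data_sync_retry)
})?;
```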


@@ -24,8 +24,8 @@ use crate::storage::sqlite3_ondisk::{
};
use crate::types::{IOCompletions, IOResult};
use crate::{
bail_corrupt_error, io_yield_many, turso_assert, Buffer, Completion, CompletionError,
IOContext, LimboError, Result,
bail_corrupt_error, io_yield_many, io_yield_one, return_if_io, turso_assert, Buffer,
Completion, CompletionError, IOContext, LimboError, Result,
};
#[derive(Debug, Clone, Default)]
@@ -38,6 +38,8 @@ pub struct CheckpointResult {
/// In the case of everything backfilled, we need to hold the locks until the db
/// file is truncated.
maybe_guard: Option<CheckpointLocks>,
pub db_truncate_sent: bool,
pub db_sync_sent: bool,
}
impl Drop for CheckpointResult {
@@ -53,6 +55,8 @@ impl CheckpointResult {
num_backfilled: n_ckpt,
max_frame,
maybe_guard: None,
db_sync_sent: false,
db_truncate_sent: false,
}
}
@@ -265,18 +269,16 @@ pub trait Wal: Debug {
page: &[u8],
) -> Result<()>;
/// Write a frame to the WAL.
/// db_size is the database size in pages after the transaction finishes.
/// db_size > 0 -> last frame written in transaction
/// db_size == 0 -> non-last frame written in transaction
/// write_counter is the counter we use to track when the I/O operation starts and completes
fn append_frame(
&mut self,
page: PageRef,
page_size: PageSize,
db_size: u32,
) -> Result<Completion>;
/// Prepare the WAL header for a future append.
/// Most of the time this method will return Ok(None)
fn prepare_wal_start(&mut self, page_sz: PageSize) -> Result<Option<Completion>>;
fn prepare_wal_finish(&mut self) -> Result<Completion>;
/// Write a bunch of frames to the WAL.
/// db_size is the database size in pages after the transaction finishes.
/// db_size is set -> last frame written in transaction
/// db_size is none -> non-last frame written in transaction
fn append_frames_vectored(
&mut self,
pages: Vec<PageRef>,
@@ -316,11 +318,16 @@ pub trait Wal: Debug {
fn as_any(&self) -> &dyn std::any::Any;
}
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Clone)]
pub enum CheckpointState {
Start,
Processing,
Done,
Finalize,
Truncate {
checkpoint_result: Option<CheckpointResult>,
truncate_sent: bool,
sync_sent: bool,
},
}
/// IOV_MAX is 1024 on most systems, lets use 512 to be safe
@@ -1290,90 +1297,6 @@ impl Wal for WalFile {
Ok(())
}
/// Write a frame to the WAL.
#[instrument(skip_all, level = Level::DEBUG)]
fn append_frame(
&mut self,
page: PageRef,
page_size: PageSize,
db_size: u32,
) -> Result<Completion> {
self.ensure_header_if_needed(page_size)?;
let shared_page_size = {
let shared = self.get_shared();
let page_size = shared.wal_header.lock().page_size;
page_size
};
turso_assert!(
shared_page_size == page_size.get(),
"page size mismatch - tried to change page size after WAL header was already initialized: shared.page_size={shared_page_size}, page_size={}",
page_size.get()
);
let page_id = page.get().id;
let frame_id = self.max_frame + 1;
let offset = self.frame_offset(frame_id);
tracing::debug!(frame_id, offset, page_id);
let (c, checksums) = {
let shared = self.get_shared();
let shared_file = self.shared.clone();
let header = shared.wal_header.lock();
let checksums = self.last_checksum;
let page_content = page.get_contents();
let page_buf = page_content.as_ptr();
let io_ctx = self.io_ctx.borrow();
let encrypted_data;
let data_to_write = match &io_ctx.encryption_or_checksum() {
EncryptionOrChecksum::Encryption(ctx) => {
encrypted_data = ctx.encrypt_page(page_buf, page_id)?;
encrypted_data.as_slice()
}
EncryptionOrChecksum::Checksum(ctx) => {
ctx.add_checksum_to_page(page_buf, page_id)?;
page_buf
}
EncryptionOrChecksum::None => page_buf,
};
let (frame_checksums, frame_bytes) = prepare_wal_frame(
&self.buffer_pool,
&header,
checksums,
header.page_size,
page_id as u32,
db_size,
data_to_write,
);
let c = Completion::new_write({
let frame_bytes = frame_bytes.clone();
move |res: Result<i32, CompletionError>| {
let Ok(bytes_written) = res else {
return;
};
let frame_len = frame_bytes.len();
turso_assert!(
bytes_written == frame_len as i32,
"wrote({bytes_written}) != expected({frame_len})"
);
page.clear_dirty();
let seq = shared_file.read().epoch.load(Ordering::Acquire);
page.set_wal_tag(frame_id, seq);
}
});
assert!(
shared.enabled.load(Ordering::Relaxed),
"WAL must be enabled"
);
let file = shared.file.as_ref().unwrap();
let result = file.pwrite(offset, frame_bytes.clone(), c)?;
(result, frame_checksums)
};
self.complete_append_frame(page_id as u64, frame_id, checksums);
Ok(c)
}
#[instrument(skip_all, level = Level::DEBUG)]
fn should_checkpoint(&self) -> bool {
let shared = self.get_shared();
@@ -1487,6 +1410,65 @@ impl Wal for WalFile {
Ok(pages)
}
fn prepare_wal_start(&mut self, page_size: PageSize) -> Result<Option<Completion>> {
if self.get_shared().is_initialized()? {
return Ok(None);
}
tracing::debug!("ensure_header_if_needed");
self.last_checksum = {
let mut shared = self.get_shared_mut();
let checksum = {
let mut hdr = shared.wal_header.lock();
hdr.magic = if cfg!(target_endian = "big") {
WAL_MAGIC_BE
} else {
WAL_MAGIC_LE
};
if hdr.page_size == 0 {
hdr.page_size = page_size.get();
}
if hdr.salt_1 == 0 && hdr.salt_2 == 0 {
hdr.salt_1 = self.io.generate_random_number() as u32;
hdr.salt_2 = self.io.generate_random_number() as u32;
}
// recompute header checksum
let prefix = &hdr.as_bytes()[..WAL_HEADER_SIZE - 8];
let use_native = (hdr.magic & 1) != 0;
let (c1, c2) = checksum_wal(prefix, &hdr, (0, 0), use_native);
hdr.checksum_1 = c1;
hdr.checksum_2 = c2;
(c1, c2)
};
shared.last_checksum = checksum;
checksum
};
self.max_frame = 0;
let shared = self.get_shared();
assert!(
shared.enabled.load(Ordering::Relaxed),
"WAL must be enabled"
);
let file = shared.file.as_ref().unwrap();
let c = sqlite3_ondisk::begin_write_wal_header(file, &shared.wal_header.lock())?;
Ok(Some(c))
}
fn prepare_wal_finish(&mut self) -> Result<Completion> {
let shared = self.get_shared();
assert!(
shared.enabled.load(Ordering::Relaxed),
"WAL must be enabled"
);
let file = shared.file.as_ref().unwrap();
let shared = self.shared.clone();
let c = file.sync(Completion::new_sync(move |_| {
shared.read().initialized.store(true, Ordering::Release);
}))?;
Ok(c)
}
/// Use pwritev to append many frames to the log at once
fn append_frames_vectored(
&mut self,
@@ -1498,7 +1480,10 @@ impl Wal for WalFile {
pages.len() <= IOV_MAX,
"we limit number of iovecs to IOV_MAX"
);
self.ensure_header_if_needed(page_sz)?;
turso_assert!(
self.get_shared().is_initialized()?,
"WAL must be prepared with prepare_wal_start/prepare_wal_finish method"
);
let (header, shared_page_size, epoch) = {
let shared = self.get_shared();
@@ -1711,54 +1696,12 @@ impl WalFile {
/// the WAL file has been truncated and we are writing the first
/// frame since then. We need to ensure that the header is initialized.
fn ensure_header_if_needed(&mut self, page_size: PageSize) -> Result<()> {
if self.get_shared().is_initialized()? {
let Some(c) = self.prepare_wal_start(page_size)? else {
return Ok(());
}
tracing::debug!("ensure_header_if_needed");
self.last_checksum = {
let mut shared = self.get_shared_mut();
let checksum = {
let mut hdr = shared.wal_header.lock();
hdr.magic = if cfg!(target_endian = "big") {
WAL_MAGIC_BE
} else {
WAL_MAGIC_LE
};
if hdr.page_size == 0 {
hdr.page_size = page_size.get();
}
if hdr.salt_1 == 0 && hdr.salt_2 == 0 {
hdr.salt_1 = self.io.generate_random_number() as u32;
hdr.salt_2 = self.io.generate_random_number() as u32;
}
// recompute header checksum
let prefix = &hdr.as_bytes()[..WAL_HEADER_SIZE - 8];
let use_native = (hdr.magic & 1) != 0;
let (c1, c2) = checksum_wal(prefix, &hdr, (0, 0), use_native);
hdr.checksum_1 = c1;
hdr.checksum_2 = c2;
(c1, c2)
};
shared.last_checksum = checksum;
checksum
};
self.max_frame = 0;
let shared = self.get_shared();
assert!(
shared.enabled.load(Ordering::Relaxed),
"WAL must be enabled"
);
let file = shared.file.as_ref().unwrap();
self.io
.wait_for_completion(sqlite3_ondisk::begin_write_wal_header(
file,
&shared.wal_header.lock(),
)?)?;
self.io
.wait_for_completion(file.sync(Completion::new_sync(|_| {}))?)?;
shared.initialized.store(true, Ordering::Release);
self.io.wait_for_completion(c)?;
let c = self.prepare_wal_finish()?;
self.io.wait_for_completion(c)?;
Ok(())
}
@@ -1768,7 +1711,7 @@ impl WalFile {
mode: CheckpointMode,
) -> Result<IOResult<CheckpointResult>> {
loop {
let state = self.ongoing_checkpoint.state;
let state = &mut self.ongoing_checkpoint.state;
tracing::debug!(?state);
match state {
// Acquire the relevant exclusive locks and checkpoint_lock
@@ -1790,6 +1733,8 @@ impl WalFile {
num_backfilled: self.prev_checkpoint.num_backfilled,
max_frame: nbackfills,
maybe_guard: None,
db_sync_sent: false,
db_truncate_sent: false,
}));
}
// acquire the appropriate exclusive locks depending on the checkpoint mode
@@ -1942,19 +1887,19 @@ impl WalFile {
if !completions.is_empty() {
io_yield_many!(completions);
} else if self.ongoing_checkpoint.complete() {
self.ongoing_checkpoint.state = CheckpointState::Done;
self.ongoing_checkpoint.state = CheckpointState::Finalize;
}
}
// All eligible frames copied to the db file
// Update nBackfills
// In Restart or Truncate mode, we need to restart the log and possibly truncate the file
// Release all locks and return the current num of wal frames and the amount we backfilled
CheckpointState::Done => {
CheckpointState::Finalize => {
turso_assert!(
self.ongoing_checkpoint.complete(),
"checkpoint pending flush must have finished"
);
let mut checkpoint_result = {
let checkpoint_result = {
let shared = self.get_shared();
let current_mx = shared.max_frame.load(Ordering::Acquire);
let nbackfills = shared.nbackfills.load(Ordering::Acquire);
@@ -2002,6 +1947,28 @@ impl WalFile {
if mode.should_restart_log() {
self.restart_log(mode)?;
}
self.ongoing_checkpoint.state = CheckpointState::Truncate {
checkpoint_result: Some(checkpoint_result),
truncate_sent: false,
sync_sent: false,
};
}
CheckpointState::Truncate { .. } => {
if matches!(mode, CheckpointMode::Truncate { .. }) {
return_if_io!(self.truncate_log());
}
if mode.should_restart_log() {
Self::unlock_after_restart(&self.shared, None);
}
let mut checkpoint_result = {
let CheckpointState::Truncate {
checkpoint_result, ..
} = &mut self.ongoing_checkpoint.state
else {
panic!("unxpected state");
};
checkpoint_result.take().unwrap()
};
// increment wal epoch to ensure no stale pages are used for backfilling
self.get_shared().epoch.fetch_add(1, Ordering::Release);
@@ -2128,62 +2095,89 @@ impl WalFile {
}
}
let unlock = |e: Option<&LimboError>| {
// release all read locks we just acquired, the caller will take care of the others
let shared = self.shared.write();
for idx in 1..shared.read_locks.len() {
shared.read_locks[idx].unlock();
}
if let Some(e) = e {
tracing::error!(
"Failed to restart WAL header: {:?}, releasing read locks",
e
);
}
};
// reinitialize inmemory state
self.get_shared_mut()
.restart_wal_header(&self.io, mode)
.inspect_err(|e| {
unlock(Some(e));
})?;
self.get_shared_mut().restart_wal_header(&self.io, mode);
let cksm = self.get_shared().last_checksum;
self.last_checksum = cksm;
self.max_frame = 0;
self.min_frame = 0;
self.checkpoint_seq.fetch_add(1, Ordering::Release);
Ok(())
}
// For TRUNCATE mode: shrink the WAL file to 0B
if matches!(mode, CheckpointMode::Truncate { .. }) {
let c = Completion::new_trunc(|_| {
tracing::trace!("WAL file truncated to 0B");
});
fn truncate_log(&mut self) -> Result<IOResult<()>> {
let file = {
let shared = self.get_shared();
// for now at least, let's do all this IO synchronously
assert!(
shared.enabled.load(Ordering::Relaxed),
"WAL must be enabled"
);
let file = shared.file.as_ref().unwrap();
let c = file.truncate(0, c).inspect_err(|e| unlock(Some(e)))?;
shared.initialized.store(false, Ordering::Release);
self.io
.wait_for_completion(c)
.inspect_err(|e| unlock(Some(e)))?;
// fsync after truncation
self.io
.wait_for_completion(
file.sync(Completion::new_sync(|_| {
tracing::trace!("WAL file synced after reset/truncation");
}))
.inspect_err(|e| unlock(Some(e)))?,
)
.inspect_err(|e| unlock(Some(e)))?;
}
shared.file.as_ref().unwrap().clone()
};
// release readlocks 1..4
unlock(None);
Ok(())
let CheckpointState::Truncate {
sync_sent,
truncate_sent,
..
} = &mut self.ongoing_checkpoint.state
else {
panic!("unxpected state");
};
if !*truncate_sent {
// For TRUNCATE mode: shrink the WAL file to 0B
let c = Completion::new_trunc({
let shared = self.shared.clone();
move |result| {
if let Err(err) = result {
Self::unlock_after_restart(
&shared,
Some(&LimboError::InternalError(err.to_string())),
);
} else {
tracing::trace!("WAL file truncated to 0 B");
}
}
});
let c = file
.truncate(0, c)
.inspect_err(|e| Self::unlock_after_restart(&self.shared, Some(e)))?;
*truncate_sent = true;
io_yield_one!(c);
} else if !*sync_sent {
let shared = self.shared.clone();
let c = file
.sync(Completion::new_sync(move |result| {
if let Err(err) = result {
Self::unlock_after_restart(
&shared,
Some(&LimboError::InternalError(err.to_string())),
);
} else {
tracing::trace!("WAL file synced after reset/truncation");
}
}))
.inspect_err(|e| Self::unlock_after_restart(&self.shared, Some(e)))?;
*sync_sent = true;
io_yield_one!(c);
}
Ok(IOResult::Done(()))
}
// unlock shared read locks taken by RESTART/TRUNCATE checkpoint modes
fn unlock_after_restart(shared: &Arc<RwLock<WalFileShared>>, e: Option<&LimboError>) {
// release all read locks we just acquired, the caller will take care of the others
let shared = shared.write();
for idx in 1..shared.read_locks.len() {
shared.read_locks[idx].unlock();
}
if let Some(e) = e {
tracing::error!(
"Failed to restart WAL header: {:?}, releasing read locks",
e
);
}
}
fn acquire_proper_checkpoint_guard(&mut self, mode: CheckpointMode) -> Result<()> {
@@ -2401,7 +2395,7 @@ impl WalFileShared {
/// This function updates the shared-memory structures so that the next
/// client to write to the database (which may be this one) does so by
/// writing frames into the start of the log file.
fn restart_wal_header(&mut self, io: &Arc<dyn IO>, mode: CheckpointMode) -> Result<()> {
fn restart_wal_header(&mut self, io: &Arc<dyn IO>, mode: CheckpointMode) {
turso_assert!(
matches!(
mode,
@@ -2428,7 +2422,6 @@ impl WalFileShared {
for lock in &self.read_locks[2..] {
lock.set_value_exclusive(READMARK_NOT_USED);
}
Ok(())
}
}
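
The recurring shape in this file is worth spelling out: every formerly
blocking sequence becomes a state machine whose states carry `*_sent`
flags, so each IO is issued at most once; the function then yields the
`Completion` to the caller, which re-enters after the IO finishes (see
`truncate_log` and the `Truncate { truncate_sent, sync_sent, .. }` state
above). A self-contained toy model of that shape, with purely
illustrative names:

```rust
// Toy resumable state machine: `step` is re-entered after each pending IO.
enum Step {
    Start,
    WaitIo { sent: bool }, // `sent` plays the role of truncate_sent/sync_sent
    Done,
}

enum Poll {
    Pending,    // caller must service the IO, then call step() again
    Ready(u32), // final result
}

struct Machine {
    state: Step,
}

impl Machine {
    fn step(&mut self) -> Poll {
        loop {
            match self.state {
                Step::Start => {
                    // advance without IO and fall through to the next state
                    self.state = Step::WaitIo { sent: false };
                }
                Step::WaitIo { sent } => {
                    if !sent {
                        // issue the IO exactly once, then yield to the caller
                        self.state = Step::WaitIo { sent: true };
                        return Poll::Pending;
                    }
                    self.state = Step::Done;
                }
                Step::Done => return Poll::Ready(42),
            }
        }
    }
}

fn main() {
    let mut m = Machine { state: Step::Start };
    assert!(matches!(m.step(), Poll::Pending)); // IO issued, machine parked
    assert!(matches!(m.step(), Poll::Ready(42))); // resumed and finished
}
```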


@@ -8,13 +8,13 @@ use crate::storage::btree::{
use crate::storage::database::DatabaseFile;
use crate::storage::page_cache::PageCache;
use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState};
use crate::storage::sqlite3_ondisk::{read_varint, DatabaseHeader};
use crate::storage::sqlite3_ondisk::{read_varint, DatabaseHeader, PageSize};
use crate::translate::collate::CollationSeq;
use crate::types::{
compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord,
SeekResult, Text,
};
use crate::util::{normalize_ident, IOExt as _};
use crate::util::normalize_ident;
use crate::vdbe::insn::InsertFlags;
use crate::vdbe::registers_to_ref_values;
use crate::vector::{vector_concat, vector_slice};
@@ -330,12 +330,35 @@ pub fn op_bit_not(
Ok(InsnFunctionStepResult::Step)
}
#[derive(Debug)]
pub enum OpCheckpointState {
StartCheckpoint,
FinishCheckpoint { result: Option<CheckpointResult> },
CompleteResult { result: Result<CheckpointResult> },
}
pub fn op_checkpoint(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
match op_checkpoint_inner(program, state, insn, pager, mv_store) {
Ok(result) => Ok(result),
Err(err) => {
state.op_checkpoint_state = OpCheckpointState::StartCheckpoint;
Err(err)
}
}
}
pub fn op_checkpoint_inner(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(
Checkpoint {
@@ -352,26 +375,75 @@ pub fn op_checkpoint(
// however.
return Err(LimboError::TableLocked);
}
let result = program.connection.checkpoint(*checkpoint_mode);
match result {
Ok(CheckpointResult {
num_attempted,
num_backfilled,
..
}) => {
// https://sqlite.org/pragma.html#pragma_wal_checkpoint
// 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
state.registers[*dest] = Register::Value(Value::Integer(0));
// 2nd col: # modified pages written to wal file
state.registers[*dest + 1] = Register::Value(Value::Integer(num_attempted as i64));
// 3rd col: # pages moved to db after checkpoint
state.registers[*dest + 2] = Register::Value(Value::Integer(num_backfilled as i64));
}
Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)),
}
loop {
match &mut state.op_checkpoint_state {
OpCheckpointState::StartCheckpoint => {
let step_result = program
.connection
.pager
.borrow_mut()
.wal_checkpoint_start(*checkpoint_mode);
match step_result {
Ok(IOResult::Done(result)) => {
state.op_checkpoint_state = OpCheckpointState::FinishCheckpoint {
result: Some(result),
};
continue;
}
Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)),
Err(err) => {
state.op_checkpoint_state =
OpCheckpointState::CompleteResult { result: Err(err) };
continue;
}
}
}
OpCheckpointState::FinishCheckpoint { result } => {
let step_result = program
.connection
.pager
.borrow_mut()
.wal_checkpoint_finish(result.as_mut().unwrap());
match step_result {
Ok(IOResult::Done(())) => {
state.op_checkpoint_state = OpCheckpointState::CompleteResult {
result: Ok(result.take().unwrap()),
};
continue;
}
Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)),
Err(err) => {
state.op_checkpoint_state =
OpCheckpointState::CompleteResult { result: Err(err) };
continue;
}
}
}
OpCheckpointState::CompleteResult { result } => {
match result {
Ok(CheckpointResult {
num_attempted,
num_backfilled,
..
}) => {
// https://sqlite.org/pragma.html#pragma_wal_checkpoint
// 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
state.registers[*dest] = Register::Value(Value::Integer(0));
// 2nd col: # modified pages written to wal file
state.registers[*dest + 1] =
Register::Value(Value::Integer(*num_attempted as i64));
// 3rd col: # pages moved to db after checkpoint
state.registers[*dest + 2] =
Register::Value(Value::Integer(*num_backfilled as i64));
}
Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)),
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
}
}
}
pub fn op_null(
@@ -2080,7 +2152,29 @@ pub fn op_halt_if_null(
}
}
#[derive(Debug, Clone, Copy)]
pub enum OpTransactionState {
Start,
CheckSchemaCookie,
}
pub fn op_transaction(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
match op_transaction_inner(program, state, insn, pager, mv_store) {
Ok(result) => Ok(result),
Err(err) => {
state.op_transaction_state = OpTransactionState::Start;
Err(err)
}
}
}
pub fn op_transaction_inner(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
@@ -2095,176 +2189,188 @@ pub fn op_transaction(
},
insn
);
let conn = program.connection.clone();
let write = matches!(tx_mode, TransactionMode::Write);
if write && conn._db.open_flags.contains(OpenFlags::ReadOnly) {
return Err(LimboError::ReadOnly);
}
let pager = program.get_pager_from_database_index(db);
// 1. We try to upgrade current version
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = if conn.is_nested_stmt.get() {
(current_state, false)
} else {
match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
}
};
// 2. Start transaction if needed
if let Some(mv_store) = &mv_store {
// In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed.
// Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough
// for both.
if program.connection.mv_tx.get().is_none() {
// We allocate the first page lazily in the first transaction.
// TODO: when we fix MVCC enable schema cookie detection for reprepare statements
// let header_schema_cookie = pager
// .io
// .block(|| pager.with_header(|header| header.schema_cookie.get()))?;
// if header_schema_cookie != *schema_cookie {
// return Err(LimboError::SchemaUpdated);
// }
let tx_id = match tx_mode {
TransactionMode::None | TransactionMode::Read | TransactionMode::Concurrent => {
mv_store.begin_tx(pager.clone())?
loop {
match state.op_transaction_state {
OpTransactionState::Start => {
let conn = program.connection.clone();
let write = matches!(tx_mode, TransactionMode::Write);
if write && conn._db.open_flags.contains(OpenFlags::ReadOnly) {
return Err(LimboError::ReadOnly);
}
TransactionMode::Write => {
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None))
}
};
program.connection.mv_tx.set(Some((tx_id, *tx_mode)));
} else if updated {
// TODO: fix tx_mode in Insn::Transaction, now each statement overrides it even if there's already a CONCURRENT Tx in progress, for example
let mv_tx_mode = program.connection.mv_tx.get().unwrap().1;
let actual_tx_mode = if mv_tx_mode == TransactionMode::Concurrent {
TransactionMode::Concurrent
} else {
*tx_mode
};
if matches!(new_transaction_state, TransactionState::Write { .. })
&& matches!(actual_tx_mode, TransactionMode::Write)
{
let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap();
if mv_tx_mode == TransactionMode::Read {
return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id)));
// 1. We try to upgrade current version
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = if conn.is_nested_stmt.get() {
(current_state, false)
} else {
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)));
match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
}
};
// 2. Start transaction if needed
if let Some(mv_store) = &mv_store {
// In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed.
// Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough
// for both.
if program.connection.mv_tx.get().is_none() {
// We allocate the first page lazily in the first transaction.
// TODO: when we fix MVCC enable schema cookie detection for reprepare statements
// let header_schema_cookie = pager
// .io
// .block(|| pager.with_header(|header| header.schema_cookie.get()))?;
// if header_schema_cookie != *schema_cookie {
// return Err(LimboError::SchemaUpdated);
// }
let tx_id = match tx_mode {
TransactionMode::None
| TransactionMode::Read
| TransactionMode::Concurrent => mv_store.begin_tx(pager.clone())?,
TransactionMode::Write => {
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None))
}
};
program.connection.mv_tx.set(Some((tx_id, *tx_mode)));
} else if updated {
// TODO: fix tx_mode in Insn::Transaction, now each statement overrides it even if there's already a CONCURRENT Tx in progress, for example
let mv_tx_mode = program.connection.mv_tx.get().unwrap().1;
let actual_tx_mode = if mv_tx_mode == TransactionMode::Concurrent {
TransactionMode::Concurrent
} else {
*tx_mode
};
if matches!(new_transaction_state, TransactionState::Write { .. })
&& matches!(actual_tx_mode, TransactionMode::Write)
{
let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap();
if mv_tx_mode == TransactionMode::Read {
return_if_io!(
mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id))
);
} else {
return_if_io!(
mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))
);
}
}
}
} else {
if matches!(tx_mode, TransactionMode::Concurrent) {
return Err(LimboError::TxError(
"Concurrent transaction mode is only supported when MVCC is enabled"
.to_string(),
));
}
if updated && matches!(current_state, TransactionState::None) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new read transaction"
);
pager.begin_read_tx()?;
}
if updated && matches!(new_transaction_state, TransactionState::Write { .. }) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new write transaction"
);
let begin_w_tx_res = pager.begin_write_tx();
if let Err(LimboError::Busy) = begin_w_tx_res {
// We failed to upgrade to write transaction so put the transaction into its original state.
// That is, if the transaction had not started, end the read transaction so that next time we
// start a new one.
if matches!(current_state, TransactionState::None) {
pager.end_read_tx()?;
conn.transaction_state.replace(TransactionState::None);
}
assert_eq!(conn.transaction_state.get(), current_state);
return Err(LimboError::Busy);
}
if let IOResult::IO(io) = begin_w_tx_res? {
// set the transaction state to pending so we don't have to
// end the read transaction.
program
.connection
.transaction_state
.replace(TransactionState::PendingUpgrade);
return Ok(InsnFunctionStepResult::IO(io));
}
}
}
}
}
} else {
if matches!(tx_mode, TransactionMode::Concurrent) {
return Err(LimboError::TxError(
"Concurrent transaction mode is only supported when MVCC is enabled".to_string(),
));
}
if updated && matches!(current_state, TransactionState::None) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new read transaction"
);
pager.begin_read_tx()?;
}
if updated && matches!(new_transaction_state, TransactionState::Write { .. }) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new write transaction"
);
let begin_w_tx_res = pager.begin_write_tx();
if let Err(LimboError::Busy) = begin_w_tx_res {
// We failed to upgrade to write transaction so put the transaction into its original state.
// That is, if the transaction had not started, end the read transaction so that next time we
// start a new one.
if matches!(current_state, TransactionState::None) {
pager.end_read_tx()?;
conn.transaction_state.replace(TransactionState::None);
// 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error
if updated {
conn.transaction_state.replace(new_transaction_state);
}
assert_eq!(conn.transaction_state.get(), current_state);
return Err(LimboError::Busy);
state.op_transaction_state = OpTransactionState::CheckSchemaCookie;
continue;
}
if let IOResult::IO(io) = begin_w_tx_res? {
// set the transaction state to pending so we don't have to
// end the read transaction.
program
.connection
.transaction_state
.replace(TransactionState::PendingUpgrade);
return Ok(InsnFunctionStepResult::IO(io));
// 4. Check whether schema has changed if we are actually going to access the database.
// Can only read the header if page 1 has been allocated already;
// begin_write_tx ensures that happens, but begin_read_tx does not
OpTransactionState::CheckSchemaCookie => {
let res = with_header(&pager, mv_store, program, |header| {
header.schema_cookie.get()
});
match res {
Ok(IOResult::Done(header_schema_cookie)) => {
if header_schema_cookie != *schema_cookie {
tracing::debug!(
"schema changed, force reprepare: {} != {}",
header_schema_cookie,
*schema_cookie
);
return Err(LimboError::SchemaUpdated);
}
}
Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)),
// This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution
Err(LimboError::Page1NotAlloc) => {}
Err(err) => {
return Err(err);
}
}
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
}
}
// 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error
if updated {
conn.transaction_state.replace(new_transaction_state);
}
// 4. Check whether schema has changed if we are actually going to access the database.
// Can only read the header if page 1 has been allocated already;
// begin_write_tx ensures that happens, but begin_read_tx does not
// TODO: this is a hack to make the pager run the IO loop
let res = pager.io.block(|| {
with_header(&pager, mv_store, program, |header| {
header.schema_cookie.get()
})
});
match res {
Ok(header_schema_cookie) => {
if header_schema_cookie != *schema_cookie {
tracing::debug!(
"schema changed, force reprepare: {} != {}",
header_schema_cookie,
*schema_cookie
);
return Err(LimboError::SchemaUpdated);
}
}
// This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution
Err(LimboError::Page1NotAlloc) => {}
Err(err) => {
return Err(err);
}
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
pub fn op_auto_commit(
@@ -3891,14 +3997,17 @@ pub fn op_sorter_open(
},
insn
);
let cache_size = program.connection.get_cache_size();
// Set the buffer size threshold to be roughly the same as the limit configured for the page-cache.
let page_size = pager
.io
.block(|| pager.with_header(|header| header.page_size))
.unwrap_or_default()
.get() as usize;
// be careful here - we must not use any async operations after pager.with_header because this op-code has no proper state-machine
let page_size = match pager.with_header(|header| header.page_size) {
Ok(IOResult::Done(page_size)) => page_size,
Err(_) => PageSize::default(),
Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)),
};
let page_size = page_size.get() as usize;
let cache_size = program.connection.get_cache_size();
// Set the buffer size threshold to be roughly the same as the limit configured for the page-cache.
let max_buffer_size_bytes = if cache_size < 0 {
(cache_size.abs() * 1024) as usize
} else {
@@ -7076,6 +7185,8 @@ pub fn op_open_ephemeral(
match &mut state.op_open_ephemeral_state {
OpOpenEphemeralState::Start => {
tracing::trace!("Start");
let page_size =
return_if_io!(with_header(pager, mv_store, program, |header| header.page_size));
let conn = program.connection.clone();
let io = conn.pager.borrow().io.clone();
let rand_num = io.generate_random_number();
@@ -7108,11 +7219,6 @@ pub fn op_open_ephemeral(
db_file_io = io;
}
let page_size = pager
.io
.block(|| with_header(pager, mv_store, program, |header| header.page_size))?
.get();
let buffer_pool = program.connection._db.buffer_pool.clone();
let page_cache = Arc::new(RwLock::new(PageCache::default()));
@@ -7126,11 +7232,6 @@ pub fn op_open_ephemeral(
Arc::new(Mutex::new(())),
)?);
let page_size = pager
.io
.block(|| with_header(&pager, mv_store, program, |header| header.page_size))
.unwrap_or_default();
pager.page_size.set(Some(page_size));
state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager };


@@ -35,8 +35,9 @@ use crate::{
types::{IOCompletions, IOResult, RawSlice, TextRef},
vdbe::{
execute::{
OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState,
OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState,
OpCheckpointState, OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState,
OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState,
OpSeekState, OpTransactionState,
},
metrics::StatementMetrics,
},
@@ -290,6 +291,8 @@ pub struct ProgramState {
current_collation: Option<CollationSeq>,
op_column_state: OpColumnState,
op_row_id_state: OpRowIdState,
op_transaction_state: OpTransactionState,
op_checkpoint_state: OpCheckpointState,
/// State machine for committing view deltas with I/O handling
view_delta_state: ViewDeltaCommitState,
}
@@ -333,6 +336,8 @@ impl ProgramState {
current_collation: None,
op_column_state: OpColumnState::Start,
op_row_id_state: OpRowIdState::Start,
op_transaction_state: OpTransactionState::Start,
op_checkpoint_state: OpCheckpointState::StartCheckpoint,
view_delta_state: ViewDeltaCommitState::NotStarted,
}
}


@@ -16,10 +16,16 @@ fn test_schema_reprepare() {
let mut stmt = conn2.prepare("SELECT y, z FROM t").unwrap();
let mut stmt2 = conn2.prepare("SELECT x, z FROM t").unwrap();
conn1.execute("ALTER TABLE t DROP COLUMN x").unwrap();
assert_eq!(
stmt2.step().unwrap_err().to_string(),
"Parse error: no such column: x"
);
loop {
match stmt2.step() {
Ok(StepResult::IO) => tmp_db.io.step().unwrap(),
Err(err) => {
assert_eq!(err.to_string(), "Parse error: no such column: x");
break;
}
r => panic!("unexpected response: {r:?}"),
}
}
let mut rows = Vec::new();
loop {