mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-31 13:54:27 +01:00
more instrumentation + write counter should decrement if pwrite fails
This commit is contained in:
@@ -18,7 +18,7 @@ use std::{
|
||||
io::{ErrorKind, Read, Seek, Write},
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::{debug, trace};
|
||||
use tracing::{debug, instrument, trace, Level};
|
||||
|
||||
struct OwnedCallbacks(UnsafeCell<Callbacks>);
|
||||
// We assume we locking on IO level is done by user.
|
||||
@@ -219,6 +219,7 @@ impl IO for UnixIO {
|
||||
Ok(unix_file)
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn run_once(&self) -> Result<()> {
|
||||
if self.callbacks.is_empty() {
|
||||
return Ok(());
|
||||
@@ -333,6 +334,7 @@ impl File for UnixFile<'_> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
|
||||
let file = self.file.borrow();
|
||||
let result = {
|
||||
@@ -366,6 +368,7 @@ impl File for UnixFile<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn pwrite(
|
||||
&self,
|
||||
pos: usize,
|
||||
@@ -401,6 +404,7 @@ impl File for UnixFile<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
|
||||
let file = self.file.borrow();
|
||||
let result = fs::fsync(file.as_fd());
|
||||
|
||||
@@ -464,7 +464,7 @@ pub struct Connection {
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
|
||||
if sql.as_ref().is_empty() {
|
||||
return Err(LimboError::InvalidArgument(
|
||||
@@ -505,7 +505,7 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn query(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Option<Statement>> {
|
||||
let sql = sql.as_ref();
|
||||
tracing::trace!("Querying: {}", sql);
|
||||
@@ -521,7 +521,7 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub(crate) fn run_cmd(
|
||||
self: &Arc<Connection>,
|
||||
cmd: Cmd,
|
||||
@@ -574,7 +574,7 @@ impl Connection {
|
||||
|
||||
/// Execute will run a query from start to finish taking ownership of I/O because it will run pending I/Os if it didn't finish.
|
||||
/// TODO: make this api async
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn execute(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<()> {
|
||||
let sql = sql.as_ref();
|
||||
let mut parser = Parser::new(sql.as_bytes());
|
||||
|
||||
@@ -592,6 +592,7 @@ impl BTreeCursor {
|
||||
|
||||
/// Check if the table is empty.
|
||||
/// This is done by checking if the root page has no cells.
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn is_empty_table(&self) -> Result<CursorResult<bool>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mv_cursor = mv_cursor.borrow();
|
||||
@@ -606,7 +607,7 @@ impl BTreeCursor {
|
||||
|
||||
/// Move the cursor to the previous record and return it.
|
||||
/// Used in backwards iteration.
|
||||
#[instrument(err,skip(self), level = Level::TRACE, name = "prev")]
|
||||
#[instrument(skip(self), level = Level::TRACE, name = "prev")]
|
||||
fn get_prev_record(&mut self) -> Result<CursorResult<bool>> {
|
||||
loop {
|
||||
let page = self.stack.top();
|
||||
@@ -717,7 +718,7 @@ impl BTreeCursor {
|
||||
|
||||
/// Reads the record of a cell that has overflow pages. This is a state machine that requires to be called until completion so everything
|
||||
/// that calls this function should be reentrant.
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn process_overflow_read(
|
||||
&self,
|
||||
payload: &'static [u8],
|
||||
@@ -835,6 +836,7 @@ impl BTreeCursor {
|
||||
///
|
||||
/// If the cell has overflow pages, it will skip till the overflow page which
|
||||
/// is at the offset given.
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn read_write_payload_with_offset(
|
||||
&mut self,
|
||||
mut offset: u32,
|
||||
@@ -945,6 +947,7 @@ impl BTreeCursor {
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn continue_payload_overflow_with_offset(
|
||||
&mut self,
|
||||
buffer: &mut Vec<u8>,
|
||||
@@ -1121,7 +1124,7 @@ impl BTreeCursor {
|
||||
|
||||
/// Move the cursor to the next record and return it.
|
||||
/// Used in forwards iteration, which is the default.
|
||||
#[instrument(err,skip(self), level = Level::TRACE, name = "next")]
|
||||
#[instrument(skip(self), level = Level::TRACE, name = "next")]
|
||||
fn get_next_record(&mut self) -> Result<CursorResult<bool>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.borrow_mut();
|
||||
@@ -1261,7 +1264,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Move the cursor to the root page of the btree.
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn move_to_root(&mut self) -> Result<()> {
|
||||
self.seek_state = CursorSeekState::Start;
|
||||
self.going_upwards = false;
|
||||
@@ -1273,7 +1276,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Move the cursor to the rightmost record in the btree.
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
fn move_to_rightmost(&mut self) -> Result<CursorResult<bool>> {
|
||||
self.move_to_root()?;
|
||||
|
||||
@@ -1308,7 +1311,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Specialized version of move_to() for table btrees.
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
fn tablebtree_move_to(&mut self, rowid: i64, seek_op: SeekOp) -> Result<CursorResult<()>> {
|
||||
'outer: loop {
|
||||
let page = self.stack.top();
|
||||
@@ -1426,7 +1429,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Specialized version of move_to() for index btrees.
|
||||
#[instrument(err,skip(self, index_key), level = Level::TRACE)]
|
||||
#[instrument(skip(self, index_key), level = Level::TRACE)]
|
||||
fn indexbtree_move_to(
|
||||
&mut self,
|
||||
index_key: &ImmutableRecord,
|
||||
@@ -1642,7 +1645,7 @@ impl BTreeCursor {
|
||||
|
||||
/// Specialized version of do_seek() for table btrees that uses binary search instead
|
||||
/// of iterating cells in order.
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn tablebtree_seek(&mut self, rowid: i64, seek_op: SeekOp) -> Result<CursorResult<bool>> {
|
||||
turso_assert!(
|
||||
self.mv_cursor.is_none(),
|
||||
@@ -1762,7 +1765,7 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn indexbtree_seek(
|
||||
&mut self,
|
||||
key: &ImmutableRecord,
|
||||
@@ -2033,7 +2036,7 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<CursorResult<()>> {
|
||||
turso_assert!(
|
||||
self.mv_cursor.is_none(),
|
||||
@@ -2086,7 +2089,7 @@ impl BTreeCursor {
|
||||
|
||||
/// Insert a record into the btree.
|
||||
/// If the insert operation overflows the page, it will be split and the btree will be balanced.
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn insert_into_page(&mut self, bkey: &BTreeKey) -> Result<CursorResult<()>> {
|
||||
let record = bkey
|
||||
.get_record()
|
||||
@@ -2267,7 +2270,7 @@ impl BTreeCursor {
|
||||
/// This is a naive algorithm that doesn't try to distribute cells evenly by content.
|
||||
/// It will try to split the page in half by keys not by content.
|
||||
/// Sqlite tries to have a page at least 40% full.
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
fn balance(&mut self) -> Result<CursorResult<()>> {
|
||||
turso_assert!(
|
||||
matches!(self.state, CursorState::Write(_)),
|
||||
@@ -2329,6 +2332,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Balance a non root page by trying to balance cells between a maximum of 3 siblings that should be neighboring the page that overflowed/underflowed.
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn balance_non_root(&mut self) -> Result<CursorResult<()>> {
|
||||
turso_assert!(
|
||||
matches!(self.state, CursorState::Write(_)),
|
||||
@@ -3863,6 +3867,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Find the index of the cell in the page that contains the given rowid.
|
||||
#[instrument( skip_all, level = Level::TRACE)]
|
||||
fn find_cell(&mut self, page: &PageContent, key: &BTreeKey) -> Result<CursorResult<usize>> {
|
||||
if self.find_cell_state.0.is_none() {
|
||||
self.find_cell_state.set(0);
|
||||
@@ -3937,6 +3942,7 @@ impl BTreeCursor {
|
||||
Ok(CursorResult::Ok(cell_idx))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn seek_end(&mut self) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none()); // unsure about this -_-
|
||||
self.move_to_root()?;
|
||||
@@ -3965,6 +3971,7 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn seek_to_last(&mut self) -> Result<CursorResult<()>> {
|
||||
let has_record = return_if_io!(self.move_to_rightmost());
|
||||
self.invalidate_record();
|
||||
@@ -3985,6 +3992,7 @@ impl BTreeCursor {
|
||||
self.root_page
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn rewind(&mut self) -> Result<CursorResult<()>> {
|
||||
if self.mv_cursor.is_some() {
|
||||
let cursor_has_record = return_if_io!(self.get_next_record());
|
||||
@@ -4000,6 +4008,7 @@ impl BTreeCursor {
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn last(&mut self) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let cursor_has_record = return_if_io!(self.move_to_rightmost());
|
||||
@@ -4008,6 +4017,7 @@ impl BTreeCursor {
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn next(&mut self) -> Result<CursorResult<bool>> {
|
||||
return_if_io!(self.restore_context());
|
||||
let cursor_has_record = return_if_io!(self.get_next_record());
|
||||
@@ -4023,6 +4033,7 @@ impl BTreeCursor {
|
||||
.invalidate();
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn prev(&mut self) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
return_if_io!(self.restore_context());
|
||||
@@ -4032,7 +4043,7 @@ impl BTreeCursor {
|
||||
Ok(CursorResult::Ok(cursor_has_record))
|
||||
}
|
||||
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
pub fn rowid(&mut self) -> Result<CursorResult<Option<i64>>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mv_cursor = mv_cursor.borrow();
|
||||
@@ -4074,7 +4085,7 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
// Empty trace to capture the span information
|
||||
@@ -4095,7 +4106,7 @@ impl BTreeCursor {
|
||||
/// Return a reference to the record the cursor is currently pointing to.
|
||||
/// If record was not parsed yet, then we have to parse it and in case of I/O we yield control
|
||||
/// back.
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
pub fn record(&self) -> Result<CursorResult<Option<Ref<ImmutableRecord>>>> {
|
||||
if !self.has_record.get() {
|
||||
return Ok(CursorResult::Ok(None));
|
||||
@@ -4163,7 +4174,7 @@ impl BTreeCursor {
|
||||
Ok(CursorResult::Ok(Some(record_ref)))
|
||||
}
|
||||
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
pub fn insert(
|
||||
&mut self,
|
||||
key: &BTreeKey,
|
||||
@@ -4233,7 +4244,7 @@ impl BTreeCursor {
|
||||
/// 7. WaitForBalancingToComplete -> perform balancing
|
||||
/// 8. SeekAfterBalancing -> adjust the cursor to a node that is closer to the deleted value. go to Finish
|
||||
/// 9. Finish -> Delete operation is done. Return CursorResult(Ok())
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
pub fn delete(&mut self) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
|
||||
@@ -4610,6 +4621,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Search for a key in an Index Btree. Looking up indexes that need to be unique, we cannot compare the rowid
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn key_exists_in_index(&mut self, key: &ImmutableRecord) -> Result<CursorResult<bool>> {
|
||||
return_if_io!(self.seek(SeekKey::IndexKey(key), SeekOp::GE { eq_only: true }));
|
||||
|
||||
@@ -4639,6 +4651,7 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn exists(&mut self, key: &Value) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let int_key = match key {
|
||||
@@ -4655,6 +4668,7 @@ impl BTreeCursor {
|
||||
/// Clear the overflow pages linked to a specific page provided by the leaf cell
|
||||
/// Uses a state machine to keep track of it's operations so that traversal can be
|
||||
/// resumed from last point after IO interruption
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn clear_overflow_pages(&mut self, cell: &BTreeCell) -> Result<CursorResult<()>> {
|
||||
loop {
|
||||
let state = self.overflow_state.take().unwrap_or(OverflowState::Start);
|
||||
@@ -4723,7 +4737,7 @@ impl BTreeCursor {
|
||||
/// ```
|
||||
///
|
||||
/// The destruction order would be: [4',4,5,2,6,7,3,1]
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
pub fn btree_destroy(&mut self) -> Result<CursorResult<Option<usize>>> {
|
||||
if let CursorState::None = &self.state {
|
||||
self.move_to_root()?;
|
||||
@@ -4999,7 +5013,7 @@ impl BTreeCursor {
|
||||
/// Count the number of entries in the b-tree
|
||||
///
|
||||
/// Only supposed to be used in the context of a simple Count Select Statement
|
||||
#[instrument(err,skip(self), level = Level::TRACE)]
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
pub fn count(&mut self) -> Result<CursorResult<usize>> {
|
||||
if self.count == 0 {
|
||||
self.move_to_root()?;
|
||||
@@ -5108,6 +5122,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// If context is defined, restore it and set it None on success
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn restore_context(&mut self) -> Result<CursorResult<()>> {
|
||||
if self.context.is_none() || !matches!(self.valid_state, CursorValidState::RequireSeek) {
|
||||
return Ok(CursorResult::Ok(()));
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::error::LimboError;
|
||||
use crate::io::CompletionType;
|
||||
use crate::{io::Completion, Buffer, Result};
|
||||
use std::{cell::RefCell, sync::Arc};
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
/// DatabaseStorage is an interface a database file that consists of pages.
|
||||
///
|
||||
@@ -32,6 +33,7 @@ unsafe impl Sync for DatabaseFile {}
|
||||
|
||||
#[cfg(feature = "fs")]
|
||||
impl DatabaseStorage for DatabaseFile {
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
|
||||
let r = c.as_read();
|
||||
let size = r.buf().len();
|
||||
@@ -44,6 +46,7 @@ impl DatabaseStorage for DatabaseFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn write_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
@@ -60,11 +63,13 @@ impl DatabaseStorage for DatabaseFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn sync(&self, c: Completion) -> Result<()> {
|
||||
let _ = self.file.sync(c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn size(&self) -> Result<u64> {
|
||||
self.file.size()
|
||||
}
|
||||
@@ -85,6 +90,7 @@ unsafe impl Send for FileMemoryStorage {}
|
||||
unsafe impl Sync for FileMemoryStorage {}
|
||||
|
||||
impl DatabaseStorage for FileMemoryStorage {
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
|
||||
let r = match c.completion_type {
|
||||
CompletionType::Read(ref r) => r,
|
||||
@@ -100,6 +106,7 @@ impl DatabaseStorage for FileMemoryStorage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn write_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
@@ -115,11 +122,13 @@ impl DatabaseStorage for FileMemoryStorage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn sync(&self, c: Completion) -> Result<()> {
|
||||
let _ = self.file.sync(c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn size(&self) -> Result<u64> {
|
||||
self.file.size()
|
||||
}
|
||||
|
||||
@@ -471,6 +471,7 @@ impl Pager {
|
||||
|
||||
/// This method is used to allocate a new root page for a btree, both for tables and indexes
|
||||
/// FIXME: handle no room in page cache
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result<CursorResult<u32>> {
|
||||
let page_type = match flags {
|
||||
_ if flags.is_table() => PageType::TableLeaf,
|
||||
@@ -589,6 +590,7 @@ impl Pager {
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn begin_read_tx(&self) -> Result<CursorResult<LimboResult>> {
|
||||
// We allocate the first page lazily in the first transaction
|
||||
match self.maybe_allocate_page1()? {
|
||||
@@ -598,6 +600,7 @@ impl Pager {
|
||||
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_read_tx()?))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn maybe_allocate_page1(&self) -> Result<CursorResult<()>> {
|
||||
if self.is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
|
||||
if let Ok(_lock) = self.init_lock.try_lock() {
|
||||
@@ -621,6 +624,7 @@ impl Pager {
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn begin_write_tx(&self) -> Result<CursorResult<LimboResult>> {
|
||||
// TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction
|
||||
// we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans
|
||||
@@ -631,6 +635,7 @@ impl Pager {
|
||||
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_write_tx()?))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn end_tx(
|
||||
&self,
|
||||
rollback: bool,
|
||||
@@ -666,6 +671,7 @@ impl Pager {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn end_read_tx(&self) -> Result<()> {
|
||||
self.wal.borrow().end_read_tx()?;
|
||||
Ok(())
|
||||
@@ -759,11 +765,12 @@ impl Pager {
|
||||
/// In the base case, it will write the dirty pages to the WAL and then fsync the WAL.
|
||||
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
|
||||
/// the database file and then fsync the database file.
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn cacheflush(&self, wal_checkpoint_disabled: bool) -> Result<PagerCacheflushStatus> {
|
||||
let mut checkpoint_result = CheckpointResult::default();
|
||||
loop {
|
||||
let state = self.flush_info.borrow().state;
|
||||
trace!("cacheflush {:?}", state);
|
||||
trace!(?state);
|
||||
match state {
|
||||
FlushState::Start => {
|
||||
let db_size = header_accessor::get_database_size(self)?;
|
||||
@@ -841,6 +848,7 @@ impl Pager {
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn wal_get_frame(
|
||||
&self,
|
||||
frame_no: u32,
|
||||
@@ -856,6 +864,7 @@ impl Pager {
|
||||
)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
|
||||
let mut checkpoint_result = CheckpointResult::default();
|
||||
loop {
|
||||
@@ -932,6 +941,7 @@ impl Pager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn wal_checkpoint(&self, wal_checkpoint_disabled: bool) -> Result<CheckpointResult> {
|
||||
if wal_checkpoint_disabled {
|
||||
return Ok(CheckpointResult {
|
||||
@@ -965,6 +975,7 @@ impl Pager {
|
||||
|
||||
// Providing a page is optional, if provided it will be used to avoid reading the page from disk.
|
||||
// This is implemented in accordance with sqlite freepage2() function.
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<()> {
|
||||
tracing::trace!("free_page(page_id={})", page_id);
|
||||
const TRUNK_PAGE_HEADER_SIZE: usize = 8;
|
||||
@@ -1036,6 +1047,7 @@ impl Pager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn allocate_page1(&self) -> Result<CursorResult<PageRef>> {
|
||||
let state = self.allocate_page1_state.borrow().clone();
|
||||
match state {
|
||||
@@ -1111,6 +1123,7 @@ impl Pager {
|
||||
*/
|
||||
// FIXME: handle no room in page cache
|
||||
#[allow(clippy::readonly_write_lock)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn allocate_page(&self) -> Result<PageRef> {
|
||||
let old_db_size = header_accessor::get_database_size(self)?;
|
||||
#[allow(unused_mut)]
|
||||
|
||||
@@ -727,6 +727,7 @@ impl PageContent {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn begin_read_page(
|
||||
db_file: Arc<dyn DatabaseStorage>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
@@ -773,6 +774,7 @@ pub fn finish_read_page(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn begin_write_btree_page(
|
||||
pager: &Pager,
|
||||
page: &PageRef,
|
||||
@@ -791,13 +793,14 @@ pub fn begin_write_btree_page(
|
||||
};
|
||||
|
||||
*write_counter.borrow_mut() += 1;
|
||||
let clone_counter = write_counter.clone();
|
||||
let write_complete = {
|
||||
let buf_copy = buffer.clone();
|
||||
Box::new(move |bytes_written: i32| {
|
||||
tracing::trace!("finish_write_btree_page");
|
||||
let buf_copy = buf_copy.clone();
|
||||
let buf_len = buf_copy.borrow().len();
|
||||
*write_counter.borrow_mut() -= 1;
|
||||
*clone_counter.borrow_mut() -= 1;
|
||||
|
||||
page_finish.clear_dirty();
|
||||
if bytes_written < buf_len as i32 {
|
||||
@@ -806,10 +809,15 @@ pub fn begin_write_btree_page(
|
||||
})
|
||||
};
|
||||
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
|
||||
page_source.write_page(page_id, buffer.clone(), c)?;
|
||||
Ok(())
|
||||
let res = page_source.write_page(page_id, buffer.clone(), c);
|
||||
if res.is_err() {
|
||||
// Avoid infinite loop if write page fails
|
||||
*write_counter.borrow_mut() -= 1;
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn begin_sync(db_file: Arc<dyn DatabaseStorage>, syncing: Rc<RefCell<bool>>) -> Result<()> {
|
||||
assert!(!*syncing.borrow());
|
||||
*syncing.borrow_mut() = true;
|
||||
|
||||
@@ -705,7 +705,7 @@ impl Wal for WalFile {
|
||||
frame_id >= self.checkpoint_threshold
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn checkpoint(
|
||||
&mut self,
|
||||
pager: &Pager,
|
||||
@@ -869,7 +869,7 @@ impl Wal for WalFile {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::DEBUG)]
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn sync(&mut self) -> Result<WalFsyncStatus> {
|
||||
match self.sync_state.get() {
|
||||
SyncState::NotSyncing => {
|
||||
|
||||
@@ -11,7 +11,7 @@ use turso_sqlite3_parser::ast::{CompoundOperator, SortOrder};
|
||||
|
||||
use tracing::Level;
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn emit_program_for_compound_select(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
|
||||
@@ -198,7 +198,7 @@ pub enum TransactionMode {
|
||||
|
||||
/// Main entry point for emitting bytecode for a SQL query
|
||||
/// Takes a query plan and generates the corresponding bytecode program
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn emit_program(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
@@ -216,7 +216,7 @@ pub fn emit_program(
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_select(
|
||||
program: &mut ProgramBuilder,
|
||||
mut plan: SelectPlan,
|
||||
@@ -395,7 +395,7 @@ pub fn emit_query<'a>(
|
||||
Ok(t_ctx.reg_result_cols_start.unwrap())
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_delete(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: DeletePlan,
|
||||
@@ -580,7 +580,7 @@ fn emit_delete_insns(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_update(
|
||||
program: &mut ProgramBuilder,
|
||||
mut plan: UpdatePlan,
|
||||
@@ -699,7 +699,7 @@ fn emit_program_for_update(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_update_insns(
|
||||
plan: &UpdatePlan,
|
||||
t_ctx: &TranslateCtx,
|
||||
|
||||
@@ -131,7 +131,7 @@ macro_rules! expect_arguments_even {
|
||||
}};
|
||||
}
|
||||
|
||||
#[instrument(err,skip(program, referenced_tables, expr, resolver), level = Level::TRACE)]
|
||||
#[instrument(skip(program, referenced_tables, expr, resolver), level = Level::TRACE)]
|
||||
pub fn translate_condition_expr(
|
||||
program: &mut ProgramBuilder,
|
||||
referenced_tables: &TableReferences,
|
||||
|
||||
@@ -53,7 +53,7 @@ use transaction::{translate_tx_begin, translate_tx_commit};
|
||||
use turso_sqlite3_parser::ast::{self, Delete, Insert};
|
||||
use update::translate_update;
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn translate(
|
||||
schema: &Schema,
|
||||
|
||||
@@ -368,6 +368,7 @@ pub struct Program {
|
||||
}
|
||||
|
||||
impl Program {
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn step(
|
||||
&self,
|
||||
state: &mut ProgramState,
|
||||
@@ -398,7 +399,7 @@ impl Program {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err,skip_all, level = Level::TRACE)]
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn commit_txn(
|
||||
&self,
|
||||
pager: Rc<Pager>,
|
||||
@@ -464,7 +465,7 @@ impl Program {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(err,skip(self, pager, connection), level = Level::TRACE)]
|
||||
#[instrument(skip(self, pager, connection), level = Level::TRACE)]
|
||||
fn step_end_write_txn(
|
||||
&self,
|
||||
pager: &Rc<Pager>,
|
||||
|
||||
Reference in New Issue
Block a user