Merge 'core: Wrap MvCursor in Arc<RwLock<>>' from Pekka Enberg

Make it Send and Sync.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3174
This commit is contained in:
Pekka Enberg
2025-09-17 13:34:34 +03:00
committed by GitHub
3 changed files with 23 additions and 25 deletions

View File

@@ -1,4 +1,5 @@
use crate::incremental::view::IncrementalView;
use parking_lot::RwLock;
/// Simple view structure for non-materialized views
#[derive(Debug, Clone)]
@@ -24,10 +25,8 @@ use crate::{
};
use crate::{util::normalize_ident, Result};
use core::fmt;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
use tracing::trace;
@@ -293,7 +292,7 @@ impl Schema {
/// Update [Schema] by scanning the first root page (sqlite_schema)
pub fn make_from_btree(
&mut self,
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
pager: Arc<Pager>,
syms: &SymbolTable,
) -> Result<()> {

View File

@@ -38,6 +38,7 @@ use super::{
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, MINIMUM_CELL_SIZE,
},
};
use parking_lot::RwLock;
use std::{
cell::{Cell, Ref, RefCell},
cmp::{Ordering, Reverse},
@@ -45,7 +46,6 @@ use std::{
fmt::Debug,
ops::DerefMut,
pin::Pin,
rc::Rc,
sync::Arc,
};
@@ -479,7 +479,7 @@ pub enum CursorSeekState {
pub struct BTreeCursor {
/// The multi-version cursor that is used to read and write to the database file.
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
/// The pager that is used to read and write to the database file.
pub pager: Arc<Pager>,
/// Cached value of the usable space of a BTree page, since it is very expensive to call in a hot loop via pager.usable_space().
@@ -583,7 +583,7 @@ impl BTreeNodeState {
impl BTreeCursor {
pub fn new(
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
pager: Arc<Pager>,
root_page: usize,
num_columns: usize,
@@ -634,7 +634,7 @@ impl BTreeCursor {
}
pub fn new_table(
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
pager: Arc<Pager>,
root_page: usize,
num_columns: usize,
@@ -643,7 +643,7 @@ impl BTreeCursor {
}
pub fn new_index(
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
pager: Arc<Pager>,
root_page: usize,
index: &Index,
@@ -690,7 +690,7 @@ impl BTreeCursor {
match state {
EmptyTableState::Start => {
if let Some(mv_cursor) = &self.mv_cursor {
let mv_cursor = mv_cursor.borrow();
let mv_cursor = mv_cursor.read();
return Ok(IOResult::Done(mv_cursor.is_empty()));
}
let (page, c) = self.pager.read_page(self.root_page)?;
@@ -1219,7 +1219,7 @@ impl BTreeCursor {
#[instrument(skip(self), level = Level::DEBUG, name = "next")]
pub fn get_next_record(&mut self) -> Result<IOResult<bool>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mut mv_cursor = mv_cursor.borrow_mut();
let mut mv_cursor = mv_cursor.write();
mv_cursor.forward();
let rowid = mv_cursor.current_row_id();
match rowid {
@@ -4387,7 +4387,7 @@ impl BTreeCursor {
RewindState::Start => {
self.rewind_state = RewindState::NextRecord;
if let Some(mv_cursor) = &self.mv_cursor {
let mut mv_cursor = mv_cursor.borrow_mut();
let mut mv_cursor = mv_cursor.write();
mv_cursor.rewind();
} else {
let c = self.move_to_root()?;
@@ -4484,7 +4484,7 @@ impl BTreeCursor {
#[instrument(skip(self), level = Level::DEBUG)]
pub fn rowid(&self) -> Result<IOResult<Option<i64>>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mut mv_cursor = mv_cursor.borrow_mut();
let mut mv_cursor = mv_cursor.write();
let Some(rowid) = mv_cursor.current_row_id() else {
return Ok(IOResult::Done(None));
};
@@ -4513,7 +4513,7 @@ impl BTreeCursor {
#[instrument(skip(self, key), level = Level::DEBUG)]
pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<IOResult<SeekResult>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mut mv_cursor = mv_cursor.borrow_mut();
let mut mv_cursor = mv_cursor.write();
return mv_cursor.seek(key, op);
}
self.skip_advance.set(false);
@@ -4551,7 +4551,7 @@ impl BTreeCursor {
return Ok(IOResult::Done(Some(record_ref)));
}
if let Some(mv_cursor) = &self.mv_cursor {
let mut mv_cursor = mv_cursor.borrow_mut();
let mut mv_cursor = mv_cursor.write();
let Some(row) = mv_cursor.current_row()? else {
return Ok(IOResult::Done(None));
};
@@ -4628,7 +4628,7 @@ impl BTreeCursor {
}
};
let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns);
mv_cursor.borrow_mut().insert(row)?;
mv_cursor.write().insert(row)?;
}
None => todo!("Support mvcc inserts with index btrees"),
},
@@ -4657,8 +4657,8 @@ impl BTreeCursor {
#[instrument(skip(self), level = Level::DEBUG)]
pub fn delete(&mut self) -> Result<IOResult<()>> {
if let Some(mv_cursor) = &self.mv_cursor {
let rowid = mv_cursor.borrow_mut().current_row_id().unwrap();
mv_cursor.borrow_mut().delete(rowid)?;
let rowid = mv_cursor.write().current_row_id().unwrap();
mv_cursor.write().delete(rowid)?;
return Ok(IOResult::Done(()));
}
@@ -5647,7 +5647,7 @@ impl BTreeCursor {
.do_allocate_page(page_type, offset, BtreePageAllocMode::Any)
}
pub fn get_mvcc_cursor(&self) -> Rc<RefCell<MvCursor>> {
pub fn get_mvcc_cursor(&self) -> Arc<RwLock<MvCursor>> {
self.mv_cursor.as_ref().unwrap().clone()
}
}

View File

@@ -37,7 +37,6 @@ use std::env::temp_dir;
use std::ops::DerefMut;
use std::{
borrow::BorrowMut,
rc::Rc,
sync::{Arc, Mutex},
};
use turso_macros::match_ignore_ascii_case;
@@ -82,7 +81,7 @@ use super::{
sorter::Sorter,
};
use regex::{Regex, RegexBuilder};
use std::{cell::RefCell, collections::HashMap};
use std::collections::HashMap;
#[cfg(feature = "json")]
use crate::{
@@ -944,7 +943,7 @@ pub fn op_open_read(
Some((tx_id, _)) => {
let table_id = *root_page as u64;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Rc::new(RefCell::new(
let mv_cursor = Arc::new(RwLock::new(
MvCursor::new(mv_store, tx_id, table_id, pager.clone()).unwrap(),
));
Some(mv_cursor)
@@ -6007,7 +6006,7 @@ pub fn op_new_rowid(
let cursor = state.get_cursor(*cursor);
let cursor = cursor.as_btree_mut();
let mvcc_cursor = cursor.get_mvcc_cursor();
let mut mvcc_cursor = RefCell::borrow_mut(&mvcc_cursor);
let mut mvcc_cursor = mvcc_cursor.write();
mvcc_cursor.get_next_rowid()
};
state.registers[*rowid_reg] = Register::Value(Value::Integer(rowid));
@@ -6387,7 +6386,7 @@ pub fn op_open_write(
Some((tx_id, _)) => {
let table_id = root_page;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Rc::new(RefCell::new(
let mv_cursor = Arc::new(RwLock::new(
MvCursor::new(mv_store.clone(), tx_id, table_id, pager.clone()).unwrap(),
));
Some(mv_cursor)
@@ -7160,7 +7159,7 @@ pub fn op_open_ephemeral(
Some((tx_id, _)) => {
let table_id = root_page as u64;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Rc::new(RefCell::new(
let mv_cursor = Arc::new(RwLock::new(
MvCursor::new(mv_store.clone(), tx_id, table_id, pager.clone()).unwrap(),
));
Some(mv_cursor)
@@ -7266,7 +7265,7 @@ pub fn op_open_dup(
Some((tx_id, _)) => {
let table_id = root_page as u64;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Rc::new(RefCell::new(MvCursor::new(
let mv_cursor = Arc::new(RwLock::new(MvCursor::new(
mv_store,
tx_id,
table_id,