Merge 'Integrate MVCC' from Pekka Enberg

This pull request integrates MVCC with the VDBE.

The long-term plan is to implement SQLite `BEGIN CONCURRENT` by
introducing an "MV store" abstraction (which implements the Hekaton in-
memory MVCC index) above the pager. Traditional SQLite transactions use
the pager and the WAL, but the MV store has its own transaction path
that updates the in-memory multi-versioned index. If a key does not
exist in the MVCC index, we read records from the pager. When an MVCC
transaction commits, we emit WAL entries.
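
For reference, here is a minimal sketch of that transaction path against
the MV store alone, using the crate-internal names from this diff (the
no-op storage backend means nothing is persisted yet):

    use crate::mvcc::database::{Result, Row, RowID};
    use crate::mvcc::persistent_storage::Storage;
    use crate::mvcc::{LocalClock, MvStore};

    fn mv_store_roundtrip() -> Result<()> {
        // In-memory MV store with a no-op persistent log, as constructed
        // in Database::open() when MVCC is enabled.
        let store = MvStore::new(LocalClock::new(), Storage::new_noop());

        // Writes go through the multi-versioned index instead of the pager.
        let tx = store.begin_tx();
        store.insert(tx, Row::new(RowID::new(1, 42), b"record payload".to_vec()))?;
        store.commit_tx(tx)?;

        // A transaction that begins after the commit sees the new version.
        let tx2 = store.begin_tx();
        assert!(store.read(tx2, RowID::new(1, 42))?.is_some());
        store.commit_tx(tx2)?;
        Ok(())
    }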

In this pull request, we wire up the MVCC transaction machinery to the
VDBE and the multi-version cursor to the B-tree cursor. Currently, the
database runs either in normal B-tree mode or in in-memory MVCC mode;
we still need to explore whether we can make this a hybrid where reads
can come from both the MVCC index and the B-tree. Note that this pull
request also does not add logical logging to a file; we'll defer that
to a later change.
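
On the API side, `Database::open_file()` and `Database::open()` now take
an `enable_mvcc` flag, and the CLI exposes it as `--experimental-mvcc`.
A minimal usage sketch (assuming `limbo_core::Result` is the crate's
public result alias):

    use std::sync::Arc;

    fn open_with_mvcc(path: &str) -> limbo_core::Result<Arc<limbo_core::Database>> {
        let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
        // The new third argument enables the experimental MVCC path;
        // passing `false` keeps the traditional pager/WAL behavior.
        limbo_core::Database::open_file(io, path, true)
    }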

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #917
Pekka Enberg
2025-03-06 13:00:35 +02:00
19 changed files with 306 additions and 117 deletions

View File

@@ -25,7 +25,7 @@ pub unsafe extern "C" fn db_open(path: *const c_char) -> *mut c_void {
p if p.contains(":memory:") => Arc::new(limbo_core::MemoryIO::new()),
_ => Arc::new(limbo_core::PlatformIO::new().expect("Failed to create IO")),
};
let db = Database::open_file(io.clone(), path);
let db = Database::open_file(io.clone(), path, false);
match db {
Ok(db) => {
let conn = db.connect().unwrap();

View File

@@ -67,7 +67,7 @@ pub extern "system" fn Java_tech_turso_core_LimboDB_openUtf8<'local>(
}
};
let db = match Database::open_file(io.clone(), &path) {
let db = match Database::open_file(io.clone(), &path, false) {
Ok(db) => db,
Err(e) => {
set_err_msg_and_throw_exception(&mut env, obj, LIMBO_ETC, e.to_string());

View File

@@ -277,7 +277,7 @@ pub fn connect(path: &str) -> Result<Connection> {
io: Arc<dyn limbo_core::IO>,
path: &str,
) -> std::result::Result<Arc<limbo_core::Database>, PyErr> {
limbo_core::Database::open_file(io, path).map_err(|e| {
limbo_core::Database::open_file(io, path, false).map_err(|e| {
PyErr::new::<DatabaseError, _>(format!("Failed to open database: {:?}", e))
})
}

View File

@@ -42,12 +42,12 @@ impl Builder {
match self.path.as_str() {
":memory:" => {
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::MemoryIO::new());
let db = limbo_core::Database::open_file(io, self.path.as_str())?;
let db = limbo_core::Database::open_file(io, self.path.as_str(), false)?;
Ok(Database { inner: db })
}
path => {
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
let db = limbo_core::Database::open_file(io, path)?;
let db = limbo_core::Database::open_file(io, path, false)?;
Ok(Database { inner: db })
}
}

View File

@@ -32,7 +32,7 @@ impl Database {
let wal_path = format!("{}-wal", path);
let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size).unwrap();
let db = limbo_core::Database::open(io, page_io, wal_shared).unwrap();
let db = limbo_core::Database::open(io, page_io, wal_shared, false).unwrap();
let conn = db.connect().unwrap();
Database { db, conn }
}

View File

@@ -51,6 +51,8 @@ pub struct Opts {
\t'io-uring' when built for Linux with feature 'io_uring'\n"
)]
pub io: Io,
#[clap(long, help = "Enable experimental MVCC feature")]
pub experimental_mvcc: bool,
}
#[derive(Debug, Clone)]
@@ -210,7 +212,7 @@ impl<'a> Limbo<'a> {
_path => get_io(DbLocation::Path, opts.io)?,
}
};
let db = Database::open_file(io.clone(), &db_file)?;
let db = Database::open_file(io.clone(), &db_file, opts.experimental_mvcc)?;
let conn = db.connect().unwrap();
let h = LimboHelper::new(conn.clone(), io.clone());
rl.set_helper(Some(h));
@@ -412,7 +414,7 @@ impl<'a> Limbo<'a> {
}
};
self.io = Arc::clone(&io);
let db = Database::open_file(self.io.clone(), path)?;
let db = Database::open_file(self.io.clone(), path, self.opts.experimental_mvcc)?;
self.conn = db.connect().unwrap();
self.opts.db_file = path.to_string();
Ok(())

View File

@@ -65,6 +65,7 @@ pub struct Settings {
pub echo: bool,
pub is_stdout: bool,
pub io: Io,
pub experimental_mvcc: bool,
}
impl From<&Opts> for Settings {
@@ -80,6 +81,7 @@ impl From<&Opts> for Settings {
.as_ref()
.map_or(":memory:".to_string(), |p| p.to_string_lossy().to_string()),
io: opts.io,
experimental_mvcc: opts.experimental_mvcc,
}
}
}

View File

@@ -18,7 +18,7 @@ fn bench(criterion: &mut Criterion) {
#[allow(clippy::arc_with_non_send_sync)]
let io = Arc::new(PlatformIO::new().unwrap());
let db = Database::open_file(io.clone(), "../testing/testing.db").unwrap();
let db = Database::open_file(io.clone(), "../testing/testing.db", false).unwrap();
let limbo_conn = db.connect().unwrap();
let queries = [

View File

@@ -83,7 +83,12 @@ enum TransactionState {
None,
}
pub(crate) type MvStore = crate::mvcc::MvStore<crate::mvcc::LocalClock>;
pub(crate) type MvCursor = crate::mvcc::cursor::ScanCursor<crate::mvcc::LocalClock>;
pub struct Database {
mv_store: Option<Rc<MvStore>>,
schema: Arc<RwLock<Schema>>,
// TODO: make header work without lock
header: Arc<Mutex<DatabaseHeader>>,
@@ -101,7 +106,7 @@ unsafe impl Sync for Database {}
impl Database {
#[cfg(feature = "fs")]
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Arc<Database>> {
pub fn open_file(io: Arc<dyn IO>, path: &str, enable_mvcc: bool) -> Result<Arc<Database>> {
use storage::wal::WalFileShared;
let file = io.open_file(path, OpenFlags::Create, true)?;
@@ -112,7 +117,7 @@ impl Database {
io.run_once()?;
let page_size = db_header.lock().unwrap().page_size;
let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size)?;
Self::open(io, page_io, wal_shared)
Self::open(io, page_io, wal_shared, enable_mvcc)
}
#[allow(clippy::arc_with_non_send_sync)]
@@ -120,6 +125,7 @@ impl Database {
io: Arc<dyn IO>,
page_io: Arc<dyn DatabaseStorage>,
shared_wal: Arc<RwLock<WalFileShared>>,
enable_mvcc: bool,
) -> Result<Arc<Database>> {
let db_header = Pager::begin_open(page_io.clone())?;
io.run_once()?;
@@ -127,11 +133,20 @@ impl Database {
let version = db_header.lock().unwrap().version_number;
version.to_string()
});
let mv_store = if enable_mvcc {
Some(Rc::new(MvStore::new(
crate::mvcc::LocalClock::new(),
crate::mvcc::persistent_storage::Storage::new_noop(),
)))
} else {
None
};
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
let page_size = db_header.lock().unwrap().page_size;
let header = db_header;
let schema = Arc::new(RwLock::new(Schema::new()));
let db = Database {
mv_store,
schema: schema.clone(),
header: header.clone(),
shared_page_cache: shared_page_cache.clone(),
@@ -149,7 +164,7 @@ impl Database {
.try_write()
.expect("lock on schema should succeed first try");
let syms = conn.syms.borrow();
parse_schema_rows(rows, &mut schema, io, syms.deref())?;
parse_schema_rows(rows, &mut schema, io, syms.deref(), None)?;
}
Ok(db)
}
@@ -178,6 +193,7 @@ impl Database {
header: self.header.clone(),
last_insert_rowid: Cell::new(0),
auto_commit: RefCell::new(true),
mv_transactions: RefCell::new(Vec::new()),
transaction_state: RefCell::new(TransactionState::None),
last_change: Cell::new(0),
syms: RefCell::new(SymbolTable::new()),
@@ -244,6 +260,7 @@ pub struct Connection {
schema: Arc<RwLock<Schema>>,
header: Arc<Mutex<DatabaseHeader>>,
auto_commit: RefCell<bool>,
mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
transaction_state: RefCell<TransactionState>,
last_insert_rowid: Cell<u64>,
last_change: Cell<i64>,
@@ -274,7 +291,11 @@ impl Connection {
&syms,
QueryMode::Normal,
)?);
Ok(Statement::new(program, self.pager.clone()))
Ok(Statement::new(
program,
self._db.mv_store.clone(),
self.pager.clone(),
))
}
Cmd::Explain(_stmt) => todo!(),
Cmd::ExplainQueryPlan(_stmt) => todo!(),
@@ -312,7 +333,7 @@ impl Connection {
&syms,
QueryMode::Normal,
)?);
let stmt = Statement::new(program, self.pager.clone());
let stmt = Statement::new(program, self._db.mv_store.clone(), self.pager.clone());
Ok(Some(stmt))
}
Cmd::Explain(stmt) => {
@@ -407,7 +428,7 @@ impl Connection {
let mut state =
vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
program.step(&mut state, self.pager.clone())?;
program.step(&mut state, self._db.mv_store.clone(), self.pager.clone())?;
}
}
}
@@ -489,25 +510,36 @@ impl Connection {
pub struct Statement {
program: Rc<vdbe::Program>,
state: vdbe::ProgramState,
mv_store: Option<Rc<MvStore>>,
pager: Rc<Pager>,
}
impl Statement {
pub fn new(program: Rc<vdbe::Program>, pager: Rc<Pager>) -> Self {
pub fn new(
program: Rc<vdbe::Program>,
mv_store: Option<Rc<MvStore>>,
pager: Rc<Pager>,
) -> Self {
let state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
Self {
program,
state,
mv_store,
pager,
}
}
pub fn set_mv_tx_id(&mut self, mv_tx_id: Option<u64>) {
self.state.mv_tx_id = mv_tx_id;
}
pub fn interrupt(&mut self) {
self.state.interrupt();
}
pub fn step(&mut self) -> Result<StepResult> {
self.program.step(&mut self.state, self.pager.clone())
self.program
.step(&mut self.state, self.mv_store.clone(), self.pager.clone())
}
pub fn num_columns(&self) -> usize {

View File

@@ -1,17 +1,18 @@
use crate::mvcc::clock::LogicalClock;
use crate::mvcc::database::{MvStore, Result, Row, RowID};
use std::fmt::Debug;
use std::rc::Rc;
#[derive(Debug)]
pub struct ScanCursor<'a, Clock: LogicalClock> {
pub db: &'a MvStore<Clock>,
pub struct ScanCursor<Clock: LogicalClock> {
pub db: Rc<MvStore<Clock>>,
pub row_ids: Vec<RowID>,
pub index: usize,
tx_id: u64,
}
impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
pub fn new(db: &'a MvStore<Clock>, tx_id: u64, table_id: u64) -> Result<ScanCursor<'a, Clock>> {
impl<Clock: LogicalClock> ScanCursor<Clock> {
pub fn new(db: Rc<MvStore<Clock>>, tx_id: u64, table_id: u64) -> Result<ScanCursor<Clock>> {
let row_ids = db.scan_row_ids_for_table(table_id)?;
Ok(Self {
db,
@@ -21,6 +22,10 @@ impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
})
}
pub fn insert(&self, row: Row) -> Result<()> {
self.db.insert(self.tx_id, row)
}
pub fn current_row_id(&self) -> Option<RowID> {
if self.index >= self.row_ids.len() {
return None;

View File

@@ -17,6 +17,12 @@ pub struct RowID {
pub row_id: u64,
}
impl RowID {
pub fn new(table_id: u64, row_id: u64) -> Self {
Self { table_id, row_id }
}
}
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct Row {
@@ -24,6 +30,12 @@ pub struct Row {
pub data: Vec<u8>,
}
impl Row {
pub fn new(id: RowID, data: Vec<u8>) -> Self {
Self { id, data }
}
}
/// A row version.
#[derive(Clone, Debug, PartialEq)]
pub struct RowVersion {
@@ -242,6 +254,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// * `row` - the row object containing the values to be inserted.
///
pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
tracing::trace!("insert(tx_id={}, row.id={:?})", tx_id, row.id);
let tx = self
.txs
.get(&tx_id)
@@ -279,6 +292,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
///
/// Returns `true` if the row was successfully updated, and `false` otherwise.
pub fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
tracing::trace!("update(tx_id={}, row.id={:?})", tx_id, row.id);
if !self.delete(tx_id, row.id)? {
return Ok(false);
}
@@ -289,6 +303,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// Inserts a row in the database with new values, previously deleting
/// any old data if it existed. Bails on a delete error, e.g. write-write conflict.
pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
tracing::trace!("upsert(tx_id={}, row.id={:?})", tx_id, row.id);
self.delete(tx_id, row.id)?;
self.insert(tx_id, row)
}
@@ -308,6 +323,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// Returns `true` if the row was successfully deleted, and `false` otherwise.
///
pub fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
tracing::trace!("delete(tx_id={}, id={:?})", tx_id, id);
let row_versions_opt = self.rows.get(&id);
if let Some(ref row_versions) = row_versions_opt {
let mut row_versions = row_versions.value().write().unwrap();
@@ -362,6 +378,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
/// and `None` otherwise.
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
tracing::trace!("read(tx_id={}, id={:?})", tx_id, id);
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read().unwrap();
assert_eq!(tx.state, TransactionState::Active);
@@ -382,12 +399,14 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// Gets all row ids in the database.
pub fn scan_row_ids(&self) -> Result<Vec<RowID>> {
tracing::trace!("scan_row_ids");
let keys = self.rows.iter().map(|entry| *entry.key());
Ok(keys.collect())
}
/// Gets all row ids in the database for a given table.
pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
tracing::trace!("scan_row_ids_for_table(table_id={})", table_id);
Ok(self
.rows
.range(
@@ -412,7 +431,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx_id = self.get_tx_id();
let begin_ts = self.get_timestamp();
let tx = Transaction::new(tx_id, begin_ts);
tracing::trace!("BEGIN {tx}");
tracing::trace!("begin_tx(tx_id={})", tx_id);
self.txs.insert(tx_id, RwLock::new(tx));
tx_id
}
@@ -439,7 +458,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
}
tx.state.store(TransactionState::Preparing);
tracing::trace!("PREPARE {tx}");
tracing::trace!("prepare_tx(tx_id={})", tx_id);
/* TODO: The code we have here is sufficient for snapshot isolation.
** In order to implement serializability, we need the following steps:
@@ -515,7 +534,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
"""
*/
tx.state.store(TransactionState::Committed(end_ts));
tracing::trace!("COMMIT {tx}");
tracing::trace!("commit_tx(tx_id={})", tx_id);
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
drop(tx);
// Postprocessing: inserting row versions and logging the transaction to persistent storage.
@@ -550,7 +569,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
}
}
tracing::trace!("UPDATED TX{tx_id}");
tracing::trace!("updated(tx_id={})", tx_id);
// We have now updated all the versions with a reference to the
// transaction ID to a timestamp and can, therefore, remove the
// transaction. Please note that when we move to lockless, the
@@ -563,7 +582,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
if !log_record.row_versions.is_empty() {
self.storage.log_tx(log_record)?;
}
tracing::trace!("LOGGED {tx_id}");
tracing::trace!("logged(tx_id={})", tx_id);
Ok(())
}
@@ -580,7 +599,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx = tx_unlocked.value().write().unwrap();
assert_eq!(tx.state, TransactionState::Active);
tx.state.store(TransactionState::Aborted);
tracing::trace!("ABORT {tx}");
tracing::trace!("abort(tx_id={})", tx_id);
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
drop(tx);
@@ -596,7 +615,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx = tx_unlocked.value().read().unwrap();
tx.state.store(TransactionState::Terminated);
tracing::trace!("TERMINATE {tx}");
tracing::trace!("terminate(tx_id={})", tx_id);
// FIXME: verify that we can already remove the transaction here!
// Maybe it's fine for snapshot isolation, but too early for serializable?
self.txs.remove(&tx_id);
@@ -617,7 +636,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// Returns the number of removed versions.
pub fn drop_unused_row_versions(&self) -> usize {
tracing::trace!(
"Dropping unused row versions. Database stats: transactions: {}; rows: {}",
"drop_unused_row_versions() -> txs: {}; rows: {}",
self.txs.len(),
self.rows.len()
);
@@ -673,7 +692,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn recover(&self) -> Result<()> {
let tx_log = self.storage.read_tx_log()?;
for record in tx_log {
tracing::debug!("RECOVERING {:?}", record);
tracing::debug!("recover() -> tx_timestamp={}", record.tx_timestamp);
for version in record.row_versions {
self.insert_version(version.row.id, version);
}

View File

@@ -37,6 +37,9 @@ pub mod database;
pub mod errors;
pub mod persistent_storage;
pub use clock::LocalClock;
pub use database::MvStore;
#[cfg(test)]
mod tests {
use crate::mvcc::clock::LocalClock;

View File

@@ -4,6 +4,7 @@ use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::{
read_varint, BTreeCell, PageContent, PageType, TableInteriorCell, TableLeafCell,
};
use crate::MvCursor;
use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp};
use crate::{return_corrupt, LimboError, Result};
@@ -135,6 +136,9 @@ impl CursorState {
}
pub struct BTreeCursor {
/// The multi-version cursor that is used to read and write to the database file.
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
/// The pager that is used to read and write to the database file.
pager: Rc<Pager>,
/// Page id of the root page used to go back up fast.
root_page: usize,
@@ -178,8 +182,13 @@ struct CellArray {
}
impl BTreeCursor {
pub fn new(pager: Rc<Pager>, root_page: usize) -> Self {
pub fn new(
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
pager: Rc<Pager>,
root_page: usize,
) -> Self {
Self {
mv_cursor,
pager,
root_page,
rowid: Cell::new(None),
@@ -198,6 +207,10 @@ impl BTreeCursor {
/// Check if the table is empty.
/// This is done by checking if the root page has no cells.
fn is_empty_table(&self) -> Result<CursorResult<bool>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mv_cursor = mv_cursor.borrow();
return Ok(CursorResult::Ok(mv_cursor.is_empty()));
}
let page = self.pager.read_page(self.root_page)?;
return_if_locked!(page);
@@ -290,6 +303,19 @@ impl BTreeCursor {
&mut self,
predicate: Option<(SeekKey<'_>, SeekOp)>,
) -> Result<CursorResult<(Option<u64>, Option<Record>)>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mut mv_cursor = mv_cursor.borrow_mut();
let rowid = mv_cursor.current_row_id();
match rowid {
Some(rowid) => {
let record = mv_cursor.current_row().unwrap().unwrap();
let record: Record = crate::storage::sqlite3_ondisk::read_record(&record.data)?;
mv_cursor.forward();
return Ok(CursorResult::Ok((Some(rowid.row_id), Some(record))));
}
None => return Ok(CursorResult::Ok((None, None))),
}
}
loop {
let mem_page_rc = self.stack.top();
let cell_idx = self.stack.current_cell_index() as usize;
@@ -592,6 +618,7 @@ impl BTreeCursor {
}
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
// For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers.
// B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data.
//
@@ -1592,15 +1619,22 @@ impl BTreeCursor {
}
pub fn rewind(&mut self) -> Result<CursorResult<()>> {
self.move_to_root();
if let Some(_) = &self.mv_cursor {
let (rowid, record) = return_if_io!(self.get_next_record(None));
self.rowid.replace(rowid);
self.record.replace(record);
} else {
self.move_to_root();
let (rowid, record) = return_if_io!(self.get_next_record(None));
self.rowid.replace(rowid);
self.record.replace(record);
let (rowid, record) = return_if_io!(self.get_next_record(None));
self.rowid.replace(rowid);
self.record.replace(record);
}
Ok(CursorResult::Ok(()))
}
pub fn last(&mut self) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
match self.move_to_rightmost()? {
CursorResult::Ok(_) => self.prev(),
CursorResult::IO => Ok(CursorResult::IO),
@@ -1615,6 +1649,7 @@ impl BTreeCursor {
}
pub fn prev(&mut self) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
match self.get_prev_record()? {
CursorResult::Ok((rowid, record)) => {
self.rowid.replace(rowid);
@@ -1631,10 +1666,15 @@ impl BTreeCursor {
}
pub fn rowid(&self) -> Result<Option<u64>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mv_cursor = mv_cursor.borrow();
return Ok(mv_cursor.current_row_id().map(|rowid| rowid.row_id));
}
Ok(self.rowid.get())
}
pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<CursorResult<bool>> {
assert!(self.mv_cursor.is_none());
let (rowid, record) = return_if_io!(self.do_seek(key, op));
self.rowid.replace(rowid);
self.record.replace(record);
@@ -1648,23 +1688,35 @@ impl BTreeCursor {
pub fn insert(
&mut self,
key: &OwnedValue,
_record: &Record,
record: &Record,
moved_before: bool, /* Indicate whether it's necessary to traverse to find the leaf page */
) -> Result<CursorResult<()>> {
let int_key = match key {
OwnedValue::Integer(i) => i,
_ => unreachable!("btree tables are indexed by integers!"),
};
if !moved_before {
return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
}
return_if_io!(self.insert_into_page(key, _record));
self.rowid.replace(Some(*int_key as u64));
match &self.mv_cursor {
Some(mv_cursor) => {
let row_id =
crate::mvcc::database::RowID::new(self.table_id() as u64, *int_key as u64);
let mut record_buf = Vec::new();
record.serialize(&mut record_buf);
let row = crate::mvcc::database::Row::new(row_id, record_buf);
mv_cursor.borrow_mut().insert(row).unwrap();
}
None => {
if !moved_before {
return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
}
return_if_io!(self.insert_into_page(key, record));
self.rowid.replace(Some(*int_key as u64));
}
};
Ok(CursorResult::Ok(()))
}
pub fn delete(&mut self) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
let page = self.stack.top();
return_if_locked!(page);
@@ -1808,6 +1860,7 @@ impl BTreeCursor {
}
pub fn exists(&mut self, key: &OwnedValue) -> Result<CursorResult<bool>> {
assert!(self.mv_cursor.is_none());
let int_key = match key {
OwnedValue::Integer(i) => i,
_ => unreachable!("btree tables are indexed by integers!"),
@@ -1927,6 +1980,10 @@ impl BTreeCursor {
Ok(Some(n_overflow))
}
pub fn table_id(&self) -> usize {
self.root_page
}
}
impl PageStack {
@@ -2811,7 +2868,7 @@ mod tests {
.unwrap();
}
let io: Arc<dyn IO> = Arc::new(PlatformIO::new().unwrap());
let db = Database::open_file(io.clone(), path.to_str().unwrap()).unwrap();
let db = Database::open_file(io.clone(), path.to_str().unwrap(), false).unwrap();
db
}
@@ -2905,7 +2962,7 @@ mod tests {
}
fn validate_btree(pager: Rc<Pager>, page_idx: usize) -> (usize, bool) {
let cursor = BTreeCursor::new(pager.clone(), page_idx);
let cursor = BTreeCursor::new(None, pager.clone(), page_idx);
let page = pager.read_page(page_idx).unwrap();
let page = page.get();
let contents = page.contents.as_ref().unwrap();
@@ -2969,7 +3026,7 @@ mod tests {
}
fn format_btree(pager: Rc<Pager>, page_idx: usize, depth: usize) -> String {
let cursor = BTreeCursor::new(pager.clone(), page_idx);
let cursor = BTreeCursor::new(None, pager.clone(), page_idx);
let page = pager.read_page(page_idx).unwrap();
let page = page.get();
let contents = page.contents.as_ref().unwrap();
@@ -3100,7 +3157,7 @@ mod tests {
.as_slice(),
] {
let (pager, root_page) = empty_btree();
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
for (key, size) in sequence.iter() {
run_until_done(
|| {
@@ -3151,7 +3208,7 @@ mod tests {
tracing::info!("super seed: {}", seed);
for _ in 0..attempts {
let (pager, root_page) = empty_btree();
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
let mut keys = Vec::new();
let seed = rng.next_u64();
tracing::info!("seed: {}", seed);
@@ -3332,7 +3389,7 @@ mod tests {
#[ignore]
pub fn test_clear_overflow_pages() -> Result<()> {
let (pager, db_header) = setup_test_env(5);
let cursor = BTreeCursor::new(pager.clone(), 1);
let cursor = BTreeCursor::new(None, pager.clone(), 1);
let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096);
let usable_size = cursor.usable_space();
@@ -3430,7 +3487,7 @@ mod tests {
#[test]
pub fn test_clear_overflow_pages_no_overflow() -> Result<()> {
let (pager, db_header) = setup_test_env(5);
let cursor = BTreeCursor::new(pager.clone(), 1);
let cursor = BTreeCursor::new(None, pager.clone(), 1);
let small_payload = vec![b'A'; 10];
@@ -3869,7 +3926,7 @@ mod tests {
let (pager, root_page) = empty_btree();
let mut keys = Vec::new();
for i in 0..10000 {
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
tracing::info!("INSERT INTO t VALUES ({});", i,);
let key = OwnedValue::Integer(i);
let value = Record::new(vec![OwnedValue::Integer(i)]);
@@ -3893,7 +3950,7 @@ mod tests {
format_btree(pager.clone(), root_page, 0)
);
for key in keys.iter() {
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
let key = OwnedValue::Integer(*key);
let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap();
assert!(exists, "key not found {}", key);

View File

@@ -41,8 +41,10 @@ pub fn parse_schema_rows(
schema: &mut Schema,
io: Arc<dyn IO>,
syms: &SymbolTable,
mv_tx_id: Option<u64>,
) -> Result<()> {
if let Some(mut rows) = rows {
rows.set_mv_tx_id(mv_tx_id);
let mut automatic_indexes = Vec::new();
loop {
match rows.step()? {

View File

@@ -55,7 +55,9 @@ use crate::{
json::json_quote, json::json_remove, json::json_set, json::json_type,
};
use crate::{info, CheckpointStatus};
use crate::{resolve_ext_path, Connection, Result, TransactionState, DATABASE_VERSION};
use crate::{
resolve_ext_path, Connection, MvCursor, MvStore, Result, TransactionState, DATABASE_VERSION,
};
use insn::{
exec_add, exec_and, exec_bit_and, exec_bit_not, exec_bit_or, exec_boolean_not, exec_concat,
exec_divide, exec_multiply, exec_or, exec_remainder, exec_shift_left, exec_shift_right,
@@ -235,6 +237,7 @@ pub struct ProgramState {
deferred_seek: Option<(CursorID, CursorID)>,
ended_coroutine: Bitfield<4>, // flag to indicate that a coroutine has ended (key is the yield register. currently we assume that the yield register is always between 0-255, YOLO)
regex_cache: RegexCache,
pub(crate) mv_tx_id: Option<crate::mvcc::database::TxID>,
interrupted: bool,
parameters: HashMap<NonZero<usize>, OwnedValue>,
halt_state: Option<HaltState>,
@@ -254,6 +257,7 @@ impl ProgramState {
deferred_seek: None,
ended_coroutine: Bitfield::new(),
regex_cache: RegexCache::new(),
mv_tx_id: None,
interrupted: false,
parameters: HashMap::new(),
halt_state: None,
@@ -358,7 +362,12 @@ impl Program {
}
#[instrument(skip_all)]
pub fn step(&self, state: &mut ProgramState, pager: Rc<Pager>) -> Result<StepResult> {
pub fn step(
&self,
state: &mut ProgramState,
mv_store: Option<Rc<MvStore>>,
pager: Rc<Pager>,
) -> Result<StepResult> {
loop {
if state.is_interrupted() {
return Ok(StepResult::Interrupt);
@@ -755,7 +764,18 @@ impl Program {
root_page,
} => {
let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap();
let cursor = BTreeCursor::new(pager.clone(), *root_page);
let mv_cursor = match state.mv_tx_id {
Some(tx_id) => {
let table_id = *root_page as u64;
let mv_store = mv_store.as_ref().unwrap().clone();
let mv_cursor = Rc::new(RefCell::new(
MvCursor::new(mv_store, tx_id, table_id).unwrap(),
));
Some(mv_cursor)
}
None => None,
};
let cursor = BTreeCursor::new(mv_cursor, pager.clone(), *root_page);
let mut cursors = state.cursors.borrow_mut();
match cursor_type {
CursorType::BTreeTable(_) => {
@@ -1204,36 +1224,49 @@ impl Program {
)));
}
}
return self.halt(pager, state);
return self.halt(pager, state, mv_store);
}
Insn::Transaction { write } => {
let connection = self.connection.upgrade().unwrap();
let current_state = connection.transaction_state.borrow().clone();
let (new_transaction_state, updated) = match (&current_state, write) {
(TransactionState::Write, true) => (TransactionState::Write, false),
(TransactionState::Write, false) => (TransactionState::Write, false),
(TransactionState::Read, true) => (TransactionState::Write, true),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (TransactionState::Write, true),
(TransactionState::None, false) => (TransactionState::Read, true),
};
if updated && matches!(current_state, TransactionState::None) {
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(StepResult::Busy);
if let Some(mv_store) = &mv_store {
if state.mv_tx_id.is_none() {
let tx_id = mv_store.begin_tx();
self.connection
.upgrade()
.unwrap()
.mv_transactions
.borrow_mut()
.push(tx_id);
state.mv_tx_id = Some(tx_id);
}
}
} else {
let connection = self.connection.upgrade().unwrap();
let current_state = connection.transaction_state.borrow().clone();
let (new_transaction_state, updated) = match (&current_state, write) {
(TransactionState::Write, true) => (TransactionState::Write, false),
(TransactionState::Write, false) => (TransactionState::Write, false),
(TransactionState::Read, true) => (TransactionState::Write, true),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (TransactionState::Write, true),
(TransactionState::None, false) => (TransactionState::Read, true),
};
if updated && matches!(new_transaction_state, TransactionState::Write) {
if let LimboResult::Busy = pager.begin_write_tx()? {
tracing::trace!("begin_write_tx busy");
return Ok(StepResult::Busy);
if updated && matches!(current_state, TransactionState::None) {
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(StepResult::Busy);
}
}
if updated && matches!(new_transaction_state, TransactionState::Write) {
if let LimboResult::Busy = pager.begin_write_tx()? {
tracing::trace!("begin_write_tx busy");
return Ok(StepResult::Busy);
}
}
if updated {
connection
.transaction_state
.replace(new_transaction_state.clone());
}
}
if updated {
connection
.transaction_state
.replace(new_transaction_state.clone());
}
state.pc += 1;
}
@@ -1261,7 +1294,7 @@ impl Program {
"cannot commit - no transaction is active".to_string(),
));
}
return self.halt(pager, state);
return self.halt(pager, state, mv_store);
}
Insn::Goto { target_pc } => {
assert!(target_pc.is_offset());
@@ -2935,7 +2968,18 @@ impl Program {
let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap();
let mut cursors = state.cursors.borrow_mut();
let is_index = cursor_type.is_index();
let cursor = BTreeCursor::new(pager.clone(), *root_page);
let mv_cursor = match state.mv_tx_id {
Some(tx_id) => {
let table_id = *root_page as u64;
let mv_store = mv_store.as_ref().unwrap().clone();
let mv_cursor = Rc::new(RefCell::new(
MvCursor::new(mv_store, tx_id, table_id).unwrap(),
));
Some(mv_cursor)
}
None => None,
};
let cursor = BTreeCursor::new(mv_cursor, pager.clone(), *root_page);
if is_index {
cursors
.get_mut(*cursor_id)
@@ -3014,6 +3058,7 @@ impl Program {
&mut schema,
conn.pager.io.clone(),
&conn.syms.borrow(),
state.mv_tx_id,
)?;
state.pc += 1;
}
@@ -3084,41 +3129,59 @@ impl Program {
}
}
fn halt(&self, pager: Rc<Pager>, program_state: &mut ProgramState) -> Result<StepResult> {
let connection = self
.connection
.upgrade()
.expect("only weak ref to connection?");
let auto_commit = *connection.auto_commit.borrow();
tracing::trace!("Halt auto_commit {}", auto_commit);
assert!(
program_state.halt_state.is_none()
|| (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing))
);
if program_state.halt_state.is_some() {
self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref())
} else {
fn halt(
&self,
pager: Rc<Pager>,
program_state: &mut ProgramState,
mv_store: Option<Rc<MvStore>>,
) -> Result<StepResult> {
if let Some(mv_store) = mv_store {
let conn = self.connection.upgrade().unwrap();
let auto_commit = *conn.auto_commit.borrow();
if auto_commit {
let current_state = connection.transaction_state.borrow().clone();
match current_state {
TransactionState::Write => self.step_end_write_txn(
&pager,
&mut program_state.halt_state,
connection.deref(),
),
TransactionState::Read => {
pager.end_read_tx()?;
Ok(StepResult::Done)
}
TransactionState::None => Ok(StepResult::Done),
let mut mv_transactions = conn.mv_transactions.borrow_mut();
for tx_id in mv_transactions.iter() {
mv_store.commit_tx(*tx_id).unwrap();
}
mv_transactions.clear();
}
return Ok(StepResult::Done);
} else {
let connection = self
.connection
.upgrade()
.expect("only weak ref to connection?");
let auto_commit = *connection.auto_commit.borrow();
tracing::trace!("Halt auto_commit {}", auto_commit);
assert!(
program_state.halt_state.is_none()
|| (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing))
);
if program_state.halt_state.is_some() {
self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref())
} else {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
if auto_commit {
let current_state = connection.transaction_state.borrow().clone();
match current_state {
TransactionState::Write => self.step_end_write_txn(
&pager,
&mut program_state.halt_state,
connection.deref(),
),
TransactionState::Read => {
pager.end_read_tx()?;
Ok(StepResult::Done)
}
TransactionState::None => Ok(StepResult::Done),
}
} else {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
}
}
Ok(StepResult::Done)
}
Ok(StepResult::Done)
}
}
}

View File

@@ -308,8 +308,12 @@ fn doublecheck(
) {
{
let mut env_ = env.lock().unwrap();
env_.db =
Database::open_file(env_.io.clone(), paths.doublecheck_db.to_str().unwrap()).unwrap();
env_.db = Database::open_file(
env_.io.clone(),
paths.doublecheck_db.to_str().unwrap(),
false,
)
.unwrap();
}
// Run the simulation again

View File

@@ -87,7 +87,7 @@ impl SimulatorEnv {
std::fs::remove_file(db_path).unwrap();
}
let db = match Database::open_file(io.clone(), db_path.to_str().unwrap()) {
let db = match Database::open_file(io.clone(), db_path.to_str().unwrap(), false) {
Ok(db) => db,
Err(e) => {
panic!("error opening simulator test file {:?}: {:?}", db_path, e);

View File

@@ -116,7 +116,7 @@ pub unsafe extern "C" fn sqlite3_open(
Err(_) => return SQLITE_MISUSE,
},
};
match limbo_core::Database::open_file(io, filename) {
match limbo_core::Database::open_file(io, filename, false) {
Ok(db) => {
let conn = db.connect().unwrap();
*db_out = Box::leak(Box::new(sqlite3::new(db, conn)));

View File

@@ -42,7 +42,7 @@ impl TempDatabase {
pub fn connect_limbo(&self) -> Rc<limbo_core::Connection> {
log::debug!("conneting to limbo");
let db = Database::open_file(self.io.clone(), self.path.to_str().unwrap()).unwrap();
let db = Database::open_file(self.io.clone(), self.path.to_str().unwrap(), false).unwrap();
let conn = db.connect().unwrap();
log::debug!("connected to limbo");
@@ -51,7 +51,7 @@ impl TempDatabase {
pub fn limbo_database(&self) -> Arc<limbo_core::Database> {
log::debug!("conneting to limbo");
Database::open_file(self.io.clone(), self.path.to_str().unwrap()).unwrap()
Database::open_file(self.io.clone(), self.path.to_str().unwrap(), false).unwrap()
}
}