mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-02 14:54:23 +01:00
Merge 'core: wal transaction start' from Pere Diaz Bou
This pr adds support for multiple readers and a single writer with a custom made lock called `LimboRwLock`. Basically there are 5 allowed read locks which store the max frame allowed in that "snapshot" and any reader will try to acquire the biggest one possible. Writer will just try to lock the `write_lock` and if not successful, it will return busy. The only checkpoint mode supported for now is `PASSIVE` but it should be trivial to add more modes. This needs testing, but I will do it in another PR. I just wanted to do it in another PR. Closes #544
This commit is contained in:
@@ -143,6 +143,11 @@ impl Cursor {
|
||||
limbo_core::RowResult::Done => {
|
||||
return Ok(None);
|
||||
}
|
||||
limbo_core::RowResult::Busy => {
|
||||
return Err(
|
||||
PyErr::new::<OperationalError, _>("Busy error".to_string()).into()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -177,6 +182,11 @@ impl Cursor {
|
||||
limbo_core::RowResult::Done => {
|
||||
return Ok(results);
|
||||
}
|
||||
limbo_core::RowResult::Busy => {
|
||||
return Err(
|
||||
PyErr::new::<OperationalError, _>("Busy error".to_string()).into()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -85,7 +85,8 @@ impl Statement {
|
||||
}
|
||||
Ok(limbo_core::RowResult::IO)
|
||||
| Ok(limbo_core::RowResult::Done)
|
||||
| Ok(limbo_core::RowResult::Interrupt) => JsValue::UNDEFINED,
|
||||
| Ok(limbo_core::RowResult::Interrupt)
|
||||
| Ok(limbo_core::RowResult::Busy) => JsValue::UNDEFINED,
|
||||
Err(e) => panic!("Error: {:?}", e),
|
||||
}
|
||||
}
|
||||
@@ -105,6 +106,7 @@ impl Statement {
|
||||
Ok(limbo_core::RowResult::IO) => {}
|
||||
Ok(limbo_core::RowResult::Interrupt) => break,
|
||||
Ok(limbo_core::RowResult::Done) => break,
|
||||
Ok(limbo_core::RowResult::Busy) => break,
|
||||
Err(e) => panic!("Error: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
16
cli/app.rs
16
cli/app.rs
@@ -525,6 +525,10 @@ impl Limbo {
|
||||
Ok(RowResult::Done) => {
|
||||
break;
|
||||
}
|
||||
Ok(RowResult::Busy) => {
|
||||
self.writeln("database is busy");
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = self.writeln(err.to_string());
|
||||
break;
|
||||
@@ -560,6 +564,10 @@ impl Limbo {
|
||||
}
|
||||
Ok(RowResult::Interrupt) => break,
|
||||
Ok(RowResult::Done) => break,
|
||||
Ok(RowResult::Busy) => {
|
||||
self.writeln("database is busy");
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = self.write_fmt(format_args!("{}", err));
|
||||
break;
|
||||
@@ -610,6 +618,10 @@ impl Limbo {
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => {
|
||||
self.writeln("database is busy");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
@@ -663,6 +675,10 @@ impl Limbo {
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => {
|
||||
self.writeln("database is busy");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -52,6 +52,9 @@ fn limbo_bench(criterion: &mut Criterion) {
|
||||
limbo_core::RowResult::Done => {
|
||||
unreachable!();
|
||||
}
|
||||
limbo_core::RowResult::Busy => {
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
stmt.reset();
|
||||
});
|
||||
@@ -77,6 +80,9 @@ fn limbo_bench(criterion: &mut Criterion) {
|
||||
limbo_core::RowResult::Done => {
|
||||
unreachable!();
|
||||
}
|
||||
limbo_core::RowResult::Busy => {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
stmt.reset();
|
||||
});
|
||||
@@ -103,6 +109,9 @@ fn limbo_bench(criterion: &mut Criterion) {
|
||||
limbo_core::RowResult::Done => {
|
||||
unreachable!();
|
||||
}
|
||||
limbo_core::RowResult::Busy => {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
stmt.reset();
|
||||
});
|
||||
|
||||
@@ -5,6 +5,7 @@ mod io;
|
||||
#[cfg(feature = "json")]
|
||||
mod json;
|
||||
mod pseudo;
|
||||
mod result;
|
||||
mod schema;
|
||||
mod storage;
|
||||
mod translate;
|
||||
@@ -66,7 +67,6 @@ pub struct Database {
|
||||
pager: Rc<Pager>,
|
||||
schema: Rc<RefCell<Schema>>,
|
||||
header: Rc<RefCell<DatabaseHeader>>,
|
||||
transaction_state: RefCell<TransactionState>,
|
||||
// Shared structures of a Database are the parts that are common to multiple threads that might
|
||||
// create DB connections.
|
||||
shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
@@ -123,6 +123,7 @@ impl Database {
|
||||
pager: pager.clone(),
|
||||
schema: bootstrap_schema.clone(),
|
||||
header: db_header.clone(),
|
||||
transaction_state: RefCell::new(TransactionState::None),
|
||||
db: Weak::new(),
|
||||
last_insert_rowid: Cell::new(0),
|
||||
});
|
||||
@@ -135,7 +136,6 @@ impl Database {
|
||||
pager,
|
||||
schema,
|
||||
header,
|
||||
transaction_state: RefCell::new(TransactionState::None),
|
||||
shared_page_cache,
|
||||
shared_wal,
|
||||
}))
|
||||
@@ -148,6 +148,7 @@ impl Database {
|
||||
header: self.header.clone(),
|
||||
last_insert_rowid: Cell::new(0),
|
||||
db: Arc::downgrade(self),
|
||||
transaction_state: RefCell::new(TransactionState::None),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -206,6 +207,7 @@ pub struct Connection {
|
||||
schema: Rc<RefCell<Schema>>,
|
||||
header: Rc<RefCell<DatabaseHeader>>,
|
||||
db: Weak<Database>, // backpointer to the database holding this connection
|
||||
transaction_state: RefCell<TransactionState>,
|
||||
last_insert_rowid: Cell<u64>,
|
||||
}
|
||||
|
||||
@@ -379,6 +381,7 @@ impl Statement {
|
||||
vdbe::StepResult::IO => Ok(RowResult::IO),
|
||||
vdbe::StepResult::Done => Ok(RowResult::Done),
|
||||
vdbe::StepResult::Interrupt => Ok(RowResult::Interrupt),
|
||||
vdbe::StepResult::Busy => Ok(RowResult::Busy),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -395,6 +398,7 @@ pub enum RowResult<'a> {
|
||||
IO,
|
||||
Done,
|
||||
Interrupt,
|
||||
Busy,
|
||||
}
|
||||
|
||||
pub struct Row<'a> {
|
||||
|
||||
6
core/result.rs
Normal file
6
core/result.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
/// Common results that different functions can return in limbo.
|
||||
pub enum LimboResult {
|
||||
/// Couldn't acquire a lock
|
||||
Busy,
|
||||
Ok,
|
||||
}
|
||||
@@ -1,9 +1,10 @@
|
||||
use crate::result::LimboResult;
|
||||
use crate::storage::buffer_pool::BufferPool;
|
||||
use crate::storage::database::DatabaseStorage;
|
||||
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent};
|
||||
use crate::storage::wal::Wal;
|
||||
use crate::{Buffer, Result};
|
||||
use log::{debug, trace};
|
||||
use log::trace;
|
||||
use std::cell::{RefCell, UnsafeCell};
|
||||
use std::collections::HashSet;
|
||||
use std::rc::Rc;
|
||||
@@ -11,7 +12,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use super::page_cache::{DumbLruPageCache, PageCacheKey};
|
||||
use super::wal::CheckpointStatus;
|
||||
use super::wal::{CheckpointMode, CheckpointStatus};
|
||||
|
||||
pub struct PageInner {
|
||||
pub flags: AtomicUsize,
|
||||
@@ -196,14 +197,12 @@ impl Pager {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn begin_read_tx(&self) -> Result<()> {
|
||||
self.wal.borrow_mut().begin_read_tx()?;
|
||||
Ok(())
|
||||
pub fn begin_read_tx(&self) -> Result<LimboResult> {
|
||||
self.wal.borrow_mut().begin_read_tx()
|
||||
}
|
||||
|
||||
pub fn begin_write_tx(&self) -> Result<()> {
|
||||
self.wal.borrow_mut().begin_write_tx()?;
|
||||
Ok(())
|
||||
pub fn begin_write_tx(&self) -> Result<LimboResult> {
|
||||
self.wal.borrow_mut().begin_write_tx()
|
||||
}
|
||||
|
||||
pub fn end_tx(&self) -> Result<CheckpointStatus> {
|
||||
@@ -378,7 +377,11 @@ impl Pager {
|
||||
match state {
|
||||
CheckpointState::Checkpoint => {
|
||||
let in_flight = self.checkpoint_inflight.clone();
|
||||
match self.wal.borrow_mut().checkpoint(self, in_flight)? {
|
||||
match self.wal.borrow_mut().checkpoint(
|
||||
self,
|
||||
in_flight,
|
||||
CheckpointMode::Passive,
|
||||
)? {
|
||||
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
|
||||
CheckpointStatus::Done => {
|
||||
self.checkpoint_state.replace(CheckpointState::SyncDbFile);
|
||||
@@ -414,11 +417,11 @@ impl Pager {
|
||||
// WARN: used for testing purposes
|
||||
pub fn clear_page_cache(&self) {
|
||||
loop {
|
||||
match self
|
||||
.wal
|
||||
.borrow_mut()
|
||||
.checkpoint(self, Rc::new(RefCell::new(0)))
|
||||
{
|
||||
match self.wal.borrow_mut().checkpoint(
|
||||
self,
|
||||
Rc::new(RefCell::new(0)),
|
||||
CheckpointMode::Passive,
|
||||
) {
|
||||
Ok(CheckpointStatus::IO) => {
|
||||
self.io.run_once();
|
||||
}
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::RwLock;
|
||||
use std::{cell::RefCell, rc::Rc, sync::Arc};
|
||||
|
||||
use log::{debug, trace};
|
||||
|
||||
use crate::io::{File, SyncCompletion, IO};
|
||||
use crate::result::LimboResult;
|
||||
use crate::storage::sqlite3_ondisk::{
|
||||
begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
|
||||
};
|
||||
@@ -18,19 +20,116 @@ use super::page_cache::PageCacheKey;
|
||||
use super::pager::{PageRef, Pager};
|
||||
use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader};
|
||||
|
||||
pub const READMARK_NOT_USED: u32 = 0xffffffff;
|
||||
|
||||
pub const NO_LOCK: u32 = 0;
|
||||
pub const SHARED_LOCK: u32 = 1;
|
||||
pub const WRITE_LOCK: u32 = 2;
|
||||
|
||||
pub enum CheckpointMode {
|
||||
Passive,
|
||||
Full,
|
||||
Restart,
|
||||
Truncate,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct LimboRwLock {
|
||||
lock: AtomicU32,
|
||||
nreads: AtomicU32,
|
||||
value: AtomicU32,
|
||||
}
|
||||
|
||||
impl LimboRwLock {
|
||||
/// Shared lock. Returns true if it was successful, false if it couldn't lock it
|
||||
pub fn read(&mut self) -> bool {
|
||||
let lock = self.lock.load(Ordering::SeqCst);
|
||||
match lock {
|
||||
NO_LOCK => {
|
||||
let res = self.lock.compare_exchange(
|
||||
lock,
|
||||
SHARED_LOCK,
|
||||
Ordering::SeqCst,
|
||||
Ordering::SeqCst,
|
||||
);
|
||||
let ok = res.is_ok();
|
||||
if ok {
|
||||
self.nreads.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
ok
|
||||
}
|
||||
SHARED_LOCK => {
|
||||
self.nreads.fetch_add(1, Ordering::SeqCst);
|
||||
true
|
||||
}
|
||||
WRITE_LOCK => false,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Locks exlusively. Returns true if it was successful, false if it couldn't lock it
|
||||
pub fn write(&mut self) -> bool {
|
||||
let lock = self.lock.load(Ordering::SeqCst);
|
||||
match lock {
|
||||
NO_LOCK => {
|
||||
let res = self.lock.compare_exchange(
|
||||
lock,
|
||||
WRITE_LOCK,
|
||||
Ordering::SeqCst,
|
||||
Ordering::SeqCst,
|
||||
);
|
||||
res.is_ok()
|
||||
}
|
||||
SHARED_LOCK => {
|
||||
// no op
|
||||
false
|
||||
}
|
||||
WRITE_LOCK => true,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Unlock the current held lock.
|
||||
pub fn unlock(&mut self) {
|
||||
let lock = self.lock.load(Ordering::SeqCst);
|
||||
match lock {
|
||||
NO_LOCK => {}
|
||||
SHARED_LOCK => {
|
||||
let prev = self.nreads.fetch_sub(1, Ordering::SeqCst);
|
||||
if prev == 1 {
|
||||
let res = self.lock.compare_exchange(
|
||||
lock,
|
||||
NO_LOCK,
|
||||
Ordering::SeqCst,
|
||||
Ordering::SeqCst,
|
||||
);
|
||||
assert!(res.is_ok());
|
||||
}
|
||||
}
|
||||
WRITE_LOCK => {
|
||||
let res =
|
||||
self.lock
|
||||
.compare_exchange(lock, NO_LOCK, Ordering::SeqCst, Ordering::SeqCst);
|
||||
assert!(res.is_ok());
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Write-ahead log (WAL).
|
||||
pub trait Wal {
|
||||
/// Begin a read transaction.
|
||||
fn begin_read_tx(&mut self) -> Result<()>;
|
||||
fn begin_read_tx(&mut self) -> Result<LimboResult>;
|
||||
|
||||
/// Begin a write transaction.
|
||||
fn begin_write_tx(&mut self) -> Result<()>;
|
||||
fn begin_write_tx(&mut self) -> Result<LimboResult>;
|
||||
|
||||
/// End a read transaction.
|
||||
fn end_read_tx(&self) -> Result<()>;
|
||||
fn end_read_tx(&self) -> Result<LimboResult>;
|
||||
|
||||
/// End a write transaction.
|
||||
fn end_write_tx(&self) -> Result<()>;
|
||||
fn end_write_tx(&self) -> Result<LimboResult>;
|
||||
|
||||
/// Find the latest frame containing a page.
|
||||
fn find_frame(&self, page_id: u64) -> Result<Option<u64>>;
|
||||
@@ -51,6 +150,7 @@ pub trait Wal {
|
||||
&mut self,
|
||||
pager: &Pager,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
mode: CheckpointMode,
|
||||
) -> Result<CheckpointStatus>;
|
||||
fn sync(&mut self) -> Result<CheckpointStatus>;
|
||||
fn get_max_frame(&self) -> u64;
|
||||
@@ -108,10 +208,16 @@ pub struct WalFile {
|
||||
ongoing_checkpoint: OngoingCheckpoint,
|
||||
checkpoint_threshold: usize,
|
||||
// min and max frames for this connection
|
||||
/// This is the index to the read_lock in WalFileShared that we are holding. This lock contains
|
||||
/// the max frame for this connection.
|
||||
max_frame_read_lock_index: usize,
|
||||
/// Max frame allowed to lookup range=(minframe..max_frame)
|
||||
max_frame: u64,
|
||||
/// Start of range to look for frames range=(minframe..max_frame)
|
||||
min_frame: u64,
|
||||
}
|
||||
|
||||
// TODO(pere): lock only important parts + pin WalFileShared
|
||||
/// WalFileShared is the part of a WAL that will be shared between threads. A wal has information
|
||||
/// that needs to be communicated between threads so this struct does the job.
|
||||
pub struct WalFileShared {
|
||||
@@ -130,20 +236,94 @@ pub struct WalFileShared {
|
||||
pages_in_frames: Vec<u64>,
|
||||
last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
|
||||
file: Rc<dyn File>,
|
||||
/// read_locks is a list of read locks that can coexist with the max_frame nubmer stored in
|
||||
/// value. There is a limited amount because and unbounded amount of connections could be
|
||||
/// fatal. Therefore, for now we copy how SQLite behaves with limited amounts of read max
|
||||
/// frames that is equal to 5
|
||||
read_locks: [LimboRwLock; 5],
|
||||
/// There is only one write allowed in WAL mode. This lock takes care of ensuring there is only
|
||||
/// one used.
|
||||
write_lock: LimboRwLock,
|
||||
}
|
||||
|
||||
impl Wal for WalFile {
|
||||
/// Begin a read transaction.
|
||||
fn begin_read_tx(&mut self) -> Result<()> {
|
||||
let shared = self.shared.read().unwrap();
|
||||
fn begin_read_tx(&mut self) -> Result<LimboResult> {
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let max_frame_in_wal = shared.max_frame;
|
||||
self.min_frame = shared.nbackfills + 1;
|
||||
self.max_frame = shared.max_frame;
|
||||
Ok(())
|
||||
|
||||
let mut max_read_mark = 0;
|
||||
let mut max_read_mark_index = -1;
|
||||
// Find the largest mark we can find, ignore frames that are impossible to be in range and
|
||||
// that are not set
|
||||
for (index, lock) in shared.read_locks.iter().enumerate() {
|
||||
let this_mark = lock.value.load(Ordering::SeqCst);
|
||||
if this_mark > max_read_mark && this_mark <= max_frame_in_wal as u32 {
|
||||
max_read_mark = this_mark;
|
||||
max_read_mark_index = index as i64;
|
||||
}
|
||||
}
|
||||
|
||||
// If we didn't find any mark, then let's add a new one
|
||||
if max_read_mark_index == -1 {
|
||||
for (index, lock) in shared.read_locks.iter_mut().enumerate() {
|
||||
let busy = !lock.write();
|
||||
if !busy {
|
||||
// If this was busy then it must mean >1 threads tried to set this read lock
|
||||
lock.value.store(max_frame_in_wal as u32, Ordering::SeqCst);
|
||||
max_read_mark = max_frame_in_wal as u32;
|
||||
max_read_mark_index = index as i64;
|
||||
lock.unlock();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if max_read_mark_index == -1 {
|
||||
return Ok(LimboResult::Busy);
|
||||
}
|
||||
|
||||
let lock = &mut shared.read_locks[max_read_mark_index as usize];
|
||||
let busy = !lock.read();
|
||||
if busy {
|
||||
return Ok(LimboResult::Busy);
|
||||
}
|
||||
self.max_frame_read_lock_index = max_read_mark_index as usize;
|
||||
self.max_frame = max_read_mark as u64;
|
||||
self.min_frame = shared.nbackfills + 1;
|
||||
log::trace!(
|
||||
"begin_read_tx(min_frame={}, max_frame={}, lock={})",
|
||||
self.min_frame,
|
||||
self.max_frame,
|
||||
self.max_frame_read_lock_index
|
||||
);
|
||||
Ok(LimboResult::Ok)
|
||||
}
|
||||
|
||||
/// End a read transaction.
|
||||
fn end_read_tx(&self) -> Result<()> {
|
||||
Ok(())
|
||||
fn end_read_tx(&self) -> Result<LimboResult> {
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let read_lock = &mut shared.read_locks[self.max_frame_read_lock_index];
|
||||
read_lock.unlock();
|
||||
Ok(LimboResult::Ok)
|
||||
}
|
||||
|
||||
/// Begin a write transaction
|
||||
fn begin_write_tx(&mut self) -> Result<LimboResult> {
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let busy = !shared.write_lock.write();
|
||||
if busy {
|
||||
return Ok(LimboResult::Busy);
|
||||
}
|
||||
Ok(LimboResult::Ok)
|
||||
}
|
||||
|
||||
/// End a write transaction
|
||||
fn end_write_tx(&self) -> Result<LimboResult> {
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
shared.write_lock.unlock();
|
||||
Ok(LimboResult::Ok)
|
||||
}
|
||||
|
||||
/// Find the latest frame containing a page.
|
||||
@@ -186,7 +366,11 @@ impl Wal for WalFile {
|
||||
) -> Result<()> {
|
||||
let page_id = page.get().id;
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let frame_id = shared.max_frame;
|
||||
let frame_id = if shared.max_frame == 0 {
|
||||
1
|
||||
} else {
|
||||
shared.max_frame
|
||||
};
|
||||
let offset = self.frame_offset(frame_id);
|
||||
trace!(
|
||||
"append_frame(frame={}, offset={}, page_id={})",
|
||||
@@ -221,16 +405,6 @@ impl Wal for WalFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Begin a write transaction
|
||||
fn begin_write_tx(&mut self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// End a write transaction
|
||||
fn end_write_tx(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn should_checkpoint(&self) -> bool {
|
||||
let shared = self.shared.read().unwrap();
|
||||
let frame_id = shared.max_frame as usize;
|
||||
@@ -241,7 +415,12 @@ impl Wal for WalFile {
|
||||
&mut self,
|
||||
pager: &Pager,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
mode: CheckpointMode,
|
||||
) -> Result<CheckpointStatus> {
|
||||
assert!(
|
||||
matches!(mode, CheckpointMode::Passive),
|
||||
"only passive mode supported for now"
|
||||
);
|
||||
'checkpoint_loop: loop {
|
||||
let state = self.ongoing_checkpoint.state;
|
||||
log::debug!("checkpoint(state={:?})", state);
|
||||
@@ -249,9 +428,29 @@ impl Wal for WalFile {
|
||||
CheckpointState::Start => {
|
||||
// TODO(pere): check what frames are safe to checkpoint between many readers!
|
||||
self.ongoing_checkpoint.min_frame = self.min_frame;
|
||||
self.ongoing_checkpoint.max_frame = self.max_frame;
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
let max_frame_in_wal = shared.max_frame as u32;
|
||||
let mut max_safe_frame = shared.max_frame;
|
||||
for read_lock in shared.read_locks.iter_mut() {
|
||||
let this_mark = read_lock.value.load(Ordering::SeqCst);
|
||||
if this_mark < max_safe_frame as u32 {
|
||||
let busy = !read_lock.write();
|
||||
if !busy {
|
||||
read_lock.value.store(max_frame_in_wal, Ordering::SeqCst);
|
||||
read_lock.unlock();
|
||||
} else {
|
||||
max_safe_frame = this_mark as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.ongoing_checkpoint.max_frame = max_safe_frame;
|
||||
self.ongoing_checkpoint.current_page = 0;
|
||||
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
|
||||
log::trace!(
|
||||
"checkpoint_start(min_frame={}, max_frame={})",
|
||||
self.ongoing_checkpoint.max_frame,
|
||||
self.ongoing_checkpoint.min_frame
|
||||
);
|
||||
}
|
||||
CheckpointState::ReadFrame => {
|
||||
let shared = self.shared.read().unwrap();
|
||||
@@ -272,8 +471,9 @@ impl Wal for WalFile {
|
||||
.expect("page must be in frame cache if it's in list");
|
||||
|
||||
for frame in frames.iter().rev() {
|
||||
// TODO: do proper selection of frames to checkpoint
|
||||
if *frame >= self.ongoing_checkpoint.min_frame {
|
||||
if *frame >= self.ongoing_checkpoint.min_frame
|
||||
&& *frame <= self.ongoing_checkpoint.max_frame
|
||||
{
|
||||
log::debug!(
|
||||
"checkpoint page(state={:?}, page={}, frame={})",
|
||||
state,
|
||||
@@ -328,10 +528,18 @@ impl Wal for WalFile {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
}
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
shared.frame_cache.clear();
|
||||
shared.pages_in_frames.clear();
|
||||
shared.max_frame = 0;
|
||||
shared.nbackfills = 0;
|
||||
let everything_backfilled =
|
||||
shared.max_frame == self.ongoing_checkpoint.max_frame;
|
||||
if everything_backfilled {
|
||||
// Here we know that we backfilled everything, therefore we can safely
|
||||
// reset the wal.
|
||||
shared.frame_cache.clear();
|
||||
shared.pages_in_frames.clear();
|
||||
shared.max_frame = 0;
|
||||
shared.nbackfills = 0;
|
||||
} else {
|
||||
shared.nbackfills = self.ongoing_checkpoint.max_frame;
|
||||
}
|
||||
self.ongoing_checkpoint.state = CheckpointState::Start;
|
||||
return Ok(CheckpointStatus::Done);
|
||||
}
|
||||
@@ -412,10 +620,11 @@ impl WalFile {
|
||||
syncing: Rc::new(RefCell::new(false)),
|
||||
checkpoint_threshold: 1000,
|
||||
page_size,
|
||||
max_frame: 0,
|
||||
min_frame: 0,
|
||||
buffer_pool,
|
||||
sync_state: RefCell::new(SyncState::NotSyncing),
|
||||
max_frame: 0,
|
||||
min_frame: 0,
|
||||
max_frame_read_lock_index: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,6 +697,38 @@ impl WalFileShared {
|
||||
last_checksum: checksum,
|
||||
file,
|
||||
pages_in_frames: Vec::new(),
|
||||
read_locks: [
|
||||
LimboRwLock {
|
||||
lock: AtomicU32::new(NO_LOCK),
|
||||
nreads: AtomicU32::new(0),
|
||||
value: AtomicU32::new(READMARK_NOT_USED),
|
||||
},
|
||||
LimboRwLock {
|
||||
lock: AtomicU32::new(NO_LOCK),
|
||||
nreads: AtomicU32::new(0),
|
||||
value: AtomicU32::new(READMARK_NOT_USED),
|
||||
},
|
||||
LimboRwLock {
|
||||
lock: AtomicU32::new(NO_LOCK),
|
||||
nreads: AtomicU32::new(0),
|
||||
value: AtomicU32::new(READMARK_NOT_USED),
|
||||
},
|
||||
LimboRwLock {
|
||||
lock: AtomicU32::new(NO_LOCK),
|
||||
nreads: AtomicU32::new(0),
|
||||
value: AtomicU32::new(READMARK_NOT_USED),
|
||||
},
|
||||
LimboRwLock {
|
||||
lock: AtomicU32::new(NO_LOCK),
|
||||
nreads: AtomicU32::new(0),
|
||||
value: AtomicU32::new(READMARK_NOT_USED),
|
||||
},
|
||||
],
|
||||
write_lock: LimboRwLock {
|
||||
lock: AtomicU32::new(NO_LOCK),
|
||||
nreads: AtomicU32::new(0),
|
||||
value: AtomicU32::new(READMARK_NOT_USED),
|
||||
},
|
||||
};
|
||||
Ok(Arc::new(RwLock::new(shared)))
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ pub fn parse_schema_rows(rows: Option<Rows>, schema: &mut Schema, io: Arc<dyn IO
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ use crate::error::{LimboError, SQLITE_CONSTRAINT_PRIMARYKEY};
|
||||
use crate::ext::{exec_ts_from_uuid7, exec_uuid, exec_uuidblob, exec_uuidstr, ExtFunc, UuidFunc};
|
||||
use crate::function::{AggFunc, FuncCtx, MathFunc, MathFuncArity, ScalarFunc};
|
||||
use crate::pseudo::PseudoCursor;
|
||||
use crate::result::LimboResult;
|
||||
use crate::schema::Table;
|
||||
use crate::storage::sqlite3_ondisk::DatabaseHeader;
|
||||
use crate::storage::{btree::BTreeCursor, pager::Pager};
|
||||
@@ -537,6 +538,7 @@ pub enum StepResult<'a> {
|
||||
IO,
|
||||
Row(Record<'a>),
|
||||
Interrupt,
|
||||
Busy,
|
||||
}
|
||||
|
||||
/// If there is I/O, the instruction is restarted.
|
||||
@@ -1657,29 +1659,34 @@ impl Program {
|
||||
}
|
||||
Insn::Transaction { write } => {
|
||||
let connection = self.connection.upgrade().unwrap();
|
||||
if let Some(db) = connection.db.upgrade() {
|
||||
// TODO(pere): are backpointers good ?? this looks ugly af
|
||||
// upgrade transaction if needed
|
||||
let new_transaction_state =
|
||||
match (db.transaction_state.borrow().clone(), write) {
|
||||
(crate::TransactionState::Write, true) => TransactionState::Write,
|
||||
(crate::TransactionState::Write, false) => TransactionState::Write,
|
||||
(crate::TransactionState::Read, true) => TransactionState::Write,
|
||||
(crate::TransactionState::Read, false) => TransactionState::Read,
|
||||
(crate::TransactionState::None, true) => TransactionState::Read,
|
||||
(crate::TransactionState::None, false) => TransactionState::Read,
|
||||
};
|
||||
// TODO(Pere):
|
||||
// 1. lock wal
|
||||
// 2. lock shared
|
||||
// 3. lock write db if write
|
||||
db.transaction_state.replace(new_transaction_state.clone());
|
||||
if matches!(new_transaction_state, TransactionState::Write) {
|
||||
pager.begin_read_tx()?;
|
||||
} else {
|
||||
pager.begin_write_tx()?;
|
||||
let current_state = connection.transaction_state.borrow().clone();
|
||||
let (new_transaction_state, updated) = match (¤t_state, write) {
|
||||
(crate::TransactionState::Write, true) => (TransactionState::Write, false),
|
||||
(crate::TransactionState::Write, false) => (TransactionState::Write, false),
|
||||
(crate::TransactionState::Read, true) => (TransactionState::Write, true),
|
||||
(crate::TransactionState::Read, false) => (TransactionState::Read, false),
|
||||
(crate::TransactionState::None, true) => (TransactionState::Write, true),
|
||||
(crate::TransactionState::None, false) => (TransactionState::Read, true),
|
||||
};
|
||||
|
||||
if updated && matches!(current_state, TransactionState::None) {
|
||||
if let LimboResult::Busy = pager.begin_read_tx()? {
|
||||
log::trace!("begin_read_tx busy");
|
||||
return Ok(StepResult::Busy);
|
||||
}
|
||||
}
|
||||
|
||||
if updated && matches!(new_transaction_state, TransactionState::Write) {
|
||||
if let LimboResult::Busy = pager.begin_write_tx()? {
|
||||
log::trace!("begin_write_tx busy");
|
||||
return Ok(StepResult::Busy);
|
||||
}
|
||||
}
|
||||
if updated {
|
||||
connection
|
||||
.transaction_state
|
||||
.replace(new_transaction_state.clone());
|
||||
}
|
||||
state.pc += 1;
|
||||
}
|
||||
Insn::Goto { target_pc } => {
|
||||
|
||||
@@ -235,6 +235,7 @@ impl Interaction {
|
||||
RowResult::Done => {
|
||||
break;
|
||||
}
|
||||
RowResult::Busy => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -379,6 +379,9 @@ fn get_all_rows(
|
||||
RowResult::Done => {
|
||||
break;
|
||||
}
|
||||
RowResult::Busy => {
|
||||
// for now let's retry?
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
|
||||
@@ -246,6 +246,7 @@ pub unsafe extern "C" fn sqlite3_step(stmt: *mut sqlite3_stmt) -> std::ffi::c_in
|
||||
stmt.row.replace(Some(row));
|
||||
SQLITE_ROW
|
||||
}
|
||||
limbo_core::RowResult::Busy => SQLITE_BUSY,
|
||||
}
|
||||
} else {
|
||||
SQLITE_ERROR
|
||||
|
||||
@@ -95,6 +95,9 @@ mod tests {
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => {
|
||||
panic!("Database is busy");
|
||||
}
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
@@ -163,6 +166,7 @@ mod tests {
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => unreachable!(),
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
@@ -237,6 +241,7 @@ mod tests {
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => unreachable!(),
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
@@ -300,6 +305,7 @@ mod tests {
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => unreachable!(),
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
@@ -361,6 +367,7 @@ mod tests {
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => panic!("Database is busy"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -453,6 +460,7 @@ mod tests {
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => panic!("Database is busy"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -487,6 +495,7 @@ mod tests {
|
||||
}
|
||||
RowResult::Interrupt => break,
|
||||
RowResult::Done => break,
|
||||
RowResult::Busy => panic!("Database is busy"),
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
|
||||
Reference in New Issue
Block a user