Merge 'Adding checkpoint result' from Sonny

### What?
adding checkpoint result returning number of pages in wal and num pages
checkpointed.
Part of #696
### Context
SQLite returns in checkpoint result of calling `pragma wal_checkpoint;`
`0|3|3` while limbo returns `0|0|0`.
https://sqlite.org/pragma.html#pragma_wal_checkpoint
- 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
- 2nd col: # modified pages written to wal file
- 3rd col: # pages moved to db after checkpoint
This PR aims to add 2nd and 3rd column to the checkpoint result.
SQLite
```
sqlite3 test.db
sqlite> pragma journal_mode=wal;
wal
sqlite> pragma journal_mode;
wal
sqlite> create table t1 (id text);
sqlite> insert into t1(id) values (1),(2);
sqlite> select * from t1;
1
2
sqlite> pragma wal_checkpoint;
0|3|3
```
Limbo
```
./target/debug/limbo test.db
Limbo v0.0.13
Enter ".help" for usage hints.
limbo> pragma journal_mode;
wal
limbo> create table t1(id text);
limbo> insert into t1(id) values (1),(2);
limbo> select * from t1;
1
2
# current the 2nd and 3rd columns are hard coded in limbo to 0
limbo> pragma wal_checkpoint;
0|0|0
```

Closes #827
This commit is contained in:
Pekka Enberg
2025-02-04 18:26:24 +02:00
11 changed files with 237 additions and 48 deletions

View File

@@ -48,6 +48,7 @@ pub use error::LimboError;
use translate::select::prepare_select_plan;
pub type Result<T, E = LimboError> = std::result::Result<T, E>;
use crate::storage::wal::CheckpointResult;
use crate::translate::optimizer::optimize_plan;
pub use io::OpenFlags;
pub use io::PlatformIO;
@@ -62,6 +63,7 @@ pub use storage::pager::Page;
pub use storage::pager::Pager;
pub use storage::wal::CheckpointStatus;
pub use storage::wal::Wal;
pub static DATABASE_VERSION: OnceLock<String> = OnceLock::new();
#[derive(Clone, PartialEq, Eq)]
@@ -399,9 +401,9 @@ impl Connection {
Ok(())
}
pub fn checkpoint(&self) -> Result<()> {
self.pager.clear_page_cache();
Ok(())
pub fn checkpoint(&self) -> Result<CheckpointResult> {
let checkpoint_result = self.pager.clear_page_cache();
Ok(checkpoint_result)
}
#[cfg(not(target_family = "wasm"))]
@@ -414,7 +416,7 @@ impl Connection {
loop {
// TODO: make this async?
match self.pager.checkpoint()? {
CheckpointStatus::Done => {
CheckpointStatus::Done(_) => {
return Ok(());
}
CheckpointStatus::IO => {

View File

@@ -2,7 +2,7 @@ use crate::result::LimboResult;
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent};
use crate::storage::wal::Wal;
use crate::storage::wal::{CheckpointResult, Wal};
use crate::{Buffer, Result};
use log::trace;
use std::cell::{RefCell, UnsafeCell};
@@ -207,12 +207,14 @@ impl Pager {
}
pub fn end_tx(&self) -> Result<CheckpointStatus> {
match self.cacheflush()? {
CheckpointStatus::Done => {}
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
};
self.wal.borrow().end_read_tx()?;
Ok(CheckpointStatus::Done)
let checkpoint_status = self.cacheflush()?;
match checkpoint_status {
CheckpointStatus::IO => Ok(checkpoint_status),
CheckpointStatus::Done(_) => {
self.wal.borrow().end_read_tx()?;
Ok(checkpoint_status)
}
}
}
pub fn end_read_tx(&self) -> Result<()> {
@@ -306,6 +308,7 @@ impl Pager {
}
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
let mut checkpoint_result = CheckpointResult::new();
loop {
let state = self.flush_info.borrow().state.clone();
match state {
@@ -339,7 +342,7 @@ impl Pager {
FlushState::SyncWal => {
match self.wal.borrow_mut().sync() {
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
Ok(CheckpointStatus::Done) => {}
Ok(CheckpointStatus::Done(res)) => checkpoint_result = res,
Err(e) => return Err(e),
}
@@ -353,7 +356,8 @@ impl Pager {
}
FlushState::Checkpoint => {
match self.checkpoint()? {
CheckpointStatus::Done => {
CheckpointStatus::Done(res) => {
checkpoint_result = res;
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
}
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
@@ -373,10 +377,11 @@ impl Pager {
}
}
}
Ok(CheckpointStatus::Done)
Ok(CheckpointStatus::Done(checkpoint_result))
}
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
let mut checkpoint_result = CheckpointResult::new();
loop {
let state = self.checkpoint_state.borrow().clone();
trace!("pager_checkpoint(state={:?})", state);
@@ -389,7 +394,8 @@ impl Pager {
CheckpointMode::Passive,
)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
CheckpointStatus::Done => {
CheckpointStatus::Done(res) => {
checkpoint_result = res;
self.checkpoint_state.replace(CheckpointState::SyncDbFile);
}
};
@@ -413,7 +419,7 @@ impl Pager {
Ok(CheckpointStatus::IO)
} else {
self.checkpoint_state.replace(CheckpointState::Checkpoint);
Ok(CheckpointStatus::Done)
Ok(CheckpointStatus::Done(checkpoint_result))
};
}
}
@@ -421,7 +427,8 @@ impl Pager {
}
// WARN: used for testing purposes
pub fn clear_page_cache(&self) {
pub fn clear_page_cache(&self) -> CheckpointResult {
let checkpoint_result: CheckpointResult;
loop {
match self.wal.borrow_mut().checkpoint(
self,
@@ -431,7 +438,8 @@ impl Pager {
Ok(CheckpointStatus::IO) => {
let _ = self.io.run_once();
}
Ok(CheckpointStatus::Done) => {
Ok(CheckpointStatus::Done(res)) => {
checkpoint_result = res;
break;
}
Err(err) => panic!("error while clearing cache {}", err),
@@ -439,6 +447,7 @@ impl Pager {
}
// TODO: only clear cache of things that are really invalidated
self.page_cache.write().unwrap().clear();
checkpoint_result
}
/*

View File

@@ -1,9 +1,9 @@
use log::{debug, trace};
use std::collections::HashMap;
use std::fmt::Formatter;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::RwLock;
use std::{cell::RefCell, rc::Rc, sync::Arc};
use log::{debug, trace};
use std::{cell::RefCell, fmt, rc::Rc, sync::Arc};
use crate::io::{File, SyncCompletion, IO};
use crate::result::LimboResult;
@@ -25,6 +25,23 @@ pub const NO_LOCK: u32 = 0;
pub const SHARED_LOCK: u32 = 1;
pub const WRITE_LOCK: u32 = 2;
#[derive(Debug)]
pub struct CheckpointResult {
/// number of frames in WAL
pub num_wal_frames: u64,
/// number of frames moved successfully from WAL to db file after checkpoint
pub num_checkpointed_frames: u64,
}
impl CheckpointResult {
pub fn new() -> Self {
Self {
num_wal_frames: 0,
num_checkpointed_frames: 0,
}
}
}
#[derive(Debug)]
pub enum CheckpointMode {
Passive,
@@ -159,7 +176,7 @@ pub trait Wal {
// Syncing requires a state machine because we need to schedule a sync and then wait until it is
// finished. If we don't wait there will be undefined behaviour that no one wants to debug.
#[derive(Copy, Clone)]
#[derive(Copy, Clone, Debug)]
enum SyncState {
NotSyncing,
Syncing,
@@ -176,7 +193,7 @@ pub enum CheckpointState {
}
pub enum CheckpointStatus {
Done,
Done(CheckpointResult),
IO,
}
@@ -196,6 +213,17 @@ struct OngoingCheckpoint {
current_page: u64,
}
impl fmt::Debug for OngoingCheckpoint {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OngoingCheckpoint")
.field("state", &self.state)
.field("min_frame", &self.min_frame)
.field("max_frame", &self.max_frame)
.field("current_page", &self.current_page)
.finish()
}
}
#[allow(dead_code)]
pub struct WalFile {
io: Arc<dyn IO>,
@@ -218,6 +246,23 @@ pub struct WalFile {
min_frame: u64,
}
impl fmt::Debug for WalFile {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WalFile")
.field("sync_state", &self.sync_state)
.field("syncing", &self.syncing)
.field("page_size", &self.page_size)
.field("shared", &self.shared)
.field("ongoing_checkpoint", &self.ongoing_checkpoint)
.field("checkpoint_threshold", &self.checkpoint_threshold)
.field("max_frame_read_lock_index", &self.max_frame_read_lock_index)
.field("max_frame", &self.max_frame)
.field("min_frame", &self.min_frame)
// Excluding other fields
.finish()
}
}
// TODO(pere): lock only important parts + pin WalFileShared
/// WalFileShared is the part of a WAL that will be shared between threads. A wal has information
/// that needs to be communicated between threads so this struct does the job.
@@ -248,6 +293,21 @@ pub struct WalFileShared {
write_lock: LimboRwLock,
}
impl fmt::Debug for WalFileShared {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WalFileShared")
.field("wal_header", &self.wal_header)
.field("min_frame", &self.min_frame)
.field("max_frame", &self.max_frame)
.field("nbackfills", &self.nbackfills)
.field("frame_cache", &self.frame_cache)
.field("pages_in_frames", &self.pages_in_frames)
.field("last_checksum", &self.last_checksum)
// Excluding `file`, `read_locks`, and `write_lock`
.finish()
}
}
impl Wal for WalFile {
/// Begin a read transaction.
fn begin_read_tx(&mut self) -> Result<LimboResult> {
@@ -528,6 +588,13 @@ impl Wal for WalFile {
return Ok(CheckpointStatus::IO);
}
let mut shared = self.shared.write().unwrap();
// Record two num pages fields to return as checkpoint result to caller.
// Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html
let checkpoint_result = CheckpointResult {
num_wal_frames: shared.max_frame,
num_checkpointed_frames: self.ongoing_checkpoint.max_frame,
};
let everything_backfilled =
shared.max_frame == self.ongoing_checkpoint.max_frame;
if everything_backfilled {
@@ -541,7 +608,7 @@ impl Wal for WalFile {
shared.nbackfills = self.ongoing_checkpoint.max_frame;
}
self.ongoing_checkpoint.state = CheckpointState::Start;
return Ok(CheckpointStatus::Done);
return Ok(CheckpointStatus::Done(checkpoint_result));
}
}
}
@@ -572,7 +639,11 @@ impl Wal for WalFile {
Ok(CheckpointStatus::IO)
} else {
self.sync_state.replace(SyncState::NotSyncing);
Ok(CheckpointStatus::Done)
let checkpoint_result = CheckpointResult {
num_wal_frames: self.max_frame,
num_checkpointed_frames: self.ongoing_checkpoint.max_frame,
};
Ok(CheckpointStatus::Done(checkpoint_result))
}
}
}

View File

@@ -33,6 +33,7 @@ use crate::info;
use crate::pseudo::PseudoCursor;
use crate::result::LimboResult;
use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::storage::wal::CheckpointResult;
use crate::storage::{btree::BTreeCursor, pager::Pager};
use crate::types::{
AggContext, Cursor, CursorResult, ExternalAggState, OwnedRecord, OwnedValue, Record, SeekKey,
@@ -139,6 +140,7 @@ pub type PageIdx = usize;
// Index of insn in list of insns
type InsnReference = u32;
#[derive(Debug)]
pub enum StepResult<'a> {
Done,
IO,
@@ -470,15 +472,18 @@ impl Program {
} => {
let result = self.connection.upgrade().unwrap().checkpoint();
match result {
Ok(()) => {
Ok(CheckpointResult {
num_wal_frames: num_wal_pages,
num_checkpointed_frames: num_checkpointed_pages,
}) => {
// https://sqlite.org/pragma.html#pragma_wal_checkpoint
// TODO make 2nd and 3rd cols available through checkpoint method
// 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
state.registers[*dest] = OwnedValue::Integer(0);
// 2nd col: # modified pages written to wal file
state.registers[*dest + 1] = OwnedValue::Integer(0);
state.registers[*dest + 1] = OwnedValue::Integer(num_wal_pages as i64);
// 3rd col: # pages moved to db after checkpoint
state.registers[*dest + 2] = OwnedValue::Integer(0);
state.registers[*dest + 2] =
OwnedValue::Integer(num_checkpointed_pages as i64);
}
Err(_err) => state.registers[*dest] = OwnedValue::Integer(1),
}
@@ -1026,7 +1031,7 @@ impl Program {
return if self.auto_commit {
match pager.end_tx() {
Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO),
Ok(crate::storage::wal::CheckpointStatus::Done) => {
Ok(crate::storage::wal::CheckpointStatus::Done(_)) => {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());

View File

@@ -13,13 +13,17 @@ pub struct TempDatabase {
#[allow(dead_code, clippy::arc_with_non_send_sync)]
impl TempDatabase {
pub fn new_empty() -> Self {
let mut path = TempDir::new().unwrap().into_path();
path.push("test.db");
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new().unwrap());
Self::new("test.db")
}
pub fn new(db_name: &str) -> Self {
let mut path = TempDir::new().unwrap().into_path();
path.push(db_name);
let io: Arc<dyn IO> = Arc::new(limbo_core::PlatformIO::new().unwrap());
Self { path, io }
}
pub fn new(table_sql: &str) -> Self {
pub fn new_with_rusqlite(table_sql: &str) -> Self {
let mut path = TempDir::new().unwrap().into_path();
path.push("test.db");
{
@@ -47,7 +51,7 @@ impl TempDatabase {
pub(crate) fn do_flush(conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
loop {
match conn.cacheflush()? {
CheckpointStatus::Done => {
CheckpointStatus::Done(_) => {
break;
}
CheckpointStatus::IO => {
@@ -86,8 +90,9 @@ mod tests {
#[test]
fn test_statement_columns() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db =
TempDatabase::new("create table test (foo integer, bar integer, baz integer);");
let tmp_db = TempDatabase::new_with_rusqlite(
"create table test (foo integer, bar integer, baz integer);",
);
let conn = tmp_db.connect_limbo();
let stmt = conn.prepare("select * from test;")?;

View File

@@ -4,7 +4,9 @@ use limbo_core::{StepResult, Value};
#[test]
fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test_rowid (id INTEGER PRIMARY KEY, val TEXT);");
let tmp_db = TempDatabase::new_with_rusqlite(
"CREATE TABLE test_rowid (id INTEGER PRIMARY KEY, val TEXT);",
);
let conn = tmp_db.connect_limbo();
// Simple insert
@@ -85,7 +87,8 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
#[test]
fn test_integer_primary_key() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test_rowid (id INTEGER PRIMARY KEY);");
let tmp_db =
TempDatabase::new_with_rusqlite("CREATE TABLE test_rowid (id INTEGER PRIMARY KEY);");
let conn = tmp_db.connect_limbo();
for query in &[

View File

@@ -3,3 +3,4 @@ mod functions;
mod fuzz;
mod pragma;
mod query_processing;
mod wal;

View File

@@ -4,7 +4,7 @@ use limbo_core::{StepResult, Value};
#[test]
fn test_statement_reset_bind() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("create table test (i integer);");
let tmp_db = TempDatabase::new_with_rusqlite("create table test (i integer);");
let conn = tmp_db.connect_limbo();
let mut stmt = conn.prepare("select ?")?;
@@ -41,7 +41,7 @@ fn test_statement_reset_bind() -> anyhow::Result<()> {
#[test]
fn test_statement_bind() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("create table test (i integer);");
let tmp_db = TempDatabase::new_with_rusqlite("create table test (i integer);");
let conn = tmp_db.connect_limbo();
let mut stmt = conn.prepare("select ?, ?1, :named, ?3, ?4")?;

View File

@@ -7,7 +7,8 @@ use std::rc::Rc;
#[test]
fn test_simple_overflow_page() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);");
let tmp_db =
TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);");
let conn = tmp_db.connect_limbo();
let mut huge_text = String::new();
@@ -75,7 +76,8 @@ fn test_simple_overflow_page() -> anyhow::Result<()> {
#[test]
fn test_sequential_overflow_page() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);");
let tmp_db =
TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);");
let conn = tmp_db.connect_limbo();
let iterations = 10_usize;
@@ -152,7 +154,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> {
fn test_sequential_write() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);");
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY);");
let conn = tmp_db.connect_limbo();
let list_query = "SELECT * FROM test";
@@ -219,7 +221,7 @@ fn test_sequential_write() -> anyhow::Result<()> {
/// https://github.com/tursodatabase/limbo/pull/679
fn test_regression_multi_row_insert() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x REAL);");
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x REAL);");
let conn = tmp_db.connect_limbo();
let insert_query = "INSERT INTO test VALUES (-2), (-3), (-1)";
@@ -284,7 +286,7 @@ fn test_regression_multi_row_insert() -> anyhow::Result<()> {
#[test]
fn test_statement_reset() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("create table test (i integer);");
let tmp_db = TempDatabase::new_with_rusqlite("create table test (i integer);");
let conn = tmp_db.connect_limbo();
conn.execute("insert into test values (1)")?;
@@ -323,7 +325,7 @@ fn test_statement_reset() -> anyhow::Result<()> {
#[ignore]
fn test_wal_checkpoint() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);");
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY);");
// threshold is 1000 by default
let iterations = 1001_usize;
let conn = tmp_db.connect_limbo();
@@ -386,7 +388,7 @@ fn test_wal_checkpoint() -> anyhow::Result<()> {
#[test]
fn test_wal_restart() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);");
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY);");
// threshold is 1000 by default
fn insert(i: usize, conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {

View File

@@ -0,0 +1 @@
mod test_wal;

View File

@@ -0,0 +1,90 @@
use crate::common::{do_flush, TempDatabase};
use limbo_core::{Connection, LimboError, Result, StepResult, Value};
use std::cell::RefCell;
use std::rc::Rc;
#[allow(clippy::arc_with_non_send_sync)]
#[test]
fn test_wal_checkpoint_result() -> Result<()> {
let tmp_db = TempDatabase::new("test_wal.db");
let conn = tmp_db.connect_limbo();
conn.execute("CREATE TABLE t1 (id text);")?;
let res = execute_and_get_strings(&tmp_db, &conn, "pragma journal_mode;")?;
assert_eq!(res, vec!["wal"]);
conn.execute("insert into t1(id) values (1), (2);")?;
do_flush(&conn, &tmp_db).unwrap();
conn.execute("select * from t1;")?;
do_flush(&conn, &tmp_db).unwrap();
// checkpoint result should return > 0 num pages now as database has data
let res = execute_and_get_ints(&tmp_db, &conn, "pragma wal_checkpoint;")?;
println!("'pragma wal_checkpoint;' returns: {res:?}");
assert_eq!(res.len(), 3);
assert_eq!(res[0], 0); // checkpoint successfully
assert!(res[1] > 0); // num pages in wal
assert!(res[2] > 0); // num pages checkpointed successfully
Ok(())
}
/// Execute a statement and get strings result
pub(crate) fn execute_and_get_strings(
tmp_db: &TempDatabase,
conn: &Rc<Connection>,
sql: &str,
) -> Result<Vec<String>> {
let statement = conn.prepare(sql)?;
let stmt = Rc::new(RefCell::new(statement));
let mut result = Vec::new();
while let Ok(step_result) = stmt.borrow_mut().step() {
match step_result {
StepResult::Row(row) => {
for el in &row.values {
result.push(format!("{el}"));
}
}
StepResult::Done => break,
StepResult::Interrupt => break,
StepResult::IO => tmp_db.io.run_once()?,
StepResult::Busy => tmp_db.io.run_once()?,
}
}
Ok(result)
}
/// Execute a statement and get integers
pub(crate) fn execute_and_get_ints(
tmp_db: &TempDatabase,
conn: &Rc<Connection>,
sql: &str,
) -> Result<Vec<i64>> {
let statement = conn.prepare(sql)?;
let stmt = Rc::new(RefCell::new(statement));
let mut result = Vec::new();
while let Ok(step_result) = stmt.borrow_mut().step() {
match step_result {
StepResult::Row(row) => {
for value in &row.values {
let out = match value {
Value::Integer(i) => *i,
_ => {
return Err(LimboError::ConversionError(format!(
"cannot convert {value} to int"
)))
}
};
result.push(out);
}
}
StepResult::Done => break,
StepResult::Interrupt => break,
StepResult::IO => tmp_db.io.run_once()?,
StepResult::Busy => tmp_db.io.run_once()?,
}
}
Ok(result)
}