Mirror of https://github.com/aljazceru/turso.git (synced 2026-01-05 09:14:24 +01:00)
Merge 'Wal' from Pere Diaz Bou
This PR introduces the first iteration of WAL support, with automatic checkpointing after 1000 pages (not configurable right now). Closes #344
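The 1000-page threshold lives on WalFile as checkpoint_threshold and is consulted by should_checkpoint after every WAL sync. A minimal sketch of that trigger follows; the WalCounters type and main function are hypothetical, only the comparison mirrors the diff below.

struct WalCounters {
    max_frame: u64,
    checkpoint_threshold: usize,
}

impl WalCounters {
    fn should_checkpoint(&self) -> bool {
        // Same comparison as WalFile::should_checkpoint in this commit.
        self.max_frame as usize >= self.checkpoint_threshold
    }
}

fn main() {
    // With the hard-coded threshold of 1000, the 1000th appended frame triggers a checkpoint.
    let wal = WalCounters { max_frame: 1000, checkpoint_threshold: 1000 };
    assert!(wal.should_checkpoint());
}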
@@ -4,6 +4,7 @@ use limbo_core::IO;
use pyo3::prelude::*;
use pyo3::types::PyList;
use pyo3::types::PyTuple;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

mod errors;
@@ -198,7 +199,7 @@ fn stmt_is_dml(sql: &str) -> bool {
#[pyclass]
#[derive(Clone)]
pub struct Connection {
    conn: Arc<Mutex<limbo_core::Connection>>,
    conn: Arc<Mutex<Rc<limbo_core::Connection>>>,
    io: Arc<limbo_core::PlatformIO>,
}

@@ -238,7 +239,7 @@ pub fn connect(path: &str) -> Result<Connection> {
    })?);
    let db = limbo_core::Database::open_file(io.clone(), path)
        .map_err(|e| PyErr::new::<DatabaseError, _>(format!("Failed to open database: {:?}", e)))?;
    let conn: limbo_core::Connection = db.connect();
    let conn: Rc<limbo_core::Connection> = db.connect();
    Ok(Connection {
        conn: Arc::new(Mutex::new(conn)),
        io,
@@ -1,11 +1,12 @@
|
||||
use limbo_core::{Result, IO};
|
||||
use limbo_core::{OpenFlags, Page, Result, IO};
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use wasm_bindgen::prelude::*;
|
||||
|
||||
#[wasm_bindgen]
|
||||
pub struct Database {
|
||||
_inner: limbo_core::Database,
|
||||
_inner: Rc<limbo_core::Database>,
|
||||
}
|
||||
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
@@ -14,9 +15,9 @@ impl Database {
|
||||
#[wasm_bindgen(constructor)]
|
||||
pub fn new(path: &str) -> Database {
|
||||
let io = Arc::new(PlatformIO { vfs: VFS::new() });
|
||||
let file = io.open_file(path).unwrap();
|
||||
let file = io.open_file(path, limbo_core::OpenFlags::None).unwrap();
|
||||
let page_io = Rc::new(DatabaseStorage::new(file));
|
||||
let wal = Rc::new(Wal {});
|
||||
let wal = Rc::new(RefCell::new(Wal {}));
|
||||
let inner = limbo_core::Database::open(io, page_io, wal).unwrap();
|
||||
Database { _inner: inner }
|
||||
}
|
||||
@@ -51,7 +52,7 @@ impl limbo_core::File for File {
|
||||
fn pread(&self, pos: usize, c: Rc<limbo_core::Completion>) -> Result<()> {
|
||||
let r = match &*c {
|
||||
limbo_core::Completion::Read(r) => r,
|
||||
limbo_core::Completion::Write(_) => unreachable!(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
{
|
||||
let mut buf = r.buf_mut();
|
||||
@@ -71,6 +72,14 @@ impl limbo_core::File for File {
|
||||
) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn sync(&self, _c: Rc<limbo_core::Completion>) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PlatformIO {
|
||||
@@ -78,7 +87,7 @@ pub struct PlatformIO {
|
||||
}
|
||||
|
||||
impl limbo_core::IO for PlatformIO {
|
||||
fn open_file(&self, path: &str) -> Result<Rc<dyn limbo_core::File>> {
|
||||
fn open_file(&self, path: &str, _flags: OpenFlags) -> Result<Rc<dyn limbo_core::File>> {
|
||||
let fd = self.vfs.open(path);
|
||||
Ok(Rc::new(File {
|
||||
vfs: VFS::new(),
|
||||
@@ -127,11 +136,13 @@ impl DatabaseStorage {
|
||||
}
|
||||
}
|
||||
|
||||
struct BufferPool {}
|
||||
|
||||
impl limbo_core::DatabaseStorage for DatabaseStorage {
|
||||
fn read_page(&self, page_idx: usize, c: Rc<limbo_core::Completion>) -> Result<()> {
|
||||
let r = match &(*c) {
|
||||
limbo_core::Completion::Read(r) => r,
|
||||
limbo_core::Completion::Write(_) => unreachable!(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let size = r.buf().len();
|
||||
assert!(page_idx > 0);
|
||||
@@ -151,6 +162,10 @@ impl limbo_core::DatabaseStorage for DatabaseStorage {
|
||||
) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn sync(&self, _c: Rc<limbo_core::Completion>) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Wal {}
|
||||
@@ -168,13 +183,48 @@ impl limbo_core::Wal for Wal {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn begin_write_tx(&self) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn end_write_tx(&self) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn read_frame(
|
||||
&self,
|
||||
_frame_id: u64,
|
||||
_page: Rc<std::cell::RefCell<limbo_core::Page>>,
|
||||
_buffer_pool: Rc<limbo_core::BufferPool>,
|
||||
) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn should_checkpoint(&self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn append_frame(
|
||||
&mut self,
|
||||
_page: Rc<RefCell<Page>>,
|
||||
_db_size: u32,
|
||||
_pager: &limbo_core::Pager,
|
||||
_write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn checkpoint(
|
||||
&mut self,
|
||||
_pager: &limbo_core::Pager,
|
||||
_write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<limbo_core::CheckpointStatus> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn sync(&mut self) -> Result<limbo_core::CheckpointStatus> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[wasm_bindgen(module = "/vfs.js")]
|
||||
|
||||
cli/main.rs: 31 lines changed
@@ -6,6 +6,7 @@ use limbo_core::{Database, RowResult, Value};
|
||||
use opcodes_dictionary::OPCODE_DESCRIPTIONS;
|
||||
use rustyline::{error::ReadlineError, DefaultEditor};
|
||||
use std::path::PathBuf;
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -147,7 +148,7 @@ Note:
|
||||
|
||||
fn handle_dot_command(
|
||||
io: Arc<dyn limbo_core::IO>,
|
||||
conn: &limbo_core::Connection,
|
||||
conn: &Rc<limbo_core::Connection>,
|
||||
line: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let args: Vec<&str> = line.split_whitespace().collect();
|
||||
@@ -196,7 +197,7 @@ fn handle_dot_command(
|
||||
|
||||
fn display_schema(
|
||||
io: Arc<dyn limbo_core::IO>,
|
||||
conn: &limbo_core::Connection,
|
||||
conn: &Rc<limbo_core::Connection>,
|
||||
table: Option<&str>,
|
||||
) -> anyhow::Result<()> {
|
||||
let sql = match table {
|
||||
@@ -251,7 +252,7 @@ fn display_schema(
|
||||
|
||||
fn query(
|
||||
io: Arc<dyn limbo_core::IO>,
|
||||
conn: &limbo_core::Connection,
|
||||
conn: &Rc<limbo_core::Connection>,
|
||||
sql: &str,
|
||||
output_mode: &OutputMode,
|
||||
interrupt_count: &Arc<AtomicUsize>,
|
||||
@@ -264,8 +265,8 @@ fn query(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match rows.next_row()? {
|
||||
RowResult::Row(row) => {
|
||||
match rows.next_row() {
|
||||
Ok(RowResult::Row(row)) => {
|
||||
for (i, value) in row.values.iter().enumerate() {
|
||||
if i > 0 {
|
||||
print!("|");
|
||||
@@ -282,10 +283,14 @@ fn query(
|
||||
}
|
||||
println!();
|
||||
}
|
||||
RowResult::IO => {
|
||||
Ok(RowResult::IO) => {
|
||||
io.run_once()?;
|
||||
}
|
||||
RowResult::Done => {
|
||||
Ok(RowResult::Done) => {
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
eprintln!("{}", err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -297,8 +302,8 @@ fn query(
|
||||
}
|
||||
let mut table_rows: Vec<Vec<_>> = vec![];
|
||||
loop {
|
||||
match rows.next_row()? {
|
||||
RowResult::Row(row) => {
|
||||
match rows.next_row() {
|
||||
Ok(RowResult::Row(row)) => {
|
||||
table_rows.push(
|
||||
row.values
|
||||
.iter()
|
||||
@@ -314,10 +319,14 @@ fn query(
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
RowResult::IO => {
|
||||
Ok(RowResult::IO) => {
|
||||
io.run_once()?;
|
||||
}
|
||||
RowResult::Done => break,
|
||||
Ok(RowResult::Done) => break,
|
||||
Err(err) => {
|
||||
eprintln!("{}", err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
let table = table_rows.table();
|
||||
|
||||
@@ -13,7 +13,7 @@ pub mod tests {
|
||||
// Parent process opens the file
|
||||
let io1 = create_io().expect("Failed to create IO");
|
||||
let _file1 = io1
|
||||
.open_file(&path)
|
||||
.open_file(&path, crate::io::OpenFlags::None)
|
||||
.expect("Failed to open file in parent process");
|
||||
|
||||
let current_exe = std::env::current_exe().expect("Failed to get current executable path");
|
||||
@@ -38,7 +38,7 @@ pub mod tests {
|
||||
if std::env::var("RUST_TEST_CHILD_PROCESS").is_ok() {
|
||||
let path = std::env::var("RUST_TEST_FILE_PATH")?;
|
||||
let io = create_io()?;
|
||||
match io.open_file(&path) {
|
||||
match io.open_file(&path, crate::io::OpenFlags::None) {
|
||||
Ok(_) => std::process::exit(0),
|
||||
Err(_) => std::process::exit(1),
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::error::LimboError;
|
||||
use crate::io::common;
|
||||
use crate::Result;
|
||||
|
||||
use super::{Completion, File, IO};
|
||||
use super::{Completion, File, OpenFlags, IO};
|
||||
use libc::{c_short, fcntl, flock, F_SETLK};
|
||||
use log::trace;
|
||||
use polling::{Event, Events, Poller};
|
||||
@@ -12,6 +12,7 @@ use rustix::io::Errno;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::io::{Read, Seek, Write};
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::rc::Rc;
|
||||
|
||||
pub struct DarwinIO {
|
||||
@@ -31,12 +32,13 @@ impl DarwinIO {
|
||||
}
|
||||
|
||||
impl IO for DarwinIO {
|
||||
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
|
||||
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
|
||||
trace!("open_file(path = {})", path);
|
||||
let file = std::fs::File::options()
|
||||
.read(true)
|
||||
.custom_flags(libc::O_NONBLOCK)
|
||||
.write(true)
|
||||
.create(matches!(flags, OpenFlags::Create))
|
||||
.open(path)?;
|
||||
|
||||
let darwin_file = Rc::new(DarwinFile {
|
||||
@@ -70,7 +72,7 @@ impl IO for DarwinIO {
|
||||
let c: &Completion = c;
|
||||
let r = match c {
|
||||
Completion::Read(r) => r,
|
||||
Completion::Write(_) => unreachable!(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let mut buf = r.buf_mut();
|
||||
file.seek(std::io::SeekFrom::Start(pos as u64))?;
|
||||
@@ -153,7 +155,9 @@ impl File for DarwinFile {
|
||||
if lock_result == -1 {
|
||||
let err = std::io::Error::last_os_error();
|
||||
if err.kind() == std::io::ErrorKind::WouldBlock {
|
||||
return Err(LimboError::LockingError("Failed locking file. File is locked by another process".to_string()));
|
||||
return Err(LimboError::LockingError(
|
||||
"Failed locking file. File is locked by another process".to_string(),
|
||||
));
|
||||
} else {
|
||||
return Err(LimboError::LockingError(format!(
|
||||
"Failed locking file, {}",
|
||||
@@ -184,12 +188,12 @@ impl File for DarwinFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
|
||||
let file = self.file.borrow();
|
||||
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
|
||||
let file = self.file.borrow();
|
||||
let result = {
|
||||
let r = match &(*c) {
|
||||
Completion::Read(r) => r,
|
||||
Completion::Write(_) => unreachable!(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let mut buf = r.buf_mut();
|
||||
rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64)
|
||||
@@ -256,6 +260,24 @@ impl File for DarwinFile {
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn sync(&self, c: Rc<Completion>) -> Result<()> {
|
||||
let file = self.file.borrow();
|
||||
let result = rustix::fs::fsync(file.as_fd());
|
||||
match result {
|
||||
std::result::Result::Ok(()) => {
|
||||
trace!("fsync");
|
||||
c.complete(0);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
let file = self.file.borrow();
|
||||
Ok(file.metadata().unwrap().len())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DarwinFile {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{Completion, File, Result, IO};
|
||||
use crate::{Completion, File, LimboError, OpenFlags, Result, IO};
|
||||
use log::trace;
|
||||
use std::cell::RefCell;
|
||||
use std::io::{Read, Seek, Write};
|
||||
@@ -13,7 +13,7 @@ impl GenericIO {
|
||||
}
|
||||
|
||||
impl IO for GenericIO {
|
||||
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
|
||||
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
|
||||
trace!("open_file(path = {})", path);
|
||||
let file = std::fs::File::open(path)?;
|
||||
Ok(Rc::new(GenericFile {
|
||||
@@ -57,7 +57,7 @@ impl File for GenericFile {
|
||||
{
|
||||
let r = match &(*c) {
|
||||
Completion::Read(r) => r,
|
||||
Completion::Write(_) => unreachable!(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let mut buf = r.buf_mut();
|
||||
let buf = buf.as_mut_slice();
|
||||
@@ -78,8 +78,21 @@ impl File for GenericFile {
|
||||
let buf = buffer.borrow();
|
||||
let buf = buf.as_slice();
|
||||
file.write_all(buf)?;
|
||||
c.complete(buf.len() as i32);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sync(&self, c: Rc<Completion>) -> Result<()> {
|
||||
let mut file = self.file.borrow_mut();
|
||||
file.sync_all().map_err(|err| LimboError::IOError(err))?;
|
||||
c.complete(0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
let file = self.file.borrow();
|
||||
Ok(file.metadata().unwrap().len())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for GenericFile {
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use super::{common, Completion, File, IO};
|
||||
use super::{common, Completion, File, OpenFlags, IO};
|
||||
use crate::{LimboError, Result};
|
||||
use libc::{c_short, fcntl, flock, iovec, F_SETLK};
|
||||
use log::{debug, trace};
|
||||
use nix::fcntl::{FcntlArg, OFlag};
|
||||
use std::cell::RefCell;
|
||||
use std::fmt;
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::rc::Rc;
|
||||
use thiserror::Error;
|
||||
@@ -48,7 +49,10 @@ impl LinuxIO {
|
||||
pub fn new() -> Result<Self> {
|
||||
let ring = io_uring::IoUring::new(MAX_IOVECS as u32)?;
|
||||
let inner = InnerLinuxIO {
|
||||
ring: WrappedIOUring{ring, pending_ops: 0},
|
||||
ring: WrappedIOUring {
|
||||
ring,
|
||||
pending_ops: 0,
|
||||
},
|
||||
iovecs: [iovec {
|
||||
iov_base: std::ptr::null_mut(),
|
||||
iov_len: 0,
|
||||
@@ -74,7 +78,8 @@ impl InnerLinuxIO {
|
||||
impl WrappedIOUring {
|
||||
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
|
||||
unsafe {
|
||||
self.ring.submission()
|
||||
self.ring
|
||||
.submission()
|
||||
.push(entry)
|
||||
.expect("submission queue is full");
|
||||
}
|
||||
@@ -102,9 +107,13 @@ impl WrappedIOUring {
|
||||
}
|
||||
|
||||
impl IO for LinuxIO {
|
||||
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
|
||||
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
|
||||
trace!("open_file(path = {})", path);
|
||||
let file = std::fs::File::options().read(true).write(true).open(path)?;
|
||||
let file = std::fs::File::options()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(matches!(flags, OpenFlags::Create))
|
||||
.open(path)?;
|
||||
// Let's attempt to enable direct I/O. Not all filesystems support it
|
||||
// so ignore any errors.
|
||||
let fd = file.as_raw_fd();
|
||||
@@ -128,7 +137,7 @@ impl IO for LinuxIO {
|
||||
let ring = &mut inner.ring;
|
||||
|
||||
if ring.empty() {
|
||||
return Ok(())
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
ring.wait_for_completion()?;
|
||||
@@ -216,7 +225,7 @@ impl File for LinuxFile {
|
||||
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
|
||||
let r = match &(*c) {
|
||||
Completion::Read(r) => r,
|
||||
Completion::Write(_) => unreachable!(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
|
||||
let fd = io_uring::types::Fd(self.file.as_raw_fd());
|
||||
@@ -256,6 +265,21 @@ impl File for LinuxFile {
|
||||
io.ring.submit_entry(&write);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sync(&self, c: Rc<Completion>) -> Result<()> {
|
||||
let fd = io_uring::types::Fd(self.file.as_raw_fd());
|
||||
let ptr = Rc::into_raw(c.clone());
|
||||
let sync = io_uring::opcode::Fsync::new(fd)
|
||||
.build()
|
||||
.user_data(ptr as u64);
|
||||
let mut io = self.io.borrow_mut();
|
||||
io.ring.submit_entry(&sync);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
Ok(self.file.metadata().unwrap().len())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LinuxFile {
|
||||
|
||||
@@ -14,10 +14,17 @@ pub trait File {
    fn unlock_file(&self) -> Result<()>;
    fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()>;
    fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()>;
    fn sync(&self, c: Rc<Completion>) -> Result<()>;
    fn size(&self) -> Result<u64>;
}

pub enum OpenFlags {
    None,
    Create,
}

pub trait IO {
    fn open_file(&self, path: &str) -> Result<Rc<dyn File>>;
    fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>>;

    fn run_once(&self) -> Result<()>;

@@ -28,10 +35,12 @@ pub trait IO {

pub type Complete = dyn Fn(Rc<RefCell<Buffer>>);
pub type WriteComplete = dyn Fn(i32);
pub type SyncComplete = dyn Fn(i32);

pub enum Completion {
    Read(ReadCompletion),
    Write(WriteCompletion),
    Sync(SyncCompletion),
}

pub struct ReadCompletion {
@@ -43,7 +52,8 @@ impl Completion {
    pub fn complete(&self, result: i32) {
        match self {
            Completion::Read(r) => r.complete(),
            Completion::Write(w) => w.complete(result), // fix
            Completion::Write(w) => w.complete(result),
            Completion::Sync(s) => s.complete(result), // fix
        }
    }
}
@@ -52,6 +62,10 @@ pub struct WriteCompletion {
    pub complete: Box<WriteComplete>,
}

pub struct SyncCompletion {
    pub complete: Box<SyncComplete>,
}

impl ReadCompletion {
    pub fn new(buf: Rc<RefCell<Buffer>>, complete: Box<Complete>) -> Self {
        Self { buf, complete }
@@ -74,11 +88,22 @@ impl WriteCompletion {
    pub fn new(complete: Box<WriteComplete>) -> Self {
        Self { complete }
    }

    pub fn complete(&self, bytes_written: i32) {
        (self.complete)(bytes_written);
    }
}

impl SyncCompletion {
    pub fn new(complete: Box<SyncComplete>) -> Self {
        Self { complete }
    }

    pub fn complete(&self, res: i32) {
        (self.complete)(res);
    }
}

pub type BufferData = Pin<Vec<u8>>;

pub type BufferDropFn = Rc<dyn Fn(BufferData)>;
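To illustrate how the new flags parameter is meant to be used, here is a minimal sketch assuming the limbo_core re-exports shown elsewhere in this commit (OpenFlags, IO, Result); the open_both helper is hypothetical. The main database file is opened without implicit creation, while the WAL side file is created on demand, matching Database::open_file and WalFile::ensure_init in this diff.

use std::sync::Arc;

// Hypothetical helper: open the main database file and its "-wal" side file.
fn open_both(io: Arc<dyn limbo_core::IO>, path: &str) -> limbo_core::Result<()> {
    // Existing database file: do not create it implicitly.
    let _db = io.open_file(path, limbo_core::OpenFlags::None)?;
    // WAL side file: create it on first use, as WalFile::ensure_init does.
    let _wal = io.open_file(&format!("{}-wal", path), limbo_core::OpenFlags::Create)?;
    Ok(())
}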
@@ -1,4 +1,4 @@
|
||||
use crate::{Completion, File, Result, WriteCompletion, IO};
|
||||
use crate::{Completion, File, LimboError, OpenFlags, Result, WriteCompletion, IO};
|
||||
use log::trace;
|
||||
use std::cell::RefCell;
|
||||
use std::io::{Read, Seek, Write};
|
||||
@@ -13,9 +13,13 @@ impl WindowsIO {
|
||||
}
|
||||
|
||||
impl IO for WindowsIO {
|
||||
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
|
||||
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
|
||||
trace!("open_file(path = {})", path);
|
||||
let file = std::fs::File::open(path)?;
|
||||
let file = std::fs::File::options()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(matches!(flags, OpenFlags::Create))
|
||||
.open(path)?;
|
||||
Ok(Rc::new(WindowsFile {
|
||||
file: RefCell::new(file),
|
||||
}))
|
||||
@@ -55,7 +59,7 @@ impl File for WindowsFile {
|
||||
{
|
||||
let r = match &(*c) {
|
||||
Completion::Read(r) => r,
|
||||
Completion::Write(_) => unreachable!(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let mut buf = r.buf_mut();
|
||||
let buf = buf.as_mut_slice();
|
||||
@@ -76,6 +80,19 @@ impl File for WindowsFile {
|
||||
let buf = buffer.borrow();
|
||||
let buf = buf.as_slice();
|
||||
file.write_all(buf)?;
|
||||
c.complete(buffer.borrow().len() as i32);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sync(&self, c: Rc<Completion>) -> Result<()> {
|
||||
let mut file = self.file.borrow_mut();
|
||||
file.sync_all().map_err(|err| LimboError::IOError(err))?;
|
||||
c.complete(0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
let file = self.file.borrow();
|
||||
Ok(file.metadata().unwrap().len())
|
||||
}
|
||||
}
|
||||
|
||||
core/lib.rs: 69 lines changed
@@ -19,11 +19,11 @@ use log::trace;
|
||||
use schema::Schema;
|
||||
use sqlite3_parser::ast;
|
||||
use sqlite3_parser::{ast::Cmd, lexer::sql::Parser};
|
||||
use std::rc::Weak;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::{cell::RefCell, rc::Rc};
|
||||
#[cfg(feature = "fs")]
|
||||
use storage::database::FileStorage;
|
||||
use storage::pager::Pager;
|
||||
use storage::sqlite3_ondisk::DatabaseHeader;
|
||||
#[cfg(feature = "fs")]
|
||||
use storage::wal::WalFile;
|
||||
@@ -34,43 +34,61 @@ use translate::planner::prepare_select_plan;
|
||||
pub use error::LimboError;
|
||||
pub type Result<T> = std::result::Result<T, error::LimboError>;
|
||||
|
||||
pub use io::OpenFlags;
|
||||
#[cfg(feature = "fs")]
|
||||
pub use io::PlatformIO;
|
||||
pub use io::{Buffer, Completion, File, WriteCompletion, IO};
|
||||
pub use storage::buffer_pool::BufferPool;
|
||||
pub use storage::database::DatabaseStorage;
|
||||
pub use storage::pager::Page;
|
||||
pub use storage::pager::Pager;
|
||||
pub use storage::wal::CheckpointStatus;
|
||||
pub use storage::wal::Wal;
|
||||
pub use types::Value;
|
||||
|
||||
pub static DATABASE_VERSION: OnceLock<String> = OnceLock::new();
|
||||
|
||||
#[derive(Clone)]
|
||||
enum TransactionState {
|
||||
Write,
|
||||
Read,
|
||||
None,
|
||||
}
|
||||
|
||||
pub struct Database {
|
||||
pager: Rc<Pager>,
|
||||
schema: Rc<Schema>,
|
||||
header: Rc<RefCell<DatabaseHeader>>,
|
||||
transaction_state: RefCell<TransactionState>,
|
||||
}
|
||||
|
||||
impl Database {
|
||||
#[cfg(feature = "fs")]
|
||||
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Database> {
|
||||
let file = io.open_file(path)?;
|
||||
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Rc<Database>> {
|
||||
let file = io.open_file(path, io::OpenFlags::None)?;
|
||||
let page_io = Rc::new(FileStorage::new(file));
|
||||
let wal_path = format!("{}-wal", path);
|
||||
let wal = Rc::new(WalFile::new(io.clone(), wal_path));
|
||||
let db_header = Pager::begin_open(page_io.clone())?;
|
||||
io.run_once()?;
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
io.clone(),
|
||||
wal_path,
|
||||
db_header.borrow().page_size as usize,
|
||||
)));
|
||||
Self::open(io, page_io, wal)
|
||||
}
|
||||
|
||||
pub fn open(
|
||||
io: Arc<dyn IO>,
|
||||
page_io: Rc<dyn DatabaseStorage>,
|
||||
wal: Rc<dyn Wal>,
|
||||
) -> Result<Database> {
|
||||
wal: Rc<RefCell<dyn Wal>>,
|
||||
) -> Result<Rc<Database>> {
|
||||
let db_header = Pager::begin_open(page_io.clone())?;
|
||||
io.run_once()?;
|
||||
DATABASE_VERSION.get_or_init(|| {
|
||||
let version = db_header.borrow().version_number;
|
||||
version.to_string()
|
||||
});
|
||||
io.run_once()?;
|
||||
let pager = Rc::new(Pager::finish_open(
|
||||
db_header.clone(),
|
||||
page_io,
|
||||
@@ -78,11 +96,12 @@ impl Database {
|
||||
io.clone(),
|
||||
)?);
|
||||
let bootstrap_schema = Rc::new(Schema::new());
|
||||
let conn = Connection {
|
||||
let conn = Rc::new(Connection {
|
||||
pager: pager.clone(),
|
||||
schema: bootstrap_schema.clone(),
|
||||
header: db_header.clone(),
|
||||
};
|
||||
db: Weak::new(),
|
||||
});
|
||||
let mut schema = Schema::new();
|
||||
let rows = conn.query("SELECT * FROM sqlite_schema")?;
|
||||
if let Some(mut rows) = rows {
|
||||
@@ -126,19 +145,21 @@ impl Database {
|
||||
}
|
||||
let schema = Rc::new(schema);
|
||||
let header = db_header;
|
||||
Ok(Database {
|
||||
Ok(Rc::new(Database {
|
||||
pager,
|
||||
schema,
|
||||
header,
|
||||
})
|
||||
transaction_state: RefCell::new(TransactionState::None),
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn connect(&self) -> Connection {
|
||||
Connection {
|
||||
pub fn connect(self: &Rc<Database>) -> Rc<Connection> {
|
||||
Rc::new(Connection {
|
||||
pager: self.pager.clone(),
|
||||
schema: self.schema.clone(),
|
||||
header: self.header.clone(),
|
||||
}
|
||||
db: Rc::downgrade(self),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,10 +167,11 @@ pub struct Connection {
|
||||
pager: Rc<Pager>,
|
||||
schema: Rc<Schema>,
|
||||
header: Rc<RefCell<DatabaseHeader>>,
|
||||
db: Weak<Database>, // backpointer to the database holding this connection
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn prepare(&self, sql: impl Into<String>) -> Result<Statement> {
|
||||
pub fn prepare(self: &Rc<Connection>, sql: impl Into<String>) -> Result<Statement> {
|
||||
let sql = sql.into();
|
||||
trace!("Preparing: {}", sql);
|
||||
let mut parser = Parser::new(sql.as_bytes());
|
||||
@@ -162,6 +184,7 @@ impl Connection {
|
||||
stmt,
|
||||
self.header.clone(),
|
||||
self.pager.clone(),
|
||||
Rc::downgrade(self),
|
||||
)?);
|
||||
Ok(Statement::new(program, self.pager.clone()))
|
||||
}
|
||||
@@ -173,7 +196,7 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query(&self, sql: impl Into<String>) -> Result<Option<Rows>> {
|
||||
pub fn query(self: &Rc<Connection>, sql: impl Into<String>) -> Result<Option<Rows>> {
|
||||
let sql = sql.into();
|
||||
trace!("Querying: {}", sql);
|
||||
let mut parser = Parser::new(sql.as_bytes());
|
||||
@@ -186,6 +209,7 @@ impl Connection {
|
||||
stmt,
|
||||
self.header.clone(),
|
||||
self.pager.clone(),
|
||||
Rc::downgrade(&self),
|
||||
)?);
|
||||
let stmt = Statement::new(program, self.pager.clone());
|
||||
Ok(Some(Rows { stmt }))
|
||||
@@ -196,6 +220,7 @@ impl Connection {
|
||||
stmt,
|
||||
self.header.clone(),
|
||||
self.pager.clone(),
|
||||
Rc::downgrade(self),
|
||||
)?;
|
||||
program.explain();
|
||||
Ok(None)
|
||||
@@ -217,7 +242,7 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn execute(&self, sql: impl Into<String>) -> Result<()> {
|
||||
pub fn execute(self: &Rc<Connection>, sql: impl Into<String>) -> Result<()> {
|
||||
let sql = sql.into();
|
||||
let mut parser = Parser::new(sql.as_bytes());
|
||||
let cmd = parser.next()?;
|
||||
@@ -229,6 +254,7 @@ impl Connection {
|
||||
stmt,
|
||||
self.header.clone(),
|
||||
self.pager.clone(),
|
||||
Rc::downgrade(self),
|
||||
)?;
|
||||
program.explain();
|
||||
}
|
||||
@@ -239,6 +265,7 @@ impl Connection {
|
||||
stmt,
|
||||
self.header.clone(),
|
||||
self.pager.clone(),
|
||||
Rc::downgrade(self),
|
||||
)?;
|
||||
let mut state = vdbe::ProgramState::new(program.max_registers);
|
||||
program.step(&mut state, self.pager.clone())?;
|
||||
@@ -248,8 +275,12 @@ impl Connection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn cacheflush(&self) -> Result<()> {
|
||||
self.pager.cacheflush()?;
|
||||
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
|
||||
self.pager.cacheflush()
|
||||
}
|
||||
|
||||
pub fn clear_page_cache(&self) -> Result<()> {
|
||||
self.pager.clear_page_cache();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ pub trait DatabaseStorage {
|
||||
buffer: Rc<RefCell<Buffer>>,
|
||||
c: Rc<Completion>,
|
||||
) -> Result<()>;
|
||||
fn sync(&self, c: Rc<Completion>) -> Result<()>;
|
||||
}
|
||||
|
||||
#[cfg(feature = "fs")]
|
||||
@@ -26,7 +27,7 @@ impl DatabaseStorage for FileStorage {
|
||||
fn read_page(&self, page_idx: usize, c: Rc<Completion>) -> Result<()> {
|
||||
let r = match &(*c) {
|
||||
Completion::Read(r) => r,
|
||||
Completion::Write(_) => unreachable!(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let size = r.buf().len();
|
||||
assert!(page_idx > 0);
|
||||
@@ -52,6 +53,10 @@ impl DatabaseStorage for FileStorage {
|
||||
self.file.pwrite(pos, buffer, c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sync(&self, c: Rc<Completion>) -> Result<()> {
|
||||
self.file.sync(c)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "fs")]
|
||||
|
||||
@@ -13,6 +13,8 @@ use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use super::wal::CheckpointStatus;
|
||||
|
||||
pub struct Page {
|
||||
pub flags: AtomicUsize,
|
||||
pub contents: RwLock<Option<PageContent>>,
|
||||
@@ -230,6 +232,13 @@ impl DumbLruPageCache {
|
||||
}
|
||||
self.detach(tail);
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
let to_remove: Vec<usize> = self.map.borrow().iter().map(|v| *v.0).collect();
|
||||
for key in to_remove {
|
||||
self.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
@@ -256,6 +265,23 @@ impl<K: Eq + Hash + Clone, V> PageCache<K, V> {
    }
}

#[derive(Clone)]
enum FlushState {
    Start,
    SyncWal,
    Checkpoint,
    CheckpointDone,
    SyncDbFile,
    WaitSyncDbFile,
}

/// This will keep track of the state of current cache flush in order to not repeat work
struct FlushInfo {
    state: FlushState,
    /// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync.
    in_flight_writes: Rc<RefCell<usize>>,
}
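FlushState and FlushInfo drive Pager::cacheflush (further down in this diff) as a resumable state machine: each call resumes from the saved state and either finishes or returns CheckpointStatus::IO so the caller can run the event loop and try again. The next_state helper below is a condensed, illustrative restatement of those transitions, not part of the commit.

// A minimal sketch (not the real Pager) of the transitions cacheflush() walks through;
// the variant names mirror the FlushState enum above.
#[derive(Clone, Copy)]
enum FlushState {
    Start,
    SyncWal,
    Checkpoint,
    CheckpointDone,
    SyncDbFile,
    WaitSyncDbFile,
}

fn next_state(
    state: FlushState,
    should_checkpoint: bool,
    in_flight_writes: usize,
    syncing: bool,
) -> FlushState {
    match state {
        // All dirty pages have been appended to the WAL; next, sync the WAL file.
        FlushState::Start => FlushState::SyncWal,
        // WAL is durable; checkpoint only once enough frames have accumulated.
        FlushState::SyncWal if should_checkpoint => FlushState::Checkpoint,
        FlushState::SyncWal => FlushState::Start,
        // Frames are being copied back into the main database file.
        FlushState::Checkpoint => FlushState::CheckpointDone,
        // Wait for in-flight page writes to drain before syncing the db file.
        FlushState::CheckpointDone if in_flight_writes == 0 => FlushState::SyncDbFile,
        FlushState::CheckpointDone => FlushState::CheckpointDone,
        FlushState::SyncDbFile => FlushState::WaitSyncDbFile,
        // Once the database fsync completes, the flush is done.
        FlushState::WaitSyncDbFile if !syncing => FlushState::Start,
        FlushState::WaitSyncDbFile => FlushState::WaitSyncDbFile,
    }
}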
/// The pager interface implements the persistence layer by providing access
|
||||
/// to pages of the database file, including caching, concurrency control, and
|
||||
/// transaction management.
|
||||
@@ -263,7 +289,7 @@ pub struct Pager {
|
||||
/// Source of the database pages.
|
||||
pub page_io: Rc<dyn DatabaseStorage>,
|
||||
/// The write-ahead log (WAL) for the database.
|
||||
wal: Rc<dyn Wal>,
|
||||
wal: Rc<RefCell<dyn Wal>>,
|
||||
/// A page cache for the database.
|
||||
page_cache: RefCell<DumbLruPageCache>,
|
||||
/// Buffer pool for temporary data storage.
|
||||
@@ -272,6 +298,9 @@ pub struct Pager {
|
||||
pub io: Arc<dyn crate::io::IO>,
|
||||
dirty_pages: Rc<RefCell<HashSet<usize>>>,
|
||||
db_header: Rc<RefCell<DatabaseHeader>>,
|
||||
|
||||
flush_info: RefCell<FlushInfo>,
|
||||
syncing: Rc<RefCell<bool>>,
|
||||
}
|
||||
|
||||
impl Pager {
|
||||
@@ -284,7 +313,7 @@ impl Pager {
|
||||
pub fn finish_open(
|
||||
db_header_ref: Rc<RefCell<DatabaseHeader>>,
|
||||
page_io: Rc<dyn DatabaseStorage>,
|
||||
wal: Rc<dyn Wal>,
|
||||
wal: Rc<RefCell<dyn Wal>>,
|
||||
io: Arc<dyn crate::io::IO>,
|
||||
) -> Result<Self> {
|
||||
let db_header = RefCell::borrow(&db_header_ref);
|
||||
@@ -299,19 +328,30 @@ impl Pager {
|
||||
io,
|
||||
dirty_pages: Rc::new(RefCell::new(HashSet::new())),
|
||||
db_header: db_header_ref.clone(),
|
||||
flush_info: RefCell::new(FlushInfo {
|
||||
state: FlushState::Start,
|
||||
in_flight_writes: Rc::new(RefCell::new(0)),
|
||||
}),
|
||||
syncing: Rc::new(RefCell::new(false)),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn begin_read_tx(&self) -> Result<()> {
|
||||
self.wal.begin_read_tx()?;
|
||||
self.wal.borrow().begin_read_tx()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn end_read_tx(&self) -> Result<()> {
|
||||
self.wal.end_read_tx()?;
|
||||
pub fn begin_write_tx(&self) -> Result<()> {
|
||||
self.wal.borrow().begin_read_tx()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn end_tx(&self) -> Result<CheckpointStatus> {
|
||||
self.cacheflush()?;
|
||||
self.wal.borrow().end_read_tx()?;
|
||||
Ok(CheckpointStatus::Done)
|
||||
}
|
||||
|
||||
/// Reads a page from the database.
|
||||
pub fn read_page(&self, page_idx: usize) -> crate::Result<Rc<RefCell<Page>>> {
|
||||
trace!("read_page(page_idx = {})", page_idx);
|
||||
@@ -321,8 +361,10 @@ impl Pager {
|
||||
}
|
||||
let page = Rc::new(RefCell::new(Page::new(page_idx)));
|
||||
RefCell::borrow(&page).set_locked();
|
||||
if let Some(frame_id) = self.wal.find_frame(page_idx as u64)? {
|
||||
self.wal.read_frame(frame_id, page.clone())?;
|
||||
if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? {
|
||||
self.wal
|
||||
.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
let page = page.borrow_mut();
|
||||
page.set_uptodate();
|
||||
@@ -356,19 +398,91 @@ impl Pager {
|
||||
dirty_pages.insert(page_id);
|
||||
}
|
||||
|
||||
pub fn cacheflush(&self) -> Result<()> {
|
||||
let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
|
||||
if dirty_pages.len() == 0 {
|
||||
return Ok(());
|
||||
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
|
||||
loop {
|
||||
let state = self.flush_info.borrow().state.clone();
|
||||
match state {
|
||||
FlushState::Start => {
|
||||
let db_size = self.db_header.borrow().database_size;
|
||||
for page_id in self.dirty_pages.borrow().iter() {
|
||||
let mut cache = self.page_cache.borrow_mut();
|
||||
let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
self.wal.borrow_mut().append_frame(
|
||||
page.clone(),
|
||||
db_size,
|
||||
self,
|
||||
self.flush_info.borrow().in_flight_writes.clone(),
|
||||
)?;
|
||||
}
|
||||
self.dirty_pages.borrow_mut().clear();
|
||||
self.flush_info.borrow_mut().state = FlushState::SyncWal;
|
||||
return Ok(CheckpointStatus::IO);
|
||||
}
|
||||
FlushState::Checkpoint => {
|
||||
let in_flight = self.flush_info.borrow().in_flight_writes.clone();
|
||||
match self.wal.borrow_mut().checkpoint(self, in_flight)? {
|
||||
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
|
||||
CheckpointStatus::Done => {
|
||||
self.flush_info.borrow_mut().state = FlushState::CheckpointDone;
|
||||
}
|
||||
};
|
||||
}
|
||||
FlushState::CheckpointDone => {
|
||||
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
|
||||
if in_flight == 0 {
|
||||
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
|
||||
} else {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
}
|
||||
}
|
||||
FlushState::SyncWal => {
|
||||
match self.wal.borrow_mut().sync() {
|
||||
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
|
||||
Ok(CheckpointStatus::Done) => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
|
||||
let should_checkpoint = self.wal.borrow().should_checkpoint();
|
||||
if should_checkpoint {
|
||||
self.flush_info.borrow_mut().state = FlushState::Checkpoint;
|
||||
} else {
|
||||
self.flush_info.borrow_mut().state = FlushState::Start;
|
||||
break;
|
||||
}
|
||||
}
|
||||
FlushState::SyncDbFile => {
|
||||
sqlite3_ondisk::begin_sync(self.page_io.clone(), self.syncing.clone())?;
|
||||
self.flush_info.borrow_mut().state = FlushState::WaitSyncDbFile;
|
||||
}
|
||||
FlushState::WaitSyncDbFile => {
|
||||
if *self.syncing.borrow() {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
} else {
|
||||
self.flush_info.borrow_mut().state = FlushState::Start;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for page_id in dirty_pages.iter() {
|
||||
let mut cache = self.page_cache.borrow_mut();
|
||||
let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
sqlite3_ondisk::begin_write_btree_page(self, &page)?;
|
||||
Ok(CheckpointStatus::Done)
|
||||
}
|
||||
|
||||
// WARN: used for testing purposes
|
||||
pub fn clear_page_cache(&self) {
|
||||
loop {
|
||||
match self
|
||||
.wal
|
||||
.borrow_mut()
|
||||
.checkpoint(self, Rc::new(RefCell::new(0)))
|
||||
{
|
||||
Ok(CheckpointStatus::IO) => {}
|
||||
Ok(CheckpointStatus::Done) => {
|
||||
break;
|
||||
}
|
||||
Err(err) => panic!("error while clearing cache {}", err),
|
||||
}
|
||||
}
|
||||
dirty_pages.clear();
|
||||
self.io.run_once()?;
|
||||
Ok(())
|
||||
self.page_cache.borrow_mut().clear();
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -382,14 +496,23 @@ impl Pager {
|
||||
header.database_size += 1;
|
||||
{
|
||||
// update database size
|
||||
let first_page_ref = self.read_page(1).unwrap();
|
||||
let first_page = RefCell::borrow_mut(&first_page_ref);
|
||||
first_page.set_dirty();
|
||||
self.add_dirty(1);
|
||||
// read sync for now
|
||||
loop {
|
||||
let first_page_ref = self.read_page(1)?;
|
||||
let first_page = RefCell::borrow_mut(&first_page_ref);
|
||||
if first_page.is_locked() {
|
||||
drop(first_page);
|
||||
self.io.run_once()?;
|
||||
continue;
|
||||
}
|
||||
first_page.set_dirty();
|
||||
self.add_dirty(1);
|
||||
|
||||
let contents = first_page.contents.write().unwrap();
|
||||
let contents = contents.as_ref().unwrap();
|
||||
contents.write_database_header(&header);
|
||||
let contents = first_page.contents.write().unwrap();
|
||||
let contents = contents.as_ref().unwrap();
|
||||
contents.write_database_header(&header);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let page_ref = Rc::new(RefCell::new(Page::new(0)));
|
||||
|
||||
@@ -42,7 +42,7 @@
|
||||
//! https://www.sqlite.org/fileformat.html
|
||||
|
||||
use crate::error::LimboError;
|
||||
use crate::io::{Buffer, Completion, ReadCompletion, WriteCompletion};
|
||||
use crate::io::{Buffer, Completion, ReadCompletion, SyncCompletion, WriteCompletion};
|
||||
use crate::storage::buffer_pool::BufferPool;
|
||||
use crate::storage::database::DatabaseStorage;
|
||||
use crate::storage::pager::{Page, Pager};
|
||||
@@ -87,16 +87,19 @@ pub struct DatabaseHeader {
|
||||
pub version_number: u32,
|
||||
}
|
||||
|
||||
pub const WAL_HEADER_SIZE: usize = 32;
|
||||
pub const WAL_FRAME_HEADER_SIZE: usize = 24;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct WalHeader {
|
||||
magic: [u8; 4],
|
||||
file_format: u32,
|
||||
page_size: u32,
|
||||
checkpoint_seq: u32,
|
||||
salt_1: u32,
|
||||
salt_2: u32,
|
||||
checksum_1: u32,
|
||||
checksum_2: u32,
|
||||
pub magic: [u8; 4],
|
||||
pub file_format: u32,
|
||||
pub page_size: u32,
|
||||
pub checkpoint_seq: u32,
|
||||
pub salt_1: u32,
|
||||
pub salt_2: u32,
|
||||
pub checksum_1: u32,
|
||||
pub checksum_2: u32,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
@@ -525,7 +528,11 @@ fn finish_read_page(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn begin_write_btree_page(pager: &Pager, page: &Rc<RefCell<Page>>) -> Result<()> {
|
||||
pub fn begin_write_btree_page(
|
||||
pager: &Pager,
|
||||
page: &Rc<RefCell<Page>>,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<()> {
|
||||
let page_source = &pager.page_io;
|
||||
let page_finish = page.clone();
|
||||
|
||||
@@ -537,11 +544,13 @@ pub fn begin_write_btree_page(pager: &Pager, page: &Rc<RefCell<Page>>) -> Result
|
||||
contents.buffer.clone()
|
||||
};
|
||||
|
||||
*write_counter.borrow_mut() += 1;
|
||||
let write_complete = {
|
||||
let buf_copy = buffer.clone();
|
||||
Box::new(move |bytes_written: i32| {
|
||||
let buf_copy = buf_copy.clone();
|
||||
let buf_len = buf_copy.borrow().len();
|
||||
*write_counter.borrow_mut() -= 1;
|
||||
|
||||
page_finish.borrow_mut().clear_dirty();
|
||||
if bytes_written < buf_len as i32 {
|
||||
@@ -554,6 +563,18 @@ pub fn begin_write_btree_page(pager: &Pager, page: &Rc<RefCell<Page>>) -> Result
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn begin_sync(page_io: Rc<dyn DatabaseStorage>, syncing: Rc<RefCell<bool>>) -> Result<()> {
|
||||
assert!(!*syncing.borrow());
|
||||
*syncing.borrow_mut() = true;
|
||||
let completion = Completion::Sync(SyncCompletion {
|
||||
complete: Box::new(move |_| {
|
||||
*syncing.borrow_mut() = false;
|
||||
}),
|
||||
});
|
||||
page_io.sync(Rc::new(completion))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum BTreeCell {
|
||||
@@ -935,9 +956,9 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
|
||||
payload.extend_from_slice(&varint);
|
||||
}
|
||||
|
||||
pub fn begin_read_wal_header(io: Rc<dyn File>) -> Result<Rc<RefCell<WalHeader>>> {
|
||||
pub fn begin_read_wal_header(io: &Rc<dyn File>) -> Result<Rc<RefCell<WalHeader>>> {
|
||||
let drop_fn = Rc::new(|_buf| {});
|
||||
let buf = Rc::new(RefCell::new(Buffer::allocate(32, drop_fn)));
|
||||
let buf = Rc::new(RefCell::new(Buffer::allocate(WAL_HEADER_SIZE, drop_fn)));
|
||||
let result = Rc::new(RefCell::new(WalHeader::default()));
|
||||
let header = result.clone();
|
||||
let complete = Box::new(move |buf: Rc<RefCell<Buffer>>| {
|
||||
@@ -964,26 +985,122 @@ fn finish_read_wal_header(buf: Rc<RefCell<Buffer>>, header: Rc<RefCell<WalHeader
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn begin_read_wal_frame_header(
|
||||
io: &dyn File,
|
||||
pub fn begin_read_wal_frame(
|
||||
io: &Rc<dyn File>,
|
||||
offset: usize,
|
||||
) -> Result<Rc<RefCell<WalFrameHeader>>> {
|
||||
let drop_fn = Rc::new(|_buf| {});
|
||||
let buf = Rc::new(RefCell::new(Buffer::allocate(32, drop_fn)));
|
||||
let result = Rc::new(RefCell::new(WalFrameHeader::default()));
|
||||
let frame = result.clone();
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
page: Rc<RefCell<Page>>,
|
||||
) -> Result<()> {
|
||||
let buf = buffer_pool.get();
|
||||
let drop_fn = Rc::new(move |buf| {
|
||||
let buffer_pool = buffer_pool.clone();
|
||||
buffer_pool.put(buf);
|
||||
});
|
||||
let buf = Rc::new(RefCell::new(Buffer::new(buf, drop_fn)));
|
||||
let frame = page.clone();
|
||||
let complete = Box::new(move |buf: Rc<RefCell<Buffer>>| {
|
||||
let frame = frame.clone();
|
||||
finish_read_wal_frame_header(buf, frame).unwrap();
|
||||
finish_read_page(2, buf, frame).unwrap();
|
||||
});
|
||||
let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete)));
|
||||
io.pread(offset, c)?;
|
||||
Ok(result)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn finish_read_wal_frame_header(
|
||||
pub fn begin_write_wal_frame(
|
||||
io: &Rc<dyn File>,
|
||||
offset: usize,
|
||||
page: &Rc<RefCell<Page>>,
|
||||
db_size: u32,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<()> {
|
||||
let page_finish = page.clone();
|
||||
let page_id = page.borrow().id;
|
||||
|
||||
let header = WalFrameHeader {
|
||||
page_number: page_id as u32,
|
||||
db_size,
|
||||
salt_1: 0,
|
||||
salt_2: 0,
|
||||
checksum_1: 0,
|
||||
checksum_2: 0,
|
||||
};
|
||||
let buffer = {
|
||||
let page = page.borrow();
|
||||
let contents = page.contents.read().unwrap();
|
||||
let drop_fn = Rc::new(|_buf| {});
|
||||
let contents = contents.as_ref().unwrap();
|
||||
|
||||
let mut buffer = Buffer::allocate(
|
||||
contents.buffer.borrow().len() + WAL_FRAME_HEADER_SIZE,
|
||||
drop_fn,
|
||||
);
|
||||
let buf = buffer.as_mut_slice();
|
||||
|
||||
buf[0..4].copy_from_slice(&header.page_number.to_ne_bytes());
|
||||
buf[4..8].copy_from_slice(&header.db_size.to_ne_bytes());
|
||||
buf[8..12].copy_from_slice(&header.salt_1.to_ne_bytes());
|
||||
buf[12..16].copy_from_slice(&header.salt_2.to_ne_bytes());
|
||||
buf[16..20].copy_from_slice(&header.checksum_1.to_ne_bytes());
|
||||
buf[20..24].copy_from_slice(&header.checksum_2.to_ne_bytes());
|
||||
buf[WAL_FRAME_HEADER_SIZE..].copy_from_slice(&contents.as_ptr());
|
||||
|
||||
Rc::new(RefCell::new(buffer))
|
||||
};
|
||||
|
||||
*write_counter.borrow_mut() += 1;
|
||||
let write_complete = {
|
||||
let buf_copy = buffer.clone();
|
||||
Box::new(move |bytes_written: i32| {
|
||||
let buf_copy = buf_copy.clone();
|
||||
let buf_len = buf_copy.borrow().len();
|
||||
*write_counter.borrow_mut() -= 1;
|
||||
|
||||
page_finish.borrow_mut().clear_dirty();
|
||||
if bytes_written < buf_len as i32 {
|
||||
log::error!("wrote({bytes_written}) less than expected({buf_len})");
|
||||
}
|
||||
})
|
||||
};
|
||||
let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
|
||||
io.pwrite(offset, buffer.clone(), c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn begin_write_wal_header(io: &Rc<dyn File>, header: &WalHeader) -> Result<()> {
|
||||
let buffer = {
|
||||
let drop_fn = Rc::new(|_buf| {});
|
||||
|
||||
let mut buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn);
|
||||
let buf = buffer.as_mut_slice();
|
||||
|
||||
buf[0..4].copy_from_slice(&header.magic);
|
||||
buf[4..8].copy_from_slice(&header.file_format.to_be_bytes());
|
||||
buf[8..12].copy_from_slice(&header.page_size.to_be_bytes());
|
||||
buf[12..16].copy_from_slice(&header.checkpoint_seq.to_be_bytes());
|
||||
buf[16..20].copy_from_slice(&header.salt_1.to_be_bytes());
|
||||
buf[20..24].copy_from_slice(&header.salt_2.to_be_bytes());
|
||||
buf[24..28].copy_from_slice(&header.checksum_1.to_be_bytes());
|
||||
buf[28..32].copy_from_slice(&header.checksum_2.to_be_bytes());
|
||||
|
||||
Rc::new(RefCell::new(buffer))
|
||||
};
|
||||
|
||||
let write_complete = {
|
||||
Box::new(move |bytes_written: i32| {
|
||||
if bytes_written < WAL_HEADER_SIZE as i32 {
|
||||
log::error!(
|
||||
"wal header wrote({bytes_written}) less than expected({WAL_HEADER_SIZE})"
|
||||
);
|
||||
}
|
||||
})
|
||||
};
|
||||
let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
|
||||
io.pwrite(0, buffer.clone(), c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn finish_read_wal_frame(
|
||||
buf: Rc<RefCell<Buffer>>,
|
||||
frame: Rc<RefCell<WalFrameHeader>>,
|
||||
) -> Result<()> {
|
||||
|
||||
@@ -1,23 +1,58 @@
use std::collections::{HashMap, HashSet};
use std::{cell::RefCell, rc::Rc, sync::Arc};

use crate::io::{File, IO};
use crate::io::{File, SyncCompletion, IO};
use crate::storage::sqlite3_ondisk::{
    begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
};
use crate::Completion;
use crate::{storage::pager::Page, Result};

use super::sqlite3_ondisk;
use super::buffer_pool::BufferPool;
use super::pager::Pager;
use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader};

/// Write-ahead log (WAL).
pub trait Wal {
    /// Begin a write transaction.
    /// Begin a read transaction.
    fn begin_read_tx(&self) -> Result<()>;

    /// End a write transaction.
    /// Begin a write transaction.
    fn begin_write_tx(&self) -> Result<()>;

    /// End a read transaction.
    fn end_read_tx(&self) -> Result<()>;

    /// End a write transaction.
    fn end_write_tx(&self) -> Result<()>;

    /// Find the latest frame containing a page.
    fn find_frame(&self, page_id: u64) -> Result<Option<u64>>;

    /// Read a frame from the WAL.
    fn read_frame(&self, frame_id: u64, page: Rc<RefCell<Page>>) -> Result<()>;
    fn read_frame(
        &self,
        frame_id: u64,
        page: Rc<RefCell<Page>>,
        buffer_pool: Rc<BufferPool>,
    ) -> Result<()>;

    /// Write a frame to the WAL.
    fn append_frame(
        &mut self,
        page: Rc<RefCell<Page>>,
        db_size: u32,
        pager: &Pager,
        write_counter: Rc<RefCell<usize>>,
    ) -> Result<()>;

    fn should_checkpoint(&self) -> bool;
    fn checkpoint(
        &mut self,
        pager: &Pager,
        write_counter: Rc<RefCell<usize>>,
    ) -> Result<CheckpointStatus>;
    fn sync(&mut self) -> Result<CheckpointStatus>;
}

#[cfg(feature = "fs")]
@@ -26,53 +61,231 @@ pub struct WalFile {
    wal_path: String,
    file: RefCell<Option<Rc<dyn File>>>,
    wal_header: RefCell<Option<Rc<RefCell<sqlite3_ondisk::WalHeader>>>>,
    min_frame: RefCell<u64>,
    max_frame: RefCell<u64>,
    nbackfills: RefCell<u64>,
    // Maps pgno to frame id and offset in wal file
    frame_cache: RefCell<HashMap<u64, Vec<u64>>>, // FIXME: for now let's use a simple hashmap instead of a shm file
    checkpoint_threshold: usize,
    ongoing_checkpoint: HashSet<usize>,

    syncing: Rc<RefCell<bool>>,
    page_size: usize,
}

pub enum CheckpointStatus {
    Done,
    IO,
}
#[cfg(feature = "fs")]
|
||||
impl Wal for WalFile {
|
||||
/// Begin a write transaction.
|
||||
/// Begin a read transaction.
|
||||
fn begin_read_tx(&self) -> Result<()> {
|
||||
self.min_frame.replace(*self.nbackfills.borrow() + 1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// End a write transaction.
|
||||
/// End a read transaction.
|
||||
fn end_read_tx(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Find the latest frame containing a page.
|
||||
fn find_frame(&self, _page_id: u64) -> Result<Option<u64>> {
|
||||
fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
|
||||
let frame_cache = self.frame_cache.borrow();
|
||||
let frames = frame_cache.get(&page_id);
|
||||
if frames.is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
self.ensure_init()?;
|
||||
let frames = frames.unwrap();
|
||||
for frame in frames.iter().rev() {
|
||||
if *frame <= *self.max_frame.borrow() {
|
||||
return Ok(Some(*frame));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Read a frame from the WAL.
|
||||
fn read_frame(&self, _frame_id: u64, _page: Rc<RefCell<Page>>) -> Result<()> {
|
||||
todo!();
|
||||
fn read_frame(
|
||||
&self,
|
||||
frame_id: u64,
|
||||
page: Rc<RefCell<Page>>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
) -> Result<()> {
|
||||
let offset = self.frame_offset(frame_id);
|
||||
begin_read_wal_frame(
|
||||
self.file.borrow().as_ref().unwrap(),
|
||||
offset,
|
||||
buffer_pool,
|
||||
page,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write a frame to the WAL.
|
||||
fn append_frame(
|
||||
&mut self,
|
||||
page: Rc<RefCell<Page>>,
|
||||
db_size: u32,
|
||||
_pager: &Pager,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<()> {
|
||||
self.ensure_init()?;
|
||||
let page_id = page.borrow().id;
|
||||
let frame_id = *self.max_frame.borrow();
|
||||
let offset = self.frame_offset(frame_id);
|
||||
begin_write_wal_frame(
|
||||
self.file.borrow().as_ref().unwrap(),
|
||||
offset,
|
||||
&page,
|
||||
db_size,
|
||||
write_counter,
|
||||
)?;
|
||||
self.max_frame.replace(frame_id + 1);
|
||||
{
|
||||
let mut frame_cache = self.frame_cache.borrow_mut();
|
||||
let frames = frame_cache.get_mut(&(page_id as u64));
|
||||
match frames {
|
||||
Some(frames) => frames.push(frame_id),
|
||||
None => {
|
||||
frame_cache.insert(page_id as u64, vec![frame_id]);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Begin a write transaction
|
||||
fn begin_write_tx(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// End a write transaction
|
||||
fn end_write_tx(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn should_checkpoint(&self) -> bool {
|
||||
let frame_id = *self.max_frame.borrow() as usize;
|
||||
frame_id >= self.checkpoint_threshold
|
||||
}
|
||||
|
||||
fn checkpoint(
|
||||
&mut self,
|
||||
pager: &Pager,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<CheckpointStatus> {
|
||||
for (page_id, _frames) in self.frame_cache.borrow().iter() {
|
||||
// move page from WAL to database file
|
||||
// TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf
|
||||
let page_id = *page_id as usize;
|
||||
if self.ongoing_checkpoint.contains(&page_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let page = pager.read_page(page_id)?;
|
||||
if page.borrow().is_locked() {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
}
|
||||
|
||||
begin_write_btree_page(pager, &page, write_counter.clone())?;
|
||||
self.ongoing_checkpoint.insert(page_id);
|
||||
}
|
||||
|
||||
self.frame_cache.borrow_mut().clear();
|
||||
*self.max_frame.borrow_mut() = 0;
|
||||
self.ongoing_checkpoint.clear();
|
||||
Ok(CheckpointStatus::Done)
|
||||
}
|
||||
|
||||
fn sync(&mut self) -> Result<CheckpointStatus> {
|
||||
self.ensure_init()?;
|
||||
let file = self.file.borrow();
|
||||
let file = file.as_ref().unwrap();
|
||||
{
|
||||
let syncing = self.syncing.clone();
|
||||
let completion = Completion::Sync(SyncCompletion {
|
||||
complete: Box::new(move |_| {
|
||||
*syncing.borrow_mut() = false;
|
||||
}),
|
||||
});
|
||||
file.sync(Rc::new(completion))?;
|
||||
}
|
||||
|
||||
if *self.syncing.borrow() {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
} else {
|
||||
return Ok(CheckpointStatus::Done);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "fs")]
|
||||
impl WalFile {
|
||||
pub fn new(io: Arc<dyn IO>, wal_path: String) -> Self {
|
||||
pub fn new(io: Arc<dyn IO>, wal_path: String, page_size: usize) -> Self {
|
||||
Self {
|
||||
io,
|
||||
wal_path,
|
||||
file: RefCell::new(None),
|
||||
wal_header: RefCell::new(None),
|
||||
frame_cache: RefCell::new(HashMap::new()),
|
||||
min_frame: RefCell::new(0),
|
||||
max_frame: RefCell::new(0),
|
||||
nbackfills: RefCell::new(0),
|
||||
checkpoint_threshold: 1000,
|
||||
ongoing_checkpoint: HashSet::new(),
|
||||
syncing: Rc::new(RefCell::new(false)),
|
||||
page_size,
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_init(&self) -> Result<()> {
|
||||
if self.file.borrow().is_none() {
|
||||
if let Ok(file) = self.io.open_file(&self.wal_path) {
|
||||
*self.file.borrow_mut() = Some(file.clone());
|
||||
let wal_header = sqlite3_ondisk::begin_read_wal_header(file)?;
|
||||
// TODO: Return a completion instead.
|
||||
self.io.run_once()?;
|
||||
self.wal_header.replace(Some(wal_header));
|
||||
}
|
||||
match self
|
||||
.io
|
||||
.open_file(&self.wal_path, crate::io::OpenFlags::Create)
|
||||
{
|
||||
Ok(file) => {
|
||||
if file.size()? > 0 {
|
||||
let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) {
|
||||
Ok(header) => header,
|
||||
Err(err) => panic!("Couldn't read header page: {:?}", err),
|
||||
};
|
||||
// TODO: Return a completion instead.
|
||||
self.io.run_once()?;
|
||||
self.wal_header.replace(Some(wal_header));
|
||||
} else {
|
||||
let wal_header = WalHeader {
|
||||
magic: (0x377f0682_u32).to_be_bytes(),
|
||||
file_format: 3007000,
|
||||
page_size: self.page_size as u32,
|
||||
checkpoint_seq: 0, // TODO implement sequence number
|
||||
salt_1: 0, // TODO implement salt
|
||||
salt_2: 0,
|
||||
checksum_1: 0,
|
||||
checksum_2: 0, // TODO implement checksum header
|
||||
};
|
||||
sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
|
||||
self.wal_header
|
||||
.replace(Some(Rc::new(RefCell::new(wal_header))));
|
||||
}
|
||||
*self.file.borrow_mut() = Some(file);
|
||||
}
|
||||
Err(err) => panic!("{:?} {}", err, &self.wal_path),
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
    fn frame_offset(&self, frame_id: u64) -> usize {
        let header = self.wal_header.borrow();
        let header = header.as_ref().unwrap().borrow();
        let page_size = header.page_size;
        let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64);
        let offset = WAL_HEADER_SIZE as u64 + WAL_FRAME_HEADER_SIZE as u64 + page_offset;
        offset as usize
    }
}
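A quick worked check of the offset arithmetic above, assuming a 4096-byte page size: each frame occupies 24 + 4096 bytes, placed after the 32-byte WAL header plus one 24-byte frame header. The free function below is a standalone restatement of frame_offset with the constants inlined, not part of the commit.

fn frame_offset(frame_id: u64, page_size: u64) -> usize {
    let page_offset = frame_id * (page_size + 24); // WAL_FRAME_HEADER_SIZE = 24
    (32 + 24 + page_offset) as usize               // WAL_HEADER_SIZE = 32
}

fn main() {
    assert_eq!(frame_offset(0, 4096), 56);
    assert_eq!(frame_offset(1, 4096), 56 + 4096 + 24); // = 4176
}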
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::rc::Rc;
|
||||
use std::rc::{Rc, Weak};
|
||||
use std::usize;
|
||||
|
||||
use sqlite3_parser::ast;
|
||||
|
||||
@@ -11,7 +12,7 @@ use crate::translate::plan::Search;
|
||||
use crate::types::{OwnedRecord, OwnedValue};
|
||||
use crate::vdbe::builder::ProgramBuilder;
|
||||
use crate::vdbe::{BranchOffset, Insn, Program};
|
||||
use crate::Result;
|
||||
use crate::{Connection, Result};
|
||||
|
||||
use super::expr::{
|
||||
translate_aggregation, translate_condition_expr, translate_expr, translate_table_columns,
|
||||
@@ -1683,7 +1684,7 @@ fn epilogue(
|
||||
});
|
||||
|
||||
program.resolve_label(init_label, program.offset());
|
||||
program.emit_insn(Insn::Transaction);
|
||||
program.emit_insn(Insn::Transaction { write: false });
|
||||
|
||||
program.emit_constant_insns();
|
||||
program.emit_insn(Insn::Goto {
|
||||
@@ -1699,6 +1700,7 @@ pub fn emit_program(
|
||||
database_header: Rc<RefCell<DatabaseHeader>>,
|
||||
mut plan: Plan,
|
||||
cache: ExpressionResultCache,
|
||||
connection: Weak<Connection>,
|
||||
) -> Result<Program> {
|
||||
let (mut program, mut metadata, init_label, start_offset) = prologue(cache)?;
|
||||
loop {
|
||||
@@ -1717,7 +1719,7 @@ pub fn emit_program(
|
||||
}
|
||||
OpStepResult::Done => {
|
||||
epilogue(&mut program, &mut metadata, init_label, start_offset)?;
|
||||
return Ok(program.build(database_header));
|
||||
return Ok(program.build(database_header, connection));
|
||||
}
|
||||
}
|
||||
}

@@ -1,3 +1,4 @@
use std::rc::Weak;
use std::{cell::RefCell, ops::Deref, rc::Rc};

use sqlite3_parser::ast::{
@@ -5,13 +6,13 @@ use sqlite3_parser::ast::{
};

use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::Result;
use crate::{
schema::{Schema, Table},
storage::sqlite3_ondisk::DatabaseHeader,
translate::expr::translate_expr,
vdbe::{builder::ProgramBuilder, Insn, Program},
};
use crate::{Connection, Result};

#[allow(clippy::too_many_arguments)]
pub fn translate_insert(
@@ -23,6 +24,7 @@ pub fn translate_insert(
body: &InsertBody,
_returning: &Option<Vec<ResultColumn>>,
database_header: Rc<RefCell<DatabaseHeader>>,
connection: Weak<Connection>,
) -> Result<Program> {
assert!(with.is_none());
assert!(or_conflict.is_none());
@@ -203,11 +205,11 @@ pub fn translate_insert(
description: String::new(),
});
program.resolve_label(init_label, program.offset());
program.emit_insn(Insn::Transaction);
program.emit_insn(Insn::Transaction { write: true });
program.emit_constant_insns();
program.emit_insn(Insn::Goto {
target_pc: start_offset,
});
program.resolve_deferred_labels();
Ok(program.build(database_header))
Ok(program.build(database_header, connection))
}

@@ -16,13 +16,13 @@ pub(crate) mod planner;
pub(crate) mod select;

use std::cell::RefCell;
use std::rc::Rc;
use std::rc::{Rc, Weak};

use crate::schema::Schema;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE};
use crate::vdbe::{builder::ProgramBuilder, Insn, Program};
use crate::{bail_parse_error, Result};
use crate::{bail_parse_error, Connection, Result};
use insert::translate_insert;
use select::translate_select;
use sqlite3_parser::ast;
@@ -33,6 +33,7 @@ pub fn translate(
stmt: ast::Stmt,
database_header: Rc<RefCell<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Weak<Connection>,
) -> Result<Program> {
match stmt {
ast::Stmt::AlterTable(_, _) => bail_parse_error!("ALTER TABLE not supported yet"),
@@ -53,12 +54,14 @@ pub fn translate(
ast::Stmt::DropTable { .. } => bail_parse_error!("DROP TABLE not supported yet"),
ast::Stmt::DropTrigger { .. } => bail_parse_error!("DROP TRIGGER not supported yet"),
ast::Stmt::DropView { .. } => bail_parse_error!("DROP VIEW not supported yet"),
ast::Stmt::Pragma(name, body) => translate_pragma(&name, body, database_header, pager),
ast::Stmt::Pragma(name, body) => {
translate_pragma(&name, body, database_header, pager, connection)
}
ast::Stmt::Reindex { .. } => bail_parse_error!("REINDEX not supported yet"),
ast::Stmt::Release(_) => bail_parse_error!("RELEASE not supported yet"),
ast::Stmt::Rollback { .. } => bail_parse_error!("ROLLBACK not supported yet"),
ast::Stmt::Savepoint(_) => bail_parse_error!("SAVEPOINT not supported yet"),
ast::Stmt::Select(select) => translate_select(schema, select, database_header),
ast::Stmt::Select(select) => translate_select(schema, select, database_header, connection),
ast::Stmt::Update { .. } => bail_parse_error!("UPDATE not supported yet"),
ast::Stmt::Vacuum(_, _) => bail_parse_error!("VACUUM not supported yet"),
ast::Stmt::Insert {
@@ -77,6 +80,7 @@ pub fn translate(
&body,
&returning,
database_header,
connection,
),
}
}
@@ -86,6 +90,7 @@ fn translate_pragma(
body: Option<ast::PragmaBody>,
database_header: Rc<RefCell<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Weak<Connection>,
) -> Result<Program> {
let mut program = ProgramBuilder::new();
let init_label = program.allocate_label();
@@ -96,6 +101,7 @@ fn translate_pragma(
init_label,
);
let start_offset = program.offset();
let mut write = false;
match body {
None => {
let pragma_result = program.alloc_register();
@@ -124,6 +130,7 @@ fn translate_pragma(
},
_ => 0,
};
write = true;
update_pragma(
&name.name.0,
value_to_update,
@@ -140,13 +147,13 @@ fn translate_pragma(
description: String::new(),
});
program.resolve_label(init_label, program.offset());
program.emit_insn(Insn::Transaction);
program.emit_insn(Insn::Transaction { write });
program.emit_constant_insns();
program.emit_insn(Insn::Goto {
target_pc: start_offset,
});
program.resolve_deferred_labels();
Ok(program.build(database_header))
Ok(program.build(database_header, connection))
}

fn update_pragma(name: &str, value: i64, header: Rc<RefCell<DatabaseHeader>>, pager: Rc<Pager>) {
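
Across these translator hunks the Transaction opcode gains a write flag: the select emitter passes write: false, the insert path passes write: true, and translate_pragma starts with write = false and flips it only when the pragma actually updates a value. A minimal sketch of that decision, using a made-up StatementKind enum rather than the real AST types:

// Sketch only: StatementKind is a hypothetical stand-in for the parser's AST.
enum StatementKind {
    Select,
    Insert,
    Pragma { updates_value: bool },
}

// Mirrors how the diff chooses the `write` flag for Insn::Transaction.
fn transaction_is_write(stmt: &StatementKind) -> bool {
    match stmt {
        StatementKind::Select => false,
        StatementKind::Insert => true,
        StatementKind::Pragma { updates_value } => *updates_value,
    }
}

fn main() {
    assert!(!transaction_is_write(&StatementKind::Select));
    assert!(transaction_is_write(&StatementKind::Insert));
    assert!(transaction_is_write(&StatementKind::Pragma { updates_value: true }));
}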

@@ -1,6 +1,8 @@
use std::rc::Weak;
use std::{cell::RefCell, rc::Rc};

use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::Connection;
use crate::{schema::Schema, vdbe::Program, Result};
use sqlite3_parser::ast;

@@ -12,8 +14,14 @@ pub fn translate_select(
schema: &Schema,
select: ast::Select,
database_header: Rc<RefCell<DatabaseHeader>>,
connection: Weak<Connection>,
) -> Result<Program> {
let select_plan = prepare_select_plan(schema, select)?;
let (optimized_plan, expr_result_cache) = optimize_plan(select_plan)?;
emit_program(database_header, optimized_plan, expr_result_cache)
emit_program(
database_header,
optimized_plan,
expr_result_cache,
connection,
)
}

@@ -1,6 +1,10 @@
use std::{cell::RefCell, collections::HashMap, rc::Rc};
use std::{
cell::RefCell,
collections::HashMap,
rc::{Rc, Weak},
};

use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::{storage::sqlite3_ondisk::DatabaseHeader, Connection};

use super::{BranchOffset, CursorID, Insn, InsnReference, Program, Table};

@@ -354,7 +358,11 @@ impl ProgramBuilder {
self.deferred_label_resolutions.clear();
}

pub fn build(self, database_header: Rc<RefCell<DatabaseHeader>>) -> Program {
pub fn build(
self,
database_header: Rc<RefCell<DatabaseHeader>>,
connection: Weak<Connection>,
) -> Program {
assert!(
self.deferred_label_resolutions.is_empty(),
"deferred_label_resolutions is not empty when build() is called, did you forget to call resolve_deferred_labels()?"
@@ -369,6 +377,8 @@ impl ProgramBuilder {
cursor_ref: self.cursor_ref,
database_header,
comments: self.comments,
connection,
auto_commit: true,
}
}
}
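
The builder now threads a Weak<Connection> into the finished Program. Using a Weak back-pointer avoids a reference cycle: a running program can reach its connection when it needs it, but does not keep the connection (or the database behind it) alive on its own. A minimal sketch of that pattern with placeholder types, not the real ones:

use std::rc::{Rc, Weak};

// Sketch only: `Connection` here is an empty placeholder.
struct Connection;

struct Program {
    connection: Weak<Connection>,
}

impl Program {
    fn with_connection(&self) {
        // Upgrade only for the duration of the call; if the connection has
        // already been dropped, do nothing.
        if let Some(conn) = self.connection.upgrade() {
            let _ = conn;
        }
    }
}

fn main() {
    let conn = Rc::new(Connection);
    let program = Program { connection: Rc::downgrade(&conn) };
    program.with_connection();
}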

@@ -395,10 +395,10 @@ pub fn insn_to_str(
0,
"".to_string(),
),
Insn::Transaction => (
Insn::Transaction { write } => (
"Transaction",
0,
0,
*write as i32,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,

@@ -33,7 +33,8 @@ use crate::storage::{btree::BTreeCursor, pager::Pager};
use crate::types::{
AggContext, Cursor, CursorResult, OwnedRecord, OwnedValue, Record, SeekKey, SeekOp,
};
use crate::{Result, DATABASE_VERSION};
use crate::DATABASE_VERSION;
use crate::{Connection, Result, TransactionState};

use datetime::{exec_date, exec_time, exec_unixepoch};

@@ -44,7 +45,7 @@ use std::borrow::BorrowMut;
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Display;
use std::rc::Rc;
use std::rc::{Rc, Weak};

pub type BranchOffset = i64;

@@ -240,7 +241,9 @@ pub enum Insn {
},

// Start a transaction.
Transaction,
Transaction {
write: bool,
},

// Branch to the given PC.
Goto {
@@ -529,6 +532,8 @@ pub struct Program {
pub cursor_ref: Vec<(Option<String>, Option<Table>)>,
pub database_header: Rc<RefCell<DatabaseHeader>>,
pub comments: HashMap<BranchOffset, &'static str>,
pub connection: Weak<Connection>,
pub auto_commit: bool,
}

impl Program {
@@ -1093,11 +1098,41 @@ impl Program {
)));
}
}
pager.end_read_tx()?;
return Ok(StepResult::Done);
if self.auto_commit {
return match pager.end_tx() {
Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO),
Ok(crate::storage::wal::CheckpointStatus::Done) => Ok(StepResult::Done),
Err(e) => Err(e),
};
} else {
return Ok(StepResult::Done);
}
}
Insn::Transaction => {
pager.begin_read_tx()?;
Insn::Transaction { write } => {
let connection = self.connection.upgrade().unwrap();
if let Some(db) = connection.db.upgrade() {
// TODO(pere): are backpointers good ?? this looks ugly af
// upgrade transaction if needed
let new_transaction_state =
match (db.transaction_state.borrow().clone(), write) {
(crate::TransactionState::Write, true) => TransactionState::Write,
(crate::TransactionState::Write, false) => TransactionState::Write,
(crate::TransactionState::Read, true) => TransactionState::Write,
(crate::TransactionState::Read, false) => TransactionState::Read,
(crate::TransactionState::None, true) => TransactionState::Read,
(crate::TransactionState::None, false) => TransactionState::Read,
};
// TODO(Pere):
// 1. lock wal
// 2. lock shared
// 3. lock write db if write
db.transaction_state.replace(new_transaction_state.clone());
if matches!(new_transaction_state, TransactionState::Write) {
pager.begin_read_tx()?;
} else {
pager.begin_write_tx()?;
}
}
state.pc += 1;
}
Insn::Goto { target_pc } => {
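
The (current state, wants write) table above is the core of the transaction upgrade. A minimal standalone sketch of the same table, assuming a free-standing TransactionState enum (the real one lives on the shared Database and is reached through the Weak<Connection> back-pointer):

// Sketch only: mirrors the (current state, write flag) -> new state table above.
#[derive(Clone, Copy, Debug, PartialEq)]
enum TransactionState {
    None,
    Read,
    Write,
}

fn upgrade(current: TransactionState, write: bool) -> TransactionState {
    match (current, write) {
        // An open write transaction stays a write transaction.
        (TransactionState::Write, _) => TransactionState::Write,
        // A read transaction is upgraded only when a write is requested.
        (TransactionState::Read, true) => TransactionState::Write,
        (TransactionState::Read, false) => TransactionState::Read,
        // With no transaction open, the table in the diff moves to Read in both cases.
        (TransactionState::None, _) => TransactionState::Read,
    }
}

fn main() {
    assert_eq!(upgrade(TransactionState::None, true), TransactionState::Read);
    assert_eq!(upgrade(TransactionState::Read, true), TransactionState::Write);
    assert_eq!(upgrade(TransactionState::Write, false), TransactionState::Write);
}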

@@ -1,4 +1,4 @@
use limbo_core::{Database, File, PlatformIO, Result, IO};
use limbo_core::{Database, File, OpenFlags, PlatformIO, Result, IO};
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use std::cell::RefCell;
@@ -14,9 +14,12 @@ fn main() {
println!("Seed: {}", seed);
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let io = Arc::new(SimulatorIO::new(seed).unwrap());
let db = match Database::open_file(io.clone(), "./testing/testing.db") {
let test_path = "./testing/testing.db";
let db = match Database::open_file(io.clone(), test_path) {
Ok(db) => db,
Err(_) => todo!(),
Err(e) => {
panic!("error opening database test file {}: {:?}", test_path, e);
}
};
for _ in 0..100 {
let conn = db.connect();
@@ -91,8 +94,8 @@ impl SimulatorIO {
}

impl IO for SimulatorIO {
fn open_file(&self, path: &str) -> Result<Rc<dyn limbo_core::File>> {
let inner = self.inner.open_file(path)?;
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn limbo_core::File>> {
let inner = self.inner.open_file(path, flags)?;
let file = Rc::new(SimulatorFile {
inner,
fault: RefCell::new(false),
@@ -187,6 +190,14 @@ impl limbo_core::File for SimulatorFile {
}
self.inner.pwrite(pos, buffer, c)
}

fn sync(&self, c: Rc<limbo_core::Completion>) -> Result<()> {
self.inner.sync(c)
}

fn size(&self) -> Result<u64> {
Ok(self.inner.size()?)
}
}

impl Drop for SimulatorFile {

@@ -5,6 +5,7 @@ use log::trace;
use std::cell::RefCell;
use std::ffi;

use std::rc::Rc;
use std::sync::Arc;

macro_rules! stub {
@@ -32,8 +33,8 @@ pub mod util;
use util::sqlite3_safety_check_sick_or_ok;

pub struct sqlite3 {
pub(crate) _db: limbo_core::Database,
pub(crate) conn: limbo_core::Connection,
pub(crate) _db: Rc<limbo_core::Database>,
pub(crate) conn: Rc<limbo_core::Connection>,
pub(crate) err_code: ffi::c_int,
pub(crate) err_mask: ffi::c_int,
pub(crate) malloc_failed: bool,
@@ -42,7 +43,7 @@ pub struct sqlite3 {
}

impl sqlite3 {
pub fn new(db: limbo_core::Database, conn: limbo_core::Connection) -> Self {
pub fn new(db: Rc<limbo_core::Database>, conn: Rc<limbo_core::Connection>) -> Self {
Self {
_db: db,
conn,

@@ -1,5 +1,6 @@
use limbo_core::Database;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use tempfile::TempDir;

@@ -16,6 +17,9 @@ impl TempDatabase {
path.push("test.db");
{
let connection = rusqlite::Connection::open(&path).unwrap();
connection
.pragma_update(None, "journal_mode", "wal")
.unwrap();
connection.execute(table_sql, ()).unwrap();
}
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new().unwrap());
@@ -23,7 +27,7 @@ impl TempDatabase {
Self { path, io }
}

pub fn connect_limbo(&self) -> limbo_core::Connection {
pub fn connect_limbo(&self) -> Rc<limbo_core::Connection> {
let db = Database::open_file(self.io.clone(), self.path.to_str().unwrap()).unwrap();

db.connect()
@@ -33,7 +37,8 @@ impl TempDatabase {
#[cfg(test)]
mod tests {
use super::*;
use limbo_core::{RowResult, Value};
use limbo_core::{CheckpointStatus, Connection, RowResult, Value};
use log::debug;

#[ignore]
#[test]
@@ -92,7 +97,7 @@ mod tests {
eprintln!("{}", err);
}
}
conn.cacheflush()?;
do_flush(&conn, &tmp_db)?;
}
Ok(())
}
@@ -129,7 +134,7 @@ mod tests {
};

// this flush helped to review hex of test.db
conn.cacheflush()?;
do_flush(&conn, &tmp_db)?;

match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
@@ -160,7 +165,7 @@ mod tests {
eprintln!("{}", err);
}
}
conn.cacheflush()?;
do_flush(&conn, &tmp_db)?;
Ok(())
}

@@ -234,7 +239,69 @@ mod tests {
eprintln!("{}", err);
}
}
conn.cacheflush()?;
do_flush(&conn, &tmp_db)?;
Ok(())
}

#[test]
#[ignore]
fn test_wal_checkpoint() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);");
// threshold is 1000 by default
let iterations = 1001_usize;
let conn = tmp_db.connect_limbo();

for i in 0..iterations {
let insert_query = format!("INSERT INTO test VALUES ({})", i);
do_flush(&conn, &tmp_db)?;
conn.clear_page_cache().unwrap();
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.next_row()? {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
}

do_flush(&conn, &tmp_db)?;
conn.clear_page_cache().unwrap();
let list_query = "SELECT * FROM test LIMIT 1";
let mut current_index = 0;
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.next_row()? {
RowResult::Row(row) => {
let first_value = &row.values[0];
let id = match first_value {
Value::Integer(i) => *i as i32,
Value::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_index, id as usize);
current_index += 1;
}
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Done => break,
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
do_flush(&conn, &tmp_db)?;
Ok(())
}

@@ -254,4 +321,18 @@ mod tests {
}
}
}

fn do_flush(conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
loop {
match conn.cacheflush()? {
CheckpointStatus::Done => {
break;
}
CheckpointStatus::IO => {
tmp_db.io.run_once()?;
}
}
}
Ok(())
}
}