mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
Merge 'Page cache by page_number and frame_number' from Pere Diaz Bou
Since page cache is now shared by default, we need to cache pages by page number and something else. I chose to go with max_frame of connection, because this connection will have a max_frame set until from the start of a transaction until the end of it. With key pairs of (pgno, max_frame) we make sure each connection is caching based on the snapshot it is at as two different connections might have the same pageno being using but a different frame. If both have same max_frame then they will share same page. Closes #468
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile, WalFileShared};
|
||||
use limbo_core::{
|
||||
maybe_init_database_file, BufferPool, OpenFlags, Pager, Result, WalFile, WalFileShared,
|
||||
};
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
@@ -26,17 +28,19 @@ impl Database {
|
||||
// ensure db header is there
|
||||
io.run_once().unwrap();
|
||||
|
||||
let page_size = db_header.borrow().page_size;
|
||||
|
||||
let wal_path = format!("{}-wal", path);
|
||||
let wal_shared =
|
||||
WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)
|
||||
.unwrap();
|
||||
let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size).unwrap();
|
||||
let buffer_pool = Rc::new(BufferPool::new(page_size as usize));
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
io.clone(),
|
||||
db_header.borrow().page_size as usize,
|
||||
wal_shared.clone(),
|
||||
buffer_pool.clone(),
|
||||
)));
|
||||
|
||||
let db = limbo_core::Database::open(io, page_io, wal, wal_shared).unwrap();
|
||||
let db = limbo_core::Database::open(io, page_io, wal, wal_shared, buffer_pool).unwrap();
|
||||
let conn = db.connect();
|
||||
Database { db, conn }
|
||||
}
|
||||
@@ -258,9 +262,6 @@ impl DatabaseStorage {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
struct BufferPool {}
|
||||
|
||||
impl limbo_core::DatabaseStorage for DatabaseStorage {
|
||||
fn read_page(&self, page_idx: usize, c: Rc<limbo_core::Completion>) -> Result<()> {
|
||||
let r = match &(*c) {
|
||||
|
||||
13
core/lib.rs
13
core/lib.rs
@@ -27,7 +27,8 @@ use std::{cell::RefCell, rc::Rc};
|
||||
use storage::btree::btree_init_page;
|
||||
#[cfg(feature = "fs")]
|
||||
use storage::database::FileStorage;
|
||||
use storage::pager::{allocate_page, DumbLruPageCache};
|
||||
use storage::page_cache::DumbLruPageCache;
|
||||
use storage::pager::allocate_page;
|
||||
use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE};
|
||||
pub use storage::wal::WalFile;
|
||||
pub use storage::wal::WalFileShared;
|
||||
@@ -82,14 +83,16 @@ impl Database {
|
||||
let wal_path = format!("{}-wal", path);
|
||||
let db_header = Pager::begin_open(page_io.clone())?;
|
||||
io.run_once()?;
|
||||
let wal_shared =
|
||||
WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)?;
|
||||
let page_size = db_header.borrow().page_size;
|
||||
let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size)?;
|
||||
let buffer_pool = Rc::new(BufferPool::new(page_size as usize));
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
io.clone(),
|
||||
db_header.borrow().page_size as usize,
|
||||
wal_shared.clone(),
|
||||
buffer_pool.clone(),
|
||||
)));
|
||||
Self::open(io, page_io, wal, wal_shared)
|
||||
Self::open(io, page_io, wal, wal_shared, buffer_pool)
|
||||
}
|
||||
|
||||
pub fn open(
|
||||
@@ -97,6 +100,7 @@ impl Database {
|
||||
page_io: Rc<dyn DatabaseStorage>,
|
||||
wal: Rc<RefCell<dyn Wal>>,
|
||||
shared_wal: Arc<RwLock<WalFileShared>>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
) -> Result<Arc<Database>> {
|
||||
let db_header = Pager::begin_open(page_io.clone())?;
|
||||
io.run_once()?;
|
||||
@@ -111,6 +115,7 @@ impl Database {
|
||||
wal,
|
||||
io.clone(),
|
||||
shared_page_cache.clone(),
|
||||
buffer_pool,
|
||||
)?);
|
||||
let bootstrap_schema = Rc::new(RefCell::new(Schema::new()));
|
||||
let conn = Rc::new(Connection {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
pub(crate) mod btree;
|
||||
pub(crate) mod buffer_pool;
|
||||
pub(crate) mod database;
|
||||
pub(crate) mod page_cache;
|
||||
pub(crate) mod pager;
|
||||
pub(crate) mod sqlite3_ondisk;
|
||||
pub(crate) mod wal;
|
||||
|
||||
191
core/storage/page_cache.rs
Normal file
191
core/storage/page_cache.rs
Normal file
@@ -0,0 +1,191 @@
|
||||
use std::{cell::RefCell, collections::HashMap, ptr::NonNull};
|
||||
|
||||
use log::debug;
|
||||
|
||||
use super::pager::PageRef;
|
||||
|
||||
// In limbo, page cache is shared by default, meaning that multiple frames from WAL can reside in
|
||||
// the cache, meaning, we need a way to differentiate between pages cached in different
|
||||
// connections. For this we include the max_frame that will read a connection from so that if two
|
||||
// connections have different max_frames, they might or not have different frame read from WAL.
|
||||
//
|
||||
// WAL was introduced after Shared cache in SQLite, so this is why these two features don't work
|
||||
// well together because pages with different snapshots may collide.
|
||||
#[derive(Debug, Eq, Hash, PartialEq, Clone)]
|
||||
pub struct PageCacheKey {
|
||||
pgno: usize,
|
||||
max_frame: Option<u64>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
struct PageCacheEntry {
|
||||
key: PageCacheKey,
|
||||
page: PageRef,
|
||||
prev: Option<NonNull<PageCacheEntry>>,
|
||||
next: Option<NonNull<PageCacheEntry>>,
|
||||
}
|
||||
|
||||
impl PageCacheEntry {
|
||||
fn as_non_null(&mut self) -> NonNull<PageCacheEntry> {
|
||||
NonNull::new(&mut *self).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DumbLruPageCache {
|
||||
capacity: usize,
|
||||
map: RefCell<HashMap<PageCacheKey, NonNull<PageCacheEntry>>>,
|
||||
head: RefCell<Option<NonNull<PageCacheEntry>>>,
|
||||
tail: RefCell<Option<NonNull<PageCacheEntry>>>,
|
||||
}
|
||||
unsafe impl Send for DumbLruPageCache {}
|
||||
unsafe impl Sync for DumbLruPageCache {}
|
||||
|
||||
impl PageCacheKey {
|
||||
pub fn new(pgno: usize, max_frame: Option<u64>) -> Self {
|
||||
Self { pgno, max_frame }
|
||||
}
|
||||
}
|
||||
impl DumbLruPageCache {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
Self {
|
||||
capacity,
|
||||
map: RefCell::new(HashMap::new()),
|
||||
head: RefCell::new(None),
|
||||
tail: RefCell::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains_key(&mut self, key: &PageCacheKey) -> bool {
|
||||
self.map.borrow().contains_key(key)
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: PageCacheKey, value: PageRef) {
|
||||
self._delete(key.clone(), false);
|
||||
debug!("cache_insert(key={:?})", key);
|
||||
let mut entry = Box::new(PageCacheEntry {
|
||||
key: key.clone(),
|
||||
next: None,
|
||||
prev: None,
|
||||
page: value,
|
||||
});
|
||||
self.touch(&mut entry);
|
||||
|
||||
if self.map.borrow().len() >= self.capacity {
|
||||
self.pop_if_not_dirty();
|
||||
}
|
||||
let b = Box::into_raw(entry);
|
||||
let as_non_null = NonNull::new(b).unwrap();
|
||||
self.map.borrow_mut().insert(key, as_non_null);
|
||||
}
|
||||
|
||||
pub fn delete(&mut self, key: PageCacheKey) {
|
||||
self._delete(key, true)
|
||||
}
|
||||
|
||||
pub fn _delete(&mut self, key: PageCacheKey, clean_page: bool) {
|
||||
debug!("cache_delete(key={:?}, clean={})", key, clean_page);
|
||||
let ptr = self.map.borrow_mut().remove(&key);
|
||||
if ptr.is_none() {
|
||||
return;
|
||||
}
|
||||
let mut ptr = ptr.unwrap();
|
||||
{
|
||||
let ptr = unsafe { ptr.as_mut() };
|
||||
self.detach(ptr, clean_page);
|
||||
}
|
||||
unsafe { std::ptr::drop_in_place(ptr.as_ptr()) };
|
||||
}
|
||||
|
||||
fn get_ptr(&mut self, key: &PageCacheKey) -> Option<NonNull<PageCacheEntry>> {
|
||||
let m = self.map.borrow_mut();
|
||||
let ptr = m.get(key);
|
||||
ptr.copied()
|
||||
}
|
||||
|
||||
pub fn get(&mut self, key: &PageCacheKey) -> Option<PageRef> {
|
||||
debug!("cache_get(key={:?})", key);
|
||||
let ptr = self.get_ptr(key);
|
||||
ptr?;
|
||||
let ptr = unsafe { ptr.unwrap().as_mut() };
|
||||
let page = ptr.page.clone();
|
||||
//self.detach(ptr);
|
||||
self.touch(ptr);
|
||||
Some(page)
|
||||
}
|
||||
|
||||
pub fn resize(&mut self, capacity: usize) {
|
||||
let _ = capacity;
|
||||
todo!();
|
||||
}
|
||||
|
||||
fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) {
|
||||
let mut current = entry.as_non_null();
|
||||
|
||||
if clean_page {
|
||||
// evict buffer
|
||||
let page = &entry.page;
|
||||
page.clear_loaded();
|
||||
debug!("cleaning up page {}", page.get().id);
|
||||
let _ = page.get().contents.take();
|
||||
}
|
||||
|
||||
let (next, prev) = unsafe {
|
||||
let c = current.as_mut();
|
||||
let next = c.next;
|
||||
let prev = c.prev;
|
||||
c.prev = None;
|
||||
c.next = None;
|
||||
(next, prev)
|
||||
};
|
||||
|
||||
// detach
|
||||
match (prev, next) {
|
||||
(None, None) => {}
|
||||
(None, Some(_)) => todo!(),
|
||||
(Some(p), None) => {
|
||||
self.tail = RefCell::new(Some(p));
|
||||
}
|
||||
(Some(mut p), Some(mut n)) => unsafe {
|
||||
let p_mut = p.as_mut();
|
||||
p_mut.next = Some(n);
|
||||
let n_mut = n.as_mut();
|
||||
n_mut.prev = Some(p);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
fn touch(&mut self, entry: &mut PageCacheEntry) {
|
||||
let mut current = entry.as_non_null();
|
||||
unsafe {
|
||||
let c = current.as_mut();
|
||||
c.next = *self.head.borrow();
|
||||
}
|
||||
|
||||
if let Some(mut head) = *self.head.borrow_mut() {
|
||||
unsafe {
|
||||
let head = head.as_mut();
|
||||
head.prev = Some(current);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_if_not_dirty(&mut self) {
|
||||
let tail = *self.tail.borrow();
|
||||
if tail.is_none() {
|
||||
return;
|
||||
}
|
||||
let tail = unsafe { tail.unwrap().as_mut() };
|
||||
if tail.page.is_dirty() {
|
||||
// TODO: drop from another clean entry?
|
||||
return;
|
||||
}
|
||||
self.detach(tail, true);
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
let to_remove: Vec<PageCacheKey> = self.map.borrow().iter().map(|v| v.0.clone()).collect();
|
||||
for key in to_remove {
|
||||
self.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,15 +4,13 @@ use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent};
|
||||
use crate::storage::wal::Wal;
|
||||
use crate::{Buffer, Result};
|
||||
use log::{debug, trace};
|
||||
use sieve_cache::SieveCache;
|
||||
use std::cell::{RefCell, UnsafeCell};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::hash::Hash;
|
||||
use std::ptr::{drop_in_place, NonNull};
|
||||
use std::collections::HashSet;
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use super::page_cache::{DumbLruPageCache, PageCacheKey};
|
||||
use super::wal::CheckpointStatus;
|
||||
|
||||
pub struct PageInner {
|
||||
@@ -117,198 +115,6 @@ impl Page {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
struct PageCacheEntry {
|
||||
key: usize,
|
||||
page: PageRef,
|
||||
prev: Option<NonNull<PageCacheEntry>>,
|
||||
next: Option<NonNull<PageCacheEntry>>,
|
||||
}
|
||||
|
||||
impl PageCacheEntry {
|
||||
fn as_non_null(&mut self) -> NonNull<PageCacheEntry> {
|
||||
NonNull::new(&mut *self).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DumbLruPageCache {
|
||||
capacity: usize,
|
||||
map: RefCell<HashMap<usize, NonNull<PageCacheEntry>>>,
|
||||
head: RefCell<Option<NonNull<PageCacheEntry>>>,
|
||||
tail: RefCell<Option<NonNull<PageCacheEntry>>>,
|
||||
}
|
||||
unsafe impl Send for DumbLruPageCache {}
|
||||
unsafe impl Sync for DumbLruPageCache {}
|
||||
|
||||
impl DumbLruPageCache {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
Self {
|
||||
capacity,
|
||||
map: RefCell::new(HashMap::new()),
|
||||
head: RefCell::new(None),
|
||||
tail: RefCell::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains_key(&mut self, key: usize) -> bool {
|
||||
self.map.borrow().contains_key(&key)
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: usize, value: PageRef) {
|
||||
self._delete(key, false);
|
||||
debug!("cache_insert(key={})", key);
|
||||
let mut entry = Box::new(PageCacheEntry {
|
||||
key,
|
||||
next: None,
|
||||
prev: None,
|
||||
page: value,
|
||||
});
|
||||
self.touch(&mut entry);
|
||||
|
||||
if self.map.borrow().len() >= self.capacity {
|
||||
self.pop_if_not_dirty();
|
||||
}
|
||||
let b = Box::into_raw(entry);
|
||||
let as_non_null = NonNull::new(b).unwrap();
|
||||
self.map.borrow_mut().insert(key, as_non_null);
|
||||
}
|
||||
|
||||
pub fn delete(&mut self, key: usize) {
|
||||
self._delete(key, true)
|
||||
}
|
||||
|
||||
pub fn _delete(&mut self, key: usize, clean_page: bool) {
|
||||
debug!("cache_delete(key={}, clean={})", key, clean_page);
|
||||
let ptr = self.map.borrow_mut().remove(&key);
|
||||
if ptr.is_none() {
|
||||
return;
|
||||
}
|
||||
let mut ptr = ptr.unwrap();
|
||||
{
|
||||
let ptr = unsafe { ptr.as_mut() };
|
||||
self.detach(ptr, clean_page);
|
||||
}
|
||||
unsafe { drop_in_place(ptr.as_ptr()) };
|
||||
}
|
||||
|
||||
fn get_ptr(&mut self, key: usize) -> Option<NonNull<PageCacheEntry>> {
|
||||
let m = self.map.borrow_mut();
|
||||
let ptr = m.get(&key);
|
||||
ptr.copied()
|
||||
}
|
||||
|
||||
pub fn get(&mut self, key: &usize) -> Option<PageRef> {
|
||||
debug!("cache_get(key={})", key);
|
||||
let ptr = self.get_ptr(*key);
|
||||
ptr?;
|
||||
let ptr = unsafe { ptr.unwrap().as_mut() };
|
||||
let page = ptr.page.clone();
|
||||
//self.detach(ptr);
|
||||
self.touch(ptr);
|
||||
Some(page)
|
||||
}
|
||||
|
||||
pub fn resize(&mut self, capacity: usize) {
|
||||
let _ = capacity;
|
||||
todo!();
|
||||
}
|
||||
|
||||
fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) {
|
||||
let mut current = entry.as_non_null();
|
||||
|
||||
if clean_page {
|
||||
// evict buffer
|
||||
let page = &entry.page;
|
||||
page.clear_loaded();
|
||||
debug!("cleaning up page {}", page.get().id);
|
||||
let _ = page.get().contents.take();
|
||||
}
|
||||
|
||||
let (next, prev) = unsafe {
|
||||
let c = current.as_mut();
|
||||
let next = c.next;
|
||||
let prev = c.prev;
|
||||
c.prev = None;
|
||||
c.next = None;
|
||||
(next, prev)
|
||||
};
|
||||
|
||||
// detach
|
||||
match (prev, next) {
|
||||
(None, None) => {}
|
||||
(None, Some(_)) => todo!(),
|
||||
(Some(p), None) => {
|
||||
self.tail = RefCell::new(Some(p));
|
||||
}
|
||||
(Some(mut p), Some(mut n)) => unsafe {
|
||||
let p_mut = p.as_mut();
|
||||
p_mut.next = Some(n);
|
||||
let n_mut = n.as_mut();
|
||||
n_mut.prev = Some(p);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
fn touch(&mut self, entry: &mut PageCacheEntry) {
|
||||
let mut current = entry.as_non_null();
|
||||
unsafe {
|
||||
let c = current.as_mut();
|
||||
c.next = *self.head.borrow();
|
||||
}
|
||||
|
||||
if let Some(mut head) = *self.head.borrow_mut() {
|
||||
unsafe {
|
||||
let head = head.as_mut();
|
||||
head.prev = Some(current);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_if_not_dirty(&mut self) {
|
||||
let tail = *self.tail.borrow();
|
||||
if tail.is_none() {
|
||||
return;
|
||||
}
|
||||
let tail = unsafe { tail.unwrap().as_mut() };
|
||||
if tail.page.is_dirty() {
|
||||
// TODO: drop from another clean entry?
|
||||
return;
|
||||
}
|
||||
self.detach(tail, true);
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
let to_remove: Vec<usize> = self.map.borrow().iter().map(|v| *v.0).collect();
|
||||
for key in to_remove {
|
||||
self.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct PageCache<K: Eq + Hash + Clone, V> {
|
||||
cache: SieveCache<K, V>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl<K: Eq + Hash + Clone, V> PageCache<K, V> {
|
||||
pub fn new(cache: SieveCache<K, V>) -> Self {
|
||||
Self { cache }
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: K, value: V) {
|
||||
self.cache.insert(key, value);
|
||||
}
|
||||
|
||||
pub fn get(&mut self, key: &K) -> Option<&V> {
|
||||
self.cache.get(key)
|
||||
}
|
||||
|
||||
pub fn resize(&mut self, capacity: usize) {
|
||||
self.cache = SieveCache::new(capacity).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum FlushState {
|
||||
Start,
|
||||
@@ -322,6 +128,8 @@ enum FlushState {
|
||||
#[derive(Clone, Debug)]
|
||||
enum CheckpointState {
|
||||
Checkpoint,
|
||||
SyncDbFile,
|
||||
WaitSyncDbFile,
|
||||
CheckpointDone,
|
||||
}
|
||||
|
||||
@@ -368,14 +176,11 @@ impl Pager {
|
||||
wal: Rc<RefCell<dyn Wal>>,
|
||||
io: Arc<dyn crate::io::IO>,
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
) -> Result<Self> {
|
||||
let db_header = RefCell::borrow(&db_header_ref);
|
||||
let page_size = db_header.page_size as usize;
|
||||
let buffer_pool = Rc::new(BufferPool::new(page_size));
|
||||
Ok(Self {
|
||||
page_io,
|
||||
wal,
|
||||
buffer_pool,
|
||||
page_cache,
|
||||
io,
|
||||
dirty_pages: Rc::new(RefCell::new(HashSet::new())),
|
||||
@@ -387,6 +192,7 @@ impl Pager {
|
||||
syncing: Rc::new(RefCell::new(false)),
|
||||
checkpoint_state: RefCell::new(CheckpointState::Checkpoint),
|
||||
checkpoint_inflight: Rc::new(RefCell::new(0)),
|
||||
buffer_pool,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -413,7 +219,8 @@ impl Pager {
|
||||
pub fn read_page(&self, page_idx: usize) -> crate::Result<PageRef> {
|
||||
trace!("read_page(page_idx = {})", page_idx);
|
||||
let mut page_cache = self.page_cache.write().unwrap();
|
||||
if let Some(page) = page_cache.get(&page_idx) {
|
||||
let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame()));
|
||||
if let Some(page) = page_cache.get(&page_key) {
|
||||
trace!("read_page(page_idx = {}) = cached", page_idx);
|
||||
return Ok(page.clone());
|
||||
}
|
||||
@@ -429,7 +236,7 @@ impl Pager {
|
||||
}
|
||||
// TODO(pere) ensure page is inserted, we should probably first insert to page cache
|
||||
// and if successful, read frame or page
|
||||
page_cache.insert(page_idx, page.clone());
|
||||
page_cache.insert(page_key, page.clone());
|
||||
return Ok(page);
|
||||
}
|
||||
sqlite3_ondisk::begin_read_page(
|
||||
@@ -439,7 +246,7 @@ impl Pager {
|
||||
page_idx,
|
||||
)?;
|
||||
// TODO(pere) ensure page is inserted
|
||||
page_cache.insert(page_idx, page.clone());
|
||||
page_cache.insert(page_key, page.clone());
|
||||
Ok(page)
|
||||
}
|
||||
|
||||
@@ -449,6 +256,7 @@ impl Pager {
|
||||
trace!("load_page(page_idx = {})", id);
|
||||
let mut page_cache = self.page_cache.write().unwrap();
|
||||
page.set_locked();
|
||||
let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame()));
|
||||
if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? {
|
||||
self.wal
|
||||
.borrow()
|
||||
@@ -457,8 +265,8 @@ impl Pager {
|
||||
page.set_uptodate();
|
||||
}
|
||||
// TODO(pere) ensure page is inserted
|
||||
if !page_cache.contains_key(id) {
|
||||
page_cache.insert(id, page.clone());
|
||||
if !page_cache.contains_key(&page_key) {
|
||||
page_cache.insert(page_key, page.clone());
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
@@ -469,8 +277,8 @@ impl Pager {
|
||||
id,
|
||||
)?;
|
||||
// TODO(pere) ensure page is inserted
|
||||
if !page_cache.contains_key(id) {
|
||||
page_cache.insert(id, page.clone());
|
||||
if !page_cache.contains_key(&page_key) {
|
||||
page_cache.insert(page_key, page.clone());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -500,13 +308,14 @@ impl Pager {
|
||||
let db_size = self.db_header.borrow().database_size;
|
||||
for page_id in self.dirty_pages.borrow().iter() {
|
||||
let mut cache = self.page_cache.write().unwrap();
|
||||
let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
let page_key =
|
||||
PageCacheKey::new(*page_id, Some(self.wal.borrow().get_max_frame()));
|
||||
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
|
||||
debug!("appending frame {} {:?}", page_id, page_type);
|
||||
log::trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
|
||||
self.wal.borrow_mut().append_frame(
|
||||
page.clone(),
|
||||
db_size,
|
||||
self,
|
||||
self.flush_info.borrow().in_flight_writes.clone(),
|
||||
)?;
|
||||
}
|
||||
@@ -565,18 +374,30 @@ impl Pager {
|
||||
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
|
||||
loop {
|
||||
let state = self.checkpoint_state.borrow().clone();
|
||||
log::trace!("checkpoint(state={:?})", state);
|
||||
log::trace!("pager_checkpoint(state={:?})", state);
|
||||
match state {
|
||||
CheckpointState::Checkpoint => {
|
||||
let in_flight = self.checkpoint_inflight.clone();
|
||||
match self.wal.borrow_mut().checkpoint(self, in_flight)? {
|
||||
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
|
||||
CheckpointStatus::Done => {
|
||||
self.checkpoint_state
|
||||
.replace(CheckpointState::CheckpointDone);
|
||||
self.checkpoint_state.replace(CheckpointState::SyncDbFile);
|
||||
}
|
||||
};
|
||||
}
|
||||
CheckpointState::SyncDbFile => {
|
||||
sqlite3_ondisk::begin_sync(self.page_io.clone(), self.syncing.clone())?;
|
||||
self.checkpoint_state
|
||||
.replace(CheckpointState::WaitSyncDbFile);
|
||||
}
|
||||
CheckpointState::WaitSyncDbFile => {
|
||||
if *self.syncing.borrow() {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
} else {
|
||||
self.checkpoint_state
|
||||
.replace(CheckpointState::CheckpointDone);
|
||||
}
|
||||
}
|
||||
CheckpointState::CheckpointDone => {
|
||||
let in_flight = self.checkpoint_inflight.clone();
|
||||
if *in_flight.borrow() > 0 {
|
||||
@@ -598,13 +419,16 @@ impl Pager {
|
||||
.borrow_mut()
|
||||
.checkpoint(self, Rc::new(RefCell::new(0)))
|
||||
{
|
||||
Ok(CheckpointStatus::IO) => {}
|
||||
Ok(CheckpointStatus::IO) => {
|
||||
self.io.run_once();
|
||||
}
|
||||
Ok(CheckpointStatus::Done) => {
|
||||
break;
|
||||
}
|
||||
Err(err) => panic!("error while clearing cache {}", err),
|
||||
}
|
||||
}
|
||||
// TODO: only clear cache of things that are really invalidated
|
||||
self.page_cache.write().unwrap().clear();
|
||||
}
|
||||
|
||||
@@ -641,7 +465,9 @@ impl Pager {
|
||||
page.set_dirty();
|
||||
self.add_dirty(page.get().id);
|
||||
let mut cache = self.page_cache.write().unwrap();
|
||||
cache.insert(page.get().id, page.clone());
|
||||
let page_key =
|
||||
PageCacheKey::new(page.get().id, Some(self.wal.borrow().get_max_frame()));
|
||||
cache.insert(page_key, page.clone());
|
||||
}
|
||||
Ok(page)
|
||||
}
|
||||
@@ -649,7 +475,8 @@ impl Pager {
|
||||
pub fn put_loaded_page(&self, id: usize, page: PageRef) {
|
||||
let mut cache = self.page_cache.write().unwrap();
|
||||
// cache insert invalidates previous page
|
||||
cache.insert(id, page.clone());
|
||||
let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame()));
|
||||
cache.insert(page_key, page.clone());
|
||||
page.set_loaded();
|
||||
}
|
||||
|
||||
@@ -682,7 +509,9 @@ pub fn allocate_page(page_id: usize, buffer_pool: &Rc<BufferPool>, offset: usize
|
||||
mod tests {
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use super::{DumbLruPageCache, Page};
|
||||
use crate::storage::page_cache::{DumbLruPageCache, PageCacheKey};
|
||||
|
||||
use super::Page;
|
||||
|
||||
#[test]
|
||||
fn test_shared_cache() {
|
||||
@@ -693,12 +522,14 @@ mod tests {
|
||||
let cache = cache.clone();
|
||||
std::thread::spawn(move || {
|
||||
let mut cache = cache.write().unwrap();
|
||||
cache.insert(1, Arc::new(Page::new(1)));
|
||||
let page_key = PageCacheKey::new(1, None);
|
||||
cache.insert(page_key, Arc::new(Page::new(1)));
|
||||
})
|
||||
};
|
||||
let _ = thread.join();
|
||||
let mut cache = cache.write().unwrap();
|
||||
let page = cache.get(&1);
|
||||
let page_key = PageCacheKey::new(1, None);
|
||||
let page = cache.get(&page_key);
|
||||
assert_eq!(page.unwrap().get().id, 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -581,6 +581,7 @@ pub fn begin_write_btree_page(
|
||||
page: &PageRef,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<()> {
|
||||
log::trace!("begin_write_btree_page(page={})", page.get().id);
|
||||
let page_source = &pager.page_io;
|
||||
let page_finish = page.clone();
|
||||
|
||||
@@ -1039,6 +1040,11 @@ pub fn begin_read_wal_frame(
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
page: PageRef,
|
||||
) -> Result<()> {
|
||||
log::trace!(
|
||||
"begin_read_wal_frame(offset={}, page={})",
|
||||
offset,
|
||||
page.get().id
|
||||
);
|
||||
let buf = buffer_pool.get();
|
||||
let drop_fn = Rc::new(move |buf| {
|
||||
let buffer_pool = buffer_pool.clone();
|
||||
|
||||
@@ -8,12 +8,13 @@ use crate::io::{File, SyncCompletion, IO};
|
||||
use crate::storage::sqlite3_ondisk::{
|
||||
begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
|
||||
};
|
||||
use crate::Completion;
|
||||
use crate::Result;
|
||||
use crate::{Buffer, Result};
|
||||
use crate::{Completion, Page};
|
||||
|
||||
use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE};
|
||||
use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE};
|
||||
|
||||
use super::buffer_pool::BufferPool;
|
||||
use super::page_cache::PageCacheKey;
|
||||
use super::pager::{PageRef, Pager};
|
||||
use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader};
|
||||
|
||||
@@ -42,7 +43,6 @@ pub trait Wal {
|
||||
&mut self,
|
||||
page: PageRef,
|
||||
db_size: u32,
|
||||
pager: &Pager,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<()>;
|
||||
|
||||
@@ -53,38 +53,85 @@ pub trait Wal {
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<CheckpointStatus>;
|
||||
fn sync(&mut self) -> Result<CheckpointStatus>;
|
||||
fn get_max_frame(&self) -> u64;
|
||||
fn get_min_frame(&self) -> u64;
|
||||
}
|
||||
|
||||
// Syncing requires a state machine because we need to schedule a sync and then wait until it is
|
||||
// finished. If we don't wait there will be undefined behaviour that no one wants to debug.
|
||||
#[derive(Copy, Clone)]
|
||||
enum SyncState {
|
||||
NotSyncing,
|
||||
Syncing,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum CheckpointState {
|
||||
Start,
|
||||
ReadFrame,
|
||||
WaitReadFrame,
|
||||
WritePage,
|
||||
WaitWritePage,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub enum CheckpointStatus {
|
||||
Done,
|
||||
IO,
|
||||
}
|
||||
|
||||
// Checkpointing is a state machine that has multiple steps. Since there are multiple steps we save
|
||||
// in flight information of the checkpoint in OngoingCheckpoint. page is just a helper Page to do
|
||||
// page operations like reading a frame to a page, and writing a page to disk. This page should not
|
||||
// be placed back in pager page cache or anything, it's just a helper.
|
||||
// min_frame and max_frame is the range of frames that can be safely transferred from WAL to db
|
||||
// file.
|
||||
// current_page is a helper to iterate through all the pages that might have a frame in the safe
|
||||
// range. This is inneficient for now.
|
||||
struct OngoingCheckpoint {
|
||||
page: PageRef,
|
||||
state: CheckpointState,
|
||||
min_frame: u64,
|
||||
max_frame: u64,
|
||||
current_page: u64,
|
||||
}
|
||||
|
||||
pub struct WalFile {
|
||||
io: Arc<dyn crate::io::IO>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
|
||||
sync_state: RefCell<SyncState>,
|
||||
syncing: Rc<RefCell<bool>>,
|
||||
page_size: usize,
|
||||
|
||||
ongoing_checkpoint: HashSet<usize>,
|
||||
shared: Arc<RwLock<WalFileShared>>,
|
||||
ongoing_checkpoint: OngoingCheckpoint,
|
||||
checkpoint_threshold: usize,
|
||||
// min and max frames for this connection
|
||||
max_frame: u64,
|
||||
min_frame: u64,
|
||||
}
|
||||
|
||||
/// WalFileShared is the part of a WAL that will be shared between threads. A wal has information
|
||||
/// that needs to be communicated between threads so this struct does the job.
|
||||
pub struct WalFileShared {
|
||||
wal_header: Arc<RwLock<sqlite3_ondisk::WalHeader>>,
|
||||
min_frame: u64,
|
||||
max_frame: u64,
|
||||
nbackfills: u64,
|
||||
// Maps pgno to frame id and offset in wal file
|
||||
frame_cache: HashMap<u64, Vec<u64>>, // FIXME: for now let's use a simple hashmap instead of a shm file
|
||||
// Frame cache maps a Page to all the frames it has stored in WAL in ascending order.
|
||||
// This is do to easily find the frame it must checkpoint each connection if a checkpoint is
|
||||
// necessary.
|
||||
// One difference between SQLite and limbo is that we will never support multi process, meaning
|
||||
// we don't need WAL's index file. So we can do stuff like this without shared memory.
|
||||
// TODO: this will need refactoring because this is incredible memory inneficient.
|
||||
frame_cache: HashMap<u64, Vec<u64>>,
|
||||
// Another memory inneficient array made to just keep track of pages that are in frame_cache.
|
||||
pages_in_frames: Vec<u64>,
|
||||
last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
|
||||
file: Rc<dyn File>,
|
||||
}
|
||||
|
||||
pub enum CheckpointStatus {
|
||||
Done,
|
||||
IO,
|
||||
}
|
||||
|
||||
impl Wal for WalFile {
|
||||
/// Begin a read transaction.
|
||||
fn begin_read_tx(&mut self) -> Result<()> {
|
||||
@@ -120,6 +167,7 @@ impl Wal for WalFile {
|
||||
debug!("read_frame({})", frame_id);
|
||||
let offset = self.frame_offset(frame_id);
|
||||
let shared = self.shared.read().unwrap();
|
||||
page.set_locked();
|
||||
begin_read_wal_frame(
|
||||
&shared.file,
|
||||
offset + WAL_FRAME_HEADER_SIZE,
|
||||
@@ -134,7 +182,6 @@ impl Wal for WalFile {
|
||||
&mut self,
|
||||
page: PageRef,
|
||||
db_size: u32,
|
||||
_pager: &Pager,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<()> {
|
||||
let page_id = page.get().id;
|
||||
@@ -167,6 +214,7 @@ impl Wal for WalFile {
|
||||
Some(frames) => frames.push(frame_id),
|
||||
None => {
|
||||
shared.frame_cache.insert(page_id as u64, vec![frame_id]);
|
||||
shared.pages_in_frames.push(page_id as u64);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -194,63 +242,180 @@ impl Wal for WalFile {
|
||||
pager: &Pager,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<CheckpointStatus> {
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
for (page_id, _frames) in shared.frame_cache.iter() {
|
||||
// move page from WAL to database file
|
||||
// TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf
|
||||
let page_id = *page_id as usize;
|
||||
if self.ongoing_checkpoint.contains(&page_id) {
|
||||
continue;
|
||||
}
|
||||
'checkpoint_loop: loop {
|
||||
let state = self.ongoing_checkpoint.state;
|
||||
log::debug!("checkpoint(state={:?})", state);
|
||||
match state {
|
||||
CheckpointState::Start => {
|
||||
// TODO(pere): check what frames are safe to checkpoint between many readers!
|
||||
self.ongoing_checkpoint.min_frame = self.min_frame;
|
||||
self.ongoing_checkpoint.max_frame = self.max_frame;
|
||||
self.ongoing_checkpoint.current_page = 0;
|
||||
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
|
||||
}
|
||||
CheckpointState::ReadFrame => {
|
||||
let shared = self.shared.read().unwrap();
|
||||
assert!(
|
||||
self.ongoing_checkpoint.current_page as usize
|
||||
<= shared.pages_in_frames.len()
|
||||
);
|
||||
if self.ongoing_checkpoint.current_page as usize == shared.pages_in_frames.len()
|
||||
{
|
||||
self.ongoing_checkpoint.state = CheckpointState::Done;
|
||||
continue 'checkpoint_loop;
|
||||
}
|
||||
let page =
|
||||
shared.pages_in_frames[self.ongoing_checkpoint.current_page as usize];
|
||||
let frames = shared
|
||||
.frame_cache
|
||||
.get(&page)
|
||||
.expect("page must be in frame cache if it's in list");
|
||||
|
||||
let page = pager.read_page(page_id)?;
|
||||
if page.is_locked() {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
}
|
||||
for frame in frames.iter().rev() {
|
||||
// TODO: do proper selection of frames to checkpoint
|
||||
if *frame >= self.ongoing_checkpoint.min_frame {
|
||||
log::debug!(
|
||||
"checkpoint page(state={:?}, page={}, frame={})",
|
||||
state,
|
||||
page,
|
||||
*frame
|
||||
);
|
||||
self.ongoing_checkpoint.page.get().id = page as usize;
|
||||
|
||||
begin_write_btree_page(pager, &page, write_counter.clone())?;
|
||||
self.ongoing_checkpoint.insert(page_id);
|
||||
self.read_frame(
|
||||
*frame,
|
||||
self.ongoing_checkpoint.page.clone(),
|
||||
self.buffer_pool.clone(),
|
||||
)?;
|
||||
self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
|
||||
self.ongoing_checkpoint.current_page += 1;
|
||||
continue 'checkpoint_loop;
|
||||
}
|
||||
}
|
||||
self.ongoing_checkpoint.current_page += 1;
|
||||
}
|
||||
CheckpointState::WaitReadFrame => {
|
||||
if self.ongoing_checkpoint.page.is_locked() {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
} else {
|
||||
self.ongoing_checkpoint.state = CheckpointState::WritePage;
|
||||
}
|
||||
}
|
||||
CheckpointState::WritePage => {
|
||||
self.ongoing_checkpoint.page.set_dirty();
|
||||
begin_write_btree_page(
|
||||
pager,
|
||||
&self.ongoing_checkpoint.page,
|
||||
write_counter.clone(),
|
||||
)?;
|
||||
self.ongoing_checkpoint.state = CheckpointState::WaitWritePage;
|
||||
}
|
||||
CheckpointState::WaitWritePage => {
|
||||
if *write_counter.borrow() > 0 {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
}
|
||||
let shared = self.shared.read().unwrap();
|
||||
if (self.ongoing_checkpoint.current_page as usize)
|
||||
< shared.pages_in_frames.len()
|
||||
{
|
||||
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
|
||||
} else {
|
||||
self.ongoing_checkpoint.state = CheckpointState::Done;
|
||||
}
|
||||
}
|
||||
CheckpointState::Done => {
|
||||
if *write_counter.borrow() > 0 {
|
||||
return Ok(CheckpointStatus::IO);
|
||||
}
|
||||
let mut shared = self.shared.write().unwrap();
|
||||
shared.frame_cache.clear();
|
||||
shared.pages_in_frames.clear();
|
||||
shared.max_frame = 0;
|
||||
shared.nbackfills = 0;
|
||||
self.ongoing_checkpoint.state = CheckpointState::Start;
|
||||
return Ok(CheckpointStatus::Done);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: only clear checkpointed frames
|
||||
shared.frame_cache.clear();
|
||||
shared.max_frame = 0;
|
||||
shared.nbackfills = 0;
|
||||
self.ongoing_checkpoint.clear();
|
||||
Ok(CheckpointStatus::Done)
|
||||
}
|
||||
|
||||
fn sync(&mut self) -> Result<CheckpointStatus> {
|
||||
let shared = self.shared.write().unwrap();
|
||||
{
|
||||
let syncing = self.syncing.clone();
|
||||
let completion = Completion::Sync(SyncCompletion {
|
||||
complete: Box::new(move |_| {
|
||||
*syncing.borrow_mut() = false;
|
||||
}),
|
||||
});
|
||||
shared.file.sync(Rc::new(completion))?;
|
||||
let state = *self.sync_state.borrow();
|
||||
match state {
|
||||
SyncState::NotSyncing => {
|
||||
let shared = self.shared.write().unwrap();
|
||||
log::debug!("wal_sync");
|
||||
{
|
||||
let syncing = self.syncing.clone();
|
||||
*syncing.borrow_mut() = true;
|
||||
let completion = Completion::Sync(SyncCompletion {
|
||||
complete: Box::new(move |_| {
|
||||
log::debug!("wal_sync finish");
|
||||
*syncing.borrow_mut() = false;
|
||||
}),
|
||||
});
|
||||
shared.file.sync(Rc::new(completion))?;
|
||||
}
|
||||
self.sync_state.replace(SyncState::Syncing);
|
||||
Ok(CheckpointStatus::IO)
|
||||
}
|
||||
SyncState::Syncing => {
|
||||
if *self.syncing.borrow() {
|
||||
Ok(CheckpointStatus::IO)
|
||||
} else {
|
||||
self.sync_state.replace(SyncState::NotSyncing);
|
||||
Ok(CheckpointStatus::Done)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if *self.syncing.borrow() {
|
||||
Ok(CheckpointStatus::IO)
|
||||
} else {
|
||||
Ok(CheckpointStatus::Done)
|
||||
}
|
||||
fn get_max_frame(&self) -> u64 {
|
||||
self.max_frame
|
||||
}
|
||||
|
||||
fn get_min_frame(&self) -> u64 {
|
||||
self.min_frame
|
||||
}
|
||||
}
|
||||
|
||||
impl WalFile {
|
||||
pub fn new(io: Arc<dyn IO>, page_size: usize, shared: Arc<RwLock<WalFileShared>>) -> Self {
|
||||
pub fn new(
|
||||
io: Arc<dyn IO>,
|
||||
page_size: usize,
|
||||
shared: Arc<RwLock<WalFileShared>>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
) -> Self {
|
||||
let checkpoint_page = Arc::new(Page::new(0));
|
||||
let buffer = buffer_pool.get();
|
||||
{
|
||||
let buffer_pool = buffer_pool.clone();
|
||||
let drop_fn = Rc::new(move |buf| {
|
||||
buffer_pool.put(buf);
|
||||
});
|
||||
checkpoint_page.get().contents = Some(PageContent {
|
||||
offset: 0,
|
||||
buffer: Rc::new(RefCell::new(Buffer::new(buffer, drop_fn))),
|
||||
overflow_cells: Vec::new(),
|
||||
});
|
||||
}
|
||||
Self {
|
||||
io,
|
||||
shared,
|
||||
ongoing_checkpoint: HashSet::new(),
|
||||
ongoing_checkpoint: OngoingCheckpoint {
|
||||
page: checkpoint_page,
|
||||
state: CheckpointState::Start,
|
||||
min_frame: 0,
|
||||
max_frame: 0,
|
||||
current_page: 0,
|
||||
},
|
||||
syncing: Rc::new(RefCell::new(false)),
|
||||
checkpoint_threshold: 1000,
|
||||
page_size,
|
||||
max_frame: 0,
|
||||
min_frame: 0,
|
||||
buffer_pool,
|
||||
sync_state: RefCell::new(SyncState::NotSyncing),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -322,6 +487,7 @@ impl WalFileShared {
|
||||
frame_cache: HashMap::new(),
|
||||
last_checksum: checksum,
|
||||
file,
|
||||
pages_in_frames: Vec::new(),
|
||||
};
|
||||
Ok(Arc::new(RwLock::new(shared)))
|
||||
}
|
||||
|
||||
@@ -257,7 +257,7 @@ mod tests {
|
||||
for i in 0..iterations {
|
||||
let insert_query = format!("INSERT INTO test VALUES ({})", i);
|
||||
do_flush(&conn, &tmp_db)?;
|
||||
conn.clear_page_cache().unwrap();
|
||||
conn.checkpoint().unwrap();
|
||||
match conn.query(insert_query) {
|
||||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.next_row()? {
|
||||
|
||||
Reference in New Issue
Block a user