mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 08:55:40 +01:00
fix buffer pool is not thread safe problem
This commit is contained in:
@@ -234,7 +234,7 @@ impl Database {
|
||||
}
|
||||
|
||||
pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
|
||||
let buffer_pool = Rc::new(BufferPool::new(None));
|
||||
let buffer_pool = Arc::new(BufferPool::new(None));
|
||||
|
||||
// Open existing WAL file if present
|
||||
if let Some(shared_wal) = self.maybe_shared_wal.read().clone() {
|
||||
|
||||
@@ -6869,7 +6869,7 @@ mod tests {
|
||||
let db_file = Arc::new(DatabaseFile::new(io_file));
|
||||
let wal_file = io.open_file("test.wal", OpenFlags::Create, false).unwrap();
|
||||
|
||||
let buffer_pool = Rc::new(BufferPool::new(Some(page_size as usize)));
|
||||
let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize)));
|
||||
let wal_shared = WalFileShared::new_shared(page_size, &io, wal_file).unwrap();
|
||||
let wal_file = WalFile::new(io.clone(), wal_shared, buffer_pool.clone());
|
||||
let wal = Rc::new(RefCell::new(wal_file));
|
||||
@@ -7377,7 +7377,7 @@ mod tests {
|
||||
fn setup_test_env(database_size: u32) -> Rc<Pager> {
|
||||
let page_size = 512;
|
||||
|
||||
let buffer_pool = Rc::new(BufferPool::new(Some(page_size as usize)));
|
||||
let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize)));
|
||||
|
||||
// Initialize buffer pool with correctly sized buffers
|
||||
for _ in 0..10 {
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use crate::io::BufferData;
|
||||
use std::cell::{Cell, RefCell};
|
||||
use parking_lot::Mutex;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
pub struct BufferPool {
|
||||
pub free_buffers: RefCell<Vec<BufferData>>,
|
||||
page_size: Cell<usize>,
|
||||
pub free_buffers: Mutex<Vec<BufferData>>,
|
||||
page_size: AtomicUsize,
|
||||
}
|
||||
|
||||
const DEFAULT_PAGE_SIZE: usize = 4096;
|
||||
@@ -12,26 +13,33 @@ const DEFAULT_PAGE_SIZE: usize = 4096;
|
||||
impl BufferPool {
|
||||
pub fn new(page_size: Option<usize>) -> Self {
|
||||
Self {
|
||||
free_buffers: RefCell::new(Vec::new()),
|
||||
page_size: Cell::new(page_size.unwrap_or(DEFAULT_PAGE_SIZE)),
|
||||
free_buffers: Mutex::new(Vec::new()),
|
||||
page_size: AtomicUsize::new(page_size.unwrap_or(DEFAULT_PAGE_SIZE)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_page_size(&self, page_size: usize) {
|
||||
self.page_size.set(page_size);
|
||||
self.page_size.store(page_size, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn get(&self) -> BufferData {
|
||||
let mut free_buffers = self.free_buffers.borrow_mut();
|
||||
if let Some(buffer) = free_buffers.pop() {
|
||||
buffer
|
||||
} else {
|
||||
Pin::new(vec![0; self.page_size.get()])
|
||||
}
|
||||
let buffer = self.free_buffers.lock().pop();
|
||||
buffer.unwrap_or_else(|| Pin::new(vec![0; self.page_size.load(Ordering::Relaxed)]))
|
||||
}
|
||||
|
||||
pub fn put(&self, buffer: BufferData) {
|
||||
let mut free_buffers = self.free_buffers.borrow_mut();
|
||||
free_buffers.push(buffer);
|
||||
self.free_buffers.lock().push(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn is_send_sync_static<T: Send + Sync + 'static>() {}
|
||||
|
||||
#[test]
|
||||
fn test_send_sync() {
|
||||
is_send_sync_static::<BufferPool>();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,7 +205,7 @@ pub struct Pager {
|
||||
/// A page cache for the database.
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
/// Buffer pool for temporary data storage.
|
||||
pub buffer_pool: Rc<BufferPool>,
|
||||
pub buffer_pool: Arc<BufferPool>,
|
||||
/// I/O interface for input/output operations.
|
||||
pub io: Arc<dyn crate::io::IO>,
|
||||
dirty_pages: Rc<RefCell<HashSet<usize>>>,
|
||||
@@ -264,7 +264,7 @@ impl Pager {
|
||||
wal: Rc<RefCell<dyn Wal>>,
|
||||
io: Arc<dyn crate::io::IO>,
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
is_empty: Arc<AtomicUsize>,
|
||||
init_lock: Arc<Mutex<()>>,
|
||||
) -> Result<Self> {
|
||||
@@ -1183,7 +1183,7 @@ impl Pager {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn allocate_page(page_id: usize, buffer_pool: &Rc<BufferPool>, offset: usize) -> PageRef {
|
||||
pub fn allocate_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
|
||||
let page = Arc::new(Page::new(page_id));
|
||||
{
|
||||
let buffer = buffer_pool.get();
|
||||
@@ -1469,7 +1469,7 @@ mod ptrmap_tests {
|
||||
));
|
||||
|
||||
// Construct interfaces for the pager
|
||||
let buffer_pool = Rc::new(BufferPool::new(Some(page_size as usize)));
|
||||
let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize)));
|
||||
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(
|
||||
(initial_db_pages + 10) as usize,
|
||||
)));
|
||||
|
||||
@@ -729,7 +729,7 @@ impl PageContent {
|
||||
|
||||
pub fn begin_read_page(
|
||||
db_file: Arc<dyn DatabaseStorage>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
page: PageRef,
|
||||
page_idx: usize,
|
||||
) -> Result<()> {
|
||||
@@ -1465,7 +1465,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
|
||||
pub fn begin_read_wal_frame(
|
||||
io: &Arc<dyn File>,
|
||||
offset: usize,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
complete: Box<dyn Fn(Arc<RefCell<Buffer>>)>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
tracing::trace!("begin_read_wal_frame(offset={})", offset);
|
||||
|
||||
@@ -211,13 +211,13 @@ pub trait Wal {
|
||||
fn find_frame(&self, page_id: u64) -> Result<Option<u64>>;
|
||||
|
||||
/// Read a frame from the WAL.
|
||||
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()>;
|
||||
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc<BufferPool>) -> Result<()>;
|
||||
|
||||
/// Read a frame from the WAL.
|
||||
fn read_frame_raw(
|
||||
&self,
|
||||
frame_id: u64,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
frame: *mut u8,
|
||||
frame_len: u32,
|
||||
) -> Result<Arc<Completion>>;
|
||||
@@ -283,7 +283,7 @@ impl Wal for DummyWAL {
|
||||
&self,
|
||||
_frame_id: u64,
|
||||
_page: crate::PageRef,
|
||||
_buffer_pool: Rc<BufferPool>,
|
||||
_buffer_pool: Arc<BufferPool>,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
@@ -291,7 +291,7 @@ impl Wal for DummyWAL {
|
||||
fn read_frame_raw(
|
||||
&self,
|
||||
_frame_id: u64,
|
||||
_buffer_pool: Rc<BufferPool>,
|
||||
_buffer_pool: Arc<BufferPool>,
|
||||
_frame: *mut u8,
|
||||
_frame_len: u32,
|
||||
) -> Result<Arc<Completion>> {
|
||||
@@ -408,7 +408,7 @@ impl fmt::Debug for OngoingCheckpoint {
|
||||
#[allow(dead_code)]
|
||||
pub struct WalFile {
|
||||
io: Arc<dyn IO>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
|
||||
syncing: Rc<Cell<bool>>,
|
||||
sync_state: Cell<SyncState>,
|
||||
@@ -606,7 +606,7 @@ impl Wal for WalFile {
|
||||
}
|
||||
|
||||
/// Read a frame from the WAL.
|
||||
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()> {
|
||||
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc<BufferPool>) -> Result<()> {
|
||||
tracing::debug!("read_frame({})", frame_id);
|
||||
let offset = self.frame_offset(frame_id);
|
||||
page.set_locked();
|
||||
@@ -627,7 +627,7 @@ impl Wal for WalFile {
|
||||
fn read_frame_raw(
|
||||
&self,
|
||||
frame_id: u64,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
frame: *mut u8,
|
||||
frame_len: u32,
|
||||
) -> Result<Arc<Completion>> {
|
||||
@@ -953,7 +953,7 @@ impl WalFile {
|
||||
pub fn new(
|
||||
io: Arc<dyn IO>,
|
||||
shared: Arc<UnsafeCell<WalFileShared>>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
) -> Self {
|
||||
let checkpoint_page = Arc::new(Page::new(0));
|
||||
let buffer = buffer_pool.get();
|
||||
|
||||
@@ -5218,7 +5218,7 @@ pub fn op_open_ephemeral(
|
||||
let file = io.open_file("", OpenFlags::Create, true)?;
|
||||
let db_file = Arc::new(FileMemoryStorage::new(file));
|
||||
|
||||
let buffer_pool = Rc::new(BufferPool::new(None));
|
||||
let buffer_pool = Arc::new(BufferPool::new(None));
|
||||
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
|
||||
|
||||
let pager = Rc::new(Pager::new(
|
||||
|
||||
Reference in New Issue
Block a user