diff --git a/core/storage/pager.rs b/core/storage/pager.rs index b36ce6f39..39cabb1a0 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -3,7 +3,9 @@ use crate::result::LimboResult; use crate::storage::btree::BTreePageInner; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; -use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType}; +use crate::storage::sqlite3_ondisk::{ + self, DatabaseHeader, PageContent, PageType, DATABASE_HEADER_PAGE_ID, +}; use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus}; use crate::Completion; use crate::{Buffer, LimboError, Result}; @@ -382,8 +384,18 @@ impl Pager { } /// Writes the database header. - pub fn write_database_header(&self, header: &DatabaseHeader) { - sqlite3_ondisk::begin_write_database_header(header, self).expect("failed to write header"); + pub fn write_database_header(&self, header: &DatabaseHeader) -> Result<()> { + let header_page = self.read_page(DATABASE_HEADER_PAGE_ID)?; + while header_page.is_locked() { + self.io.run_once()?; + } + header_page.set_dirty(); + self.add_dirty(DATABASE_HEADER_PAGE_ID); + + let contents = header_page.get().contents.as_ref().unwrap(); + contents.write_database_header(&header); + + Ok(()) } /// Changes the size of the page cache. @@ -665,24 +677,8 @@ impl Pager { let header = &self.db_header; let mut header = header.lock(); header.database_size += 1; - { - // update database size - // read sync for now - loop { - let first_page_ref = self.read_page(1)?; - if first_page_ref.is_locked() { - // FIXME: we should never run io here! - self.io.run_once()?; - continue; - } - first_page_ref.set_dirty(); - self.add_dirty(1); - - let contents = first_page_ref.get().contents.as_ref().unwrap(); - contents.write_database_header(&header); - break; - } - } + // update database size + self.write_database_header(&mut header)?; // FIXME: should reserve page cache entry before modifying the database let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0); @@ -694,13 +690,11 @@ impl Pager { let page_key = PageCacheKey::new(page.get().id); let mut cache = self.page_cache.write(); match cache.insert(page_key, page.clone()) { - Err(CacheError::Full) => return Err(LimboError::CacheFull), - Err(_) => { - return Err(LimboError::InternalError( - "Unknown error inserting page to cache".into(), - )) - } - Ok(_) => return Ok(page), + Err(CacheError::Full) => Err(LimboError::CacheFull), + Err(_) => Err(LimboError::InternalError( + "Unknown error inserting page to cache".into(), + )), + Ok(_) => Ok(page), } } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 6fa6d5ba0..bb510446f 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -83,6 +83,8 @@ const MAX_PAGE_SIZE: u32 = 65536; /// The default page size in bytes. const DEFAULT_PAGE_SIZE: u16 = 4096; +pub const DATABASE_HEADER_PAGE_ID: usize = 1; + /// The database header. /// The first 100 bytes of the database file comprise the database file header. /// The database file header is divided into fields as shown by the table below. @@ -296,7 +298,7 @@ pub fn begin_read_database_header( }); let c = Completion::Read(ReadCompletion::new(buf, complete)); #[allow(clippy::arc_with_non_send_sync)] - db_file.read_page(1, Arc::new(c))?; + db_file.read_page(DATABASE_HEADER_PAGE_ID, Arc::new(c))?; Ok(result) } @@ -336,48 +338,6 @@ fn finish_read_database_header( Ok(()) } -pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Result<()> { - let page_source = pager.db_file.clone(); - let header = Rc::new(header.clone()); - - let drop_fn = Rc::new(|_buf| {}); - #[allow(clippy::arc_with_non_send_sync)] - let buffer_to_copy = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn))); - let buffer_to_copy_in_cb = buffer_to_copy.clone(); - - let read_complete = Box::new(move |buffer: Arc>| { - let buffer = buffer.borrow().clone(); - let buffer = Rc::new(RefCell::new(buffer)); - let mut buf_mut = buffer.borrow_mut(); - write_header_to_buf(buf_mut.as_mut_slice(), &header); - let mut dest_buf = buffer_to_copy_in_cb.borrow_mut(); - dest_buf.as_mut_slice().copy_from_slice(buf_mut.as_slice()); - }); - - let drop_fn = Rc::new(|_buf| {}); - #[allow(clippy::arc_with_non_send_sync)] - let buf = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn))); - let c = Completion::Read(ReadCompletion::new(buf, read_complete)); - #[allow(clippy::arc_with_non_send_sync)] - page_source.read_page(1, Arc::new(c))?; - // run get header block - pager.io.run_once()?; - - let buffer_to_copy_in_cb = buffer_to_copy.clone(); - let write_complete = Box::new(move |bytes_written: i32| { - let buf_len = buffer_to_copy_in_cb.borrow().len(); - if bytes_written < buf_len as i32 { - tracing::error!("wrote({bytes_written}) less than expected({buf_len})"); - } - // finish_read_database_header(buf, header).unwrap(); - }); - - let c = Completion::Write(WriteCompletion::new(write_complete)); - page_source.write_page(1, buffer_to_copy, Arc::new(c))?; - - Ok(()) -} - pub fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) { buf[0..16].copy_from_slice(&header.magic); buf[16..18].copy_from_slice(&header.page_size.to_be_bytes()); @@ -833,7 +793,7 @@ pub fn finish_read_page( page: PageRef, ) -> Result<()> { trace!("finish_read_btree_page(page_idx = {})", page_idx); - let pos = if page_idx == 1 { + let pos = if page_idx == DATABASE_HEADER_PAGE_ID { DATABASE_HEADER_SIZE } else { 0 diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 7859d64ed..5c36d5474 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -125,7 +125,7 @@ fn update_pragma( }, _ => bail_parse_error!("Not a valid value"), }; - update_cache_size(cache_size, header, pager); + update_cache_size(cache_size, header, pager)?; Ok(()) } PragmaName::JournalMode => { @@ -283,7 +283,11 @@ fn query_pragma( Ok(()) } -fn update_cache_size(value: i64, header: Arc>, pager: Rc) { +fn update_cache_size( + value: i64, + header: Arc>, + pager: Rc, +) -> crate::Result<()> { let mut cache_size_unformatted: i64 = value; let mut cache_size = if cache_size_unformatted < 0 { let kb = cache_size_unformatted.abs() * 1024; @@ -306,10 +310,12 @@ fn update_cache_size(value: i64, header: Arc>, pager: R .unwrap_or_else(|_| panic!("invalid value, too big for a i32 {}", value)); // update in disk - pager.write_database_header(&header_guard); + pager.write_database_header(&header_guard)?; // update cache size pager .change_page_cache_size(cache_size) .expect("couldn't update page cache size"); + + Ok(()) } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 3e24caa33..9879c106e 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4572,7 +4572,7 @@ pub fn op_set_cookie( Cookie::UserVersion => { let mut header_guard = pager.db_header.lock(); header_guard.user_version = *value; - pager.write_database_header(&*header_guard); + pager.write_database_header(&*header_guard)?; } cookie => todo!("{cookie:?} is not yet implement for SetCookie"), }