use std::collections::{HashMap, HashSet}; use std::{cell::RefCell, rc::Rc, sync::Arc}; use log::{debug, trace}; use crate::io::{File, SyncCompletion, IO}; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::Completion; use crate::{storage::pager::Page, Result}; use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE}; use super::buffer_pool::BufferPool; use super::pager::Pager; use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader}; /// Write-ahead log (WAL). pub trait Wal { /// Begin a read transaction. fn begin_read_tx(&self) -> Result<()>; /// Begin a write transaction. fn begin_write_tx(&self) -> Result<()>; /// End a read transaction. fn end_read_tx(&self) -> Result<()>; /// End a write transaction. fn end_write_tx(&self) -> Result<()>; /// Find the latest frame containing a page. fn find_frame(&self, page_id: u64) -> Result>; /// Read a frame from the WAL. fn read_frame( &self, frame_id: u64, page: Rc>, buffer_pool: Rc, ) -> Result<()>; /// Write a frame to the WAL. fn append_frame( &mut self, page: Rc>, db_size: u32, pager: &Pager, write_counter: Rc>, ) -> Result<()>; fn should_checkpoint(&self) -> bool; fn checkpoint( &mut self, pager: &Pager, write_counter: Rc>, ) -> Result; fn sync(&mut self) -> Result; } pub struct WalFile { io: Arc, wal_path: String, file: RefCell>>, wal_header: RefCell>>>, min_frame: RefCell, max_frame: RefCell, nbackfills: RefCell, // Maps pgno to frame id and offset in wal file frame_cache: RefCell>>, // FIXME: for now let's use a simple hashmap instead of a shm file checkpoint_threshold: usize, ongoing_checkpoint: HashSet, syncing: Rc>, page_size: usize, last_checksum: RefCell<(u32, u32)>, // Check of last frame in WAL, this is a cumulative checksum // over all frames in the WAL } pub enum CheckpointStatus { Done, IO, } impl Wal for WalFile { /// Begin a read transaction. fn begin_read_tx(&self) -> Result<()> { self.min_frame.replace(*self.nbackfills.borrow() + 1); Ok(()) } /// End a read transaction. fn end_read_tx(&self) -> Result<()> { Ok(()) } /// Find the latest frame containing a page. fn find_frame(&self, page_id: u64) -> Result> { let frame_cache = self.frame_cache.borrow(); let frames = frame_cache.get(&page_id); if frames.is_none() { return Ok(None); } self.ensure_init()?; let frames = frames.unwrap(); for frame in frames.iter().rev() { if *frame <= *self.max_frame.borrow() { return Ok(Some(*frame)); } } Ok(None) } /// Read a frame from the WAL. fn read_frame( &self, frame_id: u64, page: Rc>, buffer_pool: Rc, ) -> Result<()> { debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); begin_read_wal_frame( self.file.borrow().as_ref().unwrap(), offset + WAL_FRAME_HEADER_SIZE, buffer_pool, page, )?; Ok(()) } /// Write a frame to the WAL. fn append_frame( &mut self, page: Rc>, db_size: u32, _pager: &Pager, write_counter: Rc>, ) -> Result<()> { self.ensure_init()?; let page_id = page.borrow().id; let frame_id = *self.max_frame.borrow(); let offset = self.frame_offset(frame_id); trace!( "append_frame(frame={}, offset={}, page_id={})", frame_id, offset, page_id ); let header = self.wal_header.borrow(); let header = header.as_ref().unwrap(); let header = header.borrow(); let checksums = *self.last_checksum.borrow(); let checksums = begin_write_wal_frame( self.file.borrow().as_ref().unwrap(), offset, &page, db_size, write_counter, &header, checksums, )?; self.last_checksum.replace(checksums); self.max_frame.replace(frame_id + 1); { let mut frame_cache = self.frame_cache.borrow_mut(); let frames = frame_cache.get_mut(&(page_id as u64)); match frames { Some(frames) => frames.push(frame_id), None => { frame_cache.insert(page_id as u64, vec![frame_id]); } } } Ok(()) } /// Begin a write transaction fn begin_write_tx(&self) -> Result<()> { Ok(()) } /// End a write transaction fn end_write_tx(&self) -> Result<()> { Ok(()) } fn should_checkpoint(&self) -> bool { let frame_id = *self.max_frame.borrow() as usize; frame_id >= self.checkpoint_threshold } fn checkpoint( &mut self, pager: &Pager, write_counter: Rc>, ) -> Result { for (page_id, _frames) in self.frame_cache.borrow().iter() { // move page from WAL to database file // TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf let page_id = *page_id as usize; if self.ongoing_checkpoint.contains(&page_id) { continue; } let page = pager.read_page(page_id)?; if page.borrow().is_locked() { return Ok(CheckpointStatus::IO); } begin_write_btree_page(pager, &page, write_counter.clone())?; self.ongoing_checkpoint.insert(page_id); } self.frame_cache.borrow_mut().clear(); *self.max_frame.borrow_mut() = 0; self.ongoing_checkpoint.clear(); Ok(CheckpointStatus::Done) } fn sync(&mut self) -> Result { self.ensure_init()?; let file = self.file.borrow(); let file = file.as_ref().unwrap(); { let syncing = self.syncing.clone(); let completion = Completion::Sync(SyncCompletion { complete: Box::new(move |_| { *syncing.borrow_mut() = false; }), }); file.sync(Rc::new(completion))?; } if *self.syncing.borrow() { Ok(CheckpointStatus::IO) } else { Ok(CheckpointStatus::Done) } } } impl WalFile { pub fn new(io: Arc, wal_path: String, page_size: usize) -> Self { Self { io, wal_path, file: RefCell::new(None), wal_header: RefCell::new(None), frame_cache: RefCell::new(HashMap::new()), min_frame: RefCell::new(0), max_frame: RefCell::new(0), nbackfills: RefCell::new(0), checkpoint_threshold: 1000, ongoing_checkpoint: HashSet::new(), syncing: Rc::new(RefCell::new(false)), page_size, last_checksum: RefCell::new((0, 0)), } } fn ensure_init(&self) -> Result<()> { if self.file.borrow().is_none() { match self .io .open_file(&self.wal_path, crate::io::OpenFlags::Create, false) { Ok(file) => { if file.size()? > 0 { let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) { Ok(header) => header, Err(err) => panic!("Couldn't read header page: {:?}", err), }; // TODO: Return a completion instead. self.io.run_once()?; self.wal_header.replace(Some(wal_header)); } else { // magic is a single number represented as WAL_MAGIC_LE but the big endian // counterpart is just the same number with LSB set to 1. let magic = if cfg!(target_endian = "big") { WAL_MAGIC_BE } else { WAL_MAGIC_LE }; let mut wal_header = WalHeader { magic, file_format: 3007000, page_size: self.page_size as u32, checkpoint_seq: 0, // TODO implement sequence number salt_1: 0, // TODO implement salt salt_2: 0, checksum_1: 0, checksum_2: 0, }; let native = cfg!(target_endian = "big"); // if target_endian is // already big then we don't care but if isn't, header hasn't yet been // encoded to big endian, therefore we wan't to swap bytes to compute this // checksum. let checksums = *self.last_checksum.borrow_mut(); let checksums = checksum_wal( &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes &wal_header, checksums, native, // this is false because we haven't encoded the wal header yet ); wal_header.checksum_1 = checksums.0; wal_header.checksum_2 = checksums.1; self.last_checksum.replace(checksums); sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; self.wal_header .replace(Some(Rc::new(RefCell::new(wal_header)))); } *self.file.borrow_mut() = Some(file); } Err(err) => panic!("{:?} {}", err, &self.wal_path), }; } Ok(()) } fn frame_offset(&self, frame_id: u64) -> usize { let header = self.wal_header.borrow(); let header = header.as_ref().unwrap().borrow(); let page_size = header.page_size; let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64); let offset = WAL_HEADER_SIZE as u64 + page_offset; offset as usize } }