diff --git a/cli/main.rs b/cli/main.rs index 254b159c4..49232ac17 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -33,10 +33,10 @@ fn main() -> anyhow::Result<()> { let opts = Opts::parse(); let path = opts.database.to_str().unwrap(); let io = Arc::new(limbo_core::PlatformIO::new()?); - let db = Database::open_file(io, path)?; + let db = Database::open_file(io.clone(), path)?; let conn = db.connect(); if let Some(sql) = opts.sql { - query(&conn, &sql, &opts.output_mode)?; + query(io.clone(), &conn, &sql, &opts.output_mode)?; return Ok(()); } let mut rl = DefaultEditor::new()?; @@ -51,7 +51,7 @@ fn main() -> anyhow::Result<()> { match readline { Ok(line) => { rl.add_history_entry(line.to_owned())?; - query(&conn, &line, &opts.output_mode)?; + query(io.clone(), &conn, &line, &opts.output_mode)?; } Err(ReadlineError::Interrupted) => { break; @@ -68,7 +68,7 @@ fn main() -> anyhow::Result<()> { Ok(()) } -fn query(conn: &limbo_core::Connection, sql: &str, output_mode: &OutputMode) -> anyhow::Result<()> { +fn query(io: Arc, conn: &limbo_core::Connection, sql: &str, output_mode: &OutputMode) -> anyhow::Result<()> { match conn.query(sql) { Ok(Some(ref mut rows)) => match output_mode { OutputMode::Raw => loop { @@ -87,7 +87,7 @@ fn query(conn: &limbo_core::Connection, sql: &str, output_mode: &OutputMode) -> println!(); } RowResult::IO => { - todo!(); + io.run_once()?; } RowResult::Done => break, } @@ -111,7 +111,7 @@ fn query(conn: &limbo_core::Connection, sql: &str, output_mode: &OutputMode) -> ); } RowResult::IO => { - todo!(); + io.run_once()?; } RowResult::Done => break, } diff --git a/core/io/linux.rs b/core/io/linux.rs index e745c82c1..959b9014a 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -34,6 +34,8 @@ impl IO for LinuxIO { let mut ring = self.ring.borrow_mut(); ring.submit_and_wait(1)?; let cqe = ring.completion().next().expect("completion queue is empty"); + let c = unsafe { Arc::from_raw(cqe.user_data() as *const Completion) }; + c.complete(); Ok(()) } } @@ -63,11 +65,6 @@ impl File for LinuxFile { .push(&read_e) .expect("submission queue is full"); } - // TODO: move this to run_once() - ring.submit_and_wait(1)?; - let cqe = ring.completion().next().expect("completion queue is empty"); - let c = unsafe { Arc::from_raw(cqe.user_data() as *const Completion) }; - c.complete(); Ok(()) } } \ No newline at end of file diff --git a/core/lib.rs b/core/lib.rs index 340aa269d..0bae65495 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -33,14 +33,16 @@ pub struct Database { impl Database { #[cfg(feature = "fs")] - pub fn open_file(io: Arc, path: &str) -> Result { + pub fn open_file(io: Arc, path: &str) -> Result { let file = io.open_file(path)?; let storage = storage::PageSource::from_file(file); Self::open(io, storage) } - pub fn open(io: Arc, page_source: PageSource) -> Result { - let pager = Arc::new(Pager::open(page_source)?); + pub fn open(io: Arc, page_source: PageSource) -> Result { + let db_header = Pager::begin_open(&page_source)?; + io.run_once()?; + let pager = Arc::new(Pager::finish_open(db_header, page_source)?); let bootstrap_schema = Arc::new(Schema::new()); let conn = Connection { pager: pager.clone(), diff --git a/core/pager.rs b/core/pager.rs index 124216754..e6ae09a80 100644 --- a/core/pager.rs +++ b/core/pager.rs @@ -1,14 +1,14 @@ use crate::buffer_pool::BufferPool; -use crate::sqlite3_ondisk; +use crate::sqlite3_ondisk::{self, DatabaseHeader}; use crate::sqlite3_ondisk::BTreePage; use crate::PageSource; use concurrent_lru::unsharded::LruCache; -use log::trace; -use std::sync::RwLock; +use std::sync::{RwLock, Mutex}; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; +use log::trace; pub struct Page { flags: AtomicUsize, @@ -74,8 +74,12 @@ pub struct Pager { } impl Pager { - pub fn open(page_source: PageSource) -> anyhow::Result { - let db_header = sqlite3_ondisk::read_database_header(&page_source)?; + pub fn begin_open(page_source: &PageSource) -> anyhow::Result>> { + sqlite3_ondisk::begin_read_database_header(page_source) + } + + pub fn finish_open(db_header: Arc>, page_source: PageSource) -> anyhow::Result { + let db_header = db_header.lock().unwrap(); let page_size = db_header.page_size as usize; let buffer_pool = Arc::new(BufferPool::new(page_size)); let page_cache = LruCache::new(10); diff --git a/core/sqlite3_ondisk.rs b/core/sqlite3_ondisk.rs index 089a2cc59..02a6a4aab 100644 --- a/core/sqlite3_ondisk.rs +++ b/core/sqlite3_ondisk.rs @@ -29,8 +29,8 @@ use crate::pager::Page; use crate::types::{Record, Value}; use crate::PageSource; use anyhow::{anyhow, Result}; +use std::sync::{Arc, Mutex}; use log::trace; -use std::sync::Arc; /// The size of the database header in bytes. pub const DATABASE_HEADER_SIZE: usize = 100; @@ -62,15 +62,23 @@ pub struct DatabaseHeader { version_number: u32, } -pub fn read_database_header(page_source: &PageSource) -> Result { +pub fn begin_read_database_header(page_source: &PageSource) -> Result>> { let drop_fn = Arc::new(|_buf| {}); let buf = Buffer::allocate(512, drop_fn); - let complete = Box::new(move |_buf: &Buffer| {}); + let result = Arc::new(Mutex::new(DatabaseHeader::default())); + let header = result.clone(); + let complete = Box::new(move |buf: &Buffer| { + let header = header.clone(); + finish_read_database_header(buf, header).unwrap(); + }); let c = Arc::new(Completion::new(buf, complete)); page_source.get(1, c.clone())?; - let buf = c.buf(); + Ok(result) +} + +fn finish_read_database_header(buf: &Buffer, header: Arc>) -> Result<()> { let buf = buf.as_slice(); - let mut header = DatabaseHeader::default(); + let mut header = header.lock().unwrap(); header.magic.copy_from_slice(&buf[0..16]); header.page_size = u16::from_be_bytes([buf[16], buf[17]]); header.write_version = buf[18]; @@ -94,7 +102,7 @@ pub fn read_database_header(page_source: &PageSource) -> Result header.reserved.copy_from_slice(&buf[72..92]); header.version_valid_for = u32::from_be_bytes([buf[92], buf[93], buf[94], buf[95]]); header.version_number = u32::from_be_bytes([buf[96], buf[97], buf[98], buf[99]]); - Ok(header) + Ok(()) } #[derive(Debug)] diff --git a/core/vdbe.rs b/core/vdbe.rs index 158591bd7..3c53a2d7e 100644 --- a/core/vdbe.rs +++ b/core/vdbe.rs @@ -215,6 +215,7 @@ impl Program { match cursor.rewind()? { CursorResult::Ok(()) => {} CursorResult::IO => { + // If there is I/O, the instruction is restarted. return Ok(StepResult::IO); } } @@ -258,7 +259,13 @@ impl Program { } Insn::NextAsync { cursor_id } => { let cursor = state.cursors.get_mut(cursor_id).unwrap(); - cursor.next()?; + match cursor.next()? { + CursorResult::Ok(_) => {} + CursorResult::IO => { + // If there is I/O, the instruction is restarted. + return Ok(StepResult::IO); + } + } state.pc += 1; } Insn::NextAwait {