Merge pull request #18 from penberg/async-io

Asynchronous I/O
This commit is contained in:
Pekka Enberg
2024-01-24 17:22:58 +02:00
committed by GitHub
6 changed files with 44 additions and 26 deletions

View File

@@ -33,10 +33,10 @@ fn main() -> anyhow::Result<()> {
let opts = Opts::parse();
let path = opts.database.to_str().unwrap();
let io = Arc::new(limbo_core::PlatformIO::new()?);
let db = Database::open_file(io, path)?;
let db = Database::open_file(io.clone(), path)?;
let conn = db.connect();
if let Some(sql) = opts.sql {
query(&conn, &sql, &opts.output_mode)?;
query(io.clone(), &conn, &sql, &opts.output_mode)?;
return Ok(());
}
let mut rl = DefaultEditor::new()?;
@@ -51,7 +51,7 @@ fn main() -> anyhow::Result<()> {
match readline {
Ok(line) => {
rl.add_history_entry(line.to_owned())?;
query(&conn, &line, &opts.output_mode)?;
query(io.clone(), &conn, &line, &opts.output_mode)?;
}
Err(ReadlineError::Interrupted) => {
break;
@@ -68,7 +68,7 @@ fn main() -> anyhow::Result<()> {
Ok(())
}
fn query(conn: &limbo_core::Connection, sql: &str, output_mode: &OutputMode) -> anyhow::Result<()> {
fn query(io: Arc<dyn limbo_core::IO>, conn: &limbo_core::Connection, sql: &str, output_mode: &OutputMode) -> anyhow::Result<()> {
match conn.query(sql) {
Ok(Some(ref mut rows)) => match output_mode {
OutputMode::Raw => loop {
@@ -87,7 +87,7 @@ fn query(conn: &limbo_core::Connection, sql: &str, output_mode: &OutputMode) ->
println!();
}
RowResult::IO => {
todo!();
io.run_once()?;
}
RowResult::Done => break,
}
@@ -111,7 +111,7 @@ fn query(conn: &limbo_core::Connection, sql: &str, output_mode: &OutputMode) ->
);
}
RowResult::IO => {
todo!();
io.run_once()?;
}
RowResult::Done => break,
}

View File

@@ -34,6 +34,8 @@ impl IO for LinuxIO {
let mut ring = self.ring.borrow_mut();
ring.submit_and_wait(1)?;
let cqe = ring.completion().next().expect("completion queue is empty");
let c = unsafe { Arc::from_raw(cqe.user_data() as *const Completion) };
c.complete();
Ok(())
}
}
@@ -63,11 +65,6 @@ impl File for LinuxFile {
.push(&read_e)
.expect("submission queue is full");
}
// TODO: move this to run_once()
ring.submit_and_wait(1)?;
let cqe = ring.completion().next().expect("completion queue is empty");
let c = unsafe { Arc::from_raw(cqe.user_data() as *const Completion) };
c.complete();
Ok(())
}
}

View File

@@ -33,14 +33,16 @@ pub struct Database {
impl Database {
#[cfg(feature = "fs")]
pub fn open_file(io: Arc<impl crate::io::IO>, path: &str) -> Result<Database> {
pub fn open_file(io: Arc<dyn crate::io::IO>, path: &str) -> Result<Database> {
let file = io.open_file(path)?;
let storage = storage::PageSource::from_file(file);
Self::open(io, storage)
}
pub fn open(io: Arc<impl crate::io::IO>, page_source: PageSource) -> Result<Database> {
let pager = Arc::new(Pager::open(page_source)?);
pub fn open(io: Arc<dyn crate::io::IO>, page_source: PageSource) -> Result<Database> {
let db_header = Pager::begin_open(&page_source)?;
io.run_once()?;
let pager = Arc::new(Pager::finish_open(db_header, page_source)?);
let bootstrap_schema = Arc::new(Schema::new());
let conn = Connection {
pager: pager.clone(),

View File

@@ -1,14 +1,14 @@
use crate::buffer_pool::BufferPool;
use crate::sqlite3_ondisk;
use crate::sqlite3_ondisk::{self, DatabaseHeader};
use crate::sqlite3_ondisk::BTreePage;
use crate::PageSource;
use concurrent_lru::unsharded::LruCache;
use log::trace;
use std::sync::RwLock;
use std::sync::{RwLock, Mutex};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use log::trace;
pub struct Page {
flags: AtomicUsize,
@@ -74,8 +74,12 @@ pub struct Pager {
}
impl Pager {
pub fn open(page_source: PageSource) -> anyhow::Result<Self> {
let db_header = sqlite3_ondisk::read_database_header(&page_source)?;
pub fn begin_open(page_source: &PageSource) -> anyhow::Result<Arc<Mutex<DatabaseHeader>>> {
sqlite3_ondisk::begin_read_database_header(page_source)
}
pub fn finish_open(db_header: Arc<Mutex<DatabaseHeader>>, page_source: PageSource) -> anyhow::Result<Self> {
let db_header = db_header.lock().unwrap();
let page_size = db_header.page_size as usize;
let buffer_pool = Arc::new(BufferPool::new(page_size));
let page_cache = LruCache::new(10);

View File

@@ -29,8 +29,8 @@ use crate::pager::Page;
use crate::types::{Record, Value};
use crate::PageSource;
use anyhow::{anyhow, Result};
use std::sync::{Arc, Mutex};
use log::trace;
use std::sync::Arc;
/// The size of the database header in bytes.
pub const DATABASE_HEADER_SIZE: usize = 100;
@@ -62,15 +62,23 @@ pub struct DatabaseHeader {
version_number: u32,
}
pub fn read_database_header(page_source: &PageSource) -> Result<DatabaseHeader> {
pub fn begin_read_database_header(page_source: &PageSource) -> Result<Arc<Mutex<DatabaseHeader>>> {
let drop_fn = Arc::new(|_buf| {});
let buf = Buffer::allocate(512, drop_fn);
let complete = Box::new(move |_buf: &Buffer| {});
let result = Arc::new(Mutex::new(DatabaseHeader::default()));
let header = result.clone();
let complete = Box::new(move |buf: &Buffer| {
let header = header.clone();
finish_read_database_header(buf, header).unwrap();
});
let c = Arc::new(Completion::new(buf, complete));
page_source.get(1, c.clone())?;
let buf = c.buf();
Ok(result)
}
fn finish_read_database_header(buf: &Buffer, header: Arc<Mutex<DatabaseHeader>>) -> Result<()> {
let buf = buf.as_slice();
let mut header = DatabaseHeader::default();
let mut header = header.lock().unwrap();
header.magic.copy_from_slice(&buf[0..16]);
header.page_size = u16::from_be_bytes([buf[16], buf[17]]);
header.write_version = buf[18];
@@ -94,7 +102,7 @@ pub fn read_database_header(page_source: &PageSource) -> Result<DatabaseHeader>
header.reserved.copy_from_slice(&buf[72..92]);
header.version_valid_for = u32::from_be_bytes([buf[92], buf[93], buf[94], buf[95]]);
header.version_number = u32::from_be_bytes([buf[96], buf[97], buf[98], buf[99]]);
Ok(header)
Ok(())
}
#[derive(Debug)]

View File

@@ -215,6 +215,7 @@ impl Program {
match cursor.rewind()? {
CursorResult::Ok(()) => {}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
@@ -258,7 +259,13 @@ impl Program {
}
Insn::NextAsync { cursor_id } => {
let cursor = state.cursors.get_mut(cursor_id).unwrap();
cursor.next()?;
match cursor.next()? {
CursorResult::Ok(_) => {}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
state.pc += 1;
}
Insn::NextAwait {