Merge 'In-memory mode' from Preston Thorpe

https://github.com/tursodatabase/limbo/issues/53
This PR implements a (naive) in-memory option and makes it the default
connection when no DB file argument is passed to the CLI. The in-memory
database is also used if a `:memory:` parameter is passed in place of a
path to a database file, replicating SQLite's behavior.
It's slightly more difficult to test for obvious reasons, so I added
some dumb and probably temporary ones until I can craft a better
solution. Let it be noted that I had never touched `tcl` previously if
that wasn't obvious ;)
Also cleaned up a bit of the previous PR, replacing `format!` calls passed
to `writeln` with `write_fmt` to avoid double allocations.
EDIT: I originally had these additional tests running with
`test-compat`, but they would hang whenever running on GitHub Actions
for whatever reason.

Closes #476
This commit is contained in:
Pekka Enberg
2024-12-15 09:20:06 +02:00
6 changed files with 409 additions and 70 deletions

View File

@@ -1,6 +1,6 @@
use crate::opcodes_dictionary::OPCODE_DESCRIPTIONS;
use cli_table::{Cell, Table};
use limbo_core::{Database, RowResult, Value};
use limbo_core::{Database, LimboError, RowResult, Value};
use clap::{Parser, ValueEnum};
use std::{
@@ -121,9 +121,9 @@ pub struct Limbo {
pub prompt: String,
io: Arc<dyn limbo_core::IO>,
writer: Box<dyn Write>,
conn: Option<Rc<limbo_core::Connection>>,
conn: Rc<limbo_core::Connection>,
output_filename: String,
db_file: Option<String>,
db_file: String,
output_mode: OutputMode,
is_stdout: bool,
input_buff: String,
@@ -133,17 +133,16 @@ pub struct Limbo {
impl Limbo {
#[allow(clippy::arc_with_non_send_sync)]
pub fn new(opts: &Opts) -> anyhow::Result<Self> {
let io = Arc::new(limbo_core::PlatformIO::new()?);
let mut db_file = None;
let conn = if let Some(path) = &opts.database {
let path = path.to_str().unwrap();
db_file = Some(path.to_string());
let db = Database::open_file(io.clone(), path)?;
Some(db.connect())
} else {
println!("No database file specified: Use .open <file> to open a database.");
None
let io: Arc<dyn limbo_core::IO> = match opts.database {
Some(ref path) if path.exists() => Arc::new(limbo_core::PlatformIO::new()?),
_ => Arc::new(limbo_core::MemoryIO::new()?),
};
let db_file = opts
.database
.as_ref()
.map_or(":memory:".to_string(), |p| p.to_string_lossy().to_string());
let db = Database::open_file(io.clone(), &db_file)?;
let conn = db.connect();
let writer: Box<dyn Write> = if !opts.output.is_empty() {
Box::new(std::fs::File::create(&opts.output)?)
} else {
@@ -183,23 +182,18 @@ impl Limbo {
fn show_info(&mut self) -> std::io::Result<()> {
self.writeln("------------------------------\nCurrent settings:")?;
self.writeln(format!("Output mode: {}", self.output_mode))?;
self.writeln(format!(
"DB filename: {}",
self.db_file.clone().unwrap_or(":none:".to_string())
))?;
self.writeln(format!("Output: {}", self.output_filename))?;
self.writeln(format!(
"CWD: {}",
std::env::current_dir().unwrap().display()
))?;
let null_value = if self.null_value.is_empty() {
"\'\'".to_string()
} else {
self.null_value.clone()
};
self.writeln(format!("Null value: {}", null_value))?;
self.writer.flush()
let output = format!(
"Output mode: {}\nDB: {}\nOutput: {}\nCWD: {}\nNull value: {}",
self.output_mode,
self.db_file,
self.output_filename,
std::env::current_dir().unwrap().display(),
match self.null_value.is_empty() {
true => "\'\'".to_string(),
false => self.null_value.clone(),
}
);
self.writeln(output)
}
pub fn reset_input(&mut self) {
@@ -207,19 +201,30 @@ impl Limbo {
self.input_buff.clear();
}
pub fn close_conn(&mut self) {
self.conn.as_mut().map(|c| c.close());
pub fn close_conn(&mut self) -> Result<(), LimboError> {
self.conn.close()
}
fn open_db(&mut self, path: &str) -> anyhow::Result<()> {
if self.conn.is_some() {
// close existing connection if open
self.conn.as_mut().unwrap().close()?;
self.conn.close()?;
match path {
":memory:" => {
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::MemoryIO::new()?);
self.io = Arc::clone(&io);
let db = Database::open_file(self.io.clone(), path)?;
self.conn = db.connect();
self.db_file = ":memory:".to_string();
return Ok(());
}
path => {
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
self.io = Arc::clone(&io);
let db = Database::open_file(self.io.clone(), path)?;
self.conn = db.connect();
self.db_file = path.to_string();
return Ok(());
}
}
let db = Database::open_file(self.io.clone(), path)?;
self.conn = Some(db.connect());
self.db_file = Some(path.to_string());
Ok(())
}
fn set_output_file(&mut self, path: &str) -> Result<(), String> {
@@ -245,6 +250,11 @@ impl Limbo {
self.output_mode = mode;
}
/// Writes the formatted arguments, followed by a newline, to the current output writer.
///
/// Returns the first I/O error produced by either write; if writing the
/// message fails, the trailing newline is not attempted.
fn write_fmt(&mut self, fmt: std::fmt::Arguments) -> io::Result<()> {
    // Propagate the failure instead of discarding it, matching `writeln`.
    self.writer.write_fmt(fmt)?;
    self.writer.write_all(b"\n")
}
fn writeln<D: AsRef<[u8]>>(&mut self, data: D) -> io::Result<()> {
self.writer.write_all(data.as_ref())?;
self.writer.write_all(b"\n")
@@ -300,13 +310,16 @@ impl Limbo {
}
if let Ok(ref cmd) = Command::from_str(args[0]) {
if args.len() < cmd.min_args() {
let _ = self.writeln(format!("Insufficient arguments: USAGE: {}", cmd.useage()));
let _ = self.write_fmt(format_args!(
"Insufficient arguments: USAGE: {}",
cmd.useage()
));
return;
}
match cmd {
Command::Quit => {
let _ = self.writeln("Exiting Limbo SQL Shell.");
self.close_conn();
let _ = self.close_conn();
std::process::exit(0)
}
Command::Open => {
@@ -315,10 +328,6 @@ impl Limbo {
}
}
Command::Schema => {
if self.conn.is_none() {
let _ = self.writeln("Error: no database currently open");
return;
}
let table_name = args.get(1).copied();
if let Err(e) = self.display_schema(table_name) {
let _ = self.writeln(e.to_string());
@@ -328,12 +337,12 @@ impl Limbo {
if args.len() > 1 {
for op in &OPCODE_DESCRIPTIONS {
if op.name.eq_ignore_ascii_case(args.get(1).unwrap().trim()) {
let _ = self.writeln(format!("{}", op));
let _ = self.write_fmt(format_args!("{}", op));
}
}
} else {
for op in &OPCODE_DESCRIPTIONS {
let _ = self.writeln(format!("{}\n", op));
let _ = self.write_fmt(format_args!("{}\n", op));
}
}
}
@@ -351,7 +360,7 @@ impl Limbo {
Command::SetOutput => {
if args.len() == 2 {
if let Err(e) = self.set_output_file(args[1]) {
let _ = self.writeln(format!("Error: {}", e));
let _ = self.write_fmt(format_args!("Error: {}", e));
}
} else {
self.set_output_stdout();
@@ -368,7 +377,7 @@ impl Limbo {
}
}
} else {
let _ = self.writeln(format!(
let _ = self.write_fmt(format_args!(
"Unknown command: {}\nenter: .help for all available commands",
args[0]
));
@@ -376,12 +385,7 @@ impl Limbo {
}
pub fn query(&mut self, sql: &str, interrupt_count: &Arc<AtomicUsize>) -> anyhow::Result<()> {
if self.conn.is_none() {
let _ = self.writeln("Error: No database file specified.");
return Ok(());
}
let conn = self.conn.as_ref().unwrap().clone();
match conn.query(sql) {
match self.conn.query(sql) {
Ok(Some(ref mut rows)) => match self.output_mode {
OutputMode::Raw => loop {
if interrupt_count.load(Ordering::SeqCst) > 0 {
@@ -451,13 +455,13 @@ impl Limbo {
}
Ok(RowResult::Done) => break,
Err(err) => {
let _ = self.writeln(format!("{}", err));
let _ = self.write_fmt(format_args!("{}", err));
break;
}
}
}
if let Ok(table) = table_rows.table().display() {
let _ = self.writeln(format!("{}", table));
let _ = self.write_fmt(format_args!("{}", table));
} else {
let _ = self.writeln("Error displaying table.");
}
@@ -465,11 +469,11 @@ impl Limbo {
},
Ok(None) => {}
Err(err) => {
let _ = self.writeln(format!("{}", err));
let _ = self.write_fmt(format_args!("{}", err));
}
}
// for now let's cache flush always
conn.cacheflush()?;
self.conn.cacheflush()?;
Ok(())
}
@@ -484,14 +488,14 @@ impl Limbo {
),
};
match self.conn.as_ref().unwrap().query(&sql) {
match self.conn.query(&sql) {
Ok(Some(ref mut rows)) => {
let mut found = false;
loop {
match rows.next_row()? {
RowResult::Row(row) => {
if let Some(Value::Text(schema)) = row.values.first() {
let _ = self.writeln(format!("{};", schema));
let _ = self.write_fmt(format_args!("{};", schema));
found = true;
}
}
@@ -503,7 +507,8 @@ impl Limbo {
}
if !found {
if let Some(table_name) = table {
let _ = self.writeln(format!("Error: Table '{}' not found.", table_name));
let _ = self
.write_fmt(format_args!("Error: Table '{}' not found.", table_name));
} else {
let _ = self.writeln("No tables or indexes found in the database.");
}

View File

@@ -3,8 +3,10 @@ mod opcodes_dictionary;
use clap::Parser;
use rustyline::{error::ReadlineError, DefaultEditor};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[allow(clippy::arc_with_non_send_sync)]
fn main() -> anyhow::Result<()> {
@@ -29,8 +31,6 @@ fn main() -> anyhow::Result<()> {
}
return Ok(());
}
println!("Limbo v{}", env!("CARGO_PKG_VERSION"));
println!("Enter \".help\" for usage hints.");
let mut rl = DefaultEditor::new()?;
let home = dirs::home_dir().expect("Could not determine home directory");
let history_file = home.join(".limbo_history");
@@ -50,7 +50,7 @@ fn main() -> anyhow::Result<()> {
// At prompt, increment interrupt count
if interrupt_count.fetch_add(1, Ordering::SeqCst) >= 1 {
eprintln!("Interrupted. Exiting...");
app.close_conn();
let _ = app.close_conn();
break;
}
println!("Use .quit to exit or press Ctrl-C again to force quit.");
@@ -58,11 +58,11 @@ fn main() -> anyhow::Result<()> {
continue;
}
Err(ReadlineError::Eof) => {
app.close_conn();
let _ = app.close_conn();
break;
}
Err(err) => {
app.close_conn();
let _ = app.close_conn();
anyhow::bail!(err)
}
}

182
core/io/memory.rs Normal file
View File

@@ -0,0 +1,182 @@
use super::{Buffer, Completion, File, OpenFlags, IO};
use crate::Result;
use std::{
cell::{RefCell, RefMut},
collections::BTreeMap,
rc::Rc,
sync::Arc,
};
/// In-memory storage backend shared by every [`MemoryFile`] it opens.
///
/// Data is kept as fixed-size pages keyed by page number; a page that was
/// never written is simply absent from the map.
pub struct MemoryIO {
/// Allocated pages, keyed by page number (byte offset divided by `PAGE_SIZE`).
pages: RefCell<BTreeMap<usize, MemPage>>,
/// Logical size in bytes: the largest end offset of any write so far.
size: RefCell<usize>,
}
// TODO: page size flag
/// Size in bytes of one in-memory page; byte offsets are split into a page
/// number and an in-page offset using this value.
const PAGE_SIZE: usize = 4096;
/// One heap-allocated page of exactly `PAGE_SIZE` bytes.
type MemPage = Box<[u8; PAGE_SIZE]>;
impl MemoryIO {
    /// Creates an empty store (no pages, size zero) wrapped in an `Arc`.
    ///
    /// The `Arc` is returned directly because `IO` is implemented for
    /// `Arc<MemoryIO>` and every opened file keeps a clone of it.
    #[allow(clippy::arc_with_non_send_sync)]
    pub fn new() -> Result<Arc<Self>> {
        let store = Self {
            pages: RefCell::new(BTreeMap::new()),
            size: RefCell::new(0),
        };
        Ok(Arc::new(store))
    }

    /// Returns a mutable handle to page `page_no`, inserting a zero-filled
    /// page first when it does not exist yet.
    ///
    /// The returned guard keeps the whole page map mutably borrowed until it
    /// is dropped.
    fn get_or_allocate_page(&self, page_no: usize) -> RefMut<MemPage> {
        RefMut::map(self.pages.borrow_mut(), |map| {
            map.entry(page_no).or_insert_with(|| Box::new([0; PAGE_SIZE]))
        })
    }

    /// Returns a handle to page `page_no` if it has been allocated, or `None`
    /// otherwise. Never allocates.
    fn get_page(&self, page_no: usize) -> Option<RefMut<MemPage>> {
        let guard = self.pages.borrow_mut();
        RefMut::filter_map(guard, |map| map.get_mut(&page_no)).ok()
    }
}
impl IO for Arc<MemoryIO> {
    /// Returns a file handle backed by this store.
    ///
    /// The path, flags and direct-I/O hint are all ignored: every handle
    /// shares the same pages.
    fn open_file(&self, _path: &str, _flags: OpenFlags, _direct: bool) -> Result<Rc<dyn File>> {
        let file = MemoryFile {
            io: Arc::clone(self),
        };
        Ok(Rc::new(file))
    }

    /// Nothing to drive: reads and writes complete inside `pread`/`pwrite`.
    fn run_once(&self) -> Result<()> {
        Ok(())
    }

    /// Returns eight bytes from `getrandom`, reinterpreted as an `i64` in
    /// native byte order. Panics if `getrandom` fails.
    fn generate_random_number(&self) -> i64 {
        let mut bytes = [0u8; 8];
        getrandom::getrandom(&mut bytes).unwrap();
        i64::from_ne_bytes(bytes)
    }

    /// Returns the current local time formatted as `YYYY-MM-DD HH:MM:SS`.
    fn get_current_time(&self) -> String {
        let now = chrono::Local::now();
        now.format("%Y-%m-%d %H:%M:%S").to_string()
    }
}
/// File handle over a [`MemoryIO`] store; all reads and writes go to the
/// pages held by `io`.
pub struct MemoryFile {
/// Shared backing store holding the pages and the logical size.
io: Arc<MemoryIO>,
}
impl File for MemoryFile {
    /// No-op: this implementation performs no locking.
    fn lock_file(&self, _exclusive: bool) -> Result<()> {
        Ok(())
    }

    /// No-op counterpart of [`lock_file`](Self::lock_file).
    fn unlock_file(&self) -> Result<()> {
        Ok(())
    }

    /// Reads into the buffer of the read completion `c`, starting at byte
    /// offset `pos`.
    ///
    /// At most `min(buffer length, size - pos)` bytes are copied. Ranges that
    /// fall on pages that were never allocated read back as zeros. The
    /// completion is invoked with the number of bytes read, which is 0 when
    /// the buffer is empty or `pos` is at or past the end of the data.
    ///
    /// # Panics
    /// Panics if `c` is not a `Completion::Read`.
    fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
        let r = match &*c {
            Completion::Read(r) => r,
            _ => unreachable!("pread requires a read completion"),
        };
        let buf_len = r.buf().len();
        if buf_len == 0 {
            c.complete(0);
            return Ok(());
        }

        let file_size = *self.io.size.borrow();
        if pos >= file_size {
            c.complete(0);
            return Ok(());
        }

        let read_len = buf_len.min(file_size - pos);
        {
            // Scoped so the buffer borrow is released before the completion
            // callback runs.
            let mut read_buf = r.buf_mut();
            let mut offset = pos;
            let mut remaining = read_len;
            let mut buf_offset = 0;

            while remaining > 0 {
                let page_no = offset / PAGE_SIZE;
                let page_offset = offset % PAGE_SIZE;
                // Never cross a page boundary in a single copy.
                let bytes_to_read = remaining.min(PAGE_SIZE - page_offset);
                let dest = &mut read_buf.as_mut_slice()[buf_offset..buf_offset + bytes_to_read];
                match self.io.get_page(page_no) {
                    Some(page) => {
                        dest.copy_from_slice(&page[page_offset..page_offset + bytes_to_read])
                    }
                    // Hole: the page was never written, so it reads as zeros.
                    None => dest.fill(0),
                }

                offset += bytes_to_read;
                buf_offset += bytes_to_read;
                remaining -= bytes_to_read;
            }
        }
        c.complete(read_len as i32);
        Ok(())
    }

    /// Writes the whole of `buffer` at byte offset `pos`, allocating pages as
    /// needed, then grows the logical size to cover the written range.
    ///
    /// The completion is invoked with the number of bytes written. The borrow
    /// of `buffer` is always released before the completion runs, so a
    /// callback is free to borrow the buffer again, even mutably.
    fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()> {
        // Temporary borrow only: nothing is held across `c.complete`.
        let buf_len = buffer.borrow().len();
        if buf_len == 0 {
            c.complete(0);
            return Ok(());
        }

        {
            let buf = buffer.borrow();
            let data = buf.as_slice();
            let mut offset = pos;
            let mut remaining = buf_len;
            let mut buf_offset = 0;

            while remaining > 0 {
                let page_no = offset / PAGE_SIZE;
                let page_offset = offset % PAGE_SIZE;
                // Never cross a page boundary in a single copy.
                let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
                {
                    let mut page = self.io.get_or_allocate_page(page_no);
                    page[page_offset..page_offset + bytes_to_write]
                        .copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
                }

                offset += bytes_to_write;
                buf_offset += bytes_to_write;
                remaining -= bytes_to_write;
            }
        }

        {
            let mut size = self.io.size.borrow_mut();
            *size = (*size).max(pos + buf_len);
        }
        c.complete(buf_len as i32);
        Ok(())
    }

    /// No-op: there is nothing to flush, so the completion is invoked
    /// immediately with 0.
    fn sync(&self, c: Rc<Completion>) -> Result<()> {
        c.complete(0);
        Ok(())
    }

    /// Returns the logical size of the stored data in bytes.
    fn size(&self) -> Result<u64> {
        Ok(*self.io.size.borrow() as u64)
    }
}
/// Dropping a handle performs no explicit work: the pages live in the shared
/// [`MemoryIO`], not in the handle itself.
impl Drop for MemoryFile {
    fn drop(&mut self) {}
}

View File

@@ -185,4 +185,6 @@ cfg_block! {
}
}
mod memory;
pub use memory::MemoryIO;
mod common;

View File

@@ -42,7 +42,7 @@ pub type Result<T> = std::result::Result<T, error::LimboError>;
pub use io::OpenFlags;
#[cfg(feature = "fs")]
pub use io::PlatformIO;
pub use io::{Buffer, Completion, File, WriteCompletion, IO};
pub use io::{Buffer, Completion, File, MemoryIO, WriteCompletion, IO};
pub use storage::buffer_pool::BufferPool;
pub use storage::database::DatabaseStorage;
pub use storage::pager::Page;

150
testing/memory-repl.tcl Executable file
View File

@@ -0,0 +1,150 @@
#!/usr/bin/env tclsh
# Executable under test: taken from the SQLITE_EXEC environment variable when it is set, otherwise sqlite3.
set sqlite_exec [expr {[info exists env(SQLITE_EXEC)] ? $env(SQLITE_EXEC) : "sqlite3"}]
# Start an interactive REPL on an in-memory database and feed it the setup text.
#
# sqlite_exec   - executable to launch; treated as a single word, so the path
#                 may contain spaces
# init_commands - text written to the REPL right after start-up
# Returns the read/write channel connected to the process.
proc start_sqlite_repl {sqlite_exec init_commands} {
    # Hand open a proper list: flattening it with join would split an
    # executable path containing spaces into several arguments.
    set pipe [open |[list $sqlite_exec ":memory:"] RDWR]
    puts $pipe $init_commands
    flush $pipe
    return $pipe
}
# Send sql to the REPL and return everything it prints, trimmed, up to a
# sentinel row.
#
# A marker query is sent right after the statement; output lines are collected
# until the marker line shows up or the channel reaches end of file.
proc execute_sql {pipe sql} {
    puts $pipe $sql
    flush $pipe
    puts $pipe "SELECT 'END_OF_RESULT';"
    flush $pipe

    set collected [list]
    while {1} {
        if {[gets $pipe line] < 0} {
            break
        }
        if {$line eq "END_OF_RESULT"} {
            break
        }
        lappend collected $line
    }
    return [string trim [join $collected "\n"]]
}
# Run sql through the REPL and compare its output with expected_output.
# On a mismatch, print the statement plus both values and exit with status 1.
proc run_test {pipe sql expected_output} {
    set got [execute_sql $pipe $sql]
    if {$got eq $expected_output} {
        return
    }
    puts "Test FAILED: '$sql'"
    puts "returned '$got'"
    puts "expected '$expected_output'"
    exit 1
}
# Announce the test by name, then check that sql produces expected_output.
proc do_execsql_test {pipe test_name sql expected_output} {
    puts [format "Running test: %s" $test_name]
    run_test $pipe $sql $expected_output
}
# Setup script sent to the REPL at start-up. It creates the users, products
# and t tables and inserts the fixture rows that the in-memory tests query.
set init_commands {
CREATE TABLE users (
id INTEGER PRIMARY KEY,
first_name TEXT,
last_name TEXT,
age INTEGER
);
CREATE TABLE products (
id INTEGER PRIMARY KEY,
name TEXT,
price INTEGER
);
INSERT INTO users (id, first_name, last_name, age) VALUES
(1, 'Alice', 'Smith', 30), (2, 'Bob', 'Johnson', 25), (3, 'Charlie', 'Brown', 66), (4, 'David', 'Nichols', 70); INSERT INTO products (id, name, price) VALUES (1, 'Hat', 19.99), (2, 'Shirt', 29.99), (3, 'Shorts', 39.99), (4, 'Dress', 49.99);
CREATE TABLE t(x1, x2, x3, x4);
INSERT INTO t VALUES (zeroblob(1024 - 1), zeroblob(1024 - 2), zeroblob(1024 - 3), zeroblob(1024 - 4));
}
# Launch the REPL once; every test below talks to this same process.
set pipe [start_sqlite_repl $sqlite_exec $init_commands]
# The schema dump should list the three tables created by the setup script.
do_execsql_test $pipe schema {
.schema
} {CREATE TABLE users (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, age INTEGER);
CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price INTEGER);
CREATE TABLE t (x1, x2, x3, x4);}
# Aggregate functions over the users fixture rows, on both a numeric and a text column.
do_execsql_test $pipe "select-avg" {
SELECT avg(age) FROM users;
} {47.75}
do_execsql_test $pipe select-avg-text {
SELECT avg(first_name) FROM users;
} {0.0}
do_execsql_test $pipe select-sum {
SELECT sum(age) FROM users;
} {191}
do_execsql_test $pipe select-sum-text {
SELECT sum(first_name) FROM users;
} {0.0}
do_execsql_test $pipe select-total {
SELECT total(age) FROM users;
} {191.0}
do_execsql_test $pipe select-total-text {
SELECT total(first_name) FROM users WHERE id < 3;
} {0.0}
# typeof on the integer primary key column.
do_execsql_test $pipe select-limit {
SELECT typeof(id) FROM users LIMIT 1;
} {integer}
# Row count through a named column.
do_execsql_test $pipe select-count {
SELECT count(id) FROM users;
} {4}
# Row count through the star form; it has its own name so that the
# "Running test" line identifies which of the two count queries is running.
do_execsql_test $pipe select-count-star {
SELECT count(*) FROM users;
} {4}
# Counting with a constant WHERE condition: true keeps every row, false keeps none.
do_execsql_test $pipe select-count-constant-true {
SELECT count(*) FROM users WHERE true;
} {4}
do_execsql_test $pipe select-count-constant-false {
SELECT count(*) FROM users WHERE false;
} {0}
# Hack: point the running REPL at the on-disk test database to check that a
# new connection can be opened; the command itself is expected to print nothing.
do_execsql_test $pipe test-open-new-db {
.open testing/testing.db
} {}
# The remaining tests are borrowed from other test files; their expected
# values come from testing/testing.db, so they only pass if the switch worked.
do_execsql_test $pipe schema-1 {
.schema users
} {CREATE TABLE users (
id INTEGER PRIMARY KEY,
first_name TEXT,
last_name TEXT,
email TEXT,
phone_number TEXT,
address TEXT,
city TEXT,
state TEXT,
zipcode TEXT,
age INTEGER
);
CREATE INDEX age_idx on users (age);}
do_execsql_test $pipe cross-join {
select * from users, products limit 1;
} {1|Jamie|Foster|dylan00@example.com|496-522-9493|62375 Johnson Rest Suite 322|West Lauriestad|IL|35865|94|1|hat|79.0}
do_execsql_test $pipe left-join-self {
select u1.first_name as user_name, u2.first_name as neighbor_name from users u1 left join users as u2 on u1.id = u2.id + 1 limit 2;
} {Jamie|
Cindy|Jamie}
do_execsql_test $pipe where-clause-eq-string {
select count(1) from users where last_name = 'Rodriguez';
} {61}
# Reached only when no test exited early: report success and shut the REPL down.
puts "All tests passed successfully."
close $pipe
exit 0