From 5da87739fa06a2771631e6e61615b5a70cd98857 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 15 May 2023 14:32:22 +0200 Subject: [PATCH] bindings: split transcation begin from insert/read libSQL expects to be able to begin/commit a transaction independently of reading or inserting data. --- core/mvcc/bindings/c/include/mvcc.h | 10 ++++++- core/mvcc/bindings/c/src/lib.rs | 46 ++++++++++++++++++++++++----- core/mvcc/mvcc-rs/src/cursor.rs | 9 ++++-- core/mvcc/mvcc-rs/src/database.rs | 2 +- 4 files changed, 54 insertions(+), 13 deletions(-) diff --git a/core/mvcc/bindings/c/include/mvcc.h b/core/mvcc/bindings/c/include/mvcc.h index e0c432cd9..eead91b01 100644 --- a/core/mvcc/bindings/c/include/mvcc.h +++ b/core/mvcc/bindings/c/include/mvcc.h @@ -25,13 +25,21 @@ MVCCDatabaseRef MVCCDatabaseOpen(const char *path); void MVCCDatabaseClose(MVCCDatabaseRef db); +uint64_t MVCCTransactionBegin(MVCCDatabaseRef db); + +MVCCError MVCCTransactionCommit(MVCCDatabaseRef db, uint64_t tx_id); + +MVCCError MVCCTransactionRollback(MVCCDatabaseRef db, uint64_t tx_id); + MVCCError MVCCDatabaseInsert(MVCCDatabaseRef db, + uint64_t tx_id, uint64_t table_id, uint64_t row_id, const void *value_ptr, uintptr_t value_len); MVCCError MVCCDatabaseRead(MVCCDatabaseRef db, + uint64_t tx_id, uint64_t table_id, uint64_t row_id, uint8_t **value_ptr, @@ -39,7 +47,7 @@ MVCCError MVCCDatabaseRead(MVCCDatabaseRef db, void MVCCFreeStr(void *ptr); -MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db, uint64_t table_id); +MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db, uint64_t tx_id, uint64_t table_id); void MVCCScanCursorClose(MVCCScanCursorRef cursor); diff --git a/core/mvcc/bindings/c/src/lib.rs b/core/mvcc/bindings/c/src/lib.rs index 18e08db0b..9a64ddee5 100644 --- a/core/mvcc/bindings/c/src/lib.rs +++ b/core/mvcc/bindings/c/src/lib.rs @@ -55,9 +55,42 @@ pub unsafe extern "C" fn MVCCDatabaseClose(db: MVCCDatabaseRef) { let _ = unsafe { Box::from_raw(db.get_ref_mut()) }; } +#[no_mangle] +pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 { + let db = db.get_ref(); + let (db, runtime) = (&db.db, &db.runtime); + let tx_id = runtime.block_on(async move { db.begin_tx().await }); + tracing::debug!("MVCCTransactionBegin: {tx_id}"); + tx_id +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError { + let db = db.get_ref(); + let (db, runtime) = (&db.db, &db.runtime); + tracing::debug!("MVCCTransactionCommit: {tx_id}"); + match runtime.block_on(async move { db.commit_tx(tx_id).await }) { + Ok(()) => MVCCError::MVCC_OK, + Err(e) => { + tracing::error!("MVCCTransactionCommit: {e}"); + MVCCError::MVCC_IO_ERROR_WRITE + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCTransactionRollback(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError { + let db = db.get_ref(); + let (db, runtime) = (&db.db, &db.runtime); + tracing::debug!("MVCCTransactionRollback: {tx_id}"); + runtime.block_on(async move { db.rollback_tx(tx_id).await }); + MVCCError::MVCC_OK +} + #[no_mangle] pub unsafe extern "C" fn MVCCDatabaseInsert( db: MVCCDatabaseRef, + tx_id: u64, table_id: u64, row_id: u64, value_ptr: *const std::ffi::c_void, @@ -77,11 +110,7 @@ pub unsafe extern "C" fn MVCCDatabaseInsert( let id = database::RowID { table_id, row_id }; let row = database::Row { id, data }; tracing::debug!("MVCCDatabaseInsert: {row:?}"); - match runtime.block_on(async move { - let tx = db.begin_tx().await; - db.insert(tx, row).await?; - db.commit_tx(tx).await - }) { + match runtime.block_on(async move { db.insert(tx_id, row).await }) { Ok(_) => { tracing::debug!("MVCCDatabaseInsert: success"); MVCCError::MVCC_OK @@ -96,6 +125,7 @@ pub unsafe extern "C" fn MVCCDatabaseInsert( #[no_mangle] pub unsafe extern "C" fn MVCCDatabaseRead( db: MVCCDatabaseRef, + tx_id: u64, table_id: u64, row_id: u64, value_ptr: *mut *mut u8, @@ -105,9 +135,8 @@ pub unsafe extern "C" fn MVCCDatabaseRead( let (db, runtime) = (&db.db, &db.runtime); match runtime.block_on(async move { - let tx = db.begin_tx().await; let id = database::RowID { table_id, row_id }; - let maybe_row = db.read(tx, id).await?; + let maybe_row = db.read(tx_id, id).await?; match maybe_row { Some(row) => { tracing::debug!("Found row {row:?}"); @@ -148,6 +177,7 @@ pub unsafe extern "C" fn MVCCFreeStr(ptr: *mut std::ffi::c_void) { #[no_mangle] pub unsafe extern "C" fn MVCCScanCursorOpen( db: MVCCDatabaseRef, + tx_id: u64, table_id: u64, ) -> MVCCScanCursorRef { tracing::debug!("MVCCScanCursorOpen()"); @@ -156,7 +186,7 @@ pub unsafe extern "C" fn MVCCScanCursorOpen( let database = unsafe { std::mem::transmute::<&DbContext, &'static DbContext>(db.get_ref()) }; let (database, runtime) = (&database.db, &database.runtime); match runtime - .block_on(async move { mvcc_rs::cursor::ScanCursor::new(database, table_id).await }) + .block_on(async move { mvcc_rs::cursor::ScanCursor::new(database, tx_id, table_id).await }) { Ok(cursor) => { if cursor.is_empty() { diff --git a/core/mvcc/mvcc-rs/src/cursor.rs b/core/mvcc/mvcc-rs/src/cursor.rs index e289093ff..1c761f663 100644 --- a/core/mvcc/mvcc-rs/src/cursor.rs +++ b/core/mvcc/mvcc-rs/src/cursor.rs @@ -10,8 +10,11 @@ pub struct ScanCursor<'a, Clock: LogicalClock> { } impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { - pub async fn new(db: &'a Database, table_id: u64) -> Result> { - let tx_id = db.begin_tx().await; + pub async fn new( + db: &'a Database, + tx_id: u64, + table_id: u64, + ) -> Result> { let row_ids = db.scan_row_ids_for_table(table_id).await?; Ok(Self { db, @@ -37,7 +40,7 @@ impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { } pub async fn close(self) -> Result<()> { - self.db.commit_tx(self.tx_id).await + Ok(()) } pub fn forward(&mut self) -> bool { diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs index 8d6521756..1c885be6e 100644 --- a/core/mvcc/mvcc-rs/src/database.rs +++ b/core/mvcc/mvcc-rs/src/database.rs @@ -137,7 +137,7 @@ impl Database { rows: RefCell::new(BTreeMap::new()), txs: RefCell::new(HashMap::new()), tx_timestamps: RefCell::new(BTreeMap::new()), - tx_ids: AtomicU64::new(0), + tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes clock, storage, };