Merge pull request #44 from penberg/splittx

bindings: split transcation begin from insert/read
This commit is contained in:
Pekka Enberg
2023-05-15 15:48:34 +03:00
committed by GitHub
4 changed files with 54 additions and 13 deletions

View File

@@ -25,13 +25,21 @@ MVCCDatabaseRef MVCCDatabaseOpen(const char *path);
void MVCCDatabaseClose(MVCCDatabaseRef db);
uint64_t MVCCTransactionBegin(MVCCDatabaseRef db);
MVCCError MVCCTransactionCommit(MVCCDatabaseRef db, uint64_t tx_id);
MVCCError MVCCTransactionRollback(MVCCDatabaseRef db, uint64_t tx_id);
MVCCError MVCCDatabaseInsert(MVCCDatabaseRef db,
uint64_t tx_id,
uint64_t table_id,
uint64_t row_id,
const void *value_ptr,
uintptr_t value_len);
MVCCError MVCCDatabaseRead(MVCCDatabaseRef db,
uint64_t tx_id,
uint64_t table_id,
uint64_t row_id,
uint8_t **value_ptr,
@@ -39,7 +47,7 @@ MVCCError MVCCDatabaseRead(MVCCDatabaseRef db,
void MVCCFreeStr(void *ptr);
MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db, uint64_t table_id);
MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db, uint64_t tx_id, uint64_t table_id);
void MVCCScanCursorClose(MVCCScanCursorRef cursor);

View File

@@ -55,9 +55,42 @@ pub unsafe extern "C" fn MVCCDatabaseClose(db: MVCCDatabaseRef) {
let _ = unsafe { Box::from_raw(db.get_ref_mut()) };
}
#[no_mangle]
pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 {
let db = db.get_ref();
let (db, runtime) = (&db.db, &db.runtime);
let tx_id = runtime.block_on(async move { db.begin_tx().await });
tracing::debug!("MVCCTransactionBegin: {tx_id}");
tx_id
}
#[no_mangle]
pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError {
let db = db.get_ref();
let (db, runtime) = (&db.db, &db.runtime);
tracing::debug!("MVCCTransactionCommit: {tx_id}");
match runtime.block_on(async move { db.commit_tx(tx_id).await }) {
Ok(()) => MVCCError::MVCC_OK,
Err(e) => {
tracing::error!("MVCCTransactionCommit: {e}");
MVCCError::MVCC_IO_ERROR_WRITE
}
}
}
#[no_mangle]
pub unsafe extern "C" fn MVCCTransactionRollback(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError {
let db = db.get_ref();
let (db, runtime) = (&db.db, &db.runtime);
tracing::debug!("MVCCTransactionRollback: {tx_id}");
runtime.block_on(async move { db.rollback_tx(tx_id).await });
MVCCError::MVCC_OK
}
#[no_mangle]
pub unsafe extern "C" fn MVCCDatabaseInsert(
db: MVCCDatabaseRef,
tx_id: u64,
table_id: u64,
row_id: u64,
value_ptr: *const std::ffi::c_void,
@@ -77,11 +110,7 @@ pub unsafe extern "C" fn MVCCDatabaseInsert(
let id = database::RowID { table_id, row_id };
let row = database::Row { id, data };
tracing::debug!("MVCCDatabaseInsert: {row:?}");
match runtime.block_on(async move {
let tx = db.begin_tx().await;
db.insert(tx, row).await?;
db.commit_tx(tx).await
}) {
match runtime.block_on(async move { db.insert(tx_id, row).await }) {
Ok(_) => {
tracing::debug!("MVCCDatabaseInsert: success");
MVCCError::MVCC_OK
@@ -96,6 +125,7 @@ pub unsafe extern "C" fn MVCCDatabaseInsert(
#[no_mangle]
pub unsafe extern "C" fn MVCCDatabaseRead(
db: MVCCDatabaseRef,
tx_id: u64,
table_id: u64,
row_id: u64,
value_ptr: *mut *mut u8,
@@ -105,9 +135,8 @@ pub unsafe extern "C" fn MVCCDatabaseRead(
let (db, runtime) = (&db.db, &db.runtime);
match runtime.block_on(async move {
let tx = db.begin_tx().await;
let id = database::RowID { table_id, row_id };
let maybe_row = db.read(tx, id).await?;
let maybe_row = db.read(tx_id, id).await?;
match maybe_row {
Some(row) => {
tracing::debug!("Found row {row:?}");
@@ -148,6 +177,7 @@ pub unsafe extern "C" fn MVCCFreeStr(ptr: *mut std::ffi::c_void) {
#[no_mangle]
pub unsafe extern "C" fn MVCCScanCursorOpen(
db: MVCCDatabaseRef,
tx_id: u64,
table_id: u64,
) -> MVCCScanCursorRef {
tracing::debug!("MVCCScanCursorOpen()");
@@ -156,7 +186,7 @@ pub unsafe extern "C" fn MVCCScanCursorOpen(
let database = unsafe { std::mem::transmute::<&DbContext, &'static DbContext>(db.get_ref()) };
let (database, runtime) = (&database.db, &database.runtime);
match runtime
.block_on(async move { mvcc_rs::cursor::ScanCursor::new(database, table_id).await })
.block_on(async move { mvcc_rs::cursor::ScanCursor::new(database, tx_id, table_id).await })
{
Ok(cursor) => {
if cursor.is_empty() {

View File

@@ -10,8 +10,11 @@ pub struct ScanCursor<'a, Clock: LogicalClock> {
}
impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
pub async fn new(db: &'a Database<Clock>, table_id: u64) -> Result<ScanCursor<'a, Clock>> {
let tx_id = db.begin_tx().await;
pub async fn new(
db: &'a Database<Clock>,
tx_id: u64,
table_id: u64,
) -> Result<ScanCursor<'a, Clock>> {
let row_ids = db.scan_row_ids_for_table(table_id).await?;
Ok(Self {
db,
@@ -37,7 +40,7 @@ impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
}
pub async fn close(self) -> Result<()> {
self.db.commit_tx(self.tx_id).await
Ok(())
}
pub fn forward(&mut self) -> bool {

View File

@@ -137,7 +137,7 @@ impl<Clock: LogicalClock> Database<Clock> {
rows: RefCell::new(BTreeMap::new()),
txs: RefCell::new(HashMap::new()),
tx_timestamps: RefCell::new(BTreeMap::new()),
tx_ids: AtomicU64::new(0),
tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes
clock,
storage,
};