Convert u64 rowid to i64

Rowids can be negative, therefore let's swap to i64
This commit is contained in:
Pere Diaz Bou
2025-05-30 13:07:05 +02:00
parent 0fbff54b90
commit da4190a23e
10 changed files with 75 additions and 64 deletions

View File

@@ -331,7 +331,7 @@ pub struct Connection {
auto_commit: Cell<bool>,
mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
transaction_state: Cell<TransactionState>,
last_insert_rowid: Cell<u64>,
last_insert_rowid: Cell<i64>,
last_change: Cell<i64>,
total_changes: Cell<i64>,
syms: RefCell<SymbolTable>,
@@ -550,11 +550,11 @@ impl Connection {
}
}
pub fn last_insert_rowid(&self) -> u64 {
pub fn last_insert_rowid(&self) -> i64 {
self.last_insert_rowid.get()
}
fn update_last_rowid(&self, rowid: u64) {
fn update_last_rowid(&self, rowid: i64) {
self.last_insert_rowid.set(rowid);
}

View File

@@ -14,11 +14,11 @@ mod tests;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RowID {
pub table_id: u64,
pub row_id: u64,
pub row_id: i64,
}
impl RowID {
pub fn new(table_id: u64, row_id: u64) -> Self {
pub fn new(table_id: u64, row_id: i64) -> Self {
Self { table_id, row_id }
}
}
@@ -415,7 +415,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
row_id: 0,
}..RowID {
table_id,
row_id: u64::MAX,
row_id: i64::MAX,
},
)
.map(|entry| *entry.key())
@@ -425,7 +425,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn get_row_id_range(
&self,
table_id: u64,
start: u64,
start: i64,
bucket: &mut Vec<RowID>,
max_items: u64,
) -> Result<()> {
@@ -441,7 +441,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let end_id = RowID {
table_id,
row_id: u64::MAX,
row_id: i64::MAX,
};
self.rows
@@ -452,7 +452,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
Ok(())
}
pub fn get_next_row_id_for_table(&self, table_id: u64, start: u64) -> Option<RowID> {
pub fn get_next_row_id_for_table(&self, table_id: u64, start: i64) -> Option<RowID> {
tracing::trace!(
"getting_next_id_for_table(table_id={}, range_start={})",
table_id,
@@ -465,7 +465,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let max_bound = RowID {
table_id,
row_id: u64::MAX,
row_id: i64::MAX,
};
self.rows

View File

@@ -44,11 +44,11 @@ pub use database::MvStore;
mod tests {
use crate::mvcc::clock::LocalClock;
use crate::mvcc::database::{MvStore, Row, RowID};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
static IDS: AtomicU64 = AtomicU64::new(1);
static IDS: AtomicI64 = AtomicI64::new(1);
#[test]
fn test_non_overlapping_concurrent_inserts() {

View File

@@ -156,7 +156,7 @@ struct DestroyInfo {
#[derive(Debug, Clone)]
enum DeleteSavepoint {
Rowid(u64),
Rowid(i64),
Payload(ImmutableRecord),
}
@@ -230,14 +230,14 @@ enum PayloadOverflowWithOffset {
#[derive(Clone, Debug)]
pub enum BTreeKey<'a> {
TableRowId((u64, Option<&'a ImmutableRecord>)),
TableRowId((i64, Option<&'a ImmutableRecord>)),
IndexKey(&'a ImmutableRecord),
}
impl BTreeKey<'_> {
/// Create a new table rowid key from a rowid and an optional immutable record.
/// The record is optional because it may not be available when the key is created.
pub fn new_table_rowid(rowid: u64, record: Option<&ImmutableRecord>) -> BTreeKey<'_> {
pub fn new_table_rowid(rowid: i64, record: Option<&ImmutableRecord>) -> BTreeKey<'_> {
BTreeKey::TableRowId((rowid, record))
}
@@ -255,7 +255,7 @@ impl BTreeKey<'_> {
}
/// Get the rowid, if present. Index will never be present.
fn maybe_rowid(&self) -> Option<u64> {
fn maybe_rowid(&self) -> Option<i64> {
match self {
BTreeKey::TableRowId((rowid, _)) => Some(*rowid),
BTreeKey::IndexKey(_) => None,
@@ -263,7 +263,7 @@ impl BTreeKey<'_> {
}
/// Assert that the key is an integer rowid and return it.
fn to_rowid(&self) -> u64 {
fn to_rowid(&self) -> i64 {
match self {
BTreeKey::TableRowId((rowid, _)) => *rowid,
BTreeKey::IndexKey(_) => panic!("BTreeKey::to_rowid called on IndexKey"),
@@ -313,7 +313,7 @@ impl WriteInfo {
#[derive(Debug, Clone, Copy, PartialEq)]
enum CursorHasRecord {
Yes {
rowid: Option<u64>, // not all indexes and btrees have rowids, so this is optional.
rowid: Option<i64>, // not all indexes and btrees have rowids, so this is optional.
},
No,
}
@@ -380,7 +380,7 @@ enum OverflowState {
/// Holds a Record or RowId, so that these can be transformed into a SeekKey to restore
/// cursor position to its previous location.
enum CursorContext {
TableRowId(u64),
TableRowId(i64),
/// If we are in an index tree we can then reuse this field to save
/// our cursor information
@@ -496,12 +496,12 @@ impl BTreeCursor {
}
}
pub fn get_index_rowid_from_record(&self) -> Option<u64> {
pub fn get_index_rowid_from_record(&self) -> Option<i64> {
if !self.has_rowid() {
return None;
}
let rowid = match self.get_immutable_record().as_ref().unwrap().last_value() {
Some(RefValue::Integer(rowid)) => *rowid as u64,
Some(RefValue::Integer(rowid)) => *rowid as i64,
_ => unreachable!(
"index where has_rowid() is true should have an integer rowid as the last value"
),
@@ -1450,7 +1450,7 @@ impl BTreeCursor {
}
/// Specialized version of move_to() for table btrees.
fn tablebtree_move_to(&mut self, rowid: u64, seek_op: SeekOp) -> Result<CursorResult<()>> {
fn tablebtree_move_to(&mut self, rowid: i64, seek_op: SeekOp) -> Result<CursorResult<()>> {
let iter_dir = seek_op.iteration_direction();
'outer: loop {
let page = self.stack.top();
@@ -1706,7 +1706,7 @@ impl BTreeCursor {
/// of iterating cells in order.
fn tablebtree_seek(
&mut self,
rowid: u64,
rowid: i64,
seek_op: SeekOp,
) -> Result<CursorResult<CursorHasRecord>> {
assert!(self.mv_cursor.is_none());
@@ -2969,7 +2969,7 @@ impl BTreeCursor {
let (rowid, _) = read_varint(&divider_cell[n_bytes_payload..])?;
new_divider_cell
.extend_from_slice(&(page.get().get().id as u32).to_be_bytes());
write_varint_to_vec(rowid, &mut new_divider_cell);
write_varint_to_vec(rowid as u64, &mut new_divider_cell);
} else {
// Leaf index
new_divider_cell
@@ -3937,7 +3937,7 @@ impl BTreeCursor {
}
}
pub fn rowid(&self) -> Result<Option<u64>> {
pub fn rowid(&self) -> Result<Option<i64>> {
if let Some(mv_cursor) = &self.mv_cursor {
let mv_cursor = mv_cursor.borrow();
return Ok(mv_cursor.current_row_id().map(|rowid| rowid.row_id));
@@ -4194,7 +4194,7 @@ impl BTreeCursor {
match predecessor_cell {
BTreeCell::TableLeafCell(leaf_cell) => {
cell_payload.extend_from_slice(&child_pointer.to_be_bytes());
write_varint_to_vec(leaf_cell._rowid, &mut cell_payload);
write_varint_to_vec(leaf_cell._rowid as u64, &mut cell_payload);
}
BTreeCell::IndexLeafCell(leaf_cell) => {
cell_payload.extend_from_slice(&child_pointer.to_be_bytes());
@@ -4356,7 +4356,7 @@ impl BTreeCursor {
Value::Integer(i) => i,
_ => unreachable!("btree tables are indexed by integers!"),
};
let _ = return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
let _ = return_if_io!(self.move_to(SeekKey::TableRowId(*int_key), SeekOp::EQ));
let page = self.stack.top();
// TODO(pere): request load
return_if_locked!(page.get());
@@ -4366,7 +4366,7 @@ impl BTreeCursor {
// find cell
let int_key = match key {
Value::Integer(i) => *i as u64,
Value::Integer(i) => *i,
_ => unreachable!("btree tables are indexed by integers!"),
};
let cell_idx = self.find_cell(contents, &BTreeKey::new_table_rowid(int_key, None));
@@ -5780,7 +5780,7 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -
/// If the record is too large to fit in the cell, it will spill onto overflow pages.
fn fill_cell_payload(
page_type: PageType,
int_key: Option<u64>,
int_key: Option<i64>,
cell_payload: &mut Vec<u8>,
record: &ImmutableRecord,
usable_space: u16,
@@ -5797,7 +5797,7 @@ fn fill_cell_payload(
if matches!(page_type, PageType::TableLeaf) {
let int_key = int_key.unwrap();
write_varint_to_vec(record_buf.len() as u64, cell_payload);
write_varint_to_vec(int_key, cell_payload);
write_varint_to_vec(int_key as u64, cell_payload);
} else {
write_varint_to_vec(record_buf.len() as u64, cell_payload);
}
@@ -6043,7 +6043,7 @@ mod tests {
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page.page_type(),
Some(id as u64),
Some(id as i64),
&mut payload,
&record,
4096,
@@ -6467,7 +6467,7 @@ mod tests {
);
run_until_done(
|| {
let key = SeekKey::TableRowId(key as u64);
let key = SeekKey::TableRowId(key);
cursor.move_to(key, SeekOp::EQ)
},
pager.deref(),
@@ -6481,7 +6481,7 @@ mod tests {
"".to_string()
};
run_until_done(
|| cursor.insert(&BTreeKey::new_table_rowid(key as u64, Some(&value)), true),
|| cursor.insert(&BTreeKey::new_table_rowid(key, Some(&value)), true),
pager.deref(),
)
.unwrap();
@@ -6503,7 +6503,7 @@ mod tests {
tracing::trace!("seeking key: {}", key);
run_until_done(|| cursor.next(), pager.deref()).unwrap();
let cursor_rowid = cursor.rowid().unwrap().unwrap();
if *key as u64 != cursor_rowid {
if *key != cursor_rowid {
valid = false;
println!("key {} is not found, got {}", key, cursor_rowid);
break;
@@ -6535,7 +6535,7 @@ mod tests {
run_until_done(|| cursor.next(), pager.deref()).unwrap();
let cursor_rowid = cursor.rowid().unwrap().unwrap();
assert_eq!(
*key as u64, cursor_rowid,
*key, cursor_rowid,
"key {} is not found, got {}",
key, cursor_rowid
);
@@ -7152,7 +7152,7 @@ mod tests {
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page.page_type(),
Some(i as u64),
Some(i as i64),
&mut payload,
&record,
4096,
@@ -7231,7 +7231,7 @@ mod tests {
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page.page_type(),
Some(i as u64),
Some(i),
&mut payload,
&record,
4096,
@@ -7244,7 +7244,10 @@ mod tests {
insert_into_cell(page, &payload, cell_idx, 4096).unwrap();
assert!(page.overflow_cells.is_empty());
total_size += payload.len() as u16 + 2;
cells.push(Cell { pos: i, payload });
cells.push(Cell {
pos: i as usize,
payload,
});
}
1 => {
if page.cell_count() == 0 {
@@ -7630,14 +7633,14 @@ mod tests {
tracing::trace!("before insert {}", i);
run_until_done(
|| {
let key = SeekKey::TableRowId(i as u64);
let key = SeekKey::TableRowId(i);
cursor.move_to(key, SeekOp::EQ)
},
pager.deref(),
)
.unwrap();
run_until_done(
|| cursor.insert(&BTreeKey::new_table_rowid(i as u64, Some(&value)), true),
|| cursor.insert(&BTreeKey::new_table_rowid(i, Some(&value)), true),
pager.deref(),
)
.unwrap();
@@ -7707,7 +7710,7 @@ mod tests {
run_until_done(
|| {
let key = SeekKey::TableRowId(i as u64);
let key = SeekKey::TableRowId(i);
cursor.move_to(key, SeekOp::EQ)
},
pager.deref(),
@@ -7715,7 +7718,7 @@ mod tests {
.unwrap();
run_until_done(
|| cursor.insert(&BTreeKey::new_table_rowid(i as u64, Some(&value)), true),
|| cursor.insert(&BTreeKey::new_table_rowid(i, Some(&value)), true),
pager.deref(),
)
.unwrap();
@@ -7729,7 +7732,7 @@ mod tests {
// Delete records with 500 <= key <= 3500
for i in 500..=3500 {
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page);
let seek_key = SeekKey::TableRowId(i as u64);
let seek_key = SeekKey::TableRowId(i);
let found = run_until_done(|| cursor.seek(seek_key.clone(), SeekOp::EQ), pager.deref())
.unwrap();
@@ -7788,14 +7791,14 @@ mod tests {
);
run_until_done(
|| {
let key = SeekKey::TableRowId(i as u64);
let key = SeekKey::TableRowId(i as i64);
cursor.move_to(key, SeekOp::EQ)
},
pager.deref(),
)
.unwrap();
run_until_done(
|| cursor.insert(&BTreeKey::new_table_rowid(i as u64, Some(&value)), true),
|| cursor.insert(&BTreeKey::new_table_rowid(i as i64, Some(&value)), true),
pager.deref(),
)
.unwrap();
@@ -7812,7 +7815,7 @@ mod tests {
else {
panic!("expected Some(rowid) but got {:?}", cursor.has_record.get());
};
assert_eq!(rowid, i as u64, "got!=expected");
assert_eq!(rowid, i as i64, "got!=expected");
}
}
@@ -8096,7 +8099,7 @@ mod tests {
]))]);
fill_cell_payload(
page_type,
Some(i),
Some(i as i64),
&mut payload,
&record,
pager.usable_space() as u16,

View File

@@ -642,7 +642,7 @@ impl PageContent {
/// Read the rowid of a table interior cell.
#[inline(always)]
pub fn cell_table_interior_read_rowid(&self, idx: usize) -> Result<u64> {
pub fn cell_table_interior_read_rowid(&self, idx: usize) -> Result<i64> {
debug_assert!(self.page_type() == PageType::TableInterior);
let buf = self.as_ptr();
const INTERIOR_PAGE_HEADER_SIZE_BYTES: usize = 12;
@@ -651,7 +651,7 @@ impl PageContent {
let cell_pointer = self.read_u16(cell_pointer) as usize;
const LEFT_CHILD_PAGE_SIZE_BYTES: usize = 4;
let (rowid, _) = read_varint(&buf[cell_pointer + LEFT_CHILD_PAGE_SIZE_BYTES..])?;
Ok(rowid)
Ok(rowid as i64)
}
/// Read the left child page of a table interior cell.
@@ -673,7 +673,7 @@ impl PageContent {
/// Read the rowid of a table leaf cell.
#[inline(always)]
pub fn cell_table_leaf_read_rowid(&self, idx: usize) -> Result<u64> {
pub fn cell_table_leaf_read_rowid(&self, idx: usize) -> Result<i64> {
debug_assert!(self.page_type() == PageType::TableLeaf);
let buf = self.as_ptr();
const LEAF_PAGE_HEADER_SIZE_BYTES: usize = 8;
@@ -684,7 +684,7 @@ impl PageContent {
let (_, nr) = read_varint(&buf[pos..])?;
pos += nr;
let (rowid, _) = read_varint(&buf[pos..])?;
Ok(rowid)
Ok(rowid as i64)
}
/// The cell pointer array of a b-tree page immediately follows the b-tree page header.
@@ -911,12 +911,12 @@ pub enum BTreeCell {
#[derive(Debug, Clone)]
pub struct TableInteriorCell {
pub _left_child_page: u32,
pub _rowid: u64,
pub _rowid: i64,
}
#[derive(Debug, Clone)]
pub struct TableLeafCell {
pub _rowid: u64,
pub _rowid: i64,
/// Payload of cell, if it overflows it won't include overflowed payload.
pub _payload: &'static [u8],
/// This is the complete payload size including overflow pages.
@@ -981,7 +981,7 @@ pub fn read_btree_cell(
let (rowid, _) = read_varint(&page[pos..])?;
Ok(BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page: left_child_page,
_rowid: rowid,
_rowid: rowid as i64,
}))
}
PageType::IndexLeaf => {
@@ -1015,7 +1015,7 @@ pub fn read_btree_cell(
let (payload, first_overflow_page) =
read_payload(&page[pos..pos + to_read], payload_size as usize);
Ok(BTreeCell::TableLeafCell(TableLeafCell {
_rowid: rowid,
_rowid: rowid as i64,
_payload: payload,
first_overflow_page,
payload_size,

View File

@@ -1486,7 +1486,7 @@ impl SeekOp {
#[derive(Clone, PartialEq, Debug)]
pub enum SeekKey<'a> {
TableRowId(u64),
TableRowId(i64),
IndexKey(&'a ImmutableRecord),
}

View File

@@ -1189,7 +1189,7 @@ pub fn op_vupdate(
if *conflict_action == 5 {
// ResolveType::Replace
if let Some(conn) = program.connection.upgrade() {
conn.update_last_rowid(new_rowid as u64);
conn.update_last_rowid(new_rowid);
}
}
state.pc += 1;
@@ -1934,7 +1934,7 @@ pub fn op_row_id(
let record = record.as_ref().unwrap();
let rowid = record.get_values().last().unwrap();
match rowid {
RefValue::Integer(rowid) => *rowid as u64,
RefValue::Integer(rowid) => *rowid,
_ => unreachable!(),
}
};
@@ -2019,7 +2019,7 @@ pub fn op_seek_rowid(
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let rowid = match state.registers[*src_reg].get_owned_value() {
Value::Integer(rowid) => Some(*rowid as u64),
Value::Integer(rowid) => Some(*rowid),
Value::Null => None,
other => {
return Err(LimboError::InternalError(format!(
@@ -2143,7 +2143,7 @@ pub fn op_seek(
return_if_io!(cursor.rewind());
None
}
Value::Integer(rowid) => Some(*rowid as u64),
Value::Integer(rowid) => Some(*rowid),
_ => {
return Err(LimboError::InternalError(format!(
"{}: the value in the register is not an integer",
@@ -3865,7 +3865,7 @@ pub fn op_insert(
// NOTE(pere): Sending moved_before == true is okay because we moved before but
// if we were to set to false after starting a balance procedure, it might
// leave undefined state.
return_if_io!(cursor.insert(&BTreeKey::new_table_rowid(key as u64, Some(record)), true));
return_if_io!(cursor.insert(&BTreeKey::new_table_rowid(key, Some(record)), true));
// Only update last_insert_rowid for regular table inserts, not schema modifications
if cursor.root_page() != 1 {
if let Some(rowid) = cursor.rowid()? {

View File

@@ -535,7 +535,7 @@ fn get_new_rowid<R: Rng>(cursor: &mut BTreeCursor, mut rng: R) -> Result<CursorR
.rowid()?
.unwrap_or(0) // if BTree is empty - use 0 as initial value for rowid
.checked_add(1) // add 1 but be careful with overflows
.unwrap_or(u64::MAX); // in case of overflow - use u64::MAX
.unwrap_or(i64::MAX); // in case of overflow - use u64::MAX
if rowid > i64::MAX.try_into().unwrap() {
let distribution = Uniform::from(1..=i64::MAX);
let max_attempts = 100;

View File

@@ -306,3 +306,11 @@ do_execsql_test_on_specific_db {:memory:} insert_from_select_union {
SELECT * FROM t;
} {1|100
2|200}
do_execsql_test_on_specific_db {:memory:} negative-primary-integer-key {
CREATE TABLE t(a INTEGER PRIMARY KEY);
insert into t values (-2),(13);
select * from t order by a asc;
} {-2
13}

View File

@@ -122,7 +122,7 @@ fn test_integer_primary_key() -> anyhow::Result<()> {
}
}
assert_eq!(rowids.len(), 2);
assert!(rowids[0] > 0);
assert!(rowids[1] == -1);
assert!(rowids[0] == -1);
assert!(rowids[1] == 0);
Ok(())
}