core: change page idx type from usize to i64

MVCC is like the annoying younger cousin (I know because I was him) that
needs to be treated differently. MVCC requires us to use root_pages that
might not be allocated yet, and the plan is to use negative root_pages
for that case. Therefore, we need i64 in order to fit this change.
This commit is contained in:
Pere Diaz Bou
2025-09-29 18:38:43 +02:00
parent fda1b89540
commit 0f631101df
31 changed files with 308 additions and 317 deletions

View File

@@ -123,12 +123,12 @@ pub enum CompletionError {
#[error("Completion was aborted")]
Aborted,
#[error("Decryption failed for page={page_idx}")]
DecryptionError { page_idx: usize },
DecryptionError { page_idx: i64 },
#[error("I/O error: partial write")]
ShortWrite,
#[error("Checksum mismatch on page {page_id}: expected {expected}, got {actual}")]
ChecksumMismatch {
page_id: usize,
page_id: i64,
expected: u64,
actual: u64,
},

View File

@@ -247,7 +247,7 @@ pub enum AggregateEvalState {
#[derive(Debug)]
pub struct AggregateOperator {
// Unique operator ID for indexing in persistent storage
pub operator_id: usize,
pub operator_id: i64,
// GROUP BY column indices
group_by: Vec<usize>,
// Aggregate functions to compute (including MIN/MAX)
@@ -796,7 +796,7 @@ impl AggregateState {
impl AggregateOperator {
pub fn new(
operator_id: usize,
operator_id: i64,
group_by: Vec<usize>,
aggregates: Vec<AggregateFunction>,
input_column_names: Vec<String>,
@@ -1765,7 +1765,7 @@ impl MinMaxPersistState {
pub fn persist_min_max(
&mut self,
operator_id: usize,
operator_id: i64,
column_min_max: &HashMap<usize, AggColumnInfo>,
cursors: &mut DbspStateCursors,
generate_group_hash: impl Fn(&str) -> Hash128,

View File

@@ -208,7 +208,7 @@ pub enum ExecuteState {
/// Processing multiple inputs (for recursive node processing)
ProcessingInputs {
/// Collection of (node_id, state) pairs to process
input_states: Vec<(usize, ExecuteState)>,
input_states: Vec<(i64, ExecuteState)>,
/// Current index being processed
current_index: usize,
/// Collected deltas from processed inputs
@@ -320,11 +320,11 @@ pub enum DbspExpr {
/// A node in the DBSP circuit DAG
pub struct DbspNode {
/// Unique identifier for this node
pub id: usize,
pub id: i64,
/// The operator metadata
pub operator: DbspOperator,
/// Input nodes (edges in the DAG)
pub inputs: Vec<usize>,
pub inputs: Vec<i64>,
/// The actual executable operator
pub executable: Box<dyn IncrementalOperator>,
}
@@ -376,11 +376,11 @@ pub const DBSP_CIRCUIT_VERSION: u32 = 1;
#[derive(Debug)]
pub struct DbspCircuit {
/// All nodes in the circuit, indexed by their ID
pub(super) nodes: HashMap<usize, DbspNode>,
pub(super) nodes: HashMap<i64, DbspNode>,
/// Counter for generating unique node IDs
next_id: usize,
next_id: i64,
/// Root node ID (the final output)
pub(super) root: Option<usize>,
pub(super) root: Option<i64>,
/// Output schema of the circuit (schema of the root node)
pub(super) output_schema: SchemaRef,
@@ -388,20 +388,20 @@ pub struct DbspCircuit {
commit_state: CommitState,
/// Root page for the main materialized view data
pub(super) main_data_root: usize,
pub(super) main_data_root: i64,
/// Root page for internal DBSP state table
pub(super) internal_state_root: usize,
pub(super) internal_state_root: i64,
/// Root page for the DBSP state table's primary key index
pub(super) internal_state_index_root: usize,
pub(super) internal_state_index_root: i64,
}
impl DbspCircuit {
/// Create a new empty circuit with initial empty schema
/// The actual output schema will be set when the root node is established
pub fn new(
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
main_data_root: i64,
internal_state_root: i64,
internal_state_index_root: i64,
) -> Self {
// Start with an empty schema - will be updated when root is set
let empty_schema = Arc::new(LogicalSchema::new(vec![]));
@@ -418,7 +418,7 @@ impl DbspCircuit {
}
/// Set the root node and update the output schema
fn set_root(&mut self, root_id: usize, schema: SchemaRef) {
fn set_root(&mut self, root_id: i64, schema: SchemaRef) {
self.root = Some(root_id);
self.output_schema = schema;
}
@@ -428,9 +428,9 @@ impl DbspCircuit {
fn add_node(
&mut self,
operator: DbspOperator,
inputs: Vec<usize>,
inputs: Vec<i64>,
executable: Box<dyn IncrementalOperator>,
) -> usize {
) -> i64 {
let id = self.next_id;
self.next_id += 1;
@@ -655,7 +655,7 @@ impl DbspCircuit {
/// Execute a specific node in the circuit
fn execute_node(
&mut self,
node_id: usize,
node_id: i64,
pager: Arc<Pager>,
execute_state: &mut ExecuteState,
commit_operators: bool,
@@ -688,7 +688,7 @@ impl DbspCircuit {
let input_data = std::mem::take(input_data);
let input_node_ids = node.inputs.clone();
let input_states: Vec<(usize, ExecuteState)> = input_node_ids
let input_states: Vec<(i64, ExecuteState)> = input_node_ids
.iter()
.map(|&input_id| {
(
@@ -783,7 +783,7 @@ impl Display for DbspCircuit {
}
impl DbspCircuit {
fn fmt_node(&self, f: &mut Formatter, node_id: usize, depth: usize) -> fmt::Result {
fn fmt_node(&self, f: &mut Formatter, node_id: i64, depth: usize) -> fmt::Result {
let indent = " ".repeat(depth);
if let Some(node) = self.nodes.get(&node_id) {
match &node.operator {
@@ -838,9 +838,9 @@ pub struct DbspCompiler {
impl DbspCompiler {
/// Create a new DBSP compiler
pub fn new(
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
main_data_root: i64,
internal_state_root: i64,
internal_state_index_root: i64,
) -> Self {
Self {
circuit: DbspCircuit::new(
@@ -860,7 +860,7 @@ impl DbspCompiler {
}
/// Recursively compile a logical plan node
fn compile_plan(&mut self, plan: &LogicalPlan) -> Result<usize> {
fn compile_plan(&mut self, plan: &LogicalPlan) -> Result<i64> {
match plan {
LogicalPlan::Projection(proj) => {
// Compile the input first
@@ -1403,7 +1403,7 @@ impl DbspCompiler {
}
/// Compile a UNION operator
fn compile_union(&mut self, union: &crate::translate::logical::Union) -> Result<usize> {
fn compile_union(&mut self, union: &crate::translate::logical::Union) -> Result<i64> {
if union.inputs.len() != 2 {
return Err(LimboError::ParseError(format!(
"UNION requires exactly 2 inputs, got {}",
@@ -2483,7 +2483,7 @@ mod tests {
}};
}
fn setup_btree_for_circuit() -> (Arc<Pager>, usize, usize, usize) {
fn setup_btree_for_circuit() -> (Arc<Pager>, i64, i64, i64) {
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
let db = Database::open_file(io.clone(), ":memory:", false, false).unwrap();
let conn = db.connect().unwrap();
@@ -2494,17 +2494,17 @@ mod tests {
let main_root_page = pager
.io
.block(|| pager.btree_create(&CreateBTreeFlags::new_table()))
.unwrap() as usize;
.unwrap() as i64;
let dbsp_state_page = pager
.io
.block(|| pager.btree_create(&CreateBTreeFlags::new_table()))
.unwrap() as usize;
.unwrap() as i64;
let dbsp_state_index_page = pager
.io
.block(|| pager.btree_create(&CreateBTreeFlags::new_index()))
.unwrap() as usize;
.unwrap() as i64;
(
pager,

View File

@@ -346,7 +346,7 @@ enum JoinCommitState {
#[derive(Debug)]
pub struct JoinOperator {
/// Unique operator ID for indexing in persistent storage
operator_id: usize,
operator_id: i64,
/// Type of join to perform
join_type: JoinType,
/// Column indices for extracting join keys from left input
@@ -365,7 +365,7 @@ pub struct JoinOperator {
impl JoinOperator {
pub fn new(
operator_id: usize,
operator_id: i64,
join_type: JoinType,
left_key_indices: Vec<usize>,
right_key_indices: Vec<usize>,

View File

@@ -28,7 +28,7 @@ pub enum UnionMode {
/// Handles both recursive CTEs and UNION/UNION ALL operations
#[derive(Debug)]
pub struct MergeOperator {
operator_id: usize,
operator_id: i64,
union_mode: UnionMode,
/// For UNION: tracks seen value hashes with their assigned rowids
/// For UNION ALL: tracks (source_id, original_rowid) -> assigned_rowid mappings
@@ -39,7 +39,7 @@ pub struct MergeOperator {
impl MergeOperator {
/// Create a new merge operator with specified union mode
pub fn new(operator_id: usize, mode: UnionMode) -> Self {
pub fn new(operator_id: i64, mode: UnionMode) -> Self {
Self {
operator_id,
union_mode: mode,

View File

@@ -38,7 +38,7 @@ impl DbspStateCursors {
/// Create an index definition for the DBSP state table
/// This defines the primary key index on (operator_id, zset_id, element_id)
pub fn create_dbsp_state_index(root_page: usize) -> Index {
pub fn create_dbsp_state_index(root_page: i64) -> Index {
Index {
name: "dbsp_state_pk".to_string(),
table_name: "dbsp_state".to_string(),
@@ -79,11 +79,11 @@ pub fn create_dbsp_state_index(root_page: usize) -> Index {
/// - Bits 16-63 (48 bits): operator_id
/// - Bits 2-15 (14 bits): column_index (supports up to 16,384 columns)
/// - Bits 0-1 (2 bits): operation type (AGG_TYPE_REGULAR, AGG_TYPE_MINMAX, etc.)
pub fn generate_storage_id(operator_id: usize, column_index: usize, op_type: u8) -> i64 {
pub fn generate_storage_id(operator_id: i64, column_index: usize, op_type: u8) -> i64 {
assert!(op_type <= 3, "Invalid operation type");
assert!(column_index < 16384, "Column index too large");
((operator_id as i64) << 16) | ((column_index as i64) << 2) | (op_type as i64)
((operator_id) << 16) | ((column_index as i64) << 2) | (op_type as i64)
}
// Generic eval state that delegates to operator-specific states
@@ -262,7 +262,7 @@ mod tests {
use std::sync::{Arc, Mutex};
/// Create a test pager for operator tests with both table and index
fn create_test_pager() -> (std::sync::Arc<crate::Pager>, usize, usize) {
fn create_test_pager() -> (std::sync::Arc<crate::Pager>, i64, i64) {
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
let db = Database::open_file(io.clone(), ":memory:", false, false).unwrap();
let conn = db.connect().unwrap();
@@ -277,14 +277,14 @@ mod tests {
.io
.block(|| pager.btree_create(&CreateBTreeFlags::new_table()))
.expect("Failed to create BTree for aggregate state table")
as usize;
as i64;
// Create a BTree for the index
let index_root_page_id = pager
.io
.block(|| pager.btree_create(&CreateBTreeFlags::new_index()))
.expect("Failed to create BTree for aggregate state index")
as usize;
as i64;
(pager, table_root_page_id, index_root_page_id)
}

View File

@@ -207,7 +207,7 @@ pub struct IncrementalView {
#[cfg_attr(not(test), allow(dead_code))]
pub tracker: Arc<Mutex<ComputationTracker>>,
// Root page of the btree storing the materialized state (0 for unmaterialized)
root_page: usize,
root_page: i64,
}
impl IncrementalView {
@@ -215,9 +215,9 @@ impl IncrementalView {
fn try_compile_circuit(
select: &ast::Select,
schema: &Schema,
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
main_data_root: i64,
internal_state_root: i64,
internal_state_index_root: i64,
) -> Result<DbspCircuit> {
// Build the logical plan from the SELECT statement
let mut builder = LogicalPlanBuilder::new(schema);
@@ -275,9 +275,9 @@ impl IncrementalView {
pub fn from_sql(
sql: &str,
schema: &Schema,
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
main_data_root: i64,
internal_state_root: i64,
internal_state_index_root: i64,
) -> Result<Self> {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next_cmd()?;
@@ -306,9 +306,9 @@ impl IncrementalView {
view_name: ast::QualifiedName,
select: ast::Select,
schema: &Schema,
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
main_data_root: i64,
internal_state_root: i64,
internal_state_index_root: i64,
) -> Result<Self> {
let name = view_name.name.as_str().to_string();
@@ -353,9 +353,9 @@ impl IncrementalView {
table_conditions: HashMap<String, Vec<Option<ast::Expr>>>,
column_schema: ViewColumnSchema,
schema: &Schema,
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
main_data_root: i64,
internal_state_root: i64,
internal_state_index_root: i64,
) -> Result<Self> {
// Create the tracker that will be shared by all operators
let tracker = Arc::new(Mutex::new(ComputationTracker::new()));
@@ -403,7 +403,7 @@ impl IncrementalView {
}
/// Get the root page for this materialized view's btree
pub fn get_root_page(&self) -> usize {
pub fn get_root_page(&self) -> i64 {
self.root_page
}

View File

@@ -1522,8 +1522,7 @@ impl Connection {
frame_watermark: Option<u64>,
) -> Result<bool> {
let pager = self.pager.read();
let (page_ref, c) = match pager.read_page_no_cache(page_idx as usize, frame_watermark, true)
{
let (page_ref, c) = match pager.read_page_no_cache(page_idx as i64, frame_watermark, true) {
Ok(result) => result,
// on windows, zero read will trigger UnexpectedEof
#[cfg(target_os = "windows")]

View File

@@ -20,7 +20,7 @@ enum CursorPosition {
pub struct MvccLazyCursor<Clock: LogicalClock> {
pub db: Arc<MvStore<Clock>>,
current_pos: CursorPosition,
table_id: u64,
table_id: i64,
tx_id: u64,
}
@@ -28,7 +28,7 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
pub fn new(
db: Arc<MvStore<Clock>>,
tx_id: u64,
table_id: u64,
table_id: i64,
pager: Arc<Pager>,
) -> Result<MvccLazyCursor<Clock>> {
db.maybe_initialize_table(table_id, pager)?;

View File

@@ -79,7 +79,7 @@ pub struct CheckpointStateMachine<Clock: LogicalClock> {
/// State machine for deleting rows from the B-tree
delete_row_state_machine: Option<StateMachine<DeleteRowStateMachine>>,
/// Cursors for the B-trees
cursors: HashMap<u64, Arc<RwLock<BTreeCursor>>>,
cursors: HashMap<i64, Arc<RwLock<BTreeCursor>>>,
/// Result of the checkpoint
checkpoint_result: Option<CheckpointResult>,
}
@@ -88,8 +88,8 @@ pub struct CheckpointStateMachine<Clock: LogicalClock> {
/// Special writes for CREATE TABLE / DROP TABLE ops.
/// These are used to create/destroy B-trees during pager ops.
pub enum SpecialWrite {
BTreeCreate { root_page: u64 },
BTreeDestroy { root_page: u64, num_columns: usize },
BTreeCreate { root_page: i64 },
BTreeDestroy { root_page: i64, num_columns: usize },
}
impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
@@ -167,7 +167,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
record_cursor.get_value(&row_data, 3)
);
};
root_page as u64
root_page
};
max_timestamp = max_timestamp.max(current_version_ts);
@@ -315,10 +315,11 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
if let Some(special_write) = special_write {
match special_write {
SpecialWrite::BTreeCreate { root_page } => {
let created_root_page = self.pager.io.block(|| {
self.pager.btree_create(&CreateBTreeFlags::new_table())
})?;
assert_eq!(created_root_page as u64, root_page, "Created root page does not match expected root page: {created_root_page} != {root_page}");
let created_root_page =
self.pager.io.block(|| {
self.pager.btree_create(&CreateBTreeFlags::new_table())
})? as i64;
assert_eq!(created_root_page , root_page, "Created root page does not match expected root page: {created_root_page} != {root_page}");
}
SpecialWrite::BTreeDestroy {
root_page,
@@ -330,7 +331,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
let cursor = BTreeCursor::new_table(
None,
self.pager.clone(),
root_page as usize,
root_page,
num_columns,
);
let cursor = Arc::new(RwLock::new(cursor));
@@ -350,7 +351,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
let cursor = BTreeCursor::new_table(
None, // Write directly to B-tree
self.pager.clone(),
table_id as usize,
table_id,
num_columns,
);
let cursor = Arc::new(RwLock::new(cursor));

View File

@@ -42,12 +42,12 @@ pub mod tests;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RowID {
/// The table ID. Analogous to table's root page number.
pub table_id: u64,
pub table_id: i64,
pub row_id: i64,
}
impl RowID {
pub fn new(table_id: u64, row_id: i64) -> Self {
pub fn new(table_id: i64, row_id: i64) -> Self {
Self { table_id, row_id }
}
}
@@ -831,7 +831,7 @@ pub struct MvStore<Clock: LogicalClock> {
next_table_id: AtomicU64,
clock: Clock,
storage: Storage,
loaded_tables: RwLock<HashSet<u64>>,
loaded_tables: RwLock<HashSet<i64>>,
/// The transaction ID of a transaction that has acquired an exclusive write lock, if any.
///
@@ -1060,7 +1060,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn get_row_id_range(
&self,
table_id: u64,
table_id: i64,
start: i64,
bucket: &mut Vec<RowID>,
max_items: u64,
@@ -1090,7 +1090,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn get_next_row_id_for_table(
&self,
table_id: u64,
table_id: i64,
start: i64,
tx_id: TxID,
) -> Option<RowID> {
@@ -1644,7 +1644,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
///
/// # Arguments
///
pub fn maybe_initialize_table(&self, table_id: u64, pager: Arc<Pager>) -> Result<()> {
pub fn maybe_initialize_table(&self, table_id: i64, pager: Arc<Pager>) -> Result<()> {
tracing::trace!("scan_row_ids_for_table(table_id={})", table_id);
// First, check if the table is already loaded.
@@ -1661,15 +1661,15 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
// Mark table as loaded
pub fn mark_table_as_loaded(&self, table_id: u64) {
pub fn mark_table_as_loaded(&self, table_id: i64) {
self.loaded_tables.write().insert(table_id);
}
/// Scans the table and inserts the rows into the database.
///
/// This is initialization step for a table, where we still don't have any rows so we need to insert them if there are.
fn scan_load_table(&self, table_id: u64, pager: Arc<Pager>) -> Result<()> {
let root_page = table_id as usize;
fn scan_load_table(&self, table_id: i64, pager: Arc<Pager>) -> Result<()> {
let root_page = table_id;
let mut cursor = BTreeCursor::new_table(
None, // No MVCC cursor for scanning
pager.clone(),
@@ -1741,7 +1741,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
Ok(())
}
pub fn get_last_rowid(&self, table_id: u64) -> Option<i64> {
pub fn get_last_rowid(&self, table_id: i64) -> Option<i64> {
let last_rowid = self
.rows
.upper_bound(Bound::Included(&RowID {

View File

@@ -103,7 +103,7 @@ impl MvccTestDbNoConn {
}
}
pub(crate) fn generate_simple_string_row(table_id: u64, id: i64, data: &str) -> Row {
pub(crate) fn generate_simple_string_row(table_id: i64, id: i64, data: &str) -> Row {
let record = ImmutableRecord::from_values(&[Value::Text(Text::new(data))], 1);
Row {
id: RowID {

View File

@@ -329,7 +329,7 @@ impl StreamingLogicalLogReader {
self.state = StreamingState::NeedTransactionStart;
continue;
}
let table_id = self.consume_u64(io)?;
let table_id = self.consume_u64(io)? as i64;
let record_type = self.consume_u8(io)?;
let _payload_size = self.consume_u64(io)?;
let mut bytes_read_on_row = 17; // table_id, record_type and payload_size

View File

@@ -353,15 +353,14 @@ impl Schema {
let mut cursor = BTreeCursor::new_table(mv_cursor, Arc::clone(&pager), 1, 10);
let mut from_sql_indexes = Vec::with_capacity(10);
let mut automatic_indices: HashMap<String, Vec<(String, usize)>> =
HashMap::with_capacity(10);
let mut automatic_indices: HashMap<String, Vec<(String, i64)>> = HashMap::with_capacity(10);
// Store DBSP state table root pages: view_name -> dbsp_state_root_page
let mut dbsp_state_roots: HashMap<String, usize> = HashMap::new();
let mut dbsp_state_roots: HashMap<String, i64> = HashMap::new();
// Store DBSP state table index root pages: view_name -> dbsp_state_index_root_page
let mut dbsp_state_index_roots: HashMap<String, usize> = HashMap::new();
let mut dbsp_state_index_roots: HashMap<String, i64> = HashMap::new();
// Store materialized view info (SQL and root page) for later creation
let mut materialized_view_info: HashMap<String, (String, usize)> = HashMap::new();
let mut materialized_view_info: HashMap<String, (String, i64)> = HashMap::new();
pager.begin_read_tx()?;
@@ -438,7 +437,7 @@ impl Schema {
pub fn populate_indices(
&mut self,
from_sql_indexes: Vec<UnparsedFromSqlIndex>,
automatic_indices: std::collections::HashMap<String, Vec<(String, usize)>>,
automatic_indices: std::collections::HashMap<String, Vec<(String, i64)>>,
) -> Result<()> {
for unparsed_sql_from_index in from_sql_indexes {
if !self.indexes_enabled() {
@@ -550,9 +549,9 @@ impl Schema {
/// Populate materialized views parsed from the schema.
pub fn populate_materialized_views(
&mut self,
materialized_view_info: std::collections::HashMap<String, (String, usize)>,
dbsp_state_roots: std::collections::HashMap<String, usize>,
dbsp_state_index_roots: std::collections::HashMap<String, usize>,
materialized_view_info: std::collections::HashMap<String, (String, i64)>,
dbsp_state_roots: std::collections::HashMap<String, i64>,
dbsp_state_index_roots: std::collections::HashMap<String, i64>,
) -> Result<()> {
for (view_name, (sql, main_root)) in materialized_view_info {
// Look up the DBSP state root for this view
@@ -620,10 +619,10 @@ impl Schema {
maybe_sql: Option<&str>,
syms: &SymbolTable,
from_sql_indexes: &mut Vec<UnparsedFromSqlIndex>,
automatic_indices: &mut std::collections::HashMap<String, Vec<(String, usize)>>,
dbsp_state_roots: &mut std::collections::HashMap<String, usize>,
dbsp_state_index_roots: &mut std::collections::HashMap<String, usize>,
materialized_view_info: &mut std::collections::HashMap<String, (String, usize)>,
automatic_indices: &mut std::collections::HashMap<String, Vec<(String, i64)>>,
dbsp_state_roots: &mut std::collections::HashMap<String, i64>,
dbsp_state_index_roots: &mut std::collections::HashMap<String, i64>,
materialized_view_info: &mut std::collections::HashMap<String, (String, i64)>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<()> {
match ty {
@@ -647,10 +646,10 @@ impl Schema {
};
self.add_virtual_table(vtab);
if let Some(mv_store) = mv_store {
mv_store.mark_table_as_loaded(root_page as u64);
mv_store.mark_table_as_loaded(root_page);
}
} else {
let table = BTreeTable::from_sql(sql, root_page as usize)?;
let table = BTreeTable::from_sql(sql, root_page)?;
// Check if this is a DBSP state table
if table.name.starts_with(DBSP_TABLE_PREFIX) {
@@ -667,8 +666,7 @@ impl Schema {
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
if stored_version == DBSP_CIRCUIT_VERSION {
// Version matches, store the root page
dbsp_state_roots
.insert(view_name.to_string(), root_page as usize);
dbsp_state_roots.insert(view_name.to_string(), root_page);
} else {
// Version mismatch - DO NOT insert into dbsp_state_roots
// This will cause populate_materialized_views to skip this view
@@ -684,7 +682,7 @@ impl Schema {
}
if let Some(mv_store) = mv_store {
mv_store.mark_table_as_loaded(root_page as u64);
mv_store.mark_table_as_loaded(root_page);
}
self.add_btree_table(Arc::new(table));
}
@@ -695,7 +693,7 @@ impl Schema {
Some(sql) => {
from_sql_indexes.push(UnparsedFromSqlIndex {
table_name: table_name.to_string(),
root_page: root_page as usize,
root_page,
sql: sql.to_string(),
});
}
@@ -721,17 +719,17 @@ impl Schema {
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
if stored_version == DBSP_CIRCUIT_VERSION {
dbsp_state_index_roots
.insert(view_name.to_string(), root_page as usize);
.insert(view_name.to_string(), root_page);
}
}
}
} else {
match automatic_indices.entry(table_name) {
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(vec![(index_name, root_page as usize)]);
e.insert(vec![(index_name, root_page)]);
}
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().push((index_name, root_page as usize));
e.get_mut().push((index_name, root_page));
}
}
}
@@ -756,7 +754,7 @@ impl Schema {
// We'll handle reuse logic and create the actual IncrementalView
// in a later pass when we have both the main root page and DBSP state root
materialized_view_info
.insert(view_name.clone(), (sql.to_string(), root_page as usize));
.insert(view_name.clone(), (sql.to_string(), root_page));
// Mark the existing view for potential reuse
if self.incremental_views.contains_key(&view_name) {
@@ -876,7 +874,7 @@ pub enum Table {
}
impl Table {
pub fn get_root_page(&self) -> usize {
pub fn get_root_page(&self) -> i64 {
match self {
Table::BTree(table) => table.root_page,
Table::Virtual(_) => unimplemented!(),
@@ -962,7 +960,7 @@ pub struct UniqueSet {
#[derive(Clone, Debug)]
pub struct BTreeTable {
pub root_page: usize,
pub root_page: i64,
pub name: String,
pub primary_key_columns: Vec<(String, SortOrder)>,
pub columns: Vec<Column>,
@@ -996,7 +994,7 @@ impl BTreeTable {
.find(|(_, column)| column.name.as_ref() == Some(&name))
}
pub fn from_sql(sql: &str, root_page: usize) -> Result<BTreeTable> {
pub fn from_sql(sql: &str, root_page: i64) -> Result<BTreeTable> {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next_cmd()?;
match cmd {
@@ -1094,11 +1092,7 @@ pub struct FromClauseSubquery {
pub result_columns_start_reg: Option<usize>,
}
pub fn create_table(
tbl_name: &str,
body: &CreateTableBody,
root_page: usize,
) -> Result<BTreeTable> {
pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> Result<BTreeTable> {
let table_name = normalize_ident(tbl_name);
trace!("Creating table {}", table_name);
let mut has_rowid = true;
@@ -1747,7 +1741,7 @@ pub fn sqlite_schema_table() -> BTreeTable {
pub struct Index {
pub name: String,
pub table_name: String,
pub root_page: usize,
pub root_page: i64,
pub columns: Vec<IndexColumn>,
pub unique: bool,
pub ephemeral: bool,
@@ -1776,7 +1770,7 @@ pub struct IndexColumn {
}
impl Index {
pub fn from_sql(sql: &str, root_page: usize, table: &BTreeTable) -> Result<Index> {
pub fn from_sql(sql: &str, root_page: i64, table: &BTreeTable) -> Result<Index> {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next_cmd()?;
match cmd {
@@ -1824,7 +1818,7 @@ impl Index {
pub fn automatic_from_primary_key(
table: &BTreeTable,
auto_index: (String, usize), // name, root_page
auto_index: (String, i64), // name, root_page
column_count: usize,
) -> Result<Index> {
let has_primary_key_index =
@@ -1866,7 +1860,7 @@ impl Index {
pub fn automatic_from_unique(
table: &BTreeTable,
auto_index: (String, usize), // name, root_page
auto_index: (String, i64), // name, root_page
column_indices_and_sort_orders: Vec<(usize, SortOrder)>,
) -> Result<Index> {
let (index_name, root_page) = auto_index;

View File

@@ -520,7 +520,7 @@ pub struct BTreeCursor {
/// 2. an initialized database when the command is immediately followed by VACUUM.
usable_space_cached: usize,
/// Page id of the root page used to go back up fast.
root_page: usize,
root_page: i64,
/// Rowid and record are stored before being consumed.
pub has_record: Cell<bool>,
null_flag: bool,
@@ -571,7 +571,7 @@ pub struct BTreeCursor {
is_empty_table_state: RefCell<EmptyTableState>,
/// State machine for [BTreeCursor::move_to_rightmost] and, optionally, the id of the rightmost page in the btree.
/// If we know the rightmost page id and are already on that page, we can skip a seek.
move_to_right_state: (MoveToRightState, Option<usize>),
move_to_right_state: (MoveToRightState, Option<i64>),
/// State machine for [BTreeCursor::seek_to_last]
seek_to_last_state: SeekToLastState,
/// State machine for [BTreeCursor::rewind]
@@ -617,7 +617,7 @@ impl BTreeCursor {
pub fn new(
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
pager: Arc<Pager>,
root_page: usize,
root_page: i64,
num_columns: usize,
) -> Self {
let valid_state = if root_page == 1 && !pager.db_state.is_initialized() {
@@ -668,7 +668,7 @@ impl BTreeCursor {
pub fn new_table(
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
pager: Arc<Pager>,
root_page: usize,
root_page: i64,
num_columns: usize,
) -> Self {
Self::new(mv_cursor, pager, root_page, num_columns)
@@ -677,7 +677,7 @@ impl BTreeCursor {
pub fn new_index(
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
pager: Arc<Pager>,
root_page: usize,
root_page: i64,
index: &Index,
num_columns: usize,
) -> Self {
@@ -766,7 +766,7 @@ impl BTreeCursor {
if let Some(rightmost_pointer) = rightmost_pointer {
let past_rightmost_pointer = cell_count as i32 + 1;
self.stack.set_cell_index(past_rightmost_pointer);
let (page, c) = self.read_page(rightmost_pointer as usize)?;
let (page, c) = self.read_page(rightmost_pointer as i64)?;
self.stack.push_backwards(page);
if let Some(c) = c {
io_yield_one!(c);
@@ -838,7 +838,7 @@ impl BTreeCursor {
self.stack.retreat();
}
let (mem_page, c) = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as i64)?;
self.stack.push_backwards(mem_page);
if let Some(c) = c {
io_yield_one!(c);
@@ -857,7 +857,7 @@ impl BTreeCursor {
) -> Result<IOResult<()>> {
loop {
if self.read_overflow_state.borrow().is_none() {
let (page, c) = self.read_page(start_next_page as usize)?;
let (page, c) = self.read_page(start_next_page as i64)?;
*self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow {
payload: payload.to_vec(),
next_page: start_next_page,
@@ -889,7 +889,7 @@ impl BTreeCursor {
*remaining_to_read -= to_read;
if *remaining_to_read != 0 && next != 0 {
let (new_page, c) = self.pager.read_page(next as usize)?;
let (new_page, c) = self.pager.read_page(next as i64)?;
*page = new_page;
*next_page = next;
if let Some(c) = c {
@@ -1052,7 +1052,7 @@ impl BTreeCursor {
let pages_to_skip = offset / overflow_size as u32;
let page_offset = offset % overflow_size as u32;
// Read page
let (page, c) = self.read_page(first_overflow_page.unwrap() as usize)?;
let (page, c) = self.read_page(first_overflow_page.unwrap() as i64)?;
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages {
@@ -1114,7 +1114,7 @@ impl BTreeCursor {
}
pages_left_to_skip -= 1;
let (next_page, c) = self.read_page(next as usize)?;
let (next_page, c) = self.read_page(next as i64)?;
self.state = CursorState::ReadWritePayload(
PayloadOverflowWithOffset::SkipOverflowPages {
@@ -1181,7 +1181,7 @@ impl BTreeCursor {
// Load next page
current_offset = 0; // Reset offset for new page
let (next_page, c) = self.read_page(next as usize)?;
let (next_page, c) = self.read_page(next as i64)?;
page = next_page;
self.state =
@@ -1313,7 +1313,7 @@ impl BTreeCursor {
(Some(right_most_pointer), false) => {
// do rightmost
self.stack.advance();
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as i64)?;
self.stack.push(mem_page);
if let Some(c) = c {
io_yield_one!(c);
@@ -1354,7 +1354,7 @@ impl BTreeCursor {
}
let left_child_page = contents.cell_interior_read_left_child_page(cell_idx);
let (mem_page, c) = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as i64)?;
self.stack.push(mem_page);
if let Some(c) = c {
io_yield_one!(c);
@@ -1431,7 +1431,7 @@ impl BTreeCursor {
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
self.stack.set_cell_index(contents.cell_count() as i32 + 1);
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as i64)?;
self.stack.push(mem_page);
if let Some(c) = c {
io_yield_one!(c);
@@ -1509,7 +1509,7 @@ impl BTreeCursor {
.unwrap()
.cell_interior_read_left_child_page(nearest_matching_cell);
self.stack.set_cell_index(nearest_matching_cell as i32);
let (mem_page, c) = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as i64)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
@@ -1527,7 +1527,7 @@ impl BTreeCursor {
.rightmost_pointer()
{
Some(right_most_pointer) => {
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as i64)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
@@ -1673,7 +1673,7 @@ impl BTreeCursor {
.rightmost_pointer()
{
Some(right_most_pointer) => {
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as i64)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
@@ -1713,14 +1713,14 @@ impl BTreeCursor {
{
let page = self.stack.get_page_at_level(old_top_idx).unwrap();
turso_assert!(
page.get().id != *left_child_page as usize,
page.get().id != *left_child_page as i64,
"corrupt: current page and left child page of cell {} are both {}",
leftmost_matching_cell,
page.get().id
);
}
let (mem_page, c) = self.read_page(*left_child_page as usize)?;
let (mem_page, c) = self.read_page(*left_child_page as i64)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
@@ -2813,7 +2813,7 @@ impl BTreeCursor {
let current_sibling = sibling_pointer;
let mut completions: Vec<Completion> = Vec::with_capacity(current_sibling + 1);
for i in (0..=current_sibling).rev() {
match btree_read_page(&self.pager, pgno as usize) {
match btree_read_page(&self.pager, pgno as i64) {
Err(e) => {
tracing::error!("error reading page {}: {}", pgno, e);
self.pager.io.cancel(&completions)?;
@@ -3459,7 +3459,7 @@ impl BTreeCursor {
let is_table_leaf = matches!(page_type, PageType::TableLeaf);
// Reassign page numbers in increasing order
{
let mut page_numbers: [usize; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE] =
let mut page_numbers: [i64; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE] =
[0; MAX_NEW_SIBLING_PAGES_AFTER_BALANCE];
for (i, page) in pages_to_balance_new
.iter()
@@ -4429,7 +4429,7 @@ impl BTreeCursor {
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior
let (child, c) = self.read_page(right_most_pointer as usize)?;
let (child, c) = self.read_page(right_most_pointer as i64)?;
self.stack.push(child);
if let Some(c) = c {
io_yield_one!(c);
@@ -4471,7 +4471,7 @@ impl BTreeCursor {
!self.has_record.get()
}
pub fn root_page(&self) -> usize {
pub fn root_page(&self) -> i64 {
self.root_page
}
@@ -4718,7 +4718,7 @@ impl BTreeCursor {
match &self.mv_cursor {
Some(mv_cursor) => match key.maybe_rowid() {
Some(rowid) => {
let row_id = crate::mvcc::database::RowID::new(self.table_id() as u64, rowid);
let row_id = crate::mvcc::database::RowID::new(self.table_id(), rowid);
let record_buf = key.get_record().unwrap().get_payload().to_vec();
let num_columns = match key {
BTreeKey::IndexKey(record) => record.column_count(),
@@ -4984,7 +4984,7 @@ impl BTreeCursor {
cell_payload[..4].try_into().expect("invalid cell payload"),
);
turso_assert!(
left_child_page as usize != parent_page_id,
left_child_page as i64 != parent_page_id,
"corrupt: current page and left child page of cell {} are both {}",
left_child_page,
parent_page_id
@@ -5177,7 +5177,7 @@ impl BTreeCursor {
self.overflow_state = OverflowState::Start;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, c) = self.read_page(next_page as usize)?;
let (page, c) = self.read_page(next_page as i64)?;
self.overflow_state = OverflowState::ProcessPage { next_page: page };
if let Some(c) = c {
io_yield_one!(c);
@@ -5209,7 +5209,7 @@ impl BTreeCursor {
self.overflow_state = OverflowState::Start;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, c) = self.read_page(next as usize)?;
let (page, c) = self.read_page(next as i64)?;
self.overflow_state = OverflowState::ProcessPage { next_page: page };
if let Some(c) = c {
io_yield_one!(c);
@@ -5320,7 +5320,7 @@ impl BTreeCursor {
// Non-leaf page which has processed all children but not it's potential right child
(false, n) if n == contents.cell_count() as i32 => {
if let Some(rightmost) = contents.rightmost_pointer() {
let (rightmost_page, c) = self.read_page(rightmost as usize)?;
let (rightmost_page, c) = self.read_page(rightmost as i64)?;
self.stack.push(rightmost_page);
let destroy_info = self.state.mut_destroy_info().expect(
"unable to get a mut reference to destroy state in cursor",
@@ -5380,7 +5380,7 @@ impl BTreeCursor {
BTreeCell::IndexInteriorCell(cell) => cell.left_child_page,
_ => panic!("expected interior cell"),
};
let (child_page, c) = self.read_page(child_page_id as usize)?;
let (child_page, c) = self.read_page(child_page_id as i64)?;
self.stack.push(child_page);
let destroy_info = self.state.mut_destroy_info().expect(
"unable to get a mut reference to destroy state in cursor",
@@ -5399,7 +5399,7 @@ impl BTreeCursor {
// For an index interior cell, clear the left child page now that overflow pages have been cleared
BTreeCell::IndexInteriorCell(index_int_cell) => {
let (child_page, c) =
self.read_page(index_int_cell.left_child_page as usize)?;
self.read_page(index_int_cell.left_child_page as i64)?;
self.stack.push(child_page);
let destroy_info = self
.state
@@ -5464,7 +5464,7 @@ impl BTreeCursor {
btree_init_page(root_page, page_type, 0, self.pager.usable_space());
}
pub fn table_id(&self) -> usize {
pub fn table_id(&self) -> i64 {
self.root_page
}
@@ -5648,7 +5648,7 @@ impl BTreeCursor {
// should be safe as contents is not a leaf page
let right_most_pointer = contents.rightmost_pointer().unwrap();
self.stack.advance();
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as i64)?;
self.stack.push(mem_page);
if let Some(c) = c {
io_yield_one!(c);
@@ -5667,7 +5667,7 @@ impl BTreeCursor {
..
}) => {
self.stack.advance();
let (mem_page, c) = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as i64)?;
self.stack.push(mem_page);
if let Some(c) = c {
io_yield_one!(c);
@@ -5732,7 +5732,7 @@ impl BTreeCursor {
}
}
pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Option<Completion>)> {
pub fn read_page(&self, page_idx: i64) -> Result<(PageRef, Option<Completion>)> {
btree_read_page(&self.pager, page_idx)
}
@@ -5751,7 +5751,7 @@ pub enum IntegrityCheckError {
#[error("Cell {cell_idx} in page {page_id} is out of range. cell_range={cell_start}..{cell_end}, content_area={content_area}, usable_space={usable_space}")]
CellOutOfRange {
cell_idx: usize,
page_id: usize,
page_id: i64,
cell_start: usize,
cell_end: usize,
content_area: usize,
@@ -5760,7 +5760,7 @@ pub enum IntegrityCheckError {
#[error("Cell {cell_idx} in page {page_id} extends out of page. cell_range={cell_start}..{cell_end}, content_area={content_area}, usable_space={usable_space}")]
CellOverflowsPage {
cell_idx: usize,
page_id: usize,
page_id: i64,
cell_start: usize,
cell_end: usize,
content_area: usize,
@@ -5768,7 +5768,7 @@ pub enum IntegrityCheckError {
},
#[error("Page {page_id} ({page_category:?}) cell {cell_idx} has rowid={rowid} in wrong order. Parent cell has parent_rowid={max_intkey} and next_rowid={next_rowid}")]
CellRowidOutOfRange {
page_id: usize,
page_id: i64,
page_category: PageCategory,
cell_idx: usize,
rowid: i64,
@@ -5777,19 +5777,19 @@ pub enum IntegrityCheckError {
},
#[error("Page {page_id} is at different depth from another leaf page this_page_depth={this_page_depth}, other_page_depth={other_page_depth} ")]
LeafDepthMismatch {
page_id: usize,
page_id: i64,
this_page_depth: usize,
other_page_depth: usize,
},
#[error("Page {page_id} detected freeblock that extends page start={start} end={end}")]
FreeBlockOutOfRange {
page_id: usize,
page_id: i64,
start: usize,
end: usize,
},
#[error("Page {page_id} cell overlap detected at position={start} with previous_end={prev_end}. content_area={content_area}, is_free_block={is_free_block}")]
CellOverlap {
page_id: usize,
page_id: i64,
start: usize,
prev_end: usize,
content_area: usize,
@@ -5797,14 +5797,14 @@ pub enum IntegrityCheckError {
},
#[error("Page {page_id} unexpected fragmentation got={got}, expected={expected}")]
UnexpectedFragmentation {
page_id: usize,
page_id: i64,
got: usize,
expected: usize,
},
#[error("Page {page_id} referenced multiple times (references={references:?}, page_category={page_category:?})")]
PageReferencedMultipleTimes {
page_id: u64,
references: Vec<u64>,
page_id: i64,
references: Vec<i64>,
page_category: PageCategory,
},
#[error(
@@ -5832,7 +5832,7 @@ pub struct CheckFreelist {
#[derive(Clone)]
struct IntegrityCheckPageEntry {
page_idx: usize,
page_idx: i64,
level: usize,
max_intkey: i64,
page_category: PageCategory,
@@ -5840,7 +5840,7 @@ struct IntegrityCheckPageEntry {
pub struct IntegrityCheckState {
page_stack: Vec<IntegrityCheckPageEntry>,
first_leaf_level: Option<usize>,
page_reference: HashMap<u64, u64>,
page_reference: HashMap<i64, i64>,
page: Option<PageRef>,
pub freelist_count: CheckFreelist,
}
@@ -5865,7 +5865,7 @@ impl IntegrityCheckState {
pub fn start(
&mut self,
page_idx: usize,
page_idx: i64,
page_category: PageCategory,
errors: &mut Vec<IntegrityCheckError>,
) {
@@ -5891,10 +5891,10 @@ impl IntegrityCheckState {
fn push_page(
&mut self,
entry: IntegrityCheckPageEntry,
referenced_by: u64,
referenced_by: i64,
errors: &mut Vec<IntegrityCheckError>,
) {
let page_id = entry.page_idx as u64;
let page_id = entry.page_idx;
let Some(previous) = self.page_reference.insert(page_id, referenced_by) else {
self.page_stack.push(entry);
return;
@@ -5960,12 +5960,12 @@ pub fn integrity_check(
if next_freelist_trunk_page != 0 {
state.push_page(
IntegrityCheckPageEntry {
page_idx: next_freelist_trunk_page as usize,
page_idx: next_freelist_trunk_page as i64,
level,
max_intkey,
page_category: PageCategory::FreeListTrunk,
},
page.get().id as u64,
page.get().id,
errors,
);
}
@@ -5974,12 +5974,12 @@ pub fn integrity_check(
let page_pointer = contents.read_u32_no_offset((8 + 4 * i) as usize);
state.push_page(
IntegrityCheckPageEntry {
page_idx: page_pointer as usize,
page_idx: page_pointer as i64,
level,
max_intkey,
page_category: PageCategory::FreePage,
},
page.get().id as u64,
page.get().id,
errors,
);
}
@@ -5994,12 +5994,12 @@ pub fn integrity_check(
if next_overflow_page != 0 {
state.push_page(
IntegrityCheckPageEntry {
page_idx: next_overflow_page as usize,
page_idx: next_overflow_page as i64,
level,
max_intkey,
page_category: PageCategory::Overflow,
},
page.get().id as u64,
page.get().id,
errors,
);
}
@@ -6048,12 +6048,12 @@ pub fn integrity_check(
BTreeCell::TableInteriorCell(table_interior_cell) => {
state.push_page(
IntegrityCheckPageEntry {
page_idx: table_interior_cell.left_child_page as usize,
page_idx: table_interior_cell.left_child_page as i64,
level: level + 1,
max_intkey: table_interior_cell.rowid,
page_category: PageCategory::Normal,
},
page.get().id as u64,
page.get().id,
errors,
);
let rowid = table_interior_cell.rowid;
@@ -6097,12 +6097,12 @@ pub fn integrity_check(
if let Some(first_overflow_page) = table_leaf_cell.first_overflow_page {
state.push_page(
IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
page_idx: first_overflow_page as i64,
level,
max_intkey,
page_category: PageCategory::Overflow,
},
page.get().id as u64,
page.get().id,
errors,
);
}
@@ -6110,23 +6110,23 @@ pub fn integrity_check(
BTreeCell::IndexInteriorCell(index_interior_cell) => {
state.push_page(
IntegrityCheckPageEntry {
page_idx: index_interior_cell.left_child_page as usize,
page_idx: index_interior_cell.left_child_page as i64,
level: level + 1,
max_intkey, // we don't care about intkey in non-table pages
page_category: PageCategory::Normal,
},
page.get().id as u64,
page.get().id,
errors,
);
if let Some(first_overflow_page) = index_interior_cell.first_overflow_page {
state.push_page(
IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
page_idx: first_overflow_page as i64,
level,
max_intkey,
page_category: PageCategory::Overflow,
},
page.get().id as u64,
page.get().id,
errors,
);
}
@@ -6147,12 +6147,12 @@ pub fn integrity_check(
if let Some(first_overflow_page) = index_leaf_cell.first_overflow_page {
state.push_page(
IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
page_idx: first_overflow_page as i64,
level,
max_intkey,
page_category: PageCategory::Overflow,
},
page.get().id as u64,
page.get().id,
errors,
);
}
@@ -6163,12 +6163,12 @@ pub fn integrity_check(
if let Some(rightmost) = contents.rightmost_pointer() {
state.push_page(
IntegrityCheckPageEntry {
page_idx: rightmost as usize,
page_idx: rightmost as i64,
level: level + 1,
max_intkey,
page_category: PageCategory::Normal,
},
page.get().id as u64,
page.get().id,
errors,
);
}
@@ -6206,7 +6206,7 @@ pub fn integrity_check(
pub fn btree_read_page(
pager: &Arc<Pager>,
page_idx: usize,
page_idx: i64,
) -> Result<(Arc<Page>, Option<Completion>)> {
pager.read_page(page_idx)
}
@@ -6245,11 +6245,11 @@ fn validate_cells_after_insertion(cell_array: &CellArray, leaf_data: bool) {
pub struct CoverageChecker {
/// Min-heap ordered by cell start
heap: BinaryHeap<Reverse<IntegrityCheckCellRange>>,
page_idx: usize,
page_idx: i64,
}
impl CoverageChecker {
pub fn new(page_idx: usize) -> Self {
pub fn new(page_idx: i64) -> Self {
Self {
heap: BinaryHeap::new(),
page_idx,
@@ -7853,7 +7853,7 @@ mod tests {
use super::{btree_init_page, defragment_page, drop_cell, insert_into_cell};
#[allow(clippy::arc_with_non_send_sync)]
fn get_page(id: usize) -> Arc<Page> {
fn get_page(id: i64) -> Arc<Page> {
let page = Arc::new(Page::new(id));
let inner = PageContent::new(0, Arc::new(Buffer::new_temporary(4096)));
@@ -8005,7 +8005,7 @@ mod tests {
}
}
fn validate_btree(pager: Arc<Pager>, page_idx: usize) -> (usize, bool) {
fn validate_btree(pager: Arc<Pager>, page_idx: i64) -> (usize, bool) {
let num_columns = 5;
let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns);
let (page, _c) = cursor.read_page(page_idx).unwrap();
@@ -8028,7 +8028,7 @@ mod tests {
BTreeCell::TableInteriorCell(TableInteriorCell {
left_child_page, ..
}) => {
let (child_page, _c) = cursor.read_page(left_child_page as usize).unwrap();
let (child_page, _c) = cursor.read_page(left_child_page as i64).unwrap();
while child_page.is_locked() {
pager.io.step().unwrap();
}
@@ -8042,7 +8042,7 @@ mod tests {
continue;
}
let (child_depth, child_valid) =
validate_btree(pager.clone(), left_child_page as usize);
validate_btree(pager.clone(), left_child_page as i64);
valid &= child_valid;
child_depth
}
@@ -8075,7 +8075,7 @@ mod tests {
}
}
if let Some(right) = contents.rightmost_pointer() {
let (right_depth, right_valid) = validate_btree(pager.clone(), right as usize);
let (right_depth, right_valid) = validate_btree(pager.clone(), right as i64);
valid &= right_valid;
depth = Some(depth.unwrap_or(right_depth + 1));
if depth != Some(right_depth + 1) {
@@ -8115,7 +8115,7 @@ mod tests {
(depth.unwrap(), valid)
}
fn format_btree(pager: Arc<Pager>, page_idx: usize, depth: usize) -> String {
fn format_btree(pager: Arc<Pager>, page_idx: i64, depth: usize) -> String {
let num_columns = 5;
let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns);
@@ -8139,7 +8139,7 @@ mod tests {
));
child.push(format_btree(
pager.clone(),
cell.left_child_page as usize,
cell.left_child_page as i64,
depth + 2,
));
}
@@ -8155,7 +8155,7 @@ mod tests {
}
}
if let Some(rightmost) = contents.rightmost_pointer() {
child.push(format_btree(pager.clone(), rightmost as usize, depth + 2));
child.push(format_btree(pager.clone(), rightmost as i64, depth + 2));
}
let current = format!(
"{}-page:{}, ptr(right):{}\n{}+cells:{}",
@@ -8173,7 +8173,7 @@ mod tests {
}
}
fn empty_btree() -> (Arc<Pager>, usize, Arc<Database>, Arc<Connection>) {
fn empty_btree() -> (Arc<Pager>, i64, Arc<Database>, Arc<Connection>) {
#[allow(clippy::arc_with_non_send_sync)]
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
let db = Database::open_file(io.clone(), ":memory:", false, false).unwrap();
@@ -8565,7 +8565,7 @@ mod tests {
let index_root_page = pager
.io
.block(|| pager.btree_create(&CreateBTreeFlags::new_index()))
.unwrap() as usize;
.unwrap() as i64;
let index_def = Index {
name: "testindex".to_string(),
where_clause: None,
@@ -8727,7 +8727,7 @@ mod tests {
let index_root_page = pager
.io
.block(|| pager.btree_create(&CreateBTreeFlags::new_index()))
.unwrap() as usize;
.unwrap() as i64;
let index_def = Index {
name: "testindex".to_string(),
where_clause: None,
@@ -9150,7 +9150,7 @@ mod tests {
let large_payload = vec![b'A'; max_local + usable_size];
// Setup overflow pages (2, 3, 4) with linking
let mut current_page = 2u32;
let mut current_page = 2_i64;
while current_page <= 4 {
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(Buffer::new_temporary(
@@ -9163,15 +9163,13 @@ mod tests {
let c = Completion::new_write(move |_| {
let _ = _buf.clone();
});
let _c = pager.db_file.write_page(
current_page as usize,
buf.clone(),
&IOContext::default(),
c,
)?;
let _c =
pager
.db_file
.write_page(current_page, buf.clone(), &IOContext::default(), c)?;
pager.io.step()?;
let (page, _c) = cursor.read_page(current_page as usize)?;
let (page, _c) = cursor.read_page(current_page)?;
while page.is_locked() {
cursor.pager.io.step()?;
}
@@ -9184,7 +9182,7 @@ mod tests {
} else {
0
};
contents.write_u32_no_offset(0, next_page); // Write pointer to next overflow page
contents.write_u32_no_offset(0, next_page as u32); // Write pointer to next overflow page
let buf = contents.as_ptr();
buf[4..].fill(b'A');
@@ -9231,7 +9229,7 @@ mod tests {
let trunk_page_id = freelist_trunk_page;
if trunk_page_id > 0 {
// Verify trunk page structure
let (trunk_page, _c) = cursor.read_page(trunk_page_id as usize)?;
let (trunk_page, _c) = cursor.read_page(trunk_page_id as i64)?;
let contents = trunk_page.get_contents();
// Read number of leaf pages in trunk
let n_leaf = contents.read_u32_no_offset(4);

View File

@@ -14,7 +14,7 @@ impl ChecksumContext {
}
#[cfg(not(feature = "checksum"))]
pub fn add_checksum_to_page(&self, _page: &mut [u8], _page_id: usize) -> Result<()> {
pub fn add_checksum_to_page(&self, _page: &mut [u8], _page_id: i64) -> Result<()> {
Ok(())
}
@@ -22,13 +22,13 @@ impl ChecksumContext {
pub fn verify_checksum(
&self,
_page: &mut [u8],
_page_id: usize,
_page_id: i64,
) -> std::result::Result<(), CompletionError> {
Ok(())
}
#[cfg(feature = "checksum")]
pub fn add_checksum_to_page(&self, page: &mut [u8], _page_id: usize) -> Result<()> {
pub fn add_checksum_to_page(&self, page: &mut [u8], _page_id: i64) -> Result<()> {
if page.len() != CHECKSUM_PAGE_SIZE {
return Ok(());
}
@@ -47,7 +47,7 @@ impl ChecksumContext {
pub fn verify_checksum(
&self,
page: &mut [u8],
page_id: usize,
page_id: i64,
) -> std::result::Result<(), CompletionError> {
if page.len() != CHECKSUM_PAGE_SIZE {
return Ok(());

View File

@@ -62,17 +62,17 @@ impl Default for IOContext {
pub trait DatabaseStorage: Send + Sync {
fn read_header(&self, c: Completion) -> Result<Completion>;
fn read_page(&self, page_idx: usize, io_ctx: &IOContext, c: Completion) -> Result<Completion>;
fn read_page(&self, page_idx: i64, io_ctx: &IOContext, c: Completion) -> Result<Completion>;
fn write_page(
&self,
page_idx: usize,
page_idx: i64,
buffer: Arc<Buffer>,
io_ctx: &IOContext,
c: Completion,
) -> Result<Completion>;
fn write_pages(
&self,
first_page_idx: usize,
first_page_idx: i64,
page_size: usize,
buffers: Vec<Arc<Buffer>>,
io_ctx: &IOContext,
@@ -96,7 +96,8 @@ impl DatabaseStorage for DatabaseFile {
}
#[instrument(skip_all, level = Level::DEBUG)]
fn read_page(&self, page_idx: usize, io_ctx: &IOContext, c: Completion) -> Result<Completion> {
fn read_page(&self, page_idx: i64, io_ctx: &IOContext, c: Completion) -> Result<Completion> {
assert!(page_idx >= 0, "page should be positive");
let r = c.as_read();
let size = r.buf().len();
assert!(page_idx > 0);
@@ -184,7 +185,7 @@ impl DatabaseStorage for DatabaseFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn write_page(
&self,
page_idx: usize,
page_idx: i64,
buffer: Arc<Buffer>,
io_ctx: &IOContext,
c: Completion,
@@ -207,7 +208,7 @@ impl DatabaseStorage for DatabaseFile {
fn write_pages(
&self,
first_page_idx: usize,
first_page_idx: i64,
page_size: usize,
buffers: Vec<Arc<Buffer>>,
io_ctx: &IOContext,
@@ -225,12 +226,12 @@ impl DatabaseStorage for DatabaseFile {
EncryptionOrChecksum::Encryption(ctx) => buffers
.into_iter()
.enumerate()
.map(|(i, buffer)| encrypt_buffer(first_page_idx + i, buffer, ctx))
.map(|(i, buffer)| encrypt_buffer(first_page_idx + i as i64, buffer, ctx))
.collect::<Vec<_>>(),
EncryptionOrChecksum::Checksum(ctx) => buffers
.into_iter()
.enumerate()
.map(|(i, buffer)| checksum_buffer(first_page_idx + i, buffer, ctx))
.map(|(i, buffer)| checksum_buffer(first_page_idx + i as i64, buffer, ctx))
.collect::<Vec<_>>(),
EncryptionOrChecksum::None => buffers,
};
@@ -262,12 +263,12 @@ impl DatabaseFile {
}
}
fn encrypt_buffer(page_idx: usize, buffer: Arc<Buffer>, ctx: &EncryptionContext) -> Arc<Buffer> {
fn encrypt_buffer(page_idx: i64, buffer: Arc<Buffer>, ctx: &EncryptionContext) -> Arc<Buffer> {
let encrypted_data = ctx.encrypt_page(buffer.as_slice(), page_idx).unwrap();
Arc::new(Buffer::new(encrypted_data.to_vec()))
}
fn checksum_buffer(page_idx: usize, buffer: Arc<Buffer>, ctx: &ChecksumContext) -> Arc<Buffer> {
fn checksum_buffer(page_idx: i64, buffer: Arc<Buffer>, ctx: &ChecksumContext) -> Arc<Buffer> {
ctx.add_checksum_to_page(buffer.as_mut_slice(), page_idx)
.unwrap();
buffer

View File

@@ -424,7 +424,7 @@ impl EncryptionContext {
}
#[cfg(feature = "encryption")]
pub fn encrypt_page(&self, page: &[u8], page_id: usize) -> Result<Vec<u8>> {
pub fn encrypt_page(&self, page: &[u8], page_id: i64) -> Result<Vec<u8>> {
use crate::storage::sqlite3_ondisk::DatabaseHeader;
tracing::debug!("encrypting page {}", page_id);
assert_eq!(
@@ -480,7 +480,7 @@ impl EncryptionContext {
}
#[cfg(feature = "encryption")]
pub fn decrypt_page(&self, encrypted_page: &[u8], page_id: usize) -> Result<Vec<u8>> {
pub fn decrypt_page(&self, encrypted_page: &[u8], page_id: i64) -> Result<Vec<u8>> {
use crate::storage::sqlite3_ondisk::DatabaseHeader;
tracing::debug!("decrypting page {}", page_id);
assert_eq!(
@@ -577,14 +577,14 @@ impl EncryptionContext {
}
#[cfg(not(feature = "encryption"))]
pub fn encrypt_page(&self, _page: &[u8], _page_id: usize) -> Result<Vec<u8>> {
pub fn encrypt_page(&self, _page: &[u8], _page_id: i64) -> Result<Vec<u8>> {
Err(LimboError::InvalidArgument(
"encryption is not enabled, cannot encrypt page. enable via passing `--features encryption`".into(),
))
}
#[cfg(not(feature = "encryption"))]
pub fn decrypt_page(&self, _encrypted_page: &[u8], _page_id: usize) -> Result<Vec<u8>> {
pub fn decrypt_page(&self, _encrypted_page: &[u8], _page_id: i64) -> Result<Vec<u8>> {
Err(LimboError::InvalidArgument(
"encryption is not enabled, cannot decrypt page. enable via passing `--features encryption`".into(),
))

View File

@@ -299,17 +299,17 @@ impl PageCache {
.clone();
if entry.is_locked() {
return Err(CacheError::Locked {
pgno: entry.get().id,
pgno: entry.get().id as usize,
});
}
if entry.is_dirty() {
return Err(CacheError::Dirty {
pgno: entry.get().id,
pgno: entry.get().id as usize,
});
}
if entry.is_pinned() {
return Err(CacheError::Pinned {
pgno: entry.get().id,
pgno: entry.get().id as usize,
});
}
if clean_page {
@@ -538,7 +538,9 @@ impl PageCache {
let e = &self.entries[node.slot_index];
if let Some(ref p) = e.page {
if p.is_dirty() {
return Err(CacheError::Dirty { pgno: p.get().id });
return Err(CacheError::Dirty {
pgno: p.get().id as usize,
});
}
}
}
@@ -870,7 +872,7 @@ mod tests {
}
pub fn page_with_content(page_id: usize) -> PageRef {
let page = Arc::new(Page::new(page_id));
let page = Arc::new(Page::new(page_id as i64));
{
let buffer = crate::Buffer::new_temporary(4096);
let page_content = PageContent {
@@ -1325,7 +1327,7 @@ mod tests {
let id_page = rng.next_u64() % max_pages;
let key = PageCacheKey::new(id_page as usize);
#[allow(clippy::arc_with_non_send_sync)]
let page = Arc::new(Page::new(id_page as usize));
let page = Arc::new(Page::new(id_page as i64));
if cache.peek(&key, false).is_some() {
continue; // Skip duplicate page ids
@@ -1370,8 +1372,8 @@ mod tests {
// Verify all pages in reference_map are in cache
for (key, page) in &reference_map {
let cached_page = cache.peek(key, false).expect("Page should be in cache");
assert_eq!(cached_page.get().id, key.0);
assert_eq!(page.get().id, key.0);
assert_eq!(cached_page.get().id, key.0 as i64);
assert_eq!(page.get().id, key.0 as i64);
}
}
}

View File

@@ -124,7 +124,7 @@ impl HeaderRefMut {
pub struct PageInner {
pub flags: AtomicUsize,
pub contents: Option<PageContent>,
pub id: usize,
pub id: i64,
/// If >0, the page is pinned and not eligible for eviction from the page cache.
/// The reason this is a counter is that multiple nested code paths may signal that
/// a page must not be evicted from the page cache, so even if an inner code path
@@ -185,7 +185,7 @@ const PAGE_DIRTY: usize = 0b1000;
const PAGE_LOADED: usize = 0b10000;
impl Page {
pub fn new(id: usize) -> Self {
pub fn new(id: i64) -> Self {
Self {
inner: UnsafeCell::new(PageInner {
flags: AtomicUsize::new(0),
@@ -712,7 +712,7 @@ impl Pager {
ptrmap_pg_no
);
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as i64)?;
self.vacuum_state.write().ptrmap_get_state = PtrMapGetState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
@@ -819,7 +819,7 @@ impl Pager {
offset_in_ptrmap_page
);
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as i64)?;
self.vacuum_state.write().ptrmap_put_state = PtrMapPutState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
@@ -1177,10 +1177,11 @@ impl Pager {
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn read_page_no_cache(
&self,
page_idx: usize,
page_idx: i64,
frame_watermark: Option<u64>,
allow_empty_read: bool,
) -> Result<(PageRef, Completion)> {
assert!(page_idx >= 0);
tracing::trace!("read_page_no_cache(page_idx = {})", page_idx);
let page = Arc::new(Page::new(page_idx));
let io_ctx = self.io_ctx.read();
@@ -1191,7 +1192,7 @@ impl Pager {
);
page.set_locked();
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read, &io_ctx)?;
let c = self.begin_read_disk_page(page_idx as usize, page.clone(), allow_empty_read, &io_ctx)?;
return Ok((page, c));
};
@@ -1204,16 +1205,18 @@ impl Pager {
return Ok((page, c));
}
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read, &io_ctx)?;
let c =
self.begin_read_disk_page(page_idx as usize, page.clone(), allow_empty_read, &io_ctx)?;
Ok((page, c))
}
/// Reads a page from the database.
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Option<Completion>)> {
pub fn read_page(&self, page_idx: i64) -> Result<(PageRef, Option<Completion>)> {
assert!(page_idx >= 0, "pages in pager should be positive, negative might indicate unallocated pages from mvcc or any other nasty bug");
tracing::trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_idx);
let page_key = PageCacheKey::new(page_idx as usize);
if let Some(page) = page_cache.get(&page_key)? {
tracing::trace!("read_page(page_idx = {}) = cached", page_idx);
turso_assert!(
@@ -1229,7 +1232,7 @@ impl Pager {
"attempted to read page {page_idx} but got page {}",
page.get().id
);
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
self.cache_insert(page_idx as usize, page.clone(), &mut page_cache)?;
Ok((page, Some(c)))
}
@@ -1313,7 +1316,7 @@ impl Pager {
pub fn add_dirty(&self, page: &Page) {
// TODO: check duplicates?
let mut dirty_pages = self.dirty_pages.write();
dirty_pages.insert(page.get().id);
dirty_pages.insert(page.get().id as usize);
page.set_dirty();
}
@@ -1663,7 +1666,7 @@ impl Pager {
let content = page.get_contents();
content.as_ptr().copy_from_slice(raw_page);
turso_assert!(
page.get().id == header.page_number as usize,
page.get().id == header.page_number as i64,
"page has unexpected id"
);
}
@@ -1868,7 +1871,7 @@ impl Pager {
// Providing a page is optional, if provided it will be used to avoid reading the page from disk.
// This is implemented in accordance with sqlite freepage2() function.
#[instrument(skip_all, level = Level::DEBUG)]
pub fn free_page(&self, mut page: Option<PageRef>, page_id: usize) -> Result<IOResult<()>> {
pub fn free_page(&self, mut page: Option<PageRef>, page_id: i64) -> Result<IOResult<()>> {
tracing::trace!("free_page(page_id={})", page_id);
const TRUNK_PAGE_HEADER_SIZE: usize = 8;
const LEAF_ENTRY_SIZE: usize = 4;
@@ -1885,7 +1888,7 @@ impl Pager {
loop {
match &mut *state {
FreePageState::Start => {
if page_id < 2 || page_id > header.database_size.get() as usize {
if page_id < 2 || page_id as usize > header.database_size.get() as usize {
return Err(LimboError::Corrupt(format!(
"Invalid page number {page_id} for free operation"
)));
@@ -1923,7 +1926,7 @@ impl Pager {
}
FreePageState::AddToTrunk { page } => {
let trunk_page_id = header.freelist_trunk_page.get();
let (trunk_page, c) = self.read_page(trunk_page_id as usize)?;
let (trunk_page, c) = self.read_page(trunk_page_id as i64)?;
if let Some(c) = c {
if !c.is_completed() {
io_yield_one!(c);
@@ -1941,7 +1944,7 @@ impl Pager {
if number_of_leaf_pages < max_free_list_entries as u32 {
turso_assert!(
trunk_page.get().id == trunk_page_id as usize,
trunk_page.get().id == trunk_page_id as i64,
"trunk page has unexpected id"
);
self.add_dirty(&trunk_page);
@@ -2041,7 +2044,7 @@ impl Pager {
AllocatePage1State::Writing { page } => {
turso_assert!(page.is_loaded(), "page should be loaded");
tracing::trace!("allocate_page1(Writing done)");
let page_key = PageCacheKey::new(page.get().id);
let page_key = PageCacheKey::new(page.get().id as usize);
let mut cache = self.page_cache.write();
cache.insert(page_key, page.clone()).map_err(|e| {
LimboError::InternalError(format!("Failed to insert page 1 into cache: {e:?}"))
@@ -2103,10 +2106,9 @@ impl Pager {
{
// we will allocate a ptrmap page, so increment size
new_db_size += 1;
let page =
allocate_new_page(new_db_size as usize, &self.buffer_pool, 0);
let page = allocate_new_page(new_db_size as i64, &self.buffer_pool, 0);
self.add_dirty(&page);
let page_key = PageCacheKey::new(page.get().id);
let page_key = PageCacheKey::new(page.get().id as usize);
let mut cache = self.page_cache.write();
cache.insert(page_key, page.clone())?;
}
@@ -2119,7 +2121,7 @@ impl Pager {
};
continue;
}
let (trunk_page, c) = self.read_page(first_freelist_trunk_page_id as usize)?;
let (trunk_page, c) = self.read_page(first_freelist_trunk_page_id as i64)?;
*state = AllocatePageState::SearchAvailableFreeListLeaf {
trunk_page,
current_db_size: new_db_size,
@@ -2149,7 +2151,7 @@ impl Pager {
let page_contents = trunk_page.get_contents();
let next_leaf_page_id =
page_contents.read_u32_no_offset(FREELIST_TRUNK_OFFSET_FIRST_LEAF);
let (leaf_page, c) = self.read_page(next_leaf_page_id as usize)?;
let (leaf_page, c) = self.read_page(next_leaf_page_id as i64)?;
turso_assert!(
number_of_freelist_leaves > 0,
@@ -2189,7 +2191,7 @@ impl Pager {
trunk_page.get().id
);
trunk_page.get_contents().as_ptr().fill(0);
let page_key = PageCacheKey::new(trunk_page.get().id);
let page_key = PageCacheKey::new(trunk_page.get().id as usize);
{
let page_cache = self.page_cache.read();
turso_assert!(
@@ -2221,7 +2223,7 @@ impl Pager {
leaf_page.get().id
);
leaf_page.get_contents().as_ptr().fill(0);
let page_key = PageCacheKey::new(leaf_page.get().id);
let page_key = PageCacheKey::new(leaf_page.get().id as usize);
{
let page_cache = self.page_cache.read();
turso_assert!(
@@ -2270,12 +2272,12 @@ impl Pager {
}
// FIXME: should reserve page cache entry before modifying the database
let page = allocate_new_page(new_db_size as usize, &self.buffer_pool, 0);
let page = allocate_new_page(new_db_size as i64, &self.buffer_pool, 0);
{
// setup page and add to cache
self.add_dirty(&page);
let page_key = PageCacheKey::new(page.get().id);
let page_key = PageCacheKey::new(page.get().id as usize);
{
// Run in separate block to avoid deadlock on page cache write lock
let mut cache = self.page_cache.write();
@@ -2292,11 +2294,11 @@ impl Pager {
pub fn update_dirty_loaded_page_in_cache(
&self,
id: usize,
id: i64,
page: PageRef,
) -> Result<(), LimboError> {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(id);
let page_key = PageCacheKey::new(id as usize);
// FIXME: use specific page key for writer instead of max frame, this will make readers not conflict
assert!(page.is_dirty());
@@ -2406,7 +2408,7 @@ impl Pager {
}
}
pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
pub fn allocate_new_page(page_id: i64, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
let page = Arc::new(Page::new(page_id));
{
let buffer = buffer_pool.get_page();
@@ -2774,7 +2776,7 @@ mod ptrmap_tests {
assert_eq!(expected_ptrmap_pg_no, FIRST_PTRMAP_PAGE_NO);
// Ensure the pointer map page ref is created and loadable via the pager
let ptrmap_page_ref = pager.read_page(expected_ptrmap_pg_no as usize);
let ptrmap_page_ref = pager.read_page(expected_ptrmap_pg_no as i64);
assert!(ptrmap_page_ref.is_ok());
// Ensure that the database header size is correctly reflected

View File

@@ -294,7 +294,7 @@ pub struct DatabaseHeader {
}
impl DatabaseHeader {
pub const PAGE_ID: usize = 1;
pub const PAGE_ID: i64 = 1;
pub const SIZE: usize = size_of::<Self>();
const _CHECK: () = {
@@ -927,13 +927,13 @@ pub fn begin_read_page(
finish_read_page(page_idx, buf, page.clone());
});
let c = Completion::new_read(buf, complete);
db_file.read_page(page_idx, io_ctx, c)
db_file.read_page(page_idx as i64, io_ctx, c)
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn finish_read_page(page_idx: usize, buffer_ref: Arc<Buffer>, page: PageRef) {
tracing::trace!("finish_read_page(page_idx = {page_idx})");
let pos = if page_idx == DatabaseHeader::PAGE_ID {
let pos = if page_idx as i64 == DatabaseHeader::PAGE_ID {
DatabaseHeader::SIZE
} else {
0
@@ -1025,13 +1025,13 @@ pub fn write_pages_vectored(
const EST_BUFF_CAPACITY: usize = 32;
let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
let mut run_start_id: Option<usize> = None;
let mut run_start_id: Option<i64> = None;
let mut completions = Vec::new();
let mut iter = batch.iter().peekable();
while let Some((id, buffer)) = iter.next() {
if run_start_id.is_none() {
run_start_id = Some(*id);
run_start_id = Some(*id as i64);
}
run_bufs.push(buffer.clone());
@@ -1975,7 +1975,7 @@ pub fn begin_read_wal_frame(
bytes_read > 0,
"Expected to read some data on success for page_idx={page_idx}"
);
match encryption_ctx.decrypt_page(encrypted_buf.as_slice(), page_idx) {
match encryption_ctx.decrypt_page(encrypted_buf.as_slice(), page_idx as i64) {
Ok(decrypted_data) => {
encrypted_buf
.as_mut_slice()
@@ -1986,7 +1986,9 @@ pub fn begin_read_wal_frame(
tracing::error!(
"Failed to decrypt WAL frame data for page_idx={page_idx}: {e}"
);
original_complete(Err(CompletionError::DecryptionError { page_idx }));
original_complete(Err(CompletionError::DecryptionError {
page_idx: page_idx as i64,
}));
}
}
});
@@ -2009,7 +2011,7 @@ pub fn begin_read_wal_frame(
return;
}
match checksum_ctx.verify_checksum(buf.as_mut_slice(), page_idx) {
match checksum_ctx.verify_checksum(buf.as_mut_slice(), page_idx as i64) {
Ok(_) => {
original_c(Ok((buf, bytes_read)));
}

View File

@@ -1104,7 +1104,7 @@ impl Wal for WalFile {
"read({bytes_read}) less than expected({buf_len}): frame_id={frame_id}"
);
let cloned = frame.clone();
finish_read_page(page.get().id, buf, cloned);
finish_read_page(page.get().id as usize, buf, cloned);
let epoch = shared_file.read().epoch.load(Ordering::Acquire);
frame.set_wal_tag(frame_id, epoch);
});
@@ -1129,7 +1129,7 @@ impl Wal for WalFile {
offset + WAL_FRAME_HEADER_SIZE as u64,
buffer_pool,
complete,
page_idx,
page_idx as usize,
&self.io_ctx.read(),
)
}
@@ -1166,7 +1166,7 @@ impl Wal for WalFile {
let (header, raw_page) = sqlite3_ondisk::parse_wal_frame_header(frame_ref);
if let Some(ctx) = encryption_ctx.clone() {
match ctx.decrypt_page(raw_page, header.page_number as usize) {
match ctx.decrypt_page(raw_page, header.page_number as i64) {
Ok(decrypted_data) => {
turso_assert!(
(frame_len - WAL_FRAME_HEADER_SIZE) == decrypted_data.len(),

View File

@@ -135,7 +135,7 @@ pub fn translate_analyze(
program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id,
root_page: 1usize.into(),
root_page: 1i64.into(),
db: 0,
});

View File

@@ -426,7 +426,7 @@ pub fn translate_insert(
program.alloc_cursor_id(CursorType::BTreeIndex(idx.clone())),
)
})
.collect::<Vec<(&String, usize, usize)>>();
.collect::<Vec<(&String, i64, usize)>>();
let insertion = build_insertion(&mut program, &table, &columns, num_values)?;

View File

@@ -114,7 +114,7 @@ pub fn translate_create_table(
program.alloc_cursor_id(CursorType::BTreeTable(schema_master_table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id,
root_page: 1usize.into(),
root_page: 1i64.into(),
db: 0,
});
let cdc_table = prepare_cdc_if_necessary(&mut program, resolver.schema, SQLITE_TABLEID)?;
@@ -202,7 +202,7 @@ pub fn translate_create_table(
let sqlite_schema_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id,
root_page: 1usize.into(),
root_page: 1i64.into(),
db: 0,
});
@@ -537,7 +537,7 @@ pub fn translate_create_virtual_table(
let sqlite_schema_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id,
root_page: 1usize.into(),
root_page: 1i64.into(),
db: 0,
});
@@ -642,7 +642,7 @@ pub fn translate_drop_table(
);
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id_0,
root_page: 1usize.into(),
root_page: 1i64.into(),
db: 0,
});
@@ -826,7 +826,7 @@ pub fn translate_drop_table(
});
program.emit_insn(Insn::OpenRead {
cursor_id: sqlite_schema_cursor_id_1,
root_page: 1usize,
root_page: 1i64,
db: 0,
});
@@ -883,7 +883,7 @@ pub fn translate_drop_table(
// 5. Open a write cursor to the schema table and re-insert the records placed in the ephemeral table but insert the correct root page now
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id_1,
root_page: 1usize.into(),
root_page: 1i64.into(),
db: 0,
});
@@ -910,10 +910,7 @@ pub fn translate_drop_table(
program.emit_column_or_rowid(sqlite_schema_cursor_id_1, 0, schema_column_0_register);
program.emit_column_or_rowid(sqlite_schema_cursor_id_1, 1, schema_column_1_register);
program.emit_column_or_rowid(sqlite_schema_cursor_id_1, 2, schema_column_2_register);
let root_page = table
.get_root_page()
.try_into()
.expect("Failed to cast the root page to an i64");
let root_page = table.get_root_page();
program.emit_insn(Insn::Integer {
value: root_page,
dest: moved_to_root_page_register,

View File

@@ -348,7 +348,7 @@ pub fn emit_upsert(
set_pairs: &mut [(usize, Box<ast::Expr>)],
where_clause: &mut Option<Box<ast::Expr>>,
resolver: &Resolver,
idx_cursors: &[(&String, usize, usize)],
idx_cursors: &[(&String, i64, usize)],
returning: &mut [ResultSetColumn],
cdc_cursor_id: Option<usize>,
row_done_label: BranchOffset,

View File

@@ -125,7 +125,7 @@ pub fn translate_create_materialized_view(
let sqlite_schema_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id,
root_page: 1usize.into(),
root_page: 1i64.into(),
db: 0,
});
@@ -259,7 +259,7 @@ pub fn translate_create_view(
let sqlite_schema_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id,
root_page: 1usize.into(),
root_page: 1i64.into(),
db: 0,
});
@@ -340,7 +340,7 @@ pub fn translate_drop_view(
program.alloc_cursor_id(CursorType::BTreeTable(schema_table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id,
root_page: 1usize.into(),
root_page: 1i64.into(),
db: 0,
});

View File

@@ -131,7 +131,7 @@ pub const PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX: &str = "sqlite_autoindex_";
/// CREATE INDEX idx ON table_name(sql)
pub struct UnparsedFromSqlIndex {
pub table_name: String,
pub root_page: usize,
pub root_page: i64,
pub sql: String,
}
@@ -151,13 +151,13 @@ pub fn parse_schema_rows(
let mut automatic_indices = std::collections::HashMap::with_capacity(10);
// Store DBSP state table root pages: view_name -> dbsp_state_root_page
let mut dbsp_state_roots: std::collections::HashMap<String, usize> =
let mut dbsp_state_roots: std::collections::HashMap<String, i64> =
std::collections::HashMap::new();
// Store DBSP state table index root pages: view_name -> dbsp_state_index_root_page
let mut dbsp_state_index_roots: std::collections::HashMap<String, usize> =
let mut dbsp_state_index_roots: std::collections::HashMap<String, i64> =
std::collections::HashMap::new();
// Store materialized view info (SQL and root page) for later creation
let mut materialized_view_info: std::collections::HashMap<String, (String, usize)> =
let mut materialized_view_info: std::collections::HashMap<String, (String, i64)> =
std::collections::HashMap::new();
loop {
match rows.step()? {

View File

@@ -1037,13 +1037,14 @@ pub fn op_open_read(
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
let table_id = *root_page as u64;
let table_id = *root_page;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Arc::new(RwLock::new(
MvCursor::new(mv_store, tx_id, table_id, pager.clone()).unwrap(),
));
Some(mv_cursor)
} else {
assert!(*root_page >= 0, "");
None
};
let cursors = &mut state.cursors;
@@ -2241,6 +2242,7 @@ pub fn op_transaction_inner(
},
insn
);
tracing::info!("op_transaction: mv_store.is_some()={}", mv_store.is_some());
let pager = program.get_pager_from_database_index(db);
loop {
match state.op_transaction_state {
@@ -6594,9 +6596,9 @@ pub fn op_open_write(
let pager = program.get_pager_from_database_index(db);
let root_page = match root_page {
RegisterOrLiteral::Literal(lit) => *lit as u64,
RegisterOrLiteral::Literal(lit) => *lit,
RegisterOrLiteral::Register(reg) => match &state.registers[*reg].get_value() {
Value::Integer(val) => *val as u64,
Value::Integer(val) => *val,
_ => {
return Err(LimboError::InternalError(
"OpenWrite: the value in root_page is not an integer".into(),
@@ -6605,7 +6607,7 @@ pub fn op_open_write(
},
};
const SQLITE_SCHEMA_ROOT_PAGE: u64 = 1;
const SQLITE_SCHEMA_ROOT_PAGE: i64 = 1;
if root_page == SQLITE_SCHEMA_ROOT_PAGE {
if let Some(mv_store) = mv_store {
@@ -6648,7 +6650,7 @@ pub fn op_open_write(
let cursor = BTreeCursor::new_index(
mv_cursor,
pager.clone(),
root_page as usize,
root_page,
index.as_ref(),
num_columns,
);
@@ -6665,8 +6667,7 @@ pub fn op_open_write(
),
};
let cursor =
BTreeCursor::new_table(mv_cursor, pager.clone(), root_page as usize, num_columns);
let cursor = BTreeCursor::new_table(mv_cursor, pager.clone(), root_page, num_columns);
cursors
.get_mut(*cursor_id)
.unwrap()
@@ -7399,11 +7400,11 @@ pub fn op_open_ephemeral(
} else {
&CreateBTreeFlags::new_index()
};
let root_page = return_if_io!(pager.btree_create(flag));
let root_page = return_if_io!(pager.btree_create(flag)) as i64;
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
let table_id = root_page as u64;
let table_id = root_page;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Arc::new(RwLock::new(
MvCursor::new(mv_store.clone(), tx_id, table_id, pager.clone()).unwrap(),
@@ -7420,15 +7421,9 @@ pub fn op_open_ephemeral(
};
let cursor = if let CursorType::BTreeIndex(index) = cursor_type {
BTreeCursor::new_index(
mv_cursor,
pager.clone(),
root_page as usize,
index,
num_columns,
)
BTreeCursor::new_index(mv_cursor, pager.clone(), root_page, index, num_columns)
} else {
BTreeCursor::new_table(mv_cursor, pager.clone(), root_page as usize, num_columns)
BTreeCursor::new_table(mv_cursor, pager.clone(), root_page, num_columns)
};
state.op_open_ephemeral_state = OpOpenEphemeralState::Rewind {
cursor: Box::new(cursor),
@@ -7508,7 +7503,7 @@ pub fn op_open_dup(
let pager = &original_cursor.pager;
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
let table_id = root_page as u64;
let table_id = root_page;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Arc::new(RwLock::new(MvCursor::new(
mv_store,
@@ -7744,7 +7739,7 @@ pub fn op_integrity_check(
.get()));
integrity_check_state.set_expected_freelist_count(expected_freelist_count as usize);
integrity_check_state.start(
freelist_trunk_page as usize,
freelist_trunk_page as i64,
PageCategory::FreeListTrunk,
&mut errors,
);

View File

@@ -859,7 +859,7 @@ pub enum Insn {
/// Deletes an entire database table or index whose root page in the database file is given by P1.
Destroy {
/// The root page of the table/index to destroy
root: usize,
root: i64,
/// Register to store the former value of any moved root page (for AUTOVACUUM)
former_root_reg: usize,
/// Whether this is a temporary table (1) or main database table (0)
@@ -1093,7 +1093,7 @@ pub enum Insn {
/// stored in P4_INTARRAY argument. If P5 is not zero, the check is done on the auxiliary database file, not the main database file. This opcode is used to implement the integrity_check pragma.
IntegrityCk {
max_errors: usize,
roots: Vec<usize>,
roots: Vec<i64>,
message_register: usize,
},
RenameTable {

View File

@@ -162,7 +162,7 @@ impl BranchOffset {
pub type CursorID = usize;
pub type PageIdx = usize;
pub type PageIdx = i64;
// Index of insn in list of insns
type InsnReference = u32;