implement database persistence for messages

This commit is contained in:
liamcottle
2025-02-13 12:50:47 +13:00
parent 28c2c1fd47
commit 7cc56ccc69
13 changed files with 3024 additions and 18 deletions

220
src/js/Database.js Normal file
View File

@@ -0,0 +1,220 @@
import {v4} from 'uuid';
import {createRxDatabase} from 'rxdb/plugins/core';
import {getRxStorageDexie} from 'rxdb/plugins/storage-dexie';
import GlobalState from "./GlobalState.js";
import Utils from "./Utils.js";
var database = null;
async function initDatabase(publicKeyHex) {
// close any exsiting database connection
if(database){
await database.destroy();
}
// create a database with a unique name per identity
database = await createRxDatabase({
name: `meshcore_companion_db_${publicKeyHex}`,
storage: getRxStorageDexie(),
allowSlowCount: true,
});
// add database schemas
await database.addCollections({
messages: {
schema: {
version: 0,
primaryKey: 'id',
type: 'object',
properties: {
id: {
type: 'string',
maxLength: 36,
},
status: {
type: 'string',
},
to: {
type: 'string',
},
from: {
type: 'string',
},
path_len: {
type: 'integer',
},
txt_type: {
type: 'integer',
},
sender_timestamp: {
type: 'integer',
},
text: {
type: 'string',
},
timestamp: {
type: 'integer',
},
expected_ack_crc: {
type: 'integer',
},
error: {
type: 'string',
},
},
}
},
contact_messages_read_state: {
schema: {
version: 0,
primaryKey: 'id',
type: 'object',
properties: {
id: {
type: 'string',
maxLength: 36,
},
timestamp: {
type: 'integer',
},
},
}
},
});
}
class Message {
// insert a message into the database
static async insert(data) {
return await database.messages.insert({
id: v4(),
status: data.status,
to: Utils.bytesToHex(data.to),
from: Utils.bytesToHex(data.from),
path_len: data.pathLen,
txt_type: data.txtType,
sender_timestamp: data.sender_timestamp,
text: data.text,
timestamp: Date.now(),
expected_ack_crc: data.expected_ack_crc,
error: null,
});
}
// mark a message as delivered by its ack code
static async setMessageDeliveredByAckCode(ackCode) {
// find one latest message by ack code
// this will prevent updating older messages that might have the same ack code
const latestMessageByPacketId = database.messages.findOne({
selector: {
expected_ack_crc: {
$eq: ackCode,
},
},
sort: [
{
timestamp: "desc",
},
],
});
// patch the message state
return await latestMessageByPacketId.incrementalPatch({
status: "delivered",
});
}
// get all messages
static getAllMessages() {
return database.messages.find();
}
// get direct messages for the provided public key
static getContactMessages(publicKey) {
return database.messages.find({
selector: {
$or: [
// messages from us to other contact
{
from: {
$eq: Utils.bytesToHex(GlobalState.selfInfo.publicKey),
},
to: {
$eq: Utils.bytesToHex(publicKey),
},
},
// messages from other contact to us
{
from: {
$eq: Utils.bytesToHex(publicKey),
},
to: {
$eq: Utils.bytesToHex(GlobalState.selfInfo.publicKey),
},
},
]
},
sort: [
{
timestamp: "asc",
},
],
});
}
// get unread direct messages count for the provided public key
static getContactMessagesUnreadCount(publicKey, messagesLastReadTimestamp) {
return database.messages.count({
selector: {
timestamp: {
$gt: messagesLastReadTimestamp,
},
from: {
$eq: Utils.bytesToHex(publicKey),
},
to: {
$eq: Utils.bytesToHex(GlobalState.selfInfo.publicKey),
},
},
});
}
// delete direct messages for the provided public key
static async deleteContactMessages(publicKey) {
await this.getContactMessages(publicKey).remove();
}
}
class ContactMessagesReadState {
// update the read state of messages for the provided public key
static async touch(publicKey) {
return await database.contact_messages_read_state.upsert({
id: Utils.bytesToHex(publicKey),
timestamp: Date.now(),
});
}
// get the read state of messages for the provided public key
static get(publicKey) {
return database.contact_messages_read_state.findOne({
selector: {
id: {
$eq: Utils.bytesToHex(publicKey),
},
},
});
}
}
export default {
initDatabase,
Message,
ContactMessagesReadState,
};