use Drizzle ORM for the database

This commit is contained in:
hzrd149
2025-03-29 12:45:28 +00:00
parent 4f325554ee
commit 755204c17a
40 changed files with 2199 additions and 1076 deletions

1
.vscode/launch.json vendored
View File

@@ -19,6 +19,7 @@
"args": ["--loader", "@swc-node/register/esm", "src/index.ts"], "args": ["--loader", "@swc-node/register/esm", "src/index.ts"],
"outFiles": ["${workspaceFolder}/**/*.js"], "outFiles": ["${workspaceFolder}/**/*.js"],
"env": { "env": {
"DATA_PATH": "./data",
"NODE_ENV": "development", "NODE_ENV": "development",
"DEBUG": "bakery,bakery:*,applesauce,applesauce:*", "DEBUG": "bakery,bakery:*,applesauce,applesauce:*",
"DEBUG_HIDE_DATE": "true", "DEBUG_HIDE_DATE": "true",

11
drizzle.config.ts Normal file
View File

@@ -0,0 +1,11 @@
import "dotenv/config";
import { defineConfig } from "drizzle-kit";

// Fail fast with a clear message instead of passing `undefined` to drizzle-kit
// (the original used `process.env.DATABASE!`, which produces a confusing
// downstream error when the variable is missing from the environment/.env).
const url = process.env.DATABASE;
if (!url) throw new Error("The DATABASE environment variable must be set (path to the sqlite file)");

/** drizzle-kit configuration: sqlite dialect, schema source, and migrations output folder. */
export default defineConfig({
  dialect: "sqlite",
  schema: "./src/db/schema.ts",
  out: "./drizzle",
  dbCredentials: {
    url,
  },
});

View File

@@ -0,0 +1,44 @@
-- Initial schema for the bakery sqlite database (generated by drizzle-kit from src/db/schema.ts).

-- Serialized application state, one row per state key.
CREATE TABLE `application_state` (
	`id` text PRIMARY KEY NOT NULL,
	`state` text
);
--> statement-breakpoint
-- Cached plaintext for encrypted events; the primary key doubles as the FK to events.id.
CREATE TABLE `decryption_cache` (
	`event` text(64) PRIMARY KEY NOT NULL,
	`content` text NOT NULL,
	FOREIGN KEY (`event`) REFERENCES `events`(`id`) ON UPDATE no action ON DELETE no action
);
--> statement-breakpoint
-- Nostr events. `tags` holds the JSON-encoded tag array; `identifier` appears to hold the
-- addressable ("d" tag) identifier for parameterized-replaceable kinds — confirm against the writer.
CREATE TABLE `events` (
	`id` text(64) PRIMARY KEY NOT NULL,
	`created_at` integer NOT NULL,
	`pubkey` text(64) NOT NULL,
	`sig` text NOT NULL,
	`kind` integer NOT NULL,
	`content` text NOT NULL,
	`tags` text NOT NULL,
	`identifier` text
);
--> statement-breakpoint
-- Single-column indexes for the common nostr filter fields.
CREATE INDEX `created_at` ON `events` (`created_at`);--> statement-breakpoint
CREATE INDEX `pubkey` ON `events` (`pubkey`);--> statement-breakpoint
CREATE INDEX `kind` ON `events` (`kind`);--> statement-breakpoint
CREATE INDEX `identifier` ON `events` (`identifier`);--> statement-breakpoint
-- Per-service log entries.
CREATE TABLE `logs` (
	`id` text PRIMARY KEY NOT NULL,
	`timestamp` integer,
	`service` text NOT NULL,
	`message` text NOT NULL
);
--> statement-breakpoint
-- One row per indexed event tag; `tag` is the single-letter tag name.
CREATE TABLE `tags` (
	`id` integer PRIMARY KEY AUTOINCREMENT NOT NULL,
	`event` text(64) NOT NULL,
	`tag` text(1) NOT NULL,
	`value` text NOT NULL,
	FOREIGN KEY (`event`) REFERENCES `events`(`id`) ON UPDATE no action ON DELETE no action
);
--> statement-breakpoint
-- Indexes used by the tag-filter joins in src/db/queries.ts.
CREATE INDEX `event` ON `tags` (`event`);--> statement-breakpoint
CREATE INDEX `tag` ON `tags` (`tag`);--> statement-breakpoint
CREATE INDEX `value` ON `tags` (`value`);

View File

@@ -0,0 +1,287 @@
{
"version": "6",
"dialect": "sqlite",
"id": "118ac536-0bb2-4d0c-8bbe-1ba319ec7dc8",
"prevId": "00000000-0000-0000-0000-000000000000",
"tables": {
"application_state": {
"name": "application_state",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"state": {
"name": "state",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"decryption_cache": {
"name": "decryption_cache",
"columns": {
"event": {
"name": "event",
"type": "text(64)",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"content": {
"name": "content",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"decryption_cache_event_events_id_fk": {
"name": "decryption_cache_event_events_id_fk",
"tableFrom": "decryption_cache",
"tableTo": "events",
"columnsFrom": [
"event"
],
"columnsTo": [
"id"
],
"onDelete": "no action",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"events": {
"name": "events",
"columns": {
"id": {
"name": "id",
"type": "text(64)",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"pubkey": {
"name": "pubkey",
"type": "text(64)",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"sig": {
"name": "sig",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"kind": {
"name": "kind",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"content": {
"name": "content",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"tags": {
"name": "tags",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"identifier": {
"name": "identifier",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
}
},
"indexes": {
"created_at": {
"name": "created_at",
"columns": [
"created_at"
],
"isUnique": false
},
"pubkey": {
"name": "pubkey",
"columns": [
"pubkey"
],
"isUnique": false
},
"kind": {
"name": "kind",
"columns": [
"kind"
],
"isUnique": false
},
"identifier": {
"name": "identifier",
"columns": [
"identifier"
],
"isUnique": false
}
},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"logs": {
"name": "logs",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"timestamp": {
"name": "timestamp",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"service": {
"name": "service",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"message": {
"name": "message",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"tags": {
"name": "tags",
"columns": {
"id": {
"name": "id",
"type": "integer",
"primaryKey": true,
"notNull": true,
"autoincrement": true
},
"event": {
"name": "event",
"type": "text(64)",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"tag": {
"name": "tag",
"type": "text(1)",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"value": {
"name": "value",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {
"event": {
"name": "event",
"columns": [
"event"
],
"isUnique": false
},
"tag": {
"name": "tag",
"columns": [
"tag"
],
"isUnique": false
},
"value": {
"name": "value",
"columns": [
"value"
],
"isUnique": false
}
},
"foreignKeys": {
"tags_event_events_id_fk": {
"name": "tags_event_events_id_fk",
"tableFrom": "tags",
"tableTo": "events",
"columnsFrom": [
"event"
],
"columnsTo": [
"id"
],
"onDelete": "no action",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
}
},
"views": {},
"enums": {},
"_meta": {
"schemas": {},
"tables": {},
"columns": {}
},
"internal": {
"indexes": {}
}
}

View File

@@ -0,0 +1,13 @@
{
"version": "7",
"dialect": "sqlite",
"entries": [
{
"idx": 0,
"version": "6",
"when": 1743251841984,
"tag": "0000_fluffy_risque",
"breakpoints": true
}
]
}

View File

@@ -7,12 +7,13 @@
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",
"scripts": { "scripts": {
"prepack": "tsc", "prepack": "pnpm build",
"start": "node .", "start": "node .",
"dev": "DATA_PATH=./data nodemon --loader @swc-node/register/esm src/index.ts", "dev": "DATA_PATH=./data nodemon --loader @swc-node/register/esm src/index.ts",
"mcp": "mcp-inspector node . --mcp", "mcp": "mcp-inspector node . --mcp",
"mcp-debug": "mcp-inspector node --inspect-brk . --mcp", "mcp-debug": "mcp-inspector node --inspect-brk . --mcp",
"build": "tsc", "build": "tsc",
"generate": "drizzle-kit generate",
"test": "vitest run", "test": "vitest run",
"format": "prettier -w .", "format": "prettier -w .",
"prerelease-next": "pnpm build", "prerelease-next": "pnpm build",
@@ -45,6 +46,7 @@
"dayjs": "^1.11.13", "dayjs": "^1.11.13",
"debug": "^4.4.0", "debug": "^4.4.0",
"dotenv": "^16.4.7", "dotenv": "^16.4.7",
"drizzle-orm": "^0.41.0",
"express": "^4.21.2", "express": "^4.21.2",
"get-port": "^7.1.0", "get-port": "^7.1.0",
"hash-sum": "^2.0.0", "hash-sum": "^2.0.0",
@@ -84,8 +86,10 @@
"@types/qrcode-terminal": "^0.12.2", "@types/qrcode-terminal": "^0.12.2",
"@types/web-push": "^3.6.4", "@types/web-push": "^3.6.4",
"@types/ws": "^8.18.0", "@types/ws": "^8.18.0",
"drizzle-kit": "^0.30.6",
"nodemon": "^3.1.9", "nodemon": "^3.1.9",
"prettier": "^3.5.3", "prettier": "^3.5.3",
"tsx": "^4.19.3",
"typescript": "^5.8.2", "typescript": "^5.8.2",
"vitest": "^3.0.9" "vitest": "^3.0.9"
}, },

900
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,86 +0,0 @@
import EventEmitter from "events";
import Database, { type Database as SQLDatabase } from "better-sqlite3";
import path from "path";
import fs from "fs";

import { DATA_PATH } from "../env.js";

/** Options for {@link LocalDatabase}: storage directory, base file name, and WAL journaling. */
export type LocalDatabaseConfig = {
  directory: string;
  name: string;
  wal: boolean;
};

/**
 * Thin wrapper around a better-sqlite3 database file on disk.
 *
 * Resolves the main/-shm/-wal file paths, opens the database, and optionally
 * enables WAL journal mode. Extends EventEmitter so consumers can attach
 * listeners; this class itself does not emit any events in the visible code.
 */
export default class LocalDatabase extends EventEmitter {
  config: LocalDatabaseConfig;
  // Paths to the sqlite file and its WAL side files (shm/wal only exist in WAL mode)
  path: { main: string; shm: string; wal: string };
  db: SQLDatabase;

  constructor(config: Partial<LocalDatabaseConfig>) {
    super();

    this.config = {
      directory: DATA_PATH,
      name: "events",
      wal: true,
      ...config,
    };
    this.path = {
      main: path.join(this.config.directory, `${this.config.name}.db`),
      shm: path.join(this.config.directory, `${this.config.name}.db-shm`),
      wal: path.join(this.config.directory, `${this.config.name}.db-wal`),
    };

    // Detect architecture to pass the correct native sqlite module
    this.db = new Database(this.path.main);
    if (this.config.wal) this.db.exec("PRAGMA journal_mode = WAL");
  }

  /** Returns true when a table with the given name exists in sqlite_master. */
  hasTable(table: string) {
    const result = this.db
      .prepare<[string], { count: number }>(`SELECT COUNT(*) as count FROM sqlite_master WHERE type='table' AND name=?`)
      .get(table);
    return !!result && result.count > 0;
  }

  // Delete all events in the database
  /** @deprecated this should not be used */
  clear() {
    this.db.transaction(() => {
      this.db.prepare(`DELETE FROM tags`).run();
      // event_labels is not always present (older schema?); only clear it when it exists
      if (this.hasTable("event_labels")) this.db.prepare(`DELETE FROM event_labels`).run();
      this.db.prepare(`DELETE FROM events`).run();
    })();
  }

  // Get number of events in the database
  /** @deprecated this should be moved to a report */
  count() {
    const result = this.db.prepare(`SELECT COUNT(*) AS events FROM events`).get() as { events: number };
    return result.events;
  }

  // Get total size of the database on disk
  // Returns undefined when any of the files cannot be stat-ed (the error is only logged).
  size() {
    let sum;
    try {
      const statMain = fs.statSync(this.path.main).size;
      const statShm = this.config.wal ? fs.statSync(this.path.shm).size : 0;
      const statWal = this.config.wal ? fs.statSync(this.path.wal).size : 0;
      sum = statMain + statShm + statWal;
    } catch (err) {
      console.log(err);
    }
    return sum;
  }

  /** Remove all listeners; note this does NOT close the underlying database handle. */
  destroy() {
    this.removeAllListeners();
  }
}

View File

@@ -9,7 +9,6 @@ import { filter } from "rxjs";
import cors from "cors"; import cors from "cors";
import { logger } from "../logger.js"; import { logger } from "../logger.js";
import Database from "./database.js";
import { NIP_11_SOFTWARE_URL, SENSITIVE_KINDS } from "../const.js"; import { NIP_11_SOFTWARE_URL, SENSITIVE_KINDS } from "../const.js";
import { OWNER_PUBKEY, BAKERY_PORT } from "../env.js"; import { OWNER_PUBKEY, BAKERY_PORT } from "../env.js";
@@ -18,7 +17,6 @@ import ControlApi from "../modules/control/control-api.js";
import ConfigActions from "../modules/control/config-actions.js"; import ConfigActions from "../modules/control/config-actions.js";
import ReceiverActions from "../modules/control/receiver-actions.js"; import ReceiverActions from "../modules/control/receiver-actions.js";
import Receiver from "../modules/receiver/index.js"; import Receiver from "../modules/receiver/index.js";
import DatabaseActions from "../modules/control/database-actions.js";
import DirectMessageManager from "../modules/direct-message-manager.js"; import DirectMessageManager from "../modules/direct-message-manager.js";
import DirectMessageActions from "../modules/control/dm-actions.js"; import DirectMessageActions from "../modules/control/dm-actions.js";
import AddressBook from "../modules/address-book.js"; import AddressBook from "../modules/address-book.js";
@@ -33,18 +31,17 @@ import DecryptionCache from "../modules/decryption-cache/decryption-cache.js";
import DecryptionCacheActions from "../modules/control/decryption-cache.js"; import DecryptionCacheActions from "../modules/control/decryption-cache.js";
import Scrapper from "../modules/scrapper/index.js"; import Scrapper from "../modules/scrapper/index.js";
import LogsActions from "../modules/control/logs-actions.js"; import LogsActions from "../modules/control/logs-actions.js";
import ApplicationStateManager from "../modules/state/application-state-manager.js"; import ApplicationStateManager from "../modules/application-state/manager.js";
import ScrapperActions from "../modules/control/scrapper-actions.js"; import ScrapperActions from "../modules/control/scrapper-actions.js";
import InboundNetworkManager from "../modules/network/inbound/index.js"; import InboundNetworkManager from "../modules/network/inbound/index.js";
import OutboundNetworkManager from "../modules/network/outbound/index.js"; import OutboundNetworkManager from "../modules/network/outbound/index.js";
import SecretsManager from "../modules/secrets-manager.js"; import SecretsManager from "../modules/secrets-manager.js";
import Switchboard from "../modules/switchboard/switchboard.js"; import Switchboard from "../modules/switchboard/switchboard.js";
import Gossip from "../modules/gossip.js"; import Gossip from "../modules/gossip.js";
import database from "../services/database.js";
import secrets from "../services/secrets.js"; import secrets from "../services/secrets.js";
import bakeryConfig from "../services/config.js"; import bakeryConfig from "../services/config.js";
import logStore from "../services/log-store.js"; import logStore from "../services/log-store.js";
import stateManager from "../services/state.js"; import stateManager from "../services/app-state.js";
import eventCache from "../services/event-cache.js"; import eventCache from "../services/event-cache.js";
import { inboundNetwork, outboundNetwork } from "../services/network.js"; import { inboundNetwork, outboundNetwork } from "../services/network.js";
import { server } from "../services/server.js"; import { server } from "../services/server.js";
@@ -54,7 +51,8 @@ import { getDMRecipient } from "../helpers/direct-messages.js";
import { onConnection, onJSONMessage } from "../helpers/ws.js"; import { onConnection, onJSONMessage } from "../helpers/ws.js";
import QueryManager from "../modules/queries/manager.js"; import QueryManager from "../modules/queries/manager.js";
import "../modules/queries/queries/index.js"; import "../modules/queries/queries/index.js";
import bakerySigner from "../services/bakery.js"; import bakerySigner from "../services/bakery-signer.js";
import db from "../db/index.js";
type EventMap = { type EventMap = {
listening: []; listening: [];
@@ -74,7 +72,7 @@ export default class App extends EventEmitter<EventMap> {
inboundNetwork: InboundNetworkManager; inboundNetwork: InboundNetworkManager;
outboundNetwork: OutboundNetworkManager; outboundNetwork: OutboundNetworkManager;
database: Database; database: typeof db;
eventStore: SQLiteEventStore; eventStore: SQLiteEventStore;
logStore: LogStore; logStore: LogStore;
relay: NostrRelay; relay: NostrRelay;
@@ -130,7 +128,7 @@ export default class App extends EventEmitter<EventMap> {
}); });
// Init sqlite database // Init sqlite database
this.database = database; this.database = db;
// create log managers // create log managers
this.logStore = logStore; this.logStore = logStore;
@@ -149,8 +147,7 @@ export default class App extends EventEmitter<EventMap> {
this.eventStore = eventCache; this.eventStore = eventCache;
// setup decryption cache // setup decryption cache
this.decryptionCache = new DecryptionCache(this.database.db); this.decryptionCache = new DecryptionCache(this.database);
this.decryptionCache.setup();
// Setup managers user contacts and profiles // Setup managers user contacts and profiles
this.addressBook = new AddressBook(); this.addressBook = new AddressBook();
@@ -192,7 +189,6 @@ export default class App extends EventEmitter<EventMap> {
this.control.registerHandler(new ConfigActions(this)); this.control.registerHandler(new ConfigActions(this));
this.control.registerHandler(new ReceiverActions(this)); this.control.registerHandler(new ReceiverActions(this));
this.control.registerHandler(new ScrapperActions(this)); this.control.registerHandler(new ScrapperActions(this));
this.control.registerHandler(new DatabaseActions(this));
this.control.registerHandler(new DirectMessageActions(this)); this.control.registerHandler(new DirectMessageActions(this));
this.control.registerHandler(new NotificationActions(this)); this.control.registerHandler(new NotificationActions(this));
this.control.registerHandler(new RemoteAuthActions(this)); this.control.registerHandler(new RemoteAuthActions(this));
@@ -395,9 +391,7 @@ export default class App extends EventEmitter<EventMap> {
this.config.write(); this.config.write();
this.scrapper.stop(); this.scrapper.stop();
this.receiver.stop(); this.receiver.stop();
await this.state.saveAll();
this.relay.stop(); this.relay.stop();
this.database.destroy();
this.receiver.destroy(); this.receiver.destroy();
await this.inboundNetwork.stop(); await this.inboundNetwork.stop();

View File

@@ -1,3 +1,4 @@
import { BehaviorSubject } from "rxjs";
import { EventEmitter } from "events"; import { EventEmitter } from "events";
import { LowSync, SyncAdapter } from "lowdb"; import { LowSync, SyncAdapter } from "lowdb";
import { JSONFileSync } from "lowdb/node"; import { JSONFileSync } from "lowdb/node";
@@ -17,6 +18,7 @@ export class ReactiveJsonFile<T extends object> extends EventEmitter<EventMap<T>
adapter: SyncAdapter<T>; adapter: SyncAdapter<T>;
data: T; data: T;
data$: BehaviorSubject<T>;
constructor(path: string, defaultData: T) { constructor(path: string, defaultData: T) {
super(); super();
@@ -25,6 +27,7 @@ export class ReactiveJsonFile<T extends object> extends EventEmitter<EventMap<T>
this.db = new LowSync<T>(this.adapter, defaultData); this.db = new LowSync<T>(this.adapter, defaultData);
this.data = this.createProxy(); this.data = this.createProxy();
this.data$ = new BehaviorSubject(this.db.data);
} }
private createProxy() { private createProxy() {
@@ -34,6 +37,7 @@ export class ReactiveJsonFile<T extends object> extends EventEmitter<EventMap<T>
}, },
set: (target, p, newValue, receiver) => { set: (target, p, newValue, receiver) => {
Reflect.set(target, p, newValue, receiver); Reflect.set(target, p, newValue, receiver);
this.data$.next(target as T);
this.emit("changed", target as T, String(p), newValue); this.emit("changed", target as T, String(p), newValue);
this.emit("updated", target as T); this.emit("updated", target as T);
return true; return true;
@@ -43,12 +47,14 @@ export class ReactiveJsonFile<T extends object> extends EventEmitter<EventMap<T>
read() { read() {
this.db.read(); this.db.read();
this.data$.next(this.db.data);
this.emit("loaded", this.db.data); this.emit("loaded", this.db.data);
this.emit("updated", this.db.data); this.emit("updated", this.db.data);
this.createProxy(); this.createProxy();
} }
write() { write() {
this.db.write(); this.db.write();
this.data$.next(this.db.data);
this.emit("saved", this.db.data); this.emit("saved", this.db.data);
} }
update(fn: (data: T) => unknown) { update(fn: (data: T) => unknown) {

22
src/db/database.ts Normal file
View File

@@ -0,0 +1,22 @@
import { drizzle } from "drizzle-orm/better-sqlite3";
import { migrate } from "drizzle-orm/better-sqlite3/migrator";
import Database from "better-sqlite3";

import { DATABASE } from "../env.js";
import * as schema from "./schema.js";
import { setupEventFts } from "./search/events.js";
import { setupDecryptedFts } from "./search/decrypted.js";

// Open (or create) the sqlite file at the path configured by the DATABASE env var.
const sqlite = new Database(DATABASE);

// Wrap the raw connection with drizzle, attaching the schema so `db.query.*` is typed.
const bakeryDatabase = drizzle(sqlite, { schema });

export type BakeryDatabase = typeof bakeryDatabase;

// Run migrations first
// NOTE: this runs at module load time, so importing this module has side effects.
migrate(bakeryDatabase, { migrationsFolder: "./drizzle" });

// Setup search tables after migrations
// The fts5 virtual tables are managed with raw SQL on the underlying connection,
// not through drizzle migrations.
setupEventFts(sqlite);
setupDecryptedFts(sqlite);

export default bakeryDatabase;

20
src/db/helpers.ts Normal file
View File

@@ -0,0 +1,20 @@
import { type Database } from "better-sqlite3";
import { NostrEvent } from "nostr-tools";
import * as schema from "./schema.js";
/**
 * Check whether a table with the given name exists.
 * Returns the matching sqlite_master row when present, otherwise undefined
 * (callers use the result as a truthy flag).
 */
export function hasTable(db: Database, table: string) {
  const lookup = db.prepare(`SELECT name FROM sqlite_master WHERE type='table' AND name=?`);
  return lookup.get(table);
}
/**
 * Convert a raw `events` table row into a NostrEvent.
 * The `tags` column is stored as a JSON string and is parsed back into the tag array;
 * the `identifier` column is internal to the store and is not copied onto the event.
 */
export function parseEventRow(row: typeof schema.events.$inferSelect): NostrEvent {
  const { id, kind, pubkey, content, created_at, sig, tags } = row;
  return { id, kind, pubkey, content, created_at, sig, tags: JSON.parse(tags) };
}

3
src/db/index.ts Normal file
View File

@@ -0,0 +1,3 @@
export { default, type BakeryDatabase } from "./database.js";
export * as schema from "./schema.js";
export * as helpers from "./helpers.js";

270
src/db/queries.ts Normal file
View File

@@ -0,0 +1,270 @@
import { Filter } from "nostr-tools";
import { eq, sql, desc, isNull, and } from "drizzle-orm";
import { mapParams } from "../helpers/sql.js";
import database from "./database.js";
import { schema } from "./index.js";
/** A filter key like "#e" selects events having at least one matching tag value (OR semantics). */
const isFilterKeyIndexableTag = (key: string) => key.length === 2 && key.startsWith("#");

/** A filter key like "&e" requires the event to match every listed tag value (AND semantics). */
const isFilterKeyIndexableAndTag = (key: string) => key.length === 2 && key.startsWith("&");
/** Look up a single event by id. Bind `id` when executing the prepared query. */
export const eventQuery = database.query.events
  .findFirst({
    where: (events, { eq }) => eq(events.id, sql.placeholder("id")),
  })
  .prepare();

/** Latest addressable (parameterized replaceable) event for a kind/pubkey/identifier triple. */
export const addressableQuery = database.query.events
  .findFirst({
    where: (events, { eq }) =>
      and(
        eq(events.kind, sql.placeholder("kind")),
        eq(events.pubkey, sql.placeholder("pubkey")),
        eq(events.identifier, sql.placeholder("identifier")),
      ),
    // newest first; id breaks ties between events with the same created_at
    orderBy: [desc(schema.events.created_at), desc(schema.events.id)],
  })
  .prepare();

/** Every stored version of an addressable event, newest first. */
export const addressableHistoryQuery = database.query.events
  .findMany({
    where: (events, { eq }) =>
      and(
        eq(events.kind, sql.placeholder("kind")),
        eq(events.pubkey, sql.placeholder("pubkey")),
        eq(events.identifier, sql.placeholder("identifier")),
      ),
    orderBy: [desc(schema.events.created_at), desc(schema.events.id)],
  })
  .prepare();

/** Latest replaceable event (rows with a NULL identifier) for a kind/pubkey pair. */
export const replaceableQuery = database.query.events
  .findFirst({
    where: (events, { eq, isNull }) =>
      and(
        eq(events.kind, sql.placeholder("kind")),
        eq(events.pubkey, sql.placeholder("pubkey")),
        isNull(events.identifier),
      ),
    orderBy: [desc(schema.events.created_at), desc(schema.events.id)],
  })
  .prepare();

/** Every stored version of a replaceable event, newest first. */
export const replaceableHistoryQuery = database.query.events
  .findMany({
    where: (events, { eq, isNull }) =>
      and(
        eq(events.kind, sql.placeholder("kind")),
        eq(events.pubkey, sql.placeholder("pubkey")),
        isNull(events.identifier),
      ),
    orderBy: [desc(schema.events.created_at), desc(schema.events.id)],
  })
  .prepare();
/**
 * Translate one nostr filter into raw SQL fragments against the `events` table:
 * JOIN clauses, WHERE conditions, bound parameters, and GROUP BY / HAVING
 * (the latter two only used when "&x" AND-tag filters are present).
 *
 * Tag filter key conventions (see the predicates above):
 *  - "#x": event must have at least one matching tag value (OR semantics)
 *  - "&x": event must match every listed value (AND semantics)
 */
function buildConditionsForFilter(filter: Filter) {
  const joins: string[] = [];
  const conditions: string[] = [];
  const parameters: (string | number)[] = [];
  const groupBy: string[] = [];
  const having: string[] = [];

  // get AND tag filters
  const andTagQueries = Object.keys(filter).filter(isFilterKeyIndexableAndTag);
  // get OR tag filters and remove any ones that appear in the AND
  const orTagQueries = Object.keys(filter)
    .filter(isFilterKeyIndexableTag)
    .filter((t) => !andTagQueries.includes(t));

  // join the tags table under distinct aliases so OR and AND conditions don't interfere
  if (orTagQueries.length > 0) {
    joins.push("INNER JOIN tags as or_tags ON events.id = or_tags.event");
  }
  if (andTagQueries.length > 0) {
    joins.push("INNER JOIN tags as and_tags ON events.id = and_tags.event");
  }

  if (filter.search) {
    // full-text search; the term is double-quoted (internal quotes doubled) so fts5 treats it as a phrase
    joins.push("INNER JOIN events_fts ON events_fts.id = events.id");
    conditions.push(`events_fts MATCH ?`);
    parameters.push('"' + filter.search.replace(/"/g, '""') + '"');
  }

  // since is inclusive, until is exclusive
  if (typeof filter.since === "number") {
    conditions.push(`events.created_at >= ?`);
    parameters.push(filter.since);
  }
  if (typeof filter.until === "number") {
    conditions.push(`events.created_at < ?`);
    parameters.push(filter.until);
  }

  if (filter.ids) {
    conditions.push(`events.id IN ${mapParams(filter.ids)}`);
    parameters.push(...filter.ids);
  }
  if (filter.kinds) {
    conditions.push(`events.kind IN ${mapParams(filter.kinds)}`);
    parameters.push(...filter.kinds);
  }
  if (filter.authors) {
    conditions.push(`events.pubkey IN ${mapParams(filter.authors)}`);
    parameters.push(...filter.authors);
  }

  // add AND tag filters ("&x" keys are not part of the Filter type, hence the expect-error)
  for (const t of andTagQueries) {
    conditions.push(`and_tags.tag = ?`);
    parameters.push(t.slice(1));

    // @ts-expect-error
    const v = filter[t] as string[];
    conditions.push(`and_tags.value IN ${mapParams(v)}`);
    parameters.push(...v);
  }

  // add OR tag filters
  for (let t of orTagQueries) {
    conditions.push(`or_tags.tag = ?`);
    parameters.push(t.slice(1));

    // @ts-expect-error
    const v = filter[t] as string[];
    conditions.push(`or_tags.value IN ${mapParams(v)}`);
    parameters.push(...v);
  }

  // if there is an AND tag filter set GROUP BY so that HAVING can be used
  // NOTE(review): the HAVING compares the matched row count to the total number of values across
  // all "&x" keys — this assumes each (tag, value) pair matches at most one tags row per event;
  // verify for events carrying duplicate tags.
  if (andTagQueries.length > 0) {
    groupBy.push("events.id");
    having.push("COUNT(and_tags.id) = ?");
    // @ts-expect-error
    parameters.push(andTagQueries.reduce((t, k) => t + (filter[k] as string[]).length, 0));
  }

  return { conditions, parameters, joins, groupBy, having };
}
/**
 * Combine several nostr filters into one SQL statement plus its bound parameters.
 * Each filter becomes one parenthesized condition group; groups are OR-ed together.
 * Ordering is by fts5 rank when any filter searches (unless overridden), otherwise
 * by created_at descending, and the smallest limit across all filters is applied.
 */
export function buildSQLQueryForFilters(filters: Filter[], select = "events.*") {
  const orConditions: string[] = [];
  const parameters: any[] = [];
  const joins = new Set<string>();
  const groupBy = new Set<string>();
  const having = new Set<string>();

  // collect the fragments for every filter; empty filters contribute nothing
  for (const filter of filters) {
    const parts = buildConditionsForFilter(filter);
    if (parts.conditions.length === 0) continue;

    orConditions.push(`(${parts.conditions.join(" AND ")})`);
    parameters.push(...parts.parameters);
    parts.joins.forEach((join) => joins.add(join));
    parts.groupBy.forEach((group) => groupBy.add(group));
    parts.having.forEach((clause) => having.add(clause));
  }

  let stmt = `SELECT ${select} FROM events ` + Array.from(joins).join(" ");
  if (orConditions.length > 0) stmt += ` WHERE ${orConditions.join(" OR ")}`;
  if (groupBy.size > 0) stmt += " GROUP BY " + Array.from(groupBy).join(",");
  if (having.size > 0) stmt += " HAVING " + Array.from(having).join(" AND ");

  // "order" is a non-standard filter extension, hence the expect-error
  // @ts-expect-error
  const order = filters.find((f) => f.order)?.order;
  if (filters.some((f) => f.search) && (order === "rank" || order === undefined)) {
    stmt += " ORDER BY rank";
  } else {
    stmt += " ORDER BY created_at DESC";
  }

  // apply the smallest limit requested by any filter
  const minLimit = filters.reduce((min, f) => (f.limit ? Math.min(min, f.limit) : min), Infinity);
  if (minLimit !== Infinity) {
    stmt += " LIMIT ?";
    parameters.push(minLimit);
  }

  return { stmt, parameters };
}
// New code using drizzle
// function buildConditionsForFilter(filter: Filter) {
// const conditions: (SQL | undefined)[] = [];
// // Handle tag filters
// const andTagQueries = Object.keys(filter).filter(isFilterKeyIndexableAndTag);
// const orTagQueries = Object.keys(filter)
// .filter(isFilterKeyIndexableTag)
// .filter((t) => !andTagQueries.includes(t));
// if (filter.since) conditions.push(gte(events.createdAt, filter.since));
// if (filter.until) conditions.push(lt(events.createdAt, filter.until));
// if (filter.ids) conditions.push(inArray(events.id, filter.ids));
// if (filter.kinds) conditions.push(inArray(events.kind, filter.kinds));
// if (filter.authors) conditions.push(inArray(events.pubkey, filter.authors));
// // Add tag conditions
// if (orTagQueries.length > 0) {
// const orConditions = orTagQueries.map((t) => {
// // @ts-expect-error
// const values = filter[t] as string[];
// return and(eq(tags.tagag, t.slice(1)), inArray(tags.valuealue, values));
// });
// conditions.push(or(...orConditions));
// }
// if (andTagQueries.length > 0) {
// andTagQueries.forEach((t) => {
// // @ts-expect-error
// const values = filter[t] as string[];
// conditions.push(and(eq(tags.tagag, t.slice(1)), inArray(tags.valuealue, values)));
// });
// }
// return conditions;
// }
// export function buildDrizzleQueryForFilters(filters: (Filter & { order?: "rank" | "createdAt" })[]) {
// const filterConditions = filters.map((filter) => and(...buildConditionsForFilter(filter)));
// let baseQuery = bakeryDatabase.select().from(events).leftJoin(tags, eq(events.id, tags.event));
// if (filterConditions.length > 0) {
// baseQuery = baseQuery.where(or(...filterConditions));
// }
// // Handle ordering
// const order = filters.find((f) => f.order)?.order;
// if (filters.some((f) => f.search) && (!order || order === "rank")) {
// baseQuery = baseQuery.orderBy(sql`rank`);
// } else {
// baseQuery = baseQuery.orderBy(desc(events.createdAt));
// }
// // Handle limit
// const minLimit = Math.min(...filters.map((f) => f.limit || Infinity));
// if (minLimit !== Infinity) {
// baseQuery = baseQuery.limit(minLimit);
// }
// return baseQuery;
// }

58
src/db/schema.ts Normal file
View File

@@ -0,0 +1,58 @@
import { int, sqliteTable, text, index } from "drizzle-orm/sqlite-core";

// Event store tables

/**
 * Nostr events. `tags` holds the JSON-encoded tag array (see parseEventRow in helpers.ts);
 * `identifier` is NULL for plain replaceable events and set for addressable ones
 * (see the replaceable/addressable queries in src/db/queries.ts).
 */
export const events = sqliteTable(
  "events",
  {
    id: text("id", { length: 64 }).notNull().primaryKey(),
    created_at: int("created_at").notNull(),
    pubkey: text("pubkey", { length: 64 }).notNull(),
    sig: text("sig").notNull(),
    kind: int("kind").notNull(),
    content: text("content").notNull(),
    tags: text("tags").notNull(),
    identifier: text("identifier"),
  },
  (table) => [
    // single-column indexes for the common nostr filter fields
    index("created_at").on(table.created_at),
    index("pubkey").on(table.pubkey),
    index("kind").on(table.kind),
    index("identifier").on(table.identifier),
  ],
);

// Event tags table
// One row per indexed tag: `tag` is the single-letter tag name, `value` its value.
export const tags = sqliteTable(
  "tags",
  {
    id: int("id").primaryKey({ autoIncrement: true }),
    event: text("event", { length: 64 })
      .references(() => events.id)
      .notNull(),
    tag: text("tag", { length: 1 }).notNull(),
    value: text("value").notNull(),
  },
  (table) => [index("event").on(table.event), index("tag").on(table.tag), index("value").on(table.value)],
);

// Decryption cache tables
// Plaintext of previously decrypted event content, keyed by event id.
export const decryptionCache = sqliteTable("decryption_cache", {
  event: text("event", { length: 64 })
    .references(() => events.id)
    .notNull()
    .primaryKey(),
  content: text("content").notNull(),
});

// Log store tables
export const logs = sqliteTable("logs", {
  id: text("id").primaryKey(),
  timestamp: int("timestamp"),
  service: text("service").notNull(),
  message: text("message").notNull(),
});

// Serialized application state, one row per state key.
export const applicationState = sqliteTable("application_state", {
  id: text("id").primaryKey().notNull(),
  state: text("state"),
});

View File

@@ -0,0 +1,96 @@
import { type Database } from "better-sqlite3";
import { NostrEvent } from "nostr-tools";
import { logger } from "../../logger.js";
import { hasTable, parseEventRow } from "../helpers.js";
import * as schema from "../schema.js";
import { HiddenContentSymbol } from "applesauce-core/helpers";
const log = logger.extend("Database:Search:Decrypted");
/**
 * Create the fts5 full-text index over decrypted message content and the
 * triggers that keep it in sync with the `decryption_cache` table.
 *
 * The index is an external-content fts5 table (content='decryption_cache'),
 * so rows must be added/removed through the fts5 command interface rather
 * than stored twice.
 *
 * Idempotent: skipped entirely when the search table already exists (which
 * also means the triggers and backfill only run on first creation).
 */
export function setupDecryptedFts(database: Database) {
  // Skip if search table already exists
  if (hasTable(database, "decryption_cache_fts")) return;

  database
    .prepare(
      `CREATE VIRTUAL TABLE IF NOT EXISTS decryption_cache_fts USING fts5(content, content='decryption_cache', tokenize='trigram')`,
    )
    .run();
  log(`Created decryption cache search table`);

  // After an INSERT on the cache, index the new row
  database
    .prepare(
      `
CREATE TRIGGER IF NOT EXISTS decryption_cache_ai AFTER INSERT ON decryption_cache BEGIN
	INSERT INTO decryption_cache_fts(rowid, content) VALUES (NEW.rowid, NEW.content);
END;
`,
    )
    .run();

  // After a DELETE, remove the row via the fts5 external-content 'delete' command.
  // BUG FIX: the target must be the fts table `decryption_cache_fts` (with the first
  // column repeating the table name, per the fts5 convention); the previous version
  // inserted into `decryption_cache_ai` — the *trigger's* name — which is not a table
  // and would error ("no such table") when the trigger is created or fired.
  database
    .prepare(
      `
CREATE TRIGGER IF NOT EXISTS decryption_cache_ad AFTER DELETE ON decryption_cache BEGIN
	INSERT INTO decryption_cache_fts(decryption_cache_fts, rowid, content) VALUES('delete', OLD.rowid, OLD.content);
END;
`,
    )
    .run();

  // populate the index with any rows that existed before it was created
  const inserted = database
    .prepare(`INSERT INTO decryption_cache_fts (rowid, content) SELECT rowid, content FROM decryption_cache`)
    .run();

  log(`Indexed ${inserted.changes} decrypted events in search table`);
}
/**
 * Full-text search over decrypted event content.
 *
 * Joins the fts5 index back to `decryption_cache` and `events`, optionally
 * restricting results to a two-party conversation (either participant as
 * author, the other referenced by a "p" tag). Returns the matching events
 * with the decrypted plaintext attached under HiddenContentSymbol.
 */
export function searchDecrypted(
  database: Database,
  search: string,
  filter?: { conversation?: [string, string]; order?: "rank" | "created_at" },
): NostrEvent[] {
  const params: any[] = [];
  const andConditions: string[] = [];

  let sql = `SELECT events.*, decryption_cache.content as plaintext FROM decryption_cache_fts
	INNER JOIN decryption_cache ON decryption_cache_fts.rowid = decryption_cache.rowid
	INNER JOIN events ON decryption_cache.event = events.id`;

  andConditions.push("decryption_cache_fts MATCH ?");
  params.push(search);

  // filter down by authors
  if (filter?.conversation) {
    // BUG FIX: the tags table columns are `event`, `tag` and `value` (see schema.ts),
    // not `e`/`t`/`v`, and the join previously used the alias `tag.` instead of `tags.`.
    sql += `\nINNER JOIN tags ON tags.event = events.id AND tags.tag = 'p'`;
    // BUG FIX: the OR group must be wrapped in parentheses — AND binds tighter than OR,
    // so without them `MATCH ? AND a OR b` parses as `(MATCH ? AND a) OR b`, dropping
    // the search condition for half the matches.
    andConditions.push(`((tags.value = ? AND events.pubkey = ?) OR (tags.value = ? AND events.pubkey = ?))`);
    params.push(...filter.conversation, ...Array.from(filter.conversation).reverse());
  }

  if (andConditions.length > 0) {
    sql += ` WHERE ${andConditions.join(" AND ")}`;
  }

  switch (filter?.order) {
    case "rank":
      sql += " ORDER BY rank";
      break;
    case "created_at":
    default:
      sql += " ORDER BY events.created_at DESC";
      break;
  }

  return database
    .prepare<any[], typeof schema.events.$inferSelect & { plaintext: string }>(sql)
    .all(...params)
    .map((row) => {
      // Create the event object and add the hidden content
      const event = parseEventRow(row);
      Reflect.set(event, HiddenContentSymbol, row.plaintext);
      return event;
    });
}

101
src/db/search/events.ts Normal file
View File

@@ -0,0 +1,101 @@
import { kinds, NostrEvent } from "nostr-tools";
import { type Database } from "better-sqlite3";
import * as schema from "../schema.js";
import { logger } from "../../logger.js";
import { hasTable, parseEventRow } from "../helpers.js";
import { mapParams } from "../../helpers/sql.js";
const log = logger.extend("Database:Search:Events");
// Tag names whose first value is indexed for full-text search.
const SEARCHABLE_TAGS = ["title", "description", "about", "summary", "alt"];
// Event kinds excluded from indexing (their content is encrypted/opaque).
const SEARCHABLE_KIND_BLACKLIST = [kinds.EncryptedDirectMessage];
// Per-kind extractors that turn an event's raw content into searchable
// plaintext. Kinds without an entry are indexed verbatim.
const SEARCHABLE_CONTENT_FORMATTERS: Record<number, (content: string) => string> = {
  // kind 0 profiles: pull the human-readable fields out of the JSON body
  [kinds.Metadata]: (content) => {
    const SEARCHABLE_PROFILE_FIELDS = [
      "name",
      "display_name",
      "about",
      "nip05",
      "lud16",
      "website",
      // Deprecated fields
      "displayName",
      "username",
    ];
    try {
      const profile = JSON.parse(content);
      // keep only fields that are present and truthy, one per line
      return SEARCHABLE_PROFILE_FIELDS.map((field) => profile[field])
        .filter(Boolean)
        .join("\n");
    } catch (error) {
      // not valid JSON — fall back to indexing the raw content
      return content;
    }
  },
};
/** Build the FTS row (id, content, tags) to index for a nostr event. */
function convertEventToSearchRow(event: NostrEvent) {
  // Collect the values of indexable tags into one space-separated string
  const tagValues: string[] = [];
  for (const [name, value] of event.tags) {
    if (SEARCHABLE_TAGS.includes(name)) tagValues.push(value);
  }

  // Some kinds (e.g. profile metadata) use a custom plaintext extraction
  const formatter = SEARCHABLE_CONTENT_FORMATTERS[event.kind];
  const content = formatter ? formatter(event.content) : event.content;

  return { id: event.id, content, tags: tagValues.join(" ") };
}
export function setupEventFts(database: Database) {
// Skip if search table already exists
if (hasTable(database, "events_fts")) return;
database
.prepare(
`CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(id UNINDEXED, content, tags, tokenize='trigram')`,
)
.run();
log("Created event search table");
const events = database
.prepare<number[], typeof schema.events.$inferSelect>(
`SELECT * FROM events WHERE kind NOT IN ${mapParams(SEARCHABLE_KIND_BLACKLIST)}`,
)
.all(...SEARCHABLE_KIND_BLACKLIST)
.map(parseEventRow);
// insert search content into table
let changes = 0;
for (const event of events) {
const search = convertEventToSearchRow(event);
// manually insert into fts table
const result = database
.prepare<[string, string, string]>(`INSERT OR REPLACE INTO events_fts (id, content, tags) VALUES (?, ?, ?)`)
.run(search.id, search.content, search.tags);
changes += result.changes;
}
log(`Inserted ${changes} events into search table`);
}
/** Index (or re-index) a single event in the search table; true if a row was written. */
export function insertEventIntoSearch(database: Database, event: NostrEvent): boolean {
  const { id, content, tags } = convertEventToSearchRow(event);
  const result = database
    .prepare<[string, string, string]>(`INSERT OR REPLACE INTO events_fts (id, content, tags) VALUES (?, ?, ?)`)
    .run(id, content, tags);
  return result.changes > 0;
}
/** Remove the given event ids from the search index; true if any row was deleted. */
export function removeEventsFromSearch(database: Database, events: string[]): boolean {
  // Guard the empty case: mapParams([]) would produce `IN ()`, which is a
  // SQLite syntax error — and there is nothing to delete anyway.
  if (events.length === 0) return false;
  const result = database.prepare<string[]>(`DELETE FROM events_fts WHERE id IN ${mapParams(events)}`).run(...events);
  return result.changes > 0;
}

View File

@@ -13,6 +13,8 @@ export const PUBLIC_ADDRESS = process.env.PUBLIC_ADDRESS;
export const DATA_PATH = process.env.DATA_PATH || join(homedir(), ".bakery"); export const DATA_PATH = process.env.DATA_PATH || join(homedir(), ".bakery");
await mkdirp(DATA_PATH); await mkdirp(DATA_PATH);
export const DATABASE = join(DATA_PATH, "bakery.db");
export const BAKERY_PORT = parseInt(args.values.port ?? process.env.BAKERY_PORT ?? "") || DEFAULT_PORT; export const BAKERY_PORT = parseInt(args.values.port ?? process.env.BAKERY_PORT ?? "") || DEFAULT_PORT;
// I2P config // I2P config

View File

@@ -14,7 +14,9 @@ import App from "./app/index.js";
import { PUBLIC_ADDRESS, IS_MCP } from "./env.js"; import { PUBLIC_ADDRESS, IS_MCP } from "./env.js";
import { addListener, logger } from "./logger.js"; import { addListener, logger } from "./logger.js";
import { pathExists } from "./helpers/fs.js"; import { pathExists } from "./helpers/fs.js";
import "./services/owner.js"; import stateManager from "./services/app-state.js";
import bakeryDatabase from "./db/database.js";
import logStore from "./services/log-store.js";
// add durations plugin // add durations plugin
dayjs.extend(duration); dayjs.extend(duration);
@@ -81,9 +83,19 @@ if (IS_MCP) {
// shutdown process // shutdown process
async function shutdown() { async function shutdown() {
logger("shutting down"); logger("Shutting down...");
// Stop the app
await app.stop(); await app.stop();
// Save the application state
stateManager.saveAll();
// Stop writing the logs to the database
logStore.close();
// Close the database last
bakeryDatabase.$client.close();
process.exit(0); process.exit(0);
} }
process.on("SIGINT", shutdown); process.on("SIGINT", shutdown);

View File

@@ -0,0 +1,94 @@
import { BehaviorSubject, Subject, tap, throttleTime } from "rxjs";
import { eq } from "drizzle-orm";
import { BakeryDatabase } from "../../db/database.js";
import { schema } from "../../db/index.js";
import { logger } from "../../logger.js";
/**
 * Load (or initialize) the JSON state object stored under `key` and return a
 * Proxy that transparently persists every mutation back to the
 * `application_state` table, throttled to at most one write per `throttle` ms.
 *
 * @param database  drizzle database handle
 * @param key       row id in application_state
 * @param initialState  defaults used when no row exists yet
 * @param throttle  minimum ms between writes
 */
function createMutableState<T extends object>(
  database: BakeryDatabase,
  key: string,
  initialState: T,
  throttle = 1000,
): T {
  const existing = database.select().from(schema.applicationState).where(eq(schema.applicationState.id, key)).get();

  // Use JSON parse/stringify to create a fresh object (never mutate initialState)
  const state = JSON.parse(existing?.state || JSON.stringify(initialState)) as T;

  // Save the state if it doesn't exist
  if (!existing)
    database
      .insert(schema.applicationState)
      .values({ id: key, state: JSON.stringify(state) })
      .run();

  const dirty = new BehaviorSubject(false);
  const save = new Subject<T>();

  // Persist at most once every `throttle` ms. `trailing: true` is required:
  // the default leading-only throttleTime silently drops every mutation that
  // arrives inside a throttle window after the first, so the final state was
  // only ever persisted if another mutation happened later (or at shutdown).
  save
    .pipe(
      tap(() => dirty.value === false && dirty.next(true)),
      throttleTime(throttle, undefined, { leading: true, trailing: true }),
    )
    .subscribe((state) => {
      database
        .update(schema.applicationState)
        .set({ state: JSON.stringify(state) })
        .where(eq(schema.applicationState.id, key))
        .run();
      dirty.next(false);
    });

  // Proxy that schedules a save on every set/delete. Read-type traps
  // (get/ownKeys/getOwnPropertyDescriptor) are omitted — the previous
  // identity implementations were exactly the Proxy defaults.
  return new Proxy(state, {
    set(target, prop, value, receiver) {
      Reflect.set(target, prop, value, receiver);
      save.next(target);
      return true;
    },
    deleteProperty(target, prop) {
      Reflect.deleteProperty(target, prop);
      save.next(target);
      return true;
    },
  });
}
/** Caches per-key mutable state proxies and flushes them all on demand. */
export default class ApplicationStateManager {
  protected log = logger.extend("State");
  protected mutableState = new Map<string, any>();

  constructor(public database: BakeryDatabase) {}

  /** Return the cached proxy for `key`, creating and loading it on first use. */
  getMutableState<T extends object>(key: string, initialState: T): T {
    const cached = this.mutableState.get(key);
    if (cached) return cached as T;

    this.log(`Loading state for ${key}`);
    const created = createMutableState(this.database, key, initialState);
    this.mutableState.set(key, created);
    return created;
  }

  /** Synchronously write every tracked state object to the database (used at shutdown). */
  saveAll() {
    this.log("Saving all application states");
    for (const [key, state] of this.mutableState) {
      this.database
        .update(schema.applicationState)
        .set({ state: JSON.stringify(state) })
        .where(eq(schema.applicationState.id, key))
        .run();
    }
  }
}

View File

@@ -1,65 +0,0 @@
import { WebSocket } from "ws";
import { DatabaseMessage, DatabaseResponse, DatabaseStats } from "@satellite-earth/core/types/control-api/database.js";
import App from "../../app/index.js";
import { ControlMessageHandler } from "./control-api.js";
export default class DatabaseActions implements ControlMessageHandler {
app: App;
name = "DATABASE";
subscribed = new Set<WebSocket | NodeJS.Process>();
constructor(app: App) {
this.app = app;
// update all subscribed sockets every 5 seconds
let last: DatabaseStats | undefined = undefined;
setInterval(() => {
const stats = this.getStats();
if (stats.count !== last?.count || stats.size !== last.size) {
for (const sock of this.subscribed) {
this.send(sock, ["CONTROL", "DATABASE", "STATS", stats]);
}
}
last = stats;
}, 5_000);
}
private getStats() {
const count = this.app.database.count();
const size = this.app.database.size();
return { count, size };
}
handleMessage(sock: WebSocket | NodeJS.Process, message: DatabaseMessage): boolean {
const action = message[2];
switch (action) {
case "SUBSCRIBE":
this.subscribed.add(sock);
sock.once("close", () => this.subscribed.delete(sock));
this.send(sock, ["CONTROL", "DATABASE", "STATS", this.getStats()]);
return true;
case "UNSUBSCRIBE":
this.subscribed.delete(sock);
return true;
case "STATS":
this.send(sock, ["CONTROL", "DATABASE", "STATS", this.getStats()]);
return true;
case "CLEAR":
this.app.database.clear();
return true;
default:
return false;
}
}
send(sock: WebSocket | NodeJS.Process, response: DatabaseResponse) {
sock.send?.(JSON.stringify(response));
}
}

View File

@@ -23,20 +23,15 @@ export default class DecryptionCacheActions implements ControlMessageHandler {
this.app.decryptionCache.addEventContent(message[3], message[4]); this.app.decryptionCache.addEventContent(message[3], message[4]);
return true; return true;
case "CLEAR-PUBKEY":
this.app.decryptionCache.clearPubkey(message[3]);
return true;
case "CLEAR": case "CLEAR":
this.app.decryptionCache.clearAll(); this.app.decryptionCache.clearAll();
return true; return true;
case "REQUEST": case "REQUEST":
this.app.decryptionCache.getEventsContent(message[3]).then((contents) => { const contents = this.app.decryptionCache.getEventsContent(message[3]);
for (const { event, content } of contents) for (const { event, content } of contents)
this.send(sock, ["CONTROL", "DECRYPTION-CACHE", "CONTENT", event, content]); this.send(sock, ["CONTROL", "DECRYPTION-CACHE", "CONTENT", event, content]);
this.send(sock, ["CONTROL", "DECRYPTION-CACHE", "END"]); this.send(sock, ["CONTROL", "DECRYPTION-CACHE", "END"]);
});
return true; return true;
default: default:

View File

@@ -1,153 +1,50 @@
import { MigrationSet } from "../../sqlite/migrations.js"; import { eq, inArray } from "drizzle-orm";
import { type Database } from "better-sqlite3"; import { getHiddenContent } from "applesauce-core/helpers";
import { EventEmitter } from "events"; import { EventEmitter } from "events";
import { EventRow, parseEventRow } from "../../sqlite/event-store.js";
import { logger } from "../../logger.js"; import { logger } from "../../logger.js";
import { NostrEvent } from "nostr-tools"; import { schema, type BakeryDatabase } from "../../db/index.js";
import { mapParams } from "../../helpers/sql.js"; import { searchDecrypted } from "../../db/search/decrypted.js";
const migrations = new MigrationSet("decryption-cache");
// Version 1
migrations.addScript(1, async (db, log) => {
db.prepare(
`
CREATE TABLE "decryption_cache" (
"event" TEXT(64) NOT NULL,
"content" TEXT NOT NULL,
PRIMARY KEY("event")
);
`,
).run();
});
// Version 2, search
migrations.addScript(2, async (db, log) => {
// create external Content fts5 table
db.prepare(
`CREATE VIRTUAL TABLE IF NOT EXISTS decryption_cache_fts USING fts5(content, content='decryption_cache', tokenize='trigram')`,
).run();
log(`Created decryption cache search table`);
// create triggers to sync table
db.prepare(
`
CREATE TRIGGER IF NOT EXISTS decryption_cache_ai AFTER INSERT ON decryption_cache BEGIN
INSERT INTO decryption_cache_fts(rowid, content) VALUES (NEW.rowid, NEW.content);
END;
`,
).run();
db.prepare(
`
CREATE TRIGGER IF NOT EXISTS decryption_cache_ad AFTER DELETE ON decryption_cache BEGIN
INSERT INTO decryption_cache_ai(decryption_cache_ai, rowid, content) VALUES('delete', OLD.rowid, OLD.content);
END;
`,
).run();
// populate table
const inserted = db
.prepare(`INSERT INTO decryption_cache_fts (rowid, content) SELECT rowid, content FROM decryption_cache`)
.run();
log(`Indexed ${inserted.changes} decrypted events in search table`);
});
type EventMap = { type EventMap = {
cache: [string, string]; cache: [string, string];
}; };
export default class DecryptionCache extends EventEmitter<EventMap> { export default class DecryptionCache extends EventEmitter<EventMap> {
database: Database;
log = logger.extend("DecryptionCache"); log = logger.extend("DecryptionCache");
constructor(database: Database) { constructor(public database: BakeryDatabase) {
super(); super();
this.database = database;
}
setup() {
return migrations.run(this.database);
} }
/** cache the decrypted content of an event */ /** cache the decrypted content of an event */
addEventContent(id: string, plaintext: string) { addEventContent(event: string, plaintext: string) {
const result = this.database const result = this.database.insert(schema.decryptionCache).values({ event: event, content: plaintext }).run();
.prepare<[string, string]>(`INSERT INTO decryption_cache (event, content) VALUES (?, ?)`)
.run(id, plaintext);
if (result.changes > 0) { if (result.changes > 0) {
this.log(`Saved content for ${id}`); this.log(`Saved content for ${event}`);
this.emit("cache", event, plaintext);
this.emit("cache", id, plaintext);
} }
} }
/** remove all cached content relating to a pubkey */ search(query: string, filter?: { conversation?: [string, string]; order?: "rank" | "created_at" }) {
clearPubkey(pubkey: string) { return searchDecrypted(this.database.$client, query, filter).map((event) => ({
// this.database.prepare<string>(`DELETE FROM decryption_cache INNER JOIN events ON event=events.id`) event,
plaintext: getHiddenContent(event)!,
}));
} }
/** clear all cached content */ /** clear all cached content */
clearAll() { clearAll() {
this.database.prepare(`DELETE FROM decryption_cache`).run(); this.database.delete(schema.decryptionCache).run();
} }
async search( getEventContent(id: string): string | undefined {
search: string, return this.database.select().from(schema.decryptionCache).where(eq(schema.decryptionCache.event, id)).get()
filter?: { conversation?: [string, string]; order?: "rank" | "created_at" }, ?.content;
): Promise<{ event: NostrEvent; plaintext: string }[]> {
const params: any[] = [];
const andConditions: string[] = [];
let sql = `SELECT events.*, decryption_cache.content as plaintext FROM decryption_cache_fts
INNER JOIN decryption_cache ON decryption_cache_fts.rowid = decryption_cache.rowid
INNER JOIN events ON decryption_cache.event = events.id`;
andConditions.push("decryption_cache_fts MATCH ?");
params.push(search);
// filter down by authors
if (filter?.conversation) {
sql += `\nINNER JOIN tags ON tag.e = events.id AND tags.t = 'p'`;
andConditions.push(`(tags.v = ? AND events.pubkey = ?) OR (tags.v = ? AND events.pubkey = ?)`);
params.push(...filter.conversation, ...Array.from(filter.conversation).reverse());
} }
if (andConditions.length > 0) { getEventsContent(ids: string[]): (typeof schema.decryptionCache.$inferSelect)[] {
sql += ` WHERE ${andConditions.join(" AND ")}`; return this.database.select().from(schema.decryptionCache).where(inArray(schema.decryptionCache.event, ids)).all();
}
switch (filter?.order) {
case "rank":
sql += " ORDER BY rank";
break;
case "created_at":
default:
sql += " ORDER BY events.created_at DESC";
break;
}
return this.database
.prepare<any[], EventRow & { plaintext: string }>(sql)
.all(...params)
.map((row) => ({ event: parseEventRow(row), plaintext: row.plaintext }));
}
async getEventContent(id: string) {
const result = this.database
.prepare<[string], { event: string; content: string }>(`SELECT * FROM decryption_cache WHERE event=?`)
.get(id);
return result?.content;
}
async getEventsContent(ids: string[]) {
return this.database
.prepare<
string[],
{ event: string; content: string }
>(`SELECT * FROM decryption_cache WHERE event IN ${mapParams(ids)}`)
.all(...ids);
} }
} }

View File

@@ -1,59 +1,32 @@
import { type Database as SQLDatabase } from "better-sqlite3"; import { bufferTime, filter, firstValueFrom, Subject, Subscription } from "rxjs";
import EventEmitter from "events"; import { gte, lte, like, and } from "drizzle-orm";
import { nanoid } from "nanoid"; import { nanoid } from "nanoid";
import { Debugger } from "debug";
import { logger } from "../../logger.js"; import { BakeryDatabase } from "../../db/database.js";
import { MigrationSet } from "../../sqlite/migrations.js"; import { schema } from "../../db/index.js";
type EventMap = { export type LogFilter = {
log: [LogEntry]; service?: string;
clear: [string | undefined]; since?: number;
until?: number;
}; };
export type LogEntry = { export default class LogStore {
id: string; public insert$ = new Subject<typeof schema.logs.$inferInsert>();
service: string; protected write$ = new Subject<typeof schema.logs.$inferInsert>();
timestamp: number;
message: string;
};
export type DatabaseLogEntry = LogEntry & {
id: number | bigint;
};
const migrations = new MigrationSet("log-store"); protected writeQueue: Subscription;
// version 1 constructor(public database: BakeryDatabase) {
migrations.addScript(1, async (db, log) => { // Buffer writes to the database
db.prepare( this.writeQueue = this.write$
` .pipe(
CREATE TABLE IF NOT EXISTS "logs" ( bufferTime(1000, null, 5000),
"id" TEXT NOT NULL UNIQUE, filter((entries) => entries.length > 0),
"timestamp" INTEGER NOT NULL, )
"service" TEXT NOT NULL, .subscribe((entries) => {
"message" TEXT NOT NULL, this.database.insert(schema.logs).values(entries).run();
PRIMARY KEY("id")
);
`,
).run();
log("Created logs table");
db.prepare("CREATE INDEX IF NOT EXISTS logs_service ON logs(service)");
log("Created logs service index");
}); });
export default class LogStore extends EventEmitter<EventMap> {
database: SQLDatabase;
debug: Debugger;
constructor(database: SQLDatabase) {
super();
this.database = database;
this.debug = logger;
}
async setup() {
return await migrations.run(this.database);
} }
addEntry(service: string, timestamp: Date | number, message: string) { addEntry(service: string, timestamp: Date | number, message: string) {
@@ -65,99 +38,37 @@ export default class LogStore extends EventEmitter<EventMap> {
message, message,
}; };
this.queue.push(entry); this.write$.next(entry);
this.emit("log", entry); this.insert$.next(entry);
if (!this.running) this.write();
} }
running = false; getLogs(filter?: LogFilter & { limit?: number }) {
queue: LogEntry[] = []; return this.database
private write() { .select()
if (this.running) return; .from(schema.logs)
this.running = true; .where(({ service, timestamp }) => {
const conditions = [];
const BATCH_SIZE = 5000; if (filter?.service) conditions.push(like(service, `${filter.service}%`));
if (filter?.since) conditions.push(gte(timestamp, filter.since));
const inserted: (number | bigint)[] = []; if (filter?.until) conditions.push(lte(timestamp, filter.until));
const failed: LogEntry[] = []; return and(...conditions);
})
this.database.transaction(() => { .limit(filter?.limit ?? -1)
let i = 0; .all();
while (this.queue.length) {
const entry = this.queue.shift()!;
try {
const { lastInsertRowid } = this.database
.prepare<
[string, string, number, string]
>(`INSERT INTO "logs" (id, service, timestamp, message) VALUES (?, ?, ?, ?)`)
.run(entry.id, entry.service, entry.timestamp, entry.message);
inserted.push(lastInsertRowid);
} catch (error) {
failed.push(entry);
} }
if (++i >= BATCH_SIZE) break; clearLogs(filter?: LogFilter) {
} const conditions = [];
})(); if (filter?.service) conditions.push(like(schema.logs.service, `${filter.service}%`));
if (filter?.since) conditions.push(gte(schema.logs.timestamp, filter.since));
if (filter?.until) conditions.push(lte(schema.logs.timestamp, filter.until));
const where = and(...conditions);
for (const entry of failed) { this.database.delete(schema.logs).where(where).run();
// Don't know what to do here...
} }
if (this.queue.length > 0) setTimeout(this.write.bind(this), 1000); close() {
else this.running = false; // stop writing to the database
} this.writeQueue.unsubscribe();
getLogs(filter?: { service?: string; since?: number; until?: number; limit?: number }) {
const conditions: string[] = [];
const parameters: (string | number)[] = [];
let sql = `SELECT * FROM logs`;
if (filter?.service) {
conditions.push(`service LIKE CONCAT(?,'%')`);
parameters.push(filter?.service);
}
if (filter?.since) {
conditions.push("timestamp>=?");
parameters.push(filter?.since);
}
if (filter?.until) {
conditions.push("timestamp<=?");
parameters.push(filter?.until);
}
if (conditions.length > 0) sql += ` WHERE ${conditions.join(" AND ")}`;
if (filter?.limit) {
sql += " LIMIT ?";
parameters.push(filter.limit);
}
return this.database.prepare<any[], DatabaseLogEntry>(sql).all(...parameters);
}
clearLogs(filter?: { service?: string; since?: number; until?: number }) {
const conditions: string[] = [];
const parameters: (string | number)[] = [];
let sql = `DELETE FROM logs`;
if (filter?.service) {
conditions.push("service=?");
parameters.push(filter?.service);
}
if (filter?.since) {
conditions.push("timestamp>=?");
parameters.push(filter?.since);
}
if (filter?.until) {
conditions.push("timestamp<=?");
parameters.push(filter?.until);
}
if (conditions.length > 0) sql += ` WHERE ${conditions.join(" AND ")}`;
this.database.prepare<any[]>(sql).run(parameters);
this.emit("clear", filter?.service);
} }
} }

View File

@@ -7,7 +7,7 @@ import webPush from "web-push";
import { logger } from "../../logger.js"; import { logger } from "../../logger.js";
import App from "../../app/index.js"; import App from "../../app/index.js";
import stateManager from "../../services/state.js"; import stateManager from "../../services/app-state.js";
import bakeryConfig from "../../services/config.js"; import bakeryConfig from "../../services/config.js";
import { getDMRecipient, getDMSender } from "../../helpers/direct-messages.js"; import { getDMRecipient, getDMSender } from "../../helpers/direct-messages.js";
@@ -40,9 +40,9 @@ export default class NotificationsManager extends EventEmitter<EventMap> {
} }
async setup() { async setup() {
this.state = ( this.state = stateManager.getMutableState<NotificationsManagerState>("notification-manager", {
await stateManager.getMutableState<NotificationsManagerState>("notification-manager", { channels: [] }) channels: [],
).proxy; });
} }
addOrUpdateChannel(channel: NotificationChannel) { addOrUpdateChannel(channel: NotificationChannel) {

View File

@@ -1,12 +1,4 @@
import { Observable } from "rxjs";
import { Query } from "../types.js"; import { Query } from "../types.js";
import bakeryConfig, { BakeryConfig } from "../../../services/config.js"; import bakeryConfig, { BakeryConfig } from "../../../services/config.js";
export const ConfigQuery: Query<BakeryConfig> = () => export const ConfigQuery: Query<BakeryConfig> = () => bakeryConfig.data$;
new Observable((observer) => {
observer.next(bakeryConfig.data);
const listener = (c: BakeryConfig) => observer.next(c);
bakeryConfig.on("updated", listener);
return () => bakeryConfig.off("updated", listener);
});

View File

@@ -1,9 +1,10 @@
import { filter, from, fromEvent, merge } from "rxjs"; import { filter, from, merge } from "rxjs";
import { Query } from "../types.js"; import { Query } from "../types.js";
import logStore from "../../../services/log-store.js"; import logStore from "../../../services/log-store.js";
import { LogEntry } from "../../log-store/log-store.js"; import { schema } from "../../../db/index.js";
export const LogsQuery: Query<LogEntry> = (args: { export const LogsQuery: Query<typeof schema.logs.$inferSelect> = (args: {
service?: string; service?: string;
since?: number; since?: number;
until?: number; until?: number;
@@ -13,7 +14,7 @@ export const LogsQuery: Query<LogEntry> = (args: {
// get last 500 lines // get last 500 lines
from(logStore.getLogs({ service: args.service, limit: 500 })), from(logStore.getLogs({ service: args.service, limit: 500 })),
// subscribe to new logs // subscribe to new logs
fromEvent<LogEntry>(logStore, "log").pipe( logStore.insert$.pipe(
// only return logs that match args // only return logs that match args
filter((entry) => { filter((entry) => {
return !args?.service || entry.service === args.service; return !args?.service || entry.service === args.service;

View File

@@ -1,6 +1,16 @@
import { from, merge, NEVER } from "rxjs"; import { from, merge, NEVER } from "rxjs";
import database from "../../../services/database.js";
import { Query } from "../types.js"; import { Query } from "../types.js";
import bakeryDatabase, { schema } from "../../../db/index.js";
export const ServicesQuery: Query<string[]> = () => export const ServicesQuery: Query<string[]> = () =>
merge(NEVER, from(database.db.prepare<[], { id: string }>(`SELECT service as id FROM logs GROUP BY service`).all())); merge(
NEVER,
from(
bakeryDatabase
.select()
.from(schema.logs)
.groupBy(schema.logs.service)
.all()
.map((row) => row.service),
),
);

View File

@@ -37,7 +37,7 @@ export default class Scrapper extends EventEmitter<EventMap> {
} }
async setup() { async setup() {
this.state = (await this.app.state.getMutableState<ScrapperState>("scrapper", { pubkeys: [] })).proxy; this.state = this.app.state.getMutableState<ScrapperState>("scrapper", { pubkeys: [] });
} }
async ensureData() { async ensureData() {

View File

@@ -56,7 +56,7 @@ export default class PubkeyScrapper extends EventEmitter<EventMap> {
`${this.pubkey}|${relay.url}`, `${this.pubkey}|${relay.url}`,
{}, {},
); );
if (state) scrapper.state = state.proxy; if (state) scrapper.state = state;
this.relayScrappers.set(url, scrapper); this.relayScrappers.set(url, scrapper);
} }

View File

@@ -1,49 +0,0 @@
import { Database } from "better-sqlite3";
import { MutableState } from "./mutable-state.js";
import { MigrationSet } from "../../sqlite/migrations.js";
const migrations = new MigrationSet("application-state");
migrations.addScript(1, async (db, log) => {
db.prepare(
`
CREATE TABLE "application_state" (
"id" TEXT NOT NULL,
"state" TEXT,
PRIMARY KEY("id")
);
`,
).run();
log("Created application state table");
});
export default class ApplicationStateManager {
private mutableState = new Map<string, MutableState<any>>();
database: Database;
constructor(database: Database) {
this.database = database;
}
async setup() {
await migrations.run(this.database);
}
async getMutableState<T extends object>(key: string, initialState: T) {
const cached = this.mutableState.get(key);
if (cached) return cached as MutableState<T>;
const state = new MutableState<T>(this.database, key, initialState);
await state.read();
this.mutableState.set(key, state);
return state;
}
async saveAll() {
for (const [key, state] of this.mutableState) {
await state.save();
}
}
}

View File

@@ -1,91 +0,0 @@
import { EventEmitter } from "events";
import { Database } from "better-sqlite3";
import _throttle from "lodash.throttle";
import { Debugger } from "debug";
import { logger } from "../../logger.js";
type EventMap<T> = {
/** fires when file is loaded */
loaded: [T];
/** fires when a field is set */
changed: [T, string, any];
/** fires when state is loaded or changed */
updated: [T];
saved: [T];
};
export class MutableState<T extends object> extends EventEmitter<EventMap<T>> {
state?: T;
log: Debugger;
private _proxy?: T;
/** A Proxy object that will automatically save when mutated */
get proxy() {
if (!this._proxy) throw new Error("Cant access state before initialized");
return this._proxy;
}
key: string;
database: Database;
constructor(database: Database, key: string, initialState: T) {
super();
this.state = initialState;
this.key = key;
this.database = database;
this.log = logger.extend(`State:` + key);
this.createProxy();
}
private createProxy() {
if (!this.state) return;
return (this._proxy = new Proxy(this.state, {
get(target, prop, receiver) {
return Reflect.get(target, prop, receiver);
},
set: (target, p, newValue, receiver) => {
Reflect.set(target, p, newValue, receiver);
this.emit("changed", target as T, String(p), newValue);
this.emit("updated", target as T);
this.throttleSave();
return newValue;
},
}));
}
private throttleSave = _throttle(this.save.bind(this), 30_000);
async read() {
const row = await this.database
.prepare<[string], { id: string; state: string }>(`SELECT id, state FROM application_state WHERE id=?`)
.get(this.key);
const state: T | undefined = row ? (JSON.parse(row.state) as T) : undefined;
if (state && this.state) {
Object.assign(this.state, state);
this.log("Loaded");
}
if (!this.state) throw new Error(`Missing initial state for ${this.key}`);
this.createProxy();
if (this.state) {
this.emit("loaded", this.state);
this.emit("updated", this.state);
}
}
async save() {
if (!this.state) return;
await this.database
.prepare<[string, string]>(`INSERT OR REPLACE INTO application_state (id, state) VALUES (?, ?)`)
.run(this.key, JSON.stringify(this.state));
this.log("Saved");
this.emit("saved", this.state);
}
}

View File

@@ -0,0 +1,6 @@
import ApplicationStateManager from "../modules/application-state/manager.js";
import bakeryDatabase from "../db/index.js";
const stateManager = new ApplicationStateManager(bakeryDatabase);
export default stateManager;

View File

@@ -1,7 +0,0 @@
import LocalDatabase from "../app/database.js";
import { DATA_PATH } from "../env.js";
// setup database
const database = new LocalDatabase({ directory: DATA_PATH });
export default database;

View File

@@ -1,7 +1,6 @@
import bakeryDatabase from "../db/database.js";
import { SQLiteEventStore } from "../sqlite/event-store.js"; import { SQLiteEventStore } from "../sqlite/event-store.js";
import database from "./database.js";
const eventCache = new SQLiteEventStore(database.db); const eventCache = new SQLiteEventStore(bakeryDatabase);
await eventCache.setup();
export default eventCache; export default eventCache;

View File

@@ -1,7 +1,6 @@
import LogStore from "../modules/log-store/log-store.js"; import LogStore from "../modules/log-store/log-store.js";
import database from "./database.js"; import bakeryDatabase from "../db/index.js";
const logStore = new LogStore(database.db); const logStore = new LogStore(bakeryDatabase);
await logStore.setup();
export default logStore; export default logStore;

View File

@@ -1,12 +1,14 @@
import database from "../../database.js"; import { count } from "drizzle-orm";
import mcpServer from "../server.js"; import mcpServer from "../server.js";
import bakeryDatabase, { schema } from "../../../db/index.js";
mcpServer.tool("get_database_stats", "Get the total number of events in the database", {}, async () => { mcpServer.tool("get_database_stats", "Get the total number of events in the database", {}, async () => {
const { events } = database.db.prepare<[], { events: number }>(`SELECT COUNT(*) AS events FROM events`).get() || {}; const events = await bakeryDatabase.$count(schema.events);
const { users } = const { users } =
database.db.prepare<[], { users: number }>(`SELECT COUNT(*) AS users FROM events GROUP BY pubkey`).get() || {}; bakeryDatabase.select({ users: count() }).from(schema.events).groupBy(schema.events.pubkey).get() || {};
return { return {
content: [{ type: "text", text: [`Total events: ${events ?? 0}`, `Total users: ${users ?? 0}`].join("\n") }], content: [{ type: "text", text: [`Total events: ${events}`, `Total users: ${users ?? 0}`].join("\n") }],
}; };
}); });

View File

@@ -1,7 +0,0 @@
import ApplicationStateManager from "../modules/state/application-state-manager.js";
import database from "./database.js";
const stateManager = new ApplicationStateManager(database.db);
await stateManager.setup();
export default stateManager;

View File

@@ -1,184 +1,20 @@
import { ISyncEventStore } from "applesauce-core"; import { ISyncEventStore } from "applesauce-core";
import { Database } from "better-sqlite3";
import { Filter, NostrEvent, kinds } from "nostr-tools"; import { Filter, NostrEvent, kinds } from "nostr-tools";
import { and, eq, inArray, isNull } from "drizzle-orm";
import EventEmitter from "events"; import EventEmitter from "events";
import { mapParams } from "../helpers/sql.js";
import { logger } from "../logger.js"; import { logger } from "../logger.js";
import { MigrationSet } from "../sqlite/migrations.js"; import { insertEventIntoSearch, removeEventsFromSearch } from "../db/search/events.js";
import { BakeryDatabase, schema } from "../db/index.js";
const isFilterKeyIndexableTag = (key: string) => { import { parseEventRow } from "../db/helpers.js";
return key[0] === "#" && key.length === 2; import {
}; addressableHistoryQuery,
const isFilterKeyIndexableAndTag = (key: string) => { addressableQuery,
return key[0] === "&" && key.length === 2; buildSQLQueryForFilters,
}; eventQuery,
replaceableHistoryQuery,
export type EventRow = { replaceableQuery,
id: string; } from "../db/queries.js";
kind: number;
pubkey: string;
content: string;
tags: string;
created_at: number;
sig: string;
d?: string;
};
export function parseEventRow(row: EventRow): NostrEvent {
return { ...row, tags: JSON.parse(row.tags) };
}
// search behavior
const SEARCHABLE_TAGS = ["title", "description", "about", "summary", "alt"];
const SEARCHABLE_KIND_BLACKLIST = [kinds.EncryptedDirectMessage];
const SEARCHABLE_CONTENT_FORMATTERS: Record<number, (content: string) => string> = {
[kinds.Metadata]: (content) => {
const SEARCHABLE_PROFILE_FIELDS = [
"name",
"display_name",
"about",
"nip05",
"lud16",
"website",
// Deprecated fields
"displayName",
"username",
];
try {
const lines: string[] = [];
const json = JSON.parse(content);
for (const field of SEARCHABLE_PROFILE_FIELDS) {
if (json[field]) lines.push(json[field]);
}
return lines.join("\n");
} catch (error) {
return content;
}
},
};
function convertEventToSearchRow(event: NostrEvent) {
const tags = event.tags
.filter((t) => SEARCHABLE_TAGS.includes(t[0]))
.map((t) => t[1])
.join(" ");
const content = SEARCHABLE_CONTENT_FORMATTERS[event.kind]
? SEARCHABLE_CONTENT_FORMATTERS[event.kind](event.content)
: event.content;
return { id: event.id, content, tags };
}
const migrations = new MigrationSet("event-store");
// Version 1
migrations.addScript(1, async (db, log) => {
// Create events table
db.prepare(
`
CREATE TABLE IF NOT EXISTS events (
id TEXT(64) PRIMARY KEY,
created_at INTEGER,
pubkey TEXT(64),
sig TEXT(128),
kind INTEGER,
content TEXT,
tags TEXT
)
`,
).run();
log("Setup events");
// Create tags table
db.prepare(
`
CREATE TABLE IF NOT EXISTS tags (
i INTEGER PRIMARY KEY AUTOINCREMENT,
e TEXT(64) REFERENCES events(id),
t TEXT(1),
v TEXT
)
`,
).run();
log("Setup tags table");
// Create indices
const indices = [
db.prepare("CREATE INDEX IF NOT EXISTS events_created_at ON events(created_at)"),
db.prepare("CREATE INDEX IF NOT EXISTS events_pubkey ON events(pubkey)"),
db.prepare("CREATE INDEX IF NOT EXISTS events_kind ON events(kind)"),
db.prepare("CREATE INDEX IF NOT EXISTS tags_e ON tags(e)"),
db.prepare("CREATE INDEX IF NOT EXISTS tags_t ON tags(t)"),
db.prepare("CREATE INDEX IF NOT EXISTS tags_v ON tags(v)"),
];
indices.forEach((statement) => statement.run());
log(`Setup ${indices.length} indices`);
});
// Version 2, search table
migrations.addScript(2, async (db, log) => {
db.prepare(
`CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(id UNINDEXED, content, tags, tokenize='trigram')`,
).run();
log("Created event search table");
const rows = db
.prepare<number[], EventRow>(`SELECT * FROM events WHERE kind NOT IN ${mapParams(SEARCHABLE_KIND_BLACKLIST)}`)
.all(...SEARCHABLE_KIND_BLACKLIST);
// insert search content into table
let changes = 0;
for (const row of rows) {
const search = convertEventToSearchRow(parseEventRow(row));
const result = db
.prepare<[string, string, string]>(`INSERT OR REPLACE INTO events_fts (id, content, tags) VALUES (?, ?, ?)`)
.run(search.id, search.content, search.tags);
changes += result.changes;
}
log(`Inserted ${changes} events into search table`);
});
// Version 3, indexed d tags
migrations.addScript(3, async (db, log) => {
db.prepare(`ALTER TABLE events ADD COLUMN d TEXT`).run();
log("Created d column");
db.prepare("CREATE INDEX IF NOT EXISTS events_d ON events(d)").run();
log(`Created d index`);
log(`Adding d tags to events table`);
let updated = 0;
db.transaction(() => {
const events = db
.prepare<[], { id: string; d: string }>(
`
SELECT events.id as id, tags.v as d
FROM events
INNER JOIN tags ON tags.e = events.id AND tags.t = 'd'
WHERE events.kind >= 30000 AND events.kind < 40000
`,
)
.all();
const update = db.prepare<[string, string]>("UPDATE events SET d = ? WHERE id = ?");
for (const row of events) {
const { changes } = update.run(row.d, row.id);
if (changes > 0) updated++;
}
})();
log(`Updated ${updated} events`);
});
type EventMap = { type EventMap = {
"event:inserted": [NostrEvent]; "event:inserted": [NostrEvent];
@@ -186,114 +22,89 @@ type EventMap = {
}; };
export class SQLiteEventStore extends EventEmitter<EventMap> implements ISyncEventStore { export class SQLiteEventStore extends EventEmitter<EventMap> implements ISyncEventStore {
db: Database;
log = logger.extend("sqlite-event-store"); log = logger.extend("sqlite-event-store");
preserveEphemeral = false; preserveEphemeral = false;
preserveReplaceable = false; keepHistory = false;
constructor(db: Database) { constructor(public database: BakeryDatabase) {
super(); super();
this.db = db;
} }
setup() { addEvent(event: NostrEvent): boolean {
return migrations.run(this.db);
}
addEvent(event: NostrEvent) {
// Don't store ephemeral events in db, // Don't store ephemeral events in db,
// just return the event directly // just return the event directly
if (!this.preserveEphemeral && kinds.isEphemeralKind(event.kind)) return false; if (!this.preserveEphemeral && kinds.isEphemeralKind(event.kind)) return false;
const inserted = this.db.transaction(() => { // Check if the event is already in the database
// TODO: Check if event is replaceable and if its newer if (eventQuery.execute({ id: event.id }).sync() !== undefined) return false;
// before inserting it into the database
// get event d value so it can be indexed // Get the replaceable identifier for the event
const d = kinds.isAddressableKind(event.kind) ? event.tags.find((t) => t[0] === "d" && t[1])?.[1] : undefined; const identifier =
kinds.isReplaceableKind(event.kind) || !kinds.isAddressableKind(event.kind)
? undefined
: event.tags.find((t) => t[0] === "d" && t[1])?.[1];
const insert = this.db // Check if the event is already in the database
.prepare( if (this.keepHistory === false && kinds.isReplaceableKind(event.kind)) {
` // Only check for newer events if we're not keeping history
INSERT OR IGNORE INTO events (id, created_at, pubkey, sig, kind, content, tags, d) if (this.keepHistory === false) {
VALUES (?, ?, ?, ?, ?, ?, ?, ?) const existing = replaceableQuery
`, .execute({
) kind: event.kind,
.run( pubkey: event.pubkey,
event.id, identifier,
event.created_at, })
event.pubkey, .sync();
event.sig,
event.kind,
event.content,
JSON.stringify(event.tags),
d,
);
// If event inserted, index tags, insert search // Found a newer event, exit
if (insert.changes) { if (existing !== undefined) return false;
}
} else if (this.keepHistory === false && kinds.isAddressableKind(event.kind)) {
const existing = addressableQuery
.execute({
kind: event.kind,
pubkey: event.pubkey,
identifier,
})
.sync();
// Found a newer event, exit
if (existing !== undefined) return false;
}
// Attempt to insert the event into the database
const inserted = this.database.transaction(() => {
const insert = this.database
.insert(schema.events)
.values({
id: event.id,
created_at: event.created_at,
pubkey: event.pubkey,
sig: event.sig,
kind: event.kind,
content: event.content,
tags: JSON.stringify(event.tags),
identifier: identifier ?? null,
})
.run();
// Insert indexed tags
this.insertEventTags(event); this.insertEventTags(event);
// Remove older replaceable events and all their associated tags
if (this.preserveReplaceable === false) {
let older: { id: string; created_at: number }[] = [];
if (kinds.isReplaceableKind(event.kind)) {
// Normal replaceable event
older = this.db
.prepare<[number, string], { id: string; created_at: number }>(
`
SELECT id, created_at FROM events WHERE kind = ? AND pubkey = ?
`,
)
.all(event.kind, event.pubkey);
} else if (kinds.isParameterizedReplaceableKind(event.kind)) {
// Parameterized Replaceable
const d = event.tags.find((t) => t[0] === "d")?.[1];
if (d) {
older = this.db
.prepare<[number, string, "d", string], { id: string; created_at: number }>(
`
SELECT events.id, events.created_at FROM events
INNER JOIN tags ON events.id = tags.e
WHERE kind = ? AND pubkey = ? AND tags.t = ? AND tags.v = ?
`,
)
.all(event.kind, event.pubkey, "d", d);
}
}
// If found other events that may need to be replaced,
// sort the events according to timestamp descending,
// falling back to id lexical order ascending as per
// NIP-01. Remove all non-most-recent events and tags.
if (older.length > 1) {
const removeIds = older
.sort((a, b) => {
return a.created_at === b.created_at ? a.id.localeCompare(b.id) : b.created_at - a.created_at;
})
.slice(1)
.map((item) => item.id);
this.removeEvents(removeIds);
// If the event that was just inserted was one of
// the events that was removed, return null so to
// indicate that the event was in effect *not*
// upserted and thus, if using the DB for a nostr
// relay, does not need to be pushed to clients
if (removeIds.indexOf(event.id) !== -1) return false;
}
}
}
return insert.changes > 0; return insert.changes > 0;
})(); });
if (inserted) { if (inserted) {
// Remove older replaceable events if we're not keeping history
if (this.keepHistory === false) {
this.removeReplaceableHistory(event.kind, event.pubkey, identifier);
}
// Index the event
this.insertEventIntoSearch(event); this.insertEventIntoSearch(event);
// Emit the event
this.emit("event:inserted", event); this.emit("event:inserted", event);
} }
@@ -303,231 +114,118 @@ export class SQLiteEventStore extends EventEmitter<EventMap> implements ISyncEve
private insertEventTags(event: NostrEvent) { private insertEventTags(event: NostrEvent) {
for (let tag of event.tags) { for (let tag of event.tags) {
if (tag[0].length === 1) { if (tag[0].length === 1) {
this.db.prepare(`INSERT INTO tags (e, t, v) VALUES (?, ?, ?)`).run(event.id, tag[0], tag[1]); this.database.insert(schema.tags).values({ event: event.id, tag: tag[0], value: tag[1] }).run();
} }
} }
} }
private insertEventIntoSearch(event: NostrEvent) { private insertEventIntoSearch(event: NostrEvent) {
const search = convertEventToSearchRow(event); return insertEventIntoSearch(this.database.$client, event);
return this.db
.prepare<[string, string, string]>(`INSERT OR REPLACE INTO events_fts (id, content, tags) VALUES (?, ?, ?)`)
.run(search.id, search.content, search.tags);
} }
removeEvents(ids: string[]) { protected removeReplaceableHistory(kind: number, pubkey: string, identifier?: string): number {
const results = this.db.transaction(() => { const existing = this.getReplaceableHistory(kind, pubkey, identifier);
this.db.prepare(`DELETE FROM tags WHERE e IN ${mapParams(ids)}`).run(...ids);
this.db.prepare(`DELETE FROM events_fts WHERE id IN ${mapParams(ids)}`).run(...ids);
return this.db.prepare(`DELETE FROM events WHERE events.id IN ${mapParams(ids)}`).run(...ids); // If there is more than one event, remove the older ones
})(); if (existing.length > 1) {
const removeIds = existing
// ignore the first event
.slice(1)
// get the ids of all the older events
.map((item) => item.id);
this.removeEvents(removeIds);
return removeIds.length;
}
return 0;
}
removeEvents(ids: string[]): number {
// Remove the events from the fts search table
removeEventsFromSearch(this.database.$client, ids);
const results = this.database.transaction(() => {
// Delete from tags first
this.database.delete(schema.tags).where(inArray(schema.tags.event, ids)).run();
// Then delete from events
return this.database.delete(schema.events).where(inArray(schema.events.id, ids)).run();
});
if (results.changes > 0) { if (results.changes > 0) {
for (const id of ids) { for (const id of ids) {
this.emit("event:removed", id); this.emit("event:removed", id);
} }
} }
}
protected buildConditionsForFilters(filter: Filter) { return results.changes;
const joins: string[] = [];
const conditions: string[] = [];
const parameters: (string | number)[] = [];
const groupBy: string[] = [];
const having: string[] = [];
// get AND tag filters
const andTagQueries = Object.keys(filter).filter(isFilterKeyIndexableAndTag);
// get OR tag filters and remove any ones that appear in the AND
const orTagQueries = Object.keys(filter)
.filter(isFilterKeyIndexableTag)
.filter((t) => !andTagQueries.includes(t));
if (orTagQueries.length > 0) {
joins.push("INNER JOIN tags as or_tags ON events.id = or_tags.e");
}
if (andTagQueries.length > 0) {
joins.push("INNER JOIN tags as and_tags ON events.id = and_tags.e");
}
if (filter.search) {
joins.push("INNER JOIN events_fts ON events_fts.id = events.id");
conditions.push(`events_fts MATCH ?`);
parameters.push('"' + filter.search.replace(/"/g, '""') + '"');
}
if (typeof filter.since === "number") {
conditions.push(`events.created_at >= ?`);
parameters.push(filter.since);
}
if (typeof filter.until === "number") {
conditions.push(`events.created_at < ?`);
parameters.push(filter.until);
}
if (filter.ids) {
conditions.push(`events.id IN ${mapParams(filter.ids)}`);
parameters.push(...filter.ids);
}
if (filter.kinds) {
conditions.push(`events.kind IN ${mapParams(filter.kinds)}`);
parameters.push(...filter.kinds);
}
if (filter.authors) {
conditions.push(`events.pubkey IN ${mapParams(filter.authors)}`);
parameters.push(...filter.authors);
}
// add AND tag filters
for (const t of andTagQueries) {
conditions.push(`and_tags.t = ?`);
parameters.push(t.slice(1));
// @ts-expect-error
const v = filter[t] as string[];
conditions.push(`and_tags.v IN ${mapParams(v)}`);
parameters.push(...v);
}
// add OR tag filters
for (let t of orTagQueries) {
conditions.push(`or_tags.t = ?`);
parameters.push(t.slice(1));
// @ts-expect-error
const v = filter[t] as string[];
conditions.push(`or_tags.v IN ${mapParams(v)}`);
parameters.push(...v);
}
// if there is an AND tag filter set GROUP BY so that HAVING can be used
if (andTagQueries.length > 0) {
groupBy.push("events.id");
having.push("COUNT(and_tags.i) = ?");
// @ts-expect-error
parameters.push(andTagQueries.reduce((t, k) => t + (filter[k] as string[]).length, 0));
}
return { conditions, parameters, joins, groupBy, having };
}
protected buildSQLQueryForFilters(filters: Filter[], select = "events.*") {
let sql = `SELECT ${select} FROM events `;
const orConditions: string[] = [];
const parameters: any[] = [];
const groupBy = new Set<string>();
const having = new Set<string>();
let joins = new Set<string>();
for (const filter of filters) {
const parts = this.buildConditionsForFilters(filter);
if (parts.conditions.length > 0) {
orConditions.push(`(${parts.conditions.join(" AND ")})`);
parameters.push(...parts.parameters);
for (const join of parts.joins) joins.add(join);
for (const group of parts.groupBy) groupBy.add(group);
for (const have of parts.having) having.add(have);
}
}
sql += Array.from(joins).join(" ");
if (orConditions.length > 0) {
sql += ` WHERE ${orConditions.join(" OR ")}`;
}
if (groupBy.size > 0) {
sql += " GROUP BY " + Array.from(groupBy).join(",");
}
if (having.size > 0) {
sql += " HAVING " + Array.from(having).join(" AND ");
}
// @ts-expect-error
const order = filters.find((f) => f.order)?.order;
if (filters.some((f) => f.search) && (order === "rank" || order === undefined)) {
sql = sql + " ORDER BY rank";
} else {
sql = sql + " ORDER BY created_at DESC";
}
let minLimit = Infinity;
for (const filter of filters) {
if (filter.limit) minLimit = Math.min(minLimit, filter.limit);
}
if (minLimit !== Infinity) {
sql += " LIMIT ?";
parameters.push(minLimit);
}
return { sql, parameters };
} }
hasEvent(id: string): boolean { hasEvent(id: string): boolean {
return this.db.prepare<[string], { id: string }>(`SELECT id FROM events WHERE id = ?`).get(id) !== undefined; return this.database.select().from(schema.events).where(eq(schema.events.id, id)).get() !== undefined;
} }
getEvent(id: string): NostrEvent | undefined { getEvent(id: string): NostrEvent | undefined {
const row = this.db.prepare<[string], EventRow>(`SELECT * FROM events WHERE id = ?`).get(id); const row = this.database.select().from(schema.events).where(eq(schema.events.id, id)).get();
if (!row) return undefined; if (!row) return undefined;
return parseEventRow(row); return parseEventRow(row);
} }
protected getReplaceableQuery(kind: number, pubkey: string, identifier?: string) {
if (kinds.isAddressableKind(kind)) {
return addressableQuery.execute({ kind, pubkey, identifier });
} else if (kinds.isReplaceableKind(kind)) {
return replaceableQuery.execute({ kind, pubkey });
} else throw new Error("Regular events are not replaceable");
}
hasReplaceable(kind: number, pubkey: string, identifier?: string): boolean { hasReplaceable(kind: number, pubkey: string, identifier?: string): boolean {
return this.getReplaceable(kind, pubkey, identifier) !== undefined; return this.getReplaceableQuery(kind, pubkey, identifier).sync() !== undefined;
} }
getReplaceable(kind: number, pubkey: string, identifier?: string): NostrEvent | undefined { getReplaceable(kind: number, pubkey: string, identifier?: string): NostrEvent | undefined {
const filter: Filter = { kinds: [kind], authors: [pubkey], limit: 1 }; const row = this.getReplaceableQuery(kind, pubkey, identifier).sync();
if (identifier) filter["#d"] = [identifier]; if (!row) return undefined;
return this.getEventsForFilters([filter])[0];
}
return parseEventRow(row);
}
getReplaceableHistory(kind: number, pubkey: string, identifier?: string): NostrEvent[] { getReplaceableHistory(kind: number, pubkey: string, identifier?: string): NostrEvent[] {
const filter: Filter = { kinds: [kind], authors: [pubkey] }; if (kinds.isRegularKind(kind)) throw new Error("Regular events are not replaceable");
if (identifier) filter["#d"] = [identifier];
return this.getEventsForFilters([filter]); const query = kinds.isAddressableKind(kind)
? addressableHistoryQuery.execute({
kind,
pubkey,
identifier,
})
: replaceableHistoryQuery.execute({
kind,
pubkey,
});
return query.sync().map(parseEventRow);
} }
getTimeline(filters: Filter | Filter[]): NostrEvent[] { getTimeline(filters: Filter | Filter[]): NostrEvent[] {
return this.getEventsForFilters(Array.isArray(filters) ? filters : [filters]); return this.getEventsForFilters(Array.isArray(filters) ? filters : [filters]);
} }
getAll(filters: Filter | Filter[]): Set<NostrEvent> { getAll(filters: Filter | Filter[]): Set<NostrEvent> {
return new Set(this.getEventsForFilters(Array.isArray(filters) ? filters : [filters])); return new Set(this.getEventsForFilters(Array.isArray(filters) ? filters : [filters]));
} }
// TODO: Update this to use drizzle
getEventsForFilters(filters: Filter[]) { getEventsForFilters(filters: Filter[]) {
const { sql, parameters } = this.buildSQLQueryForFilters(filters); const { stmt, parameters } = buildSQLQueryForFilters(filters);
return this.db.prepare<any[], EventRow>(sql).all(parameters).map(parseEventRow); return this.database.$client
} .prepare<any[], typeof schema.events.$inferSelect>(stmt)
.all(parameters)
*iterateEventsForFilters(filters: Filter[]): IterableIterator<NostrEvent> { .map(parseEventRow);
const { sql, parameters } = this.buildSQLQueryForFilters(filters);
const iterator = this.db.prepare<any[], EventRow>(sql).iterate(parameters);
while (true) {
const { value: row, done } = iterator.next();
if (done) break;
yield parseEventRow(row);
}
} }
// TODO: Update this to use drizzle
countEventsForFilters(filters: Filter[]) { countEventsForFilters(filters: Filter[]) {
const { sql, parameters } = this.buildSQLQueryForFilters(filters); const { stmt, parameters } = buildSQLQueryForFilters(filters);
const results = this.db const results = this.database.$client
.prepare<any[], { count: number }>(`SELECT COUNT(*) as count FROM ( ${sql} )`) .prepare<any[], { count: number }>(`SELECT COUNT(*) as count FROM ( ${stmt} )`)
.get(parameters) as { count: number } | undefined; .get(parameters) as { count: number } | undefined;
return results?.count ?? 0; return results?.count ?? 0;
} }