Save receiver state on shutdown

Remove event scrapper
This commit is contained in:
hzrd149
2025-03-30 11:16:14 +01:00
parent 1f954f5329
commit 745b8990f7
26 changed files with 178 additions and 95 deletions

View File

@@ -60,6 +60,7 @@
"lowdb": "^7.0.1",
"mkdirp": "^3.0.1",
"nanoid": "^5.1.5",
"node-graceful-shutdown": "^1.1.5",
"nostr-tools": "^2.11.0",
"pac-proxy-agent": "^7.2.0",
"process-streams": "^1.0.3",

44
pnpm-lock.yaml generated
View File

@@ -11,9 +11,6 @@ importers:
'@diva.exchange/i2p-sam':
specifier: ^5.4.2
version: 5.4.2
'@libsql/client':
specifier: ^0.15.1
version: 0.15.1
'@modelcontextprotocol/sdk':
specifier: ^1.8.0
version: 1.8.0
@@ -98,6 +95,9 @@ importers:
nanoid:
specifier: ^5.1.5
version: 5.1.5
node-graceful-shutdown:
specifier: ^1.1.5
version: 1.1.5
nostr-tools:
specifier: ^2.11.0
version: 2.11.0(typescript@5.8.2)
@@ -2985,6 +2985,10 @@ packages:
resolution: {integrity: sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==}
engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
node-graceful-shutdown@1.1.5:
resolution: {integrity: sha512-tlz8XpPr+pqrEGWFNLtMwd0HdFsCAKp5NCmMvwcTZTA0hyrVd7gkKbivDcSM5uYB1331/cEjNTtmVtqKy8OSBw==}
engines: {node: '>=6'}
nodemon@3.1.9:
resolution: {integrity: sha512-hdr1oIb2p6ZSxu3PB2JWWYS7ZQ0qvaZsc3hK8DR8f02kRzc8rjYmxAIvdz+aYC+8F2IjNaB7HMcSDg8nQpJxyg==}
engines: {node: '>=10'}
@@ -4392,10 +4396,12 @@ snapshots:
transitivePeerDependencies:
- bufferutil
- utf-8-validate
optional: true
'@libsql/core@0.15.1':
dependencies:
js-base64: 3.7.7
optional: true
'@libsql/darwin-arm64@0.5.3':
optional: true
@@ -4412,8 +4418,10 @@ snapshots:
transitivePeerDependencies:
- bufferutil
- utf-8-validate
optional: true
'@libsql/isomorphic-fetch@0.3.1': {}
'@libsql/isomorphic-fetch@0.3.1':
optional: true
'@libsql/isomorphic-ws@0.1.5':
dependencies:
@@ -4422,6 +4430,7 @@ snapshots:
transitivePeerDependencies:
- bufferutil
- utf-8-validate
optional: true
'@libsql/linux-arm64-gnu@0.5.3':
optional: true
@@ -4540,7 +4549,8 @@ snapshots:
'@tybys/wasm-util': 0.9.0
optional: true
'@neon-rs/load@0.0.4': {}
'@neon-rs/load@0.0.4':
optional: true
'@noble/ciphers@0.5.3': {}
@@ -5741,7 +5751,8 @@ snapshots:
shebang-command: 2.0.0
which: 2.0.2
data-uri-to-buffer@4.0.1: {}
data-uri-to-buffer@4.0.1:
optional: true
data-uri-to-buffer@6.0.2: {}
@@ -5795,7 +5806,8 @@ snapshots:
detect-indent@6.1.0: {}
detect-libc@2.0.2: {}
detect-libc@2.0.2:
optional: true
detect-libc@2.0.3: {}
@@ -6122,6 +6134,7 @@ snapshots:
dependencies:
node-domexception: 1.0.0
web-streams-polyfill: 3.3.3
optional: true
file-uri-to-path@1.0.0: {}
@@ -6170,6 +6183,7 @@ snapshots:
formdata-polyfill@4.0.10:
dependencies:
fetch-blob: 3.2.0
optional: true
forwarded@0.2.0: {}
@@ -6465,7 +6479,8 @@ snapshots:
isexe@3.1.1: {}
js-base64@3.7.7: {}
js-base64@3.7.7:
optional: true
js-tokens@4.0.0: {}
@@ -6509,6 +6524,7 @@ snapshots:
'@libsql/linux-x64-gnu': 0.5.3
'@libsql/linux-x64-musl': 0.5.3
'@libsql/win32-x64-msvc': 0.5.3
optional: true
light-bolt11-decoder@3.2.0:
dependencies:
@@ -6824,7 +6840,8 @@ snapshots:
dependencies:
semver: 7.7.1
node-domexception@1.0.0: {}
node-domexception@1.0.0:
optional: true
node-fetch@2.7.0:
dependencies:
@@ -6835,6 +6852,9 @@ snapshots:
data-uri-to-buffer: 4.0.1
fetch-blob: 3.2.0
formdata-polyfill: 4.0.10
optional: true
node-graceful-shutdown@1.1.5: {}
nodemon@3.1.9:
dependencies:
@@ -7015,7 +7035,8 @@ snapshots:
dependencies:
duplex-maker: 1.0.0
promise-limit@2.7.0: {}
promise-limit@2.7.0:
optional: true
protomux@3.10.1:
dependencies:
@@ -7773,7 +7794,8 @@ snapshots:
transitivePeerDependencies:
- supports-color
web-streams-polyfill@3.3.3: {}
web-streams-polyfill@3.3.3:
optional: true
webidl-conversions@3.0.1: {}

View File

@@ -27,17 +27,15 @@ import RemoteAuthActions from "../modules/control/remote-auth-actions.js";
import LogStore from "../modules/log-store/log-store.js";
import DecryptionCache from "../modules/decryption-cache/decryption-cache.js";
import DecryptionCacheActions from "../modules/control/decryption-cache.js";
import Scrapper from "../modules/scrapper/index.js";
import LogsActions from "../modules/control/logs-actions.js";
import ApplicationStateManager from "../modules/application-state/manager.js";
import ScrapperActions from "../modules/control/scrapper-actions.js";
import InboundNetworkManager from "../modules/network/inbound/index.js";
import OutboundNetworkManager from "../modules/network/outbound/index.js";
import SecretsManager from "../modules/secrets-manager.js";
import Switchboard from "../modules/switchboard/switchboard.js";
import Gossip from "../modules/gossip.js";
import secrets from "../services/secrets.js";
import bakeryConfig from "../services/config.js";
import bakeryConfig from "../services/bakery-config.js";
import logStore from "../services/log-store.js";
import stateManager from "../services/app-state.js";
import eventCache from "../services/event-cache.js";
@@ -74,7 +72,6 @@ export default class App extends EventEmitter<EventMap> {
eventStore: SQLiteEventStore;
logStore: LogStore;
relay: NostrRelay;
scrapper: Scrapper;
control: ControlApi;
pool: CautiousPool;
addressBook: AddressBook;
@@ -163,12 +160,6 @@ export default class App extends EventEmitter<EventMap> {
if (this.notifications.shouldNotify(event)) this.notifications.notify(event);
});
this.scrapper = new Scrapper(this);
this.scrapper.setup();
// pass events from the scrapper to the event store
this.scrapper.on("event", (event) => this.eventStore.addEvent(event));
// Initializes direct message manager for subscribing to DMs
this.directMessageManager = new DirectMessageManager(this);
@@ -180,7 +171,6 @@ export default class App extends EventEmitter<EventMap> {
// API for controlling the node
this.control = new ControlApi(this);
this.control.registerHandler(new ConfigActions(this));
this.control.registerHandler(new ScrapperActions(this));
this.control.registerHandler(new DirectMessageActions(this));
this.control.registerHandler(new NotificationActions(this));
this.control.registerHandler(new RemoteAuthActions(this));
@@ -355,8 +345,6 @@ export default class App extends EventEmitter<EventMap> {
this.running = true;
await this.config.read();
if (this.config.data.runScrapperOnBoot) this.scrapper.start();
this.tick();
// start http server listening
@@ -380,7 +368,6 @@ export default class App extends EventEmitter<EventMap> {
async stop() {
this.running = false;
this.config.write();
this.scrapper.stop();
this.relay.stop();
await this.inboundNetwork.stop();

View File

@@ -1,4 +1,4 @@
import { bufferTime, MonoTypeOperatorFunction, scan, OperatorFunction, Subject, tap } from "rxjs";
import { bufferTime, MonoTypeOperatorFunction, scan, OperatorFunction, Subject, tap, map } from "rxjs";
export function bufferAudit<T>(buffer = 10_000, audit: (messages: T[]) => void): MonoTypeOperatorFunction<T> {
return (source) => {
@@ -22,3 +22,20 @@ export function lastN<T>(n: number): OperatorFunction<T, T[]> {
return newAcc;
}, []);
}
export function auditsPerMinute(
history = 5,
): OperatorFunction<number, { average: number; minutes: number; audits: number[] }> {
return (source) =>
source.pipe(
lastN(history),
map((audits) => {
const average = audits.reduce((sum, val) => sum + val, 0) / audits.length;
return {
average,
minutes: audits.length,
audits,
};
}),
);
}

View File

@@ -6,6 +6,7 @@ import express, { Request } from "express";
import dayjs from "dayjs";
import duration from "dayjs/plugin/duration.js";
import localizedFormat from "dayjs/plugin/localizedFormat.js";
import relativeTime from "dayjs/plugin/relativeTime.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import mcpServer from "./services/mcp/index.js";
@@ -23,7 +24,7 @@ import "./lifecycle.js";
// add durations plugin
dayjs.extend(duration);
dayjs.extend(localizedFormat);
dayjs.extend(relativeTime);
// create app
const app = new App();

View File

@@ -7,7 +7,7 @@ import { eventStore, queryStore } from "./services/stores.js";
import { replaceableLoader } from "./services/loaders.js";
import { logger } from "./logger.js";
import { rxNostr } from "./services/rx-nostr.js";
import bakeryConfig from "./services/config.js";
import bakeryConfig from "./services/bakery-config.js";
import receiver from "./services/receiver.js";
import eventCache from "./services/event-cache.js";
@@ -62,7 +62,7 @@ ownerMailboxes$.subscribe((mailboxes) => {
// Start the receiver when there is an owner and its enabled
bakeryConfig.data$
.pipe(
map((c) => c.runReceiverOnBoot),
map((c) => c.receiverEnabled),
distinctUntilChanged(),
)
.subscribe((enabled) => {

View File

@@ -1,7 +1,7 @@
import { Server } from "http";
import { logger } from "../../../logger.js";
import { getIPAddresses } from "../../../helpers/ip.js";
import bakeryConfig from "../../../services/config.js";
import bakeryConfig from "../../../services/bakery-config.js";
import TorInbound from "./tor.js";
import I2PInbound from "./i2p.js";

View File

@@ -5,7 +5,7 @@ import { logger } from "../../../logger.js";
import HyperOutbound from "./hyper.js";
import TorOutbound from "./tor.js";
import I2POutbound from "./i2p.js";
import bakeryConfig from "../../../services/config.js";
import bakeryConfig from "../../../services/bakery-config.js";
export default class OutboundNetworkManager {
log = logger.extend("Network:Outbound");

View File

@@ -8,7 +8,7 @@ import webPush from "web-push";
import { logger } from "../../logger.js";
import App from "../../app/index.js";
import stateManager from "../../services/app-state.js";
import bakeryConfig from "../../services/config.js";
import bakeryConfig from "../../services/bakery-config.js";
import { getDMRecipient, getDMSender } from "../../helpers/direct-messages.js";
export type NotificationsManagerState = {

View File

@@ -1,4 +1,6 @@
import { Query } from "../types.js";
import bakeryConfig, { BakeryConfig } from "../../../services/config.js";
import bakeryConfig, { BakeryConfig } from "../../../services/bakery-config.js";
export const ConfigQuery: Query<BakeryConfig> = () => bakeryConfig.data$;
const ConfigQuery: Query<BakeryConfig> = () => bakeryConfig.data$;
export default ConfigQuery;

View File

@@ -3,6 +3,6 @@ import { ConnectionState } from "rx-nostr";
import { Query } from "../types.js";
import { connections$ } from "../../../services/rx-nostr.js";
export const ConnectionsQuery: Query<Record<string, ConnectionState>> = () => {
return connections$;
};
const ConnectionsQuery: Query<Record<string, ConnectionState>> = () => connections$;
export default ConnectionsQuery;

View File

@@ -1,12 +1,14 @@
import QueryManager from "../manager.js";
import { ConfigQuery } from "./config.js";
import { ConnectionsQuery } from "./connections.js";
import { LogsQuery } from "./logs.js";
import ConfigQuery from "./config.js";
import ConnectionsQuery from "./connections.js";
import { LogsQuery, ServicesQuery } from "./logs.js";
import NetworkStateQuery from "./network-status.js";
import { ServicesQuery } from "./services.js";
import { ReceiverConnectionMapQuery, ReceiverStatsQuery } from "./receiver.js";
QueryManager.types.set("network-status", NetworkStateQuery);
QueryManager.types.set("logs", LogsQuery);
QueryManager.types.set("services", ServicesQuery);
QueryManager.types.set("config", ConfigQuery);
QueryManager.types.set("connections", ConnectionsQuery);
QueryManager.types.set("receiver-stats", ReceiverStatsQuery);
QueryManager.types.set("receiver-connections", ReceiverConnectionMapQuery);

View File

@@ -1,8 +1,8 @@
import { filter, from, merge } from "rxjs";
import { filter, from, merge, NEVER } from "rxjs";
import { Query } from "../types.js";
import logStore from "../../../services/log-store.js";
import { schema } from "../../../db/index.js";
import bakeryDatabase, { schema } from "../../../db/index.js";
export const LogsQuery: Query<typeof schema.logs.$inferSelect> = (args: {
service?: string;
@@ -21,3 +21,16 @@ export const LogsQuery: Query<typeof schema.logs.$inferSelect> = (args: {
}),
),
);
export const ServicesQuery: Query<string[]> = () =>
merge(
NEVER,
from(
bakeryDatabase
.select()
.from(schema.logs)
.groupBy(schema.logs.service)
.all()
.map((row) => row.service),
),
);

View File

@@ -0,0 +1,21 @@
import { combineLatest, map } from "rxjs";
import { Query } from "../types.js";
import { receiverEventsPerMinute } from "../../../services/runtime-stats.js";
import receiver from "../../../services/receiver.js";
import bakeryConfig from "../../../services/bakery-config.js";
export const ReceiverStatsQuery: Query<
undefined,
{ enabled: boolean; eventsPerMinute: { average: number; minutes: number; audits: number[] } }
> = () =>
combineLatest({
enabled: bakeryConfig.data$.pipe(map((c) => !!c.receiverEnabled)),
eventsPerMinute: receiverEventsPerMinute,
});
export const ReceiverConnectionMapQuery: Query<undefined, Record<string, string[]>> = () =>
receiver.relayPubkeys$.pipe(
// convert the map and sets to an object
map((map) => Object.fromEntries(Array.from(map.entries()).map(([k, v]) => [k, Array.from(v)]))),
);

View File

@@ -1,16 +0,0 @@
import { from, merge, NEVER } from "rxjs";
import { Query } from "../types.js";
import bakeryDatabase, { schema } from "../../../db/index.js";
export const ServicesQuery: Query<string[]> = () =>
merge(
NEVER,
from(
bakeryDatabase
.select()
.from(schema.logs)
.groupBy(schema.logs.service)
.all()
.map((row) => row.service),
),
);

View File

@@ -1,7 +1,10 @@
import { Observable } from "rxjs";
import WebSocket from "ws";
export type Query<T extends unknown = unknown> = (args: T, socket: WebSocket) => Observable<any>;
export type Query<Args extends unknown = unknown, Result extends unknown = unknown> = (
args: Args,
socket: WebSocket,
) => Observable<Result>;
// open query messages (id, type, args)
export type QueryOpen<Args extends unknown> = ["QRY", "OPEN", string, string, Args];

View File

@@ -1,8 +1,6 @@
import {
bufferTime,
catchError,
combineLatest,
distinct,
distinctUntilChanged,
from,
map,
@@ -12,8 +10,6 @@ import {
of,
scan,
share,
shareReplay,
Subject,
switchMap,
tap,
throttleTime,
@@ -21,13 +17,13 @@ import {
import { createRxForwardReq, EventPacket, RxNostr, RxReq } from "rx-nostr";
import { ProfilePointer } from "nostr-tools/nip19";
import { Filter, kinds } from "nostr-tools";
import { getRelaysFromContactsEvent, isFilterEqual } from "applesauce-core/helpers";
import { getRelaysFromContactsEvent, isFilterEqual, unixNow } from "applesauce-core/helpers";
import { FALLBACK_RELAYS, LOOKUP_RELAYS } from "../../env.js";
import { logger } from "../../logger.js";
import AsyncLoader from "../async-loader.js";
import { groupPubkeysByRelay } from "./relay-mapping.js";
import { lastN } from "../../helpers/rxjs.js";
import dayjs from "dayjs";
export type ReceiverConfig = {
refreshInterval?: number;
@@ -41,9 +37,15 @@ type OngoingRequest = {
observable: Observable<EventPacket>;
};
type ReceiverState = {
cursor?: number;
};
export default class Receiver {
log = logger.extend("Receiver");
state: ReceiverState = {};
/** The outboxes for the root pubkey */
outboxes$: Observable<string[]>;
@@ -98,7 +100,7 @@ export default class Receiver {
directory[contact.pubkey] = from(this.asyncLoader.outboxes(contact.pubkey, LOOKUP_RELAYS)).pipe(
catchError(() =>
// If the outboxes fail, try to load the contacts event and parse the relays from it
from(this.asyncLoader.replaceable(kinds.Contacts, contact.pubkey)).pipe(
from(this.asyncLoader.replaceable(kinds.Contacts, contact.pubkey, undefined, LOOKUP_RELAYS)).pipe(
map((event) => {
const parsed = getRelaysFromContactsEvent(event);
if (!parsed) throw new Error("No relays in contacts");
@@ -126,14 +128,14 @@ export default class Receiver {
this.requests$ = this.relayPubkeys$.pipe(
scan(
(acc, updated) => {
this.log(`Last scan was ${this.state.cursor ? dayjs.unix(this.state.cursor).fromNow() : "never"}`);
for (const [relay, pubkeys] of updated.entries()) {
const filter: Filter = { authors: Array.from(pubkeys) };
const filter: Filter = { authors: Array.from(pubkeys), since: this.state.cursor };
// only re-create the request if the filter has changed
if (acc[relay] && isFilterEqual(acc[relay].filter, filter)) continue;
this.log(`Subscribing to ${relay} with ${pubkeys.size} pubkeys`);
const req = createRxForwardReq();
const observable = this.rxNostr.use(req, { on: { relays: [relay] } });
@@ -148,10 +150,10 @@ export default class Receiver {
let emitted = new WeakSet<RxReq<"forward">>();
this.events$ = this.requests$.pipe(
// Subscribe to all requests
switchMap(
(requests) =>
// A hack to ensure that the filters are emitted after the observable is subscribed to
// TODO: this should be updated to only emit new REQ when the pubkeys (filter) changes
new Observable<EventPacket>((observer) => {
// Merge all the observables into one
const sub = merge(...Object.values(requests).map((r) => r.observable)).subscribe(observer);
@@ -171,23 +173,17 @@ export default class Receiver {
return sub;
}),
),
// Log when the receiver stops or has errors
tap({ complete: () => this.log("Receiver stopped"), error: (e) => this.log("Receiver error", e.message) }),
tap({
next: (packet) => {
// Update the cursor to the latest event date
this.state.cursor = Math.min(unixNow(), packet.event.created_at);
},
// Log when the receiver stops or has errors
complete: () => this.log("Receiver stopped"),
error: (e) => this.log("Receiver error", e.message),
}),
// Share so the pipeline its not recreated for each subscription
share(),
);
// Log the average number of events received per minute over the last 5 minutes
this.events$
.pipe(
distinct((e) => e.event.id),
bufferTime(60_000), // Buffer events for 1 minute
map((events) => events.length), // Count events in buffer
lastN(5),
)
.subscribe((audits) => {
const avg = audits.reduce((sum, val) => sum + val, 0) / audits.length;
this.log(`Average ${avg.toFixed(2)} events/minute over the last ${audits.length} minutes`);
});
}
}

View File

@@ -18,9 +18,8 @@ export const bakeryConfigSchema = z.object({
autoListen: z.boolean().default(false),
logsEnabled: z.boolean().default(false),
// scrapper config
runReceiverOnBoot: z.boolean().default(true),
runScrapperOnBoot: z.boolean().default(false),
// receiver config
receiverEnabled: z.boolean().default(true),
// nostr network config
bootstrap_relays: z.array(z.string().url()).optional(),

View File

@@ -1,7 +1,7 @@
import { kinds } from "nostr-tools";
import mcpServer from "./server.js";
import bakeryConfig from "../config.js";
import bakeryConfig from "../bakery-config.js";
mcpServer.resource("owner_pubkey", "pubkey://owner", async (uri) => ({
contents: [

View File

@@ -1,5 +1,5 @@
import mcpServer from "../server.js";
import bakeryConfig, { bakeryConfigSchema } from "../../config.js";
import bakeryConfig, { bakeryConfigSchema } from "../../bakery-config.js";
mcpServer.tool("get_bakery_config", "Gets the current configuration for the bakery", {}, async () => {
return { content: [{ type: "text", text: JSON.stringify(bakeryConfig.data) }] };

View File

@@ -5,7 +5,7 @@ import z from "zod";
import mcpServer from "../server.js";
import { ownerFactory, ownerPublish } from "../../owner-signer.js";
import bakeryConfig from "../../config.js";
import bakeryConfig from "../../bakery-config.js";
import eventCache from "../../event-cache.js";
import { normalizeToHexPubkey } from "../../../helpers/nip19.js";
import { asyncLoader } from "../../loaders.js";

View File

@@ -7,7 +7,7 @@ import mcpServer from "../server.js";
import { ownerAccount$, setupSigner$, startSignerSetup, stopSignerSetup } from "../../owner-signer.js";
import { DEFAULT_NOSTR_CONNECT_RELAYS } from "../../../const.js";
import { normalizeToHexPubkey } from "../../../helpers/nip19.js";
import bakeryConfig from "../../config.js";
import bakeryConfig from "../../bakery-config.js";
mcpServer.prompt("setup_signer", "Start the setup and connection process for the users nostr signer", async () => {
return {

View File

@@ -9,7 +9,7 @@ import { eventStore } from "./stores.js";
import { nostrConnectPublish, nostrConnectSubscription } from "../helpers/applesauce.js";
import { NostrEvent } from "nostr-tools";
import eventCache from "./event-cache.js";
import bakeryConfig from "./config.js";
import bakeryConfig from "./bakery-config.js";
import { rxNostr } from "./rx-nostr.js";
import { logger } from "../logger.js";
import { NostrConnectAccount } from "applesauce-accounts/accounts";

View File

@@ -1,9 +1,10 @@
import { filter, map } from "rxjs";
import bakeryConfig from "./config.js";
import bakeryConfig from "./bakery-config.js";
import { asyncLoader } from "./loaders.js";
import { rxNostr } from "./rx-nostr.js";
import Receiver from "../modules/receiver/index.js";
import stateManager from "./app-state.js";
const root = bakeryConfig.data$.pipe(
map((c) => c.owner),
@@ -15,5 +16,6 @@ const receiver = new Receiver(root, asyncLoader, rxNostr, {
minRelaysPerPubkey: 1,
maxRelaysPerPubkey: 3,
});
receiver.state = stateManager.getMutableState("receiver", {});
export default receiver;

View File

@@ -0,0 +1,29 @@
import { bufferTime, map, merge, shareReplay } from "rxjs";
import { distinct } from "rxjs";
import { onShutdown } from "node-graceful-shutdown";
import receiver from "./receiver.js";
import { auditsPerMinute } from "../helpers/rxjs.js";
import eventCache from "./event-cache.js";
// Log the average number of events received per minute over the last 5 minutes
export const receiverEventsPerMinute = receiver.events$.pipe(
distinct((e) => e.event.id),
bufferTime(60_000), // Buffer events for 1 minute
map((events) => events.length), // Count events in buffer
auditsPerMinute(),
shareReplay(),
);
export const databaseEventsPerMinute = eventCache.inserted$.pipe(
bufferTime(60_000), // Buffer events for 1 minute
map((events) => events.length), // Count events in buffer
auditsPerMinute(),
shareReplay(),
);
// Start all the stats and keep them running
const sub = merge(receiverEventsPerMinute, databaseEventsPerMinute).subscribe();
// Stop all the stats when the app shuts down
onShutdown("runtime-stats", () => sub.unsubscribe());

View File

@@ -1,3 +1,4 @@
import { Subject } from "rxjs";
import { ISyncEventStore } from "applesauce-core";
import { Filter, NostrEvent, kinds } from "nostr-tools";
import { eq, inArray } from "drizzle-orm";
@@ -24,6 +25,8 @@ type EventMap = {
export class SQLiteEventStore extends EventEmitter<EventMap> implements ISyncEventStore {
log = logger.extend("sqlite-event-store");
inserted$ = new Subject<NostrEvent>();
preserveEphemeral = false;
keepHistory = false;
@@ -106,6 +109,7 @@ export class SQLiteEventStore extends EventEmitter<EventMap> implements ISyncEve
// Emit the event
this.emit("event:inserted", event);
this.inserted$.next(event);
}
return inserted;