use rxjs for queries

This commit is contained in:
hzrd149
2025-03-08 21:44:03 +00:00
parent 2fc5900b57
commit 1a24f99b32
15 changed files with 267 additions and 173 deletions

View File

@@ -5,6 +5,7 @@ import { kinds } from "nostr-tools";
import { AbstractRelay } from "nostr-tools/abstract-relay";
import express, { Express } from "express";
import { EventEmitter } from "events";
import { filter } from "rxjs";
import cors from "cors";
import { logger } from "../logger.js";
@@ -52,6 +53,9 @@ import { server } from "../services/server.js";
import { SQLiteEventStore } from "../sqlite/event-store.js";
import { NostrRelay } from "../relay/nostr-relay.js";
import { getDMRecipient } from "../helpers/nostr/dms.js";
import { onConnection, onJSONMessage } from "../helpers/ws.js";
import QueryManager from "../modules/queries/manager.js";
import "../modules/queries/queries/index.js";
type EventMap = {
listening: [];
@@ -205,6 +209,7 @@ export default class App extends EventEmitter<EventMap> {
this.control.registerHandler(new NotificationActions(this));
this.control.registerHandler(new RemoteAuthActions(this));
this.control.registerHandler(new DecryptionCacheActions(this));
this.control.registerHandler(new LogsActions(this));
// reports
@@ -217,6 +222,16 @@ export default class App extends EventEmitter<EventMap> {
// if process has an RPC interface, attach control api to it
if (process.send) this.control.attachToProcess(process);
// queries
onConnection(this.wss).subscribe((ws) => {
const manager = new QueryManager(ws);
const sub = onJSONMessage(ws)
.pipe(filter((m) => Array.isArray(m)))
.subscribe(manager.messages);
ws.once("close", () => sub.unsubscribe());
});
this.relay = new NostrRelay(this.eventStore);
this.relay.sendChallenge = true;
this.relay.requireRelayInAuth = false;

24
src/helpers/ws.ts Normal file
View File

@@ -0,0 +1,24 @@
import { Observable } from "rxjs";
import { WebSocketServer, WebSocket } from "ws";
export function onConnection(wss: WebSocketServer): Observable<WebSocket> {
return new Observable<WebSocket>((observer) => {
const listener = (ws: WebSocket) => observer.next(ws);
wss.on("connection", listener);
return () => wss.off("connection", listener);
});
}
export function onJSONMessage<T extends unknown = unknown>(ws: WebSocket) {
return new Observable<T>((observer) => {
const listener = (message: string | Buffer) => {
try {
observer.next(JSON.parse(String(message)));
} catch (error) {}
};
ws.on("message", listener);
return () => ws.off("message", listener);
});
}

View File

@@ -77,3 +77,13 @@ async function shutdown() {
}
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
// catch unhandled errors
process.on("uncaughtException", (error) => {
console.error("Uncaught Exception:", error);
});
// 2. Catch unhandled promise rejections
process.on("unhandledRejection", (reason, promise) => {
console.error("Unhandled Promise Rejection:", reason);
});

View File

@@ -0,0 +1,71 @@
import WebSocket from "ws";
import { Observable, Subject, Subscription } from "rxjs";
import { logger } from "../../logger.js";
import { Query, QueryClose, QueryData, QueryError } from "./types.js";
export default class QueryManager<Socket extends WebSocket = WebSocket> {
static types = new Map<string, Query<any>>();
log = logger.extend("QueryManager");
// incoming messages
messages = new Subject<any[]>();
// active queries
queries = new Map<string, Subscription>();
constructor(public socket: Socket) {
this.messages.subscribe((message) => {
if (message[0] === "QRY") {
try {
switch (message[1]) {
case "OPEN":
this.openQuery(message[2], message[3], message[4]);
break;
case "CLOSE":
this.closeQuery(message[2]);
break;
}
} catch (error) {
if (error instanceof Error) this.log(`Failed to handle QRY message ${error.message}`);
}
}
});
}
openQuery<T extends unknown>(type: string, id: string, args: T) {
if (!this.queries.has(id)) {
try {
const queryType = QueryManager.types.get(type);
if (!queryType) throw new Error(`Cant find query type ${type}`);
const sub = queryType(args, this.socket).subscribe({
next: (result) => this.send(["QRY", "DATA", id, result]),
error: (err) => {
if (err instanceof Error) this.send(["QRY", "ERR", id, err.message]);
},
complete: () => this.send(["QRY", "CLOSE", id]),
});
this.queries.set(id, sub);
} catch (error) {
if (error instanceof Error) this.send(["QRY", "ERR", id, error.message]);
throw error;
}
}
}
closeQuery(id: string) {
const sub = this.queries.get(id);
if (sub) {
// stop the query
sub.unsubscribe();
this.queries.delete(id);
}
}
protected send<T extends unknown>(message: QueryData<T> | QueryError | QueryClose) {
this.socket.send(JSON.stringify(message));
}
}

View File

@@ -0,0 +1,13 @@
import { Observable } from "rxjs";
import { PrivateNodeConfig } from "@satellite-earth/core/types";
import { Query } from "../types.js";
import config from "../../../services/config.js";
export const ConfigQuery: Query<PrivateNodeConfig> = () =>
new Observable((observer) => {
observer.next(config.data);
const listener = (c: PrivateNodeConfig) => observer.next(c);
config.on("updated", listener);
return () => config.off("updated", listener);
});

View File

@@ -0,0 +1,10 @@
import QueryManager from "../manager.js";
import { ConfigQuery } from "./config.js";
import { LogsQuery } from "./logs.js";
import NetworkStateQuery from "./network-status.js";
import { ServicesQuery } from "./services.js";
QueryManager.types.set("network-status", NetworkStateQuery);
QueryManager.types.set("logs", LogsQuery);
QueryManager.types.set("services", ServicesQuery);
QueryManager.types.set("config", ConfigQuery);

View File

@@ -0,0 +1,22 @@
import { filter, from, fromEvent, merge } from "rxjs";
import { Query } from "../types.js";
import logStore from "../../../services/log-store.js";
import { LogEntry } from "../../log-store/log-store.js";
export const LogsQuery: Query<LogEntry> = (args: {
service?: string;
since?: number;
until?: number;
limit?: number;
}) =>
merge(
// get last 500 lines
from(logStore.getLogs({ service: args.service, limit: 500 })),
// subscribe to new logs
fromEvent<LogEntry>(logStore, "log").pipe(
// only return logs that match args
filter((entry) => {
return !args?.service || entry.service === args.service;
}),
),
);

View File

@@ -0,0 +1,76 @@
import { interval, map } from "rxjs";
import { inboundNetwork, outboundNetwork } from "../../../services/network.js";
import { Query } from "../types.js";
export type NetworkOutboundState = {
available: boolean;
running?: boolean;
error?: string;
};
export type NetworkInboundState = {
available: boolean;
running?: boolean;
error?: string;
address?: string;
};
export type NetworkState = {
inbound: NetworkInboundState;
outbound: NetworkInboundState;
};
export type NetworkStateResult = {
tor: NetworkState;
hyper: NetworkState;
i2p: NetworkState;
};
const NetworkStateQuery: Query<NetworkStateResult> = () =>
interval(1000).pipe(
map(() => {
const inbound = inboundNetwork;
const outbound = outboundNetwork;
return {
tor: {
inbound: {
available: inbound.tor.available,
running: inbound.tor.running,
error: inbound.tor.error?.message,
address: inbound.tor.address,
},
outbound: {
available: outbound.tor.available,
running: outbound.tor.running,
error: outbound.tor.error?.message,
},
},
hyper: {
inbound: {
available: inbound.hyper.available,
running: inbound.hyper.running,
error: inbound.hyper.error?.message,
address: inbound.hyper.address,
},
outbound: {
available: outbound.hyper.available,
running: outbound.hyper.running,
error: outbound.hyper.error?.message,
},
},
i2p: {
inbound: {
available: inbound.i2p.available,
running: inbound.i2p.running,
error: inbound.i2p.error?.message,
address: inbound.i2p.address,
},
outbound: {
available: outbound.i2p.available,
running: outbound.i2p.running,
error: outbound.i2p.error?.message,
},
},
};
}),
);
export default NetworkStateQuery;

View File

@@ -0,0 +1,6 @@
import { from, merge, NEVER } from "rxjs";
import database from "../../../services/database.js";
import { Query } from "../types.js";
export const ServicesQuery: Query<string[]> = () =>
merge(NEVER, from(database.db.prepare<[], { id: string }>(`SELECT service as id FROM logs GROUP BY service`).all()));

View File

@@ -0,0 +1,14 @@
import { Observable } from "rxjs";
import WebSocket from "ws";
export type Query<T extends unknown = unknown> = (args: T, socket: WebSocket) => Observable<any>;
// open query messages (id, type, args)
export type QueryOpen<Args extends unknown> = ["QRY", "OPEN", string, string, Args];
// close query message (id)
export type QueryClose = ["QRY", "CLOSE", string];
// error messages (id, message)
export type QueryError = ["QRY", "ERR", string, string];
// result message (id, data)
export type QueryData<Result extends unknown> = ["QRY", "DATA", string, Result];

View File

@@ -4,8 +4,10 @@ import hash_sum from "hash-sum";
import { Session } from "../../relay/session.js";
import SuperMap from "../../helpers/super-map.js";
export type Query<T extends unknown = unknown> = (args: T, socket: WebSocket) => Observable<any>;
// open query messages (id, type, args)
export type QueryOpen<Args extends Record<string, any>> = ["QRY", "OPEN", string, string, Args];
export type QueryOpen<Args extends unknown> = ["QRY", "OPEN", string, string, Args];
// close query message (id)
export type QueryClose = ["QRY", "CLOSE", string];

View File

@@ -1,10 +1,13 @@
import { WebSocket } from "ws";
import { Observable } from "rxjs";
import { ReportErrorMessage, ReportResultMessage } from "@satellite-earth/core/types/control-api/reports.js";
import { ReportArguments, ReportResults } from "@satellite-earth/core/types";
import type App from "../../app/index.js";
import { logger } from "../../logger.js";
export type NewReport = (socket: WebSocket | NodeJS.Process) => Observable<any>;
type f = () => void;
export default class Report<T extends keyof ReportResults> {

View File

@@ -3,12 +3,10 @@ import Report from "../report.js";
import OverviewReport from "./overview.js";
import ConversationsReport from "./conversations.js";
import LogsReport from "./logs.js";
import ServicesReport from "./services.js";
import DMSearchReport from "./dm-search.js";
import ScrapperStatusReport from "./scrapper-status.js";
import ReceiverStatusReport from "./receiver-status.js";
import NetworkStatusReport from "./network-status.js";
import NotificationChannelsReport from "./notification-channels.js";
import EventsSummaryReport from "./events-summary.js";
@@ -17,12 +15,10 @@ const REPORT_CLASSES: {
} = {
OVERVIEW: OverviewReport,
CONVERSATIONS: ConversationsReport,
LOGS: LogsReport,
SERVICES: ServicesReport,
DM_SEARCH: DMSearchReport,
SCRAPPER_STATUS: ScrapperStatusReport,
RECEIVER_STATUS: ReceiverStatusReport,
NETWORK_STATUS: NetworkStatusReport,
NOTIFICATION_CHANNELS: NotificationChannelsReport,
EVENTS_SUMMARY: EventsSummaryReport,
};

View File

@@ -1,23 +0,0 @@
import { ReportArguments } from "@satellite-earth/core/types";
import { LogEntry } from "../../log-store/log-store.js";
import Report from "../report.js";
/** WARNING: be careful of calling this.log in this class. it could trigger an infinite loop of logging */
export default class LogsReport extends Report<"LOGS"> {
readonly type = "LOGS";
async setup() {
const listener = (entry: LogEntry) => {
if (!this.args?.service || entry.service === this.args.service) this.send(entry);
};
this.app.logStore.on("log", listener);
return () => this.app.logStore.off("log", listener);
}
async execute(args: ReportArguments["LOGS"]) {
const logs = this.app.logStore.getLogs({ service: args.service, limit: 500 });
for (const entry of logs) this.send(entry);
}
}

View File

@@ -1,145 +0,0 @@
import { interval, map } from "rxjs";
import { Report } from "../manager.js";
import { default as OldReport } from "../report.js";
import { inboundNetwork, outboundNetwork } from "../../../services/network.js";
export default class NetworkStatusReport extends OldReport<"NETWORK_STATUS"> {
readonly type = "NETWORK_STATUS";
update() {
const torIn = this.app.inboundNetwork.tor;
const torOut = this.app.outboundNetwork.tor;
const hyperIn = this.app.inboundNetwork.hyper;
const hyperOut = this.app.outboundNetwork.hyper;
const i2pIn = this.app.inboundNetwork.i2p;
const i2pOut = this.app.outboundNetwork.i2p;
this.send({
tor: {
inbound: {
available: torIn.available,
running: torIn.running,
error: torIn.error?.message,
address: torIn.address,
},
outbound: {
available: torOut.available,
running: torOut.running,
error: torOut.error?.message,
},
},
hyper: {
inbound: {
available: hyperIn.available,
running: hyperIn.running,
error: hyperIn.error?.message,
address: hyperIn.address,
},
outbound: {
available: hyperOut.available,
running: hyperOut.running,
error: hyperOut.error?.message,
},
},
i2p: {
inbound: {
available: i2pIn.available,
running: i2pIn.running,
error: i2pIn.error?.message,
address: i2pIn.address,
},
outbound: {
available: i2pOut.available,
running: i2pOut.running,
error: i2pOut.error?.message,
},
},
});
}
async setup() {
const listener = this.update.bind(this);
// NOTE: set and interval since there are not events to listen to yet
const i = setInterval(listener, 1000);
return () => clearInterval(i);
}
async execute(args: {}): Promise<void> {
this.update();
}
}
export type NetworkOutboundState = {
available: boolean;
running?: boolean;
error?: string;
address?: string;
};
export type NetworkInboundState = {
available: boolean;
running?: boolean;
error?: string;
};
export type NetworkState = {
inbound: NetworkInboundState;
outbound: NetworkInboundState;
};
export type NetworkStateResult = {
tor: NetworkState;
hyper: NetworkState;
i2p: NetworkState;
};
const NetworkStateReport: Report<{}, NetworkStateResult> = () =>
interval(1000).pipe(
map(() => {
const inbound = inboundNetwork;
const outbound = outboundNetwork;
return {
tor: {
inbound: {
available: inbound.tor.available,
running: inbound.tor.running,
error: inbound.tor.error?.message,
address: inbound.tor.address,
},
outbound: {
available: outbound.tor.available,
running: outbound.tor.running,
error: outbound.tor.error?.message,
},
},
hyper: {
inbound: {
available: inbound.hyper.available,
running: inbound.hyper.running,
error: inbound.hyper.error?.message,
address: inbound.hyper.address,
},
outbound: {
available: outbound.hyper.available,
running: outbound.hyper.running,
error: outbound.hyper.error?.message,
},
},
i2p: {
inbound: {
available: inbound.i2p.available,
running: inbound.i2p.running,
error: inbound.i2p.error?.message,
address: inbound.i2p.address,
},
outbound: {
available: outbound.i2p.available,
running: outbound.i2p.running,
error: outbound.i2p.error?.message,
},
},
};
}),
);
// export default NetworkStateReport