diff --git a/src/app/index.ts b/src/app/index.ts index a8501c5..20b1613 100644 --- a/src/app/index.ts +++ b/src/app/index.ts @@ -5,6 +5,7 @@ import { kinds } from "nostr-tools"; import { AbstractRelay } from "nostr-tools/abstract-relay"; import express, { Express } from "express"; import { EventEmitter } from "events"; +import { filter } from "rxjs"; import cors from "cors"; import { logger } from "../logger.js"; @@ -52,6 +53,9 @@ import { server } from "../services/server.js"; import { SQLiteEventStore } from "../sqlite/event-store.js"; import { NostrRelay } from "../relay/nostr-relay.js"; import { getDMRecipient } from "../helpers/nostr/dms.js"; +import { onConnection, onJSONMessage } from "../helpers/ws.js"; +import QueryManager from "../modules/queries/manager.js"; +import "../modules/queries/queries/index.js"; type EventMap = { listening: []; @@ -205,6 +209,7 @@ export default class App extends EventEmitter { this.control.registerHandler(new NotificationActions(this)); this.control.registerHandler(new RemoteAuthActions(this)); this.control.registerHandler(new DecryptionCacheActions(this)); + this.control.registerHandler(new LogsActions(this)); // reports @@ -217,6 +222,16 @@ export default class App extends EventEmitter { // if process has an RPC interface, attach control api to it if (process.send) this.control.attachToProcess(process); + // queries + onConnection(this.wss).subscribe((ws) => { + const manager = new QueryManager(ws); + const sub = onJSONMessage(ws) + .pipe(filter((m) => Array.isArray(m))) + .subscribe(manager.messages); + + ws.once("close", () => sub.unsubscribe()); + }); + this.relay = new NostrRelay(this.eventStore); this.relay.sendChallenge = true; this.relay.requireRelayInAuth = false; diff --git a/src/helpers/ws.ts b/src/helpers/ws.ts new file mode 100644 index 0000000..18e004b --- /dev/null +++ b/src/helpers/ws.ts @@ -0,0 +1,24 @@ +import { Observable } from "rxjs"; +import { WebSocketServer, WebSocket } from "ws"; + +export function onConnection(wss: WebSocketServer): Observable { + return new Observable((observer) => { + const listener = (ws: WebSocket) => observer.next(ws); + + wss.on("connection", listener); + return () => wss.off("connection", listener); + }); +} + +export function onJSONMessage(ws: WebSocket) { + return new Observable((observer) => { + const listener = (message: string | Buffer) => { + try { + observer.next(JSON.parse(String(message))); + } catch (error) {} + }; + + ws.on("message", listener); + return () => ws.off("message", listener); + }); +} diff --git a/src/index.ts b/src/index.ts index d4eb28f..a8e0b02 100644 --- a/src/index.ts +++ b/src/index.ts @@ -77,3 +77,13 @@ async function shutdown() { } process.on("SIGINT", shutdown); process.on("SIGTERM", shutdown); + +// catch unhandled errors +process.on("uncaughtException", (error) => { + console.error("Uncaught Exception:", error); +}); + +// 2. Catch unhandled promise rejections +process.on("unhandledRejection", (reason, promise) => { + console.error("Unhandled Promise Rejection:", reason); +}); diff --git a/src/modules/queries/manager.ts b/src/modules/queries/manager.ts new file mode 100644 index 0000000..f201aa0 --- /dev/null +++ b/src/modules/queries/manager.ts @@ -0,0 +1,71 @@ +import WebSocket from "ws"; +import { Observable, Subject, Subscription } from "rxjs"; +import { logger } from "../../logger.js"; +import { Query, QueryClose, QueryData, QueryError } from "./types.js"; + +export default class QueryManager { + static types = new Map>(); + + log = logger.extend("QueryManager"); + + // incoming messages + messages = new Subject(); + + // active queries + queries = new Map(); + + constructor(public socket: Socket) { + this.messages.subscribe((message) => { + if (message[0] === "QRY") { + try { + switch (message[1]) { + case "OPEN": + this.openQuery(message[2], message[3], message[4]); + break; + case "CLOSE": + this.closeQuery(message[2]); + break; + } + } catch (error) { + if (error instanceof Error) this.log(`Failed to handle QRY message ${error.message}`); + } + } + }); + } + + openQuery(type: string, id: string, args: T) { + if (!this.queries.has(id)) { + try { + const queryType = QueryManager.types.get(type); + if (!queryType) throw new Error(`Cant find query type ${type}`); + + const sub = queryType(args, this.socket).subscribe({ + next: (result) => this.send(["QRY", "DATA", id, result]), + error: (err) => { + if (err instanceof Error) this.send(["QRY", "ERR", id, err.message]); + }, + complete: () => this.send(["QRY", "CLOSE", id]), + }); + + this.queries.set(id, sub); + } catch (error) { + if (error instanceof Error) this.send(["QRY", "ERR", id, error.message]); + throw error; + } + } + } + + closeQuery(id: string) { + const sub = this.queries.get(id); + + if (sub) { + // stop the query + sub.unsubscribe(); + this.queries.delete(id); + } + } + + protected send(message: QueryData | QueryError | QueryClose) { + this.socket.send(JSON.stringify(message)); + } +} diff --git a/src/modules/queries/queries/config.ts b/src/modules/queries/queries/config.ts new file mode 100644 index 0000000..7f6bd62 --- /dev/null +++ b/src/modules/queries/queries/config.ts @@ -0,0 +1,13 @@ +import { Observable } from "rxjs"; +import { PrivateNodeConfig } from "@satellite-earth/core/types"; + +import { Query } from "../types.js"; +import config from "../../../services/config.js"; + +export const ConfigQuery: Query = () => + new Observable((observer) => { + observer.next(config.data); + const listener = (c: PrivateNodeConfig) => observer.next(c); + config.on("updated", listener); + return () => config.off("updated", listener); + }); diff --git a/src/modules/queries/queries/index.ts b/src/modules/queries/queries/index.ts new file mode 100644 index 0000000..1acb843 --- /dev/null +++ b/src/modules/queries/queries/index.ts @@ -0,0 +1,10 @@ +import QueryManager from "../manager.js"; +import { ConfigQuery } from "./config.js"; +import { LogsQuery } from "./logs.js"; +import NetworkStateQuery from "./network-status.js"; +import { ServicesQuery } from "./services.js"; + +QueryManager.types.set("network-status", NetworkStateQuery); +QueryManager.types.set("logs", LogsQuery); +QueryManager.types.set("services", ServicesQuery); +QueryManager.types.set("config", ConfigQuery); diff --git a/src/modules/queries/queries/logs.ts b/src/modules/queries/queries/logs.ts new file mode 100644 index 0000000..00395e8 --- /dev/null +++ b/src/modules/queries/queries/logs.ts @@ -0,0 +1,22 @@ +import { filter, from, fromEvent, merge } from "rxjs"; +import { Query } from "../types.js"; +import logStore from "../../../services/log-store.js"; +import { LogEntry } from "../../log-store/log-store.js"; + +export const LogsQuery: Query = (args: { + service?: string; + since?: number; + until?: number; + limit?: number; +}) => + merge( + // get last 500 lines + from(logStore.getLogs({ service: args.service, limit: 500 })), + // subscribe to new logs + fromEvent(logStore, "log").pipe( + // only return logs that match args + filter((entry) => { + return !args?.service || entry.service === args.service; + }), + ), + ); diff --git a/src/modules/queries/queries/network-status.ts b/src/modules/queries/queries/network-status.ts new file mode 100644 index 0000000..042f120 --- /dev/null +++ b/src/modules/queries/queries/network-status.ts @@ -0,0 +1,76 @@ +import { interval, map } from "rxjs"; +import { inboundNetwork, outboundNetwork } from "../../../services/network.js"; +import { Query } from "../types.js"; + +export type NetworkOutboundState = { + available: boolean; + running?: boolean; + error?: string; +}; +export type NetworkInboundState = { + available: boolean; + running?: boolean; + error?: string; + address?: string; +}; +export type NetworkState = { + inbound: NetworkInboundState; + outbound: NetworkInboundState; +}; +export type NetworkStateResult = { + tor: NetworkState; + hyper: NetworkState; + i2p: NetworkState; +}; + +const NetworkStateQuery: Query = () => + interval(1000).pipe( + map(() => { + const inbound = inboundNetwork; + const outbound = outboundNetwork; + + return { + tor: { + inbound: { + available: inbound.tor.available, + running: inbound.tor.running, + error: inbound.tor.error?.message, + address: inbound.tor.address, + }, + outbound: { + available: outbound.tor.available, + running: outbound.tor.running, + error: outbound.tor.error?.message, + }, + }, + hyper: { + inbound: { + available: inbound.hyper.available, + running: inbound.hyper.running, + error: inbound.hyper.error?.message, + address: inbound.hyper.address, + }, + outbound: { + available: outbound.hyper.available, + running: outbound.hyper.running, + error: outbound.hyper.error?.message, + }, + }, + i2p: { + inbound: { + available: inbound.i2p.available, + running: inbound.i2p.running, + error: inbound.i2p.error?.message, + address: inbound.i2p.address, + }, + outbound: { + available: outbound.i2p.available, + running: outbound.i2p.running, + error: outbound.i2p.error?.message, + }, + }, + }; + }), + ); + +export default NetworkStateQuery; diff --git a/src/modules/queries/queries/services.ts b/src/modules/queries/queries/services.ts new file mode 100644 index 0000000..0ed6c7e --- /dev/null +++ b/src/modules/queries/queries/services.ts @@ -0,0 +1,6 @@ +import { from, merge, NEVER } from "rxjs"; +import database from "../../../services/database.js"; +import { Query } from "../types.js"; + +export const ServicesQuery: Query = () => + merge(NEVER, from(database.db.prepare<[], { id: string }>(`SELECT service as id FROM logs GROUP BY service`).all())); diff --git a/src/modules/queries/types.ts b/src/modules/queries/types.ts new file mode 100644 index 0000000..37c08a4 --- /dev/null +++ b/src/modules/queries/types.ts @@ -0,0 +1,14 @@ +import { Observable } from "rxjs"; +import WebSocket from "ws"; + +export type Query = (args: T, socket: WebSocket) => Observable; + +// open query messages (id, type, args) +export type QueryOpen = ["QRY", "OPEN", string, string, Args]; +// close query message (id) +export type QueryClose = ["QRY", "CLOSE", string]; + +// error messages (id, message) +export type QueryError = ["QRY", "ERR", string, string]; +// result message (id, data) +export type QueryData = ["QRY", "DATA", string, Result]; diff --git a/src/modules/reports/manager.ts b/src/modules/reports/manager.ts index 5fc4e7c..3c9e315 100644 --- a/src/modules/reports/manager.ts +++ b/src/modules/reports/manager.ts @@ -4,8 +4,10 @@ import hash_sum from "hash-sum"; import { Session } from "../../relay/session.js"; import SuperMap from "../../helpers/super-map.js"; +export type Query = (args: T, socket: WebSocket) => Observable; + // open query messages (id, type, args) -export type QueryOpen> = ["QRY", "OPEN", string, string, Args]; +export type QueryOpen = ["QRY", "OPEN", string, string, Args]; // close query message (id) export type QueryClose = ["QRY", "CLOSE", string]; diff --git a/src/modules/reports/report.ts b/src/modules/reports/report.ts index a79a104..4128b47 100644 --- a/src/modules/reports/report.ts +++ b/src/modules/reports/report.ts @@ -1,10 +1,13 @@ import { WebSocket } from "ws"; +import { Observable } from "rxjs"; import { ReportErrorMessage, ReportResultMessage } from "@satellite-earth/core/types/control-api/reports.js"; import { ReportArguments, ReportResults } from "@satellite-earth/core/types"; import type App from "../../app/index.js"; import { logger } from "../../logger.js"; +export type NewReport = (socket: WebSocket | NodeJS.Process) => Observable; + type f = () => void; export default class Report { diff --git a/src/modules/reports/reports/index.ts b/src/modules/reports/reports/index.ts index 75387f3..4763bf6 100644 --- a/src/modules/reports/reports/index.ts +++ b/src/modules/reports/reports/index.ts @@ -3,12 +3,10 @@ import Report from "../report.js"; import OverviewReport from "./overview.js"; import ConversationsReport from "./conversations.js"; -import LogsReport from "./logs.js"; import ServicesReport from "./services.js"; import DMSearchReport from "./dm-search.js"; import ScrapperStatusReport from "./scrapper-status.js"; import ReceiverStatusReport from "./receiver-status.js"; -import NetworkStatusReport from "./network-status.js"; import NotificationChannelsReport from "./notification-channels.js"; import EventsSummaryReport from "./events-summary.js"; @@ -17,12 +15,10 @@ const REPORT_CLASSES: { } = { OVERVIEW: OverviewReport, CONVERSATIONS: ConversationsReport, - LOGS: LogsReport, SERVICES: ServicesReport, DM_SEARCH: DMSearchReport, SCRAPPER_STATUS: ScrapperStatusReport, RECEIVER_STATUS: ReceiverStatusReport, - NETWORK_STATUS: NetworkStatusReport, NOTIFICATION_CHANNELS: NotificationChannelsReport, EVENTS_SUMMARY: EventsSummaryReport, }; diff --git a/src/modules/reports/reports/logs.ts b/src/modules/reports/reports/logs.ts deleted file mode 100644 index fc8ca3b..0000000 --- a/src/modules/reports/reports/logs.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { ReportArguments } from "@satellite-earth/core/types"; - -import { LogEntry } from "../../log-store/log-store.js"; -import Report from "../report.js"; - -/** WARNING: be careful of calling this.log in this class. it could trigger an infinite loop of logging */ -export default class LogsReport extends Report<"LOGS"> { - readonly type = "LOGS"; - - async setup() { - const listener = (entry: LogEntry) => { - if (!this.args?.service || entry.service === this.args.service) this.send(entry); - }; - - this.app.logStore.on("log", listener); - return () => this.app.logStore.off("log", listener); - } - - async execute(args: ReportArguments["LOGS"]) { - const logs = this.app.logStore.getLogs({ service: args.service, limit: 500 }); - for (const entry of logs) this.send(entry); - } -} diff --git a/src/modules/reports/reports/network-status.ts b/src/modules/reports/reports/network-status.ts deleted file mode 100644 index 2ab7787..0000000 --- a/src/modules/reports/reports/network-status.ts +++ /dev/null @@ -1,145 +0,0 @@ -import { interval, map } from "rxjs"; -import { Report } from "../manager.js"; -import { default as OldReport } from "../report.js"; -import { inboundNetwork, outboundNetwork } from "../../../services/network.js"; - -export default class NetworkStatusReport extends OldReport<"NETWORK_STATUS"> { - readonly type = "NETWORK_STATUS"; - - update() { - const torIn = this.app.inboundNetwork.tor; - const torOut = this.app.outboundNetwork.tor; - const hyperIn = this.app.inboundNetwork.hyper; - const hyperOut = this.app.outboundNetwork.hyper; - const i2pIn = this.app.inboundNetwork.i2p; - const i2pOut = this.app.outboundNetwork.i2p; - - this.send({ - tor: { - inbound: { - available: torIn.available, - running: torIn.running, - error: torIn.error?.message, - address: torIn.address, - }, - outbound: { - available: torOut.available, - running: torOut.running, - error: torOut.error?.message, - }, - }, - hyper: { - inbound: { - available: hyperIn.available, - running: hyperIn.running, - error: hyperIn.error?.message, - address: hyperIn.address, - }, - outbound: { - available: hyperOut.available, - running: hyperOut.running, - error: hyperOut.error?.message, - }, - }, - i2p: { - inbound: { - available: i2pIn.available, - running: i2pIn.running, - error: i2pIn.error?.message, - address: i2pIn.address, - }, - outbound: { - available: i2pOut.available, - running: i2pOut.running, - error: i2pOut.error?.message, - }, - }, - }); - } - - async setup() { - const listener = this.update.bind(this); - - // NOTE: set and interval since there are not events to listen to yet - const i = setInterval(listener, 1000); - - return () => clearInterval(i); - } - - async execute(args: {}): Promise { - this.update(); - } -} - -export type NetworkOutboundState = { - available: boolean; - running?: boolean; - error?: string; - address?: string; -}; -export type NetworkInboundState = { - available: boolean; - running?: boolean; - error?: string; -}; -export type NetworkState = { - inbound: NetworkInboundState; - outbound: NetworkInboundState; -}; -export type NetworkStateResult = { - tor: NetworkState; - hyper: NetworkState; - i2p: NetworkState; -}; - -const NetworkStateReport: Report<{}, NetworkStateResult> = () => - interval(1000).pipe( - map(() => { - const inbound = inboundNetwork; - const outbound = outboundNetwork; - - return { - tor: { - inbound: { - available: inbound.tor.available, - running: inbound.tor.running, - error: inbound.tor.error?.message, - address: inbound.tor.address, - }, - outbound: { - available: outbound.tor.available, - running: outbound.tor.running, - error: outbound.tor.error?.message, - }, - }, - hyper: { - inbound: { - available: inbound.hyper.available, - running: inbound.hyper.running, - error: inbound.hyper.error?.message, - address: inbound.hyper.address, - }, - outbound: { - available: outbound.hyper.available, - running: outbound.hyper.running, - error: outbound.hyper.error?.message, - }, - }, - i2p: { - inbound: { - available: inbound.i2p.available, - running: inbound.i2p.running, - error: inbound.i2p.error?.message, - address: inbound.i2p.address, - }, - outbound: { - available: outbound.i2p.available, - running: outbound.i2p.running, - error: outbound.i2p.error?.message, - }, - }, - }; - }), - ); - -// export default NetworkStateReport