fork and copy @satellite/personal-node

This commit is contained in:
hzrd149
2025-01-14 17:13:09 -06:00
parent 7dd48ce262
commit 11739c955d
89 changed files with 9417 additions and 0 deletions

8
.changeset/README.md Normal file
View File

@@ -0,0 +1,8 @@
# Changesets
Hello and welcome! This folder has been automatically generated by `@changesets/cli`, a build tool that works
with multi-package repos, or single-package repos to help you version and publish your code. You can
find the full documentation for it [in our repository](https://github.com/changesets/changesets)
We have a quick list of common questions to get you started engaging with this project in
[our documentation](https://github.com/changesets/changesets/blob/main/docs/common-questions.md)

11
.changeset/config.json Normal file
View File

@@ -0,0 +1,11 @@
{
"$schema": "https://unpkg.com/@changesets/config@3.0.5/schema.json",
"changelog": "@changesets/cli/changelog",
"commit": false,
"fixed": [],
"linked": [],
"access": "public",
"baseBranch": "master",
"updateInternalDependencies": "patch",
"ignore": []
}

5
.dockerignore Normal file
View File

@@ -0,0 +1,5 @@
/dist
/nostrudel/dist
/data
node_modules
/nostrudel/node_modules

18
.env.example Normal file
View File

@@ -0,0 +1,18 @@
# where to store the relays data
DATA_PATH=./data
# the port to use
PORT=3000
# the address to the tor SOCKS5 proxy to enable connections to .onion addresses
# TOR_PROXY="127.0.0.1:9050"
# tor proxy type, SOCKS5 or HTTP
# TOR_PROXY_TYPE="SOCKS5"
# the address to the i2p SOCKS5 proxy to enable connections to .i2p addresses
# I2P_PROXY="127.0.0.1:4447"
# I@P proxy type, SOCKS5 or HTTP
# I2P_PROXY_TYPE="SOCKS5"
# sets a hardcoded tor address
# TOR_ADDRESS="http://xxxxxxxxxxxx.onion"

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
node_modules
.DS_Store
.env
/data
dist

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule "nostrudel"]
path = nostrudel
url = https://github.com/hzrd149/nostrudel.git

1
.nvmrc Normal file
View File

@@ -0,0 +1 @@
22

5
.prettierrc Normal file
View File

@@ -0,0 +1,5 @@
{
"tabWidth": 2,
"useTabs": false,
"printWidth": 120
}

5
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,5 @@
{
"search.exclude": {
"**/nostrudel": true
}
}

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2023 hzrd149
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1 +1,3 @@
# bakery
A relay backend for noStrudel

29
dockerfile Normal file
View File

@@ -0,0 +1,29 @@
# syntax=docker/dockerfile:1
FROM node:22-slim AS base
ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
RUN corepack enable
WORKDIR /app
COPY . /app
FROM base AS web
RUN --mount=type=cache,id=pnpm,target=/pnpm/store cd nostrudel && pnpm install
RUN cd nostrudel && pnpm build
FROM base AS build
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install
RUN pnpm run build
FROM base
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --prod
COPY --from=build /app/dist /app/dist
COPY --from=web /app/nostrudel/dist /app/public
VOLUME [ "/app/data" ]
EXPOSE 3000
ENV PORT="3000"
CMD [ "node", "." ]

1
nostrudel Submodule

Submodule nostrudel added at 831712bf34

80
package.json Normal file
View File

@@ -0,0 +1,80 @@
{
"name": "bakery",
"version": "0.1.0",
"description": "A relay backend for noStrudel",
"type": "module",
"bin": "dist/index.js",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"prepack": "pnpm build",
"start": "node .",
"dev": "nodemon --loader @swc-node/register/esm src/index.ts",
"build": "tsc",
"format": "prettier -w . --ignore-path .gitignore"
},
"files": [
"dist",
"nostrudel/dist"
],
"keywords": [
"nostr",
"relay"
],
"author": "hzrd149",
"license": "MIT",
"dependencies": {
"@diva.exchange/i2p-sam": "^5.4.1",
"@noble/hashes": "^1.7.0",
"@satellite-earth/core": "^0.5.0",
"applesauce-core": "^0.10.0",
"applesauce-signer": "^0.10.0",
"better-sqlite3": "^11.7.2",
"blossom-client-sdk": "^2.1.1",
"cors": "^2.8.5",
"dayjs": "^1.11.13",
"debug": "^4.4.0",
"dotenv": "^16.4.7",
"express": "^4.21.2",
"get-port": "^7.1.0",
"holesail-server": "^1.4.4",
"hyper-address": "^0.1.3",
"hyper-socks5-proxy": "^0.1.2",
"hyperdht": "^6.20.1",
"import-meta-resolve": "^4.1.0",
"lodash.throttle": "^4.1.1",
"lowdb": "^7.0.1",
"mkdirp": "^3.0.1",
"nanoid": "^5.0.9",
"nostr-tools": "^2.10.4",
"pac-proxy-agent": "^7.1.0",
"process-streams": "^1.0.3",
"streamx": "^2.21.1",
"unique-names-generator": "^4.7.1",
"web-push": "^3.6.7",
"ws": "^8.18.0"
},
"devDependencies": {
"@changesets/cli": "^2.27.11",
"@swc-node/register": "^1.10.9",
"@swc/core": "^1.10.7",
"@types/better-sqlite3": "^7.6.12",
"@types/cors": "^2.8.17",
"@types/debug": "^4.1.12",
"@types/express": "^4.17.21",
"@types/lodash.throttle": "^4.1.9",
"@types/node": "^22.10.6",
"@types/web-push": "^3.6.4",
"@types/ws": "^8.5.13",
"nodemon": "^3.1.9",
"prettier": "^3.4.2",
"typescript": "^5.7.3"
},
"nodemonConfig": {
"ignore": [
"data/**"
],
"exec": "node",
"signal": "SIGTERM"
}
}

3915
pnpm-lock.yaml generated Normal file

File diff suppressed because it is too large Load Diff

0
public/.gitkeep Normal file
View File

88
src/app/database.ts Normal file
View File

@@ -0,0 +1,88 @@
import EventEmitter from "events";
import Database, { type Database as SQLDatabase } from "better-sqlite3";
import { fileURLToPath } from "url";
import path from "path";
import fs from "fs";
const __dirname = path.dirname(fileURLToPath(import.meta.url));
export type LocalDatabaseConfig = {
directory: string;
name: string;
wal: boolean;
};
export default class LocalDatabase extends EventEmitter {
config: LocalDatabaseConfig;
path: { main: string; shm: string; wal: string };
db: SQLDatabase;
constructor(config: Partial<LocalDatabaseConfig>) {
super();
this.config = {
directory: "data",
name: "events",
wal: true,
...config,
};
this.path = {
main: path.join(this.config.directory, `${this.config.name}.db`),
shm: path.join(this.config.directory, `${this.config.name}.db-shm`),
wal: path.join(this.config.directory, `${this.config.name}.db-wal`),
};
// Detect architecture to pass the correct native sqlite module
this.db = new Database(this.path.main);
if (this.config.wal) this.db.pragma("journal_mode = WAL");
}
hasTable(table: string) {
const result = this.db
.prepare(`SELECT COUNT(*) as count FROM sqlite_master WHERE type='table' AND name=?`)
.get([table]) as { count: number };
return result.count > 0;
}
// Delete all events in the database
/** @deprecated this should not be used */
clear() {
this.db.transaction(() => {
this.db.prepare(`DELETE FROM tags`).run();
if (this.hasTable("event_labels")) this.db.prepare(`DELETE FROM event_labels`).run();
this.db.prepare(`DELETE FROM events`).run();
})();
}
// Get number of events in the database
/** @deprecated this should be moved to a report */
count() {
const result = this.db.prepare(`SELECT COUNT(*) AS events FROM events`).get() as { events: number };
return result.events;
}
// Get total size of the database on disk
size() {
let sum;
try {
const statMain = fs.statSync(this.path.main).size;
const statShm = this.config.wal ? fs.statSync(this.path.shm).size : 0;
const statWal = this.config.wal ? fs.statSync(this.path.wal).size : 0;
sum = statMain + statShm + statWal;
} catch (err) {
console.log(err);
}
return sum;
}
destroy() {
this.removeAllListeners();
}
}

410
src/app/index.ts Normal file
View File

@@ -0,0 +1,410 @@
import path from "path";
import { WebSocketServer } from "ws";
import { createServer, Server } from "http";
import { IEventStore, NostrRelay, SQLiteEventStore } from "@satellite-earth/core";
import { getDMRecipient } from "@satellite-earth/core/helpers/nostr";
import { kinds } from "nostr-tools";
import { AbstractRelay } from "nostr-tools/abstract-relay";
import express, { Express } from "express";
import { EventEmitter } from "events";
import { SimpleSigner } from "applesauce-signer/signers/simple-signer";
import cors from "cors";
import { logger } from "../logger.js";
import Database from "./database.js";
import { NIP_11_SOFTWARE_URL, SENSITIVE_KINDS } from "../const.js";
import { OWNER_PUBKEY, PORT } from "../env.js";
import ConfigManager from "../modules/config-manager.js";
import ControlApi from "../modules/control/control-api.js";
import ConfigActions from "../modules/control/config-actions.js";
import ReceiverActions from "../modules/control/receiver-actions.js";
import Receiver from "../modules/receiver/index.js";
import DatabaseActions from "../modules/control/database-actions.js";
import DirectMessageManager from "../modules/direct-message-manager.js";
import DirectMessageActions from "../modules/control/dm-actions.js";
import AddressBook from "../modules/address-book.js";
import NotificationsManager from "../modules/notifications/notifications-manager.js";
import NotificationActions from "../modules/control/notification-actions.js";
import ProfileBook from "../modules/profile-book.js";
import ContactBook from "../modules/contact-book.js";
import CautiousPool from "../modules/cautious-pool.js";
import RemoteAuthActions from "../modules/control/remote-auth-actions.js";
import ReportActions from "../modules/control/report-actions.js";
import LogStore from "../modules/log-store/log-store.js";
import DecryptionCache from "../modules/decryption-cache/decryption-cache.js";
import DecryptionCacheActions from "../modules/control/decryption-cache.js";
import Scrapper from "../modules/scrapper/index.js";
import LogsActions from "../modules/control/logs-actions.js";
import ApplicationStateManager from "../modules/state/application-state-manager.js";
import ScrapperActions from "../modules/control/scrapper-actions.js";
import InboundNetworkManager from "../modules/network/inbound/index.js";
import SecretsManager from "../modules/secrets-manager.js";
import outboundNetwork, { OutboundNetworkManager } from "../modules/network/outbound/index.js";
import Switchboard from "../modules/switchboard/switchboard.js";
import Gossip from "../modules/gossip.js";
type EventMap = {
listening: [];
};
export default class App extends EventEmitter<EventMap> {
running = false;
config: ConfigManager;
secrets: SecretsManager;
state: ApplicationStateManager;
signer: SimpleSigner;
server: Server;
wss: WebSocketServer;
express: Express;
inboundNetwork: InboundNetworkManager;
outboundNetwork: OutboundNetworkManager;
database: Database;
eventStore: IEventStore;
logStore: LogStore;
relay: NostrRelay;
receiver: Receiver;
scrapper: Scrapper;
control: ControlApi;
reports: ReportActions;
pool: CautiousPool;
addressBook: AddressBook;
profileBook: ProfileBook;
contactBook: ContactBook;
directMessageManager: DirectMessageManager;
notifications: NotificationsManager;
decryptionCache: DecryptionCache;
switchboard: Switchboard;
gossip: Gossip;
constructor(dataPath: string) {
super();
this.config = new ConfigManager(path.join(dataPath, "node.json"));
this.config.read();
this.secrets = new SecretsManager(path.join(dataPath, "secrets.json"));
this.secrets.read();
this.signer = new SimpleSigner(this.secrets.get("nostrKey"));
// copy the vapid public key over to config so the web ui can access it
// TODO: this should be moved to another place
this.secrets.on("updated", () => {
this.config.data.vapidPublicKey = this.secrets.get("vapidPublicKey");
if (this.signer.key !== this.secrets.get("nostrKey")) {
this.signer = new SimpleSigner(this.secrets.get("nostrKey"));
}
});
// set owner pubkey from env variable
if (!this.config.data.owner && OWNER_PUBKEY) {
this.config.setField("owner", OWNER_PUBKEY);
}
// create http and ws server interface
this.server = createServer();
this.inboundNetwork = new InboundNetworkManager(this);
this.outboundNetwork = outboundNetwork;
/** make the outbound network reflect the app config */
this.outboundNetwork.listenToAppConfig(this.config);
// setup express
this.express = express();
this.express.use(cors());
this.setupExpress();
// pass requests to express server
this.server.on("request", this.express);
// create websocket server
this.wss = new WebSocketServer({ server: this.server });
// Fix CORS for websocket
this.wss.on("headers", (headers, request) => {
headers.push("Access-Control-Allow-Origin: *");
});
// Init embedded sqlite database
this.database = new Database({ directory: dataPath });
// create log managers
this.logStore = new LogStore(this.database.db);
this.logStore.setup();
this.state = new ApplicationStateManager(this.database.db);
this.state.setup();
// Recognize local relay by matching auth string
this.pool = new CautiousPool((relay: AbstractRelay, challenge: string) => {
for (const [socket, auth] of this.relay.auth) {
if (auth.challenge === challenge) return true;
}
return false;
});
// Initialize the event store
this.eventStore = new SQLiteEventStore(this.database.db);
this.eventStore.setup();
// setup decryption cache
this.decryptionCache = new DecryptionCache(this.database.db);
this.decryptionCache.setup();
// Setup managers user contacts and profiles
this.addressBook = new AddressBook(this);
this.profileBook = new ProfileBook(this);
this.contactBook = new ContactBook(this);
// Setup the notifications manager
this.notifications = new NotificationsManager(this /*this.eventStore, this.state*/);
this.notifications.webPushKeys = {
publicKey: this.secrets.get("vapidPublicKey"),
privateKey: this.secrets.get("vapidPrivateKey"),
};
this.notifications.setup();
this.eventStore.on("event:inserted", (event) => {
if (this.notifications.shouldNotify(event)) this.notifications.notify(event);
});
// Initializes receiver and scrapper for pulling data from remote relays
this.receiver = new Receiver(this);
this.receiver.on("event", (event) => this.eventStore.addEvent(event));
this.scrapper = new Scrapper(this);
this.scrapper.setup();
// pass events from the scrapper to the event store
this.scrapper.on("event", (event) => this.eventStore.addEvent(event));
// Initializes direct message manager for subscribing to DMs
this.directMessageManager = new DirectMessageManager(this);
// set watchInbox for owner when config is loaded or changed
this.config.on("updated", (config) => {
if (config.owner) this.directMessageManager.watchInbox(config.owner);
});
// API for controlling the node
this.control = new ControlApi(this);
this.control.registerHandler(new ConfigActions(this));
this.control.registerHandler(new ReceiverActions(this));
this.control.registerHandler(new ScrapperActions(this));
this.control.registerHandler(new DatabaseActions(this));
this.control.registerHandler(new DirectMessageActions(this));
this.control.registerHandler(new NotificationActions(this));
this.control.registerHandler(new RemoteAuthActions(this));
this.control.registerHandler(new DecryptionCacheActions(this));
this.control.registerHandler(new LogsActions(this));
// reports
this.reports = new ReportActions(this);
this.control.registerHandler(this.reports);
// connect control api to websocket server
this.control.attachToServer(this.wss);
// if process has an RPC interface, attach control api to it
if (process.send) this.control.attachToProcess(process);
this.relay = new NostrRelay(this.eventStore);
this.relay.sendChallenge = true;
this.relay.requireRelayInAuth = false;
// NIP-66 gossip
this.gossip = new Gossip(this.inboundNetwork, this.signer, this.pool, this.relay, this.eventStore);
this.config.on("updated", (config) => {
this.gossip.interval = config.gossipInterval;
this.gossip.broadcastRelays = config.gossipBroadcastRelays;
if (config.gossipEnabled) this.gossip.start();
else this.gossip.stop();
});
// setup PROXY switchboard
this.switchboard = new Switchboard(this);
// attach switchboard to websocket server
this.wss.on("connection", (ws, request) => {
this.switchboard.handleConnection(ws, request);
});
// update profiles when conversations are opened
this.directMessageManager.on("open", (a, b) => {
this.profileBook.loadProfile(a, this.addressBook.getOutboxes(a));
this.profileBook.loadProfile(b, this.addressBook.getOutboxes(b));
});
// only allow the owner to NIP-42 authenticate with the relay
this.relay.checkAuth = (ws, auth) => {
// If owner is not set, update it to match the pubkey
// that signed the auth message. This allows the user
// to set the owner pubkey from the initial login when
// setting up their personal node (the owner pubkey may
// otherwise be set using the env var `OWNER_PUBKEY`)
if (!this.config.data.owner) {
this.config.update((config) => {
logger(`Owner is unset, setting owner to first NIP-42 auth: ${auth.pubkey}`);
config.owner = auth.pubkey;
});
return true;
}
if (auth.pubkey !== this.config.data.owner) return "Pubkey dose not match owner";
return true;
};
// when the owner NIP-42 authenticates with the relay pass it along to the control
this.relay.on("socket:auth", (ws, auth) => {
if (auth.pubkey === this.config.data.owner) {
this.control.authenticatedConnections.add(ws);
}
});
// if socket is unauthenticated only allow owner's events and incoming DMs
this.relay.registerEventHandler((ctx, next) => {
const auth = ctx.relay.getSocketAuth(ctx.socket);
if (!auth) {
// is it an incoming DM for the owner?
if (ctx.event.kind === kinds.EncryptedDirectMessage && getDMRecipient(ctx.event) === this.config.data.owner)
return next();
if (ctx.event.pubkey === this.config.data.owner) return next();
throw new Error(ctx.relay.makeAuthRequiredReason("This relay only accepts events from its owner"));
}
return next();
});
// handle forwarding direct messages by owner
this.relay.registerEventHandler(async (ctx, next) => {
if (ctx.event.kind === kinds.EncryptedDirectMessage && ctx.event.pubkey === this.config.data.owner) {
// send direct message
const results = await this.directMessageManager.forwardMessage(ctx.event);
if (!results || !results.some((p) => p.status === "fulfilled")) throw new Error("Failed to forward message");
return `Forwarded message to ${results.filter((p) => p.status === "fulfilled").length}/${results.length} relays`;
} else return next();
});
// block subscriptions for sensitive kinds unless NIP-42 auth or Auth Code
this.relay.registerSubscriptionFilter((ctx, next) => {
// always allow if authenticated with auth code
const isAuthenticatedWithAuthCode = this.control.authenticatedConnections.has(ctx.socket);
if (isAuthenticatedWithAuthCode) return next();
const hasSensitiveKinds = ctx.filters.some(
(filter) => filter.kinds && SENSITIVE_KINDS.some((k) => filter.kinds?.includes(k)),
);
if (hasSensitiveKinds) {
const auth = ctx.relay.getSocketAuth(ctx.socket);
if (!auth) throw new Error(ctx.relay.makeAuthRequiredReason("Cant view sensitive events without auth"));
}
return next();
});
// Handle possible additional actions when the event store receives a new message
this.eventStore.on("event:inserted", (event) => {
const loadProfile = (pubkey: string) => {
const profile = this.profileBook.getProfile(pubkey);
if (!profile) {
this.profileBook.loadProfile(pubkey, this.addressBook.getOutboxes(pubkey));
this.addressBook.loadOutboxes(pubkey).then((outboxes) => {
this.profileBook.loadProfile(pubkey, outboxes ?? undefined);
});
}
};
// Fetch profiles for all incoming DMs
switch (event.kind) {
case kinds.EncryptedDirectMessage:
loadProfile(event.pubkey);
break;
default:
loadProfile(event.pubkey);
break;
}
});
// Read the config again, this fires the "loaded" and "updated" events to synchronize all the other services
// NOTE: its important this is called last. otherwise any this.config.on("update") listeners above will note fire
this.config.read();
}
setupExpress() {
this.express.get("/health", (req, res) => {
res.status(200).send("Healthy");
});
// NIP-11
this.express.get("/", (req, res, next) => {
if (req.headers.accept === "application/nostr+json") {
res.send({
name: this.config.data.name,
description: this.config.data.description,
software: NIP_11_SOFTWARE_URL,
supported_nips: NostrRelay.SUPPORTED_NIPS,
pubkey: this.config.data.owner,
});
} else return next();
});
}
async start() {
this.running = true;
await this.config.read();
if (this.config.data.runReceiverOnBoot) this.receiver.start();
if (this.config.data.runScrapperOnBoot) this.scrapper.start();
this.tick();
// start http server listening
await new Promise<void>((res) => this.server.listen(PORT, () => res()));
logger(`Listening on`, PORT);
if (process.send) process.send({ type: "RELAY_READY" });
this.emit("listening");
await this.inboundNetwork.start();
}
tick() {
if (!this.running) return;
setTimeout(this.tick.bind(this), 100);
}
async stop() {
this.running = false;
this.config.write();
this.scrapper.stop();
this.receiver.stop();
await this.state.saveAll();
this.reports.cleanup();
this.relay.stop();
this.database.destroy();
this.receiver.destroy();
await this.inboundNetwork.stop();
await this.outboundNetwork.stop();
this.gossip.stop();
// wait for server to close
await new Promise<void>((res) => this.server.close(() => res()));
}
}

7
src/const.ts Normal file
View File

@@ -0,0 +1,7 @@
import { EncryptedDirectMessage } from "nostr-tools/kinds";
export const SENSITIVE_KINDS = [EncryptedDirectMessage];
export const NIP_11_SOFTWARE_URL = "git+https://github.com/hzrd149/bakery.git";
export const OUTBOUND_PROXY_TYPES = ["SOCKS5", "HTTP"];

33
src/env.ts Normal file
View File

@@ -0,0 +1,33 @@
import "dotenv/config.js";
import { OUTBOUND_PROXY_TYPES } from "./const.js";
import { safeRelayUrls } from "applesauce-core/helpers";
import { normalizeToHexPubkey } from "./helpers/nip19.js";
export const OWNER_PUBKEY = process.env.OWNER_PUBKEY ? normalizeToHexPubkey(process.env.OWNER_PUBKEY) : undefined;
export const PUBLIC_ADDRESS = process.env.PUBLIC_ADDRESS;
export const DATA_PATH = process.env.DATA_PATH || "./data";
export const PORT = parseInt(process.env.PORT ?? "") || 3000;
// I2P config
export const I2P_PROXY = process.env.I2P_PROXY;
export const I2P_PROXY_TYPE = (process.env.I2P_PROXY_TYPE || "SOCKS5") as "SOCKS5" | "HTTP";
export const I2P_SAM_ADDRESS = process.env.I2P_SAM_ADDRESS;
if (!OUTBOUND_PROXY_TYPES.includes(I2P_PROXY_TYPE)) throw new Error("Invalid I2P_PROXY_TYPE, must be SOCKS5 or HTTP");
// Tor config
export const TOR_PROXY = process.env.TOR_PROXY;
export const TOR_PROXY_TYPE = (process.env.TOR_PROXY_TYPE || "SOCKS5") as "SOCKS5" | "HTTP";
export const TOR_ADDRESS = process.env.TOR_ADDRESS;
if (!OUTBOUND_PROXY_TYPES.includes(TOR_PROXY_TYPE)) throw new Error("Invalid TOR_PROXY_TYPE, must be SOCKS5 or HTTP");
// Default relay config
export const BOOTSTRAP_RELAYS = process.env.BOOTSTRAP_RELAYS
? safeRelayUrls(process.env.BOOTSTRAP_RELAYS.split(","))
: safeRelayUrls(["wss://nos.lol", "wss://relay.damus.io", "wss://relay.nostr.band"]);
export const COMMON_CONTACT_RELAYS = process.env.COMMON_CONTACT_RELAYS
? safeRelayUrls(process.env.COMMON_CONTACT_RELAYS.split(","))
: safeRelayUrls(["wss://purplepag.es", "wss://user.kindpag.es", "wss://relay.nos.social"]);

9
src/helpers/fs.ts Normal file
View File

@@ -0,0 +1,9 @@
import pfs from "fs/promises";
export async function pathExists(path: string) {
try {
await pfs.stat(path);
return true;
} catch (error) {}
return false;
}

19
src/helpers/ip.ts Normal file
View File

@@ -0,0 +1,19 @@
import os from 'node:os';
export function getIPAddresses() {
var ifaces = os.networkInterfaces();
var addresses: string[] = [];
for (const [name, info] of Object.entries(ifaces)) {
if (!info) continue;
for (const interfaceInfo of info) {
// skip over internal (i.e. 127.0.0.1) and non-ipv4 addresses
if (interfaceInfo.internal) continue;
addresses.push(interfaceInfo.address);
}
}
return addresses;
}

27
src/helpers/json.ts Normal file
View File

@@ -0,0 +1,27 @@
import fs from "fs";
const loadJson = (params: { path: string }) => {
let object;
try {
const data = fs.readFileSync(params.path);
object = JSON.parse(data.toString("utf8"));
} catch (err) {
console.log(err);
}
if (object) {
return object;
}
};
const saveJson = (data: any, params: { path: string }) => {
try {
fs.writeFileSync(params.path, Buffer.from(JSON.stringify(data)));
} catch (err) {
console.log(err);
}
};
export { loadJson, saveJson };

23
src/helpers/network.ts Normal file
View File

@@ -0,0 +1,23 @@
import net from 'net';
export function testTCPConnection(host: string, port: number, timeout = 5000) {
return new Promise((resolve, reject) => {
const socket = new net.Socket();
const timer = setTimeout(() => {
socket.destroy();
reject(new Error('Connection timed out'));
}, timeout);
socket.connect(port, host, () => {
clearTimeout(timer);
socket.destroy();
resolve(true);
});
socket.on('error', (err) => {
clearTimeout(timer);
reject(err);
});
});
}

12
src/helpers/nip19.ts Normal file
View File

@@ -0,0 +1,12 @@
import { getPubkeyFromDecodeResult, isHexKey } from "applesauce-core/helpers";
import { nip19 } from "nostr-tools";
export function normalizeToHexPubkey(hex: string) {
if (isHexKey(hex)) return hex;
try {
const decode = nip19.decode(hex);
return getPubkeyFromDecodeResult(decode) ?? null;
} catch (error) {
return null;
}
}

87
src/index.ts Normal file
View File

@@ -0,0 +1,87 @@
#!/usr/bin/env node
import process from "node:process";
import path from "node:path";
import pfs from "fs/promises";
import express, { Request } from "express";
import { mkdirp } from "mkdirp";
import dayjs from "dayjs";
import duration from "dayjs/plugin/duration.js";
import localizedFormat from "dayjs/plugin/localizedFormat.js";
import { useWebSocketImplementation } from "nostr-tools/relay";
import OutboundProxyWebSocket from "./modules/network/outbound/websocket.js";
import App from "./app/index.js";
import { DATA_PATH, PUBLIC_ADDRESS } from "./env.js";
import { addListener, logger } from "./logger.js";
import { pathExists } from "./helpers/fs.js";
// add durations plugin
dayjs.extend(duration);
dayjs.extend(localizedFormat);
// @ts-expect-error
global.WebSocket = OutboundProxyWebSocket;
useWebSocketImplementation(OutboundProxyWebSocket);
// create app
await mkdirp(DATA_PATH);
const app = new App(DATA_PATH);
// connect logger to app LogStore
addListener(({ namespace }, ...args) => {
app.logStore.addEntry(namespace, Date.now(), args.join(" "));
});
function getPublicRelayAddressFromRequest(req: Request) {
let url: URL;
if (PUBLIC_ADDRESS) {
url = new URL(PUBLIC_ADDRESS);
} else {
url = new URL("/", req.protocol + "://" + req.hostname);
}
url.protocol = req.protocol === "https:" ? "wss:" : "ws:";
return url;
}
// if the app isn't setup redirect to the setup view
app.express.get("/", (req, res, next) => {
if (!app.config.data.owner) {
logger("Redirecting to setup view");
const url = new URL("/setup", req.protocol + "://" + req.headers["host"]);
const relay = getPublicRelayAddressFromRequest(req);
url.searchParams.set("relay", relay.toString());
res.redirect(url.toString());
} else return next();
});
// serve the web ui or redirect to another hosted version
const appDir = (await pathExists("./nostrudel/dist")) ? "./nostrudel/dist" : "./public";
app.express.use(express.static(appDir));
app.express.get("*", (req, res) => {
res.sendFile(path.resolve(appDir, "index.html"));
});
// log uncaught errors
process.on("unhandledRejection", (reason, promise) => {
if (reason instanceof Error) {
console.log("Unhandled Rejection");
console.log(reason);
} else console.log("Unhandled Rejection at:", promise, "reason:", reason);
});
// start the app
await app.start();
// shutdown process
async function shutdown() {
logger("shutting down");
await app.stop();
process.exit(0);
}
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);

25
src/logger.ts Normal file
View File

@@ -0,0 +1,25 @@
import debug, { Debugger } from "debug";
if (!process.env.DEBUG) debug.enable("bakery,bakery:*");
type Listener = (logger: Debugger, ...args: any[]) => void;
const listeners = new Set<Listener>();
export function addListener(listener: Listener) {
listeners.add(listener);
}
export function removeListener(listener: Listener) {
listeners.delete(listener);
}
// listen for logs
debug.log = function (this: Debugger, ...args: any[]) {
for (const listener of listeners) {
listener(this, ...args);
}
console.log.apply(this, args);
};
const logger = debug("bakery");
export { logger };

View File

@@ -0,0 +1,66 @@
import { NostrEvent, kinds } from 'nostr-tools';
import _throttle from 'lodash.throttle';
import { getInboxes, getOutboxes } from '@satellite-earth/core/helpers/nostr/mailboxes.js';
import { logger } from '../logger.js';
import App from '../app/index.js';
import PubkeyBatchLoader from './pubkey-batch-loader.js';
/** Loads 10002 events for pubkeys */
export default class AddressBook {
log = logger.extend('AddressBook');
app: App;
loader: PubkeyBatchLoader;
get extraRelays() {
return this.loader.extraRelays;
}
set extraRelays(v: string[]) {
this.loader.extraRelays = v;
}
constructor(app: App) {
this.app = app;
this.loader = new PubkeyBatchLoader(kinds.RelayList, this.app.pool, (pubkey) => {
return this.app.eventStore.getEventsForFilters([{ kinds: [kinds.RelayList], authors: [pubkey] }])?.[0];
});
this.loader.on('event', (event) => this.app.eventStore.addEvent(event));
this.loader.on('batch', (found, failed) => {
this.log(`Found ${found}, failed ${failed}, pending ${this.loader.queue}`);
});
}
getMailboxes(pubkey: string) {
return this.loader.getEvent(pubkey);
}
getOutboxes(pubkey: string) {
const mailboxes = this.getMailboxes(pubkey);
return mailboxes && getOutboxes(mailboxes);
}
getInboxes(pubkey: string) {
const mailboxes = this.getMailboxes(pubkey);
return mailboxes && getInboxes(mailboxes);
}
handleEvent(event: NostrEvent) {
this.loader.handleEvent(event);
}
async loadMailboxes(pubkey: string, relays?: string[]) {
return this.loader.getOrLoadEvent(pubkey, relays);
}
async loadOutboxes(pubkey: string, relays?: string[]) {
const mailboxes = await this.loadMailboxes(pubkey, relays);
return mailboxes && getOutboxes(mailboxes);
}
async loadInboxes(pubkey: string, relays?: string[]) {
const mailboxes = await this.loadMailboxes(pubkey, relays);
return mailboxes && getInboxes(mailboxes);
}
}

View File

@@ -0,0 +1,95 @@
import EventEmitter from 'events';
import { SimplePool, VerifiedEvent } from 'nostr-tools';
import { AbstractRelay } from 'nostr-tools/relay';
import { normalizeURL } from 'nostr-tools/utils';
import { logger } from '../logger.js';
export type TestRelay = (relay: AbstractRelay, challenge: string) => boolean;
type EventMap = {
challenge: [AbstractRelay, string];
connected: [AbstractRelay];
closed: [AbstractRelay];
};
export default class CautiousPool extends SimplePool {
log = logger.extend('CautiousPool');
isSelf?: TestRelay;
blacklist = new Set<string>();
challenges = new Map<string, string>();
authenticated = new Map<string, boolean>();
emitter = new EventEmitter<EventMap>();
constructor(isSelf?: TestRelay) {
super();
this.isSelf = isSelf;
}
async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise<AbstractRelay> {
url = normalizeURL(url);
const parsed = new URL(url);
if (parsed.host === 'localhost' || parsed.host === '127.0.0.1') throw new Error('Cant connect to localhost');
if (this.blacklist.has(url)) throw new Error('Cant connect to self');
const relay = await super.ensureRelay(url, params);
if (this.checkRelay(relay)) throw new Error('Cant connect to self');
this.emitter.emit('connected', relay);
relay._onauth = (challenge) => {
if (this.checkRelay(relay, challenge)) {
this.authenticated.set(relay.url, false);
this.challenges.set(relay.url, challenge);
this.emitter.emit('challenge', relay, challenge);
}
};
relay.onnotice = () => {};
relay.onclose = () => {
this.challenges.delete(relay.url);
this.authenticated.delete(relay.url);
this.emitter.emit('closed', relay);
};
return relay;
}
private checkRelay(relay: AbstractRelay, challenge?: string) {
// @ts-expect-error
challenge = challenge || relay.challenge;
if (challenge) {
if (this.isSelf && this.isSelf(relay, challenge)) {
this.log(`Found ${relay.url} connects to ourselves, adding to blacklist`);
this.blacklist.add(relay.url);
relay.close();
relay.connect = () => {
throw new Error('Cant connect to self');
};
return true;
}
}
return false;
}
isAuthenticated(relay: string | AbstractRelay) {
return !!this.authenticated.get(typeof relay === 'string' ? relay : relay.url);
}
async authenticate(url: string | AbstractRelay, auth: VerifiedEvent) {
const relay = typeof url === 'string' ? await this.ensureRelay(url) : url;
return await relay.auth(async (draft) => auth);
}
[Symbol.iterator](): IterableIterator<[string, AbstractRelay]> {
return this.relays[Symbol.iterator]();
}
}

View File

@@ -0,0 +1,89 @@
import { type Database } from 'better-sqlite3';
import { WebSocket, WebSocketServer } from 'ws';
import { type IncomingMessage } from 'http';
import { randomBytes } from 'crypto';
import { NostrEvent, SimplePool } from 'nostr-tools';
import { HyperConnectionManager } from './hyper-connection-manager.js';
import { logger } from '../logger.js';
import { CommunityProxy } from './community-proxy.js';
import { IEventStore } from '@satellite-earth/core';
export class CommunityMultiplexer {
log = logger.extend('community-multiplexer');
db: Database;
eventStore: IEventStore;
pool: SimplePool;
connectionManager: HyperConnectionManager;
communities = new Map<string, CommunityProxy>();
constructor(db: Database, eventStore: IEventStore) {
this.db = db;
this.eventStore = eventStore;
this.pool = new SimplePool();
this.connectionManager = new HyperConnectionManager(randomBytes(32).toString('hex'));
this.syncCommunityDefinitions();
}
attachToServer(wss: WebSocketServer) {
wss.on('connection', this.handleConnection.bind(this));
}
handleConnection(ws: WebSocket, req: IncomingMessage) {
if (!req.url) return false;
const url = new URL(req.url, `http://${req.headers.host}`);
const pubkey = url.pathname.split('/')[1] as string | undefined;
if (!pubkey || pubkey.length !== 64) return false;
try {
let community = this.communities.get(pubkey);
if (!community) community = this.getCommunityProxy(pubkey);
// connect the socket to the relay
community.relay.handleConnection(ws, req);
return true;
} catch (error) {
this.log('Failed handle ws connection to', pubkey);
console.log(error);
return false;
}
}
syncCommunityDefinitions() {
this.log('Syncing community definitions');
const sub = this.pool.subscribeMany(['wss://nostrue.com'], [{ kinds: [12012] }], {
onevent: (event) => this.eventStore.addEvent(event),
oneose: () => sub.close(),
});
}
getCommunityProxy(pubkey: string) {
this.log('Looking for community definition', pubkey);
let definition: NostrEvent | undefined = undefined;
const local = this.eventStore.getEventsForFilters([{ kinds: [12012], authors: [pubkey] }]);
if (local[0]) definition = local[0];
if (!definition) throw new Error('Failed to find community definition');
this.log('Creating community proxy', pubkey);
const community = new CommunityProxy(this.db, definition, this.connectionManager);
community.connect();
this.communities.set(pubkey, community);
return community;
}
stop() {
for (const [pubkey, community] of this.communities) {
community.stop();
}
this.communities.clear();
this.connectionManager.stop();
}
}

View File

@@ -0,0 +1,175 @@
import { type Database } from "better-sqlite3";
import { Debugger } from "debug";
import { Filter, NostrEvent, Relay, kinds } from "nostr-tools";
import { NostrRelay, RelayActions } from "@satellite-earth/core";
import { Subscription } from "nostr-tools/abstract-relay";
import { LabeledEventStore } from "./labeled-event-store.js";
import { HyperConnectionManager } from "./hyper-connection-manager.js";
import { logger } from "../logger.js";
/** Used to connect to and sync with remote communities */
export class CommunityProxy {
log: Debugger;
database: Database;
connectionManager: HyperConnectionManager;
definition: NostrEvent;
upstream?: Relay;
eventStore: LabeledEventStore;
relay: NostrRelay;
get addresses() {
return this.definition.tags.filter((t) => t[0] === "r" && t[1]).map((t) => t[1]);
}
constructor(database: Database, communityDefinition: NostrEvent, connectionManager: HyperConnectionManager) {
this.database = database;
this.connectionManager = connectionManager;
this.definition = communityDefinition;
this.log = logger.extend("community-proxy:" + communityDefinition.pubkey);
this.eventStore = new LabeledEventStore(this.database, communityDefinition.pubkey);
this.eventStore.setup();
this.relay = new NostrRelay(this.eventStore);
// handle incoming events and pass them to the upstream relay
this.relay.registerEventHandler(async (ctx, next) => {
// send event to upstream relay
if (this.upstream) {
const result = this.upstream.publish(ctx.event);
this.log("Sent event to upstream", ctx.event.id);
return result;
} else throw new Error("Not connected to upstream");
});
this.relay.on("subscription:created", (subscription, ws) => {
this.syncChannelsFromFilters(subscription.filters);
});
this.relay.on("subscription:updated", (subscription, ws) => {
this.syncChannelsFromFilters(subscription.filters);
});
}
protected async connectUpstream() {
if (this.upstream) {
if (this.upstream.connected) this.upstream.close();
this.upstream = undefined;
}
const hyperAddress = this.definition.tags.find((t) => t[0] === "r" && t[1] && t[2] === "hyper")?.[1];
let address = this.definition.tags.find((t) => t[0] === "r" && t[1].startsWith("ws"))?.[1];
if (hyperAddress) {
const serverInfo = await this.connectionManager.getLocalAddress(hyperAddress);
address = new URL(`ws://${serverInfo.address}:${serverInfo.port}`).toString();
}
if (!address) throw new Error("Failed to find connection address");
try {
this.log("Connecting to upstream", address);
this.upstream = await Relay.connect(address);
this.upstream.onclose = () => {
this.log("Upstream connection closed");
this.upstream = undefined;
};
} catch (error) {
this.log("Failed to connect to upstream");
if (error instanceof Error) this.log(error);
}
}
async connect() {
if (this.upstream) return;
await this.connectUpstream();
setTimeout(() => {
this.syncMetadata();
this.syncDeletions();
}, 100);
}
handleEvent(event: NostrEvent) {
try {
switch (event.kind) {
case kinds.EventDeletion:
this.handleDeleteEvent(event);
break;
default:
this.eventStore.addEvent(event);
break;
}
} catch (error) {
this.log("Failed to handle event");
console.log(error);
}
}
handleDeleteEvent(deleteEvent: NostrEvent) {
const communityPubkey = this.definition.pubkey;
const ids = RelayActions.handleDeleteEvent(
this.eventStore,
deleteEvent,
deleteEvent.pubkey === communityPubkey ? () => true : undefined,
);
if (ids.length) this.log(`Deleted`, ids.length, "events");
}
syncMetadata() {
if (!this.upstream) return;
this.log("Opening subscription to sync metadata");
this.upstream.subscribe([{ kinds: [kinds.Metadata, kinds.RelayList, 12012, 39000, 39001, 39002] }], {
id: "metadata-sync",
onevent: (event) => this.handleEvent(event),
onclose: () => this.log("Closed metadata sync"),
});
}
syncDeletions() {
if (!this.upstream) return;
this.log("Opening subscription to sync deletions");
this.upstream.subscribe([{ kinds: [kinds.EventDeletion] }], {
id: "deletion-sync",
onevent: (event) => this.handleEvent(event),
onclose: () => this.log("Closed deletion sync"),
});
}
private syncChannelsFromFilters(filters: Filter[]) {
const channels = new Set<string>();
for (const filter of filters) {
if (filter["#h"]) filter["#h"].forEach((c) => channels.add(c));
}
for (const channel of channels) {
this.syncChannel(channel);
}
}
channelSubs = new Map<string, Subscription>();
syncChannel(channel: string) {
if (!this.upstream) return;
if (this.channelSubs.has(channel)) return;
this.log("Opening subscription to sync channel", channel);
const sub = this.upstream.subscribe([{ kinds: [9, 10, 11, 12], "#h": [channel] }], {
id: `channel-${channel}-sync`,
onevent: (event) => this.eventStore.addEvent(event),
onclose: () => {
this.channelSubs.delete(channel);
},
});
this.channelSubs.set(channel, sub);
}
stop() {
this.upstream?.close();
}
}

View File

@@ -0,0 +1,61 @@
import { JSONFileSync } from "lowdb/node";
import _throttle from "lodash.throttle";
import { uniqueNamesGenerator, adjectives, colors, animals } from "unique-names-generator";
import { PrivateNodeConfig } from "@satellite-earth/core/types/private-node-config.js";
import { ReactiveJsonFileSync } from "@satellite-earth/core";
import { logger } from "../logger.js";
export const defaultConfig: PrivateNodeConfig = {
name: uniqueNamesGenerator({
dictionaries: [colors, adjectives, animals],
}),
description: "",
autoListen: false,
runReceiverOnBoot: true,
runScrapperOnBoot: false,
logsEnabled: true,
requireReadAuth: false,
publicAddresses: [],
hyperEnabled: false,
enableTorConnections: true,
enableI2PConnections: true,
enableHyperConnections: false,
routeAllTrafficThroughTor: false,
gossipEnabled: false,
gossipInterval: 10 * 60_000,
gossipBroadcastRelays: [],
};
export default class ConfigManager extends ReactiveJsonFileSync<PrivateNodeConfig> {
log = logger.extend("ConfigManager");
constructor(path: string) {
super(new JSONFileSync(path), defaultConfig);
this.on("loaded", (config) => {
// explicitly set default values if fields are not set
for (const [key, value] of Object.entries(defaultConfig)) {
// @ts-expect-error
if (config[key] === undefined) {
// @ts-expect-error
config[key] = value;
}
}
this.write();
});
}
setField(field: keyof PrivateNodeConfig, value: any) {
this.log(`Setting ${field} to ${value}`);
// @ts-expect-error
this.data[field] = value;
this.write();
}
}

View File

@@ -0,0 +1,54 @@
import { NostrEvent, kinds } from 'nostr-tools';
import _throttle from 'lodash.throttle';
import { COMMON_CONTACT_RELAYS } from '../env.js';
import { logger } from '../logger.js';
import App from '../app/index.js';
import PubkeyBatchLoader from './pubkey-batch-loader.js';
/** Loads 3 contact lists for pubkeys */
export default class ContactBook {
log = logger.extend('ContactsBook');
app: App;
loader: PubkeyBatchLoader;
extraRelays = COMMON_CONTACT_RELAYS;
constructor(app: App) {
this.app = app;
this.loader = new PubkeyBatchLoader(kinds.Contacts, this.app.pool, (pubkey) => {
return this.app.eventStore.getEventsForFilters([{ kinds: [kinds.Contacts], authors: [pubkey] }])?.[0];
});
this.loader.on('event', (event) => this.app.eventStore.addEvent(event));
this.loader.on('batch', (found, failed) => {
this.log(`Found ${found}, failed ${failed}, pending ${this.loader.queue}`);
});
}
getContacts(pubkey: string) {
return this.loader.getEvent(pubkey);
}
getFollowedPubkeys(pubkey: string): string[] {
const contacts = this.getContacts(pubkey);
if (contacts) {
return contacts.tags
.filter((tag) => {
return tag[0] === 'p';
})
.map((tag) => {
return tag[1];
});
}
return [];
}
handleEvent(event: NostrEvent) {
this.loader.handleEvent(event);
}
async loadContacts(pubkey: string, relays: string[] = []) {
return this.loader.getOrLoadEvent(pubkey, relays);
}
}

View File

@@ -0,0 +1,49 @@
import { WebSocket } from 'ws';
import { ConfigMessage, ConfigResponse } from '@satellite-earth/core/types/control-api/config.js';
import type App from '../../app/index.js';
import { type ControlMessageHandler } from './control-api.js';
/** handles ['CONTROL', 'CONFIG', ...] messages */
export default class ConfigActions implements ControlMessageHandler {
app: App;
name = 'CONFIG';
private subscribed = new Set<WebSocket | NodeJS.Process>();
constructor(app: App) {
this.app = app;
// when config changes send it to the subscribed sockets
this.app.config.on('changed', (config) => {
for (const sock of this.subscribed) {
this.send(sock, ['CONTROL', 'CONFIG', 'CHANGED', config]);
}
});
}
handleMessage(sock: WebSocket | NodeJS.Process, message: ConfigMessage) {
const method = message[2];
switch (method) {
case 'SUBSCRIBE':
this.subscribed.add(sock);
sock.once('close', () => this.subscribed.delete(sock));
this.send(sock, ['CONTROL', 'CONFIG', 'CHANGED', this.app.config.data]);
return true;
case 'SET':
const field = message[3];
const value = message[4];
this.app.config.setField(field, value);
return true;
default:
return false;
}
}
send(sock: WebSocket | NodeJS.Process, response: ConfigResponse) {
sock.send?.(JSON.stringify(response));
}
}

View File

@@ -0,0 +1,130 @@
import { WebSocket, WebSocketServer } from 'ws';
import { type IncomingMessage } from 'http';
import { ControlResponse } from '@satellite-earth/core/types/control-api/index.js';
import type App from '../../app/index.js';
import { logger } from '../../logger.js';
export type ControlMessage = ['CONTROL', string, string, ...any[]];
export interface ControlMessageHandler {
app: App;
name: string;
handleConnection?(ws: WebSocket | NodeJS.Process): void;
handleDisconnect?(socket: WebSocket): void;
handleMessage(sock: WebSocket | NodeJS.Process, message: ControlMessage): boolean | Promise<boolean>;
}
/** handles web socket connections and 'CONTROL' messages */
export default class ControlApi {
app: App;
auth?: string;
log = logger.extend('ControlApi');
handlers = new Map<string, ControlMessageHandler>();
authenticatedConnections = new Set<WebSocket | NodeJS.Process>();
constructor(app: App, auth?: string) {
this.app = app;
this.auth = auth;
}
registerHandler(handler: ControlMessageHandler) {
this.handlers.set(handler.name, handler);
}
unregisterHandler(handler: ControlMessageHandler) {
this.handlers.delete(handler.name);
}
/** start listening for incoming ws connections */
attachToServer(wss: WebSocketServer) {
wss.on('connection', this.handleConnection.bind(this));
}
handleConnection(ws: WebSocket, req: IncomingMessage) {
ws.on('message', (data, isBinary) => {
this.handleRawMessage(ws, data as Buffer);
});
for (const [id, handler] of this.handlers) {
handler.handleConnection?.(ws);
}
ws.once('close', () => this.handleDisconnect(ws));
}
handleDisconnect(ws: WebSocket) {
this.authenticatedConnections.delete(ws);
for (const [id, handler] of this.handlers) {
handler.handleDisconnect?.(ws);
}
}
attachToProcess(p: NodeJS.Process) {
p.on('message', (message) => {
if (
Array.isArray(message) &&
message[0] === 'CONTROL' &&
typeof message[1] === 'string' &&
typeof message[2] === 'string'
) {
this.handleMessage(p, message as ControlMessage);
}
});
for (const [id, handler] of this.handlers) {
handler.handleConnection?.(p);
}
}
/** handle a ws message */
async handleRawMessage(ws: WebSocket | NodeJS.Process, message: Buffer) {
try {
const data = JSON.parse(message.toString()) as string[];
try {
if (
Array.isArray(data) &&
data[0] === 'CONTROL' &&
typeof data[1] === 'string' &&
typeof data[2] === 'string'
) {
if (this.authenticatedConnections.has(ws) || data[1] === 'AUTH') {
await this.handleMessage(ws, data as ControlMessage);
}
}
} catch (err) {
this.log('Failed to handle Control message', message.toString('utf-8'));
this.log(err);
}
} catch (error) {
// failed to parse JSON, do nothing
}
}
/** handle a ['CONTROL', ...] message */
async handleMessage(sock: WebSocket | NodeJS.Process, message: ControlMessage) {
// handle ['CONTROL', 'AUTH', <code>] messages
if (message[1] === 'AUTH' && message[2] === 'CODE') {
const code = message[3];
if (code === this.auth) {
this.authenticatedConnections.add(sock);
this.send(sock, ['CONTROL', 'AUTH', 'SUCCESS']);
} else {
this.send(sock, ['CONTROL', 'AUTH', 'INVALID', 'Invalid Auth Code']);
}
return true;
}
const handler = this.handlers.get(message[1]);
if (handler) {
return await handler.handleMessage(sock, message);
}
this.log('Failed to handle Control message', message);
return false;
}
send(sock: WebSocket | NodeJS.Process, response: ControlResponse) {
sock.send?.(JSON.stringify(response));
}
}

View File

@@ -0,0 +1,66 @@
import { WebSocket } from "ws";
import os from "node:os";
import { DatabaseMessage, DatabaseResponse, DatabaseStats } from "@satellite-earth/core/types/control-api/database.js";
import App from "../../app/index.js";
import { ControlMessageHandler } from "./control-api.js";
export default class DatabaseActions implements ControlMessageHandler {
app: App;
name = "DATABASE";
subscribed = new Set<WebSocket | NodeJS.Process>();
constructor(app: App) {
this.app = app;
// update all subscribed sockets every 5 seconds
let last: DatabaseStats | undefined = undefined;
setInterval(() => {
const stats = this.getStats();
if (stats.count !== last?.count || stats.size !== last.size) {
for (const sock of this.subscribed) {
this.send(sock, ["CONTROL", "DATABASE", "STATS", stats]);
}
}
last = stats;
}, 5_000);
}
private getStats() {
const count = this.app.database.count();
const size = this.app.database.size();
return { count, size };
}
handleMessage(sock: WebSocket | NodeJS.Process, message: DatabaseMessage): boolean {
const action = message[2];
switch (action) {
case "SUBSCRIBE":
this.subscribed.add(sock);
sock.once("close", () => this.subscribed.delete(sock));
this.send(sock, ["CONTROL", "DATABASE", "STATS", this.getStats()]);
return true;
case "UNSUBSCRIBE":
this.subscribed.delete(sock);
return true;
case "STATS":
this.send(sock, ["CONTROL", "DATABASE", "STATS", this.getStats()]);
return true;
case "CLEAR":
this.app.database.clear();
return true;
default:
return false;
}
}
send(sock: WebSocket | NodeJS.Process, response: DatabaseResponse) {
sock.send?.(JSON.stringify(response));
}
}

View File

@@ -0,0 +1,50 @@
import { WebSocket } from 'ws';
import {
DecryptionCacheMessage,
DecryptionCacheResponse,
} from '@satellite-earth/core/types/control-api/decryption-cache.js';
import type App from '../../app/index.js';
import { type ControlMessageHandler } from './control-api.js';
/** handles ['CONTROL', 'DECRYPTION-CACHE', ...] messages */
export default class DecryptionCacheActions implements ControlMessageHandler {
app: App;
name = 'DECRYPTION-CACHE';
constructor(app: App) {
this.app = app;
}
handleMessage(sock: WebSocket | NodeJS.Process, message: DecryptionCacheMessage) {
const method = message[2];
switch (method) {
case 'ADD-CONTENT':
this.app.decryptionCache.addEventContent(message[3], message[4]);
return true;
case 'CLEAR-PUBKEY':
this.app.decryptionCache.clearPubkey(message[3]);
return true;
case 'CLEAR':
this.app.decryptionCache.clearAll();
return true;
case 'REQUEST':
this.app.decryptionCache.getEventsContent(message[3]).then((contents) => {
for (const { event, content } of contents)
this.send(sock, ['CONTROL', 'DECRYPTION-CACHE', 'CONTENT', event, content]);
this.send(sock, ['CONTROL', 'DECRYPTION-CACHE', 'END']);
});
return true;
default:
return false;
}
}
send(sock: WebSocket | NodeJS.Process, response: DecryptionCacheResponse) {
sock.send?.(JSON.stringify(response));
}
}

View File

@@ -0,0 +1,31 @@
import { WebSocket } from 'ws';
import { DirectMessageMessage } from '@satellite-earth/core/types/control-api/direct-messages.js';
import type App from '../../app/index.js';
import { type ControlMessageHandler } from './control-api.js';
/** handles ['CONTROL', 'DM', ...] messages */
export default class DirectMessageActions implements ControlMessageHandler {
app: App;
name = 'DM';
constructor(app: App) {
this.app = app;
}
handleMessage(sock: WebSocket | NodeJS.Process, message: DirectMessageMessage) {
const method = message[2];
switch (method) {
case 'OPEN':
this.app.directMessageManager.openConversation(message[3], message[4]);
return true;
case 'CLOSE':
this.app.directMessageManager.closeConversation(message[3], message[4]);
return true;
default:
return false;
}
}
}

View File

@@ -0,0 +1,27 @@
import { WebSocket } from 'ws';
import { LogsMessage } from '@satellite-earth/core/types/control-api/logs.js';
import type App from '../../app/index.js';
import { type ControlMessageHandler } from './control-api.js';
/** handles ['CONTROL', 'DM', ...] messages */
export default class LogsActions implements ControlMessageHandler {
app: App;
name = 'LOGS';
constructor(app: App) {
this.app = app;
}
handleMessage(sock: WebSocket | NodeJS.Process, message: LogsMessage) {
const method = message[2];
switch (method) {
case 'CLEAR':
this.app.logStore.clearLogs(message[3] ? { service: message[3] } : undefined);
return true;
default:
return false;
}
}
}

View File

@@ -0,0 +1,43 @@
import { WebSocket } from 'ws';
import { NotificationsMessage, NotificationsResponse } from '@satellite-earth/core/types/control-api/notifications.js';
import { ControlMessageHandler } from './control-api.js';
import type App from '../../app/index.js';
import { NostrEvent } from 'nostr-tools';
export default class NotificationActions implements ControlMessageHandler {
app: App;
name = 'NOTIFICATIONS';
constructor(app: App) {
this.app = app;
}
handleMessage(sock: WebSocket | NodeJS.Process, message: NotificationsMessage): boolean {
const action = message[2];
switch (action) {
case 'GET-VAPID-KEY':
this.send(sock, ['CONTROL', 'NOTIFICATIONS', 'VAPID-KEY', this.app.notifications.webPushKeys.publicKey]);
return true;
case 'REGISTER':
this.app.notifications.addOrUpdateChannel(message[3]);
return true;
case 'NOTIFY':
const event: NostrEvent | undefined = this.app.eventStore.getEventsForFilters([{ ids: [message[3]] }])?.[0];
if (event) this.app.notifications.notify(event);
return true;
case 'UNREGISTER':
this.app.notifications.removeChannel(message[3]);
return true;
default:
return false;
}
}
send(sock: WebSocket | NodeJS.Process, response: NotificationsResponse) {
sock.send?.(JSON.stringify(response));
}
}

View File

@@ -0,0 +1,29 @@
import { WebSocket } from 'ws';
import { ReceiverMessage } from '@satellite-earth/core/types/control-api/receiver.js';
import type App from '../../app/index.js';
import { type ControlMessageHandler } from './control-api.js';
export default class ReceiverActions implements ControlMessageHandler {
app: App;
name = 'RECEIVER';
constructor(app: App) {
this.app = app;
}
handleMessage(sock: WebSocket | NodeJS.Process, message: ReceiverMessage): boolean {
const action = message[2];
switch (action) {
case 'START':
this.app.receiver.start();
return true;
case 'STOP':
this.app.receiver.stop();
return true;
default:
return false;
}
}
}

View File

@@ -0,0 +1,69 @@
import { WebSocket } from 'ws';
import { verifyEvent } from 'nostr-tools';
import { RemoteAuthMessage, RemoteAuthResponse } from '@satellite-earth/core/types/control-api/remote-auth.js';
import type App from '../../app/index.js';
import { type ControlMessageHandler } from './control-api.js';
/** handles ['CONTROL', 'REMOTE-AUTH', ...] messages */
export default class RemoteAuthActions implements ControlMessageHandler {
app: App;
name = 'REMOTE-AUTH';
private subscribed = new Set<WebSocket | NodeJS.Process>();
constructor(app: App) {
this.app = app;
// when config changes send it to the subscribed sockets
this.app.pool.emitter.on('challenge', (relay, challenge) => {
for (const sock of this.subscribed) {
this.send(sock, [
'CONTROL',
'REMOTE-AUTH',
'STATUS',
relay.url,
challenge,
!!this.app.pool.authenticated.get(relay.url),
]);
}
});
}
sendAllStatuses(sock: WebSocket | NodeJS.Process) {
for (const [url, relay] of this.app.pool) {
const challenge = this.app.pool.challenges.get(url);
const authenticated = this.app.pool.isAuthenticated(url);
if (challenge) {
this.send(sock, ['CONTROL', 'REMOTE-AUTH', 'STATUS', url, challenge, authenticated]);
}
}
}
async handleMessage(sock: WebSocket | NodeJS.Process, message: RemoteAuthMessage) {
const method = message[2];
switch (method) {
case 'SUBSCRIBE':
this.subscribed.add(sock);
sock.once('close', () => this.subscribed.delete(sock));
this.sendAllStatuses(sock);
return true;
case 'UNSUBSCRIBE':
this.subscribed.delete(sock);
return true;
case 'AUTHENTICATE':
const event = message[3];
if (verifyEvent(event)) {
const relay = event.tags.find((t) => (t[0] = 'relay'))?.[1];
if (relay) await this.app.pool.authenticate(relay, event);
}
default:
return false;
}
}
send(sock: WebSocket | NodeJS.Process, response: RemoteAuthResponse) {
sock.send?.(JSON.stringify(response));
}
}

View File

@@ -0,0 +1,93 @@
import { WebSocket } from 'ws';
import { ReportArguments } from '@satellite-earth/core/types';
import { ReportsMessage } from '@satellite-earth/core/types/control-api/reports.js';
import type App from '../../app/index.js';
import { type ControlMessageHandler } from './control-api.js';
import Report from '../reports/report.js';
import { logger } from '../../logger.js';
import REPORT_CLASSES from '../reports/reports/index.js';
/** handles ['CONTROL', 'REPORT', ...] messages */
export default class ReportActions implements ControlMessageHandler {
app: App;
name = 'REPORT';
log = logger.extend('ReportActions');
types: {
[k in keyof ReportArguments]?: typeof Report<k>;
} = REPORT_CLASSES;
private reports = new Map<WebSocket | NodeJS.Process, Map<string, Report<any>>>();
constructor(app: App) {
this.app = app;
}
private getReportsForSocket(socket: WebSocket | NodeJS.Process) {
let map = this.reports.get(socket);
if (map) return map;
map = new Map();
this.reports.set(socket, map);
return map;
}
handleDisconnect(ws: WebSocket): void {
// close all reports for socket on disconnect
const reports = this.reports.get(ws);
if (reports) {
for (const [id, report] of reports) report.close();
if (reports.size) this.log(`Closed ${reports.size} reports for disconnected socket`);
this.reports.delete(ws);
}
}
// TODO: maybe move some of this logic out to a manager class so the control action class can be simpler
async handleMessage(sock: WebSocket | NodeJS.Process, message: ReportsMessage) {
const method = message[2];
switch (method) {
case 'SUBSCRIBE': {
const reports = this.getReportsForSocket(sock);
const id = message[3];
const type = message[4];
const args = message[5];
let report = reports.get(id) as Report<typeof type> | undefined;
if (!report) {
const ReportClass = this.types[type];
if (!ReportClass) throw new Error('Missing class for report type: ' + type);
this.log(`Creating ${type} ${id} report with args`, JSON.stringify(args));
report = new ReportClass(id, this.app, sock);
reports.set(id, report);
}
await report.run(args);
return true;
}
case 'CLOSE': {
const reports = this.getReportsForSocket(sock);
const id = message[3];
const report = reports.get(id);
if (report) {
await report.close();
reports.delete(id);
}
return true;
}
default:
return false;
}
}
cleanup() {
for (const [sock, reports] of this.reports) {
for (const [id, report] of reports) {
report.close();
}
}
}
}

View File

@@ -0,0 +1,37 @@
import { WebSocket } from 'ws';
import { ScrapperMessage } from '@satellite-earth/core/types/control-api/scrapper.js';
import type App from '../../app/index.js';
import { type ControlMessageHandler } from './control-api.js';
export default class ScrapperActions implements ControlMessageHandler {
app: App;
name = 'SCRAPPER';
constructor(app: App) {
this.app = app;
}
handleMessage(sock: WebSocket | NodeJS.Process, message: ScrapperMessage): boolean {
const action = message[2];
switch (action) {
case 'START':
this.app.scrapper.start();
return true;
case 'STOP':
this.app.scrapper.stop();
return true;
case 'ADD-PUBKEY':
this.app.scrapper.addPubkey(message[3]);
return true;
case 'REMOVE-PUBKEY':
this.app.scrapper.removePubkey(message[3]);
return true;
default:
return false;
}
}
}

View File

@@ -0,0 +1,153 @@
import { mapParams } from '@satellite-earth/core/helpers/sql.js';
import { MigrationSet } from '@satellite-earth/core/sqlite';
import { type Database } from 'better-sqlite3';
import { EventEmitter } from 'events';
import { logger } from '../../logger.js';
import { EventRow, parseEventRow } from '@satellite-earth/core/sqlite-event-store';
import { NostrEvent } from 'nostr-tools';
const migrations = new MigrationSet('decryption-cache');
// Version 1
migrations.addScript(1, async (db, log) => {
db.prepare(
`
CREATE TABLE "decryption_cache" (
"event" TEXT(64) NOT NULL,
"content" TEXT NOT NULL,
PRIMARY KEY("event")
);
`,
).run();
});
// Version 2, search
migrations.addScript(2, async (db, log) => {
// create external Content fts5 table
db.prepare(
`CREATE VIRTUAL TABLE IF NOT EXISTS decryption_cache_fts USING fts5(content, content='decryption_cache', tokenize='trigram')`,
).run();
log(`Created decryption cache search table`);
// create triggers to sync table
db.prepare(
`
CREATE TRIGGER IF NOT EXISTS decryption_cache_ai AFTER INSERT ON decryption_cache BEGIN
INSERT INTO decryption_cache_fts(rowid, content) VALUES (NEW.rowid, NEW.content);
END;
`,
).run();
db.prepare(
`
CREATE TRIGGER IF NOT EXISTS decryption_cache_ad AFTER DELETE ON decryption_cache BEGIN
INSERT INTO decryption_cache_ai(decryption_cache_ai, rowid, content) VALUES('delete', OLD.rowid, OLD.content);
END;
`,
).run();
// populate table
const inserted = db
.prepare(`INSERT INTO decryption_cache_fts (rowid, content) SELECT rowid, content FROM decryption_cache`)
.run();
log(`Indexed ${inserted.changes} decrypted events in search table`);
});
type EventMap = {
cache: [string, string];
};
export default class DecryptionCache extends EventEmitter<EventMap> {
database: Database;
log = logger.extend('DecryptionCache');
constructor(database: Database) {
super();
this.database = database;
}
setup() {
return migrations.run(this.database);
}
/** cache the decrypted content of an event */
addEventContent(id: string, plaintext: string) {
const result = this.database
.prepare<[string, string]>(`INSERT INTO decryption_cache (event, content) VALUES (?, ?)`)
.run(id, plaintext);
if (result.changes > 0) {
this.log(`Saved content for ${id}`);
this.emit('cache', id, plaintext);
}
}
/** remove all cached content relating to a pubkey */
clearPubkey(pubkey: string) {
// this.database.prepare<string>(`DELETE FROM decryption_cache INNER JOIN events ON event=events.id`)
}
/** clear all cached content */
clearAll() {
this.database.prepare(`DELETE FROM decryption_cache`).run();
}
async search(
search: string,
filter?: { conversation?: [string, string]; order?: 'rank' | 'created_at' },
): Promise<{ event: NostrEvent; plaintext: string }[]> {
const params: any[] = [];
const andConditions: string[] = [];
let sql = `SELECT events.*, decryption_cache.content as plaintext FROM decryption_cache_fts
INNER JOIN decryption_cache ON decryption_cache_fts.rowid = decryption_cache.rowid
INNER JOIN events ON decryption_cache.event = events.id`;
andConditions.push('decryption_cache_fts MATCH ?');
params.push(search);
// filter down by authors
if (filter?.conversation) {
sql += `\nINNER JOIN tags ON tag.e = events.id AND tags.t = 'p'`;
andConditions.push(`(tags.v = ? AND events.pubkey = ?) OR (tags.v = ? AND events.pubkey = ?)`);
params.push(...filter.conversation, ...Array.from(filter.conversation).reverse());
}
if (andConditions.length > 0) {
sql += ` WHERE ${andConditions.join(' AND ')}`;
}
switch (filter?.order) {
case 'rank':
sql += ' ORDER BY rank';
break;
case 'created_at':
default:
sql += ' ORDER BY events.created_at DESC';
break;
}
return this.database
.prepare<any[], EventRow & { plaintext: string }>(sql)
.all(...params)
.map((row) => ({ event: parseEventRow(row), plaintext: row.plaintext }));
}
async getEventContent(id: string) {
const result = this.database
.prepare<[string], { event: string; content: string }>(`SELECT * FROM decryption_cache WHERE event=?`)
.get(id);
return result?.content;
}
async getEventsContent(ids: string[]) {
return this.database
.prepare<
string[],
{ event: string; content: string }
>(`SELECT * FROM decryption_cache WHERE event IN ${mapParams(ids)}`)
.all(...ids);
}
}

View File

@@ -0,0 +1,168 @@
import { NostrEvent, kinds } from 'nostr-tools';
import { SubCloser } from 'nostr-tools/abstract-pool';
import { Subscription } from 'nostr-tools/abstract-relay';
import { EventEmitter } from 'events';
import { getInboxes } from '@satellite-earth/core/helpers/nostr/mailboxes.js';
import { logger } from '../logger.js';
import type App from '../app/index.js';
import { getRelaysFromContactList } from '@satellite-earth/core/helpers/nostr/contacts.js';
type EventMap = {
open: [string, string];
close: [string, string];
message: [NostrEvent];
};
/** handles sending and receiving direct messages */
export default class DirectMessageManager extends EventEmitter<EventMap> {
log = logger.extend('DirectMessageManager');
app: App;
private explicitRelays: string[] = [];
constructor(app: App) {
super();
this.app = app;
// Load profiles for participants when
// a conversation thread is opened
this.on('open', (a, b) => {
this.app.profileBook.loadProfile(a, this.app.addressBook.getOutboxes(a));
this.app.profileBook.loadProfile(b, this.app.addressBook.getOutboxes(b));
});
// emit a "message" event when a new kind4 message is detected
this.app.eventStore.on('event:inserted', (event) => {
if (event.kind === kinds.EncryptedDirectMessage) this.emit('message', event);
});
}
/** sends a DM event to the receivers inbox relays */
async forwardMessage(event: NostrEvent) {
if (event.kind !== kinds.EncryptedDirectMessage) return;
const addressedTo = event.tags.find((t) => t[0] === 'p')?.[1];
if (!addressedTo) return;
// get users inboxes
let relays = await this.app.addressBook.loadInboxes(addressedTo);
if (!relays || relays.length === 0) {
// try to send the DM to the users legacy app relays
const contacts = await this.app.contactBook.loadContacts(addressedTo);
if (contacts) {
const appRelays = getRelaysFromContactList(contacts);
if (appRelays) relays = appRelays.filter((r) => r.write).map((r) => r.url);
}
}
if (!relays || relays.length === 0) {
// use fallback relays
relays = this.explicitRelays;
}
this.log(`Forwarding message to ${relays.length} relays`);
const results = await Promise.allSettled(this.app.pool.publish(relays, event));
return results;
}
private getConversationKey(a: string, b: string) {
if (a < b) return a + ':' + b;
else return b + ':' + a;
}
watching = new Map<string, Map<string, Subscription>>();
async watchInbox(pubkey: string) {
if (this.watching.has(pubkey)) return;
this.log(`Watching ${pubkey} inboxes for mail`);
const mailboxes = await this.app.addressBook.loadMailboxes(pubkey);
if (!mailboxes) {
this.log(`Failed to get ${pubkey} mailboxes`);
return;
}
const relays = getInboxes(mailboxes, this.explicitRelays);
const subscriptions = new Map<string, Subscription>();
for (const url of relays) {
const subscribe = async () => {
const relay = await this.app.pool.ensureRelay(url);
const sub = relay.subscribe([{ kinds: [kinds.EncryptedDirectMessage], '#p': [pubkey] }], {
onevent: (event) => {
this.app.eventStore.addEvent(event);
},
onclose: () => {
// reconnect if we are still watching this pubkey
if (this.watching.has(pubkey)) {
this.log(`Reconnecting to ${relay.url} for ${pubkey} inbox DMs`);
setTimeout(() => subscribe(), 30_000);
}
},
});
subscriptions.set(relay.url, sub);
};
subscribe();
}
this.watching.set(pubkey, subscriptions);
}
stopWatchInbox(pubkey: string) {
const subs = this.watching.get(pubkey);
if (subs) {
this.watching.delete(pubkey);
for (const [_, sub] of subs) {
sub.close();
}
}
}
subscriptions = new Map<string, SubCloser>();
async openConversation(a: string, b: string) {
const key = this.getConversationKey(a, b);
if (this.subscriptions.has(key)) return;
const aMailboxes = await this.app.addressBook.loadMailboxes(a);
const bMailboxes = await this.app.addressBook.loadMailboxes(b);
// If inboxes for either user cannot be determined, either because nip65
// was not found, or nip65 had no listed read relays, fall back to explicit
const aInboxes = aMailboxes ? getInboxes(aMailboxes, this.explicitRelays) : this.explicitRelays;
const bInboxes = bMailboxes ? getInboxes(bMailboxes, this.explicitRelays) : this.explicitRelays;
const relays = new Set([...aInboxes, ...bInboxes]);
let events = 0;
const sub = this.app.pool.subscribeMany(
Array.from(relays),
[{ kinds: [kinds.EncryptedDirectMessage], authors: [a, b], '#p': [a, b] }],
{
onevent: (event) => {
events += +this.app.eventStore.addEvent(event);
},
oneose: () => {
if (events) this.log(`Found ${events} new messages`);
},
},
);
this.log(`Opened conversation ${key} on ${relays.size} relays`);
this.subscriptions.set(key, sub);
this.emit('open', a, b);
}
closeConversation(a: string, b: string) {
const key = this.getConversationKey(a, b);
const sub = this.subscriptions.get(key);
if (sub) {
sub.close();
this.subscriptions.delete(key);
this.emit('close', a, b);
}
}
}

132
src/modules/gossip.ts Normal file
View File

@@ -0,0 +1,132 @@
import { SimpleSigner } from 'applesauce-signer/signers/simple-signer';
import { EventTemplate, SimplePool } from 'nostr-tools';
import { getTagValue } from 'applesauce-core/helpers';
import { IEventStore, NostrRelay } from '@satellite-earth/core';
import dayjs, { Dayjs } from 'dayjs';
import { logger } from '../logger.js';
import InboundNetworkManager from './network/inbound/index.js';
function buildGossipTemplate(self: string, address: string, network: string): EventTemplate {
return {
kind: 30166,
content: '',
tags: [
['d', address],
['n', network],
['p', self],
['T', 'PrivateInbox'],
...NostrRelay.SUPPORTED_NIPS.map((nip) => ['N', String(nip)]),
],
created_at: dayjs().unix(),
};
}
export default class Gossip {
log = logger.extend('Gossip');
network: InboundNetworkManager;
signer: SimpleSigner;
pool: SimplePool;
relay: NostrRelay;
eventStore: IEventStore;
running = false;
// default every 30 minutes
interval = 30 * 60_000;
broadcastRelays: string[] = [];
constructor(
network: InboundNetworkManager,
signer: SimpleSigner,
pool: SimplePool,
relay: NostrRelay,
eventStore: IEventStore,
) {
this.network = network;
this.signer = signer;
this.pool = pool;
this.relay = relay;
this.eventStore = eventStore;
}
async gossip() {
const pubkey = await this.signer.getPublicKey();
if (this.broadcastRelays.length === 0) return;
if (this.network.hyper.available && this.network.hyper.address) {
this.log('Publishing hyper gossip');
await this.pool.publish(
this.broadcastRelays,
await this.signer.signEvent(buildGossipTemplate(pubkey, this.network.hyper.address, 'hyper')),
);
}
if (this.network.tor.available && this.network.tor.address) {
this.log('Publishing tor gossip');
await this.pool.publish(
this.broadcastRelays,
await this.signer.signEvent(buildGossipTemplate(pubkey, this.network.tor.address, 'tor')),
);
}
if (this.network.i2p.available && this.network.i2p.address) {
this.log('Publishing i2p gossip');
await this.pool.publish(
this.broadcastRelays,
await this.signer.signEvent(buildGossipTemplate(pubkey, this.network.i2p.address, 'i2p')),
);
}
}
private async update() {
if (!this.running) return;
await this.gossip();
setTimeout(this.update.bind(this), this.interval);
}
start() {
if (this.running) return;
this.running = true;
this.log(`Starting gossip on ${this.broadcastRelays.join(', ')}`);
setTimeout(this.update.bind(this), 5000);
}
stop() {
this.log('Stopping gossip');
this.running = false;
}
private lookups = new Map<string, Dayjs>();
async lookup(pubkey: string) {
const last = this.lookups.get(pubkey);
const filter = { authors: [pubkey], '#p': [pubkey], kinds: [30166] };
// no cache or expired
if (last === undefined || !last.isAfter(dayjs())) {
await new Promise<void>((res) => {
this.lookups.set(pubkey, dayjs().add(1, 'hour'));
const sub = this.pool.subscribeMany(this.broadcastRelays, [filter], {
onevent: (event) => this.eventStore.addEvent(event),
oneose: () => {
sub.close();
res();
},
});
});
}
const events = this.eventStore.getEventsForFilters([filter]);
const addresses: string[] = [];
for (const event of events) {
const url = getTagValue(event, 'd');
if (url) addresses.push(url);
}
return addresses;
}
}

105
src/modules/graph/index.ts Normal file
View File

@@ -0,0 +1,105 @@
import { NostrEvent, kinds } from 'nostr-tools';
import App from '../../app/index.js';
export type Node = { p: string; z: number; n: number };
// TODO: this should be moved to core
export default class Graph {
contacts: Record<string, { created_at: number; set: Set<string> }> = {};
app: App;
constructor(app: App) {
this.app = app;
}
init() {
const events = this.app.eventStore.getEventsForFilters([{ kinds: [kinds.Contacts] }]);
for (let event of events) {
this.add(event);
}
}
add(event: NostrEvent) {
if (event.kind === kinds.Contacts) {
this.addContacts(event);
}
}
addContacts(event: NostrEvent) {
const existing = this.contacts[event.pubkey];
// Add or overwrite an existing (older) contacts list
if (!existing || existing.created_at < event.created_at) {
const following = new Set(event.tags.filter((tag) => tag[0] === 'p').map((tag) => tag[1]));
this.contacts[event.pubkey] = {
created_at: event.created_at,
set: following,
};
}
}
getNodes(roots: string[] = []): Node[] {
const u: Record<string, { z: number; n: number }> = {};
// Init u with root pubkeys
for (let p of roots) {
u[p] = { z: 0, n: 1 };
}
const populate = (pubkeys: string[], z: number) => {
for (let p of pubkeys) {
// If pubkey's contacts don't exist, skip it
if (!this.contacts[p]) {
continue;
}
// Iterate across pubkey's contacts, if the
// contact has not been recorded, create an
// entry at the current degrees of separation,
// otherwise increment the number of occurances
this.contacts[p].set.forEach((c) => {
// Don't count self-follow
if (p === c) {
return;
}
if (!u[c]) {
u[c] = { z, n: 1 };
} else {
if (u[c].z > z) {
return;
}
u[c].n++;
}
});
}
};
// Populate u with all the pubkeys that
// are directly followed by root pubkey
populate(roots, 1);
// On the second pass, populate u with
// all the pubkeys that are followed
// by any pubkey that root follows
populate(
Object.keys(u).filter((p) => {
return u[p].z > 0;
}),
2,
);
// Return list of pubkeys sorted by degrees
// of separation and number of occurances
return Object.keys(u)
.map((p) => {
return { ...u[p], p };
})
.sort((a, b) => {
return a.z === b.z ? b.n - a.n : a.z - b.z;
});
}
}

View File

@@ -0,0 +1,65 @@
import net from 'net';
import HyperDHT from 'hyperdht';
import { pipeline } from 'streamx';
import { logger } from '../logger.js';
const START_PORT = 25100;
export class HyperConnectionManager {
log = logger.extend(`hyper-connection-manager`);
sockets = new Map<string, net.Socket>();
servers = new Map<string, net.Server>();
node: HyperDHT;
lastPort = START_PORT;
constructor(privateKey: string) {
this.node = new HyperDHT({
keyPair: HyperDHT.keyPair(Buffer.from(privateKey, 'hex')),
});
}
protected bind(pubkey: string) {
return new Promise<net.Server>((res) => {
const proxy = net.createServer({ allowHalfOpen: true }, (socket_) => {
const socket = this.node.connect(Buffer.from(pubkey, 'hex'), {
reusableSocket: true,
});
// @ts-expect-error
socket.setKeepAlive(5000);
socket.on('open', () => {
// connect the sockets
pipeline(socket_, socket, socket_);
});
socket.on('error', (error) => {
this.log('Failed to connect to', pubkey);
this.log(error);
});
});
this.servers.set(pubkey, proxy);
const port = this.lastPort++;
proxy.listen(port, '127.0.0.1', () => {
this.log('Bound hyper address', pubkey, 'to port:', port);
res(proxy);
});
});
}
async getLocalAddress(pubkey: string) {
let server = this.servers.get(pubkey);
if (!server) server = await this.bind(pubkey);
return server!.address() as net.AddressInfo;
}
stop() {
for (const [pubkey, server] of this.servers) {
server.close();
}
this.servers.clear();
}
}

View File

@@ -0,0 +1,79 @@
import { Database } from 'better-sqlite3';
import { Filter, NostrEvent } from 'nostr-tools';
import { IEventStore, SQLiteEventStore } from '@satellite-earth/core';
import { logger } from '../logger.js';
import { MigrationSet } from '@satellite-earth/core/sqlite';
export function mapParams(params: any[]) {
return `(${params.map(() => `?`).join(', ')})`;
}
const migrations = new MigrationSet('labeled-event-store');
// Version 1
migrations.addScript(1, async (db, log) => {
db.prepare(
`
CREATE TABLE IF NOT EXISTS event_labels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event TEXT(64) REFERENCES events(id),
label TEXT
)
`,
).run();
db.prepare('CREATE INDEX IF NOT EXISTS event_labels_label ON event_labels(label)').run();
db.prepare('CREATE INDEX IF NOT EXISTS event_labels_event ON event_labels(event)').run();
});
/** An event store that is can only see a subset of events int the database */
export class LabeledEventStore extends SQLiteEventStore implements IEventStore {
label: string;
readAll = false;
constructor(db: Database, label: string) {
super(db);
this.label = label;
this.log = logger.extend(`event-store:` + label);
}
async setup() {
await super.setup();
await migrations.run(this.db);
}
override buildConditionsForFilters(filter: Filter) {
const parts = super.buildConditionsForFilters(filter);
if (!this.readAll) {
parts.joins.push('INNER JOIN event_labels ON events.id = event_labels.event');
parts.conditions.push('event_labels.label = ?');
parts.parameters.push(this.label);
return parts;
}
return parts;
}
addEvent(event: NostrEvent) {
const inserted = super.addEvent(event);
const hasLabel = !!this.db
.prepare('SELECT * FROM event_labels WHERE event = ? AND label = ?')
.get(event.id, this.label);
if (!hasLabel) this.db.prepare(`INSERT INTO event_labels (event, label) VALUES (?, ?)`).run(event.id, this.label);
return inserted;
}
removeEvents(ids: string[]) {
this.db.prepare(`DELETE FROM event_labels WHERE event IN ${mapParams(ids)}`).run(...ids);
return super.removeEvents(ids);
}
removeEvent(id: string) {
this.db.prepare(`DELETE FROM event_labels WHERE event = ?`).run(id);
return super.removeEvent(id);
}
}

View File

@@ -0,0 +1,163 @@
import { type Database as SQLDatabase } from 'better-sqlite3';
import { MigrationSet } from '@satellite-earth/core/sqlite';
import EventEmitter from 'events';
import { nanoid } from 'nanoid';
import { Debugger } from 'debug';
import { logger } from '../../logger.js';
type EventMap = {
log: [LogEntry];
clear: [string | undefined];
};
export type LogEntry = {
id: string;
service: string;
timestamp: number;
message: string;
};
export type DatabaseLogEntry = LogEntry & {
id: number | bigint;
};
const migrations = new MigrationSet('log-store');
// version 1
migrations.addScript(1, async (db, log) => {
db.prepare(
`
CREATE TABLE IF NOT EXISTS "logs" (
"id" TEXT NOT NULL UNIQUE,
"timestamp" INTEGER NOT NULL,
"service" TEXT NOT NULL,
"message" TEXT NOT NULL,
PRIMARY KEY("id")
);
`,
).run();
log('Created logs table');
db.prepare('CREATE INDEX IF NOT EXISTS logs_service ON logs(service)');
log('Created logs service index');
});
export default class LogStore extends EventEmitter<EventMap> {
database: SQLDatabase;
debug: Debugger;
constructor(database: SQLDatabase) {
super();
this.database = database;
this.debug = logger;
}
async setup() {
return await migrations.run(this.database);
}
addEntry(service: string, timestamp: Date | number, message: string) {
const unix = timestamp instanceof Date ? Math.round(timestamp.valueOf() / 1000) : timestamp;
const entry = {
id: nanoid(),
service,
timestamp: unix,
message,
};
this.queue.push(entry);
this.emit('log', entry);
if (!this.running) this.write();
}
running = false;
queue: LogEntry[] = [];
private write() {
if (this.running) return;
this.running = true;
const BATCH_SIZE = 5000;
const inserted: (number | bigint)[] = [];
const failed: LogEntry[] = [];
this.database.transaction(() => {
let i = 0;
while (this.queue.length) {
const entry = this.queue.shift()!;
try {
const { lastInsertRowid } = this.database
.prepare<
[string, string, number, string]
>(`INSERT INTO "logs" (id, service, timestamp, message) VALUES (?, ?, ?, ?)`)
.run(entry.id, entry.service, entry.timestamp, entry.message);
inserted.push(lastInsertRowid);
} catch (error) {
failed.push(entry);
}
if (++i >= BATCH_SIZE) break;
}
})();
for (const entry of failed) {
// Don't know what to do here...
}
if (this.queue.length > 0) setTimeout(this.write.bind(this), 1000);
else this.running = false;
}
getLogs(filter?: { service?: string; since?: number; until?: number; limit?: number }) {
const conditions: string[] = [];
const parameters: (string | number)[] = [];
let sql = `SELECT * FROM logs`;
if (filter?.service) {
conditions.push(`service LIKE CONCAT(?,'%')`);
parameters.push(filter?.service);
}
if (filter?.since) {
conditions.push('timestamp>=?');
parameters.push(filter?.since);
}
if (filter?.until) {
conditions.push('timestamp<=?');
parameters.push(filter?.until);
}
if (conditions.length > 0) sql += ` WHERE ${conditions.join(' AND ')}`;
if (filter?.limit) {
sql += ' LIMIT ?';
parameters.push(filter.limit);
}
return this.database.prepare<any[], DatabaseLogEntry>(sql).all(...parameters);
}
clearLogs(filter?: { service?: string; since?: number; until?: number }) {
const conditions: string[] = [];
const parameters: (string | number)[] = [];
let sql = `DELETE FROM logs`;
if (filter?.service) {
conditions.push('service=?');
parameters.push(filter?.service);
}
if (filter?.since) {
conditions.push('timestamp>=?');
parameters.push(filter?.since);
}
if (filter?.until) {
conditions.push('timestamp<=?');
parameters.push(filter?.until);
}
if (conditions.length > 0) sql += ` WHERE ${conditions.join(' AND ')}`;
this.database.prepare<any[], DatabaseLogEntry>(sql).run(...parameters);
this.emit('clear', filter?.service);
}
}

View File

@@ -0,0 +1,71 @@
import HolesailServer from 'holesail-server';
import { encodeAddress } from 'hyper-address';
import { hexToBytes } from '@noble/hashes/utils';
import { AddressInfo } from 'net';
import App from '../../../app/index.js';
import { InboundInterface } from '../interfaces.js';
import { logger } from '../../../logger.js';
/** manages a holesail-server instance that points to the app.server http server */
export default class HyperInbound implements InboundInterface {
app: App;
hyper?: HolesailServer;
log = logger.extend('Network:Inbound:Hyper');
get available() {
return true;
}
running = false;
error?: Error;
address?: string;
constructor(app: App) {
this.app = app;
}
async start(address: AddressInfo) {
try {
this.running = true;
this.error = undefined;
this.log(`Importing and starting hyperdht node`);
const { default: HolesailServer } = await import('holesail-server');
const { getOrCreateNode } = await import('../../../sidecars/hyperdht.js');
const hyper = (this.hyper = new HolesailServer());
hyper.dht = getOrCreateNode();
return new Promise<void>((res) => {
hyper.serve(
{
port: address.port,
address: address.address,
secure: false,
buffSeed: this.app.secrets.get('hyperKey'),
},
() => {
const address = 'http://' + encodeAddress(hexToBytes(hyper.getPublicKey()));
this.address = address;
this.log(`Listening on ${address}`);
res();
},
);
});
} catch (error) {
this.running = false;
if (error instanceof Error) this.error = error;
}
}
async stop() {
this.log('Shutting down');
// disabled because holesail-server destroys the hyperdht node
// this.hyper?.destroy();
this.running = false;
this.address = undefined;
this.error = undefined;
}
}

View File

@@ -0,0 +1,76 @@
import type { AddressInfo } from 'net';
import type { I2pSamStream } from '@diva.exchange/i2p-sam';
import App from '../../../app/index.js';
import { I2P_SAM_ADDRESS } from '../../../env.js';
import { logger } from '../../../logger.js';
import { InboundInterface } from '../interfaces.js';
export default class I2PInbound implements InboundInterface {
app: App;
log = logger.extend('Network:Inbound:I2P');
available = !!I2P_SAM_ADDRESS;
running = false;
address?: string;
error?: Error;
private forward?: I2pSamStream;
constructor(app: App) {
this.app = app;
}
async start(address: AddressInfo) {
try {
if (this.running) return;
this.running = true;
const [host, port] = I2P_SAM_ADDRESS?.split(':') ?? [];
if (!host || !port) throw new Error(`Malformed proxy address ${I2P_SAM_ADDRESS}`);
this.log('Importing I2P SAM package');
const { createForward } = await import('@diva.exchange/i2p-sam');
// try to get the last key pair that was used
const privateKey = this.app.secrets.get('i2pPrivateKey');
const publicKey = this.app.secrets.get('i2pPublicKey');
this.log('Creating forwarding stream');
this.forward = await createForward({
sam: {
host: host,
portTCP: parseInt(port),
privateKey,
publicKey,
},
forward: {
host: address.address,
port: address.port,
},
});
this.address = 'http://' + this.forward.getB32Address();
this.log(`Listening on ${this.address}`);
// save the key pair for later
this.app.secrets.set('i2pPrivateKey', this.forward.getPrivateKey());
this.app.secrets.set('i2pPublicKey', this.forward.getPublicKey());
} catch (error) {
this.running = false;
if (error instanceof Error) this.error = error;
}
}
async stop() {
if (!this.running) return;
this.running = false;
if (this.forward) {
this.log('Closing forwarding stream');
this.forward.close();
this.forward = undefined;
}
}
}

View File

@@ -0,0 +1,79 @@
import App from '../../../app/index.js';
import HyperInbound from './hyper.js';
import { logger } from '../../../logger.js';
import { getIPAddresses } from '../../../helpers/ip.js';
import TorInbound from './tor.js';
import ConfigManager from '../../config-manager.js';
import I2PInbound from './i2p.js';
/** manages all inbound servers on other networks: hyper, tor, i2p, etc... */
export default class InboundNetworkManager {
app: App;
log = logger.extend('Network:Inbound');
hyper: HyperInbound;
tor: TorInbound;
i2p: I2PInbound;
running = false;
get addresses() {
const ip = getIPAddresses();
const hyper = this.hyper.address;
const tor = this.tor.address;
return [...(ip ?? []), ...(tor ?? []), ...(hyper ?? [])];
}
constructor(app: App) {
this.app = app;
this.hyper = new HyperInbound(app);
this.tor = new TorInbound(app);
this.i2p = new I2PInbound(app);
this.listenToAppConfig(app.config);
}
private getAddress() {
const address = this.app.server.address();
if (typeof address === 'string' || address === null)
throw new Error('External servers started when server does not have an address');
return address;
}
private update(config = this.app.config.data) {
if (!this.running) return;
const address = this.getAddress();
if (this.hyper.available && config.hyperEnabled !== this.hyper.running) {
if (config.hyperEnabled) this.hyper.start(address);
else this.hyper.stop();
}
if (this.tor.available) {
if (!this.tor.running) this.tor.start(address);
}
if (this.i2p.available) {
if (!this.i2p.running) this.i2p.start(address);
}
}
/** A helper method to make the manager run off of the app config */
listenToAppConfig(config: ConfigManager) {
config.on('updated', this.update.bind(this));
}
start() {
this.running = true;
this.update();
}
async stop() {
this.running = false;
await this.hyper.stop();
await this.tor.stop();
await this.i2p.stop();
}
}

View File

@@ -0,0 +1,29 @@
import { AddressInfo } from 'net';
import App from '../../../app/index.js';
import { TOR_ADDRESS } from '../../../env.js';
import { logger } from '../../../logger.js';
import { InboundInterface } from '../interfaces.js';
export default class TorInbound implements InboundInterface {
app: App;
log = logger.extend('Network:Inbound:Tor');
readonly available = !!TOR_ADDRESS;
readonly running = !!TOR_ADDRESS;
readonly address = TOR_ADDRESS;
error?: Error;
constructor(app: App) {
this.app = app;
}
async start(address: AddressInfo) {
// not implemented yet
if (TOR_ADDRESS) this.log(`Listening on ${TOR_ADDRESS}`);
}
async stop() {
// not implemented yet
}
}

View File

@@ -0,0 +1,22 @@
import { AddressInfo } from 'net';
export interface InboundInterface {
available: boolean;
running: boolean;
error?: Error;
address?: string;
start(address: AddressInfo): Promise<void>;
stop(): Promise<void>;
}
export interface OutboundInterface {
available: boolean;
running: boolean;
error?: Error;
type: 'SOCKS5' | 'HTTP';
address?: string;
start(): Promise<void>;
stop(): Promise<void>;
}

View File

@@ -0,0 +1,57 @@
import type { createProxy } from 'hyper-socks5-proxy';
import getPort from 'get-port';
import EventEmitter from 'events';
import { logger } from '../../../logger.js';
import { OutboundInterface } from '../interfaces.js';
type EventMap = {
started: [];
stopped: [];
};
export default class HyperOutbound extends EventEmitter<EventMap> implements OutboundInterface {
log = logger.extend('Network:Outbound:Hyper');
private port?: number;
private proxy?: ReturnType<typeof createProxy>;
running = false;
error?: Error;
readonly type = 'SOCKS5';
address?: string;
get available() {
return true;
}
async start() {
if (this.running) return;
this.running = true;
try {
const { createProxy } = await import('hyper-socks5-proxy');
const { getOrCreateNode } = await import('../../../sidecars/hyperdht.js');
this.port = await getPort({ port: 1080 });
this.proxy = createProxy({ node: await getOrCreateNode() });
this.log('Starting SOCKS5 proxy');
this.address = `127.0.0.1:${this.port}`;
this.proxy.listen(this.port, '127.0.0.1');
this.log(`Proxy listening on ${this.address}`);
this.emit('started');
} catch (error) {
this.running = false;
if (error instanceof Error) this.error = error;
}
}
async stop() {
if (!this.running) return;
this.running = false;
this.log('Stopping');
await new Promise<void>((res) => this.proxy?.close(() => res()));
this.proxy = undefined;
this.emit('stopped');
}
}

View File

@@ -0,0 +1,31 @@
import { logger } from '../../../logger.js';
import { OutboundInterface } from '../interfaces.js';
import { I2P_PROXY, I2P_PROXY_TYPE } from '../../../env.js';
import { testTCPConnection } from '../../../helpers/network.js';
export default class I2POutbound implements OutboundInterface {
log = logger.extend('Network:Outbound:I2P');
running = false;
error?: Error;
readonly type = I2P_PROXY_TYPE;
readonly address = I2P_PROXY;
readonly available = !!I2P_PROXY;
async start() {
try {
if (this.running) return;
this.running = true;
this.log(`Connecting to ${I2P_PROXY}`);
const [host, port] = this.address?.split(':') ?? [];
if (!host || !port) throw new Error('Malformed proxy address');
await testTCPConnection(host, parseInt(port), 3000);
} catch (error) {
this.running = false;
if (error instanceof Error) this.error = error;
}
}
async stop() {}
}

View File

@@ -0,0 +1,134 @@
import { PacProxyAgent } from 'pac-proxy-agent';
import _throttle from 'lodash.throttle';
import { logger } from '../../../logger.js';
import ConfigManager from '../../config-manager.js';
import HyperOutbound from './hyper.js';
import TorOutbound from './tor.js';
import I2POutbound from './i2p.js';
export class OutboundNetworkManager {
log = logger.extend('Network:Outbound');
hyper: HyperOutbound;
tor: TorOutbound;
i2p: I2POutbound;
running = false;
agent: PacProxyAgent<string>;
enableHyperConnections = false;
enableTorConnections = false;
enableI2PConnections = false;
routeAllTrafficThroughTor = false;
constructor() {
this.hyper = new HyperOutbound();
this.tor = new TorOutbound();
this.i2p = new I2POutbound();
this.agent = new PacProxyAgent(this.buildPacURI(), { fallbackToDirect: true });
}
private buildPacURI() {
const statements: string[] = [];
if (this.i2p.available && this.enableI2PConnections) {
statements.push(
`
if (shExpMatch(host, "*.i2p"))
{
return "${this.i2p.type} ${this.i2p.address}";
}
`.trim(),
);
}
if (this.tor.available && this.enableTorConnections) {
statements.push(
`
if (shExpMatch(host, "*.onion"))
{
return "${this.tor.type} ${this.tor.address}";
}
`.trim(),
);
}
if (this.hyper.available && this.enableHyperConnections) {
statements.push(
`
if (shExpMatch(host, "*.hyper"))
{
return "${this.hyper.type} ${this.hyper.address}";
}
`.trim(),
);
}
if (this.routeAllTrafficThroughTor && this.tor.available) {
// if tor is available, route all traffic through it
statements.push(`${this.tor.type} ${this.tor.address}`);
this.log('Routing all traffic through tor proxy');
} else {
statements.push('return "DIRECT";');
}
const PACFile = `
// SPDX-License-Identifier: CC0-1.0
function FindProxyForURL(url, host)
{
${statements.join('\n')}
}
`.trim();
return 'pac+data:application/x-ns-proxy-autoconfig;base64,' + btoa(PACFile);
}
updateAgent(uri = this.buildPacURI()) {
this.log('Updating PAC proxy agent');
// copied from https://github.com/TooTallNate/proxy-agents/blob/main/packages/pac-proxy-agent/src/index.ts#L79C22-L79C51
this.agent.uri = new URL(uri.replace(/^pac\+/i, ''));
// forces the agent to refetch the resolver and pac file
this.agent.resolverPromise = undefined;
}
updateAgentThrottle: () => void = _throttle(this.updateAgent.bind(this), 100);
/** A helper method to make the manager run off of the app config */
listenToAppConfig(config: ConfigManager) {
config.on('updated', (c) => {
this.enableHyperConnections = c.hyperEnabled && c.enableHyperConnections;
this.enableTorConnections = c.enableTorConnections;
this.enableI2PConnections = c.enableI2PConnections;
this.routeAllTrafficThroughTor = c.routeAllTrafficThroughTor;
if (this.hyper.available && this.enableHyperConnections !== this.hyper.running) {
if (this.enableHyperConnections) this.hyper.start();
else this.hyper.stop();
}
if (this.tor.available && this.enableTorConnections !== this.tor.running) {
if (this.enableTorConnections) this.tor.start();
else this.tor.stop();
}
if (this.i2p.available && this.enableI2PConnections !== this.i2p.running) {
if (this.enableI2PConnections) this.i2p.start();
else this.i2p.stop();
}
this.updateAgentThrottle();
});
}
async stop() {
await this.hyper.stop();
await this.tor.stop();
}
}
const outboundNetwork = new OutboundNetworkManager();
export default outboundNetwork;

View File

@@ -0,0 +1,31 @@
import { logger } from '../../../logger.js';
import { OutboundInterface } from '../interfaces.js';
import { TOR_PROXY, TOR_PROXY_TYPE } from '../../../env.js';
import { testTCPConnection } from '../../../helpers/network.js';
export default class TorOutbound implements OutboundInterface {
log = logger.extend('Network:Outbound:Tor');
running = false;
error?: Error;
readonly type = TOR_PROXY_TYPE;
readonly address = TOR_PROXY;
readonly available = !!TOR_PROXY;
async start() {
try {
if (this.running) return;
this.running = true;
this.log(`Connecting to ${TOR_PROXY}`);
const [host, port] = this.address?.split(':') ?? [];
if (!host || !port) throw new Error('Malformed proxy address');
await testTCPConnection(host, parseInt(port), 3000);
} catch (error) {
this.running = false;
if (error instanceof Error) this.error = error;
}
}
async stop() {}
}

View File

@@ -0,0 +1,11 @@
import { ClientRequestArgs } from 'http';
import { ClientOptions, WebSocket } from 'ws';
import outboundNetwork from './index.js';
/** extends the WebSocket class from ws to always use the custom http agent */
export default class OutboundProxyWebSocket extends WebSocket {
constructor(address: string | URL, options?: ClientOptions | ClientRequestArgs) {
super(address, { agent: outboundNetwork.agent, ...options });
}
}

View File

@@ -0,0 +1,155 @@
import { NotificationChannel, WebPushNotification } from '@satellite-earth/core/types/control-api/notifications.js';
import { getDMRecipient, getDMSender, getUserDisplayName, parseKind0Event } from '@satellite-earth/core/helpers/nostr';
import { NostrEvent, kinds } from 'nostr-tools';
import { npubEncode } from 'nostr-tools/nip19';
import EventEmitter from 'events';
import webPush from 'web-push';
import dayjs from 'dayjs';
import { logger } from '../../logger.js';
import App from '../../app/index.js';
export type NotificationsManagerState = {
channels: NotificationChannel[];
};
type EventMap = {
addChannel: [NotificationChannel];
updateChannel: [NotificationChannel];
removeChannel: [NotificationChannel];
};
export default class NotificationsManager extends EventEmitter<EventMap> {
log = logger.extend('Notifications');
app: App;
lastRead: number = dayjs().unix();
webPushKeys: webPush.VapidKeys = webPush.generateVAPIDKeys();
state: NotificationsManagerState = { channels: [] };
get channels() {
return this.state.channels;
}
constructor(app: App) {
super();
this.app = app;
}
async setup() {
this.state = (
await this.app.state.getMutableState<NotificationsManagerState>('notification-manager', { channels: [] })
).proxy;
}
addOrUpdateChannel(channel: NotificationChannel) {
if (this.state.channels.some((c) => c.id === channel.id)) {
// update channel
this.log(`Updating channel ${channel.id} (${channel.type})`);
this.state.channels = this.state.channels.map((c) => {
if (c.id === channel.id) return channel;
else return c;
});
this.emit('updateChannel', channel);
} else {
// add channel
this.log(`Added new channel ${channel.id} (${channel.type})`);
this.state.channels = [...this.state.channels, channel];
this.emit('addChannel', channel);
}
}
removeChannel(id: string) {
const channel = this.state.channels.find((s) => s.id === id);
if (channel) {
this.log(`Removed channel ${id}`);
this.state.channels = this.state.channels.filter((s) => s.id !== id);
this.emit('removeChannel', channel);
}
}
/** Whether a notification should be sent */
shouldNotify(event: NostrEvent) {
if (event.kind !== kinds.EncryptedDirectMessage) return;
if (getDMRecipient(event) !== this.app.config.data.owner) return;
if (event.created_at > this.lastRead) return true;
}
/** builds a notification based on a nostr event */
async buildNotification(event: NostrEvent) {
// TODO in the future we might need to build special notifications for channel type
switch (event.kind) {
case kinds.EncryptedDirectMessage:
const sender = getDMSender(event);
const senderProfileEvent = await this.app.profileBook.loadProfile(sender);
const senderProfile = senderProfileEvent ? parseKind0Event(senderProfileEvent) : undefined;
const senderName = getUserDisplayName(senderProfile, sender);
return {
kind: event.kind,
event,
senderName,
senderProfile,
title: `Message from ${senderName}`,
body: 'Tap on notification to read',
icon: 'https://app.satellite.earth/logo-64x64.png',
// TODO: switch this to a satellite:// link once the native app supports it
url: `https://app.satellite.earth/messages/p/${npubEncode(sender)}`,
};
}
}
async notify(event: NostrEvent) {
const notification = await this.buildNotification(event);
if (!notification) return;
this.log(`Sending notification for ${event.id} to ${this.state.channels.length} channels`);
for (const channel of this.state.channels) {
this.log(`Sending notification "${notification.title}" to ${channel.id} (${channel.type})`);
try {
switch (channel.type) {
case 'web':
const pushNotification: WebPushNotification = {
title: notification.title,
body: notification.body,
icon: notification.icon,
url: notification.url,
event: notification.event,
};
await webPush.sendNotification(channel, JSON.stringify(pushNotification), {
vapidDetails: {
subject: 'mailto:admin@example.com',
publicKey: this.webPushKeys.publicKey,
privateKey: this.webPushKeys.privateKey,
},
});
break;
case 'ntfy':
const headers: HeadersInit = {
Title: notification.title,
Icon: notification.icon,
Click: notification.url,
};
await fetch(new URL(channel.topic, channel.server), {
method: 'POST',
body: notification.body,
headers,
}).then((res) => res.text());
break;
default:
// @ts-expect-error
throw new Error(`Unknown channel type ${channel.type}`);
}
} catch (error) {
this.log(`Failed to notification ${channel.id} (${channel.type})`);
this.log(error);
}
}
}
}

View File

@@ -0,0 +1,40 @@
import { NostrEvent, kinds } from 'nostr-tools';
import _throttle from 'lodash.throttle';
import { COMMON_CONTACT_RELAYS } from '../env.js';
import { logger } from '../logger.js';
import App from '../app/index.js';
import PubkeyBatchLoader from './pubkey-batch-loader.js';
/** loads kind 0 metadata for pubkeys */
export default class ProfileBook {
log = logger.extend('ProfileBook');
app: App;
loader: PubkeyBatchLoader;
extraRelays = COMMON_CONTACT_RELAYS;
constructor(app: App) {
this.app = app;
this.loader = new PubkeyBatchLoader(kinds.Metadata, this.app.pool, (pubkey) => {
return this.app.eventStore.getEventsForFilters([{ kinds: [kinds.Metadata], authors: [pubkey] }])?.[0];
});
this.loader.on('event', (event) => this.app.eventStore.addEvent(event));
this.loader.on('batch', (found, failed) => {
this.log(`Found ${found}, failed ${failed}, pending ${this.loader.queue}`);
});
}
getProfile(pubkey: string) {
return this.loader.getEvent(pubkey);
}
handleEvent(event: NostrEvent) {
this.loader.handleEvent(event);
}
async loadProfile(pubkey: string, relays: string[] = []) {
return this.loader.getOrLoadEvent(pubkey, relays);
}
}

View File

@@ -0,0 +1,175 @@
import { Filter, NostrEvent, SimplePool } from "nostr-tools";
import _throttle from "lodash.throttle";
import { EventEmitter } from "events";
import { getInboxes, getOutboxes } from "@satellite-earth/core/helpers/nostr/mailboxes.js";
import SuperMap from "@satellite-earth/core/helpers/super-map.js";
import { Deferred, createDefer } from "applesauce-core/promise";
import { COMMON_CONTACT_RELAYS } from "../env.js";
type EventMap = {
event: [NostrEvent];
batch: [number, number];
};
/** Loads 10002 events for pubkeys */
export default class PubkeyBatchLoader extends EventEmitter<EventMap> {
extraRelays = COMMON_CONTACT_RELAYS;
kind: number;
pool: SimplePool;
loadFromCache?: (pubkey: string) => NostrEvent | undefined;
get queue() {
return this.next.size;
}
failed = new SuperMap<string, Set<string>>(() => new Set());
constructor(kind: number, pool: SimplePool, loadFromCache?: (pubkey: string) => NostrEvent | undefined) {
super();
this.kind = kind;
this.pool = pool;
this.loadFromCache = loadFromCache;
}
private cache = new Map<string, NostrEvent>();
getEvent(pubkey: string) {
if (this.cache.has(pubkey)) return this.cache.get(pubkey)!;
const event = this.loadFromCache?.(pubkey);
if (event) {
this.cache.set(pubkey, event);
return event;
}
}
getOutboxes(pubkey: string) {
const mailboxes = this.getEvent(pubkey);
return mailboxes && getOutboxes(mailboxes);
}
getInboxes(pubkey: string) {
const mailboxes = this.getEvent(pubkey);
return mailboxes && getInboxes(mailboxes);
}
handleEvent(event: NostrEvent) {
if (event.kind === this.kind) {
this.emit("event", event);
const current = this.cache.get(event.pubkey);
if (!current || event.created_at > current.created_at) this.cache.set(event.pubkey, event);
}
}
/** next queue */
private next = new Map<string, string[]>();
/** currently fetching */
private fetching = new Map<string, string[]>();
/** promises for next and fetching */
private pending = new Map<string, Deferred<NostrEvent | null>>();
private fetchEventsThrottle = _throttle(this.fetchEvents.bind(this), 1000);
private async fetchEvents() {
if (this.fetching.size > 0 || this.next.size === 0) return;
// copy all from next queue to fetching queue
for (const [pubkey, relays] of this.next) this.fetching.set(pubkey, relays);
this.next.clear();
if (this.fetching.size > 0) {
const filters: Record<string, Filter> = {};
for (const [pubkey, relays] of this.fetching) {
for (const relay of relays) {
filters[relay] = filters[relay] || { kinds: [this.kind], authors: [] };
if (!filters[relay].authors?.includes(pubkey)) {
filters[relay].authors?.push(pubkey);
}
}
}
const requests: Record<string, Filter[]> = {};
for (const [relay, filter] of Object.entries(filters)) requests[relay] = [filter];
await new Promise<void>((res) => {
const sub = this.pool.subscribeManyMap(requests, {
onevent: (event) => this.handleEvent(event),
oneose: () => {
sub.close();
// resolve all pending promises
let failed = 0;
let found = 0;
for (const [pubkey, relays] of this.fetching) {
const p = this.pending.get(pubkey);
if (p) {
const event = this.getEvent(pubkey) ?? null;
p.resolve(event);
if (!event) {
failed++;
for (const url of relays) this.failed.get(pubkey).add(url);
p.reject();
} else found++;
this.pending.delete(pubkey);
}
}
this.fetching.clear();
this.emit("batch", found, failed);
res();
},
});
});
// if there are pending requests, make another request
if (this.next.size > 0) this.fetchEventsThrottle();
}
}
getOrLoadEvent(pubkey: string, relays: string[] = []): Promise<NostrEvent | null> {
// if its in the cache, return it
const event = this.getEvent(pubkey);
if (event) return Promise.resolve(event);
// if its already being fetched, return promise
const pending = this.pending.get(pubkey);
if (pending) return pending;
return this.loadEvent(pubkey, relays);
}
loadEvent(pubkey: string, relays: string[] = [], ignoreFailed = false): Promise<NostrEvent | null> {
const urls = new Set(this.next.get(pubkey));
// add relays
for (const url of relays) urls.add(url);
// add extra relays
for (const url of this.extraRelays) urls.add(url);
// filter out failed relays
if (!ignoreFailed) {
const failed = this.failed.get(pubkey);
for (const url of failed) urls.delete(url);
}
if (urls.size === 0) {
// nothing new to try return null
return Promise.resolve(null);
}
// create a promise
const defer = createDefer<NostrEvent | null>();
this.pending.set(pubkey, defer);
// add pubkey and relay to next queue
this.next.set(pubkey, Array.from(urls));
// trigger queue
this.fetchEventsThrottle();
return defer;
}
}

View File

@@ -0,0 +1,259 @@
import EventEmitter from 'events';
import { NostrEvent, SimplePool, Filter } from 'nostr-tools';
import SuperMap from '@satellite-earth/core/helpers/super-map.js';
import { AbstractRelay, Subscription, SubscriptionParams } from 'nostr-tools/abstract-relay';
import { getPubkeysFromList } from '@satellite-earth/core/helpers/nostr/lists.js';
import { getInboxes, getOutboxes } from '@satellite-earth/core/helpers/nostr/mailboxes.js';
import { getRelaysFromContactList } from '@satellite-earth/core/helpers/nostr/contacts.js';
import { BOOTSTRAP_RELAYS } from '../../env.js';
import { logger } from '../../logger.js';
import App from '../../app/index.js';
/** creates a new subscription and waits for it to get an event or close */
function asyncSubscription(relay: AbstractRelay, filters: Filter[], opts: SubscriptionParams) {
let resolved = false;
return new Promise<Subscription>((res, rej) => {
const sub = relay.subscribe(filters, {
onevent: (event) => {
if (!resolved) res(sub);
opts.onevent?.(event);
},
oneose: () => {
if (!resolved) res(sub);
opts.oneose?.();
},
onclose: (reason) => {
if (!resolved) rej(new Error(reason));
opts.onclose?.(reason);
},
});
});
}
type EventMap = {
started: [Receiver];
stopped: [Receiver];
status: [string];
rebuild: [];
subscribed: [string, string[]];
closed: [string, string[]];
error: [Error];
event: [NostrEvent];
};
type ReceiverStatus = 'running' | 'starting' | 'errored' | 'stopped';
export default class Receiver extends EventEmitter<EventMap> {
log = logger.extend('Receiver');
_status: ReceiverStatus = 'stopped';
get status() {
return this._status;
}
set status(v: ReceiverStatus) {
this._status = v;
this.emit('status', v);
}
starting = true;
startupError?: Error;
app: App;
pool: SimplePool;
subscriptions = new Map<string, Subscription>();
constructor(app: App, pool?: SimplePool) {
super();
this.app = app;
this.pool = pool || app.pool;
}
// pubkey -> relays
private pubkeyRelays = new Map<string, Set<string>>();
// relay url -> pubkeys
private relayPubkeys = new SuperMap<string, Set<string>>(() => new Set());
// the current request map in the format of relay -> pubkeys
map = new SuperMap<string, Set<string>>(() => new Set());
async fetchData() {
const owner = this.app.config.data.owner;
if (!owner) throw new Error('Missing owner');
const ownerMailboxes = await this.app.addressBook.loadMailboxes(owner);
const ownerInboxes = getInboxes(ownerMailboxes);
const ownerOutboxes = getOutboxes(ownerMailboxes);
this.log('Searching for owner kind:3 contacts');
const contacts = await this.app.contactBook.loadContacts(owner);
if (!contacts) throw new Error('Cant find contacts');
this.pubkeyRelays.clear();
this.relayPubkeys.clear();
// add the owners details
this.pubkeyRelays.set(owner, new Set(ownerOutboxes));
for (const url of ownerOutboxes) this.relayPubkeys.get(url).add(owner);
const people = getPubkeysFromList(contacts);
this.log(`Found ${people.length} contacts`);
let usersWithMailboxes = 0;
let usersWithContactRelays = 0;
let usersWithFallbackRelays = 0;
// fetch all addresses in parallel
await Promise.all(
people.map(async (person) => {
const mailboxes = await this.app.addressBook.loadMailboxes(person.pubkey, ownerInboxes ?? BOOTSTRAP_RELAYS);
let relays = getOutboxes(mailboxes);
// if the user does not have any mailboxes try to get the relays stored in the contact list
if (relays.length === 0) {
this.log(`Failed to find mailboxes for ${person.pubkey}`);
const contacts = await this.app.contactBook.loadContacts(person.pubkey, ownerInboxes ?? BOOTSTRAP_RELAYS);
if (contacts && contacts.content.startsWith('{')) {
const parsed = getRelaysFromContactList(contacts);
if (parsed) {
relays = parsed.filter((r) => r.write).map((r) => r.url);
usersWithContactRelays++;
} else {
relays = BOOTSTRAP_RELAYS;
usersWithFallbackRelays++;
}
} else {
relays = BOOTSTRAP_RELAYS;
usersWithFallbackRelays++;
}
} else usersWithMailboxes++;
// add pubkey details
this.pubkeyRelays.set(person.pubkey, new Set(relays));
for (const url of relays) this.relayPubkeys.get(url).add(person.pubkey);
}),
);
this.log(
`Found ${usersWithMailboxes} users with mailboxes, ${usersWithContactRelays} user with relays in contact list, and ${usersWithFallbackRelays} using fallback relays`,
);
}
buildMap() {
this.map.clear();
// sort pubkey relays by popularity
for (const [pubkey, relays] of this.pubkeyRelays) {
const sorted = Array.from(relays).sort((a, b) => this.relayPubkeys.get(b).size - this.relayPubkeys.get(a).size);
// add the pubkey to their top two relays
for (const url of sorted.slice(0, 2)) this.map.get(url).add(pubkey);
}
this.emit('rebuild');
return this.map;
}
private handleEvent(event: NostrEvent) {
this.emit('event', event);
}
async updateRelaySubscription(url: string) {
const pubkeys = this.map.get(url);
if (pubkeys.size === 0) return;
const subscription = this.subscriptions.get(url);
if (!subscription || subscription.closed) {
const relay = await this.app.pool.ensureRelay(url);
const sub = relay.subscribe([{ authors: Array.from(pubkeys) }], {
onevent: this.handleEvent.bind(this),
onclose: () => {
this.emit('closed', url, Array.from(pubkeys));
// wait 30 seconds then try to reconnect
setTimeout(() => {
this.updateRelaySubscription(url);
}, 30_000);
},
});
this.emit('subscribed', url, Array.from(pubkeys));
this.subscriptions.set(url, sub);
this.log(`Subscribed to ${url} for ${pubkeys.size} pubkeys`);
} else {
const hasOld = subscription.filters[0].authors?.some((p) => !pubkeys.has(p));
const hasNew = Array.from(pubkeys).some((p) => !subscription.filters[0].authors?.includes(p));
if (hasNew || hasOld) {
// reset the subscription
subscription.eosed = false;
subscription.filters = [{ authors: Array.from(pubkeys) }];
subscription.fire();
this.log(`Subscribed to ${url} with ${pubkeys.size} pubkeys`);
}
}
}
ensureSubscriptions() {
const promises: Promise<void>[] = [];
for (const [url, pubkeys] of this.map) {
const p = this.updateRelaySubscription(url).catch((error) => {
// failed to connect to relay
// this needs to be remembered and the subscription map should be rebuilt accordingly
});
promises.push(p);
}
return Promise.all(promises);
}
async start() {
if (this.status === 'running' || this.status === 'starting') return;
try {
this.log('Starting');
this.startupError = undefined;
this.status = 'starting';
await this.fetchData();
this.buildMap();
await this.ensureSubscriptions();
this.status = 'running';
this.emit('started', this);
} catch (error) {
this.status = 'errored';
if (error instanceof Error) {
this.startupError = error;
this.log(`Failed to start receiver`, error.message);
this.emit('error', error);
}
}
}
/** stop receiving events and disconnect from all relays */
stop() {
if (this.status === 'stopped') return;
this.status = 'stopped';
for (const [relay, sub] of this.subscriptions) sub.close();
this.subscriptions.clear();
this.log('Stopped');
this.emit('stopped', this);
}
destroy() {
this.stop();
this.removeAllListeners();
}
}

View File

@@ -0,0 +1,77 @@
import { WebSocket } from "ws";
import { ReportErrorMessage, ReportResultMessage } from "@satellite-earth/core/types/control-api/reports.js";
import { ReportArguments, ReportResults } from "@satellite-earth/core/types";
import type App from "../../app/index.js";
import { logger } from "../../logger.js";
type f = () => void;
export default class Report<T extends keyof ReportResults> {
id: string;
// @ts-expect-error
readonly type: T = "";
socket: WebSocket | NodeJS.Process;
app: App;
running = false;
log = logger.extend("Report");
args?: ReportArguments[T];
private setupTeardown?: void | f;
constructor(id: string, app: App, socket: WebSocket | NodeJS.Process) {
this.id = id;
this.socket = socket;
this.app = app;
this.log = logger.extend("Report:" + this.type);
}
private sendError(message: string) {
this.socket.send?.(JSON.stringify(["CONTROL", "REPORT", "ERROR", this.id, message] satisfies ReportErrorMessage));
}
// override when extending
/** This method is run only once when the report starts */
async setup(args: ReportArguments[T]): Promise<void | f> {}
/** this method is run every time the client sends new arguments */
async execute(args: ReportArguments[T]) {}
/** this method is run when the report is closed */
cleanup() {}
// private methods
protected send(result: ReportResults[T]) {
this.socket.send?.(
JSON.stringify(["CONTROL", "REPORT", "RESULT", this.id, result] satisfies ReportResultMessage<T>),
);
}
// public api
async run(args: ReportArguments[T]) {
try {
this.args = args;
if (this.running === false) {
// hack to make sure the .log is extended correctly
this.log = logger.extend("Report:" + this.type);
this.setupTeardown = await this.setup(args);
}
this.log(`Executing with args`, JSON.stringify(args));
await this.execute(args);
this.running = true;
} catch (error) {
if (error instanceof Error) this.sendError(error.message);
else this.sendError("Unknown server error");
if (error instanceof Error) this.log("Error: " + error.message);
throw error;
}
}
close() {
this.setupTeardown?.();
this.cleanup();
this.running = false;
}
}

View File

@@ -0,0 +1,115 @@
import { ReportArguments, ReportResults } from '@satellite-earth/core/types';
import { NostrEvent } from 'nostr-tools';
import { getTagValue } from '@satellite-earth/core/helpers/nostr';
import SuperMap from '@satellite-earth/core/helpers/super-map.js';
import Report from '../report.js';
export default class ConversationsReport extends Report<'CONVERSATIONS'> {
readonly type = 'CONVERSATIONS';
private async getConversationResult(self: string, other: string) {
const sent = this.app.database.db
.prepare<[string, string], { pubkey: string; count: number; lastMessage: number }>(
`
SELECT tags.v as pubkey, count(events.id) as count, max(events.created_at) as lastMessage FROM tags
INNER JOIN events ON events.id = tags.e
WHERE events.kind = 4 AND tags.t = 'p' AND events.pubkey = ? AND tags.v = ?`,
)
.get(self, other);
const received = this.app.database.db
.prepare<[string, string], { pubkey: string; count: number; lastMessage: number }>(
`
SELECT events.pubkey, count(events.id) as count, max(events.created_at) as lastMessage FROM events
INNER JOIN tags ON tags.e = events.id
WHERE events.kind = 4 AND tags.t = 'p' AND tags.v = ? AND events.pubkey = ?`,
)
.get(self, other);
const result: ReportResults['CONVERSATIONS'] = {
pubkey: other,
count: (received?.count ?? 0) + (sent?.count ?? 0),
sent: 0,
received: 0,
};
if (received) {
result.received = received.count;
result.lastReceived = received.lastMessage;
}
if (sent) {
result.sent = sent.count;
result.lastSent = sent.lastMessage;
}
return result;
}
private async getAllConversationResults(self: string) {
const sent = this.app.database.db
.prepare<[string], { pubkey: string; count: number; lastMessage: number }>(
`
SELECT tags.v as pubkey, count(tags.v) as count, max(events.created_at) as lastMessage FROM tags
INNER JOIN events ON events.id = tags.e
WHERE events.kind = 4 AND tags.t = 'p' AND events.pubkey = ?
GROUP BY tags.v`,
)
.all(self);
const received = this.app.database.db
.prepare<[string], { pubkey: string; count: number; lastMessage: number }>(
`
SELECT events.pubkey, count(events.pubkey) as count, max(events.created_at) as lastMessage FROM events
INNER JOIN tags ON tags.e = events.id
WHERE events.kind = 4 AND tags.t = 'p' AND tags.v = ?
GROUP BY events.pubkey`,
)
.all(self);
const results = new SuperMap<string, ReportResults['CONVERSATIONS']>((pubkey) => ({
pubkey,
count: sent.length + received.length,
sent: 0,
received: 0,
}));
for (const { pubkey, count, lastMessage } of received) {
const result = results.get(pubkey);
result.received = count;
result.lastReceived = lastMessage;
}
for (const { pubkey, count, lastMessage } of sent) {
const result = results.get(pubkey);
result.sent = count;
result.lastSent = lastMessage;
}
return Array.from(results.values()).sort(
(a, b) => Math.max(b.lastReceived ?? 0, b.lastSent ?? 0) - Math.max(a.lastReceived ?? 0, a.lastSent ?? 0),
);
}
async setup(args: ReportArguments['CONVERSATIONS']) {
const listener = (event: NostrEvent) => {
const from = event.pubkey;
const to = getTagValue(event, 'p');
if (!to) return;
const self = args.pubkey;
// get the latest stats from the database
this.getConversationResult(self, self === from ? to : from).then((result) => this.send(result));
};
this.app.directMessageManager.on('message', listener);
return () => this.app.directMessageManager.off('message', listener);
}
async execute(args: ReportArguments['CONVERSATIONS']) {
const results = await this.getAllConversationResults(args.pubkey);
for (const result of results) {
this.send(result);
}
}
}

View File

@@ -0,0 +1,11 @@
import { ReportArguments } from '@satellite-earth/core/types';
import Report from '../report.js';
export default class DMSearchReport extends Report<'DM_SEARCH'> {
readonly type = 'DM_SEARCH';
async execute(args: ReportArguments['DM_SEARCH']) {
const results = await this.app.decryptionCache.search(args.query, args);
for (const result of results) this.send(result);
}
}

View File

@@ -0,0 +1,69 @@
import { ReportArguments } from '@satellite-earth/core/types';
import { EventRow, parseEventRow } from '@satellite-earth/core';
import Report from '../report.js';
export default class EventsSummaryReport extends Report<'EVENTS_SUMMARY'> {
readonly type = 'EVENTS_SUMMARY';
async execute(args: ReportArguments['EVENTS_SUMMARY']): Promise<void> {
let sql = `
SELECT
events.*,
COUNT(l.id) AS reactions,
COUNT(s.id) AS shares,
COUNT(r.id) AS replies,
(events.kind || ':' || events.pubkey || ':' || events.d) as address
FROM events
LEFT JOIN tags ON ( tags.t = 'e' AND tags.v = events.id ) OR ( tags.t = 'a' AND tags.v = address )
LEFT JOIN events AS l ON l.id = tags.e AND l.kind = 7
LEFT JOIN events AS s ON s.id = tags.e AND (s.kind = 6 OR s.kind = 16)
LEFT JOIN events AS r ON r.id = tags.e AND r.kind = 1
`;
const params: any[] = [];
const conditions: string[] = [];
if (args.kind !== undefined) {
conditions.push(`events.kind = ?`);
params.push(args.kind);
}
if (args.pubkey !== undefined) {
conditions.push(`events.pubkey = ?`);
params.push(args.pubkey);
}
if (conditions.length > 0) {
sql += ` WHERE ${conditions.join(' AND ')}\n`;
}
sql += ' GROUP BY events.id\n';
switch (args.order) {
case 'created_at':
sql += ` ORDER BY events.created_at DESC\n`;
break;
default:
case 'interactions':
sql += ` ORDER BY reactions + shares + replies DESC\n`;
break;
}
let limit = args.limit || 100;
sql += ` LIMIT ?`;
params.push(limit);
const rows = await this.app.database.db
.prepare<any[], EventRow & { reactions: number; shares: number; replies: number }>(sql)
.all(...params);
const results = rows.map((row) => {
const event = parseEventRow(row);
return { event, reactions: row.reactions, shares: row.shares, replies: row.replies };
});
for (const result of results) {
this.send(result);
}
}
}

View File

@@ -0,0 +1,30 @@
import { ReportArguments } from '@satellite-earth/core/types';
import Report from '../report.js';
import OverviewReport from './overview.js';
import ConversationsReport from './conversations.js';
import LogsReport from './logs.js';
import ServicesReport from './services.js';
import DMSearchReport from './dm-search.js';
import ScrapperStatusReport from './scrapper-status.js';
import ReceiverStatusReport from './receiver-status.js';
import NetworkStatusReport from './network-status.js';
import NotificationChannelsReport from './notification-channels.js';
import EventsSummaryReport from './events-summary.js';
const REPORT_CLASSES: {
[k in keyof ReportArguments]?: typeof Report<k>;
} = {
OVERVIEW: OverviewReport,
CONVERSATIONS: ConversationsReport,
LOGS: LogsReport,
SERVICES: ServicesReport,
DM_SEARCH: DMSearchReport,
SCRAPPER_STATUS: ScrapperStatusReport,
RECEIVER_STATUS: ReceiverStatusReport,
NETWORK_STATUS: NetworkStatusReport,
NOTIFICATION_CHANNELS: NotificationChannelsReport,
EVENTS_SUMMARY: EventsSummaryReport,
};
export default REPORT_CLASSES;

View File

@@ -0,0 +1,23 @@
import { ReportArguments } from '@satellite-earth/core/types';
import { LogEntry } from '../../log-store/log-store.js';
import Report from '../report.js';
/** WARNING: be careful of calling this.log in this class. it could trigger an infinite loop of logging */
export default class LogsReport extends Report<'LOGS'> {
readonly type = 'LOGS';
async setup() {
const listener = (entry: LogEntry) => {
if (!this.args?.service || entry.service === this.args.service) this.send(entry);
};
this.app.logStore.on('log', listener);
return () => this.app.logStore.off('log', listener);
}
async execute(args: ReportArguments['LOGS']) {
const logs = this.app.logStore.getLogs({ service: args.service, limit: 500 });
for (const entry of logs) this.send(entry);
}
}

View File

@@ -0,0 +1,69 @@
import Report from '../report.js';
export default class NetworkStatusReport extends Report<'NETWORK_STATUS'> {
readonly type = 'NETWORK_STATUS';
update() {
const torIn = this.app.inboundNetwork.tor;
const torOut = this.app.outboundNetwork.tor;
const hyperIn = this.app.inboundNetwork.hyper;
const hyperOut = this.app.outboundNetwork.hyper;
const i2pIn = this.app.inboundNetwork.i2p;
const i2pOut = this.app.outboundNetwork.i2p;
this.send({
tor: {
inbound: {
available: torIn.available,
running: torIn.running,
error: torIn.error?.message,
address: torIn.address,
},
outbound: {
available: torOut.available,
running: torOut.running,
error: torOut.error?.message,
},
},
hyper: {
inbound: {
available: hyperIn.available,
running: hyperIn.running,
error: hyperIn.error?.message,
address: hyperIn.address,
},
outbound: {
available: hyperOut.available,
running: hyperOut.running,
error: hyperOut.error?.message,
},
},
i2p: {
inbound: {
available: i2pIn.available,
running: i2pIn.running,
error: i2pIn.error?.message,
address: i2pIn.address,
},
outbound: {
available: i2pOut.available,
running: i2pOut.running,
error: i2pOut.error?.message,
},
},
});
}
async setup() {
const listener = this.update.bind(this);
// NOTE: set and interval since there are not events to listen to yet
const i = setInterval(listener, 1000);
return () => clearInterval(i);
}
async execute(args: {}): Promise<void> {
this.update();
}
}

View File

@@ -0,0 +1,29 @@
import { NotificationChannel } from '@satellite-earth/core/types/control-api/notifications.js';
import Report from '../report.js';
export default class NotificationChannelsReport extends Report<'NOTIFICATION_CHANNELS'> {
readonly type = 'NOTIFICATION_CHANNELS';
async setup() {
const listener = this.send.bind(this);
const removeListener = (channel: NotificationChannel) => {
this.send(['removed', channel.id]);
};
this.app.notifications.on('addChannel', listener);
this.app.notifications.on('updateChannel', listener);
this.app.notifications.on('removeChannel', removeListener);
return () => {
this.app.notifications.off('addChannel', listener);
this.app.notifications.off('updateChannel', listener);
this.app.notifications.off('removeChannel', removeListener);
};
}
async execute(args: {}): Promise<void> {
for (const channel of this.app.notifications.channels) {
this.send(channel);
}
}
}

View File

@@ -0,0 +1,40 @@
import { NostrEvent } from 'nostr-tools';
import { ReportArguments } from '@satellite-earth/core/types';
import Report from '../report.js';
export default class OverviewReport extends Report<'OVERVIEW'> {
readonly type = 'OVERVIEW';
async setup() {
const listener = (event: NostrEvent) => {
// update summary for pubkey
const result = this.app.database.db
.prepare<
[string],
{ pubkey: string; events: number; active: number }
>(`SELECT pubkey, COUNT(events.id) as \`events\`, MAX(created_at) as \`active\` FROM events WHERE pubkey=?`)
.get(event.pubkey);
if (result) this.send(result);
};
this.app.eventStore.on('event:inserted', listener);
return () => {
this.app.eventStore.off('event:inserted', listener);
};
}
async execute(args: ReportArguments['OVERVIEW']) {
const results = await this.app.database.db
.prepare<
[],
{ pubkey: string; events: number; active: number }
>(`SELECT pubkey, COUNT(events.id) as \`events\`, MAX(created_at) as \`active\` FROM events GROUP BY pubkey ORDER BY \`events\` DESC`)
.all();
for (const result of results) {
this.send(result);
}
}
}

View File

@@ -0,0 +1,38 @@
import Report from '../report.js';
export default class ReceiverStatusReport extends Report<'RECEIVER_STATUS'> {
readonly type = 'RECEIVER_STATUS';
update() {
this.send({
status: this.app.receiver.status,
startError: this.app.receiver.startupError?.message,
subscriptions: Array.from(this.app.receiver.map).map(([relay, pubkeys]) => ({
relay,
pubkeys: Array.from(pubkeys),
active: !!this.app.receiver.subscriptions.get(relay),
closed: !!this.app.receiver.subscriptions.get(relay)?.closed,
})),
});
}
async setup() {
const listener = this.update.bind(this);
this.app.receiver.on('status', listener);
this.app.receiver.on('subscribed', listener);
this.app.receiver.on('closed', listener);
this.app.receiver.on('error', listener);
return () => {
this.app.receiver.off('status', listener);
this.app.receiver.off('subscribed', listener);
this.app.receiver.off('closed', listener);
this.app.receiver.off('error', listener);
};
}
async execute(args: {}): Promise<void> {
this.update();
}
}

View File

@@ -0,0 +1,55 @@
import { NostrEvent } from 'nostr-tools';
import _throttle from 'lodash.throttle';
import Report from '../report.js';
export default class ScrapperStatusReport extends Report<'SCRAPPER_STATUS'> {
readonly type = 'SCRAPPER_STATUS';
eventsPerSecond: number[] = [0];
update() {
const averageEventsPerSecond = this.eventsPerSecond.reduce((m, v) => m + v, 0) / this.eventsPerSecond.length;
const pubkeys = this.app.scrapper.state.pubkeys;
let activeSubscriptions = 0;
for (const [pubkey, scrapper] of this.app.scrapper.scrappers) {
for (const [relay, relayScrapper] of scrapper.relayScrappers) {
if (relayScrapper.running) activeSubscriptions++;
}
}
this.send({
running: this.app.scrapper.running,
eventsPerSecond: averageEventsPerSecond,
activeSubscriptions,
pubkeys,
});
}
async setup() {
const onEvent = (event: NostrEvent) => {
this.eventsPerSecond[0]++;
};
this.app.scrapper.on('event', onEvent);
const tick = setInterval(() => {
// start a new second
this.eventsPerSecond.unshift(0);
// limit to 60 seconds
while (this.eventsPerSecond.length > 60) this.eventsPerSecond.pop();
this.update();
}, 1000);
return () => {
this.app.scrapper.off('event', onEvent);
clearInterval(tick);
};
}
async execute(args: {}): Promise<void> {
this.update();
}
}

View File

@@ -0,0 +1,12 @@
import Report from '../report.js';
export default class ServicesReport extends Report<'SERVICES'> {
readonly type = 'SERVICES';
async execute() {
const services = this.app.database.db
.prepare<[], { id: string }>(`SELECT service as id FROM logs GROUP BY service`)
.all();
for (const service of services) this.send(service);
}
}

View File

@@ -0,0 +1,145 @@
import SuperMap from "@satellite-earth/core/helpers/super-map.js";
import { EventEmitter } from "events";
import { NostrEvent } from "nostr-tools";
import { Deferred, createDefer } from "applesauce-core/promise";
import App from "../../app/index.js";
import { logger } from "../../logger.js";
import { getPubkeysFromList } from "@satellite-earth/core/helpers/nostr/lists.js";
import PubkeyScrapper from "./pubkey-scrapper.js";
const MAX_TASKS = 10;
type EventMap = {
event: [NostrEvent];
};
export type ScrapperState = {
pubkeys: string[];
};
export default class Scrapper extends EventEmitter<EventMap> {
app: App;
log = logger.extend("scrapper:service");
state: ScrapperState = { pubkeys: [] };
// pubkey -> relay -> scrapper
scrappers = new SuperMap<string, PubkeyScrapper>((pubkey) => {
const scrapper = new PubkeyScrapper(this.app, pubkey);
scrapper.on("event", (event) => this.emit("event", event));
return scrapper;
});
constructor(app: App) {
super();
this.app = app;
}
async setup() {
this.state = (await this.app.state.getMutableState<ScrapperState>("scrapper", { pubkeys: [] })).proxy;
}
async ensureData() {
if (!this.app.config.data.owner) throw new Error("Owner not setup yet");
// get mailboxes and contacts
const mailboxes = await this.app.addressBook.loadMailboxes(this.app.config.data.owner);
const contacts = await this.app.contactBook.loadContacts(this.app.config.data.owner);
if (!contacts) throw new Error("Missing contact list");
return { contacts: getPubkeysFromList(contacts), mailboxes };
}
private async scrapeOwner() {
if (!this.running) return;
try {
if (!this.app.config.data.owner) throw new Error("Owner not setup yet");
const scrapper = this.scrappers.get(this.app.config.data.owner);
await scrapper.loadNext();
} catch (error) {
// eat error
}
setTimeout(this.scrapeOwner.bind(this), 1000);
}
private async scrapeForPubkey(pubkey: string, relay?: string) {
const scrapper = this.scrappers.get(pubkey);
if (relay) scrapper.additionalRelays = [relay];
return await scrapper.loadNext();
}
tasks = new Set<Promise<any>>();
private block?: Deferred<void>;
private waitForBlock() {
if (this.block) return this.block;
this.block = createDefer();
return this.block;
}
private unblock() {
if (this.block) {
this.block?.resolve();
this.block = undefined;
}
}
async scrapeContacts() {
if (!this.running) return;
const { contacts } = await this.ensureData();
for (const person of contacts) {
// check if the pubkey should be scraped
if (!this.state.pubkeys.includes(person.pubkey)) continue;
// await here if the task queue if full
if (this.tasks.size >= MAX_TASKS) await this.waitForBlock();
// check running again since this is resuming
if (!this.running) return;
const promise = this.scrapeForPubkey(person.pubkey, person.relay);
// add it to the tasks array
this.tasks.add(promise);
promise
.catch((err) => {
// eat the error
})
.finally(() => {
this.tasks.delete(promise);
this.unblock();
});
}
// set timeout for next batch
setTimeout(this.scrapeContacts.bind(this), 1000);
}
running = false;
start() {
this.running = true;
this.scrapeOwner();
this.scrapeContacts();
}
stop() {
this.running = false;
}
addPubkey(pubkey: string) {
this.state.pubkeys.push(pubkey);
}
removePubkey(pubkey: string) {
this.state.pubkeys = this.state.pubkeys.filter((p) => p !== pubkey);
}
}

View File

@@ -0,0 +1,115 @@
import dayjs from 'dayjs';
import { EventEmitter } from 'events';
import { NostrEvent } from 'nostr-tools';
import { Debugger } from 'debug';
import { AbstractRelay, Subscription } from 'nostr-tools/abstract-relay';
import { logger } from '../../logger.js';
function stripProtocol(url: string) {
return url.replace(/^\w+\:\/\//, '');
}
const DEFAULT_LIMIT = 1000;
export type PubkeyRelayScrapperState = {
cursor?: number;
complete?: boolean;
};
type EventMap = {
event: [NostrEvent];
chunk: [{ count: number; cursor: number }];
};
export default class PubkeyRelayScrapper extends EventEmitter<EventMap> {
pubkey: string;
relay: AbstractRelay;
log: Debugger;
running = false;
error?: Error;
state: PubkeyRelayScrapperState = {};
get cursor() {
return this.state.cursor || dayjs().unix();
}
set cursor(v: number) {
this.state.cursor = v;
}
get complete() {
return this.state.complete || false;
}
set complete(v: boolean) {
this.state.complete = v;
}
private subscription?: Subscription;
constructor(pubkey: string, relay: AbstractRelay, state?: PubkeyRelayScrapperState) {
super();
this.pubkey = pubkey;
this.relay = relay;
if (state) this.state = state;
this.log = logger.extend('scrapper:' + pubkey + ':' + stripProtocol(relay.url));
}
async loadNext() {
// don't run if its already running, complete, or has an error
if (this.running || this.complete || this.error) return;
this.running = true;
// wait for relay connection
await this.relay.connect();
const cursor = this.state.cursor || dayjs().unix();
this.log(`Requesting from ${dayjs.unix(cursor).format('lll')} (${cursor})`);
// return a promise to wait for the subscription to end
return new Promise<void>((res, rej) => {
let count = 0;
let newCursor = cursor;
this.subscription = this.relay.subscribe([{ authors: [this.pubkey], until: cursor, limit: DEFAULT_LIMIT }], {
onevent: (event) => {
this.emit('event', event);
count++;
newCursor = Math.min(newCursor, event.created_at);
},
oneose: () => {
this.running = false;
// if no events where returned, mark complete
if (count === 0) {
// connection closed before events could be returned, ignore complete
if (this.subscription?.closed === true) return;
this.complete = true;
this.log('Got 0 events, complete');
} else {
this.log(`Got ${count} events and moved cursor to ${dayjs.unix(newCursor).format('lll')} (${newCursor})`);
}
this.state.cursor = newCursor - 1;
this.emit('chunk', { count, cursor: this.cursor });
this.subscription?.close();
res();
},
onclose: (reason) => {
if (reason !== 'closed by caller') {
// unexpected close
this.log(`Error: ${reason}`);
this.error = new Error(reason);
rej(this.error);
}
res();
},
});
});
}
}

View File

@@ -0,0 +1,83 @@
import App from '../../app/index.js';
import { NostrEvent } from 'nostr-tools';
import { EventEmitter } from 'events';
import { Debugger } from 'debug';
import { getOutboxes } from '@satellite-earth/core/helpers/nostr/mailboxes.js';
import PubkeyRelayScrapper, { PubkeyRelayScrapperState } from './pubkey-relay-scrapper.js';
import { logger } from '../../logger.js';
type EventMap = {
event: [NostrEvent];
};
export default class PubkeyScrapper extends EventEmitter<EventMap> {
app: App;
pubkey: string;
additionalRelays: string[] = [];
log: Debugger;
private failed = new Set<string>();
relayScrappers = new Map<string, PubkeyRelayScrapper>();
constructor(app: App, pubkey: string) {
super();
this.app = app;
this.pubkey = pubkey;
this.log = logger.extend('scrapper:' + this.pubkey);
}
async ensureData() {
// get mailboxes
this.app.profileBook.loadProfile(this.pubkey);
const mailboxes = await this.app.addressBook.loadMailboxes(this.pubkey);
return { mailboxes };
}
async loadNext() {
const { mailboxes } = await this.ensureData();
const outboxes = getOutboxes(mailboxes);
const relays = [...outboxes, ...this.additionalRelays];
const scrappers: PubkeyRelayScrapper[] = [];
for (const url of relays) {
if (this.failed.has(url)) continue;
try {
let scrapper = this.relayScrappers.get(url);
if (!scrapper) {
const relay = await this.app.pool.ensureRelay(url);
scrapper = new PubkeyRelayScrapper(this.pubkey, relay);
scrapper.on('event', (event) => this.emit('event', event));
// load the state from the database
const state = await this.app.state.getMutableState<PubkeyRelayScrapperState>(
`${this.pubkey}|${relay.url}`,
{},
);
if (state) scrapper.state = state.proxy;
this.relayScrappers.set(url, scrapper);
}
scrappers.push(scrapper);
} catch (error) {
this.failed.add(url);
if (error instanceof Error) this.log(`Failed to create relay scrapper for ${url}`, error.message);
}
}
// call loadNext on the one with the latest cursor
const incomplete = scrappers
.filter((s) => !s.complete && !s.running && !s.error)
.sort((a, b) => b.cursor - a.cursor);
const next = incomplete[0];
if (next) {
await next.loadNext();
}
}
}

View File

@@ -0,0 +1,125 @@
import _throttle from 'lodash.throttle';
import { generateSecretKey } from 'nostr-tools';
import EventEmitter from 'events';
import { bytesToHex, hexToBytes } from '@noble/hashes/utils';
import webPush from 'web-push';
import crypto from 'crypto';
import fs from 'fs';
import { logger } from '../logger.js';
type Secrets = {
nostrKey: Uint8Array;
vapidPrivateKey: string;
vapidPublicKey: string;
hyperKey: Buffer;
i2pPrivateKey?: string;
i2pPublicKey?: string;
};
type RawJson = Partial<{
nostrKey: string;
vapidPrivateKey: string;
vapidPublicKey: string;
hyperKey: string;
i2pPrivateKey?: string;
i2pPublicKey?: string;
}>;
type EventMap = {
/** fires when file is loaded */
loaded: [];
/** fires when a field is set */
changed: [keyof Secrets, any];
/** fires when file is loaded or changed */
updated: [];
saved: [];
};
export default class SecretsManager extends EventEmitter<EventMap> {
log = logger.extend('SecretsManager');
protected secrets?: Secrets;
path: string;
constructor(path: string) {
super();
this.path = path;
}
get<T extends keyof Secrets>(secret: T): Secrets[T] {
if (!this.secrets) throw new Error('Secrets not loaded');
return this.secrets[secret];
}
set<T extends keyof Secrets>(secret: T, value: Secrets[T]) {
if (!this.secrets) throw new Error('Secrets not loaded');
this.secrets[secret] = value;
this.emit('changed', secret, value);
this.emit('updated');
this.write();
}
read() {
this.log('Loading secrets');
let json: Record<string, any> = {};
try {
json = JSON.parse(fs.readFileSync(this.path, { encoding: 'utf-8' }));
} catch (error) {}
let changed = false;
const secrets = {} as Secrets;
if (!json.nostrKey) {
this.log('Generating new nostr key');
secrets.nostrKey = generateSecretKey();
changed = true;
} else secrets.nostrKey = hexToBytes(json.nostrKey);
if (!json.vapidPrivateKey || !json.vapidPublicKey) {
this.log('Generating new vapid key');
const keys = webPush.generateVAPIDKeys();
secrets.vapidPrivateKey = keys.privateKey;
secrets.vapidPublicKey = keys.publicKey;
changed = true;
} else {
secrets.vapidPrivateKey = json.vapidPrivateKey;
secrets.vapidPublicKey = json.vapidPublicKey;
}
if (!json.hyperKey) {
this.log('Generating new hyper key');
secrets.hyperKey = crypto.randomBytes(32);
changed = true;
} else secrets.hyperKey = Buffer.from(json.hyperKey, 'hex');
secrets.i2pPrivateKey = json.i2pPrivateKey;
secrets.i2pPublicKey = json.i2pPublicKey;
this.secrets = secrets;
this.emit('loaded');
this.emit('updated');
if (changed) this.write();
}
write() {
if (!this.secrets) throw new Error('Secrets not loaded');
this.log('Saving');
const json: RawJson = {
nostrKey: bytesToHex(this.secrets?.nostrKey),
vapidPrivateKey: this.secrets.vapidPrivateKey,
vapidPublicKey: this.secrets.vapidPublicKey,
hyperKey: this.secrets.hyperKey?.toString('hex'),
i2pPrivateKey: this.secrets.i2pPrivateKey,
i2pPublicKey: this.secrets.i2pPublicKey,
};
fs.writeFileSync(this.path, JSON.stringify(json, null, 2), { encoding: 'utf-8' });
this.emit('saved');
}
}

View File

@@ -0,0 +1,49 @@
import { MigrationSet } from '@satellite-earth/core/sqlite';
import { Database } from 'better-sqlite3';
import { MutableState } from './mutable-state.js';
const migrations = new MigrationSet('application-state');
migrations.addScript(1, async (db, log) => {
db.prepare(
`
CREATE TABLE "application_state" (
"id" TEXT NOT NULL,
"state" TEXT,
PRIMARY KEY("id")
);
`,
).run();
log('Created application state table');
});
export default class ApplicationStateManager {
private mutableState = new Map<string, MutableState<any>>();
database: Database;
constructor(database: Database) {
this.database = database;
}
async setup() {
await migrations.run(this.database);
}
async getMutableState<T extends object>(key: string, initialState: T) {
const cached = this.mutableState.get(key);
if (cached) return cached as MutableState<T>;
const state = new MutableState<T>(this.database, key, initialState);
await state.read();
this.mutableState.set(key, state);
return state;
}
async saveAll() {
for (const [key, state] of this.mutableState) {
await state.save();
}
}
}

View File

@@ -0,0 +1,91 @@
import { EventEmitter } from 'events';
import { Database } from 'better-sqlite3';
import _throttle from 'lodash.throttle';
import { Debugger } from 'debug';
import { logger } from '../../logger.js';
type EventMap<T> = {
/** fires when file is loaded */
loaded: [T];
/** fires when a field is set */
changed: [T, string, any];
/** fires when state is loaded or changed */
updated: [T];
saved: [T];
};
export class MutableState<T extends object> extends EventEmitter<EventMap<T>> {
state?: T;
log: Debugger;
private _proxy?: T;
/** A Proxy object that will automatically save when mutated */
get proxy() {
if (!this._proxy) throw new Error('Cant access state before initialized');
return this._proxy;
}
key: string;
database: Database;
constructor(database: Database, key: string, initialState: T) {
super();
this.state = initialState;
this.key = key;
this.database = database;
this.log = logger.extend(`State:` + key);
this.createProxy();
}
private createProxy() {
if (!this.state) return;
return (this._proxy = new Proxy(this.state, {
get(target, prop, receiver) {
return Reflect.get(target, prop, receiver);
},
set: (target, p, newValue, receiver) => {
Reflect.set(target, p, newValue, receiver);
this.emit('changed', target as T, String(p), newValue);
this.emit('updated', target as T);
this.throttleSave();
return newValue;
},
}));
}
private throttleSave = _throttle(this.save.bind(this), 30_000);
async read() {
const row = await this.database
.prepare<[string], { id: string; state: string }>(`SELECT id, state FROM application_state WHERE id=?`)
.get(this.key);
const state: T | undefined = row ? (JSON.parse(row.state) as T) : undefined;
if (state && this.state) {
Object.assign(this.state, state);
this.log('Loaded');
}
if (!this.state) throw new Error(`Missing initial state for ${this.key}`);
this.createProxy();
if (this.state) {
this.emit('loaded', this.state);
this.emit('updated', this.state);
}
}
async save() {
if (!this.state) return;
await this.database
.prepare<[string, string]>(`INSERT OR REPLACE INTO application_state (id, state) VALUES (?, ?)`)
.run(this.key, JSON.stringify(this.state));
this.log('Saved');
this.emit('saved', this.state);
}
}

View File

@@ -0,0 +1,92 @@
import { RawData, WebSocket } from 'ws';
import { IncomingMessage } from 'http';
import { logger } from '../../logger.js';
import OutboundProxyWebSocket from '../network/outbound/websocket.js';
import { isHexKey } from 'applesauce-core/helpers';
import App from '../../app/index.js';
export default class Switchboard {
private app: App;
private log = logger.extend('Switchboard');
constructor(app: App) {
this.app = app;
}
public handleConnection(downstream: WebSocket, req: IncomingMessage) {
let upstream: WebSocket | undefined;
const handleMessage = async (message: RawData) => {
try {
// Parse JSON from the raw buffer
const data = JSON.parse(typeof message === 'string' ? message : message.toString('utf-8'));
if (!Array.isArray(data)) throw new Error('Message is not an array');
if (data[0] === 'PROXY' && data[1]) {
let addresses: string[];
if (isHexKey(data[1])) {
addresses = await this.app.gossip.lookup(data[1]);
} else addresses = [data[1]];
if (addresses.length === 0) {
downstream.send(JSON.stringify(['PROXY', 'ERROR', 'Lookup failed']));
return;
}
this.app.relay.disconnectSocket(downstream);
downstream.send(JSON.stringify(['PROXY', 'CONNECTING']));
let error: Error | undefined = undefined;
for (const address of addresses) {
try {
upstream = new OutboundProxyWebSocket(address);
// wait for connection
await new Promise<void>((res, rej) => {
upstream?.once('open', () => res());
upstream?.once('error', (error) => rej(error));
});
this.log(`Proxy connection to ${address}`);
// clear last error
error = undefined;
// Forward from client to target relay
downstream.on('message', (message, isBinary) => {
upstream?.send(message, { binary: isBinary });
});
// Forward back from target relay to client
upstream.on('message', (message, isBinary) => {
downstream.send(message, { binary: isBinary });
});
// connect the close events
upstream.on('close', () => downstream.close());
downstream.on('close', () => upstream?.close());
// tell downstream its connected
downstream.send(JSON.stringify(['PROXY', 'CONNECTED']));
// Step away from the connection
downstream.off('message', handleMessage);
} catch (err) {
upstream = undefined;
if (err instanceof Error) error = err;
}
}
// send the error back if we failed to connect to any address
if (error) downstream.send(JSON.stringify(['PROXY', 'ERROR', error.message]));
}
} catch (err) {
this.log('Failed to handle message', err);
}
};
downstream.on('message', handleMessage);
this.app.relay.handleConnection(downstream, req);
}
}

19
src/sidecars/hyperdht.ts Normal file
View File

@@ -0,0 +1,19 @@
import HyperDHT from 'hyperdht';
import { logger } from '../logger.js';
const log = logger.extend('HyperDHT');
let node: HyperDHT | undefined;
export function getOrCreateNode() {
if (node) return node;
log('Creating HyperDHT Node');
return (node = new HyperDHT());
}
export function destroyNode() {
if (node) {
node.destroy();
node = undefined;
}
}

24
src/types/holesail-server.d.ts vendored Normal file
View File

@@ -0,0 +1,24 @@
declare module 'holesail-server' {
import HyperDHT, { KeyPair, Server } from 'hyperdht';
type ServeArgs = {
secure?: boolean;
buffSeed?: Buffer;
port?: number;
address?: string;
};
export default class HolesailServer {
dht: HyperDHT;
server: Server | null;
seed: Buffer | null;
keyPair: KeyPair | null;
buffer: Buffer | null;
secure?: boolean;
keyPairGenerator(buffer?: Buffer): KeyPair;
serve(args: ServeArgs, callback?: () => void): void;
destroy(): 0;
getPublicKey(): string;
}
}

38
src/types/hyperdht.d.ts vendored Normal file
View File

@@ -0,0 +1,38 @@
declare module 'hyperdht' {
import type { Socket } from 'net';
import type EventEmitter from 'events';
class NoiseStreamSocket extends Socket {
remotePublicKey: Buffer;
}
export class Server extends EventEmitter<{
listening: [];
connection: [NoiseStreamSocket];
}> {
address(): { host: string; port: string; publicKey: Buffer } | null;
listen(keyPair: KeyPair): Promise<void>;
}
type KeyPair = {
publicKey: Buffer;
secretKey: Buffer;
};
export default class HyperDHT {
constructor(opts?: { keyPair: KeyPair; bootstrap?: string[] });
createServer(
opts?: {
firewall?: (removePublicKey: Buffer, remoteHandshakePayload: any) => boolean;
},
onconnection?: (socket: NoiseStreamSocket) => void,
): Server;
destroy(opts?: { force: boolean }): Promise<void>;
connect(host: Buffer, opts?: { reusableSocket: boolean }): Socket;
static keyPair(seed?: Buffer): KeyPair;
}
}

5
src/types/streamx.d.ts vendored Normal file
View File

@@ -0,0 +1,5 @@
declare module 'streamx' {
import { Duplex, Stream } from 'stream';
export function pipeline(...streams: Stream[]): Duplex;
}

15
tsconfig.json Normal file
View File

@@ -0,0 +1,15 @@
{
"compilerOptions": {
"module": "NodeNext",
"target": "ES2020",
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"outDir": "dist",
"skipLibCheck": true,
"declaration": true,
"allowJs": true,
"strict": true,
"sourceMap": true
},
"include": ["src"]
}