Merge branch 'main' into search-nip05-enhance-and-postgres-support-testing

This commit is contained in:
P. Reis
2024-07-16 13:58:47 -03:00
24 changed files with 568 additions and 79 deletions

View File

@@ -108,6 +108,7 @@ import {
trendingStatusesController,
trendingTagsController,
} from '@/controllers/api/trends.ts';
import { errorHandler } from '@/controllers/error.ts';
import { metricsController } from '@/controllers/metrics.ts';
import { indexController } from '@/controllers/site.ts';
import { nodeInfoController, nodeInfoSchemaController } from '@/controllers/well-known/nodeinfo.ts';
@@ -151,18 +152,17 @@ if (Conf.cronEnabled) {
app.use('*', rateLimitMiddleware(300, Time.minutes(5)));
app.use('/api/*', logger(debug));
app.use('/.well-known/*', logger(debug));
app.use('/users/*', logger(debug));
app.use('/nodeinfo/*', logger(debug));
app.use('/oauth/*', logger(debug));
app.use('/api/*', metricsMiddleware, logger(debug));
app.use('/.well-known/*', metricsMiddleware, logger(debug));
app.use('/users/*', metricsMiddleware, logger(debug));
app.use('/nodeinfo/*', metricsMiddleware, logger(debug));
app.use('/oauth/*', metricsMiddleware, logger(debug));
app.get('/api/v1/streaming', streamingController);
app.get('/relay', relayController);
app.get('/api/v1/streaming', metricsMiddleware, streamingController);
app.get('/relay', metricsMiddleware, relayController);
app.use(
'*',
metricsMiddleware,
cspMiddleware(),
cors({ origin: '*', exposeHeaders: ['link'] }),
signerMiddleware,
@@ -340,12 +340,7 @@ app.get('/', frontendController, indexController);
// Fallback
app.get('*', publicFiles, staticFiles, frontendController);
app.onError((err, c) => {
if (err.message === 'canceling statement due to statement timeout') {
return c.json({ error: 'The server was unable to respond in a timely manner' }, 500);
}
return c.json({ error: 'Something went wrong' }, 500);
});
app.onError(errorHandler);
export default app;

View File

@@ -98,6 +98,21 @@ class Conf {
}
return undefined;
},
/** Database query timeout configurations. */
timeouts: {
/** Default query timeout when another setting isn't more specific. */
get default(): number {
return Number(Deno.env.get('DB_TIMEOUT_DEFAULT') || 5_000);
},
/** Timeout used for queries made through the Nostr relay. */
get relay(): number {
return Number(Deno.env.get('DB_TIMEOUT_RELAY') || 1_000);
},
/** Timeout used for timelines such as home, notifications, hashtag, etc. */
get timelines(): number {
return Number(Deno.env.get('DB_TIMEOUT_TIMELINES') || 15_000);
},
},
};
/** Character limit to enforce for posts made through Mastodon API. */
static get postCharLimit(): number {

View File

@@ -209,7 +209,7 @@ const accountStatusesController: AppController = async (c) => {
filter['#t'] = [tagged];
}
const opts = { signal, limit, timeout: 10_000 };
const opts = { signal, limit, timeout: Conf.db.timeouts.timelines };
const events = await store.query([filter], opts)
.then((events) => hydrateEvents({ events, store, signal }))

View File

@@ -78,7 +78,7 @@ async function renderNotifications(
const store = c.get('store');
const pubkey = await c.get('signer')?.getPublicKey()!;
const { signal } = c.req.raw;
const opts = { signal, limit: params.limit, timeout: 15_000 };
const opts = { signal, limit: params.limit, timeout: Conf.db.timeouts.timelines };
const events = await store
.query(filters, opts)

View File

@@ -97,8 +97,8 @@ const createStatusController: AppController = async (c) => {
const root = ancestor.tags.find((tag) => tag[0] === 'e' && tag[3] === 'root')?.[1] ?? ancestor.id;
tags.push(['e', root, 'root']);
tags.push(['e', data.in_reply_to_id, 'reply']);
tags.push(['e', root, Conf.relay, 'root']);
tags.push(['e', data.in_reply_to_id, Conf.relay, 'reply']);
}
if (data.quote_id) {
@@ -202,7 +202,7 @@ const deleteStatusController: AppController = async (c) => {
if (event.pubkey === pubkey) {
await createEvent({
kind: 5,
tags: [['e', id]],
tags: [['e', id, Conf.relay]],
}, c);
const author = await getAuthor(event.pubkey);
@@ -260,8 +260,8 @@ const favouriteController: AppController = async (c) => {
kind: 7,
content: '+',
tags: [
['e', target.id],
['p', target.pubkey],
['e', target.id, Conf.relay],
['p', target.pubkey, Conf.relay],
],
}, c);
@@ -302,7 +302,10 @@ const reblogStatusController: AppController = async (c) => {
const reblogEvent = await createEvent({
kind: 6,
tags: [['e', event.id], ['p', event.pubkey]],
tags: [
['e', event.id, Conf.relay],
['p', event.pubkey, Conf.relay],
],
}, c);
await hydrateEvents({
@@ -337,7 +340,7 @@ const unreblogStatusController: AppController = async (c) => {
await createEvent({
kind: 5,
tags: [['e', repostEvent.id]],
tags: [['e', repostEvent.id, Conf.relay]],
}, c);
return c.json(await renderStatus(event, { viewerPubkey: pubkey }));
@@ -389,7 +392,7 @@ const bookmarkController: AppController = async (c) => {
if (event) {
await updateListEvent(
{ kinds: [10003], authors: [pubkey], limit: 1 },
(tags) => addTag(tags, ['e', eventId]),
(tags) => addTag(tags, ['e', eventId, Conf.relay]),
c,
);
@@ -416,7 +419,7 @@ const unbookmarkController: AppController = async (c) => {
if (event) {
await updateListEvent(
{ kinds: [10003], authors: [pubkey], limit: 1 },
(tags) => deleteTag(tags, ['e', eventId]),
(tags) => deleteTag(tags, ['e', eventId, Conf.relay]),
c,
);
@@ -443,7 +446,7 @@ const pinController: AppController = async (c) => {
if (event) {
await updateListEvent(
{ kinds: [10001], authors: [pubkey], limit: 1 },
(tags) => addTag(tags, ['e', eventId]),
(tags) => addTag(tags, ['e', eventId, Conf.relay]),
c,
);
@@ -472,7 +475,7 @@ const unpinController: AppController = async (c) => {
if (event) {
await updateListEvent(
{ kinds: [10001], authors: [pubkey], limit: 1 },
(tags) => deleteTag(tags, ['e', eventId]),
(tags) => deleteTag(tags, ['e', eventId, Conf.relay]),
c,
);
@@ -516,7 +519,7 @@ const zapController: AppController = async (c) => {
lnurl = getLnurl(meta);
if (target && lnurl) {
tags.push(
['e', target.id],
['e', target.id, Conf.relay],
['p', target.pubkey],
['amount', amount.toString()],
['relays', Conf.relay],

View File

@@ -10,9 +10,10 @@ import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
import { getFeedPubkeys } from '@/queries.ts';
import { hydrateEvents } from '@/storages/hydrate.ts';
import { Storages } from '@/storages.ts';
import { bech32ToPubkey } from '@/utils.ts';
import { bech32ToPubkey, Time } from '@/utils.ts';
import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts';
import { renderNotification } from '@/views/mastodon/notifications.ts';
import TTLCache from '@isaacs/ttlcache';
const debug = Debug('ditto:streaming');
@@ -37,6 +38,11 @@ const streamSchema = z.enum([
type Stream = z.infer<typeof streamSchema>;
const LIMITER_WINDOW = Time.minutes(5);
const LIMITER_LIMIT = 100;
const limiter = new TTLCache<string, number>();
const streamingController: AppController = async (c) => {
const upgrade = c.req.header('upgrade');
const token = c.req.header('sec-websocket-protocol');
@@ -52,6 +58,14 @@ const streamingController: AppController = async (c) => {
return c.json({ error: 'Invalid access token' }, 401);
}
const ip = c.req.header('x-real-ip');
if (ip) {
const count = limiter.get(ip) ?? 0;
if (count > LIMITER_LIMIT) {
return c.json({ error: 'Rate limit exceeded' }, 429);
}
}
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 });
const store = await Storages.db();
@@ -122,6 +136,23 @@ const streamingController: AppController = async (c) => {
}
};
socket.onmessage = (e) => {
if (ip) {
const count = limiter.get(ip) ?? 0;
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
if (count > LIMITER_LIMIT) {
socket.close(1008, 'Rate limit exceeded');
return;
}
}
if (typeof e.data !== 'string') {
socket.close(1003, 'Invalid message');
return;
}
};
socket.onclose = () => {
streamingConnectionsGauge.dec();
controller.abort();

View File

@@ -60,7 +60,7 @@ const suggestedTimelineController: AppController = async (c) => {
async function renderStatuses(c: AppContext, filters: NostrFilter[]) {
const { signal } = c.req.raw;
const store = c.get('store');
const opts = { signal, timeout: 10_000 };
const opts = { signal, timeout: Conf.db.timeouts.timelines };
const events = await store
.query(filters, opts)

16
src/controllers/error.ts Normal file
View File

@@ -0,0 +1,16 @@
import { ErrorHandler } from '@hono/hono';
import { HTTPException } from '@hono/hono/http-exception';
export const errorHandler: ErrorHandler = (err, c) => {
if (err instanceof HTTPException) {
return c.json({ error: err.message }, err.status);
}
console.error(err);
if (err.message === 'canceling statement due to statement timeout') {
return c.json({ error: 'The server was unable to respond in a timely manner' }, 500);
}
return c.json({ error: 'Something went wrong' }, 500);
};

View File

@@ -1,3 +1,4 @@
import TTLCache from '@isaacs/ttlcache';
import {
NostrClientCLOSE,
NostrClientCOUNT,
@@ -9,17 +10,24 @@ import {
} from '@nostrify/nostrify';
import { AppController } from '@/app.ts';
import { Conf } from '@/config.ts';
import { relayInfoController } from '@/controllers/nostr/relay-info.ts';
import { relayConnectionsGauge, relayEventCounter, relayMessageCounter } from '@/metrics.ts';
import * as pipeline from '@/pipeline.ts';
import { RelayError } from '@/RelayError.ts';
import { Storages } from '@/storages.ts';
import { Time } from '@/utils/time.ts';
/** Limit of initial events returned for a subscription. */
const FILTER_LIMIT = 100;
const LIMITER_WINDOW = Time.minutes(1);
const LIMITER_LIMIT = 300;
const limiter = new TTLCache<string, number>();
/** Set up the Websocket connection. */
function connectStream(socket: WebSocket) {
function connectStream(socket: WebSocket, ip: string | undefined) {
const controllers = new Map<string, AbortController>();
socket.onopen = () => {
@@ -27,6 +35,21 @@ function connectStream(socket: WebSocket) {
};
socket.onmessage = (e) => {
if (ip) {
const count = limiter.get(ip) ?? 0;
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
if (count > LIMITER_LIMIT) {
socket.close(1008, 'Rate limit exceeded');
return;
}
}
if (typeof e.data !== 'string') {
socket.close(1003, 'Invalid message');
return;
}
const result = n.json().pipe(n.clientMsg()).safeParse(e.data);
if (result.success) {
relayMessageCounter.inc({ verb: result.data[0] });
@@ -73,7 +96,7 @@ function connectStream(socket: WebSocket) {
const pubsub = await Storages.pubsub();
try {
for (const event of await store.query(filters, { limit: FILTER_LIMIT, timeout: 1000 })) {
for (const event of await store.query(filters, { limit: FILTER_LIMIT, timeout: Conf.db.timeouts.relay })) {
send(['EVENT', subId, event]);
}
} catch (e) {
@@ -128,7 +151,7 @@ function connectStream(socket: WebSocket) {
/** Handle COUNT. Return the number of events matching the filters. */
async function handleCount([_, subId, ...filters]: NostrClientCOUNT): Promise<void> {
const store = await Storages.db();
const { count } = await store.count(filters, { timeout: 100 });
const { count } = await store.count(filters, { timeout: Conf.db.timeouts.relay });
send(['COUNT', subId, { count, approximate: false }]);
}
@@ -152,8 +175,16 @@ const relayController: AppController = (c, next) => {
return c.text('Please use a Nostr client to connect.', 400);
}
const ip = c.req.header('x-real-ip');
if (ip) {
const count = limiter.get(ip) ?? 0;
if (count > LIMITER_LIMIT) {
return c.json({ error: 'Rate limit exceeded' }, 429);
}
}
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 });
connectStream(socket);
connectStream(socket, ip);
return response;
};

View File

@@ -39,14 +39,14 @@ export class DittoDB {
static get poolSize(): number {
if (Conf.db.dialect === 'postgres') {
return DittoPostgres.getPool().size;
return DittoPostgres.poolSize;
}
return 1;
}
static get availableConnections(): number {
if (Conf.db.dialect === 'postgres') {
return DittoPostgres.getPool().available;
return DittoPostgres.availableConnections;
}
return 1;
}

View File

@@ -1,6 +1,6 @@
import { Kysely, PostgresAdapter, PostgresIntrospector, PostgresQueryCompiler } from 'kysely';
import { PostgreSQLDriver } from 'kysely_deno_postgres';
import { Pool } from 'postgres';
import { Kysely } from 'kysely';
import { PostgresJSDialect, PostgresJSDialectConfig } from 'kysely-postgres-js';
import postgres from 'postgres';
import { Conf } from '@/config.ts';
import { DittoTables } from '@/db/DittoTables.ts';
@@ -8,37 +8,31 @@ import { KyselyLogger } from '@/db/KyselyLogger.ts';
export class DittoPostgres {
static db: Kysely<DittoTables> | undefined;
static pool: Pool | undefined;
static getPool(): Pool {
if (!this.pool) {
this.pool = new Pool(Conf.databaseUrl, Conf.pg.poolSize, true);
}
return this.pool;
}
static postgres?: postgres.Sql;
// deno-lint-ignore require-await
static async getInstance(): Promise<Kysely<DittoTables>> {
if (!this.postgres) {
this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize });
}
if (!this.db) {
this.db = new Kysely({
dialect: {
createAdapter() {
return new PostgresAdapter();
},
createDriver() {
return new PostgreSQLDriver(DittoPostgres.getPool());
},
createIntrospector(db: Kysely<unknown>) {
return new PostgresIntrospector(db);
},
createQueryCompiler() {
return new PostgresQueryCompiler();
},
},
dialect: new PostgresJSDialect({
postgres: this.postgres as unknown as PostgresJSDialectConfig['postgres'],
}),
log: KyselyLogger,
});
}
return this.db;
}
static get poolSize() {
return this.postgres?.connections.open ?? 0;
}
static get availableConnections(): number {
return this.postgres?.connections.idle ?? 0;
}
}

View File

@@ -6,6 +6,12 @@ export const httpRequestCounter = new Counter({
labelNames: ['method'],
});
export const httpResponseCounter = new Counter({
name: 'http_responses_total',
help: 'Total number of HTTP responses',
labelNames: ['status', 'path'],
});
export const streamingConnectionsGauge = new Gauge({
name: 'streaming_connections',
help: 'Number of active connections to the streaming API',

View File

@@ -1,10 +1,14 @@
import { MiddlewareHandler } from '@hono/hono';
import { httpRequestCounter } from '@/metrics.ts';
import { httpRequestCounter, httpResponseCounter } from '@/metrics.ts';
export const metricsMiddleware: MiddlewareHandler = async (c, next) => {
const { method } = c.req;
httpRequestCounter.inc({ method });
await next();
const { status } = c.res;
const path = c.req.matchedRoutes.find((r) => r.method !== 'ALL')?.path ?? c.req.routePath;
httpResponseCounter.inc({ status, path });
};

View File

@@ -1,4 +1,5 @@
// deno-lint-ignore-file require-await
import { HTTPException } from '@hono/hono/http-exception';
import { NConnectSigner, NostrEvent, NostrSigner } from '@nostrify/nostrify';
import { Storages } from '@/storages.ts';
@@ -27,30 +28,78 @@ export class ConnectSigner implements NostrSigner {
async signEvent(event: Omit<NostrEvent, 'id' | 'pubkey' | 'sig'>): Promise<NostrEvent> {
const signer = await this.signer;
return signer.signEvent(event);
try {
return await signer.signEvent(event);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, { message: 'The event was not signed quickly enough' });
} else {
throw e;
}
}
}
readonly nip04 = {
encrypt: async (pubkey: string, plaintext: string): Promise<string> => {
const signer = await this.signer;
return signer.nip04.encrypt(pubkey, plaintext);
try {
return await signer.nip04.encrypt(pubkey, plaintext);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, {
message: 'Text was not encrypted quickly enough',
});
} else {
throw e;
}
}
},
decrypt: async (pubkey: string, ciphertext: string): Promise<string> => {
const signer = await this.signer;
return signer.nip04.decrypt(pubkey, ciphertext);
try {
return await signer.nip04.decrypt(pubkey, ciphertext);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, {
message: 'Text was not decrypted quickly enough',
});
} else {
throw e;
}
}
},
};
readonly nip44 = {
encrypt: async (pubkey: string, plaintext: string): Promise<string> => {
const signer = await this.signer;
return signer.nip44.encrypt(pubkey, plaintext);
try {
return await signer.nip44.encrypt(pubkey, plaintext);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, {
message: 'Text was not encrypted quickly enough',
});
} else {
throw e;
}
}
},
decrypt: async (pubkey: string, ciphertext: string): Promise<string> => {
const signer = await this.signer;
return signer.nip44.decrypt(pubkey, ciphertext);
try {
return await signer.nip44.decrypt(pubkey, ciphertext);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, {
message: 'Text was not decrypted quickly enough',
});
} else {
throw e;
}
}
},
};

View File

@@ -7,7 +7,7 @@ export class ReadOnlySigner implements NostrSigner {
async signEvent(): Promise<NostrEvent> {
throw new HTTPException(401, {
message: 'Log out and back in',
message: 'Log in with Nostr Connect to sign events',
});
}

View File

@@ -64,7 +64,7 @@ class EventsDB implements NStore {
await this.deleteEventsAdmin(event);
try {
await this.store.event(event, { ...opts, timeout: opts.timeout ?? 1000 });
await this.store.event(event, { ...opts, timeout: opts.timeout ?? Conf.db.timeouts.default });
} catch (e) {
if (e.message === 'Cannot add a deleted event') {
throw new RelayError('blocked', 'event deleted by user');
@@ -164,7 +164,7 @@ class EventsDB implements NStore {
this.console.debug('REQ', JSON.stringify(filters));
return this.store.query(filters, { ...opts, timeout: opts.timeout ?? 1000 });
return this.store.query(filters, { ...opts, timeout: opts.timeout ?? Conf.db.timeouts.default });
}
/** Delete events based on filters from the database. */
@@ -172,7 +172,7 @@ class EventsDB implements NStore {
if (!filters.length) return Promise.resolve();
this.console.debug('DELETE', JSON.stringify(filters));
return this.store.remove(filters, { ...opts, timeout: opts.timeout ?? 3000 });
return this.store.remove(filters, { ...opts, timeout: opts.timeout ?? Conf.db.timeouts.default });
}
/** Get number of events that would be returned by filters. */
@@ -185,7 +185,7 @@ class EventsDB implements NStore {
this.console.debug('COUNT', JSON.stringify(filters));
return this.store.count(filters, { ...opts, timeout: opts.timeout ?? 500 });
return this.store.count(filters, { ...opts, timeout: opts.timeout ?? Conf.db.timeouts.default });
}
/** Return only the tags that should be indexed. */
@@ -253,6 +253,8 @@ class EventsDB implements NStore {
/** Converts filters to more performant, simpler filters that are better for SQLite. */
async expandFilters(filters: NostrFilter[]): Promise<NostrFilter[]> {
filters = structuredClone(filters);
for (const filter of filters) {
if (filter.search) {
const tokens = NIP50.parseInput(filter.search);
@@ -282,6 +284,12 @@ class EventsDB implements NStore {
filter.search = tokens.filter((t) => typeof t === 'string').join(' ');
}
if (filter.kinds) {
// Ephemeral events are not stored, so don't bother querying for them.
// If this results in an empty kinds array, NDatabase will remove the filter before querying and return no results.
filter.kinds = filter.kinds.filter((kind) => !NKinds.ephemeral(kind));
}
}
return filters;

View File

@@ -168,3 +168,7 @@ export const createTestDB = async (databaseUrl?: string) => {
},
};
};
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}