mirror of
https://github.com/aljazceru/ditto.git
synced 2026-01-20 13:54:31 +01:00
Merge branch 'main' into 'use-postgres-js'
# Conflicts: # deno.json # src/db/adapters/DittoPostgres.ts
This commit is contained in:
18
src/app.ts
18
src/app.ts
@@ -108,6 +108,7 @@ import {
|
||||
trendingStatusesController,
|
||||
trendingTagsController,
|
||||
} from '@/controllers/api/trends.ts';
|
||||
import { errorHandler } from '@/controllers/error.ts';
|
||||
import { metricsController } from '@/controllers/metrics.ts';
|
||||
import { indexController } from '@/controllers/site.ts';
|
||||
import { nodeInfoController, nodeInfoSchemaController } from '@/controllers/well-known/nodeinfo.ts';
|
||||
@@ -151,18 +152,17 @@ if (Conf.cronEnabled) {
|
||||
|
||||
app.use('*', rateLimitMiddleware(300, Time.minutes(5)));
|
||||
|
||||
app.use('/api/*', logger(debug));
|
||||
app.use('/.well-known/*', logger(debug));
|
||||
app.use('/users/*', logger(debug));
|
||||
app.use('/nodeinfo/*', logger(debug));
|
||||
app.use('/oauth/*', logger(debug));
|
||||
app.use('/api/*', metricsMiddleware, logger(debug));
|
||||
app.use('/.well-known/*', metricsMiddleware, logger(debug));
|
||||
app.use('/users/*', metricsMiddleware, logger(debug));
|
||||
app.use('/nodeinfo/*', metricsMiddleware, logger(debug));
|
||||
app.use('/oauth/*', metricsMiddleware, logger(debug));
|
||||
|
||||
app.get('/api/v1/streaming', streamingController);
|
||||
app.get('/relay', relayController);
|
||||
app.get('/api/v1/streaming', metricsMiddleware, streamingController);
|
||||
app.get('/relay', metricsMiddleware, relayController);
|
||||
|
||||
app.use(
|
||||
'*',
|
||||
metricsMiddleware,
|
||||
cspMiddleware(),
|
||||
cors({ origin: '*', exposeHeaders: ['link'] }),
|
||||
signerMiddleware,
|
||||
@@ -340,6 +340,8 @@ app.get('/', frontendController, indexController);
|
||||
// Fallback
|
||||
app.get('*', publicFiles, staticFiles, frontendController);
|
||||
|
||||
app.onError(errorHandler);
|
||||
|
||||
export default app;
|
||||
|
||||
export type { AppContext, AppController, AppMiddleware };
|
||||
|
||||
@@ -209,7 +209,9 @@ const accountStatusesController: AppController = async (c) => {
|
||||
filter['#t'] = [tagged];
|
||||
}
|
||||
|
||||
const events = await store.query([filter], { signal })
|
||||
const opts = { signal, limit, timeout: 10_000 };
|
||||
|
||||
const events = await store.query([filter], opts)
|
||||
.then((events) => hydrateEvents({ events, store, signal }))
|
||||
.then((events) => {
|
||||
if (exclude_replies) {
|
||||
|
||||
@@ -78,7 +78,7 @@ async function renderNotifications(
|
||||
const store = c.get('store');
|
||||
const pubkey = await c.get('signer')?.getPublicKey()!;
|
||||
const { signal } = c.req.raw;
|
||||
const opts = { signal, limit: params.limit };
|
||||
const opts = { signal, limit: params.limit, timeout: 15_000 };
|
||||
|
||||
const events = await store
|
||||
.query(filters, opts)
|
||||
|
||||
@@ -8,14 +8,21 @@ import { z } from 'zod';
|
||||
import { type AppController } from '@/app.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { getAmount } from '@/utils/bolt11.ts';
|
||||
import { getAncestors, getAuthor, getDescendants, getEvent } from '@/queries.ts';
|
||||
import { getUnattachedMediaByIds } from '@/db/unattached-media.ts';
|
||||
import { renderEventAccounts } from '@/views.ts';
|
||||
import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts';
|
||||
import { createEvent, paginated, paginationSchema, parseBody, updateListEvent } from '@/utils/api.ts';
|
||||
import {
|
||||
createEvent,
|
||||
listPaginationSchema,
|
||||
paginated,
|
||||
paginatedList,
|
||||
paginationSchema,
|
||||
parseBody,
|
||||
updateListEvent,
|
||||
} from '@/utils/api.ts';
|
||||
import { getInvoice, getLnurl } from '@/utils/lnurl.ts';
|
||||
import { lookupPubkey } from '@/utils/lookup.ts';
|
||||
import { addTag, deleteTag } from '@/utils/tags.ts';
|
||||
@@ -545,33 +552,26 @@ const zapController: AppController = async (c) => {
|
||||
|
||||
const zappedByController: AppController = async (c) => {
|
||||
const id = c.req.param('id');
|
||||
const params = listPaginationSchema.parse(c.req.query());
|
||||
const store = await Storages.db();
|
||||
const amountSchema = z.coerce.number().int().nonnegative().catch(0);
|
||||
const db = await DittoDB.getInstance();
|
||||
|
||||
const events = (await store.query([{ kinds: [9735], '#e': [id], limit: 100 }])).map((event) => {
|
||||
const zapRequestString = event.tags.find(([name]) => name === 'description')?.[1];
|
||||
if (!zapRequestString) return;
|
||||
try {
|
||||
const zapRequest = n.json().pipe(n.event()).parse(zapRequestString);
|
||||
const amount = zapRequest?.tags.find(([name]: any) => name === 'amount')?.[1];
|
||||
if (!amount) {
|
||||
const amount = getAmount(event?.tags.find(([name]) => name === 'bolt11')?.[1]);
|
||||
if (!amount) return;
|
||||
zapRequest.tags.push(['amount', amount]);
|
||||
}
|
||||
return zapRequest;
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
}).filter(Boolean) as DittoEvent[];
|
||||
const zaps = await db.selectFrom('event_zaps')
|
||||
.selectAll()
|
||||
.where('target_event_id', '=', id)
|
||||
.orderBy('amount_millisats', 'desc')
|
||||
.limit(params.limit)
|
||||
.offset(params.offset).execute();
|
||||
|
||||
await hydrateEvents({ events, store });
|
||||
const authors = await store.query([{ kinds: [0], authors: zaps.map((zap) => zap.sender_pubkey) }]);
|
||||
|
||||
const results = (await Promise.all(
|
||||
events.map(async (event) => {
|
||||
const amount = amountSchema.parse(event.tags.find(([name]) => name === 'amount')?.[1]);
|
||||
const comment = event?.content ?? '';
|
||||
const account = event?.author ? await renderAccount(event.author) : await accountFromPubkey(event.pubkey);
|
||||
zaps.map(async (zap) => {
|
||||
const amount = zap.amount_millisats;
|
||||
const comment = zap.comment;
|
||||
|
||||
const sender = authors.find((author) => author.pubkey === zap.sender_pubkey);
|
||||
const account = sender ? await renderAccount(sender) : await accountFromPubkey(zap.sender_pubkey);
|
||||
|
||||
return {
|
||||
comment,
|
||||
@@ -581,7 +581,7 @@ const zappedByController: AppController = async (c) => {
|
||||
}),
|
||||
)).filter(Boolean);
|
||||
|
||||
return c.json(results);
|
||||
return paginatedList(c, params, results);
|
||||
};
|
||||
|
||||
export {
|
||||
|
||||
@@ -60,9 +60,10 @@ const suggestedTimelineController: AppController = async (c) => {
|
||||
async function renderStatuses(c: AppContext, filters: NostrFilter[]) {
|
||||
const { signal } = c.req.raw;
|
||||
const store = c.get('store');
|
||||
const opts = { signal, timeout: 10_000 };
|
||||
|
||||
const events = await store
|
||||
.query(filters, { signal })
|
||||
.query(filters, opts)
|
||||
.then((events) => hydrateEvents({ events, store, signal }));
|
||||
|
||||
if (!events.length) {
|
||||
|
||||
11
src/controllers/error.ts
Normal file
11
src/controllers/error.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { ErrorHandler } from '@hono/hono';
|
||||
|
||||
export const errorHandler: ErrorHandler = (err, c) => {
|
||||
console.error(err);
|
||||
|
||||
if (err.message === 'canceling statement due to statement timeout') {
|
||||
return c.json({ error: 'The server was unable to respond in a timely manner' }, 500);
|
||||
}
|
||||
|
||||
return c.json({ error: 'Something went wrong' }, 500);
|
||||
};
|
||||
@@ -73,11 +73,15 @@ function connectStream(socket: WebSocket) {
|
||||
const pubsub = await Storages.pubsub();
|
||||
|
||||
try {
|
||||
for (const event of await store.query(filters, { limit: FILTER_LIMIT })) {
|
||||
for (const event of await store.query(filters, { limit: FILTER_LIMIT, timeout: 1000 })) {
|
||||
send(['EVENT', subId, event]);
|
||||
}
|
||||
} catch (e) {
|
||||
send(['CLOSED', subId, e.message]);
|
||||
if (e instanceof RelayError) {
|
||||
send(['CLOSED', subId, e.message]);
|
||||
} else {
|
||||
send(['CLOSED', subId, 'error: something went wrong']);
|
||||
}
|
||||
controllers.delete(subId);
|
||||
return;
|
||||
}
|
||||
@@ -124,7 +128,7 @@ function connectStream(socket: WebSocket) {
|
||||
/** Handle COUNT. Return the number of events matching the filters. */
|
||||
async function handleCount([_, subId, ...filters]: NostrClientCOUNT): Promise<void> {
|
||||
const store = await Storages.db();
|
||||
const { count } = await store.count(filters);
|
||||
const { count } = await store.count(filters, { timeout: 100 });
|
||||
send(['COUNT', subId, { count, approximate: false }]);
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ export interface DittoTables {
|
||||
author_stats: AuthorStatsRow;
|
||||
event_stats: EventStatsRow;
|
||||
pubkey_domains: PubkeyDomainRow;
|
||||
event_zaps: EventZapRow;
|
||||
}
|
||||
|
||||
interface AuthorStatsRow {
|
||||
@@ -69,3 +70,11 @@ interface PubkeyDomainRow {
|
||||
domain: string;
|
||||
last_updated_at: number;
|
||||
}
|
||||
|
||||
interface EventZapRow {
|
||||
receipt_id: string;
|
||||
target_event_id: string;
|
||||
sender_pubkey: string;
|
||||
amount_millisats: number;
|
||||
comment: string;
|
||||
}
|
||||
|
||||
32
src/db/migrations/027_add_zap_events.ts
Normal file
32
src/db/migrations/027_add_zap_events.ts
Normal file
@@ -0,0 +1,32 @@
|
||||
import { Kysely } from 'kysely';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
await db.schema
|
||||
.createTable('event_zaps')
|
||||
.addColumn('receipt_id', 'text', (col) => col.primaryKey())
|
||||
.addColumn('target_event_id', 'text', (col) => col.notNull())
|
||||
.addColumn('sender_pubkey', 'text', (col) => col.notNull())
|
||||
.addColumn('amount_millisats', 'integer', (col) => col.notNull())
|
||||
.addColumn('comment', 'text', (col) => col.notNull())
|
||||
.execute();
|
||||
|
||||
await db.schema
|
||||
.createIndex('idx_event_zaps_amount_millisats')
|
||||
.on('event_zaps')
|
||||
.column('amount_millisats')
|
||||
.ifNotExists()
|
||||
.execute();
|
||||
|
||||
await db.schema
|
||||
.createIndex('idx_event_zaps_target_event_id')
|
||||
.on('event_zaps')
|
||||
.column('target_event_id')
|
||||
.ifNotExists()
|
||||
.execute();
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
await db.schema.dropIndex('idx_event_zaps_amount_millisats').ifExists().execute();
|
||||
await db.schema.dropIndex('idx_event_zaps_target_event_id').ifExists().execute();
|
||||
await db.schema.dropTable('event_zaps').execute();
|
||||
}
|
||||
@@ -4,7 +4,7 @@ import { firehoseEventCounter } from '@/metrics.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { nostrNow } from '@/utils.ts';
|
||||
|
||||
import * as pipeline from './pipeline.ts';
|
||||
import * as pipeline from '@/pipeline.ts';
|
||||
|
||||
const console = new Stickynotes('ditto:firehose');
|
||||
|
||||
|
||||
@@ -6,6 +6,12 @@ export const httpRequestCounter = new Counter({
|
||||
labelNames: ['method'],
|
||||
});
|
||||
|
||||
export const httpResponseCounter = new Counter({
|
||||
name: 'http_responses_total',
|
||||
help: 'Total number of HTTP responses',
|
||||
labelNames: ['status', 'path'],
|
||||
});
|
||||
|
||||
export const streamingConnectionsGauge = new Gauge({
|
||||
name: 'streaming_connections',
|
||||
help: 'Number of active connections to the streaming API',
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
import { MiddlewareHandler } from '@hono/hono';
|
||||
|
||||
import { httpRequestCounter } from '@/metrics.ts';
|
||||
import { httpRequestCounter, httpResponseCounter } from '@/metrics.ts';
|
||||
|
||||
export const metricsMiddleware: MiddlewareHandler = async (c, next) => {
|
||||
const { method } = c.req;
|
||||
httpRequestCounter.inc({ method });
|
||||
|
||||
await next();
|
||||
|
||||
const { status } = c.res;
|
||||
const path = c.req.matchedRoutes.find((r) => r.method !== 'ALL')?.path ?? c.req.routePath;
|
||||
httpResponseCounter.inc({ status, path });
|
||||
};
|
||||
|
||||
125
src/pipeline.test.ts
Normal file
125
src/pipeline.test.ts
Normal file
@@ -0,0 +1,125 @@
|
||||
import { assertEquals } from '@std/assert';
|
||||
import { generateSecretKey } from 'nostr-tools';
|
||||
|
||||
import { genEvent, getTestDB } from '@/test.ts';
|
||||
import { handleZaps } from '@/pipeline.ts';
|
||||
|
||||
Deno.test('store one zap receipt in nostr_events; convert it into event_zaps table format and store it', async () => {
|
||||
await using db = await getTestDB();
|
||||
const kysely = db.kysely;
|
||||
|
||||
const sk = generateSecretKey();
|
||||
|
||||
const event = genEvent({
|
||||
'id': '67b48a14fb66c60c8f9070bdeb37afdfcc3d08ad01989460448e4081eddda446',
|
||||
'pubkey': '9630f464cca6a5147aa8a35f0bcdd3ce485324e732fd39e09233b1d848238f31',
|
||||
'created_at': 1674164545,
|
||||
'kind': 9735,
|
||||
'tags': [
|
||||
['p', '32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245'],
|
||||
['P', '97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322'],
|
||||
['e', '3624762a1274dd9636e0c552b53086d70bc88c165bc4dc0f9e836a1eaf86c3b8'],
|
||||
[
|
||||
'bolt11',
|
||||
'lnbc10u1p3unwfusp5t9r3yymhpfqculx78u027lxspgxcr2n2987mx2j55nnfs95nxnzqpp5jmrh92pfld78spqs78v9euf2385t83uvpwk9ldrlvf6ch7tpascqhp5zvkrmemgth3tufcvflmzjzfvjt023nazlhljz2n9hattj4f8jq8qxqyjw5qcqpjrzjqtc4fc44feggv7065fqe5m4ytjarg3repr5j9el35xhmtfexc42yczarjuqqfzqqqqqqqqlgqqqqqqgq9q9qxpqysgq079nkq507a5tw7xgttmj4u990j7wfggtrasah5gd4ywfr2pjcn29383tphp4t48gquelz9z78p4cq7ml3nrrphw5w6eckhjwmhezhnqpy6gyf0',
|
||||
],
|
||||
[
|
||||
'description',
|
||||
'{"pubkey":"97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322","content":"","id":"d9cc14d50fcb8c27539aacf776882942c1a11ea4472f8cdec1dea82fab66279d","created_at":1674164539,"sig":"77127f636577e9029276be060332ea565deaf89ff215a494ccff16ae3f757065e2bc59b2e8c113dd407917a010b3abd36c8d7ad84c0e3ab7dab3a0b0caa9835d","kind":9734,"tags":[["e","3624762a1274dd9636e0c552b53086d70bc88c165bc4dc0f9e836a1eaf86c3b8"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"],["relays","wss://relay.damus.io","wss://nostr-relay.wlvs.space","wss://nostr.fmt.wiz.biz","wss://relay.nostr.bg","wss://nostr.oxtr.dev","wss://nostr.v0l.io","wss://brb.io","wss://nostr.bitcoiner.social","ws://monad.jb55.com:8080","wss://relay.snort.social"]]}',
|
||||
],
|
||||
['preimage', '5d006d2cf1e73c7148e7519a4c68adc81642ce0e25a432b2434c99f97344c15f'],
|
||||
],
|
||||
'content': '',
|
||||
}, sk);
|
||||
|
||||
await db.store.event(event);
|
||||
|
||||
await handleZaps(kysely, event);
|
||||
await handleZaps(kysely, event);
|
||||
|
||||
const zapReceipts = await kysely.selectFrom('nostr_events').selectAll().execute();
|
||||
const customEventZaps = await kysely.selectFrom('event_zaps').selectAll().execute();
|
||||
|
||||
assertEquals(zapReceipts.length, 1); // basic check
|
||||
assertEquals(customEventZaps.length, 1); // basic check
|
||||
|
||||
const expected = {
|
||||
receipt_id: event.id,
|
||||
target_event_id: '3624762a1274dd9636e0c552b53086d70bc88c165bc4dc0f9e836a1eaf86c3b8',
|
||||
sender_pubkey: '97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322',
|
||||
amount_millisats: 1000000,
|
||||
comment: '',
|
||||
};
|
||||
|
||||
assertEquals(customEventZaps[0], expected);
|
||||
});
|
||||
|
||||
// The function tests below only handle the edge cases and don't assert anything
|
||||
// If no error happens = ok
|
||||
|
||||
Deno.test('zap receipt does not have a "description" tag', async () => {
|
||||
await using db = await getTestDB();
|
||||
const kysely = db.kysely;
|
||||
|
||||
const sk = generateSecretKey();
|
||||
|
||||
const event = genEvent({ kind: 9735 }, sk);
|
||||
|
||||
await handleZaps(kysely, event);
|
||||
|
||||
// no error happened = ok
|
||||
});
|
||||
|
||||
Deno.test('zap receipt does not have a zap request stringified value in the "description" tag', async () => {
|
||||
await using db = await getTestDB();
|
||||
const kysely = db.kysely;
|
||||
|
||||
const sk = generateSecretKey();
|
||||
|
||||
const event = genEvent({ kind: 9735, tags: [['description', 'yolo']] }, sk);
|
||||
|
||||
await handleZaps(kysely, event);
|
||||
|
||||
// no error happened = ok
|
||||
});
|
||||
|
||||
Deno.test('zap receipt does not have a "bolt11" tag', async () => {
|
||||
await using db = await getTestDB();
|
||||
const kysely = db.kysely;
|
||||
|
||||
const sk = generateSecretKey();
|
||||
|
||||
const event = genEvent({
|
||||
kind: 9735,
|
||||
tags: [[
|
||||
'description',
|
||||
'{"pubkey":"97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322","content":"","id":"d9cc14d50fcb8c27539aacf776882942c1a11ea4472f8cdec1dea82fab66279d","created_at":1674164539,"sig":"77127f636577e9029276be060332ea565deaf89ff215a494ccff16ae3f757065e2bc59b2e8c113dd407917a010b3abd36c8d7ad84c0e3ab7dab3a0b0caa9835d","kind":9734,"tags":[["e","3624762a1274dd9636e0c552b53086d70bc88c165bc4dc0f9e836a1eaf86c3b8"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"],["relays","wss://relay.damus.io","wss://nostr-relay.wlvs.space","wss://nostr.fmt.wiz.biz","wss://relay.nostr.bg","wss://nostr.oxtr.dev","wss://nostr.v0l.io","wss://brb.io","wss://nostr.bitcoiner.social","ws://monad.jb55.com:8080","wss://relay.snort.social"]]}',
|
||||
]],
|
||||
}, sk);
|
||||
|
||||
await handleZaps(kysely, event);
|
||||
|
||||
// no error happened = ok
|
||||
});
|
||||
|
||||
Deno.test('zap request inside zap receipt does not have an "e" tag', async () => {
|
||||
await using db = await getTestDB();
|
||||
const kysely = db.kysely;
|
||||
|
||||
const sk = generateSecretKey();
|
||||
|
||||
const event = genEvent({
|
||||
kind: 9735,
|
||||
tags: [[
|
||||
'bolt11',
|
||||
'lnbc10u1p3unwfusp5t9r3yymhpfqculx78u027lxspgxcr2n2987mx2j55nnfs95nxnzqpp5jmrh92pfld78spqs78v9euf2385t83uvpwk9ldrlvf6ch7tpascqhp5zvkrmemgth3tufcvflmzjzfvjt023nazlhljz2n9hattj4f8jq8qxqyjw5qcqpjrzjqtc4fc44feggv7065fqe5m4ytjarg3repr5j9el35xhmtfexc42yczarjuqqfzqqqqqqqqlgqqqqqqgq9q9qxpqysgq079nkq507a5tw7xgttmj4u990j7wfggtrasah5gd4ywfr2pjcn29383tphp4t48gquelz9z78p4cq7ml3nrrphw5w6eckhjwmhezhnqpy6gyf0',
|
||||
], [
|
||||
'description',
|
||||
'{"pubkey":"97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322","content":"","id":"d9cc14d50fcb8c27539aacf776882942c1a11ea4472f8cdec1dea82fab66279d","created_at":1674164539,"sig":"77127f636577e9029276be060332ea565deaf89ff215a494ccff16ae3f757065e2bc59b2e8c113dd407917a010b3abd36c8d7ad84c0e3ab7dab3a0b0caa9835d","kind":9734,"tags":[["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"],["relays","wss://relay.damus.io","wss://nostr-relay.wlvs.space","wss://nostr.fmt.wiz.biz","wss://relay.nostr.bg","wss://nostr.oxtr.dev","wss://nostr.v0l.io","wss://brb.io","wss://nostr.bitcoiner.social","ws://monad.jb55.com:8080","wss://relay.snort.social"]]}',
|
||||
]],
|
||||
}, sk);
|
||||
|
||||
await handleZaps(kysely, event);
|
||||
|
||||
// no error happened = ok
|
||||
});
|
||||
@@ -1,7 +1,8 @@
|
||||
import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify';
|
||||
import Debug from '@soapbox/stickynotes/debug';
|
||||
import { sql } from 'kysely';
|
||||
import { Kysely, sql } from 'kysely';
|
||||
import { LRUCache } from 'lru-cache';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
@@ -18,6 +19,8 @@ import { verifyEventWorker } from '@/workers/verify.ts';
|
||||
import { nip05Cache } from '@/utils/nip05.ts';
|
||||
import { updateStats } from '@/utils/stats.ts';
|
||||
import { getTagSet } from '@/utils/tags.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { getAmount } from '@/utils/bolt11.ts';
|
||||
|
||||
const debug = Debug('ditto:pipeline');
|
||||
|
||||
@@ -51,8 +54,11 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
|
||||
throw new RelayError('blocked', 'user is disabled');
|
||||
}
|
||||
|
||||
const kysely = await DittoDB.getInstance();
|
||||
|
||||
await Promise.all([
|
||||
storeEvent(event, signal),
|
||||
handleZaps(kysely, event),
|
||||
parseMetadata(event, signal),
|
||||
generateSetEvents(event),
|
||||
processMedia(event),
|
||||
@@ -110,7 +116,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
|
||||
}
|
||||
|
||||
/** Maybe store the event, if eligible. */
|
||||
async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<void> {
|
||||
async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<undefined> {
|
||||
if (NKinds.ephemeral(event.kind)) return;
|
||||
const store = await Storages.db();
|
||||
const kysely = await DittoDB.getInstance();
|
||||
@@ -220,4 +226,33 @@ async function generateSetEvents(event: NostrEvent): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
export { handleEvent };
|
||||
/** Stores the event in the 'event_zaps' table */
|
||||
async function handleZaps(kysely: Kysely<DittoTables>, event: NostrEvent) {
|
||||
if (event.kind !== 9735) return;
|
||||
|
||||
const zapRequestString = event?.tags?.find(([name]) => name === 'description')?.[1];
|
||||
if (!zapRequestString) return;
|
||||
const zapRequest = n.json().pipe(n.event()).optional().catch(undefined).parse(zapRequestString);
|
||||
if (!zapRequest) return;
|
||||
|
||||
const amountSchema = z.coerce.number().int().nonnegative().catch(0);
|
||||
const amount_millisats = amountSchema.parse(getAmount(event?.tags.find(([name]) => name === 'bolt11')?.[1]));
|
||||
if (!amount_millisats || amount_millisats < 1) return;
|
||||
|
||||
const zappedEventId = zapRequest.tags.find(([name]) => name === 'e')?.[1];
|
||||
if (!zappedEventId) return;
|
||||
|
||||
try {
|
||||
await kysely.insertInto('event_zaps').values({
|
||||
receipt_id: event.id,
|
||||
target_event_id: zappedEventId,
|
||||
sender_pubkey: zapRequest.pubkey,
|
||||
amount_millisats,
|
||||
comment: zapRequest.content,
|
||||
}).execute();
|
||||
} catch {
|
||||
// receipt_id is unique, do nothing
|
||||
}
|
||||
}
|
||||
|
||||
export { handleEvent, handleZaps };
|
||||
|
||||
@@ -191,3 +191,33 @@ Deno.test('inserting replaceable events', async () => {
|
||||
await eventsDB.event(newerEvent);
|
||||
assertEquals(await eventsDB.query([{ kinds: [0] }]), [newerEvent]);
|
||||
});
|
||||
|
||||
Deno.test("throws a RelayError when querying an event with a large 'since'", async () => {
|
||||
const { eventsDB } = await createDB();
|
||||
|
||||
await assertRejects(
|
||||
() => eventsDB.query([{ since: 33333333333333 }]),
|
||||
RelayError,
|
||||
'since filter too far into the future',
|
||||
);
|
||||
});
|
||||
|
||||
Deno.test("throws a RelayError when querying an event with a large 'until'", async () => {
|
||||
const { eventsDB } = await createDB();
|
||||
|
||||
await assertRejects(
|
||||
() => eventsDB.query([{ until: 66666666666666 }]),
|
||||
RelayError,
|
||||
'until filter too far into the future',
|
||||
);
|
||||
});
|
||||
|
||||
Deno.test("throws a RelayError when querying an event with a large 'kind'", async () => {
|
||||
const { eventsDB } = await createDB();
|
||||
|
||||
await assertRejects(
|
||||
() => eventsDB.query([{ kinds: [99999999999999] }]),
|
||||
RelayError,
|
||||
'kind filter too far into the future',
|
||||
);
|
||||
});
|
||||
|
||||
@@ -45,13 +45,14 @@ class EventsDB implements NStore {
|
||||
constructor(private kysely: Kysely<DittoTables>) {
|
||||
this.store = new NDatabase(kysely, {
|
||||
fts: Conf.db.dialect,
|
||||
timeoutStrategy: Conf.db.dialect === 'postgres' ? 'setStatementTimeout' : undefined,
|
||||
indexTags: EventsDB.indexTags,
|
||||
searchText: EventsDB.searchText,
|
||||
});
|
||||
}
|
||||
|
||||
/** Insert an event (and its tags) into the database. */
|
||||
async event(event: NostrEvent, _opts?: { signal?: AbortSignal }): Promise<void> {
|
||||
async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
|
||||
event = purifyEvent(event);
|
||||
this.console.debug('EVENT', JSON.stringify(event));
|
||||
dbEventCounter.inc({ kind: event.kind });
|
||||
@@ -63,7 +64,7 @@ class EventsDB implements NStore {
|
||||
await this.deleteEventsAdmin(event);
|
||||
|
||||
try {
|
||||
await this.store.event(event);
|
||||
await this.store.event(event, { ...opts, timeout: opts.timeout ?? 1000 });
|
||||
} catch (e) {
|
||||
if (e.message === 'Cannot add a deleted event') {
|
||||
throw new RelayError('blocked', 'event deleted by user');
|
||||
@@ -137,20 +138,23 @@ class EventsDB implements NStore {
|
||||
}
|
||||
|
||||
/** Get events for filters from the database. */
|
||||
async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<NostrEvent[]> {
|
||||
async query(
|
||||
filters: NostrFilter[],
|
||||
opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {},
|
||||
): Promise<NostrEvent[]> {
|
||||
filters = await this.expandFilters(filters);
|
||||
dbQueryCounter.inc();
|
||||
|
||||
for (const filter of filters) {
|
||||
if (filter.since && filter.since >= 2_147_483_647) {
|
||||
throw new Error('since filter too far into the future');
|
||||
throw new RelayError('invalid', 'since filter too far into the future');
|
||||
}
|
||||
if (filter.until && filter.until >= 2_147_483_647) {
|
||||
throw new Error('until filter too far into the future');
|
||||
throw new RelayError('invalid', 'until filter too far into the future');
|
||||
}
|
||||
for (const kind of filter.kinds ?? []) {
|
||||
if (kind >= 2_147_483_647) {
|
||||
throw new Error('kind filter too far into the future');
|
||||
throw new RelayError('invalid', 'kind filter too far into the future');
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -160,28 +164,28 @@ class EventsDB implements NStore {
|
||||
|
||||
this.console.debug('REQ', JSON.stringify(filters));
|
||||
|
||||
return this.store.query(filters, opts);
|
||||
return this.store.query(filters, { ...opts, timeout: opts.timeout ?? 1000 });
|
||||
}
|
||||
|
||||
/** Delete events based on filters from the database. */
|
||||
async remove(filters: NostrFilter[], _opts?: { signal?: AbortSignal }): Promise<void> {
|
||||
async remove(filters: NostrFilter[], opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
|
||||
if (!filters.length) return Promise.resolve();
|
||||
this.console.debug('DELETE', JSON.stringify(filters));
|
||||
|
||||
return this.store.remove(filters);
|
||||
return this.store.remove(filters, { ...opts, timeout: opts.timeout ?? 3000 });
|
||||
}
|
||||
|
||||
/** Get number of events that would be returned by filters. */
|
||||
async count(
|
||||
filters: NostrFilter[],
|
||||
opts: { signal?: AbortSignal } = {},
|
||||
opts: { signal?: AbortSignal; timeout?: number } = {},
|
||||
): Promise<{ count: number; approximate: boolean }> {
|
||||
if (opts.signal?.aborted) return Promise.reject(abortError());
|
||||
if (!filters.length) return Promise.resolve({ count: 0, approximate: false });
|
||||
|
||||
this.console.debug('COUNT', JSON.stringify(filters));
|
||||
|
||||
return this.store.count(filters);
|
||||
return this.store.count(filters, { ...opts, timeout: opts.timeout ?? 500 });
|
||||
}
|
||||
|
||||
/** Return only the tags that should be indexed. */
|
||||
@@ -275,6 +279,12 @@ class EventsDB implements NStore {
|
||||
|
||||
filter.search = tokens.filter((t) => typeof t === 'string').join(' ');
|
||||
}
|
||||
|
||||
if (filter.kinds) {
|
||||
// Ephemeral events are not stored, so don't bother querying for them.
|
||||
// If this results in an empty kinds array, NDatabase will remove the filter before querying and return no results.
|
||||
filter.kinds = filter.kinds.filter((kind) => !NKinds.ephemeral(kind));
|
||||
}
|
||||
}
|
||||
|
||||
return filters;
|
||||
|
||||
0
src/utils/scavenger.test.ts
Normal file
0
src/utils/scavenger.test.ts
Normal file
Reference in New Issue
Block a user