mirror of
https://github.com/aljazceru/ditto.git
synced 2025-12-21 23:44:20 +01:00
Remove UserData type, hydrate the event instead
This commit is contained in:
@@ -2,7 +2,6 @@ import { Conf } from '@/config.ts';
|
||||
import { encryptAdmin } from '@/crypto.ts';
|
||||
import { addRelays } from '@/db/relays.ts';
|
||||
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
|
||||
import { findUser } from '@/db/users.ts';
|
||||
import { Debug, type Event, LNURL } from '@/deps.ts';
|
||||
import { isEphemeralKind } from '@/kinds.ts';
|
||||
import { isLocallyFollowed } from '@/queries.ts';
|
||||
@@ -10,13 +9,13 @@ import { updateStats } from '@/stats.ts';
|
||||
import { client, eventsDB, memorelay, reqmeister } from '@/storages.ts';
|
||||
import { Sub } from '@/subs.ts';
|
||||
import { getTagSet } from '@/tags.ts';
|
||||
import { type EventData } from '@/types.ts';
|
||||
import { eventAge, isRelay, nostrDate, nostrNow, Time } from '@/utils.ts';
|
||||
import { fetchWorker } from '@/workers/fetch.ts';
|
||||
import { TrendsWorker } from '@/workers/trends.ts';
|
||||
import { verifySignatureWorker } from '@/workers/verify.ts';
|
||||
import { signAdminEvent } from '@/sign.ts';
|
||||
import { lnurlCache } from '@/utils/lnurl.ts';
|
||||
import { DittoEvent } from '@/storages/types.ts';
|
||||
|
||||
const debug = Debug('ditto:pipeline');
|
||||
|
||||
@@ -24,24 +23,24 @@ const debug = Debug('ditto:pipeline');
|
||||
* Common pipeline function to process (and maybe store) events.
|
||||
* It is idempotent, so it can be called multiple times for the same event.
|
||||
*/
|
||||
async function handleEvent(event: Event): Promise<void> {
|
||||
async function handleEvent(event: DittoEvent): Promise<void> {
|
||||
const signal = AbortSignal.timeout(5000);
|
||||
if (!(await verifySignatureWorker(event))) return;
|
||||
const wanted = reqmeister.isWanted(event);
|
||||
if (await encounterEvent(event)) return;
|
||||
debug(`Event<${event.kind}> ${event.id}`);
|
||||
const data = await getEventData(event);
|
||||
await hydrateEvent(event);
|
||||
|
||||
await Promise.all([
|
||||
storeEvent(event, data, { force: wanted }),
|
||||
storeEvent(event, { force: wanted }),
|
||||
processDeletions(event),
|
||||
trackRelays(event),
|
||||
trackHashtags(event),
|
||||
fetchRelatedEvents(event, data, signal),
|
||||
processMedia(event, data),
|
||||
payZap(event, data, signal),
|
||||
streamOut(event, data),
|
||||
broadcast(event, data),
|
||||
fetchRelatedEvents(event, signal),
|
||||
processMedia(event),
|
||||
payZap(event, signal),
|
||||
streamOut(event),
|
||||
broadcast(event),
|
||||
]);
|
||||
}
|
||||
|
||||
@@ -53,10 +52,10 @@ async function encounterEvent(event: Event): Promise<boolean> {
|
||||
return preexisting;
|
||||
}
|
||||
|
||||
/** Preload data that will be useful to several tasks. */
|
||||
async function getEventData({ pubkey }: Event): Promise<EventData> {
|
||||
const user = await findUser({ pubkey });
|
||||
return { user };
|
||||
/** Hydrate the event with the user, if applicable. */
|
||||
async function hydrateEvent(event: DittoEvent): Promise<void> {
|
||||
const [user] = await eventsDB.filter([{ kinds: [30361], authors: [Conf.pubkey], limit: 1 }]);
|
||||
event.user = user;
|
||||
}
|
||||
|
||||
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */
|
||||
@@ -67,11 +66,11 @@ interface StoreEventOpts {
|
||||
}
|
||||
|
||||
/** Maybe store the event, if eligible. */
|
||||
async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {
|
||||
async function storeEvent(event: DittoEvent, opts: StoreEventOpts = {}): Promise<void> {
|
||||
if (isEphemeralKind(event.kind)) return;
|
||||
const { force = false } = opts;
|
||||
|
||||
if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
|
||||
if (force || event.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
|
||||
const isDeleted = await eventsDB.count(
|
||||
[{ kinds: [5], authors: [Conf.pubkey, event.pubkey], '#e': [event.id], limit: 1 }],
|
||||
) > 0;
|
||||
@@ -80,7 +79,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
|
||||
return Promise.reject(new RelayError('blocked', 'event was deleted'));
|
||||
} else {
|
||||
await Promise.all([
|
||||
eventsDB.add(event, { data }).catch(debug),
|
||||
eventsDB.add(event).catch(debug),
|
||||
updateStats(event).catch(debug),
|
||||
]);
|
||||
}
|
||||
@@ -144,8 +143,8 @@ function trackRelays(event: Event) {
|
||||
}
|
||||
|
||||
/** Queue related events to fetch. */
|
||||
function fetchRelatedEvents(event: Event, data: EventData, signal: AbortSignal) {
|
||||
if (!data.user) {
|
||||
function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) {
|
||||
if (!event.user) {
|
||||
reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {});
|
||||
}
|
||||
for (const [name, id, relay] of event.tags) {
|
||||
@@ -156,7 +155,7 @@ function fetchRelatedEvents(event: Event, data: EventData, signal: AbortSignal)
|
||||
}
|
||||
|
||||
/** Delete unattached media entries that are attached to the event. */
|
||||
function processMedia({ tags, pubkey }: Event, { user }: EventData) {
|
||||
function processMedia({ tags, pubkey, user }: DittoEvent) {
|
||||
if (user) {
|
||||
const urls = getTagSet(tags, 'media');
|
||||
return deleteAttachedMedia(pubkey, [...urls]);
|
||||
@@ -164,8 +163,8 @@ function processMedia({ tags, pubkey }: Event, { user }: EventData) {
|
||||
}
|
||||
|
||||
/** Emit Nostr Wallet Connect event from zaps so users may pay. */
|
||||
async function payZap(event: Event, data: EventData, signal: AbortSignal) {
|
||||
if (event.kind !== 9734 || !data.user) return;
|
||||
async function payZap(event: DittoEvent, signal: AbortSignal) {
|
||||
if (event.kind !== 9734 || !event.user) return;
|
||||
|
||||
const lnurl = event.tags.find(([name]) => name === 'lnurl')?.[1];
|
||||
const amount = Number(event.tags.find(([name]) => name === 'amount')?.[1]);
|
||||
@@ -212,10 +211,10 @@ async function payZap(event: Event, data: EventData, signal: AbortSignal) {
|
||||
const isFresh = (event: Event): boolean => eventAge(event) < Time.seconds(10);
|
||||
|
||||
/** Distribute the event through active subscriptions. */
|
||||
function streamOut(event: Event, data: EventData) {
|
||||
function streamOut(event: Event) {
|
||||
if (!isFresh(event)) return;
|
||||
|
||||
for (const sub of Sub.matches(event, data)) {
|
||||
for (const sub of Sub.matches(event)) {
|
||||
sub.stream(event);
|
||||
}
|
||||
}
|
||||
@@ -224,8 +223,8 @@ function streamOut(event: Event, data: EventData) {
|
||||
* Publish the event to other relays.
|
||||
* This should only be done in certain circumstances, like mentioning a user or publishing deletions.
|
||||
*/
|
||||
function broadcast(event: Event, data: EventData) {
|
||||
if (!data.user || !isFresh(event)) return;
|
||||
function broadcast(event: DittoEvent) {
|
||||
if (!event.user || !isFresh(event)) return;
|
||||
|
||||
if (event.kind === 5) {
|
||||
client.add(event);
|
||||
|
||||
Reference in New Issue
Block a user