Merge branch 'main' into zap-notification-streaming

This commit is contained in:
P. Reis
2024-09-16 09:18:40 -03:00
15 changed files with 350 additions and 75 deletions

View File

@@ -6,6 +6,7 @@ import { type AppController } from '@/app.ts';
import { Conf } from '@/config.ts';
import { getAuthor, getFollowedPubkeys } from '@/queries.ts';
import { booleanParamSchema, fileSchema } from '@/schema.ts';
import { getPubkeysBySearch } from '@/controllers/api/search.ts';
import { Storages } from '@/storages.ts';
import { uploadFile } from '@/utils/upload.ts';
import { nostrNow } from '@/utils.ts';
@@ -115,6 +116,7 @@ const accountSearchQuerySchema = z.object({
const accountSearchController: AppController = async (c) => {
const { signal } = c.req.raw;
const { limit } = c.get('pagination');
const kysely = await Storages.kysely();
const result = accountSearchQuerySchema.safeParse(c.req.query());
@@ -133,8 +135,17 @@ const accountSearchController: AppController = async (c) => {
return c.json(pubkey ? [await accountFromPubkey(pubkey)] : []);
}
const events = event ? [event] : await store.query([{ kinds: [0], search: query, limit }], { signal });
const pubkeys = await getPubkeysBySearch(kysely, { q: query, limit });
let events = event ? [event] : await store.query([{ kinds: [0], authors: pubkeys, limit }], {
signal,
});
if (!event) {
events = pubkeys
.map((pubkey) => events.find((event) => event.pubkey === pubkey))
.filter((event) => !!event);
}
const accounts = await hydrateEvents({ events, store, signal }).then(
(events) =>
Promise.all(

View File

@@ -0,0 +1,21 @@
import { assertEquals } from '@std/assert';
import { createTestDB } from '@/test.ts';
import { getPubkeysBySearch } from '@/controllers/api/search.ts';
Deno.test('fuzzy search works', async () => {
await using db = await createTestDB();
await db.kysely.insertInto('author_search').values({
pubkey: '47259076c85f9240e852420d7213c95e95102f1de929fb60f33a2c32570c98c4',
search: 'patrickReiis patrickdosreis.com',
}).execute();
assertEquals(await getPubkeysBySearch(db.kysely, { q: 'pat rick', limit: 1 }), []);
assertEquals(await getPubkeysBySearch(db.kysely, { q: 'patrick dos reis', limit: 1 }), [
'47259076c85f9240e852420d7213c95e95102f1de929fb60f33a2c32570c98c4',
]);
assertEquals(await getPubkeysBySearch(db.kysely, { q: 'dosreis.com', limit: 1 }), [
'47259076c85f9240e852420d7213c95e95102f1de929fb60f33a2c32570c98c4',
]);
});

View File

@@ -1,8 +1,10 @@
import { NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify';
import { nip19 } from 'nostr-tools';
import { Kysely, sql } from 'kysely';
import { z } from 'zod';
import { AppController } from '@/app.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { booleanParamSchema } from '@/schema.ts';
import { Storages } from '@/storages.ts';
import { hydrateEvents } from '@/storages/hydrate.ts';
@@ -47,9 +49,8 @@ const searchController: AppController = async (c) => {
if (event) {
events = [event];
} else {
events = await searchEvents(result.data, signal);
}
events.push(...(await searchEvents(result.data, signal)));
const viewerPubkey = await c.get('signer')?.getPublicKey();
@@ -89,10 +90,33 @@ async function searchEvents({ q, type, limit, account_id }: SearchQuery, signal:
filter.authors = [account_id];
}
const pubkeys: string[] = [];
if (type === 'accounts') {
const kysely = await Storages.kysely();
pubkeys.push(...(await getPubkeysBySearch(kysely, { q, limit })));
if (!filter?.authors) {
filter.authors = pubkeys;
} else {
filter.authors.push(...pubkeys);
}
filter.search = undefined;
}
const store = await Storages.search();
return store.query([filter], { signal })
let events = await store.query([filter], { signal })
.then((events) => hydrateEvents({ events, store, signal }));
if (type !== 'accounts') return events;
events = pubkeys
.map((pubkey) => events.find((event) => event.pubkey === pubkey))
.filter((event) => !!event);
return events;
}
/** Get event kinds to search from `type` query param. */
@@ -170,4 +194,16 @@ async function getLookupFilters({ q, type, resolve }: SearchQuery, signal: Abort
return [];
}
export { searchController };
/** Get pubkeys whose name and NIP-05 is similar to 'q' */
async function getPubkeysBySearch(kysely: Kysely<DittoTables>, { q, limit }: Pick<SearchQuery, 'q' | 'limit'>) {
const pubkeys = (await sql<{ pubkey: string }>`
SELECT *, word_similarity(${q}, search) AS sml
FROM author_search
WHERE ${q} % search
ORDER BY sml DESC, search LIMIT ${limit}
`.execute(kysely)).rows.map(({ pubkey }) => pubkey);
return pubkeys;
}
export { getPubkeysBySearch, searchController };

View File

@@ -1,13 +1,21 @@
import { Nullable } from 'kysely';
import { NPostgresSchema } from '@nostrify/db';
export interface DittoTables extends NPostgresSchema {
nostr_events: NostrEventsRow;
nip46_tokens: NIP46TokenRow;
author_stats: AuthorStatsRow;
event_stats: EventStatsRow;
pubkey_domains: PubkeyDomainRow;
event_zaps: EventZapRow;
author_search: AuthorSearch;
}
type NostrEventsRow = NPostgresSchema['nostr_events'] & {
language: Nullable<string>;
};
interface AuthorStatsRow {
pubkey: string;
followers_count: number;
@@ -47,3 +55,8 @@ interface EventZapRow {
amount_millisats: number;
comment: string;
}
interface AuthorSearch {
pubkey: string;
search: string;
}

View File

@@ -1,4 +1,5 @@
import { PGlite } from '@electric-sql/pglite';
import { pg_trgm } from '@electric-sql/pglite/contrib/pg_trgm';
import { PgliteDialect } from '@soapbox/kysely-pglite';
import { Kysely } from 'kysely';
@@ -10,7 +11,7 @@ export class DittoPglite {
static create(databaseUrl: string): DittoDatabase {
const kysely = new Kysely<DittoTables>({
dialect: new PgliteDialect({
database: new PGlite(databaseUrl),
database: new PGlite(databaseUrl, { extensions: { pg_trgm } }),
}),
log: KyselyLogger,
});

View File

@@ -0,0 +1,18 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createTable('author_search')
.addColumn('pubkey', 'char(64)', (col) => col.primaryKey())
.addColumn('search', 'text', (col) => col.notNull())
.ifNotExists()
.execute();
await sql`CREATE EXTENSION IF NOT EXISTS pg_trgm`.execute(db);
await sql`CREATE INDEX author_search_search_idx ON author_search USING GIN (search gin_trgm_ops)`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropIndex('author_search_search_idx').ifExists().execute();
await db.schema.dropTable('author_search').execute();
}

View File

@@ -0,0 +1,15 @@
import { Kysely } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema.alterTable('nostr_events').addColumn('language', 'char(2)').execute();
await db.schema.createIndex('nostr_events_language_created_idx')
.on('nostr_events')
.columns(['language', 'created_at desc', 'id asc', 'kind'])
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.alterTable('nostr_events').dropColumn('language').execute();
await db.schema.dropIndex('nostr_events_language_created_idx').execute();
}

View File

@@ -1,6 +1,8 @@
import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify';
import Debug from '@soapbox/stickynotes/debug';
import ISO6391 from 'iso-639-1';
import { Kysely, sql } from 'kysely';
import lande from 'lande';
import { LRUCache } from 'lru-cache';
import { z } from 'zod';
@@ -55,10 +57,11 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
const kysely = await Storages.kysely();
await storeEvent(purifyEvent(event), signal);
await Promise.all([
storeEvent(purifyEvent(event), signal),
handleZaps(kysely, event),
parseMetadata(event, signal),
setLanguage(event),
generateSetEvents(event),
streamOut(event),
]);
@@ -133,34 +136,65 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
const metadata = n.json().pipe(n.metadata()).catch({}).safeParse(event.content);
if (!metadata.success) return;
const kysely = await Storages.kysely();
// Get nip05.
const { nip05 } = metadata.data;
if (!nip05) return;
const { name, nip05 } = metadata.data;
const result = nip05 ? await nip05Cache.fetch(nip05, { signal }).catch(() => undefined) : undefined;
// Fetch nip05.
const result = await nip05Cache.fetch(nip05, { signal }).catch(() => undefined);
if (!result) return;
// Ensure pubkey matches event.
const { pubkey } = result;
if (pubkey !== event.pubkey) return;
// Track pubkey domain.
// Populate author_search.
try {
const kysely = await Storages.kysely();
const { domain } = parseNip05(nip05);
const search = result?.pubkey === event.pubkey ? [name, nip05].filter(Boolean).join(' ').trim() : name ?? '';
await sql`
INSERT INTO pubkey_domains (pubkey, domain, last_updated_at)
VALUES (${pubkey}, ${domain}, ${event.created_at})
ON CONFLICT(pubkey) DO UPDATE SET
domain = excluded.domain,
last_updated_at = excluded.last_updated_at
WHERE excluded.last_updated_at > pubkey_domains.last_updated_at
`.execute(kysely);
} catch (_e) {
if (search) {
await kysely.insertInto('author_search')
.values({ pubkey: event.pubkey, search })
.onConflict((oc) => oc.column('pubkey').doUpdateSet({ search }))
.execute();
}
} catch {
// do nothing
}
if (nip05 && result && result.pubkey === event.pubkey) {
// Track pubkey domain.
try {
const { domain } = parseNip05(nip05);
await sql`
INSERT INTO pubkey_domains (pubkey, domain, last_updated_at)
VALUES (${event.pubkey}, ${domain}, ${event.created_at})
ON CONFLICT(pubkey) DO UPDATE SET
domain = excluded.domain,
last_updated_at = excluded.last_updated_at
WHERE excluded.last_updated_at > pubkey_domains.last_updated_at
`.execute(kysely);
} catch (_e) {
// do nothing
}
}
}
/** Update the event in the database and set its language. */
async function setLanguage(event: NostrEvent): Promise<void> {
const [topResult] = lande(event.content);
if (topResult) {
const [iso6393, confidence] = topResult;
const locale = new Intl.Locale(iso6393);
if (confidence >= 0.95 && ISO6391.validate(locale.language)) {
const kysely = await Storages.kysely();
try {
await kysely.updateTable('nostr_events')
.set('language', locale.language)
.where('id', '=', event.id)
.execute();
} catch {
// do nothing
}
}
}
}
/** Determine if the event is being received in a timely manner. */

View File

@@ -54,6 +54,23 @@ Deno.test('query events with domain search filter', async () => {
assertEquals(await store.query([{ kinds: [1], search: 'domain:example.com' }]), []);
});
Deno.test('query events with language search filter', async () => {
await using db = await createTestDB();
const { store, kysely } = db;
const en = genEvent({ kind: 1, content: 'hello world!' });
const es = genEvent({ kind: 1, content: 'hola mundo!' });
await store.event(en);
await store.event(es);
await kysely.updateTable('nostr_events').set('language', 'en').where('id', '=', en.id).execute();
await kysely.updateTable('nostr_events').set('language', 'es').where('id', '=', es.id).execute();
assertEquals(await store.query([{ search: 'language:en' }]), [en]);
assertEquals(await store.query([{ search: 'language:es' }]), [es]);
});
Deno.test('delete events', async () => {
await using db = await createTestDB();
const { store } = db;

View File

@@ -3,7 +3,7 @@
import { NPostgres, NPostgresSchema } from '@nostrify/db';
import { NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify';
import { Stickynotes } from '@soapbox/stickynotes';
import { Kysely } from 'kysely';
import { Kysely, SelectQueryBuilder } from 'kysely';
import { nip27 } from 'nostr-tools';
import { DittoTables } from '@/db/DittoTables.ts';
@@ -145,8 +145,36 @@ class EventsDB extends NPostgres {
}
protected getFilterQuery(trx: Kysely<NPostgresSchema>, filter: NostrFilter) {
const query = super.getFilterQuery(trx, filter);
return query;
if (filter.search) {
const tokens = NIP50.parseInput(filter.search);
let query = super.getFilterQuery(trx, {
...filter,
search: tokens.filter((t) => typeof t === 'string').join(' '),
}) as SelectQueryBuilder<DittoTables, 'nostr_events', Pick<DittoTables['nostr_events'], keyof NostrEvent>>;
const data = tokens.filter((t) => typeof t === 'object').reduce(
(acc, t) => acc.set(t.key, t.value),
new Map<string, string>(),
);
const domain = data.get('domain');
const language = data.get('language');
if (domain) {
query = query
.innerJoin('pubkey_domains', 'nostr_events.pubkey', 'pubkey_domains.pubkey')
.where('pubkey_domains.domain', '=', domain);
}
if (language) {
query = query.where('language', '=', language);
}
return query;
}
return super.getFilterQuery(trx, filter);
}
/** Get events for filters from the database. */
@@ -260,35 +288,6 @@ class EventsDB extends NPostgres {
filters = structuredClone(filters);
for (const filter of filters) {
if (filter.search) {
const tokens = NIP50.parseInput(filter.search);
const domain = (tokens.find((t) =>
typeof t === 'object' && t.key === 'domain'
) as { key: 'domain'; value: string } | undefined)?.value;
if (domain) {
const query = this.opts.kysely
.selectFrom('pubkey_domains')
.select('pubkey')
.where('domain', '=', domain);
if (filter.authors) {
query.where('pubkey', 'in', filter.authors);
}
const pubkeys = await query
.execute()
.then((rows) =>
rows.map((row) => row.pubkey)
);
filter.authors = pubkeys;
}
filter.search = tokens.filter((t) => typeof t === 'string').join(' ');
}
if (filter.kinds) {
// Ephemeral events are not stored, so don't bother querying for them.
// If this results in an empty kinds array, NDatabase will remove the filter before querying and return no results.

View File

@@ -63,6 +63,7 @@ export async function createTestDB() {
'pubkey_domains',
'nostr_events',
'event_zaps',
'author_search',
]
) {
await kysely.schema.dropTable(table).ifExists().cascade().execute();