Remove SQLite support

This commit is contained in:
Alex Gleason
2024-09-11 11:08:33 -05:00
parent f76d0af16d
commit dc8d09a9da
28 changed files with 156 additions and 568 deletions

View File

@@ -1,5 +1,3 @@
import url from 'node:url';
import * as dotenv from '@std/dotenv';
import { getPublicKey, nip19 } from 'nostr-tools';
import { z } from 'zod';
@@ -89,20 +87,6 @@ class Conf {
return Deno.env.get('TEST_DATABASE_URL') ?? 'memory://';
}
static db = {
get url(): url.UrlWithStringQuery {
return url.parse(Conf.databaseUrl);
},
get dialect(): 'sqlite' | 'postgres' | undefined {
switch (Conf.db.url.protocol) {
case 'sqlite:':
return 'sqlite';
case 'pglite:':
case 'postgres:':
case 'postgresql:':
return 'postgres';
}
return undefined;
},
/** Database query timeout configurations. */
timeouts: {
/** Default query timeout when another setting isn't more specific. */
@@ -221,21 +205,6 @@ class Conf {
static get sentryDsn(): string | undefined {
return Deno.env.get('SENTRY_DSN');
}
/** SQLite settings. */
static sqlite = {
/**
* Number of bytes to use for memory-mapped IO.
* https://www.sqlite.org/pragma.html#pragma_mmap_size
*/
get mmapSize(): number {
const value = Deno.env.get('SQLITE_MMAP_SIZE');
if (value) {
return Number(value);
} else {
return 1024 * 1024 * 1024;
}
},
};
/** Postgres settings. */
static pg = {
/** Number of connections to use in the pool. */

View File

@@ -82,7 +82,7 @@ const createTokenController: AppController = async (c) => {
async function getToken(
{ pubkey, secret, relays = [] }: { pubkey: string; secret?: string; relays?: string[] },
): Promise<`token1${string}`> {
const { kysely } = await DittoDB.getInstance();
const kysely = await DittoDB.getInstance();
const token = generateToken();
const serverSeckey = generateSecretKey();

View File

@@ -578,7 +578,7 @@ const zappedByController: AppController = async (c) => {
const id = c.req.param('id');
const params = c.get('listPagination');
const store = await Storages.db();
const { kysely } = await DittoDB.getInstance();
const kysely = await DittoDB.getInstance();
const zaps = await kysely.selectFrom('event_zaps')
.selectAll()

View File

@@ -222,7 +222,7 @@ async function topicToFilter(
async function getTokenPubkey(token: string): Promise<string | undefined> {
if (token.startsWith('token1')) {
const { kysely } = await DittoDB.getInstance();
const kysely = await DittoDB.getInstance();
const { user_pubkey } = await kysely
.selectFrom('nip46_tokens')

View File

@@ -1,75 +1,66 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import { NDatabaseSchema, NPostgresSchema } from '@nostrify/db';
import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
import { Conf } from '@/config.ts';
import { DittoPglite } from '@/db/adapters/DittoPglite.ts';
import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts';
import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts';
import { DittoTables } from '@/db/DittoTables.ts';
export type DittoDatabase = {
dialect: 'sqlite';
kysely: Kysely<DittoTables> & Kysely<NDatabaseSchema>;
} | {
dialect: 'postgres';
kysely: Kysely<DittoTables> & Kysely<NPostgresSchema>;
};
export class DittoDB {
private static db: Promise<DittoDatabase> | undefined;
private static kysely: Promise<Kysely<DittoTables>> | undefined;
static getInstance(): Promise<DittoDatabase> {
if (!this.db) {
this.db = this._getInstance();
static getInstance(): Promise<Kysely<DittoTables>> {
if (!this.kysely) {
this.kysely = this._getInstance();
}
return this.db;
return this.kysely;
}
static async _getInstance(): Promise<DittoDatabase> {
const result = {} as DittoDatabase;
static async _getInstance(): Promise<Kysely<DittoTables>> {
const { protocol } = new URL(Conf.databaseUrl);
switch (Conf.db.url.protocol) {
case 'sqlite:':
result.dialect = 'sqlite';
result.kysely = await DittoSQLite.getInstance();
break;
case 'pglite:':
result.dialect = 'postgres';
result.kysely = await DittoPglite.getInstance();
let kysely: Kysely<DittoTables>;
switch (protocol) {
case 'file:':
case 'memory:':
kysely = await DittoPglite.getInstance();
break;
case 'postgres:':
case 'postgresql:':
result.dialect = 'postgres';
result.kysely = await DittoPostgres.getInstance();
kysely = await DittoPostgres.getInstance();
break;
default:
throw new Error('Unsupported database URL.');
}
await this.migrate(result.kysely);
await this.migrate(kysely);
return result;
return kysely;
}
static get poolSize(): number {
if (Conf.db.dialect === 'postgres') {
const { protocol } = new URL(Conf.databaseUrl);
if (['postgres:', 'postgresql:'].includes(protocol)) {
return DittoPostgres.poolSize;
}
return 1;
}
static get availableConnections(): number {
if (Conf.db.dialect === 'postgres') {
const { protocol } = new URL(Conf.databaseUrl);
if (['postgres:', 'postgresql:'].includes(protocol)) {
return DittoPostgres.availableConnections;
}
return 1;
}
/** Migrate the database to the latest version. */
static async migrate(kysely: DittoDatabase['kysely']) {
static async migrate(kysely: Kysely<DittoTables>) {
const migrator = new Migrator({
db: kysely,
provider: new FileMigrationProvider({

View File

@@ -1,4 +1,6 @@
export interface DittoTables {
import { NPostgresSchema } from '@nostrify/db';
export interface DittoTables extends NPostgresSchema {
nip46_tokens: NIP46TokenRow;
author_stats: AuthorStatsRow;
event_stats: EventStatsRow;

View File

@@ -1,5 +1,4 @@
import { PGlite } from '@electric-sql/pglite';
import { NPostgresSchema } from '@nostrify/db';
import { PgliteDialect } from '@soapbox/kysely-pglite';
import { Kysely } from 'kysely';
@@ -8,17 +7,17 @@ import { DittoTables } from '@/db/DittoTables.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
export class DittoPglite {
static db: Kysely<DittoTables> & Kysely<NPostgresSchema> | undefined;
static db: Kysely<DittoTables> | undefined;
// deno-lint-ignore require-await
static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NPostgresSchema>> {
static async getInstance(): Promise<Kysely<DittoTables>> {
if (!this.db) {
this.db = new Kysely({
this.db = new Kysely<DittoTables>({
dialect: new PgliteDialect({
database: new PGlite(this.path),
database: new PGlite(Conf.databaseUrl),
}),
log: KyselyLogger,
}) as Kysely<DittoTables> & Kysely<NPostgresSchema>;
}) as Kysely<DittoTables>;
}
return this.db;
@@ -31,26 +30,4 @@ export class DittoPglite {
static get availableConnections(): number {
return 1;
}
/** Get the relative or absolute path based on the `DATABASE_URL`. */
static get path(): string | undefined {
if (Conf.databaseUrl === 'pglite://:memory:') {
return undefined;
}
const { host, pathname } = Conf.db.url;
if (!pathname) return '';
// Get relative path.
if (host === '') {
return pathname;
} else if (host === '.') {
return pathname;
} else if (host) {
return host + pathname;
}
return '';
}
}

View File

@@ -1,4 +1,3 @@
import { NPostgresSchema } from '@nostrify/db';
import {
BinaryOperationNode,
FunctionNode,
@@ -18,17 +17,17 @@ import { DittoTables } from '@/db/DittoTables.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
export class DittoPostgres {
static db: Kysely<DittoTables> & Kysely<NPostgresSchema> | undefined;
static kysely: Kysely<DittoTables> | undefined;
static postgres?: postgres.Sql;
// deno-lint-ignore require-await
static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NPostgresSchema>> {
static async getInstance(): Promise<Kysely<DittoTables>> {
if (!this.postgres) {
this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize });
}
if (!this.db) {
this.db = new Kysely({
if (!this.kysely) {
this.kysely = new Kysely<DittoTables>({
dialect: {
createAdapter() {
return new PostgresAdapter();
@@ -46,10 +45,10 @@ export class DittoPostgres {
},
},
log: KyselyLogger,
}) as Kysely<DittoTables> & Kysely<NPostgresSchema>;
});
}
return this.db;
return this.kysely;
}
static get poolSize() {

View File

@@ -1,59 +0,0 @@
import { NDatabaseSchema } from '@nostrify/db';
import { PolySqliteDialect } from '@soapbox/kysely-deno-sqlite';
import { Kysely, sql } from 'kysely';
import { Conf } from '@/config.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
import SqliteWorker from '@/workers/sqlite.ts';
/**
 * SQLite adapter for Ditto's Kysely database.
 * Lazily opens a single shared connection backed by a SQLite worker,
 * applies PRAGMA tuning, and exposes it as a typed Kysely instance.
 */
export class DittoSQLite {
// Memoized Kysely instance; undefined until first `getInstance()` call.
static db: Kysely<DittoTables> & Kysely<NDatabaseSchema> | undefined;
/**
 * Get (or lazily create) the shared Kysely instance.
 * On first call, opens the worker-backed database at `this.path`
 * and applies PRAGMA settings before returning.
 */
static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NDatabaseSchema>> {
if (!this.db) {
// SQLite runs in a dedicated worker; queries are proxied to it.
const sqliteWorker = new SqliteWorker();
await sqliteWorker.open(this.path);
this.db = new Kysely({
dialect: new PolySqliteDialect({
database: sqliteWorker,
}),
log: KyselyLogger,
}) as Kysely<DittoTables> & Kysely<NDatabaseSchema>;
// Set PRAGMA values.
// NOTE(review): these run concurrently via Promise.all — presumably order
// doesn't matter for these particular PRAGMAs; confirm if adding more.
await Promise.all([
sql`PRAGMA synchronous = normal`.execute(this.db),
sql`PRAGMA temp_store = memory`.execute(this.db),
sql`PRAGMA foreign_keys = ON`.execute(this.db),
sql`PRAGMA auto_vacuum = FULL`.execute(this.db),
sql`PRAGMA journal_mode = WAL`.execute(this.db),
// mmap size is configurable via SQLITE_MMAP_SIZE (see Conf.sqlite.mmapSize).
sql.raw(`PRAGMA mmap_size = ${Conf.sqlite.mmapSize}`).execute(this.db),
]);
}
return this.db;
}
/** Get the relative or absolute path based on the `DATABASE_URL`. */
static get path() {
// Special-case the in-memory database URL.
if (Conf.databaseUrl === 'sqlite://:memory:') {
return ':memory:';
}
const { host, pathname } = Conf.db.url;
if (!pathname) return '';
// Get relative path.
// `sqlite://./file.db` parses with host '.' and `sqlite:///file.db`
// with host '' — both yield a pathname-based path; any other host is
// prepended to form the full path.
if (host === '') {
return pathname;
} else if (host === '.') {
return pathname;
} else if (host) {
return host + pathname;
}
return '';
}
}

View File

@@ -1,13 +1,8 @@
import { Kysely, sql } from 'kysely';
import { Kysely } from 'kysely';
import { Conf } from '@/config.ts';
export async function up(db: Kysely<any>): Promise<void> {
if (Conf.db.dialect === 'sqlite') {
await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db);
}
export async function up(_db: Kysely<any>): Promise<void> {
// This migration used to create an FTS table for SQLite, but SQLite support was removed.
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable('events_fts').ifExists().execute();
export async function down(_db: Kysely<any>): Promise<void> {
}

View File

@@ -1,25 +1,13 @@
import { Kysely, sql } from 'kysely';
import { Conf } from '@/config.ts';
import { Kysely } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema.alterTable('events').renameTo('nostr_events').execute();
await db.schema.alterTable('tags').renameTo('nostr_tags').execute();
await db.schema.alterTable('nostr_tags').renameColumn('tag', 'name').execute();
if (Conf.db.dialect === 'sqlite') {
await db.schema.dropTable('events_fts').execute();
await sql`CREATE VIRTUAL TABLE nostr_fts5 USING fts5(event_id, content)`.execute(db);
}
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.alterTable('nostr_events').renameTo('events').execute();
await db.schema.alterTable('nostr_tags').renameTo('tags').execute();
await db.schema.alterTable('tags').renameColumn('name', 'tag').execute();
if (Conf.db.dialect === 'sqlite') {
await db.schema.dropTable('nostr_fts5').execute();
await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db);
}
}

View File

@@ -20,7 +20,7 @@ export const signerMiddleware: AppMiddleware = async (c, next) => {
if (bech32.startsWith('token1')) {
try {
const { kysely } = await DittoDB.getInstance();
const kysely = await DittoDB.getInstance();
const { user_pubkey, server_seckey, relays } = await kysely
.selectFrom('nip46_tokens')

View File

@@ -53,7 +53,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
throw new RelayError('blocked', 'user is disabled');
}
const { kysely } = await DittoDB.getInstance();
const kysely = await DittoDB.getInstance();
await Promise.all([
storeEvent(event, signal),
@@ -104,7 +104,7 @@ async function existsInDB(event: DittoEvent): Promise<boolean> {
async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
await hydrateEvents({ events: [event], store: await Storages.db(), signal });
const { kysely } = await DittoDB.getInstance();
const kysely = await DittoDB.getInstance();
const domain = await kysely
.selectFrom('pubkey_domains')
.select('domain')
@@ -118,7 +118,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<undefined> {
if (NKinds.ephemeral(event.kind)) return;
const store = await Storages.db();
const { kysely } = await DittoDB.getInstance();
const kysely = await DittoDB.getInstance();
await updateStats({ event, store, kysely }).catch(debug);
await store.event(event, { signal });
@@ -146,7 +146,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
// Track pubkey domain.
try {
const { kysely } = await DittoDB.getInstance();
const kysely = await DittoDB.getInstance();
const { domain } = parseNip05(nip05);
await sql`

View File

@@ -16,7 +16,7 @@ export class Storages {
private static _pubsub: Promise<InternalRelay> | undefined;
private static _search: Promise<SearchStore> | undefined;
/** SQLite database to store events this Ditto server cares about. */
/** SQL database to store events this Ditto server cares about. */
public static async db(): Promise<EventsDB> {
if (!this._db) {
this._db = (async () => {

View File

@@ -13,10 +13,11 @@ import {
NStore,
} from '@nostrify/nostrify';
import { Stickynotes } from '@soapbox/stickynotes';
import { Kysely } from 'kysely';
import { nip27 } from 'nostr-tools';
import { Conf } from '@/config.ts';
import { DittoDatabase } from '@/db/DittoDB.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { dbEventsCounter } from '@/metrics.ts';
import { RelayError } from '@/RelayError.ts';
import { purifyEvent } from '@/storages/hydrate.ts';
@@ -30,7 +31,7 @@ type TagCondition = ({ event, count, value }: {
value: string;
}) => boolean;
/** SQLite database storage adapter for Nostr events. */
/** SQL database storage adapter for Nostr events. */
class EventsDB implements NStore {
private store: NDatabase | NPostgres;
private console = new Stickynotes('ditto:db:events');
@@ -52,21 +53,11 @@ class EventsDB implements NStore {
't': ({ event, count, value }) => (event.kind === 1985 ? count < 20 : count < 5) && value.length < 50,
};
constructor(private database: DittoDatabase) {
const { dialect, kysely } = database;
if (dialect === 'postgres') {
this.store = new NPostgres(kysely, {
indexTags: EventsDB.indexTags,
indexSearch: EventsDB.searchText,
});
} else {
this.store = new NDatabase(kysely, {
fts: 'sqlite',
indexTags: EventsDB.indexTags,
searchText: EventsDB.searchText,
});
}
constructor(private kysely: Kysely<DittoTables>) {
this.store = new NPostgres(kysely, {
indexTags: EventsDB.indexTags,
indexSearch: EventsDB.searchText,
});
}
/** Insert an event (and its tags) into the database. */
@@ -273,7 +264,7 @@ class EventsDB implements NStore {
return tags.map(([_tag, value]) => value).join('\n');
}
/** Converts filters to more performant, simpler filters that are better for SQLite. */
/** Converts filters to more performant, simpler filters. */
async expandFilters(filters: NostrFilter[]): Promise<NostrFilter[]> {
filters = structuredClone(filters);
@@ -286,7 +277,7 @@ class EventsDB implements NStore {
) as { key: 'domain'; value: string } | undefined)?.value;
if (domain) {
const query = this.database.kysely
const query = this.kysely
.selectFrom('pubkey_domains')
.select('pubkey')
.where('domain', '=', domain);

View File

@@ -18,7 +18,7 @@ interface HydrateOpts {
/** Hydrate events using the provided storage. */
async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
const { events, store, signal, kysely = (await DittoDB.getInstance()).kysely } = opts;
const { events, store, signal, kysely = await DittoDB.getInstance() } = opts;
if (!events.length) {
return events;

View File

@@ -1,21 +1,17 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import { Database as Sqlite } from '@db/sqlite';
import { NDatabase, NDatabaseSchema, NPostgresSchema } from '@nostrify/db';
import { PGlite } from '@electric-sql/pglite';
import { NostrEvent } from '@nostrify/nostrify';
import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite';
import { PgliteDialect } from '@soapbox/kysely-pglite';
import { finalizeEvent, generateSecretKey } from 'nostr-tools';
import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
import { Kysely } from 'kysely';
import { PostgresJSDialect, PostgresJSDialectConfig } from 'kysely-postgres-js';
import postgres from 'postgres';
import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts';
import { Conf } from '@/config.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { purifyEvent } from '@/storages/hydrate.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
import { EventsDB } from '@/storages/EventsDB.ts';
import { Conf } from '@/config.ts';
/** Import an event fixture by name in tests. */
export async function eventFixture(name: string): Promise<NostrEvent> {
@@ -42,97 +38,45 @@ export function genEvent(t: Partial<NostrEvent> = {}, sk: Uint8Array = generateS
return purifyEvent(event);
}
/** Get an in-memory SQLite database to use for testing. It's automatically destroyed when it goes out of scope. */
export async function getTestDB() {
const kysely = new Kysely<DittoTables>({
dialect: new DenoSqlite3Dialect({
database: new Sqlite(':memory:'),
}),
});
/** Create a database for testing. */
export const createTestDB = async (databaseUrl = Conf.testDatabaseUrl) => {
const { protocol } = new URL(databaseUrl);
const migrator = new Migrator({
db: kysely,
provider: new FileMigrationProvider({
fs,
path,
migrationFolder: new URL(import.meta.resolve('./db/migrations')).pathname,
}),
});
const kysely: Kysely<DittoTables> = (() => {
switch (protocol) {
case 'postgres:':
case 'postgresql:':
return new Kysely({
// @ts-ignore Kysely version mismatch.
dialect: new PostgresJSDialect({
postgres: postgres(databaseUrl, {
max: Conf.pg.poolSize,
}) as unknown as PostgresJSDialectConfig['postgres'],
}),
log: KyselyLogger,
});
case 'file:':
case 'memory:':
return new Kysely({
dialect: new PgliteDialect({
database: new PGlite(databaseUrl),
}),
});
default:
throw new Error(`Unsupported database URL protocol: ${protocol}`);
}
})();
await migrator.migrateToLatest();
const store = new NDatabase(kysely);
await DittoDB.migrate(kysely);
const store = new EventsDB(kysely);
return {
store,
kysely,
[Symbol.asyncDispose]: () => kysely.destroy(),
};
}
/** Create a database for testing. */
export const createTestDB = async (databaseUrl?: string) => {
databaseUrl ??= Deno.env.get('DATABASE_URL') ?? 'sqlite://:memory:';
let dialect: 'sqlite' | 'postgres' = (() => {
const protocol = databaseUrl.split(':')[0];
switch (protocol) {
case 'sqlite':
return 'sqlite';
case 'postgres':
return protocol;
case 'postgresql':
return 'postgres';
default:
throw new Error(`Unsupported protocol: ${protocol}`);
}
})();
const allowToUseDATABASE_URL = Deno.env.get('ALLOW_TO_USE_DATABASE_URL')?.toLowerCase() ?? '';
if (allowToUseDATABASE_URL !== 'true' && dialect === 'postgres') {
console.warn(
'%cRunning tests with sqlite, if you meant to use Postgres, run again with ALLOW_TO_USE_DATABASE_URL environment variable set to true',
'color: yellow;',
);
dialect = 'sqlite';
}
console.warn(`Using: ${dialect}`);
const db: DittoDatabase = { dialect } as DittoDatabase;
if (dialect === 'sqlite') {
// migration 021_pgfts_index.ts calls 'Conf.db.dialect',
// and this calls the DATABASE_URL environment variable.
// The following line ensures to NOT use the DATABASE_URL that may exist in an .env file.
Deno.env.set('DATABASE_URL', 'sqlite://:memory:');
db.kysely = new Kysely({
dialect: new DenoSqlite3Dialect({
database: new Sqlite(':memory:'),
}),
}) as Kysely<DittoTables> & Kysely<NDatabaseSchema>;
} else {
db.kysely = new Kysely({
// @ts-ignore Kysely version mismatch.
dialect: new PostgresJSDialect({
postgres: postgres(Conf.databaseUrl, {
max: Conf.pg.poolSize,
}) as unknown as PostgresJSDialectConfig['postgres'],
}),
log: KyselyLogger,
}) as Kysely<DittoTables> & Kysely<NPostgresSchema>;
}
await DittoDB.migrate(db.kysely);
const store = new EventsDB(db);
return {
dialect,
store,
kysely: db.kysely,
[Symbol.asyncDispose]: async () => {
if (dialect === 'postgres') {
// If we're testing against real Postgres, we will reuse the database
// between tests, so we should drop the tables to keep each test fresh.
if (['postgres:', 'postgresql:'].includes(protocol)) {
for (
const table of [
'author_stats',
@@ -142,16 +86,13 @@ export const createTestDB = async (databaseUrl?: string) => {
'kysely_migration_lock',
'nip46_tokens',
'pubkey_domains',
'unattached_media',
'nostr_events',
'nostr_tags',
'nostr_pgfts',
'event_zaps',
]
) {
await db.kysely.schema.dropTable(table).ifExists().cascade().execute();
await kysely.schema.dropTable(table).ifExists().cascade().execute();
}
await db.kysely.destroy();
await kysely.destroy();
}
},
};

View File

@@ -1,9 +1,10 @@
import { NostrFilter } from '@nostrify/nostrify';
import { Stickynotes } from '@soapbox/stickynotes';
import { sql } from 'kysely';
import { Kysely, sql } from 'kysely';
import { Conf } from '@/config.ts';
import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { handleEvent } from '@/pipeline.ts';
import { AdminSigner } from '@/signers/AdminSigner.ts';
import { Time } from '@/utils/time.ts';
@@ -13,88 +14,50 @@ const console = new Stickynotes('ditto:trends');
/** Get trending tag values for a given tag in the given time frame. */
export async function getTrendingTagValues(
/** Kysely instance to execute queries on. */
{ dialect, kysely }: DittoDatabase,
kysely: Kysely<DittoTables>,
/** Tag name to filter by, eg `t` or `r`. */
tagNames: string[],
/** Filter of eligible events. */
filter: NostrFilter,
): Promise<{ value: string; authors: number; uses: number }[]> {
if (dialect === 'postgres') {
let query = kysely
.selectFrom([
'nostr_events',
sql<{ key: string; value: string }>`jsonb_each_text(nostr_events.tags_index)`.as('kv'),
sql<{ key: string; value: string }>`jsonb_array_elements_text(kv.value::jsonb)`.as('element'),
])
.select(({ fn }) => [
fn<string>('lower', ['element.value']).as('value'),
fn.agg<number>('count', ['nostr_events.pubkey']).distinct().as('authors'),
fn.countAll<number>().as('uses'),
])
.where('kv.key', '=', (eb) => eb.fn.any(eb.val(tagNames)))
.groupBy((eb) => eb.fn<string>('lower', ['element.value']))
.orderBy((eb) => eb.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc');
let query = kysely
.selectFrom([
'nostr_events',
sql<{ key: string; value: string }>`jsonb_each_text(nostr_events.tags_index)`.as('kv'),
sql<{ key: string; value: string }>`jsonb_array_elements_text(kv.value::jsonb)`.as('element'),
])
.select(({ fn }) => [
fn<string>('lower', ['element.value']).as('value'),
fn.agg<number>('count', ['nostr_events.pubkey']).distinct().as('authors'),
fn.countAll<number>().as('uses'),
])
.where('kv.key', '=', (eb) => eb.fn.any(eb.val(tagNames)))
.groupBy((eb) => eb.fn<string>('lower', ['element.value']))
.orderBy((eb) => eb.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc');
if (filter.kinds) {
query = query.where('nostr_events.kind', '=', ({ fn, val }) => fn.any(val(filter.kinds)));
}
if (filter.authors) {
query = query.where('nostr_events.pubkey', '=', ({ fn, val }) => fn.any(val(filter.authors)));
}
if (typeof filter.since === 'number') {
query = query.where('nostr_events.created_at', '>=', filter.since);
}
if (typeof filter.until === 'number') {
query = query.where('nostr_events.created_at', '<=', filter.until);
}
if (typeof filter.limit === 'number') {
query = query.limit(filter.limit);
}
const rows = await query.execute();
return rows.map((row) => ({
value: row.value,
authors: Number(row.authors),
uses: Number(row.uses),
}));
if (filter.kinds) {
query = query.where('nostr_events.kind', '=', ({ fn, val }) => fn.any(val(filter.kinds)));
}
if (filter.authors) {
query = query.where('nostr_events.pubkey', '=', ({ fn, val }) => fn.any(val(filter.authors)));
}
if (typeof filter.since === 'number') {
query = query.where('nostr_events.created_at', '>=', filter.since);
}
if (typeof filter.until === 'number') {
query = query.where('nostr_events.created_at', '<=', filter.until);
}
if (typeof filter.limit === 'number') {
query = query.limit(filter.limit);
}
if (dialect === 'sqlite') {
let query = kysely
.selectFrom('nostr_tags')
.select(({ fn }) => [
'nostr_tags.value',
fn.agg<number>('count', ['nostr_tags.pubkey']).distinct().as('authors'),
fn.countAll<number>().as('uses'),
])
.where('nostr_tags.name', 'in', tagNames)
.groupBy('nostr_tags.value')
.orderBy((c) => c.fn.agg('count', ['nostr_tags.pubkey']).distinct(), 'desc');
const rows = await query.execute();
if (filter.kinds) {
query = query.where('nostr_tags.kind', 'in', filter.kinds);
}
if (typeof filter.since === 'number') {
query = query.where('nostr_tags.created_at', '>=', filter.since);
}
if (typeof filter.until === 'number') {
query = query.where('nostr_tags.created_at', '<=', filter.until);
}
if (typeof filter.limit === 'number') {
query = query.limit(filter.limit);
}
const rows = await query.execute();
return rows.map((row) => ({
value: row.value,
authors: Number(row.authors),
uses: Number(row.uses),
}));
}
return [];
return rows.map((row) => ({
value: row.value,
authors: Number(row.authors),
uses: Number(row.uses),
}));
}
/** Get trending tags and publish an event with them. */

View File

@@ -17,7 +17,7 @@ export class SimpleLRU<
constructor(fetchFn: FetchFn<K, V, { signal: AbortSignal }>, opts: LRUCache.Options<K, V, void>) {
this.cache = new LRUCache({
fetchMethod: (key, _staleValue, { signal }) => fetchFn(key, { signal: signal as AbortSignal }),
fetchMethod: (key, _staleValue, { signal }) => fetchFn(key, { signal: signal as unknown as AbortSignal }),
...opts,
});
}

View File

@@ -5,7 +5,6 @@ import { z } from 'zod';
import { DittoTables } from '@/db/DittoTables.ts';
import { findQuoteTag, findReplyTag, getTagSet } from '@/utils/tags.ts';
import { Conf } from '@/config.ts';
interface UpdateStatsOpts {
kysely: Kysely<DittoTables>;
@@ -197,16 +196,13 @@ export async function updateAuthorStats(
notes_count: 0,
};
let query = kysely
const prev = await kysely
.selectFrom('author_stats')
.selectAll()
.where('pubkey', '=', pubkey);
.forUpdate()
.where('pubkey', '=', pubkey)
.executeTakeFirst();
if (Conf.db.dialect === 'postgres') {
query = query.forUpdate();
}
const prev = await query.executeTakeFirst();
const stats = fn(prev ?? empty);
if (prev) {
@@ -249,16 +245,13 @@ export async function updateEventStats(
reactions: '{}',
};
let query = kysely
const prev = await kysely
.selectFrom('event_stats')
.selectAll()
.where('event_id', '=', eventId);
.forUpdate()
.where('event_id', '=', eventId)
.executeTakeFirst();
if (Conf.db.dialect === 'postgres') {
query = query.forUpdate();
}
const prev = await query.executeTakeFirst();
const stats = fn(prev ?? empty);
if (prev) {

View File

@@ -1,7 +1,7 @@
import Debug from '@soapbox/stickynotes/debug';
import * as Comlink from 'comlink';
import './handlers/abortsignal.ts';
import '@/workers/handlers/abortsignal.ts';
import '@/sentry.ts';
const debug = Debug('ditto:fetch.worker');

View File

@@ -1,52 +0,0 @@
import * as Comlink from 'comlink';
import { asyncGeneratorTransferHandler } from 'comlink-async-generator';
import { CompiledQuery, QueryResult } from 'kysely';
import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts';
Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler);
/**
 * Main-thread proxy to the SQLite web worker.
 * Spawns `sqlite.worker.ts`, wraps it with Comlink, and queues all calls
 * behind a readiness handshake (the worker posts `['ready']` when loaded).
 */
class SqliteWorker {
// Underlying module worker running sqlite.worker.ts.
#worker: Worker;
// Comlink RPC client for the worker's exposed API.
#client: ReturnType<typeof Comlink.wrap<typeof _SqliteWorker>>;
// Resolves once the worker has posted its 'ready' message.
#ready: Promise<void>;
constructor() {
this.#worker = new Worker(new URL('./sqlite.worker.ts', import.meta.url).href, { type: 'module' });
this.#client = Comlink.wrap<typeof _SqliteWorker>(this.#worker);
this.#ready = new Promise<void>((resolve) => {
const handleEvent = (event: MessageEvent) => {
// Worker signals readiness with a ['ready'] message; detach the
// listener afterwards so Comlink traffic isn't inspected here.
if (event.data[0] === 'ready') {
this.#worker.removeEventListener('message', handleEvent);
resolve();
}
};
this.#worker.addEventListener('message', handleEvent);
});
}
/** Open the database file at `path` inside the worker. */
async open(path: string): Promise<void> {
await this.#ready;
return this.#client.open(path);
}
/** Execute a compiled query in the worker and return all rows at once. */
async executeQuery<R>(query: CompiledQuery): Promise<QueryResult<R>> {
await this.#ready;
return this.#client.executeQuery(query) as Promise<QueryResult<R>>;
}
/** Stream query results from the worker one row-batch at a time. */
async *streamQuery<R>(query: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
await this.#ready;
for await (const result of await this.#client.streamQuery(query)) {
yield result as QueryResult<R>;
}
}
/**
 * Close the worker's database handle.
 * NOTE(review): does not await #ready and does not terminate the Worker
 * itself — confirm whether callers rely on the worker being reaped elsewhere.
 */
destroy(): Promise<void> {
return this.#client.destroy();
}
}
export default SqliteWorker;

View File

@@ -1,42 +0,0 @@
/// <reference lib="webworker" />
import { Database as SQLite } from '@db/sqlite';
import * as Comlink from 'comlink';
import { CompiledQuery, QueryResult } from 'kysely';
import { asyncGeneratorTransferHandler } from 'comlink-async-generator';
import '@/sentry.ts';
// Worker-side database handle; set by `open()` and cleared implicitly on close.
let db: SQLite | undefined;
/**
 * API exposed to the main thread via Comlink.
 * All methods operate on the single worker-local `db` handle.
 */
export const SqliteWorker = {
/** Open (or create) the SQLite database at `path`. */
open(path: string): void {
db = new SQLite(path);
},
/** Run a compiled query and return all rows plus write metadata. */
executeQuery<R>({ sql, parameters }: CompiledQuery): QueryResult<R> {
if (!db) throw new Error('Database not open');
return {
rows: db!.prepare(sql).all(...parameters as any[]) as R[],
// changes/lastInsertRowId reflect the most recent write on this handle.
numAffectedRows: BigInt(db!.changes),
insertId: BigInt(db!.lastInsertRowId),
};
},
/** Run a compiled query, yielding one row per result batch. */
async *streamQuery<R>({ sql, parameters }: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
if (!db) throw new Error('Database not open');
const stmt = db.prepare(sql).bind(...parameters as any[]);
for (const row of stmt) {
yield {
rows: [row],
};
}
},
/** Close the database handle if one is open. */
destroy() {
db?.close();
},
};
// Allow async generators (streamQuery) to cross the Comlink boundary.
Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler);
Comlink.expose(SqliteWorker);
// Handshake: the main-thread proxy waits for this before issuing calls.
self.postMessage(['ready']);