mirror of
https://github.com/aljazceru/ditto.git
synced 2026-01-06 15:14:23 +01:00
Remove TrendsWorker
This commit is contained in:
@@ -13,9 +13,8 @@ import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
|
||||
import { RelayError } from '@/RelayError.ts';
|
||||
import { hydrateEvents } from '@/storages/hydrate.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { eventAge, nostrDate, parseNip05, Time } from '@/utils.ts';
|
||||
import { eventAge, parseNip05, Time } from '@/utils.ts';
|
||||
import { policyWorker } from '@/workers/policy.ts';
|
||||
import { TrendsWorker } from '@/workers/trends.ts';
|
||||
import { verifyEventWorker } from '@/workers/verify.ts';
|
||||
import { nip05Cache } from '@/utils/nip05.ts';
|
||||
import { updateStats } from '@/utils/stats.ts';
|
||||
@@ -49,7 +48,6 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
|
||||
storeEvent(event, signal),
|
||||
parseMetadata(event, signal),
|
||||
DVM.event(event),
|
||||
trackHashtags(event),
|
||||
processMedia(event),
|
||||
streamOut(event),
|
||||
]);
|
||||
@@ -150,25 +148,6 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
|
||||
}
|
||||
}
|
||||
|
||||
/** Track whenever a hashtag is used, for processing trending tags. */
|
||||
async function trackHashtags(event: NostrEvent): Promise<void> {
|
||||
const date = nostrDate(event.created_at);
|
||||
|
||||
const tags = event.tags
|
||||
.filter((tag) => tag[0] === 't')
|
||||
.map((tag) => tag[1])
|
||||
.slice(0, 5);
|
||||
|
||||
if (!tags.length) return;
|
||||
|
||||
try {
|
||||
debug('tracking tags:', JSON.stringify(tags));
|
||||
await TrendsWorker.addTagUsages(event.pubkey, tags, date);
|
||||
} catch (_e) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
/** Delete unattached media entries that are attached to the event. */
|
||||
function processMedia({ tags, pubkey, user }: DittoEvent) {
|
||||
if (user) {
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
import { assertEquals } from '@std/assert';
|
||||
|
||||
import { TrendsWorker } from './trends.ts';
|
||||
|
||||
await TrendsWorker.open(':memory:');
|
||||
|
||||
const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
|
||||
|
||||
Deno.test('getTrendingTags', async () => {
|
||||
await TrendsWorker.addTagUsages(p8('00000000'), ['ditto', 'hello', 'yolo']);
|
||||
await TrendsWorker.addTagUsages(p8('00000000'), ['hello']);
|
||||
await TrendsWorker.addTagUsages(p8('00000001'), ['Ditto', 'hello']);
|
||||
await TrendsWorker.addTagUsages(p8('00000010'), ['DITTO']);
|
||||
|
||||
const result = await TrendsWorker.getTrendingTags({
|
||||
since: new Date('1999-01-01T00:00:00'),
|
||||
until: new Date('2999-01-01T00:00:00'),
|
||||
threshold: 1,
|
||||
});
|
||||
|
||||
const expected = [
|
||||
{ tag: 'ditto', accounts: 3, uses: 3 },
|
||||
{ tag: 'hello', accounts: 2, uses: 3 },
|
||||
{ tag: 'yolo', accounts: 1, uses: 1 },
|
||||
];
|
||||
|
||||
assertEquals(result, expected);
|
||||
|
||||
await TrendsWorker.cleanupTagUsages(new Date('2999-01-01T00:00:00'));
|
||||
});
|
||||
@@ -1,19 +0,0 @@
|
||||
import * as Comlink from 'comlink';
|
||||
|
||||
import type { TrendsWorker as _TrendsWorker } from '@/workers/trends.worker.ts';
|
||||
|
||||
const worker = new Worker(new URL('./trends.worker.ts', import.meta.url), { type: 'module' });
|
||||
|
||||
const TrendsWorker = Comlink.wrap<typeof _TrendsWorker>(worker);
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const handleEvent = ({ data }: MessageEvent) => {
|
||||
if (data === 'ready') {
|
||||
worker.removeEventListener('message', handleEvent);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
worker.addEventListener('message', handleEvent);
|
||||
});
|
||||
|
||||
export { TrendsWorker };
|
||||
@@ -1,152 +0,0 @@
|
||||
/// <reference lib="webworker" />
|
||||
import { Database as Sqlite } from '@db/sqlite';
|
||||
import { NSchema } from '@nostrify/nostrify';
|
||||
import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite';
|
||||
import { Kysely, sql } from 'kysely';
|
||||
import * as Comlink from 'comlink';
|
||||
|
||||
import { hashtagSchema } from '@/schema.ts';
|
||||
import { generateDateRange, Time } from '@/utils/time.ts';
|
||||
|
||||
interface GetTrendingTagsOpts {
|
||||
since: Date;
|
||||
until: Date;
|
||||
limit?: number;
|
||||
threshold?: number;
|
||||
}
|
||||
|
||||
interface GetTagHistoryOpts {
|
||||
tag: string;
|
||||
since: Date;
|
||||
until: Date;
|
||||
limit?: number;
|
||||
offset?: number;
|
||||
}
|
||||
|
||||
interface TagsDB {
|
||||
tag_usages: {
|
||||
tag: string;
|
||||
pubkey8: string;
|
||||
inserted_at: Date;
|
||||
};
|
||||
}
|
||||
|
||||
let kysely: Kysely<TagsDB>;
|
||||
|
||||
export const TrendsWorker = {
|
||||
async open(path: string) {
|
||||
kysely = new Kysely({
|
||||
dialect: new DenoSqlite3Dialect({
|
||||
database: new Sqlite(path),
|
||||
}),
|
||||
});
|
||||
|
||||
await sql`PRAGMA synchronous = normal`.execute(kysely);
|
||||
await sql`PRAGMA temp_store = memory`.execute(kysely);
|
||||
await sql`PRAGMA foreign_keys = ON`.execute(kysely);
|
||||
await sql`PRAGMA auto_vacuum = FULL`.execute(kysely);
|
||||
await sql`PRAGMA journal_mode = WAL`.execute(kysely);
|
||||
await sql`PRAGMA mmap_size = 50000000`.execute(kysely);
|
||||
|
||||
await kysely.schema
|
||||
.createTable('tag_usages')
|
||||
.ifNotExists()
|
||||
.addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`))
|
||||
.addColumn('pubkey8', 'text', (c) => c.notNull())
|
||||
.addColumn('inserted_at', 'integer', (c) => c.notNull())
|
||||
.execute();
|
||||
|
||||
await kysely.schema
|
||||
.createIndex('idx_time_tag')
|
||||
.ifNotExists()
|
||||
.on('tag_usages')
|
||||
.column('inserted_at')
|
||||
.column('tag')
|
||||
.execute();
|
||||
|
||||
Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => {
|
||||
const lastWeek = new Date(new Date().getTime() - Time.days(7));
|
||||
await this.cleanupTagUsages(lastWeek);
|
||||
});
|
||||
},
|
||||
|
||||
/** Gets the most used hashtags between the date range. */
|
||||
getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{
|
||||
tag: string;
|
||||
accounts: number;
|
||||
uses: number;
|
||||
}[]> {
|
||||
return kysely.selectFrom('tag_usages')
|
||||
.select(({ fn }) => [
|
||||
'tag',
|
||||
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
||||
fn.countAll<number>().as('uses'),
|
||||
])
|
||||
.where('inserted_at', '>=', since)
|
||||
.where('inserted_at', '<', until)
|
||||
.groupBy('tag')
|
||||
.having((c) => c(c.fn.agg('count', ['pubkey8']).distinct(), '>=', threshold))
|
||||
.orderBy((c) => c.fn.agg('count', ['pubkey8']).distinct(), 'desc')
|
||||
.limit(limit)
|
||||
.execute();
|
||||
},
|
||||
|
||||
/**
|
||||
* Gets the tag usage count for a specific tag.
|
||||
* It returns an array with counts for each date between the range.
|
||||
*/
|
||||
async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
|
||||
const result = await kysely
|
||||
.selectFrom('tag_usages')
|
||||
.select(({ fn }) => [
|
||||
sql<number>`date(inserted_at)`.as('day'),
|
||||
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
||||
fn.countAll<number>().as('uses'),
|
||||
])
|
||||
.where('tag', '=', tag)
|
||||
.where('inserted_at', '>=', since)
|
||||
.where('inserted_at', '<', until)
|
||||
.groupBy(sql`date(inserted_at)`)
|
||||
.orderBy(sql`date(inserted_at)`, 'desc')
|
||||
.limit(limit)
|
||||
.offset(offset)
|
||||
.execute();
|
||||
|
||||
/** Full date range between `since` and `until`. */
|
||||
const dateRange = generateDateRange(
|
||||
new Date(since.getTime() + Time.days(1)),
|
||||
new Date(until.getTime() - Time.days(offset)),
|
||||
).reverse();
|
||||
|
||||
// Fill in missing dates with 0 usages.
|
||||
return dateRange.map((day) => {
|
||||
const data = result.find((item) => new Date(item.day).getTime() === day.getTime());
|
||||
if (data) {
|
||||
return { ...data, day: new Date(data.day) };
|
||||
} else {
|
||||
return { day, accounts: 0, uses: 0 };
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date()): Promise<void> {
|
||||
const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8);
|
||||
const tags = hashtagSchema.array().min(1).parse(hashtags);
|
||||
|
||||
await kysely
|
||||
.insertInto('tag_usages')
|
||||
.values(tags.map((tag) => ({ tag, pubkey8, inserted_at })))
|
||||
.execute();
|
||||
},
|
||||
|
||||
async cleanupTagUsages(until: Date): Promise<void> {
|
||||
await kysely
|
||||
.deleteFrom('tag_usages')
|
||||
.where('inserted_at', '<', until)
|
||||
.execute();
|
||||
},
|
||||
};
|
||||
|
||||
Comlink.expose(TrendsWorker);
|
||||
|
||||
self.postMessage('ready');
|
||||
Reference in New Issue
Block a user