mirror of
https://github.com/aljazceru/rabbit.git
synced 2025-12-17 22:14:26 +01:00
refactor: useBatchedEvents
This commit is contained in:
@@ -21,16 +21,19 @@ type ParameterizedReplaceableEventTask = {
|
||||
identifier: string;
|
||||
};
|
||||
|
||||
type TaskArg =
|
||||
| ProfileTask
|
||||
| EventTask
|
||||
| FollowingsTask
|
||||
| ReactionsTask
|
||||
| ZapReceiptsTask
|
||||
| RepostsTask
|
||||
| ParameterizedReplaceableEventTask;
|
||||
type TaskArgs = [
|
||||
ProfileTask,
|
||||
EventTask,
|
||||
FollowingsTask,
|
||||
ReactionsTask,
|
||||
RepostsTask,
|
||||
ZapReceiptsTask,
|
||||
ParameterizedReplaceableEventTask,
|
||||
];
|
||||
|
||||
export class BatchedEventsTask extends ObservableTask<TaskArg, NostrEvent[]> {
|
||||
type TaskArg = TaskArgs[number];
|
||||
|
||||
export class BatchedEventsTask<T = TaskArg> extends ObservableTask<T, NostrEvent[]> {
|
||||
addEvent(event: NostrEvent) {
|
||||
this.updateWith((current) => utils.insertEventIntoDescendingList(current ?? [], event));
|
||||
}
|
||||
@@ -48,6 +51,11 @@ export class BatchedEventsTask extends ObservableTask<TaskArg, NostrEvent[]> {
|
||||
}
|
||||
}
|
||||
|
||||
const isBatchedEventsTaskOf =
|
||||
<T extends TaskArg>(taskType: T['type']) =>
|
||||
(task: BatchedEventsTask): task is BatchedEventsTask<T> =>
|
||||
task.req.type === taskType;
|
||||
|
||||
let count = 0;
|
||||
|
||||
const { setActiveBatchSubscriptions } = useStats();
|
||||
@@ -63,91 +71,205 @@ const keyForParameterizedReplaceableEvent = ({
|
||||
kind,
|
||||
author,
|
||||
identifier,
|
||||
}: ParameterizedReplaceableEventTask) => `${kind}:${author}:${identifier}`;
|
||||
}: {
|
||||
kind: number;
|
||||
author: string;
|
||||
identifier: string;
|
||||
}) => `${kind}:${author}:${identifier}`;
|
||||
|
||||
type AggregatedTasks<T extends TaskArg> = {
|
||||
tasks: Map<string, BatchedEventsTask<T>[]>;
|
||||
add: (task: BatchedEventsTask<T>) => void;
|
||||
buildFilter: () => Filter[];
|
||||
resolve: (event: NostrEvent) => boolean;
|
||||
};
|
||||
|
||||
const createTasks = <T extends TaskArg>({
|
||||
keyExtractor,
|
||||
filtersBuilder,
|
||||
eventKeyExtractor,
|
||||
}: {
|
||||
keyExtractor: (e: T) => string;
|
||||
filtersBuilder: (keys: string[]) => Filter[];
|
||||
eventKeyExtractor: (e: NostrEvent) => string | null | undefined;
|
||||
}): AggregatedTasks<T> => {
|
||||
const tasks = new Map<string, BatchedEventsTask<T>[]>();
|
||||
|
||||
const add = (task: BatchedEventsTask<T>) => {
|
||||
const key = keyExtractor(task.req);
|
||||
const current = tasks.get(key) ?? [];
|
||||
tasks.set(key, [...current, task]);
|
||||
};
|
||||
|
||||
const buildFilter = (): Filter[] => {
|
||||
const keys = Array.from(tasks.keys());
|
||||
if (keys.length === 0) {
|
||||
return [];
|
||||
}
|
||||
return filtersBuilder(keys);
|
||||
};
|
||||
|
||||
const resolve = (event: NostrEvent): boolean => {
|
||||
const key = eventKeyExtractor(event);
|
||||
|
||||
if (key == null) return false;
|
||||
|
||||
const foundTasks = tasks.get(key) ?? [];
|
||||
|
||||
if (foundTasks.length === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
foundTasks.forEach((task) => {
|
||||
task.addEvent(event);
|
||||
});
|
||||
return true;
|
||||
};
|
||||
|
||||
return { tasks, add, buildFilter, resolve };
|
||||
};
|
||||
|
||||
export const tasksRequestBuilder = (tasks: BatchedEventsTask[]) => {
|
||||
const eventTasks = createTasks<EventTask>({
|
||||
keyExtractor: (req) => req.eventId,
|
||||
filtersBuilder: (ids) => [{ ids }],
|
||||
eventKeyExtractor: (ev) => ev.id,
|
||||
});
|
||||
const profileTasks = createTasks<ProfileTask>({
|
||||
keyExtractor: (req) => req.pubkey,
|
||||
filtersBuilder: (authors) => [{ kinds: [Kind.Metadata], authors }],
|
||||
eventKeyExtractor: (ev) => ev.pubkey,
|
||||
});
|
||||
const followingsTasks = createTasks<FollowingsTask>({
|
||||
keyExtractor: (req) => req.pubkey,
|
||||
filtersBuilder: (authors) => [{ kinds: [Kind.Contacts], authors }],
|
||||
eventKeyExtractor: (ev) => ev.pubkey,
|
||||
});
|
||||
const repostsTasks = createTasks<RepostsTask>({
|
||||
keyExtractor: (req) => req.mentionedEventId,
|
||||
filtersBuilder: (ids) => [{ kinds: [Kind.Repost], '#e': ids }],
|
||||
// Use the last event id for compatibility
|
||||
eventKeyExtractor: (ev) => genericEvent(ev).lastTaggedEventId(),
|
||||
});
|
||||
const reactionsTasks = createTasks<ReactionsTask>({
|
||||
keyExtractor: (req) => req.mentionedEventId,
|
||||
filtersBuilder: (ids) => [{ kinds: [Kind.Reaction], '#e': ids }],
|
||||
// Use the last event id for compatibility
|
||||
eventKeyExtractor: (ev) => genericEvent(ev).lastTaggedEventId(),
|
||||
});
|
||||
const zapReceiptsTasks = createTasks<ZapReceiptsTask>({
|
||||
keyExtractor: (req) => req.mentionedEventId,
|
||||
filtersBuilder: (ids) => [{ kinds: [Kind.Zap], '#e': ids }],
|
||||
eventKeyExtractor: (ev) => genericEvent(ev).lastTaggedEventId(),
|
||||
});
|
||||
const parameterizedReplaceableEventsTasks = createTasks<ParameterizedReplaceableEventTask>({
|
||||
keyExtractor: keyForParameterizedReplaceableEvent,
|
||||
filtersBuilder: (keys) => {
|
||||
const result: Filter[] = [];
|
||||
keys.forEach((key) => {
|
||||
const task = parameterizedReplaceableEventsTasks.tasks.get(key)?.[0];
|
||||
if (task == null) return;
|
||||
const { kind, author, identifier } = task.req;
|
||||
result.push({ kinds: [kind], authors: [author], '#d': [identifier] });
|
||||
});
|
||||
return result;
|
||||
},
|
||||
eventKeyExtractor: (ev) => {
|
||||
const identifier = genericEvent(ev).findFirstTagByName('d')?.[1];
|
||||
if (identifier == null) return undefined;
|
||||
return keyForParameterizedReplaceableEvent({
|
||||
kind: ev.kind,
|
||||
author: ev.pubkey,
|
||||
identifier,
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
const add = (task: BatchedEventsTask) => {
|
||||
if (isBatchedEventsTaskOf<EventTask>('Event')(task)) {
|
||||
eventTasks.add(task);
|
||||
} else if (isBatchedEventsTaskOf<ProfileTask>('Profile')(task)) {
|
||||
profileTasks.add(task);
|
||||
} else if (isBatchedEventsTaskOf<FollowingsTask>('Followings')(task)) {
|
||||
followingsTasks.add(task);
|
||||
} else if (isBatchedEventsTaskOf<RepostsTask>('Reposts')(task)) {
|
||||
repostsTasks.add(task);
|
||||
} else if (isBatchedEventsTaskOf<ReactionsTask>('Reactions')(task)) {
|
||||
reactionsTasks.add(task);
|
||||
} else if (isBatchedEventsTaskOf<ZapReceiptsTask>('ZapReceipts')(task)) {
|
||||
zapReceiptsTasks.add(task);
|
||||
} else if (
|
||||
isBatchedEventsTaskOf<ParameterizedReplaceableEventTask>('ParameterizedReplaceableEvent')(
|
||||
task,
|
||||
)
|
||||
) {
|
||||
parameterizedReplaceableEventsTasks.add(task);
|
||||
} else {
|
||||
throw new Error(`unknown task: ${task.req.type}`);
|
||||
}
|
||||
};
|
||||
|
||||
const buildFilters = (): Filter[] => [
|
||||
...eventTasks.buildFilter(),
|
||||
...profileTasks.buildFilter(),
|
||||
...followingsTasks.buildFilter(),
|
||||
...repostsTasks.buildFilter(),
|
||||
...reactionsTasks.buildFilter(),
|
||||
...zapReceiptsTasks.buildFilter(),
|
||||
...parameterizedReplaceableEventsTasks.buildFilter(),
|
||||
];
|
||||
|
||||
const resolve = (event: NostrEvent) => {
|
||||
if (event.kind === (Kind.Metadata as number)) {
|
||||
if (profileTasks.resolve(event)) return;
|
||||
}
|
||||
if (event.kind === (Kind.Contacts as number)) {
|
||||
if (followingsTasks.resolve(event)) return;
|
||||
}
|
||||
if (event.kind === (Kind.Repost as number)) {
|
||||
if (repostsTasks.resolve(event)) return;
|
||||
}
|
||||
if (event.kind === (Kind.Reaction as number)) {
|
||||
if (reactionsTasks.resolve(event)) return;
|
||||
}
|
||||
if (event.kind === (Kind.Zap as number)) {
|
||||
if (zapReceiptsTasks.resolve(event)) return;
|
||||
}
|
||||
if (isParameterizedReplaceableEvent(event)) {
|
||||
if (parameterizedReplaceableEventsTasks.resolve(event)) return;
|
||||
}
|
||||
eventTasks.resolve(event);
|
||||
};
|
||||
|
||||
tasks.forEach((task) => {
|
||||
add(task);
|
||||
});
|
||||
|
||||
return {
|
||||
tasks: {
|
||||
eventTasks,
|
||||
profileTasks,
|
||||
followingsTasks,
|
||||
repostsTasks,
|
||||
reactionsTasks,
|
||||
zapReceiptsTasks,
|
||||
parameterizedReplaceableEventsTasks,
|
||||
},
|
||||
add,
|
||||
buildFilters,
|
||||
resolve,
|
||||
};
|
||||
};
|
||||
|
||||
const { addTask, removeTask } = useBatch<BatchedEventsTask>(() => ({
|
||||
interval: 2000,
|
||||
batchSize: 150,
|
||||
executor: (tasks) => {
|
||||
const profileTasks = new Map<string, BatchedEventsTask[]>();
|
||||
const eventTasks = new Map<string, BatchedEventsTask[]>();
|
||||
const reactionsTasks = new Map<string, BatchedEventsTask[]>();
|
||||
const repostsTasks = new Map<string, BatchedEventsTask[]>();
|
||||
const zapReceiptsTasks = new Map<string, BatchedEventsTask[]>();
|
||||
const parameterizedReplaceableEventsTasks = new Map<string, BatchedEventsTask[]>();
|
||||
const followingsTasks = new Map<string, BatchedEventsTask[]>();
|
||||
|
||||
tasks.forEach((task) => {
|
||||
if (task.req.type === 'Event') {
|
||||
const current = eventTasks.get(task.req.eventId) ?? [];
|
||||
eventTasks.set(task.req.eventId, [...current, task]);
|
||||
} else if (task.req.type === 'Profile') {
|
||||
const current = profileTasks.get(task.req.pubkey) ?? [];
|
||||
profileTasks.set(task.req.pubkey, [...current, task]);
|
||||
} else if (task.req.type === 'Reactions') {
|
||||
const current = reactionsTasks.get(task.req.mentionedEventId) ?? [];
|
||||
reactionsTasks.set(task.req.mentionedEventId, [...current, task]);
|
||||
} else if (task.req.type === 'Reposts') {
|
||||
const current = repostsTasks.get(task.req.mentionedEventId) ?? [];
|
||||
repostsTasks.set(task.req.mentionedEventId, [...current, task]);
|
||||
} else if (task.req.type === 'ZapReceipts') {
|
||||
const current = zapReceiptsTasks.get(task.req.mentionedEventId) ?? [];
|
||||
repostsTasks.set(task.req.mentionedEventId, [...current, task]);
|
||||
} else if (task.req.type === 'ParameterizedReplaceableEvent') {
|
||||
const key = keyForParameterizedReplaceableEvent(task.req);
|
||||
const current = parameterizedReplaceableEventsTasks.get(key) ?? [];
|
||||
parameterizedReplaceableEventsTasks.set(key, [...current, task]);
|
||||
} else if (task.req.type === 'Followings') {
|
||||
const current = followingsTasks.get(task.req.pubkey) ?? [];
|
||||
followingsTasks.set(task.req.pubkey, [...current, task]);
|
||||
}
|
||||
});
|
||||
|
||||
const eventIds = [...eventTasks.keys()];
|
||||
const profilePubkeys = [...profileTasks.keys()];
|
||||
const reactionsIds = [...reactionsTasks.keys()];
|
||||
const repostsIds = [...repostsTasks.keys()];
|
||||
const zapReceiptsIds = [...zapReceiptsTasks.keys()];
|
||||
const followingsIds = [...followingsTasks.keys()];
|
||||
|
||||
const filters: Filter[] = [];
|
||||
|
||||
if (eventIds.length > 0) {
|
||||
filters.push({ ids: eventIds });
|
||||
}
|
||||
if (profilePubkeys.length > 0) {
|
||||
filters.push({ kinds: [Kind.Metadata], authors: profilePubkeys });
|
||||
}
|
||||
if (reactionsIds.length > 0) {
|
||||
filters.push({ kinds: [Kind.Reaction], '#e': reactionsIds });
|
||||
}
|
||||
if (repostsIds.length > 0) {
|
||||
filters.push({ kinds: [6], '#e': repostsIds });
|
||||
}
|
||||
if (zapReceiptsIds.length > 0) {
|
||||
filters.push({ kinds: [9735], '#e': zapReceiptsIds });
|
||||
}
|
||||
if (followingsIds.length > 0) {
|
||||
filters.push({ kinds: [Kind.Contacts], authors: followingsIds });
|
||||
}
|
||||
if (parameterizedReplaceableEventsTasks.size > 0) {
|
||||
Array.from(parameterizedReplaceableEventsTasks.values()).forEach(([firstTask]) => {
|
||||
if (firstTask.req.type !== 'ParameterizedReplaceableEvent') return;
|
||||
const {
|
||||
req: { kind, author, identifier },
|
||||
} = firstTask;
|
||||
filters.push({ kinds: [kind], authors: [author], '#d': [identifier] });
|
||||
});
|
||||
}
|
||||
const builder = tasksRequestBuilder(tasks);
|
||||
const filters = builder.buildFilters();
|
||||
|
||||
if (filters.length === 0) return;
|
||||
|
||||
const resolveTasks = (registeredTasks: BatchedEventsTask[], event: NostrEvent) => {
|
||||
registeredTasks.forEach((task) => {
|
||||
task.addEvent(event);
|
||||
});
|
||||
};
|
||||
|
||||
const finalizeTasks = () => {
|
||||
tasks.forEach((task) => {
|
||||
task.complete();
|
||||
@@ -162,52 +284,7 @@ const { addTask, removeTask } = useBatch<BatchedEventsTask>(() => ({
|
||||
count += 1;
|
||||
|
||||
sub.on('event', (event: NostrEvent & { id: string }) => {
|
||||
if (event.kind === (Kind.Metadata as number)) {
|
||||
const registeredTasks = profileTasks.get(event.pubkey) ?? [];
|
||||
resolveTasks(registeredTasks, event);
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.kind === (Kind.Reaction as number)) {
|
||||
// Use the last event id
|
||||
const id = genericEvent(event).lastTaggedEventId();
|
||||
if (id != null) {
|
||||
const registeredTasks = reactionsTasks.get(id) ?? [];
|
||||
resolveTasks(registeredTasks, event);
|
||||
}
|
||||
} else if (event.kind === (Kind.Repost as number)) {
|
||||
// Use the last event id
|
||||
const id = genericEvent(event).lastTaggedEventId();
|
||||
if (id != null) {
|
||||
const registeredTasks = repostsTasks.get(id) ?? [];
|
||||
resolveTasks(registeredTasks, event);
|
||||
}
|
||||
} else if (event.kind === (Kind.Zap as number)) {
|
||||
const eTags = genericEvent(event).eTags();
|
||||
eTags.forEach(([, id]) => {
|
||||
const registeredTasks = repostsTasks.get(id) ?? [];
|
||||
resolveTasks(registeredTasks, event);
|
||||
});
|
||||
} else if (event.kind === (Kind.Contacts as number)) {
|
||||
const registeredTasks = followingsTasks.get(event.pubkey) ?? [];
|
||||
resolveTasks(registeredTasks, event);
|
||||
} else if (isParameterizedReplaceableEvent(event)) {
|
||||
const identifier = genericEvent(event).findFirstTagByName('d')?.[1];
|
||||
if (identifier != null) {
|
||||
const key = `${event.kind}:${event.pubkey}:${identifier}`;
|
||||
const registeredTasks = parameterizedReplaceableEventsTasks.get(key) ?? [];
|
||||
resolveTasks(registeredTasks, event);
|
||||
} else {
|
||||
console.warn('identifier is undefined');
|
||||
}
|
||||
} else {
|
||||
const registeredTasks = eventTasks.get(event.id) ?? [];
|
||||
if (registeredTasks.length > 0) {
|
||||
resolveTasks(registeredTasks, event);
|
||||
} else {
|
||||
console.warn('unknown event received');
|
||||
}
|
||||
}
|
||||
builder.resolve(event);
|
||||
});
|
||||
|
||||
sub.on('eose', () => {
|
||||
@@ -222,7 +299,7 @@ export const registerTask = ({
|
||||
task,
|
||||
signal,
|
||||
}: {
|
||||
task: BatchedEventsTask;
|
||||
task: BatchedEventsTask<TaskArg>;
|
||||
signal?: AbortSignal;
|
||||
}) => {
|
||||
addTask(task);
|
||||
|
||||
Reference in New Issue
Block a user