refactor: batching

This commit is contained in:
Shusui MOYATANI
2023-06-19 00:43:34 +09:00
parent 60b6718dbb
commit ab34bef0d6
10 changed files with 212 additions and 291 deletions

View File

@@ -1,112 +0,0 @@
import { createSignal, createMemo } from 'solid-js';
export type Task<TaskArgs, TaskResult> = {
id: number;
args: TaskArgs;
resolve: (result: TaskResult) => void;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
reject: (error: any) => void;
};
export type UseBatchProps<TaskArgs, TaskResult> = {
executor: (tasks: Task<TaskArgs, TaskResult>[]) => void;
interval?: number;
batchSize?: number;
};
export type PromiseWithCallbacks<T> = {
promise: Promise<T>;
resolve: (e: T) => void;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
reject: (e: any) => void;
};
const promiseWithCallbacks = <T>(): PromiseWithCallbacks<T> => {
let resolve: ((e: T) => void) | undefined;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let reject: ((e: any) => void) | undefined;
const promise = new Promise<T>((resolveFn, rejectFn) => {
resolve = resolveFn;
reject = rejectFn;
});
if (resolve == null || reject == null) {
throw new Error('PromiseWithCallbacks failed to extract callbacks');
}
return { promise, resolve, reject };
};
const useBatch = <TaskArgs, TaskResult>(
propsProvider: () => UseBatchProps<TaskArgs, TaskResult>,
) => {
const props = createMemo(propsProvider);
const batchSize = createMemo(() => props().batchSize ?? 100);
const interval = createMemo(() => props().interval ?? 2000);
const [seqId, setSeqId] = createSignal<number>(0);
const [taskQueue, setTaskQueue] = createSignal<Task<TaskArgs, TaskResult>[]>([]);
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const executeTasks = () => {
const { executor } = props();
const currentTaskQueue = taskQueue();
if (currentTaskQueue.length > 0) {
setTaskQueue([]);
executor(currentTaskQueue);
}
if (timeoutId != null) clearTimeout(timeoutId);
timeoutId = undefined;
};
const nextId = (): number => {
const id = seqId();
setSeqId((currentId) => currentId + 1);
return id;
};
const launchTimer = () => {
if (timeoutId == null) {
timeoutId = setTimeout(() => {
executeTasks();
}, interval());
}
};
const addTask = (task: Task<TaskArgs, TaskResult>) => {
if (taskQueue().length < batchSize()) {
setTaskQueue((currentTaskQueue) => [...currentTaskQueue, task]);
} else {
executeTasks();
setTaskQueue([task]);
}
};
const removeTask = (id: number) => {
setTaskQueue((currentTaskQueue) => currentTaskQueue.filter((task) => task.id !== id));
};
// enqueue task and wait response
const exec = async (args: TaskArgs, signal?: AbortSignal): Promise<TaskResult> => {
const { promise, resolve, reject } = promiseWithCallbacks<TaskResult>();
const id = nextId();
const newTask: Task<TaskArgs, TaskResult> = { id, args, resolve, reject };
addTask(newTask);
launchTimer();
signal?.addEventListener('abort', () => {
removeTask(id);
reject(new Error('AbortError'));
});
return promise;
};
return { exec };
};
export default useBatch;

View File

@@ -4,9 +4,10 @@ import { type Event as NostrEvent, type Filter, Kind } from 'nostr-tools';
import useConfig from '@/core/useConfig';
import { genericEvent } from '@/nostr/event';
import useBatch, { type Task } from '@/nostr/useBatch';
import usePool from '@/nostr/usePool';
import useStats from '@/nostr/useStats';
import ObservableTask from '@/utils/batch/ObservableTask';
import useBatch from '@/utils/batch/useBatch';
type ProfileTask = { type: 'Profile'; pubkey: string };
type EventTask = { type: 'Event'; eventId: string };
@@ -24,15 +25,21 @@ type ParameterizedReplaceableEventTask = {
type TaskArg =
| ProfileTask
| EventTask
| FollowingsTask
| ReactionsTask
| ZapReceiptsTask
| RepostsTask
| FollowingsTask
| ParameterizedReplaceableEventTask;
type BatchedEvents = { completed: boolean; events: NostrEvent[] };
export class BatchedEventsTask extends ObservableTask<TaskArg, NostrEvent[]> {
addEvent(event: NostrEvent) {
this.updateWith((current) => [...(current ?? []), event]);
}
type TaskRes = Accessor<BatchedEvents>;
firstEventPromise(): Promise<NostrEvent> {
return this.toUpdatePromise().then((events) => events[0]);
}
}
let count = 0;
@@ -42,9 +49,6 @@ setInterval(() => {
setActiveBatchSubscriptions(count);
}, 1000);
const EmptyBatchedEvents = { events: [], completed: true };
const emptyBatchedEvents = () => EmptyBatchedEvents;
const isParameterizedReplaceableEvent = (event: NostrEvent) =>
event.kind >= 30000 && event.kind < 40000;
@@ -54,41 +58,41 @@ const keyForParameterizedReplaceableEvent = ({
identifier,
}: ParameterizedReplaceableEventTask) => `${kind}:${author}:${identifier}`;
export const { exec } = useBatch<TaskArg, TaskRes>(() => ({
const { addTask, removeTask } = useBatch<BatchedEventsTask>(() => ({
interval: 2000,
batchSize: 150,
executor: (tasks) => {
const profileTasks = new Map<string, Task<TaskArg, TaskRes>[]>();
const eventTasks = new Map<string, Task<TaskArg, TaskRes>[]>();
const reactionsTasks = new Map<string, Task<TaskArg, TaskRes>[]>();
const repostsTasks = new Map<string, Task<TaskArg, TaskRes>[]>();
const zapReceiptsTasks = new Map<string, Task<TaskArg, TaskRes>[]>();
const parameterizedReplaceableEventsTasks = new Map<string, Task<TaskArg, TaskRes>[]>();
const followingsTasks = new Map<string, Task<TaskArg, TaskRes>[]>();
const profileTasks = new Map<string, BatchedEventsTask[]>();
const eventTasks = new Map<string, BatchedEventsTask[]>();
const reactionsTasks = new Map<string, BatchedEventsTask[]>();
const repostsTasks = new Map<string, BatchedEventsTask[]>();
const zapReceiptsTasks = new Map<string, BatchedEventsTask[]>();
const parameterizedReplaceableEventsTasks = new Map<string, BatchedEventsTask[]>();
const followingsTasks = new Map<string, BatchedEventsTask[]>();
tasks.forEach((task) => {
if (task.args.type === 'Event') {
const current = eventTasks.get(task.args.eventId) ?? [];
eventTasks.set(task.args.eventId, [...current, task]);
} else if (task.args.type === 'Profile') {
const current = profileTasks.get(task.args.pubkey) ?? [];
profileTasks.set(task.args.pubkey, [...current, task]);
} else if (task.args.type === 'Reactions') {
const current = reactionsTasks.get(task.args.mentionedEventId) ?? [];
reactionsTasks.set(task.args.mentionedEventId, [...current, task]);
} else if (task.args.type === 'Reposts') {
const current = repostsTasks.get(task.args.mentionedEventId) ?? [];
repostsTasks.set(task.args.mentionedEventId, [...current, task]);
} else if (task.args.type === 'ZapReceipts') {
const current = zapReceiptsTasks.get(task.args.mentionedEventId) ?? [];
repostsTasks.set(task.args.mentionedEventId, [...current, task]);
} else if (task.args.type === 'ParameterizedReplaceableEvent') {
const key = keyForParameterizedReplaceableEvent(task.args);
if (task.req.type === 'Event') {
const current = eventTasks.get(task.req.eventId) ?? [];
eventTasks.set(task.req.eventId, [...current, task]);
} else if (task.req.type === 'Profile') {
const current = profileTasks.get(task.req.pubkey) ?? [];
profileTasks.set(task.req.pubkey, [...current, task]);
} else if (task.req.type === 'Reactions') {
const current = reactionsTasks.get(task.req.mentionedEventId) ?? [];
reactionsTasks.set(task.req.mentionedEventId, [...current, task]);
} else if (task.req.type === 'Reposts') {
const current = repostsTasks.get(task.req.mentionedEventId) ?? [];
repostsTasks.set(task.req.mentionedEventId, [...current, task]);
} else if (task.req.type === 'ZapReceipts') {
const current = zapReceiptsTasks.get(task.req.mentionedEventId) ?? [];
repostsTasks.set(task.req.mentionedEventId, [...current, task]);
} else if (task.req.type === 'ParameterizedReplaceableEvent') {
const key = keyForParameterizedReplaceableEvent(task.req);
const current = parameterizedReplaceableEventsTasks.get(key) ?? [];
parameterizedReplaceableEventsTasks.set(key, [...current, task]);
} else if (task.args.type === 'Followings') {
const current = followingsTasks.get(task.args.pubkey) ?? [];
followingsTasks.set(task.args.pubkey, [...current, task]);
} else if (task.req.type === 'Followings') {
const current = followingsTasks.get(task.req.pubkey) ?? [];
followingsTasks.set(task.req.pubkey, [...current, task]);
}
});
@@ -121,9 +125,9 @@ export const { exec } = useBatch<TaskArg, TaskRes>(() => ({
}
if (parameterizedReplaceableEventsTasks.size > 0) {
Array.from(parameterizedReplaceableEventsTasks.values()).forEach(([firstTask]) => {
if (firstTask.args.type !== 'ParameterizedReplaceableEvent') return;
if (firstTask.req.type !== 'ParameterizedReplaceableEvent') return;
const {
args: { kind, author, identifier },
req: { kind, author, identifier },
} = firstTask;
filters.push({ kinds: [kind], authors: [author], '#d': [identifier] });
});
@@ -131,30 +135,15 @@ export const { exec } = useBatch<TaskArg, TaskRes>(() => ({
if (filters.length === 0) return;
const signals = new Map<number, Signal<BatchedEvents>>();
const resolveTasks = (registeredTasks: Task<TaskArg, TaskRes>[], event: NostrEvent) => {
const resolveTasks = (registeredTasks: BatchedEventsTask[], event: NostrEvent) => {
registeredTasks.forEach((task) => {
const signal = signals.get(task.id) ?? createSignal({ events: [], completed: false });
signals.set(task.id, signal);
const [batchedEvents, setBatchedEvents] = signal;
setBatchedEvents((current) => ({
...current,
events: [...current.events, event],
}));
task.resolve(batchedEvents);
task.updateWith((current) => [...(current ?? []), event]);
});
};
const finalizeTasks = () => {
tasks.forEach((task) => {
const signal = signals.get(task.id);
if (signal != null) {
const setEvents = signal[1];
setEvents((current) => ({ ...current, completed: true }));
} else {
task.resolve(emptyBatchedEvents);
}
task.complete();
});
};
@@ -222,6 +211,17 @@ export const { exec } = useBatch<TaskArg, TaskRes>(() => ({
},
}));
export const registerTask = ({
task,
signal,
}: {
task: BatchedEventsTask;
signal?: AbortSignal;
}) => {
addTask(task);
signal?.addEventListener('abort', () => removeTask(task));
};
export const pickLatestEvent = (events: NostrEvent[]): NostrEvent | null => {
if (events.length === 0) return null;
return events.reduce((a, b) => (a.created_at > b.created_at ? a : b));

View File

@@ -3,7 +3,7 @@ import { createMemo } from 'solid-js';
import { createQuery, useQueryClient, type CreateQueryResult } from '@tanstack/solid-query';
import { Event as NostrEvent } from 'nostr-tools';
import { exec } from '@/nostr/useBatchedEvents';
import { registerTask, BatchedEventsTask } from '@/nostr/useBatchedEvents';
import timeout from '@/utils/timeout';
export type UseEventProps = {
@@ -24,11 +24,11 @@ const useEvent = (propsProvider: () => UseEventProps | null): UseEvent => {
const [, currentProps] = queryKey;
if (currentProps == null) return null;
const { eventId } = currentProps;
const promise = exec({ type: 'Event', eventId }, signal).then((batchedEvents) => {
const event = batchedEvents().events[0];
if (event == null) throw new Error(`event not found: ${eventId}`);
return event;
const task = new BatchedEventsTask({ type: 'Event', eventId });
const promise = task.firstEventPromise().catch(() => {
throw new Error(`event not found: ${eventId}`);
});
registerTask({ task, signal });
return timeout(15000, `useEvent: ${eventId}`)(promise);
},
{

View File

@@ -4,7 +4,7 @@ import { createQuery, useQueryClient, type CreateQueryResult } from '@tanstack/s
import { Event as NostrEvent } from 'nostr-tools';
import { genericEvent } from '@/nostr/event';
import { exec, pickLatestEvent } from '@/nostr/useBatchedEvents';
import { registerTask, BatchedEventsTask, pickLatestEvent } from '@/nostr/useBatchedEvents';
import timeout from '@/utils/timeout';
type Following = {
@@ -36,21 +36,15 @@ const useFollowings = (propsProvider: () => UseFollowingsProps | null): UseFollo
const [, currentProps] = queryKey;
if (currentProps == null) return Promise.resolve(null);
const { pubkey } = currentProps;
const promise = exec({ type: 'Followings', pubkey }, signal).then((batchedEvents) => {
const latestEvent = () => {
const latest = pickLatestEvent(batchedEvents().events);
if (latest == null) throw new Error(`followings not found: ${pubkey}`);
return latest;
};
observable(batchedEvents).subscribe(() => {
try {
queryClient.setQueryData(queryKey, latestEvent());
} catch (err) {
console.error('error occurred while updating followings cache: ', err);
}
const task = new BatchedEventsTask({ type: 'Followings', pubkey });
const promise = task.firstEventPromise().catch(() => {
throw new Error(`followings not found: ${pubkey}`);
});
return latestEvent();
task.onUpdate((events) => {
const latest = pickLatestEvent(events);
queryClient.setQueryData(queryKey, latest);
});
registerTask({ task, signal });
return timeout(15000, `useFollowings: ${pubkey}`)(promise);
},
{

View File

@@ -3,7 +3,7 @@ import { createMemo, observable } from 'solid-js';
import { createQuery, useQueryClient, type CreateQueryResult } from '@tanstack/solid-query';
import { Event as NostrEvent } from 'nostr-tools';
import { exec, pickLatestEvent } from '@/nostr/useBatchedEvents';
import { registerTask, BatchedEventsTask, pickLatestEvent } from '@/nostr/useBatchedEvents';
import timeout from '@/utils/timeout';
// Parameterized Replaceable Event
@@ -33,32 +33,22 @@ const useParameterizedReplaceableEvent = (
if (currentProps == null) return Promise.resolve(null);
const { kind, author, identifier } = currentProps;
const promise = exec(
{ type: 'ParameterizedReplaceableEvent', kind, author, identifier },
signal,
).then((batchedEvents) => {
const latestEvent = () => {
const latest = pickLatestEvent(batchedEvents().events);
if (latest == null)
const task = new BatchedEventsTask({
type: 'ParameterizedReplaceableEvent',
kind,
author,
identifier,
});
const promise = task.firstEventPromise().catch(() => {
throw new Error(
`parameterized replaceable event not found: ${kind}:${author}:${identifier}`,
);
return latest;
};
observable(batchedEvents).subscribe(() => {
try {
queryClient.setQueryData(queryKey, latestEvent());
} catch (err) {
console.error(
'error occurred while updating parameterized replaceable event cache: ',
err,
);
}
});
return latestEvent();
task.onUpdate((events) => {
const latest = pickLatestEvent(events);
queryClient.setQueryData(queryKey, latest);
});
registerTask({ task, signal });
return timeout(
15000,
`useParameterizedReplaceableEvent: ${kind}:${author}:${identifier}`,

View File

@@ -9,7 +9,7 @@ import {
import { Event as NostrEvent } from 'nostr-tools';
import { Profile, ProfileWithOtherProperties, safeParseProfile } from '@/nostr/event/Profile';
import { exec, pickLatestEvent } from '@/nostr/useBatchedEvents';
import { BatchedEventsTask, pickLatestEvent, registerTask } from '@/nostr/useBatchedEvents';
import timeout from '@/utils/timeout';
export type UseProfileProps = {
@@ -33,39 +33,6 @@ export type UseProfiles = {
type UseProfileQueryKey = readonly ['useProfile', UseProfileProps | null];
const getProfile = ({
queryKey,
signal,
queryClient,
}: {
queryKey: UseProfileQueryKey;
signal?: AbortSignal;
queryClient: QueryClient;
}): Promise<NostrEvent | null> => {
const [, currentProps] = queryKey;
if (currentProps == null) return Promise.resolve(null);
const { pubkey } = currentProps;
const promise = exec({ type: 'Profile', pubkey }, signal).then((batchedEvents) => {
const latestEvent = () => {
const latest = pickLatestEvent(batchedEvents().events);
if (latest == null) throw new Error(`profile not found: ${pubkey}`);
return latest;
};
observable(batchedEvents).subscribe(() => {
try {
queryClient.setQueryData(queryKey, latestEvent());
} catch (err) {
console.error('error occurred while updating profile cache: ', err);
}
});
return latestEvent();
});
// TODO timeoutと同時にsignalでキャンセルするようにしたい
return timeout(3000, `useProfile: ${pubkey}`)(promise);
};
const useProfile = (propsProvider: () => UseProfileProps | null): UseProfile => {
const queryClient = useQueryClient();
const props = createMemo(propsProvider);
@@ -73,7 +40,21 @@ const useProfile = (propsProvider: () => UseProfileProps | null): UseProfile =>
const query = createQuery(
genQueryKey,
({ queryKey, signal }) => getProfile({ queryKey, signal, queryClient }),
({ queryKey, signal }) => {
const [, currentProps] = queryKey;
if (currentProps == null) return null;
const { pubkey } = currentProps;
const task = new BatchedEventsTask({ type: 'Profile', pubkey });
const promise = task.firstEventPromise().catch(() => {
throw new Error(`profile not found: ${pubkey}`);
});
task.onUpdate((events) => {
const latest = pickLatestEvent(events);
queryClient.setQueryData(queryKey, latest);
});
registerTask({ task, signal });
return timeout(3000, `useProfile: ${pubkey}`)(promise);
},
{
// Profiles are updated occasionally, so a short staleTime is used here.
// cacheTime is long so that the user see profiles instantly.

View File

@@ -4,7 +4,7 @@ import { createQuery, useQueryClient, type CreateQueryResult } from '@tanstack/s
import { Event as NostrEvent } from 'nostr-tools';
import useConfig from '@/core/useConfig';
import { exec } from '@/nostr/useBatchedEvents';
import { registerTask, BatchedEventsTask } from '@/nostr/useBatchedEvents';
import timeout from '@/utils/timeout';
export type UseReactionsProps = {
@@ -33,17 +33,13 @@ const useReactions = (propsProvider: () => UseReactionsProps | null): UseReactio
({ queryKey, signal }) => {
const [, currentProps] = queryKey;
if (currentProps == null) return [];
const { eventId: mentionedEventId } = currentProps;
const promise = exec({ type: 'Reactions', mentionedEventId }, signal).then(
(batchedEvents) => {
const events = () => batchedEvents().events;
observable(batchedEvents).subscribe(() => {
queryClient.setQueryData(queryKey, events());
const task = new BatchedEventsTask({ type: 'Reactions', mentionedEventId });
const promise = task.toUpdatePromise().catch(() => []);
task.onUpdate((events) => {
queryClient.setQueryData(queryKey, events);
});
return events();
},
);
registerTask({ task, signal });
return timeout(15000, `useReactions: ${mentionedEventId}`)(promise);
},
{

View File

@@ -4,7 +4,7 @@ import { createQuery, useQueryClient, type CreateQueryResult } from '@tanstack/s
import { Event as NostrEvent } from 'nostr-tools';
import useConfig from '@/core/useConfig';
import { exec } from '@/nostr/useBatchedEvents';
import { BatchedEventsTask, exec, registerTask } from '@/nostr/useBatchedEvents';
import timeout from '@/utils/timeout';
export type UseRepostsProps = {
@@ -30,13 +30,12 @@ const useReposts = (propsProvider: () => UseRepostsProps): UseReposts => {
const [, currentProps] = queryKey;
if (currentProps == null) return [];
const { eventId: mentionedEventId } = currentProps;
const promise = exec({ type: 'Reposts', mentionedEventId }, signal).then((batchedEvents) => {
const events = () => batchedEvents().events;
observable(batchedEvents).subscribe(() => {
queryClient.setQueryData(queryKey, events());
});
return events();
const task = new BatchedEventsTask({ type: 'Reposts', mentionedEventId });
const promise = task.toUpdatePromise().catch(() => []);
task.onUpdate((events) => {
queryClient.setQueryData(queryKey, events);
});
registerTask({ task, signal });
return timeout(15000, `useReposts: ${mentionedEventId}`)(promise);
},
{

View File

@@ -36,46 +36,41 @@ export default class ObservableTask<BatchRequest, BatchResponse> {
#completeListeners: (() => void)[] = [];
#promise: Promise<BatchResponse>;
constructor(req: BatchRequest) {
this.id = nextId();
this.req = req;
this.#promise = new Promise((resolve, reject) => {
this.onComplete(() => {
if (this.res != null) {
resolve(this.res);
} else {
reject();
}
});
});
}
#executeUpdateListeners() {
const { res } = this;
if (res != null) {
#executeUpdateListeners(res: BatchResponse) {
this.#updateListeners.forEach((listener) => {
listener(res);
});
}
#executeCompleteListeners() {
this.#completeListeners.forEach((listener) => {
listener();
});
}
update(res: BatchResponse) {
if (this.isCompleted) {
throw new Error('completed task cannot be updated');
}
this.res = res;
this.#executeUpdateListeners();
this.#executeUpdateListeners(res);
}
updateWith(f: (current: BatchResponse | undefined) => BatchResponse) {
this.res = f(this.res);
this.#executeUpdateListeners();
if (this.isCompleted) {
throw new Error('completed task cannot be updated');
}
this.update(f(this.res));
}
complete() {
this.isCompleted = true;
this.#completeListeners.forEach((listener) => {
listener();
});
this.#executeCompleteListeners();
}
onUpdate(f: (res: BatchResponse) => void) {
@@ -91,7 +86,30 @@ export default class ObservableTask<BatchRequest, BatchResponse> {
this.#completeListeners.push(f);
}
toPromise(): Promise<BatchResponse> {
return this.#promise;
toUpdatePromise(): Promise<BatchResponse> {
if (this.isCompleted && this.res != null) {
return Promise.resolve(this.res);
}
const promise = new Promise<BatchResponse>((resolve) => {
this.onUpdate((res) => resolve(res));
});
return Promise.race([promise, this.toCompletePromise()]);
}
toCompletePromise(): Promise<BatchResponse> {
if (this.isCompleted && this.res != null) {
return Promise.resolve(this.res);
}
return new Promise((resolve, reject) => {
this.onComplete(() => {
if (this.res != null) {
resolve(this.res);
} else {
reject(new Error('result was not set'));
}
});
});
}
}

View File

@@ -0,0 +1,55 @@
import { createSignal, createMemo } from 'solid-js';
export type UseBatchProps<Task> = {
executor: (tasks: Task[]) => void;
interval?: number;
batchSize?: number;
};
const useBatch = <Task>(propsProvider: () => UseBatchProps<Task>) => {
const props = createMemo(propsProvider);
const batchSize = createMemo(() => props().batchSize ?? 100);
const interval = createMemo(() => props().interval ?? 2000);
const [taskQueue, setTaskQueue] = createSignal<Task[]>([]);
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const executeTasks = () => {
const { executor } = props();
const currentTaskQueue = taskQueue();
if (currentTaskQueue.length > 0) {
setTaskQueue([]);
executor(currentTaskQueue);
}
if (timeoutId != null) clearTimeout(timeoutId);
timeoutId = undefined;
};
const launchTimer = () => {
if (timeoutId == null) {
timeoutId = setTimeout(() => {
executeTasks();
}, interval());
}
};
const addTask = (task: Task) => {
if (taskQueue().length < batchSize()) {
setTaskQueue((currentTaskQueue) => [...currentTaskQueue, task]);
} else {
executeTasks();
setTaskQueue([task]);
}
launchTimer();
};
const removeTask = (task: Task) => {
setTaskQueue((currentTaskQueue) => currentTaskQueue.filter((e) => e !== task));
};
return { addTask, removeTask };
};
export default useBatch;