import { ulid } from "ulid"; export type Task = { key: string; execute: () => Promise; }; type TaskStatus = { id: string; task: Task; } & ( | { status: "pending" | "completed" | "failed"; } | { status: "running"; promise: Promise; } ); type Options = { maxConcurrency: number; }; export class TaskExecutor { private taskStatuses: TaskStatus[] = []; private executionPromise?: { resolve: () => void; reject: (reason?: unknown) => void; promise: Promise; }; private options: Options; constructor(options?: Partial) { this.options = { maxConcurrency: 10, ...options, }; } private setExecutionPromise() { let resolveExecution: (() => void) | undefined; let rejectExecution: ((reason?: unknown) => void) | undefined; const promise = new Promise((resolve, reject) => { resolveExecution = resolve; rejectExecution = reject; }); if (resolveExecution === undefined || rejectExecution === undefined) { throw new Error("Illegal state: Promise not created"); } this.executionPromise = { resolve: resolveExecution, reject: rejectExecution, promise, }; } public setTasks(tasks: Task[]) { const newTaskStatuses: TaskStatus[] = tasks.map((task) => ({ id: `${task.key}-${ulid()}`, status: "pending", task, })); this.taskStatuses.push(...newTaskStatuses); } private get pendingTasks() { return this.taskStatuses.filter((task) => task.status === "pending"); } private get runningTasks() { return this.taskStatuses.filter((task) => task.status === "running"); } private updateStatus(id: string, status: TaskStatus) { const found = this.taskStatuses.find((task) => task.id === id); if (!found) { throw new Error(`Task not found: ${id}`); } Object.assign(found, status); } public async execute() { this.setExecutionPromise(); this.refresh(); await this.executionPromise?.promise; } private refresh() { if (this.runningTasks.length === 0 && this.pendingTasks.length === 0) { this.executionPromise?.resolve(); console.log("execution completed."); return; } const remainingTaskCount = this.options.maxConcurrency - this.runningTasks.length; if (remainingTaskCount <= 0) { return; } for (const task of this.pendingTasks.slice(0, remainingTaskCount)) { this.updateStatus(task.id, { id: task.id, status: "running", task: task.task, promise: (async () => { try { await task.task.execute(); this.updateStatus(task.id, { id: task.id, status: "completed", task: task.task, }); } catch (error) { console.error(error); this.updateStatus(task.id, { id: task.id, status: "failed", task: task.task, }); } finally { this.refresh(); } })(), }); } } }