mirror of
https://github.com/aljazceru/ditto.git
synced 2026-01-13 18:44:22 +01:00
Limit firehose concurrency
This commit is contained in:
@@ -242,6 +242,10 @@ class Conf {
|
||||
static get firehoseEnabled(): boolean {
|
||||
return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true;
|
||||
}
|
||||
/** Number of events the firehose is allowed to process at one time before they have to wait in a queue. */
|
||||
static get firehoseConcurrency(): number {
|
||||
return Math.ceil(Number(Deno.env.get('FIREHOSE_CONCURRENCY') ?? (Conf.pg.poolSize * 0.25)));
|
||||
}
|
||||
/** Whether to enable Ditto cron jobs. */
|
||||
static get cronEnabled(): boolean {
|
||||
return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true;
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import { Semaphore } from '@lambdalisue/async';
|
||||
import { Stickynotes } from '@soapbox/stickynotes';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { firehoseEventCounter } from '@/metrics.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { nostrNow } from '@/utils.ts';
|
||||
@@ -7,6 +9,7 @@ import { nostrNow } from '@/utils.ts';
|
||||
import * as pipeline from '@/pipeline.ts';
|
||||
|
||||
const console = new Stickynotes('ditto:firehose');
|
||||
const sem = new Semaphore(Conf.firehoseConcurrency);
|
||||
|
||||
/**
|
||||
* This function watches events on all known relays and performs
|
||||
@@ -22,9 +25,13 @@ export async function startFirehose(): Promise<void> {
|
||||
console.debug(`NostrEvent<${event.kind}> ${event.id}`);
|
||||
firehoseEventCounter.inc({ kind: event.kind });
|
||||
|
||||
pipeline
|
||||
.handleEvent(event, AbortSignal.timeout(5000))
|
||||
.catch(() => {});
|
||||
sem.lock(async () => {
|
||||
try {
|
||||
await pipeline.handleEvent(event, AbortSignal.timeout(5000));
|
||||
} catch (e) {
|
||||
console.warn(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user