This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ec2450754a92d7b9d5f5704783757f880da92c51 Author: Benoit TELLIER <[email protected]> AuthorDate: Thu Jan 8 13:56:30 2026 +0100 JAMES-4159 s/executionRate/maxConcurrency/g --- docs/modules/servers/partials/configure/listeners.adoc | 2 +- event-bus/api/src/main/java/org/apache/james/events/EventBus.java | 6 +++--- .../api/src/test/java/org/apache/james/events/GroupContract.java | 2 +- .../api/src/test/java/org/apache/james/events/KeyContract.java | 2 +- .../src/main/java/org/apache/james/events/EventDispatcher.java | 4 ++-- .../src/main/java/org/apache/james/events/GroupRegistration.java | 6 +++--- .../java/org/apache/james/events/GroupRegistrationHandler.java | 6 +++--- .../main/java/org/apache/james/events/KeyRegistrationHandler.java | 8 ++++---- .../src/main/java/org/apache/james/events/RabbitMQEventBus.java | 2 +- .../in-vm/src/main/java/org/apache/james/events/InVMEventBus.java | 6 +++--- .../java/org/apache/james/events/delivery/InVmEventDelivery.java | 2 +- .../java/org/apache/james/modules/mailbox/DefaultEventModule.java | 2 +- 12 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/modules/servers/partials/configure/listeners.adoc b/docs/modules/servers/partials/configure/listeners.adoc index 39ea610964..f4b9d657ef 100644 --- a/docs/modules/servers/partials/configure/listeners.adoc +++ b/docs/modules/servers/partials/configure/listeners.adoc @@ -26,7 +26,7 @@ If *true* the execution will be scheduled in a reactor elastic scheduler. If *fa Already provided additional listeners are documented below. -The <executionRate> property controls the number of events processed in parallel. Defaults to 10. +The <maxConcurrency> property controls the number of events processed in parallel. Defaults to 10. The <executionTimeout> property (duration) controls the timeout for the execution of each listener. None if omitted. diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java index 19f3c98fa2..61723074a8 100644 --- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java +++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java @@ -33,11 +33,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface EventBus { - record Configuration(int executionRate, Optional<Duration> executionTimeout) { - public static Configuration DEFAULT = new Configuration(EXECUTION_RATE, Optional.empty()); + record Configuration(int maxConcurrency, Optional<Duration> executionTimeout) { + public static Configuration DEFAULT = new Configuration(DEFAULT_MAX_CONCURRENCY, Optional.empty()); } - int EXECUTION_RATE = 10; + int DEFAULT_MAX_CONCURRENCY = 10; interface StructuredLoggingFields { String EVENT_ID = "eventId"; diff --git a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java index 7b6efd5829..2615caa9b3 100644 --- a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java +++ b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java @@ -82,7 +82,7 @@ public interface GroupContract { @Override public void event(Event event) throws Exception { - if (inFlight.incrementAndGet() > EventBus.EXECUTION_RATE) { + if (inFlight.incrementAndGet() > EventBus.DEFAULT_MAX_CONCURRENCY) { rateExceeded.set(true); } nbCalls.incrementAndGet(); diff --git a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java index 162bc37cf7..0e0812e8d3 100644 --- a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java +++ b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java @@ -73,7 +73,7 @@ public interface KeyContract extends EventBusContract { AtomicBoolean rateExceeded = new AtomicBoolean(false); Mono.from(eventBus().register(event -> { - if (nbCalls.get() - finishedExecutions.get() > EventBus.EXECUTION_RATE) { + if (nbCalls.get() - finishedExecutions.get() > EventBus.DEFAULT_MAX_CONCURRENCY) { rateExceeded.set(true); } nbCalls.incrementAndGet(); diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java index 32624cdb86..f0316ea3e2 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java @@ -137,9 +137,9 @@ public class EventDispatcher { private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) { return Flux.fromIterable(keys) .flatMap(key -> Flux.fromIterable(localListenerRegistry.getLocalListeners(key)) - .map(listener -> Tuples.of(key, listener)), EventBus.EXECUTION_RATE) + .map(listener -> Tuples.of(key, listener)), EventBus.DEFAULT_MAX_CONCURRENCY) .filter(pair -> pair.getT2().getExecutionMode() == EventListener.ExecutionMode.SYNCHRONOUS) - .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()), EventBus.EXECUTION_RATE) + .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()), EventBus.DEFAULT_MAX_CONCURRENCY) .then(); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java index 5dcf4734b0..bd9e72f4a9 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java @@ -107,7 +107,7 @@ class GroupRegistration implements Registration { } GroupRegistration start() { - scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "group-handler"); + scheduler = Schedulers.newBoundedElastic(configurations.eventBusConfiguration().maxConcurrency(), ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "group-handler"); receiverSubscriber = Optional .of(createGroupWorkQueue() .then(retryHandler.createRetryExchange(queueName)) @@ -139,11 +139,11 @@ class GroupRegistration implements Registration { private Disposable consumeWorkQueue() { return Flux.using( receiverProvider::createReceiver, - receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(configurations.eventBusConfiguration().executionRate())), + receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(configurations.eventBusConfiguration().maxConcurrency())), Receiver::close) .publishOn(Schedulers.parallel()) .filter(delivery -> Objects.nonNull(delivery.getBody())) - .flatMap(this::deliver, configurations.eventBusConfiguration().executionRate()) + .flatMap(this::deliver, configurations.eventBusConfiguration().maxConcurrency()) .subscribeOn(scheduler) .subscribe(); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index 5b6ecd08c5..6f4b540f53 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -93,7 +93,7 @@ public class GroupRegistrationHandler { this.configurations = configurations; this.groupRegistrations = new ConcurrentHashMap<>(); this.queueName = namingStrategy.workQueue(GROUP); - this.scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler"); + this.scheduler = Schedulers.newBoundedElastic(configurations.eventBusConfiguration().maxConcurrency(), ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler"); this.consumer = Optional.empty(); } @@ -124,10 +124,10 @@ public class GroupRegistrationHandler { private Disposable consumeWorkQueue() { return Flux.using( receiverProvider::createReceiver, - receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(configurations.eventBusConfiguration().executionRate())), + receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(configurations.eventBusConfiguration().maxConcurrency())), Receiver::close) .filter(delivery -> Objects.nonNull(delivery.getBody())) - .flatMap(this::deliver, configurations.eventBusConfiguration().executionRate()) + .flatMap(this::deliver, configurations.eventBusConfiguration().maxConcurrency()) .subscribeOn(scheduler) .subscribe(); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java index f7d5aad970..ab378fbec6 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java @@ -95,14 +95,14 @@ class KeyRegistrationHandler { } void start() { - scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "keys-handler"); + scheduler = Schedulers.newBoundedElastic(EventBus.DEFAULT_MAX_CONCURRENCY, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "keys-handler"); declareQueue(); newSubscription = Flux.using( receiverProvider::createReceiver, - receiver -> receiver.consumeAutoAck(registrationQueue.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE)), + receiver -> receiver.consumeAutoAck(registrationQueue.asString(), new ConsumeOptions().qos(EventBus.DEFAULT_MAX_CONCURRENCY)), Receiver::close) - .flatMap(this::handleDelivery, EventBus.EXECUTION_RATE) + .flatMap(this::handleDelivery, EventBus.DEFAULT_MAX_CONCURRENCY) .subscribeOn(scheduler) .subscribe(); receiverSubscriber = Optional.of(newSubscription); @@ -194,7 +194,7 @@ class KeyRegistrationHandler { List<Event> events = toEvent(delivery); return Flux.fromIterable(listenersToCall) - .flatMap(listener -> executeListener(listener, events, registrationKey), EventBus.EXECUTION_RATE) + .flatMap(listener -> executeListener(listener, events, registrationKey), EventBus.DEFAULT_MAX_CONCURRENCY) .then(); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java index b8261bca63..558866651d 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java @@ -68,7 +68,7 @@ public class RabbitMQEventBus implements EventBus, Startable { public record Configurations(RabbitMQConfiguration rabbitMQConfiguration, RetryBackoffConfiguration retryBackoff, EventBus.Configuration eventBusConfiguration) { public Configurations(RabbitMQConfiguration rabbitMQConfiguration, RetryBackoffConfiguration retryBackoff) { - this(rabbitMQConfiguration, retryBackoff, new Configuration(EventBus.EXECUTION_RATE, Optional.empty())); + this(rabbitMQConfiguration, retryBackoff, new Configuration(EventBus.DEFAULT_MAX_CONCURRENCY, Optional.empty())); } } diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java index bd7c15a9cd..5ebe475d18 100644 --- a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java +++ b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java @@ -134,19 +134,19 @@ public class InVMEventBus implements EventBus { private Mono<Void> keyDeliveries(Event event, Set<RegistrationKey> keys) { return Flux.fromIterable(registeredListenersByKeys(keys)) - .flatMap(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()), EventBus.EXECUTION_RATE) + .flatMap(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()), EventBus.DEFAULT_MAX_CONCURRENCY) .then(); } private Mono<Void> keyDeliveries(List<Event> events, Set<RegistrationKey> keys) { return Flux.fromIterable(registeredListenersByKeys(keys)) - .flatMap(listener -> eventDelivery.deliver(listener, events, EventDelivery.DeliveryOption.none()), EventBus.EXECUTION_RATE) + .flatMap(listener -> eventDelivery.deliver(listener, events, EventDelivery.DeliveryOption.none()), EventBus.DEFAULT_MAX_CONCURRENCY) .then(); } private Mono<Void> groupDeliveries(List<Event> events) { return Flux.fromIterable(groups.entrySet()) - .flatMap(entry -> groupDelivery(events, entry.getValue(), entry.getKey()), EventBus.EXECUTION_RATE) + .flatMap(entry -> groupDelivery(events, entry.getValue(), entry.getKey()), EventBus.DEFAULT_MAX_CONCURRENCY) .then(); } diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java index 0f6d2dbefa..4f124fa67f 100644 --- a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java +++ b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java @@ -53,7 +53,7 @@ public class InVmEventDelivery implements EventDelivery { @VisibleForTesting public InVmEventDelivery(MetricFactory metricFactory) { this.metricFactory = metricFactory; - this.configuration = new EventBus.Configuration(EventBus.EXECUTION_RATE, Optional.empty()); + this.configuration = new EventBus.Configuration(EventBus.DEFAULT_MAX_CONCURRENCY, Optional.empty()); } @Inject diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java index 6341002ec4..bfeda1cd68 100644 --- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java +++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java @@ -92,7 +92,7 @@ public class DefaultEventModule extends AbstractModule { EventBus.Configuration providesEventBusConfiguration(ConfigurationProvider configurationProvider) throws ConfigurationException { HierarchicalConfiguration<ImmutableNode> configuration = configurationProvider.getConfiguration("listeners"); - return new EventBus.Configuration(configuration.getInt("executionRate", EventBus.EXECUTION_RATE), + return new EventBus.Configuration(configuration.getInt("maxConcurrency", EventBus.DEFAULT_MAX_CONCURRENCY), Optional.ofNullable(configuration.getString("executionTimeout", null)).map(DurationParser::parse)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
