This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d3908cda7d7fa8a767432a7019b883ff1fa49548 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Fri Jun 14 10:52:04 2019 +0200 JAMES-2786 Use a channel pool for EventBus --- .../james/mailbox/events/EventBusConcurrentTestContract.java | 12 ++++++------ .../org/apache/james/mailbox/events/GroupRegistration.java | 1 + .../org/apache/james/mailbox/events/RabbitMQEventBus.java | 12 +++++++++++- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java index 2f6e9cb..738969f 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java @@ -74,7 +74,7 @@ public interface EventBusConcurrentTestContract { int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC ConcurrentTestRunner.builder() - .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS)) + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block()) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) .runSuccessfullyWithin(FIVE_SECONDS); @@ -97,7 +97,7 @@ public interface EventBusConcurrentTestContract { int totalEventBus = 1; ConcurrentTestRunner.builder() - .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS)) + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block()) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) .runSuccessfullyWithin(FIVE_SECONDS); @@ -127,7 +127,7 @@ public interface EventBusConcurrentTestContract { int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS; ConcurrentTestRunner.builder() - .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS)) + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block()) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) .runSuccessfullyWithin(FIVE_SECONDS); @@ -159,7 +159,7 @@ public interface EventBusConcurrentTestContract { int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC ConcurrentTestRunner.builder() - .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS)) + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block()) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) .runSuccessfullyWithin(FIVE_SECONDS); @@ -187,7 +187,7 @@ public interface EventBusConcurrentTestContract { int totalEventBus = 2; // eventBus1 + eventBus2 ConcurrentTestRunner.builder() - .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS)) + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block()) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) .runSuccessfullyWithin(FIVE_SECONDS); @@ -223,7 +223,7 @@ public interface EventBusConcurrentTestContract { int totalEventDeliveredByKeys = totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS; ConcurrentTestRunner.builder() - .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS)) + .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block()) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) .runSuccessfullyWithin(FIVE_SECONDS); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index 5164c2d..bbdd46b 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -140,6 +140,7 @@ class GroupRegistration implements Registration { int currentRetryCount = getRetryCount(acknowledgableDelivery); return delayGenerator.delayIfHaveTo(currentRetryCount) + .publishOn(Schedulers.elastic()) .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event)))) .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable)) .then(Mono.fromRunnable(acknowledgableDelivery::ack)) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index 8ec96d6..784baab 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -34,11 +34,15 @@ import com.google.common.base.Preconditions; import com.rabbitmq.client.Connection; import reactor.core.publisher.Mono; +import reactor.rabbitmq.ChannelPool; +import reactor.rabbitmq.ChannelPoolFactory; +import reactor.rabbitmq.ChannelPoolOptions; import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Sender; import reactor.rabbitmq.SenderOptions; public class RabbitMQEventBus implements EventBus, Startable { + private static final int MAX_CHANNELS_NUMBER = 5; private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running"; static final String MAILBOX_EVENT = "mailboxEvent"; static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange"; @@ -54,6 +58,7 @@ public class RabbitMQEventBus implements EventBus, Startable { private volatile boolean isRunning; private volatile boolean isStopping; + private ChannelPool channelPool; private GroupRegistrationHandler groupRegistrationHandler; private KeyRegistrationHandler keyRegistrationHandler; EventDispatcher eventDispatcher; @@ -77,7 +82,11 @@ public class RabbitMQEventBus implements EventBus, Startable { public void start() { if (!isRunning && !isStopping) { - sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono) + this.channelPool = ChannelPoolFactory.createChannelPool( + connectionMono, + new ChannelPoolOptions().maxCacheSize(MAX_CHANNELS_NUMBER) + ); + sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono).channelPool(channelPool) .resourceManagementChannelMono(connectionMono.map(Throwing.function(Connection::createChannel)))); LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor); @@ -97,6 +106,7 @@ public class RabbitMQEventBus implements EventBus, Startable { isRunning = false; groupRegistrationHandler.stop(); keyRegistrationHandler.stop(); + channelPool.close(); sender.close(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
