This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit cf0a390afd8b62daf7719c2b9233d03f34c0f9aa Author: Tran Tien Duc <[email protected]> AuthorDate: Wed Jun 5 15:16:56 2019 +0700 JAMES-2786 RabbitMQ EventBus stop consume messages under heavy load --- .../james/mailbox/events/EventDispatcher.java | 7 ++++- .../james/mailbox/events/RabbitMQEventBus.java | 3 +- .../james/mailbox/events/RabbitMQEventBusTest.java | 35 ++++++++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java index 09c777e..5cb24aa 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java @@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXC import java.nio.charset.StandardCharsets; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.james.event.json.EventSerializer; @@ -57,6 +58,7 @@ class EventDispatcher { private final LocalListenerRegistry localListenerRegistry; private final AMQP.BasicProperties basicProperties; private final MailboxListenerExecutor mailboxListenerExecutor; + final AtomicInteger dispatchCount = new AtomicInteger(); EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) { this.eventSerializer = eventSerializer; @@ -81,6 +83,7 @@ class EventDispatcher { dispatchToLocalListeners(event, keys), dispatchToRemoteListeners(serializeEvent(event), keys)) .then() + .doOnSuccess(any -> dispatchCount.incrementAndGet()) .subscribeWith(MonoProcessor.create()); } @@ -124,7 +127,9 @@ class EventDispatcher { Stream<OutboundMessage> outboundMessages = routingKeys .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, serializedEvent)); - return sender.send(Flux.fromStream(outboundMessages)); + return sender.send(Flux.fromStream(outboundMessages)) + .publishOn(Schedulers.elastic()) + .doOnError(th -> th.printStackTrace()); } private byte[] serializeEvent(Event event) { diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index 041ded3..8ec96d6 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -32,6 +32,7 @@ import org.apache.james.metrics.api.MetricFactory; import com.github.fge.lambdas.Throwing; import com.google.common.base.Preconditions; import com.rabbitmq.client.Connection; + import reactor.core.publisher.Mono; import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Sender; @@ -55,7 +56,7 @@ public class RabbitMQEventBus implements EventBus, Startable { private volatile boolean isStopping; private GroupRegistrationHandler groupRegistrationHandler; private KeyRegistrationHandler keyRegistrationHandler; - private EventDispatcher eventDispatcher; + EventDispatcher eventDispatcher; private Sender sender; @Inject diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index 66ae6d3..f94b960 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -19,12 +19,15 @@ package org.apache.james.mailbox.events; +import static com.jayway.awaitility.Awaitility.await; import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE; import static org.apache.james.backend.rabbitmq.Constants.DURABLE; import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE; import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS; +import static org.apache.james.mailbox.events.EventBusConcurrentTestContract.newCountingListener; +import static org.apache.james.mailbox.events.EventBusConcurrentTestContract.totalEventsReceived; import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS; import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT; import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A; @@ -47,6 +50,7 @@ import static org.mockito.Mockito.when; import java.io.Closeable; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.concurrent.TimeUnit; import org.apache.james.backend.rabbitmq.RabbitMQExtension; @@ -70,6 +74,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.stubbing.Answer; +import com.google.common.collect.ImmutableList; import com.rabbitmq.client.Connection; import reactor.core.publisher.Mono; @@ -181,6 +186,36 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, class ConcurrentTest implements EventBusConcurrentTestContract.MultiEventBusConcurrentContract, EventBusConcurrentTestContract.SingleEventBusConcurrentContract { + @Disabled("consuming too many threads") + @Test + void rabbitMQEventBusCannotHandleHugeDispatchingOperations() throws Exception { + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener(); + + eventBus().register(countingListener1, new EventBusTestFixture.GroupA()); + int totalGlobalRegistrations = 1; // GroupA + GroupB + GroupC + + int threadCount = 10; + int operationCount = 10000; + int totalDispatchOperations = threadCount * operationCount; + eventBus = (RabbitMQEventBus) eventBus(); + ConcurrentTestRunner.builder() + .operation((threadNumber, operationNumber) -> eventBus.dispatch(EVENT, NO_KEYS)) + .threadCount(threadCount) + .operationCount(operationCount) + .runSuccessfullyWithin(Duration.ofMinutes(10)); + + // there is a moment when RabbitMQ EventBus consumed amount of messages, then it will stop to consume more + await() + .pollInterval(com.jayway.awaitility.Duration.FIVE_SECONDS) + .timeout(com.jayway.awaitility.Duration.TEN_MINUTES).until(() -> { + int totalEventsReceived = totalEventsReceived(ImmutableList.of(countingListener1)); + System.out.println("event received: " + totalEventsReceived); + System.out.println("dispatching count: " + eventBus.eventDispatcher.dispatchCount.get()); + assertThat(totalEventsReceived) + .isEqualTo(totalGlobalRegistrations * totalDispatchOperations); + }); + } + @Override public EventBus eventBus3() { return eventBus3; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
