This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit edce29a4a9154ba0f92e2f0d4bc14bf5f7b6ff2c Author: Matthieu Baechler <[email protected]> AuthorDate: Wed Jun 12 18:16:06 2019 +0200 JAMES-2786 limit the number of thread required for the concurrent stress test --- .../main/java/org/apache/james/mailbox/events/EventDispatcher.java | 6 ++---- .../java/org/apache/james/mailbox/events/GroupRegistration.java | 2 +- .../java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java | 3 +-- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java index 5cb24aa..8f31102 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java @@ -82,6 +82,7 @@ class EventDispatcher { .concat( dispatchToLocalListeners(event, keys), dispatchToRemoteListeners(serializeEvent(event), keys)) + .subscribeOn(Schedulers.elastic()) .then() .doOnSuccess(any -> dispatchCount.incrementAndGet()) .subscribeWith(MonoProcessor.create()); @@ -89,7 +90,6 @@ class EventDispatcher { private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) { return Flux.fromIterable(keys) - .subscribeOn(Schedulers.elastic()) .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key) .map(listener -> Tuples.of(key, listener))) .filter(pair -> pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) @@ -127,9 +127,7 @@ class EventDispatcher { Stream<OutboundMessage> outboundMessages = routingKeys .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, serializedEvent)); - return sender.send(Flux.fromStream(outboundMessages)) - .publishOn(Schedulers.elastic()) - .doOnError(th -> th.printStackTrace()); + return sender.send(Flux.fromStream(outboundMessages)); } private byte[] serializeEvent(Event event) { diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index 985da0a..5164c2d 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -140,7 +140,7 @@ class GroupRegistration implements Registration { int currentRetryCount = getRetryCount(acknowledgableDelivery); return delayGenerator.delayIfHaveTo(currentRetryCount) - .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))).publishOn(Schedulers.elastic())) + .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event)))) .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable)) .then(Mono.fromRunnable(acknowledgableDelivery::ack)) .subscribeWith(MonoProcessor.create()) diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index f94b960..eb285d0 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -186,7 +186,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, class ConcurrentTest implements EventBusConcurrentTestContract.MultiEventBusConcurrentContract, EventBusConcurrentTestContract.SingleEventBusConcurrentContract { - @Disabled("consuming too many threads") @Test void rabbitMQEventBusCannotHandleHugeDispatchingOperations() throws Exception { EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener(); @@ -199,7 +198,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, int totalDispatchOperations = threadCount * operationCount; eventBus = (RabbitMQEventBus) eventBus(); ConcurrentTestRunner.builder() - .operation((threadNumber, operationNumber) -> eventBus.dispatch(EVENT, NO_KEYS)) + .operation((threadNumber, operationNumber) -> eventBus.dispatch(EVENT, NO_KEYS).block()) .threadCount(threadCount) .operationCount(operationCount) .runSuccessfullyWithin(Duration.ofMinutes(10)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
