This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit edce29a4a9154ba0f92e2f0d4bc14bf5f7b6ff2c
Author: Matthieu Baechler <[email protected]>
AuthorDate: Wed Jun 12 18:16:06 2019 +0200

    JAMES-2786 limit the number of thread required for the concurrent stress 
test
---
 .../main/java/org/apache/james/mailbox/events/EventDispatcher.java  | 6 ++----
 .../java/org/apache/james/mailbox/events/GroupRegistration.java     | 2 +-
 .../java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java  | 3 +--
 3 files changed, 4 insertions(+), 7 deletions(-)

diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 5cb24aa..8f31102 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -82,6 +82,7 @@ class EventDispatcher {
             .concat(
                 dispatchToLocalListeners(event, keys),
                 dispatchToRemoteListeners(serializeEvent(event), keys))
+            .subscribeOn(Schedulers.elastic())
             .then()
             .doOnSuccess(any -> dispatchCount.incrementAndGet())
             .subscribeWith(MonoProcessor.create());
@@ -89,7 +90,6 @@ class EventDispatcher {
 
     private Mono<Void> dispatchToLocalListeners(Event event, 
Set<RegistrationKey> keys) {
         return Flux.fromIterable(keys)
-            .subscribeOn(Schedulers.elastic())
             .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
                 .map(listener -> Tuples.of(key, listener)))
             .filter(pair -> 
pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
@@ -127,9 +127,7 @@ class EventDispatcher {
         Stream<OutboundMessage> outboundMessages = routingKeys
             .map(routingKey -> new 
OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), 
basicProperties, serializedEvent));
 
-        return sender.send(Flux.fromStream(outboundMessages))
-            .publishOn(Schedulers.elastic())
-            .doOnError(th -> th.printStackTrace());
+        return sender.send(Flux.fromStream(outboundMessages));
     }
 
     private byte[] serializeEvent(Event event) {
diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 985da0a..5164c2d 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -140,7 +140,7 @@ class GroupRegistration implements Registration {
         int currentRetryCount = getRetryCount(acknowledgableDelivery);
 
         return delayGenerator.delayIfHaveTo(currentRetryCount)
-            .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> 
runListener(event))).publishOn(Schedulers.elastic()))
+            .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> 
runListener(event))))
             .onErrorResume(throwable -> retryHandler.handleRetry(event, 
currentRetryCount, throwable))
             .then(Mono.fromRunnable(acknowledgableDelivery::ack))
             .subscribeWith(MonoProcessor.create())
diff --git 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index f94b960..eb285d0 100644
--- 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -186,7 +186,6 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
     class ConcurrentTest implements 
EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
         EventBusConcurrentTestContract.SingleEventBusConcurrentContract {
 
-        @Disabled("consuming too many threads")
         @Test
         void rabbitMQEventBusCannotHandleHugeDispatchingOperations() throws 
Exception {
             EventBusTestFixture.MailboxListenerCountingSuccessfulExecution 
countingListener1 = newCountingListener();
@@ -199,7 +198,7 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
             int totalDispatchOperations = threadCount * operationCount;
             eventBus = (RabbitMQEventBus) eventBus();
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> 
eventBus.dispatch(EVENT, NO_KEYS))
+                .operation((threadNumber, operationNumber) -> 
eventBus.dispatch(EVENT, NO_KEYS).block())
                 .threadCount(threadCount)
                 .operationCount(operationCount)
                 .runSuccessfullyWithin(Duration.ofMinutes(10));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to