This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 069f81f8a37d10694fdd77b52a4b3bbea6cbc1bc Author: Matthieu Baechler <[email protected]> AuthorDate: Mon Dec 9 16:16:49 2019 +0100 JAMES-3003 Mailbox event delivery should run listeners concurrently --- .../apache/james/mailbox/events/GroupContract.java | 57 ++++++++++++++++++++++ .../apache/james/mailbox/events/KeyContract.java | 44 +++++++++++++++++ .../apache/james/mailbox/events/InVMEventBus.java | 20 ++++---- .../mailbox/events/delivery/EventDelivery.java | 42 +--------------- .../mailbox/events/delivery/InVmEventDelivery.java | 11 ++--- .../events/delivery/InVmEventDeliveryTest.java | 21 ++------ .../james/mailbox/events/EventDispatcher.java | 6 +-- 7 files changed, 122 insertions(+), 79 deletions(-) diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java index 2f1aca9..ebfd4ea 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java @@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_UNSUPPOR import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS; import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A; import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_B; +import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_C; import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS; import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND; import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION; @@ -42,6 +43,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.time.Duration; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; @@ -54,6 +57,7 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; +import reactor.core.scheduler.Schedulers; public interface GroupContract { @@ -97,6 +101,59 @@ public interface GroupContract { } @Test + default void groupNotificationShouldDeliverASingleEventToAllListenersAtTheSameTime() { + CountDownLatch countDownLatch = new CountDownLatch(1); + try { + ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>(); + eventBus().register(new MailboxListener.GroupMailboxListener() { + @Override + public Group getDefaultGroup() { + return new GenericGroup("groupA"); + } + + @Override + public void event(Event event) throws Exception { + threads.add(Thread.currentThread().getName()); + countDownLatch.await(); + } + }, GROUP_A); + eventBus().register(new MailboxListener.GroupMailboxListener() { + @Override + public Group getDefaultGroup() { + return new GenericGroup("groupB"); + } + + @Override + public void event(Event event) throws Exception { + threads.add(Thread.currentThread().getName()); + countDownLatch.await(); + } + }, GROUP_B); + eventBus().register(new MailboxListener.GroupMailboxListener() { + @Override + public Group getDefaultGroup() { + return new GenericGroup("groupC"); + } + + @Override + public void event(Event event) throws Exception { + threads.add(Thread.currentThread().getName()); + countDownLatch.await(); + } + }, GROUP_C); + + eventBus().dispatch(EVENT, NO_KEYS).subscribeOn(Schedulers.elastic()).subscribe(); + + + WAIT_CONDITION.atMost(org.awaitility.Duration.TEN_SECONDS) + .untilAsserted(() -> assertThat(threads).hasSize(3)); + assertThat(threads).doesNotHaveDuplicates(); + } finally { + countDownLatch.countDown(); + } + } + + @Test default void listenersShouldBeAbleToDispatch() { AtomicBoolean successfulRetry = new AtomicBoolean(false); MailboxListener listener = event -> { diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java index f365ef3..2c8e581 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java @@ -43,6 +43,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.time.Duration; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -56,6 +57,7 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; +import reactor.core.scheduler.Schedulers; public interface KeyContract extends EventBusContract { @@ -86,6 +88,36 @@ public interface KeyContract extends EventBusContract { } @Test + default void notificationShouldDeliverASingleEventToAllListenersAtTheSameTime() { + CountDownLatch countDownLatch = new CountDownLatch(1); + try { + ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>(); + eventBus().register(event -> { + threads.add(Thread.currentThread().getName()); + countDownLatch.await(); + }, KEY_1); + eventBus().register(event -> { + threads.add(Thread.currentThread().getName()); + countDownLatch.await(); + }, KEY_1); + eventBus().register(event -> { + threads.add(Thread.currentThread().getName()); + countDownLatch.await(); + }, KEY_1); + + eventBus().dispatch(EVENT, KEY_1).subscribeOn(Schedulers.elastic()).subscribe(); + + + WAIT_CONDITION.atMost(org.awaitility.Duration.TEN_SECONDS) + .untilAsserted(() -> assertThat(threads).hasSize(3)); + assertThat(threads).doesNotHaveDuplicates(); + } finally { + countDownLatch.countDown(); + } + } + + + @Test default void registeredListenersShouldNotReceiveNoopEvents() throws Exception { MailboxListener listener = newListener(); @@ -293,6 +325,18 @@ public interface KeyContract extends EventBusContract { .event(any()); } + + @Test + default void dispatchShouldNotifyAsynchronousListener() throws Exception { + MailboxListener listener = newListener(); + when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS); + eventBus().register(listener, KEY_1); + + eventBus().dispatch(EVENT, KEY_1).block(); + + verify(listener, after(FIVE_HUNDRED_MS.toMillis())).event(EVENT); + } + @Test default void dispatchShouldNotBlockAsynchronousListener() throws Exception { MailboxListener listener = newListener(); diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java index f8bc8c9..bcd2376 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java @@ -34,9 +34,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class InVMEventBus implements EventBus { @@ -79,8 +79,6 @@ public class InVMEventBus implements EventBus { public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) { if (!event.isNoop()) { return Flux.merge(groupDeliveries(event), keyDeliveries(event, keys)) - .reduceWith(EventDelivery.ExecutionStages::empty, EventDelivery.ExecutionStages::combine) - .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture) .then() .onErrorResume(throwable -> Mono.empty()); } @@ -90,9 +88,7 @@ public class InVMEventBus implements EventBus { @Override public Mono<Void> reDeliver(Group group, Event event) { if (!event.isNoop()) { - return Mono.fromCallable(() -> groupDelivery(event, retrieveListenerFromGroup(group), group)) - .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture) - .then(); + return groupDelivery(event, retrieveListenerFromGroup(group), group); } return Mono.empty(); } @@ -102,17 +98,19 @@ public class InVMEventBus implements EventBus { .orElseThrow(() -> new GroupRegistrationNotFound(group)); } - private Flux<EventDelivery.ExecutionStages> keyDeliveries(Event event, Set<RegistrationKey> keys) { + private Mono<Void> keyDeliveries(Event event, Set<RegistrationKey> keys) { return Flux.fromIterable(registeredListenersByKeys(keys)) - .map(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none())); + .flatMap(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()).subscribeOn(Schedulers.elastic())) + .then(); } - private Flux<EventDelivery.ExecutionStages> groupDeliveries(Event event) { + private Mono<Void> groupDeliveries(Event event) { return Flux.fromIterable(groups.entrySet()) - .map(entry -> groupDelivery(event, entry.getValue(), entry.getKey())); + .flatMap(entry -> groupDelivery(event, entry.getValue(), entry.getKey()).subscribeOn(Schedulers.elastic())) + .then(); } - private EventDelivery.ExecutionStages groupDelivery(Event event, MailboxListener mailboxListener, Group group) { + private Mono<Void> groupDelivery(Event event, MailboxListener mailboxListener, Group group) { return eventDelivery.deliver( mailboxListener, event, diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java index 0f56a60..26c972d 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java @@ -32,7 +32,6 @@ import org.apache.james.mailbox.events.RetryBackoffConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -129,44 +128,5 @@ public interface EventDelivery { Mono<Void> handle(Event event); } - class ExecutionStages { - - public static ExecutionStages empty() { - return new ExecutionStages(Mono.empty(), Mono.empty()); - } - - static ExecutionStages synchronous(Mono<Void> synchronousListenerFuture) { - return new ExecutionStages(synchronousListenerFuture, Mono.empty()); - } - - static ExecutionStages asynchronous(Mono<Void> asynchronousListenerFuture) { - return new ExecutionStages(Mono.empty(),asynchronousListenerFuture); - } - - private final Mono<Void> synchronousListenerFuture; - private final Mono<Void> asynchronousListenerFuture; - - private ExecutionStages(Mono<Void> synchronousListenerFuture, Mono<Void> asynchronousListenerFuture) { - this.synchronousListenerFuture = synchronousListenerFuture; - this.asynchronousListenerFuture = asynchronousListenerFuture; - } - - public Mono<Void> synchronousListenerFuture() { - return synchronousListenerFuture; - } - - public Mono<Void> allListenerFuture() { - return synchronousListenerFuture - .concatWith(asynchronousListenerFuture) - .then(); - } - - public ExecutionStages combine(ExecutionStages another) { - return new ExecutionStages( - Flux.concat(this.synchronousListenerFuture, another.synchronousListenerFuture).then(), - Flux.concat(this.asynchronousListenerFuture, another.asynchronousListenerFuture).then()); - } - } - - ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option); + Mono<Void> deliver(MailboxListener listener, Event event, DeliveryOption option); } diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java index 718d5f1..a36119f 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java @@ -51,18 +51,17 @@ public class InVmEventDelivery implements EventDelivery { } @Override - public ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option) { + public Mono<Void> deliver(MailboxListener listener, Event event, DeliveryOption option) { Mono<Void> executionResult = deliverByOption(listener, event, option); - return toExecutionStages(listener.getExecutionMode(), executionResult); + return waitForResultIfNeeded(listener.getExecutionMode(), executionResult); } - private ExecutionStages toExecutionStages(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) { + private Mono<Void> waitForResultIfNeeded(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) { if (executionMode.equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) { - return ExecutionStages.synchronous(executionResult); + return executionResult; } - - return ExecutionStages.asynchronous(executionResult); + return executionResult.or(Mono.empty()).onErrorResume(throwable -> Mono.empty()); } private Mono<Void> deliverByOption(MailboxListener listener, Event event, DeliveryOption deliveryOption) { diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java index 547e192..dfea972 100644 --- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java +++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java @@ -63,7 +63,6 @@ class InVmEventDeliveryTest { void deliverShouldDeliverEvent() { when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS); inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none()) - .allListenerFuture() .block(); assertThat(listener.numberOfEventCalls()) @@ -74,7 +73,6 @@ class InVmEventDeliveryTest { void deliverShouldReturnSuccessSynchronousMono() { when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS); assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none()) - .synchronousListenerFuture() .block()) .doesNotThrowAnyException(); } @@ -86,7 +84,6 @@ class InVmEventDeliveryTest { .when(listener).event(EVENT); assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none()) - .allListenerFuture() .block()) .isInstanceOf(RuntimeException.class); @@ -101,7 +98,6 @@ class InVmEventDeliveryTest { .when(listener).event(EVENT); assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none()) - .synchronousListenerFuture() .block()) .isInstanceOf(RuntimeException.class); } @@ -114,7 +110,6 @@ class InVmEventDeliveryTest { void deliverShouldDeliverEvent() { when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS); inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none()) - .allListenerFuture() .block(); assertThat(listener.numberOfEventCalls()) @@ -125,24 +120,19 @@ class InVmEventDeliveryTest { void deliverShouldReturnSuccessSynchronousMono() { when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS); assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none()) - .synchronousListenerFuture() .block()) .doesNotThrowAnyException(); } @Test - void deliverShouldNotDeliverWhenListenerGetException() { + void deliverShouldNotFailWhenListenerGetException() { when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS); doThrow(new RuntimeException()) .when(listener).event(EVENT); - assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none()) - .allListenerFuture() + assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none()) .block()) - .isInstanceOf(RuntimeException.class); - - assertThat(listener.numberOfEventCalls()) - .isEqualTo(0); + .doesNotThrowAnyException(); } @Test @@ -152,7 +142,6 @@ class InVmEventDeliveryTest { .when(listener).event(EVENT); assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none()) - .synchronousListenerFuture() .block()) .doesNotThrowAnyException(); } @@ -174,7 +163,6 @@ class InVmEventDeliveryTest { DeliveryOption.of( BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener), PermanentFailureHandler.NO_HANDLER)) - .allListenerFuture() .block(); assertThat(listener.numberOfEventCalls()) @@ -193,7 +181,6 @@ class InVmEventDeliveryTest { DeliveryOption.of( Retryer.NO_RETRYER, PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter))) - .allListenerFuture() .block(); assertThat(deadLetter.groupsWithFailedEvents().toStream()) @@ -214,7 +201,6 @@ class InVmEventDeliveryTest { DeliveryOption.of( BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener), PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter))) - .allListenerFuture() .block(); SoftAssertions.assertSoftly(softy -> { @@ -242,7 +228,6 @@ class InVmEventDeliveryTest { DeliveryOption.of( BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener), PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter))) - .allListenerFuture() .block(); SoftAssertions.assertSoftly(softy -> { diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java index 47844d7..6c1b586 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java @@ -90,13 +90,13 @@ class EventDispatcher { return Flux.fromIterable(keys) .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key) .map(listener -> Tuples.of(key, listener))) - .filter(pair -> pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) - .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1())) + .filter(pair -> pair.getT2().getExecutionMode() == MailboxListener.ExecutionMode.SYNCHRONOUS) + .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()).subscribeOn(Schedulers.elastic())) .then(); } private Mono<Void> executeListener(Event event, MailboxListener mailboxListener, RegistrationKey registrationKey) { - return Mono.from((sink) -> { + return Mono.from(sink -> { try { mailboxListenerExecutor.execute(mailboxListener, MDCBuilder.create() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
