This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b3ea48605e656ee2074fa1b47020f61528b235bf Author: Benoit Tellier <[email protected]> AuthorDate: Wed May 20 11:21:50 2020 +0700 JAMES-3191 Specify scheduler for EventBus key registration --- .../org/apache/james/mailbox/events/EventBus.java | 6 +- .../mailbox/MailboxManagerStressContract.java | 12 +-- .../apache/james/mailbox/MailboxManagerTest.java | 26 +++---- .../mailbox/events/ErrorHandlingContract.java | 8 +- .../events/EventBusConcurrentTestContract.java | 38 ++++----- .../apache/james/mailbox/events/KeyContract.java | 90 +++++++++++----------- .../apache/james/mailbox/events/InVMEventBus.java | 4 +- .../mailbox/events/KeyRegistrationHandler.java | 27 ++++--- .../james/mailbox/events/RabbitMQEventBus.java | 2 +- .../james/mailbox/events/RabbitMQEventBusTest.java | 14 ++-- .../apache/james/imap/processor/IdleProcessor.java | 7 +- .../imap/processor/base/SelectedMailboxImpl.java | 7 +- .../processor/base/SelectedMailboxImplTest.java | 8 +- 13 files changed, 137 insertions(+), 112 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java index 752740e..c07cfb9 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java @@ -21,6 +21,8 @@ package org.apache.james.mailbox.events; import java.util.Set; +import org.reactivestreams.Publisher; + import com.google.common.collect.ImmutableSet; import reactor.core.publisher.Mono; @@ -44,11 +46,11 @@ public interface EventBus { } } - default Registration register(MailboxListener listener, RegistrationKey key) { + default Publisher<Registration> register(MailboxListener listener, RegistrationKey key) { return register(MailboxListener.wrapReactive(listener), key); } - Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key); + Publisher<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key); Registration register(MailboxListener.ReactiveMailboxListener listener, Group group) throws GroupAlreadyRegistered; diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java index c32b854..192140f 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.james.core.Username; -import org.apache.james.mailbox.events.Event; import org.apache.james.mailbox.events.EventBus; import org.apache.james.mailbox.events.MailboxIdRegistrationKey; import org.apache.james.mailbox.events.MailboxListener; @@ -46,6 +45,8 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; + public interface MailboxManagerStressContract<T extends MailboxManager> { int APPEND_OPERATIONS = 200; @@ -66,13 +67,12 @@ public interface MailboxManagerStressContract<T extends MailboxManager> { getManager().startProcessingRequest(session); MailboxPath path = MailboxPath.forUser(username, "INBOX"); MailboxId mailboxId = getManager().createMailbox(path, session).get(); - retrieveEventBus().register(new MailboxListener() { - @Override - public void event(Event event) { + Mono.from(retrieveEventBus() + .register(event -> { MessageUid u = ((MailboxListener.Added) event).getUids().iterator().next(); uList.add(u); - } - }, new MailboxIdRegistrationKey(mailboxId)); + }, new MailboxIdRegistrationKey(mailboxId))) + .block(); getManager().endProcessingRequest(session); getManager().logout(session); diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java index 15ab35d..c262c91 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java @@ -719,7 +719,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { @Test void deleteMailboxShouldFireMailboxDeletionEvent() throws Exception { assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.Quota)); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId)); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block(); mailboxManager.deleteMailbox(inbox, session); @@ -737,7 +737,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { @Test void deleteMailboxByIdShouldFireMailboxDeletionEvent() throws Exception { assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.Quota)); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId)); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block(); mailboxManager.deleteMailbox(inboxId, session); @@ -792,7 +792,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { @Test void addingMessageShouldFireAddedEvent() throws Exception { - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId)); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block(); inboxManager.appendMessage(MessageManager.AppendCommand.builder() .build(message), session); @@ -810,7 +810,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(message), session); inboxManager.setFlags(new Flags(Flags.Flag.DELETED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId)); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block(); inboxManager.expunge(MessageRange.all(), session); assertThat(listener.getEvents()) @@ -827,7 +827,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { ComposedMessageId messageId = inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(message), session).getId(); inboxManager.setFlags(new Flags(Flags.Flag.DELETED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId)); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block(); inboxManager.delete(ImmutableList.of(messageId.getUid()), session); assertThat(listener.getEvents()) @@ -843,7 +843,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { void setFlagsShouldFireFlagsUpdatedEvent() throws Exception { inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(message), session); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId)); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block(); inboxManager.setFlags(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session); assertThat(listener.getEvents()) @@ -861,7 +861,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session); inboxManager.appendMessage(AppendCommand.builder().build(message), session); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get())); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block(); mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session); assertThat(listener.getEvents()) @@ -879,7 +879,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { mailboxManager.createMailbox(newPath, session); inboxManager.appendMessage(AppendCommand.builder().build(message), session); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId)); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block(); mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session); assertThat(listener.getEvents()) @@ -896,7 +896,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session); inboxManager.appendMessage(AppendCommand.builder().build(message), session); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get())); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block(); mailboxManager.copyMessages(MessageRange.all(), inbox, newPath, session); assertThat(listener.getEvents()) @@ -915,7 +915,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session); ComposedMessageId messageId = inboxManager.appendMessage(AppendCommand.builder().build(message), session).getId(); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get())); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block(); mailboxManager.copyMessages(MessageRange.all(), inbox, newPath, session); assertThat(listener.getEvents()) @@ -933,7 +933,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { mailboxManager.createMailbox(newPath, session); inboxManager.appendMessage(AppendCommand.builder().build(message), session); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId)); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block(); mailboxManager.copyMessages(MessageRange.all(), inbox, newPath, session); assertThat(listener.getEvents()) @@ -948,7 +948,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session); ComposedMessageId messageId = inboxManager.appendMessage(AppendCommand.builder().build(message), session).getId(); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get())); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block(); mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session); assertThat(listener.getEvents()) @@ -966,7 +966,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { mailboxManager.createMailbox(newPath, session); ComposedMessageId messageId = inboxManager.appendMessage(AppendCommand.builder().build(message), session).getId(); - retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId)); + Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block(); mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session); assertThat(listener.getEvents()) diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java index 7979ed2..b598ac1 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java @@ -44,6 +44,8 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; + interface ErrorHandlingContract extends EventBusContract { class ThrowingListener implements MailboxListener { @@ -82,7 +84,7 @@ interface ErrorHandlingContract extends EventBusContract { doThrow(new RuntimeException()) .when(eventCollector).event(EVENT); - eventBus().register(eventCollector, KEY_1); + Mono.from(eventBus().register(eventCollector, KEY_1)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); assertThat(eventCollector.getEvents()) @@ -232,7 +234,7 @@ interface ErrorHandlingContract extends EventBusContract { .doCallRealMethod() .when(eventCollector).event(EVENT); - eventBus().register(eventCollector, KEY_1); + Mono.from(eventBus().register(eventCollector, KEY_1)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); TimeUnit.SECONDS.sleep(1); @@ -352,7 +354,7 @@ interface ErrorHandlingContract extends EventBusContract { EventCollector eventCollector2 = eventCollector(); eventBus().register(eventCollector, GROUP_A); - eventBus().register(eventCollector2, KEY_1); + Mono.from(eventBus().register(eventCollector2, KEY_1)).block(); eventBus().reDeliver(GROUP_A, EVENT).block(); getSpeedProfile().longWaitCondition() diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java index 99b64c1..1aa3c2e 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java @@ -39,6 +39,8 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; + public interface EventBusConcurrentTestContract { Duration FIVE_SECONDS = Duration.ofSeconds(5); @@ -89,9 +91,9 @@ public interface EventBusConcurrentTestContract { EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener(); EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener(); EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener(); - eventBus().register(countingListener1, KEY_1); - eventBus().register(countingListener2, KEY_2); - eventBus().register(countingListener3, KEY_3); + Mono.from(eventBus().register(countingListener1, KEY_1)).block(); + Mono.from(eventBus().register(countingListener2, KEY_2)).block(); + Mono.from(eventBus().register(countingListener3, KEY_3)).block(); int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3 int totalEventBus = 1; @@ -120,9 +122,9 @@ public interface EventBusConcurrentTestContract { int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS; - eventBus().register(countingListener1, KEY_1); - eventBus().register(countingListener2, KEY_2); - eventBus().register(countingListener3, KEY_3); + Mono.from(eventBus().register(countingListener1, KEY_1)).block(); + Mono.from(eventBus().register(countingListener2, KEY_2)).block(); + Mono.from(eventBus().register(countingListener3, KEY_3)).block(); int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3 int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS; @@ -175,13 +177,13 @@ public interface EventBusConcurrentTestContract { EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener(); EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener(); - eventBus().register(countingListener1, KEY_1); - eventBus().register(countingListener2, KEY_2); - eventBus().register(countingListener3, KEY_3); + Mono.from(eventBus().register(countingListener1, KEY_1)).block(); + Mono.from(eventBus().register(countingListener2, KEY_2)).block(); + Mono.from(eventBus().register(countingListener3, KEY_3)).block(); - eventBus2().register(countingListener1, KEY_1); - eventBus2().register(countingListener2, KEY_2); - eventBus2().register(countingListener3, KEY_3); + Mono.from(eventBus2().register(countingListener1, KEY_1)).block(); + Mono.from(eventBus2().register(countingListener2, KEY_2)).block(); + Mono.from(eventBus2().register(countingListener3, KEY_3)).block(); int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3 int totalEventBus = 2; // eventBus1 + eventBus2 @@ -209,14 +211,14 @@ public interface EventBusConcurrentTestContract { int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS; - eventBus().register(countingListener1, KEY_1); - eventBus().register(countingListener2, KEY_2); + Mono.from(eventBus().register(countingListener1, KEY_1)).block(); + Mono.from(eventBus().register(countingListener2, KEY_2)).block(); - eventBus2().register(countingListener1, KEY_1); - eventBus2().register(countingListener2, KEY_2); + Mono.from(eventBus2().register(countingListener1, KEY_1)).block(); + Mono.from(eventBus2().register(countingListener2, KEY_2)).block(); - eventBus3().register(countingListener3, KEY_1); - eventBus3().register(countingListener3, KEY_2); + Mono.from(eventBus3().register(countingListener3, KEY_1)).block(); + Mono.from(eventBus3().register(countingListener3, KEY_2)).block(); int totalKeyListenerRegistrations = 2; // KEY1 + KEY2 int totalEventBus = 3; // eventBus1 + eventBus2 + eventBus3 diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java index 34dd756..908f156 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java @@ -56,6 +56,8 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; + +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public interface KeyContract extends EventBusContract { @@ -68,7 +70,7 @@ public interface KeyContract extends EventBusContract { AtomicInteger finishedExecutions = new AtomicInteger(0); AtomicBoolean rateExceeded = new AtomicBoolean(false); - eventBus().register(event -> { + Mono.from(eventBus().register(event -> { if (nbCalls.get() - finishedExecutions.get() > EventBus.EXECUTION_RATE) { rateExceeded.set(true); } @@ -76,7 +78,7 @@ public interface KeyContract extends EventBusContract { Thread.sleep(Duration.ofMillis(200).toMillis()); finishedExecutions.incrementAndGet(); - }, KEY_1); + }, KEY_1)).block(); IntStream.range(0, eventCount) .forEach(i -> eventBus().dispatch(EVENT, KEY_1).block()); @@ -91,18 +93,18 @@ public interface KeyContract extends EventBusContract { CountDownLatch countDownLatch = new CountDownLatch(1); try { ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>(); - eventBus().register(event -> { + Mono.from(eventBus().register(event -> { threads.add(Thread.currentThread().getName()); countDownLatch.await(); - }, KEY_1); - eventBus().register(event -> { + }, KEY_1)).block(); + Mono.from(eventBus().register(event -> { threads.add(Thread.currentThread().getName()); countDownLatch.await(); - }, KEY_1); - eventBus().register(event -> { + }, KEY_1)).block(); + Mono.from(eventBus().register(event -> { threads.add(Thread.currentThread().getName()); countDownLatch.await(); - }, KEY_1); + }, KEY_1)).block(); eventBus().dispatch(EVENT, KEY_1).subscribeOn(Schedulers.elastic()).subscribe(); @@ -120,7 +122,7 @@ public interface KeyContract extends EventBusContract { default void registeredListenersShouldNotReceiveNoopEvents() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); Username bob = Username.of("bob"); MailboxListener.Added noopEvent = new MailboxListener.Added(MailboxSession.SessionId.of(18), bob, MailboxPath.forUser(bob, "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random()); @@ -134,7 +136,7 @@ public interface KeyContract extends EventBusContract { default void registeredListenersShouldReceiveOnlyHandledEvents() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT_UNSUPPORTED_BY_LISTENER, KEY_1).block(); @@ -147,7 +149,7 @@ public interface KeyContract extends EventBusContract { MailboxListener listener = newListener(); doThrow(new RuntimeException()).when(listener).event(any()); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block()) .doesNotThrowAnyException(); @@ -156,7 +158,7 @@ public interface KeyContract extends EventBusContract { @Test default void dispatchShouldNotNotifyRegisteredListenerWhenEmptyKeySet() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, NO_KEYS).block(); @@ -167,7 +169,7 @@ public interface KeyContract extends EventBusContract { @Test default void dispatchShouldNotNotifyListenerRegisteredOnOtherKeys() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_2)).block(); @@ -178,7 +180,7 @@ public interface KeyContract extends EventBusContract { @Test default void dispatchShouldNotifyRegisteredListeners() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -188,7 +190,7 @@ public interface KeyContract extends EventBusContract { @Test default void dispatchShouldNotifyLocalRegisteredListenerWithoutDelay() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -199,8 +201,8 @@ public interface KeyContract extends EventBusContract { default void dispatchShouldNotifyOnlyRegisteredListener() throws Exception { MailboxListener listener = newListener(); MailboxListener listener2 = newListener(); - eventBus().register(listener, KEY_1); - eventBus().register(listener2, KEY_2); + Mono.from(eventBus().register(listener, KEY_1)).block(); + Mono.from(eventBus().register(listener2, KEY_2)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -213,8 +215,8 @@ public interface KeyContract extends EventBusContract { default void dispatchShouldNotifyAllListenersRegisteredOnAKey() throws Exception { MailboxListener listener = newListener(); MailboxListener listener2 = newListener(); - eventBus().register(listener, KEY_1); - eventBus().register(listener2, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); + Mono.from(eventBus().register(listener2, KEY_1)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -225,8 +227,8 @@ public interface KeyContract extends EventBusContract { @Test default void registerShouldAllowDuplicatedRegistration() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -236,8 +238,8 @@ public interface KeyContract extends EventBusContract { @Test default void unregisterShouldRemoveDoubleRegisteredListener() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); - eventBus().register(listener, KEY_1).unregister(); + Mono.from(eventBus().register(listener, KEY_1)).block(); + Mono.from(eventBus().register(listener, KEY_1)).block().unregister(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -251,7 +253,7 @@ public interface KeyContract extends EventBusContract { eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never()) .event(any()); @@ -260,8 +262,8 @@ public interface KeyContract extends EventBusContract { @Test default void callingAllUnregisterMethodShouldUnregisterTheListener() throws Exception { MailboxListener listener = newListener(); - Registration registration = eventBus().register(listener, KEY_1); - eventBus().register(listener, KEY_1).unregister(); + Registration registration = Mono.from(eventBus().register(listener, KEY_1)).block(); + Mono.from(eventBus().register(listener, KEY_1)).block().unregister(); registration.unregister(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -273,8 +275,8 @@ public interface KeyContract extends EventBusContract { @Test default void unregisterShouldHaveNotNotifyWhenCalledOnDifferentKeys() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); - eventBus().register(listener, KEY_2).unregister(); + Mono.from(eventBus().register(listener, KEY_1)).block(); + Mono.from(eventBus().register(listener, KEY_2)).block().unregister(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -285,7 +287,7 @@ public interface KeyContract extends EventBusContract { default void unregisterShouldBeIdempotentForKeyRegistrations() { MailboxListener listener = newListener(); - Registration registration = eventBus().register(listener, KEY_1); + Registration registration = Mono.from(eventBus().register(listener, KEY_1)).block(); registration.unregister(); assertThatCode(registration::unregister) @@ -295,7 +297,7 @@ public interface KeyContract extends EventBusContract { @Test default void dispatchShouldAcceptSeveralKeys() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block(); @@ -305,8 +307,8 @@ public interface KeyContract extends EventBusContract { @Test default void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1); - eventBus().register(listener, KEY_2); + Mono.from(eventBus().register(listener, KEY_1)).block(); + Mono.from(eventBus().register(listener, KEY_2)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block(); @@ -316,7 +318,7 @@ public interface KeyContract extends EventBusContract { @Test default void dispatchShouldNotNotifyUnregisteredListener() throws Exception { MailboxListener listener = newListener(); - eventBus().register(listener, KEY_1).unregister(); + Mono.from(eventBus().register(listener, KEY_1)).block().unregister(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -329,7 +331,7 @@ public interface KeyContract extends EventBusContract { default void dispatchShouldNotifyAsynchronousListener() throws Exception { MailboxListener listener = newListener(); when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, KEY_1).block(); @@ -356,7 +358,7 @@ public interface KeyContract extends EventBusContract { @Test default void failingRegisteredListenersShouldNotAbortRegisteredDelivery() { EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT)); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, KEY_1).block(); eventBus().dispatch(EVENT_2, KEY_1).block(); @@ -373,8 +375,8 @@ public interface KeyContract extends EventBusContract { when(failingListener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS); doThrow(new RuntimeException()).when(failingListener).event(any()); - eventBus().register(failingListener, KEY_1); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(failingListener, KEY_1)).block(); + Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -388,7 +390,7 @@ public interface KeyContract extends EventBusContract { default void crossEventBusRegistrationShouldBeAllowed() throws Exception { MailboxListener mailboxListener = newListener(); - eventBus().register(mailboxListener, KEY_1); + Mono.from(eventBus().register(mailboxListener, KEY_1)).block(); eventBus2().dispatch(EVENT, KEY_1).block(); @@ -399,7 +401,7 @@ public interface KeyContract extends EventBusContract { default void unregisteredDistantListenersShouldNotBeNotified() throws Exception { MailboxListener mailboxListener = newListener(); - eventBus().register(mailboxListener, KEY_1).unregister(); + Mono.from(eventBus().register(mailboxListener, KEY_1)).block().unregister(); eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); @@ -412,8 +414,8 @@ public interface KeyContract extends EventBusContract { MailboxListener mailboxListener1 = newListener(); MailboxListener mailboxListener2 = newListener(); - eventBus().register(mailboxListener1, KEY_1); - eventBus2().register(mailboxListener2, KEY_1); + Mono.from(eventBus().register(mailboxListener1, KEY_1)).block(); + Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block(); eventBus2().dispatch(EVENT, KEY_1).block(); @@ -427,7 +429,7 @@ public interface KeyContract extends EventBusContract { eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); - eventBus().register(listener, KEY_1); + Mono.from(eventBus().register(listener, KEY_1)).block(); verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never()) .event(any()); @@ -438,8 +440,8 @@ public interface KeyContract extends EventBusContract { MailboxListener mailboxListener1 = newListener(); MailboxListener mailboxListener2 = newListener(); - eventBus().register(mailboxListener1, KEY_1); - eventBus2().register(mailboxListener2, KEY_1); + Mono.from(eventBus().register(mailboxListener1, KEY_1)).block(); + Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block(); eventBus2().dispatch(EVENT, KEY_1).block(); diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java index 36d9d49..ab7cb2f 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java @@ -55,9 +55,9 @@ public class InVMEventBus implements EventBus { } @Override - public Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) { + public Mono<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) { registrations.put(key, listener); - return () -> registrations.remove(key, listener); + return Mono.just(() -> registrations.remove(key, listener)); } @Override diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java index c44be2d..1d5fdd9 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java @@ -125,20 +125,25 @@ class KeyRegistrationHandler { .block(); } - Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) { + Mono<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) { LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener); + + return registerIfNeeded(key, registration) + .thenReturn(new KeyRegistration(() -> { + if (registration.unregister().lastListenerRemoved()) { + registrationBinder.unbind(key) + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) + .block(); + } + })); + } + + private Mono<Void> registerIfNeeded(RegistrationKey key, LocalListenerRegistry.LocalRegistration registration) { if (registration.isFirstListener()) { - registrationBinder.bind(key) - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) - .block(); + return registrationBinder.bind(key) + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())); } - return new KeyRegistration(() -> { - if (registration.unregister().lastListenerRemoved()) { - registrationBinder.unbind(key) - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) - .block(); - } - }); + return Mono.empty(); } private Mono<Void> handleDelivery(Delivery delivery) { diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index 196ac8c..a73f1ed 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -121,7 +121,7 @@ public class RabbitMQEventBus implements EventBus, Startable { } @Override - public Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) { + public Mono<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) { Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE); return keyRegistrationHandler.register(listener, key); } diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index 3dbbb80..bfbae43 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -372,7 +372,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, void dispatchShouldWorkAfterNetworkIssuesForOldRegistrationAndKey() { rabbitMQEventBusWithNetWorkIssue.start(); MailboxListener listener = newListener(); - rabbitMQEventBusWithNetWorkIssue.register(listener, KEY_1); + Mono.from(rabbitMQEventBusWithNetWorkIssue.register(listener, KEY_1)).block(); rabbitMQNetWorkIssueExtension.getRabbitMQ().pause(); @@ -454,7 +454,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, void dispatchShouldWorkAfterRestartForOldKeyRegistration() throws Exception { eventBus.start(); MailboxListener listener = newListener(); - eventBus.register(listener, KEY_1); + Mono.from(eventBus.register(listener, KEY_1)).block(); rabbitMQExtension.getRabbitMQ().restart(); @@ -466,7 +466,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, void dispatchedMessagesShouldSurviveARabbitMQRestart() throws Exception { eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler(); MailboxListener listener = newAsyncListener(); - eventBusWithKeyHandlerNotStarted.register(listener, KEY_1); + Mono.from(eventBusWithKeyHandlerNotStarted.register(listener, KEY_1)).block(); Mono<Void> dispatch = eventBusWithKeyHandlerNotStarted.dispatch(EVENT, KEY_1); dispatch.block(); @@ -484,7 +484,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, rabbitMQExtension.getRabbitMQ().restart(); - eventBus.register(listener, KEY_1); + Mono.from(eventBus.register(listener, KEY_1)).block(); eventBus.dispatch(EVENT, KEY_1).block(); assertThatListenerReceiveOneEvent(listener); @@ -530,7 +530,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus.start(); MailboxListener listener = newListener(); when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS); - eventBus.register(listener, KEY_1); + Mono.from(eventBus.register(listener, KEY_1)).block(); rabbitMQExtension.getRabbitMQ().pause(); @@ -558,7 +558,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, rabbitMQExtension.getRabbitMQ().unpause(); - eventBus.register(listener, KEY_1); + Mono.from(eventBus.register(listener, KEY_1)).block(); eventBus.dispatch(EVENT, KEY_1).block(); assertThatListenerReceiveOneEvent(listener); } @@ -757,7 +757,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, void dispatchShouldPersistEventWhenDispatchingWithKeysGetError() { EventCollector eventCollector = eventCollector(); eventBus().register(eventCollector, GROUP_A); - eventBus().register(eventCollector, KEY_1); + Mono.from(eventBus().register(eventCollector, KEY_1)).block(); rabbitMQExtension.getRabbitMQ().pause(); diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java index 58b95b4..7681c33 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java @@ -53,6 +53,9 @@ import org.apache.james.util.concurrent.NamedThreadFactory; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> implements CapabilityImplementingProcessor { private static final List<Capability> CAPS = ImmutableList.of(SUPPORTS_IDLE); public static final int DEFAULT_SCHEDULED_POOL_CORE_SIZE = 5; @@ -88,7 +91,9 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme SelectedMailbox sm = session.getSelected(); Registration registration; if (sm != null) { - registration = eventBus.register(new IdleMailboxListener(session, responder), new MailboxIdRegistrationKey(sm.getMailboxId())); + registration = Mono.from(eventBus.register(new IdleMailboxListener(session, responder), new MailboxIdRegistrationKey(sm.getMailboxId()))) + .subscribeOn(Schedulers.elastic()) + .block(); } else { registration = null; } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java index 1aef2bf..5457960 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java @@ -52,6 +52,9 @@ import org.apache.james.mailbox.model.UpdatedFlags; import com.github.steveash.guavate.Guavate; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + /** * Default implementation of {@link SelectedMailbox} */ @@ -89,7 +92,9 @@ public class SelectedMailboxImpl implements SelectedMailbox, MailboxListener { mailboxId = messageManager.getId(); - registration = eventBus.register(this, new MailboxIdRegistrationKey(mailboxId)); + registration = Mono.from(eventBus.register(this, new MailboxIdRegistrationKey(mailboxId))) + .subscribeOn(Schedulers.elastic()) + .block(); applicableFlags = messageManager.getApplicableFlags(mailboxSession); try (Stream<MessageUid> stream = messageManager.search(new SearchQuery(SearchQuery.all()), mailboxSession)) { diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java index f93ffc3..1e0cd54 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.Date; -import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -47,6 +46,7 @@ import org.apache.james.mailbox.ModSeq; import org.apache.james.mailbox.events.EventBus; import org.apache.james.mailbox.events.MailboxIdRegistrationKey; import org.apache.james.mailbox.events.MailboxListener; +import org.apache.james.mailbox.events.Registration; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.MessageMetaData; @@ -62,6 +62,8 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + public class SelectedMailboxImplTest { @@ -152,7 +154,7 @@ public class SelectedMailboxImplTest { }; } - private Answer<Iterator<MessageUid>> generateEmitEventAnswer(AtomicInteger success) { + private Answer<Mono<Registration>> generateEmitEventAnswer(AtomicInteger success) { return invocation -> { Object[] args = invocation.getArguments(); MailboxListener mailboxListener = (MailboxListener) args[0]; @@ -164,7 +166,7 @@ public class SelectedMailboxImplTest { LOGGER.error("Error while processing event on a concurrent thread", e); } }); - return null; + return Mono.just(() -> {}); }; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
