This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5801a7947c58ec6d8d7a2f7755fd86b1d4ad0929
Author: Tran Tien Duc <[email protected]>
AuthorDate: Thu Apr 9 12:41:53 2020 +0700

    JAMES-3139 reDeliver() DispatchingFailureGroup should deliver events to all 
groups
---
 .../james/mailbox/events/RabbitMQEventBus.java     | 14 +++++
 .../james/mailbox/events/RabbitMQEventBusTest.java | 68 ++++++++++++++++++++++
 2 files changed, 82 insertions(+)

diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 311e789..5cc92e8 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -31,11 +31,13 @@ import org.apache.james.metrics.api.MetricFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.Sender;
 
 public class RabbitMQEventBus implements EventBus, Startable {
+    private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of();
     private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not 
running";
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + 
"-exchange";
@@ -143,6 +145,18 @@ public class RabbitMQEventBus implements EventBus, 
Startable {
     public Mono<Void> reDeliver(Group group, Event event) {
         Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
         if (!event.isNoop()) {
+            /*
+            If eventBus.dispatch() fails while dispatching an event (for example during a RabbitMQ network outage),
+            none of the group consumers will receive that event.
+
+            We store that event in the dead letter and expect that, in the future, it will be dispatched
+            again to all consumers rather than to one specific consumer.
+
+            That is why this group is special, and why we need to check the group type before processing further.
+            */
+            if (group instanceof EventDispatcher.DispatchingFailureGroup) {
+                return eventDispatcher.dispatch(event, NO_KEY);
+            }
             return 
groupRegistrationHandler.retrieveGroupRegistration(group).reDeliver(event);
         }
         return Mono.empty();
diff --git 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index abbfdf7..3dbbb80 100644
--- 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -791,6 +791,74 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
             assertThat(dispatchingFailureEvents()).containsExactly(EVENT, 
EVENT_2);
         }
 
+        @Test
+        void 
dispatchShouldPersistEventsWhenDispatchingTheSameEventGetErrorMultipleTimes() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+
+            assertThat(dispatchingFailureEvents()).containsExactly(EVENT, 
EVENT);
+        }
+
+        @Test
+        void reDeliverShouldDeliverToAllGroupsWhenDispatchingFailure() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+
+            EventCollector eventCollector2 = eventCollector();
+            eventBus().register(eventCollector2, GROUP_B);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+            rabbitMQExtension.getRabbitMQ().unpause();
+            dispatchingFailureEvents()
+                .forEach(event -> 
eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, event).block());
+
+            getSpeedProfile().shortWaitCondition()
+                .untilAsserted(() -> assertThat(eventCollector.getEvents())
+                    .hasSameElementsAs(eventCollector2.getEvents())
+                    .containsExactly(EVENT));
+        }
+
+        @Test
+        void reDeliverShouldAddEventInDeadLetterWhenGettingError() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+            getSpeedProfile().longWaitCondition()
+                .until(() -> deadLetter().containEvents().block());
+
+            doQuietly(() -> 
eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, EVENT).block());
+            rabbitMQExtension.getRabbitMQ().unpause();
+
+            getSpeedProfile().shortWaitCondition()
+                .untilAsserted(() -> assertThat(dispatchingFailureEvents())
+                    .containsExactly(EVENT, EVENT));
+        }
+
+        @Test
+        void reDeliverShouldNotStoreEventInAnotherGroupWhenGettingError() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+            getSpeedProfile().longWaitCondition()
+                .until(() -> deadLetter().containEvents().block());
+
+            doQuietly(() -> 
eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, EVENT).block());
+            rabbitMQExtension.getRabbitMQ().unpause();
+
+            getSpeedProfile().shortWaitCondition()
+                .untilAsserted(() -> 
assertThat(deadLetter().groupsWithFailedEvents().toStream())
+                    .hasOnlyElementsOfType(DispatchingFailureGroup.class));
+        }
+
         private Stream<Event> dispatchingFailureEvents() {
             return deadLetter().failedIds(DispatchingFailureGroup.INSTANCE)
                 .flatMap(insertionId -> 
deadLetter().failedEvent(DispatchingFailureGroup.INSTANCE, insertionId))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to