This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5801a7947c58ec6d8d7a2f7755fd86b1d4ad0929 Author: Tran Tien Duc <[email protected]> AuthorDate: Thu Apr 9 12:41:53 2020 +0700 JAMES-3139 reDeliver() DispatchingFailureGroup should deliver events to all groups --- .../james/mailbox/events/RabbitMQEventBus.java | 14 +++++ .../james/mailbox/events/RabbitMQEventBusTest.java | 68 ++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index 311e789..5cc92e8 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -31,11 +31,13 @@ import org.apache.james.metrics.api.MetricFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import reactor.core.publisher.Mono; import reactor.rabbitmq.Sender; public class RabbitMQEventBus implements EventBus, Startable { + private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of(); private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running"; static final String MAILBOX_EVENT = "mailboxEvent"; static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange"; @@ -143,6 +145,18 @@ public class RabbitMQEventBus implements EventBus, Startable { public Mono<Void> reDeliver(Group group, Event event) { Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE); if (!event.isNoop()) { + /* + if the eventBus.dispatch() gets error while dispatching an event (rabbitMQ network outage maybe), + which means all the group consumers will not be receiving that event. + + We store the that event in the dead letter and expecting in the future, it will be dispatched + again not only for a specific consumer but all. + + That's why it is special, and we need to check event type before processing further. + */ + if (group instanceof EventDispatcher.DispatchingFailureGroup) { + return eventDispatcher.dispatch(event, NO_KEY); + } return groupRegistrationHandler.retrieveGroupRegistration(group).reDeliver(event); } return Mono.empty(); diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index abbfdf7..3dbbb80 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -791,6 +791,74 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, assertThat(dispatchingFailureEvents()).containsExactly(EVENT, EVENT_2); } + @Test + void dispatchShouldPersistEventsWhenDispatchingTheSameEventGetErrorMultipleTimes() { + EventCollector eventCollector = eventCollector(); + eventBus().register(eventCollector, GROUP_A); + + rabbitMQExtension.getRabbitMQ().pause(); + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); + + assertThat(dispatchingFailureEvents()).containsExactly(EVENT, EVENT); + } + + @Test + void reDeliverShouldDeliverToAllGroupsWhenDispatchingFailure() { + EventCollector eventCollector = eventCollector(); + eventBus().register(eventCollector, GROUP_A); + + EventCollector eventCollector2 = eventCollector(); + eventBus().register(eventCollector2, GROUP_B); + + rabbitMQExtension.getRabbitMQ().pause(); + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); + rabbitMQExtension.getRabbitMQ().unpause(); + dispatchingFailureEvents() + .forEach(event -> eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, event).block()); + + getSpeedProfile().shortWaitCondition() + .untilAsserted(() -> assertThat(eventCollector.getEvents()) + .hasSameElementsAs(eventCollector2.getEvents()) + .containsExactly(EVENT)); + } + + @Test + void reDeliverShouldAddEventInDeadLetterWhenGettingError() { + EventCollector eventCollector = eventCollector(); + eventBus().register(eventCollector, GROUP_A); + + rabbitMQExtension.getRabbitMQ().pause(); + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); + getSpeedProfile().longWaitCondition() + .until(() -> deadLetter().containEvents().block()); + + doQuietly(() -> eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, EVENT).block()); + rabbitMQExtension.getRabbitMQ().unpause(); + + getSpeedProfile().shortWaitCondition() + .untilAsserted(() -> assertThat(dispatchingFailureEvents()) + .containsExactly(EVENT, EVENT)); + } + + @Test + void reDeliverShouldNotStoreEventInAnotherGroupWhenGettingError() { + EventCollector eventCollector = eventCollector(); + eventBus().register(eventCollector, GROUP_A); + + rabbitMQExtension.getRabbitMQ().pause(); + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); + getSpeedProfile().longWaitCondition() + .until(() -> deadLetter().containEvents().block()); + + doQuietly(() -> eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, EVENT).block()); + rabbitMQExtension.getRabbitMQ().unpause(); + + getSpeedProfile().shortWaitCondition() + .untilAsserted(() -> assertThat(deadLetter().groupsWithFailedEvents().toStream()) + .hasOnlyElementsOfType(DispatchingFailureGroup.class)); + } + private Stream<Event> dispatchingFailureEvents() { return deadLetter().failedIds(DispatchingFailureGroup.INSTANCE) .flatMap(insertionId -> deadLetter().failedEvent(DispatchingFailureGroup.INSTANCE, insertionId)) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
