This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ee069651973f2af439c4e5de64533975113d56ef
Author: Tran Tien Duc <[email protected]>
AuthorDate: Wed Apr 8 17:32:20 2020 +0700

    JAMES-3139 RabbitMQEventBus error dispatching handling
---
 .../james/mailbox/events/EventDispatcher.java      | 60 ++++++++++++++++---
 .../james/mailbox/events/RabbitMQEventBus.java     |  4 +-
 .../james/mailbox/events/RabbitMQEventBusTest.java | 70 +++++++++++++++-------
 3 files changed, 101 insertions(+), 33 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 239d716..c56e02b 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -26,8 +26,9 @@ import static 
org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
 import static 
org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
-import java.util.stream.Stream;
 
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.events.RoutingKeyConverter.RoutingKey;
@@ -37,6 +38,7 @@ import org.apache.james.util.StructuredLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.rabbitmq.client.AMQP;
@@ -50,7 +52,11 @@ import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.Sender;
 import reactor.util.function.Tuples;
 
-class EventDispatcher {
+public class EventDispatcher {
+    public static class DispatchingFailureGroup extends Group {
+        public static DispatchingFailureGroup INSTANCE = new 
DispatchingFailureGroup();
+    }
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(EventDispatcher.class);
 
     private final EventSerializer eventSerializer;
@@ -58,8 +64,12 @@ class EventDispatcher {
     private final LocalListenerRegistry localListenerRegistry;
     private final AMQP.BasicProperties basicProperties;
     private final MailboxListenerExecutor mailboxListenerExecutor;
+    private final EventDeadLetters deadLetters;
 
-    EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, 
Sender sender, LocalListenerRegistry localListenerRegistry, 
MailboxListenerExecutor mailboxListenerExecutor) {
+    EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, 
Sender sender,
+                    LocalListenerRegistry localListenerRegistry,
+                    MailboxListenerExecutor mailboxListenerExecutor,
+                    EventDeadLetters deadLetters) {
         this.eventSerializer = eventSerializer;
         this.sender = sender;
         this.localListenerRegistry = localListenerRegistry;
@@ -70,6 +80,7 @@ class EventDispatcher {
             .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
             .build();
         this.mailboxListenerExecutor = mailboxListenerExecutor;
+        this.deadLetters = deadLetters;
     }
 
     void start() {
@@ -83,7 +94,7 @@ class EventDispatcher {
         return Flux
             .concat(
                 dispatchToLocalListeners(event, keys),
-                dispatchToRemoteListeners(serializeEvent(event), keys))
+                dispatchToRemoteListeners(event, keys))
             .subscribeOn(Schedulers.elastic())
             .doOnError(throwable -> LOGGER.error("error while dispatching 
event", throwable))
             .then()
@@ -123,13 +134,44 @@ class EventDispatcher {
             .addField(EventBus.StructuredLoggingFields.REGISTRATION_KEYS, 
keys);
     }
 
-    private Mono<Void> dispatchToRemoteListeners(byte[] serializedEvent, 
Set<RegistrationKey> keys) {
-        Stream<RoutingKey> routingKeys = 
Stream.concat(Stream.of(RoutingKey.empty()), keys.stream().map(RoutingKey::of));
+    private Mono<Void> dispatchToRemoteListeners(Event event, 
Set<RegistrationKey> keys) {
+        return Mono.fromCallable(() -> serializeEvent(event))
+            .flatMap(serializedEvent -> Mono.zipDelayError(
+                remoteGroupsDispatch(serializedEvent, event),
+                remoteKeysDispatch(serializedEvent, keys)))
+            .then();
+    }
 
-        Stream<OutboundMessage> outboundMessages = routingKeys
-            .map(routingKey -> new 
OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), 
basicProperties, serializedEvent));
+    private Mono<Void> remoteGroupsDispatch(byte[] serializedEvent, Event 
event) {
+        return remoteDispatch(serializedEvent, 
Collections.singletonList(RoutingKey.empty()))
+            .doOnError(ex -> LOGGER.error(
+                "cannot dispatch event of type '{}' belonging '{}' with id 
'{}' to remote groups, store it into dead letter",
+                event.getClass().getSimpleName(),
+                event.getUsername().asString(),
+                event.getEventId().getId(),
+                ex))
+            .onErrorResume(ex -> 
deadLetters.store(DispatchingFailureGroup.INSTANCE, event)
+                .then(Mono.error(ex)));
+    }
+
+    private Mono<Void> remoteKeysDispatch(byte[] serializedEvent, 
Set<RegistrationKey> keys) {
+        return remoteDispatch(serializedEvent,
+            keys.stream()
+                .map(RoutingKey::of)
+                .collect(Guavate.toImmutableList()));
+    }
+
+    private Mono<Void> remoteDispatch(byte[] serializedEvent, 
Collection<RoutingKey> routingKeys) {
+        if (routingKeys.isEmpty()) {
+            return Mono.empty();
+        }
+        return sender.send(toMessages(serializedEvent, routingKeys))
+            .subscribeOn(Schedulers.elastic());
+    }
 
-        return sender.send(Flux.fromStream(outboundMessages));
+    private Flux<OutboundMessage> toMessages(byte[] serializedEvent, 
Collection<RoutingKey> routingKeys) {
+        return Flux.fromIterable(routingKeys)
+                .map(routingKey -> new 
OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), 
basicProperties, serializedEvent));
     }
 
     private byte[] serializeEvent(Event event) {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index f823ff1..311e789 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -79,7 +79,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
             LocalListenerRegistry localListenerRegistry = new 
LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, 
eventSerializer, sender, receiverProvider, routingKeyConverter, 
localListenerRegistry, mailboxListenerExecutor, retryBackoff);
             groupRegistrationHandler = new 
GroupRegistrationHandler(eventSerializer, sender, receiverProvider, 
retryBackoff, eventDeadLetters, mailboxListenerExecutor);
-            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, 
sender, localListenerRegistry, mailboxListenerExecutor);
+            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, 
sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters);
 
             eventDispatcher.start();
             keyRegistrationHandler.start();
@@ -94,7 +94,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
             LocalListenerRegistry localListenerRegistry = new 
LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, 
eventSerializer, sender, receiverProvider, routingKeyConverter, 
localListenerRegistry, mailboxListenerExecutor, retryBackoff);
             groupRegistrationHandler = new 
GroupRegistrationHandler(eventSerializer, sender, receiverProvider, 
retryBackoff, eventDeadLetters, mailboxListenerExecutor);
-            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, 
sender, localListenerRegistry, mailboxListenerExecutor);
+            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, 
sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters);
 
             keyRegistrationHandler.declareQueue();
 
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index f462fba..abbfdf7 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -28,7 +28,9 @@ import static 
org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static 
org.apache.james.mailbox.events.EventBusConcurrentTestContract.newCountingListener;
 import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_B;
 import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static 
org.apache.james.mailbox.events.EventBusTestFixture.newAsyncListener;
@@ -51,6 +53,7 @@ import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
 import 
org.apache.james.backends.rabbitmq.RabbitMQExtension.DockerRestartPolicy;
@@ -60,6 +63,7 @@ import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
 import 
org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
+import org.apache.james.mailbox.events.EventDispatcher.DispatchingFailureGroup;
 import org.apache.james.mailbox.model.TestId;
 import org.apache.james.mailbox.model.TestMessageId;
 import org.apache.james.mailbox.store.quota.DefaultUserQuotaRootResolver;
@@ -732,51 +736,73 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
 
             rabbitMQExtension.getRabbitMQ().pause();
 
-            try {
-                eventBus().dispatch(EVENT, NO_KEYS).block();
-            } catch (Exception e) {
-                // ignore
-            }
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
 
-            getSpeedProfile().longWaitCondition()
-                .untilAsserted(() -> 
assertThat(eventCollector.getEvents()).isEmpty());
+            assertThat(eventCollector.getEvents()).isEmpty();
         }
 
         @Test
-        void dispatchShouldNotPersistEventWhenDispatchingNoKeyGetError() {
+        void dispatchShouldPersistEventWhenDispatchingNoKeyGetError() {
             EventCollector eventCollector = eventCollector();
             eventBus().register(eventCollector, GROUP_A);
 
             rabbitMQExtension.getRabbitMQ().pause();
 
-            try {
-                eventBus().dispatch(EVENT, NO_KEYS).block();
-            } catch (Exception e) {
-                // ignore
-            }
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
 
-            getSpeedProfile().longWaitCondition()
-                .untilAsserted(() ->
-                    
assertThat(deadLetter().containEvents().block()).isFalse());
+            assertThat(dispatchingFailureEvents()).containsOnly(EVENT);
         }
 
         @Test
-        void dispatchShouldNotPersistEventWhenDispatchingWithKeysGetError() {
+        void dispatchShouldPersistEventWhenDispatchingWithKeysGetError() {
             EventCollector eventCollector = eventCollector();
             eventBus().register(eventCollector, GROUP_A);
             eventBus().register(eventCollector, KEY_1);
 
             rabbitMQExtension.getRabbitMQ().pause();
 
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+
+            assertThat(dispatchingFailureEvents()).containsOnly(EVENT);
+        }
+
+        @Test
+        void 
dispatchShouldPersistOnlyOneEventWhenDispatchingMultiGroupsGetError() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+            eventBus().register(eventCollector, GROUP_B);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+
+            assertThat(dispatchingFailureEvents()).containsOnly(EVENT);
+        }
+
+        @Test
+        void 
dispatchShouldPersistEventsWhenDispatchingGroupsGetErrorMultipleTimes() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+            doQuietly(() -> eventBus().dispatch(EVENT_2, NO_KEYS).block());
+
+            assertThat(dispatchingFailureEvents()).containsExactly(EVENT, 
EVENT_2);
+        }
+
+        private Stream<Event> dispatchingFailureEvents() {
+            return deadLetter().failedIds(DispatchingFailureGroup.INSTANCE)
+                .flatMap(insertionId -> 
deadLetter().failedEvent(DispatchingFailureGroup.INSTANCE, insertionId))
+                .toStream();
+        }
+
+        private void doQuietly(Runnable runnable) {
             try {
-                eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+                runnable.run();
             } catch (Exception e) {
                 // ignore
             }
-
-            getSpeedProfile().longWaitCondition()
-                .untilAsserted(() ->
-                    
assertThat(deadLetter().containEvents().block()).isFalse());
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to