This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ee069651973f2af439c4e5de64533975113d56ef Author: Tran Tien Duc <[email protected]> AuthorDate: Wed Apr 8 17:32:20 2020 +0700 JAMES-3139 RabbitMQEventBus error dispatching handling --- .../james/mailbox/events/EventDispatcher.java | 60 ++++++++++++++++--- .../james/mailbox/events/RabbitMQEventBus.java | 4 +- .../james/mailbox/events/RabbitMQEventBusTest.java | 70 +++++++++++++++------- 3 files changed, 101 insertions(+), 33 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java index 239d716..c56e02b 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java @@ -26,8 +26,9 @@ import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME; import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; import java.util.Set; -import java.util.stream.Stream; import org.apache.james.event.json.EventSerializer; import org.apache.james.mailbox.events.RoutingKeyConverter.RoutingKey; @@ -37,6 +38,7 @@ import org.apache.james.util.StructuredLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.rabbitmq.client.AMQP; @@ -50,7 +52,11 @@ import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.Sender; import reactor.util.function.Tuples; -class EventDispatcher { +public class EventDispatcher { + public static class DispatchingFailureGroup extends Group { + public static DispatchingFailureGroup INSTANCE = new DispatchingFailureGroup(); + } + private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class); private final EventSerializer eventSerializer; @@ -58,8 +64,12 @@ class EventDispatcher { private final LocalListenerRegistry localListenerRegistry; private final AMQP.BasicProperties basicProperties; private final MailboxListenerExecutor mailboxListenerExecutor; + private final EventDeadLetters deadLetters; - EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) { + EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, + LocalListenerRegistry localListenerRegistry, + MailboxListenerExecutor mailboxListenerExecutor, + EventDeadLetters deadLetters) { this.eventSerializer = eventSerializer; this.sender = sender; this.localListenerRegistry = localListenerRegistry; @@ -70,6 +80,7 @@ class EventDispatcher { .contentType(PERSISTENT_TEXT_PLAIN.getContentType()) .build(); this.mailboxListenerExecutor = mailboxListenerExecutor; + this.deadLetters = deadLetters; } void start() { @@ -83,7 +94,7 @@ class EventDispatcher { return Flux .concat( dispatchToLocalListeners(event, keys), - dispatchToRemoteListeners(serializeEvent(event), keys)) + dispatchToRemoteListeners(event, keys)) .subscribeOn(Schedulers.elastic()) .doOnError(throwable -> LOGGER.error("error while dispatching event", throwable)) .then() @@ -123,13 +134,44 @@ class EventDispatcher { .addField(EventBus.StructuredLoggingFields.REGISTRATION_KEYS, keys); } - private Mono<Void> dispatchToRemoteListeners(byte[] serializedEvent, Set<RegistrationKey> keys) { - Stream<RoutingKey> routingKeys = Stream.concat(Stream.of(RoutingKey.empty()), keys.stream().map(RoutingKey::of)); + private Mono<Void> dispatchToRemoteListeners(Event event, Set<RegistrationKey> keys) { + return Mono.fromCallable(() -> serializeEvent(event)) + .flatMap(serializedEvent -> Mono.zipDelayError( + remoteGroupsDispatch(serializedEvent, event), + remoteKeysDispatch(serializedEvent, keys))) + .then(); + } - Stream<OutboundMessage> outboundMessages = routingKeys - .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, serializedEvent)); + private Mono<Void> remoteGroupsDispatch(byte[] serializedEvent, Event event) { + return remoteDispatch(serializedEvent, Collections.singletonList(RoutingKey.empty())) + .doOnError(ex -> LOGGER.error( + "cannot dispatch event of type '{}' belonging '{}' with id '{}' to remote groups, store it into dead letter", + event.getClass().getSimpleName(), + event.getUsername().asString(), + event.getEventId().getId(), + ex)) + .onErrorResume(ex -> deadLetters.store(DispatchingFailureGroup.INSTANCE, event) + .then(Mono.error(ex))); + } + + private Mono<Void> remoteKeysDispatch(byte[] serializedEvent, Set<RegistrationKey> keys) { + return remoteDispatch(serializedEvent, + keys.stream() + .map(RoutingKey::of) + .collect(Guavate.toImmutableList())); + } + + private Mono<Void> remoteDispatch(byte[] serializedEvent, Collection<RoutingKey> routingKeys) { + if (routingKeys.isEmpty()) { + return Mono.empty(); + } + return sender.send(toMessages(serializedEvent, routingKeys)) + .subscribeOn(Schedulers.elastic()); + } - return sender.send(Flux.fromStream(outboundMessages)); + private Flux<OutboundMessage> toMessages(byte[] serializedEvent, Collection<RoutingKey> routingKeys) { + return Flux.fromIterable(routingKeys) + .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, serializedEvent)); } private byte[] serializeEvent(Event event) { diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index f823ff1..311e789 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -79,7 +79,7 @@ public class RabbitMQEventBus implements EventBus, Startable { LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff); groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor); - eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor); + eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters); eventDispatcher.start(); keyRegistrationHandler.start(); @@ -94,7 +94,7 @@ public class RabbitMQEventBus implements EventBus, Startable { LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff); groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor); - eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor); + eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters); keyRegistrationHandler.declareQueue(); diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index f462fba..abbfdf7 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -28,7 +28,9 @@ import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS; import static org.apache.james.mailbox.events.EventBusConcurrentTestContract.newCountingListener; import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS; import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT; +import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2; import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A; +import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_B; import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1; import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS; import static org.apache.james.mailbox.events.EventBusTestFixture.newAsyncListener; @@ -51,6 +53,7 @@ import java.io.Closeable; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.apache.james.backends.rabbitmq.RabbitMQExtension; import org.apache.james.backends.rabbitmq.RabbitMQExtension.DockerRestartPolicy; @@ -60,6 +63,7 @@ import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.event.json.EventSerializer; import org.apache.james.mailbox.events.EventBusTestFixture.GroupA; import org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution; +import org.apache.james.mailbox.events.EventDispatcher.DispatchingFailureGroup; import org.apache.james.mailbox.model.TestId; import org.apache.james.mailbox.model.TestMessageId; import org.apache.james.mailbox.store.quota.DefaultUserQuotaRootResolver; @@ -732,51 +736,73 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, rabbitMQExtension.getRabbitMQ().pause(); - try { - eventBus().dispatch(EVENT, NO_KEYS).block(); - } catch (Exception e) { - // ignore - } + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); - getSpeedProfile().longWaitCondition() - .untilAsserted(() -> assertThat(eventCollector.getEvents()).isEmpty()); + assertThat(eventCollector.getEvents()).isEmpty(); } @Test - void dispatchShouldNotPersistEventWhenDispatchingNoKeyGetError() { + void dispatchShouldPersistEventWhenDispatchingNoKeyGetError() { EventCollector eventCollector = eventCollector(); eventBus().register(eventCollector, GROUP_A); rabbitMQExtension.getRabbitMQ().pause(); - try { - eventBus().dispatch(EVENT, NO_KEYS).block(); - } catch (Exception e) { - // ignore - } + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); - getSpeedProfile().longWaitCondition() - .untilAsserted(() -> - assertThat(deadLetter().containEvents().block()).isFalse()); + assertThat(dispatchingFailureEvents()).containsOnly(EVENT); } @Test - void dispatchShouldNotPersistEventWhenDispatchingWithKeysGetError() { + void dispatchShouldPersistEventWhenDispatchingWithKeysGetError() { EventCollector eventCollector = eventCollector(); eventBus().register(eventCollector, GROUP_A); eventBus().register(eventCollector, KEY_1); rabbitMQExtension.getRabbitMQ().pause(); + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); + + assertThat(dispatchingFailureEvents()).containsOnly(EVENT); + } + + @Test + void dispatchShouldPersistOnlyOneEventWhenDispatchingMultiGroupsGetError() { + EventCollector eventCollector = eventCollector(); + eventBus().register(eventCollector, GROUP_A); + eventBus().register(eventCollector, GROUP_B); + + rabbitMQExtension.getRabbitMQ().pause(); + + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); + + assertThat(dispatchingFailureEvents()).containsOnly(EVENT); + } + + @Test + void dispatchShouldPersistEventsWhenDispatchingGroupsGetErrorMultipleTimes() { + EventCollector eventCollector = eventCollector(); + eventBus().register(eventCollector, GROUP_A); + + rabbitMQExtension.getRabbitMQ().pause(); + doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block()); + doQuietly(() -> eventBus().dispatch(EVENT_2, NO_KEYS).block()); + + assertThat(dispatchingFailureEvents()).containsExactly(EVENT, EVENT_2); + } + + private Stream<Event> dispatchingFailureEvents() { + return deadLetter().failedIds(DispatchingFailureGroup.INSTANCE) + .flatMap(insertionId -> deadLetter().failedEvent(DispatchingFailureGroup.INSTANCE, insertionId)) + .toStream(); + } + + private void doQuietly(Runnable runnable) { try { - eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); + runnable.run(); } catch (Exception e) { // ignore } - - getSpeedProfile().longWaitCondition() - .untilAsserted(() -> - assertThat(deadLetter().containEvents().block()).isFalse()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
