This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1de4fa58380d288852b11d02fd6a6291ec38a665 Author: Rene Cordier <[email protected]> AuthorDate: Fri Jul 10 11:25:01 2020 +0700 JAMES-3305 Invalid EventBus messages needs to be stored in dead letter queue --- .../james/mailbox/events/EventDispatcher.java | 28 +++++++++++++++++-- .../james/mailbox/events/GroupRegistration.java | 5 ++-- .../james/mailbox/events/RabbitMQEventBus.java | 2 ++ .../james/mailbox/events/RabbitMQEventBusTest.java | 32 +++++++++++++++++++++- 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java index c150184..999dc40 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java @@ -20,9 +20,15 @@ package org.apache.james.mailbox.events; import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN; +import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; +import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; +import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; +import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS; import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID; +import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME; +import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_QUEUE; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME; import java.nio.charset.StandardCharsets; @@ -46,8 +52,10 @@ import com.rabbitmq.client.AMQP; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.OutboundMessage; +import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.Sender; import reactor.util.function.Tuples; @@ -83,9 +91,23 @@ public class EventDispatcher { } void start() { - sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME) - .durable(DURABLE) - .type(DIRECT_EXCHANGE)) + Flux.concat( + sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME) + .durable(DURABLE) + .type(DIRECT_EXCHANGE)), + sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME) + .durable(DURABLE) + .type(DIRECT_EXCHANGE)), + sender.declareQueue(QueueSpecification.queue(MAILBOX_EVENT_DEAD_LETTER_QUEUE) + .durable(DURABLE) + .exclusive(!EXCLUSIVE) + .autoDelete(!AUTO_DELETE) + .arguments(NO_ARGUMENTS)), + sender.bind(BindingSpecification.binding() + .exchange(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME) + .queue(MAILBOX_EVENT_DEAD_LETTER_QUEUE) + .routingKey(EMPTY_ROUTING_KEY))) + .then() .block(); } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index d95e6bd..5141ae5 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -23,9 +23,10 @@ import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; -import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS; import static org.apache.james.backends.rabbitmq.Constants.REQUEUE; +import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT; +import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME; import java.nio.charset.StandardCharsets; @@ -124,7 +125,7 @@ class GroupRegistration implements Registration { .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) - .arguments(NO_ARGUMENTS)), + .arguments(deadLetterQueue(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME))), sender.bind(BindingSpecification.binding() .exchange(MAILBOX_EVENT_EXCHANGE_NAME) .queue(queueName.asString()) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index a73f1ed..29dcc6f 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -40,7 +40,9 @@ public class RabbitMQEventBus implements EventBus, Startable { private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of(); private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running"; static final String MAILBOX_EVENT = "mailboxEvent"; + static final String MAILBOX_EVENT_DEAD_LETTER_QUEUE = MAILBOX_EVENT + "-dead-letter-queue"; static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange"; + static final String MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME = MAILBOX_EVENT + "-dead-letter-exchange"; static final String EVENT_BUS_ID = "eventBusId"; private final EventSerializer eventSerializer; diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index 810187c..58d1979 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -37,6 +37,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.newAsyncListen import static org.apache.james.mailbox.events.EventBusTestFixture.newListener; import static org.apache.james.mailbox.events.GroupRegistration.WorkQueueName.MAILBOX_EVENT_WORK_QUEUE_PREFIX; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT; +import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_QUEUE; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -53,6 +54,7 @@ import java.io.Closeable; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -72,6 +74,7 @@ import org.apache.james.mailbox.util.EventCollector; import org.apache.james.metrics.tests.RecordingMetricFactory; import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.assertj.core.data.Percentage; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -83,6 +86,7 @@ import org.mockito.stubbing.Answer; import com.google.common.collect.ImmutableSet; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.OutboundMessage; @@ -212,6 +216,31 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, } @Test + void eventProcessingShouldStoreInvalidMessagesInDeadLetterQueue() { + EventCollector listener = new EventCollector(); + EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA(); + eventBus.register(listener, registeredGroup); + + String emptyRoutingKey = ""; + rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, + emptyRoutingKey, + "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))) + .block(); + + AtomicInteger deadLetteredCount = new AtomicInteger(); + rabbitMQExtension.getRabbitChannelPool() + .createReceiver() + .consumeAutoAck(MAILBOX_EVENT_DEAD_LETTER_QUEUE) + .doOnNext(next -> deadLetteredCount.incrementAndGet()) + .subscribeOn(Schedulers.elastic()) + .subscribe(); + + Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS) + .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1)); + } + + @Test void deserializeEventCollectorGroup() throws Exception { assertThat(Group.deserialize("org.apache.james.mailbox.util.EventCollector$EventCollectorGroup")) .isEqualTo(new EventCollector.EventCollectorGroup()); @@ -727,7 +756,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBusWithKeyHandlerNotStarted.stop(); assertThat(rabbitManagementAPI.listQueues()) - .filteredOn(queue -> !queue.getName().startsWith(MAILBOX_EVENT_WORK_QUEUE_PREFIX)) + .filteredOn(queue -> !queue.getName().startsWith(MAILBOX_EVENT_WORK_QUEUE_PREFIX) + && !queue.getName().startsWith(MAILBOX_EVENT_DEAD_LETTER_QUEUE)) .isEmpty(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
