This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 25fa48948d8457e7ca38ddd6cd9c4193bcf85db5 Author: Benoit Tellier <[email protected]> AuthorDate: Tue Sep 22 11:52:50 2020 +0700 JAMES-3305 James does not start if eventBus workQueues don't rely on dead-letter Difficulties encountered: - The reactive AMQP Java client does not allow checking whether a queue already exists. We need to actually try to create it, and resume on the error to handle creation errors. - The error causes the entire channel (sender) to crash. As such, the potentially erroneous createQueue needs to be performed in a separate channel. --- .../rabbitmq/ReactorRabbitMQChannelPool.java | 28 +++++++ .../james/mailbox/events/GroupRegistration.java | 17 +++-- .../mailbox/events/GroupRegistrationHandler.java | 7 +- .../james/mailbox/events/RabbitMQEventBus.java | 9 ++- ...RabbitMQEventBusDeadLetterQueueUpgradeTest.java | 87 ++++++++++++++++++++++ .../james/mailbox/events/RabbitMQEventBusTest.java | 5 +- .../rabbitmq/host/RabbitMQEventBusHostSystem.java | 3 +- 7 files changed, 141 insertions(+), 15 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java index 913d84d..1de99d3 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java @@ -37,13 +37,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; +import com.google.common.base.Preconditions; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ShutdownSignalException; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.core.scheduler.Schedulers; +import 
reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ChannelPool; +import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.ReceiverOptions; @@ -236,6 +241,29 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { connectionMono.map(Throwing.function(Connection::createChannel)).cache())); } + public Mono<Void> createWorkQueue(QueueSpecification queueSpecification, BindingSpecification bindingSpecification) { + Preconditions.checkArgument(queueSpecification.getName() != null, "WorkQueue pattern do not make sense for unnamed queues"); + Preconditions.checkArgument(queueSpecification.getName().equals(bindingSpecification.getQueue()), + "Binding needs to be targetting the created queue %s instead of %s", + queueSpecification.getName(), bindingSpecification.getQueue()); + + return Flux.concat( + Mono.using(this::createSender, + managementSender -> managementSender.declareQueue(queueSpecification), + Sender::close) + .onErrorResume( + e -> e instanceof ShutdownSignalException + && e.getMessage().contains("reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue"), + e -> { + LOGGER.warn("{} already exists without dead-letter setup. Dead lettered messages to it will be lost. 
" + + "To solve this, re-create the queue with the x-dead-letter-exchange argument set up.", + queueSpecification.getName()); + return Mono.empty(); + }), + sender.bind(bindingSpecification)) + .then(); + } + private void invalidateObject(Channel channel) { try { pool.invalidateObject(channel); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index 5141ae5..99c5cb9 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.event.json.EventSerializer; import org.apache.james.util.MDCBuilder; @@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import reactor.core.Disposable; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.AcknowledgableDelivery; @@ -78,6 +78,7 @@ class GroupRegistration implements Registration { static final String RETRY_COUNT = "retry-count"; static final int DEFAULT_RETRY_COUNT = 0; + private final ReactorRabbitMQChannelPool channelPool; private final MailboxListener.ReactiveMailboxListener mailboxListener; private final WorkQueueName queueName; private final Receiver receiver; @@ -91,10 +92,11 @@ class GroupRegistration implements Registration { private final MailboxListenerExecutor mailboxListenerExecutor; private Optional<Disposable> receiverSubscriber; - GroupRegistration(Sender sender, ReceiverProvider receiverProvider, EventSerializer 
eventSerializer, + GroupRegistration(ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer, MailboxListener.ReactiveMailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff, EventDeadLetters eventDeadLetters, Runnable unregisterGroup, MailboxListenerExecutor mailboxListenerExecutor) { + this.channelPool = channelPool; this.eventSerializer = eventSerializer; this.mailboxListener = mailboxListener; this.queueName = WorkQueueName.of(group); @@ -120,17 +122,16 @@ class GroupRegistration implements Registration { } private Mono<Void> createGroupWorkQueue() { - return Flux.concat( - sender.declareQueue(QueueSpecification.queue(queueName.asString()) + return channelPool.createWorkQueue( + QueueSpecification.queue(queueName.asString()) .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) - .arguments(deadLetterQueue(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME))), - sender.bind(BindingSpecification.binding() + .arguments(deadLetterQueue(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME)), + BindingSpecification.binding() .exchange(MAILBOX_EVENT_EXCHANGE_NAME) .queue(queueName.asString()) - .routingKey(EMPTY_ROUTING_KEY))) - .then(); + .routingKey(EMPTY_ROUTING_KEY)); } private Disposable consumeWorkQueue() { diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java index 3f729e0..251b375 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import 
org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.event.json.EventSerializer; @@ -31,16 +32,18 @@ import reactor.rabbitmq.Sender; class GroupRegistrationHandler { private final Map<Group, GroupRegistration> groupRegistrations; private final EventSerializer eventSerializer; + private final ReactorRabbitMQChannelPool channelPool; private final Sender sender; private final ReceiverProvider receiverProvider; private final RetryBackoffConfiguration retryBackoff; private final EventDeadLetters eventDeadLetters; private final MailboxListenerExecutor mailboxListenerExecutor; - GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, + GroupRegistrationHandler(EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider, RetryBackoffConfiguration retryBackoff, EventDeadLetters eventDeadLetters, MailboxListenerExecutor mailboxListenerExecutor) { this.eventSerializer = eventSerializer; + this.channelPool = channelPool; this.sender = sender; this.receiverProvider = receiverProvider; this.retryBackoff = retryBackoff; @@ -71,7 +74,7 @@ class GroupRegistrationHandler { private GroupRegistration newGroupRegistration(MailboxListener.ReactiveMailboxListener listener, Group group) { return new GroupRegistration( - sender, + channelPool, sender, receiverProvider, eventSerializer, listener, diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index 29dcc6f..5fb8be6 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -24,6 +24,7 @@ import java.util.Set; import javax.annotation.PreDestroy; import javax.inject.Inject; +import 
org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.event.json.EventSerializer; import org.apache.james.lifecycle.api.Startable; @@ -53,6 +54,7 @@ public class RabbitMQEventBus implements EventBus, Startable { private final MailboxListenerExecutor mailboxListenerExecutor; private final Sender sender; private final ReceiverProvider receiverProvider; + private final ReactorRabbitMQChannelPool channelPool; private volatile boolean isRunning; private volatile boolean isStopping; @@ -64,10 +66,11 @@ public class RabbitMQEventBus implements EventBus, Startable { public RabbitMQEventBus(Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer, RetryBackoffConfiguration retryBackoff, RoutingKeyConverter routingKeyConverter, - EventDeadLetters eventDeadLetters, MetricFactory metricFactory) { + EventDeadLetters eventDeadLetters, MetricFactory metricFactory, ReactorRabbitMQChannelPool channelPool) { this.sender = sender; this.receiverProvider = receiverProvider; this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory); + this.channelPool = channelPool; this.eventBusId = EventBusId.random(); this.eventSerializer = eventSerializer; this.routingKeyConverter = routingKeyConverter; @@ -82,7 +85,7 @@ public class RabbitMQEventBus implements EventBus, Startable { LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff); - groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor); + groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor); 
eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters); eventDispatcher.start(); @@ -97,7 +100,7 @@ public class RabbitMQEventBus implements EventBus, Startable { LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff); - groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor); + groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor); eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters); keyRegistrationHandler.declareQueue(); diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java new file mode 100644 index 0000000..89c64cc --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java @@ -0,0 +1,87 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; +import static org.apache.james.backends.rabbitmq.Constants.DURABLE; +import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; +import static org.apache.james.mailbox.events.EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION; +import static org.assertj.core.api.Assertions.assertThatCode; + +import org.apache.james.backends.rabbitmq.RabbitMQExtension; +import org.apache.james.event.json.EventSerializer; +import org.apache.james.mailbox.events.EventBusTestFixture.GroupA; +import org.apache.james.mailbox.events.GroupRegistration.WorkQueueName; +import org.apache.james.mailbox.model.TestId; +import org.apache.james.mailbox.model.TestMessageId; +import org.apache.james.mailbox.store.quota.DefaultUserQuotaRootResolver; +import org.apache.james.mailbox.util.EventCollector; +import org.apache.james.metrics.tests.RecordingMetricFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import reactor.rabbitmq.QueueSpecification; + +class RabbitMQEventBusDeadLetterQueueUpgradeTest { + @RegisterExtension + static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ() + .isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK); + + private RabbitMQEventBus eventBus; + + @BeforeEach + void setUp() { 
+ MemoryEventDeadLetters memoryEventDeadLetters = new MemoryEventDeadLetters(); + + TestId.Factory mailboxIdFactory = new TestId.Factory(); + EventSerializer eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory(), new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer()); + RoutingKeyConverter routingKeyConverter = RoutingKeyConverter.forFactories(new MailboxIdRegistrationKey.Factory(mailboxIdFactory)); + + eventBus = new RabbitMQEventBus(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), + eventSerializer, RETRY_BACKOFF_CONFIGURATION, routingKeyConverter, + memoryEventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool()); + + eventBus.start(); + } + + @AfterEach + void tearDown() { + eventBus.stop(); + } + + @Test + void eventBusShouldStartWhenDeadLetterUpgradeWasNotPerformed() { + GroupA registeredGroup = new GroupA(); + WorkQueueName workQueueName = WorkQueueName.of(registeredGroup); + + rabbitMQExtension.getSender() + .declareQueue(QueueSpecification.queue(workQueueName.asString()) + .durable(DURABLE) + .exclusive(!EXCLUSIVE) + .autoDelete(!AUTO_DELETE)) + .block(); + + assertThatCode(eventBus::start).doesNotThrowAnyException(); + assertThatCode(() -> eventBus.register(new EventCollector(), registeredGroup)).doesNotThrowAnyException(); + } + +} diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index 78079d5..80650a9 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -154,7 +154,10 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, } private RabbitMQEventBus newEventBus(Sender sender, ReceiverProvider 
receiverProvider) { - return new RabbitMQEventBus(sender, receiverProvider, eventSerializer, EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION, routingKeyConverter, memoryEventDeadLetters, new RecordingMetricFactory()); + return new RabbitMQEventBus(sender, receiverProvider, eventSerializer, + EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION, routingKeyConverter, + memoryEventDeadLetters, new RecordingMetricFactory(), + rabbitMQExtension.getRabbitChannelPool()); } @Override diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java index 70128f3..7923b37 100644 --- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java +++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java @@ -112,7 +112,8 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem { RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory))); return new RabbitMQEventBus(reactorRabbitMQChannelPool.getSender(), reactorRabbitMQChannelPool::createReceiver, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, new MemoryEventDeadLetters(), - new RecordingMetricFactory()); + new RecordingMetricFactory(), + reactorRabbitMQChannelPool); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
