This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 25fa48948d8457e7ca38ddd6cd9c4193bcf85db5
Author: Benoit Tellier <[email protected]>
AuthorDate: Tue Sep 22 11:52:50 2020 +0700

    JAMES-3305 James do not start if eventBus workQueues don't rely on 
dead-letter
    
    Difficulties encountered:
    
     - The reactive AMQP java client do not allow to see if a queue already 
exist. We need
     to actually try to create it, and resume the error to handle creation 
errors.
     - The error causes the entire channel (sender) to be crashed. As such, the 
potentially
     erroneous createQueue needs to be performed in a separate channel.
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java       | 28 +++++++
 .../james/mailbox/events/GroupRegistration.java    | 17 +++--
 .../mailbox/events/GroupRegistrationHandler.java   |  7 +-
 .../james/mailbox/events/RabbitMQEventBus.java     |  9 ++-
 ...RabbitMQEventBusDeadLetterQueueUpgradeTest.java | 87 ++++++++++++++++++++++
 .../james/mailbox/events/RabbitMQEventBusTest.java |  5 +-
 .../rabbitmq/host/RabbitMQEventBusHostSystem.java  |  3 +-
 7 files changed, 141 insertions(+), 15 deletions(-)

diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 913d84d..1de99d3 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -37,13 +37,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
+import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ShutdownSignalException;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.SignalType;
 import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ChannelPool;
+import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.ReceiverOptions;
@@ -236,6 +241,29 @@ public class ReactorRabbitMQChannelPool implements 
ChannelPool, Startable {
                
connectionMono.map(Throwing.function(Connection::createChannel)).cache()));
     }
 
+    public Mono<Void> createWorkQueue(QueueSpecification queueSpecification, 
BindingSpecification bindingSpecification) {
+        Preconditions.checkArgument(queueSpecification.getName() != null, 
"WorkQueue pattern do not make sense for unnamed queues");
+        
Preconditions.checkArgument(queueSpecification.getName().equals(bindingSpecification.getQueue()),
+            "Binding needs to be targetting the created queue %s instead of 
%s",
+            queueSpecification.getName(), bindingSpecification.getQueue());
+
+        return Flux.concat(
+            Mono.using(this::createSender,
+                managementSender -> 
managementSender.declareQueue(queueSpecification),
+                Sender::close)
+                .onErrorResume(
+                    e -> e instanceof ShutdownSignalException
+                        && e.getMessage().contains("reply-code=406, 
reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for 
queue"),
+                    e -> {
+                        LOGGER.warn("{} already exists without dead-letter 
setup. Dead lettered messages to it will be lost. " +
+                            "To solve this, re-create the queue with the 
x-dead-letter-exchange argument set up.",
+                            queueSpecification.getName());
+                        return Mono.empty();
+                    }),
+            sender.bind(bindingSpecification))
+            .then();
+    }
+
     private void invalidateObject(Channel channel) {
         try {
             pool.invalidateObject(channel);
diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 5141ae5..99c5cb9 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -34,6 +34,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Predicate;
 
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.util.MDCBuilder;
@@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 
 import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
@@ -78,6 +78,7 @@ class GroupRegistration implements Registration {
     static final String RETRY_COUNT = "retry-count";
     static final int DEFAULT_RETRY_COUNT = 0;
 
+    private final ReactorRabbitMQChannelPool channelPool;
     private final MailboxListener.ReactiveMailboxListener mailboxListener;
     private final WorkQueueName queueName;
     private final Receiver receiver;
@@ -91,10 +92,11 @@ class GroupRegistration implements Registration {
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private Optional<Disposable> receiverSubscriber;
 
-    GroupRegistration(Sender sender, ReceiverProvider receiverProvider, 
EventSerializer eventSerializer,
+    GroupRegistration(ReactorRabbitMQChannelPool channelPool, Sender sender, 
ReceiverProvider receiverProvider, EventSerializer eventSerializer,
                       MailboxListener.ReactiveMailboxListener mailboxListener, 
Group group, RetryBackoffConfiguration retryBackoff,
                       EventDeadLetters eventDeadLetters,
                       Runnable unregisterGroup, MailboxListenerExecutor 
mailboxListenerExecutor) {
+        this.channelPool = channelPool;
         this.eventSerializer = eventSerializer;
         this.mailboxListener = mailboxListener;
         this.queueName = WorkQueueName.of(group);
@@ -120,17 +122,16 @@ class GroupRegistration implements Registration {
     }
 
     private Mono<Void> createGroupWorkQueue() {
-        return Flux.concat(
-            sender.declareQueue(QueueSpecification.queue(queueName.asString())
+        return channelPool.createWorkQueue(
+            QueueSpecification.queue(queueName.asString())
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
-                
.arguments(deadLetterQueue(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME))),
-            sender.bind(BindingSpecification.binding()
+                
.arguments(deadLetterQueue(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME)),
+            BindingSpecification.binding()
                 .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
                 .queue(queueName.asString())
-                .routingKey(EMPTY_ROUTING_KEY)))
-            .then();
+                .routingKey(EMPTY_ROUTING_KEY));
     }
 
     private Disposable consumeWorkQueue() {
diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
index 3f729e0..251b375 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 
@@ -31,16 +32,18 @@ import reactor.rabbitmq.Sender;
 class GroupRegistrationHandler {
     private final Map<Group, GroupRegistration> groupRegistrations;
     private final EventSerializer eventSerializer;
+    private final ReactorRabbitMQChannelPool channelPool;
     private final Sender sender;
     private final ReceiverProvider receiverProvider;
     private final RetryBackoffConfiguration retryBackoff;
     private final EventDeadLetters eventDeadLetters;
     private final MailboxListenerExecutor mailboxListenerExecutor;
 
-    GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, 
ReceiverProvider receiverProvider,
+    GroupRegistrationHandler(EventSerializer eventSerializer, 
ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider 
receiverProvider,
                              RetryBackoffConfiguration retryBackoff,
                              EventDeadLetters eventDeadLetters, 
MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventSerializer = eventSerializer;
+        this.channelPool = channelPool;
         this.sender = sender;
         this.receiverProvider = receiverProvider;
         this.retryBackoff = retryBackoff;
@@ -71,7 +74,7 @@ class GroupRegistrationHandler {
 
     private GroupRegistration 
newGroupRegistration(MailboxListener.ReactiveMailboxListener listener, Group 
group) {
         return new GroupRegistration(
-            sender,
+            channelPool, sender,
             receiverProvider,
             eventSerializer,
             listener,
diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 29dcc6f..5fb8be6 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.lifecycle.api.Startable;
@@ -53,6 +54,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private final Sender sender;
     private final ReceiverProvider receiverProvider;
+    private final ReactorRabbitMQChannelPool channelPool;
 
     private volatile boolean isRunning;
     private volatile boolean isStopping;
@@ -64,10 +66,11 @@ public class RabbitMQEventBus implements EventBus, 
Startable {
     public RabbitMQEventBus(Sender sender, ReceiverProvider receiverProvider, 
EventSerializer eventSerializer,
                             RetryBackoffConfiguration retryBackoff,
                             RoutingKeyConverter routingKeyConverter,
-                            EventDeadLetters eventDeadLetters, MetricFactory 
metricFactory) {
+                            EventDeadLetters eventDeadLetters, MetricFactory 
metricFactory, ReactorRabbitMQChannelPool channelPool) {
         this.sender = sender;
         this.receiverProvider = receiverProvider;
         this.mailboxListenerExecutor = new 
MailboxListenerExecutor(metricFactory);
+        this.channelPool = channelPool;
         this.eventBusId = EventBusId.random();
         this.eventSerializer = eventSerializer;
         this.routingKeyConverter = routingKeyConverter;
@@ -82,7 +85,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
             LocalListenerRegistry localListenerRegistry = new 
LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, 
eventSerializer, sender, receiverProvider, routingKeyConverter, 
localListenerRegistry, mailboxListenerExecutor, retryBackoff);
-            groupRegistrationHandler = new 
GroupRegistrationHandler(eventSerializer, sender, receiverProvider, 
retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+            groupRegistrationHandler = new 
GroupRegistrationHandler(eventSerializer, channelPool, sender, 
receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
             eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, 
sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters);
 
             eventDispatcher.start();
@@ -97,7 +100,7 @@ public class RabbitMQEventBus implements EventBus, Startable 
{
 
             LocalListenerRegistry localListenerRegistry = new 
LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, 
eventSerializer, sender, receiverProvider, routingKeyConverter, 
localListenerRegistry, mailboxListenerExecutor, retryBackoff);
-            groupRegistrationHandler = new 
GroupRegistrationHandler(eventSerializer, sender, receiverProvider, 
retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+            groupRegistrationHandler = new 
GroupRegistrationHandler(eventSerializer, channelPool, sender, 
receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
             eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, 
sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters);
 
             keyRegistrationHandler.declareQueue();
diff --git 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java
 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java
new file mode 100644
index 0000000..89c64cc
--- /dev/null
+++ 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java
@@ -0,0 +1,87 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static 
org.apache.james.mailbox.events.EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
+import org.apache.james.mailbox.events.GroupRegistration.WorkQueueName;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.apache.james.mailbox.store.quota.DefaultUserQuotaRootResolver;
+import org.apache.james.mailbox.util.EventCollector;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.rabbitmq.QueueSpecification;
+
+class RabbitMQEventBusDeadLetterQueueUpgradeTest {
+    @RegisterExtension
+    static RabbitMQExtension rabbitMQExtension = 
RabbitMQExtension.singletonRabbitMQ()
+        .isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
+
+    private RabbitMQEventBus eventBus;
+
+    @BeforeEach
+    void setUp() {
+        MemoryEventDeadLetters memoryEventDeadLetters = new 
MemoryEventDeadLetters();
+
+        TestId.Factory mailboxIdFactory = new TestId.Factory();
+        EventSerializer eventSerializer = new 
EventSerializer(mailboxIdFactory, new TestMessageId.Factory(), new 
DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
+        RoutingKeyConverter routingKeyConverter = 
RoutingKeyConverter.forFactories(new 
MailboxIdRegistrationKey.Factory(mailboxIdFactory));
+
+        eventBus = new RabbitMQEventBus(rabbitMQExtension.getSender(), 
rabbitMQExtension.getReceiverProvider(),
+            eventSerializer, RETRY_BACKOFF_CONFIGURATION, routingKeyConverter,
+            memoryEventDeadLetters, new RecordingMetricFactory(), 
rabbitMQExtension.getRabbitChannelPool());
+
+        eventBus.start();
+    }
+
+    @AfterEach
+    void tearDown() {
+        eventBus.stop();
+    }
+
+    @Test
+    void eventBusShouldStartWhenDeadLetterUpgradeWasNotPerformed() {
+        GroupA registeredGroup = new GroupA();
+        WorkQueueName workQueueName = WorkQueueName.of(registeredGroup);
+        
+        rabbitMQExtension.getSender()
+            .declareQueue(QueueSpecification.queue(workQueueName.asString())
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE))
+            .block();
+
+        assertThatCode(eventBus::start).doesNotThrowAnyException();
+        assertThatCode(() -> eventBus.register(new EventCollector(), 
registeredGroup)).doesNotThrowAnyException();
+    }
+
+}
diff --git 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 78079d5..80650a9 100644
--- 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -154,7 +154,10 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
     }
 
     private RabbitMQEventBus newEventBus(Sender sender, ReceiverProvider 
receiverProvider) {
-        return new RabbitMQEventBus(sender, receiverProvider, eventSerializer, 
EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION, routingKeyConverter, 
memoryEventDeadLetters, new RecordingMetricFactory());
+        return new RabbitMQEventBus(sender, receiverProvider, eventSerializer,
+            EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION, 
routingKeyConverter,
+            memoryEventDeadLetters, new RecordingMetricFactory(),
+            rabbitMQExtension.getRabbitChannelPool());
     }
 
     @Override
diff --git 
a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
 
b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index 70128f3..7923b37 100644
--- 
a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ 
b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -112,7 +112,8 @@ public class RabbitMQEventBusHostSystem extends 
JamesImapHostSystem {
         RoutingKeyConverter routingKeyConverter = new 
RoutingKeyConverter(ImmutableSet.of(new 
MailboxIdRegistrationKey.Factory(mailboxIdFactory)));
         return new RabbitMQEventBus(reactorRabbitMQChannelPool.getSender(), 
reactorRabbitMQChannelPool::createReceiver,
             eventSerializer, RetryBackoffConfiguration.DEFAULT, 
routingKeyConverter, new MemoryEventDeadLetters(),
-            new RecordingMetricFactory());
+            new RecordingMetricFactory(),
+            reactorRabbitMQChannelPool);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to