JAMES-2545 Retry connection to RabbitMQ

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d031739f
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d031739f
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d031739f

Branch: refs/heads/master
Commit: d031739fde64b7234c9c806e00fdcadd0c431524
Parents: f2da55f
Author: Antoine Duprat <[email protected]>
Authored: Wed Sep 12 15:33:12 2018 +0200
Committer: Benoit Tellier <[email protected]>
Committed: Fri Sep 14 10:18:33 2018 +0700

----------------------------------------------------------------------
 backends-common/rabbitmq/pom.xml                |  1 -
 .../backend/rabbitmq/RabbitChannelPool.java     | 21 +++++++++++-----
 .../rabbitmq/RabbitMQConnectionFactory.java     | 25 +++++++++++++-------
 .../rabbitmq/RabbitMQConnectionFactoryTest.java | 20 +++++++++++++---
 .../rabbitmq/RabbitMQHealthCheckTest.java       | 19 ++++++++++-----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   | 21 +++++++++++++++-
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java  | 21 +++++++++++++++-
 7 files changed, 101 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml
index 9583901..d260153 100644
--- a/backends-common/rabbitmq/pom.xml
+++ b/backends-common/rabbitmq/pom.xml
@@ -43,7 +43,6 @@
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-util</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
index f6f4154..9db1a19 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
@@ -19,12 +19,18 @@
 
 package org.apache.james.backend.rabbitmq;
 
+import java.util.function.Supplier;
+
+import javax.inject.Inject;
+
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.james.util.MemoizedSupplier;
 
+import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
@@ -37,15 +43,17 @@ public class RabbitChannelPool {
     }
 
     private static class ChannelBasePooledObjectFactory extends 
BasePooledObjectFactory<Channel> {
-        private final Connection connection;
+        private final Supplier<Connection> connection;
 
-        public ChannelBasePooledObjectFactory(Connection connection) {
-            this.connection = connection;
+        public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory 
factory) {
+            this.connection = MemoizedSupplier.of(
+                    Throwing.supplier(() -> factory.create()).sneakyThrow());
         }
 
         @Override
         public Channel create() throws Exception {
-            return connection.createChannel();
+            return connection.get()
+                    .createChannel();
         }
 
         @Override
@@ -66,9 +74,10 @@ public class RabbitChannelPool {
 
     private final ObjectPool<Channel> pool;
 
-    public RabbitChannelPool(Connection connection) {
+    @Inject
+    public RabbitChannelPool(RabbitMQConnectionFactory factory) {
         pool = new GenericObjectPool<>(
-            new ChannelBasePooledObjectFactory(connection));
+            new ChannelBasePooledObjectFactory(factory));
     }
 
     public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws 
E, ConnectionFailedException {

http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
index b6a6ba2..2f901f4 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
@@ -18,23 +18,30 @@
  ****************************************************************/
 package org.apache.james.backend.rabbitmq;
 
+import java.util.concurrent.ExecutionException;
+
 import javax.inject.Inject;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.james.util.retry.RetryExecutorUtil;
 
+import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
 public class RabbitMQConnectionFactory {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitMQConnectionFactory.class);
-
+    private final AsyncRetryExecutor executor;
     private final ConnectionFactory connectionFactory;
 
+    private final int maxRetries;
+    private final int minDelay;
+
     @Inject
-    public RabbitMQConnectionFactory(RabbitMQConfiguration 
rabbitMQConfiguration) {
+    public RabbitMQConnectionFactory(RabbitMQConfiguration 
rabbitMQConfiguration, AsyncRetryExecutor executor) {
+        this.executor = executor;
         this.connectionFactory = from(rabbitMQConfiguration);
+        this.maxRetries = rabbitMQConfiguration.getMaxRetries();
+        this.minDelay = rabbitMQConfiguration.getMinDelay();
     }
 
     private ConnectionFactory from(RabbitMQConfiguration 
rabbitMQConfiguration) {
@@ -43,16 +50,16 @@ public class RabbitMQConnectionFactory {
             connectionFactory.setUri(rabbitMQConfiguration.getUri());
             return connectionFactory;
         } catch (Exception e) {
-            LOGGER.error("Fail to create the RabbitMQ connection factory.");
             throw new RuntimeException(e);
         }
     }
 
     public Connection create() {
         try {
-            return connectionFactory.newConnection();
-        } catch (Exception e) {
-            LOGGER.error("Fail to create a RabbitMQ connection.");
+            return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, 
minDelay, Exception.class)
+                    .getWithRetry(context -> connectionFactory.newConnection())
+                    .get();
+        } catch (InterruptedException | ExecutionException e) {
             throw new RuntimeException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
index 99cca33..3d0c13f 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
@@ -21,11 +21,23 @@ package org.apache.james.backend.rabbitmq;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
+
 class RabbitMQConnectionFactoryTest {
 
+    private ScheduledExecutorService scheduledExecutor;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+    }
+
     @Test
     void creatingAFactoryShouldWorkWhenConfigurationIsValid() {
         RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
@@ -33,7 +45,7 @@ class RabbitMQConnectionFactoryTest {
             
.managementUri(URI.create("http://james:james@rabbitmq_host:15672/api/"))
             .build();
 
-        new RabbitMQConnectionFactory(rabbitMQConfiguration);
+        new RabbitMQConnectionFactory(rabbitMQConfiguration, new 
AsyncRetryExecutor(scheduledExecutor));
     }
 
     @Test
@@ -43,7 +55,7 @@ class RabbitMQConnectionFactoryTest {
             
.managementUri(URI.create("http://james:james@rabbitmq_host:15672/api/"))
             .build();
 
-        assertThatThrownBy(() -> new 
RabbitMQConnectionFactory(rabbitMQConfiguration))
+        assertThatThrownBy(() -> new 
RabbitMQConnectionFactory(rabbitMQConfiguration, new 
AsyncRetryExecutor(scheduledExecutor)))
             .isInstanceOf(RuntimeException.class);
     }
 
@@ -52,9 +64,11 @@ class RabbitMQConnectionFactoryTest {
         RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
                 .amqpUri(URI.create("amqp://james:james@rabbitmq_host:5672"))
                 
.managementUri(URI.create("http://james:james@rabbitmq_host:15672/api/"))
+                .maxRetries(1)
+                .minDelay(1)
                 .build();
 
-        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration);
+        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration, new 
AsyncRetryExecutor(scheduledExecutor));
 
         assertThatThrownBy(() -> rabbitMQConnectionFactory.create())
             .isInstanceOf(RuntimeException.class);

http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
index 0938688..ec50384 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
@@ -22,14 +22,14 @@ package org.apache.james.backend.rabbitmq;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.net.URI;
+import java.util.concurrent.Executors;
 
 import org.apache.james.core.healthcheck.Result;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import com.rabbitmq.client.ConnectionFactory;
+import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 
 @ExtendWith(DockerRabbitMQExtension.class)
 class RabbitMQHealthCheckTest {
@@ -38,10 +38,18 @@ class RabbitMQHealthCheckTest {
     @BeforeEach
     void setUp(DockerRabbitMQ rabbitMQ) throws Exception {
         URI amqpUri = URI.create("amqp://" + rabbitMQ.getHostIp() + ":" + 
rabbitMQ.getPort());
-        ConnectionFactory connectionFactory = new ConnectionFactory();
-        connectionFactory.setUri(amqpUri);
+        URI managementUri = URI.create("http://" + rabbitMQ.getHostIp() + 
":15672/api/");
+
+        RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
+            .amqpUri(amqpUri)
+            .managementUri(managementUri)
+            .build();
+
+        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration,
+                new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
+
         healthCheck = new RabbitMQHealthCheck(
-            new RabbitChannelPool(connectionFactory.newConnection()));
+            new RabbitChannelPool(rabbitMQConnectionFactory));
     }
 
     @Test
@@ -61,7 +69,6 @@ class RabbitMQHealthCheckTest {
     }
 
     @Test
-    @Disabled("connection don't recover instantly, we should try several time 
(depending on heartbeat rabbit conf")
     void checkShouldDetectWhenRabbitMQRecovered(DockerRabbitMQ rabbitMQ) 
throws Exception {
         rabbitMQ.stopApp();
         healthCheck.check();

http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index c2d5c07..811c361 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.rabbitmq;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 
 import javax.mail.internet.MimeMessage;
@@ -29,6 +30,8 @@ import javax.mail.internet.MimeMessage;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.james.backend.rabbitmq.DockerRabbitMQ;
 import org.apache.james.backend.rabbitmq.RabbitChannelPool;
+import org.apache.james.backend.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.DockerCassandraExtension;
@@ -47,6 +50,8 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
+
 @ExtendWith({ReusableDockerRabbitMQExtension.class, 
DockerCassandraExtension.class})
 public class RabbitMQMailQueueTest implements MailQueueContract {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new 
HashBlobId.Factory();
@@ -65,13 +70,27 @@ public class RabbitMQMailQueueTest implements 
MailQueueContract {
         CassandraBlobsDAO blobsDAO = new 
CassandraBlobsDAO(cassandra.getConf(), 
CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY);
         Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = 
MimeMessageStore.factory(blobsDAO).mimeMessageStore();
 
+        URI amqpUri = new URIBuilder()
+            .setScheme("amqp")
+            .setHost(rabbitMQ.getHostIp())
+            .setPort(rabbitMQ.getPort())
+            .build();
         URI rabbitManagementUri = new URIBuilder()
             .setScheme("http")
             .setHost(rabbitMQ.getHostIp())
             .setPort(rabbitMQ.getAdminPort())
             .build();
 
-        RabbitClient rabbitClient = new RabbitClient(new 
RabbitChannelPool(rabbitMQ.connectionFactory().newConnection()));
+
+        RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
+            .amqpUri(amqpUri)
+            .managementUri(rabbitManagementUri)
+            .build();
+
+        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration,
+                new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
+
+        RabbitClient rabbitClient = new RabbitClient(new 
RabbitChannelPool(rabbitMQConnectionFactory));
         RabbitMQMailQueue.Factory factory = new 
RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY);
         RabbitMQManagementApi mqManagementApi = new 
RabbitMQManagementApi(rabbitManagementUri, new 
RabbitMQManagementCredentials("guest", "guest".toCharArray()));
         mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, 
mqManagementApi, factory);

http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index bbb4734..1df4d92 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.rabbitmq;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 
 import javax.mail.internet.MimeMessage;
@@ -29,6 +30,8 @@ import javax.mail.internet.MimeMessage;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.james.backend.rabbitmq.DockerRabbitMQ;
 import org.apache.james.backend.rabbitmq.RabbitChannelPool;
+import org.apache.james.backend.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.DockerCassandraExtension;
@@ -47,6 +50,8 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
+
 @ExtendWith({ReusableDockerRabbitMQExtension.class, 
DockerCassandraExtension.class})
 class RabbitMqMailQueueFactoryTest implements 
MailQueueFactoryContract<RabbitMQMailQueue> {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new 
HashBlobId.Factory();
@@ -65,13 +70,27 @@ class RabbitMqMailQueueFactoryTest implements 
MailQueueFactoryContract<RabbitMQM
         CassandraBlobsDAO blobsDAO = new 
CassandraBlobsDAO(cassandra.getConf(), 
CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY);
         Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = 
MimeMessageStore.factory(blobsDAO).mimeMessageStore();
 
+        URI amqpUri = new URIBuilder()
+            .setScheme("amqp")
+            .setHost(rabbitMQ.getHostIp())
+            .setPort(rabbitMQ.getPort())
+            .build();
         URI rabbitManagementUri = new URIBuilder()
             .setScheme("http")
             .setHost(rabbitMQ.getHostIp())
             .setPort(rabbitMQ.getAdminPort())
             .build();
 
-        RabbitClient rabbitClient = new RabbitClient(new 
RabbitChannelPool(rabbitMQ.connectionFactory().newConnection()));
+
+        RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
+            .amqpUri(amqpUri)
+            .managementUri(rabbitManagementUri)
+            .build();
+
+        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration,
+                new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
+
+        RabbitClient rabbitClient = new RabbitClient(new 
RabbitChannelPool(rabbitMQConnectionFactory));
         RabbitMQMailQueue.Factory factory = new 
RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY);
         RabbitMQManagementApi mqManagementApi = new 
RabbitMQManagementApi(rabbitManagementUri, new 
RabbitMQManagementCredentials("guest", "guest".toCharArray()));
         mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, 
mqManagementApi, factory);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to