JAMES-2545 Retry connection to RabbitMQ
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d031739f Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d031739f Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d031739f Branch: refs/heads/master Commit: d031739fde64b7234c9c806e00fdcadd0c431524 Parents: f2da55f Author: Antoine Duprat <[email protected]> Authored: Wed Sep 12 15:33:12 2018 +0200 Committer: Benoit Tellier <[email protected]> Committed: Fri Sep 14 10:18:33 2018 +0700 ---------------------------------------------------------------------- backends-common/rabbitmq/pom.xml | 1 - .../backend/rabbitmq/RabbitChannelPool.java | 21 +++++++++++----- .../rabbitmq/RabbitMQConnectionFactory.java | 25 +++++++++++++------- .../rabbitmq/RabbitMQConnectionFactoryTest.java | 20 +++++++++++++--- .../rabbitmq/RabbitMQHealthCheckTest.java | 19 ++++++++++----- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 21 +++++++++++++++- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 21 +++++++++++++++- 7 files changed, 101 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml index 9583901..d260153 100644 --- a/backends-common/rabbitmq/pom.xml +++ b/backends-common/rabbitmq/pom.xml @@ -43,7 +43,6 @@ <dependency> <groupId>${james.groupId}</groupId> <artifactId>james-server-util</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>com.google.guava</groupId> http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java index f6f4154..9db1a19 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java @@ -19,12 +19,18 @@ package org.apache.james.backend.rabbitmq; +import java.util.function.Supplier; + +import javax.inject.Inject; + import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.james.util.MemoizedSupplier; +import com.github.fge.lambdas.Throwing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -37,15 +43,17 @@ public class RabbitChannelPool { } private static class ChannelBasePooledObjectFactory extends BasePooledObjectFactory<Channel> { - private final Connection connection; + private final Supplier<Connection> connection; - public ChannelBasePooledObjectFactory(Connection connection) { - this.connection = connection; + public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory factory) { + this.connection = MemoizedSupplier.of( + Throwing.supplier(() -> factory.create()).sneakyThrow()); } @Override public Channel create() throws Exception { - return connection.createChannel(); + return connection.get() + .createChannel(); } @Override @@ -66,9 +74,10 @@ public class RabbitChannelPool { private final ObjectPool<Channel> pool; - public RabbitChannelPool(Connection connection) { + @Inject + public RabbitChannelPool(RabbitMQConnectionFactory factory) { pool = new GenericObjectPool<>( - new ChannelBasePooledObjectFactory(connection)); + new ChannelBasePooledObjectFactory(factory)); } public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java index b6a6ba2..2f901f4 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java @@ -18,23 +18,30 @@ ****************************************************************/ package org.apache.james.backend.rabbitmq; +import java.util.concurrent.ExecutionException; + import javax.inject.Inject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.james.util.retry.RetryExecutorUtil; +import com.nurkiewicz.asyncretry.AsyncRetryExecutor; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMQConnectionFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConnectionFactory.class); - + private final AsyncRetryExecutor executor; private final ConnectionFactory connectionFactory; + private final int maxRetries; + private final int minDelay; + @Inject - public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration) { + public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration, AsyncRetryExecutor executor) { + this.executor = executor; this.connectionFactory = from(rabbitMQConfiguration); + this.maxRetries = rabbitMQConfiguration.getMaxRetries(); + this.minDelay = rabbitMQConfiguration.getMinDelay(); } private ConnectionFactory from(RabbitMQConfiguration rabbitMQConfiguration) { @@ -43,16 +50,16 @@ public class RabbitMQConnectionFactory { connectionFactory.setUri(rabbitMQConfiguration.getUri()); return connectionFactory; } catch (Exception e) { - LOGGER.error("Fail to create the RabbitMQ connection factory."); throw new RuntimeException(e); } } public Connection create() { try { - return connectionFactory.newConnection(); - } catch (Exception e) { - LOGGER.error("Fail to create a RabbitMQ connection."); + return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, Exception.class) + .getWithRetry(context -> connectionFactory.newConnection()) + .get(); + } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java index 99cca33..3d0c13f 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java @@ -21,11 +21,23 @@ package org.apache.james.backend.rabbitmq; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.nurkiewicz.asyncretry.AsyncRetryExecutor; + class RabbitMQConnectionFactoryTest { + private ScheduledExecutorService scheduledExecutor; + + @BeforeEach + void setUp() throws Exception { + scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + } + @Test void creatingAFactoryShouldWorkWhenConfigurationIsValid() { RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() @@ -33,7 +45,7 @@ class RabbitMQConnectionFactoryTest { .managementUri(URI.create("http://james:james@rabbitmq_host:15672/api/")) .build(); - new RabbitMQConnectionFactory(rabbitMQConfiguration); + new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(scheduledExecutor)); } @Test @@ -43,7 +55,7 @@ class RabbitMQConnectionFactoryTest { .managementUri(URI.create("http://james:james@rabbitmq_host:15672/api/")) .build(); - assertThatThrownBy(() -> new RabbitMQConnectionFactory(rabbitMQConfiguration)) + assertThatThrownBy(() -> new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(scheduledExecutor))) .isInstanceOf(RuntimeException.class); } @@ -52,9 +64,11 @@ class RabbitMQConnectionFactoryTest { RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() .amqpUri(URI.create("amqp://james:james@rabbitmq_host:5672")) .managementUri(URI.create("http://james:james@rabbitmq_host:15672/api/")) + .maxRetries(1) + .minDelay(1) .build(); - RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration); + RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(scheduledExecutor)); assertThatThrownBy(() -> rabbitMQConnectionFactory.create()) .isInstanceOf(RuntimeException.class); http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java index 0938688..ec50384 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java @@ -22,14 +22,14 @@ package org.apache.james.backend.rabbitmq; import static org.assertj.core.api.Assertions.assertThat; import java.net.URI; +import java.util.concurrent.Executors; import org.apache.james.core.healthcheck.Result; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import com.rabbitmq.client.ConnectionFactory; +import com.nurkiewicz.asyncretry.AsyncRetryExecutor; @ExtendWith(DockerRabbitMQExtension.class) class RabbitMQHealthCheckTest { @@ -38,10 +38,18 @@ class RabbitMQHealthCheckTest { @BeforeEach void setUp(DockerRabbitMQ rabbitMQ) throws Exception { URI amqpUri = URI.create("amqp://" + rabbitMQ.getHostIp() + ":" + rabbitMQ.getPort()); - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setUri(amqpUri); + URI managementUri = URI.create("http://" + rabbitMQ.getHostIp() + ":15672/api/"); + + RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() + .amqpUri(amqpUri) + .managementUri(managementUri) + .build(); + + RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, + new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); + healthCheck = new RabbitMQHealthCheck( - new RabbitChannelPool(connectionFactory.newConnection())); + new RabbitChannelPool(rabbitMQConnectionFactory)); } @Test @@ -61,7 +69,6 @@ class RabbitMQHealthCheckTest { } @Test - @Disabled("connection don't recover instantly, we should try several time (depending on heartbeat rabbit conf") void checkShouldDetectWhenRabbitMQRecovered(DockerRabbitMQ rabbitMQ) throws Exception { rabbitMQ.stopApp(); healthCheck.check(); http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index c2d5c07..811c361 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -22,6 +22,7 @@ package org.apache.james.queue.rabbitmq; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import javax.mail.internet.MimeMessage; @@ -29,6 +30,8 @@ import javax.mail.internet.MimeMessage; import org.apache.http.client.utils.URIBuilder; import org.apache.james.backend.rabbitmq.DockerRabbitMQ; import org.apache.james.backend.rabbitmq.RabbitChannelPool; +import org.apache.james.backend.rabbitmq.RabbitMQConfiguration; +import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.DockerCassandraExtension; @@ -47,6 +50,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; +import com.nurkiewicz.asyncretry.AsyncRetryExecutor; + @ExtendWith({ReusableDockerRabbitMQExtension.class, DockerCassandraExtension.class}) public class RabbitMQMailQueueTest implements MailQueueContract { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); @@ -65,13 +70,27 @@ public class RabbitMQMailQueueTest implements MailQueueContract { CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY); Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore(); + URI amqpUri = new URIBuilder() + .setScheme("amqp") + .setHost(rabbitMQ.getHostIp()) + .setPort(rabbitMQ.getPort()) + .build(); URI rabbitManagementUri = new URIBuilder() .setScheme("http") .setHost(rabbitMQ.getHostIp()) .setPort(rabbitMQ.getAdminPort()) .build(); - RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQ.connectionFactory().newConnection())); + + RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() + .amqpUri(amqpUri) + .managementUri(rabbitManagementUri) + .build(); + + RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, + new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); + + RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory)); RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); http://git-wip-us.apache.org/repos/asf/james-project/blob/d031739f/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index bbb4734..1df4d92 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -22,6 +22,7 @@ package org.apache.james.queue.rabbitmq; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import javax.mail.internet.MimeMessage; @@ -29,6 +30,8 @@ import javax.mail.internet.MimeMessage; import org.apache.http.client.utils.URIBuilder; import org.apache.james.backend.rabbitmq.DockerRabbitMQ; import org.apache.james.backend.rabbitmq.RabbitChannelPool; +import org.apache.james.backend.rabbitmq.RabbitMQConfiguration; +import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.DockerCassandraExtension; @@ -47,6 +50,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; +import com.nurkiewicz.asyncretry.AsyncRetryExecutor; + @ExtendWith({ReusableDockerRabbitMQExtension.class, DockerCassandraExtension.class}) class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQMailQueue> { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); @@ -65,13 +70,27 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY); Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore(); + URI amqpUri = new URIBuilder() + .setScheme("amqp") + .setHost(rabbitMQ.getHostIp()) + .setPort(rabbitMQ.getPort()) + .build(); URI rabbitManagementUri = new URIBuilder() .setScheme("http") .setHost(rabbitMQ.getHostIp()) .setPort(rabbitMQ.getAdminPort()) .build(); - RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQ.connectionFactory().newConnection())); + + RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() + .amqpUri(amqpUri) + .managementUri(rabbitManagementUri) + .build(); + + RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, + new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); + + RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory)); RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
