JAMES-2545 implement the healthcheck on top of channelpool
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b847583c Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b847583c Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b847583c Branch: refs/heads/master Commit: b847583c455ff7586e588f1f8a529b7fd548b536 Parents: 3313b53 Author: Matthieu Baechler <[email protected]> Authored: Mon Sep 10 18:22:26 2018 +0200 Committer: Benoit Tellier <[email protected]> Committed: Fri Sep 14 10:17:42 2018 +0700 ---------------------------------------------------------------------- .../backend/rabbitmq/RabbitChannelPool.java | 16 +++++++++---- .../backend/rabbitmq/RabbitMQHealthCheck.java | 24 +++++++++----------- .../rabbitmq/RabbitMQHealthCheckTest.java | 13 ++++++----- 3 files changed, 29 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b847583c/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java index c4a9fe6..f6f4154 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java @@ -30,6 +30,12 @@ import com.rabbitmq.client.Connection; public class RabbitChannelPool { + public static class ConnectionFailedException extends RuntimeException { + public ConnectionFailedException(Throwable cause) { + super(cause); + } + } + private static class ChannelBasePooledObjectFactory extends BasePooledObjectFactory<Channel> { private final Connection connection; @@ -65,7 +71,7 @@ public class RabbitChannelPool { new ChannelBasePooledObjectFactory(connection)); } - public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E { + public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { Channel channel = borrowChannel(); try { return f.execute(channel); @@ -75,7 +81,7 @@ public class RabbitChannelPool { } - public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E { + public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { Channel channel = borrowChannel(); try { f.execute(channel); @@ -88,15 +94,15 @@ public class RabbitChannelPool { try { return pool.borrowObject(); } catch (Exception e) { - throw new RuntimeException(e); + throw new ConnectionFailedException(e); } } private void returnChannel(Channel channel) { try { pool.returnObject(channel); - } catch (Exception e) { - throw new RuntimeException(e); + } catch (Exception ignore) { + //ignore when return is failing } } http://git-wip-us.apache.org/repos/asf/james-project/blob/b847583c/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java index b850e26..fd9f757 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java @@ -31,19 +31,15 @@ import org.apache.james.core.healthcheck.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - public class RabbitMQHealthCheck implements HealthCheck { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQHealthCheck.class); private static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQ backend"); - private final ConnectionFactory connectionFactory; + private final RabbitChannelPool rabbitChannelPool; @Inject - public RabbitMQHealthCheck(RabbitMQConfiguration configuration) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { - this.connectionFactory = new ConnectionFactory(); - this.connectionFactory.setUri(configuration.getUri()); + public RabbitMQHealthCheck(RabbitChannelPool rabbitChannelPool) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { + this.rabbitChannelPool = rabbitChannelPool; } @Override @@ -53,12 +49,14 @@ public class RabbitMQHealthCheck implements HealthCheck { @Override public Result check() { - try (Connection connection = connectionFactory.newConnection()) { - if (connection.isOpen()) { - return Result.healthy(COMPONENT_NAME); - } - LOGGER.error("The created connection was not opened"); - return Result.unhealthy(COMPONENT_NAME); + try { + return rabbitChannelPool.execute(channel -> { + if (channel.isOpen()) { + return Result.healthy(COMPONENT_NAME); + } + LOGGER.error("The created connection was not opened"); + return Result.unhealthy(COMPONENT_NAME); + }); } catch (Exception e) { LOGGER.error("Unhealthy RabbitMQ instances: could not establish a connection", e); return Result.unhealthy(COMPONENT_NAME); http://git-wip-us.apache.org/repos/asf/james-project/blob/b847583c/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java index 4ba13fe..0938688 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java @@ -25,9 +25,12 @@ import java.net.URI; import org.apache.james.core.healthcheck.Result; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import com.rabbitmq.client.ConnectionFactory; + @ExtendWith(DockerRabbitMQExtension.class) class RabbitMQHealthCheckTest { private RabbitMQHealthCheck healthCheck; @@ -35,13 +38,10 @@ class RabbitMQHealthCheckTest { @BeforeEach void setUp(DockerRabbitMQ rabbitMQ) throws Exception { URI amqpUri = URI.create("amqp://" + rabbitMQ.getHostIp() + ":" + rabbitMQ.getPort()); - URI managementUri = URI.create("http://" + rabbitMQ.getHostIp() + ":" + rabbitMQ.getAdminPort()); - + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setUri(amqpUri); healthCheck = new RabbitMQHealthCheck( - RabbitMQConfiguration.builder() - .amqpUri(amqpUri) - .managementUri(managementUri) - .build()); + new RabbitChannelPool(connectionFactory.newConnection())); } @Test @@ -61,6 +61,7 @@ class RabbitMQHealthCheckTest { } @Test + @Disabled("connection don't recover instantly, we should try several time (depending on heartbeat rabbit conf") void checkShouldDetectWhenRabbitMQRecovered(DockerRabbitMQ rabbitMQ) throws Exception { rabbitMQ.stopApp(); healthCheck.check(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
