This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3e6ea8bd7f4021755cb2dea98a317499be028b37 Author: Benoit Tellier <[email protected]> AuthorDate: Thu Oct 31 13:43:23 2019 +0700 JAMES-2937 RabbitMQHealthCheck is testing an unused channel pool Our production code no longer uses SimpleChannelPool; it uses the Reactor channel pool instead. --- .../james/backends/rabbitmq/RabbitMQHealthCheck.java | 8 +++++--- .../backends/rabbitmq/ReactorRabbitMQChannelPool.java | 10 ++++++++++ .../james/backends/rabbitmq/RabbitMQExtension.java | 16 ++++++++-------- .../james/backends/rabbitmq/RabbitMQHealthCheckTest.java | 4 ++-- .../rabbitmq/ReactorRabbitMQChannelPoolTest.java | 2 +- 5 files changed, 26 insertions(+), 14 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java index d766a5f..7946173 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java @@ -31,10 +31,12 @@ public class RabbitMQHealthCheck implements HealthCheck { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQHealthCheck.class); private static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQ backend"); - private final RabbitMQChannelPool rabbitChannelPoolImpl; + private final SimpleConnectionPool connectionPool; + private final ReactorRabbitMQChannelPool rabbitChannelPoolImpl; @Inject - public RabbitMQHealthCheck(RabbitMQChannelPool rabbitChannelPoolImpl) { + public RabbitMQHealthCheck(SimpleConnectionPool connectionPool, ReactorRabbitMQChannelPool rabbitChannelPoolImpl) { + this.connectionPool = connectionPool; this.rabbitChannelPoolImpl = rabbitChannelPoolImpl; } @@ -46,7 +48,7 @@ public class RabbitMQHealthCheck implements HealthCheck { @Override public Result check() { try { - if 
(rabbitChannelPoolImpl.tryConnection()) { + if (connectionPool.tryConnection() && rabbitChannelPoolImpl.tryChannel()) { return Result.healthy(COMPONENT_NAME); } else { String message = "The created connection was not opened"; diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java index e240b93..645a68a 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; +import com.google.common.base.Preconditions; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -138,6 +139,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { public Mono<? 
extends Channel> getChannelMono() { return Mono.fromCallable(() -> { Channel channel = pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS); + Preconditions.checkArgument(channel.isOpen()); borrowedChannels.add(channel); return channel; }); @@ -182,4 +184,12 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { borrowedChannels.clear(); pool.close(); } + + public boolean tryChannel() { + try { + return getChannelMono().block().isOpen(); + } catch (Throwable t) { + return false; + } + } } diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java index 668b808..a909bb0 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java @@ -93,8 +93,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, private final DockerRabbitMQ rabbitMQ; private final DockerRestartPolicy dockerRestartPolicy; - private SimpleChannelPool simpleChannelPool; - private RabbitMQConnectionFactory connectionFactory; + private ReactorRabbitMQChannelPool channelPool; private SimpleConnectionPool connectionPool; public RabbitMQExtension(DockerRabbitMQ rabbitMQ, @@ -112,14 +111,15 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, public void beforeEach(ExtensionContext extensionContext) throws Exception { dockerRestartPolicy.beforeEach(rabbitMQ); - connectionFactory = createRabbitConnectionFactory(); + RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory(); connectionPool = new SimpleConnectionPool(connectionFactory); - this.simpleChannelPool = new SimpleChannelPool(connectionPool); + channelPool = new ReactorRabbitMQChannelPool(connectionPool); + channelPool.start(); } @Override public void 
afterEach(ExtensionContext context) throws Exception { - simpleChannelPool.close(); + channelPool.close(); connectionPool.close(); rabbitMQ.reset(); dockerRestartPolicy.afterEach(rabbitMQ); @@ -140,11 +140,11 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, return rabbitMQ; } - public RabbitMQChannelPool getRabbitChannelPool() { - return simpleChannelPool; + public ReactorRabbitMQChannelPool getRabbitChannelPool() { + return channelPool; } - public SimpleConnectionPool getRabbitConnectionPool() { + public SimpleConnectionPool getConnectionPool() { return connectionPool; } diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheckTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheckTest.java index 15c22c3..9f1d9f8 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheckTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheckTest.java @@ -35,8 +35,8 @@ class RabbitMQHealthCheckTest { private RabbitMQHealthCheck healthCheck; @BeforeEach - void setUp() throws Exception { - healthCheck = new RabbitMQHealthCheck(rabbitMQExtension.getRabbitChannelPool()); + void setUp() { + healthCheck = new RabbitMQHealthCheck(rabbitMQExtension.getConnectionPool(), rabbitMQExtension.getRabbitChannelPool()); } @AfterEach diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java index 21be0bd..9eaf53f 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java @@ -67,7 +67,7 @@ class 
ReactorRabbitMQChannelPoolTest implements ChannelPoolContract { private ReactorRabbitMQChannelPool generateChannelPool(int poolSize) { ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool( - rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(), + rabbitMQExtension.getConnectionPool().getResilientConnection(), poolSize); reactorRabbitMQChannelPool.start(); return reactorRabbitMQChannelPool; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
