This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3850c34a294b3ba2b2fc0a67b5396cba50f0f50b Author: Benoit Tellier <[email protected]> AuthorDate: Fri Apr 17 08:10:27 2020 +0700 JAMES-3117 Reactive RabbitMQ healthCHeck --- .../backends/rabbitmq/RabbitMQHealthCheck.java | 42 ++++++++++++++++------ .../rabbitmq/ReactorRabbitMQChannelPool.java | 24 ++++++------- .../backends/rabbitmq/SimpleConnectionPool.java | 32 +++++++---------- 3 files changed, 55 insertions(+), 43 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java index a5e6d00..098dc26 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java @@ -27,6 +27,9 @@ import org.apache.james.core.healthcheck.ComponentName; import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.core.healthcheck.Result; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class RabbitMQHealthCheck implements HealthCheck { private static final RabbitMQServerVersion MINIMAL_VERSION = RabbitMQServerVersion.of("3.8.1"); private static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQ backend"); @@ -46,27 +49,44 @@ public class RabbitMQHealthCheck implements HealthCheck { } @Override - public Result check() { + public Mono<Result> checkReactive() { try { - if (connectionPool.tryConnection() && rabbitChannelPoolImpl.tryChannel()) { - Optional<RabbitMQServerVersion> version = connectionPool.version(); + return Flux.concat(connectionPool.tryConnection(), + rabbitChannelPoolImpl.tryChannel()) + .reduce(true, (a, b) -> a && b) + .flatMap(channelOpen -> { + if (channelOpen) { + return checkVersion(); + } else { + return Mono.just(Result.unhealthy(COMPONENT_NAME, "The created connection was not opened")); + } + }) + .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, + "Unhealthy RabbitMQ instances: could not establish a connection", e))); + } catch (Exception e) { + return Mono.just(Result.unhealthy(COMPONENT_NAME, + "Unhealthy RabbitMQ instances: could not establish a connection", e)); + } + } + + private Mono<? extends Result> checkVersion() { + return connectionPool.version() + .map(Optional::of) + .defaultIfEmpty(Optional.empty()) + .flatMap(version -> { boolean isCompatible = version .map(fetchedVersion -> fetchedVersion.isAtLeast(MINIMAL_VERSION)) .orElse(false); + if (!isCompatible) { String versionCompatibilityError = String.format( "RabbitMQ version(%s) is not compatible with the required one(%s)", version.map(RabbitMQServerVersion::asString).orElse("no versions fetched"), MINIMAL_VERSION.asString()); - return Result.unhealthy(COMPONENT_NAME, versionCompatibilityError); + return Mono.just(Result.unhealthy(COMPONENT_NAME, versionCompatibilityError)); } - return Result.healthy(COMPONENT_NAME); - } else { - return Result.unhealthy(COMPONENT_NAME, "The created connection was not opened"); - } - } catch (Exception e) { - return Result.unhealthy(COMPONENT_NAME, "Unhealthy RabbitMQ instances: could not establish a connection", e); - } + return Mono.just(Result.healthy(COMPONENT_NAME)); + }); } } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java index 0459abe..0c89212 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java @@ -214,18 +214,16 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { pool.close(); } - public boolean tryChannel() { - Channel channel = null; - try { - channel = borrow().block(); - return channel.isOpen(); - } catch (Throwable t) { - return false; - } finally { - if (channel != null) { - borrowedChannels.remove(channel); - pool.returnObject(channel); - } - } + public Mono<Boolean> tryChannel() { + return Mono.usingWhen(borrow(), + channel -> Mono.just(channel.isOpen()), + channel -> { + if (channel != null) { + borrowedChannels.remove(channel); + pool.returnObject(channel); + } + return Mono.empty(); + }) + .onErrorResume(any -> Mono.just(false)); } } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java index f4fa4e1..c8f282b 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java @@ -79,26 +79,20 @@ public class SimpleConnectionPool implements AutoCloseable { } } - public boolean tryConnection() { - try { - return getOpenConnection() - .blockOptional(Duration.ofSeconds(1)) - .isPresent(); - } catch (Throwable t) { - return false; - } + public Mono<Boolean> tryConnection() { + return getOpenConnection() + .timeout(Duration.ofSeconds(1)) + .hasElement() + .onErrorResume(any -> Mono.just(false)); } - public Optional<RabbitMQServerVersion> version() { - try { - return getOpenConnection() - .map(Connection::getServerProperties) - .flatMap(serverProperties -> Mono.justOrEmpty(serverProperties.get("version"))) - .map(Object::toString) - .map(RabbitMQServerVersion::of) - .blockOptional(Duration.ofSeconds(1)); - } catch (Throwable t) { - return Optional.empty(); - } + public Mono<RabbitMQServerVersion> version() { + return getOpenConnection() + .map(Connection::getServerProperties) + .flatMap(serverProperties -> Mono.justOrEmpty(serverProperties.get("version"))) + .map(Object::toString) + .map(RabbitMQServerVersion::of) + .timeout(Duration.ofSeconds(1)) + .onErrorResume(any -> Mono.empty()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
