This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3850c34a294b3ba2b2fc0a67b5396cba50f0f50b
Author: Benoit Tellier <[email protected]>
AuthorDate: Fri Apr 17 08:10:27 2020 +0700

    JAMES-3117 Reactive RabbitMQ healthCheck
---
 .../backends/rabbitmq/RabbitMQHealthCheck.java     | 42 ++++++++++++++++------
 .../rabbitmq/ReactorRabbitMQChannelPool.java       | 24 ++++++-------
 .../backends/rabbitmq/SimpleConnectionPool.java    | 32 +++++++----------
 3 files changed, 55 insertions(+), 43 deletions(-)

diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java
index a5e6d00..098dc26 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQHealthCheck.java
@@ -27,6 +27,9 @@ import org.apache.james.core.healthcheck.ComponentName;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class RabbitMQHealthCheck implements HealthCheck {
     private static final RabbitMQServerVersion MINIMAL_VERSION = 
RabbitMQServerVersion.of("3.8.1");
     private static final ComponentName COMPONENT_NAME = new 
ComponentName("RabbitMQ backend");
@@ -46,27 +49,44 @@ public class RabbitMQHealthCheck implements HealthCheck {
     }
 
     @Override
-    public Result check() {
+    public Mono<Result> checkReactive() {
         try {
-            if (connectionPool.tryConnection() && 
rabbitChannelPoolImpl.tryChannel()) {
-                Optional<RabbitMQServerVersion> version = 
connectionPool.version();
+            return Flux.concat(connectionPool.tryConnection(),
+                rabbitChannelPoolImpl.tryChannel())
+                .reduce(true, (a, b) -> a && b)
+                .flatMap(channelOpen -> {
+                    if (channelOpen) {
+                        return checkVersion();
+                    } else {
+                        return Mono.just(Result.unhealthy(COMPONENT_NAME, "The 
created connection was not opened"));
+                    }
+                })
+                .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME,
+                    "Unhealthy RabbitMQ instances: could not establish a 
connection", e)));
+        } catch (Exception e) {
+            return Mono.just(Result.unhealthy(COMPONENT_NAME,
+                "Unhealthy RabbitMQ instances: could not establish a 
connection", e));
+        }
+    }
+
+    private Mono<? extends Result> checkVersion() {
+        return connectionPool.version()
+            .map(Optional::of)
+            .defaultIfEmpty(Optional.empty())
+            .flatMap(version -> {
                 boolean isCompatible = version
                     .map(fetchedVersion -> 
fetchedVersion.isAtLeast(MINIMAL_VERSION))
                     .orElse(false);
+
                 if (!isCompatible) {
                     String versionCompatibilityError = String.format(
                         "RabbitMQ version(%s) is not compatible with the 
required one(%s)",
                         
version.map(RabbitMQServerVersion::asString).orElse("no versions fetched"),
                         MINIMAL_VERSION.asString());
-                    return Result.unhealthy(COMPONENT_NAME, 
versionCompatibilityError);
+                    return Mono.just(Result.unhealthy(COMPONENT_NAME, 
versionCompatibilityError));
                 }
 
-                return Result.healthy(COMPONENT_NAME);
-            } else {
-                return Result.unhealthy(COMPONENT_NAME, "The created 
connection was not opened");
-            }
-        } catch (Exception e) {
-            return Result.unhealthy(COMPONENT_NAME, "Unhealthy RabbitMQ 
instances: could not establish a connection", e);
-        }
+                return Mono.just(Result.healthy(COMPONENT_NAME));
+            });
     }
 }
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 0459abe..0c89212 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -214,18 +214,16 @@ public class ReactorRabbitMQChannelPool implements 
ChannelPool, Startable {
         pool.close();
     }
 
-    public boolean tryChannel() {
-        Channel channel = null;
-        try {
-            channel = borrow().block();
-            return channel.isOpen();
-        } catch (Throwable t) {
-            return false;
-        } finally {
-            if (channel != null) {
-                borrowedChannels.remove(channel);
-                pool.returnObject(channel);
-            }
-        }
+    public Mono<Boolean> tryChannel() {
+        return Mono.usingWhen(borrow(),
+            channel -> Mono.just(channel.isOpen()),
+            channel -> {
+                if (channel != null) {
+                    borrowedChannels.remove(channel);
+                    pool.returnObject(channel);
+                }
+                return Mono.empty();
+            })
+            .onErrorResume(any -> Mono.just(false));
     }
 }
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
index f4fa4e1..c8f282b 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
@@ -79,26 +79,20 @@ public class SimpleConnectionPool implements AutoCloseable {
         }
     }
 
-    public boolean tryConnection() {
-        try {
-            return getOpenConnection()
-                .blockOptional(Duration.ofSeconds(1))
-                .isPresent();
-        } catch (Throwable t) {
-            return false;
-        }
+    public Mono<Boolean> tryConnection() {
+        return getOpenConnection()
+            .timeout(Duration.ofSeconds(1))
+            .hasElement()
+            .onErrorResume(any -> Mono.just(false));
     }
 
-    public Optional<RabbitMQServerVersion> version() {
-        try {
-            return getOpenConnection()
-                .map(Connection::getServerProperties)
-                .flatMap(serverProperties -> 
Mono.justOrEmpty(serverProperties.get("version")))
-                .map(Object::toString)
-                .map(RabbitMQServerVersion::of)
-                .blockOptional(Duration.ofSeconds(1));
-        } catch (Throwable t) {
-            return Optional.empty();
-        }
+    public Mono<RabbitMQServerVersion> version() {
+        return getOpenConnection()
+            .map(Connection::getServerProperties)
+            .flatMap(serverProperties -> 
Mono.justOrEmpty(serverProperties.get("version")))
+            .map(Object::toString)
+            .map(RabbitMQServerVersion::of)
+            .timeout(Duration.ofSeconds(1))
+            .onErrorResume(any -> Mono.empty());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to