This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5f74bef2a4163562aa29b48f334ef14b9c513b97
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Apr 1 11:04:55 2020 +0700

    JAMES-3117 Reactive HealthCheck
---
 .../cassandra/utils/CassandraAsyncExecutor.java    |  6 ++++++
 .../cassandra/utils/CassandraHealthCheck.java      | 21 +++++++++----------
 .../apache/james/core/healthcheck/HealthCheck.java | 12 ++++++++++-
 .../events/EventDeadLettersHealthCheck.java        | 24 +++++++++++-----------
 .../org/apache/james/PeriodicalHealthChecks.java   |  3 +--
 5 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 8b911dd..86b192b 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -48,6 +48,12 @@ public class CassandraAsyncExecutor {
                 .publishOn(Schedulers.elastic()));
     }
 
+    public Mono<ResultSet> execute(String statement) {
+        return Mono.defer(() -> Mono.fromFuture(FutureConverter
+                .toCompletableFuture(session.executeAsync(statement)))
+                .publishOn(Schedulers.elastic()));
+    }
+
     public Mono<Boolean> executeReturnApplied(Statement statement) {
         return execute(statement)
                 .map(row -> row.wasApplied());
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java
index d184e30..c502d57 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java
@@ -27,6 +27,8 @@ import org.apache.james.core.healthcheck.Result;
 
 import com.datastax.driver.core.Session;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Health check for the Cassandra backend.
  *
@@ -36,11 +38,11 @@ public class CassandraHealthCheck implements HealthCheck {
     private static final ComponentName COMPONENT_NAME = new ComponentName("Cassandra backend");
     private static final String SAMPLE_QUERY = "SELECT NOW() FROM system.local";
 
-    private final Session session;
+    private final CassandraAsyncExecutor queryExecutor;
 
     @Inject
     public CassandraHealthCheck(Session session) {
-        this.session = session;
+        this.queryExecutor = new CassandraAsyncExecutor(session);
     }
 
     @Override
@@ -49,14 +51,11 @@ public class CassandraHealthCheck implements HealthCheck {
     }
 
     @Override
-    public Result check() {
-        try {
-            // execute a simple query to check if cassandra is responding
-            // idea from: https://stackoverflow.com/questions/10246287
-            session.execute(SAMPLE_QUERY);
-            return Result.healthy(COMPONENT_NAME);
-        } catch (Exception e) {
-            return Result.unhealthy(COMPONENT_NAME, "Error checking Cassandra backend", e);
-        }
+    public Mono<Result> checkReactive() {
+        // execute a simple query to check if cassandra is responding
+        // idea from: https://stackoverflow.com/questions/10246287
+        return queryExecutor.execute(SAMPLE_QUERY)
+            .map(resultSet -> Result.healthy(COMPONENT_NAME))
+            .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error checking Cassandra backend", e)));
     }
 }
diff --git a/core/src/main/java/org/apache/james/core/healthcheck/HealthCheck.java b/core/src/main/java/org/apache/james/core/healthcheck/HealthCheck.java
index f7659b0..effd3c6 100644
--- a/core/src/main/java/org/apache/james/core/healthcheck/HealthCheck.java
+++ b/core/src/main/java/org/apache/james/core/healthcheck/HealthCheck.java
@@ -18,8 +18,18 @@
  ****************************************************************/
 package org.apache.james.core.healthcheck;
 
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+
 public interface HealthCheck {
     ComponentName componentName();
 
-    Result check();
+    default Result check() {
+        return Mono.from(checkReactive()).block();
+    }
+
+    default Publisher<Result> checkReactive() {
+        return Mono.fromCallable(this::check);
+    }
 }
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLettersHealthCheck.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLettersHealthCheck.java
index 8fc1b1b..dfc8ce1 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLettersHealthCheck.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLettersHealthCheck.java
@@ -25,6 +25,8 @@ import org.apache.james.core.healthcheck.ComponentName;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
 
+import reactor.core.publisher.Mono;
+
 public class EventDeadLettersHealthCheck implements HealthCheck {
     private static final ComponentName COMPONENT_NAME = new ComponentName("EventDeadLettersHealthCheck");
 
@@ -41,17 +43,15 @@ public class EventDeadLettersHealthCheck implements HealthCheck {
     }
 
     @Override
-    public Result check() {
-        try {
-            boolean containEvents = eventDeadLetters.containEvents().block();
-
-            if (containEvents) {
-                return Result.degraded(COMPONENT_NAME, "EventDeadLetters contain events. This might indicate transient failure on mailbox event processing.");
-            }
-
-            return Result.healthy(COMPONENT_NAME);
-        } catch (Exception e) {
-            return Result.unhealthy(COMPONENT_NAME, "Error checking EventDeadLettersHealthCheck", e);
-        }
+    public Mono<Result> checkReactive() {
+        return eventDeadLetters.containEvents()
+            .map(containEvents -> {
+                if (containEvents) {
+                    return Result.degraded(COMPONENT_NAME, "EventDeadLetters contain events. This might indicate transient failure on mailbox event processing.");
+                }
+
+                return Result.healthy(COMPONENT_NAME);
+            })
+            .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error checking EventDeadLettersHealthCheck", e)));
     }
 }
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
index acede14..68c08b7 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
@@ -60,8 +60,7 @@ public class PeriodicalHealthChecks implements Startable {
     public void start() {
         disposable = Flux.interval(configuration.getPeriod(), scheduler)
             .flatMap(any -> Flux.fromIterable(healthChecks)
-                .flatMap(healthCheck ->
-                    Mono.fromCallable(healthCheck::check)))
+                .flatMap(healthCheck -> Mono.from(healthCheck.checkReactive())))
             .doOnNext(this::logResult)
             .onErrorContinue(this::logError)
             .subscribeOn(Schedulers.elastic())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to