This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5f74bef2a4163562aa29b48f334ef14b9c513b97
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Apr 1 11:04:55 2020 +0700

    JAMES-3117 Reactive HealthCheck
---
 .../cassandra/utils/CassandraAsyncExecutor.java    |  6 ++++++
 .../cassandra/utils/CassandraHealthCheck.java      | 21 +++++++++----------
 .../apache/james/core/healthcheck/HealthCheck.java | 12 ++++++++++-
 .../events/EventDeadLettersHealthCheck.java        | 24 +++++++++++-----------
 .../org/apache/james/PeriodicalHealthChecks.java   |  3 +--
 5 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 8b911dd..86b192b 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -48,6 +48,12 @@ public class CassandraAsyncExecutor {
                 .publishOn(Schedulers.elastic()));
     }
 
+    public Mono<ResultSet> execute(String statement) {
+        return Mono.defer(() -> Mono.fromFuture(FutureConverter
+                .toCompletableFuture(session.executeAsync(statement)))
+                .publishOn(Schedulers.elastic()));
+    }
+
     public Mono<Boolean> executeReturnApplied(Statement statement) {
         return execute(statement)
                 .map(row -> row.wasApplied());
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java
index d184e30..c502d57 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java
@@ -27,6 +27,8 @@ import org.apache.james.core.healthcheck.Result;
 
 import com.datastax.driver.core.Session;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Health check for the Cassandra backend.
  *
@@ -36,11 +38,11 @@ public class CassandraHealthCheck implements HealthCheck {
     private static final ComponentName COMPONENT_NAME = new ComponentName("Cassandra backend");
     private static final String SAMPLE_QUERY = "SELECT NOW() FROM system.local";
 
-    private final Session session;
+    private final CassandraAsyncExecutor queryExecutor;
 
     @Inject
     public CassandraHealthCheck(Session session) {
-        this.session = session;
+        this.queryExecutor = new CassandraAsyncExecutor(session);
     }
 
     @Override
@@ -49,14 +51,11 @@ public class CassandraHealthCheck implements HealthCheck {
     }
 
     @Override
-    public Result check() {
-        try {
-            // execute a simple query to check if cassandra is responding
-            // idea from: https://stackoverflow.com/questions/10246287
-            session.execute(SAMPLE_QUERY);
-            return Result.healthy(COMPONENT_NAME);
-        } catch (Exception e) {
-            return Result.unhealthy(COMPONENT_NAME, "Error checking Cassandra backend", e);
-        }
+    public Mono<Result> checkReactive() {
+        // execute a simple query to check if cassandra is responding
+        // idea from: https://stackoverflow.com/questions/10246287
+        return queryExecutor.execute(SAMPLE_QUERY)
+            .map(resultSet -> Result.healthy(COMPONENT_NAME))
+            .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error checking Cassandra backend", e)));
     }
 }
diff --git a/core/src/main/java/org/apache/james/core/healthcheck/HealthCheck.java b/core/src/main/java/org/apache/james/core/healthcheck/HealthCheck.java
index f7659b0..effd3c6 100644
--- a/core/src/main/java/org/apache/james/core/healthcheck/HealthCheck.java
+++ b/core/src/main/java/org/apache/james/core/healthcheck/HealthCheck.java
@@ -18,8 +18,18 @@
  ****************************************************************/
 package org.apache.james.core.healthcheck;
 
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+
 public interface HealthCheck {
     ComponentName componentName();
 
-    Result check();
+    default Result check() {
+        return Mono.from(checkReactive()).block();
+    }
+
+    default Publisher<Result> checkReactive() {
+        return Mono.fromCallable(this::check);
+    }
 }
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLettersHealthCheck.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLettersHealthCheck.java
index 8fc1b1b..dfc8ce1 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLettersHealthCheck.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLettersHealthCheck.java
@@ -25,6 +25,8 @@ import org.apache.james.core.healthcheck.ComponentName;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
 
+import reactor.core.publisher.Mono;
+
 public class EventDeadLettersHealthCheck implements HealthCheck {
     private static final ComponentName COMPONENT_NAME = new ComponentName("EventDeadLettersHealthCheck");
 
@@ -41,17 +43,15 @@ public class EventDeadLettersHealthCheck implements HealthCheck {
     }
 
     @Override
-    public Result check() {
-        try {
-            boolean containEvents = eventDeadLetters.containEvents().block();
-
-            if (containEvents) {
-                return Result.degraded(COMPONENT_NAME, "EventDeadLetters contain events. This might indicate transient failure on mailbox event processing.");
-            }
-
-            return Result.healthy(COMPONENT_NAME);
-        } catch (Exception e) {
-            return Result.unhealthy(COMPONENT_NAME, "Error checking EventDeadLettersHealthCheck", e);
-        }
+    public Mono<Result> checkReactive() {
+        return eventDeadLetters.containEvents()
+            .map(containEvents -> {
+                if (containEvents) {
+                    return Result.degraded(COMPONENT_NAME, "EventDeadLetters contain events. This might indicate transient failure on mailbox event processing.");
+                }
+
+                return Result.healthy(COMPONENT_NAME);
+            })
+            .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error checking EventDeadLettersHealthCheck", e)));
     }
 }
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
index acede14..68c08b7 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
@@ -60,8 +60,7 @@ public class PeriodicalHealthChecks implements Startable {
     public void start() {
         disposable = Flux.interval(configuration.getPeriod(), scheduler)
             .flatMap(any -> Flux.fromIterable(healthChecks)
-                .flatMap(healthCheck ->
-                    Mono.fromCallable(healthCheck::check)))
+                .flatMap(healthCheck -> Mono.from(healthCheck.checkReactive())))
             .doOnNext(this::logResult)
             .onErrorContinue(this::logError)
             .subscribeOn(Schedulers.elastic())

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
