This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0f2742b15b7c12baab909a2265ad8f7842a3b14e Author: LanKhuat <[email protected]> AuthorDate: Wed Apr 8 11:12:40 2020 +0700 JAMES-3117 PeriodicalHealthChecks cleanup and additional tests JAMES-3117 Change return type of HealthCheck to Publisher fixup! JAMES-3117 Reactive Healthchecks --- .../cassandra/utils/CassandraAsyncExecutor.java | 8 +- .../cassandra/utils/CassandraHealthCheck.java | 3 +- .../backends/es/ElasticSearchHealthCheck.java | 1 - .../org/apache/james/PeriodicalHealthChecks.java | 6 +- .../apache/james/PeriodicalHealthChecksTest.java | 123 +++++++++++++++++++-- .../james/webadmin/routes/HealthCheckRoutes.java | 49 ++++---- 6 files changed, 148 insertions(+), 42 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java index 24bf48e..4005cc5 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java @@ -50,15 +50,9 @@ public class CassandraAsyncExecutor { .publishOn(Schedulers.elastic())); } - public Mono<ResultSet> execute(String statement) { - return Mono.defer(() -> Mono.fromFuture(FutureConverter - .toCompletableFuture(session.executeAsync(statement))) - .publishOn(Schedulers.elastic())); - } - public Mono<Boolean> executeReturnApplied(Statement statement) { return execute(statement) - .map(row -> row.wasApplied()); + .map(ResultSet::wasApplied); } public Mono<Void> executeVoid(Statement statement) { diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java index 07f575b..d37060d 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraHealthCheck.java @@ -26,6 +26,7 @@ import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.core.healthcheck.Result; import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; import reactor.core.publisher.Mono; @@ -54,7 +55,7 @@ public class CassandraHealthCheck implements HealthCheck { public Mono<Result> check() { // execute a simple query to check if cassandra is responding // idea from: https://stackoverflow.com/questions/10246287 - return queryExecutor.execute(SAMPLE_QUERY) + return queryExecutor.execute(new SimpleStatement(SAMPLE_QUERY)) .map(resultSet -> Result.healthy(COMPONENT_NAME)) .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error checking Cassandra backend", e))); } diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java index 1d3f92e..fc23ef5 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java @@ -36,7 +36,6 @@ import com.google.common.annotations.VisibleForTesting; import reactor.core.publisher.Mono; - public class ElasticSearchHealthCheck implements HealthCheck { private static final ComponentName COMPONENT_NAME = new ComponentName("ElasticSearch Backend"); diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java index dedddca..8182cc8 100644 --- a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java +++ b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java @@ -91,12 +91,12 @@ public class PeriodicalHealthChecks implements Startable { if (result.getError().isPresent()) { LOGGER.error("UNHEALTHY: {} : {}", result.getComponentName().getName(), - result.getCause(), + result.getCause().orElse(""), result.getError().get()); } else { LOGGER.error("UNHEALTHY: {} : {}", result.getComponentName().getName(), - result.getCause()); + result.getCause().orElse("")); } } @@ -108,7 +108,7 @@ public class PeriodicalHealthChecks implements Startable { error); return; } - LOGGER.error("HealthCheck error. Triggering value: {}, Cause: {}", + LOGGER.error("HealthCheck error. Triggering value: {}, Cause: ", triggeringValue, error); } diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java b/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java index 49e346b..a325098 100644 --- a/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java +++ b/server/container/guice/guice-common/src/test/java/org/apache/james/PeriodicalHealthChecksTest.java @@ -20,6 +20,7 @@ package org.apache.james; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -31,19 +32,49 @@ import org.apache.james.core.healthcheck.ComponentName; import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.core.healthcheck.Result; import org.apache.james.mailbox.events.EventDeadLettersHealthCheck; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.groups.Tuple; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.slf4j.LoggerFactory; +import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableSet; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; import reactor.core.publisher.Mono; import reactor.test.scheduler.VirtualTimeScheduler; public class PeriodicalHealthChecksTest { - private static final long PERIOD = 10; + @FunctionalInterface + interface TestingHealthCheck extends HealthCheck { + ComponentName COMPONENT_NAME = new ComponentName("testing"); + + Mono<Result> check(); + + default ComponentName componentName() { + return COMPONENT_NAME; + } + } + + public static ListAppender<ILoggingEvent> getListAppenderForClass(Class clazz) { + Logger logger = (Logger) LoggerFactory.getLogger(clazz); + + ListAppender<ILoggingEvent> loggingEventListAppender = new ListAppender<>(); + loggingEventListAppender.start(); + + logger.addAppender(loggingEventListAppender); + + return loggingEventListAppender; + } + + private static final Duration PERIOD = Duration.ofSeconds(10); private static final int EXPECTED_INVOKED_TIME = 10; private HealthCheck mockHealthCheck1; private HealthCheck mockHealthCheck2; @@ -60,27 +91,103 @@ public class PeriodicalHealthChecksTest { scheduler = VirtualTimeScheduler.getOrSet(); testee = new PeriodicalHealthChecks(ImmutableSet.of(mockHealthCheck1, mockHealthCheck2), scheduler, - new PeriodicalHealthChecksConfiguration(Duration.ofSeconds(PERIOD))); + new PeriodicalHealthChecksConfiguration(PERIOD)); } @AfterEach void tearDown() { testee.stop(); } - + @Test void startShouldCallHealthCheckAtLeastOnce() { testee.start(); - scheduler.advanceTimeBy(Duration.ofSeconds(PERIOD)); + scheduler.advanceTimeBy(PERIOD); verify(mockHealthCheck1, atLeast(1)).check(); } @Test + void startShouldLogPeriodicallyWhenUnhealthy() { + ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(PeriodicalHealthChecks.class); + + TestingHealthCheck unhealthy = () -> Mono.just(Result.unhealthy(TestingHealthCheck.COMPONENT_NAME, "cause")); + testee = new PeriodicalHealthChecks(ImmutableSet.of(unhealthy), + scheduler, + new PeriodicalHealthChecksConfiguration(PERIOD)); + testee.start(); + + scheduler.advanceTimeBy(PERIOD); + assertThat(loggingEvents.list).hasSize(1) + .allSatisfy(loggingEvent -> { + assertThat(loggingEvent.getLevel()).isEqualTo(Level.ERROR); + assertThat(loggingEvent.getFormattedMessage()).contains("UNHEALTHY", "testing", "cause"); + }); + } + + @Test + void startShouldLogPeriodicallyWhenDegraded() { + ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(PeriodicalHealthChecks.class); + + TestingHealthCheck degraded = () -> Mono.just(Result.degraded(TestingHealthCheck.COMPONENT_NAME, "cause")); + testee = new PeriodicalHealthChecks(ImmutableSet.of(degraded), + scheduler, + new PeriodicalHealthChecksConfiguration(PERIOD)); + testee.start(); + + scheduler.advanceTimeBy(PERIOD); + assertThat(loggingEvents.list).hasSize(1) + .allSatisfy(loggingEvent -> { + assertThat(loggingEvent.getLevel()).isEqualTo(Level.WARN); + assertThat(loggingEvent.getFormattedMessage()).contains("DEGRADED", "testing", "cause"); + }); + } + + @Test + void startShouldNotLogWhenHealthy() { + ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(PeriodicalHealthChecks.class); + + TestingHealthCheck healthy = () -> Mono.just(Result.healthy(TestingHealthCheck.COMPONENT_NAME)); + testee = new PeriodicalHealthChecks(ImmutableSet.of(healthy), + scheduler, + new PeriodicalHealthChecksConfiguration(PERIOD)); + testee.start(); + + scheduler.advanceTimeBy(PERIOD); + assertThat(loggingEvents.list).hasSize(0); + } + + @Test + void startShouldLogWhenMultipleHealthChecks() { + ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(PeriodicalHealthChecks.class); + + TestingHealthCheck unhealthy = () -> Mono.just(Result.unhealthy(TestingHealthCheck.COMPONENT_NAME, "cause")); + TestingHealthCheck degraded = () -> Mono.just(Result.degraded(TestingHealthCheck.COMPONENT_NAME, "cause")); + TestingHealthCheck healthy = () -> Mono.just(Result.healthy(TestingHealthCheck.COMPONENT_NAME)); + + testee = new PeriodicalHealthChecks(ImmutableSet.of(unhealthy, degraded, healthy), + scheduler, + new PeriodicalHealthChecksConfiguration(PERIOD)); + testee.start(); + + scheduler.advanceTimeBy(PERIOD); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(loggingEvents.list).hasSize(2); + softly.assertThat(loggingEvents.list.stream() + .map(event -> new Tuple(event.getLevel(), event.getFormattedMessage())) + .collect(Guavate.toImmutableList())) + .containsExactlyInAnyOrder( + new Tuple(Level.ERROR, "UNHEALTHY: testing : cause"), + new Tuple(Level.WARN, "DEGRADED: testing : cause")); + }); + } + + @Test void startShouldCallHealthCheckMultipleTimes() { testee.start(); - scheduler.advanceTimeBy(Duration.ofSeconds(PERIOD * EXPECTED_INVOKED_TIME)); + scheduler.advanceTimeBy(PERIOD.multipliedBy(EXPECTED_INVOKED_TIME)); verify(mockHealthCheck1, times(EXPECTED_INVOKED_TIME)).check(); } @@ -88,18 +195,18 @@ public class PeriodicalHealthChecksTest { void startShouldCallAllHealthChecks() { testee.start(); - scheduler.advanceTimeBy(Duration.ofSeconds(PERIOD * EXPECTED_INVOKED_TIME)); + scheduler.advanceTimeBy(PERIOD.multipliedBy(EXPECTED_INVOKED_TIME)); verify(mockHealthCheck1, times(EXPECTED_INVOKED_TIME)).check(); verify(mockHealthCheck2, times(EXPECTED_INVOKED_TIME)).check(); } @Test void startShouldCallRemainingHealthChecksWhenAHealthCheckThrows() { - when(mockHealthCheck1.check()).thenReturn(Mono.error(RuntimeException::new)); + when(mockHealthCheck1.check()).thenReturn(Mono.error(new RuntimeException())); testee.start(); - scheduler.advanceTimeBy(Duration.ofSeconds(PERIOD * EXPECTED_INVOKED_TIME)); + scheduler.advanceTimeBy(PERIOD.multipliedBy(EXPECTED_INVOKED_TIME)); verify(mockHealthCheck2, times(EXPECTED_INVOKED_TIME)).check(); } } \ No newline at end of file diff --git a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/HealthCheckRoutes.java b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/HealthCheckRoutes.java index 32747c2..8a5a8d6 100644 --- a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/HealthCheckRoutes.java +++ b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/HealthCheckRoutes.java @@ -101,7 +101,7 @@ public class HealthCheckRoutes implements PublicRoutes { response.status(getCorrespondingStatusCode(status)); return new HeathCheckAggregationExecutionResultDto(status, mapResultToDto(results)); } - + @GET @Path("/checks/{" + PARAM_COMPONENT_NAME + "}") @ApiOperation(value = "Perform the component's health check") @@ -153,27 +153,33 @@ public class HealthCheckRoutes implements PublicRoutes { private void logFailedCheck(Result result) { switch (result.getStatus()) { - case UNHEALTHY: - if (result.getError().isPresent()) { - LOGGER.error("HealthCheck failed for {} : {}", - result.getComponentName().getName(), - result.getCause().orElse(""), - result.getError().get()); + case UNHEALTHY: + if (result.getError().isPresent()) { + LOGGER.error("HealthCheck failed for {} : {}", + result.getComponentName().getName(), + result.getCause().orElse(""), + result.getError().get()); + } else { + LOGGER.error("HealthCheck failed for {} : {}", + result.getComponentName().getName(), + result.getCause().orElse("")); + } + break; + case DEGRADED: + if (result.getError().isPresent()) { + LOGGER.warn("HealthCheck is unstable for {} : {}", + result.getComponentName().getName(), + result.getCause().orElse(""), + result.getError().get()); + } else { + LOGGER.warn("HealthCheck is unstable for {} : {}", + result.getComponentName().getName(), + result.getCause().orElse("")); + } + break; + case HEALTHY: + // Here only to fix a warning, such cases are already filtered break; - } - - LOGGER.error("HealthCheck failed for {} : {}", - result.getComponentName().getName(), - result.getCause().orElse("")); - break; - case DEGRADED: - LOGGER.warn("HealthCheck is unstable for {} : {}", - result.getComponentName().getName(), - result.getCause().orElse("")); - break; - case HEALTHY: - // Here only to fix a warning, such cases are already filtered - break; } } @@ -203,5 +209,4 @@ public class HealthCheckRoutes implements PublicRoutes { .type(ErrorResponder.ErrorType.NOT_FOUND) .haltError(); } - } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
