This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch rabbit-polish in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d3a169a252747ac776fbbb9b941a7f9d66effc17 Author: Benoit TELLIER <[email protected]> AuthorDate: Sun Feb 11 22:14:27 2024 +0100 [RABBITMQ] Reactify some RabbitMQ related health checks --- .../events/RabbitEventBusConsumerHealthCheck.java | 2 -- .../RabbitMQMailQueueConsumerHealthCheck.java | 23 +++++++++------------- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java index ceb3c8b106..8478561121 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java @@ -34,7 +34,6 @@ import com.rabbitmq.client.Channel; import reactor.core.publisher.Mono; public class RabbitEventBusConsumerHealthCheck implements HealthCheck { - public static final ComponentName COMPONENT_NAME = new ComponentName("EventbusConsumersHealthCheck"); public static final String COMPONENT = "EventbusConsumers"; private final RabbitMQEventBus eventBus; @@ -57,7 +56,6 @@ public class RabbitEventBusConsumerHealthCheck implements HealthCheck { public Mono<Result> check() { return connectionPool.getResilientConnection() .map(Throwing.function(connection -> { - try (Channel channel = connection.createChannel()) { return check(channel); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java index c3b3a44a18..12d0f34996 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java @@ -27,9 +27,9 @@ import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.core.healthcheck.ComponentName; import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.core.healthcheck.Result; -import org.apache.james.util.ReactorUtils; import com.github.fge.lambdas.Throwing; +import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import reactor.core.publisher.Mono; @@ -58,14 +58,13 @@ public class RabbitMQMailQueueConsumerHealthCheck implements HealthCheck { @Override public Mono<Result> check() { return connectionPool.getResilientConnection() - .map(Throwing.function(connection -> { - try (Channel channel = connection.createChannel()) { - return check(channel); - } - })).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER); + .flatMap(connection -> Mono.using(connection::createChannel, + channel -> check(connection, channel), + Throwing.consumer(Channel::close))) + .subscribeOn(Schedulers.boundedElastic()); } - private Result check(Channel channel) { + private Mono<Result> check(Connection connection, Channel channel) { boolean queueWithoutConsumers = queueFactory.listCreatedMailQueues() .stream() .map(org.apache.james.queue.api.MailQueueName::asString) @@ -74,14 +73,10 @@ public class RabbitMQMailQueueConsumerHealthCheck implements HealthCheck { .anyMatch(Throwing.predicate(queue -> channel.consumerCount(queue) == 0)); if (queueWithoutConsumers) { - reconnectionHandlers.forEach(r -> connectionPool.getResilientConnection() - .flatMap(c -> Mono.from(r.handleReconnection(c))) - .subscribeOn(Schedulers.boundedElastic()) - .block()); - - return Result.degraded(COMPONENT, "No consumers"); + return Mono.fromRunnable(() -> reconnectionHandlers.forEach(r -> r.handleReconnection(connection))) + .thenReturn(Result.degraded(COMPONENT, "No consumers")); } else { - return Result.healthy(COMPONENT); + return Mono.just(Result.healthy(COMPONENT)); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
