This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch rabbit-polish
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d3a169a252747ac776fbbb9b941a7f9d66effc17
Author: Benoit TELLIER <[email protected]>
AuthorDate: Sun Feb 11 22:14:27 2024 +0100

    [RABBITMQ] Reactify some RabbitMQ related health checks
---
 .../events/RabbitEventBusConsumerHealthCheck.java  |  2 --
 .../RabbitMQMailQueueConsumerHealthCheck.java      | 23 +++++++++-------------
 2 files changed, 9 insertions(+), 16 deletions(-)

diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
index ceb3c8b106..8478561121 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
@@ -34,7 +34,6 @@ import com.rabbitmq.client.Channel;
 import reactor.core.publisher.Mono;
 
 public class RabbitEventBusConsumerHealthCheck implements HealthCheck {
-    public static final ComponentName COMPONENT_NAME = new 
ComponentName("EventbusConsumersHealthCheck");
     public static final String COMPONENT = "EventbusConsumers";
 
     private final RabbitMQEventBus eventBus;
@@ -57,7 +56,6 @@ public class RabbitEventBusConsumerHealthCheck implements 
HealthCheck {
     public Mono<Result> check() {
         return connectionPool.getResilientConnection()
             .map(Throwing.function(connection -> {
-
                 try (Channel channel = connection.createChannel()) {
                     return check(channel);
                 }
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
index c3b3a44a18..12d0f34996 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConsumerHealthCheck.java
@@ -27,9 +27,9 @@ import 
org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.core.healthcheck.ComponentName;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
-import org.apache.james.util.ReactorUtils;
 
 import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Channel;
 
 import reactor.core.publisher.Mono;
@@ -58,14 +58,13 @@ public class RabbitMQMailQueueConsumerHealthCheck 
implements HealthCheck {
     @Override
     public Mono<Result> check() {
         return connectionPool.getResilientConnection()
-            .map(Throwing.function(connection -> {
-                try (Channel channel = connection.createChannel()) {
-                    return check(channel);
-                }
-            })).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+            .flatMap(connection -> Mono.using(connection::createChannel,
+                channel -> check(connection, channel),
+                Throwing.consumer(Channel::close)))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
-    private Result check(Channel channel) {
+    private Mono<Result> check(Connection connection, Channel channel) {
         boolean queueWithoutConsumers = queueFactory.listCreatedMailQueues()
             .stream()
             .map(org.apache.james.queue.api.MailQueueName::asString)
@@ -74,14 +73,10 @@ public class RabbitMQMailQueueConsumerHealthCheck 
implements HealthCheck {
             .anyMatch(Throwing.predicate(queue -> channel.consumerCount(queue) 
== 0));
 
         if (queueWithoutConsumers) {
-            reconnectionHandlers.forEach(r -> 
connectionPool.getResilientConnection()
-                .flatMap(c -> Mono.from(r.handleReconnection(c)))
-                .subscribeOn(Schedulers.boundedElastic())
-                .block());
-
-            return Result.degraded(COMPONENT, "No consumers");
+            return Mono.fromRunnable(() -> reconnectionHandlers.forEach(r -> 
r.handleReconnection(connection)))
+                .thenReturn(Result.degraded(COMPONENT, "No consumers"));
         } else {
-            return Result.healthy(COMPONENT);
+            return Mono.just(Result.healthy(COMPONENT));
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to