This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4d0da670f68c45776477d90a2617074b9c13adad
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Apr 8 22:00:52 2020 +0700

    JAMES-2774 Avoid nested block in ReactorRabbitMQChannelPool
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java       | 24 ++++++++--------------
 1 file changed, 9 insertions(+), 15 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index db3e900..0459abe 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -138,28 +138,22 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
         return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
     }
 
-    public Mono<Connection> getConnectionMono() {
-        return connectionMono;
-    }
-
     @Override
     public Mono<? extends Channel> getChannelMono() {
-        return Mono.fromCallable(this::borrow);
+        return borrow();
     }
 
-    private Channel borrow() {
-        Channel channel = tryBorrowFromPool();
-        borrowedChannels.add(channel);
-        return channel;
-    }
-
-    private Channel tryBorrowFromPool() {
-        return Mono.fromCallable(this::borrowFromPool)
+    private Mono<Channel> borrow() {
+        return tryBorrowFromPool()
             .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
             .retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, Schedulers.elastic())
             .onErrorMap(this::propagateException)
             .subscribeOn(Schedulers.elastic())
-            .block();
+            .doOnNext(borrowedChannels::add);
+    }
+
+    private Mono<Channel> tryBorrowFromPool() {
+        return Mono.fromCallable(this::borrowFromPool);
     }
 
     private Throwable propagateException(Throwable throwable) {
@@ -223,7 +217,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
     public boolean tryChannel() {
         Channel channel = null;
         try {
-            channel = borrow();
+            channel = borrow().block();
             return channel.isOpen();
         } catch (Throwable t) {
             return false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to