This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4d0da670f68c45776477d90a2617074b9c13adad
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Apr 8 22:00:52 2020 +0700

    JAMES-2774 Avoid nested block in ReactorRabbitMQChannelPool
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java | 24 ++++++++--------------
 1 file changed, 9 insertions(+), 15 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index db3e900..0459abe 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -138,28 +138,22 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
         return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
     }
 
-    public Mono<Connection> getConnectionMono() {
-        return connectionMono;
-    }
-
     @Override
     public Mono<? extends Channel> getChannelMono() {
-        return Mono.fromCallable(this::borrow);
+        return borrow();
     }
 
-    private Channel borrow() {
-        Channel channel = tryBorrowFromPool();
-        borrowedChannels.add(channel);
-        return channel;
-    }
-
-    private Channel tryBorrowFromPool() {
-        return Mono.fromCallable(this::borrowFromPool)
+    private Mono<Channel> borrow() {
+        return tryBorrowFromPool()
             .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
             .retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, Schedulers.elastic())
             .onErrorMap(this::propagateException)
             .subscribeOn(Schedulers.elastic())
-            .block();
+            .doOnNext(borrowedChannels::add);
+    }
+
+    private Mono<Channel> tryBorrowFromPool() {
+        return Mono.fromCallable(this::borrowFromPool);
     }
 
     private Throwable propagateException(Throwable throwable) {
@@ -223,7 +217,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
     public boolean tryChannel() {
         Channel channel = null;
         try {
-            channel = borrow();
+            channel = borrow().block();
             return channel.isOpen();
         } catch (Throwable t) {
             return false;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
