This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4e94085192e7843290e5261ce58a6d4c9b5e91bf Author: Tran Tien Duc <[email protected]> AuthorDate: Mon Feb 10 18:24:12 2020 +0700 JAMES-3063 Evict closed channels on retried borrowing --- .../rabbitmq/ReactorRabbitMQChannelPool.java | 49 ++++++++++++++++++---- .../rabbitmq/ReactorRabbitMQChannelPoolTest.java | 18 +++----- 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java index 5ccfbe1..2f789bb 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java @@ -19,6 +19,7 @@ package org.apache.james.backends.rabbitmq; +import java.io.IOException; import java.time.Duration; import java.util.Comparator; import java.util.concurrent.ConcurrentSkipListSet; @@ -37,7 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; -import com.google.common.base.Preconditions; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -52,7 +52,12 @@ import reactor.rabbitmq.Sender; import reactor.rabbitmq.SenderOptions; public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { - private static final int MAX_CHANNELS_NUMBER = 5; + + private static class ChannelClosedException extends IOException { + ChannelClosedException(String message) { + super(message); + } + } static class ChannelFactory extends BasePooledObjectFactory<Channel> { @@ -60,7 +65,6 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { private static final int MAX_RETRIES = 5; private static final Duration RETRY_FIRST_BACK_OFF = Duration.ofMillis(100); - private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); private final Mono<Connection> connectionMono; @@ -69,7 +73,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { } @Override - public Channel create() throws Exception { + public Channel create() { return connectionMono .flatMap(this::openChannel) .block(); @@ -97,7 +101,12 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { } } + private static final Logger LOGGER = LoggerFactory.getLogger(ReactorRabbitMQChannelPool.class); private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis(); + private static final int MAX_CHANNELS_NUMBER = 5; + private static final int MAX_BORROW_RETRIES = 3; + private static final Duration MIN_BORROW_DELAY = Duration.ofMillis(50); + private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); private final Mono<Connection> connectionMono; private final GenericObjectPool<Channel> pool; @@ -140,13 +149,39 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { return Mono.fromCallable(this::borrow); } - private Channel borrow() throws Exception { - Channel channel = pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS); - Preconditions.checkArgument(channel.isOpen()); + private Channel borrow() { + Channel channel = tryBorrowFromPool(); borrowedChannels.add(channel); return channel; } + private Channel tryBorrowFromPool() { + return Mono.fromCallable(this::borrowFromPool) + .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable)) + .retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, Schedulers.elastic()) + .onErrorMap(this::propagateException) + .subscribeOn(Schedulers.elastic()) + .block(); + } + + private Throwable propagateException(Throwable throwable) { + if (throwable instanceof IllegalStateException + && throwable.getMessage().contains("Retries exhausted")) { + return throwable.getCause(); + } + + return throwable; + } + + private Channel borrowFromPool() throws Exception { + Channel channel = pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS); + if (!channel.isOpen()) { + invalidateObject(channel); + throw new ChannelClosedException("borrowed channel is already closed"); + } + return channel; + } + @Override public BiConsumer<SignalType, Channel> getChannelCloseHandler() { return (signalType, channel) -> { diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java index 8927930..f4e6d62 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java @@ -20,7 +20,6 @@ package org.apache.james.backends.rabbitmq; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.time.Duration; @@ -32,7 +31,6 @@ import java.util.concurrent.ExecutionException; import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -110,22 +108,18 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract { assertThat(channel.isOpen()).isFalse(); } - @Disabled("IllegalArgumentException on channel state checks") @Test void channelBorrowShouldNotThrowWhenClosedChannel() throws Exception { ChannelPool channelPool = generateChannelPool(1); Channel channel = channelPool.getChannelMono().block(); - returnChannel(channel, channelPool); + returnToThePool(channelPool, channel); - // unexpected closing, connection timeout, rabbitmq temporally down... + // unexpected closing, connection timeout, rabbitmq temporary down... channel.close(); - assertThatCode(() -> channelPool.getChannelMono().block()) - .doesNotThrowAnyException(); - } - - private void returnChannel(Channel channel, ChannelPool channelPool) { - channelPool.getChannelCloseHandler() - .accept(SignalType.ON_COMPLETE, channel); + assertThat(channelPool.getChannelMono() + .block() + .isOpen()) + .isTrue(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
