MAILBOX-374 Our connection mono should cache the connection as long as it is open
We should regenerate the connection mono when the connection had been close, for instance due to error. A custom callable had been written as I miss a `Mono<T>.cacheWhen(Predicate<T>)` Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/edf85300 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/edf85300 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/edf85300 Branch: refs/heads/master Commit: edf85300021eafd1ed1ef944d291c18bcfd9d996 Parents: 603de62 Author: Benoit Tellier <btell...@linagora.com> Authored: Thu Jan 24 14:13:56 2019 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Fri Jan 25 10:33:25 2019 +0700 ---------------------------------------------------------------------- .../rabbitmq/RabbitMQConnectionFactory.java | 32 ++++++++++++++++---- .../james/mailbox/events/RabbitMQEventBus.java | 6 ++-- 2 files changed, 30 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/edf85300/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java index e02fb28..c13b21b 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java @@ -19,6 +19,8 @@ package org.apache.james.backend.rabbitmq; import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Callable; import javax.inject.Inject; @@ -29,16 +31,34 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class RabbitMQConnectionFactory { + private class ConnectionCallable implements Callable<Connection> { + private final ConnectionFactory connectionFactory; + private Optional<Connection> connection; + + ConnectionCallable(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + connection = Optional.empty(); + } + + @Override + public Connection call() throws Exception { + if (connection.map(Connection::isOpen).orElse(false)) { + return connection.get(); + } + Connection newConnection = connectionFactory.newConnection(); + connection = Optional.of(newConnection); + return newConnection; + } + } + private final ConnectionFactory connectionFactory; - private final int maxRetries; - private final int minDelay; + private final RabbitMQConfiguration configuration; @Inject public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration) { this.connectionFactory = from(rabbitMQConfiguration); - this.maxRetries = rabbitMQConfiguration.getMaxRetries(); - this.minDelay = rabbitMQConfiguration.getMinDelay(); + this.configuration = rabbitMQConfiguration; } private ConnectionFactory from(RabbitMQConfiguration rabbitMQConfiguration) { @@ -56,8 +76,8 @@ public class RabbitMQConnectionFactory { } public Mono<Connection> connectionMono() { - return Mono.fromCallable(connectionFactory::newConnection) - .retryBackoff(maxRetries, Duration.ofMillis(minDelay)) + return Mono.fromCallable(new ConnectionCallable(connectionFactory)) + .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelay())) .publishOn(Schedulers.elastic()); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/edf85300/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index b203032..ecfeb0b 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -31,6 +31,7 @@ import org.apache.james.mailbox.Event; import org.apache.james.mailbox.MailboxListener; import org.apache.james.metrics.api.MetricFactory; +import com.github.fge.lambdas.Throwing; import com.rabbitmq.client.Connection; import reactor.core.publisher.Mono; @@ -64,7 +65,7 @@ public class RabbitMQEventBus implements EventBus { EventDeadLetters eventDeadLetters, MetricFactory metricFactory) { this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory); this.eventBusId = EventBusId.random(); - this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache(); + this.connectionMono = rabbitMQConnectionFactory.connectionMono(); this.eventSerializer = eventSerializer; this.routingKeyConverter = routingKeyConverter; this.retryBackoff = retryBackoff; @@ -74,7 +75,8 @@ public class RabbitMQEventBus implements EventBus { public void start() { if (!isRunning.get()) { - sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)); + sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono) + .resourceManagementChannelMono(connectionMono.map(Throwing.function(Connection::createChannel)))); MailboxListenerRegistry mailboxListenerRegistry = new MailboxListenerRegistry(); keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, mailboxListenerRegistry, mailboxListenerExecutor); groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff, eventDeadLetters, mailboxListenerExecutor); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org