This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0b7669b9b85eb2bcf81c0963410555979c091d47 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Wed Aug 28 10:00:25 2019 +0200 JAMES-2813 Introduce ReactorRabbitMQChannelPool.createSender --- .../backend/rabbitmq/ReactorRabbitMQChannelPool.java | 15 +++++++++++++++ .../org/apache/james/mailbox/events/RabbitMQEventBus.java | 11 ++--------- .../distributed/RabbitMQTerminationSubscriber.java | 8 +------- .../task/eventsourcing/distributed/RabbitMQWorkQueue.java | 9 +-------- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool.java index f05a307..814e70a 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool.java @@ -29,9 +29,11 @@ import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.fge.lambdas.Throwing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -39,6 +41,9 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.ChannelPool; +import reactor.rabbitmq.RabbitFlux; +import reactor.rabbitmq.Sender; +import reactor.rabbitmq.SenderOptions; public class ReactorRabbitMQChannelPool implements ChannelPool { @@ -87,10 +92,12 @@ public class ReactorRabbitMQChannelPool implements ChannelPool { private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis(); + private final Mono<Connection> connectionMono; private final GenericObjectPool<Channel> pool; private final ConcurrentSkipListSet<Channel> borrowedChannels; public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize) { + this.connectionMono = connectionMono; ChannelFactory channelFactory = new ChannelFactory(connectionMono); GenericObjectPoolConfig<Channel> config = new GenericObjectPoolConfig<>(); @@ -120,6 +127,14 @@ public class ReactorRabbitMQChannelPool implements ChannelPool { }; } + public Sender createSender() { + return RabbitFlux.createSender(new SenderOptions() + .connectionMono(connectionMono) + .channelPool(this) + .resourceManagementChannelMono( + connectionMono.map(Throwing.function(Connection::createChannel)).cache())); + } + private void invalidateObject(Channel channel) { try { pool.invalidateObject(channel); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index bd53e93..36795c6 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -30,16 +30,10 @@ import org.apache.james.event.json.EventSerializer; import org.apache.james.lifecycle.api.Startable; import org.apache.james.metrics.api.MetricFactory; -import com.github.fge.lambdas.Throwing; import com.google.common.base.Preconditions; -import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; - import reactor.core.publisher.Mono; -import reactor.rabbitmq.ChannelPool; -import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Sender; -import reactor.rabbitmq.SenderOptions; public class RabbitMQEventBus implements EventBus, Startable { private static final int MAX_CHANNELS_NUMBER = 5; @@ -58,7 +52,7 @@ public class RabbitMQEventBus implements EventBus, Startable { private volatile boolean isRunning; private volatile boolean isStopping; - private ChannelPool channelPool; + private ReactorRabbitMQChannelPool channelPool; private GroupRegistrationHandler groupRegistrationHandler; private KeyRegistrationHandler keyRegistrationHandler; EventDispatcher eventDispatcher; @@ -84,8 +78,7 @@ public class RabbitMQEventBus implements EventBus, Startable { if (!isRunning && !isStopping) { this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER); - sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono).channelPool(channelPool) - .resourceManagementChannelMono(connectionMono.map(Throwing.<Connection, Channel>function(Connection::createChannel).sneakyThrow()).cache())); + sender = channelPool.createSender(); LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor); groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff, eventDeadLetters, mailboxListenerExecutor); diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index a3a8bb1..89d6e96 100644 --- a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -39,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; -import com.github.fge.lambdas.Throwing; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Delivery; @@ -56,7 +55,6 @@ import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; -import reactor.rabbitmq.SenderOptions; public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class); @@ -83,11 +81,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta } public void start() { - Sender sender = RabbitFlux.createSender(new SenderOptions() - .connectionMono(connectionMono) - .channelPool(channelPool) - .resourceManagementChannelMono( - connectionMono.map(Throwing.function(Connection::createChannel)).cache())); + Sender sender = channelPool.createSender(); sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block(); diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index 69cdf16..e954c31 100644 --- a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -39,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; -import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableMap; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Connection; @@ -51,10 +50,8 @@ import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.QueueSpecification; -import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; -import reactor.rabbitmq.SenderOptions; public class RabbitMQWorkQueue implements WorkQueue, Startable { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class); @@ -80,11 +77,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { } public void start() { - sender = RabbitFlux.createSender(new SenderOptions() - .connectionMono(connectionMono) - .channelPool(channelPool) - .resourceManagementChannelMono( - connectionMono.map(Throwing.function(Connection::createChannel)).cache())); + sender = channelPool.createSender(); receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
