This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 27d3fa4208f77b89b6813c58951692a697ef26cc Author: Benoit Tellier <[email protected]> AuthorDate: Thu Oct 31 13:44:32 2019 +0700 JAMES-2937 RabbitMQTerminationSubscriber should not initialize his own channelPool --- .../distributed/RabbitMQTerminationSubscriber.java | 14 +++----------- .../distributed/RabbitMQTerminationSubscriberTest.java | 3 ++- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index dbb4b8f..44a17f5 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -29,7 +29,6 @@ import javax.annotation.PreDestroy; import javax.inject.Inject; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; -import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; import org.apache.james.lifecycle.api.Startable; @@ -40,7 +39,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Connection; import com.rabbitmq.client.Delivery; import reactor.core.Disposable; @@ -52,9 +50,7 @@ import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.QueueSpecification; -import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Receiver; -import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable { @@ -65,7 +61,6 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta private static final String ROUTING_KEY = "terminationSubscriberRoutingKey"; private final JsonEventSerializer serializer; - private final Mono<Connection> connectionMono; private final ReactorRabbitMQChannelPool channelPool; private final String queueName; private UnicastProcessor<OutboundMessage> sendQueue; @@ -76,15 +71,13 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta private Sender sender; @Inject - public RabbitMQTerminationSubscriber(SimpleConnectionPool simpleConnectionPool, JsonEventSerializer serializer) { + RabbitMQTerminationSubscriber(ReactorRabbitMQChannelPool channelPool, JsonEventSerializer serializer) { this.serializer = serializer; - this.connectionMono = simpleConnectionPool.getResilientConnection(); - this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER); + this.channelPool = channelPool; this.queueName = QUEUE_NAME_PREFIX + UUID.randomUUID().toString(); } public void start() { - channelPool.start(); sender = channelPool.getSender(); sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); @@ -96,7 +89,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta .subscribeOn(Schedulers.boundedElastic()) .subscribe(); - listenerReceiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono)); + listenerReceiver = channelPool.createReceiver(); listener = DirectProcessor.create(); listenQueueHandle = listenerReceiver .consumeAutoAck(queueName) @@ -141,6 +134,5 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose); Optional.ofNullable(listenerReceiver).ifPresent(Receiver::close); Optional.ofNullable(sender).ifPresent(Sender::close); - channelPool.close(); } } diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java index d878c69..e826690 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import com.github.steveash.guavate.Guavate; + import reactor.core.publisher.Flux; class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract { @@ -53,7 +54,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract @Override public TerminationSubscriber subscriber() { - RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitConnectionPool(), SERIALIZER); + RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitChannelPool(), SERIALIZER); subscriber.start(); return subscriber; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
