This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 27d3fa4208f77b89b6813c58951692a697ef26cc
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu Oct 31 13:44:32 2019 +0700

    JAMES-2937 RabbitMQTerminationSubscriber should not initialize its own
    channelPool
---
 .../distributed/RabbitMQTerminationSubscriber.java         | 14 +++-----------
 .../distributed/RabbitMQTerminationSubscriberTest.java     |  3 ++-
 2 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index dbb4b8f..44a17f5 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -29,7 +29,6 @@ import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
 import org.apache.james.lifecycle.api.Startable;
@@ -40,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Delivery;
 
 import reactor.core.Disposable;
@@ -52,9 +50,7 @@ import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 
 public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
@@ -65,7 +61,6 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
     private static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
 
     private final JsonEventSerializer serializer;
-    private final Mono<Connection> connectionMono;
     private final ReactorRabbitMQChannelPool channelPool;
     private final String queueName;
     private UnicastProcessor<OutboundMessage> sendQueue;
@@ -76,15 +71,13 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
     private Sender sender;
 
     @Inject
-    public RabbitMQTerminationSubscriber(SimpleConnectionPool simpleConnectionPool, JsonEventSerializer serializer) {
+    RabbitMQTerminationSubscriber(ReactorRabbitMQChannelPool channelPool, JsonEventSerializer serializer) {
         this.serializer = serializer;
-        this.connectionMono = simpleConnectionPool.getResilientConnection();
-        this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
+        this.channelPool = channelPool;
         this.queueName = QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
     }
 
     public void start() {
-        channelPool.start();
         sender = channelPool.getSender();
 
         
sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
@@ -96,7 +89,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
             .subscribeOn(Schedulers.boundedElastic())
             .subscribe();
 
-        listenerReceiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+        listenerReceiver = channelPool.createReceiver();
         listener = DirectProcessor.create();
         listenQueueHandle = listenerReceiver
             .consumeAutoAck(queueName)
@@ -141,6 +134,5 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
         Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(listenerReceiver).ifPresent(Receiver::close);
         Optional.ofNullable(sender).ifPresent(Sender::close);
-        channelPool.close();
     }
 }
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index d878c69..e826690 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.steveash.guavate.Guavate;
+
 import reactor.core.publisher.Flux;
 
 class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract {
@@ -53,7 +54,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
 
     @Override
     public TerminationSubscriber subscriber() {
-        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitConnectionPool(), SERIALIZER);
+        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitChannelPool(), SERIALIZER);
         subscriber.start();
         return subscriber;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to