This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9a115d8fa1be464c51c74ff17ec1ec1dec7d6729
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu Oct 31 13:45:17 2019 +0700

    JAMES-2937 RabbitMQExtension should be responsible of channelPool lifecycle
---
 .../james/mailbox/events/RabbitMQEventBusTest.java | 46 ++++++----------------
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   |  3 +-
 .../RabbitMQMailQueueConfigurationChangeTest.java  |  9 +----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 10 +----
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java     |  9 +----
 .../distributed/DistributedTaskManagerTest.java    | 10 +----
 .../distributed/RabbitMQWorkQueueTest.java         | 11 ++----
 7 files changed, 24 insertions(+), 74 deletions(-)

diff --git 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 2b92e20..1f0ffc8 100644
--- 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -75,17 +75,11 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.stubbing.Answer;
 
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
-import reactor.rabbitmq.SenderOptions;
 
 class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract, 
GroupContract.MultipleEventBusGroupContract,
     KeyContract.SingleEventBusKeyContract, 
KeyContract.MultipleEventBusKeyContract,
@@ -100,8 +94,6 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
     private EventSerializer eventSerializer;
     private RoutingKeyConverter routingKeyConverter;
     private MemoryEventDeadLetters memoryEventDeadLetters;
-    private Mono<Connection> resilientConnection;
-    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     @BeforeEach
     void setUp() {
@@ -110,8 +102,6 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
         TestId.Factory mailboxIdFactory = new TestId.Factory();
         eventSerializer = new EventSerializer(mailboxIdFactory, new 
TestMessageId.Factory(), new 
DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
         routingKeyConverter = RoutingKeyConverter.forFactories(new 
MailboxIdRegistrationKey.Factory(mailboxIdFactory));
-        reactorRabbitMQChannelPool = new 
ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
-        reactorRabbitMQChannelPool.start();
 
         eventBus = newEventBus();
         eventBus2 = newEventBus();
@@ -120,7 +110,6 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
         eventBus.start();
         eventBus2.start();
         eventBus3.start();
-        resilientConnection = 
rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
     }
 
     @AfterEach
@@ -130,13 +119,15 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
         eventBus3.stop();
         ALL_GROUPS.stream()
             .map(GroupRegistration.WorkQueueName::of)
-            .forEach(queueName -> 
reactorRabbitMQChannelPool.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
-        
reactorRabbitMQChannelPool.getSender().delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block();
-        reactorRabbitMQChannelPool.close();
+            .forEach(queueName -> 
rabbitMQExtension.getRabbitChannelPool().getSender().delete(QueueSpecification.queue(queueName.asString())).block());
+        rabbitMQExtension.getRabbitChannelPool()
+            .getSender()
+            
.delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME))
+            .block();
     }
 
     private RabbitMQEventBus newEventBus() {
-        return newEventBus(reactorRabbitMQChannelPool);
+        return newEventBus(rabbitMQExtension.getRabbitChannelPool());
     }
 
     private RabbitMQEventBus newEventBus(ReactorRabbitMQChannelPool 
rabbitMQChannelPool) {
@@ -262,31 +253,24 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
     @Nested
     class PublishingTest {
         private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + 
"-workQueue";
-        private Sender sender1;
 
         @BeforeEach
         void setUp() {
-            SenderOptions senderOption = new 
SenderOptions().connectionMono(resilientConnection);
-            sender1 = RabbitFlux.createSender(senderOption);
+            Sender sender = 
rabbitMQExtension.getRabbitChannelPool().getSender();
 
-            
sender1.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+            
sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
                 .arguments(NO_ARGUMENTS))
                 .block();
-            sender1.bind(BindingSpecification.binding()
+            sender.bind(BindingSpecification.binding()
                 .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
                 .queue(MAILBOX_WORK_QUEUE_NAME)
                 .routingKey(EMPTY_ROUTING_KEY))
                 .block();
         }
 
-        @AfterEach
-        void tearDown() {
-            sender1.close();
-        }
-
         @Test
         void dispatchShouldPublishSerializedEventToRabbitMQ() {
             eventBus.dispatch(EVENT, NO_KEYS).block();
@@ -302,7 +286,7 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
         }
 
         private Event dequeueEvent() {
-            try (Receiver receiver = RabbitFlux.createReceiver(new 
ReceiverOptions().connectionMono(resilientConnection))) {
+            try (Receiver receiver = 
rabbitMQExtension.getRabbitChannelPool().createReceiver()) {
                 byte[] eventInBytes = 
receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
                     .blockFirst()
                     .getBody();
@@ -341,18 +325,10 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
                     .restartPolicy(DockerRestartPolicy.PER_TEST);
 
                 private RabbitMQEventBus rabbitMQEventBusWithNetWorkIssue;
-                private ReactorRabbitMQChannelPool 
reactorRabbitMQChannelPoolWithNetWorkIssue;
 
                 @BeforeEach
                 void beforeEach() {
-                    reactorRabbitMQChannelPoolWithNetWorkIssue = new 
ReactorRabbitMQChannelPool(rabbitMQNetWorkIssueExtension.getRabbitConnectionPool());
-                    reactorRabbitMQChannelPoolWithNetWorkIssue.start();
-                    rabbitMQEventBusWithNetWorkIssue = 
newEventBus(reactorRabbitMQChannelPoolWithNetWorkIssue);
-                }
-
-                @AfterEach
-                void afterEach() {
-                    reactorRabbitMQChannelPoolWithNetWorkIssue.close();
+                    rabbitMQEventBusWithNetWorkIssue = 
newEventBus(rabbitMQNetWorkIssueExtension.getRabbitChannelPool());
                 }
 
                 @Test
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index b48f60f..7c0325a 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -180,7 +180,8 @@ public class RabbitMQMailQueueFactory implements 
MailQueueFactory<RabbitMQMailQu
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
                 .arguments(NO_ARGUMENTS)),
-            
reactorRabbitMQChannelPool.getSender().bind(BindingSpecification.binding()
+            reactorRabbitMQChannelPool.getSender()
+                .bind(BindingSpecification.binding()
                 .exchange(mailQueueName.toRabbitExchangeName().asString())
                 .queue(mailQueueName.toWorkQueueName().asString())
                 .routingKey(EMPTY_ROUTING_KEY)))
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index 4ad6f85..a67fd34 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -36,7 +36,6 @@ import 
org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import 
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobStore;
@@ -90,7 +89,6 @@ class RabbitMQMailQueueConfigurationChangeTest {
     private UpdatableTickingClock clock;
     private RabbitMQMailQueueManagement mqManagementApi;
     private MimeMessageStore.Factory mimeMessageStoreFactory;
-    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     @BeforeEach
     void setup(CassandraCluster cassandra) throws Exception {
@@ -98,14 +96,11 @@ class RabbitMQMailQueueConfigurationChangeTest {
         mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO);
         clock = new UpdatableTickingClock(IN_SLICE_1);
         mqManagementApi = new 
RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-        reactorRabbitMQChannelPool = new 
ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
-        reactorRabbitMQChannelPool.start();
     }
 
     @AfterEach
     void tearDown() {
         mqManagementApi.deleteAllQueues();
-        reactorRabbitMQChannelPool.close();
     }
 
     private RabbitMQMailQueue getRabbitMQMailQueue(CassandraCluster cassandra, 
CassandraMailQueueViewConfiguration mailQueueViewConfiguration) throws 
Exception {
@@ -123,14 +118,14 @@ class RabbitMQMailQueueConfigurationChangeTest {
         RabbitMQMailQueueFactory.PrivateFactory privateFactory = new 
RabbitMQMailQueueFactory.PrivateFactory(
             new NoopMetricFactory(),
             new NoopGaugeRegistry(),
-            reactorRabbitMQChannelPool,
+            rabbitMQExtension.getRabbitChannelPool(),
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
             clock,
             new RawMailQueueItemDecoratorFactory(),
             mailQueueSizeConfiguration);
-        RabbitMQMailQueueFactory mailQueueFactory = new 
RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, 
privateFactory);
+        RabbitMQMailQueueFactory mailQueueFactory = new 
RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitChannelPool(), 
mqManagementApi, privateFactory);
         return mailQueueFactory.createQueue(SPOOL);
     }
 
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 0102a05..9ba2b50 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -38,13 +38,11 @@ import 
org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import 
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobStore;
 import org.apache.james.blob.mail.MimeMessageStore;
 import 
org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
-
 import org.apache.james.metrics.api.Gauge;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueMetricContract;
@@ -99,11 +97,9 @@ class RabbitMQMailQueueTest {
     private UpdatableTickingClock clock;
     private RabbitMQMailQueue mailQueue;
     private RabbitMQMailQueueManagement mqManagementApi;
-    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     @AfterEach
     void tearDown() {
-        reactorRabbitMQChannelPool.close();
         mqManagementApi.deleteAllQueues();
     }
 
@@ -275,12 +271,10 @@ class RabbitMQMailQueueTest {
                 .build(),
             mimeMessageStoreFactory);
 
-        reactorRabbitMQChannelPool = new 
ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
-        reactorRabbitMQChannelPool.start();
         RabbitMQMailQueueFactory.PrivateFactory factory = new 
RabbitMQMailQueueFactory.PrivateFactory(
             metricTestSystem.getMetricFactory(),
             metricTestSystem.getSpyGaugeRegistry(),
-            reactorRabbitMQChannelPool,
+            rabbitMQExtension.getRabbitChannelPool(),
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
@@ -288,7 +282,7 @@ class RabbitMQMailQueueTest {
             new RawMailQueueItemDecoratorFactory(),
             configuration);
         mqManagementApi = new 
RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-        mailQueueFactory = new 
RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, factory);
+        mailQueueFactory = new 
RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitChannelPool(), 
mqManagementApi, factory);
         mailQueue = mailQueueFactory.createQueue(SPOOL);
     }
 }
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index 8ac46b3..81a5e18 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
@@ -54,7 +53,6 @@ class RabbitMqMailQueueFactoryTest implements 
MailQueueFactoryContract<RabbitMQM
 
     private RabbitMQMailQueueFactory mailQueueFactory;
     private RabbitMQMailQueueManagement mqManagementApi;
-    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     @BeforeEach
     void setup() throws Exception {
@@ -68,12 +66,10 @@ class RabbitMqMailQueueFactoryTest implements 
MailQueueFactoryContract<RabbitMQM
             .sizeMetricsEnabled(true)
             .build();
 
-        reactorRabbitMQChannelPool = new 
ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
-        reactorRabbitMQChannelPool.start();
         RabbitMQMailQueueFactory.PrivateFactory privateFactory = new 
RabbitMQMailQueueFactory.PrivateFactory(
             new NoopMetricFactory(),
             new NoopGaugeRegistry(),
-            reactorRabbitMQChannelPool,
+            rabbitMQExtension.getRabbitChannelPool(),
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
@@ -81,13 +77,12 @@ class RabbitMqMailQueueFactoryTest implements 
MailQueueFactoryContract<RabbitMQM
             new RawMailQueueItemDecoratorFactory(),
             configuration);
         mqManagementApi = new 
RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-        mailQueueFactory = new 
RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, 
privateFactory);
+        mailQueueFactory = new 
RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitChannelPool(), 
mqManagementApi, privateFactory);
     }
 
     @AfterEach
     void tearDown() {
         mqManagementApi.deleteAllQueues();
-        reactorRabbitMQChannelPool.close();
     }
 
     @Override
diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 83c60d8..d6ed9d3 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -73,9 +73,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import com.github.steveash.guavate.Guavate;
 
 class DistributedTaskManagerTest implements TaskManagerContract {
-
-    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
-
     private static class TrackedRabbitMQWorkQueueSupplier implements 
WorkQueueSupplier {
         private final List<RabbitMQWorkQueue> workQueues;
         private final RabbitMQWorkQueueSupplier supplier;
@@ -141,9 +138,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
 
     @BeforeEach
     void setUp(EventStore eventStore) {
-        reactorRabbitMQChannelPool = new 
ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
-        reactorRabbitMQChannelPool.start();
-        workQueueSupplier = new 
TrackedRabbitMQWorkQueueSupplier(reactorRabbitMQChannelPool, TASK_SERIALIZER);
+        workQueueSupplier = new 
TrackedRabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitChannelPool(), 
TASK_SERIALIZER);
         this.eventStore = eventStore;
         terminationSubscribers = new ArrayList<>();
     }
@@ -152,7 +147,6 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
     void tearDown() {
         terminationSubscribers.forEach(RabbitMQTerminationSubscriber::close);
         workQueueSupplier.stopWorkQueues();
-        reactorRabbitMQChannelPool.close();
     }
 
     public EventSourcingTaskManager taskManager() {
@@ -160,7 +154,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
     }
 
     private EventSourcingTaskManager taskManager(Hostname hostname) {
-        RabbitMQTerminationSubscriber terminationSubscriber = new 
RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitConnectionPool(), 
EVENT_SERIALIZER);
+        RabbitMQTerminationSubscriber terminationSubscriber = new 
RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitChannelPool(), 
EVENT_SERIALIZER);
         terminationSubscribers.add(terminationSubscriber);
         terminationSubscriber.start();
         return new EventSourcingTaskManager(workQueueSupplier, eventStore, 
executionDetailsProjection, hostname, terminationSubscriber);
diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 5468af1..fefb6f1 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.TestTask;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
@@ -69,7 +68,6 @@ class RabbitMQWorkQueueTest {
     private RabbitMQWorkQueue testee;
     private ImmediateWorker worker;
     private JsonTaskSerializer serializer;
-    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     private static class ImmediateWorker implements TaskManagerWorker {
 
@@ -103,16 +101,13 @@ class RabbitMQWorkQueueTest {
     void setUp() {
         worker = spy(new ImmediateWorker());
         serializer = new 
JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
-        reactorRabbitMQChannelPool = new 
ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
-        reactorRabbitMQChannelPool.start();
-        testee = new RabbitMQWorkQueue(worker, reactorRabbitMQChannelPool, 
serializer);
+        testee = new RabbitMQWorkQueue(worker, 
rabbitMQExtension.getRabbitChannelPool(), serializer);
         testee.start();
     }
 
     @AfterEach
     void tearDown() {
         testee.close();
-        reactorRabbitMQChannelPool.close();
     }
 
     @Test
@@ -137,7 +132,7 @@ class RabbitMQWorkQueueTest {
         testee.submit(TASK_WITH_ID);
 
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
-        try (RabbitMQWorkQueue otherWorkQueue = new 
RabbitMQWorkQueue(otherTaskManagerWorker, reactorRabbitMQChannelPool, 
serializer)) {
+        try (RabbitMQWorkQueue otherWorkQueue = new 
RabbitMQWorkQueue(otherTaskManagerWorker, 
rabbitMQExtension.getRabbitChannelPool(), serializer)) {
             otherWorkQueue.start();
 
             IntStream.range(0, 9)
@@ -156,7 +151,7 @@ class RabbitMQWorkQueueTest {
 
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
         JsonTaskSerializer otherTaskSerializer = new 
JsonTaskSerializer(TestTaskDTOModules.TEST_TYPE);
-        try (RabbitMQWorkQueue otherWorkQueue = new 
RabbitMQWorkQueue(otherTaskManagerWorker, reactorRabbitMQChannelPool, 
otherTaskSerializer)) {
+        try (RabbitMQWorkQueue otherWorkQueue = new 
RabbitMQWorkQueue(otherTaskManagerWorker, 
rabbitMQExtension.getRabbitChannelPool(), otherTaskSerializer)) {
             //wait to be sur that the first workqueue has subscribed as an 
exclusive consumer of the RabbitMQ queue.
             Thread.sleep(200);
             otherWorkQueue.start();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to