This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ca12220c101ac6cdce4377fe1de4c7d975e4c205
Author: Matthieu Baechler <[email protected]>
AuthorDate: Tue Mar 12 11:11:24 2019 +0100

    JAMES-2683 Plug the thread leak related to Connection and Channel management
    
        Make sure only SimpleConnectionPool can create a connection
        and always call the close method when done with the pool
---
 .../backend/rabbitmq/RabbitMQChannelPool.java      |  2 +
 .../rabbitmq/RabbitMQConnectionFactory.java        | 28 +------
 .../backend/rabbitmq/RabbitMQHealthCheck.java      | 19 ++---
 .../james/backend/rabbitmq/SimpleChannelPool.java  | 79 ++++++++++++-------
 .../backend/rabbitmq/SimpleConnectionPool.java     | 90 ++++++++++++++++++++++
 .../james/backend/rabbitmq/RabbitMQExtension.java  | 16 ++--
 .../james/mailbox/events/RabbitMQEventBus.java     |  6 +-
 .../james/mailbox/events/RabbitMQEventBusTest.java | 45 +++++------
 .../rabbitmq/host/RabbitMQEventBusHostSystem.java  |  8 +-
 9 files changed, 191 insertions(+), 102 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
index f26efbd..e8fef49 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
@@ -49,5 +49,7 @@ public interface RabbitMQChannelPool {
 
     Flux<AcknowledgableDelivery> receive(String queueName);
 
+    boolean tryConnection();
+
     void close() throws Exception;
 }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
index 151e983..1f0e329 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
@@ -19,37 +19,15 @@
 package org.apache.james.backend.rabbitmq;
 
 import java.time.Duration;
-import java.util.Optional;
-import java.util.concurrent.Callable;
 
 import javax.inject.Inject;
 
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
-
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class RabbitMQConnectionFactory {
-    private class ConnectionCallable implements Callable<Connection> {
-        private final ConnectionFactory connectionFactory;
-        private Optional<Connection> connection;
-
-        ConnectionCallable(ConnectionFactory connectionFactory) {
-            this.connectionFactory = connectionFactory;
-            connection = Optional.empty();
-        }
-
-        @Override
-        public synchronized Connection call() throws Exception {
-            if (connection.map(Connection::isOpen).orElse(false)) {
-                return connection.get();
-            }
-            Connection newConnection = connectionFactory.newConnection();
-            connection = Optional.of(newConnection);
-            return newConnection;
-        }
-    }
 
     private final ConnectionFactory connectionFactory;
 
@@ -71,12 +49,12 @@ public class RabbitMQConnectionFactory {
         }
     }
 
-    public Connection create() {
+    Connection create() {
         return connectionMono().block();
     }
 
-    public Mono<Connection> connectionMono() {
-        return Mono.fromCallable(new ConnectionCallable(connectionFactory))
+    Mono<Connection> connectionMono() {
+        return Mono.fromCallable(connectionFactory::newConnection)
             .retryBackoff(configuration.getMaxRetries(), 
Duration.ofMillis(configuration.getMinDelayInMs()))
             .publishOn(Schedulers.elastic());
     }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
index 17a7ead..0290a47 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
@@ -19,10 +19,6 @@
 
 package org.apache.james.backend.rabbitmq;
 
-import java.net.URISyntaxException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-
 import javax.inject.Inject;
 
 import org.apache.james.core.healthcheck.ComponentName;
@@ -38,7 +34,7 @@ public class RabbitMQHealthCheck implements HealthCheck {
     private final RabbitMQChannelPool rabbitChannelPoolImpl;
 
     @Inject
-    public RabbitMQHealthCheck(RabbitMQChannelPool rabbitChannelPoolImpl) 
throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
+    public RabbitMQHealthCheck(RabbitMQChannelPool rabbitChannelPoolImpl) {
         this.rabbitChannelPoolImpl = rabbitChannelPoolImpl;
     }
 
@@ -50,13 +46,12 @@ public class RabbitMQHealthCheck implements HealthCheck {
     @Override
     public Result check() {
         try {
-            return rabbitChannelPoolImpl.execute(channel -> {
-                    if (channel.isOpen()) {
-                        return Result.healthy(COMPONENT_NAME);
-                    }
-                    LOGGER.error("The created connection was not opened");
-                    return Result.unhealthy(COMPONENT_NAME);
-            });
+            if (rabbitChannelPoolImpl.tryConnection()) {
+                return Result.healthy(COMPONENT_NAME);
+            } else {
+                LOGGER.error("The created connection was not opened");
+                return Result.unhealthy(COMPONENT_NAME);
+            }
         } catch (Exception e) {
             LOGGER.error("Unhealthy RabbitMQ instances: could not establish a 
connection", e);
             return Result.unhealthy(COMPONENT_NAME);
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
index b5819e3..2a7da85 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
@@ -19,7 +19,10 @@
 
 package org.apache.james.backend.rabbitmq;
 
+import java.io.IOException;
+import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.PreDestroy;
@@ -28,8 +31,9 @@ import javax.inject.Inject;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
@@ -37,18 +41,16 @@ import reactor.rabbitmq.ReceiverOptions;
 
 public class SimpleChannelPool implements RabbitMQChannelPool {
     private final AtomicReference<Channel> channelReference;
-    private final AtomicReference<Connection> connectionReference;
-    private final RabbitMQConnectionFactory connectionFactory;
     private final Receiver rabbitFlux;
+    private final SimpleConnectionPool connectionPool;
 
     @Inject
     @VisibleForTesting
-    SimpleChannelPool(RabbitMQConnectionFactory factory) {
-        this.connectionFactory = factory;
-        this.connectionReference = new AtomicReference<>();
+    SimpleChannelPool(SimpleConnectionPool connectionPool) {
+        this.connectionPool = connectionPool;
         this.channelReference = new AtomicReference<>();
         this.rabbitFlux = RabbitFlux
-            .createReceiver(new 
ReceiverOptions().connectionMono(connectionFactory.connectionMono()));
+            .createReceiver(new 
ReceiverOptions().connectionMono(connectionPool.getResilientConnection()));
     }
 
     @Override
@@ -57,26 +59,22 @@ public class SimpleChannelPool implements RabbitMQChannelPool {
     }
 
     @Override
-    public synchronized <T, E extends Throwable> T execute(RabbitFunction<T, 
E> f) throws E, ConnectionFailedException {
-        return f.execute(getResilientChannel());
+    public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws 
E, ConnectionFailedException {
+        return f.execute(getResilientChannel().block());
     }
 
     @Override
-    public synchronized <E extends Throwable> void execute(RabbitConsumer<E> 
f) throws E, ConnectionFailedException {
-        f.execute(getResilientChannel());
+    public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, 
ConnectionFailedException {
+        f.execute(getResilientChannel().block());
     }
 
     @PreDestroy
     @Override
-    public synchronized void close() {
+    public void close() {
         Optional.ofNullable(channelReference.get())
             .filter(Channel::isOpen)
             
.ifPresent(Throwing.<Channel>consumer(Channel::close).orDoNothing());
 
-        Optional.ofNullable(connectionReference.get())
-            .filter(Connection::isOpen)
-            
.ifPresent(Throwing.<Connection>consumer(Connection::close).orDoNothing());
-
         try {
             rabbitFlux.close();
         } catch (Throwable ignored) {
@@ -84,24 +82,47 @@ public class SimpleChannelPool implements RabbitMQChannelPool {
         }
     }
 
-    private Connection getResilientConnection() {
-        return connectionReference.updateAndGet(this::getOpenConnection);
+    private Mono<Channel> getResilientChannel() {
+        int numRetries = 100;
+        Duration initialDelay = Duration.ofMillis(100);
+        return Mono.defer(this::getOpenChannel)
+            .publishOn(Schedulers.elastic())
+            .retryBackoff(numRetries, initialDelay);
     }
 
-    private Connection getOpenConnection(Connection checkedConnection) {
-        return Optional.ofNullable(checkedConnection)
-            .filter(Connection::isOpen)
-            .orElseGet(connectionFactory::create);
+    private Mono<Channel> getOpenChannel() {
+        Channel previous = channelReference.get();
+        return Mono.justOrEmpty(previous)
+            .publishOn(Schedulers.elastic())
+            .filter(Channel::isOpen)
+            .switchIfEmpty(connectionPool.getResilientConnection()
+                .flatMap(connection -> 
Mono.fromCallable(connection::createChannel)))
+            .flatMap(current -> replaceCurrentChannel(previous, current))
+            .onErrorMap(t -> new RuntimeException("unable to create and 
register a new Channel", t));
     }
 
-    private Channel getResilientChannel() {
-        return 
channelReference.updateAndGet(Throwing.unaryOperator(this::getOpenChannel));
+    private Mono<Channel> replaceCurrentChannel(Channel previous, Channel 
current) {
+        if (channelReference.compareAndSet(previous, current)) {
+            return Mono.just(current);
+        } else {
+            try {
+                current.close();
+            } catch (IOException | TimeoutException e) {
+                //error below
+            }
+            return Mono.error(new RuntimeException("unable to create and 
register a new Channel"));
+        }
     }
 
-    private Channel getOpenChannel(Channel checkedChannel) {
-        return Optional.ofNullable(checkedChannel)
-            .filter(Channel::isOpen)
-            .orElseGet(Throwing.supplier(() -> 
getResilientConnection().createChannel())
-                .sneakyThrow());
+    @Override
+    public boolean tryConnection() {
+        try {
+            return connectionPool.tryConnection() &&
+                getOpenChannel()
+                    .blockOptional()
+                    .isPresent();
+        } catch (Throwable t) {
+            return false;
+        }
     }
 }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
new file mode 100644
index 0000000..519c1fa
--- /dev/null
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backend.rabbitmq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
+import com.rabbitmq.client.Connection;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class SimpleConnectionPool implements AutoCloseable {
+    private final AtomicReference<Connection> connectionReference;
+    private final RabbitMQConnectionFactory connectionFactory;
+
+    @Inject
+    @VisibleForTesting
+    public SimpleConnectionPool(RabbitMQConnectionFactory factory) {
+        this.connectionFactory = factory;
+        this.connectionReference = new AtomicReference<>();
+    }
+
+    @PreDestroy
+    @Override
+    public void close() {
+        Optional.ofNullable(connectionReference.get())
+            .filter(Connection::isOpen)
+            
.ifPresent(Throwing.<Connection>consumer(Connection::close).orDoNothing());
+    }
+
+    public Mono<Connection> getResilientConnection() {
+        int numRetries = 100;
+        Duration initialDelay = Duration.ofMillis(100);
+        return Mono.defer(this::getOpenConnection)
+            .subscribeOn(Schedulers.elastic())
+            .retryBackoff(numRetries, initialDelay);
+    }
+
+    private Mono<Connection> getOpenConnection() {
+        Connection previous = connectionReference.get();
+        Connection current = Optional.ofNullable(previous)
+            .filter(Connection::isOpen)
+            .orElseGet(connectionFactory::create);
+        boolean updated = connectionReference.compareAndSet(previous, current);
+        if (updated) {
+            return Mono.just(current);
+        } else {
+            try {
+                current.close();
+            } catch (IOException e) {
+                //error below
+            }
+            return Mono.error(new RuntimeException("unable to create and 
register a new Connection"));
+        }
+    }
+
+    public boolean tryConnection() {
+        try {
+            return getOpenConnection()
+                .blockOptional(Duration.ofSeconds(1))
+                .isPresent();
+        } catch (Throwable t) {
+            return false;
+        }
+    }
+}
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
index 4e3f388..37083b4 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
@@ -21,6 +21,7 @@ package org.apache.james.backend.rabbitmq;
 import static 
org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEMENT_CREDENTIAL;
 
 import java.net.URISyntaxException;
+import java.util.Collection;
 
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
@@ -35,6 +36,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
     private DockerRabbitMQ rabbitMQ;
     private SimpleChannelPool simpleChannelPool;
     private RabbitMQConnectionFactory connectionFactory;
+    private SimpleConnectionPool connectionPool;
 
     @Override
     public void beforeAll(ExtensionContext context) {
@@ -45,12 +47,14 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
     @Override
     public void beforeEach(ExtensionContext extensionContext) throws Exception 
{
         connectionFactory = createRabbitConnectionFactory();
-        this.simpleChannelPool = new SimpleChannelPool(connectionFactory);
+        connectionPool = new SimpleConnectionPool(connectionFactory);
+        this.simpleChannelPool = new SimpleChannelPool(connectionPool);
     }
 
     @Override
-    public void afterEach(ExtensionContext context) throws Exception {
+    public void afterEach(ExtensionContext context) {
         simpleChannelPool.close();
+        connectionPool.close();
     }
 
     @Override
@@ -72,12 +76,12 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
         return simpleChannelPool;
     }
 
-    public DockerRabbitMQ getRabbitMQ() {
-        return rabbitMQ;
+    public SimpleConnectionPool getRabbitConnectionPool() {
+        return connectionPool;
     }
 
-    public RabbitMQConnectionFactory getConnectionFactory() {
-        return connectionFactory;
+    public DockerRabbitMQ getRabbitMQ() {
+        return rabbitMQ;
     }
 
     public RabbitMQManagementAPI managementAPI() throws Exception {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 0a9fd41..041ded3 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -24,7 +24,7 @@ import java.util.Set;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.SimpleConnectionPool;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.metrics.api.MetricFactory;
@@ -59,13 +59,13 @@ public class RabbitMQEventBus implements EventBus, Startable {
     private Sender sender;
 
     @Inject
-    public RabbitMQEventBus(RabbitMQConnectionFactory 
rabbitMQConnectionFactory, EventSerializer eventSerializer,
+    public RabbitMQEventBus(SimpleConnectionPool simpleConnectionPool, 
EventSerializer eventSerializer,
                      RetryBackoffConfiguration retryBackoff,
                      RoutingKeyConverter routingKeyConverter,
                      EventDeadLetters eventDeadLetters, MetricFactory 
metricFactory) {
         this.mailboxListenerExecutor = new 
MailboxListenerExecutor(metricFactory);
         this.eventBusId = EventBusId.random();
-        this.connectionMono = rabbitMQConnectionFactory.connectionMono();
+        this.connectionMono = simpleConnectionPool.getResilientConnection();
         this.eventSerializer = eventSerializer;
         this.routingKeyConverter = routingKeyConverter;
         this.retryBackoff = retryBackoff;
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 7d70ea5..83eccba 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -51,7 +51,6 @@ import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
 import org.apache.james.backend.rabbitmq.RabbitMQManagementAPI;
 import org.apache.james.event.json.EventSerializer;
@@ -71,7 +70,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.stubbing.Answer;
 
 import com.rabbitmq.client.Connection;
-
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
@@ -94,16 +92,14 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     private RabbitMQEventBus eventBus2;
     private RabbitMQEventBus eventBus3;
     private Sender sender;
-    private RabbitMQConnectionFactory connectionFactory;
     private EventSerializer eventSerializer;
     private RoutingKeyConverter routingKeyConverter;
     private MemoryEventDeadLetters memoryEventDeadLetters;
+    private Mono<Connection> resilientConnection;
 
     @BeforeEach
     void setUp() {
-        connectionFactory = rabbitMQExtension.getConnectionFactory();
         memoryEventDeadLetters = new MemoryEventDeadLetters();
-        Mono<Connection> connectionMono = 
Mono.fromSupplier(connectionFactory::create).cache();
 
         TestId.Factory mailboxIdFactory = new TestId.Factory();
         eventSerializer = new EventSerializer(mailboxIdFactory, new 
TestMessageId.Factory());
@@ -116,7 +112,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus.start();
         eventBus2.start();
         eventBus3.start();
-        sender = RabbitFlux.createSender(new 
SenderOptions().connectionMono(connectionMono));
+        resilientConnection = 
rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
+        sender = RabbitFlux.createSender(new 
SenderOptions().connectionMono(resilientConnection));
     }
 
     @AfterEach
@@ -132,7 +129,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     }
 
     private RabbitMQEventBus newEventBus() {
-        return new RabbitMQEventBus(connectionFactory, eventSerializer, 
RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, 
new NoopMetricFactory());
+        return new 
RabbitMQEventBus(rabbitMQExtension.getRabbitConnectionPool(), eventSerializer, 
RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, 
new NoopMetricFactory());
     }
 
     @Override
@@ -232,30 +229,31 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     @Nested
     class PublishingTest {
         private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + 
"-workQueue";
+        private Sender sender1;
 
         @BeforeEach
         void setUp() {
-            createQueue();
-        }
+            SenderOptions senderOption = new 
SenderOptions().connectionMono(resilientConnection);
+            sender1 = RabbitFlux.createSender(senderOption);
 
-        private void createQueue() {
-            SenderOptions senderOption = new SenderOptions()
-                .connectionMono(Mono.fromSupplier(connectionFactory::create));
-            Sender sender = RabbitFlux.createSender(senderOption);
-
-            
sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+            
sender1.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
                 .arguments(NO_ARGUMENTS))
                 .block();
-            sender.bind(BindingSpecification.binding()
+            sender1.bind(BindingSpecification.binding()
                 .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
                 .queue(MAILBOX_WORK_QUEUE_NAME)
                 .routingKey(EMPTY_ROUTING_KEY))
                 .block();
         }
 
+        @AfterEach
+        void tearDown() {
+            sender1.close();
+        }
+
         @Test
         void dispatchShouldPublishSerializedEventToRabbitMQ() {
             eventBus.dispatch(EVENT, NO_KEYS).block();
@@ -271,15 +269,14 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         }
 
         private Event dequeueEvent() {
-            RabbitMQConnectionFactory connectionFactory = 
rabbitMQExtension.getConnectionFactory();
-            Receiver receiver = RabbitFlux.createReceiver(new 
ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
-
-            byte[] eventInBytes = 
receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
-                .blockFirst()
-                .getBody();
+            try (Receiver receiver = RabbitFlux.createReceiver(new 
ReceiverOptions().connectionMono(resilientConnection))) {
+                byte[] eventInBytes = 
receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
+                    .blockFirst()
+                    .getBody();
 
-            return eventSerializer.fromJson(new String(eventInBytes, 
StandardCharsets.UTF_8))
-                .get();
+                return eventSerializer.fromJson(new String(eventInBytes, 
StandardCharsets.UTF_8))
+                    .get();
+            }
         }
     }
 
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index 14b8592..a74b8f1 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -24,7 +24,7 @@ import java.net.URISyntaxException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.backend.rabbitmq.DockerRabbitMQ;
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.SimpleConnectionPool;
 import org.apache.james.core.quota.QuotaCount;
 import org.apache.james.core.quota.QuotaSize;
 import org.apache.james.event.json.EventSerializer;
@@ -61,6 +61,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
     private final DockerRabbitMQ dockerRabbitMQ;
     private RabbitMQEventBus eventBus;
     private InMemoryIntegrationResources.Resources resources;
+    private SimpleConnectionPool connectionPool;
 
     RabbitMQEventBusHostSystem(DockerRabbitMQ dockerRabbitMQ) {
         this.dockerRabbitMQ = dockerRabbitMQ;
@@ -70,6 +71,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
     public void beforeTest() throws Exception {
         super.beforeTest();
 
+        connectionPool = new 
SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory());
         eventBus = createEventBus();
         eventBus.start();
 
@@ -95,14 +97,14 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
         InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
         EventSerializer eventSerializer = new 
EventSerializer(mailboxIdFactory, messageIdFactory);
         RoutingKeyConverter routingKeyConverter = new 
RoutingKeyConverter(ImmutableSet.of(new 
MailboxIdRegistrationKey.Factory(mailboxIdFactory)));
-        RabbitMQConnectionFactory rabbitConnectionFactory = 
dockerRabbitMQ.createRabbitConnectionFactory();
-        return new RabbitMQEventBus(rabbitConnectionFactory, eventSerializer, 
RetryBackoffConfiguration.DEFAULT,
+        return new RabbitMQEventBus(connectionPool, eventSerializer, 
RetryBackoffConfiguration.DEFAULT,
             routingKeyConverter, new MemoryEventDeadLetters(), new 
NoopMetricFactory());
     }
 
     @Override
     public void afterTest() {
         eventBus.stop();
+        connectionPool.close();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to