This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ca12220c101ac6cdce4377fe1de4c7d975e4c205 Author: Matthieu Baechler <[email protected]> AuthorDate: Tue Mar 12 11:11:24 2019 +0100 JAMES-2683 Plug the thread leak related to Connection and Channel management Make sure only SimpleConnectionPool can create a connection and always call the close method when done with the pool --- .../backend/rabbitmq/RabbitMQChannelPool.java | 2 + .../rabbitmq/RabbitMQConnectionFactory.java | 28 +------ .../backend/rabbitmq/RabbitMQHealthCheck.java | 19 ++--- .../james/backend/rabbitmq/SimpleChannelPool.java | 79 ++++++++++++------- .../backend/rabbitmq/SimpleConnectionPool.java | 90 ++++++++++++++++++++++ .../james/backend/rabbitmq/RabbitMQExtension.java | 16 ++-- .../james/mailbox/events/RabbitMQEventBus.java | 6 +- .../james/mailbox/events/RabbitMQEventBusTest.java | 45 +++++------ .../rabbitmq/host/RabbitMQEventBusHostSystem.java | 8 +- 9 files changed, 191 insertions(+), 102 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java index f26efbd..e8fef49 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java @@ -49,5 +49,7 @@ public interface RabbitMQChannelPool { Flux<AcknowledgableDelivery> receive(String queueName); + boolean tryConnection(); + void close() throws Exception; } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java index 151e983..1f0e329 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java @@ -19,37 +19,15 @@ package org.apache.james.backend.rabbitmq; import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.Callable; import javax.inject.Inject; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; - import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class RabbitMQConnectionFactory { - private class ConnectionCallable implements Callable<Connection> { - private final ConnectionFactory connectionFactory; - private Optional<Connection> connection; - - ConnectionCallable(ConnectionFactory connectionFactory) { - this.connectionFactory = connectionFactory; - connection = Optional.empty(); - } - - @Override - public synchronized Connection call() throws Exception { - if (connection.map(Connection::isOpen).orElse(false)) { - return connection.get(); - } - Connection newConnection = connectionFactory.newConnection(); - connection = Optional.of(newConnection); - return newConnection; - } - } private final ConnectionFactory connectionFactory; @@ -71,12 +49,12 @@ public class RabbitMQConnectionFactory { } } - public Connection create() { + Connection create() { return connectionMono().block(); } - public Mono<Connection> connectionMono() { - return Mono.fromCallable(new ConnectionCallable(connectionFactory)) + Mono<Connection> connectionMono() { + return Mono.fromCallable(connectionFactory::newConnection) .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs())) .publishOn(Schedulers.elastic()); } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java index 17a7ead..0290a47 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java @@ -19,10 +19,6 @@ package org.apache.james.backend.rabbitmq; -import java.net.URISyntaxException; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; - import javax.inject.Inject; import org.apache.james.core.healthcheck.ComponentName; @@ -38,7 +34,7 @@ public class RabbitMQHealthCheck implements HealthCheck { private final RabbitMQChannelPool rabbitChannelPoolImpl; @Inject - public RabbitMQHealthCheck(RabbitMQChannelPool rabbitChannelPoolImpl) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { + public RabbitMQHealthCheck(RabbitMQChannelPool rabbitChannelPoolImpl) { this.rabbitChannelPoolImpl = rabbitChannelPoolImpl; } @@ -50,13 +46,12 @@ public class RabbitMQHealthCheck implements HealthCheck { @Override public Result check() { try { - return rabbitChannelPoolImpl.execute(channel -> { - if (channel.isOpen()) { - return Result.healthy(COMPONENT_NAME); - } - LOGGER.error("The created connection was not opened"); - return Result.unhealthy(COMPONENT_NAME); - }); + if (rabbitChannelPoolImpl.tryConnection()) { + return Result.healthy(COMPONENT_NAME); + } else { + LOGGER.error("The created connection was not opened"); + return Result.unhealthy(COMPONENT_NAME); + } } catch (Exception e) { LOGGER.error("Unhealthy RabbitMQ instances: could not establish a connection", e); return Result.unhealthy(COMPONENT_NAME); diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java index b5819e3..2a7da85 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java @@ -19,7 +19,10 @@ package org.apache.james.backend.rabbitmq; +import java.io.IOException; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.PreDestroy; @@ -28,8 +31,9 @@ import javax.inject.Inject; import com.github.fge.lambdas.Throwing; import com.google.common.annotations.VisibleForTesting; import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.AcknowledgableDelivery; import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Receiver; @@ -37,18 +41,16 @@ import reactor.rabbitmq.ReceiverOptions; public class SimpleChannelPool implements RabbitMQChannelPool { private final AtomicReference<Channel> channelReference; - private final AtomicReference<Connection> connectionReference; - private final RabbitMQConnectionFactory connectionFactory; private final Receiver rabbitFlux; + private final SimpleConnectionPool connectionPool; @Inject @VisibleForTesting - SimpleChannelPool(RabbitMQConnectionFactory factory) { - this.connectionFactory = factory; - this.connectionReference = new AtomicReference<>(); + SimpleChannelPool(SimpleConnectionPool connectionPool) { + this.connectionPool = connectionPool; this.channelReference = new AtomicReference<>(); this.rabbitFlux = RabbitFlux - .createReceiver(new ReceiverOptions().connectionMono(connectionFactory.connectionMono())); + .createReceiver(new ReceiverOptions().connectionMono(connectionPool.getResilientConnection())); } @Override @@ -57,26 +59,22 @@ public class SimpleChannelPool implements RabbitMQChannelPool { } @Override - public synchronized <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { - return f.execute(getResilientChannel()); + public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { + return f.execute(getResilientChannel().block()); } @Override - public synchronized <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { - f.execute(getResilientChannel()); + public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { + f.execute(getResilientChannel().block()); } @PreDestroy @Override - public synchronized void close() { + public void close() { Optional.ofNullable(channelReference.get()) .filter(Channel::isOpen) .ifPresent(Throwing.<Channel>consumer(Channel::close).orDoNothing()); - Optional.ofNullable(connectionReference.get()) - .filter(Connection::isOpen) - .ifPresent(Throwing.<Connection>consumer(Connection::close).orDoNothing()); - try { rabbitFlux.close(); } catch (Throwable ignored) { @@ -84,24 +82,47 @@ public class SimpleChannelPool implements RabbitMQChannelPool { } } - private Connection getResilientConnection() { - return connectionReference.updateAndGet(this::getOpenConnection); + private Mono<Channel> getResilientChannel() { + int numRetries = 100; + Duration initialDelay = Duration.ofMillis(100); + return Mono.defer(this::getOpenChannel) + .publishOn(Schedulers.elastic()) + .retryBackoff(numRetries, initialDelay); } - private Connection getOpenConnection(Connection checkedConnection) { - return Optional.ofNullable(checkedConnection) - .filter(Connection::isOpen) - .orElseGet(connectionFactory::create); + private Mono<Channel> getOpenChannel() { + Channel previous = channelReference.get(); + return Mono.justOrEmpty(previous) + .publishOn(Schedulers.elastic()) + .filter(Channel::isOpen) + .switchIfEmpty(connectionPool.getResilientConnection() + .flatMap(connection -> Mono.fromCallable(connection::createChannel))) + .flatMap(current -> replaceCurrentChannel(previous, current)) + .onErrorMap(t -> new RuntimeException("unable to create and register a new Channel", t)); } - private Channel getResilientChannel() { - return channelReference.updateAndGet(Throwing.unaryOperator(this::getOpenChannel)); + private Mono<Channel> replaceCurrentChannel(Channel previous, Channel current) { + if (channelReference.compareAndSet(previous, current)) { + return Mono.just(current); + } else { + try { + current.close(); + } catch (IOException | TimeoutException e) { + //error below + } + return Mono.error(new RuntimeException("unable to create and register a new Channel")); + } } - private Channel getOpenChannel(Channel checkedChannel) { - return Optional.ofNullable(checkedChannel) - .filter(Channel::isOpen) - .orElseGet(Throwing.supplier(() -> getResilientConnection().createChannel()) - .sneakyThrow()); + @Override + public boolean tryConnection() { + try { + return connectionPool.tryConnection() && + getOpenChannel() + .blockOptional() + .isPresent(); + } catch (Throwable t) { + return false; + } } } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java new file mode 100644 index 0000000..519c1fa --- /dev/null +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java @@ -0,0 +1,90 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backend.rabbitmq; + +import java.io.IOException; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.PreDestroy; +import javax.inject.Inject; + +import com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; +import com.rabbitmq.client.Connection; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class SimpleConnectionPool implements AutoCloseable { + private final AtomicReference<Connection> connectionReference; + private final RabbitMQConnectionFactory connectionFactory; + + @Inject + @VisibleForTesting + public SimpleConnectionPool(RabbitMQConnectionFactory factory) { + this.connectionFactory = factory; + this.connectionReference = new AtomicReference<>(); + } + + @PreDestroy + @Override + public void close() { + Optional.ofNullable(connectionReference.get()) + .filter(Connection::isOpen) + .ifPresent(Throwing.<Connection>consumer(Connection::close).orDoNothing()); + } + + public Mono<Connection> getResilientConnection() { + int numRetries = 100; + Duration initialDelay = Duration.ofMillis(100); + return Mono.defer(this::getOpenConnection) + .subscribeOn(Schedulers.elastic()) + .retryBackoff(numRetries, initialDelay); + } + + private Mono<Connection> getOpenConnection() { + Connection previous = connectionReference.get(); + Connection current = Optional.ofNullable(previous) + .filter(Connection::isOpen) + .orElseGet(connectionFactory::create); + boolean updated = connectionReference.compareAndSet(previous, current); + if (updated) { + return Mono.just(current); + } else { + try { + current.close(); + } catch (IOException e) { + //error below + } + return Mono.error(new RuntimeException("unable to create and register a new Connection")); + } + } + + public boolean tryConnection() { + try { + return getOpenConnection() + .blockOptional(Duration.ofSeconds(1)) + .isPresent(); + } catch (Throwable t) { + return false; + } + } +} diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java index 4e3f388..37083b4 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java @@ -21,6 +21,7 @@ package org.apache.james.backend.rabbitmq; import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEMENT_CREDENTIAL; import java.net.URISyntaxException; +import java.util.Collection; import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.AfterEachCallback; @@ -35,6 +36,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, private DockerRabbitMQ rabbitMQ; private SimpleChannelPool simpleChannelPool; private RabbitMQConnectionFactory connectionFactory; + private SimpleConnectionPool connectionPool; @Override public void beforeAll(ExtensionContext context) { @@ -45,12 +47,14 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { connectionFactory = createRabbitConnectionFactory(); - this.simpleChannelPool = new SimpleChannelPool(connectionFactory); + connectionPool = new SimpleConnectionPool(connectionFactory); + this.simpleChannelPool = new SimpleChannelPool(connectionPool); } @Override - public void afterEach(ExtensionContext context) throws Exception { + public void afterEach(ExtensionContext context) { simpleChannelPool.close(); + connectionPool.close(); } @Override @@ -72,12 +76,12 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, return simpleChannelPool; } - public DockerRabbitMQ getRabbitMQ() { - return rabbitMQ; + public SimpleConnectionPool getRabbitConnectionPool() { + return connectionPool; } - public RabbitMQConnectionFactory getConnectionFactory() { - return connectionFactory; + public DockerRabbitMQ getRabbitMQ() { + return rabbitMQ; } public RabbitMQManagementAPI managementAPI() throws Exception { diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index 0a9fd41..041ded3 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -24,7 +24,7 @@ import java.util.Set; import javax.annotation.PreDestroy; import javax.inject.Inject; -import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; +import org.apache.james.backend.rabbitmq.SimpleConnectionPool; import org.apache.james.event.json.EventSerializer; import org.apache.james.lifecycle.api.Startable; import org.apache.james.metrics.api.MetricFactory; @@ -59,13 +59,13 @@ public class RabbitMQEventBus implements EventBus, Startable { private Sender sender; @Inject - public RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer, + public RabbitMQEventBus(SimpleConnectionPool simpleConnectionPool, EventSerializer eventSerializer, RetryBackoffConfiguration retryBackoff, RoutingKeyConverter routingKeyConverter, EventDeadLetters eventDeadLetters, MetricFactory metricFactory) { this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory); this.eventBusId = EventBusId.random(); - this.connectionMono = rabbitMQConnectionFactory.connectionMono(); + this.connectionMono = simpleConnectionPool.getResilientConnection(); this.eventSerializer = eventSerializer; this.routingKeyConverter = routingKeyConverter; this.retryBackoff = retryBackoff; diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index 7d70ea5..83eccba 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -51,7 +51,6 @@ import java.io.Closeable; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; -import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; import org.apache.james.backend.rabbitmq.RabbitMQExtension; import org.apache.james.backend.rabbitmq.RabbitMQManagementAPI; import org.apache.james.event.json.EventSerializer; @@ -71,7 +70,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.stubbing.Answer; import com.rabbitmq.client.Connection; - import reactor.core.publisher.Mono; import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; @@ -94,16 +92,14 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, private RabbitMQEventBus eventBus2; private RabbitMQEventBus eventBus3; private Sender sender; - private RabbitMQConnectionFactory connectionFactory; private EventSerializer eventSerializer; private RoutingKeyConverter routingKeyConverter; private MemoryEventDeadLetters memoryEventDeadLetters; + private Mono<Connection> resilientConnection; @BeforeEach void setUp() { - connectionFactory = rabbitMQExtension.getConnectionFactory(); memoryEventDeadLetters = new MemoryEventDeadLetters(); - Mono<Connection> connectionMono = Mono.fromSupplier(connectionFactory::create).cache(); TestId.Factory mailboxIdFactory = new TestId.Factory(); eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory()); @@ -116,7 +112,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus.start(); eventBus2.start(); eventBus3.start(); - sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)); + resilientConnection = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(); + sender = RabbitFlux.createSender(new SenderOptions().connectionMono(resilientConnection)); } @AfterEach @@ -132,7 +129,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, } private RabbitMQEventBus newEventBus() { - return new RabbitMQEventBus(connectionFactory, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory()); + return new RabbitMQEventBus(rabbitMQExtension.getRabbitConnectionPool(), eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory()); } @Override @@ -232,30 +229,31 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, @Nested class PublishingTest { private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue"; + private Sender sender1; @BeforeEach void setUp() { - createQueue(); - } + SenderOptions senderOption = new SenderOptions().connectionMono(resilientConnection); + sender1 = RabbitFlux.createSender(senderOption); - private void createQueue() { - SenderOptions senderOption = new SenderOptions() - .connectionMono(Mono.fromSupplier(connectionFactory::create)); - Sender sender = RabbitFlux.createSender(senderOption); - - sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME) + sender1.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME) .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) .arguments(NO_ARGUMENTS)) .block(); - sender.bind(BindingSpecification.binding() + sender1.bind(BindingSpecification.binding() .exchange(MAILBOX_EVENT_EXCHANGE_NAME) .queue(MAILBOX_WORK_QUEUE_NAME) .routingKey(EMPTY_ROUTING_KEY)) .block(); } + @AfterEach + void tearDown() { + sender1.close(); + } + @Test void dispatchShouldPublishSerializedEventToRabbitMQ() { eventBus.dispatch(EVENT, NO_KEYS).block(); @@ -271,15 +269,14 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, } private Event dequeueEvent() { - RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory(); - Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create()))); - - byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME) - .blockFirst() - .getBody(); + try (Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(resilientConnection))) { + byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME) + .blockFirst() + .getBody(); - return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8)) - .get(); + return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8)) + .get(); + } } } diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java index 14b8592..a74b8f1 100644 --- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java +++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java @@ -24,7 +24,7 @@ import java.net.URISyntaxException; import java.util.concurrent.TimeUnit; import org.apache.james.backend.rabbitmq.DockerRabbitMQ; -import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; +import org.apache.james.backend.rabbitmq.SimpleConnectionPool; import org.apache.james.core.quota.QuotaCount; import org.apache.james.core.quota.QuotaSize; import org.apache.james.event.json.EventSerializer; @@ -61,6 +61,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem { private final DockerRabbitMQ dockerRabbitMQ; private RabbitMQEventBus eventBus; private InMemoryIntegrationResources.Resources resources; + private SimpleConnectionPool connectionPool; RabbitMQEventBusHostSystem(DockerRabbitMQ dockerRabbitMQ) { this.dockerRabbitMQ = dockerRabbitMQ; @@ -70,6 +71,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem { public void beforeTest() throws Exception { super.beforeTest(); + connectionPool = new SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory()); eventBus = createEventBus(); eventBus.start(); @@ -95,14 +97,14 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem { InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory(); EventSerializer eventSerializer = new EventSerializer(mailboxIdFactory, messageIdFactory); RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory))); - RabbitMQConnectionFactory rabbitConnectionFactory = dockerRabbitMQ.createRabbitConnectionFactory(); - return new RabbitMQEventBus(rabbitConnectionFactory, eventSerializer, RetryBackoffConfiguration.DEFAULT, + return new RabbitMQEventBus(connectionPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, new MemoryEventDeadLetters(), new NoopMetricFactory()); } @Override public void afterTest() { eventBus.stop(); + connectionPool.close(); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
