Repository: james-project Updated Branches: refs/heads/master d87517fb8 -> 790a203ce
JAMES-2551 Closing connections of RabbitMQChannelPool when shutting down this pool Before that, only channels were closed, this makes much error logs in tests, because connections are still not closed after each test. Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e410971d Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e410971d Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e410971d Branch: refs/heads/master Commit: e410971d61e78cb9acfd42cf016fcf75bc8baeec Parents: d87517f Author: duc <[email protected]> Authored: Mon Oct 8 16:50:41 2018 +0700 Committer: tran tien duc <[email protected]> Committed: Thu Oct 25 10:10:42 2018 +0700 ---------------------------------------------------------------------- .../backend/rabbitmq/RabbitChannelPoolImpl.java | 19 +++++++++----- .../rabbitmq/RabbitMQHealthCheckTest.java | 27 +++++--------------- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 14 +++------- 3 files changed, 23 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/e410971d/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java index fcfdcba..ed58f6c 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java @@ -19,6 +19,7 @@ package org.apache.james.backend.rabbitmq; +import java.io.IOException; import java.util.function.Supplier; import javax.annotation.PreDestroy; @@ -38,16 +39,16 @@ import com.rabbitmq.client.Connection; public class RabbitChannelPoolImpl implements RabbitMQChannelPool { private static class ChannelBasePooledObjectFactory extends BasePooledObjectFactory<Channel> { - private final Supplier<Connection> connection; + private final Supplier<Connection> rabbitConnection; public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory factory) { - this.connection = MemoizedSupplier.of( - Throwing.supplier(() -> factory.create()).sneakyThrow()); + this.rabbitConnection = MemoizedSupplier.of( + Throwing.supplier(factory::create).sneakyThrow()); } @Override public Channel create() throws Exception { - return connection.get() + return rabbitConnection.get() .createChannel(); } @@ -64,11 +65,12 @@ public class RabbitChannelPoolImpl implements RabbitMQChannelPool { } private final ObjectPool<Channel> pool; + private final ChannelBasePooledObjectFactory pooledObjectFactory; @Inject public RabbitChannelPoolImpl(RabbitMQConnectionFactory factory) { - pool = new GenericObjectPool<>( - new ChannelBasePooledObjectFactory(factory)); + pooledObjectFactory = new ChannelBasePooledObjectFactory(factory); + pool = new GenericObjectPool<>(pooledObjectFactory); } @Override @@ -92,8 +94,11 @@ public class RabbitChannelPoolImpl implements RabbitMQChannelPool { } @PreDestroy - public void close() { + public void close() throws IOException { pool.close(); + pooledObjectFactory.rabbitConnection + .get() + .close(); } private Channel borrowChannel() { http://git-wip-us.apache.org/repos/asf/james-project/blob/e410971d/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java index b88ef8f..ff85217 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java @@ -19,36 +19,23 @@ package org.apache.james.backend.rabbitmq; -import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEMENT_CREDENTIAL; import static org.assertj.core.api.Assertions.assertThat; -import java.util.concurrent.Executors; - import org.apache.james.core.healthcheck.Result; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; +import org.junit.jupiter.api.extension.RegisterExtension; -@ExtendWith(RabbitMQExtension.class) class RabbitMQHealthCheckTest { - private RabbitMQHealthCheck healthCheck; - @BeforeEach - void setUp(DockerRabbitMQ rabbitMQ) throws Exception { + @RegisterExtension + static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension(); - RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() - .amqpUri(rabbitMQ.amqpUri()) - .managementUri(rabbitMQ.managementUri()) - .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL) - .build(); - - RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, - new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); + private RabbitMQHealthCheck healthCheck; - healthCheck = new RabbitMQHealthCheck( - new RabbitChannelPoolImpl(rabbitMQConnectionFactory)); + @BeforeEach + void setUp() throws Exception { + healthCheck = new RabbitMQHealthCheck(rabbitMQExtension.getRabbitChannelPool()); } @Test http://git-wip-us.apache.org/repos/asf/james-project/blob/e410971d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index bb06e36..909e787 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -26,7 +26,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; import java.time.Instant; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.stream.IntStream; @@ -36,9 +35,7 @@ import javax.mail.internet.MimeMessage; import org.apache.james.backend.rabbitmq.DockerRabbitMQ; import org.apache.james.backend.rabbitmq.RabbitMQConfiguration; -import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; import org.apache.james.backend.rabbitmq.RabbitMQExtension; -import org.apache.james.backend.rabbitmq.SimpleChannelPool; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraModule; @@ -63,13 +60,10 @@ import org.apache.james.util.streams.Iterators; import org.apache.mailet.Mail; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import com.github.fge.lambdas.Throwing; -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; -@ExtendWith(RabbitMQExtension.class) public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQueueMetricContract { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); private static final int THREE_BUCKET_COUNT = 3; @@ -88,6 +82,9 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ CassandraMailQueueViewModule.MODULE, CassandraEventStoreModule.MODULE)); + @RegisterExtension + static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension(); + private RabbitMQMailQueueFactory mailQueueFactory; private UpdatableTickingClock clock; private RabbitMQMailQueue mailQueue; @@ -124,10 +121,7 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL) .build(); - RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, - new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); - - RabbitClient rabbitClient = new RabbitClient(new SimpleChannelPool(rabbitMQConnectionFactory)); + RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( metricTestSystem.getSpyMetricFactory(), metricTestSystem.getSpyGaugeRegistry(), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
