This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c7c3119ef35e607f217e68634e7a9794188833d6 Author: Gautier DI FOLCO <gdifo...@linagora.com> AuthorDate: Thu Sep 26 17:01:30 2019 +0200 JAMES-2899 Stop leaking RabbitMQWorkQueue in DistributedTaskManagerTest --- .../distributed/RabbitMQWorkQueue.java | 1 - .../distributed/RabbitMQWorkQueueSupplier.scala | 4 +-- .../distributed/DistributedTaskManagerTest.java | 41 ++++++++++++++++++++-- 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index b88eaa8..5878091 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -189,7 +189,6 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { @Override @PreDestroy public void close() { - System.out.println("close"); Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose); Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose); Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose); diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala index 5e26147..f9977a4 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala @@ -23,13 +23,13 @@ import javax.inject.Inject import org.apache.james.backends.rabbitmq.SimpleConnectionPool import org.apache.james.eventsourcing.EventSourcingSystem import org.apache.james.server.task.json.JsonTaskSerializer +import org.apache.james.task.SerialTaskManagerWorker import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListener} -import org.apache.james.task.{SerialTaskManagerWorker, WorkQueue} @Inject class RabbitMQWorkQueueSupplier(private val rabbitMQConnectionPool: SimpleConnectionPool, private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier { - override def apply(eventSourcingSystem: EventSourcingSystem): WorkQueue = { + override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = { val listener = WorkerStatusListener(eventSourcingSystem) val worker = new SerialTaskManagerWorker(listener) val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer) diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index 2554d4f..1010524 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -21,6 +21,7 @@ package org.apache.james.task.eventsourcing.distributed; import static org.assertj.core.api.Assertions.assertThat; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -33,6 +34,8 @@ import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.backends.rabbitmq.RabbitMQExtension; +import org.apache.james.backends.rabbitmq.SimpleConnectionPool; +import org.apache.james.eventsourcing.EventSourcingSystem; import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule; @@ -50,6 +53,7 @@ import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerContract; +import org.apache.james.task.WorkQueue; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; import org.apache.james.task.eventsourcing.WorkQueueSupplier; @@ -58,6 +62,7 @@ import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetai import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule; import org.awaitility.Awaitility; import org.awaitility.Duration; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -68,6 +73,28 @@ import com.github.steveash.guavate.Guavate; @ExtendWith(CountDownLatchExtension.class) class DistributedTaskManagerTest implements TaskManagerContract { + private static class TrackedRabbitMQWorkQueueSupplier implements WorkQueueSupplier { + private final List<RabbitMQWorkQueue> workQueues; + private final RabbitMQWorkQueueSupplier supplier; + + TrackedRabbitMQWorkQueueSupplier(SimpleConnectionPool rabbitConnectionPool, JsonTaskSerializer taskSerializer) { + workQueues = new ArrayList<>(); + supplier = new RabbitMQWorkQueueSupplier(rabbitConnectionPool, taskSerializer); + } + + @Override + public WorkQueue apply(EventSourcingSystem eventSourcingSystem) { + RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem); + workQueues.add(workQueue); + return workQueue; + } + + void stopWorkQueues() { + workQueues.forEach(RabbitMQWorkQueue::close); + workQueues.clear(); + } + } + private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer( TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.FAILED_TASK_MODULE, @@ -96,13 +123,21 @@ class DistributedTaskManagerTest implements TaskManagerContract { private final CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider()); private final TaskExecutionDetailsProjection executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO); - private WorkQueueSupplier workQueueSupplier; + private TrackedRabbitMQWorkQueueSupplier workQueueSupplier; private EventStore eventStore; + private List<RabbitMQTerminationSubscriber> terminationSubscribers; @BeforeEach void setUp(EventStore eventStore) { - workQueueSupplier = new RabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(), TASK_SERIALIZER); + workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(), TASK_SERIALIZER); this.eventStore = eventStore; + terminationSubscribers = new ArrayList<>(); + } + + @AfterEach + void tearDown() { + terminationSubscribers.forEach(RabbitMQTerminationSubscriber::close); + workQueueSupplier.stopWorkQueues(); } public EventSourcingTaskManager taskManager() { @@ -111,6 +146,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { private EventSourcingTaskManager taskManager(Hostname hostname) { RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitConnectionPool(), EVENT_SERIALIZER); + terminationSubscribers.add(terminationSubscriber); terminationSubscriber.start(); return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, terminationSubscriber); } @@ -254,5 +290,4 @@ class DistributedTaskManagerTest implements TaskManagerContract { }); } } - } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org