This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c7c3119ef35e607f217e68634e7a9794188833d6
Author: Gautier DI FOLCO <gdifo...@linagora.com>
AuthorDate: Thu Sep 26 17:01:30 2019 +0200

    JAMES-2899 Stop leaking RabbitMQWorkQueue in DistributedTaskManagerTest
---
 .../distributed/RabbitMQWorkQueue.java             |  1 -
 .../distributed/RabbitMQWorkQueueSupplier.scala    |  4 +--
 .../distributed/DistributedTaskManagerTest.java    | 41 ++++++++++++++++++++--
 3 files changed, 40 insertions(+), 6 deletions(-)

diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index b88eaa8..5878091 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -189,7 +189,6 @@ public class RabbitMQWorkQueue implements WorkQueue, 
Startable {
     @Override
     @PreDestroy
     public void close() {
-        System.out.println("close");
         Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose);
         
Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
         
Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
diff --git 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index 5e26147..f9977a4 100644
--- 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -23,13 +23,13 @@ import javax.inject.Inject
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool
 import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.server.task.json.JsonTaskSerializer
+import org.apache.james.task.SerialTaskManagerWorker
 import org.apache.james.task.eventsourcing.{WorkQueueSupplier, 
WorkerStatusListener}
-import org.apache.james.task.{SerialTaskManagerWorker, WorkQueue}
 
 @Inject
 class RabbitMQWorkQueueSupplier(private val rabbitMQConnectionPool: 
SimpleConnectionPool,
                                 private val jsonTaskSerializer: 
JsonTaskSerializer) extends WorkQueueSupplier {
-  override def apply(eventSourcingSystem: EventSourcingSystem): WorkQueue = {
+  override def apply(eventSourcingSystem: EventSourcingSystem): 
RabbitMQWorkQueue = {
     val listener = WorkerStatusListener(eventSourcingSystem)
     val worker = new SerialTaskManagerWorker(listener)
     val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, 
rabbitMQConnectionPool, jsonTaskSerializer)
diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 2554d4f..1010524 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.james.task.eventsourcing.distributed;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -33,6 +34,8 @@ import 
org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
 import 
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.eventsourcing.EventSourcingSystem;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import 
org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
 import 
org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
@@ -50,6 +53,7 @@ import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerContract;
+import org.apache.james.task.WorkQueue;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.WorkQueueSupplier;
@@ -58,6 +62,7 @@ import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetai
 import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -68,6 +73,28 @@ import com.github.steveash.guavate.Guavate;
 @ExtendWith(CountDownLatchExtension.class)
 class DistributedTaskManagerTest implements TaskManagerContract {
 
+    private static class TrackedRabbitMQWorkQueueSupplier implements 
WorkQueueSupplier {
+        private final List<RabbitMQWorkQueue> workQueues;
+        private final RabbitMQWorkQueueSupplier supplier;
+
+        TrackedRabbitMQWorkQueueSupplier(SimpleConnectionPool 
rabbitConnectionPool, JsonTaskSerializer taskSerializer) {
+            workQueues = new ArrayList<>();
+            supplier = new RabbitMQWorkQueueSupplier(rabbitConnectionPool, 
taskSerializer);
+        }
+
+        @Override
+        public WorkQueue apply(EventSourcingSystem eventSourcingSystem) {
+            RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem);
+            workQueues.add(workQueue);
+            return workQueue;
+        }
+
+        void stopWorkQueues() {
+            workQueues.forEach(RabbitMQWorkQueue::close);
+            workQueues.clear();
+        }
+    }
+
     private static final JsonTaskSerializer TASK_SERIALIZER = new 
JsonTaskSerializer(
         TestTaskDTOModules.COMPLETED_TASK_MODULE,
         TestTaskDTOModules.FAILED_TASK_MODULE,
@@ -96,13 +123,21 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
     private final CassandraTaskExecutionDetailsProjectionDAO 
cassandraTaskExecutionDetailsProjectionDAO = new 
CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), 
cassandra.getTypesProvider());
     private final TaskExecutionDetailsProjection executionDetailsProjection = 
new 
CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
 
-    private WorkQueueSupplier workQueueSupplier;
+    private TrackedRabbitMQWorkQueueSupplier workQueueSupplier;
     private EventStore eventStore;
+    private List<RabbitMQTerminationSubscriber> terminationSubscribers;
 
     @BeforeEach
     void setUp(EventStore eventStore) {
-        workQueueSupplier = new 
RabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(), 
TASK_SERIALIZER);
+        workQueueSupplier = new 
TrackedRabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(), 
TASK_SERIALIZER);
         this.eventStore = eventStore;
+        terminationSubscribers = new ArrayList<>();
+    }
+
+    @AfterEach
+    void tearDown() {
+        terminationSubscribers.forEach(RabbitMQTerminationSubscriber::close);
+        workQueueSupplier.stopWorkQueues();
     }
 
     public EventSourcingTaskManager taskManager() {
@@ -111,6 +146,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
 
     private EventSourcingTaskManager taskManager(Hostname hostname) {
         RabbitMQTerminationSubscriber terminationSubscriber = new 
RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitConnectionPool(), 
EVENT_SERIALIZER);
+        terminationSubscribers.add(terminationSubscriber);
         terminationSubscriber.start();
         return new EventSourcingTaskManager(workQueueSupplier, eventStore, 
executionDetailsProjection, hostname, terminationSubscriber);
     }
@@ -254,5 +290,4 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
                 });
         }
     }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to