This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4ba2d6c81ef09451dc6253f6d0852b67d10c4376 Author: Gautier DI FOLCO <gdifo...@linagora.com> AuthorDate: Thu Sep 26 17:00:34 2019 +0200 JAMES-2899 Stop leaking receivers in RabbitMQWorkQueue --- .../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index f0869c3..b88eaa8 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -81,6 +81,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { private RabbitMQExclusiveConsumer receiver; private UnicastProcessor<TaskId> sendCancelRequestsQueue; private Disposable sendCancelRequestsQueueHandle; + private Disposable receiverHandle; + private Disposable cancelRequestListenerHandle; public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) { this.worker = worker; @@ -105,7 +107,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { private void consumeWorkqueue() { receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono)); - receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions()) + receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions()) .subscribeOn(Schedulers.boundedElastic()) .flatMap(this::executeTask) .subscribe(); @@ -145,7 +147,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { } private void registerCancelRequestsListener(String queueName) { - RabbitFlux + cancelRequestListenerHandle = RabbitFlux .createReceiver(new ReceiverOptions().connectionMono(connectionMono)) .consumeAutoAck(queueName) .subscribeOn(Schedulers.boundedElastic()) @@ -187,7 +189,10 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { @Override @PreDestroy public void close() { + System.out.println("close"); + Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose); Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose); + Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose); channelPool.close(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org