This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 70fd3f15fccfc427b7722b764ebf3918eeba9e64 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Mon Oct 14 16:49:57 2019 +0200 JAMES-2813 plug Schedulers leaks --- .../distributed/RabbitMQTerminationSubscriber.java | 10 +++++++--- .../task/eventsourcing/distributed/RabbitMQWorkQueue.java | 13 ++++++++++--- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index 0d97c2b..d2bdeeb 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -71,6 +71,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta private DirectProcessor<Event> listener; private Disposable sendQueueHandle; private Disposable listenQueueHandle; + private Receiver listenerReceiver; + private Sender sender; @Inject public RabbitMQTerminationSubscriber(SimpleConnectionPool simpleConnectionPool, JsonEventSerializer serializer) { @@ -81,7 +83,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta } public void start() { - Sender sender = channelPool.createSender(); + sender = channelPool.createSender(); sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block(); @@ -92,9 +94,9 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta .subscribeOn(Schedulers.boundedElastic()) .subscribe(); - Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono)); + listenerReceiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono)); listener = DirectProcessor.create(); - listenQueueHandle = receiver + listenQueueHandle = listenerReceiver .consumeAutoAck(queueName) .subscribeOn(Schedulers.boundedElastic()) .concatMap(this::toEvent) @@ -135,6 +137,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta public void close() { Optional.ofNullable(sendQueueHandle).ifPresent(Disposable::dispose); Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose); + Optional.ofNullable(listenerReceiver).ifPresent(Receiver::close); + Optional.ofNullable(sender).ifPresent(Sender::close); channelPool.close(); } } diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index 5516112..e736798 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -52,6 +52,7 @@ import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.RabbitFlux; +import reactor.rabbitmq.Receiver; import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; @@ -79,6 +80,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { private Disposable sendCancelRequestsQueueHandle; private Disposable receiverHandle; private Disposable cancelRequestListenerHandle; + private Sender cancelRequestSender; + private Receiver cancelRequestListener; public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) { this.worker = worker; @@ -140,7 +143,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { } void listenToCancelRequests() { - Sender cancelRequestSender = channelPool.createSender(); + cancelRequestSender = channelPool.createSender(); String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString(); cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block(); @@ -156,8 +159,9 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { } private void registerCancelRequestsListener(String queueName) { - cancelRequestListenerHandle = RabbitFlux - .createReceiver(new ReceiverOptions().connectionMono(connectionMono)) + cancelRequestListener = RabbitFlux + .createReceiver(new ReceiverOptions().connectionMono(connectionMono)); + cancelRequestListenerHandle = cancelRequestListener .consumeAutoAck(queueName) .subscribeOn(Schedulers.boundedElastic()) .map(this::readCancelRequestMessage) @@ -201,6 +205,9 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close); Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose); Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose); + Optional.ofNullable(sender).ifPresent(Sender::close); + Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close); + Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close); channelPool.close(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org