This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch 3.7.x in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8a70627ac20f60802764b3ed323201b1aad075a2 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Mar 4 21:16:06 2022 +0700 JAMES-3720 RabbitMQMailQueue::requeue needs to dispose emails Failure to do so will lead to a temporary file leak... (cherry picked from commit 02aee6e965eef8ab192b9b3cb4c6275989b24be6) --- .../main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java | 9 ++++++++- .../java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java | 2 ++ .../view/cassandra/model/EnqueuedItemWithSlicingContext.java | 8 +++++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java index 4bdc4a9..daa424a 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java @@ -23,11 +23,13 @@ import java.time.Instant; import java.util.Objects; import org.apache.james.blob.mail.MimeMessagePartsId; +import org.apache.james.lifecycle.api.Disposable; +import org.apache.james.lifecycle.api.LifecycleUtil; import org.apache.mailet.Mail; import com.google.common.base.Preconditions; -public class EnqueuedItem { +public class EnqueuedItem implements Disposable { interface Builder { @@ -122,6 +124,11 @@ public class EnqueuedItem { } @Override + public void dispose() { + LifecycleUtil.dispose(mail); + } + + @Override public final boolean equals(Object o) { if (o instanceof EnqueuedItem) { EnqueuedItem that = (EnqueuedItem) o; diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index 85f651d..602e679 100644 --- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -40,6 +40,7 @@ import com.google.common.base.MoreObjects; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class RabbitMQMailQueue implements ManageableMailQueue { @@ -138,6 +139,7 @@ public class RabbitMQMailQueue implements ManageableMailQueue { public Flux<String> republishNotProcessedMails(Instant olderThan) { Function<CassandraMailQueueBrowser.CassandraMailQueueItemView, Mono<String>> requeue = item -> enqueuer.reQueue(item) + .then(Mono.fromRunnable(item::dispose).subscribeOn(Schedulers.elastic())) .thenReturn(item.getMail().getName()); return mailQueueView.browseOlderThanReactive(olderThan) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java index 45ff74c..b154b87 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java @@ -22,11 +22,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra.model; import java.time.Instant; import java.util.Objects; +import org.apache.james.lifecycle.api.Disposable; import org.apache.james.queue.rabbitmq.EnqueuedItem; import com.google.common.base.Preconditions; -public class EnqueuedItemWithSlicingContext { +public class EnqueuedItemWithSlicingContext implements Disposable { public static class SlicingContext { @@ -117,6 +118,11 @@ public class EnqueuedItemWithSlicingContext { } @Override + public void 
dispose() { + enqueuedItem.dispose(); + } + + @Override public final boolean equals(Object o) { if (o instanceof EnqueuedItemWithSlicingContext) { EnqueuedItemWithSlicingContext that = (EnqueuedItemWithSlicingContext) o; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
