This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 98f9ed49ac5c2d0b97af592ca1d914e52ce108ac
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Fri Jun 14 11:06:28 2019 +0700

    JAMES-2794 RabbitMQ should not dequeue deleted elements
---
 .../java/org/apache/james/queue/rabbitmq/Dequeuer.java | 17 +++++++++++++++--
 .../james/queue/rabbitmq/RabbitMQMailQueueTest.java | 8 --------
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index d053096..a2e7a7e 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -34,6 +34,7 @@ import org.apache.mailet.Mail;
 
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
 import com.rabbitmq.client.Delivery;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.AcknowledgableDelivery;
@@ -80,8 +81,20 @@ class Dequeuer {
             .filter(getResponse -> getResponse.getBody() != null);
     }
 
-    Flux<MailQueue.MailQueueItem> deQueue() {
-        return flux.flatMap(this::loadItem);
+    Flux<? extends MailQueue.MailQueueItem> deQueue() {
+        return flux.flatMap(this::loadItem)
+            .flatMap(this::filterIfDeleted);
+    }
+
+    private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) {
+        return mailQueueView.isPresent(item.getMail())
+            .flatMap(isPresent -> {
+                if (isPresent) {
+                    return Mono.just(item);
+                }
+                item.done(true);
+                return Mono.empty();
+            });
     }
 
     private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 65a6f20..f6d57c6 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -231,14 +231,6 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ
     }
 
 
-    @Disabled("JAMES-2733 Deleted elements are still dequeued")
-    @Test
-    @Override
-    public void deletedElementsShouldNotBeDequeued() {
-
-    }
-
-
     private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) {
         IntStream.rangeClosed(1, emailCount)
             .forEach(Throwing.intConsumer(i -> enQueue(defaultMail()

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org