This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 98f9ed49ac5c2d0b97af592ca1d914e52ce108ac
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Fri Jun 14 11:06:28 2019 +0700

    JAMES-2794 RabbitMQ should not dequeue deleted elements
---
 .../java/org/apache/james/queue/rabbitmq/Dequeuer.java  | 17 +++++++++++++++--
 .../james/queue/rabbitmq/RabbitMQMailQueueTest.java     |  8 --------
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index d053096..a2e7a7e 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -34,6 +34,7 @@ import org.apache.mailet.Mail;
 
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
 import com.rabbitmq.client.Delivery;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.AcknowledgableDelivery;
@@ -80,8 +81,20 @@ class Dequeuer {
             .filter(getResponse -> getResponse.getBody() != null);
     }
 
-    Flux<MailQueue.MailQueueItem> deQueue() {
-        return flux.flatMap(this::loadItem);
+    Flux<? extends MailQueue.MailQueueItem> deQueue() {
+        return flux.flatMap(this::loadItem)
+            .flatMap(this::filterIfDeleted);
+    }
+
+    private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem 
item) {
+        return mailQueueView.isPresent(item.getMail())
+            .flatMap(isPresent -> {
+                if (isPresent) {
+                    return Mono.just(item);
+                }
+                item.done(true);
+                return Mono.empty();
+            });
     }
 
     private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery 
response) {
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 65a6f20..f6d57c6 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -231,14 +231,6 @@ public class RabbitMQMailQueueTest implements 
ManageableMailQueueContract, MailQ
 
     }
 
-    @Disabled("JAMES-2733 Deleted elements are still dequeued")
-    @Test
-    @Override
-    public void deletedElementsShouldNotBeDequeued() {
-
-    }
-
-
     private void enqueueSomeMails(Function<Integer, String> namePattern, int 
emailCount) {
         IntStream.rangeClosed(1, emailCount)
             .forEach(Throwing.intConsumer(i -> enQueue(defaultMail()


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to