This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 314b6a74385582bff888132fcb6ea617e0a929c5
Author: Jean Helou <j...@xn--gml-cma.com>
AuthorDate: Thu Sep 19 14:43:55 2024 +0200

    [JAMES-3696] fixes broken MailQueueContract test
    
    The test started breaking when the dead-letter policy was introduced on the 
Pulsar mail queue.
    It wasn't detected because test suites that are tagged unstable are never 
executed on CI (:scream:)
    
    The core issue is that, the way the test was set up, the same mail could end 
up being retried several times while others were acknowledged successfully 
right away.
    As long as no dead-letter policy was applied, this was fine: the email 
would be retried several times before being dequeued. With the dead-letter 
policy this was no longer possible, and the test never completed because at 
least one of the mails ended up being dead-lettered.
    
    I changed the test to track the state of each email and apply the proper 
action (retry or success), ensuring each mail is retried once and then 
acknowledged.
---
 .../apache/james/queue/api/MailQueueContract.java  | 37 ++++++++++++++++------
 1 file changed, 28 insertions(+), 9 deletions(-)

diff --git 
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
 
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index 4430ec7d37..32c5b86537 100644
--- 
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ 
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -35,6 +35,7 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -516,6 +517,12 @@ public interface MailQueueContract {
             .hasSize(totalDequeuedMessages);
     }
 
+    enum EnqueueDequeueSteps {
+        RETRY,
+        DEQUEUE,
+        END
+    }
+
     @Test
     default void concurrentEnqueueDequeueWithAckNackShouldNotFail() throws 
Exception {
         MailQueue testee = getMailQueue();
@@ -526,22 +533,34 @@ public interface MailQueueContract {
         int operationCount = 15;
         int totalDequeuedMessages = 50;
         LinkedBlockingDeque<MailQueue.MailQueueItem> deque = new 
LinkedBlockingDeque<>();
+        var mailStates = new ConcurrentHashMap<String, EnqueueDequeueSteps>();
         
Flux.from(testee.deQueue()).subscribeOn(SCHEDULER).doOnNext(deque::addFirst).subscribe();
         ConcurrentTestRunner.builder()
             .operation((threadNumber, step) -> {
                 if (step % 3 == 0) {
+                    String name = "name" + threadNumber + "-" + step;
+                    mailStates.put(name, EnqueueDequeueSteps.RETRY);
                     testee.enQueue(defaultMail()
-                        .name("name" + threadNumber + "-" + step)
+                        .name(name)
                         .build());
-                }
-                if (step % 3 == 1) {
-                    MailQueue.MailQueueItem mailQueueItem = deque.takeLast();
-                    
mailQueueItem.done(MailQueue.MailQueueItem.CompletionStatus.RETRY);
-                }
-                if (step % 3 == 2) {
+                } else {
                     MailQueue.MailQueueItem mailQueueItem = deque.takeLast();
-                    dequeuedMails.add(mailQueueItem.getMail());
-                    
mailQueueItem.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
+                    var name = mailQueueItem.getMail().getName();
+                    var currentState = 
mailStates.get(mailQueueItem.getMail().getName());
+                    var nextState = switch (currentState) {
+                        case RETRY -> EnqueueDequeueSteps.DEQUEUE;
+                        case DEQUEUE -> EnqueueDequeueSteps.END;
+                        case END ->
+                                throw new IllegalStateException("trying to 
dequeue mail " + name + " multiple times !");
+                    };
+                    mailStates.put(name, nextState);
+                    if (currentState == EnqueueDequeueSteps.RETRY) {
+                        
mailQueueItem.done(MailQueue.MailQueueItem.CompletionStatus.RETRY);
+                    } else {
+                        dequeuedMails.add(mailQueueItem.getMail());
+                        
mailQueueItem.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
+                    }
+
                 }
             })
             .threadCount(threadCount)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to