This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e48da3073ddef517b2035cc73119b400700b2e5f
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed May 3 11:25:28 2023 +0700

    JAMES-3924 Test and fix browse start updates
    
     -> Propose unit tests for browseStart updates using Cassandra session 
instrumentation
       (Ease testing a behaviour not visible at the API level)
     -> The update limitation to not occur more than once er slice was not 
correctly applied
     -> Fix a couple of warnings
---
 .../cassandra/CassandraMailQueueMailDelete.java    | 13 +++++-----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 29 +++++++++++++++++++++-
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index 4d4cdc818e..d519c534ba 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -93,12 +93,13 @@ public class CassandraMailQueueMailDelete {
     }
 
     private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
-        Slice currentSlice = Slice.of(clock.instant());
+        Instant now= clock.instant();
         return browseStartDao.findBrowseStart(mailQueueName)
-            .filter(browseStart -> 
browseStart.isBefore(currentSlice.getStartSliceInstant()))
-            .flatMapMany(browseStart -> 
cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart))
-            .map(enqueuedItem -> 
enqueuedItem.getSlicingContext().getTimeRangeStart())
-            .next();
+            .filter(browseStart -> 
browseStart.isBefore(now.minus(configuration.getSliceWindow())))
+            .flatMap(browseStart -> 
cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart)
+                .map(enqueuedItem -> 
enqueuedItem.getSlicingContext().getTimeRangeStart())
+                .next()
+                .filter(newBrowseStart -> 
newBrowseStart.isAfter(browseStart)));
     }
 
     private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, 
Instant newBrowseStartInstant) {
@@ -128,6 +129,6 @@ public class CassandraMailQueueMailDelete {
 
     private boolean shouldUpdateBrowseStart() {
         int threshold = configuration.getUpdateBrowseStartPace();
-        return Math.abs(ThreadLocalRandom.current().nextInt()) % threshold == 
0;
+        return ThreadLocalRandom.current().nextInt(threshold) % threshold == 0;
     }
 }
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 0e037d5ca3..124a40ab3a 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -25,6 +25,7 @@ import static java.time.temporal.ChronoUnit.HOURS;
 import static 
org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
+import static 
org.apache.james.backends.cassandra.StatementRecorder.Selector.preparedStatementStartingWith;
 import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.apache.james.queue.api.Mails.defaultMailNoRecipient;
@@ -54,6 +55,7 @@ import java.util.stream.Stream;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.StatementRecorder;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import 
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
@@ -90,6 +92,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.ArgumentCaptor;
@@ -107,7 +110,7 @@ import reactor.rabbitmq.Sender;
 class RabbitMQMailQueueTest {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new 
HashBlobId.Factory();
     private static final int THREE_BUCKET_COUNT = 3;
-    private static final int UPDATE_BROWSE_START_PACE = 2;
+    private static final int UPDATE_BROWSE_START_PACE = 10;
     private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1);
     private static final org.apache.james.queue.api.MailQueueName SPOOL = 
org.apache.james.queue.api.MailQueueName.of("spool");
     private static final Instant IN_SLICE_1 = Instant.now().minus(60, DAYS);
@@ -197,6 +200,30 @@ class RabbitMQMailQueueTest {
                 "5-1", "5-2", "5-3", "5-4", "5-5");
         }
 
+        @Test
+        void browseStartShouldBeUpdated(CassandraCluster cassandraCluster) {
+            int emailCount = 100;
+
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandraCluster.getConf().recordStatements(statementRecorder);
+
+            clock.setInstant(IN_SLICE_1);
+            enqueueSomeMails(namePatternForSlice(1), emailCount);
+            dequeueMails(emailCount);
+
+            clock.setInstant(IN_SLICE_2);
+            enqueueSomeMails(namePatternForSlice(2), emailCount);
+            dequeueMails(emailCount);
+
+            clock.setInstant(IN_SLICE_3);
+            enqueueSomeMails(namePatternForSlice(3), emailCount);
+            dequeueMails(emailCount);
+
+            // The actual rate of update should actually be lower than the 
update probability.
+            
assertThat(statementRecorder.listExecutedStatements(preparedStatementStartingWith("UPDATE
 browsestart")))
+                .hasSizeBetween(2, 5);
+        }
+
         @Test
         void dequeueShouldDeleteBlobs(CassandraCluster cassandra) throws 
Exception {
             String name1 = "myMail1";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to