This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch 3.8.x in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e48da3073ddef517b2035cc73119b400700b2e5f Author: Benoit Tellier <[email protected]> AuthorDate: Wed May 3 11:25:28 2023 +0700 JAMES-3924 Test and fix browse start updates -> Propose unit tests for browseStart updates using Cassandra session instrumentation (Ease testing a behaviour not visible at the API level) -> The update limitation to not occur more than once er slice was not correctly applied -> Fix a couple of warnings --- .../cassandra/CassandraMailQueueMailDelete.java | 13 +++++----- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 29 +++++++++++++++++++++- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java index 4d4cdc818e..d519c534ba 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java @@ -93,12 +93,13 @@ public class CassandraMailQueueMailDelete { } private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) { - Slice currentSlice = Slice.of(clock.instant()); + Instant now= clock.instant(); return browseStartDao.findBrowseStart(mailQueueName) - .filter(browseStart -> browseStart.isBefore(currentSlice.getStartSliceInstant())) - .flatMapMany(browseStart -> cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart)) - .map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart()) - .next(); + .filter(browseStart -> browseStart.isBefore(now.minus(configuration.getSliceWindow()))) + .flatMap(browseStart -> cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart) + .map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart()) + .next() + .filter(newBrowseStart -> newBrowseStart.isAfter(browseStart))); } private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Instant newBrowseStartInstant) { @@ -128,6 +129,6 @@ public class CassandraMailQueueMailDelete { private boolean shouldUpdateBrowseStart() { int threshold = configuration.getUpdateBrowseStartPace(); - return Math.abs(ThreadLocalRandom.current().nextInt()) % threshold == 0; + return ThreadLocalRandom.current().nextInt(threshold) % threshold == 0; } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 0e037d5ca3..124a40ab3a 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -25,6 +25,7 @@ import static java.time.temporal.ChronoUnit.HOURS; import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty; +import static org.apache.james.backends.cassandra.StatementRecorder.Selector.preparedStatementStartingWith; import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.queue.api.Mails.defaultMail; import static org.apache.james.queue.api.Mails.defaultMailNoRecipient; @@ -54,6 +55,7 @@ import java.util.stream.Stream; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.StatementRecorder; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.backends.rabbitmq.RabbitMQExtension; @@ -90,6 +92,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; @@ -107,7 +110,7 @@ import reactor.rabbitmq.Sender; class RabbitMQMailQueueTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); private static final int THREE_BUCKET_COUNT = 3; - private static final int UPDATE_BROWSE_START_PACE = 2; + private static final int UPDATE_BROWSE_START_PACE = 10; private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1); private static final org.apache.james.queue.api.MailQueueName SPOOL = org.apache.james.queue.api.MailQueueName.of("spool"); private static final Instant IN_SLICE_1 = Instant.now().minus(60, DAYS); @@ -197,6 +200,30 @@ class RabbitMQMailQueueTest { "5-1", "5-2", "5-3", "5-4", "5-5"); } + @Test + void browseStartShouldBeUpdated(CassandraCluster cassandraCluster) { + int emailCount = 100; + + StatementRecorder statementRecorder = new StatementRecorder(); + cassandraCluster.getConf().recordStatements(statementRecorder); + + clock.setInstant(IN_SLICE_1); + enqueueSomeMails(namePatternForSlice(1), emailCount); + dequeueMails(emailCount); + + clock.setInstant(IN_SLICE_2); + enqueueSomeMails(namePatternForSlice(2), emailCount); + dequeueMails(emailCount); + + clock.setInstant(IN_SLICE_3); + enqueueSomeMails(namePatternForSlice(3), emailCount); + dequeueMails(emailCount); + + // The actual rate of update should actually be lower than the update probability. + assertThat(statementRecorder.listExecutedStatements(preparedStatementStartingWith("UPDATE browsestart"))) + .hasSizeBetween(2, 5); + } + @Test void dequeueShouldDeleteBlobs(CassandraCluster cassandra) throws Exception { String name1 = "myMail1"; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
