This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch 3.7.x in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 952271012d1e509632c32b21a685d5e538d28090 Author: Benoit Tellier <[email protected]> AuthorDate: Sat Jul 1 19:03:40 2023 +0700 JAMES-3924 Fix checkstyles and some unstable tests Co-authored-by: Rene Cordier <[email protected]> Co-authored-by: Quan Tran <[email protected]> --- .../view/cassandra/CassandraMailQueueBrowser.java | 1 - .../cassandra/CassandraMailQueueMailDelete.java | 2 +- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 61 ++++++++++++---------- 3 files changed, 33 insertions(+), 31 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java index 9c1241351d..40ca47a197 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java @@ -21,7 +21,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId; import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice; -import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY; import java.time.Clock; import java.time.Instant; diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java index b5ef41f2c9..76a599a3bf 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java @@ -93,7 +93,7 @@ public class CassandraMailQueueMailDelete { } private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) { - Instant now= clock.instant(); + Instant now = clock.instant(); return browseStartDao.findBrowseStart(mailQueueName) .filter(browseStart -> browseStart.isBefore(now.minus(configuration.getSliceWindow()))) .flatMap(browseStart -> cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart) diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index eee81a381f..25623493b8 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -64,10 +64,12 @@ import org.apache.james.blob.cassandra.CassandraBlobStoreFactory; import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.core.builder.MimeMessageBuilder; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule; +import org.apache.james.junit.categories.Unstable; import org.apache.james.lifecycle.api.LifecycleUtil; import org.apache.james.metrics.api.Gauge; import org.apache.james.metrics.tests.RecordingMetricFactory; import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.MailQueueMetricContract; import org.apache.james.queue.api.MailQueueMetricExtension; import org.apache.james.queue.api.ManageableMailQueue; @@ -86,12 +88,12 @@ import org.apache.james.utils.UpdatableTickingClock; import org.apache.mailet.Mail; import org.apache.mailet.base.test.FakeMail; import org.assertj.core.api.SoftAssertions; -import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; @@ -109,7 +111,7 @@ import reactor.rabbitmq.Sender; class RabbitMQMailQueueTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); private static final int THREE_BUCKET_COUNT = 3; - private static final int UPDATE_BROWSE_START_PACE = 10; + private static final int UPDATE_BROWSE_START_PACE = 25; private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1); private static final org.apache.james.queue.api.MailQueueName SPOOL = org.apache.james.queue.api.MailQueueName.of("spool"); private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z"); @@ -198,13 +200,14 @@ class RabbitMQMailQueueTest { "5-1", "5-2", "5-3", "5-4", "5-5"); } - @Test + @RepeatedTest(50) + @Tag(Unstable.TAG) void browseStartShouldBeUpdated(CassandraCluster cassandraCluster) { - int emailCount = 100; + int emailCount = 250; StatementRecorder.Selector selector = preparedStatementStartingWith("UPDATE browsestart"); StatementRecorder statementRecorder = cassandraCluster.getConf() - .recordStatements(selector); + .recordStatements(selector); clock.setInstant(IN_SLICE_1); enqueueSomeMails(namePatternForSlice(1), emailCount); @@ -220,12 +223,13 @@ class RabbitMQMailQueueTest { // The actual rate of update should actually be lower than the update probability. assertThat(statementRecorder.listExecutedStatements(selector)) - .hasSizeBetween(2, 9); + .hasSizeBetween(2, 12); } - @Test + @RepeatedTest(50) + @Tag(Unstable.TAG) void contentStartShouldBeUpdated(CassandraCluster cassandraCluster) { - int emailCount = 100; + int emailCount = 250; StatementRecorder.Selector selector = preparedStatementStartingWith("UPDATE contentstart"); StatementRecorder statementRecorder = cassandraCluster.getConf().recordStatements(selector); @@ -244,7 +248,7 @@ class RabbitMQMailQueueTest { // The actual rate of update should actually be lower than the update probability. assertThat(statementRecorder.listExecutedStatements(selector)) - .hasSizeBetween(2, 9); + .hasSizeBetween(2, 12); } @Test @@ -352,7 +356,7 @@ class RabbitMQMailQueueTest { @Test void enqueuedEmailsShouldEventuallyBeCleaned() { ManageableMailQueue mailQueue = getManageableMailQueue(); - int emailCount = 5; + int emailCount = 100; clock.setInstant(IN_SLICE_1); enqueueSomeMails(namePatternForSlice(1), emailCount); @@ -367,10 +371,10 @@ class RabbitMQMailQueueTest { enqueueSomeMails(namePatternForSlice(5), emailCount); clock.setInstant(IN_SLICE_7); - dequeueMails(5); - dequeueMails(5); - dequeueMails(5); - dequeueMails(5); + dequeueMails(emailCount); + dequeueMails(emailCount); + dequeueMails(emailCount); + dequeueMails(emailCount); // ensure slice 1 was cleaned EnqueuedMailsDAO mailsDAO = new EnqueuedMailsDAO(cassandraCluster.getCassandraCluster().getConf(), new HashBlobId.Factory()); @@ -696,8 +700,8 @@ class RabbitMQMailQueueTest { IntStream.rangeClosed(1, emailCount) .forEach(Throwing.intConsumer(i -> { FakeMail mail = defaultMail() - .name(namePattern.apply(i)) - .build(); + .name(namePattern.apply(i)) + .build(); enQueue(mail); LifecycleUtil.dispose(mail); })); @@ -720,8 +724,8 @@ class RabbitMQMailQueueTest { try { await() - .atMost(Duration.ofMinutes(10)) - .untilAsserted(() -> assertThat(counter.get()).isGreaterThanOrEqualTo(times)); + .atMost(Duration.ofMinutes(10)) + .untilAsserted(() -> assertThat(counter.get()).isGreaterThanOrEqualTo(times)); } finally { disposable.dispose(); } @@ -980,18 +984,17 @@ class RabbitMQMailQueueTest { .setSubject(identicalSubject) .setText(identicalContent)) .build()); - - List<MailQueue.MailQueueItem> items = dequeueFlux.take(2) - .concatMap(mailQueueItem -> Mono.fromCallable(() -> { - mailQueueItem.done(true); - return mailQueueItem; - })) + + Flux.from(mailQueue.deQueue()) + .take(2) + .concatMap(mailQueueItem -> + Mono.fromCallable(() -> { + assertThat(mailQueueItem.getMail().getMessage().getContent()).isEqualTo(identicalContent); + mailQueueItem.done(true); + return mailQueueItem; + }).subscribeOn(Schedulers.elastic())) .collectList() .block(Duration.ofSeconds(10)); - - assertThat(items) - .allSatisfy(Throwing.consumer(item -> assertThat(item.getMail().getMessage().getContent()) - .isEqualTo(identicalContent))); } } @@ -1022,6 +1025,6 @@ class RabbitMQMailQueueTest { configuration); mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getSender(), mqManagementApi, factory, rabbitMQExtension.getRabbitMQ().getConfiguration()); - mailQueue = mailQueueFactory.createQueue(SPOOL); + mailQueue = mailQueueFactory.createQueue(SPOOL, MailQueueFactory.prefetchCount(3)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
