This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5e582a5eb829ed4b0571bd8e04d0ead48bf3294f Author: Rémi KOWALSKI <[email protected]> AuthorDate: Thu Nov 14 14:01:11 2019 +0100 JAMES-2979 add test to ensure that dequeuing from the mailqueue should be done multiple elements at the same time --- .../queue/activemq/ActiveMQMailQueueBlobTest.java | 4 +++ .../queue/activemq/ActiveMQMailQueueTest.java | 6 +++- .../apache/james/queue/api/MailQueueContract.java | 35 +++++++++++++++++++++- .../apache/james/queue/jms/JMSMailQueueTest.java | 6 +++- 4 files changed, 48 insertions(+), 3 deletions(-) diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java index 1b021d0..b2c3cea 100644 --- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java +++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.time.temporal.ChronoUnit; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; import org.apache.commons.io.FileUtils; import org.apache.james.filesystem.api.FileSystem; @@ -65,6 +66,9 @@ public class ActiveMQMailQueueBlobTest implements DelayedManageableMailQueueCont public void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) { fileSystem = new MyFileSystem(); ActiveMQConnectionFactory connectionFactory = createConnectionFactory(); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setQueuePrefetch(0); + connectionFactory.setPrefetchPolicy(prefetchPolicy); FileSystemBlobTransferPolicy policy = new FileSystemBlobTransferPolicy(); policy.setFileSystem(fileSystem); policy.setDefaultUploadUrl(BASE_DIR); diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java index acfdbfb..609cef1 100644 --- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java +++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java @@ -21,6 +21,7 @@ package org.apache.james.queue.activemq; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; import org.apache.james.metrics.api.GaugeRegistry; import org.apache.james.metrics.api.MetricFactory; @@ -51,7 +52,10 @@ public class ActiveMQMailQueueTest implements DelayedManageableMailQueueContract @BeforeEach public void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setQueuePrefetch(0); + connectionFactory.setPrefetchPolicy(prefetchPolicy); RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory(); MetricFactory metricFactory = metricTestSystem.getMetricFactory(); GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry(); diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java index f1d57c8..cec9783 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java @@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import java.util.stream.Stream; import javax.mail.internet.MimeMessage; @@ -49,12 +50,12 @@ import org.apache.mailet.Attribute; import org.apache.mailet.Mail; import org.apache.mailet.PerRecipientHeaders; import org.apache.mailet.base.test.FakeMail; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import com.github.fge.lambdas.Throwing; import com.google.common.base.MoreObjects; import com.google.common.base.Strings; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -510,6 +511,38 @@ public interface MailQueueContract { .hasSize(totalDequeuedMessages); } + @Test + default void dequeueShouldBeConcurrent() { + MailQueue testee = getMailQueue(); + int NB_MAILS = 1000; + IntStream.range(0, NB_MAILS) + .forEach(Throwing.intConsumer(i -> testee.enQueue(defaultMail() + .name("name" + i) + .build()))); + + ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>(); + + Flux.from(testee.deQueue()) + .flatMap(item -> Mono.defer(() -> { + dequeuedMails.add(item.getMail()); + try { + Thread.sleep(100); + item.done(true); + return Mono.empty(); + } catch (MailQueue.MailQueueException | InterruptedException e) { + return Mono.error(e); + } + }).subscribeOn(Schedulers.elastic()), 1000 + ) + .subscribeOn(Schedulers.newSingle("foo")) + .subscribe(); + + Awaitility.await() + .atMost(org.awaitility.Duration.ONE_MINUTE) + .until(() -> dequeuedMails.size() >= NB_MAILS); + + } + class SerializableAttribute implements Serializable { private final String value; diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java index 8f92851..f436a1d 100644 --- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java +++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java @@ -22,6 +22,7 @@ package org.apache.james.queue.jms; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; import org.apache.james.metrics.api.GaugeRegistry; import org.apache.james.metrics.api.MetricFactory; @@ -47,7 +48,10 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri @BeforeEach void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setQueuePrefetch(0); + connectionFactory.setPrefetchPolicy(prefetchPolicy); RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory(); MetricFactory metricFactory = metricTestSystem.getMetricFactory(); GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
