This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d6c0a6b283161d4ee3b2f0d0f6f04804e9feb881 Author: Matthieu Baechler <[email protected]> AuthorDate: Tue Feb 18 15:47:05 2020 +0100 JAMES-3041 Change the way we control resources for Spooler mail processing For now, we controlled resources using a bounded threadpool for mail processing. However, it makes the concurrency of the dequeuing harder to achieve and doesn't leverage modern tools Reactor brings like explicit control over concurrency for a given Flux. With this commit we start using elastic Scheduler everywhere and control resources by specifying concurrency parameter. --- .../mailetcontainer/impl/JamesMailSpooler.java | 53 ++++++++-------------- .../mailetcontainer/impl/JamesMailSpoolerTest.java | 5 +- .../remote/delivery/RemoteDeliveryTest.java | 2 +- .../SetMessagesOutboxFlagUpdateTest.java | 4 +- .../routes/MailRepositoriesRoutesTest.java | 2 +- .../webadmin/service/ReprocessingServiceTest.java | 2 +- .../apache/james/queue/api/MailQueueFactory.java | 47 ++++++++++++++++++- .../james/queue/api/MailQueueFactoryTest.java | 26 +++++++++++ .../api/ManageableMailQueueFactoryContract.java | 6 +-- .../james/queue/file/FileMailQueueFactory.java | 4 +- .../queue/library/AbstractMailQueueFactory.java | 6 +-- .../james/queue/memory/MemoryMailQueueFactory.java | 6 +-- .../MemoryCacheableMailQueueFactoryTest.java | 6 +-- .../org/apache/james/queue/rabbitmq/Dequeuer.java | 11 +++-- .../queue/rabbitmq/RabbitMQMailQueueFactory.java | 22 ++++----- 15 files changed, 130 insertions(+), 72 deletions(-) diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java index 28383cd..aaa86d7 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java @@ -22,7 +22,6 @@ package org.apache.james.mailetcontainer.impl; import static org.apache.james.metrics.api.TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD; import java.io.IOException; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.PostConstruct; @@ -41,14 +40,12 @@ import org.apache.james.metrics.api.TimeMetric; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueue.MailQueueItem; import org.apache.james.queue.api.MailQueueFactory; -import org.apache.james.util.concurrent.NamedThreadFactory; import org.apache.mailet.Mail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** @@ -62,9 +59,10 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB public static final String SPOOL_PROCESSING = "spoolProcessing"; /** - * The number of threads used to move mail through the spool. + * concurrency level to use for dequeuing mails from spool, allows to throttle resources dedicated to that async + * process. */ - private int numThreads; + private int concurrencyLevel; private final AtomicInteger processingActive = new AtomicInteger(0); @@ -77,11 +75,8 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private final MailQueueFactory<?> queueFactory; private reactor.core.Disposable disposable; - private Scheduler spooler; - private int parallelismLevel; private MailQueue queue; - @Inject public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) { this.metricFactory = metricFactory; @@ -91,10 +86,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB @Override public void configure(HierarchicalConfiguration<ImmutableNode> config) { - numThreads = config.getInt("threads", 100); - //Reactor helps us run things in parallel but we have to ensure there are always threads available - //in the threadpool to avoid starvation. - parallelismLevel = Math.max(1, numThreads - 2); + concurrencyLevel = config.getInt("threads", 100); } /** @@ -103,18 +95,17 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB @PostConstruct public void init() { LOGGER.info("init..."); - queue = queueFactory.createQueue(MailQueueFactory.SPOOL); - spooler = Schedulers.fromExecutor(Executors.newFixedThreadPool(numThreads, NamedThreadFactory.withName("spooler"))); - LOGGER.info("uses {} Thread(s)", numThreads); - run(); + LOGGER.info("Concurrency level is {}", concurrencyLevel); + queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel)); + disposable = run(queue); + LOGGER.info("Spooler started"); } - private void run() { - LOGGER.info("Queue={}", queue); - disposable = Flux.from(queue.deQueue()) - .flatMap(item -> handleOnQueueItem(item).subscribeOn(spooler), parallelismLevel) + private reactor.core.Disposable run(MailQueue queue) { + return Flux.from(queue.deQueue()) + .flatMap(item -> handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), concurrencyLevel) .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable)) - .subscribeOn(spooler) + .subscribeOn(Schedulers.elastic()) .subscribe(); } @@ -122,7 +113,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING); try { return Mono.fromCallable(processingActive::incrementAndGet) - .flatMap(ignore -> processMail(queueItem).subscribeOn(spooler)) + .flatMap(ignore -> processMail(queueItem)) .doOnSuccess(any -> timeMetric.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)) .doOnSuccess(any -> processingActive.decrementAndGet()); } catch (Throwable e) { @@ -134,16 +125,12 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB return Mono .using( queueItem::getMail, - resource -> Mono.just(resource) - .doOnNext(mail -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName())) - .map(mail -> performProcessMail(queueItem, mail)) - .doOnNext(mail -> LOGGER.debug("==== End processing mail {} ====", mail.getName())), - LifecycleUtil::dispose) - .then(); + mail -> Mono.fromRunnable(() -> performProcessMail(queueItem, mail)), + LifecycleUtil::dispose); } - - private Mail performProcessMail(MailQueueItem queueItem, Mail mail) { + private void performProcessMail(MailQueueItem queueItem, Mail mail) { + LOGGER.debug("==== Begin processing mail {} ====", mail.getName()); try { mailProcessor.service(mail); @@ -157,8 +144,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB } catch (MailQueue.MailQueueException ex) { throw new RuntimeException(e); } + } finally { + LOGGER.debug("==== End processing mail {} ====", mail.getName()); } - return mail; } /** @@ -174,7 +162,6 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB public void dispose() { LOGGER.info("start dispose() ..."); disposable.dispose(); - spooler.dispose(); try { queue.close(); } catch (IOException e) { @@ -185,7 +172,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB @Override public int getThreadCount() { - return numThreads; + return concurrencyLevel; } @Override diff --git a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java b/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java index 0b62199..9485686 100644 --- a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java +++ b/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; import static org.awaitility.Duration.TEN_SECONDS; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -73,7 +74,7 @@ class JamesMailSpoolerTest { MailQueue queue = mock(MailQueue.class); workQueue.onNext(item); when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone)); - when(queueFactory.createQueue(MailQueueFactory.SPOOL)).thenAnswer(any -> queue); + when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue); doThrow(new RuntimeException("Arbitrary failure")) .doNothing() @@ -107,7 +108,7 @@ class JamesMailSpoolerTest { MailQueue queue = mock(MailQueue.class); workQueue.onNext(item); when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone)); - when(queueFactory.createQueue(MailQueueFactory.SPOOL)).thenAnswer(any -> queue); + when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue); doAnswer(ignored -> { Thread.currentThread().interrupt(); diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java index 1e62160..cfcc648 100644 --- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java +++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java @@ -99,7 +99,7 @@ public class RemoteDeliveryTest { @Before public void setUp() throws ConfigurationException { - MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory()); + MailQueueFactory<? extends ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory()); mailQueue = queueFactory.createQueue(RemoteDeliveryConfiguration.DEFAULT_OUTGOING_QUEUE_NAME); DNSService dnsService = mock(DNSService.class); MemoryDomainList domainList = new MemoryDomainList(dnsService); diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java index 0f82362..bf21d1e 100644 --- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java +++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java @@ -73,12 +73,12 @@ public abstract class SetMessagesOutboxFlagUpdateTest { protected MailQueueFactory<MailQueue> noopMailQueueFactory = new MailQueueFactory<MailQueue>() { @Override - public Optional<MailQueue> getQueue(MailQueueName name) { + public Optional<MailQueue> getQueue(MailQueueName name, PrefetchCount prefetchCount) { return Optional.of(createQueue(name)); } @Override - public MailQueue createQueue(MailQueueName name) { + public MailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) { return new MailQueue() { @Override public void close() throws IOException { diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java index 613c506..b69d674 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java @@ -114,7 +114,7 @@ public class MailRepositoriesRoutesTest { MemoryTaskManager taskManager = new MemoryTaskManager(new Hostname("foo")); JsonTransformer jsonTransformer = new JsonTransformer(); - MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory()); + MailQueueFactory<? extends ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory()); spoolQueue = queueFactory.createQueue(MailQueueFactory.SPOOL); customQueue = queueFactory.createQueue(CUSTOM_QUEUE); diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java index 994c7aa..02cbc0d 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java @@ -64,7 +64,7 @@ public class ReprocessingServiceTest { private ReprocessingService reprocessingService; private MemoryMailRepositoryStore mailRepositoryStore; - private MailQueueFactory<ManageableMailQueue> queueFactory; + private MailQueueFactory<? extends ManageableMailQueue> queueFactory; private FakeMail mail1; private FakeMail mail2; private FakeMail mail3; diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java index 3f0a5dc..35a4967 100644 --- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java +++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java @@ -22,11 +22,46 @@ package org.apache.james.queue.api; import java.util.Optional; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + /** * Factory for {@link MailQueue} */ public interface MailQueueFactory<T extends MailQueue> { + static PrefetchCount defaultPrefetchCount() { + return prefetchCount(5); + } + + static PrefetchCount prefetchCount(int count) { + return new PrefetchCount(count); + } + + /** + * {@link PrefetchCount} provides producers insights about what kind of load consumers expect. + * If you expect to consume the mailqueue with 10 concurrent workers, it's a good idea to + * ensure the producer will fill the stream with at least 10 elements. + */ + class PrefetchCount { + private final int value; + + @VisibleForTesting + PrefetchCount(int value) { + Preconditions.checkArgument(value >= 0, "only non-negative values are allowed"); + this.value = value; + } + + public int asInt() { + return value; + } + + @Override + public String toString() { + return "PrefetchCount = " + value; + } + } + /** * {@link MailQueue} which is used for spooling the messages */ @@ -38,9 +73,17 @@ public interface MailQueueFactory<T extends MailQueue> { * @param name * @return queue */ - Optional<T> getQueue(MailQueueName name); + default Optional<T> getQueue(MailQueueName name) { + return getQueue(name, defaultPrefetchCount()); + } + + Optional<T> getQueue(MailQueueName name, PrefetchCount count); + + default T createQueue(MailQueueName name) { + return createQueue(name, defaultPrefetchCount()); + } - T createQueue(MailQueueName name); + T createQueue(MailQueueName name, PrefetchCount count); Set<MailQueueName> listCreatedMailQueues(); } diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueFactoryTest.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueFactoryTest.java new file mode 100644 index 0000000..c565603 --- /dev/null +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueFactoryTest.java @@ -0,0 +1,26 @@ +package org.apache.james.queue.api; + +import static org.junit.jupiter.api.Assertions.*; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +class MailQueueFactoryTest { + + @Test + void prefetchCountShouldNotBeNegative() { + Assertions.assertThatThrownBy(() -> new MailQueueFactory.PrefetchCount(-12)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void prefetchCountCouldBeZero() { + Assertions.assertThatCode(() -> new MailQueueFactory.PrefetchCount(0)).doesNotThrowAnyException(); + } + + @Test + void prefetchCountCouldBePositive() { + Assertions.assertThatCode(() -> new MailQueueFactory.PrefetchCount(12)).doesNotThrowAnyException(); + } + +} \ No newline at end of file diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java index 5ae065f..fc671b8 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java @@ -25,15 +25,15 @@ import javax.mail.MessagingException; import org.junit.jupiter.api.Test; -public interface ManageableMailQueueFactoryContract { +public interface ManageableMailQueueFactoryContract<T extends ManageableMailQueue> { MailQueueName NAME_1 = MailQueueName.of("name1"); - MailQueueFactory<ManageableMailQueue> getMailQueueFactory(); + MailQueueFactory<T> getMailQueueFactory(); @Test default void createMailQueueShouldNotConflictIfAlreadyExists() throws MessagingException { - MailQueueFactory<ManageableMailQueue> mailQueueFactory = getMailQueueFactory(); + MailQueueFactory<T> mailQueueFactory = getMailQueueFactory(); MailQueue firstCreation = mailQueueFactory.createQueue(NAME_1); firstCreation.enQueue(Mails.defaultMail().name("name").build()); diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java index 0587620..f727bcd 100644 --- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java +++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java @@ -76,12 +76,12 @@ public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueu } @Override - public Optional<ManageableMailQueue> getQueue(MailQueueName name) { + public Optional<ManageableMailQueue> getQueue(MailQueueName name, PrefetchCount prefetchCount) { return Optional.ofNullable(queues.get(name)); } @Override - public ManageableMailQueue createQueue(MailQueueName name) { + public ManageableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) { return queues.computeIfAbsent(name, mailQueueName -> { try { return new FileCacheableMailQueue(mailQueueActionItemDecoratorFactory, fs.getFile("file://var/store/queue"), mailQueueName, sync); diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java index 5fa22ef..052b0d2 100644 --- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java @@ -96,13 +96,13 @@ public abstract class AbstractMailQueueFactory<T extends MailQueue> implements M } @Override - public final synchronized Optional<T> getQueue(MailQueueName name) { + public final synchronized Optional<T> getQueue(MailQueueName name, PrefetchCount prefetchCount) { return Optional.ofNullable(queues.get(name)); } @Override - public synchronized T createQueue(MailQueueName name) { - return getQueue(name).orElseGet(() -> createAndRegisterQueue(name)); + public synchronized T createQueue(MailQueueName name, PrefetchCount prefetchCount) { + return getQueue(name, prefetchCount).orElseGet(() -> createAndRegisterQueue(name)); } private T createAndRegisterQueue(MailQueueName name) { diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java index e13cb10..da9e8b1 100644 --- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java +++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java @@ -58,7 +58,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQueue> { +public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueFactory.MemoryCacheableMailQueue> { private final ConcurrentHashMap<MailQueueName, MemoryCacheableMailQueue> mailQueues; private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory; @@ -78,12 +78,12 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu } @Override - public Optional<ManageableMailQueue> getQueue(MailQueueName name) { + public Optional<MemoryCacheableMailQueue> getQueue(MailQueueName name, PrefetchCount count) { return Optional.ofNullable(mailQueues.get(name)); } @Override - public MemoryCacheableMailQueue createQueue(MailQueueName name) { + public MemoryCacheableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) { return mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory)); } diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java index aa2e294..a50722b 100644 --- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java +++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java @@ -21,12 +21,12 @@ package org.apache.james.queue.memory; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.MailQueueFactoryContract; -import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.api.ManageableMailQueueFactoryContract; import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; import org.junit.jupiter.api.BeforeEach; -class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract { +class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<MemoryMailQueueFactory.MemoryCacheableMailQueue>, + ManageableMailQueueFactoryContract<MemoryMailQueueFactory.MemoryCacheableMailQueue> { MemoryMailQueueFactory memoryMailQueueFactory; @@ -36,7 +36,7 @@ class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<Ma } @Override - public MailQueueFactory<ManageableMailQueue> getMailQueueFactory() { + public MailQueueFactory<MemoryMailQueueFactory.MemoryCacheableMailQueue> getMailQueueFactory() { return memoryMailQueueFactory; } } \ No newline at end of file diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 64c0c93..b22d850 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -30,6 +30,7 @@ import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.mailet.Mail; @@ -39,13 +40,13 @@ import com.rabbitmq.client.Delivery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.AcknowledgableDelivery; import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.Receiver; class Dequeuer implements Closeable { private static final boolean REQUEUE = true; - private static final int EXECUTION_RATE = 5; private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { @@ -84,14 +85,14 @@ class Dequeuer implements Closeable { Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader, MailReferenceSerializer serializer, MetricFactory metricFactory, - MailQueueView mailQueueView) { + MailQueueView mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) { this.mailLoader = mailLoader; this.mailReferenceSerializer = serializer; this.mailQueueView = mailQueueView; this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); this.receiver = receiverProvider.createReceiver(); this.flux = this.receiver - .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE)) + .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(prefetchCount.asInt())) .filter(getResponse -> getResponse.getBody() != null); } @@ -101,8 +102,8 @@ class Dequeuer implements Closeable { } Flux<? extends MailQueue.MailQueueItem> deQueue() { - return flux.concatMap(this::loadItem) - .concatMap(this::filterIfDeleted); + return flux.flatMapSequential(response -> loadItem(response).subscribeOn(Schedulers.elastic())) + .concatMap(item -> filterIfDeleted(item).subscribeOn(Schedulers.elastic())); } private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java index 3805e74..fc2f784 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java @@ -93,7 +93,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu this.configuration = configuration; } - RabbitMQMailQueue create(MailQueueName mailQueueName) { + RabbitMQMailQueue create(MailQueueName mailQueueName, PrefetchCount prefetchCount) { MailQueueView mailQueueView = mailQueueViewFactory.create(mailQueueName); mailQueueView.initialize(mailQueueName); @@ -103,7 +103,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer, metricFactory, mailQueueView, clock), new Dequeuer(mailQueueName, receiverProvider, mailLoader, mailReferenceSerializer, - metricFactory, mailQueueView), + metricFactory, mailQueueView, prefetchCount), mailQueueView, decoratorFactory); @@ -133,15 +133,15 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu } @Override - public Optional<RabbitMQMailQueue> getQueue(org.apache.james.queue.api.MailQueueName name) { - return getQueueFromRabbitServer(MailQueueName.fromString(name.asString())); + public Optional<RabbitMQMailQueue> getQueue(org.apache.james.queue.api.MailQueueName name, PrefetchCount count) { + return getQueueFromRabbitServer(MailQueueName.fromString(name.asString()), count); } @Override - public RabbitMQMailQueue createQueue(org.apache.james.queue.api.MailQueueName name) { + public RabbitMQMailQueue createQueue(org.apache.james.queue.api.MailQueueName name, PrefetchCount count) { MailQueueName mailQueueName = MailQueueName.fromString(name.asString()); - return getQueueFromRabbitServer(mailQueueName) - .orElseGet(() -> createQueueIntoRabbitServer(mailQueueName)); + return getQueueFromRabbitServer(mailQueueName, count) + .orElseGet(() -> createQueueIntoRabbitServer(mailQueueName, count)); } @Override @@ -152,7 +152,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu .collect(ImmutableSet.toImmutableSet()); } - private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) { + private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName, PrefetchCount prefetchCount) { String exchangeName = mailQueueName.toRabbitExchangeName().asString(); Flux.concat( sender.declareExchange(ExchangeSpecification.exchange(exchangeName) @@ -169,13 +169,13 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu .routingKey(EMPTY_ROUTING_KEY))) .then() .block(); - return privateFactory.create(mailQueueName); + return privateFactory.create(mailQueueName, prefetchCount); } - private Optional<RabbitMQMailQueue> getQueueFromRabbitServer(MailQueueName name) { + private Optional<RabbitMQMailQueue> getQueueFromRabbitServer(MailQueueName name, PrefetchCount prefetchCount) { return mqManagementApi.listCreatedMailQueueNames() .filter(name::equals) - .map(privateFactory::create) + .map(queueName -> privateFactory.create(queueName, prefetchCount)) .findFirst(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
