This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5d6fc84b6ac290e2c4025ec0d8709e7c83f7f118 Author: Benoit Tellier <[email protected]> AuthorDate: Fri May 29 14:01:17 2020 +0700 JAMES-3197 Prevent infinite loop upon Error of Mail Processing Leverage a failure count on top of the mailQueue, and use error mail repository. --- .../org/apache/james/mailets/MailetErrorsTest.java | 86 +++++++++-- ...ErrorMailet.java => OneRuntimeErrorMailet.java} | 10 +- ...rrorMailet.java => OneThreadSuicideMailet.java} | 12 +- .../transport/mailets/RuntimeErrorMailet.java | 2 +- ...rrorMailet.java => RuntimeExceptionMailet.java} | 2 +- ...orMatcher.java => RuntimeExceptionMatcher.java} | 2 +- .../mailetcontainer/impl/JamesMailSpooler.java | 80 +++++++++- .../mailetcontainer/impl/JamesMailSpoolerTest.java | 163 --------------------- 8 files changed, 170 insertions(+), 187 deletions(-) diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java index b1cc490..646fe0b 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java @@ -38,8 +38,11 @@ import org.apache.james.transport.mailets.ErrorMatcher; import org.apache.james.transport.mailets.NoClassDefFoundErrorMatcher; import org.apache.james.transport.mailets.NoopMailet; import org.apache.james.transport.mailets.Null; +import org.apache.james.transport.mailets.OneRuntimeErrorMailet; +import org.apache.james.transport.mailets.OneThreadSuicideMailet; import org.apache.james.transport.mailets.RuntimeErrorMailet; -import org.apache.james.transport.mailets.RuntimeErrorMatcher; +import org.apache.james.transport.mailets.RuntimeExceptionMailet; +import org.apache.james.transport.mailets.RuntimeExceptionMatcher; import org.apache.james.transport.mailets.ToRepository; import 
org.apache.james.transport.matchers.All; import org.apache.james.transport.matchers.HasException; @@ -135,6 +138,71 @@ public class MailetErrorsTest { .putProcessor(ProcessorConfiguration.root() .addMailet(MailetConfiguration.builder() .matcher(All.class) + .mailet(RuntimeExceptionMailet.class)))) + .build(temporaryFolder.newFolder()); + MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()).sendMessage(FROM, FROM); + + awaitAtMostOneMinute.until(() -> probe.getRepositoryMailCount(ERROR_REPOSITORY) == 1); + } + + @Test + public void spoolerShouldEventuallyProcessUponTemporaryError() throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_ONLY_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(CommonProcessors.deliverOnlyTransport()) + .putProcessor(errorProcessor()) + .putProcessor(ProcessorConfiguration.root() + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(OneRuntimeErrorMailet.class)) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(ToRepository.class) + .addProperty("repositoryPath", CUSTOM_REPOSITORY.asString())))) + .build(temporaryFolder.newFolder()); + MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()).sendMessage(FROM, FROM); + + awaitAtMostOneMinute.until(() -> probe.getRepositoryMailCount(CUSTOM_REPOSITORY) == 1); + } + + @Test + public void spoolerShouldEventuallyProcessMailsAfterThreadSuicide() throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_ONLY_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(CommonProcessors.deliverOnlyTransport()) + .putProcessor(errorProcessor()) + .putProcessor(ProcessorConfiguration.root() + 
.addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(OneThreadSuicideMailet.class)) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(ToRepository.class) + .addProperty("repositoryPath", CUSTOM_REPOSITORY.asString())))) + .build(temporaryFolder.newFolder()); + MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()).sendMessage(FROM, FROM); + + awaitAtMostOneMinute.until(() -> probe.getRepositoryMailCount(CUSTOM_REPOSITORY) == 1); + } + + @Test + public void spoolerShouldNotInfinitLoopUponPermanentError() throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_ONLY_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(CommonProcessors.deliverOnlyTransport()) + .putProcessor(errorProcessor()) + .putProcessor(ProcessorConfiguration.root() + .addMailet(MailetConfiguration.builder() + .matcher(All.class) .mailet(RuntimeErrorMailet.class)))) .build(temporaryFolder.newFolder()); MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class); @@ -177,7 +245,7 @@ public class MailetErrorsTest { .putProcessor(ProcessorConfiguration.root() .addMailet(MailetConfiguration.builder() .matcher(All.class) - .mailet(RuntimeErrorMailet.class) + .mailet(RuntimeExceptionMailet.class) .addProperty("onMailetException", CUSTOM_PROCESSOR)))) .build(temporaryFolder.newFolder()); MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class); @@ -198,7 +266,7 @@ public class MailetErrorsTest { .putProcessor(ProcessorConfiguration.root() .addMailet(MailetConfiguration.builder() .matcher(All.class) - .mailet(RuntimeErrorMailet.class) + .mailet(RuntimeExceptionMailet.class) .addProperty("onMailetException", "ignore")) .addMailet(MailetConfiguration.builder() .matcher(All.class) @@ -265,7 +333,7 @@ public class MailetErrorsTest { 
.putProcessor(errorProcessor()) .putProcessor(ProcessorConfiguration.root() .addMailet(MailetConfiguration.builder() - .matcher(RuntimeErrorMatcher.class) + .matcher(RuntimeExceptionMatcher.class) .mailet(NoopMailet.class)))) .build(temporaryFolder.newFolder()); MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class); @@ -306,7 +374,7 @@ public class MailetErrorsTest { .putProcessor(customProcessor()) .putProcessor(ProcessorConfiguration.root() .addMailet(MailetConfiguration.builder() - .matcher(RuntimeErrorMatcher.class) + .matcher(RuntimeExceptionMatcher.class) .mailet(NoopMailet.class) .addProperty("onMatchException", CUSTOM_PROCESSOR)))) .build(temporaryFolder.newFolder()); @@ -327,7 +395,7 @@ public class MailetErrorsTest { .putProcessor(customProcessor()) .putProcessor(ProcessorConfiguration.root() .addMailet(MailetConfiguration.builder() - .matcher(RuntimeErrorMatcher.class) + .matcher(RuntimeExceptionMatcher.class) .mailet(Null.class) .addProperty("onMatchException", "nomatch")) .addMailet(MailetConfiguration.builder() @@ -377,7 +445,7 @@ public class MailetErrorsTest { .putProcessor(customProcessor()) .putProcessor(ProcessorConfiguration.root() .addMailet(MailetConfiguration.builder() - .matcher(RuntimeErrorMatcher.class) + .matcher(RuntimeExceptionMatcher.class) .mailet(ToRepository.class) .addProperty("repositoryPath", CUSTOM_REPOSITORY.asString()) .addProperty("onMatchException", "matchall")) @@ -465,7 +533,7 @@ public class MailetErrorsTest { .putProcessor(customProcessor()) .putProcessor(ProcessorConfiguration.root() .addMailet(MailetConfiguration.builder() - .matcher(RuntimeErrorMatcher.class) + .matcher(RuntimeExceptionMatcher.class) .mailet(Null.class)))) .build(temporaryFolder.newFolder()); MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class); @@ -524,7 +592,7 @@ public class MailetErrorsTest { .putProcessor(ProcessorConfiguration.root() .addMailet(MailetConfiguration.builder() 
.matcher(All.class) - .mailet(RuntimeErrorMailet.class)))) + .mailet(RuntimeExceptionMailet.class)))) .build(temporaryFolder.newFolder()); MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class); diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneRuntimeErrorMailet.java similarity index 84% copy from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java copy to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneRuntimeErrorMailet.java index 154c5ec..1b62207 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneRuntimeErrorMailet.java @@ -19,14 +19,20 @@ package org.apache.james.transport.mailets; +import java.util.concurrent.atomic.AtomicInteger; + import javax.mail.MessagingException; import org.apache.mailet.Mail; import org.apache.mailet.base.GenericMailet; -public class RuntimeErrorMailet extends GenericMailet { +public class OneRuntimeErrorMailet extends GenericMailet { + private final AtomicInteger callCount = new AtomicInteger(0); + @Override public void service(Mail mail) throws MessagingException { - throw new RuntimeException(); + if (callCount.getAndIncrement() == 0) { + throw new Error(); + } } } diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneThreadSuicideMailet.java similarity index 80% copy from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java copy to 
server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneThreadSuicideMailet.java index 154c5ec..1183635 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneThreadSuicideMailet.java @@ -19,14 +19,18 @@ package org.apache.james.transport.mailets; -import javax.mail.MessagingException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.mailet.Mail; import org.apache.mailet.base.GenericMailet; -public class RuntimeErrorMailet extends GenericMailet { +public class OneThreadSuicideMailet extends GenericMailet { + private final AtomicInteger callCount = new AtomicInteger(0); + @Override - public void service(Mail mail) throws MessagingException { - throw new RuntimeException(); + public void service(Mail mail) { + if (callCount.getAndIncrement() == 0) { + Thread.currentThread().interrupt(); + } } } diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java index 154c5ec..6c0a125 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java @@ -27,6 +27,6 @@ import org.apache.mailet.base.GenericMailet; public class RuntimeErrorMailet extends GenericMailet { @Override public void service(Mail mail) throws MessagingException { - throw new RuntimeException(); + throw new Error(); } } diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMailet.java similarity index 96% copy 
from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java copy to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMailet.java index 154c5ec..ee65cc7 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMailet.java @@ -24,7 +24,7 @@ import javax.mail.MessagingException; import org.apache.mailet.Mail; import org.apache.mailet.base.GenericMailet; -public class RuntimeErrorMailet extends GenericMailet { +public class RuntimeExceptionMailet extends GenericMailet { @Override public void service(Mail mail) throws MessagingException { throw new RuntimeException(); diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMatcher.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMatcher.java similarity index 96% rename from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMatcher.java rename to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMatcher.java index abf63d2..7bd2dd1 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMatcher.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMatcher.java @@ -27,7 +27,7 @@ import org.apache.james.core.MailAddress; import org.apache.mailet.Mail; import org.apache.mailet.base.GenericMatcher; -public class RuntimeErrorMatcher extends GenericMatcher { +public class RuntimeExceptionMatcher extends GenericMatcher { @Override public Collection<MailAddress> match(Mail mail) throws MessagingException { throw new RuntimeException(); diff --git 
a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java index 9737865..b6ceddb 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java @@ -22,11 +22,13 @@ package org.apache.james.mailetcontainer.impl; import static org.apache.james.metrics.api.TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; +import javax.mail.MessagingException; import org.apache.commons.configuration2.HierarchicalConfiguration; import org.apache.commons.configuration2.tree.ImmutableNode; @@ -35,11 +37,18 @@ import org.apache.james.lifecycle.api.Disposable; import org.apache.james.lifecycle.api.LifecycleUtil; import org.apache.james.mailetcontainer.api.MailProcessor; import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean; +import org.apache.james.mailrepository.api.MailRepository; +import org.apache.james.mailrepository.api.MailRepositoryPath; +import org.apache.james.mailrepository.api.MailRepositoryStore; +import org.apache.james.mailrepository.api.MailRepositoryUrl; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.metrics.api.TimeMetric; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueue.MailQueueItem; import org.apache.james.queue.api.MailQueueFactory; +import org.apache.mailet.Attribute; +import org.apache.mailet.AttributeName; +import org.apache.mailet.AttributeValue; import org.apache.mailet.Mail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 
+66,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class); public static final String SPOOL_PROCESSING = "spoolProcessing"; + public static final AttributeName MAIL_PROCESSING_ERROR_COUNT = AttributeName.of("mail-processing-error-count"); + public static final MailRepositoryPath ERROR_REPOSITORY_PATH = MailRepositoryPath.from("var/mail/error"); + public static final int MAXIMUM_FAILURE_COUNT = 5; /** * concurrency level to use for dequeuing mails from spool, allows to throttle resources dedicated to that async @@ -72,21 +84,31 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB * The mail processor */ private final MailProcessor mailProcessor; + private final MailRepositoryStore mailRepositoryStore; private final MailQueueFactory<?> queueFactory; + private MailRepositoryUrl errorRepositoryURL; + private MailRepository errorRepository; private reactor.core.Disposable disposable; private MailQueue queue; @Inject - public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) { + public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailRepositoryStore mailRepositoryStore, MailQueueFactory<?> queueFactory) { this.metricFactory = metricFactory; this.mailProcessor = mailProcessor; + this.mailRepositoryStore = mailRepositoryStore; this.queueFactory = queueFactory; } @Override public void configure(HierarchicalConfiguration<ImmutableNode> config) { concurrencyLevel = config.getInt("threads", 100); + errorRepositoryURL = Optional.ofNullable(config.getString("errorRepository", null)) + .map(MailRepositoryUrl::from) + .orElseGet(() -> MailRepositoryUrl.fromPathAndProtocol( + mailRepositoryStore.defaultProtocol() + .orElseThrow(() -> new IllegalStateException("Cannot retrieve mailRepository URL, you need to configure an `errorRepository` property 
for the spooler.0")), + ERROR_REPOSITORY_PATH)); } /** @@ -99,6 +121,12 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel)); disposable = run(queue); LOGGER.info("Spooler started"); + try { + this.errorRepository = mailRepositoryStore.select(errorRepositoryURL); + } catch (MailRepositoryStore.MailRepositoryStoreException e) { + throw new RuntimeException(e); + } + } private reactor.core.Disposable run(MailQueue queue) { @@ -135,16 +163,56 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB } queueItem.done(true); } catch (Exception e) { - try { - queueItem.done(false); - } catch (MailQueue.MailQueueException ex) { - throw new RuntimeException(e); - } + handleError(queueItem, mail, e); } finally { LOGGER.debug("==== End processing mail {} ====", mail.getName()); } } + private void handleError(MailQueueItem queueItem, Mail mail, Exception processingException) { + int failureCount = computeFailureCount(mail); + + try { + if (failureCount > MAXIMUM_FAILURE_COUNT) { + LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}", mail.getName(), failureCount, errorRepositoryURL.asString()); + storeInErrorRepository(queueItem); + } else { + LOGGER.error("Failed {} processing {} consecutive times. 
Mail is requeued with increased failure count.", mail.getName(), failureCount, processingException); + reEnqueue(queueItem, failureCount); + } + } catch (Exception nestedE) { + LOGGER.error("Could not apply standard error handling for {}, defaulting to nack", mail.getName(), nestedE); + nack(queueItem, processingException); + } + } + + private int computeFailureCount(Mail mail) { + Integer previousFailureCount = mail.getAttribute(MAIL_PROCESSING_ERROR_COUNT) + .flatMap(attribute -> attribute.getValue().valueAs(Integer.class)) + .orElse(0); + return previousFailureCount + 1; + } + + private void reEnqueue(MailQueueItem queueItem, int failureCount) throws MailQueue.MailQueueException { + Mail mail = queueItem.getMail(); + mail.setAttribute(new Attribute(MAIL_PROCESSING_ERROR_COUNT, AttributeValue.of(failureCount))); + queue.enQueue(mail); + queueItem.done(true); + } + + private void storeInErrorRepository(MailQueueItem queueItem) throws MessagingException { + errorRepository.store(queueItem.getMail()); + queueItem.done(true); + } + + private void nack(MailQueueItem queueItem, Exception processingException) { + try { + queueItem.done(false); + } catch (MailQueue.MailQueueException ex) { + throw new RuntimeException(processingException); + } + } + /** * The dispose operation is called at the end of a components lifecycle. 
* Instances of this class use this method to release and destroy any diff --git a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java b/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java deleted file mode 100644 index 9485686..0000000 --- a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.mailetcontainer.impl; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; -import static org.awaitility.Duration.TEN_SECONDS; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.Consumer; - -import javax.mail.MessagingException; - -import org.apache.commons.configuration2.plist.PropertyListConfiguration; -import org.apache.james.mailetcontainer.api.MailProcessor; -import org.apache.james.metrics.api.MetricFactory; -import org.apache.james.metrics.api.TimeMetric; -import org.apache.james.queue.api.MailQueue; -import org.apache.james.queue.api.MailQueueFactory; -import org.apache.mailet.Mail; -import org.apache.mailet.base.test.FakeMail; -import org.awaitility.Awaitility; -import org.awaitility.core.ConditionFactory; -import org.junit.jupiter.api.Test; - -import reactor.core.publisher.UnicastProcessor; - -class JamesMailSpoolerTest { - private static final ConditionFactory CALMLY_AWAIT = Awaitility - .with().pollInterval(ONE_HUNDRED_MILLISECONDS) - .and().pollDelay(ONE_HUNDRED_MILLISECONDS) - .await() - .atMost(TEN_SECONDS); - - @Test - void thrownExceptionShouldAckTheItem() throws MessagingException { - MetricFactory metricFactory = mock(MetricFactory.class); - when(metricFactory.timer(JamesMailSpooler.SPOOL_PROCESSING)).thenAnswer(ignored -> mock(TimeMetric.class)); - MailQueueFactory<?> queueFactory = mock(MailQueueFactory.class); - MailProcessor mailProcessor = mock(MailProcessor.class); - JamesMailSpooler spooler = new JamesMailSpooler(metricFactory, mailProcessor, queueFactory); - - 
UnicastProcessor<MockedMailQueueItem> workQueue = UnicastProcessor.create(); - MockedMailQueueItem item = new MockedMailQueueItem(); - item.addCallback(isDone -> { - if (!isDone) { - workQueue.onNext(item); - } - }); - MailQueue queue = mock(MailQueue.class); - workQueue.onNext(item); - when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone)); - when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue); - - doThrow(new RuntimeException("Arbitrary failure")) - .doNothing() - .when(mailProcessor).service(any()); - - PropertyListConfiguration configuration = new PropertyListConfiguration(); - configuration.addProperty("threads", 2); - spooler.configure(configuration); - spooler.init(); - - CALMLY_AWAIT.until(() -> item.getDones().size() == 2); - - assertThat(item.getDones()).containsExactly(false, true); - } - - @Test - void threadSuicideShouldAckTheItem() throws MessagingException { - MetricFactory metricFactory = mock(MetricFactory.class); - when(metricFactory.timer(JamesMailSpooler.SPOOL_PROCESSING)).thenAnswer(ignored -> mock(TimeMetric.class)); - MailQueueFactory<?> queueFactory = mock(MailQueueFactory.class); - MailProcessor mailProcessor = mock(MailProcessor.class); - JamesMailSpooler spooler = new JamesMailSpooler(metricFactory, mailProcessor, queueFactory); - - UnicastProcessor<MockedMailQueueItem> workQueue = UnicastProcessor.create(); - MockedMailQueueItem item = new MockedMailQueueItem(); - item.addCallback(isDone -> { - if (!isDone) { - workQueue.onNext(item); - } - }); - MailQueue queue = mock(MailQueue.class); - workQueue.onNext(item); - when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone)); - when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue); - - doAnswer(ignored -> { - Thread.currentThread().interrupt(); - return null; - }).doNothing().when(mailProcessor).service(any()); - - 
PropertyListConfiguration configuration = new PropertyListConfiguration(); - configuration.addProperty("threads", 2); - spooler.configure(configuration); - spooler.init(); - - CALMLY_AWAIT.until(() -> item.getDones().size() == 2); - - assertThat(item.getDones()).containsExactly(false, true); - } - - private class MockedMailQueueItem implements MailQueue.MailQueueItem { - private final Collection<Boolean> dones; - private Consumer<Boolean> doneCallback; - - private MockedMailQueueItem() { - dones = new ArrayList<>(); - } - - @Override - public Mail getMail() { - try { - return FakeMail.defaultFakeMail(); - } catch (MessagingException e) { - throw new RuntimeException(e); - } - } - - @Override - public void done(boolean success) throws MailQueue.MailQueueException { - dones.add(success); - doneCallback.accept(success); - } - - public Collection<Boolean> getDones() { - return dones; - } - - public boolean isNotDone() { - return !dones.contains(true); - } - - public void addCallback(Consumer<Boolean> callback) { - doneCallback = callback; - } - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
