This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 97540d77a05b0e1629726243e43912b8dc6ac867 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Thu May 20 11:52:15 2021 +0700 JAMES-3588 JamesMailSpooler should defend itself against mutability in the face of failed processing The mail processing can alter the recipients, and the errored mail was never redelivered to some addresses, duplicated for others. Restoring original recipients allows for predictability here and prevents message loss. --- .../org/apache/james/mailets/MailetErrorsTest.java | 53 +++++++++++++++++++++- .../mailetcontainer/impl/JamesMailSpooler.java | 10 +++- .../org/apache/james/utils/SMTPMessageSender.java | 9 ++++ 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java index 15fd7ec..700d212 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java @@ -24,7 +24,10 @@ import static org.apache.james.mailets.configuration.CommonProcessors.ERROR_REPO import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN; import static org.apache.james.mailets.configuration.Constants.FROM; import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP; +import static org.apache.james.mailets.configuration.Constants.PASSWORD; +import static org.apache.james.mailets.configuration.Constants.RECIPIENT; import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; @@ -33,6 +36,7 @@ import org.apache.james.mailets.configuration.MailetConfiguration; import org.apache.james.mailets.configuration.MailetContainer; import org.apache.james.mailets.configuration.ProcessorConfiguration; import org.apache.james.mailrepository.api.MailRepositoryUrl; +import org.apache.james.modules.protocols.ImapGuiceProbe; import org.apache.james.modules.protocols.SmtpGuiceProbe; import org.apache.james.transport.mailets.ErrorMailet; import org.apache.james.transport.mailets.ErrorMatcher; @@ -41,6 +45,7 @@ import org.apache.james.transport.mailets.NoClassDefFoundErrorMatcher; import org.apache.james.transport.mailets.NoopMailet; import org.apache.james.transport.mailets.Null; import org.apache.james.transport.mailets.OneRuntimeErrorMailet; +import org.apache.james.transport.mailets.OneRuntimeExceptionMailet; import org.apache.james.transport.mailets.OneThreadSuicideMailet; import org.apache.james.transport.mailets.RuntimeErrorMailet; import org.apache.james.transport.mailets.RuntimeExceptionMailet; @@ -48,19 +53,26 @@ import org.apache.james.transport.mailets.RuntimeExceptionMatcher; import org.apache.james.transport.mailets.ToRepository; import org.apache.james.transport.matchers.All; import org.apache.james.transport.matchers.HasException; +import org.apache.james.transport.matchers.RecipientIs; +import org.apache.james.utils.DataProbeImpl; import org.apache.james.utils.MailRepositoryProbeImpl; import org.apache.james.utils.SMTPMessageSender; -import org.junit.jupiter.api.Test; +import org.apache.james.utils.TestIMAPClient; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; +import com.google.common.collect.ImmutableList; + class MailetErrorsTest { public static final String CUSTOM_PROCESSOR = "custom"; public static final MailRepositoryUrl CUSTOM_REPOSITORY = MailRepositoryUrl.from("memory://var/mail/custom/"); @RegisterExtension public SMTPMessageSender smtpMessageSender = new SMTPMessageSender(DEFAULT_DOMAIN); + @RegisterExtension + public TestIMAPClient testIMAPClient = new TestIMAPClient(); private TemporaryJamesServer jamesServer; @@ -132,6 +144,45 @@ class MailetErrorsTest { } @Test + void retryShouldSucceedUponSplittedMail(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(OneRuntimeErrorMailet.class) + .addProperty("onMailetException", "propagate")) + .addMailetsFrom(CommonProcessors.transport())) + .putProcessor(errorProcessor()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class).fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(5000); + + assertThat(testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(TestIMAPClient.INBOX) + .awaitMessage(awaitAtMostOneMinute) + .getMessageCount(TestIMAPClient.INBOX)).isEqualTo(1); + + assertThat(testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(FROM, PASSWORD) + .select(TestIMAPClient.INBOX) + .awaitMessage(awaitAtMostOneMinute) + .getMessageCount(TestIMAPClient.INBOX)).isGreaterThanOrEqualTo(1); + } + + @Test void mailetProcessorsShouldHandleRuntimeException(@TempDir File temporaryFolder) throws Exception { jamesServer = TemporaryJamesServer.builder() .withBase(SMTP_ONLY_MODULE) diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java index 4b7935f..87a4ac3 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java @@ -30,6 +30,7 @@ import javax.mail.MessagingException; import org.apache.commons.configuration2.HierarchicalConfiguration; import org.apache.commons.configuration2.tree.ImmutableNode; +import org.apache.james.core.MailAddress; import org.apache.james.lifecycle.api.Configurable; import org.apache.james.lifecycle.api.Disposable; import org.apache.james.lifecycle.api.LifecycleUtil; @@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -110,6 +112,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private void performProcessMail(MailQueueItem queueItem, Mail mail) { LOGGER.debug("==== Begin processing mail {} ====", mail.getName()); + ImmutableList<MailAddress> originalRecipients = ImmutableList.copyOf(mail.getRecipients()); try { mailProcessor.service(mail); @@ -118,15 +121,18 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB } queueItem.done(true); } catch (Exception e) { - handleError(queueItem, mail, e); + handleError(queueItem, mail, originalRecipients, e); } finally { LOGGER.debug("==== End processing mail {} ====", mail.getName()); } } - private void handleError(MailQueueItem queueItem, Mail mail, Exception processingException) { + private void handleError(MailQueueItem queueItem, Mail mail, ImmutableList<MailAddress> originalRecipients, Exception processingException) { int failureCount = computeFailureCount(mail); + // Restore original recipients + queueItem.getMail().setRecipients(originalRecipients); + try { if (failureCount > MAXIMUM_FAILURE_COUNT) { LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}", mail.getName(), failureCount, configuration.getErrorRepositoryURL().asString()); diff --git a/server/testing/src/main/java/org/apache/james/utils/SMTPMessageSender.java b/server/testing/src/main/java/org/apache/james/utils/SMTPMessageSender.java index d17c2c1..05e80e1 100644 --- a/server/testing/src/main/java/org/apache/james/utils/SMTPMessageSender.java +++ b/server/testing/src/main/java/org/apache/james/utils/SMTPMessageSender.java @@ -102,6 +102,15 @@ public class SMTPMessageSender extends ExternalResource implements Closeable, Af return sendMessageWithHeaders(from, ImmutableList.of(recipient), message); } + public SMTPMessageSender sendMessage(String from, List<String> recipients) throws IOException { + String message = "FROM: " + from + "\r\n" + + "subject: test\r\n" + + "\r\n" + + "content\r\n" + + ".\r\n"; + return sendMessageWithHeaders(from, recipients, message); + } + public SMTPMessageSender sendMessageNoBracket(String from, String recipient) throws IOException { doHelo(); doSetSender(from); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org