This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit fd3294e9f992048355a531dc0a815b8fc88adbc1 Author: Benoit Tellier <[email protected]> AuthorDate: Sat May 22 00:04:06 2021 +0700 JAMES-3589 Reasonable tests regarding mailet container execution We expect: - A mailet to be executed once for each recipient upon partial match and not several times - A terminating mailet should not abort processing of previously split mails - Mail modifications upon partial match should be retained - After a partial match, downstream mailets should not be executed twice for a given recipient The tests pass by replacing Camel logic with simple Java code: ``` public class CamelMailetProcessor extends AbstractStateMailetProcessor implements CamelContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(CamelMailetProcessor.class); private CamelContext context; private ProducerTemplate producerTemplate; private final MetricFactory metricFactory; private List<MatcherMailetPair> pairs; public CamelMailetProcessor(MetricFactory metricFactory) { this.metricFactory = metricFactory; } @Override public void service(Mail mail) throws MessagingException { pairs.stream() .reduce(ImmutableList.of(mail), (mails, pair) -> { if (mails.size() > 0) { return executePair(mails, pair); } return ImmutableList.of(); }, (a, b) -> { throw new NotImplementedException("Fold left implementation. 
Should never be called"); }); } private ImmutableList<Mail> executePair(ImmutableList<Mail> mails, MatcherMailetPair pair) { MatcherSplitter matcherSplitter = new MatcherSplitter(metricFactory, this, pair); ImmutableList<Mail> afterMatching = mails.stream() .flatMap(Throwing.<Mail, Stream<Mail>>function(m -> matcherSplitter.split(m).stream()).sneakyThrow()) .collect(Guavate.toImmutableList()); afterMatching .stream().filter(mail -> mail.removeAttribute(MATCHER_MATCHED_ATTRIBUTE).isPresent()) .forEach(Throwing.<Mail>consumer(m -> new CamelProcessor(metricFactory, this, pair.getMailet()) .processMail(m)).sneakyThrow()); afterMatching.stream() .filter(mail -> !mail.getState().equals(getState())) .filter(mail -> !mail.getState().equals(Mail.GHOST)) .forEach(Throwing.consumer(this::toProcessor).sneakyThrow()); return afterMatching.stream() .filter(mail -> mail.getState().equals(getState())) .collect(Guavate.toImmutableList()); } // Few boiler plate methods } ``` --- .../apache/james/mailets/flow/AddRecipient.java | 40 + .../james/mailets/flow/ClearRecipientsMailet.java | 32 + .../mailets/flow/CollectMailAttributeMailet.java | 48 + .../mailets/flow/CollectingExecutionMailet.java | 44 + .../mailets/flow/CollectingExecutionMailetBis.java | 44 + .../mailets/flow/CountingExecutionMailet.java | 42 + .../mailets/flow/CountingExecutionMailetBis.java | 42 + .../flow/CountingExecutionTerminatingMailet.java | 45 + .../james/mailets/flow/EnsureNotDisposed.java | 51 + .../james/mailets/flow/ExecutionFlowTest.java | 1077 ++++++++++++++++++++ .../flow/FirstRecipientCountingExecutions.java | 49 + .../java/org/apache/james/mailets/flow/None.java | 34 + .../apache/james/mailets/flow/NoneWithNull.java | 33 + 13 files changed, 1581 insertions(+) diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/AddRecipient.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/AddRecipient.java new file mode 100644 index 
0000000..d0acc4f --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/AddRecipient.java @@ -0,0 +1,40 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import static org.apache.james.mailets.configuration.Constants.RECIPIENT2; + +import javax.mail.internet.AddressException; + +import org.apache.james.core.MailAddress; +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; + +import com.google.common.collect.ImmutableList; + +public class AddRecipient extends GenericMailet { + @Override + public void service(Mail mail) throws AddressException { + mail.setRecipients(ImmutableList.<MailAddress>builder() + .add(new MailAddress(RECIPIENT2)) + .addAll(mail.getRecipients()) + .build()); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ClearRecipientsMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ClearRecipientsMailet.java new file mode 100644 index 0000000..100751e --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ClearRecipientsMailet.java @@ -0,0 +1,32 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. 
See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; + +import com.google.common.collect.ImmutableList; + +public class ClearRecipientsMailet extends GenericMailet { + @Override + public void service(Mail mail) { + mail.setRecipients(ImmutableList.of()); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CollectMailAttributeMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CollectMailAttributeMailet.java new file mode 100644 index 0000000..66ecb79 --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CollectMailAttributeMailet.java @@ -0,0 +1,48 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; + +import org.apache.mailet.AttributeName; +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; + +public class CollectMailAttributeMailet extends GenericMailet { + public static final String MY_ATTRIBUTE = "myAttribute"; + private static final ConcurrentLinkedDeque<String> encounteredAttributes = new ConcurrentLinkedDeque<>(); + + public static void reset() { + encounteredAttributes.clear(); + } + + public static List<String> encounteredAttributes() { + return ImmutableList.copyOf(encounteredAttributes); + } + + @Override + public void service(Mail mail) { + mail.getAttribute(AttributeName.of(MY_ATTRIBUTE)) + .ifPresent(attribute -> encounteredAttributes.add((String) attribute.getValue() + .value())); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CollectingExecutionMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CollectingExecutionMailet.java new file mode 100644 index 0000000..eea9d07 --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CollectingExecutionMailet.java @@ -0,0 +1,44 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; + +import org.apache.james.core.MailAddress; +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; + +public class CollectingExecutionMailet extends GenericMailet { + private static final ConcurrentLinkedDeque<MailAddress> executedFor = new ConcurrentLinkedDeque<>(); + + public static void reset() { + executedFor.clear(); + } + + public static List<MailAddress> executionFor() { + return org.testcontainers.shaded.com.google.common.collect.ImmutableList.copyOf(executedFor); + } + + @Override + public void service(Mail mail) { + executedFor.addAll(mail.getRecipients()); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CollectingExecutionMailetBis.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CollectingExecutionMailetBis.java new file mode 100644 index 0000000..3b1e158 --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CollectingExecutionMailetBis.java @@ -0,0 +1,44 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; + +import org.apache.james.core.MailAddress; +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; + +public class CollectingExecutionMailetBis extends GenericMailet { + private static final ConcurrentLinkedDeque<MailAddress> executedFor = new ConcurrentLinkedDeque<>(); + + public static void reset() { + executedFor.clear(); + } + + public static List<MailAddress> executionFor() { + return org.testcontainers.shaded.com.google.common.collect.ImmutableList.copyOf(executedFor); + } + + @Override + public void service(Mail mail) { + executedFor.addAll(mail.getRecipients()); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CountingExecutionMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CountingExecutionMailet.java new file mode 100644 index 0000000..955593d --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CountingExecutionMailet.java @@ -0,0 +1,42 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license 
agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; + +public class CountingExecutionMailet extends GenericMailet { + private static final AtomicLong executionCount = new AtomicLong(); + + public static void reset() { + executionCount.set(0L); + } + + public static long executionCount() { + return executionCount.get(); + } + + @Override + public void service(Mail mail) { + executionCount.incrementAndGet(); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CountingExecutionMailetBis.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CountingExecutionMailetBis.java new file mode 100644 index 0000000..1543c93 --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CountingExecutionMailetBis.java @@ -0,0 +1,42 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. 
See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; + +public class CountingExecutionMailetBis extends GenericMailet { + private static final AtomicLong executionCount = new AtomicLong(); + + public static void reset() { + executionCount.set(0L); + } + + public static long executionCount() { + return executionCount.get(); + } + + @Override + public void service(Mail mail) { + executionCount.incrementAndGet(); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CountingExecutionTerminatingMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CountingExecutionTerminatingMailet.java new file mode 100644 index 0000000..e1d4ea1 --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/CountingExecutionTerminatingMailet.java @@ -0,0 +1,45 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. 
See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import static org.apache.mailet.Mail.GHOST; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; + +public class CountingExecutionTerminatingMailet extends GenericMailet { + private static final AtomicLong executionCount = new AtomicLong(); + + public static void reset() { + executionCount.set(0L); + } + + public static long executionCount() { + return executionCount.get(); + } + + @Override + public void service(Mail mail) { + executionCount.incrementAndGet(); + mail.setState(GHOST); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/EnsureNotDisposed.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/EnsureNotDisposed.java new file mode 100644 index 0000000..8a395f9 --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/EnsureNotDisposed.java @@ -0,0 +1,51 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. 
See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.atomic.AtomicLong; + +import javax.mail.MessagingException; + +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; + +public class EnsureNotDisposed extends GenericMailet { + private static final AtomicLong executionCount = new AtomicLong(); + + public static void reset() { + executionCount.set(0L); + } + + public static long executionCount() { + return executionCount.get(); + } + + @Override + public void service(Mail mail) throws MessagingException { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertThat(mail.getMessage()).isNotNull(); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java new file mode 100644 index 0000000..55b2fbe --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java @@ -0,0 +1,1077 @@ 
+/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import static org.apache.james.MemoryJamesServerMain.SMTP_AND_IMAP_MODULE; +import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN; +import static org.apache.james.mailets.configuration.Constants.FROM; +import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP; +import static org.apache.james.mailets.configuration.Constants.PASSWORD; +import static org.apache.james.mailets.configuration.Constants.RECIPIENT; +import static org.apache.james.mailets.configuration.Constants.RECIPIENT2; +import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute; +import static org.apache.james.utils.TestIMAPClient.INBOX; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; + +import org.apache.james.core.MailAddress; +import org.apache.james.mailets.TemporaryJamesServer; +import org.apache.james.mailets.configuration.CommonProcessors; +import 
org.apache.james.mailets.configuration.MailetConfiguration; +import org.apache.james.mailets.configuration.MailetContainer; +import org.apache.james.mailets.configuration.ProcessorConfiguration; +import org.apache.james.modules.protocols.ImapGuiceProbe; +import org.apache.james.modules.protocols.SmtpGuiceProbe; +import org.apache.james.transport.mailets.NoopMailet; +import org.apache.james.transport.mailets.Null; +import org.apache.james.transport.mailets.PostmasterAlias; +import org.apache.james.transport.mailets.SetMailAttribute; +import org.apache.james.transport.mailets.ToProcessor; +import org.apache.james.transport.matchers.All; +import org.apache.james.transport.matchers.RecipientIs; +import org.apache.james.transport.matchers.RelayLimit; +import org.apache.james.utils.DataProbeImpl; +import org.apache.james.utils.SMTPMessageSender; +import org.apache.james.utils.SpoolerProbe; +import org.apache.james.utils.TestIMAPClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import com.google.common.collect.ImmutableList; + +public class ExecutionFlowTest { + @RegisterExtension + public SMTPMessageSender smtpMessageSender = new SMTPMessageSender(DEFAULT_DOMAIN); + @RegisterExtension + public TestIMAPClient testIMAPClient = new TestIMAPClient(); + + private TemporaryJamesServer jamesServer; + + @BeforeEach + public void test() { + CountingExecutionMailet.reset(); + CountingExecutionMailetBis.reset(); + CountingExecutionTerminatingMailet.reset(); + CollectingExecutionMailet.reset(); + CollectingExecutionMailetBis.reset(); + CollectMailAttributeMailet.reset(); + FirstRecipientCountingExecutions.reset(); + } + + @AfterEach + public void tearDown() { + if (jamesServer != null) { + jamesServer.shutdown(); + } + } + + @Test + public void partialMatchShouldLeadToSingleExecutionOfMailet(@TempDir File 
temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(1); + } + + @Test + public void partialMatchShouldLeadToSingleExecutionOfMatcher(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(FirstRecipientCountingExecutions.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + 
.addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(FirstRecipientCountingExecutions.executionCount()).isEqualTo(1); + } + + @Test + public void partialMatchShouldLeadToSingleExecutionOfUpstreamMailet(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(NoopMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(1); + } + + @Test + public void 
partialMatchShouldLeadToSingleExecutionOfUpstreamRootMailets(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(NoopMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor( ProcessorConfiguration.root() + .enableJmx(false) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(PostmasterAlias.class)) + .addMailet(MailetConfiguration.builder() + .matcher(RelayLimit.class) + .matcherCondition("30") + .mailet(Null.class)) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.TO_TRANSPORT))) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(1); + } + + @Test + public void mutationsOfDownstreamMailetsShouldNotAffectUpStreamMailets(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + 
.addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectMailAttributeMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(SetMailAttribute.class) + .addProperty(CollectMailAttributeMailet.MY_ATTRIBUTE, "value1") + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CollectMailAttributeMailet.encounteredAttributes()).isEmpty(); + } + + @Test + public void mutationsOfDownstreamMailetsShouldNotAffectUpStreamMailetsUponSplit(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectMailAttributeMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(SetMailAttribute.class) + .addProperty(CollectMailAttributeMailet.MY_ATTRIBUTE, "value1") + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(NoopMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + 
.putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CollectMailAttributeMailet.encounteredAttributes()).isEmpty(); + } + + @Test + public void totalMatchShouldNotSplitMail(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + 
.awaitMessage(awaitAtMostOneMinute); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(1); + assertThat(CollectingExecutionMailet.executionFor()) + .hasSize(2) + .containsOnly(new MailAddress(FROM), new MailAddress(RECIPIENT)); + } + + @Test + public void noMatchShouldNotExecuteMailet(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(None.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(0); + } + + @Test + public void noMatchWithNullShouldNotExecuteMailet(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(NoneWithNull.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + 
.putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(0); + } + + @Test + public void noMatchShouldNotSplitMailet(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(None.class) + .mailet(NoopMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, 
jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(1); + assertThat(CollectingExecutionMailet.executionFor()) + .hasSize(2) + .containsOnly(new MailAddress(FROM), new MailAddress(RECIPIENT)); + } + + @Test + public void noMatchWithNullShouldNotSplitMailet(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(NoneWithNull.class) + .mailet(NoopMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(1); + assertThat(CollectingExecutionMailet.executionFor()) + .hasSize(2) + .containsOnly(new MailAddress(FROM), new MailAddress(RECIPIENT)); + } + + @Test + public void 
nullMailetShouldAbortProcessing(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(Null.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(0); + } + + @Test + public void nullMailetShouldAbortProcessingOnlOfMatchedEmails(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(Null.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + 
.addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(1); + assertThat(CollectingExecutionMailet.executionFor()) + .hasSize(1) + .containsOnly(new MailAddress(FROM)); + } + + @Test + public void clearRecipientsMailetShouldAbortProcessing(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(ClearRecipientsMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + 
smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(0); + } + + @Test + public void clearRecipientsMailetShouldAbortProcessingOnlOfMatchedEmails(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(ClearRecipientsMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + 
jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(1); + assertThat(CollectingExecutionMailet.executionFor()) + .hasSize(1) + .containsOnly(new MailAddress(FROM)); + } + + @Test + public void mailetCanEditRecipients(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(AddRecipient.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CollectingExecutionMailet.executionFor()) + .hasSize(3) + .containsOnly(new MailAddress(FROM), new MailAddress(RECIPIENT), new MailAddress(RECIPIENT2)); + } + + @Test + public void toProcessorShouldSwitchExecutingProcessor(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + 
.withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(ToProcessor.class) + .addProperty("processor", "custom") + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(ProcessorConfiguration.builder() + .state("custom") + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailetBis.class) + .build())) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CountingExecutionMailet.executionCount()).isEqualTo(0); + assertThat(CountingExecutionMailetBis.executionCount()).isEqualTo(1); + } + + @Test + public void toProcessorShouldSupportPartialMatches(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(ToProcessor.class) + 
.addProperty("processor", "custom") + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(ProcessorConfiguration.builder() + .state("custom") + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailetBis.class) + .build())) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CollectingExecutionMailet.executionFor()) + .hasSize(1) + .containsOnly(new MailAddress(FROM)); + assertThat(CollectingExecutionMailetBis.executionFor()) + .hasSize(1) + .containsOnly(new MailAddress(RECIPIENT)); + } + + @Test + public void toProcessorSplitShouldNotDisposeContent(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(ToProcessor.class) + .addProperty("processor", "custom") + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(EnsureNotDisposed.class) + .build()) 
+ .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(ProcessorConfiguration.builder() + .state("custom") + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(EnsureNotDisposed.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailetBis.class) + .build())) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CollectingExecutionMailet.executionFor()) + .hasSize(1) + .containsOnly(new MailAddress(FROM)); + assertThat(CollectingExecutionMailetBis.executionFor()) + .hasSize(1) + .containsOnly(new MailAddress(RECIPIENT)); + } + + @Test + public void toProcessorShouldSendToErrorWhenNotFound(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(ToProcessor.class) + .addProperty("processor", "custom") + .build()) + 
.addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(ProcessorConfiguration.error() + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class))) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CountingExecutionMailet.executionCount()) + .isEqualTo(1); + } + + @Test + public void splitShouldNotDisposeContent(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(NoopMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(EnsureNotDisposed.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + 
smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CountingExecutionMailet.executionCount()) + .isEqualTo(2); + } + + @Test + public void partialMatchShouldLeadToExecutionOfDownStreamMailetsForEachSplitedMails(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(NoopMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CountingExecutionTerminatingMailet.class) + .build())) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CountingExecutionTerminatingMailet.executionCount()).isEqualTo(2); + } + + @Test + public void 
emailModificationsShouldBePreservedOnPartialMatch(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + .putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .matcherCondition(RECIPIENT) + .mailet(SetMailAttribute.class) + .addProperty(CollectMailAttributeMailet.MY_ATTRIBUTE, "value1") + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(SetMailAttribute.class) + .addProperty(CollectMailAttributeMailet.MY_ATTRIBUTE, "value2") + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectMailAttributeMailet.class) + .build())) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session + awaitAtMostOneMinute.untilAsserted(() -> assertThat( + jamesServer.getProbe(SpoolerProbe.class).processingFinished()) + .isTrue()); + assertThat(CollectMailAttributeMailet.encounteredAttributes()) + .hasSize(2) + .containsOnly("value1", "value2"); + } + + @Test + public void matcherSplitShouldNotDuplicateRecipients(@TempDir File temporaryFolder) throws Exception { + jamesServer = TemporaryJamesServer.builder() + .withBase(SMTP_AND_IMAP_MODULE) + .withMailetContainer(MailetContainer.builder() + 
.putProcessor(ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.BCC_STRIPPER) + .addMailet(MailetConfiguration.builder() + .matcher(RecipientIs.class) + .matcherCondition(RECIPIENT) + .mailet(NoopMailet.class) + .build()) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(CollectingExecutionMailet.class) + .build()) + .addMailet(MailetConfiguration.LOCAL_DELIVERY)) + .putProcessor(CommonProcessors.error()) + .putProcessor(CommonProcessors.root())) + .build(temporaryFolder); + jamesServer.start(); + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(FROM, PASSWORD) + .addUser(RECIPIENT, PASSWORD); + + smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(FROM, PASSWORD) + .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(RECIPIENT, PASSWORD) + .select(INBOX) + .awaitMessage(awaitAtMostOneMinute); + assertThat(CollectingExecutionMailet.executionFor()) + .hasSize(2) + .containsOnly(new MailAddress(FROM), new MailAddress(RECIPIENT)); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/FirstRecipientCountingExecutions.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/FirstRecipientCountingExecutions.java new file mode 100644 index 0000000..8247528 --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/FirstRecipientCountingExecutions.java @@ -0,0 +1,49 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.james.core.MailAddress; +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMatcher; + +public class FirstRecipientCountingExecutions extends GenericMatcher { /* Test matcher: matches only the first recipient of a mail and counts how often match() is invoked. */ + private static final AtomicLong executionCount = new AtomicLong(); /* static, hence shared by every instance of this matcher; reset() zeroes it */ + + public static void reset() { + executionCount.set(0L); + } + + public static long executionCount() { + return executionCount.get(); + } + + @Override + public Collection<MailAddress> match(Mail mail) { + executionCount.incrementAndGet(); + return mail.getRecipients().stream() + .findFirst() + .map(Collections::singletonList) /* JDK collections instead of the Guava copy shaded inside testcontainers, which is not a public API */ + .orElse(Collections.<MailAddress>emptyList()); + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/None.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/None.java new file mode 100644 index 0000000..ade967b --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/None.java @@ -0,0 +1,34 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation
(ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.james.core.MailAddress; +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMatcher; + +public class None extends GenericMatcher { /* Test matcher that never matches: match() always returns an empty recipient collection. */ + @Override + public Collection<MailAddress> match(Mail mail) { + return Collections.emptyList(); /* JDK empty list instead of the Guava copy shaded inside testcontainers, which is not a public API */ + } +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/NoneWithNull.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/NoneWithNull.java new file mode 100644 index 0000000..98b9baa --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/NoneWithNull.java @@ -0,0 +1,33 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership.
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailets.flow; + +import java.util.Collection; + +import org.apache.james.core.MailAddress; +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMatcher; + +public class NoneWithNull extends GenericMatcher { /* Test matcher that never matches and signals it by returning null rather than an empty collection. */ + @Override + public Collection<MailAddress> match(Mail mail) { + return null; /* null on purpose: the noMatchWithNull* tests of ExecutionFlowTest configure this matcher and assert that the paired mailet is not executed and the mail is not split */ + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
