This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f230fb434f4c70d38497b1ec82cc3c870e8eab5c Author: Tung Van TRAN <[email protected]> AuthorDate: Fri Feb 11 12:41:16 2022 +0700 JAMES-3711 Implement a Requeue mailet --- .../apache/james/transport/mailets/Requeue.java | 94 ++++++++ .../james/transport/mailets/RequeueTest.java | 257 +++++++++++++++++++++ 2 files changed, 351 insertions(+) diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java new file mode 100644 index 0000000..59525a1 --- /dev/null +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java @@ -0,0 +1,94 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.transport.mailets; + +import static org.apache.mailet.Mail.GHOST; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Optional; + +import javax.inject.Inject; +import javax.mail.MessagingException; + +import org.apache.james.lifecycle.api.LifecycleUtil; +import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.queue.api.MailQueueName; +import org.apache.james.util.DurationParser; +import org.apache.mailet.Mail; +import org.apache.mailet.base.GenericMailet; + +import com.github.fge.lambdas.Throwing; +import com.google.common.base.Preconditions; + +public class Requeue extends GenericMailet { + private final MailQueueFactory<?> mailQueueFactory; + + private MailQueue mailQueue; + private Optional<Duration> delayDuration; + private String processor; + private boolean consume; + + @Inject + public Requeue(MailQueueFactory<?> mailQueueFactory) { + this.mailQueueFactory = mailQueueFactory; + } + + @Override + public void init() throws MessagingException { + MailQueueName mailQueueName = Optional.ofNullable(getInitParameter("queue")) + .map(MailQueueName::of).orElse(MailQueueFactory.SPOOL); + + mailQueue = mailQueueFactory.createQueue(mailQueueName); + delayDuration = Optional.ofNullable(getInitParameter("delay")) + .map(delayValue -> DurationParser.parse(delayValue, ChronoUnit.SECONDS)); + processor = Optional.ofNullable(getInitParameter("processor")) + .orElse(Mail.DEFAULT); + + consume = getInitParameter("consume", true); + + Preconditions.checkArgument(delayDuration.isEmpty() || !delayDuration.get().isNegative(), + "Duration should be non-negative"); + } + + @Override + public void service(Mail mail) throws MessagingException { + if (consume) { + enqueue(mail); + mail.setState(GHOST); + } else { + Mail newMail = null; + try { + newMail = mail.duplicate(); + enqueue(newMail); + } finally { + LifecycleUtil.dispose(newMail); + } + } + } + + private void enqueue(Mail mail) { + mail.setState(processor); + delayDuration.ifPresentOrElse( + Throwing.consumer(delay -> mailQueue.enQueue(mail, delay)), + Throwing.runnable(() -> mailQueue.enQueue(mail)).sneakyThrow()); + } +} diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/RequeueTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/RequeueTest.java new file mode 100644 index 0000000..84cef00 --- /dev/null +++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/RequeueTest.java @@ -0,0 +1,257 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.transport.mailets; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +import javax.mail.MessagingException; + +import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.queue.api.MailQueueName; +import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; +import org.apache.james.queue.memory.MemoryMailQueueFactory; +import org.apache.mailet.Mail; +import org.apache.mailet.MailetConfig; +import org.apache.mailet.base.test.FakeMail; +import org.apache.mailet.base.test.FakeMailetConfig; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import reactor.core.publisher.Flux; + +public class RequeueTest { + private Mail mailSample; + private MailQueueFactory<?> mailQueueFactory; + private MailQueue spoolQueue; + + @BeforeEach + void beforeEach() throws MessagingException { + mailSample = FakeMail.builder() + .name("mail1") + .sender("[email protected]") + .recipients("[email protected]") + .state("newState") + .build(); + + mailQueueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory()); + spoolQueue = mailQueueFactory.createQueue(MailQueueFactory.SPOOL); + } + + @AfterEach + void afterEach() throws IOException { + spoolQueue.close(); + } + + private Requeue testee(MailetConfig mailetConfig) throws MessagingException { + Requeue requeue = new Requeue(mailQueueFactory); + requeue.init(mailetConfig); + return requeue; + } + + @Test + void mailetShouldRequeueMail() throws MessagingException { + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .build()); + mailet.service(mailSample); + + assertThat(mailSample.getState()).isEqualTo("ghost"); + + assertThat(Flux.from(spoolQueue.deQueue()) + .next() + .block()) + .isNotNull(); + } + + @Test + void requeueShouldShouldAppliedProcessorWhenConfigureIsProvided() throws MessagingException { + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .setProperty("processor", "processor2") + .build()); + + mailet.service(mailSample); + + MailQueue.MailQueueItem mailQueueItem = Flux.from(spoolQueue.deQueue()) + .next() + .block(); + assertThat(mailQueueItem).isNotNull(); + assertThat(mailQueueItem.getMail().getState()).isEqualTo("processor2"); + } + + @Test + void requeueShouldShouldAppliedDefaultProcessorWhenConfigureIsNotProvided() throws MessagingException { + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .build()); + + mailet.service(mailSample); + + MailQueue.MailQueueItem mailQueueItem = Flux.from(spoolQueue.deQueue()) + .next() + .block(); + assertThat(mailQueueItem).isNotNull(); + assertThat(mailQueueItem.getMail().getState()).isEqualTo(Mail.DEFAULT); + } + + @Test + void requeueShouldShouldNotConsumeWhenConfigureIsProvided() throws MessagingException { + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .setProperty("consume", "false") + .build()); + + mailet.service(mailSample); + + assertThat(mailSample.getState()).isEqualTo("newState"); + } + + @Test + void requeueShouldShouldConsumeProcessorByDefault() throws MessagingException { + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .build()); + + mailet.service(mailSample); + + assertThat(mailSample.getState()).isEqualTo("ghost"); + } + + @Test + void requeueShouldShouldAppliedDelayWhenConfigureIsProvided() throws MessagingException { + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .setProperty("delay", "2s") + .build()); + + Instant enqueueTime = Instant.now(); + mailet.service(mailSample); + + AtomicReference<Instant> dequeueTime = new AtomicReference<>(); + MailQueue.MailQueueItem mailQueueItem = Flux.from(spoolQueue.deQueue()) + .next() + .doOnNext(any -> dequeueTime.set(Instant.now())) + .block(); + + assertThat(mailQueueItem).isNotNull(); + assertThat(Duration.between(enqueueTime, dequeueTime.get()).abs().toSeconds()).isGreaterThanOrEqualTo(2); + } + + @Test + void requeueShouldShouldNotAppliedDelayWhenConfigureIsNotProvided() throws MessagingException { + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .build()); + + Instant enqueueTime = Instant.now(); + mailet.service(mailSample); + + AtomicReference<Instant> dequeueTime = new AtomicReference<>(); + MailQueue.MailQueueItem mailQueueItem = Flux.from(spoolQueue.deQueue()) + .next() + .doOnNext(any -> dequeueTime.set(Instant.now())) + .block(); + + assertThat(mailQueueItem).isNotNull(); + assertThat(Duration.between(enqueueTime, dequeueTime.get()).abs().toSeconds()).isEqualTo(0); + } + + @Test + void requeueShouldUseDefaultQueueWhenConfigIsNotProvided() throws MessagingException { + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .build()); + + mailet.service(mailSample); + + assertThat(Flux.from(spoolQueue.deQueue()) + .next() + .block()) + .isNotNull(); + } + + @Test + void requeueShouldUseQueueWhenConfigIsProvided() throws MessagingException { + MailQueueName newQueueName = MailQueueName.of("newQueue"); + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .setProperty("queue", "newQueue") + .build()); + + mailet.service(mailSample); + + assertThat(Flux.from(mailQueueFactory.createQueue(newQueueName).deQueue()) + .map(MailQueue.MailQueueItem::getMail) + .blockFirst()) + .isNotNull(); + } + + @Nested + class Configuration { + + @Test + void shouldFailWhenBadDelay() { + Assertions.assertThatThrownBy(() -> testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .setProperty("delay", "bad") + .build())) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void shouldFailWhenNegativeDuration() { + Assertions.assertThatThrownBy(() -> testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .setProperty("delay", "-3s") + .build())) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void durationWithNoUnitShouldDefaultToSeconds() throws MessagingException { + Requeue mailet = testee(FakeMailetConfig.builder() + .mailetName("Requeue") + .setProperty("delay", "2") + .build()); + + Instant enqueueTime = Instant.now(); + mailet.service(mailSample); + + AtomicReference<Instant> dequeueTime = new AtomicReference<>(); + + MailQueue.MailQueueItem mailQueueItem = Flux.from(spoolQueue.deQueue()) + .next() + .doOnNext(any -> dequeueTime.set(Instant.now())) + .block(); + + assertThat(mailQueueItem).isNotNull(); + assertThat(Duration.between(enqueueTime, dequeueTime.get()).abs().toSeconds()).isGreaterThanOrEqualTo(2); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
