chibenwa commented on a change in pull request #880:
URL: https://github.com/apache/james-project/pull/880#discussion_r803272182



##########
File path: 
server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RequeueThrottlingIntegrationTest.java
##########
@@ -0,0 +1,183 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import static 
org.apache.james.mailets.configuration.CommonProcessors.ERROR_REPOSITORY;
+import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
+import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP;
+import static org.apache.james.mailets.configuration.Constants.PASSWORD;
+import static 
org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.stream.IntStream;
+
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.core.Username;
+import org.apache.james.core.builder.MimeMessageBuilder;
+import org.apache.james.jmap.draft.MessageIdProbe;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
+import org.apache.james.mailbox.model.SearchQuery;
+import org.apache.james.mailets.TemporaryJamesServer;
+import org.apache.james.mailets.configuration.CommonProcessors;
+import org.apache.james.mailets.configuration.MailetConfiguration;
+import org.apache.james.mailets.configuration.MailetContainer;
+import org.apache.james.mailets.configuration.ProcessorConfiguration;
+import org.apache.james.modules.MailboxProbeImpl;
+import org.apache.james.modules.protocols.ImapGuiceProbe;
+import org.apache.james.modules.protocols.SmtpGuiceProbe;
+import org.apache.james.rate.limiter.memory.MemoryRateLimiterModule;
+import org.apache.james.transport.matchers.All;
+import org.apache.james.utils.DataProbeImpl;
+import org.apache.james.utils.SMTPMessageSender;
+import org.apache.james.utils.TestIMAPClient;
+import org.apache.mailet.base.test.FakeMail;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import com.github.fge.lambdas.Throwing;
+
+public class RequeueThrottlingIntegrationTest {
+    private static final String SENDER = "sender1@" + DEFAULT_DOMAIN;
+    private static final String RECIPIENT1 = "recipient1@" + DEFAULT_DOMAIN;
+    private TemporaryJamesServer jamesServer;
+    private MimeMessage message;
+
+    @RegisterExtension
+    public TestIMAPClient testIMAPClient = new TestIMAPClient();
+    @RegisterExtension
+    public SMTPMessageSender messageSender = new 
SMTPMessageSender(DEFAULT_DOMAIN);
+
+    @BeforeEach
+    void setup(@TempDir File temporaryFolder) throws Exception {
+        MailetContainer.Builder mailetContainer = 
TemporaryJamesServer.simpleMailetContainerConfiguration()
+            .putProcessor(ProcessorConfiguration.error()
+                .enableJmx(false)
+                .addMailet(MailetConfiguration.builder()
+                    .matcher(All.class)
+                    .mailet(ToRepository.class)
+                    .addProperty("repositoryPath", 
ERROR_REPOSITORY.asString()))
+                .build())
+            .putProcessor(ProcessorConfiguration.transport()
+                .addMailet(MailetConfiguration.builder()
+                    .matcher(All.class)
+                    .mailet(PerSenderRateLimit.class)
+                    .addProperty("duration", "3s")
+                    .addProperty("count", "1")
+                    .addProperty("exceededProcessor", "tooMuchEmails")
+                    .build())
+                .addMailetsFrom(CommonProcessors.transport()))
+            .putProcessor(ProcessorConfiguration.builder()
+                .state("tooMuchEmails")
+                .addMailet(MailetConfiguration.builder()
+                    .matcher(All.class)
+                    .mailet(Requeue.class)
+                    .addProperty("delay", "4s")
+                    .build())
+                .build());
+
+        jamesServer = TemporaryJamesServer.builder()
+            .withMailetContainer(mailetContainer)
+            .withOverrides(new MemoryRateLimiterModule())
+            .build(temporaryFolder);
+        jamesServer.start();
+
+        jamesServer.getProbe(DataProbeImpl.class).fluent()
+            .addDomain(DEFAULT_DOMAIN)
+            .addUser(SENDER, PASSWORD)
+            .addUser(RECIPIENT1, PASSWORD);
+
+        message = MimeMessageBuilder.mimeMessageBuilder()
+            .setSubject("test")
+            .setText("text1")
+            .build();
+    }
+
+    @AfterEach
+    void tearDown() {
+        jamesServer.shutdown();
+    }
+
+    @Test
+    void throttlingShouldWork() throws Exception {
+        messageSender.connect(LOCALHOST_IP, 
jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
+            .authenticate(SENDER, PASSWORD)
+            .sendMessage(FakeMail.builder()
+                .name("name")
+                .mimeMessage(message)
+                .sender(SENDER)
+                .recipient(RECIPIENT1));
+
+        // await first message
+        testIMAPClient.connect(LOCALHOST_IP, 
jamesServer.getProbe(ImapGuiceProbe.class).getImapPort())
+            .login(RECIPIENT1, PASSWORD)
+            .select(TestIMAPClient.INBOX)
+            .awaitMessage(awaitAtMostOneMinute);
+
+        // the second message should exceed count limit and re-enqueued
+        messageSender.connect(LOCALHOST_IP, 
jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
+            .authenticate(SENDER, PASSWORD)
+            .sendMessage(FakeMail.builder()
+                .name("name")
+                .mimeMessage(message)
+                .sender(SENDER)
+                .recipient(RECIPIENT1));
+
+        // Then the second message should be sent to recipient after delay
+        MailboxProbeImpl mailboxProbe = 
jamesServer.getProbe(MailboxProbeImpl.class);
+        awaitAtMostOneMinute.until(() -> 
mailboxProbe.searchMessage(MultimailboxesSearchQuery.from(SearchQuery.of(SearchQuery.all())).build(),
 RECIPIENT1, Integer.MAX_VALUE).size() == 2);
+
+        // Checking the received dates of the emails to make sure emails were 
throttled
+        ArrayList<MessageId> messageIds = new 
ArrayList<>(mailboxProbe.searchMessage(MultimailboxesSearchQuery.from(SearchQuery.of(SearchQuery.all())).build(),
 RECIPIENT1, Integer.MAX_VALUE));
+        Username recipient = Username.of(RECIPIENT1);
+        MessageIdProbe messageIdProbe = 
jamesServer.getProbe(MessageIdProbe.class);
+        MessageResult firstMessage = 
messageIdProbe.getMessages(messageIds.get(0), recipient).get(0);
+        MessageResult secondMessage = 
messageIdProbe.getMessages(messageIds.get(1), recipient).get(0);
+
+        
assertThat(Duration.between(firstMessage.getInternalDate().toInstant(), 
secondMessage.getInternalDate().toInstant()).abs().toSeconds())
+            .isGreaterThan(3);

Review comment:
       Awseome!

##########
File path: 
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
##########
@@ -0,0 +1,118 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+
+import javax.inject.Inject;
+import javax.mail.MessagingException;
+
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.util.DurationParser;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p><b>Requeue</b> puts back the email in a queue.
+ * It can be used for throttling when combined with rate limiting mailet. 
{@link org.apache.james.transport.mailets.PerSenderRateLimit}</p>
+ *
+ * <ul>Here are supported configuration parameters:
+ *      <li><b>queue</b>: a Mail Queue name (optional, default to spool).</li>
+ *      <li><b>processor</b>: a target processor (optional, defaults to 
root).</li>
+ *      <li><b>delay</b>: a delay when en-queueing mail (optional, defaults to 
none). Supported units include: s (second), m (minute), h (hour), d (day).</li>
+ *      <li><b>consume</b>: a processor to which current email should be 
redirected to (optional, defaults to ghost).</li>
+ *  </ul>
+ *
+ *  <p>For instance, to apply all the examples given above:</p>
+ *
+ *  <pre><code>
+ *  &lt;mailet matcher=&quot;All&quot; class=&quot;Requeue&quot;&gt;
+ *      &lt;queue&gt;spool&lt;/queue&gt;
+ *      &lt;processor&gt;root&lt;/processor&gt;
+ *      &lt;delay&gt;2h&lt;/delay&gt;
+ *      &lt;consume&gt;ghost&lt;/consume&gt;
+ *  &lt;/mailet&gt;
+ *   </code></pre>
+ *
+ * <p>Note that this is a naive approach: if you have a large number of emails 
(say N) submitted at once,
+ * you would need O(N) re-queues to eventually send all your emails,
+ * where the constant is the number of mail allowed over the time window.
+ * So this gives an overall complexity of O(N2), while careful planning could 
achieve this in O(N). </p>
+ */
+public class Requeue extends GenericMailet {
+    private final MailQueueFactory<?> mailQueueFactory;
+
+    private MailQueue mailQueue;
+    private Optional<Duration> delayDuration;
+    private String processor;
+    private String consumeProcessor;
+
+    /**
+     *
+     * @param mailQueueFactory Allows to get mail queue by name.
+     */
+    @Inject
+    public Requeue(MailQueueFactory<?> mailQueueFactory) {
+        this.mailQueueFactory = mailQueueFactory;
+    }
+
+    @Override
+    public void init() throws MessagingException {
+        MailQueueName mailQueueName = 
Optional.ofNullable(getInitParameter("queue"))
+            .map(MailQueueName::of).orElse(MailQueueFactory.SPOOL);
+
+        mailQueue = mailQueueFactory.getQueue(mailQueueName)
+            .orElseThrow(() -> new IllegalArgumentException("Can not find 
queue " + mailQueueName.asString()));
+        delayDuration = Optional.ofNullable(getInitParameter("delay"))
+            .map(delayValue -> DurationParser.parse(delayValue, 
ChronoUnit.SECONDS));
+        processor = Optional.ofNullable(getInitParameter("processor"))
+            .orElse(Mail.DEFAULT);
+        consumeProcessor = Optional.ofNullable(getInitParameter("consume"))
+            .orElse(Mail.GHOST);
+
+        Preconditions.checkArgument(delayDuration.isEmpty() || 
!delayDuration.get().isNegative(),
+            "Duration should be non-negative");
+    }
+
+    @Override
+    public void service(Mail mail) throws MessagingException {
+        mail.setState(consumeProcessor);

Review comment:
       ```suggestion
   if (consume) {
      enqueue(mail);
      mail.setState(GHOST);
   } else {
           Mail newMail = null;
           try {
               newMail = mail.duplicate();
               enqueue(newMail)
           } finally {
               LifecycleUtil.dispose(newMail);
           }
   } 
   ```
   
   With
   
   ```
   private void enqueue(Mail mail) {
               mail.setState(processor);
               duration.ifPresentOrElse(
                   delay -> mailQueue.enQueue(mail, delay),
                   () -> mailQueue.enQueue(mail));
   }
   ```
   
    - 1. Don't duplicate the mail when consume is true: that's expensive
    - 2. If spotted on an Optional which is an anti-pattern
    - 3. Correct the handling of consume...

##########
File path: 
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
##########
@@ -0,0 +1,118 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+
+import javax.inject.Inject;
+import javax.mail.MessagingException;
+
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.util.DurationParser;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p><b>Requeue</b> puts back the email in a queue.
+ * It can be used for throttling when combined with rate limiting mailet. 
{@link org.apache.james.transport.mailets.PerSenderRateLimit}</p>
+ *
+ * <ul>Here are supported configuration parameters:
+ *      <li><b>queue</b>: a Mail Queue name (optional, default to spool).</li>
+ *      <li><b>processor</b>: a target processor (optional, defaults to 
root).</li>
+ *      <li><b>delay</b>: a delay when en-queueing mail (optional, defaults to 
none). Supported units include: s (second), m (minute), h (hour), d (day).</li>
+ *      <li><b>consume</b>: a processor to which current email should be 
redirected to (optional, defaults to ghost).</li>

Review comment:
       ```suggestion
    *      <li><b>consume</b>: a boolean. true will consume the mail: no 
further processing would be done on the mail (default) while false wil continue 
the processing of the email in addition to the requeue operation, which 
effectively duplicates the email.</li>
   ```

##########
File path: 
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
##########
@@ -0,0 +1,118 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+
+import javax.inject.Inject;
+import javax.mail.MessagingException;
+
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.util.DurationParser;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p><b>Requeue</b> puts back the email in a queue.
+ * It can be used for throttling when combined with rate limiting mailet. 
{@link org.apache.james.transport.mailets.PerSenderRateLimit}</p>
+ *
+ * <ul>Here are supported configuration parameters:
+ *      <li><b>queue</b>: a Mail Queue name (optional, default to spool).</li>
+ *      <li><b>processor</b>: a target processor (optional, defaults to 
root).</li>
+ *      <li><b>delay</b>: a delay when en-queueing mail (optional, defaults to 
none). Supported units include: s (second), m (minute), h (hour), d (day).</li>
+ *      <li><b>consume</b>: a processor to which current email should be 
redirected to (optional, defaults to ghost).</li>
+ *  </ul>
+ *
+ *  <p>For instance, to apply all the examples given above:</p>
+ *
+ *  <pre><code>
+ *  &lt;mailet matcher=&quot;All&quot; class=&quot;Requeue&quot;&gt;
+ *      &lt;queue&gt;spool&lt;/queue&gt;
+ *      &lt;processor&gt;root&lt;/processor&gt;
+ *      &lt;delay&gt;2h&lt;/delay&gt;
+ *      &lt;consume&gt;ghost&lt;/consume&gt;

Review comment:
       ```suggestion
    *      &lt;consume&gt;true&lt;/consume&gt;
   ```

##########
File path: 
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
##########
@@ -0,0 +1,118 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+
+import javax.inject.Inject;
+import javax.mail.MessagingException;
+
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.util.DurationParser;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p><b>Requeue</b> puts back the email in a queue.
+ * It can be used for throttling when combined with rate limiting mailet. 
{@link org.apache.james.transport.mailets.PerSenderRateLimit}</p>
+ *
+ * <ul>Here are supported configuration parameters:
+ *      <li><b>queue</b>: a Mail Queue name (optional, default to spool).</li>
+ *      <li><b>processor</b>: a target processor (optional, defaults to 
root).</li>
+ *      <li><b>delay</b>: a delay when en-queueing mail (optional, defaults to 
none). Supported units include: s (second), m (minute), h (hour), d (day).</li>
+ *      <li><b>consume</b>: a processor to which current email should be 
redirected to (optional, defaults to ghost).</li>
+ *  </ul>
+ *
+ *  <p>For instance, to apply all the examples given above:</p>
+ *
+ *  <pre><code>
+ *  &lt;mailet matcher=&quot;All&quot; class=&quot;Requeue&quot;&gt;
+ *      &lt;queue&gt;spool&lt;/queue&gt;
+ *      &lt;processor&gt;root&lt;/processor&gt;
+ *      &lt;delay&gt;2h&lt;/delay&gt;
+ *      &lt;consume&gt;ghost&lt;/consume&gt;
+ *  &lt;/mailet&gt;
+ *   </code></pre>
+ *
+ * <p>Note that this is a naive approach: if you have a large number of emails 
(say N) submitted at once,
+ * you would need O(N) re-queues to eventually send all your emails,
+ * where the constant is the number of mail allowed over the time window.
+ * So this gives an overall complexity of O(N2), while careful planning could 
achieve this in O(N). </p>
+ */
+public class Requeue extends GenericMailet {
+    private final MailQueueFactory<?> mailQueueFactory;
+
+    private MailQueue mailQueue;
+    private Optional<Duration> delayDuration;
+    private String processor;
+    private String consumeProcessor;
+
+    /**
+     *
+     * @param mailQueueFactory Allows to get mail queue by name.
+     */

Review comment:
       ```suggestion
   ```
   
   Useless java doc -> remove

##########
File path: 
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
##########
@@ -0,0 +1,118 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+
+import javax.inject.Inject;
+import javax.mail.MessagingException;
+
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.util.DurationParser;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p><b>Requeue</b> puts back the email in a queue.
+ * It can be used for throttling when combined with rate limiting mailet. 
{@link org.apache.james.transport.mailets.PerSenderRateLimit}</p>
+ *
+ * <ul>Here are supported configuration parameters:
+ *      <li><b>queue</b>: a Mail Queue name (optional, default to spool).</li>
+ *      <li><b>processor</b>: a target processor (optional, defaults to 
root).</li>
+ *      <li><b>delay</b>: a delay when en-queueing mail (optional, defaults to 
none). Supported units include: s (second), m (minute), h (hour), d (day).</li>
+ *      <li><b>consume</b>: a processor to which current email should be 
redirected to (optional, defaults to ghost).</li>
+ *  </ul>
+ *
+ *  <p>For instance, to apply all the examples given above:</p>
+ *
+ *  <pre><code>
+ *  &lt;mailet matcher=&quot;All&quot; class=&quot;Requeue&quot;&gt;
+ *      &lt;queue&gt;spool&lt;/queue&gt;
+ *      &lt;processor&gt;root&lt;/processor&gt;
+ *      &lt;delay&gt;2h&lt;/delay&gt;
+ *      &lt;consume&gt;ghost&lt;/consume&gt;
+ *  &lt;/mailet&gt;
+ *   </code></pre>
+ *
+ * <p>Note that this is a naive approach: if you have a large number of emails 
(say N) submitted at once,
+ * you would need O(N) re-queues to eventually send all your emails,
+ * where the constant is the number of mail allowed over the time window.
+ * So this gives an overall complexity of O(N2), while careful planning could 
achieve this in O(N). </p>
+ */
+public class Requeue extends GenericMailet {
+    private final MailQueueFactory<?> mailQueueFactory;
+
+    private MailQueue mailQueue;
+    private Optional<Duration> delayDuration;
+    private String processor;
+    private String consumeProcessor;

Review comment:
       ```suggestion
       private boolean consume;
   ```

##########
File path: 
server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/RequeueTest.java
##########
@@ -0,0 +1,246 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import javax.mail.MessagingException;
+
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
+import org.apache.james.queue.memory.MemoryMailQueueFactory;
+import org.apache.mailet.Mail;
+import org.apache.mailet.MailetConfig;
+import org.apache.mailet.base.test.FakeMail;
+import org.apache.mailet.base.test.FakeMailetConfig;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+public class RequeueTest {
+    private Mail mailSample;
+    private MemoryMailQueueFactory mailQueueFactory;
+    private MemoryMailQueueFactory.MemoryCacheableMailQueue spoolQueue;
+
+    @BeforeEach
+    void beforeEach() throws MessagingException {
+        mailSample = FakeMail.builder()
+            .name("mail1")
+            .sender("[email protected]")
+            .recipients("[email protected]")
+            .state("newState")
+            .build();
+        MemoryMailQueueFactory memoryMailQueueFactory = new 
MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
+        memoryMailQueueFactory.createQueue(MailQueueFactory.SPOOL);
+        spoolQueue = 
spy(memoryMailQueueFactory.getQueue(MailQueueFactory.SPOOL).orElseThrow());
+
+        mailQueueFactory = mock(MemoryMailQueueFactory.class);
+        
when(mailQueueFactory.getQueue(MailQueueFactory.SPOOL)).thenReturn(Optional.of(spoolQueue));
+    }
+
+    private Requeue testee(MailetConfig mailetConfig) throws 
MessagingException {
+        Requeue requeue = new Requeue(mailQueueFactory);
+        requeue.init(mailetConfig);
+        return requeue;
+    }
+
+    @Test
+    void mailetShouldRequeueMail() throws MessagingException {
+        Requeue mailet = testee(FakeMailetConfig.builder()
+            .mailetName("Requeue")
+            .build());
+        mailet.service(mailSample);
+
+        ArgumentCaptor<Mail> mailCaptor = ArgumentCaptor.forClass(Mail.class);
+        Mockito.verify(spoolQueue).enQueue(mailCaptor.capture());

Review comment:
       Why do we need mocks if we have a memory implementation at hand?
   
   I hate mocks.
   
   Can we try rewrite this without mocks?

##########
File path: 
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
##########
@@ -0,0 +1,118 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+
+import javax.inject.Inject;
+import javax.mail.MessagingException;
+
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.util.DurationParser;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p><b>Requeue</b> puts back the email in a queue.
+ * It can be used for throttling when combined with rate limiting mailet. 
{@link org.apache.james.transport.mailets.PerSenderRateLimit}</p>
+ *
+ * <ul>Here are supported configuration parameters:
+ *      <li><b>queue</b>: a Mail Queue name (optional, default to spool).</li>
+ *      <li><b>processor</b>: a target processor (optional, defaults to 
root).</li>
+ *      <li><b>delay</b>: a delay when en-queueing mail (optional, defaults to 
none). Supported units include: s (second), m (minute), h (hour), d (day).</li>
+ *      <li><b>consume</b>: a processor to which current email should be 
redirected to (optional, defaults to ghost).</li>
+ *  </ul>
+ *
+ *  <p>For instance, to apply all the examples given above:</p>
+ *
+ *  <pre><code>
+ *  &lt;mailet matcher=&quot;All&quot; class=&quot;Requeue&quot;&gt;
+ *      &lt;queue&gt;spool&lt;/queue&gt;
+ *      &lt;processor&gt;root&lt;/processor&gt;
+ *      &lt;delay&gt;2h&lt;/delay&gt;
+ *      &lt;consume&gt;ghost&lt;/consume&gt;
+ *  &lt;/mailet&gt;
+ *   </code></pre>
+ *
+ * <p>Note that this is a naive approach: if you have a large number of emails 
(say N) submitted at once,
+ * you would need O(N) re-queues to eventually send all your emails,
+ * where the constant is the number of mail allowed over the time window.
+ * So this gives an overall complexity of O(N2), while careful planning could 
achieve this in O(N). </p>
+ */
+public class Requeue extends GenericMailet {
+    private final MailQueueFactory<?> mailQueueFactory;
+
+    private MailQueue mailQueue;
+    private Optional<Duration> delayDuration;
+    private String processor;
+    private String consumeProcessor;
+
+    /**
+     *
+     * @param mailQueueFactory Allows to get mail queue by name.
+     */
+    @Inject
+    public Requeue(MailQueueFactory<?> mailQueueFactory) {
+        this.mailQueueFactory = mailQueueFactory;
+    }
+
+    @Override
+    public void init() throws MessagingException {
+        MailQueueName mailQueueName = 
Optional.ofNullable(getInitParameter("queue"))
+            .map(MailQueueName::of).orElse(MailQueueFactory.SPOOL);
+
+        mailQueue = mailQueueFactory.getQueue(mailQueueName)

Review comment:
       ```suggestion
           mailQueue = mailQueueFactory.createQueue(mailQueueName)
   ```
   
   Otherwise this could lead to startup ordering issues.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to