This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new b2047b381c JAMES-3892 Allow configuring the count of retries in LocalDelivery (#1467)
b2047b381c is described below

commit b2047b381c79623ac2d99a4aa48c136d8add4a02
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Mar 7 09:29:41 2023 +0700

    JAMES-3892 Allow configuring the count of retries in LocalDelivery (#1467)
---
 .../java/org/apache/mailet/base/MailetUtil.java    | 28 +++++----
 .../james/transport/mailets/LocalDelivery.java     |  4 ++
 .../transport/mailets/delivery/MailDispatcher.java | 31 +++++++---
 .../mailets/delivery/MailDispatcherTest.java       | 66 ++++++++++++++++++++++
 4 files changed, 110 insertions(+), 19 deletions(-)

diff --git a/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java 
b/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java
index c6b33626c5..1c19c2d249 100644
--- a/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java
+++ b/mailet/base/src/main/java/org/apache/mailet/base/MailetUtil.java
@@ -25,6 +25,7 @@ import javax.mail.MessagingException;
 
 import org.apache.mailet.MailetConfig;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Strings;
 
 /**
@@ -52,30 +53,33 @@ public class MailetUtil {
     }
 
     public static int getInitParameterAsStrictlyPositiveInteger(String 
condition, int defaultValue) throws MessagingException {
-        String defaultStringValue = String.valueOf(defaultValue);
-        return getInitParameterAsStrictlyPositiveInteger(condition, 
Optional.of(defaultStringValue));
+        return getInitParameterAsStrictlyPositiveInteger(condition, 
Optional.of(defaultValue));
     }
 
     public static int getInitParameterAsStrictlyPositiveInteger(String 
condition) throws MessagingException {
         return getInitParameterAsStrictlyPositiveInteger(condition, 
Optional.empty());
     }
 
-    private static int getInitParameterAsStrictlyPositiveInteger(String 
condition, Optional<String> defaultValue) throws MessagingException {
-        String value = Optional.ofNullable(condition)
-            .orElse(defaultValue.orElse(null));
-
-        if (Strings.isNullOrEmpty(value)) {
-            throw new MessagingException("Condition is required. It should be 
a strictly positive integer");
-        }
-
-        int valueAsInt = tryParseInteger(value);
+    private static int getInitParameterAsStrictlyPositiveInteger(String 
condition, Optional<Integer> defaultValue) throws MessagingException {
+        int valueAsInt = getInitParameterAsInteger(condition, defaultValue);
 
         if (valueAsInt < 1) {
-            throw new MessagingException("Expecting condition to be a strictly 
positive integer. Got " + value);
+            throw new MessagingException("Expecting condition to be a strictly 
positive integer. Got " + valueAsInt);
         }
         return valueAsInt;
     }
 
+    public static int getInitParameterAsInteger(String condition, 
Optional<Integer> defaultValue) throws MessagingException {
+        if (Strings.isNullOrEmpty(condition) && defaultValue.isEmpty()) {
+            throw new MessagingException("Condition is required. It should be 
a strictly positive integer");
+        }
+
+        return Optional.ofNullable(condition)
+            .map(Throwing.<String, 
Integer>function(MailetUtil::tryParseInteger).sneakyThrow())
+            .or(() -> defaultValue)
+            .get();
+    }
+
     private static int tryParseInteger(String value) throws MessagingException 
{
         try {
             return Integer.parseInt(value);
diff --git 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
index 3b11028137..b1d51e7904 100644
--- 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
+++ 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.transport.mailets;
 
+import java.util.Optional;
+
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.mail.MessagingException;
@@ -32,6 +34,7 @@ import 
org.apache.james.transport.mailets.delivery.SimpleMailStore;
 import org.apache.james.user.api.UsersRepository;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
+import org.apache.mailet.base.MailetUtil;
 
 /**
  * Receives a Mail from the Queue and takes care of delivery of the
@@ -78,6 +81,7 @@ public class LocalDelivery extends GenericMailet {
                 .build())
             .consume(getInitParameter("consume", true))
             .onMailetException(getInitParameter("onMailetException", 
Mail.ERROR))
+            
.retries(MailetUtil.getInitParameterAsInteger(getInitParameter("retries"), 
Optional.of(MailDispatcher.RETRIES)))
             .mailetContext(getMailetContext())
             .build();
     }
diff --git 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
index d731ab1e59..8416cf5710 100644
--- 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
+++ 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
@@ -50,7 +51,7 @@ import reactor.util.retry.Retry;
 public class MailDispatcher {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MailDispatcher.class);
     private static final String[] NO_HEADERS = {};
-    private static final int RETRIES = 3;
+    public static final int RETRIES = 3;
     private static final Duration FIRST_BACKOFF = Duration.ofMillis(200);
     private static final Duration MAX_BACKOFF = Duration.ofSeconds(1);
 
@@ -65,6 +66,7 @@ public class MailDispatcher {
         private Boolean consume;
         private MailetContext mailetContext;
         private String onMailetException;
+        private Optional<Integer> retries = Optional.empty();
 
         public Builder consume(boolean consume) {
             this.consume = consume;
@@ -86,12 +88,19 @@ public class MailDispatcher {
             return this;
         }
 
+        public Builder retries(int retries) {
+            if (retries > 0) {
+                this.retries = Optional.of(retries);
+            }
+            return this;
+        }
+
         public MailDispatcher build() {
             Preconditions.checkNotNull(mailStore);
             Preconditions.checkNotNull(mailetContext);
             return new MailDispatcher(mailStore, mailetContext,
                 Optional.ofNullable(consume).orElse(DEFAULT_CONSUME),
-                
Optional.ofNullable(onMailetException).orElse(DEFAULT_ERROR_PROCESSOR));
+                retries, 
Optional.ofNullable(onMailetException).orElse(DEFAULT_ERROR_PROCESSOR));
         }
     }
 
@@ -100,12 +109,14 @@ public class MailDispatcher {
     private final boolean consume;
     private final boolean ignoreError;
     private final boolean propagate;
+    private final Optional<Integer> retries;
     private final String errorProcessor;
 
-    private MailDispatcher(MailStore mailStore, MailetContext mailetContext, 
boolean consume, String onMailetException) {
+    private MailDispatcher(MailStore mailStore, MailetContext mailetContext, 
boolean consume, Optional<Integer> retries, String onMailetException) {
         this.mailStore = mailStore;
         this.consume = consume;
         this.mailetContext = mailetContext;
+        this.retries = retries;
         this.errorProcessor = onMailetException;
         this.ignoreError = onMailetException.equalsIgnoreCase("ignore");
         this.propagate = onMailetException.equalsIgnoreCase("propagate");
@@ -172,10 +183,16 @@ public class MailDispatcher {
     }
 
     private Mono<Void> storeMailWithRetry(Mail mail, MailAddress recipient) {
-       return Mono.from(mailStore.storeMail(recipient, mail))
-           .doOnError(error -> LOGGER.warn("Error While storing mail. This 
error will be retried.", error))
-           .retryWhen(Retry.backoff(RETRIES, 
FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.parallel()))
-           .then();
+        AtomicInteger remainRetries = new AtomicInteger(retries.orElse(0));
+
+        Mono<Void> operation = Mono.from(mailStore.storeMail(recipient, mail))
+            .doOnError(error -> LOGGER.warn("Error While storing mail. This 
error will be retried for {} more times.", remainRetries.getAndDecrement(), 
error));
+
+        return retries.map(count ->
+            operation
+                .retryWhen(Retry.backoff(count, 
FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.parallel()))
+                .then())
+            .orElse(operation);
     }
 
     private Map<String, List<String>> saveHeaders(Mail mail, MailAddress 
recipient) throws MessagingException {
diff --git 
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
 
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
index 744893cbbb..c11f587fdb 100644
--- 
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
+++ 
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
@@ -22,6 +22,7 @@ package org.apache.james.transport.mailets.delivery;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -31,6 +32,7 @@ import static org.mockito.Mockito.when;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.mail.MessagingException;
 
@@ -47,6 +49,8 @@ import org.apache.mailet.base.test.FakeMailContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
@@ -93,6 +97,68 @@ class MailDispatcherTest {
         verifyNoMoreInteractions(mailStore);
     }
 
+    @Test
+    void dispatchShouldPerformRetries() throws Exception {
+        MailDispatcher testee = MailDispatcher.builder()
+            .mailetContext(fakeMailContext)
+            .retries(3)
+            .mailStore(mailStore)
+            .consume(true)
+            .build();
+
+        AtomicInteger counter = new AtomicInteger(0);
+        when(mailStore.storeMail(any(), any())).thenAnswer(invocationOnMock -> 
Mono.error(() -> {
+            counter.getAndIncrement();
+            return new RuntimeException();
+        }));
+
+        FakeMail mail = FakeMail.builder()
+            .name("name")
+            .sender(MailAddressFixture.OTHER_AT_JAMES)
+            .recipients(MailAddressFixture.ANY_AT_JAMES)
+            .state("state")
+            .mimeMessage(MimeMessageUtil.defaultMimeMessage())
+            .build();
+        try {
+            testee.dispatch(mail);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        assertThat(counter.get()).isEqualTo(4);
+    }
+
+    @Test
+    void disableRetries() throws Exception {
+        MailDispatcher testee = MailDispatcher.builder()
+            .mailetContext(fakeMailContext)
+            .retries(0)
+            .mailStore(mailStore)
+            .consume(true)
+            .build();
+
+        AtomicInteger counter = new AtomicInteger(0);
+        when(mailStore.storeMail(any(), any())).thenAnswer(invocationOnMock -> 
Mono.error(() -> {
+            counter.getAndIncrement();
+            return new RuntimeException();
+        }));
+
+        FakeMail mail = FakeMail.builder()
+            .name("name")
+            .sender(MailAddressFixture.OTHER_AT_JAMES)
+            .recipients(MailAddressFixture.ANY_AT_JAMES)
+            .state("state")
+            .mimeMessage(MimeMessageUtil.defaultMimeMessage())
+            .build();
+        try {
+            testee.dispatch(mail);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        assertThat(counter.get()).isEqualTo(1);
+    }
+
     @Test
     void dispatchShouldConsumeMailIfSpecified() throws Exception {
         MailDispatcher testee = MailDispatcher.builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to