This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 34d0b9c3b5 [ENHANCEMENT] Do not retry storing mails when OverQuotaException (#2582) 34d0b9c3b5 is described below commit 34d0b9c3b5ab1d4a1d282a4e3a507928b6c41e9e Author: Trần Hồng Quân <55171818+quantranhong1...@users.noreply.github.com> AuthorDate: Mon Jan 6 11:57:34 2025 +0700 [ENHANCEMENT] Do not retry storing mails when OverQuotaException (#2582) --- .../james/transport/mailets/LocalDelivery.java | 1 + .../transport/mailets/delivery/MailDispatcher.java | 36 +++++++++++++++++++--- .../mailets/delivery/MailboxAppenderImpl.java | 3 +- .../mailets/delivery/MailDispatcherTest.java | 20 ++++++++++++ 4 files changed, 54 insertions(+), 6 deletions(-) diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java index a069db8ecb..e5f9c033d6 100644 --- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java @@ -83,6 +83,7 @@ public class LocalDelivery extends GenericMailet { .onMailetException(getInitParameter("onMailetException", Mail.ERROR)) .retries(MailetUtil.getInitParameterAsInteger(getInitParameter("retries"), Optional.of(MailDispatcher.RETRIES))) .mailetContext(getMailetContext()) + .usersRepository(usersRepository) .build(); } diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java index d115db5641..e393ec9d56 100644 --- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java @@ -29,8 +29,12 @@ import jakarta.mail.MessagingException; import jakarta.mail.internet.MimeMessage; import org.apache.james.core.MailAddress; +import org.apache.james.core.Username; import org.apache.james.lifecycle.api.LifecycleUtil; +import org.apache.james.mailbox.exception.OverQuotaException; import org.apache.james.server.core.MailImpl; +import org.apache.james.user.api.UsersRepository; +import org.apache.james.user.api.UsersRepositoryException; import org.apache.james.util.AuditTrail; import org.apache.mailet.Mail; import org.apache.mailet.MailetContext; @@ -64,6 +68,7 @@ public class MailDispatcher { static final boolean DEFAULT_CONSUME = true; static final String DEFAULT_ERROR_PROCESSOR = Mail.ERROR; private MailStore mailStore; + private UsersRepository usersRepository; private Boolean consume; private MailetContext mailetContext; private String onMailetException; @@ -84,6 +89,11 @@ public class MailDispatcher { return this; } + public Builder usersRepository(UsersRepository usersRepository) { + this.usersRepository = usersRepository; + return this; + } + public Builder onMailetException(String onMailetException) { this.onMailetException = onMailetException; return this; @@ -99,9 +109,11 @@ public class MailDispatcher { public MailDispatcher build() { Preconditions.checkNotNull(mailStore); Preconditions.checkNotNull(mailetContext); + Preconditions.checkNotNull(usersRepository); + return new MailDispatcher(mailStore, mailetContext, Optional.ofNullable(consume).orElse(DEFAULT_CONSUME), - retries, Optional.ofNullable(onMailetException).orElse(DEFAULT_ERROR_PROCESSOR)); + retries, Optional.ofNullable(onMailetException).orElse(DEFAULT_ERROR_PROCESSOR), usersRepository); } } @@ -112,8 +124,9 @@ public class MailDispatcher { private final boolean propagate; private final Optional<Integer> retries; private final String errorProcessor; + private final UsersRepository usersRepository; - private MailDispatcher(MailStore mailStore, MailetContext mailetContext, boolean consume, Optional<Integer> retries, String onMailetException) { + private MailDispatcher(MailStore mailStore, MailetContext mailetContext, boolean consume, Optional<Integer> retries, String onMailetException, UsersRepository usersRepository) { this.mailStore = mailStore; this.consume = consume; this.mailetContext = mailetContext; @@ -121,6 +134,7 @@ public class MailDispatcher { this.errorProcessor = onMailetException; this.ignoreError = onMailetException.equalsIgnoreCase("ignore"); this.propagate = onMailetException.equalsIgnoreCase("propagate"); + this.usersRepository = usersRepository; } public void dispatch(Mail mail) throws MessagingException { @@ -195,18 +209,32 @@ public class MailDispatcher { } private Mono<Void> storeMailWithRetry(Mail mail, MailAddress recipient) { + Username username = computeUsername(recipient); AtomicInteger remainRetries = new AtomicInteger(retries.orElse(0)); Mono<Void> operation = Mono.from(mailStore.storeMail(recipient, mail)) - .doOnError(error -> LOGGER.warn("Error While storing mail. This error will be retried for {} more times.", remainRetries.getAndDecrement(), error)); + .doOnError(OverQuotaException.class, e -> LOGGER.info("Could not store mail due to quota error for user {}", username.asString())) + .doOnError(e -> !(e instanceof OverQuotaException), error -> LOGGER.warn("Error While storing mail. This error will be retried for {} more times.", remainRetries.getAndDecrement(), error)); return retries.map(count -> operation - .retryWhen(Retry.backoff(count, FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.parallel())) + .retryWhen(Retry.backoff(count, FIRST_BACKOFF) + .maxBackoff(MAX_BACKOFF) + .filter(e -> !(e instanceof OverQuotaException)) + .scheduler(Schedulers.parallel())) .then()) .orElse(operation); } + private Username computeUsername(MailAddress recipient) { + try { + return usersRepository.getUsername(recipient); + } catch (UsersRepositoryException e) { + LOGGER.warn("Unable to retrieve username for {}", recipient.asPrettyString(), e); + return Username.of(recipient.asString()); + } + } + private Map<String, List<String>> saveHeaders(Mail mail, MailAddress recipient) throws MessagingException { ImmutableMap.Builder<String, List<String>> backup = ImmutableMap.builder(); Collection<String> headersToSave = mail.getPerRecipientSpecificHeaders().getHeaderNamesForRecipient(recipient); diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailboxAppenderImpl.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailboxAppenderImpl.java index 3cc8b974c4..0f5859b349 100644 --- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailboxAppenderImpl.java +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailboxAppenderImpl.java @@ -124,8 +124,7 @@ public class MailboxAppenderImpl implements MailboxAppender { }, session -> appendMessageToMailbox(mail, session, mailboxPath, flags), this::closeProcessing) - .onErrorMap(OverQuotaException.class, e -> new MessagingException("Could not append due to quota error", e)) - .onErrorMap(MailboxException.class, e -> new MessagingException("Unable to access mailbox.", e)); + .onErrorMap(e -> e instanceof MailboxException && !(e instanceof OverQuotaException), e -> new MessagingException("Unable to access mailbox.", (MailboxException) e)); } protected Mono<AppendResult> appendMessageToMailbox(MimeMessage mail, MailboxSession session, MailboxPath path, Optional<Flags> flags) { diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java index d1c76870a8..8b86d5f6c8 100644 --- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java +++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java @@ -38,6 +38,8 @@ import jakarta.mail.MessagingException; import org.apache.commons.io.IOUtils; import org.apache.james.core.MailAddress; import org.apache.james.core.builder.MimeMessageBuilder; +import org.apache.james.user.api.UsersRepository; +import org.apache.james.user.memory.MemoryUsersRepository; import org.apache.james.util.MimeMessageUtil; import org.apache.mailet.Mail; import org.apache.mailet.PerRecipientHeaders.Header; @@ -64,12 +66,14 @@ class MailDispatcherTest { private FakeMailContext fakeMailContext; private MailStore mailStore; + private UsersRepository usersRepository; @BeforeEach public void setUp() throws Exception { fakeMailContext = FakeMailContext.defaultContext(); mailStore = mock(MailStore.class); when(mailStore.storeMail(any(), any())).thenReturn(Mono.empty()); + usersRepository = MemoryUsersRepository.withVirtualHosting(null); } @Test @@ -78,6 +82,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(mailStore) .consume(true) + .usersRepository(usersRepository) .build(); FakeMail mail = FakeMail.builder() @@ -101,6 +106,7 @@ class MailDispatcherTest { .retries(3) .mailStore(mailStore) .consume(true) + .usersRepository(usersRepository) .build(); AtomicInteger counter = new AtomicInteger(0); @@ -132,6 +138,7 @@ class MailDispatcherTest { .retries(0) .mailStore(mailStore) .consume(true) + .usersRepository(usersRepository) .build(); AtomicInteger counter = new AtomicInteger(0); @@ -162,6 +169,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(mailStore) .consume(true) + .usersRepository(usersRepository) .build(); FakeMail mail = FakeMail.builder() @@ -181,6 +189,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(mailStore) .consume(false) + .usersRepository(usersRepository) .build(); String state = "state"; @@ -201,6 +210,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(mailStore) .consume(true) + .usersRepository(usersRepository) .build(); doReturn(Mono.error(new MessagingException())) .when(mailStore) @@ -237,6 +247,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(mailStore) .consume(false) + .usersRepository(usersRepository) .build(); FakeMail mail = FakeMail.builder() @@ -262,6 +273,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(accumulatorTestHeaderMailStore) .consume(false) + .usersRepository(usersRepository) .build(); FakeMail mail = FakeMail.builder() @@ -285,6 +297,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(accumulatorTestHeaderMailStore) .consume(false) + .usersRepository(usersRepository) .build(); FakeMail mail = FakeMail.builder() @@ -308,6 +321,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(accumulatorTestHeaderMailStore) .consume(false) + .usersRepository(usersRepository) .build(); FakeMail mail = FakeMail.builder() @@ -333,6 +347,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(accumulatorTestHeaderMailStore) .consume(false) + .usersRepository(usersRepository) .build(); FakeMail mail = FakeMail.builder() @@ -359,6 +374,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(accumulatorTestHeaderMailStore) .consume(false) + .usersRepository(usersRepository) .build(); FakeMail mail = FakeMail.builder() @@ -382,6 +398,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(accumulatorTestHeaderMailStore) .consume(false) + .usersRepository(usersRepository) .build(); String headerValue = "arbitraryValue"; @@ -406,6 +423,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(mailStore) .onMailetException("ignore") + .usersRepository(usersRepository) .build(); doReturn(Mono.error(new MessagingException())) @@ -434,6 +452,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(mailStore) .onMailetException("errorProcessor1") + .usersRepository(usersRepository) .build(); doReturn(Mono.error(new MessagingException())) @@ -463,6 +482,7 @@ class MailDispatcherTest { .mailetContext(fakeMailContext) .mailStore(mailStore) .onMailetException("propagate") + .usersRepository(usersRepository) .build(); doReturn(Mono.error(new MessagingException())) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org