This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:

     new 4da699c622 [FIX] Better error management for DeletedMessageVault consumer (#2651)
4da699c622 is described below

commit 4da699c62288c8fc189ac1d4fd9681719dfdb246
Author: Trần Hồng Quân <55171818+quantranhong1...@users.noreply.github.com>
AuthorDate: Mon Mar 3 08:03:01 2025 +0700

    [FIX] Better error management for DeletedMessageVault consumer (#2651)

    Otherwise, blobstore errors for example could crash the DTM consumer.

    Co-authored-by: Benoit TELLIER <btell...@linagora.com>
---
 .../mailbox/DistributedDeletedMessageVaultDeletionCallback.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
index 6bed7178c5..d848c037c1 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -226,7 +226,7 @@ public class DistributedDeletedMessageVaultDeletionCallback implements DeleteMes
                 receiverProvider::createReceiver,
                 receiver -> receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS)),
                 Receiver::close)
-            .flatMap(this::handleMessage)
+            .flatMap(this::handleMessage, QOS)
             .subscribeOn(Schedulers.boundedElastic())
             .subscribe();
     }
@@ -248,9 +248,10 @@ public class DistributedDeletedMessageVaultDeletionCallback implements DeleteMes
 
         return callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, messageIdFactory, blobIdFactory))
             .timeout(Duration.ofMinutes(5))
-            .doOnError(e -> {
+            .onErrorResume(e -> {
                 LOGGER.error("Failed executing deletion callback for {}", copyCommandDTO.messageId, e);
                 delivery.nack(REQUEUE);
+                return Mono.empty();
             })
             .doOnSuccess(any -> delivery.ack())
             .doOnCancel(() -> delivery.nack(REQUEUE));

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org