This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da699c622 [FIX] Better error management for DeletedMessageVault 
consumer (#2651)
4da699c622 is described below

commit 4da699c62288c8fc189ac1d4fd9681719dfdb246
Author: Trần Hồng Quân <55171818+quantranhong1...@users.noreply.github.com>
AuthorDate: Mon Mar 3 08:03:01 2025 +0700

    [FIX] Better error management for DeletedMessageVault consumer (#2651)
    
    Otherwise, blobstore errors for example could crash the DTM consumer.
    
    Co-authored-by: Benoit TELLIER <btell...@linagora.com>
---
 .../mailbox/DistributedDeletedMessageVaultDeletionCallback.java      | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
index 6bed7178c5..d848c037c1 100644
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -226,7 +226,7 @@ public class DistributedDeletedMessageVaultDeletionCallback 
implements DeleteMes
                 receiverProvider::createReceiver,
                 receiver -> receiver.consumeManualAck(QUEUE, new 
ConsumeOptions().qos(QOS)),
                 Receiver::close)
-            .flatMap(this::handleMessage)
+            .flatMap(this::handleMessage, QOS)
             .subscribeOn(Schedulers.boundedElastic())
             .subscribe();
     }
@@ -248,9 +248,10 @@ public class 
DistributedDeletedMessageVaultDeletionCallback implements DeleteMes
 
             return callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, 
messageIdFactory, blobIdFactory))
                 .timeout(Duration.ofMinutes(5))
-                .doOnError(e -> {
+                .onErrorResume(e -> {
                     LOGGER.error("Failed executing deletion callback for {}", 
copyCommandDTO.messageId, e);
                     delivery.nack(REQUEUE);
+                    return Mono.empty();
                 })
                 .doOnSuccess(any -> delivery.ack())
                 .doOnCancel(() -> delivery.nack(REQUEUE));


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to