This is an automated email from the ASF dual-hosted git repository.

hqtran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 88bbfb95c4 JAMES-4154 Refactor message content deletion (#2884)
88bbfb95c4 is described below

commit 88bbfb95c426a621948709c7d9170677fe139c61
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Mon Dec 15 21:55:04 2025 +0700

    JAMES-4154 Refactor message content deletion (#2884)
---
 docs/modules/servers/partials/configure/vault.adoc |   3 -
 .../org/apache/james/events/NamingStrategy.java    |   2 +
 ...DeletionEventBusDeadLetterQueueHealthCheck.java |  63 +++++
 ...tionEventBusDeadLetterQueueHealthCheckTest.java | 131 ++++++++++
 .../apache/james/mailbox/events/MailboxEvents.java |  19 ++
 .../CassandraMailboxSessionMapperFactory.java      |   9 +-
 .../mailbox/cassandra/DeleteMessageListener.java   | 109 +++-----
 .../cassandra/CassandraMailboxManagerProvider.java |   2 +-
 .../cassandra/CassandraTestSystemFixture.java      |   2 +-
 .../james/event/json/MailboxEventSerializer.scala  |  26 +-
 .../MessageContentDeletionSerializationTest.java   |  86 +++++++
 .../org/apache/james/vault/VaultConfiguration.java |  22 +-
 .../apache/james/vault/VaultConfigurationTest.java |  12 +-
 .../james/mailbox/store/event/EventFactory.java    |  83 ++++++
 .../deletedMessageVault.properties                 |   4 -
 .../james/CassandraRabbitMQJamesServerMain.java    |  10 +-
 .../deletedMessageVault.properties                 |   4 -
 .../james/DistributedPOP3JamesServerMain.java      |  10 +-
 .../org/apache/james/PostgresJamesServerMain.java  |   2 +
 .../james/modules/data/CassandraJmapModule.java    |  11 +-
 .../CassandraDeletedMessageVaultModule.java        |  70 -----
 .../modules/mailbox/CassandraMailboxModule.java    |   1 -
 .../event/ContentDeletionEventBusModule.java       | 161 ++++++++++++
 .../DeletedMessageVaultDeletionListener.java       |  61 +++--
 ...edMessageVaultWorkQueueReconnectionHandler.java |  43 ----
 ...ributedDeletedMessageVaultDeletionCallback.java | 282 ---------------------
 .../DistributedDeletedMessageVaultModule.java      |  21 +-
 ...MessageFastViewProjectionDeletionListener.java} |  35 ++-
 ...itMQWebAdminServerIntegrationImmutableTest.java |   3 +-
 ...eEnabledDeletedMessageVaultIntegrationTest.java |  81 ------
 ...icated-eventbus-for-message-content-deletion.md |  45 ++++
 src/site/xdoc/server/config-vault.xml              |   7 -
 32 files changed, 762 insertions(+), 658 deletions(-)

diff --git a/docs/modules/servers/partials/configure/vault.adoc 
b/docs/modules/servers/partials/configure/vault.adoc
index 8949686175..ed8ffd5254 100644
--- a/docs/modules/servers/partials/configure/vault.adoc
+++ b/docs/modules/servers/partials/configure/vault.adoc
@@ -24,9 +24,6 @@ to get some examples and hints.
 | enabled
 | Allows to enable or disable usage of the Deleted Message Vault. Default to 
false.
 
-| workQueueEnabled
-| Enable work queue to be used with deleted message vault. Default to false.
-
 | retentionPeriod
 | Deleted messages stored in the Deleted Messages Vault are expired after this 
period (default: 1 year). It can be expressed in *y* years, *d* days, *h* 
hours, ...
 
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
index d3ce4a6f7d..b0f03b4660 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
@@ -24,8 +24,10 @@ import reactor.rabbitmq.QueueSpecification;
 public class NamingStrategy {
     public static final EventBusName JMAP_EVENT_BUS_NAME = new 
EventBusName("jmapEvent");
     public static final EventBusName MAILBOX_EVENT_BUS_NAME = new 
EventBusName("mailboxEvent");
+    public static final EventBusName CONTENT_DELETION_EVENT_BUS_NAME = new 
EventBusName("contentDeletionEvent");
     public static final NamingStrategy JMAP_NAMING_STRATEGY = new 
NamingStrategy(JMAP_EVENT_BUS_NAME);
     public static final NamingStrategy MAILBOX_EVENT_NAMING_STRATEGY = new 
NamingStrategy(MAILBOX_EVENT_BUS_NAME);
+    public static final NamingStrategy CONTENT_DELETION_NAMING_STRATEGY = new 
NamingStrategy(CONTENT_DELETION_EVENT_BUS_NAME);
 
     private final EventBusName eventBusName;
 
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck.java
new file mode 100644
index 0000000000..411279eac8
--- /dev/null
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck.java
@@ -0,0 +1,63 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import jakarta.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
+import org.apache.james.core.healthcheck.ComponentName;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.core.healthcheck.Result;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck 
implements HealthCheck {
+    public static final ComponentName COMPONENT_NAME = new 
ComponentName("RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck");
+    private static final String DEFAULT_VHOST = "/";
+
+    private final RabbitMQConfiguration configuration;
+    private final RabbitMQManagementAPI api;
+
+    @Inject
+    public 
RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck(RabbitMQConfiguration 
configuration) {
+        this.configuration = configuration;
+        this.api = RabbitMQManagementAPI.from(configuration);
+    }
+
+    @Override
+    public ComponentName componentName() {
+        return COMPONENT_NAME;
+    }
+
+    @Override
+    public Mono<Result> check() {
+        return Mono.fromCallable(() -> 
api.queueDetails(configuration.getVhost().orElse(DEFAULT_VHOST), 
NamingStrategy.CONTENT_DELETION_NAMING_STRATEGY.deadLetterQueue().getName()).getQueueLength())
+            .map(queueSize -> {
+                if (queueSize != 0) {
+                    return Result.degraded(COMPONENT_NAME, "RabbitMQ dead 
letter queue of the content deletion event bus contain events. This might 
indicate transient failure on event processing.");
+                }
+                return Result.healthy(COMPONENT_NAME);
+            })
+            .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, 
"Error checking RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck", e)))
+            .subscribeOn(Schedulers.boundedElastic());
+    }
+}
diff --git 
a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheckTest.java
 
b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheckTest.java
new file mode 100644
index 0000000000..bf8e5282cb
--- /dev/null
+++ 
b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheckTest.java
@@ -0,0 +1,131 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static 
org.apache.james.backends.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+class RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheckTest {
+    @RegisterExtension
+    RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
+        .isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG);
+
+    public static final ImmutableMap<String, Object> 
NO_QUEUE_DECLARE_ARGUMENTS = ImmutableMap.of();
+    public static final String ROUTING_KEY_CONTENT_DELETION_EVENTS_EVENT_BUS = 
"contentDeletionRoutingKey";
+
+    private Connection connection;
+    private Channel channel;
+    private RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck testee;
+
+    @BeforeEach
+    void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, 
URISyntaxException {
+        ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+        connectionFactory.setNetworkRecoveryInterval(1000);
+        connection = connectionFactory.newConnection();
+        channel = connection.createChannel();
+        testee = new 
RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck(rabbitMQ.getConfiguration());
+    }
+
+    @AfterEach
+    void tearDown(DockerRabbitMQ rabbitMQ) throws Exception {
+        closeQuietly(connection, channel);
+        rabbitMQ.reset();
+    }
+
+    @Test
+    void healthCheckShouldReturnUnhealthyWhenRabbitMQIsDown() throws Exception 
{
+        rabbitMQExtension.getRabbitMQ().stopApp();
+
+        assertThat(testee.check().block().isUnHealthy()).isTrue();
+    }
+
+    @Test
+    void 
healthCheckShouldReturnHealthyWhenContentDeletionEventBusDeadLetterQueueIsEmpty()
 throws Exception {
+        createDeadLetterQueue(channel, 
NamingStrategy.CONTENT_DELETION_NAMING_STRATEGY, 
ROUTING_KEY_CONTENT_DELETION_EVENTS_EVENT_BUS);
+
+        assertThat(testee.check().block().isHealthy()).isTrue();
+    }
+
+    @Test
+    void healthCheckShouldReturnUnhealthyWhenThereIsNoDeadLetterQueue() {
+        assertThat(testee.check().block().isUnHealthy()).isTrue();
+    }
+
+    @Test
+    void 
healthCheckShouldReturnDegradedWhenContentDeletionEventBusDeadLetterQueueIsNotEmpty()
 throws Exception {
+        createDeadLetterQueue(channel, 
NamingStrategy.CONTENT_DELETION_NAMING_STRATEGY, 
ROUTING_KEY_CONTENT_DELETION_EVENTS_EVENT_BUS);
+        publishAMessage(channel, 
ROUTING_KEY_CONTENT_DELETION_EVENTS_EVENT_BUS);
+
+        awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded());
+    }
+
+    private void createDeadLetterQueue(Channel channel, NamingStrategy 
namingStrategy, String routingKey) throws IOException {
+        channel.exchangeDeclare(EXCHANGE_NAME, DIRECT_EXCHANGE, DURABLE);
+        channel.queueDeclare(namingStrategy.deadLetterQueue().getName(), 
DURABLE, !EXCLUSIVE, AUTO_DELETE, NO_QUEUE_DECLARE_ARGUMENTS).getQueue();
+        channel.queueBind(namingStrategy.deadLetterQueue().getName(), 
EXCHANGE_NAME, routingKey);
+    }
+
+    private void publishAMessage(Channel channel, String routingKey) throws 
IOException {
+        AMQP.BasicProperties basicProperties = new 
AMQP.BasicProperties.Builder()
+            .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+            .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+            .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+            .build();
+
+        channel.basicPublish(EXCHANGE_NAME, routingKey, basicProperties, 
"Hello, world!".getBytes(StandardCharsets.UTF_8));
+    }
+
+    private void closeQuietly(AutoCloseable... closeables) {
+        Arrays.stream(closeables).forEach(this::closeQuietly);
+    }
+
+    private void closeQuietly(AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception e) {
+            //ignore error
+        }
+    }
+}
diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/MailboxEvents.java 
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/MailboxEvents.java
index 5ba8f6ca7a..d9bba5159d 100644
--- 
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/MailboxEvents.java
+++ 
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/MailboxEvents.java
@@ -536,6 +536,25 @@ public interface MailboxEvents {
         }
     }
 
+    record MessageContentDeletionEvent(EventId eventId, Username username, 
MailboxId mailboxId, MessageId messageId, long size,
+                                       Instant internalDate, boolean 
hasAttachments, String headerBlobId, String bodyBlobId) implements Event {
+
+        @Override
+        public EventId getEventId() {
+            return eventId;
+        }
+
+        @Override
+        public Username getUsername() {
+            return username;
+        }
+
+        @Override
+        public boolean isNoop() {
+            return false;
+        }
+    }
+
     /**
      * A mailbox event related to added message
      */
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 9813f8dd76..67b9b21728 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -25,6 +25,7 @@ import jakarta.inject.Inject;
 
 import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobStore;
+import org.apache.james.events.EventBus;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.mail.ACLMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraAnnotationMapper;
@@ -63,7 +64,7 @@ import org.apache.james.mailbox.store.mail.UidProvider;
 import org.apache.james.mailbox.store.user.SubscriptionMapper;
 
 import com.datastax.oss.driver.api.core.CqlSession;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Cassandra implementation of {@link MailboxSessionMapperFactory}
@@ -229,9 +230,11 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
 
     }
 
-    public DeleteMessageListener deleteMessageListener() {
+    @VisibleForTesting
+    public DeleteMessageListener deleteMessageListener(EventBus 
contentDeletionEventBus) {
         return new DeleteMessageListener(threadDAO, threadLookupDAO, 
imapUidDAO, messageIdDAO, messageDAOV3, attachmentDAOV2,
             aclMapper, userMailboxRightsDAO, applicableFlagDAO, 
firstUnseenDAO, deletedMessageDAO,
-            mailboxCounterDAO, mailboxRecentsDAO, blobStore, 
cassandraConfiguration, ImmutableSet.of());
+            mailboxCounterDAO, mailboxRecentsDAO, blobStore, 
cassandraConfiguration,
+            contentDeletionEventBus);
     }
 }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index 7e0a113607..f20dc5ca3e 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -23,18 +23,17 @@ import static 
org.apache.james.backends.cassandra.init.configuration.JamesExecut
 import static 
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles.ConsistencyChoice.WEAK;
 import static org.apache.james.util.FunctionalUtils.negate;
 
-import java.util.Date;
 import java.util.Optional;
-import java.util.Set;
 
 import jakarta.inject.Inject;
+import jakarta.inject.Named;
 
 import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import 
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
-import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.core.Username;
 import org.apache.james.events.Event;
+import org.apache.james.events.EventBus;
 import org.apache.james.events.EventListener;
 import org.apache.james.events.Group;
 import org.apache.james.mailbox.acl.ACLDiff;
@@ -59,13 +58,15 @@ import 
org.apache.james.mailbox.events.MailboxEvents.MailboxDeletion;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
-import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.mailbox.store.event.EventFactory;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.util.streams.Limit;
 import org.reactivestreams.Publisher;
 
+import com.google.common.collect.ImmutableSet;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -81,80 +82,14 @@ import reactor.core.publisher.Mono;
  * idempotent.
  */
 public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventListener {
+    public static final String CONTENT_DELETION = "contentDeletion";
+
     private static final Optional<CassandraId> ALL_MAILBOXES = 
Optional.empty();
 
     public static class DeleteMessageListenerGroup extends Group {
 
     }
 
-    public static class DeletedMessageCopyCommand {
-        public static DeletedMessageCopyCommand of(MessageRepresentation 
message, MailboxId mailboxId, Username owner) {
-            return new DeletedMessageCopyCommand(message.getMessageId(), 
mailboxId, owner, message.getInternalDate(),
-                message.getSize(), !message.getAttachments().isEmpty(), 
message.getHeaderId(), message.getBodyId());
-        }
-
-        private final MessageId messageId;
-        private final MailboxId mailboxId;
-        private final Username owner;
-        private final Date internalDate;
-        private final long size;
-        private final boolean hasAttachments;
-        private final BlobId headerId;
-        private final BlobId bodyId;
-
-        public DeletedMessageCopyCommand(MessageId messageId, MailboxId 
mailboxId, Username owner, Date internalDate, long size, boolean 
hasAttachments, BlobId headerId, BlobId bodyId) {
-            this.messageId = messageId;
-            this.mailboxId = mailboxId;
-            this.owner = owner;
-            this.internalDate = internalDate;
-            this.size = size;
-            this.hasAttachments = hasAttachments;
-            this.headerId = headerId;
-            this.bodyId = bodyId;
-        }
-
-        public Username getOwner() {
-            return owner;
-        }
-
-        public MessageId getMessageId() {
-            return messageId;
-        }
-
-        public MailboxId getMailboxId() {
-            return mailboxId;
-        }
-
-        public Date getInternalDate() {
-            return internalDate;
-        }
-
-        public long getSize() {
-            return size;
-        }
-
-        public boolean hasAttachments() {
-            return hasAttachments;
-        }
-
-        public BlobId getHeaderId() {
-            return headerId;
-        }
-
-        public BlobId getBodyId() {
-            return bodyId;
-        }
-    }
-
-    @FunctionalInterface
-    public interface DeletionCallback {
-        default Mono<Void> forMessage(MessageRepresentation message, MailboxId 
mailboxId, Username owner) {
-            return forMessage(DeletedMessageCopyCommand.of(message, mailboxId, 
owner));
-        }
-        
-        Mono<Void> forMessage(DeletedMessageCopyCommand copyCommand);
-    }
-
     private final CassandraThreadDAO threadDAO;
     private final CassandraThreadLookupDAO threadLookupDAO;
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
@@ -170,7 +105,7 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
     private final CassandraMailboxRecentsDAO recentsDAO;
     private final BlobStore blobStore;
     private final CassandraConfiguration cassandraConfiguration;
-    private final Set<DeletionCallback> deletionCallbackList;
+    private final EventBus contentDeletionEventBus;
 
     @Inject
     public DeleteMessageListener(CassandraThreadDAO threadDAO, 
CassandraThreadLookupDAO threadLookupDAO,
@@ -180,7 +115,8 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
                                  CassandraUserMailboxRightsDAO rightsDAO, 
CassandraApplicableFlagDAO applicableFlagDAO,
                                  CassandraFirstUnseenDAO firstUnseenDAO, 
CassandraDeletedMessageDAO deletedMessageDAO,
                                  CassandraMailboxCounterDAO counterDAO, 
CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore,
-                                 CassandraConfiguration 
cassandraConfiguration, Set<DeletionCallback> deletionCallbackList) {
+                                 CassandraConfiguration cassandraConfiguration,
+                                 @Named(CONTENT_DELETION) EventBus 
contentDeletionEventBus) {
         this.threadDAO = threadDAO;
         this.threadLookupDAO = threadLookupDAO;
         this.imapUidDAO = imapUidDAO;
@@ -196,7 +132,7 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
         this.recentsDAO = recentsDAO;
         this.blobStore = blobStore;
         this.cassandraConfiguration = cassandraConfiguration;
-        this.deletionCallbackList = deletionCallbackList;
+        this.contentDeletionEventBus = contentDeletionEventBus;
     }
 
     @Override
@@ -260,7 +196,8 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
         return Mono.just(messageId)
             .filterWhen(this::isReferenced)
             .flatMap(id -> readMessage(id)
-                .flatMap(message -> 
Flux.fromIterable(deletionCallbackList).concatMap(callback -> 
callback.forMessage(message, mailboxId, owner)).then().thenReturn(message))
+                .flatMap(message -> 
dispatchMessageContentDeletionEvent(mailboxId, owner, message)
+                    .thenReturn(message))
                 .flatMap(message -> 
deleteUnreferencedAttachments(message).thenReturn(message))
                 .flatMap(this::deleteMessageBlobs)
                 .then(messageDAOV3.delete(messageId))
@@ -270,18 +207,34 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
                 .then(threadLookupDAO.deleteOneRow(threadId, messageId)));
     }
 
+    private Mono<Void> dispatchMessageContentDeletionEvent(MailboxId 
mailboxId, Username owner, MessageRepresentation message) {
+        return 
Mono.from(contentDeletionEventBus.dispatch(EventFactory.messageContentDeleted()
+            .randomEventId()
+            .user(owner)
+            .mailboxId(mailboxId)
+            .messageId(message.getMessageId())
+            .size(message.getSize())
+            .instant(message.getInternalDate().toInstant())
+            .hasAttachments(!message.getAttachments().isEmpty())
+            .headerBlobId(message.getHeaderId().asString())
+            .bodyBlobId(message.getBodyId().asString())
+            .build(),
+            ImmutableSet.of()));
+    }
+
     private Mono<Void> 
handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, 
ThreadId threadId, CassandraId excludedId, Username owner) {
         return Mono.just(messageId)
             .filterWhen(id -> isReferenced(id, excludedId))
             .flatMap(id -> readMessage(id)
-                .flatMap(message -> 
Flux.fromIterable(deletionCallbackList).concatMap(callback -> 
callback.forMessage(message, excludedId, owner)).then().thenReturn(message))
+                .flatMap(message -> 
dispatchMessageContentDeletionEvent(excludedId, owner, message)
+                    .thenReturn(message)))
                 .flatMap(message -> 
deleteUnreferencedAttachments(message).thenReturn(message))
                 .flatMap(this::deleteMessageBlobs)
                 .then(messageDAOV3.delete(messageId))
                 .then(threadLookupDAO.selectOneRow(threadId, messageId)
                     .flatMap(key -> threadDAO.deleteSome(key.getUsername(), 
key.getMimeMessageIds())
                         .collectList()))
-                .then(threadLookupDAO.deleteOneRow(threadId, messageId)));
+                .then(threadLookupDAO.deleteOneRow(threadId, messageId));
     }
 
     private Mono<MessageRepresentation> 
deleteMessageBlobs(MessageRepresentation message) {
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
index 8e66b4c286..a94bc3f3a5 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
@@ -132,7 +132,7 @@ public class CassandraMailboxManagerProvider {
 
         eventBus.register(quotaUpdater);
         eventBus.register(new MailboxAnnotationListener(mapperFactory, 
sessionProvider));
-        eventBus.register(mapperFactory.deleteMessageListener());
+        eventBus.register(mapperFactory.deleteMessageListener(eventBus));
 
         return manager;
     }
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
index 143e4d6520..c5e6912d47 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
@@ -84,7 +84,7 @@ public class CassandraTestSystemFixture {
             new NaiveThreadIdGuessingAlgorithm(), new 
UpdatableTickingClock(Instant.now()));
 
         eventBus.register(new MailboxAnnotationListener(mapperFactory, 
sessionProvider));
-        eventBus.register(mapperFactory.deleteMessageListener());
+        eventBus.register(mapperFactory.deleteMessageListener(eventBus));
 
         return cassandraMailboxManager;
     }
diff --git 
a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
 
b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
index 88ae462624..5e428aac42 100644
--- 
a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
+++ 
b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
@@ -33,7 +33,7 @@ import org.apache.james.events.Event.EventId
 import org.apache.james.events.{EventSerializer, Event => JavaEvent}
 import org.apache.james.mailbox.MailboxSession.SessionId
 import org.apache.james.mailbox.events.MailboxEvents.Added.IS_APPENDED
-import org.apache.james.mailbox.events.MailboxEvents.{Added => JavaAdded, 
Expunged => JavaExpunged, FlagsUpdated => JavaFlagsUpdated, MailboxACLUpdated 
=> JavaMailboxACLUpdated, MailboxAdded => JavaMailboxAdded, MailboxDeletion => 
JavaMailboxDeletion, MailboxRenamed => JavaMailboxRenamed, 
MailboxSubscribedEvent => JavaMailboxSubscribedEvent, MailboxUnsubscribedEvent 
=> JavaMailboxUnsubscribedEvent, QuotaUsageUpdatedEvent => 
JavaQuotaUsageUpdatedEvent}
+import org.apache.james.mailbox.events.MailboxEvents.{Added => JavaAdded, 
Expunged => JavaExpunged, FlagsUpdated => JavaFlagsUpdated, MailboxACLUpdated 
=> JavaMailboxACLUpdated, MailboxAdded => JavaMailboxAdded, MailboxDeletion => 
JavaMailboxDeletion, MailboxRenamed => JavaMailboxRenamed, 
MailboxSubscribedEvent => JavaMailboxSubscribedEvent, MailboxUnsubscribedEvent 
=> JavaMailboxUnsubscribedEvent, MessageContentDeletionEvent => 
JavaMessageContentDeletionEvent, QuotaUsageUpdatedEvent =>  [...]
 import org.apache.james.mailbox.events.{MessageMoveEvent => 
JavaMessageMoveEvent}
 import org.apache.james.mailbox.model.{MailboxId, MessageId, MessageMoves, 
QuotaRoot, ThreadId, MailboxACL => JavaMailboxACL, MessageMetaData => 
JavaMessageMetaData, Quota => JavaQuota}
 import org.apache.james.mailbox.quota.QuotaRootDeserializer
@@ -133,6 +133,18 @@ private object DTO {
   case class MailboxUnSubscribedEvent(eventId: EventId, mailboxPath: 
MailboxPath, mailboxId: MailboxId, user: Username, sessionId: SessionId) 
extends Event {
     override def toJava: JavaEvent = new 
JavaMailboxUnsubscribedEvent(sessionId, user, mailboxPath.toJava, mailboxId, 
eventId)
   }
+
+  case class MessageContentDeletionEvent(eventId: EventId,
+                                         username: Username,
+                                         mailboxId: MailboxId,
+                                         messageId: MessageId,
+                                         size: Long,
+                                         internalDate: Instant,
+                                         hasAttachments: Boolean,
+                                         headerBlobId: String,
+                                         bodyBlobId: String) extends Event {
+    override def toJava: JavaEvent = new 
JavaMessageContentDeletionEvent(eventId, username, mailboxId, messageId, size, 
internalDate, hasAttachments, headerBlobId, bodyBlobId)
+  }
 }
 
 private object ScalaConverter {
@@ -227,6 +239,17 @@ private object ScalaConverter {
     user = event.getUsername,
     sessionId = event.getSessionId)
 
+  private def toScala(event: JavaMessageContentDeletionEvent): 
DTO.MessageContentDeletionEvent = DTO.MessageContentDeletionEvent(
+      eventId = event.getEventId,
+      username = event.getUsername,
+      mailboxId = event.mailboxId(),
+      messageId = event.messageId(),
+      size = event.size(),
+      internalDate = event.internalDate(),
+      hasAttachments = event.hasAttachments,
+      headerBlobId = event.headerBlobId(),
+      bodyBlobId = event.bodyBlobId())
+
   def toScala(javaEvent: JavaEvent): Event = javaEvent match {
     case e: JavaAdded => toScala(e)
     case e: JavaExpunged => toScala(e)
@@ -239,6 +262,7 @@ private object ScalaConverter {
     case e: JavaQuotaUsageUpdatedEvent => toScala(e)
     case e: JavaMailboxSubscribedEvent => toScala(e)
     case e: JavaMailboxUnsubscribedEvent => toScala(e)
+    case e: JavaMessageContentDeletionEvent => toScala(e)
     case _ => throw new RuntimeException("no Scala conversion known")
   }
 }
diff --git 
a/mailbox/event/json/src/test/java/org/apache/james/event/json/MessageContentDeletionSerializationTest.java
 
b/mailbox/event/json/src/test/java/org/apache/james/event/json/MessageContentDeletionSerializationTest.java
new file mode 100644
index 0000000000..9edb6e5d2d
--- /dev/null
+++ 
b/mailbox/event/json/src/test/java/org/apache/james/event/json/MessageContentDeletionSerializationTest.java
@@ -0,0 +1,86 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.event.json;
+
+import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson;
+import static org.apache.james.event.json.SerializerFixture.EVENT_ID;
+import static org.apache.james.event.json.SerializerFixture.EVENT_SERIALIZER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+
+import org.apache.james.core.Username;
+import 
org.apache.james.mailbox.events.MailboxEvents.MessageContentDeletionEvent;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.Test;
+
+class MessageContentDeletionSerializationTest {
+    private static final Username USERNAME = Username.of("[email protected]");
+    private static final MailboxId MAILBOX_ID = TestId.of(18);
+    private static final MessageId MESSAGE_ID = TestMessageId.of(42);
+    private static final long SIZE = 12345L;
+    private static final Instant INTERNAL_DATE = 
Instant.parse("2024-12-15T08:23:45Z");
+    private static final boolean HAS_ATTACHMENTS = true;
+    private static final String HEADER_BLOB_ID = "header-blob-id";
+    private static final String BODY_BLOB_ID = "body-blob-id";
+
+    private static final MessageContentDeletionEvent EVENT = new 
MessageContentDeletionEvent(
+        EVENT_ID,
+        USERNAME,
+        MAILBOX_ID,
+        MESSAGE_ID,
+        SIZE,
+        INTERNAL_DATE,
+        HAS_ATTACHMENTS,
+        HEADER_BLOB_ID,
+        BODY_BLOB_ID);
+
+    private static final String JSON = """
+        {
+            "MessageContentDeletionEvent": {
+                "eventId": "6e0dd59d-660e-4d9b-b22f-0354479f47b4",
+                "username": "[email protected]",
+                "size": 12345,
+                "hasAttachments": true,
+                "internalDate": "2024-12-15T08:23:45Z",
+                "mailboxId": "18",
+                "headerBlobId": "header-blob-id",
+                "messageId": "42",
+                "bodyBlobId": "body-blob-id"
+            }
+        }
+        """;
+
+    @Test
+    void messageContentDeletionEventShouldBeWellSerialized() {
+        assertThatJson(EVENT_SERIALIZER.toJson(EVENT))
+            .isEqualTo(JSON);
+    }
+
+    @Test
+    void messageContentDeletionEventShouldBeWellDeserialized() {
+        assertThat(EVENT_SERIALIZER.fromJson(JSON).get())
+            .isEqualTo(EVENT);
+    }
+
+}
diff --git 
a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
 
b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
index 2b7a34ac20..c6e46fb0d8 100644
--- 
a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
+++ 
b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
@@ -32,11 +32,9 @@ import com.google.common.base.Preconditions;
 
 public class VaultConfiguration {
     public static final VaultConfiguration DEFAULT =
-        new VaultConfiguration(false, false, ChronoUnit.YEARS.getDuration(), 
DefaultMailboxes.RESTORED_MESSAGES);
+        new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), 
DefaultMailboxes.RESTORED_MESSAGES);
     public static final VaultConfiguration ENABLED_DEFAULT =
-        new VaultConfiguration(true, false, ChronoUnit.YEARS.getDuration(), 
DefaultMailboxes.RESTORED_MESSAGES);
-    public static final VaultConfiguration ENABLED_WORKQUEUE =
-        new VaultConfiguration(true, true, ChronoUnit.YEARS.getDuration(), 
DefaultMailboxes.RESTORED_MESSAGES);
+        new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), 
DefaultMailboxes.RESTORED_MESSAGES);
 
     public static VaultConfiguration from(Configuration 
propertiesConfiguration) {
         Duration retentionPeriod = 
Optional.ofNullable(propertiesConfiguration.getString("retentionPeriod"))
@@ -45,18 +43,15 @@ public class VaultConfiguration {
         String restoreLocation = 
Optional.ofNullable(propertiesConfiguration.getString("restoreLocation"))
             .orElse(DEFAULT.getRestoreLocation());
         boolean enabled = propertiesConfiguration.getBoolean("enabled", false);
-        boolean workQueueEnabled = 
propertiesConfiguration.getBoolean("workQueueEnabled", false);
-        return new VaultConfiguration(enabled, workQueueEnabled, 
retentionPeriod, restoreLocation);
+        return new VaultConfiguration(enabled, retentionPeriod, 
restoreLocation);
     }
 
     private final boolean enabled;
-    private final boolean workQueueEnabled;
     private final Duration retentionPeriod;
     private final String restoreLocation;
 
-    VaultConfiguration(boolean enabled, boolean workQueueEnabled, Duration 
retentionPeriod, String restoreLocation) {
+    VaultConfiguration(boolean enabled, Duration retentionPeriod, String 
restoreLocation) {
         this.enabled = enabled;
-        this.workQueueEnabled = workQueueEnabled;
         Preconditions.checkNotNull(retentionPeriod);
         Preconditions.checkNotNull(restoreLocation);
 
@@ -68,10 +63,6 @@ public class VaultConfiguration {
         return enabled;
     }
 
-    public boolean isWorkQueueEnabled() {
-        return workQueueEnabled;
-    }
-
     public Duration getRetentionPeriod() {
         return retentionPeriod;
     }
@@ -87,14 +78,13 @@ public class VaultConfiguration {
 
             return Objects.equals(this.retentionPeriod, that.retentionPeriod)
                 && Objects.equals(this.restoreLocation, that.restoreLocation)
-                && Objects.equals(this.enabled, that.enabled)
-                && Objects.equals(this.workQueueEnabled, 
that.workQueueEnabled);
+                && Objects.equals(this.enabled, that.enabled);
         }
         return false;
     }
 
     @Override
     public final int hashCode() {
-        return Objects.hash(retentionPeriod, restoreLocation, enabled, 
workQueueEnabled);
+        return Objects.hash(retentionPeriod, restoreLocation, enabled);
     }
 }
diff --git 
a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
 
b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
index 6fe39dd648..656af119d6 100644
--- 
a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
+++ 
b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
@@ -40,13 +40,13 @@ class VaultConfigurationTest {
 
     @Test
     void constructorShouldThrowWhenRetentionPeriodIsNull() {
-        assertThatThrownBy(() -> new VaultConfiguration(true, false, null, 
DefaultMailboxes.RESTORED_MESSAGES))
+        assertThatThrownBy(() -> new VaultConfiguration(true, null, 
DefaultMailboxes.RESTORED_MESSAGES))
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
     void constructorShouldThrowWhenRestoreLocationIsNull() {
-        assertThatThrownBy(() -> new VaultConfiguration(true, false, 
ChronoUnit.YEARS.getDuration(), null))
+        assertThatThrownBy(() -> new VaultConfiguration(true, 
ChronoUnit.YEARS.getDuration(), null))
             .isInstanceOf(NullPointerException.class);
     }
 
@@ -62,7 +62,7 @@ class VaultConfigurationTest {
         configuration.addProperty("restoreLocation", "INBOX");
 
         assertThat(VaultConfiguration.from(configuration)).isEqualTo(
-            new VaultConfiguration(false, false, 
ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX));
+            new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), 
DefaultMailboxes.INBOX));
     }
 
     @Test
@@ -71,7 +71,7 @@ class VaultConfigurationTest {
         configuration.addProperty("retentionPeriod", "15d");
 
         assertThat(VaultConfiguration.from(configuration)).isEqualTo(
-            new VaultConfiguration(false, false, Duration.ofDays(15), 
DefaultMailboxes.RESTORED_MESSAGES));
+            new VaultConfiguration(false, Duration.ofDays(15), 
DefaultMailboxes.RESTORED_MESSAGES));
     }
 
     @Test
@@ -80,7 +80,7 @@ class VaultConfigurationTest {
         configuration.addProperty("retentionPeriod", "15h");
 
         assertThat(VaultConfiguration.from(configuration)).isEqualTo(
-            new VaultConfiguration(false, false, Duration.ofHours(15), 
DefaultMailboxes.RESTORED_MESSAGES));
+            new VaultConfiguration(false, Duration.ofHours(15), 
DefaultMailboxes.RESTORED_MESSAGES));
     }
 
     @Test
@@ -89,7 +89,7 @@ class VaultConfigurationTest {
         configuration.addProperty("retentionPeriod", "15");
 
         assertThat(VaultConfiguration.from(configuration)).isEqualTo(
-            new VaultConfiguration(false, false, Duration.ofDays(15), 
DefaultMailboxes.RESTORED_MESSAGES));
+            new VaultConfiguration(false, Duration.ofDays(15), 
DefaultMailboxes.RESTORED_MESSAGES));
     }
 
     @Test
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
index 0a248d0cc0..2c19a0b7cb 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
@@ -52,6 +52,7 @@ import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.Quota;
 import org.apache.james.mailbox.model.QuotaRoot;
@@ -95,6 +96,11 @@ public class EventFactory {
         T mailboxId(MailboxId mailboxId);
     }
 
+    @FunctionalInterface
+    public interface RequireMessageId<T> {
+        T messageId(MessageId messageId);
+    }
+
     @FunctionalInterface
     public interface RequirePath<T> {
         T mailboxPath(MailboxPath path);
@@ -197,6 +203,26 @@ public class EventFactory {
         T instant(Instant instant);
     }
 
+    @FunctionalInterface
+    public interface RequireSize<T> {
+        T size(long size);
+    }
+
+    @FunctionalInterface
+    public interface RequireHasAttachments<T> {
+        T hasAttachments(boolean hasAttachments);
+    }
+
+    @FunctionalInterface
+    public interface RequireHeaderBlobId<T> {
+        T headerBlobId(String headerBlobId);
+    }
+
+    @FunctionalInterface
+    public interface RequireBodyBlobId<T> {
+        T bodyBlobId(String bodyBlobId);
+    }
+
     @FunctionalInterface
     public interface RequireMailboxEvent<T> extends 
RequireEventId<RequireSession<RequireMailbox<T>>> {}
 
@@ -520,6 +546,58 @@ public class EventFactory {
         }
     }
 
+    public static final class MessageContentDeletionFinalStage {
+        private final Event.EventId eventId;
+        private final Username username;
+        private final MailboxId mailboxId;
+        private final MessageId messageId;
+        private final long size;
+        private final Instant internalDate;
+        private final boolean hasAttachments;
+        private final String headerBlobId;
+        private final String bodyBlobId;
+
+        MessageContentDeletionFinalStage(Event.EventId eventId,
+                                         Username username,
+                                         MailboxId mailboxId,
+                                         MessageId messageId,
+                                         long size,
+                                         Instant internalDate,
+                                         boolean hasAttachments,
+                                         String headerBlobId,
+                                         String bodyBlobId) {
+            this.eventId = eventId;
+            this.username = username;
+            this.mailboxId = mailboxId;
+            this.messageId = messageId;
+            this.size = size;
+            this.internalDate = internalDate;
+            this.hasAttachments = hasAttachments;
+            this.headerBlobId = headerBlobId;
+            this.bodyBlobId = bodyBlobId;
+        }
+
+        public MailboxEvents.MessageContentDeletionEvent build() {
+            Preconditions.checkNotNull(username);
+            Preconditions.checkNotNull(mailboxId);
+            Preconditions.checkNotNull(messageId);
+            Preconditions.checkNotNull(internalDate);
+            Preconditions.checkNotNull(headerBlobId);
+            Preconditions.checkNotNull(bodyBlobId);
+
+            return new MailboxEvents.MessageContentDeletionEvent(
+                eventId,
+                username,
+                mailboxId,
+                messageId,
+                size,
+                internalDate,
+                hasAttachments,
+                headerBlobId,
+                bodyBlobId);
+        }
+    }
+
     public static class MailboxSubscribedFinalStage {
         private final Event.EventId eventId;
         private final MailboxPath path;
@@ -606,6 +684,11 @@ public class EventFactory {
         return eventId -> user -> quotaRoot -> quotaCount -> quotaSize -> 
instant -> new QuotaUsageUpdatedFinalStage(eventId, user, quotaRoot, 
quotaCount, quotaSize, instant);
     }
 
+    public static 
RequireEventId<RequireUser<RequireMailboxId<RequireMessageId<RequireSize<RequireInstant<RequireHasAttachments<RequireHeaderBlobId<RequireBodyBlobId<MessageContentDeletionFinalStage>>>>>>>>>
 messageContentDeleted() {
+        return eventId -> user -> mailboxId -> messageId -> size -> instant -> 
hasAttachments -> headerBlobId -> bodyBlobId ->
+            new MessageContentDeletionFinalStage(eventId, user, mailboxId, 
messageId, size, instant, hasAttachments, headerBlobId, bodyBlobId);
+    }
+
     public static RequireMailboxEvent<MailboxSubscribedFinalStage> 
mailboxSubscribed() {
         return eventId -> user -> sessionId -> mailboxId -> path -> new 
MailboxSubscribedFinalStage(eventId, path, mailboxId, user, sessionId);
     }
diff --git 
a/server/apps/distributed-app/sample-configuration/deletedMessageVault.properties
 
b/server/apps/distributed-app/sample-configuration/deletedMessageVault.properties
index 71b344ad27..2e4d1ed4e2 100644
--- 
a/server/apps/distributed-app/sample-configuration/deletedMessageVault.properties
+++ 
b/server/apps/distributed-app/sample-configuration/deletedMessageVault.properties
@@ -2,10 +2,6 @@
 
 enabled=false
 
-# Enable work queue to be used with deleted message vault
-# Default to false
-# workQueueEnabled=false
-
 # Retention period for your deleted messages into the vault, after which they 
expire and can be potentially cleaned up
 # Optional, default 1y
 # retentionPeriod=1y
diff --git 
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
 
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index 6d0ae07e4e..9d17f58a2d 100644
--- 
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ 
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -50,10 +50,10 @@ import 
org.apache.james.modules.data.CassandraSieveQuotaModule;
 import org.apache.james.modules.data.CassandraSieveRepositoryModule;
 import org.apache.james.modules.data.CassandraUsersRepositoryModule;
 import org.apache.james.modules.data.CassandraVacationModule;
+import org.apache.james.modules.event.ContentDeletionEventBusModule;
 import org.apache.james.modules.event.JMAPEventBusModule;
 import org.apache.james.modules.event.MailboxEventBusModule;
 import org.apache.james.modules.eventstore.CassandraEventStoreModule;
-import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
 import org.apache.james.modules.mailbox.CassandraMailboxModule;
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaLegacyModule;
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
@@ -182,6 +182,7 @@ public class CassandraRabbitMQJamesServerMain implements 
JamesServerMain {
     protected static final Module MODULES = 
Modules.override(REQUIRE_TASK_MANAGER_MODULE, new 
DistributedTaskManagerModule())
         .with(new RabbitMQModule(),
             new MailboxEventBusModule(),
+            new ContentDeletionEventBusModule(),
             new DistributedTaskSerializationModule());
 
     public static void main(String[] args) throws Exception {
@@ -232,14 +233,9 @@ public class CassandraRabbitMQJamesServerMain implements 
JamesServerMain {
     }
 
     private static Module chooseDeletedMessageVault(VaultConfiguration 
vaultConfiguration) {
-        if (vaultConfiguration.isEnabled() && 
vaultConfiguration.isWorkQueueEnabled()) {
-            return Modules.combine(
-                new DistributedDeletedMessageVaultModule(),
-                new DeletedMessageVaultRoutesModule());
-        }
         if (vaultConfiguration.isEnabled()) {
             return Modules.combine(
-                new CassandraDeletedMessageVaultModule(),
+                new DistributedDeletedMessageVaultModule(),
                 new DeletedMessageVaultRoutesModule());
         }
         return binder -> {
diff --git 
a/server/apps/distributed-pop3-app/sample-configuration/deletedMessageVault.properties
 
b/server/apps/distributed-pop3-app/sample-configuration/deletedMessageVault.properties
index 40873db544..47ec56c5c4 100644
--- 
a/server/apps/distributed-pop3-app/sample-configuration/deletedMessageVault.properties
+++ 
b/server/apps/distributed-pop3-app/sample-configuration/deletedMessageVault.properties
@@ -2,10 +2,6 @@
 
 enabled=false
 
-# Enable work queue to be used with deleted message vault
-# Default to false
-# workQueueEnabled=false
-
 # Retention period for your deleted messages into the vault, after which they 
expire and can be potentially cleaned up
 # Optional, default 1y
 # retentionPeriod=1y
diff --git 
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
 
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
index c1247e8474..e858004f4a 100644
--- 
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
+++ 
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
@@ -56,11 +56,11 @@ import 
org.apache.james.modules.data.CassandraSieveQuotaModule;
 import org.apache.james.modules.data.CassandraSieveRepositoryModule;
 import org.apache.james.modules.data.CassandraUsersRepositoryModule;
 import org.apache.james.modules.data.CassandraVacationModule;
+import org.apache.james.modules.event.ContentDeletionEventBusModule;
 import org.apache.james.modules.event.JMAPEventBusModule;
 import org.apache.james.modules.event.MailboxEventBusModule;
 import org.apache.james.modules.eventstore.CassandraEventStoreModule;
 import org.apache.james.modules.mailbox.CassandraBlobStoreDependenciesModule;
-import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
 import org.apache.james.modules.mailbox.CassandraMailboxModule;
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaLegacyModule;
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
@@ -176,6 +176,7 @@ public class DistributedPOP3JamesServerMain implements 
JamesServerMain {
             new RabbitMQMailQueueModule(),
             new RabbitMailQueueRoutesModule(),
             new MailboxEventBusModule(),
+            new ContentDeletionEventBusModule(),
             new DistributedTaskSerializationModule());
 
     public static void main(String[] args) throws Exception {
@@ -219,15 +220,10 @@ public class DistributedPOP3JamesServerMain implements 
JamesServerMain {
     }
 
     private static Module chooseDeletedMessageVault(VaultConfiguration 
vaultConfiguration) {
-        if (vaultConfiguration.isEnabled() && 
vaultConfiguration.isWorkQueueEnabled()) {
-            return Modules.combine(
-                new DistributedDeletedMessageVaultModule(),
-                new DeletedMessageVaultRoutesModule());
-        }
         if (vaultConfiguration.isEnabled()) {
             return Modules.combine(
                 new DistributedDeletedMessageVaultModule(),
-                new CassandraDeletedMessageVaultModule());
+                new DeletedMessageVaultRoutesModule());
         }
         return binder -> {
 
diff --git 
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
 
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
index 3d7df65509..0ebd94cca6 100644
--- 
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
+++ 
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
@@ -48,6 +48,7 @@ import org.apache.james.modules.data.PostgresEventStoreModule;
 import org.apache.james.modules.data.PostgresUsersRepositoryModule;
 import org.apache.james.modules.data.PostgresVacationModule;
 import org.apache.james.modules.data.SievePostgresRepositoryModules;
+import org.apache.james.modules.event.ContentDeletionEventBusModule;
 import org.apache.james.modules.event.JMAPEventBusModule;
 import org.apache.james.modules.event.MailboxEventBusModule;
 import org.apache.james.modules.events.PostgresDeadLetterModule;
@@ -232,6 +233,7 @@ public class PostgresJamesServerMain implements 
JamesServerMain {
             case RABBITMQ:
                 return List.of(
                     Modules.override(new DefaultEventModule()).with(new 
MailboxEventBusModule()),
+                    new ContentDeletionEventBusModule(),
                     new RabbitMQModule(),
                     new RabbitMQMailQueueModule(),
                     new FakeMailQueueViewModule(),
diff --git 
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
 
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
index 083596f435..4a53aa0b20 100644
--- 
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
+++ 
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
@@ -19,11 +19,14 @@
 
 package org.apache.james.modules.data;
 
+import static 
org.apache.james.mailbox.cassandra.DeleteMessageListener.CONTENT_DELETION;
+
 import java.io.FileNotFoundException;
 
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
 import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.events.EventListener;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.eventsourcing.eventstore.dto.EventDTO;
@@ -54,14 +57,13 @@ import 
org.apache.james.jmap.cassandra.projections.CassandraEmailQueryView;
 import 
org.apache.james.jmap.cassandra.projections.CassandraEmailQueryViewDataDefinition;
 import 
org.apache.james.jmap.cassandra.projections.CassandraMessageFastViewProjection;
 import 
org.apache.james.jmap.cassandra.projections.CassandraMessageFastViewProjectionDataDefinition;
-import 
org.apache.james.jmap.cassandra.projections.CassandraMessageFastViewProjectionDeletionCallback;
+import 
org.apache.james.jmap.cassandra.projections.CassandraMessageFastViewProjectionDeletionListener;
 import 
org.apache.james.jmap.cassandra.pushsubscription.CassandraPushSubscriptionDataDefinition;
 import 
org.apache.james.jmap.cassandra.pushsubscription.CassandraPushSubscriptionRepository;
 import org.apache.james.jmap.cassandra.upload.CassandraUploadRepository;
 import org.apache.james.jmap.cassandra.upload.CassandraUploadUsageRepository;
 import org.apache.james.jmap.cassandra.upload.UploadDAO;
 import org.apache.james.jmap.cassandra.upload.UploadDataDefinition;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
 import org.apache.james.user.api.DeleteUserDataTaskStep;
 import org.apache.james.user.api.UsernameChangeTaskStep;
 import org.apache.james.utils.PropertiesProvider;
@@ -72,6 +74,7 @@ import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
+import com.google.inject.name.Names;
 
 public class CassandraJmapModule extends AbstractModule {
     @Override
@@ -116,9 +119,9 @@ public class CassandraJmapModule extends AbstractModule {
         
eventDTOModuleBinder.addBinding().toInstance(FilteringRuleSetDefineDTOModules.FILTERING_RULE_SET_DEFINED);
         
eventDTOModuleBinder.addBinding().toInstance(FilteringRuleSetDefineDTOModules.FILTERING_INCREMENT);
 
-        Multibinder.newSetBinder(binder(), 
DeleteMessageListener.DeletionCallback.class)
+        Multibinder.newSetBinder(binder(), 
EventListener.ReactiveGroupEventListener.class, Names.named(CONTENT_DELETION))
             .addBinding()
-            .to(CassandraMessageFastViewProjectionDeletionCallback.class);
+            .to(CassandraMessageFastViewProjectionDeletionListener.class);
 
         Multibinder.newSetBinder(binder(), UsernameChangeTaskStep.class)
             .addBinding()
diff --git 
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraDeletedMessageVaultModule.java
 
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraDeletedMessageVaultModule.java
deleted file mode 100644
index 58fb3951c5..0000000000
--- 
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraDeletedMessageVaultModule.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.modules.mailbox;
-
-import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
-import org.apache.james.modules.vault.DeletedMessageVaultModule;
-import org.apache.james.vault.DeletedMessageVault;
-import org.apache.james.vault.blob.BlobStoreDeletedMessageVault;
-import org.apache.james.vault.blob.BucketNameGenerator;
-import 
org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter;
-import org.apache.james.vault.metadata.CassandraDeletedMessageMetadataVault;
-import org.apache.james.vault.metadata.DeletedMessageMetadataDataDefinition;
-import org.apache.james.vault.metadata.DeletedMessageMetadataVault;
-import org.apache.james.vault.metadata.DeletedMessageVaultDeletionCallback;
-import org.apache.james.vault.metadata.MetadataDAO;
-import org.apache.james.vault.metadata.StorageInformationDAO;
-import org.apache.james.vault.metadata.UserPerBucketDAO;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Scopes;
-import com.google.inject.multibindings.Multibinder;
-
-public class CassandraDeletedMessageVaultModule extends AbstractModule {
-
-    @Override
-    protected void configure() {
-        install(new DeletedMessageVaultModule());
-
-        Multibinder<CassandraDataDefinition> cassandraDataDefinitions = 
Multibinder.newSetBinder(binder(), CassandraDataDefinition.class);
-        cassandraDataDefinitions
-            .addBinding()
-            .toInstance(DeletedMessageMetadataDataDefinition.MODULE);
-
-        bind(MetadataDAO.class).in(Scopes.SINGLETON);
-        bind(StorageInformationDAO.class).in(Scopes.SINGLETON);
-        bind(UserPerBucketDAO.class).in(Scopes.SINGLETON);
-        
bind(DeletedMessageWithStorageInformationConverter.class).in(Scopes.SINGLETON);
-
-        bind(CassandraDeletedMessageMetadataVault.class).in(Scopes.SINGLETON);
-        bind(DeletedMessageMetadataVault.class)
-            .to(CassandraDeletedMessageMetadataVault.class);
-
-        bind(BucketNameGenerator.class).in(Scopes.SINGLETON);
-        bind(BlobStoreDeletedMessageVault.class).in(Scopes.SINGLETON);
-        bind(DeletedMessageVault.class)
-            .to(BlobStoreDeletedMessageVault.class);
-
-        Multibinder.newSetBinder(binder(), 
DeleteMessageListener.DeletionCallback.class)
-            .addBinding()
-            .to(DeletedMessageVaultDeletionCallback.class);
-    }
-}
diff --git 
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
 
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
index 48c6d32c08..d71e7e44ad 100644
--- 
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
+++ 
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
@@ -251,7 +251,6 @@ public class CassandraMailboxModule extends AbstractModule {
             .addBinding().to(MailboxSubscriptionListener.class);
         Multibinder.newSetBinder(binder(), 
EventListener.ReactiveGroupEventListener.class)
             .addBinding().to(DeleteMessageListener.class);
-        Multibinder.newSetBinder(binder(), 
DeleteMessageListener.DeletionCallback.class);
 
         
bind(MailboxManager.class).annotatedWith(Names.named(MAILBOXMANAGER_NAME)).to(MailboxManager.class);
 
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
new file mode 100644
index 0000000000..72a4cc07ae
--- /dev/null
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
@@ -0,0 +1,161 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.event;
+
+import static 
org.apache.james.events.NamingStrategy.CONTENT_DELETION_NAMING_STRATEGY;
+
+import java.util.Set;
+
+import jakarta.inject.Named;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.event.json.MailboxEventSerializer;
+import org.apache.james.events.Event;
+import org.apache.james.events.EventBus;
+import org.apache.james.events.EventBusId;
+import org.apache.james.events.EventBusReconnectionHandler;
+import org.apache.james.events.EventDeadLetters;
+import org.apache.james.events.EventListener;
+import org.apache.james.events.Group;
+import org.apache.james.events.GroupRegistrationHandler;
+import org.apache.james.events.KeyReconnectionHandler;
+import org.apache.james.events.RabbitEventBusConsumerHealthCheck;
+import 
org.apache.james.events.RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck;
+import org.apache.james.events.RabbitMQEventBus;
+import org.apache.james.events.RetryBackoffConfiguration;
+import org.apache.james.events.RoutingKeyConverter;
+import org.apache.james.jmap.change.Factory;
+import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
+import org.reactivestreams.Publisher;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Names;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.Sender;
+
+public class ContentDeletionEventBusModule extends AbstractModule {
+    public static class NoopListener implements 
EventListener.ReactiveGroupEventListener {
+        public static class NoopListenerGroup extends Group {
+
+        }
+
+        @Override
+        public Group getDefaultGroup() {
+            return new NoopListenerGroup();
+        }
+
+        @Override
+        public Publisher<Void> reactiveEvent(Event event) {
+            return Mono.empty();
+        }
+
+        @Override
+        public boolean isHandling(Event event) {
+            return false;
+        }
+    }
+
+    public static final String CONTENT_DELETION = "contentDeletion";
+
+    @Override
+    protected void configure() {
+        
bind(EventBusId.class).annotatedWith(Names.named(CONTENT_DELETION)).toInstance(EventBusId.random());
+
+        Multibinder.newSetBinder(binder(), 
EventListener.ReactiveGroupEventListener.class, 
Names.named(DeleteMessageListener.CONTENT_DELETION));
+    }
+
+    @ProvidesIntoSet
+    InitializationOperation workQueue(@Named(CONTENT_DELETION) 
RabbitMQEventBus instance,
+                                      @Named(CONTENT_DELETION) 
Set<EventListener.ReactiveGroupEventListener> contentDeletionListeners) {
+        return InitilizationOperationBuilder
+            .forClass(RabbitMQEventBus.class)
+            .init(() -> {
+                instance.start();
+                contentDeletionListeners.forEach(instance::register);
+
+                // workaround for Postgres app to make the content deletion 
queue created and pass tests
+                // TODO remove after refactoring JAMES-4154 for Postgres app
+                instance.register(new NoopListener());
+            });
+    }
+
+    @ProvidesIntoSet
+    SimpleConnectionPool.ReconnectionHandler 
provideReconnectionHandler(@Named(CONTENT_DELETION) RabbitMQEventBus eventBus) {
+        return new EventBusReconnectionHandler(eventBus);
+    }
+
+    @ProvidesIntoSet
+    SimpleConnectionPool.ReconnectionHandler 
provideReconnectionHandler(@Named(CONTENT_DELETION) EventBusId eventBusId, 
RabbitMQConfiguration configuration) {
+        return new KeyReconnectionHandler(CONTENT_DELETION_NAMING_STRATEGY, 
eventBusId, configuration);
+    }
+
+    @ProvidesIntoSet
+    HealthCheck healthCheck(@Named(CONTENT_DELETION) RabbitMQEventBus eventBus,
+                            SimpleConnectionPool connectionPool) {
+        return new RabbitEventBusConsumerHealthCheck(eventBus, 
CONTENT_DELETION_NAMING_STRATEGY, connectionPool,
+            GroupRegistrationHandler.GROUP);
+    }
+
+    @ProvidesIntoSet
+    HealthCheck 
contentDeletionEventBusDeadLetterQueueHealthCheck(RabbitMQConfiguration 
rabbitMQConfiguration) {
+        return new 
RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck(rabbitMQConfiguration);
+    }
+
+    @Provides
+    @Singleton
+    @Named(CONTENT_DELETION)
+    RabbitMQEventBus provideContentDeletionEventBus(Sender sender, 
ReceiverProvider receiverProvider,
+                                                    MailboxEventSerializer 
eventSerializer,
+                                                    RetryBackoffConfiguration 
retryBackoffConfiguration,
+                                                    EventDeadLetters 
eventDeadLetters,
+                                                    MetricFactory 
metricFactory, ReactorRabbitMQChannelPool channelPool,
+                                                    @Named(CONTENT_DELETION) 
EventBusId eventBusId,
+                                                    RabbitMQConfiguration 
configuration) {
+        return new RabbitMQEventBus(
+            CONTENT_DELETION_NAMING_STRATEGY,
+            sender, receiverProvider, eventSerializer, 
retryBackoffConfiguration, new RoutingKeyConverter(ImmutableSet.of(new 
Factory())),
+            eventDeadLetters, metricFactory, channelPool, eventBusId, 
configuration);
+    }
+
+    @Provides
+    @Singleton
+    @Named(CONTENT_DELETION)
+    EventBus provideContentDeletionEventBus(@Named(CONTENT_DELETION) 
RabbitMQEventBus rabbitMQEventBus) {
+        return rabbitMQEventBus;
+    }
+
+    @ProvidesIntoSet
+    EventBus registerEventBus(@Named(CONTENT_DELETION) EventBus eventBus) {
+        return eventBus;
+    }
+}
diff --git 
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultDeletionListener.java
similarity index 66%
rename from 
mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
rename to 
server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultDeletionListener.java
index 40b6bf02f4..f80eb04e26 100644
--- 
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultDeletionListener.java
@@ -17,7 +17,7 @@
  * under the License.                                           *
  ****************************************************************/
 
-package org.apache.james.vault.metadata;
+package org.apache.james.modules.mailbox;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -31,10 +31,14 @@ import java.util.Set;
 
 import jakarta.inject.Inject;
 
+import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.MaybeSender;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.events.Event;
+import org.apache.james.events.EventListener;
+import org.apache.james.events.Group;
+import 
org.apache.james.mailbox.events.MailboxEvents.MessageContentDeletionEvent;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mime4j.MimeIOException;
 import org.apache.james.mime4j.codec.DecodeMonitor;
@@ -45,6 +49,7 @@ import org.apache.james.mime4j.stream.MimeConfig;
 import org.apache.james.server.core.Envelope;
 import org.apache.james.vault.DeletedMessage;
 import org.apache.james.vault.DeletedMessageVault;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,46 +57,70 @@ import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Mono;
 
-public class DeletedMessageVaultDeletionCallback implements 
DeleteMessageListener.DeletionCallback {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(DeletedMessageVaultDeletionCallback.class);
+public class DeletedMessageVaultDeletionListener implements 
EventListener.ReactiveGroupEventListener {
+    public static class DeletedMessageVaultListenerGroup extends Group {
 
+    }
+
+    private static final Group DELETED_MESSAGE_VAULT_DELETION_GROUP = new 
DeletedMessageVaultListenerGroup();
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DeletedMessageVaultDeletionListener.class);
 
+    private final BlobId.Factory blobIdFactory;
     private final DeletedMessageVault deletedMessageVault;
     private final BlobStore blobStore;
     private final Clock clock;
 
     @Inject
-    public DeletedMessageVaultDeletionCallback(DeletedMessageVault 
deletedMessageVault, BlobStore blobStore, Clock clock) {
+    public DeletedMessageVaultDeletionListener(BlobId.Factory blobIdFactory, 
DeletedMessageVault deletedMessageVault,
+                                               BlobStore blobStore, Clock 
clock) {
+        this.blobIdFactory = blobIdFactory;
         this.deletedMessageVault = deletedMessageVault;
         this.blobStore = blobStore;
         this.clock = clock;
     }
 
     @Override
-    public Mono<Void> 
forMessage(DeleteMessageListener.DeletedMessageCopyCommand copyCommand) {
-        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), 
copyCommand.getHeaderId(), BlobStore.StoragePolicy.LOW_COST))
+    public Group getDefaultGroup() {
+        return DELETED_MESSAGE_VAULT_DELETION_GROUP;
+    }
+
+    @Override
+    public boolean isHandling(Event event) {
+        return event instanceof MessageContentDeletionEvent;
+    }
+
+    @Override
+    public Publisher<Void> reactiveEvent(Event event) {
+        if (event instanceof MessageContentDeletionEvent contentDeletionEvent) 
{
+            return forMessage(contentDeletionEvent);
+        }
+
+        return Mono.empty();
+    }
+
+    public Mono<Void> forMessage(MessageContentDeletionEvent 
messageContentDeletionEvent) {
+        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), 
blobIdFactory.parse(messageContentDeletionEvent.headerBlobId()), 
BlobStore.StoragePolicy.LOW_COST))
             .flatMap(bytes -> {
-                Optional<Message> mimeMessage = parseMessage(new 
ByteArrayInputStream(bytes), copyCommand.getMessageId());
+                Optional<Message> mimeMessage = parseMessage(new 
ByteArrayInputStream(bytes), messageContentDeletionEvent.messageId());
                 DeletedMessage deletedMessage = DeletedMessage.builder()
-                    .messageId(copyCommand.getMessageId())
-                    .originMailboxes(copyCommand.getMailboxId())
-                    .user(copyCommand.getOwner())
-                    
.deliveryDate(ZonedDateTime.ofInstant(copyCommand.getInternalDate().toInstant(),
 ZoneOffset.UTC))
+                    .messageId(messageContentDeletionEvent.messageId())
+                    .originMailboxes(messageContentDeletionEvent.mailboxId())
+                    .user(messageContentDeletionEvent.getUsername())
+                    
.deliveryDate(ZonedDateTime.ofInstant(messageContentDeletionEvent.internalDate(),
 ZoneOffset.UTC))
                     .deletionDate(ZonedDateTime.ofInstant(clock.instant(), 
ZoneOffset.UTC))
                     .sender(retrieveSender(mimeMessage))
                     .recipients(retrieveRecipients(mimeMessage))
-                    .hasAttachment(copyCommand.hasAttachments())
-                    .size(copyCommand.getSize())
+                    
.hasAttachment(messageContentDeletionEvent.hasAttachments())
+                    .size(messageContentDeletionEvent.size())
                     .subject(mimeMessage.map(Message::getSubject))
                     .build();
 
-                return 
Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), 
copyCommand.getBodyId(), BlobStore.StoragePolicy.LOW_COST))
+                return 
Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), 
blobIdFactory.parse(messageContentDeletionEvent.bodyBlobId()), 
BlobStore.StoragePolicy.LOW_COST))
                     .map(bodyStream -> new SequenceInputStream(new 
ByteArrayInputStream(bytes), bodyStream))
                     .flatMap(bodyStream -> 
Mono.from(deletedMessageVault.append(deletedMessage, bodyStream)));
             });
     }
 
-
     private Optional<Message> parseMessage(InputStream inputStream, MessageId 
messageId) {
         DefaultMessageBuilder messageBuilder = new DefaultMessageBuilder();
         messageBuilder.setMimeEntityConfig(MimeConfig.PERMISSIVE);
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
deleted file mode 100644
index 0893a66d34..0000000000
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.modules.mailbox;
-
-import jakarta.inject.Inject;
-
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
-import org.reactivestreams.Publisher;
-
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
-
-public class DeletedMessageVaultWorkQueueReconnectionHandler implements 
SimpleConnectionPool.ReconnectionHandler {
-    private final DistributedDeletedMessageVaultDeletionCallback 
distributedDeletedMessageVaultDeletionCallback;
-
-    @Inject
-    public 
DeletedMessageVaultWorkQueueReconnectionHandler(DistributedDeletedMessageVaultDeletionCallback
 distributedDeletedMessageVaultDeletionCallback) {
-        this.distributedDeletedMessageVaultDeletionCallback = 
distributedDeletedMessageVaultDeletionCallback;
-    }
-
-    @Override
-    public Publisher<Void> handleReconnection(Connection connection) {
-        return 
Mono.fromRunnable(distributedDeletedMessageVaultDeletionCallback::restart);
-    }
-}
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
deleted file mode 100644
index 919c6331f5..0000000000
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.modules.mailbox;
-
-import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
-import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
-import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
-import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
-import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-
-import java.time.Duration;
-import java.util.Date;
-import java.util.Optional;
-
-import jakarta.annotation.PreDestroy;
-import jakarta.inject.Inject;
-
-import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.ReceiverProvider;
-import org.apache.james.blob.api.BlobId;
-import org.apache.james.core.Username;
-import org.apache.james.lifecycle.api.Startable;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
-import org.apache.james.mailbox.model.MailboxId;
-import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.vault.metadata.DeletedMessageVaultDeletionCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.rabbitmq.client.AMQP;
-
-import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.BindingSpecification;
-import reactor.rabbitmq.ConsumeOptions;
-import reactor.rabbitmq.ExchangeSpecification;
-import reactor.rabbitmq.OutboundMessage;
-import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.Sender;
-
-public class DistributedDeletedMessageVaultDeletionCallback implements 
DeleteMessageListener.DeletionCallback, Startable {
-    public static final Logger LOGGER = 
LoggerFactory.getLogger(DistributedDeletedMessageVaultDeletionCallback.class);
-
-    private static class CopyCommandDTO {
-        public static CopyCommandDTO 
of(DeleteMessageListener.DeletedMessageCopyCommand command) {
-            return new CopyCommandDTO(
-                command.getMessageId().serialize(),
-                command.getMailboxId().serialize(),
-                command.getOwner().asString(),
-                command.getInternalDate(),
-                command.getSize(),
-                command.hasAttachments(),
-                command.getHeaderId().asString(),
-                command.getBodyId().asString());
-        }
-
-        private final String messageId;
-        private final String mailboxId;
-        private final String owner;
-        private final Date internalDate;
-        private final long size;
-        private final boolean hasAttachments;
-        private final String headerId;
-        private final String bodyId;
-
-        @JsonCreator
-        public CopyCommandDTO(@JsonProperty("messageId") String messageId,
-                              @JsonProperty("mailboxId") String mailboxId,
-                              @JsonProperty("owner") String owner,
-                              @JsonProperty("internalDate") Date internalDate,
-                              @JsonProperty("size") long size,
-                              @JsonProperty("hasAttachments") boolean 
hasAttachments,
-                              @JsonProperty("headerId") String headerId,
-                              @JsonProperty("bodyId") String bodyId) {
-            this.messageId = messageId;
-            this.mailboxId = mailboxId;
-            this.owner = owner;
-            this.internalDate = internalDate;
-            this.size = size;
-            this.hasAttachments = hasAttachments;
-            this.headerId = headerId;
-            this.bodyId = bodyId;
-        }
-
-        public String getMessageId() {
-            return messageId;
-        }
-
-        public String getMailboxId() {
-            return mailboxId;
-        }
-
-        public String getOwner() {
-            return owner;
-        }
-
-        public Date getInternalDate() {
-            return internalDate;
-        }
-
-        public long getSize() {
-            return size;
-        }
-
-        public boolean isHasAttachments() {
-            return hasAttachments;
-        }
-
-        public String getHeaderId() {
-            return headerId;
-        }
-
-        public String getBodyId() {
-            return bodyId;
-        }
-
-        @JsonIgnore
-        DeleteMessageListener.DeletedMessageCopyCommand 
asPojo(MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory, 
BlobId.Factory blobIdFactory) {
-            return new 
DeleteMessageListener.DeletedMessageCopyCommand(messageIdFactory.fromString(messageId),
-                mailboxIdFactory.fromString(messageId),
-                Username.of(owner),
-                internalDate,
-                size,
-                hasAttachments,
-                blobIdFactory.parse(headerId),
-                blobIdFactory.parse(bodyId));
-        }
-    }
-
-    private static final String EXCHANGE = "deleted-message-vault";
-    private static final String QUEUE = "deleted-message-vault-work-queue";
-    private static final String DEAD_LETTER = QUEUE + "-dead-letter";
-    private static final boolean REQUEUE = true;
-    private static final int QOS = 5;
-
-    private final ReactorRabbitMQChannelPool channelPool;
-    private final RabbitMQConfiguration rabbitMQConfiguration;
-    private final DeletedMessageVaultDeletionCallback callback;
-    private final Sender sender;
-    private final ObjectMapper objectMapper;
-    private final MailboxId.Factory mailboxIdFactory;
-    private final MessageId.Factory messageIdFactory;
-    private final BlobId.Factory blobIdFactory;
-    private final ReceiverProvider receiverProvider;
-    private Disposable disposable;
-
-    @Inject
-    public DistributedDeletedMessageVaultDeletionCallback(Sender sender,
-                                                          
ReactorRabbitMQChannelPool channelPool,
-                                                          
RabbitMQConfiguration rabbitMQConfiguration,
-                                                          
DeletedMessageVaultDeletionCallback callback,
-                                                          MailboxId.Factory 
mailboxIdFactory,
-                                                          MessageId.Factory 
messageIdFactory,
-                                                          BlobId.Factory 
blobIdFactory,
-                                                          ReceiverProvider 
receiverProvider) {
-        this.sender = sender;
-        this.rabbitMQConfiguration = rabbitMQConfiguration;
-        this.callback = callback;
-        this.mailboxIdFactory = mailboxIdFactory;
-        this.messageIdFactory = messageIdFactory;
-        this.blobIdFactory = blobIdFactory;
-        this.objectMapper = new ObjectMapper();
-        this.channelPool = channelPool;
-        this.receiverProvider = receiverProvider;
-    }
-
-    public void init() {
-        Flux.concat(
-                sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE)
-                    .durable(DURABLE)
-                    .type(DIRECT_EXCHANGE)),
-                sender.declareQueue(QueueSpecification.queue(DEAD_LETTER)
-                    .durable(DURABLE)
-                    .exclusive(!EXCLUSIVE)
-                    .autoDelete(!AUTO_DELETE)
-                    
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder()
-                        .deadLetter(DEAD_LETTER)
-                        .build())),
-                sender.declareQueue(QueueSpecification.queue(QUEUE)
-                    .durable(DURABLE)
-                    .exclusive(!EXCLUSIVE)
-                    .autoDelete(!AUTO_DELETE)
-                    
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder()
-                        .deadLetter(DEAD_LETTER)
-                        .build())),
-                sender.bind(BindingSpecification.binding()
-                    .exchange(EXCHANGE)
-                    .queue(QUEUE)
-                    .routingKey(EMPTY_ROUTING_KEY)))
-            .then()
-            .block();
-
-        disposable = consumeDeletedMessageVaultWorkQueue();
-    }
-
-    private Disposable consumeDeletedMessageVaultWorkQueue() {
-        return Flux.using(
-                receiverProvider::createReceiver,
-                receiver -> receiver.consumeManualAck(QUEUE, new 
ConsumeOptions().qos(QOS)),
-                Receiver::close)
-            .flatMap(this::handleMessage, QOS)
-            .subscribeOn(Schedulers.boundedElastic())
-            .subscribe();
-    }
-
-    public void restart() {
-        Disposable previousConsumer = disposable;
-        disposable = consumeDeletedMessageVaultWorkQueue();
-        previousConsumer.dispose();
-    }
-
-    @PreDestroy
-    public void stop() {
-        Optional.ofNullable(disposable).ifPresent(Disposable::dispose);
-    }
-
-    private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
-        try {
-            CopyCommandDTO copyCommandDTO = 
objectMapper.readValue(delivery.getBody(), CopyCommandDTO.class);
-
-            return callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, 
messageIdFactory, blobIdFactory))
-                .timeout(Duration.ofMinutes(5))
-                .doOnSuccess(any -> delivery.ack())
-                .doOnCancel(() -> delivery.nack(REQUEUE))
-                .onErrorResume(e -> {
-                    LOGGER.error("Failed executing deletion callback for {}", 
copyCommandDTO.messageId, e);
-                    delivery.nack(REQUEUE);
-                    return Mono.empty();
-                });
-        } catch (Exception e) {
-            LOGGER.error("Deserialization error: reject poisonous message for 
distributed Deleted message vault callback", e);
-            // Deserialization error: reject poisonous messages
-            delivery.nack(!REQUEUE);
-            return Mono.empty();
-        }
-    }
-
-    @Override
-    public Mono<Void> 
forMessage(DeleteMessageListener.DeletedMessageCopyCommand command) {
-        CopyCommandDTO dto = CopyCommandDTO.of(command);
-        try {
-            byte[] bytes = objectMapper.writeValueAsBytes(dto);
-            return sender.send(Mono.just(new OutboundMessage(EXCHANGE, 
EMPTY_ROUTING_KEY, new AMQP.BasicProperties.Builder()
-                .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
-                .priority(PERSISTENT_TEXT_PLAIN.getPriority())
-                .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
-                .build(), bytes)));
-        } catch (JsonProcessingException e) {
-            return Mono.error(e);
-        }
-    }
-
-
-}
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
index bc3839cfd2..319a69fd69 100644
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
@@ -20,11 +20,9 @@
 package org.apache.james.modules.mailbox;
 
 import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.events.EventListener;
 import org.apache.james.mailbox.cassandra.DeleteMessageListener;
 import org.apache.james.modules.vault.DeletedMessageVaultModule;
-import org.apache.james.utils.InitializationOperation;
-import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.james.vault.DeletedMessageVault;
 import org.apache.james.vault.blob.BlobStoreDeletedMessageVault;
 import org.apache.james.vault.blob.BucketNameGenerator;
@@ -39,7 +37,7 @@ import org.apache.james.vault.metadata.UserPerBucketDAO;
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
 import com.google.inject.multibindings.Multibinder;
-import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Names;
 
 public class DistributedDeletedMessageVaultModule extends AbstractModule {
     @Override
@@ -65,19 +63,8 @@ public class DistributedDeletedMessageVaultModule extends 
AbstractModule {
         bind(DeletedMessageVault.class)
             .to(BlobStoreDeletedMessageVault.class);
 
-        Multibinder.newSetBinder(binder(), 
DeleteMessageListener.DeletionCallback.class)
+        Multibinder.newSetBinder(binder(), 
EventListener.ReactiveGroupEventListener.class, 
Names.named(DeleteMessageListener.CONTENT_DELETION))
             .addBinding()
-            .to(DistributedDeletedMessageVaultDeletionCallback.class);
-        
bind(DistributedDeletedMessageVaultDeletionCallback.class).in(Scopes.SINGLETON);
-
-        Multibinder<SimpleConnectionPool.ReconnectionHandler> 
reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), 
SimpleConnectionPool.ReconnectionHandler.class);
-        
reconnectionHandlerMultibinder.addBinding().to(DeletedMessageVaultWorkQueueReconnectionHandler.class);
-    }
-
-    @ProvidesIntoSet
-    InitializationOperation 
init(DistributedDeletedMessageVaultDeletionCallback callback) {
-        return InitilizationOperationBuilder
-            .forClass(DistributedDeletedMessageVaultDeletionCallback.class)
-            .init(callback::init);
+            .to(DeletedMessageVaultDeletionListener.class);
     }
 }
diff --git 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionCallback.java
 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionListener.java
similarity index 59%
rename from 
server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionCallback.java
rename to 
server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionListener.java
index c2e4d25c05..ec5fd46e3c 100644
--- 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionCallback.java
+++ 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionListener.java
@@ -21,21 +21,46 @@ package org.apache.james.jmap.cassandra.projections;
 
 import jakarta.inject.Inject;
 
+import org.apache.james.events.Event;
+import org.apache.james.events.EventListener;
+import org.apache.james.events.Group;
 import org.apache.james.jmap.api.projections.MessageFastViewProjection;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.mailbox.events.MailboxEvents;
+import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Mono;
 
-public class CassandraMessageFastViewProjectionDeletionCallback implements 
DeleteMessageListener.DeletionCallback {
+public class CassandraMessageFastViewProjectionDeletionListener implements 
EventListener.ReactiveGroupEventListener {
+    public static class 
CassandraMessageFastViewProjectionDeletionListenerGroup extends Group {
+
+    }
+
+    private static final Group GROUP = new 
CassandraMessageFastViewProjectionDeletionListenerGroup();
+
     private final MessageFastViewProjection messageFastViewProjection;
 
     @Inject
-    public 
CassandraMessageFastViewProjectionDeletionCallback(MessageFastViewProjection 
messageFastViewProjection) {
+    public 
CassandraMessageFastViewProjectionDeletionListener(MessageFastViewProjection 
messageFastViewProjection) {
         this.messageFastViewProjection = messageFastViewProjection;
     }
 
     @Override
-    public Mono<Void> 
forMessage(DeleteMessageListener.DeletedMessageCopyCommand copyCommand) {
-        return 
Mono.from(messageFastViewProjection.delete(copyCommand.getMessageId()));
+    public Group getDefaultGroup() {
+        return GROUP;
+    }
+
+    @Override
+    public boolean isHandling(Event event) {
+        return event instanceof MailboxEvents.MessageContentDeletionEvent;
+    }
+
+    @Override
+    public Publisher<Void> reactiveEvent(Event event) {
+        if (event instanceof MailboxEvents.MessageContentDeletionEvent 
contentDeletionEvent) {
+            return 
Mono.from(messageFastViewProjection.delete(contentDeletionEvent.messageId()));
+        }
+
+        return Mono.empty();
     }
+
 }
diff --git 
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationImmutableTest.java
 
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationImmutableTest.java
index 3591a21b4c..a93b003df8 100644
--- 
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationImmutableTest.java
+++ 
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationImmutableTest.java
@@ -141,6 +141,7 @@ class RabbitMQWebAdminServerIntegrationImmutableTest 
extends WebAdminServerInteg
             "Cassandra backend", "EventDeadLettersHealthCheck", 
"MessageFastViewProjection",
             "RabbitMQMailQueue BrowseStart", "OpenSearch Backend", 
"ObjectStorage", "DistributedTaskManagerConsumers",
             "EventbusConsumers-jmapEvent", "MailQueueConsumers", 
"EventbusConsumers-mailboxEvent",
-            "RabbitMQJmapEventBusDeadLetterQueueHealthCheck", 
"IMAPHealthCheck");
+            "RabbitMQJmapEventBusDeadLetterQueueHealthCheck", 
"IMAPHealthCheck",
+            "RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck", 
"EventbusConsumers-contentDeletionEvent");
     }
 }
diff --git 
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
 
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
deleted file mode 100644
index 38a5773b2f..0000000000
--- 
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.webadmin.integration.rabbitmq.vault;
-
-import org.apache.james.CassandraExtension;
-import org.apache.james.CassandraRabbitMQJamesConfiguration;
-import org.apache.james.CassandraRabbitMQJamesServerMain;
-import org.apache.james.DockerOpenSearchExtension;
-import org.apache.james.GuiceJamesServer;
-import org.apache.james.JamesServerBuilder;
-import org.apache.james.JamesServerExtension;
-import org.apache.james.SearchConfiguration;
-import org.apache.james.junit.categories.BasicFeature;
-import org.apache.james.modules.AwsS3BlobStoreExtension;
-import org.apache.james.modules.RabbitMQExtension;
-import org.apache.james.modules.TestJMAPServerModule;
-import org.apache.james.modules.blobstore.BlobStoreConfiguration;
-import org.apache.james.vault.VaultConfiguration;
-import 
org.apache.james.webadmin.integration.vault.DeletedMessageVaultIntegrationTest;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-class WorkQueueEnabledDeletedMessageVaultIntegrationTest extends 
DeletedMessageVaultIntegrationTest {
-
-    private static final DockerOpenSearchExtension ES_EXTENSION = new 
DockerOpenSearchExtension();
-
-    @RegisterExtension
-    static JamesServerExtension testExtension = new 
JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
-        CassandraRabbitMQJamesConfiguration.builder()
-            .workingDirectory(tmpDir)
-            .configurationFromClasspath()
-            .blobStore(BlobStoreConfiguration.builder()
-                    .s3()
-                    .disableCache()
-                    .deduplication()
-                    .noCryptoConfig())
-            .searchConfiguration(SearchConfiguration.openSearch())
-            .vaultConfiguration(VaultConfiguration.ENABLED_WORKQUEUE)
-            .build())
-        .extension(ES_EXTENSION)
-        .extension(new CassandraExtension())
-        .extension(new AwsS3BlobStoreExtension())
-        .extension(new RabbitMQExtension())
-        .extension(new ClockExtension())
-        .server(configuration -> 
CassandraRabbitMQJamesServerMain.createServer(configuration)
-            .overrideWith(new TestJMAPServerModule()))
-        .build();
-
-    @Override
-    protected void awaitSearchUpToDate() {
-        ES_EXTENSION.await();
-    }
-
-    @Disabled("JAMES-2688 Unstable test")
-    @Test
-    @Tag(BasicFeature.TAG)
-    @Override
-    public void 
vaultExportShouldExportZipContainsVaultMessagesToShareeWhenImapDeletedMailbox(GuiceJamesServer
 jmapServer) {
-
-    }
-
-}
diff --git a/src/adr/0074-dedicated-eventbus-for-message-content-deletion.md 
b/src/adr/0074-dedicated-eventbus-for-message-content-deletion.md
new file mode 100644
index 0000000000..27d44ccf48
--- /dev/null
+++ b/src/adr/0074-dedicated-eventbus-for-message-content-deletion.md
@@ -0,0 +1,45 @@
+# 74. Relying on EventBus for message deletion side effects
+
+Date: 2025-12-12
+
+## Status
+
+Accepted (lazy consensus) & implemented.
+
+## Context
+
+James wants to perform actions upon email deletion, for instance, copying 
events in the Deleted Message Vault or clearing the corresponding message 
preview (used to serve an optimized data projection over the JMAP protocol).
+
+Deletion action can be long and are thus in Cassandra and Postgres done 
asynchronously in the `DeleteMessageListener`.
+
+We encountered issues, most notably with mailbox deletion, which triggered 
those operations on a message set of message, leading to event processing 
stalling and eventually timeout.
+
+This had been historically solved in the Distributed server by adding a custom 
RabbitMQ queue for the deleted message vault to sequence and distribute each 
single deletion. While it leads to a viable use in production, this approach 
suffers from the following pitfall:
+
+- It duplicates the Event bus code, used for work queues
+- It requires a lot of custom code for adoption in other implementations
+- It makes it hard to add other "features" in a composable fashion without 
duplicating a lot of code
+
+## Decision
+
+We will **remove the `DeletionCallback` interface**.
+
+Instead, **message deletion will always publish a 
`MessageContentDeletionEvent`** on a dedicated EventBus.
+
+Consumers previously relying on deletion callbacks must now register as 
EventBus listeners. This ensures that heavy deletion side effect workloads do 
not block or slow down other important mailbox event processing (such as 
indexing).
+
+## Consequences
+
+### Pros
+- heavy, slow deletion workloads no longer block mailbox operations.
+- `DeletionCallback` interface removed, simplifying the codebase.
+- leverage existing EventBus features: asynchronous execution, listener 
isolation, retry and dead-lettering.
+- deletion side effects are handled the same way as other mailbox events.
+
+### Cons
+- Existing deletion callbacks must migrate to asynchronous listeners.
+
+## References
+
+- ADR-0037 – EventBus design
+- [JIRA: JAMES-4154 – Generalization of deletion side 
effects](https://issues.apache.org/jira/browse/JAMES-4154)
diff --git a/src/site/xdoc/server/config-vault.xml 
b/src/site/xdoc/server/config-vault.xml
index 25d5b809fa..7a1c470805 100644
--- a/src/site/xdoc/server/config-vault.xml
+++ b/src/site/xdoc/server/config-vault.xml
@@ -57,13 +57,6 @@
                         Default to false.
                     </dd>
                 </dl>
-                <dl>
-                    <dt><strong>workQueueEnabled</strong></dt>
-                    <dd>
-                        Enable work queue to be used with deleted message 
vault.
-                        Default to false.
-                    </dd>
-                </dl>
                 <dl>
                     <dt><strong>retentionPeriod</strong></dt>
                     <dd>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to