This is an automated email from the ASF dual-hosted git repository.
hqtran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 88bbfb95c4 JAMES-4154 Refactor message content deletion (#2884)
88bbfb95c4 is described below
commit 88bbfb95c426a621948709c7d9170677fe139c61
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Mon Dec 15 21:55:04 2025 +0700
JAMES-4154 Refactor message content deletion (#2884)
---
docs/modules/servers/partials/configure/vault.adoc | 3 -
.../org/apache/james/events/NamingStrategy.java | 2 +
...DeletionEventBusDeadLetterQueueHealthCheck.java | 63 +++++
...tionEventBusDeadLetterQueueHealthCheckTest.java | 131 ++++++++++
.../apache/james/mailbox/events/MailboxEvents.java | 19 ++
.../CassandraMailboxSessionMapperFactory.java | 9 +-
.../mailbox/cassandra/DeleteMessageListener.java | 109 +++-----
.../cassandra/CassandraMailboxManagerProvider.java | 2 +-
.../cassandra/CassandraTestSystemFixture.java | 2 +-
.../james/event/json/MailboxEventSerializer.scala | 26 +-
.../MessageContentDeletionSerializationTest.java | 86 +++++++
.../org/apache/james/vault/VaultConfiguration.java | 22 +-
.../apache/james/vault/VaultConfigurationTest.java | 12 +-
.../james/mailbox/store/event/EventFactory.java | 83 ++++++
.../deletedMessageVault.properties | 4 -
.../james/CassandraRabbitMQJamesServerMain.java | 10 +-
.../deletedMessageVault.properties | 4 -
.../james/DistributedPOP3JamesServerMain.java | 10 +-
.../org/apache/james/PostgresJamesServerMain.java | 2 +
.../james/modules/data/CassandraJmapModule.java | 11 +-
.../CassandraDeletedMessageVaultModule.java | 70 -----
.../modules/mailbox/CassandraMailboxModule.java | 1 -
.../event/ContentDeletionEventBusModule.java | 161 ++++++++++++
.../DeletedMessageVaultDeletionListener.java | 61 +++--
...edMessageVaultWorkQueueReconnectionHandler.java | 43 ----
...ributedDeletedMessageVaultDeletionCallback.java | 282 ---------------------
.../DistributedDeletedMessageVaultModule.java | 21 +-
...MessageFastViewProjectionDeletionListener.java} | 35 ++-
...itMQWebAdminServerIntegrationImmutableTest.java | 3 +-
...eEnabledDeletedMessageVaultIntegrationTest.java | 81 ------
...icated-eventbus-for-message-content-deletion.md | 45 ++++
src/site/xdoc/server/config-vault.xml | 7 -
32 files changed, 762 insertions(+), 658 deletions(-)
diff --git a/docs/modules/servers/partials/configure/vault.adoc
b/docs/modules/servers/partials/configure/vault.adoc
index 8949686175..ed8ffd5254 100644
--- a/docs/modules/servers/partials/configure/vault.adoc
+++ b/docs/modules/servers/partials/configure/vault.adoc
@@ -24,9 +24,6 @@ to get some examples and hints.
| enabled
| Allows to enable or disable usage of the Deleted Message Vault. Default to
false.
-| workQueueEnabled
-| Enable work queue to be used with deleted message vault. Default to false.
-
| retentionPeriod
| Deleted messages stored in the Deleted Messages Vault are expired after this
period (default: 1 year). It can be expressed in *y* years, *d* days, *h*
hours, ...
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
index d3ce4a6f7d..b0f03b4660 100644
---
a/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
@@ -24,8 +24,10 @@ import reactor.rabbitmq.QueueSpecification;
public class NamingStrategy {
public static final EventBusName JMAP_EVENT_BUS_NAME = new
EventBusName("jmapEvent");
public static final EventBusName MAILBOX_EVENT_BUS_NAME = new
EventBusName("mailboxEvent");
+ public static final EventBusName CONTENT_DELETION_EVENT_BUS_NAME = new
EventBusName("contentDeletionEvent");
public static final NamingStrategy JMAP_NAMING_STRATEGY = new
NamingStrategy(JMAP_EVENT_BUS_NAME);
public static final NamingStrategy MAILBOX_EVENT_NAMING_STRATEGY = new
NamingStrategy(MAILBOX_EVENT_BUS_NAME);
+ public static final NamingStrategy CONTENT_DELETION_NAMING_STRATEGY = new
NamingStrategy(CONTENT_DELETION_EVENT_BUS_NAME);
private final EventBusName eventBusName;
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck.java
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck.java
new file mode 100644
index 0000000000..411279eac8
--- /dev/null
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck.java
@@ -0,0 +1,63 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import jakarta.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
+import org.apache.james.core.healthcheck.ComponentName;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.core.healthcheck.Result;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck
implements HealthCheck {
+ public static final ComponentName COMPONENT_NAME = new
ComponentName("RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck");
+ private static final String DEFAULT_VHOST = "/";
+
+ private final RabbitMQConfiguration configuration;
+ private final RabbitMQManagementAPI api;
+
+ @Inject
+ public
RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck(RabbitMQConfiguration
configuration) {
+ this.configuration = configuration;
+ this.api = RabbitMQManagementAPI.from(configuration);
+ }
+
+ @Override
+ public ComponentName componentName() {
+ return COMPONENT_NAME;
+ }
+
+ @Override
+ public Mono<Result> check() {
+ return Mono.fromCallable(() ->
api.queueDetails(configuration.getVhost().orElse(DEFAULT_VHOST),
NamingStrategy.CONTENT_DELETION_NAMING_STRATEGY.deadLetterQueue().getName()).getQueueLength())
+ .map(queueSize -> {
+ if (queueSize != 0) {
+ return Result.degraded(COMPONENT_NAME, "RabbitMQ dead
letter queue of the content deletion event bus contain events. This might
indicate transient failure on event processing.");
+ }
+ return Result.healthy(COMPONENT_NAME);
+ })
+ .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME,
"Error checking RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck", e)))
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+}
diff --git
a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheckTest.java
b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheckTest.java
new file mode 100644
index 0000000000..bf8e5282cb
--- /dev/null
+++
b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheckTest.java
@@ -0,0 +1,131 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static
org.apache.james.backends.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+class RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheckTest {
+ @RegisterExtension
+ RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
+ .isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG);
+
+ public static final ImmutableMap<String, Object>
NO_QUEUE_DECLARE_ARGUMENTS = ImmutableMap.of();
+ public static final String ROUTING_KEY_CONTENT_DELETION_EVENTS_EVENT_BUS =
"contentDeletionRoutingKey";
+
+ private Connection connection;
+ private Channel channel;
+ private RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck testee;
+
+ @BeforeEach
+ void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException,
URISyntaxException {
+ ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+ connectionFactory.setNetworkRecoveryInterval(1000);
+ connection = connectionFactory.newConnection();
+ channel = connection.createChannel();
+ testee = new
RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck(rabbitMQ.getConfiguration());
+ }
+
+ @AfterEach
+ void tearDown(DockerRabbitMQ rabbitMQ) throws Exception {
+ closeQuietly(connection, channel);
+ rabbitMQ.reset();
+ }
+
+ @Test
+ void healthCheckShouldReturnUnhealthyWhenRabbitMQIsDown() throws Exception
{
+ rabbitMQExtension.getRabbitMQ().stopApp();
+
+ assertThat(testee.check().block().isUnHealthy()).isTrue();
+ }
+
+ @Test
+ void
healthCheckShouldReturnHealthyWhenContentDeletionEventBusDeadLetterQueueIsEmpty()
throws Exception {
+ createDeadLetterQueue(channel,
NamingStrategy.CONTENT_DELETION_NAMING_STRATEGY,
ROUTING_KEY_CONTENT_DELETION_EVENTS_EVENT_BUS);
+
+ assertThat(testee.check().block().isHealthy()).isTrue();
+ }
+
+ @Test
+ void healthCheckShouldReturnUnhealthyWhenThereIsNoDeadLetterQueue() {
+ assertThat(testee.check().block().isUnHealthy()).isTrue();
+ }
+
+ @Test
+ void
healthCheckShouldReturnDegradedWhenContentDeletionEventBusDeadLetterQueueIsNotEmpty()
throws Exception {
+ createDeadLetterQueue(channel,
NamingStrategy.CONTENT_DELETION_NAMING_STRATEGY,
ROUTING_KEY_CONTENT_DELETION_EVENTS_EVENT_BUS);
+ publishAMessage(channel,
ROUTING_KEY_CONTENT_DELETION_EVENTS_EVENT_BUS);
+
+ awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded());
+ }
+
+ private void createDeadLetterQueue(Channel channel, NamingStrategy
namingStrategy, String routingKey) throws IOException {
+ channel.exchangeDeclare(EXCHANGE_NAME, DIRECT_EXCHANGE, DURABLE);
+ channel.queueDeclare(namingStrategy.deadLetterQueue().getName(),
DURABLE, !EXCLUSIVE, AUTO_DELETE, NO_QUEUE_DECLARE_ARGUMENTS).getQueue();
+ channel.queueBind(namingStrategy.deadLetterQueue().getName(),
EXCHANGE_NAME, routingKey);
+ }
+
+ private void publishAMessage(Channel channel, String routingKey) throws
IOException {
+ AMQP.BasicProperties basicProperties = new
AMQP.BasicProperties.Builder()
+ .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+ .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+ .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+ .build();
+
+ channel.basicPublish(EXCHANGE_NAME, routingKey, basicProperties,
"Hello, world!".getBytes(StandardCharsets.UTF_8));
+ }
+
+ private void closeQuietly(AutoCloseable... closeables) {
+ Arrays.stream(closeables).forEach(this::closeQuietly);
+ }
+
+ private void closeQuietly(AutoCloseable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ //ignore error
+ }
+ }
+}
diff --git
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/MailboxEvents.java
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/MailboxEvents.java
index 5ba8f6ca7a..d9bba5159d 100644
---
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/MailboxEvents.java
+++
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/MailboxEvents.java
@@ -536,6 +536,25 @@ public interface MailboxEvents {
}
}
+ record MessageContentDeletionEvent(EventId eventId, Username username,
MailboxId mailboxId, MessageId messageId, long size,
+ Instant internalDate, boolean
hasAttachments, String headerBlobId, String bodyBlobId) implements Event {
+
+ @Override
+ public EventId getEventId() {
+ return eventId;
+ }
+
+ @Override
+ public Username getUsername() {
+ return username;
+ }
+
+ @Override
+ public boolean isNoop() {
+ return false;
+ }
+ }
+
/**
* A mailbox event related to added message
*/
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 9813f8dd76..67b9b21728 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -25,6 +25,7 @@ import jakarta.inject.Inject;
import
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.blob.api.BlobStore;
+import org.apache.james.events.EventBus;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.cassandra.mail.ACLMapper;
import org.apache.james.mailbox.cassandra.mail.CassandraAnnotationMapper;
@@ -63,7 +64,7 @@ import org.apache.james.mailbox.store.mail.UidProvider;
import org.apache.james.mailbox.store.user.SubscriptionMapper;
import com.datastax.oss.driver.api.core.CqlSession;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.VisibleForTesting;
/**
* Cassandra implementation of {@link MailboxSessionMapperFactory}
@@ -229,9 +230,11 @@ public class CassandraMailboxSessionMapperFactory extends
MailboxSessionMapperFa
}
- public DeleteMessageListener deleteMessageListener() {
+ @VisibleForTesting
+ public DeleteMessageListener deleteMessageListener(EventBus
contentDeletionEventBus) {
return new DeleteMessageListener(threadDAO, threadLookupDAO,
imapUidDAO, messageIdDAO, messageDAOV3, attachmentDAOV2,
aclMapper, userMailboxRightsDAO, applicableFlagDAO,
firstUnseenDAO, deletedMessageDAO,
- mailboxCounterDAO, mailboxRecentsDAO, blobStore,
cassandraConfiguration, ImmutableSet.of());
+ mailboxCounterDAO, mailboxRecentsDAO, blobStore,
cassandraConfiguration,
+ contentDeletionEventBus);
}
}
diff --git
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index 7e0a113607..f20dc5ca3e 100644
---
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -23,18 +23,17 @@ import static
org.apache.james.backends.cassandra.init.configuration.JamesExecut
import static
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles.ConsistencyChoice.WEAK;
import static org.apache.james.util.FunctionalUtils.negate;
-import java.util.Date;
import java.util.Optional;
-import java.util.Set;
import jakarta.inject.Inject;
+import jakarta.inject.Named;
import
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
-import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.core.Username;
import org.apache.james.events.Event;
+import org.apache.james.events.EventBus;
import org.apache.james.events.EventListener;
import org.apache.james.events.Group;
import org.apache.james.mailbox.acl.ACLDiff;
@@ -59,13 +58,15 @@ import
org.apache.james.mailbox.events.MailboxEvents.MailboxDeletion;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
-import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.mailbox.store.event.EventFactory;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.util.streams.Limit;
import org.reactivestreams.Publisher;
+import com.google.common.collect.ImmutableSet;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -81,80 +82,14 @@ import reactor.core.publisher.Mono;
* idempotent.
*/
public class DeleteMessageListener implements
EventListener.ReactiveGroupEventListener {
+ public static final String CONTENT_DELETION = "contentDeletion";
+
private static final Optional<CassandraId> ALL_MAILBOXES =
Optional.empty();
public static class DeleteMessageListenerGroup extends Group {
}
- public static class DeletedMessageCopyCommand {
- public static DeletedMessageCopyCommand of(MessageRepresentation
message, MailboxId mailboxId, Username owner) {
- return new DeletedMessageCopyCommand(message.getMessageId(),
mailboxId, owner, message.getInternalDate(),
- message.getSize(), !message.getAttachments().isEmpty(),
message.getHeaderId(), message.getBodyId());
- }
-
- private final MessageId messageId;
- private final MailboxId mailboxId;
- private final Username owner;
- private final Date internalDate;
- private final long size;
- private final boolean hasAttachments;
- private final BlobId headerId;
- private final BlobId bodyId;
-
- public DeletedMessageCopyCommand(MessageId messageId, MailboxId
mailboxId, Username owner, Date internalDate, long size, boolean
hasAttachments, BlobId headerId, BlobId bodyId) {
- this.messageId = messageId;
- this.mailboxId = mailboxId;
- this.owner = owner;
- this.internalDate = internalDate;
- this.size = size;
- this.hasAttachments = hasAttachments;
- this.headerId = headerId;
- this.bodyId = bodyId;
- }
-
- public Username getOwner() {
- return owner;
- }
-
- public MessageId getMessageId() {
- return messageId;
- }
-
- public MailboxId getMailboxId() {
- return mailboxId;
- }
-
- public Date getInternalDate() {
- return internalDate;
- }
-
- public long getSize() {
- return size;
- }
-
- public boolean hasAttachments() {
- return hasAttachments;
- }
-
- public BlobId getHeaderId() {
- return headerId;
- }
-
- public BlobId getBodyId() {
- return bodyId;
- }
- }
-
- @FunctionalInterface
- public interface DeletionCallback {
- default Mono<Void> forMessage(MessageRepresentation message, MailboxId
mailboxId, Username owner) {
- return forMessage(DeletedMessageCopyCommand.of(message, mailboxId,
owner));
- }
-
- Mono<Void> forMessage(DeletedMessageCopyCommand copyCommand);
- }
-
private final CassandraThreadDAO threadDAO;
private final CassandraThreadLookupDAO threadLookupDAO;
private final CassandraMessageIdToImapUidDAO imapUidDAO;
@@ -170,7 +105,7 @@ public class DeleteMessageListener implements
EventListener.ReactiveGroupEventLi
private final CassandraMailboxRecentsDAO recentsDAO;
private final BlobStore blobStore;
private final CassandraConfiguration cassandraConfiguration;
- private final Set<DeletionCallback> deletionCallbackList;
+ private final EventBus contentDeletionEventBus;
@Inject
public DeleteMessageListener(CassandraThreadDAO threadDAO,
CassandraThreadLookupDAO threadLookupDAO,
@@ -180,7 +115,8 @@ public class DeleteMessageListener implements
EventListener.ReactiveGroupEventLi
CassandraUserMailboxRightsDAO rightsDAO,
CassandraApplicableFlagDAO applicableFlagDAO,
CassandraFirstUnseenDAO firstUnseenDAO,
CassandraDeletedMessageDAO deletedMessageDAO,
CassandraMailboxCounterDAO counterDAO,
CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore,
- CassandraConfiguration
cassandraConfiguration, Set<DeletionCallback> deletionCallbackList) {
+ CassandraConfiguration cassandraConfiguration,
+ @Named(CONTENT_DELETION) EventBus
contentDeletionEventBus) {
this.threadDAO = threadDAO;
this.threadLookupDAO = threadLookupDAO;
this.imapUidDAO = imapUidDAO;
@@ -196,7 +132,7 @@ public class DeleteMessageListener implements
EventListener.ReactiveGroupEventLi
this.recentsDAO = recentsDAO;
this.blobStore = blobStore;
this.cassandraConfiguration = cassandraConfiguration;
- this.deletionCallbackList = deletionCallbackList;
+ this.contentDeletionEventBus = contentDeletionEventBus;
}
@Override
@@ -260,7 +196,8 @@ public class DeleteMessageListener implements
EventListener.ReactiveGroupEventLi
return Mono.just(messageId)
.filterWhen(this::isReferenced)
.flatMap(id -> readMessage(id)
- .flatMap(message ->
Flux.fromIterable(deletionCallbackList).concatMap(callback ->
callback.forMessage(message, mailboxId, owner)).then().thenReturn(message))
+ .flatMap(message ->
dispatchMessageContentDeletionEvent(mailboxId, owner, message)
+ .thenReturn(message))
.flatMap(message ->
deleteUnreferencedAttachments(message).thenReturn(message))
.flatMap(this::deleteMessageBlobs)
.then(messageDAOV3.delete(messageId))
@@ -270,18 +207,34 @@ public class DeleteMessageListener implements
EventListener.ReactiveGroupEventLi
.then(threadLookupDAO.deleteOneRow(threadId, messageId)));
}
+ private Mono<Void> dispatchMessageContentDeletionEvent(MailboxId
mailboxId, Username owner, MessageRepresentation message) {
+ return
Mono.from(contentDeletionEventBus.dispatch(EventFactory.messageContentDeleted()
+ .randomEventId()
+ .user(owner)
+ .mailboxId(mailboxId)
+ .messageId(message.getMessageId())
+ .size(message.getSize())
+ .instant(message.getInternalDate().toInstant())
+ .hasAttachments(!message.getAttachments().isEmpty())
+ .headerBlobId(message.getHeaderId().asString())
+ .bodyBlobId(message.getBodyId().asString())
+ .build(),
+ ImmutableSet.of()));
+ }
+
private Mono<Void>
handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId,
ThreadId threadId, CassandraId excludedId, Username owner) {
return Mono.just(messageId)
.filterWhen(id -> isReferenced(id, excludedId))
.flatMap(id -> readMessage(id)
- .flatMap(message ->
Flux.fromIterable(deletionCallbackList).concatMap(callback ->
callback.forMessage(message, excludedId, owner)).then().thenReturn(message))
+ .flatMap(message ->
dispatchMessageContentDeletionEvent(excludedId, owner, message)
+ .thenReturn(message)))
.flatMap(message ->
deleteUnreferencedAttachments(message).thenReturn(message))
.flatMap(this::deleteMessageBlobs)
.then(messageDAOV3.delete(messageId))
.then(threadLookupDAO.selectOneRow(threadId, messageId)
.flatMap(key -> threadDAO.deleteSome(key.getUsername(),
key.getMimeMessageIds())
.collectList()))
- .then(threadLookupDAO.deleteOneRow(threadId, messageId)));
+ .then(threadLookupDAO.deleteOneRow(threadId, messageId));
}
private Mono<MessageRepresentation>
deleteMessageBlobs(MessageRepresentation message) {
diff --git
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
index 8e66b4c286..a94bc3f3a5 100644
---
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
+++
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
@@ -132,7 +132,7 @@ public class CassandraMailboxManagerProvider {
eventBus.register(quotaUpdater);
eventBus.register(new MailboxAnnotationListener(mapperFactory,
sessionProvider));
- eventBus.register(mapperFactory.deleteMessageListener());
+ eventBus.register(mapperFactory.deleteMessageListener(eventBus));
return manager;
}
diff --git
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
index 143e4d6520..c5e6912d47 100644
---
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
+++
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
@@ -84,7 +84,7 @@ public class CassandraTestSystemFixture {
new NaiveThreadIdGuessingAlgorithm(), new
UpdatableTickingClock(Instant.now()));
eventBus.register(new MailboxAnnotationListener(mapperFactory,
sessionProvider));
- eventBus.register(mapperFactory.deleteMessageListener());
+ eventBus.register(mapperFactory.deleteMessageListener(eventBus));
return cassandraMailboxManager;
}
diff --git
a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
index 88ae462624..5e428aac42 100644
---
a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
+++
b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
@@ -33,7 +33,7 @@ import org.apache.james.events.Event.EventId
import org.apache.james.events.{EventSerializer, Event => JavaEvent}
import org.apache.james.mailbox.MailboxSession.SessionId
import org.apache.james.mailbox.events.MailboxEvents.Added.IS_APPENDED
-import org.apache.james.mailbox.events.MailboxEvents.{Added => JavaAdded,
Expunged => JavaExpunged, FlagsUpdated => JavaFlagsUpdated, MailboxACLUpdated
=> JavaMailboxACLUpdated, MailboxAdded => JavaMailboxAdded, MailboxDeletion =>
JavaMailboxDeletion, MailboxRenamed => JavaMailboxRenamed,
MailboxSubscribedEvent => JavaMailboxSubscribedEvent, MailboxUnsubscribedEvent
=> JavaMailboxUnsubscribedEvent, QuotaUsageUpdatedEvent =>
JavaQuotaUsageUpdatedEvent}
+import org.apache.james.mailbox.events.MailboxEvents.{Added => JavaAdded,
Expunged => JavaExpunged, FlagsUpdated => JavaFlagsUpdated, MailboxACLUpdated
=> JavaMailboxACLUpdated, MailboxAdded => JavaMailboxAdded, MailboxDeletion =>
JavaMailboxDeletion, MailboxRenamed => JavaMailboxRenamed,
MailboxSubscribedEvent => JavaMailboxSubscribedEvent, MailboxUnsubscribedEvent
=> JavaMailboxUnsubscribedEvent, MessageContentDeletionEvent =>
JavaMessageContentDeletionEvent, QuotaUsageUpdatedEvent => [...]
import org.apache.james.mailbox.events.{MessageMoveEvent =>
JavaMessageMoveEvent}
import org.apache.james.mailbox.model.{MailboxId, MessageId, MessageMoves,
QuotaRoot, ThreadId, MailboxACL => JavaMailboxACL, MessageMetaData =>
JavaMessageMetaData, Quota => JavaQuota}
import org.apache.james.mailbox.quota.QuotaRootDeserializer
@@ -133,6 +133,18 @@ private object DTO {
case class MailboxUnSubscribedEvent(eventId: EventId, mailboxPath:
MailboxPath, mailboxId: MailboxId, user: Username, sessionId: SessionId)
extends Event {
override def toJava: JavaEvent = new
JavaMailboxUnsubscribedEvent(sessionId, user, mailboxPath.toJava, mailboxId,
eventId)
}
+
+ case class MessageContentDeletionEvent(eventId: EventId,
+ username: Username,
+ mailboxId: MailboxId,
+ messageId: MessageId,
+ size: Long,
+ internalDate: Instant,
+ hasAttachments: Boolean,
+ headerBlobId: String,
+ bodyBlobId: String) extends Event {
+ override def toJava: JavaEvent = new
JavaMessageContentDeletionEvent(eventId, username, mailboxId, messageId, size,
internalDate, hasAttachments, headerBlobId, bodyBlobId)
+ }
}
private object ScalaConverter {
@@ -227,6 +239,17 @@ private object ScalaConverter {
user = event.getUsername,
sessionId = event.getSessionId)
+ private def toScala(event: JavaMessageContentDeletionEvent):
DTO.MessageContentDeletionEvent = DTO.MessageContentDeletionEvent(
+ eventId = event.getEventId,
+ username = event.getUsername,
+ mailboxId = event.mailboxId(),
+ messageId = event.messageId(),
+ size = event.size(),
+ internalDate = event.internalDate(),
+ hasAttachments = event.hasAttachments,
+ headerBlobId = event.headerBlobId(),
+ bodyBlobId = event.bodyBlobId())
+
def toScala(javaEvent: JavaEvent): Event = javaEvent match {
case e: JavaAdded => toScala(e)
case e: JavaExpunged => toScala(e)
@@ -239,6 +262,7 @@ private object ScalaConverter {
case e: JavaQuotaUsageUpdatedEvent => toScala(e)
case e: JavaMailboxSubscribedEvent => toScala(e)
case e: JavaMailboxUnsubscribedEvent => toScala(e)
+ case e: JavaMessageContentDeletionEvent => toScala(e)
case _ => throw new RuntimeException("no Scala conversion known")
}
}
diff --git
a/mailbox/event/json/src/test/java/org/apache/james/event/json/MessageContentDeletionSerializationTest.java
b/mailbox/event/json/src/test/java/org/apache/james/event/json/MessageContentDeletionSerializationTest.java
new file mode 100644
index 0000000000..9edb6e5d2d
--- /dev/null
+++
b/mailbox/event/json/src/test/java/org/apache/james/event/json/MessageContentDeletionSerializationTest.java
@@ -0,0 +1,86 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.event.json;
+
+import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson;
+import static org.apache.james.event.json.SerializerFixture.EVENT_ID;
+import static org.apache.james.event.json.SerializerFixture.EVENT_SERIALIZER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+
+import org.apache.james.core.Username;
+import
org.apache.james.mailbox.events.MailboxEvents.MessageContentDeletionEvent;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.Test;
+
+class MessageContentDeletionSerializationTest {
+ private static final Username USERNAME = Username.of("[email protected]");
+ private static final MailboxId MAILBOX_ID = TestId.of(18);
+ private static final MessageId MESSAGE_ID = TestMessageId.of(42);
+ private static final long SIZE = 12345L;
+ private static final Instant INTERNAL_DATE =
Instant.parse("2024-12-15T08:23:45Z");
+ private static final boolean HAS_ATTACHMENTS = true;
+ private static final String HEADER_BLOB_ID = "header-blob-id";
+ private static final String BODY_BLOB_ID = "body-blob-id";
+
+ private static final MessageContentDeletionEvent EVENT = new
MessageContentDeletionEvent(
+ EVENT_ID,
+ USERNAME,
+ MAILBOX_ID,
+ MESSAGE_ID,
+ SIZE,
+ INTERNAL_DATE,
+ HAS_ATTACHMENTS,
+ HEADER_BLOB_ID,
+ BODY_BLOB_ID);
+
+ private static final String JSON = """
+ {
+ "MessageContentDeletionEvent": {
+ "eventId": "6e0dd59d-660e-4d9b-b22f-0354479f47b4",
+ "username": "[email protected]",
+ "size": 12345,
+ "hasAttachments": true,
+ "internalDate": "2024-12-15T08:23:45Z",
+ "mailboxId": "18",
+ "headerBlobId": "header-blob-id",
+ "messageId": "42",
+ "bodyBlobId": "body-blob-id"
+ }
+ }
+ """;
+
+ @Test
+ void messageContentDeletionEventShouldBeWellSerialized() {
+ assertThatJson(EVENT_SERIALIZER.toJson(EVENT))
+ .isEqualTo(JSON);
+ }
+
+ @Test
+ void messageContentDeletionEventShouldBeWellDeserialized() {
+ assertThat(EVENT_SERIALIZER.fromJson(JSON).get())
+ .isEqualTo(EVENT);
+ }
+
+}
diff --git
a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
index 2b7a34ac20..c6e46fb0d8 100644
---
a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
+++
b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java
@@ -32,11 +32,9 @@ import com.google.common.base.Preconditions;
public class VaultConfiguration {
public static final VaultConfiguration DEFAULT =
- new VaultConfiguration(false, false, ChronoUnit.YEARS.getDuration(),
DefaultMailboxes.RESTORED_MESSAGES);
+ new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(),
DefaultMailboxes.RESTORED_MESSAGES);
public static final VaultConfiguration ENABLED_DEFAULT =
- new VaultConfiguration(true, false, ChronoUnit.YEARS.getDuration(),
DefaultMailboxes.RESTORED_MESSAGES);
- public static final VaultConfiguration ENABLED_WORKQUEUE =
- new VaultConfiguration(true, true, ChronoUnit.YEARS.getDuration(),
DefaultMailboxes.RESTORED_MESSAGES);
+ new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(),
DefaultMailboxes.RESTORED_MESSAGES);
public static VaultConfiguration from(Configuration
propertiesConfiguration) {
Duration retentionPeriod =
Optional.ofNullable(propertiesConfiguration.getString("retentionPeriod"))
@@ -45,18 +43,15 @@ public class VaultConfiguration {
String restoreLocation =
Optional.ofNullable(propertiesConfiguration.getString("restoreLocation"))
.orElse(DEFAULT.getRestoreLocation());
boolean enabled = propertiesConfiguration.getBoolean("enabled", false);
- boolean workQueueEnabled =
propertiesConfiguration.getBoolean("workQueueEnabled", false);
- return new VaultConfiguration(enabled, workQueueEnabled,
retentionPeriod, restoreLocation);
+ return new VaultConfiguration(enabled, retentionPeriod,
restoreLocation);
}
private final boolean enabled;
- private final boolean workQueueEnabled;
private final Duration retentionPeriod;
private final String restoreLocation;
- VaultConfiguration(boolean enabled, boolean workQueueEnabled, Duration
retentionPeriod, String restoreLocation) {
+ VaultConfiguration(boolean enabled, Duration retentionPeriod, String
restoreLocation) {
this.enabled = enabled;
- this.workQueueEnabled = workQueueEnabled;
Preconditions.checkNotNull(retentionPeriod);
Preconditions.checkNotNull(restoreLocation);
@@ -68,10 +63,6 @@ public class VaultConfiguration {
return enabled;
}
- public boolean isWorkQueueEnabled() {
- return workQueueEnabled;
- }
-
public Duration getRetentionPeriod() {
return retentionPeriod;
}
@@ -87,14 +78,13 @@ public class VaultConfiguration {
return Objects.equals(this.retentionPeriod, that.retentionPeriod)
&& Objects.equals(this.restoreLocation, that.restoreLocation)
- && Objects.equals(this.enabled, that.enabled)
- && Objects.equals(this.workQueueEnabled,
that.workQueueEnabled);
+ && Objects.equals(this.enabled, that.enabled);
}
return false;
}
@Override
public final int hashCode() {
- return Objects.hash(retentionPeriod, restoreLocation, enabled,
workQueueEnabled);
+ return Objects.hash(retentionPeriod, restoreLocation, enabled);
}
}
diff --git
a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
index 6fe39dd648..656af119d6 100644
---
a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
+++
b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java
@@ -40,13 +40,13 @@ class VaultConfigurationTest {
@Test
void constructorShouldThrowWhenRetentionPeriodIsNull() {
- assertThatThrownBy(() -> new VaultConfiguration(true, false, null,
DefaultMailboxes.RESTORED_MESSAGES))
+ assertThatThrownBy(() -> new VaultConfiguration(true, null,
DefaultMailboxes.RESTORED_MESSAGES))
.isInstanceOf(NullPointerException.class);
}
@Test
void constructorShouldThrowWhenRestoreLocationIsNull() {
- assertThatThrownBy(() -> new VaultConfiguration(true, false,
ChronoUnit.YEARS.getDuration(), null))
+ assertThatThrownBy(() -> new VaultConfiguration(true,
ChronoUnit.YEARS.getDuration(), null))
.isInstanceOf(NullPointerException.class);
}
@@ -62,7 +62,7 @@ class VaultConfigurationTest {
configuration.addProperty("restoreLocation", "INBOX");
assertThat(VaultConfiguration.from(configuration)).isEqualTo(
- new VaultConfiguration(false, false,
ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX));
+ new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(),
DefaultMailboxes.INBOX));
}
@Test
@@ -71,7 +71,7 @@ class VaultConfigurationTest {
configuration.addProperty("retentionPeriod", "15d");
assertThat(VaultConfiguration.from(configuration)).isEqualTo(
- new VaultConfiguration(false, false, Duration.ofDays(15),
DefaultMailboxes.RESTORED_MESSAGES));
+ new VaultConfiguration(false, Duration.ofDays(15),
DefaultMailboxes.RESTORED_MESSAGES));
}
@Test
@@ -80,7 +80,7 @@ class VaultConfigurationTest {
configuration.addProperty("retentionPeriod", "15h");
assertThat(VaultConfiguration.from(configuration)).isEqualTo(
- new VaultConfiguration(false, false, Duration.ofHours(15),
DefaultMailboxes.RESTORED_MESSAGES));
+ new VaultConfiguration(false, Duration.ofHours(15),
DefaultMailboxes.RESTORED_MESSAGES));
}
@Test
@@ -89,7 +89,7 @@ class VaultConfigurationTest {
configuration.addProperty("retentionPeriod", "15");
assertThat(VaultConfiguration.from(configuration)).isEqualTo(
- new VaultConfiguration(false, false, Duration.ofDays(15),
DefaultMailboxes.RESTORED_MESSAGES));
+ new VaultConfiguration(false, Duration.ofDays(15),
DefaultMailboxes.RESTORED_MESSAGES));
}
@Test
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
index 0a248d0cc0..2c19a0b7cb 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
@@ -52,6 +52,7 @@ import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.Quota;
import org.apache.james.mailbox.model.QuotaRoot;
@@ -95,6 +96,11 @@ public class EventFactory {
T mailboxId(MailboxId mailboxId);
}
+ @FunctionalInterface
+ public interface RequireMessageId<T> {
+ T messageId(MessageId messageId);
+ }
+
@FunctionalInterface
public interface RequirePath<T> {
T mailboxPath(MailboxPath path);
@@ -197,6 +203,26 @@ public class EventFactory {
T instant(Instant instant);
}
+ @FunctionalInterface
+ public interface RequireSize<T> {
+ T size(long size);
+ }
+
+ @FunctionalInterface
+ public interface RequireHasAttachments<T> {
+ T hasAttachments(boolean hasAttachments);
+ }
+
+ @FunctionalInterface
+ public interface RequireHeaderBlobId<T> {
+ T headerBlobId(String headerBlobId);
+ }
+
+ @FunctionalInterface
+ public interface RequireBodyBlobId<T> {
+ T bodyBlobId(String bodyBlobId);
+ }
+
@FunctionalInterface
public interface RequireMailboxEvent<T> extends
RequireEventId<RequireSession<RequireMailbox<T>>> {}
@@ -520,6 +546,58 @@ public class EventFactory {
}
}
+ public static final class MessageContentDeletionFinalStage {
+ private final Event.EventId eventId;
+ private final Username username;
+ private final MailboxId mailboxId;
+ private final MessageId messageId;
+ private final long size;
+ private final Instant internalDate;
+ private final boolean hasAttachments;
+ private final String headerBlobId;
+ private final String bodyBlobId;
+
+ MessageContentDeletionFinalStage(Event.EventId eventId,
+ Username username,
+ MailboxId mailboxId,
+ MessageId messageId,
+ long size,
+ Instant internalDate,
+ boolean hasAttachments,
+ String headerBlobId,
+ String bodyBlobId) {
+ this.eventId = eventId;
+ this.username = username;
+ this.mailboxId = mailboxId;
+ this.messageId = messageId;
+ this.size = size;
+ this.internalDate = internalDate;
+ this.hasAttachments = hasAttachments;
+ this.headerBlobId = headerBlobId;
+ this.bodyBlobId = bodyBlobId;
+ }
+
+ public MailboxEvents.MessageContentDeletionEvent build() {
+ Preconditions.checkNotNull(username);
+ Preconditions.checkNotNull(mailboxId);
+ Preconditions.checkNotNull(messageId);
+ Preconditions.checkNotNull(internalDate);
+ Preconditions.checkNotNull(headerBlobId);
+ Preconditions.checkNotNull(bodyBlobId);
+
+ return new MailboxEvents.MessageContentDeletionEvent(
+ eventId,
+ username,
+ mailboxId,
+ messageId,
+ size,
+ internalDate,
+ hasAttachments,
+ headerBlobId,
+ bodyBlobId);
+ }
+ }
+
public static class MailboxSubscribedFinalStage {
private final Event.EventId eventId;
private final MailboxPath path;
@@ -606,6 +684,11 @@ public class EventFactory {
return eventId -> user -> quotaRoot -> quotaCount -> quotaSize ->
instant -> new QuotaUsageUpdatedFinalStage(eventId, user, quotaRoot,
quotaCount, quotaSize, instant);
}
+ public static
RequireEventId<RequireUser<RequireMailboxId<RequireMessageId<RequireSize<RequireInstant<RequireHasAttachments<RequireHeaderBlobId<RequireBodyBlobId<MessageContentDeletionFinalStage>>>>>>>>>
messageContentDeleted() {
+ return eventId -> user -> mailboxId -> messageId -> size -> instant ->
hasAttachments -> headerBlobId -> bodyBlobId ->
+ new MessageContentDeletionFinalStage(eventId, user, mailboxId,
messageId, size, instant, hasAttachments, headerBlobId, bodyBlobId);
+ }
+
public static RequireMailboxEvent<MailboxSubscribedFinalStage>
mailboxSubscribed() {
return eventId -> user -> sessionId -> mailboxId -> path -> new
MailboxSubscribedFinalStage(eventId, path, mailboxId, user, sessionId);
}
diff --git
a/server/apps/distributed-app/sample-configuration/deletedMessageVault.properties
b/server/apps/distributed-app/sample-configuration/deletedMessageVault.properties
index 71b344ad27..2e4d1ed4e2 100644
---
a/server/apps/distributed-app/sample-configuration/deletedMessageVault.properties
+++
b/server/apps/distributed-app/sample-configuration/deletedMessageVault.properties
@@ -2,10 +2,6 @@
enabled=false
-# Enable work queue to be used with deleted message vault
-# Default to false
-# workQueueEnabled=false
-
# Retention period for your deleted messages into the vault, after which they
expire and can be potentially cleaned up
# Optional, default 1y
# retentionPeriod=1y
diff --git
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index 6d0ae07e4e..9d17f58a2d 100644
---
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -50,10 +50,10 @@ import
org.apache.james.modules.data.CassandraSieveQuotaModule;
import org.apache.james.modules.data.CassandraSieveRepositoryModule;
import org.apache.james.modules.data.CassandraUsersRepositoryModule;
import org.apache.james.modules.data.CassandraVacationModule;
+import org.apache.james.modules.event.ContentDeletionEventBusModule;
import org.apache.james.modules.event.JMAPEventBusModule;
import org.apache.james.modules.event.MailboxEventBusModule;
import org.apache.james.modules.eventstore.CassandraEventStoreModule;
-import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
import org.apache.james.modules.mailbox.CassandraMailboxModule;
import org.apache.james.modules.mailbox.CassandraMailboxQuotaLegacyModule;
import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
@@ -182,6 +182,7 @@ public class CassandraRabbitMQJamesServerMain implements
JamesServerMain {
protected static final Module MODULES =
Modules.override(REQUIRE_TASK_MANAGER_MODULE, new
DistributedTaskManagerModule())
.with(new RabbitMQModule(),
new MailboxEventBusModule(),
+ new ContentDeletionEventBusModule(),
new DistributedTaskSerializationModule());
public static void main(String[] args) throws Exception {
@@ -232,14 +233,9 @@ public class CassandraRabbitMQJamesServerMain implements
JamesServerMain {
}
private static Module chooseDeletedMessageVault(VaultConfiguration
vaultConfiguration) {
- if (vaultConfiguration.isEnabled() &&
vaultConfiguration.isWorkQueueEnabled()) {
- return Modules.combine(
- new DistributedDeletedMessageVaultModule(),
- new DeletedMessageVaultRoutesModule());
- }
if (vaultConfiguration.isEnabled()) {
return Modules.combine(
- new CassandraDeletedMessageVaultModule(),
+ new DistributedDeletedMessageVaultModule(),
new DeletedMessageVaultRoutesModule());
}
return binder -> {
diff --git
a/server/apps/distributed-pop3-app/sample-configuration/deletedMessageVault.properties
b/server/apps/distributed-pop3-app/sample-configuration/deletedMessageVault.properties
index 40873db544..47ec56c5c4 100644
---
a/server/apps/distributed-pop3-app/sample-configuration/deletedMessageVault.properties
+++
b/server/apps/distributed-pop3-app/sample-configuration/deletedMessageVault.properties
@@ -2,10 +2,6 @@
enabled=false
-# Enable work queue to be used with deleted message vault
-# Default to false
-# workQueueEnabled=false
-
# Retention period for your deleted messages into the vault, after which they
expire and can be potentially cleaned up
# Optional, default 1y
# retentionPeriod=1y
diff --git
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
index c1247e8474..e858004f4a 100644
---
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
+++
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
@@ -56,11 +56,11 @@ import
org.apache.james.modules.data.CassandraSieveQuotaModule;
import org.apache.james.modules.data.CassandraSieveRepositoryModule;
import org.apache.james.modules.data.CassandraUsersRepositoryModule;
import org.apache.james.modules.data.CassandraVacationModule;
+import org.apache.james.modules.event.ContentDeletionEventBusModule;
import org.apache.james.modules.event.JMAPEventBusModule;
import org.apache.james.modules.event.MailboxEventBusModule;
import org.apache.james.modules.eventstore.CassandraEventStoreModule;
import org.apache.james.modules.mailbox.CassandraBlobStoreDependenciesModule;
-import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
import org.apache.james.modules.mailbox.CassandraMailboxModule;
import org.apache.james.modules.mailbox.CassandraMailboxQuotaLegacyModule;
import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
@@ -176,6 +176,7 @@ public class DistributedPOP3JamesServerMain implements
JamesServerMain {
new RabbitMQMailQueueModule(),
new RabbitMailQueueRoutesModule(),
new MailboxEventBusModule(),
+ new ContentDeletionEventBusModule(),
new DistributedTaskSerializationModule());
public static void main(String[] args) throws Exception {
@@ -219,15 +220,10 @@ public class DistributedPOP3JamesServerMain implements
JamesServerMain {
}
private static Module chooseDeletedMessageVault(VaultConfiguration
vaultConfiguration) {
- if (vaultConfiguration.isEnabled() &&
vaultConfiguration.isWorkQueueEnabled()) {
- return Modules.combine(
- new DistributedDeletedMessageVaultModule(),
- new DeletedMessageVaultRoutesModule());
- }
if (vaultConfiguration.isEnabled()) {
return Modules.combine(
new DistributedDeletedMessageVaultModule(),
- new CassandraDeletedMessageVaultModule());
+ new DeletedMessageVaultRoutesModule());
}
return binder -> {
diff --git
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
index 3d7df65509..0ebd94cca6 100644
---
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
+++
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
@@ -48,6 +48,7 @@ import org.apache.james.modules.data.PostgresEventStoreModule;
import org.apache.james.modules.data.PostgresUsersRepositoryModule;
import org.apache.james.modules.data.PostgresVacationModule;
import org.apache.james.modules.data.SievePostgresRepositoryModules;
+import org.apache.james.modules.event.ContentDeletionEventBusModule;
import org.apache.james.modules.event.JMAPEventBusModule;
import org.apache.james.modules.event.MailboxEventBusModule;
import org.apache.james.modules.events.PostgresDeadLetterModule;
@@ -232,6 +233,7 @@ public class PostgresJamesServerMain implements
JamesServerMain {
case RABBITMQ:
return List.of(
Modules.override(new DefaultEventModule()).with(new
MailboxEventBusModule()),
+ new ContentDeletionEventBusModule(),
new RabbitMQModule(),
new RabbitMQMailQueueModule(),
new FakeMailQueueViewModule(),
diff --git
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
index 083596f435..4a53aa0b20 100644
---
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
+++
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
@@ -19,11 +19,14 @@
package org.apache.james.modules.data;
+import static
org.apache.james.mailbox.cassandra.DeleteMessageListener.CONTENT_DELETION;
+
import java.io.FileNotFoundException;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.events.EventListener;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.eventsourcing.eventstore.dto.EventDTO;
@@ -54,14 +57,13 @@ import
org.apache.james.jmap.cassandra.projections.CassandraEmailQueryView;
import
org.apache.james.jmap.cassandra.projections.CassandraEmailQueryViewDataDefinition;
import
org.apache.james.jmap.cassandra.projections.CassandraMessageFastViewProjection;
import
org.apache.james.jmap.cassandra.projections.CassandraMessageFastViewProjectionDataDefinition;
-import
org.apache.james.jmap.cassandra.projections.CassandraMessageFastViewProjectionDeletionCallback;
+import
org.apache.james.jmap.cassandra.projections.CassandraMessageFastViewProjectionDeletionListener;
import
org.apache.james.jmap.cassandra.pushsubscription.CassandraPushSubscriptionDataDefinition;
import
org.apache.james.jmap.cassandra.pushsubscription.CassandraPushSubscriptionRepository;
import org.apache.james.jmap.cassandra.upload.CassandraUploadRepository;
import org.apache.james.jmap.cassandra.upload.CassandraUploadUsageRepository;
import org.apache.james.jmap.cassandra.upload.UploadDAO;
import org.apache.james.jmap.cassandra.upload.UploadDataDefinition;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
import org.apache.james.user.api.DeleteUserDataTaskStep;
import org.apache.james.user.api.UsernameChangeTaskStep;
import org.apache.james.utils.PropertiesProvider;
@@ -72,6 +74,7 @@ import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
+import com.google.inject.name.Names;
public class CassandraJmapModule extends AbstractModule {
@Override
@@ -116,9 +119,9 @@ public class CassandraJmapModule extends AbstractModule {
eventDTOModuleBinder.addBinding().toInstance(FilteringRuleSetDefineDTOModules.FILTERING_RULE_SET_DEFINED);
eventDTOModuleBinder.addBinding().toInstance(FilteringRuleSetDefineDTOModules.FILTERING_INCREMENT);
- Multibinder.newSetBinder(binder(),
DeleteMessageListener.DeletionCallback.class)
+ Multibinder.newSetBinder(binder(),
EventListener.ReactiveGroupEventListener.class, Names.named(CONTENT_DELETION))
.addBinding()
- .to(CassandraMessageFastViewProjectionDeletionCallback.class);
+ .to(CassandraMessageFastViewProjectionDeletionListener.class);
Multibinder.newSetBinder(binder(), UsernameChangeTaskStep.class)
.addBinding()
diff --git
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraDeletedMessageVaultModule.java
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraDeletedMessageVaultModule.java
deleted file mode 100644
index 58fb3951c5..0000000000
---
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraDeletedMessageVaultModule.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.modules.mailbox;
-
-import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
-import org.apache.james.modules.vault.DeletedMessageVaultModule;
-import org.apache.james.vault.DeletedMessageVault;
-import org.apache.james.vault.blob.BlobStoreDeletedMessageVault;
-import org.apache.james.vault.blob.BucketNameGenerator;
-import
org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter;
-import org.apache.james.vault.metadata.CassandraDeletedMessageMetadataVault;
-import org.apache.james.vault.metadata.DeletedMessageMetadataDataDefinition;
-import org.apache.james.vault.metadata.DeletedMessageMetadataVault;
-import org.apache.james.vault.metadata.DeletedMessageVaultDeletionCallback;
-import org.apache.james.vault.metadata.MetadataDAO;
-import org.apache.james.vault.metadata.StorageInformationDAO;
-import org.apache.james.vault.metadata.UserPerBucketDAO;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Scopes;
-import com.google.inject.multibindings.Multibinder;
-
-public class CassandraDeletedMessageVaultModule extends AbstractModule {
-
- @Override
- protected void configure() {
- install(new DeletedMessageVaultModule());
-
- Multibinder<CassandraDataDefinition> cassandraDataDefinitions =
Multibinder.newSetBinder(binder(), CassandraDataDefinition.class);
- cassandraDataDefinitions
- .addBinding()
- .toInstance(DeletedMessageMetadataDataDefinition.MODULE);
-
- bind(MetadataDAO.class).in(Scopes.SINGLETON);
- bind(StorageInformationDAO.class).in(Scopes.SINGLETON);
- bind(UserPerBucketDAO.class).in(Scopes.SINGLETON);
-
bind(DeletedMessageWithStorageInformationConverter.class).in(Scopes.SINGLETON);
-
- bind(CassandraDeletedMessageMetadataVault.class).in(Scopes.SINGLETON);
- bind(DeletedMessageMetadataVault.class)
- .to(CassandraDeletedMessageMetadataVault.class);
-
- bind(BucketNameGenerator.class).in(Scopes.SINGLETON);
- bind(BlobStoreDeletedMessageVault.class).in(Scopes.SINGLETON);
- bind(DeletedMessageVault.class)
- .to(BlobStoreDeletedMessageVault.class);
-
- Multibinder.newSetBinder(binder(),
DeleteMessageListener.DeletionCallback.class)
- .addBinding()
- .to(DeletedMessageVaultDeletionCallback.class);
- }
-}
diff --git
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
index 48c6d32c08..d71e7e44ad 100644
---
a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
+++
b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
@@ -251,7 +251,6 @@ public class CassandraMailboxModule extends AbstractModule {
.addBinding().to(MailboxSubscriptionListener.class);
Multibinder.newSetBinder(binder(),
EventListener.ReactiveGroupEventListener.class)
.addBinding().to(DeleteMessageListener.class);
- Multibinder.newSetBinder(binder(),
DeleteMessageListener.DeletionCallback.class);
bind(MailboxManager.class).annotatedWith(Names.named(MAILBOXMANAGER_NAME)).to(MailboxManager.class);
diff --git
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
new file mode 100644
index 0000000000..72a4cc07ae
--- /dev/null
+++
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
@@ -0,0 +1,161 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.modules.event;
+
+import static
org.apache.james.events.NamingStrategy.CONTENT_DELETION_NAMING_STRATEGY;
+
+import java.util.Set;
+
+import jakarta.inject.Named;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.event.json.MailboxEventSerializer;
+import org.apache.james.events.Event;
+import org.apache.james.events.EventBus;
+import org.apache.james.events.EventBusId;
+import org.apache.james.events.EventBusReconnectionHandler;
+import org.apache.james.events.EventDeadLetters;
+import org.apache.james.events.EventListener;
+import org.apache.james.events.Group;
+import org.apache.james.events.GroupRegistrationHandler;
+import org.apache.james.events.KeyReconnectionHandler;
+import org.apache.james.events.RabbitEventBusConsumerHealthCheck;
+import
org.apache.james.events.RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck;
+import org.apache.james.events.RabbitMQEventBus;
+import org.apache.james.events.RetryBackoffConfiguration;
+import org.apache.james.events.RoutingKeyConverter;
+import org.apache.james.jmap.change.Factory;
+import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
+import org.reactivestreams.Publisher;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Names;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.Sender;
+
+public class ContentDeletionEventBusModule extends AbstractModule {
+ public static class NoopListener implements
EventListener.ReactiveGroupEventListener {
+ public static class NoopListenerGroup extends Group {
+
+ }
+
+ @Override
+ public Group getDefaultGroup() {
+ return new NoopListenerGroup();
+ }
+
+ @Override
+ public Publisher<Void> reactiveEvent(Event event) {
+ return Mono.empty();
+ }
+
+ @Override
+ public boolean isHandling(Event event) {
+ return false;
+ }
+ }
+
+ public static final String CONTENT_DELETION = "contentDeletion";
+
+ @Override
+ protected void configure() {
+
bind(EventBusId.class).annotatedWith(Names.named(CONTENT_DELETION)).toInstance(EventBusId.random());
+
+ Multibinder.newSetBinder(binder(),
EventListener.ReactiveGroupEventListener.class,
Names.named(DeleteMessageListener.CONTENT_DELETION));
+ }
+
+ @ProvidesIntoSet
+ InitializationOperation workQueue(@Named(CONTENT_DELETION)
RabbitMQEventBus instance,
+ @Named(CONTENT_DELETION)
Set<EventListener.ReactiveGroupEventListener> contentDeletionListeners) {
+ return InitilizationOperationBuilder
+ .forClass(RabbitMQEventBus.class)
+ .init(() -> {
+ instance.start();
+ contentDeletionListeners.forEach(instance::register);
+
+ // workaround for Postgres app to make the content deletion
queue created and pass tests
+ // TODO remove after refactoring JAMES-4154 for Postgres app
+ instance.register(new NoopListener());
+ });
+ }
+
+ @ProvidesIntoSet
+ SimpleConnectionPool.ReconnectionHandler
provideReconnectionHandler(@Named(CONTENT_DELETION) RabbitMQEventBus eventBus) {
+ return new EventBusReconnectionHandler(eventBus);
+ }
+
+ @ProvidesIntoSet
+ SimpleConnectionPool.ReconnectionHandler
provideReconnectionHandler(@Named(CONTENT_DELETION) EventBusId eventBusId,
RabbitMQConfiguration configuration) {
+ return new KeyReconnectionHandler(CONTENT_DELETION_NAMING_STRATEGY,
eventBusId, configuration);
+ }
+
+ @ProvidesIntoSet
+ HealthCheck healthCheck(@Named(CONTENT_DELETION) RabbitMQEventBus eventBus,
+ SimpleConnectionPool connectionPool) {
+ return new RabbitEventBusConsumerHealthCheck(eventBus,
CONTENT_DELETION_NAMING_STRATEGY, connectionPool,
+ GroupRegistrationHandler.GROUP);
+ }
+
+ @ProvidesIntoSet
+ HealthCheck
contentDeletionEventBusDeadLetterQueueHealthCheck(RabbitMQConfiguration
rabbitMQConfiguration) {
+ return new
RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck(rabbitMQConfiguration);
+ }
+
+ @Provides
+ @Singleton
+ @Named(CONTENT_DELETION)
+ RabbitMQEventBus provideContentDeletionEventBus(Sender sender,
ReceiverProvider receiverProvider,
+ MailboxEventSerializer
eventSerializer,
+ RetryBackoffConfiguration
retryBackoffConfiguration,
+ EventDeadLetters
eventDeadLetters,
+ MetricFactory
metricFactory, ReactorRabbitMQChannelPool channelPool,
+ @Named(CONTENT_DELETION)
EventBusId eventBusId,
+ RabbitMQConfiguration
configuration) {
+ return new RabbitMQEventBus(
+ CONTENT_DELETION_NAMING_STRATEGY,
+ sender, receiverProvider, eventSerializer,
retryBackoffConfiguration, new RoutingKeyConverter(ImmutableSet.of(new
Factory())),
+ eventDeadLetters, metricFactory, channelPool, eventBusId,
configuration);
+ }
+
+ @Provides
+ @Singleton
+ @Named(CONTENT_DELETION)
+ EventBus provideContentDeletionEventBus(@Named(CONTENT_DELETION)
RabbitMQEventBus rabbitMQEventBus) {
+ return rabbitMQEventBus;
+ }
+
+ @ProvidesIntoSet
+ EventBus registerEventBus(@Named(CONTENT_DELETION) EventBus eventBus) {
+ return eventBus;
+ }
+}
diff --git
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultDeletionListener.java
similarity index 66%
rename from
mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
rename to
server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultDeletionListener.java
index 40b6bf02f4..f80eb04e26 100644
---
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
+++
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultDeletionListener.java
@@ -17,7 +17,7 @@
* under the License. *
****************************************************************/
-package org.apache.james.vault.metadata;
+package org.apache.james.modules.mailbox;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -31,10 +31,14 @@ import java.util.Set;
import jakarta.inject.Inject;
+import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.core.MailAddress;
import org.apache.james.core.MaybeSender;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.events.Event;
+import org.apache.james.events.EventListener;
+import org.apache.james.events.Group;
+import
org.apache.james.mailbox.events.MailboxEvents.MessageContentDeletionEvent;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mime4j.MimeIOException;
import org.apache.james.mime4j.codec.DecodeMonitor;
@@ -45,6 +49,7 @@ import org.apache.james.mime4j.stream.MimeConfig;
import org.apache.james.server.core.Envelope;
import org.apache.james.vault.DeletedMessage;
import org.apache.james.vault.DeletedMessageVault;
+import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,46 +57,70 @@ import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
-public class DeletedMessageVaultDeletionCallback implements
DeleteMessageListener.DeletionCallback {
- private static final Logger LOGGER =
LoggerFactory.getLogger(DeletedMessageVaultDeletionCallback.class);
+public class DeletedMessageVaultDeletionListener implements
EventListener.ReactiveGroupEventListener {
+ public static class DeletedMessageVaultListenerGroup extends Group {
+ }
+
+ private static final Group DELETED_MESSAGE_VAULT_DELETION_GROUP = new
DeletedMessageVaultListenerGroup();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DeletedMessageVaultDeletionListener.class);
+ private final BlobId.Factory blobIdFactory;
private final DeletedMessageVault deletedMessageVault;
private final BlobStore blobStore;
private final Clock clock;
@Inject
- public DeletedMessageVaultDeletionCallback(DeletedMessageVault
deletedMessageVault, BlobStore blobStore, Clock clock) {
+ public DeletedMessageVaultDeletionListener(BlobId.Factory blobIdFactory,
DeletedMessageVault deletedMessageVault,
+ BlobStore blobStore, Clock
clock) {
+ this.blobIdFactory = blobIdFactory;
this.deletedMessageVault = deletedMessageVault;
this.blobStore = blobStore;
this.clock = clock;
}
@Override
- public Mono<Void>
forMessage(DeleteMessageListener.DeletedMessageCopyCommand copyCommand) {
- return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(),
copyCommand.getHeaderId(), BlobStore.StoragePolicy.LOW_COST))
+ public Group getDefaultGroup() {
+ return DELETED_MESSAGE_VAULT_DELETION_GROUP;
+ }
+
+ @Override
+ public boolean isHandling(Event event) {
+ return event instanceof MessageContentDeletionEvent;
+ }
+
+ @Override
+ public Publisher<Void> reactiveEvent(Event event) {
+ if (event instanceof MessageContentDeletionEvent contentDeletionEvent)
{
+ return forMessage(contentDeletionEvent);
+ }
+
+ return Mono.empty();
+ }
+
+ public Mono<Void> forMessage(MessageContentDeletionEvent
messageContentDeletionEvent) {
+ return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(),
blobIdFactory.parse(messageContentDeletionEvent.headerBlobId()),
BlobStore.StoragePolicy.LOW_COST))
.flatMap(bytes -> {
- Optional<Message> mimeMessage = parseMessage(new
ByteArrayInputStream(bytes), copyCommand.getMessageId());
+ Optional<Message> mimeMessage = parseMessage(new
ByteArrayInputStream(bytes), messageContentDeletionEvent.messageId());
DeletedMessage deletedMessage = DeletedMessage.builder()
- .messageId(copyCommand.getMessageId())
- .originMailboxes(copyCommand.getMailboxId())
- .user(copyCommand.getOwner())
-
.deliveryDate(ZonedDateTime.ofInstant(copyCommand.getInternalDate().toInstant(),
ZoneOffset.UTC))
+ .messageId(messageContentDeletionEvent.messageId())
+ .originMailboxes(messageContentDeletionEvent.mailboxId())
+ .user(messageContentDeletionEvent.getUsername())
+
.deliveryDate(ZonedDateTime.ofInstant(messageContentDeletionEvent.internalDate(),
ZoneOffset.UTC))
.deletionDate(ZonedDateTime.ofInstant(clock.instant(),
ZoneOffset.UTC))
.sender(retrieveSender(mimeMessage))
.recipients(retrieveRecipients(mimeMessage))
- .hasAttachment(copyCommand.hasAttachments())
- .size(copyCommand.getSize())
+
.hasAttachment(messageContentDeletionEvent.hasAttachments())
+ .size(messageContentDeletionEvent.size())
.subject(mimeMessage.map(Message::getSubject))
.build();
- return
Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(),
copyCommand.getBodyId(), BlobStore.StoragePolicy.LOW_COST))
+ return
Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(),
blobIdFactory.parse(messageContentDeletionEvent.bodyBlobId()),
BlobStore.StoragePolicy.LOW_COST))
.map(bodyStream -> new SequenceInputStream(new
ByteArrayInputStream(bytes), bodyStream))
.flatMap(bodyStream ->
Mono.from(deletedMessageVault.append(deletedMessage, bodyStream)));
});
}
-
private Optional<Message> parseMessage(InputStream inputStream, MessageId
messageId) {
DefaultMessageBuilder messageBuilder = new DefaultMessageBuilder();
messageBuilder.setMimeEntityConfig(MimeConfig.PERMISSIVE);
diff --git
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
deleted file mode 100644
index 0893a66d34..0000000000
---
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.modules.mailbox;
-
-import jakarta.inject.Inject;
-
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
-import org.reactivestreams.Publisher;
-
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
-
-public class DeletedMessageVaultWorkQueueReconnectionHandler implements
SimpleConnectionPool.ReconnectionHandler {
- private final DistributedDeletedMessageVaultDeletionCallback
distributedDeletedMessageVaultDeletionCallback;
-
- @Inject
- public
DeletedMessageVaultWorkQueueReconnectionHandler(DistributedDeletedMessageVaultDeletionCallback
distributedDeletedMessageVaultDeletionCallback) {
- this.distributedDeletedMessageVaultDeletionCallback =
distributedDeletedMessageVaultDeletionCallback;
- }
-
- @Override
- public Publisher<Void> handleReconnection(Connection connection) {
- return
Mono.fromRunnable(distributedDeletedMessageVaultDeletionCallback::restart);
- }
-}
diff --git
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
deleted file mode 100644
index 919c6331f5..0000000000
---
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.modules.mailbox;
-
-import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
-import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
-import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
-import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
-import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-
-import java.time.Duration;
-import java.util.Date;
-import java.util.Optional;
-
-import jakarta.annotation.PreDestroy;
-import jakarta.inject.Inject;
-
-import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.ReceiverProvider;
-import org.apache.james.blob.api.BlobId;
-import org.apache.james.core.Username;
-import org.apache.james.lifecycle.api.Startable;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
-import org.apache.james.mailbox.model.MailboxId;
-import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.vault.metadata.DeletedMessageVaultDeletionCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.rabbitmq.client.AMQP;
-
-import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.BindingSpecification;
-import reactor.rabbitmq.ConsumeOptions;
-import reactor.rabbitmq.ExchangeSpecification;
-import reactor.rabbitmq.OutboundMessage;
-import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.Sender;
-
-public class DistributedDeletedMessageVaultDeletionCallback implements
DeleteMessageListener.DeletionCallback, Startable {
- public static final Logger LOGGER =
LoggerFactory.getLogger(DistributedDeletedMessageVaultDeletionCallback.class);
-
- private static class CopyCommandDTO {
- public static CopyCommandDTO
of(DeleteMessageListener.DeletedMessageCopyCommand command) {
- return new CopyCommandDTO(
- command.getMessageId().serialize(),
- command.getMailboxId().serialize(),
- command.getOwner().asString(),
- command.getInternalDate(),
- command.getSize(),
- command.hasAttachments(),
- command.getHeaderId().asString(),
- command.getBodyId().asString());
- }
-
- private final String messageId;
- private final String mailboxId;
- private final String owner;
- private final Date internalDate;
- private final long size;
- private final boolean hasAttachments;
- private final String headerId;
- private final String bodyId;
-
- @JsonCreator
- public CopyCommandDTO(@JsonProperty("messageId") String messageId,
- @JsonProperty("mailboxId") String mailboxId,
- @JsonProperty("owner") String owner,
- @JsonProperty("internalDate") Date internalDate,
- @JsonProperty("size") long size,
- @JsonProperty("hasAttachments") boolean
hasAttachments,
- @JsonProperty("headerId") String headerId,
- @JsonProperty("bodyId") String bodyId) {
- this.messageId = messageId;
- this.mailboxId = mailboxId;
- this.owner = owner;
- this.internalDate = internalDate;
- this.size = size;
- this.hasAttachments = hasAttachments;
- this.headerId = headerId;
- this.bodyId = bodyId;
- }
-
- public String getMessageId() {
- return messageId;
- }
-
- public String getMailboxId() {
- return mailboxId;
- }
-
- public String getOwner() {
- return owner;
- }
-
- public Date getInternalDate() {
- return internalDate;
- }
-
- public long getSize() {
- return size;
- }
-
- public boolean isHasAttachments() {
- return hasAttachments;
- }
-
- public String getHeaderId() {
- return headerId;
- }
-
- public String getBodyId() {
- return bodyId;
- }
-
- @JsonIgnore
- DeleteMessageListener.DeletedMessageCopyCommand
asPojo(MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory,
BlobId.Factory blobIdFactory) {
- return new
DeleteMessageListener.DeletedMessageCopyCommand(messageIdFactory.fromString(messageId),
- mailboxIdFactory.fromString(messageId),
- Username.of(owner),
- internalDate,
- size,
- hasAttachments,
- blobIdFactory.parse(headerId),
- blobIdFactory.parse(bodyId));
- }
- }
-
- private static final String EXCHANGE = "deleted-message-vault";
- private static final String QUEUE = "deleted-message-vault-work-queue";
- private static final String DEAD_LETTER = QUEUE + "-dead-letter";
- private static final boolean REQUEUE = true;
- private static final int QOS = 5;
-
- private final ReactorRabbitMQChannelPool channelPool;
- private final RabbitMQConfiguration rabbitMQConfiguration;
- private final DeletedMessageVaultDeletionCallback callback;
- private final Sender sender;
- private final ObjectMapper objectMapper;
- private final MailboxId.Factory mailboxIdFactory;
- private final MessageId.Factory messageIdFactory;
- private final BlobId.Factory blobIdFactory;
- private final ReceiverProvider receiverProvider;
- private Disposable disposable;
-
- @Inject
- public DistributedDeletedMessageVaultDeletionCallback(Sender sender,
-
ReactorRabbitMQChannelPool channelPool,
-
RabbitMQConfiguration rabbitMQConfiguration,
-
DeletedMessageVaultDeletionCallback callback,
- MailboxId.Factory
mailboxIdFactory,
- MessageId.Factory
messageIdFactory,
- BlobId.Factory
blobIdFactory,
- ReceiverProvider
receiverProvider) {
- this.sender = sender;
- this.rabbitMQConfiguration = rabbitMQConfiguration;
- this.callback = callback;
- this.mailboxIdFactory = mailboxIdFactory;
- this.messageIdFactory = messageIdFactory;
- this.blobIdFactory = blobIdFactory;
- this.objectMapper = new ObjectMapper();
- this.channelPool = channelPool;
- this.receiverProvider = receiverProvider;
- }
-
- public void init() {
- Flux.concat(
- sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE)
- .durable(DURABLE)
- .type(DIRECT_EXCHANGE)),
- sender.declareQueue(QueueSpecification.queue(DEAD_LETTER)
- .durable(DURABLE)
- .exclusive(!EXCLUSIVE)
- .autoDelete(!AUTO_DELETE)
-
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder()
- .deadLetter(DEAD_LETTER)
- .build())),
- sender.declareQueue(QueueSpecification.queue(QUEUE)
- .durable(DURABLE)
- .exclusive(!EXCLUSIVE)
- .autoDelete(!AUTO_DELETE)
-
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder()
- .deadLetter(DEAD_LETTER)
- .build())),
- sender.bind(BindingSpecification.binding()
- .exchange(EXCHANGE)
- .queue(QUEUE)
- .routingKey(EMPTY_ROUTING_KEY)))
- .then()
- .block();
-
- disposable = consumeDeletedMessageVaultWorkQueue();
- }
-
- private Disposable consumeDeletedMessageVaultWorkQueue() {
- return Flux.using(
- receiverProvider::createReceiver,
- receiver -> receiver.consumeManualAck(QUEUE, new
ConsumeOptions().qos(QOS)),
- Receiver::close)
- .flatMap(this::handleMessage, QOS)
- .subscribeOn(Schedulers.boundedElastic())
- .subscribe();
- }
-
- public void restart() {
- Disposable previousConsumer = disposable;
- disposable = consumeDeletedMessageVaultWorkQueue();
- previousConsumer.dispose();
- }
-
- @PreDestroy
- public void stop() {
- Optional.ofNullable(disposable).ifPresent(Disposable::dispose);
- }
-
- private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
- try {
- CopyCommandDTO copyCommandDTO =
objectMapper.readValue(delivery.getBody(), CopyCommandDTO.class);
-
- return callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory,
messageIdFactory, blobIdFactory))
- .timeout(Duration.ofMinutes(5))
- .doOnSuccess(any -> delivery.ack())
- .doOnCancel(() -> delivery.nack(REQUEUE))
- .onErrorResume(e -> {
- LOGGER.error("Failed executing deletion callback for {}",
copyCommandDTO.messageId, e);
- delivery.nack(REQUEUE);
- return Mono.empty();
- });
- } catch (Exception e) {
- LOGGER.error("Deserialization error: reject poisonous message for
distributed Deleted message vault callback", e);
- // Deserialization error: reject poisonous messages
- delivery.nack(!REQUEUE);
- return Mono.empty();
- }
- }
-
- @Override
- public Mono<Void>
forMessage(DeleteMessageListener.DeletedMessageCopyCommand command) {
- CopyCommandDTO dto = CopyCommandDTO.of(command);
- try {
- byte[] bytes = objectMapper.writeValueAsBytes(dto);
- return sender.send(Mono.just(new OutboundMessage(EXCHANGE,
EMPTY_ROUTING_KEY, new AMQP.BasicProperties.Builder()
- .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
- .priority(PERSISTENT_TEXT_PLAIN.getPriority())
- .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
- .build(), bytes)));
- } catch (JsonProcessingException e) {
- return Mono.error(e);
- }
- }
-
-
-}
diff --git
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
index bc3839cfd2..319a69fd69 100644
---
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
+++
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
@@ -20,11 +20,9 @@
package org.apache.james.modules.mailbox;
import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.events.EventListener;
import org.apache.james.mailbox.cassandra.DeleteMessageListener;
import org.apache.james.modules.vault.DeletedMessageVaultModule;
-import org.apache.james.utils.InitializationOperation;
-import org.apache.james.utils.InitilizationOperationBuilder;
import org.apache.james.vault.DeletedMessageVault;
import org.apache.james.vault.blob.BlobStoreDeletedMessageVault;
import org.apache.james.vault.blob.BucketNameGenerator;
@@ -39,7 +37,7 @@ import org.apache.james.vault.metadata.UserPerBucketDAO;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
-import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Names;
public class DistributedDeletedMessageVaultModule extends AbstractModule {
@Override
@@ -65,19 +63,8 @@ public class DistributedDeletedMessageVaultModule extends
AbstractModule {
bind(DeletedMessageVault.class)
.to(BlobStoreDeletedMessageVault.class);
- Multibinder.newSetBinder(binder(),
DeleteMessageListener.DeletionCallback.class)
+ Multibinder.newSetBinder(binder(),
EventListener.ReactiveGroupEventListener.class,
Names.named(DeleteMessageListener.CONTENT_DELETION))
.addBinding()
- .to(DistributedDeletedMessageVaultDeletionCallback.class);
-
bind(DistributedDeletedMessageVaultDeletionCallback.class).in(Scopes.SINGLETON);
-
- Multibinder<SimpleConnectionPool.ReconnectionHandler>
reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(),
SimpleConnectionPool.ReconnectionHandler.class);
-
reconnectionHandlerMultibinder.addBinding().to(DeletedMessageVaultWorkQueueReconnectionHandler.class);
- }
-
- @ProvidesIntoSet
- InitializationOperation
init(DistributedDeletedMessageVaultDeletionCallback callback) {
- return InitilizationOperationBuilder
- .forClass(DistributedDeletedMessageVaultDeletionCallback.class)
- .init(callback::init);
+ .to(DeletedMessageVaultDeletionListener.class);
}
}
diff --git
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionCallback.java
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionListener.java
similarity index 59%
rename from
server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionCallback.java
rename to
server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionListener.java
index c2e4d25c05..ec5fd46e3c 100644
---
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionCallback.java
+++
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjectionDeletionListener.java
@@ -21,21 +21,46 @@ package org.apache.james.jmap.cassandra.projections;
import jakarta.inject.Inject;
+import org.apache.james.events.Event;
+import org.apache.james.events.EventListener;
+import org.apache.james.events.Group;
import org.apache.james.jmap.api.projections.MessageFastViewProjection;
-import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.mailbox.events.MailboxEvents;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
-public class CassandraMessageFastViewProjectionDeletionCallback implements
DeleteMessageListener.DeletionCallback {
+public class CassandraMessageFastViewProjectionDeletionListener implements
EventListener.ReactiveGroupEventListener {
+ public static class
CassandraMessageFastViewProjectionDeletionListenerGroup extends Group {
+
+ }
+
+ private static final Group GROUP = new
CassandraMessageFastViewProjectionDeletionListenerGroup();
+
private final MessageFastViewProjection messageFastViewProjection;
@Inject
- public
CassandraMessageFastViewProjectionDeletionCallback(MessageFastViewProjection
messageFastViewProjection) {
+ public
CassandraMessageFastViewProjectionDeletionListener(MessageFastViewProjection
messageFastViewProjection) {
this.messageFastViewProjection = messageFastViewProjection;
}
@Override
- public Mono<Void>
forMessage(DeleteMessageListener.DeletedMessageCopyCommand copyCommand) {
- return
Mono.from(messageFastViewProjection.delete(copyCommand.getMessageId()));
+ public Group getDefaultGroup() {
+ return GROUP;
+ }
+
+ @Override
+ public boolean isHandling(Event event) {
+ return event instanceof MailboxEvents.MessageContentDeletionEvent;
+ }
+
+ @Override
+ public Publisher<Void> reactiveEvent(Event event) {
+ if (event instanceof MailboxEvents.MessageContentDeletionEvent
contentDeletionEvent) {
+ return
Mono.from(messageFastViewProjection.delete(contentDeletionEvent.messageId()));
+ }
+
+ return Mono.empty();
}
+
}
diff --git
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationImmutableTest.java
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationImmutableTest.java
index 3591a21b4c..a93b003df8 100644
---
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationImmutableTest.java
+++
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationImmutableTest.java
@@ -141,6 +141,7 @@ class RabbitMQWebAdminServerIntegrationImmutableTest
extends WebAdminServerInteg
"Cassandra backend", "EventDeadLettersHealthCheck",
"MessageFastViewProjection",
"RabbitMQMailQueue BrowseStart", "OpenSearch Backend",
"ObjectStorage", "DistributedTaskManagerConsumers",
"EventbusConsumers-jmapEvent", "MailQueueConsumers",
"EventbusConsumers-mailboxEvent",
- "RabbitMQJmapEventBusDeadLetterQueueHealthCheck",
"IMAPHealthCheck");
+ "RabbitMQJmapEventBusDeadLetterQueueHealthCheck",
"IMAPHealthCheck",
+ "RabbitMQContentDeletionEventBusDeadLetterQueueHealthCheck",
"EventbusConsumers-contentDeletionEvent");
}
}
diff --git
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
deleted file mode 100644
index 38a5773b2f..0000000000
---
a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/vault/WorkQueueEnabledDeletedMessageVaultIntegrationTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.webadmin.integration.rabbitmq.vault;
-
-import org.apache.james.CassandraExtension;
-import org.apache.james.CassandraRabbitMQJamesConfiguration;
-import org.apache.james.CassandraRabbitMQJamesServerMain;
-import org.apache.james.DockerOpenSearchExtension;
-import org.apache.james.GuiceJamesServer;
-import org.apache.james.JamesServerBuilder;
-import org.apache.james.JamesServerExtension;
-import org.apache.james.SearchConfiguration;
-import org.apache.james.junit.categories.BasicFeature;
-import org.apache.james.modules.AwsS3BlobStoreExtension;
-import org.apache.james.modules.RabbitMQExtension;
-import org.apache.james.modules.TestJMAPServerModule;
-import org.apache.james.modules.blobstore.BlobStoreConfiguration;
-import org.apache.james.vault.VaultConfiguration;
-import
org.apache.james.webadmin.integration.vault.DeletedMessageVaultIntegrationTest;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-class WorkQueueEnabledDeletedMessageVaultIntegrationTest extends
DeletedMessageVaultIntegrationTest {
-
- private static final DockerOpenSearchExtension ES_EXTENSION = new
DockerOpenSearchExtension();
-
- @RegisterExtension
- static JamesServerExtension testExtension = new
JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
- CassandraRabbitMQJamesConfiguration.builder()
- .workingDirectory(tmpDir)
- .configurationFromClasspath()
- .blobStore(BlobStoreConfiguration.builder()
- .s3()
- .disableCache()
- .deduplication()
- .noCryptoConfig())
- .searchConfiguration(SearchConfiguration.openSearch())
- .vaultConfiguration(VaultConfiguration.ENABLED_WORKQUEUE)
- .build())
- .extension(ES_EXTENSION)
- .extension(new CassandraExtension())
- .extension(new AwsS3BlobStoreExtension())
- .extension(new RabbitMQExtension())
- .extension(new ClockExtension())
- .server(configuration ->
CassandraRabbitMQJamesServerMain.createServer(configuration)
- .overrideWith(new TestJMAPServerModule()))
- .build();
-
- @Override
- protected void awaitSearchUpToDate() {
- ES_EXTENSION.await();
- }
-
- @Disabled("JAMES-2688 Unstable test")
- @Test
- @Tag(BasicFeature.TAG)
- @Override
- public void
vaultExportShouldExportZipContainsVaultMessagesToShareeWhenImapDeletedMailbox(GuiceJamesServer
jmapServer) {
-
- }
-
-}
diff --git a/src/adr/0074-dedicated-eventbus-for-message-content-deletion.md
b/src/adr/0074-dedicated-eventbus-for-message-content-deletion.md
new file mode 100644
index 0000000000..27d44ccf48
--- /dev/null
+++ b/src/adr/0074-dedicated-eventbus-for-message-content-deletion.md
@@ -0,0 +1,45 @@
+# 74. Relying on EventBus for message deletion side effects
+
+Date: 2025-12-12
+
+## Status
+
+Accepted (lazy consensus) & implemented.
+
+## Context
+
+James wants to perform actions upon email deletion, for instance, copying
events in the Deleted Message Vault or clearing the corresponding message
preview (used to serve an optimized data projection over the JMAP protocol).
+
+Deletion action can be long and are thus in Cassandra and Postgres done
asynchronously in the `DeleteMessageListener`.
+
+We encountered issues, most notably with mailbox deletion, which triggered
those operations on a message set of message, leading to event processing
stalling and eventually timeout.
+
+This had been historically solved in the Distributed server by adding a custom
RabbitMQ queue for the deleted message vault to sequence and distribute each
single deletion. While it leads to a viable use in production, this approach
suffers from the following pitfall:
+
+- It duplicates the Event bus code, used for work queues
+- It requires a lot of custom code for adoption in other implementations
+- It makes it hard to add other "features" in a composable fashion without
duplicating a lot of code
+
+## Decision
+
+We will **remove the `DeletionCallback` interface**.
+
+Instead, **message deletion will always publish a
`MessageContentDeletionEvent`** on a dedicated EventBus.
+
+Consumers previously relying on deletion callbacks must now register as
EventBus listeners. This ensures that heavy deletion side effect workloads do
not block or slow down other important mailbox event processing (such as
indexing).
+
+## Consequences
+
+### Pros
+- heavy, slow deletion workloads no longer block mailbox operations.
+- `DeletionCallback` interface removed, simplifying the codebase.
+- leverage existing EventBus features: asynchronous execution, listener
isolation, retry and dead-lettering.
+- deletion side effects are handled the same way as other mailbox events.
+
+### Cons
+- Existing deletion callbacks must migrate to asynchronous listeners.
+
+## References
+
+- ADR-0037 – EventBus design
+- [JIRA: JAMES-4154 – Generalization of deletion side
effects](https://issues.apache.org/jira/browse/JAMES-4154)
diff --git a/src/site/xdoc/server/config-vault.xml
b/src/site/xdoc/server/config-vault.xml
index 25d5b809fa..7a1c470805 100644
--- a/src/site/xdoc/server/config-vault.xml
+++ b/src/site/xdoc/server/config-vault.xml
@@ -57,13 +57,6 @@
Default to false.
</dd>
</dl>
- <dl>
- <dt><strong>workQueueEnabled</strong></dt>
- <dd>
- Enable work queue to be used with deleted message
vault.
- Default to false.
- </dd>
- </dl>
<dl>
<dt><strong>retentionPeriod</strong></dt>
<dd>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]