This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 20ca53821b0342b0bc12d466c1450534e3b197e1
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat May 1 22:06:42 2021 +0700

    JAMES-3575 Make message deletion slightly more reactive
---
 .../cassandra/mail/CassandraMessageIdMapper.java   | 11 +++--
 .../james/mailbox/store/StoreMessageIdManager.java | 50 +++++++++++-----------
 .../james/mailbox/store/mail/MessageIdMapper.java  |  4 ++
 3 files changed, 37 insertions(+), 28 deletions(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 1224932..5d5972e 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -217,13 +217,18 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
 
     @Override
     public void delete(Multimap<MessageId, MailboxId> ids) {
-        Flux.fromIterable(ids.asMap()
+        deleteReactive(ids)
+            .block();
+    }
+
+
+    public Mono<Void> deleteReactive(Multimap<MessageId, MailboxId> ids) {
+        return Flux.fromIterable(ids.asMap()
             .entrySet())
             .publishOn(Schedulers.elastic())
             .flatMap(entry -> deleteReactive(entry.getKey(), 
entry.getValue()), cassandraConfiguration.getExpungeChunkSize(),
                 DEFAULT_CONCURRENCY)
-            .then()
-            .block();
+            .then();
     }
 
     private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, 
Optional<CassandraId> mailboxId) {
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index e40d42d..d34b4a7 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -206,7 +206,9 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             .collect(Guavate.toImmutableList());
 
         if (!messageList.isEmpty()) {
-            deleteWithPreHooks(messageIdMapper, messageList, mailboxSession);
+            deleteWithPreHooks(messageIdMapper, messageList, mailboxSession)
+                .subscribeOn(Schedulers.elastic())
+                .block();
             return DeleteResult.destroyed(messageId);
         }
         return DeleteResult.notFound(messageId);
@@ -230,7 +232,9 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             .collect(Guavate.toImmutableSet());
         Sets.SetView<MessageId> nonAccessibleMessages = 
Sets.difference(ImmutableSet.copyOf(messageIds), accessibleMessageIds);
 
-        deleteWithPreHooks(messageIdMapper, accessibleMessages, 
mailboxSession);
+        deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession)
+            .subscribeOn(Schedulers.elastic())
+            .block();
 
         return DeleteResult.builder()
             .addDestroyed(accessibleMessageIds)
@@ -238,38 +242,34 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             .build();
     }
 
-    private void deleteWithPreHooks(MessageIdMapper messageIdMapper, 
List<MailboxMessage> messageList, MailboxSession mailboxSession) throws 
MailboxException {
+    private Mono<Void> deleteWithPreHooks(MessageIdMapper messageIdMapper, 
List<MailboxMessage> messageList, MailboxSession mailboxSession) {
         ImmutableList<MetadataWithMailboxId> metadataWithMailbox = 
messageList.stream()
             .map(mailboxMessage -> 
MetadataWithMailboxId.from(mailboxMessage.metaData(), 
mailboxMessage.getMailboxId()))
             .collect(Guavate.toImmutableList());
 
-        
preDeletionHooks.runHooks(PreDeletionHook.DeleteOperation.from(metadataWithMailbox))
-            .then(Mono.fromRunnable(Throwing.runnable(
-                () -> delete(messageIdMapper, messageList, mailboxSession, 
metadataWithMailbox))))
-            .block();
+        return 
preDeletionHooks.runHooks(PreDeletionHook.DeleteOperation.from(metadataWithMailbox))
+            .then(delete(messageIdMapper, messageList, mailboxSession, 
metadataWithMailbox));
     }
 
-    private void delete(MessageIdMapper messageIdMapper, List<MailboxMessage> 
messageList, MailboxSession mailboxSession,
-                        ImmutableList<MetadataWithMailboxId> 
metadataWithMailbox) throws MailboxException {
-        messageIdMapper.delete(
+    private Mono<Void> delete(MessageIdMapper messageIdMapper, 
List<MailboxMessage> messageList, MailboxSession mailboxSession, 
ImmutableList<MetadataWithMailboxId> metadataWithMailbox) {
+        MailboxMapper mailboxMapper = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
+
+        return messageIdMapper.deleteReactive(
             messageList.stream()
                 .collect(Guavate.toImmutableListMultimap(
                     Message::getMessageId,
-                    MailboxMessage::getMailboxId)));
-
-        MailboxMapper mailboxMapper = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
-        Flux.fromIterable(metadataWithMailbox)
-            .flatMap(metadataWithMailboxId -> 
mailboxMapper.findMailboxById(metadataWithMailboxId.getMailboxId())
-                .flatMap(mailbox -> eventBus.dispatch(EventFactory.expunged()
-                        .randomEventId()
-                        .mailboxSession(mailboxSession)
-                        .mailbox(mailbox)
-                        
.addMetaData(metadataWithMailboxId.getMessageMetaData())
-                        .build(),
-                    new 
MailboxIdRegistrationKey(metadataWithMailboxId.getMailboxId()))), 
DEFAULT_CONCURRENCY)
-            .then()
-            .subscribeOn(Schedulers.elastic())
-            .block();
+                    MailboxMessage::getMailboxId)))
+            .then(
+                Flux.fromIterable(metadataWithMailbox)
+                    .flatMap(metadataWithMailboxId -> 
mailboxMapper.findMailboxById(metadataWithMailboxId.getMailboxId())
+                        .flatMap(mailbox -> 
eventBus.dispatch(EventFactory.expunged()
+                                .randomEventId()
+                                .mailboxSession(mailboxSession)
+                                .mailbox(mailbox)
+                                
.addMetaData(metadataWithMailboxId.getMessageMetaData())
+                                .build(),
+                            new 
MailboxIdRegistrationKey(metadataWithMailboxId.getMailboxId()))), 
DEFAULT_CONCURRENCY)
+                    .then());
     }
 
     @Override
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
index 5191435..7bdafbc 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
@@ -69,6 +69,10 @@ public interface MessageIdMapper {
             .forEach(this::delete);
     }
 
+    default Mono<Void> deleteReactive(Multimap<MessageId, MailboxId> ids) {
+        return Mono.fromRunnable(() -> delete(ids));
+    }
+
     /**
      * Updates the flags of the messages with the given MessageId in the 
supplied mailboxes
      *

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to