This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e7c695fcb5bb5b32b675215e6e174e6338b54a79
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Tue Dec 10 21:36:39 2024 +0100

    [PERF] bundle JMAP move events into one
---
 .../james/mailbox/store/StoreMessageIdManager.java | 102 ++++++++++-----------
 1 file changed, 46 insertions(+), 56 deletions(-)

diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index d4950bd309..1d52e2a526 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -380,22 +380,25 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             .collect(ImmutableList.toImmutableList());
 
         return validateQuota(messageMoves, mailboxMessage.get())
-            .then(addMessageToMailboxes(mailboxMessage.get(), messageMoves, 
mailboxSession))
-            
.then(expungeMessageFromMailboxes(mailboxMessage.get().getMessageId(), 
messagesToRemove, mailboxSession, messageMoves))
-            .then(eventBus.dispatch(EventFactory.moved()
-                    .session(mailboxSession)
-                    .messageMoves(messageMoves.asMessageMoves())
-                    .messageId(mailboxMessage.get().getMessageId())
-                    .build(),
-                messageMoves.impactedMailboxes()
-                    .map(Mailbox::getMailboxId)
-                    .map(MailboxIdRegistrationKey::new)
-                    .collect(ImmutableSet.toImmutableSet())));
+            .then(Flux.concat(
+                    addMessageToMailboxes(mailboxMessage.get(), messageMoves, 
mailboxSession),
+                    
expungeMessageFromMailboxes(mailboxMessage.get().getMessageId(), 
messagesToRemove, mailboxSession, messageMoves),
+                    Flux.just(new 
EventBus.EventWithRegistrationKey(EventFactory.moved()
+                            .session(mailboxSession)
+                            .messageMoves(messageMoves.asMessageMoves())
+                            .messageId(mailboxMessage.get().getMessageId())
+                            .build(),
+                        messageMoves.impactedMailboxes()
+                            .map(Mailbox::getMailboxId)
+                            .map(MailboxIdRegistrationKey::new)
+                            .collect(ImmutableSet.toImmutableSet()))))
+                .collectList()
+                .flatMap(eventBus::dispatch));
     }
 
-    private Mono<Void> expungeMessageFromMailboxes(MessageId messageId, 
List<Pair<MailboxMessage, Mailbox>> messages, MailboxSession mailboxSession, 
MessageMovesWithMailbox messageMoves) {
+    private Flux<EventBus.EventWithRegistrationKey> 
expungeMessageFromMailboxes(MessageId messageId, List<Pair<MailboxMessage, 
Mailbox>> messages, MailboxSession mailboxSession, MessageMovesWithMailbox 
messageMoves) {
         if (messages.isEmpty()) {
-            return Mono.empty();
+            return Flux.empty();
         }
 
         MessageIdMapper messageIdMapper = 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
@@ -405,27 +408,20 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             .collect(ImmutableList.toImmutableList());
 
         return Mono.from(messageIdMapper.deleteReactive(messageId, mailboxIds))
-            .then(Flux.fromIterable(messages)
-                .flatMap(message -> dispatchExpungedEvent(message, 
mailboxSession, messageMoves), DEFAULT_CONCURRENCY)
-                .then());
-    }
-
-    private Mono<Void> dispatchExpungedEvent(Pair<MailboxMessage, Mailbox> 
message, MailboxSession mailboxSession, MessageMovesWithMailbox messageMoves) {
-        return Mono.just(EventFactory.expunged()
-                .randomEventId()
-                .mailboxSession(mailboxSession)
-                .mailbox(message.getRight())
-                .addMetaData(message.getLeft().metaData()))
-            .map(eventBuilder -> {
-                if (isSingleMove(messageMoves)) {
-                    return eventBuilder
-                        
.movedTo(messageMoves.addedMailboxes().iterator().next().getMailboxId())
-                        .build();
-                } else {
-                    return eventBuilder.build();
-                }
-            })
-            .flatMap(event -> eventBus.dispatch(event, new 
MailboxIdRegistrationKey(message.getRight().getMailboxId())));
+            .thenMany(Flux.fromIterable(messages)
+                .map(message -> expungedEvent(message, mailboxSession, 
messageMoves)));
+    }
+
+    private EventBus.EventWithRegistrationKey 
expungedEvent(Pair<MailboxMessage, Mailbox> message, MailboxSession 
mailboxSession, MessageMovesWithMailbox messageMoves) {
+        EventFactory.ExpungedFinalStage.Builder eventBuilder = 
EventFactory.expunged()
+            .randomEventId()
+            .mailboxSession(mailboxSession)
+            .mailbox(message.getRight())
+            .addMetaData(message.getLeft().metaData());
+        if (isSingleMove(messageMoves)) {
+            
eventBuilder.movedTo(messageMoves.addedMailboxes().iterator().next().getMailboxId());
+        }
+        return new EventBus.EventWithRegistrationKey(eventBuilder.build(), 
ImmutableSet.of(new 
MailboxIdRegistrationKey(message.getRight().getMailboxId())));
     }
 
     private Mono<Void> dispatchFlagsChange(MailboxSession mailboxSession, 
MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags, List<Mailbox> 
knownMailboxes) {
@@ -487,11 +483,11 @@ public class StoreMessageIdManager implements 
MessageIdManager {
         }
     }
 
-    private Mono<Void> addMessageToMailboxes(MailboxMessage mailboxMessage, 
MessageMovesWithMailbox messageMoves, MailboxSession mailboxSession) {
+    private Flux<EventBus.EventWithRegistrationKey> 
addMessageToMailboxes(MailboxMessage mailboxMessage, MessageMovesWithMailbox 
messageMoves, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
         return Flux.fromIterable(messageMoves.addedMailboxes())
-            .flatMap(Throwing.<Mailbox, Mono<Void>>function(mailbox -> {
+            .flatMap(Throwing.<Mailbox, 
Mono<EventBus.EventWithRegistrationKey>>function(mailbox -> {
                 MailboxACL.Rfc4314Rights myRights = 
rightManager.myRights(mailbox, mailboxSession);
                 boolean shouldPreserveFlags = myRights.contains(Right.Write);
                 MailboxMessage copy = mailboxMessage.copy(mailbox);
@@ -506,28 +502,22 @@ public class StoreMessageIdManager implements 
MessageIdManager {
                     .build());
 
                 return save(messageIdMapper, copy, mailbox)
-                    .flatMap(metadata -> dispatchAddedEvent(mailboxSession, 
mailbox, metadata, messageMoves));
-            }).sneakyThrow())
-            .then();
+                    .map(metadata -> addedEvent(mailboxSession, mailbox, 
metadata, messageMoves));
+            }).sneakyThrow());
     }
 
-    private Mono<Void> dispatchAddedEvent(MailboxSession mailboxSession, 
Mailbox mailbox, MessageMetaData messageMetaData, MessageMovesWithMailbox 
messageMoves) {
-        return Mono.just(EventFactory.added()
-                .randomEventId()
-                .mailboxSession(mailboxSession)
-                .mailbox(mailbox)
-                .addMetaData(messageMetaData)
-                .isDelivery(!IS_DELIVERY)
-                .isAppended(!IS_APPENDED))
-            .map(eventBuilder -> {
-                if (isSingleMove(messageMoves)) {
-                    return eventBuilder
-                        
.movedFrom(messageMoves.removedMailboxes().iterator().next().getMailboxId())
-                        .build();
-                } else {
-                    return eventBuilder.build();
-                }
-            }).flatMap(event -> eventBus.dispatch(event, new 
MailboxIdRegistrationKey(mailbox.getMailboxId())));
+    private EventBus.EventWithRegistrationKey addedEvent(MailboxSession 
mailboxSession, Mailbox mailbox, MessageMetaData messageMetaData, 
MessageMovesWithMailbox messageMoves) {
+        EventFactory.AddedFinalStage.Builder appended = EventFactory.added()
+            .randomEventId()
+            .mailboxSession(mailboxSession)
+            .mailbox(mailbox)
+            .addMetaData(messageMetaData)
+            .isDelivery(!IS_DELIVERY)
+            .isAppended(!IS_APPENDED);
+        if (isSingleMove(messageMoves)) {
+            
appended.movedFrom(messageMoves.removedMailboxes().iterator().next().getMailboxId());
+        }
+        return new EventBus.EventWithRegistrationKey(appended.build(), 
ImmutableSet.of(new MailboxIdRegistrationKey(mailbox.getMailboxId())));
     }
 
     private boolean isSingleMove(MessageMovesWithMailbox messageMoves) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to