This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.9.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 395171def2a01c89ea8d0cd406b79594a772c4dc
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Nov 10 11:31:24 2025 +0100

    JAMES-3728 Implement MessageIdManager::updateEmail
    
    This method bundles the move and the flag update together, firing a single
    event for both operations.
    This prevents a data race in listeners, which arises when the two
    operations are performed sequentially and causes flag updates to be dropped.
---
 .../org/apache/james/mailbox/MessageIdManager.java |   2 +
 .../james/mailbox/store/StoreMessageIdManager.java | 122 ++++++++++++---------
 2 files changed, 71 insertions(+), 53 deletions(-)

diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java 
b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 4605a0fcbd..d682448910 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -76,6 +76,8 @@ public interface MessageIdManager {
 
     Publisher<Void> setInMailboxesReactive(MessageId messageId, 
Collection<MailboxId> mailboxIds, MailboxSession mailboxSession);
 
+    Publisher<Void> updateEmail(MessageId messageId, List<MailboxId> 
mailboxIds, Flags newState, FlagsUpdateMode replace, MailboxSession 
mailboxSession);
+
     default List<MessageResult> getMessage(MessageId messageId, FetchGroup 
fetchGroup, MailboxSession mailboxSession) throws MailboxException {
         return getMessages(ImmutableList.of(messageId), fetchGroup, 
mailboxSession);
     }
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 6a7187e15c..3c5d4ef76e 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -38,6 +38,7 @@ import jakarta.mail.Flags;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.events.EventBus;
+import org.apache.james.events.EventBus.EventWithRegistrationKey;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageIdManager;
 import org.apache.james.mailbox.MessageManager;
@@ -94,6 +95,8 @@ import reactor.core.scheduler.Schedulers;
 
 public class StoreMessageIdManager implements MessageIdManager {
 
+    public static final int SET_FLAGS_CONCURRENCY = 4;
+
     public static ImmutableSet<MailboxId> toMailboxIds(List<MailboxMessage> 
mailboxMessages) {
         return mailboxMessages
             .stream()
@@ -129,21 +132,22 @@ public class StoreMessageIdManager implements 
MessageIdManager {
 
     @Override
     public Mono<Void> setFlagsReactive(Flags newState, 
MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> 
mailboxIds, MailboxSession mailboxSession) {
-        MessageIdMapper messageIdMapper = 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
-        MailboxMapper mailboxMapper = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
-
-        int concurrency = 4;
+        return setFlagsReactiveWithoutEventPublishing(newState, replace, 
messageId, mailboxIds, mailboxSession)
+            .collectList()
+            .flatMap(eventBus::dispatch);
+    }
 
+    private Flux<EventWithRegistrationKey> 
setFlagsReactiveWithoutEventPublishing(Flags newState, 
MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> 
mailboxIds, MailboxSession mailboxSession) {
         return Flux.fromIterable(mailboxIds)
-            .flatMap(mailboxMapper::findMailboxById, concurrency)
+            
.flatMap(mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)::findMailboxById,
 SET_FLAGS_CONCURRENCY)
             .collect(ImmutableList.toImmutableList())
-            .flatMap(Throwing.<List<Mailbox>, 
Mono<Void>>function(targetMailboxes -> {
+            .flatMapMany(Throwing.<List<Mailbox>, 
Flux<EventWithRegistrationKey>>function(targetMailboxes -> {
                 assertRightsOnMailboxes(targetMailboxes, mailboxSession, 
Right.Write);
 
-                return messageIdMapper.setFlags(messageId, mailboxIds, 
newState, replace)
+                return 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession)
+                    .setFlags(messageId, mailboxIds, newState, replace)
                     .flatMapIterable(updatedFlags -> 
updatedFlags.asMap().entrySet())
-                    .concatMap(entry -> dispatchFlagsChange(mailboxSession, 
entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes))
-                    .then();
+                    .concatMap(entry -> flagsChangedEvent(mailboxSession, 
entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes));
             }).sneakyThrow());
     }
 
@@ -321,44 +325,60 @@ public class StoreMessageIdManager implements 
MessageIdManager {
 
     @Override
     public Mono<Void> setInMailboxesReactive(MessageId messageId, 
Collection<MailboxId> targetMailboxIds, MailboxSession mailboxSession) {
+        return setInMailboxesWithoutEventPublishing(messageId, 
targetMailboxIds, mailboxSession)
+            .collectList()
+            .flatMap(eventBus::dispatch);
+    }
+
+    private Flux<EventWithRegistrationKey> 
setInMailboxesWithoutEventPublishing(MessageId messageId, Collection<MailboxId> 
targetMailboxIds, MailboxSession mailboxSession) {
         return findRelatedMailboxMessages(messageId, mailboxSession)
-            .flatMap(currentMailboxMessages -> 
messageMovesWithMailbox(MessageMoves.builder()
+            .flatMapMany(currentMailboxMessages -> 
messageMovesWithMailbox(MessageMoves.builder()
                 .targetMailboxIds(targetMailboxIds)
                 .previousMailboxIds(toMailboxIds(currentMailboxMessages))
                 .build(), mailboxSession)
-                .flatMap(Throwing.<MessageMovesWithMailbox, 
Mono<Void>>function(messageMove -> {
+                .flatMapMany(Throwing.<MessageMovesWithMailbox, 
Flux<EventWithRegistrationKey>>function(messageMove -> {
                     MessageMovesWithMailbox refined = 
messageMove.filterPrevious(hasRightsOnMailbox(mailboxSession, Right.Read));
 
                     if (messageMove.getPreviousMailboxes().isEmpty()) {
                         LOGGER.info("Tried to access {} not accessible for 
{}", messageId, mailboxSession.getUser().asString());
-                        return Mono.empty();
+                        return Flux.empty();
                     }
                     if (refined.getPreviousMailboxes().isEmpty()) {
                         MailboxPath unreadablePreviousMailbox = 
messageMove.getPreviousMailboxes().iterator().next().generateAssociatedPath();
-                        return Mono.error(() -> new 
MailboxNotFoundException(unreadablePreviousMailbox));
+                        return Flux.error(() -> new 
MailboxNotFoundException(unreadablePreviousMailbox));
                     }
                     if (refined.isChange()) {
                         return applyMessageMoves(mailboxSession, 
currentMailboxMessages, refined);
                     }
-                    return Mono.empty();
+                    return Flux.empty();
                 }).sneakyThrow()));
     }
 
+    @Override
+    public Publisher<Void> updateEmail(MessageId messageId, List<MailboxId> 
targetMailboxIds, Flags newState, MessageManager.FlagsUpdateMode replace, 
MailboxSession mailboxSession) {
+        return Flux.concat(
+                setInMailboxesWithoutEventPublishing(messageId, 
targetMailboxIds, mailboxSession),
+                setFlagsReactiveWithoutEventPublishing(newState, replace, 
messageId, targetMailboxIds, mailboxSession))
+            .collectList()
+            .flatMap(eventBus::dispatch);
+    }
+
     public void setInMailboxesNoCheck(MessageId messageId, MailboxId 
targetMailboxId, MailboxSession mailboxSession) throws MailboxException {
         MessageIdMapper messageIdMapper = 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
         List<MailboxMessage> currentMailboxMessages = 
messageIdMapper.find(ImmutableList.of(messageId), 
MessageMapper.FetchType.METADATA);
 
-
         
MailboxReactorUtils.block(messageMovesWithMailbox(MessageMoves.builder()
             .targetMailboxIds(targetMailboxId)
             .previousMailboxIds(toMailboxIds(currentMailboxMessages))
             .build(), mailboxSession)
-            .flatMap(messageMove -> {
+            .flatMapMany(messageMove -> {
                 if (messageMove.isChange()) {
                     return applyMessageMoveNoMailboxChecks(mailboxSession, 
currentMailboxMessages, messageMove);
                 }
-                return Mono.empty();
-            }));
+                return Flux.empty();
+            })
+            .collectList()
+            .flatMap(eventBus::dispatch));
     }
 
     private Mono<List<MailboxMessage>> findRelatedMailboxMessages(MessageId 
messageId, MailboxSession mailboxSession) {
@@ -368,7 +388,7 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             .collect(ImmutableList.toImmutableList());
     }
 
-    private Mono<Void> applyMessageMoves(MailboxSession mailboxSession, 
List<MailboxMessage> currentMailboxMessages, MessageMovesWithMailbox 
messageMoves) throws MailboxNotFoundException {
+    private Flux<EventWithRegistrationKey> applyMessageMoves(MailboxSession 
mailboxSession, List<MailboxMessage> currentMailboxMessages, 
MessageMovesWithMailbox messageMoves) throws MailboxNotFoundException {
         assertRightsOnMailboxes(messageMoves.addedMailboxes(), mailboxSession, 
Right.Insert);
         assertRightsOnMailboxes(messageMoves.removedMailboxes(), 
mailboxSession, Right.DeleteMessages);
         assertRightsOnMailboxes(messageMoves.getTargetMailboxes(), 
mailboxSession, Right.Read);
@@ -376,11 +396,11 @@ public class StoreMessageIdManager implements 
MessageIdManager {
         return applyMessageMoveNoMailboxChecks(mailboxSession, 
currentMailboxMessages, messageMoves);
     }
 
-    private Mono<Void> applyMessageMoveNoMailboxChecks(MailboxSession 
mailboxSession, List<MailboxMessage> currentMailboxMessages, 
MessageMovesWithMailbox messageMoves) {
+    private Flux<EventWithRegistrationKey> 
applyMessageMoveNoMailboxChecks(MailboxSession mailboxSession, 
List<MailboxMessage> currentMailboxMessages, MessageMovesWithMailbox 
messageMoves) {
         Optional<MailboxMessage> mailboxMessage = 
currentMailboxMessages.stream().findAny();
 
         if (mailboxMessage.isEmpty()) {
-            return Mono.error(new MailboxNotFoundException("can't load 
message"));
+            return Flux.error(new MailboxNotFoundException("can't load 
message"));
         }
         List<Pair<MailboxMessage, Mailbox>> messagesToRemove = 
currentMailboxMessages.stream()
             .flatMap(message -> messageMoves.removedMailboxes()
@@ -390,23 +410,21 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             .collect(ImmutableList.toImmutableList());
 
         return validateQuota(messageMoves, mailboxMessage.get())
-            .then(Flux.concat(
-                    addMessageToMailboxes(mailboxMessage.get(), messageMoves, 
mailboxSession),
-                    
expungeMessageFromMailboxes(mailboxMessage.get().getMessageId(), 
messagesToRemove, mailboxSession, messageMoves),
-                    Flux.just(new 
EventBus.EventWithRegistrationKey(EventFactory.moved()
-                            .session(mailboxSession)
-                            .messageMoves(messageMoves.asMessageMoves())
-                            .messageId(mailboxMessage.get().getMessageId())
-                            .build(),
-                        messageMoves.impactedMailboxes()
-                            .map(Mailbox::getMailboxId)
-                            .map(MailboxIdRegistrationKey::new)
-                            .collect(ImmutableSet.toImmutableSet()))))
-                .collectList()
-                .flatMap(eventBus::dispatch));
-    }
-
-    private Flux<EventBus.EventWithRegistrationKey> 
expungeMessageFromMailboxes(MessageId messageId, List<Pair<MailboxMessage, 
Mailbox>> messages, MailboxSession mailboxSession, MessageMovesWithMailbox 
messageMoves) {
+            .thenMany(Flux.concat(
+                addMessageToMailboxes(mailboxMessage.get(), messageMoves, 
mailboxSession),
+                
expungeMessageFromMailboxes(mailboxMessage.get().getMessageId(), 
messagesToRemove, mailboxSession, messageMoves),
+                Flux.just(new EventWithRegistrationKey(EventFactory.moved()
+                    .session(mailboxSession)
+                    .messageMoves(messageMoves.asMessageMoves())
+                    .messageId(mailboxMessage.get().getMessageId())
+                    .build(),
+                    messageMoves.impactedMailboxes()
+                        .map(Mailbox::getMailboxId)
+                        .map(MailboxIdRegistrationKey::new)
+                        .collect(ImmutableSet.toImmutableSet())))));
+    }
+
+    private Flux<EventWithRegistrationKey> 
expungeMessageFromMailboxes(MessageId messageId, List<Pair<MailboxMessage, 
Mailbox>> messages, MailboxSession mailboxSession, MessageMovesWithMailbox 
messageMoves) {
         if (messages.isEmpty()) {
             return Flux.empty();
         }
@@ -422,7 +440,7 @@ public class StoreMessageIdManager implements 
MessageIdManager {
                 .map(message -> expungedEvent(message, mailboxSession, 
messageMoves)));
     }
 
-    private EventBus.EventWithRegistrationKey 
expungedEvent(Pair<MailboxMessage, Mailbox> message, MailboxSession 
mailboxSession, MessageMovesWithMailbox messageMoves) {
+    private EventWithRegistrationKey expungedEvent(Pair<MailboxMessage, 
Mailbox> message, MailboxSession mailboxSession, MessageMovesWithMailbox 
messageMoves) {
         EventFactory.ExpungedFinalStage.Builder eventBuilder = 
EventFactory.expunged()
             .randomEventId()
             .mailboxSession(mailboxSession)
@@ -431,23 +449,21 @@ public class StoreMessageIdManager implements 
MessageIdManager {
         if (isSingleMove(messageMoves)) {
             
eventBuilder.movedTo(messageMoves.addedMailboxes().iterator().next().getMailboxId());
         }
-        return new EventBus.EventWithRegistrationKey(eventBuilder.build(), 
ImmutableSet.of(new 
MailboxIdRegistrationKey(message.getRight().getMailboxId())));
+        return new EventWithRegistrationKey(eventBuilder.build(), 
ImmutableSet.of(new 
MailboxIdRegistrationKey(message.getRight().getMailboxId())));
     }
 
-    private Mono<Void> dispatchFlagsChange(MailboxSession mailboxSession, 
MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags, List<Mailbox> 
knownMailboxes) {
+    private Mono<EventWithRegistrationKey> flagsChangedEvent(MailboxSession 
mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags, 
List<Mailbox> knownMailboxes) {
         return knownMailboxes.stream()
             .filter(knownMailbox -> 
knownMailbox.getMailboxId().equals(mailboxId))
             .findFirst()
             .map(Mono::just)
             .orElseGet(() -> 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId))
-            .flatMap(mailbox ->
-                eventBus.dispatch(EventFactory.flagsUpdated()
-                        .randomEventId()
-                        .mailboxSession(mailboxSession)
-                        .mailbox(mailbox)
-                        .updatedFlags(updatedFlags)
-                        .build(),
-                    new MailboxIdRegistrationKey(mailboxId)));
+            .map(mailbox -> new 
EventWithRegistrationKey(EventFactory.flagsUpdated()
+                .randomEventId()
+                .mailboxSession(mailboxSession)
+                .mailbox(mailbox)
+                .updatedFlags(updatedFlags)
+                .build(), ImmutableSet.of(new 
MailboxIdRegistrationKey(mailboxId))));
     }
 
     private Mono<Void> validateQuota(MessageMovesWithMailbox messageMoves, 
MailboxMessage mailboxMessage) {
@@ -490,11 +506,11 @@ public class StoreMessageIdManager implements 
MessageIdManager {
         }
     }
 
-    private Flux<EventBus.EventWithRegistrationKey> 
addMessageToMailboxes(MailboxMessage mailboxMessage, MessageMovesWithMailbox 
messageMoves, MailboxSession mailboxSession) {
+    private Flux<EventWithRegistrationKey> 
addMessageToMailboxes(MailboxMessage mailboxMessage, MessageMovesWithMailbox 
messageMoves, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
         return Flux.fromIterable(messageMoves.addedMailboxes())
-            .flatMap(Throwing.<Mailbox, 
Mono<EventBus.EventWithRegistrationKey>>function(mailbox -> {
+            .flatMap(Throwing.<Mailbox, 
Mono<EventWithRegistrationKey>>function(mailbox -> {
                 MailboxACL.Rfc4314Rights myRights = 
rightManager.myRights(mailbox, mailboxSession);
                 boolean shouldPreserveFlags = myRights.contains(Right.Write);
                 MailboxMessage copy = mailboxMessage.copy(mailbox);
@@ -513,7 +529,7 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             }).sneakyThrow());
     }
 
-    private EventBus.EventWithRegistrationKey addedEvent(MailboxSession 
mailboxSession, Mailbox mailbox, MessageMetaData messageMetaData, 
MessageMovesWithMailbox messageMoves) {
+    private EventWithRegistrationKey addedEvent(MailboxSession mailboxSession, 
Mailbox mailbox, MessageMetaData messageMetaData, MessageMovesWithMailbox 
messageMoves) {
         EventFactory.AddedFinalStage.Builder appended = EventFactory.added()
             .randomEventId()
             .mailboxSession(mailboxSession)
@@ -524,7 +540,7 @@ public class StoreMessageIdManager implements 
MessageIdManager {
         if (isSingleMove(messageMoves)) {
             
appended.movedFrom(messageMoves.removedMailboxes().iterator().next().getMailboxId());
         }
-        return new EventBus.EventWithRegistrationKey(appended.build(), 
ImmutableSet.of(new MailboxIdRegistrationKey(mailbox.getMailboxId())));
+        return new EventWithRegistrationKey(appended.build(), 
ImmutableSet.of(new MailboxIdRegistrationKey(mailbox.getMailboxId())));
     }
 
     private boolean isSingleMove(MessageMovesWithMailbox messageMoves) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to