This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new a2c03ed263 JAMES-4188 Improve range optimization to account for non-contiguous r… (#2972)
a2c03ed263 is described below

commit a2c03ed263fd61ab907eb73952b06daa9cb867bf
Author: Benoit TELLIER <[email protected]>
AuthorDate: Wed Mar 11 03:57:33 2026 +0100

    JAMES-4188 Improve range optimization to account for non-contiguous r… (#2972)
---
 .../org/apache/james/mailbox/MailboxManager.java   |  10 ++
 .../org/apache/james/mailbox/MessageManager.java   |   7 +
 .../james/mailbox/store/StoreMailboxManager.java   |  22 +++
 .../james/mailbox/store/StoreMessageManager.java   | 170 +++++++++++++++++++++
 .../processor/AbstractMessageRangeProcessor.java   |  15 +-
 .../apache/james/imap/processor/CopyProcessor.java |  10 ++
 .../apache/james/imap/processor/MoveProcessor.java |   6 +
 .../james/imap/processor/StoreProcessor.java       |  44 +++++-
 .../james/imap/processor/CopyProcessorTest.java    |   5 +-
 .../james/imap/processor/MoveProcessorTest.java    |   5 +-
 .../jmap/rfc8621/contract/WebSocketContract.scala  | 154 +++++++++++++++++++
 .../jmap/method/EmailSetUpdatePerformer.scala      |  16 +-
 12 files changed, 446 insertions(+), 18 deletions(-)

diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java 
b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
index ec9333e381..aedc0fb3d5 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
@@ -358,6 +358,16 @@ public interface MailboxManager extends RequestAware, 
RightManager, MailboxAnnot
             .flatMapIterable(Function.identity());
     }
 
+    default Publisher<MessageRange> moveMessagesReactive(List<MessageRange> 
sets, MailboxId from, MailboxId to, MailboxSession session) {
+        return Flux.fromIterable(sets)
+            .concatMap(set -> moveMessagesReactive(set, from, to, session));
+    }
+
+    default Publisher<MessageRange> copyMessagesReactive(List<MessageRange> 
sets, MailboxId from, MailboxId to, MailboxSession session) {
+        return Flux.fromIterable(sets)
+            .concatMap(set -> copyMessagesReactive(set, from, to, session));
+    }
+
     enum MailboxSearchFetchType {
         Minimal,
         Counters
diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java 
b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
index c87729aed6..e2cf98fb2d 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
@@ -163,6 +163,13 @@ public interface MessageManager {
         return Mono.fromCallable(() -> setFlags(flags, flagsUpdateMode, set, 
mailboxSession));
     }
 
+    default Publisher<Map<MessageUid, Flags>> setFlagsReactive(Flags flags, 
FlagsUpdateMode flagsUpdateMode, List<MessageRange> sets, MailboxSession 
mailboxSession) {
+        return Flux.fromIterable(sets)
+            .concatMap(set -> Mono.from(setFlagsReactive(flags, 
flagsUpdateMode, set, mailboxSession)))
+            .flatMapIterable(Map::entrySet)
+            .collectMap(Map.Entry::getKey, Map.Entry::getValue);
+    }
+
     class AppendResult {
         private final ComposedMessageId id;
         private final Long size;
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index ba95f9a5e5..725b628807 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -818,6 +818,28 @@ public class StoreMailboxManager implements MailboxManager 
{
             });
     }
 
+    @Override
+    public Flux<MessageRange> moveMessagesReactive(List<MessageRange> sets, 
MailboxId from, MailboxId to, MailboxSession session) {
+        return Mono.zip(Mono.from(getMailboxReactive(from, session)), 
Mono.from(getMailboxReactive(to, session)))
+            .flatMapMany(fromTo -> {
+                StoreMessageManager fromMessageManager = (StoreMessageManager) 
fromTo.getT1();
+                StoreMessageManager toMessageManager = (StoreMessageManager) 
fromTo.getT2();
+
+                return fromMessageManager.moveTo(sets, toMessageManager, 
session);
+            });
+    }
+
+    @Override
+    public Flux<MessageRange> copyMessagesReactive(List<MessageRange> sets, 
MailboxId from, MailboxId to, MailboxSession session) {
+        return Mono.zip(Mono.from(getMailboxReactive(from, session)), 
Mono.from(getMailboxReactive(to, session)))
+            .flatMapMany(fromTo -> {
+                StoreMessageManager fromMessageManager = (StoreMessageManager) 
fromTo.getT1();
+                StoreMessageManager toMessageManager = (StoreMessageManager) 
fromTo.getT2();
+
+                return fromMessageManager.copyTo(sets, toMessageManager, 
session);
+            });
+    }
+
     @Override
     public Flux<MailboxMetaData> search(MailboxQuery expression, 
MailboxSearchFetchType fetchType, MailboxSession session) {
         Mono<List<Mailbox>> mailboxesMono = searchMailboxes(expression, 
session, Right.Lookup).collectList();
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 283cce7167..1db6cc08a3 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -764,6 +764,36 @@ public class StoreMessageManager implements MessageManager 
{
             });
     }
 
+    @Override
+    public Publisher<Map<MessageUid, Flags>> setFlagsReactive(Flags flags, 
FlagsUpdateMode flagsUpdateMode, List<MessageRange> sets, MailboxSession 
mailboxSession) {
+        return ensureFlagsWrite(flags, flagsUpdateMode, mailboxSession)
+            .map(Mono::<Map<MessageUid, Flags>>error)
+            .orElseGet(() -> {
+                trimFlags(flags, mailboxSession);
+                MessageMapper messageMapper = 
mapperFactory.getMessageMapper(mailboxSession);
+                FlagsUpdateCalculator calculator = new 
FlagsUpdateCalculator(flags, flagsUpdateMode);
+
+                return Flux.fromIterable(sets)
+                    .concatMap(set -> 
messageMapper.executeReactive(messageMapper.updateFlagsReactive(getMailboxEntity(),
 calculator, set)))
+                    .collectList()
+                    .flatMap(allUpdatedFlagsPerRange -> {
+                        ImmutableList<UpdatedFlags> allUpdatedFlags = 
allUpdatedFlagsPerRange.stream()
+                            .flatMap(List::stream)
+                            .collect(ImmutableList.toImmutableList());
+                        return eventBus.dispatch(EventFactory.flagsUpdated()
+                                    .randomEventId()
+                                    .mailboxSession(mailboxSession)
+                                    .mailbox(getMailboxEntity())
+                                    .updatedFlags(allUpdatedFlags)
+                                    .build(),
+                                new 
MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                            
.thenReturn(allUpdatedFlags.stream().collect(ImmutableMap.toImmutableMap(
+                                UpdatedFlags::getUid,
+                                UpdatedFlags::getNewFlags)));
+                    });
+            });
+    }
+
     /**
      * Copy the {@link MessageRange} to the {@link StoreMessageManager}
      */
@@ -781,6 +811,66 @@ public class StoreMessageManager implements MessageManager 
{
             MailboxPathLocker.LockType.Write));
     }
 
+    public Flux<MessageRange> copyTo(List<MessageRange> sets, 
StoreMessageManager toMailbox, MailboxSession session) {
+        if (!toMailbox.isWriteable(session)) {
+            return Flux.error(new 
ReadOnlyException(toMailbox.getMailboxPath()));
+        }
+        if (!storeRightManager.myRights(toMailbox.mailbox, 
session).contains(MailboxACL.Right.Insert)) {
+            return Flux.error(new InsufficientRightsException("Append messages 
requires 'i' right"));
+        }
+        return 
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+            copyAll(sets, toMailbox, session)
+                .flatMapIterable(map -> MessageRange.toRanges(new 
ArrayList<>(map.keySet()))),
+            MailboxPathLocker.LockType.Write));
+    }
+
+    private Mono<SortedMap<MessageUid, MessageMetaData>> 
copyAll(List<MessageRange> sets, StoreMessageManager to, MailboxSession 
session) {
+        return Flux.fromIterable(sets)
+            .concatMap(set -> retrieveOriginalRows(set, session))
+            .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+            .concatMap(window -> window.collectList()
+                .flatMap(originalRows -> to.copy(originalRows, 
session).collectList()
+                    .map(copyResult -> Pair.of(
+                        collectMetadata(copyResult.iterator()),
+                        originalRows.stream()
+                            
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+                            .collect(ImmutableList.toImmutableList())))))
+            .collectList()
+            .flatMap(allResults -> {
+                if (allResults.isEmpty()) {
+                    return Mono.just(ImmutableSortedMap.of());
+                }
+                SortedMap<MessageUid, MessageMetaData> allCopiedUids = new 
TreeMap<>();
+                List<MessageId> allMessageIds = new ArrayList<>();
+                for (Pair<SortedMap<MessageUid, MessageMetaData>, 
ImmutableList<MessageId>> result : allResults) {
+                    allCopiedUids.putAll(result.getLeft());
+                    allMessageIds.addAll(result.getRight());
+                }
+                MessageMoves messageMoves = MessageMoves.builder()
+                    .previousMailboxIds(getMailboxEntity().getMailboxId())
+                    .targetMailboxIds(to.getMailboxEntity().getMailboxId(), 
getMailboxEntity().getMailboxId())
+                    .build();
+                EventBus.EventWithRegistrationKey added = new 
EventBus.EventWithRegistrationKey(
+                    EventFactory.added()
+                        .randomEventId()
+                        .mailboxSession(session)
+                        .mailbox(to.getMailboxEntity())
+                        .metaData(allCopiedUids)
+                        .isDelivery(!IS_DELIVERY)
+                        .isAppended(!IS_APPENDED)
+                        .build(),
+                    ImmutableSet.of(new 
MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())));
+                EventBus.EventWithRegistrationKey moved = new 
EventBus.EventWithRegistrationKey(
+                    EventFactory.moved()
+                        .messageMoves(messageMoves)
+                        .messageId(allMessageIds)
+                        .session(session)
+                        .build(),
+                    
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet()));
+                return Mono.from(eventBus.dispatch(ImmutableList.of(added, 
moved))).thenReturn(allCopiedUids);
+            });
+    }
+
     /**
      * Move the {@link MessageRange} to the {@link StoreMessageManager}
      */
@@ -804,6 +894,86 @@ public class StoreMessageManager implements MessageManager 
{
             MailboxPathLocker.LockType.Write));
     }
 
+    public Flux<MessageRange> moveTo(List<MessageRange> sets, 
StoreMessageManager toMailbox, MailboxSession session) {
+        if (!isWriteable(session)) {
+            return Flux.error(new 
ReadOnlyException(toMailbox.getMailboxPath()));
+        }
+        if (!storeRightManager.myRights(mailbox, 
session).contains(MailboxACL.Right.PerformExpunge)) {
+            return Flux.error(new InsufficientRightsException("Deleting 
messages requires 'e' right"));
+        }
+        if (!toMailbox.isWriteable(session)) {
+            return Flux.error(new 
ReadOnlyException(toMailbox.getMailboxPath()));
+        }
+        if (!storeRightManager.myRights(toMailbox.mailbox, 
session).contains(MailboxACL.Right.Insert)) {
+            return Flux.error(new InsufficientRightsException("Append messages 
requires 'i' right"));
+        }
+        return 
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+            moveAll(sets, toMailbox, session)
+                .flatMapIterable(map -> MessageRange.toRanges(new 
ArrayList<>(map.keySet()))),
+            MailboxPathLocker.LockType.Write));
+    }
+
+    private Mono<SortedMap<MessageUid, MessageMetaData>> 
moveAll(List<MessageRange> sets, StoreMessageManager to, MailboxSession 
session) {
+        return Flux.fromIterable(sets)
+            .concatMap(set -> retrieveOriginalRows(set, session))
+            .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+            .concatMap(window -> window
+                .collectList()
+                .flatMap(originalRows -> to.move(originalRows, session)
+                    .map(moveResult -> Pair.of(moveResult, originalRows))))
+            .collectList()
+            .flatMap(allResults -> {
+                if (allResults.isEmpty()) {
+                    return Mono.just(ImmutableSortedMap.of());
+                }
+
+                SortedMap<MessageUid, MessageMetaData> allMoveUids = new 
TreeMap<>();
+                List<MessageMetaData> allOriginalMessages = new ArrayList<>();
+                List<MessageId> allMessageIds = new ArrayList<>();
+
+                for (Pair<MoveResult, List<MailboxMessage>> result : 
allResults) {
+                    
allMoveUids.putAll(collectMetadata(result.getLeft().getMovedMessages().iterator()));
+                    
allOriginalMessages.addAll(result.getLeft().getOriginalMessages());
+                    result.getRight().stream()
+                        
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+                        .forEach(allMessageIds::add);
+                }
+
+                MessageMoves messageMoves = MessageMoves.builder()
+                    .previousMailboxIds(getMailboxEntity().getMailboxId())
+                    .targetMailboxIds(to.getMailboxEntity().getMailboxId())
+                    .build();
+
+                EventBus.EventWithRegistrationKey added = new 
EventBus.EventWithRegistrationKey(EventFactory.added()
+                    .randomEventId()
+                    .mailboxSession(session)
+                    .mailbox(to.getMailboxEntity())
+                    .metaData(allMoveUids)
+                    .isDelivery(!IS_DELIVERY)
+                    .isAppended(!IS_APPENDED)
+                    .movedFrom(getId())
+                    .build(),
+                    ImmutableSet.of(new 
MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())));
+                EventBus.EventWithRegistrationKey expunged = new 
EventBus.EventWithRegistrationKey(EventFactory.expunged()
+                    .randomEventId()
+                    .mailboxSession(session)
+                    .mailbox(getMailboxEntity())
+                    .addMetaData(allOriginalMessages)
+                    .movedTo(to.getId())
+                    .build(),
+                    ImmutableSet.of(new 
MailboxIdRegistrationKey(mailbox.getMailboxId())));
+                EventBus.EventWithRegistrationKey moved = new 
EventBus.EventWithRegistrationKey(EventFactory.moved()
+                    .messageMoves(messageMoves)
+                    .messageId(allMessageIds)
+                    .session(session)
+                    .build(),
+                    
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet()));
+
+                return Mono.from(eventBus.dispatch(ImmutableList.of(added, 
expunged, moved)))
+                    .thenReturn(allMoveUids);
+            });
+    }
+
     @Override
     public long getMessageCount(MailboxSession mailboxSession) throws 
MailboxException {
         return 
mapperFactory.getMessageMapper(mailboxSession).countMessagesInMailbox(getMailboxEntity());
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
index 0c8aa5b8e8..d933ce9468 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.imap.processor;
 
+import java.util.List;
 import java.util.Objects;
 
 import org.apache.james.core.Username;
@@ -68,6 +69,14 @@ public abstract class AbstractMessageRangeProcessor<R 
extends AbstractMessageRan
                                                   MailboxSession 
mailboxSession,
                                                   MessageRange messageSet);
 
+    protected Flux<MessageRange> processAll(MailboxId targetMailbox,
+                                            SelectedMailbox currentMailbox,
+                                            MailboxSession mailboxSession,
+                                            List<MessageRange> messageSets) {
+        return Flux.fromIterable(messageSets)
+            .concatMap(set -> process(targetMailbox, currentMailbox, 
mailboxSession, set));
+    }
+
     protected abstract String getOperationName();
 
     @Override
@@ -120,7 +129,8 @@ public abstract class AbstractMessageRangeProcessor<R 
extends AbstractMessageRan
                                 .orElseThrow(() -> new 
MessageRangeException(range.getFormattedString() + " is an invalid range")))
                             .sneakyThrow())
                         .filter(Objects::nonNull)
-                        .concatMap(range -> process(target.getId(), 
session.getSelected(), mailboxSession, range)
+                        .collectList()
+                        .flatMapMany(ranges -> processAll(target.getId(), 
session.getSelected(), mailboxSession, ranges)
                             .doOnEach(ReactorUtils.logFinally(() -> 
AuditTrail.entry()
                                 .username(() -> 
mailboxSession.getUser().asString())
                                 .sessionId(() -> 
session.sessionId().asString())
@@ -128,8 +138,7 @@ public abstract class AbstractMessageRangeProcessor<R 
extends AbstractMessageRan
                                 .action(getOperationName())
                                 .parameters(() -> 
ImmutableMap.of("loggedInUser", 
mailboxSession.getLoggedInUser().map(Username::asString).orElse(""),
                                     "targetId", target.getId().serialize(),
-                                    "selectedMailboxId", 
session.getSelected().getMailboxId().serialize(),
-                                    "range", range.getUidFrom().asLong() + ":" 
+ range.getUidTo().asLong()))
+                                    "selectedMailboxId", 
session.getSelected().getMailboxId().serialize()))
                                 .log("IMAP " + getOperationName() + " 
succeeded.")))
                             .map(IdRange::from))
                         .collect(ImmutableList.<IdRange>toImmutableList())
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/CopyProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/CopyProcessor.java
index bde419a541..cae14215cd 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/CopyProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/CopyProcessor.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.imap.processor;
 
+import java.util.List;
+
 import jakarta.inject.Inject;
 
 import org.apache.james.imap.api.message.IdRange;
@@ -56,6 +58,14 @@ public class CopyProcessor extends 
AbstractMessageRangeProcessor<CopyRequest> {
         return Flux.from(getMailboxManager().copyMessagesReactive(messageSet, 
currentMailbox.getMailboxId(), targetMailbox, mailboxSession));
     }
 
+    @Override
+    protected Flux<MessageRange> processAll(MailboxId targetMailbox,
+                                            SelectedMailbox currentMailbox,
+                                            MailboxSession mailboxSession,
+                                            List<MessageRange> messageSets) {
+        return Flux.from(getMailboxManager().copyMessagesReactive(messageSets, 
currentMailbox.getMailboxId(), targetMailbox, mailboxSession));
+    }
+
     @Override
     protected MDCBuilder mdc(CopyRequest request) {
         return MDCBuilder.create()
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/MoveProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/MoveProcessor.java
index 6b66e9f937..5a5788c01b 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/MoveProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/MoveProcessor.java
@@ -59,6 +59,12 @@ public class MoveProcessor extends 
AbstractMessageRangeProcessor<MoveRequest> im
         return Flux.from(getMailboxManager().moveMessagesReactive(messageSet, 
currentMailbox.getMailboxId(), targetMailbox, mailboxSession));
     }
 
+    @Override
+    protected Flux<MessageRange> processAll(MailboxId targetMailbox, 
SelectedMailbox currentMailbox,
+                                            MailboxSession mailboxSession, 
List<MessageRange> messageSets) {
+        return Flux.from(getMailboxManager().moveMessagesReactive(messageSets, 
currentMailbox.getMailboxId(), targetMailbox, mailboxSession));
+    }
+
     @Override
     protected String getOperationName() {
         return "Move";
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/StoreProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/StoreProcessor.java
index 9a08669b06..1d136cacaa 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/StoreProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/StoreProcessor.java
@@ -98,8 +98,15 @@ public class StoreProcessor extends 
AbstractMailboxProcessor<StoreRequest> {
                 .map(Throwing.<IdRange, MessageRange>function(idRange -> 
messageRange(selected, idRange, request.isUseUids())
                     .orElseThrow(() -> new 
MessageRangeException(idRange.getFormattedString() + " is an invalid range")))
                     .sneakyThrow())
-                .concatMap(messageSet -> handleRange(request, session, 
responder, selected, mailbox, mailboxSession, failed, failedMsns, userFlags, 
messageSet))
-                .then())
+                .collectList()
+                .flatMap(messageSets -> {
+                    if (request.getUnchangedSince() != -1) {
+                        return Flux.fromIterable(messageSets)
+                            .concatMap(messageSet -> handleRange(request, 
session, responder, selected, mailbox, mailboxSession, failed, failedMsns, 
userFlags, messageSet))
+                            .then();
+                    }
+                    return setFlagsAll(request, session, responder, selected, 
mailbox, mailboxSession, messageSets);
+                }))
             .then(unsolicitedResponses(session, responder, omitExpunged, 
request.isUseUids()))
             .doOnSuccess(any -> {
                 // check if we had some failed uids which didn't pass the 
UNCHANGEDSINCE filter
@@ -214,18 +221,47 @@ public class StoreProcessor extends 
AbstractMailboxProcessor<StoreRequest> {
     }
 
     /**
-     * Set the flags for given messages
+     * Set the flags for given messages (single range)
      */
     private Mono<Void> setFlags(StoreRequest request, MailboxSession 
mailboxSession, MessageManager mailbox, MessageRange messageSet, ImapSession 
session, Responder responder) {
         boolean silent = request.isSilent();
         long unchangedSince = request.getUnchangedSince();
-        
+
         SelectedMailbox selected = session.getSelected();
         return Mono.from(mailbox.setFlagsReactive(request.getFlags(), 
request.getFlagsUpdateMode(), messageSet, mailboxSession))
             .doOnNext(flagsByUid -> handlePermanentFlagChanges(mailboxSession, 
mailbox, responder, selected))
             .flatMap(flagsByUid -> handleCondstore(request, mailboxSession, 
mailbox, messageSet, session, responder, silent, unchangedSince, selected, 
flagsByUid));
     }
 
+    /**
+     * Set the flags for all message ranges in a single batched operation, 
dispatching a single event.
+     * Only used when UNCHANGEDSINCE == -1 (no CONDSTORE filtering needed).
+     */
+    private Mono<Void> setFlagsAll(StoreRequest request, ImapSession session, 
Responder responder,
+                                    SelectedMailbox selected, MessageManager 
mailbox,
+                                    MailboxSession mailboxSession, 
List<MessageRange> messageSets) {
+        boolean silent = request.isSilent();
+        Set<Capability> enabled = 
EnableProcessor.getEnabledCapabilities(session);
+        boolean qresyncEnabled = 
enabled.contains(ImapConstants.SUPPORTS_QRESYNC);
+        boolean condstoreEnabled = 
enabled.contains(ImapConstants.SUPPORTS_CONDSTORE);
+
+        return Mono.from(mailbox.setFlagsReactive(request.getFlags(), 
request.getFlagsUpdateMode(), messageSets, mailboxSession))
+            .doOnNext(flagsByUid -> handlePermanentFlagChanges(mailboxSession, 
mailbox, responder, selected))
+            .flatMap(flagsByUid -> {
+                if (!silent || qresyncEnabled || condstoreEnabled) {
+                    Mono<Map<MessageUid, ModSeq>> modSeqsMono = 
(qresyncEnabled || condstoreEnabled)
+                        ? Flux.fromIterable(messageSets)
+                            .concatMap(set -> 
Flux.from(mailbox.listMessagesMetadata(set, mailboxSession)))
+                            .collectMap(r -> 
r.getComposedMessageId().getUid(), ComposedMessageIdWithMetaData::getModSeq)
+                        : Mono.just(ImmutableMap.of());
+                    return modSeqsMono
+                        .doOnNext(modSeqs -> sendFetchResponses(responder, 
request.isUseUids(), silent, -1L, selected, flagsByUid, qresyncEnabled, 
condstoreEnabled, modSeqs))
+                        .then();
+                }
+                return Mono.empty();
+            });
+    }
+
     private Mono<Void> handleCondstore(StoreRequest request, MailboxSession 
mailboxSession, MessageManager mailbox, MessageRange messageSet, ImapSession 
session, Responder responder, boolean silent, long unchangedSince, 
SelectedMailbox selected, Map<MessageUid, Flags> flagsByUid) {
         Set<Capability> enabled = 
EnableProcessor.getEnabledCapabilities(session);
         boolean qresyncEnabled = 
enabled.contains(ImapConstants.SUPPORTS_QRESYNC);
diff --git 
a/protocols/imap/src/test/java/org/apache/james/imap/processor/CopyProcessorTest.java
 
b/protocols/imap/src/test/java/org/apache/james/imap/processor/CopyProcessorTest.java
index 66a18ba397..67e467bca5 100644
--- 
a/protocols/imap/src/test/java/org/apache/james/imap/processor/CopyProcessorTest.java
+++ 
b/protocols/imap/src/test/java/org/apache/james/imap/processor/CopyProcessorTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.util.List;
 import java.util.Optional;
 
 import org.apache.james.core.Username;
@@ -112,7 +113,7 @@ class CopyProcessorTest {
         when(targetMessageManager.getMailboxEntity()).thenReturn(mailbox);
         StatusResponse okResponse = mock(StatusResponse.class);
         when(mockStatusResponseFactory.taggedOk(any(Tag.class), 
any(ImapCommand.class), any(HumanReadableText.class), 
any(StatusResponse.ResponseCode.class))).thenReturn(okResponse);
-        
when(mockMailboxManager.copyMessagesReactive(eq(MessageRange.range(MessageUid.of(4),
 MessageUid.of(6))), any(MailboxId.class), any(MailboxId.class), 
eq(mailboxSession)))
+        
when(mockMailboxManager.copyMessagesReactive(eq(List.of(MessageRange.range(MessageUid.of(4),
 MessageUid.of(6)))), any(MailboxId.class), any(MailboxId.class), 
eq(mailboxSession)))
             .thenReturn(Flux.just(MessageRange.range(MessageUid.of(4), 
MessageUid.of(6))));
 
         testee.process(copyRequest, mockResponder, imapSession);
@@ -120,7 +121,7 @@ class CopyProcessorTest {
         verify(mockMailboxManager).manageProcessing(any(), any());
         verify(mockMailboxManager).mailboxExists(INBOX, mailboxSession);
         verify(mockMailboxManager).getMailboxReactive(any(MailboxPath.class), 
any(MailboxSession.class));
-        
verify(mockMailboxManager).copyMessagesReactive(eq(MessageRange.range(MessageUid.of(4),
 MessageUid.of(6))), any(MailboxId.class), any(MailboxId.class), 
eq(mailboxSession));
+        
verify(mockMailboxManager).copyMessagesReactive(eq(List.of(MessageRange.range(MessageUid.of(4),
 MessageUid.of(6)))), any(MailboxId.class), any(MailboxId.class), 
eq(mailboxSession));
         verify(targetMessageManager).getMailboxEntity();
         verify(mockResponder).respond(okResponse);
     }
diff --git 
a/protocols/imap/src/test/java/org/apache/james/imap/processor/MoveProcessorTest.java
 
b/protocols/imap/src/test/java/org/apache/james/imap/processor/MoveProcessorTest.java
index 9e7f4cf9f3..c883858cca 100644
--- 
a/protocols/imap/src/test/java/org/apache/james/imap/processor/MoveProcessorTest.java
+++ 
b/protocols/imap/src/test/java/org/apache/james/imap/processor/MoveProcessorTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.util.List;
 import java.util.Optional;
 
 import org.apache.james.core.Username;
@@ -126,7 +127,7 @@ public class MoveProcessorTest {
         when(targetMessageManager.getMailboxEntity()).thenReturn(mailbox);
         StatusResponse okResponse = mock(StatusResponse.class);
         when(mockStatusResponseFactory.taggedOk(any(Tag.class), 
any(ImapCommand.class), any(HumanReadableText.class), 
any(StatusResponse.ResponseCode.class))).thenReturn(okResponse);
-        
when(mockMailboxManager.moveMessagesReactive(eq(MessageRange.range(MessageUid.of(4),
 MessageUid.of(6))), any(MailboxId.class), any(MailboxId.class), 
eq(mailboxSession)))
+        
when(mockMailboxManager.moveMessagesReactive(eq(List.of(MessageRange.range(MessageUid.of(4),
 MessageUid.of(6)))), any(MailboxId.class), any(MailboxId.class), 
eq(mailboxSession)))
             .thenReturn(Flux.just(MessageRange.range(MessageUid.of(4), 
MessageUid.of(6))));
 
         testee.process(moveRequest, mockResponder, imapSession);
@@ -135,7 +136,7 @@ public class MoveProcessorTest {
         verify(mockMailboxManager).manageProcessing(any(), any());
         verify(mockMailboxManager).mailboxExists(INBOX, mailboxSession);
         verify(mockMailboxManager).getMailboxReactive(INBOX, mailboxSession);
-        
verify(mockMailboxManager).moveMessagesReactive(eq(MessageRange.range(MessageUid.of(4),
 MessageUid.of(6))), any(MailboxId.class), any(MailboxId.class), 
eq(mailboxSession));
+        
verify(mockMailboxManager).moveMessagesReactive(eq(List.of(MessageRange.range(MessageUid.of(4),
 MessageUid.of(6)))), any(MailboxId.class), any(MailboxId.class), 
eq(mailboxSession));
         verify(targetMessageManager).getMailboxEntity();
         verify(mockResponder).respond(okResponse);
     }
diff --git 
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
 
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
index fe4ee4e126..7de3da9854 100644
--- 
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
+++ 
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
@@ -580,6 +580,160 @@ trait WebSocketContract {
       .contains(stateChange)
   }
 
+  @Test
+  @Timeout(180)
+  def bulkMoveWithNonContiguousUidsShouldGenerateSingleStateChange(server: 
GuiceJamesServer): Unit = {
+    // Moving N emails with non-contiguous UIDs (gap in the UID sequence) should
+    // still produce only ONE state change, not one per UID range.
+    val bobPath = MailboxPath.inbox(BOB)
+    val accountId: AccountId = AccountId.fromUsername(BOB)
+    val mailboxProbe = server.getProbe(classOf[MailboxProbeImpl])
+    val mailboxId = mailboxProbe.createMailbox(bobPath)
+    val mailboxId2 = mailboxProbe.createMailbox(MailboxPath.forUser(BOB, 
"destination"))
+
+    // Append 5 messages (UIDs will be 1, 2, 3, 4, 5)
+    val message1 = mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test1").setBody("body1", 
StandardCharsets.UTF_8).build()))
+    val message2 = mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test2").setBody("body2", 
StandardCharsets.UTF_8).build()))
+    mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test3").setBody("body3", 
StandardCharsets.UTF_8).build())) // UID 3 intentionally NOT moved
+    val message4 = mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test4").setBody("body4", 
StandardCharsets.UTF_8).build()))
+    val message5 = mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test5").setBody("body5", 
StandardCharsets.UTF_8).build()))
+
+    // Move messages 1, 2, 4, 5 — skipping UID 3 — producing 2 UID ranges: [1,2] and [4,5]
+    val messageId1 = message1.getMessageId.serialize()
+    val messageId2 = message2.getMessageId.serialize()
+    val messageId4 = message4.getMessageId.serialize()
+    val messageId5 = message5.getMessageId.serialize()
+
+    Thread.sleep(100)
+
+    val response: Either[String, List[String]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[String]] {
+          ws =>
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushEnable",
+                |  "dataTypes": ["Mailbox", "Email"]
+                |}""".stripMargin))
+
+            Thread.sleep(100)
+
+            ws.send(WebSocketFrame.text(
+              s"""{
+                 |  "@type": "Request",
+                 |  "id": "req-bulk-move",
+                 |  "using": ["urn:ietf:params:jmap:core", 
"urn:ietf:params:jmap:mail"],
+                 |  "methodCalls": [
+                 |    ["Email/set", {
+                 |      "accountId": "$ACCOUNT_ID",
+                 |      "update": {
+                 |        "$messageId1": {"mailboxIds": 
{"${mailboxId2.serialize}": true}},
+                 |        "$messageId2": {"mailboxIds": 
{"${mailboxId2.serialize}": true}},
+                 |        "$messageId4": {"mailboxIds": 
{"${mailboxId2.serialize}": true}},
+                 |        "$messageId5": {"mailboxIds": 
{"${mailboxId2.serialize}": true}}
+                 |      }
+                 |    }, "c1"]]
+                 |}""".stripMargin))
+
+            // Expect: 1 API response + 1 state change (not 2 state changes for 2 UID ranges)
+            val payload1 = ws.receive().asPayload
+            val payload2 = ws.receive().asPayload
+            List(payload1, payload2)
+        })
+        .send(backend)
+        .body
+
+    Thread.sleep(1000)
+
+    val jmapGuiceProbe: JmapGuiceProbe = 
server.getProbe(classOf[JmapGuiceProbe])
+    val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
+    val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
+
+    val globalState: String = 
PushState.fromOption(Some(UuidState.fromJava(mailboxState)), 
Some(UuidState.fromJava(emailState))).get.value
+    val stateChange: String = 
s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+
+    assertThat(response.toOption.get.asJava)
+      .hasSize(2) // 1 state change notification + 1 API response (not 2 state changes)
+      .contains(stateChange)
+  }
+
+  @Test
+  @Timeout(180)
+  def 
bulkFlagUpdateWithNonContiguousUidsShouldGenerateSingleStateChange(server: 
GuiceJamesServer): Unit = {
+    // Flagging N emails with non-contiguous UIDs (gap in the UID sequence) should
+    // still produce only ONE state change, not one per UID range.
+    // The optimization path (updateFlagsByRange) is triggered when sameUpdate && singleMailbox && size > RANGE_THRESHOLD (3).
+    val bobPath = MailboxPath.inbox(BOB)
+    val accountId: AccountId = AccountId.fromUsername(BOB)
+    val mailboxProbe = server.getProbe(classOf[MailboxProbeImpl])
+    mailboxProbe.createMailbox(bobPath)
+
+    // Append 5 messages (UIDs will be 1, 2, 3, 4, 5)
+    val message1 = mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test1").setBody("body1", 
StandardCharsets.UTF_8).build()))
+    val message2 = mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test2").setBody("body2", 
StandardCharsets.UTF_8).build()))
+    mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test3").setBody("body3", 
StandardCharsets.UTF_8).build())) // UID 3 intentionally NOT flagged
+    val message4 = mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test4").setBody("body4", 
StandardCharsets.UTF_8).build()))
+    val message5 = mailboxProbe.appendMessage(BOB.asString(), bobPath, 
AppendCommand.from(Message.Builder.of().setSubject("test5").setBody("body5", 
StandardCharsets.UTF_8).build()))
+
+    // Flag messages 1, 2, 4, 5 — skipping UID 3 — producing 2 UID ranges: [1,2] and [4,5]
+    val messageId1 = message1.getMessageId.serialize()
+    val messageId2 = message2.getMessageId.serialize()
+    val messageId4 = message4.getMessageId.serialize()
+    val messageId5 = message5.getMessageId.serialize()
+
+    Thread.sleep(100)
+
+    val response: Either[String, List[String]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[String]] {
+          ws =>
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushEnable",
+                |  "dataTypes": ["Mailbox", "Email"]
+                |}""".stripMargin))
+
+            Thread.sleep(100)
+
+            ws.send(WebSocketFrame.text(
+              s"""{
+                 |  "@type": "Request",
+                 |  "id": "req-bulk-flag",
+                 |  "using": ["urn:ietf:params:jmap:core", 
"urn:ietf:params:jmap:mail"],
+                 |  "methodCalls": [
+                 |    ["Email/set", {
+                 |      "accountId": "$ACCOUNT_ID",
+                 |      "update": {
+                 |        "$messageId1": {"keywords/$$seen": true},
+                 |        "$messageId2": {"keywords/$$seen": true},
+                 |        "$messageId4": {"keywords/$$seen": true},
+                 |        "$messageId5": {"keywords/$$seen": true}
+                 |      }
+                 |    }, "c1"]]
+                 |}""".stripMargin))
+
+            // Expect: 1 API response + 1 state change (not 2 state changes for 2 UID ranges)
+            val payload1 = ws.receive().asPayload
+            val payload2 = ws.receive().asPayload
+            List(payload1, payload2)
+        })
+        .send(backend)
+        .body
+
+    Thread.sleep(1000)
+
+    val jmapGuiceProbe: JmapGuiceProbe = 
server.getProbe(classOf[JmapGuiceProbe])
+    val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
+    val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
+
+    val globalState: String = 
PushState.fromOption(Some(UuidState.fromJava(mailboxState)), 
Some(UuidState.fromJava(emailState))).get.value
+    val stateChange: String = 
s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+
+    assertThat(response.toOption.get.asJava)
+      .hasSize(2) // 1 state change notification + 1 API response (not 2 state changes)
+      .contains(stateChange)
+  }
+
   @Test
   @Timeout(180)
   def shouldPushChangesToDelegatedUser(server: GuiceJamesServer): Unit = {
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index a4d0c21df8..8ace8e1f95 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -159,11 +159,11 @@ class EmailSetUpdatePerformer @Inject() (serializer: 
EmailSetSerializer,
                                  metaData: Map[MessageId, 
Iterable[ComposedMessageIdWithMetaData]],
                                  updateMode: FlagsUpdateMode,
                                  session: MailboxSession): 
SMono[Seq[EmailUpdateResult]] = {
-    val mailboxMono: SMono[MessageManager] = 
SMono(mailboxManager.getMailboxReactive(mailboxId, session))
-
-    mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData,
-
-      range => SMono(mailbox.setFlagsReactive(flags, updateMode, range, 
session)).`then`()))
+    val messageIds = metaData.keys.toSeq
+    SMono(mailboxManager.getMailboxReactive(mailboxId, session))
+      .flatMap(mailbox => SMono(mailbox.setFlagsReactive(flags, updateMode, 
ranges.asJava, session)).`then`())
+      .`then`(SMono.just(messageIds.map(EmailUpdateSuccess)))
+      .onErrorResume(e => SMono.just(messageIds.map(id => 
EmailUpdateFailure(EmailSet.asUnparsed(id), e))))
   }
 
   private def moveByRange(mailboxId: MailboxId,
@@ -172,9 +172,11 @@ class EmailSetUpdatePerformer @Inject() (serializer: 
EmailSetSerializer,
                           metaData: Map[MessageId, 
Iterable[ComposedMessageIdWithMetaData]],
                           session: MailboxSession): 
SMono[Seq[EmailUpdateResult]] = {
     val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get
+    val messageIds = metaData.keys.toSeq
 
-    updateByRange(ranges, metaData,
-      range => SMono(mailboxManager.moveMessagesReactive(range, mailboxId, 
targetId, session)).`then`())
+    SMono(mailboxManager.moveMessagesReactive(ranges.asJava, mailboxId, 
targetId, session)).`then`()
+      .`then`(SMono.just(messageIds.map(EmailUpdateSuccess)))
+      .onErrorResume(e => SMono.just(messageIds.map(id => 
EmailUpdateFailure(EmailSet.asUnparsed(id), e))))
   }
 
   private def updateByRange(ranges: List[MessageRange],


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to