This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new a2c03ed263 JAMES-4188 Improve range optimization to account for
non-contiguous r… (#2972)
a2c03ed263 is described below
commit a2c03ed263fd61ab907eb73952b06daa9cb867bf
Author: Benoit TELLIER <[email protected]>
AuthorDate: Wed Mar 11 03:57:33 2026 +0100
JAMES-4188 Improve range optimization to account for non-contiguous r…
(#2972)
---
.../org/apache/james/mailbox/MailboxManager.java | 10 ++
.../org/apache/james/mailbox/MessageManager.java | 7 +
.../james/mailbox/store/StoreMailboxManager.java | 22 +++
.../james/mailbox/store/StoreMessageManager.java | 170 +++++++++++++++++++++
.../processor/AbstractMessageRangeProcessor.java | 15 +-
.../apache/james/imap/processor/CopyProcessor.java | 10 ++
.../apache/james/imap/processor/MoveProcessor.java | 6 +
.../james/imap/processor/StoreProcessor.java | 44 +++++-
.../james/imap/processor/CopyProcessorTest.java | 5 +-
.../james/imap/processor/MoveProcessorTest.java | 5 +-
.../jmap/rfc8621/contract/WebSocketContract.scala | 154 +++++++++++++++++++
.../jmap/method/EmailSetUpdatePerformer.scala | 16 +-
12 files changed, 446 insertions(+), 18 deletions(-)
diff --git
a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
index ec9333e381..aedc0fb3d5 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
@@ -358,6 +358,16 @@ public interface MailboxManager extends RequestAware,
RightManager, MailboxAnnot
.flatMapIterable(Function.identity());
}
+ default Publisher<MessageRange> moveMessagesReactive(List<MessageRange>
sets, MailboxId from, MailboxId to, MailboxSession session) {
+ return Flux.fromIterable(sets)
+ .concatMap(set -> moveMessagesReactive(set, from, to, session));
+ }
+
+ default Publisher<MessageRange> copyMessagesReactive(List<MessageRange>
sets, MailboxId from, MailboxId to, MailboxSession session) {
+ return Flux.fromIterable(sets)
+ .concatMap(set -> copyMessagesReactive(set, from, to, session));
+ }
+
enum MailboxSearchFetchType {
Minimal,
Counters
diff --git
a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
index c87729aed6..e2cf98fb2d 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
@@ -163,6 +163,13 @@ public interface MessageManager {
return Mono.fromCallable(() -> setFlags(flags, flagsUpdateMode, set,
mailboxSession));
}
+ default Publisher<Map<MessageUid, Flags>> setFlagsReactive(Flags flags,
FlagsUpdateMode flagsUpdateMode, List<MessageRange> sets, MailboxSession
mailboxSession) {
+ return Flux.fromIterable(sets)
+ .concatMap(set -> Mono.from(setFlagsReactive(flags,
flagsUpdateMode, set, mailboxSession)))
+ .flatMapIterable(Map::entrySet)
+ .collectMap(Map.Entry::getKey, Map.Entry::getValue);
+ }
+
class AppendResult {
private final ComposedMessageId id;
private final Long size;
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index ba95f9a5e5..725b628807 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -818,6 +818,28 @@ public class StoreMailboxManager implements MailboxManager
{
});
}
+ @Override
+ public Flux<MessageRange> moveMessagesReactive(List<MessageRange> sets,
MailboxId from, MailboxId to, MailboxSession session) {
+ return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
+ .flatMapMany(fromTo -> {
+ StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
+ StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
+
+ return fromMessageManager.moveTo(sets, toMessageManager,
session);
+ });
+ }
+
+ @Override
+ public Flux<MessageRange> copyMessagesReactive(List<MessageRange> sets,
MailboxId from, MailboxId to, MailboxSession session) {
+ return Mono.zip(Mono.from(getMailboxReactive(from, session)),
Mono.from(getMailboxReactive(to, session)))
+ .flatMapMany(fromTo -> {
+ StoreMessageManager fromMessageManager = (StoreMessageManager)
fromTo.getT1();
+ StoreMessageManager toMessageManager = (StoreMessageManager)
fromTo.getT2();
+
+ return fromMessageManager.copyTo(sets, toMessageManager,
session);
+ });
+ }
+
@Override
public Flux<MailboxMetaData> search(MailboxQuery expression,
MailboxSearchFetchType fetchType, MailboxSession session) {
Mono<List<Mailbox>> mailboxesMono = searchMailboxes(expression,
session, Right.Lookup).collectList();
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 283cce7167..1db6cc08a3 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -764,6 +764,36 @@ public class StoreMessageManager implements MessageManager
{
});
}
+ @Override
+ public Publisher<Map<MessageUid, Flags>> setFlagsReactive(Flags flags,
FlagsUpdateMode flagsUpdateMode, List<MessageRange> sets, MailboxSession
mailboxSession) {
+ return ensureFlagsWrite(flags, flagsUpdateMode, mailboxSession)
+ .map(Mono::<Map<MessageUid, Flags>>error)
+ .orElseGet(() -> {
+ trimFlags(flags, mailboxSession);
+ MessageMapper messageMapper =
mapperFactory.getMessageMapper(mailboxSession);
+ FlagsUpdateCalculator calculator = new
FlagsUpdateCalculator(flags, flagsUpdateMode);
+
+ return Flux.fromIterable(sets)
+ .concatMap(set ->
messageMapper.executeReactive(messageMapper.updateFlagsReactive(getMailboxEntity(),
calculator, set)))
+ .collectList()
+ .flatMap(allUpdatedFlagsPerRange -> {
+ ImmutableList<UpdatedFlags> allUpdatedFlags =
allUpdatedFlagsPerRange.stream()
+ .flatMap(List::stream)
+ .collect(ImmutableList.toImmutableList());
+ return eventBus.dispatch(EventFactory.flagsUpdated()
+ .randomEventId()
+ .mailboxSession(mailboxSession)
+ .mailbox(getMailboxEntity())
+ .updatedFlags(allUpdatedFlags)
+ .build(),
+ new
MailboxIdRegistrationKey(mailbox.getMailboxId()))
+
.thenReturn(allUpdatedFlags.stream().collect(ImmutableMap.toImmutableMap(
+ UpdatedFlags::getUid,
+ UpdatedFlags::getNewFlags)));
+ });
+ });
+ }
+
/**
* Copy the {@link MessageRange} to the {@link StoreMessageManager}
*/
@@ -781,6 +811,66 @@ public class StoreMessageManager implements MessageManager
{
MailboxPathLocker.LockType.Write));
}
+ public Flux<MessageRange> copyTo(List<MessageRange> sets,
StoreMessageManager toMailbox, MailboxSession session) {
+ if (!toMailbox.isWriteable(session)) {
+ return Flux.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
+ }
+ if (!storeRightManager.myRights(toMailbox.mailbox,
session).contains(MailboxACL.Right.Insert)) {
+ return Flux.error(new InsufficientRightsException("Append messages
requires 'i' right"));
+ }
+ return
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+ copyAll(sets, toMailbox, session)
+ .flatMapIterable(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
+ MailboxPathLocker.LockType.Write));
+ }
+
+ private Mono<SortedMap<MessageUid, MessageMetaData>>
copyAll(List<MessageRange> sets, StoreMessageManager to, MailboxSession
session) {
+ return Flux.fromIterable(sets)
+ .concatMap(set -> retrieveOriginalRows(set, session))
+ .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+ .concatMap(window -> window.collectList()
+ .flatMap(originalRows -> to.copy(originalRows,
session).collectList()
+ .map(copyResult -> Pair.of(
+ collectMetadata(copyResult.iterator()),
+ originalRows.stream()
+
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+ .collect(ImmutableList.toImmutableList())))))
+ .collectList()
+ .flatMap(allResults -> {
+ if (allResults.isEmpty()) {
+ return Mono.just(ImmutableSortedMap.of());
+ }
+ SortedMap<MessageUid, MessageMetaData> allCopiedUids = new
TreeMap<>();
+ List<MessageId> allMessageIds = new ArrayList<>();
+ for (Pair<SortedMap<MessageUid, MessageMetaData>,
ImmutableList<MessageId>> result : allResults) {
+ allCopiedUids.putAll(result.getLeft());
+ allMessageIds.addAll(result.getRight());
+ }
+ MessageMoves messageMoves = MessageMoves.builder()
+ .previousMailboxIds(getMailboxEntity().getMailboxId())
+ .targetMailboxIds(to.getMailboxEntity().getMailboxId(),
getMailboxEntity().getMailboxId())
+ .build();
+ EventBus.EventWithRegistrationKey added = new
EventBus.EventWithRegistrationKey(
+ EventFactory.added()
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(to.getMailboxEntity())
+ .metaData(allCopiedUids)
+ .isDelivery(!IS_DELIVERY)
+ .isAppended(!IS_APPENDED)
+ .build(),
+ ImmutableSet.of(new
MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())));
+ EventBus.EventWithRegistrationKey moved = new
EventBus.EventWithRegistrationKey(
+ EventFactory.moved()
+ .messageMoves(messageMoves)
+ .messageId(allMessageIds)
+ .session(session)
+ .build(),
+
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet()));
+ return Mono.from(eventBus.dispatch(ImmutableList.of(added,
moved))).thenReturn(allCopiedUids);
+ });
+ }
+
/**
* Move the {@link MessageRange} to the {@link StoreMessageManager}
*/
@@ -804,6 +894,86 @@ public class StoreMessageManager implements MessageManager
{
MailboxPathLocker.LockType.Write));
}
+ public Flux<MessageRange> moveTo(List<MessageRange> sets,
StoreMessageManager toMailbox, MailboxSession session) {
+ if (!isWriteable(session)) {
+ return Flux.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
+ }
+ if (!storeRightManager.myRights(mailbox,
session).contains(MailboxACL.Right.PerformExpunge)) {
+ return Flux.error(new InsufficientRightsException("Deleting
messages requires 'e' right"));
+ }
+ if (!toMailbox.isWriteable(session)) {
+ return Flux.error(new
ReadOnlyException(toMailbox.getMailboxPath()));
+ }
+ if (!storeRightManager.myRights(toMailbox.mailbox,
session).contains(MailboxACL.Right.Insert)) {
+ return Flux.error(new InsufficientRightsException("Append messages
requires 'i' right"));
+ }
+ return
Flux.from(locker.executeReactiveWithLockReactive(toMailbox.getMailboxPath(),
+ moveAll(sets, toMailbox, session)
+ .flatMapIterable(map -> MessageRange.toRanges(new
ArrayList<>(map.keySet()))),
+ MailboxPathLocker.LockType.Write));
+ }
+
+ private Mono<SortedMap<MessageUid, MessageMetaData>>
moveAll(List<MessageRange> sets, StoreMessageManager to, MailboxSession
session) {
+ return Flux.fromIterable(sets)
+ .concatMap(set -> retrieveOriginalRows(set, session))
+ .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE))
+ .concatMap(window -> window
+ .collectList()
+ .flatMap(originalRows -> to.move(originalRows, session)
+ .map(moveResult -> Pair.of(moveResult, originalRows))))
+ .collectList()
+ .flatMap(allResults -> {
+ if (allResults.isEmpty()) {
+ return Mono.just(ImmutableSortedMap.of());
+ }
+
+ SortedMap<MessageUid, MessageMetaData> allMoveUids = new
TreeMap<>();
+ List<MessageMetaData> allOriginalMessages = new ArrayList<>();
+ List<MessageId> allMessageIds = new ArrayList<>();
+
+ for (Pair<MoveResult, List<MailboxMessage>> result :
allResults) {
+
allMoveUids.putAll(collectMetadata(result.getLeft().getMovedMessages().iterator()));
+
allOriginalMessages.addAll(result.getLeft().getOriginalMessages());
+ result.getRight().stream()
+
.map(org.apache.james.mailbox.store.mail.model.Message::getMessageId)
+ .forEach(allMessageIds::add);
+ }
+
+ MessageMoves messageMoves = MessageMoves.builder()
+ .previousMailboxIds(getMailboxEntity().getMailboxId())
+ .targetMailboxIds(to.getMailboxEntity().getMailboxId())
+ .build();
+
+ EventBus.EventWithRegistrationKey added = new
EventBus.EventWithRegistrationKey(EventFactory.added()
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(to.getMailboxEntity())
+ .metaData(allMoveUids)
+ .isDelivery(!IS_DELIVERY)
+ .isAppended(!IS_APPENDED)
+ .movedFrom(getId())
+ .build(),
+ ImmutableSet.of(new
MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())));
+ EventBus.EventWithRegistrationKey expunged = new
EventBus.EventWithRegistrationKey(EventFactory.expunged()
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(getMailboxEntity())
+ .addMetaData(allOriginalMessages)
+ .movedTo(to.getId())
+ .build(),
+ ImmutableSet.of(new
MailboxIdRegistrationKey(mailbox.getMailboxId())));
+ EventBus.EventWithRegistrationKey moved = new
EventBus.EventWithRegistrationKey(EventFactory.moved()
+ .messageMoves(messageMoves)
+ .messageId(allMessageIds)
+ .session(session)
+ .build(),
+
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet()));
+
+ return Mono.from(eventBus.dispatch(ImmutableList.of(added,
expunged, moved)))
+ .thenReturn(allMoveUids);
+ });
+ }
+
@Override
public long getMessageCount(MailboxSession mailboxSession) throws
MailboxException {
return
mapperFactory.getMessageMapper(mailboxSession).countMessagesInMailbox(getMailboxEntity());
diff --git
a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
index 0c8aa5b8e8..d933ce9468 100644
---
a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
+++
b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
@@ -19,6 +19,7 @@
package org.apache.james.imap.processor;
+import java.util.List;
import java.util.Objects;
import org.apache.james.core.Username;
@@ -68,6 +69,14 @@ public abstract class AbstractMessageRangeProcessor<R
extends AbstractMessageRan
MailboxSession
mailboxSession,
MessageRange messageSet);
+ protected Flux<MessageRange> processAll(MailboxId targetMailbox,
+ SelectedMailbox currentMailbox,
+ MailboxSession mailboxSession,
+ List<MessageRange> messageSets) {
+ return Flux.fromIterable(messageSets)
+ .concatMap(set -> process(targetMailbox, currentMailbox,
mailboxSession, set));
+ }
+
protected abstract String getOperationName();
@Override
@@ -120,7 +129,8 @@ public abstract class AbstractMessageRangeProcessor<R
extends AbstractMessageRan
.orElseThrow(() -> new
MessageRangeException(range.getFormattedString() + " is an invalid range")))
.sneakyThrow())
.filter(Objects::nonNull)
- .concatMap(range -> process(target.getId(),
session.getSelected(), mailboxSession, range)
+ .collectList()
+ .flatMapMany(ranges -> processAll(target.getId(),
session.getSelected(), mailboxSession, ranges)
.doOnEach(ReactorUtils.logFinally(() ->
AuditTrail.entry()
.username(() ->
mailboxSession.getUser().asString())
.sessionId(() ->
session.sessionId().asString())
@@ -128,8 +138,7 @@ public abstract class AbstractMessageRangeProcessor<R
extends AbstractMessageRan
.action(getOperationName())
.parameters(() ->
ImmutableMap.of("loggedInUser",
mailboxSession.getLoggedInUser().map(Username::asString).orElse(""),
"targetId", target.getId().serialize(),
- "selectedMailboxId",
session.getSelected().getMailboxId().serialize(),
- "range", range.getUidFrom().asLong() + ":"
+ range.getUidTo().asLong()))
+ "selectedMailboxId",
session.getSelected().getMailboxId().serialize()))
.log("IMAP " + getOperationName() + "
succeeded.")))
.map(IdRange::from))
.collect(ImmutableList.<IdRange>toImmutableList())
diff --git
a/protocols/imap/src/main/java/org/apache/james/imap/processor/CopyProcessor.java
b/protocols/imap/src/main/java/org/apache/james/imap/processor/CopyProcessor.java
index bde419a541..cae14215cd 100644
---
a/protocols/imap/src/main/java/org/apache/james/imap/processor/CopyProcessor.java
+++
b/protocols/imap/src/main/java/org/apache/james/imap/processor/CopyProcessor.java
@@ -19,6 +19,8 @@
package org.apache.james.imap.processor;
+import java.util.List;
+
import jakarta.inject.Inject;
import org.apache.james.imap.api.message.IdRange;
@@ -56,6 +58,14 @@ public class CopyProcessor extends
AbstractMessageRangeProcessor<CopyRequest> {
return Flux.from(getMailboxManager().copyMessagesReactive(messageSet,
currentMailbox.getMailboxId(), targetMailbox, mailboxSession));
}
+ @Override
+ protected Flux<MessageRange> processAll(MailboxId targetMailbox,
+ SelectedMailbox currentMailbox,
+ MailboxSession mailboxSession,
+ List<MessageRange> messageSets) {
+ return Flux.from(getMailboxManager().copyMessagesReactive(messageSets,
currentMailbox.getMailboxId(), targetMailbox, mailboxSession));
+ }
+
@Override
protected MDCBuilder mdc(CopyRequest request) {
return MDCBuilder.create()
diff --git
a/protocols/imap/src/main/java/org/apache/james/imap/processor/MoveProcessor.java
b/protocols/imap/src/main/java/org/apache/james/imap/processor/MoveProcessor.java
index 6b66e9f937..5a5788c01b 100644
---
a/protocols/imap/src/main/java/org/apache/james/imap/processor/MoveProcessor.java
+++
b/protocols/imap/src/main/java/org/apache/james/imap/processor/MoveProcessor.java
@@ -59,6 +59,12 @@ public class MoveProcessor extends
AbstractMessageRangeProcessor<MoveRequest> im
return Flux.from(getMailboxManager().moveMessagesReactive(messageSet,
currentMailbox.getMailboxId(), targetMailbox, mailboxSession));
}
+ @Override
+ protected Flux<MessageRange> processAll(MailboxId targetMailbox,
SelectedMailbox currentMailbox,
+ MailboxSession mailboxSession,
List<MessageRange> messageSets) {
+ return Flux.from(getMailboxManager().moveMessagesReactive(messageSets,
currentMailbox.getMailboxId(), targetMailbox, mailboxSession));
+ }
+
@Override
protected String getOperationName() {
return "Move";
diff --git
a/protocols/imap/src/main/java/org/apache/james/imap/processor/StoreProcessor.java
b/protocols/imap/src/main/java/org/apache/james/imap/processor/StoreProcessor.java
index 9a08669b06..1d136cacaa 100644
---
a/protocols/imap/src/main/java/org/apache/james/imap/processor/StoreProcessor.java
+++
b/protocols/imap/src/main/java/org/apache/james/imap/processor/StoreProcessor.java
@@ -98,8 +98,15 @@ public class StoreProcessor extends
AbstractMailboxProcessor<StoreRequest> {
.map(Throwing.<IdRange, MessageRange>function(idRange ->
messageRange(selected, idRange, request.isUseUids())
.orElseThrow(() -> new
MessageRangeException(idRange.getFormattedString() + " is an invalid range")))
.sneakyThrow())
- .concatMap(messageSet -> handleRange(request, session,
responder, selected, mailbox, mailboxSession, failed, failedMsns, userFlags,
messageSet))
- .then())
+ .collectList()
+ .flatMap(messageSets -> {
+ if (request.getUnchangedSince() != -1) {
+ return Flux.fromIterable(messageSets)
+ .concatMap(messageSet -> handleRange(request,
session, responder, selected, mailbox, mailboxSession, failed, failedMsns,
userFlags, messageSet))
+ .then();
+ }
+ return setFlagsAll(request, session, responder, selected,
mailbox, mailboxSession, messageSets);
+ }))
.then(unsolicitedResponses(session, responder, omitExpunged,
request.isUseUids()))
.doOnSuccess(any -> {
// check if we had some failed uids which didn't pass the
UNCHANGEDSINCE filter
@@ -214,18 +221,47 @@ public class StoreProcessor extends
AbstractMailboxProcessor<StoreRequest> {
}
/**
- * Set the flags for given messages
+ * Set the flags for given messages (single range)
*/
private Mono<Void> setFlags(StoreRequest request, MailboxSession
mailboxSession, MessageManager mailbox, MessageRange messageSet, ImapSession
session, Responder responder) {
boolean silent = request.isSilent();
long unchangedSince = request.getUnchangedSince();
-
+
SelectedMailbox selected = session.getSelected();
return Mono.from(mailbox.setFlagsReactive(request.getFlags(),
request.getFlagsUpdateMode(), messageSet, mailboxSession))
.doOnNext(flagsByUid -> handlePermanentFlagChanges(mailboxSession,
mailbox, responder, selected))
.flatMap(flagsByUid -> handleCondstore(request, mailboxSession,
mailbox, messageSet, session, responder, silent, unchangedSince, selected,
flagsByUid));
}
+ /**
+ * Set the flags for all message ranges in a single batched operation,
dispatching a single event.
+ * Only used when UNCHANGEDSINCE == -1 (no CONDSTORE filtering needed).
+ */
+ private Mono<Void> setFlagsAll(StoreRequest request, ImapSession session,
Responder responder,
+ SelectedMailbox selected, MessageManager
mailbox,
+ MailboxSession mailboxSession,
List<MessageRange> messageSets) {
+ boolean silent = request.isSilent();
+ Set<Capability> enabled =
EnableProcessor.getEnabledCapabilities(session);
+ boolean qresyncEnabled =
enabled.contains(ImapConstants.SUPPORTS_QRESYNC);
+ boolean condstoreEnabled =
enabled.contains(ImapConstants.SUPPORTS_CONDSTORE);
+
+ return Mono.from(mailbox.setFlagsReactive(request.getFlags(),
request.getFlagsUpdateMode(), messageSets, mailboxSession))
+ .doOnNext(flagsByUid -> handlePermanentFlagChanges(mailboxSession,
mailbox, responder, selected))
+ .flatMap(flagsByUid -> {
+ if (!silent || qresyncEnabled || condstoreEnabled) {
+ Mono<Map<MessageUid, ModSeq>> modSeqsMono =
(qresyncEnabled || condstoreEnabled)
+ ? Flux.fromIterable(messageSets)
+ .concatMap(set ->
Flux.from(mailbox.listMessagesMetadata(set, mailboxSession)))
+ .collectMap(r ->
r.getComposedMessageId().getUid(), ComposedMessageIdWithMetaData::getModSeq)
+ : Mono.just(ImmutableMap.of());
+ return modSeqsMono
+ .doOnNext(modSeqs -> sendFetchResponses(responder,
request.isUseUids(), silent, -1L, selected, flagsByUid, qresyncEnabled,
condstoreEnabled, modSeqs))
+ .then();
+ }
+ return Mono.empty();
+ });
+ }
+
private Mono<Void> handleCondstore(StoreRequest request, MailboxSession
mailboxSession, MessageManager mailbox, MessageRange messageSet, ImapSession
session, Responder responder, boolean silent, long unchangedSince,
SelectedMailbox selected, Map<MessageUid, Flags> flagsByUid) {
Set<Capability> enabled =
EnableProcessor.getEnabledCapabilities(session);
boolean qresyncEnabled =
enabled.contains(ImapConstants.SUPPORTS_QRESYNC);
diff --git
a/protocols/imap/src/test/java/org/apache/james/imap/processor/CopyProcessorTest.java
b/protocols/imap/src/test/java/org/apache/james/imap/processor/CopyProcessorTest.java
index 66a18ba397..67e467bca5 100644
---
a/protocols/imap/src/test/java/org/apache/james/imap/processor/CopyProcessorTest.java
+++
b/protocols/imap/src/test/java/org/apache/james/imap/processor/CopyProcessorTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.util.List;
import java.util.Optional;
import org.apache.james.core.Username;
@@ -112,7 +113,7 @@ class CopyProcessorTest {
when(targetMessageManager.getMailboxEntity()).thenReturn(mailbox);
StatusResponse okResponse = mock(StatusResponse.class);
when(mockStatusResponseFactory.taggedOk(any(Tag.class),
any(ImapCommand.class), any(HumanReadableText.class),
any(StatusResponse.ResponseCode.class))).thenReturn(okResponse);
-
when(mockMailboxManager.copyMessagesReactive(eq(MessageRange.range(MessageUid.of(4),
MessageUid.of(6))), any(MailboxId.class), any(MailboxId.class),
eq(mailboxSession)))
+
when(mockMailboxManager.copyMessagesReactive(eq(List.of(MessageRange.range(MessageUid.of(4),
MessageUid.of(6)))), any(MailboxId.class), any(MailboxId.class),
eq(mailboxSession)))
.thenReturn(Flux.just(MessageRange.range(MessageUid.of(4),
MessageUid.of(6))));
testee.process(copyRequest, mockResponder, imapSession);
@@ -120,7 +121,7 @@ class CopyProcessorTest {
verify(mockMailboxManager).manageProcessing(any(), any());
verify(mockMailboxManager).mailboxExists(INBOX, mailboxSession);
verify(mockMailboxManager).getMailboxReactive(any(MailboxPath.class),
any(MailboxSession.class));
-
verify(mockMailboxManager).copyMessagesReactive(eq(MessageRange.range(MessageUid.of(4),
MessageUid.of(6))), any(MailboxId.class), any(MailboxId.class),
eq(mailboxSession));
+
verify(mockMailboxManager).copyMessagesReactive(eq(List.of(MessageRange.range(MessageUid.of(4),
MessageUid.of(6)))), any(MailboxId.class), any(MailboxId.class),
eq(mailboxSession));
verify(targetMessageManager).getMailboxEntity();
verify(mockResponder).respond(okResponse);
}
diff --git
a/protocols/imap/src/test/java/org/apache/james/imap/processor/MoveProcessorTest.java
b/protocols/imap/src/test/java/org/apache/james/imap/processor/MoveProcessorTest.java
index 9e7f4cf9f3..c883858cca 100644
---
a/protocols/imap/src/test/java/org/apache/james/imap/processor/MoveProcessorTest.java
+++
b/protocols/imap/src/test/java/org/apache/james/imap/processor/MoveProcessorTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.util.List;
import java.util.Optional;
import org.apache.james.core.Username;
@@ -126,7 +127,7 @@ public class MoveProcessorTest {
when(targetMessageManager.getMailboxEntity()).thenReturn(mailbox);
StatusResponse okResponse = mock(StatusResponse.class);
when(mockStatusResponseFactory.taggedOk(any(Tag.class),
any(ImapCommand.class), any(HumanReadableText.class),
any(StatusResponse.ResponseCode.class))).thenReturn(okResponse);
-
when(mockMailboxManager.moveMessagesReactive(eq(MessageRange.range(MessageUid.of(4),
MessageUid.of(6))), any(MailboxId.class), any(MailboxId.class),
eq(mailboxSession)))
+
when(mockMailboxManager.moveMessagesReactive(eq(List.of(MessageRange.range(MessageUid.of(4),
MessageUid.of(6)))), any(MailboxId.class), any(MailboxId.class),
eq(mailboxSession)))
.thenReturn(Flux.just(MessageRange.range(MessageUid.of(4),
MessageUid.of(6))));
testee.process(moveRequest, mockResponder, imapSession);
@@ -135,7 +136,7 @@ public class MoveProcessorTest {
verify(mockMailboxManager).manageProcessing(any(), any());
verify(mockMailboxManager).mailboxExists(INBOX, mailboxSession);
verify(mockMailboxManager).getMailboxReactive(INBOX, mailboxSession);
-
verify(mockMailboxManager).moveMessagesReactive(eq(MessageRange.range(MessageUid.of(4),
MessageUid.of(6))), any(MailboxId.class), any(MailboxId.class),
eq(mailboxSession));
+
verify(mockMailboxManager).moveMessagesReactive(eq(List.of(MessageRange.range(MessageUid.of(4),
MessageUid.of(6)))), any(MailboxId.class), any(MailboxId.class),
eq(mailboxSession));
verify(targetMessageManager).getMailboxEntity();
verify(mockResponder).respond(okResponse);
}
diff --git
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
index fe4ee4e126..7de3da9854 100644
---
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
+++
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
@@ -580,6 +580,160 @@ trait WebSocketContract {
.contains(stateChange)
}
+ @Test
+ @Timeout(180)
+ def bulkMoveWithNonContiguousUidsShouldGenerateSingleStateChange(server:
GuiceJamesServer): Unit = {
+ // Moving N emails with non-contiguous UIDs (gap in the UID sequence)
should
+ // still produce only ONE state change, not one per UID range.
+ val bobPath = MailboxPath.inbox(BOB)
+ val accountId: AccountId = AccountId.fromUsername(BOB)
+ val mailboxProbe = server.getProbe(classOf[MailboxProbeImpl])
+ val mailboxId = mailboxProbe.createMailbox(bobPath)
+ val mailboxId2 = mailboxProbe.createMailbox(MailboxPath.forUser(BOB,
"destination"))
+
+ // Append 5 messages (UIDs will be 1, 2, 3, 4, 5)
+ val message1 = mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test1").setBody("body1",
StandardCharsets.UTF_8).build()))
+ val message2 = mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test2").setBody("body2",
StandardCharsets.UTF_8).build()))
+ mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test3").setBody("body3",
StandardCharsets.UTF_8).build())) // UID 3 intentionally NOT moved
+ val message4 = mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test4").setBody("body4",
StandardCharsets.UTF_8).build()))
+ val message5 = mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test5").setBody("body5",
StandardCharsets.UTF_8).build()))
+
+ // Move messages 1, 2, 4, 5 — skipping UID 3 — producing 2 UID ranges:
[1,2] and [4,5]
+ val messageId1 = message1.getMessageId.serialize()
+ val messageId2 = message2.getMessageId.serialize()
+ val messageId4 = message4.getMessageId.serialize()
+ val messageId5 = message5.getMessageId.serialize()
+
+ Thread.sleep(100)
+
+ val response: Either[String, List[String]] =
+ authenticatedRequest(server)
+ .response(asWebSocket[Identity, List[String]] {
+ ws =>
+ ws.send(WebSocketFrame.text(
+ """{
+ | "@type": "WebSocketPushEnable",
+ | "dataTypes": ["Mailbox", "Email"]
+ |}""".stripMargin))
+
+ Thread.sleep(100)
+
+ ws.send(WebSocketFrame.text(
+ s"""{
+ | "@type": "Request",
+ | "id": "req-bulk-move",
+ | "using": ["urn:ietf:params:jmap:core",
"urn:ietf:params:jmap:mail"],
+ | "methodCalls": [
+ | ["Email/set", {
+ | "accountId": "$ACCOUNT_ID",
+ | "update": {
+ | "$messageId1": {"mailboxIds":
{"${mailboxId2.serialize}": true}},
+ | "$messageId2": {"mailboxIds":
{"${mailboxId2.serialize}": true}},
+ | "$messageId4": {"mailboxIds":
{"${mailboxId2.serialize}": true}},
+ | "$messageId5": {"mailboxIds":
{"${mailboxId2.serialize}": true}}
+ | }
+ | }, "c1"]]
+ |}""".stripMargin))
+
+ // Expect: 1 API response + 1 state change (not 2 state changes
for 2 UID ranges)
+ val payload1 = ws.receive().asPayload
+ val payload2 = ws.receive().asPayload
+ List(payload1, payload2)
+ })
+ .send(backend)
+ .body
+
+ Thread.sleep(1000)
+
+ val jmapGuiceProbe: JmapGuiceProbe =
server.getProbe(classOf[JmapGuiceProbe])
+ val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
+ val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
+
+ val globalState: String =
PushState.fromOption(Some(UuidState.fromJava(mailboxState)),
Some(UuidState.fromJava(emailState))).get.value
+ val stateChange: String =
s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+
+ assertThat(response.toOption.get.asJava)
+ .hasSize(2) // 1 state change notification + 1 API response (not 2 state
changes)
+ .contains(stateChange)
+ }
+
+ @Test
+ @Timeout(180)
+ def
bulkFlagUpdateWithNonContiguousUidsShouldGenerateSingleStateChange(server:
GuiceJamesServer): Unit = {
+ // Flagging N emails with non-contiguous UIDs (gap in the UID sequence)
should
+ // still produce only ONE state change, not one per UID range.
+ // The optimization path (updateFlagsByRange) is triggered when sameUpdate
&& singleMailbox && size > RANGE_THRESHOLD (3).
+ val bobPath = MailboxPath.inbox(BOB)
+ val accountId: AccountId = AccountId.fromUsername(BOB)
+ val mailboxProbe = server.getProbe(classOf[MailboxProbeImpl])
+ mailboxProbe.createMailbox(bobPath)
+
+ // Append 5 messages (UIDs will be 1, 2, 3, 4, 5)
+ val message1 = mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test1").setBody("body1",
StandardCharsets.UTF_8).build()))
+ val message2 = mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test2").setBody("body2",
StandardCharsets.UTF_8).build()))
+ mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test3").setBody("body3",
StandardCharsets.UTF_8).build())) // UID 3 intentionally NOT flagged
+ val message4 = mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test4").setBody("body4",
StandardCharsets.UTF_8).build()))
+ val message5 = mailboxProbe.appendMessage(BOB.asString(), bobPath,
AppendCommand.from(Message.Builder.of().setSubject("test5").setBody("body5",
StandardCharsets.UTF_8).build()))
+
+ // Flag messages 1, 2, 4, 5 — skipping UID 3 — producing 2 UID ranges:
[1,2] and [4,5]
+ val messageId1 = message1.getMessageId.serialize()
+ val messageId2 = message2.getMessageId.serialize()
+ val messageId4 = message4.getMessageId.serialize()
+ val messageId5 = message5.getMessageId.serialize()
+
+ Thread.sleep(100)
+
+ val response: Either[String, List[String]] =
+ authenticatedRequest(server)
+ .response(asWebSocket[Identity, List[String]] {
+ ws =>
+ ws.send(WebSocketFrame.text(
+ """{
+ | "@type": "WebSocketPushEnable",
+ | "dataTypes": ["Mailbox", "Email"]
+ |}""".stripMargin))
+
+ Thread.sleep(100)
+
+ ws.send(WebSocketFrame.text(
+ s"""{
+ | "@type": "Request",
+ | "id": "req-bulk-flag",
+ | "using": ["urn:ietf:params:jmap:core",
"urn:ietf:params:jmap:mail"],
+ | "methodCalls": [
+ | ["Email/set", {
+ | "accountId": "$ACCOUNT_ID",
+ | "update": {
+ | "$messageId1": {"keywords/$$seen": true},
+ | "$messageId2": {"keywords/$$seen": true},
+ | "$messageId4": {"keywords/$$seen": true},
+ | "$messageId5": {"keywords/$$seen": true}
+ | }
+ | }, "c1"]]
+ |}""".stripMargin))
+
+ // Expect: 1 API response + 1 state change (not 2 state changes
for 2 UID ranges)
+ val payload1 = ws.receive().asPayload
+ val payload2 = ws.receive().asPayload
+ List(payload1, payload2)
+ })
+ .send(backend)
+ .body
+
+ Thread.sleep(1000)
+
+ val jmapGuiceProbe: JmapGuiceProbe =
server.getProbe(classOf[JmapGuiceProbe])
+ val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
+ val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
+
+ val globalState: String =
PushState.fromOption(Some(UuidState.fromJava(mailboxState)),
Some(UuidState.fromJava(emailState))).get.value
+ val stateChange: String =
s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+
+ assertThat(response.toOption.get.asJava)
+ .hasSize(2) // 1 state change notification + 1 API response (not 2 state
changes)
+ .contains(stateChange)
+ }
+
@Test
@Timeout(180)
def shouldPushChangesToDelegatedUser(server: GuiceJamesServer): Unit = {
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index a4d0c21df8..8ace8e1f95 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -159,11 +159,11 @@ class EmailSetUpdatePerformer @Inject() (serializer:
EmailSetSerializer,
metaData: Map[MessageId,
Iterable[ComposedMessageIdWithMetaData]],
updateMode: FlagsUpdateMode,
session: MailboxSession):
SMono[Seq[EmailUpdateResult]] = {
- val mailboxMono: SMono[MessageManager] =
SMono(mailboxManager.getMailboxReactive(mailboxId, session))
-
- mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData,
-
- range => SMono(mailbox.setFlagsReactive(flags, updateMode, range,
session)).`then`()))
+ val messageIds = metaData.keys.toSeq
+ SMono(mailboxManager.getMailboxReactive(mailboxId, session))
+ .flatMap(mailbox => SMono(mailbox.setFlagsReactive(flags, updateMode,
ranges.asJava, session)).`then`())
+ .`then`(SMono.just(messageIds.map(EmailUpdateSuccess)))
+ .onErrorResume(e => SMono.just(messageIds.map(id =>
EmailUpdateFailure(EmailSet.asUnparsed(id), e))))
}
private def moveByRange(mailboxId: MailboxId,
@@ -172,9 +172,11 @@ class EmailSetUpdatePerformer @Inject() (serializer:
EmailSetSerializer,
metaData: Map[MessageId,
Iterable[ComposedMessageIdWithMetaData]],
session: MailboxSession):
SMono[Seq[EmailUpdateResult]] = {
val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get
+ val messageIds = metaData.keys.toSeq
- updateByRange(ranges, metaData,
- range => SMono(mailboxManager.moveMessagesReactive(range, mailboxId,
targetId, session)).`then`())
+ SMono(mailboxManager.moveMessagesReactive(ranges.asJava, mailboxId,
targetId, session)).`then`()
+ .`then`(SMono.just(messageIds.map(EmailUpdateSuccess)))
+ .onErrorResume(e => SMono.just(messageIds.map(id =>
EmailUpdateFailure(EmailSet.asUnparsed(id), e))))
}
private def updateByRange(ranges: List[MessageRange],
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]