This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch 3.9.x in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 395171def2a01c89ea8d0cd406b79594a772c4dc Author: Benoit TELLIER <[email protected]> AuthorDate: Mon Nov 10 11:31:24 2025 +0100 JAMES-3728 Implement MessageIdManager::updateEmail This method bundles together move and flag update firing a single event for both operations. This prevents a data race from arising in listeners when the two operations are sequentially done, causing flags updates to be dropped. --- .../org/apache/james/mailbox/MessageIdManager.java | 2 + .../james/mailbox/store/StoreMessageIdManager.java | 122 ++++++++++++--------- 2 files changed, 71 insertions(+), 53 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java index 4605a0fcbd..d682448910 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java @@ -76,6 +76,8 @@ public interface MessageIdManager { Publisher<Void> setInMailboxesReactive(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession); + Publisher<Void> updateEmail(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, FlagsUpdateMode replace, MailboxSession mailboxSession); + default List<MessageResult> getMessage(MessageId messageId, FetchGroup fetchGroup, MailboxSession mailboxSession) throws MailboxException { return getMessages(ImmutableList.of(messageId), fetchGroup, mailboxSession); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index 6a7187e15c..3c5d4ef76e 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -38,6 +38,7 @@ import jakarta.mail.Flags; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.events.EventBus; +import org.apache.james.events.EventBus.EventWithRegistrationKey; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageIdManager; import org.apache.james.mailbox.MessageManager; @@ -94,6 +95,8 @@ import reactor.core.scheduler.Schedulers; public class StoreMessageIdManager implements MessageIdManager { + public static final int SET_FLAGS_CONCURRENCY = 4; + public static ImmutableSet<MailboxId> toMailboxIds(List<MailboxMessage> mailboxMessages) { return mailboxMessages .stream() @@ -129,21 +132,22 @@ public class StoreMessageIdManager implements MessageIdManager { @Override public Mono<Void> setFlagsReactive(Flags newState, MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) { - MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession); - MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession); - - int concurrency = 4; + return setFlagsReactiveWithoutEventPublishing(newState, replace, messageId, mailboxIds, mailboxSession) + .collectList() + .flatMap(eventBus::dispatch); + } + private Flux<EventWithRegistrationKey> setFlagsReactiveWithoutEventPublishing(Flags newState, MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) { return Flux.fromIterable(mailboxIds) - .flatMap(mailboxMapper::findMailboxById, concurrency) + .flatMap(mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)::findMailboxById, SET_FLAGS_CONCURRENCY) .collect(ImmutableList.toImmutableList()) - .flatMap(Throwing.<List<Mailbox>, Mono<Void>>function(targetMailboxes -> { + .flatMapMany(Throwing.<List<Mailbox>, Flux<EventWithRegistrationKey>>function(targetMailboxes -> { assertRightsOnMailboxes(targetMailboxes, mailboxSession, Right.Write); - return messageIdMapper.setFlags(messageId, mailboxIds, newState, replace) + return mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession) + .setFlags(messageId, mailboxIds, newState, replace) .flatMapIterable(updatedFlags -> updatedFlags.asMap().entrySet()) - .concatMap(entry -> dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes)) - .then(); + .concatMap(entry -> flagsChangedEvent(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes)); }).sneakyThrow()); } @@ -321,44 +325,60 @@ public class StoreMessageIdManager implements MessageIdManager { @Override public Mono<Void> setInMailboxesReactive(MessageId messageId, Collection<MailboxId> targetMailboxIds, MailboxSession mailboxSession) { + return setInMailboxesWithoutEventPublishing(messageId, targetMailboxIds, mailboxSession) + .collectList() + .flatMap(eventBus::dispatch); + } + + private Flux<EventWithRegistrationKey> setInMailboxesWithoutEventPublishing(MessageId messageId, Collection<MailboxId> targetMailboxIds, MailboxSession mailboxSession) { return findRelatedMailboxMessages(messageId, mailboxSession) - .flatMap(currentMailboxMessages -> messageMovesWithMailbox(MessageMoves.builder() + .flatMapMany(currentMailboxMessages -> messageMovesWithMailbox(MessageMoves.builder() .targetMailboxIds(targetMailboxIds) .previousMailboxIds(toMailboxIds(currentMailboxMessages)) .build(), mailboxSession) - .flatMap(Throwing.<MessageMovesWithMailbox, Mono<Void>>function(messageMove -> { + .flatMapMany(Throwing.<MessageMovesWithMailbox, Flux<EventWithRegistrationKey>>function(messageMove -> { MessageMovesWithMailbox refined = messageMove.filterPrevious(hasRightsOnMailbox(mailboxSession, Right.Read)); if (messageMove.getPreviousMailboxes().isEmpty()) { LOGGER.info("Tried to access {} not accessible for {}", messageId, mailboxSession.getUser().asString()); - return Mono.empty(); + return Flux.empty(); } if (refined.getPreviousMailboxes().isEmpty()) { MailboxPath unreadablePreviousMailbox = messageMove.getPreviousMailboxes().iterator().next().generateAssociatedPath(); - return Mono.error(() -> new MailboxNotFoundException(unreadablePreviousMailbox)); + return Flux.error(() -> new MailboxNotFoundException(unreadablePreviousMailbox)); } if (refined.isChange()) { return applyMessageMoves(mailboxSession, currentMailboxMessages, refined); } - return Mono.empty(); + return Flux.empty(); }).sneakyThrow())); } + @Override + public Publisher<Void> updateEmail(MessageId messageId, List<MailboxId> targetMailboxIds, Flags newState, MessageManager.FlagsUpdateMode replace, MailboxSession mailboxSession) { + return Flux.concat( + setInMailboxesWithoutEventPublishing(messageId, targetMailboxIds, mailboxSession), + setFlagsReactiveWithoutEventPublishing(newState, replace, messageId, targetMailboxIds, mailboxSession)) + .collectList() + .flatMap(eventBus::dispatch); + } + public void setInMailboxesNoCheck(MessageId messageId, MailboxId targetMailboxId, MailboxSession mailboxSession) throws MailboxException { MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession); List<MailboxMessage> currentMailboxMessages = messageIdMapper.find(ImmutableList.of(messageId), MessageMapper.FetchType.METADATA); - MailboxReactorUtils.block(messageMovesWithMailbox(MessageMoves.builder() .targetMailboxIds(targetMailboxId) .previousMailboxIds(toMailboxIds(currentMailboxMessages)) .build(), mailboxSession) - .flatMap(messageMove -> { + .flatMapMany(messageMove -> { if (messageMove.isChange()) { return applyMessageMoveNoMailboxChecks(mailboxSession, currentMailboxMessages, messageMove); } - return Mono.empty(); - })); + return Flux.empty(); + }) + .collectList() + .flatMap(eventBus::dispatch)); } private Mono<List<MailboxMessage>> findRelatedMailboxMessages(MessageId messageId, MailboxSession mailboxSession) { @@ -368,7 +388,7 @@ public class StoreMessageIdManager implements MessageIdManager { .collect(ImmutableList.toImmutableList()); } - private Mono<Void> applyMessageMoves(MailboxSession mailboxSession, List<MailboxMessage> currentMailboxMessages, MessageMovesWithMailbox messageMoves) throws MailboxNotFoundException { + private Flux<EventWithRegistrationKey> applyMessageMoves(MailboxSession mailboxSession, List<MailboxMessage> currentMailboxMessages, MessageMovesWithMailbox messageMoves) throws MailboxNotFoundException { assertRightsOnMailboxes(messageMoves.addedMailboxes(), mailboxSession, Right.Insert); assertRightsOnMailboxes(messageMoves.removedMailboxes(), mailboxSession, Right.DeleteMessages); assertRightsOnMailboxes(messageMoves.getTargetMailboxes(), mailboxSession, Right.Read); @@ -376,11 +396,11 @@ public class StoreMessageIdManager implements MessageIdManager { return applyMessageMoveNoMailboxChecks(mailboxSession, currentMailboxMessages, messageMoves); } - private Mono<Void> applyMessageMoveNoMailboxChecks(MailboxSession mailboxSession, List<MailboxMessage> currentMailboxMessages, MessageMovesWithMailbox messageMoves) { + private Flux<EventWithRegistrationKey> applyMessageMoveNoMailboxChecks(MailboxSession mailboxSession, List<MailboxMessage> currentMailboxMessages, MessageMovesWithMailbox messageMoves) { Optional<MailboxMessage> mailboxMessage = currentMailboxMessages.stream().findAny(); if (mailboxMessage.isEmpty()) { - return Mono.error(new MailboxNotFoundException("can't load message")); + return Flux.error(new MailboxNotFoundException("can't load message")); } List<Pair<MailboxMessage, Mailbox>> messagesToRemove = currentMailboxMessages.stream() .flatMap(message -> messageMoves.removedMailboxes() @@ -390,23 +410,21 @@ public class StoreMessageIdManager implements MessageIdManager { .collect(ImmutableList.toImmutableList()); return validateQuota(messageMoves, mailboxMessage.get()) - .then(Flux.concat( - addMessageToMailboxes(mailboxMessage.get(), messageMoves, mailboxSession), - expungeMessageFromMailboxes(mailboxMessage.get().getMessageId(), messagesToRemove, mailboxSession, messageMoves), - Flux.just(new EventBus.EventWithRegistrationKey(EventFactory.moved() - .session(mailboxSession) - .messageMoves(messageMoves.asMessageMoves()) - .messageId(mailboxMessage.get().getMessageId()) - .build(), - messageMoves.impactedMailboxes() - .map(Mailbox::getMailboxId) - .map(MailboxIdRegistrationKey::new) - .collect(ImmutableSet.toImmutableSet())))) - .collectList() - .flatMap(eventBus::dispatch)); - } - - private Flux<EventBus.EventWithRegistrationKey> expungeMessageFromMailboxes(MessageId messageId, List<Pair<MailboxMessage, Mailbox>> messages, MailboxSession mailboxSession, MessageMovesWithMailbox messageMoves) { + .thenMany(Flux.concat( + addMessageToMailboxes(mailboxMessage.get(), messageMoves, mailboxSession), + expungeMessageFromMailboxes(mailboxMessage.get().getMessageId(), messagesToRemove, mailboxSession, messageMoves), + Flux.just(new EventWithRegistrationKey(EventFactory.moved() + .session(mailboxSession) + .messageMoves(messageMoves.asMessageMoves()) + .messageId(mailboxMessage.get().getMessageId()) + .build(), + messageMoves.impactedMailboxes() + .map(Mailbox::getMailboxId) + .map(MailboxIdRegistrationKey::new) + .collect(ImmutableSet.toImmutableSet()))))); + } + + private Flux<EventWithRegistrationKey> expungeMessageFromMailboxes(MessageId messageId, List<Pair<MailboxMessage, Mailbox>> messages, MailboxSession mailboxSession, MessageMovesWithMailbox messageMoves) { if (messages.isEmpty()) { return Flux.empty(); } @@ -422,7 +440,7 @@ public class StoreMessageIdManager implements MessageIdManager { .map(message -> expungedEvent(message, mailboxSession, messageMoves))); } - private EventBus.EventWithRegistrationKey expungedEvent(Pair<MailboxMessage, Mailbox> message, MailboxSession mailboxSession, MessageMovesWithMailbox messageMoves) { + private EventWithRegistrationKey expungedEvent(Pair<MailboxMessage, Mailbox> message, MailboxSession mailboxSession, MessageMovesWithMailbox messageMoves) { EventFactory.ExpungedFinalStage.Builder eventBuilder = EventFactory.expunged() .randomEventId() .mailboxSession(mailboxSession) @@ -431,23 +449,21 @@ public class StoreMessageIdManager implements MessageIdManager { if (isSingleMove(messageMoves)) { eventBuilder.movedTo(messageMoves.addedMailboxes().iterator().next().getMailboxId()); } - return new EventBus.EventWithRegistrationKey(eventBuilder.build(), ImmutableSet.of(new MailboxIdRegistrationKey(message.getRight().getMailboxId()))); + return new EventWithRegistrationKey(eventBuilder.build(), ImmutableSet.of(new MailboxIdRegistrationKey(message.getRight().getMailboxId()))); } - private Mono<Void> dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags, List<Mailbox> knownMailboxes) { + private Mono<EventWithRegistrationKey> flagsChangedEvent(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags, List<Mailbox> knownMailboxes) { return knownMailboxes.stream() .filter(knownMailbox -> knownMailbox.getMailboxId().equals(mailboxId)) .findFirst() .map(Mono::just) .orElseGet(() -> mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId)) - .flatMap(mailbox -> - eventBus.dispatch(EventFactory.flagsUpdated() - .randomEventId() - .mailboxSession(mailboxSession) - .mailbox(mailbox) - .updatedFlags(updatedFlags) - .build(), - new MailboxIdRegistrationKey(mailboxId))); + .map(mailbox -> new EventWithRegistrationKey(EventFactory.flagsUpdated() + .randomEventId() + .mailboxSession(mailboxSession) + .mailbox(mailbox) + .updatedFlags(updatedFlags) + .build(), ImmutableSet.of(new MailboxIdRegistrationKey(mailboxId)))); } private Mono<Void> validateQuota(MessageMovesWithMailbox messageMoves, MailboxMessage mailboxMessage) { @@ -490,11 +506,11 @@ public class StoreMessageIdManager implements MessageIdManager { } } - private Flux<EventBus.EventWithRegistrationKey> addMessageToMailboxes(MailboxMessage mailboxMessage, MessageMovesWithMailbox messageMoves, MailboxSession mailboxSession) { + private Flux<EventWithRegistrationKey> addMessageToMailboxes(MailboxMessage mailboxMessage, MessageMovesWithMailbox messageMoves, MailboxSession mailboxSession) { MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession); return Flux.fromIterable(messageMoves.addedMailboxes()) - .flatMap(Throwing.<Mailbox, Mono<EventBus.EventWithRegistrationKey>>function(mailbox -> { + .flatMap(Throwing.<Mailbox, Mono<EventWithRegistrationKey>>function(mailbox -> { MailboxACL.Rfc4314Rights myRights = rightManager.myRights(mailbox, mailboxSession); boolean shouldPreserveFlags = myRights.contains(Right.Write); MailboxMessage copy = mailboxMessage.copy(mailbox); @@ -513,7 +529,7 @@ public class StoreMessageIdManager implements MessageIdManager { }).sneakyThrow()); } - private EventBus.EventWithRegistrationKey addedEvent(MailboxSession mailboxSession, Mailbox mailbox, MessageMetaData messageMetaData, MessageMovesWithMailbox messageMoves) { + private EventWithRegistrationKey addedEvent(MailboxSession mailboxSession, Mailbox mailbox, MessageMetaData messageMetaData, MessageMovesWithMailbox messageMoves) { EventFactory.AddedFinalStage.Builder appended = EventFactory.added() .randomEventId() .mailboxSession(mailboxSession) @@ -524,7 +540,7 @@ public class StoreMessageIdManager implements MessageIdManager { if (isSingleMove(messageMoves)) { appended.movedFrom(messageMoves.removedMailboxes().iterator().next().getMailboxId()); } - return new EventBus.EventWithRegistrationKey(appended.build(), ImmutableSet.of(new MailboxIdRegistrationKey(mailbox.getMailboxId()))); + return new EventWithRegistrationKey(appended.build(), ImmutableSet.of(new MailboxIdRegistrationKey(mailbox.getMailboxId()))); } private boolean isSingleMove(MessageMovesWithMailbox messageMoves) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
